Skip to main content

helios_persistence/backends/sqlite/
bulk_submit.rs

1//! Bulk submit implementation for SQLite backend.
2
3use async_trait::async_trait;
4use chrono::Utc;
5use helios_fhir::FhirVersion;
6use rusqlite::params;
7use serde_json::Value;
8use tokio::io::{AsyncBufRead, AsyncBufReadExt};
9use uuid::Uuid;
10
11use crate::core::ResourceStorage;
12use crate::core::bulk_submit::{
13    BulkEntryOutcome, BulkEntryResult, BulkProcessingOptions, BulkSubmitProvider,
14    BulkSubmitRollbackProvider, ChangeType, EntryCountSummary, ManifestStatus, NdjsonEntry,
15    StreamProcessingResult, StreamingBulkSubmitProvider, SubmissionChange, SubmissionId,
16    SubmissionManifest, SubmissionStatus, SubmissionSummary,
17};
18use crate::error::{BackendError, BulkSubmitError, StorageError, StorageResult};
19use crate::tenant::TenantContext;
20
21use super::SqliteBackend;
22
23fn internal_error(message: String) -> StorageError {
24    StorageError::Backend(BackendError::Internal {
25        backend_name: "sqlite".to_string(),
26        message,
27        source: None,
28    })
29}
30
31#[async_trait]
32impl BulkSubmitProvider for SqliteBackend {
33    async fn create_submission(
34        &self,
35        tenant: &TenantContext,
36        id: &SubmissionId,
37        metadata: Option<Value>,
38    ) -> StorageResult<SubmissionSummary> {
39        let conn = self.get_connection()?;
40        let tenant_id = tenant.tenant_id().as_str();
41
42        // Check for duplicate
43        let exists: bool = conn
44            .query_row(
45                "SELECT 1 FROM bulk_submissions
46                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
47                params![tenant_id, &id.submitter, &id.submission_id],
48                |_| Ok(true),
49            )
50            .unwrap_or(false);
51
52        if exists {
53            return Err(StorageError::BulkSubmit(
54                BulkSubmitError::DuplicateSubmission {
55                    submitter: id.submitter.clone(),
56                    submission_id: id.submission_id.clone(),
57                },
58            ));
59        }
60
61        let now = Utc::now();
62        let now_str = now.to_rfc3339();
63        let metadata_bytes = metadata.as_ref().and_then(|m| serde_json::to_vec(m).ok());
64
65        conn.execute(
66            "INSERT INTO bulk_submissions
67             (tenant_id, submitter, submission_id, status, created_at, updated_at, metadata)
68             VALUES (?1, ?2, ?3, 'in-progress', ?4, ?5, ?6)",
69            params![
70                tenant_id,
71                &id.submitter,
72                &id.submission_id,
73                now_str,
74                now_str,
75                metadata_bytes
76            ],
77        )
78        .map_err(|e| internal_error(format!("Failed to create submission: {}", e)))?;
79
80        Ok(SubmissionSummary {
81            id: id.clone(),
82            status: SubmissionStatus::InProgress,
83            created_at: now,
84            updated_at: now,
85            completed_at: None,
86            manifest_count: 0,
87            total_entries: 0,
88            success_count: 0,
89            error_count: 0,
90            skipped_count: 0,
91            metadata,
92        })
93    }
94
95    async fn get_submission(
96        &self,
97        tenant: &TenantContext,
98        id: &SubmissionId,
99    ) -> StorageResult<Option<SubmissionSummary>> {
100        let conn = self.get_connection()?;
101        let tenant_id = tenant.tenant_id().as_str();
102
103        let result = conn.query_row(
104            "SELECT status, created_at, updated_at, completed_at, metadata
105             FROM bulk_submissions
106             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
107            params![tenant_id, &id.submitter, &id.submission_id],
108            |row| {
109                Ok((
110                    row.get::<_, String>(0)?,
111                    row.get::<_, String>(1)?,
112                    row.get::<_, String>(2)?,
113                    row.get::<_, Option<String>>(3)?,
114                    row.get::<_, Option<Vec<u8>>>(4)?,
115                ))
116            },
117        );
118
119        match result {
120            Ok((status_str, created_at, updated_at, completed_at, metadata_bytes)) => {
121                let status: SubmissionStatus = status_str
122                    .parse()
123                    .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
124
125                let created_at = chrono::DateTime::parse_from_rfc3339(&created_at)
126                    .map_err(|e| internal_error(format!("Invalid created_at: {}", e)))?
127                    .with_timezone(&Utc);
128
129                let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at)
130                    .map_err(|e| internal_error(format!("Invalid updated_at: {}", e)))?
131                    .with_timezone(&Utc);
132
133                let completed_at = completed_at.and_then(|s| {
134                    chrono::DateTime::parse_from_rfc3339(&s)
135                        .ok()
136                        .map(|dt| dt.with_timezone(&Utc))
137                });
138
139                let metadata = metadata_bytes.and_then(|b| serde_json::from_slice(&b).ok());
140
141                // Get manifest count
142                let manifest_count: i32 = conn
143                    .query_row(
144                        "SELECT COUNT(*) FROM bulk_manifests
145                         WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
146                        params![tenant_id, &id.submitter, &id.submission_id],
147                        |row| row.get(0),
148                    )
149                    .unwrap_or(0);
150
151                // Get aggregated counts from entry results
152                let (total, success, errors, skipped): (i64, i64, i64, i64) = conn
153                    .query_row(
154                        "SELECT
155                            COUNT(*),
156                            SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
157                            SUM(CASE WHEN outcome IN ('validation-error', 'processing-error') THEN 1 ELSE 0 END),
158                            SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
159                         FROM bulk_entry_results
160                         WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
161                        params![tenant_id, &id.submitter, &id.submission_id],
162                        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
163                    )
164                    .unwrap_or((0, 0, 0, 0));
165
166                Ok(Some(SubmissionSummary {
167                    id: id.clone(),
168                    status,
169                    created_at,
170                    updated_at,
171                    completed_at,
172                    manifest_count: manifest_count as u32,
173                    total_entries: total as u64,
174                    success_count: success as u64,
175                    error_count: errors as u64,
176                    skipped_count: skipped as u64,
177                    metadata,
178                }))
179            }
180            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
181            Err(e) => Err(internal_error(format!("Failed to get submission: {}", e))),
182        }
183    }
184
185    async fn list_submissions(
186        &self,
187        tenant: &TenantContext,
188        submitter: Option<&str>,
189        status: Option<SubmissionStatus>,
190        limit: u32,
191        offset: u32,
192    ) -> StorageResult<Vec<SubmissionSummary>> {
193        // Collect IDs first, then drop the connection before calling async methods
194        let ids: Vec<(String, String)> = {
195            let conn = self.get_connection()?;
196            let tenant_id = tenant.tenant_id().as_str();
197
198            let (query, params): (String, Vec<String>) = {
199                let mut query =
200                    "SELECT submitter, submission_id FROM bulk_submissions WHERE tenant_id = ?1"
201                        .to_string();
202                let mut params = vec![tenant_id.to_string()];
203
204                if let Some(submitter) = submitter {
205                    query.push_str(" AND submitter = ?2");
206                    params.push(submitter.to_string());
207                }
208
209                if let Some(status) = status {
210                    let param_num = params.len() + 1;
211                    query.push_str(&format!(" AND status = ?{}", param_num));
212                    params.push(status.to_string());
213                }
214
215                query.push_str(" ORDER BY created_at DESC");
216                query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
217
218                (query, params)
219            };
220
221            let mut stmt = conn
222                .prepare(&query)
223                .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
224
225            let params_refs: Vec<&dyn rusqlite::ToSql> =
226                params.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
227
228            stmt.query_map(params_refs.as_slice(), |row| {
229                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
230            })
231            .map_err(|e| internal_error(format!("Failed to query submissions: {}", e)))?
232            .filter_map(|r| r.ok())
233            .collect()
234        };
235
236        let mut results = Vec::new();
237        for (submitter, submission_id) in ids {
238            let sub_id = SubmissionId::new(submitter, submission_id);
239            if let Some(summary) = self.get_submission(tenant, &sub_id).await? {
240                results.push(summary);
241            }
242        }
243
244        Ok(results)
245    }
246
247    async fn complete_submission(
248        &self,
249        tenant: &TenantContext,
250        id: &SubmissionId,
251    ) -> StorageResult<SubmissionSummary> {
252        let conn = self.get_connection()?;
253        let tenant_id = tenant.tenant_id().as_str();
254
255        // Check current status
256        let current_status: String = conn
257            .query_row(
258                "SELECT status FROM bulk_submissions
259                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
260                params![tenant_id, &id.submitter, &id.submission_id],
261                |row| row.get(0),
262            )
263            .map_err(|e| {
264                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
265                    StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
266                        submitter: id.submitter.clone(),
267                        submission_id: id.submission_id.clone(),
268                    })
269                } else {
270                    internal_error(format!("Failed to get submission status: {}", e))
271                }
272            })?;
273
274        if current_status != "in-progress" {
275            return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
276                submission_id: id.submission_id.clone(),
277            }));
278        }
279
280        let now = Utc::now().to_rfc3339();
281        conn.execute(
282            "UPDATE bulk_submissions SET status = 'complete', completed_at = ?1, updated_at = ?2
283             WHERE tenant_id = ?3 AND submitter = ?4 AND submission_id = ?5",
284            params![now, now, tenant_id, &id.submitter, &id.submission_id],
285        )
286        .map_err(|e| internal_error(format!("Failed to complete submission: {}", e)))?;
287
288        self.get_submission(tenant, id)
289            .await?
290            .ok_or_else(|| internal_error("Submission disappeared".to_string()))
291    }
292
293    async fn abort_submission(
294        &self,
295        tenant: &TenantContext,
296        id: &SubmissionId,
297        _reason: &str,
298    ) -> StorageResult<u64> {
299        let conn = self.get_connection()?;
300        let tenant_id = tenant.tenant_id().as_str();
301
302        // Check current status
303        let current_status: String = conn
304            .query_row(
305                "SELECT status FROM bulk_submissions
306                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
307                params![tenant_id, &id.submitter, &id.submission_id],
308                |row| row.get(0),
309            )
310            .map_err(|e| {
311                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
312                    StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
313                        submitter: id.submitter.clone(),
314                        submission_id: id.submission_id.clone(),
315                    })
316                } else {
317                    internal_error(format!("Failed to get submission status: {}", e))
318                }
319            })?;
320
321        if current_status != "in-progress" {
322            return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
323                submission_id: id.submission_id.clone(),
324            }));
325        }
326
327        // Count pending manifests
328        let pending_count: i64 = conn
329            .query_row(
330                "SELECT COUNT(*) FROM bulk_manifests
331                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
332                 AND status IN ('pending', 'processing')",
333                params![tenant_id, &id.submitter, &id.submission_id],
334                |row| row.get(0),
335            )
336            .unwrap_or(0);
337
338        let now = Utc::now().to_rfc3339();
339
340        // Update submission status
341        conn.execute(
342            "UPDATE bulk_submissions SET status = 'aborted', completed_at = ?1, updated_at = ?2
343             WHERE tenant_id = ?3 AND submitter = ?4 AND submission_id = ?5",
344            params![now, now, tenant_id, &id.submitter, &id.submission_id],
345        )
346        .map_err(|e| internal_error(format!("Failed to abort submission: {}", e)))?;
347
348        // Update pending manifests to failed
349        conn.execute(
350            "UPDATE bulk_manifests SET status = 'failed'
351             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
352             AND status IN ('pending', 'processing')",
353            params![tenant_id, &id.submitter, &id.submission_id],
354        )
355        .map_err(|e| internal_error(format!("Failed to update manifests: {}", e)))?;
356
357        Ok(pending_count as u64)
358    }
359
360    async fn add_manifest(
361        &self,
362        tenant: &TenantContext,
363        submission_id: &SubmissionId,
364        manifest_url: Option<&str>,
365        replaces_manifest_url: Option<&str>,
366    ) -> StorageResult<SubmissionManifest> {
367        let conn = self.get_connection()?;
368        let tenant_id = tenant.tenant_id().as_str();
369
370        // Check submission exists and is in progress
371        let status: String = conn
372            .query_row(
373                "SELECT status FROM bulk_submissions
374                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
375                params![
376                    tenant_id,
377                    &submission_id.submitter,
378                    &submission_id.submission_id
379                ],
380                |row| row.get(0),
381            )
382            .map_err(|e| {
383                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
384                    StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
385                        submitter: submission_id.submitter.clone(),
386                        submission_id: submission_id.submission_id.clone(),
387                    })
388                } else {
389                    internal_error(format!("Failed to get submission: {}", e))
390                }
391            })?;
392
393        if status != "in-progress" {
394            return Err(StorageError::BulkSubmit(BulkSubmitError::InvalidState {
395                submission_id: submission_id.submission_id.clone(),
396                expected: "in-progress".to_string(),
397                actual: status,
398            }));
399        }
400
401        let manifest_id = Uuid::new_v4().to_string();
402        let now = Utc::now();
403        let now_str = now.to_rfc3339();
404
405        conn.execute(
406            "INSERT INTO bulk_manifests
407             (tenant_id, submitter, submission_id, manifest_id, manifest_url, replaces_manifest_url, status, added_at)
408             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'pending', ?7)",
409            params![
410                tenant_id,
411                &submission_id.submitter,
412                &submission_id.submission_id,
413                manifest_id,
414                manifest_url,
415                replaces_manifest_url,
416                now_str
417            ],
418        )
419        .map_err(|e| internal_error(format!("Failed to add manifest: {}", e)))?;
420
421        // Update submission updated_at
422        conn.execute(
423            "UPDATE bulk_submissions SET updated_at = ?1
424             WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
425            params![
426                now_str,
427                tenant_id,
428                &submission_id.submitter,
429                &submission_id.submission_id
430            ],
431        )
432        .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
433
434        Ok(SubmissionManifest {
435            manifest_id,
436            manifest_url: manifest_url.map(String::from),
437            replaces_manifest_url: replaces_manifest_url.map(String::from),
438            status: ManifestStatus::Pending,
439            added_at: now,
440            total_entries: 0,
441            processed_entries: 0,
442            failed_entries: 0,
443        })
444    }
445
446    async fn get_manifest(
447        &self,
448        tenant: &TenantContext,
449        submission_id: &SubmissionId,
450        manifest_id: &str,
451    ) -> StorageResult<Option<SubmissionManifest>> {
452        let conn = self.get_connection()?;
453        let tenant_id = tenant.tenant_id().as_str();
454
455        let result = conn.query_row(
456            "SELECT manifest_url, replaces_manifest_url, status, added_at, total_entries, processed_entries, failed_entries
457             FROM bulk_manifests
458             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
459            params![tenant_id, &submission_id.submitter, &submission_id.submission_id, manifest_id],
460            |row| {
461                Ok((
462                    row.get::<_, Option<String>>(0)?,
463                    row.get::<_, Option<String>>(1)?,
464                    row.get::<_, String>(2)?,
465                    row.get::<_, String>(3)?,
466                    row.get::<_, i64>(4)?,
467                    row.get::<_, i64>(5)?,
468                    row.get::<_, i64>(6)?,
469                ))
470            },
471        );
472
473        match result {
474            Ok((
475                manifest_url,
476                replaces_manifest_url,
477                status_str,
478                added_at,
479                total,
480                processed,
481                failed,
482            )) => {
483                let status: ManifestStatus = status_str.parse().map_err(|_| {
484                    internal_error(format!("Invalid manifest status: {}", status_str))
485                })?;
486
487                let added_at = chrono::DateTime::parse_from_rfc3339(&added_at)
488                    .map_err(|e| internal_error(format!("Invalid added_at: {}", e)))?
489                    .with_timezone(&Utc);
490
491                Ok(Some(SubmissionManifest {
492                    manifest_id: manifest_id.to_string(),
493                    manifest_url,
494                    replaces_manifest_url,
495                    status,
496                    added_at,
497                    total_entries: total as u64,
498                    processed_entries: processed as u64,
499                    failed_entries: failed as u64,
500                }))
501            }
502            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
503            Err(e) => Err(internal_error(format!("Failed to get manifest: {}", e))),
504        }
505    }
506
507    async fn list_manifests(
508        &self,
509        tenant: &TenantContext,
510        submission_id: &SubmissionId,
511    ) -> StorageResult<Vec<SubmissionManifest>> {
512        // Collect IDs first, then drop the connection before calling async methods
513        let manifest_ids: Vec<String> = {
514            let conn = self.get_connection()?;
515            let tenant_id = tenant.tenant_id().as_str();
516
517            let mut stmt = conn
518                .prepare(
519                    "SELECT manifest_id FROM bulk_manifests
520                     WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
521                     ORDER BY added_at",
522                )
523                .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
524
525            stmt.query_map(
526                params![
527                    tenant_id,
528                    &submission_id.submitter,
529                    &submission_id.submission_id
530                ],
531                |row| row.get(0),
532            )
533            .map_err(|e| internal_error(format!("Failed to query manifests: {}", e)))?
534            .filter_map(|r| r.ok())
535            .collect()
536        };
537
538        let mut results = Vec::new();
539        for manifest_id in manifest_ids {
540            if let Some(manifest) = self
541                .get_manifest(tenant, submission_id, &manifest_id)
542                .await?
543            {
544                results.push(manifest);
545            }
546        }
547
548        Ok(results)
549    }
550
551    async fn process_entries(
552        &self,
553        tenant: &TenantContext,
554        submission_id: &SubmissionId,
555        manifest_id: &str,
556        entries: Vec<NdjsonEntry>,
557        options: &BulkProcessingOptions,
558    ) -> StorageResult<Vec<BulkEntryResult>> {
559        let conn = self.get_connection()?;
560        let tenant_id = tenant.tenant_id().as_str();
561
562        // Verify manifest exists
563        if self
564            .get_manifest(tenant, submission_id, manifest_id)
565            .await?
566            .is_none()
567        {
568            return Err(StorageError::BulkSubmit(
569                BulkSubmitError::ManifestNotFound {
570                    submission_id: submission_id.submission_id.clone(),
571                    manifest_id: manifest_id.to_string(),
572                },
573            ));
574        }
575
576        // Update manifest status to processing
577        conn.execute(
578            "UPDATE bulk_manifests SET status = 'processing'
579             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
580            params![
581                tenant_id,
582                &submission_id.submitter,
583                &submission_id.submission_id,
584                manifest_id
585            ],
586        )
587        .map_err(|e| internal_error(format!("Failed to update manifest status: {}", e)))?;
588
589        let mut results = Vec::new();
590        let mut error_count = 0u32;
591
592        for entry in entries {
593            // Check if we've hit max errors
594            if options.max_errors > 0 && error_count >= options.max_errors {
595                if !options.continue_on_error {
596                    return Err(StorageError::BulkSubmit(
597                        BulkSubmitError::MaxErrorsExceeded {
598                            submission_id: submission_id.submission_id.clone(),
599                            max_errors: options.max_errors,
600                        },
601                    ));
602                }
603                // Skip remaining entries
604                let skip_result = BulkEntryResult::skipped(
605                    entry.line_number,
606                    &entry.resource_type,
607                    "max errors exceeded",
608                );
609                results.push(skip_result);
610                continue;
611            }
612
613            // Process the entry
614            let result = self
615                .process_single_entry(tenant, submission_id, manifest_id, &entry, options)
616                .await;
617
618            let entry_result = match result {
619                Ok(r) => r,
620                Err(e) => {
621                    error_count += 1;
622                    BulkEntryResult::processing_error(
623                        entry.line_number,
624                        &entry.resource_type,
625                        serde_json::json!({
626                            "resourceType": "OperationOutcome",
627                            "issue": [{
628                                "severity": "error",
629                                "code": "exception",
630                                "diagnostics": e.to_string()
631                            }]
632                        }),
633                    )
634                }
635            };
636
637            if entry_result.is_error() {
638                error_count += 1;
639            }
640
641            // Store the result
642            self.store_entry_result(tenant, submission_id, manifest_id, &entry_result)
643                .await?;
644
645            results.push(entry_result);
646        }
647
648        // Update manifest counts
649        let now = Utc::now().to_rfc3339();
650        conn.execute(
651            "UPDATE bulk_manifests SET
652                total_entries = total_entries + ?1,
653                processed_entries = processed_entries + ?2,
654                failed_entries = failed_entries + ?3
655             WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6 AND manifest_id = ?7",
656            params![
657                results.len() as i64,
658                results.iter().filter(|r| r.is_success()).count() as i64,
659                error_count as i64,
660                tenant_id,
661                &submission_id.submitter,
662                &submission_id.submission_id,
663                manifest_id
664            ],
665        )
666        .map_err(|e| internal_error(format!("Failed to update manifest counts: {}", e)))?;
667
668        // Update submission updated_at
669        conn.execute(
670            "UPDATE bulk_submissions SET updated_at = ?1
671             WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
672            params![
673                now,
674                tenant_id,
675                &submission_id.submitter,
676                &submission_id.submission_id
677            ],
678        )
679        .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
680
681        Ok(results)
682    }
683
684    async fn get_entry_results(
685        &self,
686        tenant: &TenantContext,
687        submission_id: &SubmissionId,
688        manifest_id: &str,
689        outcome_filter: Option<BulkEntryOutcome>,
690        limit: u32,
691        offset: u32,
692    ) -> StorageResult<Vec<BulkEntryResult>> {
693        let conn = self.get_connection()?;
694        let tenant_id = tenant.tenant_id().as_str();
695
696        let mut query =
697            "SELECT line_number, resource_type, resource_id, created, outcome, operation_outcome
698             FROM bulk_entry_results
699             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4"
700                .to_string();
701
702        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
703            Box::new(tenant_id.to_string()),
704            Box::new(submission_id.submitter.clone()),
705            Box::new(submission_id.submission_id.clone()),
706            Box::new(manifest_id.to_string()),
707        ];
708
709        if let Some(outcome) = outcome_filter {
710            query.push_str(" AND outcome = ?");
711            params_vec.push(Box::new(outcome.to_string()));
712        }
713
714        query.push_str(" ORDER BY line_number");
715        query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
716
717        let params_slice: Vec<&dyn rusqlite::ToSql> =
718            params_vec.iter().map(|p| p.as_ref()).collect();
719
720        let mut stmt = conn
721            .prepare(&query)
722            .map_err(|e| internal_error(format!("Failed to prepare results query: {}", e)))?;
723
724        let results: Vec<BulkEntryResult> = stmt
725            .query_map(params_slice.as_slice(), |row| {
726                let line_number: i64 = row.get(0)?;
727                let resource_type: String = row.get(1)?;
728                let resource_id: Option<String> = row.get(2)?;
729                let created: Option<i32> = row.get(3)?;
730                let outcome_str: String = row.get(4)?;
731                let operation_outcome_bytes: Option<Vec<u8>> = row.get(5)?;
732
733                let outcome: BulkEntryOutcome = outcome_str
734                    .parse()
735                    .unwrap_or(BulkEntryOutcome::ProcessingError);
736
737                let operation_outcome =
738                    operation_outcome_bytes.and_then(|b| serde_json::from_slice(&b).ok());
739
740                Ok(BulkEntryResult {
741                    line_number: line_number as u64,
742                    resource_type,
743                    resource_id,
744                    created: created.map(|c| c != 0).unwrap_or(false),
745                    outcome,
746                    operation_outcome,
747                })
748            })
749            .map_err(|e| internal_error(format!("Failed to query results: {}", e)))?
750            .filter_map(|r| r.ok())
751            .collect();
752
753        Ok(results)
754    }
755
756    async fn get_entry_counts(
757        &self,
758        tenant: &TenantContext,
759        submission_id: &SubmissionId,
760        manifest_id: &str,
761    ) -> StorageResult<EntryCountSummary> {
762        let conn = self.get_connection()?;
763        let tenant_id = tenant.tenant_id().as_str();
764
765        let (total, success, validation_error, processing_error, skipped): (i64, i64, i64, i64, i64) = conn
766            .query_row(
767                "SELECT
768                    COUNT(*),
769                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
770                    SUM(CASE WHEN outcome = 'validation-error' THEN 1 ELSE 0 END),
771                    SUM(CASE WHEN outcome = 'processing-error' THEN 1 ELSE 0 END),
772                    SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
773                 FROM bulk_entry_results
774                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
775                params![tenant_id, &submission_id.submitter, &submission_id.submission_id, manifest_id],
776                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)),
777            )
778            .unwrap_or((0, 0, 0, 0, 0));
779
780        Ok(EntryCountSummary {
781            total: total as u64,
782            success: success as u64,
783            validation_error: validation_error as u64,
784            processing_error: processing_error as u64,
785            skipped: skipped as u64,
786        })
787    }
788}
789
790impl SqliteBackend {
791    /// Process a single NDJSON entry.
792    async fn process_single_entry(
793        &self,
794        tenant: &TenantContext,
795        submission_id: &SubmissionId,
796        manifest_id: &str,
797        entry: &NdjsonEntry,
798        options: &BulkProcessingOptions,
799    ) -> StorageResult<BulkEntryResult> {
800        // Check if resource has an ID
801        let resource_id = entry.resource_id.as_ref();
802
803        if let Some(id) = resource_id {
804            // Check if resource exists
805            let existing = self.read(tenant, &entry.resource_type, id).await;
806
807            match existing {
808                Ok(Some(current)) => {
809                    // Resource exists - update if allowed
810                    if !options.allow_updates {
811                        return Ok(BulkEntryResult::skipped(
812                            entry.line_number,
813                            &entry.resource_type,
814                            "updates not allowed",
815                        ));
816                    }
817
818                    // Record change for rollback
819                    let change = SubmissionChange::update(
820                        manifest_id,
821                        &entry.resource_type,
822                        id,
823                        current.version_id(),
824                        (current.version_id().parse::<i32>().unwrap_or(0) + 1).to_string(),
825                        current.content().clone(),
826                    );
827                    self.record_change(tenant, submission_id, &change).await?;
828
829                    // Update the resource
830                    let updated = self
831                        .update(tenant, &current, entry.resource.clone())
832                        .await?;
833
834                    Ok(BulkEntryResult::success(
835                        entry.line_number,
836                        &entry.resource_type,
837                        updated.id(),
838                        false,
839                    ))
840                }
841                Ok(None)
842                | Err(StorageError::Resource(crate::error::ResourceError::Gone { .. })) => {
843                    // Resource doesn't exist - create it
844                    // Use default FHIR version for bulk operations
845                    let created = self
846                        .create(
847                            tenant,
848                            &entry.resource_type,
849                            entry.resource.clone(),
850                            FhirVersion::default(),
851                        )
852                        .await?;
853
854                    // Record change for rollback
855                    let change = SubmissionChange::create(
856                        manifest_id,
857                        &entry.resource_type,
858                        created.id(),
859                        created.version_id(),
860                    );
861                    self.record_change(tenant, submission_id, &change).await?;
862
863                    Ok(BulkEntryResult::success(
864                        entry.line_number,
865                        &entry.resource_type,
866                        created.id(),
867                        true,
868                    ))
869                }
870                Err(e) => Err(e),
871            }
872        } else {
873            // No ID - create new resource
874            // Use default FHIR version for bulk operations
875            let created = self
876                .create(
877                    tenant,
878                    &entry.resource_type,
879                    entry.resource.clone(),
880                    FhirVersion::default(),
881                )
882                .await?;
883
884            // Record change for rollback
885            let change = SubmissionChange::create(
886                manifest_id,
887                &entry.resource_type,
888                created.id(),
889                created.version_id(),
890            );
891            self.record_change(tenant, submission_id, &change).await?;
892
893            Ok(BulkEntryResult::success(
894                entry.line_number,
895                &entry.resource_type,
896                created.id(),
897                true,
898            ))
899        }
900    }
901
902    /// Store an entry result in the database.
903    async fn store_entry_result(
904        &self,
905        tenant: &TenantContext,
906        submission_id: &SubmissionId,
907        manifest_id: &str,
908        result: &BulkEntryResult,
909    ) -> StorageResult<()> {
910        let conn = self.get_connection()?;
911        let tenant_id = tenant.tenant_id().as_str();
912
913        let outcome_bytes = result
914            .operation_outcome
915            .as_ref()
916            .and_then(|o| serde_json::to_vec(o).ok());
917
918        conn.execute(
919            "INSERT INTO bulk_entry_results
920             (tenant_id, submitter, submission_id, manifest_id, line_number, resource_type, resource_id, created, outcome, operation_outcome)
921             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
922            params![
923                tenant_id,
924                &submission_id.submitter,
925                &submission_id.submission_id,
926                manifest_id,
927                result.line_number as i64,
928                &result.resource_type,
929                &result.resource_id,
930                if result.created { Some(1) } else { Some(0) },
931                result.outcome.to_string(),
932                outcome_bytes
933            ],
934        )
935        .map_err(|e| internal_error(format!("Failed to store entry result: {}", e)))?;
936
937        Ok(())
938    }
939}
940
941#[async_trait]
942impl StreamingBulkSubmitProvider for SqliteBackend {
943    async fn process_ndjson_stream(
944        &self,
945        tenant: &TenantContext,
946        submission_id: &SubmissionId,
947        manifest_id: &str,
948        resource_type: &str,
949        mut reader: Box<dyn AsyncBufRead + Send + Unpin>,
950        options: &BulkProcessingOptions,
951    ) -> StorageResult<StreamProcessingResult> {
952        let mut result = StreamProcessingResult::new();
953        let mut line_number = 0u64;
954        let mut batch = Vec::new();
955
956        loop {
957            let mut line = String::new();
958            let bytes_read = reader
959                .read_line(&mut line)
960                .await
961                .map_err(|e| internal_error(format!("Failed to read line: {}", e)))?;
962
963            if bytes_read == 0 {
964                // End of stream
965                break;
966            }
967
968            line_number += 1;
969            result.lines_processed = line_number;
970
971            let line = line.trim();
972            if line.is_empty() {
973                continue;
974            }
975
976            // Parse the line
977            match NdjsonEntry::parse(line_number, line) {
978                Ok(entry) => {
979                    // Validate resource type matches
980                    if entry.resource_type != resource_type {
981                        let error_result = BulkEntryResult::validation_error(
982                            line_number,
983                            &entry.resource_type,
984                            serde_json::json!({
985                                "resourceType": "OperationOutcome",
986                                "issue": [{
987                                    "severity": "error",
988                                    "code": "invalid",
989                                    "diagnostics": format!("Expected resource type {}, got {}", resource_type, entry.resource_type)
990                                }]
991                            }),
992                        );
993                        result.counts.increment(error_result.outcome);
994
995                        if !options.continue_on_error
996                            && (options.max_errors == 0
997                                || result.counts.error_count() >= options.max_errors as u64)
998                        {
999                            return Ok(result.aborted("max errors exceeded"));
1000                        }
1001                        continue;
1002                    }
1003
1004                    batch.push(entry);
1005                }
1006                Err(e) => {
1007                    result.counts.increment(BulkEntryOutcome::ValidationError);
1008
1009                    if !options.continue_on_error
1010                        && (options.max_errors == 0
1011                            || result.counts.error_count() >= options.max_errors as u64)
1012                    {
1013                        return Ok(result.aborted(format!("Parse error: {}", e)));
1014                    }
1015                }
1016            }
1017
1018            // Process batch if it's full
1019            if batch.len() >= options.batch_size as usize {
1020                let batch_results = self
1021                    .process_entries(
1022                        tenant,
1023                        submission_id,
1024                        manifest_id,
1025                        std::mem::take(&mut batch),
1026                        options,
1027                    )
1028                    .await?;
1029
1030                for r in batch_results {
1031                    result.counts.increment(r.outcome);
1032                }
1033
1034                // Check if we need to abort
1035                if !options.continue_on_error
1036                    && options.max_errors > 0
1037                    && result.counts.error_count() >= options.max_errors as u64
1038                {
1039                    return Ok(result.aborted("max errors exceeded"));
1040                }
1041            }
1042        }
1043
1044        // Process remaining entries
1045        if !batch.is_empty() {
1046            let batch_results = self
1047                .process_entries(tenant, submission_id, manifest_id, batch, options)
1048                .await?;
1049
1050            for r in batch_results {
1051                result.counts.increment(r.outcome);
1052            }
1053        }
1054
1055        Ok(result)
1056    }
1057}
1058
1059#[async_trait]
1060impl BulkSubmitRollbackProvider for SqliteBackend {
1061    async fn record_change(
1062        &self,
1063        tenant: &TenantContext,
1064        submission_id: &SubmissionId,
1065        change: &SubmissionChange,
1066    ) -> StorageResult<()> {
1067        let conn = self.get_connection()?;
1068        let tenant_id = tenant.tenant_id().as_str();
1069
1070        let previous_content_bytes = change
1071            .previous_content
1072            .as_ref()
1073            .and_then(|c| serde_json::to_vec(c).ok());
1074
1075        conn.execute(
1076            "INSERT INTO bulk_submission_changes
1077             (tenant_id, submitter, submission_id, change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at)
1078             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1079            params![
1080                tenant_id,
1081                &submission_id.submitter,
1082                &submission_id.submission_id,
1083                &change.change_id,
1084                &change.manifest_id,
1085                change.change_type.to_string(),
1086                &change.resource_type,
1087                &change.resource_id,
1088                &change.previous_version,
1089                &change.new_version,
1090                previous_content_bytes,
1091                change.changed_at.to_rfc3339()
1092            ],
1093        )
1094        .map_err(|e| internal_error(format!("Failed to record change: {}", e)))?;
1095
1096        Ok(())
1097    }
1098
1099    async fn list_changes(
1100        &self,
1101        tenant: &TenantContext,
1102        submission_id: &SubmissionId,
1103        limit: u32,
1104        offset: u32,
1105    ) -> StorageResult<Vec<SubmissionChange>> {
1106        let conn = self.get_connection()?;
1107        let tenant_id = tenant.tenant_id().as_str();
1108
1109        let mut stmt = conn
1110            .prepare(&format!(
1111                "SELECT change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at
1112                 FROM bulk_submission_changes
1113                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1114                 ORDER BY changed_at DESC
1115                 LIMIT {} OFFSET {}",
1116                limit, offset
1117            ))
1118            .map_err(|e| internal_error(format!("Failed to prepare changes query: {}", e)))?;
1119
1120        let changes: Vec<SubmissionChange> = stmt
1121            .query_map(
1122                params![
1123                    tenant_id,
1124                    &submission_id.submitter,
1125                    &submission_id.submission_id
1126                ],
1127                |row| {
1128                    let change_id: String = row.get(0)?;
1129                    let manifest_id: String = row.get(1)?;
1130                    let change_type_str: String = row.get(2)?;
1131                    let resource_type: String = row.get(3)?;
1132                    let resource_id: String = row.get(4)?;
1133                    let previous_version: Option<String> = row.get(5)?;
1134                    let new_version: String = row.get(6)?;
1135                    let previous_content_bytes: Option<Vec<u8>> = row.get(7)?;
1136                    let changed_at_str: String = row.get(8)?;
1137
1138                    let change_type: ChangeType =
1139                        change_type_str.parse().unwrap_or(ChangeType::Create);
1140                    let previous_content =
1141                        previous_content_bytes.and_then(|b| serde_json::from_slice(&b).ok());
1142                    let changed_at = chrono::DateTime::parse_from_rfc3339(&changed_at_str)
1143                        .map(|dt| dt.with_timezone(&Utc))
1144                        .unwrap_or_else(|_| Utc::now());
1145
1146                    Ok(SubmissionChange {
1147                        change_id,
1148                        manifest_id,
1149                        change_type,
1150                        resource_type,
1151                        resource_id,
1152                        previous_version,
1153                        new_version,
1154                        previous_content,
1155                        changed_at,
1156                    })
1157                },
1158            )
1159            .map_err(|e| internal_error(format!("Failed to query changes: {}", e)))?
1160            .filter_map(|r| r.ok())
1161            .collect();
1162
1163        Ok(changes)
1164    }
1165
1166    async fn rollback_change(
1167        &self,
1168        tenant: &TenantContext,
1169        _submission_id: &SubmissionId,
1170        change: &SubmissionChange,
1171    ) -> StorageResult<bool> {
1172        match change.change_type {
1173            ChangeType::Create => {
1174                // Delete the created resource
1175                match self
1176                    .delete(tenant, &change.resource_type, &change.resource_id)
1177                    .await
1178                {
1179                    Ok(()) => Ok(true),
1180                    Err(StorageError::Resource(crate::error::ResourceError::NotFound {
1181                        ..
1182                    })) => {
1183                        // Already deleted
1184                        Ok(true)
1185                    }
1186                    Err(e) => Err(e),
1187                }
1188            }
1189            ChangeType::Update => {
1190                // Restore the previous content
1191                if let Some(ref previous_content) = change.previous_content {
1192                    // Read current to get version for update
1193                    let current = self
1194                        .read(tenant, &change.resource_type, &change.resource_id)
1195                        .await?;
1196                    if let Some(current) = current {
1197                        self.update(tenant, &current, previous_content.clone())
1198                            .await?;
1199                        Ok(true)
1200                    } else {
1201                        // Resource no longer exists
1202                        Ok(false)
1203                    }
1204                } else {
1205                    // No previous content to restore
1206                    Ok(false)
1207                }
1208            }
1209        }
1210    }
1211}
1212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tenant::{TenantId, TenantPermissions};
    use serde_json::json;

    /// Build an in-memory backend with the schema initialised.
    fn create_test_backend() -> SqliteBackend {
        let backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        backend
    }

    /// Build a full-access tenant context used by every test.
    fn create_test_tenant() -> TenantContext {
        TenantContext::new(
            TenantId::new("test-tenant"),
            TenantPermissions::full_access(),
        )
    }

    /// Create a submission with a generated id for `tenant` and return the id.
    async fn create_test_submission(
        backend: &SqliteBackend,
        tenant: &TenantContext,
    ) -> SubmissionId {
        let sub_id = SubmissionId::generate("test-system");
        backend
            .create_submission(tenant, &sub_id, None)
            .await
            .unwrap();
        sub_id
    }

    /// Two Patient entries without ids (so both are created), on lines 1 and 2.
    fn two_patient_entries() -> Vec<NdjsonEntry> {
        vec![
            NdjsonEntry::new(
                1,
                "Patient",
                json!({"resourceType": "Patient", "name": [{"family": "Test1"}]}),
            ),
            NdjsonEntry::new(
                2,
                "Patient",
                json!({"resourceType": "Patient", "name": [{"family": "Test2"}]}),
            ),
        ]
    }

    #[tokio::test]
    async fn test_create_submission() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let sub_id = SubmissionId::generate("test-system");
        let summary = backend
            .create_submission(&tenant, &sub_id, None)
            .await
            .unwrap();

        assert_eq!(summary.status, SubmissionStatus::InProgress);
        assert_eq!(summary.manifest_count, 0);
    }

    #[tokio::test]
    async fn test_duplicate_submission() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let sub_id = SubmissionId::new("test-system", "sub-123");
        backend
            .create_submission(&tenant, &sub_id, None)
            .await
            .unwrap();

        let result = backend.create_submission(&tenant, &sub_id, None).await;
        assert!(matches!(
            result,
            Err(StorageError::BulkSubmit(
                BulkSubmitError::DuplicateSubmission { .. }
            ))
        ));
    }

    #[tokio::test]
    async fn test_add_manifest() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = create_test_submission(&backend, &tenant).await;

        let manifest = backend
            .add_manifest(
                &tenant,
                &sub_id,
                Some("http://example.com/data.ndjson"),
                None,
            )
            .await
            .unwrap();

        assert_eq!(manifest.status, ManifestStatus::Pending);
        assert_eq!(
            manifest.manifest_url,
            Some("http://example.com/data.ndjson".to_string())
        );
    }

    #[tokio::test]
    async fn test_process_entries() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = create_test_submission(&backend, &tenant).await;

        let manifest = backend
            .add_manifest(&tenant, &sub_id, None, None)
            .await
            .unwrap();

        let options = BulkProcessingOptions::new();
        let results = backend
            .process_entries(
                &tenant,
                &sub_id,
                &manifest.manifest_id,
                two_patient_entries(),
                &options,
            )
            .await
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_success()));
        assert!(results.iter().all(|r| r.created));
    }

    #[tokio::test]
    async fn test_complete_submission() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = create_test_submission(&backend, &tenant).await;

        let summary = backend.complete_submission(&tenant, &sub_id).await.unwrap();
        assert_eq!(summary.status, SubmissionStatus::Complete);
        assert!(summary.completed_at.is_some());
    }

    #[tokio::test]
    async fn test_abort_submission() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = create_test_submission(&backend, &tenant).await;

        backend
            .add_manifest(&tenant, &sub_id, None, None)
            .await
            .unwrap();

        let cancelled = backend
            .abort_submission(&tenant, &sub_id, "test abort")
            .await
            .unwrap();
        assert_eq!(cancelled, 1);

        let summary = backend
            .get_submission(&tenant, &sub_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(summary.status, SubmissionStatus::Aborted);
    }

    #[tokio::test]
    async fn test_rollback_create() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = create_test_submission(&backend, &tenant).await;

        let manifest = backend
            .add_manifest(&tenant, &sub_id, None, None)
            .await
            .unwrap();

        let entries = vec![NdjsonEntry::new(
            1,
            "Patient",
            json!({"resourceType": "Patient", "id": "rollback-test", "name": [{"family": "Test"}]}),
        )];

        let options = BulkProcessingOptions::new();
        let _results = backend
            .process_entries(&tenant, &sub_id, &manifest.manifest_id, entries, &options)
            .await
            .unwrap();

        // Verify resource was created
        let patient = backend
            .read(&tenant, "Patient", "rollback-test")
            .await
            .unwrap();
        assert!(patient.is_some());

        // Rollback
        let changes = backend.list_changes(&tenant, &sub_id, 10, 0).await.unwrap();
        assert_eq!(changes.len(), 1);

        let rolled_back = backend
            .rollback_change(&tenant, &sub_id, &changes[0])
            .await
            .unwrap();
        assert!(rolled_back);

        // Verify resource was deleted
        let patient = backend.read(&tenant, "Patient", "rollback-test").await;
        assert!(patient.is_err()); // Should be Gone
    }

    #[tokio::test]
    async fn test_entry_counts() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = create_test_submission(&backend, &tenant).await;

        let manifest = backend
            .add_manifest(&tenant, &sub_id, None, None)
            .await
            .unwrap();

        let options = BulkProcessingOptions::new();
        backend
            .process_entries(
                &tenant,
                &sub_id,
                &manifest.manifest_id,
                two_patient_entries(),
                &options,
            )
            .await
            .unwrap();

        let counts = backend
            .get_entry_counts(&tenant, &sub_id, &manifest.manifest_id)
            .await
            .unwrap();

        assert_eq!(counts.total, 2);
        assert_eq!(counts.success, 2);
        assert_eq!(counts.error_count(), 0);
    }

    /// A manifest with no stored results must report all-zero counts.
    #[tokio::test]
    async fn test_entry_counts_empty_manifest() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = create_test_submission(&backend, &tenant).await;

        let manifest = backend
            .add_manifest(&tenant, &sub_id, None, None)
            .await
            .unwrap();

        let counts = backend
            .get_entry_counts(&tenant, &sub_id, &manifest.manifest_id)
            .await
            .unwrap();

        assert_eq!(counts.total, 0);
        assert_eq!(counts.success, 0);
        assert_eq!(counts.skipped, 0);
        assert_eq!(counts.error_count(), 0);
    }

    /// `list_changes` must honour both `limit` and `offset`.
    #[tokio::test]
    async fn test_list_changes_pagination() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = create_test_submission(&backend, &tenant).await;

        let manifest = backend
            .add_manifest(&tenant, &sub_id, None, None)
            .await
            .unwrap();

        let options = BulkProcessingOptions::new();
        backend
            .process_entries(
                &tenant,
                &sub_id,
                &manifest.manifest_id,
                two_patient_entries(),
                &options,
            )
            .await
            .unwrap();

        let first = backend.list_changes(&tenant, &sub_id, 1, 0).await.unwrap();
        let second = backend.list_changes(&tenant, &sub_id, 1, 1).await.unwrap();
        let past_end = backend.list_changes(&tenant, &sub_id, 10, 2).await.unwrap();

        assert_eq!(first.len(), 1);
        assert_eq!(second.len(), 1);
        assert_ne!(first[0].change_id, second[0].change_id);
        assert!(past_end.is_empty());
    }
}