Skip to main content

helios_persistence/backends/sqlite/
bulk_submit.rs

1//! Bulk submit implementation for SQLite backend.
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use helios_fhir::FhirVersion;
6use rusqlite::params;
7use serde_json::Value;
8use std::time::Duration as StdDuration;
9use tokio::io::{AsyncBufRead, AsyncBufReadExt};
10use tokio::sync::Mutex;
11use uuid::Uuid;
12
13use crate::core::ResourceStorage;
14use crate::core::bulk_export::ExportJobId;
15use crate::core::bulk_export_worker::{LeaseError, WorkerId};
16use crate::core::bulk_submit::{
17    BulkEntryOutcome, BulkEntryResult, BulkProcessingOptions, BulkSubmitProvider,
18    BulkSubmitRollbackProvider, ChangeType, EntryCountSummary, ManifestStatus, NdjsonEntry,
19    StreamProcessingResult, StreamingBulkSubmitProvider, SubmissionChange, SubmissionId,
20    SubmissionManifest, SubmissionStatus, SubmissionSummary,
21};
22use crate::core::bulk_submit_worker::{
23    ManifestLease, ManifestWorkerView, PollTokenTarget, SubmitClaimStrategy, SubmitFileRecord,
24    SubmitFileRow, SubmitWorkerStorage,
25};
26use crate::error::{BackendError, BulkSubmitError, StorageError, StorageResult};
27use crate::tenant::{TenantContext, TenantId, TenantPermissions};
28
29use super::SqliteBackend;
30
/// Process-local lock serializing manifest claims for the single-instance SQLite
/// job store (SQLite has no `SELECT … FOR UPDATE SKIP LOCKED`).
///
/// Built with `Mutex::const_new` so it can live in a `static` without lazy
/// initialisation. Being process-local, it offers no protection between
/// separate processes sharing one database file.
static SUBMIT_CLAIM_LOCK: Mutex<()> = Mutex::const_new(());
34
35/// Builds a `LeaseError::LeaseLost` for a submit manifest (the shared variant
36/// carries an `ExportJobId`, so we encode `submission/manifest` into it).
37fn lease_lost(lease: &ManifestLease) -> LeaseError {
38    LeaseError::LeaseLost {
39        job_id: ExportJobId::from_string(format!("{}/{}", lease.submission_id, lease.manifest_id)),
40    }
41}
42
43/// Derives the ingest FHIR version from a stored `outputFormat` MIME string.
44fn fhir_version_from_output_format(output_format: Option<&str>) -> FhirVersion {
45    output_format
46        .and_then(|fmt| {
47            fmt.split(';').find_map(|part| {
48                let part = part.trim();
49                part.strip_prefix("fhirVersion=")
50                    .and_then(FhirVersion::from_mime_param)
51            })
52        })
53        .unwrap_or_else(FhirVersion::default_enabled)
54}
55
56fn internal_error(message: String) -> StorageError {
57    StorageError::Backend(BackendError::Internal {
58        backend_name: "sqlite".to_string(),
59        message,
60        source: None,
61    })
62}
63
64#[async_trait]
65impl BulkSubmitProvider for SqliteBackend {
66    async fn create_submission(
67        &self,
68        tenant: &TenantContext,
69        id: &SubmissionId,
70        metadata: Option<Value>,
71    ) -> StorageResult<SubmissionSummary> {
72        let conn = self.get_connection()?;
73        let tenant_id = tenant.tenant_id().as_str();
74
75        // Check for duplicate
76        let exists: bool = conn
77            .query_row(
78                "SELECT 1 FROM bulk_submissions
79                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
80                params![tenant_id, &id.submitter, &id.submission_id],
81                |_| Ok(true),
82            )
83            .unwrap_or(false);
84
85        if exists {
86            return Err(StorageError::BulkSubmit(
87                BulkSubmitError::DuplicateSubmission {
88                    submitter: id.submitter.clone(),
89                    submission_id: id.submission_id.clone(),
90                },
91            ));
92        }
93
94        let now = Utc::now();
95        let now_str = now.to_rfc3339();
96        let metadata_bytes = metadata.as_ref().and_then(|m| serde_json::to_vec(m).ok());
97
98        conn.execute(
99            "INSERT INTO bulk_submissions
100             (tenant_id, submitter, submission_id, status, created_at, updated_at, metadata)
101             VALUES (?1, ?2, ?3, 'in-progress', ?4, ?5, ?6)",
102            params![
103                tenant_id,
104                &id.submitter,
105                &id.submission_id,
106                now_str,
107                now_str,
108                metadata_bytes
109            ],
110        )
111        .map_err(|e| internal_error(format!("Failed to create submission: {}", e)))?;
112
113        Ok(SubmissionSummary {
114            id: id.clone(),
115            status: SubmissionStatus::InProgress,
116            created_at: now,
117            updated_at: now,
118            completed_at: None,
119            manifest_count: 0,
120            total_entries: 0,
121            success_count: 0,
122            error_count: 0,
123            skipped_count: 0,
124            metadata,
125        })
126    }
127
128    async fn get_submission(
129        &self,
130        tenant: &TenantContext,
131        id: &SubmissionId,
132    ) -> StorageResult<Option<SubmissionSummary>> {
133        let conn = self.get_connection()?;
134        let tenant_id = tenant.tenant_id().as_str();
135
136        let result = conn.query_row(
137            "SELECT status, created_at, updated_at, completed_at, metadata
138             FROM bulk_submissions
139             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
140            params![tenant_id, &id.submitter, &id.submission_id],
141            |row| {
142                Ok((
143                    row.get::<_, String>(0)?,
144                    row.get::<_, String>(1)?,
145                    row.get::<_, String>(2)?,
146                    row.get::<_, Option<String>>(3)?,
147                    row.get::<_, Option<Vec<u8>>>(4)?,
148                ))
149            },
150        );
151
152        match result {
153            Ok((status_str, created_at, updated_at, completed_at, metadata_bytes)) => {
154                let status: SubmissionStatus = status_str
155                    .parse()
156                    .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
157
158                let created_at = chrono::DateTime::parse_from_rfc3339(&created_at)
159                    .map_err(|e| internal_error(format!("Invalid created_at: {}", e)))?
160                    .with_timezone(&Utc);
161
162                let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at)
163                    .map_err(|e| internal_error(format!("Invalid updated_at: {}", e)))?
164                    .with_timezone(&Utc);
165
166                let completed_at = completed_at.and_then(|s| {
167                    chrono::DateTime::parse_from_rfc3339(&s)
168                        .ok()
169                        .map(|dt| dt.with_timezone(&Utc))
170                });
171
172                let metadata = metadata_bytes.and_then(|b| serde_json::from_slice(&b).ok());
173
174                // Get manifest count
175                let manifest_count: i32 = conn
176                    .query_row(
177                        "SELECT COUNT(*) FROM bulk_manifests
178                         WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
179                        params![tenant_id, &id.submitter, &id.submission_id],
180                        |row| row.get(0),
181                    )
182                    .unwrap_or(0);
183
184                // Get aggregated counts from entry results
185                let (total, success, errors, skipped): (i64, i64, i64, i64) = conn
186                    .query_row(
187                        "SELECT
188                            COUNT(*),
189                            SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
190                            SUM(CASE WHEN outcome IN ('validation-error', 'processing-error') THEN 1 ELSE 0 END),
191                            SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
192                         FROM bulk_entry_results
193                         WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
194                        params![tenant_id, &id.submitter, &id.submission_id],
195                        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
196                    )
197                    .unwrap_or((0, 0, 0, 0));
198
199                Ok(Some(SubmissionSummary {
200                    id: id.clone(),
201                    status,
202                    created_at,
203                    updated_at,
204                    completed_at,
205                    manifest_count: manifest_count as u32,
206                    total_entries: total as u64,
207                    success_count: success as u64,
208                    error_count: errors as u64,
209                    skipped_count: skipped as u64,
210                    metadata,
211                }))
212            }
213            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
214            Err(e) => Err(internal_error(format!("Failed to get submission: {}", e))),
215        }
216    }
217
218    async fn list_submissions(
219        &self,
220        tenant: &TenantContext,
221        submitter: Option<&str>,
222        status: Option<SubmissionStatus>,
223        limit: u32,
224        offset: u32,
225    ) -> StorageResult<Vec<SubmissionSummary>> {
226        // Collect IDs first, then drop the connection before calling async methods
227        let ids: Vec<(String, String)> = {
228            let conn = self.get_connection()?;
229            let tenant_id = tenant.tenant_id().as_str();
230
231            let (query, params): (String, Vec<String>) = {
232                let mut query =
233                    "SELECT submitter, submission_id FROM bulk_submissions WHERE tenant_id = ?1"
234                        .to_string();
235                let mut params = vec![tenant_id.to_string()];
236
237                if let Some(submitter) = submitter {
238                    query.push_str(" AND submitter = ?2");
239                    params.push(submitter.to_string());
240                }
241
242                if let Some(status) = status {
243                    let param_num = params.len() + 1;
244                    query.push_str(&format!(" AND status = ?{}", param_num));
245                    params.push(status.to_string());
246                }
247
248                query.push_str(" ORDER BY created_at DESC");
249                query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
250
251                (query, params)
252            };
253
254            let mut stmt = conn
255                .prepare(&query)
256                .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
257
258            let params_refs: Vec<&dyn rusqlite::ToSql> =
259                params.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
260
261            stmt.query_map(params_refs.as_slice(), |row| {
262                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
263            })
264            .map_err(|e| internal_error(format!("Failed to query submissions: {}", e)))?
265            .filter_map(|r| r.ok())
266            .collect()
267        };
268
269        let mut results = Vec::new();
270        for (submitter, submission_id) in ids {
271            let sub_id = SubmissionId::new(submitter, submission_id);
272            if let Some(summary) = self.get_submission(tenant, &sub_id).await? {
273                results.push(summary);
274            }
275        }
276
277        Ok(results)
278    }
279
280    async fn complete_submission(
281        &self,
282        tenant: &TenantContext,
283        id: &SubmissionId,
284    ) -> StorageResult<SubmissionSummary> {
285        let conn = self.get_connection()?;
286        let tenant_id = tenant.tenant_id().as_str();
287
288        // Check current status
289        let current_status: String = conn
290            .query_row(
291                "SELECT status FROM bulk_submissions
292                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
293                params![tenant_id, &id.submitter, &id.submission_id],
294                |row| row.get(0),
295            )
296            .map_err(|e| {
297                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
298                    StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
299                        submitter: id.submitter.clone(),
300                        submission_id: id.submission_id.clone(),
301                    })
302                } else {
303                    internal_error(format!("Failed to get submission status: {}", e))
304                }
305            })?;
306
307        if current_status != "in-progress" {
308            return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
309                submission_id: id.submission_id.clone(),
310            }));
311        }
312
313        let now = Utc::now().to_rfc3339();
314        conn.execute(
315            "UPDATE bulk_submissions SET status = 'complete', completed_at = ?1, updated_at = ?2
316             WHERE tenant_id = ?3 AND submitter = ?4 AND submission_id = ?5",
317            params![now, now, tenant_id, &id.submitter, &id.submission_id],
318        )
319        .map_err(|e| internal_error(format!("Failed to complete submission: {}", e)))?;
320
321        self.get_submission(tenant, id)
322            .await?
323            .ok_or_else(|| internal_error("Submission disappeared".to_string()))
324    }
325
326    async fn abort_submission(
327        &self,
328        tenant: &TenantContext,
329        id: &SubmissionId,
330        _reason: &str,
331    ) -> StorageResult<u64> {
332        let conn = self.get_connection()?;
333        let tenant_id = tenant.tenant_id().as_str();
334
335        // Check current status
336        let current_status: String = conn
337            .query_row(
338                "SELECT status FROM bulk_submissions
339                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
340                params![tenant_id, &id.submitter, &id.submission_id],
341                |row| row.get(0),
342            )
343            .map_err(|e| {
344                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
345                    StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
346                        submitter: id.submitter.clone(),
347                        submission_id: id.submission_id.clone(),
348                    })
349                } else {
350                    internal_error(format!("Failed to get submission status: {}", e))
351                }
352            })?;
353
354        if current_status != "in-progress" {
355            return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
356                submission_id: id.submission_id.clone(),
357            }));
358        }
359
360        // Count pending manifests
361        let pending_count: i64 = conn
362            .query_row(
363                "SELECT COUNT(*) FROM bulk_manifests
364                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
365                 AND status IN ('pending', 'processing')",
366                params![tenant_id, &id.submitter, &id.submission_id],
367                |row| row.get(0),
368            )
369            .unwrap_or(0);
370
371        let now = Utc::now().to_rfc3339();
372
373        // Update submission status
374        conn.execute(
375            "UPDATE bulk_submissions SET status = 'aborted', completed_at = ?1, updated_at = ?2
376             WHERE tenant_id = ?3 AND submitter = ?4 AND submission_id = ?5",
377            params![now, now, tenant_id, &id.submitter, &id.submission_id],
378        )
379        .map_err(|e| internal_error(format!("Failed to abort submission: {}", e)))?;
380
381        // Update pending manifests to failed
382        conn.execute(
383            "UPDATE bulk_manifests SET status = 'failed'
384             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
385             AND status IN ('pending', 'processing')",
386            params![tenant_id, &id.submitter, &id.submission_id],
387        )
388        .map_err(|e| internal_error(format!("Failed to update manifests: {}", e)))?;
389
390        Ok(pending_count as u64)
391    }
392
393    async fn add_manifest(
394        &self,
395        tenant: &TenantContext,
396        submission_id: &SubmissionId,
397        manifest_url: Option<&str>,
398        replaces_manifest_url: Option<&str>,
399    ) -> StorageResult<SubmissionManifest> {
400        let conn = self.get_connection()?;
401        let tenant_id = tenant.tenant_id().as_str();
402
403        // Check submission exists and is in progress
404        let status: String = conn
405            .query_row(
406                "SELECT status FROM bulk_submissions
407                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
408                params![
409                    tenant_id,
410                    &submission_id.submitter,
411                    &submission_id.submission_id
412                ],
413                |row| row.get(0),
414            )
415            .map_err(|e| {
416                if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
417                    StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
418                        submitter: submission_id.submitter.clone(),
419                        submission_id: submission_id.submission_id.clone(),
420                    })
421                } else {
422                    internal_error(format!("Failed to get submission: {}", e))
423                }
424            })?;
425
426        if status != "in-progress" {
427            return Err(StorageError::BulkSubmit(BulkSubmitError::InvalidState {
428                submission_id: submission_id.submission_id.clone(),
429                expected: "in-progress".to_string(),
430                actual: status,
431            }));
432        }
433
434        let manifest_id = Uuid::new_v4().to_string();
435        let now = Utc::now();
436        let now_str = now.to_rfc3339();
437
438        conn.execute(
439            "INSERT INTO bulk_manifests
440             (tenant_id, submitter, submission_id, manifest_id, manifest_url, replaces_manifest_url, status, added_at)
441             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'pending', ?7)",
442            params![
443                tenant_id,
444                &submission_id.submitter,
445                &submission_id.submission_id,
446                manifest_id,
447                manifest_url,
448                replaces_manifest_url,
449                now_str
450            ],
451        )
452        .map_err(|e| internal_error(format!("Failed to add manifest: {}", e)))?;
453
454        // Update submission updated_at
455        conn.execute(
456            "UPDATE bulk_submissions SET updated_at = ?1
457             WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
458            params![
459                now_str,
460                tenant_id,
461                &submission_id.submitter,
462                &submission_id.submission_id
463            ],
464        )
465        .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
466
467        Ok(SubmissionManifest {
468            manifest_id,
469            manifest_url: manifest_url.map(String::from),
470            replaces_manifest_url: replaces_manifest_url.map(String::from),
471            status: ManifestStatus::Pending,
472            added_at: now,
473            total_entries: 0,
474            processed_entries: 0,
475            failed_entries: 0,
476        })
477    }
478
479    async fn get_manifest(
480        &self,
481        tenant: &TenantContext,
482        submission_id: &SubmissionId,
483        manifest_id: &str,
484    ) -> StorageResult<Option<SubmissionManifest>> {
485        let conn = self.get_connection()?;
486        let tenant_id = tenant.tenant_id().as_str();
487
488        let result = conn.query_row(
489            "SELECT manifest_url, replaces_manifest_url, status, added_at, total_entries, processed_entries, failed_entries
490             FROM bulk_manifests
491             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
492            params![tenant_id, &submission_id.submitter, &submission_id.submission_id, manifest_id],
493            |row| {
494                Ok((
495                    row.get::<_, Option<String>>(0)?,
496                    row.get::<_, Option<String>>(1)?,
497                    row.get::<_, String>(2)?,
498                    row.get::<_, String>(3)?,
499                    row.get::<_, i64>(4)?,
500                    row.get::<_, i64>(5)?,
501                    row.get::<_, i64>(6)?,
502                ))
503            },
504        );
505
506        match result {
507            Ok((
508                manifest_url,
509                replaces_manifest_url,
510                status_str,
511                added_at,
512                total,
513                processed,
514                failed,
515            )) => {
516                let status: ManifestStatus = status_str.parse().map_err(|_| {
517                    internal_error(format!("Invalid manifest status: {}", status_str))
518                })?;
519
520                let added_at = chrono::DateTime::parse_from_rfc3339(&added_at)
521                    .map_err(|e| internal_error(format!("Invalid added_at: {}", e)))?
522                    .with_timezone(&Utc);
523
524                Ok(Some(SubmissionManifest {
525                    manifest_id: manifest_id.to_string(),
526                    manifest_url,
527                    replaces_manifest_url,
528                    status,
529                    added_at,
530                    total_entries: total as u64,
531                    processed_entries: processed as u64,
532                    failed_entries: failed as u64,
533                }))
534            }
535            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
536            Err(e) => Err(internal_error(format!("Failed to get manifest: {}", e))),
537        }
538    }
539
540    async fn list_manifests(
541        &self,
542        tenant: &TenantContext,
543        submission_id: &SubmissionId,
544    ) -> StorageResult<Vec<SubmissionManifest>> {
545        // Collect IDs first, then drop the connection before calling async methods
546        let manifest_ids: Vec<String> = {
547            let conn = self.get_connection()?;
548            let tenant_id = tenant.tenant_id().as_str();
549
550            let mut stmt = conn
551                .prepare(
552                    "SELECT manifest_id FROM bulk_manifests
553                     WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
554                     ORDER BY added_at",
555                )
556                .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
557
558            stmt.query_map(
559                params![
560                    tenant_id,
561                    &submission_id.submitter,
562                    &submission_id.submission_id
563                ],
564                |row| row.get(0),
565            )
566            .map_err(|e| internal_error(format!("Failed to query manifests: {}", e)))?
567            .filter_map(|r| r.ok())
568            .collect()
569        };
570
571        let mut results = Vec::new();
572        for manifest_id in manifest_ids {
573            if let Some(manifest) = self
574                .get_manifest(tenant, submission_id, &manifest_id)
575                .await?
576            {
577                results.push(manifest);
578            }
579        }
580
581        Ok(results)
582    }
583
584    async fn process_entries(
585        &self,
586        tenant: &TenantContext,
587        submission_id: &SubmissionId,
588        manifest_id: &str,
589        entries: Vec<NdjsonEntry>,
590        options: &BulkProcessingOptions,
591    ) -> StorageResult<Vec<BulkEntryResult>> {
592        let conn = self.get_connection()?;
593        let tenant_id = tenant.tenant_id().as_str();
594
595        // Verify manifest exists
596        if self
597            .get_manifest(tenant, submission_id, manifest_id)
598            .await?
599            .is_none()
600        {
601            return Err(StorageError::BulkSubmit(
602                BulkSubmitError::ManifestNotFound {
603                    submission_id: submission_id.submission_id.clone(),
604                    manifest_id: manifest_id.to_string(),
605                },
606            ));
607        }
608
609        // Update manifest status to processing
610        conn.execute(
611            "UPDATE bulk_manifests SET status = 'processing'
612             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
613            params![
614                tenant_id,
615                &submission_id.submitter,
616                &submission_id.submission_id,
617                manifest_id
618            ],
619        )
620        .map_err(|e| internal_error(format!("Failed to update manifest status: {}", e)))?;
621
622        let mut results = Vec::new();
623        let mut error_count = 0u32;
624
625        for entry in entries {
626            // Check if we've hit max errors
627            if options.max_errors > 0 && error_count >= options.max_errors {
628                if !options.continue_on_error {
629                    return Err(StorageError::BulkSubmit(
630                        BulkSubmitError::MaxErrorsExceeded {
631                            submission_id: submission_id.submission_id.clone(),
632                            max_errors: options.max_errors,
633                        },
634                    ));
635                }
636                // Skip remaining entries
637                let skip_result = BulkEntryResult::skipped(
638                    entry.line_number,
639                    &entry.resource_type,
640                    "max errors exceeded",
641                );
642                results.push(skip_result);
643                continue;
644            }
645
646            // Process the entry
647            let result = self
648                .process_single_entry(tenant, submission_id, manifest_id, &entry, options)
649                .await;
650
651            let entry_result = match result {
652                Ok(r) => r,
653                Err(e) => {
654                    error_count += 1;
655                    BulkEntryResult::processing_error(
656                        entry.line_number,
657                        &entry.resource_type,
658                        serde_json::json!({
659                            "resourceType": "OperationOutcome",
660                            "issue": [{
661                                "severity": "error",
662                                "code": "exception",
663                                "diagnostics": e.to_string()
664                            }]
665                        }),
666                    )
667                }
668            };
669
670            if entry_result.is_error() {
671                error_count += 1;
672            }
673
674            // Store the result
675            self.store_entry_result(tenant, submission_id, manifest_id, &entry_result)
676                .await?;
677
678            results.push(entry_result);
679        }
680
681        // Update manifest counts
682        let now = Utc::now().to_rfc3339();
683        conn.execute(
684            "UPDATE bulk_manifests SET
685                total_entries = total_entries + ?1,
686                processed_entries = processed_entries + ?2,
687                failed_entries = failed_entries + ?3
688             WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6 AND manifest_id = ?7",
689            params![
690                results.len() as i64,
691                results.iter().filter(|r| r.is_success()).count() as i64,
692                error_count as i64,
693                tenant_id,
694                &submission_id.submitter,
695                &submission_id.submission_id,
696                manifest_id
697            ],
698        )
699        .map_err(|e| internal_error(format!("Failed to update manifest counts: {}", e)))?;
700
701        // Update submission updated_at
702        conn.execute(
703            "UPDATE bulk_submissions SET updated_at = ?1
704             WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
705            params![
706                now,
707                tenant_id,
708                &submission_id.submitter,
709                &submission_id.submission_id
710            ],
711        )
712        .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
713
714        Ok(results)
715    }
716
717    async fn get_entry_results(
718        &self,
719        tenant: &TenantContext,
720        submission_id: &SubmissionId,
721        manifest_id: &str,
722        outcome_filter: Option<BulkEntryOutcome>,
723        limit: u32,
724        offset: u32,
725    ) -> StorageResult<Vec<BulkEntryResult>> {
726        let conn = self.get_connection()?;
727        let tenant_id = tenant.tenant_id().as_str();
728
729        let mut query =
730            "SELECT line_number, resource_type, resource_id, created, outcome, operation_outcome
731             FROM bulk_entry_results
732             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4"
733                .to_string();
734
735        let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
736            Box::new(tenant_id.to_string()),
737            Box::new(submission_id.submitter.clone()),
738            Box::new(submission_id.submission_id.clone()),
739            Box::new(manifest_id.to_string()),
740        ];
741
742        if let Some(outcome) = outcome_filter {
743            query.push_str(" AND outcome = ?");
744            params_vec.push(Box::new(outcome.to_string()));
745        }
746
747        query.push_str(" ORDER BY line_number");
748        query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
749
750        let params_slice: Vec<&dyn rusqlite::ToSql> =
751            params_vec.iter().map(|p| p.as_ref()).collect();
752
753        let mut stmt = conn
754            .prepare(&query)
755            .map_err(|e| internal_error(format!("Failed to prepare results query: {}", e)))?;
756
757        let results: Vec<BulkEntryResult> = stmt
758            .query_map(params_slice.as_slice(), |row| {
759                let line_number: i64 = row.get(0)?;
760                let resource_type: String = row.get(1)?;
761                let resource_id: Option<String> = row.get(2)?;
762                let created: Option<i32> = row.get(3)?;
763                let outcome_str: String = row.get(4)?;
764                let operation_outcome_bytes: Option<Vec<u8>> = row.get(5)?;
765
766                let outcome: BulkEntryOutcome = outcome_str
767                    .parse()
768                    .unwrap_or(BulkEntryOutcome::ProcessingError);
769
770                let operation_outcome =
771                    operation_outcome_bytes.and_then(|b| serde_json::from_slice(&b).ok());
772
773                Ok(BulkEntryResult {
774                    line_number: line_number as u64,
775                    resource_type,
776                    resource_id,
777                    created: created.map(|c| c != 0).unwrap_or(false),
778                    outcome,
779                    operation_outcome,
780                })
781            })
782            .map_err(|e| internal_error(format!("Failed to query results: {}", e)))?
783            .filter_map(|r| r.ok())
784            .collect();
785
786        Ok(results)
787    }
788
789    async fn get_entry_counts(
790        &self,
791        tenant: &TenantContext,
792        submission_id: &SubmissionId,
793        manifest_id: &str,
794    ) -> StorageResult<EntryCountSummary> {
795        let conn = self.get_connection()?;
796        let tenant_id = tenant.tenant_id().as_str();
797
798        let (total, success, validation_error, processing_error, skipped): (i64, i64, i64, i64, i64) = conn
799            .query_row(
800                "SELECT
801                    COUNT(*),
802                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
803                    SUM(CASE WHEN outcome = 'validation-error' THEN 1 ELSE 0 END),
804                    SUM(CASE WHEN outcome = 'processing-error' THEN 1 ELSE 0 END),
805                    SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
806                 FROM bulk_entry_results
807                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
808                params![tenant_id, &submission_id.submitter, &submission_id.submission_id, manifest_id],
809                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)),
810            )
811            .unwrap_or((0, 0, 0, 0, 0));
812
813        Ok(EntryCountSummary {
814            total: total as u64,
815            success: success as u64,
816            validation_error: validation_error as u64,
817            processing_error: processing_error as u64,
818            skipped: skipped as u64,
819        })
820    }
821}
822
823impl SqliteBackend {
824    /// Process a single NDJSON entry.
825    async fn process_single_entry(
826        &self,
827        tenant: &TenantContext,
828        submission_id: &SubmissionId,
829        manifest_id: &str,
830        entry: &NdjsonEntry,
831        options: &BulkProcessingOptions,
832    ) -> StorageResult<BulkEntryResult> {
833        // Check if resource has an ID
834        let resource_id = entry.resource_id.as_ref();
835
836        if let Some(id) = resource_id {
837            // Check if resource exists
838            let existing = self.read(tenant, &entry.resource_type, id).await;
839
840            match existing {
841                Ok(Some(current)) => {
842                    // Resource exists - update if allowed
843                    if !options.allow_updates {
844                        return Ok(BulkEntryResult::skipped(
845                            entry.line_number,
846                            &entry.resource_type,
847                            "updates not allowed",
848                        ));
849                    }
850
851                    // Record change for rollback
852                    let change = SubmissionChange::update(
853                        manifest_id,
854                        &entry.resource_type,
855                        id,
856                        current.version_id(),
857                        (current.version_id().parse::<i32>().unwrap_or(0) + 1).to_string(),
858                        current.content().clone(),
859                    );
860                    self.record_change(tenant, submission_id, &change).await?;
861
862                    // Update the resource
863                    let updated = self
864                        .update(tenant, &current, entry.resource.clone())
865                        .await?;
866
867                    Ok(BulkEntryResult::success(
868                        entry.line_number,
869                        &entry.resource_type,
870                        updated.id(),
871                        false,
872                    ))
873                }
874                Ok(None)
875                | Err(StorageError::Resource(crate::error::ResourceError::Gone { .. })) => {
876                    // Resource doesn't exist - create it
877                    // Use default FHIR version for bulk operations
878                    let created = self
879                        .create(
880                            tenant,
881                            &entry.resource_type,
882                            entry.resource.clone(),
883                            FhirVersion::default_enabled(),
884                        )
885                        .await?;
886
887                    // Record change for rollback
888                    let change = SubmissionChange::create(
889                        manifest_id,
890                        &entry.resource_type,
891                        created.id(),
892                        created.version_id(),
893                    );
894                    self.record_change(tenant, submission_id, &change).await?;
895
896                    Ok(BulkEntryResult::success(
897                        entry.line_number,
898                        &entry.resource_type,
899                        created.id(),
900                        true,
901                    ))
902                }
903                Err(e) => Err(e),
904            }
905        } else {
906            // No ID - create new resource
907            // Use default FHIR version for bulk operations
908            let created = self
909                .create(
910                    tenant,
911                    &entry.resource_type,
912                    entry.resource.clone(),
913                    FhirVersion::default_enabled(),
914                )
915                .await?;
916
917            // Record change for rollback
918            let change = SubmissionChange::create(
919                manifest_id,
920                &entry.resource_type,
921                created.id(),
922                created.version_id(),
923            );
924            self.record_change(tenant, submission_id, &change).await?;
925
926            Ok(BulkEntryResult::success(
927                entry.line_number,
928                &entry.resource_type,
929                created.id(),
930                true,
931            ))
932        }
933    }
934
935    /// Store an entry result in the database.
936    async fn store_entry_result(
937        &self,
938        tenant: &TenantContext,
939        submission_id: &SubmissionId,
940        manifest_id: &str,
941        result: &BulkEntryResult,
942    ) -> StorageResult<()> {
943        let conn = self.get_connection()?;
944        let tenant_id = tenant.tenant_id().as_str();
945
946        let outcome_bytes = result
947            .operation_outcome
948            .as_ref()
949            .and_then(|o| serde_json::to_vec(o).ok());
950
951        conn.execute(
952            "INSERT INTO bulk_entry_results
953             (tenant_id, submitter, submission_id, manifest_id, line_number, resource_type, resource_id, created, outcome, operation_outcome)
954             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
955            params![
956                tenant_id,
957                &submission_id.submitter,
958                &submission_id.submission_id,
959                manifest_id,
960                result.line_number as i64,
961                &result.resource_type,
962                &result.resource_id,
963                if result.created { Some(1) } else { Some(0) },
964                result.outcome.to_string(),
965                outcome_bytes
966            ],
967        )
968        .map_err(|e| internal_error(format!("Failed to store entry result: {}", e)))?;
969
970        Ok(())
971    }
972}
973
974#[async_trait]
975impl StreamingBulkSubmitProvider for SqliteBackend {
976    async fn process_ndjson_stream(
977        &self,
978        tenant: &TenantContext,
979        submission_id: &SubmissionId,
980        manifest_id: &str,
981        resource_type: &str,
982        mut reader: Box<dyn AsyncBufRead + Send + Unpin>,
983        options: &BulkProcessingOptions,
984    ) -> StorageResult<StreamProcessingResult> {
985        let mut result = StreamProcessingResult::new();
986        let mut line_number = 0u64;
987        let mut batch = Vec::new();
988
989        loop {
990            let mut line = String::new();
991            let bytes_read = reader
992                .read_line(&mut line)
993                .await
994                .map_err(|e| internal_error(format!("Failed to read line: {}", e)))?;
995
996            if bytes_read == 0 {
997                // End of stream
998                break;
999            }
1000
1001            line_number += 1;
1002            result.lines_processed = line_number;
1003
1004            let line = line.trim();
1005            if line.is_empty() {
1006                continue;
1007            }
1008
1009            // Parse the line
1010            match NdjsonEntry::parse(line_number, line) {
1011                Ok(entry) => {
1012                    // Validate resource type matches
1013                    if entry.resource_type != resource_type {
1014                        let error_result = BulkEntryResult::validation_error(
1015                            line_number,
1016                            &entry.resource_type,
1017                            serde_json::json!({
1018                                "resourceType": "OperationOutcome",
1019                                "issue": [{
1020                                    "severity": "error",
1021                                    "code": "invalid",
1022                                    "diagnostics": format!("Expected resource type {}, got {}", resource_type, entry.resource_type)
1023                                }]
1024                            }),
1025                        );
1026                        result.counts.increment(error_result.outcome);
1027
1028                        if !options.continue_on_error
1029                            && (options.max_errors == 0
1030                                || result.counts.error_count() >= options.max_errors as u64)
1031                        {
1032                            return Ok(result.aborted("max errors exceeded"));
1033                        }
1034                        continue;
1035                    }
1036
1037                    batch.push(entry);
1038                }
1039                Err(e) => {
1040                    result.counts.increment(BulkEntryOutcome::ValidationError);
1041
1042                    if !options.continue_on_error
1043                        && (options.max_errors == 0
1044                            || result.counts.error_count() >= options.max_errors as u64)
1045                    {
1046                        return Ok(result.aborted(format!("Parse error: {}", e)));
1047                    }
1048                }
1049            }
1050
1051            // Process batch if it's full
1052            if batch.len() >= options.batch_size as usize {
1053                let batch_results = self
1054                    .process_entries(
1055                        tenant,
1056                        submission_id,
1057                        manifest_id,
1058                        std::mem::take(&mut batch),
1059                        options,
1060                    )
1061                    .await?;
1062
1063                for r in batch_results {
1064                    result.counts.increment(r.outcome);
1065                }
1066
1067                // Check if we need to abort
1068                if !options.continue_on_error
1069                    && options.max_errors > 0
1070                    && result.counts.error_count() >= options.max_errors as u64
1071                {
1072                    return Ok(result.aborted("max errors exceeded"));
1073                }
1074            }
1075        }
1076
1077        // Process remaining entries
1078        if !batch.is_empty() {
1079            let batch_results = self
1080                .process_entries(tenant, submission_id, manifest_id, batch, options)
1081                .await?;
1082
1083            for r in batch_results {
1084                result.counts.increment(r.outcome);
1085            }
1086        }
1087
1088        Ok(result)
1089    }
1090}
1091
1092#[async_trait]
1093impl BulkSubmitRollbackProvider for SqliteBackend {
1094    async fn record_change(
1095        &self,
1096        tenant: &TenantContext,
1097        submission_id: &SubmissionId,
1098        change: &SubmissionChange,
1099    ) -> StorageResult<()> {
1100        let conn = self.get_connection()?;
1101        let tenant_id = tenant.tenant_id().as_str();
1102
1103        let previous_content_bytes = change
1104            .previous_content
1105            .as_ref()
1106            .and_then(|c| serde_json::to_vec(c).ok());
1107
1108        conn.execute(
1109            "INSERT INTO bulk_submission_changes
1110             (tenant_id, submitter, submission_id, change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at)
1111             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1112            params![
1113                tenant_id,
1114                &submission_id.submitter,
1115                &submission_id.submission_id,
1116                &change.change_id,
1117                &change.manifest_id,
1118                change.change_type.to_string(),
1119                &change.resource_type,
1120                &change.resource_id,
1121                &change.previous_version,
1122                &change.new_version,
1123                previous_content_bytes,
1124                change.changed_at.to_rfc3339()
1125            ],
1126        )
1127        .map_err(|e| internal_error(format!("Failed to record change: {}", e)))?;
1128
1129        Ok(())
1130    }
1131
1132    async fn list_changes(
1133        &self,
1134        tenant: &TenantContext,
1135        submission_id: &SubmissionId,
1136        limit: u32,
1137        offset: u32,
1138    ) -> StorageResult<Vec<SubmissionChange>> {
1139        let conn = self.get_connection()?;
1140        let tenant_id = tenant.tenant_id().as_str();
1141
1142        let mut stmt = conn
1143            .prepare(&format!(
1144                "SELECT change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at
1145                 FROM bulk_submission_changes
1146                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1147                 ORDER BY changed_at DESC
1148                 LIMIT {} OFFSET {}",
1149                limit, offset
1150            ))
1151            .map_err(|e| internal_error(format!("Failed to prepare changes query: {}", e)))?;
1152
1153        let changes: Vec<SubmissionChange> = stmt
1154            .query_map(
1155                params![
1156                    tenant_id,
1157                    &submission_id.submitter,
1158                    &submission_id.submission_id
1159                ],
1160                |row| {
1161                    let change_id: String = row.get(0)?;
1162                    let manifest_id: String = row.get(1)?;
1163                    let change_type_str: String = row.get(2)?;
1164                    let resource_type: String = row.get(3)?;
1165                    let resource_id: String = row.get(4)?;
1166                    let previous_version: Option<String> = row.get(5)?;
1167                    let new_version: String = row.get(6)?;
1168                    let previous_content_bytes: Option<Vec<u8>> = row.get(7)?;
1169                    let changed_at_str: String = row.get(8)?;
1170
1171                    let change_type: ChangeType =
1172                        change_type_str.parse().unwrap_or(ChangeType::Create);
1173                    let previous_content =
1174                        previous_content_bytes.and_then(|b| serde_json::from_slice(&b).ok());
1175                    let changed_at = chrono::DateTime::parse_from_rfc3339(&changed_at_str)
1176                        .map(|dt| dt.with_timezone(&Utc))
1177                        .unwrap_or_else(|_| Utc::now());
1178
1179                    Ok(SubmissionChange {
1180                        change_id,
1181                        manifest_id,
1182                        change_type,
1183                        resource_type,
1184                        resource_id,
1185                        previous_version,
1186                        new_version,
1187                        previous_content,
1188                        changed_at,
1189                    })
1190                },
1191            )
1192            .map_err(|e| internal_error(format!("Failed to query changes: {}", e)))?
1193            .filter_map(|r| r.ok())
1194            .collect();
1195
1196        Ok(changes)
1197    }
1198
1199    async fn rollback_change(
1200        &self,
1201        tenant: &TenantContext,
1202        _submission_id: &SubmissionId,
1203        change: &SubmissionChange,
1204    ) -> StorageResult<bool> {
1205        match change.change_type {
1206            ChangeType::Create => {
1207                // Delete the created resource
1208                match self
1209                    .delete(tenant, &change.resource_type, &change.resource_id)
1210                    .await
1211                {
1212                    Ok(()) => Ok(true),
1213                    Err(StorageError::Resource(crate::error::ResourceError::NotFound {
1214                        ..
1215                    })) => {
1216                        // Already deleted
1217                        Ok(true)
1218                    }
1219                    Err(e) => Err(e),
1220                }
1221            }
1222            ChangeType::Update => {
1223                // Restore the previous content
1224                if let Some(ref previous_content) = change.previous_content {
1225                    // Read current to get version for update
1226                    let current = self
1227                        .read(tenant, &change.resource_type, &change.resource_id)
1228                        .await?;
1229                    if let Some(current) = current {
1230                        self.update(tenant, &current, previous_content.clone())
1231                            .await?;
1232                        Ok(true)
1233                    } else {
1234                        // Resource no longer exists
1235                        Ok(false)
1236                    }
1237                } else {
1238                    // No previous content to restore
1239                    Ok(false)
1240                }
1241            }
1242        }
1243    }
1244}
1245
1246#[async_trait]
1247impl SubmitClaimStrategy for SqliteBackend {
1248    async fn claim_next_manifest(
1249        &self,
1250        worker_id: &WorkerId,
1251        lease_duration: StdDuration,
1252    ) -> StorageResult<Option<ManifestLease>> {
1253        let _guard = SUBMIT_CLAIM_LOCK.lock().await;
1254        let conn = self.get_connection()?;
1255        let now = Utc::now();
1256        let now_str = now.to_rfc3339();
1257        let lease_expiry = now
1258            + chrono::Duration::from_std(lease_duration)
1259                .unwrap_or_else(|_| chrono::Duration::seconds(60));
1260        let lease_expiry_str = lease_expiry.to_rfc3339();
1261
1262        // Find one eligible manifest with a fetchable URL: pending, or processing
1263        // with an expired lease. Only manifests of non-terminal submissions count.
1264        let row: Option<(String, String, String, String, i64)> = conn
1265            .query_row(
1266                "SELECT m.tenant_id, m.submitter, m.submission_id, m.manifest_id, m.fencing_token
1267                 FROM bulk_manifests m
1268                 JOIN bulk_submissions s
1269                   ON s.tenant_id = m.tenant_id AND s.submitter = m.submitter
1270                      AND s.submission_id = m.submission_id
1271                 WHERE m.manifest_url IS NOT NULL
1272                   AND s.status = 'in-progress'
1273                   AND (m.status = 'pending'
1274                        OR (m.status = 'processing'
1275                            AND (m.lease_expiry IS NULL OR m.lease_expiry < ?1)))
1276                 ORDER BY m.added_at LIMIT 1",
1277                params![now_str],
1278                |row| {
1279                    Ok((
1280                        row.get::<_, String>(0)?,
1281                        row.get::<_, String>(1)?,
1282                        row.get::<_, String>(2)?,
1283                        row.get::<_, String>(3)?,
1284                        row.get::<_, i64>(4)?,
1285                    ))
1286                },
1287            )
1288            .ok();
1289
1290        let Some((tenant_id, submitter, submission_id, manifest_id, fencing_token)) = row else {
1291            return Ok(None);
1292        };
1293        let new_token = fencing_token + 1;
1294
1295        conn.execute(
1296            "UPDATE bulk_manifests
1297             SET status = 'processing', worker_id = ?1, lease_expiry = ?2, fencing_token = ?3
1298             WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6 AND manifest_id = ?7",
1299            params![
1300                worker_id.as_str(),
1301                lease_expiry_str,
1302                new_token,
1303                tenant_id,
1304                submitter,
1305                submission_id,
1306                manifest_id
1307            ],
1308        )
1309        .map_err(|e| internal_error(format!("Failed to claim manifest: {}", e)))?;
1310
1311        Ok(Some(ManifestLease {
1312            tenant: TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
1313            submission_id: SubmissionId::new(submitter, submission_id),
1314            manifest_id,
1315            worker_id: worker_id.clone(),
1316            lease_expiry,
1317            fencing_token: new_token as u64,
1318        }))
1319    }
1320
1321    async fn heartbeat(&self, lease: &ManifestLease) -> Result<DateTime<Utc>, LeaseError> {
1322        let conn = self.get_connection().map_err(LeaseError::Storage)?;
1323        let now = Utc::now();
1324        let new_expiry = now + chrono::Duration::seconds(60);
1325        let affected = conn
1326            .execute(
1327                "UPDATE bulk_manifests SET lease_expiry = ?1
1328                 WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4
1329                   AND manifest_id = ?5 AND worker_id = ?6 AND fencing_token = ?7",
1330                params![
1331                    new_expiry.to_rfc3339(),
1332                    lease.tenant.tenant_id().as_str(),
1333                    lease.submission_id.submitter,
1334                    lease.submission_id.submission_id,
1335                    lease.manifest_id,
1336                    lease.worker_id.as_str(),
1337                    lease.fencing_token as i64
1338                ],
1339            )
1340            .map_err(|e| LeaseError::Storage(internal_error(format!("heartbeat failed: {e}"))))?;
1341        if affected == 0 {
1342            Err(lease_lost(lease))
1343        } else {
1344            Ok(new_expiry)
1345        }
1346    }
1347
1348    async fn release(&self, lease: ManifestLease) -> StorageResult<()> {
1349        let conn = self.get_connection()?;
1350        conn.execute(
1351            "UPDATE bulk_manifests
1352             SET status = 'pending', worker_id = NULL, lease_expiry = NULL
1353             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4
1354               AND worker_id = ?5 AND fencing_token = ?6 AND status = 'processing'",
1355            params![
1356                lease.tenant.tenant_id().as_str(),
1357                lease.submission_id.submitter,
1358                lease.submission_id.submission_id,
1359                lease.manifest_id,
1360                lease.worker_id.as_str(),
1361                lease.fencing_token as i64
1362            ],
1363        )
1364        .map_err(|e| internal_error(format!("Failed to release manifest lease: {}", e)))?;
1365        Ok(())
1366    }
1367}
1368
1369#[async_trait]
1370impl SubmitWorkerStorage for SqliteBackend {
1371    async fn get_manifest_for_worker(
1372        &self,
1373        lease: &ManifestLease,
1374    ) -> Result<ManifestWorkerView, LeaseError> {
1375        let conn = self.get_connection().map_err(LeaseError::Storage)?;
1376        type Row = (
1377            Option<String>,
1378            Option<String>,
1379            Option<String>,
1380            Option<String>,
1381            Option<String>,
1382            Option<String>,
1383            i64,
1384        );
1385        let row: Row = conn
1386            .query_row(
1387                "SELECT manifest_url, fhir_base_url, output_format, file_request_headers,
1388                        oauth_metadata_urls, file_encryption_key, last_processed_line
1389                 FROM bulk_manifests
1390                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1391                   AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1392                params![
1393                    lease.tenant.tenant_id().as_str(),
1394                    lease.submission_id.submitter,
1395                    lease.submission_id.submission_id,
1396                    lease.manifest_id,
1397                    lease.worker_id.as_str(),
1398                    lease.fencing_token as i64
1399                ],
1400                |r| {
1401                    Ok((
1402                        r.get(0)?,
1403                        r.get(1)?,
1404                        r.get(2)?,
1405                        r.get(3)?,
1406                        r.get(4)?,
1407                        r.get(5)?,
1408                        r.get(6)?,
1409                    ))
1410                },
1411            )
1412            .map_err(|_| lease_lost(lease))?;
1413
1414        let (
1415            manifest_url,
1416            fhir_base_url,
1417            output_format,
1418            headers_json,
1419            oauth_json,
1420            encryption_json,
1421            last_processed_line,
1422        ) = row;
1423
1424        let file_request_headers: Vec<(String, String)> = headers_json
1425            .as_deref()
1426            .and_then(|s| serde_json::from_str(s).ok())
1427            .unwrap_or_default();
1428        let oauth_metadata_urls: Vec<String> = oauth_json
1429            .as_deref()
1430            .and_then(|s| serde_json::from_str(s).ok())
1431            .unwrap_or_default();
1432        let file_encryption_key: Option<Value> = encryption_json
1433            .as_deref()
1434            .and_then(|s| serde_json::from_str(s).ok());
1435        let fhir_version = fhir_version_from_output_format(output_format.as_deref());
1436
1437        Ok(ManifestWorkerView {
1438            manifest_id: lease.manifest_id.clone(),
1439            manifest_url,
1440            fhir_base_url,
1441            output_format,
1442            file_request_headers,
1443            oauth_metadata_urls,
1444            file_encryption_key,
1445            last_processed_line: last_processed_line.max(0) as u64,
1446            fhir_version,
1447        })
1448    }
1449
1450    async fn mark_manifest_processing(&self, lease: &ManifestLease) -> Result<(), LeaseError> {
1451        let conn = self.get_connection().map_err(LeaseError::Storage)?;
1452        let affected = conn
1453            .execute(
1454                "UPDATE bulk_manifests SET status = 'processing'
1455                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1456                   AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1457                params![
1458                    lease.tenant.tenant_id().as_str(),
1459                    lease.submission_id.submitter,
1460                    lease.submission_id.submission_id,
1461                    lease.manifest_id,
1462                    lease.worker_id.as_str(),
1463                    lease.fencing_token as i64
1464                ],
1465            )
1466            .map_err(|e| LeaseError::Storage(internal_error(format!("mark processing: {e}"))))?;
1467        if affected == 0 {
1468            Err(lease_lost(lease))
1469        } else {
1470            Ok(())
1471        }
1472    }
1473
1474    async fn update_manifest_progress(
1475        &self,
1476        lease: &ManifestLease,
1477        processed_entries: u64,
1478        failed_entries: u64,
1479        last_processed_line: u64,
1480    ) -> Result<(), LeaseError> {
1481        let conn = self.get_connection().map_err(LeaseError::Storage)?;
1482        let affected = conn
1483            .execute(
1484                "UPDATE bulk_manifests
1485                 SET processed_entries = ?1, failed_entries = ?2, last_processed_line = ?3
1486                 WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6
1487                   AND manifest_id = ?7 AND worker_id = ?8 AND fencing_token = ?9",
1488                params![
1489                    processed_entries as i64,
1490                    failed_entries as i64,
1491                    last_processed_line as i64,
1492                    lease.tenant.tenant_id().as_str(),
1493                    lease.submission_id.submitter,
1494                    lease.submission_id.submission_id,
1495                    lease.manifest_id,
1496                    lease.worker_id.as_str(),
1497                    lease.fencing_token as i64
1498                ],
1499            )
1500            .map_err(|e| LeaseError::Storage(internal_error(format!("update progress: {e}"))))?;
1501        if affected == 0 {
1502            Err(lease_lost(lease))
1503        } else {
1504            Ok(())
1505        }
1506    }
1507
1508    async fn record_submit_file(
1509        &self,
1510        lease: &ManifestLease,
1511        file: &SubmitFileRecord,
1512    ) -> Result<(), LeaseError> {
1513        let conn = self.get_connection().map_err(LeaseError::Storage)?;
1514        // Fence: only record if we still hold the lease.
1515        let holds: bool = conn
1516            .query_row(
1517                "SELECT 1 FROM bulk_manifests
1518                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1519                   AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1520                params![
1521                    lease.tenant.tenant_id().as_str(),
1522                    lease.submission_id.submitter,
1523                    lease.submission_id.submission_id,
1524                    lease.manifest_id,
1525                    lease.worker_id.as_str(),
1526                    lease.fencing_token as i64
1527                ],
1528                |_| Ok(true),
1529            )
1530            .unwrap_or(false);
1531        if !holds {
1532            return Err(lease_lost(lease));
1533        }
1534
1535        let count_severity = file
1536            .count_severity
1537            .as_ref()
1538            .and_then(|v| serde_json::to_string(v).ok());
1539        conn.execute(
1540            "INSERT INTO bulk_submit_files
1541             (tenant_id, submitter, submission_id, manifest_url, file_type, resource_type,
1542              part_index, fencing_token, file_path, line_count, byte_count, count_severity,
1543              created_at)
1544             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1545            params![
1546                lease.tenant.tenant_id().as_str(),
1547                lease.submission_id.submitter,
1548                lease.submission_id.submission_id,
1549                file.manifest_url,
1550                file.file_type,
1551                file.resource_type,
1552                file.part_index as i64,
1553                lease.fencing_token as i64,
1554                file.file_path,
1555                file.line_count as i64,
1556                file.byte_count as i64,
1557                count_severity,
1558                Utc::now().to_rfc3339()
1559            ],
1560        )
1561        .map_err(|e| LeaseError::Storage(internal_error(format!("record submit file: {e}"))))?;
1562        Ok(())
1563    }
1564
1565    async fn finish_manifest(&self, lease: &ManifestLease) -> Result<(), LeaseError> {
1566        let conn = self.get_connection().map_err(LeaseError::Storage)?;
1567        let affected = conn
1568            .execute(
1569                "UPDATE bulk_manifests SET status = 'completed', worker_id = NULL, lease_expiry = NULL
1570                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1571                   AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1572                params![
1573                    lease.tenant.tenant_id().as_str(),
1574                    lease.submission_id.submitter,
1575                    lease.submission_id.submission_id,
1576                    lease.manifest_id,
1577                    lease.worker_id.as_str(),
1578                    lease.fencing_token as i64
1579                ],
1580            )
1581            .map_err(|e| LeaseError::Storage(internal_error(format!("finish manifest: {e}"))))?;
1582        if affected == 0 {
1583            Err(lease_lost(lease))
1584        } else {
1585            Ok(())
1586        }
1587    }
1588
1589    async fn fail_manifest(
1590        &self,
1591        lease: &ManifestLease,
1592        _error_message: &str,
1593    ) -> Result<(), LeaseError> {
1594        let conn = self.get_connection().map_err(LeaseError::Storage)?;
1595        let affected = conn
1596            .execute(
1597                "UPDATE bulk_manifests SET status = 'failed', worker_id = NULL, lease_expiry = NULL
1598                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1599                   AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1600                params![
1601                    lease.tenant.tenant_id().as_str(),
1602                    lease.submission_id.submitter,
1603                    lease.submission_id.submission_id,
1604                    lease.manifest_id,
1605                    lease.worker_id.as_str(),
1606                    lease.fencing_token as i64
1607                ],
1608            )
1609            .map_err(|e| LeaseError::Storage(internal_error(format!("fail manifest: {e}"))))?;
1610        if affected == 0 {
1611            Err(lease_lost(lease))
1612        } else {
1613            Ok(())
1614        }
1615    }
1616
1617    async fn set_manifest_fetch_params(
1618        &self,
1619        tenant: &TenantContext,
1620        id: &SubmissionId,
1621        manifest_id: &str,
1622        fhir_base_url: Option<&str>,
1623        output_format: Option<&str>,
1624        file_request_headers: &[(String, String)],
1625        oauth_metadata_urls: &[String],
1626        file_encryption_key: Option<&Value>,
1627    ) -> StorageResult<()> {
1628        let conn = self.get_connection()?;
1629        let headers_json = serde_json::to_string(file_request_headers).ok();
1630        let oauth_json = serde_json::to_string(oauth_metadata_urls).ok();
1631        let encryption_json = file_encryption_key.and_then(|v| serde_json::to_string(v).ok());
1632        conn.execute(
1633            "UPDATE bulk_manifests
1634             SET fhir_base_url = ?1, output_format = ?2, file_request_headers = ?3,
1635                 oauth_metadata_urls = ?4, file_encryption_key = ?5
1636             WHERE tenant_id = ?6 AND submitter = ?7 AND submission_id = ?8 AND manifest_id = ?9",
1637            params![
1638                fhir_base_url,
1639                output_format,
1640                headers_json,
1641                oauth_json,
1642                encryption_json,
1643                tenant.tenant_id().as_str(),
1644                id.submitter,
1645                id.submission_id,
1646                manifest_id
1647            ],
1648        )
1649        .map_err(|e| internal_error(format!("set manifest fetch params: {e}")))?;
1650        Ok(())
1651    }
1652
1653    async fn replace_manifest_by_url(
1654        &self,
1655        tenant: &TenantContext,
1656        id: &SubmissionId,
1657        manifest_url: &str,
1658    ) -> StorageResult<Vec<String>> {
1659        let conn = self.get_connection()?;
1660        let tenant_id = tenant.tenant_id().as_str();
1661        let mut stmt = conn
1662            .prepare(
1663                "SELECT manifest_id FROM bulk_manifests
1664                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1665                   AND manifest_url = ?4 AND status != 'replaced'",
1666            )
1667            .map_err(|e| internal_error(format!("prepare replace lookup: {e}")))?;
1668        let ids: Vec<String> = stmt
1669            .query_map(
1670                params![tenant_id, id.submitter, id.submission_id, manifest_url],
1671                |r| r.get::<_, String>(0),
1672            )
1673            .map_err(|e| internal_error(format!("query replace lookup: {e}")))?
1674            .filter_map(|r| r.ok())
1675            .collect();
1676        conn.execute(
1677            "UPDATE bulk_manifests SET status = 'replaced'
1678             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_url = ?4",
1679            params![tenant_id, id.submitter, id.submission_id, manifest_url],
1680        )
1681        .map_err(|e| internal_error(format!("mark replaced: {e}")))?;
1682        Ok(ids)
1683    }
1684
1685    async fn set_submission_kickoff_meta(
1686        &self,
1687        tenant: &TenantContext,
1688        id: &SubmissionId,
1689        owner_subject: Option<&str>,
1690        request_url: &str,
1691        requires_access_token: bool,
1692    ) -> StorageResult<()> {
1693        let conn = self.get_connection()?;
1694        conn.execute(
1695            "UPDATE bulk_submissions
1696             SET owner_subject = ?1, request_url = ?2, requires_access_token = ?3
1697             WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6",
1698            params![
1699                owner_subject,
1700                request_url,
1701                requires_access_token as i64,
1702                tenant.tenant_id().as_str(),
1703                id.submitter,
1704                id.submission_id
1705            ],
1706        )
1707        .map_err(|e| internal_error(format!("set kickoff meta: {e}")))?;
1708        Ok(())
1709    }
1710
1711    async fn ensure_poll_token(
1712        &self,
1713        tenant: &TenantContext,
1714        id: &SubmissionId,
1715    ) -> StorageResult<String> {
1716        let conn = self.get_connection()?;
1717        let tenant_id = tenant.tenant_id().as_str();
1718        let existing: Option<String> = conn
1719            .query_row(
1720                "SELECT poll_token FROM bulk_submissions
1721                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
1722                params![tenant_id, id.submitter, id.submission_id],
1723                |r| r.get::<_, Option<String>>(0),
1724            )
1725            .map_err(|e| internal_error(format!("read poll token: {e}")))?;
1726        if let Some(token) = existing {
1727            return Ok(token);
1728        }
1729        let token = Uuid::new_v4().to_string();
1730        conn.execute(
1731            "UPDATE bulk_submissions SET poll_token = ?1
1732             WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
1733            params![token, tenant_id, id.submitter, id.submission_id],
1734        )
1735        .map_err(|e| internal_error(format!("set poll token: {e}")))?;
1736        Ok(token)
1737    }
1738
1739    async fn list_expired_submissions(
1740        &self,
1741        now: DateTime<Utc>,
1742        ttl: StdDuration,
1743        limit: u32,
1744    ) -> StorageResult<Vec<(TenantContext, SubmissionId)>> {
1745        let conn = self.get_connection()?;
1746        let cutoff = (now
1747            - chrono::Duration::from_std(ttl).unwrap_or_else(|_| chrono::Duration::seconds(86400)))
1748        .to_rfc3339();
1749        let mut stmt = conn
1750            .prepare(
1751                "SELECT tenant_id, submitter, submission_id FROM bulk_submissions
1752                 WHERE updated_at < ?1 ORDER BY updated_at LIMIT ?2",
1753            )
1754            .map_err(|e| internal_error(format!("prepare expired: {e}")))?;
1755        let rows = stmt
1756            .query_map(params![cutoff, limit], |r| {
1757                Ok((
1758                    r.get::<_, String>(0)?,
1759                    r.get::<_, String>(1)?,
1760                    r.get::<_, String>(2)?,
1761                ))
1762            })
1763            .map_err(|e| internal_error(format!("query expired: {e}")))?;
1764        let mut out = Vec::new();
1765        for row in rows {
1766            let (tenant_id, submitter, submission_id) =
1767                row.map_err(|e| internal_error(format!("row expired: {e}")))?;
1768            out.push((
1769                TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
1770                SubmissionId::new(submitter, submission_id),
1771            ));
1772        }
1773        Ok(out)
1774    }
1775
1776    async fn resolve_poll_token(&self, token: &str) -> StorageResult<Option<PollTokenTarget>> {
1777        let conn = self.get_connection()?;
1778        let row: Option<(String, String, String, Option<String>)> = conn
1779            .query_row(
1780                "SELECT tenant_id, submitter, submission_id, owner_subject
1781                 FROM bulk_submissions WHERE poll_token = ?1",
1782                params![token],
1783                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
1784            )
1785            .ok();
1786        Ok(row.map(
1787            |(tenant_id, submitter, submission_id, owner_subject)| PollTokenTarget {
1788                tenant: TenantContext::new(
1789                    TenantId::new(tenant_id),
1790                    TenantPermissions::full_access(),
1791                ),
1792                submission_id: SubmissionId::new(submitter, submission_id),
1793                owner_subject,
1794            },
1795        ))
1796    }
1797
1798    async fn clear_poll_token(
1799        &self,
1800        tenant: &TenantContext,
1801        id: &SubmissionId,
1802    ) -> StorageResult<()> {
1803        let conn = self.get_connection()?;
1804        conn.execute(
1805            "UPDATE bulk_submissions SET poll_token = NULL
1806             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
1807            params![tenant.tenant_id().as_str(), id.submitter, id.submission_id],
1808        )
1809        .map_err(|e| internal_error(format!("clear poll token: {e}")))?;
1810        Ok(())
1811    }
1812
1813    async fn list_submit_files(
1814        &self,
1815        tenant: &TenantContext,
1816        id: &SubmissionId,
1817    ) -> StorageResult<Vec<SubmitFileRow>> {
1818        let conn = self.get_connection()?;
1819        let mut stmt = conn
1820            .prepare(
1821                "SELECT manifest_url, file_type, resource_type, part_index, fencing_token,
1822                        file_path, line_count, byte_count, count_severity
1823                 FROM bulk_submit_files
1824                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1825                 ORDER BY id",
1826            )
1827            .map_err(|e| internal_error(format!("prepare list files: {e}")))?;
1828        let rows = stmt
1829            .query_map(
1830                params![tenant.tenant_id().as_str(), id.submitter, id.submission_id],
1831                |r| {
1832                    let count_severity: Option<String> = r.get(8)?;
1833                    Ok(SubmitFileRow {
1834                        manifest_url: r.get(0)?,
1835                        file_type: r.get(1)?,
1836                        resource_type: r.get(2)?,
1837                        part_index: r.get::<_, i64>(3)? as u32,
1838                        fencing_token: r.get::<_, i64>(4)? as u64,
1839                        file_path: r.get(5)?,
1840                        line_count: r.get::<_, i64>(6)? as u64,
1841                        byte_count: r.get::<_, i64>(7)? as u64,
1842                        count_severity: count_severity
1843                            .as_deref()
1844                            .and_then(|s| serde_json::from_str(s).ok()),
1845                    })
1846                },
1847            )
1848            .map_err(|e| internal_error(format!("query list files: {e}")))?;
1849        let mut out = Vec::new();
1850        for row in rows {
1851            out.push(row.map_err(|e| internal_error(format!("row list files: {e}")))?);
1852        }
1853        Ok(out)
1854    }
1855
1856    async fn delete_submission_artifacts(
1857        &self,
1858        tenant: &TenantContext,
1859        id: &SubmissionId,
1860    ) -> StorageResult<()> {
1861        let conn = self.get_connection()?;
1862        conn.execute(
1863            "DELETE FROM bulk_submit_files
1864             WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
1865            params![tenant.tenant_id().as_str(), id.submitter, id.submission_id],
1866        )
1867        .map_err(|e| internal_error(format!("delete artifacts: {e}")))?;
1868        Ok(())
1869    }
1870
1871    async fn count_active_submissions(&self, tenant: &TenantContext) -> StorageResult<u64> {
1872        let conn = self.get_connection()?;
1873        let count: i64 = conn
1874            .query_row(
1875                "SELECT COUNT(*) FROM bulk_submissions
1876                 WHERE tenant_id = ?1 AND status = 'in-progress'",
1877                params![tenant.tenant_id().as_str()],
1878                |r| r.get(0),
1879            )
1880            .map_err(|e| internal_error(format!("count active submissions: {e}")))?;
1881        Ok(count.max(0) as u64)
1882    }
1883
1884    async fn ensure_transaction_time(
1885        &self,
1886        tenant: &TenantContext,
1887        id: &SubmissionId,
1888    ) -> StorageResult<DateTime<Utc>> {
1889        let conn = self.get_connection()?;
1890        let tenant_id = tenant.tenant_id().as_str();
1891        let existing: Option<String> = conn
1892            .query_row(
1893                "SELECT transaction_time FROM bulk_submissions
1894                 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
1895                params![tenant_id, id.submitter, id.submission_id],
1896                |r| r.get::<_, Option<String>>(0),
1897            )
1898            .map_err(|e| internal_error(format!("read transaction_time: {e}")))?;
1899        if let Some(ts) = existing.as_deref() {
1900            if let Ok(dt) = DateTime::parse_from_rfc3339(ts) {
1901                return Ok(dt.with_timezone(&Utc));
1902            }
1903        }
1904        let now = Utc::now();
1905        conn.execute(
1906            "UPDATE bulk_submissions SET transaction_time = ?1
1907             WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
1908            params![now.to_rfc3339(), tenant_id, id.submitter, id.submission_id],
1909        )
1910        .map_err(|e| internal_error(format!("set transaction_time: {e}")))?;
1911        Ok(now)
1912    }
1913}
1914
1915#[cfg(test)]
1916mod tests {
1917    use super::*;
1918    use crate::tenant::{TenantId, TenantPermissions};
1919    use serde_json::json;
1920
1921    fn create_test_backend() -> SqliteBackend {
1922        let backend = SqliteBackend::in_memory().unwrap();
1923        backend.init_schema().unwrap();
1924        backend
1925    }
1926
1927    fn create_test_tenant() -> TenantContext {
1928        TenantContext::new(
1929            TenantId::new("test-tenant"),
1930            TenantPermissions::full_access(),
1931        )
1932    }
1933
1934    #[tokio::test]
1935    async fn test_create_submission() {
1936        let backend = create_test_backend();
1937        let tenant = create_test_tenant();
1938
1939        let sub_id = SubmissionId::generate("test-system");
1940        let summary = backend
1941            .create_submission(&tenant, &sub_id, None)
1942            .await
1943            .unwrap();
1944
1945        assert_eq!(summary.status, SubmissionStatus::InProgress);
1946        assert_eq!(summary.manifest_count, 0);
1947    }
1948
1949    #[tokio::test]
1950    async fn test_duplicate_submission() {
1951        let backend = create_test_backend();
1952        let tenant = create_test_tenant();
1953
1954        let sub_id = SubmissionId::new("test-system", "sub-123");
1955        backend
1956            .create_submission(&tenant, &sub_id, None)
1957            .await
1958            .unwrap();
1959
1960        let result = backend.create_submission(&tenant, &sub_id, None).await;
1961        assert!(matches!(
1962            result,
1963            Err(StorageError::BulkSubmit(
1964                BulkSubmitError::DuplicateSubmission { .. }
1965            ))
1966        ));
1967    }
1968
1969    #[tokio::test]
1970    async fn test_add_manifest() {
1971        let backend = create_test_backend();
1972        let tenant = create_test_tenant();
1973
1974        let sub_id = SubmissionId::generate("test-system");
1975        backend
1976            .create_submission(&tenant, &sub_id, None)
1977            .await
1978            .unwrap();
1979
1980        let manifest = backend
1981            .add_manifest(
1982                &tenant,
1983                &sub_id,
1984                Some("http://example.com/data.ndjson"),
1985                None,
1986            )
1987            .await
1988            .unwrap();
1989
1990        assert_eq!(manifest.status, ManifestStatus::Pending);
1991        assert_eq!(
1992            manifest.manifest_url,
1993            Some("http://example.com/data.ndjson".to_string())
1994        );
1995    }
1996
1997    #[tokio::test]
1998    async fn test_process_entries() {
1999        let backend = create_test_backend();
2000        let tenant = create_test_tenant();
2001
2002        let sub_id = SubmissionId::generate("test-system");
2003        backend
2004            .create_submission(&tenant, &sub_id, None)
2005            .await
2006            .unwrap();
2007
2008        let manifest = backend
2009            .add_manifest(&tenant, &sub_id, None, None)
2010            .await
2011            .unwrap();
2012
2013        let entries = vec![
2014            NdjsonEntry::new(
2015                1,
2016                "Patient",
2017                json!({"resourceType": "Patient", "name": [{"family": "Test1"}]}),
2018            ),
2019            NdjsonEntry::new(
2020                2,
2021                "Patient",
2022                json!({"resourceType": "Patient", "name": [{"family": "Test2"}]}),
2023            ),
2024        ];
2025
2026        let options = BulkProcessingOptions::new();
2027        let results = backend
2028            .process_entries(&tenant, &sub_id, &manifest.manifest_id, entries, &options)
2029            .await
2030            .unwrap();
2031
2032        assert_eq!(results.len(), 2);
2033        assert!(results.iter().all(|r| r.is_success()));
2034        assert!(results.iter().all(|r| r.created));
2035    }
2036
2037    #[tokio::test]
2038    async fn test_complete_submission() {
2039        let backend = create_test_backend();
2040        let tenant = create_test_tenant();
2041
2042        let sub_id = SubmissionId::generate("test-system");
2043        backend
2044            .create_submission(&tenant, &sub_id, None)
2045            .await
2046            .unwrap();
2047
2048        let summary = backend.complete_submission(&tenant, &sub_id).await.unwrap();
2049        assert_eq!(summary.status, SubmissionStatus::Complete);
2050        assert!(summary.completed_at.is_some());
2051    }
2052
2053    #[tokio::test]
2054    async fn test_abort_submission() {
2055        let backend = create_test_backend();
2056        let tenant = create_test_tenant();
2057
2058        let sub_id = SubmissionId::generate("test-system");
2059        backend
2060            .create_submission(&tenant, &sub_id, None)
2061            .await
2062            .unwrap();
2063
2064        backend
2065            .add_manifest(&tenant, &sub_id, None, None)
2066            .await
2067            .unwrap();
2068
2069        let cancelled = backend
2070            .abort_submission(&tenant, &sub_id, "test abort")
2071            .await
2072            .unwrap();
2073        assert_eq!(cancelled, 1);
2074
2075        let summary = backend
2076            .get_submission(&tenant, &sub_id)
2077            .await
2078            .unwrap()
2079            .unwrap();
2080        assert_eq!(summary.status, SubmissionStatus::Aborted);
2081    }
2082
2083    #[tokio::test]
2084    async fn test_rollback_create() {
2085        let backend = create_test_backend();
2086        let tenant = create_test_tenant();
2087
2088        let sub_id = SubmissionId::generate("test-system");
2089        backend
2090            .create_submission(&tenant, &sub_id, None)
2091            .await
2092            .unwrap();
2093
2094        let manifest = backend
2095            .add_manifest(&tenant, &sub_id, None, None)
2096            .await
2097            .unwrap();
2098
2099        let entries = vec![NdjsonEntry::new(
2100            1,
2101            "Patient",
2102            json!({"resourceType": "Patient", "id": "rollback-test", "name": [{"family": "Test"}]}),
2103        )];
2104
2105        let options = BulkProcessingOptions::new();
2106        let _results = backend
2107            .process_entries(&tenant, &sub_id, &manifest.manifest_id, entries, &options)
2108            .await
2109            .unwrap();
2110
2111        // Verify resource was created
2112        let patient = backend
2113            .read(&tenant, "Patient", "rollback-test")
2114            .await
2115            .unwrap();
2116        assert!(patient.is_some());
2117
2118        // Rollback
2119        let changes = backend.list_changes(&tenant, &sub_id, 10, 0).await.unwrap();
2120        assert_eq!(changes.len(), 1);
2121
2122        let rolled_back = backend
2123            .rollback_change(&tenant, &sub_id, &changes[0])
2124            .await
2125            .unwrap();
2126        assert!(rolled_back);
2127
2128        // Verify resource was deleted
2129        let patient = backend.read(&tenant, "Patient", "rollback-test").await;
2130        assert!(patient.is_err()); // Should be Gone
2131    }
2132
2133    #[tokio::test]
2134    async fn test_entry_counts() {
2135        let backend = create_test_backend();
2136        let tenant = create_test_tenant();
2137
2138        let sub_id = SubmissionId::generate("test-system");
2139        backend
2140            .create_submission(&tenant, &sub_id, None)
2141            .await
2142            .unwrap();
2143
2144        let manifest = backend
2145            .add_manifest(&tenant, &sub_id, None, None)
2146            .await
2147            .unwrap();
2148
2149        let entries = vec![
2150            NdjsonEntry::new(
2151                1,
2152                "Patient",
2153                json!({"resourceType": "Patient", "name": [{"family": "Test1"}]}),
2154            ),
2155            NdjsonEntry::new(
2156                2,
2157                "Patient",
2158                json!({"resourceType": "Patient", "name": [{"family": "Test2"}]}),
2159            ),
2160        ];
2161
2162        let options = BulkProcessingOptions::new();
2163        backend
2164            .process_entries(&tenant, &sub_id, &manifest.manifest_id, entries, &options)
2165            .await
2166            .unwrap();
2167
2168        let counts = backend
2169            .get_entry_counts(&tenant, &sub_id, &manifest.manifest_id)
2170            .await
2171            .unwrap();
2172
2173        assert_eq!(counts.total, 2);
2174        assert_eq!(counts.success, 2);
2175        assert_eq!(counts.error_count(), 0);
2176    }
2177
2178    async fn seed_claimable(backend: &SqliteBackend, tenant: &TenantContext) -> SubmissionId {
2179        let sub_id = SubmissionId::generate("worker-system");
2180        backend
2181            .create_submission(tenant, &sub_id, None)
2182            .await
2183            .unwrap();
2184        backend
2185            .add_manifest(
2186                tenant,
2187                &sub_id,
2188                Some("http://example.com/manifest.json"),
2189                None,
2190            )
2191            .await
2192            .unwrap();
2193        sub_id
2194    }
2195
2196    #[tokio::test]
2197    async fn test_claim_heartbeat_finish() {
2198        let backend = create_test_backend();
2199        let tenant = create_test_tenant();
2200        let sub_id = seed_claimable(&backend, &tenant).await;
2201        let worker = WorkerId::new("w1");
2202
2203        let lease = backend
2204            .claim_next_manifest(&worker, StdDuration::from_secs(60))
2205            .await
2206            .unwrap()
2207            .expect("a manifest should be claimable");
2208        assert_eq!(lease.submission_id, sub_id);
2209        assert_eq!(lease.fencing_token, 1);
2210
2211        // A second claim finds nothing (the only manifest is now processing w/ fresh lease).
2212        assert!(
2213            backend
2214                .claim_next_manifest(&WorkerId::new("w2"), StdDuration::from_secs(60))
2215                .await
2216                .unwrap()
2217                .is_none()
2218        );
2219
2220        backend.heartbeat(&lease).await.unwrap();
2221        backend.mark_manifest_processing(&lease).await.unwrap();
2222        backend
2223            .update_manifest_progress(&lease, 5, 1, 6)
2224            .await
2225            .unwrap();
2226        backend.finish_manifest(&lease).await.unwrap();
2227
2228        // After completion nothing is claimable.
2229        assert!(
2230            backend
2231                .claim_next_manifest(&worker, StdDuration::from_secs(60))
2232                .await
2233                .unwrap()
2234                .is_none()
2235        );
2236    }
2237
2238    #[tokio::test]
2239    async fn test_fencing_blocks_zombie_writer() {
2240        let backend = create_test_backend();
2241        let tenant = create_test_tenant();
2242        seed_claimable(&backend, &tenant).await;
2243
2244        // Claim with a zero-duration lease so it is immediately reclaimable.
2245        let stale = backend
2246            .claim_next_manifest(&WorkerId::new("old"), StdDuration::from_secs(0))
2247            .await
2248            .unwrap()
2249            .unwrap();
2250        // A new worker reclaims it (bumps the fencing token).
2251        let fresh = backend
2252            .claim_next_manifest(&WorkerId::new("new"), StdDuration::from_secs(60))
2253            .await
2254            .unwrap()
2255            .unwrap();
2256        assert!(fresh.fencing_token > stale.fencing_token);
2257
2258        // The stale lease can no longer mutate the manifest.
2259        assert!(matches!(
2260            backend.heartbeat(&stale).await,
2261            Err(LeaseError::LeaseLost { .. })
2262        ));
2263        assert!(matches!(
2264            backend.finish_manifest(&stale).await,
2265            Err(LeaseError::LeaseLost { .. })
2266        ));
2267        // The fresh lease still works.
2268        backend.finish_manifest(&fresh).await.unwrap();
2269    }
2270
2271    #[tokio::test]
2272    async fn test_poll_token_lifecycle() {
2273        let backend = create_test_backend();
2274        let tenant = create_test_tenant();
2275        let sub_id = SubmissionId::generate("poll-system");
2276        backend
2277            .create_submission(&tenant, &sub_id, None)
2278            .await
2279            .unwrap();
2280
2281        let token = backend.ensure_poll_token(&tenant, &sub_id).await.unwrap();
2282        // Idempotent: same token returned.
2283        assert_eq!(
2284            token,
2285            backend.ensure_poll_token(&tenant, &sub_id).await.unwrap()
2286        );
2287
2288        let resolved = backend.resolve_poll_token(&token).await.unwrap().unwrap();
2289        assert_eq!(resolved.submission_id, sub_id);
2290
2291        backend.clear_poll_token(&tenant, &sub_id).await.unwrap();
2292        assert!(backend.resolve_poll_token(&token).await.unwrap().is_none());
2293    }
2294
2295    #[tokio::test]
2296    async fn test_record_and_delete_artifacts() {
2297        let backend = create_test_backend();
2298        let tenant = create_test_tenant();
2299        seed_claimable(&backend, &tenant).await;
2300        let lease = backend
2301            .claim_next_manifest(&WorkerId::new("w1"), StdDuration::from_secs(60))
2302            .await
2303            .unwrap()
2304            .unwrap();
2305
2306        backend
2307            .record_submit_file(
2308                &lease,
2309                &SubmitFileRecord {
2310                    manifest_url: Some("http://example.com/manifest.json".to_string()),
2311                    file_type: "error".to_string(),
2312                    resource_type: None,
2313                    part_index: 0,
2314                    file_path: "tenant/sub/error-0.ndjson".to_string(),
2315                    line_count: 3,
2316                    byte_count: 120,
2317                    count_severity: Some(json!({"error": 3})),
2318                },
2319            )
2320            .await
2321            .unwrap();
2322
2323        let files = backend
2324            .list_submit_files(&tenant, &lease.submission_id)
2325            .await
2326            .unwrap();
2327        assert_eq!(files.len(), 1);
2328        assert_eq!(files[0].file_type, "error");
2329        assert_eq!(files[0].line_count, 3);
2330
2331        backend
2332            .delete_submission_artifacts(&tenant, &lease.submission_id)
2333            .await
2334            .unwrap();
2335        assert!(
2336            backend
2337                .list_submit_files(&tenant, &lease.submission_id)
2338                .await
2339                .unwrap()
2340                .is_empty()
2341        );
2342    }
2343}