1use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use helios_fhir::FhirVersion;
6use rusqlite::params;
7use serde_json::Value;
8use std::time::Duration as StdDuration;
9use tokio::io::{AsyncBufRead, AsyncBufReadExt};
10use tokio::sync::Mutex;
11use uuid::Uuid;
12
13use crate::core::ResourceStorage;
14use crate::core::bulk_export::ExportJobId;
15use crate::core::bulk_export_worker::{LeaseError, WorkerId};
16use crate::core::bulk_submit::{
17 BulkEntryOutcome, BulkEntryResult, BulkProcessingOptions, BulkSubmitProvider,
18 BulkSubmitRollbackProvider, ChangeType, EntryCountSummary, ManifestStatus, NdjsonEntry,
19 StreamProcessingResult, StreamingBulkSubmitProvider, SubmissionChange, SubmissionId,
20 SubmissionManifest, SubmissionStatus, SubmissionSummary,
21};
22use crate::core::bulk_submit_worker::{
23 ManifestLease, ManifestWorkerView, PollTokenTarget, SubmitClaimStrategy, SubmitFileRecord,
24 SubmitFileRow, SubmitWorkerStorage,
25};
26use crate::error::{BackendError, BulkSubmitError, StorageError, StorageResult};
27use crate::tenant::{TenantContext, TenantId, TenantPermissions};
28
29use super::SqliteBackend;
30
// Process-wide async mutex with a unit payload: it protects no data of its own and can
// only be used to serialize a critical section. No use of it is visible in this part of
// the file.
// NOTE(review): the name suggests it serializes submit-manifest claim operations — confirm
// against the code that acquires it.
static SUBMIT_CLAIM_LOCK: Mutex<()> = Mutex::const_new(());
34
35fn lease_lost(lease: &ManifestLease) -> LeaseError {
38 LeaseError::LeaseLost {
39 job_id: ExportJobId::from_string(format!("{}/{}", lease.submission_id, lease.manifest_id)),
40 }
41}
42
43fn fhir_version_from_output_format(output_format: Option<&str>) -> FhirVersion {
45 output_format
46 .and_then(|fmt| {
47 fmt.split(';').find_map(|part| {
48 let part = part.trim();
49 part.strip_prefix("fhirVersion=")
50 .and_then(FhirVersion::from_mime_param)
51 })
52 })
53 .unwrap_or_else(FhirVersion::default_enabled)
54}
55
56fn internal_error(message: String) -> StorageError {
57 StorageError::Backend(BackendError::Internal {
58 backend_name: "sqlite".to_string(),
59 message,
60 source: None,
61 })
62}
63
64#[async_trait]
65impl BulkSubmitProvider for SqliteBackend {
66 async fn create_submission(
67 &self,
68 tenant: &TenantContext,
69 id: &SubmissionId,
70 metadata: Option<Value>,
71 ) -> StorageResult<SubmissionSummary> {
72 let conn = self.get_connection()?;
73 let tenant_id = tenant.tenant_id().as_str();
74
75 let exists: bool = conn
77 .query_row(
78 "SELECT 1 FROM bulk_submissions
79 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
80 params![tenant_id, &id.submitter, &id.submission_id],
81 |_| Ok(true),
82 )
83 .unwrap_or(false);
84
85 if exists {
86 return Err(StorageError::BulkSubmit(
87 BulkSubmitError::DuplicateSubmission {
88 submitter: id.submitter.clone(),
89 submission_id: id.submission_id.clone(),
90 },
91 ));
92 }
93
94 let now = Utc::now();
95 let now_str = now.to_rfc3339();
96 let metadata_bytes = metadata.as_ref().and_then(|m| serde_json::to_vec(m).ok());
97
98 conn.execute(
99 "INSERT INTO bulk_submissions
100 (tenant_id, submitter, submission_id, status, created_at, updated_at, metadata)
101 VALUES (?1, ?2, ?3, 'in-progress', ?4, ?5, ?6)",
102 params![
103 tenant_id,
104 &id.submitter,
105 &id.submission_id,
106 now_str,
107 now_str,
108 metadata_bytes
109 ],
110 )
111 .map_err(|e| internal_error(format!("Failed to create submission: {}", e)))?;
112
113 Ok(SubmissionSummary {
114 id: id.clone(),
115 status: SubmissionStatus::InProgress,
116 created_at: now,
117 updated_at: now,
118 completed_at: None,
119 manifest_count: 0,
120 total_entries: 0,
121 success_count: 0,
122 error_count: 0,
123 skipped_count: 0,
124 metadata,
125 })
126 }
127
128 async fn get_submission(
129 &self,
130 tenant: &TenantContext,
131 id: &SubmissionId,
132 ) -> StorageResult<Option<SubmissionSummary>> {
133 let conn = self.get_connection()?;
134 let tenant_id = tenant.tenant_id().as_str();
135
136 let result = conn.query_row(
137 "SELECT status, created_at, updated_at, completed_at, metadata
138 FROM bulk_submissions
139 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
140 params![tenant_id, &id.submitter, &id.submission_id],
141 |row| {
142 Ok((
143 row.get::<_, String>(0)?,
144 row.get::<_, String>(1)?,
145 row.get::<_, String>(2)?,
146 row.get::<_, Option<String>>(3)?,
147 row.get::<_, Option<Vec<u8>>>(4)?,
148 ))
149 },
150 );
151
152 match result {
153 Ok((status_str, created_at, updated_at, completed_at, metadata_bytes)) => {
154 let status: SubmissionStatus = status_str
155 .parse()
156 .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
157
158 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at)
159 .map_err(|e| internal_error(format!("Invalid created_at: {}", e)))?
160 .with_timezone(&Utc);
161
162 let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at)
163 .map_err(|e| internal_error(format!("Invalid updated_at: {}", e)))?
164 .with_timezone(&Utc);
165
166 let completed_at = completed_at.and_then(|s| {
167 chrono::DateTime::parse_from_rfc3339(&s)
168 .ok()
169 .map(|dt| dt.with_timezone(&Utc))
170 });
171
172 let metadata = metadata_bytes.and_then(|b| serde_json::from_slice(&b).ok());
173
174 let manifest_count: i32 = conn
176 .query_row(
177 "SELECT COUNT(*) FROM bulk_manifests
178 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
179 params![tenant_id, &id.submitter, &id.submission_id],
180 |row| row.get(0),
181 )
182 .unwrap_or(0);
183
184 let (total, success, errors, skipped): (i64, i64, i64, i64) = conn
186 .query_row(
187 "SELECT
188 COUNT(*),
189 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
190 SUM(CASE WHEN outcome IN ('validation-error', 'processing-error') THEN 1 ELSE 0 END),
191 SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
192 FROM bulk_entry_results
193 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
194 params![tenant_id, &id.submitter, &id.submission_id],
195 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
196 )
197 .unwrap_or((0, 0, 0, 0));
198
199 Ok(Some(SubmissionSummary {
200 id: id.clone(),
201 status,
202 created_at,
203 updated_at,
204 completed_at,
205 manifest_count: manifest_count as u32,
206 total_entries: total as u64,
207 success_count: success as u64,
208 error_count: errors as u64,
209 skipped_count: skipped as u64,
210 metadata,
211 }))
212 }
213 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
214 Err(e) => Err(internal_error(format!("Failed to get submission: {}", e))),
215 }
216 }
217
218 async fn list_submissions(
219 &self,
220 tenant: &TenantContext,
221 submitter: Option<&str>,
222 status: Option<SubmissionStatus>,
223 limit: u32,
224 offset: u32,
225 ) -> StorageResult<Vec<SubmissionSummary>> {
226 let ids: Vec<(String, String)> = {
228 let conn = self.get_connection()?;
229 let tenant_id = tenant.tenant_id().as_str();
230
231 let (query, params): (String, Vec<String>) = {
232 let mut query =
233 "SELECT submitter, submission_id FROM bulk_submissions WHERE tenant_id = ?1"
234 .to_string();
235 let mut params = vec![tenant_id.to_string()];
236
237 if let Some(submitter) = submitter {
238 query.push_str(" AND submitter = ?2");
239 params.push(submitter.to_string());
240 }
241
242 if let Some(status) = status {
243 let param_num = params.len() + 1;
244 query.push_str(&format!(" AND status = ?{}", param_num));
245 params.push(status.to_string());
246 }
247
248 query.push_str(" ORDER BY created_at DESC");
249 query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
250
251 (query, params)
252 };
253
254 let mut stmt = conn
255 .prepare(&query)
256 .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
257
258 let params_refs: Vec<&dyn rusqlite::ToSql> =
259 params.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
260
261 stmt.query_map(params_refs.as_slice(), |row| {
262 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
263 })
264 .map_err(|e| internal_error(format!("Failed to query submissions: {}", e)))?
265 .filter_map(|r| r.ok())
266 .collect()
267 };
268
269 let mut results = Vec::new();
270 for (submitter, submission_id) in ids {
271 let sub_id = SubmissionId::new(submitter, submission_id);
272 if let Some(summary) = self.get_submission(tenant, &sub_id).await? {
273 results.push(summary);
274 }
275 }
276
277 Ok(results)
278 }
279
280 async fn complete_submission(
281 &self,
282 tenant: &TenantContext,
283 id: &SubmissionId,
284 ) -> StorageResult<SubmissionSummary> {
285 let conn = self.get_connection()?;
286 let tenant_id = tenant.tenant_id().as_str();
287
288 let current_status: String = conn
290 .query_row(
291 "SELECT status FROM bulk_submissions
292 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
293 params![tenant_id, &id.submitter, &id.submission_id],
294 |row| row.get(0),
295 )
296 .map_err(|e| {
297 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
298 StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
299 submitter: id.submitter.clone(),
300 submission_id: id.submission_id.clone(),
301 })
302 } else {
303 internal_error(format!("Failed to get submission status: {}", e))
304 }
305 })?;
306
307 if current_status != "in-progress" {
308 return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
309 submission_id: id.submission_id.clone(),
310 }));
311 }
312
313 let now = Utc::now().to_rfc3339();
314 conn.execute(
315 "UPDATE bulk_submissions SET status = 'complete', completed_at = ?1, updated_at = ?2
316 WHERE tenant_id = ?3 AND submitter = ?4 AND submission_id = ?5",
317 params![now, now, tenant_id, &id.submitter, &id.submission_id],
318 )
319 .map_err(|e| internal_error(format!("Failed to complete submission: {}", e)))?;
320
321 self.get_submission(tenant, id)
322 .await?
323 .ok_or_else(|| internal_error("Submission disappeared".to_string()))
324 }
325
326 async fn abort_submission(
327 &self,
328 tenant: &TenantContext,
329 id: &SubmissionId,
330 _reason: &str,
331 ) -> StorageResult<u64> {
332 let conn = self.get_connection()?;
333 let tenant_id = tenant.tenant_id().as_str();
334
335 let current_status: String = conn
337 .query_row(
338 "SELECT status FROM bulk_submissions
339 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
340 params![tenant_id, &id.submitter, &id.submission_id],
341 |row| row.get(0),
342 )
343 .map_err(|e| {
344 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
345 StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
346 submitter: id.submitter.clone(),
347 submission_id: id.submission_id.clone(),
348 })
349 } else {
350 internal_error(format!("Failed to get submission status: {}", e))
351 }
352 })?;
353
354 if current_status != "in-progress" {
355 return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
356 submission_id: id.submission_id.clone(),
357 }));
358 }
359
360 let pending_count: i64 = conn
362 .query_row(
363 "SELECT COUNT(*) FROM bulk_manifests
364 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
365 AND status IN ('pending', 'processing')",
366 params![tenant_id, &id.submitter, &id.submission_id],
367 |row| row.get(0),
368 )
369 .unwrap_or(0);
370
371 let now = Utc::now().to_rfc3339();
372
373 conn.execute(
375 "UPDATE bulk_submissions SET status = 'aborted', completed_at = ?1, updated_at = ?2
376 WHERE tenant_id = ?3 AND submitter = ?4 AND submission_id = ?5",
377 params![now, now, tenant_id, &id.submitter, &id.submission_id],
378 )
379 .map_err(|e| internal_error(format!("Failed to abort submission: {}", e)))?;
380
381 conn.execute(
383 "UPDATE bulk_manifests SET status = 'failed'
384 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
385 AND status IN ('pending', 'processing')",
386 params![tenant_id, &id.submitter, &id.submission_id],
387 )
388 .map_err(|e| internal_error(format!("Failed to update manifests: {}", e)))?;
389
390 Ok(pending_count as u64)
391 }
392
393 async fn add_manifest(
394 &self,
395 tenant: &TenantContext,
396 submission_id: &SubmissionId,
397 manifest_url: Option<&str>,
398 replaces_manifest_url: Option<&str>,
399 ) -> StorageResult<SubmissionManifest> {
400 let conn = self.get_connection()?;
401 let tenant_id = tenant.tenant_id().as_str();
402
403 let status: String = conn
405 .query_row(
406 "SELECT status FROM bulk_submissions
407 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
408 params![
409 tenant_id,
410 &submission_id.submitter,
411 &submission_id.submission_id
412 ],
413 |row| row.get(0),
414 )
415 .map_err(|e| {
416 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
417 StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
418 submitter: submission_id.submitter.clone(),
419 submission_id: submission_id.submission_id.clone(),
420 })
421 } else {
422 internal_error(format!("Failed to get submission: {}", e))
423 }
424 })?;
425
426 if status != "in-progress" {
427 return Err(StorageError::BulkSubmit(BulkSubmitError::InvalidState {
428 submission_id: submission_id.submission_id.clone(),
429 expected: "in-progress".to_string(),
430 actual: status,
431 }));
432 }
433
434 let manifest_id = Uuid::new_v4().to_string();
435 let now = Utc::now();
436 let now_str = now.to_rfc3339();
437
438 conn.execute(
439 "INSERT INTO bulk_manifests
440 (tenant_id, submitter, submission_id, manifest_id, manifest_url, replaces_manifest_url, status, added_at)
441 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'pending', ?7)",
442 params![
443 tenant_id,
444 &submission_id.submitter,
445 &submission_id.submission_id,
446 manifest_id,
447 manifest_url,
448 replaces_manifest_url,
449 now_str
450 ],
451 )
452 .map_err(|e| internal_error(format!("Failed to add manifest: {}", e)))?;
453
454 conn.execute(
456 "UPDATE bulk_submissions SET updated_at = ?1
457 WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
458 params![
459 now_str,
460 tenant_id,
461 &submission_id.submitter,
462 &submission_id.submission_id
463 ],
464 )
465 .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
466
467 Ok(SubmissionManifest {
468 manifest_id,
469 manifest_url: manifest_url.map(String::from),
470 replaces_manifest_url: replaces_manifest_url.map(String::from),
471 status: ManifestStatus::Pending,
472 added_at: now,
473 total_entries: 0,
474 processed_entries: 0,
475 failed_entries: 0,
476 })
477 }
478
479 async fn get_manifest(
480 &self,
481 tenant: &TenantContext,
482 submission_id: &SubmissionId,
483 manifest_id: &str,
484 ) -> StorageResult<Option<SubmissionManifest>> {
485 let conn = self.get_connection()?;
486 let tenant_id = tenant.tenant_id().as_str();
487
488 let result = conn.query_row(
489 "SELECT manifest_url, replaces_manifest_url, status, added_at, total_entries, processed_entries, failed_entries
490 FROM bulk_manifests
491 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
492 params![tenant_id, &submission_id.submitter, &submission_id.submission_id, manifest_id],
493 |row| {
494 Ok((
495 row.get::<_, Option<String>>(0)?,
496 row.get::<_, Option<String>>(1)?,
497 row.get::<_, String>(2)?,
498 row.get::<_, String>(3)?,
499 row.get::<_, i64>(4)?,
500 row.get::<_, i64>(5)?,
501 row.get::<_, i64>(6)?,
502 ))
503 },
504 );
505
506 match result {
507 Ok((
508 manifest_url,
509 replaces_manifest_url,
510 status_str,
511 added_at,
512 total,
513 processed,
514 failed,
515 )) => {
516 let status: ManifestStatus = status_str.parse().map_err(|_| {
517 internal_error(format!("Invalid manifest status: {}", status_str))
518 })?;
519
520 let added_at = chrono::DateTime::parse_from_rfc3339(&added_at)
521 .map_err(|e| internal_error(format!("Invalid added_at: {}", e)))?
522 .with_timezone(&Utc);
523
524 Ok(Some(SubmissionManifest {
525 manifest_id: manifest_id.to_string(),
526 manifest_url,
527 replaces_manifest_url,
528 status,
529 added_at,
530 total_entries: total as u64,
531 processed_entries: processed as u64,
532 failed_entries: failed as u64,
533 }))
534 }
535 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
536 Err(e) => Err(internal_error(format!("Failed to get manifest: {}", e))),
537 }
538 }
539
540 async fn list_manifests(
541 &self,
542 tenant: &TenantContext,
543 submission_id: &SubmissionId,
544 ) -> StorageResult<Vec<SubmissionManifest>> {
545 let manifest_ids: Vec<String> = {
547 let conn = self.get_connection()?;
548 let tenant_id = tenant.tenant_id().as_str();
549
550 let mut stmt = conn
551 .prepare(
552 "SELECT manifest_id FROM bulk_manifests
553 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
554 ORDER BY added_at",
555 )
556 .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
557
558 stmt.query_map(
559 params![
560 tenant_id,
561 &submission_id.submitter,
562 &submission_id.submission_id
563 ],
564 |row| row.get(0),
565 )
566 .map_err(|e| internal_error(format!("Failed to query manifests: {}", e)))?
567 .filter_map(|r| r.ok())
568 .collect()
569 };
570
571 let mut results = Vec::new();
572 for manifest_id in manifest_ids {
573 if let Some(manifest) = self
574 .get_manifest(tenant, submission_id, &manifest_id)
575 .await?
576 {
577 results.push(manifest);
578 }
579 }
580
581 Ok(results)
582 }
583
584 async fn process_entries(
585 &self,
586 tenant: &TenantContext,
587 submission_id: &SubmissionId,
588 manifest_id: &str,
589 entries: Vec<NdjsonEntry>,
590 options: &BulkProcessingOptions,
591 ) -> StorageResult<Vec<BulkEntryResult>> {
592 let conn = self.get_connection()?;
593 let tenant_id = tenant.tenant_id().as_str();
594
595 if self
597 .get_manifest(tenant, submission_id, manifest_id)
598 .await?
599 .is_none()
600 {
601 return Err(StorageError::BulkSubmit(
602 BulkSubmitError::ManifestNotFound {
603 submission_id: submission_id.submission_id.clone(),
604 manifest_id: manifest_id.to_string(),
605 },
606 ));
607 }
608
609 conn.execute(
611 "UPDATE bulk_manifests SET status = 'processing'
612 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
613 params![
614 tenant_id,
615 &submission_id.submitter,
616 &submission_id.submission_id,
617 manifest_id
618 ],
619 )
620 .map_err(|e| internal_error(format!("Failed to update manifest status: {}", e)))?;
621
622 let mut results = Vec::new();
623 let mut error_count = 0u32;
624
625 for entry in entries {
626 if options.max_errors > 0 && error_count >= options.max_errors {
628 if !options.continue_on_error {
629 return Err(StorageError::BulkSubmit(
630 BulkSubmitError::MaxErrorsExceeded {
631 submission_id: submission_id.submission_id.clone(),
632 max_errors: options.max_errors,
633 },
634 ));
635 }
636 let skip_result = BulkEntryResult::skipped(
638 entry.line_number,
639 &entry.resource_type,
640 "max errors exceeded",
641 );
642 results.push(skip_result);
643 continue;
644 }
645
646 let result = self
648 .process_single_entry(tenant, submission_id, manifest_id, &entry, options)
649 .await;
650
651 let entry_result = match result {
652 Ok(r) => r,
653 Err(e) => {
654 error_count += 1;
655 BulkEntryResult::processing_error(
656 entry.line_number,
657 &entry.resource_type,
658 serde_json::json!({
659 "resourceType": "OperationOutcome",
660 "issue": [{
661 "severity": "error",
662 "code": "exception",
663 "diagnostics": e.to_string()
664 }]
665 }),
666 )
667 }
668 };
669
670 if entry_result.is_error() {
671 error_count += 1;
672 }
673
674 self.store_entry_result(tenant, submission_id, manifest_id, &entry_result)
676 .await?;
677
678 results.push(entry_result);
679 }
680
681 let now = Utc::now().to_rfc3339();
683 conn.execute(
684 "UPDATE bulk_manifests SET
685 total_entries = total_entries + ?1,
686 processed_entries = processed_entries + ?2,
687 failed_entries = failed_entries + ?3
688 WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6 AND manifest_id = ?7",
689 params![
690 results.len() as i64,
691 results.iter().filter(|r| r.is_success()).count() as i64,
692 error_count as i64,
693 tenant_id,
694 &submission_id.submitter,
695 &submission_id.submission_id,
696 manifest_id
697 ],
698 )
699 .map_err(|e| internal_error(format!("Failed to update manifest counts: {}", e)))?;
700
701 conn.execute(
703 "UPDATE bulk_submissions SET updated_at = ?1
704 WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
705 params![
706 now,
707 tenant_id,
708 &submission_id.submitter,
709 &submission_id.submission_id
710 ],
711 )
712 .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
713
714 Ok(results)
715 }
716
717 async fn get_entry_results(
718 &self,
719 tenant: &TenantContext,
720 submission_id: &SubmissionId,
721 manifest_id: &str,
722 outcome_filter: Option<BulkEntryOutcome>,
723 limit: u32,
724 offset: u32,
725 ) -> StorageResult<Vec<BulkEntryResult>> {
726 let conn = self.get_connection()?;
727 let tenant_id = tenant.tenant_id().as_str();
728
729 let mut query =
730 "SELECT line_number, resource_type, resource_id, created, outcome, operation_outcome
731 FROM bulk_entry_results
732 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4"
733 .to_string();
734
735 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
736 Box::new(tenant_id.to_string()),
737 Box::new(submission_id.submitter.clone()),
738 Box::new(submission_id.submission_id.clone()),
739 Box::new(manifest_id.to_string()),
740 ];
741
742 if let Some(outcome) = outcome_filter {
743 query.push_str(" AND outcome = ?");
744 params_vec.push(Box::new(outcome.to_string()));
745 }
746
747 query.push_str(" ORDER BY line_number");
748 query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
749
750 let params_slice: Vec<&dyn rusqlite::ToSql> =
751 params_vec.iter().map(|p| p.as_ref()).collect();
752
753 let mut stmt = conn
754 .prepare(&query)
755 .map_err(|e| internal_error(format!("Failed to prepare results query: {}", e)))?;
756
757 let results: Vec<BulkEntryResult> = stmt
758 .query_map(params_slice.as_slice(), |row| {
759 let line_number: i64 = row.get(0)?;
760 let resource_type: String = row.get(1)?;
761 let resource_id: Option<String> = row.get(2)?;
762 let created: Option<i32> = row.get(3)?;
763 let outcome_str: String = row.get(4)?;
764 let operation_outcome_bytes: Option<Vec<u8>> = row.get(5)?;
765
766 let outcome: BulkEntryOutcome = outcome_str
767 .parse()
768 .unwrap_or(BulkEntryOutcome::ProcessingError);
769
770 let operation_outcome =
771 operation_outcome_bytes.and_then(|b| serde_json::from_slice(&b).ok());
772
773 Ok(BulkEntryResult {
774 line_number: line_number as u64,
775 resource_type,
776 resource_id,
777 created: created.map(|c| c != 0).unwrap_or(false),
778 outcome,
779 operation_outcome,
780 })
781 })
782 .map_err(|e| internal_error(format!("Failed to query results: {}", e)))?
783 .filter_map(|r| r.ok())
784 .collect();
785
786 Ok(results)
787 }
788
789 async fn get_entry_counts(
790 &self,
791 tenant: &TenantContext,
792 submission_id: &SubmissionId,
793 manifest_id: &str,
794 ) -> StorageResult<EntryCountSummary> {
795 let conn = self.get_connection()?;
796 let tenant_id = tenant.tenant_id().as_str();
797
798 let (total, success, validation_error, processing_error, skipped): (i64, i64, i64, i64, i64) = conn
799 .query_row(
800 "SELECT
801 COUNT(*),
802 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
803 SUM(CASE WHEN outcome = 'validation-error' THEN 1 ELSE 0 END),
804 SUM(CASE WHEN outcome = 'processing-error' THEN 1 ELSE 0 END),
805 SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
806 FROM bulk_entry_results
807 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
808 params![tenant_id, &submission_id.submitter, &submission_id.submission_id, manifest_id],
809 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)),
810 )
811 .unwrap_or((0, 0, 0, 0, 0));
812
813 Ok(EntryCountSummary {
814 total: total as u64,
815 success: success as u64,
816 validation_error: validation_error as u64,
817 processing_error: processing_error as u64,
818 skipped: skipped as u64,
819 })
820 }
821}
822
823impl SqliteBackend {
824 async fn process_single_entry(
826 &self,
827 tenant: &TenantContext,
828 submission_id: &SubmissionId,
829 manifest_id: &str,
830 entry: &NdjsonEntry,
831 options: &BulkProcessingOptions,
832 ) -> StorageResult<BulkEntryResult> {
833 let resource_id = entry.resource_id.as_ref();
835
836 if let Some(id) = resource_id {
837 let existing = self.read(tenant, &entry.resource_type, id).await;
839
840 match existing {
841 Ok(Some(current)) => {
842 if !options.allow_updates {
844 return Ok(BulkEntryResult::skipped(
845 entry.line_number,
846 &entry.resource_type,
847 "updates not allowed",
848 ));
849 }
850
851 let change = SubmissionChange::update(
853 manifest_id,
854 &entry.resource_type,
855 id,
856 current.version_id(),
857 (current.version_id().parse::<i32>().unwrap_or(0) + 1).to_string(),
858 current.content().clone(),
859 );
860 self.record_change(tenant, submission_id, &change).await?;
861
862 let updated = self
864 .update(tenant, ¤t, entry.resource.clone())
865 .await?;
866
867 Ok(BulkEntryResult::success(
868 entry.line_number,
869 &entry.resource_type,
870 updated.id(),
871 false,
872 ))
873 }
874 Ok(None)
875 | Err(StorageError::Resource(crate::error::ResourceError::Gone { .. })) => {
876 let created = self
879 .create(
880 tenant,
881 &entry.resource_type,
882 entry.resource.clone(),
883 FhirVersion::default_enabled(),
884 )
885 .await?;
886
887 let change = SubmissionChange::create(
889 manifest_id,
890 &entry.resource_type,
891 created.id(),
892 created.version_id(),
893 );
894 self.record_change(tenant, submission_id, &change).await?;
895
896 Ok(BulkEntryResult::success(
897 entry.line_number,
898 &entry.resource_type,
899 created.id(),
900 true,
901 ))
902 }
903 Err(e) => Err(e),
904 }
905 } else {
906 let created = self
909 .create(
910 tenant,
911 &entry.resource_type,
912 entry.resource.clone(),
913 FhirVersion::default_enabled(),
914 )
915 .await?;
916
917 let change = SubmissionChange::create(
919 manifest_id,
920 &entry.resource_type,
921 created.id(),
922 created.version_id(),
923 );
924 self.record_change(tenant, submission_id, &change).await?;
925
926 Ok(BulkEntryResult::success(
927 entry.line_number,
928 &entry.resource_type,
929 created.id(),
930 true,
931 ))
932 }
933 }
934
935 async fn store_entry_result(
937 &self,
938 tenant: &TenantContext,
939 submission_id: &SubmissionId,
940 manifest_id: &str,
941 result: &BulkEntryResult,
942 ) -> StorageResult<()> {
943 let conn = self.get_connection()?;
944 let tenant_id = tenant.tenant_id().as_str();
945
946 let outcome_bytes = result
947 .operation_outcome
948 .as_ref()
949 .and_then(|o| serde_json::to_vec(o).ok());
950
951 conn.execute(
952 "INSERT INTO bulk_entry_results
953 (tenant_id, submitter, submission_id, manifest_id, line_number, resource_type, resource_id, created, outcome, operation_outcome)
954 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
955 params![
956 tenant_id,
957 &submission_id.submitter,
958 &submission_id.submission_id,
959 manifest_id,
960 result.line_number as i64,
961 &result.resource_type,
962 &result.resource_id,
963 if result.created { Some(1) } else { Some(0) },
964 result.outcome.to_string(),
965 outcome_bytes
966 ],
967 )
968 .map_err(|e| internal_error(format!("Failed to store entry result: {}", e)))?;
969
970 Ok(())
971 }
972}
973
974#[async_trait]
975impl StreamingBulkSubmitProvider for SqliteBackend {
976 async fn process_ndjson_stream(
977 &self,
978 tenant: &TenantContext,
979 submission_id: &SubmissionId,
980 manifest_id: &str,
981 resource_type: &str,
982 mut reader: Box<dyn AsyncBufRead + Send + Unpin>,
983 options: &BulkProcessingOptions,
984 ) -> StorageResult<StreamProcessingResult> {
985 let mut result = StreamProcessingResult::new();
986 let mut line_number = 0u64;
987 let mut batch = Vec::new();
988
989 loop {
990 let mut line = String::new();
991 let bytes_read = reader
992 .read_line(&mut line)
993 .await
994 .map_err(|e| internal_error(format!("Failed to read line: {}", e)))?;
995
996 if bytes_read == 0 {
997 break;
999 }
1000
1001 line_number += 1;
1002 result.lines_processed = line_number;
1003
1004 let line = line.trim();
1005 if line.is_empty() {
1006 continue;
1007 }
1008
1009 match NdjsonEntry::parse(line_number, line) {
1011 Ok(entry) => {
1012 if entry.resource_type != resource_type {
1014 let error_result = BulkEntryResult::validation_error(
1015 line_number,
1016 &entry.resource_type,
1017 serde_json::json!({
1018 "resourceType": "OperationOutcome",
1019 "issue": [{
1020 "severity": "error",
1021 "code": "invalid",
1022 "diagnostics": format!("Expected resource type {}, got {}", resource_type, entry.resource_type)
1023 }]
1024 }),
1025 );
1026 result.counts.increment(error_result.outcome);
1027
1028 if !options.continue_on_error
1029 && (options.max_errors == 0
1030 || result.counts.error_count() >= options.max_errors as u64)
1031 {
1032 return Ok(result.aborted("max errors exceeded"));
1033 }
1034 continue;
1035 }
1036
1037 batch.push(entry);
1038 }
1039 Err(e) => {
1040 result.counts.increment(BulkEntryOutcome::ValidationError);
1041
1042 if !options.continue_on_error
1043 && (options.max_errors == 0
1044 || result.counts.error_count() >= options.max_errors as u64)
1045 {
1046 return Ok(result.aborted(format!("Parse error: {}", e)));
1047 }
1048 }
1049 }
1050
1051 if batch.len() >= options.batch_size as usize {
1053 let batch_results = self
1054 .process_entries(
1055 tenant,
1056 submission_id,
1057 manifest_id,
1058 std::mem::take(&mut batch),
1059 options,
1060 )
1061 .await?;
1062
1063 for r in batch_results {
1064 result.counts.increment(r.outcome);
1065 }
1066
1067 if !options.continue_on_error
1069 && options.max_errors > 0
1070 && result.counts.error_count() >= options.max_errors as u64
1071 {
1072 return Ok(result.aborted("max errors exceeded"));
1073 }
1074 }
1075 }
1076
1077 if !batch.is_empty() {
1079 let batch_results = self
1080 .process_entries(tenant, submission_id, manifest_id, batch, options)
1081 .await?;
1082
1083 for r in batch_results {
1084 result.counts.increment(r.outcome);
1085 }
1086 }
1087
1088 Ok(result)
1089 }
1090}
1091
1092#[async_trait]
1093impl BulkSubmitRollbackProvider for SqliteBackend {
1094 async fn record_change(
1095 &self,
1096 tenant: &TenantContext,
1097 submission_id: &SubmissionId,
1098 change: &SubmissionChange,
1099 ) -> StorageResult<()> {
1100 let conn = self.get_connection()?;
1101 let tenant_id = tenant.tenant_id().as_str();
1102
1103 let previous_content_bytes = change
1104 .previous_content
1105 .as_ref()
1106 .and_then(|c| serde_json::to_vec(c).ok());
1107
1108 conn.execute(
1109 "INSERT INTO bulk_submission_changes
1110 (tenant_id, submitter, submission_id, change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at)
1111 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1112 params![
1113 tenant_id,
1114 &submission_id.submitter,
1115 &submission_id.submission_id,
1116 &change.change_id,
1117 &change.manifest_id,
1118 change.change_type.to_string(),
1119 &change.resource_type,
1120 &change.resource_id,
1121 &change.previous_version,
1122 &change.new_version,
1123 previous_content_bytes,
1124 change.changed_at.to_rfc3339()
1125 ],
1126 )
1127 .map_err(|e| internal_error(format!("Failed to record change: {}", e)))?;
1128
1129 Ok(())
1130 }
1131
1132 async fn list_changes(
1133 &self,
1134 tenant: &TenantContext,
1135 submission_id: &SubmissionId,
1136 limit: u32,
1137 offset: u32,
1138 ) -> StorageResult<Vec<SubmissionChange>> {
1139 let conn = self.get_connection()?;
1140 let tenant_id = tenant.tenant_id().as_str();
1141
1142 let mut stmt = conn
1143 .prepare(&format!(
1144 "SELECT change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at
1145 FROM bulk_submission_changes
1146 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1147 ORDER BY changed_at DESC
1148 LIMIT {} OFFSET {}",
1149 limit, offset
1150 ))
1151 .map_err(|e| internal_error(format!("Failed to prepare changes query: {}", e)))?;
1152
1153 let changes: Vec<SubmissionChange> = stmt
1154 .query_map(
1155 params![
1156 tenant_id,
1157 &submission_id.submitter,
1158 &submission_id.submission_id
1159 ],
1160 |row| {
1161 let change_id: String = row.get(0)?;
1162 let manifest_id: String = row.get(1)?;
1163 let change_type_str: String = row.get(2)?;
1164 let resource_type: String = row.get(3)?;
1165 let resource_id: String = row.get(4)?;
1166 let previous_version: Option<String> = row.get(5)?;
1167 let new_version: String = row.get(6)?;
1168 let previous_content_bytes: Option<Vec<u8>> = row.get(7)?;
1169 let changed_at_str: String = row.get(8)?;
1170
1171 let change_type: ChangeType =
1172 change_type_str.parse().unwrap_or(ChangeType::Create);
1173 let previous_content =
1174 previous_content_bytes.and_then(|b| serde_json::from_slice(&b).ok());
1175 let changed_at = chrono::DateTime::parse_from_rfc3339(&changed_at_str)
1176 .map(|dt| dt.with_timezone(&Utc))
1177 .unwrap_or_else(|_| Utc::now());
1178
1179 Ok(SubmissionChange {
1180 change_id,
1181 manifest_id,
1182 change_type,
1183 resource_type,
1184 resource_id,
1185 previous_version,
1186 new_version,
1187 previous_content,
1188 changed_at,
1189 })
1190 },
1191 )
1192 .map_err(|e| internal_error(format!("Failed to query changes: {}", e)))?
1193 .filter_map(|r| r.ok())
1194 .collect();
1195
1196 Ok(changes)
1197 }
1198
1199 async fn rollback_change(
1200 &self,
1201 tenant: &TenantContext,
1202 _submission_id: &SubmissionId,
1203 change: &SubmissionChange,
1204 ) -> StorageResult<bool> {
1205 match change.change_type {
1206 ChangeType::Create => {
1207 match self
1209 .delete(tenant, &change.resource_type, &change.resource_id)
1210 .await
1211 {
1212 Ok(()) => Ok(true),
1213 Err(StorageError::Resource(crate::error::ResourceError::NotFound {
1214 ..
1215 })) => {
1216 Ok(true)
1218 }
1219 Err(e) => Err(e),
1220 }
1221 }
1222 ChangeType::Update => {
1223 if let Some(ref previous_content) = change.previous_content {
1225 let current = self
1227 .read(tenant, &change.resource_type, &change.resource_id)
1228 .await?;
1229 if let Some(current) = current {
1230 self.update(tenant, ¤t, previous_content.clone())
1231 .await?;
1232 Ok(true)
1233 } else {
1234 Ok(false)
1236 }
1237 } else {
1238 Ok(false)
1240 }
1241 }
1242 }
1243 }
1244}
1245
1246#[async_trait]
1247impl SubmitClaimStrategy for SqliteBackend {
1248 async fn claim_next_manifest(
1249 &self,
1250 worker_id: &WorkerId,
1251 lease_duration: StdDuration,
1252 ) -> StorageResult<Option<ManifestLease>> {
1253 let _guard = SUBMIT_CLAIM_LOCK.lock().await;
1254 let conn = self.get_connection()?;
1255 let now = Utc::now();
1256 let now_str = now.to_rfc3339();
1257 let lease_expiry = now
1258 + chrono::Duration::from_std(lease_duration)
1259 .unwrap_or_else(|_| chrono::Duration::seconds(60));
1260 let lease_expiry_str = lease_expiry.to_rfc3339();
1261
1262 let row: Option<(String, String, String, String, i64)> = conn
1265 .query_row(
1266 "SELECT m.tenant_id, m.submitter, m.submission_id, m.manifest_id, m.fencing_token
1267 FROM bulk_manifests m
1268 JOIN bulk_submissions s
1269 ON s.tenant_id = m.tenant_id AND s.submitter = m.submitter
1270 AND s.submission_id = m.submission_id
1271 WHERE m.manifest_url IS NOT NULL
1272 AND s.status = 'in-progress'
1273 AND (m.status = 'pending'
1274 OR (m.status = 'processing'
1275 AND (m.lease_expiry IS NULL OR m.lease_expiry < ?1)))
1276 ORDER BY m.added_at LIMIT 1",
1277 params![now_str],
1278 |row| {
1279 Ok((
1280 row.get::<_, String>(0)?,
1281 row.get::<_, String>(1)?,
1282 row.get::<_, String>(2)?,
1283 row.get::<_, String>(3)?,
1284 row.get::<_, i64>(4)?,
1285 ))
1286 },
1287 )
1288 .ok();
1289
1290 let Some((tenant_id, submitter, submission_id, manifest_id, fencing_token)) = row else {
1291 return Ok(None);
1292 };
1293 let new_token = fencing_token + 1;
1294
1295 conn.execute(
1296 "UPDATE bulk_manifests
1297 SET status = 'processing', worker_id = ?1, lease_expiry = ?2, fencing_token = ?3
1298 WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6 AND manifest_id = ?7",
1299 params![
1300 worker_id.as_str(),
1301 lease_expiry_str,
1302 new_token,
1303 tenant_id,
1304 submitter,
1305 submission_id,
1306 manifest_id
1307 ],
1308 )
1309 .map_err(|e| internal_error(format!("Failed to claim manifest: {}", e)))?;
1310
1311 Ok(Some(ManifestLease {
1312 tenant: TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
1313 submission_id: SubmissionId::new(submitter, submission_id),
1314 manifest_id,
1315 worker_id: worker_id.clone(),
1316 lease_expiry,
1317 fencing_token: new_token as u64,
1318 }))
1319 }
1320
1321 async fn heartbeat(&self, lease: &ManifestLease) -> Result<DateTime<Utc>, LeaseError> {
1322 let conn = self.get_connection().map_err(LeaseError::Storage)?;
1323 let now = Utc::now();
1324 let new_expiry = now + chrono::Duration::seconds(60);
1325 let affected = conn
1326 .execute(
1327 "UPDATE bulk_manifests SET lease_expiry = ?1
1328 WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4
1329 AND manifest_id = ?5 AND worker_id = ?6 AND fencing_token = ?7",
1330 params![
1331 new_expiry.to_rfc3339(),
1332 lease.tenant.tenant_id().as_str(),
1333 lease.submission_id.submitter,
1334 lease.submission_id.submission_id,
1335 lease.manifest_id,
1336 lease.worker_id.as_str(),
1337 lease.fencing_token as i64
1338 ],
1339 )
1340 .map_err(|e| LeaseError::Storage(internal_error(format!("heartbeat failed: {e}"))))?;
1341 if affected == 0 {
1342 Err(lease_lost(lease))
1343 } else {
1344 Ok(new_expiry)
1345 }
1346 }
1347
1348 async fn release(&self, lease: ManifestLease) -> StorageResult<()> {
1349 let conn = self.get_connection()?;
1350 conn.execute(
1351 "UPDATE bulk_manifests
1352 SET status = 'pending', worker_id = NULL, lease_expiry = NULL
1353 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4
1354 AND worker_id = ?5 AND fencing_token = ?6 AND status = 'processing'",
1355 params![
1356 lease.tenant.tenant_id().as_str(),
1357 lease.submission_id.submitter,
1358 lease.submission_id.submission_id,
1359 lease.manifest_id,
1360 lease.worker_id.as_str(),
1361 lease.fencing_token as i64
1362 ],
1363 )
1364 .map_err(|e| internal_error(format!("Failed to release manifest lease: {}", e)))?;
1365 Ok(())
1366 }
1367}
1368
1369#[async_trait]
1370impl SubmitWorkerStorage for SqliteBackend {
1371 async fn get_manifest_for_worker(
1372 &self,
1373 lease: &ManifestLease,
1374 ) -> Result<ManifestWorkerView, LeaseError> {
1375 let conn = self.get_connection().map_err(LeaseError::Storage)?;
1376 type Row = (
1377 Option<String>,
1378 Option<String>,
1379 Option<String>,
1380 Option<String>,
1381 Option<String>,
1382 Option<String>,
1383 i64,
1384 );
1385 let row: Row = conn
1386 .query_row(
1387 "SELECT manifest_url, fhir_base_url, output_format, file_request_headers,
1388 oauth_metadata_urls, file_encryption_key, last_processed_line
1389 FROM bulk_manifests
1390 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1391 AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1392 params![
1393 lease.tenant.tenant_id().as_str(),
1394 lease.submission_id.submitter,
1395 lease.submission_id.submission_id,
1396 lease.manifest_id,
1397 lease.worker_id.as_str(),
1398 lease.fencing_token as i64
1399 ],
1400 |r| {
1401 Ok((
1402 r.get(0)?,
1403 r.get(1)?,
1404 r.get(2)?,
1405 r.get(3)?,
1406 r.get(4)?,
1407 r.get(5)?,
1408 r.get(6)?,
1409 ))
1410 },
1411 )
1412 .map_err(|_| lease_lost(lease))?;
1413
1414 let (
1415 manifest_url,
1416 fhir_base_url,
1417 output_format,
1418 headers_json,
1419 oauth_json,
1420 encryption_json,
1421 last_processed_line,
1422 ) = row;
1423
1424 let file_request_headers: Vec<(String, String)> = headers_json
1425 .as_deref()
1426 .and_then(|s| serde_json::from_str(s).ok())
1427 .unwrap_or_default();
1428 let oauth_metadata_urls: Vec<String> = oauth_json
1429 .as_deref()
1430 .and_then(|s| serde_json::from_str(s).ok())
1431 .unwrap_or_default();
1432 let file_encryption_key: Option<Value> = encryption_json
1433 .as_deref()
1434 .and_then(|s| serde_json::from_str(s).ok());
1435 let fhir_version = fhir_version_from_output_format(output_format.as_deref());
1436
1437 Ok(ManifestWorkerView {
1438 manifest_id: lease.manifest_id.clone(),
1439 manifest_url,
1440 fhir_base_url,
1441 output_format,
1442 file_request_headers,
1443 oauth_metadata_urls,
1444 file_encryption_key,
1445 last_processed_line: last_processed_line.max(0) as u64,
1446 fhir_version,
1447 })
1448 }
1449
1450 async fn mark_manifest_processing(&self, lease: &ManifestLease) -> Result<(), LeaseError> {
1451 let conn = self.get_connection().map_err(LeaseError::Storage)?;
1452 let affected = conn
1453 .execute(
1454 "UPDATE bulk_manifests SET status = 'processing'
1455 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1456 AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1457 params![
1458 lease.tenant.tenant_id().as_str(),
1459 lease.submission_id.submitter,
1460 lease.submission_id.submission_id,
1461 lease.manifest_id,
1462 lease.worker_id.as_str(),
1463 lease.fencing_token as i64
1464 ],
1465 )
1466 .map_err(|e| LeaseError::Storage(internal_error(format!("mark processing: {e}"))))?;
1467 if affected == 0 {
1468 Err(lease_lost(lease))
1469 } else {
1470 Ok(())
1471 }
1472 }
1473
1474 async fn update_manifest_progress(
1475 &self,
1476 lease: &ManifestLease,
1477 processed_entries: u64,
1478 failed_entries: u64,
1479 last_processed_line: u64,
1480 ) -> Result<(), LeaseError> {
1481 let conn = self.get_connection().map_err(LeaseError::Storage)?;
1482 let affected = conn
1483 .execute(
1484 "UPDATE bulk_manifests
1485 SET processed_entries = ?1, failed_entries = ?2, last_processed_line = ?3
1486 WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6
1487 AND manifest_id = ?7 AND worker_id = ?8 AND fencing_token = ?9",
1488 params![
1489 processed_entries as i64,
1490 failed_entries as i64,
1491 last_processed_line as i64,
1492 lease.tenant.tenant_id().as_str(),
1493 lease.submission_id.submitter,
1494 lease.submission_id.submission_id,
1495 lease.manifest_id,
1496 lease.worker_id.as_str(),
1497 lease.fencing_token as i64
1498 ],
1499 )
1500 .map_err(|e| LeaseError::Storage(internal_error(format!("update progress: {e}"))))?;
1501 if affected == 0 {
1502 Err(lease_lost(lease))
1503 } else {
1504 Ok(())
1505 }
1506 }
1507
1508 async fn record_submit_file(
1509 &self,
1510 lease: &ManifestLease,
1511 file: &SubmitFileRecord,
1512 ) -> Result<(), LeaseError> {
1513 let conn = self.get_connection().map_err(LeaseError::Storage)?;
1514 let holds: bool = conn
1516 .query_row(
1517 "SELECT 1 FROM bulk_manifests
1518 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1519 AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1520 params![
1521 lease.tenant.tenant_id().as_str(),
1522 lease.submission_id.submitter,
1523 lease.submission_id.submission_id,
1524 lease.manifest_id,
1525 lease.worker_id.as_str(),
1526 lease.fencing_token as i64
1527 ],
1528 |_| Ok(true),
1529 )
1530 .unwrap_or(false);
1531 if !holds {
1532 return Err(lease_lost(lease));
1533 }
1534
1535 let count_severity = file
1536 .count_severity
1537 .as_ref()
1538 .and_then(|v| serde_json::to_string(v).ok());
1539 conn.execute(
1540 "INSERT INTO bulk_submit_files
1541 (tenant_id, submitter, submission_id, manifest_url, file_type, resource_type,
1542 part_index, fencing_token, file_path, line_count, byte_count, count_severity,
1543 created_at)
1544 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1545 params![
1546 lease.tenant.tenant_id().as_str(),
1547 lease.submission_id.submitter,
1548 lease.submission_id.submission_id,
1549 file.manifest_url,
1550 file.file_type,
1551 file.resource_type,
1552 file.part_index as i64,
1553 lease.fencing_token as i64,
1554 file.file_path,
1555 file.line_count as i64,
1556 file.byte_count as i64,
1557 count_severity,
1558 Utc::now().to_rfc3339()
1559 ],
1560 )
1561 .map_err(|e| LeaseError::Storage(internal_error(format!("record submit file: {e}"))))?;
1562 Ok(())
1563 }
1564
1565 async fn finish_manifest(&self, lease: &ManifestLease) -> Result<(), LeaseError> {
1566 let conn = self.get_connection().map_err(LeaseError::Storage)?;
1567 let affected = conn
1568 .execute(
1569 "UPDATE bulk_manifests SET status = 'completed', worker_id = NULL, lease_expiry = NULL
1570 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1571 AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1572 params![
1573 lease.tenant.tenant_id().as_str(),
1574 lease.submission_id.submitter,
1575 lease.submission_id.submission_id,
1576 lease.manifest_id,
1577 lease.worker_id.as_str(),
1578 lease.fencing_token as i64
1579 ],
1580 )
1581 .map_err(|e| LeaseError::Storage(internal_error(format!("finish manifest: {e}"))))?;
1582 if affected == 0 {
1583 Err(lease_lost(lease))
1584 } else {
1585 Ok(())
1586 }
1587 }
1588
1589 async fn fail_manifest(
1590 &self,
1591 lease: &ManifestLease,
1592 _error_message: &str,
1593 ) -> Result<(), LeaseError> {
1594 let conn = self.get_connection().map_err(LeaseError::Storage)?;
1595 let affected = conn
1596 .execute(
1597 "UPDATE bulk_manifests SET status = 'failed', worker_id = NULL, lease_expiry = NULL
1598 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1599 AND manifest_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
1600 params![
1601 lease.tenant.tenant_id().as_str(),
1602 lease.submission_id.submitter,
1603 lease.submission_id.submission_id,
1604 lease.manifest_id,
1605 lease.worker_id.as_str(),
1606 lease.fencing_token as i64
1607 ],
1608 )
1609 .map_err(|e| LeaseError::Storage(internal_error(format!("fail manifest: {e}"))))?;
1610 if affected == 0 {
1611 Err(lease_lost(lease))
1612 } else {
1613 Ok(())
1614 }
1615 }
1616
1617 async fn set_manifest_fetch_params(
1618 &self,
1619 tenant: &TenantContext,
1620 id: &SubmissionId,
1621 manifest_id: &str,
1622 fhir_base_url: Option<&str>,
1623 output_format: Option<&str>,
1624 file_request_headers: &[(String, String)],
1625 oauth_metadata_urls: &[String],
1626 file_encryption_key: Option<&Value>,
1627 ) -> StorageResult<()> {
1628 let conn = self.get_connection()?;
1629 let headers_json = serde_json::to_string(file_request_headers).ok();
1630 let oauth_json = serde_json::to_string(oauth_metadata_urls).ok();
1631 let encryption_json = file_encryption_key.and_then(|v| serde_json::to_string(v).ok());
1632 conn.execute(
1633 "UPDATE bulk_manifests
1634 SET fhir_base_url = ?1, output_format = ?2, file_request_headers = ?3,
1635 oauth_metadata_urls = ?4, file_encryption_key = ?5
1636 WHERE tenant_id = ?6 AND submitter = ?7 AND submission_id = ?8 AND manifest_id = ?9",
1637 params![
1638 fhir_base_url,
1639 output_format,
1640 headers_json,
1641 oauth_json,
1642 encryption_json,
1643 tenant.tenant_id().as_str(),
1644 id.submitter,
1645 id.submission_id,
1646 manifest_id
1647 ],
1648 )
1649 .map_err(|e| internal_error(format!("set manifest fetch params: {e}")))?;
1650 Ok(())
1651 }
1652
1653 async fn replace_manifest_by_url(
1654 &self,
1655 tenant: &TenantContext,
1656 id: &SubmissionId,
1657 manifest_url: &str,
1658 ) -> StorageResult<Vec<String>> {
1659 let conn = self.get_connection()?;
1660 let tenant_id = tenant.tenant_id().as_str();
1661 let mut stmt = conn
1662 .prepare(
1663 "SELECT manifest_id FROM bulk_manifests
1664 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1665 AND manifest_url = ?4 AND status != 'replaced'",
1666 )
1667 .map_err(|e| internal_error(format!("prepare replace lookup: {e}")))?;
1668 let ids: Vec<String> = stmt
1669 .query_map(
1670 params![tenant_id, id.submitter, id.submission_id, manifest_url],
1671 |r| r.get::<_, String>(0),
1672 )
1673 .map_err(|e| internal_error(format!("query replace lookup: {e}")))?
1674 .filter_map(|r| r.ok())
1675 .collect();
1676 conn.execute(
1677 "UPDATE bulk_manifests SET status = 'replaced'
1678 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_url = ?4",
1679 params![tenant_id, id.submitter, id.submission_id, manifest_url],
1680 )
1681 .map_err(|e| internal_error(format!("mark replaced: {e}")))?;
1682 Ok(ids)
1683 }
1684
1685 async fn set_submission_kickoff_meta(
1686 &self,
1687 tenant: &TenantContext,
1688 id: &SubmissionId,
1689 owner_subject: Option<&str>,
1690 request_url: &str,
1691 requires_access_token: bool,
1692 ) -> StorageResult<()> {
1693 let conn = self.get_connection()?;
1694 conn.execute(
1695 "UPDATE bulk_submissions
1696 SET owner_subject = ?1, request_url = ?2, requires_access_token = ?3
1697 WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6",
1698 params![
1699 owner_subject,
1700 request_url,
1701 requires_access_token as i64,
1702 tenant.tenant_id().as_str(),
1703 id.submitter,
1704 id.submission_id
1705 ],
1706 )
1707 .map_err(|e| internal_error(format!("set kickoff meta: {e}")))?;
1708 Ok(())
1709 }
1710
1711 async fn ensure_poll_token(
1712 &self,
1713 tenant: &TenantContext,
1714 id: &SubmissionId,
1715 ) -> StorageResult<String> {
1716 let conn = self.get_connection()?;
1717 let tenant_id = tenant.tenant_id().as_str();
1718 let existing: Option<String> = conn
1719 .query_row(
1720 "SELECT poll_token FROM bulk_submissions
1721 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
1722 params![tenant_id, id.submitter, id.submission_id],
1723 |r| r.get::<_, Option<String>>(0),
1724 )
1725 .map_err(|e| internal_error(format!("read poll token: {e}")))?;
1726 if let Some(token) = existing {
1727 return Ok(token);
1728 }
1729 let token = Uuid::new_v4().to_string();
1730 conn.execute(
1731 "UPDATE bulk_submissions SET poll_token = ?1
1732 WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
1733 params![token, tenant_id, id.submitter, id.submission_id],
1734 )
1735 .map_err(|e| internal_error(format!("set poll token: {e}")))?;
1736 Ok(token)
1737 }
1738
1739 async fn list_expired_submissions(
1740 &self,
1741 now: DateTime<Utc>,
1742 ttl: StdDuration,
1743 limit: u32,
1744 ) -> StorageResult<Vec<(TenantContext, SubmissionId)>> {
1745 let conn = self.get_connection()?;
1746 let cutoff = (now
1747 - chrono::Duration::from_std(ttl).unwrap_or_else(|_| chrono::Duration::seconds(86400)))
1748 .to_rfc3339();
1749 let mut stmt = conn
1750 .prepare(
1751 "SELECT tenant_id, submitter, submission_id FROM bulk_submissions
1752 WHERE updated_at < ?1 ORDER BY updated_at LIMIT ?2",
1753 )
1754 .map_err(|e| internal_error(format!("prepare expired: {e}")))?;
1755 let rows = stmt
1756 .query_map(params![cutoff, limit], |r| {
1757 Ok((
1758 r.get::<_, String>(0)?,
1759 r.get::<_, String>(1)?,
1760 r.get::<_, String>(2)?,
1761 ))
1762 })
1763 .map_err(|e| internal_error(format!("query expired: {e}")))?;
1764 let mut out = Vec::new();
1765 for row in rows {
1766 let (tenant_id, submitter, submission_id) =
1767 row.map_err(|e| internal_error(format!("row expired: {e}")))?;
1768 out.push((
1769 TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
1770 SubmissionId::new(submitter, submission_id),
1771 ));
1772 }
1773 Ok(out)
1774 }
1775
1776 async fn resolve_poll_token(&self, token: &str) -> StorageResult<Option<PollTokenTarget>> {
1777 let conn = self.get_connection()?;
1778 let row: Option<(String, String, String, Option<String>)> = conn
1779 .query_row(
1780 "SELECT tenant_id, submitter, submission_id, owner_subject
1781 FROM bulk_submissions WHERE poll_token = ?1",
1782 params![token],
1783 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
1784 )
1785 .ok();
1786 Ok(row.map(
1787 |(tenant_id, submitter, submission_id, owner_subject)| PollTokenTarget {
1788 tenant: TenantContext::new(
1789 TenantId::new(tenant_id),
1790 TenantPermissions::full_access(),
1791 ),
1792 submission_id: SubmissionId::new(submitter, submission_id),
1793 owner_subject,
1794 },
1795 ))
1796 }
1797
1798 async fn clear_poll_token(
1799 &self,
1800 tenant: &TenantContext,
1801 id: &SubmissionId,
1802 ) -> StorageResult<()> {
1803 let conn = self.get_connection()?;
1804 conn.execute(
1805 "UPDATE bulk_submissions SET poll_token = NULL
1806 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
1807 params![tenant.tenant_id().as_str(), id.submitter, id.submission_id],
1808 )
1809 .map_err(|e| internal_error(format!("clear poll token: {e}")))?;
1810 Ok(())
1811 }
1812
1813 async fn list_submit_files(
1814 &self,
1815 tenant: &TenantContext,
1816 id: &SubmissionId,
1817 ) -> StorageResult<Vec<SubmitFileRow>> {
1818 let conn = self.get_connection()?;
1819 let mut stmt = conn
1820 .prepare(
1821 "SELECT manifest_url, file_type, resource_type, part_index, fencing_token,
1822 file_path, line_count, byte_count, count_severity
1823 FROM bulk_submit_files
1824 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1825 ORDER BY id",
1826 )
1827 .map_err(|e| internal_error(format!("prepare list files: {e}")))?;
1828 let rows = stmt
1829 .query_map(
1830 params![tenant.tenant_id().as_str(), id.submitter, id.submission_id],
1831 |r| {
1832 let count_severity: Option<String> = r.get(8)?;
1833 Ok(SubmitFileRow {
1834 manifest_url: r.get(0)?,
1835 file_type: r.get(1)?,
1836 resource_type: r.get(2)?,
1837 part_index: r.get::<_, i64>(3)? as u32,
1838 fencing_token: r.get::<_, i64>(4)? as u64,
1839 file_path: r.get(5)?,
1840 line_count: r.get::<_, i64>(6)? as u64,
1841 byte_count: r.get::<_, i64>(7)? as u64,
1842 count_severity: count_severity
1843 .as_deref()
1844 .and_then(|s| serde_json::from_str(s).ok()),
1845 })
1846 },
1847 )
1848 .map_err(|e| internal_error(format!("query list files: {e}")))?;
1849 let mut out = Vec::new();
1850 for row in rows {
1851 out.push(row.map_err(|e| internal_error(format!("row list files: {e}")))?);
1852 }
1853 Ok(out)
1854 }
1855
1856 async fn delete_submission_artifacts(
1857 &self,
1858 tenant: &TenantContext,
1859 id: &SubmissionId,
1860 ) -> StorageResult<()> {
1861 let conn = self.get_connection()?;
1862 conn.execute(
1863 "DELETE FROM bulk_submit_files
1864 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
1865 params![tenant.tenant_id().as_str(), id.submitter, id.submission_id],
1866 )
1867 .map_err(|e| internal_error(format!("delete artifacts: {e}")))?;
1868 Ok(())
1869 }
1870
1871 async fn count_active_submissions(&self, tenant: &TenantContext) -> StorageResult<u64> {
1872 let conn = self.get_connection()?;
1873 let count: i64 = conn
1874 .query_row(
1875 "SELECT COUNT(*) FROM bulk_submissions
1876 WHERE tenant_id = ?1 AND status = 'in-progress'",
1877 params![tenant.tenant_id().as_str()],
1878 |r| r.get(0),
1879 )
1880 .map_err(|e| internal_error(format!("count active submissions: {e}")))?;
1881 Ok(count.max(0) as u64)
1882 }
1883
1884 async fn ensure_transaction_time(
1885 &self,
1886 tenant: &TenantContext,
1887 id: &SubmissionId,
1888 ) -> StorageResult<DateTime<Utc>> {
1889 let conn = self.get_connection()?;
1890 let tenant_id = tenant.tenant_id().as_str();
1891 let existing: Option<String> = conn
1892 .query_row(
1893 "SELECT transaction_time FROM bulk_submissions
1894 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
1895 params![tenant_id, id.submitter, id.submission_id],
1896 |r| r.get::<_, Option<String>>(0),
1897 )
1898 .map_err(|e| internal_error(format!("read transaction_time: {e}")))?;
1899 if let Some(ts) = existing.as_deref() {
1900 if let Ok(dt) = DateTime::parse_from_rfc3339(ts) {
1901 return Ok(dt.with_timezone(&Utc));
1902 }
1903 }
1904 let now = Utc::now();
1905 conn.execute(
1906 "UPDATE bulk_submissions SET transaction_time = ?1
1907 WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
1908 params![now.to_rfc3339(), tenant_id, id.submitter, id.submission_id],
1909 )
1910 .map_err(|e| internal_error(format!("set transaction_time: {e}")))?;
1911 Ok(now)
1912 }
1913}
1914
1915#[cfg(test)]
1916mod tests {
1917 use super::*;
1918 use crate::tenant::{TenantId, TenantPermissions};
1919 use serde_json::json;
1920
1921 fn create_test_backend() -> SqliteBackend {
1922 let backend = SqliteBackend::in_memory().unwrap();
1923 backend.init_schema().unwrap();
1924 backend
1925 }
1926
1927 fn create_test_tenant() -> TenantContext {
1928 TenantContext::new(
1929 TenantId::new("test-tenant"),
1930 TenantPermissions::full_access(),
1931 )
1932 }
1933
1934 #[tokio::test]
1935 async fn test_create_submission() {
1936 let backend = create_test_backend();
1937 let tenant = create_test_tenant();
1938
1939 let sub_id = SubmissionId::generate("test-system");
1940 let summary = backend
1941 .create_submission(&tenant, &sub_id, None)
1942 .await
1943 .unwrap();
1944
1945 assert_eq!(summary.status, SubmissionStatus::InProgress);
1946 assert_eq!(summary.manifest_count, 0);
1947 }
1948
1949 #[tokio::test]
1950 async fn test_duplicate_submission() {
1951 let backend = create_test_backend();
1952 let tenant = create_test_tenant();
1953
1954 let sub_id = SubmissionId::new("test-system", "sub-123");
1955 backend
1956 .create_submission(&tenant, &sub_id, None)
1957 .await
1958 .unwrap();
1959
1960 let result = backend.create_submission(&tenant, &sub_id, None).await;
1961 assert!(matches!(
1962 result,
1963 Err(StorageError::BulkSubmit(
1964 BulkSubmitError::DuplicateSubmission { .. }
1965 ))
1966 ));
1967 }
1968
1969 #[tokio::test]
1970 async fn test_add_manifest() {
1971 let backend = create_test_backend();
1972 let tenant = create_test_tenant();
1973
1974 let sub_id = SubmissionId::generate("test-system");
1975 backend
1976 .create_submission(&tenant, &sub_id, None)
1977 .await
1978 .unwrap();
1979
1980 let manifest = backend
1981 .add_manifest(
1982 &tenant,
1983 &sub_id,
1984 Some("http://example.com/data.ndjson"),
1985 None,
1986 )
1987 .await
1988 .unwrap();
1989
1990 assert_eq!(manifest.status, ManifestStatus::Pending);
1991 assert_eq!(
1992 manifest.manifest_url,
1993 Some("http://example.com/data.ndjson".to_string())
1994 );
1995 }
1996
1997 #[tokio::test]
1998 async fn test_process_entries() {
1999 let backend = create_test_backend();
2000 let tenant = create_test_tenant();
2001
2002 let sub_id = SubmissionId::generate("test-system");
2003 backend
2004 .create_submission(&tenant, &sub_id, None)
2005 .await
2006 .unwrap();
2007
2008 let manifest = backend
2009 .add_manifest(&tenant, &sub_id, None, None)
2010 .await
2011 .unwrap();
2012
2013 let entries = vec![
2014 NdjsonEntry::new(
2015 1,
2016 "Patient",
2017 json!({"resourceType": "Patient", "name": [{"family": "Test1"}]}),
2018 ),
2019 NdjsonEntry::new(
2020 2,
2021 "Patient",
2022 json!({"resourceType": "Patient", "name": [{"family": "Test2"}]}),
2023 ),
2024 ];
2025
2026 let options = BulkProcessingOptions::new();
2027 let results = backend
2028 .process_entries(&tenant, &sub_id, &manifest.manifest_id, entries, &options)
2029 .await
2030 .unwrap();
2031
2032 assert_eq!(results.len(), 2);
2033 assert!(results.iter().all(|r| r.is_success()));
2034 assert!(results.iter().all(|r| r.created));
2035 }
2036
2037 #[tokio::test]
2038 async fn test_complete_submission() {
2039 let backend = create_test_backend();
2040 let tenant = create_test_tenant();
2041
2042 let sub_id = SubmissionId::generate("test-system");
2043 backend
2044 .create_submission(&tenant, &sub_id, None)
2045 .await
2046 .unwrap();
2047
2048 let summary = backend.complete_submission(&tenant, &sub_id).await.unwrap();
2049 assert_eq!(summary.status, SubmissionStatus::Complete);
2050 assert!(summary.completed_at.is_some());
2051 }
2052
2053 #[tokio::test]
2054 async fn test_abort_submission() {
2055 let backend = create_test_backend();
2056 let tenant = create_test_tenant();
2057
2058 let sub_id = SubmissionId::generate("test-system");
2059 backend
2060 .create_submission(&tenant, &sub_id, None)
2061 .await
2062 .unwrap();
2063
2064 backend
2065 .add_manifest(&tenant, &sub_id, None, None)
2066 .await
2067 .unwrap();
2068
2069 let cancelled = backend
2070 .abort_submission(&tenant, &sub_id, "test abort")
2071 .await
2072 .unwrap();
2073 assert_eq!(cancelled, 1);
2074
2075 let summary = backend
2076 .get_submission(&tenant, &sub_id)
2077 .await
2078 .unwrap()
2079 .unwrap();
2080 assert_eq!(summary.status, SubmissionStatus::Aborted);
2081 }
2082
2083 #[tokio::test]
2084 async fn test_rollback_create() {
2085 let backend = create_test_backend();
2086 let tenant = create_test_tenant();
2087
2088 let sub_id = SubmissionId::generate("test-system");
2089 backend
2090 .create_submission(&tenant, &sub_id, None)
2091 .await
2092 .unwrap();
2093
2094 let manifest = backend
2095 .add_manifest(&tenant, &sub_id, None, None)
2096 .await
2097 .unwrap();
2098
2099 let entries = vec![NdjsonEntry::new(
2100 1,
2101 "Patient",
2102 json!({"resourceType": "Patient", "id": "rollback-test", "name": [{"family": "Test"}]}),
2103 )];
2104
2105 let options = BulkProcessingOptions::new();
2106 let _results = backend
2107 .process_entries(&tenant, &sub_id, &manifest.manifest_id, entries, &options)
2108 .await
2109 .unwrap();
2110
2111 let patient = backend
2113 .read(&tenant, "Patient", "rollback-test")
2114 .await
2115 .unwrap();
2116 assert!(patient.is_some());
2117
2118 let changes = backend.list_changes(&tenant, &sub_id, 10, 0).await.unwrap();
2120 assert_eq!(changes.len(), 1);
2121
2122 let rolled_back = backend
2123 .rollback_change(&tenant, &sub_id, &changes[0])
2124 .await
2125 .unwrap();
2126 assert!(rolled_back);
2127
2128 let patient = backend.read(&tenant, "Patient", "rollback-test").await;
2130 assert!(patient.is_err()); }
2132
2133 #[tokio::test]
2134 async fn test_entry_counts() {
2135 let backend = create_test_backend();
2136 let tenant = create_test_tenant();
2137
2138 let sub_id = SubmissionId::generate("test-system");
2139 backend
2140 .create_submission(&tenant, &sub_id, None)
2141 .await
2142 .unwrap();
2143
2144 let manifest = backend
2145 .add_manifest(&tenant, &sub_id, None, None)
2146 .await
2147 .unwrap();
2148
2149 let entries = vec![
2150 NdjsonEntry::new(
2151 1,
2152 "Patient",
2153 json!({"resourceType": "Patient", "name": [{"family": "Test1"}]}),
2154 ),
2155 NdjsonEntry::new(
2156 2,
2157 "Patient",
2158 json!({"resourceType": "Patient", "name": [{"family": "Test2"}]}),
2159 ),
2160 ];
2161
2162 let options = BulkProcessingOptions::new();
2163 backend
2164 .process_entries(&tenant, &sub_id, &manifest.manifest_id, entries, &options)
2165 .await
2166 .unwrap();
2167
2168 let counts = backend
2169 .get_entry_counts(&tenant, &sub_id, &manifest.manifest_id)
2170 .await
2171 .unwrap();
2172
2173 assert_eq!(counts.total, 2);
2174 assert_eq!(counts.success, 2);
2175 assert_eq!(counts.error_count(), 0);
2176 }
2177
2178 async fn seed_claimable(backend: &SqliteBackend, tenant: &TenantContext) -> SubmissionId {
2179 let sub_id = SubmissionId::generate("worker-system");
2180 backend
2181 .create_submission(tenant, &sub_id, None)
2182 .await
2183 .unwrap();
2184 backend
2185 .add_manifest(
2186 tenant,
2187 &sub_id,
2188 Some("http://example.com/manifest.json"),
2189 None,
2190 )
2191 .await
2192 .unwrap();
2193 sub_id
2194 }
2195
2196 #[tokio::test]
2197 async fn test_claim_heartbeat_finish() {
2198 let backend = create_test_backend();
2199 let tenant = create_test_tenant();
2200 let sub_id = seed_claimable(&backend, &tenant).await;
2201 let worker = WorkerId::new("w1");
2202
2203 let lease = backend
2204 .claim_next_manifest(&worker, StdDuration::from_secs(60))
2205 .await
2206 .unwrap()
2207 .expect("a manifest should be claimable");
2208 assert_eq!(lease.submission_id, sub_id);
2209 assert_eq!(lease.fencing_token, 1);
2210
2211 assert!(
2213 backend
2214 .claim_next_manifest(&WorkerId::new("w2"), StdDuration::from_secs(60))
2215 .await
2216 .unwrap()
2217 .is_none()
2218 );
2219
2220 backend.heartbeat(&lease).await.unwrap();
2221 backend.mark_manifest_processing(&lease).await.unwrap();
2222 backend
2223 .update_manifest_progress(&lease, 5, 1, 6)
2224 .await
2225 .unwrap();
2226 backend.finish_manifest(&lease).await.unwrap();
2227
2228 assert!(
2230 backend
2231 .claim_next_manifest(&worker, StdDuration::from_secs(60))
2232 .await
2233 .unwrap()
2234 .is_none()
2235 );
2236 }
2237
2238 #[tokio::test]
2239 async fn test_fencing_blocks_zombie_writer() {
2240 let backend = create_test_backend();
2241 let tenant = create_test_tenant();
2242 seed_claimable(&backend, &tenant).await;
2243
2244 let stale = backend
2246 .claim_next_manifest(&WorkerId::new("old"), StdDuration::from_secs(0))
2247 .await
2248 .unwrap()
2249 .unwrap();
2250 let fresh = backend
2252 .claim_next_manifest(&WorkerId::new("new"), StdDuration::from_secs(60))
2253 .await
2254 .unwrap()
2255 .unwrap();
2256 assert!(fresh.fencing_token > stale.fencing_token);
2257
2258 assert!(matches!(
2260 backend.heartbeat(&stale).await,
2261 Err(LeaseError::LeaseLost { .. })
2262 ));
2263 assert!(matches!(
2264 backend.finish_manifest(&stale).await,
2265 Err(LeaseError::LeaseLost { .. })
2266 ));
2267 backend.finish_manifest(&fresh).await.unwrap();
2269 }
2270
2271 #[tokio::test]
2272 async fn test_poll_token_lifecycle() {
2273 let backend = create_test_backend();
2274 let tenant = create_test_tenant();
2275 let sub_id = SubmissionId::generate("poll-system");
2276 backend
2277 .create_submission(&tenant, &sub_id, None)
2278 .await
2279 .unwrap();
2280
2281 let token = backend.ensure_poll_token(&tenant, &sub_id).await.unwrap();
2282 assert_eq!(
2284 token,
2285 backend.ensure_poll_token(&tenant, &sub_id).await.unwrap()
2286 );
2287
2288 let resolved = backend.resolve_poll_token(&token).await.unwrap().unwrap();
2289 assert_eq!(resolved.submission_id, sub_id);
2290
2291 backend.clear_poll_token(&tenant, &sub_id).await.unwrap();
2292 assert!(backend.resolve_poll_token(&token).await.unwrap().is_none());
2293 }
2294
2295 #[tokio::test]
2296 async fn test_record_and_delete_artifacts() {
2297 let backend = create_test_backend();
2298 let tenant = create_test_tenant();
2299 seed_claimable(&backend, &tenant).await;
2300 let lease = backend
2301 .claim_next_manifest(&WorkerId::new("w1"), StdDuration::from_secs(60))
2302 .await
2303 .unwrap()
2304 .unwrap();
2305
2306 backend
2307 .record_submit_file(
2308 &lease,
2309 &SubmitFileRecord {
2310 manifest_url: Some("http://example.com/manifest.json".to_string()),
2311 file_type: "error".to_string(),
2312 resource_type: None,
2313 part_index: 0,
2314 file_path: "tenant/sub/error-0.ndjson".to_string(),
2315 line_count: 3,
2316 byte_count: 120,
2317 count_severity: Some(json!({"error": 3})),
2318 },
2319 )
2320 .await
2321 .unwrap();
2322
2323 let files = backend
2324 .list_submit_files(&tenant, &lease.submission_id)
2325 .await
2326 .unwrap();
2327 assert_eq!(files.len(), 1);
2328 assert_eq!(files[0].file_type, "error");
2329 assert_eq!(files[0].line_count, 3);
2330
2331 backend
2332 .delete_submission_artifacts(&tenant, &lease.submission_id)
2333 .await
2334 .unwrap();
2335 assert!(
2336 backend
2337 .list_submit_files(&tenant, &lease.submission_id)
2338 .await
2339 .unwrap()
2340 .is_empty()
2341 );
2342 }
2343}