1use async_trait::async_trait;
4use chrono::Utc;
5use helios_fhir::FhirVersion;
6use rusqlite::params;
7use serde_json::Value;
8use tokio::io::{AsyncBufRead, AsyncBufReadExt};
9use uuid::Uuid;
10
11use crate::core::ResourceStorage;
12use crate::core::bulk_submit::{
13 BulkEntryOutcome, BulkEntryResult, BulkProcessingOptions, BulkSubmitProvider,
14 BulkSubmitRollbackProvider, ChangeType, EntryCountSummary, ManifestStatus, NdjsonEntry,
15 StreamProcessingResult, StreamingBulkSubmitProvider, SubmissionChange, SubmissionId,
16 SubmissionManifest, SubmissionStatus, SubmissionSummary,
17};
18use crate::error::{BackendError, BulkSubmitError, StorageError, StorageResult};
19use crate::tenant::TenantContext;
20
21use super::SqliteBackend;
22
23fn internal_error(message: String) -> StorageError {
24 StorageError::Backend(BackendError::Internal {
25 backend_name: "sqlite".to_string(),
26 message,
27 source: None,
28 })
29}
30
31#[async_trait]
32impl BulkSubmitProvider for SqliteBackend {
33 async fn create_submission(
34 &self,
35 tenant: &TenantContext,
36 id: &SubmissionId,
37 metadata: Option<Value>,
38 ) -> StorageResult<SubmissionSummary> {
39 let conn = self.get_connection()?;
40 let tenant_id = tenant.tenant_id().as_str();
41
42 let exists: bool = conn
44 .query_row(
45 "SELECT 1 FROM bulk_submissions
46 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
47 params![tenant_id, &id.submitter, &id.submission_id],
48 |_| Ok(true),
49 )
50 .unwrap_or(false);
51
52 if exists {
53 return Err(StorageError::BulkSubmit(
54 BulkSubmitError::DuplicateSubmission {
55 submitter: id.submitter.clone(),
56 submission_id: id.submission_id.clone(),
57 },
58 ));
59 }
60
61 let now = Utc::now();
62 let now_str = now.to_rfc3339();
63 let metadata_bytes = metadata.as_ref().and_then(|m| serde_json::to_vec(m).ok());
64
65 conn.execute(
66 "INSERT INTO bulk_submissions
67 (tenant_id, submitter, submission_id, status, created_at, updated_at, metadata)
68 VALUES (?1, ?2, ?3, 'in-progress', ?4, ?5, ?6)",
69 params![
70 tenant_id,
71 &id.submitter,
72 &id.submission_id,
73 now_str,
74 now_str,
75 metadata_bytes
76 ],
77 )
78 .map_err(|e| internal_error(format!("Failed to create submission: {}", e)))?;
79
80 Ok(SubmissionSummary {
81 id: id.clone(),
82 status: SubmissionStatus::InProgress,
83 created_at: now,
84 updated_at: now,
85 completed_at: None,
86 manifest_count: 0,
87 total_entries: 0,
88 success_count: 0,
89 error_count: 0,
90 skipped_count: 0,
91 metadata,
92 })
93 }
94
95 async fn get_submission(
96 &self,
97 tenant: &TenantContext,
98 id: &SubmissionId,
99 ) -> StorageResult<Option<SubmissionSummary>> {
100 let conn = self.get_connection()?;
101 let tenant_id = tenant.tenant_id().as_str();
102
103 let result = conn.query_row(
104 "SELECT status, created_at, updated_at, completed_at, metadata
105 FROM bulk_submissions
106 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
107 params![tenant_id, &id.submitter, &id.submission_id],
108 |row| {
109 Ok((
110 row.get::<_, String>(0)?,
111 row.get::<_, String>(1)?,
112 row.get::<_, String>(2)?,
113 row.get::<_, Option<String>>(3)?,
114 row.get::<_, Option<Vec<u8>>>(4)?,
115 ))
116 },
117 );
118
119 match result {
120 Ok((status_str, created_at, updated_at, completed_at, metadata_bytes)) => {
121 let status: SubmissionStatus = status_str
122 .parse()
123 .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
124
125 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at)
126 .map_err(|e| internal_error(format!("Invalid created_at: {}", e)))?
127 .with_timezone(&Utc);
128
129 let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at)
130 .map_err(|e| internal_error(format!("Invalid updated_at: {}", e)))?
131 .with_timezone(&Utc);
132
133 let completed_at = completed_at.and_then(|s| {
134 chrono::DateTime::parse_from_rfc3339(&s)
135 .ok()
136 .map(|dt| dt.with_timezone(&Utc))
137 });
138
139 let metadata = metadata_bytes.and_then(|b| serde_json::from_slice(&b).ok());
140
141 let manifest_count: i32 = conn
143 .query_row(
144 "SELECT COUNT(*) FROM bulk_manifests
145 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
146 params![tenant_id, &id.submitter, &id.submission_id],
147 |row| row.get(0),
148 )
149 .unwrap_or(0);
150
151 let (total, success, errors, skipped): (i64, i64, i64, i64) = conn
153 .query_row(
154 "SELECT
155 COUNT(*),
156 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
157 SUM(CASE WHEN outcome IN ('validation-error', 'processing-error') THEN 1 ELSE 0 END),
158 SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
159 FROM bulk_entry_results
160 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
161 params![tenant_id, &id.submitter, &id.submission_id],
162 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
163 )
164 .unwrap_or((0, 0, 0, 0));
165
166 Ok(Some(SubmissionSummary {
167 id: id.clone(),
168 status,
169 created_at,
170 updated_at,
171 completed_at,
172 manifest_count: manifest_count as u32,
173 total_entries: total as u64,
174 success_count: success as u64,
175 error_count: errors as u64,
176 skipped_count: skipped as u64,
177 metadata,
178 }))
179 }
180 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
181 Err(e) => Err(internal_error(format!("Failed to get submission: {}", e))),
182 }
183 }
184
185 async fn list_submissions(
186 &self,
187 tenant: &TenantContext,
188 submitter: Option<&str>,
189 status: Option<SubmissionStatus>,
190 limit: u32,
191 offset: u32,
192 ) -> StorageResult<Vec<SubmissionSummary>> {
193 let ids: Vec<(String, String)> = {
195 let conn = self.get_connection()?;
196 let tenant_id = tenant.tenant_id().as_str();
197
198 let (query, params): (String, Vec<String>) = {
199 let mut query =
200 "SELECT submitter, submission_id FROM bulk_submissions WHERE tenant_id = ?1"
201 .to_string();
202 let mut params = vec![tenant_id.to_string()];
203
204 if let Some(submitter) = submitter {
205 query.push_str(" AND submitter = ?2");
206 params.push(submitter.to_string());
207 }
208
209 if let Some(status) = status {
210 let param_num = params.len() + 1;
211 query.push_str(&format!(" AND status = ?{}", param_num));
212 params.push(status.to_string());
213 }
214
215 query.push_str(" ORDER BY created_at DESC");
216 query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
217
218 (query, params)
219 };
220
221 let mut stmt = conn
222 .prepare(&query)
223 .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
224
225 let params_refs: Vec<&dyn rusqlite::ToSql> =
226 params.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
227
228 stmt.query_map(params_refs.as_slice(), |row| {
229 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
230 })
231 .map_err(|e| internal_error(format!("Failed to query submissions: {}", e)))?
232 .filter_map(|r| r.ok())
233 .collect()
234 };
235
236 let mut results = Vec::new();
237 for (submitter, submission_id) in ids {
238 let sub_id = SubmissionId::new(submitter, submission_id);
239 if let Some(summary) = self.get_submission(tenant, &sub_id).await? {
240 results.push(summary);
241 }
242 }
243
244 Ok(results)
245 }
246
247 async fn complete_submission(
248 &self,
249 tenant: &TenantContext,
250 id: &SubmissionId,
251 ) -> StorageResult<SubmissionSummary> {
252 let conn = self.get_connection()?;
253 let tenant_id = tenant.tenant_id().as_str();
254
255 let current_status: String = conn
257 .query_row(
258 "SELECT status FROM bulk_submissions
259 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
260 params![tenant_id, &id.submitter, &id.submission_id],
261 |row| row.get(0),
262 )
263 .map_err(|e| {
264 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
265 StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
266 submitter: id.submitter.clone(),
267 submission_id: id.submission_id.clone(),
268 })
269 } else {
270 internal_error(format!("Failed to get submission status: {}", e))
271 }
272 })?;
273
274 if current_status != "in-progress" {
275 return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
276 submission_id: id.submission_id.clone(),
277 }));
278 }
279
280 let now = Utc::now().to_rfc3339();
281 conn.execute(
282 "UPDATE bulk_submissions SET status = 'complete', completed_at = ?1, updated_at = ?2
283 WHERE tenant_id = ?3 AND submitter = ?4 AND submission_id = ?5",
284 params![now, now, tenant_id, &id.submitter, &id.submission_id],
285 )
286 .map_err(|e| internal_error(format!("Failed to complete submission: {}", e)))?;
287
288 self.get_submission(tenant, id)
289 .await?
290 .ok_or_else(|| internal_error("Submission disappeared".to_string()))
291 }
292
293 async fn abort_submission(
294 &self,
295 tenant: &TenantContext,
296 id: &SubmissionId,
297 _reason: &str,
298 ) -> StorageResult<u64> {
299 let conn = self.get_connection()?;
300 let tenant_id = tenant.tenant_id().as_str();
301
302 let current_status: String = conn
304 .query_row(
305 "SELECT status FROM bulk_submissions
306 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
307 params![tenant_id, &id.submitter, &id.submission_id],
308 |row| row.get(0),
309 )
310 .map_err(|e| {
311 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
312 StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
313 submitter: id.submitter.clone(),
314 submission_id: id.submission_id.clone(),
315 })
316 } else {
317 internal_error(format!("Failed to get submission status: {}", e))
318 }
319 })?;
320
321 if current_status != "in-progress" {
322 return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
323 submission_id: id.submission_id.clone(),
324 }));
325 }
326
327 let pending_count: i64 = conn
329 .query_row(
330 "SELECT COUNT(*) FROM bulk_manifests
331 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
332 AND status IN ('pending', 'processing')",
333 params![tenant_id, &id.submitter, &id.submission_id],
334 |row| row.get(0),
335 )
336 .unwrap_or(0);
337
338 let now = Utc::now().to_rfc3339();
339
340 conn.execute(
342 "UPDATE bulk_submissions SET status = 'aborted', completed_at = ?1, updated_at = ?2
343 WHERE tenant_id = ?3 AND submitter = ?4 AND submission_id = ?5",
344 params![now, now, tenant_id, &id.submitter, &id.submission_id],
345 )
346 .map_err(|e| internal_error(format!("Failed to abort submission: {}", e)))?;
347
348 conn.execute(
350 "UPDATE bulk_manifests SET status = 'failed'
351 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
352 AND status IN ('pending', 'processing')",
353 params![tenant_id, &id.submitter, &id.submission_id],
354 )
355 .map_err(|e| internal_error(format!("Failed to update manifests: {}", e)))?;
356
357 Ok(pending_count as u64)
358 }
359
360 async fn add_manifest(
361 &self,
362 tenant: &TenantContext,
363 submission_id: &SubmissionId,
364 manifest_url: Option<&str>,
365 replaces_manifest_url: Option<&str>,
366 ) -> StorageResult<SubmissionManifest> {
367 let conn = self.get_connection()?;
368 let tenant_id = tenant.tenant_id().as_str();
369
370 let status: String = conn
372 .query_row(
373 "SELECT status FROM bulk_submissions
374 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3",
375 params![
376 tenant_id,
377 &submission_id.submitter,
378 &submission_id.submission_id
379 ],
380 |row| row.get(0),
381 )
382 .map_err(|e| {
383 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
384 StorageError::BulkSubmit(BulkSubmitError::SubmissionNotFound {
385 submitter: submission_id.submitter.clone(),
386 submission_id: submission_id.submission_id.clone(),
387 })
388 } else {
389 internal_error(format!("Failed to get submission: {}", e))
390 }
391 })?;
392
393 if status != "in-progress" {
394 return Err(StorageError::BulkSubmit(BulkSubmitError::InvalidState {
395 submission_id: submission_id.submission_id.clone(),
396 expected: "in-progress".to_string(),
397 actual: status,
398 }));
399 }
400
401 let manifest_id = Uuid::new_v4().to_string();
402 let now = Utc::now();
403 let now_str = now.to_rfc3339();
404
405 conn.execute(
406 "INSERT INTO bulk_manifests
407 (tenant_id, submitter, submission_id, manifest_id, manifest_url, replaces_manifest_url, status, added_at)
408 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'pending', ?7)",
409 params![
410 tenant_id,
411 &submission_id.submitter,
412 &submission_id.submission_id,
413 manifest_id,
414 manifest_url,
415 replaces_manifest_url,
416 now_str
417 ],
418 )
419 .map_err(|e| internal_error(format!("Failed to add manifest: {}", e)))?;
420
421 conn.execute(
423 "UPDATE bulk_submissions SET updated_at = ?1
424 WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
425 params![
426 now_str,
427 tenant_id,
428 &submission_id.submitter,
429 &submission_id.submission_id
430 ],
431 )
432 .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
433
434 Ok(SubmissionManifest {
435 manifest_id,
436 manifest_url: manifest_url.map(String::from),
437 replaces_manifest_url: replaces_manifest_url.map(String::from),
438 status: ManifestStatus::Pending,
439 added_at: now,
440 total_entries: 0,
441 processed_entries: 0,
442 failed_entries: 0,
443 })
444 }
445
446 async fn get_manifest(
447 &self,
448 tenant: &TenantContext,
449 submission_id: &SubmissionId,
450 manifest_id: &str,
451 ) -> StorageResult<Option<SubmissionManifest>> {
452 let conn = self.get_connection()?;
453 let tenant_id = tenant.tenant_id().as_str();
454
455 let result = conn.query_row(
456 "SELECT manifest_url, replaces_manifest_url, status, added_at, total_entries, processed_entries, failed_entries
457 FROM bulk_manifests
458 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
459 params![tenant_id, &submission_id.submitter, &submission_id.submission_id, manifest_id],
460 |row| {
461 Ok((
462 row.get::<_, Option<String>>(0)?,
463 row.get::<_, Option<String>>(1)?,
464 row.get::<_, String>(2)?,
465 row.get::<_, String>(3)?,
466 row.get::<_, i64>(4)?,
467 row.get::<_, i64>(5)?,
468 row.get::<_, i64>(6)?,
469 ))
470 },
471 );
472
473 match result {
474 Ok((
475 manifest_url,
476 replaces_manifest_url,
477 status_str,
478 added_at,
479 total,
480 processed,
481 failed,
482 )) => {
483 let status: ManifestStatus = status_str.parse().map_err(|_| {
484 internal_error(format!("Invalid manifest status: {}", status_str))
485 })?;
486
487 let added_at = chrono::DateTime::parse_from_rfc3339(&added_at)
488 .map_err(|e| internal_error(format!("Invalid added_at: {}", e)))?
489 .with_timezone(&Utc);
490
491 Ok(Some(SubmissionManifest {
492 manifest_id: manifest_id.to_string(),
493 manifest_url,
494 replaces_manifest_url,
495 status,
496 added_at,
497 total_entries: total as u64,
498 processed_entries: processed as u64,
499 failed_entries: failed as u64,
500 }))
501 }
502 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
503 Err(e) => Err(internal_error(format!("Failed to get manifest: {}", e))),
504 }
505 }
506
507 async fn list_manifests(
508 &self,
509 tenant: &TenantContext,
510 submission_id: &SubmissionId,
511 ) -> StorageResult<Vec<SubmissionManifest>> {
512 let manifest_ids: Vec<String> = {
514 let conn = self.get_connection()?;
515 let tenant_id = tenant.tenant_id().as_str();
516
517 let mut stmt = conn
518 .prepare(
519 "SELECT manifest_id FROM bulk_manifests
520 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
521 ORDER BY added_at",
522 )
523 .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
524
525 stmt.query_map(
526 params![
527 tenant_id,
528 &submission_id.submitter,
529 &submission_id.submission_id
530 ],
531 |row| row.get(0),
532 )
533 .map_err(|e| internal_error(format!("Failed to query manifests: {}", e)))?
534 .filter_map(|r| r.ok())
535 .collect()
536 };
537
538 let mut results = Vec::new();
539 for manifest_id in manifest_ids {
540 if let Some(manifest) = self
541 .get_manifest(tenant, submission_id, &manifest_id)
542 .await?
543 {
544 results.push(manifest);
545 }
546 }
547
548 Ok(results)
549 }
550
551 async fn process_entries(
552 &self,
553 tenant: &TenantContext,
554 submission_id: &SubmissionId,
555 manifest_id: &str,
556 entries: Vec<NdjsonEntry>,
557 options: &BulkProcessingOptions,
558 ) -> StorageResult<Vec<BulkEntryResult>> {
559 let conn = self.get_connection()?;
560 let tenant_id = tenant.tenant_id().as_str();
561
562 if self
564 .get_manifest(tenant, submission_id, manifest_id)
565 .await?
566 .is_none()
567 {
568 return Err(StorageError::BulkSubmit(
569 BulkSubmitError::ManifestNotFound {
570 submission_id: submission_id.submission_id.clone(),
571 manifest_id: manifest_id.to_string(),
572 },
573 ));
574 }
575
576 conn.execute(
578 "UPDATE bulk_manifests SET status = 'processing'
579 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
580 params![
581 tenant_id,
582 &submission_id.submitter,
583 &submission_id.submission_id,
584 manifest_id
585 ],
586 )
587 .map_err(|e| internal_error(format!("Failed to update manifest status: {}", e)))?;
588
589 let mut results = Vec::new();
590 let mut error_count = 0u32;
591
592 for entry in entries {
593 if options.max_errors > 0 && error_count >= options.max_errors {
595 if !options.continue_on_error {
596 return Err(StorageError::BulkSubmit(
597 BulkSubmitError::MaxErrorsExceeded {
598 submission_id: submission_id.submission_id.clone(),
599 max_errors: options.max_errors,
600 },
601 ));
602 }
603 let skip_result = BulkEntryResult::skipped(
605 entry.line_number,
606 &entry.resource_type,
607 "max errors exceeded",
608 );
609 results.push(skip_result);
610 continue;
611 }
612
613 let result = self
615 .process_single_entry(tenant, submission_id, manifest_id, &entry, options)
616 .await;
617
618 let entry_result = match result {
619 Ok(r) => r,
620 Err(e) => {
621 error_count += 1;
622 BulkEntryResult::processing_error(
623 entry.line_number,
624 &entry.resource_type,
625 serde_json::json!({
626 "resourceType": "OperationOutcome",
627 "issue": [{
628 "severity": "error",
629 "code": "exception",
630 "diagnostics": e.to_string()
631 }]
632 }),
633 )
634 }
635 };
636
637 if entry_result.is_error() {
638 error_count += 1;
639 }
640
641 self.store_entry_result(tenant, submission_id, manifest_id, &entry_result)
643 .await?;
644
645 results.push(entry_result);
646 }
647
648 let now = Utc::now().to_rfc3339();
650 conn.execute(
651 "UPDATE bulk_manifests SET
652 total_entries = total_entries + ?1,
653 processed_entries = processed_entries + ?2,
654 failed_entries = failed_entries + ?3
655 WHERE tenant_id = ?4 AND submitter = ?5 AND submission_id = ?6 AND manifest_id = ?7",
656 params![
657 results.len() as i64,
658 results.iter().filter(|r| r.is_success()).count() as i64,
659 error_count as i64,
660 tenant_id,
661 &submission_id.submitter,
662 &submission_id.submission_id,
663 manifest_id
664 ],
665 )
666 .map_err(|e| internal_error(format!("Failed to update manifest counts: {}", e)))?;
667
668 conn.execute(
670 "UPDATE bulk_submissions SET updated_at = ?1
671 WHERE tenant_id = ?2 AND submitter = ?3 AND submission_id = ?4",
672 params![
673 now,
674 tenant_id,
675 &submission_id.submitter,
676 &submission_id.submission_id
677 ],
678 )
679 .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
680
681 Ok(results)
682 }
683
684 async fn get_entry_results(
685 &self,
686 tenant: &TenantContext,
687 submission_id: &SubmissionId,
688 manifest_id: &str,
689 outcome_filter: Option<BulkEntryOutcome>,
690 limit: u32,
691 offset: u32,
692 ) -> StorageResult<Vec<BulkEntryResult>> {
693 let conn = self.get_connection()?;
694 let tenant_id = tenant.tenant_id().as_str();
695
696 let mut query =
697 "SELECT line_number, resource_type, resource_id, created, outcome, operation_outcome
698 FROM bulk_entry_results
699 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4"
700 .to_string();
701
702 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
703 Box::new(tenant_id.to_string()),
704 Box::new(submission_id.submitter.clone()),
705 Box::new(submission_id.submission_id.clone()),
706 Box::new(manifest_id.to_string()),
707 ];
708
709 if let Some(outcome) = outcome_filter {
710 query.push_str(" AND outcome = ?");
711 params_vec.push(Box::new(outcome.to_string()));
712 }
713
714 query.push_str(" ORDER BY line_number");
715 query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
716
717 let params_slice: Vec<&dyn rusqlite::ToSql> =
718 params_vec.iter().map(|p| p.as_ref()).collect();
719
720 let mut stmt = conn
721 .prepare(&query)
722 .map_err(|e| internal_error(format!("Failed to prepare results query: {}", e)))?;
723
724 let results: Vec<BulkEntryResult> = stmt
725 .query_map(params_slice.as_slice(), |row| {
726 let line_number: i64 = row.get(0)?;
727 let resource_type: String = row.get(1)?;
728 let resource_id: Option<String> = row.get(2)?;
729 let created: Option<i32> = row.get(3)?;
730 let outcome_str: String = row.get(4)?;
731 let operation_outcome_bytes: Option<Vec<u8>> = row.get(5)?;
732
733 let outcome: BulkEntryOutcome = outcome_str
734 .parse()
735 .unwrap_or(BulkEntryOutcome::ProcessingError);
736
737 let operation_outcome =
738 operation_outcome_bytes.and_then(|b| serde_json::from_slice(&b).ok());
739
740 Ok(BulkEntryResult {
741 line_number: line_number as u64,
742 resource_type,
743 resource_id,
744 created: created.map(|c| c != 0).unwrap_or(false),
745 outcome,
746 operation_outcome,
747 })
748 })
749 .map_err(|e| internal_error(format!("Failed to query results: {}", e)))?
750 .filter_map(|r| r.ok())
751 .collect();
752
753 Ok(results)
754 }
755
756 async fn get_entry_counts(
757 &self,
758 tenant: &TenantContext,
759 submission_id: &SubmissionId,
760 manifest_id: &str,
761 ) -> StorageResult<EntryCountSummary> {
762 let conn = self.get_connection()?;
763 let tenant_id = tenant.tenant_id().as_str();
764
765 let (total, success, validation_error, processing_error, skipped): (i64, i64, i64, i64, i64) = conn
766 .query_row(
767 "SELECT
768 COUNT(*),
769 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
770 SUM(CASE WHEN outcome = 'validation-error' THEN 1 ELSE 0 END),
771 SUM(CASE WHEN outcome = 'processing-error' THEN 1 ELSE 0 END),
772 SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
773 FROM bulk_entry_results
774 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3 AND manifest_id = ?4",
775 params![tenant_id, &submission_id.submitter, &submission_id.submission_id, manifest_id],
776 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)),
777 )
778 .unwrap_or((0, 0, 0, 0, 0));
779
780 Ok(EntryCountSummary {
781 total: total as u64,
782 success: success as u64,
783 validation_error: validation_error as u64,
784 processing_error: processing_error as u64,
785 skipped: skipped as u64,
786 })
787 }
788}
789
790impl SqliteBackend {
791 async fn process_single_entry(
793 &self,
794 tenant: &TenantContext,
795 submission_id: &SubmissionId,
796 manifest_id: &str,
797 entry: &NdjsonEntry,
798 options: &BulkProcessingOptions,
799 ) -> StorageResult<BulkEntryResult> {
800 let resource_id = entry.resource_id.as_ref();
802
803 if let Some(id) = resource_id {
804 let existing = self.read(tenant, &entry.resource_type, id).await;
806
807 match existing {
808 Ok(Some(current)) => {
809 if !options.allow_updates {
811 return Ok(BulkEntryResult::skipped(
812 entry.line_number,
813 &entry.resource_type,
814 "updates not allowed",
815 ));
816 }
817
818 let change = SubmissionChange::update(
820 manifest_id,
821 &entry.resource_type,
822 id,
823 current.version_id(),
824 (current.version_id().parse::<i32>().unwrap_or(0) + 1).to_string(),
825 current.content().clone(),
826 );
827 self.record_change(tenant, submission_id, &change).await?;
828
829 let updated = self
831 .update(tenant, ¤t, entry.resource.clone())
832 .await?;
833
834 Ok(BulkEntryResult::success(
835 entry.line_number,
836 &entry.resource_type,
837 updated.id(),
838 false,
839 ))
840 }
841 Ok(None)
842 | Err(StorageError::Resource(crate::error::ResourceError::Gone { .. })) => {
843 let created = self
846 .create(
847 tenant,
848 &entry.resource_type,
849 entry.resource.clone(),
850 FhirVersion::default(),
851 )
852 .await?;
853
854 let change = SubmissionChange::create(
856 manifest_id,
857 &entry.resource_type,
858 created.id(),
859 created.version_id(),
860 );
861 self.record_change(tenant, submission_id, &change).await?;
862
863 Ok(BulkEntryResult::success(
864 entry.line_number,
865 &entry.resource_type,
866 created.id(),
867 true,
868 ))
869 }
870 Err(e) => Err(e),
871 }
872 } else {
873 let created = self
876 .create(
877 tenant,
878 &entry.resource_type,
879 entry.resource.clone(),
880 FhirVersion::default(),
881 )
882 .await?;
883
884 let change = SubmissionChange::create(
886 manifest_id,
887 &entry.resource_type,
888 created.id(),
889 created.version_id(),
890 );
891 self.record_change(tenant, submission_id, &change).await?;
892
893 Ok(BulkEntryResult::success(
894 entry.line_number,
895 &entry.resource_type,
896 created.id(),
897 true,
898 ))
899 }
900 }
901
902 async fn store_entry_result(
904 &self,
905 tenant: &TenantContext,
906 submission_id: &SubmissionId,
907 manifest_id: &str,
908 result: &BulkEntryResult,
909 ) -> StorageResult<()> {
910 let conn = self.get_connection()?;
911 let tenant_id = tenant.tenant_id().as_str();
912
913 let outcome_bytes = result
914 .operation_outcome
915 .as_ref()
916 .and_then(|o| serde_json::to_vec(o).ok());
917
918 conn.execute(
919 "INSERT INTO bulk_entry_results
920 (tenant_id, submitter, submission_id, manifest_id, line_number, resource_type, resource_id, created, outcome, operation_outcome)
921 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
922 params![
923 tenant_id,
924 &submission_id.submitter,
925 &submission_id.submission_id,
926 manifest_id,
927 result.line_number as i64,
928 &result.resource_type,
929 &result.resource_id,
930 if result.created { Some(1) } else { Some(0) },
931 result.outcome.to_string(),
932 outcome_bytes
933 ],
934 )
935 .map_err(|e| internal_error(format!("Failed to store entry result: {}", e)))?;
936
937 Ok(())
938 }
939}
940
941#[async_trait]
942impl StreamingBulkSubmitProvider for SqliteBackend {
943 async fn process_ndjson_stream(
944 &self,
945 tenant: &TenantContext,
946 submission_id: &SubmissionId,
947 manifest_id: &str,
948 resource_type: &str,
949 mut reader: Box<dyn AsyncBufRead + Send + Unpin>,
950 options: &BulkProcessingOptions,
951 ) -> StorageResult<StreamProcessingResult> {
952 let mut result = StreamProcessingResult::new();
953 let mut line_number = 0u64;
954 let mut batch = Vec::new();
955
956 loop {
957 let mut line = String::new();
958 let bytes_read = reader
959 .read_line(&mut line)
960 .await
961 .map_err(|e| internal_error(format!("Failed to read line: {}", e)))?;
962
963 if bytes_read == 0 {
964 break;
966 }
967
968 line_number += 1;
969 result.lines_processed = line_number;
970
971 let line = line.trim();
972 if line.is_empty() {
973 continue;
974 }
975
976 match NdjsonEntry::parse(line_number, line) {
978 Ok(entry) => {
979 if entry.resource_type != resource_type {
981 let error_result = BulkEntryResult::validation_error(
982 line_number,
983 &entry.resource_type,
984 serde_json::json!({
985 "resourceType": "OperationOutcome",
986 "issue": [{
987 "severity": "error",
988 "code": "invalid",
989 "diagnostics": format!("Expected resource type {}, got {}", resource_type, entry.resource_type)
990 }]
991 }),
992 );
993 result.counts.increment(error_result.outcome);
994
995 if !options.continue_on_error
996 && (options.max_errors == 0
997 || result.counts.error_count() >= options.max_errors as u64)
998 {
999 return Ok(result.aborted("max errors exceeded"));
1000 }
1001 continue;
1002 }
1003
1004 batch.push(entry);
1005 }
1006 Err(e) => {
1007 result.counts.increment(BulkEntryOutcome::ValidationError);
1008
1009 if !options.continue_on_error
1010 && (options.max_errors == 0
1011 || result.counts.error_count() >= options.max_errors as u64)
1012 {
1013 return Ok(result.aborted(format!("Parse error: {}", e)));
1014 }
1015 }
1016 }
1017
1018 if batch.len() >= options.batch_size as usize {
1020 let batch_results = self
1021 .process_entries(
1022 tenant,
1023 submission_id,
1024 manifest_id,
1025 std::mem::take(&mut batch),
1026 options,
1027 )
1028 .await?;
1029
1030 for r in batch_results {
1031 result.counts.increment(r.outcome);
1032 }
1033
1034 if !options.continue_on_error
1036 && options.max_errors > 0
1037 && result.counts.error_count() >= options.max_errors as u64
1038 {
1039 return Ok(result.aborted("max errors exceeded"));
1040 }
1041 }
1042 }
1043
1044 if !batch.is_empty() {
1046 let batch_results = self
1047 .process_entries(tenant, submission_id, manifest_id, batch, options)
1048 .await?;
1049
1050 for r in batch_results {
1051 result.counts.increment(r.outcome);
1052 }
1053 }
1054
1055 Ok(result)
1056 }
1057}
1058
1059#[async_trait]
1060impl BulkSubmitRollbackProvider for SqliteBackend {
1061 async fn record_change(
1062 &self,
1063 tenant: &TenantContext,
1064 submission_id: &SubmissionId,
1065 change: &SubmissionChange,
1066 ) -> StorageResult<()> {
1067 let conn = self.get_connection()?;
1068 let tenant_id = tenant.tenant_id().as_str();
1069
1070 let previous_content_bytes = change
1071 .previous_content
1072 .as_ref()
1073 .and_then(|c| serde_json::to_vec(c).ok());
1074
1075 conn.execute(
1076 "INSERT INTO bulk_submission_changes
1077 (tenant_id, submitter, submission_id, change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at)
1078 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
1079 params![
1080 tenant_id,
1081 &submission_id.submitter,
1082 &submission_id.submission_id,
1083 &change.change_id,
1084 &change.manifest_id,
1085 change.change_type.to_string(),
1086 &change.resource_type,
1087 &change.resource_id,
1088 &change.previous_version,
1089 &change.new_version,
1090 previous_content_bytes,
1091 change.changed_at.to_rfc3339()
1092 ],
1093 )
1094 .map_err(|e| internal_error(format!("Failed to record change: {}", e)))?;
1095
1096 Ok(())
1097 }
1098
1099 async fn list_changes(
1100 &self,
1101 tenant: &TenantContext,
1102 submission_id: &SubmissionId,
1103 limit: u32,
1104 offset: u32,
1105 ) -> StorageResult<Vec<SubmissionChange>> {
1106 let conn = self.get_connection()?;
1107 let tenant_id = tenant.tenant_id().as_str();
1108
1109 let mut stmt = conn
1110 .prepare(&format!(
1111 "SELECT change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at
1112 FROM bulk_submission_changes
1113 WHERE tenant_id = ?1 AND submitter = ?2 AND submission_id = ?3
1114 ORDER BY changed_at DESC
1115 LIMIT {} OFFSET {}",
1116 limit, offset
1117 ))
1118 .map_err(|e| internal_error(format!("Failed to prepare changes query: {}", e)))?;
1119
1120 let changes: Vec<SubmissionChange> = stmt
1121 .query_map(
1122 params![
1123 tenant_id,
1124 &submission_id.submitter,
1125 &submission_id.submission_id
1126 ],
1127 |row| {
1128 let change_id: String = row.get(0)?;
1129 let manifest_id: String = row.get(1)?;
1130 let change_type_str: String = row.get(2)?;
1131 let resource_type: String = row.get(3)?;
1132 let resource_id: String = row.get(4)?;
1133 let previous_version: Option<String> = row.get(5)?;
1134 let new_version: String = row.get(6)?;
1135 let previous_content_bytes: Option<Vec<u8>> = row.get(7)?;
1136 let changed_at_str: String = row.get(8)?;
1137
1138 let change_type: ChangeType =
1139 change_type_str.parse().unwrap_or(ChangeType::Create);
1140 let previous_content =
1141 previous_content_bytes.and_then(|b| serde_json::from_slice(&b).ok());
1142 let changed_at = chrono::DateTime::parse_from_rfc3339(&changed_at_str)
1143 .map(|dt| dt.with_timezone(&Utc))
1144 .unwrap_or_else(|_| Utc::now());
1145
1146 Ok(SubmissionChange {
1147 change_id,
1148 manifest_id,
1149 change_type,
1150 resource_type,
1151 resource_id,
1152 previous_version,
1153 new_version,
1154 previous_content,
1155 changed_at,
1156 })
1157 },
1158 )
1159 .map_err(|e| internal_error(format!("Failed to query changes: {}", e)))?
1160 .filter_map(|r| r.ok())
1161 .collect();
1162
1163 Ok(changes)
1164 }
1165
1166 async fn rollback_change(
1167 &self,
1168 tenant: &TenantContext,
1169 _submission_id: &SubmissionId,
1170 change: &SubmissionChange,
1171 ) -> StorageResult<bool> {
1172 match change.change_type {
1173 ChangeType::Create => {
1174 match self
1176 .delete(tenant, &change.resource_type, &change.resource_id)
1177 .await
1178 {
1179 Ok(()) => Ok(true),
1180 Err(StorageError::Resource(crate::error::ResourceError::NotFound {
1181 ..
1182 })) => {
1183 Ok(true)
1185 }
1186 Err(e) => Err(e),
1187 }
1188 }
1189 ChangeType::Update => {
1190 if let Some(ref previous_content) = change.previous_content {
1192 let current = self
1194 .read(tenant, &change.resource_type, &change.resource_id)
1195 .await?;
1196 if let Some(current) = current {
1197 self.update(tenant, ¤t, previous_content.clone())
1198 .await?;
1199 Ok(true)
1200 } else {
1201 Ok(false)
1203 }
1204 } else {
1205 Ok(false)
1207 }
1208 }
1209 }
1210 }
1211}
1212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tenant::{TenantId, TenantPermissions};
    use serde_json::json;

    /// Returns an in-memory SQLite backend with its schema initialised.
    fn create_test_backend() -> SqliteBackend {
        let backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        backend
    }

    /// Returns a full-access context for the `test-tenant` tenant.
    fn create_test_tenant() -> TenantContext {
        let tenant_id = TenantId::new("test-tenant");
        let permissions = TenantPermissions::full_access();
        TenantContext::new(tenant_id, permissions)
    }

    /// Creates a submission under a freshly generated id and returns that id.
    async fn open_submission(backend: &SqliteBackend, tenant: &TenantContext) -> SubmissionId {
        let sub_id = SubmissionId::generate("test-system");
        backend
            .create_submission(tenant, &sub_id, None)
            .await
            .unwrap();
        sub_id
    }

    /// Adds a manifest with no URLs to the given submission.
    async fn attach_manifest(
        backend: &SqliteBackend,
        tenant: &TenantContext,
        sub_id: &SubmissionId,
    ) -> SubmissionManifest {
        backend
            .add_manifest(tenant, sub_id, None, None)
            .await
            .unwrap()
    }

    /// Two id-less Patient entries on lines 1 and 2.
    fn two_patient_entries() -> Vec<NdjsonEntry> {
        let first = NdjsonEntry::new(
            1,
            "Patient",
            json!({"resourceType": "Patient", "name": [{"family": "Test1"}]}),
        );
        let second = NdjsonEntry::new(
            2,
            "Patient",
            json!({"resourceType": "Patient", "name": [{"family": "Test2"}]}),
        );
        vec![first, second]
    }

    /// A new submission starts in progress with no manifests.
    #[tokio::test]
    async fn test_create_submission() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = SubmissionId::generate("test-system");

        let created = backend.create_submission(&tenant, &sub_id, None).await;
        let summary = created.unwrap();

        assert_eq!(summary.status, SubmissionStatus::InProgress);
        assert_eq!(summary.manifest_count, 0);
    }

    /// Creating the same submission twice is rejected as a duplicate.
    #[tokio::test]
    async fn test_duplicate_submission() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = SubmissionId::new("test-system", "sub-123");

        let first = backend.create_submission(&tenant, &sub_id, None).await;
        first.unwrap();

        let second = backend.create_submission(&tenant, &sub_id, None).await;
        let is_duplicate = matches!(
            second,
            Err(StorageError::BulkSubmit(
                BulkSubmitError::DuplicateSubmission { .. }
            ))
        );
        assert!(is_duplicate);
    }

    /// A newly added manifest is pending and keeps the URL it was given.
    #[tokio::test]
    async fn test_add_manifest() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = open_submission(&backend, &tenant).await;

        let url = "http://example.com/data.ndjson";
        let manifest = backend
            .add_manifest(&tenant, &sub_id, Some(url), None)
            .await
            .unwrap();

        assert_eq!(manifest.status, ManifestStatus::Pending);
        assert_eq!(manifest.manifest_url.as_deref(), Some(url));
    }

    /// Processing two new patients yields two successful, created results.
    #[tokio::test]
    async fn test_process_entries() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = open_submission(&backend, &tenant).await;
        let manifest = attach_manifest(&backend, &tenant, &sub_id).await;

        let options = BulkProcessingOptions::new();
        let results = backend
            .process_entries(
                &tenant,
                &sub_id,
                &manifest.manifest_id,
                two_patient_entries(),
                &options,
            )
            .await
            .unwrap();

        assert_eq!(results.len(), 2);
        for outcome in &results {
            assert!(outcome.is_success());
        }
        for outcome in &results {
            assert!(outcome.created);
        }
    }

    /// Completing a submission marks it complete and stamps a completion time.
    #[tokio::test]
    async fn test_complete_submission() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = open_submission(&backend, &tenant).await;

        let completed = backend.complete_submission(&tenant, &sub_id).await;
        let summary = completed.unwrap();

        assert_eq!(summary.status, SubmissionStatus::Complete);
        assert!(summary.completed_at.is_some());
    }

    /// Aborting reports one for the single pending manifest and flips the status.
    #[tokio::test]
    async fn test_abort_submission() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = open_submission(&backend, &tenant).await;
        attach_manifest(&backend, &tenant, &sub_id).await;

        let cancelled = backend
            .abort_submission(&tenant, &sub_id, "test abort")
            .await
            .unwrap();
        assert_eq!(cancelled, 1);

        let lookup = backend.get_submission(&tenant, &sub_id).await.unwrap();
        let summary = lookup.unwrap();
        assert_eq!(summary.status, SubmissionStatus::Aborted);
    }

    /// Rolling back a recorded creation removes the created resource.
    #[tokio::test]
    async fn test_rollback_create() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = open_submission(&backend, &tenant).await;
        let manifest = attach_manifest(&backend, &tenant, &sub_id).await;

        let entries = vec![NdjsonEntry::new(
            1,
            "Patient",
            json!({"resourceType": "Patient", "id": "rollback-test", "name": [{"family": "Test"}]}),
        )];
        let options = BulkProcessingOptions::new();
        let _results = backend
            .process_entries(&tenant, &sub_id, &manifest.manifest_id, entries, &options)
            .await
            .unwrap();

        // The patient must exist before the rollback.
        let before = backend
            .read(&tenant, "Patient", "rollback-test")
            .await
            .unwrap();
        assert!(before.is_some());

        // Exactly one change was recorded for the single entry.
        let changes = backend.list_changes(&tenant, &sub_id, 10, 0).await.unwrap();
        assert_eq!(changes.len(), 1);

        let rolled_back = backend
            .rollback_change(&tenant, &sub_id, &changes[0])
            .await
            .unwrap();
        assert!(rolled_back);

        // After the rollback, reading the patient fails.
        let after = backend.read(&tenant, "Patient", "rollback-test").await;
        assert!(after.is_err());
    }

    /// Entry counts reflect two successful entries and no errors.
    #[tokio::test]
    async fn test_entry_counts() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let sub_id = open_submission(&backend, &tenant).await;
        let manifest = attach_manifest(&backend, &tenant, &sub_id).await;

        let options = BulkProcessingOptions::new();
        backend
            .process_entries(
                &tenant,
                &sub_id,
                &manifest.manifest_id,
                two_patient_entries(),
                &options,
            )
            .await
            .unwrap();

        let counts = backend
            .get_entry_counts(&tenant, &sub_id, &manifest.manifest_id)
            .await
            .unwrap();

        assert_eq!(counts.total, 2);
        assert_eq!(counts.success, 2);
        assert_eq!(counts.error_count(), 0);
    }
}