1use async_trait::async_trait;
4use chrono::Utc;
5use helios_fhir::FhirVersion;
6use serde_json::Value;
7use tokio::io::{AsyncBufRead, AsyncBufReadExt};
8use uuid::Uuid;
9
10use crate::core::ResourceStorage;
11use crate::core::bulk_submit::{
12 BulkEntryOutcome, BulkEntryResult, BulkProcessingOptions, BulkSubmitProvider,
13 BulkSubmitRollbackProvider, ChangeType, EntryCountSummary, ManifestStatus, NdjsonEntry,
14 StreamProcessingResult, StreamingBulkSubmitProvider, SubmissionChange, SubmissionId,
15 SubmissionManifest, SubmissionStatus, SubmissionSummary,
16};
17use crate::error::{BackendError, BulkSubmitError, StorageError, StorageResult};
18use crate::tenant::TenantContext;
19
20use super::PostgresBackend;
21
22fn internal_error(message: String) -> StorageError {
23 StorageError::Backend(BackendError::Internal {
24 backend_name: "postgres".to_string(),
25 message,
26 source: None,
27 })
28}
29
30#[async_trait]
31impl BulkSubmitProvider for PostgresBackend {
32 async fn create_submission(
33 &self,
34 tenant: &TenantContext,
35 id: &SubmissionId,
36 metadata: Option<Value>,
37 ) -> StorageResult<SubmissionSummary> {
38 let client = self.get_client().await?;
39 let tenant_id = tenant.tenant_id().as_str();
40
41 let rows = client
43 .query(
44 "SELECT 1 FROM bulk_submissions
45 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
46 &[
47 &tenant_id,
48 &id.submitter.as_str(),
49 &id.submission_id.as_str(),
50 ],
51 )
52 .await
53 .map_err(|e| internal_error(format!("Failed to check duplicate: {}", e)))?;
54
55 if !rows.is_empty() {
56 return Err(StorageError::BulkSubmit(
57 BulkSubmitError::DuplicateSubmission {
58 submitter: id.submitter.clone(),
59 submission_id: id.submission_id.clone(),
60 },
61 ));
62 }
63
64 let now = Utc::now();
65 let metadata_json: Option<Value> = metadata.clone();
66
67 client
68 .execute(
69 "INSERT INTO bulk_submissions
70 (tenant_id, submitter, submission_id, status, created_at, updated_at, metadata)
71 VALUES ($1, $2, $3, 'in-progress', $4, $5, $6)",
72 &[
73 &tenant_id,
74 &id.submitter.as_str(),
75 &id.submission_id.as_str(),
76 &now,
77 &now,
78 &metadata_json,
79 ],
80 )
81 .await
82 .map_err(|e| internal_error(format!("Failed to create submission: {}", e)))?;
83
84 Ok(SubmissionSummary {
85 id: id.clone(),
86 status: SubmissionStatus::InProgress,
87 created_at: now,
88 updated_at: now,
89 completed_at: None,
90 manifest_count: 0,
91 total_entries: 0,
92 success_count: 0,
93 error_count: 0,
94 skipped_count: 0,
95 metadata,
96 })
97 }
98
99 async fn get_submission(
100 &self,
101 tenant: &TenantContext,
102 id: &SubmissionId,
103 ) -> StorageResult<Option<SubmissionSummary>> {
104 let client = self.get_client().await?;
105 let tenant_id = tenant.tenant_id().as_str();
106
107 let rows = client
108 .query(
109 "SELECT status, created_at, updated_at, completed_at, metadata
110 FROM bulk_submissions
111 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
112 &[
113 &tenant_id,
114 &id.submitter.as_str(),
115 &id.submission_id.as_str(),
116 ],
117 )
118 .await
119 .map_err(|e| internal_error(format!("Failed to get submission: {}", e)))?;
120
121 if rows.is_empty() {
122 return Ok(None);
123 }
124
125 let row = &rows[0];
126 let status_str: String = row.get(0);
127 let created_at: chrono::DateTime<Utc> = row.get(1);
128 let updated_at: chrono::DateTime<Utc> = row.get(2);
129 let completed_at: Option<chrono::DateTime<Utc>> = row.get(3);
130 let metadata: Option<Value> = row.get(4);
131
132 let status: SubmissionStatus = status_str
133 .parse()
134 .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
135
136 let manifest_row = client
138 .query_one(
139 "SELECT COUNT(*) FROM bulk_manifests
140 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
141 &[
142 &tenant_id,
143 &id.submitter.as_str(),
144 &id.submission_id.as_str(),
145 ],
146 )
147 .await
148 .map_err(|e| internal_error(format!("Failed to count manifests: {}", e)))?;
149
150 let manifest_count: i64 = manifest_row.get(0);
151
152 let counts_row = client
154 .query_one(
155 "SELECT
156 COUNT(*),
157 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
158 SUM(CASE WHEN outcome IN ('validation-error', 'processing-error') THEN 1 ELSE 0 END),
159 SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
160 FROM bulk_entry_results
161 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
162 &[&tenant_id, &id.submitter.as_str(), &id.submission_id.as_str()],
163 )
164 .await
165 .map_err(|e| internal_error(format!("Failed to count entries: {}", e)))?;
166
167 let total: i64 = counts_row.get(0);
168 let success: Option<i64> = counts_row.get(1);
169 let errors: Option<i64> = counts_row.get(2);
170 let skipped: Option<i64> = counts_row.get(3);
171
172 Ok(Some(SubmissionSummary {
173 id: id.clone(),
174 status,
175 created_at,
176 updated_at,
177 completed_at,
178 manifest_count: manifest_count as u32,
179 total_entries: total as u64,
180 success_count: success.unwrap_or(0) as u64,
181 error_count: errors.unwrap_or(0) as u64,
182 skipped_count: skipped.unwrap_or(0) as u64,
183 metadata,
184 }))
185 }
186
187 async fn list_submissions(
188 &self,
189 tenant: &TenantContext,
190 submitter: Option<&str>,
191 status: Option<SubmissionStatus>,
192 limit: u32,
193 offset: u32,
194 ) -> StorageResult<Vec<SubmissionSummary>> {
195 let client = self.get_client().await?;
196 let tenant_id = tenant.tenant_id().as_str();
197
198 let mut sql = "SELECT submitter, submission_id FROM bulk_submissions WHERE tenant_id = $1"
199 .to_string();
200 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
201 vec![Box::new(tenant_id.to_string())];
202 let mut param_idx = 2;
203
204 if let Some(submitter) = submitter {
205 sql.push_str(&format!(" AND submitter = ${}", param_idx));
206 params.push(Box::new(submitter.to_string()));
207 param_idx += 1;
208 }
209
210 if let Some(status) = status {
211 sql.push_str(&format!(" AND status = ${}", param_idx));
212 params.push(Box::new(status.to_string()));
213 }
214
215 sql.push_str(&format!(
216 " ORDER BY created_at DESC LIMIT {} OFFSET {}",
217 limit, offset
218 ));
219
220 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
221 .iter()
222 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
223 .collect();
224
225 let rows = client
226 .query(&sql, ¶m_refs)
227 .await
228 .map_err(|e| internal_error(format!("Failed to query submissions: {}", e)))?;
229
230 let mut results = Vec::new();
231 for row in &rows {
232 let submitter: String = row.get(0);
233 let submission_id: String = row.get(1);
234 let sub_id = SubmissionId::new(submitter, submission_id);
235 if let Some(summary) = self.get_submission(tenant, &sub_id).await? {
236 results.push(summary);
237 }
238 }
239
240 Ok(results)
241 }
242
243 async fn complete_submission(
244 &self,
245 tenant: &TenantContext,
246 id: &SubmissionId,
247 ) -> StorageResult<SubmissionSummary> {
248 let client = self.get_client().await?;
249 let tenant_id = tenant.tenant_id().as_str();
250
251 let rows = client
253 .query(
254 "SELECT status FROM bulk_submissions
255 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
256 &[
257 &tenant_id,
258 &id.submitter.as_str(),
259 &id.submission_id.as_str(),
260 ],
261 )
262 .await
263 .map_err(|e| internal_error(format!("Failed to get submission status: {}", e)))?;
264
265 if rows.is_empty() {
266 return Err(StorageError::BulkSubmit(
267 BulkSubmitError::SubmissionNotFound {
268 submitter: id.submitter.clone(),
269 submission_id: id.submission_id.clone(),
270 },
271 ));
272 }
273
274 let current_status: String = rows[0].get(0);
275 if current_status != "in-progress" {
276 return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
277 submission_id: id.submission_id.clone(),
278 }));
279 }
280
281 let now = Utc::now();
282 client
283 .execute(
284 "UPDATE bulk_submissions SET status = 'complete', completed_at = $1, updated_at = $2
285 WHERE tenant_id = $3 AND submitter = $4 AND submission_id = $5",
286 &[
287 &now,
288 &now,
289 &tenant_id,
290 &id.submitter.as_str(),
291 &id.submission_id.as_str(),
292 ],
293 )
294 .await
295 .map_err(|e| internal_error(format!("Failed to complete submission: {}", e)))?;
296
297 self.get_submission(tenant, id)
298 .await?
299 .ok_or_else(|| internal_error("Submission disappeared".to_string()))
300 }
301
302 async fn abort_submission(
303 &self,
304 tenant: &TenantContext,
305 id: &SubmissionId,
306 _reason: &str,
307 ) -> StorageResult<u64> {
308 let client = self.get_client().await?;
309 let tenant_id = tenant.tenant_id().as_str();
310
311 let rows = client
313 .query(
314 "SELECT status FROM bulk_submissions
315 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
316 &[
317 &tenant_id,
318 &id.submitter.as_str(),
319 &id.submission_id.as_str(),
320 ],
321 )
322 .await
323 .map_err(|e| internal_error(format!("Failed to get submission status: {}", e)))?;
324
325 if rows.is_empty() {
326 return Err(StorageError::BulkSubmit(
327 BulkSubmitError::SubmissionNotFound {
328 submitter: id.submitter.clone(),
329 submission_id: id.submission_id.clone(),
330 },
331 ));
332 }
333
334 let current_status: String = rows[0].get(0);
335 if current_status != "in-progress" {
336 return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
337 submission_id: id.submission_id.clone(),
338 }));
339 }
340
341 let pending_row = client
343 .query_one(
344 "SELECT COUNT(*) FROM bulk_manifests
345 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
346 AND status IN ('pending', 'processing')",
347 &[
348 &tenant_id,
349 &id.submitter.as_str(),
350 &id.submission_id.as_str(),
351 ],
352 )
353 .await
354 .map_err(|e| internal_error(format!("Failed to count pending manifests: {}", e)))?;
355
356 let pending_count: i64 = pending_row.get(0);
357 let now = Utc::now();
358
359 client
361 .execute(
362 "UPDATE bulk_submissions SET status = 'aborted', completed_at = $1, updated_at = $2
363 WHERE tenant_id = $3 AND submitter = $4 AND submission_id = $5",
364 &[
365 &now,
366 &now,
367 &tenant_id,
368 &id.submitter.as_str(),
369 &id.submission_id.as_str(),
370 ],
371 )
372 .await
373 .map_err(|e| internal_error(format!("Failed to abort submission: {}", e)))?;
374
375 client
377 .execute(
378 "UPDATE bulk_manifests SET status = 'failed'
379 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
380 AND status IN ('pending', 'processing')",
381 &[
382 &tenant_id,
383 &id.submitter.as_str(),
384 &id.submission_id.as_str(),
385 ],
386 )
387 .await
388 .map_err(|e| internal_error(format!("Failed to update manifests: {}", e)))?;
389
390 Ok(pending_count as u64)
391 }
392
393 async fn add_manifest(
394 &self,
395 tenant: &TenantContext,
396 submission_id: &SubmissionId,
397 manifest_url: Option<&str>,
398 replaces_manifest_url: Option<&str>,
399 ) -> StorageResult<SubmissionManifest> {
400 let client = self.get_client().await?;
401 let tenant_id = tenant.tenant_id().as_str();
402
403 let rows = client
405 .query(
406 "SELECT status FROM bulk_submissions
407 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
408 &[
409 &tenant_id,
410 &submission_id.submitter.as_str(),
411 &submission_id.submission_id.as_str(),
412 ],
413 )
414 .await
415 .map_err(|e| internal_error(format!("Failed to get submission: {}", e)))?;
416
417 if rows.is_empty() {
418 return Err(StorageError::BulkSubmit(
419 BulkSubmitError::SubmissionNotFound {
420 submitter: submission_id.submitter.clone(),
421 submission_id: submission_id.submission_id.clone(),
422 },
423 ));
424 }
425
426 let status: String = rows[0].get(0);
427 if status != "in-progress" {
428 return Err(StorageError::BulkSubmit(BulkSubmitError::InvalidState {
429 submission_id: submission_id.submission_id.clone(),
430 expected: "in-progress".to_string(),
431 actual: status,
432 }));
433 }
434
435 let manifest_id = Uuid::new_v4().to_string();
436 let now = Utc::now();
437
438 client
439 .execute(
440 "INSERT INTO bulk_manifests
441 (tenant_id, submitter, submission_id, manifest_id, manifest_url, replaces_manifest_url, status, added_at)
442 VALUES ($1, $2, $3, $4, $5, $6, 'pending', $7)",
443 &[
444 &tenant_id,
445 &submission_id.submitter.as_str(),
446 &submission_id.submission_id.as_str(),
447 &manifest_id.as_str(),
448 &manifest_url,
449 &replaces_manifest_url,
450 &now,
451 ],
452 )
453 .await
454 .map_err(|e| internal_error(format!("Failed to add manifest: {}", e)))?;
455
456 client
458 .execute(
459 "UPDATE bulk_submissions SET updated_at = $1
460 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
461 &[
462 &now,
463 &tenant_id,
464 &submission_id.submitter.as_str(),
465 &submission_id.submission_id.as_str(),
466 ],
467 )
468 .await
469 .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
470
471 Ok(SubmissionManifest {
472 manifest_id,
473 manifest_url: manifest_url.map(String::from),
474 replaces_manifest_url: replaces_manifest_url.map(String::from),
475 status: ManifestStatus::Pending,
476 added_at: now,
477 total_entries: 0,
478 processed_entries: 0,
479 failed_entries: 0,
480 })
481 }
482
483 async fn get_manifest(
484 &self,
485 tenant: &TenantContext,
486 submission_id: &SubmissionId,
487 manifest_id: &str,
488 ) -> StorageResult<Option<SubmissionManifest>> {
489 let client = self.get_client().await?;
490 let tenant_id = tenant.tenant_id().as_str();
491
492 let rows = client
493 .query(
494 "SELECT manifest_url, replaces_manifest_url, status, added_at, total_entries, processed_entries, failed_entries
495 FROM bulk_manifests
496 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
497 &[
498 &tenant_id,
499 &submission_id.submitter.as_str(),
500 &submission_id.submission_id.as_str(),
501 &manifest_id,
502 ],
503 )
504 .await
505 .map_err(|e| internal_error(format!("Failed to get manifest: {}", e)))?;
506
507 if rows.is_empty() {
508 return Ok(None);
509 }
510
511 let row = &rows[0];
512 let manifest_url: Option<String> = row.get(0);
513 let replaces_manifest_url: Option<String> = row.get(1);
514 let status_str: String = row.get(2);
515 let added_at: chrono::DateTime<Utc> = row.get(3);
516 let total: i64 = row.get(4);
517 let processed: i64 = row.get(5);
518 let failed: i64 = row.get(6);
519
520 let status: ManifestStatus = status_str
521 .parse()
522 .map_err(|_| internal_error(format!("Invalid manifest status: {}", status_str)))?;
523
524 Ok(Some(SubmissionManifest {
525 manifest_id: manifest_id.to_string(),
526 manifest_url,
527 replaces_manifest_url,
528 status,
529 added_at,
530 total_entries: total as u64,
531 processed_entries: processed as u64,
532 failed_entries: failed as u64,
533 }))
534 }
535
536 async fn list_manifests(
537 &self,
538 tenant: &TenantContext,
539 submission_id: &SubmissionId,
540 ) -> StorageResult<Vec<SubmissionManifest>> {
541 let client = self.get_client().await?;
542 let tenant_id = tenant.tenant_id().as_str();
543
544 let rows = client
545 .query(
546 "SELECT manifest_id FROM bulk_manifests
547 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
548 ORDER BY added_at",
549 &[
550 &tenant_id,
551 &submission_id.submitter.as_str(),
552 &submission_id.submission_id.as_str(),
553 ],
554 )
555 .await
556 .map_err(|e| internal_error(format!("Failed to query manifests: {}", e)))?;
557
558 let mut results = Vec::new();
559 for row in &rows {
560 let manifest_id: String = row.get(0);
561 if let Some(manifest) = self
562 .get_manifest(tenant, submission_id, &manifest_id)
563 .await?
564 {
565 results.push(manifest);
566 }
567 }
568
569 Ok(results)
570 }
571
572 async fn process_entries(
573 &self,
574 tenant: &TenantContext,
575 submission_id: &SubmissionId,
576 manifest_id: &str,
577 entries: Vec<NdjsonEntry>,
578 options: &BulkProcessingOptions,
579 ) -> StorageResult<Vec<BulkEntryResult>> {
580 let client = self.get_client().await?;
581 let tenant_id = tenant.tenant_id().as_str();
582
583 if self
585 .get_manifest(tenant, submission_id, manifest_id)
586 .await?
587 .is_none()
588 {
589 return Err(StorageError::BulkSubmit(
590 BulkSubmitError::ManifestNotFound {
591 submission_id: submission_id.submission_id.clone(),
592 manifest_id: manifest_id.to_string(),
593 },
594 ));
595 }
596
597 client
599 .execute(
600 "UPDATE bulk_manifests SET status = 'processing'
601 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
602 &[
603 &tenant_id,
604 &submission_id.submitter.as_str(),
605 &submission_id.submission_id.as_str(),
606 &manifest_id,
607 ],
608 )
609 .await
610 .map_err(|e| internal_error(format!("Failed to update manifest status: {}", e)))?;
611
612 let mut results = Vec::new();
613 let mut error_count = 0u32;
614
615 for entry in entries {
616 if options.max_errors > 0 && error_count >= options.max_errors {
617 if !options.continue_on_error {
618 return Err(StorageError::BulkSubmit(
619 BulkSubmitError::MaxErrorsExceeded {
620 submission_id: submission_id.submission_id.clone(),
621 max_errors: options.max_errors,
622 },
623 ));
624 }
625 let skip_result = BulkEntryResult::skipped(
626 entry.line_number,
627 &entry.resource_type,
628 "max errors exceeded",
629 );
630 results.push(skip_result);
631 continue;
632 }
633
634 let result = self
635 .process_single_entry(tenant, submission_id, manifest_id, &entry, options)
636 .await;
637
638 let entry_result = match result {
639 Ok(r) => r,
640 Err(e) => {
641 error_count += 1;
642 BulkEntryResult::processing_error(
643 entry.line_number,
644 &entry.resource_type,
645 serde_json::json!({
646 "resourceType": "OperationOutcome",
647 "issue": [{
648 "severity": "error",
649 "code": "exception",
650 "diagnostics": e.to_string()
651 }]
652 }),
653 )
654 }
655 };
656
657 if entry_result.is_error() {
658 error_count += 1;
659 }
660
661 self.store_entry_result(tenant, submission_id, manifest_id, &entry_result)
662 .await?;
663
664 results.push(entry_result);
665 }
666
667 let now = Utc::now();
669 client
670 .execute(
671 "UPDATE bulk_manifests SET
672 total_entries = total_entries + $1,
673 processed_entries = processed_entries + $2,
674 failed_entries = failed_entries + $3
675 WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6 AND manifest_id = $7",
676 &[
677 &(results.len() as i64),
678 &(results.iter().filter(|r| r.is_success()).count() as i64),
679 &(error_count as i64),
680 &tenant_id,
681 &submission_id.submitter.as_str(),
682 &submission_id.submission_id.as_str(),
683 &manifest_id,
684 ],
685 )
686 .await
687 .map_err(|e| internal_error(format!("Failed to update manifest counts: {}", e)))?;
688
689 client
691 .execute(
692 "UPDATE bulk_submissions SET updated_at = $1
693 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
694 &[
695 &now,
696 &tenant_id,
697 &submission_id.submitter.as_str(),
698 &submission_id.submission_id.as_str(),
699 ],
700 )
701 .await
702 .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
703
704 Ok(results)
705 }
706
707 async fn get_entry_results(
708 &self,
709 tenant: &TenantContext,
710 submission_id: &SubmissionId,
711 manifest_id: &str,
712 outcome_filter: Option<BulkEntryOutcome>,
713 limit: u32,
714 offset: u32,
715 ) -> StorageResult<Vec<BulkEntryResult>> {
716 let client = self.get_client().await?;
717 let tenant_id = tenant.tenant_id().as_str();
718
719 let mut sql =
720 "SELECT line_number, resource_type, resource_id, created, outcome, operation_outcome
721 FROM bulk_entry_results
722 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4"
723 .to_string();
724
725 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
726 Box::new(tenant_id.to_string()),
727 Box::new(submission_id.submitter.clone()),
728 Box::new(submission_id.submission_id.clone()),
729 Box::new(manifest_id.to_string()),
730 ];
731
732 if let Some(outcome) = outcome_filter {
733 sql.push_str(" AND outcome = $5");
734 params.push(Box::new(outcome.to_string()));
735 }
736
737 sql.push_str(&format!(
738 " ORDER BY line_number LIMIT {} OFFSET {}",
739 limit, offset
740 ));
741
742 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
743 .iter()
744 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
745 .collect();
746
747 let rows = client
748 .query(&sql, ¶m_refs)
749 .await
750 .map_err(|e| internal_error(format!("Failed to query results: {}", e)))?;
751
752 let results: Vec<BulkEntryResult> = rows
753 .iter()
754 .map(|row| {
755 let line_number: i64 = row.get(0);
756 let resource_type: String = row.get(1);
757 let resource_id: Option<String> = row.get(2);
758 let created: Option<bool> = row.get(3);
759 let outcome_str: String = row.get(4);
760 let operation_outcome: Option<Value> = row.get(5);
761
762 let outcome: BulkEntryOutcome = outcome_str
763 .parse()
764 .unwrap_or(BulkEntryOutcome::ProcessingError);
765
766 BulkEntryResult {
767 line_number: line_number as u64,
768 resource_type,
769 resource_id,
770 created: created.unwrap_or(false),
771 outcome,
772 operation_outcome,
773 }
774 })
775 .collect();
776
777 Ok(results)
778 }
779
780 async fn get_entry_counts(
781 &self,
782 tenant: &TenantContext,
783 submission_id: &SubmissionId,
784 manifest_id: &str,
785 ) -> StorageResult<EntryCountSummary> {
786 let client = self.get_client().await?;
787 let tenant_id = tenant.tenant_id().as_str();
788
789 let row = client
790 .query_one(
791 "SELECT
792 COUNT(*),
793 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
794 SUM(CASE WHEN outcome = 'validation-error' THEN 1 ELSE 0 END),
795 SUM(CASE WHEN outcome = 'processing-error' THEN 1 ELSE 0 END),
796 SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
797 FROM bulk_entry_results
798 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
799 &[
800 &tenant_id,
801 &submission_id.submitter.as_str(),
802 &submission_id.submission_id.as_str(),
803 &manifest_id,
804 ],
805 )
806 .await
807 .map_err(|e| internal_error(format!("Failed to count entries: {}", e)))?;
808
809 let total: i64 = row.get(0);
810 let success: Option<i64> = row.get(1);
811 let validation_error: Option<i64> = row.get(2);
812 let processing_error: Option<i64> = row.get(3);
813 let skipped: Option<i64> = row.get(4);
814
815 Ok(EntryCountSummary {
816 total: total as u64,
817 success: success.unwrap_or(0) as u64,
818 validation_error: validation_error.unwrap_or(0) as u64,
819 processing_error: processing_error.unwrap_or(0) as u64,
820 skipped: skipped.unwrap_or(0) as u64,
821 })
822 }
823}
824
825impl PostgresBackend {
826 async fn process_single_entry(
828 &self,
829 tenant: &TenantContext,
830 submission_id: &SubmissionId,
831 manifest_id: &str,
832 entry: &NdjsonEntry,
833 options: &BulkProcessingOptions,
834 ) -> StorageResult<BulkEntryResult> {
835 let resource_id = entry.resource_id.as_ref();
836
837 if let Some(id) = resource_id {
838 let existing = self.read(tenant, &entry.resource_type, id).await;
839
840 match existing {
841 Ok(Some(current)) => {
842 if !options.allow_updates {
843 return Ok(BulkEntryResult::skipped(
844 entry.line_number,
845 &entry.resource_type,
846 "updates not allowed",
847 ));
848 }
849
850 let change = SubmissionChange::update(
851 manifest_id,
852 &entry.resource_type,
853 id,
854 current.version_id(),
855 (current.version_id().parse::<i32>().unwrap_or(0) + 1).to_string(),
856 current.content().clone(),
857 );
858 self.record_change(tenant, submission_id, &change).await?;
859
860 let updated = self
861 .update(tenant, ¤t, entry.resource.clone())
862 .await?;
863
864 Ok(BulkEntryResult::success(
865 entry.line_number,
866 &entry.resource_type,
867 updated.id(),
868 false,
869 ))
870 }
871 Ok(None)
872 | Err(StorageError::Resource(crate::error::ResourceError::Gone { .. })) => {
873 let created = self
874 .create(
875 tenant,
876 &entry.resource_type,
877 entry.resource.clone(),
878 FhirVersion::default(),
879 )
880 .await?;
881
882 let change = SubmissionChange::create(
883 manifest_id,
884 &entry.resource_type,
885 created.id(),
886 created.version_id(),
887 );
888 self.record_change(tenant, submission_id, &change).await?;
889
890 Ok(BulkEntryResult::success(
891 entry.line_number,
892 &entry.resource_type,
893 created.id(),
894 true,
895 ))
896 }
897 Err(e) => Err(e),
898 }
899 } else {
900 let created = self
901 .create(
902 tenant,
903 &entry.resource_type,
904 entry.resource.clone(),
905 FhirVersion::default(),
906 )
907 .await?;
908
909 let change = SubmissionChange::create(
910 manifest_id,
911 &entry.resource_type,
912 created.id(),
913 created.version_id(),
914 );
915 self.record_change(tenant, submission_id, &change).await?;
916
917 Ok(BulkEntryResult::success(
918 entry.line_number,
919 &entry.resource_type,
920 created.id(),
921 true,
922 ))
923 }
924 }
925
926 async fn store_entry_result(
928 &self,
929 tenant: &TenantContext,
930 submission_id: &SubmissionId,
931 manifest_id: &str,
932 result: &BulkEntryResult,
933 ) -> StorageResult<()> {
934 let client = self.get_client().await?;
935 let tenant_id = tenant.tenant_id().as_str();
936
937 let outcome_json: Option<Value> = result.operation_outcome.clone();
938
939 client
940 .execute(
941 "INSERT INTO bulk_entry_results
942 (tenant_id, submitter, submission_id, manifest_id, line_number, resource_type, resource_id, created, outcome, operation_outcome)
943 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
944 &[
945 &tenant_id,
946 &submission_id.submitter.as_str(),
947 &submission_id.submission_id.as_str(),
948 &manifest_id,
949 &(result.line_number as i64),
950 &result.resource_type.as_str(),
951 &result.resource_id,
952 &result.created,
953 &result.outcome.to_string().as_str(),
954 &outcome_json,
955 ],
956 )
957 .await
958 .map_err(|e| internal_error(format!("Failed to store entry result: {}", e)))?;
959
960 Ok(())
961 }
962}
963
964#[async_trait]
965impl StreamingBulkSubmitProvider for PostgresBackend {
966 async fn process_ndjson_stream(
967 &self,
968 tenant: &TenantContext,
969 submission_id: &SubmissionId,
970 manifest_id: &str,
971 resource_type: &str,
972 mut reader: Box<dyn AsyncBufRead + Send + Unpin>,
973 options: &BulkProcessingOptions,
974 ) -> StorageResult<StreamProcessingResult> {
975 let mut result = StreamProcessingResult::new();
976 let mut line_number = 0u64;
977 let mut batch = Vec::new();
978
979 loop {
980 let mut line = String::new();
981 let bytes_read = reader
982 .read_line(&mut line)
983 .await
984 .map_err(|e| internal_error(format!("Failed to read line: {}", e)))?;
985
986 if bytes_read == 0 {
987 break;
988 }
989
990 line_number += 1;
991 result.lines_processed = line_number;
992
993 let line = line.trim();
994 if line.is_empty() {
995 continue;
996 }
997
998 match NdjsonEntry::parse(line_number, line) {
999 Ok(entry) => {
1000 if entry.resource_type != resource_type {
1001 let error_result = BulkEntryResult::validation_error(
1002 line_number,
1003 &entry.resource_type,
1004 serde_json::json!({
1005 "resourceType": "OperationOutcome",
1006 "issue": [{
1007 "severity": "error",
1008 "code": "invalid",
1009 "diagnostics": format!("Expected resource type {}, got {}", resource_type, entry.resource_type)
1010 }]
1011 }),
1012 );
1013 result.counts.increment(error_result.outcome);
1014
1015 if !options.continue_on_error
1016 && (options.max_errors == 0
1017 || result.counts.error_count() >= options.max_errors as u64)
1018 {
1019 return Ok(result.aborted("max errors exceeded"));
1020 }
1021 continue;
1022 }
1023
1024 batch.push(entry);
1025 }
1026 Err(e) => {
1027 result.counts.increment(BulkEntryOutcome::ValidationError);
1028
1029 if !options.continue_on_error
1030 && (options.max_errors == 0
1031 || result.counts.error_count() >= options.max_errors as u64)
1032 {
1033 return Ok(result.aborted(format!("Parse error: {}", e)));
1034 }
1035 }
1036 }
1037
1038 if batch.len() >= options.batch_size as usize {
1039 let batch_results = self
1040 .process_entries(
1041 tenant,
1042 submission_id,
1043 manifest_id,
1044 std::mem::take(&mut batch),
1045 options,
1046 )
1047 .await?;
1048
1049 for r in batch_results {
1050 result.counts.increment(r.outcome);
1051 }
1052
1053 if !options.continue_on_error
1054 && options.max_errors > 0
1055 && result.counts.error_count() >= options.max_errors as u64
1056 {
1057 return Ok(result.aborted("max errors exceeded"));
1058 }
1059 }
1060 }
1061
1062 if !batch.is_empty() {
1064 let batch_results = self
1065 .process_entries(tenant, submission_id, manifest_id, batch, options)
1066 .await?;
1067
1068 for r in batch_results {
1069 result.counts.increment(r.outcome);
1070 }
1071 }
1072
1073 Ok(result)
1074 }
1075}
1076
1077#[async_trait]
1078impl BulkSubmitRollbackProvider for PostgresBackend {
1079 async fn record_change(
1080 &self,
1081 tenant: &TenantContext,
1082 submission_id: &SubmissionId,
1083 change: &SubmissionChange,
1084 ) -> StorageResult<()> {
1085 let client = self.get_client().await?;
1086 let tenant_id = tenant.tenant_id().as_str();
1087
1088 let previous_content_json: Option<Value> = change.previous_content.clone();
1089
1090 client
1091 .execute(
1092 "INSERT INTO bulk_submission_changes
1093 (tenant_id, submitter, submission_id, change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at)
1094 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1095 &[
1096 &tenant_id,
1097 &submission_id.submitter.as_str(),
1098 &submission_id.submission_id.as_str(),
1099 &change.change_id.as_str(),
1100 &change.manifest_id.as_str(),
1101 &change.change_type.to_string().as_str(),
1102 &change.resource_type.as_str(),
1103 &change.resource_id.as_str(),
1104 &change.previous_version,
1105 &change.new_version.as_str(),
1106 &previous_content_json,
1107 &change.changed_at,
1108 ],
1109 )
1110 .await
1111 .map_err(|e| internal_error(format!("Failed to record change: {}", e)))?;
1112
1113 Ok(())
1114 }
1115
1116 async fn list_changes(
1117 &self,
1118 tenant: &TenantContext,
1119 submission_id: &SubmissionId,
1120 limit: u32,
1121 offset: u32,
1122 ) -> StorageResult<Vec<SubmissionChange>> {
1123 let client = self.get_client().await?;
1124 let tenant_id = tenant.tenant_id().as_str();
1125
1126 let sql = format!(
1127 "SELECT change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at
1128 FROM bulk_submission_changes
1129 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1130 ORDER BY changed_at DESC
1131 LIMIT {} OFFSET {}",
1132 limit, offset
1133 );
1134
1135 let rows = client
1136 .query(
1137 &sql,
1138 &[
1139 &tenant_id,
1140 &submission_id.submitter.as_str(),
1141 &submission_id.submission_id.as_str(),
1142 ],
1143 )
1144 .await
1145 .map_err(|e| internal_error(format!("Failed to query changes: {}", e)))?;
1146
1147 let changes: Vec<SubmissionChange> = rows
1148 .iter()
1149 .map(|row| {
1150 let change_id: String = row.get(0);
1151 let manifest_id: String = row.get(1);
1152 let change_type_str: String = row.get(2);
1153 let resource_type: String = row.get(3);
1154 let resource_id: String = row.get(4);
1155 let previous_version: Option<String> = row.get(5);
1156 let new_version: String = row.get(6);
1157 let previous_content: Option<Value> = row.get(7);
1158 let changed_at: chrono::DateTime<Utc> = row.get(8);
1159
1160 let change_type: ChangeType = change_type_str.parse().unwrap_or(ChangeType::Create);
1161
1162 SubmissionChange {
1163 change_id,
1164 manifest_id,
1165 change_type,
1166 resource_type,
1167 resource_id,
1168 previous_version,
1169 new_version,
1170 previous_content,
1171 changed_at,
1172 }
1173 })
1174 .collect();
1175
1176 Ok(changes)
1177 }
1178
1179 async fn rollback_change(
1180 &self,
1181 tenant: &TenantContext,
1182 _submission_id: &SubmissionId,
1183 change: &SubmissionChange,
1184 ) -> StorageResult<bool> {
1185 match change.change_type {
1186 ChangeType::Create => {
1187 match self
1188 .delete(tenant, &change.resource_type, &change.resource_id)
1189 .await
1190 {
1191 Ok(()) => Ok(true),
1192 Err(StorageError::Resource(crate::error::ResourceError::NotFound {
1193 ..
1194 })) => Ok(true),
1195 Err(e) => Err(e),
1196 }
1197 }
1198 ChangeType::Update => {
1199 if let Some(ref previous_content) = change.previous_content {
1200 let current = self
1201 .read(tenant, &change.resource_type, &change.resource_id)
1202 .await?;
1203 if let Some(current) = current {
1204 self.update(tenant, ¤t, previous_content.clone())
1205 .await?;
1206 Ok(true)
1207 } else {
1208 Ok(false)
1209 }
1210 } else {
1211 Ok(false)
1212 }
1213 }
1214 }
1215 }
1216}