1use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use helios_fhir::FhirVersion;
6use serde_json::Value;
7use std::time::Duration as StdDuration;
8use tokio::io::{AsyncBufRead, AsyncBufReadExt};
9use uuid::Uuid;
10
11use crate::core::ResourceStorage;
12use crate::core::bulk_export::ExportJobId;
13use crate::core::bulk_export_worker::{LeaseError, WorkerId};
14use crate::core::bulk_submit::{
15 BulkEntryOutcome, BulkEntryResult, BulkProcessingOptions, BulkSubmitProvider,
16 BulkSubmitRollbackProvider, ChangeType, EntryCountSummary, ManifestStatus, NdjsonEntry,
17 StreamProcessingResult, StreamingBulkSubmitProvider, SubmissionChange, SubmissionId,
18 SubmissionManifest, SubmissionStatus, SubmissionSummary,
19};
20use crate::core::bulk_submit_worker::{
21 ManifestLease, ManifestWorkerView, PollTokenTarget, SubmitClaimStrategy, SubmitFileRecord,
22 SubmitFileRow, SubmitWorkerStorage,
23};
24use crate::error::{BackendError, BulkSubmitError, StorageError, StorageResult};
25use crate::tenant::{TenantContext, TenantId, TenantPermissions};
26
27use super::PostgresBackend;
28
29fn internal_error(message: String) -> StorageError {
30 StorageError::Backend(BackendError::Internal {
31 backend_name: "postgres".to_string(),
32 message,
33 source: None,
34 })
35}
36
37fn lease_lost(lease: &ManifestLease) -> LeaseError {
39 LeaseError::LeaseLost {
40 job_id: ExportJobId::from_string(format!("{}/{}", lease.submission_id, lease.manifest_id)),
41 }
42}
43
44fn fhir_version_from_output_format(output_format: Option<&str>) -> FhirVersion {
46 output_format
47 .and_then(|fmt| {
48 fmt.split(';').find_map(|part| {
49 let part = part.trim();
50 part.strip_prefix("fhirVersion=")
51 .and_then(FhirVersion::from_mime_param)
52 })
53 })
54 .unwrap_or_else(FhirVersion::default_enabled)
55}
56
57#[async_trait]
58impl BulkSubmitProvider for PostgresBackend {
59 async fn create_submission(
60 &self,
61 tenant: &TenantContext,
62 id: &SubmissionId,
63 metadata: Option<Value>,
64 ) -> StorageResult<SubmissionSummary> {
65 let client = self.get_client().await?;
66 let tenant_id = tenant.tenant_id().as_str();
67
68 let rows = client
70 .query(
71 "SELECT 1 FROM bulk_submissions
72 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
73 &[
74 &tenant_id,
75 &id.submitter.as_str(),
76 &id.submission_id.as_str(),
77 ],
78 )
79 .await
80 .map_err(|e| internal_error(format!("Failed to check duplicate: {}", e)))?;
81
82 if !rows.is_empty() {
83 return Err(StorageError::BulkSubmit(
84 BulkSubmitError::DuplicateSubmission {
85 submitter: id.submitter.clone(),
86 submission_id: id.submission_id.clone(),
87 },
88 ));
89 }
90
91 let now = Utc::now();
92 let metadata_json: Option<Value> = metadata.clone();
93
94 client
95 .execute(
96 "INSERT INTO bulk_submissions
97 (tenant_id, submitter, submission_id, status, created_at, updated_at, metadata)
98 VALUES ($1, $2, $3, 'in-progress', $4, $5, $6)",
99 &[
100 &tenant_id,
101 &id.submitter.as_str(),
102 &id.submission_id.as_str(),
103 &now,
104 &now,
105 &metadata_json,
106 ],
107 )
108 .await
109 .map_err(|e| internal_error(format!("Failed to create submission: {}", e)))?;
110
111 Ok(SubmissionSummary {
112 id: id.clone(),
113 status: SubmissionStatus::InProgress,
114 created_at: now,
115 updated_at: now,
116 completed_at: None,
117 manifest_count: 0,
118 total_entries: 0,
119 success_count: 0,
120 error_count: 0,
121 skipped_count: 0,
122 metadata,
123 })
124 }
125
126 async fn get_submission(
127 &self,
128 tenant: &TenantContext,
129 id: &SubmissionId,
130 ) -> StorageResult<Option<SubmissionSummary>> {
131 let client = self.get_client().await?;
132 let tenant_id = tenant.tenant_id().as_str();
133
134 let rows = client
135 .query(
136 "SELECT status, created_at, updated_at, completed_at, metadata
137 FROM bulk_submissions
138 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
139 &[
140 &tenant_id,
141 &id.submitter.as_str(),
142 &id.submission_id.as_str(),
143 ],
144 )
145 .await
146 .map_err(|e| internal_error(format!("Failed to get submission: {}", e)))?;
147
148 if rows.is_empty() {
149 return Ok(None);
150 }
151
152 let row = &rows[0];
153 let status_str: String = row.get(0);
154 let created_at: chrono::DateTime<Utc> = row.get(1);
155 let updated_at: chrono::DateTime<Utc> = row.get(2);
156 let completed_at: Option<chrono::DateTime<Utc>> = row.get(3);
157 let metadata: Option<Value> = row.get(4);
158
159 let status: SubmissionStatus = status_str
160 .parse()
161 .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
162
163 let manifest_row = client
165 .query_one(
166 "SELECT COUNT(*) FROM bulk_manifests
167 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
168 &[
169 &tenant_id,
170 &id.submitter.as_str(),
171 &id.submission_id.as_str(),
172 ],
173 )
174 .await
175 .map_err(|e| internal_error(format!("Failed to count manifests: {}", e)))?;
176
177 let manifest_count: i64 = manifest_row.get(0);
178
179 let counts_row = client
181 .query_one(
182 "SELECT
183 COUNT(*),
184 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
185 SUM(CASE WHEN outcome IN ('validation-error', 'processing-error') THEN 1 ELSE 0 END),
186 SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
187 FROM bulk_entry_results
188 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
189 &[&tenant_id, &id.submitter.as_str(), &id.submission_id.as_str()],
190 )
191 .await
192 .map_err(|e| internal_error(format!("Failed to count entries: {}", e)))?;
193
194 let total: i64 = counts_row.get(0);
195 let success: Option<i64> = counts_row.get(1);
196 let errors: Option<i64> = counts_row.get(2);
197 let skipped: Option<i64> = counts_row.get(3);
198
199 Ok(Some(SubmissionSummary {
200 id: id.clone(),
201 status,
202 created_at,
203 updated_at,
204 completed_at,
205 manifest_count: manifest_count as u32,
206 total_entries: total as u64,
207 success_count: success.unwrap_or(0) as u64,
208 error_count: errors.unwrap_or(0) as u64,
209 skipped_count: skipped.unwrap_or(0) as u64,
210 metadata,
211 }))
212 }
213
214 async fn list_submissions(
215 &self,
216 tenant: &TenantContext,
217 submitter: Option<&str>,
218 status: Option<SubmissionStatus>,
219 limit: u32,
220 offset: u32,
221 ) -> StorageResult<Vec<SubmissionSummary>> {
222 let client = self.get_client().await?;
223 let tenant_id = tenant.tenant_id().as_str();
224
225 let mut sql = "SELECT submitter, submission_id FROM bulk_submissions WHERE tenant_id = $1"
226 .to_string();
227 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
228 vec![Box::new(tenant_id.to_string())];
229 let mut param_idx = 2;
230
231 if let Some(submitter) = submitter {
232 sql.push_str(&format!(" AND submitter = ${}", param_idx));
233 params.push(Box::new(submitter.to_string()));
234 param_idx += 1;
235 }
236
237 if let Some(status) = status {
238 sql.push_str(&format!(" AND status = ${}", param_idx));
239 params.push(Box::new(status.to_string()));
240 }
241
242 sql.push_str(&format!(
243 " ORDER BY created_at DESC LIMIT {} OFFSET {}",
244 limit, offset
245 ));
246
247 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
248 .iter()
249 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
250 .collect();
251
252 let rows = client
253 .query(&sql, ¶m_refs)
254 .await
255 .map_err(|e| internal_error(format!("Failed to query submissions: {}", e)))?;
256
257 let mut results = Vec::new();
258 for row in &rows {
259 let submitter: String = row.get(0);
260 let submission_id: String = row.get(1);
261 let sub_id = SubmissionId::new(submitter, submission_id);
262 if let Some(summary) = self.get_submission(tenant, &sub_id).await? {
263 results.push(summary);
264 }
265 }
266
267 Ok(results)
268 }
269
270 async fn complete_submission(
271 &self,
272 tenant: &TenantContext,
273 id: &SubmissionId,
274 ) -> StorageResult<SubmissionSummary> {
275 let client = self.get_client().await?;
276 let tenant_id = tenant.tenant_id().as_str();
277
278 let rows = client
280 .query(
281 "SELECT status FROM bulk_submissions
282 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
283 &[
284 &tenant_id,
285 &id.submitter.as_str(),
286 &id.submission_id.as_str(),
287 ],
288 )
289 .await
290 .map_err(|e| internal_error(format!("Failed to get submission status: {}", e)))?;
291
292 if rows.is_empty() {
293 return Err(StorageError::BulkSubmit(
294 BulkSubmitError::SubmissionNotFound {
295 submitter: id.submitter.clone(),
296 submission_id: id.submission_id.clone(),
297 },
298 ));
299 }
300
301 let current_status: String = rows[0].get(0);
302 if current_status != "in-progress" {
303 return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
304 submission_id: id.submission_id.clone(),
305 }));
306 }
307
308 let now = Utc::now();
309 client
310 .execute(
311 "UPDATE bulk_submissions SET status = 'complete', completed_at = $1, updated_at = $2
312 WHERE tenant_id = $3 AND submitter = $4 AND submission_id = $5",
313 &[
314 &now,
315 &now,
316 &tenant_id,
317 &id.submitter.as_str(),
318 &id.submission_id.as_str(),
319 ],
320 )
321 .await
322 .map_err(|e| internal_error(format!("Failed to complete submission: {}", e)))?;
323
324 self.get_submission(tenant, id)
325 .await?
326 .ok_or_else(|| internal_error("Submission disappeared".to_string()))
327 }
328
329 async fn abort_submission(
330 &self,
331 tenant: &TenantContext,
332 id: &SubmissionId,
333 _reason: &str,
334 ) -> StorageResult<u64> {
335 let client = self.get_client().await?;
336 let tenant_id = tenant.tenant_id().as_str();
337
338 let rows = client
340 .query(
341 "SELECT status FROM bulk_submissions
342 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
343 &[
344 &tenant_id,
345 &id.submitter.as_str(),
346 &id.submission_id.as_str(),
347 ],
348 )
349 .await
350 .map_err(|e| internal_error(format!("Failed to get submission status: {}", e)))?;
351
352 if rows.is_empty() {
353 return Err(StorageError::BulkSubmit(
354 BulkSubmitError::SubmissionNotFound {
355 submitter: id.submitter.clone(),
356 submission_id: id.submission_id.clone(),
357 },
358 ));
359 }
360
361 let current_status: String = rows[0].get(0);
362 if current_status != "in-progress" {
363 return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
364 submission_id: id.submission_id.clone(),
365 }));
366 }
367
368 let pending_row = client
370 .query_one(
371 "SELECT COUNT(*) FROM bulk_manifests
372 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
373 AND status IN ('pending', 'processing')",
374 &[
375 &tenant_id,
376 &id.submitter.as_str(),
377 &id.submission_id.as_str(),
378 ],
379 )
380 .await
381 .map_err(|e| internal_error(format!("Failed to count pending manifests: {}", e)))?;
382
383 let pending_count: i64 = pending_row.get(0);
384 let now = Utc::now();
385
386 client
388 .execute(
389 "UPDATE bulk_submissions SET status = 'aborted', completed_at = $1, updated_at = $2
390 WHERE tenant_id = $3 AND submitter = $4 AND submission_id = $5",
391 &[
392 &now,
393 &now,
394 &tenant_id,
395 &id.submitter.as_str(),
396 &id.submission_id.as_str(),
397 ],
398 )
399 .await
400 .map_err(|e| internal_error(format!("Failed to abort submission: {}", e)))?;
401
402 client
404 .execute(
405 "UPDATE bulk_manifests SET status = 'failed'
406 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
407 AND status IN ('pending', 'processing')",
408 &[
409 &tenant_id,
410 &id.submitter.as_str(),
411 &id.submission_id.as_str(),
412 ],
413 )
414 .await
415 .map_err(|e| internal_error(format!("Failed to update manifests: {}", e)))?;
416
417 Ok(pending_count as u64)
418 }
419
420 async fn add_manifest(
421 &self,
422 tenant: &TenantContext,
423 submission_id: &SubmissionId,
424 manifest_url: Option<&str>,
425 replaces_manifest_url: Option<&str>,
426 ) -> StorageResult<SubmissionManifest> {
427 let client = self.get_client().await?;
428 let tenant_id = tenant.tenant_id().as_str();
429
430 let rows = client
432 .query(
433 "SELECT status FROM bulk_submissions
434 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
435 &[
436 &tenant_id,
437 &submission_id.submitter.as_str(),
438 &submission_id.submission_id.as_str(),
439 ],
440 )
441 .await
442 .map_err(|e| internal_error(format!("Failed to get submission: {}", e)))?;
443
444 if rows.is_empty() {
445 return Err(StorageError::BulkSubmit(
446 BulkSubmitError::SubmissionNotFound {
447 submitter: submission_id.submitter.clone(),
448 submission_id: submission_id.submission_id.clone(),
449 },
450 ));
451 }
452
453 let status: String = rows[0].get(0);
454 if status != "in-progress" {
455 return Err(StorageError::BulkSubmit(BulkSubmitError::InvalidState {
456 submission_id: submission_id.submission_id.clone(),
457 expected: "in-progress".to_string(),
458 actual: status,
459 }));
460 }
461
462 let manifest_id = Uuid::new_v4().to_string();
463 let now = Utc::now();
464
465 client
466 .execute(
467 "INSERT INTO bulk_manifests
468 (tenant_id, submitter, submission_id, manifest_id, manifest_url, replaces_manifest_url, status, added_at)
469 VALUES ($1, $2, $3, $4, $5, $6, 'pending', $7)",
470 &[
471 &tenant_id,
472 &submission_id.submitter.as_str(),
473 &submission_id.submission_id.as_str(),
474 &manifest_id.as_str(),
475 &manifest_url,
476 &replaces_manifest_url,
477 &now,
478 ],
479 )
480 .await
481 .map_err(|e| internal_error(format!("Failed to add manifest: {}", e)))?;
482
483 client
485 .execute(
486 "UPDATE bulk_submissions SET updated_at = $1
487 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
488 &[
489 &now,
490 &tenant_id,
491 &submission_id.submitter.as_str(),
492 &submission_id.submission_id.as_str(),
493 ],
494 )
495 .await
496 .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
497
498 Ok(SubmissionManifest {
499 manifest_id,
500 manifest_url: manifest_url.map(String::from),
501 replaces_manifest_url: replaces_manifest_url.map(String::from),
502 status: ManifestStatus::Pending,
503 added_at: now,
504 total_entries: 0,
505 processed_entries: 0,
506 failed_entries: 0,
507 })
508 }
509
510 async fn get_manifest(
511 &self,
512 tenant: &TenantContext,
513 submission_id: &SubmissionId,
514 manifest_id: &str,
515 ) -> StorageResult<Option<SubmissionManifest>> {
516 let client = self.get_client().await?;
517 let tenant_id = tenant.tenant_id().as_str();
518
519 let rows = client
520 .query(
521 "SELECT manifest_url, replaces_manifest_url, status, added_at, total_entries, processed_entries, failed_entries
522 FROM bulk_manifests
523 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
524 &[
525 &tenant_id,
526 &submission_id.submitter.as_str(),
527 &submission_id.submission_id.as_str(),
528 &manifest_id,
529 ],
530 )
531 .await
532 .map_err(|e| internal_error(format!("Failed to get manifest: {}", e)))?;
533
534 if rows.is_empty() {
535 return Ok(None);
536 }
537
538 let row = &rows[0];
539 let manifest_url: Option<String> = row.get(0);
540 let replaces_manifest_url: Option<String> = row.get(1);
541 let status_str: String = row.get(2);
542 let added_at: chrono::DateTime<Utc> = row.get(3);
543 let total: i32 = row.get(4);
544 let processed: i32 = row.get(5);
545 let failed: i32 = row.get(6);
546
547 let status: ManifestStatus = status_str
548 .parse()
549 .map_err(|_| internal_error(format!("Invalid manifest status: {}", status_str)))?;
550
551 Ok(Some(SubmissionManifest {
552 manifest_id: manifest_id.to_string(),
553 manifest_url,
554 replaces_manifest_url,
555 status,
556 added_at,
557 total_entries: total as u64,
558 processed_entries: processed as u64,
559 failed_entries: failed as u64,
560 }))
561 }
562
563 async fn list_manifests(
564 &self,
565 tenant: &TenantContext,
566 submission_id: &SubmissionId,
567 ) -> StorageResult<Vec<SubmissionManifest>> {
568 let client = self.get_client().await?;
569 let tenant_id = tenant.tenant_id().as_str();
570
571 let rows = client
572 .query(
573 "SELECT manifest_id FROM bulk_manifests
574 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
575 ORDER BY added_at",
576 &[
577 &tenant_id,
578 &submission_id.submitter.as_str(),
579 &submission_id.submission_id.as_str(),
580 ],
581 )
582 .await
583 .map_err(|e| internal_error(format!("Failed to query manifests: {}", e)))?;
584
585 let mut results = Vec::new();
586 for row in &rows {
587 let manifest_id: String = row.get(0);
588 if let Some(manifest) = self
589 .get_manifest(tenant, submission_id, &manifest_id)
590 .await?
591 {
592 results.push(manifest);
593 }
594 }
595
596 Ok(results)
597 }
598
599 async fn process_entries(
600 &self,
601 tenant: &TenantContext,
602 submission_id: &SubmissionId,
603 manifest_id: &str,
604 entries: Vec<NdjsonEntry>,
605 options: &BulkProcessingOptions,
606 ) -> StorageResult<Vec<BulkEntryResult>> {
607 let client = self.get_client().await?;
608 let tenant_id = tenant.tenant_id().as_str();
609
610 if self
612 .get_manifest(tenant, submission_id, manifest_id)
613 .await?
614 .is_none()
615 {
616 return Err(StorageError::BulkSubmit(
617 BulkSubmitError::ManifestNotFound {
618 submission_id: submission_id.submission_id.clone(),
619 manifest_id: manifest_id.to_string(),
620 },
621 ));
622 }
623
624 client
626 .execute(
627 "UPDATE bulk_manifests SET status = 'processing'
628 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
629 &[
630 &tenant_id,
631 &submission_id.submitter.as_str(),
632 &submission_id.submission_id.as_str(),
633 &manifest_id,
634 ],
635 )
636 .await
637 .map_err(|e| internal_error(format!("Failed to update manifest status: {}", e)))?;
638
639 let mut results = Vec::new();
640 let mut error_count = 0u32;
641
642 for entry in entries {
643 if options.max_errors > 0 && error_count >= options.max_errors {
644 if !options.continue_on_error {
645 return Err(StorageError::BulkSubmit(
646 BulkSubmitError::MaxErrorsExceeded {
647 submission_id: submission_id.submission_id.clone(),
648 max_errors: options.max_errors,
649 },
650 ));
651 }
652 let skip_result = BulkEntryResult::skipped(
653 entry.line_number,
654 &entry.resource_type,
655 "max errors exceeded",
656 );
657 results.push(skip_result);
658 continue;
659 }
660
661 let result = self
662 .process_single_entry(tenant, submission_id, manifest_id, &entry, options)
663 .await;
664
665 let entry_result = match result {
666 Ok(r) => r,
667 Err(e) => {
668 error_count += 1;
669 BulkEntryResult::processing_error(
670 entry.line_number,
671 &entry.resource_type,
672 serde_json::json!({
673 "resourceType": "OperationOutcome",
674 "issue": [{
675 "severity": "error",
676 "code": "exception",
677 "diagnostics": e.to_string()
678 }]
679 }),
680 )
681 }
682 };
683
684 if entry_result.is_error() {
685 error_count += 1;
686 }
687
688 self.store_entry_result(tenant, submission_id, manifest_id, &entry_result)
689 .await?;
690
691 results.push(entry_result);
692 }
693
694 let now = Utc::now();
696 client
697 .execute(
698 "UPDATE bulk_manifests SET
699 total_entries = total_entries + $1,
700 processed_entries = processed_entries + $2,
701 failed_entries = failed_entries + $3
702 WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6 AND manifest_id = $7",
703 &[
704 &(results.len() as i32),
705 &(results.iter().filter(|r| r.is_success()).count() as i32),
706 &(error_count as i32),
707 &tenant_id,
708 &submission_id.submitter.as_str(),
709 &submission_id.submission_id.as_str(),
710 &manifest_id,
711 ],
712 )
713 .await
714 .map_err(|e| internal_error(format!("Failed to update manifest counts: {}", e)))?;
715
716 client
718 .execute(
719 "UPDATE bulk_submissions SET updated_at = $1
720 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
721 &[
722 &now,
723 &tenant_id,
724 &submission_id.submitter.as_str(),
725 &submission_id.submission_id.as_str(),
726 ],
727 )
728 .await
729 .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
730
731 Ok(results)
732 }
733
734 async fn get_entry_results(
735 &self,
736 tenant: &TenantContext,
737 submission_id: &SubmissionId,
738 manifest_id: &str,
739 outcome_filter: Option<BulkEntryOutcome>,
740 limit: u32,
741 offset: u32,
742 ) -> StorageResult<Vec<BulkEntryResult>> {
743 let client = self.get_client().await?;
744 let tenant_id = tenant.tenant_id().as_str();
745
746 let mut sql =
747 "SELECT line_number, resource_type, resource_id, created, outcome, operation_outcome
748 FROM bulk_entry_results
749 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4"
750 .to_string();
751
752 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
753 Box::new(tenant_id.to_string()),
754 Box::new(submission_id.submitter.clone()),
755 Box::new(submission_id.submission_id.clone()),
756 Box::new(manifest_id.to_string()),
757 ];
758
759 if let Some(outcome) = outcome_filter {
760 sql.push_str(" AND outcome = $5");
761 params.push(Box::new(outcome.to_string()));
762 }
763
764 sql.push_str(&format!(
765 " ORDER BY line_number LIMIT {} OFFSET {}",
766 limit, offset
767 ));
768
769 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
770 .iter()
771 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
772 .collect();
773
774 let rows = client
775 .query(&sql, ¶m_refs)
776 .await
777 .map_err(|e| internal_error(format!("Failed to query results: {}", e)))?;
778
779 let results: Vec<BulkEntryResult> = rows
780 .iter()
781 .map(|row| {
782 let line_number: i32 = row.get(0);
783 let resource_type: String = row.get(1);
784 let resource_id: Option<String> = row.get(2);
785 let created: Option<bool> = row.get(3);
786 let outcome_str: String = row.get(4);
787 let operation_outcome: Option<Value> = row.get(5);
788
789 let outcome: BulkEntryOutcome = outcome_str
790 .parse()
791 .unwrap_or(BulkEntryOutcome::ProcessingError);
792
793 BulkEntryResult {
794 line_number: line_number as u64,
795 resource_type,
796 resource_id,
797 created: created.unwrap_or(false),
798 outcome,
799 operation_outcome,
800 }
801 })
802 .collect();
803
804 Ok(results)
805 }
806
807 async fn get_entry_counts(
808 &self,
809 tenant: &TenantContext,
810 submission_id: &SubmissionId,
811 manifest_id: &str,
812 ) -> StorageResult<EntryCountSummary> {
813 let client = self.get_client().await?;
814 let tenant_id = tenant.tenant_id().as_str();
815
816 let row = client
817 .query_one(
818 "SELECT
819 COUNT(*),
820 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
821 SUM(CASE WHEN outcome = 'validation-error' THEN 1 ELSE 0 END),
822 SUM(CASE WHEN outcome = 'processing-error' THEN 1 ELSE 0 END),
823 SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
824 FROM bulk_entry_results
825 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
826 &[
827 &tenant_id,
828 &submission_id.submitter.as_str(),
829 &submission_id.submission_id.as_str(),
830 &manifest_id,
831 ],
832 )
833 .await
834 .map_err(|e| internal_error(format!("Failed to count entries: {}", e)))?;
835
836 let total: i64 = row.get(0);
837 let success: Option<i64> = row.get(1);
838 let validation_error: Option<i64> = row.get(2);
839 let processing_error: Option<i64> = row.get(3);
840 let skipped: Option<i64> = row.get(4);
841
842 Ok(EntryCountSummary {
843 total: total as u64,
844 success: success.unwrap_or(0) as u64,
845 validation_error: validation_error.unwrap_or(0) as u64,
846 processing_error: processing_error.unwrap_or(0) as u64,
847 skipped: skipped.unwrap_or(0) as u64,
848 })
849 }
850}
851
852impl PostgresBackend {
853 async fn process_single_entry(
855 &self,
856 tenant: &TenantContext,
857 submission_id: &SubmissionId,
858 manifest_id: &str,
859 entry: &NdjsonEntry,
860 options: &BulkProcessingOptions,
861 ) -> StorageResult<BulkEntryResult> {
862 let resource_id = entry.resource_id.as_ref();
863
864 if let Some(id) = resource_id {
865 let existing = self.read(tenant, &entry.resource_type, id).await;
866
867 match existing {
868 Ok(Some(current)) => {
869 if !options.allow_updates {
870 return Ok(BulkEntryResult::skipped(
871 entry.line_number,
872 &entry.resource_type,
873 "updates not allowed",
874 ));
875 }
876
877 let change = SubmissionChange::update(
878 manifest_id,
879 &entry.resource_type,
880 id,
881 current.version_id(),
882 (current.version_id().parse::<i32>().unwrap_or(0) + 1).to_string(),
883 current.content().clone(),
884 );
885 self.record_change(tenant, submission_id, &change).await?;
886
887 let updated = self
888 .update(tenant, ¤t, entry.resource.clone())
889 .await?;
890
891 Ok(BulkEntryResult::success(
892 entry.line_number,
893 &entry.resource_type,
894 updated.id(),
895 false,
896 ))
897 }
898 Ok(None)
899 | Err(StorageError::Resource(crate::error::ResourceError::Gone { .. })) => {
900 let created = self
901 .create(
902 tenant,
903 &entry.resource_type,
904 entry.resource.clone(),
905 FhirVersion::default_enabled(),
906 )
907 .await?;
908
909 let change = SubmissionChange::create(
910 manifest_id,
911 &entry.resource_type,
912 created.id(),
913 created.version_id(),
914 );
915 self.record_change(tenant, submission_id, &change).await?;
916
917 Ok(BulkEntryResult::success(
918 entry.line_number,
919 &entry.resource_type,
920 created.id(),
921 true,
922 ))
923 }
924 Err(e) => Err(e),
925 }
926 } else {
927 let created = self
928 .create(
929 tenant,
930 &entry.resource_type,
931 entry.resource.clone(),
932 FhirVersion::default_enabled(),
933 )
934 .await?;
935
936 let change = SubmissionChange::create(
937 manifest_id,
938 &entry.resource_type,
939 created.id(),
940 created.version_id(),
941 );
942 self.record_change(tenant, submission_id, &change).await?;
943
944 Ok(BulkEntryResult::success(
945 entry.line_number,
946 &entry.resource_type,
947 created.id(),
948 true,
949 ))
950 }
951 }
952
953 async fn store_entry_result(
955 &self,
956 tenant: &TenantContext,
957 submission_id: &SubmissionId,
958 manifest_id: &str,
959 result: &BulkEntryResult,
960 ) -> StorageResult<()> {
961 let client = self.get_client().await?;
962 let tenant_id = tenant.tenant_id().as_str();
963
964 let outcome_json: Option<Value> = result.operation_outcome.clone();
965
966 client
967 .execute(
968 "INSERT INTO bulk_entry_results
969 (tenant_id, submitter, submission_id, manifest_id, line_number, resource_type, resource_id, created, outcome, operation_outcome)
970 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
971 &[
972 &tenant_id,
973 &submission_id.submitter.as_str(),
974 &submission_id.submission_id.as_str(),
975 &manifest_id,
976 &(result.line_number as i32),
977 &result.resource_type.as_str(),
978 &result.resource_id,
979 &result.created,
980 &result.outcome.to_string().as_str(),
981 &outcome_json,
982 ],
983 )
984 .await
985 .map_err(|e| internal_error(format!("Failed to store entry result: {}", e)))?;
986
987 Ok(())
988 }
989}
990
991#[async_trait]
992impl StreamingBulkSubmitProvider for PostgresBackend {
993 async fn process_ndjson_stream(
994 &self,
995 tenant: &TenantContext,
996 submission_id: &SubmissionId,
997 manifest_id: &str,
998 resource_type: &str,
999 mut reader: Box<dyn AsyncBufRead + Send + Unpin>,
1000 options: &BulkProcessingOptions,
1001 ) -> StorageResult<StreamProcessingResult> {
1002 let mut result = StreamProcessingResult::new();
1003 let mut line_number = 0u64;
1004 let mut batch = Vec::new();
1005
1006 loop {
1007 let mut line = String::new();
1008 let bytes_read = reader
1009 .read_line(&mut line)
1010 .await
1011 .map_err(|e| internal_error(format!("Failed to read line: {}", e)))?;
1012
1013 if bytes_read == 0 {
1014 break;
1015 }
1016
1017 line_number += 1;
1018 result.lines_processed = line_number;
1019
1020 let line = line.trim();
1021 if line.is_empty() {
1022 continue;
1023 }
1024
1025 match NdjsonEntry::parse(line_number, line) {
1026 Ok(entry) => {
1027 if entry.resource_type != resource_type {
1028 let error_result = BulkEntryResult::validation_error(
1029 line_number,
1030 &entry.resource_type,
1031 serde_json::json!({
1032 "resourceType": "OperationOutcome",
1033 "issue": [{
1034 "severity": "error",
1035 "code": "invalid",
1036 "diagnostics": format!("Expected resource type {}, got {}", resource_type, entry.resource_type)
1037 }]
1038 }),
1039 );
1040 result.counts.increment(error_result.outcome);
1041
1042 if !options.continue_on_error
1043 && (options.max_errors == 0
1044 || result.counts.error_count() >= options.max_errors as u64)
1045 {
1046 return Ok(result.aborted("max errors exceeded"));
1047 }
1048 continue;
1049 }
1050
1051 batch.push(entry);
1052 }
1053 Err(e) => {
1054 result.counts.increment(BulkEntryOutcome::ValidationError);
1055
1056 if !options.continue_on_error
1057 && (options.max_errors == 0
1058 || result.counts.error_count() >= options.max_errors as u64)
1059 {
1060 return Ok(result.aborted(format!("Parse error: {}", e)));
1061 }
1062 }
1063 }
1064
1065 if batch.len() >= options.batch_size as usize {
1066 let batch_results = self
1067 .process_entries(
1068 tenant,
1069 submission_id,
1070 manifest_id,
1071 std::mem::take(&mut batch),
1072 options,
1073 )
1074 .await?;
1075
1076 for r in batch_results {
1077 result.counts.increment(r.outcome);
1078 }
1079
1080 if !options.continue_on_error
1081 && options.max_errors > 0
1082 && result.counts.error_count() >= options.max_errors as u64
1083 {
1084 return Ok(result.aborted("max errors exceeded"));
1085 }
1086 }
1087 }
1088
1089 if !batch.is_empty() {
1091 let batch_results = self
1092 .process_entries(tenant, submission_id, manifest_id, batch, options)
1093 .await?;
1094
1095 for r in batch_results {
1096 result.counts.increment(r.outcome);
1097 }
1098 }
1099
1100 Ok(result)
1101 }
1102}
1103
1104#[async_trait]
1105impl BulkSubmitRollbackProvider for PostgresBackend {
1106 async fn record_change(
1107 &self,
1108 tenant: &TenantContext,
1109 submission_id: &SubmissionId,
1110 change: &SubmissionChange,
1111 ) -> StorageResult<()> {
1112 let client = self.get_client().await?;
1113 let tenant_id = tenant.tenant_id().as_str();
1114
1115 let previous_content_json: Option<Value> = change.previous_content.clone();
1116
1117 client
1118 .execute(
1119 "INSERT INTO bulk_submission_changes
1120 (tenant_id, submitter, submission_id, change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at)
1121 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1122 &[
1123 &tenant_id,
1124 &submission_id.submitter.as_str(),
1125 &submission_id.submission_id.as_str(),
1126 &change.change_id.as_str(),
1127 &change.manifest_id.as_str(),
1128 &change.change_type.to_string().as_str(),
1129 &change.resource_type.as_str(),
1130 &change.resource_id.as_str(),
1131 &change.previous_version,
1132 &change.new_version.as_str(),
1133 &previous_content_json,
1134 &change.changed_at,
1135 ],
1136 )
1137 .await
1138 .map_err(|e| internal_error(format!("Failed to record change: {}", e)))?;
1139
1140 Ok(())
1141 }
1142
1143 async fn list_changes(
1144 &self,
1145 tenant: &TenantContext,
1146 submission_id: &SubmissionId,
1147 limit: u32,
1148 offset: u32,
1149 ) -> StorageResult<Vec<SubmissionChange>> {
1150 let client = self.get_client().await?;
1151 let tenant_id = tenant.tenant_id().as_str();
1152
1153 let sql = format!(
1154 "SELECT change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at
1155 FROM bulk_submission_changes
1156 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1157 ORDER BY changed_at DESC
1158 LIMIT {} OFFSET {}",
1159 limit, offset
1160 );
1161
1162 let rows = client
1163 .query(
1164 &sql,
1165 &[
1166 &tenant_id,
1167 &submission_id.submitter.as_str(),
1168 &submission_id.submission_id.as_str(),
1169 ],
1170 )
1171 .await
1172 .map_err(|e| internal_error(format!("Failed to query changes: {}", e)))?;
1173
1174 let changes: Vec<SubmissionChange> = rows
1175 .iter()
1176 .map(|row| {
1177 let change_id: String = row.get(0);
1178 let manifest_id: String = row.get(1);
1179 let change_type_str: String = row.get(2);
1180 let resource_type: String = row.get(3);
1181 let resource_id: String = row.get(4);
1182 let previous_version: Option<String> = row.get(5);
1183 let new_version: String = row.get(6);
1184 let previous_content: Option<Value> = row.get(7);
1185 let changed_at: chrono::DateTime<Utc> = row.get(8);
1186
1187 let change_type: ChangeType = change_type_str.parse().unwrap_or(ChangeType::Create);
1188
1189 SubmissionChange {
1190 change_id,
1191 manifest_id,
1192 change_type,
1193 resource_type,
1194 resource_id,
1195 previous_version,
1196 new_version,
1197 previous_content,
1198 changed_at,
1199 }
1200 })
1201 .collect();
1202
1203 Ok(changes)
1204 }
1205
1206 async fn rollback_change(
1207 &self,
1208 tenant: &TenantContext,
1209 _submission_id: &SubmissionId,
1210 change: &SubmissionChange,
1211 ) -> StorageResult<bool> {
1212 match change.change_type {
1213 ChangeType::Create => {
1214 match self
1215 .delete(tenant, &change.resource_type, &change.resource_id)
1216 .await
1217 {
1218 Ok(()) => Ok(true),
1219 Err(StorageError::Resource(crate::error::ResourceError::NotFound {
1220 ..
1221 })) => Ok(true),
1222 Err(e) => Err(e),
1223 }
1224 }
1225 ChangeType::Update => {
1226 if let Some(ref previous_content) = change.previous_content {
1227 let current = self
1228 .read(tenant, &change.resource_type, &change.resource_id)
1229 .await?;
1230 if let Some(current) = current {
1231 self.update(tenant, ¤t, previous_content.clone())
1232 .await?;
1233 Ok(true)
1234 } else {
1235 Ok(false)
1236 }
1237 } else {
1238 Ok(false)
1239 }
1240 }
1241 }
1242 }
1243}
1244
1245#[async_trait]
1246impl SubmitClaimStrategy for PostgresBackend {
1247 async fn claim_next_manifest(
1248 &self,
1249 worker_id: &WorkerId,
1250 lease_duration: StdDuration,
1251 ) -> StorageResult<Option<ManifestLease>> {
1252 let mut client = self.get_client().await?;
1253 let now = Utc::now();
1254 let lease_expiry = now
1255 + chrono::Duration::from_std(lease_duration)
1256 .unwrap_or_else(|_| chrono::Duration::seconds(60));
1257
1258 let txn = client
1259 .transaction()
1260 .await
1261 .map_err(|e| internal_error(format!("Failed to begin claim txn: {}", e)))?;
1262
1263 let rows = txn
1264 .query(
1265 "SELECT m.tenant_id, m.submitter, m.submission_id, m.manifest_id, m.fencing_token
1266 FROM bulk_manifests m
1267 JOIN bulk_submissions s
1268 ON s.tenant_id = m.tenant_id AND s.submitter = m.submitter
1269 AND s.submission_id = m.submission_id
1270 WHERE m.manifest_url IS NOT NULL
1271 AND s.status = 'in-progress'
1272 AND (m.status = 'pending'
1273 OR (m.status = 'processing'
1274 AND (m.lease_expiry IS NULL OR m.lease_expiry < $1)))
1275 ORDER BY m.added_at
1276 LIMIT 1
1277 FOR UPDATE OF m SKIP LOCKED",
1278 &[&now],
1279 )
1280 .await
1281 .map_err(|e| internal_error(format!("Failed to select claimable manifest: {}", e)))?;
1282
1283 let Some(row) = rows.first() else {
1284 txn.commit()
1285 .await
1286 .map_err(|e| internal_error(format!("Failed to commit claim txn: {}", e)))?;
1287 return Ok(None);
1288 };
1289 let tenant_id: String = row.get(0);
1290 let submitter: String = row.get(1);
1291 let submission_id: String = row.get(2);
1292 let manifest_id: String = row.get(3);
1293 let fencing_token: i64 = row.get(4);
1294 let new_token = fencing_token + 1;
1295
1296 txn.execute(
1297 "UPDATE bulk_manifests
1298 SET status = 'processing', worker_id = $1, lease_expiry = $2, fencing_token = $3
1299 WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6 AND manifest_id = $7",
1300 &[
1301 &worker_id.as_str(),
1302 &lease_expiry,
1303 &new_token,
1304 &tenant_id,
1305 &submitter,
1306 &submission_id,
1307 &manifest_id,
1308 ],
1309 )
1310 .await
1311 .map_err(|e| internal_error(format!("Failed to claim manifest: {}", e)))?;
1312
1313 txn.commit()
1314 .await
1315 .map_err(|e| internal_error(format!("Failed to commit claim txn: {}", e)))?;
1316
1317 Ok(Some(ManifestLease {
1318 tenant: TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
1319 submission_id: SubmissionId::new(submitter, submission_id),
1320 manifest_id,
1321 worker_id: worker_id.clone(),
1322 lease_expiry,
1323 fencing_token: new_token as u64,
1324 }))
1325 }
1326
1327 async fn heartbeat(&self, lease: &ManifestLease) -> Result<DateTime<Utc>, LeaseError> {
1328 let client = self.get_client().await.map_err(LeaseError::Storage)?;
1329 let now = Utc::now();
1330 let new_expiry = now + chrono::Duration::seconds(60);
1331 let affected = client
1332 .execute(
1333 "UPDATE bulk_manifests SET lease_expiry = $1
1334 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4
1335 AND manifest_id = $5 AND worker_id = $6 AND fencing_token = $7",
1336 &[
1337 &new_expiry,
1338 &lease.tenant.tenant_id().as_str(),
1339 &lease.submission_id.submitter,
1340 &lease.submission_id.submission_id,
1341 &lease.manifest_id,
1342 &lease.worker_id.as_str(),
1343 &(lease.fencing_token as i64),
1344 ],
1345 )
1346 .await
1347 .map_err(|e| LeaseError::Storage(internal_error(format!("heartbeat failed: {e}"))))?;
1348 if affected == 0 {
1349 Err(lease_lost(lease))
1350 } else {
1351 Ok(new_expiry)
1352 }
1353 }
1354
1355 async fn release(&self, lease: ManifestLease) -> StorageResult<()> {
1356 let client = self.get_client().await?;
1357 client
1358 .execute(
1359 "UPDATE bulk_manifests
1360 SET status = 'pending', worker_id = NULL, lease_expiry = NULL
1361 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1362 AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6
1363 AND status = 'processing'",
1364 &[
1365 &lease.tenant.tenant_id().as_str(),
1366 &lease.submission_id.submitter,
1367 &lease.submission_id.submission_id,
1368 &lease.manifest_id,
1369 &lease.worker_id.as_str(),
1370 &(lease.fencing_token as i64),
1371 ],
1372 )
1373 .await
1374 .map_err(|e| internal_error(format!("Failed to release manifest lease: {}", e)))?;
1375 Ok(())
1376 }
1377}
1378
1379#[async_trait]
1380impl SubmitWorkerStorage for PostgresBackend {
1381 async fn get_manifest_for_worker(
1382 &self,
1383 lease: &ManifestLease,
1384 ) -> Result<ManifestWorkerView, LeaseError> {
1385 let client = self.get_client().await.map_err(LeaseError::Storage)?;
1386 let rows = client
1387 .query(
1388 "SELECT manifest_url, fhir_base_url, output_format, file_request_headers,
1389 oauth_metadata_urls, file_encryption_key, last_processed_line
1390 FROM bulk_manifests
1391 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1392 AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1393 &[
1394 &lease.tenant.tenant_id().as_str(),
1395 &lease.submission_id.submitter,
1396 &lease.submission_id.submission_id,
1397 &lease.manifest_id,
1398 &lease.worker_id.as_str(),
1399 &(lease.fencing_token as i64),
1400 ],
1401 )
1402 .await
1403 .map_err(|e| LeaseError::Storage(internal_error(format!("load manifest: {e}"))))?;
1404 let row = rows.first().ok_or_else(|| lease_lost(lease))?;
1405
1406 let manifest_url: Option<String> = row.get(0);
1407 let fhir_base_url: Option<String> = row.get(1);
1408 let output_format: Option<String> = row.get(2);
1409 let headers_json: Option<String> = row.get(3);
1410 let oauth_json: Option<String> = row.get(4);
1411 let encryption_json: Option<String> = row.get(5);
1412 let last_processed_line: i64 = row.get(6);
1413
1414 let file_request_headers: Vec<(String, String)> = headers_json
1415 .as_deref()
1416 .and_then(|s| serde_json::from_str(s).ok())
1417 .unwrap_or_default();
1418 let oauth_metadata_urls: Vec<String> = oauth_json
1419 .as_deref()
1420 .and_then(|s| serde_json::from_str(s).ok())
1421 .unwrap_or_default();
1422 let file_encryption_key: Option<Value> = encryption_json
1423 .as_deref()
1424 .and_then(|s| serde_json::from_str(s).ok());
1425 let fhir_version = fhir_version_from_output_format(output_format.as_deref());
1426
1427 Ok(ManifestWorkerView {
1428 manifest_id: lease.manifest_id.clone(),
1429 manifest_url,
1430 fhir_base_url,
1431 output_format,
1432 file_request_headers,
1433 oauth_metadata_urls,
1434 file_encryption_key,
1435 last_processed_line: last_processed_line.max(0) as u64,
1436 fhir_version,
1437 })
1438 }
1439
1440 async fn mark_manifest_processing(&self, lease: &ManifestLease) -> Result<(), LeaseError> {
1441 let client = self.get_client().await.map_err(LeaseError::Storage)?;
1442 let affected = client
1443 .execute(
1444 "UPDATE bulk_manifests SET status = 'processing'
1445 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1446 AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1447 &[
1448 &lease.tenant.tenant_id().as_str(),
1449 &lease.submission_id.submitter,
1450 &lease.submission_id.submission_id,
1451 &lease.manifest_id,
1452 &lease.worker_id.as_str(),
1453 &(lease.fencing_token as i64),
1454 ],
1455 )
1456 .await
1457 .map_err(|e| LeaseError::Storage(internal_error(format!("mark processing: {e}"))))?;
1458 if affected == 0 {
1459 Err(lease_lost(lease))
1460 } else {
1461 Ok(())
1462 }
1463 }
1464
1465 async fn update_manifest_progress(
1466 &self,
1467 lease: &ManifestLease,
1468 processed_entries: u64,
1469 failed_entries: u64,
1470 last_processed_line: u64,
1471 ) -> Result<(), LeaseError> {
1472 let client = self.get_client().await.map_err(LeaseError::Storage)?;
1473 let affected = client
1474 .execute(
1475 "UPDATE bulk_manifests
1476 SET processed_entries = $1, failed_entries = $2, last_processed_line = $3
1477 WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6
1478 AND manifest_id = $7 AND worker_id = $8 AND fencing_token = $9",
1479 &[
1480 &(processed_entries as i32),
1481 &(failed_entries as i32),
1482 &(last_processed_line as i64),
1483 &lease.tenant.tenant_id().as_str(),
1484 &lease.submission_id.submitter,
1485 &lease.submission_id.submission_id,
1486 &lease.manifest_id,
1487 &lease.worker_id.as_str(),
1488 &(lease.fencing_token as i64),
1489 ],
1490 )
1491 .await
1492 .map_err(|e| LeaseError::Storage(internal_error(format!("update progress: {e}"))))?;
1493 if affected == 0 {
1494 Err(lease_lost(lease))
1495 } else {
1496 Ok(())
1497 }
1498 }
1499
1500 async fn record_submit_file(
1501 &self,
1502 lease: &ManifestLease,
1503 file: &SubmitFileRecord,
1504 ) -> Result<(), LeaseError> {
1505 let client = self.get_client().await.map_err(LeaseError::Storage)?;
1506 let holds = client
1508 .query(
1509 "SELECT 1 FROM bulk_manifests
1510 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1511 AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1512 &[
1513 &lease.tenant.tenant_id().as_str(),
1514 &lease.submission_id.submitter,
1515 &lease.submission_id.submission_id,
1516 &lease.manifest_id,
1517 &lease.worker_id.as_str(),
1518 &(lease.fencing_token as i64),
1519 ],
1520 )
1521 .await
1522 .map_err(|e| LeaseError::Storage(internal_error(format!("fence check: {e}"))))?;
1523 if holds.is_empty() {
1524 return Err(lease_lost(lease));
1525 }
1526
1527 let count_severity = file
1528 .count_severity
1529 .as_ref()
1530 .and_then(|v| serde_json::to_string(v).ok());
1531 client
1532 .execute(
1533 "INSERT INTO bulk_submit_files
1534 (tenant_id, submitter, submission_id, manifest_url, file_type, resource_type,
1535 part_index, fencing_token, file_path, line_count, byte_count, count_severity,
1536 created_at)
1537 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
1538 &[
1539 &lease.tenant.tenant_id().as_str(),
1540 &lease.submission_id.submitter,
1541 &lease.submission_id.submission_id,
1542 &file.manifest_url,
1543 &file.file_type,
1544 &file.resource_type,
1545 &(file.part_index as i32),
1546 &(lease.fencing_token as i64),
1547 &file.file_path,
1548 &(file.line_count as i64),
1549 &(file.byte_count as i64),
1550 &count_severity,
1551 &Utc::now(),
1552 ],
1553 )
1554 .await
1555 .map_err(|e| LeaseError::Storage(internal_error(format!("record submit file: {e}"))))?;
1556 Ok(())
1557 }
1558
1559 async fn finish_manifest(&self, lease: &ManifestLease) -> Result<(), LeaseError> {
1560 let client = self.get_client().await.map_err(LeaseError::Storage)?;
1561 let affected = client
1562 .execute(
1563 "UPDATE bulk_manifests SET status = 'completed', worker_id = NULL, lease_expiry = NULL
1564 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1565 AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1566 &[
1567 &lease.tenant.tenant_id().as_str(),
1568 &lease.submission_id.submitter,
1569 &lease.submission_id.submission_id,
1570 &lease.manifest_id,
1571 &lease.worker_id.as_str(),
1572 &(lease.fencing_token as i64),
1573 ],
1574 )
1575 .await
1576 .map_err(|e| LeaseError::Storage(internal_error(format!("finish manifest: {e}"))))?;
1577 if affected == 0 {
1578 Err(lease_lost(lease))
1579 } else {
1580 Ok(())
1581 }
1582 }
1583
1584 async fn fail_manifest(
1585 &self,
1586 lease: &ManifestLease,
1587 _error_message: &str,
1588 ) -> Result<(), LeaseError> {
1589 let client = self.get_client().await.map_err(LeaseError::Storage)?;
1590 let affected = client
1591 .execute(
1592 "UPDATE bulk_manifests SET status = 'failed', worker_id = NULL, lease_expiry = NULL
1593 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1594 AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1595 &[
1596 &lease.tenant.tenant_id().as_str(),
1597 &lease.submission_id.submitter,
1598 &lease.submission_id.submission_id,
1599 &lease.manifest_id,
1600 &lease.worker_id.as_str(),
1601 &(lease.fencing_token as i64),
1602 ],
1603 )
1604 .await
1605 .map_err(|e| LeaseError::Storage(internal_error(format!("fail manifest: {e}"))))?;
1606 if affected == 0 {
1607 Err(lease_lost(lease))
1608 } else {
1609 Ok(())
1610 }
1611 }
1612
1613 async fn set_manifest_fetch_params(
1614 &self,
1615 tenant: &TenantContext,
1616 id: &SubmissionId,
1617 manifest_id: &str,
1618 fhir_base_url: Option<&str>,
1619 output_format: Option<&str>,
1620 file_request_headers: &[(String, String)],
1621 oauth_metadata_urls: &[String],
1622 file_encryption_key: Option<&Value>,
1623 ) -> StorageResult<()> {
1624 let client = self.get_client().await?;
1625 let fhir_base_url = fhir_base_url.map(|s| s.to_string());
1626 let output_format = output_format.map(|s| s.to_string());
1627 let headers_json = serde_json::to_string(file_request_headers).ok();
1628 let oauth_json = serde_json::to_string(oauth_metadata_urls).ok();
1629 let encryption_json = file_encryption_key.and_then(|v| serde_json::to_string(v).ok());
1630 client
1631 .execute(
1632 "UPDATE bulk_manifests
1633 SET fhir_base_url = $1, output_format = $2, file_request_headers = $3,
1634 oauth_metadata_urls = $4, file_encryption_key = $5
1635 WHERE tenant_id = $6 AND submitter = $7 AND submission_id = $8 AND manifest_id = $9",
1636 &[
1637 &fhir_base_url,
1638 &output_format,
1639 &headers_json,
1640 &oauth_json,
1641 &encryption_json,
1642 &tenant.tenant_id().as_str(),
1643 &id.submitter,
1644 &id.submission_id,
1645 &manifest_id,
1646 ],
1647 )
1648 .await
1649 .map_err(|e| internal_error(format!("set manifest fetch params: {e}")))?;
1650 Ok(())
1651 }
1652
1653 async fn replace_manifest_by_url(
1654 &self,
1655 tenant: &TenantContext,
1656 id: &SubmissionId,
1657 manifest_url: &str,
1658 ) -> StorageResult<Vec<String>> {
1659 let client = self.get_client().await?;
1660 let tenant_id = tenant.tenant_id().as_str();
1661 let rows = client
1662 .query(
1663 "SELECT manifest_id FROM bulk_manifests
1664 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1665 AND manifest_url = $4 AND status != 'replaced'",
1666 &[&tenant_id, &id.submitter, &id.submission_id, &manifest_url],
1667 )
1668 .await
1669 .map_err(|e| internal_error(format!("replace lookup: {e}")))?;
1670 let ids: Vec<String> = rows.iter().map(|r| r.get::<_, String>(0)).collect();
1671 client
1672 .execute(
1673 "UPDATE bulk_manifests SET status = 'replaced'
1674 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_url = $4",
1675 &[&tenant_id, &id.submitter, &id.submission_id, &manifest_url],
1676 )
1677 .await
1678 .map_err(|e| internal_error(format!("mark replaced: {e}")))?;
1679 Ok(ids)
1680 }
1681
1682 async fn set_submission_kickoff_meta(
1683 &self,
1684 tenant: &TenantContext,
1685 id: &SubmissionId,
1686 owner_subject: Option<&str>,
1687 request_url: &str,
1688 requires_access_token: bool,
1689 ) -> StorageResult<()> {
1690 let client = self.get_client().await?;
1691 let owner = owner_subject.map(|s| s.to_string());
1692 client
1693 .execute(
1694 "UPDATE bulk_submissions
1695 SET owner_subject = $1, request_url = $2, requires_access_token = $3
1696 WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6",
1697 &[
1698 &owner,
1699 &request_url,
1700 &requires_access_token,
1701 &tenant.tenant_id().as_str(),
1702 &id.submitter,
1703 &id.submission_id,
1704 ],
1705 )
1706 .await
1707 .map_err(|e| internal_error(format!("set kickoff meta: {e}")))?;
1708 Ok(())
1709 }
1710
1711 async fn ensure_poll_token(
1712 &self,
1713 tenant: &TenantContext,
1714 id: &SubmissionId,
1715 ) -> StorageResult<String> {
1716 let client = self.get_client().await?;
1717 let tenant_id = tenant.tenant_id().as_str();
1718 let rows = client
1719 .query(
1720 "SELECT poll_token FROM bulk_submissions
1721 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
1722 &[&tenant_id, &id.submitter, &id.submission_id],
1723 )
1724 .await
1725 .map_err(|e| internal_error(format!("read poll token: {e}")))?;
1726 if let Some(row) = rows.first() {
1727 let existing: Option<String> = row.get(0);
1728 if let Some(token) = existing {
1729 return Ok(token);
1730 }
1731 }
1732 let token = Uuid::new_v4().to_string();
1733 client
1734 .execute(
1735 "UPDATE bulk_submissions SET poll_token = $1
1736 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
1737 &[&token, &tenant_id, &id.submitter, &id.submission_id],
1738 )
1739 .await
1740 .map_err(|e| internal_error(format!("set poll token: {e}")))?;
1741 Ok(token)
1742 }
1743
1744 async fn list_expired_submissions(
1745 &self,
1746 now: DateTime<Utc>,
1747 ttl: StdDuration,
1748 limit: u32,
1749 ) -> StorageResult<Vec<(TenantContext, SubmissionId)>> {
1750 let client = self.get_client().await?;
1751 let cutoff = now
1752 - chrono::Duration::from_std(ttl).unwrap_or_else(|_| chrono::Duration::seconds(86400));
1753 let rows = client
1754 .query(
1755 "SELECT tenant_id, submitter, submission_id FROM bulk_submissions
1756 WHERE updated_at < $1 ORDER BY updated_at LIMIT $2",
1757 &[&cutoff, &(limit as i64)],
1758 )
1759 .await
1760 .map_err(|e| internal_error(format!("list expired: {e}")))?;
1761 Ok(rows
1762 .iter()
1763 .map(|r| {
1764 let tenant_id: String = r.get(0);
1765 let submitter: String = r.get(1);
1766 let submission_id: String = r.get(2);
1767 (
1768 TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
1769 SubmissionId::new(submitter, submission_id),
1770 )
1771 })
1772 .collect())
1773 }
1774
1775 async fn resolve_poll_token(&self, token: &str) -> StorageResult<Option<PollTokenTarget>> {
1776 let client = self.get_client().await?;
1777 let rows = client
1778 .query(
1779 "SELECT tenant_id, submitter, submission_id, owner_subject
1780 FROM bulk_submissions WHERE poll_token = $1",
1781 &[&token],
1782 )
1783 .await
1784 .map_err(|e| internal_error(format!("resolve poll token: {e}")))?;
1785 Ok(rows.first().map(|row| {
1786 let tenant_id: String = row.get(0);
1787 let submitter: String = row.get(1);
1788 let submission_id: String = row.get(2);
1789 let owner_subject: Option<String> = row.get(3);
1790 PollTokenTarget {
1791 tenant: TenantContext::new(
1792 TenantId::new(tenant_id),
1793 TenantPermissions::full_access(),
1794 ),
1795 submission_id: SubmissionId::new(submitter, submission_id),
1796 owner_subject,
1797 }
1798 }))
1799 }
1800
1801 async fn clear_poll_token(
1802 &self,
1803 tenant: &TenantContext,
1804 id: &SubmissionId,
1805 ) -> StorageResult<()> {
1806 let client = self.get_client().await?;
1807 client
1808 .execute(
1809 "UPDATE bulk_submissions SET poll_token = NULL
1810 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
1811 &[
1812 &tenant.tenant_id().as_str(),
1813 &id.submitter,
1814 &id.submission_id,
1815 ],
1816 )
1817 .await
1818 .map_err(|e| internal_error(format!("clear poll token: {e}")))?;
1819 Ok(())
1820 }
1821
1822 async fn list_submit_files(
1823 &self,
1824 tenant: &TenantContext,
1825 id: &SubmissionId,
1826 ) -> StorageResult<Vec<SubmitFileRow>> {
1827 let client = self.get_client().await?;
1828 let rows = client
1829 .query(
1830 "SELECT manifest_url, file_type, resource_type, part_index, fencing_token,
1831 file_path, line_count, byte_count, count_severity
1832 FROM bulk_submit_files
1833 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1834 ORDER BY id",
1835 &[
1836 &tenant.tenant_id().as_str(),
1837 &id.submitter,
1838 &id.submission_id,
1839 ],
1840 )
1841 .await
1842 .map_err(|e| internal_error(format!("list submit files: {e}")))?;
1843 Ok(rows
1844 .iter()
1845 .map(|row| {
1846 let count_severity: Option<String> = row.get(8);
1847 SubmitFileRow {
1848 manifest_url: row.get(0),
1849 file_type: row.get(1),
1850 resource_type: row.get(2),
1851 part_index: row.get::<_, i32>(3) as u32,
1852 fencing_token: row.get::<_, i64>(4) as u64,
1853 file_path: row.get(5),
1854 line_count: row.get::<_, i64>(6) as u64,
1855 byte_count: row.get::<_, i64>(7) as u64,
1856 count_severity: count_severity
1857 .as_deref()
1858 .and_then(|s| serde_json::from_str(s).ok()),
1859 }
1860 })
1861 .collect())
1862 }
1863
1864 async fn delete_submission_artifacts(
1865 &self,
1866 tenant: &TenantContext,
1867 id: &SubmissionId,
1868 ) -> StorageResult<()> {
1869 let client = self.get_client().await?;
1870 client
1871 .execute(
1872 "DELETE FROM bulk_submit_files
1873 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
1874 &[
1875 &tenant.tenant_id().as_str(),
1876 &id.submitter,
1877 &id.submission_id,
1878 ],
1879 )
1880 .await
1881 .map_err(|e| internal_error(format!("delete artifacts: {e}")))?;
1882 Ok(())
1883 }
1884
1885 async fn count_active_submissions(&self, tenant: &TenantContext) -> StorageResult<u64> {
1886 let client = self.get_client().await?;
1887 let row = client
1888 .query_one(
1889 "SELECT COUNT(*) FROM bulk_submissions
1890 WHERE tenant_id = $1 AND status = 'in-progress'",
1891 &[&tenant.tenant_id().as_str()],
1892 )
1893 .await
1894 .map_err(|e| internal_error(format!("count active submissions: {e}")))?;
1895 let count: i64 = row.get(0);
1896 Ok(count.max(0) as u64)
1897 }
1898
1899 async fn ensure_transaction_time(
1900 &self,
1901 tenant: &TenantContext,
1902 id: &SubmissionId,
1903 ) -> StorageResult<DateTime<Utc>> {
1904 let client = self.get_client().await?;
1905 let tenant_id = tenant.tenant_id().as_str();
1906 let rows = client
1907 .query(
1908 "SELECT transaction_time FROM bulk_submissions
1909 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
1910 &[&tenant_id, &id.submitter, &id.submission_id],
1911 )
1912 .await
1913 .map_err(|e| internal_error(format!("read transaction_time: {e}")))?;
1914 if let Some(row) = rows.first() {
1915 let existing: Option<DateTime<Utc>> = row.get(0);
1916 if let Some(dt) = existing {
1917 return Ok(dt);
1918 }
1919 }
1920 let now = Utc::now();
1921 client
1922 .execute(
1923 "UPDATE bulk_submissions SET transaction_time = $1
1924 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
1925 &[&now, &tenant_id, &id.submitter, &id.submission_id],
1926 )
1927 .await
1928 .map_err(|e| internal_error(format!("set transaction_time: {e}")))?;
1929 Ok(now)
1930 }
1931}