Skip to main content

helios_persistence/backends/postgres/
bulk_submit.rs

1//! Bulk submit implementation for PostgreSQL backend.
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use helios_fhir::FhirVersion;
6use serde_json::Value;
7use std::time::Duration as StdDuration;
8use tokio::io::{AsyncBufRead, AsyncBufReadExt};
9use uuid::Uuid;
10
11use crate::core::ResourceStorage;
12use crate::core::bulk_export::ExportJobId;
13use crate::core::bulk_export_worker::{LeaseError, WorkerId};
14use crate::core::bulk_submit::{
15    BulkEntryOutcome, BulkEntryResult, BulkProcessingOptions, BulkSubmitProvider,
16    BulkSubmitRollbackProvider, ChangeType, EntryCountSummary, ManifestStatus, NdjsonEntry,
17    StreamProcessingResult, StreamingBulkSubmitProvider, SubmissionChange, SubmissionId,
18    SubmissionManifest, SubmissionStatus, SubmissionSummary,
19};
20use crate::core::bulk_submit_worker::{
21    ManifestLease, ManifestWorkerView, PollTokenTarget, SubmitClaimStrategy, SubmitFileRecord,
22    SubmitFileRow, SubmitWorkerStorage,
23};
24use crate::error::{BackendError, BulkSubmitError, StorageError, StorageResult};
25use crate::tenant::{TenantContext, TenantId, TenantPermissions};
26
27use super::PostgresBackend;
28
29fn internal_error(message: String) -> StorageError {
30    StorageError::Backend(BackendError::Internal {
31        backend_name: "postgres".to_string(),
32        message,
33        source: None,
34    })
35}
36
37/// Builds a `LeaseError::LeaseLost` for a submit manifest.
38fn lease_lost(lease: &ManifestLease) -> LeaseError {
39    LeaseError::LeaseLost {
40        job_id: ExportJobId::from_string(format!("{}/{}", lease.submission_id, lease.manifest_id)),
41    }
42}
43
44/// Derives the ingest FHIR version from a stored `outputFormat` MIME string.
45fn fhir_version_from_output_format(output_format: Option<&str>) -> FhirVersion {
46    output_format
47        .and_then(|fmt| {
48            fmt.split(';').find_map(|part| {
49                let part = part.trim();
50                part.strip_prefix("fhirVersion=")
51                    .and_then(FhirVersion::from_mime_param)
52            })
53        })
54        .unwrap_or_else(FhirVersion::default_enabled)
55}
56
57#[async_trait]
58impl BulkSubmitProvider for PostgresBackend {
59    async fn create_submission(
60        &self,
61        tenant: &TenantContext,
62        id: &SubmissionId,
63        metadata: Option<Value>,
64    ) -> StorageResult<SubmissionSummary> {
65        let client = self.get_client().await?;
66        let tenant_id = tenant.tenant_id().as_str();
67
68        // Check for duplicate
69        let rows = client
70            .query(
71                "SELECT 1 FROM bulk_submissions
72                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
73                &[
74                    &tenant_id,
75                    &id.submitter.as_str(),
76                    &id.submission_id.as_str(),
77                ],
78            )
79            .await
80            .map_err(|e| internal_error(format!("Failed to check duplicate: {}", e)))?;
81
82        if !rows.is_empty() {
83            return Err(StorageError::BulkSubmit(
84                BulkSubmitError::DuplicateSubmission {
85                    submitter: id.submitter.clone(),
86                    submission_id: id.submission_id.clone(),
87                },
88            ));
89        }
90
91        let now = Utc::now();
92        let metadata_json: Option<Value> = metadata.clone();
93
94        client
95            .execute(
96                "INSERT INTO bulk_submissions
97                 (tenant_id, submitter, submission_id, status, created_at, updated_at, metadata)
98                 VALUES ($1, $2, $3, 'in-progress', $4, $5, $6)",
99                &[
100                    &tenant_id,
101                    &id.submitter.as_str(),
102                    &id.submission_id.as_str(),
103                    &now,
104                    &now,
105                    &metadata_json,
106                ],
107            )
108            .await
109            .map_err(|e| internal_error(format!("Failed to create submission: {}", e)))?;
110
111        Ok(SubmissionSummary {
112            id: id.clone(),
113            status: SubmissionStatus::InProgress,
114            created_at: now,
115            updated_at: now,
116            completed_at: None,
117            manifest_count: 0,
118            total_entries: 0,
119            success_count: 0,
120            error_count: 0,
121            skipped_count: 0,
122            metadata,
123        })
124    }
125
126    async fn get_submission(
127        &self,
128        tenant: &TenantContext,
129        id: &SubmissionId,
130    ) -> StorageResult<Option<SubmissionSummary>> {
131        let client = self.get_client().await?;
132        let tenant_id = tenant.tenant_id().as_str();
133
134        let rows = client
135            .query(
136                "SELECT status, created_at, updated_at, completed_at, metadata
137                 FROM bulk_submissions
138                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
139                &[
140                    &tenant_id,
141                    &id.submitter.as_str(),
142                    &id.submission_id.as_str(),
143                ],
144            )
145            .await
146            .map_err(|e| internal_error(format!("Failed to get submission: {}", e)))?;
147
148        if rows.is_empty() {
149            return Ok(None);
150        }
151
152        let row = &rows[0];
153        let status_str: String = row.get(0);
154        let created_at: chrono::DateTime<Utc> = row.get(1);
155        let updated_at: chrono::DateTime<Utc> = row.get(2);
156        let completed_at: Option<chrono::DateTime<Utc>> = row.get(3);
157        let metadata: Option<Value> = row.get(4);
158
159        let status: SubmissionStatus = status_str
160            .parse()
161            .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
162
163        // Get manifest count
164        let manifest_row = client
165            .query_one(
166                "SELECT COUNT(*) FROM bulk_manifests
167                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
168                &[
169                    &tenant_id,
170                    &id.submitter.as_str(),
171                    &id.submission_id.as_str(),
172                ],
173            )
174            .await
175            .map_err(|e| internal_error(format!("Failed to count manifests: {}", e)))?;
176
177        let manifest_count: i64 = manifest_row.get(0);
178
179        // Get aggregated counts from entry results
180        let counts_row = client
181            .query_one(
182                "SELECT
183                    COUNT(*),
184                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
185                    SUM(CASE WHEN outcome IN ('validation-error', 'processing-error') THEN 1 ELSE 0 END),
186                    SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
187                 FROM bulk_entry_results
188                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
189                &[&tenant_id, &id.submitter.as_str(), &id.submission_id.as_str()],
190            )
191            .await
192            .map_err(|e| internal_error(format!("Failed to count entries: {}", e)))?;
193
194        let total: i64 = counts_row.get(0);
195        let success: Option<i64> = counts_row.get(1);
196        let errors: Option<i64> = counts_row.get(2);
197        let skipped: Option<i64> = counts_row.get(3);
198
199        Ok(Some(SubmissionSummary {
200            id: id.clone(),
201            status,
202            created_at,
203            updated_at,
204            completed_at,
205            manifest_count: manifest_count as u32,
206            total_entries: total as u64,
207            success_count: success.unwrap_or(0) as u64,
208            error_count: errors.unwrap_or(0) as u64,
209            skipped_count: skipped.unwrap_or(0) as u64,
210            metadata,
211        }))
212    }
213
214    async fn list_submissions(
215        &self,
216        tenant: &TenantContext,
217        submitter: Option<&str>,
218        status: Option<SubmissionStatus>,
219        limit: u32,
220        offset: u32,
221    ) -> StorageResult<Vec<SubmissionSummary>> {
222        let client = self.get_client().await?;
223        let tenant_id = tenant.tenant_id().as_str();
224
225        let mut sql = "SELECT submitter, submission_id FROM bulk_submissions WHERE tenant_id = $1"
226            .to_string();
227        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
228            vec![Box::new(tenant_id.to_string())];
229        let mut param_idx = 2;
230
231        if let Some(submitter) = submitter {
232            sql.push_str(&format!(" AND submitter = ${}", param_idx));
233            params.push(Box::new(submitter.to_string()));
234            param_idx += 1;
235        }
236
237        if let Some(status) = status {
238            sql.push_str(&format!(" AND status = ${}", param_idx));
239            params.push(Box::new(status.to_string()));
240        }
241
242        sql.push_str(&format!(
243            " ORDER BY created_at DESC LIMIT {} OFFSET {}",
244            limit, offset
245        ));
246
247        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
248            .iter()
249            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
250            .collect();
251
252        let rows = client
253            .query(&sql, &param_refs)
254            .await
255            .map_err(|e| internal_error(format!("Failed to query submissions: {}", e)))?;
256
257        let mut results = Vec::new();
258        for row in &rows {
259            let submitter: String = row.get(0);
260            let submission_id: String = row.get(1);
261            let sub_id = SubmissionId::new(submitter, submission_id);
262            if let Some(summary) = self.get_submission(tenant, &sub_id).await? {
263                results.push(summary);
264            }
265        }
266
267        Ok(results)
268    }
269
270    async fn complete_submission(
271        &self,
272        tenant: &TenantContext,
273        id: &SubmissionId,
274    ) -> StorageResult<SubmissionSummary> {
275        let client = self.get_client().await?;
276        let tenant_id = tenant.tenant_id().as_str();
277
278        // Check current status
279        let rows = client
280            .query(
281                "SELECT status FROM bulk_submissions
282                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
283                &[
284                    &tenant_id,
285                    &id.submitter.as_str(),
286                    &id.submission_id.as_str(),
287                ],
288            )
289            .await
290            .map_err(|e| internal_error(format!("Failed to get submission status: {}", e)))?;
291
292        if rows.is_empty() {
293            return Err(StorageError::BulkSubmit(
294                BulkSubmitError::SubmissionNotFound {
295                    submitter: id.submitter.clone(),
296                    submission_id: id.submission_id.clone(),
297                },
298            ));
299        }
300
301        let current_status: String = rows[0].get(0);
302        if current_status != "in-progress" {
303            return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
304                submission_id: id.submission_id.clone(),
305            }));
306        }
307
308        let now = Utc::now();
309        client
310            .execute(
311                "UPDATE bulk_submissions SET status = 'complete', completed_at = $1, updated_at = $2
312                 WHERE tenant_id = $3 AND submitter = $4 AND submission_id = $5",
313                &[
314                    &now,
315                    &now,
316                    &tenant_id,
317                    &id.submitter.as_str(),
318                    &id.submission_id.as_str(),
319                ],
320            )
321            .await
322            .map_err(|e| internal_error(format!("Failed to complete submission: {}", e)))?;
323
324        self.get_submission(tenant, id)
325            .await?
326            .ok_or_else(|| internal_error("Submission disappeared".to_string()))
327    }
328
329    async fn abort_submission(
330        &self,
331        tenant: &TenantContext,
332        id: &SubmissionId,
333        _reason: &str,
334    ) -> StorageResult<u64> {
335        let client = self.get_client().await?;
336        let tenant_id = tenant.tenant_id().as_str();
337
338        // Check current status
339        let rows = client
340            .query(
341                "SELECT status FROM bulk_submissions
342                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
343                &[
344                    &tenant_id,
345                    &id.submitter.as_str(),
346                    &id.submission_id.as_str(),
347                ],
348            )
349            .await
350            .map_err(|e| internal_error(format!("Failed to get submission status: {}", e)))?;
351
352        if rows.is_empty() {
353            return Err(StorageError::BulkSubmit(
354                BulkSubmitError::SubmissionNotFound {
355                    submitter: id.submitter.clone(),
356                    submission_id: id.submission_id.clone(),
357                },
358            ));
359        }
360
361        let current_status: String = rows[0].get(0);
362        if current_status != "in-progress" {
363            return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
364                submission_id: id.submission_id.clone(),
365            }));
366        }
367
368        // Count pending manifests
369        let pending_row = client
370            .query_one(
371                "SELECT COUNT(*) FROM bulk_manifests
372                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
373                 AND status IN ('pending', 'processing')",
374                &[
375                    &tenant_id,
376                    &id.submitter.as_str(),
377                    &id.submission_id.as_str(),
378                ],
379            )
380            .await
381            .map_err(|e| internal_error(format!("Failed to count pending manifests: {}", e)))?;
382
383        let pending_count: i64 = pending_row.get(0);
384        let now = Utc::now();
385
386        // Update submission status
387        client
388            .execute(
389                "UPDATE bulk_submissions SET status = 'aborted', completed_at = $1, updated_at = $2
390                 WHERE tenant_id = $3 AND submitter = $4 AND submission_id = $5",
391                &[
392                    &now,
393                    &now,
394                    &tenant_id,
395                    &id.submitter.as_str(),
396                    &id.submission_id.as_str(),
397                ],
398            )
399            .await
400            .map_err(|e| internal_error(format!("Failed to abort submission: {}", e)))?;
401
402        // Update pending manifests to failed
403        client
404            .execute(
405                "UPDATE bulk_manifests SET status = 'failed'
406                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
407                 AND status IN ('pending', 'processing')",
408                &[
409                    &tenant_id,
410                    &id.submitter.as_str(),
411                    &id.submission_id.as_str(),
412                ],
413            )
414            .await
415            .map_err(|e| internal_error(format!("Failed to update manifests: {}", e)))?;
416
417        Ok(pending_count as u64)
418    }
419
420    async fn add_manifest(
421        &self,
422        tenant: &TenantContext,
423        submission_id: &SubmissionId,
424        manifest_url: Option<&str>,
425        replaces_manifest_url: Option<&str>,
426    ) -> StorageResult<SubmissionManifest> {
427        let client = self.get_client().await?;
428        let tenant_id = tenant.tenant_id().as_str();
429
430        // Check submission exists and is in progress
431        let rows = client
432            .query(
433                "SELECT status FROM bulk_submissions
434                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
435                &[
436                    &tenant_id,
437                    &submission_id.submitter.as_str(),
438                    &submission_id.submission_id.as_str(),
439                ],
440            )
441            .await
442            .map_err(|e| internal_error(format!("Failed to get submission: {}", e)))?;
443
444        if rows.is_empty() {
445            return Err(StorageError::BulkSubmit(
446                BulkSubmitError::SubmissionNotFound {
447                    submitter: submission_id.submitter.clone(),
448                    submission_id: submission_id.submission_id.clone(),
449                },
450            ));
451        }
452
453        let status: String = rows[0].get(0);
454        if status != "in-progress" {
455            return Err(StorageError::BulkSubmit(BulkSubmitError::InvalidState {
456                submission_id: submission_id.submission_id.clone(),
457                expected: "in-progress".to_string(),
458                actual: status,
459            }));
460        }
461
462        let manifest_id = Uuid::new_v4().to_string();
463        let now = Utc::now();
464
465        client
466            .execute(
467                "INSERT INTO bulk_manifests
468                 (tenant_id, submitter, submission_id, manifest_id, manifest_url, replaces_manifest_url, status, added_at)
469                 VALUES ($1, $2, $3, $4, $5, $6, 'pending', $7)",
470                &[
471                    &tenant_id,
472                    &submission_id.submitter.as_str(),
473                    &submission_id.submission_id.as_str(),
474                    &manifest_id.as_str(),
475                    &manifest_url,
476                    &replaces_manifest_url,
477                    &now,
478                ],
479            )
480            .await
481            .map_err(|e| internal_error(format!("Failed to add manifest: {}", e)))?;
482
483        // Update submission updated_at
484        client
485            .execute(
486                "UPDATE bulk_submissions SET updated_at = $1
487                 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
488                &[
489                    &now,
490                    &tenant_id,
491                    &submission_id.submitter.as_str(),
492                    &submission_id.submission_id.as_str(),
493                ],
494            )
495            .await
496            .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
497
498        Ok(SubmissionManifest {
499            manifest_id,
500            manifest_url: manifest_url.map(String::from),
501            replaces_manifest_url: replaces_manifest_url.map(String::from),
502            status: ManifestStatus::Pending,
503            added_at: now,
504            total_entries: 0,
505            processed_entries: 0,
506            failed_entries: 0,
507        })
508    }
509
510    async fn get_manifest(
511        &self,
512        tenant: &TenantContext,
513        submission_id: &SubmissionId,
514        manifest_id: &str,
515    ) -> StorageResult<Option<SubmissionManifest>> {
516        let client = self.get_client().await?;
517        let tenant_id = tenant.tenant_id().as_str();
518
519        let rows = client
520            .query(
521                "SELECT manifest_url, replaces_manifest_url, status, added_at, total_entries, processed_entries, failed_entries
522                 FROM bulk_manifests
523                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
524                &[
525                    &tenant_id,
526                    &submission_id.submitter.as_str(),
527                    &submission_id.submission_id.as_str(),
528                    &manifest_id,
529                ],
530            )
531            .await
532            .map_err(|e| internal_error(format!("Failed to get manifest: {}", e)))?;
533
534        if rows.is_empty() {
535            return Ok(None);
536        }
537
538        let row = &rows[0];
539        let manifest_url: Option<String> = row.get(0);
540        let replaces_manifest_url: Option<String> = row.get(1);
541        let status_str: String = row.get(2);
542        let added_at: chrono::DateTime<Utc> = row.get(3);
543        let total: i32 = row.get(4);
544        let processed: i32 = row.get(5);
545        let failed: i32 = row.get(6);
546
547        let status: ManifestStatus = status_str
548            .parse()
549            .map_err(|_| internal_error(format!("Invalid manifest status: {}", status_str)))?;
550
551        Ok(Some(SubmissionManifest {
552            manifest_id: manifest_id.to_string(),
553            manifest_url,
554            replaces_manifest_url,
555            status,
556            added_at,
557            total_entries: total as u64,
558            processed_entries: processed as u64,
559            failed_entries: failed as u64,
560        }))
561    }
562
563    async fn list_manifests(
564        &self,
565        tenant: &TenantContext,
566        submission_id: &SubmissionId,
567    ) -> StorageResult<Vec<SubmissionManifest>> {
568        let client = self.get_client().await?;
569        let tenant_id = tenant.tenant_id().as_str();
570
571        let rows = client
572            .query(
573                "SELECT manifest_id FROM bulk_manifests
574                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
575                 ORDER BY added_at",
576                &[
577                    &tenant_id,
578                    &submission_id.submitter.as_str(),
579                    &submission_id.submission_id.as_str(),
580                ],
581            )
582            .await
583            .map_err(|e| internal_error(format!("Failed to query manifests: {}", e)))?;
584
585        let mut results = Vec::new();
586        for row in &rows {
587            let manifest_id: String = row.get(0);
588            if let Some(manifest) = self
589                .get_manifest(tenant, submission_id, &manifest_id)
590                .await?
591            {
592                results.push(manifest);
593            }
594        }
595
596        Ok(results)
597    }
598
599    async fn process_entries(
600        &self,
601        tenant: &TenantContext,
602        submission_id: &SubmissionId,
603        manifest_id: &str,
604        entries: Vec<NdjsonEntry>,
605        options: &BulkProcessingOptions,
606    ) -> StorageResult<Vec<BulkEntryResult>> {
607        let client = self.get_client().await?;
608        let tenant_id = tenant.tenant_id().as_str();
609
610        // Verify manifest exists
611        if self
612            .get_manifest(tenant, submission_id, manifest_id)
613            .await?
614            .is_none()
615        {
616            return Err(StorageError::BulkSubmit(
617                BulkSubmitError::ManifestNotFound {
618                    submission_id: submission_id.submission_id.clone(),
619                    manifest_id: manifest_id.to_string(),
620                },
621            ));
622        }
623
624        // Update manifest status to processing
625        client
626            .execute(
627                "UPDATE bulk_manifests SET status = 'processing'
628                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
629                &[
630                    &tenant_id,
631                    &submission_id.submitter.as_str(),
632                    &submission_id.submission_id.as_str(),
633                    &manifest_id,
634                ],
635            )
636            .await
637            .map_err(|e| internal_error(format!("Failed to update manifest status: {}", e)))?;
638
639        let mut results = Vec::new();
640        let mut error_count = 0u32;
641
642        for entry in entries {
643            if options.max_errors > 0 && error_count >= options.max_errors {
644                if !options.continue_on_error {
645                    return Err(StorageError::BulkSubmit(
646                        BulkSubmitError::MaxErrorsExceeded {
647                            submission_id: submission_id.submission_id.clone(),
648                            max_errors: options.max_errors,
649                        },
650                    ));
651                }
652                let skip_result = BulkEntryResult::skipped(
653                    entry.line_number,
654                    &entry.resource_type,
655                    "max errors exceeded",
656                );
657                results.push(skip_result);
658                continue;
659            }
660
661            let result = self
662                .process_single_entry(tenant, submission_id, manifest_id, &entry, options)
663                .await;
664
665            let entry_result = match result {
666                Ok(r) => r,
667                Err(e) => {
668                    error_count += 1;
669                    BulkEntryResult::processing_error(
670                        entry.line_number,
671                        &entry.resource_type,
672                        serde_json::json!({
673                            "resourceType": "OperationOutcome",
674                            "issue": [{
675                                "severity": "error",
676                                "code": "exception",
677                                "diagnostics": e.to_string()
678                            }]
679                        }),
680                    )
681                }
682            };
683
684            if entry_result.is_error() {
685                error_count += 1;
686            }
687
688            self.store_entry_result(tenant, submission_id, manifest_id, &entry_result)
689                .await?;
690
691            results.push(entry_result);
692        }
693
694        // Update manifest counts
695        let now = Utc::now();
696        client
697            .execute(
698                "UPDATE bulk_manifests SET
699                    total_entries = total_entries + $1,
700                    processed_entries = processed_entries + $2,
701                    failed_entries = failed_entries + $3
702                 WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6 AND manifest_id = $7",
703                &[
704                    &(results.len() as i32),
705                    &(results.iter().filter(|r| r.is_success()).count() as i32),
706                    &(error_count as i32),
707                    &tenant_id,
708                    &submission_id.submitter.as_str(),
709                    &submission_id.submission_id.as_str(),
710                    &manifest_id,
711                ],
712            )
713            .await
714            .map_err(|e| internal_error(format!("Failed to update manifest counts: {}", e)))?;
715
716        // Update submission updated_at
717        client
718            .execute(
719                "UPDATE bulk_submissions SET updated_at = $1
720                 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
721                &[
722                    &now,
723                    &tenant_id,
724                    &submission_id.submitter.as_str(),
725                    &submission_id.submission_id.as_str(),
726                ],
727            )
728            .await
729            .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
730
731        Ok(results)
732    }
733
734    async fn get_entry_results(
735        &self,
736        tenant: &TenantContext,
737        submission_id: &SubmissionId,
738        manifest_id: &str,
739        outcome_filter: Option<BulkEntryOutcome>,
740        limit: u32,
741        offset: u32,
742    ) -> StorageResult<Vec<BulkEntryResult>> {
743        let client = self.get_client().await?;
744        let tenant_id = tenant.tenant_id().as_str();
745
746        let mut sql =
747            "SELECT line_number, resource_type, resource_id, created, outcome, operation_outcome
748             FROM bulk_entry_results
749             WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4"
750                .to_string();
751
752        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
753            Box::new(tenant_id.to_string()),
754            Box::new(submission_id.submitter.clone()),
755            Box::new(submission_id.submission_id.clone()),
756            Box::new(manifest_id.to_string()),
757        ];
758
759        if let Some(outcome) = outcome_filter {
760            sql.push_str(" AND outcome = $5");
761            params.push(Box::new(outcome.to_string()));
762        }
763
764        sql.push_str(&format!(
765            " ORDER BY line_number LIMIT {} OFFSET {}",
766            limit, offset
767        ));
768
769        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
770            .iter()
771            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
772            .collect();
773
774        let rows = client
775            .query(&sql, &param_refs)
776            .await
777            .map_err(|e| internal_error(format!("Failed to query results: {}", e)))?;
778
779        let results: Vec<BulkEntryResult> = rows
780            .iter()
781            .map(|row| {
782                let line_number: i32 = row.get(0);
783                let resource_type: String = row.get(1);
784                let resource_id: Option<String> = row.get(2);
785                let created: Option<bool> = row.get(3);
786                let outcome_str: String = row.get(4);
787                let operation_outcome: Option<Value> = row.get(5);
788
789                let outcome: BulkEntryOutcome = outcome_str
790                    .parse()
791                    .unwrap_or(BulkEntryOutcome::ProcessingError);
792
793                BulkEntryResult {
794                    line_number: line_number as u64,
795                    resource_type,
796                    resource_id,
797                    created: created.unwrap_or(false),
798                    outcome,
799                    operation_outcome,
800                }
801            })
802            .collect();
803
804        Ok(results)
805    }
806
807    async fn get_entry_counts(
808        &self,
809        tenant: &TenantContext,
810        submission_id: &SubmissionId,
811        manifest_id: &str,
812    ) -> StorageResult<EntryCountSummary> {
813        let client = self.get_client().await?;
814        let tenant_id = tenant.tenant_id().as_str();
815
816        let row = client
817            .query_one(
818                "SELECT
819                    COUNT(*),
820                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
821                    SUM(CASE WHEN outcome = 'validation-error' THEN 1 ELSE 0 END),
822                    SUM(CASE WHEN outcome = 'processing-error' THEN 1 ELSE 0 END),
823                    SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
824                 FROM bulk_entry_results
825                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
826                &[
827                    &tenant_id,
828                    &submission_id.submitter.as_str(),
829                    &submission_id.submission_id.as_str(),
830                    &manifest_id,
831                ],
832            )
833            .await
834            .map_err(|e| internal_error(format!("Failed to count entries: {}", e)))?;
835
836        let total: i64 = row.get(0);
837        let success: Option<i64> = row.get(1);
838        let validation_error: Option<i64> = row.get(2);
839        let processing_error: Option<i64> = row.get(3);
840        let skipped: Option<i64> = row.get(4);
841
842        Ok(EntryCountSummary {
843            total: total as u64,
844            success: success.unwrap_or(0) as u64,
845            validation_error: validation_error.unwrap_or(0) as u64,
846            processing_error: processing_error.unwrap_or(0) as u64,
847            skipped: skipped.unwrap_or(0) as u64,
848        })
849    }
850}
851
852impl PostgresBackend {
853    /// Process a single NDJSON entry.
854    async fn process_single_entry(
855        &self,
856        tenant: &TenantContext,
857        submission_id: &SubmissionId,
858        manifest_id: &str,
859        entry: &NdjsonEntry,
860        options: &BulkProcessingOptions,
861    ) -> StorageResult<BulkEntryResult> {
862        let resource_id = entry.resource_id.as_ref();
863
864        if let Some(id) = resource_id {
865            let existing = self.read(tenant, &entry.resource_type, id).await;
866
867            match existing {
868                Ok(Some(current)) => {
869                    if !options.allow_updates {
870                        return Ok(BulkEntryResult::skipped(
871                            entry.line_number,
872                            &entry.resource_type,
873                            "updates not allowed",
874                        ));
875                    }
876
877                    let change = SubmissionChange::update(
878                        manifest_id,
879                        &entry.resource_type,
880                        id,
881                        current.version_id(),
882                        (current.version_id().parse::<i32>().unwrap_or(0) + 1).to_string(),
883                        current.content().clone(),
884                    );
885                    self.record_change(tenant, submission_id, &change).await?;
886
887                    let updated = self
888                        .update(tenant, &current, entry.resource.clone())
889                        .await?;
890
891                    Ok(BulkEntryResult::success(
892                        entry.line_number,
893                        &entry.resource_type,
894                        updated.id(),
895                        false,
896                    ))
897                }
898                Ok(None)
899                | Err(StorageError::Resource(crate::error::ResourceError::Gone { .. })) => {
900                    let created = self
901                        .create(
902                            tenant,
903                            &entry.resource_type,
904                            entry.resource.clone(),
905                            FhirVersion::default_enabled(),
906                        )
907                        .await?;
908
909                    let change = SubmissionChange::create(
910                        manifest_id,
911                        &entry.resource_type,
912                        created.id(),
913                        created.version_id(),
914                    );
915                    self.record_change(tenant, submission_id, &change).await?;
916
917                    Ok(BulkEntryResult::success(
918                        entry.line_number,
919                        &entry.resource_type,
920                        created.id(),
921                        true,
922                    ))
923                }
924                Err(e) => Err(e),
925            }
926        } else {
927            let created = self
928                .create(
929                    tenant,
930                    &entry.resource_type,
931                    entry.resource.clone(),
932                    FhirVersion::default_enabled(),
933                )
934                .await?;
935
936            let change = SubmissionChange::create(
937                manifest_id,
938                &entry.resource_type,
939                created.id(),
940                created.version_id(),
941            );
942            self.record_change(tenant, submission_id, &change).await?;
943
944            Ok(BulkEntryResult::success(
945                entry.line_number,
946                &entry.resource_type,
947                created.id(),
948                true,
949            ))
950        }
951    }
952
953    /// Store an entry result in the database.
954    async fn store_entry_result(
955        &self,
956        tenant: &TenantContext,
957        submission_id: &SubmissionId,
958        manifest_id: &str,
959        result: &BulkEntryResult,
960    ) -> StorageResult<()> {
961        let client = self.get_client().await?;
962        let tenant_id = tenant.tenant_id().as_str();
963
964        let outcome_json: Option<Value> = result.operation_outcome.clone();
965
966        client
967            .execute(
968                "INSERT INTO bulk_entry_results
969                 (tenant_id, submitter, submission_id, manifest_id, line_number, resource_type, resource_id, created, outcome, operation_outcome)
970                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
971                &[
972                    &tenant_id,
973                    &submission_id.submitter.as_str(),
974                    &submission_id.submission_id.as_str(),
975                    &manifest_id,
976                    &(result.line_number as i32),
977                    &result.resource_type.as_str(),
978                    &result.resource_id,
979                    &result.created,
980                    &result.outcome.to_string().as_str(),
981                    &outcome_json,
982                ],
983            )
984            .await
985            .map_err(|e| internal_error(format!("Failed to store entry result: {}", e)))?;
986
987        Ok(())
988    }
989}
990
991#[async_trait]
992impl StreamingBulkSubmitProvider for PostgresBackend {
993    async fn process_ndjson_stream(
994        &self,
995        tenant: &TenantContext,
996        submission_id: &SubmissionId,
997        manifest_id: &str,
998        resource_type: &str,
999        mut reader: Box<dyn AsyncBufRead + Send + Unpin>,
1000        options: &BulkProcessingOptions,
1001    ) -> StorageResult<StreamProcessingResult> {
1002        let mut result = StreamProcessingResult::new();
1003        let mut line_number = 0u64;
1004        let mut batch = Vec::new();
1005
1006        loop {
1007            let mut line = String::new();
1008            let bytes_read = reader
1009                .read_line(&mut line)
1010                .await
1011                .map_err(|e| internal_error(format!("Failed to read line: {}", e)))?;
1012
1013            if bytes_read == 0 {
1014                break;
1015            }
1016
1017            line_number += 1;
1018            result.lines_processed = line_number;
1019
1020            let line = line.trim();
1021            if line.is_empty() {
1022                continue;
1023            }
1024
1025            match NdjsonEntry::parse(line_number, line) {
1026                Ok(entry) => {
1027                    if entry.resource_type != resource_type {
1028                        let error_result = BulkEntryResult::validation_error(
1029                            line_number,
1030                            &entry.resource_type,
1031                            serde_json::json!({
1032                                "resourceType": "OperationOutcome",
1033                                "issue": [{
1034                                    "severity": "error",
1035                                    "code": "invalid",
1036                                    "diagnostics": format!("Expected resource type {}, got {}", resource_type, entry.resource_type)
1037                                }]
1038                            }),
1039                        );
1040                        result.counts.increment(error_result.outcome);
1041
1042                        if !options.continue_on_error
1043                            && (options.max_errors == 0
1044                                || result.counts.error_count() >= options.max_errors as u64)
1045                        {
1046                            return Ok(result.aborted("max errors exceeded"));
1047                        }
1048                        continue;
1049                    }
1050
1051                    batch.push(entry);
1052                }
1053                Err(e) => {
1054                    result.counts.increment(BulkEntryOutcome::ValidationError);
1055
1056                    if !options.continue_on_error
1057                        && (options.max_errors == 0
1058                            || result.counts.error_count() >= options.max_errors as u64)
1059                    {
1060                        return Ok(result.aborted(format!("Parse error: {}", e)));
1061                    }
1062                }
1063            }
1064
1065            if batch.len() >= options.batch_size as usize {
1066                let batch_results = self
1067                    .process_entries(
1068                        tenant,
1069                        submission_id,
1070                        manifest_id,
1071                        std::mem::take(&mut batch),
1072                        options,
1073                    )
1074                    .await?;
1075
1076                for r in batch_results {
1077                    result.counts.increment(r.outcome);
1078                }
1079
1080                if !options.continue_on_error
1081                    && options.max_errors > 0
1082                    && result.counts.error_count() >= options.max_errors as u64
1083                {
1084                    return Ok(result.aborted("max errors exceeded"));
1085                }
1086            }
1087        }
1088
1089        // Process remaining entries
1090        if !batch.is_empty() {
1091            let batch_results = self
1092                .process_entries(tenant, submission_id, manifest_id, batch, options)
1093                .await?;
1094
1095            for r in batch_results {
1096                result.counts.increment(r.outcome);
1097            }
1098        }
1099
1100        Ok(result)
1101    }
1102}
1103
1104#[async_trait]
1105impl BulkSubmitRollbackProvider for PostgresBackend {
1106    async fn record_change(
1107        &self,
1108        tenant: &TenantContext,
1109        submission_id: &SubmissionId,
1110        change: &SubmissionChange,
1111    ) -> StorageResult<()> {
1112        let client = self.get_client().await?;
1113        let tenant_id = tenant.tenant_id().as_str();
1114
1115        let previous_content_json: Option<Value> = change.previous_content.clone();
1116
1117        client
1118            .execute(
1119                "INSERT INTO bulk_submission_changes
1120                 (tenant_id, submitter, submission_id, change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at)
1121                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1122                &[
1123                    &tenant_id,
1124                    &submission_id.submitter.as_str(),
1125                    &submission_id.submission_id.as_str(),
1126                    &change.change_id.as_str(),
1127                    &change.manifest_id.as_str(),
1128                    &change.change_type.to_string().as_str(),
1129                    &change.resource_type.as_str(),
1130                    &change.resource_id.as_str(),
1131                    &change.previous_version,
1132                    &change.new_version.as_str(),
1133                    &previous_content_json,
1134                    &change.changed_at,
1135                ],
1136            )
1137            .await
1138            .map_err(|e| internal_error(format!("Failed to record change: {}", e)))?;
1139
1140        Ok(())
1141    }
1142
1143    async fn list_changes(
1144        &self,
1145        tenant: &TenantContext,
1146        submission_id: &SubmissionId,
1147        limit: u32,
1148        offset: u32,
1149    ) -> StorageResult<Vec<SubmissionChange>> {
1150        let client = self.get_client().await?;
1151        let tenant_id = tenant.tenant_id().as_str();
1152
1153        let sql = format!(
1154            "SELECT change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at
1155             FROM bulk_submission_changes
1156             WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1157             ORDER BY changed_at DESC
1158             LIMIT {} OFFSET {}",
1159            limit, offset
1160        );
1161
1162        let rows = client
1163            .query(
1164                &sql,
1165                &[
1166                    &tenant_id,
1167                    &submission_id.submitter.as_str(),
1168                    &submission_id.submission_id.as_str(),
1169                ],
1170            )
1171            .await
1172            .map_err(|e| internal_error(format!("Failed to query changes: {}", e)))?;
1173
1174        let changes: Vec<SubmissionChange> = rows
1175            .iter()
1176            .map(|row| {
1177                let change_id: String = row.get(0);
1178                let manifest_id: String = row.get(1);
1179                let change_type_str: String = row.get(2);
1180                let resource_type: String = row.get(3);
1181                let resource_id: String = row.get(4);
1182                let previous_version: Option<String> = row.get(5);
1183                let new_version: String = row.get(6);
1184                let previous_content: Option<Value> = row.get(7);
1185                let changed_at: chrono::DateTime<Utc> = row.get(8);
1186
1187                let change_type: ChangeType = change_type_str.parse().unwrap_or(ChangeType::Create);
1188
1189                SubmissionChange {
1190                    change_id,
1191                    manifest_id,
1192                    change_type,
1193                    resource_type,
1194                    resource_id,
1195                    previous_version,
1196                    new_version,
1197                    previous_content,
1198                    changed_at,
1199                }
1200            })
1201            .collect();
1202
1203        Ok(changes)
1204    }
1205
1206    async fn rollback_change(
1207        &self,
1208        tenant: &TenantContext,
1209        _submission_id: &SubmissionId,
1210        change: &SubmissionChange,
1211    ) -> StorageResult<bool> {
1212        match change.change_type {
1213            ChangeType::Create => {
1214                match self
1215                    .delete(tenant, &change.resource_type, &change.resource_id)
1216                    .await
1217                {
1218                    Ok(()) => Ok(true),
1219                    Err(StorageError::Resource(crate::error::ResourceError::NotFound {
1220                        ..
1221                    })) => Ok(true),
1222                    Err(e) => Err(e),
1223                }
1224            }
1225            ChangeType::Update => {
1226                if let Some(ref previous_content) = change.previous_content {
1227                    let current = self
1228                        .read(tenant, &change.resource_type, &change.resource_id)
1229                        .await?;
1230                    if let Some(current) = current {
1231                        self.update(tenant, &current, previous_content.clone())
1232                            .await?;
1233                        Ok(true)
1234                    } else {
1235                        Ok(false)
1236                    }
1237                } else {
1238                    Ok(false)
1239                }
1240            }
1241        }
1242    }
1243}
1244
1245#[async_trait]
1246impl SubmitClaimStrategy for PostgresBackend {
1247    async fn claim_next_manifest(
1248        &self,
1249        worker_id: &WorkerId,
1250        lease_duration: StdDuration,
1251    ) -> StorageResult<Option<ManifestLease>> {
1252        let mut client = self.get_client().await?;
1253        let now = Utc::now();
1254        let lease_expiry = now
1255            + chrono::Duration::from_std(lease_duration)
1256                .unwrap_or_else(|_| chrono::Duration::seconds(60));
1257
1258        let txn = client
1259            .transaction()
1260            .await
1261            .map_err(|e| internal_error(format!("Failed to begin claim txn: {}", e)))?;
1262
1263        let rows = txn
1264            .query(
1265                "SELECT m.tenant_id, m.submitter, m.submission_id, m.manifest_id, m.fencing_token
1266                 FROM bulk_manifests m
1267                 JOIN bulk_submissions s
1268                   ON s.tenant_id = m.tenant_id AND s.submitter = m.submitter
1269                      AND s.submission_id = m.submission_id
1270                 WHERE m.manifest_url IS NOT NULL
1271                   AND s.status = 'in-progress'
1272                   AND (m.status = 'pending'
1273                        OR (m.status = 'processing'
1274                            AND (m.lease_expiry IS NULL OR m.lease_expiry < $1)))
1275                 ORDER BY m.added_at
1276                 LIMIT 1
1277                 FOR UPDATE OF m SKIP LOCKED",
1278                &[&now],
1279            )
1280            .await
1281            .map_err(|e| internal_error(format!("Failed to select claimable manifest: {}", e)))?;
1282
1283        let Some(row) = rows.first() else {
1284            txn.commit()
1285                .await
1286                .map_err(|e| internal_error(format!("Failed to commit claim txn: {}", e)))?;
1287            return Ok(None);
1288        };
1289        let tenant_id: String = row.get(0);
1290        let submitter: String = row.get(1);
1291        let submission_id: String = row.get(2);
1292        let manifest_id: String = row.get(3);
1293        let fencing_token: i64 = row.get(4);
1294        let new_token = fencing_token + 1;
1295
1296        txn.execute(
1297            "UPDATE bulk_manifests
1298             SET status = 'processing', worker_id = $1, lease_expiry = $2, fencing_token = $3
1299             WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6 AND manifest_id = $7",
1300            &[
1301                &worker_id.as_str(),
1302                &lease_expiry,
1303                &new_token,
1304                &tenant_id,
1305                &submitter,
1306                &submission_id,
1307                &manifest_id,
1308            ],
1309        )
1310        .await
1311        .map_err(|e| internal_error(format!("Failed to claim manifest: {}", e)))?;
1312
1313        txn.commit()
1314            .await
1315            .map_err(|e| internal_error(format!("Failed to commit claim txn: {}", e)))?;
1316
1317        Ok(Some(ManifestLease {
1318            tenant: TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
1319            submission_id: SubmissionId::new(submitter, submission_id),
1320            manifest_id,
1321            worker_id: worker_id.clone(),
1322            lease_expiry,
1323            fencing_token: new_token as u64,
1324        }))
1325    }
1326
1327    async fn heartbeat(&self, lease: &ManifestLease) -> Result<DateTime<Utc>, LeaseError> {
1328        let client = self.get_client().await.map_err(LeaseError::Storage)?;
1329        let now = Utc::now();
1330        let new_expiry = now + chrono::Duration::seconds(60);
1331        let affected = client
1332            .execute(
1333                "UPDATE bulk_manifests SET lease_expiry = $1
1334                 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4
1335                   AND manifest_id = $5 AND worker_id = $6 AND fencing_token = $7",
1336                &[
1337                    &new_expiry,
1338                    &lease.tenant.tenant_id().as_str(),
1339                    &lease.submission_id.submitter,
1340                    &lease.submission_id.submission_id,
1341                    &lease.manifest_id,
1342                    &lease.worker_id.as_str(),
1343                    &(lease.fencing_token as i64),
1344                ],
1345            )
1346            .await
1347            .map_err(|e| LeaseError::Storage(internal_error(format!("heartbeat failed: {e}"))))?;
1348        if affected == 0 {
1349            Err(lease_lost(lease))
1350        } else {
1351            Ok(new_expiry)
1352        }
1353    }
1354
1355    async fn release(&self, lease: ManifestLease) -> StorageResult<()> {
1356        let client = self.get_client().await?;
1357        client
1358            .execute(
1359                "UPDATE bulk_manifests
1360                 SET status = 'pending', worker_id = NULL, lease_expiry = NULL
1361                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1362                   AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6
1363                   AND status = 'processing'",
1364                &[
1365                    &lease.tenant.tenant_id().as_str(),
1366                    &lease.submission_id.submitter,
1367                    &lease.submission_id.submission_id,
1368                    &lease.manifest_id,
1369                    &lease.worker_id.as_str(),
1370                    &(lease.fencing_token as i64),
1371                ],
1372            )
1373            .await
1374            .map_err(|e| internal_error(format!("Failed to release manifest lease: {}", e)))?;
1375        Ok(())
1376    }
1377}
1378
1379#[async_trait]
1380impl SubmitWorkerStorage for PostgresBackend {
1381    async fn get_manifest_for_worker(
1382        &self,
1383        lease: &ManifestLease,
1384    ) -> Result<ManifestWorkerView, LeaseError> {
1385        let client = self.get_client().await.map_err(LeaseError::Storage)?;
1386        let rows = client
1387            .query(
1388                "SELECT manifest_url, fhir_base_url, output_format, file_request_headers,
1389                        oauth_metadata_urls, file_encryption_key, last_processed_line
1390                 FROM bulk_manifests
1391                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1392                   AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1393                &[
1394                    &lease.tenant.tenant_id().as_str(),
1395                    &lease.submission_id.submitter,
1396                    &lease.submission_id.submission_id,
1397                    &lease.manifest_id,
1398                    &lease.worker_id.as_str(),
1399                    &(lease.fencing_token as i64),
1400                ],
1401            )
1402            .await
1403            .map_err(|e| LeaseError::Storage(internal_error(format!("load manifest: {e}"))))?;
1404        let row = rows.first().ok_or_else(|| lease_lost(lease))?;
1405
1406        let manifest_url: Option<String> = row.get(0);
1407        let fhir_base_url: Option<String> = row.get(1);
1408        let output_format: Option<String> = row.get(2);
1409        let headers_json: Option<String> = row.get(3);
1410        let oauth_json: Option<String> = row.get(4);
1411        let encryption_json: Option<String> = row.get(5);
1412        let last_processed_line: i64 = row.get(6);
1413
1414        let file_request_headers: Vec<(String, String)> = headers_json
1415            .as_deref()
1416            .and_then(|s| serde_json::from_str(s).ok())
1417            .unwrap_or_default();
1418        let oauth_metadata_urls: Vec<String> = oauth_json
1419            .as_deref()
1420            .and_then(|s| serde_json::from_str(s).ok())
1421            .unwrap_or_default();
1422        let file_encryption_key: Option<Value> = encryption_json
1423            .as_deref()
1424            .and_then(|s| serde_json::from_str(s).ok());
1425        let fhir_version = fhir_version_from_output_format(output_format.as_deref());
1426
1427        Ok(ManifestWorkerView {
1428            manifest_id: lease.manifest_id.clone(),
1429            manifest_url,
1430            fhir_base_url,
1431            output_format,
1432            file_request_headers,
1433            oauth_metadata_urls,
1434            file_encryption_key,
1435            last_processed_line: last_processed_line.max(0) as u64,
1436            fhir_version,
1437        })
1438    }
1439
1440    async fn mark_manifest_processing(&self, lease: &ManifestLease) -> Result<(), LeaseError> {
1441        let client = self.get_client().await.map_err(LeaseError::Storage)?;
1442        let affected = client
1443            .execute(
1444                "UPDATE bulk_manifests SET status = 'processing'
1445                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1446                   AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1447                &[
1448                    &lease.tenant.tenant_id().as_str(),
1449                    &lease.submission_id.submitter,
1450                    &lease.submission_id.submission_id,
1451                    &lease.manifest_id,
1452                    &lease.worker_id.as_str(),
1453                    &(lease.fencing_token as i64),
1454                ],
1455            )
1456            .await
1457            .map_err(|e| LeaseError::Storage(internal_error(format!("mark processing: {e}"))))?;
1458        if affected == 0 {
1459            Err(lease_lost(lease))
1460        } else {
1461            Ok(())
1462        }
1463    }
1464
1465    async fn update_manifest_progress(
1466        &self,
1467        lease: &ManifestLease,
1468        processed_entries: u64,
1469        failed_entries: u64,
1470        last_processed_line: u64,
1471    ) -> Result<(), LeaseError> {
1472        let client = self.get_client().await.map_err(LeaseError::Storage)?;
1473        let affected = client
1474            .execute(
1475                "UPDATE bulk_manifests
1476                 SET processed_entries = $1, failed_entries = $2, last_processed_line = $3
1477                 WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6
1478                   AND manifest_id = $7 AND worker_id = $8 AND fencing_token = $9",
1479                &[
1480                    &(processed_entries as i32),
1481                    &(failed_entries as i32),
1482                    &(last_processed_line as i64),
1483                    &lease.tenant.tenant_id().as_str(),
1484                    &lease.submission_id.submitter,
1485                    &lease.submission_id.submission_id,
1486                    &lease.manifest_id,
1487                    &lease.worker_id.as_str(),
1488                    &(lease.fencing_token as i64),
1489                ],
1490            )
1491            .await
1492            .map_err(|e| LeaseError::Storage(internal_error(format!("update progress: {e}"))))?;
1493        if affected == 0 {
1494            Err(lease_lost(lease))
1495        } else {
1496            Ok(())
1497        }
1498    }
1499
1500    async fn record_submit_file(
1501        &self,
1502        lease: &ManifestLease,
1503        file: &SubmitFileRecord,
1504    ) -> Result<(), LeaseError> {
1505        let client = self.get_client().await.map_err(LeaseError::Storage)?;
1506        // Fence: only record if we still hold the lease.
1507        let holds = client
1508            .query(
1509                "SELECT 1 FROM bulk_manifests
1510                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1511                   AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1512                &[
1513                    &lease.tenant.tenant_id().as_str(),
1514                    &lease.submission_id.submitter,
1515                    &lease.submission_id.submission_id,
1516                    &lease.manifest_id,
1517                    &lease.worker_id.as_str(),
1518                    &(lease.fencing_token as i64),
1519                ],
1520            )
1521            .await
1522            .map_err(|e| LeaseError::Storage(internal_error(format!("fence check: {e}"))))?;
1523        if holds.is_empty() {
1524            return Err(lease_lost(lease));
1525        }
1526
1527        let count_severity = file
1528            .count_severity
1529            .as_ref()
1530            .and_then(|v| serde_json::to_string(v).ok());
1531        client
1532            .execute(
1533                "INSERT INTO bulk_submit_files
1534                 (tenant_id, submitter, submission_id, manifest_url, file_type, resource_type,
1535                  part_index, fencing_token, file_path, line_count, byte_count, count_severity,
1536                  created_at)
1537                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
1538                &[
1539                    &lease.tenant.tenant_id().as_str(),
1540                    &lease.submission_id.submitter,
1541                    &lease.submission_id.submission_id,
1542                    &file.manifest_url,
1543                    &file.file_type,
1544                    &file.resource_type,
1545                    &(file.part_index as i32),
1546                    &(lease.fencing_token as i64),
1547                    &file.file_path,
1548                    &(file.line_count as i64),
1549                    &(file.byte_count as i64),
1550                    &count_severity,
1551                    &Utc::now(),
1552                ],
1553            )
1554            .await
1555            .map_err(|e| LeaseError::Storage(internal_error(format!("record submit file: {e}"))))?;
1556        Ok(())
1557    }
1558
1559    async fn finish_manifest(&self, lease: &ManifestLease) -> Result<(), LeaseError> {
1560        let client = self.get_client().await.map_err(LeaseError::Storage)?;
1561        let affected = client
1562            .execute(
1563                "UPDATE bulk_manifests SET status = 'completed', worker_id = NULL, lease_expiry = NULL
1564                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1565                   AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1566                &[
1567                    &lease.tenant.tenant_id().as_str(),
1568                    &lease.submission_id.submitter,
1569                    &lease.submission_id.submission_id,
1570                    &lease.manifest_id,
1571                    &lease.worker_id.as_str(),
1572                    &(lease.fencing_token as i64),
1573                ],
1574            )
1575            .await
1576            .map_err(|e| LeaseError::Storage(internal_error(format!("finish manifest: {e}"))))?;
1577        if affected == 0 {
1578            Err(lease_lost(lease))
1579        } else {
1580            Ok(())
1581        }
1582    }
1583
1584    async fn fail_manifest(
1585        &self,
1586        lease: &ManifestLease,
1587        _error_message: &str,
1588    ) -> Result<(), LeaseError> {
1589        let client = self.get_client().await.map_err(LeaseError::Storage)?;
1590        let affected = client
1591            .execute(
1592                "UPDATE bulk_manifests SET status = 'failed', worker_id = NULL, lease_expiry = NULL
1593                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1594                   AND manifest_id = $4 AND worker_id = $5 AND fencing_token = $6",
1595                &[
1596                    &lease.tenant.tenant_id().as_str(),
1597                    &lease.submission_id.submitter,
1598                    &lease.submission_id.submission_id,
1599                    &lease.manifest_id,
1600                    &lease.worker_id.as_str(),
1601                    &(lease.fencing_token as i64),
1602                ],
1603            )
1604            .await
1605            .map_err(|e| LeaseError::Storage(internal_error(format!("fail manifest: {e}"))))?;
1606        if affected == 0 {
1607            Err(lease_lost(lease))
1608        } else {
1609            Ok(())
1610        }
1611    }
1612
1613    async fn set_manifest_fetch_params(
1614        &self,
1615        tenant: &TenantContext,
1616        id: &SubmissionId,
1617        manifest_id: &str,
1618        fhir_base_url: Option<&str>,
1619        output_format: Option<&str>,
1620        file_request_headers: &[(String, String)],
1621        oauth_metadata_urls: &[String],
1622        file_encryption_key: Option<&Value>,
1623    ) -> StorageResult<()> {
1624        let client = self.get_client().await?;
1625        let fhir_base_url = fhir_base_url.map(|s| s.to_string());
1626        let output_format = output_format.map(|s| s.to_string());
1627        let headers_json = serde_json::to_string(file_request_headers).ok();
1628        let oauth_json = serde_json::to_string(oauth_metadata_urls).ok();
1629        let encryption_json = file_encryption_key.and_then(|v| serde_json::to_string(v).ok());
1630        client
1631            .execute(
1632                "UPDATE bulk_manifests
1633                 SET fhir_base_url = $1, output_format = $2, file_request_headers = $3,
1634                     oauth_metadata_urls = $4, file_encryption_key = $5
1635                 WHERE tenant_id = $6 AND submitter = $7 AND submission_id = $8 AND manifest_id = $9",
1636                &[
1637                    &fhir_base_url,
1638                    &output_format,
1639                    &headers_json,
1640                    &oauth_json,
1641                    &encryption_json,
1642                    &tenant.tenant_id().as_str(),
1643                    &id.submitter,
1644                    &id.submission_id,
1645                    &manifest_id,
1646                ],
1647            )
1648            .await
1649            .map_err(|e| internal_error(format!("set manifest fetch params: {e}")))?;
1650        Ok(())
1651    }
1652
1653    async fn replace_manifest_by_url(
1654        &self,
1655        tenant: &TenantContext,
1656        id: &SubmissionId,
1657        manifest_url: &str,
1658    ) -> StorageResult<Vec<String>> {
1659        let client = self.get_client().await?;
1660        let tenant_id = tenant.tenant_id().as_str();
1661        let rows = client
1662            .query(
1663                "SELECT manifest_id FROM bulk_manifests
1664                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1665                   AND manifest_url = $4 AND status != 'replaced'",
1666                &[&tenant_id, &id.submitter, &id.submission_id, &manifest_url],
1667            )
1668            .await
1669            .map_err(|e| internal_error(format!("replace lookup: {e}")))?;
1670        let ids: Vec<String> = rows.iter().map(|r| r.get::<_, String>(0)).collect();
1671        client
1672            .execute(
1673                "UPDATE bulk_manifests SET status = 'replaced'
1674                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_url = $4",
1675                &[&tenant_id, &id.submitter, &id.submission_id, &manifest_url],
1676            )
1677            .await
1678            .map_err(|e| internal_error(format!("mark replaced: {e}")))?;
1679        Ok(ids)
1680    }
1681
1682    async fn set_submission_kickoff_meta(
1683        &self,
1684        tenant: &TenantContext,
1685        id: &SubmissionId,
1686        owner_subject: Option<&str>,
1687        request_url: &str,
1688        requires_access_token: bool,
1689    ) -> StorageResult<()> {
1690        let client = self.get_client().await?;
1691        let owner = owner_subject.map(|s| s.to_string());
1692        client
1693            .execute(
1694                "UPDATE bulk_submissions
1695                 SET owner_subject = $1, request_url = $2, requires_access_token = $3
1696                 WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6",
1697                &[
1698                    &owner,
1699                    &request_url,
1700                    &requires_access_token,
1701                    &tenant.tenant_id().as_str(),
1702                    &id.submitter,
1703                    &id.submission_id,
1704                ],
1705            )
1706            .await
1707            .map_err(|e| internal_error(format!("set kickoff meta: {e}")))?;
1708        Ok(())
1709    }
1710
1711    async fn ensure_poll_token(
1712        &self,
1713        tenant: &TenantContext,
1714        id: &SubmissionId,
1715    ) -> StorageResult<String> {
1716        let client = self.get_client().await?;
1717        let tenant_id = tenant.tenant_id().as_str();
1718        let rows = client
1719            .query(
1720                "SELECT poll_token FROM bulk_submissions
1721                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
1722                &[&tenant_id, &id.submitter, &id.submission_id],
1723            )
1724            .await
1725            .map_err(|e| internal_error(format!("read poll token: {e}")))?;
1726        if let Some(row) = rows.first() {
1727            let existing: Option<String> = row.get(0);
1728            if let Some(token) = existing {
1729                return Ok(token);
1730            }
1731        }
1732        let token = Uuid::new_v4().to_string();
1733        client
1734            .execute(
1735                "UPDATE bulk_submissions SET poll_token = $1
1736                 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
1737                &[&token, &tenant_id, &id.submitter, &id.submission_id],
1738            )
1739            .await
1740            .map_err(|e| internal_error(format!("set poll token: {e}")))?;
1741        Ok(token)
1742    }
1743
1744    async fn list_expired_submissions(
1745        &self,
1746        now: DateTime<Utc>,
1747        ttl: StdDuration,
1748        limit: u32,
1749    ) -> StorageResult<Vec<(TenantContext, SubmissionId)>> {
1750        let client = self.get_client().await?;
1751        let cutoff = now
1752            - chrono::Duration::from_std(ttl).unwrap_or_else(|_| chrono::Duration::seconds(86400));
1753        let rows = client
1754            .query(
1755                "SELECT tenant_id, submitter, submission_id FROM bulk_submissions
1756                 WHERE updated_at < $1 ORDER BY updated_at LIMIT $2",
1757                &[&cutoff, &(limit as i64)],
1758            )
1759            .await
1760            .map_err(|e| internal_error(format!("list expired: {e}")))?;
1761        Ok(rows
1762            .iter()
1763            .map(|r| {
1764                let tenant_id: String = r.get(0);
1765                let submitter: String = r.get(1);
1766                let submission_id: String = r.get(2);
1767                (
1768                    TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
1769                    SubmissionId::new(submitter, submission_id),
1770                )
1771            })
1772            .collect())
1773    }
1774
1775    async fn resolve_poll_token(&self, token: &str) -> StorageResult<Option<PollTokenTarget>> {
1776        let client = self.get_client().await?;
1777        let rows = client
1778            .query(
1779                "SELECT tenant_id, submitter, submission_id, owner_subject
1780                 FROM bulk_submissions WHERE poll_token = $1",
1781                &[&token],
1782            )
1783            .await
1784            .map_err(|e| internal_error(format!("resolve poll token: {e}")))?;
1785        Ok(rows.first().map(|row| {
1786            let tenant_id: String = row.get(0);
1787            let submitter: String = row.get(1);
1788            let submission_id: String = row.get(2);
1789            let owner_subject: Option<String> = row.get(3);
1790            PollTokenTarget {
1791                tenant: TenantContext::new(
1792                    TenantId::new(tenant_id),
1793                    TenantPermissions::full_access(),
1794                ),
1795                submission_id: SubmissionId::new(submitter, submission_id),
1796                owner_subject,
1797            }
1798        }))
1799    }
1800
1801    async fn clear_poll_token(
1802        &self,
1803        tenant: &TenantContext,
1804        id: &SubmissionId,
1805    ) -> StorageResult<()> {
1806        let client = self.get_client().await?;
1807        client
1808            .execute(
1809                "UPDATE bulk_submissions SET poll_token = NULL
1810                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
1811                &[
1812                    &tenant.tenant_id().as_str(),
1813                    &id.submitter,
1814                    &id.submission_id,
1815                ],
1816            )
1817            .await
1818            .map_err(|e| internal_error(format!("clear poll token: {e}")))?;
1819        Ok(())
1820    }
1821
1822    async fn list_submit_files(
1823        &self,
1824        tenant: &TenantContext,
1825        id: &SubmissionId,
1826    ) -> StorageResult<Vec<SubmitFileRow>> {
1827        let client = self.get_client().await?;
1828        let rows = client
1829            .query(
1830                "SELECT manifest_url, file_type, resource_type, part_index, fencing_token,
1831                        file_path, line_count, byte_count, count_severity
1832                 FROM bulk_submit_files
1833                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1834                 ORDER BY id",
1835                &[
1836                    &tenant.tenant_id().as_str(),
1837                    &id.submitter,
1838                    &id.submission_id,
1839                ],
1840            )
1841            .await
1842            .map_err(|e| internal_error(format!("list submit files: {e}")))?;
1843        Ok(rows
1844            .iter()
1845            .map(|row| {
1846                let count_severity: Option<String> = row.get(8);
1847                SubmitFileRow {
1848                    manifest_url: row.get(0),
1849                    file_type: row.get(1),
1850                    resource_type: row.get(2),
1851                    part_index: row.get::<_, i32>(3) as u32,
1852                    fencing_token: row.get::<_, i64>(4) as u64,
1853                    file_path: row.get(5),
1854                    line_count: row.get::<_, i64>(6) as u64,
1855                    byte_count: row.get::<_, i64>(7) as u64,
1856                    count_severity: count_severity
1857                        .as_deref()
1858                        .and_then(|s| serde_json::from_str(s).ok()),
1859                }
1860            })
1861            .collect())
1862    }
1863
1864    async fn delete_submission_artifacts(
1865        &self,
1866        tenant: &TenantContext,
1867        id: &SubmissionId,
1868    ) -> StorageResult<()> {
1869        let client = self.get_client().await?;
1870        client
1871            .execute(
1872                "DELETE FROM bulk_submit_files
1873                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
1874                &[
1875                    &tenant.tenant_id().as_str(),
1876                    &id.submitter,
1877                    &id.submission_id,
1878                ],
1879            )
1880            .await
1881            .map_err(|e| internal_error(format!("delete artifacts: {e}")))?;
1882        Ok(())
1883    }
1884
1885    async fn count_active_submissions(&self, tenant: &TenantContext) -> StorageResult<u64> {
1886        let client = self.get_client().await?;
1887        let row = client
1888            .query_one(
1889                "SELECT COUNT(*) FROM bulk_submissions
1890                 WHERE tenant_id = $1 AND status = 'in-progress'",
1891                &[&tenant.tenant_id().as_str()],
1892            )
1893            .await
1894            .map_err(|e| internal_error(format!("count active submissions: {e}")))?;
1895        let count: i64 = row.get(0);
1896        Ok(count.max(0) as u64)
1897    }
1898
1899    async fn ensure_transaction_time(
1900        &self,
1901        tenant: &TenantContext,
1902        id: &SubmissionId,
1903    ) -> StorageResult<DateTime<Utc>> {
1904        let client = self.get_client().await?;
1905        let tenant_id = tenant.tenant_id().as_str();
1906        let rows = client
1907            .query(
1908                "SELECT transaction_time FROM bulk_submissions
1909                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
1910                &[&tenant_id, &id.submitter, &id.submission_id],
1911            )
1912            .await
1913            .map_err(|e| internal_error(format!("read transaction_time: {e}")))?;
1914        if let Some(row) = rows.first() {
1915            let existing: Option<DateTime<Utc>> = row.get(0);
1916            if let Some(dt) = existing {
1917                return Ok(dt);
1918            }
1919        }
1920        let now = Utc::now();
1921        client
1922            .execute(
1923                "UPDATE bulk_submissions SET transaction_time = $1
1924                 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
1925                &[&now, &tenant_id, &id.submitter, &id.submission_id],
1926            )
1927            .await
1928            .map_err(|e| internal_error(format!("set transaction_time: {e}")))?;
1929        Ok(now)
1930    }
1931}