Skip to main content

helios_persistence/backends/postgres/
bulk_submit.rs

1//! Bulk submit implementation for PostgreSQL backend.
2
3use async_trait::async_trait;
4use chrono::Utc;
5use helios_fhir::FhirVersion;
6use serde_json::Value;
7use tokio::io::{AsyncBufRead, AsyncBufReadExt};
8use uuid::Uuid;
9
10use crate::core::ResourceStorage;
11use crate::core::bulk_submit::{
12    BulkEntryOutcome, BulkEntryResult, BulkProcessingOptions, BulkSubmitProvider,
13    BulkSubmitRollbackProvider, ChangeType, EntryCountSummary, ManifestStatus, NdjsonEntry,
14    StreamProcessingResult, StreamingBulkSubmitProvider, SubmissionChange, SubmissionId,
15    SubmissionManifest, SubmissionStatus, SubmissionSummary,
16};
17use crate::error::{BackendError, BulkSubmitError, StorageError, StorageResult};
18use crate::tenant::TenantContext;
19
20use super::PostgresBackend;
21
22fn internal_error(message: String) -> StorageError {
23    StorageError::Backend(BackendError::Internal {
24        backend_name: "postgres".to_string(),
25        message,
26        source: None,
27    })
28}
29
30#[async_trait]
31impl BulkSubmitProvider for PostgresBackend {
32    async fn create_submission(
33        &self,
34        tenant: &TenantContext,
35        id: &SubmissionId,
36        metadata: Option<Value>,
37    ) -> StorageResult<SubmissionSummary> {
38        let client = self.get_client().await?;
39        let tenant_id = tenant.tenant_id().as_str();
40
41        // Check for duplicate
42        let rows = client
43            .query(
44                "SELECT 1 FROM bulk_submissions
45                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
46                &[
47                    &tenant_id,
48                    &id.submitter.as_str(),
49                    &id.submission_id.as_str(),
50                ],
51            )
52            .await
53            .map_err(|e| internal_error(format!("Failed to check duplicate: {}", e)))?;
54
55        if !rows.is_empty() {
56            return Err(StorageError::BulkSubmit(
57                BulkSubmitError::DuplicateSubmission {
58                    submitter: id.submitter.clone(),
59                    submission_id: id.submission_id.clone(),
60                },
61            ));
62        }
63
64        let now = Utc::now();
65        let metadata_json: Option<Value> = metadata.clone();
66
67        client
68            .execute(
69                "INSERT INTO bulk_submissions
70                 (tenant_id, submitter, submission_id, status, created_at, updated_at, metadata)
71                 VALUES ($1, $2, $3, 'in-progress', $4, $5, $6)",
72                &[
73                    &tenant_id,
74                    &id.submitter.as_str(),
75                    &id.submission_id.as_str(),
76                    &now,
77                    &now,
78                    &metadata_json,
79                ],
80            )
81            .await
82            .map_err(|e| internal_error(format!("Failed to create submission: {}", e)))?;
83
84        Ok(SubmissionSummary {
85            id: id.clone(),
86            status: SubmissionStatus::InProgress,
87            created_at: now,
88            updated_at: now,
89            completed_at: None,
90            manifest_count: 0,
91            total_entries: 0,
92            success_count: 0,
93            error_count: 0,
94            skipped_count: 0,
95            metadata,
96        })
97    }
98
99    async fn get_submission(
100        &self,
101        tenant: &TenantContext,
102        id: &SubmissionId,
103    ) -> StorageResult<Option<SubmissionSummary>> {
104        let client = self.get_client().await?;
105        let tenant_id = tenant.tenant_id().as_str();
106
107        let rows = client
108            .query(
109                "SELECT status, created_at, updated_at, completed_at, metadata
110                 FROM bulk_submissions
111                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
112                &[
113                    &tenant_id,
114                    &id.submitter.as_str(),
115                    &id.submission_id.as_str(),
116                ],
117            )
118            .await
119            .map_err(|e| internal_error(format!("Failed to get submission: {}", e)))?;
120
121        if rows.is_empty() {
122            return Ok(None);
123        }
124
125        let row = &rows[0];
126        let status_str: String = row.get(0);
127        let created_at: chrono::DateTime<Utc> = row.get(1);
128        let updated_at: chrono::DateTime<Utc> = row.get(2);
129        let completed_at: Option<chrono::DateTime<Utc>> = row.get(3);
130        let metadata: Option<Value> = row.get(4);
131
132        let status: SubmissionStatus = status_str
133            .parse()
134            .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
135
136        // Get manifest count
137        let manifest_row = client
138            .query_one(
139                "SELECT COUNT(*) FROM bulk_manifests
140                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
141                &[
142                    &tenant_id,
143                    &id.submitter.as_str(),
144                    &id.submission_id.as_str(),
145                ],
146            )
147            .await
148            .map_err(|e| internal_error(format!("Failed to count manifests: {}", e)))?;
149
150        let manifest_count: i64 = manifest_row.get(0);
151
152        // Get aggregated counts from entry results
153        let counts_row = client
154            .query_one(
155                "SELECT
156                    COUNT(*),
157                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
158                    SUM(CASE WHEN outcome IN ('validation-error', 'processing-error') THEN 1 ELSE 0 END),
159                    SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
160                 FROM bulk_entry_results
161                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
162                &[&tenant_id, &id.submitter.as_str(), &id.submission_id.as_str()],
163            )
164            .await
165            .map_err(|e| internal_error(format!("Failed to count entries: {}", e)))?;
166
167        let total: i64 = counts_row.get(0);
168        let success: Option<i64> = counts_row.get(1);
169        let errors: Option<i64> = counts_row.get(2);
170        let skipped: Option<i64> = counts_row.get(3);
171
172        Ok(Some(SubmissionSummary {
173            id: id.clone(),
174            status,
175            created_at,
176            updated_at,
177            completed_at,
178            manifest_count: manifest_count as u32,
179            total_entries: total as u64,
180            success_count: success.unwrap_or(0) as u64,
181            error_count: errors.unwrap_or(0) as u64,
182            skipped_count: skipped.unwrap_or(0) as u64,
183            metadata,
184        }))
185    }
186
187    async fn list_submissions(
188        &self,
189        tenant: &TenantContext,
190        submitter: Option<&str>,
191        status: Option<SubmissionStatus>,
192        limit: u32,
193        offset: u32,
194    ) -> StorageResult<Vec<SubmissionSummary>> {
195        let client = self.get_client().await?;
196        let tenant_id = tenant.tenant_id().as_str();
197
198        let mut sql = "SELECT submitter, submission_id FROM bulk_submissions WHERE tenant_id = $1"
199            .to_string();
200        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
201            vec![Box::new(tenant_id.to_string())];
202        let mut param_idx = 2;
203
204        if let Some(submitter) = submitter {
205            sql.push_str(&format!(" AND submitter = ${}", param_idx));
206            params.push(Box::new(submitter.to_string()));
207            param_idx += 1;
208        }
209
210        if let Some(status) = status {
211            sql.push_str(&format!(" AND status = ${}", param_idx));
212            params.push(Box::new(status.to_string()));
213        }
214
215        sql.push_str(&format!(
216            " ORDER BY created_at DESC LIMIT {} OFFSET {}",
217            limit, offset
218        ));
219
220        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
221            .iter()
222            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
223            .collect();
224
225        let rows = client
226            .query(&sql, &param_refs)
227            .await
228            .map_err(|e| internal_error(format!("Failed to query submissions: {}", e)))?;
229
230        let mut results = Vec::new();
231        for row in &rows {
232            let submitter: String = row.get(0);
233            let submission_id: String = row.get(1);
234            let sub_id = SubmissionId::new(submitter, submission_id);
235            if let Some(summary) = self.get_submission(tenant, &sub_id).await? {
236                results.push(summary);
237            }
238        }
239
240        Ok(results)
241    }
242
243    async fn complete_submission(
244        &self,
245        tenant: &TenantContext,
246        id: &SubmissionId,
247    ) -> StorageResult<SubmissionSummary> {
248        let client = self.get_client().await?;
249        let tenant_id = tenant.tenant_id().as_str();
250
251        // Check current status
252        let rows = client
253            .query(
254                "SELECT status FROM bulk_submissions
255                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
256                &[
257                    &tenant_id,
258                    &id.submitter.as_str(),
259                    &id.submission_id.as_str(),
260                ],
261            )
262            .await
263            .map_err(|e| internal_error(format!("Failed to get submission status: {}", e)))?;
264
265        if rows.is_empty() {
266            return Err(StorageError::BulkSubmit(
267                BulkSubmitError::SubmissionNotFound {
268                    submitter: id.submitter.clone(),
269                    submission_id: id.submission_id.clone(),
270                },
271            ));
272        }
273
274        let current_status: String = rows[0].get(0);
275        if current_status != "in-progress" {
276            return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
277                submission_id: id.submission_id.clone(),
278            }));
279        }
280
281        let now = Utc::now();
282        client
283            .execute(
284                "UPDATE bulk_submissions SET status = 'complete', completed_at = $1, updated_at = $2
285                 WHERE tenant_id = $3 AND submitter = $4 AND submission_id = $5",
286                &[
287                    &now,
288                    &now,
289                    &tenant_id,
290                    &id.submitter.as_str(),
291                    &id.submission_id.as_str(),
292                ],
293            )
294            .await
295            .map_err(|e| internal_error(format!("Failed to complete submission: {}", e)))?;
296
297        self.get_submission(tenant, id)
298            .await?
299            .ok_or_else(|| internal_error("Submission disappeared".to_string()))
300    }
301
302    async fn abort_submission(
303        &self,
304        tenant: &TenantContext,
305        id: &SubmissionId,
306        _reason: &str,
307    ) -> StorageResult<u64> {
308        let client = self.get_client().await?;
309        let tenant_id = tenant.tenant_id().as_str();
310
311        // Check current status
312        let rows = client
313            .query(
314                "SELECT status FROM bulk_submissions
315                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
316                &[
317                    &tenant_id,
318                    &id.submitter.as_str(),
319                    &id.submission_id.as_str(),
320                ],
321            )
322            .await
323            .map_err(|e| internal_error(format!("Failed to get submission status: {}", e)))?;
324
325        if rows.is_empty() {
326            return Err(StorageError::BulkSubmit(
327                BulkSubmitError::SubmissionNotFound {
328                    submitter: id.submitter.clone(),
329                    submission_id: id.submission_id.clone(),
330                },
331            ));
332        }
333
334        let current_status: String = rows[0].get(0);
335        if current_status != "in-progress" {
336            return Err(StorageError::BulkSubmit(BulkSubmitError::AlreadyComplete {
337                submission_id: id.submission_id.clone(),
338            }));
339        }
340
341        // Count pending manifests
342        let pending_row = client
343            .query_one(
344                "SELECT COUNT(*) FROM bulk_manifests
345                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
346                 AND status IN ('pending', 'processing')",
347                &[
348                    &tenant_id,
349                    &id.submitter.as_str(),
350                    &id.submission_id.as_str(),
351                ],
352            )
353            .await
354            .map_err(|e| internal_error(format!("Failed to count pending manifests: {}", e)))?;
355
356        let pending_count: i64 = pending_row.get(0);
357        let now = Utc::now();
358
359        // Update submission status
360        client
361            .execute(
362                "UPDATE bulk_submissions SET status = 'aborted', completed_at = $1, updated_at = $2
363                 WHERE tenant_id = $3 AND submitter = $4 AND submission_id = $5",
364                &[
365                    &now,
366                    &now,
367                    &tenant_id,
368                    &id.submitter.as_str(),
369                    &id.submission_id.as_str(),
370                ],
371            )
372            .await
373            .map_err(|e| internal_error(format!("Failed to abort submission: {}", e)))?;
374
375        // Update pending manifests to failed
376        client
377            .execute(
378                "UPDATE bulk_manifests SET status = 'failed'
379                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
380                 AND status IN ('pending', 'processing')",
381                &[
382                    &tenant_id,
383                    &id.submitter.as_str(),
384                    &id.submission_id.as_str(),
385                ],
386            )
387            .await
388            .map_err(|e| internal_error(format!("Failed to update manifests: {}", e)))?;
389
390        Ok(pending_count as u64)
391    }
392
393    async fn add_manifest(
394        &self,
395        tenant: &TenantContext,
396        submission_id: &SubmissionId,
397        manifest_url: Option<&str>,
398        replaces_manifest_url: Option<&str>,
399    ) -> StorageResult<SubmissionManifest> {
400        let client = self.get_client().await?;
401        let tenant_id = tenant.tenant_id().as_str();
402
403        // Check submission exists and is in progress
404        let rows = client
405            .query(
406                "SELECT status FROM bulk_submissions
407                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3",
408                &[
409                    &tenant_id,
410                    &submission_id.submitter.as_str(),
411                    &submission_id.submission_id.as_str(),
412                ],
413            )
414            .await
415            .map_err(|e| internal_error(format!("Failed to get submission: {}", e)))?;
416
417        if rows.is_empty() {
418            return Err(StorageError::BulkSubmit(
419                BulkSubmitError::SubmissionNotFound {
420                    submitter: submission_id.submitter.clone(),
421                    submission_id: submission_id.submission_id.clone(),
422                },
423            ));
424        }
425
426        let status: String = rows[0].get(0);
427        if status != "in-progress" {
428            return Err(StorageError::BulkSubmit(BulkSubmitError::InvalidState {
429                submission_id: submission_id.submission_id.clone(),
430                expected: "in-progress".to_string(),
431                actual: status,
432            }));
433        }
434
435        let manifest_id = Uuid::new_v4().to_string();
436        let now = Utc::now();
437
438        client
439            .execute(
440                "INSERT INTO bulk_manifests
441                 (tenant_id, submitter, submission_id, manifest_id, manifest_url, replaces_manifest_url, status, added_at)
442                 VALUES ($1, $2, $3, $4, $5, $6, 'pending', $7)",
443                &[
444                    &tenant_id,
445                    &submission_id.submitter.as_str(),
446                    &submission_id.submission_id.as_str(),
447                    &manifest_id.as_str(),
448                    &manifest_url,
449                    &replaces_manifest_url,
450                    &now,
451                ],
452            )
453            .await
454            .map_err(|e| internal_error(format!("Failed to add manifest: {}", e)))?;
455
456        // Update submission updated_at
457        client
458            .execute(
459                "UPDATE bulk_submissions SET updated_at = $1
460                 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
461                &[
462                    &now,
463                    &tenant_id,
464                    &submission_id.submitter.as_str(),
465                    &submission_id.submission_id.as_str(),
466                ],
467            )
468            .await
469            .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
470
471        Ok(SubmissionManifest {
472            manifest_id,
473            manifest_url: manifest_url.map(String::from),
474            replaces_manifest_url: replaces_manifest_url.map(String::from),
475            status: ManifestStatus::Pending,
476            added_at: now,
477            total_entries: 0,
478            processed_entries: 0,
479            failed_entries: 0,
480        })
481    }
482
483    async fn get_manifest(
484        &self,
485        tenant: &TenantContext,
486        submission_id: &SubmissionId,
487        manifest_id: &str,
488    ) -> StorageResult<Option<SubmissionManifest>> {
489        let client = self.get_client().await?;
490        let tenant_id = tenant.tenant_id().as_str();
491
492        let rows = client
493            .query(
494                "SELECT manifest_url, replaces_manifest_url, status, added_at, total_entries, processed_entries, failed_entries
495                 FROM bulk_manifests
496                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
497                &[
498                    &tenant_id,
499                    &submission_id.submitter.as_str(),
500                    &submission_id.submission_id.as_str(),
501                    &manifest_id,
502                ],
503            )
504            .await
505            .map_err(|e| internal_error(format!("Failed to get manifest: {}", e)))?;
506
507        if rows.is_empty() {
508            return Ok(None);
509        }
510
511        let row = &rows[0];
512        let manifest_url: Option<String> = row.get(0);
513        let replaces_manifest_url: Option<String> = row.get(1);
514        let status_str: String = row.get(2);
515        let added_at: chrono::DateTime<Utc> = row.get(3);
516        let total: i64 = row.get(4);
517        let processed: i64 = row.get(5);
518        let failed: i64 = row.get(6);
519
520        let status: ManifestStatus = status_str
521            .parse()
522            .map_err(|_| internal_error(format!("Invalid manifest status: {}", status_str)))?;
523
524        Ok(Some(SubmissionManifest {
525            manifest_id: manifest_id.to_string(),
526            manifest_url,
527            replaces_manifest_url,
528            status,
529            added_at,
530            total_entries: total as u64,
531            processed_entries: processed as u64,
532            failed_entries: failed as u64,
533        }))
534    }
535
536    async fn list_manifests(
537        &self,
538        tenant: &TenantContext,
539        submission_id: &SubmissionId,
540    ) -> StorageResult<Vec<SubmissionManifest>> {
541        let client = self.get_client().await?;
542        let tenant_id = tenant.tenant_id().as_str();
543
544        let rows = client
545            .query(
546                "SELECT manifest_id FROM bulk_manifests
547                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
548                 ORDER BY added_at",
549                &[
550                    &tenant_id,
551                    &submission_id.submitter.as_str(),
552                    &submission_id.submission_id.as_str(),
553                ],
554            )
555            .await
556            .map_err(|e| internal_error(format!("Failed to query manifests: {}", e)))?;
557
558        let mut results = Vec::new();
559        for row in &rows {
560            let manifest_id: String = row.get(0);
561            if let Some(manifest) = self
562                .get_manifest(tenant, submission_id, &manifest_id)
563                .await?
564            {
565                results.push(manifest);
566            }
567        }
568
569        Ok(results)
570    }
571
572    async fn process_entries(
573        &self,
574        tenant: &TenantContext,
575        submission_id: &SubmissionId,
576        manifest_id: &str,
577        entries: Vec<NdjsonEntry>,
578        options: &BulkProcessingOptions,
579    ) -> StorageResult<Vec<BulkEntryResult>> {
580        let client = self.get_client().await?;
581        let tenant_id = tenant.tenant_id().as_str();
582
583        // Verify manifest exists
584        if self
585            .get_manifest(tenant, submission_id, manifest_id)
586            .await?
587            .is_none()
588        {
589            return Err(StorageError::BulkSubmit(
590                BulkSubmitError::ManifestNotFound {
591                    submission_id: submission_id.submission_id.clone(),
592                    manifest_id: manifest_id.to_string(),
593                },
594            ));
595        }
596
597        // Update manifest status to processing
598        client
599            .execute(
600                "UPDATE bulk_manifests SET status = 'processing'
601                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
602                &[
603                    &tenant_id,
604                    &submission_id.submitter.as_str(),
605                    &submission_id.submission_id.as_str(),
606                    &manifest_id,
607                ],
608            )
609            .await
610            .map_err(|e| internal_error(format!("Failed to update manifest status: {}", e)))?;
611
612        let mut results = Vec::new();
613        let mut error_count = 0u32;
614
615        for entry in entries {
616            if options.max_errors > 0 && error_count >= options.max_errors {
617                if !options.continue_on_error {
618                    return Err(StorageError::BulkSubmit(
619                        BulkSubmitError::MaxErrorsExceeded {
620                            submission_id: submission_id.submission_id.clone(),
621                            max_errors: options.max_errors,
622                        },
623                    ));
624                }
625                let skip_result = BulkEntryResult::skipped(
626                    entry.line_number,
627                    &entry.resource_type,
628                    "max errors exceeded",
629                );
630                results.push(skip_result);
631                continue;
632            }
633
634            let result = self
635                .process_single_entry(tenant, submission_id, manifest_id, &entry, options)
636                .await;
637
638            let entry_result = match result {
639                Ok(r) => r,
640                Err(e) => {
641                    error_count += 1;
642                    BulkEntryResult::processing_error(
643                        entry.line_number,
644                        &entry.resource_type,
645                        serde_json::json!({
646                            "resourceType": "OperationOutcome",
647                            "issue": [{
648                                "severity": "error",
649                                "code": "exception",
650                                "diagnostics": e.to_string()
651                            }]
652                        }),
653                    )
654                }
655            };
656
657            if entry_result.is_error() {
658                error_count += 1;
659            }
660
661            self.store_entry_result(tenant, submission_id, manifest_id, &entry_result)
662                .await?;
663
664            results.push(entry_result);
665        }
666
667        // Update manifest counts
668        let now = Utc::now();
669        client
670            .execute(
671                "UPDATE bulk_manifests SET
672                    total_entries = total_entries + $1,
673                    processed_entries = processed_entries + $2,
674                    failed_entries = failed_entries + $3
675                 WHERE tenant_id = $4 AND submitter = $5 AND submission_id = $6 AND manifest_id = $7",
676                &[
677                    &(results.len() as i64),
678                    &(results.iter().filter(|r| r.is_success()).count() as i64),
679                    &(error_count as i64),
680                    &tenant_id,
681                    &submission_id.submitter.as_str(),
682                    &submission_id.submission_id.as_str(),
683                    &manifest_id,
684                ],
685            )
686            .await
687            .map_err(|e| internal_error(format!("Failed to update manifest counts: {}", e)))?;
688
689        // Update submission updated_at
690        client
691            .execute(
692                "UPDATE bulk_submissions SET updated_at = $1
693                 WHERE tenant_id = $2 AND submitter = $3 AND submission_id = $4",
694                &[
695                    &now,
696                    &tenant_id,
697                    &submission_id.submitter.as_str(),
698                    &submission_id.submission_id.as_str(),
699                ],
700            )
701            .await
702            .map_err(|e| internal_error(format!("Failed to update submission: {}", e)))?;
703
704        Ok(results)
705    }
706
707    async fn get_entry_results(
708        &self,
709        tenant: &TenantContext,
710        submission_id: &SubmissionId,
711        manifest_id: &str,
712        outcome_filter: Option<BulkEntryOutcome>,
713        limit: u32,
714        offset: u32,
715    ) -> StorageResult<Vec<BulkEntryResult>> {
716        let client = self.get_client().await?;
717        let tenant_id = tenant.tenant_id().as_str();
718
719        let mut sql =
720            "SELECT line_number, resource_type, resource_id, created, outcome, operation_outcome
721             FROM bulk_entry_results
722             WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4"
723                .to_string();
724
725        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
726            Box::new(tenant_id.to_string()),
727            Box::new(submission_id.submitter.clone()),
728            Box::new(submission_id.submission_id.clone()),
729            Box::new(manifest_id.to_string()),
730        ];
731
732        if let Some(outcome) = outcome_filter {
733            sql.push_str(" AND outcome = $5");
734            params.push(Box::new(outcome.to_string()));
735        }
736
737        sql.push_str(&format!(
738            " ORDER BY line_number LIMIT {} OFFSET {}",
739            limit, offset
740        ));
741
742        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
743            .iter()
744            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
745            .collect();
746
747        let rows = client
748            .query(&sql, &param_refs)
749            .await
750            .map_err(|e| internal_error(format!("Failed to query results: {}", e)))?;
751
752        let results: Vec<BulkEntryResult> = rows
753            .iter()
754            .map(|row| {
755                let line_number: i64 = row.get(0);
756                let resource_type: String = row.get(1);
757                let resource_id: Option<String> = row.get(2);
758                let created: Option<bool> = row.get(3);
759                let outcome_str: String = row.get(4);
760                let operation_outcome: Option<Value> = row.get(5);
761
762                let outcome: BulkEntryOutcome = outcome_str
763                    .parse()
764                    .unwrap_or(BulkEntryOutcome::ProcessingError);
765
766                BulkEntryResult {
767                    line_number: line_number as u64,
768                    resource_type,
769                    resource_id,
770                    created: created.unwrap_or(false),
771                    outcome,
772                    operation_outcome,
773                }
774            })
775            .collect();
776
777        Ok(results)
778    }
779
780    async fn get_entry_counts(
781        &self,
782        tenant: &TenantContext,
783        submission_id: &SubmissionId,
784        manifest_id: &str,
785    ) -> StorageResult<EntryCountSummary> {
786        let client = self.get_client().await?;
787        let tenant_id = tenant.tenant_id().as_str();
788
789        let row = client
790            .query_one(
791                "SELECT
792                    COUNT(*),
793                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END),
794                    SUM(CASE WHEN outcome = 'validation-error' THEN 1 ELSE 0 END),
795                    SUM(CASE WHEN outcome = 'processing-error' THEN 1 ELSE 0 END),
796                    SUM(CASE WHEN outcome = 'skipped' THEN 1 ELSE 0 END)
797                 FROM bulk_entry_results
798                 WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3 AND manifest_id = $4",
799                &[
800                    &tenant_id,
801                    &submission_id.submitter.as_str(),
802                    &submission_id.submission_id.as_str(),
803                    &manifest_id,
804                ],
805            )
806            .await
807            .map_err(|e| internal_error(format!("Failed to count entries: {}", e)))?;
808
809        let total: i64 = row.get(0);
810        let success: Option<i64> = row.get(1);
811        let validation_error: Option<i64> = row.get(2);
812        let processing_error: Option<i64> = row.get(3);
813        let skipped: Option<i64> = row.get(4);
814
815        Ok(EntryCountSummary {
816            total: total as u64,
817            success: success.unwrap_or(0) as u64,
818            validation_error: validation_error.unwrap_or(0) as u64,
819            processing_error: processing_error.unwrap_or(0) as u64,
820            skipped: skipped.unwrap_or(0) as u64,
821        })
822    }
823}
824
825impl PostgresBackend {
826    /// Process a single NDJSON entry.
827    async fn process_single_entry(
828        &self,
829        tenant: &TenantContext,
830        submission_id: &SubmissionId,
831        manifest_id: &str,
832        entry: &NdjsonEntry,
833        options: &BulkProcessingOptions,
834    ) -> StorageResult<BulkEntryResult> {
835        let resource_id = entry.resource_id.as_ref();
836
837        if let Some(id) = resource_id {
838            let existing = self.read(tenant, &entry.resource_type, id).await;
839
840            match existing {
841                Ok(Some(current)) => {
842                    if !options.allow_updates {
843                        return Ok(BulkEntryResult::skipped(
844                            entry.line_number,
845                            &entry.resource_type,
846                            "updates not allowed",
847                        ));
848                    }
849
850                    let change = SubmissionChange::update(
851                        manifest_id,
852                        &entry.resource_type,
853                        id,
854                        current.version_id(),
855                        (current.version_id().parse::<i32>().unwrap_or(0) + 1).to_string(),
856                        current.content().clone(),
857                    );
858                    self.record_change(tenant, submission_id, &change).await?;
859
860                    let updated = self
861                        .update(tenant, &current, entry.resource.clone())
862                        .await?;
863
864                    Ok(BulkEntryResult::success(
865                        entry.line_number,
866                        &entry.resource_type,
867                        updated.id(),
868                        false,
869                    ))
870                }
871                Ok(None)
872                | Err(StorageError::Resource(crate::error::ResourceError::Gone { .. })) => {
873                    let created = self
874                        .create(
875                            tenant,
876                            &entry.resource_type,
877                            entry.resource.clone(),
878                            FhirVersion::default(),
879                        )
880                        .await?;
881
882                    let change = SubmissionChange::create(
883                        manifest_id,
884                        &entry.resource_type,
885                        created.id(),
886                        created.version_id(),
887                    );
888                    self.record_change(tenant, submission_id, &change).await?;
889
890                    Ok(BulkEntryResult::success(
891                        entry.line_number,
892                        &entry.resource_type,
893                        created.id(),
894                        true,
895                    ))
896                }
897                Err(e) => Err(e),
898            }
899        } else {
900            let created = self
901                .create(
902                    tenant,
903                    &entry.resource_type,
904                    entry.resource.clone(),
905                    FhirVersion::default(),
906                )
907                .await?;
908
909            let change = SubmissionChange::create(
910                manifest_id,
911                &entry.resource_type,
912                created.id(),
913                created.version_id(),
914            );
915            self.record_change(tenant, submission_id, &change).await?;
916
917            Ok(BulkEntryResult::success(
918                entry.line_number,
919                &entry.resource_type,
920                created.id(),
921                true,
922            ))
923        }
924    }
925
926    /// Store an entry result in the database.
927    async fn store_entry_result(
928        &self,
929        tenant: &TenantContext,
930        submission_id: &SubmissionId,
931        manifest_id: &str,
932        result: &BulkEntryResult,
933    ) -> StorageResult<()> {
934        let client = self.get_client().await?;
935        let tenant_id = tenant.tenant_id().as_str();
936
937        let outcome_json: Option<Value> = result.operation_outcome.clone();
938
939        client
940            .execute(
941                "INSERT INTO bulk_entry_results
942                 (tenant_id, submitter, submission_id, manifest_id, line_number, resource_type, resource_id, created, outcome, operation_outcome)
943                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
944                &[
945                    &tenant_id,
946                    &submission_id.submitter.as_str(),
947                    &submission_id.submission_id.as_str(),
948                    &manifest_id,
949                    &(result.line_number as i64),
950                    &result.resource_type.as_str(),
951                    &result.resource_id,
952                    &result.created,
953                    &result.outcome.to_string().as_str(),
954                    &outcome_json,
955                ],
956            )
957            .await
958            .map_err(|e| internal_error(format!("Failed to store entry result: {}", e)))?;
959
960        Ok(())
961    }
962}
963
964#[async_trait]
965impl StreamingBulkSubmitProvider for PostgresBackend {
966    async fn process_ndjson_stream(
967        &self,
968        tenant: &TenantContext,
969        submission_id: &SubmissionId,
970        manifest_id: &str,
971        resource_type: &str,
972        mut reader: Box<dyn AsyncBufRead + Send + Unpin>,
973        options: &BulkProcessingOptions,
974    ) -> StorageResult<StreamProcessingResult> {
975        let mut result = StreamProcessingResult::new();
976        let mut line_number = 0u64;
977        let mut batch = Vec::new();
978
979        loop {
980            let mut line = String::new();
981            let bytes_read = reader
982                .read_line(&mut line)
983                .await
984                .map_err(|e| internal_error(format!("Failed to read line: {}", e)))?;
985
986            if bytes_read == 0 {
987                break;
988            }
989
990            line_number += 1;
991            result.lines_processed = line_number;
992
993            let line = line.trim();
994            if line.is_empty() {
995                continue;
996            }
997
998            match NdjsonEntry::parse(line_number, line) {
999                Ok(entry) => {
1000                    if entry.resource_type != resource_type {
1001                        let error_result = BulkEntryResult::validation_error(
1002                            line_number,
1003                            &entry.resource_type,
1004                            serde_json::json!({
1005                                "resourceType": "OperationOutcome",
1006                                "issue": [{
1007                                    "severity": "error",
1008                                    "code": "invalid",
1009                                    "diagnostics": format!("Expected resource type {}, got {}", resource_type, entry.resource_type)
1010                                }]
1011                            }),
1012                        );
1013                        result.counts.increment(error_result.outcome);
1014
1015                        if !options.continue_on_error
1016                            && (options.max_errors == 0
1017                                || result.counts.error_count() >= options.max_errors as u64)
1018                        {
1019                            return Ok(result.aborted("max errors exceeded"));
1020                        }
1021                        continue;
1022                    }
1023
1024                    batch.push(entry);
1025                }
1026                Err(e) => {
1027                    result.counts.increment(BulkEntryOutcome::ValidationError);
1028
1029                    if !options.continue_on_error
1030                        && (options.max_errors == 0
1031                            || result.counts.error_count() >= options.max_errors as u64)
1032                    {
1033                        return Ok(result.aborted(format!("Parse error: {}", e)));
1034                    }
1035                }
1036            }
1037
1038            if batch.len() >= options.batch_size as usize {
1039                let batch_results = self
1040                    .process_entries(
1041                        tenant,
1042                        submission_id,
1043                        manifest_id,
1044                        std::mem::take(&mut batch),
1045                        options,
1046                    )
1047                    .await?;
1048
1049                for r in batch_results {
1050                    result.counts.increment(r.outcome);
1051                }
1052
1053                if !options.continue_on_error
1054                    && options.max_errors > 0
1055                    && result.counts.error_count() >= options.max_errors as u64
1056                {
1057                    return Ok(result.aborted("max errors exceeded"));
1058                }
1059            }
1060        }
1061
1062        // Process remaining entries
1063        if !batch.is_empty() {
1064            let batch_results = self
1065                .process_entries(tenant, submission_id, manifest_id, batch, options)
1066                .await?;
1067
1068            for r in batch_results {
1069                result.counts.increment(r.outcome);
1070            }
1071        }
1072
1073        Ok(result)
1074    }
1075}
1076
1077#[async_trait]
1078impl BulkSubmitRollbackProvider for PostgresBackend {
1079    async fn record_change(
1080        &self,
1081        tenant: &TenantContext,
1082        submission_id: &SubmissionId,
1083        change: &SubmissionChange,
1084    ) -> StorageResult<()> {
1085        let client = self.get_client().await?;
1086        let tenant_id = tenant.tenant_id().as_str();
1087
1088        let previous_content_json: Option<Value> = change.previous_content.clone();
1089
1090        client
1091            .execute(
1092                "INSERT INTO bulk_submission_changes
1093                 (tenant_id, submitter, submission_id, change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at)
1094                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1095                &[
1096                    &tenant_id,
1097                    &submission_id.submitter.as_str(),
1098                    &submission_id.submission_id.as_str(),
1099                    &change.change_id.as_str(),
1100                    &change.manifest_id.as_str(),
1101                    &change.change_type.to_string().as_str(),
1102                    &change.resource_type.as_str(),
1103                    &change.resource_id.as_str(),
1104                    &change.previous_version,
1105                    &change.new_version.as_str(),
1106                    &previous_content_json,
1107                    &change.changed_at,
1108                ],
1109            )
1110            .await
1111            .map_err(|e| internal_error(format!("Failed to record change: {}", e)))?;
1112
1113        Ok(())
1114    }
1115
1116    async fn list_changes(
1117        &self,
1118        tenant: &TenantContext,
1119        submission_id: &SubmissionId,
1120        limit: u32,
1121        offset: u32,
1122    ) -> StorageResult<Vec<SubmissionChange>> {
1123        let client = self.get_client().await?;
1124        let tenant_id = tenant.tenant_id().as_str();
1125
1126        let sql = format!(
1127            "SELECT change_id, manifest_id, change_type, resource_type, resource_id, previous_version, new_version, previous_content, changed_at
1128             FROM bulk_submission_changes
1129             WHERE tenant_id = $1 AND submitter = $2 AND submission_id = $3
1130             ORDER BY changed_at DESC
1131             LIMIT {} OFFSET {}",
1132            limit, offset
1133        );
1134
1135        let rows = client
1136            .query(
1137                &sql,
1138                &[
1139                    &tenant_id,
1140                    &submission_id.submitter.as_str(),
1141                    &submission_id.submission_id.as_str(),
1142                ],
1143            )
1144            .await
1145            .map_err(|e| internal_error(format!("Failed to query changes: {}", e)))?;
1146
1147        let changes: Vec<SubmissionChange> = rows
1148            .iter()
1149            .map(|row| {
1150                let change_id: String = row.get(0);
1151                let manifest_id: String = row.get(1);
1152                let change_type_str: String = row.get(2);
1153                let resource_type: String = row.get(3);
1154                let resource_id: String = row.get(4);
1155                let previous_version: Option<String> = row.get(5);
1156                let new_version: String = row.get(6);
1157                let previous_content: Option<Value> = row.get(7);
1158                let changed_at: chrono::DateTime<Utc> = row.get(8);
1159
1160                let change_type: ChangeType = change_type_str.parse().unwrap_or(ChangeType::Create);
1161
1162                SubmissionChange {
1163                    change_id,
1164                    manifest_id,
1165                    change_type,
1166                    resource_type,
1167                    resource_id,
1168                    previous_version,
1169                    new_version,
1170                    previous_content,
1171                    changed_at,
1172                }
1173            })
1174            .collect();
1175
1176        Ok(changes)
1177    }
1178
1179    async fn rollback_change(
1180        &self,
1181        tenant: &TenantContext,
1182        _submission_id: &SubmissionId,
1183        change: &SubmissionChange,
1184    ) -> StorageResult<bool> {
1185        match change.change_type {
1186            ChangeType::Create => {
1187                match self
1188                    .delete(tenant, &change.resource_type, &change.resource_id)
1189                    .await
1190                {
1191                    Ok(()) => Ok(true),
1192                    Err(StorageError::Resource(crate::error::ResourceError::NotFound {
1193                        ..
1194                    })) => Ok(true),
1195                    Err(e) => Err(e),
1196                }
1197            }
1198            ChangeType::Update => {
1199                if let Some(ref previous_content) = change.previous_content {
1200                    let current = self
1201                        .read(tenant, &change.resource_type, &change.resource_id)
1202                        .await?;
1203                    if let Some(current) = current {
1204                        self.update(tenant, &current, previous_content.clone())
1205                            .await?;
1206                        Ok(true)
1207                    } else {
1208                        Ok(false)
1209                    }
1210                } else {
1211                    Ok(false)
1212                }
1213            }
1214        }
1215    }
1216}