Skip to main content

helios_persistence/core/
bulk_submit_worker.rs

1//! Worker-facing traits for asynchronous Bulk Data **Submit** processing.
2//!
3//! The synchronous ingestion engine ([`BulkSubmitProvider`] and friends) does the
4//! actual create/update/delete work. This module adds the missing async layer that
5//! the FHIR Bulk Data Submit operation needs: a Data Consumer accepts a submission,
6//! then a background worker **claims a pending manifest under a heartbeated,
7//! fencing-token-guarded lease**, fetches the referenced files, and ingests them —
8//! mirroring the bulk-export worker design ([`crate::core::bulk_export_worker`]).
9//!
10//! [`BulkSubmitProvider`]: crate::core::bulk_submit::BulkSubmitProvider
11
12use std::sync::Arc;
13use std::time::Duration;
14
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use helios_fhir::FhirVersion;
18use serde_json::{Value, json};
19
20use crate::core::bulk_export_output::{ExportOutputStore, ExportPartKey};
21use crate::core::bulk_export_worker::{LeaseError, WorkerId};
22use crate::core::bulk_submit::{
23    BulkEntryOutcome, BulkProcessingOptions, BulkSubmitProvider, BulkSubmitRollbackProvider,
24    StreamingBulkSubmitProvider, SubmissionId,
25};
26use crate::core::bulk_submit_input::{SubmitInputFetcher, submission_output_job_id};
27use crate::error::StorageResult;
28use crate::tenant::TenantContext;
29
30/// A lease over a single pending manifest, held by exactly one worker at a time.
31///
32/// Leases expire; if the holding worker does not heartbeat before `lease_expiry`,
33/// the manifest is reclaimable. The `fencing_token` is bumped on every claim so a
34/// zombie worker cannot mutate a manifest another worker now owns.
35#[derive(Debug, Clone)]
36pub struct ManifestLease {
37    /// The tenant the submission belongs to.
38    pub tenant: TenantContext,
39    /// The submission this manifest belongs to.
40    pub submission_id: SubmissionId,
41    /// The leased manifest's ID (unique within the submission).
42    pub manifest_id: String,
43    /// The worker holding the lease.
44    pub worker_id: WorkerId,
45    /// When the lease expires if not renewed.
46    pub lease_expiry: DateTime<Utc>,
47    /// Monotonically increasing token, bumped on every claim.
48    pub fencing_token: u64,
49}
50
51/// The worker's view of a claimed manifest: everything needed to fetch + ingest it.
52#[derive(Debug, Clone)]
53pub struct ManifestWorkerView {
54    /// The manifest ID.
55    pub manifest_id: String,
56    /// The remote Bulk Export Manifest URL to fetch (None means nothing to do).
57    pub manifest_url: Option<String>,
58    /// Base URL for resolving relative references in ingested resources.
59    pub fhir_base_url: Option<String>,
60    /// The kickoff `outputFormat` (MIME), used to derive the FHIR version.
61    pub output_format: Option<String>,
62    /// HTTP headers the Data Provider asked us to include when fetching files.
63    pub file_request_headers: Vec<(String, String)>,
64    /// OAuth 2.0 metadata endpoints for acquiring file-retrieval tokens.
65    pub oauth_metadata_urls: Vec<String>,
66    /// JWE file-encryption key descriptor, if the provider encrypts files.
67    pub file_encryption_key: Option<Value>,
68    /// Resume cursor: lines already processed for this manifest.
69    pub last_processed_line: u64,
70    /// FHIR version this submission ingests against.
71    pub fhir_version: FhirVersion,
72}
73
74/// A finalized status-manifest artifact (output / error / deleted NDJSON part) to record.
75#[derive(Debug, Clone)]
76pub struct SubmitFileRecord {
77    /// The submitted manifest this artifact relates to (for `output[].manifestUrl`).
78    pub manifest_url: Option<String>,
79    /// `output`, `error`, or `deleted`.
80    pub file_type: String,
81    /// FHIR resource type (for `output` entries).
82    pub resource_type: Option<String>,
83    /// 0-based part index within (submission, file_type, resource_type).
84    pub part_index: u32,
85    /// Encoded storage key / path in the output store.
86    pub file_path: String,
87    /// Number of NDJSON lines in the artifact.
88    pub line_count: u64,
89    /// Size of the artifact in bytes.
90    pub byte_count: u64,
91    /// `countSeverity` breakdown JSON (for `error` artifacts).
92    pub count_severity: Option<Value>,
93}
94
95/// A persisted status-manifest artifact row, read back when building the status manifest.
96#[derive(Debug, Clone)]
97pub struct SubmitFileRow {
98    /// The submitted manifest this artifact relates to.
99    pub manifest_url: Option<String>,
100    /// `output`, `error`, or `deleted`.
101    pub file_type: String,
102    /// FHIR resource type (for `output` entries).
103    pub resource_type: Option<String>,
104    /// 0-based part index.
105    pub part_index: u32,
106    /// Fencing token of the worker that wrote it.
107    pub fencing_token: u64,
108    /// Encoded storage key / path in the output store.
109    pub file_path: String,
110    /// Number of NDJSON lines in the artifact.
111    pub line_count: u64,
112    /// Size of the artifact in bytes.
113    pub byte_count: u64,
114    /// `countSeverity` breakdown JSON (for `error` artifacts).
115    pub count_severity: Option<Value>,
116}
117
118/// Resolution of a poll token to its owning submission, for REST status/cancel/file auth.
119#[derive(Debug, Clone)]
120pub struct PollTokenTarget {
121    /// The tenant the submission belongs to.
122    pub tenant: TenantContext,
123    /// The submission identified by the token.
124    pub submission_id: SubmissionId,
125    /// The OAuth subject that kicked off the submission (for ownership checks).
126    pub owner_subject: Option<String>,
127}
128
129/// Strategy for atomically claiming the next available pending manifest.
130///
131/// Each backend reaches for its native primitive — `SELECT … FOR UPDATE SKIP LOCKED`
132/// on Postgres, a process-local mutex on SQLite.
133#[async_trait]
134pub trait SubmitClaimStrategy: Send + Sync {
135    /// Atomically transitions one eligible manifest (`pending`, or `processing` with
136    /// an expired lease) to held-by-this-worker, bumping the fencing token. Returns
137    /// `Ok(None)` when no manifest is available.
138    async fn claim_next_manifest(
139        &self,
140        worker_id: &WorkerId,
141        lease_duration: Duration,
142    ) -> StorageResult<Option<ManifestLease>>;
143
144    /// Renews a lease the worker still holds; returns the new expiry, or
145    /// `LeaseError::LeaseLost` if the manifest was reclaimed.
146    async fn heartbeat(&self, lease: &ManifestLease) -> Result<DateTime<Utc>, LeaseError>;
147
148    /// Releases a lease early (graceful shutdown). Best-effort.
149    async fn release(&self, lease: ManifestLease) -> StorageResult<()>;
150}
151
152/// Worker-owned mutations of manifest/submission state.
153///
154/// The fenced methods (those taking a [`ManifestLease`]) verify `worker_id` +
155/// `fencing_token`: a guarded mutation affecting zero rows returns
156/// `LeaseError::LeaseLost`, so a zombie worker cannot corrupt progress, file rows,
157/// or terminal status after its manifest has been reclaimed.
158#[async_trait]
159pub trait SubmitWorkerStorage: Send + Sync {
160    /// Loads the claimed manifest's fetch parameters and resume cursor. Fenced.
161    async fn get_manifest_for_worker(
162        &self,
163        lease: &ManifestLease,
164    ) -> Result<ManifestWorkerView, LeaseError>;
165
166    /// Marks the manifest `processing`. Fenced.
167    async fn mark_manifest_processing(&self, lease: &ManifestLease) -> Result<(), LeaseError>;
168
169    /// Idempotent update of per-manifest progress (counts + resume cursor). Fenced.
170    async fn update_manifest_progress(
171        &self,
172        lease: &ManifestLease,
173        processed_entries: u64,
174        failed_entries: u64,
175        last_processed_line: u64,
176    ) -> Result<(), LeaseError>;
177
178    /// Idempotent upsert of a finalized status-manifest artifact row. Fenced.
179    async fn record_submit_file(
180        &self,
181        lease: &ManifestLease,
182        file: &SubmitFileRecord,
183    ) -> Result<(), LeaseError>;
184
185    /// Marks the manifest `completed`. Fenced.
186    async fn finish_manifest(&self, lease: &ManifestLease) -> Result<(), LeaseError>;
187
188    /// Marks the manifest `failed` with a message. Fenced.
189    async fn fail_manifest(
190        &self,
191        lease: &ManifestLease,
192        error_message: &str,
193    ) -> Result<(), LeaseError>;
194
195    // ---- REST-facing (unfenced) submission/poll-token/artifact lifecycle ----
196
197    /// Persists the remote-fetch parameters for a manifest that the kickoff handler
198    /// added via [`crate::core::bulk_submit::BulkSubmitProvider::add_manifest`].
199    ///
200    /// `file_request_headers` / `oauth_metadata_urls` are stored as JSON arrays;
201    /// `file_encryption_key` as a JSON object.
202    #[allow(clippy::too_many_arguments)]
203    async fn set_manifest_fetch_params(
204        &self,
205        tenant: &TenantContext,
206        id: &SubmissionId,
207        manifest_id: &str,
208        fhir_base_url: Option<&str>,
209        output_format: Option<&str>,
210        file_request_headers: &[(String, String)],
211        oauth_metadata_urls: &[String],
212        file_encryption_key: Option<&Value>,
213    ) -> StorageResult<()>;
214
215    /// Marks a previously-submitted manifest (identified by its submitted
216    /// `manifest_url`) as `replaced`, for `replacesManifestUrl` handling. Returns the
217    /// `manifest_id`s that were superseded (so the caller can roll back their changes).
218    async fn replace_manifest_by_url(
219        &self,
220        tenant: &TenantContext,
221        id: &SubmissionId,
222        manifest_url: &str,
223    ) -> StorageResult<Vec<String>>;
224
225    /// Persists kickoff metadata (owner subject, request URL, access-token posture).
226    async fn set_submission_kickoff_meta(
227        &self,
228        tenant: &TenantContext,
229        id: &SubmissionId,
230        owner_subject: Option<&str>,
231        request_url: &str,
232        requires_access_token: bool,
233    ) -> StorageResult<()>;
234
235    /// Idempotently mints + stores a poll token on the submission (UNIQUE index
236    /// guards collisions). Returns the existing token if one is already set.
237    async fn ensure_poll_token(
238        &self,
239        tenant: &TenantContext,
240        id: &SubmissionId,
241    ) -> StorageResult<String>;
242
243    /// Resolves a poll token to its submission, or `None` after deletion/unknown.
244    async fn resolve_poll_token(&self, token: &str) -> StorageResult<Option<PollTokenTarget>>;
245
246    /// Clears the poll token so subsequent [`Self::resolve_poll_token`] returns `None`.
247    async fn clear_poll_token(
248        &self,
249        tenant: &TenantContext,
250        id: &SubmissionId,
251    ) -> StorageResult<()>;
252
253    /// Lists the recorded status-manifest artifact rows for a submission.
254    async fn list_submit_files(
255        &self,
256        tenant: &TenantContext,
257        id: &SubmissionId,
258    ) -> StorageResult<Vec<SubmitFileRow>>;
259
260    /// Deletes the `bulk_submit_files` rows for a submission (idempotent).
261    ///
262    /// **Note:** the persistence layer holds no reference to the output store, so
263    /// removing the underlying NDJSON objects is orchestrated by the caller (REST
264    /// DELETE handler / TTL cleanup task), which lists files via
265    /// [`Self::list_submit_files`], deletes them from the output store, then calls
266    /// this to drop the rows.
267    async fn delete_submission_artifacts(
268        &self,
269        tenant: &TenantContext,
270        id: &SubmissionId,
271    ) -> StorageResult<()>;
272
273    /// Counts non-terminal submissions for a tenant (per-tenant concurrency cap).
274    async fn count_active_submissions(&self, tenant: &TenantContext) -> StorageResult<u64>;
275
276    /// Lists submissions whose `updated_at` is older than `now - ttl`, across all
277    /// tenants, for the periodic cleanup task. Returns `(tenant, submission_id)`
278    /// pairs (bounded by `limit`) so the caller can delete their output-store
279    /// artifacts and rows.
280    async fn list_expired_submissions(
281        &self,
282        now: DateTime<Utc>,
283        ttl: Duration,
284        limit: u32,
285    ) -> StorageResult<Vec<(TenantContext, SubmissionId)>>;
286
287    /// Records a transaction time on the submission when its status manifest is first
288    /// finalized (idempotent — only sets if currently unset).
289    async fn ensure_transaction_time(
290        &self,
291        tenant: &TenantContext,
292        id: &SubmissionId,
293    ) -> StorageResult<DateTime<Utc>>;
294}
295
296/// Marker trait composing the submit job-state surfaces a worker + REST layer needs.
297///
298/// Only the SQLite and Postgres backends implement this; it is held as an
299/// `Arc<dyn BulkSubmitJobStore>` and selected at bootstrap by `HFS_BULK_SUBMIT_BACKEND`.
300pub trait BulkSubmitJobStore:
301    BulkSubmitProvider
302    + StreamingBulkSubmitProvider
303    + BulkSubmitRollbackProvider
304    + SubmitWorkerStorage
305    + SubmitClaimStrategy
306{
307}
308
309impl<T> BulkSubmitJobStore for T where
310    T: BulkSubmitProvider
311        + StreamingBulkSubmitProvider
312        + BulkSubmitRollbackProvider
313        + SubmitWorkerStorage
314        + SubmitClaimStrategy
315{
316}
317
318/// The default in-process submit worker.
319///
320/// Binds a [`BulkSubmitJobStore`] (job state + claim + worker storage + ingestion
321/// engine), a [`SubmitInputFetcher`] (remote manifest + NDJSON fetch), and an
322/// [`ExportOutputStore`] (where status-manifest artifacts go), and drives a claimed
323/// manifest to completion: fetch → ingest each `output` file via the existing
324/// `process_ndjson_stream` engine → emit `output`/`error` artifacts → finish.
325pub struct DefaultSubmitWorker<Js: ?Sized, Fetcher: ?Sized, Os: ?Sized> {
326    jobs: Arc<Js>,
327    fetcher: Arc<Fetcher>,
328    output: Arc<Os>,
329    #[allow(dead_code)]
330    worker_id: WorkerId,
331}
332
333impl<Js, Fetcher, Os> DefaultSubmitWorker<Js, Fetcher, Os>
334where
335    Js: BulkSubmitJobStore + ?Sized,
336    Fetcher: SubmitInputFetcher + ?Sized,
337    Os: ExportOutputStore + ?Sized,
338{
339    /// Creates a new worker bound to the given job store, fetcher, and output store.
340    pub fn new(jobs: Arc<Js>, fetcher: Arc<Fetcher>, output: Arc<Os>, worker_id: WorkerId) -> Self {
341        Self {
342            jobs,
343            fetcher,
344            output,
345            worker_id,
346        }
347    }
348
349    /// Drives a single claimed manifest to a terminal state.
350    ///
351    /// Returns `Ok(())` for both successful ingestion and *recorded* manifest-level
352    /// failures (a bad remote manifest fails only that manifest, not the worker).
353    /// Only `LeaseError::LeaseLost` aborts early; storage errors propagate.
354    pub async fn run_job(&self, lease: ManifestLease) -> StorageResult<()> {
355        let view = match self.jobs.get_manifest_for_worker(&lease).await {
356            Ok(v) => v,
357            Err(LeaseError::LeaseLost { .. }) => return Ok(()),
358            Err(LeaseError::Storage(e)) => return Err(e),
359        };
360        if let Err(LeaseError::Storage(e)) = self.jobs.mark_manifest_processing(&lease).await {
361            return Err(e);
362        }
363
364        let Some(manifest_url) = view.manifest_url.clone() else {
365            // Nothing to fetch (status-only submission) — treat as done.
366            let _ = self.jobs.finish_manifest(&lease).await;
367            return Ok(());
368        };
369
370        // 1. Fetch the remote Bulk Export Manifest.
371        let manifest = match self
372            .fetcher
373            .fetch_manifest(
374                &manifest_url,
375                &view.file_request_headers,
376                &view.oauth_metadata_urls,
377            )
378            .await
379        {
380            Ok(m) => m,
381            Err(e) => {
382                self.record_manifest_error(
383                    &lease,
384                    &manifest_url,
385                    &format!("failed to fetch manifest: {e}"),
386                )
387                .await?;
388                let _ = self.jobs.fail_manifest(&lease, &e.to_string()).await;
389                return Ok(());
390            }
391        };
392
393        let opts = BulkProcessingOptions::new();
394        let mut processed: u64 = 0;
395        let mut failed: u64 = 0;
396
397        // 2. Ingest each output file via the existing streaming engine.
398        for file in &manifest.output {
399            if let Err(LeaseError::LeaseLost { .. }) = self.jobs.heartbeat(&lease).await {
400                return Ok(());
401            }
402            let resource_type = file
403                .resource_type
404                .clone()
405                .unwrap_or_else(|| "Resource".into());
406            let stream = match self
407                .fetcher
408                .open_file_stream(
409                    &file.url,
410                    &view.file_request_headers,
411                    manifest.requires_access_token,
412                    &view.oauth_metadata_urls,
413                    view.file_encryption_key.as_ref(),
414                )
415                .await
416            {
417                Ok(s) => s,
418                Err(e) => {
419                    self.record_manifest_error(
420                        &lease,
421                        &manifest_url,
422                        &format!("failed to fetch file {}: {e}", file.url),
423                    )
424                    .await?;
425                    failed += 1;
426                    continue;
427                }
428            };
429
430            match self
431                .jobs
432                .process_ndjson_stream(
433                    &lease.tenant,
434                    &lease.submission_id,
435                    &lease.manifest_id,
436                    &resource_type,
437                    stream,
438                    &opts,
439                )
440                .await
441            {
442                Ok(result) => {
443                    processed += result.counts.success + result.counts.skipped;
444                    failed += result.counts.error_count();
445                }
446                Err(e) => {
447                    self.record_manifest_error(
448                        &lease,
449                        &manifest_url,
450                        &format!("failed to ingest file {}: {e}", file.url),
451                    )
452                    .await?;
453                    failed += 1;
454                }
455            }
456
457            if let Err(LeaseError::Storage(e)) = self
458                .jobs
459                .update_manifest_progress(&lease, processed, failed, processed + failed)
460                .await
461            {
462                return Err(e);
463            }
464        }
465
466        // 2b. Process `deleted` files — transaction Bundles / resource refs to remove.
467        let mut deleted_refs: Vec<String> = Vec::new();
468        for file in &manifest.deleted {
469            if let Err(LeaseError::LeaseLost { .. }) = self.jobs.heartbeat(&lease).await {
470                return Ok(());
471            }
472            match self
473                .fetcher
474                .open_file_stream(
475                    &file.url,
476                    &view.file_request_headers,
477                    manifest.requires_access_token,
478                    &view.oauth_metadata_urls,
479                    view.file_encryption_key.as_ref(),
480                )
481                .await
482            {
483                Ok(reader) => {
484                    self.process_deleted_stream(&lease, reader, &mut deleted_refs)
485                        .await;
486                }
487                Err(e) => {
488                    self.record_manifest_error(
489                        &lease,
490                        &manifest_url,
491                        &format!("failed to fetch deleted file {}: {e}", file.url),
492                    )
493                    .await?;
494                }
495            }
496        }
497        if !deleted_refs.is_empty() {
498            self.write_deleted_artifact(&lease, &manifest_url, &deleted_refs)
499                .await?;
500        }
501
502        // 3. Emit per-type `output` receipts and an aggregated `error` artifact.
503        self.write_result_artifacts(&lease, &manifest_url, view.fhir_version, failed)
504            .await?;
505
506        // 4. Mark the manifest complete.
507        if let Err(LeaseError::Storage(e)) = self.jobs.finish_manifest(&lease).await {
508            return Err(e);
509        }
510        Ok(())
511    }
512
513    /// Reads back this manifest's entry results and writes `output` receipts
514    /// (grouped by resource type) plus a single aggregated `error` artifact.
515    async fn write_result_artifacts(
516        &self,
517        lease: &ManifestLease,
518        manifest_url: &str,
519        _fhir_version: FhirVersion,
520        failed_count: u64,
521    ) -> StorageResult<()> {
522        let job_id = submission_output_job_id(&lease.submission_id);
523        let tenant_id = lease.tenant.tenant_id().as_str().to_string();
524
525        // Page through all entry results for this manifest.
526        let mut all = Vec::new();
527        let limit = 1000u32;
528        let mut offset = 0u32;
529        loop {
530            let batch = self
531                .jobs
532                .get_entry_results(
533                    &lease.tenant,
534                    &lease.submission_id,
535                    &lease.manifest_id,
536                    None,
537                    limit,
538                    offset,
539                )
540                .await?;
541            let n = batch.len() as u32;
542            all.extend(batch);
543            if n < limit {
544                break;
545            }
546            offset += limit;
547        }
548
549        // Partition successes (by type) and errors.
550        let mut by_type: std::collections::BTreeMap<String, Vec<String>> =
551            std::collections::BTreeMap::new();
552        let mut error_lines: Vec<String> = Vec::new();
553        let mut severity: std::collections::BTreeMap<String, u64> =
554            std::collections::BTreeMap::new();
555
556        for entry in &all {
557            match entry.outcome {
558                BulkEntryOutcome::Success => {
559                    if let Some(id) = &entry.resource_id {
560                        let line = json!({"reference": format!("{}/{}", entry.resource_type, id)})
561                            .to_string();
562                        by_type
563                            .entry(entry.resource_type.clone())
564                            .or_default()
565                            .push(line);
566                    }
567                }
568                BulkEntryOutcome::ValidationError | BulkEntryOutcome::ProcessingError => {
569                    let oo = entry
570                        .operation_outcome
571                        .clone()
572                        .unwrap_or_else(|| default_error_outcome(entry));
573                    tally_severity(&oo, &mut severity);
574                    error_lines.push(oo.to_string());
575                }
576                BulkEntryOutcome::Skipped => {}
577            }
578        }
579
580        // The streaming engine counts parse / wrong-type failures but does not
581        // persist them as per-line entry results. Surface any such uncaptured
582        // failures as a summary OperationOutcome so the status manifest's `error`
583        // array reflects them (partial success).
584        let recorded_errors = error_lines.len() as u64;
585        if failed_count > recorded_errors {
586            let uncaptured = failed_count - recorded_errors;
587            let oo = json!({
588                "resourceType": "OperationOutcome",
589                "issue": [{
590                    "severity": "error",
591                    "code": "processing",
592                    "diagnostics": format!(
593                        "{uncaptured} submitted resource(s) could not be parsed or did not \
594                         match the declared resource type"
595                    )
596                }]
597            });
598            tally_severity(&oo, &mut severity);
599            error_lines.push(oo.to_string());
600        }
601
602        // Write one `output` part per resource type.
603        for (idx, (resource_type, lines)) in by_type.iter().enumerate() {
604            let key = ExportPartKey::output(
605                tenant_id.clone(),
606                job_id.clone(),
607                resource_type.clone(),
608                idx as u32,
609                lease.fencing_token,
610            );
611            let part = self.write_part(&key, lines).await?;
612            self.jobs
613                .record_submit_file(
614                    lease,
615                    &SubmitFileRecord {
616                        manifest_url: Some(manifest_url.to_string()),
617                        file_type: "output".to_string(),
618                        resource_type: Some(resource_type.clone()),
619                        part_index: idx as u32,
620                        file_path: key.part_segment(),
621                        line_count: part.0,
622                        byte_count: part.1,
623                        count_severity: None,
624                    },
625                )
626                .await
627                .map_err(lease_err_to_storage)?;
628        }
629
630        // Write a single aggregated `error` part (if any).
631        if !error_lines.is_empty() {
632            let key = ExportPartKey::error(
633                tenant_id.clone(),
634                job_id.clone(),
635                "OperationOutcome",
636                0,
637                lease.fencing_token,
638            );
639            let part = self.write_part(&key, &error_lines).await?;
640            let count_severity = Value::Object(
641                severity
642                    .into_iter()
643                    .map(|(k, v)| (k, Value::from(v)))
644                    .collect(),
645            );
646            self.jobs
647                .record_submit_file(
648                    lease,
649                    &SubmitFileRecord {
650                        manifest_url: Some(manifest_url.to_string()),
651                        file_type: "error".to_string(),
652                        resource_type: Some("OperationOutcome".to_string()),
653                        part_index: 0,
654                        file_path: key.part_segment(),
655                        line_count: part.0,
656                        byte_count: part.1,
657                        count_severity: Some(count_severity),
658                    },
659                )
660                .await
661                .map_err(lease_err_to_storage)?;
662        }
663        Ok(())
664    }
665
666    /// Applies deletions from a `deleted` NDJSON stream (transaction Bundles or
667    /// bare resources), collecting `Type/id` references actually removed.
668    async fn process_deleted_stream(
669        &self,
670        lease: &ManifestLease,
671        reader: Box<dyn tokio::io::AsyncBufRead + Send + Unpin>,
672        refs: &mut Vec<String>,
673    ) {
674        use tokio::io::AsyncBufReadExt;
675        let mut lines = reader.lines();
676        while let Ok(Some(line)) = lines.next_line().await {
677            let line = line.trim();
678            if line.is_empty() {
679                continue;
680            }
681            let Ok(val) = serde_json::from_str::<Value>(line) else {
682                continue;
683            };
684            if val.get("resourceType").and_then(|v| v.as_str()) == Some("Bundle") {
685                if let Some(entries) = val.get("entry").and_then(|e| e.as_array()) {
686                    for e in entries {
687                        if let Some(url) = e
688                            .get("request")
689                            .and_then(|r| r.get("url"))
690                            .and_then(|u| u.as_str())
691                        {
692                            if let Some((ty, id)) = url.split_once('/') {
693                                if self.jobs.delete(&lease.tenant, ty, id).await.is_ok() {
694                                    refs.push(format!("{ty}/{id}"));
695                                }
696                            }
697                        }
698                    }
699                }
700            } else if let (Some(ty), Some(id)) = (
701                val.get("resourceType").and_then(|v| v.as_str()),
702                val.get("id").and_then(|v| v.as_str()),
703            ) {
704                if self.jobs.delete(&lease.tenant, ty, id).await.is_ok() {
705                    refs.push(format!("{ty}/{id}"));
706                }
707            }
708        }
709    }
710
711    /// Writes the `deleted` receipt artifact listing removed resource references.
712    async fn write_deleted_artifact(
713        &self,
714        lease: &ManifestLease,
715        manifest_url: &str,
716        refs: &[String],
717    ) -> StorageResult<()> {
718        let job_id = submission_output_job_id(&lease.submission_id);
719        let tenant_id = lease.tenant.tenant_id().as_str().to_string();
720        let lines: Vec<String> = refs
721            .iter()
722            .map(|r| json!({ "reference": r }).to_string())
723            .collect();
724        let key = ExportPartKey {
725            tenant_id,
726            job_id,
727            resource_type: "Bundle".to_string(),
728            file_type: "deleted".to_string(),
729            part_index: 0,
730            fencing_token: lease.fencing_token,
731        };
732        let (line_count, byte_count) = self.write_part(&key, &lines).await?;
733        self.jobs
734            .record_submit_file(
735                lease,
736                &SubmitFileRecord {
737                    manifest_url: Some(manifest_url.to_string()),
738                    file_type: "deleted".to_string(),
739                    resource_type: Some("Bundle".to_string()),
740                    part_index: 0,
741                    file_path: key.part_segment(),
742                    line_count,
743                    byte_count,
744                    count_severity: None,
745                },
746            )
747            .await
748            .map_err(lease_err_to_storage)?;
749        Ok(())
750    }
751
752    /// Records a single manifest-level `error` OperationOutcome artifact.
753    async fn record_manifest_error(
754        &self,
755        lease: &ManifestLease,
756        manifest_url: &str,
757        message: &str,
758    ) -> StorageResult<()> {
759        let job_id = submission_output_job_id(&lease.submission_id);
760        let tenant_id = lease.tenant.tenant_id().as_str().to_string();
761        let oo = json!({
762            "resourceType": "OperationOutcome",
763            "issue": [{
764                "severity": "error",
765                "code": "processing",
766                "diagnostics": message
767            }]
768        })
769        .to_string();
770        // Use a high part index space for manifest-level errors to avoid clashing
771        // with per-result error parts (which use index 0).
772        let key = ExportPartKey::error(
773            tenant_id,
774            job_id,
775            "OperationOutcome",
776            1_000_000 + (manifest_url.len() as u32 % 1000),
777            lease.fencing_token,
778        );
779        let part = self.write_part(&key, std::slice::from_ref(&oo)).await?;
780        self.jobs
781            .record_submit_file(
782                lease,
783                &SubmitFileRecord {
784                    manifest_url: Some(manifest_url.to_string()),
785                    file_type: "error".to_string(),
786                    resource_type: Some("OperationOutcome".to_string()),
787                    part_index: key.part_index,
788                    file_path: key.part_segment(),
789                    line_count: part.0,
790                    byte_count: part.1,
791                    count_severity: Some(json!({"error": 1})),
792                },
793            )
794            .await
795            .map_err(lease_err_to_storage)?;
796        Ok(())
797    }
798
799    /// Writes NDJSON `lines` to a new output-store part, returning `(line_count, byte_count)`.
800    async fn write_part(&self, key: &ExportPartKey, lines: &[String]) -> StorageResult<(u64, u64)> {
801        let mut writer = self.output.open_writer(key).await?;
802        for line in lines {
803            writer.write_line(line).await.map_err(|e| {
804                crate::error::StorageError::Backend(crate::error::BackendError::Internal {
805                    backend_name: "bulk-submit-output".to_string(),
806                    message: format!("write artifact: {e}"),
807                    source: None,
808                })
809            })?;
810        }
811        let finalized = self.output.finalize_part(key, writer).await?;
812        Ok((finalized.line_count, finalized.size_bytes))
813    }
814}
815
816/// Converts a fenced [`LeaseError`] into a plain storage error for non-fatal paths.
817fn lease_err_to_storage(e: LeaseError) -> crate::error::StorageError {
818    match e {
819        LeaseError::Storage(s) => s,
820        LeaseError::LeaseLost { job_id } => {
821            crate::error::StorageError::Backend(crate::error::BackendError::Internal {
822                backend_name: "bulk-submit".to_string(),
823                message: format!("lease lost for {job_id}"),
824                source: None,
825            })
826        }
827    }
828}
829
830/// Builds a fallback OperationOutcome when an entry result lacks one.
831fn default_error_outcome(entry: &crate::core::bulk_submit::BulkEntryResult) -> Value {
832    json!({
833        "resourceType": "OperationOutcome",
834        "issue": [{
835            "severity": "error",
836            "code": "processing",
837            "diagnostics": format!(
838                "{} error on {} line {}",
839                entry.outcome, entry.resource_type, entry.line_number
840            )
841        }]
842    })
843}
844
845/// Tallies issue severities from an OperationOutcome into `acc` (for `countSeverity`).
846fn tally_severity(oo: &Value, acc: &mut std::collections::BTreeMap<String, u64>) {
847    if let Some(issues) = oo.get("issue").and_then(|v| v.as_array()) {
848        for issue in issues {
849            let sev = issue
850                .get("severity")
851                .and_then(|v| v.as_str())
852                .unwrap_or("error");
853            *acc.entry(sev.to_string()).or_insert(0) += 1;
854        }
855    } else {
856        *acc.entry("error".to_string()).or_insert(0) += 1;
857    }
858}
859
#[cfg(test)]
mod tests {
    // End-to-end worker tests. Each runs against an in-memory SQLite backend and a
    // temp-dir `LocalFsOutputStore`, with the remote provider replaced by
    // in-memory `SubmitInputFetcher` stubs.
    use super::*;
    use crate::backends::local_fs::LocalFsOutputStore;
    use crate::backends::sqlite::SqliteBackend;
    use crate::core::bulk_submit::BulkSubmitProvider;
    use crate::core::bulk_submit_input::{RemoteFile, RemoteManifest};
    use crate::tenant::{TenantContext, TenantId, TenantPermissions};
    use std::time::Duration as StdDuration;

    /// A fetcher that returns a fixed manifest and serves NDJSON from memory.
    ///
    /// The URL, header, OAuth, and encryption-key arguments are ignored. A file URL
    /// not present in `files` yields an empty stream rather than an error.
    struct MockFetcher {
        /// NDJSON bodies keyed by file URL.
        files: std::collections::HashMap<String, Vec<u8>>,
        /// Returned (cloned) from every `fetch_manifest` call.
        manifest: RemoteManifest,
    }

    #[async_trait]
    impl SubmitInputFetcher for MockFetcher {
        async fn fetch_manifest(
            &self,
            _url: &str,
            _headers: &[(String, String)],
            _oauth: &[String],
        ) -> StorageResult<RemoteManifest> {
            Ok(self.manifest.clone())
        }

        async fn open_file_stream(
            &self,
            url: &str,
            _headers: &[(String, String)],
            _requires_access_token: bool,
            _oauth: &[String],
            _encryption_key: Option<&Value>,
        ) -> StorageResult<Box<dyn tokio::io::AsyncBufRead + Send + Unpin>> {
            // Unknown URLs fall back to an empty body.
            let data = self.files.get(url).cloned().unwrap_or_default();
            Ok(Box::new(tokio::io::BufReader::new(std::io::Cursor::new(
                data,
            ))))
        }
    }

    /// Tenant `t1` with full-access permissions, used by every test.
    fn tenant() -> TenantContext {
        TenantContext::new(TenantId::new("t1"), TenantPermissions::full_access())
    }

    /// Happy path: a manifest listing one Patient NDJSON file is ingested end to end.
    ///
    /// Verifies the success count, the manifest's `Completed` status, and that an
    /// `output` artifact for `Patient` was recorded.
    #[tokio::test]
    async fn test_worker_ingests_output_and_records_artifacts() {
        let backend = Arc::new(SqliteBackend::in_memory().unwrap());
        backend.init_schema().unwrap();
        let tmp = tempfile::tempdir().unwrap();
        let output = Arc::new(LocalFsOutputStore::new(
            tmp.path().to_path_buf(),
            "http://localhost:8080",
        ));

        // One submission with a single pending manifest for the worker to claim.
        let tenant = tenant();
        let sub_id = SubmissionId::generate("mock-system");
        backend
            .create_submission(&tenant, &sub_id, None)
            .await
            .unwrap();
        backend
            .add_manifest(
                &tenant,
                &sub_id,
                Some("http://provider/manifest.json"),
                None,
            )
            .await
            .unwrap();

        // Two Patients: the first carries an explicit id, the second has none.
        let ndjson = concat!(
            "{\"resourceType\":\"Patient\",\"id\":\"p1\",\"name\":[{\"family\":\"A\"}]}\n",
            "{\"resourceType\":\"Patient\",\"name\":[{\"family\":\"B\"}]}\n"
        );
        let mut files = std::collections::HashMap::new();
        files.insert(
            "http://provider/patient.ndjson".to_string(),
            ndjson.as_bytes().to_vec(),
        );
        let fetcher = Arc::new(MockFetcher {
            files,
            manifest: RemoteManifest {
                requires_access_token: false,
                output: vec![RemoteFile {
                    resource_type: Some("Patient".to_string()),
                    url: "http://provider/patient.ndjson".to_string(),
                    count: Some(2),
                }],
                deleted: vec![],
            },
        });

        let worker = DefaultSubmitWorker::new(
            backend.clone(),
            fetcher,
            output,
            WorkerId::new("test-worker"),
        );

        // Claim the lease directly from the backend, then hand it to the worker.
        let lease = backend
            .claim_next_manifest(&WorkerId::new("test-worker"), StdDuration::from_secs(60))
            .await
            .unwrap()
            .expect("claimable manifest");
        worker.run_job(lease).await.unwrap();

        // Re-read manifest state now that the job has run.
        let manifests = backend.list_manifests(&tenant, &sub_id).await.unwrap();
        let manifest_id = manifests[0].manifest_id.clone();

        // Both Patients ingested (one with id, one assigned a new id).
        let counts = backend
            .get_entry_counts(&tenant, &sub_id, &manifest_id)
            .await
            .unwrap();
        assert_eq!(counts.success, 2);

        // Manifest marked completed.
        assert_eq!(
            manifests[0].status,
            crate::core::bulk_submit::ManifestStatus::Completed
        );

        // An `output` artifact for Patient was recorded.
        let files = backend.list_submit_files(&tenant, &sub_id).await.unwrap();
        assert!(
            files
                .iter()
                .any(|f| f.file_type == "output" && f.resource_type.as_deref() == Some("Patient"))
        );
    }

    /// A fetcher whose `fetch_manifest` always fails (unreachable / bad manifest).
    ///
    /// `open_file_stream` is a stub that returns an empty stream.
    struct FailingManifestFetcher;
    #[async_trait]
    impl SubmitInputFetcher for FailingManifestFetcher {
        async fn fetch_manifest(
            &self,
            _url: &str,
            _h: &[(String, String)],
            _o: &[String],
        ) -> StorageResult<RemoteManifest> {
            Err(crate::error::StorageError::Backend(
                crate::error::BackendError::Internal {
                    backend_name: "test".into(),
                    message: "unreachable manifest".into(),
                    source: None,
                },
            ))
        }
        async fn open_file_stream(
            &self,
            _url: &str,
            _h: &[(String, String)],
            _r: bool,
            _o: &[String],
            _k: Option<&Value>,
        ) -> StorageResult<Box<dyn tokio::io::AsyncBufRead + Send + Unpin>> {
            Ok(Box::new(tokio::io::BufReader::new(std::io::Cursor::new(
                Vec::new(),
            ))))
        }
    }

    /// Creates a fresh submission with one manifest at
    /// `http://provider/manifest.json` and returns the submission's ID.
    async fn seed(backend: &Arc<SqliteBackend>, tenant: &TenantContext) -> SubmissionId {
        let sub_id = SubmissionId::generate("mock-system");
        backend
            .create_submission(tenant, &sub_id, None)
            .await
            .unwrap();
        backend
            .add_manifest(tenant, &sub_id, Some("http://provider/manifest.json"), None)
            .await
            .unwrap();
        sub_id
    }

    /// A manifest that cannot be fetched should leave that manifest `Failed` with a
    /// manifest-level `error` artifact, while `run_job` itself still returns `Ok`.
    #[tokio::test]
    async fn test_worker_fails_manifest_on_fetch_error() {
        let backend = Arc::new(SqliteBackend::in_memory().unwrap());
        backend.init_schema().unwrap();
        let tmp = tempfile::tempdir().unwrap();
        let output = Arc::new(LocalFsOutputStore::new(
            tmp.path().to_path_buf(),
            "http://x",
        ));
        let tenant = tenant();
        let sub_id = seed(&backend, &tenant).await;

        let worker = DefaultSubmitWorker::new(
            backend.clone(),
            Arc::new(FailingManifestFetcher),
            output,
            WorkerId::new("w"),
        );
        let lease = backend
            .claim_next_manifest(&WorkerId::new("w"), StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        // A bad manifest fails only that manifest (worker returns Ok).
        worker.run_job(lease).await.unwrap();

        let manifests = backend.list_manifests(&tenant, &sub_id).await.unwrap();
        assert_eq!(
            manifests[0].status,
            crate::core::bulk_submit::ManifestStatus::Failed
        );
        // A manifest-level error artifact was recorded.
        let files = backend.list_submit_files(&tenant, &sub_id).await.unwrap();
        assert!(files.iter().any(|f| f.file_type == "error"));
    }

    /// One valid line and one malformed NDJSON line.
    ///
    /// The valid resource should be ingested and the bad line counted as a failure.
    /// The manifest should still complete, with both `error` and `output` artifacts
    /// recorded.
    #[tokio::test]
    async fn test_worker_partial_success_on_invalid_ndjson() {
        let backend = Arc::new(SqliteBackend::in_memory().unwrap());
        backend.init_schema().unwrap();
        let tmp = tempfile::tempdir().unwrap();
        let output = Arc::new(LocalFsOutputStore::new(
            tmp.path().to_path_buf(),
            "http://x",
        ));
        let tenant = tenant();
        let sub_id = seed(&backend, &tenant).await;

        // One valid Patient, one malformed JSON line.
        let ndjson = "{\"resourceType\":\"Patient\",\"id\":\"ok\"}\nnot-json\n";
        let mut files = std::collections::HashMap::new();
        files.insert(
            "http://provider/p.ndjson".to_string(),
            ndjson.as_bytes().to_vec(),
        );
        let fetcher = Arc::new(MockFetcher {
            files,
            manifest: RemoteManifest {
                requires_access_token: false,
                output: vec![RemoteFile {
                    resource_type: Some("Patient".to_string()),
                    url: "http://provider/p.ndjson".to_string(),
                    count: Some(2),
                }],
                deleted: vec![],
            },
        });
        let worker = DefaultSubmitWorker::new(backend.clone(), fetcher, output, WorkerId::new("w"));
        let lease = backend
            .claim_next_manifest(&WorkerId::new("w"), StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        worker.run_job(lease).await.unwrap();

        let manifests = backend.list_manifests(&tenant, &sub_id).await.unwrap();
        let counts = backend
            .get_entry_counts(&tenant, &sub_id, &manifests[0].manifest_id)
            .await
            .unwrap();
        // Partial success: one ingested; the malformed line is counted as a
        // failure on the manifest and surfaced as a summary error artifact, and
        // the manifest still completes.
        assert_eq!(counts.success, 1);
        assert!(manifests[0].failed_entries >= 1);
        assert_eq!(
            manifests[0].status,
            crate::core::bulk_submit::ManifestStatus::Completed
        );
        let files = backend.list_submit_files(&tenant, &sub_id).await.unwrap();
        assert!(files.iter().any(|f| f.file_type == "error"));
        assert!(files.iter().any(|f| f.file_type == "output"));
    }
}