Skip to main content

helios_persistence/core/
bulk_export.rs

1//! Bulk export types and traits.
2//!
3//! This module provides types and traits for implementing FHIR Bulk Data Export
4//! as specified in the [FHIR Bulk Data Access IG](https://hl7.org/fhir/uv/bulkdata/export.html).
5//!
6//! # Export Levels
7//!
8//! The Bulk Data Export specification supports three levels of export:
9//!
10//! - **System-level** (`[base]/$export`) - Exports all resources in the system
11//! - **Patient-level** (`[base]/Patient/$export`) - Exports all patient compartment resources
12//! - **Group-level** (`[base]/Group/[id]/$export`) - Exports resources for patients in a group
13//!
14//! # Example
15//!
//! ```ignore
//! use helios_persistence::core::bulk_export::{
//!     BulkExportStorage, ExportRequest, ExportLevel, ExportStatus, StartExportInput,
//! };
//!
//! async fn export_patients<S: BulkExportStorage>(
//!     storage: &S,
//!     tenant: &TenantContext,
//!     request_url: String,
//!     fhir_version: helios_fhir::FhirVersion,
//! ) {
//!     // Describe a system-level export of Patient resources
//!     let request = ExportRequest::new(ExportLevel::System)
//!         .with_types(vec!["Patient".to_string()]);
//!
//!     // Bundle the request with the metadata frozen at kickoff
//!     let input = StartExportInput {
//!         request,
//!         transaction_time: chrono::Utc::now(),
//!         request_url,
//!         owner_subject: None,
//!         fhir_version,
//!     };
//!
//!     let job_id = storage.start_export(tenant, input).await.unwrap();
//!
//!     // Poll until the job reaches a terminal state
//!     loop {
//!         let progress = storage.get_export_status(tenant, &job_id).await.unwrap();
//!         match progress.status {
//!             ExportStatus::Complete => break,
//!             ExportStatus::Error => panic!("Export failed"),
//!             ExportStatus::Cancelled => panic!("Export cancelled"),
//!             _ => tokio::time::sleep(std::time::Duration::from_secs(1)).await,
//!         }
//!     }
//!
//!     // Get the storage-side manifest (it carries output-store keys, not URLs)
//!     let manifest = storage.get_export_manifest(tenant, &job_id).await.unwrap();
//!     for part in manifest.output {
//!         println!("Exported {} {} resources to {:?}", part.count, part.resource_type, part.key);
//!     }
//! }
//! ```
45
46use async_trait::async_trait;
47use chrono::{DateTime, Utc};
48use serde::{Deserialize, Serialize};
49use serde_json::Value;
50use uuid::Uuid;
51
/// Helpers that emit audit events for bulk export operations.
#[cfg(feature = "audit")]
pub mod audit {
    use helios_audit::{AuditAction, AuditEventBuilder, AuditSink};

    use super::ExportLevel;

    /// Emits one audit event describing a bulk export lifecycle transition.
    ///
    /// Intended for export start, completion, cancellation, and failure.
    ///
    /// Every event carries the `audit-operation` (`bulk-export`), `job-id`, and
    /// `export-level` details. The `resource-types` detail (comma-joined) is
    /// added only when `resource_types` is non-empty; the agent and the outcome
    /// description are attached only when supplied.
    #[allow(clippy::too_many_arguments)]
    pub async fn record_export_event(
        sink: &dyn AuditSink,
        source_observer: &str,
        agent: Option<&str>,
        job_id: &str,
        level: &ExportLevel,
        resource_types: &[String],
        outcome: &str,
        outcome_desc: Option<&str>,
    ) {
        // Mandatory part of the event.
        let base = AuditEventBuilder::new(source_observer)
            .event_type(
                "http://terminology.hl7.org/CodeSystem/audit-event-type",
                "object",
            )
            .action(AuditAction::Execute)
            .outcome(outcome)
            .detail("audit-operation", "bulk-export")
            .detail("job-id", job_id)
            .detail("export-level", level.to_string());

        // Optional parts, applied in a fixed order: types, agent, description.
        let typed = if resource_types.is_empty() {
            base
        } else {
            base.detail("resource-types", resource_types.join(","))
        };
        let attributed = match agent {
            Some(who) => typed.agent(who, None, true),
            None => typed,
        };
        let described = match outcome_desc {
            Some(text) => attributed.outcome_desc(text),
            None => attributed,
        };

        sink.record(described.build()).await;
    }

    #[cfg(test)]
    mod tests {
        use helios_audit::sinks::NullSink;

        use super::*;
        use crate::core::bulk_export::ExportLevel;

        #[tokio::test]
        async fn test_export_event_has_job_id() {
            let discard = NullSink;
            let types = ["Patient".to_string()];
            record_export_event(
                &discard,
                "Device/hfs",
                Some("Practitioner/dr-1"),
                "job-abc",
                &ExportLevel::System,
                &types,
                "0",
                None,
            )
            .await;
            // NullSink drops the event; this only proves the call compiles and does not panic.
        }

        #[test]
        fn test_export_event_type_is_object_for_bulk_export() {
            let built = AuditEventBuilder::new("Device/hfs")
                .event_type(
                    "http://terminology.hl7.org/CodeSystem/audit-event-type",
                    "object",
                )
                .action(AuditAction::Execute)
                .outcome("0")
                .detail("audit-operation", "bulk-export")
                .detail("job-id", "j1")
                .build();
            let type_code = built.r#type.code.as_ref().and_then(|c| c.value.as_deref());
            assert_eq!(type_code, Some("object"));
        }
    }
}
138
139use crate::error::StorageResult;
140use crate::tenant::TenantContext;
141
142/// Unique identifier for an export job.
143#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
144pub struct ExportJobId(String);
145
146impl ExportJobId {
147    /// Creates a new random export job ID.
148    pub fn new() -> Self {
149        Self(Uuid::new_v4().to_string())
150    }
151
152    /// Creates an export job ID from an existing string.
153    pub fn from_string(id: impl Into<String>) -> Self {
154        Self(id.into())
155    }
156
157    /// Returns the ID as a string reference.
158    pub fn as_str(&self) -> &str {
159        &self.0
160    }
161}
162
163impl Default for ExportJobId {
164    fn default() -> Self {
165        Self::new()
166    }
167}
168
169impl std::fmt::Display for ExportJobId {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        write!(f, "{}", self.0)
172    }
173}
174
175impl From<String> for ExportJobId {
176    fn from(s: String) -> Self {
177        Self(s)
178    }
179}
180
181impl From<&str> for ExportJobId {
182    fn from(s: &str) -> Self {
183        Self(s.to_string())
184    }
185}
186
187/// Status of an export job.
188#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
189#[serde(rename_all = "lowercase")]
190pub enum ExportStatus {
191    /// Job has been accepted but not yet started processing.
192    Accepted,
193    /// Job is currently processing.
194    InProgress,
195    /// Job has completed successfully.
196    Complete,
197    /// Job failed with an error.
198    Error,
199    /// Job was cancelled by the user.
200    Cancelled,
201}
202
203impl ExportStatus {
204    /// Returns true if the job is in a terminal state (complete, error, or cancelled).
205    pub fn is_terminal(&self) -> bool {
206        matches!(self, Self::Complete | Self::Error | Self::Cancelled)
207    }
208
209    /// Returns true if the job is still active (accepted or in progress).
210    pub fn is_active(&self) -> bool {
211        matches!(self, Self::Accepted | Self::InProgress)
212    }
213}
214
215impl std::fmt::Display for ExportStatus {
216    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217        match self {
218            Self::Accepted => write!(f, "accepted"),
219            Self::InProgress => write!(f, "in-progress"),
220            Self::Complete => write!(f, "complete"),
221            Self::Error => write!(f, "error"),
222            Self::Cancelled => write!(f, "cancelled"),
223        }
224    }
225}
226
227impl std::str::FromStr for ExportStatus {
228    type Err = String;
229
230    fn from_str(s: &str) -> Result<Self, Self::Err> {
231        match s {
232            "accepted" => Ok(Self::Accepted),
233            "in-progress" | "in_progress" => Ok(Self::InProgress),
234            "complete" => Ok(Self::Complete),
235            "error" => Ok(Self::Error),
236            "cancelled" => Ok(Self::Cancelled),
237            _ => Err(format!("unknown export status: {}", s)),
238        }
239    }
240}
241
242/// Level at which the export is being performed.
243#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
244#[serde(rename_all = "lowercase")]
245pub enum ExportLevel {
246    /// System-level export (`[base]/$export`).
247    System,
248    /// Patient-level export (`[base]/Patient/$export`).
249    Patient,
250    /// Group-level export (`[base]/Group/[id]/$export`).
251    Group {
252        /// The group ID to export.
253        group_id: String,
254    },
255}
256
257impl ExportLevel {
258    /// Creates a system-level export.
259    pub fn system() -> Self {
260        Self::System
261    }
262
263    /// Creates a patient-level export.
264    pub fn patient() -> Self {
265        Self::Patient
266    }
267
268    /// Creates a group-level export for the given group ID.
269    pub fn group(group_id: impl Into<String>) -> Self {
270        Self::Group {
271            group_id: group_id.into(),
272        }
273    }
274}
275
276impl std::fmt::Display for ExportLevel {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        match self {
279            Self::System => write!(f, "system"),
280            Self::Patient => write!(f, "patient"),
281            Self::Group { group_id } => write!(f, "group/{}", group_id),
282        }
283    }
284}
285
286/// A type filter for the export request.
287///
288/// Type filters allow specifying FHIR search parameters that should be applied
289/// when exporting a specific resource type.
290#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
291pub struct TypeFilter {
292    /// The resource type this filter applies to.
293    pub resource_type: String,
294    /// The search query parameters.
295    pub query: String,
296}
297
298impl TypeFilter {
299    /// Creates a new type filter.
300    pub fn new(resource_type: impl Into<String>, query: impl Into<String>) -> Self {
301        Self {
302            resource_type: resource_type.into(),
303            query: query.into(),
304        }
305    }
306}
307
308/// Request parameters for starting an export job.
309#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct ExportRequest {
311    /// The level at which to perform the export.
312    pub level: ExportLevel,
313
314    /// Resource types to export. If empty, all applicable types are exported.
315    #[serde(default)]
316    pub resource_types: Vec<String>,
317
318    /// Only include resources modified at or after this time (`_since`).
319    #[serde(skip_serializing_if = "Option::is_none")]
320    pub since: Option<DateTime<Utc>>,
321
322    /// Only include resources modified at or before this time (`_until`).
323    #[serde(default, skip_serializing_if = "Option::is_none")]
324    pub until: Option<DateTime<Utc>>,
325
326    /// Type-specific filters to apply during export.
327    #[serde(default)]
328    pub type_filters: Vec<TypeFilter>,
329
330    /// Element paths to include (`_elements`). When non-empty, exported
331    /// resources are subset to these paths plus mandatory elements and tagged
332    /// `SUBSETTED`.
333    #[serde(default)]
334    pub elements: Vec<String>,
335
336    /// `includeAssociatedData` hint values. Parsed but currently a no-op
337    /// (rejected under `Prefer: handling=strict`, ignored otherwise).
338    #[serde(default)]
339    pub include_associated_data: Vec<String>,
340
341    /// Patient references restricting the export (POST `patient` parameter).
342    /// Only valid for patient- and group-level exports.
343    #[serde(default)]
344    pub patient_refs: Vec<String>,
345
346    /// Batch size for processing (implementation-specific).
347    #[serde(default = "default_batch_size")]
348    pub batch_size: u32,
349
350    /// Output format (default: "application/fhir+ndjson").
351    #[serde(default = "default_output_format")]
352    pub output_format: String,
353}
354
/// Batch size used when an export request does not specify one.
fn default_batch_size() -> u32 {
    1_000
}
358
/// Output format used when an export request does not specify one.
fn default_output_format() -> String {
    String::from("application/fhir+ndjson")
}
362
363impl ExportRequest {
364    /// Creates a new export request with the given level.
365    pub fn new(level: ExportLevel) -> Self {
366        Self {
367            level,
368            resource_types: Vec::new(),
369            since: None,
370            until: None,
371            type_filters: Vec::new(),
372            elements: Vec::new(),
373            include_associated_data: Vec::new(),
374            patient_refs: Vec::new(),
375            batch_size: default_batch_size(),
376            output_format: default_output_format(),
377        }
378    }
379
380    /// Creates a system-level export request.
381    pub fn system() -> Self {
382        Self::new(ExportLevel::System)
383    }
384
385    /// Creates a patient-level export request.
386    pub fn patient() -> Self {
387        Self::new(ExportLevel::Patient)
388    }
389
390    /// Creates a group-level export request.
391    pub fn group(group_id: impl Into<String>) -> Self {
392        Self::new(ExportLevel::Group {
393            group_id: group_id.into(),
394        })
395    }
396
397    /// Sets the resource types to export.
398    pub fn with_types(mut self, types: Vec<String>) -> Self {
399        self.resource_types = types;
400        self
401    }
402
403    /// Sets the since filter.
404    pub fn with_since(mut self, since: DateTime<Utc>) -> Self {
405        self.since = Some(since);
406        self
407    }
408
409    /// Sets the until filter.
410    pub fn with_until(mut self, until: DateTime<Utc>) -> Self {
411        self.until = Some(until);
412        self
413    }
414
415    /// Sets the `_elements` element paths.
416    pub fn with_elements(mut self, elements: Vec<String>) -> Self {
417        self.elements = elements;
418        self
419    }
420
421    /// Sets the patient references (POST `patient` filter).
422    pub fn with_patient_refs(mut self, patient_refs: Vec<String>) -> Self {
423        self.patient_refs = patient_refs;
424        self
425    }
426
427    /// Adds a type filter.
428    pub fn with_type_filter(mut self, filter: TypeFilter) -> Self {
429        self.type_filters.push(filter);
430        self
431    }
432
433    /// Adds multiple type filters.
434    pub fn with_type_filters(mut self, filters: Vec<TypeFilter>) -> Self {
435        self.type_filters.extend(filters);
436        self
437    }
438
439    /// Sets the batch size.
440    pub fn with_batch_size(mut self, batch_size: u32) -> Self {
441        self.batch_size = batch_size;
442        self
443    }
444
445    /// Returns the group ID if this is a group-level export.
446    pub fn group_id(&self) -> Option<&str> {
447        match &self.level {
448            ExportLevel::Group { group_id } => Some(group_id),
449            _ => None,
450        }
451    }
452}
453
454/// Progress information for a single resource type in an export.
455#[derive(Debug, Clone, Serialize, Deserialize)]
456pub struct TypeExportProgress {
457    /// The resource type.
458    pub resource_type: String,
459    /// Total number of resources to export (may be estimated).
460    pub total_count: Option<u64>,
461    /// Number of resources exported so far.
462    pub exported_count: u64,
463    /// Number of errors encountered.
464    pub error_count: u64,
465    /// Current cursor state for resuming (opaque to clients).
466    #[serde(skip_serializing_if = "Option::is_none")]
467    pub cursor_state: Option<String>,
468}
469
470impl TypeExportProgress {
471    /// Creates new progress tracking for a resource type.
472    pub fn new(resource_type: impl Into<String>) -> Self {
473        Self {
474            resource_type: resource_type.into(),
475            total_count: None,
476            exported_count: 0,
477            error_count: 0,
478            cursor_state: None,
479        }
480    }
481
482    /// Sets the total count.
483    pub fn with_total(mut self, total: u64) -> Self {
484        self.total_count = Some(total);
485        self
486    }
487
488    /// Returns the progress as a percentage (0.0 to 1.0).
489    pub fn progress_fraction(&self) -> Option<f64> {
490        self.total_count.map(|total| {
491            if total == 0 {
492                1.0
493            } else {
494                self.exported_count as f64 / total as f64
495            }
496        })
497    }
498}
499
500/// Overall progress of an export job.
501#[derive(Debug, Clone, Serialize, Deserialize)]
502pub struct ExportProgress {
503    /// The job ID.
504    pub job_id: ExportJobId,
505    /// Current status of the job.
506    pub status: ExportStatus,
507    /// The export level.
508    pub level: ExportLevel,
509    /// Time the export was initiated.
510    pub transaction_time: DateTime<Utc>,
511    /// Time the export started processing.
512    #[serde(skip_serializing_if = "Option::is_none")]
513    pub started_at: Option<DateTime<Utc>>,
514    /// Time the export completed (success, error, or cancelled).
515    #[serde(skip_serializing_if = "Option::is_none")]
516    pub completed_at: Option<DateTime<Utc>>,
517    /// Per-type progress information.
518    pub type_progress: Vec<TypeExportProgress>,
519    /// Current type being processed.
520    #[serde(skip_serializing_if = "Option::is_none")]
521    pub current_type: Option<String>,
522    /// Error message if status is Error.
523    #[serde(skip_serializing_if = "Option::is_none")]
524    pub error_message: Option<String>,
525}
526
527impl ExportProgress {
528    /// Creates new progress for an accepted job.
529    pub fn accepted(
530        job_id: ExportJobId,
531        level: ExportLevel,
532        transaction_time: DateTime<Utc>,
533    ) -> Self {
534        Self {
535            job_id,
536            status: ExportStatus::Accepted,
537            level,
538            transaction_time,
539            started_at: None,
540            completed_at: None,
541            type_progress: Vec::new(),
542            current_type: None,
543            error_message: None,
544        }
545    }
546
547    /// Returns the overall progress as a percentage (0.0 to 1.0).
548    pub fn overall_progress(&self) -> f64 {
549        if self.type_progress.is_empty() {
550            return 0.0;
551        }
552
553        let (total_exported, total_count) = self
554            .type_progress
555            .iter()
556            .fold((0u64, 0u64), |(exp, tot), tp| {
557                (exp + tp.exported_count, tot + tp.total_count.unwrap_or(0))
558            });
559
560        if total_count == 0 {
561            0.0
562        } else {
563            total_exported as f64 / total_count as f64
564        }
565    }
566}
567
568/// An output file in the export manifest.
569#[derive(Debug, Clone, Serialize, Deserialize)]
570pub struct ExportOutputFile {
571    /// The resource type contained in this file.
572    #[serde(rename = "type")]
573    pub resource_type: String,
574    /// URL to access the file.
575    pub url: String,
576    /// Number of resources in the file.
577    #[serde(skip_serializing_if = "Option::is_none")]
578    pub count: Option<u64>,
579}
580
581impl ExportOutputFile {
582    /// Creates a new output file descriptor.
583    pub fn new(resource_type: impl Into<String>, url: impl Into<String>) -> Self {
584        Self {
585            resource_type: resource_type.into(),
586            url: url.into(),
587            count: None,
588        }
589    }
590
591    /// Sets the count.
592    pub fn with_count(mut self, count: u64) -> Self {
593        self.count = Some(count);
594        self
595    }
596}
597
598/// The export manifest returned when an export completes.
599///
600/// This follows the FHIR Bulk Data Export manifest format.
601#[derive(Debug, Clone, Serialize, Deserialize)]
602pub struct ExportManifest {
603    /// Time the export was initiated.
604    #[serde(rename = "transactionTime")]
605    pub transaction_time: DateTime<Utc>,
606    /// The original export request URL.
607    pub request: String,
608    /// Whether the client should check for deleted resources.
609    #[serde(rename = "requiresAccessToken")]
610    pub requires_access_token: bool,
611    /// Output files containing the exported resources.
612    pub output: Vec<ExportOutputFile>,
613    /// Output files containing OperationOutcome resources for errors.
614    #[serde(default)]
615    pub error: Vec<ExportOutputFile>,
616    /// Files containing deleted resource references (always empty for now).
617    #[serde(default)]
618    pub deleted: Vec<ExportOutputFile>,
619    /// Pagination links for partial manifests (always empty — `allowPartialManifests` unsupported).
620    #[serde(default)]
621    pub link: Vec<String>,
622    /// Informational messages.
623    #[serde(default, skip_serializing_if = "Option::is_none")]
624    pub message: Option<String>,
625    /// Extension data.
626    #[serde(default, skip_serializing_if = "Option::is_none")]
627    pub extension: Option<Value>,
628}
629
630impl ExportManifest {
631    /// Creates a new export manifest.
632    pub fn new(transaction_time: DateTime<Utc>, request: impl Into<String>) -> Self {
633        Self {
634            transaction_time,
635            request: request.into(),
636            requires_access_token: true,
637            output: Vec::new(),
638            error: Vec::new(),
639            deleted: Vec::new(),
640            link: Vec::new(),
641            message: None,
642            extension: None,
643        }
644    }
645
646    /// Adds an output file.
647    pub fn with_output(mut self, file: ExportOutputFile) -> Self {
648        self.output.push(file);
649        self
650    }
651
652    /// Adds an error file.
653    pub fn with_error(mut self, file: ExportOutputFile) -> Self {
654        self.error.push(file);
655        self
656    }
657
658    /// Sets a message.
659    pub fn with_message(mut self, message: impl Into<String>) -> Self {
660        self.message = Some(message.into());
661        self
662    }
663}
664
/// One batch of NDJSON output produced while streaming an export.
#[derive(Debug, Clone)]
pub struct NdjsonBatch {
    /// Serialized NDJSON lines, one JSON object per entry.
    pub lines: Vec<String>,
    /// Cursor from which the next batch can be fetched, if there is one.
    pub next_cursor: Option<String>,
    /// Set when no further batches follow.
    pub is_last: bool,
}

impl NdjsonBatch {
    /// Wraps `lines` in a batch that has no cursor and is not marked last.
    pub fn new(lines: Vec<String>) -> Self {
        Self {
            is_last: false,
            next_cursor: None,
            lines,
        }
    }

    /// Builds a final batch that contains no lines.
    pub fn empty() -> Self {
        Self::new(Vec::new()).as_last()
    }

    /// Attaches the cursor for the following batch.
    pub fn with_cursor(self, cursor: impl Into<String>) -> Self {
        Self {
            next_cursor: Some(cursor.into()),
            ..self
        }
    }

    /// Flags the batch as final and drops any cursor it carried.
    pub fn as_last(self) -> Self {
        Self {
            is_last: true,
            next_cursor: None,
            ..self
        }
    }

    /// Number of lines (resources) in the batch.
    pub fn len(&self) -> usize {
        self.lines.len()
    }

    /// Whether the batch holds no lines.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
718
719/// Kickoff metadata for starting an export job.
720///
721/// Bundles the [`ExportRequest`] (what to export) with the server-frozen
722/// metadata captured once at kickoff time: `transaction_time`, the original
723/// request URL, the owning principal's subject, and the FHIR version. These
724/// are the single source of truth — the worker only ever reads them back.
725#[derive(Debug, Clone)]
726pub struct StartExportInput {
727    /// What to export.
728    pub request: ExportRequest,
729    /// Server wall-clock frozen at kickoff (the manifest `transactionTime`).
730    pub transaction_time: DateTime<Utc>,
731    /// The full kickoff request URL (echoed in the manifest `request` field).
732    pub request_url: String,
733    /// The subject of the authenticated principal that kicked off the export.
734    pub owner_subject: Option<String>,
735    /// The FHIR version the export runs against.
736    pub fhir_version: helios_fhir::FhirVersion,
737}
738
739/// A single entry in a [`RawExportManifest`] — carries a storage key, never a URL.
740#[derive(Debug, Clone)]
741pub struct RawManifestEntry {
742    /// The resource type contained in this part.
743    pub resource_type: String,
744    /// The output-store key for this part (URL minting happens in the REST layer).
745    pub key: crate::core::bulk_export_output::ExportPartKey,
746    /// Number of resources in the part.
747    pub count: u64,
748}
749
750/// The storage-side view of a completed export's manifest.
751///
752/// Carries keys rather than URLs — the REST layer mints download URLs via the
753/// [`ExportOutputStore`](crate::core::bulk_export_output::ExportOutputStore)
754/// and assembles the wire-format [`ExportManifest`].
755#[derive(Debug, Clone)]
756pub struct RawExportManifest {
757    /// Server wall-clock frozen at kickoff.
758    pub transaction_time: DateTime<Utc>,
759    /// The original kickoff request URL.
760    pub request_url: String,
761    /// Current job status.
762    pub status: ExportStatus,
763    /// Error message if the job failed.
764    pub error_message: Option<String>,
765    /// Time the job completed.
766    pub completed_at: Option<DateTime<Utc>>,
767    /// Output parts (`file_type = "output"`).
768    pub output: Vec<RawManifestEntry>,
769    /// Error parts (`file_type = "error"`).
770    pub errors: Vec<RawManifestEntry>,
771}
772
773/// Lightweight job metadata for authorization checks.
774///
775/// Returned by `get_export_job_metadata` — a single cheap row lookup the REST
776/// status/cancel handlers call *before* any heavier status/manifest query.
777#[derive(Debug, Clone)]
778pub struct ExportJobMetadata {
779    /// The job ID.
780    pub job_id: ExportJobId,
781    /// Current status.
782    pub status: ExportStatus,
783    /// The export level.
784    pub level: ExportLevel,
785    /// Subject of the principal that owns the job.
786    pub owner_subject: Option<String>,
787    /// Server wall-clock frozen at kickoff.
788    pub transaction_time: DateTime<Utc>,
789    /// Time the job completed.
790    pub completed_at: Option<DateTime<Utc>>,
791    /// The original kickoff request URL.
792    pub request_url: String,
793}
794
795/// Metadata for a single output/error file, for the download handler.
796#[derive(Debug, Clone)]
797pub struct ExportFileMetadata {
798    /// The output-store key for this part.
799    pub key: crate::core::bulk_export_output::ExportPartKey,
800    /// The resource type contained in the file.
801    pub resource_type: String,
802    /// `"output"` or `"error"`.
803    pub file_type: String,
804    /// Number of resources (lines) in the file.
805    pub line_count: u64,
806    /// Subject of the principal that owns the job.
807    pub job_owner_subject: Option<String>,
808}
809
810/// A reference to an expired export job, for the cleanup task.
811#[derive(Debug, Clone)]
812pub struct ExpiredExportRef {
813    /// The tenant the job belongs to.
814    pub tenant: TenantContext,
815    /// The expired job ID.
816    pub job_id: ExportJobId,
817}
818
819// ============================================================================
820// Traits
821// ============================================================================
822
823/// Storage trait for bulk export job management.
824///
825/// This trait handles the lifecycle of export jobs: creating, tracking,
826/// completing, and cleaning up exports.
827#[async_trait]
828pub trait BulkExportStorage: Send + Sync {
829    /// Starts a new export job.
830    ///
831    /// # Arguments
832    ///
833    /// * `tenant` - The tenant context
834    /// * `input` - The kickoff metadata (request + frozen `transaction_time`,
835    ///   `request_url`, `owner_subject`, `fhir_version`)
836    ///
837    /// # Returns
838    ///
839    /// The job ID for tracking the export.
840    ///
841    /// # Errors
842    ///
843    /// * `BulkExportError::TooManyConcurrentExports` - If too many exports are running
844    /// * `BulkExportError::InvalidRequest` - If the request is invalid
845    async fn start_export(
846        &self,
847        tenant: &TenantContext,
848        input: StartExportInput,
849    ) -> StorageResult<ExportJobId>;
850
851    /// Gets the current status of an export job.
852    ///
853    /// # Arguments
854    ///
855    /// * `tenant` - The tenant context
856    /// * `job_id` - The export job ID
857    ///
858    /// # Returns
859    ///
860    /// The current progress of the export.
861    ///
862    /// # Errors
863    ///
864    /// * `BulkExportError::JobNotFound` - If the job doesn't exist
865    async fn get_export_status(
866        &self,
867        tenant: &TenantContext,
868        job_id: &ExportJobId,
869    ) -> StorageResult<ExportProgress>;
870
871    /// Cancels an in-progress export job.
872    ///
873    /// # Arguments
874    ///
875    /// * `tenant` - The tenant context
876    /// * `job_id` - The export job ID
877    ///
878    /// # Errors
879    ///
880    /// * `BulkExportError::JobNotFound` - If the job doesn't exist
881    /// * `BulkExportError::InvalidJobState` - If the job is already complete
882    async fn cancel_export(
883        &self,
884        tenant: &TenantContext,
885        job_id: &ExportJobId,
886    ) -> StorageResult<()>;
887
888    /// Deletes an export job and its output files.
889    ///
890    /// # Arguments
891    ///
892    /// * `tenant` - The tenant context
893    /// * `job_id` - The export job ID
894    ///
895    /// # Errors
896    ///
897    /// * `BulkExportError::JobNotFound` - If the job doesn't exist
898    async fn delete_export(
899        &self,
900        tenant: &TenantContext,
901        job_id: &ExportJobId,
902    ) -> StorageResult<()>;
903
904    /// Gets the storage-side manifest for a completed export.
905    ///
906    /// Returns a [`RawExportManifest`] carrying output-store *keys* — the REST
907    /// layer mints download URLs and assembles the wire-format
908    /// [`ExportManifest`].
909    ///
910    /// # Errors
911    ///
912    /// * `BulkExportError::JobNotFound` - If the job doesn't exist
913    async fn get_export_manifest(
914        &self,
915        tenant: &TenantContext,
916        job_id: &ExportJobId,
917    ) -> StorageResult<RawExportManifest>;
918
919    /// Lists export jobs for a tenant.
920    ///
921    /// # Arguments
922    ///
923    /// * `tenant` - The tenant context
924    /// * `include_completed` - Whether to include completed jobs
925    ///
926    /// # Returns
927    ///
928    /// List of export progress records.
929    async fn list_exports(
930        &self,
931        tenant: &TenantContext,
932        include_completed: bool,
933    ) -> StorageResult<Vec<ExportProgress>>;
934
935    /// Returns lightweight job metadata for an authorization check.
936    ///
937    /// Called by the REST status/cancel handlers *before* any heavier query.
938    ///
939    /// # Errors
940    ///
941    /// * `BulkExportError::JobNotFound` - If the job doesn't exist
942    async fn get_export_job_metadata(
943        &self,
944        tenant: &TenantContext,
945        job_id: &ExportJobId,
946    ) -> StorageResult<ExportJobMetadata>;
947
948    /// Returns file metadata for a single output/error part, for the download
949    /// handler. `part` is the `{resource_type}-{part_index}` route segment.
950    ///
951    /// # Errors
952    ///
953    /// * `BulkExportError::JobNotFound` - If the job or part doesn't exist
954    async fn get_export_file_metadata(
955        &self,
956        tenant: &TenantContext,
957        job_id: &ExportJobId,
958        part: &str,
959    ) -> StorageResult<ExportFileMetadata>;
960
961    /// Counts active (`accepted` or `in_progress`) jobs for a tenant — used to
962    /// enforce the per-tenant concurrency cap at kickoff.
963    async fn count_active_exports(&self, tenant: &TenantContext) -> StorageResult<u64>;
964
965    /// Lists expired completed jobs across *all* tenants, for the cleanup task.
966    ///
967    /// This is intentionally cross-tenant — the cleanup task is a server-wide
968    /// background job, so this is the one method that does not take a tenant.
969    async fn list_expired_exports(
970        &self,
971        now: DateTime<Utc>,
972        output_ttl: std::time::Duration,
973        limit: u32,
974    ) -> StorageResult<Vec<ExpiredExportRef>>;
975}
976
977/// Data provider for export operations.
978///
979/// This trait provides the data retrieval capabilities needed to perform
980/// system-level exports.
981#[async_trait]
982pub trait ExportDataProvider: Send + Sync {
983    /// Lists resource types available for export.
984    ///
985    /// # Arguments
986    ///
987    /// * `tenant` - The tenant context
988    /// * `request` - The export request (used to filter by requested types)
989    ///
990    /// # Returns
991    ///
992    /// List of resource type names that should be exported.
993    async fn list_export_types(
994        &self,
995        tenant: &TenantContext,
996        request: &ExportRequest,
997    ) -> StorageResult<Vec<String>>;
998
999    /// Counts resources of a type for export.
1000    ///
1001    /// # Arguments
1002    ///
1003    /// * `tenant` - The tenant context
1004    /// * `request` - The export request (for filters)
1005    /// * `resource_type` - The resource type to count
1006    ///
1007    /// # Returns
1008    ///
1009    /// The count of resources matching the export criteria.
1010    async fn count_export_resources(
1011        &self,
1012        tenant: &TenantContext,
1013        request: &ExportRequest,
1014        resource_type: &str,
1015    ) -> StorageResult<u64>;
1016
1017    /// Fetches a batch of resources for export.
1018    ///
1019    /// # Arguments
1020    ///
1021    /// * `tenant` - The tenant context
1022    /// * `request` - The export request (for filters)
1023    /// * `resource_type` - The resource type to fetch
1024    /// * `cursor` - Cursor from previous batch, or None for first batch
1025    /// * `batch_size` - Maximum number of resources to return
1026    ///
1027    /// # Returns
1028    ///
1029    /// A batch of NDJSON lines with cursor for next batch.
1030    async fn fetch_export_batch(
1031        &self,
1032        tenant: &TenantContext,
1033        request: &ExportRequest,
1034        resource_type: &str,
1035        cursor: Option<&str>,
1036        batch_size: u32,
1037    ) -> StorageResult<NdjsonBatch>;
1038}
1039
1040/// Provider for patient compartment exports.
1041///
1042/// This trait extends `ExportDataProvider` with patient-specific capabilities
1043/// needed for Patient-level exports.
1044#[async_trait]
1045pub trait PatientExportProvider: ExportDataProvider {
1046    /// Lists patient IDs to include in the export.
1047    ///
1048    /// # Arguments
1049    ///
1050    /// * `tenant` - The tenant context
1051    /// * `request` - The export request
1052    /// * `cursor` - Cursor from previous call, or None for first call
1053    /// * `batch_size` - Maximum number of patient IDs to return
1054    ///
1055    /// # Returns
1056    ///
1057    /// A tuple of (patient_ids, next_cursor).
1058    async fn list_patient_ids(
1059        &self,
1060        tenant: &TenantContext,
1061        request: &ExportRequest,
1062        cursor: Option<&str>,
1063        batch_size: u32,
1064    ) -> StorageResult<(Vec<String>, Option<String>)>;
1065
1066    /// Fetches a batch of resources from the patient compartment.
1067    ///
1068    /// # Arguments
1069    ///
1070    /// * `tenant` - The tenant context
1071    /// * `request` - The export request
1072    /// * `resource_type` - The resource type to fetch
1073    /// * `patient_ids` - Patient IDs whose resources to fetch
1074    /// * `cursor` - Cursor from previous batch, or None for first batch
1075    /// * `batch_size` - Maximum number of resources to return
1076    ///
1077    /// # Returns
1078    ///
1079    /// A batch of NDJSON lines with cursor for next batch.
1080    async fn fetch_patient_compartment_batch(
1081        &self,
1082        tenant: &TenantContext,
1083        request: &ExportRequest,
1084        resource_type: &str,
1085        patient_ids: &[String],
1086        cursor: Option<&str>,
1087        batch_size: u32,
1088    ) -> StorageResult<NdjsonBatch>;
1089}
1090
1091/// Provider for group-level exports.
1092///
1093/// This trait extends `PatientExportProvider` with group-specific capabilities
1094/// needed for Group-level exports.
1095#[async_trait]
1096pub trait GroupExportProvider: PatientExportProvider {
1097    /// Gets the members of a group.
1098    ///
1099    /// # Arguments
1100    ///
1101    /// * `tenant` - The tenant context
1102    /// * `group_id` - The group resource ID
1103    ///
1104    /// # Returns
1105    ///
1106    /// List of member references (e.g., "Patient/123").
1107    ///
1108    /// # Errors
1109    ///
1110    /// * `BulkExportError::GroupNotFound` - If the group doesn't exist
1111    async fn get_group_members(
1112        &self,
1113        tenant: &TenantContext,
1114        group_id: &str,
1115    ) -> StorageResult<Vec<String>>;
1116
1117    /// Resolves group members to patient IDs.
1118    ///
1119    /// This handles the case where group members may be references to other
1120    /// resources (like Practitioner) or nested Groups.
1121    ///
1122    /// # Arguments
1123    ///
1124    /// * `tenant` - The tenant context
1125    /// * `group_id` - The group resource ID
1126    ///
1127    /// # Returns
1128    ///
1129    /// List of patient IDs for the group members.
1130    async fn resolve_group_patient_ids(
1131        &self,
1132        tenant: &TenantContext,
1133        group_id: &str,
1134    ) -> StorageResult<Vec<String>>;
1135
1136    /// Returns each member's reference together with its `Group.member.period.start`.
1137    ///
1138    /// The default implementation falls back to [`get_group_members`] and
1139    /// returns `None` for every period start (loses the membership-history
1140    /// signal the `_since`-newly-added filter relies on). Backends that can
1141    /// inspect the raw Group resource override this to return real period
1142    /// starts.
1143    async fn get_group_members_with_periods(
1144        &self,
1145        tenant: &TenantContext,
1146        group_id: &str,
1147    ) -> StorageResult<Vec<(String, Option<DateTime<Utc>>)>> {
1148        let members = self.get_group_members(tenant, group_id).await?;
1149        Ok(members.into_iter().map(|m| (m, None)).collect())
1150    }
1151}
1152
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly constructed ID is non-empty; an ID built from a string
    // exposes that same string through `as_str` and `to_string`.
    #[test]
    fn test_export_job_id() {
        let fresh = ExportJobId::new();
        assert!(!fresh.as_str().is_empty());

        let explicit = ExportJobId::from_string("test-123");
        assert_eq!(explicit.as_str(), "test-123");
        assert_eq!(explicit.to_string(), "test-123");
    }

    // Checks the terminal/active classification of the statuses.
    #[test]
    fn test_export_status() {
        for status in [ExportStatus::Complete, ExportStatus::Error, ExportStatus::Cancelled] {
            assert!(status.is_terminal());
        }
        for status in [ExportStatus::Accepted, ExportStatus::InProgress] {
            assert!(!status.is_terminal());
        }

        for status in [ExportStatus::Accepted, ExportStatus::InProgress] {
            assert!(status.is_active());
        }
        assert!(!ExportStatus::Complete.is_active());
    }

    // `InProgress` displays with a hyphen and parses from both spellings.
    #[test]
    fn test_export_status_display_parse() {
        assert_eq!(ExportStatus::InProgress.to_string(), "in-progress");

        // The underscore variant is accepted alongside the hyphenated one.
        for text in ["in-progress", "in_progress"] {
            let parsed = text.parse::<ExportStatus>().unwrap();
            assert_eq!(parsed, ExportStatus::InProgress);
        }
    }

    // Each `ExportLevel` constructor yields the matching variant.
    #[test]
    fn test_export_level() {
        assert!(matches!(ExportLevel::system(), ExportLevel::System));
        assert!(matches!(ExportLevel::patient(), ExportLevel::Patient));
        assert!(matches!(
            ExportLevel::group("grp-123"),
            ExportLevel::Group { group_id } if group_id == "grp-123"
        ));
    }

    // Builder methods record types, batch size and type filters on the request.
    #[test]
    fn test_export_request_builder() {
        let requested_types = vec!["Patient".to_string(), "Observation".to_string()];
        let observation_filter = TypeFilter::new("Observation", "code=1234");

        let request = ExportRequest::system()
            .with_types(requested_types)
            .with_batch_size(500)
            .with_type_filter(observation_filter);

        assert!(matches!(request.level, ExportLevel::System));
        assert_eq!(request.resource_types, vec!["Patient", "Observation"]);
        assert_eq!(request.batch_size, 500);
        assert_eq!(request.type_filters.len(), 1);
    }

    // Only a group-level request reports a group ID.
    #[test]
    fn test_export_request_group_id() {
        let group_request = ExportRequest::group("grp-123");
        assert_eq!(group_request.group_id(), Some("grp-123"));

        let system_request = ExportRequest::system();
        assert_eq!(system_request.group_id(), None);
    }

    // The progress fraction follows `exported_count` relative to the total.
    #[test]
    fn test_type_export_progress() {
        let mut progress = TypeExportProgress::new("Patient").with_total(100);
        assert_eq!(progress.total_count, Some(100));
        assert_eq!(progress.progress_fraction(), Some(0.0));

        progress.exported_count = 50;
        assert_eq!(progress.progress_fraction(), Some(0.5));
    }

    // A manifest keeps the output file and message it was built with.
    #[test]
    fn test_export_manifest() {
        let patient_file =
            ExportOutputFile::new("Patient", "/exports/Patient.ndjson").with_count(100);
        let manifest = ExportManifest::new(Utc::now(), "https://example.com/$export")
            .with_output(patient_file)
            .with_message("Export complete");

        assert_eq!(manifest.output.len(), 1);
        let first = &manifest.output[0];
        assert_eq!(first.resource_type, "Patient");
        assert_eq!(first.count, Some(100));
        assert!(manifest.message.is_some());
    }

    // A batch with a cursor is not last; `as_last` marks it last and drops the cursor.
    #[test]
    fn test_ndjson_batch() {
        let lines = vec![
            r#"{"resourceType":"Patient","id":"1"}"#.to_string(),
            r#"{"resourceType":"Patient","id":"2"}"#.to_string(),
        ];
        let batch = NdjsonBatch::new(lines).with_cursor("next-page");

        assert_eq!(batch.len(), 2);
        assert!(!batch.is_empty());
        assert!(!batch.is_last);
        assert_eq!(batch.next_cursor.as_deref(), Some("next-page"));

        let final_batch = batch.as_last();
        assert!(final_batch.is_last);
        assert!(final_batch.next_cursor.is_none());
    }

    // The empty batch is both empty and last.
    #[test]
    fn test_ndjson_batch_empty() {
        let batch = NdjsonBatch::empty();
        assert!(batch.is_empty());
        assert!(batch.is_last);
    }
}
1276}