Skip to main content

helios_persistence/core/
bulk_submit.rs

1//! Bulk submit types and traits.
2//!
3//! This module provides types and traits for implementing Bulk Data Submit
4//! as specified in the [Bulk Submit](https://hackmd.io/@argonaut/rJoqHZrPle) specification.
5//!
6//! # Overview
7//!
8//! Bulk Submit allows clients to submit large amounts of FHIR resources in NDJSON format.
9//! The submission process involves:
10//!
11//! 1. Creating a submission
12//! 2. Adding manifests (files to process)
13//! 3. Processing entries from each manifest
14//! 4. Completing or aborting the submission
15//!
16//! # Rollback Support
17//!
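//! Submissions track changes for potential rollback. Aborting a submission does
//! not by itself undo those changes (see `BulkSubmitProvider::abort_submission`);
//! when a rollback is performed, created resources are deleted and updated
//! resources are reverted to their previous state.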
21//!
22//! # Example
23//!
24//! ```ignore
25//! use helios_persistence::core::bulk_submit::{
26//!     BulkSubmitProvider, SubmissionId, NdjsonEntry, BulkProcessingOptions,
27//! };
28//!
29//! async fn submit_patients<S: BulkSubmitProvider>(storage: &S, tenant: &TenantContext) {
30//!     // Create a submission
31//!     let sub_id = SubmissionId::generate("my-system");
32//!     let summary = storage.create_submission(tenant, &sub_id, None).await.unwrap();
33//!
34//!     // Add a manifest
35//!     let manifest = storage.add_manifest(tenant, &sub_id, None, None).await.unwrap();
36//!
37//!     // Process entries
38//!     let entries = vec![
39//!         NdjsonEntry::new(1, "Patient", serde_json::json!({"resourceType": "Patient"})),
40//!     ];
41//!     let results = storage.process_entries(
42//!         tenant, &sub_id, &manifest.manifest_id, entries, &BulkProcessingOptions::new()
43//!     ).await.unwrap();
44//!
45//!     // Complete the submission
46//!     storage.complete_submission(tenant, &sub_id).await.unwrap();
47//! }
48//! ```
49
50use async_trait::async_trait;
51use chrono::{DateTime, Utc};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use tokio::io::AsyncBufRead;
55use uuid::Uuid;
56
57use crate::core::storage::ResourceStorage;
58use crate::error::StorageResult;
59use crate::tenant::TenantContext;
60
/// Helpers for emitting audit events about bulk submit operations.
#[cfg(feature = "audit")]
pub mod audit {
    use helios_audit::{AuditAction, AuditEventBuilder, AuditSink};

    use super::SubmissionId;

    /// Builds one audit event for a bulk submit lifecycle step and hands it to
    /// `sink`.
    ///
    /// Intended to be called when a submission starts, completes, is aborted,
    /// or is rolled back.
    ///
    /// # Arguments
    ///
    /// * `sink` - Receives the built event via `record`
    /// * `source_observer` - Passed to `AuditEventBuilder::new`
    /// * `agent` - Attached to the event only when `Some`
    /// * `submission_id` - Supplies the `submission-id` and `submitter` details
    /// * `phase` - Stored as the `phase` detail
    /// * `outcome` - Outcome value set on the event
    /// * `outcome_desc` - Outcome description, set only when `Some`
    pub async fn record_submit_event(
        sink: &dyn AuditSink,
        source_observer: &str,
        agent: Option<&str>,
        submission_id: &SubmissionId,
        phase: &str,
        outcome: &str,
        outcome_desc: Option<&str>,
    ) {
        // Mandatory parts of the event; the detail order matches what the
        // tests below index into.
        let event = AuditEventBuilder::new(source_observer)
            .event_type(
                "http://terminology.hl7.org/CodeSystem/audit-event-type",
                "object",
            )
            .action(AuditAction::Execute)
            .outcome(outcome)
            .detail("audit-operation", "bulk-import")
            .detail("submission-id", &submission_id.submission_id)
            .detail("submitter", &submission_id.submitter)
            .detail("phase", phase);

        // Optional parts are applied only when the caller supplied them.
        let event = match agent {
            Some(who) => event.agent(who, None, true),
            None => event,
        };
        let event = match outcome_desc {
            Some(text) => event.outcome_desc(text),
            None => event,
        };

        sink.record(event.build()).await;
    }

    #[cfg(test)]
    mod tests {
        use helios_audit::sinks::NullSink;

        use super::*;
        use crate::core::bulk_submit::SubmissionId;

        // Smoke test: it only checks that recording runs to completion; the
        // recorded event itself is not inspected.
        #[tokio::test]
        async fn test_submit_event_includes_phase() {
            let null_sink = NullSink;
            let submission = SubmissionId::new("client-a", "sub-123");
            record_submit_event(
                &null_sink,
                "Device/hfs",
                None,
                &submission,
                "start",
                "0",
                None,
            )
            .await;
        }

        // Builds the same detail chain by hand and checks the detail count and
        // that the last detail is the phase.
        #[test]
        fn test_submit_event_has_details() {
            let submission = SubmissionId::new("client-a", "sub-123");
            let built = AuditEventBuilder::new("Device/hfs")
                .event_type(
                    "http://terminology.hl7.org/CodeSystem/audit-event-type",
                    "object",
                )
                .action(AuditAction::Execute)
                .outcome("0")
                .detail("audit-operation", "bulk-import")
                .detail("submission-id", &submission.submission_id)
                .detail("submitter", &submission.submitter)
                .detail("phase", "complete")
                .build();
            let entity_list = built.entity.as_ref().unwrap();
            let detail_list = entity_list[0].detail.as_ref().unwrap();
            assert_eq!(detail_list.len(), 4);
            assert_eq!(detail_list[3].r#type.value.as_deref(), Some("phase"));
        }
    }
}
136
137/// Unique identifier for a bulk submission.
138///
139/// A submission is identified by the combination of a submitter identifier
140/// (typically the client system) and a submission ID.
141#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
142pub struct SubmissionId {
143    /// The submitter identifier (e.g., client system name).
144    pub submitter: String,
145    /// The submission identifier.
146    pub submission_id: String,
147}
148
149impl SubmissionId {
150    /// Creates a new submission ID.
151    pub fn new(submitter: impl Into<String>, submission_id: impl Into<String>) -> Self {
152        Self {
153            submitter: submitter.into(),
154            submission_id: submission_id.into(),
155        }
156    }
157
158    /// Generates a new submission ID with a random UUID.
159    pub fn generate(submitter: impl Into<String>) -> Self {
160        Self {
161            submitter: submitter.into(),
162            submission_id: Uuid::new_v4().to_string(),
163        }
164    }
165}
166
167impl std::fmt::Display for SubmissionId {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        write!(f, "{}/{}", self.submitter, self.submission_id)
170    }
171}
172
173/// Status of a bulk submission.
174#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
175#[serde(rename_all = "kebab-case")]
176pub enum SubmissionStatus {
177    /// Submission is in progress.
178    InProgress,
179    /// Submission has completed successfully.
180    Complete,
181    /// Submission was aborted.
182    Aborted,
183}
184
185impl SubmissionStatus {
186    /// Returns true if the submission is in a terminal state.
187    pub fn is_terminal(&self) -> bool {
188        matches!(self, Self::Complete | Self::Aborted)
189    }
190}
191
192impl std::fmt::Display for SubmissionStatus {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        match self {
195            Self::InProgress => write!(f, "in-progress"),
196            Self::Complete => write!(f, "complete"),
197            Self::Aborted => write!(f, "aborted"),
198        }
199    }
200}
201
202impl std::str::FromStr for SubmissionStatus {
203    type Err = String;
204
205    fn from_str(s: &str) -> Result<Self, Self::Err> {
206        match s {
207            "in-progress" | "in_progress" => Ok(Self::InProgress),
208            "complete" => Ok(Self::Complete),
209            "aborted" => Ok(Self::Aborted),
210            _ => Err(format!("unknown submission status: {}", s)),
211        }
212    }
213}
214
215/// Status of a manifest within a submission.
216#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
217#[serde(rename_all = "kebab-case")]
218pub enum ManifestStatus {
219    /// Manifest has been added but not yet processed.
220    Pending,
221    /// Manifest is currently being processed.
222    Processing,
223    /// Manifest has been fully processed.
224    Completed,
225    /// Manifest processing failed.
226    Failed,
227    /// Manifest was superseded by a later submission via `replacesManifestUrl`.
228    Replaced,
229}
230
231impl ManifestStatus {
232    /// Returns true if the manifest is in a terminal state.
233    pub fn is_terminal(&self) -> bool {
234        matches!(self, Self::Completed | Self::Failed | Self::Replaced)
235    }
236}
237
238impl std::fmt::Display for ManifestStatus {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        match self {
241            Self::Pending => write!(f, "pending"),
242            Self::Processing => write!(f, "processing"),
243            Self::Completed => write!(f, "completed"),
244            Self::Failed => write!(f, "failed"),
245            Self::Replaced => write!(f, "replaced"),
246        }
247    }
248}
249
250impl std::str::FromStr for ManifestStatus {
251    type Err = String;
252
253    fn from_str(s: &str) -> Result<Self, Self::Err> {
254        match s {
255            "pending" => Ok(Self::Pending),
256            "processing" => Ok(Self::Processing),
257            "completed" => Ok(Self::Completed),
258            "failed" => Ok(Self::Failed),
259            "replaced" => Ok(Self::Replaced),
260            _ => Err(format!("unknown manifest status: {}", s)),
261        }
262    }
263}
264
265/// A manifest within a submission.
266#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct SubmissionManifest {
268    /// Unique manifest ID within the submission.
269    pub manifest_id: String,
270    /// Optional URL where the manifest data was fetched from.
271    #[serde(skip_serializing_if = "Option::is_none")]
272    pub manifest_url: Option<String>,
273    /// URL of the manifest this replaces, if any.
274    #[serde(skip_serializing_if = "Option::is_none")]
275    pub replaces_manifest_url: Option<String>,
276    /// Current processing status.
277    pub status: ManifestStatus,
278    /// When the manifest was added.
279    pub added_at: DateTime<Utc>,
280    /// Total number of entries in the manifest.
281    pub total_entries: u64,
282    /// Number of entries processed so far.
283    pub processed_entries: u64,
284    /// Number of entries that failed processing.
285    pub failed_entries: u64,
286}
287
288impl SubmissionManifest {
289    /// Creates a new manifest.
290    pub fn new(manifest_id: impl Into<String>) -> Self {
291        Self {
292            manifest_id: manifest_id.into(),
293            manifest_url: None,
294            replaces_manifest_url: None,
295            status: ManifestStatus::Pending,
296            added_at: Utc::now(),
297            total_entries: 0,
298            processed_entries: 0,
299            failed_entries: 0,
300        }
301    }
302
303    /// Sets the manifest URL.
304    pub fn with_url(mut self, url: impl Into<String>) -> Self {
305        self.manifest_url = Some(url.into());
306        self
307    }
308
309    /// Sets the replaces URL.
310    pub fn with_replaces(mut self, url: impl Into<String>) -> Self {
311        self.replaces_manifest_url = Some(url.into());
312        self
313    }
314}
315
316/// Outcome of processing a single entry.
317#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
318#[serde(rename_all = "kebab-case")]
319pub enum BulkEntryOutcome {
320    /// Entry was processed successfully.
321    Success,
322    /// Entry failed validation.
323    ValidationError,
324    /// Entry encountered a processing error.
325    ProcessingError,
326    /// Entry was skipped (e.g., duplicate).
327    Skipped,
328}
329
330impl std::fmt::Display for BulkEntryOutcome {
331    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332        match self {
333            Self::Success => write!(f, "success"),
334            Self::ValidationError => write!(f, "validation-error"),
335            Self::ProcessingError => write!(f, "processing-error"),
336            Self::Skipped => write!(f, "skipped"),
337        }
338    }
339}
340
341impl std::str::FromStr for BulkEntryOutcome {
342    type Err = String;
343
344    fn from_str(s: &str) -> Result<Self, Self::Err> {
345        match s {
346            "success" => Ok(Self::Success),
347            "validation-error" | "validation_error" => Ok(Self::ValidationError),
348            "processing-error" | "processing_error" => Ok(Self::ProcessingError),
349            "skipped" => Ok(Self::Skipped),
350            _ => Err(format!("unknown entry outcome: {}", s)),
351        }
352    }
353}
354
355/// Result of processing a single NDJSON entry.
356#[derive(Debug, Clone, Serialize, Deserialize)]
357pub struct BulkEntryResult {
358    /// Line number in the NDJSON file (1-indexed).
359    pub line_number: u64,
360    /// Resource type of the entry.
361    pub resource_type: String,
362    /// Resource ID if successfully processed.
363    #[serde(skip_serializing_if = "Option::is_none")]
364    pub resource_id: Option<String>,
365    /// Whether a new resource was created (vs updated).
366    pub created: bool,
367    /// Processing outcome.
368    pub outcome: BulkEntryOutcome,
369    /// OperationOutcome if there was an error.
370    #[serde(skip_serializing_if = "Option::is_none")]
371    pub operation_outcome: Option<Value>,
372}
373
374impl BulkEntryResult {
375    /// Creates a success result.
376    pub fn success(
377        line_number: u64,
378        resource_type: impl Into<String>,
379        resource_id: impl Into<String>,
380        created: bool,
381    ) -> Self {
382        Self {
383            line_number,
384            resource_type: resource_type.into(),
385            resource_id: Some(resource_id.into()),
386            created,
387            outcome: BulkEntryOutcome::Success,
388            operation_outcome: None,
389        }
390    }
391
392    /// Creates a validation error result.
393    pub fn validation_error(
394        line_number: u64,
395        resource_type: impl Into<String>,
396        outcome: Value,
397    ) -> Self {
398        Self {
399            line_number,
400            resource_type: resource_type.into(),
401            resource_id: None,
402            created: false,
403            outcome: BulkEntryOutcome::ValidationError,
404            operation_outcome: Some(outcome),
405        }
406    }
407
408    /// Creates a processing error result.
409    pub fn processing_error(
410        line_number: u64,
411        resource_type: impl Into<String>,
412        outcome: Value,
413    ) -> Self {
414        Self {
415            line_number,
416            resource_type: resource_type.into(),
417            resource_id: None,
418            created: false,
419            outcome: BulkEntryOutcome::ProcessingError,
420            operation_outcome: Some(outcome),
421        }
422    }
423
424    /// Creates a skipped result.
425    pub fn skipped(line_number: u64, resource_type: impl Into<String>, reason: &str) -> Self {
426        Self {
427            line_number,
428            resource_type: resource_type.into(),
429            resource_id: None,
430            created: false,
431            outcome: BulkEntryOutcome::Skipped,
432            operation_outcome: Some(serde_json::json!({
433                "resourceType": "OperationOutcome",
434                "issue": [{
435                    "severity": "information",
436                    "code": "informational",
437                    "diagnostics": reason
438                }]
439            })),
440        }
441    }
442
443    /// Returns true if this was a successful outcome.
444    pub fn is_success(&self) -> bool {
445        self.outcome == BulkEntryOutcome::Success
446    }
447
448    /// Returns true if this was an error outcome.
449    pub fn is_error(&self) -> bool {
450        matches!(
451            self.outcome,
452            BulkEntryOutcome::ValidationError | BulkEntryOutcome::ProcessingError
453        )
454    }
455}
456
457/// Summary of a submission's status.
458#[derive(Debug, Clone, Serialize, Deserialize)]
459pub struct SubmissionSummary {
460    /// The submission ID.
461    pub id: SubmissionId,
462    /// Current status.
463    pub status: SubmissionStatus,
464    /// When the submission was created.
465    pub created_at: DateTime<Utc>,
466    /// When the submission was last updated.
467    pub updated_at: DateTime<Utc>,
468    /// When the submission completed (if terminal).
469    #[serde(skip_serializing_if = "Option::is_none")]
470    pub completed_at: Option<DateTime<Utc>>,
471    /// Number of manifests in the submission.
472    pub manifest_count: u32,
473    /// Total entries across all manifests.
474    pub total_entries: u64,
475    /// Successfully processed entries.
476    pub success_count: u64,
477    /// Failed entries.
478    pub error_count: u64,
479    /// Skipped entries.
480    pub skipped_count: u64,
481    /// Optional metadata.
482    #[serde(skip_serializing_if = "Option::is_none")]
483    pub metadata: Option<Value>,
484}
485
486impl SubmissionSummary {
487    /// Creates a new submission summary.
488    pub fn new(id: SubmissionId) -> Self {
489        let now = Utc::now();
490        Self {
491            id,
492            status: SubmissionStatus::InProgress,
493            created_at: now,
494            updated_at: now,
495            completed_at: None,
496            manifest_count: 0,
497            total_entries: 0,
498            success_count: 0,
499            error_count: 0,
500            skipped_count: 0,
501            metadata: None,
502        }
503    }
504
505    /// Sets the metadata.
506    pub fn with_metadata(mut self, metadata: Value) -> Self {
507        self.metadata = Some(metadata);
508        self
509    }
510}
511
512/// A parsed NDJSON entry ready for processing.
513#[derive(Debug, Clone, Serialize, Deserialize)]
514pub struct NdjsonEntry {
515    /// Line number in the source file (1-indexed).
516    pub line_number: u64,
517    /// The resource type.
518    pub resource_type: String,
519    /// The resource ID (if present in the resource).
520    #[serde(skip_serializing_if = "Option::is_none")]
521    pub resource_id: Option<String>,
522    /// The resource content.
523    pub resource: Value,
524}
525
526impl NdjsonEntry {
527    /// Creates a new NDJSON entry.
528    pub fn new(line_number: u64, resource_type: impl Into<String>, resource: Value) -> Self {
529        let resource_type = resource_type.into();
530        let resource_id = resource
531            .get("id")
532            .and_then(|v| v.as_str())
533            .map(String::from);
534        Self {
535            line_number,
536            resource_type,
537            resource_id,
538            resource,
539        }
540    }
541
542    /// Parses an NDJSON line into an entry.
543    pub fn parse(line_number: u64, line: &str) -> Result<Self, String> {
544        let resource: Value =
545            serde_json::from_str(line).map_err(|e| format!("invalid JSON: {}", e))?;
546
547        let resource_type = resource
548            .get("resourceType")
549            .and_then(|v| v.as_str())
550            .ok_or_else(|| "missing resourceType".to_string())?
551            .to_string();
552
553        Ok(Self::new(line_number, resource_type, resource))
554    }
555}
556
557/// Options for bulk processing.
558#[derive(Debug, Clone, Serialize, Deserialize)]
559pub struct BulkProcessingOptions {
560    /// Number of entries to process in a single batch/transaction.
561    #[serde(default = "default_submit_batch_size")]
562    pub batch_size: u32,
563    /// Whether to continue processing after encountering errors.
564    #[serde(default = "default_continue_on_error")]
565    pub continue_on_error: bool,
566    /// Maximum number of errors before aborting (0 = unlimited).
567    #[serde(default)]
568    pub max_errors: u32,
569    /// Whether to allow updates to existing resources.
570    #[serde(default = "default_allow_updates")]
571    pub allow_updates: bool,
572}
573
/// Default for `BulkProcessingOptions::batch_size`: 100 entries per batch.
fn default_submit_batch_size() -> u32 {
    const DEFAULT_BATCH_SIZE: u32 = 100;
    DEFAULT_BATCH_SIZE
}
577
/// Default for `BulkProcessingOptions::continue_on_error`: keep going.
fn default_continue_on_error() -> bool {
    const CONTINUE_BY_DEFAULT: bool = true;
    CONTINUE_BY_DEFAULT
}
581
/// Default for `BulkProcessingOptions::allow_updates`: updates are allowed.
fn default_allow_updates() -> bool {
    const ALLOW_BY_DEFAULT: bool = true;
    ALLOW_BY_DEFAULT
}
585
586impl Default for BulkProcessingOptions {
587    fn default() -> Self {
588        Self::new()
589    }
590}
591
592impl BulkProcessingOptions {
593    /// Creates default processing options.
594    pub fn new() -> Self {
595        Self {
596            batch_size: default_submit_batch_size(),
597            continue_on_error: default_continue_on_error(),
598            max_errors: 0,
599            allow_updates: default_allow_updates(),
600        }
601    }
602
603    /// Sets the batch size.
604    pub fn with_batch_size(mut self, batch_size: u32) -> Self {
605        self.batch_size = batch_size;
606        self
607    }
608
609    /// Sets whether to continue on error.
610    pub fn with_continue_on_error(mut self, continue_on_error: bool) -> Self {
611        self.continue_on_error = continue_on_error;
612        self
613    }
614
615    /// Sets the maximum number of errors.
616    pub fn with_max_errors(mut self, max_errors: u32) -> Self {
617        self.max_errors = max_errors;
618        self
619    }
620
621    /// Sets whether updates are allowed.
622    pub fn with_allow_updates(mut self, allow_updates: bool) -> Self {
623        self.allow_updates = allow_updates;
624        self
625    }
626
627    /// Creates options for strict processing (no errors allowed).
628    pub fn strict() -> Self {
629        Self {
630            batch_size: default_submit_batch_size(),
631            continue_on_error: false,
632            max_errors: 1,
633            allow_updates: true,
634        }
635    }
636
637    /// Creates options for create-only processing.
638    pub fn create_only() -> Self {
639        Self {
640            batch_size: default_submit_batch_size(),
641            continue_on_error: true,
642            max_errors: 0,
643            allow_updates: false,
644        }
645    }
646}
647
648/// Type of change recorded for rollback.
649#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
650#[serde(rename_all = "lowercase")]
651pub enum ChangeType {
652    /// A new resource was created.
653    Create,
654    /// An existing resource was updated.
655    Update,
656}
657
658impl std::fmt::Display for ChangeType {
659    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
660        match self {
661            Self::Create => write!(f, "create"),
662            Self::Update => write!(f, "update"),
663        }
664    }
665}
666
667impl std::str::FromStr for ChangeType {
668    type Err = String;
669
670    fn from_str(s: &str) -> Result<Self, Self::Err> {
671        match s {
672            "create" => Ok(Self::Create),
673            "update" => Ok(Self::Update),
674            _ => Err(format!("unknown change type: {}", s)),
675        }
676    }
677}
678
679/// A change record for potential rollback.
680#[derive(Debug, Clone, Serialize, Deserialize)]
681pub struct SubmissionChange {
682    /// Unique change ID.
683    pub change_id: String,
684    /// Manifest this change is associated with.
685    pub manifest_id: String,
686    /// Type of change.
687    pub change_type: ChangeType,
688    /// Resource type.
689    pub resource_type: String,
690    /// Resource ID.
691    pub resource_id: String,
692    /// Version before the change (for updates).
693    #[serde(skip_serializing_if = "Option::is_none")]
694    pub previous_version: Option<String>,
695    /// Version after the change.
696    pub new_version: String,
697    /// Previous resource content (for updates).
698    #[serde(skip_serializing_if = "Option::is_none")]
699    pub previous_content: Option<Value>,
700    /// When the change was made.
701    pub changed_at: DateTime<Utc>,
702}
703
704impl SubmissionChange {
705    /// Creates a change record for a create operation.
706    pub fn create(
707        manifest_id: impl Into<String>,
708        resource_type: impl Into<String>,
709        resource_id: impl Into<String>,
710        new_version: impl Into<String>,
711    ) -> Self {
712        Self {
713            change_id: Uuid::new_v4().to_string(),
714            manifest_id: manifest_id.into(),
715            change_type: ChangeType::Create,
716            resource_type: resource_type.into(),
717            resource_id: resource_id.into(),
718            previous_version: None,
719            new_version: new_version.into(),
720            previous_content: None,
721            changed_at: Utc::now(),
722        }
723    }
724
725    /// Creates a change record for an update operation.
726    pub fn update(
727        manifest_id: impl Into<String>,
728        resource_type: impl Into<String>,
729        resource_id: impl Into<String>,
730        previous_version: impl Into<String>,
731        new_version: impl Into<String>,
732        previous_content: Value,
733    ) -> Self {
734        Self {
735            change_id: Uuid::new_v4().to_string(),
736            manifest_id: manifest_id.into(),
737            change_type: ChangeType::Update,
738            resource_type: resource_type.into(),
739            resource_id: resource_id.into(),
740            previous_version: Some(previous_version.into()),
741            new_version: new_version.into(),
742            previous_content: Some(previous_content),
743            changed_at: Utc::now(),
744        }
745    }
746}
747
748/// Summary of entry counts by outcome.
749#[derive(Debug, Clone, Default, Serialize, Deserialize)]
750pub struct EntryCountSummary {
751    /// Total entries.
752    pub total: u64,
753    /// Successful entries.
754    pub success: u64,
755    /// Validation errors.
756    pub validation_error: u64,
757    /// Processing errors.
758    pub processing_error: u64,
759    /// Skipped entries.
760    pub skipped: u64,
761}
762
763impl EntryCountSummary {
764    /// Creates an empty summary.
765    pub fn new() -> Self {
766        Self::default()
767    }
768
769    /// Returns the total number of errors.
770    pub fn error_count(&self) -> u64 {
771        self.validation_error + self.processing_error
772    }
773
774    /// Increments the count for an outcome.
775    pub fn increment(&mut self, outcome: BulkEntryOutcome) {
776        self.total += 1;
777        match outcome {
778            BulkEntryOutcome::Success => self.success += 1,
779            BulkEntryOutcome::ValidationError => self.validation_error += 1,
780            BulkEntryOutcome::ProcessingError => self.processing_error += 1,
781            BulkEntryOutcome::Skipped => self.skipped += 1,
782        }
783    }
784}
785
786/// Result of streaming NDJSON processing.
787#[derive(Debug, Clone, Serialize, Deserialize)]
788pub struct StreamProcessingResult {
789    /// Number of lines processed.
790    pub lines_processed: u64,
791    /// Entry count summary.
792    pub counts: EntryCountSummary,
793    /// Whether processing was aborted early.
794    pub aborted: bool,
795    /// Abort reason if applicable.
796    #[serde(skip_serializing_if = "Option::is_none")]
797    pub abort_reason: Option<String>,
798}
799
800impl StreamProcessingResult {
801    /// Creates a new stream processing result.
802    pub fn new() -> Self {
803        Self {
804            lines_processed: 0,
805            counts: EntryCountSummary::new(),
806            aborted: false,
807            abort_reason: None,
808        }
809    }
810
811    /// Marks the result as aborted.
812    pub fn aborted(mut self, reason: impl Into<String>) -> Self {
813        self.aborted = true;
814        self.abort_reason = Some(reason.into());
815        self
816    }
817}
818
819impl Default for StreamProcessingResult {
820    fn default() -> Self {
821        Self::new()
822    }
823}
824
825// ============================================================================
826// Traits
827// ============================================================================
828
829/// Provider for bulk submit operations.
830///
831/// This trait handles the complete lifecycle of bulk submissions including
832/// creating submissions, adding manifests, processing entries, and completing
833/// or aborting submissions.
834#[async_trait]
835pub trait BulkSubmitProvider: ResourceStorage {
836    /// Creates a new submission.
837    ///
838    /// # Arguments
839    ///
840    /// * `tenant` - The tenant context
841    /// * `id` - The submission identifier
842    /// * `metadata` - Optional metadata to attach to the submission
843    ///
844    /// # Returns
845    ///
846    /// The submission summary.
847    ///
848    /// # Errors
849    ///
850    /// * `BulkSubmitError::DuplicateSubmission` - If a submission with this ID exists
851    async fn create_submission(
852        &self,
853        tenant: &TenantContext,
854        id: &SubmissionId,
855        metadata: Option<Value>,
856    ) -> StorageResult<SubmissionSummary>;
857
858    /// Gets a submission by ID.
859    ///
860    /// # Arguments
861    ///
862    /// * `tenant` - The tenant context
863    /// * `id` - The submission identifier
864    ///
865    /// # Returns
866    ///
867    /// The submission summary if found.
868    async fn get_submission(
869        &self,
870        tenant: &TenantContext,
871        id: &SubmissionId,
872    ) -> StorageResult<Option<SubmissionSummary>>;
873
874    /// Lists submissions.
875    ///
876    /// # Arguments
877    ///
878    /// * `tenant` - The tenant context
879    /// * `submitter` - Optional filter by submitter
880    /// * `status` - Optional filter by status
881    /// * `limit` - Maximum number of results
882    /// * `offset` - Offset for pagination
883    ///
884    /// # Returns
885    ///
886    /// List of submission summaries.
887    async fn list_submissions(
888        &self,
889        tenant: &TenantContext,
890        submitter: Option<&str>,
891        status: Option<SubmissionStatus>,
892        limit: u32,
893        offset: u32,
894    ) -> StorageResult<Vec<SubmissionSummary>>;
895
896    /// Marks a submission as complete.
897    ///
898    /// # Arguments
899    ///
900    /// * `tenant` - The tenant context
901    /// * `id` - The submission identifier
902    ///
903    /// # Returns
904    ///
905    /// The updated submission summary.
906    ///
907    /// # Errors
908    ///
909    /// * `BulkSubmitError::SubmissionNotFound` - If the submission doesn't exist
910    /// * `BulkSubmitError::AlreadyComplete` - If already completed
911    async fn complete_submission(
912        &self,
913        tenant: &TenantContext,
914        id: &SubmissionId,
915    ) -> StorageResult<SubmissionSummary>;
916
917    /// Aborts a submission.
918    ///
919    /// This does NOT automatically roll back changes - use `BulkSubmitRollbackProvider`
920    /// for that functionality.
921    ///
922    /// # Arguments
923    ///
924    /// * `tenant` - The tenant context
925    /// * `id` - The submission identifier
926    /// * `reason` - Reason for aborting
927    ///
928    /// # Returns
929    ///
930    /// The number of pending manifests that were cancelled.
931    ///
932    /// # Errors
933    ///
934    /// * `BulkSubmitError::SubmissionNotFound` - If the submission doesn't exist
935    /// * `BulkSubmitError::AlreadyComplete` - If already completed
936    async fn abort_submission(
937        &self,
938        tenant: &TenantContext,
939        id: &SubmissionId,
940        reason: &str,
941    ) -> StorageResult<u64>;
942
943    /// Adds a manifest to a submission.
944    ///
945    /// # Arguments
946    ///
947    /// * `tenant` - The tenant context
948    /// * `submission_id` - The submission identifier
949    /// * `manifest_url` - Optional URL where the manifest data came from
950    /// * `replaces_manifest_url` - Optional URL of manifest this replaces
951    ///
952    /// # Returns
953    ///
954    /// The created manifest.
955    ///
956    /// # Errors
957    ///
958    /// * `BulkSubmitError::SubmissionNotFound` - If the submission doesn't exist
959    /// * `BulkSubmitError::InvalidState` - If the submission is not in progress
960    async fn add_manifest(
961        &self,
962        tenant: &TenantContext,
963        submission_id: &SubmissionId,
964        manifest_url: Option<&str>,
965        replaces_manifest_url: Option<&str>,
966    ) -> StorageResult<SubmissionManifest>;
967
968    /// Gets a manifest by ID.
969    ///
970    /// # Arguments
971    ///
972    /// * `tenant` - The tenant context
973    /// * `submission_id` - The submission identifier
974    /// * `manifest_id` - The manifest identifier
975    ///
976    /// # Returns
977    ///
978    /// The manifest if found.
979    async fn get_manifest(
980        &self,
981        tenant: &TenantContext,
982        submission_id: &SubmissionId,
983        manifest_id: &str,
984    ) -> StorageResult<Option<SubmissionManifest>>;
985
986    /// Lists manifests in a submission.
987    ///
988    /// # Arguments
989    ///
990    /// * `tenant` - The tenant context
991    /// * `submission_id` - The submission identifier
992    ///
993    /// # Returns
994    ///
995    /// List of manifests.
996    async fn list_manifests(
997        &self,
998        tenant: &TenantContext,
999        submission_id: &SubmissionId,
1000    ) -> StorageResult<Vec<SubmissionManifest>>;
1001
1002    /// Processes entries from a manifest.
1003    ///
1004    /// # Arguments
1005    ///
1006    /// * `tenant` - The tenant context
1007    /// * `submission_id` - The submission identifier
1008    /// * `manifest_id` - The manifest identifier
1009    /// * `entries` - The entries to process
1010    /// * `options` - Processing options
1011    ///
1012    /// # Returns
1013    ///
1014    /// Results for each entry.
1015    ///
1016    /// # Errors
1017    ///
1018    /// * `BulkSubmitError::SubmissionNotFound` - If the submission doesn't exist
1019    /// * `BulkSubmitError::ManifestNotFound` - If the manifest doesn't exist
1020    /// * `BulkSubmitError::MaxErrorsExceeded` - If max errors was reached
1021    async fn process_entries(
1022        &self,
1023        tenant: &TenantContext,
1024        submission_id: &SubmissionId,
1025        manifest_id: &str,
1026        entries: Vec<NdjsonEntry>,
1027        options: &BulkProcessingOptions,
1028    ) -> StorageResult<Vec<BulkEntryResult>>;
1029
1030    /// Gets entry results for a manifest.
1031    ///
1032    /// # Arguments
1033    ///
1034    /// * `tenant` - The tenant context
1035    /// * `submission_id` - The submission identifier
1036    /// * `manifest_id` - The manifest identifier
1037    /// * `outcome_filter` - Optional filter by outcome
1038    /// * `limit` - Maximum number of results
1039    /// * `offset` - Offset for pagination
1040    ///
1041    /// # Returns
1042    ///
1043    /// List of entry results.
1044    async fn get_entry_results(
1045        &self,
1046        tenant: &TenantContext,
1047        submission_id: &SubmissionId,
1048        manifest_id: &str,
1049        outcome_filter: Option<BulkEntryOutcome>,
1050        limit: u32,
1051        offset: u32,
1052    ) -> StorageResult<Vec<BulkEntryResult>>;
1053
1054    /// Gets entry counts for a manifest.
1055    ///
1056    /// # Arguments
1057    ///
1058    /// * `tenant` - The tenant context
1059    /// * `submission_id` - The submission identifier
1060    /// * `manifest_id` - The manifest identifier
1061    ///
1062    /// # Returns
1063    ///
1064    /// Entry count summary.
1065    async fn get_entry_counts(
1066        &self,
1067        tenant: &TenantContext,
1068        submission_id: &SubmissionId,
1069        manifest_id: &str,
1070    ) -> StorageResult<EntryCountSummary>;
1071}
1072
1073/// Provider for streaming NDJSON processing.
1074///
1075/// This trait extends `BulkSubmitProvider` with the ability to process
1076/// NDJSON data from an async reader stream.
1077#[async_trait]
1078pub trait StreamingBulkSubmitProvider: BulkSubmitProvider {
1079    /// Processes NDJSON data from a stream.
1080    ///
1081    /// # Arguments
1082    ///
1083    /// * `tenant` - The tenant context
1084    /// * `submission_id` - The submission identifier
1085    /// * `manifest_id` - The manifest identifier
1086    /// * `resource_type` - Expected resource type (for validation)
1087    /// * `reader` - Async reader providing NDJSON lines
1088    /// * `options` - Processing options
1089    ///
1090    /// # Returns
1091    ///
1092    /// Result of the streaming processing.
1093    async fn process_ndjson_stream(
1094        &self,
1095        tenant: &TenantContext,
1096        submission_id: &SubmissionId,
1097        manifest_id: &str,
1098        resource_type: &str,
1099        reader: Box<dyn AsyncBufRead + Send + Unpin>,
1100        options: &BulkProcessingOptions,
1101    ) -> StorageResult<StreamProcessingResult>;
1102}
1103
1104/// Provider for rollback of bulk submissions.
1105///
1106/// This trait extends `BulkSubmitProvider` with the ability to track and
1107/// rollback changes made during a submission.
1108#[async_trait]
1109pub trait BulkSubmitRollbackProvider: BulkSubmitProvider {
1110    /// Records a change for potential rollback.
1111    ///
1112    /// # Arguments
1113    ///
1114    /// * `tenant` - The tenant context
1115    /// * `submission_id` - The submission identifier
1116    /// * `change` - The change to record
1117    async fn record_change(
1118        &self,
1119        tenant: &TenantContext,
1120        submission_id: &SubmissionId,
1121        change: &SubmissionChange,
1122    ) -> StorageResult<()>;
1123
1124    /// Lists recorded changes for a submission.
1125    ///
1126    /// # Arguments
1127    ///
1128    /// * `tenant` - The tenant context
1129    /// * `submission_id` - The submission identifier
1130    /// * `limit` - Maximum number of results
1131    /// * `offset` - Offset for pagination
1132    ///
1133    /// # Returns
1134    ///
1135    /// List of recorded changes.
1136    async fn list_changes(
1137        &self,
1138        tenant: &TenantContext,
1139        submission_id: &SubmissionId,
1140        limit: u32,
1141        offset: u32,
1142    ) -> StorageResult<Vec<SubmissionChange>>;
1143
1144    /// Rolls back a single change.
1145    ///
1146    /// For creates: deletes the resource.
1147    /// For updates: restores the previous content.
1148    ///
1149    /// # Arguments
1150    ///
1151    /// * `tenant` - The tenant context
1152    /// * `submission_id` - The submission identifier
1153    /// * `change` - The change to rollback
1154    ///
1155    /// # Returns
1156    ///
1157    /// Whether the rollback was successful.
1158    async fn rollback_change(
1159        &self,
1160        tenant: &TenantContext,
1161        submission_id: &SubmissionId,
1162        change: &SubmissionChange,
1163    ) -> StorageResult<bool>;
1164}
1165
#[cfg(test)]
mod tests {
    use super::*;

    /// `SubmissionId::new` keeps both parts and renders them as `submitter/submission_id`.
    #[test]
    fn test_submission_id() {
        let sub = SubmissionId::new("my-system", "sub-123");

        assert_eq!(sub.submitter, "my-system");
        assert_eq!(sub.submission_id, "sub-123");
        assert_eq!(sub.to_string(), "my-system/sub-123");
    }

    /// `SubmissionId::generate` keeps the submitter and produces a non-empty id.
    #[test]
    fn test_submission_id_generate() {
        let generated = SubmissionId::generate("my-system");

        assert_eq!(generated.submitter, "my-system");
        assert!(!generated.submission_id.is_empty());
    }

    /// Only `Complete` and `Aborted` are terminal; `"in-progress"` parses to `InProgress`.
    #[test]
    fn test_submission_status() {
        assert!(!SubmissionStatus::InProgress.is_terminal());
        for terminal in [SubmissionStatus::Complete, SubmissionStatus::Aborted] {
            assert!(terminal.is_terminal());
        }

        let parsed = "in-progress".parse::<SubmissionStatus>().unwrap();
        assert_eq!(parsed, SubmissionStatus::InProgress);
    }

    /// `Pending`/`Processing` are non-terminal; `Completed`/`Failed` are terminal.
    #[test]
    fn test_manifest_status() {
        for active in [ManifestStatus::Pending, ManifestStatus::Processing] {
            assert!(!active.is_terminal());
        }
        for finished in [ManifestStatus::Completed, ManifestStatus::Failed] {
            assert!(finished.is_terminal());
        }
    }

    /// Success and validation-error results report opposite success/error flags.
    #[test]
    fn test_bulk_entry_result() {
        let ok = BulkEntryResult::success(1, "Patient", "pat-123", true);
        assert!(ok.is_success());
        assert!(!ok.is_error());
        assert!(ok.created);

        let outcome = serde_json::json!({"resourceType": "OperationOutcome"});
        let failed = BulkEntryResult::validation_error(2, "Patient", outcome);
        assert!(!failed.is_success());
        assert!(failed.is_error());
    }

    /// A well-formed NDJSON line yields the line number, resource type and id.
    #[test]
    fn test_ndjson_entry_parse() {
        let raw = r#"{"resourceType":"Patient","id":"123","name":[{"family":"Smith"}]}"#;
        let parsed = NdjsonEntry::parse(1, raw).unwrap();

        assert_eq!(parsed.line_number, 1);
        assert_eq!(parsed.resource_type, "Patient");
        assert_eq!(parsed.resource_id.as_deref(), Some("123"));
    }

    /// Parsing fails for non-JSON input and for JSON without a `resourceType`.
    #[test]
    fn test_ndjson_entry_parse_error() {
        let not_json = NdjsonEntry::parse(1, "not json");
        assert!(not_json.is_err());

        // Missing resourceType
        let no_resource_type = NdjsonEntry::parse(1, r#"{"id":"123"}"#);
        assert!(no_resource_type.is_err());
    }

    /// The builder-style setters store the values they are given.
    #[test]
    fn test_bulk_processing_options() {
        let opts = BulkProcessingOptions::new();
        let opts = opts.with_batch_size(50);
        let opts = opts.with_max_errors(10);
        let opts = opts.with_continue_on_error(false);

        assert_eq!(opts.batch_size, 50);
        assert_eq!(opts.max_errors, 10);
        assert!(!opts.continue_on_error);
    }

    /// The strict preset stops on error and allows a single error.
    #[test]
    fn test_bulk_processing_options_strict() {
        let strict = BulkProcessingOptions::strict();

        assert_eq!(strict.max_errors, 1);
        assert!(!strict.continue_on_error);
    }

    /// Create changes carry no previous content; update changes do.
    #[test]
    fn test_submission_change() {
        let created = SubmissionChange::create("manifest-1", "Patient", "pat-123", "1");
        assert_eq!(created.change_type, ChangeType::Create);
        assert!(created.previous_content.is_none());

        let previous = serde_json::json!({"resourceType": "Patient"});
        let updated =
            SubmissionChange::update("manifest-1", "Patient", "pat-123", "1", "2", previous);
        assert_eq!(updated.change_type, ChangeType::Update);
        assert!(updated.previous_content.is_some());
    }

    /// Incrementing per outcome updates the total, success, error and skipped counts.
    #[test]
    fn test_entry_count_summary() {
        let mut summary = EntryCountSummary::new();
        for outcome in [
            BulkEntryOutcome::Success,
            BulkEntryOutcome::Success,
            BulkEntryOutcome::ValidationError,
            BulkEntryOutcome::ProcessingError,
            BulkEntryOutcome::Skipped,
        ] {
            summary.increment(outcome);
        }

        assert_eq!(summary.total, 5);
        assert_eq!(summary.success, 2);
        assert_eq!(summary.error_count(), 2);
        assert_eq!(summary.skipped, 1);
    }

    /// Marking a stream result as aborted sets the flag and stores the reason.
    #[test]
    fn test_stream_processing_result() {
        let outcome = StreamProcessingResult::new().aborted("max errors exceeded");

        assert!(outcome.aborted);
        assert_eq!(outcome.abort_reason.as_deref(), Some("max errors exceeded"));
    }
}
1296}