Skip to main content

helios_persistence/core/
bulk_submit.rs

1//! Bulk submit types and traits.
2//!
3//! This module provides types and traits for implementing Bulk Data Submit
4//! as specified in the [Bulk Submit](https://hackmd.io/@argonaut/rJoqHZrPle) specification.
5//!
6//! # Overview
7//!
8//! Bulk Submit allows clients to submit large amounts of FHIR resources in NDJSON format.
9//! The submission process involves:
10//!
11//! 1. Creating a submission
12//! 2. Adding manifests (files to process)
13//! 3. Processing entries from each manifest
14//! 4. Completing or aborting the submission
15//!
16//! # Rollback Support
17//!
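//! Submissions track changes for potential rollback. Aborting a submission via
//! `BulkSubmitProvider::abort_submission` does not by itself undo those changes;
//! when a rollback is performed (see `BulkSubmitRollbackProvider`), created
//! resources are deleted and updated resources are reverted to their previous
//! state.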
21//!
22//! # Example
23//!
24//! ```ignore
25//! use helios_persistence::core::bulk_submit::{
26//!     BulkSubmitProvider, SubmissionId, NdjsonEntry, BulkProcessingOptions,
27//! };
28//!
29//! async fn submit_patients<S: BulkSubmitProvider>(storage: &S, tenant: &TenantContext) {
30//!     // Create a submission
31//!     let sub_id = SubmissionId::generate("my-system");
32//!     let summary = storage.create_submission(tenant, &sub_id, None).await.unwrap();
33//!
34//!     // Add a manifest
35//!     let manifest = storage.add_manifest(tenant, &sub_id, None, None).await.unwrap();
36//!
37//!     // Process entries
38//!     let entries = vec![
39//!         NdjsonEntry::new(1, "Patient", serde_json::json!({"resourceType": "Patient"})),
40//!     ];
41//!     let results = storage.process_entries(
42//!         tenant, &sub_id, &manifest.manifest_id, entries, &BulkProcessingOptions::new()
43//!     ).await.unwrap();
44//!
45//!     // Complete the submission
46//!     storage.complete_submission(tenant, &sub_id).await.unwrap();
47//! }
48//! ```
49
50use async_trait::async_trait;
51use chrono::{DateTime, Utc};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use tokio::io::AsyncBufRead;
55use uuid::Uuid;
56
57use crate::core::storage::ResourceStorage;
58use crate::error::StorageResult;
59use crate::tenant::TenantContext;
60
/// Unique identifier for a bulk submission.
///
/// A submission is identified by the combination of a submitter identifier
/// (typically the client system) and a submission ID. Both fields take part in
/// the derived equality and hashing, so two IDs are equal only when submitter
/// and submission identifier both match.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubmissionId {
    /// The submitter identifier (e.g., client system name).
    pub submitter: String,
    /// The submission identifier.
    pub submission_id: String,
}
72
73impl SubmissionId {
74    /// Creates a new submission ID.
75    pub fn new(submitter: impl Into<String>, submission_id: impl Into<String>) -> Self {
76        Self {
77            submitter: submitter.into(),
78            submission_id: submission_id.into(),
79        }
80    }
81
82    /// Generates a new submission ID with a random UUID.
83    pub fn generate(submitter: impl Into<String>) -> Self {
84        Self {
85            submitter: submitter.into(),
86            submission_id: Uuid::new_v4().to_string(),
87        }
88    }
89}
90
91impl std::fmt::Display for SubmissionId {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        write!(f, "{}/{}", self.submitter, self.submission_id)
94    }
95}
96
/// Status of a bulk submission.
///
/// With serde the variants are renamed to kebab-case, so `InProgress` is
/// serialized as `"in-progress"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum SubmissionStatus {
    /// Submission is in progress.
    InProgress,
    /// Submission has completed successfully.
    Complete,
    /// Submission was aborted.
    Aborted,
}
108
109impl SubmissionStatus {
110    /// Returns true if the submission is in a terminal state.
111    pub fn is_terminal(&self) -> bool {
112        matches!(self, Self::Complete | Self::Aborted)
113    }
114}
115
116impl std::fmt::Display for SubmissionStatus {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        match self {
119            Self::InProgress => write!(f, "in-progress"),
120            Self::Complete => write!(f, "complete"),
121            Self::Aborted => write!(f, "aborted"),
122        }
123    }
124}
125
126impl std::str::FromStr for SubmissionStatus {
127    type Err = String;
128
129    fn from_str(s: &str) -> Result<Self, Self::Err> {
130        match s {
131            "in-progress" | "in_progress" => Ok(Self::InProgress),
132            "complete" => Ok(Self::Complete),
133            "aborted" => Ok(Self::Aborted),
134            _ => Err(format!("unknown submission status: {}", s)),
135        }
136    }
137}
138
/// Status of a manifest within a submission.
///
/// With serde the variants are renamed to kebab-case (all current variants are
/// single words, so they serialize as their lowercase names).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ManifestStatus {
    /// Manifest has been added but not yet processed.
    Pending,
    /// Manifest is currently being processed.
    Processing,
    /// Manifest has been fully processed.
    Completed,
    /// Manifest processing failed.
    Failed,
}
152
153impl ManifestStatus {
154    /// Returns true if the manifest is in a terminal state.
155    pub fn is_terminal(&self) -> bool {
156        matches!(self, Self::Completed | Self::Failed)
157    }
158}
159
160impl std::fmt::Display for ManifestStatus {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        match self {
163            Self::Pending => write!(f, "pending"),
164            Self::Processing => write!(f, "processing"),
165            Self::Completed => write!(f, "completed"),
166            Self::Failed => write!(f, "failed"),
167        }
168    }
169}
170
171impl std::str::FromStr for ManifestStatus {
172    type Err = String;
173
174    fn from_str(s: &str) -> Result<Self, Self::Err> {
175        match s {
176            "pending" => Ok(Self::Pending),
177            "processing" => Ok(Self::Processing),
178            "completed" => Ok(Self::Completed),
179            "failed" => Ok(Self::Failed),
180            _ => Err(format!("unknown manifest status: {}", s)),
181        }
182    }
183}
184
/// A manifest within a submission.
///
/// Carries the manifest's identity, its optional source URLs, its processing
/// status, and running entry counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmissionManifest {
    /// Unique manifest ID within the submission.
    pub manifest_id: String,
    /// Optional URL where the manifest data was fetched from.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub manifest_url: Option<String>,
    /// URL of the manifest this replaces, if any.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub replaces_manifest_url: Option<String>,
    /// Current processing status.
    pub status: ManifestStatus,
    /// When the manifest was added.
    pub added_at: DateTime<Utc>,
    /// Total number of entries in the manifest.
    pub total_entries: u64,
    /// Number of entries processed so far.
    pub processed_entries: u64,
    /// Number of entries that failed processing.
    pub failed_entries: u64,
}
207
208impl SubmissionManifest {
209    /// Creates a new manifest.
210    pub fn new(manifest_id: impl Into<String>) -> Self {
211        Self {
212            manifest_id: manifest_id.into(),
213            manifest_url: None,
214            replaces_manifest_url: None,
215            status: ManifestStatus::Pending,
216            added_at: Utc::now(),
217            total_entries: 0,
218            processed_entries: 0,
219            failed_entries: 0,
220        }
221    }
222
223    /// Sets the manifest URL.
224    pub fn with_url(mut self, url: impl Into<String>) -> Self {
225        self.manifest_url = Some(url.into());
226        self
227    }
228
229    /// Sets the replaces URL.
230    pub fn with_replaces(mut self, url: impl Into<String>) -> Self {
231        self.replaces_manifest_url = Some(url.into());
232        self
233    }
234}
235
/// Outcome of processing a single entry.
///
/// With serde the variants are renamed to kebab-case, e.g. `ValidationError`
/// is serialized as `"validation-error"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum BulkEntryOutcome {
    /// Entry was processed successfully.
    Success,
    /// Entry failed validation.
    ValidationError,
    /// Entry encountered a processing error.
    ProcessingError,
    /// Entry was skipped (e.g., duplicate).
    Skipped,
}
249
250impl std::fmt::Display for BulkEntryOutcome {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        match self {
253            Self::Success => write!(f, "success"),
254            Self::ValidationError => write!(f, "validation-error"),
255            Self::ProcessingError => write!(f, "processing-error"),
256            Self::Skipped => write!(f, "skipped"),
257        }
258    }
259}
260
261impl std::str::FromStr for BulkEntryOutcome {
262    type Err = String;
263
264    fn from_str(s: &str) -> Result<Self, Self::Err> {
265        match s {
266            "success" => Ok(Self::Success),
267            "validation-error" | "validation_error" => Ok(Self::ValidationError),
268            "processing-error" | "processing_error" => Ok(Self::ProcessingError),
269            "skipped" => Ok(Self::Skipped),
270            _ => Err(format!("unknown entry outcome: {}", s)),
271        }
272    }
273}
274
/// Result of processing a single NDJSON entry.
///
/// Pairs the entry's position and resource type with the processing outcome
/// and, when available, the resource ID and an OperationOutcome document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkEntryResult {
    /// Line number in the NDJSON file (1-indexed).
    pub line_number: u64,
    /// Resource type of the entry.
    pub resource_type: String,
    /// Resource ID if successfully processed.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resource_id: Option<String>,
    /// Whether a new resource was created (vs updated).
    pub created: bool,
    /// Processing outcome.
    pub outcome: BulkEntryOutcome,
    /// OperationOutcome if there was an error.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_outcome: Option<Value>,
}
293
294impl BulkEntryResult {
295    /// Creates a success result.
296    pub fn success(
297        line_number: u64,
298        resource_type: impl Into<String>,
299        resource_id: impl Into<String>,
300        created: bool,
301    ) -> Self {
302        Self {
303            line_number,
304            resource_type: resource_type.into(),
305            resource_id: Some(resource_id.into()),
306            created,
307            outcome: BulkEntryOutcome::Success,
308            operation_outcome: None,
309        }
310    }
311
312    /// Creates a validation error result.
313    pub fn validation_error(
314        line_number: u64,
315        resource_type: impl Into<String>,
316        outcome: Value,
317    ) -> Self {
318        Self {
319            line_number,
320            resource_type: resource_type.into(),
321            resource_id: None,
322            created: false,
323            outcome: BulkEntryOutcome::ValidationError,
324            operation_outcome: Some(outcome),
325        }
326    }
327
328    /// Creates a processing error result.
329    pub fn processing_error(
330        line_number: u64,
331        resource_type: impl Into<String>,
332        outcome: Value,
333    ) -> Self {
334        Self {
335            line_number,
336            resource_type: resource_type.into(),
337            resource_id: None,
338            created: false,
339            outcome: BulkEntryOutcome::ProcessingError,
340            operation_outcome: Some(outcome),
341        }
342    }
343
344    /// Creates a skipped result.
345    pub fn skipped(line_number: u64, resource_type: impl Into<String>, reason: &str) -> Self {
346        Self {
347            line_number,
348            resource_type: resource_type.into(),
349            resource_id: None,
350            created: false,
351            outcome: BulkEntryOutcome::Skipped,
352            operation_outcome: Some(serde_json::json!({
353                "resourceType": "OperationOutcome",
354                "issue": [{
355                    "severity": "information",
356                    "code": "informational",
357                    "diagnostics": reason
358                }]
359            })),
360        }
361    }
362
363    /// Returns true if this was a successful outcome.
364    pub fn is_success(&self) -> bool {
365        self.outcome == BulkEntryOutcome::Success
366    }
367
368    /// Returns true if this was an error outcome.
369    pub fn is_error(&self) -> bool {
370        matches!(
371            self.outcome,
372            BulkEntryOutcome::ValidationError | BulkEntryOutcome::ProcessingError
373        )
374    }
375}
376
/// Summary of a submission's status.
///
/// Combines the submission's identity and lifecycle timestamps with aggregate
/// manifest and entry counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmissionSummary {
    /// The submission ID.
    pub id: SubmissionId,
    /// Current status.
    pub status: SubmissionStatus,
    /// When the submission was created.
    pub created_at: DateTime<Utc>,
    /// When the submission was last updated.
    pub updated_at: DateTime<Utc>,
    /// When the submission completed (if terminal).
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<DateTime<Utc>>,
    /// Number of manifests in the submission.
    pub manifest_count: u32,
    /// Total entries across all manifests.
    pub total_entries: u64,
    /// Successfully processed entries.
    pub success_count: u64,
    /// Failed entries.
    pub error_count: u64,
    /// Skipped entries.
    pub skipped_count: u64,
    /// Optional metadata.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
405
406impl SubmissionSummary {
407    /// Creates a new submission summary.
408    pub fn new(id: SubmissionId) -> Self {
409        let now = Utc::now();
410        Self {
411            id,
412            status: SubmissionStatus::InProgress,
413            created_at: now,
414            updated_at: now,
415            completed_at: None,
416            manifest_count: 0,
417            total_entries: 0,
418            success_count: 0,
419            error_count: 0,
420            skipped_count: 0,
421            metadata: None,
422        }
423    }
424
425    /// Sets the metadata.
426    pub fn with_metadata(mut self, metadata: Value) -> Self {
427        self.metadata = Some(metadata);
428        self
429    }
430}
431
/// A parsed NDJSON entry ready for processing.
///
/// Keeps the full resource JSON alongside the resource type and optional ID
/// so they can be read without inspecting the JSON again.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NdjsonEntry {
    /// Line number in the source file (1-indexed).
    pub line_number: u64,
    /// The resource type.
    pub resource_type: String,
    /// The resource ID (if present in the resource).
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resource_id: Option<String>,
    /// The resource content.
    pub resource: Value,
}
445
446impl NdjsonEntry {
447    /// Creates a new NDJSON entry.
448    pub fn new(line_number: u64, resource_type: impl Into<String>, resource: Value) -> Self {
449        let resource_type = resource_type.into();
450        let resource_id = resource
451            .get("id")
452            .and_then(|v| v.as_str())
453            .map(String::from);
454        Self {
455            line_number,
456            resource_type,
457            resource_id,
458            resource,
459        }
460    }
461
462    /// Parses an NDJSON line into an entry.
463    pub fn parse(line_number: u64, line: &str) -> Result<Self, String> {
464        let resource: Value =
465            serde_json::from_str(line).map_err(|e| format!("invalid JSON: {}", e))?;
466
467        let resource_type = resource
468            .get("resourceType")
469            .and_then(|v| v.as_str())
470            .ok_or_else(|| "missing resourceType".to_string())?
471            .to_string();
472
473        Ok(Self::new(line_number, resource_type, resource))
474    }
475}
476
/// Options for bulk processing.
///
/// Every field has a serde default, so any of them may be left out of a
/// serialized options document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkProcessingOptions {
    /// Number of entries to process in a single batch/transaction.
    /// Defaults to `default_submit_batch_size()` when absent during deserialization.
    #[serde(default = "default_submit_batch_size")]
    pub batch_size: u32,
    /// Whether to continue processing after encountering errors.
    /// Defaults to `default_continue_on_error()` when absent during deserialization.
    #[serde(default = "default_continue_on_error")]
    pub continue_on_error: bool,
    /// Maximum number of errors before aborting (0 = unlimited).
    /// Defaults to 0 when absent during deserialization.
    #[serde(default)]
    pub max_errors: u32,
    /// Whether to allow updates to existing resources.
    /// Defaults to `default_allow_updates()` when absent during deserialization.
    #[serde(default = "default_allow_updates")]
    pub allow_updates: bool,
}
493
/// Default number of entries per batch (100).
fn default_submit_batch_size() -> u32 {
    const DEFAULT_BATCH_SIZE: u32 = 100;
    DEFAULT_BATCH_SIZE
}
497
/// Default for `continue_on_error`: keep processing after errors.
fn default_continue_on_error() -> bool {
    const CONTINUE_BY_DEFAULT: bool = true;
    CONTINUE_BY_DEFAULT
}
501
/// Default for `allow_updates`: updates to existing resources are allowed.
fn default_allow_updates() -> bool {
    const ALLOW_BY_DEFAULT: bool = true;
    ALLOW_BY_DEFAULT
}
505
506impl Default for BulkProcessingOptions {
507    fn default() -> Self {
508        Self::new()
509    }
510}
511
512impl BulkProcessingOptions {
513    /// Creates default processing options.
514    pub fn new() -> Self {
515        Self {
516            batch_size: default_submit_batch_size(),
517            continue_on_error: default_continue_on_error(),
518            max_errors: 0,
519            allow_updates: default_allow_updates(),
520        }
521    }
522
523    /// Sets the batch size.
524    pub fn with_batch_size(mut self, batch_size: u32) -> Self {
525        self.batch_size = batch_size;
526        self
527    }
528
529    /// Sets whether to continue on error.
530    pub fn with_continue_on_error(mut self, continue_on_error: bool) -> Self {
531        self.continue_on_error = continue_on_error;
532        self
533    }
534
535    /// Sets the maximum number of errors.
536    pub fn with_max_errors(mut self, max_errors: u32) -> Self {
537        self.max_errors = max_errors;
538        self
539    }
540
541    /// Sets whether updates are allowed.
542    pub fn with_allow_updates(mut self, allow_updates: bool) -> Self {
543        self.allow_updates = allow_updates;
544        self
545    }
546
547    /// Creates options for strict processing (no errors allowed).
548    pub fn strict() -> Self {
549        Self {
550            batch_size: default_submit_batch_size(),
551            continue_on_error: false,
552            max_errors: 1,
553            allow_updates: true,
554        }
555    }
556
557    /// Creates options for create-only processing.
558    pub fn create_only() -> Self {
559        Self {
560            batch_size: default_submit_batch_size(),
561            continue_on_error: true,
562            max_errors: 0,
563            allow_updates: false,
564        }
565    }
566}
567
/// Type of change recorded for rollback.
///
/// With serde the variants are serialized in lowercase (`"create"`, `"update"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ChangeType {
    /// A new resource was created.
    Create,
    /// An existing resource was updated.
    Update,
}
577
578impl std::fmt::Display for ChangeType {
579    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
580        match self {
581            Self::Create => write!(f, "create"),
582            Self::Update => write!(f, "update"),
583        }
584    }
585}
586
587impl std::str::FromStr for ChangeType {
588    type Err = String;
589
590    fn from_str(s: &str) -> Result<Self, Self::Err> {
591        match s {
592            "create" => Ok(Self::Create),
593            "update" => Ok(Self::Update),
594            _ => Err(format!("unknown change type: {}", s)),
595        }
596    }
597}
598
/// A change record for potential rollback.
///
/// Stores which resource was touched, the version before and after the change,
/// and (for updates) the previous resource content.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmissionChange {
    /// Unique change ID.
    pub change_id: String,
    /// Manifest this change is associated with.
    pub manifest_id: String,
    /// Type of change.
    pub change_type: ChangeType,
    /// Resource type.
    pub resource_type: String,
    /// Resource ID.
    pub resource_id: String,
    /// Version before the change (for updates).
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub previous_version: Option<String>,
    /// Version after the change.
    pub new_version: String,
    /// Previous resource content (for updates).
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub previous_content: Option<Value>,
    /// When the change was made.
    pub changed_at: DateTime<Utc>,
}
623
624impl SubmissionChange {
625    /// Creates a change record for a create operation.
626    pub fn create(
627        manifest_id: impl Into<String>,
628        resource_type: impl Into<String>,
629        resource_id: impl Into<String>,
630        new_version: impl Into<String>,
631    ) -> Self {
632        Self {
633            change_id: Uuid::new_v4().to_string(),
634            manifest_id: manifest_id.into(),
635            change_type: ChangeType::Create,
636            resource_type: resource_type.into(),
637            resource_id: resource_id.into(),
638            previous_version: None,
639            new_version: new_version.into(),
640            previous_content: None,
641            changed_at: Utc::now(),
642        }
643    }
644
645    /// Creates a change record for an update operation.
646    pub fn update(
647        manifest_id: impl Into<String>,
648        resource_type: impl Into<String>,
649        resource_id: impl Into<String>,
650        previous_version: impl Into<String>,
651        new_version: impl Into<String>,
652        previous_content: Value,
653    ) -> Self {
654        Self {
655            change_id: Uuid::new_v4().to_string(),
656            manifest_id: manifest_id.into(),
657            change_type: ChangeType::Update,
658            resource_type: resource_type.into(),
659            resource_id: resource_id.into(),
660            previous_version: Some(previous_version.into()),
661            new_version: new_version.into(),
662            previous_content: Some(previous_content),
663            changed_at: Utc::now(),
664        }
665    }
666}
667
/// Summary of entry counts by outcome.
///
/// The derived `Default` yields a summary with every counter at zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EntryCountSummary {
    /// Total entries.
    pub total: u64,
    /// Successful entries.
    pub success: u64,
    /// Validation errors.
    pub validation_error: u64,
    /// Processing errors.
    pub processing_error: u64,
    /// Skipped entries.
    pub skipped: u64,
}
682
683impl EntryCountSummary {
684    /// Creates an empty summary.
685    pub fn new() -> Self {
686        Self::default()
687    }
688
689    /// Returns the total number of errors.
690    pub fn error_count(&self) -> u64 {
691        self.validation_error + self.processing_error
692    }
693
694    /// Increments the count for an outcome.
695    pub fn increment(&mut self, outcome: BulkEntryOutcome) {
696        self.total += 1;
697        match outcome {
698            BulkEntryOutcome::Success => self.success += 1,
699            BulkEntryOutcome::ValidationError => self.validation_error += 1,
700            BulkEntryOutcome::ProcessingError => self.processing_error += 1,
701            BulkEntryOutcome::Skipped => self.skipped += 1,
702        }
703    }
704}
705
/// Result of streaming NDJSON processing.
///
/// Reports how many lines were handled, the per-outcome entry counts, and
/// whether (and why) processing stopped early.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamProcessingResult {
    /// Number of lines processed.
    pub lines_processed: u64,
    /// Entry count summary.
    pub counts: EntryCountSummary,
    /// Whether processing was aborted early.
    pub aborted: bool,
    /// Abort reason if applicable.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub abort_reason: Option<String>,
}
719
720impl StreamProcessingResult {
721    /// Creates a new stream processing result.
722    pub fn new() -> Self {
723        Self {
724            lines_processed: 0,
725            counts: EntryCountSummary::new(),
726            aborted: false,
727            abort_reason: None,
728        }
729    }
730
731    /// Marks the result as aborted.
732    pub fn aborted(mut self, reason: impl Into<String>) -> Self {
733        self.aborted = true;
734        self.abort_reason = Some(reason.into());
735        self
736    }
737}
738
739impl Default for StreamProcessingResult {
740    fn default() -> Self {
741        Self::new()
742    }
743}
744
745// ============================================================================
746// Traits
747// ============================================================================
748
/// Provider for bulk submit operations.
///
/// This trait handles the complete lifecycle of bulk submissions including
/// creating submissions, adding manifests, processing entries, and completing
/// or aborting submissions.
///
/// Implementors must also implement `ResourceStorage`. Every method takes the
/// `TenantContext` it operates under and reports failures through
/// `StorageResult`.
///
/// NOTE(review): the `BulkSubmitError` variants named in the `# Errors`
/// sections below are not defined in this module — confirm how they are
/// surfaced through `StorageResult`.
#[async_trait]
pub trait BulkSubmitProvider: ResourceStorage {
    /// Creates a new submission.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `id` - The submission identifier
    /// * `metadata` - Optional metadata to attach to the submission
    ///
    /// # Returns
    ///
    /// The submission summary.
    ///
    /// # Errors
    ///
    /// * `BulkSubmitError::DuplicateSubmission` - If a submission with this ID exists
    async fn create_submission(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
        metadata: Option<Value>,
    ) -> StorageResult<SubmissionSummary>;

    /// Gets a submission by ID.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `id` - The submission identifier
    ///
    /// # Returns
    ///
    /// The submission summary if found, or `None` otherwise.
    async fn get_submission(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
    ) -> StorageResult<Option<SubmissionSummary>>;

    /// Lists submissions.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `submitter` - Optional filter by submitter
    /// * `status` - Optional filter by status
    /// * `limit` - Maximum number of results
    /// * `offset` - Offset for pagination
    ///
    /// # Returns
    ///
    /// List of submission summaries.
    async fn list_submissions(
        &self,
        tenant: &TenantContext,
        submitter: Option<&str>,
        status: Option<SubmissionStatus>,
        limit: u32,
        offset: u32,
    ) -> StorageResult<Vec<SubmissionSummary>>;

    /// Marks a submission as complete.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `id` - The submission identifier
    ///
    /// # Returns
    ///
    /// The updated submission summary.
    ///
    /// # Errors
    ///
    /// * `BulkSubmitError::SubmissionNotFound` - If the submission doesn't exist
    /// * `BulkSubmitError::AlreadyComplete` - If already completed
    async fn complete_submission(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
    ) -> StorageResult<SubmissionSummary>;

    /// Aborts a submission.
    ///
    /// This does NOT automatically roll back changes - use `BulkSubmitRollbackProvider`
    /// for that functionality.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `id` - The submission identifier
    /// * `reason` - Reason for aborting
    ///
    /// # Returns
    ///
    /// The number of pending manifests that were cancelled.
    ///
    /// # Errors
    ///
    /// * `BulkSubmitError::SubmissionNotFound` - If the submission doesn't exist
    /// * `BulkSubmitError::AlreadyComplete` - If already completed
    async fn abort_submission(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
        reason: &str,
    ) -> StorageResult<u64>;

    /// Adds a manifest to a submission.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `submission_id` - The submission identifier
    /// * `manifest_url` - Optional URL where the manifest data came from
    /// * `replaces_manifest_url` - Optional URL of manifest this replaces
    ///
    /// # Returns
    ///
    /// The created manifest.
    ///
    /// # Errors
    ///
    /// * `BulkSubmitError::SubmissionNotFound` - If the submission doesn't exist
    /// * `BulkSubmitError::InvalidState` - If the submission is not in progress
    async fn add_manifest(
        &self,
        tenant: &TenantContext,
        submission_id: &SubmissionId,
        manifest_url: Option<&str>,
        replaces_manifest_url: Option<&str>,
    ) -> StorageResult<SubmissionManifest>;

    /// Gets a manifest by ID.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `submission_id` - The submission identifier
    /// * `manifest_id` - The manifest identifier
    ///
    /// # Returns
    ///
    /// The manifest if found, or `None` otherwise.
    async fn get_manifest(
        &self,
        tenant: &TenantContext,
        submission_id: &SubmissionId,
        manifest_id: &str,
    ) -> StorageResult<Option<SubmissionManifest>>;

    /// Lists manifests in a submission.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `submission_id` - The submission identifier
    ///
    /// # Returns
    ///
    /// List of manifests.
    async fn list_manifests(
        &self,
        tenant: &TenantContext,
        submission_id: &SubmissionId,
    ) -> StorageResult<Vec<SubmissionManifest>>;

    /// Processes entries from a manifest.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `submission_id` - The submission identifier
    /// * `manifest_id` - The manifest identifier
    /// * `entries` - The entries to process (taken by value)
    /// * `options` - Processing options
    ///
    /// # Returns
    ///
    /// Results for each entry.
    ///
    /// # Errors
    ///
    /// * `BulkSubmitError::SubmissionNotFound` - If the submission doesn't exist
    /// * `BulkSubmitError::ManifestNotFound` - If the manifest doesn't exist
    /// * `BulkSubmitError::MaxErrorsExceeded` - If max errors was reached
    async fn process_entries(
        &self,
        tenant: &TenantContext,
        submission_id: &SubmissionId,
        manifest_id: &str,
        entries: Vec<NdjsonEntry>,
        options: &BulkProcessingOptions,
    ) -> StorageResult<Vec<BulkEntryResult>>;

    /// Gets entry results for a manifest.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `submission_id` - The submission identifier
    /// * `manifest_id` - The manifest identifier
    /// * `outcome_filter` - Optional filter by outcome
    /// * `limit` - Maximum number of results
    /// * `offset` - Offset for pagination
    ///
    /// # Returns
    ///
    /// List of entry results.
    async fn get_entry_results(
        &self,
        tenant: &TenantContext,
        submission_id: &SubmissionId,
        manifest_id: &str,
        outcome_filter: Option<BulkEntryOutcome>,
        limit: u32,
        offset: u32,
    ) -> StorageResult<Vec<BulkEntryResult>>;

    /// Gets entry counts for a manifest.
    ///
    /// # Arguments
    ///
    /// * `tenant` - The tenant context
    /// * `submission_id` - The submission identifier
    /// * `manifest_id` - The manifest identifier
    ///
    /// # Returns
    ///
    /// Entry count summary.
    async fn get_entry_counts(
        &self,
        tenant: &TenantContext,
        submission_id: &SubmissionId,
        manifest_id: &str,
    ) -> StorageResult<EntryCountSummary>;
}
992
993/// Provider for streaming NDJSON processing.
994///
995/// This trait extends `BulkSubmitProvider` with the ability to process
996/// NDJSON data from an async reader stream.
997#[async_trait]
998pub trait StreamingBulkSubmitProvider: BulkSubmitProvider {
999    /// Processes NDJSON data from a stream.
1000    ///
1001    /// # Arguments
1002    ///
1003    /// * `tenant` - The tenant context
1004    /// * `submission_id` - The submission identifier
1005    /// * `manifest_id` - The manifest identifier
1006    /// * `resource_type` - Expected resource type (for validation)
1007    /// * `reader` - Async reader providing NDJSON lines
1008    /// * `options` - Processing options
1009    ///
1010    /// # Returns
1011    ///
1012    /// Result of the streaming processing.
1013    async fn process_ndjson_stream(
1014        &self,
1015        tenant: &TenantContext,
1016        submission_id: &SubmissionId,
1017        manifest_id: &str,
1018        resource_type: &str,
1019        reader: Box<dyn AsyncBufRead + Send + Unpin>,
1020        options: &BulkProcessingOptions,
1021    ) -> StorageResult<StreamProcessingResult>;
1022}
1023
1024/// Provider for rollback of bulk submissions.
1025///
1026/// This trait extends `BulkSubmitProvider` with the ability to track and
1027/// rollback changes made during a submission.
1028#[async_trait]
1029pub trait BulkSubmitRollbackProvider: BulkSubmitProvider {
1030    /// Records a change for potential rollback.
1031    ///
1032    /// # Arguments
1033    ///
1034    /// * `tenant` - The tenant context
1035    /// * `submission_id` - The submission identifier
1036    /// * `change` - The change to record
1037    async fn record_change(
1038        &self,
1039        tenant: &TenantContext,
1040        submission_id: &SubmissionId,
1041        change: &SubmissionChange,
1042    ) -> StorageResult<()>;
1043
1044    /// Lists recorded changes for a submission.
1045    ///
1046    /// # Arguments
1047    ///
1048    /// * `tenant` - The tenant context
1049    /// * `submission_id` - The submission identifier
1050    /// * `limit` - Maximum number of results
1051    /// * `offset` - Offset for pagination
1052    ///
1053    /// # Returns
1054    ///
1055    /// List of recorded changes.
1056    async fn list_changes(
1057        &self,
1058        tenant: &TenantContext,
1059        submission_id: &SubmissionId,
1060        limit: u32,
1061        offset: u32,
1062    ) -> StorageResult<Vec<SubmissionChange>>;
1063
1064    /// Rolls back a single change.
1065    ///
1066    /// For creates: deletes the resource.
1067    /// For updates: restores the previous content.
1068    ///
1069    /// # Arguments
1070    ///
1071    /// * `tenant` - The tenant context
1072    /// * `submission_id` - The submission identifier
1073    /// * `change` - The change to rollback
1074    ///
1075    /// # Returns
1076    ///
1077    /// Whether the rollback was successful.
1078    async fn rollback_change(
1079        &self,
1080        tenant: &TenantContext,
1081        submission_id: &SubmissionId,
1082        change: &SubmissionChange,
1083    ) -> StorageResult<bool>;
1084}
1085
#[cfg(test)]
mod tests {
    use super::*;

    // `SubmissionId::new` keeps both parts and displays them joined by `/`.
    #[test]
    fn test_submission_id() {
        let sub = SubmissionId::new("my-system", "sub-123");

        assert_eq!(sub.submitter, "my-system");
        assert_eq!(sub.submission_id, "sub-123");
        assert_eq!(sub.to_string(), "my-system/sub-123");
    }

    // `SubmissionId::generate` keeps the submitter and fills in a non-empty id.
    #[test]
    fn test_submission_id_generate() {
        let generated = SubmissionId::generate("my-system");

        assert_eq!(generated.submitter, "my-system");
        assert!(!generated.submission_id.is_empty());
    }

    // Terminal-state classification and parsing of submission statuses.
    #[test]
    fn test_submission_status() {
        assert!(!SubmissionStatus::InProgress.is_terminal());
        for terminal in [SubmissionStatus::Complete, SubmissionStatus::Aborted] {
            assert!(terminal.is_terminal());
        }

        let parsed = "in-progress".parse::<SubmissionStatus>().unwrap();
        assert_eq!(parsed, SubmissionStatus::InProgress);
    }

    // Terminal-state classification of manifest statuses.
    #[test]
    fn test_manifest_status() {
        for ongoing in [ManifestStatus::Pending, ManifestStatus::Processing] {
            assert!(!ongoing.is_terminal());
        }
        for finished in [ManifestStatus::Completed, ManifestStatus::Failed] {
            assert!(finished.is_terminal());
        }
    }

    // Success and validation-error results report their outcome correctly.
    #[test]
    fn test_bulk_entry_result() {
        let ok = BulkEntryResult::success(1, "Patient", "pat-123", true);
        assert!(ok.is_success());
        assert!(!ok.is_error());
        assert!(ok.created);

        let outcome_body = serde_json::json!({"resourceType": "OperationOutcome"});
        let failed = BulkEntryResult::validation_error(2, "Patient", outcome_body);
        assert!(!failed.is_success());
        assert!(failed.is_error());
    }

    // A well-formed NDJSON line yields its line number, resource type and id.
    #[test]
    fn test_ndjson_entry_parse() {
        let raw = r#"{"resourceType":"Patient","id":"123","name":[{"family":"Smith"}]}"#;
        let parsed = NdjsonEntry::parse(1, raw).unwrap();

        assert_eq!(parsed.line_number, 1);
        assert_eq!(parsed.resource_type, "Patient");
        assert_eq!(parsed.resource_id.as_deref(), Some("123"));
    }

    // Malformed input is rejected.
    #[test]
    fn test_ndjson_entry_parse_error() {
        let rejected = [
            "not json",
            r#"{"id":"123"}"#, // Missing resourceType
        ];

        for raw in rejected {
            assert!(NdjsonEntry::parse(1, raw).is_err());
        }
    }

    // Builder-style setters store the values they are given.
    #[test]
    fn test_bulk_processing_options() {
        let base = BulkProcessingOptions::new();
        let tuned = base
            .with_batch_size(50)
            .with_max_errors(10)
            .with_continue_on_error(false);

        assert_eq!(tuned.batch_size, 50);
        assert_eq!(tuned.max_errors, 10);
        assert!(!tuned.continue_on_error);
    }

    // The strict preset stops at the first error.
    #[test]
    fn test_bulk_processing_options_strict() {
        let strict = BulkProcessingOptions::strict();

        assert_eq!(strict.max_errors, 1);
        assert!(!strict.continue_on_error);
    }

    // Create changes carry no previous content; update changes do.
    #[test]
    fn test_submission_change() {
        let created = SubmissionChange::create("manifest-1", "Patient", "pat-123", "1");
        assert_eq!(created.change_type, ChangeType::Create);
        assert!(created.previous_content.is_none());

        let prior = serde_json::json!({"resourceType": "Patient"});
        let updated = SubmissionChange::update("manifest-1", "Patient", "pat-123", "1", "2", prior);
        assert_eq!(updated.change_type, ChangeType::Update);
        assert!(updated.previous_content.is_some());
    }

    // Incrementing by outcome updates the total and the per-outcome counters.
    #[test]
    fn test_entry_count_summary() {
        let mut tally = EntryCountSummary::new();
        for outcome in [
            BulkEntryOutcome::Success,
            BulkEntryOutcome::Success,
            BulkEntryOutcome::ValidationError,
            BulkEntryOutcome::ProcessingError,
            BulkEntryOutcome::Skipped,
        ] {
            tally.increment(outcome);
        }

        assert_eq!(tally.total, 5);
        assert_eq!(tally.success, 2);
        assert_eq!(tally.error_count(), 2);
        assert_eq!(tally.skipped, 1);
    }

    // Marking a stream result as aborted sets the flag and stores the reason.
    #[test]
    fn test_stream_processing_result() {
        let outcome = StreamProcessingResult::new().aborted("max errors exceeded");

        assert!(outcome.aborted);
        assert_eq!(outcome.abort_reason.as_deref(), Some("max errors exceeded"));
    }
}