1use async_trait::async_trait;
51use chrono::{DateTime, Utc};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use tokio::io::AsyncBufRead;
55use uuid::Uuid;
56
57use crate::core::storage::ResourceStorage;
58use crate::error::StorageResult;
59use crate::tenant::TenantContext;
60
61#[cfg(feature = "audit")]
63pub mod audit {
64 use helios_audit::{AuditAction, AuditEventBuilder, AuditSink};
65
66 use super::SubmissionId;
67
68 pub async fn record_submit_event(
72 sink: &dyn AuditSink,
73 source_observer: &str,
74 agent: Option<&str>,
75 submission_id: &SubmissionId,
76 phase: &str,
77 outcome: &str,
78 outcome_desc: Option<&str>,
79 ) {
80 let mut builder = AuditEventBuilder::new(source_observer)
81 .event_type(
82 "http://terminology.hl7.org/CodeSystem/audit-event-type",
83 "object",
84 )
85 .action(AuditAction::Execute)
86 .outcome(outcome)
87 .detail("audit-operation", "bulk-import")
88 .detail("submission-id", &submission_id.submission_id)
89 .detail("submitter", &submission_id.submitter)
90 .detail("phase", phase);
91 if let Some(a) = agent {
92 builder = builder.agent(a, None, true);
93 }
94 if let Some(d) = outcome_desc {
95 builder = builder.outcome_desc(d);
96 }
97 sink.record(builder.build()).await;
98 }
99
100 #[cfg(test)]
101 mod tests {
102 use helios_audit::sinks::NullSink;
103
104 use super::*;
105 use crate::core::bulk_submit::SubmissionId;
106
107 #[tokio::test]
108 async fn test_submit_event_includes_phase() {
109 let sink = NullSink;
110 let id = SubmissionId::new("client-a", "sub-123");
111 record_submit_event(&sink, "Device/hfs", None, &id, "start", "0", None).await;
112 }
113
114 #[test]
115 fn test_submit_event_has_details() {
116 let id = SubmissionId::new("client-a", "sub-123");
117 let event = AuditEventBuilder::new("Device/hfs")
118 .event_type(
119 "http://terminology.hl7.org/CodeSystem/audit-event-type",
120 "object",
121 )
122 .action(AuditAction::Execute)
123 .outcome("0")
124 .detail("audit-operation", "bulk-import")
125 .detail("submission-id", &id.submission_id)
126 .detail("submitter", &id.submitter)
127 .detail("phase", "complete")
128 .build();
129 let entities = event.entity.as_ref().unwrap();
130 let details = entities[0].detail.as_ref().unwrap();
131 assert_eq!(details.len(), 4);
132 assert_eq!(details[3].r#type.value.as_deref(), Some("phase"));
133 }
134 }
135}
136
137#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
142pub struct SubmissionId {
143 pub submitter: String,
145 pub submission_id: String,
147}
148
149impl SubmissionId {
150 pub fn new(submitter: impl Into<String>, submission_id: impl Into<String>) -> Self {
152 Self {
153 submitter: submitter.into(),
154 submission_id: submission_id.into(),
155 }
156 }
157
158 pub fn generate(submitter: impl Into<String>) -> Self {
160 Self {
161 submitter: submitter.into(),
162 submission_id: Uuid::new_v4().to_string(),
163 }
164 }
165}
166
167impl std::fmt::Display for SubmissionId {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 write!(f, "{}/{}", self.submitter, self.submission_id)
170 }
171}
172
173#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
175#[serde(rename_all = "kebab-case")]
176pub enum SubmissionStatus {
177 InProgress,
179 Complete,
181 Aborted,
183}
184
185impl SubmissionStatus {
186 pub fn is_terminal(&self) -> bool {
188 matches!(self, Self::Complete | Self::Aborted)
189 }
190}
191
192impl std::fmt::Display for SubmissionStatus {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 match self {
195 Self::InProgress => write!(f, "in-progress"),
196 Self::Complete => write!(f, "complete"),
197 Self::Aborted => write!(f, "aborted"),
198 }
199 }
200}
201
202impl std::str::FromStr for SubmissionStatus {
203 type Err = String;
204
205 fn from_str(s: &str) -> Result<Self, Self::Err> {
206 match s {
207 "in-progress" | "in_progress" => Ok(Self::InProgress),
208 "complete" => Ok(Self::Complete),
209 "aborted" => Ok(Self::Aborted),
210 _ => Err(format!("unknown submission status: {}", s)),
211 }
212 }
213}
214
215#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
217#[serde(rename_all = "kebab-case")]
218pub enum ManifestStatus {
219 Pending,
221 Processing,
223 Completed,
225 Failed,
227 Replaced,
229}
230
231impl ManifestStatus {
232 pub fn is_terminal(&self) -> bool {
234 matches!(self, Self::Completed | Self::Failed | Self::Replaced)
235 }
236}
237
238impl std::fmt::Display for ManifestStatus {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 match self {
241 Self::Pending => write!(f, "pending"),
242 Self::Processing => write!(f, "processing"),
243 Self::Completed => write!(f, "completed"),
244 Self::Failed => write!(f, "failed"),
245 Self::Replaced => write!(f, "replaced"),
246 }
247 }
248}
249
250impl std::str::FromStr for ManifestStatus {
251 type Err = String;
252
253 fn from_str(s: &str) -> Result<Self, Self::Err> {
254 match s {
255 "pending" => Ok(Self::Pending),
256 "processing" => Ok(Self::Processing),
257 "completed" => Ok(Self::Completed),
258 "failed" => Ok(Self::Failed),
259 "replaced" => Ok(Self::Replaced),
260 _ => Err(format!("unknown manifest status: {}", s)),
261 }
262 }
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct SubmissionManifest {
268 pub manifest_id: String,
270 #[serde(skip_serializing_if = "Option::is_none")]
272 pub manifest_url: Option<String>,
273 #[serde(skip_serializing_if = "Option::is_none")]
275 pub replaces_manifest_url: Option<String>,
276 pub status: ManifestStatus,
278 pub added_at: DateTime<Utc>,
280 pub total_entries: u64,
282 pub processed_entries: u64,
284 pub failed_entries: u64,
286}
287
288impl SubmissionManifest {
289 pub fn new(manifest_id: impl Into<String>) -> Self {
291 Self {
292 manifest_id: manifest_id.into(),
293 manifest_url: None,
294 replaces_manifest_url: None,
295 status: ManifestStatus::Pending,
296 added_at: Utc::now(),
297 total_entries: 0,
298 processed_entries: 0,
299 failed_entries: 0,
300 }
301 }
302
303 pub fn with_url(mut self, url: impl Into<String>) -> Self {
305 self.manifest_url = Some(url.into());
306 self
307 }
308
309 pub fn with_replaces(mut self, url: impl Into<String>) -> Self {
311 self.replaces_manifest_url = Some(url.into());
312 self
313 }
314}
315
316#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
318#[serde(rename_all = "kebab-case")]
319pub enum BulkEntryOutcome {
320 Success,
322 ValidationError,
324 ProcessingError,
326 Skipped,
328}
329
330impl std::fmt::Display for BulkEntryOutcome {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 match self {
333 Self::Success => write!(f, "success"),
334 Self::ValidationError => write!(f, "validation-error"),
335 Self::ProcessingError => write!(f, "processing-error"),
336 Self::Skipped => write!(f, "skipped"),
337 }
338 }
339}
340
341impl std::str::FromStr for BulkEntryOutcome {
342 type Err = String;
343
344 fn from_str(s: &str) -> Result<Self, Self::Err> {
345 match s {
346 "success" => Ok(Self::Success),
347 "validation-error" | "validation_error" => Ok(Self::ValidationError),
348 "processing-error" | "processing_error" => Ok(Self::ProcessingError),
349 "skipped" => Ok(Self::Skipped),
350 _ => Err(format!("unknown entry outcome: {}", s)),
351 }
352 }
353}
354
355#[derive(Debug, Clone, Serialize, Deserialize)]
357pub struct BulkEntryResult {
358 pub line_number: u64,
360 pub resource_type: String,
362 #[serde(skip_serializing_if = "Option::is_none")]
364 pub resource_id: Option<String>,
365 pub created: bool,
367 pub outcome: BulkEntryOutcome,
369 #[serde(skip_serializing_if = "Option::is_none")]
371 pub operation_outcome: Option<Value>,
372}
373
374impl BulkEntryResult {
375 pub fn success(
377 line_number: u64,
378 resource_type: impl Into<String>,
379 resource_id: impl Into<String>,
380 created: bool,
381 ) -> Self {
382 Self {
383 line_number,
384 resource_type: resource_type.into(),
385 resource_id: Some(resource_id.into()),
386 created,
387 outcome: BulkEntryOutcome::Success,
388 operation_outcome: None,
389 }
390 }
391
392 pub fn validation_error(
394 line_number: u64,
395 resource_type: impl Into<String>,
396 outcome: Value,
397 ) -> Self {
398 Self {
399 line_number,
400 resource_type: resource_type.into(),
401 resource_id: None,
402 created: false,
403 outcome: BulkEntryOutcome::ValidationError,
404 operation_outcome: Some(outcome),
405 }
406 }
407
408 pub fn processing_error(
410 line_number: u64,
411 resource_type: impl Into<String>,
412 outcome: Value,
413 ) -> Self {
414 Self {
415 line_number,
416 resource_type: resource_type.into(),
417 resource_id: None,
418 created: false,
419 outcome: BulkEntryOutcome::ProcessingError,
420 operation_outcome: Some(outcome),
421 }
422 }
423
424 pub fn skipped(line_number: u64, resource_type: impl Into<String>, reason: &str) -> Self {
426 Self {
427 line_number,
428 resource_type: resource_type.into(),
429 resource_id: None,
430 created: false,
431 outcome: BulkEntryOutcome::Skipped,
432 operation_outcome: Some(serde_json::json!({
433 "resourceType": "OperationOutcome",
434 "issue": [{
435 "severity": "information",
436 "code": "informational",
437 "diagnostics": reason
438 }]
439 })),
440 }
441 }
442
443 pub fn is_success(&self) -> bool {
445 self.outcome == BulkEntryOutcome::Success
446 }
447
448 pub fn is_error(&self) -> bool {
450 matches!(
451 self.outcome,
452 BulkEntryOutcome::ValidationError | BulkEntryOutcome::ProcessingError
453 )
454 }
455}
456
457#[derive(Debug, Clone, Serialize, Deserialize)]
459pub struct SubmissionSummary {
460 pub id: SubmissionId,
462 pub status: SubmissionStatus,
464 pub created_at: DateTime<Utc>,
466 pub updated_at: DateTime<Utc>,
468 #[serde(skip_serializing_if = "Option::is_none")]
470 pub completed_at: Option<DateTime<Utc>>,
471 pub manifest_count: u32,
473 pub total_entries: u64,
475 pub success_count: u64,
477 pub error_count: u64,
479 pub skipped_count: u64,
481 #[serde(skip_serializing_if = "Option::is_none")]
483 pub metadata: Option<Value>,
484}
485
486impl SubmissionSummary {
487 pub fn new(id: SubmissionId) -> Self {
489 let now = Utc::now();
490 Self {
491 id,
492 status: SubmissionStatus::InProgress,
493 created_at: now,
494 updated_at: now,
495 completed_at: None,
496 manifest_count: 0,
497 total_entries: 0,
498 success_count: 0,
499 error_count: 0,
500 skipped_count: 0,
501 metadata: None,
502 }
503 }
504
505 pub fn with_metadata(mut self, metadata: Value) -> Self {
507 self.metadata = Some(metadata);
508 self
509 }
510}
511
512#[derive(Debug, Clone, Serialize, Deserialize)]
514pub struct NdjsonEntry {
515 pub line_number: u64,
517 pub resource_type: String,
519 #[serde(skip_serializing_if = "Option::is_none")]
521 pub resource_id: Option<String>,
522 pub resource: Value,
524}
525
526impl NdjsonEntry {
527 pub fn new(line_number: u64, resource_type: impl Into<String>, resource: Value) -> Self {
529 let resource_type = resource_type.into();
530 let resource_id = resource
531 .get("id")
532 .and_then(|v| v.as_str())
533 .map(String::from);
534 Self {
535 line_number,
536 resource_type,
537 resource_id,
538 resource,
539 }
540 }
541
542 pub fn parse(line_number: u64, line: &str) -> Result<Self, String> {
544 let resource: Value =
545 serde_json::from_str(line).map_err(|e| format!("invalid JSON: {}", e))?;
546
547 let resource_type = resource
548 .get("resourceType")
549 .and_then(|v| v.as_str())
550 .ok_or_else(|| "missing resourceType".to_string())?
551 .to_string();
552
553 Ok(Self::new(line_number, resource_type, resource))
554 }
555}
556
557#[derive(Debug, Clone, Serialize, Deserialize)]
559pub struct BulkProcessingOptions {
560 #[serde(default = "default_submit_batch_size")]
562 pub batch_size: u32,
563 #[serde(default = "default_continue_on_error")]
565 pub continue_on_error: bool,
566 #[serde(default)]
568 pub max_errors: u32,
569 #[serde(default = "default_allow_updates")]
571 pub allow_updates: bool,
572}
573
574fn default_submit_batch_size() -> u32 {
575 100
576}
577
578fn default_continue_on_error() -> bool {
579 true
580}
581
582fn default_allow_updates() -> bool {
583 true
584}
585
586impl Default for BulkProcessingOptions {
587 fn default() -> Self {
588 Self::new()
589 }
590}
591
592impl BulkProcessingOptions {
593 pub fn new() -> Self {
595 Self {
596 batch_size: default_submit_batch_size(),
597 continue_on_error: default_continue_on_error(),
598 max_errors: 0,
599 allow_updates: default_allow_updates(),
600 }
601 }
602
603 pub fn with_batch_size(mut self, batch_size: u32) -> Self {
605 self.batch_size = batch_size;
606 self
607 }
608
609 pub fn with_continue_on_error(mut self, continue_on_error: bool) -> Self {
611 self.continue_on_error = continue_on_error;
612 self
613 }
614
615 pub fn with_max_errors(mut self, max_errors: u32) -> Self {
617 self.max_errors = max_errors;
618 self
619 }
620
621 pub fn with_allow_updates(mut self, allow_updates: bool) -> Self {
623 self.allow_updates = allow_updates;
624 self
625 }
626
627 pub fn strict() -> Self {
629 Self {
630 batch_size: default_submit_batch_size(),
631 continue_on_error: false,
632 max_errors: 1,
633 allow_updates: true,
634 }
635 }
636
637 pub fn create_only() -> Self {
639 Self {
640 batch_size: default_submit_batch_size(),
641 continue_on_error: true,
642 max_errors: 0,
643 allow_updates: false,
644 }
645 }
646}
647
648#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
650#[serde(rename_all = "lowercase")]
651pub enum ChangeType {
652 Create,
654 Update,
656}
657
658impl std::fmt::Display for ChangeType {
659 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
660 match self {
661 Self::Create => write!(f, "create"),
662 Self::Update => write!(f, "update"),
663 }
664 }
665}
666
667impl std::str::FromStr for ChangeType {
668 type Err = String;
669
670 fn from_str(s: &str) -> Result<Self, Self::Err> {
671 match s {
672 "create" => Ok(Self::Create),
673 "update" => Ok(Self::Update),
674 _ => Err(format!("unknown change type: {}", s)),
675 }
676 }
677}
678
679#[derive(Debug, Clone, Serialize, Deserialize)]
681pub struct SubmissionChange {
682 pub change_id: String,
684 pub manifest_id: String,
686 pub change_type: ChangeType,
688 pub resource_type: String,
690 pub resource_id: String,
692 #[serde(skip_serializing_if = "Option::is_none")]
694 pub previous_version: Option<String>,
695 pub new_version: String,
697 #[serde(skip_serializing_if = "Option::is_none")]
699 pub previous_content: Option<Value>,
700 pub changed_at: DateTime<Utc>,
702}
703
704impl SubmissionChange {
705 pub fn create(
707 manifest_id: impl Into<String>,
708 resource_type: impl Into<String>,
709 resource_id: impl Into<String>,
710 new_version: impl Into<String>,
711 ) -> Self {
712 Self {
713 change_id: Uuid::new_v4().to_string(),
714 manifest_id: manifest_id.into(),
715 change_type: ChangeType::Create,
716 resource_type: resource_type.into(),
717 resource_id: resource_id.into(),
718 previous_version: None,
719 new_version: new_version.into(),
720 previous_content: None,
721 changed_at: Utc::now(),
722 }
723 }
724
725 pub fn update(
727 manifest_id: impl Into<String>,
728 resource_type: impl Into<String>,
729 resource_id: impl Into<String>,
730 previous_version: impl Into<String>,
731 new_version: impl Into<String>,
732 previous_content: Value,
733 ) -> Self {
734 Self {
735 change_id: Uuid::new_v4().to_string(),
736 manifest_id: manifest_id.into(),
737 change_type: ChangeType::Update,
738 resource_type: resource_type.into(),
739 resource_id: resource_id.into(),
740 previous_version: Some(previous_version.into()),
741 new_version: new_version.into(),
742 previous_content: Some(previous_content),
743 changed_at: Utc::now(),
744 }
745 }
746}
747
748#[derive(Debug, Clone, Default, Serialize, Deserialize)]
750pub struct EntryCountSummary {
751 pub total: u64,
753 pub success: u64,
755 pub validation_error: u64,
757 pub processing_error: u64,
759 pub skipped: u64,
761}
762
763impl EntryCountSummary {
764 pub fn new() -> Self {
766 Self::default()
767 }
768
769 pub fn error_count(&self) -> u64 {
771 self.validation_error + self.processing_error
772 }
773
774 pub fn increment(&mut self, outcome: BulkEntryOutcome) {
776 self.total += 1;
777 match outcome {
778 BulkEntryOutcome::Success => self.success += 1,
779 BulkEntryOutcome::ValidationError => self.validation_error += 1,
780 BulkEntryOutcome::ProcessingError => self.processing_error += 1,
781 BulkEntryOutcome::Skipped => self.skipped += 1,
782 }
783 }
784}
785
786#[derive(Debug, Clone, Serialize, Deserialize)]
788pub struct StreamProcessingResult {
789 pub lines_processed: u64,
791 pub counts: EntryCountSummary,
793 pub aborted: bool,
795 #[serde(skip_serializing_if = "Option::is_none")]
797 pub abort_reason: Option<String>,
798}
799
800impl StreamProcessingResult {
801 pub fn new() -> Self {
803 Self {
804 lines_processed: 0,
805 counts: EntryCountSummary::new(),
806 aborted: false,
807 abort_reason: None,
808 }
809 }
810
811 pub fn aborted(mut self, reason: impl Into<String>) -> Self {
813 self.aborted = true;
814 self.abort_reason = Some(reason.into());
815 self
816 }
817}
818
819impl Default for StreamProcessingResult {
820 fn default() -> Self {
821 Self::new()
822 }
823}
824
825#[async_trait]
835pub trait BulkSubmitProvider: ResourceStorage {
836 async fn create_submission(
852 &self,
853 tenant: &TenantContext,
854 id: &SubmissionId,
855 metadata: Option<Value>,
856 ) -> StorageResult<SubmissionSummary>;
857
858 async fn get_submission(
869 &self,
870 tenant: &TenantContext,
871 id: &SubmissionId,
872 ) -> StorageResult<Option<SubmissionSummary>>;
873
874 async fn list_submissions(
888 &self,
889 tenant: &TenantContext,
890 submitter: Option<&str>,
891 status: Option<SubmissionStatus>,
892 limit: u32,
893 offset: u32,
894 ) -> StorageResult<Vec<SubmissionSummary>>;
895
896 async fn complete_submission(
912 &self,
913 tenant: &TenantContext,
914 id: &SubmissionId,
915 ) -> StorageResult<SubmissionSummary>;
916
917 async fn abort_submission(
937 &self,
938 tenant: &TenantContext,
939 id: &SubmissionId,
940 reason: &str,
941 ) -> StorageResult<u64>;
942
943 async fn add_manifest(
961 &self,
962 tenant: &TenantContext,
963 submission_id: &SubmissionId,
964 manifest_url: Option<&str>,
965 replaces_manifest_url: Option<&str>,
966 ) -> StorageResult<SubmissionManifest>;
967
968 async fn get_manifest(
980 &self,
981 tenant: &TenantContext,
982 submission_id: &SubmissionId,
983 manifest_id: &str,
984 ) -> StorageResult<Option<SubmissionManifest>>;
985
986 async fn list_manifests(
997 &self,
998 tenant: &TenantContext,
999 submission_id: &SubmissionId,
1000 ) -> StorageResult<Vec<SubmissionManifest>>;
1001
1002 async fn process_entries(
1022 &self,
1023 tenant: &TenantContext,
1024 submission_id: &SubmissionId,
1025 manifest_id: &str,
1026 entries: Vec<NdjsonEntry>,
1027 options: &BulkProcessingOptions,
1028 ) -> StorageResult<Vec<BulkEntryResult>>;
1029
1030 async fn get_entry_results(
1045 &self,
1046 tenant: &TenantContext,
1047 submission_id: &SubmissionId,
1048 manifest_id: &str,
1049 outcome_filter: Option<BulkEntryOutcome>,
1050 limit: u32,
1051 offset: u32,
1052 ) -> StorageResult<Vec<BulkEntryResult>>;
1053
1054 async fn get_entry_counts(
1066 &self,
1067 tenant: &TenantContext,
1068 submission_id: &SubmissionId,
1069 manifest_id: &str,
1070 ) -> StorageResult<EntryCountSummary>;
1071}
1072
1073#[async_trait]
1078pub trait StreamingBulkSubmitProvider: BulkSubmitProvider {
1079 async fn process_ndjson_stream(
1094 &self,
1095 tenant: &TenantContext,
1096 submission_id: &SubmissionId,
1097 manifest_id: &str,
1098 resource_type: &str,
1099 reader: Box<dyn AsyncBufRead + Send + Unpin>,
1100 options: &BulkProcessingOptions,
1101 ) -> StorageResult<StreamProcessingResult>;
1102}
1103
1104#[async_trait]
1109pub trait BulkSubmitRollbackProvider: BulkSubmitProvider {
1110 async fn record_change(
1118 &self,
1119 tenant: &TenantContext,
1120 submission_id: &SubmissionId,
1121 change: &SubmissionChange,
1122 ) -> StorageResult<()>;
1123
1124 async fn list_changes(
1137 &self,
1138 tenant: &TenantContext,
1139 submission_id: &SubmissionId,
1140 limit: u32,
1141 offset: u32,
1142 ) -> StorageResult<Vec<SubmissionChange>>;
1143
1144 async fn rollback_change(
1159 &self,
1160 tenant: &TenantContext,
1161 submission_id: &SubmissionId,
1162 change: &SubmissionChange,
1163 ) -> StorageResult<bool>;
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use super::*;
1169
1170 #[test]
1171 fn test_submission_id() {
1172 let id = SubmissionId::new("my-system", "sub-123");
1173 assert_eq!(id.submitter, "my-system");
1174 assert_eq!(id.submission_id, "sub-123");
1175 assert_eq!(id.to_string(), "my-system/sub-123");
1176 }
1177
1178 #[test]
1179 fn test_submission_id_generate() {
1180 let id = SubmissionId::generate("my-system");
1181 assert_eq!(id.submitter, "my-system");
1182 assert!(!id.submission_id.is_empty());
1183 }
1184
1185 #[test]
1186 fn test_submission_status() {
1187 assert!(!SubmissionStatus::InProgress.is_terminal());
1188 assert!(SubmissionStatus::Complete.is_terminal());
1189 assert!(SubmissionStatus::Aborted.is_terminal());
1190
1191 let status: SubmissionStatus = "in-progress".parse().unwrap();
1192 assert_eq!(status, SubmissionStatus::InProgress);
1193 }
1194
1195 #[test]
1196 fn test_manifest_status() {
1197 assert!(!ManifestStatus::Pending.is_terminal());
1198 assert!(!ManifestStatus::Processing.is_terminal());
1199 assert!(ManifestStatus::Completed.is_terminal());
1200 assert!(ManifestStatus::Failed.is_terminal());
1201 }
1202
1203 #[test]
1204 fn test_bulk_entry_result() {
1205 let success = BulkEntryResult::success(1, "Patient", "pat-123", true);
1206 assert!(success.is_success());
1207 assert!(!success.is_error());
1208 assert!(success.created);
1209
1210 let error = BulkEntryResult::validation_error(
1211 2,
1212 "Patient",
1213 serde_json::json!({"resourceType": "OperationOutcome"}),
1214 );
1215 assert!(!error.is_success());
1216 assert!(error.is_error());
1217 }
1218
1219 #[test]
1220 fn test_ndjson_entry_parse() {
1221 let line = r#"{"resourceType":"Patient","id":"123","name":[{"family":"Smith"}]}"#;
1222 let entry = NdjsonEntry::parse(1, line).unwrap();
1223
1224 assert_eq!(entry.line_number, 1);
1225 assert_eq!(entry.resource_type, "Patient");
1226 assert_eq!(entry.resource_id, Some("123".to_string()));
1227 }
1228
1229 #[test]
1230 fn test_ndjson_entry_parse_error() {
1231 let result = NdjsonEntry::parse(1, "not json");
1232 assert!(result.is_err());
1233
1234 let result = NdjsonEntry::parse(1, r#"{"id":"123"}"#);
1235 assert!(result.is_err()); }
1237
1238 #[test]
1239 fn test_bulk_processing_options() {
1240 let options = BulkProcessingOptions::new()
1241 .with_batch_size(50)
1242 .with_max_errors(10)
1243 .with_continue_on_error(false);
1244
1245 assert_eq!(options.batch_size, 50);
1246 assert_eq!(options.max_errors, 10);
1247 assert!(!options.continue_on_error);
1248 }
1249
1250 #[test]
1251 fn test_bulk_processing_options_strict() {
1252 let options = BulkProcessingOptions::strict();
1253 assert!(!options.continue_on_error);
1254 assert_eq!(options.max_errors, 1);
1255 }
1256
1257 #[test]
1258 fn test_submission_change() {
1259 let create = SubmissionChange::create("manifest-1", "Patient", "pat-123", "1");
1260 assert_eq!(create.change_type, ChangeType::Create);
1261 assert!(create.previous_content.is_none());
1262
1263 let update = SubmissionChange::update(
1264 "manifest-1",
1265 "Patient",
1266 "pat-123",
1267 "1",
1268 "2",
1269 serde_json::json!({"resourceType": "Patient"}),
1270 );
1271 assert_eq!(update.change_type, ChangeType::Update);
1272 assert!(update.previous_content.is_some());
1273 }
1274
1275 #[test]
1276 fn test_entry_count_summary() {
1277 let mut counts = EntryCountSummary::new();
1278 counts.increment(BulkEntryOutcome::Success);
1279 counts.increment(BulkEntryOutcome::Success);
1280 counts.increment(BulkEntryOutcome::ValidationError);
1281 counts.increment(BulkEntryOutcome::ProcessingError);
1282 counts.increment(BulkEntryOutcome::Skipped);
1283
1284 assert_eq!(counts.total, 5);
1285 assert_eq!(counts.success, 2);
1286 assert_eq!(counts.error_count(), 2);
1287 assert_eq!(counts.skipped, 1);
1288 }
1289
1290 #[test]
1291 fn test_stream_processing_result() {
1292 let result = StreamProcessingResult::new().aborted("max errors exceeded");
1293 assert!(result.aborted);
1294 assert_eq!(result.abort_reason, Some("max errors exceeded".to_string()));
1295 }
1296}