1use async_trait::async_trait;
47use chrono::{DateTime, Utc};
48use serde::{Deserialize, Serialize};
49use serde_json::Value;
50use uuid::Uuid;
51
52use crate::error::StorageResult;
53use crate::tenant::TenantContext;
54
55#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
57pub struct ExportJobId(String);
58
59impl ExportJobId {
60 pub fn new() -> Self {
62 Self(Uuid::new_v4().to_string())
63 }
64
65 pub fn from_string(id: impl Into<String>) -> Self {
67 Self(id.into())
68 }
69
70 pub fn as_str(&self) -> &str {
72 &self.0
73 }
74}
75
76impl Default for ExportJobId {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl std::fmt::Display for ExportJobId {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 write!(f, "{}", self.0)
85 }
86}
87
88impl From<String> for ExportJobId {
89 fn from(s: String) -> Self {
90 Self(s)
91 }
92}
93
94impl From<&str> for ExportJobId {
95 fn from(s: &str) -> Self {
96 Self(s.to_string())
97 }
98}
99
100#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
102#[serde(rename_all = "lowercase")]
103pub enum ExportStatus {
104 Accepted,
106 InProgress,
108 Complete,
110 Error,
112 Cancelled,
114}
115
116impl ExportStatus {
117 pub fn is_terminal(&self) -> bool {
119 matches!(self, Self::Complete | Self::Error | Self::Cancelled)
120 }
121
122 pub fn is_active(&self) -> bool {
124 matches!(self, Self::Accepted | Self::InProgress)
125 }
126}
127
128impl std::fmt::Display for ExportStatus {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 match self {
131 Self::Accepted => write!(f, "accepted"),
132 Self::InProgress => write!(f, "in-progress"),
133 Self::Complete => write!(f, "complete"),
134 Self::Error => write!(f, "error"),
135 Self::Cancelled => write!(f, "cancelled"),
136 }
137 }
138}
139
140impl std::str::FromStr for ExportStatus {
141 type Err = String;
142
143 fn from_str(s: &str) -> Result<Self, Self::Err> {
144 match s {
145 "accepted" => Ok(Self::Accepted),
146 "in-progress" | "in_progress" => Ok(Self::InProgress),
147 "complete" => Ok(Self::Complete),
148 "error" => Ok(Self::Error),
149 "cancelled" => Ok(Self::Cancelled),
150 _ => Err(format!("unknown export status: {}", s)),
151 }
152 }
153}
154
155#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
157#[serde(rename_all = "lowercase")]
158pub enum ExportLevel {
159 System,
161 Patient,
163 Group {
165 group_id: String,
167 },
168}
169
170impl ExportLevel {
171 pub fn system() -> Self {
173 Self::System
174 }
175
176 pub fn patient() -> Self {
178 Self::Patient
179 }
180
181 pub fn group(group_id: impl Into<String>) -> Self {
183 Self::Group {
184 group_id: group_id.into(),
185 }
186 }
187}
188
189impl std::fmt::Display for ExportLevel {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 match self {
192 Self::System => write!(f, "system"),
193 Self::Patient => write!(f, "patient"),
194 Self::Group { group_id } => write!(f, "group/{}", group_id),
195 }
196 }
197}
198
199#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
204pub struct TypeFilter {
205 pub resource_type: String,
207 pub query: String,
209}
210
211impl TypeFilter {
212 pub fn new(resource_type: impl Into<String>, query: impl Into<String>) -> Self {
214 Self {
215 resource_type: resource_type.into(),
216 query: query.into(),
217 }
218 }
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct ExportRequest {
224 pub level: ExportLevel,
226
227 #[serde(default)]
229 pub resource_types: Vec<String>,
230
231 #[serde(skip_serializing_if = "Option::is_none")]
233 pub since: Option<DateTime<Utc>>,
234
235 #[serde(default)]
237 pub type_filters: Vec<TypeFilter>,
238
239 #[serde(default = "default_batch_size")]
241 pub batch_size: u32,
242
243 #[serde(default = "default_output_format")]
245 pub output_format: String,
246}
247
/// Batch size used when a request does not specify one.
fn default_batch_size() -> u32 {
    const DEFAULT_BATCH_SIZE: u32 = 1_000;
    DEFAULT_BATCH_SIZE
}
251
/// Output format used when a request does not specify one.
fn default_output_format() -> String {
    String::from("application/fhir+ndjson")
}
255
256impl ExportRequest {
257 pub fn new(level: ExportLevel) -> Self {
259 Self {
260 level,
261 resource_types: Vec::new(),
262 since: None,
263 type_filters: Vec::new(),
264 batch_size: default_batch_size(),
265 output_format: default_output_format(),
266 }
267 }
268
269 pub fn system() -> Self {
271 Self::new(ExportLevel::System)
272 }
273
274 pub fn patient() -> Self {
276 Self::new(ExportLevel::Patient)
277 }
278
279 pub fn group(group_id: impl Into<String>) -> Self {
281 Self::new(ExportLevel::Group {
282 group_id: group_id.into(),
283 })
284 }
285
286 pub fn with_types(mut self, types: Vec<String>) -> Self {
288 self.resource_types = types;
289 self
290 }
291
292 pub fn with_since(mut self, since: DateTime<Utc>) -> Self {
294 self.since = Some(since);
295 self
296 }
297
298 pub fn with_type_filter(mut self, filter: TypeFilter) -> Self {
300 self.type_filters.push(filter);
301 self
302 }
303
304 pub fn with_type_filters(mut self, filters: Vec<TypeFilter>) -> Self {
306 self.type_filters.extend(filters);
307 self
308 }
309
310 pub fn with_batch_size(mut self, batch_size: u32) -> Self {
312 self.batch_size = batch_size;
313 self
314 }
315
316 pub fn group_id(&self) -> Option<&str> {
318 match &self.level {
319 ExportLevel::Group { group_id } => Some(group_id),
320 _ => None,
321 }
322 }
323}
324
325#[derive(Debug, Clone, Serialize, Deserialize)]
327pub struct TypeExportProgress {
328 pub resource_type: String,
330 pub total_count: Option<u64>,
332 pub exported_count: u64,
334 pub error_count: u64,
336 #[serde(skip_serializing_if = "Option::is_none")]
338 pub cursor_state: Option<String>,
339}
340
341impl TypeExportProgress {
342 pub fn new(resource_type: impl Into<String>) -> Self {
344 Self {
345 resource_type: resource_type.into(),
346 total_count: None,
347 exported_count: 0,
348 error_count: 0,
349 cursor_state: None,
350 }
351 }
352
353 pub fn with_total(mut self, total: u64) -> Self {
355 self.total_count = Some(total);
356 self
357 }
358
359 pub fn progress_fraction(&self) -> Option<f64> {
361 self.total_count.map(|total| {
362 if total == 0 {
363 1.0
364 } else {
365 self.exported_count as f64 / total as f64
366 }
367 })
368 }
369}
370
371#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct ExportProgress {
374 pub job_id: ExportJobId,
376 pub status: ExportStatus,
378 pub level: ExportLevel,
380 pub transaction_time: DateTime<Utc>,
382 #[serde(skip_serializing_if = "Option::is_none")]
384 pub started_at: Option<DateTime<Utc>>,
385 #[serde(skip_serializing_if = "Option::is_none")]
387 pub completed_at: Option<DateTime<Utc>>,
388 pub type_progress: Vec<TypeExportProgress>,
390 #[serde(skip_serializing_if = "Option::is_none")]
392 pub current_type: Option<String>,
393 #[serde(skip_serializing_if = "Option::is_none")]
395 pub error_message: Option<String>,
396}
397
398impl ExportProgress {
399 pub fn accepted(
401 job_id: ExportJobId,
402 level: ExportLevel,
403 transaction_time: DateTime<Utc>,
404 ) -> Self {
405 Self {
406 job_id,
407 status: ExportStatus::Accepted,
408 level,
409 transaction_time,
410 started_at: None,
411 completed_at: None,
412 type_progress: Vec::new(),
413 current_type: None,
414 error_message: None,
415 }
416 }
417
418 pub fn overall_progress(&self) -> f64 {
420 if self.type_progress.is_empty() {
421 return 0.0;
422 }
423
424 let (total_exported, total_count) = self
425 .type_progress
426 .iter()
427 .fold((0u64, 0u64), |(exp, tot), tp| {
428 (exp + tp.exported_count, tot + tp.total_count.unwrap_or(0))
429 });
430
431 if total_count == 0 {
432 0.0
433 } else {
434 total_exported as f64 / total_count as f64
435 }
436 }
437}
438
439#[derive(Debug, Clone, Serialize, Deserialize)]
441pub struct ExportOutputFile {
442 #[serde(rename = "type")]
444 pub resource_type: String,
445 pub url: String,
447 #[serde(skip_serializing_if = "Option::is_none")]
449 pub count: Option<u64>,
450}
451
452impl ExportOutputFile {
453 pub fn new(resource_type: impl Into<String>, url: impl Into<String>) -> Self {
455 Self {
456 resource_type: resource_type.into(),
457 url: url.into(),
458 count: None,
459 }
460 }
461
462 pub fn with_count(mut self, count: u64) -> Self {
464 self.count = Some(count);
465 self
466 }
467}
468
469#[derive(Debug, Clone, Serialize, Deserialize)]
473pub struct ExportManifest {
474 #[serde(rename = "transactionTime")]
476 pub transaction_time: DateTime<Utc>,
477 pub request: String,
479 #[serde(rename = "requiresAccessToken")]
481 pub requires_access_token: bool,
482 pub output: Vec<ExportOutputFile>,
484 #[serde(default)]
486 pub error: Vec<ExportOutputFile>,
487 #[serde(default, skip_serializing_if = "Option::is_none")]
489 pub message: Option<String>,
490 #[serde(default, skip_serializing_if = "Option::is_none")]
492 pub extension: Option<Value>,
493}
494
495impl ExportManifest {
496 pub fn new(transaction_time: DateTime<Utc>, request: impl Into<String>) -> Self {
498 Self {
499 transaction_time,
500 request: request.into(),
501 requires_access_token: true,
502 output: Vec::new(),
503 error: Vec::new(),
504 message: None,
505 extension: None,
506 }
507 }
508
509 pub fn with_output(mut self, file: ExportOutputFile) -> Self {
511 self.output.push(file);
512 self
513 }
514
515 pub fn with_error(mut self, file: ExportOutputFile) -> Self {
517 self.error.push(file);
518 self
519 }
520
521 pub fn with_message(mut self, message: impl Into<String>) -> Self {
523 self.message = Some(message.into());
524 self
525 }
526}
527
/// One batch of NDJSON lines together with paging information.
#[derive(Debug, Clone)]
pub struct NdjsonBatch {
    /// The lines of this batch.
    pub lines: Vec<String>,
    /// Cursor for fetching the following batch, if one was provided.
    pub next_cursor: Option<String>,
    /// Whether this batch has been marked as the final one.
    pub is_last: bool,
}

impl NdjsonBatch {
    /// Wraps `lines` in a batch with no cursor that is not marked as last.
    pub fn new(lines: Vec<String>) -> Self {
        let (next_cursor, is_last) = (None, false);
        Self {
            lines,
            next_cursor,
            is_last,
        }
    }

    /// Creates a batch without lines or cursor that is marked as last.
    pub fn empty() -> Self {
        Self::new(Vec::new()).as_last()
    }

    /// Sets the cursor pointing at the next batch.
    pub fn with_cursor(self, cursor: impl Into<String>) -> Self {
        Self {
            next_cursor: Some(cursor.into()),
            ..self
        }
    }

    /// Marks the batch as the final one and drops any cursor it carried.
    pub fn as_last(self) -> Self {
        Self {
            is_last: true,
            next_cursor: None,
            ..self
        }
    }

    /// Number of lines in the batch.
    pub fn len(&self) -> usize {
        self.lines.len()
    }

    /// Returns `true` when the batch holds no lines.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
581
582#[async_trait]
591pub trait BulkExportStorage: Send + Sync {
592 async fn start_export(
608 &self,
609 tenant: &TenantContext,
610 request: ExportRequest,
611 ) -> StorageResult<ExportJobId>;
612
613 async fn get_export_status(
628 &self,
629 tenant: &TenantContext,
630 job_id: &ExportJobId,
631 ) -> StorageResult<ExportProgress>;
632
633 async fn cancel_export(
645 &self,
646 tenant: &TenantContext,
647 job_id: &ExportJobId,
648 ) -> StorageResult<()>;
649
650 async fn delete_export(
661 &self,
662 tenant: &TenantContext,
663 job_id: &ExportJobId,
664 ) -> StorageResult<()>;
665
666 async fn get_export_manifest(
682 &self,
683 tenant: &TenantContext,
684 job_id: &ExportJobId,
685 ) -> StorageResult<ExportManifest>;
686
687 async fn list_exports(
698 &self,
699 tenant: &TenantContext,
700 include_completed: bool,
701 ) -> StorageResult<Vec<ExportProgress>>;
702}
703
704#[async_trait]
709pub trait ExportDataProvider: Send + Sync {
710 async fn list_export_types(
721 &self,
722 tenant: &TenantContext,
723 request: &ExportRequest,
724 ) -> StorageResult<Vec<String>>;
725
726 async fn count_export_resources(
738 &self,
739 tenant: &TenantContext,
740 request: &ExportRequest,
741 resource_type: &str,
742 ) -> StorageResult<u64>;
743
744 async fn fetch_export_batch(
758 &self,
759 tenant: &TenantContext,
760 request: &ExportRequest,
761 resource_type: &str,
762 cursor: Option<&str>,
763 batch_size: u32,
764 ) -> StorageResult<NdjsonBatch>;
765}
766
767#[async_trait]
772pub trait PatientExportProvider: ExportDataProvider {
773 async fn list_patient_ids(
786 &self,
787 tenant: &TenantContext,
788 request: &ExportRequest,
789 cursor: Option<&str>,
790 batch_size: u32,
791 ) -> StorageResult<(Vec<String>, Option<String>)>;
792
793 async fn fetch_patient_compartment_batch(
808 &self,
809 tenant: &TenantContext,
810 request: &ExportRequest,
811 resource_type: &str,
812 patient_ids: &[String],
813 cursor: Option<&str>,
814 batch_size: u32,
815 ) -> StorageResult<NdjsonBatch>;
816}
817
818#[async_trait]
823pub trait GroupExportProvider: PatientExportProvider {
824 async fn get_group_members(
839 &self,
840 tenant: &TenantContext,
841 group_id: &str,
842 ) -> StorageResult<Vec<String>>;
843
844 async fn resolve_group_patient_ids(
858 &self,
859 tenant: &TenantContext,
860 group_id: &str,
861 ) -> StorageResult<Vec<String>>;
862}
863
#[cfg(test)]
mod tests {
    use super::*;

    /// Generated ids are non-empty; explicit ids read back unchanged.
    #[test]
    fn test_export_job_id() {
        let generated = ExportJobId::new();
        assert!(!generated.as_str().is_empty());

        let explicit = ExportJobId::from_string("test-123");
        assert_eq!(explicit.as_str(), "test-123");
        assert_eq!(explicit.to_string(), "test-123");
    }

    /// Terminal and active states are classified as expected.
    #[test]
    fn test_export_status() {
        for status in [
            ExportStatus::Complete,
            ExportStatus::Error,
            ExportStatus::Cancelled,
        ] {
            assert!(status.is_terminal());
        }
        for status in [ExportStatus::Accepted, ExportStatus::InProgress] {
            assert!(!status.is_terminal());
            assert!(status.is_active());
        }
        assert!(!ExportStatus::Complete.is_active());
    }

    /// `Display` uses the hyphenated form; parsing accepts hyphen and underscore.
    #[test]
    fn test_export_status_display_parse() {
        assert_eq!(ExportStatus::InProgress.to_string(), "in-progress");

        for text in ["in-progress", "in_progress"] {
            let parsed: ExportStatus = text.parse().unwrap();
            assert_eq!(parsed, ExportStatus::InProgress);
        }
    }

    /// The constructor helpers build the matching variants.
    #[test]
    fn test_export_level() {
        assert_eq!(ExportLevel::system(), ExportLevel::System);
        assert_eq!(ExportLevel::patient(), ExportLevel::Patient);
        assert_eq!(
            ExportLevel::group("grp-123"),
            ExportLevel::Group {
                group_id: "grp-123".to_string(),
            }
        );
    }

    /// Builder methods set the fields they are named after.
    #[test]
    fn test_export_request_builder() {
        let types = vec!["Patient".to_string(), "Observation".to_string()];
        let filter = TypeFilter::new("Observation", "code=1234");

        let request = ExportRequest::system()
            .with_types(types)
            .with_batch_size(500)
            .with_type_filter(filter);

        assert!(matches!(request.level, ExportLevel::System));
        assert_eq!(request.resource_types, vec!["Patient", "Observation"]);
        assert_eq!(request.batch_size, 500);
        assert_eq!(request.type_filters.len(), 1);
    }

    /// Only group-level requests expose a group id.
    #[test]
    fn test_export_request_group_id() {
        assert_eq!(ExportRequest::group("grp-123").group_id(), Some("grp-123"));
        assert_eq!(ExportRequest::system().group_id(), None);
    }

    /// The per-type fraction follows the exported count.
    #[test]
    fn test_type_export_progress() {
        let mut progress = TypeExportProgress::new("Patient").with_total(100);
        assert_eq!(progress.total_count, Some(100));
        assert_eq!(progress.progress_fraction(), Some(0.0));

        progress.exported_count = 50;
        assert_eq!(progress.progress_fraction(), Some(0.5));
    }

    /// Manifest builder collects output files and the message.
    #[test]
    fn test_export_manifest() {
        let patient_file =
            ExportOutputFile::new("Patient", "/exports/Patient.ndjson").with_count(100);
        let manifest = ExportManifest::new(Utc::now(), "https://example.com/$export")
            .with_output(patient_file)
            .with_message("Export complete");

        assert_eq!(manifest.output.len(), 1);
        let first = &manifest.output[0];
        assert_eq!(first.resource_type, "Patient");
        assert_eq!(first.count, Some(100));
        assert!(manifest.message.is_some());
    }

    /// A batch keeps its cursor until it is marked as last.
    #[test]
    fn test_ndjson_batch() {
        let lines = vec![
            r#"{"resourceType":"Patient","id":"1"}"#.to_string(),
            r#"{"resourceType":"Patient","id":"2"}"#.to_string(),
        ];
        let batch = NdjsonBatch::new(lines).with_cursor("next-page");

        assert_eq!(batch.len(), 2);
        assert!(!batch.is_empty());
        assert!(!batch.is_last);
        assert_eq!(batch.next_cursor, Some("next-page".to_string()));

        let final_batch = batch.as_last();
        assert!(final_batch.is_last);
        assert!(final_batch.next_cursor.is_none());
    }

    /// The empty batch has no lines and is marked as last.
    #[test]
    fn test_ndjson_batch_empty() {
        let batch = NdjsonBatch::empty();
        assert!(batch.is_empty());
        assert!(batch.is_last);
    }
}