1use crate::{SofBundle, SofError};
2use async_trait::async_trait;
3use helios_fhir::Element;
4use object_store::{
5 ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
6 gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
7};
8use reqwest;
9use serde_json;
10use std::sync::Arc;
11use tokio::fs;
12use url::Url;
13
14#[async_trait]
16pub trait DataSource: Send + Sync {
17 async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
19}
20
21pub struct UniversalDataSource {
23 client: reqwest::Client,
24}
25
26impl UniversalDataSource {
27 pub fn new() -> Self {
28 Self {
29 client: reqwest::Client::builder()
30 .timeout(std::time::Duration::from_secs(30))
31 .build()
32 .unwrap_or_else(|_| reqwest::Client::new()),
33 }
34 }
35}
36
37impl Default for UniversalDataSource {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43#[async_trait]
44impl DataSource for UniversalDataSource {
45 async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
46 let url = Url::parse(source).map_err(|e| {
48 SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
49 })?;
50
51 match url.scheme() {
52 "file" => load_from_file(&url).await,
53 "http" | "https" => load_from_http(&self.client, &url).await,
54 "s3" => load_from_s3(&url).await,
55 "gs" => load_from_gcs(&url).await,
56 "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
57 scheme => Err(SofError::UnsupportedSourceProtocol(format!(
58 "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
59 scheme
60 ))),
61 }
62 }
63}
64
65async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
67 let path = url
69 .to_file_path()
70 .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
71
72 if !path.exists() {
74 return Err(SofError::SourceNotFound(format!(
75 "File not found: {}",
76 path.display()
77 )));
78 }
79
80 let contents = fs::read_to_string(&path)
82 .await
83 .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
84
85 parse_fhir_content(&contents, &path.to_string_lossy())
87}
88
89async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
91 let response = client
93 .get(url.as_str())
94 .header("Accept", "application/fhir+json, application/json")
95 .send()
96 .await
97 .map_err(|e| {
98 SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
99 })?;
100
101 if !response.status().is_success() {
103 return Err(SofError::SourceFetchError(format!(
104 "HTTP error {} when fetching '{}'",
105 response.status(),
106 url
107 )));
108 }
109
110 let contents = response
112 .text()
113 .await
114 .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
115
116 parse_fhir_content(&contents, url.as_str())
118}
119
120async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
122 let bucket = url.host_str().ok_or_else(|| {
124 SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
125 })?;
126
127 let path = url.path().trim_start_matches('/');
128 if path.is_empty() {
129 return Err(SofError::InvalidSource(format!(
130 "Invalid S3 URL '{}': missing object path",
131 url
132 )));
133 }
134
135 let store = AmazonS3Builder::new()
137 .with_bucket_name(bucket)
138 .build()
139 .map_err(|e| {
140 SofError::SourceFetchError(format!(
141 "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)",
142 url, e
143 ))
144 })?;
145
146 load_from_object_store(Arc::new(store), path, url.as_str()).await
147}
148
149async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
151 let bucket = url.host_str().ok_or_else(|| {
153 SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
154 })?;
155
156 let path = url.path().trim_start_matches('/');
157 if path.is_empty() {
158 return Err(SofError::InvalidSource(format!(
159 "Invalid GCS URL '{}': missing object path",
160 url
161 )));
162 }
163
164 let store = GoogleCloudStorageBuilder::new()
166 .with_bucket_name(bucket)
167 .build()
168 .map_err(|e| {
169 SofError::SourceFetchError(format!(
170 "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
171 url, e
172 ))
173 })?;
174
175 load_from_object_store(Arc::new(store), path, url.as_str()).await
176}
177
178async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
180 let (container, path) = if url.scheme() == "azure" {
182 let container = url.host_str().ok_or_else(|| {
184 SofError::InvalidSource(format!(
185 "Invalid Azure URL '{}': missing container name",
186 url
187 ))
188 })?;
189 let path = url.path().trim_start_matches('/');
190 (container.to_string(), path.to_string())
191 } else {
192 let host = url.host_str().ok_or_else(|| {
194 SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
195 })?;
196 let parts: Vec<&str> = host.split('@').collect();
197 if parts.len() != 2 {
198 return Err(SofError::InvalidSource(format!(
199 "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
200 url
201 )));
202 }
203 let container = parts[0];
204 let path = url.path().trim_start_matches('/');
205 (container.to_string(), path.to_string())
206 };
207
208 if path.is_empty() {
209 return Err(SofError::InvalidSource(format!(
210 "Invalid Azure URL '{}': missing blob path",
211 url
212 )));
213 }
214
215 let store = MicrosoftAzureBuilder::new()
217 .with_container_name(&container)
218 .build()
219 .map_err(|e| {
220 SofError::SourceFetchError(format!(
221 "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
222 url, e
223 ))
224 })?;
225
226 load_from_object_store(Arc::new(store), &path, url.as_str()).await
227}
228
229async fn load_from_object_store(
231 store: Arc<dyn ObjectStore>,
232 path: &str,
233 source_name: &str,
234) -> Result<SofBundle, SofError> {
235 let object_path = ObjectPath::from(path);
237
238 let result = store.get(&object_path).await.map_err(|e| match e {
240 object_store::Error::NotFound { .. } => {
241 SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
242 }
243 _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
244 })?;
245
246 let bytes = result
248 .bytes()
249 .await
250 .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
251
252 let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
254 SofError::InvalidSourceContent(format!(
255 "Content from '{}' is not valid UTF-8: {}",
256 source_name, e
257 ))
258 })?;
259
260 parse_fhir_content(&contents, source_name)
262}
263
264fn is_ndjson_extension(source_name: &str) -> bool {
266 source_name.to_lowercase().ends_with(".ndjson")
267}
268
269fn parse_ndjson_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
271 let lines: Vec<&str> = contents
272 .lines()
273 .filter(|line| !line.trim().is_empty())
274 .collect();
275
276 if lines.is_empty() {
277 return Err(SofError::InvalidSourceContent(format!(
278 "Empty NDJSON content from '{}'",
279 source_name
280 )));
281 }
282
283 let mut resources = Vec::new();
285 let mut parse_errors = Vec::new();
286
287 for (line_num, line) in lines.iter().enumerate() {
288 match serde_json::from_str::<serde_json::Value>(line) {
289 Ok(value) => {
290 if value.get("resourceType").and_then(|v| v.as_str()).is_some() {
292 resources.push(value);
293 } else {
294 parse_errors.push(format!(
295 "Line {}: Missing 'resourceType' field",
296 line_num + 1
297 ));
298 }
299 }
300 Err(e) => {
301 parse_errors.push(format!("Line {}: {}", line_num + 1, e));
302 }
303 }
304 }
305
306 if resources.is_empty() {
308 return Err(SofError::InvalidSourceContent(format!(
309 "No valid FHIR resources found in NDJSON from '{}'. Errors: {}",
310 source_name,
311 parse_errors.join("; ")
312 )));
313 }
314
315 if !parse_errors.is_empty() {
317 eprintln!(
318 "Warning: {} line(s) in NDJSON from '{}' could not be parsed: {}",
319 parse_errors.len(),
320 source_name,
321 parse_errors.join("; ")
322 );
323 }
324
325 let resources_array = serde_json::Value::Array(resources);
327 wrap_resources_in_bundle(resources_array, source_name)
328}
329
330pub fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
333 if is_ndjson_extension(source_name) {
335 return parse_ndjson_content(contents, source_name);
336 }
337
338 let value: serde_json::Value = match serde_json::from_str(contents) {
340 Ok(v) => v,
341 Err(json_err) => {
342 if contents.lines().count() > 1 {
345 return parse_ndjson_content(contents, source_name).map_err(|ndjson_err| {
347 SofError::InvalidSourceContent(format!(
349 "Failed to parse content from '{}' as JSON: {}. Also tried NDJSON: {}",
350 source_name, json_err, ndjson_err
351 ))
352 });
353 }
354
355 return Err(SofError::InvalidSourceContent(format!(
357 "Failed to parse JSON from '{}': {}",
358 source_name, json_err
359 )));
360 }
361 };
362
363 if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
365 if resource_type == "Bundle" {
366 #[cfg(feature = "R4")]
368 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
369 return Ok(SofBundle::R4(bundle));
370 }
371 #[cfg(feature = "R4B")]
372 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
373 return Ok(SofBundle::R4B(bundle));
374 }
375 #[cfg(feature = "R5")]
376 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
377 return Ok(SofBundle::R5(bundle));
378 }
379 #[cfg(feature = "R6")]
380 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
381 return Ok(SofBundle::R6(bundle));
382 }
383 return Err(SofError::InvalidSourceContent(format!(
384 "Bundle from '{}' could not be parsed as any supported FHIR version",
385 source_name
386 )));
387 }
388
389 return wrap_resource_in_bundle(value, source_name);
391 }
392
393 if value.is_array() {
395 return wrap_resources_in_bundle(value, source_name);
396 }
397
398 Err(SofError::InvalidSourceContent(format!(
399 "Content from '{}' is not a valid FHIR resource or Bundle",
400 source_name
401 )))
402}
403
404fn wrap_resource_in_bundle(
406 resource: serde_json::Value,
407 source_name: &str,
408) -> Result<SofBundle, SofError> {
409 #[cfg(feature = "R4")]
412 if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
413 let mut bundle = helios_fhir::r4::Bundle::default();
414 bundle.r#type = Element {
415 id: None,
416 extension: None,
417 value: Some("collection".to_string()),
418 };
419 bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
420 resource: Some(res),
421 ..Default::default()
422 }]);
423 return Ok(SofBundle::R4(bundle));
424 }
425
426 #[cfg(feature = "R4B")]
428 if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
429 let mut bundle = helios_fhir::r4b::Bundle::default();
430 bundle.r#type = Element {
431 id: None,
432 extension: None,
433 value: Some("collection".to_string()),
434 };
435 bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
436 resource: Some(res),
437 ..Default::default()
438 }]);
439 return Ok(SofBundle::R4B(bundle));
440 }
441
442 #[cfg(feature = "R5")]
444 if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
445 let mut bundle = helios_fhir::r5::Bundle::default();
446 bundle.r#type = Element {
447 id: None,
448 extension: None,
449 value: Some("collection".to_string()),
450 };
451 bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
452 resource: Some(Box::new(res)),
453 ..Default::default()
454 }]);
455 return Ok(SofBundle::R5(bundle));
456 }
457
458 #[cfg(feature = "R6")]
460 if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
461 let mut bundle = helios_fhir::r6::Bundle::default();
462 bundle.r#type = Element {
463 id: None,
464 extension: None,
465 value: Some("collection".to_string()),
466 };
467 bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
468 resource: Some(Box::new(res)),
469 ..Default::default()
470 }]);
471 return Ok(SofBundle::R6(bundle));
472 }
473
474 Err(SofError::InvalidSourceContent(format!(
475 "Resource from '{}' could not be parsed as any supported FHIR version",
476 source_name
477 )))
478}
479
480fn wrap_resources_in_bundle(
482 resources: serde_json::Value,
483 source_name: &str,
484) -> Result<SofBundle, SofError> {
485 let arr = resources
486 .as_array()
487 .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
488
489 if arr.is_empty() {
490 return Err(SofError::InvalidSourceContent(format!(
491 "Empty array of resources from '{}'",
492 source_name
493 )));
494 }
495
496 let first = &arr[0];
498
499 #[cfg(feature = "R4")]
501 if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
502 let mut bundle = helios_fhir::r4::Bundle::default();
503 bundle.r#type = Element {
504 id: None,
505 extension: None,
506 value: Some("collection".to_string()),
507 };
508 let mut entries = Vec::new();
509
510 for resource in arr {
511 let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
512 .map_err(|e| {
513 SofError::InvalidSourceContent(format!(
514 "Failed to parse R4 resource from '{}': {}",
515 source_name, e
516 ))
517 })?;
518 entries.push(helios_fhir::r4::BundleEntry {
519 resource: Some(res),
520 ..Default::default()
521 });
522 }
523
524 bundle.entry = Some(entries);
525 return Ok(SofBundle::R4(bundle));
526 }
527
528 #[cfg(feature = "R4B")]
530 if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
531 let mut bundle = helios_fhir::r4b::Bundle::default();
532 bundle.r#type = Element {
533 id: None,
534 extension: None,
535 value: Some("collection".to_string()),
536 };
537 let mut entries = Vec::new();
538
539 for resource in arr {
540 let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
541 .map_err(|e| {
542 SofError::InvalidSourceContent(format!(
543 "Failed to parse R4B resource from '{}': {}",
544 source_name, e
545 ))
546 })?;
547 entries.push(helios_fhir::r4b::BundleEntry {
548 resource: Some(res),
549 ..Default::default()
550 });
551 }
552
553 bundle.entry = Some(entries);
554 return Ok(SofBundle::R4B(bundle));
555 }
556
557 #[cfg(feature = "R5")]
559 if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
560 let mut bundle = helios_fhir::r5::Bundle::default();
561 bundle.r#type = Element {
562 id: None,
563 extension: None,
564 value: Some("collection".to_string()),
565 };
566 let mut entries = Vec::new();
567
568 for resource in arr {
569 let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
570 .map_err(|e| {
571 SofError::InvalidSourceContent(format!(
572 "Failed to parse R5 resource from '{}': {}",
573 source_name, e
574 ))
575 })?;
576 entries.push(helios_fhir::r5::BundleEntry {
577 resource: Some(Box::new(res)),
578 ..Default::default()
579 });
580 }
581
582 bundle.entry = Some(entries);
583 return Ok(SofBundle::R5(bundle));
584 }
585
586 #[cfg(feature = "R6")]
588 if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
589 let mut bundle = helios_fhir::r6::Bundle::default();
590 bundle.r#type = Element {
591 id: None,
592 extension: None,
593 value: Some("collection".to_string()),
594 };
595 let mut entries = Vec::new();
596
597 for resource in arr {
598 let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
599 .map_err(|e| {
600 SofError::InvalidSourceContent(format!(
601 "Failed to parse R6 resource from '{}': {}",
602 source_name, e
603 ))
604 })?;
605 entries.push(helios_fhir::r6::BundleEntry {
606 resource: Some(Box::new(res)),
607 ..Default::default()
608 });
609 }
610
611 bundle.entry = Some(entries);
612 return Ok(SofBundle::R6(bundle));
613 }
614
615 Err(SofError::InvalidSourceContent(format!(
616 "Resources from '{}' could not be parsed as any supported FHIR version",
617 source_name
618 )))
619}
620
621#[cfg(test)]
622mod tests {
623 use super::*;
624
625 #[tokio::test]
626 async fn test_parse_fhir_bundle() {
627 let bundle_json = r#"{
628 "resourceType": "Bundle",
629 "type": "collection",
630 "entry": [{
631 "resource": {
632 "resourceType": "Patient",
633 "id": "123"
634 }
635 }]
636 }"#;
637
638 let result = parse_fhir_content(bundle_json, "test").unwrap();
639 #[cfg(feature = "R4")]
640 assert!(matches!(result, SofBundle::R4(_)));
641 #[cfg(not(feature = "R4"))]
642 assert!(matches!(result, _));
643 }
644
645 #[tokio::test]
646 async fn test_parse_single_resource() {
647 let patient_json = r#"{
648 "resourceType": "Patient",
649 "id": "123"
650 }"#;
651
652 let result = parse_fhir_content(patient_json, "test").unwrap();
653 #[cfg(feature = "R4")]
654 match result {
655 SofBundle::R4(bundle) => {
656 assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
657 }
658 #[cfg(feature = "R4B")]
659 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
660 #[cfg(feature = "R5")]
661 SofBundle::R5(_) => panic!("Expected R4 bundle"),
662 #[cfg(feature = "R6")]
663 SofBundle::R6(_) => panic!("Expected R4 bundle"),
664 }
665 }
666
667 #[tokio::test]
668 async fn test_parse_resource_array() {
669 let resources_json = r#"[
670 {
671 "resourceType": "Patient",
672 "id": "123"
673 },
674 {
675 "resourceType": "Patient",
676 "id": "456"
677 }
678 ]"#;
679
680 let result = parse_fhir_content(resources_json, "test").unwrap();
681 #[cfg(feature = "R4")]
682 match result {
683 SofBundle::R4(bundle) => {
684 assert_eq!(bundle.entry.as_ref().unwrap().len(), 2);
685 }
686 #[cfg(feature = "R4B")]
687 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
688 #[cfg(feature = "R5")]
689 SofBundle::R5(_) => panic!("Expected R4 bundle"),
690 #[cfg(feature = "R6")]
691 SofBundle::R6(_) => panic!("Expected R4 bundle"),
692 }
693 }
694
695 #[tokio::test]
696 async fn test_invalid_content() {
697 let invalid_json = r#"{"not": "fhir"}"#;
698 let result = parse_fhir_content(invalid_json, "test");
699 assert!(result.is_err());
700 }
701
702 #[tokio::test]
703 async fn test_s3_url_parsing() {
704 let data_source = UniversalDataSource::new();
705
706 let result = data_source.load("s3:///path/to/file.json").await;
708 assert!(result.is_err());
709 if let Err(SofError::InvalidSource(msg)) = result {
710 assert!(msg.contains("missing bucket name"));
711 }
712
713 let result = data_source.load("s3://bucket/").await;
715 assert!(result.is_err());
716 if let Err(SofError::InvalidSource(msg)) = result {
717 assert!(msg.contains("missing object path"));
718 }
719
720 }
723
724 #[tokio::test]
725 async fn test_gcs_url_parsing() {
726 let data_source = UniversalDataSource::new();
727
728 let result = data_source.load("gs:///path/to/file.json").await;
730 assert!(result.is_err());
731 if let Err(SofError::InvalidSource(msg)) = result {
732 assert!(msg.contains("missing bucket name"));
733 }
734
735 let result = data_source.load("gs://bucket/").await;
737 assert!(result.is_err());
738 if let Err(SofError::InvalidSource(msg)) = result {
739 assert!(msg.contains("missing object path"));
740 }
741 }
742
743 #[tokio::test]
744 async fn test_azure_url_parsing() {
745 let data_source = UniversalDataSource::new();
746
747 let result = data_source.load("azure:///path/to/file.json").await;
749 assert!(result.is_err());
750 if let Err(SofError::InvalidSource(msg)) = result {
751 assert!(msg.contains("missing container name"));
752 }
753
754 let result = data_source.load("azure://container/").await;
756 assert!(result.is_err());
757 if let Err(SofError::InvalidSource(msg)) = result {
758 assert!(msg.contains("missing blob path"));
759 }
760 }
761
762 #[tokio::test]
763 async fn test_unsupported_protocol() {
764 let data_source = UniversalDataSource::new();
765
766 let result = data_source.load("ftp://server/file.json").await;
768 assert!(result.is_err());
769 if let Err(SofError::UnsupportedSourceProtocol(msg)) = result {
770 assert!(msg.contains("Unsupported source protocol: ftp"));
771 assert!(msg.contains("Supported:"));
772 }
773 }
774
775 #[tokio::test]
776 async fn test_file_protocol_bundle() {
777 use std::io::Write;
778 use tempfile::NamedTempFile;
779
780 let data_source = UniversalDataSource::new();
781
782 let bundle_json = r#"{
784 "resourceType": "Bundle",
785 "type": "collection",
786 "entry": [{
787 "resource": {
788 "resourceType": "Patient",
789 "id": "test-patient"
790 }
791 }]
792 }"#;
793
794 let mut temp_file = NamedTempFile::new().unwrap();
795 temp_file.write_all(bundle_json.as_bytes()).unwrap();
796 temp_file.flush().unwrap();
797
798 let file_path = temp_file.path();
800 let file_url = format!("file://{}", file_path.to_string_lossy());
801
802 let result = data_source.load(&file_url).await;
804 assert!(result.is_ok());
805
806 #[cfg(feature = "R4")]
807 match result.unwrap() {
808 SofBundle::R4(bundle) => {
809 assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
810 }
811 #[cfg(feature = "R4B")]
812 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
813 #[cfg(feature = "R5")]
814 SofBundle::R5(_) => panic!("Expected R4 bundle"),
815 #[cfg(feature = "R6")]
816 SofBundle::R6(_) => panic!("Expected R4 bundle"),
817 }
818 }
819
820 #[tokio::test]
821 async fn test_file_protocol_single_resource() {
822 use std::io::Write;
823 use tempfile::NamedTempFile;
824
825 let data_source = UniversalDataSource::new();
826
827 let patient_json = r#"{
829 "resourceType": "Patient",
830 "id": "test-patient",
831 "name": [{
832 "family": "Test",
833 "given": ["Patient"]
834 }]
835 }"#;
836
837 let mut temp_file = NamedTempFile::new().unwrap();
838 temp_file.write_all(patient_json.as_bytes()).unwrap();
839 temp_file.flush().unwrap();
840
841 let file_path = temp_file.path();
842 let file_url = format!("file://{}", file_path.to_string_lossy());
843
844 let result = data_source.load(&file_url).await;
846 assert!(result.is_ok());
847
848 #[cfg(feature = "R4")]
849 match result.unwrap() {
850 SofBundle::R4(bundle) => {
851 assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
852 }
853 #[cfg(feature = "R4B")]
854 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
855 #[cfg(feature = "R5")]
856 SofBundle::R5(_) => panic!("Expected R4 bundle"),
857 #[cfg(feature = "R6")]
858 SofBundle::R6(_) => panic!("Expected R4 bundle"),
859 }
860 }
861
862 #[tokio::test]
863 async fn test_file_protocol_resource_array() {
864 use std::io::Write;
865 use tempfile::NamedTempFile;
866
867 let data_source = UniversalDataSource::new();
868
869 let resources_json = r#"[
871 {
872 "resourceType": "Patient",
873 "id": "patient-1"
874 },
875 {
876 "resourceType": "Patient",
877 "id": "patient-2"
878 },
879 {
880 "resourceType": "Observation",
881 "id": "obs-1",
882 "status": "final",
883 "code": {
884 "text": "Test"
885 }
886 }
887 ]"#;
888
889 let mut temp_file = NamedTempFile::new().unwrap();
890 temp_file.write_all(resources_json.as_bytes()).unwrap();
891 temp_file.flush().unwrap();
892
893 let file_path = temp_file.path();
894 let file_url = format!("file://{}", file_path.to_string_lossy());
895
896 let result = data_source.load(&file_url).await;
898 assert!(result.is_ok());
899
900 #[cfg(feature = "R4")]
901 match result.unwrap() {
902 SofBundle::R4(bundle) => {
903 assert_eq!(bundle.entry.as_ref().unwrap().len(), 3);
904 }
905 #[cfg(feature = "R4B")]
906 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
907 #[cfg(feature = "R5")]
908 SofBundle::R5(_) => panic!("Expected R4 bundle"),
909 #[cfg(feature = "R6")]
910 SofBundle::R6(_) => panic!("Expected R4 bundle"),
911 }
912 }
913
914 #[tokio::test]
915 async fn test_file_protocol_file_not_found() {
916 let data_source = UniversalDataSource::new();
917
918 let file_url = "file:///nonexistent/path/to/file.json";
920 let result = data_source.load(file_url).await;
921 assert!(result.is_err());
922
923 if let Err(SofError::SourceNotFound(msg)) = result {
924 assert!(msg.contains("File not found"));
925 } else {
926 panic!("Expected SourceNotFound error");
927 }
928 }
929
930 #[tokio::test]
931 async fn test_file_protocol_invalid_json() {
932 use std::io::Write;
933 use tempfile::NamedTempFile;
934
935 let data_source = UniversalDataSource::new();
936
937 let invalid_json = "{ this is not valid json }";
939
940 let mut temp_file = NamedTempFile::new().unwrap();
941 temp_file.write_all(invalid_json.as_bytes()).unwrap();
942 temp_file.flush().unwrap();
943
944 let file_path = temp_file.path();
945 let file_url = format!("file://{}", file_path.to_string_lossy());
946
947 let result = data_source.load(&file_url).await;
949 assert!(result.is_err());
950
951 if let Err(SofError::InvalidSourceContent(msg)) = result {
952 assert!(msg.contains("Failed to parse JSON"));
953 } else {
954 panic!("Expected InvalidSourceContent error");
955 }
956 }
957
958 #[tokio::test]
959 async fn test_file_protocol_invalid_fhir() {
960 use std::io::Write;
961 use tempfile::NamedTempFile;
962
963 let data_source = UniversalDataSource::new();
964
965 let not_fhir_json = r#"{"just": "some", "random": "data"}"#;
967
968 let mut temp_file = NamedTempFile::new().unwrap();
969 temp_file.write_all(not_fhir_json.as_bytes()).unwrap();
970 temp_file.flush().unwrap();
971
972 let file_path = temp_file.path();
973 let file_url = format!("file://{}", file_path.to_string_lossy());
974
975 let result = data_source.load(&file_url).await;
977 assert!(result.is_err());
978
979 if let Err(SofError::InvalidSourceContent(msg)) = result {
980 assert!(msg.contains("not a valid FHIR resource"));
981 } else {
982 panic!("Expected InvalidSourceContent error, got {:?}", result);
983 }
984 }
985
986 #[tokio::test]
987 async fn test_file_protocol_invalid_url() {
988 let data_source = UniversalDataSource::new();
989
990 let result = data_source.load("file://C:\\invalid\\windows\\path").await;
992 assert!(result.is_err());
993 }
995}