1use crate::{SofBundle, SofError};
110use async_trait::async_trait;
111use helios_fhir::Element;
112use object_store::{
113 ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
114 gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
115};
116use reqwest;
117use serde_json;
118use std::sync::Arc;
119use tokio::fs;
120use url::Url;
121
122#[async_trait]
124pub trait DataSource: Send + Sync {
125 async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
127}
128
129pub struct UniversalDataSource {
131 client: reqwest::Client,
132}
133
134impl UniversalDataSource {
135 pub fn new() -> Self {
136 Self {
137 client: reqwest::Client::builder()
138 .timeout(std::time::Duration::from_secs(30))
139 .build()
140 .unwrap_or_else(|_| reqwest::Client::new()),
141 }
142 }
143}
144
145impl Default for UniversalDataSource {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151#[async_trait]
152impl DataSource for UniversalDataSource {
153 async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
154 let url = Url::parse(source).map_err(|e| {
156 SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
157 })?;
158
159 match url.scheme() {
160 "file" => load_from_file(&url).await,
161 "http" | "https" => load_from_http(&self.client, &url).await,
162 "s3" => load_from_s3(&url).await,
163 "gs" => load_from_gcs(&url).await,
164 "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
165 scheme => Err(SofError::UnsupportedSourceProtocol(format!(
166 "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
167 scheme
168 ))),
169 }
170 }
171}
172
173async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
175 let path = url
177 .to_file_path()
178 .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
179
180 if !path.exists() {
182 return Err(SofError::SourceNotFound(format!(
183 "File not found: {}",
184 path.display()
185 )));
186 }
187
188 let contents = fs::read_to_string(&path)
190 .await
191 .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
192
193 parse_fhir_content(&contents, &path.to_string_lossy())
195}
196
197async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
199 let response = client
201 .get(url.as_str())
202 .header("Accept", "application/fhir+json, application/json")
203 .send()
204 .await
205 .map_err(|e| {
206 SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
207 })?;
208
209 if !response.status().is_success() {
211 return Err(SofError::SourceFetchError(format!(
212 "HTTP error {} when fetching '{}'",
213 response.status(),
214 url
215 )));
216 }
217
218 let contents = response
220 .text()
221 .await
222 .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
223
224 parse_fhir_content(&contents, url.as_str())
226}
227
228async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
230 let bucket = url.host_str().ok_or_else(|| {
232 SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
233 })?;
234
235 let path = url.path().trim_start_matches('/');
236 if path.is_empty() {
237 return Err(SofError::InvalidSource(format!(
238 "Invalid S3 URL '{}': missing object path",
239 url
240 )));
241 }
242
243 let store = AmazonS3Builder::new()
245 .with_bucket_name(bucket)
246 .build()
247 .map_err(|e| {
248 SofError::SourceFetchError(format!(
249 "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)",
250 url, e
251 ))
252 })?;
253
254 load_from_object_store(Arc::new(store), path, url.as_str()).await
255}
256
257async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
259 let bucket = url.host_str().ok_or_else(|| {
261 SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
262 })?;
263
264 let path = url.path().trim_start_matches('/');
265 if path.is_empty() {
266 return Err(SofError::InvalidSource(format!(
267 "Invalid GCS URL '{}': missing object path",
268 url
269 )));
270 }
271
272 let store = GoogleCloudStorageBuilder::new()
274 .with_bucket_name(bucket)
275 .build()
276 .map_err(|e| {
277 SofError::SourceFetchError(format!(
278 "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
279 url, e
280 ))
281 })?;
282
283 load_from_object_store(Arc::new(store), path, url.as_str()).await
284}
285
286async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
288 let (container, path) = if url.scheme() == "azure" {
290 let container = url.host_str().ok_or_else(|| {
292 SofError::InvalidSource(format!(
293 "Invalid Azure URL '{}': missing container name",
294 url
295 ))
296 })?;
297 let path = url.path().trim_start_matches('/');
298 (container.to_string(), path.to_string())
299 } else {
300 let host = url.host_str().ok_or_else(|| {
302 SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
303 })?;
304 let parts: Vec<&str> = host.split('@').collect();
305 if parts.len() != 2 {
306 return Err(SofError::InvalidSource(format!(
307 "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
308 url
309 )));
310 }
311 let container = parts[0];
312 let path = url.path().trim_start_matches('/');
313 (container.to_string(), path.to_string())
314 };
315
316 if path.is_empty() {
317 return Err(SofError::InvalidSource(format!(
318 "Invalid Azure URL '{}': missing blob path",
319 url
320 )));
321 }
322
323 let store = MicrosoftAzureBuilder::new()
325 .with_container_name(&container)
326 .build()
327 .map_err(|e| {
328 SofError::SourceFetchError(format!(
329 "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
330 url, e
331 ))
332 })?;
333
334 load_from_object_store(Arc::new(store), &path, url.as_str()).await
335}
336
337async fn load_from_object_store(
339 store: Arc<dyn ObjectStore>,
340 path: &str,
341 source_name: &str,
342) -> Result<SofBundle, SofError> {
343 let object_path = ObjectPath::from(path);
345
346 let result = store.get(&object_path).await.map_err(|e| match e {
348 object_store::Error::NotFound { .. } => {
349 SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
350 }
351 _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
352 })?;
353
354 let bytes = result
356 .bytes()
357 .await
358 .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
359
360 let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
362 SofError::InvalidSourceContent(format!(
363 "Content from '{}' is not valid UTF-8: {}",
364 source_name, e
365 ))
366 })?;
367
368 parse_fhir_content(&contents, source_name)
370}
371
372fn is_ndjson_extension(source_name: &str) -> bool {
374 source_name.to_lowercase().ends_with(".ndjson")
375}
376
377fn parse_ndjson_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
379 let lines: Vec<&str> = contents
380 .lines()
381 .filter(|line| !line.trim().is_empty())
382 .collect();
383
384 if lines.is_empty() {
385 return Err(SofError::InvalidSourceContent(format!(
386 "Empty NDJSON content from '{}'",
387 source_name
388 )));
389 }
390
391 let mut resources = Vec::new();
393 let mut parse_errors = Vec::new();
394
395 for (line_num, line) in lines.iter().enumerate() {
396 match serde_json::from_str::<serde_json::Value>(line) {
397 Ok(value) => {
398 if value.get("resourceType").and_then(|v| v.as_str()).is_some() {
400 resources.push(value);
401 } else {
402 parse_errors.push(format!(
403 "Line {}: Missing 'resourceType' field",
404 line_num + 1
405 ));
406 }
407 }
408 Err(e) => {
409 parse_errors.push(format!("Line {}: {}", line_num + 1, e));
410 }
411 }
412 }
413
414 if resources.is_empty() {
416 return Err(SofError::InvalidSourceContent(format!(
417 "No valid FHIR resources found in NDJSON from '{}'. Errors: {}",
418 source_name,
419 parse_errors.join("; ")
420 )));
421 }
422
423 if !parse_errors.is_empty() {
425 eprintln!(
426 "Warning: {} line(s) in NDJSON from '{}' could not be parsed: {}",
427 parse_errors.len(),
428 source_name,
429 parse_errors.join("; ")
430 );
431 }
432
433 let resources_array = serde_json::Value::Array(resources);
435 wrap_resources_in_bundle(resources_array, source_name)
436}
437
438pub fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
441 if is_ndjson_extension(source_name) {
443 return parse_ndjson_content(contents, source_name);
444 }
445
446 let value: serde_json::Value = match serde_json::from_str(contents) {
448 Ok(v) => v,
449 Err(json_err) => {
450 if contents.lines().count() > 1 {
453 return parse_ndjson_content(contents, source_name).map_err(|ndjson_err| {
455 SofError::InvalidSourceContent(format!(
457 "Failed to parse content from '{}' as JSON: {}. Also tried NDJSON: {}",
458 source_name, json_err, ndjson_err
459 ))
460 });
461 }
462
463 return Err(SofError::InvalidSourceContent(format!(
465 "Failed to parse JSON from '{}': {}",
466 source_name, json_err
467 )));
468 }
469 };
470
471 if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
473 if resource_type == "Bundle" {
474 #[cfg(feature = "R4")]
476 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
477 return Ok(SofBundle::R4(bundle));
478 }
479 #[cfg(feature = "R4B")]
480 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
481 return Ok(SofBundle::R4B(bundle));
482 }
483 #[cfg(feature = "R5")]
484 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
485 return Ok(SofBundle::R5(bundle));
486 }
487 #[cfg(feature = "R6")]
488 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
489 return Ok(SofBundle::R6(bundle));
490 }
491 return Err(SofError::InvalidSourceContent(format!(
492 "Bundle from '{}' could not be parsed as any supported FHIR version",
493 source_name
494 )));
495 }
496
497 return wrap_resource_in_bundle(value, source_name);
499 }
500
501 if value.is_array() {
503 return wrap_resources_in_bundle(value, source_name);
504 }
505
506 Err(SofError::InvalidSourceContent(format!(
507 "Content from '{}' is not a valid FHIR resource or Bundle",
508 source_name
509 )))
510}
511
512fn wrap_resource_in_bundle(
514 resource: serde_json::Value,
515 source_name: &str,
516) -> Result<SofBundle, SofError> {
517 #[cfg(feature = "R4")]
520 if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
521 let mut bundle = helios_fhir::r4::Bundle::default();
522 bundle.r#type = Element {
523 id: None,
524 extension: None,
525 value: Some("collection".to_string()),
526 };
527 bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
528 resource: Some(res),
529 ..Default::default()
530 }]);
531 return Ok(SofBundle::R4(bundle));
532 }
533
534 #[cfg(feature = "R4B")]
536 if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
537 let mut bundle = helios_fhir::r4b::Bundle::default();
538 bundle.r#type = Element {
539 id: None,
540 extension: None,
541 value: Some("collection".to_string()),
542 };
543 bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
544 resource: Some(res),
545 ..Default::default()
546 }]);
547 return Ok(SofBundle::R4B(bundle));
548 }
549
550 #[cfg(feature = "R5")]
552 if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
553 let mut bundle = helios_fhir::r5::Bundle::default();
554 bundle.r#type = Element {
555 id: None,
556 extension: None,
557 value: Some("collection".to_string()),
558 };
559 bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
560 resource: Some(Box::new(res)),
561 ..Default::default()
562 }]);
563 return Ok(SofBundle::R5(bundle));
564 }
565
566 #[cfg(feature = "R6")]
568 if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
569 let mut bundle = helios_fhir::r6::Bundle::default();
570 bundle.r#type = Element {
571 id: None,
572 extension: None,
573 value: Some("collection".to_string()),
574 };
575 bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
576 resource: Some(Box::new(res)),
577 ..Default::default()
578 }]);
579 return Ok(SofBundle::R6(bundle));
580 }
581
582 Err(SofError::InvalidSourceContent(format!(
583 "Resource from '{}' could not be parsed as any supported FHIR version",
584 source_name
585 )))
586}
587
588fn wrap_resources_in_bundle(
590 resources: serde_json::Value,
591 source_name: &str,
592) -> Result<SofBundle, SofError> {
593 let arr = resources
594 .as_array()
595 .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
596
597 if arr.is_empty() {
598 return Err(SofError::InvalidSourceContent(format!(
599 "Empty array of resources from '{}'",
600 source_name
601 )));
602 }
603
604 let first = &arr[0];
606
607 #[cfg(feature = "R4")]
609 if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
610 let mut bundle = helios_fhir::r4::Bundle::default();
611 bundle.r#type = Element {
612 id: None,
613 extension: None,
614 value: Some("collection".to_string()),
615 };
616 let mut entries = Vec::new();
617
618 for resource in arr {
619 let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
620 .map_err(|e| {
621 SofError::InvalidSourceContent(format!(
622 "Failed to parse R4 resource from '{}': {}",
623 source_name, e
624 ))
625 })?;
626 entries.push(helios_fhir::r4::BundleEntry {
627 resource: Some(res),
628 ..Default::default()
629 });
630 }
631
632 bundle.entry = Some(entries);
633 return Ok(SofBundle::R4(bundle));
634 }
635
636 #[cfg(feature = "R4B")]
638 if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
639 let mut bundle = helios_fhir::r4b::Bundle::default();
640 bundle.r#type = Element {
641 id: None,
642 extension: None,
643 value: Some("collection".to_string()),
644 };
645 let mut entries = Vec::new();
646
647 for resource in arr {
648 let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
649 .map_err(|e| {
650 SofError::InvalidSourceContent(format!(
651 "Failed to parse R4B resource from '{}': {}",
652 source_name, e
653 ))
654 })?;
655 entries.push(helios_fhir::r4b::BundleEntry {
656 resource: Some(res),
657 ..Default::default()
658 });
659 }
660
661 bundle.entry = Some(entries);
662 return Ok(SofBundle::R4B(bundle));
663 }
664
665 #[cfg(feature = "R5")]
667 if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
668 let mut bundle = helios_fhir::r5::Bundle::default();
669 bundle.r#type = Element {
670 id: None,
671 extension: None,
672 value: Some("collection".to_string()),
673 };
674 let mut entries = Vec::new();
675
676 for resource in arr {
677 let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
678 .map_err(|e| {
679 SofError::InvalidSourceContent(format!(
680 "Failed to parse R5 resource from '{}': {}",
681 source_name, e
682 ))
683 })?;
684 entries.push(helios_fhir::r5::BundleEntry {
685 resource: Some(Box::new(res)),
686 ..Default::default()
687 });
688 }
689
690 bundle.entry = Some(entries);
691 return Ok(SofBundle::R5(bundle));
692 }
693
694 #[cfg(feature = "R6")]
696 if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
697 let mut bundle = helios_fhir::r6::Bundle::default();
698 bundle.r#type = Element {
699 id: None,
700 extension: None,
701 value: Some("collection".to_string()),
702 };
703 let mut entries = Vec::new();
704
705 for resource in arr {
706 let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
707 .map_err(|e| {
708 SofError::InvalidSourceContent(format!(
709 "Failed to parse R6 resource from '{}': {}",
710 source_name, e
711 ))
712 })?;
713 entries.push(helios_fhir::r6::BundleEntry {
714 resource: Some(Box::new(res)),
715 ..Default::default()
716 });
717 }
718
719 bundle.entry = Some(entries);
720 return Ok(SofBundle::R6(bundle));
721 }
722
723 Err(SofError::InvalidSourceContent(format!(
724 "Resources from '{}' could not be parsed as any supported FHIR version",
725 source_name
726 )))
727}
728
729#[cfg(test)]
730mod tests {
731 use super::*;
732
733 #[tokio::test]
734 async fn test_parse_fhir_bundle() {
735 let bundle_json = r#"{
736 "resourceType": "Bundle",
737 "type": "collection",
738 "entry": [{
739 "resource": {
740 "resourceType": "Patient",
741 "id": "123"
742 }
743 }]
744 }"#;
745
746 let result = parse_fhir_content(bundle_json, "test").unwrap();
747 #[cfg(feature = "R4")]
748 assert!(matches!(result, SofBundle::R4(_)));
749 #[cfg(not(feature = "R4"))]
750 assert!(matches!(result, _));
751 }
752
753 #[tokio::test]
754 async fn test_parse_single_resource() {
755 let patient_json = r#"{
756 "resourceType": "Patient",
757 "id": "123"
758 }"#;
759
760 let result = parse_fhir_content(patient_json, "test").unwrap();
761 #[cfg(feature = "R4")]
762 match result {
763 SofBundle::R4(bundle) => {
764 assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
765 }
766 #[cfg(feature = "R4B")]
767 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
768 #[cfg(feature = "R5")]
769 SofBundle::R5(_) => panic!("Expected R4 bundle"),
770 #[cfg(feature = "R6")]
771 SofBundle::R6(_) => panic!("Expected R4 bundle"),
772 }
773 }
774
775 #[tokio::test]
776 async fn test_parse_resource_array() {
777 let resources_json = r#"[
778 {
779 "resourceType": "Patient",
780 "id": "123"
781 },
782 {
783 "resourceType": "Patient",
784 "id": "456"
785 }
786 ]"#;
787
788 let result = parse_fhir_content(resources_json, "test").unwrap();
789 #[cfg(feature = "R4")]
790 match result {
791 SofBundle::R4(bundle) => {
792 assert_eq!(bundle.entry.as_ref().unwrap().len(), 2);
793 }
794 #[cfg(feature = "R4B")]
795 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
796 #[cfg(feature = "R5")]
797 SofBundle::R5(_) => panic!("Expected R4 bundle"),
798 #[cfg(feature = "R6")]
799 SofBundle::R6(_) => panic!("Expected R4 bundle"),
800 }
801 }
802
803 #[tokio::test]
804 async fn test_invalid_content() {
805 let invalid_json = r#"{"not": "fhir"}"#;
806 let result = parse_fhir_content(invalid_json, "test");
807 assert!(result.is_err());
808 }
809
810 #[tokio::test]
811 async fn test_s3_url_parsing() {
812 let data_source = UniversalDataSource::new();
813
814 let result = data_source.load("s3:///path/to/file.json").await;
816 assert!(result.is_err());
817 if let Err(SofError::InvalidSource(msg)) = result {
818 assert!(msg.contains("missing bucket name"));
819 }
820
821 let result = data_source.load("s3://bucket/").await;
823 assert!(result.is_err());
824 if let Err(SofError::InvalidSource(msg)) = result {
825 assert!(msg.contains("missing object path"));
826 }
827
828 }
831
832 #[tokio::test]
833 async fn test_gcs_url_parsing() {
834 let data_source = UniversalDataSource::new();
835
836 let result = data_source.load("gs:///path/to/file.json").await;
838 assert!(result.is_err());
839 if let Err(SofError::InvalidSource(msg)) = result {
840 assert!(msg.contains("missing bucket name"));
841 }
842
843 let result = data_source.load("gs://bucket/").await;
845 assert!(result.is_err());
846 if let Err(SofError::InvalidSource(msg)) = result {
847 assert!(msg.contains("missing object path"));
848 }
849 }
850
851 #[tokio::test]
852 async fn test_azure_url_parsing() {
853 let data_source = UniversalDataSource::new();
854
855 let result = data_source.load("azure:///path/to/file.json").await;
857 assert!(result.is_err());
858 if let Err(SofError::InvalidSource(msg)) = result {
859 assert!(msg.contains("missing container name"));
860 }
861
862 let result = data_source.load("azure://container/").await;
864 assert!(result.is_err());
865 if let Err(SofError::InvalidSource(msg)) = result {
866 assert!(msg.contains("missing blob path"));
867 }
868 }
869
870 #[tokio::test]
871 async fn test_unsupported_protocol() {
872 let data_source = UniversalDataSource::new();
873
874 let result = data_source.load("ftp://server/file.json").await;
876 assert!(result.is_err());
877 if let Err(SofError::UnsupportedSourceProtocol(msg)) = result {
878 assert!(msg.contains("Unsupported source protocol: ftp"));
879 assert!(msg.contains("Supported:"));
880 }
881 }
882
883 #[tokio::test]
884 async fn test_file_protocol_bundle() {
885 use std::io::Write;
886 use tempfile::NamedTempFile;
887
888 let data_source = UniversalDataSource::new();
889
890 let bundle_json = r#"{
892 "resourceType": "Bundle",
893 "type": "collection",
894 "entry": [{
895 "resource": {
896 "resourceType": "Patient",
897 "id": "test-patient"
898 }
899 }]
900 }"#;
901
902 let mut temp_file = NamedTempFile::new().unwrap();
903 temp_file.write_all(bundle_json.as_bytes()).unwrap();
904 temp_file.flush().unwrap();
905
906 let file_path = temp_file.path();
908 let file_url = format!("file://{}", file_path.to_string_lossy());
909
910 let result = data_source.load(&file_url).await;
912 assert!(result.is_ok());
913
914 #[cfg(feature = "R4")]
915 match result.unwrap() {
916 SofBundle::R4(bundle) => {
917 assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
918 }
919 #[cfg(feature = "R4B")]
920 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
921 #[cfg(feature = "R5")]
922 SofBundle::R5(_) => panic!("Expected R4 bundle"),
923 #[cfg(feature = "R6")]
924 SofBundle::R6(_) => panic!("Expected R4 bundle"),
925 }
926 }
927
928 #[tokio::test]
929 async fn test_file_protocol_single_resource() {
930 use std::io::Write;
931 use tempfile::NamedTempFile;
932
933 let data_source = UniversalDataSource::new();
934
935 let patient_json = r#"{
937 "resourceType": "Patient",
938 "id": "test-patient",
939 "name": [{
940 "family": "Test",
941 "given": ["Patient"]
942 }]
943 }"#;
944
945 let mut temp_file = NamedTempFile::new().unwrap();
946 temp_file.write_all(patient_json.as_bytes()).unwrap();
947 temp_file.flush().unwrap();
948
949 let file_path = temp_file.path();
950 let file_url = format!("file://{}", file_path.to_string_lossy());
951
952 let result = data_source.load(&file_url).await;
954 assert!(result.is_ok());
955
956 #[cfg(feature = "R4")]
957 match result.unwrap() {
958 SofBundle::R4(bundle) => {
959 assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
960 }
961 #[cfg(feature = "R4B")]
962 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
963 #[cfg(feature = "R5")]
964 SofBundle::R5(_) => panic!("Expected R4 bundle"),
965 #[cfg(feature = "R6")]
966 SofBundle::R6(_) => panic!("Expected R4 bundle"),
967 }
968 }
969
970 #[tokio::test]
971 async fn test_file_protocol_resource_array() {
972 use std::io::Write;
973 use tempfile::NamedTempFile;
974
975 let data_source = UniversalDataSource::new();
976
977 let resources_json = r#"[
979 {
980 "resourceType": "Patient",
981 "id": "patient-1"
982 },
983 {
984 "resourceType": "Patient",
985 "id": "patient-2"
986 },
987 {
988 "resourceType": "Observation",
989 "id": "obs-1",
990 "status": "final",
991 "code": {
992 "text": "Test"
993 }
994 }
995 ]"#;
996
997 let mut temp_file = NamedTempFile::new().unwrap();
998 temp_file.write_all(resources_json.as_bytes()).unwrap();
999 temp_file.flush().unwrap();
1000
1001 let file_path = temp_file.path();
1002 let file_url = format!("file://{}", file_path.to_string_lossy());
1003
1004 let result = data_source.load(&file_url).await;
1006 assert!(result.is_ok());
1007
1008 #[cfg(feature = "R4")]
1009 match result.unwrap() {
1010 SofBundle::R4(bundle) => {
1011 assert_eq!(bundle.entry.as_ref().unwrap().len(), 3);
1012 }
1013 #[cfg(feature = "R4B")]
1014 SofBundle::R4B(_) => panic!("Expected R4 bundle"),
1015 #[cfg(feature = "R5")]
1016 SofBundle::R5(_) => panic!("Expected R4 bundle"),
1017 #[cfg(feature = "R6")]
1018 SofBundle::R6(_) => panic!("Expected R4 bundle"),
1019 }
1020 }
1021
1022 #[tokio::test]
1023 async fn test_file_protocol_file_not_found() {
1024 use std::path::PathBuf;
1025 use url::Url;
1026
1027 let data_source = UniversalDataSource::new();
1028
1029 #[cfg(windows)]
1031 let nonexistent_path = PathBuf::from("C:\\nonexistent\\path\\to\\file.json");
1032 #[cfg(not(windows))]
1033 let nonexistent_path = PathBuf::from("/nonexistent/path/to/file.json");
1034
1035 let file_url = Url::from_file_path(&nonexistent_path).unwrap().to_string();
1036
1037 let result = data_source.load(&file_url).await;
1038 assert!(result.is_err());
1039
1040 if let Err(SofError::SourceNotFound(msg)) = result {
1041 assert!(msg.contains("File not found"));
1042 } else {
1043 panic!("Expected SourceNotFound error");
1044 }
1045 }
1046
1047 #[tokio::test]
1048 async fn test_file_protocol_invalid_json() {
1049 use std::io::Write;
1050 use tempfile::NamedTempFile;
1051
1052 let data_source = UniversalDataSource::new();
1053
1054 let invalid_json = "{ this is not valid json }";
1056
1057 let mut temp_file = NamedTempFile::new().unwrap();
1058 temp_file.write_all(invalid_json.as_bytes()).unwrap();
1059 temp_file.flush().unwrap();
1060
1061 let file_path = temp_file.path();
1062 let file_url = format!("file://{}", file_path.to_string_lossy());
1063
1064 let result = data_source.load(&file_url).await;
1066 assert!(result.is_err());
1067
1068 if let Err(SofError::InvalidSourceContent(msg)) = result {
1069 assert!(msg.contains("Failed to parse JSON"));
1070 } else {
1071 panic!("Expected InvalidSourceContent error");
1072 }
1073 }
1074
1075 #[tokio::test]
1076 async fn test_file_protocol_invalid_fhir() {
1077 use std::io::Write;
1078 use tempfile::NamedTempFile;
1079
1080 let data_source = UniversalDataSource::new();
1081
1082 let not_fhir_json = r#"{"just": "some", "random": "data"}"#;
1084
1085 let mut temp_file = NamedTempFile::new().unwrap();
1086 temp_file.write_all(not_fhir_json.as_bytes()).unwrap();
1087 temp_file.flush().unwrap();
1088
1089 let file_path = temp_file.path();
1090 let file_url = format!("file://{}", file_path.to_string_lossy());
1091
1092 let result = data_source.load(&file_url).await;
1094 assert!(result.is_err());
1095
1096 if let Err(SofError::InvalidSourceContent(msg)) = result {
1097 assert!(msg.contains("not a valid FHIR resource"));
1098 } else {
1099 panic!("Expected InvalidSourceContent error, got {:?}", result);
1100 }
1101 }
1102
1103 #[tokio::test]
1104 async fn test_file_protocol_invalid_url() {
1105 let data_source = UniversalDataSource::new();
1106
1107 let result = data_source.load("file://C:\\invalid\\windows\\path").await;
1109 assert!(result.is_err());
1110 }
1112}