1use crate::{SofBundle, SofError};
114use async_trait::async_trait;
115use helios_fhir::Element;
116use object_store::{
117 ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
118 gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
119};
120use reqwest;
121use serde_json;
122use std::sync::Arc;
123use tokio::fs;
124use url::Url;
125
#[async_trait]
/// A loader that turns a source string into a [`SofBundle`].
///
/// Implementors must be `Send + Sync`, as required by the trait bounds.
pub trait DataSource: Send + Sync {
    /// Loads the content identified by `source` and returns it as a bundle.
    ///
    /// How `source` is interpreted is up to the implementation.
    ///
    /// # Errors
    /// Returns a [`SofError`] when the source cannot be loaded.
    async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
}
132
133pub struct UniversalDataSource {
135 client: reqwest::Client,
136}
137
138impl UniversalDataSource {
139 pub fn new() -> Self {
140 Self {
141 client: reqwest::Client::builder()
142 .timeout(std::time::Duration::from_secs(30))
143 .build()
144 .unwrap_or_else(|_| reqwest::Client::new()),
145 }
146 }
147}
148
149impl Default for UniversalDataSource {
150 fn default() -> Self {
151 Self::new()
152 }
153}
154
155#[async_trait]
156impl DataSource for UniversalDataSource {
157 async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
158 let url = Url::parse(source).map_err(|e| {
160 SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
161 })?;
162
163 match url.scheme() {
164 "file" => load_from_file(&url).await,
165 "http" | "https" => load_from_http(&self.client, &url).await,
166 "s3" => load_from_s3(&url).await,
167 "gs" => load_from_gcs(&url).await,
168 "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
169 scheme => Err(SofError::UnsupportedSourceProtocol(format!(
170 "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
171 scheme
172 ))),
173 }
174 }
175}
176
177async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
179 let path = url
181 .to_file_path()
182 .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
183
184 if !path.exists() {
186 return Err(SofError::SourceNotFound(format!(
187 "File not found: {}",
188 path.display()
189 )));
190 }
191
192 let contents = fs::read_to_string(&path)
194 .await
195 .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
196
197 parse_fhir_content(&contents, &path.to_string_lossy())
199}
200
201async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
203 let response = client
205 .get(url.as_str())
206 .header("Accept", "application/fhir+json, application/json")
207 .send()
208 .await
209 .map_err(|e| {
210 SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
211 })?;
212
213 if !response.status().is_success() {
215 return Err(SofError::SourceFetchError(format!(
216 "HTTP error {} when fetching '{}'",
217 response.status(),
218 url
219 )));
220 }
221
222 let contents = response
224 .text()
225 .await
226 .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
227
228 parse_fhir_content(&contents, url.as_str())
230}
231
232async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
234 let bucket = url.host_str().ok_or_else(|| {
236 SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
237 })?;
238
239 let path = url.path().trim_start_matches('/');
240 if path.is_empty() {
241 return Err(SofError::InvalidSource(format!(
242 "Invalid S3 URL '{}': missing object path",
243 url
244 )));
245 }
246
247 let mut builder = AmazonS3Builder::new().with_bucket_name(bucket);
249
250 if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") {
252 builder = builder.with_endpoint(&endpoint);
253
254 if let Ok(allow_http) = std::env::var("AWS_ALLOW_HTTP") {
256 if allow_http.eq_ignore_ascii_case("true") || allow_http == "1" {
257 builder = builder.with_allow_http(true);
258 }
259 }
260 }
261
262 let store = builder.build().map_err(|e| {
263 SofError::SourceFetchError(format!(
264 "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION). For S3-compatible services, set AWS_ENDPOINT_URL.",
265 url, e
266 ))
267 })?;
268
269 load_from_object_store(Arc::new(store), path, url.as_str()).await
270}
271
272async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
274 let bucket = url.host_str().ok_or_else(|| {
276 SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
277 })?;
278
279 let path = url.path().trim_start_matches('/');
280 if path.is_empty() {
281 return Err(SofError::InvalidSource(format!(
282 "Invalid GCS URL '{}': missing object path",
283 url
284 )));
285 }
286
287 let store = GoogleCloudStorageBuilder::new()
289 .with_bucket_name(bucket)
290 .build()
291 .map_err(|e| {
292 SofError::SourceFetchError(format!(
293 "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
294 url, e
295 ))
296 })?;
297
298 load_from_object_store(Arc::new(store), path, url.as_str()).await
299}
300
301async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
303 let (container, path) = if url.scheme() == "azure" {
305 let container = url.host_str().ok_or_else(|| {
307 SofError::InvalidSource(format!(
308 "Invalid Azure URL '{}': missing container name",
309 url
310 ))
311 })?;
312 let path = url.path().trim_start_matches('/');
313 (container.to_string(), path.to_string())
314 } else {
315 let host = url.host_str().ok_or_else(|| {
317 SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
318 })?;
319 let parts: Vec<&str> = host.split('@').collect();
320 if parts.len() != 2 {
321 return Err(SofError::InvalidSource(format!(
322 "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
323 url
324 )));
325 }
326 let container = parts[0];
327 let path = url.path().trim_start_matches('/');
328 (container.to_string(), path.to_string())
329 };
330
331 if path.is_empty() {
332 return Err(SofError::InvalidSource(format!(
333 "Invalid Azure URL '{}': missing blob path",
334 url
335 )));
336 }
337
338 let store = MicrosoftAzureBuilder::new()
340 .with_container_name(&container)
341 .build()
342 .map_err(|e| {
343 SofError::SourceFetchError(format!(
344 "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
345 url, e
346 ))
347 })?;
348
349 load_from_object_store(Arc::new(store), &path, url.as_str()).await
350}
351
352async fn load_from_object_store(
354 store: Arc<dyn ObjectStore>,
355 path: &str,
356 source_name: &str,
357) -> Result<SofBundle, SofError> {
358 let object_path = ObjectPath::from(path);
360
361 let result = store.get(&object_path).await.map_err(|e| match e {
363 object_store::Error::NotFound { .. } => {
364 SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
365 }
366 _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
367 })?;
368
369 let bytes = result
371 .bytes()
372 .await
373 .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
374
375 let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
377 SofError::InvalidSourceContent(format!(
378 "Content from '{}' is not valid UTF-8: {}",
379 source_name, e
380 ))
381 })?;
382
383 parse_fhir_content(&contents, source_name)
385}
386
/// Returns `true` when `source_name` ends with `.ndjson`, ignoring case.
fn is_ndjson_extension(source_name: &str) -> bool {
    const SUFFIX: &[u8] = b".ndjson";

    let raw = source_name.as_bytes();
    match raw.len().checked_sub(SUFFIX.len()) {
        // The suffix is pure ASCII, so a byte-wise comparison is sufficient.
        Some(start) => raw[start..].eq_ignore_ascii_case(SUFFIX),
        None => false,
    }
}
391
392fn parse_ndjson_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
394 let lines: Vec<&str> = contents
395 .lines()
396 .filter(|line| !line.trim().is_empty())
397 .collect();
398
399 if lines.is_empty() {
400 return Err(SofError::InvalidSourceContent(format!(
401 "Empty NDJSON content from '{}'",
402 source_name
403 )));
404 }
405
406 let mut resources = Vec::new();
408 let mut parse_errors = Vec::new();
409
410 for (line_num, line) in lines.iter().enumerate() {
411 match serde_json::from_str::<serde_json::Value>(line) {
412 Ok(value) => {
413 if value.get("resourceType").and_then(|v| v.as_str()).is_some() {
415 resources.push(value);
416 } else {
417 parse_errors.push(format!(
418 "Line {}: Missing 'resourceType' field",
419 line_num + 1
420 ));
421 }
422 }
423 Err(e) => {
424 parse_errors.push(format!("Line {}: {}", line_num + 1, e));
425 }
426 }
427 }
428
429 if resources.is_empty() {
431 return Err(SofError::InvalidSourceContent(format!(
432 "No valid FHIR resources found in NDJSON from '{}'. Errors: {}",
433 source_name,
434 parse_errors.join("; ")
435 )));
436 }
437
438 if !parse_errors.is_empty() {
440 eprintln!(
441 "Warning: {} line(s) in NDJSON from '{}' could not be parsed: {}",
442 parse_errors.len(),
443 source_name,
444 parse_errors.join("; ")
445 );
446 }
447
448 let resources_array = serde_json::Value::Array(resources);
450 wrap_resources_in_bundle(resources_array, source_name)
451}
452
453pub fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
456 if is_ndjson_extension(source_name) {
458 return parse_ndjson_content(contents, source_name);
459 }
460
461 let value: serde_json::Value = match serde_json::from_str(contents) {
463 Ok(v) => v,
464 Err(json_err) => {
465 if contents.lines().count() > 1 {
468 return parse_ndjson_content(contents, source_name).map_err(|ndjson_err| {
470 SofError::InvalidSourceContent(format!(
472 "Failed to parse content from '{}' as JSON: {}. Also tried NDJSON: {}",
473 source_name, json_err, ndjson_err
474 ))
475 });
476 }
477
478 return Err(SofError::InvalidSourceContent(format!(
480 "Failed to parse JSON from '{}': {}",
481 source_name, json_err
482 )));
483 }
484 };
485
486 if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
488 if resource_type == "Bundle" {
489 #[cfg(feature = "R4")]
491 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
492 return Ok(SofBundle::R4(bundle));
493 }
494 #[cfg(feature = "R4B")]
495 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
496 return Ok(SofBundle::R4B(bundle));
497 }
498 #[cfg(feature = "R5")]
499 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
500 return Ok(SofBundle::R5(bundle));
501 }
502 #[cfg(feature = "R6")]
503 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
504 return Ok(SofBundle::R6(bundle));
505 }
506 return Err(SofError::InvalidSourceContent(format!(
507 "Bundle from '{}' could not be parsed as any supported FHIR version",
508 source_name
509 )));
510 }
511
512 return wrap_resource_in_bundle(value, source_name);
514 }
515
516 if value.is_array() {
518 return wrap_resources_in_bundle(value, source_name);
519 }
520
521 Err(SofError::InvalidSourceContent(format!(
522 "Content from '{}' is not a valid FHIR resource or Bundle",
523 source_name
524 )))
525}
526
527fn wrap_resource_in_bundle(
529 resource: serde_json::Value,
530 source_name: &str,
531) -> Result<SofBundle, SofError> {
532 #[cfg(feature = "R4")]
535 if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
536 let mut bundle = helios_fhir::r4::Bundle::default();
537 bundle.r#type = Element {
538 id: None,
539 extension: None,
540 value: Some("collection".to_string()),
541 };
542 bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
543 resource: Some(res),
544 ..Default::default()
545 }]);
546 return Ok(SofBundle::R4(bundle));
547 }
548
549 #[cfg(feature = "R4B")]
551 if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
552 let mut bundle = helios_fhir::r4b::Bundle::default();
553 bundle.r#type = Element {
554 id: None,
555 extension: None,
556 value: Some("collection".to_string()),
557 };
558 bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
559 resource: Some(res),
560 ..Default::default()
561 }]);
562 return Ok(SofBundle::R4B(bundle));
563 }
564
565 #[cfg(feature = "R5")]
567 if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
568 let mut bundle = helios_fhir::r5::Bundle::default();
569 bundle.r#type = Element {
570 id: None,
571 extension: None,
572 value: Some("collection".to_string()),
573 };
574 bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
575 resource: Some(Box::new(res)),
576 ..Default::default()
577 }]);
578 return Ok(SofBundle::R5(bundle));
579 }
580
581 #[cfg(feature = "R6")]
583 if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
584 let mut bundle = helios_fhir::r6::Bundle::default();
585 bundle.r#type = Element {
586 id: None,
587 extension: None,
588 value: Some("collection".to_string()),
589 };
590 bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
591 resource: Some(Box::new(res)),
592 ..Default::default()
593 }]);
594 return Ok(SofBundle::R6(bundle));
595 }
596
597 Err(SofError::InvalidSourceContent(format!(
598 "Resource from '{}' could not be parsed as any supported FHIR version",
599 source_name
600 )))
601}
602
603fn wrap_resources_in_bundle(
605 resources: serde_json::Value,
606 source_name: &str,
607) -> Result<SofBundle, SofError> {
608 let arr = resources
609 .as_array()
610 .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
611
612 if arr.is_empty() {
613 return Err(SofError::InvalidSourceContent(format!(
614 "Empty array of resources from '{}'",
615 source_name
616 )));
617 }
618
619 let first = &arr[0];
621
622 #[cfg(feature = "R4")]
624 if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
625 let mut bundle = helios_fhir::r4::Bundle::default();
626 bundle.r#type = Element {
627 id: None,
628 extension: None,
629 value: Some("collection".to_string()),
630 };
631 let mut entries = Vec::new();
632
633 for resource in arr {
634 let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
635 .map_err(|e| {
636 SofError::InvalidSourceContent(format!(
637 "Failed to parse R4 resource from '{}': {}",
638 source_name, e
639 ))
640 })?;
641 entries.push(helios_fhir::r4::BundleEntry {
642 resource: Some(res),
643 ..Default::default()
644 });
645 }
646
647 bundle.entry = Some(entries);
648 return Ok(SofBundle::R4(bundle));
649 }
650
651 #[cfg(feature = "R4B")]
653 if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
654 let mut bundle = helios_fhir::r4b::Bundle::default();
655 bundle.r#type = Element {
656 id: None,
657 extension: None,
658 value: Some("collection".to_string()),
659 };
660 let mut entries = Vec::new();
661
662 for resource in arr {
663 let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
664 .map_err(|e| {
665 SofError::InvalidSourceContent(format!(
666 "Failed to parse R4B resource from '{}': {}",
667 source_name, e
668 ))
669 })?;
670 entries.push(helios_fhir::r4b::BundleEntry {
671 resource: Some(res),
672 ..Default::default()
673 });
674 }
675
676 bundle.entry = Some(entries);
677 return Ok(SofBundle::R4B(bundle));
678 }
679
680 #[cfg(feature = "R5")]
682 if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
683 let mut bundle = helios_fhir::r5::Bundle::default();
684 bundle.r#type = Element {
685 id: None,
686 extension: None,
687 value: Some("collection".to_string()),
688 };
689 let mut entries = Vec::new();
690
691 for resource in arr {
692 let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
693 .map_err(|e| {
694 SofError::InvalidSourceContent(format!(
695 "Failed to parse R5 resource from '{}': {}",
696 source_name, e
697 ))
698 })?;
699 entries.push(helios_fhir::r5::BundleEntry {
700 resource: Some(Box::new(res)),
701 ..Default::default()
702 });
703 }
704
705 bundle.entry = Some(entries);
706 return Ok(SofBundle::R5(bundle));
707 }
708
709 #[cfg(feature = "R6")]
711 if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
712 let mut bundle = helios_fhir::r6::Bundle::default();
713 bundle.r#type = Element {
714 id: None,
715 extension: None,
716 value: Some("collection".to_string()),
717 };
718 let mut entries = Vec::new();
719
720 for resource in arr {
721 let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
722 .map_err(|e| {
723 SofError::InvalidSourceContent(format!(
724 "Failed to parse R6 resource from '{}': {}",
725 source_name, e
726 ))
727 })?;
728 entries.push(helios_fhir::r6::BundleEntry {
729 resource: Some(Box::new(res)),
730 ..Default::default()
731 });
732 }
733
734 bundle.entry = Some(entries);
735 return Ok(SofBundle::R6(bundle));
736 }
737
738 Err(SofError::InvalidSourceContent(format!(
739 "Resources from '{}' could not be parsed as any supported FHIR version",
740 source_name
741 )))
742}
743
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes `contents` to a fresh temporary file and returns the file handle
    /// together with a `file://` URL pointing at it.
    ///
    /// The URL is built with `Url::from_file_path` rather than string
    /// concatenation so that it is well-formed on every platform. The caller
    /// must keep the returned handle alive for as long as the URL is used.
    fn temp_file_source(contents: &str) -> (tempfile::NamedTempFile, String) {
        use std::io::Write;

        let mut temp_file = tempfile::NamedTempFile::new().unwrap();
        temp_file.write_all(contents.as_bytes()).unwrap();
        temp_file.flush().unwrap();

        let file_url = url::Url::from_file_path(temp_file.path())
            .unwrap()
            .to_string();
        (temp_file, file_url)
    }

    /// Asserts that `result` is `Err(SofError::InvalidSource)` whose message
    /// contains `expected`; any other outcome fails the test.
    fn assert_invalid_source(result: Result<SofBundle, SofError>, expected: &str) {
        match result {
            Err(SofError::InvalidSource(msg)) => {
                assert!(msg.contains(expected), "unexpected message: {}", msg);
            }
            other => panic!(
                "Expected InvalidSource error containing '{}', got {:?}",
                expected, other
            ),
        }
    }

    /// Returns the number of entries of an R4 bundle and panics for any other
    /// FHIR version.
    #[cfg(feature = "R4")]
    fn r4_entry_count(bundle: SofBundle) -> usize {
        match bundle {
            SofBundle::R4(bundle) => bundle.entry.as_ref().unwrap().len(),
            #[cfg(feature = "R4B")]
            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R5")]
            SofBundle::R5(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R6")]
            SofBundle::R6(_) => panic!("Expected R4 bundle"),
        }
    }

    #[test]
    fn test_parse_fhir_bundle() {
        let bundle_json = r#"{
            "resourceType": "Bundle",
            "type": "collection",
            "entry": [{
                "resource": {
                    "resourceType": "Patient",
                    "id": "123"
                }
            }]
        }"#;

        let result = parse_fhir_content(bundle_json, "test").unwrap();
        #[cfg(feature = "R4")]
        assert!(matches!(result, SofBundle::R4(_)));
        // Without R4 it is enough that parsing succeeded.
        #[cfg(not(feature = "R4"))]
        let _ = result;
    }

    #[test]
    fn test_parse_single_resource() {
        let patient_json = r#"{
            "resourceType": "Patient",
            "id": "123"
        }"#;

        let result = parse_fhir_content(patient_json, "test").unwrap();
        #[cfg(feature = "R4")]
        assert_eq!(r4_entry_count(result), 1);
        #[cfg(not(feature = "R4"))]
        let _ = result;
    }

    #[test]
    fn test_parse_resource_array() {
        let resources_json = r#"[
            {
                "resourceType": "Patient",
                "id": "123"
            },
            {
                "resourceType": "Patient",
                "id": "456"
            }
        ]"#;

        let result = parse_fhir_content(resources_json, "test").unwrap();
        #[cfg(feature = "R4")]
        assert_eq!(r4_entry_count(result), 2);
        #[cfg(not(feature = "R4"))]
        let _ = result;
    }

    #[test]
    fn test_invalid_content() {
        let invalid_json = r#"{"not": "fhir"}"#;
        let result = parse_fhir_content(invalid_json, "test");
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_s3_url_parsing() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("s3:///path/to/file.json").await;
        assert_invalid_source(result, "missing bucket name");

        let result = data_source.load("s3://bucket/").await;
        assert_invalid_source(result, "missing object path");
    }

    #[tokio::test]
    async fn test_gcs_url_parsing() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("gs:///path/to/file.json").await;
        assert_invalid_source(result, "missing bucket name");

        let result = data_source.load("gs://bucket/").await;
        assert_invalid_source(result, "missing object path");
    }

    #[tokio::test]
    async fn test_azure_url_parsing() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("azure:///path/to/file.json").await;
        assert_invalid_source(result, "missing container name");

        let result = data_source.load("azure://container/").await;
        assert_invalid_source(result, "missing blob path");
    }

    #[tokio::test]
    async fn test_unsupported_protocol() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("ftp://server/file.json").await;
        match result {
            Err(SofError::UnsupportedSourceProtocol(msg)) => {
                assert!(msg.contains("Unsupported source protocol: ftp"));
                assert!(msg.contains("Supported:"));
            }
            other => panic!("Expected UnsupportedSourceProtocol error, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_file_protocol_bundle() {
        let data_source = UniversalDataSource::new();

        let bundle_json = r#"{
            "resourceType": "Bundle",
            "type": "collection",
            "entry": [{
                "resource": {
                    "resourceType": "Patient",
                    "id": "test-patient"
                }
            }]
        }"#;

        let (_temp_file, file_url) = temp_file_source(bundle_json);

        let result = data_source.load(&file_url).await;
        assert!(result.is_ok(), "load failed: {:?}", result.as_ref().err());

        #[cfg(feature = "R4")]
        assert_eq!(r4_entry_count(result.unwrap()), 1);
    }

    #[tokio::test]
    async fn test_file_protocol_single_resource() {
        let data_source = UniversalDataSource::new();

        let patient_json = r#"{
            "resourceType": "Patient",
            "id": "test-patient",
            "name": [{
                "family": "Test",
                "given": ["Patient"]
            }]
        }"#;

        let (_temp_file, file_url) = temp_file_source(patient_json);

        let result = data_source.load(&file_url).await;
        assert!(result.is_ok(), "load failed: {:?}", result.as_ref().err());

        #[cfg(feature = "R4")]
        assert_eq!(r4_entry_count(result.unwrap()), 1);
    }

    #[tokio::test]
    async fn test_file_protocol_resource_array() {
        let data_source = UniversalDataSource::new();

        let resources_json = r#"[
            {
                "resourceType": "Patient",
                "id": "patient-1"
            },
            {
                "resourceType": "Patient",
                "id": "patient-2"
            },
            {
                "resourceType": "Observation",
                "id": "obs-1",
                "status": "final",
                "code": {
                    "text": "Test"
                }
            }
        ]"#;

        let (_temp_file, file_url) = temp_file_source(resources_json);

        let result = data_source.load(&file_url).await;
        assert!(result.is_ok(), "load failed: {:?}", result.as_ref().err());

        #[cfg(feature = "R4")]
        assert_eq!(r4_entry_count(result.unwrap()), 3);
    }

    #[tokio::test]
    async fn test_file_protocol_file_not_found() {
        use std::path::PathBuf;
        use url::Url;

        let data_source = UniversalDataSource::new();

        // `Url::from_file_path` needs an absolute path in the platform's own syntax.
        #[cfg(windows)]
        let nonexistent_path = PathBuf::from("C:\\nonexistent\\path\\to\\file.json");
        #[cfg(not(windows))]
        let nonexistent_path = PathBuf::from("/nonexistent/path/to/file.json");

        let file_url = Url::from_file_path(&nonexistent_path).unwrap().to_string();

        let result = data_source.load(&file_url).await;
        assert!(result.is_err());

        if let Err(SofError::SourceNotFound(msg)) = result {
            assert!(msg.contains("File not found"));
        } else {
            panic!("Expected SourceNotFound error");
        }
    }

    #[tokio::test]
    async fn test_file_protocol_invalid_json() {
        let data_source = UniversalDataSource::new();

        let invalid_json = "{ this is not valid json }";
        let (_temp_file, file_url) = temp_file_source(invalid_json);

        let result = data_source.load(&file_url).await;
        assert!(result.is_err());

        if let Err(SofError::InvalidSourceContent(msg)) = result {
            assert!(msg.contains("Failed to parse JSON"));
        } else {
            panic!("Expected InvalidSourceContent error");
        }
    }

    #[tokio::test]
    async fn test_file_protocol_invalid_fhir() {
        let data_source = UniversalDataSource::new();

        let not_fhir_json = r#"{"just": "some", "random": "data"}"#;
        let (_temp_file, file_url) = temp_file_source(not_fhir_json);

        let result = data_source.load(&file_url).await;
        assert!(result.is_err());

        if let Err(SofError::InvalidSourceContent(msg)) = result {
            assert!(msg.contains("not a valid FHIR resource"));
        } else {
            panic!("Expected InvalidSourceContent error, got {:?}", result);
        }
    }

    #[tokio::test]
    async fn test_file_protocol_invalid_url() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("file://C:\\invalid\\windows\\path").await;
        assert!(result.is_err());
    }
}