1use crate::{SofBundle, SofError};
114use async_trait::async_trait;
115use helios_fhir::Element;
116use object_store::{
117 ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
118 gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
119};
120use reqwest;
121use serde_json;
122use std::sync::Arc;
123use tokio::fs;
124use url::Url;
125
126#[async_trait]
128pub trait DataSource: Send + Sync {
129 async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
131}
132
133pub struct UniversalDataSource {
135 client: reqwest::Client,
136}
137
138impl UniversalDataSource {
139 pub fn new() -> Self {
140 Self {
141 client: reqwest::Client::builder()
142 .timeout(std::time::Duration::from_secs(30))
143 .build()
144 .unwrap_or_else(|_| reqwest::Client::new()),
145 }
146 }
147}
148
149impl Default for UniversalDataSource {
150 fn default() -> Self {
151 Self::new()
152 }
153}
154
155#[async_trait]
156impl DataSource for UniversalDataSource {
157 async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
158 let url = Url::parse(source).map_err(|e| {
160 SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
161 })?;
162
163 match url.scheme() {
164 "file" => load_from_file(&url).await,
165 "http" | "https" => load_from_http(&self.client, &url).await,
166 "s3" => load_from_s3(&url).await,
167 "gs" => load_from_gcs(&url).await,
168 "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
169 scheme => Err(SofError::UnsupportedSourceProtocol(format!(
170 "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
171 scheme
172 ))),
173 }
174 }
175}
176
177async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
179 let path = url
181 .to_file_path()
182 .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
183
184 if !path.exists() {
186 return Err(SofError::SourceNotFound(format!(
187 "File not found: {}",
188 path.display()
189 )));
190 }
191
192 let contents = fs::read_to_string(&path)
194 .await
195 .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
196
197 parse_fhir_content(&contents, &path.to_string_lossy())
199}
200
201async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
203 let response = client
205 .get(url.as_str())
206 .header("Accept", "application/fhir+json, application/json")
207 .send()
208 .await
209 .map_err(|e| {
210 SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
211 })?;
212
213 if !response.status().is_success() {
215 return Err(SofError::SourceFetchError(format!(
216 "HTTP error {} when fetching '{}'",
217 response.status(),
218 url
219 )));
220 }
221
222 let contents = response
224 .text()
225 .await
226 .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
227
228 parse_fhir_content(&contents, url.as_str())
230}
231
232async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
234 let bucket = url.host_str().ok_or_else(|| {
236 SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
237 })?;
238
239 let path = url.path().trim_start_matches('/');
240 if path.is_empty() {
241 return Err(SofError::InvalidSource(format!(
242 "Invalid S3 URL '{}': missing object path",
243 url
244 )));
245 }
246
247 let builder = AmazonS3Builder::from_env().with_bucket_name(bucket);
251
252 let store = builder.build().map_err(|e| {
253 SofError::SourceFetchError(format!(
254 "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION). For S3-compatible services, set AWS_ENDPOINT (and AWS_ALLOW_HTTP=true for HTTP endpoints).",
255 url, e
256 ))
257 })?;
258
259 load_from_object_store(Arc::new(store), path, url.as_str()).await
260}
261
262async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
264 let bucket = url.host_str().ok_or_else(|| {
266 SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
267 })?;
268
269 let path = url.path().trim_start_matches('/');
270 if path.is_empty() {
271 return Err(SofError::InvalidSource(format!(
272 "Invalid GCS URL '{}': missing object path",
273 url
274 )));
275 }
276
277 let store = GoogleCloudStorageBuilder::new()
279 .with_bucket_name(bucket)
280 .build()
281 .map_err(|e| {
282 SofError::SourceFetchError(format!(
283 "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
284 url, e
285 ))
286 })?;
287
288 load_from_object_store(Arc::new(store), path, url.as_str()).await
289}
290
291async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
293 let (container, path) = if url.scheme() == "azure" {
295 let container = url.host_str().ok_or_else(|| {
297 SofError::InvalidSource(format!(
298 "Invalid Azure URL '{}': missing container name",
299 url
300 ))
301 })?;
302 let path = url.path().trim_start_matches('/');
303 (container.to_string(), path.to_string())
304 } else {
305 let host = url.host_str().ok_or_else(|| {
307 SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
308 })?;
309 let parts: Vec<&str> = host.split('@').collect();
310 if parts.len() != 2 {
311 return Err(SofError::InvalidSource(format!(
312 "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
313 url
314 )));
315 }
316 let container = parts[0];
317 let path = url.path().trim_start_matches('/');
318 (container.to_string(), path.to_string())
319 };
320
321 if path.is_empty() {
322 return Err(SofError::InvalidSource(format!(
323 "Invalid Azure URL '{}': missing blob path",
324 url
325 )));
326 }
327
328 let store = MicrosoftAzureBuilder::new()
330 .with_container_name(&container)
331 .build()
332 .map_err(|e| {
333 SofError::SourceFetchError(format!(
334 "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
335 url, e
336 ))
337 })?;
338
339 load_from_object_store(Arc::new(store), &path, url.as_str()).await
340}
341
342async fn load_from_object_store(
344 store: Arc<dyn ObjectStore>,
345 path: &str,
346 source_name: &str,
347) -> Result<SofBundle, SofError> {
348 let object_path = ObjectPath::from(path);
350
351 let result = store.get(&object_path).await.map_err(|e| match e {
353 object_store::Error::NotFound { .. } => {
354 SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
355 }
356 _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
357 })?;
358
359 let bytes = result
361 .bytes()
362 .await
363 .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
364
365 let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
367 SofError::InvalidSourceContent(format!(
368 "Content from '{}' is not valid UTF-8: {}",
369 source_name, e
370 ))
371 })?;
372
373 parse_fhir_content(&contents, source_name)
375}
376
/// Returns `true` when `source_name` ends with `.ndjson`, ignoring case.
fn is_ndjson_extension(source_name: &str) -> bool {
    const NDJSON_SUFFIX: &[u8] = b".ndjson";

    let name = source_name.as_bytes();
    match name.len().checked_sub(NDJSON_SUFFIX.len()) {
        // The suffix is pure ASCII, so a byte-wise ASCII-insensitive compare suffices.
        Some(start) => name[start..].eq_ignore_ascii_case(NDJSON_SUFFIX),
        None => false,
    }
}
381
382fn parse_ndjson_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
384 let lines: Vec<&str> = contents
385 .lines()
386 .filter(|line| !line.trim().is_empty())
387 .collect();
388
389 if lines.is_empty() {
390 return Err(SofError::InvalidSourceContent(format!(
391 "Empty NDJSON content from '{}'",
392 source_name
393 )));
394 }
395
396 let mut resources = Vec::new();
398 let mut parse_errors = Vec::new();
399
400 for (line_num, line) in lines.iter().enumerate() {
401 match serde_json::from_str::<serde_json::Value>(line) {
402 Ok(value) => {
403 if value.get("resourceType").and_then(|v| v.as_str()).is_some() {
405 resources.push(value);
406 } else {
407 parse_errors.push(format!(
408 "Line {}: Missing 'resourceType' field",
409 line_num + 1
410 ));
411 }
412 }
413 Err(e) => {
414 parse_errors.push(format!("Line {}: {}", line_num + 1, e));
415 }
416 }
417 }
418
419 if resources.is_empty() {
421 return Err(SofError::InvalidSourceContent(format!(
422 "No valid FHIR resources found in NDJSON from '{}'. Errors: {}",
423 source_name,
424 parse_errors.join("; ")
425 )));
426 }
427
428 if !parse_errors.is_empty() {
430 eprintln!(
431 "Warning: {} line(s) in NDJSON from '{}' could not be parsed: {}",
432 parse_errors.len(),
433 source_name,
434 parse_errors.join("; ")
435 );
436 }
437
438 let resources_array = serde_json::Value::Array(resources);
440 wrap_resources_in_bundle(resources_array, source_name)
441}
442
443pub fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
446 if is_ndjson_extension(source_name) {
448 return parse_ndjson_content(contents, source_name);
449 }
450
451 let value: serde_json::Value = match serde_json::from_str(contents) {
453 Ok(v) => v,
454 Err(json_err) => {
455 if contents.lines().count() > 1 {
458 return parse_ndjson_content(contents, source_name).map_err(|ndjson_err| {
460 SofError::InvalidSourceContent(format!(
462 "Failed to parse content from '{}' as JSON: {}. Also tried NDJSON: {}",
463 source_name, json_err, ndjson_err
464 ))
465 });
466 }
467
468 return Err(SofError::InvalidSourceContent(format!(
470 "Failed to parse JSON from '{}': {}",
471 source_name, json_err
472 )));
473 }
474 };
475
476 if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
478 if resource_type == "Bundle" {
479 #[cfg(feature = "R4")]
481 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
482 return Ok(SofBundle::R4(bundle));
483 }
484 #[cfg(feature = "R4B")]
485 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
486 return Ok(SofBundle::R4B(bundle));
487 }
488 #[cfg(feature = "R5")]
489 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
490 return Ok(SofBundle::R5(bundle));
491 }
492 #[cfg(feature = "R6")]
493 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
494 return Ok(SofBundle::R6(bundle));
495 }
496 return Err(SofError::InvalidSourceContent(format!(
497 "Bundle from '{}' could not be parsed as any supported FHIR version",
498 source_name
499 )));
500 }
501
502 return wrap_resource_in_bundle(value, source_name);
504 }
505
506 if value.is_array() {
508 return wrap_resources_in_bundle(value, source_name);
509 }
510
511 Err(SofError::InvalidSourceContent(format!(
512 "Content from '{}' is not a valid FHIR resource or Bundle",
513 source_name
514 )))
515}
516
517fn wrap_resource_in_bundle(
519 resource: serde_json::Value,
520 source_name: &str,
521) -> Result<SofBundle, SofError> {
522 #[cfg(feature = "R4")]
525 if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
526 let mut bundle = helios_fhir::r4::Bundle::default();
527 bundle.r#type = Element {
528 id: None,
529 extension: None,
530 value: Some("collection".to_string()),
531 };
532 bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
533 resource: Some(res),
534 ..Default::default()
535 }]);
536 return Ok(SofBundle::R4(bundle));
537 }
538
539 #[cfg(feature = "R4B")]
541 if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
542 let mut bundle = helios_fhir::r4b::Bundle::default();
543 bundle.r#type = Element {
544 id: None,
545 extension: None,
546 value: Some("collection".to_string()),
547 };
548 bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
549 resource: Some(res),
550 ..Default::default()
551 }]);
552 return Ok(SofBundle::R4B(bundle));
553 }
554
555 #[cfg(feature = "R5")]
557 if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
558 let mut bundle = helios_fhir::r5::Bundle::default();
559 bundle.r#type = Element {
560 id: None,
561 extension: None,
562 value: Some("collection".to_string()),
563 };
564 bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
565 resource: Some(Box::new(res)),
566 ..Default::default()
567 }]);
568 return Ok(SofBundle::R5(bundle));
569 }
570
571 #[cfg(feature = "R6")]
573 if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
574 let mut bundle = helios_fhir::r6::Bundle::default();
575 bundle.r#type = Element {
576 id: None,
577 extension: None,
578 value: Some("collection".to_string()),
579 };
580 bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
581 resource: Some(Box::new(res)),
582 ..Default::default()
583 }]);
584 return Ok(SofBundle::R6(bundle));
585 }
586
587 Err(SofError::InvalidSourceContent(format!(
588 "Resource from '{}' could not be parsed as any supported FHIR version",
589 source_name
590 )))
591}
592
593fn wrap_resources_in_bundle(
595 resources: serde_json::Value,
596 source_name: &str,
597) -> Result<SofBundle, SofError> {
598 let arr = resources
599 .as_array()
600 .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
601
602 if arr.is_empty() {
603 return Err(SofError::InvalidSourceContent(format!(
604 "Empty array of resources from '{}'",
605 source_name
606 )));
607 }
608
609 let first = &arr[0];
611
612 #[cfg(feature = "R4")]
614 if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
615 let mut bundle = helios_fhir::r4::Bundle::default();
616 bundle.r#type = Element {
617 id: None,
618 extension: None,
619 value: Some("collection".to_string()),
620 };
621 let mut entries = Vec::new();
622
623 for resource in arr {
624 let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
625 .map_err(|e| {
626 SofError::InvalidSourceContent(format!(
627 "Failed to parse R4 resource from '{}': {}",
628 source_name, e
629 ))
630 })?;
631 entries.push(helios_fhir::r4::BundleEntry {
632 resource: Some(res),
633 ..Default::default()
634 });
635 }
636
637 bundle.entry = Some(entries);
638 return Ok(SofBundle::R4(bundle));
639 }
640
641 #[cfg(feature = "R4B")]
643 if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
644 let mut bundle = helios_fhir::r4b::Bundle::default();
645 bundle.r#type = Element {
646 id: None,
647 extension: None,
648 value: Some("collection".to_string()),
649 };
650 let mut entries = Vec::new();
651
652 for resource in arr {
653 let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
654 .map_err(|e| {
655 SofError::InvalidSourceContent(format!(
656 "Failed to parse R4B resource from '{}': {}",
657 source_name, e
658 ))
659 })?;
660 entries.push(helios_fhir::r4b::BundleEntry {
661 resource: Some(res),
662 ..Default::default()
663 });
664 }
665
666 bundle.entry = Some(entries);
667 return Ok(SofBundle::R4B(bundle));
668 }
669
670 #[cfg(feature = "R5")]
672 if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
673 let mut bundle = helios_fhir::r5::Bundle::default();
674 bundle.r#type = Element {
675 id: None,
676 extension: None,
677 value: Some("collection".to_string()),
678 };
679 let mut entries = Vec::new();
680
681 for resource in arr {
682 let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
683 .map_err(|e| {
684 SofError::InvalidSourceContent(format!(
685 "Failed to parse R5 resource from '{}': {}",
686 source_name, e
687 ))
688 })?;
689 entries.push(helios_fhir::r5::BundleEntry {
690 resource: Some(Box::new(res)),
691 ..Default::default()
692 });
693 }
694
695 bundle.entry = Some(entries);
696 return Ok(SofBundle::R5(bundle));
697 }
698
699 #[cfg(feature = "R6")]
701 if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
702 let mut bundle = helios_fhir::r6::Bundle::default();
703 bundle.r#type = Element {
704 id: None,
705 extension: None,
706 value: Some("collection".to_string()),
707 };
708 let mut entries = Vec::new();
709
710 for resource in arr {
711 let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
712 .map_err(|e| {
713 SofError::InvalidSourceContent(format!(
714 "Failed to parse R6 resource from '{}': {}",
715 source_name, e
716 ))
717 })?;
718 entries.push(helios_fhir::r6::BundleEntry {
719 resource: Some(Box::new(res)),
720 ..Default::default()
721 });
722 }
723
724 bundle.entry = Some(entries);
725 return Ok(SofBundle::R6(bundle));
726 }
727
728 Err(SofError::InvalidSourceContent(format!(
729 "Resources from '{}' could not be parsed as any supported FHIR version",
730 source_name
731 )))
732}
733
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::path::PathBuf;
    use tempfile::NamedTempFile;
    use url::Url;

    /// Writes `contents` to a new temporary file and returns the file handle
    /// together with its `file://` URL.
    ///
    /// The URL is built with `Url::from_file_path` rather than string
    /// concatenation so it is also valid on Windows. The handle must be kept
    /// alive for as long as the file is needed; dropping it deletes the file.
    fn temp_file_with(contents: &str) -> (NamedTempFile, String) {
        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(contents.as_bytes()).unwrap();
        temp_file.flush().unwrap();

        let file_url = Url::from_file_path(temp_file.path()).unwrap().to_string();
        (temp_file, file_url)
    }

    /// Asserts that `bundle` is an R4 bundle with exactly `expected` entries.
    #[cfg(feature = "R4")]
    fn assert_r4_entry_count(bundle: SofBundle, expected: usize) {
        match bundle {
            SofBundle::R4(bundle) => {
                assert_eq!(bundle.entry.as_ref().unwrap().len(), expected);
            }
            #[cfg(feature = "R4B")]
            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R5")]
            SofBundle::R5(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R6")]
            SofBundle::R6(_) => panic!("Expected R4 bundle"),
        }
    }

    /// Without the R4 feature there is no version-specific expectation to check.
    #[cfg(not(feature = "R4"))]
    fn assert_r4_entry_count(_bundle: SofBundle, _expected: usize) {}

    /// Asserts that `result` is `SofError::InvalidSource` with a message
    /// containing `needle`.
    fn assert_invalid_source(result: Result<SofBundle, SofError>, needle: &str) {
        match result {
            Err(SofError::InvalidSource(msg)) => {
                assert!(msg.contains(needle), "unexpected message: {}", msg);
            }
            other => panic!("Expected InvalidSource error, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_fhir_bundle() {
        let bundle_json = r#"{
            "resourceType": "Bundle",
            "type": "collection",
            "entry": [{
                "resource": {
                    "resourceType": "Patient",
                    "id": "123"
                }
            }]
        }"#;

        let result = parse_fhir_content(bundle_json, "test").unwrap();
        #[cfg(feature = "R4")]
        assert!(matches!(result, SofBundle::R4(_)));
        // Without R4, successful parsing (the `unwrap` above) is the whole check.
        #[cfg(not(feature = "R4"))]
        let _ = result;
    }

    #[test]
    fn test_parse_single_resource() {
        let patient_json = r#"{
            "resourceType": "Patient",
            "id": "123"
        }"#;

        let result = parse_fhir_content(patient_json, "test").unwrap();
        assert_r4_entry_count(result, 1);
    }

    #[test]
    fn test_parse_resource_array() {
        let resources_json = r#"[
            {
                "resourceType": "Patient",
                "id": "123"
            },
            {
                "resourceType": "Patient",
                "id": "456"
            }
        ]"#;

        let result = parse_fhir_content(resources_json, "test").unwrap();
        assert_r4_entry_count(result, 2);
    }

    #[test]
    fn test_invalid_content() {
        let invalid_json = r#"{"not": "fhir"}"#;
        let result = parse_fhir_content(invalid_json, "test");
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_s3_url_parsing() {
        let data_source = UniversalDataSource::new();

        // An empty authority must fail; the message is only checked when the
        // failure surfaces as `InvalidSource`.
        let result = data_source.load("s3:///path/to/file.json").await;
        assert!(result.is_err());
        if let Err(SofError::InvalidSource(msg)) = result {
            assert!(msg.contains("missing bucket name"));
        }

        // A bucket with no key is rejected before any client is built.
        let result = data_source.load("s3://bucket/").await;
        assert_invalid_source(result, "missing object path");
    }

    #[tokio::test]
    async fn test_gcs_url_parsing() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("gs:///path/to/file.json").await;
        assert!(result.is_err());
        if let Err(SofError::InvalidSource(msg)) = result {
            assert!(msg.contains("missing bucket name"));
        }

        let result = data_source.load("gs://bucket/").await;
        assert_invalid_source(result, "missing object path");
    }

    #[tokio::test]
    async fn test_azure_url_parsing() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("azure:///path/to/file.json").await;
        assert!(result.is_err());
        if let Err(SofError::InvalidSource(msg)) = result {
            assert!(msg.contains("missing container name"));
        }

        let result = data_source.load("azure://container/").await;
        assert_invalid_source(result, "missing blob path");
    }

    #[tokio::test]
    async fn test_unsupported_protocol() {
        let data_source = UniversalDataSource::new();

        match data_source.load("ftp://server/file.json").await {
            Err(SofError::UnsupportedSourceProtocol(msg)) => {
                assert!(msg.contains("Unsupported source protocol: ftp"));
                assert!(msg.contains("Supported:"));
            }
            other => panic!("Expected UnsupportedSourceProtocol error, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_file_protocol_bundle() {
        let data_source = UniversalDataSource::new();

        let bundle_json = r#"{
            "resourceType": "Bundle",
            "type": "collection",
            "entry": [{
                "resource": {
                    "resourceType": "Patient",
                    "id": "test-patient"
                }
            }]
        }"#;

        let (_temp_file, file_url) = temp_file_with(bundle_json);

        let result = data_source.load(&file_url).await;
        assert!(result.is_ok());
        assert_r4_entry_count(result.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_file_protocol_single_resource() {
        let data_source = UniversalDataSource::new();

        let patient_json = r#"{
            "resourceType": "Patient",
            "id": "test-patient",
            "name": [{
                "family": "Test",
                "given": ["Patient"]
            }]
        }"#;

        let (_temp_file, file_url) = temp_file_with(patient_json);

        let result = data_source.load(&file_url).await;
        assert!(result.is_ok());
        assert_r4_entry_count(result.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_file_protocol_resource_array() {
        let data_source = UniversalDataSource::new();

        let resources_json = r#"[
            {
                "resourceType": "Patient",
                "id": "patient-1"
            },
            {
                "resourceType": "Patient",
                "id": "patient-2"
            },
            {
                "resourceType": "Observation",
                "id": "obs-1",
                "status": "final",
                "code": {
                    "text": "Test"
                }
            }
        ]"#;

        let (_temp_file, file_url) = temp_file_with(resources_json);

        let result = data_source.load(&file_url).await;
        assert!(result.is_ok());
        assert_r4_entry_count(result.unwrap(), 3);
    }

    #[tokio::test]
    async fn test_file_protocol_file_not_found() {
        let data_source = UniversalDataSource::new();

        #[cfg(windows)]
        let nonexistent_path = PathBuf::from("C:\\nonexistent\\path\\to\\file.json");
        #[cfg(not(windows))]
        let nonexistent_path = PathBuf::from("/nonexistent/path/to/file.json");

        let file_url = Url::from_file_path(&nonexistent_path).unwrap().to_string();

        match data_source.load(&file_url).await {
            Err(SofError::SourceNotFound(msg)) => assert!(msg.contains("File not found")),
            other => panic!("Expected SourceNotFound error, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_file_protocol_invalid_json() {
        let data_source = UniversalDataSource::new();

        let (_temp_file, file_url) = temp_file_with("{ this is not valid json }");

        match data_source.load(&file_url).await {
            Err(SofError::InvalidSourceContent(msg)) => {
                assert!(msg.contains("Failed to parse JSON"));
            }
            other => panic!("Expected InvalidSourceContent error, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_file_protocol_invalid_fhir() {
        let data_source = UniversalDataSource::new();

        let (_temp_file, file_url) = temp_file_with(r#"{"just": "some", "random": "data"}"#);

        match data_source.load(&file_url).await {
            Err(SofError::InvalidSourceContent(msg)) => {
                assert!(msg.contains("not a valid FHIR resource"));
            }
            other => panic!("Expected InvalidSourceContent error, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_file_protocol_invalid_url() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("file://C:\\invalid\\windows\\path").await;
        assert!(result.is_err());
    }
}