helios_sof/
data_source.rs

1use crate::{SofBundle, SofError};
2use async_trait::async_trait;
3use helios_fhir::Element;
4use object_store::{
5    ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
6    gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
7};
8use reqwest;
9use serde_json;
10use std::sync::Arc;
11use tokio::fs;
12use url::Url;
13
14/// Trait for loading FHIR data from various sources
15#[async_trait]
16pub trait DataSource: Send + Sync {
17    /// Load FHIR data from the source and return as a Bundle
18    async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
19}
20
21/// Implementation for loading data from various sources based on URL scheme
22pub struct UniversalDataSource {
23    client: reqwest::Client,
24}
25
26impl UniversalDataSource {
27    pub fn new() -> Self {
28        Self {
29            client: reqwest::Client::builder()
30                .timeout(std::time::Duration::from_secs(30))
31                .build()
32                .unwrap_or_else(|_| reqwest::Client::new()),
33        }
34    }
35}
36
37impl Default for UniversalDataSource {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43#[async_trait]
44impl DataSource for UniversalDataSource {
45    async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
46        // Parse the source as a URL to determine the protocol
47        let url = Url::parse(source).map_err(|e| {
48            SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
49        })?;
50
51        match url.scheme() {
52            "file" => load_from_file(&url).await,
53            "http" | "https" => load_from_http(&self.client, &url).await,
54            "s3" => load_from_s3(&url).await,
55            "gs" => load_from_gcs(&url).await,
56            "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
57            scheme => Err(SofError::UnsupportedSourceProtocol(format!(
58                "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
59                scheme
60            ))),
61        }
62    }
63}
64
65/// Load FHIR data from a local file
66async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
67    // Convert file URL to path
68    let path = url
69        .to_file_path()
70        .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
71
72    // Check if file exists
73    if !path.exists() {
74        return Err(SofError::SourceNotFound(format!(
75            "File not found: {}",
76            path.display()
77        )));
78    }
79
80    // Read file contents
81    let contents = fs::read_to_string(&path)
82        .await
83        .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
84
85    // Parse and convert to bundle
86    parse_fhir_content(&contents, &path.to_string_lossy())
87}
88
89/// Load FHIR data from HTTP/HTTPS URL
90async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
91    // Fetch content from URL
92    let response = client
93        .get(url.as_str())
94        .header("Accept", "application/fhir+json, application/json")
95        .send()
96        .await
97        .map_err(|e| {
98            SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
99        })?;
100
101    // Check response status
102    if !response.status().is_success() {
103        return Err(SofError::SourceFetchError(format!(
104            "HTTP error {} when fetching '{}'",
105            response.status(),
106            url
107        )));
108    }
109
110    // Get content
111    let contents = response
112        .text()
113        .await
114        .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
115
116    // Parse and convert to bundle
117    parse_fhir_content(&contents, url.as_str())
118}
119
120/// Load FHIR data from AWS S3
121async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
122    // Parse S3 URL: s3://bucket/path/to/object
123    let bucket = url.host_str().ok_or_else(|| {
124        SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
125    })?;
126
127    let path = url.path().trim_start_matches('/');
128    if path.is_empty() {
129        return Err(SofError::InvalidSource(format!(
130            "Invalid S3 URL '{}': missing object path",
131            url
132        )));
133    }
134
135    // Create S3 client using environment variables or default credentials
136    let store = AmazonS3Builder::new()
137        .with_bucket_name(bucket)
138        .build()
139        .map_err(|e| {
140            SofError::SourceFetchError(format!(
141                "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)",
142                url, e
143            ))
144        })?;
145
146    load_from_object_store(Arc::new(store), path, url.as_str()).await
147}
148
149/// Load FHIR data from Google Cloud Storage
150async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
151    // Parse GCS URL: gs://bucket/path/to/object
152    let bucket = url.host_str().ok_or_else(|| {
153        SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
154    })?;
155
156    let path = url.path().trim_start_matches('/');
157    if path.is_empty() {
158        return Err(SofError::InvalidSource(format!(
159            "Invalid GCS URL '{}': missing object path",
160            url
161        )));
162    }
163
164    // Create GCS client using environment variables or default credentials
165    let store = GoogleCloudStorageBuilder::new()
166        .with_bucket_name(bucket)
167        .build()
168        .map_err(|e| {
169            SofError::SourceFetchError(format!(
170                "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
171                url, e
172            ))
173        })?;
174
175    load_from_object_store(Arc::new(store), path, url.as_str()).await
176}
177
178/// Load FHIR data from Azure Blob Storage
179async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
180    // Parse Azure URL: azure://container/path/to/object or abfss://container@account.dfs.core.windows.net/path
181    let (container, path) = if url.scheme() == "azure" {
182        // Simple format: azure://container/path
183        let container = url.host_str().ok_or_else(|| {
184            SofError::InvalidSource(format!(
185                "Invalid Azure URL '{}': missing container name",
186                url
187            ))
188        })?;
189        let path = url.path().trim_start_matches('/');
190        (container.to_string(), path.to_string())
191    } else {
192        // ABFSS format: abfss://container@account.dfs.core.windows.net/path
193        let host = url.host_str().ok_or_else(|| {
194            SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
195        })?;
196        let parts: Vec<&str> = host.split('@').collect();
197        if parts.len() != 2 {
198            return Err(SofError::InvalidSource(format!(
199                "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
200                url
201            )));
202        }
203        let container = parts[0];
204        let path = url.path().trim_start_matches('/');
205        (container.to_string(), path.to_string())
206    };
207
208    if path.is_empty() {
209        return Err(SofError::InvalidSource(format!(
210            "Invalid Azure URL '{}': missing blob path",
211            url
212        )));
213    }
214
215    // Create Azure client using environment variables or managed identity
216    let store = MicrosoftAzureBuilder::new()
217        .with_container_name(&container)
218        .build()
219        .map_err(|e| {
220            SofError::SourceFetchError(format!(
221                "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
222                url, e
223            ))
224        })?;
225
226    load_from_object_store(Arc::new(store), &path, url.as_str()).await
227}
228
229/// Common function to load from any object store
230async fn load_from_object_store(
231    store: Arc<dyn ObjectStore>,
232    path: &str,
233    source_name: &str,
234) -> Result<SofBundle, SofError> {
235    // Create object path
236    let object_path = ObjectPath::from(path);
237
238    // Download the object
239    let result = store.get(&object_path).await.map_err(|e| match e {
240        object_store::Error::NotFound { .. } => {
241            SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
242        }
243        _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
244    })?;
245
246    // Read the bytes
247    let bytes = result
248        .bytes()
249        .await
250        .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
251
252    // Convert to string
253    let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
254        SofError::InvalidSourceContent(format!(
255            "Content from '{}' is not valid UTF-8: {}",
256            source_name, e
257        ))
258    })?;
259
260    // Parse and convert to bundle
261    parse_fhir_content(&contents, source_name)
262}
263
/// Check if a source name suggests NDJSON format based on its `.ndjson` file extension.
///
/// The comparison is ASCII case-insensitive. A trailing URL query string or fragment is
/// ignored, so signed/bulk-export URLs such as `https://host/Patient.ndjson?sig=abc` are
/// recognised as well. Names that literally end in `.ndjson` always match.
fn is_ndjson_extension(source_name: &str) -> bool {
    const EXTENSION: &str = ".ndjson";

    // Byte-wise suffix check: avoids allocating a lowercased copy of the whole name.
    let has_extension = |name: &str| {
        name.len() >= EXTENSION.len()
            && name.as_bytes()[name.len() - EXTENSION.len()..]
                .eq_ignore_ascii_case(EXTENSION.as_bytes())
    };

    // Everything before the first `?` or `#`, i.e. the path part of a URL.
    let without_query = source_name
        .split(|c: char| c == '?' || c == '#')
        .next()
        .unwrap_or(source_name);

    has_extension(source_name) || has_extension(without_query)
}
268
269/// Parse NDJSON content (newline-delimited JSON) and convert to SofBundle
270fn parse_ndjson_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
271    let lines: Vec<&str> = contents
272        .lines()
273        .filter(|line| !line.trim().is_empty())
274        .collect();
275
276    if lines.is_empty() {
277        return Err(SofError::InvalidSourceContent(format!(
278            "Empty NDJSON content from '{}'",
279            source_name
280        )));
281    }
282
283    // Parse each line as a separate JSON resource
284    let mut resources = Vec::new();
285    let mut parse_errors = Vec::new();
286
287    for (line_num, line) in lines.iter().enumerate() {
288        match serde_json::from_str::<serde_json::Value>(line) {
289            Ok(value) => {
290                // Verify it's a FHIR resource
291                if value.get("resourceType").and_then(|v| v.as_str()).is_some() {
292                    resources.push(value);
293                } else {
294                    parse_errors.push(format!(
295                        "Line {}: Missing 'resourceType' field",
296                        line_num + 1
297                    ));
298                }
299            }
300            Err(e) => {
301                parse_errors.push(format!("Line {}: {}", line_num + 1, e));
302            }
303        }
304    }
305
306    // If we have some valid resources, proceed even if some lines failed
307    if resources.is_empty() {
308        return Err(SofError::InvalidSourceContent(format!(
309            "No valid FHIR resources found in NDJSON from '{}'. Errors: {}",
310            source_name,
311            parse_errors.join("; ")
312        )));
313    }
314
315    // Log warnings for failed lines (in production, you might want to use a proper logger)
316    if !parse_errors.is_empty() {
317        eprintln!(
318            "Warning: {} line(s) in NDJSON from '{}' could not be parsed: {}",
319            parse_errors.len(),
320            source_name,
321            parse_errors.join("; ")
322        );
323    }
324
325    // Wrap all resources in a Bundle
326    let resources_array = serde_json::Value::Array(resources);
327    wrap_resources_in_bundle(resources_array, source_name)
328}
329
330/// Parse FHIR content and convert to SofBundle
331/// Supports both JSON and NDJSON formats with automatic detection
332pub fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
333    // Check if the source suggests NDJSON format based on file extension
334    if is_ndjson_extension(source_name) {
335        return parse_ndjson_content(contents, source_name);
336    }
337
338    // Try to parse as regular JSON first
339    let value: serde_json::Value = match serde_json::from_str(contents) {
340        Ok(v) => v,
341        Err(json_err) => {
342            // JSON parsing failed, try NDJSON as fallback (content-based detection)
343            // This handles cases where .json files actually contain NDJSON content
344            if contents.lines().count() > 1 {
345                // Multiple lines suggest it might be NDJSON
346                return parse_ndjson_content(contents, source_name).map_err(|ndjson_err| {
347                    // If both fail, return the original JSON error with a helpful message
348                    SofError::InvalidSourceContent(format!(
349                        "Failed to parse content from '{}' as JSON: {}. Also tried NDJSON: {}",
350                        source_name, json_err, ndjson_err
351                    ))
352                });
353            }
354
355            // Single line or regular JSON error
356            return Err(SofError::InvalidSourceContent(format!(
357                "Failed to parse JSON from '{}': {}",
358                source_name, json_err
359            )));
360        }
361    };
362
363    // Check if it's already a Bundle
364    if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
365        if resource_type == "Bundle" {
366            // Try parsing as each FHIR version
367            #[cfg(feature = "R4")]
368            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
369                return Ok(SofBundle::R4(bundle));
370            }
371            #[cfg(feature = "R4B")]
372            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
373                return Ok(SofBundle::R4B(bundle));
374            }
375            #[cfg(feature = "R5")]
376            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
377                return Ok(SofBundle::R5(bundle));
378            }
379            #[cfg(feature = "R6")]
380            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
381                return Ok(SofBundle::R6(bundle));
382            }
383            return Err(SofError::InvalidSourceContent(format!(
384                "Bundle from '{}' could not be parsed as any supported FHIR version",
385                source_name
386            )));
387        }
388
389        // It's a single resource - wrap it in a Bundle
390        return wrap_resource_in_bundle(value, source_name);
391    }
392
393    // Check if it's an array of resources
394    if value.is_array() {
395        return wrap_resources_in_bundle(value, source_name);
396    }
397
398    Err(SofError::InvalidSourceContent(format!(
399        "Content from '{}' is not a valid FHIR resource or Bundle",
400        source_name
401    )))
402}
403
404/// Wrap a single resource in a Bundle
405fn wrap_resource_in_bundle(
406    resource: serde_json::Value,
407    source_name: &str,
408) -> Result<SofBundle, SofError> {
409    // Try each FHIR version
410    // R4
411    #[cfg(feature = "R4")]
412    if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
413        let mut bundle = helios_fhir::r4::Bundle::default();
414        bundle.r#type = Element {
415            id: None,
416            extension: None,
417            value: Some("collection".to_string()),
418        };
419        bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
420            resource: Some(res),
421            ..Default::default()
422        }]);
423        return Ok(SofBundle::R4(bundle));
424    }
425
426    // R4B
427    #[cfg(feature = "R4B")]
428    if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
429        let mut bundle = helios_fhir::r4b::Bundle::default();
430        bundle.r#type = Element {
431            id: None,
432            extension: None,
433            value: Some("collection".to_string()),
434        };
435        bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
436            resource: Some(res),
437            ..Default::default()
438        }]);
439        return Ok(SofBundle::R4B(bundle));
440    }
441
442    // R5
443    #[cfg(feature = "R5")]
444    if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
445        let mut bundle = helios_fhir::r5::Bundle::default();
446        bundle.r#type = Element {
447            id: None,
448            extension: None,
449            value: Some("collection".to_string()),
450        };
451        bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
452            resource: Some(Box::new(res)),
453            ..Default::default()
454        }]);
455        return Ok(SofBundle::R5(bundle));
456    }
457
458    // R6
459    #[cfg(feature = "R6")]
460    if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
461        let mut bundle = helios_fhir::r6::Bundle::default();
462        bundle.r#type = Element {
463            id: None,
464            extension: None,
465            value: Some("collection".to_string()),
466        };
467        bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
468            resource: Some(Box::new(res)),
469            ..Default::default()
470        }]);
471        return Ok(SofBundle::R6(bundle));
472    }
473
474    Err(SofError::InvalidSourceContent(format!(
475        "Resource from '{}' could not be parsed as any supported FHIR version",
476        source_name
477    )))
478}
479
480/// Wrap an array of resources in a Bundle
481fn wrap_resources_in_bundle(
482    resources: serde_json::Value,
483    source_name: &str,
484) -> Result<SofBundle, SofError> {
485    let arr = resources
486        .as_array()
487        .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
488
489    if arr.is_empty() {
490        return Err(SofError::InvalidSourceContent(format!(
491            "Empty array of resources from '{}'",
492            source_name
493        )));
494    }
495
496    // Try to parse the first resource to determine version
497    let first = &arr[0];
498
499    // Try R4
500    #[cfg(feature = "R4")]
501    if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
502        let mut bundle = helios_fhir::r4::Bundle::default();
503        bundle.r#type = Element {
504            id: None,
505            extension: None,
506            value: Some("collection".to_string()),
507        };
508        let mut entries = Vec::new();
509
510        for resource in arr {
511            let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
512                .map_err(|e| {
513                    SofError::InvalidSourceContent(format!(
514                        "Failed to parse R4 resource from '{}': {}",
515                        source_name, e
516                    ))
517                })?;
518            entries.push(helios_fhir::r4::BundleEntry {
519                resource: Some(res),
520                ..Default::default()
521            });
522        }
523
524        bundle.entry = Some(entries);
525        return Ok(SofBundle::R4(bundle));
526    }
527
528    // Try R4B
529    #[cfg(feature = "R4B")]
530    if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
531        let mut bundle = helios_fhir::r4b::Bundle::default();
532        bundle.r#type = Element {
533            id: None,
534            extension: None,
535            value: Some("collection".to_string()),
536        };
537        let mut entries = Vec::new();
538
539        for resource in arr {
540            let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
541                .map_err(|e| {
542                    SofError::InvalidSourceContent(format!(
543                        "Failed to parse R4B resource from '{}': {}",
544                        source_name, e
545                    ))
546                })?;
547            entries.push(helios_fhir::r4b::BundleEntry {
548                resource: Some(res),
549                ..Default::default()
550            });
551        }
552
553        bundle.entry = Some(entries);
554        return Ok(SofBundle::R4B(bundle));
555    }
556
557    // Try R5
558    #[cfg(feature = "R5")]
559    if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
560        let mut bundle = helios_fhir::r5::Bundle::default();
561        bundle.r#type = Element {
562            id: None,
563            extension: None,
564            value: Some("collection".to_string()),
565        };
566        let mut entries = Vec::new();
567
568        for resource in arr {
569            let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
570                .map_err(|e| {
571                    SofError::InvalidSourceContent(format!(
572                        "Failed to parse R5 resource from '{}': {}",
573                        source_name, e
574                    ))
575                })?;
576            entries.push(helios_fhir::r5::BundleEntry {
577                resource: Some(Box::new(res)),
578                ..Default::default()
579            });
580        }
581
582        bundle.entry = Some(entries);
583        return Ok(SofBundle::R5(bundle));
584    }
585
586    // Try R6
587    #[cfg(feature = "R6")]
588    if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
589        let mut bundle = helios_fhir::r6::Bundle::default();
590        bundle.r#type = Element {
591            id: None,
592            extension: None,
593            value: Some("collection".to_string()),
594        };
595        let mut entries = Vec::new();
596
597        for resource in arr {
598            let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
599                .map_err(|e| {
600                    SofError::InvalidSourceContent(format!(
601                        "Failed to parse R6 resource from '{}': {}",
602                        source_name, e
603                    ))
604                })?;
605            entries.push(helios_fhir::r6::BundleEntry {
606                resource: Some(Box::new(res)),
607                ..Default::default()
608            });
609        }
610
611        bundle.entry = Some(entries);
612        return Ok(SofBundle::R6(bundle));
613    }
614
615    Err(SofError::InvalidSourceContent(format!(
616        "Resources from '{}' could not be parsed as any supported FHIR version",
617        source_name
618    )))
619}
620
621#[cfg(test)]
622mod tests {
623    use super::*;
624
625    #[tokio::test]
626    async fn test_parse_fhir_bundle() {
627        let bundle_json = r#"{
628            "resourceType": "Bundle",
629            "type": "collection",
630            "entry": [{
631                "resource": {
632                    "resourceType": "Patient",
633                    "id": "123"
634                }
635            }]
636        }"#;
637
638        let result = parse_fhir_content(bundle_json, "test").unwrap();
639        #[cfg(feature = "R4")]
640        assert!(matches!(result, SofBundle::R4(_)));
641        #[cfg(not(feature = "R4"))]
642        assert!(matches!(result, _));
643    }
644
645    #[tokio::test]
646    async fn test_parse_single_resource() {
647        let patient_json = r#"{
648            "resourceType": "Patient",
649            "id": "123"
650        }"#;
651
652        let result = parse_fhir_content(patient_json, "test").unwrap();
653        #[cfg(feature = "R4")]
654        match result {
655            SofBundle::R4(bundle) => {
656                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
657            }
658            #[cfg(feature = "R4B")]
659            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
660            #[cfg(feature = "R5")]
661            SofBundle::R5(_) => panic!("Expected R4 bundle"),
662            #[cfg(feature = "R6")]
663            SofBundle::R6(_) => panic!("Expected R4 bundle"),
664        }
665    }
666
667    #[tokio::test]
668    async fn test_parse_resource_array() {
669        let resources_json = r#"[
670            {
671                "resourceType": "Patient",
672                "id": "123"
673            },
674            {
675                "resourceType": "Patient",
676                "id": "456"
677            }
678        ]"#;
679
680        let result = parse_fhir_content(resources_json, "test").unwrap();
681        #[cfg(feature = "R4")]
682        match result {
683            SofBundle::R4(bundle) => {
684                assert_eq!(bundle.entry.as_ref().unwrap().len(), 2);
685            }
686            #[cfg(feature = "R4B")]
687            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
688            #[cfg(feature = "R5")]
689            SofBundle::R5(_) => panic!("Expected R4 bundle"),
690            #[cfg(feature = "R6")]
691            SofBundle::R6(_) => panic!("Expected R4 bundle"),
692        }
693    }
694
695    #[tokio::test]
696    async fn test_invalid_content() {
697        let invalid_json = r#"{"not": "fhir"}"#;
698        let result = parse_fhir_content(invalid_json, "test");
699        assert!(result.is_err());
700    }
701
702    #[tokio::test]
703    async fn test_s3_url_parsing() {
704        let data_source = UniversalDataSource::new();
705
706        // Test invalid S3 URL without bucket
707        let result = data_source.load("s3:///path/to/file.json").await;
708        assert!(result.is_err());
709        if let Err(SofError::InvalidSource(msg)) = result {
710            assert!(msg.contains("missing bucket name"));
711        }
712
713        // Test invalid S3 URL without path
714        let result = data_source.load("s3://bucket/").await;
715        assert!(result.is_err());
716        if let Err(SofError::InvalidSource(msg)) = result {
717            assert!(msg.contains("missing object path"));
718        }
719
720        // Note: Actual S3 fetching would require valid credentials and a real bucket
721        // These tests verify URL parsing and error handling
722    }
723
724    #[tokio::test]
725    async fn test_gcs_url_parsing() {
726        let data_source = UniversalDataSource::new();
727
728        // Test invalid GCS URL without bucket
729        let result = data_source.load("gs:///path/to/file.json").await;
730        assert!(result.is_err());
731        if let Err(SofError::InvalidSource(msg)) = result {
732            assert!(msg.contains("missing bucket name"));
733        }
734
735        // Test invalid GCS URL without path
736        let result = data_source.load("gs://bucket/").await;
737        assert!(result.is_err());
738        if let Err(SofError::InvalidSource(msg)) = result {
739            assert!(msg.contains("missing object path"));
740        }
741    }
742
743    #[tokio::test]
744    async fn test_azure_url_parsing() {
745        let data_source = UniversalDataSource::new();
746
747        // Test invalid Azure URL without container
748        let result = data_source.load("azure:///path/to/file.json").await;
749        assert!(result.is_err());
750        if let Err(SofError::InvalidSource(msg)) = result {
751            assert!(msg.contains("missing container name"));
752        }
753
754        // Test invalid Azure URL without path
755        let result = data_source.load("azure://container/").await;
756        assert!(result.is_err());
757        if let Err(SofError::InvalidSource(msg)) = result {
758            assert!(msg.contains("missing blob path"));
759        }
760    }
761
762    #[tokio::test]
763    async fn test_unsupported_protocol() {
764        let data_source = UniversalDataSource::new();
765
766        // Test unsupported protocol
767        let result = data_source.load("ftp://server/file.json").await;
768        assert!(result.is_err());
769        if let Err(SofError::UnsupportedSourceProtocol(msg)) = result {
770            assert!(msg.contains("Unsupported source protocol: ftp"));
771            assert!(msg.contains("Supported:"));
772        }
773    }
774
775    #[tokio::test]
776    async fn test_file_protocol_bundle() {
777        use std::io::Write;
778        use tempfile::NamedTempFile;
779
780        let data_source = UniversalDataSource::new();
781
782        // Create a temporary file with a FHIR Bundle
783        let bundle_json = r#"{
784            "resourceType": "Bundle",
785            "type": "collection",
786            "entry": [{
787                "resource": {
788                    "resourceType": "Patient",
789                    "id": "test-patient"
790                }
791            }]
792        }"#;
793
794        let mut temp_file = NamedTempFile::new().unwrap();
795        temp_file.write_all(bundle_json.as_bytes()).unwrap();
796        temp_file.flush().unwrap();
797
798        // Get the file path and convert to file:// URL
799        let file_path = temp_file.path();
800        let file_url = format!("file://{}", file_path.to_string_lossy());
801
802        // Test loading from file:// URL
803        let result = data_source.load(&file_url).await;
804        assert!(result.is_ok());
805
806        #[cfg(feature = "R4")]
807        match result.unwrap() {
808            SofBundle::R4(bundle) => {
809                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
810            }
811            #[cfg(feature = "R4B")]
812            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
813            #[cfg(feature = "R5")]
814            SofBundle::R5(_) => panic!("Expected R4 bundle"),
815            #[cfg(feature = "R6")]
816            SofBundle::R6(_) => panic!("Expected R4 bundle"),
817        }
818    }
819
820    #[tokio::test]
821    async fn test_file_protocol_single_resource() {
822        use std::io::Write;
823        use tempfile::NamedTempFile;
824
825        let data_source = UniversalDataSource::new();
826
827        // Create a temporary file with a single FHIR resource
828        let patient_json = r#"{
829            "resourceType": "Patient",
830            "id": "test-patient",
831            "name": [{
832                "family": "Test",
833                "given": ["Patient"]
834            }]
835        }"#;
836
837        let mut temp_file = NamedTempFile::new().unwrap();
838        temp_file.write_all(patient_json.as_bytes()).unwrap();
839        temp_file.flush().unwrap();
840
841        let file_path = temp_file.path();
842        let file_url = format!("file://{}", file_path.to_string_lossy());
843
844        // Test loading single resource - should be wrapped in a Bundle
845        let result = data_source.load(&file_url).await;
846        assert!(result.is_ok());
847
848        #[cfg(feature = "R4")]
849        match result.unwrap() {
850            SofBundle::R4(bundle) => {
851                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
852            }
853            #[cfg(feature = "R4B")]
854            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
855            #[cfg(feature = "R5")]
856            SofBundle::R5(_) => panic!("Expected R4 bundle"),
857            #[cfg(feature = "R6")]
858            SofBundle::R6(_) => panic!("Expected R4 bundle"),
859        }
860    }
861
862    #[tokio::test]
863    async fn test_file_protocol_resource_array() {
864        use std::io::Write;
865        use tempfile::NamedTempFile;
866
867        let data_source = UniversalDataSource::new();
868
869        // Create a temporary file with an array of FHIR resources
870        let resources_json = r#"[
871            {
872                "resourceType": "Patient",
873                "id": "patient-1"
874            },
875            {
876                "resourceType": "Patient",
877                "id": "patient-2"
878            },
879            {
880                "resourceType": "Observation",
881                "id": "obs-1",
882                "status": "final",
883                "code": {
884                    "text": "Test"
885                }
886            }
887        ]"#;
888
889        let mut temp_file = NamedTempFile::new().unwrap();
890        temp_file.write_all(resources_json.as_bytes()).unwrap();
891        temp_file.flush().unwrap();
892
893        let file_path = temp_file.path();
894        let file_url = format!("file://{}", file_path.to_string_lossy());
895
896        // Test loading array of resources
897        let result = data_source.load(&file_url).await;
898        assert!(result.is_ok());
899
900        #[cfg(feature = "R4")]
901        match result.unwrap() {
902            SofBundle::R4(bundle) => {
903                assert_eq!(bundle.entry.as_ref().unwrap().len(), 3);
904            }
905            #[cfg(feature = "R4B")]
906            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
907            #[cfg(feature = "R5")]
908            SofBundle::R5(_) => panic!("Expected R4 bundle"),
909            #[cfg(feature = "R6")]
910            SofBundle::R6(_) => panic!("Expected R4 bundle"),
911        }
912    }
913
914    #[tokio::test]
915    async fn test_file_protocol_file_not_found() {
916        let data_source = UniversalDataSource::new();
917
918        // Test with non-existent file
919        let file_url = "file:///nonexistent/path/to/file.json";
920        let result = data_source.load(file_url).await;
921        assert!(result.is_err());
922
923        if let Err(SofError::SourceNotFound(msg)) = result {
924            assert!(msg.contains("File not found"));
925        } else {
926            panic!("Expected SourceNotFound error");
927        }
928    }
929
930    #[tokio::test]
931    async fn test_file_protocol_invalid_json() {
932        use std::io::Write;
933        use tempfile::NamedTempFile;
934
935        let data_source = UniversalDataSource::new();
936
937        // Create a temporary file with invalid JSON
938        let invalid_json = "{ this is not valid json }";
939
940        let mut temp_file = NamedTempFile::new().unwrap();
941        temp_file.write_all(invalid_json.as_bytes()).unwrap();
942        temp_file.flush().unwrap();
943
944        let file_path = temp_file.path();
945        let file_url = format!("file://{}", file_path.to_string_lossy());
946
947        // Test loading invalid JSON
948        let result = data_source.load(&file_url).await;
949        assert!(result.is_err());
950
951        if let Err(SofError::InvalidSourceContent(msg)) = result {
952            assert!(msg.contains("Failed to parse JSON"));
953        } else {
954            panic!("Expected InvalidSourceContent error");
955        }
956    }
957
958    #[tokio::test]
959    async fn test_file_protocol_invalid_fhir() {
960        use std::io::Write;
961        use tempfile::NamedTempFile;
962
963        let data_source = UniversalDataSource::new();
964
965        // Create a temporary file with valid JSON but not FHIR content
966        let not_fhir_json = r#"{"just": "some", "random": "data"}"#;
967
968        let mut temp_file = NamedTempFile::new().unwrap();
969        temp_file.write_all(not_fhir_json.as_bytes()).unwrap();
970        temp_file.flush().unwrap();
971
972        let file_path = temp_file.path();
973        let file_url = format!("file://{}", file_path.to_string_lossy());
974
975        // Test loading non-FHIR content
976        let result = data_source.load(&file_url).await;
977        assert!(result.is_err());
978
979        if let Err(SofError::InvalidSourceContent(msg)) = result {
980            assert!(msg.contains("not a valid FHIR resource"));
981        } else {
982            panic!("Expected InvalidSourceContent error, got {:?}", result);
983        }
984    }
985
986    #[tokio::test]
987    async fn test_file_protocol_invalid_url() {
988        let data_source = UniversalDataSource::new();
989
990        // Test with malformed file URL (Windows-style path without proper file:// format)
991        let result = data_source.load("file://C:\\invalid\\windows\\path").await;
992        assert!(result.is_err());
993        // The error type will depend on URL parsing behavior
994    }
995}