Skip to main content

helios_sof/
data_source.rs

1//! # FHIR Data Source Loading
2//!
3//! This module provides flexible data loading capabilities for FHIR resources from
4//! various sources including local files, HTTP endpoints, and cloud storage services.
5//! It handles automatic format detection and conversion to FHIR Bundles.
6//!
7//! ## Overview
8//!
9//! The data source system supports:
10//!
11//! - **Multiple Protocols**: file://, http(s)://, s3://, gs://, azure://
12//! - **Format Detection**: Automatic detection of JSON vs NDJSON formats
13//! - **Smart Wrapping**: Single resources and arrays automatically wrapped in Bundles
14//! - **Version Agnostic**: Works with R4, R4B, R5, and R6 FHIR versions
15//! - **Error Handling**: Comprehensive error reporting for invalid sources
16//!
17//! ## Supported Sources
18//!
19//! ### Local Files
20//! ```text
21//! file:///path/to/bundle.json
22//! file:///path/to/resource.ndjson
23//! ```
24//!
25//! ### HTTP/HTTPS
26//! ```text
27//! https://example.org/fhir/Bundle/123
28//! http://localhost:8080/Patient?_count=100
29//! ```
30//!
31//! ### Amazon S3
32//! ```text
33//! s3://my-bucket/path/to/bundle.json
34//! ```
35//! Requires: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
36//!
37//! For S3-compatible services (MinIO, Ceph, etc.), also set:
38//! - AWS_ENDPOINT_URL: Custom endpoint URL (e.g., `http://localhost:9000`)
39//! - AWS_ALLOW_HTTP: Set to `true` if the endpoint uses HTTP instead of HTTPS
40//!
41//! ### Google Cloud Storage
42//! ```text
43//! gs://my-bucket/path/to/data.ndjson
44//! ```
45//! Requires: GOOGLE_SERVICE_ACCOUNT or Application Default Credentials
46//!
47//! ### Azure Blob Storage
48//! ```text
49//! azure://container/path/to/bundle.json
50//! abfss://container@account.dfs.core.windows.net/path/to/data.json
51//! ```
52//! Requires: AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY
53//!
54//! ## Key Components
55//!
56//! - [`DataSource`]: Trait for loading FHIR data from various sources
57//! - [`UniversalDataSource`]: Universal implementation supporting all protocols
58//! - [`parse_fhir_content()`]: Parses FHIR content and wraps it in a Bundle
59//!
60//! ## Format Support
61//!
62//! ### JSON Format
63//! - Single FHIR resources (Patient, Observation, etc.)
64//! - FHIR Bundles
65//! - Arrays of FHIR resources
66//!
67//! ### NDJSON Format
68//! - Newline-delimited JSON (one resource per line)
69//! - Detected by `.ndjson` extension or content analysis
70//! - Partial failures tolerated (invalid lines logged as warnings)
71//!
72//! ## Examples
73//!
74//! ```rust
75//! use helios_sof::data_source::{DataSource, UniversalDataSource};
76//!
77//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
78//! let source = UniversalDataSource::new();
79//!
80//! // Load from local file
81//! let bundle = source.load("file:///data/patients.json").await?;
82//!
83//! // Load from HTTP endpoint
84//! let bundle = source.load("https://hapi.fhir.org/baseR4/Patient?_count=10").await?;
85//!
86//! // Load from S3
87//! let bundle = source.load("s3://fhir-data/bundles/patients.json").await?;
88//!
89//! // Load NDJSON
90//! let bundle = source.load("file:///data/observations.ndjson").await?;
91//! # Ok(())
92//! # }
93//! ```
94//!
95//! ## Automatic Format Detection
96//!
97//! The module automatically:
98//! 1. Detects NDJSON by `.ndjson` file extension
//! 2. Falls back to NDJSON parsing when the content is not valid JSON and spans more than one line
100//! 3. Determines FHIR version by attempting to parse as each version
101//! 4. Wraps single resources or arrays in appropriate Bundle types
102//!
103//! ## Error Handling
104//!
105//! Provides detailed errors for:
106//! - Invalid URLs or protocols
107//! - Missing files or objects
108//! - Network failures
109//! - Malformed JSON
110//! - Invalid FHIR content
111//! - Missing credentials for cloud services
112
113use crate::{SofBundle, SofError};
114use async_trait::async_trait;
115use helios_fhir::Element;
116use object_store::{
117    ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
118    gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
119};
120use reqwest;
121use serde_json;
122use std::sync::Arc;
123use tokio::fs;
124use url::Url;
125
126/// Trait for loading FHIR data from various sources
127#[async_trait]
128pub trait DataSource: Send + Sync {
129    /// Load FHIR data from the source and return as a Bundle
130    async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
131}
132
133/// Implementation for loading data from various sources based on URL scheme
134pub struct UniversalDataSource {
135    client: reqwest::Client,
136}
137
138impl UniversalDataSource {
139    pub fn new() -> Self {
140        Self {
141            client: reqwest::Client::builder()
142                .timeout(std::time::Duration::from_secs(30))
143                .build()
144                .unwrap_or_else(|_| reqwest::Client::new()),
145        }
146    }
147}
148
149impl Default for UniversalDataSource {
150    fn default() -> Self {
151        Self::new()
152    }
153}
154
155#[async_trait]
156impl DataSource for UniversalDataSource {
157    async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
158        // Parse the source as a URL to determine the protocol
159        let url = Url::parse(source).map_err(|e| {
160            SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
161        })?;
162
163        match url.scheme() {
164            "file" => load_from_file(&url).await,
165            "http" | "https" => load_from_http(&self.client, &url).await,
166            "s3" => load_from_s3(&url).await,
167            "gs" => load_from_gcs(&url).await,
168            "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
169            scheme => Err(SofError::UnsupportedSourceProtocol(format!(
170                "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
171                scheme
172            ))),
173        }
174    }
175}
176
177/// Load FHIR data from a local file
178async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
179    // Convert file URL to path
180    let path = url
181        .to_file_path()
182        .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
183
184    // Check if file exists
185    if !path.exists() {
186        return Err(SofError::SourceNotFound(format!(
187            "File not found: {}",
188            path.display()
189        )));
190    }
191
192    // Read file contents
193    let contents = fs::read_to_string(&path)
194        .await
195        .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
196
197    // Parse and convert to bundle
198    parse_fhir_content(&contents, &path.to_string_lossy())
199}
200
201/// Load FHIR data from HTTP/HTTPS URL
202async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
203    // Fetch content from URL
204    let response = client
205        .get(url.as_str())
206        .header("Accept", "application/fhir+json, application/json")
207        .send()
208        .await
209        .map_err(|e| {
210            SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
211        })?;
212
213    // Check response status
214    if !response.status().is_success() {
215        return Err(SofError::SourceFetchError(format!(
216            "HTTP error {} when fetching '{}'",
217            response.status(),
218            url
219        )));
220    }
221
222    // Get content
223    let contents = response
224        .text()
225        .await
226        .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
227
228    // Parse and convert to bundle
229    parse_fhir_content(&contents, url.as_str())
230}
231
232/// Load FHIR data from AWS S3
233async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
234    // Parse S3 URL: s3://bucket/path/to/object
235    let bucket = url.host_str().ok_or_else(|| {
236        SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
237    })?;
238
239    let path = url.path().trim_start_matches('/');
240    if path.is_empty() {
241        return Err(SofError::InvalidSource(format!(
242            "Invalid S3 URL '{}': missing object path",
243            url
244        )));
245    }
246
247    // Create S3 client using environment variables or default credentials
248    let mut builder = AmazonS3Builder::new().with_bucket_name(bucket);
249
250    // Support custom S3-compatible endpoints (e.g., MinIO, Ceph, LocalStack)
251    if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") {
252        builder = builder.with_endpoint(&endpoint);
253
254        // Allow HTTP connections for local development endpoints
255        if let Ok(allow_http) = std::env::var("AWS_ALLOW_HTTP") {
256            if allow_http.eq_ignore_ascii_case("true") || allow_http == "1" {
257                builder = builder.with_allow_http(true);
258            }
259        }
260    }
261
262    let store = builder.build().map_err(|e| {
263        SofError::SourceFetchError(format!(
264            "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION). For S3-compatible services, set AWS_ENDPOINT_URL.",
265            url, e
266        ))
267    })?;
268
269    load_from_object_store(Arc::new(store), path, url.as_str()).await
270}
271
272/// Load FHIR data from Google Cloud Storage
273async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
274    // Parse GCS URL: gs://bucket/path/to/object
275    let bucket = url.host_str().ok_or_else(|| {
276        SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
277    })?;
278
279    let path = url.path().trim_start_matches('/');
280    if path.is_empty() {
281        return Err(SofError::InvalidSource(format!(
282            "Invalid GCS URL '{}': missing object path",
283            url
284        )));
285    }
286
287    // Create GCS client using environment variables or default credentials
288    let store = GoogleCloudStorageBuilder::new()
289        .with_bucket_name(bucket)
290        .build()
291        .map_err(|e| {
292            SofError::SourceFetchError(format!(
293                "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
294                url, e
295            ))
296        })?;
297
298    load_from_object_store(Arc::new(store), path, url.as_str()).await
299}
300
301/// Load FHIR data from Azure Blob Storage
302async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
303    // Parse Azure URL: azure://container/path/to/object or abfss://container@account.dfs.core.windows.net/path
304    let (container, path) = if url.scheme() == "azure" {
305        // Simple format: azure://container/path
306        let container = url.host_str().ok_or_else(|| {
307            SofError::InvalidSource(format!(
308                "Invalid Azure URL '{}': missing container name",
309                url
310            ))
311        })?;
312        let path = url.path().trim_start_matches('/');
313        (container.to_string(), path.to_string())
314    } else {
315        // ABFSS format: abfss://container@account.dfs.core.windows.net/path
316        let host = url.host_str().ok_or_else(|| {
317            SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
318        })?;
319        let parts: Vec<&str> = host.split('@').collect();
320        if parts.len() != 2 {
321            return Err(SofError::InvalidSource(format!(
322                "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
323                url
324            )));
325        }
326        let container = parts[0];
327        let path = url.path().trim_start_matches('/');
328        (container.to_string(), path.to_string())
329    };
330
331    if path.is_empty() {
332        return Err(SofError::InvalidSource(format!(
333            "Invalid Azure URL '{}': missing blob path",
334            url
335        )));
336    }
337
338    // Create Azure client using environment variables or managed identity
339    let store = MicrosoftAzureBuilder::new()
340        .with_container_name(&container)
341        .build()
342        .map_err(|e| {
343            SofError::SourceFetchError(format!(
344                "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
345                url, e
346            ))
347        })?;
348
349    load_from_object_store(Arc::new(store), &path, url.as_str()).await
350}
351
352/// Common function to load from any object store
353async fn load_from_object_store(
354    store: Arc<dyn ObjectStore>,
355    path: &str,
356    source_name: &str,
357) -> Result<SofBundle, SofError> {
358    // Create object path
359    let object_path = ObjectPath::from(path);
360
361    // Download the object
362    let result = store.get(&object_path).await.map_err(|e| match e {
363        object_store::Error::NotFound { .. } => {
364            SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
365        }
366        _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
367    })?;
368
369    // Read the bytes
370    let bytes = result
371        .bytes()
372        .await
373        .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
374
375    // Convert to string
376    let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
377        SofError::InvalidSourceContent(format!(
378            "Content from '{}' is not valid UTF-8: {}",
379            source_name, e
380        ))
381    })?;
382
383    // Parse and convert to bundle
384    parse_fhir_content(&contents, source_name)
385}
386
/// Returns `true` when `source_name` ends with `.ndjson`, ignoring ASCII case.
///
/// The comparison is done on the trailing bytes, so no lowercased copy of the
/// name is allocated. Working on bytes also avoids slicing a `str` in the
/// middle of a multi-byte character.
fn is_ndjson_extension(source_name: &str) -> bool {
    const SUFFIX: &[u8] = b".ndjson";

    let raw = source_name.as_bytes();
    raw.len() >= SUFFIX.len() && raw[raw.len() - SUFFIX.len()..].eq_ignore_ascii_case(SUFFIX)
}
391
392/// Parse NDJSON content (newline-delimited JSON) and convert to SofBundle
393fn parse_ndjson_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
394    let lines: Vec<&str> = contents
395        .lines()
396        .filter(|line| !line.trim().is_empty())
397        .collect();
398
399    if lines.is_empty() {
400        return Err(SofError::InvalidSourceContent(format!(
401            "Empty NDJSON content from '{}'",
402            source_name
403        )));
404    }
405
406    // Parse each line as a separate JSON resource
407    let mut resources = Vec::new();
408    let mut parse_errors = Vec::new();
409
410    for (line_num, line) in lines.iter().enumerate() {
411        match serde_json::from_str::<serde_json::Value>(line) {
412            Ok(value) => {
413                // Verify it's a FHIR resource
414                if value.get("resourceType").and_then(|v| v.as_str()).is_some() {
415                    resources.push(value);
416                } else {
417                    parse_errors.push(format!(
418                        "Line {}: Missing 'resourceType' field",
419                        line_num + 1
420                    ));
421                }
422            }
423            Err(e) => {
424                parse_errors.push(format!("Line {}: {}", line_num + 1, e));
425            }
426        }
427    }
428
429    // If we have some valid resources, proceed even if some lines failed
430    if resources.is_empty() {
431        return Err(SofError::InvalidSourceContent(format!(
432            "No valid FHIR resources found in NDJSON from '{}'. Errors: {}",
433            source_name,
434            parse_errors.join("; ")
435        )));
436    }
437
438    // Log warnings for failed lines (in production, you might want to use a proper logger)
439    if !parse_errors.is_empty() {
440        eprintln!(
441            "Warning: {} line(s) in NDJSON from '{}' could not be parsed: {}",
442            parse_errors.len(),
443            source_name,
444            parse_errors.join("; ")
445        );
446    }
447
448    // Wrap all resources in a Bundle
449    let resources_array = serde_json::Value::Array(resources);
450    wrap_resources_in_bundle(resources_array, source_name)
451}
452
453/// Parse FHIR content and convert to SofBundle
454/// Supports both JSON and NDJSON formats with automatic detection
455pub fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
456    // Check if the source suggests NDJSON format based on file extension
457    if is_ndjson_extension(source_name) {
458        return parse_ndjson_content(contents, source_name);
459    }
460
461    // Try to parse as regular JSON first
462    let value: serde_json::Value = match serde_json::from_str(contents) {
463        Ok(v) => v,
464        Err(json_err) => {
465            // JSON parsing failed, try NDJSON as fallback (content-based detection)
466            // This handles cases where .json files actually contain NDJSON content
467            if contents.lines().count() > 1 {
468                // Multiple lines suggest it might be NDJSON
469                return parse_ndjson_content(contents, source_name).map_err(|ndjson_err| {
470                    // If both fail, return the original JSON error with a helpful message
471                    SofError::InvalidSourceContent(format!(
472                        "Failed to parse content from '{}' as JSON: {}. Also tried NDJSON: {}",
473                        source_name, json_err, ndjson_err
474                    ))
475                });
476            }
477
478            // Single line or regular JSON error
479            return Err(SofError::InvalidSourceContent(format!(
480                "Failed to parse JSON from '{}': {}",
481                source_name, json_err
482            )));
483        }
484    };
485
486    // Check if it's already a Bundle
487    if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
488        if resource_type == "Bundle" {
489            // Try parsing as each FHIR version
490            #[cfg(feature = "R4")]
491            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
492                return Ok(SofBundle::R4(bundle));
493            }
494            #[cfg(feature = "R4B")]
495            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
496                return Ok(SofBundle::R4B(bundle));
497            }
498            #[cfg(feature = "R5")]
499            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
500                return Ok(SofBundle::R5(bundle));
501            }
502            #[cfg(feature = "R6")]
503            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
504                return Ok(SofBundle::R6(bundle));
505            }
506            return Err(SofError::InvalidSourceContent(format!(
507                "Bundle from '{}' could not be parsed as any supported FHIR version",
508                source_name
509            )));
510        }
511
512        // It's a single resource - wrap it in a Bundle
513        return wrap_resource_in_bundle(value, source_name);
514    }
515
516    // Check if it's an array of resources
517    if value.is_array() {
518        return wrap_resources_in_bundle(value, source_name);
519    }
520
521    Err(SofError::InvalidSourceContent(format!(
522        "Content from '{}' is not a valid FHIR resource or Bundle",
523        source_name
524    )))
525}
526
527/// Wrap a single resource in a Bundle
528fn wrap_resource_in_bundle(
529    resource: serde_json::Value,
530    source_name: &str,
531) -> Result<SofBundle, SofError> {
532    // Try each FHIR version
533    // R4
534    #[cfg(feature = "R4")]
535    if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
536        let mut bundle = helios_fhir::r4::Bundle::default();
537        bundle.r#type = Element {
538            id: None,
539            extension: None,
540            value: Some("collection".to_string()),
541        };
542        bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
543            resource: Some(res),
544            ..Default::default()
545        }]);
546        return Ok(SofBundle::R4(bundle));
547    }
548
549    // R4B
550    #[cfg(feature = "R4B")]
551    if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
552        let mut bundle = helios_fhir::r4b::Bundle::default();
553        bundle.r#type = Element {
554            id: None,
555            extension: None,
556            value: Some("collection".to_string()),
557        };
558        bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
559            resource: Some(res),
560            ..Default::default()
561        }]);
562        return Ok(SofBundle::R4B(bundle));
563    }
564
565    // R5
566    #[cfg(feature = "R5")]
567    if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
568        let mut bundle = helios_fhir::r5::Bundle::default();
569        bundle.r#type = Element {
570            id: None,
571            extension: None,
572            value: Some("collection".to_string()),
573        };
574        bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
575            resource: Some(Box::new(res)),
576            ..Default::default()
577        }]);
578        return Ok(SofBundle::R5(bundle));
579    }
580
581    // R6
582    #[cfg(feature = "R6")]
583    if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
584        let mut bundle = helios_fhir::r6::Bundle::default();
585        bundle.r#type = Element {
586            id: None,
587            extension: None,
588            value: Some("collection".to_string()),
589        };
590        bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
591            resource: Some(Box::new(res)),
592            ..Default::default()
593        }]);
594        return Ok(SofBundle::R6(bundle));
595    }
596
597    Err(SofError::InvalidSourceContent(format!(
598        "Resource from '{}' could not be parsed as any supported FHIR version",
599        source_name
600    )))
601}
602
603/// Wrap an array of resources in a Bundle
604fn wrap_resources_in_bundle(
605    resources: serde_json::Value,
606    source_name: &str,
607) -> Result<SofBundle, SofError> {
608    let arr = resources
609        .as_array()
610        .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
611
612    if arr.is_empty() {
613        return Err(SofError::InvalidSourceContent(format!(
614            "Empty array of resources from '{}'",
615            source_name
616        )));
617    }
618
619    // Try to parse the first resource to determine version
620    let first = &arr[0];
621
622    // Try R4
623    #[cfg(feature = "R4")]
624    if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
625        let mut bundle = helios_fhir::r4::Bundle::default();
626        bundle.r#type = Element {
627            id: None,
628            extension: None,
629            value: Some("collection".to_string()),
630        };
631        let mut entries = Vec::new();
632
633        for resource in arr {
634            let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
635                .map_err(|e| {
636                    SofError::InvalidSourceContent(format!(
637                        "Failed to parse R4 resource from '{}': {}",
638                        source_name, e
639                    ))
640                })?;
641            entries.push(helios_fhir::r4::BundleEntry {
642                resource: Some(res),
643                ..Default::default()
644            });
645        }
646
647        bundle.entry = Some(entries);
648        return Ok(SofBundle::R4(bundle));
649    }
650
651    // Try R4B
652    #[cfg(feature = "R4B")]
653    if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
654        let mut bundle = helios_fhir::r4b::Bundle::default();
655        bundle.r#type = Element {
656            id: None,
657            extension: None,
658            value: Some("collection".to_string()),
659        };
660        let mut entries = Vec::new();
661
662        for resource in arr {
663            let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
664                .map_err(|e| {
665                    SofError::InvalidSourceContent(format!(
666                        "Failed to parse R4B resource from '{}': {}",
667                        source_name, e
668                    ))
669                })?;
670            entries.push(helios_fhir::r4b::BundleEntry {
671                resource: Some(res),
672                ..Default::default()
673            });
674        }
675
676        bundle.entry = Some(entries);
677        return Ok(SofBundle::R4B(bundle));
678    }
679
680    // Try R5
681    #[cfg(feature = "R5")]
682    if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
683        let mut bundle = helios_fhir::r5::Bundle::default();
684        bundle.r#type = Element {
685            id: None,
686            extension: None,
687            value: Some("collection".to_string()),
688        };
689        let mut entries = Vec::new();
690
691        for resource in arr {
692            let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
693                .map_err(|e| {
694                    SofError::InvalidSourceContent(format!(
695                        "Failed to parse R5 resource from '{}': {}",
696                        source_name, e
697                    ))
698                })?;
699            entries.push(helios_fhir::r5::BundleEntry {
700                resource: Some(Box::new(res)),
701                ..Default::default()
702            });
703        }
704
705        bundle.entry = Some(entries);
706        return Ok(SofBundle::R5(bundle));
707    }
708
709    // Try R6
710    #[cfg(feature = "R6")]
711    if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
712        let mut bundle = helios_fhir::r6::Bundle::default();
713        bundle.r#type = Element {
714            id: None,
715            extension: None,
716            value: Some("collection".to_string()),
717        };
718        let mut entries = Vec::new();
719
720        for resource in arr {
721            let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
722                .map_err(|e| {
723                    SofError::InvalidSourceContent(format!(
724                        "Failed to parse R6 resource from '{}': {}",
725                        source_name, e
726                    ))
727                })?;
728            entries.push(helios_fhir::r6::BundleEntry {
729                resource: Some(Box::new(res)),
730                ..Default::default()
731            });
732        }
733
734        bundle.entry = Some(entries);
735        return Ok(SofBundle::R6(bundle));
736    }
737
738    Err(SofError::InvalidSourceContent(format!(
739        "Resources from '{}' could not be parsed as any supported FHIR version",
740        source_name
741    )))
742}
743
744#[cfg(test)]
745mod tests {
746    use super::*;
747
748    #[tokio::test]
749    async fn test_parse_fhir_bundle() {
750        let bundle_json = r#"{
751            "resourceType": "Bundle",
752            "type": "collection",
753            "entry": [{
754                "resource": {
755                    "resourceType": "Patient",
756                    "id": "123"
757                }
758            }]
759        }"#;
760
761        let result = parse_fhir_content(bundle_json, "test").unwrap();
762        #[cfg(feature = "R4")]
763        assert!(matches!(result, SofBundle::R4(_)));
764        #[cfg(not(feature = "R4"))]
765        assert!(matches!(result, _));
766    }
767
768    #[tokio::test]
769    async fn test_parse_single_resource() {
770        let patient_json = r#"{
771            "resourceType": "Patient",
772            "id": "123"
773        }"#;
774
775        let result = parse_fhir_content(patient_json, "test").unwrap();
776        #[cfg(feature = "R4")]
777        match result {
778            SofBundle::R4(bundle) => {
779                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
780            }
781            #[cfg(feature = "R4B")]
782            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
783            #[cfg(feature = "R5")]
784            SofBundle::R5(_) => panic!("Expected R4 bundle"),
785            #[cfg(feature = "R6")]
786            SofBundle::R6(_) => panic!("Expected R4 bundle"),
787        }
788    }
789
790    #[tokio::test]
791    async fn test_parse_resource_array() {
792        let resources_json = r#"[
793            {
794                "resourceType": "Patient",
795                "id": "123"
796            },
797            {
798                "resourceType": "Patient",
799                "id": "456"
800            }
801        ]"#;
802
803        let result = parse_fhir_content(resources_json, "test").unwrap();
804        #[cfg(feature = "R4")]
805        match result {
806            SofBundle::R4(bundle) => {
807                assert_eq!(bundle.entry.as_ref().unwrap().len(), 2);
808            }
809            #[cfg(feature = "R4B")]
810            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
811            #[cfg(feature = "R5")]
812            SofBundle::R5(_) => panic!("Expected R4 bundle"),
813            #[cfg(feature = "R6")]
814            SofBundle::R6(_) => panic!("Expected R4 bundle"),
815        }
816    }
817
818    #[tokio::test]
819    async fn test_invalid_content() {
820        let invalid_json = r#"{"not": "fhir"}"#;
821        let result = parse_fhir_content(invalid_json, "test");
822        assert!(result.is_err());
823    }
824
825    #[tokio::test]
826    async fn test_s3_url_parsing() {
827        let data_source = UniversalDataSource::new();
828
829        // Test invalid S3 URL without bucket
830        let result = data_source.load("s3:///path/to/file.json").await;
831        assert!(result.is_err());
832        if let Err(SofError::InvalidSource(msg)) = result {
833            assert!(msg.contains("missing bucket name"));
834        }
835
836        // Test invalid S3 URL without path
837        let result = data_source.load("s3://bucket/").await;
838        assert!(result.is_err());
839        if let Err(SofError::InvalidSource(msg)) = result {
840            assert!(msg.contains("missing object path"));
841        }
842
843        // Note: Actual S3 fetching would require valid credentials and a real bucket
844        // These tests verify URL parsing and error handling
845    }
846
847    #[tokio::test]
848    async fn test_gcs_url_parsing() {
849        let data_source = UniversalDataSource::new();
850
851        // Test invalid GCS URL without bucket
852        let result = data_source.load("gs:///path/to/file.json").await;
853        assert!(result.is_err());
854        if let Err(SofError::InvalidSource(msg)) = result {
855            assert!(msg.contains("missing bucket name"));
856        }
857
858        // Test invalid GCS URL without path
859        let result = data_source.load("gs://bucket/").await;
860        assert!(result.is_err());
861        if let Err(SofError::InvalidSource(msg)) = result {
862            assert!(msg.contains("missing object path"));
863        }
864    }
865
866    #[tokio::test]
867    async fn test_azure_url_parsing() {
868        let data_source = UniversalDataSource::new();
869
870        // Test invalid Azure URL without container
871        let result = data_source.load("azure:///path/to/file.json").await;
872        assert!(result.is_err());
873        if let Err(SofError::InvalidSource(msg)) = result {
874            assert!(msg.contains("missing container name"));
875        }
876
877        // Test invalid Azure URL without path
878        let result = data_source.load("azure://container/").await;
879        assert!(result.is_err());
880        if let Err(SofError::InvalidSource(msg)) = result {
881            assert!(msg.contains("missing blob path"));
882        }
883    }
884
885    #[tokio::test]
886    async fn test_unsupported_protocol() {
887        let data_source = UniversalDataSource::new();
888
889        // Test unsupported protocol
890        let result = data_source.load("ftp://server/file.json").await;
891        assert!(result.is_err());
892        if let Err(SofError::UnsupportedSourceProtocol(msg)) = result {
893            assert!(msg.contains("Unsupported source protocol: ftp"));
894            assert!(msg.contains("Supported:"));
895        }
896    }
897
898    #[tokio::test]
899    async fn test_file_protocol_bundle() {
900        use std::io::Write;
901        use tempfile::NamedTempFile;
902
903        let data_source = UniversalDataSource::new();
904
905        // Create a temporary file with a FHIR Bundle
906        let bundle_json = r#"{
907            "resourceType": "Bundle",
908            "type": "collection",
909            "entry": [{
910                "resource": {
911                    "resourceType": "Patient",
912                    "id": "test-patient"
913                }
914            }]
915        }"#;
916
917        let mut temp_file = NamedTempFile::new().unwrap();
918        temp_file.write_all(bundle_json.as_bytes()).unwrap();
919        temp_file.flush().unwrap();
920
921        // Get the file path and convert to file:// URL
922        let file_path = temp_file.path();
923        let file_url = format!("file://{}", file_path.to_string_lossy());
924
925        // Test loading from file:// URL
926        let result = data_source.load(&file_url).await;
927        assert!(result.is_ok());
928
929        #[cfg(feature = "R4")]
930        match result.unwrap() {
931            SofBundle::R4(bundle) => {
932                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
933            }
934            #[cfg(feature = "R4B")]
935            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
936            #[cfg(feature = "R5")]
937            SofBundle::R5(_) => panic!("Expected R4 bundle"),
938            #[cfg(feature = "R6")]
939            SofBundle::R6(_) => panic!("Expected R4 bundle"),
940        }
941    }
942
943    #[tokio::test]
944    async fn test_file_protocol_single_resource() {
945        use std::io::Write;
946        use tempfile::NamedTempFile;
947
948        let data_source = UniversalDataSource::new();
949
950        // Create a temporary file with a single FHIR resource
951        let patient_json = r#"{
952            "resourceType": "Patient",
953            "id": "test-patient",
954            "name": [{
955                "family": "Test",
956                "given": ["Patient"]
957            }]
958        }"#;
959
960        let mut temp_file = NamedTempFile::new().unwrap();
961        temp_file.write_all(patient_json.as_bytes()).unwrap();
962        temp_file.flush().unwrap();
963
964        let file_path = temp_file.path();
965        let file_url = format!("file://{}", file_path.to_string_lossy());
966
967        // Test loading single resource - should be wrapped in a Bundle
968        let result = data_source.load(&file_url).await;
969        assert!(result.is_ok());
970
971        #[cfg(feature = "R4")]
972        match result.unwrap() {
973            SofBundle::R4(bundle) => {
974                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
975            }
976            #[cfg(feature = "R4B")]
977            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
978            #[cfg(feature = "R5")]
979            SofBundle::R5(_) => panic!("Expected R4 bundle"),
980            #[cfg(feature = "R6")]
981            SofBundle::R6(_) => panic!("Expected R4 bundle"),
982        }
983    }
984
985    #[tokio::test]
986    async fn test_file_protocol_resource_array() {
987        use std::io::Write;
988        use tempfile::NamedTempFile;
989
990        let data_source = UniversalDataSource::new();
991
992        // Create a temporary file with an array of FHIR resources
993        let resources_json = r#"[
994            {
995                "resourceType": "Patient",
996                "id": "patient-1"
997            },
998            {
999                "resourceType": "Patient",
1000                "id": "patient-2"
1001            },
1002            {
1003                "resourceType": "Observation",
1004                "id": "obs-1",
1005                "status": "final",
1006                "code": {
1007                    "text": "Test"
1008                }
1009            }
1010        ]"#;
1011
1012        let mut temp_file = NamedTempFile::new().unwrap();
1013        temp_file.write_all(resources_json.as_bytes()).unwrap();
1014        temp_file.flush().unwrap();
1015
1016        let file_path = temp_file.path();
1017        let file_url = format!("file://{}", file_path.to_string_lossy());
1018
1019        // Test loading array of resources
1020        let result = data_source.load(&file_url).await;
1021        assert!(result.is_ok());
1022
1023        #[cfg(feature = "R4")]
1024        match result.unwrap() {
1025            SofBundle::R4(bundle) => {
1026                assert_eq!(bundle.entry.as_ref().unwrap().len(), 3);
1027            }
1028            #[cfg(feature = "R4B")]
1029            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
1030            #[cfg(feature = "R5")]
1031            SofBundle::R5(_) => panic!("Expected R4 bundle"),
1032            #[cfg(feature = "R6")]
1033            SofBundle::R6(_) => panic!("Expected R4 bundle"),
1034        }
1035    }
1036
1037    #[tokio::test]
1038    async fn test_file_protocol_file_not_found() {
1039        use std::path::PathBuf;
1040        use url::Url;
1041
1042        let data_source = UniversalDataSource::new();
1043
1044        // Test with non-existent file using platform-appropriate path
1045        #[cfg(windows)]
1046        let nonexistent_path = PathBuf::from("C:\\nonexistent\\path\\to\\file.json");
1047        #[cfg(not(windows))]
1048        let nonexistent_path = PathBuf::from("/nonexistent/path/to/file.json");
1049
1050        let file_url = Url::from_file_path(&nonexistent_path).unwrap().to_string();
1051
1052        let result = data_source.load(&file_url).await;
1053        assert!(result.is_err());
1054
1055        if let Err(SofError::SourceNotFound(msg)) = result {
1056            assert!(msg.contains("File not found"));
1057        } else {
1058            panic!("Expected SourceNotFound error");
1059        }
1060    }
1061
1062    #[tokio::test]
1063    async fn test_file_protocol_invalid_json() {
1064        use std::io::Write;
1065        use tempfile::NamedTempFile;
1066
1067        let data_source = UniversalDataSource::new();
1068
1069        // Create a temporary file with invalid JSON
1070        let invalid_json = "{ this is not valid json }";
1071
1072        let mut temp_file = NamedTempFile::new().unwrap();
1073        temp_file.write_all(invalid_json.as_bytes()).unwrap();
1074        temp_file.flush().unwrap();
1075
1076        let file_path = temp_file.path();
1077        let file_url = format!("file://{}", file_path.to_string_lossy());
1078
1079        // Test loading invalid JSON
1080        let result = data_source.load(&file_url).await;
1081        assert!(result.is_err());
1082
1083        if let Err(SofError::InvalidSourceContent(msg)) = result {
1084            assert!(msg.contains("Failed to parse JSON"));
1085        } else {
1086            panic!("Expected InvalidSourceContent error");
1087        }
1088    }
1089
1090    #[tokio::test]
1091    async fn test_file_protocol_invalid_fhir() {
1092        use std::io::Write;
1093        use tempfile::NamedTempFile;
1094
1095        let data_source = UniversalDataSource::new();
1096
1097        // Create a temporary file with valid JSON but not FHIR content
1098        let not_fhir_json = r#"{"just": "some", "random": "data"}"#;
1099
1100        let mut temp_file = NamedTempFile::new().unwrap();
1101        temp_file.write_all(not_fhir_json.as_bytes()).unwrap();
1102        temp_file.flush().unwrap();
1103
1104        let file_path = temp_file.path();
1105        let file_url = format!("file://{}", file_path.to_string_lossy());
1106
1107        // Test loading non-FHIR content
1108        let result = data_source.load(&file_url).await;
1109        assert!(result.is_err());
1110
1111        if let Err(SofError::InvalidSourceContent(msg)) = result {
1112            assert!(msg.contains("not a valid FHIR resource"));
1113        } else {
1114            panic!("Expected InvalidSourceContent error, got {:?}", result);
1115        }
1116    }
1117
1118    #[tokio::test]
1119    async fn test_file_protocol_invalid_url() {
1120        let data_source = UniversalDataSource::new();
1121
1122        // Test with malformed file URL (Windows-style path without proper file:// format)
1123        let result = data_source.load("file://C:\\invalid\\windows\\path").await;
1124        assert!(result.is_err());
1125        // The error type will depend on URL parsing behavior
1126    }
1127}