Skip to main content

helios_sof/
data_source.rs

1//! # FHIR Data Source Loading
2//!
3//! This module provides flexible data loading capabilities for FHIR resources from
4//! various sources including local files, HTTP endpoints, and cloud storage services.
5//! It handles automatic format detection and conversion to FHIR Bundles.
6//!
7//! ## Overview
8//!
9//! The data source system supports:
10//!
//! - **Multiple Protocols**: file://, http(s)://, s3://, gs://, azure://, abfs(s)://
12//! - **Format Detection**: Automatic detection of JSON vs NDJSON formats
13//! - **Smart Wrapping**: Single resources and arrays automatically wrapped in Bundles
14//! - **Version Agnostic**: Works with R4, R4B, R5, and R6 FHIR versions
15//! - **Error Handling**: Comprehensive error reporting for invalid sources
16//!
17//! ## Supported Sources
18//!
19//! ### Local Files
20//! ```text
21//! file:///path/to/bundle.json
22//! file:///path/to/resource.ndjson
23//! ```
24//!
25//! ### HTTP/HTTPS
26//! ```text
27//! https://example.org/fhir/Bundle/123
28//! http://localhost:8080/Patient?_count=100
29//! ```
30//!
31//! ### Amazon S3
32//! ```text
33//! s3://my-bucket/path/to/bundle.json
34//! ```
35//! Requires: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
36//!
37//! For S3-compatible services (MinIO, Ceph, etc.), also set:
38//! - AWS_ENDPOINT: Custom endpoint URL (e.g., `http://localhost:9000`)
39//! - AWS_ALLOW_HTTP: Set to `true` if the endpoint uses HTTP instead of HTTPS
40//!
41//! ### Google Cloud Storage
42//! ```text
43//! gs://my-bucket/path/to/data.ndjson
44//! ```
45//! Requires: GOOGLE_SERVICE_ACCOUNT or Application Default Credentials
46//!
47//! ### Azure Blob Storage
48//! ```text
49//! azure://container/path/to/bundle.json
50//! abfss://container@account.dfs.core.windows.net/path/to/data.json
51//! ```
52//! Requires: AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY
53//!
54//! ## Key Components
55//!
56//! - [`DataSource`]: Trait for loading FHIR data from various sources
57//! - [`UniversalDataSource`]: Universal implementation supporting all protocols
58//! - [`parse_fhir_content()`]: Parses FHIR content and wraps it in a Bundle
59//!
60//! ## Format Support
61//!
62//! ### JSON Format
63//! - Single FHIR resources (Patient, Observation, etc.)
64//! - FHIR Bundles
65//! - Arrays of FHIR resources
66//!
67//! ### NDJSON Format
68//! - Newline-delimited JSON (one resource per line)
69//! - Detected by `.ndjson` extension or content analysis
//! - Partial failures tolerated: invalid lines are skipped and reported as a warning on stderr
71//!
72//! ## Examples
73//!
74//! ```rust
75//! use helios_sof::data_source::{DataSource, UniversalDataSource};
76//!
77//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
78//! let source = UniversalDataSource::new();
79//!
80//! // Load from local file
81//! let bundle = source.load("file:///data/patients.json").await?;
82//!
83//! // Load from HTTP endpoint
84//! let bundle = source.load("https://hapi.fhir.org/baseR4/Patient?_count=10").await?;
85//!
86//! // Load from S3
87//! let bundle = source.load("s3://fhir-data/bundles/patients.json").await?;
88//!
89//! // Load NDJSON
90//! let bundle = source.load("file:///data/observations.ndjson").await?;
91//! # Ok(())
92//! # }
93//! ```
94//!
95//! ## Automatic Format Detection
96//!
97//! The module automatically:
98//! 1. Detects NDJSON by `.ndjson` file extension
//! 2. Falls back to NDJSON parsing when the content is not a single valid JSON document but spans multiple lines
100//! 3. Determines FHIR version by attempting to parse as each version
101//! 4. Wraps single resources or arrays in appropriate Bundle types
102//!
103//! ## Error Handling
104//!
105//! Provides detailed errors for:
106//! - Invalid URLs or protocols
107//! - Missing files or objects
108//! - Network failures
109//! - Malformed JSON
110//! - Invalid FHIR content
111//! - Missing credentials for cloud services
112
113use crate::{SofBundle, SofError};
114use async_trait::async_trait;
115use helios_fhir::Element;
116use object_store::{
117    ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
118    gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
119};
120use reqwest;
121use serde_json;
122use std::sync::Arc;
123use tokio::fs;
124use url::Url;
125
126/// Trait for loading FHIR data from various sources
127#[async_trait]
128pub trait DataSource: Send + Sync {
129    /// Load FHIR data from the source and return as a Bundle
130    async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
131}
132
133/// Implementation for loading data from various sources based on URL scheme
134pub struct UniversalDataSource {
135    client: reqwest::Client,
136}
137
138impl UniversalDataSource {
139    pub fn new() -> Self {
140        Self {
141            client: reqwest::Client::builder()
142                .timeout(std::time::Duration::from_secs(30))
143                .build()
144                .unwrap_or_else(|_| reqwest::Client::new()),
145        }
146    }
147}
148
149impl Default for UniversalDataSource {
150    fn default() -> Self {
151        Self::new()
152    }
153}
154
155#[async_trait]
156impl DataSource for UniversalDataSource {
157    async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
158        // Parse the source as a URL to determine the protocol
159        let url = Url::parse(source).map_err(|e| {
160            SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
161        })?;
162
163        match url.scheme() {
164            "file" => load_from_file(&url).await,
165            "http" | "https" => load_from_http(&self.client, &url).await,
166            "s3" => load_from_s3(&url).await,
167            "gs" => load_from_gcs(&url).await,
168            "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
169            scheme => Err(SofError::UnsupportedSourceProtocol(format!(
170                "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
171                scheme
172            ))),
173        }
174    }
175}
176
177/// Load FHIR data from a local file
178async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
179    // Convert file URL to path
180    let path = url
181        .to_file_path()
182        .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
183
184    // Check if file exists
185    if !path.exists() {
186        return Err(SofError::SourceNotFound(format!(
187            "File not found: {}",
188            path.display()
189        )));
190    }
191
192    // Read file contents
193    let contents = fs::read_to_string(&path)
194        .await
195        .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
196
197    // Parse and convert to bundle
198    parse_fhir_content(&contents, &path.to_string_lossy())
199}
200
201/// Load FHIR data from HTTP/HTTPS URL
202async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
203    // Fetch content from URL
204    let response = client
205        .get(url.as_str())
206        .header("Accept", "application/fhir+json, application/json")
207        .send()
208        .await
209        .map_err(|e| {
210            SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
211        })?;
212
213    // Check response status
214    if !response.status().is_success() {
215        return Err(SofError::SourceFetchError(format!(
216            "HTTP error {} when fetching '{}'",
217            response.status(),
218            url
219        )));
220    }
221
222    // Get content
223    let contents = response
224        .text()
225        .await
226        .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
227
228    // Parse and convert to bundle
229    parse_fhir_content(&contents, url.as_str())
230}
231
232/// Load FHIR data from AWS S3
233async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
234    // Parse S3 URL: s3://bucket/path/to/object
235    let bucket = url.host_str().ok_or_else(|| {
236        SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
237    })?;
238
239    let path = url.path().trim_start_matches('/');
240    if path.is_empty() {
241        return Err(SofError::InvalidSource(format!(
242            "Invalid S3 URL '{}': missing object path",
243            url
244        )));
245    }
246
247    // Create S3 client using environment variables or default credentials.
248    // AmazonS3Builder::from_env() automatically reads AWS_ACCESS_KEY_ID,
249    // AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION, AWS_ENDPOINT, and AWS_ALLOW_HTTP.
250    let builder = AmazonS3Builder::from_env().with_bucket_name(bucket);
251
252    let store = builder.build().map_err(|e| {
253        SofError::SourceFetchError(format!(
254            "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION). For S3-compatible services, set AWS_ENDPOINT (and AWS_ALLOW_HTTP=true for HTTP endpoints).",
255            url, e
256        ))
257    })?;
258
259    load_from_object_store(Arc::new(store), path, url.as_str()).await
260}
261
262/// Load FHIR data from Google Cloud Storage
263async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
264    // Parse GCS URL: gs://bucket/path/to/object
265    let bucket = url.host_str().ok_or_else(|| {
266        SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
267    })?;
268
269    let path = url.path().trim_start_matches('/');
270    if path.is_empty() {
271        return Err(SofError::InvalidSource(format!(
272            "Invalid GCS URL '{}': missing object path",
273            url
274        )));
275    }
276
277    // Create GCS client using environment variables or default credentials
278    let store = GoogleCloudStorageBuilder::new()
279        .with_bucket_name(bucket)
280        .build()
281        .map_err(|e| {
282            SofError::SourceFetchError(format!(
283                "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
284                url, e
285            ))
286        })?;
287
288    load_from_object_store(Arc::new(store), path, url.as_str()).await
289}
290
291/// Load FHIR data from Azure Blob Storage
292async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
293    // Parse Azure URL: azure://container/path/to/object or abfss://container@account.dfs.core.windows.net/path
294    let (container, path) = if url.scheme() == "azure" {
295        // Simple format: azure://container/path
296        let container = url.host_str().ok_or_else(|| {
297            SofError::InvalidSource(format!(
298                "Invalid Azure URL '{}': missing container name",
299                url
300            ))
301        })?;
302        let path = url.path().trim_start_matches('/');
303        (container.to_string(), path.to_string())
304    } else {
305        // ABFSS format: abfss://container@account.dfs.core.windows.net/path
306        let host = url.host_str().ok_or_else(|| {
307            SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
308        })?;
309        let parts: Vec<&str> = host.split('@').collect();
310        if parts.len() != 2 {
311            return Err(SofError::InvalidSource(format!(
312                "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
313                url
314            )));
315        }
316        let container = parts[0];
317        let path = url.path().trim_start_matches('/');
318        (container.to_string(), path.to_string())
319    };
320
321    if path.is_empty() {
322        return Err(SofError::InvalidSource(format!(
323            "Invalid Azure URL '{}': missing blob path",
324            url
325        )));
326    }
327
328    // Create Azure client using environment variables or managed identity
329    let store = MicrosoftAzureBuilder::new()
330        .with_container_name(&container)
331        .build()
332        .map_err(|e| {
333            SofError::SourceFetchError(format!(
334                "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
335                url, e
336            ))
337        })?;
338
339    load_from_object_store(Arc::new(store), &path, url.as_str()).await
340}
341
342/// Common function to load from any object store
343async fn load_from_object_store(
344    store: Arc<dyn ObjectStore>,
345    path: &str,
346    source_name: &str,
347) -> Result<SofBundle, SofError> {
348    // Create object path
349    let object_path = ObjectPath::from(path);
350
351    // Download the object
352    let result = store.get(&object_path).await.map_err(|e| match e {
353        object_store::Error::NotFound { .. } => {
354            SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
355        }
356        _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
357    })?;
358
359    // Read the bytes
360    let bytes = result
361        .bytes()
362        .await
363        .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
364
365    // Convert to string
366    let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
367        SofError::InvalidSourceContent(format!(
368            "Content from '{}' is not valid UTF-8: {}",
369            source_name, e
370        ))
371    })?;
372
373    // Parse and convert to bundle
374    parse_fhir_content(&contents, source_name)
375}
376
/// Return `true` when `source_name` ends with `.ndjson`, ignoring ASCII case.
///
/// `source_name` may be a filesystem path or a full source URL; only its
/// trailing characters are inspected.
fn is_ndjson_extension(source_name: &str) -> bool {
    const NDJSON_SUFFIX: &[u8] = b".ndjson";
    let name = source_name.as_bytes();
    // Compare raw bytes: non-ASCII bytes never equal the ASCII suffix, so no
    // lowercase copy of the whole string is needed.
    name.len() >= NDJSON_SUFFIX.len()
        && name[name.len() - NDJSON_SUFFIX.len()..].eq_ignore_ascii_case(NDJSON_SUFFIX)
}
381
382/// Parse NDJSON content (newline-delimited JSON) and convert to SofBundle
383fn parse_ndjson_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
384    let lines: Vec<&str> = contents
385        .lines()
386        .filter(|line| !line.trim().is_empty())
387        .collect();
388
389    if lines.is_empty() {
390        return Err(SofError::InvalidSourceContent(format!(
391            "Empty NDJSON content from '{}'",
392            source_name
393        )));
394    }
395
396    // Parse each line as a separate JSON resource
397    let mut resources = Vec::new();
398    let mut parse_errors = Vec::new();
399
400    for (line_num, line) in lines.iter().enumerate() {
401        match serde_json::from_str::<serde_json::Value>(line) {
402            Ok(value) => {
403                // Verify it's a FHIR resource
404                if value.get("resourceType").and_then(|v| v.as_str()).is_some() {
405                    resources.push(value);
406                } else {
407                    parse_errors.push(format!(
408                        "Line {}: Missing 'resourceType' field",
409                        line_num + 1
410                    ));
411                }
412            }
413            Err(e) => {
414                parse_errors.push(format!("Line {}: {}", line_num + 1, e));
415            }
416        }
417    }
418
419    // If we have some valid resources, proceed even if some lines failed
420    if resources.is_empty() {
421        return Err(SofError::InvalidSourceContent(format!(
422            "No valid FHIR resources found in NDJSON from '{}'. Errors: {}",
423            source_name,
424            parse_errors.join("; ")
425        )));
426    }
427
428    // Log warnings for failed lines (in production, you might want to use a proper logger)
429    if !parse_errors.is_empty() {
430        eprintln!(
431            "Warning: {} line(s) in NDJSON from '{}' could not be parsed: {}",
432            parse_errors.len(),
433            source_name,
434            parse_errors.join("; ")
435        );
436    }
437
438    // Wrap all resources in a Bundle
439    let resources_array = serde_json::Value::Array(resources);
440    wrap_resources_in_bundle(resources_array, source_name)
441}
442
443/// Parse FHIR content and convert to SofBundle
444/// Supports both JSON and NDJSON formats with automatic detection
445pub fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
446    // Check if the source suggests NDJSON format based on file extension
447    if is_ndjson_extension(source_name) {
448        return parse_ndjson_content(contents, source_name);
449    }
450
451    // Try to parse as regular JSON first
452    let value: serde_json::Value = match serde_json::from_str(contents) {
453        Ok(v) => v,
454        Err(json_err) => {
455            // JSON parsing failed, try NDJSON as fallback (content-based detection)
456            // This handles cases where .json files actually contain NDJSON content
457            if contents.lines().count() > 1 {
458                // Multiple lines suggest it might be NDJSON
459                return parse_ndjson_content(contents, source_name).map_err(|ndjson_err| {
460                    // If both fail, return the original JSON error with a helpful message
461                    SofError::InvalidSourceContent(format!(
462                        "Failed to parse content from '{}' as JSON: {}. Also tried NDJSON: {}",
463                        source_name, json_err, ndjson_err
464                    ))
465                });
466            }
467
468            // Single line or regular JSON error
469            return Err(SofError::InvalidSourceContent(format!(
470                "Failed to parse JSON from '{}': {}",
471                source_name, json_err
472            )));
473        }
474    };
475
476    // Check if it's already a Bundle
477    if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
478        if resource_type == "Bundle" {
479            // Try parsing as each FHIR version
480            #[cfg(feature = "R4")]
481            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
482                return Ok(SofBundle::R4(bundle));
483            }
484            #[cfg(feature = "R4B")]
485            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
486                return Ok(SofBundle::R4B(bundle));
487            }
488            #[cfg(feature = "R5")]
489            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
490                return Ok(SofBundle::R5(bundle));
491            }
492            #[cfg(feature = "R6")]
493            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
494                return Ok(SofBundle::R6(bundle));
495            }
496            return Err(SofError::InvalidSourceContent(format!(
497                "Bundle from '{}' could not be parsed as any supported FHIR version",
498                source_name
499            )));
500        }
501
502        // It's a single resource - wrap it in a Bundle
503        return wrap_resource_in_bundle(value, source_name);
504    }
505
506    // Check if it's an array of resources
507    if value.is_array() {
508        return wrap_resources_in_bundle(value, source_name);
509    }
510
511    Err(SofError::InvalidSourceContent(format!(
512        "Content from '{}' is not a valid FHIR resource or Bundle",
513        source_name
514    )))
515}
516
517/// Wrap a single resource in a Bundle
518fn wrap_resource_in_bundle(
519    resource: serde_json::Value,
520    source_name: &str,
521) -> Result<SofBundle, SofError> {
522    // Try each FHIR version
523    // R4
524    #[cfg(feature = "R4")]
525    if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
526        let mut bundle = helios_fhir::r4::Bundle::default();
527        bundle.r#type = Element {
528            id: None,
529            extension: None,
530            value: Some("collection".to_string()),
531        };
532        bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
533            resource: Some(res),
534            ..Default::default()
535        }]);
536        return Ok(SofBundle::R4(bundle));
537    }
538
539    // R4B
540    #[cfg(feature = "R4B")]
541    if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
542        let mut bundle = helios_fhir::r4b::Bundle::default();
543        bundle.r#type = Element {
544            id: None,
545            extension: None,
546            value: Some("collection".to_string()),
547        };
548        bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
549            resource: Some(res),
550            ..Default::default()
551        }]);
552        return Ok(SofBundle::R4B(bundle));
553    }
554
555    // R5
556    #[cfg(feature = "R5")]
557    if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
558        let mut bundle = helios_fhir::r5::Bundle::default();
559        bundle.r#type = Element {
560            id: None,
561            extension: None,
562            value: Some("collection".to_string()),
563        };
564        bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
565            resource: Some(Box::new(res)),
566            ..Default::default()
567        }]);
568        return Ok(SofBundle::R5(bundle));
569    }
570
571    // R6
572    #[cfg(feature = "R6")]
573    if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
574        let mut bundle = helios_fhir::r6::Bundle::default();
575        bundle.r#type = Element {
576            id: None,
577            extension: None,
578            value: Some("collection".to_string()),
579        };
580        bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
581            resource: Some(Box::new(res)),
582            ..Default::default()
583        }]);
584        return Ok(SofBundle::R6(bundle));
585    }
586
587    Err(SofError::InvalidSourceContent(format!(
588        "Resource from '{}' could not be parsed as any supported FHIR version",
589        source_name
590    )))
591}
592
593/// Wrap an array of resources in a Bundle
594fn wrap_resources_in_bundle(
595    resources: serde_json::Value,
596    source_name: &str,
597) -> Result<SofBundle, SofError> {
598    let arr = resources
599        .as_array()
600        .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
601
602    if arr.is_empty() {
603        return Err(SofError::InvalidSourceContent(format!(
604            "Empty array of resources from '{}'",
605            source_name
606        )));
607    }
608
609    // Try to parse the first resource to determine version
610    let first = &arr[0];
611
612    // Try R4
613    #[cfg(feature = "R4")]
614    if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
615        let mut bundle = helios_fhir::r4::Bundle::default();
616        bundle.r#type = Element {
617            id: None,
618            extension: None,
619            value: Some("collection".to_string()),
620        };
621        let mut entries = Vec::new();
622
623        for resource in arr {
624            let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
625                .map_err(|e| {
626                    SofError::InvalidSourceContent(format!(
627                        "Failed to parse R4 resource from '{}': {}",
628                        source_name, e
629                    ))
630                })?;
631            entries.push(helios_fhir::r4::BundleEntry {
632                resource: Some(res),
633                ..Default::default()
634            });
635        }
636
637        bundle.entry = Some(entries);
638        return Ok(SofBundle::R4(bundle));
639    }
640
641    // Try R4B
642    #[cfg(feature = "R4B")]
643    if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
644        let mut bundle = helios_fhir::r4b::Bundle::default();
645        bundle.r#type = Element {
646            id: None,
647            extension: None,
648            value: Some("collection".to_string()),
649        };
650        let mut entries = Vec::new();
651
652        for resource in arr {
653            let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
654                .map_err(|e| {
655                    SofError::InvalidSourceContent(format!(
656                        "Failed to parse R4B resource from '{}': {}",
657                        source_name, e
658                    ))
659                })?;
660            entries.push(helios_fhir::r4b::BundleEntry {
661                resource: Some(res),
662                ..Default::default()
663            });
664        }
665
666        bundle.entry = Some(entries);
667        return Ok(SofBundle::R4B(bundle));
668    }
669
670    // Try R5
671    #[cfg(feature = "R5")]
672    if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
673        let mut bundle = helios_fhir::r5::Bundle::default();
674        bundle.r#type = Element {
675            id: None,
676            extension: None,
677            value: Some("collection".to_string()),
678        };
679        let mut entries = Vec::new();
680
681        for resource in arr {
682            let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
683                .map_err(|e| {
684                    SofError::InvalidSourceContent(format!(
685                        "Failed to parse R5 resource from '{}': {}",
686                        source_name, e
687                    ))
688                })?;
689            entries.push(helios_fhir::r5::BundleEntry {
690                resource: Some(Box::new(res)),
691                ..Default::default()
692            });
693        }
694
695        bundle.entry = Some(entries);
696        return Ok(SofBundle::R5(bundle));
697    }
698
699    // Try R6
700    #[cfg(feature = "R6")]
701    if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
702        let mut bundle = helios_fhir::r6::Bundle::default();
703        bundle.r#type = Element {
704            id: None,
705            extension: None,
706            value: Some("collection".to_string()),
707        };
708        let mut entries = Vec::new();
709
710        for resource in arr {
711            let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
712                .map_err(|e| {
713                    SofError::InvalidSourceContent(format!(
714                        "Failed to parse R6 resource from '{}': {}",
715                        source_name, e
716                    ))
717                })?;
718            entries.push(helios_fhir::r6::BundleEntry {
719                resource: Some(Box::new(res)),
720                ..Default::default()
721            });
722        }
723
724        bundle.entry = Some(entries);
725        return Ok(SofBundle::R6(bundle));
726    }
727
728    Err(SofError::InvalidSourceContent(format!(
729        "Resources from '{}' could not be parsed as any supported FHIR version",
730        source_name
731    )))
732}
733
734#[cfg(test)]
735mod tests {
736    use super::*;
737
738    #[tokio::test]
739    async fn test_parse_fhir_bundle() {
740        let bundle_json = r#"{
741            "resourceType": "Bundle",
742            "type": "collection",
743            "entry": [{
744                "resource": {
745                    "resourceType": "Patient",
746                    "id": "123"
747                }
748            }]
749        }"#;
750
751        let result = parse_fhir_content(bundle_json, "test").unwrap();
752        #[cfg(feature = "R4")]
753        assert!(matches!(result, SofBundle::R4(_)));
754        #[cfg(not(feature = "R4"))]
755        assert!(matches!(result, _));
756    }
757
758    #[tokio::test]
759    async fn test_parse_single_resource() {
760        let patient_json = r#"{
761            "resourceType": "Patient",
762            "id": "123"
763        }"#;
764
765        let result = parse_fhir_content(patient_json, "test").unwrap();
766        #[cfg(feature = "R4")]
767        match result {
768            SofBundle::R4(bundle) => {
769                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
770            }
771            #[cfg(feature = "R4B")]
772            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
773            #[cfg(feature = "R5")]
774            SofBundle::R5(_) => panic!("Expected R4 bundle"),
775            #[cfg(feature = "R6")]
776            SofBundle::R6(_) => panic!("Expected R4 bundle"),
777        }
778    }
779
780    #[tokio::test]
781    async fn test_parse_resource_array() {
782        let resources_json = r#"[
783            {
784                "resourceType": "Patient",
785                "id": "123"
786            },
787            {
788                "resourceType": "Patient",
789                "id": "456"
790            }
791        ]"#;
792
793        let result = parse_fhir_content(resources_json, "test").unwrap();
794        #[cfg(feature = "R4")]
795        match result {
796            SofBundle::R4(bundle) => {
797                assert_eq!(bundle.entry.as_ref().unwrap().len(), 2);
798            }
799            #[cfg(feature = "R4B")]
800            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
801            #[cfg(feature = "R5")]
802            SofBundle::R5(_) => panic!("Expected R4 bundle"),
803            #[cfg(feature = "R6")]
804            SofBundle::R6(_) => panic!("Expected R4 bundle"),
805        }
806    }
807
808    #[tokio::test]
809    async fn test_invalid_content() {
810        let invalid_json = r#"{"not": "fhir"}"#;
811        let result = parse_fhir_content(invalid_json, "test");
812        assert!(result.is_err());
813    }
814
815    #[tokio::test]
816    async fn test_s3_url_parsing() {
817        let data_source = UniversalDataSource::new();
818
819        // Test invalid S3 URL without bucket
820        let result = data_source.load("s3:///path/to/file.json").await;
821        assert!(result.is_err());
822        if let Err(SofError::InvalidSource(msg)) = result {
823            assert!(msg.contains("missing bucket name"));
824        }
825
826        // Test invalid S3 URL without path
827        let result = data_source.load("s3://bucket/").await;
828        assert!(result.is_err());
829        if let Err(SofError::InvalidSource(msg)) = result {
830            assert!(msg.contains("missing object path"));
831        }
832
833        // Note: Actual S3 fetching would require valid credentials and a real bucket
834        // These tests verify URL parsing and error handling
835    }
836
837    #[tokio::test]
838    async fn test_gcs_url_parsing() {
839        let data_source = UniversalDataSource::new();
840
841        // Test invalid GCS URL without bucket
842        let result = data_source.load("gs:///path/to/file.json").await;
843        assert!(result.is_err());
844        if let Err(SofError::InvalidSource(msg)) = result {
845            assert!(msg.contains("missing bucket name"));
846        }
847
848        // Test invalid GCS URL without path
849        let result = data_source.load("gs://bucket/").await;
850        assert!(result.is_err());
851        if let Err(SofError::InvalidSource(msg)) = result {
852            assert!(msg.contains("missing object path"));
853        }
854    }
855
856    #[tokio::test]
857    async fn test_azure_url_parsing() {
858        let data_source = UniversalDataSource::new();
859
860        // Test invalid Azure URL without container
861        let result = data_source.load("azure:///path/to/file.json").await;
862        assert!(result.is_err());
863        if let Err(SofError::InvalidSource(msg)) = result {
864            assert!(msg.contains("missing container name"));
865        }
866
867        // Test invalid Azure URL without path
868        let result = data_source.load("azure://container/").await;
869        assert!(result.is_err());
870        if let Err(SofError::InvalidSource(msg)) = result {
871            assert!(msg.contains("missing blob path"));
872        }
873    }
874
875    #[tokio::test]
876    async fn test_unsupported_protocol() {
877        let data_source = UniversalDataSource::new();
878
879        // Test unsupported protocol
880        let result = data_source.load("ftp://server/file.json").await;
881        assert!(result.is_err());
882        if let Err(SofError::UnsupportedSourceProtocol(msg)) = result {
883            assert!(msg.contains("Unsupported source protocol: ftp"));
884            assert!(msg.contains("Supported:"));
885        }
886    }
887
888    #[tokio::test]
889    async fn test_file_protocol_bundle() {
890        use std::io::Write;
891        use tempfile::NamedTempFile;
892
893        let data_source = UniversalDataSource::new();
894
895        // Create a temporary file with a FHIR Bundle
896        let bundle_json = r#"{
897            "resourceType": "Bundle",
898            "type": "collection",
899            "entry": [{
900                "resource": {
901                    "resourceType": "Patient",
902                    "id": "test-patient"
903                }
904            }]
905        }"#;
906
907        let mut temp_file = NamedTempFile::new().unwrap();
908        temp_file.write_all(bundle_json.as_bytes()).unwrap();
909        temp_file.flush().unwrap();
910
911        // Get the file path and convert to file:// URL
912        let file_path = temp_file.path();
913        let file_url = format!("file://{}", file_path.to_string_lossy());
914
915        // Test loading from file:// URL
916        let result = data_source.load(&file_url).await;
917        assert!(result.is_ok());
918
919        #[cfg(feature = "R4")]
920        match result.unwrap() {
921            SofBundle::R4(bundle) => {
922                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
923            }
924            #[cfg(feature = "R4B")]
925            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
926            #[cfg(feature = "R5")]
927            SofBundle::R5(_) => panic!("Expected R4 bundle"),
928            #[cfg(feature = "R6")]
929            SofBundle::R6(_) => panic!("Expected R4 bundle"),
930        }
931    }
932
933    #[tokio::test]
934    async fn test_file_protocol_single_resource() {
935        use std::io::Write;
936        use tempfile::NamedTempFile;
937
938        let data_source = UniversalDataSource::new();
939
940        // Create a temporary file with a single FHIR resource
941        let patient_json = r#"{
942            "resourceType": "Patient",
943            "id": "test-patient",
944            "name": [{
945                "family": "Test",
946                "given": ["Patient"]
947            }]
948        }"#;
949
950        let mut temp_file = NamedTempFile::new().unwrap();
951        temp_file.write_all(patient_json.as_bytes()).unwrap();
952        temp_file.flush().unwrap();
953
954        let file_path = temp_file.path();
955        let file_url = format!("file://{}", file_path.to_string_lossy());
956
957        // Test loading single resource - should be wrapped in a Bundle
958        let result = data_source.load(&file_url).await;
959        assert!(result.is_ok());
960
961        #[cfg(feature = "R4")]
962        match result.unwrap() {
963            SofBundle::R4(bundle) => {
964                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
965            }
966            #[cfg(feature = "R4B")]
967            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
968            #[cfg(feature = "R5")]
969            SofBundle::R5(_) => panic!("Expected R4 bundle"),
970            #[cfg(feature = "R6")]
971            SofBundle::R6(_) => panic!("Expected R4 bundle"),
972        }
973    }
974
975    #[tokio::test]
976    async fn test_file_protocol_resource_array() {
977        use std::io::Write;
978        use tempfile::NamedTempFile;
979
980        let data_source = UniversalDataSource::new();
981
982        // Create a temporary file with an array of FHIR resources
983        let resources_json = r#"[
984            {
985                "resourceType": "Patient",
986                "id": "patient-1"
987            },
988            {
989                "resourceType": "Patient",
990                "id": "patient-2"
991            },
992            {
993                "resourceType": "Observation",
994                "id": "obs-1",
995                "status": "final",
996                "code": {
997                    "text": "Test"
998                }
999            }
1000        ]"#;
1001
1002        let mut temp_file = NamedTempFile::new().unwrap();
1003        temp_file.write_all(resources_json.as_bytes()).unwrap();
1004        temp_file.flush().unwrap();
1005
1006        let file_path = temp_file.path();
1007        let file_url = format!("file://{}", file_path.to_string_lossy());
1008
1009        // Test loading array of resources
1010        let result = data_source.load(&file_url).await;
1011        assert!(result.is_ok());
1012
1013        #[cfg(feature = "R4")]
1014        match result.unwrap() {
1015            SofBundle::R4(bundle) => {
1016                assert_eq!(bundle.entry.as_ref().unwrap().len(), 3);
1017            }
1018            #[cfg(feature = "R4B")]
1019            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
1020            #[cfg(feature = "R5")]
1021            SofBundle::R5(_) => panic!("Expected R4 bundle"),
1022            #[cfg(feature = "R6")]
1023            SofBundle::R6(_) => panic!("Expected R4 bundle"),
1024        }
1025    }
1026
1027    #[tokio::test]
1028    async fn test_file_protocol_file_not_found() {
1029        use std::path::PathBuf;
1030        use url::Url;
1031
1032        let data_source = UniversalDataSource::new();
1033
1034        // Test with non-existent file using platform-appropriate path
1035        #[cfg(windows)]
1036        let nonexistent_path = PathBuf::from("C:\\nonexistent\\path\\to\\file.json");
1037        #[cfg(not(windows))]
1038        let nonexistent_path = PathBuf::from("/nonexistent/path/to/file.json");
1039
1040        let file_url = Url::from_file_path(&nonexistent_path).unwrap().to_string();
1041
1042        let result = data_source.load(&file_url).await;
1043        assert!(result.is_err());
1044
1045        if let Err(SofError::SourceNotFound(msg)) = result {
1046            assert!(msg.contains("File not found"));
1047        } else {
1048            panic!("Expected SourceNotFound error");
1049        }
1050    }
1051
1052    #[tokio::test]
1053    async fn test_file_protocol_invalid_json() {
1054        use std::io::Write;
1055        use tempfile::NamedTempFile;
1056
1057        let data_source = UniversalDataSource::new();
1058
1059        // Create a temporary file with invalid JSON
1060        let invalid_json = "{ this is not valid json }";
1061
1062        let mut temp_file = NamedTempFile::new().unwrap();
1063        temp_file.write_all(invalid_json.as_bytes()).unwrap();
1064        temp_file.flush().unwrap();
1065
1066        let file_path = temp_file.path();
1067        let file_url = format!("file://{}", file_path.to_string_lossy());
1068
1069        // Test loading invalid JSON
1070        let result = data_source.load(&file_url).await;
1071        assert!(result.is_err());
1072
1073        if let Err(SofError::InvalidSourceContent(msg)) = result {
1074            assert!(msg.contains("Failed to parse JSON"));
1075        } else {
1076            panic!("Expected InvalidSourceContent error");
1077        }
1078    }
1079
1080    #[tokio::test]
1081    async fn test_file_protocol_invalid_fhir() {
1082        use std::io::Write;
1083        use tempfile::NamedTempFile;
1084
1085        let data_source = UniversalDataSource::new();
1086
1087        // Create a temporary file with valid JSON but not FHIR content
1088        let not_fhir_json = r#"{"just": "some", "random": "data"}"#;
1089
1090        let mut temp_file = NamedTempFile::new().unwrap();
1091        temp_file.write_all(not_fhir_json.as_bytes()).unwrap();
1092        temp_file.flush().unwrap();
1093
1094        let file_path = temp_file.path();
1095        let file_url = format!("file://{}", file_path.to_string_lossy());
1096
1097        // Test loading non-FHIR content
1098        let result = data_source.load(&file_url).await;
1099        assert!(result.is_err());
1100
1101        if let Err(SofError::InvalidSourceContent(msg)) = result {
1102            assert!(msg.contains("not a valid FHIR resource"));
1103        } else {
1104            panic!("Expected InvalidSourceContent error, got {:?}", result);
1105        }
1106    }
1107
1108    #[tokio::test]
1109    async fn test_file_protocol_invalid_url() {
1110        let data_source = UniversalDataSource::new();
1111
1112        // Test with malformed file URL (Windows-style path without proper file:// format)
1113        let result = data_source.load("file://C:\\invalid\\windows\\path").await;
1114        assert!(result.is_err());
1115        // The error type will depend on URL parsing behavior
1116    }
1117}