helios_sof/
data_source.rs

1//! # FHIR Data Source Loading
2//!
3//! This module provides flexible data loading capabilities for FHIR resources from
4//! various sources including local files, HTTP endpoints, and cloud storage services.
5//! It handles automatic format detection and conversion to FHIR Bundles.
6//!
7//! ## Overview
8//!
9//! The data source system supports:
10//!
//! - **Multiple Protocols**: file://, http(s)://, s3://, gs://, azure://, abfs(s)://
12//! - **Format Detection**: Automatic detection of JSON vs NDJSON formats
13//! - **Smart Wrapping**: Single resources and arrays automatically wrapped in Bundles
14//! - **Version Agnostic**: Works with R4, R4B, R5, and R6 FHIR versions
15//! - **Error Handling**: Comprehensive error reporting for invalid sources
16//!
17//! ## Supported Sources
18//!
19//! ### Local Files
20//! ```text
21//! file:///path/to/bundle.json
22//! file:///path/to/resource.ndjson
23//! ```
24//!
25//! ### HTTP/HTTPS
26//! ```text
27//! https://example.org/fhir/Bundle/123
28//! http://localhost:8080/Patient?_count=100
29//! ```
30//!
31//! ### Amazon S3
32//! ```text
33//! s3://my-bucket/path/to/bundle.json
34//! ```
35//! Requires: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
36//!
37//! ### Google Cloud Storage
38//! ```text
39//! gs://my-bucket/path/to/data.ndjson
40//! ```
41//! Requires: GOOGLE_SERVICE_ACCOUNT or Application Default Credentials
42//!
43//! ### Azure Blob Storage
44//! ```text
45//! azure://container/path/to/bundle.json
46//! abfss://container@account.dfs.core.windows.net/path/to/data.json
47//! ```
48//! Requires: AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY
49//!
50//! ## Key Components
51//!
52//! - [`DataSource`]: Trait for loading FHIR data from various sources
53//! - [`UniversalDataSource`]: Universal implementation supporting all protocols
54//! - [`parse_fhir_content()`]: Parses FHIR content and wraps it in a Bundle
55//!
56//! ## Format Support
57//!
58//! ### JSON Format
59//! - Single FHIR resources (Patient, Observation, etc.)
60//! - FHIR Bundles
61//! - Arrays of FHIR resources
62//!
63//! ### NDJSON Format
64//! - Newline-delimited JSON (one resource per line)
65//! - Detected by `.ndjson` extension or content analysis
66//! - Partial failures tolerated (invalid lines logged as warnings)
67//!
68//! ## Examples
69//!
70//! ```rust
71//! use helios_sof::data_source::{DataSource, UniversalDataSource};
72//!
73//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
74//! let source = UniversalDataSource::new();
75//!
76//! // Load from local file
77//! let bundle = source.load("file:///data/patients.json").await?;
78//!
79//! // Load from HTTP endpoint
80//! let bundle = source.load("https://hapi.fhir.org/baseR4/Patient?_count=10").await?;
81//!
82//! // Load from S3
83//! let bundle = source.load("s3://fhir-data/bundles/patients.json").await?;
84//!
85//! // Load NDJSON
86//! let bundle = source.load("file:///data/observations.ndjson").await?;
87//! # Ok(())
88//! # }
89//! ```
90//!
91//! ## Automatic Format Detection
92//!
93//! The module automatically:
94//! 1. Detects NDJSON by `.ndjson` file extension
95//! 2. Falls back to content-based detection for multi-line JSON files
96//! 3. Determines FHIR version by attempting to parse as each version
97//! 4. Wraps single resources or arrays in appropriate Bundle types
98//!
99//! ## Error Handling
100//!
101//! Provides detailed errors for:
102//! - Invalid URLs or protocols
103//! - Missing files or objects
104//! - Network failures
105//! - Malformed JSON
106//! - Invalid FHIR content
107//! - Missing credentials for cloud services
108
109use crate::{SofBundle, SofError};
110use async_trait::async_trait;
111use helios_fhir::Element;
112use object_store::{
113    ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
114    gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
115};
116use reqwest;
117use serde_json;
118use std::sync::Arc;
119use tokio::fs;
120use url::Url;
121
/// Abstraction over anything that can load FHIR data and return it as a bundle.
///
/// The `Send + Sync` supertraits are required of every implementation.
#[async_trait]
pub trait DataSource: Send + Sync {
    /// Loads FHIR data identified by `source` and returns it as a [`SofBundle`].
    ///
    /// # Errors
    ///
    /// Returns a [`SofError`] when the data cannot be loaded; the exact
    /// variants depend on the implementation.
    async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
}
128
/// [`DataSource`] implementation that selects a loader from the URL scheme of
/// the requested source.
pub struct UniversalDataSource {
    /// HTTP client passed to the loader for `http://` and `https://` sources.
    client: reqwest::Client,
}
133
134impl UniversalDataSource {
135    pub fn new() -> Self {
136        Self {
137            client: reqwest::Client::builder()
138                .timeout(std::time::Duration::from_secs(30))
139                .build()
140                .unwrap_or_else(|_| reqwest::Client::new()),
141        }
142    }
143}
144
/// `Default` produces the same value as [`UniversalDataSource::new`].
impl Default for UniversalDataSource {
    fn default() -> Self {
        // Delegate so there is a single place that configures the HTTP client.
        Self::new()
    }
}
150
151#[async_trait]
152impl DataSource for UniversalDataSource {
153    async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
154        // Parse the source as a URL to determine the protocol
155        let url = Url::parse(source).map_err(|e| {
156            SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
157        })?;
158
159        match url.scheme() {
160            "file" => load_from_file(&url).await,
161            "http" | "https" => load_from_http(&self.client, &url).await,
162            "s3" => load_from_s3(&url).await,
163            "gs" => load_from_gcs(&url).await,
164            "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
165            scheme => Err(SofError::UnsupportedSourceProtocol(format!(
166                "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
167                scheme
168            ))),
169        }
170    }
171}
172
173/// Load FHIR data from a local file
174async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
175    // Convert file URL to path
176    let path = url
177        .to_file_path()
178        .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
179
180    // Check if file exists
181    if !path.exists() {
182        return Err(SofError::SourceNotFound(format!(
183            "File not found: {}",
184            path.display()
185        )));
186    }
187
188    // Read file contents
189    let contents = fs::read_to_string(&path)
190        .await
191        .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
192
193    // Parse and convert to bundle
194    parse_fhir_content(&contents, &path.to_string_lossy())
195}
196
197/// Load FHIR data from HTTP/HTTPS URL
198async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
199    // Fetch content from URL
200    let response = client
201        .get(url.as_str())
202        .header("Accept", "application/fhir+json, application/json")
203        .send()
204        .await
205        .map_err(|e| {
206            SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
207        })?;
208
209    // Check response status
210    if !response.status().is_success() {
211        return Err(SofError::SourceFetchError(format!(
212            "HTTP error {} when fetching '{}'",
213            response.status(),
214            url
215        )));
216    }
217
218    // Get content
219    let contents = response
220        .text()
221        .await
222        .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
223
224    // Parse and convert to bundle
225    parse_fhir_content(&contents, url.as_str())
226}
227
228/// Load FHIR data from AWS S3
229async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
230    // Parse S3 URL: s3://bucket/path/to/object
231    let bucket = url.host_str().ok_or_else(|| {
232        SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
233    })?;
234
235    let path = url.path().trim_start_matches('/');
236    if path.is_empty() {
237        return Err(SofError::InvalidSource(format!(
238            "Invalid S3 URL '{}': missing object path",
239            url
240        )));
241    }
242
243    // Create S3 client using environment variables or default credentials
244    let store = AmazonS3Builder::new()
245        .with_bucket_name(bucket)
246        .build()
247        .map_err(|e| {
248            SofError::SourceFetchError(format!(
249                "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)",
250                url, e
251            ))
252        })?;
253
254    load_from_object_store(Arc::new(store), path, url.as_str()).await
255}
256
257/// Load FHIR data from Google Cloud Storage
258async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
259    // Parse GCS URL: gs://bucket/path/to/object
260    let bucket = url.host_str().ok_or_else(|| {
261        SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
262    })?;
263
264    let path = url.path().trim_start_matches('/');
265    if path.is_empty() {
266        return Err(SofError::InvalidSource(format!(
267            "Invalid GCS URL '{}': missing object path",
268            url
269        )));
270    }
271
272    // Create GCS client using environment variables or default credentials
273    let store = GoogleCloudStorageBuilder::new()
274        .with_bucket_name(bucket)
275        .build()
276        .map_err(|e| {
277            SofError::SourceFetchError(format!(
278                "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
279                url, e
280            ))
281        })?;
282
283    load_from_object_store(Arc::new(store), path, url.as_str()).await
284}
285
286/// Load FHIR data from Azure Blob Storage
287async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
288    // Parse Azure URL: azure://container/path/to/object or abfss://container@account.dfs.core.windows.net/path
289    let (container, path) = if url.scheme() == "azure" {
290        // Simple format: azure://container/path
291        let container = url.host_str().ok_or_else(|| {
292            SofError::InvalidSource(format!(
293                "Invalid Azure URL '{}': missing container name",
294                url
295            ))
296        })?;
297        let path = url.path().trim_start_matches('/');
298        (container.to_string(), path.to_string())
299    } else {
300        // ABFSS format: abfss://container@account.dfs.core.windows.net/path
301        let host = url.host_str().ok_or_else(|| {
302            SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
303        })?;
304        let parts: Vec<&str> = host.split('@').collect();
305        if parts.len() != 2 {
306            return Err(SofError::InvalidSource(format!(
307                "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
308                url
309            )));
310        }
311        let container = parts[0];
312        let path = url.path().trim_start_matches('/');
313        (container.to_string(), path.to_string())
314    };
315
316    if path.is_empty() {
317        return Err(SofError::InvalidSource(format!(
318            "Invalid Azure URL '{}': missing blob path",
319            url
320        )));
321    }
322
323    // Create Azure client using environment variables or managed identity
324    let store = MicrosoftAzureBuilder::new()
325        .with_container_name(&container)
326        .build()
327        .map_err(|e| {
328            SofError::SourceFetchError(format!(
329                "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
330                url, e
331            ))
332        })?;
333
334    load_from_object_store(Arc::new(store), &path, url.as_str()).await
335}
336
337/// Common function to load from any object store
338async fn load_from_object_store(
339    store: Arc<dyn ObjectStore>,
340    path: &str,
341    source_name: &str,
342) -> Result<SofBundle, SofError> {
343    // Create object path
344    let object_path = ObjectPath::from(path);
345
346    // Download the object
347    let result = store.get(&object_path).await.map_err(|e| match e {
348        object_store::Error::NotFound { .. } => {
349            SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
350        }
351        _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
352    })?;
353
354    // Read the bytes
355    let bytes = result
356        .bytes()
357        .await
358        .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
359
360    // Convert to string
361    let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
362        SofError::InvalidSourceContent(format!(
363            "Content from '{}' is not valid UTF-8: {}",
364            source_name, e
365        ))
366    })?;
367
368    // Parse and convert to bundle
369    parse_fhir_content(&contents, source_name)
370}
371
/// Report whether `source_name` ends with the `.ndjson` extension, ignoring case.
fn is_ndjson_extension(source_name: &str) -> bool {
    const NDJSON_SUFFIX: &[u8] = b".ndjson";

    let raw = source_name.as_bytes();
    match raw.len().checked_sub(NDJSON_SUFFIX.len()) {
        // The suffix is pure ASCII, so a byte-wise case-insensitive compare of
        // the tail is sufficient.
        Some(start) => raw[start..].eq_ignore_ascii_case(NDJSON_SUFFIX),
        None => false,
    }
}
376
377/// Parse NDJSON content (newline-delimited JSON) and convert to SofBundle
378fn parse_ndjson_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
379    let lines: Vec<&str> = contents
380        .lines()
381        .filter(|line| !line.trim().is_empty())
382        .collect();
383
384    if lines.is_empty() {
385        return Err(SofError::InvalidSourceContent(format!(
386            "Empty NDJSON content from '{}'",
387            source_name
388        )));
389    }
390
391    // Parse each line as a separate JSON resource
392    let mut resources = Vec::new();
393    let mut parse_errors = Vec::new();
394
395    for (line_num, line) in lines.iter().enumerate() {
396        match serde_json::from_str::<serde_json::Value>(line) {
397            Ok(value) => {
398                // Verify it's a FHIR resource
399                if value.get("resourceType").and_then(|v| v.as_str()).is_some() {
400                    resources.push(value);
401                } else {
402                    parse_errors.push(format!(
403                        "Line {}: Missing 'resourceType' field",
404                        line_num + 1
405                    ));
406                }
407            }
408            Err(e) => {
409                parse_errors.push(format!("Line {}: {}", line_num + 1, e));
410            }
411        }
412    }
413
414    // If we have some valid resources, proceed even if some lines failed
415    if resources.is_empty() {
416        return Err(SofError::InvalidSourceContent(format!(
417            "No valid FHIR resources found in NDJSON from '{}'. Errors: {}",
418            source_name,
419            parse_errors.join("; ")
420        )));
421    }
422
423    // Log warnings for failed lines (in production, you might want to use a proper logger)
424    if !parse_errors.is_empty() {
425        eprintln!(
426            "Warning: {} line(s) in NDJSON from '{}' could not be parsed: {}",
427            parse_errors.len(),
428            source_name,
429            parse_errors.join("; ")
430        );
431    }
432
433    // Wrap all resources in a Bundle
434    let resources_array = serde_json::Value::Array(resources);
435    wrap_resources_in_bundle(resources_array, source_name)
436}
437
438/// Parse FHIR content and convert to SofBundle
439/// Supports both JSON and NDJSON formats with automatic detection
440pub fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
441    // Check if the source suggests NDJSON format based on file extension
442    if is_ndjson_extension(source_name) {
443        return parse_ndjson_content(contents, source_name);
444    }
445
446    // Try to parse as regular JSON first
447    let value: serde_json::Value = match serde_json::from_str(contents) {
448        Ok(v) => v,
449        Err(json_err) => {
450            // JSON parsing failed, try NDJSON as fallback (content-based detection)
451            // This handles cases where .json files actually contain NDJSON content
452            if contents.lines().count() > 1 {
453                // Multiple lines suggest it might be NDJSON
454                return parse_ndjson_content(contents, source_name).map_err(|ndjson_err| {
455                    // If both fail, return the original JSON error with a helpful message
456                    SofError::InvalidSourceContent(format!(
457                        "Failed to parse content from '{}' as JSON: {}. Also tried NDJSON: {}",
458                        source_name, json_err, ndjson_err
459                    ))
460                });
461            }
462
463            // Single line or regular JSON error
464            return Err(SofError::InvalidSourceContent(format!(
465                "Failed to parse JSON from '{}': {}",
466                source_name, json_err
467            )));
468        }
469    };
470
471    // Check if it's already a Bundle
472    if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
473        if resource_type == "Bundle" {
474            // Try parsing as each FHIR version
475            #[cfg(feature = "R4")]
476            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
477                return Ok(SofBundle::R4(bundle));
478            }
479            #[cfg(feature = "R4B")]
480            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
481                return Ok(SofBundle::R4B(bundle));
482            }
483            #[cfg(feature = "R5")]
484            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
485                return Ok(SofBundle::R5(bundle));
486            }
487            #[cfg(feature = "R6")]
488            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
489                return Ok(SofBundle::R6(bundle));
490            }
491            return Err(SofError::InvalidSourceContent(format!(
492                "Bundle from '{}' could not be parsed as any supported FHIR version",
493                source_name
494            )));
495        }
496
497        // It's a single resource - wrap it in a Bundle
498        return wrap_resource_in_bundle(value, source_name);
499    }
500
501    // Check if it's an array of resources
502    if value.is_array() {
503        return wrap_resources_in_bundle(value, source_name);
504    }
505
506    Err(SofError::InvalidSourceContent(format!(
507        "Content from '{}' is not a valid FHIR resource or Bundle",
508        source_name
509    )))
510}
511
512/// Wrap a single resource in a Bundle
513fn wrap_resource_in_bundle(
514    resource: serde_json::Value,
515    source_name: &str,
516) -> Result<SofBundle, SofError> {
517    // Try each FHIR version
518    // R4
519    #[cfg(feature = "R4")]
520    if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
521        let mut bundle = helios_fhir::r4::Bundle::default();
522        bundle.r#type = Element {
523            id: None,
524            extension: None,
525            value: Some("collection".to_string()),
526        };
527        bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
528            resource: Some(res),
529            ..Default::default()
530        }]);
531        return Ok(SofBundle::R4(bundle));
532    }
533
534    // R4B
535    #[cfg(feature = "R4B")]
536    if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
537        let mut bundle = helios_fhir::r4b::Bundle::default();
538        bundle.r#type = Element {
539            id: None,
540            extension: None,
541            value: Some("collection".to_string()),
542        };
543        bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
544            resource: Some(res),
545            ..Default::default()
546        }]);
547        return Ok(SofBundle::R4B(bundle));
548    }
549
550    // R5
551    #[cfg(feature = "R5")]
552    if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
553        let mut bundle = helios_fhir::r5::Bundle::default();
554        bundle.r#type = Element {
555            id: None,
556            extension: None,
557            value: Some("collection".to_string()),
558        };
559        bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
560            resource: Some(Box::new(res)),
561            ..Default::default()
562        }]);
563        return Ok(SofBundle::R5(bundle));
564    }
565
566    // R6
567    #[cfg(feature = "R6")]
568    if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
569        let mut bundle = helios_fhir::r6::Bundle::default();
570        bundle.r#type = Element {
571            id: None,
572            extension: None,
573            value: Some("collection".to_string()),
574        };
575        bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
576            resource: Some(Box::new(res)),
577            ..Default::default()
578        }]);
579        return Ok(SofBundle::R6(bundle));
580    }
581
582    Err(SofError::InvalidSourceContent(format!(
583        "Resource from '{}' could not be parsed as any supported FHIR version",
584        source_name
585    )))
586}
587
588/// Wrap an array of resources in a Bundle
589fn wrap_resources_in_bundle(
590    resources: serde_json::Value,
591    source_name: &str,
592) -> Result<SofBundle, SofError> {
593    let arr = resources
594        .as_array()
595        .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
596
597    if arr.is_empty() {
598        return Err(SofError::InvalidSourceContent(format!(
599            "Empty array of resources from '{}'",
600            source_name
601        )));
602    }
603
604    // Try to parse the first resource to determine version
605    let first = &arr[0];
606
607    // Try R4
608    #[cfg(feature = "R4")]
609    if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
610        let mut bundle = helios_fhir::r4::Bundle::default();
611        bundle.r#type = Element {
612            id: None,
613            extension: None,
614            value: Some("collection".to_string()),
615        };
616        let mut entries = Vec::new();
617
618        for resource in arr {
619            let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
620                .map_err(|e| {
621                    SofError::InvalidSourceContent(format!(
622                        "Failed to parse R4 resource from '{}': {}",
623                        source_name, e
624                    ))
625                })?;
626            entries.push(helios_fhir::r4::BundleEntry {
627                resource: Some(res),
628                ..Default::default()
629            });
630        }
631
632        bundle.entry = Some(entries);
633        return Ok(SofBundle::R4(bundle));
634    }
635
636    // Try R4B
637    #[cfg(feature = "R4B")]
638    if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
639        let mut bundle = helios_fhir::r4b::Bundle::default();
640        bundle.r#type = Element {
641            id: None,
642            extension: None,
643            value: Some("collection".to_string()),
644        };
645        let mut entries = Vec::new();
646
647        for resource in arr {
648            let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
649                .map_err(|e| {
650                    SofError::InvalidSourceContent(format!(
651                        "Failed to parse R4B resource from '{}': {}",
652                        source_name, e
653                    ))
654                })?;
655            entries.push(helios_fhir::r4b::BundleEntry {
656                resource: Some(res),
657                ..Default::default()
658            });
659        }
660
661        bundle.entry = Some(entries);
662        return Ok(SofBundle::R4B(bundle));
663    }
664
665    // Try R5
666    #[cfg(feature = "R5")]
667    if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
668        let mut bundle = helios_fhir::r5::Bundle::default();
669        bundle.r#type = Element {
670            id: None,
671            extension: None,
672            value: Some("collection".to_string()),
673        };
674        let mut entries = Vec::new();
675
676        for resource in arr {
677            let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
678                .map_err(|e| {
679                    SofError::InvalidSourceContent(format!(
680                        "Failed to parse R5 resource from '{}': {}",
681                        source_name, e
682                    ))
683                })?;
684            entries.push(helios_fhir::r5::BundleEntry {
685                resource: Some(Box::new(res)),
686                ..Default::default()
687            });
688        }
689
690        bundle.entry = Some(entries);
691        return Ok(SofBundle::R5(bundle));
692    }
693
694    // Try R6
695    #[cfg(feature = "R6")]
696    if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
697        let mut bundle = helios_fhir::r6::Bundle::default();
698        bundle.r#type = Element {
699            id: None,
700            extension: None,
701            value: Some("collection".to_string()),
702        };
703        let mut entries = Vec::new();
704
705        for resource in arr {
706            let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
707                .map_err(|e| {
708                    SofError::InvalidSourceContent(format!(
709                        "Failed to parse R6 resource from '{}': {}",
710                        source_name, e
711                    ))
712                })?;
713            entries.push(helios_fhir::r6::BundleEntry {
714                resource: Some(Box::new(res)),
715                ..Default::default()
716            });
717        }
718
719        bundle.entry = Some(entries);
720        return Ok(SofBundle::R6(bundle));
721    }
722
723    Err(SofError::InvalidSourceContent(format!(
724        "Resources from '{}' could not be parsed as any supported FHIR version",
725        source_name
726    )))
727}
728
729#[cfg(test)]
730mod tests {
731    use super::*;
732
    /// A JSON document whose `resourceType` is `Bundle` is parsed directly as
    /// a bundle rather than being wrapped.
    #[tokio::test]
    async fn test_parse_fhir_bundle() {
        let bundle_json = r#"{
            "resourceType": "Bundle",
            "type": "collection",
            "entry": [{
                "resource": {
                    "resourceType": "Patient",
                    "id": "123"
                }
            }]
        }"#;

        let result = parse_fhir_content(bundle_json, "test").unwrap();
        // With the R4 feature enabled, R4 is the first version tried.
        #[cfg(feature = "R4")]
        assert!(matches!(result, SofBundle::R4(_)));
        // Without R4 only successful parsing is checked; this assertion
        // accepts any variant.
        #[cfg(not(feature = "R4"))]
        assert!(matches!(result, _));
    }
752
    /// A single non-Bundle resource is wrapped in a bundle with exactly one entry.
    #[tokio::test]
    async fn test_parse_single_resource() {
        let patient_json = r#"{
            "resourceType": "Patient",
            "id": "123"
        }"#;

        let result = parse_fhir_content(patient_json, "test").unwrap();
        // The variant and entry count are only inspected when R4 is enabled;
        // the other arms exist so the match stays exhaustive under every
        // feature combination.
        #[cfg(feature = "R4")]
        match result {
            SofBundle::R4(bundle) => {
                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
            }
            #[cfg(feature = "R4B")]
            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R5")]
            SofBundle::R5(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R6")]
            SofBundle::R6(_) => panic!("Expected R4 bundle"),
        }
    }
774
    /// A top-level JSON array of resources is wrapped in a bundle with one
    /// entry per array element.
    #[tokio::test]
    async fn test_parse_resource_array() {
        let resources_json = r#"[
            {
                "resourceType": "Patient",
                "id": "123"
            },
            {
                "resourceType": "Patient",
                "id": "456"
            }
        ]"#;

        let result = parse_fhir_content(resources_json, "test").unwrap();
        // The variant and entry count are only inspected when R4 is enabled;
        // the other arms exist so the match stays exhaustive under every
        // feature combination.
        #[cfg(feature = "R4")]
        match result {
            SofBundle::R4(bundle) => {
                assert_eq!(bundle.entry.as_ref().unwrap().len(), 2);
            }
            #[cfg(feature = "R4B")]
            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R5")]
            SofBundle::R5(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R6")]
            SofBundle::R6(_) => panic!("Expected R4 bundle"),
        }
    }
802
803    #[tokio::test]
804    async fn test_invalid_content() {
805        let invalid_json = r#"{"not": "fhir"}"#;
806        let result = parse_fhir_content(invalid_json, "test");
807        assert!(result.is_err());
808    }
809
810    #[tokio::test]
811    async fn test_s3_url_parsing() {
812        let data_source = UniversalDataSource::new();
813
814        // Test invalid S3 URL without bucket
815        let result = data_source.load("s3:///path/to/file.json").await;
816        assert!(result.is_err());
817        if let Err(SofError::InvalidSource(msg)) = result {
818            assert!(msg.contains("missing bucket name"));
819        }
820
821        // Test invalid S3 URL without path
822        let result = data_source.load("s3://bucket/").await;
823        assert!(result.is_err());
824        if let Err(SofError::InvalidSource(msg)) = result {
825            assert!(msg.contains("missing object path"));
826        }
827
828        // Note: Actual S3 fetching would require valid credentials and a real bucket
829        // These tests verify URL parsing and error handling
830    }
831
832    #[tokio::test]
833    async fn test_gcs_url_parsing() {
834        let data_source = UniversalDataSource::new();
835
836        // Test invalid GCS URL without bucket
837        let result = data_source.load("gs:///path/to/file.json").await;
838        assert!(result.is_err());
839        if let Err(SofError::InvalidSource(msg)) = result {
840            assert!(msg.contains("missing bucket name"));
841        }
842
843        // Test invalid GCS URL without path
844        let result = data_source.load("gs://bucket/").await;
845        assert!(result.is_err());
846        if let Err(SofError::InvalidSource(msg)) = result {
847            assert!(msg.contains("missing object path"));
848        }
849    }
850
851    #[tokio::test]
852    async fn test_azure_url_parsing() {
853        let data_source = UniversalDataSource::new();
854
855        // Test invalid Azure URL without container
856        let result = data_source.load("azure:///path/to/file.json").await;
857        assert!(result.is_err());
858        if let Err(SofError::InvalidSource(msg)) = result {
859            assert!(msg.contains("missing container name"));
860        }
861
862        // Test invalid Azure URL without path
863        let result = data_source.load("azure://container/").await;
864        assert!(result.is_err());
865        if let Err(SofError::InvalidSource(msg)) = result {
866            assert!(msg.contains("missing blob path"));
867        }
868    }
869
870    #[tokio::test]
871    async fn test_unsupported_protocol() {
872        let data_source = UniversalDataSource::new();
873
874        // Test unsupported protocol
875        let result = data_source.load("ftp://server/file.json").await;
876        assert!(result.is_err());
877        if let Err(SofError::UnsupportedSourceProtocol(msg)) = result {
878            assert!(msg.contains("Unsupported source protocol: ftp"));
879            assert!(msg.contains("Supported:"));
880        }
881    }
882
883    #[tokio::test]
884    async fn test_file_protocol_bundle() {
885        use std::io::Write;
886        use tempfile::NamedTempFile;
887
888        let data_source = UniversalDataSource::new();
889
890        // Create a temporary file with a FHIR Bundle
891        let bundle_json = r#"{
892            "resourceType": "Bundle",
893            "type": "collection",
894            "entry": [{
895                "resource": {
896                    "resourceType": "Patient",
897                    "id": "test-patient"
898                }
899            }]
900        }"#;
901
902        let mut temp_file = NamedTempFile::new().unwrap();
903        temp_file.write_all(bundle_json.as_bytes()).unwrap();
904        temp_file.flush().unwrap();
905
906        // Get the file path and convert to file:// URL
907        let file_path = temp_file.path();
908        let file_url = format!("file://{}", file_path.to_string_lossy());
909
910        // Test loading from file:// URL
911        let result = data_source.load(&file_url).await;
912        assert!(result.is_ok());
913
914        #[cfg(feature = "R4")]
915        match result.unwrap() {
916            SofBundle::R4(bundle) => {
917                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
918            }
919            #[cfg(feature = "R4B")]
920            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
921            #[cfg(feature = "R5")]
922            SofBundle::R5(_) => panic!("Expected R4 bundle"),
923            #[cfg(feature = "R6")]
924            SofBundle::R6(_) => panic!("Expected R4 bundle"),
925        }
926    }
927
928    #[tokio::test]
929    async fn test_file_protocol_single_resource() {
930        use std::io::Write;
931        use tempfile::NamedTempFile;
932
933        let data_source = UniversalDataSource::new();
934
935        // Create a temporary file with a single FHIR resource
936        let patient_json = r#"{
937            "resourceType": "Patient",
938            "id": "test-patient",
939            "name": [{
940                "family": "Test",
941                "given": ["Patient"]
942            }]
943        }"#;
944
945        let mut temp_file = NamedTempFile::new().unwrap();
946        temp_file.write_all(patient_json.as_bytes()).unwrap();
947        temp_file.flush().unwrap();
948
949        let file_path = temp_file.path();
950        let file_url = format!("file://{}", file_path.to_string_lossy());
951
952        // Test loading single resource - should be wrapped in a Bundle
953        let result = data_source.load(&file_url).await;
954        assert!(result.is_ok());
955
956        #[cfg(feature = "R4")]
957        match result.unwrap() {
958            SofBundle::R4(bundle) => {
959                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
960            }
961            #[cfg(feature = "R4B")]
962            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
963            #[cfg(feature = "R5")]
964            SofBundle::R5(_) => panic!("Expected R4 bundle"),
965            #[cfg(feature = "R6")]
966            SofBundle::R6(_) => panic!("Expected R4 bundle"),
967        }
968    }
969
970    #[tokio::test]
971    async fn test_file_protocol_resource_array() {
972        use std::io::Write;
973        use tempfile::NamedTempFile;
974
975        let data_source = UniversalDataSource::new();
976
977        // Create a temporary file with an array of FHIR resources
978        let resources_json = r#"[
979            {
980                "resourceType": "Patient",
981                "id": "patient-1"
982            },
983            {
984                "resourceType": "Patient",
985                "id": "patient-2"
986            },
987            {
988                "resourceType": "Observation",
989                "id": "obs-1",
990                "status": "final",
991                "code": {
992                    "text": "Test"
993                }
994            }
995        ]"#;
996
997        let mut temp_file = NamedTempFile::new().unwrap();
998        temp_file.write_all(resources_json.as_bytes()).unwrap();
999        temp_file.flush().unwrap();
1000
1001        let file_path = temp_file.path();
1002        let file_url = format!("file://{}", file_path.to_string_lossy());
1003
1004        // Test loading array of resources
1005        let result = data_source.load(&file_url).await;
1006        assert!(result.is_ok());
1007
1008        #[cfg(feature = "R4")]
1009        match result.unwrap() {
1010            SofBundle::R4(bundle) => {
1011                assert_eq!(bundle.entry.as_ref().unwrap().len(), 3);
1012            }
1013            #[cfg(feature = "R4B")]
1014            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
1015            #[cfg(feature = "R5")]
1016            SofBundle::R5(_) => panic!("Expected R4 bundle"),
1017            #[cfg(feature = "R6")]
1018            SofBundle::R6(_) => panic!("Expected R4 bundle"),
1019        }
1020    }
1021
1022    #[tokio::test]
1023    async fn test_file_protocol_file_not_found() {
1024        use std::path::PathBuf;
1025        use url::Url;
1026
1027        let data_source = UniversalDataSource::new();
1028
1029        // Test with non-existent file using platform-appropriate path
1030        #[cfg(windows)]
1031        let nonexistent_path = PathBuf::from("C:\\nonexistent\\path\\to\\file.json");
1032        #[cfg(not(windows))]
1033        let nonexistent_path = PathBuf::from("/nonexistent/path/to/file.json");
1034
1035        let file_url = Url::from_file_path(&nonexistent_path).unwrap().to_string();
1036
1037        let result = data_source.load(&file_url).await;
1038        assert!(result.is_err());
1039
1040        if let Err(SofError::SourceNotFound(msg)) = result {
1041            assert!(msg.contains("File not found"));
1042        } else {
1043            panic!("Expected SourceNotFound error");
1044        }
1045    }
1046
1047    #[tokio::test]
1048    async fn test_file_protocol_invalid_json() {
1049        use std::io::Write;
1050        use tempfile::NamedTempFile;
1051
1052        let data_source = UniversalDataSource::new();
1053
1054        // Create a temporary file with invalid JSON
1055        let invalid_json = "{ this is not valid json }";
1056
1057        let mut temp_file = NamedTempFile::new().unwrap();
1058        temp_file.write_all(invalid_json.as_bytes()).unwrap();
1059        temp_file.flush().unwrap();
1060
1061        let file_path = temp_file.path();
1062        let file_url = format!("file://{}", file_path.to_string_lossy());
1063
1064        // Test loading invalid JSON
1065        let result = data_source.load(&file_url).await;
1066        assert!(result.is_err());
1067
1068        if let Err(SofError::InvalidSourceContent(msg)) = result {
1069            assert!(msg.contains("Failed to parse JSON"));
1070        } else {
1071            panic!("Expected InvalidSourceContent error");
1072        }
1073    }
1074
1075    #[tokio::test]
1076    async fn test_file_protocol_invalid_fhir() {
1077        use std::io::Write;
1078        use tempfile::NamedTempFile;
1079
1080        let data_source = UniversalDataSource::new();
1081
1082        // Create a temporary file with valid JSON but not FHIR content
1083        let not_fhir_json = r#"{"just": "some", "random": "data"}"#;
1084
1085        let mut temp_file = NamedTempFile::new().unwrap();
1086        temp_file.write_all(not_fhir_json.as_bytes()).unwrap();
1087        temp_file.flush().unwrap();
1088
1089        let file_path = temp_file.path();
1090        let file_url = format!("file://{}", file_path.to_string_lossy());
1091
1092        // Test loading non-FHIR content
1093        let result = data_source.load(&file_url).await;
1094        assert!(result.is_err());
1095
1096        if let Err(SofError::InvalidSourceContent(msg)) = result {
1097            assert!(msg.contains("not a valid FHIR resource"));
1098        } else {
1099            panic!("Expected InvalidSourceContent error, got {:?}", result);
1100        }
1101    }
1102
1103    #[tokio::test]
1104    async fn test_file_protocol_invalid_url() {
1105        let data_source = UniversalDataSource::new();
1106
1107        // Test with malformed file URL (Windows-style path without proper file:// format)
1108        let result = data_source.load("file://C:\\invalid\\windows\\path").await;
1109        assert!(result.is_err());
1110        // The error type will depend on URL parsing behavior
1111    }
1112}