helios_sof/
data_source.rs

1use crate::{SofBundle, SofError};
2use async_trait::async_trait;
3use helios_fhir::Element;
4use object_store::{
5    ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
6    gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
7};
8use reqwest;
9use serde_json;
10use std::sync::Arc;
11use tokio::fs;
12use url::Url;
13
14/// Trait for loading FHIR data from various sources
15#[async_trait]
16pub trait DataSource: Send + Sync {
17    /// Load FHIR data from the source and return as a Bundle
18    async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
19}
20
21/// Implementation for loading data from various sources based on URL scheme
22pub struct UniversalDataSource {
23    client: reqwest::Client,
24}
25
26impl UniversalDataSource {
27    pub fn new() -> Self {
28        Self {
29            client: reqwest::Client::builder()
30                .timeout(std::time::Duration::from_secs(30))
31                .build()
32                .unwrap_or_else(|_| reqwest::Client::new()),
33        }
34    }
35}
36
37impl Default for UniversalDataSource {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43#[async_trait]
44impl DataSource for UniversalDataSource {
45    async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
46        // Parse the source as a URL to determine the protocol
47        let url = Url::parse(source).map_err(|e| {
48            SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
49        })?;
50
51        match url.scheme() {
52            "file" => load_from_file(&url).await,
53            "http" | "https" => load_from_http(&self.client, &url).await,
54            "s3" => load_from_s3(&url).await,
55            "gs" => load_from_gcs(&url).await,
56            "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
57            scheme => Err(SofError::UnsupportedSourceProtocol(format!(
58                "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
59                scheme
60            ))),
61        }
62    }
63}
64
65/// Load FHIR data from a local file
66async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
67    // Convert file URL to path
68    let path = url
69        .to_file_path()
70        .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
71
72    // Check if file exists
73    if !path.exists() {
74        return Err(SofError::SourceNotFound(format!(
75            "File not found: {}",
76            path.display()
77        )));
78    }
79
80    // Read file contents
81    let contents = fs::read_to_string(&path)
82        .await
83        .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
84
85    // Parse and convert to bundle
86    parse_fhir_content(&contents, &path.to_string_lossy())
87}
88
89/// Load FHIR data from HTTP/HTTPS URL
90async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
91    // Fetch content from URL
92    let response = client
93        .get(url.as_str())
94        .header("Accept", "application/fhir+json, application/json")
95        .send()
96        .await
97        .map_err(|e| {
98            SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
99        })?;
100
101    // Check response status
102    if !response.status().is_success() {
103        return Err(SofError::SourceFetchError(format!(
104            "HTTP error {} when fetching '{}'",
105            response.status(),
106            url
107        )));
108    }
109
110    // Get content
111    let contents = response
112        .text()
113        .await
114        .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
115
116    // Parse and convert to bundle
117    parse_fhir_content(&contents, url.as_str())
118}
119
120/// Load FHIR data from AWS S3
121async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
122    // Parse S3 URL: s3://bucket/path/to/object
123    let bucket = url.host_str().ok_or_else(|| {
124        SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
125    })?;
126
127    let path = url.path().trim_start_matches('/');
128    if path.is_empty() {
129        return Err(SofError::InvalidSource(format!(
130            "Invalid S3 URL '{}': missing object path",
131            url
132        )));
133    }
134
135    // Create S3 client using environment variables or default credentials
136    let store = AmazonS3Builder::new()
137        .with_bucket_name(bucket)
138        .build()
139        .map_err(|e| {
140            SofError::SourceFetchError(format!(
141                "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)",
142                url, e
143            ))
144        })?;
145
146    load_from_object_store(Arc::new(store), path, url.as_str()).await
147}
148
149/// Load FHIR data from Google Cloud Storage
150async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
151    // Parse GCS URL: gs://bucket/path/to/object
152    let bucket = url.host_str().ok_or_else(|| {
153        SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
154    })?;
155
156    let path = url.path().trim_start_matches('/');
157    if path.is_empty() {
158        return Err(SofError::InvalidSource(format!(
159            "Invalid GCS URL '{}': missing object path",
160            url
161        )));
162    }
163
164    // Create GCS client using environment variables or default credentials
165    let store = GoogleCloudStorageBuilder::new()
166        .with_bucket_name(bucket)
167        .build()
168        .map_err(|e| {
169            SofError::SourceFetchError(format!(
170                "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
171                url, e
172            ))
173        })?;
174
175    load_from_object_store(Arc::new(store), path, url.as_str()).await
176}
177
178/// Load FHIR data from Azure Blob Storage
179async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
180    // Parse Azure URL: azure://container/path/to/object or abfss://container@account.dfs.core.windows.net/path
181    let (container, path) = if url.scheme() == "azure" {
182        // Simple format: azure://container/path
183        let container = url.host_str().ok_or_else(|| {
184            SofError::InvalidSource(format!(
185                "Invalid Azure URL '{}': missing container name",
186                url
187            ))
188        })?;
189        let path = url.path().trim_start_matches('/');
190        (container.to_string(), path.to_string())
191    } else {
192        // ABFSS format: abfss://container@account.dfs.core.windows.net/path
193        let host = url.host_str().ok_or_else(|| {
194            SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
195        })?;
196        let parts: Vec<&str> = host.split('@').collect();
197        if parts.len() != 2 {
198            return Err(SofError::InvalidSource(format!(
199                "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
200                url
201            )));
202        }
203        let container = parts[0];
204        let path = url.path().trim_start_matches('/');
205        (container.to_string(), path.to_string())
206    };
207
208    if path.is_empty() {
209        return Err(SofError::InvalidSource(format!(
210            "Invalid Azure URL '{}': missing blob path",
211            url
212        )));
213    }
214
215    // Create Azure client using environment variables or managed identity
216    let store = MicrosoftAzureBuilder::new()
217        .with_container_name(&container)
218        .build()
219        .map_err(|e| {
220            SofError::SourceFetchError(format!(
221                "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
222                url, e
223            ))
224        })?;
225
226    load_from_object_store(Arc::new(store), &path, url.as_str()).await
227}
228
229/// Common function to load from any object store
230async fn load_from_object_store(
231    store: Arc<dyn ObjectStore>,
232    path: &str,
233    source_name: &str,
234) -> Result<SofBundle, SofError> {
235    // Create object path
236    let object_path = ObjectPath::from(path);
237
238    // Download the object
239    let result = store.get(&object_path).await.map_err(|e| match e {
240        object_store::Error::NotFound { .. } => {
241            SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
242        }
243        _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
244    })?;
245
246    // Read the bytes
247    let bytes = result
248        .bytes()
249        .await
250        .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
251
252    // Convert to string
253    let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
254        SofError::InvalidSourceContent(format!(
255            "Content from '{}' is not valid UTF-8: {}",
256            source_name, e
257        ))
258    })?;
259
260    // Parse and convert to bundle
261    parse_fhir_content(&contents, source_name)
262}
263
264/// Parse FHIR content and convert to SofBundle
265fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
266    // First, try to determine what type of content we have
267    let value: serde_json::Value = serde_json::from_str(contents).map_err(|e| {
268        SofError::InvalidSourceContent(format!(
269            "Failed to parse JSON from '{}': {}",
270            source_name, e
271        ))
272    })?;
273
274    // Check if it's already a Bundle
275    if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
276        if resource_type == "Bundle" {
277            // Try parsing as each FHIR version
278            #[cfg(feature = "R4")]
279            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
280                return Ok(SofBundle::R4(bundle));
281            }
282            #[cfg(feature = "R4B")]
283            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
284                return Ok(SofBundle::R4B(bundle));
285            }
286            #[cfg(feature = "R5")]
287            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
288                return Ok(SofBundle::R5(bundle));
289            }
290            #[cfg(feature = "R6")]
291            if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
292                return Ok(SofBundle::R6(bundle));
293            }
294            return Err(SofError::InvalidSourceContent(format!(
295                "Bundle from '{}' could not be parsed as any supported FHIR version",
296                source_name
297            )));
298        }
299
300        // It's a single resource - wrap it in a Bundle
301        return wrap_resource_in_bundle(value, source_name);
302    }
303
304    // Check if it's an array of resources
305    if value.is_array() {
306        return wrap_resources_in_bundle(value, source_name);
307    }
308
309    Err(SofError::InvalidSourceContent(format!(
310        "Content from '{}' is not a valid FHIR resource or Bundle",
311        source_name
312    )))
313}
314
315/// Wrap a single resource in a Bundle
316fn wrap_resource_in_bundle(
317    resource: serde_json::Value,
318    source_name: &str,
319) -> Result<SofBundle, SofError> {
320    // Try each FHIR version
321    // R4
322    #[cfg(feature = "R4")]
323    if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
324        let mut bundle = helios_fhir::r4::Bundle::default();
325        bundle.r#type = Element {
326            id: None,
327            extension: None,
328            value: Some("collection".to_string()),
329        };
330        bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
331            resource: Some(res),
332            ..Default::default()
333        }]);
334        return Ok(SofBundle::R4(bundle));
335    }
336
337    // R4B
338    #[cfg(feature = "R4B")]
339    if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
340        let mut bundle = helios_fhir::r4b::Bundle::default();
341        bundle.r#type = Element {
342            id: None,
343            extension: None,
344            value: Some("collection".to_string()),
345        };
346        bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
347            resource: Some(res),
348            ..Default::default()
349        }]);
350        return Ok(SofBundle::R4B(bundle));
351    }
352
353    // R5
354    #[cfg(feature = "R5")]
355    if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
356        let mut bundle = helios_fhir::r5::Bundle::default();
357        bundle.r#type = Element {
358            id: None,
359            extension: None,
360            value: Some("collection".to_string()),
361        };
362        bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
363            resource: Some(Box::new(res)),
364            ..Default::default()
365        }]);
366        return Ok(SofBundle::R5(bundle));
367    }
368
369    // R6
370    #[cfg(feature = "R6")]
371    if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
372        let mut bundle = helios_fhir::r6::Bundle::default();
373        bundle.r#type = Element {
374            id: None,
375            extension: None,
376            value: Some("collection".to_string()),
377        };
378        bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
379            resource: Some(Box::new(res)),
380            ..Default::default()
381        }]);
382        return Ok(SofBundle::R6(bundle));
383    }
384
385    Err(SofError::InvalidSourceContent(format!(
386        "Resource from '{}' could not be parsed as any supported FHIR version",
387        source_name
388    )))
389}
390
391/// Wrap an array of resources in a Bundle
392fn wrap_resources_in_bundle(
393    resources: serde_json::Value,
394    source_name: &str,
395) -> Result<SofBundle, SofError> {
396    let arr = resources
397        .as_array()
398        .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
399
400    if arr.is_empty() {
401        return Err(SofError::InvalidSourceContent(format!(
402            "Empty array of resources from '{}'",
403            source_name
404        )));
405    }
406
407    // Try to parse the first resource to determine version
408    let first = &arr[0];
409
410    // Try R4
411    #[cfg(feature = "R4")]
412    if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
413        let mut bundle = helios_fhir::r4::Bundle::default();
414        bundle.r#type = Element {
415            id: None,
416            extension: None,
417            value: Some("collection".to_string()),
418        };
419        let mut entries = Vec::new();
420
421        for resource in arr {
422            let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
423                .map_err(|e| {
424                    SofError::InvalidSourceContent(format!(
425                        "Failed to parse R4 resource from '{}': {}",
426                        source_name, e
427                    ))
428                })?;
429            entries.push(helios_fhir::r4::BundleEntry {
430                resource: Some(res),
431                ..Default::default()
432            });
433        }
434
435        bundle.entry = Some(entries);
436        return Ok(SofBundle::R4(bundle));
437    }
438
439    // Try R4B
440    #[cfg(feature = "R4B")]
441    if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
442        let mut bundle = helios_fhir::r4b::Bundle::default();
443        bundle.r#type = Element {
444            id: None,
445            extension: None,
446            value: Some("collection".to_string()),
447        };
448        let mut entries = Vec::new();
449
450        for resource in arr {
451            let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
452                .map_err(|e| {
453                    SofError::InvalidSourceContent(format!(
454                        "Failed to parse R4B resource from '{}': {}",
455                        source_name, e
456                    ))
457                })?;
458            entries.push(helios_fhir::r4b::BundleEntry {
459                resource: Some(res),
460                ..Default::default()
461            });
462        }
463
464        bundle.entry = Some(entries);
465        return Ok(SofBundle::R4B(bundle));
466    }
467
468    // Try R5
469    #[cfg(feature = "R5")]
470    if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
471        let mut bundle = helios_fhir::r5::Bundle::default();
472        bundle.r#type = Element {
473            id: None,
474            extension: None,
475            value: Some("collection".to_string()),
476        };
477        let mut entries = Vec::new();
478
479        for resource in arr {
480            let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
481                .map_err(|e| {
482                    SofError::InvalidSourceContent(format!(
483                        "Failed to parse R5 resource from '{}': {}",
484                        source_name, e
485                    ))
486                })?;
487            entries.push(helios_fhir::r5::BundleEntry {
488                resource: Some(Box::new(res)),
489                ..Default::default()
490            });
491        }
492
493        bundle.entry = Some(entries);
494        return Ok(SofBundle::R5(bundle));
495    }
496
497    // Try R6
498    #[cfg(feature = "R6")]
499    if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
500        let mut bundle = helios_fhir::r6::Bundle::default();
501        bundle.r#type = Element {
502            id: None,
503            extension: None,
504            value: Some("collection".to_string()),
505        };
506        let mut entries = Vec::new();
507
508        for resource in arr {
509            let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
510                .map_err(|e| {
511                    SofError::InvalidSourceContent(format!(
512                        "Failed to parse R6 resource from '{}': {}",
513                        source_name, e
514                    ))
515                })?;
516            entries.push(helios_fhir::r6::BundleEntry {
517                resource: Some(Box::new(res)),
518                ..Default::default()
519            });
520        }
521
522        bundle.entry = Some(entries);
523        return Ok(SofBundle::R6(bundle));
524    }
525
526    Err(SofError::InvalidSourceContent(format!(
527        "Resources from '{}' could not be parsed as any supported FHIR version",
528        source_name
529    )))
530}
531
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the entry count of an R4 bundle; panics if any other version came back.
    #[cfg(feature = "R4")]
    fn r4_entry_count(parsed: SofBundle) -> usize {
        match parsed {
            SofBundle::R4(bundle) => bundle.entry.as_ref().unwrap().len(),
            #[cfg(feature = "R4B")]
            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R5")]
            SofBundle::R5(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R6")]
            SofBundle::R6(_) => panic!("Expected R4 bundle"),
        }
    }

    /// Asserts that loading failed and, when it failed with `InvalidSource`,
    /// that the message mentions `needle`.
    fn assert_invalid_source_mentions(result: Result<SofBundle, SofError>, needle: &str) {
        match result {
            Err(SofError::InvalidSource(msg)) => assert!(msg.contains(needle)),
            other => assert!(other.is_err()),
        }
    }

    #[tokio::test]
    async fn test_parse_fhir_bundle() {
        let input = r#"{
            "resourceType": "Bundle",
            "type": "collection",
            "entry": [{
                "resource": {
                    "resourceType": "Patient",
                    "id": "123"
                }
            }]
        }"#;

        let parsed = parse_fhir_content(input, "test").unwrap();
        #[cfg(feature = "R4")]
        assert!(matches!(parsed, SofBundle::R4(_)));
        // Without R4 only successful parsing is checked.
        #[cfg(not(feature = "R4"))]
        let _ = parsed;
    }

    #[tokio::test]
    async fn test_parse_single_resource() {
        let input = r#"{
            "resourceType": "Patient",
            "id": "123"
        }"#;

        let parsed = parse_fhir_content(input, "test").unwrap();
        #[cfg(feature = "R4")]
        assert_eq!(r4_entry_count(parsed), 1);
        #[cfg(not(feature = "R4"))]
        let _ = parsed;
    }

    #[tokio::test]
    async fn test_parse_resource_array() {
        let input = r#"[
            {
                "resourceType": "Patient",
                "id": "123"
            },
            {
                "resourceType": "Patient",
                "id": "456"
            }
        ]"#;

        let parsed = parse_fhir_content(input, "test").unwrap();
        #[cfg(feature = "R4")]
        assert_eq!(r4_entry_count(parsed), 2);
        #[cfg(not(feature = "R4"))]
        let _ = parsed;
    }

    #[tokio::test]
    async fn test_invalid_content() {
        let outcome = parse_fhir_content(r#"{"not": "fhir"}"#, "test");
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_s3_url_parsing() {
        let data_source = UniversalDataSource::new();

        // No bucket in the host position.
        assert_invalid_source_mentions(
            data_source.load("s3:///path/to/file.json").await,
            "missing bucket name",
        );

        // Bucket present, object key missing.
        assert_invalid_source_mentions(
            data_source.load("s3://bucket/").await,
            "missing object path",
        );

        // Real fetching would need credentials and an actual bucket; only URL
        // validation and error mapping are exercised here.
    }

    #[tokio::test]
    async fn test_gcs_url_parsing() {
        let data_source = UniversalDataSource::new();

        assert_invalid_source_mentions(
            data_source.load("gs:///path/to/file.json").await,
            "missing bucket name",
        );
        assert_invalid_source_mentions(
            data_source.load("gs://bucket/").await,
            "missing object path",
        );
    }

    #[tokio::test]
    async fn test_azure_url_parsing() {
        let data_source = UniversalDataSource::new();

        assert_invalid_source_mentions(
            data_source.load("azure:///path/to/file.json").await,
            "missing container name",
        );
        assert_invalid_source_mentions(
            data_source.load("azure://container/").await,
            "missing blob path",
        );
    }

    #[tokio::test]
    async fn test_unsupported_protocol() {
        let data_source = UniversalDataSource::new();

        match data_source.load("ftp://server/file.json").await {
            Err(SofError::UnsupportedSourceProtocol(msg)) => {
                assert!(msg.contains("Unsupported source protocol: ftp"));
                assert!(msg.contains("Supported:"));
            }
            other => assert!(other.is_err()),
        }
    }
}