1use crate::{SofBundle, SofError};
2use async_trait::async_trait;
3use helios_fhir::Element;
4use object_store::{
5 ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
6 gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
7};
8use reqwest;
9use serde_json;
10use std::sync::Arc;
11use tokio::fs;
12use url::Url;
13
14#[async_trait]
16pub trait DataSource: Send + Sync {
17 async fn load(&self, source: &str) -> Result<SofBundle, SofError>;
19}
20
21pub struct UniversalDataSource {
23 client: reqwest::Client,
24}
25
26impl UniversalDataSource {
27 pub fn new() -> Self {
28 Self {
29 client: reqwest::Client::builder()
30 .timeout(std::time::Duration::from_secs(30))
31 .build()
32 .unwrap_or_else(|_| reqwest::Client::new()),
33 }
34 }
35}
36
37impl Default for UniversalDataSource {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43#[async_trait]
44impl DataSource for UniversalDataSource {
45 async fn load(&self, source: &str) -> Result<SofBundle, SofError> {
46 let url = Url::parse(source).map_err(|e| {
48 SofError::InvalidSource(format!("Invalid source URL '{}': {}", source, e))
49 })?;
50
51 match url.scheme() {
52 "file" => load_from_file(&url).await,
53 "http" | "https" => load_from_http(&self.client, &url).await,
54 "s3" => load_from_s3(&url).await,
55 "gs" => load_from_gcs(&url).await,
56 "azure" | "abfss" | "abfs" => load_from_azure(&url).await,
57 scheme => Err(SofError::UnsupportedSourceProtocol(format!(
58 "Unsupported source protocol: {}. Supported: file://, http(s)://, s3://, gs://, azure://",
59 scheme
60 ))),
61 }
62 }
63}
64
65async fn load_from_file(url: &Url) -> Result<SofBundle, SofError> {
67 let path = url
69 .to_file_path()
70 .map_err(|_| SofError::InvalidSource(format!("Invalid file URL: {}", url)))?;
71
72 if !path.exists() {
74 return Err(SofError::SourceNotFound(format!(
75 "File not found: {}",
76 path.display()
77 )));
78 }
79
80 let contents = fs::read_to_string(&path)
82 .await
83 .map_err(|e| SofError::SourceReadError(format!("Failed to read file: {}", e)))?;
84
85 parse_fhir_content(&contents, &path.to_string_lossy())
87}
88
89async fn load_from_http(client: &reqwest::Client, url: &Url) -> Result<SofBundle, SofError> {
91 let response = client
93 .get(url.as_str())
94 .header("Accept", "application/fhir+json, application/json")
95 .send()
96 .await
97 .map_err(|e| {
98 SofError::SourceFetchError(format!("Failed to fetch from URL '{}': {}", url, e))
99 })?;
100
101 if !response.status().is_success() {
103 return Err(SofError::SourceFetchError(format!(
104 "HTTP error {} when fetching '{}'",
105 response.status(),
106 url
107 )));
108 }
109
110 let contents = response
112 .text()
113 .await
114 .map_err(|e| SofError::SourceReadError(format!("Failed to read response body: {}", e)))?;
115
116 parse_fhir_content(&contents, url.as_str())
118}
119
120async fn load_from_s3(url: &Url) -> Result<SofBundle, SofError> {
122 let bucket = url.host_str().ok_or_else(|| {
124 SofError::InvalidSource(format!("Invalid S3 URL '{}': missing bucket name", url))
125 })?;
126
127 let path = url.path().trim_start_matches('/');
128 if path.is_empty() {
129 return Err(SofError::InvalidSource(format!(
130 "Invalid S3 URL '{}': missing object path",
131 url
132 )));
133 }
134
135 let store = AmazonS3Builder::new()
137 .with_bucket_name(bucket)
138 .build()
139 .map_err(|e| {
140 SofError::SourceFetchError(format!(
141 "Failed to create S3 client for '{}': {}. Ensure AWS credentials are configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)",
142 url, e
143 ))
144 })?;
145
146 load_from_object_store(Arc::new(store), path, url.as_str()).await
147}
148
149async fn load_from_gcs(url: &Url) -> Result<SofBundle, SofError> {
151 let bucket = url.host_str().ok_or_else(|| {
153 SofError::InvalidSource(format!("Invalid GCS URL '{}': missing bucket name", url))
154 })?;
155
156 let path = url.path().trim_start_matches('/');
157 if path.is_empty() {
158 return Err(SofError::InvalidSource(format!(
159 "Invalid GCS URL '{}': missing object path",
160 url
161 )));
162 }
163
164 let store = GoogleCloudStorageBuilder::new()
166 .with_bucket_name(bucket)
167 .build()
168 .map_err(|e| {
169 SofError::SourceFetchError(format!(
170 "Failed to create GCS client for '{}': {}. Ensure GCP credentials are configured (GOOGLE_SERVICE_ACCOUNT or Application Default Credentials)",
171 url, e
172 ))
173 })?;
174
175 load_from_object_store(Arc::new(store), path, url.as_str()).await
176}
177
178async fn load_from_azure(url: &Url) -> Result<SofBundle, SofError> {
180 let (container, path) = if url.scheme() == "azure" {
182 let container = url.host_str().ok_or_else(|| {
184 SofError::InvalidSource(format!(
185 "Invalid Azure URL '{}': missing container name",
186 url
187 ))
188 })?;
189 let path = url.path().trim_start_matches('/');
190 (container.to_string(), path.to_string())
191 } else {
192 let host = url.host_str().ok_or_else(|| {
194 SofError::InvalidSource(format!("Invalid Azure URL '{}': missing host", url))
195 })?;
196 let parts: Vec<&str> = host.split('@').collect();
197 if parts.len() != 2 {
198 return Err(SofError::InvalidSource(format!(
199 "Invalid Azure URL '{}': expected format abfss://container@account.dfs.core.windows.net/path",
200 url
201 )));
202 }
203 let container = parts[0];
204 let path = url.path().trim_start_matches('/');
205 (container.to_string(), path.to_string())
206 };
207
208 if path.is_empty() {
209 return Err(SofError::InvalidSource(format!(
210 "Invalid Azure URL '{}': missing blob path",
211 url
212 )));
213 }
214
215 let store = MicrosoftAzureBuilder::new()
217 .with_container_name(&container)
218 .build()
219 .map_err(|e| {
220 SofError::SourceFetchError(format!(
221 "Failed to create Azure client for '{}': {}. Ensure Azure credentials are configured (AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or managed identity)",
222 url, e
223 ))
224 })?;
225
226 load_from_object_store(Arc::new(store), &path, url.as_str()).await
227}
228
229async fn load_from_object_store(
231 store: Arc<dyn ObjectStore>,
232 path: &str,
233 source_name: &str,
234) -> Result<SofBundle, SofError> {
235 let object_path = ObjectPath::from(path);
237
238 let result = store.get(&object_path).await.map_err(|e| match e {
240 object_store::Error::NotFound { .. } => {
241 SofError::SourceNotFound(format!("Object not found at '{}'", source_name))
242 }
243 _ => SofError::SourceFetchError(format!("Failed to fetch from '{}': {}", source_name, e)),
244 })?;
245
246 let bytes = result
248 .bytes()
249 .await
250 .map_err(|e| SofError::SourceReadError(format!("Failed to read object data: {}", e)))?;
251
252 let contents = String::from_utf8(bytes.to_vec()).map_err(|e| {
254 SofError::InvalidSourceContent(format!(
255 "Content from '{}' is not valid UTF-8: {}",
256 source_name, e
257 ))
258 })?;
259
260 parse_fhir_content(&contents, source_name)
262}
263
264fn parse_fhir_content(contents: &str, source_name: &str) -> Result<SofBundle, SofError> {
266 let value: serde_json::Value = serde_json::from_str(contents).map_err(|e| {
268 SofError::InvalidSourceContent(format!(
269 "Failed to parse JSON from '{}': {}",
270 source_name, e
271 ))
272 })?;
273
274 if let Some(resource_type) = value.get("resourceType").and_then(|v| v.as_str()) {
276 if resource_type == "Bundle" {
277 #[cfg(feature = "R4")]
279 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4::Bundle>(value.clone()) {
280 return Ok(SofBundle::R4(bundle));
281 }
282 #[cfg(feature = "R4B")]
283 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r4b::Bundle>(value.clone()) {
284 return Ok(SofBundle::R4B(bundle));
285 }
286 #[cfg(feature = "R5")]
287 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r5::Bundle>(value.clone()) {
288 return Ok(SofBundle::R5(bundle));
289 }
290 #[cfg(feature = "R6")]
291 if let Ok(bundle) = serde_json::from_value::<helios_fhir::r6::Bundle>(value.clone()) {
292 return Ok(SofBundle::R6(bundle));
293 }
294 return Err(SofError::InvalidSourceContent(format!(
295 "Bundle from '{}' could not be parsed as any supported FHIR version",
296 source_name
297 )));
298 }
299
300 return wrap_resource_in_bundle(value, source_name);
302 }
303
304 if value.is_array() {
306 return wrap_resources_in_bundle(value, source_name);
307 }
308
309 Err(SofError::InvalidSourceContent(format!(
310 "Content from '{}' is not a valid FHIR resource or Bundle",
311 source_name
312 )))
313}
314
315fn wrap_resource_in_bundle(
317 resource: serde_json::Value,
318 source_name: &str,
319) -> Result<SofBundle, SofError> {
320 #[cfg(feature = "R4")]
323 if let Ok(res) = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone()) {
324 let mut bundle = helios_fhir::r4::Bundle::default();
325 bundle.r#type = Element {
326 id: None,
327 extension: None,
328 value: Some("collection".to_string()),
329 };
330 bundle.entry = Some(vec![helios_fhir::r4::BundleEntry {
331 resource: Some(res),
332 ..Default::default()
333 }]);
334 return Ok(SofBundle::R4(bundle));
335 }
336
337 #[cfg(feature = "R4B")]
339 if let Ok(res) = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone()) {
340 let mut bundle = helios_fhir::r4b::Bundle::default();
341 bundle.r#type = Element {
342 id: None,
343 extension: None,
344 value: Some("collection".to_string()),
345 };
346 bundle.entry = Some(vec![helios_fhir::r4b::BundleEntry {
347 resource: Some(res),
348 ..Default::default()
349 }]);
350 return Ok(SofBundle::R4B(bundle));
351 }
352
353 #[cfg(feature = "R5")]
355 if let Ok(res) = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone()) {
356 let mut bundle = helios_fhir::r5::Bundle::default();
357 bundle.r#type = Element {
358 id: None,
359 extension: None,
360 value: Some("collection".to_string()),
361 };
362 bundle.entry = Some(vec![helios_fhir::r5::BundleEntry {
363 resource: Some(Box::new(res)),
364 ..Default::default()
365 }]);
366 return Ok(SofBundle::R5(bundle));
367 }
368
369 #[cfg(feature = "R6")]
371 if let Ok(res) = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone()) {
372 let mut bundle = helios_fhir::r6::Bundle::default();
373 bundle.r#type = Element {
374 id: None,
375 extension: None,
376 value: Some("collection".to_string()),
377 };
378 bundle.entry = Some(vec![helios_fhir::r6::BundleEntry {
379 resource: Some(Box::new(res)),
380 ..Default::default()
381 }]);
382 return Ok(SofBundle::R6(bundle));
383 }
384
385 Err(SofError::InvalidSourceContent(format!(
386 "Resource from '{}' could not be parsed as any supported FHIR version",
387 source_name
388 )))
389}
390
391fn wrap_resources_in_bundle(
393 resources: serde_json::Value,
394 source_name: &str,
395) -> Result<SofBundle, SofError> {
396 let arr = resources
397 .as_array()
398 .ok_or_else(|| SofError::InvalidSourceContent("Expected array of resources".to_string()))?;
399
400 if arr.is_empty() {
401 return Err(SofError::InvalidSourceContent(format!(
402 "Empty array of resources from '{}'",
403 source_name
404 )));
405 }
406
407 let first = &arr[0];
409
410 #[cfg(feature = "R4")]
412 if serde_json::from_value::<helios_fhir::r4::Resource>(first.clone()).is_ok() {
413 let mut bundle = helios_fhir::r4::Bundle::default();
414 bundle.r#type = Element {
415 id: None,
416 extension: None,
417 value: Some("collection".to_string()),
418 };
419 let mut entries = Vec::new();
420
421 for resource in arr {
422 let res = serde_json::from_value::<helios_fhir::r4::Resource>(resource.clone())
423 .map_err(|e| {
424 SofError::InvalidSourceContent(format!(
425 "Failed to parse R4 resource from '{}': {}",
426 source_name, e
427 ))
428 })?;
429 entries.push(helios_fhir::r4::BundleEntry {
430 resource: Some(res),
431 ..Default::default()
432 });
433 }
434
435 bundle.entry = Some(entries);
436 return Ok(SofBundle::R4(bundle));
437 }
438
439 #[cfg(feature = "R4B")]
441 if serde_json::from_value::<helios_fhir::r4b::Resource>(first.clone()).is_ok() {
442 let mut bundle = helios_fhir::r4b::Bundle::default();
443 bundle.r#type = Element {
444 id: None,
445 extension: None,
446 value: Some("collection".to_string()),
447 };
448 let mut entries = Vec::new();
449
450 for resource in arr {
451 let res = serde_json::from_value::<helios_fhir::r4b::Resource>(resource.clone())
452 .map_err(|e| {
453 SofError::InvalidSourceContent(format!(
454 "Failed to parse R4B resource from '{}': {}",
455 source_name, e
456 ))
457 })?;
458 entries.push(helios_fhir::r4b::BundleEntry {
459 resource: Some(res),
460 ..Default::default()
461 });
462 }
463
464 bundle.entry = Some(entries);
465 return Ok(SofBundle::R4B(bundle));
466 }
467
468 #[cfg(feature = "R5")]
470 if serde_json::from_value::<helios_fhir::r5::Resource>(first.clone()).is_ok() {
471 let mut bundle = helios_fhir::r5::Bundle::default();
472 bundle.r#type = Element {
473 id: None,
474 extension: None,
475 value: Some("collection".to_string()),
476 };
477 let mut entries = Vec::new();
478
479 for resource in arr {
480 let res = serde_json::from_value::<helios_fhir::r5::Resource>(resource.clone())
481 .map_err(|e| {
482 SofError::InvalidSourceContent(format!(
483 "Failed to parse R5 resource from '{}': {}",
484 source_name, e
485 ))
486 })?;
487 entries.push(helios_fhir::r5::BundleEntry {
488 resource: Some(Box::new(res)),
489 ..Default::default()
490 });
491 }
492
493 bundle.entry = Some(entries);
494 return Ok(SofBundle::R5(bundle));
495 }
496
497 #[cfg(feature = "R6")]
499 if serde_json::from_value::<helios_fhir::r6::Resource>(first.clone()).is_ok() {
500 let mut bundle = helios_fhir::r6::Bundle::default();
501 bundle.r#type = Element {
502 id: None,
503 extension: None,
504 value: Some("collection".to_string()),
505 };
506 let mut entries = Vec::new();
507
508 for resource in arr {
509 let res = serde_json::from_value::<helios_fhir::r6::Resource>(resource.clone())
510 .map_err(|e| {
511 SofError::InvalidSourceContent(format!(
512 "Failed to parse R6 resource from '{}': {}",
513 source_name, e
514 ))
515 })?;
516 entries.push(helios_fhir::r6::BundleEntry {
517 resource: Some(Box::new(res)),
518 ..Default::default()
519 });
520 }
521
522 bundle.entry = Some(entries);
523 return Ok(SofBundle::R6(bundle));
524 }
525
526 Err(SofError::InvalidSourceContent(format!(
527 "Resources from '{}' could not be parsed as any supported FHIR version",
528 source_name
529 )))
530}
531
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `result` is a [`SofError::InvalidSource`] whose message contains `needle`.
    ///
    /// Unlike an `if let`, this fails when `Ok` or a different error variant comes back.
    fn assert_invalid_source(result: Result<SofBundle, SofError>, needle: &str) {
        match result {
            Err(SofError::InvalidSource(msg)) => assert!(
                msg.contains(needle),
                "expected message containing {:?}, got {:?}",
                needle,
                msg
            ),
            Err(_) => panic!("expected SofError::InvalidSource containing {:?}", needle),
            Ok(_) => panic!("expected an error containing {:?}, got Ok", needle),
        }
    }

    /// Returns a per-process file path in the temp dir so concurrent test runs don't collide.
    fn temp_json_path(stem: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("{}-{}.json", stem, std::process::id()))
    }

    #[tokio::test]
    async fn test_parse_fhir_bundle() {
        let bundle_json = r#"{
            "resourceType": "Bundle",
            "type": "collection",
            "entry": [{
                "resource": {
                    "resourceType": "Patient",
                    "id": "123"
                }
            }]
        }"#;

        let result = parse_fhir_content(bundle_json, "test").unwrap();
        #[cfg(feature = "R4")]
        assert!(matches!(result, SofBundle::R4(_)));
        // Without R4 the bundle goes to whichever version is enabled; the `unwrap` above
        // is the meaningful assertion in that configuration.
        #[cfg(not(feature = "R4"))]
        let _ = result;
    }

    #[tokio::test]
    async fn test_parse_single_resource() {
        let patient_json = r#"{
            "resourceType": "Patient",
            "id": "123"
        }"#;

        let result = parse_fhir_content(patient_json, "test").unwrap();
        #[cfg(feature = "R4")]
        match result {
            SofBundle::R4(bundle) => {
                assert_eq!(bundle.entry.as_ref().unwrap().len(), 1);
            }
            #[cfg(feature = "R4B")]
            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R5")]
            SofBundle::R5(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R6")]
            SofBundle::R6(_) => panic!("Expected R4 bundle"),
        }
    }

    #[tokio::test]
    async fn test_parse_resource_array() {
        let resources_json = r#"[
            {
                "resourceType": "Patient",
                "id": "123"
            },
            {
                "resourceType": "Patient",
                "id": "456"
            }
        ]"#;

        let result = parse_fhir_content(resources_json, "test").unwrap();
        #[cfg(feature = "R4")]
        match result {
            SofBundle::R4(bundle) => {
                assert_eq!(bundle.entry.as_ref().unwrap().len(), 2);
            }
            #[cfg(feature = "R4B")]
            SofBundle::R4B(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R5")]
            SofBundle::R5(_) => panic!("Expected R4 bundle"),
            #[cfg(feature = "R6")]
            SofBundle::R6(_) => panic!("Expected R4 bundle"),
        }
    }

    #[tokio::test]
    async fn test_invalid_content() {
        let invalid_json = r#"{"not": "fhir"}"#;
        let result = parse_fhir_content(invalid_json, "test");
        assert!(matches!(result, Err(SofError::InvalidSourceContent(_))));
    }

    #[tokio::test]
    async fn test_malformed_json() {
        let result = parse_fhir_content("this is not json", "test");
        match result {
            Err(SofError::InvalidSourceContent(msg)) => {
                assert!(msg.contains("Failed to parse JSON"));
            }
            _ => panic!("expected SofError::InvalidSourceContent for malformed JSON"),
        }
    }

    #[tokio::test]
    async fn test_empty_resource_array() {
        let result = parse_fhir_content("[]", "test");
        match result {
            Err(SofError::InvalidSourceContent(msg)) => assert!(msg.contains("Empty array")),
            _ => panic!("expected SofError::InvalidSourceContent for an empty array"),
        }
    }

    #[tokio::test]
    async fn test_file_not_found() {
        let path = temp_json_path("sof-data-source-missing");
        let url = Url::from_file_path(&path).expect("temp dir path should be absolute");

        let result = UniversalDataSource::new().load(url.as_str()).await;
        assert!(matches!(result, Err(SofError::SourceNotFound(_))));
    }

    #[tokio::test]
    async fn test_load_from_file() {
        let path = temp_json_path("sof-data-source-patient");
        std::fs::write(&path, r#"{"resourceType": "Patient", "id": "123"}"#)
            .expect("failed to write temp fixture");
        let url = Url::from_file_path(&path).expect("temp dir path should be absolute");

        let result = UniversalDataSource::new().load(url.as_str()).await;
        // Best-effort cleanup before asserting so a failure doesn't leave the file behind.
        let _ = std::fs::remove_file(&path);
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_s3_url_parsing() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("s3:///path/to/file.json").await;
        assert_invalid_source(result, "missing bucket name");

        let result = data_source.load("s3://bucket/").await;
        assert_invalid_source(result, "missing object path");
    }

    #[tokio::test]
    async fn test_gcs_url_parsing() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("gs:///path/to/file.json").await;
        assert_invalid_source(result, "missing bucket name");

        let result = data_source.load("gs://bucket/").await;
        assert_invalid_source(result, "missing object path");
    }

    #[tokio::test]
    async fn test_azure_url_parsing() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("azure:///path/to/file.json").await;
        assert_invalid_source(result, "missing container name");

        let result = data_source.load("azure://container/").await;
        assert_invalid_source(result, "missing blob path");
    }

    #[tokio::test]
    async fn test_unsupported_protocol() {
        let data_source = UniversalDataSource::new();

        let result = data_source.load("ftp://server/file.json").await;
        match result {
            Err(SofError::UnsupportedSourceProtocol(msg)) => {
                assert!(msg.contains("Unsupported source protocol: ftp"));
                assert!(msg.contains("Supported:"));
            }
            _ => panic!("expected SofError::UnsupportedSourceProtocol"),
        }
    }
}