1use std::collections::HashMap;
37use std::sync::Arc;
38
39use async_trait::async_trait;
40use helios_fhir::FhirVersion;
41use parking_lot::RwLock;
42use serde_json::Value;
43use tracing::{debug, instrument, warn};
44
45use crate::core::history::HistoryParams;
46use crate::core::{
47 BundleEntry, BundleProvider, BundleResult, CapabilityProvider, ChainedSearchProvider,
48 ConditionalCreateResult, ConditionalDeleteResult, ConditionalPatchResult, ConditionalStorage,
49 ConditionalUpdateResult, IncludeProvider, InstanceHistoryProvider, PatchFormat,
50 ResourceStorage, RevincludeProvider, SearchProvider, SearchResult, StorageCapabilities,
51 TerminologySearchProvider, TextSearchProvider, VersionedStorage,
52};
53use crate::error::{BackendError, StorageError, StorageResult, TransactionError};
54use crate::tenant::TenantContext;
55use crate::types::{
56 IncludeDirective, Pagination, ReverseChainedParameter, SearchQuery, StoredResource,
57};
58
59use super::config::CompositeConfig;
60use super::merger::{MergeOptions, ResultMerger};
61use super::router::{QueryRouter, RoutingDecision, RoutingError};
62use super::sync::{SyncEvent, SyncManager};
63
64pub type DynStorage = Arc<dyn ResourceStorage + Send + Sync>;
66
67pub type DynSearchProvider = Arc<dyn SearchProvider + Send + Sync>;
69
70pub type DynConditionalStorage = Arc<dyn ConditionalStorage + Send + Sync>;
72
73pub type DynVersionedStorage = Arc<dyn VersionedStorage + Send + Sync>;
75
76pub type DynInstanceHistoryProvider = Arc<dyn InstanceHistoryProvider + Send + Sync>;
78
79pub type DynBundleProvider = Arc<dyn BundleProvider + Send + Sync>;
81
82pub struct CompositeStorage {
93 config: CompositeConfig,
95
96 primary: DynStorage,
98
99 secondaries: HashMap<String, DynStorage>,
101
102 search_providers: HashMap<String, DynSearchProvider>,
104
105 router: QueryRouter,
107
108 merger: ResultMerger,
110
111 sync_manager: Option<SyncManager>,
113
114 health_status: Arc<RwLock<HashMap<String, BackendHealth>>>,
116
117 conditional_storage: Option<DynConditionalStorage>,
121
122 versioned_storage: Option<DynVersionedStorage>,
124
125 history_provider: Option<DynInstanceHistoryProvider>,
127
128 bundle_provider: Option<DynBundleProvider>,
130}
131
132#[derive(Debug, Clone)]
134pub struct BackendHealth {
135 pub healthy: bool,
137
138 pub last_success: Option<std::time::Instant>,
140
141 pub failure_count: u32,
143
144 pub last_error: Option<String>,
146}
147
148impl Default for BackendHealth {
149 fn default() -> Self {
150 Self {
151 healthy: true,
152 last_success: None,
153 failure_count: 0,
154 last_error: None,
155 }
156 }
157}
158
159impl CompositeStorage {
160 pub fn new(
171 config: CompositeConfig,
172 backends: HashMap<String, DynStorage>,
173 ) -> StorageResult<Self> {
174 let primary_id = config.primary_id().ok_or_else(|| {
175 StorageError::Backend(BackendError::Unavailable {
176 backend_name: "primary".to_string(),
177 message: "No primary backend configured".to_string(),
178 })
179 })?;
180
181 let primary = backends.get(primary_id).cloned().ok_or_else(|| {
182 StorageError::Backend(BackendError::Unavailable {
183 backend_name: primary_id.to_string(),
184 message: format!("Primary backend '{}' not found in backends map", primary_id),
185 })
186 })?;
187
188 let secondaries: HashMap<_, _> = backends
190 .iter()
191 .filter(|(id, _)| *id != primary_id)
192 .map(|(id, backend)| (id.clone(), backend.clone()))
193 .collect();
194
195 let mut health_status = HashMap::new();
197 health_status.insert(primary_id.to_string(), BackendHealth::default());
198 for id in secondaries.keys() {
199 health_status.insert(id.clone(), BackendHealth::default());
200 }
201
202 let router = QueryRouter::new(config.clone());
203 let merger = ResultMerger::new();
204
205 let sync_manager = if !secondaries.is_empty() {
207 Some(SyncManager::new(config.sync_config.clone()))
208 } else {
209 None
210 };
211
212 Ok(Self {
213 config,
214 primary,
215 secondaries,
216 search_providers: HashMap::new(),
217 router,
218 merger,
219 sync_manager,
220 health_status: Arc::new(RwLock::new(health_status)),
221 conditional_storage: None,
222 versioned_storage: None,
223 history_provider: None,
224 bundle_provider: None,
225 })
226 }
227
228 pub fn with_search_providers(mut self, providers: HashMap<String, DynSearchProvider>) -> Self {
232 self.search_providers = providers;
233 self
234 }
235
236 pub fn with_full_primary<T>(mut self, primary: Arc<T>) -> Self
251 where
252 T: ResourceStorage
253 + ConditionalStorage
254 + VersionedStorage
255 + InstanceHistoryProvider
256 + BundleProvider
257 + Send
258 + Sync
259 + 'static,
260 {
261 self.conditional_storage = Some(primary.clone() as DynConditionalStorage);
262 self.versioned_storage = Some(primary.clone() as DynVersionedStorage);
263 self.history_provider = Some(primary.clone() as DynInstanceHistoryProvider);
264 self.bundle_provider = Some(primary as DynBundleProvider);
265 self
266 }
267
268 pub fn config(&self) -> &CompositeConfig {
270 &self.config
271 }
272
273 pub fn primary(&self) -> &DynStorage {
275 &self.primary
276 }
277
278 pub fn secondary(&self, id: &str) -> Option<&DynStorage> {
280 self.secondaries.get(id)
281 }
282
283 pub fn secondaries(&self) -> &HashMap<String, DynStorage> {
285 &self.secondaries
286 }
287
288 pub fn backend_health(&self, id: &str) -> Option<BackendHealth> {
290 self.health_status.read().get(id).cloned()
291 }
292
293 pub fn is_backend_healthy(&self, id: &str) -> bool {
295 self.health_status
296 .read()
297 .get(id)
298 .map(|h| h.healthy)
299 .unwrap_or(false)
300 }
301
302 fn update_health(&self, backend_id: &str, success: bool, error: Option<String>) {
304 let mut status = self.health_status.write();
305 if let Some(health) = status.get_mut(backend_id) {
306 if success {
307 health.healthy = true;
308 health.last_success = Some(std::time::Instant::now());
309 health.failure_count = 0;
310 health.last_error = None;
311 } else {
312 health.failure_count += 1;
313 health.last_error = error;
314
315 if health.failure_count >= self.config.health_config.failure_threshold {
317 health.healthy = false;
318 warn!(
319 backend_id = backend_id,
320 failures = health.failure_count,
321 "Backend marked unhealthy"
322 );
323 }
324 }
325 }
326 }
327
328 async fn sync_to_secondaries(&self, event: SyncEvent) -> StorageResult<()> {
330 if let Some(ref sync_manager) = self.sync_manager {
331 sync_manager.sync(&event, &self.secondaries).await?;
332 }
333 Ok(())
334 }
335
336 #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
338 async fn execute_routed_search(
339 &self,
340 tenant: &TenantContext,
341 query: &SearchQuery,
342 ) -> StorageResult<SearchResult> {
343 let decision = self
345 .router
346 .route(query)
347 .map_err(|e| self.routing_error_to_storage_error(e))?;
348
349 debug!(
350 primary = %decision.primary_target,
351 auxiliary_count = decision.auxiliary_targets.len(),
352 merge_strategy = ?decision.merge_strategy,
353 "Routing query"
354 );
355
356 if decision.auxiliary_targets.is_empty() {
358 return self.execute_primary_search(tenant, query).await;
359 }
360
361 let (primary_result, auxiliary_results) = self
363 .execute_parallel_search(tenant, query, &decision)
364 .await?;
365
366 let merge_options = MergeOptions {
368 strategy: decision.merge_strategy,
369 preserve_primary_order: true,
370 deduplicate: true,
371 };
372
373 self.merger
374 .merge(primary_result, auxiliary_results, merge_options)
375 }
376
377 async fn execute_primary_search(
384 &self,
385 tenant: &TenantContext,
386 query: &SearchQuery,
387 ) -> StorageResult<SearchResult> {
388 if let Some(search_backend) = self
391 .config
392 .backends_with_role(super::config::BackendRole::Search)
393 .next()
394 {
395 if let Some(provider) = self.search_providers.get(&search_backend.id) {
396 let result = provider.search(tenant, query).await;
397 self.update_health(
398 &search_backend.id,
399 result.is_ok(),
400 result.as_ref().err().map(|e| e.to_string()),
401 );
402 return result;
403 }
404 }
405
406 let primary_id = self.config.primary_id().unwrap_or("primary");
408
409 if let Some(provider) = self.search_providers.get(primary_id) {
410 let result = provider.search(tenant, query).await;
411 self.update_health(
412 primary_id,
413 result.is_ok(),
414 result.as_ref().err().map(|e| e.to_string()),
415 );
416 result
417 } else {
418 Err(StorageError::Backend(BackendError::UnsupportedCapability {
419 backend_name: primary_id.to_string(),
420 capability: "SearchProvider".to_string(),
421 }))
422 }
423 }
424
425 async fn execute_parallel_search(
427 &self,
428 tenant: &TenantContext,
429 query: &SearchQuery,
430 decision: &RoutingDecision,
431 ) -> StorageResult<(SearchResult, Vec<(String, SearchResult)>)> {
432 use tokio::task::JoinSet;
433
434 let mut tasks: JoinSet<(String, StorageResult<SearchResult>)> = JoinSet::new();
435
436 let tenant = tenant.clone();
438 let query = query.clone();
439 let primary_id = decision.primary_target.clone();
440
441 if let Some(provider) = self.search_providers.get(&primary_id).cloned() {
443 let t = tenant.clone();
444 let q = query.clone();
445 let id = primary_id.clone();
446 tasks.spawn(async move {
447 let result = provider.search(&t, &q).await;
448 (id, result)
449 });
450 }
451
452 for (feature, backend_id) in &decision.auxiliary_targets {
454 if let Some(provider) = self.search_providers.get(backend_id).cloned() {
455 let part_params = decision
457 .analysis
458 .feature_params
459 .get(feature)
460 .cloned()
461 .unwrap_or_default();
462
463 let mut aux_query = SearchQuery::new(&query.resource_type);
464 for param in part_params {
465 aux_query = aux_query.with_parameter(param);
466 }
467 aux_query.count = query.count;
468 aux_query.offset = query.offset;
469 aux_query.cursor = query.cursor.clone();
470
471 let t = tenant.clone();
472 let id = backend_id.clone();
473 tasks.spawn(async move {
474 let result = provider.search(&t, &aux_query).await;
475 (id, result)
476 });
477 }
478 }
479
480 let mut primary_result = None;
482 let mut auxiliary_results = Vec::new();
483
484 while let Some(result) = tasks.join_next().await {
485 match result {
486 Ok((id, search_result)) => {
487 self.update_health(
488 &id,
489 search_result.is_ok(),
490 search_result.as_ref().err().map(|e| e.to_string()),
491 );
492
493 if id == primary_id {
494 primary_result = Some(search_result?);
495 } else if let Ok(res) = search_result {
496 auxiliary_results.push((id, res));
497 }
498 }
500 Err(e) => {
501 warn!(error = %e, "Task join error during parallel search");
502 }
503 }
504 }
505
506 let primary = primary_result.ok_or_else(|| {
507 StorageError::Backend(BackendError::ConnectionFailed {
508 backend_name: primary_id,
509 message: "Primary search task failed".to_string(),
510 })
511 })?;
512
513 Ok((primary, auxiliary_results))
514 }
515
516 async fn sync_bundle_results(&self, tenant: &TenantContext, result: &BundleResult) {
518 for entry_result in &result.entries {
519 if let Some(ref resource_json) = entry_result.resource {
521 let resource_type = resource_json
522 .get("resourceType")
523 .and_then(|v| v.as_str())
524 .unwrap_or_default();
525 let resource_id = resource_json
526 .get("id")
527 .and_then(|v| v.as_str())
528 .unwrap_or_default();
529
530 if resource_type.is_empty() || resource_id.is_empty() {
531 continue;
532 }
533
534 let fhir_version = resource_json
535 .get("meta")
536 .and_then(|m| m.get("profile"))
537 .map(|_| FhirVersion::default())
538 .unwrap_or_default();
539
540 if let Err(e) = self
541 .sync_to_secondaries(SyncEvent::Create {
542 resource_type: resource_type.to_string(),
543 resource_id: resource_id.to_string(),
544 content: resource_json.clone(),
545 tenant_id: tenant.tenant_id().clone(),
546 fhir_version,
547 })
548 .await
549 {
550 warn!(
551 error = %e,
552 resource_type = resource_type,
553 resource_id = resource_id,
554 "Failed to sync bundle entry to secondaries"
555 );
556 }
557 }
558 }
559 }
560
561 fn routing_error_to_storage_error(&self, err: RoutingError) -> StorageError {
563 match err {
564 RoutingError::NoPrimaryBackend => StorageError::Backend(BackendError::Unavailable {
565 backend_name: "primary".to_string(),
566 message: "No primary backend configured".to_string(),
567 }),
568 RoutingError::NoCapableBackend { feature } => {
569 StorageError::Backend(BackendError::UnsupportedCapability {
570 backend_name: "composite".to_string(),
571 capability: format!("{:?}", feature),
572 })
573 }
574 RoutingError::BackendUnavailable { backend_id } => {
575 StorageError::Backend(BackendError::ConnectionFailed {
576 backend_name: backend_id,
577 message: "Backend unavailable".to_string(),
578 })
579 }
580 }
581 }
582}
583
584#[async_trait]
585impl ResourceStorage for CompositeStorage {
586 fn backend_name(&self) -> &'static str {
587 "composite"
588 }
589
590 #[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type))]
591 async fn create(
592 &self,
593 tenant: &TenantContext,
594 resource_type: &str,
595 resource: Value,
596 fhir_version: FhirVersion,
597 ) -> StorageResult<StoredResource> {
598 let result = self
600 .primary
601 .create(tenant, resource_type, resource.clone(), fhir_version)
602 .await;
603
604 let primary_id = self.config.primary_id().unwrap_or("primary");
605 self.update_health(
606 primary_id,
607 result.is_ok(),
608 result.as_ref().err().map(|e| e.to_string()),
609 );
610
611 let stored = result?;
612
613 if let Err(e) = self
615 .sync_to_secondaries(SyncEvent::Create {
616 resource_type: resource_type.to_string(),
617 resource_id: stored.id().to_string(),
618 content: stored.content().clone(),
619 tenant_id: tenant.tenant_id().clone(),
620 fhir_version,
621 })
622 .await
623 {
624 warn!(error = %e, "Failed to sync create to secondaries");
625 }
627
628 Ok(stored)
629 }
630
631 #[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type, id = %id))]
632 async fn create_or_update(
633 &self,
634 tenant: &TenantContext,
635 resource_type: &str,
636 id: &str,
637 resource: Value,
638 fhir_version: FhirVersion,
639 ) -> StorageResult<(StoredResource, bool)> {
640 let result = self
641 .primary
642 .create_or_update(tenant, resource_type, id, resource.clone(), fhir_version)
643 .await;
644
645 let primary_id = self.config.primary_id().unwrap_or("primary");
646 self.update_health(
647 primary_id,
648 result.is_ok(),
649 result.as_ref().err().map(|e| e.to_string()),
650 );
651
652 let (stored, created) = result?;
653
654 let event = if created {
656 SyncEvent::Create {
657 resource_type: resource_type.to_string(),
658 resource_id: id.to_string(),
659 content: stored.content().clone(),
660 tenant_id: tenant.tenant_id().clone(),
661 fhir_version,
662 }
663 } else {
664 SyncEvent::Update {
665 resource_type: resource_type.to_string(),
666 resource_id: id.to_string(),
667 content: stored.content().clone(),
668 tenant_id: tenant.tenant_id().clone(),
669 version: stored.version_id().to_string(),
670 fhir_version,
671 }
672 };
673
674 if let Err(e) = self.sync_to_secondaries(event).await {
675 warn!(error = %e, "Failed to sync create_or_update to secondaries");
676 }
677
678 Ok((stored, created))
679 }
680
681 #[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
682 async fn read(
683 &self,
684 tenant: &TenantContext,
685 resource_type: &str,
686 id: &str,
687 ) -> StorageResult<Option<StoredResource>> {
688 let result = self.primary.read(tenant, resource_type, id).await;
690
691 let primary_id = self.config.primary_id().unwrap_or("primary");
692 self.update_health(
693 primary_id,
694 result.is_ok(),
695 result.as_ref().err().map(|e| e.to_string()),
696 );
697
698 result
699 }
700
701 #[instrument(skip(self, tenant, resource), fields(resource_type = %current.resource_type(), id = %current.id()))]
702 async fn update(
703 &self,
704 tenant: &TenantContext,
705 current: &StoredResource,
706 resource: Value,
707 ) -> StorageResult<StoredResource> {
708 let result = self.primary.update(tenant, current, resource.clone()).await;
709
710 let primary_id = self.config.primary_id().unwrap_or("primary");
711 self.update_health(
712 primary_id,
713 result.is_ok(),
714 result.as_ref().err().map(|e| e.to_string()),
715 );
716
717 let stored = result?;
718
719 if let Err(e) = self
721 .sync_to_secondaries(SyncEvent::Update {
722 resource_type: current.resource_type().to_string(),
723 resource_id: current.id().to_string(),
724 content: stored.content().clone(),
725 tenant_id: tenant.tenant_id().clone(),
726 version: stored.version_id().to_string(),
727 fhir_version: stored.fhir_version(),
728 })
729 .await
730 {
731 warn!(error = %e, "Failed to sync update to secondaries");
732 }
733
734 Ok(stored)
735 }
736
737 #[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
738 async fn delete(
739 &self,
740 tenant: &TenantContext,
741 resource_type: &str,
742 id: &str,
743 ) -> StorageResult<()> {
744 let result = self.primary.delete(tenant, resource_type, id).await;
745
746 let primary_id = self.config.primary_id().unwrap_or("primary");
747 self.update_health(
748 primary_id,
749 result.is_ok(),
750 result.as_ref().err().map(|e| e.to_string()),
751 );
752
753 result?;
754
755 if let Err(e) = self
757 .sync_to_secondaries(SyncEvent::Delete {
758 resource_type: resource_type.to_string(),
759 resource_id: id.to_string(),
760 tenant_id: tenant.tenant_id().clone(),
761 })
762 .await
763 {
764 warn!(error = %e, "Failed to sync delete to secondaries");
765 }
766
767 Ok(())
768 }
769
770 async fn count(
771 &self,
772 tenant: &TenantContext,
773 resource_type: Option<&str>,
774 ) -> StorageResult<u64> {
775 self.primary.count(tenant, resource_type).await
776 }
777}
778
779#[async_trait]
780impl SearchProvider for CompositeStorage {
781 #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
782 async fn search(
783 &self,
784 tenant: &TenantContext,
785 query: &SearchQuery,
786 ) -> StorageResult<SearchResult> {
787 self.execute_routed_search(tenant, query).await
788 }
789
790 async fn search_count(
791 &self,
792 tenant: &TenantContext,
793 query: &SearchQuery,
794 ) -> StorageResult<u64> {
795 if let Some(provider) = self
798 .search_providers
799 .get(self.config.primary_id().unwrap_or("primary"))
800 {
801 provider.search_count(tenant, query).await
802 } else {
803 Err(StorageError::Backend(BackendError::UnsupportedCapability {
804 backend_name: "composite".to_string(),
805 capability: "search_count".to_string(),
806 }))
807 }
808 }
809}
810
811#[async_trait]
812impl ConditionalStorage for CompositeStorage {
813 async fn conditional_create(
814 &self,
815 tenant: &TenantContext,
816 resource_type: &str,
817 resource: Value,
818 search_params: &str,
819 fhir_version: FhirVersion,
820 ) -> StorageResult<ConditionalCreateResult> {
821 let storage = self.conditional_storage.as_ref().ok_or_else(|| {
822 StorageError::Backend(BackendError::UnsupportedCapability {
823 backend_name: "composite".to_string(),
824 capability: "ConditionalStorage".to_string(),
825 })
826 })?;
827
828 let result = storage
829 .conditional_create(tenant, resource_type, resource, search_params, fhir_version)
830 .await?;
831
832 if let ConditionalCreateResult::Created(ref stored) = result {
834 if let Err(e) = self
835 .sync_to_secondaries(SyncEvent::Create {
836 resource_type: resource_type.to_string(),
837 resource_id: stored.id().to_string(),
838 content: stored.content().clone(),
839 tenant_id: tenant.tenant_id().clone(),
840 fhir_version,
841 })
842 .await
843 {
844 warn!(error = %e, "Failed to sync conditional_create to secondaries");
845 }
846 }
847
848 Ok(result)
849 }
850
851 async fn conditional_update(
852 &self,
853 tenant: &TenantContext,
854 resource_type: &str,
855 resource: Value,
856 search_params: &str,
857 upsert: bool,
858 fhir_version: FhirVersion,
859 ) -> StorageResult<ConditionalUpdateResult> {
860 let storage = self.conditional_storage.as_ref().ok_or_else(|| {
861 StorageError::Backend(BackendError::UnsupportedCapability {
862 backend_name: "composite".to_string(),
863 capability: "ConditionalStorage".to_string(),
864 })
865 })?;
866
867 let result = storage
868 .conditional_update(
869 tenant,
870 resource_type,
871 resource,
872 search_params,
873 upsert,
874 fhir_version,
875 )
876 .await?;
877
878 match &result {
880 ConditionalUpdateResult::Created(stored) => {
881 if let Err(e) = self
882 .sync_to_secondaries(SyncEvent::Create {
883 resource_type: resource_type.to_string(),
884 resource_id: stored.id().to_string(),
885 content: stored.content().clone(),
886 tenant_id: tenant.tenant_id().clone(),
887 fhir_version,
888 })
889 .await
890 {
891 warn!(error = %e, "Failed to sync conditional_update create to secondaries");
892 }
893 }
894 ConditionalUpdateResult::Updated(stored) => {
895 if let Err(e) = self
896 .sync_to_secondaries(SyncEvent::Update {
897 resource_type: resource_type.to_string(),
898 resource_id: stored.id().to_string(),
899 content: stored.content().clone(),
900 tenant_id: tenant.tenant_id().clone(),
901 version: stored.version_id().to_string(),
902 fhir_version: stored.fhir_version(),
903 })
904 .await
905 {
906 warn!(error = %e, "Failed to sync conditional_update to secondaries");
907 }
908 }
909 _ => {}
910 }
911
912 Ok(result)
913 }
914
915 async fn conditional_delete(
916 &self,
917 tenant: &TenantContext,
918 resource_type: &str,
919 search_params: &str,
920 ) -> StorageResult<ConditionalDeleteResult> {
921 let storage = self.conditional_storage.as_ref().ok_or_else(|| {
922 StorageError::Backend(BackendError::UnsupportedCapability {
923 backend_name: "composite".to_string(),
924 capability: "ConditionalStorage".to_string(),
925 })
926 })?;
927
928 let result = storage
929 .conditional_delete(tenant, resource_type, search_params)
930 .await?;
931
932 Ok(result)
936 }
937
938 async fn conditional_patch(
939 &self,
940 tenant: &TenantContext,
941 resource_type: &str,
942 search_params: &str,
943 patch: &PatchFormat,
944 ) -> StorageResult<ConditionalPatchResult> {
945 let storage = self.conditional_storage.as_ref().ok_or_else(|| {
946 StorageError::Backend(BackendError::UnsupportedCapability {
947 backend_name: "composite".to_string(),
948 capability: "ConditionalStorage".to_string(),
949 })
950 })?;
951
952 let result = storage
953 .conditional_patch(tenant, resource_type, search_params, patch)
954 .await?;
955
956 if let ConditionalPatchResult::Patched(ref stored) = result {
958 if let Err(e) = self
959 .sync_to_secondaries(SyncEvent::Update {
960 resource_type: resource_type.to_string(),
961 resource_id: stored.id().to_string(),
962 content: stored.content().clone(),
963 tenant_id: tenant.tenant_id().clone(),
964 version: stored.version_id().to_string(),
965 fhir_version: stored.fhir_version(),
966 })
967 .await
968 {
969 warn!(error = %e, "Failed to sync conditional_patch to secondaries");
970 }
971 }
972
973 Ok(result)
974 }
975}
976
977#[async_trait]
978impl VersionedStorage for CompositeStorage {
979 async fn vread(
980 &self,
981 tenant: &TenantContext,
982 resource_type: &str,
983 id: &str,
984 version_id: &str,
985 ) -> StorageResult<Option<StoredResource>> {
986 let storage = self.versioned_storage.as_ref().ok_or_else(|| {
987 StorageError::Backend(BackendError::UnsupportedCapability {
988 backend_name: "composite".to_string(),
989 capability: "VersionedStorage".to_string(),
990 })
991 })?;
992
993 storage.vread(tenant, resource_type, id, version_id).await
994 }
995
996 async fn update_with_match(
997 &self,
998 tenant: &TenantContext,
999 resource_type: &str,
1000 id: &str,
1001 expected_version: &str,
1002 resource: Value,
1003 ) -> StorageResult<StoredResource> {
1004 let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1005 StorageError::Backend(BackendError::UnsupportedCapability {
1006 backend_name: "composite".to_string(),
1007 capability: "VersionedStorage".to_string(),
1008 })
1009 })?;
1010
1011 let stored = storage
1012 .update_with_match(tenant, resource_type, id, expected_version, resource)
1013 .await?;
1014
1015 if let Err(e) = self
1017 .sync_to_secondaries(SyncEvent::Update {
1018 resource_type: resource_type.to_string(),
1019 resource_id: id.to_string(),
1020 content: stored.content().clone(),
1021 tenant_id: tenant.tenant_id().clone(),
1022 version: stored.version_id().to_string(),
1023 fhir_version: stored.fhir_version(),
1024 })
1025 .await
1026 {
1027 warn!(error = %e, "Failed to sync update_with_match to secondaries");
1028 }
1029
1030 Ok(stored)
1031 }
1032
1033 async fn delete_with_match(
1034 &self,
1035 tenant: &TenantContext,
1036 resource_type: &str,
1037 id: &str,
1038 expected_version: &str,
1039 ) -> StorageResult<()> {
1040 let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1041 StorageError::Backend(BackendError::UnsupportedCapability {
1042 backend_name: "composite".to_string(),
1043 capability: "VersionedStorage".to_string(),
1044 })
1045 })?;
1046
1047 storage
1048 .delete_with_match(tenant, resource_type, id, expected_version)
1049 .await?;
1050
1051 if let Err(e) = self
1053 .sync_to_secondaries(SyncEvent::Delete {
1054 resource_type: resource_type.to_string(),
1055 resource_id: id.to_string(),
1056 tenant_id: tenant.tenant_id().clone(),
1057 })
1058 .await
1059 {
1060 warn!(error = %e, "Failed to sync delete_with_match to secondaries");
1061 }
1062
1063 Ok(())
1064 }
1065
1066 async fn list_versions(
1067 &self,
1068 tenant: &TenantContext,
1069 resource_type: &str,
1070 id: &str,
1071 ) -> StorageResult<Vec<String>> {
1072 let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1073 StorageError::Backend(BackendError::UnsupportedCapability {
1074 backend_name: "composite".to_string(),
1075 capability: "VersionedStorage".to_string(),
1076 })
1077 })?;
1078
1079 storage.list_versions(tenant, resource_type, id).await
1080 }
1081}
1082
1083#[async_trait]
1084impl InstanceHistoryProvider for CompositeStorage {
1085 async fn history_instance(
1086 &self,
1087 tenant: &TenantContext,
1088 resource_type: &str,
1089 id: &str,
1090 params: &HistoryParams,
1091 ) -> StorageResult<crate::core::HistoryPage> {
1092 let provider = self.history_provider.as_ref().ok_or_else(|| {
1093 StorageError::Backend(BackendError::UnsupportedCapability {
1094 backend_name: "composite".to_string(),
1095 capability: "InstanceHistoryProvider".to_string(),
1096 })
1097 })?;
1098
1099 provider
1100 .history_instance(tenant, resource_type, id, params)
1101 .await
1102 }
1103
1104 async fn history_instance_count(
1105 &self,
1106 tenant: &TenantContext,
1107 resource_type: &str,
1108 id: &str,
1109 ) -> StorageResult<u64> {
1110 let provider = self.history_provider.as_ref().ok_or_else(|| {
1111 StorageError::Backend(BackendError::UnsupportedCapability {
1112 backend_name: "composite".to_string(),
1113 capability: "InstanceHistoryProvider".to_string(),
1114 })
1115 })?;
1116
1117 provider
1118 .history_instance_count(tenant, resource_type, id)
1119 .await
1120 }
1121}
1122
1123#[async_trait]
1124impl BundleProvider for CompositeStorage {
1125 async fn process_transaction(
1126 &self,
1127 tenant: &TenantContext,
1128 entries: Vec<BundleEntry>,
1129 ) -> Result<BundleResult, TransactionError> {
1130 let provider =
1131 self.bundle_provider
1132 .as_ref()
1133 .ok_or_else(|| TransactionError::BundleError {
1134 index: 0,
1135 message: "BundleProvider not available on composite primary".to_string(),
1136 })?;
1137
1138 let result = provider.process_transaction(tenant, entries).await?;
1139
1140 self.sync_bundle_results(tenant, &result).await;
1142
1143 Ok(result)
1144 }
1145
1146 async fn process_batch(
1147 &self,
1148 tenant: &TenantContext,
1149 entries: Vec<BundleEntry>,
1150 ) -> StorageResult<BundleResult> {
1151 let provider = self.bundle_provider.as_ref().ok_or_else(|| {
1152 StorageError::Backend(BackendError::UnsupportedCapability {
1153 backend_name: "composite".to_string(),
1154 capability: "BundleProvider".to_string(),
1155 })
1156 })?;
1157
1158 let result = provider.process_batch(tenant, entries).await?;
1159
1160 self.sync_bundle_results(tenant, &result).await;
1162
1163 Ok(result)
1164 }
1165}
1166
1167#[async_trait]
1168impl IncludeProvider for CompositeStorage {
1169 async fn resolve_includes(
1170 &self,
1171 tenant: &TenantContext,
1172 resources: &[StoredResource],
1173 includes: &[IncludeDirective],
1174 ) -> StorageResult<Vec<StoredResource>> {
1175 let primary_id = self.config.primary_id().unwrap_or("primary");
1177
1178 if let Some(_provider) = self.search_providers.get(primary_id) {
1179 self.resolve_includes_basic(tenant, resources, includes)
1183 .await
1184 } else {
1185 self.resolve_includes_basic(tenant, resources, includes)
1186 .await
1187 }
1188 }
1189}
1190
1191impl CompositeStorage {
1192 async fn resolve_includes_basic(
1194 &self,
1195 tenant: &TenantContext,
1196 resources: &[StoredResource],
1197 includes: &[IncludeDirective],
1198 ) -> StorageResult<Vec<StoredResource>> {
1199 use std::collections::HashSet;
1200
1201 let mut included = Vec::new();
1202 let mut seen_ids = HashSet::new();
1203
1204 for resource in resources {
1205 for include in includes {
1206 let refs = self.extract_references(resource, &include.search_param);
1208
1209 for reference in refs {
1210 if let Some((ref_type, ref_id)) = reference.split_once('/') {
1212 if let Some(ref target) = include.target_type {
1214 if target != ref_type {
1215 continue;
1216 }
1217 }
1218
1219 let key = format!("{}/{}", ref_type, ref_id);
1220 if seen_ids.insert(key) {
1221 if let Ok(Some(included_resource)) =
1222 self.primary.read(tenant, ref_type, ref_id).await
1223 {
1224 included.push(included_resource);
1225 }
1226 }
1227 }
1228 }
1229 }
1230 }
1231
1232 Ok(included)
1233 }
1234
1235 fn extract_references(&self, resource: &StoredResource, search_param: &str) -> Vec<String> {
1237 let content = resource.content();
1238 let mut refs = Vec::new();
1239
1240 if let Some(value) = content.get(search_param) {
1243 Self::extract_reference_values(value, &mut refs);
1244 }
1245
1246 let field_name = match search_param {
1248 "patient" | "subject" => Some("subject"),
1249 "encounter" => Some("encounter"),
1250 "performer" => Some("performer"),
1251 _ => None,
1252 };
1253
1254 if let Some(field) = field_name {
1255 if let Some(value) = content.get(field) {
1256 Self::extract_reference_values(value, &mut refs);
1257 }
1258 }
1259
1260 refs
1261 }
1262
1263 fn extract_reference_values(value: &Value, refs: &mut Vec<String>) {
1265 match value {
1266 Value::Object(obj) => {
1267 if let Some(Value::String(reference)) = obj.get("reference") {
1268 refs.push(reference.clone());
1269 }
1270 }
1271 Value::Array(arr) => {
1272 for item in arr {
1273 Self::extract_reference_values(item, refs);
1274 }
1275 }
1276 _ => {}
1277 }
1278 }
1279}
1280
1281#[async_trait]
1282impl RevincludeProvider for CompositeStorage {
1283 async fn resolve_revincludes(
1284 &self,
1285 tenant: &TenantContext,
1286 resources: &[StoredResource],
1287 revincludes: &[IncludeDirective],
1288 ) -> StorageResult<Vec<StoredResource>> {
1289 let mut revincluded = Vec::new();
1292
1293 for revinclude in revincludes {
1294 for resource in resources {
1295 let reference = format!("{}/{}", resource.resource_type(), resource.id());
1296
1297 let query = SearchQuery::new(&revinclude.source_type).with_parameter(
1299 crate::types::SearchParameter {
1300 name: revinclude.search_param.clone(),
1301 param_type: crate::types::SearchParamType::Reference,
1302 modifier: None,
1303 values: vec![crate::types::SearchValue::eq(&reference)],
1304 chain: vec![],
1305 components: vec![],
1306 },
1307 );
1308
1309 if let Ok(result) = self.search(tenant, &query).await {
1310 for item in result.resources.items {
1311 revincluded.push(item);
1312 }
1313 }
1314 }
1315 }
1316
1317 let mut seen = std::collections::HashSet::new();
1319 revincluded.retain(|r| seen.insert(format!("{}/{}", r.resource_type(), r.id())));
1320
1321 Ok(revincluded)
1322 }
1323}
1324
1325#[async_trait]
1326impl ChainedSearchProvider for CompositeStorage {
1327 async fn resolve_chain(
1328 &self,
1329 tenant: &TenantContext,
1330 base_type: &str,
1331 chain: &str,
1332 value: &str,
1333 ) -> StorageResult<Vec<String>> {
1334 let graph_backend = self
1336 .config
1337 .backends_with_role(super::config::BackendRole::Graph)
1338 .next();
1339
1340 if let Some(backend) = graph_backend {
1341 if let Some(_provider) = self.search_providers.get(&backend.id) {
1342 }
1345 }
1346
1347 self.resolve_chain_iterative(tenant, base_type, chain, value)
1349 .await
1350 }
1351
1352 async fn resolve_reverse_chain(
1353 &self,
1354 tenant: &TenantContext,
1355 base_type: &str,
1356 reverse_chain: &ReverseChainedParameter,
1357 ) -> StorageResult<Vec<String>> {
1358 let values = match &reverse_chain.value {
1361 Some(v) => vec![v.clone()],
1362 None => vec![],
1363 };
1364 let query = SearchQuery::new(&reverse_chain.source_type).with_parameter(
1365 crate::types::SearchParameter {
1366 name: reverse_chain.search_param.clone(),
1367 param_type: crate::types::SearchParamType::Token,
1368 modifier: None,
1369 values,
1370 chain: vec![],
1371 components: vec![],
1372 },
1373 );
1374
1375 let result = self.search(tenant, &query).await?;
1376
1377 let mut ids = Vec::new();
1379 for resource in result.resources.items {
1380 let refs = self.extract_references(&resource, &reverse_chain.reference_param);
1381 for reference in refs {
1382 if let Some((ref_type, ref_id)) = reference.split_once('/') {
1383 if ref_type == base_type {
1384 ids.push(ref_id.to_string());
1385 }
1386 }
1387 }
1388 }
1389
1390 Ok(ids)
1391 }
1392}
1393
1394impl CompositeStorage {
1395 async fn resolve_chain_iterative(
1397 &self,
1398 _tenant: &TenantContext,
1399 _base_type: &str,
1400 chain: &str,
1401 _value: &str,
1402 ) -> StorageResult<Vec<String>> {
1403 let parts: Vec<&str> = chain.split('.').collect();
1405
1406 if parts.is_empty() {
1407 return Ok(Vec::new());
1408 }
1409
1410 Ok(Vec::new())
1416 }
1417}
1418
1419#[async_trait]
1420impl TerminologySearchProvider for CompositeStorage {
1421 async fn expand_value_set(&self, _value_set_url: &str) -> StorageResult<Vec<(String, String)>> {
1422 let term_backend = self
1424 .config
1425 .backends_with_role(super::config::BackendRole::Terminology)
1426 .next();
1427
1428 if let Some(_backend) = term_backend {
1429 }
1431
1432 Err(StorageError::Backend(BackendError::UnsupportedCapability {
1434 backend_name: "composite".to_string(),
1435 capability: "expand_value_set".to_string(),
1436 }))
1437 }
1438
1439 async fn codes_above(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
1440 Err(StorageError::Backend(BackendError::UnsupportedCapability {
1441 backend_name: "composite".to_string(),
1442 capability: "codes_above".to_string(),
1443 }))
1444 }
1445
1446 async fn codes_below(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
1447 Err(StorageError::Backend(BackendError::UnsupportedCapability {
1448 backend_name: "composite".to_string(),
1449 capability: "codes_below".to_string(),
1450 }))
1451 }
1452}
1453
1454#[async_trait]
1455impl TextSearchProvider for CompositeStorage {
1456 async fn search_text(
1457 &self,
1458 tenant: &TenantContext,
1459 resource_type: &str,
1460 text: &str,
1461 pagination: &Pagination,
1462 ) -> StorageResult<SearchResult> {
1463 let search_backend = self
1465 .config
1466 .backends_with_role(super::config::BackendRole::Search)
1467 .next();
1468
1469 if let Some(backend) = search_backend {
1470 if let Some(provider) = self.search_providers.get(&backend.id) {
1471 let query = SearchQuery::new(resource_type)
1473 .with_parameter(crate::types::SearchParameter {
1474 name: "_text".to_string(),
1475 param_type: crate::types::SearchParamType::String,
1476 modifier: None,
1477 values: vec![crate::types::SearchValue::string(text)],
1478 chain: vec![],
1479 components: vec![],
1480 })
1481 .with_count(pagination.count);
1482
1483 return provider.search(tenant, &query).await;
1484 }
1485 }
1486
1487 self.execute_primary_search(
1489 tenant,
1490 &SearchQuery::new(resource_type)
1491 .with_parameter(crate::types::SearchParameter {
1492 name: "_text".to_string(),
1493 param_type: crate::types::SearchParamType::String,
1494 modifier: None,
1495 values: vec![crate::types::SearchValue::string(text)],
1496 chain: vec![],
1497 components: vec![],
1498 })
1499 .with_count(pagination.count),
1500 )
1501 .await
1502 }
1503
1504 async fn search_content(
1505 &self,
1506 tenant: &TenantContext,
1507 resource_type: &str,
1508 content: &str,
1509 pagination: &Pagination,
1510 ) -> StorageResult<SearchResult> {
1511 let search_backend = self
1513 .config
1514 .backends_with_role(super::config::BackendRole::Search)
1515 .next();
1516
1517 if let Some(backend) = search_backend {
1518 if let Some(provider) = self.search_providers.get(&backend.id) {
1519 let query = SearchQuery::new(resource_type)
1520 .with_parameter(crate::types::SearchParameter {
1521 name: "_content".to_string(),
1522 param_type: crate::types::SearchParamType::String,
1523 modifier: None,
1524 values: vec![crate::types::SearchValue::string(content)],
1525 chain: vec![],
1526 components: vec![],
1527 })
1528 .with_count(pagination.count);
1529
1530 return provider.search(tenant, &query).await;
1531 }
1532 }
1533
1534 self.execute_primary_search(
1535 tenant,
1536 &SearchQuery::new(resource_type)
1537 .with_parameter(crate::types::SearchParameter {
1538 name: "_content".to_string(),
1539 param_type: crate::types::SearchParamType::String,
1540 modifier: None,
1541 values: vec![crate::types::SearchValue::string(content)],
1542 chain: vec![],
1543 components: vec![],
1544 })
1545 .with_count(pagination.count),
1546 )
1547 .await
1548 }
1549}
1550
1551impl CapabilityProvider for CompositeStorage {
1552 fn capabilities(&self) -> StorageCapabilities {
1553 use std::collections::HashSet;
1554
1555 let resource_caps = HashMap::new();
1557
1558 let mut system_interactions = HashSet::new();
1559 system_interactions.insert(crate::core::SystemInteraction::Transaction);
1560 system_interactions.insert(crate::core::SystemInteraction::Batch);
1561 system_interactions.insert(crate::core::SystemInteraction::SearchSystem);
1562 system_interactions.insert(crate::core::SystemInteraction::HistorySystem);
1563
1564 StorageCapabilities {
1565 backend_name: "composite".to_string(),
1566 backend_version: None,
1567 resources: resource_caps,
1568 system_interactions,
1569 supports_system_history: true,
1570 supports_system_search: true,
1571 supported_sorts: vec!["_lastUpdated".to_string(), "_id".to_string()],
1572 supports_total: true,
1573 max_page_size: Some(1000),
1574 default_page_size: 20,
1575 }
1576 }
1577
1578 }
1580
1581#[cfg(test)]
1582mod tests {
1583 use super::*;
1584 use crate::core::BackendKind;
1585
1586 fn test_config() -> CompositeConfig {
1587 CompositeConfig::builder()
1588 .primary("sqlite", BackendKind::Sqlite)
1589 .search_backend("es", BackendKind::Elasticsearch)
1590 .build()
1591 .unwrap()
1592 }
1593
1594 #[test]
1595 fn test_backend_health_default() {
1596 let health = BackendHealth::default();
1597 assert!(health.healthy);
1598 assert_eq!(health.failure_count, 0);
1599 assert!(health.last_error.is_none());
1600 }
1601
1602 #[test]
1603 fn test_composite_config() {
1604 let config = test_config();
1605 assert_eq!(config.primary_id(), Some("sqlite"));
1606 assert_eq!(config.secondaries().count(), 1);
1607 }
1608}