1use std::collections::HashMap;
37use std::sync::Arc;
38
39use async_trait::async_trait;
40use helios_fhir::FhirVersion;
41use parking_lot::RwLock;
42use serde_json::Value;
43use tracing::{debug, instrument, warn};
44
45use crate::core::history::HistoryParams;
46use crate::core::{
47 BundleEntry, BundleProvider, BundleResult, CapabilityProvider, ChainedSearchProvider,
48 ConditionalCreateResult, ConditionalDeleteResult, ConditionalPatchResult, ConditionalStorage,
49 ConditionalUpdateResult, ExportDataProvider, ExportRequest, GroupExportProvider,
50 IncludeProvider, InstanceHistoryProvider, NdjsonBatch, PatchFormat, PatientExportProvider,
51 ResourceStorage, RevincludeProvider, SearchProvider, SearchResult, SofRunner,
52 StorageCapabilities, SystemHistoryProvider, TerminologySearchProvider, TextSearchProvider,
53 TypeHistoryProvider, VersionedStorage,
54};
55use crate::error::{BackendError, StorageError, StorageResult, TransactionError};
56use crate::tenant::TenantContext;
57use crate::types::{
58 IncludeDirective, Pagination, ReverseChainedParameter, SearchParamType, SearchParameter,
59 SearchQuery, SearchValue, StoredResource,
60};
61
62use super::config::{CompositeConfig, SyncMode};
63use super::merger::{MergeOptions, ResultMerger};
64use super::router::{QueryRouter, RoutingDecision, RoutingError};
65use super::sync::{SyncEvent, SyncManager};
66
/// Shared, thread-safe handle to a backend implementing [`ResourceStorage`].
pub type DynStorage = Arc<dyn ResourceStorage + Send + Sync>;

/// Shared, thread-safe handle to a backend implementing [`SearchProvider`].
pub type DynSearchProvider = Arc<dyn SearchProvider + Send + Sync>;

/// Shared, thread-safe handle to a backend implementing [`ConditionalStorage`].
pub type DynConditionalStorage = Arc<dyn ConditionalStorage + Send + Sync>;

/// Shared, thread-safe handle to a backend implementing [`VersionedStorage`].
pub type DynVersionedStorage = Arc<dyn VersionedStorage + Send + Sync>;

/// Shared, thread-safe handle to a backend implementing [`InstanceHistoryProvider`].
pub type DynInstanceHistoryProvider = Arc<dyn InstanceHistoryProvider + Send + Sync>;

/// Shared, thread-safe handle to a backend implementing [`SystemHistoryProvider`].
pub type DynSystemHistoryProvider = Arc<dyn SystemHistoryProvider + Send + Sync>;

/// Shared, thread-safe handle to a backend implementing [`BundleProvider`].
pub type DynBundleProvider = Arc<dyn BundleProvider + Send + Sync>;

/// Shared, thread-safe handle to a backend implementing [`GroupExportProvider`].
pub type DynGroupExportProvider = Arc<dyn GroupExportProvider + Send + Sync>;
90
/// Storage facade that sends writes to one primary backend, mirrors them to
/// secondary backends, and routes searches across registered search providers.
pub struct CompositeStorage {
    /// Composite configuration (backend roles, sync settings, health settings).
    config: CompositeConfig,

    /// Backend that receives every CRUD operation first.
    primary: DynStorage,

    /// All configured backends other than the primary, keyed by backend id.
    secondaries: HashMap<String, DynStorage>,

    /// Search-capable providers keyed by backend id; empty until
    /// `with_search_providers` is called.
    search_providers: HashMap<String, DynSearchProvider>,

    /// Decides which backends a search query is sent to.
    router: QueryRouter,

    /// Combines primary and auxiliary search results into one result.
    merger: ResultMerger,

    /// Propagates write events to the secondaries; `None` when there are no
    /// secondaries.
    sync_manager: Option<SyncManager>,

    /// Per-backend health record, keyed by backend id.
    health_status: Arc<RwLock<HashMap<String, BackendHealth>>>,

    /// Conditional-operation delegate; set by `with_full_primary`.
    conditional_storage: Option<DynConditionalStorage>,

    /// Versioned-read/update delegate; set by `with_full_primary`.
    versioned_storage: Option<DynVersionedStorage>,

    /// Instance-history delegate; set by `with_full_primary`.
    history_provider: Option<DynInstanceHistoryProvider>,

    /// System-history delegate; set by `with_full_primary`.
    system_history_provider: Option<DynSystemHistoryProvider>,

    /// Bundle-processing delegate; set by `with_full_primary`.
    bundle_provider: Option<DynBundleProvider>,

    /// Group-export delegate; set by `with_full_primary`.
    export_provider: Option<DynGroupExportProvider>,
}
146
/// Health record kept for a single backend.
#[derive(Debug, Clone)]
pub struct BackendHealth {
    /// Whether the backend is currently considered usable.
    pub healthy: bool,

    /// Time of the most recent successful operation, if any has been recorded.
    pub last_success: Option<std::time::Instant>,

    /// Number of failures recorded since the last success.
    pub failure_count: u32,

    /// Text of the most recent error, if any has been recorded.
    pub last_error: Option<String>,
}

impl Default for BackendHealth {
    /// A backend starts out healthy, with no recorded successes or failures.
    fn default() -> Self {
        BackendHealth {
            last_error: None,
            failure_count: 0,
            last_success: None,
            healthy: true,
        }
    }
}
173
174impl CompositeStorage {
175 fn has_dedicated_search_backend(&self) -> bool {
176 self.config
177 .backends_with_role(super::config::BackendRole::Search)
178 .next()
179 .is_some()
180 }
181
182 fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
183 params
184 .split('&')
185 .filter_map(|pair| {
186 let parts: Vec<&str> = pair.splitn(2, '=').collect();
187 if parts.len() == 2 {
188 Some((parts[0].to_string(), parts[1].to_string()))
189 } else {
190 None
191 }
192 })
193 .collect()
194 }
195
196 fn infer_conditional_param_type(name: &str) -> SearchParamType {
197 match name {
198 "_id" => SearchParamType::Token,
199 "_lastUpdated" => SearchParamType::Date,
200 "_tag" | "_profile" | "_security" | "identifier" => SearchParamType::Token,
201 "patient" | "subject" | "encounter" | "performer" | "author" | "requester"
202 | "recorder" | "asserter" | "practitioner" | "organization" | "location" | "device" => {
203 SearchParamType::Reference
204 }
205 _ => SearchParamType::String,
206 }
207 }
208
209 async fn find_conditional_matches(
210 &self,
211 tenant: &TenantContext,
212 resource_type: &str,
213 search_params: &str,
214 ) -> StorageResult<Vec<StoredResource>> {
215 let parsed_params = Self::parse_simple_search_params(search_params);
216 if parsed_params.is_empty() {
217 return Ok(Vec::new());
218 }
219
220 let mut query = SearchQuery::new(resource_type);
221 query.count = Some(1000);
222 for (name, value) in parsed_params {
223 query = query.with_parameter(SearchParameter {
224 name: name.clone(),
225 param_type: Self::infer_conditional_param_type(&name),
226 modifier: None,
227 values: vec![SearchValue::parse(&value)],
228 chain: vec![],
229 components: vec![],
230 });
231 }
232
233 let result = self.search(tenant, &query).await?;
234 Ok(result.resources.items)
235 }
236
237 pub fn new(
248 config: CompositeConfig,
249 backends: HashMap<String, DynStorage>,
250 ) -> StorageResult<Self> {
251 let primary_id = config.primary_id().ok_or_else(|| {
252 StorageError::Backend(BackendError::Unavailable {
253 backend_name: "primary".to_string(),
254 message: "No primary backend configured".to_string(),
255 })
256 })?;
257
258 let primary = backends.get(primary_id).cloned().ok_or_else(|| {
259 StorageError::Backend(BackendError::Unavailable {
260 backend_name: primary_id.to_string(),
261 message: format!("Primary backend '{}' not found in backends map", primary_id),
262 })
263 })?;
264
265 let secondaries: HashMap<_, _> = backends
267 .iter()
268 .filter(|(id, _)| *id != primary_id)
269 .map(|(id, backend)| (id.clone(), backend.clone()))
270 .collect();
271
272 let mut health_status = HashMap::new();
274 health_status.insert(primary_id.to_string(), BackendHealth::default());
275 for id in secondaries.keys() {
276 health_status.insert(id.clone(), BackendHealth::default());
277 }
278
279 let router = QueryRouter::new(config.clone());
280 let merger = ResultMerger::new();
281
282 let sync_manager = if !secondaries.is_empty() {
284 Some(SyncManager::new(config.sync_config.clone()))
285 } else {
286 None
287 };
288
289 Ok(Self {
290 config,
291 primary,
292 secondaries,
293 search_providers: HashMap::new(),
294 router,
295 merger,
296 sync_manager,
297 health_status: Arc::new(RwLock::new(health_status)),
298 conditional_storage: None,
299 versioned_storage: None,
300 history_provider: None,
301 system_history_provider: None,
302 bundle_provider: None,
303 export_provider: None,
304 })
305 }
306
    /// Builder step: replaces the set of search providers (keyed by backend
    /// id) and returns the storage.
    pub fn with_search_providers(mut self, providers: HashMap<String, DynSearchProvider>) -> Self {
        self.search_providers = providers;
        self
    }
314
315 pub fn start_sync_workers(mut self) -> Self {
323 if let Some(ref mut manager) = self.sync_manager {
324 if matches!(
325 self.config.sync_config.mode,
326 SyncMode::Asynchronous | SyncMode::Hybrid { .. }
327 ) {
328 manager.start_async_worker(self.secondaries.clone());
329 }
330 }
331 self
332 }
333
334 pub fn with_full_primary<T>(mut self, primary: Arc<T>) -> Self
349 where
350 T: ResourceStorage
351 + ConditionalStorage
352 + VersionedStorage
353 + InstanceHistoryProvider
354 + TypeHistoryProvider
355 + SystemHistoryProvider
356 + BundleProvider
357 + GroupExportProvider
358 + Send
359 + Sync
360 + 'static,
361 {
362 self.conditional_storage = Some(primary.clone() as DynConditionalStorage);
363 self.versioned_storage = Some(primary.clone() as DynVersionedStorage);
364 self.history_provider = Some(primary.clone() as DynInstanceHistoryProvider);
365 self.system_history_provider = Some(primary.clone() as DynSystemHistoryProvider);
366 self.bundle_provider = Some(primary.clone() as DynBundleProvider);
367 self.export_provider = Some(primary as DynGroupExportProvider);
368 self
369 }
370
    /// Returns the composite configuration.
    pub fn config(&self) -> &CompositeConfig {
        &self.config
    }
375
    /// Returns the primary backend handle.
    pub fn primary(&self) -> &DynStorage {
        &self.primary
    }
380
    /// Returns the secondary backend registered under `id`, if any.
    pub fn secondary(&self, id: &str) -> Option<&DynStorage> {
        self.secondaries.get(id)
    }
385
    /// Returns all secondary backends, keyed by backend id.
    pub fn secondaries(&self) -> &HashMap<String, DynStorage> {
        &self.secondaries
    }
390
391 pub fn backend_health(&self, id: &str) -> Option<BackendHealth> {
393 self.health_status.read().get(id).cloned()
394 }
395
396 pub fn is_backend_healthy(&self, id: &str) -> bool {
398 self.health_status
399 .read()
400 .get(id)
401 .map(|h| h.healthy)
402 .unwrap_or(false)
403 }
404
405 fn update_health(&self, backend_id: &str, success: bool, error: Option<String>) {
407 let mut status = self.health_status.write();
408 if let Some(health) = status.get_mut(backend_id) {
409 if success {
410 health.healthy = true;
411 health.last_success = Some(std::time::Instant::now());
412 health.failure_count = 0;
413 health.last_error = None;
414 } else {
415 health.failure_count += 1;
416 health.last_error = error;
417
418 if health.failure_count >= self.config.health_config.failure_threshold {
420 health.healthy = false;
421 warn!(
422 backend_id = backend_id,
423 failures = health.failure_count,
424 "Backend marked unhealthy"
425 );
426 }
427 }
428 }
429 }
430
431 async fn sync_to_secondaries(&self, event: SyncEvent) -> StorageResult<()> {
433 if let Some(ref sync_manager) = self.sync_manager {
434 sync_manager.sync(&event, &self.secondaries).await?;
435 }
436 Ok(())
437 }
438
439 #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
441 async fn execute_routed_search(
442 &self,
443 tenant: &TenantContext,
444 query: &SearchQuery,
445 ) -> StorageResult<SearchResult> {
446 let decision = self
448 .router
449 .route(query)
450 .map_err(|e| self.routing_error_to_storage_error(e))?;
451
452 debug!(
453 primary = %decision.primary_target,
454 auxiliary_count = decision.auxiliary_targets.len(),
455 merge_strategy = ?decision.merge_strategy,
456 "Routing query"
457 );
458
459 if decision.auxiliary_targets.is_empty() {
461 return self.execute_primary_search(tenant, query).await;
462 }
463
464 let (primary_result, auxiliary_results) = self
466 .execute_parallel_search(tenant, query, &decision)
467 .await?;
468
469 let merge_options = MergeOptions {
471 strategy: decision.merge_strategy,
472 preserve_primary_order: true,
473 deduplicate: true,
474 };
475
476 self.merger
477 .merge(primary_result, auxiliary_results, merge_options)
478 }
479
480 async fn execute_primary_search(
487 &self,
488 tenant: &TenantContext,
489 query: &SearchQuery,
490 ) -> StorageResult<SearchResult> {
491 if let Some(search_backend) = self
494 .config
495 .backends_with_role(super::config::BackendRole::Search)
496 .next()
497 {
498 if let Some(provider) = self.search_providers.get(&search_backend.id) {
499 let result = provider.search(tenant, query).await;
500 self.update_health(
501 &search_backend.id,
502 result.is_ok(),
503 result.as_ref().err().map(|e| e.to_string()),
504 );
505 return result;
506 }
507 }
508
509 let primary_id = self.config.primary_id().unwrap_or("primary");
511
512 if let Some(provider) = self.search_providers.get(primary_id) {
513 let result = provider.search(tenant, query).await;
514 self.update_health(
515 primary_id,
516 result.is_ok(),
517 result.as_ref().err().map(|e| e.to_string()),
518 );
519 result
520 } else {
521 Err(StorageError::Backend(BackendError::UnsupportedCapability {
522 backend_name: primary_id.to_string(),
523 capability: "SearchProvider".to_string(),
524 }))
525 }
526 }
527
528 async fn execute_parallel_search(
530 &self,
531 tenant: &TenantContext,
532 query: &SearchQuery,
533 decision: &RoutingDecision,
534 ) -> StorageResult<(SearchResult, Vec<(String, SearchResult)>)> {
535 use tokio::task::JoinSet;
536
537 let mut tasks: JoinSet<(String, StorageResult<SearchResult>)> = JoinSet::new();
538
539 let tenant = tenant.clone();
541 let query = query.clone();
542 let primary_id = decision.primary_target.clone();
543
544 if let Some(provider) = self.search_providers.get(&primary_id).cloned() {
546 let t = tenant.clone();
547 let q = query.clone();
548 let id = primary_id.clone();
549 tasks.spawn(async move {
550 let result = provider.search(&t, &q).await;
551 (id, result)
552 });
553 }
554
555 for (feature, backend_id) in &decision.auxiliary_targets {
557 if let Some(provider) = self.search_providers.get(backend_id).cloned() {
558 let part_params = decision
560 .analysis
561 .feature_params
562 .get(feature)
563 .cloned()
564 .unwrap_or_default();
565
566 let mut aux_query = SearchQuery::new(&query.resource_type);
567 for param in part_params {
568 aux_query = aux_query.with_parameter(param);
569 }
570 aux_query.count = query.count;
571 aux_query.offset = query.offset;
572 aux_query.cursor = query.cursor.clone();
573
574 let t = tenant.clone();
575 let id = backend_id.clone();
576 tasks.spawn(async move {
577 let result = provider.search(&t, &aux_query).await;
578 (id, result)
579 });
580 }
581 }
582
583 let mut primary_result: Option<SearchResult> = None;
585 let mut primary_unsupported = false;
586 let mut auxiliary_results = Vec::new();
587
588 while let Some(result) = tasks.join_next().await {
589 match result {
590 Ok((id, search_result)) => {
591 self.update_health(
592 &id,
593 search_result.is_ok(),
594 search_result.as_ref().err().map(|e| e.to_string()),
595 );
596
597 if id == primary_id {
598 match search_result {
599 Ok(r) => primary_result = Some(r),
600 Err(StorageError::Backend(BackendError::UnsupportedCapability {
601 ..
602 })) => {
603 primary_unsupported = true;
607 }
608 Err(e) => return Err(e),
609 }
610 } else if let Ok(res) = search_result {
611 auxiliary_results.push((id, res));
612 }
613 }
615 Err(e) => {
616 warn!(error = %e, "Task join error during parallel search");
617 }
618 }
619 }
620
621 if primary_unsupported && primary_result.is_none() {
624 if !auxiliary_results.is_empty() {
625 let (_, promoted) = auxiliary_results.remove(0);
626 return Ok((promoted, auxiliary_results));
627 }
628 return Err(StorageError::Backend(BackendError::UnsupportedCapability {
629 backend_name: primary_id,
630 capability: "search".to_string(),
631 }));
632 }
633
634 let primary = primary_result.ok_or_else(|| {
635 StorageError::Backend(BackendError::ConnectionFailed {
636 backend_name: primary_id,
637 message: "Primary search task failed".to_string(),
638 })
639 })?;
640
641 Ok((primary, auxiliary_results))
642 }
643
644 async fn sync_bundle_results(&self, tenant: &TenantContext, result: &BundleResult) {
646 for entry_result in &result.entries {
647 if let Some(ref resource_json) = entry_result.resource {
649 let resource_type = resource_json
650 .get("resourceType")
651 .and_then(|v| v.as_str())
652 .unwrap_or_default();
653 let resource_id = resource_json
654 .get("id")
655 .and_then(|v| v.as_str())
656 .unwrap_or_default();
657
658 if resource_type.is_empty() || resource_id.is_empty() {
659 continue;
660 }
661
662 let fhir_version = resource_json
663 .get("meta")
664 .and_then(|m| m.get("profile"))
665 .map(|_| FhirVersion::default_enabled())
666 .unwrap_or_else(FhirVersion::default_enabled);
667
668 if let Err(e) = self
669 .sync_to_secondaries(SyncEvent::Create {
670 resource_type: resource_type.to_string(),
671 resource_id: resource_id.to_string(),
672 content: resource_json.clone(),
673 tenant_id: tenant.tenant_id().clone(),
674 fhir_version,
675 })
676 .await
677 {
678 warn!(
679 error = %e,
680 resource_type = resource_type,
681 resource_id = resource_id,
682 "Failed to sync bundle entry to secondaries"
683 );
684 }
685 }
686 }
687 }
688
689 fn routing_error_to_storage_error(&self, err: RoutingError) -> StorageError {
691 match err {
692 RoutingError::NoPrimaryBackend => StorageError::Backend(BackendError::Unavailable {
693 backend_name: "primary".to_string(),
694 message: "No primary backend configured".to_string(),
695 }),
696 RoutingError::NoCapableBackend { feature } => {
697 StorageError::Backend(BackendError::UnsupportedCapability {
698 backend_name: "composite".to_string(),
699 capability: format!("{:?}", feature),
700 })
701 }
702 RoutingError::BackendUnavailable { backend_id } => {
703 StorageError::Backend(BackendError::ConnectionFailed {
704 backend_name: backend_id,
705 message: "Backend unavailable".to_string(),
706 })
707 }
708 }
709 }
710}
711
712#[async_trait]
713impl ResourceStorage for CompositeStorage {
    /// Name under which this storage identifies itself: always `"composite"`.
    fn backend_name(&self) -> &'static str {
        "composite"
    }
717
    /// Returns whatever `SofRunner` the primary backend exposes, if any.
    fn sof_runner(&self) -> Option<Arc<dyn SofRunner>> {
        self.primary.sof_runner()
    }
725
726 #[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type))]
727 async fn create(
728 &self,
729 tenant: &TenantContext,
730 resource_type: &str,
731 resource: Value,
732 fhir_version: FhirVersion,
733 ) -> StorageResult<StoredResource> {
734 let result = self
736 .primary
737 .create(tenant, resource_type, resource.clone(), fhir_version)
738 .await;
739
740 let primary_id = self.config.primary_id().unwrap_or("primary");
741 self.update_health(
742 primary_id,
743 result.is_ok(),
744 result.as_ref().err().map(|e| e.to_string()),
745 );
746
747 let stored = result?;
748
749 if let Err(e) = self
751 .sync_to_secondaries(SyncEvent::Create {
752 resource_type: resource_type.to_string(),
753 resource_id: stored.id().to_string(),
754 content: stored.content().clone(),
755 tenant_id: tenant.tenant_id().clone(),
756 fhir_version,
757 })
758 .await
759 {
760 warn!(error = %e, "Failed to sync create to secondaries");
761 }
763
764 Ok(stored)
765 }
766
767 #[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type, id = %id))]
768 async fn create_or_update(
769 &self,
770 tenant: &TenantContext,
771 resource_type: &str,
772 id: &str,
773 resource: Value,
774 fhir_version: FhirVersion,
775 ) -> StorageResult<(StoredResource, bool)> {
776 let result = self
777 .primary
778 .create_or_update(tenant, resource_type, id, resource.clone(), fhir_version)
779 .await;
780
781 let primary_id = self.config.primary_id().unwrap_or("primary");
782 self.update_health(
783 primary_id,
784 result.is_ok(),
785 result.as_ref().err().map(|e| e.to_string()),
786 );
787
788 let (stored, created) = result?;
789
790 let event = if created {
792 SyncEvent::Create {
793 resource_type: resource_type.to_string(),
794 resource_id: id.to_string(),
795 content: stored.content().clone(),
796 tenant_id: tenant.tenant_id().clone(),
797 fhir_version,
798 }
799 } else {
800 SyncEvent::Update {
801 resource_type: resource_type.to_string(),
802 resource_id: id.to_string(),
803 content: stored.content().clone(),
804 tenant_id: tenant.tenant_id().clone(),
805 version: stored.version_id().to_string(),
806 fhir_version,
807 }
808 };
809
810 if let Err(e) = self.sync_to_secondaries(event).await {
811 warn!(error = %e, "Failed to sync create_or_update to secondaries");
812 }
813
814 Ok((stored, created))
815 }
816
817 #[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
818 async fn read(
819 &self,
820 tenant: &TenantContext,
821 resource_type: &str,
822 id: &str,
823 ) -> StorageResult<Option<StoredResource>> {
824 let result = self.primary.read(tenant, resource_type, id).await;
826
827 let primary_id = self.config.primary_id().unwrap_or("primary");
828 self.update_health(
829 primary_id,
830 result.is_ok(),
831 result.as_ref().err().map(|e| e.to_string()),
832 );
833
834 result
835 }
836
837 #[instrument(skip(self, tenant, resource), fields(resource_type = %current.resource_type(), id = %current.id()))]
838 async fn update(
839 &self,
840 tenant: &TenantContext,
841 current: &StoredResource,
842 resource: Value,
843 ) -> StorageResult<StoredResource> {
844 let result = self.primary.update(tenant, current, resource.clone()).await;
845
846 let primary_id = self.config.primary_id().unwrap_or("primary");
847 self.update_health(
848 primary_id,
849 result.is_ok(),
850 result.as_ref().err().map(|e| e.to_string()),
851 );
852
853 let stored = result?;
854
855 if let Err(e) = self
857 .sync_to_secondaries(SyncEvent::Update {
858 resource_type: current.resource_type().to_string(),
859 resource_id: current.id().to_string(),
860 content: stored.content().clone(),
861 tenant_id: tenant.tenant_id().clone(),
862 version: stored.version_id().to_string(),
863 fhir_version: stored.fhir_version(),
864 })
865 .await
866 {
867 warn!(error = %e, "Failed to sync update to secondaries");
868 }
869
870 Ok(stored)
871 }
872
873 #[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
874 async fn delete(
875 &self,
876 tenant: &TenantContext,
877 resource_type: &str,
878 id: &str,
879 ) -> StorageResult<()> {
880 let result = self.primary.delete(tenant, resource_type, id).await;
881
882 let primary_id = self.config.primary_id().unwrap_or("primary");
883 self.update_health(
884 primary_id,
885 result.is_ok(),
886 result.as_ref().err().map(|e| e.to_string()),
887 );
888
889 result?;
890
891 if let Err(e) = self
893 .sync_to_secondaries(SyncEvent::Delete {
894 resource_type: resource_type.to_string(),
895 resource_id: id.to_string(),
896 tenant_id: tenant.tenant_id().clone(),
897 })
898 .await
899 {
900 warn!(error = %e, "Failed to sync delete to secondaries");
901 }
902
903 Ok(())
904 }
905
    /// Returns the resource count reported by the primary backend.
    ///
    /// Unlike the CRUD methods, the outcome is not recorded in the health map.
    async fn count(
        &self,
        tenant: &TenantContext,
        resource_type: Option<&str>,
    ) -> StorageResult<u64> {
        self.primary.count(tenant, resource_type).await
    }
913}
914
915#[async_trait]
916impl SearchProvider for CompositeStorage {
    /// Executes a search through the composite routing pipeline
    /// (`execute_routed_search`).
    #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
    async fn search(
        &self,
        tenant: &TenantContext,
        query: &SearchQuery,
    ) -> StorageResult<SearchResult> {
        self.execute_routed_search(tenant, query).await
    }
925
926 async fn search_count(
927 &self,
928 tenant: &TenantContext,
929 query: &SearchQuery,
930 ) -> StorageResult<u64> {
931 if let Some(search_backend) = self
933 .config
934 .backends_with_role(super::config::BackendRole::Search)
935 .next()
936 {
937 if let Some(provider) = self.search_providers.get(&search_backend.id) {
938 return provider.search_count(tenant, query).await;
939 }
940 }
941
942 if let Some(provider) = self
944 .search_providers
945 .get(self.config.primary_id().unwrap_or("primary"))
946 {
947 provider.search_count(tenant, query).await
948 } else {
949 Err(StorageError::Backend(BackendError::UnsupportedCapability {
950 backend_name: "composite".to_string(),
951 capability: "search_count".to_string(),
952 }))
953 }
954 }
955
    /// Returns the search parameter registry of the backend that serves
    /// searches.
    ///
    /// Preference order: the provider of the first `Search`-role backend, then
    /// the provider registered under the primary id, then a process-wide empty
    /// registry shared by all callers.
    fn search_param_registry(
        &self,
    ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
        if let Some(search_backend) = self
            .config
            .backends_with_role(super::config::BackendRole::Search)
            .next()
        {
            if let Some(provider) = self.search_providers.get(&search_backend.id) {
                return provider.search_param_registry();
            }
        }

        if let Some(provider) = self
            .search_providers
            .get(self.config.primary_id().unwrap_or("primary"))
        {
            return provider.search_param_registry();
        }

        // No provider available: a reference must still be returned, so fall
        // back to a lazily created static empty registry.
        use std::sync::OnceLock;
        static EMPTY: OnceLock<
            std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>>,
        > = OnceLock::new();
        EMPTY.get_or_init(|| {
            std::sync::Arc::new(parking_lot::RwLock::new(
                crate::search::SearchParameterRegistry::new(),
            ))
        })
    }
990
991 fn supports_contained_search(&self) -> bool {
992 if let Some(search_backend) = self
995 .config
996 .backends_with_role(super::config::BackendRole::Search)
997 .next()
998 {
999 if let Some(provider) = self.search_providers.get(&search_backend.id) {
1000 return provider.supports_contained_search();
1001 }
1002 }
1003 self.search_providers
1004 .get(self.config.primary_id().unwrap_or("primary"))
1005 .map(|p| p.supports_contained_search())
1006 .unwrap_or(false)
1007 }
1008
1009 fn modifiers_for_param_type(
1010 &self,
1011 param_type: crate::types::SearchParamType,
1012 ) -> Vec<&'static str> {
1013 if let Some(search_backend) = self
1016 .config
1017 .backends_with_role(super::config::BackendRole::Search)
1018 .next()
1019 {
1020 if let Some(provider) = self.search_providers.get(&search_backend.id) {
1021 return provider.modifiers_for_param_type(param_type);
1022 }
1023 }
1024 self.search_providers
1025 .get(self.config.primary_id().unwrap_or("primary"))
1026 .map(|p| p.modifiers_for_param_type(param_type))
1027 .unwrap_or_default()
1028 }
1029}
1030
1031#[async_trait]
1032impl ConditionalStorage for CompositeStorage {
1033 async fn conditional_create(
1034 &self,
1035 tenant: &TenantContext,
1036 resource_type: &str,
1037 resource: Value,
1038 search_params: &str,
1039 fhir_version: FhirVersion,
1040 ) -> StorageResult<ConditionalCreateResult> {
1041 if self.has_dedicated_search_backend() {
1042 let matches = self
1043 .find_conditional_matches(tenant, resource_type, search_params)
1044 .await?;
1045
1046 return match matches.len() {
1047 0 => {
1048 let created = self
1049 .primary
1050 .create(tenant, resource_type, resource, fhir_version)
1051 .await?;
1052
1053 if let Err(e) = self
1054 .sync_to_secondaries(SyncEvent::Create {
1055 resource_type: resource_type.to_string(),
1056 resource_id: created.id().to_string(),
1057 content: created.content().clone(),
1058 tenant_id: tenant.tenant_id().clone(),
1059 fhir_version,
1060 })
1061 .await
1062 {
1063 warn!(error = %e, "Failed to sync conditional_create to secondaries");
1064 }
1065
1066 Ok(ConditionalCreateResult::Created(created))
1067 }
1068 1 => Ok(ConditionalCreateResult::Exists(
1069 matches.into_iter().next().expect("single match must exist"),
1070 )),
1071 n => Ok(ConditionalCreateResult::MultipleMatches(n)),
1072 };
1073 }
1074
1075 let storage = self.conditional_storage.as_ref().ok_or_else(|| {
1076 StorageError::Backend(BackendError::UnsupportedCapability {
1077 backend_name: "composite".to_string(),
1078 capability: "ConditionalStorage".to_string(),
1079 })
1080 })?;
1081
1082 let result = storage
1083 .conditional_create(tenant, resource_type, resource, search_params, fhir_version)
1084 .await?;
1085
1086 if let ConditionalCreateResult::Created(ref stored) = result {
1088 if let Err(e) = self
1089 .sync_to_secondaries(SyncEvent::Create {
1090 resource_type: resource_type.to_string(),
1091 resource_id: stored.id().to_string(),
1092 content: stored.content().clone(),
1093 tenant_id: tenant.tenant_id().clone(),
1094 fhir_version,
1095 })
1096 .await
1097 {
1098 warn!(error = %e, "Failed to sync conditional_create to secondaries");
1099 }
1100 }
1101
1102 Ok(result)
1103 }
1104
1105 async fn conditional_update(
1106 &self,
1107 tenant: &TenantContext,
1108 resource_type: &str,
1109 resource: Value,
1110 search_params: &str,
1111 upsert: bool,
1112 fhir_version: FhirVersion,
1113 ) -> StorageResult<ConditionalUpdateResult> {
1114 if self.has_dedicated_search_backend() {
1115 let matches = self
1116 .find_conditional_matches(tenant, resource_type, search_params)
1117 .await?;
1118
1119 return match matches.len() {
1120 0 => {
1121 if upsert {
1122 let created = self
1123 .primary
1124 .create(tenant, resource_type, resource, fhir_version)
1125 .await?;
1126
1127 if let Err(e) = self
1128 .sync_to_secondaries(SyncEvent::Create {
1129 resource_type: resource_type.to_string(),
1130 resource_id: created.id().to_string(),
1131 content: created.content().clone(),
1132 tenant_id: tenant.tenant_id().clone(),
1133 fhir_version,
1134 })
1135 .await
1136 {
1137 warn!(
1138 error = %e,
1139 "Failed to sync conditional_update create to secondaries"
1140 );
1141 }
1142
1143 Ok(ConditionalUpdateResult::Created(created))
1144 } else {
1145 Ok(ConditionalUpdateResult::NoMatch)
1146 }
1147 }
1148 1 => {
1149 let current = matches.into_iter().next().expect("single match must exist");
1150 let updated = self.primary.update(tenant, ¤t, resource).await?;
1151
1152 if let Err(e) = self
1153 .sync_to_secondaries(SyncEvent::Update {
1154 resource_type: resource_type.to_string(),
1155 resource_id: updated.id().to_string(),
1156 content: updated.content().clone(),
1157 tenant_id: tenant.tenant_id().clone(),
1158 version: updated.version_id().to_string(),
1159 fhir_version: updated.fhir_version(),
1160 })
1161 .await
1162 {
1163 warn!(error = %e, "Failed to sync conditional_update to secondaries");
1164 }
1165
1166 Ok(ConditionalUpdateResult::Updated(updated))
1167 }
1168 n => Ok(ConditionalUpdateResult::MultipleMatches(n)),
1169 };
1170 }
1171
1172 let storage = self.conditional_storage.as_ref().ok_or_else(|| {
1173 StorageError::Backend(BackendError::UnsupportedCapability {
1174 backend_name: "composite".to_string(),
1175 capability: "ConditionalStorage".to_string(),
1176 })
1177 })?;
1178
1179 let result = storage
1180 .conditional_update(
1181 tenant,
1182 resource_type,
1183 resource,
1184 search_params,
1185 upsert,
1186 fhir_version,
1187 )
1188 .await?;
1189
1190 match &result {
1192 ConditionalUpdateResult::Created(stored) => {
1193 if let Err(e) = self
1194 .sync_to_secondaries(SyncEvent::Create {
1195 resource_type: resource_type.to_string(),
1196 resource_id: stored.id().to_string(),
1197 content: stored.content().clone(),
1198 tenant_id: tenant.tenant_id().clone(),
1199 fhir_version,
1200 })
1201 .await
1202 {
1203 warn!(error = %e, "Failed to sync conditional_update create to secondaries");
1204 }
1205 }
1206 ConditionalUpdateResult::Updated(stored) => {
1207 if let Err(e) = self
1208 .sync_to_secondaries(SyncEvent::Update {
1209 resource_type: resource_type.to_string(),
1210 resource_id: stored.id().to_string(),
1211 content: stored.content().clone(),
1212 tenant_id: tenant.tenant_id().clone(),
1213 version: stored.version_id().to_string(),
1214 fhir_version: stored.fhir_version(),
1215 })
1216 .await
1217 {
1218 warn!(error = %e, "Failed to sync conditional_update to secondaries");
1219 }
1220 }
1221 _ => {}
1222 }
1223
1224 Ok(result)
1225 }
1226
1227 async fn conditional_delete(
1228 &self,
1229 tenant: &TenantContext,
1230 resource_type: &str,
1231 search_params: &str,
1232 ) -> StorageResult<ConditionalDeleteResult> {
1233 if self.has_dedicated_search_backend() {
1234 let matches = self
1235 .find_conditional_matches(tenant, resource_type, search_params)
1236 .await?;
1237
1238 return match matches.len() {
1239 0 => Ok(ConditionalDeleteResult::NoMatch),
1240 1 => {
1241 let current = matches.into_iter().next().expect("single match must exist");
1242 self.primary
1243 .delete(tenant, resource_type, current.id())
1244 .await?;
1245
1246 if let Err(e) = self
1247 .sync_to_secondaries(SyncEvent::Delete {
1248 resource_type: resource_type.to_string(),
1249 resource_id: current.id().to_string(),
1250 tenant_id: tenant.tenant_id().clone(),
1251 })
1252 .await
1253 {
1254 warn!(error = %e, "Failed to sync conditional_delete to secondaries");
1255 }
1256
1257 Ok(ConditionalDeleteResult::Deleted)
1258 }
1259 n => Ok(ConditionalDeleteResult::MultipleMatches(n)),
1260 };
1261 }
1262
1263 let storage = self.conditional_storage.as_ref().ok_or_else(|| {
1264 StorageError::Backend(BackendError::UnsupportedCapability {
1265 backend_name: "composite".to_string(),
1266 capability: "ConditionalStorage".to_string(),
1267 })
1268 })?;
1269
1270 let result = storage
1271 .conditional_delete(tenant, resource_type, search_params)
1272 .await?;
1273
1274 Ok(result)
1278 }
1279
1280 async fn conditional_patch(
1281 &self,
1282 tenant: &TenantContext,
1283 resource_type: &str,
1284 search_params: &str,
1285 patch: &PatchFormat,
1286 ) -> StorageResult<ConditionalPatchResult> {
1287 let storage = self.conditional_storage.as_ref().ok_or_else(|| {
1288 StorageError::Backend(BackendError::UnsupportedCapability {
1289 backend_name: "composite".to_string(),
1290 capability: "ConditionalStorage".to_string(),
1291 })
1292 })?;
1293
1294 let result = storage
1295 .conditional_patch(tenant, resource_type, search_params, patch)
1296 .await?;
1297
1298 if let ConditionalPatchResult::Patched(ref stored) = result {
1300 if let Err(e) = self
1301 .sync_to_secondaries(SyncEvent::Update {
1302 resource_type: resource_type.to_string(),
1303 resource_id: stored.id().to_string(),
1304 content: stored.content().clone(),
1305 tenant_id: tenant.tenant_id().clone(),
1306 version: stored.version_id().to_string(),
1307 fhir_version: stored.fhir_version(),
1308 })
1309 .await
1310 {
1311 warn!(error = %e, "Failed to sync conditional_patch to secondaries");
1312 }
1313 }
1314
1315 Ok(result)
1316 }
1317}
1318
1319#[async_trait]
1320impl VersionedStorage for CompositeStorage {
1321 async fn vread(
1322 &self,
1323 tenant: &TenantContext,
1324 resource_type: &str,
1325 id: &str,
1326 version_id: &str,
1327 ) -> StorageResult<Option<StoredResource>> {
1328 let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1329 StorageError::Backend(BackendError::UnsupportedCapability {
1330 backend_name: "composite".to_string(),
1331 capability: "VersionedStorage".to_string(),
1332 })
1333 })?;
1334
1335 storage.vread(tenant, resource_type, id, version_id).await
1336 }
1337
1338 async fn update_with_match(
1339 &self,
1340 tenant: &TenantContext,
1341 resource_type: &str,
1342 id: &str,
1343 expected_version: &str,
1344 resource: Value,
1345 ) -> StorageResult<StoredResource> {
1346 let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1347 StorageError::Backend(BackendError::UnsupportedCapability {
1348 backend_name: "composite".to_string(),
1349 capability: "VersionedStorage".to_string(),
1350 })
1351 })?;
1352
1353 let stored = storage
1354 .update_with_match(tenant, resource_type, id, expected_version, resource)
1355 .await?;
1356
1357 if let Err(e) = self
1359 .sync_to_secondaries(SyncEvent::Update {
1360 resource_type: resource_type.to_string(),
1361 resource_id: id.to_string(),
1362 content: stored.content().clone(),
1363 tenant_id: tenant.tenant_id().clone(),
1364 version: stored.version_id().to_string(),
1365 fhir_version: stored.fhir_version(),
1366 })
1367 .await
1368 {
1369 warn!(error = %e, "Failed to sync update_with_match to secondaries");
1370 }
1371
1372 Ok(stored)
1373 }
1374
1375 async fn delete_with_match(
1376 &self,
1377 tenant: &TenantContext,
1378 resource_type: &str,
1379 id: &str,
1380 expected_version: &str,
1381 ) -> StorageResult<()> {
1382 let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1383 StorageError::Backend(BackendError::UnsupportedCapability {
1384 backend_name: "composite".to_string(),
1385 capability: "VersionedStorage".to_string(),
1386 })
1387 })?;
1388
1389 storage
1390 .delete_with_match(tenant, resource_type, id, expected_version)
1391 .await?;
1392
1393 if let Err(e) = self
1395 .sync_to_secondaries(SyncEvent::Delete {
1396 resource_type: resource_type.to_string(),
1397 resource_id: id.to_string(),
1398 tenant_id: tenant.tenant_id().clone(),
1399 })
1400 .await
1401 {
1402 warn!(error = %e, "Failed to sync delete_with_match to secondaries");
1403 }
1404
1405 Ok(())
1406 }
1407
1408 async fn list_versions(
1409 &self,
1410 tenant: &TenantContext,
1411 resource_type: &str,
1412 id: &str,
1413 ) -> StorageResult<Vec<String>> {
1414 let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1415 StorageError::Backend(BackendError::UnsupportedCapability {
1416 backend_name: "composite".to_string(),
1417 capability: "VersionedStorage".to_string(),
1418 })
1419 })?;
1420
1421 storage.list_versions(tenant, resource_type, id).await
1422 }
1423}
1424
1425#[async_trait]
1426impl InstanceHistoryProvider for CompositeStorage {
1427 async fn history_instance(
1428 &self,
1429 tenant: &TenantContext,
1430 resource_type: &str,
1431 id: &str,
1432 params: &HistoryParams,
1433 ) -> StorageResult<crate::core::HistoryPage> {
1434 let provider = self.history_provider.as_ref().ok_or_else(|| {
1435 StorageError::Backend(BackendError::UnsupportedCapability {
1436 backend_name: "composite".to_string(),
1437 capability: "InstanceHistoryProvider".to_string(),
1438 })
1439 })?;
1440
1441 provider
1442 .history_instance(tenant, resource_type, id, params)
1443 .await
1444 }
1445
1446 async fn history_instance_count(
1447 &self,
1448 tenant: &TenantContext,
1449 resource_type: &str,
1450 id: &str,
1451 ) -> StorageResult<u64> {
1452 let provider = self.history_provider.as_ref().ok_or_else(|| {
1453 StorageError::Backend(BackendError::UnsupportedCapability {
1454 backend_name: "composite".to_string(),
1455 capability: "InstanceHistoryProvider".to_string(),
1456 })
1457 })?;
1458
1459 provider
1460 .history_instance_count(tenant, resource_type, id)
1461 .await
1462 }
1463}
1464
1465#[async_trait]
1466impl TypeHistoryProvider for CompositeStorage {
1467 async fn history_type(
1468 &self,
1469 tenant: &TenantContext,
1470 resource_type: &str,
1471 params: &HistoryParams,
1472 ) -> StorageResult<crate::core::HistoryPage> {
1473 let provider = self.system_history_provider.as_ref().ok_or_else(|| {
1474 StorageError::Backend(BackendError::UnsupportedCapability {
1475 backend_name: "composite".to_string(),
1476 capability: "TypeHistoryProvider".to_string(),
1477 })
1478 })?;
1479
1480 provider.history_type(tenant, resource_type, params).await
1481 }
1482
1483 async fn history_type_count(
1484 &self,
1485 tenant: &TenantContext,
1486 resource_type: &str,
1487 ) -> StorageResult<u64> {
1488 let provider = self.system_history_provider.as_ref().ok_or_else(|| {
1489 StorageError::Backend(BackendError::UnsupportedCapability {
1490 backend_name: "composite".to_string(),
1491 capability: "TypeHistoryProvider".to_string(),
1492 })
1493 })?;
1494
1495 provider.history_type_count(tenant, resource_type).await
1496 }
1497}
1498
1499#[async_trait]
1500impl SystemHistoryProvider for CompositeStorage {
1501 async fn history_system(
1502 &self,
1503 tenant: &TenantContext,
1504 params: &HistoryParams,
1505 ) -> StorageResult<crate::core::HistoryPage> {
1506 let provider = self.system_history_provider.as_ref().ok_or_else(|| {
1507 StorageError::Backend(BackendError::UnsupportedCapability {
1508 backend_name: "composite".to_string(),
1509 capability: "SystemHistoryProvider".to_string(),
1510 })
1511 })?;
1512
1513 provider.history_system(tenant, params).await
1514 }
1515
1516 async fn history_system_count(&self, tenant: &TenantContext) -> StorageResult<u64> {
1517 let provider = self.system_history_provider.as_ref().ok_or_else(|| {
1518 StorageError::Backend(BackendError::UnsupportedCapability {
1519 backend_name: "composite".to_string(),
1520 capability: "SystemHistoryProvider".to_string(),
1521 })
1522 })?;
1523
1524 provider.history_system_count(tenant).await
1525 }
1526}
1527
1528#[async_trait]
1529impl BundleProvider for CompositeStorage {
1530 async fn process_transaction(
1531 &self,
1532 tenant: &TenantContext,
1533 entries: Vec<BundleEntry>,
1534 ) -> Result<BundleResult, TransactionError> {
1535 let provider =
1536 self.bundle_provider
1537 .as_ref()
1538 .ok_or_else(|| TransactionError::BundleError {
1539 index: 0,
1540 message: "BundleProvider not available on composite primary".to_string(),
1541 })?;
1542
1543 let result = provider.process_transaction(tenant, entries).await?;
1544
1545 self.sync_bundle_results(tenant, &result).await;
1547
1548 Ok(result)
1549 }
1550
1551 async fn process_batch(
1552 &self,
1553 tenant: &TenantContext,
1554 entries: Vec<BundleEntry>,
1555 ) -> StorageResult<BundleResult> {
1556 let provider = self.bundle_provider.as_ref().ok_or_else(|| {
1557 StorageError::Backend(BackendError::UnsupportedCapability {
1558 backend_name: "composite".to_string(),
1559 capability: "BundleProvider".to_string(),
1560 })
1561 })?;
1562
1563 let result = provider.process_batch(tenant, entries).await?;
1564
1565 self.sync_bundle_results(tenant, &result).await;
1567
1568 Ok(result)
1569 }
1570}
1571
1572#[async_trait]
1573impl IncludeProvider for CompositeStorage {
1574 async fn resolve_includes(
1575 &self,
1576 tenant: &TenantContext,
1577 resources: &[StoredResource],
1578 includes: &[IncludeDirective],
1579 ) -> StorageResult<Vec<StoredResource>> {
1580 let primary_id = self.config.primary_id().unwrap_or("primary");
1582
1583 if let Some(_provider) = self.search_providers.get(primary_id) {
1584 self.resolve_includes_basic(tenant, resources, includes)
1588 .await
1589 } else {
1590 self.resolve_includes_basic(tenant, resources, includes)
1591 .await
1592 }
1593 }
1594}
1595
1596impl CompositeStorage {
1597 async fn resolve_includes_basic(
1599 &self,
1600 tenant: &TenantContext,
1601 resources: &[StoredResource],
1602 includes: &[IncludeDirective],
1603 ) -> StorageResult<Vec<StoredResource>> {
1604 use std::collections::HashSet;
1605
1606 let mut included = Vec::new();
1607 let mut seen_ids = HashSet::new();
1608
1609 for resource in resources {
1610 for include in includes {
1611 let refs = self.extract_references(resource, &include.search_param);
1613
1614 for reference in refs {
1615 if let Some((ref_type, ref_id)) = reference.split_once('/') {
1617 if let Some(ref target) = include.target_type {
1619 if target != ref_type {
1620 continue;
1621 }
1622 }
1623
1624 let key = format!("{}/{}", ref_type, ref_id);
1625 if seen_ids.insert(key) {
1626 if let Ok(Some(included_resource)) =
1627 self.primary.read(tenant, ref_type, ref_id).await
1628 {
1629 included.push(included_resource);
1630 }
1631 }
1632 }
1633 }
1634 }
1635 }
1636
1637 Ok(included)
1638 }
1639
1640 fn extract_references(&self, resource: &StoredResource, search_param: &str) -> Vec<String> {
1653 let content = resource.content();
1654 let resource_type = resource.resource_type();
1655
1656 let registered = {
1657 let registry = self.search_param_registry().read();
1658 registry
1659 .get_param(resource_type, search_param)
1660 .or_else(|| registry.get_param("Resource", search_param))
1661 };
1662
1663 if let Some(param_def) = registered {
1664 let extractor = crate::search::SearchParameterExtractor::new(Arc::clone(
1665 self.search_param_registry(),
1666 ));
1667 if let Ok(values) = extractor.extract_for_param(content, ¶m_def) {
1668 return values
1675 .into_iter()
1676 .filter_map(|v| match v.value {
1677 crate::search::IndexValue::Reference { reference, .. } => Some(reference),
1678 _ => None,
1679 })
1680 .collect();
1681 }
1682 }
1683
1684 let mut refs = Vec::new();
1686 if let Some(value) = content.get(search_param) {
1687 Self::extract_reference_values(value, &mut refs);
1688 }
1689 let alias = match search_param {
1690 "patient" | "subject" => Some("subject"),
1691 "encounter" => Some("encounter"),
1692 "performer" => Some("performer"),
1693 _ => None,
1694 };
1695 if let Some(field) = alias {
1696 if let Some(value) = content.get(field) {
1697 Self::extract_reference_values(value, &mut refs);
1698 }
1699 }
1700 refs
1701 }
1702
1703 fn extract_reference_values(value: &Value, refs: &mut Vec<String>) {
1705 match value {
1706 Value::Object(obj) => {
1707 if let Some(Value::String(reference)) = obj.get("reference") {
1708 refs.push(reference.clone());
1709 }
1710 }
1711 Value::Array(arr) => {
1712 for item in arr {
1713 Self::extract_reference_values(item, refs);
1714 }
1715 }
1716 _ => {}
1717 }
1718 }
1719}
1720
1721#[async_trait]
1722impl RevincludeProvider for CompositeStorage {
1723 async fn resolve_revincludes(
1724 &self,
1725 tenant: &TenantContext,
1726 resources: &[StoredResource],
1727 revincludes: &[IncludeDirective],
1728 ) -> StorageResult<Vec<StoredResource>> {
1729 let mut revincluded = Vec::new();
1732
1733 for revinclude in revincludes {
1734 for resource in resources {
1735 let reference = format!("{}/{}", resource.resource_type(), resource.id());
1736
1737 let query = SearchQuery::new(&revinclude.source_type).with_parameter(
1739 crate::types::SearchParameter {
1740 name: revinclude.search_param.clone(),
1741 param_type: crate::types::SearchParamType::Reference,
1742 modifier: None,
1743 values: vec![crate::types::SearchValue::eq(&reference)],
1744 chain: vec![],
1745 components: vec![],
1746 },
1747 );
1748
1749 if let Ok(result) = self.search(tenant, &query).await {
1750 for item in result.resources.items {
1751 revincluded.push(item);
1752 }
1753 }
1754 }
1755 }
1756
1757 let mut seen = std::collections::HashSet::new();
1759 revincluded.retain(|r| seen.insert(format!("{}/{}", r.resource_type(), r.id())));
1760
1761 Ok(revincluded)
1762 }
1763}
1764
1765#[async_trait]
1766impl ChainedSearchProvider for CompositeStorage {
1767 async fn resolve_chain(
1768 &self,
1769 tenant: &TenantContext,
1770 base_type: &str,
1771 chain: &str,
1772 value: &str,
1773 ) -> StorageResult<Vec<String>> {
1774 self.resolve_chain_via_search(tenant, base_type, chain, value)
1775 .await
1776 }
1777
1778 async fn resolve_reverse_chain(
1779 &self,
1780 tenant: &TenantContext,
1781 base_type: &str,
1782 reverse_chain: &ReverseChainedParameter,
1783 ) -> StorageResult<Vec<String>> {
1784 let values = match &reverse_chain.value {
1787 Some(v) => vec![v.clone()],
1788 None => vec![],
1789 };
1790 let query = SearchQuery::new(&reverse_chain.source_type).with_parameter(
1791 crate::types::SearchParameter {
1792 name: reverse_chain.search_param.clone(),
1793 param_type: crate::types::SearchParamType::Token,
1794 modifier: None,
1795 values,
1796 chain: vec![],
1797 components: vec![],
1798 },
1799 );
1800
1801 let result = self.search(tenant, &query).await?;
1802
1803 let mut ids = Vec::new();
1805 for resource in result.resources.items {
1806 let refs = self.extract_references(&resource, &reverse_chain.reference_param);
1807 for reference in refs {
1808 if let Some((ref_type, ref_id)) = reference.split_once('/') {
1809 if ref_type == base_type {
1810 ids.push(ref_id.to_string());
1811 }
1812 }
1813 }
1814 }
1815
1816 Ok(ids)
1817 }
1818}
1819
1820impl CompositeStorage {
1821 async fn resolve_chain_via_search(
1835 &self,
1836 tenant: &TenantContext,
1837 base_type: &str,
1838 chain: &str,
1839 value: &str,
1840 ) -> StorageResult<Vec<String>> {
1841 use crate::types::{SearchParameter, SearchQuery, SearchValue};
1842
1843 let parts: Vec<&str> = chain.split('.').collect();
1844 if parts.len() < 2 {
1845 return Ok(Vec::new());
1846 }
1847
1848 let target_types: Vec<String> = {
1852 let registry = self.search_param_registry().read();
1853 let mut types = Vec::with_capacity(parts.len() - 1);
1854 let mut current = base_type.to_string();
1855 for ref_param in parts.iter().take(parts.len() - 1) {
1856 let next = registry
1857 .get_param(¤t, ref_param)
1858 .and_then(|def| {
1859 def.target.as_ref().and_then(|t| {
1860 if t.len() == 1 {
1861 Some(t[0].clone())
1862 } else {
1863 None
1864 }
1865 })
1866 })
1867 .unwrap_or_else(|| crate::search::chain_resolver::infer_target_type(ref_param));
1868 types.push(next.clone());
1869 current = next;
1870 }
1871 types
1872 };
1873
1874 let terminal_param = parts[parts.len() - 1];
1876 let deepest_type = target_types.last().map(String::as_str).unwrap_or(base_type);
1877
1878 let terminal_query = SearchQuery::new(deepest_type).with_parameter(SearchParameter {
1879 name: terminal_param.to_string(),
1880 param_type: {
1881 let registry = self.search_param_registry().read();
1882 crate::search::resolve_param_type(
1883 ®istry,
1884 deepest_type,
1885 terminal_param,
1886 &[SearchValue::eq(value)],
1887 )
1888 },
1889 modifier: None,
1890 values: vec![SearchValue::eq(value)],
1891 chain: vec![],
1892 components: vec![],
1893 });
1894
1895 let result = self.search(tenant, &terminal_query).await?;
1896 let mut current_refs: Vec<String> = result
1897 .resources
1898 .items
1899 .into_iter()
1900 .map(|r| format!("{}/{}", r.resource_type(), r.id()))
1901 .collect();
1902
1903 if current_refs.is_empty() {
1904 return Ok(Vec::new());
1905 }
1906
1907 for i in (0..parts.len() - 1).rev() {
1911 let ref_param = parts[i];
1912 let parent_type = if i == 0 {
1913 base_type
1914 } else {
1915 &target_types[i - 1]
1916 };
1917
1918 let values: Vec<SearchValue> = current_refs.iter().map(SearchValue::eq).collect();
1919 let query = SearchQuery::new(parent_type).with_parameter(SearchParameter {
1920 name: ref_param.to_string(),
1921 param_type: crate::types::SearchParamType::Reference,
1922 modifier: None,
1923 values,
1924 chain: vec![],
1925 components: vec![],
1926 });
1927
1928 let r = self.search(tenant, &query).await?;
1929 current_refs = r
1930 .resources
1931 .items
1932 .into_iter()
1933 .map(|res| {
1934 if i == 0 {
1935 res.id().to_string()
1937 } else {
1938 format!("{}/{}", res.resource_type(), res.id())
1939 }
1940 })
1941 .collect();
1942
1943 if current_refs.is_empty() {
1944 return Ok(Vec::new());
1945 }
1946 }
1947
1948 Ok(current_refs)
1949 }
1950}
1951
1952#[async_trait]
1953impl TerminologySearchProvider for CompositeStorage {
1954 async fn expand_value_set(&self, _value_set_url: &str) -> StorageResult<Vec<(String, String)>> {
1955 let term_backend = self
1957 .config
1958 .backends_with_role(super::config::BackendRole::Terminology)
1959 .next();
1960
1961 if let Some(_backend) = term_backend {
1962 }
1964
1965 Err(StorageError::Backend(BackendError::UnsupportedCapability {
1967 backend_name: "composite".to_string(),
1968 capability: "expand_value_set".to_string(),
1969 }))
1970 }
1971
1972 async fn codes_above(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
1973 Err(StorageError::Backend(BackendError::UnsupportedCapability {
1974 backend_name: "composite".to_string(),
1975 capability: "codes_above".to_string(),
1976 }))
1977 }
1978
1979 async fn codes_below(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
1980 Err(StorageError::Backend(BackendError::UnsupportedCapability {
1981 backend_name: "composite".to_string(),
1982 capability: "codes_below".to_string(),
1983 }))
1984 }
1985}
1986
1987#[async_trait]
1988impl TextSearchProvider for CompositeStorage {
1989 async fn search_text(
1990 &self,
1991 tenant: &TenantContext,
1992 resource_type: &str,
1993 text: &str,
1994 pagination: &Pagination,
1995 ) -> StorageResult<SearchResult> {
1996 let search_backend = self
1998 .config
1999 .backends_with_role(super::config::BackendRole::Search)
2000 .next();
2001
2002 if let Some(backend) = search_backend {
2003 if let Some(provider) = self.search_providers.get(&backend.id) {
2004 let query = SearchQuery::new(resource_type)
2006 .with_parameter(crate::types::SearchParameter {
2007 name: "_text".to_string(),
2008 param_type: crate::types::SearchParamType::String,
2009 modifier: None,
2010 values: vec![crate::types::SearchValue::string(text)],
2011 chain: vec![],
2012 components: vec![],
2013 })
2014 .with_count(pagination.count);
2015
2016 return provider.search(tenant, &query).await;
2017 }
2018 }
2019
2020 self.execute_primary_search(
2022 tenant,
2023 &SearchQuery::new(resource_type)
2024 .with_parameter(crate::types::SearchParameter {
2025 name: "_text".to_string(),
2026 param_type: crate::types::SearchParamType::String,
2027 modifier: None,
2028 values: vec![crate::types::SearchValue::string(text)],
2029 chain: vec![],
2030 components: vec![],
2031 })
2032 .with_count(pagination.count),
2033 )
2034 .await
2035 }
2036
2037 async fn search_content(
2038 &self,
2039 tenant: &TenantContext,
2040 resource_type: &str,
2041 content: &str,
2042 pagination: &Pagination,
2043 ) -> StorageResult<SearchResult> {
2044 let search_backend = self
2046 .config
2047 .backends_with_role(super::config::BackendRole::Search)
2048 .next();
2049
2050 if let Some(backend) = search_backend {
2051 if let Some(provider) = self.search_providers.get(&backend.id) {
2052 let query = SearchQuery::new(resource_type)
2053 .with_parameter(crate::types::SearchParameter {
2054 name: "_content".to_string(),
2055 param_type: crate::types::SearchParamType::String,
2056 modifier: None,
2057 values: vec![crate::types::SearchValue::string(content)],
2058 chain: vec![],
2059 components: vec![],
2060 })
2061 .with_count(pagination.count);
2062
2063 return provider.search(tenant, &query).await;
2064 }
2065 }
2066
2067 self.execute_primary_search(
2068 tenant,
2069 &SearchQuery::new(resource_type)
2070 .with_parameter(crate::types::SearchParameter {
2071 name: "_content".to_string(),
2072 param_type: crate::types::SearchParamType::String,
2073 modifier: None,
2074 values: vec![crate::types::SearchValue::string(content)],
2075 chain: vec![],
2076 components: vec![],
2077 })
2078 .with_count(pagination.count),
2079 )
2080 .await
2081 }
2082}
2083
2084impl CapabilityProvider for CompositeStorage {
2085 fn capabilities(&self) -> StorageCapabilities {
2086 use std::collections::HashSet;
2087
2088 let resource_caps = HashMap::new();
2090
2091 let mut system_interactions = HashSet::new();
2092 system_interactions.insert(crate::core::SystemInteraction::Transaction);
2093 system_interactions.insert(crate::core::SystemInteraction::Batch);
2094 system_interactions.insert(crate::core::SystemInteraction::SearchSystem);
2095 system_interactions.insert(crate::core::SystemInteraction::HistorySystem);
2096
2097 StorageCapabilities {
2098 backend_name: "composite".to_string(),
2099 backend_version: None,
2100 resources: resource_caps,
2101 system_interactions,
2102 supports_system_history: true,
2103 supports_system_search: true,
2104 supported_sorts: vec!["_lastUpdated".to_string(), "_id".to_string()],
2105 supports_total: true,
2106 max_page_size: Some(1000),
2107 default_page_size: 20,
2108 }
2109 }
2110
2111 }
2113
2114fn export_unsupported() -> StorageError {
2117 StorageError::Backend(BackendError::UnsupportedCapability {
2118 backend_name: "composite".to_string(),
2119 capability: "bulk-export".to_string(),
2120 })
2121}
2122
2123#[async_trait]
2124impl ExportDataProvider for CompositeStorage {
2125 async fn list_export_types(
2126 &self,
2127 tenant: &TenantContext,
2128 request: &ExportRequest,
2129 ) -> StorageResult<Vec<String>> {
2130 match &self.export_provider {
2131 Some(p) => p.list_export_types(tenant, request).await,
2132 None => Err(export_unsupported()),
2133 }
2134 }
2135
2136 async fn count_export_resources(
2137 &self,
2138 tenant: &TenantContext,
2139 request: &ExportRequest,
2140 resource_type: &str,
2141 ) -> StorageResult<u64> {
2142 match &self.export_provider {
2143 Some(p) => {
2144 p.count_export_resources(tenant, request, resource_type)
2145 .await
2146 }
2147 None => Err(export_unsupported()),
2148 }
2149 }
2150
2151 async fn fetch_export_batch(
2152 &self,
2153 tenant: &TenantContext,
2154 request: &ExportRequest,
2155 resource_type: &str,
2156 cursor: Option<&str>,
2157 batch_size: u32,
2158 ) -> StorageResult<NdjsonBatch> {
2159 match &self.export_provider {
2160 Some(p) => {
2161 p.fetch_export_batch(tenant, request, resource_type, cursor, batch_size)
2162 .await
2163 }
2164 None => Err(export_unsupported()),
2165 }
2166 }
2167}
2168
2169#[async_trait]
2170impl PatientExportProvider for CompositeStorage {
2171 async fn list_patient_ids(
2172 &self,
2173 tenant: &TenantContext,
2174 request: &ExportRequest,
2175 cursor: Option<&str>,
2176 batch_size: u32,
2177 ) -> StorageResult<(Vec<String>, Option<String>)> {
2178 match &self.export_provider {
2179 Some(p) => {
2180 p.list_patient_ids(tenant, request, cursor, batch_size)
2181 .await
2182 }
2183 None => Err(export_unsupported()),
2184 }
2185 }
2186
2187 async fn fetch_patient_compartment_batch(
2188 &self,
2189 tenant: &TenantContext,
2190 request: &ExportRequest,
2191 resource_type: &str,
2192 patient_ids: &[String],
2193 cursor: Option<&str>,
2194 batch_size: u32,
2195 ) -> StorageResult<NdjsonBatch> {
2196 match &self.export_provider {
2197 Some(p) => {
2198 p.fetch_patient_compartment_batch(
2199 tenant,
2200 request,
2201 resource_type,
2202 patient_ids,
2203 cursor,
2204 batch_size,
2205 )
2206 .await
2207 }
2208 None => Err(export_unsupported()),
2209 }
2210 }
2211}
2212
2213#[async_trait]
2214impl GroupExportProvider for CompositeStorage {
2215 async fn get_group_members(
2216 &self,
2217 tenant: &TenantContext,
2218 group_id: &str,
2219 ) -> StorageResult<Vec<String>> {
2220 match &self.export_provider {
2221 Some(p) => p.get_group_members(tenant, group_id).await,
2222 None => Err(export_unsupported()),
2223 }
2224 }
2225
2226 async fn resolve_group_patient_ids(
2227 &self,
2228 tenant: &TenantContext,
2229 group_id: &str,
2230 ) -> StorageResult<Vec<String>> {
2231 match &self.export_provider {
2232 Some(p) => p.resolve_group_patient_ids(tenant, group_id).await,
2233 None => Err(export_unsupported()),
2234 }
2235 }
2236}
2237
2238#[cfg(test)]
2239mod tests {
2240 use super::*;
2241 use crate::core::{BackendKind, CapabilityProvider};
2242 use crate::error::{BackendError, StorageError, StorageResult};
2243 use crate::tenant::{TenantContext, TenantId, TenantPermissions};
2244 use crate::types::{
2245 SearchParamType, SearchParameter, SearchQuery, SearchValue, StoredResource,
2246 };
2247 use async_trait::async_trait;
2248 use helios_fhir::FhirVersion;
2249 use serde_json::{Value, json};
2250
    /// Test double whose search operations always fail with a
    /// `ConnectionFailed` error.
    #[derive(Debug)]
    struct FailingSearchBackend {
        /// Name returned by `backend_name()` and embedded in every error.
        backend_name: &'static str,
        /// Message carried by the `ConnectionFailed` errors from search calls.
        error_message: &'static str,
    }
2256
2257 #[async_trait]
2258 impl ResourceStorage for FailingSearchBackend {
2259 fn backend_name(&self) -> &'static str {
2260 self.backend_name
2261 }
2262
2263 async fn create(
2264 &self,
2265 _tenant: &TenantContext,
2266 _resource_type: &str,
2267 _resource: Value,
2268 _fhir_version: FhirVersion,
2269 ) -> StorageResult<StoredResource> {
2270 Err(StorageError::Backend(BackendError::UnsupportedCapability {
2271 backend_name: self.backend_name.to_string(),
2272 capability: "create".to_string(),
2273 }))
2274 }
2275
2276 async fn create_or_update(
2277 &self,
2278 _tenant: &TenantContext,
2279 _resource_type: &str,
2280 _id: &str,
2281 _resource: Value,
2282 _fhir_version: FhirVersion,
2283 ) -> StorageResult<(StoredResource, bool)> {
2284 Err(StorageError::Backend(BackendError::UnsupportedCapability {
2285 backend_name: self.backend_name.to_string(),
2286 capability: "create_or_update".to_string(),
2287 }))
2288 }
2289
2290 async fn read(
2291 &self,
2292 _tenant: &TenantContext,
2293 _resource_type: &str,
2294 _id: &str,
2295 ) -> StorageResult<Option<StoredResource>> {
2296 Ok(None)
2297 }
2298
2299 async fn update(
2300 &self,
2301 _tenant: &TenantContext,
2302 _current: &StoredResource,
2303 _resource: Value,
2304 ) -> StorageResult<StoredResource> {
2305 Err(StorageError::Backend(BackendError::UnsupportedCapability {
2306 backend_name: self.backend_name.to_string(),
2307 capability: "update".to_string(),
2308 }))
2309 }
2310
2311 async fn delete(
2312 &self,
2313 _tenant: &TenantContext,
2314 _resource_type: &str,
2315 _id: &str,
2316 ) -> StorageResult<()> {
2317 Err(StorageError::Backend(BackendError::UnsupportedCapability {
2318 backend_name: self.backend_name.to_string(),
2319 capability: "delete".to_string(),
2320 }))
2321 }
2322
2323 async fn count(
2324 &self,
2325 _tenant: &TenantContext,
2326 _resource_type: Option<&str>,
2327 ) -> StorageResult<u64> {
2328 Ok(0)
2329 }
2330 }
2331
2332 #[async_trait]
2333 impl SearchProvider for FailingSearchBackend {
2334 async fn search(
2335 &self,
2336 _tenant: &TenantContext,
2337 _query: &SearchQuery,
2338 ) -> StorageResult<SearchResult> {
2339 Err(StorageError::Backend(BackendError::ConnectionFailed {
2340 backend_name: self.backend_name.to_string(),
2341 message: self.error_message.to_string(),
2342 }))
2343 }
2344
2345 async fn search_count(
2346 &self,
2347 _tenant: &TenantContext,
2348 _query: &SearchQuery,
2349 ) -> StorageResult<u64> {
2350 Err(StorageError::Backend(BackendError::ConnectionFailed {
2351 backend_name: self.backend_name.to_string(),
2352 message: self.error_message.to_string(),
2353 }))
2354 }
2355
2356 fn search_param_registry(
2357 &self,
2358 ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
2359 use std::sync::OnceLock;
2360 static EMPTY: OnceLock<
2361 std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>>,
2362 > = OnceLock::new();
2363 EMPTY.get_or_init(|| {
2364 std::sync::Arc::new(parking_lot::RwLock::new(
2365 crate::search::SearchParameterRegistry::new(),
2366 ))
2367 })
2368 }
2369 }
2370
    /// Stateless storage test double: writes succeed and echo their input,
    /// reads find nothing, and counts are zero.
    struct MockStorage;
2373
2374 #[async_trait]
2375 impl ResourceStorage for MockStorage {
2376 fn backend_name(&self) -> &'static str {
2377 "mock"
2378 }
2379
2380 async fn create(
2381 &self,
2382 tenant: &TenantContext,
2383 resource_type: &str,
2384 resource: Value,
2385 fhir_version: FhirVersion,
2386 ) -> StorageResult<StoredResource> {
2387 let id = uuid::Uuid::new_v4().to_string();
2388 Ok(StoredResource::new(
2389 resource_type,
2390 &id,
2391 tenant.tenant_id().clone(),
2392 resource,
2393 fhir_version,
2394 ))
2395 }
2396
2397 async fn create_or_update(
2398 &self,
2399 tenant: &TenantContext,
2400 resource_type: &str,
2401 id: &str,
2402 resource: Value,
2403 fhir_version: FhirVersion,
2404 ) -> StorageResult<(StoredResource, bool)> {
2405 Ok((
2406 StoredResource::new(
2407 resource_type,
2408 id,
2409 tenant.tenant_id().clone(),
2410 resource,
2411 fhir_version,
2412 ),
2413 true,
2414 ))
2415 }
2416
2417 async fn read(
2418 &self,
2419 _tenant: &TenantContext,
2420 _resource_type: &str,
2421 _id: &str,
2422 ) -> StorageResult<Option<StoredResource>> {
2423 Ok(None)
2424 }
2425
2426 async fn update(
2427 &self,
2428 tenant: &TenantContext,
2429 current: &StoredResource,
2430 resource: Value,
2431 ) -> StorageResult<StoredResource> {
2432 Ok(StoredResource::new(
2433 current.resource_type(),
2434 current.id(),
2435 tenant.tenant_id().clone(),
2436 resource,
2437 current.fhir_version(),
2438 ))
2439 }
2440
2441 async fn delete(
2442 &self,
2443 _tenant: &TenantContext,
2444 _resource_type: &str,
2445 _id: &str,
2446 ) -> StorageResult<()> {
2447 Ok(())
2448 }
2449
2450 async fn count(
2451 &self,
2452 _tenant: &TenantContext,
2453 _resource_type: Option<&str>,
2454 ) -> StorageResult<u64> {
2455 Ok(0)
2456 }
2457 }
2458
2459 fn make_tenant() -> TenantContext {
2460 TenantContext::new(TenantId::new("test"), TenantPermissions::full_access())
2461 }
2462
2463 fn make_composite_no_secondary() -> CompositeStorage {
2464 let config = CompositeConfig::builder()
2465 .primary("primary", BackendKind::Sqlite)
2466 .build()
2467 .unwrap();
2468 let mut backends = HashMap::new();
2469 backends.insert("primary".to_string(), Arc::new(MockStorage) as DynStorage);
2470 CompositeStorage::new(config, backends).unwrap()
2471 }
2472
2473 fn make_composite_with_secondary() -> CompositeStorage {
2474 let config = CompositeConfig::builder()
2475 .primary("primary", BackendKind::Sqlite)
2476 .search_backend("es", BackendKind::Elasticsearch)
2477 .build()
2478 .unwrap();
2479 let mut backends = HashMap::new();
2480 backends.insert("primary".to_string(), Arc::new(MockStorage) as DynStorage);
2481 backends.insert("es".to_string(), Arc::new(MockStorage) as DynStorage);
2482 CompositeStorage::new(config, backends).unwrap()
2483 }
2484
2485 fn test_config() -> CompositeConfig {
2486 CompositeConfig::builder()
2487 .primary("sqlite", BackendKind::Sqlite)
2488 .search_backend("es", BackendKind::Elasticsearch)
2489 .build()
2490 .unwrap()
2491 }
2492
2493 #[test]
2496 fn test_backend_health_default() {
2497 let health = BackendHealth::default();
2498 assert!(health.healthy);
2499 assert_eq!(health.failure_count, 0);
2500 assert!(health.last_error.is_none());
2501 assert!(health.last_success.is_none());
2502 }
2503
2504 #[test]
2505 fn test_backend_health_clone() {
2506 let health = BackendHealth {
2507 healthy: false,
2508 last_success: None,
2509 failure_count: 5,
2510 last_error: Some("timeout".to_string()),
2511 };
2512 let cloned = health.clone();
2513 assert!(!cloned.healthy);
2514 assert_eq!(cloned.failure_count, 5);
2515 assert_eq!(cloned.last_error.as_deref(), Some("timeout"));
2516 }
2517
2518 #[test]
2521 fn test_composite_config() {
2522 let config = test_config();
2523 assert_eq!(config.primary_id(), Some("sqlite"));
2524 assert_eq!(config.secondaries().count(), 1);
2525 }
2526
/// Verifies the split between the read path and the search path of a two-backend
/// composite.
///
/// The primary and the search backend are separate in-memory SQLite instances, each
/// seeded with a patient the other one lacks. The test asserts that `read` finds the
/// primary-only patient, and that `search` / `search_count` on the search-only
/// patient's `_id` yield exactly one hit.
#[cfg(feature = "sqlite")]
#[tokio::test]
async fn test_search_prefers_configured_search_backend() {
    use std::{collections::HashMap, sync::Arc};

    use crate::backends::sqlite::SqliteBackend;
    use crate::core::{ResourceStorage, SearchProvider};
    use crate::tenant::{TenantContext, TenantId, TenantPermissions};
    use crate::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue};

    let primary_db = Arc::new(SqliteBackend::in_memory().expect("create primary sqlite backend"));
    primary_db.init_schema().expect("init primary sqlite schema");

    let search_db = Arc::new(SqliteBackend::in_memory().expect("create search sqlite backend"));
    search_db.init_schema().expect("init search sqlite schema");

    let tenant_id = TenantId::new("composite-test");
    let tenant = TenantContext::new(tenant_id, TenantPermissions::full_access());

    // Seed each backend with a patient that exists nowhere else.
    let primary_patient = json!({
        "resourceType": "Patient",
        "id": "primary-only-patient",
    });
    primary_db
        .create(&tenant, "Patient", primary_patient, FhirVersion::default())
        .await
        .expect("seed primary patient");

    let search_patient = json!({
        "resourceType": "Patient",
        "id": "search-only-patient",
    });
    search_db
        .create(&tenant, "Patient", search_patient, FhirVersion::default())
        .await
        .expect("seed search patient");

    let composite_config = CompositeConfig::builder()
        .primary("primary", BackendKind::Sqlite)
        .search_backend("search", BackendKind::Sqlite)
        .build()
        .expect("build composite config");

    let storage_by_id: HashMap<String, DynStorage> = HashMap::from([
        ("primary".to_string(), primary_db.clone() as DynStorage),
        ("search".to_string(), search_db.clone() as DynStorage),
    ]);
    let searchers_by_id: HashMap<String, DynSearchProvider> = HashMap::from([
        ("primary".to_string(), primary_db.clone() as DynSearchProvider),
        ("search".to_string(), search_db.clone() as DynSearchProvider),
    ]);

    let storage = CompositeStorage::new(composite_config, storage_by_id)
        .expect("create composite storage")
        .with_search_providers(searchers_by_id)
        .with_full_primary(primary_db.clone());

    // Read path: the patient only the primary knows about must be found.
    let fetched = storage
        .read(&tenant, "Patient", "primary-only-patient")
        .await
        .expect("composite read should succeed");
    assert!(
        fetched.is_some(),
        "Read path should use primary backend data"
    );

    // Search path: look up the patient only the search backend knows about.
    let id_filter = SearchParameter {
        name: String::from("_id"),
        param_type: SearchParamType::Token,
        modifier: None,
        values: vec![SearchValue::eq("search-only-patient")],
        chain: vec![],
        components: vec![],
    };
    let query = SearchQuery::new("Patient").with_parameter(id_filter);

    let hits = storage
        .search(&tenant, &query)
        .await
        .expect("composite search should succeed");
    assert_eq!(hits.resources.len(), 1);
    assert_eq!(hits.resources.items[0].id(), "search-only-patient");

    let total = storage
        .search_count(&tenant, &query)
        .await
        .expect("composite search_count should succeed");
    assert_eq!(total, 1);
}
2627
/// Verifies that a search delegated to the search backend stays scoped to the
/// calling tenant.
///
/// Two tenants each get one patient in the search backend. Searching for tenant A's
/// patient id must return it for tenant A and nothing for tenant B.
#[cfg(feature = "sqlite")]
#[tokio::test]
async fn test_search_backend_preserves_tenant_isolation() {
    use std::{collections::HashMap, sync::Arc};

    use crate::backends::sqlite::SqliteBackend;
    use crate::core::{ResourceStorage, SearchProvider};
    use crate::tenant::{TenantContext, TenantId, TenantPermissions};

    let primary_db = Arc::new(SqliteBackend::in_memory().expect("create primary sqlite backend"));
    primary_db.init_schema().expect("init primary sqlite schema");

    let search_db = Arc::new(SqliteBackend::in_memory().expect("create search sqlite backend"));
    search_db.init_schema().expect("init search sqlite schema");

    let tenant_a =
        TenantContext::new(TenantId::new("tenant-a"), TenantPermissions::full_access());
    let tenant_b =
        TenantContext::new(TenantId::new("tenant-b"), TenantPermissions::full_access());

    // Only the search backend is seeded: one patient per tenant.
    let seeds = [
        (&tenant_a, "tenant-a-patient", "seed tenant A search patient"),
        (&tenant_b, "tenant-b-patient", "seed tenant B search patient"),
    ];
    for (owner, patient_id, failure_note) in seeds {
        let patient = json!({
            "resourceType": "Patient",
            "id": patient_id,
        });
        search_db
            .create(owner, "Patient", patient, FhirVersion::default())
            .await
            .expect(failure_note);
    }

    let composite_config = CompositeConfig::builder()
        .primary("primary", BackendKind::Sqlite)
        .search_backend("search", BackendKind::Sqlite)
        .build()
        .expect("build composite config");

    let storage_by_id: HashMap<String, DynStorage> = HashMap::from([
        ("primary".to_string(), primary_db.clone() as DynStorage),
        ("search".to_string(), search_db.clone() as DynStorage),
    ]);
    let searchers_by_id: HashMap<String, DynSearchProvider> = HashMap::from([
        ("primary".to_string(), primary_db.clone() as DynSearchProvider),
        ("search".to_string(), search_db.clone() as DynSearchProvider),
    ]);

    let storage = CompositeStorage::new(composite_config, storage_by_id)
        .expect("create composite storage")
        .with_search_providers(searchers_by_id)
        .with_full_primary(primary_db.clone());

    let id_filter = SearchParameter {
        name: String::from("_id"),
        param_type: SearchParamType::Token,
        modifier: None,
        values: vec![SearchValue::eq("tenant-a-patient")],
        chain: vec![],
        components: vec![],
    };
    let query = SearchQuery::new("Patient").with_parameter(id_filter);

    let hits_for_a = storage
        .search(&tenant_a, &query)
        .await
        .expect("tenant A composite search should succeed");
    assert_eq!(hits_for_a.resources.len(), 1);
    assert_eq!(hits_for_a.resources.items[0].id(), "tenant-a-patient");

    let hits_for_b = storage
        .search(&tenant_b, &query)
        .await
        .expect("tenant B composite search should succeed");
    assert!(
        hits_for_b.resources.is_empty(),
        "delegated search must not leak tenant A data to tenant B"
    );
}
2719
/// Verifies that a failing search backend surfaces its error and is recorded as
/// unhealthy.
///
/// With `failure_threshold` set to 1, a single failed delegated search must return a
/// `ConnectionFailed` error naming the `"search"` backend and leave that backend's
/// health entry unhealthy with one failure and the formatted error message.
#[test]
fn test_search_backend_failure_marks_backend_unhealthy() {
    use std::{collections::HashMap, sync::Arc};

    use crate::composite::config::HealthConfig;

    let failing_primary = Arc::new(FailingSearchBackend {
        backend_name: "primary",
        error_message: "primary should not be used",
    });
    let failing_search = Arc::new(FailingSearchBackend {
        backend_name: "search",
        error_message: "simulated search outage",
    });

    // A threshold of one makes the very first failure flip the health flag.
    let strict_health = HealthConfig {
        failure_threshold: 1,
        ..HealthConfig::default()
    };
    let composite_config = CompositeConfig::builder()
        .primary("primary", BackendKind::MongoDB)
        .search_backend("search", BackendKind::Elasticsearch)
        .with_health_config(strict_health)
        .build()
        .expect("build composite config");

    let storage_by_id: HashMap<String, DynStorage> = HashMap::from([
        ("primary".to_string(), failing_primary.clone() as DynStorage),
        ("search".to_string(), failing_search.clone() as DynStorage),
    ]);
    let searchers_by_id: HashMap<String, DynSearchProvider> = HashMap::from([
        (
            "primary".to_string(),
            failing_primary.clone() as DynSearchProvider,
        ),
        (
            "search".to_string(),
            failing_search.clone() as DynSearchProvider,
        ),
    ]);

    let storage = CompositeStorage::new(composite_config, storage_by_id)
        .expect("create composite storage")
        .with_search_providers(searchers_by_id);

    let tenant = TenantContext::new(
        TenantId::new("tenant-failure"),
        TenantPermissions::full_access(),
    );
    let id_filter = SearchParameter {
        name: String::from("_id"),
        param_type: SearchParamType::Token,
        modifier: None,
        values: vec![SearchValue::eq("failure-patient")],
        chain: vec![],
        components: vec![],
    };
    let query = SearchQuery::new("Patient").with_parameter(id_filter);

    // The test is synchronous, so drive the async search on a local runtime.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("build tokio runtime");
    let failure = runtime
        .block_on(storage.search(&tenant, &query))
        .expect_err("delegated search should fail when search backend is down");

    assert!(matches!(
        failure,
        StorageError::Backend(BackendError::ConnectionFailed {
            backend_name,
            message,
        }) if backend_name == "search" && message.contains("simulated search outage")
    ));

    let search_health = storage
        .backend_health("search")
        .expect("search backend health should exist");
    assert!(
        !search_health.healthy,
        "search backend should be marked unhealthy after failure"
    );
    assert_eq!(search_health.failure_count, 1);
    assert_eq!(
        search_health.last_error.as_deref(),
        Some("connection failed to search: simulated search outage")
    );
}
2800
/// Constructing a composite with only a primary succeeds and reports the
/// backend name `"composite"`.
#[test]
fn test_new_success() {
    let storage = make_composite_no_secondary();
    let reported_name = storage.backend_name();
    assert_eq!(reported_name, "composite");
}
2808
/// `CompositeStorage::new` must fail when the config names a primary backend
/// that is absent from the backend map.
#[test]
fn test_new_missing_primary_backend_in_map() {
    let config = CompositeConfig::builder()
        .primary("primary", BackendKind::Sqlite)
        .build()
        .unwrap();
    // Deliberately empty: the configured primary id has no matching entry.
    let no_backends = HashMap::<String, DynStorage>::new();

    let outcome = CompositeStorage::new(config, no_backends);

    assert!(outcome.is_err());
}
2820
/// A composite built with the `"es"` secondary exposes it by id and returns
/// `None` for an id that was never registered.
#[test]
fn test_new_with_secondary() {
    let storage = make_composite_with_secondary();

    let registered = storage.secondary("es");
    assert!(registered.is_some());

    let unregistered = storage.secondary("nonexistent");
    assert!(unregistered.is_none());
}
2827
/// `config()` exposes the configuration the composite was built with.
#[test]
fn test_config_accessor() {
    let storage = make_composite_no_secondary();
    let cfg = storage.config();
    assert_eq!(cfg.primary_id(), Some("primary"));
}
2835
/// `primary()` returns a backend whose `backend_name()` is `"mock"`.
#[test]
fn test_primary_accessor() {
    let storage = make_composite_no_secondary();
    let primary_name = storage.primary().backend_name();
    assert_eq!(primary_name, "mock");
}
2841
/// `secondaries()` holds exactly one entry, keyed `"es"`, for the two-backend setup.
#[test]
fn test_secondaries_accessor() {
    let storage = make_composite_with_secondary();

    let secondary_count = storage.secondaries().len();
    assert_eq!(secondary_count, 1);

    let has_es = storage.secondaries().contains_key("es");
    assert!(has_es);
}
2848
/// `secondaries()` is empty when the composite was built with a primary only.
#[test]
fn test_secondaries_empty_when_no_secondary() {
    let storage = make_composite_no_secondary();
    let nothing_registered = storage.secondaries().is_empty();
    assert!(nothing_registered);
}
2854
/// `secondary("es")` yields a value when that secondary was configured.
#[test]
fn test_secondary_accessor_present() {
    let storage = make_composite_with_secondary();
    let lookup = storage.secondary("es");
    assert!(lookup.is_some());
}
2860
/// `secondary(..)` yields `None` for an id that was never configured.
#[test]
fn test_secondary_accessor_absent() {
    let storage = make_composite_no_secondary();
    let lookup = storage.secondary("missing");
    assert!(lookup.is_none());
}
2866
/// The primary backend's health entry exists and is healthy right after construction.
#[test]
fn test_backend_health_initially_healthy() {
    let storage = make_composite_no_secondary();
    let primary_health = storage.backend_health("primary").unwrap();
    assert!(primary_health.healthy);
}
2875
/// `backend_health` returns `None` for an id with no health entry.
#[test]
fn test_backend_health_missing_id_returns_none() {
    let storage = make_composite_no_secondary();
    let lookup = storage.backend_health("nonexistent");
    assert!(lookup.is_none());
}
2881
/// `is_backend_healthy` is true for the freshly constructed primary.
#[test]
fn test_is_backend_healthy_true() {
    let storage = make_composite_no_secondary();
    let primary_ok = storage.is_backend_healthy("primary");
    assert!(primary_ok);
}
2887
/// `is_backend_healthy` is false for an id the composite does not know.
#[test]
fn test_is_backend_healthy_unknown_returns_false() {
    let storage = make_composite_no_secondary();
    let unknown_ok = storage.is_backend_healthy("nonexistent");
    assert!(!unknown_ok);
}
2893
/// After two recorded failures, one successful update must mark the backend healthy,
/// zero the failure count, clear the last error and record a success time.
#[test]
fn test_update_health_success_resets_failures() {
    let storage = make_composite_no_secondary();

    // Record two consecutive failures.
    for message in ["err1", "err2"] {
        storage.update_health("primary", false, Some(message.to_string()));
    }
    let after_failures = storage.backend_health("primary").unwrap();
    assert_eq!(after_failures.failure_count, 2);

    // A single success must wipe the failure state.
    storage.update_health("primary", true, None);
    let after_success = storage.backend_health("primary").unwrap();
    assert!(after_success.healthy);
    assert_eq!(after_success.failure_count, 0);
    assert!(after_success.last_error.is_none());
    assert!(after_success.last_success.is_some());
}
2911
/// One failed update yields a failure count of 1 and stores the error message.
#[test]
fn test_update_health_failure_increments_count() {
    let storage = make_composite_no_secondary();
    let error_text = String::from("timeout");

    storage.update_health("primary", false, Some(error_text));

    let primary_health = storage.backend_health("primary").unwrap();
    assert_eq!(primary_health.failure_count, 1);
    assert_eq!(primary_health.last_error.as_deref(), Some("timeout"));
}
2920
/// Recording exactly `failure_threshold` failures flips the backend to unhealthy
/// and leaves the failure count equal to the threshold.
#[test]
fn test_update_health_marks_unhealthy_after_threshold() {
    let storage = make_composite_no_secondary();
    let threshold = storage.config.health_config.failure_threshold;

    (0..threshold).for_each(|attempt| {
        storage.update_health("primary", false, Some(format!("error {attempt}")));
    });

    let primary_health = storage.backend_health("primary").unwrap();
    assert!(!primary_health.healthy);
    assert_eq!(primary_health.failure_count, threshold);
}
2933
/// Reporting health for an unknown backend id must complete without panicking;
/// the test makes no further assertion.
#[test]
fn test_update_health_ignores_unknown_backend() {
    let storage = make_composite_no_secondary();
    let error_text = Some(String::from("err"));
    storage.update_health("nonexistent", false, error_text);
}
2940
2941 #[tokio::test]
2944 async fn test_sync_to_secondaries_no_sync_manager_returns_ok() {
2945 let composite = make_composite_no_secondary();
2946 let result = composite
2948 .sync_to_secondaries(super::super::sync::SyncEvent::Delete {
2949 resource_type: "Patient".to_string(),
2950 resource_id: "1".to_string(),
2951 tenant_id: TenantId::new("test"),
2952 })
2953 .await;
2954 assert!(result.is_ok());
2955 }
2956
/// `RoutingError::NoPrimaryBackend` maps to `BackendError::Unavailable` with
/// backend name `"primary"`.
#[test]
fn test_routing_error_no_primary_backend() {
    let storage = make_composite_no_secondary();

    let mapped = storage.routing_error_to_storage_error(RoutingError::NoPrimaryBackend);

    match mapped {
        StorageError::Backend(BackendError::Unavailable { backend_name, .. }) => {
            assert_eq!(backend_name, "primary");
        }
        unexpected => panic!("unexpected error: {unexpected:?}"),
    }
}
2970
/// `RoutingError::NoCapableBackend` maps to `BackendError::UnsupportedCapability`
/// attributed to `"composite"` with a non-empty capability description.
#[test]
fn test_routing_error_no_capable_backend() {
    use super::super::analyzer::QueryFeature;

    let storage = make_composite_no_secondary();
    let routing_failure = RoutingError::NoCapableBackend {
        feature: QueryFeature::FullTextSearch,
    };

    let mapped = storage.routing_error_to_storage_error(routing_failure);

    match mapped {
        StorageError::Backend(BackendError::UnsupportedCapability {
            backend_name,
            capability,
        }) => {
            assert_eq!(backend_name, "composite");
            assert!(!capability.is_empty());
        }
        unexpected => panic!("unexpected error: {unexpected:?}"),
    }
}
2989
/// `RoutingError::BackendUnavailable` maps to `BackendError::ConnectionFailed`
/// carrying the same backend id.
#[test]
fn test_routing_error_backend_unavailable() {
    let storage = make_composite_no_secondary();
    let routing_failure = RoutingError::BackendUnavailable {
        backend_id: "my-backend".to_string(),
    };

    let mapped = storage.routing_error_to_storage_error(routing_failure);

    match mapped {
        StorageError::Backend(BackendError::ConnectionFailed { backend_name, .. }) => {
            assert_eq!(backend_name, "my-backend");
        }
        unexpected => panic!("unexpected error: {unexpected:?}"),
    }
}
3003
/// The capabilities reported by the composite carry the backend name `"composite"`.
#[test]
fn test_capabilities_backend_name() {
    let storage = make_composite_no_secondary();
    let reported = storage.capabilities();
    assert_eq!(reported.backend_name, "composite");
}
3012
/// `backend_name()` on the composite itself is `"composite"`.
#[test]
fn test_backend_name_is_composite() {
    let storage = make_composite_no_secondary();
    let reported_name = storage.backend_name();
    assert_eq!(reported_name, "composite");
}
3018
3019 #[tokio::test]
3022 async fn test_conditional_create_no_capability() {
3023 use crate::core::ConditionalStorage;
3024 let composite = make_composite_no_secondary();
3025 let tenant = make_tenant();
3026 let result = composite
3027 .conditional_create(
3028 &tenant,
3029 "Patient",
3030 serde_json::json!({}),
3031 "identifier=foo",
3032 FhirVersion::default(),
3033 )
3034 .await;
3035 assert!(result.is_err());
3036 }
3037
3038 #[tokio::test]
3039 async fn test_versioned_storage_vread_no_capability() {
3040 use crate::core::VersionedStorage;
3041 let composite = make_composite_no_secondary();
3042 let tenant = make_tenant();
3043 let result = composite.vread(&tenant, "Patient", "1", "1").await;
3044 assert!(result.is_err());
3045 match result.unwrap_err() {
3046 StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => {
3047 assert!(capability.contains("VersionedStorage"));
3048 }
3049 other => panic!("unexpected error: {:?}", other),
3050 }
3051 }
3052
3053 #[tokio::test]
3054 async fn test_instance_history_no_capability() {
3055 use crate::core::InstanceHistoryProvider;
3056 use crate::core::history::HistoryParams;
3057 let composite = make_composite_no_secondary();
3058 let tenant = make_tenant();
3059 let result = composite
3060 .history_instance(&tenant, "Patient", "1", &HistoryParams::default())
3061 .await;
3062 assert!(result.is_err());
3063 match result.unwrap_err() {
3064 StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => {
3065 assert!(capability.contains("InstanceHistoryProvider"));
3066 }
3067 other => panic!("unexpected error: {:?}", other),
3068 }
3069 }
3070
3071 #[tokio::test]
3072 async fn test_bundle_provider_process_batch_no_capability() {
3073 use crate::core::BundleProvider;
3074 let composite = make_composite_no_secondary();
3075 let tenant = make_tenant();
3076 let result = composite.process_batch(&tenant, vec![]).await;
3077 assert!(result.is_err());
3078 match result.unwrap_err() {
3079 StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => {
3080 assert!(capability.contains("BundleProvider"));
3081 }
3082 other => panic!("unexpected error: {:?}", other),
3083 }
3084 }
3085
3086 #[tokio::test]
3087 async fn test_bundle_provider_process_transaction_no_capability() {
3088 use crate::core::BundleProvider;
3089 let composite = make_composite_no_secondary();
3090 let tenant = make_tenant();
3091 let result = composite.process_transaction(&tenant, vec![]).await;
3092 assert!(result.is_err());
3093 }
3094
3095 #[tokio::test]
3098 async fn test_search_count_no_search_provider_returns_error() {
3099 use crate::core::SearchProvider;
3100 use crate::types::SearchQuery;
3101 let composite = make_composite_no_secondary();
3102 let tenant = make_tenant();
3103 let query = SearchQuery::new("Patient");
3104 let result = composite.search_count(&tenant, &query).await;
3105 assert!(result.is_err());
3106 match result.unwrap_err() {
3107 StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => {
3108 assert!(capability.contains("search_count"));
3109 }
3110 other => panic!("unexpected error: {:?}", other),
3111 }
3112 }
3113
/// `with_search_providers` stores the supplied providers under their ids.
#[test]
fn test_with_search_providers() {
    let searchers_by_id: HashMap<String, DynSearchProvider> = HashMap::from([(
        "primary".to_string(),
        Arc::new(MockStorage) as DynSearchProvider,
    )]);

    let storage = make_composite_no_secondary().with_search_providers(searchers_by_id);

    assert!(storage.search_providers.contains_key("primary"));
}
3128
/// An object with a `reference` key contributes that value to the output list.
#[test]
fn test_extract_reference_values_object_with_reference() {
    let input = serde_json::json!({"reference": "Patient/123"});
    let mut collected = Vec::new();

    CompositeStorage::extract_reference_values(&input, &mut collected);

    assert_eq!(collected, vec!["Patient/123"]);
}
3138
/// An object without a `reference` key contributes nothing to the output list.
#[test]
fn test_extract_reference_values_object_without_reference() {
    let input = serde_json::json!({"display": "John Smith"});
    let mut collected = Vec::new();

    CompositeStorage::extract_reference_values(&input, &mut collected);

    assert!(collected.is_empty());
}
3146
/// For an array input, every element with a `reference` key is collected and
/// elements without one are skipped.
#[test]
fn test_extract_reference_values_array() {
    let input = serde_json::json!([
        {"reference": "Patient/1"},
        {"reference": "Patient/2"},
        {"display": "no ref here"}
    ]);
    let mut collected = Vec::new();

    CompositeStorage::extract_reference_values(&input, &mut collected);

    assert_eq!(collected.len(), 2);
    for expected in ["Patient/1", "Patient/2"] {
        assert!(collected.iter().any(|found| found == expected));
    }
}
3160
/// A bare JSON string contributes nothing to the output list.
#[test]
fn test_extract_reference_values_primitive_ignored() {
    let input = serde_json::json!("just a string");
    let mut collected = Vec::new();

    CompositeStorage::extract_reference_values(&input, &mut collected);

    assert!(collected.is_empty());
}
3168
/// Checks that `extract_references` returns the `subject` reference of an
/// `Encounter` when that parameter is defined in the search-parameter registry.
///
/// A `subject` reference parameter with expression `Encounter.subject` is registered
/// in a registry that a local mock backend exposes through `search_param_registry`.
/// The composite, wired to that mock as both storage and search provider, is then
/// expected to return `["Patient/p1"]` for the stored resource.
#[test]
fn test_extract_references_uses_registry_expression() {
    use crate::search::{SearchParameterDefinition, SearchParameterRegistry};
    use crate::tenant::TenantId;
    use crate::types::SearchParamType;

    let registry = Arc::new(parking_lot::RwLock::new(SearchParameterRegistry::new()));
    registry
        .write()
        .register(
            SearchParameterDefinition::new(
                "http://hl7.org/fhir/SearchParameter/Encounter-subject",
                "subject",
                SearchParamType::Reference,
                "Encounter.subject",
            )
            .with_base(vec!["Encounter"])
            .with_targets(vec!["Patient", "Group"]),
        )
        .unwrap();

    /// Minimal backend whose only meaningful behavior is handing out the shared
    /// registry; the write-style storage methods are not expected to be called.
    struct MockWithRegistry {
        registry: Arc<parking_lot::RwLock<SearchParameterRegistry>>,
    }

    #[async_trait::async_trait]
    impl ResourceStorage for MockWithRegistry {
        fn backend_name(&self) -> &'static str {
            "mock-with-registry"
        }
        async fn create(
            &self,
            _tenant: &TenantContext,
            _resource_type: &str,
            _resource: serde_json::Value,
            _fhir_version: FhirVersion,
        ) -> StorageResult<crate::types::StoredResource> {
            unimplemented!()
        }
        async fn create_or_update(
            &self,
            _tenant: &TenantContext,
            _resource_type: &str,
            _id: &str,
            _resource: serde_json::Value,
            _fhir_version: FhirVersion,
        ) -> StorageResult<(crate::types::StoredResource, bool)> {
            unimplemented!()
        }
        async fn read(
            &self,
            _tenant: &TenantContext,
            _resource_type: &str,
            _id: &str,
        ) -> StorageResult<Option<crate::types::StoredResource>> {
            Ok(None)
        }
        async fn update(
            &self,
            _tenant: &TenantContext,
            _current: &crate::types::StoredResource,
            _resource: serde_json::Value,
        ) -> StorageResult<crate::types::StoredResource> {
            unimplemented!()
        }
        async fn delete(
            &self,
            _tenant: &TenantContext,
            _resource_type: &str,
            _id: &str,
        ) -> StorageResult<()> {
            Ok(())
        }
        async fn count(
            &self,
            _tenant: &TenantContext,
            _resource_type: Option<&str>,
        ) -> StorageResult<u64> {
            Ok(0)
        }
    }

    #[async_trait::async_trait]
    impl SearchProvider for MockWithRegistry {
        async fn search(
            &self,
            _tenant: &TenantContext,
            _query: &crate::types::SearchQuery,
        ) -> StorageResult<SearchResult> {
            use crate::types::Page;
            Ok(SearchResult::new(Page::empty()))
        }
        async fn search_count(
            &self,
            _tenant: &TenantContext,
            _query: &crate::types::SearchQuery,
        ) -> StorageResult<u64> {
            Ok(0)
        }
        fn search_param_registry(&self) -> &Arc<parking_lot::RwLock<SearchParameterRegistry>> {
            &self.registry
        }
    }

    let config = CompositeConfig::builder()
        .primary("primary", BackendKind::Sqlite)
        .build()
        .unwrap();
    // Share the populated registry with the mock (the source had a mis-encoded
    // `&registry` token here).
    let backend = Arc::new(MockWithRegistry {
        registry: Arc::clone(&registry),
    });
    let mut backends = HashMap::new();
    backends.insert("primary".to_string(), backend.clone() as DynStorage);
    let mut providers = HashMap::new();
    providers.insert("primary".to_string(), backend.clone() as DynSearchProvider);
    let composite = CompositeStorage::new(config, backends)
        .unwrap()
        .with_search_providers(providers);

    let content = serde_json::json!({
        "resourceType": "Encounter",
        "id": "e1",
        "subject": {"reference": "Patient/p1"},
    });
    let resource = crate::types::StoredResource::new(
        "Encounter",
        "e1",
        TenantId::new("t"),
        content,
        FhirVersion::default(),
    );

    let refs = composite.extract_references(&resource, "subject");
    assert_eq!(refs, vec!["Patient/p1".to_string()]);
}
3314
/// A JSON `null` contributes nothing to the output list.
#[test]
fn test_extract_reference_values_null_ignored() {
    let input = serde_json::Value::Null;
    let mut collected = Vec::new();

    CompositeStorage::extract_reference_values(&input, &mut collected);

    assert!(collected.is_empty());
}
3322
3323 #[tokio::test]
3326 async fn test_create_delegates_to_primary() {
3327 use crate::core::ResourceStorage;
3328 let composite = make_composite_no_secondary();
3329 let tenant = make_tenant();
3330 let result = composite
3331 .create(
3332 &tenant,
3333 "Patient",
3334 serde_json::json!({"resourceType": "Patient"}),
3335 FhirVersion::default(),
3336 )
3337 .await;
3338 assert!(result.is_ok());
3339 let stored = result.unwrap();
3340 assert_eq!(stored.resource_type(), "Patient");
3341 }
3342
3343 #[tokio::test]
3344 async fn test_read_delegates_to_primary() {
3345 use crate::core::ResourceStorage;
3346 let composite = make_composite_no_secondary();
3347 let tenant = make_tenant();
3348 let result = composite.read(&tenant, "Patient", "1").await;
3349 assert!(result.is_ok());
3350 assert!(result.unwrap().is_none()); }
3352
3353 #[tokio::test]
3354 async fn test_count_delegates_to_primary() {
3355 use crate::core::ResourceStorage;
3356 let composite = make_composite_no_secondary();
3357 let tenant = make_tenant();
3358 let result = composite.count(&tenant, Some("Patient")).await;
3359 assert!(result.is_ok());
3360 assert_eq!(result.unwrap(), 0);
3361 }
3362
3363 #[tokio::test]
3364 async fn test_delete_delegates_to_primary() {
3365 use crate::core::ResourceStorage;
3366 let composite = make_composite_no_secondary();
3367 let tenant = make_tenant();
3368 let result = composite.delete(&tenant, "Patient", "1").await;
3369 assert!(result.is_ok());
3370 }
3371
3372 fn make_composite_with_search_provider() -> CompositeStorage {
3373 let composite = make_composite_no_secondary();
3374 let mut providers = HashMap::new();
3375 providers.insert(
3376 "primary".to_string(),
3377 Arc::new(MockStorage) as DynSearchProvider,
3378 );
3379 composite.with_search_providers(providers)
3380 }
3381
3382 #[tokio::test]
3390 async fn test_resolve_chain_three_segments_does_not_error() {
3391 use crate::core::ChainedSearchProvider;
3392 let composite = make_composite_with_search_provider();
3393 let tenant = make_tenant();
3394 let result = composite
3395 .resolve_chain(
3396 &tenant,
3397 "Observation",
3398 "subject.organization.name",
3399 "Hospital",
3400 )
3401 .await;
3402 assert!(result.is_ok(), "resolve_chain failed: {:?}", result.err());
3403 assert!(result.unwrap().is_empty());
3404 }
3405
3406 #[tokio::test]
3407 async fn test_resolve_chain_short_chain_does_not_error() {
3408 use crate::core::ChainedSearchProvider;
3409 let composite = make_composite_with_search_provider();
3410 let tenant = make_tenant();
3411 let result = composite
3412 .resolve_chain(&tenant, "Observation", "patient.name", "Smith")
3413 .await;
3414 assert!(result.is_ok());
3415 assert!(result.unwrap().is_empty());
3416 }
3417
3418 #[tokio::test]
3419 async fn test_resolve_chain_invalid_chain_returns_empty() {
3420 use crate::core::ChainedSearchProvider;
3421 let composite = make_composite_with_search_provider();
3422 let tenant = make_tenant();
3423 let result = composite
3425 .resolve_chain(&tenant, "Observation", "patient", "x")
3426 .await;
3427 assert!(result.is_ok());
3428 assert!(result.unwrap().is_empty());
3429 }
3430
3431 #[async_trait::async_trait]
3433 impl SearchProvider for MockStorage {
3434 async fn search(
3435 &self,
3436 _tenant: &TenantContext,
3437 _query: &crate::types::SearchQuery,
3438 ) -> StorageResult<SearchResult> {
3439 use crate::types::Page;
3440 Ok(SearchResult::new(Page::empty()))
3441 }
3442
3443 async fn search_count(
3444 &self,
3445 _tenant: &TenantContext,
3446 _query: &crate::types::SearchQuery,
3447 ) -> StorageResult<u64> {
3448 Ok(0)
3449 }
3450
3451 fn search_param_registry(
3452 &self,
3453 ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
3454 use std::sync::OnceLock;
3455 static EMPTY: OnceLock<
3456 std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>>,
3457 > = OnceLock::new();
3458 EMPTY.get_or_init(|| {
3459 std::sync::Arc::new(parking_lot::RwLock::new(
3460 crate::search::SearchParameterRegistry::new(),
3461 ))
3462 })
3463 }
3464 }
3465}