Skip to main content

helios_persistence/composite/
storage.rs

1//! CompositeStorage implementation.
2//!
3//! This module provides the main `CompositeStorage` struct that coordinates
4//! multiple backends for FHIR resource storage and querying.
5//!
6//! # Overview
7//!
8//! `CompositeStorage` implements all storage traits by delegating to appropriate
9//! backends based on operation type:
10//!
11//! - **Writes (CRUD)**: Always go to the primary backend
12//! - **Reads**: Go to primary, with optional secondary enrichment
13//! - **Search**: Routed based on query features to optimal backends
14//!
15//! # Example
16//!
17//! ```ignore
18//! use helios_persistence::composite::{CompositeStorage, CompositeConfig};
19//!
20//! let config = CompositeConfig::builder()
21//!     .primary("sqlite", BackendKind::Sqlite)
22//!     .search_backend("es", BackendKind::Elasticsearch)
23//!     .build()?;
24//!
//! let storage = CompositeStorage::new(config, backends)?;
26//!
27//! // All CRUD goes to primary (sqlite)
//! let patient = storage.create(&tenant, "Patient", patient_json, fhir_version).await?;
29//!
30//! // Search is routed based on features
31//! // - Basic search → sqlite
32//! // - Full-text (_text) → elasticsearch
33//! let results = storage.search(&tenant, &query).await?;
34//! ```
35
36use std::collections::HashMap;
37use std::sync::Arc;
38
39use async_trait::async_trait;
40use helios_fhir::FhirVersion;
41use parking_lot::RwLock;
42use serde_json::Value;
43use tracing::{debug, instrument, warn};
44
45use crate::core::history::HistoryParams;
46use crate::core::{
47    BundleEntry, BundleProvider, BundleResult, CapabilityProvider, ChainedSearchProvider,
48    ConditionalCreateResult, ConditionalDeleteResult, ConditionalPatchResult, ConditionalStorage,
49    ConditionalUpdateResult, ExportDataProvider, ExportRequest, GroupExportProvider,
50    IncludeProvider, InstanceHistoryProvider, NdjsonBatch, PatchFormat, PatientExportProvider,
51    ResourceStorage, RevincludeProvider, SearchProvider, SearchResult, SofRunner,
52    StorageCapabilities, SystemHistoryProvider, TerminologySearchProvider, TextSearchProvider,
53    TypeHistoryProvider, VersionedStorage,
54};
55use crate::error::{BackendError, StorageError, StorageResult, TransactionError};
56use crate::tenant::TenantContext;
57use crate::types::{
58    IncludeDirective, Pagination, ReverseChainedParameter, SearchParamType, SearchParameter,
59    SearchQuery, SearchValue, StoredResource,
60};
61
62use super::config::{CompositeConfig, SyncMode};
63use super::merger::{MergeOptions, ResultMerger};
64use super::router::{QueryRouter, RoutingDecision, RoutingError};
65use super::sync::{SyncEvent, SyncManager};
66
/// A dynamically typed storage backend.
///
/// This is the type held for the primary backend and for every secondary
/// backend of [`CompositeStorage`].
pub type DynStorage = Arc<dyn ResourceStorage + Send + Sync>;

/// A dynamically typed search provider.
///
/// Held behind an `Arc` so a handle can be cloned into spawned search tasks.
pub type DynSearchProvider = Arc<dyn SearchProvider + Send + Sync>;

/// A dynamically typed conditional storage provider.
pub type DynConditionalStorage = Arc<dyn ConditionalStorage + Send + Sync>;

/// A dynamically typed versioned storage provider.
pub type DynVersionedStorage = Arc<dyn VersionedStorage + Send + Sync>;

/// A dynamically typed instance history provider.
pub type DynInstanceHistoryProvider = Arc<dyn InstanceHistoryProvider + Send + Sync>;

/// A dynamically typed system history provider (also covers Type + Instance).
// NOTE(review): the "also covers" claim depends on the supertraits of
// `SystemHistoryProvider`, which are declared outside this file — confirm.
pub type DynSystemHistoryProvider = Arc<dyn SystemHistoryProvider + Send + Sync>;

/// A dynamically typed bundle provider.
pub type DynBundleProvider = Arc<dyn BundleProvider + Send + Sync>;

/// A dynamically typed group export provider (also covers Patient + System).
// NOTE(review): as above, this relies on the supertraits of
// `GroupExportProvider` — confirm against its definition.
pub type DynGroupExportProvider = Arc<dyn GroupExportProvider + Send + Sync>;
90
/// Composite storage that coordinates multiple backends.
///
/// This is the main entry point for polyglot persistence. It implements
/// all storage traits and routes operations to appropriate backends.
///
/// # Full Primary Support
///
/// When the primary backend supports advanced traits (versioning, conditional
/// operations, history, bundles), use [`with_full_primary()`](Self::with_full_primary)
/// to enable delegation of these operations through the composite layer.
/// Until that is called, all of the optional typed handles below are `None`.
pub struct CompositeStorage {
    /// Configuration.
    config: CompositeConfig,

    /// Primary storage backend.
    primary: DynStorage,

    /// Secondary backends by ID (every backend passed to `new` other than the primary).
    secondaries: HashMap<String, DynStorage>,

    /// Search providers by backend ID. Empty until `with_search_providers` is called.
    search_providers: HashMap<String, DynSearchProvider>,

    /// Query router.
    router: QueryRouter,

    /// Result merger.
    merger: ResultMerger,

    /// Synchronization manager. Only created when at least one secondary exists.
    sync_manager: Option<SyncManager>,

    /// Backend health status, keyed by backend ID. Seeded in `new` with an
    /// entry for the primary and for each secondary.
    health_status: Arc<RwLock<HashMap<String, BackendHealth>>>,

    // Typed trait objects for primary's advanced capabilities.
    // These are set via `with_full_primary()` to support delegation.
    /// Primary as ConditionalStorage (if supported).
    conditional_storage: Option<DynConditionalStorage>,

    /// Primary as VersionedStorage (if supported).
    versioned_storage: Option<DynVersionedStorage>,

    /// Primary as InstanceHistoryProvider (if supported).
    history_provider: Option<DynInstanceHistoryProvider>,

    /// Primary as SystemHistoryProvider (if supported) — covers Type + System history.
    system_history_provider: Option<DynSystemHistoryProvider>,

    /// Primary as BundleProvider (if supported).
    bundle_provider: Option<DynBundleProvider>,

    /// Primary as GroupExportProvider (if supported) — covers all export levels.
    export_provider: Option<DynGroupExportProvider>,
}
146
/// Health snapshot tracked for a single backend.
#[derive(Debug, Clone)]
pub struct BackendHealth {
    /// `true` while the backend is considered usable.
    pub healthy: bool,

    /// When the most recent successful operation was recorded, if any.
    pub last_success: Option<std::time::Instant>,

    /// Number of failures recorded since the last success.
    pub failure_count: u32,

    /// Message of the most recent failure, if any.
    pub last_error: Option<String>,
}

impl Default for BackendHealth {
    /// A backend starts out healthy, with no recorded success, failure, or error.
    fn default() -> Self {
        BackendHealth {
            last_error: None,
            failure_count: 0,
            last_success: None,
            healthy: true,
        }
    }
}
173
174impl CompositeStorage {
175    fn has_dedicated_search_backend(&self) -> bool {
176        self.config
177            .backends_with_role(super::config::BackendRole::Search)
178            .next()
179            .is_some()
180    }
181
182    fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
183        params
184            .split('&')
185            .filter_map(|pair| {
186                let parts: Vec<&str> = pair.splitn(2, '=').collect();
187                if parts.len() == 2 {
188                    Some((parts[0].to_string(), parts[1].to_string()))
189                } else {
190                    None
191                }
192            })
193            .collect()
194    }
195
196    fn infer_conditional_param_type(name: &str) -> SearchParamType {
197        match name {
198            "_id" => SearchParamType::Token,
199            "_lastUpdated" => SearchParamType::Date,
200            "_tag" | "_profile" | "_security" | "identifier" => SearchParamType::Token,
201            "patient" | "subject" | "encounter" | "performer" | "author" | "requester"
202            | "recorder" | "asserter" | "practitioner" | "organization" | "location" | "device" => {
203                SearchParamType::Reference
204            }
205            _ => SearchParamType::String,
206        }
207    }
208
209    async fn find_conditional_matches(
210        &self,
211        tenant: &TenantContext,
212        resource_type: &str,
213        search_params: &str,
214    ) -> StorageResult<Vec<StoredResource>> {
215        let parsed_params = Self::parse_simple_search_params(search_params);
216        if parsed_params.is_empty() {
217            return Ok(Vec::new());
218        }
219
220        let mut query = SearchQuery::new(resource_type);
221        query.count = Some(1000);
222        for (name, value) in parsed_params {
223            query = query.with_parameter(SearchParameter {
224                name: name.clone(),
225                param_type: Self::infer_conditional_param_type(&name),
226                modifier: None,
227                values: vec![SearchValue::parse(&value)],
228                chain: vec![],
229                components: vec![],
230            });
231        }
232
233        let result = self.search(tenant, &query).await?;
234        Ok(result.resources.items)
235    }
236
237    /// Creates a new composite storage with the given configuration and backends.
238    ///
239    /// # Arguments
240    ///
241    /// * `config` - The composite storage configuration
242    /// * `backends` - Map of backend ID to storage implementation
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the primary backend is not found in the backends map.
247    pub fn new(
248        config: CompositeConfig,
249        backends: HashMap<String, DynStorage>,
250    ) -> StorageResult<Self> {
251        let primary_id = config.primary_id().ok_or_else(|| {
252            StorageError::Backend(BackendError::Unavailable {
253                backend_name: "primary".to_string(),
254                message: "No primary backend configured".to_string(),
255            })
256        })?;
257
258        let primary = backends.get(primary_id).cloned().ok_or_else(|| {
259            StorageError::Backend(BackendError::Unavailable {
260                backend_name: primary_id.to_string(),
261                message: format!("Primary backend '{}' not found in backends map", primary_id),
262            })
263        })?;
264
265        // Separate out secondaries
266        let secondaries: HashMap<_, _> = backends
267            .iter()
268            .filter(|(id, _)| *id != primary_id)
269            .map(|(id, backend)| (id.clone(), backend.clone()))
270            .collect();
271
272        // Initialize health status
273        let mut health_status = HashMap::new();
274        health_status.insert(primary_id.to_string(), BackendHealth::default());
275        for id in secondaries.keys() {
276            health_status.insert(id.clone(), BackendHealth::default());
277        }
278
279        let router = QueryRouter::new(config.clone());
280        let merger = ResultMerger::new();
281
282        // Create sync manager if we have secondaries
283        let sync_manager = if !secondaries.is_empty() {
284            Some(SyncManager::new(config.sync_config.clone()))
285        } else {
286            None
287        };
288
289        Ok(Self {
290            config,
291            primary,
292            secondaries,
293            search_providers: HashMap::new(),
294            router,
295            merger,
296            sync_manager,
297            health_status: Arc::new(RwLock::new(health_status)),
298            conditional_storage: None,
299            versioned_storage: None,
300            history_provider: None,
301            system_history_provider: None,
302            bundle_provider: None,
303            export_provider: None,
304        })
305    }
306
    /// Sets the search providers, keyed by backend ID, and returns the storage.
    ///
    /// Search providers allow specialized search backends like Elasticsearch.
    /// The given map replaces any providers set earlier; it is not merged.
    pub fn with_search_providers(mut self, providers: HashMap<String, DynSearchProvider>) -> Self {
        self.search_providers = providers;
        self
    }
314
315    /// Starts the background sync worker for secondary backends.
316    ///
317    /// Must be called from within a Tokio runtime. When the sync mode is
318    /// `Asynchronous` or `Hybrid`, this spawns a background task that
319    /// processes sync events without blocking write requests. Without this,
320    /// every write falls back to synchronous secondary indexing which can
321    /// cause timeouts on large transaction bundles.
322    pub fn start_sync_workers(mut self) -> Self {
323        if let Some(ref mut manager) = self.sync_manager {
324            if matches!(
325                self.config.sync_config.mode,
326                SyncMode::Asynchronous | SyncMode::Hybrid { .. }
327            ) {
328                manager.start_async_worker(self.secondaries.clone());
329            }
330        }
331        self
332    }
333
334    /// Registers the primary backend's advanced capabilities for delegation.
335    ///
336    /// When the primary backend implements traits beyond `ResourceStorage`
337    /// (e.g., `ConditionalStorage`, `VersionedStorage`, `InstanceHistoryProvider`,
338    /// `BundleProvider`), this method stores typed references so that
339    /// `CompositeStorage` can delegate these operations to the primary.
340    ///
341    /// # Example
342    ///
343    /// ```ignore
344    /// let sqlite = Arc::new(SqliteBackend::new("fhir.db")?);
345    /// let composite = CompositeStorage::new(config, backends)?
346    ///     .with_full_primary(sqlite.clone());
347    /// ```
348    pub fn with_full_primary<T>(mut self, primary: Arc<T>) -> Self
349    where
350        T: ResourceStorage
351            + ConditionalStorage
352            + VersionedStorage
353            + InstanceHistoryProvider
354            + TypeHistoryProvider
355            + SystemHistoryProvider
356            + BundleProvider
357            + GroupExportProvider
358            + Send
359            + Sync
360            + 'static,
361    {
362        self.conditional_storage = Some(primary.clone() as DynConditionalStorage);
363        self.versioned_storage = Some(primary.clone() as DynVersionedStorage);
364        self.history_provider = Some(primary.clone() as DynInstanceHistoryProvider);
365        self.system_history_provider = Some(primary.clone() as DynSystemHistoryProvider);
366        self.bundle_provider = Some(primary.clone() as DynBundleProvider);
367        self.export_provider = Some(primary as DynGroupExportProvider);
368        self
369    }
370
    /// Returns the configuration this composite storage was built with.
    pub fn config(&self) -> &CompositeConfig {
        &self.config
    }
375
    /// Returns a reference to the shared handle of the primary backend.
    pub fn primary(&self) -> &DynStorage {
        &self.primary
    }
380
    /// Returns the secondary backend registered under `id`, or `None` when no
    /// secondary has that ID. The primary is not stored among the secondaries.
    pub fn secondary(&self, id: &str) -> Option<&DynStorage> {
        self.secondaries.get(id)
    }
385
    /// Returns all secondary backends, keyed by backend ID.
    pub fn secondaries(&self) -> &HashMap<String, DynStorage> {
        &self.secondaries
    }
390
391    /// Returns the health status for a backend.
392    pub fn backend_health(&self, id: &str) -> Option<BackendHealth> {
393        self.health_status.read().get(id).cloned()
394    }
395
396    /// Returns true if a backend is healthy.
397    pub fn is_backend_healthy(&self, id: &str) -> bool {
398        self.health_status
399            .read()
400            .get(id)
401            .map(|h| h.healthy)
402            .unwrap_or(false)
403    }
404
405    /// Updates health status after an operation.
406    fn update_health(&self, backend_id: &str, success: bool, error: Option<String>) {
407        let mut status = self.health_status.write();
408        if let Some(health) = status.get_mut(backend_id) {
409            if success {
410                health.healthy = true;
411                health.last_success = Some(std::time::Instant::now());
412                health.failure_count = 0;
413                health.last_error = None;
414            } else {
415                health.failure_count += 1;
416                health.last_error = error;
417
418                // Mark unhealthy after threshold failures
419                if health.failure_count >= self.config.health_config.failure_threshold {
420                    health.healthy = false;
421                    warn!(
422                        backend_id = backend_id,
423                        failures = health.failure_count,
424                        "Backend marked unhealthy"
425                    );
426                }
427            }
428        }
429    }
430
431    /// Synchronizes a resource change to secondary backends.
432    async fn sync_to_secondaries(&self, event: SyncEvent) -> StorageResult<()> {
433        if let Some(ref sync_manager) = self.sync_manager {
434            sync_manager.sync(&event, &self.secondaries).await?;
435        }
436        Ok(())
437    }
438
439    /// Routes and executes a search query.
440    #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
441    async fn execute_routed_search(
442        &self,
443        tenant: &TenantContext,
444        query: &SearchQuery,
445    ) -> StorageResult<SearchResult> {
446        // Route the query
447        let decision = self
448            .router
449            .route(query)
450            .map_err(|e| self.routing_error_to_storage_error(e))?;
451
452        debug!(
453            primary = %decision.primary_target,
454            auxiliary_count = decision.auxiliary_targets.len(),
455            merge_strategy = ?decision.merge_strategy,
456            "Routing query"
457        );
458
459        // If no auxiliary backends, just execute on primary
460        if decision.auxiliary_targets.is_empty() {
461            return self.execute_primary_search(tenant, query).await;
462        }
463
464        // Execute on all backends in parallel
465        let (primary_result, auxiliary_results) = self
466            .execute_parallel_search(tenant, query, &decision)
467            .await?;
468
469        // Merge results
470        let merge_options = MergeOptions {
471            strategy: decision.merge_strategy,
472            preserve_primary_order: true,
473            deduplicate: true,
474        };
475
476        self.merger
477            .merge(primary_result, auxiliary_results, merge_options)
478    }
479
480    /// Executes search on the primary backend.
481    ///
482    /// When a dedicated Search backend (e.g., Elasticsearch) is configured, all
483    /// searches are routed there. This is necessary because the primary backend
484    /// may have search indexing disabled (`search_offloaded = true`), leaving its
485    /// search index empty.
486    async fn execute_primary_search(
487        &self,
488        tenant: &TenantContext,
489        query: &SearchQuery,
490    ) -> StorageResult<SearchResult> {
491        // Prefer the Search backend when one is configured, since the primary
492        // may have offloaded search indexing to it.
493        if let Some(search_backend) = self
494            .config
495            .backends_with_role(super::config::BackendRole::Search)
496            .next()
497        {
498            if let Some(provider) = self.search_providers.get(&search_backend.id) {
499                let result = provider.search(tenant, query).await;
500                self.update_health(
501                    &search_backend.id,
502                    result.is_ok(),
503                    result.as_ref().err().map(|e| e.to_string()),
504                );
505                return result;
506            }
507        }
508
509        // Fall back to primary
510        let primary_id = self.config.primary_id().unwrap_or("primary");
511
512        if let Some(provider) = self.search_providers.get(primary_id) {
513            let result = provider.search(tenant, query).await;
514            self.update_health(
515                primary_id,
516                result.is_ok(),
517                result.as_ref().err().map(|e| e.to_string()),
518            );
519            result
520        } else {
521            Err(StorageError::Backend(BackendError::UnsupportedCapability {
522                backend_name: primary_id.to_string(),
523                capability: "SearchProvider".to_string(),
524            }))
525        }
526    }
527
528    /// Executes search on primary and auxiliary backends in parallel.
529    async fn execute_parallel_search(
530        &self,
531        tenant: &TenantContext,
532        query: &SearchQuery,
533        decision: &RoutingDecision,
534    ) -> StorageResult<(SearchResult, Vec<(String, SearchResult)>)> {
535        use tokio::task::JoinSet;
536
537        let mut tasks: JoinSet<(String, StorageResult<SearchResult>)> = JoinSet::new();
538
539        // Clone what we need for async tasks
540        let tenant = tenant.clone();
541        let query = query.clone();
542        let primary_id = decision.primary_target.clone();
543
544        // Start primary search
545        if let Some(provider) = self.search_providers.get(&primary_id).cloned() {
546            let t = tenant.clone();
547            let q = query.clone();
548            let id = primary_id.clone();
549            tasks.spawn(async move {
550                let result = provider.search(&t, &q).await;
551                (id, result)
552            });
553        }
554
555        // Start auxiliary searches
556        for (feature, backend_id) in &decision.auxiliary_targets {
557            if let Some(provider) = self.search_providers.get(backend_id).cloned() {
558                // Create a modified query with only the relevant parameters
559                let part_params = decision
560                    .analysis
561                    .feature_params
562                    .get(feature)
563                    .cloned()
564                    .unwrap_or_default();
565
566                let mut aux_query = SearchQuery::new(&query.resource_type);
567                for param in part_params {
568                    aux_query = aux_query.with_parameter(param);
569                }
570                aux_query.count = query.count;
571                aux_query.offset = query.offset;
572                aux_query.cursor = query.cursor.clone();
573
574                let t = tenant.clone();
575                let id = backend_id.clone();
576                tasks.spawn(async move {
577                    let result = provider.search(&t, &aux_query).await;
578                    (id, result)
579                });
580            }
581        }
582
583        // Collect results
584        let mut primary_result: Option<SearchResult> = None;
585        let mut primary_unsupported = false;
586        let mut auxiliary_results = Vec::new();
587
588        while let Some(result) = tasks.join_next().await {
589            match result {
590                Ok((id, search_result)) => {
591                    self.update_health(
592                        &id,
593                        search_result.is_ok(),
594                        search_result.as_ref().err().map(|e| e.to_string()),
595                    );
596
597                    if id == primary_id {
598                        match search_result {
599                            Ok(r) => primary_result = Some(r),
600                            Err(StorageError::Backend(BackendError::UnsupportedCapability {
601                                ..
602                            })) => {
603                                // Primary doesn't support this search feature (e.g. S3 has no
604                                // full-text search). An auxiliary backend (e.g. Elasticsearch)
605                                // will handle it — promote its result to primary below.
606                                primary_unsupported = true;
607                            }
608                            Err(e) => return Err(e),
609                        }
610                    } else if let Ok(res) = search_result {
611                        auxiliary_results.push((id, res));
612                    }
613                    // Ignore other auxiliary failures - graceful degradation
614                }
615                Err(e) => {
616                    warn!(error = %e, "Task join error during parallel search");
617                }
618            }
619        }
620
621        // When primary lacks search capability and auxiliary has results, promote the
622        // first auxiliary result to primary so the merger can return it directly.
623        if primary_unsupported && primary_result.is_none() {
624            if !auxiliary_results.is_empty() {
625                let (_, promoted) = auxiliary_results.remove(0);
626                return Ok((promoted, auxiliary_results));
627            }
628            return Err(StorageError::Backend(BackendError::UnsupportedCapability {
629                backend_name: primary_id,
630                capability: "search".to_string(),
631            }));
632        }
633
634        let primary = primary_result.ok_or_else(|| {
635            StorageError::Backend(BackendError::ConnectionFailed {
636                backend_name: primary_id,
637                message: "Primary search task failed".to_string(),
638            })
639        })?;
640
641        Ok((primary, auxiliary_results))
642    }
643
644    /// Syncs bundle results to secondaries by extracting resource info from responses.
645    async fn sync_bundle_results(&self, tenant: &TenantContext, result: &BundleResult) {
646        for entry_result in &result.entries {
647            // Only sync successful mutating operations that have a resource body
648            if let Some(ref resource_json) = entry_result.resource {
649                let resource_type = resource_json
650                    .get("resourceType")
651                    .and_then(|v| v.as_str())
652                    .unwrap_or_default();
653                let resource_id = resource_json
654                    .get("id")
655                    .and_then(|v| v.as_str())
656                    .unwrap_or_default();
657
658                if resource_type.is_empty() || resource_id.is_empty() {
659                    continue;
660                }
661
662                let fhir_version = resource_json
663                    .get("meta")
664                    .and_then(|m| m.get("profile"))
665                    .map(|_| FhirVersion::default_enabled())
666                    .unwrap_or_else(FhirVersion::default_enabled);
667
668                if let Err(e) = self
669                    .sync_to_secondaries(SyncEvent::Create {
670                        resource_type: resource_type.to_string(),
671                        resource_id: resource_id.to_string(),
672                        content: resource_json.clone(),
673                        tenant_id: tenant.tenant_id().clone(),
674                        fhir_version,
675                    })
676                    .await
677                {
678                    warn!(
679                        error = %e,
680                        resource_type = resource_type,
681                        resource_id = resource_id,
682                        "Failed to sync bundle entry to secondaries"
683                    );
684                }
685            }
686        }
687    }
688
689    /// Converts a routing error to a storage error.
690    fn routing_error_to_storage_error(&self, err: RoutingError) -> StorageError {
691        match err {
692            RoutingError::NoPrimaryBackend => StorageError::Backend(BackendError::Unavailable {
693                backend_name: "primary".to_string(),
694                message: "No primary backend configured".to_string(),
695            }),
696            RoutingError::NoCapableBackend { feature } => {
697                StorageError::Backend(BackendError::UnsupportedCapability {
698                    backend_name: "composite".to_string(),
699                    capability: format!("{:?}", feature),
700                })
701            }
702            RoutingError::BackendUnavailable { backend_id } => {
703                StorageError::Backend(BackendError::ConnectionFailed {
704                    backend_name: backend_id,
705                    message: "Backend unavailable".to_string(),
706                })
707            }
708        }
709    }
710}
711
712#[async_trait]
713impl ResourceStorage for CompositeStorage {
    /// Returns the fixed name `"composite"` for this storage layer, whatever
    /// backends it wraps.
    fn backend_name(&self) -> &'static str {
        "composite"
    }
717
    /// Returns the primary backend's SQL-on-FHIR runner, if it has one.
    fn sof_runner(&self) -> Option<Arc<dyn SofRunner>> {
        // SQL-on-FHIR runs as in-DB SQL against the primary store (where all
        // writes land), so delegate to the primary backend. Composites whose
        // primary is SOF-capable (e.g. sqlite-elasticsearch) thus expose a
        // runner; others inherit `None`.
        self.primary.sof_runner()
    }
725
726    #[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type))]
727    async fn create(
728        &self,
729        tenant: &TenantContext,
730        resource_type: &str,
731        resource: Value,
732        fhir_version: FhirVersion,
733    ) -> StorageResult<StoredResource> {
734        // All writes go to primary
735        let result = self
736            .primary
737            .create(tenant, resource_type, resource.clone(), fhir_version)
738            .await;
739
740        let primary_id = self.config.primary_id().unwrap_or("primary");
741        self.update_health(
742            primary_id,
743            result.is_ok(),
744            result.as_ref().err().map(|e| e.to_string()),
745        );
746
747        let stored = result?;
748
749        // Sync to secondaries
750        if let Err(e) = self
751            .sync_to_secondaries(SyncEvent::Create {
752                resource_type: resource_type.to_string(),
753                resource_id: stored.id().to_string(),
754                content: stored.content().clone(),
755                tenant_id: tenant.tenant_id().clone(),
756                fhir_version,
757            })
758            .await
759        {
760            warn!(error = %e, "Failed to sync create to secondaries");
761            // Don't fail the operation - primary succeeded
762        }
763
764        Ok(stored)
765    }
766
767    #[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type, id = %id))]
768    async fn create_or_update(
769        &self,
770        tenant: &TenantContext,
771        resource_type: &str,
772        id: &str,
773        resource: Value,
774        fhir_version: FhirVersion,
775    ) -> StorageResult<(StoredResource, bool)> {
776        let result = self
777            .primary
778            .create_or_update(tenant, resource_type, id, resource.clone(), fhir_version)
779            .await;
780
781        let primary_id = self.config.primary_id().unwrap_or("primary");
782        self.update_health(
783            primary_id,
784            result.is_ok(),
785            result.as_ref().err().map(|e| e.to_string()),
786        );
787
788        let (stored, created) = result?;
789
790        // Sync to secondaries
791        let event = if created {
792            SyncEvent::Create {
793                resource_type: resource_type.to_string(),
794                resource_id: id.to_string(),
795                content: stored.content().clone(),
796                tenant_id: tenant.tenant_id().clone(),
797                fhir_version,
798            }
799        } else {
800            SyncEvent::Update {
801                resource_type: resource_type.to_string(),
802                resource_id: id.to_string(),
803                content: stored.content().clone(),
804                tenant_id: tenant.tenant_id().clone(),
805                version: stored.version_id().to_string(),
806                fhir_version,
807            }
808        };
809
810        if let Err(e) = self.sync_to_secondaries(event).await {
811            warn!(error = %e, "Failed to sync create_or_update to secondaries");
812        }
813
814        Ok((stored, created))
815    }
816
817    #[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
818    async fn read(
819        &self,
820        tenant: &TenantContext,
821        resource_type: &str,
822        id: &str,
823    ) -> StorageResult<Option<StoredResource>> {
824        // Reads always go to primary (source of truth)
825        let result = self.primary.read(tenant, resource_type, id).await;
826
827        let primary_id = self.config.primary_id().unwrap_or("primary");
828        self.update_health(
829            primary_id,
830            result.is_ok(),
831            result.as_ref().err().map(|e| e.to_string()),
832        );
833
834        result
835    }
836
837    #[instrument(skip(self, tenant, resource), fields(resource_type = %current.resource_type(), id = %current.id()))]
838    async fn update(
839        &self,
840        tenant: &TenantContext,
841        current: &StoredResource,
842        resource: Value,
843    ) -> StorageResult<StoredResource> {
844        let result = self.primary.update(tenant, current, resource.clone()).await;
845
846        let primary_id = self.config.primary_id().unwrap_or("primary");
847        self.update_health(
848            primary_id,
849            result.is_ok(),
850            result.as_ref().err().map(|e| e.to_string()),
851        );
852
853        let stored = result?;
854
855        // Sync to secondaries
856        if let Err(e) = self
857            .sync_to_secondaries(SyncEvent::Update {
858                resource_type: current.resource_type().to_string(),
859                resource_id: current.id().to_string(),
860                content: stored.content().clone(),
861                tenant_id: tenant.tenant_id().clone(),
862                version: stored.version_id().to_string(),
863                fhir_version: stored.fhir_version(),
864            })
865            .await
866        {
867            warn!(error = %e, "Failed to sync update to secondaries");
868        }
869
870        Ok(stored)
871    }
872
873    #[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
874    async fn delete(
875        &self,
876        tenant: &TenantContext,
877        resource_type: &str,
878        id: &str,
879    ) -> StorageResult<()> {
880        let result = self.primary.delete(tenant, resource_type, id).await;
881
882        let primary_id = self.config.primary_id().unwrap_or("primary");
883        self.update_health(
884            primary_id,
885            result.is_ok(),
886            result.as_ref().err().map(|e| e.to_string()),
887        );
888
889        result?;
890
891        // Sync to secondaries
892        if let Err(e) = self
893            .sync_to_secondaries(SyncEvent::Delete {
894                resource_type: resource_type.to_string(),
895                resource_id: id.to_string(),
896                tenant_id: tenant.tenant_id().clone(),
897            })
898            .await
899        {
900            warn!(error = %e, "Failed to sync delete to secondaries");
901        }
902
903        Ok(())
904    }
905
906    async fn count(
907        &self,
908        tenant: &TenantContext,
909        resource_type: Option<&str>,
910    ) -> StorageResult<u64> {
911        self.primary.count(tenant, resource_type).await
912    }
913}
914
915#[async_trait]
916impl SearchProvider for CompositeStorage {
917    #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
918    async fn search(
919        &self,
920        tenant: &TenantContext,
921        query: &SearchQuery,
922    ) -> StorageResult<SearchResult> {
923        self.execute_routed_search(tenant, query).await
924    }
925
926    async fn search_count(
927        &self,
928        tenant: &TenantContext,
929        query: &SearchQuery,
930    ) -> StorageResult<u64> {
931        // Prefer dedicated Search backend when configured, matching `search` routing.
932        if let Some(search_backend) = self
933            .config
934            .backends_with_role(super::config::BackendRole::Search)
935            .next()
936        {
937            if let Some(provider) = self.search_providers.get(&search_backend.id) {
938                return provider.search_count(tenant, query).await;
939            }
940        }
941
942        // Fall back to primary provider.
943        if let Some(provider) = self
944            .search_providers
945            .get(self.config.primary_id().unwrap_or("primary"))
946        {
947            provider.search_count(tenant, query).await
948        } else {
949            Err(StorageError::Backend(BackendError::UnsupportedCapability {
950                backend_name: "composite".to_string(),
951                capability: "search_count".to_string(),
952            }))
953        }
954    }
955
956    fn search_param_registry(
957        &self,
958    ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
959        // Same routing as `search`: prefer the dedicated Search backend's
960        // registry, fall back to primary, otherwise an empty registry. The
961        // returned reference outlives `&self` because both providers are
962        // owned by `self.search_providers` for the lifetime of the composite.
963        if let Some(search_backend) = self
964            .config
965            .backends_with_role(super::config::BackendRole::Search)
966            .next()
967        {
968            if let Some(provider) = self.search_providers.get(&search_backend.id) {
969                return provider.search_param_registry();
970            }
971        }
972
973        if let Some(provider) = self
974            .search_providers
975            .get(self.config.primary_id().unwrap_or("primary"))
976        {
977            return provider.search_param_registry();
978        }
979
980        use std::sync::OnceLock;
981        static EMPTY: OnceLock<
982            std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>>,
983        > = OnceLock::new();
984        EMPTY.get_or_init(|| {
985            std::sync::Arc::new(parking_lot::RwLock::new(
986                crate::search::SearchParameterRegistry::new(),
987            ))
988        })
989    }
990
991    fn supports_contained_search(&self) -> bool {
992        // Same routing as `search`: defer to the dedicated Search backend when
993        // configured, otherwise the primary provider.
994        if let Some(search_backend) = self
995            .config
996            .backends_with_role(super::config::BackendRole::Search)
997            .next()
998        {
999            if let Some(provider) = self.search_providers.get(&search_backend.id) {
1000                return provider.supports_contained_search();
1001            }
1002        }
1003        self.search_providers
1004            .get(self.config.primary_id().unwrap_or("primary"))
1005            .map(|p| p.supports_contained_search())
1006            .unwrap_or(false)
1007    }
1008
1009    fn modifiers_for_param_type(
1010        &self,
1011        param_type: crate::types::SearchParamType,
1012    ) -> Vec<&'static str> {
1013        // Same routing as `search`: defer to the dedicated Search backend when
1014        // configured, otherwise the primary provider.
1015        if let Some(search_backend) = self
1016            .config
1017            .backends_with_role(super::config::BackendRole::Search)
1018            .next()
1019        {
1020            if let Some(provider) = self.search_providers.get(&search_backend.id) {
1021                return provider.modifiers_for_param_type(param_type);
1022            }
1023        }
1024        self.search_providers
1025            .get(self.config.primary_id().unwrap_or("primary"))
1026            .map(|p| p.modifiers_for_param_type(param_type))
1027            .unwrap_or_default()
1028    }
1029}
1030
1031#[async_trait]
1032impl ConditionalStorage for CompositeStorage {
1033    async fn conditional_create(
1034        &self,
1035        tenant: &TenantContext,
1036        resource_type: &str,
1037        resource: Value,
1038        search_params: &str,
1039        fhir_version: FhirVersion,
1040    ) -> StorageResult<ConditionalCreateResult> {
1041        if self.has_dedicated_search_backend() {
1042            let matches = self
1043                .find_conditional_matches(tenant, resource_type, search_params)
1044                .await?;
1045
1046            return match matches.len() {
1047                0 => {
1048                    let created = self
1049                        .primary
1050                        .create(tenant, resource_type, resource, fhir_version)
1051                        .await?;
1052
1053                    if let Err(e) = self
1054                        .sync_to_secondaries(SyncEvent::Create {
1055                            resource_type: resource_type.to_string(),
1056                            resource_id: created.id().to_string(),
1057                            content: created.content().clone(),
1058                            tenant_id: tenant.tenant_id().clone(),
1059                            fhir_version,
1060                        })
1061                        .await
1062                    {
1063                        warn!(error = %e, "Failed to sync conditional_create to secondaries");
1064                    }
1065
1066                    Ok(ConditionalCreateResult::Created(created))
1067                }
1068                1 => Ok(ConditionalCreateResult::Exists(
1069                    matches.into_iter().next().expect("single match must exist"),
1070                )),
1071                n => Ok(ConditionalCreateResult::MultipleMatches(n)),
1072            };
1073        }
1074
1075        let storage = self.conditional_storage.as_ref().ok_or_else(|| {
1076            StorageError::Backend(BackendError::UnsupportedCapability {
1077                backend_name: "composite".to_string(),
1078                capability: "ConditionalStorage".to_string(),
1079            })
1080        })?;
1081
1082        let result = storage
1083            .conditional_create(tenant, resource_type, resource, search_params, fhir_version)
1084            .await?;
1085
1086        // Sync created resource to secondaries
1087        if let ConditionalCreateResult::Created(ref stored) = result {
1088            if let Err(e) = self
1089                .sync_to_secondaries(SyncEvent::Create {
1090                    resource_type: resource_type.to_string(),
1091                    resource_id: stored.id().to_string(),
1092                    content: stored.content().clone(),
1093                    tenant_id: tenant.tenant_id().clone(),
1094                    fhir_version,
1095                })
1096                .await
1097            {
1098                warn!(error = %e, "Failed to sync conditional_create to secondaries");
1099            }
1100        }
1101
1102        Ok(result)
1103    }
1104
1105    async fn conditional_update(
1106        &self,
1107        tenant: &TenantContext,
1108        resource_type: &str,
1109        resource: Value,
1110        search_params: &str,
1111        upsert: bool,
1112        fhir_version: FhirVersion,
1113    ) -> StorageResult<ConditionalUpdateResult> {
1114        if self.has_dedicated_search_backend() {
1115            let matches = self
1116                .find_conditional_matches(tenant, resource_type, search_params)
1117                .await?;
1118
1119            return match matches.len() {
1120                0 => {
1121                    if upsert {
1122                        let created = self
1123                            .primary
1124                            .create(tenant, resource_type, resource, fhir_version)
1125                            .await?;
1126
1127                        if let Err(e) = self
1128                            .sync_to_secondaries(SyncEvent::Create {
1129                                resource_type: resource_type.to_string(),
1130                                resource_id: created.id().to_string(),
1131                                content: created.content().clone(),
1132                                tenant_id: tenant.tenant_id().clone(),
1133                                fhir_version,
1134                            })
1135                            .await
1136                        {
1137                            warn!(
1138                                error = %e,
1139                                "Failed to sync conditional_update create to secondaries"
1140                            );
1141                        }
1142
1143                        Ok(ConditionalUpdateResult::Created(created))
1144                    } else {
1145                        Ok(ConditionalUpdateResult::NoMatch)
1146                    }
1147                }
1148                1 => {
1149                    let current = matches.into_iter().next().expect("single match must exist");
1150                    let updated = self.primary.update(tenant, &current, resource).await?;
1151
1152                    if let Err(e) = self
1153                        .sync_to_secondaries(SyncEvent::Update {
1154                            resource_type: resource_type.to_string(),
1155                            resource_id: updated.id().to_string(),
1156                            content: updated.content().clone(),
1157                            tenant_id: tenant.tenant_id().clone(),
1158                            version: updated.version_id().to_string(),
1159                            fhir_version: updated.fhir_version(),
1160                        })
1161                        .await
1162                    {
1163                        warn!(error = %e, "Failed to sync conditional_update to secondaries");
1164                    }
1165
1166                    Ok(ConditionalUpdateResult::Updated(updated))
1167                }
1168                n => Ok(ConditionalUpdateResult::MultipleMatches(n)),
1169            };
1170        }
1171
1172        let storage = self.conditional_storage.as_ref().ok_or_else(|| {
1173            StorageError::Backend(BackendError::UnsupportedCapability {
1174                backend_name: "composite".to_string(),
1175                capability: "ConditionalStorage".to_string(),
1176            })
1177        })?;
1178
1179        let result = storage
1180            .conditional_update(
1181                tenant,
1182                resource_type,
1183                resource,
1184                search_params,
1185                upsert,
1186                fhir_version,
1187            )
1188            .await?;
1189
1190        // Sync to secondaries
1191        match &result {
1192            ConditionalUpdateResult::Created(stored) => {
1193                if let Err(e) = self
1194                    .sync_to_secondaries(SyncEvent::Create {
1195                        resource_type: resource_type.to_string(),
1196                        resource_id: stored.id().to_string(),
1197                        content: stored.content().clone(),
1198                        tenant_id: tenant.tenant_id().clone(),
1199                        fhir_version,
1200                    })
1201                    .await
1202                {
1203                    warn!(error = %e, "Failed to sync conditional_update create to secondaries");
1204                }
1205            }
1206            ConditionalUpdateResult::Updated(stored) => {
1207                if let Err(e) = self
1208                    .sync_to_secondaries(SyncEvent::Update {
1209                        resource_type: resource_type.to_string(),
1210                        resource_id: stored.id().to_string(),
1211                        content: stored.content().clone(),
1212                        tenant_id: tenant.tenant_id().clone(),
1213                        version: stored.version_id().to_string(),
1214                        fhir_version: stored.fhir_version(),
1215                    })
1216                    .await
1217                {
1218                    warn!(error = %e, "Failed to sync conditional_update to secondaries");
1219                }
1220            }
1221            _ => {}
1222        }
1223
1224        Ok(result)
1225    }
1226
1227    async fn conditional_delete(
1228        &self,
1229        tenant: &TenantContext,
1230        resource_type: &str,
1231        search_params: &str,
1232    ) -> StorageResult<ConditionalDeleteResult> {
1233        if self.has_dedicated_search_backend() {
1234            let matches = self
1235                .find_conditional_matches(tenant, resource_type, search_params)
1236                .await?;
1237
1238            return match matches.len() {
1239                0 => Ok(ConditionalDeleteResult::NoMatch),
1240                1 => {
1241                    let current = matches.into_iter().next().expect("single match must exist");
1242                    self.primary
1243                        .delete(tenant, resource_type, current.id())
1244                        .await?;
1245
1246                    if let Err(e) = self
1247                        .sync_to_secondaries(SyncEvent::Delete {
1248                            resource_type: resource_type.to_string(),
1249                            resource_id: current.id().to_string(),
1250                            tenant_id: tenant.tenant_id().clone(),
1251                        })
1252                        .await
1253                    {
1254                        warn!(error = %e, "Failed to sync conditional_delete to secondaries");
1255                    }
1256
1257                    Ok(ConditionalDeleteResult::Deleted)
1258                }
1259                n => Ok(ConditionalDeleteResult::MultipleMatches(n)),
1260            };
1261        }
1262
1263        let storage = self.conditional_storage.as_ref().ok_or_else(|| {
1264            StorageError::Backend(BackendError::UnsupportedCapability {
1265                backend_name: "composite".to_string(),
1266                capability: "ConditionalStorage".to_string(),
1267            })
1268        })?;
1269
1270        let result = storage
1271            .conditional_delete(tenant, resource_type, search_params)
1272            .await?;
1273
1274        // Note: We don't have the resource ID for sync here — the primary already
1275        // performed the delete. The sync_manager will handle it if configured.
1276
1277        Ok(result)
1278    }
1279
1280    async fn conditional_patch(
1281        &self,
1282        tenant: &TenantContext,
1283        resource_type: &str,
1284        search_params: &str,
1285        patch: &PatchFormat,
1286    ) -> StorageResult<ConditionalPatchResult> {
1287        let storage = self.conditional_storage.as_ref().ok_or_else(|| {
1288            StorageError::Backend(BackendError::UnsupportedCapability {
1289                backend_name: "composite".to_string(),
1290                capability: "ConditionalStorage".to_string(),
1291            })
1292        })?;
1293
1294        let result = storage
1295            .conditional_patch(tenant, resource_type, search_params, patch)
1296            .await?;
1297
1298        // Sync patched resource to secondaries
1299        if let ConditionalPatchResult::Patched(ref stored) = result {
1300            if let Err(e) = self
1301                .sync_to_secondaries(SyncEvent::Update {
1302                    resource_type: resource_type.to_string(),
1303                    resource_id: stored.id().to_string(),
1304                    content: stored.content().clone(),
1305                    tenant_id: tenant.tenant_id().clone(),
1306                    version: stored.version_id().to_string(),
1307                    fhir_version: stored.fhir_version(),
1308                })
1309                .await
1310            {
1311                warn!(error = %e, "Failed to sync conditional_patch to secondaries");
1312            }
1313        }
1314
1315        Ok(result)
1316    }
1317}
1318
1319#[async_trait]
1320impl VersionedStorage for CompositeStorage {
1321    async fn vread(
1322        &self,
1323        tenant: &TenantContext,
1324        resource_type: &str,
1325        id: &str,
1326        version_id: &str,
1327    ) -> StorageResult<Option<StoredResource>> {
1328        let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1329            StorageError::Backend(BackendError::UnsupportedCapability {
1330                backend_name: "composite".to_string(),
1331                capability: "VersionedStorage".to_string(),
1332            })
1333        })?;
1334
1335        storage.vread(tenant, resource_type, id, version_id).await
1336    }
1337
1338    async fn update_with_match(
1339        &self,
1340        tenant: &TenantContext,
1341        resource_type: &str,
1342        id: &str,
1343        expected_version: &str,
1344        resource: Value,
1345    ) -> StorageResult<StoredResource> {
1346        let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1347            StorageError::Backend(BackendError::UnsupportedCapability {
1348                backend_name: "composite".to_string(),
1349                capability: "VersionedStorage".to_string(),
1350            })
1351        })?;
1352
1353        let stored = storage
1354            .update_with_match(tenant, resource_type, id, expected_version, resource)
1355            .await?;
1356
1357        // Sync to secondaries
1358        if let Err(e) = self
1359            .sync_to_secondaries(SyncEvent::Update {
1360                resource_type: resource_type.to_string(),
1361                resource_id: id.to_string(),
1362                content: stored.content().clone(),
1363                tenant_id: tenant.tenant_id().clone(),
1364                version: stored.version_id().to_string(),
1365                fhir_version: stored.fhir_version(),
1366            })
1367            .await
1368        {
1369            warn!(error = %e, "Failed to sync update_with_match to secondaries");
1370        }
1371
1372        Ok(stored)
1373    }
1374
1375    async fn delete_with_match(
1376        &self,
1377        tenant: &TenantContext,
1378        resource_type: &str,
1379        id: &str,
1380        expected_version: &str,
1381    ) -> StorageResult<()> {
1382        let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1383            StorageError::Backend(BackendError::UnsupportedCapability {
1384                backend_name: "composite".to_string(),
1385                capability: "VersionedStorage".to_string(),
1386            })
1387        })?;
1388
1389        storage
1390            .delete_with_match(tenant, resource_type, id, expected_version)
1391            .await?;
1392
1393        // Sync to secondaries
1394        if let Err(e) = self
1395            .sync_to_secondaries(SyncEvent::Delete {
1396                resource_type: resource_type.to_string(),
1397                resource_id: id.to_string(),
1398                tenant_id: tenant.tenant_id().clone(),
1399            })
1400            .await
1401        {
1402            warn!(error = %e, "Failed to sync delete_with_match to secondaries");
1403        }
1404
1405        Ok(())
1406    }
1407
1408    async fn list_versions(
1409        &self,
1410        tenant: &TenantContext,
1411        resource_type: &str,
1412        id: &str,
1413    ) -> StorageResult<Vec<String>> {
1414        let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1415            StorageError::Backend(BackendError::UnsupportedCapability {
1416                backend_name: "composite".to_string(),
1417                capability: "VersionedStorage".to_string(),
1418            })
1419        })?;
1420
1421        storage.list_versions(tenant, resource_type, id).await
1422    }
1423}
1424
1425#[async_trait]
1426impl InstanceHistoryProvider for CompositeStorage {
1427    async fn history_instance(
1428        &self,
1429        tenant: &TenantContext,
1430        resource_type: &str,
1431        id: &str,
1432        params: &HistoryParams,
1433    ) -> StorageResult<crate::core::HistoryPage> {
1434        let provider = self.history_provider.as_ref().ok_or_else(|| {
1435            StorageError::Backend(BackendError::UnsupportedCapability {
1436                backend_name: "composite".to_string(),
1437                capability: "InstanceHistoryProvider".to_string(),
1438            })
1439        })?;
1440
1441        provider
1442            .history_instance(tenant, resource_type, id, params)
1443            .await
1444    }
1445
1446    async fn history_instance_count(
1447        &self,
1448        tenant: &TenantContext,
1449        resource_type: &str,
1450        id: &str,
1451    ) -> StorageResult<u64> {
1452        let provider = self.history_provider.as_ref().ok_or_else(|| {
1453            StorageError::Backend(BackendError::UnsupportedCapability {
1454                backend_name: "composite".to_string(),
1455                capability: "InstanceHistoryProvider".to_string(),
1456            })
1457        })?;
1458
1459        provider
1460            .history_instance_count(tenant, resource_type, id)
1461            .await
1462    }
1463}
1464
1465#[async_trait]
1466impl TypeHistoryProvider for CompositeStorage {
1467    async fn history_type(
1468        &self,
1469        tenant: &TenantContext,
1470        resource_type: &str,
1471        params: &HistoryParams,
1472    ) -> StorageResult<crate::core::HistoryPage> {
1473        let provider = self.system_history_provider.as_ref().ok_or_else(|| {
1474            StorageError::Backend(BackendError::UnsupportedCapability {
1475                backend_name: "composite".to_string(),
1476                capability: "TypeHistoryProvider".to_string(),
1477            })
1478        })?;
1479
1480        provider.history_type(tenant, resource_type, params).await
1481    }
1482
1483    async fn history_type_count(
1484        &self,
1485        tenant: &TenantContext,
1486        resource_type: &str,
1487    ) -> StorageResult<u64> {
1488        let provider = self.system_history_provider.as_ref().ok_or_else(|| {
1489            StorageError::Backend(BackendError::UnsupportedCapability {
1490                backend_name: "composite".to_string(),
1491                capability: "TypeHistoryProvider".to_string(),
1492            })
1493        })?;
1494
1495        provider.history_type_count(tenant, resource_type).await
1496    }
1497}
1498
1499#[async_trait]
1500impl SystemHistoryProvider for CompositeStorage {
1501    async fn history_system(
1502        &self,
1503        tenant: &TenantContext,
1504        params: &HistoryParams,
1505    ) -> StorageResult<crate::core::HistoryPage> {
1506        let provider = self.system_history_provider.as_ref().ok_or_else(|| {
1507            StorageError::Backend(BackendError::UnsupportedCapability {
1508                backend_name: "composite".to_string(),
1509                capability: "SystemHistoryProvider".to_string(),
1510            })
1511        })?;
1512
1513        provider.history_system(tenant, params).await
1514    }
1515
1516    async fn history_system_count(&self, tenant: &TenantContext) -> StorageResult<u64> {
1517        let provider = self.system_history_provider.as_ref().ok_or_else(|| {
1518            StorageError::Backend(BackendError::UnsupportedCapability {
1519                backend_name: "composite".to_string(),
1520                capability: "SystemHistoryProvider".to_string(),
1521            })
1522        })?;
1523
1524        provider.history_system_count(tenant).await
1525    }
1526}
1527
1528#[async_trait]
1529impl BundleProvider for CompositeStorage {
1530    async fn process_transaction(
1531        &self,
1532        tenant: &TenantContext,
1533        entries: Vec<BundleEntry>,
1534    ) -> Result<BundleResult, TransactionError> {
1535        let provider =
1536            self.bundle_provider
1537                .as_ref()
1538                .ok_or_else(|| TransactionError::BundleError {
1539                    index: 0,
1540                    message: "BundleProvider not available on composite primary".to_string(),
1541                })?;
1542
1543        let result = provider.process_transaction(tenant, entries).await?;
1544
1545        // Sync successful entries to secondaries by reading resources from primary
1546        self.sync_bundle_results(tenant, &result).await;
1547
1548        Ok(result)
1549    }
1550
1551    async fn process_batch(
1552        &self,
1553        tenant: &TenantContext,
1554        entries: Vec<BundleEntry>,
1555    ) -> StorageResult<BundleResult> {
1556        let provider = self.bundle_provider.as_ref().ok_or_else(|| {
1557            StorageError::Backend(BackendError::UnsupportedCapability {
1558                backend_name: "composite".to_string(),
1559                capability: "BundleProvider".to_string(),
1560            })
1561        })?;
1562
1563        let result = provider.process_batch(tenant, entries).await?;
1564
1565        // Sync successful entries to secondaries
1566        self.sync_bundle_results(tenant, &result).await;
1567
1568        Ok(result)
1569    }
1570}
1571
1572#[async_trait]
1573impl IncludeProvider for CompositeStorage {
1574    async fn resolve_includes(
1575        &self,
1576        tenant: &TenantContext,
1577        resources: &[StoredResource],
1578        includes: &[IncludeDirective],
1579    ) -> StorageResult<Vec<StoredResource>> {
1580        // Include resolution always uses primary (has all resources)
1581        let primary_id = self.config.primary_id().unwrap_or("primary");
1582
1583        if let Some(_provider) = self.search_providers.get(primary_id) {
1584            // Try to downcast to IncludeProvider
1585            // This is a limitation - we need trait objects
1586            // For now, fall back to a basic implementation
1587            self.resolve_includes_basic(tenant, resources, includes)
1588                .await
1589        } else {
1590            self.resolve_includes_basic(tenant, resources, includes)
1591                .await
1592        }
1593    }
1594}
1595
1596impl CompositeStorage {
1597    /// Basic include resolution by reading referenced resources.
1598    async fn resolve_includes_basic(
1599        &self,
1600        tenant: &TenantContext,
1601        resources: &[StoredResource],
1602        includes: &[IncludeDirective],
1603    ) -> StorageResult<Vec<StoredResource>> {
1604        use std::collections::HashSet;
1605
1606        let mut included = Vec::new();
1607        let mut seen_ids = HashSet::new();
1608
1609        for resource in resources {
1610            for include in includes {
1611                // Extract references from resource based on search param
1612                let refs = self.extract_references(resource, &include.search_param);
1613
1614                for reference in refs {
1615                    // Parse reference: "ResourceType/id"
1616                    if let Some((ref_type, ref_id)) = reference.split_once('/') {
1617                        // Check target type filter
1618                        if let Some(ref target) = include.target_type {
1619                            if target != ref_type {
1620                                continue;
1621                            }
1622                        }
1623
1624                        let key = format!("{}/{}", ref_type, ref_id);
1625                        if seen_ids.insert(key) {
1626                            if let Ok(Some(included_resource)) =
1627                                self.primary.read(tenant, ref_type, ref_id).await
1628                            {
1629                                included.push(included_resource);
1630                            }
1631                        }
1632                    }
1633                }
1634            }
1635        }
1636
1637        Ok(included)
1638    }
1639
1640    /// Extracts references from a resource for a given search parameter.
1641    ///
1642    /// Resolution order:
1643    /// 1. Look up the search parameter in the registry and evaluate its
1644    ///    FHIRPath `expression` via `SearchParameterExtractor` — the canonical
1645    ///    FHIR source of truth for every standard parameter and any custom
1646    ///    parameter the user has registered.
1647    /// 2. Fall back to the prior heuristic (look for the search-param name as
1648    ///    a JSON field, plus a small alias map for `patient`/`subject`/etc.)
1649    ///    only when the registry doesn't know about the parameter at all.
1650    ///    That keeps unregistered custom parameters working as they did
1651    ///    before this change.
1652    fn extract_references(&self, resource: &StoredResource, search_param: &str) -> Vec<String> {
1653        let content = resource.content();
1654        let resource_type = resource.resource_type();
1655
1656        let registered = {
1657            let registry = self.search_param_registry().read();
1658            registry
1659                .get_param(resource_type, search_param)
1660                .or_else(|| registry.get_param("Resource", search_param))
1661        };
1662
1663        if let Some(param_def) = registered {
1664            let extractor = crate::search::SearchParameterExtractor::new(Arc::clone(
1665                self.search_param_registry(),
1666            ));
1667            if let Ok(values) = extractor.extract_for_param(content, &param_def) {
1668                // Trust the registry: if the param is registered, return what
1669                // the FHIRPath expression yields (even if empty) rather than
1670                // also running the heuristic — otherwise an _include against
1671                // a resource that genuinely has no matching reference would
1672                // accidentally match an unrelated JSON field with the same
1673                // name.
1674                return values
1675                    .into_iter()
1676                    .filter_map(|v| match v.value {
1677                        crate::search::IndexValue::Reference { reference, .. } => Some(reference),
1678                        _ => None,
1679                    })
1680                    .collect();
1681            }
1682        }
1683
1684        // Heuristic fallback for unregistered custom parameters.
1685        let mut refs = Vec::new();
1686        if let Some(value) = content.get(search_param) {
1687            Self::extract_reference_values(value, &mut refs);
1688        }
1689        let alias = match search_param {
1690            "patient" | "subject" => Some("subject"),
1691            "encounter" => Some("encounter"),
1692            "performer" => Some("performer"),
1693            _ => None,
1694        };
1695        if let Some(field) = alias {
1696            if let Some(value) = content.get(field) {
1697                Self::extract_reference_values(value, &mut refs);
1698            }
1699        }
1700        refs
1701    }
1702
1703    /// Recursively extracts reference values.
1704    fn extract_reference_values(value: &Value, refs: &mut Vec<String>) {
1705        match value {
1706            Value::Object(obj) => {
1707                if let Some(Value::String(reference)) = obj.get("reference") {
1708                    refs.push(reference.clone());
1709                }
1710            }
1711            Value::Array(arr) => {
1712                for item in arr {
1713                    Self::extract_reference_values(item, refs);
1714                }
1715            }
1716            _ => {}
1717        }
1718    }
1719}
1720
1721#[async_trait]
1722impl RevincludeProvider for CompositeStorage {
1723    async fn resolve_revincludes(
1724        &self,
1725        tenant: &TenantContext,
1726        resources: &[StoredResource],
1727        revincludes: &[IncludeDirective],
1728    ) -> StorageResult<Vec<StoredResource>> {
1729        // Revinclude resolution - find resources that reference the primary results
1730        // This typically requires search capability
1731        let mut revincluded = Vec::new();
1732
1733        for revinclude in revincludes {
1734            for resource in resources {
1735                let reference = format!("{}/{}", resource.resource_type(), resource.id());
1736
1737                // Search for resources that reference this one
1738                let query = SearchQuery::new(&revinclude.source_type).with_parameter(
1739                    crate::types::SearchParameter {
1740                        name: revinclude.search_param.clone(),
1741                        param_type: crate::types::SearchParamType::Reference,
1742                        modifier: None,
1743                        values: vec![crate::types::SearchValue::eq(&reference)],
1744                        chain: vec![],
1745                        components: vec![],
1746                    },
1747                );
1748
1749                if let Ok(result) = self.search(tenant, &query).await {
1750                    for item in result.resources.items {
1751                        revincluded.push(item);
1752                    }
1753                }
1754            }
1755        }
1756
1757        // Deduplicate
1758        let mut seen = std::collections::HashSet::new();
1759        revincluded.retain(|r| seen.insert(format!("{}/{}", r.resource_type(), r.id())));
1760
1761        Ok(revincluded)
1762    }
1763}
1764
1765#[async_trait]
1766impl ChainedSearchProvider for CompositeStorage {
1767    async fn resolve_chain(
1768        &self,
1769        tenant: &TenantContext,
1770        base_type: &str,
1771        chain: &str,
1772        value: &str,
1773    ) -> StorageResult<Vec<String>> {
1774        self.resolve_chain_via_search(tenant, base_type, chain, value)
1775            .await
1776    }
1777
1778    async fn resolve_reverse_chain(
1779        &self,
1780        tenant: &TenantContext,
1781        base_type: &str,
1782        reverse_chain: &ReverseChainedParameter,
1783    ) -> StorageResult<Vec<String>> {
1784        // Find resources of source_type that match the parameter,
1785        // then return IDs of base_type resources they reference
1786        let values = match &reverse_chain.value {
1787            Some(v) => vec![v.clone()],
1788            None => vec![],
1789        };
1790        let query = SearchQuery::new(&reverse_chain.source_type).with_parameter(
1791            crate::types::SearchParameter {
1792                name: reverse_chain.search_param.clone(),
1793                param_type: crate::types::SearchParamType::Token,
1794                modifier: None,
1795                values,
1796                chain: vec![],
1797                components: vec![],
1798            },
1799        );
1800
1801        let result = self.search(tenant, &query).await?;
1802
1803        // Extract references to base_type
1804        let mut ids = Vec::new();
1805        for resource in result.resources.items {
1806            let refs = self.extract_references(&resource, &reverse_chain.reference_param);
1807            for reference in refs {
1808                if let Some((ref_type, ref_id)) = reference.split_once('/') {
1809                    if ref_type == base_type {
1810                        ids.push(ref_id.to_string());
1811                    }
1812                }
1813            }
1814        }
1815
1816        Ok(ids)
1817    }
1818}
1819
1820impl CompositeStorage {
1821    /// Resolves a forward chain by issuing iterative SearchQueries against
1822    /// composite's regular search routing.
1823    ///
1824    /// For `Observation?subject.organization.name=Hospital`:
1825    ///   1. Parse the chain via the registry: links = [(subject -> Patient),
1826    ///      (organization -> Organization)], terminal = name.
1827    ///   2. Search Organization for `name=Hospital` — collect refs.
1828    ///   3. Search Patient for `organization=<refs>` — collect refs.
1829    ///   4. Search Observation for `subject=<refs>` — return resource IDs.
1830    ///
1831    /// Each step issues exactly one SearchQuery, so the cost is proportional
1832    /// to the chain depth, not the result-set fan-out (the inner backend
1833    /// applies the multi-value `OR` semantics natively).
1834    async fn resolve_chain_via_search(
1835        &self,
1836        tenant: &TenantContext,
1837        base_type: &str,
1838        chain: &str,
1839        value: &str,
1840    ) -> StorageResult<Vec<String>> {
1841        use crate::types::{SearchParameter, SearchQuery, SearchValue};
1842
1843        let parts: Vec<&str> = chain.split('.').collect();
1844        if parts.len() < 2 {
1845            return Ok(Vec::new());
1846        }
1847
1848        // Resolve target type per chain link from the registry. Mirrors
1849        // chain_builder::resolve_target_type so composite agrees with SQLite
1850        // and Postgres on ambiguous reference disambiguation.
1851        let target_types: Vec<String> = {
1852            let registry = self.search_param_registry().read();
1853            let mut types = Vec::with_capacity(parts.len() - 1);
1854            let mut current = base_type.to_string();
1855            for ref_param in parts.iter().take(parts.len() - 1) {
1856                let next = registry
1857                    .get_param(&current, ref_param)
1858                    .and_then(|def| {
1859                        def.target.as_ref().and_then(|t| {
1860                            if t.len() == 1 {
1861                                Some(t[0].clone())
1862                            } else {
1863                                None
1864                            }
1865                        })
1866                    })
1867                    .unwrap_or_else(|| crate::search::chain_resolver::infer_target_type(ref_param));
1868                types.push(next.clone());
1869                current = next;
1870            }
1871            types
1872        };
1873
1874        // Innermost: search the deepest target type for the terminal param.
1875        let terminal_param = parts[parts.len() - 1];
1876        let deepest_type = target_types.last().map(String::as_str).unwrap_or(base_type);
1877
1878        let terminal_query = SearchQuery::new(deepest_type).with_parameter(SearchParameter {
1879            name: terminal_param.to_string(),
1880            param_type: {
1881                let registry = self.search_param_registry().read();
1882                crate::search::resolve_param_type(
1883                    &registry,
1884                    deepest_type,
1885                    terminal_param,
1886                    &[SearchValue::eq(value)],
1887                )
1888            },
1889            modifier: None,
1890            values: vec![SearchValue::eq(value)],
1891            chain: vec![],
1892            components: vec![],
1893        });
1894
1895        let result = self.search(tenant, &terminal_query).await?;
1896        let mut current_refs: Vec<String> = result
1897            .resources
1898            .items
1899            .into_iter()
1900            .map(|r| format!("{}/{}", r.resource_type(), r.id()))
1901            .collect();
1902
1903        if current_refs.is_empty() {
1904            return Ok(Vec::new());
1905        }
1906
1907        // Walk back: for each link from deepest to outermost, search the
1908        // parent type for resources whose reference param matches the
1909        // accumulated refs.
1910        for i in (0..parts.len() - 1).rev() {
1911            let ref_param = parts[i];
1912            let parent_type = if i == 0 {
1913                base_type
1914            } else {
1915                &target_types[i - 1]
1916            };
1917
1918            let values: Vec<SearchValue> = current_refs.iter().map(SearchValue::eq).collect();
1919            let query = SearchQuery::new(parent_type).with_parameter(SearchParameter {
1920                name: ref_param.to_string(),
1921                param_type: crate::types::SearchParamType::Reference,
1922                modifier: None,
1923                values,
1924                chain: vec![],
1925                components: vec![],
1926            });
1927
1928            let r = self.search(tenant, &query).await?;
1929            current_refs = r
1930                .resources
1931                .items
1932                .into_iter()
1933                .map(|res| {
1934                    if i == 0 {
1935                        // Outermost: caller wants raw IDs, not refs.
1936                        res.id().to_string()
1937                    } else {
1938                        format!("{}/{}", res.resource_type(), res.id())
1939                    }
1940                })
1941                .collect();
1942
1943            if current_refs.is_empty() {
1944                return Ok(Vec::new());
1945            }
1946        }
1947
1948        Ok(current_refs)
1949    }
1950}
1951
1952#[async_trait]
1953impl TerminologySearchProvider for CompositeStorage {
1954    async fn expand_value_set(&self, _value_set_url: &str) -> StorageResult<Vec<(String, String)>> {
1955        // Delegate to terminology backend if available
1956        let term_backend = self
1957            .config
1958            .backends_with_role(super::config::BackendRole::Terminology)
1959            .next();
1960
1961        if let Some(_backend) = term_backend {
1962            // Would need to downcast to TerminologySearchProvider
1963        }
1964
1965        // Fallback: not supported without terminology service
1966        Err(StorageError::Backend(BackendError::UnsupportedCapability {
1967            backend_name: "composite".to_string(),
1968            capability: "expand_value_set".to_string(),
1969        }))
1970    }
1971
1972    async fn codes_above(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
1973        Err(StorageError::Backend(BackendError::UnsupportedCapability {
1974            backend_name: "composite".to_string(),
1975            capability: "codes_above".to_string(),
1976        }))
1977    }
1978
1979    async fn codes_below(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
1980        Err(StorageError::Backend(BackendError::UnsupportedCapability {
1981            backend_name: "composite".to_string(),
1982            capability: "codes_below".to_string(),
1983        }))
1984    }
1985}
1986
1987#[async_trait]
1988impl TextSearchProvider for CompositeStorage {
1989    async fn search_text(
1990        &self,
1991        tenant: &TenantContext,
1992        resource_type: &str,
1993        text: &str,
1994        pagination: &Pagination,
1995    ) -> StorageResult<SearchResult> {
1996        // Delegate to search backend if available
1997        let search_backend = self
1998            .config
1999            .backends_with_role(super::config::BackendRole::Search)
2000            .next();
2001
2002        if let Some(backend) = search_backend {
2003            if let Some(provider) = self.search_providers.get(&backend.id) {
2004                // Build a text search query
2005                let query = SearchQuery::new(resource_type)
2006                    .with_parameter(crate::types::SearchParameter {
2007                        name: "_text".to_string(),
2008                        param_type: crate::types::SearchParamType::String,
2009                        modifier: None,
2010                        values: vec![crate::types::SearchValue::string(text)],
2011                        chain: vec![],
2012                        components: vec![],
2013                    })
2014                    .with_count(pagination.count);
2015
2016                return provider.search(tenant, &query).await;
2017            }
2018        }
2019
2020        // Fallback to primary (may be less efficient)
2021        self.execute_primary_search(
2022            tenant,
2023            &SearchQuery::new(resource_type)
2024                .with_parameter(crate::types::SearchParameter {
2025                    name: "_text".to_string(),
2026                    param_type: crate::types::SearchParamType::String,
2027                    modifier: None,
2028                    values: vec![crate::types::SearchValue::string(text)],
2029                    chain: vec![],
2030                    components: vec![],
2031                })
2032                .with_count(pagination.count),
2033        )
2034        .await
2035    }
2036
2037    async fn search_content(
2038        &self,
2039        tenant: &TenantContext,
2040        resource_type: &str,
2041        content: &str,
2042        pagination: &Pagination,
2043    ) -> StorageResult<SearchResult> {
2044        // Similar to search_text but uses _content parameter
2045        let search_backend = self
2046            .config
2047            .backends_with_role(super::config::BackendRole::Search)
2048            .next();
2049
2050        if let Some(backend) = search_backend {
2051            if let Some(provider) = self.search_providers.get(&backend.id) {
2052                let query = SearchQuery::new(resource_type)
2053                    .with_parameter(crate::types::SearchParameter {
2054                        name: "_content".to_string(),
2055                        param_type: crate::types::SearchParamType::String,
2056                        modifier: None,
2057                        values: vec![crate::types::SearchValue::string(content)],
2058                        chain: vec![],
2059                        components: vec![],
2060                    })
2061                    .with_count(pagination.count);
2062
2063                return provider.search(tenant, &query).await;
2064            }
2065        }
2066
2067        self.execute_primary_search(
2068            tenant,
2069            &SearchQuery::new(resource_type)
2070                .with_parameter(crate::types::SearchParameter {
2071                    name: "_content".to_string(),
2072                    param_type: crate::types::SearchParamType::String,
2073                    modifier: None,
2074                    values: vec![crate::types::SearchValue::string(content)],
2075                    chain: vec![],
2076                    components: vec![],
2077                })
2078                .with_count(pagination.count),
2079        )
2080        .await
2081    }
2082}
2083
2084impl CapabilityProvider for CompositeStorage {
2085    fn capabilities(&self) -> StorageCapabilities {
2086        use std::collections::HashSet;
2087
2088        // Merge capabilities from all backends
2089        let resource_caps = HashMap::new();
2090
2091        let mut system_interactions = HashSet::new();
2092        system_interactions.insert(crate::core::SystemInteraction::Transaction);
2093        system_interactions.insert(crate::core::SystemInteraction::Batch);
2094        system_interactions.insert(crate::core::SystemInteraction::SearchSystem);
2095        system_interactions.insert(crate::core::SystemInteraction::HistorySystem);
2096
2097        StorageCapabilities {
2098            backend_name: "composite".to_string(),
2099            backend_version: None,
2100            resources: resource_caps,
2101            system_interactions,
2102            supports_system_history: true,
2103            supports_system_search: true,
2104            supported_sorts: vec!["_lastUpdated".to_string(), "_id".to_string()],
2105            supports_total: true,
2106            max_page_size: Some(1000),
2107            default_page_size: 20,
2108        }
2109    }
2110
2111    // resource_capabilities uses the default implementation that returns Option<ResourceCapabilities>
2112}
2113
2114/// Returns an `UnsupportedCapability` error for export operations when the
2115/// primary backend does not implement the export provider traits.
2116fn export_unsupported() -> StorageError {
2117    StorageError::Backend(BackendError::UnsupportedCapability {
2118        backend_name: "composite".to_string(),
2119        capability: "bulk-export".to_string(),
2120    })
2121}
2122
2123#[async_trait]
2124impl ExportDataProvider for CompositeStorage {
2125    async fn list_export_types(
2126        &self,
2127        tenant: &TenantContext,
2128        request: &ExportRequest,
2129    ) -> StorageResult<Vec<String>> {
2130        match &self.export_provider {
2131            Some(p) => p.list_export_types(tenant, request).await,
2132            None => Err(export_unsupported()),
2133        }
2134    }
2135
2136    async fn count_export_resources(
2137        &self,
2138        tenant: &TenantContext,
2139        request: &ExportRequest,
2140        resource_type: &str,
2141    ) -> StorageResult<u64> {
2142        match &self.export_provider {
2143            Some(p) => {
2144                p.count_export_resources(tenant, request, resource_type)
2145                    .await
2146            }
2147            None => Err(export_unsupported()),
2148        }
2149    }
2150
2151    async fn fetch_export_batch(
2152        &self,
2153        tenant: &TenantContext,
2154        request: &ExportRequest,
2155        resource_type: &str,
2156        cursor: Option<&str>,
2157        batch_size: u32,
2158    ) -> StorageResult<NdjsonBatch> {
2159        match &self.export_provider {
2160            Some(p) => {
2161                p.fetch_export_batch(tenant, request, resource_type, cursor, batch_size)
2162                    .await
2163            }
2164            None => Err(export_unsupported()),
2165        }
2166    }
2167}
2168
2169#[async_trait]
2170impl PatientExportProvider for CompositeStorage {
2171    async fn list_patient_ids(
2172        &self,
2173        tenant: &TenantContext,
2174        request: &ExportRequest,
2175        cursor: Option<&str>,
2176        batch_size: u32,
2177    ) -> StorageResult<(Vec<String>, Option<String>)> {
2178        match &self.export_provider {
2179            Some(p) => {
2180                p.list_patient_ids(tenant, request, cursor, batch_size)
2181                    .await
2182            }
2183            None => Err(export_unsupported()),
2184        }
2185    }
2186
2187    async fn fetch_patient_compartment_batch(
2188        &self,
2189        tenant: &TenantContext,
2190        request: &ExportRequest,
2191        resource_type: &str,
2192        patient_ids: &[String],
2193        cursor: Option<&str>,
2194        batch_size: u32,
2195    ) -> StorageResult<NdjsonBatch> {
2196        match &self.export_provider {
2197            Some(p) => {
2198                p.fetch_patient_compartment_batch(
2199                    tenant,
2200                    request,
2201                    resource_type,
2202                    patient_ids,
2203                    cursor,
2204                    batch_size,
2205                )
2206                .await
2207            }
2208            None => Err(export_unsupported()),
2209        }
2210    }
2211}
2212
2213#[async_trait]
2214impl GroupExportProvider for CompositeStorage {
2215    async fn get_group_members(
2216        &self,
2217        tenant: &TenantContext,
2218        group_id: &str,
2219    ) -> StorageResult<Vec<String>> {
2220        match &self.export_provider {
2221            Some(p) => p.get_group_members(tenant, group_id).await,
2222            None => Err(export_unsupported()),
2223        }
2224    }
2225
2226    async fn resolve_group_patient_ids(
2227        &self,
2228        tenant: &TenantContext,
2229        group_id: &str,
2230    ) -> StorageResult<Vec<String>> {
2231        match &self.export_provider {
2232            Some(p) => p.resolve_group_patient_ids(tenant, group_id).await,
2233            None => Err(export_unsupported()),
2234        }
2235    }
2236}
2237
2238#[cfg(test)]
2239mod tests {
2240    use super::*;
2241    use crate::core::{BackendKind, CapabilityProvider};
2242    use crate::error::{BackendError, StorageError, StorageResult};
2243    use crate::tenant::{TenantContext, TenantId, TenantPermissions};
2244    use crate::types::{
2245        SearchParamType, SearchParameter, SearchQuery, SearchValue, StoredResource,
2246    };
2247    use async_trait::async_trait;
2248    use helios_fhir::FhirVersion;
2249    use serde_json::{Value, json};
2250
2251    #[derive(Debug)]
2252    struct FailingSearchBackend {
2253        backend_name: &'static str,
2254        error_message: &'static str,
2255    }
2256
2257    #[async_trait]
2258    impl ResourceStorage for FailingSearchBackend {
2259        fn backend_name(&self) -> &'static str {
2260            self.backend_name
2261        }
2262
2263        async fn create(
2264            &self,
2265            _tenant: &TenantContext,
2266            _resource_type: &str,
2267            _resource: Value,
2268            _fhir_version: FhirVersion,
2269        ) -> StorageResult<StoredResource> {
2270            Err(StorageError::Backend(BackendError::UnsupportedCapability {
2271                backend_name: self.backend_name.to_string(),
2272                capability: "create".to_string(),
2273            }))
2274        }
2275
2276        async fn create_or_update(
2277            &self,
2278            _tenant: &TenantContext,
2279            _resource_type: &str,
2280            _id: &str,
2281            _resource: Value,
2282            _fhir_version: FhirVersion,
2283        ) -> StorageResult<(StoredResource, bool)> {
2284            Err(StorageError::Backend(BackendError::UnsupportedCapability {
2285                backend_name: self.backend_name.to_string(),
2286                capability: "create_or_update".to_string(),
2287            }))
2288        }
2289
2290        async fn read(
2291            &self,
2292            _tenant: &TenantContext,
2293            _resource_type: &str,
2294            _id: &str,
2295        ) -> StorageResult<Option<StoredResource>> {
2296            Ok(None)
2297        }
2298
2299        async fn update(
2300            &self,
2301            _tenant: &TenantContext,
2302            _current: &StoredResource,
2303            _resource: Value,
2304        ) -> StorageResult<StoredResource> {
2305            Err(StorageError::Backend(BackendError::UnsupportedCapability {
2306                backend_name: self.backend_name.to_string(),
2307                capability: "update".to_string(),
2308            }))
2309        }
2310
2311        async fn delete(
2312            &self,
2313            _tenant: &TenantContext,
2314            _resource_type: &str,
2315            _id: &str,
2316        ) -> StorageResult<()> {
2317            Err(StorageError::Backend(BackendError::UnsupportedCapability {
2318                backend_name: self.backend_name.to_string(),
2319                capability: "delete".to_string(),
2320            }))
2321        }
2322
2323        async fn count(
2324            &self,
2325            _tenant: &TenantContext,
2326            _resource_type: Option<&str>,
2327        ) -> StorageResult<u64> {
2328            Ok(0)
2329        }
2330    }
2331
2332    #[async_trait]
2333    impl SearchProvider for FailingSearchBackend {
2334        async fn search(
2335            &self,
2336            _tenant: &TenantContext,
2337            _query: &SearchQuery,
2338        ) -> StorageResult<SearchResult> {
2339            Err(StorageError::Backend(BackendError::ConnectionFailed {
2340                backend_name: self.backend_name.to_string(),
2341                message: self.error_message.to_string(),
2342            }))
2343        }
2344
2345        async fn search_count(
2346            &self,
2347            _tenant: &TenantContext,
2348            _query: &SearchQuery,
2349        ) -> StorageResult<u64> {
2350            Err(StorageError::Backend(BackendError::ConnectionFailed {
2351                backend_name: self.backend_name.to_string(),
2352                message: self.error_message.to_string(),
2353            }))
2354        }
2355
2356        fn search_param_registry(
2357            &self,
2358        ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
2359            use std::sync::OnceLock;
2360            static EMPTY: OnceLock<
2361                std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>>,
2362            > = OnceLock::new();
2363            EMPTY.get_or_init(|| {
2364                std::sync::Arc::new(parking_lot::RwLock::new(
2365                    crate::search::SearchParameterRegistry::new(),
2366                ))
2367            })
2368        }
2369    }
2370
2371    /// Minimal mock storage for unit testing CompositeStorage.
2372    struct MockStorage;
2373
2374    #[async_trait]
2375    impl ResourceStorage for MockStorage {
2376        fn backend_name(&self) -> &'static str {
2377            "mock"
2378        }
2379
2380        async fn create(
2381            &self,
2382            tenant: &TenantContext,
2383            resource_type: &str,
2384            resource: Value,
2385            fhir_version: FhirVersion,
2386        ) -> StorageResult<StoredResource> {
2387            let id = uuid::Uuid::new_v4().to_string();
2388            Ok(StoredResource::new(
2389                resource_type,
2390                &id,
2391                tenant.tenant_id().clone(),
2392                resource,
2393                fhir_version,
2394            ))
2395        }
2396
2397        async fn create_or_update(
2398            &self,
2399            tenant: &TenantContext,
2400            resource_type: &str,
2401            id: &str,
2402            resource: Value,
2403            fhir_version: FhirVersion,
2404        ) -> StorageResult<(StoredResource, bool)> {
2405            Ok((
2406                StoredResource::new(
2407                    resource_type,
2408                    id,
2409                    tenant.tenant_id().clone(),
2410                    resource,
2411                    fhir_version,
2412                ),
2413                true,
2414            ))
2415        }
2416
2417        async fn read(
2418            &self,
2419            _tenant: &TenantContext,
2420            _resource_type: &str,
2421            _id: &str,
2422        ) -> StorageResult<Option<StoredResource>> {
2423            Ok(None)
2424        }
2425
2426        async fn update(
2427            &self,
2428            tenant: &TenantContext,
2429            current: &StoredResource,
2430            resource: Value,
2431        ) -> StorageResult<StoredResource> {
2432            Ok(StoredResource::new(
2433                current.resource_type(),
2434                current.id(),
2435                tenant.tenant_id().clone(),
2436                resource,
2437                current.fhir_version(),
2438            ))
2439        }
2440
2441        async fn delete(
2442            &self,
2443            _tenant: &TenantContext,
2444            _resource_type: &str,
2445            _id: &str,
2446        ) -> StorageResult<()> {
2447            Ok(())
2448        }
2449
2450        async fn count(
2451            &self,
2452            _tenant: &TenantContext,
2453            _resource_type: Option<&str>,
2454        ) -> StorageResult<u64> {
2455            Ok(0)
2456        }
2457    }
2458
2459    fn make_tenant() -> TenantContext {
2460        TenantContext::new(TenantId::new("test"), TenantPermissions::full_access())
2461    }
2462
2463    fn make_composite_no_secondary() -> CompositeStorage {
2464        let config = CompositeConfig::builder()
2465            .primary("primary", BackendKind::Sqlite)
2466            .build()
2467            .unwrap();
2468        let mut backends = HashMap::new();
2469        backends.insert("primary".to_string(), Arc::new(MockStorage) as DynStorage);
2470        CompositeStorage::new(config, backends).unwrap()
2471    }
2472
2473    fn make_composite_with_secondary() -> CompositeStorage {
2474        let config = CompositeConfig::builder()
2475            .primary("primary", BackendKind::Sqlite)
2476            .search_backend("es", BackendKind::Elasticsearch)
2477            .build()
2478            .unwrap();
2479        let mut backends = HashMap::new();
2480        backends.insert("primary".to_string(), Arc::new(MockStorage) as DynStorage);
2481        backends.insert("es".to_string(), Arc::new(MockStorage) as DynStorage);
2482        CompositeStorage::new(config, backends).unwrap()
2483    }
2484
2485    fn test_config() -> CompositeConfig {
2486        CompositeConfig::builder()
2487            .primary("sqlite", BackendKind::Sqlite)
2488            .search_backend("es", BackendKind::Elasticsearch)
2489            .build()
2490            .unwrap()
2491    }
2492
2493    // ── BackendHealth ──────────────────────────────────────────────
2494
/// A default `BackendHealth` is healthy, has no failures, and records neither
/// an error nor a success.
#[test]
fn test_backend_health_default() {
    let BackendHealth {
        healthy,
        last_success,
        failure_count,
        last_error,
    } = BackendHealth::default();

    assert!(healthy);
    assert_eq!(failure_count, 0);
    assert!(last_error.is_none());
    assert!(last_success.is_none());
}
2503
/// Cloning a `BackendHealth` must carry over every field unchanged and leave
/// the source value intact.
#[test]
fn test_backend_health_clone() {
    let original = BackendHealth {
        healthy: false,
        last_success: None,
        failure_count: 5,
        last_error: Some("timeout".to_string()),
    };

    let cloned = original.clone();

    // Check all four fields, including `last_success`, so a clone that
    // silently dropped or defaulted one of them would be caught.
    assert!(!cloned.healthy);
    assert!(cloned.last_success.is_none());
    assert_eq!(cloned.failure_count, 5);
    assert_eq!(cloned.last_error.as_deref(), Some("timeout"));

    // The source must still hold its own data after being cloned.
    assert!(!original.healthy);
    assert_eq!(original.failure_count, 5);
    assert_eq!(original.last_error.as_deref(), Some("timeout"));
}
2517
2518    // ── CompositeConfig ────────────────────────────────────────────
2519
/// The shared test config exposes `"sqlite"` as its primary id and exactly
/// one secondary backend.
#[test]
fn test_composite_config() {
    let cfg = test_config();
    let primary_id = cfg.primary_id();
    assert_eq!(primary_id, Some("sqlite"));

    let secondary_total = cfg.secondaries().count();
    assert_eq!(secondary_total, 1);
}
2526
/// Reads must be served from the primary backend, while `search` and
/// `search_count` must be answered by the configured search backend.
///
/// Each in-memory SQLite backend is seeded with a patient the other one does
/// not have, so the id that comes back shows which backend answered.
#[cfg(feature = "sqlite")]
#[tokio::test]
async fn test_search_prefers_configured_search_backend() {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::backends::sqlite::SqliteBackend;
    use crate::core::{ResourceStorage, SearchProvider};
    use crate::tenant::{TenantContext, TenantId, TenantPermissions};
    use crate::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue};

    // Two independent in-memory databases: one acts as primary, one as search.
    let primary = Arc::new(SqliteBackend::in_memory().expect("create primary sqlite backend"));
    primary.init_schema().expect("init primary sqlite schema");

    let search = Arc::new(SqliteBackend::in_memory().expect("create search sqlite backend"));
    search.init_schema().expect("init search sqlite schema");

    let tenant_id = TenantId::new("composite-test");
    let tenant = TenantContext::new(tenant_id, TenantPermissions::full_access());

    // Seed distinct data so we can tell which provider answered the query.
    let primary_patient = json!({
        "resourceType": "Patient",
        "id": "primary-only-patient",
    });
    primary
        .create(&tenant, "Patient", primary_patient, FhirVersion::default())
        .await
        .expect("seed primary patient");

    let search_patient = json!({
        "resourceType": "Patient",
        "id": "search-only-patient",
    });
    search
        .create(&tenant, "Patient", search_patient, FhirVersion::default())
        .await
        .expect("seed search patient");

    let composite_config = CompositeConfig::builder()
        .primary("primary", BackendKind::Sqlite)
        .search_backend("search", BackendKind::Sqlite)
        .build()
        .expect("build composite config");

    let backends = HashMap::from([
        ("primary".to_string(), primary.clone() as DynStorage),
        ("search".to_string(), search.clone() as DynStorage),
    ]);
    let search_providers = HashMap::from([
        ("primary".to_string(), primary.clone() as DynSearchProvider),
        ("search".to_string(), search.clone() as DynSearchProvider),
    ]);

    let composite = CompositeStorage::new(composite_config, backends)
        .expect("create composite storage")
        .with_search_providers(search_providers)
        .with_full_primary(primary.clone());

    // Read path: the patient exists only in the primary backend.
    let read_result = composite
        .read(&tenant, "Patient", "primary-only-patient")
        .await
        .expect("composite read should succeed");
    assert!(
        read_result.is_some(),
        "Read path should use primary backend data"
    );

    // Search path: the patient exists only in the search backend.
    let id_filter = SearchParameter {
        name: "_id".to_string(),
        param_type: SearchParamType::Token,
        modifier: None,
        values: vec![SearchValue::eq("search-only-patient")],
        chain: vec![],
        components: vec![],
    };
    let query = SearchQuery::new("Patient").with_parameter(id_filter);

    let found = composite
        .search(&tenant, &query)
        .await
        .expect("composite search should succeed");
    assert_eq!(found.resources.len(), 1);
    assert_eq!(found.resources.items[0].id(), "search-only-patient");

    let total = composite
        .search_count(&tenant, &query)
        .await
        .expect("composite search_count should succeed");
    assert_eq!(total, 1);
}
2627
/// A search delegated to the search backend must stay scoped to the calling
/// tenant: tenant A finds its own patient, and tenant B issuing the same
/// query gets nothing back.
#[cfg(feature = "sqlite")]
#[tokio::test]
async fn test_search_backend_preserves_tenant_isolation() {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::backends::sqlite::SqliteBackend;
    use crate::core::{ResourceStorage, SearchProvider};
    use crate::tenant::{TenantContext, TenantId, TenantPermissions};

    let primary = Arc::new(SqliteBackend::in_memory().expect("create primary sqlite backend"));
    primary.init_schema().expect("init primary sqlite schema");

    let search = Arc::new(SqliteBackend::in_memory().expect("create search sqlite backend"));
    search.init_schema().expect("init search sqlite schema");

    let tenant_a = TenantContext::new(TenantId::new("tenant-a"), TenantPermissions::full_access());
    let tenant_b = TenantContext::new(TenantId::new("tenant-b"), TenantPermissions::full_access());

    // One patient per tenant, both stored only in the search backend.
    let patient_a = json!({
        "resourceType": "Patient",
        "id": "tenant-a-patient",
    });
    search
        .create(&tenant_a, "Patient", patient_a, FhirVersion::default())
        .await
        .expect("seed tenant A search patient");

    let patient_b = json!({
        "resourceType": "Patient",
        "id": "tenant-b-patient",
    });
    search
        .create(&tenant_b, "Patient", patient_b, FhirVersion::default())
        .await
        .expect("seed tenant B search patient");

    let composite_config = CompositeConfig::builder()
        .primary("primary", BackendKind::Sqlite)
        .search_backend("search", BackendKind::Sqlite)
        .build()
        .expect("build composite config");

    let backends = HashMap::from([
        ("primary".to_string(), primary.clone() as DynStorage),
        ("search".to_string(), search.clone() as DynStorage),
    ]);
    let search_providers = HashMap::from([
        ("primary".to_string(), primary.clone() as DynSearchProvider),
        ("search".to_string(), search.clone() as DynSearchProvider),
    ]);

    let composite = CompositeStorage::new(composite_config, backends)
        .expect("create composite storage")
        .with_search_providers(search_providers)
        .with_full_primary(primary.clone());

    // Both tenants run the same query, which targets tenant A's patient id.
    let id_filter = SearchParameter {
        name: "_id".to_string(),
        param_type: SearchParamType::Token,
        modifier: None,
        values: vec![SearchValue::eq("tenant-a-patient")],
        chain: vec![],
        components: vec![],
    };
    let query = SearchQuery::new("Patient").with_parameter(id_filter);

    let seen_by_a = composite
        .search(&tenant_a, &query)
        .await
        .expect("tenant A composite search should succeed");
    assert_eq!(seen_by_a.resources.len(), 1);
    assert_eq!(seen_by_a.resources.items[0].id(), "tenant-a-patient");

    let seen_by_b = composite
        .search(&tenant_b, &query)
        .await
        .expect("tenant B composite search should succeed");
    assert!(
        seen_by_b.resources.is_empty(),
        "delegated search must not leak tenant A data to tenant B"
    );
}
2719
2720    #[test]
2721    fn test_search_backend_failure_marks_backend_unhealthy() {
2722        use std::collections::HashMap;
2723        use std::sync::Arc;
2724
2725        use crate::composite::config::HealthConfig;
2726
2727        let primary = Arc::new(FailingSearchBackend {
2728            backend_name: "primary",
2729            error_message: "primary should not be used",
2730        });
2731        let search = Arc::new(FailingSearchBackend {
2732            backend_name: "search",
2733            error_message: "simulated search outage",
2734        });
2735
2736        let composite_config = CompositeConfig::builder()
2737            .primary("primary", BackendKind::MongoDB)
2738            .search_backend("search", BackendKind::Elasticsearch)
2739            .with_health_config(HealthConfig {
2740                failure_threshold: 1,
2741                ..HealthConfig::default()
2742            })
2743            .build()
2744            .expect("build composite config");
2745
2746        let mut backends = HashMap::new();
2747        backends.insert("primary".to_string(), primary.clone() as DynStorage);
2748        backends.insert("search".to_string(), search.clone() as DynStorage);
2749
2750        let mut search_providers = HashMap::new();
2751        search_providers.insert("primary".to_string(), primary.clone() as DynSearchProvider);
2752        search_providers.insert("search".to_string(), search.clone() as DynSearchProvider);
2753
2754        let composite = CompositeStorage::new(composite_config, backends)
2755            .expect("create composite storage")
2756            .with_search_providers(search_providers);
2757
2758        let tenant = TenantContext::new(
2759            TenantId::new("tenant-failure"),
2760            TenantPermissions::full_access(),
2761        );
2762        let query = SearchQuery::new("Patient").with_parameter(SearchParameter {
2763            name: "_id".to_string(),
2764            param_type: SearchParamType::Token,
2765            modifier: None,
2766            values: vec![SearchValue::eq("failure-patient")],
2767            chain: vec![],
2768            components: vec![],
2769        });
2770
2771        let runtime = tokio::runtime::Builder::new_current_thread()
2772            .enable_all()
2773            .build()
2774            .expect("build tokio runtime");
2775        let err = runtime
2776            .block_on(composite.search(&tenant, &query))
2777            .expect_err("delegated search should fail when search backend is down");
2778
2779        assert!(matches!(
2780            err,
2781            StorageError::Backend(BackendError::ConnectionFailed {
2782                backend_name,
2783                message,
2784            }) if backend_name == "search" && message.contains("simulated search outage")
2785        ));
2786
2787        let health = composite
2788            .backend_health("search")
2789            .expect("search backend health should exist");
2790        assert!(
2791            !health.healthy,
2792            "search backend should be marked unhealthy after failure"
2793        );
2794        assert_eq!(health.failure_count, 1);
2795        assert_eq!(
2796            health.last_error.as_deref(),
2797            Some("connection failed to search: simulated search outage")
2798        );
2799    }
2800
2801    // ── CompositeStorage::new() ────────────────────────────────────
2802
/// Construction with a matching primary backend succeeds, and the resulting
/// storage reports its backend name as `"composite"`.
#[test]
fn test_new_success() {
    let storage = make_composite_no_secondary();
    let reported = storage.backend_name();
    assert_eq!(reported, "composite");
}
2808
/// `CompositeStorage::new` must fail when the config names a primary backend
/// that is absent from the backends map.
#[test]
fn test_new_missing_primary_backend_in_map() {
    // Deliberately leave the map empty so the primary cannot be resolved.
    let no_backends = HashMap::<String, DynStorage>::new();

    let config = CompositeConfig::builder()
        .primary("primary", BackendKind::Sqlite)
        .build()
        .unwrap();

    let outcome = CompositeStorage::new(config, no_backends);
    assert!(outcome.is_err());
}
2820
/// With a secondary configured, lookup by its id succeeds and lookup by an
/// unknown id yields nothing.
#[test]
fn test_new_with_secondary() {
    let storage = make_composite_with_secondary();

    let known = storage.secondary("es");
    assert!(known.is_some());

    let unknown = storage.secondary("nonexistent");
    assert!(unknown.is_none());
}
2827
2828    // ── Accessors ─────────────────────────────────────────────────
2829
/// `config()` exposes the configuration the storage was built with.
#[test]
fn test_config_accessor() {
    let storage = make_composite_no_secondary();
    let primary_id = storage.config().primary_id();
    assert_eq!(primary_id, Some("primary"));
}
2835
/// `primary()` returns the backend registered as primary; here that backend
/// reports the name `"mock"`.
#[test]
fn test_primary_accessor() {
    let storage = make_composite_no_secondary();
    let primary_name = storage.primary().backend_name();
    assert_eq!(primary_name, "mock");
}
2841
/// `secondaries()` lists exactly the one configured secondary, keyed `"es"`.
#[test]
fn test_secondaries_accessor() {
    let storage = make_composite_with_secondary();
    let secondaries = storage.secondaries();
    assert_eq!(secondaries.len(), 1);
    assert!(secondaries.contains_key("es"));
}
2848
/// Without any configured secondary, `secondaries()` is empty.
#[test]
fn test_secondaries_empty_when_no_secondary() {
    let storage = make_composite_no_secondary();
    let secondaries = storage.secondaries();
    assert!(secondaries.is_empty());
}
2854
/// `secondary(id)` finds a configured secondary by its id.
#[test]
fn test_secondary_accessor_present() {
    let storage = make_composite_with_secondary();
    let lookup = storage.secondary("es");
    assert!(lookup.is_some());
}
2860
/// `secondary(id)` yields nothing for an id that was never configured.
#[test]
fn test_secondary_accessor_absent() {
    let storage = make_composite_no_secondary();
    let lookup = storage.secondary("missing");
    assert!(lookup.is_none());
}
2866
2867    // ── Health tracking ───────────────────────────────────────────
2868
/// A freshly built storage reports its primary backend as healthy.
#[test]
fn test_backend_health_initially_healthy() {
    let storage = make_composite_no_secondary();
    let is_healthy = storage.backend_health("primary").unwrap().healthy;
    assert!(is_healthy);
}
2875
/// `backend_health` has no entry for a backend id that was never registered.
#[test]
fn test_backend_health_missing_id_returns_none() {
    let storage = make_composite_no_secondary();
    let lookup = storage.backend_health("nonexistent");
    assert!(lookup.is_none());
}
2881
/// `is_backend_healthy` is true for the registered primary on a fresh storage.
#[test]
fn test_is_backend_healthy_true() {
    let storage = make_composite_no_secondary();
    let primary_ok = storage.is_backend_healthy("primary");
    assert!(primary_ok);
}
2887
/// `is_backend_healthy` is false for a backend id that was never registered.
#[test]
fn test_is_backend_healthy_unknown_returns_false() {
    let storage = make_composite_no_secondary();
    let unknown_ok = storage.is_backend_healthy("nonexistent");
    assert!(!unknown_ok);
}
2893
/// A success report after failures restores the healthy flag, zeroes the
/// failure counter, clears the last error, and stamps a last-success value.
#[test]
fn test_update_health_success_resets_failures() {
    let storage = make_composite_no_secondary();

    // Accumulate two failures first.
    for message in ["err1", "err2"] {
        storage.update_health("primary", false, Some(message.to_string()));
    }
    let after_failures = storage.backend_health("primary").unwrap();
    assert_eq!(after_failures.failure_count, 2);

    // A single success should wipe the failure state.
    storage.update_health("primary", true, None);
    let after_success = storage.backend_health("primary").unwrap();
    assert!(after_success.healthy);
    assert_eq!(after_success.failure_count, 0);
    assert!(after_success.last_error.is_none());
    assert!(after_success.last_success.is_some());
}
2911
/// One failure report bumps the failure counter to 1 and stores the error
/// message.
#[test]
fn test_update_health_failure_increments_count() {
    let storage = make_composite_no_secondary();
    let reported_error = Some("timeout".to_string());
    storage.update_health("primary", false, reported_error);

    let snapshot = storage.backend_health("primary").unwrap();
    assert_eq!(snapshot.failure_count, 1);
    assert_eq!(snapshot.last_error.as_deref(), Some("timeout"));
}
2920
/// Reporting as many failures as the configured threshold marks the backend
/// unhealthy, with the failure counter equal to that threshold.
#[test]
fn test_update_health_marks_unhealthy_after_threshold() {
    let storage = make_composite_no_secondary();
    // Read the threshold from the storage's own config instead of
    // hard-coding a number.
    let threshold = storage.config.health_config.failure_threshold;

    (0..threshold).for_each(|attempt| {
        storage.update_health("primary", false, Some(format!("error {}", attempt)));
    });

    let snapshot = storage.backend_health("primary").unwrap();
    assert!(!snapshot.healthy);
    assert_eq!(snapshot.failure_count, threshold);
}
2933
/// Reporting health for an unregistered backend id must not panic.
#[test]
fn test_update_health_ignores_unknown_backend() {
    let storage = make_composite_no_secondary();
    let reported_error = Some("err".to_string());
    // The only expectation is that this call returns normally.
    storage.update_health("nonexistent", false, reported_error);
}
2940
2941    // ── sync_to_secondaries when sync_manager is None ─────────────
2942
2943    #[tokio::test]
2944    async fn test_sync_to_secondaries_no_sync_manager_returns_ok() {
2945        let composite = make_composite_no_secondary();
2946        // No sync manager (no secondaries) → should return Ok immediately
2947        let result = composite
2948            .sync_to_secondaries(super::super::sync::SyncEvent::Delete {
2949                resource_type: "Patient".to_string(),
2950                resource_id: "1".to_string(),
2951                tenant_id: TenantId::new("test"),
2952            })
2953            .await;
2954        assert!(result.is_ok());
2955    }
2956
2957    // ── routing_error_to_storage_error ────────────────────────────
2958
/// `RoutingError::NoPrimaryBackend` maps to `BackendError::Unavailable`
/// naming the `"primary"` backend.
#[test]
fn test_routing_error_no_primary_backend() {
    let storage = make_composite_no_secondary();
    let converted = storage.routing_error_to_storage_error(RoutingError::NoPrimaryBackend);

    let backend_name = match converted {
        StorageError::Backend(BackendError::Unavailable { backend_name, .. }) => backend_name,
        other => panic!("unexpected error: {:?}", other),
    };
    assert_eq!(backend_name, "primary");
}
2970
/// `RoutingError::NoCapableBackend` maps to
/// `BackendError::UnsupportedCapability` attributed to `"composite"` with a
/// non-empty capability description.
#[test]
fn test_routing_error_no_capable_backend() {
    use super::super::analyzer::QueryFeature;

    let storage = make_composite_no_secondary();
    let routing_error = RoutingError::NoCapableBackend {
        feature: QueryFeature::FullTextSearch,
    };
    let converted = storage.routing_error_to_storage_error(routing_error);

    let (backend_name, capability) = match converted {
        StorageError::Backend(BackendError::UnsupportedCapability {
            backend_name,
            capability,
        }) => (backend_name, capability),
        other => panic!("unexpected error: {:?}", other),
    };
    assert_eq!(backend_name, "composite");
    assert!(!capability.is_empty());
}
2989
/// `RoutingError::BackendUnavailable` maps to `BackendError::ConnectionFailed`
/// carrying the same backend id.
#[test]
fn test_routing_error_backend_unavailable() {
    let storage = make_composite_no_secondary();
    let routing_error = RoutingError::BackendUnavailable {
        backend_id: "my-backend".to_string(),
    };
    let converted = storage.routing_error_to_storage_error(routing_error);

    let backend_name = match converted {
        StorageError::Backend(BackendError::ConnectionFailed { backend_name, .. }) => backend_name,
        other => panic!("unexpected error: {:?}", other),
    };
    assert_eq!(backend_name, "my-backend");
}
3003
3004    // ── CapabilityProvider ────────────────────────────────────────
3005
/// The capability descriptor identifies the backend as `"composite"`.
#[test]
fn test_capabilities_backend_name() {
    let storage = make_composite_no_secondary();
    let descriptor = storage.capabilities();
    assert_eq!(descriptor.backend_name, "composite");
}
3012
/// `backend_name()` on the composite itself is `"composite"`.
#[test]
fn test_backend_name_is_composite() {
    let storage = make_composite_no_secondary();
    let reported = storage.backend_name();
    assert_eq!(reported, "composite");
}
3018
3019    // ── "No capability" error paths ───────────────────────────────
3020
3021    #[tokio::test]
3022    async fn test_conditional_create_no_capability() {
3023        use crate::core::ConditionalStorage;
3024        let composite = make_composite_no_secondary();
3025        let tenant = make_tenant();
3026        let result = composite
3027            .conditional_create(
3028                &tenant,
3029                "Patient",
3030                serde_json::json!({}),
3031                "identifier=foo",
3032                FhirVersion::default(),
3033            )
3034            .await;
3035        assert!(result.is_err());
3036    }
3037
3038    #[tokio::test]
3039    async fn test_versioned_storage_vread_no_capability() {
3040        use crate::core::VersionedStorage;
3041        let composite = make_composite_no_secondary();
3042        let tenant = make_tenant();
3043        let result = composite.vread(&tenant, "Patient", "1", "1").await;
3044        assert!(result.is_err());
3045        match result.unwrap_err() {
3046            StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => {
3047                assert!(capability.contains("VersionedStorage"));
3048            }
3049            other => panic!("unexpected error: {:?}", other),
3050        }
3051    }
3052
3053    #[tokio::test]
3054    async fn test_instance_history_no_capability() {
3055        use crate::core::InstanceHistoryProvider;
3056        use crate::core::history::HistoryParams;
3057        let composite = make_composite_no_secondary();
3058        let tenant = make_tenant();
3059        let result = composite
3060            .history_instance(&tenant, "Patient", "1", &HistoryParams::default())
3061            .await;
3062        assert!(result.is_err());
3063        match result.unwrap_err() {
3064            StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => {
3065                assert!(capability.contains("InstanceHistoryProvider"));
3066            }
3067            other => panic!("unexpected error: {:?}", other),
3068        }
3069    }
3070
3071    #[tokio::test]
3072    async fn test_bundle_provider_process_batch_no_capability() {
3073        use crate::core::BundleProvider;
3074        let composite = make_composite_no_secondary();
3075        let tenant = make_tenant();
3076        let result = composite.process_batch(&tenant, vec![]).await;
3077        assert!(result.is_err());
3078        match result.unwrap_err() {
3079            StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => {
3080                assert!(capability.contains("BundleProvider"));
3081            }
3082            other => panic!("unexpected error: {:?}", other),
3083        }
3084    }
3085
3086    #[tokio::test]
3087    async fn test_bundle_provider_process_transaction_no_capability() {
3088        use crate::core::BundleProvider;
3089        let composite = make_composite_no_secondary();
3090        let tenant = make_tenant();
3091        let result = composite.process_transaction(&tenant, vec![]).await;
3092        assert!(result.is_err());
3093    }
3094
3095    // ── search_count when no search provider ──────────────────────
3096
3097    #[tokio::test]
3098    async fn test_search_count_no_search_provider_returns_error() {
3099        use crate::core::SearchProvider;
3100        use crate::types::SearchQuery;
3101        let composite = make_composite_no_secondary();
3102        let tenant = make_tenant();
3103        let query = SearchQuery::new("Patient");
3104        let result = composite.search_count(&tenant, &query).await;
3105        assert!(result.is_err());
3106        match result.unwrap_err() {
3107            StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => {
3108                assert!(capability.contains("search_count"));
3109            }
3110            other => panic!("unexpected error: {:?}", other),
3111        }
3112    }
3113
3114    // ── with_search_providers ─────────────────────────────────────
3115
/// `with_search_providers` installs the supplied provider under its id.
#[test]
fn test_with_search_providers() {
    let mock_provider: DynSearchProvider = Arc::new(MockStorage);
    let providers = HashMap::from([("primary".to_string(), mock_provider)]);

    let storage = make_composite_no_secondary().with_search_providers(providers);

    // The provider registered above must now be reachable by its id.
    assert!(storage.search_providers.contains_key("primary"));
}
3128
3129    // ── extract_reference_values (static method) ──────────────────
3130
/// An object with a `reference` field contributes that field's string value.
#[test]
fn test_extract_reference_values_object_with_reference() {
    let mut collected = Vec::new();
    let input = serde_json::json!({"reference": "Patient/123"});

    CompositeStorage::extract_reference_values(&input, &mut collected);

    assert_eq!(collected, vec!["Patient/123"]);
}
3138
/// An object lacking a `reference` field contributes nothing.
#[test]
fn test_extract_reference_values_object_without_reference() {
    let mut collected = Vec::new();
    let input = serde_json::json!({"display": "John Smith"});

    CompositeStorage::extract_reference_values(&input, &mut collected);

    assert!(collected.is_empty());
}
3146
/// For an array, every element with a `reference` field is collected and
/// elements without one are skipped.
#[test]
fn test_extract_reference_values_array() {
    let mut collected = Vec::new();
    let input = serde_json::json!([
        {"reference": "Patient/1"},
        {"reference": "Patient/2"},
        {"display": "no ref here"}
    ]);

    CompositeStorage::extract_reference_values(&input, &mut collected);

    assert_eq!(collected.len(), 2);
    // Membership is checked without relying on the output order.
    for expected in ["Patient/1", "Patient/2"] {
        assert!(collected.contains(&expected.to_string()));
    }
}
3160
/// A bare JSON string is neither an object nor an array, so it contributes
/// no references.
#[test]
fn test_extract_reference_values_primitive_ignored() {
    let mut collected = Vec::new();
    let input = serde_json::json!("just a string");

    CompositeStorage::extract_reference_values(&input, &mut collected);

    assert!(collected.is_empty());
}
3168
3169    /// `extract_references` should consult the registry first and use the
3170    /// FHIRPath `expression` from the registered SearchParameter, not the
3171    /// hardcoded JSON-field-name heuristic. This exercises the new wiring
3172    /// from PR #2 of the load-bearing-stub-fixes plan.
3173    #[test]
3174    fn test_extract_references_uses_registry_expression() {
3175        use crate::search::{SearchParameterDefinition, SearchParameterRegistry};
3176        use crate::tenant::TenantId;
3177        use crate::types::SearchParamType;
3178
3179        // Composite wrapped around a backend whose registry has been seeded
3180        // with an Encounter.subject param whose FHIRPath expression points at
3181        // a JSON field name that is *different* from the search-param name —
3182        // proving the registry's expression is what's evaluated, not the
3183        // hardcoded "patient" -> "subject" alias.
3184        let registry = Arc::new(parking_lot::RwLock::new(SearchParameterRegistry::new()));
3185        registry
3186            .write()
3187            .register(
3188                SearchParameterDefinition::new(
3189                    "http://hl7.org/fhir/SearchParameter/Encounter-subject",
3190                    "subject",
3191                    SearchParamType::Reference,
3192                    "Encounter.subject",
3193                )
3194                .with_base(vec!["Encounter"])
3195                .with_targets(vec!["Patient", "Group"]),
3196            )
3197            .unwrap();
3198
3199        struct MockWithRegistry {
3200            registry: Arc<parking_lot::RwLock<SearchParameterRegistry>>,
3201        }
3202
3203        #[async_trait::async_trait]
3204        impl ResourceStorage for MockWithRegistry {
3205            fn backend_name(&self) -> &'static str {
3206                "mock-with-registry"
3207            }
3208            async fn create(
3209                &self,
3210                _tenant: &TenantContext,
3211                _resource_type: &str,
3212                _resource: serde_json::Value,
3213                _fhir_version: FhirVersion,
3214            ) -> StorageResult<crate::types::StoredResource> {
3215                unimplemented!()
3216            }
3217            async fn create_or_update(
3218                &self,
3219                _tenant: &TenantContext,
3220                _resource_type: &str,
3221                _id: &str,
3222                _resource: serde_json::Value,
3223                _fhir_version: FhirVersion,
3224            ) -> StorageResult<(crate::types::StoredResource, bool)> {
3225                unimplemented!()
3226            }
3227            async fn read(
3228                &self,
3229                _tenant: &TenantContext,
3230                _resource_type: &str,
3231                _id: &str,
3232            ) -> StorageResult<Option<crate::types::StoredResource>> {
3233                Ok(None)
3234            }
3235            async fn update(
3236                &self,
3237                _tenant: &TenantContext,
3238                _current: &crate::types::StoredResource,
3239                _resource: serde_json::Value,
3240            ) -> StorageResult<crate::types::StoredResource> {
3241                unimplemented!()
3242            }
3243            async fn delete(
3244                &self,
3245                _tenant: &TenantContext,
3246                _resource_type: &str,
3247                _id: &str,
3248            ) -> StorageResult<()> {
3249                Ok(())
3250            }
3251            async fn count(
3252                &self,
3253                _tenant: &TenantContext,
3254                _resource_type: Option<&str>,
3255            ) -> StorageResult<u64> {
3256                Ok(0)
3257            }
3258        }
3259
3260        #[async_trait::async_trait]
3261        impl SearchProvider for MockWithRegistry {
3262            async fn search(
3263                &self,
3264                _tenant: &TenantContext,
3265                _query: &crate::types::SearchQuery,
3266            ) -> StorageResult<SearchResult> {
3267                use crate::types::Page;
3268                Ok(SearchResult::new(Page::empty()))
3269            }
3270            async fn search_count(
3271                &self,
3272                _tenant: &TenantContext,
3273                _query: &crate::types::SearchQuery,
3274            ) -> StorageResult<u64> {
3275                Ok(0)
3276            }
3277            fn search_param_registry(&self) -> &Arc<parking_lot::RwLock<SearchParameterRegistry>> {
3278                &self.registry
3279            }
3280        }
3281
3282        let config = CompositeConfig::builder()
3283            .primary("primary", BackendKind::Sqlite)
3284            .build()
3285            .unwrap();
3286        let backend = Arc::new(MockWithRegistry {
3287            registry: Arc::clone(&registry),
3288        });
3289        let mut backends = HashMap::new();
3290        backends.insert("primary".to_string(), backend.clone() as DynStorage);
3291        let mut providers = HashMap::new();
3292        providers.insert("primary".to_string(), backend.clone() as DynSearchProvider);
3293        let composite = CompositeStorage::new(config, backends)
3294            .unwrap()
3295            .with_search_providers(providers);
3296
3297        // Encounter resource referencing Patient/p1 via subject.
3298        let content = serde_json::json!({
3299            "resourceType": "Encounter",
3300            "id": "e1",
3301            "subject": {"reference": "Patient/p1"},
3302        });
3303        let resource = crate::types::StoredResource::new(
3304            "Encounter",
3305            "e1",
3306            TenantId::new("t"),
3307            content,
3308            FhirVersion::default(),
3309        );
3310
3311        let refs = composite.extract_references(&resource, "subject");
3312        assert_eq!(refs, vec!["Patient/p1".to_string()]);
3313    }
3314
3315    #[test]
3316    fn test_extract_reference_values_null_ignored() {
3317        let val = serde_json::Value::Null;
3318        let mut refs = Vec::new();
3319        CompositeStorage::extract_reference_values(&val, &mut refs);
3320        assert!(refs.is_empty());
3321    }
3322
3323    // ── CRUD delegation to primary ────────────────────────────────
3324
3325    #[tokio::test]
3326    async fn test_create_delegates_to_primary() {
3327        use crate::core::ResourceStorage;
3328        let composite = make_composite_no_secondary();
3329        let tenant = make_tenant();
3330        let result = composite
3331            .create(
3332                &tenant,
3333                "Patient",
3334                serde_json::json!({"resourceType": "Patient"}),
3335                FhirVersion::default(),
3336            )
3337            .await;
3338        assert!(result.is_ok());
3339        let stored = result.unwrap();
3340        assert_eq!(stored.resource_type(), "Patient");
3341    }
3342
3343    #[tokio::test]
3344    async fn test_read_delegates_to_primary() {
3345        use crate::core::ResourceStorage;
3346        let composite = make_composite_no_secondary();
3347        let tenant = make_tenant();
3348        let result = composite.read(&tenant, "Patient", "1").await;
3349        assert!(result.is_ok());
3350        assert!(result.unwrap().is_none()); // MockStorage always returns None
3351    }
3352
3353    #[tokio::test]
3354    async fn test_count_delegates_to_primary() {
3355        use crate::core::ResourceStorage;
3356        let composite = make_composite_no_secondary();
3357        let tenant = make_tenant();
3358        let result = composite.count(&tenant, Some("Patient")).await;
3359        assert!(result.is_ok());
3360        assert_eq!(result.unwrap(), 0);
3361    }
3362
3363    #[tokio::test]
3364    async fn test_delete_delegates_to_primary() {
3365        use crate::core::ResourceStorage;
3366        let composite = make_composite_no_secondary();
3367        let tenant = make_tenant();
3368        let result = composite.delete(&tenant, "Patient", "1").await;
3369        assert!(result.is_ok());
3370    }
3371
3372    fn make_composite_with_search_provider() -> CompositeStorage {
3373        let composite = make_composite_no_secondary();
3374        let mut providers = HashMap::new();
3375        providers.insert(
3376            "primary".to_string(),
3377            Arc::new(MockStorage) as DynSearchProvider,
3378        );
3379        composite.with_search_providers(providers)
3380    }
3381
3382    /// `resolve_chain` should run iterative SearchQueries against composite's
3383    /// own search routing rather than the previous stub that returned an
3384    /// empty Vec for any chain longer than 2 segments. This test uses
3385    /// MockStorage (which returns no results) so the assertion is just that
3386    /// the implementation runs to completion and yields empty (not that it
3387    /// finds anything) — full end-to-end coverage comes from inferno against
3388    /// real backends.
3389    #[tokio::test]
3390    async fn test_resolve_chain_three_segments_does_not_error() {
3391        use crate::core::ChainedSearchProvider;
3392        let composite = make_composite_with_search_provider();
3393        let tenant = make_tenant();
3394        let result = composite
3395            .resolve_chain(
3396                &tenant,
3397                "Observation",
3398                "subject.organization.name",
3399                "Hospital",
3400            )
3401            .await;
3402        assert!(result.is_ok(), "resolve_chain failed: {:?}", result.err());
3403        assert!(result.unwrap().is_empty());
3404    }
3405
3406    #[tokio::test]
3407    async fn test_resolve_chain_short_chain_does_not_error() {
3408        use crate::core::ChainedSearchProvider;
3409        let composite = make_composite_with_search_provider();
3410        let tenant = make_tenant();
3411        let result = composite
3412            .resolve_chain(&tenant, "Observation", "patient.name", "Smith")
3413            .await;
3414        assert!(result.is_ok());
3415        assert!(result.unwrap().is_empty());
3416    }
3417
3418    #[tokio::test]
3419    async fn test_resolve_chain_invalid_chain_returns_empty() {
3420        use crate::core::ChainedSearchProvider;
3421        let composite = make_composite_with_search_provider();
3422        let tenant = make_tenant();
3423        // Single segment isn't a chain — must return empty, not error.
3424        let result = composite
3425            .resolve_chain(&tenant, "Observation", "patient", "x")
3426            .await;
3427        assert!(result.is_ok());
3428        assert!(result.unwrap().is_empty());
3429    }
3430
3431    // SearchProvider impl for MockStorage (must live inside test module).
3432    #[async_trait::async_trait]
3433    impl SearchProvider for MockStorage {
3434        async fn search(
3435            &self,
3436            _tenant: &TenantContext,
3437            _query: &crate::types::SearchQuery,
3438        ) -> StorageResult<SearchResult> {
3439            use crate::types::Page;
3440            Ok(SearchResult::new(Page::empty()))
3441        }
3442
3443        async fn search_count(
3444            &self,
3445            _tenant: &TenantContext,
3446            _query: &crate::types::SearchQuery,
3447        ) -> StorageResult<u64> {
3448            Ok(0)
3449        }
3450
3451        fn search_param_registry(
3452            &self,
3453        ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
3454            use std::sync::OnceLock;
3455            static EMPTY: OnceLock<
3456                std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>>,
3457            > = OnceLock::new();
3458            EMPTY.get_or_init(|| {
3459                std::sync::Arc::new(parking_lot::RwLock::new(
3460                    crate::search::SearchParameterRegistry::new(),
3461                ))
3462            })
3463        }
3464    }
3465}