Skip to main content

helios_persistence/composite/
storage.rs

1//! CompositeStorage implementation.
2//!
3//! This module provides the main `CompositeStorage` struct that coordinates
4//! multiple backends for FHIR resource storage and querying.
5//!
6//! # Overview
7//!
8//! `CompositeStorage` implements all storage traits by delegating to appropriate
9//! backends based on operation type:
10//!
11//! - **Writes (CRUD)**: Always go to the primary backend
12//! - **Reads**: Go to primary, with optional secondary enrichment
13//! - **Search**: Routed based on query features to optimal backends
14//!
15//! # Example
16//!
17//! ```ignore
18//! use helios_persistence::composite::{CompositeStorage, CompositeConfig};
19//!
20//! let config = CompositeConfig::builder()
21//!     .primary("sqlite", BackendKind::Sqlite)
22//!     .search_backend("es", BackendKind::Elasticsearch)
23//!     .build()?;
24//!
//! let storage = CompositeStorage::new(config, backends)?;
26//!
27//! // All CRUD goes to primary (sqlite)
//! let patient = storage.create(&tenant, "Patient", patient_json, FhirVersion::default()).await?;
29//!
30//! // Search is routed based on features
31//! // - Basic search → sqlite
32//! // - Full-text (_text) → elasticsearch
33//! let results = storage.search(&tenant, &query).await?;
34//! ```
35
36use std::collections::HashMap;
37use std::sync::Arc;
38
39use async_trait::async_trait;
40use helios_fhir::FhirVersion;
41use parking_lot::RwLock;
42use serde_json::Value;
43use tracing::{debug, instrument, warn};
44
45use crate::core::history::HistoryParams;
46use crate::core::{
47    BundleEntry, BundleProvider, BundleResult, CapabilityProvider, ChainedSearchProvider,
48    ConditionalCreateResult, ConditionalDeleteResult, ConditionalPatchResult, ConditionalStorage,
49    ConditionalUpdateResult, IncludeProvider, InstanceHistoryProvider, PatchFormat,
50    ResourceStorage, RevincludeProvider, SearchProvider, SearchResult, StorageCapabilities,
51    TerminologySearchProvider, TextSearchProvider, VersionedStorage,
52};
53use crate::error::{BackendError, StorageError, StorageResult, TransactionError};
54use crate::tenant::TenantContext;
55use crate::types::{
56    IncludeDirective, Pagination, ReverseChainedParameter, SearchQuery, StoredResource,
57};
58
59use super::config::CompositeConfig;
60use super::merger::{MergeOptions, ResultMerger};
61use super::router::{QueryRouter, RoutingDecision, RoutingError};
62use super::sync::{SyncEvent, SyncManager};
63
/// A dynamically typed storage backend.
///
/// Reference-counted [`ResourceStorage`] trait object. `CompositeStorage`
/// holds its primary and every secondary backend through this alias.
pub type DynStorage = Arc<dyn ResourceStorage + Send + Sync>;

/// A dynamically typed search provider.
///
/// Reference-counted [`SearchProvider`] trait object. Handles are cloned into
/// spawned tasks when a query is fanned out to several backends.
pub type DynSearchProvider = Arc<dyn SearchProvider + Send + Sync>;

/// A dynamically typed conditional storage provider.
///
/// Reference-counted [`ConditionalStorage`] trait object, registered through
/// `CompositeStorage::with_full_primary`.
pub type DynConditionalStorage = Arc<dyn ConditionalStorage + Send + Sync>;

/// A dynamically typed versioned storage provider.
///
/// Reference-counted [`VersionedStorage`] trait object, registered through
/// `CompositeStorage::with_full_primary`.
pub type DynVersionedStorage = Arc<dyn VersionedStorage + Send + Sync>;

/// A dynamically typed instance history provider.
///
/// Reference-counted [`InstanceHistoryProvider`] trait object, registered
/// through `CompositeStorage::with_full_primary`.
pub type DynInstanceHistoryProvider = Arc<dyn InstanceHistoryProvider + Send + Sync>;

/// A dynamically typed bundle provider.
///
/// Reference-counted [`BundleProvider`] trait object, registered through
/// `CompositeStorage::with_full_primary`.
pub type DynBundleProvider = Arc<dyn BundleProvider + Send + Sync>;
81
/// Composite storage that coordinates multiple backends.
///
/// This is the main entry point for polyglot persistence. It implements
/// the storage traits and routes operations to appropriate backends.
///
/// # Full Primary Support
///
/// When the primary backend supports advanced traits (versioning, conditional
/// operations, history, bundles), use [`with_full_primary()`](Self::with_full_primary)
/// to enable delegation of these operations through the composite layer.
/// Until then the four optional delegates below are `None`.
pub struct CompositeStorage {
    /// Configuration. Supplies the primary backend ID, the sync settings and
    /// the health failure threshold used by this type.
    config: CompositeConfig,

    /// Primary storage backend. All CRUD operations are executed here.
    primary: DynStorage,

    /// Secondary backends by ID: every entry of the constructor's backend map
    /// except the primary.
    secondaries: HashMap<String, DynStorage>,

    /// Search providers by backend ID. Empty until
    /// `with_search_providers` is called.
    search_providers: HashMap<String, DynSearchProvider>,

    /// Query router. Decides which backends a search query is sent to.
    router: QueryRouter,

    /// Result merger. Combines primary and auxiliary search results.
    merger: ResultMerger,

    /// Synchronization manager. `Some` only when at least one secondary
    /// backend exists.
    sync_manager: Option<SyncManager>,

    /// Backend health status, keyed by backend ID. Seeded in `new` with a
    /// default (healthy) entry for the primary and each secondary.
    health_status: Arc<RwLock<HashMap<String, BackendHealth>>>,

    // Typed trait objects for primary's advanced capabilities.
    // These are set via `with_full_primary()` to support delegation.
    /// Primary as ConditionalStorage (if supported).
    conditional_storage: Option<DynConditionalStorage>,

    /// Primary as VersionedStorage (if supported).
    versioned_storage: Option<DynVersionedStorage>,

    /// Primary as InstanceHistoryProvider (if supported).
    history_provider: Option<DynInstanceHistoryProvider>,

    /// Primary as BundleProvider (if supported).
    bundle_provider: Option<DynBundleProvider>,
}
131
/// Point-in-time health record for a single backend.
///
/// A fresh record (see [`Default`]) is optimistic: the backend is considered
/// healthy until enough consecutive failures have been recorded.
#[derive(Debug, Clone)]
pub struct BackendHealth {
    /// `true` while the backend is considered usable.
    pub healthy: bool,

    /// When the most recent successful operation completed, if any.
    pub last_success: Option<std::time::Instant>,

    /// Number of failures recorded since the last success.
    pub failure_count: u32,

    /// Message of the most recent failure, if any.
    pub last_error: Option<String>,
}

impl Default for BackendHealth {
    /// Returns a healthy record with no history: zero failures, no last
    /// success and no last error.
    fn default() -> Self {
        let (last_success, last_error) = (None, None);
        BackendHealth {
            healthy: true,
            failure_count: 0,
            last_success,
            last_error,
        }
    }
}
158
159impl CompositeStorage {
160    /// Creates a new composite storage with the given configuration and backends.
161    ///
162    /// # Arguments
163    ///
164    /// * `config` - The composite storage configuration
165    /// * `backends` - Map of backend ID to storage implementation
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the primary backend is not found in the backends map.
170    pub fn new(
171        config: CompositeConfig,
172        backends: HashMap<String, DynStorage>,
173    ) -> StorageResult<Self> {
174        let primary_id = config.primary_id().ok_or_else(|| {
175            StorageError::Backend(BackendError::Unavailable {
176                backend_name: "primary".to_string(),
177                message: "No primary backend configured".to_string(),
178            })
179        })?;
180
181        let primary = backends.get(primary_id).cloned().ok_or_else(|| {
182            StorageError::Backend(BackendError::Unavailable {
183                backend_name: primary_id.to_string(),
184                message: format!("Primary backend '{}' not found in backends map", primary_id),
185            })
186        })?;
187
188        // Separate out secondaries
189        let secondaries: HashMap<_, _> = backends
190            .iter()
191            .filter(|(id, _)| *id != primary_id)
192            .map(|(id, backend)| (id.clone(), backend.clone()))
193            .collect();
194
195        // Initialize health status
196        let mut health_status = HashMap::new();
197        health_status.insert(primary_id.to_string(), BackendHealth::default());
198        for id in secondaries.keys() {
199            health_status.insert(id.clone(), BackendHealth::default());
200        }
201
202        let router = QueryRouter::new(config.clone());
203        let merger = ResultMerger::new();
204
205        // Create sync manager if we have secondaries
206        let sync_manager = if !secondaries.is_empty() {
207            Some(SyncManager::new(config.sync_config.clone()))
208        } else {
209            None
210        };
211
212        Ok(Self {
213            config,
214            primary,
215            secondaries,
216            search_providers: HashMap::new(),
217            router,
218            merger,
219            sync_manager,
220            health_status: Arc::new(RwLock::new(health_status)),
221            conditional_storage: None,
222            versioned_storage: None,
223            history_provider: None,
224            bundle_provider: None,
225        })
226    }
227
228    /// Creates a composite storage with search providers.
229    ///
230    /// Search providers allow specialized search backends like Elasticsearch.
231    pub fn with_search_providers(mut self, providers: HashMap<String, DynSearchProvider>) -> Self {
232        self.search_providers = providers;
233        self
234    }
235
236    /// Registers the primary backend's advanced capabilities for delegation.
237    ///
238    /// When the primary backend implements traits beyond `ResourceStorage`
239    /// (e.g., `ConditionalStorage`, `VersionedStorage`, `InstanceHistoryProvider`,
240    /// `BundleProvider`), this method stores typed references so that
241    /// `CompositeStorage` can delegate these operations to the primary.
242    ///
243    /// # Example
244    ///
245    /// ```ignore
246    /// let sqlite = Arc::new(SqliteBackend::new("fhir.db")?);
247    /// let composite = CompositeStorage::new(config, backends)?
248    ///     .with_full_primary(sqlite.clone());
249    /// ```
250    pub fn with_full_primary<T>(mut self, primary: Arc<T>) -> Self
251    where
252        T: ResourceStorage
253            + ConditionalStorage
254            + VersionedStorage
255            + InstanceHistoryProvider
256            + BundleProvider
257            + Send
258            + Sync
259            + 'static,
260    {
261        self.conditional_storage = Some(primary.clone() as DynConditionalStorage);
262        self.versioned_storage = Some(primary.clone() as DynVersionedStorage);
263        self.history_provider = Some(primary.clone() as DynInstanceHistoryProvider);
264        self.bundle_provider = Some(primary as DynBundleProvider);
265        self
266    }
267
268    /// Returns the configuration.
269    pub fn config(&self) -> &CompositeConfig {
270        &self.config
271    }
272
273    /// Returns the primary backend.
274    pub fn primary(&self) -> &DynStorage {
275        &self.primary
276    }
277
278    /// Returns a secondary backend by ID.
279    pub fn secondary(&self, id: &str) -> Option<&DynStorage> {
280        self.secondaries.get(id)
281    }
282
283    /// Returns all secondary backends.
284    pub fn secondaries(&self) -> &HashMap<String, DynStorage> {
285        &self.secondaries
286    }
287
288    /// Returns the health status for a backend.
289    pub fn backend_health(&self, id: &str) -> Option<BackendHealth> {
290        self.health_status.read().get(id).cloned()
291    }
292
293    /// Returns true if a backend is healthy.
294    pub fn is_backend_healthy(&self, id: &str) -> bool {
295        self.health_status
296            .read()
297            .get(id)
298            .map(|h| h.healthy)
299            .unwrap_or(false)
300    }
301
302    /// Updates health status after an operation.
303    fn update_health(&self, backend_id: &str, success: bool, error: Option<String>) {
304        let mut status = self.health_status.write();
305        if let Some(health) = status.get_mut(backend_id) {
306            if success {
307                health.healthy = true;
308                health.last_success = Some(std::time::Instant::now());
309                health.failure_count = 0;
310                health.last_error = None;
311            } else {
312                health.failure_count += 1;
313                health.last_error = error;
314
315                // Mark unhealthy after threshold failures
316                if health.failure_count >= self.config.health_config.failure_threshold {
317                    health.healthy = false;
318                    warn!(
319                        backend_id = backend_id,
320                        failures = health.failure_count,
321                        "Backend marked unhealthy"
322                    );
323                }
324            }
325        }
326    }
327
328    /// Synchronizes a resource change to secondary backends.
329    async fn sync_to_secondaries(&self, event: SyncEvent) -> StorageResult<()> {
330        if let Some(ref sync_manager) = self.sync_manager {
331            sync_manager.sync(&event, &self.secondaries).await?;
332        }
333        Ok(())
334    }
335
336    /// Routes and executes a search query.
337    #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
338    async fn execute_routed_search(
339        &self,
340        tenant: &TenantContext,
341        query: &SearchQuery,
342    ) -> StorageResult<SearchResult> {
343        // Route the query
344        let decision = self
345            .router
346            .route(query)
347            .map_err(|e| self.routing_error_to_storage_error(e))?;
348
349        debug!(
350            primary = %decision.primary_target,
351            auxiliary_count = decision.auxiliary_targets.len(),
352            merge_strategy = ?decision.merge_strategy,
353            "Routing query"
354        );
355
356        // If no auxiliary backends, just execute on primary
357        if decision.auxiliary_targets.is_empty() {
358            return self.execute_primary_search(tenant, query).await;
359        }
360
361        // Execute on all backends in parallel
362        let (primary_result, auxiliary_results) = self
363            .execute_parallel_search(tenant, query, &decision)
364            .await?;
365
366        // Merge results
367        let merge_options = MergeOptions {
368            strategy: decision.merge_strategy,
369            preserve_primary_order: true,
370            deduplicate: true,
371        };
372
373        self.merger
374            .merge(primary_result, auxiliary_results, merge_options)
375    }
376
377    /// Executes search on the primary backend.
378    ///
379    /// When a dedicated Search backend (e.g., Elasticsearch) is configured, all
380    /// searches are routed there. This is necessary because the primary backend
381    /// may have search indexing disabled (`search_offloaded = true`), leaving its
382    /// search index empty.
383    async fn execute_primary_search(
384        &self,
385        tenant: &TenantContext,
386        query: &SearchQuery,
387    ) -> StorageResult<SearchResult> {
388        // Prefer the Search backend when one is configured, since the primary
389        // may have offloaded search indexing to it.
390        if let Some(search_backend) = self
391            .config
392            .backends_with_role(super::config::BackendRole::Search)
393            .next()
394        {
395            if let Some(provider) = self.search_providers.get(&search_backend.id) {
396                let result = provider.search(tenant, query).await;
397                self.update_health(
398                    &search_backend.id,
399                    result.is_ok(),
400                    result.as_ref().err().map(|e| e.to_string()),
401                );
402                return result;
403            }
404        }
405
406        // Fall back to primary
407        let primary_id = self.config.primary_id().unwrap_or("primary");
408
409        if let Some(provider) = self.search_providers.get(primary_id) {
410            let result = provider.search(tenant, query).await;
411            self.update_health(
412                primary_id,
413                result.is_ok(),
414                result.as_ref().err().map(|e| e.to_string()),
415            );
416            result
417        } else {
418            Err(StorageError::Backend(BackendError::UnsupportedCapability {
419                backend_name: primary_id.to_string(),
420                capability: "SearchProvider".to_string(),
421            }))
422        }
423    }
424
425    /// Executes search on primary and auxiliary backends in parallel.
426    async fn execute_parallel_search(
427        &self,
428        tenant: &TenantContext,
429        query: &SearchQuery,
430        decision: &RoutingDecision,
431    ) -> StorageResult<(SearchResult, Vec<(String, SearchResult)>)> {
432        use tokio::task::JoinSet;
433
434        let mut tasks: JoinSet<(String, StorageResult<SearchResult>)> = JoinSet::new();
435
436        // Clone what we need for async tasks
437        let tenant = tenant.clone();
438        let query = query.clone();
439        let primary_id = decision.primary_target.clone();
440
441        // Start primary search
442        if let Some(provider) = self.search_providers.get(&primary_id).cloned() {
443            let t = tenant.clone();
444            let q = query.clone();
445            let id = primary_id.clone();
446            tasks.spawn(async move {
447                let result = provider.search(&t, &q).await;
448                (id, result)
449            });
450        }
451
452        // Start auxiliary searches
453        for (feature, backend_id) in &decision.auxiliary_targets {
454            if let Some(provider) = self.search_providers.get(backend_id).cloned() {
455                // Create a modified query with only the relevant parameters
456                let part_params = decision
457                    .analysis
458                    .feature_params
459                    .get(feature)
460                    .cloned()
461                    .unwrap_or_default();
462
463                let mut aux_query = SearchQuery::new(&query.resource_type);
464                for param in part_params {
465                    aux_query = aux_query.with_parameter(param);
466                }
467                aux_query.count = query.count;
468                aux_query.offset = query.offset;
469                aux_query.cursor = query.cursor.clone();
470
471                let t = tenant.clone();
472                let id = backend_id.clone();
473                tasks.spawn(async move {
474                    let result = provider.search(&t, &aux_query).await;
475                    (id, result)
476                });
477            }
478        }
479
480        // Collect results
481        let mut primary_result = None;
482        let mut auxiliary_results = Vec::new();
483
484        while let Some(result) = tasks.join_next().await {
485            match result {
486                Ok((id, search_result)) => {
487                    self.update_health(
488                        &id,
489                        search_result.is_ok(),
490                        search_result.as_ref().err().map(|e| e.to_string()),
491                    );
492
493                    if id == primary_id {
494                        primary_result = Some(search_result?);
495                    } else if let Ok(res) = search_result {
496                        auxiliary_results.push((id, res));
497                    }
498                    // Ignore auxiliary failures - graceful degradation
499                }
500                Err(e) => {
501                    warn!(error = %e, "Task join error during parallel search");
502                }
503            }
504        }
505
506        let primary = primary_result.ok_or_else(|| {
507            StorageError::Backend(BackendError::ConnectionFailed {
508                backend_name: primary_id,
509                message: "Primary search task failed".to_string(),
510            })
511        })?;
512
513        Ok((primary, auxiliary_results))
514    }
515
516    /// Syncs bundle results to secondaries by extracting resource info from responses.
517    async fn sync_bundle_results(&self, tenant: &TenantContext, result: &BundleResult) {
518        for entry_result in &result.entries {
519            // Only sync successful mutating operations that have a resource body
520            if let Some(ref resource_json) = entry_result.resource {
521                let resource_type = resource_json
522                    .get("resourceType")
523                    .and_then(|v| v.as_str())
524                    .unwrap_or_default();
525                let resource_id = resource_json
526                    .get("id")
527                    .and_then(|v| v.as_str())
528                    .unwrap_or_default();
529
530                if resource_type.is_empty() || resource_id.is_empty() {
531                    continue;
532                }
533
534                let fhir_version = resource_json
535                    .get("meta")
536                    .and_then(|m| m.get("profile"))
537                    .map(|_| FhirVersion::default())
538                    .unwrap_or_default();
539
540                if let Err(e) = self
541                    .sync_to_secondaries(SyncEvent::Create {
542                        resource_type: resource_type.to_string(),
543                        resource_id: resource_id.to_string(),
544                        content: resource_json.clone(),
545                        tenant_id: tenant.tenant_id().clone(),
546                        fhir_version,
547                    })
548                    .await
549                {
550                    warn!(
551                        error = %e,
552                        resource_type = resource_type,
553                        resource_id = resource_id,
554                        "Failed to sync bundle entry to secondaries"
555                    );
556                }
557            }
558        }
559    }
560
561    /// Converts a routing error to a storage error.
562    fn routing_error_to_storage_error(&self, err: RoutingError) -> StorageError {
563        match err {
564            RoutingError::NoPrimaryBackend => StorageError::Backend(BackendError::Unavailable {
565                backend_name: "primary".to_string(),
566                message: "No primary backend configured".to_string(),
567            }),
568            RoutingError::NoCapableBackend { feature } => {
569                StorageError::Backend(BackendError::UnsupportedCapability {
570                    backend_name: "composite".to_string(),
571                    capability: format!("{:?}", feature),
572                })
573            }
574            RoutingError::BackendUnavailable { backend_id } => {
575                StorageError::Backend(BackendError::ConnectionFailed {
576                    backend_name: backend_id,
577                    message: "Backend unavailable".to_string(),
578                })
579            }
580        }
581    }
582}
583
584#[async_trait]
585impl ResourceStorage for CompositeStorage {
586    fn backend_name(&self) -> &'static str {
587        "composite"
588    }
589
590    #[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type))]
591    async fn create(
592        &self,
593        tenant: &TenantContext,
594        resource_type: &str,
595        resource: Value,
596        fhir_version: FhirVersion,
597    ) -> StorageResult<StoredResource> {
598        // All writes go to primary
599        let result = self
600            .primary
601            .create(tenant, resource_type, resource.clone(), fhir_version)
602            .await;
603
604        let primary_id = self.config.primary_id().unwrap_or("primary");
605        self.update_health(
606            primary_id,
607            result.is_ok(),
608            result.as_ref().err().map(|e| e.to_string()),
609        );
610
611        let stored = result?;
612
613        // Sync to secondaries
614        if let Err(e) = self
615            .sync_to_secondaries(SyncEvent::Create {
616                resource_type: resource_type.to_string(),
617                resource_id: stored.id().to_string(),
618                content: stored.content().clone(),
619                tenant_id: tenant.tenant_id().clone(),
620                fhir_version,
621            })
622            .await
623        {
624            warn!(error = %e, "Failed to sync create to secondaries");
625            // Don't fail the operation - primary succeeded
626        }
627
628        Ok(stored)
629    }
630
631    #[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type, id = %id))]
632    async fn create_or_update(
633        &self,
634        tenant: &TenantContext,
635        resource_type: &str,
636        id: &str,
637        resource: Value,
638        fhir_version: FhirVersion,
639    ) -> StorageResult<(StoredResource, bool)> {
640        let result = self
641            .primary
642            .create_or_update(tenant, resource_type, id, resource.clone(), fhir_version)
643            .await;
644
645        let primary_id = self.config.primary_id().unwrap_or("primary");
646        self.update_health(
647            primary_id,
648            result.is_ok(),
649            result.as_ref().err().map(|e| e.to_string()),
650        );
651
652        let (stored, created) = result?;
653
654        // Sync to secondaries
655        let event = if created {
656            SyncEvent::Create {
657                resource_type: resource_type.to_string(),
658                resource_id: id.to_string(),
659                content: stored.content().clone(),
660                tenant_id: tenant.tenant_id().clone(),
661                fhir_version,
662            }
663        } else {
664            SyncEvent::Update {
665                resource_type: resource_type.to_string(),
666                resource_id: id.to_string(),
667                content: stored.content().clone(),
668                tenant_id: tenant.tenant_id().clone(),
669                version: stored.version_id().to_string(),
670                fhir_version,
671            }
672        };
673
674        if let Err(e) = self.sync_to_secondaries(event).await {
675            warn!(error = %e, "Failed to sync create_or_update to secondaries");
676        }
677
678        Ok((stored, created))
679    }
680
681    #[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
682    async fn read(
683        &self,
684        tenant: &TenantContext,
685        resource_type: &str,
686        id: &str,
687    ) -> StorageResult<Option<StoredResource>> {
688        // Reads always go to primary (source of truth)
689        let result = self.primary.read(tenant, resource_type, id).await;
690
691        let primary_id = self.config.primary_id().unwrap_or("primary");
692        self.update_health(
693            primary_id,
694            result.is_ok(),
695            result.as_ref().err().map(|e| e.to_string()),
696        );
697
698        result
699    }
700
701    #[instrument(skip(self, tenant, resource), fields(resource_type = %current.resource_type(), id = %current.id()))]
702    async fn update(
703        &self,
704        tenant: &TenantContext,
705        current: &StoredResource,
706        resource: Value,
707    ) -> StorageResult<StoredResource> {
708        let result = self.primary.update(tenant, current, resource.clone()).await;
709
710        let primary_id = self.config.primary_id().unwrap_or("primary");
711        self.update_health(
712            primary_id,
713            result.is_ok(),
714            result.as_ref().err().map(|e| e.to_string()),
715        );
716
717        let stored = result?;
718
719        // Sync to secondaries
720        if let Err(e) = self
721            .sync_to_secondaries(SyncEvent::Update {
722                resource_type: current.resource_type().to_string(),
723                resource_id: current.id().to_string(),
724                content: stored.content().clone(),
725                tenant_id: tenant.tenant_id().clone(),
726                version: stored.version_id().to_string(),
727                fhir_version: stored.fhir_version(),
728            })
729            .await
730        {
731            warn!(error = %e, "Failed to sync update to secondaries");
732        }
733
734        Ok(stored)
735    }
736
737    #[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
738    async fn delete(
739        &self,
740        tenant: &TenantContext,
741        resource_type: &str,
742        id: &str,
743    ) -> StorageResult<()> {
744        let result = self.primary.delete(tenant, resource_type, id).await;
745
746        let primary_id = self.config.primary_id().unwrap_or("primary");
747        self.update_health(
748            primary_id,
749            result.is_ok(),
750            result.as_ref().err().map(|e| e.to_string()),
751        );
752
753        result?;
754
755        // Sync to secondaries
756        if let Err(e) = self
757            .sync_to_secondaries(SyncEvent::Delete {
758                resource_type: resource_type.to_string(),
759                resource_id: id.to_string(),
760                tenant_id: tenant.tenant_id().clone(),
761            })
762            .await
763        {
764            warn!(error = %e, "Failed to sync delete to secondaries");
765        }
766
767        Ok(())
768    }
769
770    async fn count(
771        &self,
772        tenant: &TenantContext,
773        resource_type: Option<&str>,
774    ) -> StorageResult<u64> {
775        self.primary.count(tenant, resource_type).await
776    }
777}
778
779#[async_trait]
780impl SearchProvider for CompositeStorage {
781    #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
782    async fn search(
783        &self,
784        tenant: &TenantContext,
785        query: &SearchQuery,
786    ) -> StorageResult<SearchResult> {
787        self.execute_routed_search(tenant, query).await
788    }
789
790    async fn search_count(
791        &self,
792        tenant: &TenantContext,
793        query: &SearchQuery,
794    ) -> StorageResult<u64> {
795        // For count, we can just use primary
796        // A more sophisticated implementation might route based on features
797        if let Some(provider) = self
798            .search_providers
799            .get(self.config.primary_id().unwrap_or("primary"))
800        {
801            provider.search_count(tenant, query).await
802        } else {
803            Err(StorageError::Backend(BackendError::UnsupportedCapability {
804                backend_name: "composite".to_string(),
805                capability: "search_count".to_string(),
806            }))
807        }
808    }
809}
810
811#[async_trait]
812impl ConditionalStorage for CompositeStorage {
813    async fn conditional_create(
814        &self,
815        tenant: &TenantContext,
816        resource_type: &str,
817        resource: Value,
818        search_params: &str,
819        fhir_version: FhirVersion,
820    ) -> StorageResult<ConditionalCreateResult> {
821        let storage = self.conditional_storage.as_ref().ok_or_else(|| {
822            StorageError::Backend(BackendError::UnsupportedCapability {
823                backend_name: "composite".to_string(),
824                capability: "ConditionalStorage".to_string(),
825            })
826        })?;
827
828        let result = storage
829            .conditional_create(tenant, resource_type, resource, search_params, fhir_version)
830            .await?;
831
832        // Sync created resource to secondaries
833        if let ConditionalCreateResult::Created(ref stored) = result {
834            if let Err(e) = self
835                .sync_to_secondaries(SyncEvent::Create {
836                    resource_type: resource_type.to_string(),
837                    resource_id: stored.id().to_string(),
838                    content: stored.content().clone(),
839                    tenant_id: tenant.tenant_id().clone(),
840                    fhir_version,
841                })
842                .await
843            {
844                warn!(error = %e, "Failed to sync conditional_create to secondaries");
845            }
846        }
847
848        Ok(result)
849    }
850
851    async fn conditional_update(
852        &self,
853        tenant: &TenantContext,
854        resource_type: &str,
855        resource: Value,
856        search_params: &str,
857        upsert: bool,
858        fhir_version: FhirVersion,
859    ) -> StorageResult<ConditionalUpdateResult> {
860        let storage = self.conditional_storage.as_ref().ok_or_else(|| {
861            StorageError::Backend(BackendError::UnsupportedCapability {
862                backend_name: "composite".to_string(),
863                capability: "ConditionalStorage".to_string(),
864            })
865        })?;
866
867        let result = storage
868            .conditional_update(
869                tenant,
870                resource_type,
871                resource,
872                search_params,
873                upsert,
874                fhir_version,
875            )
876            .await?;
877
878        // Sync to secondaries
879        match &result {
880            ConditionalUpdateResult::Created(stored) => {
881                if let Err(e) = self
882                    .sync_to_secondaries(SyncEvent::Create {
883                        resource_type: resource_type.to_string(),
884                        resource_id: stored.id().to_string(),
885                        content: stored.content().clone(),
886                        tenant_id: tenant.tenant_id().clone(),
887                        fhir_version,
888                    })
889                    .await
890                {
891                    warn!(error = %e, "Failed to sync conditional_update create to secondaries");
892                }
893            }
894            ConditionalUpdateResult::Updated(stored) => {
895                if let Err(e) = self
896                    .sync_to_secondaries(SyncEvent::Update {
897                        resource_type: resource_type.to_string(),
898                        resource_id: stored.id().to_string(),
899                        content: stored.content().clone(),
900                        tenant_id: tenant.tenant_id().clone(),
901                        version: stored.version_id().to_string(),
902                        fhir_version: stored.fhir_version(),
903                    })
904                    .await
905                {
906                    warn!(error = %e, "Failed to sync conditional_update to secondaries");
907                }
908            }
909            _ => {}
910        }
911
912        Ok(result)
913    }
914
915    async fn conditional_delete(
916        &self,
917        tenant: &TenantContext,
918        resource_type: &str,
919        search_params: &str,
920    ) -> StorageResult<ConditionalDeleteResult> {
921        let storage = self.conditional_storage.as_ref().ok_or_else(|| {
922            StorageError::Backend(BackendError::UnsupportedCapability {
923                backend_name: "composite".to_string(),
924                capability: "ConditionalStorage".to_string(),
925            })
926        })?;
927
928        let result = storage
929            .conditional_delete(tenant, resource_type, search_params)
930            .await?;
931
932        // Note: We don't have the resource ID for sync here — the primary already
933        // performed the delete. The sync_manager will handle it if configured.
934
935        Ok(result)
936    }
937
938    async fn conditional_patch(
939        &self,
940        tenant: &TenantContext,
941        resource_type: &str,
942        search_params: &str,
943        patch: &PatchFormat,
944    ) -> StorageResult<ConditionalPatchResult> {
945        let storage = self.conditional_storage.as_ref().ok_or_else(|| {
946            StorageError::Backend(BackendError::UnsupportedCapability {
947                backend_name: "composite".to_string(),
948                capability: "ConditionalStorage".to_string(),
949            })
950        })?;
951
952        let result = storage
953            .conditional_patch(tenant, resource_type, search_params, patch)
954            .await?;
955
956        // Sync patched resource to secondaries
957        if let ConditionalPatchResult::Patched(ref stored) = result {
958            if let Err(e) = self
959                .sync_to_secondaries(SyncEvent::Update {
960                    resource_type: resource_type.to_string(),
961                    resource_id: stored.id().to_string(),
962                    content: stored.content().clone(),
963                    tenant_id: tenant.tenant_id().clone(),
964                    version: stored.version_id().to_string(),
965                    fhir_version: stored.fhir_version(),
966                })
967                .await
968            {
969                warn!(error = %e, "Failed to sync conditional_patch to secondaries");
970            }
971        }
972
973        Ok(result)
974    }
975}
976
977#[async_trait]
978impl VersionedStorage for CompositeStorage {
979    async fn vread(
980        &self,
981        tenant: &TenantContext,
982        resource_type: &str,
983        id: &str,
984        version_id: &str,
985    ) -> StorageResult<Option<StoredResource>> {
986        let storage = self.versioned_storage.as_ref().ok_or_else(|| {
987            StorageError::Backend(BackendError::UnsupportedCapability {
988                backend_name: "composite".to_string(),
989                capability: "VersionedStorage".to_string(),
990            })
991        })?;
992
993        storage.vread(tenant, resource_type, id, version_id).await
994    }
995
996    async fn update_with_match(
997        &self,
998        tenant: &TenantContext,
999        resource_type: &str,
1000        id: &str,
1001        expected_version: &str,
1002        resource: Value,
1003    ) -> StorageResult<StoredResource> {
1004        let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1005            StorageError::Backend(BackendError::UnsupportedCapability {
1006                backend_name: "composite".to_string(),
1007                capability: "VersionedStorage".to_string(),
1008            })
1009        })?;
1010
1011        let stored = storage
1012            .update_with_match(tenant, resource_type, id, expected_version, resource)
1013            .await?;
1014
1015        // Sync to secondaries
1016        if let Err(e) = self
1017            .sync_to_secondaries(SyncEvent::Update {
1018                resource_type: resource_type.to_string(),
1019                resource_id: id.to_string(),
1020                content: stored.content().clone(),
1021                tenant_id: tenant.tenant_id().clone(),
1022                version: stored.version_id().to_string(),
1023                fhir_version: stored.fhir_version(),
1024            })
1025            .await
1026        {
1027            warn!(error = %e, "Failed to sync update_with_match to secondaries");
1028        }
1029
1030        Ok(stored)
1031    }
1032
1033    async fn delete_with_match(
1034        &self,
1035        tenant: &TenantContext,
1036        resource_type: &str,
1037        id: &str,
1038        expected_version: &str,
1039    ) -> StorageResult<()> {
1040        let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1041            StorageError::Backend(BackendError::UnsupportedCapability {
1042                backend_name: "composite".to_string(),
1043                capability: "VersionedStorage".to_string(),
1044            })
1045        })?;
1046
1047        storage
1048            .delete_with_match(tenant, resource_type, id, expected_version)
1049            .await?;
1050
1051        // Sync to secondaries
1052        if let Err(e) = self
1053            .sync_to_secondaries(SyncEvent::Delete {
1054                resource_type: resource_type.to_string(),
1055                resource_id: id.to_string(),
1056                tenant_id: tenant.tenant_id().clone(),
1057            })
1058            .await
1059        {
1060            warn!(error = %e, "Failed to sync delete_with_match to secondaries");
1061        }
1062
1063        Ok(())
1064    }
1065
1066    async fn list_versions(
1067        &self,
1068        tenant: &TenantContext,
1069        resource_type: &str,
1070        id: &str,
1071    ) -> StorageResult<Vec<String>> {
1072        let storage = self.versioned_storage.as_ref().ok_or_else(|| {
1073            StorageError::Backend(BackendError::UnsupportedCapability {
1074                backend_name: "composite".to_string(),
1075                capability: "VersionedStorage".to_string(),
1076            })
1077        })?;
1078
1079        storage.list_versions(tenant, resource_type, id).await
1080    }
1081}
1082
1083#[async_trait]
1084impl InstanceHistoryProvider for CompositeStorage {
1085    async fn history_instance(
1086        &self,
1087        tenant: &TenantContext,
1088        resource_type: &str,
1089        id: &str,
1090        params: &HistoryParams,
1091    ) -> StorageResult<crate::core::HistoryPage> {
1092        let provider = self.history_provider.as_ref().ok_or_else(|| {
1093            StorageError::Backend(BackendError::UnsupportedCapability {
1094                backend_name: "composite".to_string(),
1095                capability: "InstanceHistoryProvider".to_string(),
1096            })
1097        })?;
1098
1099        provider
1100            .history_instance(tenant, resource_type, id, params)
1101            .await
1102    }
1103
1104    async fn history_instance_count(
1105        &self,
1106        tenant: &TenantContext,
1107        resource_type: &str,
1108        id: &str,
1109    ) -> StorageResult<u64> {
1110        let provider = self.history_provider.as_ref().ok_or_else(|| {
1111            StorageError::Backend(BackendError::UnsupportedCapability {
1112                backend_name: "composite".to_string(),
1113                capability: "InstanceHistoryProvider".to_string(),
1114            })
1115        })?;
1116
1117        provider
1118            .history_instance_count(tenant, resource_type, id)
1119            .await
1120    }
1121}
1122
1123#[async_trait]
1124impl BundleProvider for CompositeStorage {
1125    async fn process_transaction(
1126        &self,
1127        tenant: &TenantContext,
1128        entries: Vec<BundleEntry>,
1129    ) -> Result<BundleResult, TransactionError> {
1130        let provider =
1131            self.bundle_provider
1132                .as_ref()
1133                .ok_or_else(|| TransactionError::BundleError {
1134                    index: 0,
1135                    message: "BundleProvider not available on composite primary".to_string(),
1136                })?;
1137
1138        let result = provider.process_transaction(tenant, entries).await?;
1139
1140        // Sync successful entries to secondaries by reading resources from primary
1141        self.sync_bundle_results(tenant, &result).await;
1142
1143        Ok(result)
1144    }
1145
1146    async fn process_batch(
1147        &self,
1148        tenant: &TenantContext,
1149        entries: Vec<BundleEntry>,
1150    ) -> StorageResult<BundleResult> {
1151        let provider = self.bundle_provider.as_ref().ok_or_else(|| {
1152            StorageError::Backend(BackendError::UnsupportedCapability {
1153                backend_name: "composite".to_string(),
1154                capability: "BundleProvider".to_string(),
1155            })
1156        })?;
1157
1158        let result = provider.process_batch(tenant, entries).await?;
1159
1160        // Sync successful entries to secondaries
1161        self.sync_bundle_results(tenant, &result).await;
1162
1163        Ok(result)
1164    }
1165}
1166
1167#[async_trait]
1168impl IncludeProvider for CompositeStorage {
1169    async fn resolve_includes(
1170        &self,
1171        tenant: &TenantContext,
1172        resources: &[StoredResource],
1173        includes: &[IncludeDirective],
1174    ) -> StorageResult<Vec<StoredResource>> {
1175        // Include resolution always uses primary (has all resources)
1176        let primary_id = self.config.primary_id().unwrap_or("primary");
1177
1178        if let Some(_provider) = self.search_providers.get(primary_id) {
1179            // Try to downcast to IncludeProvider
1180            // This is a limitation - we need trait objects
1181            // For now, fall back to a basic implementation
1182            self.resolve_includes_basic(tenant, resources, includes)
1183                .await
1184        } else {
1185            self.resolve_includes_basic(tenant, resources, includes)
1186                .await
1187        }
1188    }
1189}
1190
1191impl CompositeStorage {
1192    /// Basic include resolution by reading referenced resources.
1193    async fn resolve_includes_basic(
1194        &self,
1195        tenant: &TenantContext,
1196        resources: &[StoredResource],
1197        includes: &[IncludeDirective],
1198    ) -> StorageResult<Vec<StoredResource>> {
1199        use std::collections::HashSet;
1200
1201        let mut included = Vec::new();
1202        let mut seen_ids = HashSet::new();
1203
1204        for resource in resources {
1205            for include in includes {
1206                // Extract references from resource based on search param
1207                let refs = self.extract_references(resource, &include.search_param);
1208
1209                for reference in refs {
1210                    // Parse reference: "ResourceType/id"
1211                    if let Some((ref_type, ref_id)) = reference.split_once('/') {
1212                        // Check target type filter
1213                        if let Some(ref target) = include.target_type {
1214                            if target != ref_type {
1215                                continue;
1216                            }
1217                        }
1218
1219                        let key = format!("{}/{}", ref_type, ref_id);
1220                        if seen_ids.insert(key) {
1221                            if let Ok(Some(included_resource)) =
1222                                self.primary.read(tenant, ref_type, ref_id).await
1223                            {
1224                                included.push(included_resource);
1225                            }
1226                        }
1227                    }
1228                }
1229            }
1230        }
1231
1232        Ok(included)
1233    }
1234
1235    /// Extracts references from a resource for a given search parameter.
1236    fn extract_references(&self, resource: &StoredResource, search_param: &str) -> Vec<String> {
1237        let content = resource.content();
1238        let mut refs = Vec::new();
1239
1240        // Simple extraction - looks for the search param as a field
1241        // A real implementation would use FHIRPath or search parameter definitions
1242        if let Some(value) = content.get(search_param) {
1243            Self::extract_reference_values(value, &mut refs);
1244        }
1245
1246        // Also check common reference field names
1247        let field_name = match search_param {
1248            "patient" | "subject" => Some("subject"),
1249            "encounter" => Some("encounter"),
1250            "performer" => Some("performer"),
1251            _ => None,
1252        };
1253
1254        if let Some(field) = field_name {
1255            if let Some(value) = content.get(field) {
1256                Self::extract_reference_values(value, &mut refs);
1257            }
1258        }
1259
1260        refs
1261    }
1262
1263    /// Recursively extracts reference values.
1264    fn extract_reference_values(value: &Value, refs: &mut Vec<String>) {
1265        match value {
1266            Value::Object(obj) => {
1267                if let Some(Value::String(reference)) = obj.get("reference") {
1268                    refs.push(reference.clone());
1269                }
1270            }
1271            Value::Array(arr) => {
1272                for item in arr {
1273                    Self::extract_reference_values(item, refs);
1274                }
1275            }
1276            _ => {}
1277        }
1278    }
1279}
1280
1281#[async_trait]
1282impl RevincludeProvider for CompositeStorage {
1283    async fn resolve_revincludes(
1284        &self,
1285        tenant: &TenantContext,
1286        resources: &[StoredResource],
1287        revincludes: &[IncludeDirective],
1288    ) -> StorageResult<Vec<StoredResource>> {
1289        // Revinclude resolution - find resources that reference the primary results
1290        // This typically requires search capability
1291        let mut revincluded = Vec::new();
1292
1293        for revinclude in revincludes {
1294            for resource in resources {
1295                let reference = format!("{}/{}", resource.resource_type(), resource.id());
1296
1297                // Search for resources that reference this one
1298                let query = SearchQuery::new(&revinclude.source_type).with_parameter(
1299                    crate::types::SearchParameter {
1300                        name: revinclude.search_param.clone(),
1301                        param_type: crate::types::SearchParamType::Reference,
1302                        modifier: None,
1303                        values: vec![crate::types::SearchValue::eq(&reference)],
1304                        chain: vec![],
1305                        components: vec![],
1306                    },
1307                );
1308
1309                if let Ok(result) = self.search(tenant, &query).await {
1310                    for item in result.resources.items {
1311                        revincluded.push(item);
1312                    }
1313                }
1314            }
1315        }
1316
1317        // Deduplicate
1318        let mut seen = std::collections::HashSet::new();
1319        revincluded.retain(|r| seen.insert(format!("{}/{}", r.resource_type(), r.id())));
1320
1321        Ok(revincluded)
1322    }
1323}
1324
1325#[async_trait]
1326impl ChainedSearchProvider for CompositeStorage {
1327    async fn resolve_chain(
1328        &self,
1329        tenant: &TenantContext,
1330        base_type: &str,
1331        chain: &str,
1332        value: &str,
1333    ) -> StorageResult<Vec<String>> {
1334        // Chain resolution - delegate to graph backend if available
1335        let graph_backend = self
1336            .config
1337            .backends_with_role(super::config::BackendRole::Graph)
1338            .next();
1339
1340        if let Some(backend) = graph_backend {
1341            if let Some(_provider) = self.search_providers.get(&backend.id) {
1342                // Would need to downcast to ChainedSearchProvider
1343                // For now, fall back to iterative resolution
1344            }
1345        }
1346
1347        // Fallback: iterative chain resolution
1348        self.resolve_chain_iterative(tenant, base_type, chain, value)
1349            .await
1350    }
1351
1352    async fn resolve_reverse_chain(
1353        &self,
1354        tenant: &TenantContext,
1355        base_type: &str,
1356        reverse_chain: &ReverseChainedParameter,
1357    ) -> StorageResult<Vec<String>> {
1358        // Find resources of source_type that match the parameter,
1359        // then return IDs of base_type resources they reference
1360        let values = match &reverse_chain.value {
1361            Some(v) => vec![v.clone()],
1362            None => vec![],
1363        };
1364        let query = SearchQuery::new(&reverse_chain.source_type).with_parameter(
1365            crate::types::SearchParameter {
1366                name: reverse_chain.search_param.clone(),
1367                param_type: crate::types::SearchParamType::Token,
1368                modifier: None,
1369                values,
1370                chain: vec![],
1371                components: vec![],
1372            },
1373        );
1374
1375        let result = self.search(tenant, &query).await?;
1376
1377        // Extract references to base_type
1378        let mut ids = Vec::new();
1379        for resource in result.resources.items {
1380            let refs = self.extract_references(&resource, &reverse_chain.reference_param);
1381            for reference in refs {
1382                if let Some((ref_type, ref_id)) = reference.split_once('/') {
1383                    if ref_type == base_type {
1384                        ids.push(ref_id.to_string());
1385                    }
1386                }
1387            }
1388        }
1389
1390        Ok(ids)
1391    }
1392}
1393
1394impl CompositeStorage {
1395    /// Resolves a chain iteratively.
1396    async fn resolve_chain_iterative(
1397        &self,
1398        _tenant: &TenantContext,
1399        _base_type: &str,
1400        chain: &str,
1401        _value: &str,
1402    ) -> StorageResult<Vec<String>> {
1403        // Parse chain: "patient.organization.name" -> ["patient", "organization", "name"]
1404        let parts: Vec<&str> = chain.split('.').collect();
1405
1406        if parts.is_empty() {
1407            return Ok(Vec::new());
1408        }
1409
1410        // This is a simplified implementation
1411        // A full implementation would handle multiple chain segments
1412        // and different parameter types
1413
1414        // For now, just return empty - this would need FHIRPath evaluation
1415        Ok(Vec::new())
1416    }
1417}
1418
1419#[async_trait]
1420impl TerminologySearchProvider for CompositeStorage {
1421    async fn expand_value_set(&self, _value_set_url: &str) -> StorageResult<Vec<(String, String)>> {
1422        // Delegate to terminology backend if available
1423        let term_backend = self
1424            .config
1425            .backends_with_role(super::config::BackendRole::Terminology)
1426            .next();
1427
1428        if let Some(_backend) = term_backend {
1429            // Would need to downcast to TerminologySearchProvider
1430        }
1431
1432        // Fallback: not supported without terminology service
1433        Err(StorageError::Backend(BackendError::UnsupportedCapability {
1434            backend_name: "composite".to_string(),
1435            capability: "expand_value_set".to_string(),
1436        }))
1437    }
1438
1439    async fn codes_above(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
1440        Err(StorageError::Backend(BackendError::UnsupportedCapability {
1441            backend_name: "composite".to_string(),
1442            capability: "codes_above".to_string(),
1443        }))
1444    }
1445
1446    async fn codes_below(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
1447        Err(StorageError::Backend(BackendError::UnsupportedCapability {
1448            backend_name: "composite".to_string(),
1449            capability: "codes_below".to_string(),
1450        }))
1451    }
1452}
1453
1454#[async_trait]
1455impl TextSearchProvider for CompositeStorage {
1456    async fn search_text(
1457        &self,
1458        tenant: &TenantContext,
1459        resource_type: &str,
1460        text: &str,
1461        pagination: &Pagination,
1462    ) -> StorageResult<SearchResult> {
1463        // Delegate to search backend if available
1464        let search_backend = self
1465            .config
1466            .backends_with_role(super::config::BackendRole::Search)
1467            .next();
1468
1469        if let Some(backend) = search_backend {
1470            if let Some(provider) = self.search_providers.get(&backend.id) {
1471                // Build a text search query
1472                let query = SearchQuery::new(resource_type)
1473                    .with_parameter(crate::types::SearchParameter {
1474                        name: "_text".to_string(),
1475                        param_type: crate::types::SearchParamType::String,
1476                        modifier: None,
1477                        values: vec![crate::types::SearchValue::string(text)],
1478                        chain: vec![],
1479                        components: vec![],
1480                    })
1481                    .with_count(pagination.count);
1482
1483                return provider.search(tenant, &query).await;
1484            }
1485        }
1486
1487        // Fallback to primary (may be less efficient)
1488        self.execute_primary_search(
1489            tenant,
1490            &SearchQuery::new(resource_type)
1491                .with_parameter(crate::types::SearchParameter {
1492                    name: "_text".to_string(),
1493                    param_type: crate::types::SearchParamType::String,
1494                    modifier: None,
1495                    values: vec![crate::types::SearchValue::string(text)],
1496                    chain: vec![],
1497                    components: vec![],
1498                })
1499                .with_count(pagination.count),
1500        )
1501        .await
1502    }
1503
1504    async fn search_content(
1505        &self,
1506        tenant: &TenantContext,
1507        resource_type: &str,
1508        content: &str,
1509        pagination: &Pagination,
1510    ) -> StorageResult<SearchResult> {
1511        // Similar to search_text but uses _content parameter
1512        let search_backend = self
1513            .config
1514            .backends_with_role(super::config::BackendRole::Search)
1515            .next();
1516
1517        if let Some(backend) = search_backend {
1518            if let Some(provider) = self.search_providers.get(&backend.id) {
1519                let query = SearchQuery::new(resource_type)
1520                    .with_parameter(crate::types::SearchParameter {
1521                        name: "_content".to_string(),
1522                        param_type: crate::types::SearchParamType::String,
1523                        modifier: None,
1524                        values: vec![crate::types::SearchValue::string(content)],
1525                        chain: vec![],
1526                        components: vec![],
1527                    })
1528                    .with_count(pagination.count);
1529
1530                return provider.search(tenant, &query).await;
1531            }
1532        }
1533
1534        self.execute_primary_search(
1535            tenant,
1536            &SearchQuery::new(resource_type)
1537                .with_parameter(crate::types::SearchParameter {
1538                    name: "_content".to_string(),
1539                    param_type: crate::types::SearchParamType::String,
1540                    modifier: None,
1541                    values: vec![crate::types::SearchValue::string(content)],
1542                    chain: vec![],
1543                    components: vec![],
1544                })
1545                .with_count(pagination.count),
1546        )
1547        .await
1548    }
1549}
1550
1551impl CapabilityProvider for CompositeStorage {
1552    fn capabilities(&self) -> StorageCapabilities {
1553        use std::collections::HashSet;
1554
1555        // Merge capabilities from all backends
1556        let resource_caps = HashMap::new();
1557
1558        let mut system_interactions = HashSet::new();
1559        system_interactions.insert(crate::core::SystemInteraction::Transaction);
1560        system_interactions.insert(crate::core::SystemInteraction::Batch);
1561        system_interactions.insert(crate::core::SystemInteraction::SearchSystem);
1562        system_interactions.insert(crate::core::SystemInteraction::HistorySystem);
1563
1564        StorageCapabilities {
1565            backend_name: "composite".to_string(),
1566            backend_version: None,
1567            resources: resource_caps,
1568            system_interactions,
1569            supports_system_history: true,
1570            supports_system_search: true,
1571            supported_sorts: vec!["_lastUpdated".to_string(), "_id".to_string()],
1572            supports_total: true,
1573            max_page_size: Some(1000),
1574            default_page_size: 20,
1575        }
1576    }
1577
1578    // resource_capabilities uses the default implementation that returns Option<ResourceCapabilities>
1579}
1580
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::BackendKind;

    /// Builds a config with a SQLite primary and one Elasticsearch search backend.
    fn test_config() -> CompositeConfig {
        let builder = CompositeConfig::builder()
            .primary("sqlite", BackendKind::Sqlite)
            .search_backend("es", BackendKind::Elasticsearch);

        builder.build().unwrap()
    }

    /// A default `BackendHealth` is healthy with no recorded failures.
    #[test]
    fn test_backend_health_default() {
        let fresh = BackendHealth::default();

        assert!(fresh.healthy);
        assert_eq!(fresh.failure_count, 0);
        assert!(fresh.last_error.is_none());
    }

    /// The test config exposes its primary id and exactly one secondary.
    #[test]
    fn test_composite_config() {
        let cfg = test_config();

        assert_eq!(cfg.primary_id(), Some("sqlite"));
        assert_eq!(cfg.secondaries().count(), 1);
    }
}