use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use helios_fhir::FhirVersion;
use parking_lot::RwLock;
use serde_json::Value;
use tracing::{debug, instrument, warn};
use crate::core::history::HistoryParams;
use crate::core::{
BundleEntry, BundleProvider, BundleResult, CapabilityProvider, ChainedSearchProvider,
ConditionalCreateResult, ConditionalDeleteResult, ConditionalPatchResult, ConditionalStorage,
ConditionalUpdateResult, IncludeProvider, InstanceHistoryProvider, PatchFormat,
ResourceStorage, RevincludeProvider, SearchProvider, SearchResult, StorageCapabilities,
TerminologySearchProvider, TextSearchProvider, VersionedStorage,
};
use crate::error::{BackendError, StorageError, StorageResult, TransactionError};
use crate::tenant::TenantContext;
use crate::types::{
IncludeDirective, Pagination, ReverseChainedParameter, SearchQuery, StoredResource,
};
use super::config::CompositeConfig;
use super::merger::{MergeOptions, ResultMerger};
use super::router::{QueryRouter, RoutingDecision, RoutingError};
use super::sync::{SyncEvent, SyncManager};
/// Shared trait-object handle to a CRUD storage backend.
pub type DynStorage = Arc<dyn ResourceStorage + Send + Sync>;
/// Shared trait-object handle to a search-capable backend.
pub type DynSearchProvider = Arc<dyn SearchProvider + Send + Sync>;
/// Shared trait-object handle to conditional create/update/delete/patch support.
pub type DynConditionalStorage = Arc<dyn ConditionalStorage + Send + Sync>;
/// Shared trait-object handle to version-aware storage operations.
pub type DynVersionedStorage = Arc<dyn VersionedStorage + Send + Sync>;
/// Shared trait-object handle to per-instance history retrieval.
pub type DynInstanceHistoryProvider = Arc<dyn InstanceHistoryProvider + Send + Sync>;
/// Shared trait-object handle to transaction/batch bundle processing.
pub type DynBundleProvider = Arc<dyn BundleProvider + Send + Sync>;
/// Facade that presents several storage backends as a single FHIR store.
///
/// Writes go to `primary` and are replicated best-effort to `secondaries`
/// via `sync_manager`; searches are routed per-query by `router` and merged
/// by `merger`. The optional capability handles (conditional, versioned,
/// history, bundle) are wired in with `with_full_primary`.
pub struct CompositeStorage {
    /// Routing, sync, and health configuration for the composite.
    config: CompositeConfig,
    /// Source-of-truth backend; all writes land here first.
    primary: DynStorage,
    /// Replication targets, keyed by backend id (primary excluded).
    secondaries: HashMap<String, DynStorage>,
    /// Search-capable backends, keyed by backend id.
    search_providers: HashMap<String, DynSearchProvider>,
    /// Decides which backend(s) service a given search query.
    router: QueryRouter,
    /// Combines primary and auxiliary search results.
    merger: ResultMerger,
    /// Present only when at least one secondary exists.
    sync_manager: Option<SyncManager>,
    /// Per-backend health bookkeeping, updated after backend calls.
    health_status: Arc<RwLock<HashMap<String, BackendHealth>>>,
    /// Conditional-operation capability of the primary, if wired in.
    conditional_storage: Option<DynConditionalStorage>,
    /// Version-aware read/write capability of the primary, if wired in.
    versioned_storage: Option<DynVersionedStorage>,
    /// Instance-history capability of the primary, if wired in.
    history_provider: Option<DynInstanceHistoryProvider>,
    /// Transaction/batch bundle capability of the primary, if wired in.
    bundle_provider: Option<DynBundleProvider>,
}
/// Rolling health record for a single backend, maintained by
/// `CompositeStorage::update_health`.
#[derive(Debug, Clone)]
pub struct BackendHealth {
    /// False once `failure_count` reaches the configured failure threshold;
    /// reset to true on the next success.
    pub healthy: bool,
    /// Timestamp of the most recent successful call, if any.
    pub last_success: Option<std::time::Instant>,
    /// Consecutive failures since the last success (reset to 0 on success).
    pub failure_count: u32,
    /// Message of the most recent failure, cleared on success.
    pub last_error: Option<String>,
}
impl Default for BackendHealth {
fn default() -> Self {
Self {
healthy: true,
last_success: None,
failure_count: 0,
last_error: None,
}
}
}
impl CompositeStorage {
    /// Builds a composite store from a routing `config` and a map of
    /// backend-id -> storage handle.
    ///
    /// The backend named by `config.primary_id()` becomes the write-through
    /// primary; every other entry becomes a secondary that receives
    /// best-effort replication. A `SyncManager` is created only when at
    /// least one secondary exists.
    ///
    /// # Errors
    /// Returns `BackendError::Unavailable` when the config names no primary
    /// or the named primary is missing from `backends`.
    pub fn new(
        config: CompositeConfig,
        backends: HashMap<String, DynStorage>,
    ) -> StorageResult<Self> {
        let primary_id = config.primary_id().ok_or_else(|| {
            StorageError::Backend(BackendError::Unavailable {
                backend_name: "primary".to_string(),
                message: "No primary backend configured".to_string(),
            })
        })?;
        let primary = backends.get(primary_id).cloned().ok_or_else(|| {
            StorageError::Backend(BackendError::Unavailable {
                backend_name: primary_id.to_string(),
                message: format!("Primary backend '{}' not found in backends map", primary_id),
            })
        })?;
        let secondaries: HashMap<_, _> = backends
            .iter()
            .filter(|(id, _)| *id != primary_id)
            .map(|(id, backend)| (id.clone(), backend.clone()))
            .collect();
        // Every backend starts out healthy; update_health flips the flag
        // after repeated failures.
        let mut health_status = HashMap::new();
        health_status.insert(primary_id.to_string(), BackendHealth::default());
        for id in secondaries.keys() {
            health_status.insert(id.clone(), BackendHealth::default());
        }
        let router = QueryRouter::new(config.clone());
        let merger = ResultMerger::new();
        let sync_manager = if !secondaries.is_empty() {
            Some(SyncManager::new(config.sync_config.clone()))
        } else {
            None
        };
        Ok(Self {
            config,
            primary,
            secondaries,
            search_providers: HashMap::new(),
            router,
            merger,
            sync_manager,
            health_status: Arc::new(RwLock::new(health_status)),
            conditional_storage: None,
            versioned_storage: None,
            history_provider: None,
            bundle_provider: None,
        })
    }
    /// Registers the search providers used for query routing, keyed by
    /// backend id. Replaces any previously-registered providers.
    pub fn with_search_providers(mut self, providers: HashMap<String, DynSearchProvider>) -> Self {
        self.search_providers = providers;
        self
    }
    /// Wires the optional capability handles (conditional, versioned,
    /// history, bundle) from a fully-featured primary backend.
    ///
    /// NOTE(review): this sets only the capability handles — `self.primary`
    /// is NOT replaced. Callers must pass the same instance that was
    /// registered as the primary in `new`, otherwise capability calls and
    /// plain CRUD would hit different stores; confirm this invariant.
    pub fn with_full_primary<T>(mut self, primary: Arc<T>) -> Self
    where
        T: ResourceStorage
            + ConditionalStorage
            + VersionedStorage
            + InstanceHistoryProvider
            + BundleProvider
            + Send
            + Sync
            + 'static,
    {
        self.conditional_storage = Some(primary.clone() as DynConditionalStorage);
        self.versioned_storage = Some(primary.clone() as DynVersionedStorage);
        self.history_provider = Some(primary.clone() as DynInstanceHistoryProvider);
        self.bundle_provider = Some(primary as DynBundleProvider);
        self
    }
    /// Returns the composite configuration.
    pub fn config(&self) -> &CompositeConfig {
        &self.config
    }
    /// Returns the primary (write-through) backend.
    pub fn primary(&self) -> &DynStorage {
        &self.primary
    }
    /// Looks up a secondary backend by id.
    pub fn secondary(&self, id: &str) -> Option<&DynStorage> {
        self.secondaries.get(id)
    }
    /// Returns all secondary backends, keyed by id.
    pub fn secondaries(&self) -> &HashMap<String, DynStorage> {
        &self.secondaries
    }
    /// Returns a snapshot of the health record for `id`, if known.
    pub fn backend_health(&self, id: &str) -> Option<BackendHealth> {
        self.health_status.read().get(id).cloned()
    }
    /// True when backend `id` is known and currently marked healthy;
    /// unknown ids report unhealthy.
    pub fn is_backend_healthy(&self, id: &str) -> bool {
        self.health_status
            .read()
            .get(id)
            .map(|h| h.healthy)
            .unwrap_or(false)
    }
    /// Records the outcome of a backend call.
    ///
    /// A success resets the failure streak and re-marks the backend healthy;
    /// a failure increments the streak and flips the backend to unhealthy
    /// once the configured threshold is reached. Unknown ids are ignored.
    fn update_health(&self, backend_id: &str, success: bool, error: Option<String>) {
        let mut status = self.health_status.write();
        if let Some(health) = status.get_mut(backend_id) {
            if success {
                health.healthy = true;
                health.last_success = Some(std::time::Instant::now());
                health.failure_count = 0;
                health.last_error = None;
            } else {
                health.failure_count += 1;
                health.last_error = error;
                if health.failure_count >= self.config.health_config.failure_threshold {
                    health.healthy = false;
                    warn!(
                        backend_id = backend_id,
                        failures = health.failure_count,
                        "Backend marked unhealthy"
                    );
                }
            }
        }
    }
    /// Forwards a replication event to all secondaries. No-op when there is
    /// no sync manager (i.e. no secondaries were configured).
    async fn sync_to_secondaries(&self, event: SyncEvent) -> StorageResult<()> {
        if let Some(ref sync_manager) = self.sync_manager {
            sync_manager.sync(&event, &self.secondaries).await?;
        }
        Ok(())
    }
    /// Routes a search: queries with no auxiliary targets go straight to the
    /// single-backend path; otherwise the primary and auxiliary backends are
    /// queried in parallel and results merged per the routing decision.
    #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
    async fn execute_routed_search(
        &self,
        tenant: &TenantContext,
        query: &SearchQuery,
    ) -> StorageResult<SearchResult> {
        let decision = self
            .router
            .route(query)
            .map_err(|e| self.routing_error_to_storage_error(e))?;
        debug!(
            primary = %decision.primary_target,
            auxiliary_count = decision.auxiliary_targets.len(),
            merge_strategy = ?decision.merge_strategy,
            "Routing query"
        );
        if decision.auxiliary_targets.is_empty() {
            return self.execute_primary_search(tenant, query).await;
        }
        let (primary_result, auxiliary_results) = self
            .execute_parallel_search(tenant, query, &decision)
            .await?;
        let merge_options = MergeOptions {
            strategy: decision.merge_strategy,
            preserve_primary_order: true,
            deduplicate: true,
        };
        self.merger
            .merge(primary_result, auxiliary_results, merge_options)
    }
    /// Runs a query on a single backend: prefers the first Search-role
    /// backend with a registered provider, otherwise falls back to the
    /// primary's provider. Health is recorded for whichever backend
    /// serviced the call.
    async fn execute_primary_search(
        &self,
        tenant: &TenantContext,
        query: &SearchQuery,
    ) -> StorageResult<SearchResult> {
        if let Some(search_backend) = self
            .config
            .backends_with_role(super::config::BackendRole::Search)
            .next()
        {
            if let Some(provider) = self.search_providers.get(&search_backend.id) {
                let result = provider.search(tenant, query).await;
                self.update_health(
                    &search_backend.id,
                    result.is_ok(),
                    result.as_ref().err().map(|e| e.to_string()),
                );
                return result;
            }
        }
        let primary_id = self.config.primary_id().unwrap_or("primary");
        if let Some(provider) = self.search_providers.get(primary_id) {
            let result = provider.search(tenant, query).await;
            self.update_health(
                primary_id,
                result.is_ok(),
                result.as_ref().err().map(|e| e.to_string()),
            );
            result
        } else {
            Err(StorageError::Backend(BackendError::UnsupportedCapability {
                backend_name: primary_id.to_string(),
                capability: "SearchProvider".to_string(),
            }))
        }
    }
    /// Fans the query out: the full query goes to the primary target, and
    /// each auxiliary target gets a sub-query holding only the parameters its
    /// feature contributed (pagination copied from the original query).
    ///
    /// The primary's error is propagated; auxiliary failures are tolerated
    /// and dropped from the merged result. Health is recorded either way.
    async fn execute_parallel_search(
        &self,
        tenant: &TenantContext,
        query: &SearchQuery,
        decision: &RoutingDecision,
    ) -> StorageResult<(SearchResult, Vec<(String, SearchResult)>)> {
        use tokio::task::JoinSet;
        let mut tasks: JoinSet<(String, StorageResult<SearchResult>)> = JoinSet::new();
        // Owned copies so spawned tasks can be 'static.
        let tenant = tenant.clone();
        let query = query.clone();
        let primary_id = decision.primary_target.clone();
        if let Some(provider) = self.search_providers.get(&primary_id).cloned() {
            let t = tenant.clone();
            let q = query.clone();
            let id = primary_id.clone();
            tasks.spawn(async move {
                let result = provider.search(&t, &q).await;
                (id, result)
            });
        }
        for (feature, backend_id) in &decision.auxiliary_targets {
            if let Some(provider) = self.search_providers.get(backend_id).cloned() {
                let part_params = decision
                    .analysis
                    .feature_params
                    .get(feature)
                    .cloned()
                    .unwrap_or_default();
                let mut aux_query = SearchQuery::new(&query.resource_type);
                for param in part_params {
                    aux_query = aux_query.with_parameter(param);
                }
                aux_query.count = query.count;
                aux_query.offset = query.offset;
                aux_query.cursor = query.cursor.clone();
                let t = tenant.clone();
                let id = backend_id.clone();
                tasks.spawn(async move {
                    let result = provider.search(&t, &aux_query).await;
                    (id, result)
                });
            }
        }
        let mut primary_result = None;
        let mut auxiliary_results = Vec::new();
        while let Some(result) = tasks.join_next().await {
            match result {
                Ok((id, search_result)) => {
                    self.update_health(
                        &id,
                        search_result.is_ok(),
                        search_result.as_ref().err().map(|e| e.to_string()),
                    );
                    if id == primary_id {
                        // A primary backend error aborts the whole search here.
                        primary_result = Some(search_result?);
                    } else if let Ok(res) = search_result {
                        auxiliary_results.push((id, res));
                    }
                }
                Err(e) => {
                    // A panicked or cancelled task is logged and skipped.
                    warn!(error = %e, "Task join error during parallel search");
                }
            }
        }
        // Also reached when the primary target had no registered provider,
        // so no task was ever spawned for it.
        let primary = primary_result.ok_or_else(|| {
            StorageError::Backend(BackendError::ConnectionFailed {
                backend_name: primary_id,
                message: "Primary search task failed".to_string(),
            })
        })?;
        Ok((primary, auxiliary_results))
    }
    /// Best-effort replication of every resource returned in a bundle
    /// (transaction/batch) result to the secondaries; failures are logged
    /// and never surfaced.
    async fn sync_bundle_results(&self, tenant: &TenantContext, result: &BundleResult) {
        for entry_result in &result.entries {
            if let Some(ref resource_json) = entry_result.resource {
                let resource_type = resource_json
                    .get("resourceType")
                    .and_then(|v| v.as_str())
                    .unwrap_or_default();
                let resource_id = resource_json
                    .get("id")
                    .and_then(|v| v.as_str())
                    .unwrap_or_default();
                // Entries without both type and id cannot be addressed on
                // secondaries; skip them.
                if resource_type.is_empty() || resource_id.is_empty() {
                    continue;
                }
                // NOTE(review): both arms of this derivation yield
                // `FhirVersion::default()`, so inspecting meta.profile has no
                // effect — confirm the intended version-detection logic.
                let fhir_version = resource_json
                    .get("meta")
                    .and_then(|m| m.get("profile"))
                    .map(|_| FhirVersion::default())
                    .unwrap_or_default();
                if let Err(e) = self
                    .sync_to_secondaries(SyncEvent::Create {
                        resource_type: resource_type.to_string(),
                        resource_id: resource_id.to_string(),
                        content: resource_json.clone(),
                        tenant_id: tenant.tenant_id().clone(),
                        fhir_version,
                    })
                    .await
                {
                    warn!(
                        error = %e,
                        resource_type = resource_type,
                        resource_id = resource_id,
                        "Failed to sync bundle entry to secondaries"
                    );
                }
            }
        }
    }
    /// Translates router-level errors into the storage error space.
    fn routing_error_to_storage_error(&self, err: RoutingError) -> StorageError {
        match err {
            RoutingError::NoPrimaryBackend => StorageError::Backend(BackendError::Unavailable {
                backend_name: "primary".to_string(),
                message: "No primary backend configured".to_string(),
            }),
            RoutingError::NoCapableBackend { feature } => {
                StorageError::Backend(BackendError::UnsupportedCapability {
                    backend_name: "composite".to_string(),
                    capability: format!("{:?}", feature),
                })
            }
            RoutingError::BackendUnavailable { backend_id } => {
                StorageError::Backend(BackendError::ConnectionFailed {
                    backend_name: backend_id,
                    message: "Backend unavailable".to_string(),
                })
            }
        }
    }
}
#[async_trait]
impl ResourceStorage for CompositeStorage {
fn backend_name(&self) -> &'static str {
"composite"
}
#[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type))]
async fn create(
&self,
tenant: &TenantContext,
resource_type: &str,
resource: Value,
fhir_version: FhirVersion,
) -> StorageResult<StoredResource> {
let result = self
.primary
.create(tenant, resource_type, resource.clone(), fhir_version)
.await;
let primary_id = self.config.primary_id().unwrap_or("primary");
self.update_health(
primary_id,
result.is_ok(),
result.as_ref().err().map(|e| e.to_string()),
);
let stored = result?;
if let Err(e) = self
.sync_to_secondaries(SyncEvent::Create {
resource_type: resource_type.to_string(),
resource_id: stored.id().to_string(),
content: stored.content().clone(),
tenant_id: tenant.tenant_id().clone(),
fhir_version,
})
.await
{
warn!(error = %e, "Failed to sync create to secondaries");
}
Ok(stored)
}
#[instrument(skip(self, tenant, resource), fields(resource_type = %resource_type, id = %id))]
async fn create_or_update(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
resource: Value,
fhir_version: FhirVersion,
) -> StorageResult<(StoredResource, bool)> {
let result = self
.primary
.create_or_update(tenant, resource_type, id, resource.clone(), fhir_version)
.await;
let primary_id = self.config.primary_id().unwrap_or("primary");
self.update_health(
primary_id,
result.is_ok(),
result.as_ref().err().map(|e| e.to_string()),
);
let (stored, created) = result?;
let event = if created {
SyncEvent::Create {
resource_type: resource_type.to_string(),
resource_id: id.to_string(),
content: stored.content().clone(),
tenant_id: tenant.tenant_id().clone(),
fhir_version,
}
} else {
SyncEvent::Update {
resource_type: resource_type.to_string(),
resource_id: id.to_string(),
content: stored.content().clone(),
tenant_id: tenant.tenant_id().clone(),
version: stored.version_id().to_string(),
fhir_version,
}
};
if let Err(e) = self.sync_to_secondaries(event).await {
warn!(error = %e, "Failed to sync create_or_update to secondaries");
}
Ok((stored, created))
}
#[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
async fn read(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
) -> StorageResult<Option<StoredResource>> {
let result = self.primary.read(tenant, resource_type, id).await;
let primary_id = self.config.primary_id().unwrap_or("primary");
self.update_health(
primary_id,
result.is_ok(),
result.as_ref().err().map(|e| e.to_string()),
);
result
}
#[instrument(skip(self, tenant, resource), fields(resource_type = %current.resource_type(), id = %current.id()))]
async fn update(
&self,
tenant: &TenantContext,
current: &StoredResource,
resource: Value,
) -> StorageResult<StoredResource> {
let result = self.primary.update(tenant, current, resource.clone()).await;
let primary_id = self.config.primary_id().unwrap_or("primary");
self.update_health(
primary_id,
result.is_ok(),
result.as_ref().err().map(|e| e.to_string()),
);
let stored = result?;
if let Err(e) = self
.sync_to_secondaries(SyncEvent::Update {
resource_type: current.resource_type().to_string(),
resource_id: current.id().to_string(),
content: stored.content().clone(),
tenant_id: tenant.tenant_id().clone(),
version: stored.version_id().to_string(),
fhir_version: stored.fhir_version(),
})
.await
{
warn!(error = %e, "Failed to sync update to secondaries");
}
Ok(stored)
}
#[instrument(skip(self, tenant), fields(resource_type = %resource_type, id = %id))]
async fn delete(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
) -> StorageResult<()> {
let result = self.primary.delete(tenant, resource_type, id).await;
let primary_id = self.config.primary_id().unwrap_or("primary");
self.update_health(
primary_id,
result.is_ok(),
result.as_ref().err().map(|e| e.to_string()),
);
result?;
if let Err(e) = self
.sync_to_secondaries(SyncEvent::Delete {
resource_type: resource_type.to_string(),
resource_id: id.to_string(),
tenant_id: tenant.tenant_id().clone(),
})
.await
{
warn!(error = %e, "Failed to sync delete to secondaries");
}
Ok(())
}
async fn count(
&self,
tenant: &TenantContext,
resource_type: Option<&str>,
) -> StorageResult<u64> {
self.primary.count(tenant, resource_type).await
}
}
#[async_trait]
impl SearchProvider for CompositeStorage {
    /// Routes the query through the composite router and merges results from
    /// whichever backends service it.
    #[instrument(skip(self, tenant, query), fields(resource_type = %query.resource_type))]
    async fn search(
        &self,
        tenant: &TenantContext,
        query: &SearchQuery,
    ) -> StorageResult<SearchResult> {
        self.execute_routed_search(tenant, query).await
    }
    /// Counts matches by delegating to the primary backend's search provider;
    /// reports the capability as unsupported when none is registered.
    async fn search_count(
        &self,
        tenant: &TenantContext,
        query: &SearchQuery,
    ) -> StorageResult<u64> {
        let primary_id = self.config.primary_id().unwrap_or("primary");
        match self.search_providers.get(primary_id) {
            Some(provider) => provider.search_count(tenant, query).await,
            None => Err(StorageError::Backend(BackendError::UnsupportedCapability {
                backend_name: "composite".to_string(),
                capability: "search_count".to_string(),
            })),
        }
    }
}
#[async_trait]
impl ConditionalStorage for CompositeStorage {
async fn conditional_create(
&self,
tenant: &TenantContext,
resource_type: &str,
resource: Value,
search_params: &str,
fhir_version: FhirVersion,
) -> StorageResult<ConditionalCreateResult> {
let storage = self.conditional_storage.as_ref().ok_or_else(|| {
StorageError::Backend(BackendError::UnsupportedCapability {
backend_name: "composite".to_string(),
capability: "ConditionalStorage".to_string(),
})
})?;
let result = storage
.conditional_create(tenant, resource_type, resource, search_params, fhir_version)
.await?;
if let ConditionalCreateResult::Created(ref stored) = result {
if let Err(e) = self
.sync_to_secondaries(SyncEvent::Create {
resource_type: resource_type.to_string(),
resource_id: stored.id().to_string(),
content: stored.content().clone(),
tenant_id: tenant.tenant_id().clone(),
fhir_version,
})
.await
{
warn!(error = %e, "Failed to sync conditional_create to secondaries");
}
}
Ok(result)
}
async fn conditional_update(
&self,
tenant: &TenantContext,
resource_type: &str,
resource: Value,
search_params: &str,
upsert: bool,
fhir_version: FhirVersion,
) -> StorageResult<ConditionalUpdateResult> {
let storage = self.conditional_storage.as_ref().ok_or_else(|| {
StorageError::Backend(BackendError::UnsupportedCapability {
backend_name: "composite".to_string(),
capability: "ConditionalStorage".to_string(),
})
})?;
let result = storage
.conditional_update(
tenant,
resource_type,
resource,
search_params,
upsert,
fhir_version,
)
.await?;
match &result {
ConditionalUpdateResult::Created(stored) => {
if let Err(e) = self
.sync_to_secondaries(SyncEvent::Create {
resource_type: resource_type.to_string(),
resource_id: stored.id().to_string(),
content: stored.content().clone(),
tenant_id: tenant.tenant_id().clone(),
fhir_version,
})
.await
{
warn!(error = %e, "Failed to sync conditional_update create to secondaries");
}
}
ConditionalUpdateResult::Updated(stored) => {
if let Err(e) = self
.sync_to_secondaries(SyncEvent::Update {
resource_type: resource_type.to_string(),
resource_id: stored.id().to_string(),
content: stored.content().clone(),
tenant_id: tenant.tenant_id().clone(),
version: stored.version_id().to_string(),
fhir_version: stored.fhir_version(),
})
.await
{
warn!(error = %e, "Failed to sync conditional_update to secondaries");
}
}
_ => {}
}
Ok(result)
}
async fn conditional_delete(
&self,
tenant: &TenantContext,
resource_type: &str,
search_params: &str,
) -> StorageResult<ConditionalDeleteResult> {
let storage = self.conditional_storage.as_ref().ok_or_else(|| {
StorageError::Backend(BackendError::UnsupportedCapability {
backend_name: "composite".to_string(),
capability: "ConditionalStorage".to_string(),
})
})?;
let result = storage
.conditional_delete(tenant, resource_type, search_params)
.await?;
Ok(result)
}
async fn conditional_patch(
&self,
tenant: &TenantContext,
resource_type: &str,
search_params: &str,
patch: &PatchFormat,
) -> StorageResult<ConditionalPatchResult> {
let storage = self.conditional_storage.as_ref().ok_or_else(|| {
StorageError::Backend(BackendError::UnsupportedCapability {
backend_name: "composite".to_string(),
capability: "ConditionalStorage".to_string(),
})
})?;
let result = storage
.conditional_patch(tenant, resource_type, search_params, patch)
.await?;
if let ConditionalPatchResult::Patched(ref stored) = result {
if let Err(e) = self
.sync_to_secondaries(SyncEvent::Update {
resource_type: resource_type.to_string(),
resource_id: stored.id().to_string(),
content: stored.content().clone(),
tenant_id: tenant.tenant_id().clone(),
version: stored.version_id().to_string(),
fhir_version: stored.fhir_version(),
})
.await
{
warn!(error = %e, "Failed to sync conditional_patch to secondaries");
}
}
Ok(result)
}
}
#[async_trait]
impl VersionedStorage for CompositeStorage {
async fn vread(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
version_id: &str,
) -> StorageResult<Option<StoredResource>> {
let storage = self.versioned_storage.as_ref().ok_or_else(|| {
StorageError::Backend(BackendError::UnsupportedCapability {
backend_name: "composite".to_string(),
capability: "VersionedStorage".to_string(),
})
})?;
storage.vread(tenant, resource_type, id, version_id).await
}
async fn update_with_match(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
expected_version: &str,
resource: Value,
) -> StorageResult<StoredResource> {
let storage = self.versioned_storage.as_ref().ok_or_else(|| {
StorageError::Backend(BackendError::UnsupportedCapability {
backend_name: "composite".to_string(),
capability: "VersionedStorage".to_string(),
})
})?;
let stored = storage
.update_with_match(tenant, resource_type, id, expected_version, resource)
.await?;
if let Err(e) = self
.sync_to_secondaries(SyncEvent::Update {
resource_type: resource_type.to_string(),
resource_id: id.to_string(),
content: stored.content().clone(),
tenant_id: tenant.tenant_id().clone(),
version: stored.version_id().to_string(),
fhir_version: stored.fhir_version(),
})
.await
{
warn!(error = %e, "Failed to sync update_with_match to secondaries");
}
Ok(stored)
}
async fn delete_with_match(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
expected_version: &str,
) -> StorageResult<()> {
let storage = self.versioned_storage.as_ref().ok_or_else(|| {
StorageError::Backend(BackendError::UnsupportedCapability {
backend_name: "composite".to_string(),
capability: "VersionedStorage".to_string(),
})
})?;
storage
.delete_with_match(tenant, resource_type, id, expected_version)
.await?;
if let Err(e) = self
.sync_to_secondaries(SyncEvent::Delete {
resource_type: resource_type.to_string(),
resource_id: id.to_string(),
tenant_id: tenant.tenant_id().clone(),
})
.await
{
warn!(error = %e, "Failed to sync delete_with_match to secondaries");
}
Ok(())
}
async fn list_versions(
&self,
tenant: &TenantContext,
resource_type: &str,
id: &str,
) -> StorageResult<Vec<String>> {
let storage = self.versioned_storage.as_ref().ok_or_else(|| {
StorageError::Backend(BackendError::UnsupportedCapability {
backend_name: "composite".to_string(),
capability: "VersionedStorage".to_string(),
})
})?;
storage.list_versions(tenant, resource_type, id).await
}
}
#[async_trait]
impl InstanceHistoryProvider for CompositeStorage {
    /// Fetches a page of an instance's version history from the primary's
    /// history capability; unsupported when none was wired in.
    async fn history_instance(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
        params: &HistoryParams,
    ) -> StorageResult<crate::core::HistoryPage> {
        match self.history_provider.as_ref() {
            Some(provider) => {
                provider
                    .history_instance(tenant, resource_type, id, params)
                    .await
            }
            None => Err(StorageError::Backend(BackendError::UnsupportedCapability {
                backend_name: "composite".to_string(),
                capability: "InstanceHistoryProvider".to_string(),
            })),
        }
    }
    /// Counts an instance's history entries via the primary's history
    /// capability; unsupported when none was wired in.
    async fn history_instance_count(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        id: &str,
    ) -> StorageResult<u64> {
        match self.history_provider.as_ref() {
            Some(provider) => {
                provider
                    .history_instance_count(tenant, resource_type, id)
                    .await
            }
            None => Err(StorageError::Backend(BackendError::UnsupportedCapability {
                backend_name: "composite".to_string(),
                capability: "InstanceHistoryProvider".to_string(),
            })),
        }
    }
}
#[async_trait]
impl BundleProvider for CompositeStorage {
    /// Executes a FHIR transaction bundle on the primary, then mirrors the
    /// returned resources to the secondaries (best-effort).
    async fn process_transaction(
        &self,
        tenant: &TenantContext,
        entries: Vec<BundleEntry>,
    ) -> Result<BundleResult, TransactionError> {
        let provider = match self.bundle_provider.as_ref() {
            Some(p) => p,
            None => {
                return Err(TransactionError::BundleError {
                    index: 0,
                    message: "BundleProvider not available on composite primary".to_string(),
                })
            }
        };
        let result = provider.process_transaction(tenant, entries).await?;
        self.sync_bundle_results(tenant, &result).await;
        Ok(result)
    }
    /// Executes a FHIR batch bundle on the primary, then mirrors the returned
    /// resources to the secondaries (best-effort).
    async fn process_batch(
        &self,
        tenant: &TenantContext,
        entries: Vec<BundleEntry>,
    ) -> StorageResult<BundleResult> {
        let provider = match self.bundle_provider.as_ref() {
            Some(p) => p,
            None => {
                return Err(StorageError::Backend(BackendError::UnsupportedCapability {
                    backend_name: "composite".to_string(),
                    capability: "BundleProvider".to_string(),
                }))
            }
        };
        let result = provider.process_batch(tenant, entries).await?;
        self.sync_bundle_results(tenant, &result).await;
        Ok(result)
    }
}
#[async_trait]
impl IncludeProvider for CompositeStorage {
    /// Resolves `_include` directives by reading referenced resources from
    /// the primary backend.
    ///
    /// The previous implementation branched on whether the primary had a
    /// registered search provider, but both arms executed the identical
    /// fallback call, so the dead conditional has been collapsed. Routing
    /// includes through a dedicated search backend remains a TODO.
    async fn resolve_includes(
        &self,
        tenant: &TenantContext,
        resources: &[StoredResource],
        includes: &[IncludeDirective],
    ) -> StorageResult<Vec<StoredResource>> {
        self.resolve_includes_basic(tenant, resources, includes)
            .await
    }
}
impl CompositeStorage {
    /// Fallback `_include` resolution: extracts `Type/id` references from
    /// each matched resource and reads the targets from the primary,
    /// deduplicating by `Type/id`. Read failures are silently skipped
    /// (best-effort semantics).
    async fn resolve_includes_basic(
        &self,
        tenant: &TenantContext,
        resources: &[StoredResource],
        includes: &[IncludeDirective],
    ) -> StorageResult<Vec<StoredResource>> {
        use std::collections::HashSet;
        let mut included = Vec::new();
        let mut seen_ids = HashSet::new();
        for resource in resources {
            for include in includes {
                let refs = self.extract_references(resource, &include.search_param);
                for reference in refs {
                    if let Some((ref_type, ref_id)) = reference.split_once('/') {
                        // Honor the directive's optional target-type filter.
                        if let Some(ref target) = include.target_type {
                            if target != ref_type {
                                continue;
                            }
                        }
                        let key = format!("{}/{}", ref_type, ref_id);
                        if seen_ids.insert(key) {
                            if let Ok(Some(included_resource)) =
                                self.primary.read(tenant, ref_type, ref_id).await
                            {
                                included.push(included_resource);
                            }
                        }
                    }
                }
            }
        }
        Ok(included)
    }
    /// Collects reference strings (`"Type/id"`) for `search_param` from a
    /// resource's JSON content: the field named after the parameter plus a
    /// known alias field where the search-parameter name differs from the
    /// FHIR element name (e.g. the `patient` parameter reads `subject`).
    fn extract_references(&self, resource: &StoredResource, search_param: &str) -> Vec<String> {
        let content = resource.content();
        let mut refs = Vec::new();
        if let Some(value) = content.get(search_param) {
            Self::extract_reference_values(value, &mut refs);
        }
        let field_name = match search_param {
            "patient" | "subject" => Some("subject"),
            "encounter" => Some("encounter"),
            "performer" => Some("performer"),
            _ => None,
        };
        if let Some(field) = field_name {
            // Bug fix: when the alias equals the search parameter itself
            // (e.g. "subject" -> "subject", "encounter" -> "encounter"), the
            // field was previously read twice and every reference came back
            // duplicated. Only consult the alias when it names a different field.
            if field != search_param {
                if let Some(value) = content.get(field) {
                    Self::extract_reference_values(value, &mut refs);
                }
            }
        }
        refs
    }
    /// Recursively pulls `reference` string values out of a JSON object or
    /// array of objects (other JSON shapes are ignored).
    fn extract_reference_values(value: &Value, refs: &mut Vec<String>) {
        match value {
            Value::Object(obj) => {
                if let Some(Value::String(reference)) = obj.get("reference") {
                    refs.push(reference.clone());
                }
            }
            Value::Array(arr) => {
                for item in arr {
                    Self::extract_reference_values(item, refs);
                }
            }
            _ => {}
        }
    }
}
#[async_trait]
impl RevincludeProvider for CompositeStorage {
    /// Resolves `_revinclude` directives: for every matched resource, searches
    /// the directive's source type for resources whose reference parameter
    /// points back at it, then deduplicates the combined results by `Type/id`
    /// (first occurrence wins).
    ///
    /// NOTE(review): this issues one search per (revinclude, resource) pair —
    /// O(n*m) backend round-trips; consider batching the reference values into
    /// a single OR-query if result sets get large.
    async fn resolve_revincludes(
        &self,
        tenant: &TenantContext,
        resources: &[StoredResource],
        revincludes: &[IncludeDirective],
    ) -> StorageResult<Vec<StoredResource>> {
        let mut revincluded = Vec::new();
        for revinclude in revincludes {
            for resource in resources {
                let reference = format!("{}/{}", resource.resource_type(), resource.id());
                let query = SearchQuery::new(&revinclude.source_type).with_parameter(
                    crate::types::SearchParameter {
                        name: revinclude.search_param.clone(),
                        param_type: crate::types::SearchParamType::Reference,
                        modifier: None,
                        values: vec![crate::types::SearchValue::eq(&reference)],
                        chain: vec![],
                        components: vec![],
                    },
                );
                // Best-effort: a failing search is skipped rather than
                // aborting the whole resolution.
                if let Ok(result) = self.search(tenant, &query).await {
                    for item in result.resources.items {
                        revincluded.push(item);
                    }
                }
            }
        }
        let mut seen = std::collections::HashSet::new();
        revincluded.retain(|r| seen.insert(format!("{}/{}", r.resource_type(), r.id())));
        Ok(revincluded)
    }
}
#[async_trait]
impl ChainedSearchProvider for CompositeStorage {
    /// Resolves a forward chained parameter (e.g. `subject.name=peter`) to
    /// the ids of `base_type` resources that satisfy it.
    ///
    /// TODO: when a Graph-role backend is configured it should service this
    /// query directly; the previous implementation looked that backend up but
    /// the branch body was empty, so the dead lookup has been removed.
    /// Everything currently falls through to the iterative resolver.
    async fn resolve_chain(
        &self,
        tenant: &TenantContext,
        base_type: &str,
        chain: &str,
        value: &str,
    ) -> StorageResult<Vec<String>> {
        self.resolve_chain_iterative(tenant, base_type, chain, value)
            .await
    }
    /// Resolves a reverse chained (`_has`-style) parameter: searches the
    /// source type for matches, then collects the ids of `base_type`
    /// resources those matches reference via `reference_param`.
    async fn resolve_reverse_chain(
        &self,
        tenant: &TenantContext,
        base_type: &str,
        reverse_chain: &ReverseChainedParameter,
    ) -> StorageResult<Vec<String>> {
        let values = match &reverse_chain.value {
            Some(v) => vec![v.clone()],
            None => vec![],
        };
        // NOTE(review): the source-type parameter is always issued as a Token
        // search; confirm that non-token search params are never used here.
        let query = SearchQuery::new(&reverse_chain.source_type).with_parameter(
            crate::types::SearchParameter {
                name: reverse_chain.search_param.clone(),
                param_type: crate::types::SearchParamType::Token,
                modifier: None,
                values,
                chain: vec![],
                components: vec![],
            },
        );
        let result = self.search(tenant, &query).await?;
        let mut ids = Vec::new();
        for resource in result.resources.items {
            let refs = self.extract_references(&resource, &reverse_chain.reference_param);
            for reference in refs {
                if let Some((ref_type, ref_id)) = reference.split_once('/') {
                    // Keep only references pointing at the requested base type.
                    if ref_type == base_type {
                        ids.push(ref_id.to_string());
                    }
                }
            }
        }
        Ok(ids)
    }
}
impl CompositeStorage {
    /// Iterative fallback for chained-search resolution.
    ///
    /// Not yet implemented: always returns an empty id list. The previous
    /// body split the chain on `'.'` and checked `parts.is_empty()`, but
    /// `str::split` always yields at least one element, so that branch was
    /// unreachable and both paths returned `Ok(Vec::new())` — the dead code
    /// has been removed pending a real implementation.
    async fn resolve_chain_iterative(
        &self,
        _tenant: &TenantContext,
        _base_type: &str,
        _chain: &str,
        _value: &str,
    ) -> StorageResult<Vec<String>> {
        Ok(Vec::new())
    }
}
#[async_trait]
impl TerminologySearchProvider for CompositeStorage {
    /// Expands a ValueSet into `(system, code)` pairs.
    ///
    /// TODO: route to a Terminology-role backend once one exposes an
    /// expansion API — the previous code looked such a backend up but the
    /// `if let` body was empty, so the dead lookup has been removed. Until
    /// then the capability is always reported as unsupported.
    async fn expand_value_set(&self, _value_set_url: &str) -> StorageResult<Vec<(String, String)>> {
        Err(StorageError::Backend(BackendError::UnsupportedCapability {
            backend_name: "composite".to_string(),
            capability: "expand_value_set".to_string(),
        }))
    }
    /// Subsumption lookup (ancestors of a code) — not yet supported.
    async fn codes_above(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
        Err(StorageError::Backend(BackendError::UnsupportedCapability {
            backend_name: "composite".to_string(),
            capability: "codes_above".to_string(),
        }))
    }
    /// Subsumption lookup (descendants of a code) — not yet supported.
    async fn codes_below(&self, _system: &str, _code: &str) -> StorageResult<Vec<String>> {
        Err(StorageError::Backend(BackendError::UnsupportedCapability {
            backend_name: "composite".to_string(),
            capability: "codes_below".to_string(),
        }))
    }
}
#[async_trait]
impl TextSearchProvider for CompositeStorage {
    /// Full-text search over resource narrative (`_text`).
    async fn search_text(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        text: &str,
        pagination: &Pagination,
    ) -> StorageResult<SearchResult> {
        self.text_query_search(tenant, resource_type, "_text", text, pagination)
            .await
    }
    /// Full-text search over the entire resource content (`_content`).
    async fn search_content(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        content: &str,
        pagination: &Pagination,
    ) -> StorageResult<SearchResult> {
        self.text_query_search(tenant, resource_type, "_content", content, pagination)
            .await
    }
}
impl CompositeStorage {
    /// Shared implementation for `_text`/`_content` searches (the two trait
    /// methods previously duplicated this entire body): builds the query
    /// once, prefers the first Search-role backend with a registered
    /// provider, and otherwise falls back to the primary search path.
    ///
    /// NOTE(review): only `pagination.count` was (and is) propagated to the
    /// query — confirm whether offset support is expected here.
    async fn text_query_search(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
        param_name: &str,
        value: &str,
        pagination: &Pagination,
    ) -> StorageResult<SearchResult> {
        let query = SearchQuery::new(resource_type)
            .with_parameter(crate::types::SearchParameter {
                name: param_name.to_string(),
                param_type: crate::types::SearchParamType::String,
                modifier: None,
                values: vec![crate::types::SearchValue::string(value)],
                chain: vec![],
                components: vec![],
            })
            .with_count(pagination.count);
        if let Some(backend) = self
            .config
            .backends_with_role(super::config::BackendRole::Search)
            .next()
        {
            if let Some(provider) = self.search_providers.get(&backend.id) {
                return provider.search(tenant, &query).await;
            }
        }
        self.execute_primary_search(tenant, &query).await
    }
}
impl CapabilityProvider for CompositeStorage {
    /// Advertises the composite facade's system-level capabilities.
    /// Per-resource capabilities are left empty; system-level transaction,
    /// batch, search, and history support is reported unconditionally.
    fn capabilities(&self) -> StorageCapabilities {
        use std::collections::HashSet;
        let system_interactions = HashSet::from([
            crate::core::SystemInteraction::Transaction,
            crate::core::SystemInteraction::Batch,
            crate::core::SystemInteraction::SearchSystem,
            crate::core::SystemInteraction::HistorySystem,
        ]);
        StorageCapabilities {
            backend_name: "composite".to_string(),
            backend_version: None,
            resources: HashMap::new(),
            system_interactions,
            supports_system_history: true,
            supports_system_search: true,
            supported_sorts: vec!["_lastUpdated".to_string(), "_id".to_string()],
            supports_total: true,
            max_page_size: Some(1000),
            default_page_size: 20,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::BackendKind;
    /// Builds a minimal config: SQLite primary plus an Elasticsearch
    /// search backend.
    fn test_config() -> CompositeConfig {
        CompositeConfig::builder()
            .primary("sqlite", BackendKind::Sqlite)
            .search_backend("es", BackendKind::Elasticsearch)
            .build()
            .expect("test config should always build")
    }
    #[test]
    fn test_backend_health_default() {
        let health = BackendHealth::default();
        assert!(health.healthy, "new backends start healthy");
        assert_eq!(health.failure_count, 0);
        assert!(health.last_error.is_none());
        // Previously unasserted: a fresh record has no recorded success yet.
        assert!(health.last_success.is_none());
    }
    #[test]
    fn test_composite_config() {
        let config = test_config();
        assert_eq!(config.primary_id(), Some("sqlite"));
        assert_eq!(config.secondaries().count(), 1);
    }
}