use std::collections::{HashMap, HashSet};
use crate::types::{SearchModifier, SearchParameter, SearchQuery};
use super::analyzer::{QueryAnalysis, QueryAnalyzer, QueryFeature};
use super::config::{BackendEntry, BackendRole, CompositeConfig};
/// How result sets coming from several backends are combined.
///
/// `Intersection` is the `Default` value. In this file the router only ever
/// produces `Intersection` and `SecondaryFiltered`; `Union` and
/// `PrimaryEnriched` are declared but not constructed here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MergeStrategy {
/// Default strategy. Presumably keeps only results present in every input
/// set — TODO confirm against the component that performs the merge.
#[default]
Intersection,
/// Presumably keeps results present in any input set — TODO confirm.
Union,
/// NOTE(review): the name suggests primary results enriched with auxiliary
/// data; the exact semantics are not visible in this file — confirm.
PrimaryEnriched,
/// Chosen by the router when a chained, reverse-chained or terminology
/// feature has been routed to an auxiliary backend.
SecondaryFiltered,
}
/// Complete routing plan produced by `QueryRouter::route` for one query.
#[derive(Debug, Clone)]
pub struct RoutingDecision {
/// Id of the primary backend (copied from the configured primary entry).
pub primary_target: String,
/// Specialised feature -> id of the non-primary backend chosen to serve it.
/// Features that resolve to the primary backend are not listed here.
pub auxiliary_targets: HashMap<QueryFeature, String>,
/// Backend id -> the slice of the query that backend should execute.
pub query_parts: HashMap<String, QueryPart>,
/// Ordered list of steps describing how the plan is to be executed.
pub execution_order: Vec<ExecutionStep>,
/// Strategy selected for combining primary and auxiliary results.
pub merge_strategy: MergeStrategy,
/// The analysis of the query that this decision was derived from.
pub analysis: QueryAnalysis,
}
impl RoutingDecision {
    /// Returns the ids of every backend involved in this decision: the
    /// primary target plus all auxiliary targets, without duplicates.
    pub fn all_backends(&self) -> HashSet<&str> {
        std::iter::once(&self.primary_target)
            .chain(self.auxiliary_targets.values())
            .map(String::as_str)
            .collect()
    }

    /// Returns `true` when at least one feature was routed to an auxiliary
    /// backend.
    pub fn is_multi_backend(&self) -> bool {
        let no_auxiliary = self.auxiliary_targets.is_empty();
        !no_auxiliary
    }

    /// Returns `true` when `backend_id` is either the primary target or one
    /// of the auxiliary targets.
    pub fn uses_backend(&self, backend_id: &str) -> bool {
        std::iter::once(&self.primary_target)
            .chain(self.auxiliary_targets.values())
            .any(|candidate| candidate == backend_id)
    }
}
/// The portion of a search query assigned to a single backend.
#[derive(Debug, Clone)]
pub struct QueryPart {
/// Id of the backend that should run this part.
pub backend_id: String,
/// Search parameters this backend is responsible for.
pub parameters: Vec<SearchParameter>,
/// The query feature this part was created for.
pub feature: QueryFeature,
/// The router sets this to `true` for auxiliary parts and `false` for the
/// primary part. Presumably it tells the executor that only resource ids
/// are needed from this backend — TODO confirm with the executor.
pub returns_ids_only: bool,
}
impl QueryPart {
    /// Creates a part for `backend_id` serving `feature`, with no parameters
    /// and `returns_ids_only` set to `false`.
    pub fn new(backend_id: impl Into<String>, feature: QueryFeature) -> Self {
        let backend_id = backend_id.into();
        Self {
            backend_id,
            feature,
            parameters: Vec::new(),
            returns_ids_only: false,
        }
    }

    /// Builder-style setter: replaces the parameter list with `params`.
    pub fn with_parameters(self, params: Vec<SearchParameter>) -> Self {
        Self {
            parameters: params,
            ..self
        }
    }

    /// Builder-style setter for the `returns_ids_only` flag.
    pub fn with_ids_only(self, ids_only: bool) -> Self {
        Self {
            returns_ids_only: ids_only,
            ..self
        }
    }
}
/// One step of the execution plan stored in `RoutingDecision::execution_order`.
#[derive(Debug, Clone)]
pub enum ExecutionStep {
/// Run the query part for `part_feature` on the backend `backend_id`.
Execute {
backend_id: String,
part_feature: QueryFeature,
},
/// Carries a list of backend ids. The router emits it after the auxiliary
/// `Execute` steps; presumably the executor waits for those backends to
/// finish before continuing — TODO confirm.
Barrier(Vec<String>),
/// Combine the results identified by `inputs` (backend ids when emitted by
/// the router) using `strategy`.
Merge {
inputs: Vec<String>,
strategy: MergeStrategy,
},
/// Emitted after the primary `Execute` step when auxiliary backends are
/// involved. The router uses the literal source name `"auxiliary_results"`;
/// presumably the primary results are restricted by that source — confirm.
Filter {
backend_id: String,
source: String,
},
/// Emitted last when the query analysis reports includes; the router
/// always targets the primary backend with it.
ResolveIncludes {
backend_id: String,
},
}
/// Coarse backend category used by the simplified routing API
/// (`QueryRouting`). Each variant has a same-named `BackendRole` counterpart
/// (see the `From<BackendRole>` conversion).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BackendType {
/// The primary backend.
Primary,
/// Added by simplified routing for full-text search queries.
Search,
/// Added by simplified routing for chained / reverse-chained queries.
Graph,
/// Added by simplified routing for terminology queries.
Terminology,
/// Counterpart of `BackendRole::Archive`; never selected by routing code in
/// this file.
Archive,
}
/// One-to-one conversion from the configuration-level `BackendRole` to the
/// routing-level `BackendType`.
impl From<BackendRole> for BackendType {
    fn from(role: BackendRole) -> Self {
        match role {
            BackendRole::Primary => Self::Primary,
            BackendRole::Search => Self::Search,
            BackendRole::Graph => Self::Graph,
            BackendRole::Terminology => Self::Terminology,
            BackendRole::Archive => Self::Archive,
        }
    }
}
/// Simplified routing result expressed in backend categories rather than
/// concrete backend ids (produced by `QueryRouter::route_simple`).
#[derive(Debug)]
pub struct QueryRouting {
/// Category of the main backend; `route_simple` always sets this to
/// `BackendType::Primary`.
pub primary_backend: BackendType,
/// Categories of the additional backends the query needs.
pub auxiliary_backends: HashSet<BackendType>,
}
/// Routes search queries to the backends described by a `CompositeConfig`.
pub struct QueryRouter {
// Backend configuration consulted for the primary backend, routing rules,
// roles and capabilities.
config: CompositeConfig,
// Analyzer that extracts the features a query uses.
analyzer: QueryAnalyzer,
}
impl QueryRouter {
/// Creates a router for `config` with a freshly constructed query analyzer.
pub fn new(config: CompositeConfig) -> Self {
    let analyzer = QueryAnalyzer::new();
    Self { config, analyzer }
}
pub fn route(&self, query: &SearchQuery) -> Result<RoutingDecision, RoutingError> {
let analysis = self.analyzer.analyze(query);
let primary = self
.config
.primary()
.ok_or(RoutingError::NoPrimaryBackend)?;
let mut auxiliary_targets = HashMap::new();
let mut query_parts = HashMap::new();
for feature in &analysis.specialized_features {
if let Some(backend) = self.find_backend_for_feature(*feature, &analysis) {
if backend.id != primary.id {
auxiliary_targets.insert(*feature, backend.id.clone());
let params = analysis
.feature_params
.get(feature)
.cloned()
.unwrap_or_default();
query_parts.insert(
backend.id.clone(),
QueryPart::new(&backend.id, *feature)
.with_parameters(params)
.with_ids_only(true),
);
}
}
}
let primary_params = analysis
.feature_params
.get(&QueryFeature::BasicSearch)
.cloned()
.unwrap_or_default();
query_parts.insert(
primary.id.clone(),
QueryPart::new(&primary.id, QueryFeature::BasicSearch)
.with_parameters(primary_params)
.with_ids_only(false),
);
let execution_order =
self.build_execution_order(&analysis, &auxiliary_targets, &primary.id);
let merge_strategy = self.determine_merge_strategy(&analysis, &auxiliary_targets);
Ok(RoutingDecision {
primary_target: primary.id.clone(),
auxiliary_targets,
query_parts,
execution_order,
merge_strategy,
analysis,
})
}
/// Computes a coarse, category-level routing for `query`.
///
/// Only the analyzer's feature set is inspected; the backend configuration
/// is not consulted. The primary backend is always `BackendType::Primary`.
pub fn route_simple(&self, query: &SearchQuery) -> QueryRouting {
    let features = self.analyzer.analyze(query).features;

    let needs_graph = features.contains(&QueryFeature::ChainedSearch)
        || features.contains(&QueryFeature::ReverseChaining);
    let needs_search = features.contains(&QueryFeature::FullTextSearch);
    let needs_terminology = features.contains(&QueryFeature::TerminologySearch);

    // Pair every auxiliary category with whether the query requires it, then
    // keep only the required ones.
    let auxiliary_backends = [
        (needs_graph, BackendType::Graph),
        (needs_search, BackendType::Search),
        (needs_terminology, BackendType::Terminology),
    ]
    .iter()
    .filter(|(needed, _)| *needed)
    .map(|&(_, backend)| backend)
    .collect();

    QueryRouting {
        primary_backend: BackendType::Primary,
        auxiliary_backends,
    }
}
/// Resolves the backend that should serve `feature`.
///
/// Resolution order:
/// 1. the first routing rule triggered by `feature` whose target backend
///    exists and is enabled;
/// 2. the lowest-`priority` backend having the role preferred for the
///    feature (graph for chained / reverse-chained search, search for
///    full-text, terminology for terminology search);
/// 3. the lowest-`priority` backend offering the capability the feature
///    requires — if the feature requires a capability and no backend offers
///    it, `None` is returned;
/// 4. the primary backend (only when the feature requires no capability).
///
/// Ties on `priority` resolve to the first backend yielded by the
/// configuration, exactly as a stable sort followed by `first()` would.
fn find_backend_for_feature(
    &self,
    feature: QueryFeature,
    _analysis: &QueryAnalysis,
) -> Option<&BackendEntry> {
    // 1. Explicit routing rules win over everything else.
    let by_rule = self
        .config
        .routing_rules
        .iter()
        .filter(|rule| rule.triggers.contains(&feature))
        .filter_map(|rule| self.config.backend(&rule.target_backend))
        .find(|backend| backend.enabled);
    if by_rule.is_some() {
        return by_rule;
    }

    // 2. Role conventionally associated with the feature.
    let preferred_role = match feature {
        QueryFeature::ChainedSearch | QueryFeature::ReverseChaining => Some(BackendRole::Graph),
        QueryFeature::FullTextSearch => Some(BackendRole::Search),
        QueryFeature::TerminologySearch => Some(BackendRole::Terminology),
        _ => None,
    };
    // NOTE(review): unlike the rule path above, the role and capability paths
    // do not check `enabled`; presumably the config helpers already filter
    // disabled backends — verify.
    let by_role = preferred_role.and_then(|role| {
        // `min_by_key` returns the first minimum, so no sort/allocation needed.
        self.config
            .backends_with_role(role)
            .min_by_key(|backend| backend.priority)
    });
    if by_role.is_some() {
        return by_role;
    }

    // 3. Any backend with the required capability, else 4. the primary.
    match feature.required_capability() {
        Some(cap) => self
            .config
            .backends_with_capability(cap)
            .min_by_key(|backend| backend.priority),
        None => self.config.primary(),
    }
}
/// Builds the ordered execution plan for a routed query.
///
/// Step layout:
/// 1. one `Execute` step per auxiliary (feature, backend) pair;
/// 2. a `Barrier` listing the distinct auxiliary backend ids;
/// 3. a `Merge` (intersection) when more than one distinct auxiliary backend
///    is involved;
/// 4. the primary `Execute` step for `QueryFeature::BasicSearch`;
/// 5. a `Filter` on the primary against `"auxiliary_results"` when any
///    auxiliary backend is involved;
/// 6. `ResolveIncludes` on the primary when the analysis reports includes.
///
/// Backend ids in `Barrier` / `Merge` are sorted and de-duplicated: several
/// features may map to the same backend, and listing it twice (or merging a
/// backend with itself) is meaningless.
fn build_execution_order(
    &self,
    analysis: &QueryAnalysis,
    auxiliary_targets: &HashMap<QueryFeature, String>,
    primary_id: &str,
) -> Vec<ExecutionStep> {
    let mut steps = Vec::new();
    let has_auxiliary = !auxiliary_targets.is_empty();

    if has_auxiliary {
        steps.extend(
            auxiliary_targets
                .iter()
                .map(|(feature, backend_id)| ExecutionStep::Execute {
                    backend_id: backend_id.clone(),
                    part_feature: *feature,
                }),
        );

        // Distinct backend ids in a deterministic order.
        let mut aux_backends: Vec<String> = auxiliary_targets.values().cloned().collect();
        aux_backends.sort_unstable();
        aux_backends.dedup();

        // A merge only makes sense across at least two different backends.
        let merge_inputs = (aux_backends.len() > 1).then(|| aux_backends.clone());
        steps.push(ExecutionStep::Barrier(aux_backends));
        if let Some(inputs) = merge_inputs {
            steps.push(ExecutionStep::Merge {
                inputs,
                strategy: MergeStrategy::Intersection,
            });
        }
    }

    steps.push(ExecutionStep::Execute {
        backend_id: primary_id.to_string(),
        part_feature: QueryFeature::BasicSearch,
    });

    if has_auxiliary {
        steps.push(ExecutionStep::Filter {
            backend_id: primary_id.to_string(),
            source: "auxiliary_results".to_string(),
        });
    }

    if analysis.has_includes() {
        steps.push(ExecutionStep::ResolveIncludes {
            backend_id: primary_id.to_string(),
        });
    }

    steps
}
/// Picks the strategy used to combine primary and auxiliary results.
///
/// Returns `MergeStrategy::SecondaryFiltered` when a chained,
/// reverse-chained or terminology feature has an auxiliary target. Every
/// other case — no auxiliary targets, full-text only, or anything else —
/// yields `MergeStrategy::Intersection`.
fn determine_merge_strategy(
    &self,
    _analysis: &QueryAnalysis,
    auxiliary_targets: &HashMap<QueryFeature, String>,
) -> MergeStrategy {
    let secondary_filter_features = [
        QueryFeature::ChainedSearch,
        QueryFeature::ReverseChaining,
        QueryFeature::TerminologySearch,
    ];

    // An empty map contains none of these keys, so it falls through to
    // `Intersection` just like the full-text-only case.
    let needs_secondary_filter = secondary_filter_features
        .iter()
        .any(|feature| auxiliary_targets.contains_key(feature));

    if needs_secondary_filter {
        MergeStrategy::SecondaryFiltered
    } else {
        MergeStrategy::Intersection
    }
}
pub fn decompose_query(&self, query: &SearchQuery) -> Vec<QueryPart> {
let _analysis = self.analyzer.analyze(query);
let mut parts = Vec::new();
let mut primary_params = Vec::new();
let mut search_params = Vec::new();
let mut graph_params = Vec::new();
let mut term_params = Vec::new();
for param in &query.parameters {
if param.name == "_text" || param.name == "_content" {
search_params.push(param.clone());
}
else if !param.chain.is_empty() {
graph_params.push(param.clone());
}
else if matches!(
param.modifier,
Some(SearchModifier::Above)
| Some(SearchModifier::Below)
| Some(SearchModifier::In)
| Some(SearchModifier::NotIn)
) {
term_params.push(param.clone());
}
else {
primary_params.push(param.clone());
}
}
if !primary_params.is_empty() {
parts.push(
QueryPart::new("primary", QueryFeature::BasicSearch)
.with_parameters(primary_params),
);
}
if !search_params.is_empty() {
parts.push(
QueryPart::new("search", QueryFeature::FullTextSearch)
.with_parameters(search_params)
.with_ids_only(true),
);
}
if !graph_params.is_empty() {
parts.push(
QueryPart::new("graph", QueryFeature::ChainedSearch)
.with_parameters(graph_params)
.with_ids_only(true),
);
}
if !term_params.is_empty() {
parts.push(
QueryPart::new("terminology", QueryFeature::TerminologySearch)
.with_parameters(term_params)
.with_ids_only(true),
);
}
parts
}
}
/// Errors reported by query routing.
///
/// NOTE(review): within this file only `NoPrimaryBackend` is ever returned
/// (by `QueryRouter::route`); the other variants are presumably used
/// elsewhere — confirm.
#[derive(Debug, Clone, thiserror::Error)]
pub enum RoutingError {
/// The configuration does not define a primary backend.
#[error("no primary backend configured")]
NoPrimaryBackend,
/// No configured backend can serve `feature`.
#[error("no backend capable of handling feature: {feature:?}")]
NoCapableBackend {
feature: QueryFeature,
},
/// The backend identified by `backend_id` cannot be used.
#[error("backend '{backend_id}' is unavailable")]
BackendUnavailable {
backend_id: String,
},
}
/// Convenience wrapper: computes the simplified, category-level routing for
/// `query` using a router built from `CompositeConfig::default()`.
pub fn route_query(query: &SearchQuery) -> QueryRouting {
    QueryRouter::new(CompositeConfig::default()).route_simple(query)
}
/// Convenience wrapper: splits `query` into per-backend parts using a router
/// built from `CompositeConfig::default()`.
pub fn decompose_query(query: &SearchQuery) -> Vec<QueryPart> {
    QueryRouter::new(CompositeConfig::default()).decompose_query(query)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::BackendKind;
    use crate::types::{ChainedParameter, SearchModifier, SearchParamType, SearchValue};

    /// Configuration with a SQLite primary plus dedicated search and graph
    /// backends.
    fn test_config() -> CompositeConfig {
        CompositeConfig::builder()
            .primary("sqlite", BackendKind::Sqlite)
            .search_backend("es", BackendKind::Elasticsearch)
            .graph_backend("neo4j", BackendKind::Neo4j)
            .build()
            .unwrap()
    }

    /// Builds a single-valued parameter without chain or components.
    fn single_value_param(
        name: &str,
        param_type: SearchParamType,
        modifier: Option<SearchModifier>,
        value: SearchValue,
    ) -> SearchParameter {
        SearchParameter {
            name: name.to_string(),
            param_type,
            modifier,
            values: vec![value],
            chain: vec![],
            components: vec![],
        }
    }

    /// `_text=cardiac` — a full-text search parameter.
    fn full_text_param() -> SearchParameter {
        single_value_param(
            "_text",
            SearchParamType::String,
            None,
            SearchValue::string("cardiac"),
        )
    }

    /// `subject:Patient.name=Smith` — a chained search parameter.
    fn chained_patient_name_param() -> SearchParameter {
        let mut param = single_value_param(
            "name",
            SearchParamType::String,
            None,
            SearchValue::string("Smith"),
        );
        param.chain = vec![ChainedParameter {
            reference_param: "subject".to_string(),
            target_type: Some("Patient".to_string()),
            target_param: "name".to_string(),
        }];
        param
    }

    /// `code` token parameter for LOINC 8867-4 with the given modifier.
    fn loinc_code_param(modifier: Option<SearchModifier>) -> SearchParameter {
        single_value_param(
            "code",
            SearchParamType::Token,
            modifier,
            SearchValue::token(Some("http://loinc.org"), "8867-4"),
        )
    }

    #[test]
    fn test_route_simple_query_to_primary() {
        let id_param = single_value_param(
            "_id",
            SearchParamType::Token,
            None,
            SearchValue::eq("patient-123"),
        );
        let query = SearchQuery::new("Patient").with_parameter(id_param);

        let routing = route_query(&query);

        assert_eq!(routing.primary_backend, BackendType::Primary);
        assert!(routing.auxiliary_backends.is_empty());
    }

    #[test]
    fn test_route_chained_search_to_graph() {
        let query = SearchQuery::new("Observation").with_parameter(chained_patient_name_param());

        let routing = route_query(&query);

        assert!(routing.auxiliary_backends.contains(&BackendType::Graph));
    }

    #[test]
    fn test_route_fulltext_to_search() {
        let query = SearchQuery::new("Patient").with_parameter(full_text_param());

        let routing = route_query(&query);

        assert!(routing.auxiliary_backends.contains(&BackendType::Search));
    }

    #[test]
    fn test_route_terminology_to_terminology_service() {
        let query = SearchQuery::new("Observation")
            .with_parameter(loinc_code_param(Some(SearchModifier::Below)));

        let routing = route_query(&query);

        assert!(
            routing
                .auxiliary_backends
                .contains(&BackendType::Terminology)
        );
    }

    #[test]
    fn test_route_complex_query_to_multiple_backends() {
        let query = SearchQuery::new("Observation")
            .with_parameter(chained_patient_name_param())
            .with_parameter(full_text_param())
            .with_parameter(loinc_code_param(Some(SearchModifier::Below)));

        let routing = route_query(&query);

        assert!(!routing.auxiliary_backends.is_empty());
    }

    #[test]
    fn test_decompose_query() {
        let query = SearchQuery::new("Observation")
            .with_parameter(loinc_code_param(None))
            .with_parameter(full_text_param());

        let parts = decompose_query(&query);

        assert!(!parts.is_empty());
        assert!(parts.iter().any(|p| p.backend_id == "primary"));
        assert!(parts.iter().any(|p| p.backend_id == "search"));
    }

    #[test]
    fn test_routing_decision_with_config() {
        let router = QueryRouter::new(test_config());
        let query = SearchQuery::new("Patient").with_parameter(full_text_param());

        let decision = router.route(&query).unwrap();

        assert_eq!(decision.primary_target, "sqlite");
        assert!(
            decision
                .auxiliary_targets
                .contains_key(&QueryFeature::FullTextSearch)
        );
    }
}