1use std::collections::{HashMap, HashSet};
17
18use crate::types::{SearchModifier, SearchParameter, SearchQuery};
19
20use super::analyzer::{QueryAnalysis, QueryAnalyzer, QueryFeature};
21use super::config::{BackendEntry, BackendRole, CompositeConfig};
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
25pub enum MergeStrategy {
26 #[default]
29 Intersection,
30
31 Union,
34
35 PrimaryEnriched,
38
39 SecondaryFiltered,
42}
43
44#[derive(Debug, Clone)]
46pub struct RoutingDecision {
47 pub primary_target: String,
49
50 pub auxiliary_targets: HashMap<QueryFeature, String>,
53
54 pub query_parts: HashMap<String, QueryPart>,
56
57 pub execution_order: Vec<ExecutionStep>,
59
60 pub merge_strategy: MergeStrategy,
62
63 pub analysis: QueryAnalysis,
65}
66
67impl RoutingDecision {
68 pub fn all_backends(&self) -> HashSet<&str> {
70 let mut backends = HashSet::new();
71 backends.insert(self.primary_target.as_str());
72 for backend_id in self.auxiliary_targets.values() {
73 backends.insert(backend_id.as_str());
74 }
75 backends
76 }
77
78 pub fn is_multi_backend(&self) -> bool {
80 !self.auxiliary_targets.is_empty()
81 }
82
83 pub fn uses_backend(&self, backend_id: &str) -> bool {
85 self.primary_target == backend_id
86 || self.auxiliary_targets.values().any(|b| b == backend_id)
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct QueryPart {
93 pub backend_id: String,
95
96 pub parameters: Vec<SearchParameter>,
98
99 pub feature: QueryFeature,
101
102 pub returns_ids_only: bool,
104}
105
106impl QueryPart {
107 pub fn new(backend_id: impl Into<String>, feature: QueryFeature) -> Self {
109 Self {
110 backend_id: backend_id.into(),
111 parameters: Vec::new(),
112 feature,
113 returns_ids_only: false,
114 }
115 }
116
117 pub fn with_parameters(mut self, params: Vec<SearchParameter>) -> Self {
119 self.parameters = params;
120 self
121 }
122
123 pub fn with_ids_only(mut self, ids_only: bool) -> Self {
125 self.returns_ids_only = ids_only;
126 self
127 }
128}
129
130#[derive(Debug, Clone)]
132pub enum ExecutionStep {
133 Execute {
135 backend_id: String,
137 part_feature: QueryFeature,
139 },
140
141 Barrier(Vec<String>),
143
144 Merge {
146 inputs: Vec<String>,
148 strategy: MergeStrategy,
150 },
151
152 Filter {
154 backend_id: String,
156 source: String,
158 },
159
160 ResolveIncludes {
162 backend_id: String,
164 },
165}
166
167#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
169pub enum BackendType {
170 Primary,
172 Search,
174 Graph,
176 Terminology,
178 Archive,
180}
181
182impl From<BackendRole> for BackendType {
183 fn from(role: BackendRole) -> Self {
184 match role {
185 BackendRole::Primary => BackendType::Primary,
186 BackendRole::Search => BackendType::Search,
187 BackendRole::Graph => BackendType::Graph,
188 BackendRole::Terminology => BackendType::Terminology,
189 BackendRole::Archive => BackendType::Archive,
190 }
191 }
192}
193
194#[derive(Debug)]
196pub struct QueryRouting {
197 pub primary_backend: BackendType,
199 pub auxiliary_backends: HashSet<BackendType>,
201}
202
203pub struct QueryRouter {
205 config: CompositeConfig,
206 analyzer: QueryAnalyzer,
207}
208
209impl QueryRouter {
210 pub fn new(config: CompositeConfig) -> Self {
212 Self {
213 config,
214 analyzer: QueryAnalyzer::new(),
215 }
216 }
217
218 pub fn route(&self, query: &SearchQuery) -> Result<RoutingDecision, RoutingError> {
220 let analysis = self.analyzer.analyze(query);
222
223 let primary = self
225 .config
226 .primary()
227 .ok_or(RoutingError::NoPrimaryBackend)?;
228
229 let mut auxiliary_targets = HashMap::new();
231 let mut query_parts = HashMap::new();
232
233 for feature in &analysis.specialized_features {
235 if let Some(backend) = self.find_backend_for_feature(*feature, &analysis) {
236 if backend.id != primary.id {
237 auxiliary_targets.insert(*feature, backend.id.clone());
238
239 let params = analysis
241 .feature_params
242 .get(feature)
243 .cloned()
244 .unwrap_or_default();
245
246 query_parts.insert(
247 backend.id.clone(),
248 QueryPart::new(&backend.id, *feature)
249 .with_parameters(params)
250 .with_ids_only(true),
251 );
252 }
253 }
254 }
255
256 let primary_params = analysis
258 .feature_params
259 .get(&QueryFeature::BasicSearch)
260 .cloned()
261 .unwrap_or_default();
262
263 query_parts.insert(
264 primary.id.clone(),
265 QueryPart::new(&primary.id, QueryFeature::BasicSearch)
266 .with_parameters(primary_params)
267 .with_ids_only(false),
268 );
269
270 let execution_order =
272 self.build_execution_order(&analysis, &auxiliary_targets, &primary.id);
273
274 let merge_strategy = self.determine_merge_strategy(&analysis, &auxiliary_targets);
276
277 Ok(RoutingDecision {
278 primary_target: primary.id.clone(),
279 auxiliary_targets,
280 query_parts,
281 execution_order,
282 merge_strategy,
283 analysis,
284 })
285 }
286
287 pub fn route_simple(&self, query: &SearchQuery) -> QueryRouting {
289 let features = self.analyzer.analyze(query).features;
290
291 let mut routing = QueryRouting {
292 primary_backend: BackendType::Primary,
293 auxiliary_backends: HashSet::new(),
294 };
295
296 if features.contains(&QueryFeature::ChainedSearch)
298 || features.contains(&QueryFeature::ReverseChaining)
299 {
300 routing.auxiliary_backends.insert(BackendType::Graph);
301 }
302
303 if features.contains(&QueryFeature::FullTextSearch) {
305 routing.auxiliary_backends.insert(BackendType::Search);
306 }
307
308 if features.contains(&QueryFeature::TerminologySearch) {
310 routing.auxiliary_backends.insert(BackendType::Terminology);
311 }
312
313 routing
314 }
315
316 fn find_backend_for_feature(
318 &self,
319 feature: QueryFeature,
320 _analysis: &QueryAnalysis,
321 ) -> Option<&BackendEntry> {
322 for rule in &self.config.routing_rules {
324 if rule.triggers.contains(&feature) {
325 if let Some(backend) = self.config.backend(&rule.target_backend) {
326 if backend.enabled {
327 return Some(backend);
328 }
329 }
330 }
331 }
332
333 let preferred_role = match feature {
335 QueryFeature::ChainedSearch | QueryFeature::ReverseChaining => Some(BackendRole::Graph),
336 QueryFeature::FullTextSearch => Some(BackendRole::Search),
337 QueryFeature::TerminologySearch => Some(BackendRole::Terminology),
338 _ => None,
339 };
340
341 if let Some(role) = preferred_role {
342 let mut candidates: Vec<_> = self.config.backends_with_role(role).collect();
344 candidates.sort_by_key(|b| b.priority);
345
346 if let Some(backend) = candidates.first() {
347 return Some(*backend);
348 }
349 }
350
351 if let Some(cap) = feature.required_capability() {
353 let mut capable: Vec<_> = self.config.backends_with_capability(cap).collect();
354 capable.sort_by_key(|b| b.priority);
355 return capable.first().copied();
356 }
357
358 self.config.primary()
359 }
360
361 fn build_execution_order(
363 &self,
364 analysis: &QueryAnalysis,
365 auxiliary_targets: &HashMap<QueryFeature, String>,
366 primary_id: &str,
367 ) -> Vec<ExecutionStep> {
368 let mut steps = Vec::new();
369
370 if !auxiliary_targets.is_empty() {
372 for (feature, backend_id) in auxiliary_targets {
374 steps.push(ExecutionStep::Execute {
375 backend_id: backend_id.clone(),
376 part_feature: *feature,
377 });
378 }
379
380 let aux_backends: Vec<_> = auxiliary_targets.values().cloned().collect();
382 steps.push(ExecutionStep::Barrier(aux_backends.clone()));
383
384 if auxiliary_targets.len() > 1 {
386 steps.push(ExecutionStep::Merge {
387 inputs: aux_backends,
388 strategy: MergeStrategy::Intersection,
389 });
390 }
391 }
392
393 steps.push(ExecutionStep::Execute {
395 backend_id: primary_id.to_string(),
396 part_feature: QueryFeature::BasicSearch,
397 });
398
399 if !auxiliary_targets.is_empty() {
401 steps.push(ExecutionStep::Filter {
402 backend_id: primary_id.to_string(),
403 source: "auxiliary_results".to_string(),
404 });
405 }
406
407 if analysis.has_includes() {
409 steps.push(ExecutionStep::ResolveIncludes {
410 backend_id: primary_id.to_string(),
411 });
412 }
413
414 steps
415 }
416
417 fn determine_merge_strategy(
419 &self,
420 _analysis: &QueryAnalysis,
421 auxiliary_targets: &HashMap<QueryFeature, String>,
422 ) -> MergeStrategy {
423 if auxiliary_targets.is_empty() {
424 return MergeStrategy::Intersection;
425 }
426
427 if auxiliary_targets.contains_key(&QueryFeature::ChainedSearch)
429 || auxiliary_targets.contains_key(&QueryFeature::ReverseChaining)
430 || auxiliary_targets.contains_key(&QueryFeature::TerminologySearch)
431 {
432 return MergeStrategy::SecondaryFiltered;
433 }
434
435 if auxiliary_targets.contains_key(&QueryFeature::FullTextSearch) {
437 return MergeStrategy::Intersection;
438 }
439
440 MergeStrategy::Intersection
441 }
442
443 pub fn decompose_query(&self, query: &SearchQuery) -> Vec<QueryPart> {
445 let _analysis = self.analyzer.analyze(query);
446 let mut parts = Vec::new();
447 let mut primary_params = Vec::new();
448 let mut search_params = Vec::new();
449 let mut graph_params = Vec::new();
450 let mut term_params = Vec::new();
451
452 for param in &query.parameters {
453 if param.name == "_text" || param.name == "_content" {
455 search_params.push(param.clone());
456 }
457 else if !param.chain.is_empty() {
459 graph_params.push(param.clone());
460 }
461 else if matches!(
463 param.modifier,
464 Some(SearchModifier::Above)
465 | Some(SearchModifier::Below)
466 | Some(SearchModifier::In)
467 | Some(SearchModifier::NotIn)
468 ) {
469 term_params.push(param.clone());
470 }
471 else {
473 primary_params.push(param.clone());
474 }
475 }
476
477 if !primary_params.is_empty() {
478 parts.push(
479 QueryPart::new("primary", QueryFeature::BasicSearch)
480 .with_parameters(primary_params),
481 );
482 }
483
484 if !search_params.is_empty() {
485 parts.push(
486 QueryPart::new("search", QueryFeature::FullTextSearch)
487 .with_parameters(search_params)
488 .with_ids_only(true),
489 );
490 }
491
492 if !graph_params.is_empty() {
493 parts.push(
494 QueryPart::new("graph", QueryFeature::ChainedSearch)
495 .with_parameters(graph_params)
496 .with_ids_only(true),
497 );
498 }
499
500 if !term_params.is_empty() {
501 parts.push(
502 QueryPart::new("terminology", QueryFeature::TerminologySearch)
503 .with_parameters(term_params)
504 .with_ids_only(true),
505 );
506 }
507
508 parts
509 }
510}
511
512#[derive(Debug, Clone, thiserror::Error)]
514pub enum RoutingError {
515 #[error("no primary backend configured")]
517 NoPrimaryBackend,
518
519 #[error("no backend capable of handling feature: {feature:?}")]
521 NoCapableBackend {
522 feature: QueryFeature,
524 },
525
526 #[error("backend '{backend_id}' is unavailable")]
528 BackendUnavailable {
529 backend_id: String,
531 },
532}
533
534pub fn route_query(query: &SearchQuery) -> QueryRouting {
536 let config = CompositeConfig::default();
537 let router = QueryRouter::new(config);
538 router.route_simple(query)
539}
540
541pub fn decompose_query(query: &SearchQuery) -> Vec<QueryPart> {
543 let config = CompositeConfig::default();
544 let router = QueryRouter::new(config);
545 router.decompose_query(query)
546}
547
548#[cfg(test)]
549mod tests {
550 use super::*;
551 use crate::core::BackendKind;
552 use crate::types::{ChainedParameter, SearchModifier, SearchParamType, SearchValue};
553
554 fn test_config() -> CompositeConfig {
555 CompositeConfig::builder()
556 .primary("sqlite", BackendKind::Sqlite)
557 .search_backend("es", BackendKind::Elasticsearch)
558 .graph_backend("neo4j", BackendKind::Neo4j)
559 .build()
560 .unwrap()
561 }
562
563 #[test]
564 fn test_route_simple_query_to_primary() {
565 let query = SearchQuery::new("Patient").with_parameter(SearchParameter {
566 name: "_id".to_string(),
567 param_type: SearchParamType::Token,
568 modifier: None,
569 values: vec![SearchValue::eq("patient-123")],
570 chain: vec![],
571 components: vec![],
572 });
573
574 let routing = route_query(&query);
575 assert_eq!(routing.primary_backend, BackendType::Primary);
576 assert!(routing.auxiliary_backends.is_empty());
577 }
578
579 #[test]
580 fn test_route_chained_search_to_graph() {
581 let query = SearchQuery::new("Observation").with_parameter(SearchParameter {
582 name: "name".to_string(),
583 param_type: SearchParamType::String,
584 modifier: None,
585 values: vec![SearchValue::string("Smith")],
586 chain: vec![ChainedParameter {
587 reference_param: "subject".to_string(),
588 target_type: Some("Patient".to_string()),
589 target_param: "name".to_string(),
590 }],
591 components: vec![],
592 });
593
594 let routing = route_query(&query);
595 assert!(routing.auxiliary_backends.contains(&BackendType::Graph));
596 }
597
598 #[test]
599 fn test_route_fulltext_to_search() {
600 let query = SearchQuery::new("Patient").with_parameter(SearchParameter {
601 name: "_text".to_string(),
602 param_type: SearchParamType::String,
603 modifier: None,
604 values: vec![SearchValue::string("cardiac")],
605 chain: vec![],
606 components: vec![],
607 });
608
609 let routing = route_query(&query);
610 assert!(routing.auxiliary_backends.contains(&BackendType::Search));
611 }
612
613 #[test]
614 fn test_route_terminology_to_terminology_service() {
615 let query = SearchQuery::new("Observation").with_parameter(SearchParameter {
616 name: "code".to_string(),
617 param_type: SearchParamType::Token,
618 modifier: Some(SearchModifier::Below),
619 values: vec![SearchValue::token(Some("http://loinc.org"), "8867-4")],
620 chain: vec![],
621 components: vec![],
622 });
623
624 let routing = route_query(&query);
625 assert!(
626 routing
627 .auxiliary_backends
628 .contains(&BackendType::Terminology)
629 );
630 }
631
632 #[test]
633 fn test_route_complex_query_to_multiple_backends() {
634 let query = SearchQuery::new("Observation")
635 .with_parameter(SearchParameter {
636 name: "name".to_string(),
637 param_type: SearchParamType::String,
638 modifier: None,
639 values: vec![SearchValue::string("Smith")],
640 chain: vec![ChainedParameter {
641 reference_param: "subject".to_string(),
642 target_type: Some("Patient".to_string()),
643 target_param: "name".to_string(),
644 }],
645 components: vec![],
646 })
647 .with_parameter(SearchParameter {
648 name: "_text".to_string(),
649 param_type: SearchParamType::String,
650 modifier: None,
651 values: vec![SearchValue::string("cardiac")],
652 chain: vec![],
653 components: vec![],
654 })
655 .with_parameter(SearchParameter {
656 name: "code".to_string(),
657 param_type: SearchParamType::Token,
658 modifier: Some(SearchModifier::Below),
659 values: vec![SearchValue::token(Some("http://loinc.org"), "8867-4")],
660 chain: vec![],
661 components: vec![],
662 });
663
664 let routing = route_query(&query);
665 assert!(!routing.auxiliary_backends.is_empty());
666 }
667
668 #[test]
669 fn test_decompose_query() {
670 let query = SearchQuery::new("Observation")
671 .with_parameter(SearchParameter {
672 name: "code".to_string(),
673 param_type: SearchParamType::Token,
674 modifier: None,
675 values: vec![SearchValue::token(Some("http://loinc.org"), "8867-4")],
676 chain: vec![],
677 components: vec![],
678 })
679 .with_parameter(SearchParameter {
680 name: "_text".to_string(),
681 param_type: SearchParamType::String,
682 modifier: None,
683 values: vec![SearchValue::string("cardiac")],
684 chain: vec![],
685 components: vec![],
686 });
687
688 let parts = decompose_query(&query);
689
690 assert!(!parts.is_empty());
691 assert!(parts.iter().any(|p| p.backend_id == "primary"));
692 assert!(parts.iter().any(|p| p.backend_id == "search"));
693 }
694
695 #[test]
696 fn test_routing_decision_with_config() {
697 let config = test_config();
698 let router = QueryRouter::new(config);
699
700 let query = SearchQuery::new("Patient").with_parameter(SearchParameter {
701 name: "_text".to_string(),
702 param_type: SearchParamType::String,
703 modifier: None,
704 values: vec![SearchValue::string("cardiac")],
705 chain: vec![],
706 components: vec![],
707 });
708
709 let decision = router.route(&query).unwrap();
710
711 assert_eq!(decision.primary_target, "sqlite");
712 assert!(
713 decision
714 .auxiliary_targets
715 .contains_key(&QueryFeature::FullTextSearch)
716 );
717 }
718}