Skip to main content

helios_persistence/composite/
router.rs

1//! Query routing logic for composite storage.
2//!
3//! This module determines how queries are routed to backends based on
4//! detected features and backend capabilities.
5//!
6//! # Routing Rules
7//!
8//! The router applies these rules (from tests):
9//! - Chained parameters → Graph backend
10//! - `_text`/`_content` → Search backend
11//! - `:above`/`:below`/`:in`/`:not-in` → Terminology service
12//! - Default → Primary backend
13//! - Writes → Primary only
14//! - `_include`/`_revinclude` → Primary backend (for reference resolution)
15
16use std::collections::{HashMap, HashSet};
17
18use crate::types::{SearchModifier, SearchParameter, SearchQuery};
19
20use super::analyzer::{QueryAnalysis, QueryAnalyzer, QueryFeature};
21use super::config::{BackendEntry, BackendRole, CompositeConfig};
22
/// How result sets coming back from several backends are combined.
///
/// [`MergeStrategy::Intersection`] is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MergeStrategy {
    /// Keep only results that every backend reports (logical AND).
    #[default]
    Intersection,

    /// Keep results reported by at least one backend (logical OR); suited to
    /// broad searches.
    Union,

    /// Treat the primary's results as authoritative and let the secondaries
    /// add metadata to them.
    PrimaryEnriched,

    /// Let a secondary propose candidates and have the primary validate them.
    SecondaryFiltered,
}
43
44/// Routing decision for a query.
45#[derive(Debug, Clone)]
46pub struct RoutingDecision {
47    /// Primary backend ID to execute main query.
48    pub primary_target: String,
49
50    /// Additional backends for specific features.
51    /// Maps feature to backend ID.
52    pub auxiliary_targets: HashMap<QueryFeature, String>,
53
54    /// Query parts for each backend.
55    pub query_parts: HashMap<String, QueryPart>,
56
57    /// Execution order for query parts.
58    pub execution_order: Vec<ExecutionStep>,
59
60    /// Strategy for merging results.
61    pub merge_strategy: MergeStrategy,
62
63    /// Analysis used for this decision.
64    pub analysis: QueryAnalysis,
65}
66
67impl RoutingDecision {
68    /// Returns all unique backend IDs involved in this decision.
69    pub fn all_backends(&self) -> HashSet<&str> {
70        let mut backends = HashSet::new();
71        backends.insert(self.primary_target.as_str());
72        for backend_id in self.auxiliary_targets.values() {
73            backends.insert(backend_id.as_str());
74        }
75        backends
76    }
77
78    /// Returns true if this decision uses multiple backends.
79    pub fn is_multi_backend(&self) -> bool {
80        !self.auxiliary_targets.is_empty()
81    }
82
83    /// Returns true if a specific backend is used.
84    pub fn uses_backend(&self, backend_id: &str) -> bool {
85        self.primary_target == backend_id
86            || self.auxiliary_targets.values().any(|b| b == backend_id)
87    }
88}
89
90/// A part of a query to execute on a specific backend.
91#[derive(Debug, Clone)]
92pub struct QueryPart {
93    /// Backend ID for this part.
94    pub backend_id: String,
95
96    /// Parameters for this part.
97    pub parameters: Vec<SearchParameter>,
98
99    /// Feature being handled.
100    pub feature: QueryFeature,
101
102    /// Whether this part returns only IDs (not full resources).
103    pub returns_ids_only: bool,
104}
105
106impl QueryPart {
107    /// Creates a new query part.
108    pub fn new(backend_id: impl Into<String>, feature: QueryFeature) -> Self {
109        Self {
110            backend_id: backend_id.into(),
111            parameters: Vec::new(),
112            feature,
113            returns_ids_only: false,
114        }
115    }
116
117    /// Adds parameters to this part.
118    pub fn with_parameters(mut self, params: Vec<SearchParameter>) -> Self {
119        self.parameters = params;
120        self
121    }
122
123    /// Sets whether this part returns IDs only.
124    pub fn with_ids_only(mut self, ids_only: bool) -> Self {
125        self.returns_ids_only = ids_only;
126        self
127    }
128}
129
130/// An execution step in the query plan.
131#[derive(Debug, Clone)]
132pub enum ExecutionStep {
133    /// Execute query part on a backend.
134    Execute {
135        /// Backend ID.
136        backend_id: String,
137        /// The query part to execute.
138        part_feature: QueryFeature,
139    },
140
141    /// Wait for previous steps to complete.
142    Barrier(Vec<String>),
143
144    /// Merge results from multiple backends.
145    Merge {
146        /// Backend IDs to merge from.
147        inputs: Vec<String>,
148        /// Merge strategy.
149        strategy: MergeStrategy,
150    },
151
152    /// Filter results through another backend.
153    Filter {
154        /// Backend to filter with.
155        backend_id: String,
156        /// Source of IDs to filter.
157        source: String,
158    },
159
160    /// Resolve includes from primary.
161    ResolveIncludes {
162        /// Backend for include resolution.
163        backend_id: String,
164    },
165}
166
/// Coarse backend category used in simplified routing results (shaped to
/// match what the tests expect).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BackendType {
    /// The primary storage backend.
    Primary,
    /// A backend dedicated to search optimization.
    Search,
    /// A backend for graph queries.
    Graph,
    /// The terminology service.
    Terminology,
    /// Archive storage.
    Archive,
}
181
182impl From<BackendRole> for BackendType {
183    fn from(role: BackendRole) -> Self {
184        match role {
185            BackendRole::Primary => BackendType::Primary,
186            BackendRole::Search => BackendType::Search,
187            BackendRole::Graph => BackendType::Graph,
188            BackendRole::Terminology => BackendType::Terminology,
189            BackendRole::Archive => BackendType::Archive,
190        }
191    }
192}
193
194/// Simple routing result for tests.
195#[derive(Debug)]
196pub struct QueryRouting {
197    /// Primary backend type.
198    pub primary_backend: BackendType,
199    /// Auxiliary backend types.
200    pub auxiliary_backends: HashSet<BackendType>,
201}
202
203/// Query router that determines execution plan.
204pub struct QueryRouter {
205    config: CompositeConfig,
206    analyzer: QueryAnalyzer,
207}
208
209impl QueryRouter {
210    /// Creates a new router with the given configuration.
211    pub fn new(config: CompositeConfig) -> Self {
212        Self {
213            config,
214            analyzer: QueryAnalyzer::new(),
215        }
216    }
217
218    /// Routes a query to appropriate backends.
219    pub fn route(&self, query: &SearchQuery) -> Result<RoutingDecision, RoutingError> {
220        // 1. Analyze query features
221        let analysis = self.analyzer.analyze(query);
222
223        // 2. Get primary backend
224        let primary = self
225            .config
226            .primary()
227            .ok_or(RoutingError::NoPrimaryBackend)?;
228
229        // 3. Find capable backends for each specialized feature
230        let mut auxiliary_targets = HashMap::new();
231        let mut query_parts = HashMap::new();
232
233        // Route specialized features
234        for feature in &analysis.specialized_features {
235            if let Some(backend) = self.find_backend_for_feature(*feature, &analysis) {
236                if backend.id != primary.id {
237                    auxiliary_targets.insert(*feature, backend.id.clone());
238
239                    // Create query part for this feature
240                    let params = analysis
241                        .feature_params
242                        .get(feature)
243                        .cloned()
244                        .unwrap_or_default();
245
246                    query_parts.insert(
247                        backend.id.clone(),
248                        QueryPart::new(&backend.id, *feature)
249                            .with_parameters(params)
250                            .with_ids_only(true),
251                    );
252                }
253            }
254        }
255
256        // Create primary query part with remaining parameters
257        let primary_params = analysis
258            .feature_params
259            .get(&QueryFeature::BasicSearch)
260            .cloned()
261            .unwrap_or_default();
262
263        query_parts.insert(
264            primary.id.clone(),
265            QueryPart::new(&primary.id, QueryFeature::BasicSearch)
266                .with_parameters(primary_params)
267                .with_ids_only(false),
268        );
269
270        // 4. Build execution order
271        let execution_order =
272            self.build_execution_order(&analysis, &auxiliary_targets, &primary.id);
273
274        // 5. Determine merge strategy
275        let merge_strategy = self.determine_merge_strategy(&analysis, &auxiliary_targets);
276
277        Ok(RoutingDecision {
278            primary_target: primary.id.clone(),
279            auxiliary_targets,
280            query_parts,
281            execution_order,
282            merge_strategy,
283            analysis,
284        })
285    }
286
287    /// Simple route function matching test expectations.
288    pub fn route_simple(&self, query: &SearchQuery) -> QueryRouting {
289        let features = self.analyzer.analyze(query).features;
290
291        let mut routing = QueryRouting {
292            primary_backend: BackendType::Primary,
293            auxiliary_backends: HashSet::new(),
294        };
295
296        // Route chained search to graph
297        if features.contains(&QueryFeature::ChainedSearch)
298            || features.contains(&QueryFeature::ReverseChaining)
299        {
300            routing.auxiliary_backends.insert(BackendType::Graph);
301        }
302
303        // Route full-text to search
304        if features.contains(&QueryFeature::FullTextSearch) {
305            routing.auxiliary_backends.insert(BackendType::Search);
306        }
307
308        // Route terminology to terminology service
309        if features.contains(&QueryFeature::TerminologySearch) {
310            routing.auxiliary_backends.insert(BackendType::Terminology);
311        }
312
313        routing
314    }
315
316    /// Finds the best backend for a feature.
317    fn find_backend_for_feature(
318        &self,
319        feature: QueryFeature,
320        _analysis: &QueryAnalysis,
321    ) -> Option<&BackendEntry> {
322        // First check custom routing rules
323        for rule in &self.config.routing_rules {
324            if rule.triggers.contains(&feature) {
325                if let Some(backend) = self.config.backend(&rule.target_backend) {
326                    if backend.enabled {
327                        return Some(backend);
328                    }
329                }
330            }
331        }
332
333        // Then check by role mapping
334        let preferred_role = match feature {
335            QueryFeature::ChainedSearch | QueryFeature::ReverseChaining => Some(BackendRole::Graph),
336            QueryFeature::FullTextSearch => Some(BackendRole::Search),
337            QueryFeature::TerminologySearch => Some(BackendRole::Terminology),
338            _ => None,
339        };
340
341        if let Some(role) = preferred_role {
342            // Find backend with this role
343            let mut candidates: Vec<_> = self.config.backends_with_role(role).collect();
344            candidates.sort_by_key(|b| b.priority);
345
346            if let Some(backend) = candidates.first() {
347                return Some(*backend);
348            }
349        }
350
351        // Fall back to primary or any backend with the capability
352        if let Some(cap) = feature.required_capability() {
353            let mut capable: Vec<_> = self.config.backends_with_capability(cap).collect();
354            capable.sort_by_key(|b| b.priority);
355            return capable.first().copied();
356        }
357
358        self.config.primary()
359    }
360
361    /// Builds the execution order.
362    fn build_execution_order(
363        &self,
364        analysis: &QueryAnalysis,
365        auxiliary_targets: &HashMap<QueryFeature, String>,
366        primary_id: &str,
367    ) -> Vec<ExecutionStep> {
368        let mut steps = Vec::new();
369
370        // If there are auxiliary backends, execute them first in parallel
371        if !auxiliary_targets.is_empty() {
372            // Execute all auxiliary queries
373            for (feature, backend_id) in auxiliary_targets {
374                steps.push(ExecutionStep::Execute {
375                    backend_id: backend_id.clone(),
376                    part_feature: *feature,
377                });
378            }
379
380            // Barrier to wait for auxiliary results
381            let aux_backends: Vec<_> = auxiliary_targets.values().cloned().collect();
382            steps.push(ExecutionStep::Barrier(aux_backends.clone()));
383
384            // Merge or filter with primary
385            if auxiliary_targets.len() > 1 {
386                steps.push(ExecutionStep::Merge {
387                    inputs: aux_backends,
388                    strategy: MergeStrategy::Intersection,
389                });
390            }
391        }
392
393        // Execute primary query
394        steps.push(ExecutionStep::Execute {
395            backend_id: primary_id.to_string(),
396            part_feature: QueryFeature::BasicSearch,
397        });
398
399        // If there were auxiliary results, filter through them
400        if !auxiliary_targets.is_empty() {
401            steps.push(ExecutionStep::Filter {
402                backend_id: primary_id.to_string(),
403                source: "auxiliary_results".to_string(),
404            });
405        }
406
407        // Resolve includes if needed
408        if analysis.has_includes() {
409            steps.push(ExecutionStep::ResolveIncludes {
410                backend_id: primary_id.to_string(),
411            });
412        }
413
414        steps
415    }
416
417    /// Determines the merge strategy based on query analysis.
418    fn determine_merge_strategy(
419        &self,
420        _analysis: &QueryAnalysis,
421        auxiliary_targets: &HashMap<QueryFeature, String>,
422    ) -> MergeStrategy {
423        if auxiliary_targets.is_empty() {
424            return MergeStrategy::Intersection;
425        }
426
427        // If using graph or terminology, filter secondary through primary
428        if auxiliary_targets.contains_key(&QueryFeature::ChainedSearch)
429            || auxiliary_targets.contains_key(&QueryFeature::ReverseChaining)
430            || auxiliary_targets.contains_key(&QueryFeature::TerminologySearch)
431        {
432            return MergeStrategy::SecondaryFiltered;
433        }
434
435        // If using full-text search, intersect results
436        if auxiliary_targets.contains_key(&QueryFeature::FullTextSearch) {
437            return MergeStrategy::Intersection;
438        }
439
440        MergeStrategy::Intersection
441    }
442
443    /// Decomposes a query into backend-specific parts.
444    pub fn decompose_query(&self, query: &SearchQuery) -> Vec<QueryPart> {
445        let _analysis = self.analyzer.analyze(query);
446        let mut parts = Vec::new();
447        let mut primary_params = Vec::new();
448        let mut search_params = Vec::new();
449        let mut graph_params = Vec::new();
450        let mut term_params = Vec::new();
451
452        for param in &query.parameters {
453            // Full-text goes to search backend
454            if param.name == "_text" || param.name == "_content" {
455                search_params.push(param.clone());
456            }
457            // Chained goes to graph
458            else if !param.chain.is_empty() {
459                graph_params.push(param.clone());
460            }
461            // Terminology modifiers
462            else if matches!(
463                param.modifier,
464                Some(SearchModifier::Above)
465                    | Some(SearchModifier::Below)
466                    | Some(SearchModifier::In)
467                    | Some(SearchModifier::NotIn)
468            ) {
469                term_params.push(param.clone());
470            }
471            // Default to primary
472            else {
473                primary_params.push(param.clone());
474            }
475        }
476
477        if !primary_params.is_empty() {
478            parts.push(
479                QueryPart::new("primary", QueryFeature::BasicSearch)
480                    .with_parameters(primary_params),
481            );
482        }
483
484        if !search_params.is_empty() {
485            parts.push(
486                QueryPart::new("search", QueryFeature::FullTextSearch)
487                    .with_parameters(search_params)
488                    .with_ids_only(true),
489            );
490        }
491
492        if !graph_params.is_empty() {
493            parts.push(
494                QueryPart::new("graph", QueryFeature::ChainedSearch)
495                    .with_parameters(graph_params)
496                    .with_ids_only(true),
497            );
498        }
499
500        if !term_params.is_empty() {
501            parts.push(
502                QueryPart::new("terminology", QueryFeature::TerminologySearch)
503                    .with_parameters(term_params)
504                    .with_ids_only(true),
505            );
506        }
507
508        parts
509    }
510}
511
512/// Routing errors.
513#[derive(Debug, Clone, thiserror::Error)]
514pub enum RoutingError {
515    /// No primary backend configured.
516    #[error("no primary backend configured")]
517    NoPrimaryBackend,
518
519    /// No backend capable of handling required features.
520    #[error("no backend capable of handling feature: {feature:?}")]
521    NoCapableBackend {
522        /// The feature that cannot be handled.
523        feature: QueryFeature,
524    },
525
526    /// Backend unavailable.
527    #[error("backend '{backend_id}' is unavailable")]
528    BackendUnavailable {
529        /// The unavailable backend ID.
530        backend_id: String,
531    },
532}
533
534/// Convenience function to route a query (for tests).
535pub fn route_query(query: &SearchQuery) -> QueryRouting {
536    let config = CompositeConfig::default();
537    let router = QueryRouter::new(config);
538    router.route_simple(query)
539}
540
541/// Convenience function to decompose a query (for tests).
542pub fn decompose_query(query: &SearchQuery) -> Vec<QueryPart> {
543    let config = CompositeConfig::default();
544    let router = QueryRouter::new(config);
545    router.decompose_query(query)
546}
547
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::BackendKind;
    use crate::types::{ChainedParameter, SearchModifier, SearchParamType, SearchValue};

    /// Configuration with a SQLite primary, an Elasticsearch search backend
    /// and a Neo4j graph backend.
    fn test_config() -> CompositeConfig {
        CompositeConfig::builder()
            .primary("sqlite", BackendKind::Sqlite)
            .search_backend("es", BackendKind::Elasticsearch)
            .graph_backend("neo4j", BackendKind::Neo4j)
            .build()
            .unwrap()
    }

    /// Plain `_id` token parameter with no modifier or chain.
    fn id_param() -> SearchParameter {
        SearchParameter {
            name: "_id".to_string(),
            param_type: SearchParamType::Token,
            modifier: None,
            values: vec![SearchValue::eq("patient-123")],
            chain: vec![],
            components: vec![],
        }
    }

    /// `name` parameter chained through `subject` to `Patient.name`.
    fn chained_name_param() -> SearchParameter {
        SearchParameter {
            name: "name".to_string(),
            param_type: SearchParamType::String,
            modifier: None,
            values: vec![SearchValue::string("Smith")],
            chain: vec![ChainedParameter {
                reference_param: "subject".to_string(),
                target_type: Some("Patient".to_string()),
                target_param: "name".to_string(),
            }],
            components: vec![],
        }
    }

    /// Full-text `_text` parameter.
    fn text_param() -> SearchParameter {
        SearchParameter {
            name: "_text".to_string(),
            param_type: SearchParamType::String,
            modifier: None,
            values: vec![SearchValue::string("cardiac")],
            chain: vec![],
            components: vec![],
        }
    }

    /// LOINC `code` token parameter carrying the given modifier.
    fn loinc_code_param(modifier: Option<SearchModifier>) -> SearchParameter {
        SearchParameter {
            name: "code".to_string(),
            param_type: SearchParamType::Token,
            modifier,
            values: vec![SearchValue::token(Some("http://loinc.org"), "8867-4")],
            chain: vec![],
            components: vec![],
        }
    }

    #[test]
    fn test_route_simple_query_to_primary() {
        let query = SearchQuery::new("Patient").with_parameter(id_param());

        let routing = route_query(&query);
        assert_eq!(routing.primary_backend, BackendType::Primary);
        assert!(routing.auxiliary_backends.is_empty());
    }

    #[test]
    fn test_route_chained_search_to_graph() {
        let query = SearchQuery::new("Observation").with_parameter(chained_name_param());

        let routing = route_query(&query);
        assert!(routing.auxiliary_backends.contains(&BackendType::Graph));
    }

    #[test]
    fn test_route_fulltext_to_search() {
        let query = SearchQuery::new("Patient").with_parameter(text_param());

        let routing = route_query(&query);
        assert!(routing.auxiliary_backends.contains(&BackendType::Search));
    }

    #[test]
    fn test_route_terminology_to_terminology_service() {
        let query = SearchQuery::new("Observation")
            .with_parameter(loinc_code_param(Some(SearchModifier::Below)));

        let routing = route_query(&query);
        assert!(
            routing
                .auxiliary_backends
                .contains(&BackendType::Terminology)
        );
    }

    #[test]
    fn test_route_complex_query_to_multiple_backends() {
        let query = SearchQuery::new("Observation")
            .with_parameter(chained_name_param())
            .with_parameter(text_param())
            .with_parameter(loinc_code_param(Some(SearchModifier::Below)));

        let routing = route_query(&query);

        // Each parameter on its own selects one auxiliary backend (see the
        // single-feature tests above); together they must select all three.
        assert_eq!(routing.primary_backend, BackendType::Primary);
        assert!(routing.auxiliary_backends.contains(&BackendType::Graph));
        assert!(routing.auxiliary_backends.contains(&BackendType::Search));
        assert!(
            routing
                .auxiliary_backends
                .contains(&BackendType::Terminology)
        );
    }

    #[test]
    fn test_decompose_query() {
        let query = SearchQuery::new("Observation")
            .with_parameter(loinc_code_param(None))
            .with_parameter(text_param());

        let parts = decompose_query(&query);

        // One plain parameter and one full-text parameter: exactly two parts.
        assert_eq!(parts.len(), 2);

        let primary_part = parts
            .iter()
            .find(|p| p.backend_id == "primary")
            .expect("plain parameter should produce a primary part");
        assert_eq!(primary_part.parameters.len(), 1);
        assert!(!primary_part.returns_ids_only);

        let search_part = parts
            .iter()
            .find(|p| p.backend_id == "search")
            .expect("_text parameter should produce a search part");
        assert_eq!(search_part.parameters.len(), 1);
        assert!(search_part.returns_ids_only);
    }

    #[test]
    fn test_routing_decision_with_config() {
        let config = test_config();
        let router = QueryRouter::new(config);

        let query = SearchQuery::new("Patient").with_parameter(text_param());

        let decision = router.route(&query).unwrap();

        assert_eq!(decision.primary_target, "sqlite");
        assert!(
            decision
                .auxiliary_targets
                .contains_key(&QueryFeature::FullTextSearch)
        );
        assert!(decision.is_multi_backend());
        assert!(decision.uses_backend("sqlite"));
    }
}