Skip to main content

heliosdb_proxy/schema_routing/
mod.rs

//! Schema-Aware Routing
//!
//! Feature 13 of the HeliosProxy roadmap.
//!
//! This module provides intelligent query routing based on schema semantics:
//!
//! - **Table Classification**: HOT/WARM/COLD temperature, OLTP/OLAP/Vector workload
//! - **Query Analysis**: Detect access patterns, shard keys, complexity
//! - **Smart Routing**: Route to optimal nodes based on schema + query characteristics
//! - **AI Workload Detection**: Recognize RAG, embedding, and agent workloads
//! - **Learning Classifier**: Automatically learn and update classifications
//!
//! # Architecture
//!
//! ```text
//! Query → Analyzer → Schema Registry → Router → Node Selection
//!                          ↑
//!                   Learning Classifier
//! ```
//!
//! # Example
//!
//! ```rust,ignore
//! use std::time::Duration;
//!
//! use heliosdb::proxy::schema_routing::{
//!     SchemaAwareRouter, SchemaRegistry, SchemaRoutingConfig,
//! };
//!
//! let config = SchemaRoutingConfig::builder()
//!     .auto_discover(true)
//!     .refresh_interval(Duration::from_secs(300))
//!     .build();
//!
//! let registry = SchemaRegistry::new();
//! let router = SchemaAwareRouter::new(config, registry);
//!
//! let decision = router.route("SELECT * FROM users WHERE id = 1").await;
//! ```
38
39pub mod admin;
40pub mod analyzer;
41pub mod classifier;
42pub mod discovery;
43pub mod metrics;
44pub mod registry;
45pub mod router;
46
47pub use admin::{AdminError, SchemaRoutingAdmin};
48pub use analyzer::{QueryAnalysis, QueryAnalyzer, ShardKeyValue, TableRef};
49pub use classifier::{
50    ClassificationModel, LearningClassifier, QueryHistory, QueryType, TableClassification,
51};
52pub use discovery::{DiscoveryConfig, DiscoveryError, SchemaDiscovery};
53pub use metrics::{
54    AIWorkloadStats, MetricsReport, RAGStats, RoutingStats, SchemaRoutingMetrics, TableStats,
55    WorkloadStats,
56};
57pub use registry::{
58    AccessPattern, ColumnSchema, DataTemperature, IndexSchema, NodeCapabilities, PartitionKey,
59    Relationship, SchemaRegistry, ShardingConfig, TableSchema, WorkloadType,
60};
61pub use router::{
62    AIWorkloadType, RAGStage, RouteTarget, RoutingDecision, RoutingPreference, RoutingReason,
63    SchemaAwareRouter,
64};
65
66use std::collections::HashMap;
67use std::time::Duration;
68
69/// Schema-aware routing configuration
70#[derive(Debug, Clone)]
71pub struct SchemaRoutingConfig {
72    /// Enable schema-aware routing
73    pub enabled: bool,
74    /// Auto-discover schema from database
75    pub auto_discover: bool,
76    /// Schema refresh interval
77    pub refresh_interval: Duration,
78    /// Enable learning classifier
79    pub learning_enabled: bool,
80    /// Classification update threshold (queries before reclassification)
81    pub classification_threshold: u64,
82    /// Default temperature for new tables
83    pub default_temperature: DataTemperature,
84    /// Default workload for new tables
85    pub default_workload: WorkloadType,
86    /// Table configurations
87    pub tables: Vec<TableConfig>,
88    /// Node capability configurations
89    pub node_capabilities: HashMap<String, NodeCapabilities>,
90}
91
92impl Default for SchemaRoutingConfig {
93    fn default() -> Self {
94        Self {
95            enabled: true,
96            auto_discover: true,
97            refresh_interval: Duration::from_secs(300),
98            learning_enabled: true,
99            classification_threshold: 1000,
100            default_temperature: DataTemperature::Warm,
101            default_workload: WorkloadType::Mixed,
102            tables: Vec::new(),
103            node_capabilities: HashMap::new(),
104        }
105    }
106}
107
108/// Builder for SchemaRoutingConfig
109#[derive(Debug, Default)]
110pub struct SchemaRoutingConfigBuilder {
111    config: SchemaRoutingConfig,
112}
113
114impl SchemaRoutingConfigBuilder {
115    /// Create a new builder
116    pub fn new() -> Self {
117        Self::default()
118    }
119
120    /// Enable or disable schema-aware routing
121    pub fn enabled(mut self, enabled: bool) -> Self {
122        self.config.enabled = enabled;
123        self
124    }
125
126    /// Enable auto-discovery of schema
127    pub fn auto_discover(mut self, auto_discover: bool) -> Self {
128        self.config.auto_discover = auto_discover;
129        self
130    }
131
132    /// Set schema refresh interval
133    pub fn refresh_interval(mut self, interval: Duration) -> Self {
134        self.config.refresh_interval = interval;
135        self
136    }
137
138    /// Enable learning classifier
139    pub fn learning_enabled(mut self, enabled: bool) -> Self {
140        self.config.learning_enabled = enabled;
141        self
142    }
143
144    /// Set classification threshold
145    pub fn classification_threshold(mut self, threshold: u64) -> Self {
146        self.config.classification_threshold = threshold;
147        self
148    }
149
150    /// Set default temperature for new tables
151    pub fn default_temperature(mut self, temp: DataTemperature) -> Self {
152        self.config.default_temperature = temp;
153        self
154    }
155
156    /// Set default workload for new tables
157    pub fn default_workload(mut self, workload: WorkloadType) -> Self {
158        self.config.default_workload = workload;
159        self
160    }
161
162    /// Add table configuration
163    pub fn add_table(mut self, table: TableConfig) -> Self {
164        self.config.tables.push(table);
165        self
166    }
167
168    /// Add node capability configuration
169    pub fn add_node_capability(
170        mut self,
171        node_name: impl Into<String>,
172        caps: NodeCapabilities,
173    ) -> Self {
174        self.config.node_capabilities.insert(node_name.into(), caps);
175        self
176    }
177
178    /// Build the configuration
179    pub fn build(self) -> SchemaRoutingConfig {
180        self.config
181    }
182}
183
184impl SchemaRoutingConfig {
185    /// Create a builder
186    pub fn builder() -> SchemaRoutingConfigBuilder {
187        SchemaRoutingConfigBuilder::new()
188    }
189}
190
191/// Table configuration for schema routing
192#[derive(Debug, Clone)]
193pub struct TableConfig {
194    /// Table name
195    pub name: String,
196    /// Data temperature
197    pub temperature: DataTemperature,
198    /// Workload type
199    pub workload: WorkloadType,
200    /// Access pattern
201    pub access_pattern: AccessPattern,
202    /// Shard key (if sharded)
203    pub shard_key: Option<String>,
204    /// Shard count
205    pub shard_count: Option<u32>,
206    /// Preferred nodes
207    pub preferred_nodes: Vec<String>,
208}
209
210impl TableConfig {
211    /// Create a new table configuration
212    pub fn new(name: impl Into<String>) -> Self {
213        Self {
214            name: name.into(),
215            temperature: DataTemperature::Warm,
216            workload: WorkloadType::Mixed,
217            access_pattern: AccessPattern::Mixed,
218            shard_key: None,
219            shard_count: None,
220            preferred_nodes: Vec::new(),
221        }
222    }
223
224    /// Set temperature
225    pub fn with_temperature(mut self, temp: DataTemperature) -> Self {
226        self.temperature = temp;
227        self
228    }
229
230    /// Set workload type
231    pub fn with_workload(mut self, workload: WorkloadType) -> Self {
232        self.workload = workload;
233        self
234    }
235
236    /// Set access pattern
237    pub fn with_access_pattern(mut self, pattern: AccessPattern) -> Self {
238        self.access_pattern = pattern;
239        self
240    }
241
242    /// Set shard key
243    pub fn with_shard_key(mut self, key: impl Into<String>, count: u32) -> Self {
244        self.shard_key = Some(key.into());
245        self.shard_count = Some(count);
246        self
247    }
248
249    /// Add preferred node
250    pub fn with_preferred_node(mut self, node: impl Into<String>) -> Self {
251        self.preferred_nodes.push(node.into());
252        self
253    }
254}
255
/// Sync mode for replication.
///
/// `NodeInfo::new` starts nodes in [`SyncMode::Async`]; `NodeInfo::as_primary`
/// switches a node to [`SyncMode::Primary`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SyncMode {
    /// Synchronous replication.
    Sync,
    /// Asynchronous replication.
    Async,
    /// Primary node.
    Primary,
}
266
267/// Node information for routing
268#[derive(Debug, Clone)]
269pub struct NodeInfo {
270    /// Node identifier
271    pub id: String,
272    /// Node name
273    pub name: String,
274    /// Is this the primary node
275    pub is_primary: bool,
276    /// Sync mode
277    pub sync_mode: SyncMode,
278    /// Node capabilities
279    pub capabilities: NodeCapabilities,
280    /// Current load (0.0 - 1.0)
281    pub current_load: f64,
282    /// Current latency in milliseconds
283    pub current_latency_ms: u64,
284    /// Indexes loaded in memory
285    pub indexes_in_memory: Vec<String>,
286}
287
288impl NodeInfo {
289    /// Create a new node
290    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
291        Self {
292            id: id.into(),
293            name: name.into(),
294            is_primary: false,
295            sync_mode: SyncMode::Async,
296            capabilities: NodeCapabilities::default(),
297            current_load: 0.0,
298            current_latency_ms: 0,
299            indexes_in_memory: Vec::new(),
300        }
301    }
302
303    /// Set as primary
304    pub fn as_primary(mut self) -> Self {
305        self.is_primary = true;
306        self.sync_mode = SyncMode::Primary;
307        self
308    }
309
310    /// Set sync mode
311    pub fn with_sync_mode(mut self, mode: SyncMode) -> Self {
312        self.sync_mode = mode;
313        self
314    }
315
316    /// Set capabilities
317    pub fn with_capabilities(mut self, caps: NodeCapabilities) -> Self {
318        self.capabilities = caps;
319        self
320    }
321}
322
#[cfg(test)]
mod tests {
    use super::*;

    /// Every value set through the config builder must reach the built config.
    #[test]
    fn test_config_builder() {
        let config = SchemaRoutingConfig::builder()
            .enabled(true)
            .auto_discover(true)
            .refresh_interval(Duration::from_secs(60))
            .learning_enabled(true)
            .default_temperature(DataTemperature::Hot)
            .build();

        assert!(config.enabled);
        assert!(config.auto_discover);
        assert_eq!(config.refresh_interval, Duration::from_secs(60));
        assert!(config.learning_enabled);
        assert_eq!(config.default_temperature, DataTemperature::Hot);
        // Nothing was added, so both collections stay empty.
        assert!(config.tables.is_empty());
        assert!(config.node_capabilities.is_empty());
    }

    /// The `with_*` methods set their own field and leave the others alone.
    #[test]
    fn test_table_config_builder() {
        let config = TableConfig::new("users")
            .with_temperature(DataTemperature::Hot)
            .with_workload(WorkloadType::OLTP)
            .with_access_pattern(AccessPattern::PointLookup)
            .with_preferred_node("primary")
            .with_preferred_node("standby-sync");

        assert_eq!(config.name, "users");
        assert_eq!(config.temperature, DataTemperature::Hot);
        assert_eq!(config.workload, WorkloadType::OLTP);
        // `matches!` avoids requiring `PartialEq`/`Debug` on `AccessPattern`.
        assert!(matches!(config.access_pattern, AccessPattern::PointLookup));
        assert_eq!(config.preferred_nodes, ["primary", "standby-sync"]);
        assert!(config.shard_key.is_none());
        assert!(config.shard_count.is_none());
    }

    /// `with_shard_key` sets the key and the count together.
    #[test]
    fn test_table_config_shard_key() {
        let config = TableConfig::new("orders").with_shard_key("customer_id", 8);

        assert_eq!(config.shard_key.as_deref(), Some("customer_id"));
        assert_eq!(config.shard_count, Some(8));
    }

    /// `as_primary` flips both the flag and the sync mode.
    #[test]
    fn test_node_info() {
        let node = NodeInfo::new("node1", "primary")
            .as_primary()
            .with_capabilities(NodeCapabilities {
                vector_search: true,
                gpu_acceleration: true,
                ..Default::default()
            });

        assert_eq!(node.id, "node1");
        assert_eq!(node.name, "primary");
        assert!(node.is_primary);
        assert_eq!(node.sync_mode, SyncMode::Primary);
        assert!(node.capabilities.vector_search);
        assert!(node.capabilities.gpu_acceleration);
    }
}