// heliosdb_proxy/schema_routing/mod.rs

//! Schema-Aware Routing
//!
//! Feature 13 of the HeliosProxy roadmap.
//!
//! This module provides intelligent query routing based on schema semantics:
//!
//! - **Table Classification**: HOT/WARM/COLD temperature, OLTP/OLAP/Vector workload
//! - **Query Analysis**: Detect access patterns, shard keys, and query complexity
//! - **Smart Routing**: Route to optimal nodes based on schema and query characteristics
//! - **AI Workload Detection**: Recognize RAG, embedding, and agent workloads
//! - **Learning Classifier**: Automatically learn and update classifications
//!
//! # Architecture
//!
//! ```text
//! Query → Analyzer → Schema Registry → Router → Node Selection
//!                          ↑
//!                   Learning Classifier
//! ```
//!
//! # Example
//!
//! ```rust,ignore
//! use std::time::Duration;
//!
//! use heliosdb::proxy::schema_routing::{
//!     SchemaAwareRouter, SchemaRegistry, SchemaRoutingConfig,
//! };
//!
//! let config = SchemaRoutingConfig::builder()
//!     .auto_discover(true)
//!     .refresh_interval(Duration::from_secs(300))
//!     .build();
//!
//! let registry = SchemaRegistry::new();
//! let router = SchemaAwareRouter::new(config, registry);
//!
//! // `route` is awaited, so this must run inside an async context.
//! let decision = router.route("SELECT * FROM users WHERE id = 1").await;
//! ```

39pub mod registry;
40pub mod analyzer;
41pub mod router;
42pub mod classifier;
43pub mod discovery;
44pub mod metrics;
45pub mod admin;
46
47pub use registry::{
48    SchemaRegistry, TableSchema, ColumnSchema, IndexSchema,
49    AccessPattern, DataTemperature, WorkloadType, ShardingConfig,
50    NodeCapabilities, PartitionKey, Relationship,
51};
52pub use analyzer::{QueryAnalyzer, QueryAnalysis, TableRef, ShardKeyValue};
53pub use router::{
54    SchemaAwareRouter, RoutingDecision, RoutingPreference,
55    RAGStage, AIWorkloadType, RouteTarget, RoutingReason,
56};
57pub use classifier::{LearningClassifier, ClassificationModel, QueryHistory, QueryType, TableClassification};
58pub use admin::{SchemaRoutingAdmin, AdminError};
59pub use discovery::{SchemaDiscovery, DiscoveryConfig, DiscoveryError};
60pub use metrics::{
61    SchemaRoutingMetrics, RoutingStats, TableStats, WorkloadStats,
62    AIWorkloadStats, RAGStats, MetricsReport,
63};
64
65use std::collections::HashMap;
66use std::time::Duration;
67
68/// Schema-aware routing configuration
69#[derive(Debug, Clone)]
70pub struct SchemaRoutingConfig {
71    /// Enable schema-aware routing
72    pub enabled: bool,
73    /// Auto-discover schema from database
74    pub auto_discover: bool,
75    /// Schema refresh interval
76    pub refresh_interval: Duration,
77    /// Enable learning classifier
78    pub learning_enabled: bool,
79    /// Classification update threshold (queries before reclassification)
80    pub classification_threshold: u64,
81    /// Default temperature for new tables
82    pub default_temperature: DataTemperature,
83    /// Default workload for new tables
84    pub default_workload: WorkloadType,
85    /// Table configurations
86    pub tables: Vec<TableConfig>,
87    /// Node capability configurations
88    pub node_capabilities: HashMap<String, NodeCapabilities>,
89}
90
91impl Default for SchemaRoutingConfig {
92    fn default() -> Self {
93        Self {
94            enabled: true,
95            auto_discover: true,
96            refresh_interval: Duration::from_secs(300),
97            learning_enabled: true,
98            classification_threshold: 1000,
99            default_temperature: DataTemperature::Warm,
100            default_workload: WorkloadType::Mixed,
101            tables: Vec::new(),
102            node_capabilities: HashMap::new(),
103        }
104    }
105}
106
107/// Builder for SchemaRoutingConfig
108#[derive(Debug, Default)]
109pub struct SchemaRoutingConfigBuilder {
110    config: SchemaRoutingConfig,
111}
112
113impl SchemaRoutingConfigBuilder {
114    /// Create a new builder
115    pub fn new() -> Self {
116        Self::default()
117    }
118
119    /// Enable or disable schema-aware routing
120    pub fn enabled(mut self, enabled: bool) -> Self {
121        self.config.enabled = enabled;
122        self
123    }
124
125    /// Enable auto-discovery of schema
126    pub fn auto_discover(mut self, auto_discover: bool) -> Self {
127        self.config.auto_discover = auto_discover;
128        self
129    }
130
131    /// Set schema refresh interval
132    pub fn refresh_interval(mut self, interval: Duration) -> Self {
133        self.config.refresh_interval = interval;
134        self
135    }
136
137    /// Enable learning classifier
138    pub fn learning_enabled(mut self, enabled: bool) -> Self {
139        self.config.learning_enabled = enabled;
140        self
141    }
142
143    /// Set classification threshold
144    pub fn classification_threshold(mut self, threshold: u64) -> Self {
145        self.config.classification_threshold = threshold;
146        self
147    }
148
149    /// Set default temperature for new tables
150    pub fn default_temperature(mut self, temp: DataTemperature) -> Self {
151        self.config.default_temperature = temp;
152        self
153    }
154
155    /// Set default workload for new tables
156    pub fn default_workload(mut self, workload: WorkloadType) -> Self {
157        self.config.default_workload = workload;
158        self
159    }
160
161    /// Add table configuration
162    pub fn add_table(mut self, table: TableConfig) -> Self {
163        self.config.tables.push(table);
164        self
165    }
166
167    /// Add node capability configuration
168    pub fn add_node_capability(mut self, node_name: impl Into<String>, caps: NodeCapabilities) -> Self {
169        self.config.node_capabilities.insert(node_name.into(), caps);
170        self
171    }
172
173    /// Build the configuration
174    pub fn build(self) -> SchemaRoutingConfig {
175        self.config
176    }
177}
178
179impl SchemaRoutingConfig {
180    /// Create a builder
181    pub fn builder() -> SchemaRoutingConfigBuilder {
182        SchemaRoutingConfigBuilder::new()
183    }
184}
185
186/// Table configuration for schema routing
187#[derive(Debug, Clone)]
188pub struct TableConfig {
189    /// Table name
190    pub name: String,
191    /// Data temperature
192    pub temperature: DataTemperature,
193    /// Workload type
194    pub workload: WorkloadType,
195    /// Access pattern
196    pub access_pattern: AccessPattern,
197    /// Shard key (if sharded)
198    pub shard_key: Option<String>,
199    /// Shard count
200    pub shard_count: Option<u32>,
201    /// Preferred nodes
202    pub preferred_nodes: Vec<String>,
203}
204
205impl TableConfig {
206    /// Create a new table configuration
207    pub fn new(name: impl Into<String>) -> Self {
208        Self {
209            name: name.into(),
210            temperature: DataTemperature::Warm,
211            workload: WorkloadType::Mixed,
212            access_pattern: AccessPattern::Mixed,
213            shard_key: None,
214            shard_count: None,
215            preferred_nodes: Vec::new(),
216        }
217    }
218
219    /// Set temperature
220    pub fn with_temperature(mut self, temp: DataTemperature) -> Self {
221        self.temperature = temp;
222        self
223    }
224
225    /// Set workload type
226    pub fn with_workload(mut self, workload: WorkloadType) -> Self {
227        self.workload = workload;
228        self
229    }
230
231    /// Set access pattern
232    pub fn with_access_pattern(mut self, pattern: AccessPattern) -> Self {
233        self.access_pattern = pattern;
234        self
235    }
236
237    /// Set shard key
238    pub fn with_shard_key(mut self, key: impl Into<String>, count: u32) -> Self {
239        self.shard_key = Some(key.into());
240        self.shard_count = Some(count);
241        self
242    }
243
244    /// Add preferred node
245    pub fn with_preferred_node(mut self, node: impl Into<String>) -> Self {
246        self.preferred_nodes.push(node.into());
247        self
248    }
249}
250
/// Replication role of a node.
///
/// Variant order is significant only for `as` casts; it is kept stable.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SyncMode {
    /// Replicated synchronously.
    Sync,
    /// Replicated asynchronously.
    Async,
    /// The primary node.
    Primary,
}
261
262/// Node information for routing
263#[derive(Debug, Clone)]
264pub struct NodeInfo {
265    /// Node identifier
266    pub id: String,
267    /// Node name
268    pub name: String,
269    /// Is this the primary node
270    pub is_primary: bool,
271    /// Sync mode
272    pub sync_mode: SyncMode,
273    /// Node capabilities
274    pub capabilities: NodeCapabilities,
275    /// Current load (0.0 - 1.0)
276    pub current_load: f64,
277    /// Current latency in milliseconds
278    pub current_latency_ms: u64,
279    /// Indexes loaded in memory
280    pub indexes_in_memory: Vec<String>,
281}
282
283impl NodeInfo {
284    /// Create a new node
285    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
286        Self {
287            id: id.into(),
288            name: name.into(),
289            is_primary: false,
290            sync_mode: SyncMode::Async,
291            capabilities: NodeCapabilities::default(),
292            current_load: 0.0,
293            current_latency_ms: 0,
294            indexes_in_memory: Vec::new(),
295        }
296    }
297
298    /// Set as primary
299    pub fn as_primary(mut self) -> Self {
300        self.is_primary = true;
301        self.sync_mode = SyncMode::Primary;
302        self
303    }
304
305    /// Set sync mode
306    pub fn with_sync_mode(mut self, mode: SyncMode) -> Self {
307        self.sync_mode = mode;
308        self
309    }
310
311    /// Set capabilities
312    pub fn with_capabilities(mut self, caps: NodeCapabilities) -> Self {
313        self.capabilities = caps;
314        self
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_config_defaults() {
        let config = SchemaRoutingConfig::default();

        assert!(config.enabled);
        assert!(config.auto_discover);
        assert_eq!(config.refresh_interval, Duration::from_secs(300));
        assert!(config.learning_enabled);
        assert_eq!(config.classification_threshold, 1000);
        assert_eq!(config.default_temperature, DataTemperature::Warm);
        assert_eq!(config.default_workload, WorkloadType::Mixed);
        assert!(config.tables.is_empty());
        assert!(config.node_capabilities.is_empty());
    }

    #[test]
    fn test_config_builder() {
        let config = SchemaRoutingConfig::builder()
            .enabled(true)
            .auto_discover(true)
            .refresh_interval(Duration::from_secs(60))
            .learning_enabled(true)
            .default_temperature(DataTemperature::Hot)
            .build();

        assert!(config.enabled);
        assert!(config.auto_discover);
        assert_eq!(config.refresh_interval, Duration::from_secs(60));
        assert!(config.learning_enabled);
        assert_eq!(config.default_temperature, DataTemperature::Hot);
    }

    #[test]
    fn test_config_builder_overrides_defaults() {
        // Flip every boolean away from its default so the setters are actually exercised.
        let config = SchemaRoutingConfig::builder()
            .enabled(false)
            .auto_discover(false)
            .learning_enabled(false)
            .classification_threshold(42)
            .default_workload(WorkloadType::OLTP)
            .add_table(TableConfig::new("orders"))
            .add_table(TableConfig::new("events"))
            .add_node_capability("gpu-1", NodeCapabilities::default())
            .build();

        assert!(!config.enabled);
        assert!(!config.auto_discover);
        assert!(!config.learning_enabled);
        assert_eq!(config.classification_threshold, 42);
        assert_eq!(config.default_workload, WorkloadType::OLTP);
        let names: Vec<&str> = config.tables.iter().map(|t| t.name.as_str()).collect();
        assert_eq!(names, ["orders", "events"]);
        assert!(config.node_capabilities.contains_key("gpu-1"));
    }

    #[test]
    fn test_table_config_builder() {
        let config = TableConfig::new("users")
            .with_temperature(DataTemperature::Hot)
            .with_workload(WorkloadType::OLTP)
            .with_access_pattern(AccessPattern::PointLookup)
            .with_preferred_node("primary")
            .with_preferred_node("standby-sync");

        assert_eq!(config.name, "users");
        assert_eq!(config.temperature, DataTemperature::Hot);
        assert_eq!(config.workload, WorkloadType::OLTP);
        // `matches!` avoids requiring `PartialEq` on `AccessPattern`.
        assert!(matches!(config.access_pattern, AccessPattern::PointLookup));
        assert_eq!(config.preferred_nodes, ["primary", "standby-sync"]);
        assert!(config.shard_key.is_none());
        assert!(config.shard_count.is_none());
    }

    #[test]
    fn test_table_config_defaults_and_shard_key() {
        let plain = TableConfig::new("logs");
        assert_eq!(plain.temperature, DataTemperature::Warm);
        assert_eq!(plain.workload, WorkloadType::Mixed);
        assert!(matches!(plain.access_pattern, AccessPattern::Mixed));
        assert!(plain.preferred_nodes.is_empty());

        let sharded = plain.with_shard_key("tenant_id", 16);
        assert_eq!(sharded.shard_key.as_deref(), Some("tenant_id"));
        assert_eq!(sharded.shard_count, Some(16));
    }

    #[test]
    fn test_node_info() {
        let node = NodeInfo::new("node1", "primary")
            .as_primary()
            .with_capabilities(NodeCapabilities {
                vector_search: true,
                gpu_acceleration: true,
                ..Default::default()
            });

        assert!(node.is_primary);
        assert_eq!(node.sync_mode, SyncMode::Primary);
        assert!(node.capabilities.vector_search);
        assert!(node.capabilities.gpu_acceleration);
    }

    #[test]
    fn test_node_info_defaults() {
        let node = NodeInfo::new("node2", "replica");

        assert_eq!(node.id, "node2");
        assert_eq!(node.name, "replica");
        assert!(!node.is_primary);
        assert_eq!(node.sync_mode, SyncMode::Async);
        assert_eq!(node.current_latency_ms, 0);
        assert!(node.indexes_in_memory.is_empty());

        let sync = node.with_sync_mode(SyncMode::Sync);
        assert_eq!(sync.sync_mode, SyncMode::Sync);
        assert!(!sync.is_primary);
    }
}