Skip to main content

heliosdb_proxy/schema_routing/
registry.rs

1//! Schema Registry
2//!
3//! Manages metadata about tables, indexes, and relationships for routing decisions.
4
5use std::collections::HashMap;
6use dashmap::DashMap;
7use parking_lot::RwLock;
8
9/// Schema registry for routing decisions
10#[derive(Debug)]
11pub struct SchemaRegistry {
12    /// Table metadata
13    tables: DashMap<String, TableSchema>,
14    /// Index metadata
15    indexes: DashMap<String, IndexSchema>,
16    /// Relationships between tables
17    relationships: RwLock<Vec<Relationship>>,
18    /// Sharding configuration
19    sharding: RwLock<ShardingConfig>,
20    /// Node capabilities
21    node_capabilities: DashMap<String, NodeCapabilities>,
22    /// Branch locations (branch -> nodes)
23    branch_locations: DashMap<String, Vec<String>>,
24}
25
26impl SchemaRegistry {
27    /// Create a new schema registry
28    pub fn new() -> Self {
29        Self {
30            tables: DashMap::new(),
31            indexes: DashMap::new(),
32            relationships: RwLock::new(Vec::new()),
33            sharding: RwLock::new(ShardingConfig::default()),
34            node_capabilities: DashMap::new(),
35            branch_locations: DashMap::new(),
36        }
37    }
38
39    /// Register a table schema
40    pub fn register_table(&self, schema: TableSchema) {
41        self.tables.insert(schema.name.clone(), schema);
42    }
43
44    /// Get a table schema
45    pub fn get_table(&self, name: &str) -> Option<TableSchema> {
46        self.tables.get(name).map(|r| r.clone())
47    }
48
49    /// Update table classification
50    pub fn update_classification(
51        &self,
52        table: &str,
53        temperature: DataTemperature,
54        workload: WorkloadType,
55    ) {
56        if let Some(mut entry) = self.tables.get_mut(table) {
57            entry.temperature = temperature;
58            entry.workload = workload;
59        }
60    }
61
62    /// Register an index schema
63    pub fn register_index(&self, schema: IndexSchema) {
64        self.indexes.insert(schema.name.clone(), schema);
65    }
66
67    /// Get an index schema
68    pub fn get_index(&self, name: &str) -> Option<IndexSchema> {
69        self.indexes.get(name).map(|r| r.clone())
70    }
71
72    /// Get vector index for a table
73    pub fn get_vector_index(&self, table: &str) -> Option<IndexSchema> {
74        self.indexes.iter()
75            .find(|entry| entry.table == table && entry.index_type == IndexType::Vector)
76            .map(|entry| entry.clone())
77    }
78
79    /// Add a relationship
80    pub fn add_relationship(&self, relationship: Relationship) {
81        let mut rels = self.relationships.write();
82        rels.push(relationship);
83    }
84
85    /// Get relationships for a table
86    pub fn get_relationships(&self, table: &str) -> Vec<Relationship> {
87        let rels = self.relationships.read();
88        rels.iter()
89            .filter(|r| r.from_table == table || r.to_table == table)
90            .cloned()
91            .collect()
92    }
93
94    /// Set sharding configuration
95    pub fn set_sharding(&self, config: ShardingConfig) {
96        let mut sharding = self.sharding.write();
97        *sharding = config;
98    }
99
100    /// Get shard for a value
101    pub fn get_shard(&self, key: &str, value: &str) -> Option<u32> {
102        let sharding = self.sharding.read();
103        sharding.get_shard(key, value)
104    }
105
106    /// Register node capabilities
107    pub fn register_node_capabilities(&self, node_id: &str, capabilities: NodeCapabilities) {
108        self.node_capabilities.insert(node_id.to_string(), capabilities);
109    }
110
111    /// Get node capabilities
112    pub fn get_node_capabilities(&self, node_id: &str) -> Option<NodeCapabilities> {
113        self.node_capabilities.get(node_id).map(|r| r.clone())
114    }
115
116    /// Register branch location
117    pub fn register_branch_location(&self, branch: &str, node_ids: Vec<String>) {
118        self.branch_locations.insert(branch.to_string(), node_ids);
119    }
120
121    /// Get nodes that have a branch
122    pub fn get_branch_locations(&self, branch: &str) -> Vec<String> {
123        self.branch_locations
124            .get(branch)
125            .map(|r| r.clone())
126            .unwrap_or_default()
127    }
128
129    /// Get all tables
130    pub fn all_tables(&self) -> Vec<TableSchema> {
131        self.tables.iter().map(|r| r.clone()).collect()
132    }
133
134    /// List all tables (alias for all_tables)
135    pub fn list_tables(&self) -> Vec<TableSchema> {
136        self.all_tables()
137    }
138
139    /// Get table count
140    pub fn table_count(&self) -> usize {
141        self.tables.len()
142    }
143
144    /// Remove a table
145    pub fn remove_table(&self, name: &str) {
146        self.tables.remove(name);
147    }
148
149    /// Get tables by workload
150    pub fn tables_by_workload(&self, workload: WorkloadType) -> Vec<TableSchema> {
151        self.tables
152            .iter()
153            .filter(|r| r.workload == workload)
154            .map(|r| r.clone())
155            .collect()
156    }
157
158    /// Get tables by temperature
159    pub fn tables_by_temperature(&self, temperature: DataTemperature) -> Vec<TableSchema> {
160        self.tables
161            .iter()
162            .filter(|r| r.temperature == temperature)
163            .map(|r| r.clone())
164            .collect()
165    }
166
167    /// Check if a column uses columnar storage
168    pub fn is_columnar_column(&self, table: &str, column: &str) -> bool {
169        self.tables
170            .get(table)
171            .map(|t| {
172                t.columns
173                    .iter()
174                    .any(|c| c.name == column && c.storage_type == StorageType::Columnar)
175            })
176            .unwrap_or(false)
177    }
178
179    /// Check if a column is content-addressed
180    pub fn is_content_addressed(&self, table: &str, column: &str) -> bool {
181        self.tables
182            .get(table)
183            .map(|t| {
184                t.columns
185                    .iter()
186                    .any(|c| c.name == column && c.storage_type == StorageType::ContentAddressed)
187            })
188            .unwrap_or(false)
189    }
190}
191
192impl Default for SchemaRegistry {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198/// Table schema information
199#[derive(Debug, Clone)]
200pub struct TableSchema {
201    /// Table name
202    pub name: String,
203    /// Columns
204    pub columns: Vec<ColumnSchema>,
205    /// Access pattern classification
206    pub access_pattern: AccessPattern,
207    /// Temperature (HOT/WARM/COLD)
208    pub temperature: DataTemperature,
209    /// Workload type
210    pub workload: WorkloadType,
211    /// Primary key columns
212    pub primary_key: Vec<String>,
213    /// Shard key (if sharded)
214    pub shard_key: Option<String>,
215    /// Partition key (if partitioned)
216    pub partition_key: Option<PartitionKey>,
217    /// Preferred nodes
218    pub preferred_nodes: Vec<String>,
219    /// Estimated row count
220    pub estimated_rows: u64,
221    /// Average row size in bytes
222    pub avg_row_size: usize,
223}
224
225impl TableSchema {
226    /// Create a new table schema
227    pub fn new(name: impl Into<String>) -> Self {
228        Self {
229            name: name.into(),
230            columns: Vec::new(),
231            access_pattern: AccessPattern::Mixed,
232            temperature: DataTemperature::Warm,
233            workload: WorkloadType::Mixed,
234            primary_key: Vec::new(),
235            shard_key: None,
236            partition_key: None,
237            preferred_nodes: Vec::new(),
238            estimated_rows: 0,
239            avg_row_size: 0,
240        }
241    }
242
243    /// Add a column
244    pub fn with_column(mut self, column: ColumnSchema) -> Self {
245        self.columns.push(column);
246        self
247    }
248
249    /// Set access pattern
250    pub fn with_access_pattern(mut self, pattern: AccessPattern) -> Self {
251        self.access_pattern = pattern;
252        self
253    }
254
255    /// Set temperature
256    pub fn with_temperature(mut self, temp: DataTemperature) -> Self {
257        self.temperature = temp;
258        self
259    }
260
261    /// Set workload
262    pub fn with_workload(mut self, workload: WorkloadType) -> Self {
263        self.workload = workload;
264        self
265    }
266
267    /// Set primary key
268    pub fn with_primary_key(mut self, columns: Vec<String>) -> Self {
269        self.primary_key = columns;
270        self
271    }
272
273    /// Set shard key
274    pub fn with_shard_key(mut self, key: impl Into<String>) -> Self {
275        self.shard_key = Some(key.into());
276        self
277    }
278
279    /// Add preferred node
280    pub fn with_preferred_node(mut self, node: impl Into<String>) -> Self {
281        self.preferred_nodes.push(node.into());
282        self
283    }
284
285    /// Set estimated rows
286    pub fn with_estimated_rows(mut self, rows: u64) -> Self {
287        self.estimated_rows = rows;
288        self
289    }
290}
291
292/// Column schema information
293#[derive(Debug, Clone)]
294pub struct ColumnSchema {
295    /// Column name
296    pub name: String,
297    /// Data type
298    pub data_type: String,
299    /// Is nullable
300    pub nullable: bool,
301    /// Storage type
302    pub storage_type: StorageType,
303    /// Is part of primary key
304    pub is_primary_key: bool,
305    /// Is indexed
306    pub is_indexed: bool,
307}
308
309impl ColumnSchema {
310    /// Create a new column schema
311    pub fn new(name: impl Into<String>, data_type: impl Into<String>) -> Self {
312        Self {
313            name: name.into(),
314            data_type: data_type.into(),
315            nullable: true,
316            storage_type: StorageType::Row,
317            is_primary_key: false,
318            is_indexed: false,
319        }
320    }
321
322    /// Set nullable
323    pub fn nullable(mut self, nullable: bool) -> Self {
324        self.nullable = nullable;
325        self
326    }
327
328    /// Set storage type
329    pub fn with_storage(mut self, storage: StorageType) -> Self {
330        self.storage_type = storage;
331        self
332    }
333
334    /// Set as primary key
335    pub fn as_primary_key(mut self) -> Self {
336        self.is_primary_key = true;
337        self.nullable = false;
338        self
339    }
340
341    /// Set as indexed
342    pub fn indexed(mut self) -> Self {
343        self.is_indexed = true;
344        self
345    }
346}
347
/// Column storage type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum StorageType {
    /// Traditional row storage (the default)
    #[default]
    Row,
    /// Columnar storage (for analytics)
    Columnar,
    /// Content-addressed storage
    ContentAddressed,
    /// Vector storage
    Vector,
}
366
367/// Index schema information
368#[derive(Debug, Clone)]
369pub struct IndexSchema {
370    /// Index name
371    pub name: String,
372    /// Table name
373    pub table: String,
374    /// Indexed columns
375    pub columns: Vec<String>,
376    /// Index type
377    pub index_type: IndexType,
378    /// Is unique
379    pub is_unique: bool,
380}
381
382impl IndexSchema {
383    /// Create a new index schema
384    pub fn new(name: impl Into<String>, table: impl Into<String>) -> Self {
385        Self {
386            name: name.into(),
387            table: table.into(),
388            columns: Vec::new(),
389            index_type: IndexType::BTree,
390            is_unique: false,
391        }
392    }
393
394    /// Add column
395    pub fn with_column(mut self, column: impl Into<String>) -> Self {
396        self.columns.push(column.into());
397        self
398    }
399
400    /// Set index type
401    pub fn with_type(mut self, index_type: IndexType) -> Self {
402        self.index_type = index_type;
403        self
404    }
405
406    /// Set as unique
407    pub fn unique(mut self) -> Self {
408        self.is_unique = true;
409        self
410    }
411}
412
/// Index type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum IndexType {
    /// B-tree index (the default)
    #[default]
    BTree,
    /// Hash index
    Hash,
    /// GiST index
    GiST,
    /// GIN index
    GIN,
    /// Vector/HNSW index
    Vector,
}
433
/// Access pattern classification.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum AccessPattern {
    /// Point lookups by primary key
    PointLookup,
    /// Range scans
    RangeScan,
    /// Full table scans (OLAP)
    FullScan,
    /// Vector similarity search
    VectorSearch,
    /// Time-series append
    TimeSeriesAppend,
    /// Mixed patterns (the default)
    #[default]
    Mixed,
}

impl AccessPattern {
    /// Parse an access pattern name, case-insensitively.
    ///
    /// Accepts `snake_case` and concatenated spellings (`"point_lookup"` /
    /// `"pointlookup"`, …). Also accepts the aliases `"vector"` for
    /// `VectorSearch` and `"time_series"` / `"timeseries"` / `"append"` for
    /// `TimeSeriesAppend`. Returns `None` for anything else.
    // Returns `Option` rather than `Result`, so it is kept as an inherent
    // method for API compatibility instead of implementing `FromStr`.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "point_lookup" | "pointlookup" => Some(AccessPattern::PointLookup),
            "range_scan" | "rangescan" => Some(AccessPattern::RangeScan),
            "full_scan" | "fullscan" => Some(AccessPattern::FullScan),
            "vector_search" | "vectorsearch" | "vector" => Some(AccessPattern::VectorSearch),
            "time_series" | "timeseries" | "append" => Some(AccessPattern::TimeSeriesAppend),
            "mixed" => Some(AccessPattern::Mixed),
            _ => None,
        }
    }
}
471
/// Data temperature classification.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum DataTemperature {
    /// Frequently accessed, keep in memory
    Hot,
    /// Occasionally accessed (the default)
    #[default]
    Warm,
    /// Rarely accessed, can be on slower storage
    Cold,
    /// Archive, acceptable to be slow
    Frozen,
}

impl DataTemperature {
    /// Parse a temperature name, case-insensitively.
    ///
    /// Accepts `"hot"`, `"warm"`, `"cold"`, and `"frozen"` / `"archive"`.
    /// Returns `None` for anything else.
    // Returns `Option` rather than `Result`, so it is kept as an inherent
    // method for API compatibility instead of implementing `FromStr`.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "hot" => Some(DataTemperature::Hot),
            "warm" => Some(DataTemperature::Warm),
            "cold" => Some(DataTemperature::Cold),
            "frozen" | "archive" => Some(DataTemperature::Frozen),
            _ => None,
        }
    }
}
503
/// Workload type classification.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum WorkloadType {
    /// Online Transaction Processing
    OLTP,
    /// Online Analytical Processing
    OLAP,
    /// Hybrid Transactional/Analytical
    HTAP,
    /// Vector/AI workloads
    Vector,
    /// Mixed workload (the default)
    #[default]
    Mixed,
}

impl WorkloadType {
    /// Parse a workload name, case-insensitively.
    ///
    /// Accepts `"oltp"`, `"olap"`, `"htap"`, `"vector"` / `"ai"` and `"mixed"`.
    /// Returns `None` for anything else.
    // Returns `Option` rather than `Result`, so it is kept as an inherent
    // method for API compatibility instead of implementing `FromStr`.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "oltp" => Some(WorkloadType::OLTP),
            "olap" => Some(WorkloadType::OLAP),
            "htap" => Some(WorkloadType::HTAP),
            "vector" | "ai" => Some(WorkloadType::Vector),
            "mixed" => Some(WorkloadType::Mixed),
            _ => None,
        }
    }
}
538
539/// Partition key configuration
540#[derive(Debug, Clone)]
541pub struct PartitionKey {
542    /// Column name
543    pub column: String,
544    /// Partition type
545    pub partition_type: PartitionType,
546}
547
/// Partitioning strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PartitionType {
    /// Range partitioning (e.g., by date)
    Range,
    /// List partitioning
    List,
    /// Hash partitioning
    Hash,
}
558
559/// Table relationship
560#[derive(Debug, Clone)]
561pub struct Relationship {
562    /// Source table
563    pub from_table: String,
564    /// Source column
565    pub from_column: String,
566    /// Target table
567    pub to_table: String,
568    /// Target column
569    pub to_column: String,
570    /// Relationship type
571    pub relationship_type: RelationshipType,
572}
573
/// Relationship cardinality, read from the source table to the target table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RelationshipType {
    /// One-to-one
    OneToOne,
    /// One-to-many
    OneToMany,
    /// Many-to-one
    ManyToOne,
    /// Many-to-many
    ManyToMany,
}
586
/// Sharding configuration.
///
/// `Default` yields a disabled configuration with zero shards, for which
/// [`ShardingConfig::get_shard`] always returns `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ShardingConfig {
    /// Whether sharding is enabled
    pub enabled: bool,
    /// Number of shards
    pub shard_count: u32,
    /// Shard ids `0..shard_count`, filled in by [`ShardingConfig::new`].
    /// Not currently consulted by [`ShardingConfig::get_shard`].
    pub hash_ring: Vec<u32>,
    /// Table name -> shard key column mapping
    pub table_shard_keys: HashMap<String, String>,
}

impl ShardingConfig {
    /// Create an enabled configuration with `shard_count` shards.
    ///
    /// With `shard_count == 0` the configuration is enabled but
    /// [`ShardingConfig::get_shard`] still returns `None`.
    pub fn new(shard_count: u32) -> Self {
        let mut config = Self {
            enabled: true,
            shard_count,
            hash_ring: Vec::new(),
            table_shard_keys: HashMap::new(),
        };
        config.initialize_hash_ring();
        config
    }

    /// Fill `hash_ring` with the shard ids `0..shard_count`.
    fn initialize_hash_ring(&mut self) {
        self.hash_ring = (0..self.shard_count).collect();
    }

    /// Map `value` to a shard id in `0..shard_count`.
    ///
    /// This is modulo hashing (`fnv1a(value) % shard_count`), not consistent
    /// hashing. Changing `shard_count` moves most values to a different shard.
    /// The shard-key column name (`_key`) is currently ignored, so equal
    /// values land on the same shard regardless of the column.
    ///
    /// Returns `None` when sharding is disabled or `shard_count` is zero.
    pub fn get_shard(&self, _key: &str, value: &str) -> Option<u32> {
        if !self.enabled || self.shard_count == 0 {
            return None;
        }
        Some(self.hash_value(value) % self.shard_count)
    }

    /// 32-bit FNV-1a hash of the UTF-8 bytes of `value`.
    fn hash_value(&self, value: &str) -> u32 {
        const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5; // 2166136261
        const FNV_PRIME: u32 = 0x0100_0193; // 16777619
        value.bytes().fold(FNV_OFFSET_BASIS, |hash, byte| {
            (hash ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
        })
    }

    /// Register (or replace) the shard key column for `table`.
    pub fn register_table_shard_key(&mut self, table: &str, shard_key: &str) {
        self.table_shard_keys.insert(table.to_string(), shard_key.to_string());
    }
}
645
/// Capabilities advertised by a node, used to match queries to nodes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NodeCapabilities {
    /// Supports vector search
    pub vector_search: bool,
    /// Has GPU acceleration
    pub gpu_acceleration: bool,
    /// Has columnar storage engine
    pub columnar_storage: bool,
    /// Has in-memory storage
    pub in_memory: bool,
    /// Has content-addressed storage
    pub content_addressed: bool,
    /// Maximum concurrent queries
    pub max_concurrent_queries: u32,
    /// Memory limit in bytes
    pub memory_limit: u64,
}

impl NodeCapabilities {
    /// Capabilities of a vector node: vector search and GPU acceleration.
    /// Every other field keeps its `Default` value.
    pub fn vector_node() -> Self {
        Self {
            vector_search: true,
            gpu_acceleration: true,
            ..Default::default()
        }
    }

    /// Capabilities of an analytics node: columnar storage only.
    pub fn analytics_node() -> Self {
        Self {
            columnar_storage: true,
            ..Default::default()
        }
    }

    /// Capabilities of a hot-data node: in-memory storage only.
    pub fn hot_node() -> Self {
        Self {
            in_memory: true,
            ..Default::default()
        }
    }

    /// Check whether this node provides every feature flag set in `required`.
    ///
    /// Only the boolean feature flags are compared. `max_concurrent_queries`
    /// and `memory_limit` are ignored.
    pub fn satisfies(&self, required: &NodeCapabilities) -> bool {
        (!required.vector_search || self.vector_search)
            && (!required.gpu_acceleration || self.gpu_acceleration)
            && (!required.columnar_storage || self.columnar_storage)
            && (!required.in_memory || self.in_memory)
            && (!required.content_addressed || self.content_addressed)
    }
}
701
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_schema_registry() {
        let registry = SchemaRegistry::new();

        let users = TableSchema::new("users")
            .with_temperature(DataTemperature::Hot)
            .with_workload(WorkloadType::OLTP)
            .with_access_pattern(AccessPattern::PointLookup)
            .with_column(ColumnSchema::new("id", "integer").as_primary_key())
            .with_column(ColumnSchema::new("name", "varchar"));

        registry.register_table(users);

        let table = registry.get_table("users").expect("users should be registered");
        assert_eq!(table.name, "users");
        assert_eq!(table.temperature, DataTemperature::Hot);
        assert_eq!(table.columns.len(), 2);
        assert_eq!(registry.table_count(), 1);
        assert!(registry.get_table("missing").is_none());
    }

    #[test]
    fn test_update_classification() {
        let registry = SchemaRegistry::new();

        registry.register_table(
            TableSchema::new("events")
                .with_temperature(DataTemperature::Warm)
                .with_workload(WorkloadType::Mixed),
        );

        registry.update_classification("events", DataTemperature::Cold, WorkloadType::OLAP);

        let table = registry.get_table("events").expect("should exist");
        assert_eq!(table.temperature, DataTemperature::Cold);
        assert_eq!(table.workload, WorkloadType::OLAP);

        // Updating an unknown table is a no-op and must not create it.
        registry.update_classification("missing", DataTemperature::Hot, WorkloadType::OLTP);
        assert!(registry.get_table("missing").is_none());
    }

    #[test]
    fn test_sharding_config() {
        let mut config = ShardingConfig::new(4);
        config.register_table_shard_key("orders", "customer_id");

        let shard1 = config.get_shard("customer_id", "cust_123").expect("sharding is enabled");
        let shard2 = config.get_shard("customer_id", "cust_456").expect("sharding is enabled");

        // Shards are always in range, and the same value always maps to the same shard.
        assert!(shard1 < 4);
        assert!(shard2 < 4);
        assert_eq!(config.get_shard("customer_id", "cust_123"), Some(shard1));

        // Disabled (default) configuration never assigns a shard.
        assert_eq!(ShardingConfig::default().get_shard("customer_id", "cust_123"), None);
    }

    #[test]
    fn test_registry_sharding() {
        let registry = SchemaRegistry::new();
        assert_eq!(registry.get_shard("customer_id", "cust_123"), None);

        registry.set_sharding(ShardingConfig::new(4));
        let shard = registry.get_shard("customer_id", "cust_123").expect("sharding is enabled");
        assert!(shard < 4);
    }

    #[test]
    fn test_node_capabilities() {
        let required = NodeCapabilities {
            vector_search: true,
            gpu_acceleration: false,
            ..Default::default()
        };

        let vector_node = NodeCapabilities::vector_node();
        let analytics_node = NodeCapabilities::analytics_node();

        assert!(vector_node.satisfies(&required));
        assert!(!analytics_node.satisfies(&required));
        assert!(NodeCapabilities::hot_node().satisfies(&NodeCapabilities::default()));

        let registry = SchemaRegistry::new();
        registry.register_node_capabilities("n1", NodeCapabilities::vector_node());
        let caps = registry.get_node_capabilities("n1").expect("n1 should be registered");
        assert!(caps.vector_search);
        assert!(registry.get_node_capabilities("n2").is_none());
    }

    #[test]
    fn test_access_pattern_from_str() {
        assert_eq!(AccessPattern::from_str("point_lookup"), Some(AccessPattern::PointLookup));
        assert_eq!(AccessPattern::from_str("POINT_LOOKUP"), Some(AccessPattern::PointLookup));
        assert_eq!(AccessPattern::from_str("vector"), Some(AccessPattern::VectorSearch));
        assert_eq!(AccessPattern::from_str("invalid"), None);
    }

    #[test]
    fn test_data_temperature_from_str() {
        assert_eq!(DataTemperature::from_str("hot"), Some(DataTemperature::Hot));
        assert_eq!(DataTemperature::from_str("cold"), Some(DataTemperature::Cold));
        assert_eq!(DataTemperature::from_str("archive"), Some(DataTemperature::Frozen));
        assert_eq!(DataTemperature::from_str("Frozen"), Some(DataTemperature::Frozen));
        assert_eq!(DataTemperature::from_str("tepid"), None);
    }

    #[test]
    fn test_workload_type_from_str() {
        assert_eq!(WorkloadType::from_str("oltp"), Some(WorkloadType::OLTP));
        assert_eq!(WorkloadType::from_str("vector"), Some(WorkloadType::Vector));
        assert_eq!(WorkloadType::from_str("ai"), Some(WorkloadType::Vector));
        assert_eq!(WorkloadType::from_str("HTAP"), Some(WorkloadType::HTAP));
        assert_eq!(WorkloadType::from_str("batch"), None);
    }

    #[test]
    fn test_index_schema() {
        let index = IndexSchema::new("idx_users_email", "users")
            .with_column("email")
            .with_type(IndexType::BTree)
            .unique();

        assert_eq!(index.name, "idx_users_email");
        assert_eq!(index.table, "users");
        assert_eq!(index.index_type, IndexType::BTree);
        assert!(index.is_unique);
        assert_eq!(index.columns, vec!["email"]);
    }

    #[test]
    fn test_vector_index_lookup() {
        let registry = SchemaRegistry::new();
        registry.register_index(IndexSchema::new("idx_docs_title", "docs").with_column("title"));
        registry.register_index(
            IndexSchema::new("idx_docs_embedding", "docs")
                .with_column("embedding")
                .with_type(IndexType::Vector),
        );

        let index = registry.get_vector_index("docs").expect("vector index registered");
        assert_eq!(index.name, "idx_docs_embedding");
        assert!(registry.get_vector_index("users").is_none());
        assert!(registry.get_index("idx_docs_title").is_some());
    }

    #[test]
    fn test_relationships() {
        let registry = SchemaRegistry::new();
        registry.add_relationship(Relationship {
            from_table: "orders".to_string(),
            from_column: "customer_id".to_string(),
            to_table: "customers".to_string(),
            to_column: "id".to_string(),
            relationship_type: RelationshipType::ManyToOne,
        });

        // A relationship is visible from both of its endpoints.
        assert_eq!(registry.get_relationships("orders").len(), 1);
        assert_eq!(registry.get_relationships("customers").len(), 1);
        assert!(registry.get_relationships("items").is_empty());
    }

    #[test]
    fn test_branch_locations() {
        let registry = SchemaRegistry::new();
        registry.register_branch_location("dev", vec!["n1".to_string(), "n2".to_string()]);

        assert_eq!(registry.get_branch_locations("dev"), vec!["n1", "n2"]);
        assert!(registry.get_branch_locations("prod").is_empty());
    }

    #[test]
    fn test_storage_lookups() {
        let registry = SchemaRegistry::new();
        registry.register_table(
            TableSchema::new("metrics")
                .with_column(ColumnSchema::new("value", "double").with_storage(StorageType::Columnar))
                .with_column(
                    ColumnSchema::new("blob", "bytea").with_storage(StorageType::ContentAddressed),
                ),
        );

        assert!(registry.is_columnar_column("metrics", "value"));
        assert!(!registry.is_columnar_column("metrics", "blob"));
        assert!(registry.is_content_addressed("metrics", "blob"));
        assert!(!registry.is_content_addressed("metrics", "value"));
        assert!(!registry.is_columnar_column("missing", "value"));
    }

    #[test]
    fn test_remove_table() {
        let registry = SchemaRegistry::new();
        registry.register_table(TableSchema::new("tmp"));
        assert_eq!(registry.table_count(), 1);

        registry.remove_table("tmp");
        assert_eq!(registry.table_count(), 0);
        assert!(registry.list_tables().is_empty());
    }

    #[test]
    fn test_tables_by_workload() {
        let registry = SchemaRegistry::new();

        registry.register_table(TableSchema::new("users").with_workload(WorkloadType::OLTP));
        registry.register_table(TableSchema::new("events").with_workload(WorkloadType::OLAP));
        registry.register_table(TableSchema::new("orders").with_workload(WorkloadType::OLTP));

        let oltp_tables = registry.tables_by_workload(WorkloadType::OLTP);
        assert_eq!(oltp_tables.len(), 2);

        // All three tables keep the default `Warm` temperature.
        assert_eq!(registry.tables_by_temperature(DataTemperature::Warm).len(), 3);
        assert!(registry.tables_by_temperature(DataTemperature::Hot).is_empty());
    }
}