// File: heliosdb_proxy/schema_routing/registry.rs

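//! Schema Registry
//!
//! Holds the metadata that routing decisions draw on: table and index schemas,
//! relationships between tables, the sharding configuration, per-node
//! capabilities, and branch locations.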
4
5use dashmap::DashMap;
6use parking_lot::RwLock;
7use std::collections::HashMap;
8
9/// Schema registry for routing decisions
10#[derive(Debug)]
11pub struct SchemaRegistry {
12    /// Table metadata
13    tables: DashMap<String, TableSchema>,
14    /// Index metadata
15    indexes: DashMap<String, IndexSchema>,
16    /// Relationships between tables
17    relationships: RwLock<Vec<Relationship>>,
18    /// Sharding configuration
19    sharding: RwLock<ShardingConfig>,
20    /// Node capabilities
21    node_capabilities: DashMap<String, NodeCapabilities>,
22    /// Branch locations (branch -> nodes)
23    branch_locations: DashMap<String, Vec<String>>,
24}
25
26impl SchemaRegistry {
27    /// Create a new schema registry
28    pub fn new() -> Self {
29        Self {
30            tables: DashMap::new(),
31            indexes: DashMap::new(),
32            relationships: RwLock::new(Vec::new()),
33            sharding: RwLock::new(ShardingConfig::default()),
34            node_capabilities: DashMap::new(),
35            branch_locations: DashMap::new(),
36        }
37    }
38
39    /// Register a table schema
40    pub fn register_table(&self, schema: TableSchema) {
41        self.tables.insert(schema.name.clone(), schema);
42    }
43
44    /// Get a table schema
45    pub fn get_table(&self, name: &str) -> Option<TableSchema> {
46        self.tables.get(name).map(|r| r.clone())
47    }
48
49    /// Update table classification
50    pub fn update_classification(
51        &self,
52        table: &str,
53        temperature: DataTemperature,
54        workload: WorkloadType,
55    ) {
56        if let Some(mut entry) = self.tables.get_mut(table) {
57            entry.temperature = temperature;
58            entry.workload = workload;
59        }
60    }
61
62    /// Register an index schema
63    pub fn register_index(&self, schema: IndexSchema) {
64        self.indexes.insert(schema.name.clone(), schema);
65    }
66
67    /// Get an index schema
68    pub fn get_index(&self, name: &str) -> Option<IndexSchema> {
69        self.indexes.get(name).map(|r| r.clone())
70    }
71
72    /// Get vector index for a table
73    pub fn get_vector_index(&self, table: &str) -> Option<IndexSchema> {
74        self.indexes
75            .iter()
76            .find(|entry| entry.table == table && entry.index_type == IndexType::Vector)
77            .map(|entry| entry.clone())
78    }
79
80    /// Add a relationship
81    pub fn add_relationship(&self, relationship: Relationship) {
82        let mut rels = self.relationships.write();
83        rels.push(relationship);
84    }
85
86    /// Get relationships for a table
87    pub fn get_relationships(&self, table: &str) -> Vec<Relationship> {
88        let rels = self.relationships.read();
89        rels.iter()
90            .filter(|r| r.from_table == table || r.to_table == table)
91            .cloned()
92            .collect()
93    }
94
95    /// Set sharding configuration
96    pub fn set_sharding(&self, config: ShardingConfig) {
97        let mut sharding = self.sharding.write();
98        *sharding = config;
99    }
100
101    /// Get shard for a value
102    pub fn get_shard(&self, key: &str, value: &str) -> Option<u32> {
103        let sharding = self.sharding.read();
104        sharding.get_shard(key, value)
105    }
106
107    /// Register node capabilities
108    pub fn register_node_capabilities(&self, node_id: &str, capabilities: NodeCapabilities) {
109        self.node_capabilities
110            .insert(node_id.to_string(), capabilities);
111    }
112
113    /// Get node capabilities
114    pub fn get_node_capabilities(&self, node_id: &str) -> Option<NodeCapabilities> {
115        self.node_capabilities.get(node_id).map(|r| r.clone())
116    }
117
118    /// Register branch location
119    pub fn register_branch_location(&self, branch: &str, node_ids: Vec<String>) {
120        self.branch_locations.insert(branch.to_string(), node_ids);
121    }
122
123    /// Get nodes that have a branch
124    pub fn get_branch_locations(&self, branch: &str) -> Vec<String> {
125        self.branch_locations
126            .get(branch)
127            .map(|r| r.clone())
128            .unwrap_or_default()
129    }
130
131    /// Get all tables
132    pub fn all_tables(&self) -> Vec<TableSchema> {
133        self.tables.iter().map(|r| r.clone()).collect()
134    }
135
136    /// List all tables (alias for all_tables)
137    pub fn list_tables(&self) -> Vec<TableSchema> {
138        self.all_tables()
139    }
140
141    /// Get table count
142    pub fn table_count(&self) -> usize {
143        self.tables.len()
144    }
145
146    /// Remove a table
147    pub fn remove_table(&self, name: &str) {
148        self.tables.remove(name);
149    }
150
151    /// Get tables by workload
152    pub fn tables_by_workload(&self, workload: WorkloadType) -> Vec<TableSchema> {
153        self.tables
154            .iter()
155            .filter(|r| r.workload == workload)
156            .map(|r| r.clone())
157            .collect()
158    }
159
160    /// Get tables by temperature
161    pub fn tables_by_temperature(&self, temperature: DataTemperature) -> Vec<TableSchema> {
162        self.tables
163            .iter()
164            .filter(|r| r.temperature == temperature)
165            .map(|r| r.clone())
166            .collect()
167    }
168
169    /// Check if a column uses columnar storage
170    pub fn is_columnar_column(&self, table: &str, column: &str) -> bool {
171        self.tables
172            .get(table)
173            .map(|t| {
174                t.columns
175                    .iter()
176                    .any(|c| c.name == column && c.storage_type == StorageType::Columnar)
177            })
178            .unwrap_or(false)
179    }
180
181    /// Check if a column is content-addressed
182    pub fn is_content_addressed(&self, table: &str, column: &str) -> bool {
183        self.tables
184            .get(table)
185            .map(|t| {
186                t.columns
187                    .iter()
188                    .any(|c| c.name == column && c.storage_type == StorageType::ContentAddressed)
189            })
190            .unwrap_or(false)
191    }
192}
193
194impl Default for SchemaRegistry {
195    fn default() -> Self {
196        Self::new()
197    }
198}
199
200/// Table schema information
201#[derive(Debug, Clone)]
202pub struct TableSchema {
203    /// Table name
204    pub name: String,
205    /// Columns
206    pub columns: Vec<ColumnSchema>,
207    /// Access pattern classification
208    pub access_pattern: AccessPattern,
209    /// Temperature (HOT/WARM/COLD)
210    pub temperature: DataTemperature,
211    /// Workload type
212    pub workload: WorkloadType,
213    /// Primary key columns
214    pub primary_key: Vec<String>,
215    /// Shard key (if sharded)
216    pub shard_key: Option<String>,
217    /// Partition key (if partitioned)
218    pub partition_key: Option<PartitionKey>,
219    /// Preferred nodes
220    pub preferred_nodes: Vec<String>,
221    /// Estimated row count
222    pub estimated_rows: u64,
223    /// Average row size in bytes
224    pub avg_row_size: usize,
225}
226
227impl TableSchema {
228    /// Create a new table schema
229    pub fn new(name: impl Into<String>) -> Self {
230        Self {
231            name: name.into(),
232            columns: Vec::new(),
233            access_pattern: AccessPattern::Mixed,
234            temperature: DataTemperature::Warm,
235            workload: WorkloadType::Mixed,
236            primary_key: Vec::new(),
237            shard_key: None,
238            partition_key: None,
239            preferred_nodes: Vec::new(),
240            estimated_rows: 0,
241            avg_row_size: 0,
242        }
243    }
244
245    /// Add a column
246    pub fn with_column(mut self, column: ColumnSchema) -> Self {
247        self.columns.push(column);
248        self
249    }
250
251    /// Set access pattern
252    pub fn with_access_pattern(mut self, pattern: AccessPattern) -> Self {
253        self.access_pattern = pattern;
254        self
255    }
256
257    /// Set temperature
258    pub fn with_temperature(mut self, temp: DataTemperature) -> Self {
259        self.temperature = temp;
260        self
261    }
262
263    /// Set workload
264    pub fn with_workload(mut self, workload: WorkloadType) -> Self {
265        self.workload = workload;
266        self
267    }
268
269    /// Set primary key
270    pub fn with_primary_key(mut self, columns: Vec<String>) -> Self {
271        self.primary_key = columns;
272        self
273    }
274
275    /// Set shard key
276    pub fn with_shard_key(mut self, key: impl Into<String>) -> Self {
277        self.shard_key = Some(key.into());
278        self
279    }
280
281    /// Add preferred node
282    pub fn with_preferred_node(mut self, node: impl Into<String>) -> Self {
283        self.preferred_nodes.push(node.into());
284        self
285    }
286
287    /// Set estimated rows
288    pub fn with_estimated_rows(mut self, rows: u64) -> Self {
289        self.estimated_rows = rows;
290        self
291    }
292}
293
294/// Column schema information
295#[derive(Debug, Clone)]
296pub struct ColumnSchema {
297    /// Column name
298    pub name: String,
299    /// Data type
300    pub data_type: String,
301    /// Is nullable
302    pub nullable: bool,
303    /// Storage type
304    pub storage_type: StorageType,
305    /// Is part of primary key
306    pub is_primary_key: bool,
307    /// Is indexed
308    pub is_indexed: bool,
309}
310
311impl ColumnSchema {
312    /// Create a new column schema
313    pub fn new(name: impl Into<String>, data_type: impl Into<String>) -> Self {
314        Self {
315            name: name.into(),
316            data_type: data_type.into(),
317            nullable: true,
318            storage_type: StorageType::Row,
319            is_primary_key: false,
320            is_indexed: false,
321        }
322    }
323
324    /// Set nullable
325    pub fn nullable(mut self, nullable: bool) -> Self {
326        self.nullable = nullable;
327        self
328    }
329
330    /// Set storage type
331    pub fn with_storage(mut self, storage: StorageType) -> Self {
332        self.storage_type = storage;
333        self
334    }
335
336    /// Set as primary key
337    pub fn as_primary_key(mut self) -> Self {
338        self.is_primary_key = true;
339        self.nullable = false;
340        self
341    }
342
343    /// Set as indexed
344    pub fn indexed(mut self) -> Self {
345        self.is_indexed = true;
346        self
347    }
348}
349
/// Storage layout used for a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum StorageType {
    /// Conventional row-oriented storage; this is the default.
    #[default]
    Row,
    /// Column-oriented storage, intended for analytics.
    Columnar,
    /// Content-addressed storage.
    ContentAddressed,
    /// Vector storage.
    Vector,
}
363
364/// Index schema information
365#[derive(Debug, Clone)]
366pub struct IndexSchema {
367    /// Index name
368    pub name: String,
369    /// Table name
370    pub table: String,
371    /// Indexed columns
372    pub columns: Vec<String>,
373    /// Index type
374    pub index_type: IndexType,
375    /// Is unique
376    pub is_unique: bool,
377}
378
379impl IndexSchema {
380    /// Create a new index schema
381    pub fn new(name: impl Into<String>, table: impl Into<String>) -> Self {
382        Self {
383            name: name.into(),
384            table: table.into(),
385            columns: Vec::new(),
386            index_type: IndexType::BTree,
387            is_unique: false,
388        }
389    }
390
391    /// Add column
392    pub fn with_column(mut self, column: impl Into<String>) -> Self {
393        self.columns.push(column.into());
394        self
395    }
396
397    /// Set index type
398    pub fn with_type(mut self, index_type: IndexType) -> Self {
399        self.index_type = index_type;
400        self
401    }
402
403    /// Set as unique
404    pub fn unique(mut self) -> Self {
405        self.is_unique = true;
406        self
407    }
408}
409
/// Kind of index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum IndexType {
    /// B-tree index; this is the default.
    #[default]
    BTree,
    /// Hash index.
    Hash,
    /// GiST index.
    GiST,
    /// GIN index.
    GIN,
    /// Vector (HNSW) index.
    Vector,
}
425
/// How a table is classified in terms of access pattern.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum AccessPattern {
    /// Single-row lookups by primary key.
    PointLookup,
    /// Scans over a range.
    RangeScan,
    /// Scans over the whole table (OLAP).
    FullScan,
    /// Vector similarity search.
    VectorSearch,
    /// Append-style time-series writes.
    TimeSeriesAppend,
    /// A mix of patterns; this is the default.
    #[default]
    Mixed,
}
443
444impl AccessPattern {
445    /// Parse from string
446    #[allow(clippy::should_implement_trait)]
447    pub fn from_str(s: &str) -> Option<Self> {
448        match s.to_lowercase().as_str() {
449            "point_lookup" | "pointlookup" => Some(AccessPattern::PointLookup),
450            "range_scan" | "rangescan" => Some(AccessPattern::RangeScan),
451            "full_scan" | "fullscan" => Some(AccessPattern::FullScan),
452            "vector_search" | "vectorsearch" | "vector" => Some(AccessPattern::VectorSearch),
453            "time_series" | "timeseries" | "append" => Some(AccessPattern::TimeSeriesAppend),
454            "mixed" => Some(AccessPattern::Mixed),
455            _ => None,
456        }
457    }
458}
459
/// How a table's data is classified in terms of access frequency.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum DataTemperature {
    /// Accessed frequently; meant to be kept in memory.
    Hot,
    /// Accessed occasionally; this is the default.
    #[default]
    Warm,
    /// Accessed rarely; may live on slower storage.
    Cold,
    /// Archived; slow access is acceptable.
    Frozen,
}
473
474impl DataTemperature {
475    /// Parse from string
476    #[allow(clippy::should_implement_trait)]
477    pub fn from_str(s: &str) -> Option<Self> {
478        match s.to_lowercase().as_str() {
479            "hot" => Some(DataTemperature::Hot),
480            "warm" => Some(DataTemperature::Warm),
481            "cold" => Some(DataTemperature::Cold),
482            "frozen" | "archive" => Some(DataTemperature::Frozen),
483            _ => None,
484        }
485    }
486}
487
/// How a table is classified in terms of workload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum WorkloadType {
    /// Online transaction processing.
    OLTP,
    /// Online analytical processing.
    OLAP,
    /// Hybrid transactional/analytical processing.
    HTAP,
    /// Vector / AI workloads.
    Vector,
    /// A mix of workloads; this is the default.
    #[default]
    Mixed,
}
503
504impl WorkloadType {
505    /// Parse from string
506    #[allow(clippy::should_implement_trait)]
507    pub fn from_str(s: &str) -> Option<Self> {
508        match s.to_lowercase().as_str() {
509            "oltp" => Some(WorkloadType::OLTP),
510            "olap" => Some(WorkloadType::OLAP),
511            "htap" => Some(WorkloadType::HTAP),
512            "vector" | "ai" => Some(WorkloadType::Vector),
513            "mixed" => Some(WorkloadType::Mixed),
514            _ => None,
515        }
516    }
517}
518
519/// Partition key configuration
520#[derive(Debug, Clone)]
521pub struct PartitionKey {
522    /// Column name
523    pub column: String,
524    /// Partition type
525    pub partition_type: PartitionType,
526}
527
/// Partitioning scheme.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionType {
    /// Partitioned by value range (for example, by date).
    Range,
    /// Partitioned by an explicit list of values.
    List,
    /// Partitioned by hash.
    Hash,
}
538
539/// Table relationship
540#[derive(Debug, Clone)]
541pub struct Relationship {
542    /// Source table
543    pub from_table: String,
544    /// Source column
545    pub from_column: String,
546    /// Target table
547    pub to_table: String,
548    /// Target column
549    pub to_column: String,
550    /// Relationship type
551    pub relationship_type: RelationshipType,
552}
553
/// Cardinality of a relationship between two tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelationshipType {
    /// One row relates to one row.
    OneToOne,
    /// One row relates to many rows.
    OneToMany,
    /// Many rows relate to one row.
    ManyToOne,
    /// Many rows relate to many rows.
    ManyToMany,
}
566
/// Settings that control how values are assigned to shards.
///
/// The derived default is a disabled configuration with zero shards, an empty
/// ring, and no table shard keys.
#[derive(Debug, Clone, Default)]
pub struct ShardingConfig {
    /// Whether sharding is switched on.
    pub enabled: bool,
    /// Number of shards.
    pub shard_count: u32,
    /// Hash ring intended for consistent hashing.
    pub hash_ring: Vec<u32>,
    /// Shard key of each table, keyed by table name.
    pub table_shard_keys: HashMap<String, String>,
}
579
580impl ShardingConfig {
581    /// Create a new sharding configuration
582    pub fn new(shard_count: u32) -> Self {
583        let mut config = Self {
584            enabled: true,
585            shard_count,
586            hash_ring: Vec::new(),
587            table_shard_keys: HashMap::new(),
588        };
589        config.initialize_hash_ring();
590        config
591    }
592
593    /// Initialize the hash ring
594    fn initialize_hash_ring(&mut self) {
595        self.hash_ring = (0..self.shard_count).collect();
596    }
597
598    /// Get shard for a value
599    pub fn get_shard(&self, _key: &str, value: &str) -> Option<u32> {
600        if !self.enabled || self.shard_count == 0 {
601            return None;
602        }
603
604        // Simple consistent hashing
605        let hash = self.hash_value(value);
606        Some(hash % self.shard_count)
607    }
608
609    /// Hash a value
610    fn hash_value(&self, value: &str) -> u32 {
611        // Simple FNV-1a hash
612        let mut hash: u32 = 2166136261;
613        for byte in value.bytes() {
614            hash ^= byte as u32;
615            hash = hash.wrapping_mul(16777619);
616        }
617        hash
618    }
619
620    /// Register a table's shard key
621    pub fn register_table_shard_key(&mut self, table: &str, shard_key: &str) {
622        self.table_shard_keys
623            .insert(table.to_string(), shard_key.to_string());
624    }
625}
626
/// What a node is able to do.
///
/// The derived default has every flag off and both limits at zero.
#[derive(Debug, Clone, Default)]
pub struct NodeCapabilities {
    /// Whether the node supports vector search.
    pub vector_search: bool,
    /// Whether the node has GPU acceleration.
    pub gpu_acceleration: bool,
    /// Whether the node has a columnar storage engine.
    pub columnar_storage: bool,
    /// Whether the node has in-memory storage.
    pub in_memory: bool,
    /// Whether the node has content-addressed storage.
    pub content_addressed: bool,
    /// Maximum number of concurrent queries.
    pub max_concurrent_queries: u32,
    /// Memory limit, in bytes.
    pub memory_limit: u64,
}
645
646impl NodeCapabilities {
647    /// Create with vector support
648    pub fn vector_node() -> Self {
649        Self {
650            vector_search: true,
651            gpu_acceleration: true,
652            ..Default::default()
653        }
654    }
655
656    /// Create with analytics support
657    pub fn analytics_node() -> Self {
658        Self {
659            columnar_storage: true,
660            in_memory: false,
661            ..Default::default()
662        }
663    }
664
665    /// Create with in-memory support
666    pub fn hot_node() -> Self {
667        Self {
668            in_memory: true,
669            ..Default::default()
670        }
671    }
672
673    /// Check if node has required capabilities
674    pub fn satisfies(&self, required: &NodeCapabilities) -> bool {
675        (!required.vector_search || self.vector_search)
676            && (!required.gpu_acceleration || self.gpu_acceleration)
677            && (!required.columnar_storage || self.columnar_storage)
678            && (!required.in_memory || self.in_memory)
679            && (!required.content_addressed || self.content_addressed)
680    }
681}
682
#[cfg(test)]
mod tests {
    use super::*;

    /// A registered table can be read back with the values it was built with.
    #[test]
    fn test_schema_registry() {
        let registry = SchemaRegistry::new();

        let id_column = ColumnSchema::new("id", "integer").as_primary_key();
        let name_column = ColumnSchema::new("name", "varchar");
        registry.register_table(
            TableSchema::new("users")
                .with_temperature(DataTemperature::Hot)
                .with_workload(WorkloadType::OLTP)
                .with_access_pattern(AccessPattern::PointLookup)
                .with_column(id_column)
                .with_column(name_column),
        );

        let fetched = registry.get_table("users");
        assert!(fetched.is_some());
        let fetched = fetched.expect("should exist");
        assert_eq!(fetched.name, "users");
        assert_eq!(fetched.temperature, DataTemperature::Hot);
    }

    /// Reclassifying a table changes both its temperature and its workload.
    #[test]
    fn test_update_classification() {
        let registry = SchemaRegistry::new();

        let events = TableSchema::new("events")
            .with_temperature(DataTemperature::Warm)
            .with_workload(WorkloadType::Mixed);
        registry.register_table(events);

        registry.update_classification("events", DataTemperature::Cold, WorkloadType::OLAP);

        let reclassified = registry.get_table("events").expect("should exist");
        assert_eq!(reclassified.temperature, DataTemperature::Cold);
        assert_eq!(reclassified.workload, WorkloadType::OLAP);
    }

    /// An enabled configuration yields a shard for any value.
    #[test]
    fn test_sharding_config() {
        let mut config = ShardingConfig::new(4);
        config.register_table_shard_key("orders", "customer_id");

        // Distinct values may or may not share a shard, so only presence is checked.
        for customer in ["cust_123", "cust_456"] {
            assert!(config.get_shard("customer_id", customer).is_some());
        }
    }

    /// A node satisfies a requirement only when it offers every required flag.
    #[test]
    fn test_node_capabilities() {
        let required = NodeCapabilities {
            vector_search: true,
            gpu_acceleration: false,
            ..Default::default()
        };

        assert!(NodeCapabilities::vector_node().satisfies(&required));
        assert!(!NodeCapabilities::analytics_node().satisfies(&required));
    }

    /// Access patterns parse from their names; unknown names are rejected.
    #[test]
    fn test_access_pattern_from_str() {
        let cases = [
            ("point_lookup", Some(AccessPattern::PointLookup)),
            ("vector", Some(AccessPattern::VectorSearch)),
            ("invalid", None),
        ];
        for (text, expected) in cases {
            assert_eq!(AccessPattern::from_str(text), expected);
        }
    }

    /// Temperatures parse from their names, including the `archive` alias.
    #[test]
    fn test_data_temperature_from_str() {
        let cases = [
            ("hot", DataTemperature::Hot),
            ("cold", DataTemperature::Cold),
            ("archive", DataTemperature::Frozen),
        ];
        for (text, expected) in cases {
            assert_eq!(DataTemperature::from_str(text), Some(expected));
        }
    }

    /// Workload types parse from their names, including the `ai` alias.
    #[test]
    fn test_workload_type_from_str() {
        let cases = [
            ("oltp", WorkloadType::OLTP),
            ("vector", WorkloadType::Vector),
            ("ai", WorkloadType::Vector),
        ];
        for (text, expected) in cases {
            assert_eq!(WorkloadType::from_str(text), Some(expected));
        }
    }

    /// The index builder records name, uniqueness, and columns.
    #[test]
    fn test_index_schema() {
        let email_index = IndexSchema::new("idx_users_email", "users")
            .with_column("email")
            .with_type(IndexType::BTree)
            .unique();

        assert_eq!(email_index.name, "idx_users_email");
        assert!(email_index.is_unique);
        assert_eq!(email_index.columns, vec!["email"]);
    }

    /// Filtering by workload returns only the tables with that workload.
    #[test]
    fn test_tables_by_workload() {
        let registry = SchemaRegistry::new();

        let fixtures = [
            ("users", WorkloadType::OLTP),
            ("events", WorkloadType::OLAP),
            ("orders", WorkloadType::OLTP),
        ];
        for (table, workload) in fixtures {
            registry.register_table(TableSchema::new(table).with_workload(workload));
        }

        let oltp_tables = registry.tables_by_workload(WorkloadType::OLTP);
        assert_eq!(oltp_tables.len(), 2);
    }
}