1use dashmap::DashMap;
6use parking_lot::RwLock;
7use std::collections::HashMap;
8
9#[derive(Debug)]
11pub struct SchemaRegistry {
12 tables: DashMap<String, TableSchema>,
14 indexes: DashMap<String, IndexSchema>,
16 relationships: RwLock<Vec<Relationship>>,
18 sharding: RwLock<ShardingConfig>,
20 node_capabilities: DashMap<String, NodeCapabilities>,
22 branch_locations: DashMap<String, Vec<String>>,
24}
25
26impl SchemaRegistry {
27 pub fn new() -> Self {
29 Self {
30 tables: DashMap::new(),
31 indexes: DashMap::new(),
32 relationships: RwLock::new(Vec::new()),
33 sharding: RwLock::new(ShardingConfig::default()),
34 node_capabilities: DashMap::new(),
35 branch_locations: DashMap::new(),
36 }
37 }
38
39 pub fn register_table(&self, schema: TableSchema) {
41 self.tables.insert(schema.name.clone(), schema);
42 }
43
44 pub fn get_table(&self, name: &str) -> Option<TableSchema> {
46 self.tables.get(name).map(|r| r.clone())
47 }
48
49 pub fn update_classification(
51 &self,
52 table: &str,
53 temperature: DataTemperature,
54 workload: WorkloadType,
55 ) {
56 if let Some(mut entry) = self.tables.get_mut(table) {
57 entry.temperature = temperature;
58 entry.workload = workload;
59 }
60 }
61
62 pub fn register_index(&self, schema: IndexSchema) {
64 self.indexes.insert(schema.name.clone(), schema);
65 }
66
67 pub fn get_index(&self, name: &str) -> Option<IndexSchema> {
69 self.indexes.get(name).map(|r| r.clone())
70 }
71
72 pub fn get_vector_index(&self, table: &str) -> Option<IndexSchema> {
74 self.indexes
75 .iter()
76 .find(|entry| entry.table == table && entry.index_type == IndexType::Vector)
77 .map(|entry| entry.clone())
78 }
79
80 pub fn add_relationship(&self, relationship: Relationship) {
82 let mut rels = self.relationships.write();
83 rels.push(relationship);
84 }
85
86 pub fn get_relationships(&self, table: &str) -> Vec<Relationship> {
88 let rels = self.relationships.read();
89 rels.iter()
90 .filter(|r| r.from_table == table || r.to_table == table)
91 .cloned()
92 .collect()
93 }
94
95 pub fn set_sharding(&self, config: ShardingConfig) {
97 let mut sharding = self.sharding.write();
98 *sharding = config;
99 }
100
101 pub fn get_shard(&self, key: &str, value: &str) -> Option<u32> {
103 let sharding = self.sharding.read();
104 sharding.get_shard(key, value)
105 }
106
107 pub fn register_node_capabilities(&self, node_id: &str, capabilities: NodeCapabilities) {
109 self.node_capabilities
110 .insert(node_id.to_string(), capabilities);
111 }
112
113 pub fn get_node_capabilities(&self, node_id: &str) -> Option<NodeCapabilities> {
115 self.node_capabilities.get(node_id).map(|r| r.clone())
116 }
117
118 pub fn register_branch_location(&self, branch: &str, node_ids: Vec<String>) {
120 self.branch_locations.insert(branch.to_string(), node_ids);
121 }
122
123 pub fn get_branch_locations(&self, branch: &str) -> Vec<String> {
125 self.branch_locations
126 .get(branch)
127 .map(|r| r.clone())
128 .unwrap_or_default()
129 }
130
131 pub fn all_tables(&self) -> Vec<TableSchema> {
133 self.tables.iter().map(|r| r.clone()).collect()
134 }
135
136 pub fn list_tables(&self) -> Vec<TableSchema> {
138 self.all_tables()
139 }
140
141 pub fn table_count(&self) -> usize {
143 self.tables.len()
144 }
145
146 pub fn remove_table(&self, name: &str) {
148 self.tables.remove(name);
149 }
150
151 pub fn tables_by_workload(&self, workload: WorkloadType) -> Vec<TableSchema> {
153 self.tables
154 .iter()
155 .filter(|r| r.workload == workload)
156 .map(|r| r.clone())
157 .collect()
158 }
159
160 pub fn tables_by_temperature(&self, temperature: DataTemperature) -> Vec<TableSchema> {
162 self.tables
163 .iter()
164 .filter(|r| r.temperature == temperature)
165 .map(|r| r.clone())
166 .collect()
167 }
168
169 pub fn is_columnar_column(&self, table: &str, column: &str) -> bool {
171 self.tables
172 .get(table)
173 .map(|t| {
174 t.columns
175 .iter()
176 .any(|c| c.name == column && c.storage_type == StorageType::Columnar)
177 })
178 .unwrap_or(false)
179 }
180
181 pub fn is_content_addressed(&self, table: &str, column: &str) -> bool {
183 self.tables
184 .get(table)
185 .map(|t| {
186 t.columns
187 .iter()
188 .any(|c| c.name == column && c.storage_type == StorageType::ContentAddressed)
189 })
190 .unwrap_or(false)
191 }
192}
193
194impl Default for SchemaRegistry {
195 fn default() -> Self {
196 Self::new()
197 }
198}
199
200#[derive(Debug, Clone)]
202pub struct TableSchema {
203 pub name: String,
205 pub columns: Vec<ColumnSchema>,
207 pub access_pattern: AccessPattern,
209 pub temperature: DataTemperature,
211 pub workload: WorkloadType,
213 pub primary_key: Vec<String>,
215 pub shard_key: Option<String>,
217 pub partition_key: Option<PartitionKey>,
219 pub preferred_nodes: Vec<String>,
221 pub estimated_rows: u64,
223 pub avg_row_size: usize,
225}
226
227impl TableSchema {
228 pub fn new(name: impl Into<String>) -> Self {
230 Self {
231 name: name.into(),
232 columns: Vec::new(),
233 access_pattern: AccessPattern::Mixed,
234 temperature: DataTemperature::Warm,
235 workload: WorkloadType::Mixed,
236 primary_key: Vec::new(),
237 shard_key: None,
238 partition_key: None,
239 preferred_nodes: Vec::new(),
240 estimated_rows: 0,
241 avg_row_size: 0,
242 }
243 }
244
245 pub fn with_column(mut self, column: ColumnSchema) -> Self {
247 self.columns.push(column);
248 self
249 }
250
251 pub fn with_access_pattern(mut self, pattern: AccessPattern) -> Self {
253 self.access_pattern = pattern;
254 self
255 }
256
257 pub fn with_temperature(mut self, temp: DataTemperature) -> Self {
259 self.temperature = temp;
260 self
261 }
262
263 pub fn with_workload(mut self, workload: WorkloadType) -> Self {
265 self.workload = workload;
266 self
267 }
268
269 pub fn with_primary_key(mut self, columns: Vec<String>) -> Self {
271 self.primary_key = columns;
272 self
273 }
274
275 pub fn with_shard_key(mut self, key: impl Into<String>) -> Self {
277 self.shard_key = Some(key.into());
278 self
279 }
280
281 pub fn with_preferred_node(mut self, node: impl Into<String>) -> Self {
283 self.preferred_nodes.push(node.into());
284 self
285 }
286
287 pub fn with_estimated_rows(mut self, rows: u64) -> Self {
289 self.estimated_rows = rows;
290 self
291 }
292}
293
/// Definition of a single table column.
#[derive(Debug, Clone)]
pub struct ColumnSchema {
    /// Column name.
    pub name: String,
    /// Declared data type, kept as free-form text.
    pub data_type: String,
    /// Whether NULL values are permitted.
    pub nullable: bool,
    /// Physical storage layout of the column.
    pub storage_type: StorageType,
    /// Whether the column is part of the primary key.
    pub is_primary_key: bool,
    /// Whether the column is marked as indexed.
    pub is_indexed: bool,
}

impl ColumnSchema {
    /// Creates a nullable, row-stored, non-key, non-indexed column.
    pub fn new(name: impl Into<String>, data_type: impl Into<String>) -> Self {
        let (name, data_type) = (name.into(), data_type.into());
        Self {
            name,
            data_type,
            nullable: true,
            storage_type: StorageType::Row,
            is_primary_key: false,
            is_indexed: false,
        }
    }

    /// Sets whether the column accepts NULL.
    pub fn nullable(self, nullable: bool) -> Self {
        Self { nullable, ..self }
    }

    /// Sets the storage layout.
    pub fn with_storage(self, storage: StorageType) -> Self {
        Self {
            storage_type: storage,
            ..self
        }
    }

    /// Marks the column as a primary-key column, which also makes it non-nullable.
    pub fn as_primary_key(self) -> Self {
        Self {
            is_primary_key: true,
            nullable: false,
            ..self
        }
    }

    /// Marks the column as indexed.
    pub fn indexed(self) -> Self {
        Self {
            is_indexed: true,
            ..self
        }
    }
}

/// Physical storage layout of a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum StorageType {
    /// Row-oriented storage; the default.
    #[default]
    Row,
    /// Column-oriented storage.
    Columnar,
    /// Content-addressed storage.
    ContentAddressed,
    /// Vector storage.
    Vector,
}
363
/// Definition of an index over one or more columns of a table.
#[derive(Debug, Clone)]
pub struct IndexSchema {
    /// Index name; the registry stores the schema under this key.
    pub name: String,
    /// Name of the indexed table.
    pub table: String,
    /// Indexed column names, in the order they were added.
    pub columns: Vec<String>,
    /// Index implementation.
    pub index_type: IndexType,
    /// Whether the index enforces uniqueness.
    pub is_unique: bool,
}

impl IndexSchema {
    /// Creates a non-unique `BTree` index on `table` with no columns yet.
    pub fn new(name: impl Into<String>, table: impl Into<String>) -> Self {
        let name = name.into();
        let table = table.into();
        Self {
            name,
            table,
            columns: Vec::new(),
            index_type: IndexType::BTree,
            is_unique: false,
        }
    }

    /// Appends one column to the index key.
    pub fn with_column(self, column: impl Into<String>) -> Self {
        let mut index = self;
        index.columns.push(column.into());
        index
    }

    /// Sets the index implementation.
    pub fn with_type(self, index_type: IndexType) -> Self {
        Self { index_type, ..self }
    }

    /// Marks the index as unique.
    pub fn unique(self) -> Self {
        Self {
            is_unique: true,
            ..self
        }
    }
}

/// Supported index implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum IndexType {
    /// B-tree index; the default.
    #[default]
    BTree,
    /// Hash index.
    Hash,
    /// GiST index.
    GiST,
    /// GIN index.
    GIN,
    /// Vector index; looked up by `SchemaRegistry::get_vector_index`.
    Vector,
}
425
/// Declared way a table is read or written.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum AccessPattern {
    /// Single-row lookups.
    PointLookup,
    /// Range scans.
    RangeScan,
    /// Full-table scans.
    FullScan,
    /// Vector similarity search.
    VectorSearch,
    /// Time-series appends.
    TimeSeriesAppend,
    /// No single pattern; the default.
    #[default]
    Mixed,
}

impl AccessPattern {
    /// Parses an access-pattern name, ignoring case.
    ///
    /// Both snake_case and concatenated spellings are accepted (`"range_scan"`,
    /// `"rangescan"`), plus the aliases `"vector"` and `"append"`. Any other input,
    /// including input with surrounding whitespace, yields `None`.
    // This returns `Option`, whereas `std::str::FromStr::from_str` must return a
    // `Result`, so clippy's `should_implement_trait` lint is silenced.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        let lowered = s.to_lowercase();
        let pattern = match lowered.as_str() {
            "point_lookup" | "pointlookup" => Self::PointLookup,
            "range_scan" | "rangescan" => Self::RangeScan,
            "full_scan" | "fullscan" => Self::FullScan,
            "vector_search" | "vectorsearch" | "vector" => Self::VectorSearch,
            "time_series" | "timeseries" | "append" => Self::TimeSeriesAppend,
            "mixed" => Self::Mixed,
            _ => return None,
        };
        Some(pattern)
    }
}
459
/// Temperature classification of a table's data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum DataTemperature {
    /// Hot data.
    Hot,
    /// Warm data; the default.
    #[default]
    Warm,
    /// Cold data.
    Cold,
    /// Frozen data; also parsed from `"archive"`.
    Frozen,
}

impl DataTemperature {
    /// Parses a temperature name, ignoring case.
    ///
    /// Accepts `"hot"`, `"warm"`, `"cold"`, `"frozen"`, and the alias `"archive"`
    /// for `Frozen`. Anything else yields `None`.
    // This returns `Option`, whereas `std::str::FromStr::from_str` must return a
    // `Result`, so clippy's `should_implement_trait` lint is silenced.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        let lowered = s.to_lowercase();
        let temperature = match lowered.as_str() {
            "hot" => Self::Hot,
            "warm" => Self::Warm,
            "cold" => Self::Cold,
            "frozen" | "archive" => Self::Frozen,
            _ => return None,
        };
        Some(temperature)
    }
}
487
/// Workload classification of a table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum WorkloadType {
    /// Online transaction processing.
    OLTP,
    /// Online analytical processing.
    OLAP,
    /// Hybrid transactional/analytical processing.
    HTAP,
    /// Vector workloads; also parsed from `"ai"`.
    Vector,
    /// No single workload; the default.
    #[default]
    Mixed,
}

impl WorkloadType {
    /// Parses a workload name, ignoring case.
    ///
    /// Accepts `"oltp"`, `"olap"`, `"htap"`, `"vector"` (alias `"ai"`) and
    /// `"mixed"`. Anything else yields `None`.
    // This returns `Option`, whereas `std::str::FromStr::from_str` must return a
    // `Result`, so clippy's `should_implement_trait` lint is silenced.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        let lowered = s.to_lowercase();
        let workload = match lowered.as_str() {
            "oltp" => Self::OLTP,
            "olap" => Self::OLAP,
            "htap" => Self::HTAP,
            "vector" | "ai" => Self::Vector,
            "mixed" => Self::Mixed,
            _ => return None,
        };
        Some(workload)
    }
}
518
/// Partitioning scheme for a table: which column drives it and how.
#[derive(Debug, Clone)]
pub struct PartitionKey {
    /// Column whose values select the partition.
    pub column: String,
    /// Strategy used to map column values to partitions.
    pub partition_type: PartitionType,
}

/// Strategy for mapping partition-key values to partitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionType {
    /// Range partitioning.
    Range,
    /// List partitioning.
    List,
    /// Hash partitioning.
    Hash,
}
538
/// Declared link between a column of one table and a column of another.
#[derive(Debug, Clone)]
pub struct Relationship {
    /// Source table name.
    pub from_table: String,
    /// Column in the source table.
    pub from_column: String,
    /// Target table name.
    pub to_table: String,
    /// Column in the target table.
    pub to_column: String,
    /// Cardinality of the link.
    pub relationship_type: RelationshipType,
}

/// Cardinality of a [`Relationship`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelationshipType {
    /// One-to-one.
    OneToOne,
    /// One-to-many.
    OneToMany,
    /// Many-to-one.
    ManyToOne,
    /// Many-to-many.
    ManyToMany,
}
566
/// Hash-based shard routing configuration.
///
/// The derived `Default` is disabled with zero shards.
#[derive(Debug, Clone, Default)]
pub struct ShardingConfig {
    /// Whether routing is active; `get_shard` returns `None` when this is `false`.
    pub enabled: bool,
    /// Number of shards that values are spread across.
    pub shard_count: u32,
    /// Shard ids `0..shard_count`, filled in by [`ShardingConfig::new`].
    pub hash_ring: Vec<u32>,
    /// Shard-key column registered for each table name.
    pub table_shard_keys: HashMap<String, String>,
}

impl ShardingConfig {
    /// 32-bit FNV-1a offset basis.
    const FNV_OFFSET_BASIS: u32 = 2_166_136_261;
    /// 32-bit FNV-1a prime.
    const FNV_PRIME: u32 = 16_777_619;

    /// Creates an enabled configuration with `shard_count` shards and a ring
    /// listing every shard id in ascending order.
    pub fn new(shard_count: u32) -> Self {
        let mut config = Self {
            enabled: true,
            shard_count,
            ..Self::default()
        };
        config.initialize_hash_ring();
        config
    }

    /// Sets `hash_ring` to the ids `0..shard_count`.
    fn initialize_hash_ring(&mut self) {
        self.hash_ring.clear();
        self.hash_ring.extend(0..self.shard_count);
    }

    /// Maps `value` to a shard id in `0..shard_count`.
    ///
    /// `_key` is not consulted: routing depends only on `value`. Returns `None`
    /// when sharding is disabled or there are no shards.
    pub fn get_shard(&self, _key: &str, value: &str) -> Option<u32> {
        let routable = self.enabled && self.shard_count != 0;
        routable.then(|| self.hash_value(value) % self.shard_count)
    }

    /// 32-bit FNV-1a hash over the UTF-8 bytes of `value`.
    fn hash_value(&self, value: &str) -> u32 {
        value.bytes().fold(Self::FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u32::from(byte)).wrapping_mul(Self::FNV_PRIME)
        })
    }

    /// Records `shard_key` as the shard-key column of `table`, replacing any previous entry.
    pub fn register_table_shard_key(&mut self, table: &str, shard_key: &str) {
        self.table_shard_keys
            .insert(table.to_owned(), shard_key.to_owned());
    }
}
626
/// Feature flags and limits registered for a node.
///
/// The derived `Default` has every flag `false` and both limits `0`.
#[derive(Debug, Clone, Default)]
pub struct NodeCapabilities {
    /// Node supports vector search.
    pub vector_search: bool,
    /// Node has GPU acceleration.
    pub gpu_acceleration: bool,
    /// Node supports columnar storage.
    pub columnar_storage: bool,
    /// Node keeps data in memory.
    pub in_memory: bool,
    /// Node supports content-addressed storage.
    pub content_addressed: bool,
    /// Maximum concurrent queries; not consulted by [`NodeCapabilities::satisfies`].
    pub max_concurrent_queries: u32,
    /// Memory limit (units not established here); not consulted by `satisfies`.
    pub memory_limit: u64,
}

impl NodeCapabilities {
    /// Preset with vector search and GPU acceleration enabled; everything else default.
    pub fn vector_node() -> Self {
        Self {
            gpu_acceleration: true,
            vector_search: true,
            ..Self::default()
        }
    }

    /// Preset with columnar storage enabled; everything else default.
    pub fn analytics_node() -> Self {
        Self {
            columnar_storage: true,
            ..Self::default()
        }
    }

    /// Preset with in-memory enabled; everything else default.
    pub fn hot_node() -> Self {
        Self {
            in_memory: true,
            ..Self::default()
        }
    }

    /// Returns `true` if every boolean capability set in `required` is also set on `self`.
    ///
    /// Flags that `required` leaves `false` are ignored, as are
    /// `max_concurrent_queries` and `memory_limit`.
    pub fn satisfies(&self, required: &NodeCapabilities) -> bool {
        let pairs = [
            (required.vector_search, self.vector_search),
            (required.gpu_acceleration, self.gpu_acceleration),
            (required.columnar_storage, self.columnar_storage),
            (required.in_memory, self.in_memory),
            (required.content_addressed, self.content_addressed),
        ];
        pairs.iter().all(|&(needed, offered)| offered || !needed)
    }
}
682
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_schema_registry() {
        let registry = SchemaRegistry::new();

        let users = TableSchema::new("users")
            .with_temperature(DataTemperature::Hot)
            .with_workload(WorkloadType::OLTP)
            .with_access_pattern(AccessPattern::PointLookup)
            .with_column(ColumnSchema::new("id", "integer").as_primary_key())
            .with_column(ColumnSchema::new("name", "varchar"));

        registry.register_table(users);

        let result = registry.get_table("users");
        assert!(result.is_some());
        let table = result.expect("should exist");
        assert_eq!(table.name, "users");
        assert_eq!(table.temperature, DataTemperature::Hot);
    }

    #[test]
    fn test_update_classification() {
        let registry = SchemaRegistry::new();

        registry.register_table(
            TableSchema::new("events")
                .with_temperature(DataTemperature::Warm)
                .with_workload(WorkloadType::Mixed),
        );

        registry.update_classification("events", DataTemperature::Cold, WorkloadType::OLAP);

        let table = registry.get_table("events").expect("should exist");
        assert_eq!(table.temperature, DataTemperature::Cold);
        assert_eq!(table.workload, WorkloadType::OLAP);
    }

    /// Classifying a table that was never registered must be a silent no-op.
    #[test]
    fn test_update_classification_missing_table() {
        let registry = SchemaRegistry::new();

        registry.update_classification("ghost", DataTemperature::Hot, WorkloadType::OLTP);

        assert!(registry.get_table("ghost").is_none());
        assert_eq!(registry.table_count(), 0);
    }

    #[test]
    fn test_sharding_config() {
        let mut config = ShardingConfig::new(4);
        config.register_table_shard_key("orders", "customer_id");

        let shard1 = config.get_shard("customer_id", "cust_123");
        let shard2 = config.get_shard("customer_id", "cust_456");

        assert!(shard1.is_some());
        assert!(shard2.is_some());
        // Assigned shards must be valid ids for a 4-shard configuration.
        assert!(matches!(shard1, Some(s) if s < 4));
        assert!(matches!(shard2, Some(s) if s < 4));
        // Routing must be deterministic for a given value.
        assert_eq!(config.get_shard("customer_id", "cust_123"), shard1);
        assert_eq!(
            config.table_shard_keys.get("orders").map(String::as_str),
            Some("customer_id")
        );
    }

    /// A disabled or zero-shard configuration never assigns a shard.
    #[test]
    fn test_sharding_disabled_or_empty() {
        assert_eq!(
            ShardingConfig::default().get_shard("customer_id", "cust_123"),
            None
        );
        assert_eq!(
            ShardingConfig::new(0).get_shard("customer_id", "cust_123"),
            None
        );

        let mut config = ShardingConfig::new(4);
        config.enabled = false;
        assert_eq!(config.get_shard("customer_id", "cust_123"), None);
    }

    #[test]
    fn test_registry_sharding_roundtrip() {
        let registry = SchemaRegistry::new();
        // A fresh registry uses the default (disabled) sharding configuration.
        assert_eq!(registry.get_shard("customer_id", "cust_123"), None);

        registry.set_sharding(ShardingConfig::new(4));
        assert!(matches!(
            registry.get_shard("customer_id", "cust_123"),
            Some(s) if s < 4
        ));
    }

    #[test]
    fn test_node_capabilities() {
        let required = NodeCapabilities {
            vector_search: true,
            gpu_acceleration: false,
            ..Default::default()
        };

        let vector_node = NodeCapabilities::vector_node();
        let analytics_node = NodeCapabilities::analytics_node();

        assert!(vector_node.satisfies(&required));
        assert!(!analytics_node.satisfies(&required));
    }

    #[test]
    fn test_access_pattern_from_str() {
        assert_eq!(
            AccessPattern::from_str("point_lookup"),
            Some(AccessPattern::PointLookup)
        );
        assert_eq!(
            AccessPattern::from_str("vector"),
            Some(AccessPattern::VectorSearch)
        );
        assert_eq!(AccessPattern::from_str("invalid"), None);
    }

    #[test]
    fn test_data_temperature_from_str() {
        assert_eq!(DataTemperature::from_str("hot"), Some(DataTemperature::Hot));
        assert_eq!(
            DataTemperature::from_str("cold"),
            Some(DataTemperature::Cold)
        );
        assert_eq!(
            DataTemperature::from_str("archive"),
            Some(DataTemperature::Frozen)
        );
    }

    #[test]
    fn test_workload_type_from_str() {
        assert_eq!(WorkloadType::from_str("oltp"), Some(WorkloadType::OLTP));
        assert_eq!(WorkloadType::from_str("vector"), Some(WorkloadType::Vector));
        assert_eq!(WorkloadType::from_str("ai"), Some(WorkloadType::Vector));
    }

    #[test]
    fn test_index_schema() {
        let index = IndexSchema::new("idx_users_email", "users")
            .with_column("email")
            .with_type(IndexType::BTree)
            .unique();

        assert_eq!(index.name, "idx_users_email");
        assert!(index.is_unique);
        assert_eq!(index.columns, vec!["email"]);
    }

    #[test]
    fn test_get_vector_index() {
        let registry = SchemaRegistry::new();

        registry.register_index(IndexSchema::new("idx_docs_id", "docs").with_column("id"));
        registry.register_index(
            IndexSchema::new("idx_docs_embedding", "docs")
                .with_column("embedding")
                .with_type(IndexType::Vector),
        );

        let found = registry
            .get_vector_index("docs")
            .expect("vector index registered");
        assert_eq!(found.name, "idx_docs_embedding");
        // Non-vector indexes and unknown tables must not match.
        assert!(registry.get_vector_index("other").is_none());
    }

    #[test]
    fn test_tables_by_workload() {
        let registry = SchemaRegistry::new();

        registry.register_table(TableSchema::new("users").with_workload(WorkloadType::OLTP));
        registry.register_table(TableSchema::new("events").with_workload(WorkloadType::OLAP));
        registry.register_table(TableSchema::new("orders").with_workload(WorkloadType::OLTP));

        let oltp_tables = registry.tables_by_workload(WorkloadType::OLTP);
        assert_eq!(oltp_tables.len(), 2);
    }

    #[test]
    fn test_remove_table() {
        let registry = SchemaRegistry::new();

        registry.register_table(TableSchema::new("users"));
        assert_eq!(registry.table_count(), 1);

        registry.remove_table("users");
        assert!(registry.get_table("users").is_none());
        assert_eq!(registry.table_count(), 0);

        // Removing an unknown table is harmless.
        registry.remove_table("users");
        assert_eq!(registry.table_count(), 0);
    }

    #[test]
    fn test_storage_lookups_default_to_false() {
        let registry = SchemaRegistry::new();

        registry.register_table(
            TableSchema::new("users").with_column(ColumnSchema::new("id", "integer")),
        );

        // Columns are row-stored unless configured otherwise.
        assert!(!registry.is_columnar_column("users", "id"));
        assert!(!registry.is_content_addressed("users", "id"));
        // Unknown tables and columns report "no match" rather than panicking.
        assert!(!registry.is_columnar_column("missing", "id"));
        assert!(!registry.is_content_addressed("users", "missing"));
    }

    #[test]
    fn test_branch_locations() {
        let registry = SchemaRegistry::new();

        assert!(registry.get_branch_locations("main").is_empty());

        registry.register_branch_location("main", vec!["node-1".to_string()]);
        assert_eq!(
            registry.get_branch_locations("main"),
            vec!["node-1".to_string()]
        );
    }
}