1use std::collections::HashMap;
6use dashmap::DashMap;
7use parking_lot::RwLock;
8
/// Registry of schema metadata: tables, indexes, table relationships, the
/// sharding configuration, per-node capabilities and branch locations.
///
/// Every field is wrapped in a `DashMap` or an `RwLock`, and all registry
/// methods take `&self`.
#[derive(Debug)]
pub struct SchemaRegistry {
    /// Table schemas, keyed by table name.
    tables: DashMap<String, TableSchema>,
    /// Index schemas, keyed by index name.
    indexes: DashMap<String, IndexSchema>,
    /// Relationships between tables, in the order they were added.
    relationships: RwLock<Vec<Relationship>>,
    /// Sharding configuration currently in effect.
    sharding: RwLock<ShardingConfig>,
    /// Capabilities, keyed by node id.
    node_capabilities: DashMap<String, NodeCapabilities>,
    /// Node ids, keyed by branch name.
    branch_locations: DashMap<String, Vec<String>>,
}
25
26impl SchemaRegistry {
27 pub fn new() -> Self {
29 Self {
30 tables: DashMap::new(),
31 indexes: DashMap::new(),
32 relationships: RwLock::new(Vec::new()),
33 sharding: RwLock::new(ShardingConfig::default()),
34 node_capabilities: DashMap::new(),
35 branch_locations: DashMap::new(),
36 }
37 }
38
39 pub fn register_table(&self, schema: TableSchema) {
41 self.tables.insert(schema.name.clone(), schema);
42 }
43
44 pub fn get_table(&self, name: &str) -> Option<TableSchema> {
46 self.tables.get(name).map(|r| r.clone())
47 }
48
49 pub fn update_classification(
51 &self,
52 table: &str,
53 temperature: DataTemperature,
54 workload: WorkloadType,
55 ) {
56 if let Some(mut entry) = self.tables.get_mut(table) {
57 entry.temperature = temperature;
58 entry.workload = workload;
59 }
60 }
61
62 pub fn register_index(&self, schema: IndexSchema) {
64 self.indexes.insert(schema.name.clone(), schema);
65 }
66
67 pub fn get_index(&self, name: &str) -> Option<IndexSchema> {
69 self.indexes.get(name).map(|r| r.clone())
70 }
71
72 pub fn get_vector_index(&self, table: &str) -> Option<IndexSchema> {
74 self.indexes.iter()
75 .find(|entry| entry.table == table && entry.index_type == IndexType::Vector)
76 .map(|entry| entry.clone())
77 }
78
79 pub fn add_relationship(&self, relationship: Relationship) {
81 let mut rels = self.relationships.write();
82 rels.push(relationship);
83 }
84
85 pub fn get_relationships(&self, table: &str) -> Vec<Relationship> {
87 let rels = self.relationships.read();
88 rels.iter()
89 .filter(|r| r.from_table == table || r.to_table == table)
90 .cloned()
91 .collect()
92 }
93
94 pub fn set_sharding(&self, config: ShardingConfig) {
96 let mut sharding = self.sharding.write();
97 *sharding = config;
98 }
99
100 pub fn get_shard(&self, key: &str, value: &str) -> Option<u32> {
102 let sharding = self.sharding.read();
103 sharding.get_shard(key, value)
104 }
105
106 pub fn register_node_capabilities(&self, node_id: &str, capabilities: NodeCapabilities) {
108 self.node_capabilities.insert(node_id.to_string(), capabilities);
109 }
110
111 pub fn get_node_capabilities(&self, node_id: &str) -> Option<NodeCapabilities> {
113 self.node_capabilities.get(node_id).map(|r| r.clone())
114 }
115
116 pub fn register_branch_location(&self, branch: &str, node_ids: Vec<String>) {
118 self.branch_locations.insert(branch.to_string(), node_ids);
119 }
120
121 pub fn get_branch_locations(&self, branch: &str) -> Vec<String> {
123 self.branch_locations
124 .get(branch)
125 .map(|r| r.clone())
126 .unwrap_or_default()
127 }
128
129 pub fn all_tables(&self) -> Vec<TableSchema> {
131 self.tables.iter().map(|r| r.clone()).collect()
132 }
133
134 pub fn list_tables(&self) -> Vec<TableSchema> {
136 self.all_tables()
137 }
138
139 pub fn table_count(&self) -> usize {
141 self.tables.len()
142 }
143
144 pub fn remove_table(&self, name: &str) {
146 self.tables.remove(name);
147 }
148
149 pub fn tables_by_workload(&self, workload: WorkloadType) -> Vec<TableSchema> {
151 self.tables
152 .iter()
153 .filter(|r| r.workload == workload)
154 .map(|r| r.clone())
155 .collect()
156 }
157
158 pub fn tables_by_temperature(&self, temperature: DataTemperature) -> Vec<TableSchema> {
160 self.tables
161 .iter()
162 .filter(|r| r.temperature == temperature)
163 .map(|r| r.clone())
164 .collect()
165 }
166
167 pub fn is_columnar_column(&self, table: &str, column: &str) -> bool {
169 self.tables
170 .get(table)
171 .map(|t| {
172 t.columns
173 .iter()
174 .any(|c| c.name == column && c.storage_type == StorageType::Columnar)
175 })
176 .unwrap_or(false)
177 }
178
179 pub fn is_content_addressed(&self, table: &str, column: &str) -> bool {
181 self.tables
182 .get(table)
183 .map(|t| {
184 t.columns
185 .iter()
186 .any(|c| c.name == column && c.storage_type == StorageType::ContentAddressed)
187 })
188 .unwrap_or(false)
189 }
190}
191
192impl Default for SchemaRegistry {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198#[derive(Debug, Clone)]
200pub struct TableSchema {
201 pub name: String,
203 pub columns: Vec<ColumnSchema>,
205 pub access_pattern: AccessPattern,
207 pub temperature: DataTemperature,
209 pub workload: WorkloadType,
211 pub primary_key: Vec<String>,
213 pub shard_key: Option<String>,
215 pub partition_key: Option<PartitionKey>,
217 pub preferred_nodes: Vec<String>,
219 pub estimated_rows: u64,
221 pub avg_row_size: usize,
223}
224
225impl TableSchema {
226 pub fn new(name: impl Into<String>) -> Self {
228 Self {
229 name: name.into(),
230 columns: Vec::new(),
231 access_pattern: AccessPattern::Mixed,
232 temperature: DataTemperature::Warm,
233 workload: WorkloadType::Mixed,
234 primary_key: Vec::new(),
235 shard_key: None,
236 partition_key: None,
237 preferred_nodes: Vec::new(),
238 estimated_rows: 0,
239 avg_row_size: 0,
240 }
241 }
242
243 pub fn with_column(mut self, column: ColumnSchema) -> Self {
245 self.columns.push(column);
246 self
247 }
248
249 pub fn with_access_pattern(mut self, pattern: AccessPattern) -> Self {
251 self.access_pattern = pattern;
252 self
253 }
254
255 pub fn with_temperature(mut self, temp: DataTemperature) -> Self {
257 self.temperature = temp;
258 self
259 }
260
261 pub fn with_workload(mut self, workload: WorkloadType) -> Self {
263 self.workload = workload;
264 self
265 }
266
267 pub fn with_primary_key(mut self, columns: Vec<String>) -> Self {
269 self.primary_key = columns;
270 self
271 }
272
273 pub fn with_shard_key(mut self, key: impl Into<String>) -> Self {
275 self.shard_key = Some(key.into());
276 self
277 }
278
279 pub fn with_preferred_node(mut self, node: impl Into<String>) -> Self {
281 self.preferred_nodes.push(node.into());
282 self
283 }
284
285 pub fn with_estimated_rows(mut self, rows: u64) -> Self {
287 self.estimated_rows = rows;
288 self
289 }
290}
291
292#[derive(Debug, Clone)]
294pub struct ColumnSchema {
295 pub name: String,
297 pub data_type: String,
299 pub nullable: bool,
301 pub storage_type: StorageType,
303 pub is_primary_key: bool,
305 pub is_indexed: bool,
307}
308
309impl ColumnSchema {
310 pub fn new(name: impl Into<String>, data_type: impl Into<String>) -> Self {
312 Self {
313 name: name.into(),
314 data_type: data_type.into(),
315 nullable: true,
316 storage_type: StorageType::Row,
317 is_primary_key: false,
318 is_indexed: false,
319 }
320 }
321
322 pub fn nullable(mut self, nullable: bool) -> Self {
324 self.nullable = nullable;
325 self
326 }
327
328 pub fn with_storage(mut self, storage: StorageType) -> Self {
330 self.storage_type = storage;
331 self
332 }
333
334 pub fn as_primary_key(mut self) -> Self {
336 self.is_primary_key = true;
337 self.nullable = false;
338 self
339 }
340
341 pub fn indexed(mut self) -> Self {
343 self.is_indexed = true;
344 self
345 }
346}
347
/// Storage type assigned to a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StorageType {
    /// Row storage; this is the default.
    Row,
    /// Columnar storage.
    Columnar,
    /// Content-addressed storage.
    ContentAddressed,
    /// Vector storage.
    Vector,
}

/// `StorageType::Row` is the default storage type.
impl Default for StorageType {
    fn default() -> Self {
        Self::Row
    }
}
366
367#[derive(Debug, Clone)]
369pub struct IndexSchema {
370 pub name: String,
372 pub table: String,
374 pub columns: Vec<String>,
376 pub index_type: IndexType,
378 pub is_unique: bool,
380}
381
382impl IndexSchema {
383 pub fn new(name: impl Into<String>, table: impl Into<String>) -> Self {
385 Self {
386 name: name.into(),
387 table: table.into(),
388 columns: Vec::new(),
389 index_type: IndexType::BTree,
390 is_unique: false,
391 }
392 }
393
394 pub fn with_column(mut self, column: impl Into<String>) -> Self {
396 self.columns.push(column.into());
397 self
398 }
399
400 pub fn with_type(mut self, index_type: IndexType) -> Self {
402 self.index_type = index_type;
403 self
404 }
405
406 pub fn unique(mut self) -> Self {
408 self.is_unique = true;
409 self
410 }
411}
412
/// Kind of index described by an index schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IndexType {
    /// B-tree index; this is the default.
    BTree,
    /// Hash index.
    Hash,
    /// GiST index.
    GiST,
    /// GIN index.
    GIN,
    /// Vector index.
    Vector,
}

/// `IndexType::BTree` is the default index type.
impl Default for IndexType {
    fn default() -> Self {
        Self::BTree
    }
}
433
/// Access pattern recorded for a table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AccessPattern {
    /// Parsed from "point_lookup" / "pointlookup".
    PointLookup,
    /// Parsed from "range_scan" / "rangescan".
    RangeScan,
    /// Parsed from "full_scan" / "fullscan".
    FullScan,
    /// Parsed from "vector_search" / "vectorsearch" / "vector".
    VectorSearch,
    /// Parsed from "time_series" / "timeseries" / "append".
    TimeSeriesAppend,
    /// Parsed from "mixed"; this is the default.
    Mixed,
}

/// `AccessPattern::Mixed` is the default access pattern.
impl Default for AccessPattern {
    fn default() -> Self {
        Self::Mixed
    }
}

impl AccessPattern {
    /// Parses an access pattern from its textual name.
    ///
    /// The input is lowercased before comparison, so matching is
    /// case-insensitive. It is not trimmed. Returns `None` for any string
    /// that is not one of the accepted aliases.
    pub fn from_str(s: &str) -> Option<Self> {
        // Every accepted (already lowercase) alias and the variant it maps to.
        const ALIASES: &[(&str, AccessPattern)] = &[
            ("point_lookup", AccessPattern::PointLookup),
            ("pointlookup", AccessPattern::PointLookup),
            ("range_scan", AccessPattern::RangeScan),
            ("rangescan", AccessPattern::RangeScan),
            ("full_scan", AccessPattern::FullScan),
            ("fullscan", AccessPattern::FullScan),
            ("vector_search", AccessPattern::VectorSearch),
            ("vectorsearch", AccessPattern::VectorSearch),
            ("vector", AccessPattern::VectorSearch),
            ("time_series", AccessPattern::TimeSeriesAppend),
            ("timeseries", AccessPattern::TimeSeriesAppend),
            ("append", AccessPattern::TimeSeriesAppend),
            ("mixed", AccessPattern::Mixed),
        ];

        let needle = s.to_lowercase();
        ALIASES
            .iter()
            .find(|(alias, _)| *alias == needle.as_str())
            .map(|&(_, pattern)| pattern)
    }
}
471
/// Temperature classification recorded for a table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataTemperature {
    /// Parsed from "hot".
    Hot,
    /// Parsed from "warm"; this is the default.
    Warm,
    /// Parsed from "cold".
    Cold,
    /// Parsed from "frozen" or "archive".
    Frozen,
}

/// `DataTemperature::Warm` is the default temperature.
impl Default for DataTemperature {
    fn default() -> Self {
        Self::Warm
    }
}

impl DataTemperature {
    /// Parses a temperature from its textual name.
    ///
    /// The input is lowercased before comparison, so matching is
    /// case-insensitive. It is not trimmed. Returns `None` for unrecognised
    /// input.
    pub fn from_str(s: &str) -> Option<Self> {
        let lowered = s.to_lowercase();
        let temperature = match &*lowered {
            "hot" => Self::Hot,
            "warm" => Self::Warm,
            "cold" => Self::Cold,
            "frozen" | "archive" => Self::Frozen,
            _ => return None,
        };
        Some(temperature)
    }
}
503
/// Workload classification recorded for a table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkloadType {
    /// Parsed from "oltp".
    OLTP,
    /// Parsed from "olap".
    OLAP,
    /// Parsed from "htap".
    HTAP,
    /// Parsed from "vector" or "ai".
    Vector,
    /// Parsed from "mixed"; this is the default.
    Mixed,
}

/// `WorkloadType::Mixed` is the default workload.
impl Default for WorkloadType {
    fn default() -> Self {
        Self::Mixed
    }
}

impl WorkloadType {
    /// Parses a workload type from its textual name.
    ///
    /// The input is lowercased before comparison, so matching is
    /// case-insensitive. It is not trimmed. Returns `None` for unrecognised
    /// input.
    pub fn from_str(s: &str) -> Option<Self> {
        let lowered = s.to_lowercase();
        let name: &str = lowered.as_ref();
        if name == "oltp" {
            Some(Self::OLTP)
        } else if name == "olap" {
            Some(Self::OLAP)
        } else if name == "htap" {
            Some(Self::HTAP)
        } else if name == "vector" || name == "ai" {
            Some(Self::Vector)
        } else if name == "mixed" {
            Some(Self::Mixed)
        } else {
            None
        }
    }
}
538
/// Partitioning specification for a table: which column to partition on and
/// which scheme to use. Held in `TableSchema::partition_key`.
#[derive(Debug, Clone)]
pub struct PartitionKey {
    /// Column the partition key refers to.
    pub column: String,
    /// Partitioning scheme for that column.
    pub partition_type: PartitionType,
}
547
/// Partitioning scheme selected by a `PartitionKey`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionType {
    /// Range partitioning.
    Range,
    /// List partitioning.
    List,
    /// Hash partitioning.
    Hash,
}
558
/// A relationship between a column of one table and a column of another.
///
/// `SchemaRegistry::get_relationships` returns a relationship for a table
/// when that table is either its `from_table` or its `to_table`.
#[derive(Debug, Clone)]
pub struct Relationship {
    /// Table on the "from" side.
    pub from_table: String,
    /// Column of `from_table` taking part in the relationship.
    pub from_column: String,
    /// Table on the "to" side.
    pub to_table: String,
    /// Column of `to_table` taking part in the relationship.
    pub to_column: String,
    /// Cardinality of the relationship.
    pub relationship_type: RelationshipType,
}
573
/// Cardinality of a `Relationship`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelationshipType {
    /// One-to-one.
    OneToOne,
    /// One-to-many.
    OneToMany,
    /// Many-to-one.
    ManyToOne,
    /// Many-to-many.
    ManyToMany,
}
586
/// Sharding settings and the value-to-shard mapping.
///
/// The derived `Default` yields a disabled configuration with zero shards.
#[derive(Debug, Clone, Default)]
pub struct ShardingConfig {
    /// When `false`, [`ShardingConfig::get_shard`] always returns `None`.
    pub enabled: bool,
    /// Number of shards; shard ids returned by `get_shard` are below this.
    pub shard_count: u32,
    /// Filled with `0..shard_count` by `new`. `get_shard` does not read it.
    pub hash_ring: Vec<u32>,
    /// Table name to shard-key name, filled by `register_table_shard_key`.
    /// `get_shard` does not read it.
    pub table_shard_keys: HashMap<String, String>,
}

impl ShardingConfig {
    /// Creates an enabled configuration with `shard_count` shards, a hash
    /// ring of `0..shard_count` and no per-table shard keys.
    pub fn new(shard_count: u32) -> Self {
        let mut config = ShardingConfig {
            enabled: true,
            shard_count,
            ..ShardingConfig::default()
        };
        config.initialize_hash_ring();
        config
    }

    /// Rebuilds the hash ring as the sequence `0..shard_count`.
    fn initialize_hash_ring(&mut self) {
        let ring: Vec<u32> = (0..self.shard_count).collect();
        self.hash_ring = ring;
    }

    /// Maps `value` to a shard id in `0..shard_count`.
    ///
    /// Returns `None` when sharding is disabled or there are no shards. The
    /// `_key` argument is currently unused: only `value` is hashed.
    pub fn get_shard(&self, _key: &str, value: &str) -> Option<u32> {
        if self.enabled && self.shard_count > 0 {
            Some(self.hash_value(value) % self.shard_count)
        } else {
            None
        }
    }

    /// 32-bit FNV-1a hash over the UTF-8 bytes of `value`.
    fn hash_value(&self, value: &str) -> u32 {
        const FNV_OFFSET_BASIS: u32 = 2166136261;
        const FNV_PRIME: u32 = 16777619;

        value.bytes().fold(FNV_OFFSET_BASIS, |hash, byte| {
            // XOR the byte in first, then multiply (the "1a" ordering).
            (hash ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
        })
    }

    /// Records `shard_key` as the shard key of `table`, replacing any
    /// previous entry for that table.
    pub fn register_table_shard_key(&mut self, table: &str, shard_key: &str) {
        let table_name = String::from(table);
        let key_name = String::from(shard_key);
        self.table_shard_keys.insert(table_name, key_name);
    }
}
645
/// Feature flags and resource limits advertised by a node.
///
/// The derived `Default` has every flag `false` and both limits `0`.
#[derive(Debug, Clone, Default)]
pub struct NodeCapabilities {
    /// Vector search flag.
    pub vector_search: bool,
    /// GPU acceleration flag.
    pub gpu_acceleration: bool,
    /// Columnar storage flag.
    pub columnar_storage: bool,
    /// In-memory flag.
    pub in_memory: bool,
    /// Content-addressed storage flag.
    pub content_addressed: bool,
    /// Maximum concurrent queries. Not checked by `satisfies`.
    pub max_concurrent_queries: u32,
    /// Memory limit (presumably bytes; unit not shown here, confirm with
    /// callers). Not checked by `satisfies`.
    pub memory_limit: u64,
}

impl NodeCapabilities {
    /// Preset with vector search and GPU acceleration enabled; everything
    /// else is left at its default.
    pub fn vector_node() -> Self {
        NodeCapabilities {
            gpu_acceleration: true,
            vector_search: true,
            ..Self::default()
        }
    }

    /// Preset with columnar storage enabled and `in_memory` explicitly off;
    /// everything else is left at its default.
    pub fn analytics_node() -> Self {
        NodeCapabilities {
            in_memory: false,
            columnar_storage: true,
            ..Self::default()
        }
    }

    /// Preset with `in_memory` enabled; everything else is left at its
    /// default.
    pub fn hot_node() -> Self {
        NodeCapabilities {
            in_memory: true,
            ..Self::default()
        }
    }

    /// Returns `true` when every boolean capability set in `required` is also
    /// set in `self`.
    ///
    /// Only the five boolean flags are compared; `max_concurrent_queries` and
    /// `memory_limit` are ignored.
    pub fn satisfies(&self, required: &NodeCapabilities) -> bool {
        // (needed by `required`, available on `self`) for each boolean flag.
        let checks = [
            (required.vector_search, self.vector_search),
            (required.gpu_acceleration, self.gpu_acceleration),
            (required.columnar_storage, self.columnar_storage),
            (required.in_memory, self.in_memory),
            (required.content_addressed, self.content_addressed),
        ];
        checks
            .iter()
            .all(|&(needed, available)| !needed || available)
    }
}
701
#[cfg(test)]
mod tests {
    use super::*;

    /// A registered table can be read back with its classification and
    /// columns intact; unknown names yield `None`.
    #[test]
    fn test_schema_registry() {
        let registry = SchemaRegistry::new();

        let users = TableSchema::new("users")
            .with_temperature(DataTemperature::Hot)
            .with_workload(WorkloadType::OLTP)
            .with_access_pattern(AccessPattern::PointLookup)
            .with_column(ColumnSchema::new("id", "integer").as_primary_key())
            .with_column(ColumnSchema::new("name", "varchar"));

        registry.register_table(users);

        let table = registry.get_table("users").expect("should exist");
        assert_eq!(table.name, "users");
        assert_eq!(table.temperature, DataTemperature::Hot);
        assert_eq!(table.workload, WorkloadType::OLTP);
        assert_eq!(table.access_pattern, AccessPattern::PointLookup);
        assert_eq!(table.columns.len(), 2);
        // `as_primary_key` also clears the nullable flag.
        assert!(table.columns[0].is_primary_key);
        assert!(!table.columns[0].nullable);

        assert!(registry.get_table("missing").is_none());
        assert_eq!(registry.table_count(), 1);
    }

    /// Reclassifying a known table updates it in place; reclassifying an
    /// unknown table is a silent no-op.
    #[test]
    fn test_update_classification() {
        let registry = SchemaRegistry::new();

        registry.register_table(
            TableSchema::new("events")
                .with_temperature(DataTemperature::Warm)
                .with_workload(WorkloadType::Mixed),
        );

        registry.update_classification("events", DataTemperature::Cold, WorkloadType::OLAP);

        let table = registry.get_table("events").expect("should exist");
        assert_eq!(table.temperature, DataTemperature::Cold);
        assert_eq!(table.workload, WorkloadType::OLAP);

        // Unknown table: nothing is inserted or changed.
        registry.update_classification("missing", DataTemperature::Hot, WorkloadType::OLTP);
        assert!(registry.get_table("missing").is_none());
        assert_eq!(registry.table_count(), 1);
    }

    /// Shard ids are in range and deterministic, and a disabled configuration
    /// never yields a shard.
    #[test]
    fn test_sharding_config() {
        let mut config = ShardingConfig::new(4);
        config.register_table_shard_key("orders", "customer_id");
        assert_eq!(
            config.table_shard_keys.get("orders").map(String::as_str),
            Some("customer_id")
        );

        let shard1 = config
            .get_shard("customer_id", "cust_123")
            .expect("sharding is enabled with 4 shards");
        let shard2 = config
            .get_shard("customer_id", "cust_456")
            .expect("sharding is enabled with 4 shards");

        // Every shard id must be a valid index into the configured shards.
        assert!(shard1 < config.shard_count);
        assert!(shard2 < config.shard_count);

        // The same value always maps to the same shard.
        assert_eq!(config.get_shard("customer_id", "cust_123"), Some(shard1));
        assert_eq!(config.get_shard("customer_id", "cust_456"), Some(shard2));

        // Disabled (default) and zero-shard configurations return no shard.
        assert_eq!(ShardingConfig::default().get_shard("customer_id", "cust_123"), None);
        assert_eq!(ShardingConfig::new(0).get_shard("customer_id", "cust_123"), None);
    }

    /// A node satisfies a requirement only if it has every required flag.
    #[test]
    fn test_node_capabilities() {
        let required = NodeCapabilities {
            vector_search: true,
            gpu_acceleration: false,
            ..Default::default()
        };

        let vector_node = NodeCapabilities::vector_node();
        let analytics_node = NodeCapabilities::analytics_node();

        assert!(vector_node.satisfies(&required));
        assert!(!analytics_node.satisfies(&required));
        // An empty requirement is satisfied by any node.
        assert!(analytics_node.satisfies(&NodeCapabilities::default()));
    }

    /// Access patterns parse from their aliases, case-insensitively.
    #[test]
    fn test_access_pattern_from_str() {
        assert_eq!(AccessPattern::from_str("point_lookup"), Some(AccessPattern::PointLookup));
        assert_eq!(AccessPattern::from_str("vector"), Some(AccessPattern::VectorSearch));
        assert_eq!(AccessPattern::from_str("RangeScan"), Some(AccessPattern::RangeScan));
        assert_eq!(AccessPattern::from_str("invalid"), None);
    }

    /// Temperatures parse from their aliases; unknown names are rejected.
    #[test]
    fn test_data_temperature_from_str() {
        assert_eq!(DataTemperature::from_str("hot"), Some(DataTemperature::Hot));
        assert_eq!(DataTemperature::from_str("cold"), Some(DataTemperature::Cold));
        assert_eq!(DataTemperature::from_str("archive"), Some(DataTemperature::Frozen));
        assert_eq!(DataTemperature::from_str("invalid"), None);
    }

    /// Workload types parse from their aliases; unknown names are rejected.
    #[test]
    fn test_workload_type_from_str() {
        assert_eq!(WorkloadType::from_str("oltp"), Some(WorkloadType::OLTP));
        assert_eq!(WorkloadType::from_str("vector"), Some(WorkloadType::Vector));
        assert_eq!(WorkloadType::from_str("ai"), Some(WorkloadType::Vector));
        assert_eq!(WorkloadType::from_str("invalid"), None);
    }

    /// The index builder records name, table, columns, type and uniqueness.
    #[test]
    fn test_index_schema() {
        let index = IndexSchema::new("idx_users_email", "users")
            .with_column("email")
            .with_type(IndexType::BTree)
            .unique();

        assert_eq!(index.name, "idx_users_email");
        assert_eq!(index.table, "users");
        assert_eq!(index.index_type, IndexType::BTree);
        assert!(index.is_unique);
        assert_eq!(index.columns, vec!["email"]);
    }

    /// Filtering by workload returns exactly the matching tables.
    #[test]
    fn test_tables_by_workload() {
        let registry = SchemaRegistry::new();

        registry.register_table(TableSchema::new("users").with_workload(WorkloadType::OLTP));
        registry.register_table(TableSchema::new("events").with_workload(WorkloadType::OLAP));
        registry.register_table(TableSchema::new("orders").with_workload(WorkloadType::OLTP));

        let oltp_tables = registry.tables_by_workload(WorkloadType::OLTP);
        assert_eq!(oltp_tables.len(), 2);

        // Map iteration order is unspecified, so compare sorted names.
        let mut names: Vec<String> = oltp_tables.into_iter().map(|t| t.name).collect();
        names.sort();
        assert_eq!(names, vec!["orders", "users"]);

        assert!(registry.tables_by_workload(WorkloadType::Vector).is_empty());
    }
}