use std::collections::HashMap;
use dashmap::DashMap;
use parking_lot::RwLock;
/// Shared registry of schema metadata: tables, indexes, relationships,
/// sharding configuration, node capabilities and branch locations.
///
/// Every field is wrapped in a `DashMap` or an `RwLock`, and all methods in
/// the accompanying `impl` take `&self`.
#[derive(Debug)]
pub struct SchemaRegistry {
/// Table schemas, keyed by the schema's own `name`.
tables: DashMap<String, TableSchema>,
/// Index schemas, keyed by the index's own `name`.
indexes: DashMap<String, IndexSchema>,
/// All registered relationships, in insertion order.
relationships: RwLock<Vec<Relationship>>,
/// Current sharding configuration; replaced wholesale by `set_sharding`.
sharding: RwLock<ShardingConfig>,
/// Capabilities registered per node id.
node_capabilities: DashMap<String, NodeCapabilities>,
/// For each branch name, the list of node ids registered for it.
branch_locations: DashMap<String, Vec<String>>,
}
impl SchemaRegistry {
pub fn new() -> Self {
Self {
tables: DashMap::new(),
indexes: DashMap::new(),
relationships: RwLock::new(Vec::new()),
sharding: RwLock::new(ShardingConfig::default()),
node_capabilities: DashMap::new(),
branch_locations: DashMap::new(),
}
}
pub fn register_table(&self, schema: TableSchema) {
self.tables.insert(schema.name.clone(), schema);
}
pub fn get_table(&self, name: &str) -> Option<TableSchema> {
self.tables.get(name).map(|r| r.clone())
}
pub fn update_classification(
&self,
table: &str,
temperature: DataTemperature,
workload: WorkloadType,
) {
if let Some(mut entry) = self.tables.get_mut(table) {
entry.temperature = temperature;
entry.workload = workload;
}
}
pub fn register_index(&self, schema: IndexSchema) {
self.indexes.insert(schema.name.clone(), schema);
}
pub fn get_index(&self, name: &str) -> Option<IndexSchema> {
self.indexes.get(name).map(|r| r.clone())
}
pub fn get_vector_index(&self, table: &str) -> Option<IndexSchema> {
self.indexes.iter()
.find(|entry| entry.table == table && entry.index_type == IndexType::Vector)
.map(|entry| entry.clone())
}
pub fn add_relationship(&self, relationship: Relationship) {
let mut rels = self.relationships.write();
rels.push(relationship);
}
pub fn get_relationships(&self, table: &str) -> Vec<Relationship> {
let rels = self.relationships.read();
rels.iter()
.filter(|r| r.from_table == table || r.to_table == table)
.cloned()
.collect()
}
pub fn set_sharding(&self, config: ShardingConfig) {
let mut sharding = self.sharding.write();
*sharding = config;
}
pub fn get_shard(&self, key: &str, value: &str) -> Option<u32> {
let sharding = self.sharding.read();
sharding.get_shard(key, value)
}
pub fn register_node_capabilities(&self, node_id: &str, capabilities: NodeCapabilities) {
self.node_capabilities.insert(node_id.to_string(), capabilities);
}
pub fn get_node_capabilities(&self, node_id: &str) -> Option<NodeCapabilities> {
self.node_capabilities.get(node_id).map(|r| r.clone())
}
pub fn register_branch_location(&self, branch: &str, node_ids: Vec<String>) {
self.branch_locations.insert(branch.to_string(), node_ids);
}
pub fn get_branch_locations(&self, branch: &str) -> Vec<String> {
self.branch_locations
.get(branch)
.map(|r| r.clone())
.unwrap_or_default()
}
pub fn all_tables(&self) -> Vec<TableSchema> {
self.tables.iter().map(|r| r.clone()).collect()
}
pub fn list_tables(&self) -> Vec<TableSchema> {
self.all_tables()
}
pub fn table_count(&self) -> usize {
self.tables.len()
}
pub fn remove_table(&self, name: &str) {
self.tables.remove(name);
}
pub fn tables_by_workload(&self, workload: WorkloadType) -> Vec<TableSchema> {
self.tables
.iter()
.filter(|r| r.workload == workload)
.map(|r| r.clone())
.collect()
}
pub fn tables_by_temperature(&self, temperature: DataTemperature) -> Vec<TableSchema> {
self.tables
.iter()
.filter(|r| r.temperature == temperature)
.map(|r| r.clone())
.collect()
}
pub fn is_columnar_column(&self, table: &str, column: &str) -> bool {
self.tables
.get(table)
.map(|t| {
t.columns
.iter()
.any(|c| c.name == column && c.storage_type == StorageType::Columnar)
})
.unwrap_or(false)
}
pub fn is_content_addressed(&self, table: &str, column: &str) -> bool {
self.tables
.get(table)
.map(|t| {
t.columns
.iter()
.any(|c| c.name == column && c.storage_type == StorageType::ContentAddressed)
})
.unwrap_or(false)
}
}
impl Default for SchemaRegistry {
fn default() -> Self {
Self::new()
}
}
/// Schema, classification and placement hints for a single table.
///
/// Start from [`TableSchema::new`] and refine the value with the chainable
/// `with_*` methods. All fields are public as well.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column definitions, in the order they were added.
    pub columns: Vec<ColumnSchema>,
    /// Access pattern recorded for the table.
    pub access_pattern: AccessPattern,
    /// Temperature classification recorded for the table.
    pub temperature: DataTemperature,
    /// Workload classification recorded for the table.
    pub workload: WorkloadType,
    /// Primary-key column list as passed to `with_primary_key`.
    pub primary_key: Vec<String>,
    /// Shard key, if one has been set.
    pub shard_key: Option<String>,
    /// Partitioning description, if one has been set.
    pub partition_key: Option<PartitionKey>,
    /// Preferred nodes, in the order they were added.
    pub preferred_nodes: Vec<String>,
    /// Estimated number of rows.
    pub estimated_rows: u64,
    /// Average row size (unit is not stated in this file; presumably bytes,
    /// to be confirmed).
    pub avg_row_size: usize,
}

impl TableSchema {
    /// Creates a schema called `name` with no columns, no keys, no preferred
    /// nodes, zero size estimates, and `Mixed` / `Warm` / `Mixed` as access
    /// pattern, temperature and workload.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            columns: Vec::new(),
            access_pattern: AccessPattern::Mixed,
            temperature: DataTemperature::Warm,
            workload: WorkloadType::Mixed,
            primary_key: Vec::new(),
            shard_key: None,
            partition_key: None,
            preferred_nodes: Vec::new(),
            estimated_rows: 0,
            avg_row_size: 0,
        }
    }

    /// Appends `column` to the column list.
    pub fn with_column(mut self, column: ColumnSchema) -> Self {
        self.columns.push(column);
        self
    }

    /// Sets the access pattern.
    pub fn with_access_pattern(mut self, pattern: AccessPattern) -> Self {
        self.access_pattern = pattern;
        self
    }

    /// Sets the temperature classification.
    pub fn with_temperature(mut self, temp: DataTemperature) -> Self {
        self.temperature = temp;
        self
    }

    /// Sets the workload classification.
    pub fn with_workload(mut self, workload: WorkloadType) -> Self {
        self.workload = workload;
        self
    }

    /// Replaces the primary-key column list.
    ///
    /// Only `primary_key` is updated; the `is_primary_key` flags of the
    /// entries in `columns` are not touched.
    pub fn with_primary_key(mut self, columns: Vec<String>) -> Self {
        self.primary_key = columns;
        self
    }

    /// Sets the shard key.
    pub fn with_shard_key(mut self, key: impl Into<String>) -> Self {
        self.shard_key = Some(key.into());
        self
    }

    /// Sets the partitioning description.
    pub fn with_partition_key(mut self, key: PartitionKey) -> Self {
        self.partition_key = Some(key);
        self
    }

    /// Appends `node` to the preferred-node list.
    pub fn with_preferred_node(mut self, node: impl Into<String>) -> Self {
        self.preferred_nodes.push(node.into());
        self
    }

    /// Sets the estimated row count.
    pub fn with_estimated_rows(mut self, rows: u64) -> Self {
        self.estimated_rows = rows;
        self
    }

    /// Sets the average row size.
    pub fn with_avg_row_size(mut self, size: usize) -> Self {
        self.avg_row_size = size;
        self
    }
}
/// Definition of one column of a table.
///
/// Created with [`ColumnSchema::new`] and adjusted through chainable
/// methods; all fields are public as well.
#[derive(Debug, Clone)]
pub struct ColumnSchema {
    /// Column name.
    pub name: String,
    /// Data type, kept as free-form text.
    pub data_type: String,
    /// Whether the column is marked nullable.
    pub nullable: bool,
    /// Storage type recorded for the column.
    pub storage_type: StorageType,
    /// Whether the column is flagged as (part of) the primary key.
    pub is_primary_key: bool,
    /// Whether the column is flagged as indexed.
    pub is_indexed: bool,
}

impl ColumnSchema {
    /// Creates a nullable, row-stored column that is neither a primary key
    /// nor indexed.
    pub fn new(name: impl Into<String>, data_type: impl Into<String>) -> Self {
        let name = name.into();
        let data_type = data_type.into();
        Self {
            name,
            data_type,
            nullable: true,
            storage_type: StorageType::Row,
            is_primary_key: false,
            is_indexed: false,
        }
    }

    /// Sets the nullable flag.
    pub fn nullable(self, nullable: bool) -> Self {
        Self { nullable, ..self }
    }

    /// Sets the storage type.
    pub fn with_storage(self, storage: StorageType) -> Self {
        Self {
            storage_type: storage,
            ..self
        }
    }

    /// Flags the column as a primary key and, at the same time, clears the
    /// nullable flag.
    pub fn as_primary_key(self) -> Self {
        Self {
            is_primary_key: true,
            nullable: false,
            ..self
        }
    }

    /// Flags the column as indexed.
    pub fn indexed(self) -> Self {
        Self {
            is_indexed: true,
            ..self
        }
    }
}
/// Storage type recorded on a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StorageType {
    /// Row storage; this is the default.
    Row,
    /// Columnar storage.
    Columnar,
    /// Content-addressed storage.
    ContentAddressed,
    /// Vector storage.
    Vector,
}

impl Default for StorageType {
    /// Returns [`StorageType::Row`].
    fn default() -> Self {
        Self::Row
    }
}
/// Definition of one index on a table.
///
/// Created with [`IndexSchema::new`] and adjusted through chainable
/// methods; all fields are public as well.
#[derive(Debug, Clone)]
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Name of the table the index belongs to.
    pub table: String,
    /// Indexed columns, in the order they were added.
    pub columns: Vec<String>,
    /// Kind of index.
    pub index_type: IndexType,
    /// Whether the index is flagged unique.
    pub is_unique: bool,
}

impl IndexSchema {
    /// Creates a non-unique `BTree` index called `name` on `table` with no
    /// columns yet.
    pub fn new(name: impl Into<String>, table: impl Into<String>) -> Self {
        let name = name.into();
        let table = table.into();
        Self {
            name,
            table,
            columns: Vec::new(),
            index_type: IndexType::BTree,
            is_unique: false,
        }
    }

    /// Appends `column` to the indexed-column list.
    pub fn with_column(mut self, column: impl Into<String>) -> Self {
        let column_name: String = column.into();
        self.columns.push(column_name);
        self
    }

    /// Sets the kind of index.
    pub fn with_type(self, index_type: IndexType) -> Self {
        Self { index_type, ..self }
    }

    /// Flags the index as unique.
    pub fn unique(self) -> Self {
        Self {
            is_unique: true,
            ..self
        }
    }
}
/// Kind of index recorded on an `IndexSchema`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IndexType {
    /// B-tree index; this is the default.
    BTree,
    /// Hash index.
    Hash,
    /// GiST index.
    GiST,
    /// GIN index.
    GIN,
    /// Vector index.
    Vector,
}

impl Default for IndexType {
    /// Returns [`IndexType::BTree`].
    fn default() -> Self {
        Self::BTree
    }
}
/// Access pattern recorded on a table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AccessPattern {
    /// Point lookups.
    PointLookup,
    /// Range scans.
    RangeScan,
    /// Full scans.
    FullScan,
    /// Vector search.
    VectorSearch,
    /// Time-series appends.
    TimeSeriesAppend,
    /// No single pattern; this is the default.
    Mixed,
}

impl Default for AccessPattern {
    /// Returns [`AccessPattern::Mixed`].
    fn default() -> Self {
        AccessPattern::Mixed
    }
}

impl AccessPattern {
    /// Parses an access pattern from text, ignoring case.
    ///
    /// Each variant accepts its snake_case name and the same name with the
    /// underscores removed (for example `"point_lookup"` / `"pointlookup"`).
    /// The short aliases `"vector"`, `"time_series"`, `"timeseries"` and
    /// `"append"` are accepted too. Anything else yields `None`.
    ///
    /// This is an inherent method that returns `Option`; it is not the
    /// `std::str::FromStr` trait method.
    pub fn from_str(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "point_lookup" | "pointlookup" => Some(AccessPattern::PointLookup),
            "range_scan" | "rangescan" => Some(AccessPattern::RangeScan),
            "full_scan" | "fullscan" => Some(AccessPattern::FullScan),
            "vector_search" | "vectorsearch" | "vector" => Some(AccessPattern::VectorSearch),
            // The full variant name is accepted alongside the short aliases,
            // matching how every other variant is handled.
            "time_series_append" | "timeseriesappend" | "time_series" | "timeseries"
            | "append" => Some(AccessPattern::TimeSeriesAppend),
            "mixed" => Some(AccessPattern::Mixed),
            _ => None,
        }
    }
}
/// Temperature classification recorded on a table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataTemperature {
    /// Hot data.
    Hot,
    /// Warm data; this is the default.
    Warm,
    /// Cold data.
    Cold,
    /// Frozen data; `"archive"` also parses to this variant.
    Frozen,
}

impl Default for DataTemperature {
    /// Returns [`DataTemperature::Warm`].
    fn default() -> Self {
        Self::Warm
    }
}

impl DataTemperature {
    /// Parses a temperature from text, ignoring case.
    ///
    /// Accepts `"hot"`, `"warm"`, `"cold"`, `"frozen"` and the alias
    /// `"archive"` (which maps to `Frozen`); anything else yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        let lowered = s.to_lowercase();
        let parsed = match lowered.as_str() {
            "hot" => Self::Hot,
            "warm" => Self::Warm,
            "cold" => Self::Cold,
            "frozen" | "archive" => Self::Frozen,
            _ => return None,
        };
        Some(parsed)
    }
}
/// Workload classification recorded on a table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkloadType {
    /// OLTP workload.
    OLTP,
    /// OLAP workload.
    OLAP,
    /// HTAP workload.
    HTAP,
    /// Vector workload; `"ai"` also parses to this variant.
    Vector,
    /// No single workload; this is the default.
    Mixed,
}

impl Default for WorkloadType {
    /// Returns [`WorkloadType::Mixed`].
    fn default() -> Self {
        Self::Mixed
    }
}

impl WorkloadType {
    /// Parses a workload type from text, ignoring case.
    ///
    /// Accepts `"oltp"`, `"olap"`, `"htap"`, `"vector"`, `"mixed"` and the
    /// alias `"ai"` (which maps to `Vector`); anything else yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        let lowered = s.to_lowercase();
        let parsed = match lowered.as_str() {
            "oltp" => Self::OLTP,
            "olap" => Self::OLAP,
            "htap" => Self::HTAP,
            "vector" | "ai" => Self::Vector,
            "mixed" => Self::Mixed,
            _ => return None,
        };
        Some(parsed)
    }
}
/// Partitioning description stored in `TableSchema::partition_key`: a column
/// together with a partition type.
#[derive(Debug, Clone)]
pub struct PartitionKey {
/// Column the partitioning refers to (presumably by name; confirm against callers).
pub column: String,
/// Partition type applied to `column`.
pub partition_type: PartitionType,
}
/// Partition type carried by a `PartitionKey`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionType {
/// Range partitioning.
Range,
/// List partitioning.
List,
/// Hash partitioning.
Hash,
}
/// A link from one table/column pair to another.
///
/// `SchemaRegistry::get_relationships` returns a relationship for a table
/// when that table appears as either `from_table` or `to_table`.
#[derive(Debug, Clone)]
pub struct Relationship {
/// Table on the "from" side.
pub from_table: String,
/// Column on the "from" side.
pub from_column: String,
/// Table on the "to" side.
pub to_table: String,
/// Column on the "to" side.
pub to_column: String,
/// Kind of relationship between the two sides.
pub relationship_type: RelationshipType,
}
/// Kind of relationship carried by a `Relationship`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelationshipType {
/// One-to-one.
OneToOne,
/// One-to-many.
OneToMany,
/// Many-to-one.
ManyToOne,
/// Many-to-many.
ManyToMany,
}
/// Sharding configuration: whether sharding is on, how many shards exist,
/// and which shard key each table uses.
///
/// The derived `Default` is disabled with zero shards, so its `get_shard`
/// always returns `None`.
#[derive(Debug, Clone, Default)]
pub struct ShardingConfig {
    /// When `false`, `get_shard` always returns `None`.
    pub enabled: bool,
    /// Number of shards; `get_shard` reduces hashes modulo this value.
    pub shard_count: u32,
    /// Filled with `0..shard_count` by `new`; `get_shard` does not read it.
    pub hash_ring: Vec<u32>,
    /// Shard key registered for each table name.
    pub table_shard_keys: HashMap<String, String>,
}

impl ShardingConfig {
    /// Creates an enabled configuration with `shard_count` shards, an empty
    /// table-to-shard-key map and a hash ring holding `0..shard_count`.
    pub fn new(shard_count: u32) -> Self {
        let mut config = Self {
            shard_count,
            enabled: true,
            ..Self::default()
        };
        config.initialize_hash_ring();
        config
    }

    /// Rebuilds `hash_ring` as the sequence `0..shard_count`.
    fn initialize_hash_ring(&mut self) {
        let shard_ids = 0..self.shard_count;
        self.hash_ring = shard_ids.collect();
    }

    /// Maps `value` to a shard index in `0..shard_count`.
    ///
    /// Returns `None` when sharding is disabled or `shard_count` is zero.
    /// `_key` is currently ignored; only `value` is hashed.
    pub fn get_shard(&self, _key: &str, value: &str) -> Option<u32> {
        let usable = self.enabled && self.shard_count != 0;
        if usable {
            Some(self.hash_value(value) % self.shard_count)
        } else {
            None
        }
    }

    /// 32-bit FNV-1a hash over the UTF-8 bytes of `value`.
    fn hash_value(&self, value: &str) -> u32 {
        const FNV_OFFSET_BASIS: u32 = 2166136261;
        const FNV_PRIME: u32 = 16777619;
        value.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
        })
    }

    /// Records `shard_key` as the shard key of `table`, replacing any
    /// earlier entry for that table.
    pub fn register_table_shard_key(&mut self, table: &str, shard_key: &str) {
        self.table_shard_keys
            .insert(table.to_owned(), shard_key.to_owned());
    }
}
/// Feature flags and numeric limits registered for a node.
///
/// The derived `Default` has every flag off and both limits at zero.
#[derive(Debug, Clone, Default)]
pub struct NodeCapabilities {
    /// Vector search flag.
    pub vector_search: bool,
    /// GPU acceleration flag.
    pub gpu_acceleration: bool,
    /// Columnar storage flag.
    pub columnar_storage: bool,
    /// In-memory flag.
    pub in_memory: bool,
    /// Content-addressed storage flag.
    pub content_addressed: bool,
    /// Maximum concurrent queries; not consulted by `satisfies`.
    pub max_concurrent_queries: u32,
    /// Memory limit (unit not stated in this file); not consulted by
    /// `satisfies`.
    pub memory_limit: u64,
}

impl NodeCapabilities {
    /// Preset with vector search and GPU acceleration on, everything else
    /// at its default.
    pub fn vector_node() -> Self {
        Self {
            gpu_acceleration: true,
            vector_search: true,
            ..Self::default()
        }
    }

    /// Preset with columnar storage on and in-memory off, everything else
    /// at its default.
    pub fn analytics_node() -> Self {
        Self {
            in_memory: false,
            columnar_storage: true,
            ..Self::default()
        }
    }

    /// Preset with in-memory on, everything else at its default.
    pub fn hot_node() -> Self {
        Self {
            in_memory: true,
            ..Self::default()
        }
    }

    /// Returns `true` when every boolean flag set in `required` is also set
    /// on `self`.
    ///
    /// Only the five boolean flags are compared; `max_concurrent_queries`
    /// and `memory_limit` play no part.
    pub fn satisfies(&self, required: &NodeCapabilities) -> bool {
        // (flag demanded, flag offered) for each boolean capability.
        let checks = [
            (required.vector_search, self.vector_search),
            (required.gpu_acceleration, self.gpu_acceleration),
            (required.columnar_storage, self.columnar_storage),
            (required.in_memory, self.in_memory),
            (required.content_addressed, self.content_addressed),
        ];
        checks.iter().all(|&(needed, offered)| !needed || offered)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A registered table can be read back with its classification and
    /// columns intact, and unknown names yield `None`.
    #[test]
    fn test_schema_registry() {
        let registry = SchemaRegistry::new();
        let users = TableSchema::new("users")
            .with_temperature(DataTemperature::Hot)
            .with_workload(WorkloadType::OLTP)
            .with_access_pattern(AccessPattern::PointLookup)
            .with_column(ColumnSchema::new("id", "integer").as_primary_key())
            .with_column(ColumnSchema::new("name", "varchar"));
        registry.register_table(users);

        let table = registry
            .get_table("users")
            .expect("users was registered above");
        assert_eq!(table.name, "users");
        assert_eq!(table.temperature, DataTemperature::Hot);
        assert_eq!(table.workload, WorkloadType::OLTP);
        assert_eq!(table.access_pattern, AccessPattern::PointLookup);
        assert_eq!(table.columns.len(), 2);
        assert!(table.columns[0].is_primary_key);
        assert!(!table.columns[0].nullable);

        assert_eq!(registry.table_count(), 1);
        assert!(registry.get_table("missing").is_none());
    }

    /// Reclassifying an existing table updates both fields; reclassifying
    /// an unknown table neither panics nor creates an entry.
    #[test]
    fn test_update_classification() {
        let registry = SchemaRegistry::new();
        registry.register_table(
            TableSchema::new("events")
                .with_temperature(DataTemperature::Warm)
                .with_workload(WorkloadType::Mixed),
        );
        registry.update_classification("events", DataTemperature::Cold, WorkloadType::OLAP);
        let table = registry.get_table("events").expect("should exist");
        assert_eq!(table.temperature, DataTemperature::Cold);
        assert_eq!(table.workload, WorkloadType::OLAP);

        registry.update_classification("missing", DataTemperature::Hot, WorkloadType::OLTP);
        assert!(registry.get_table("missing").is_none());
        assert_eq!(registry.table_count(), 1);
    }

    /// Shard lookups are in range and repeatable, and a disabled
    /// configuration yields no shard.
    #[test]
    fn test_sharding_config() {
        let mut config = ShardingConfig::new(4);
        config.register_table_shard_key("orders", "customer_id");
        assert_eq!(
            config.table_shard_keys.get("orders").map(String::as_str),
            Some("customer_id")
        );

        let shard1 = config
            .get_shard("customer_id", "cust_123")
            .expect("sharding is enabled with 4 shards");
        let shard2 = config
            .get_shard("customer_id", "cust_456")
            .expect("sharding is enabled with 4 shards");
        assert!(shard1 < 4);
        assert!(shard2 < 4);
        // Same input must always land on the same shard.
        assert_eq!(config.get_shard("customer_id", "cust_123"), Some(shard1));

        assert_eq!(
            ShardingConfig::default().get_shard("customer_id", "cust_123"),
            None
        );
    }

    /// `satisfies` checks that every required flag is offered.
    #[test]
    fn test_node_capabilities() {
        let required = NodeCapabilities {
            vector_search: true,
            gpu_acceleration: false,
            ..Default::default()
        };
        let vector_node = NodeCapabilities::vector_node();
        let analytics_node = NodeCapabilities::analytics_node();
        assert!(vector_node.satisfies(&required));
        assert!(!analytics_node.satisfies(&required));
        // An empty requirement is met by any node.
        assert!(analytics_node.satisfies(&NodeCapabilities::default()));
    }

    #[test]
    fn test_access_pattern_from_str() {
        assert_eq!(
            AccessPattern::from_str("point_lookup"),
            Some(AccessPattern::PointLookup)
        );
        assert_eq!(
            AccessPattern::from_str("POINT_LOOKUP"),
            Some(AccessPattern::PointLookup)
        );
        assert_eq!(
            AccessPattern::from_str("vector"),
            Some(AccessPattern::VectorSearch)
        );
        assert_eq!(AccessPattern::from_str("invalid"), None);
    }

    #[test]
    fn test_data_temperature_from_str() {
        assert_eq!(DataTemperature::from_str("hot"), Some(DataTemperature::Hot));
        assert_eq!(DataTemperature::from_str("cold"), Some(DataTemperature::Cold));
        assert_eq!(
            DataTemperature::from_str("archive"),
            Some(DataTemperature::Frozen)
        );
        assert_eq!(DataTemperature::from_str("invalid"), None);
    }

    #[test]
    fn test_workload_type_from_str() {
        assert_eq!(WorkloadType::from_str("oltp"), Some(WorkloadType::OLTP));
        assert_eq!(WorkloadType::from_str("vector"), Some(WorkloadType::Vector));
        assert_eq!(WorkloadType::from_str("ai"), Some(WorkloadType::Vector));
        assert_eq!(WorkloadType::from_str("invalid"), None);
    }

    #[test]
    fn test_index_schema() {
        let index = IndexSchema::new("idx_users_email", "users")
            .with_column("email")
            .with_type(IndexType::BTree)
            .unique();
        assert_eq!(index.name, "idx_users_email");
        assert_eq!(index.table, "users");
        assert_eq!(index.index_type, IndexType::BTree);
        assert!(index.is_unique);
        assert_eq!(index.columns, vec!["email"]);
    }

    /// Filtering by workload returns exactly the matching tables.
    #[test]
    fn test_tables_by_workload() {
        let registry = SchemaRegistry::new();
        registry.register_table(TableSchema::new("users").with_workload(WorkloadType::OLTP));
        registry.register_table(TableSchema::new("events").with_workload(WorkloadType::OLAP));
        registry.register_table(TableSchema::new("orders").with_workload(WorkloadType::OLTP));

        let oltp_tables = registry.tables_by_workload(WorkloadType::OLTP);
        assert_eq!(oltp_tables.len(), 2);
        assert!(oltp_tables.iter().all(|t| t.workload == WorkloadType::OLTP));

        let olap_tables = registry.tables_by_workload(WorkloadType::OLAP);
        assert_eq!(olap_tables.len(), 1);
        assert_eq!(olap_tables[0].name, "events");

        assert!(registry.tables_by_workload(WorkloadType::Vector).is_empty());
    }
}