use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::{
platform::Platform,
schema::{
discovery::{SchemaDiscoveryConfig, SchemaDiscoveryEngine, SchemaInfo},
CqlType, TableSchema, UdtRegistry,
},
types::{ComparatorType, UdtTypeDef},
Config, Error, Result,
};
/// Behaviour switches and limits for a [`SchemaRegistry`].
#[derive(Debug, Clone)]
pub struct SchemaRegistryConfig {
/// When `false`, `discover_schema` fails and `get_schema` does not try to
/// discover tables that are not registered.
pub enable_auto_discovery: bool,
/// When `false`, cached entries never expire (see `is_entry_expired`).
pub enable_caching: bool,
/// Age in seconds after which a registered entry counts as expired.
pub cache_ttl_seconds: u64,
/// When `true`, replacing an existing table via `register_schema` records a
/// `SchemaVersion`; when `false`, `get_schema_history` returns an error.
pub enable_versioning: bool,
/// Upper bound on stored versions per table; the oldest are discarded first.
pub max_versions_per_schema: usize,
/// When `true`, `register_schema` runs the `SchemaValidator` before storing.
pub enable_validation: bool,
/// NOTE(review): not read anywhere in this file — confirm intended use.
pub auto_refresh_on_changes: bool,
/// Configuration handed to the `SchemaDiscoveryEngine` at construction.
pub discovery_config: SchemaDiscoveryConfig,
}
impl Default for SchemaRegistryConfig {
fn default() -> Self {
Self {
enable_auto_discovery: true,
enable_caching: true,
cache_ttl_seconds: 3600, enable_versioning: true,
max_versions_per_schema: 5,
enable_validation: true,
auto_refresh_on_changes: false, discovery_config: SchemaDiscoveryConfig::default(),
}
}
}
/// In-memory registry of table schemas keyed by `"keyspace.table"`, with
/// optional discovery from SSTables, validation, UDT storage and a bounded
/// per-table version history. All mutable state sits behind tokio `RwLock`s.
#[derive(Debug)]
pub struct SchemaRegistry {
/// Behaviour flags and limits.
config: SchemaRegistryConfig,
/// Stored at construction; not read elsewhere in this file.
_platform: Arc<Platform>,
/// Stored at construction; not read elsewhere in this file.
_core_config: Config,
/// Registered schemas keyed by `"keyspace.table"`.
schemas: Arc<RwLock<HashMap<String, SchemaEntry>>>,
/// User-defined types available for validation and lookups.
udt_registry: Arc<RwLock<UdtRegistry>>,
/// Engine used to discover schemas and render CQL/JSON from `SchemaInfo`.
discovery_engine: Arc<SchemaDiscoveryEngine>,
/// Validator used by `register_schema` when validation is enabled.
validator: Arc<SchemaValidator>,
/// Retained schema versions keyed by `"keyspace.table"`.
version_history: Arc<RwLock<HashMap<String, Vec<SchemaVersion>>>>,
}
/// One registered schema plus bookkeeping.
#[derive(Debug, Clone)]
struct SchemaEntry {
/// The schema itself.
schema: TableSchema,
/// Discovery output; `None` for schemas added via `register_schema`.
extended_info: Option<SchemaInfo>,
/// Registration time; compared against the cache TTL by `is_entry_expired`.
registered_at: SystemTime,
/// Where the schema came from.
source: SchemaSource,
/// Result of the most recent validation (if any).
validation_status: SchemaValidationStatus,
/// SSTables the schema was discovered from; empty for manual registrations.
_associated_files: Vec<PathBuf>,
}
/// Origin of a registered schema. `get_statistics` counts `Discovered` as
/// auto-discovered and every other variant as manually registered.
#[derive(Debug, Clone)]
pub enum SchemaSource {
/// Discovered from the listed SSTable files.
Discovered(Vec<PathBuf>),
/// Loaded from an external file at this path (not constructed in this file).
External(PathBuf),
/// Derived from CQL text (not constructed in this file).
Cql(String),
/// Supplied directly by the caller.
Manual,
}
/// Outcome of validating a registered schema.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchemaValidationStatus {
/// Validation produced no errors and no warnings.
Valid,
/// No errors, but at least one warning (set by `validate_schema`).
ValidWithWarnings,
/// At least one validation error.
Invalid,
/// Validation was skipped (e.g. disabled in the config).
NotValidated,
}
/// A stored snapshot in a table's version history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaVersion {
/// Version number assigned by `create_schema_version`.
pub version: u32,
/// When the version was recorded.
pub created_at: SystemTime,
/// The schema that was being registered when this version was recorded.
pub schema: TableSchema,
/// Differences from the previous stored version.
pub changes: Vec<SchemaChange>,
/// Who recorded the version (`"registry"` in this file).
pub source: String,
}
/// A single difference between two schema versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaChange {
/// Kind of change.
pub change_type: SchemaChangeType,
/// Affected component, e.g. a column name (`"initial"` for the first version).
pub component: String,
/// Human-readable summary.
pub description: String,
/// Previous value (e.g. old column type), when applicable.
pub old_value: Option<String>,
/// New value (e.g. new column type), when applicable.
pub new_value: Option<String>,
}
/// Categories of schema change. Only the three `Column*` add/remove/type
/// variants are produced by the change detection in this file; `ColumnAdded`
/// is also used as the marker for a table's first recorded version.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SchemaChangeType {
ColumnAdded,
ColumnRemoved,
ColumnTypeChanged,
ColumnRenamed,
IndexAdded,
IndexRemoved,
UdtAdded,
UdtModified,
UdtRemoved,
TableOptionChanged,
}
/// Result of `SchemaRegistry::validate_schema`.
#[derive(Debug, Clone)]
pub struct ValidationReport {
/// `"keyspace.table"` of the validated schema.
pub table_id: String,
/// Overall status derived from `errors` and `warnings`.
pub status: SchemaValidationStatus,
/// Problems that make the schema invalid.
pub errors: Vec<ValidationError>,
/// Non-fatal findings.
pub warnings: Vec<ValidationWarning>,
/// Free-text performance suggestions.
pub recommendations: Vec<String>,
/// When the report was produced.
pub validated_at: SystemTime,
}
/// A validation failure, e.g. code `"SCHEMA_INVALID"`, `"UDT_NOT_FOUND"` or
/// `"INVALID_COLUMN_TYPE"`.
#[derive(Debug, Clone)]
pub struct ValidationError {
/// Machine-readable error code.
pub code: String,
/// Human-readable explanation.
pub message: String,
/// Offending column or UDT name, when known.
pub component: Option<String>,
/// How serious the problem is.
pub severity: ErrorSeverity,
}
/// Severity attached to a [`ValidationError`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorSeverity {
Critical,
High,
Medium,
Low,
}
/// A non-fatal validation finding.
#[derive(Debug, Clone)]
pub struct ValidationWarning {
/// Machine-readable warning code.
pub code: String,
/// Human-readable explanation.
pub message: String,
/// Affected component, when known.
pub component: Option<String>,
}
/// Filter for `SchemaRegistry::list_schemas`; see that method for which
/// fields it applies.
#[derive(Debug, Clone)]
pub struct SchemaQuery {
/// Exact keyspace name to match.
pub keyspace: Option<String>,
/// Table-name pattern, interpreted by `SchemaRegistry::matches_pattern`.
pub table_pattern: Option<String>,
/// Schema origins to select.
pub source_types: Option<Vec<SchemaSource>>,
/// Select only schemas that passed validation.
pub validated_only: bool,
/// Request version history alongside results.
pub include_history: bool,
}
impl SchemaRegistry {
/// Creates a registry with empty schema, UDT and version-history stores.
///
/// # Errors
/// Propagates any failure from constructing the `SchemaDiscoveryEngine`.
pub async fn new(
    config: SchemaRegistryConfig,
    platform: Arc<Platform>,
    core_config: Config,
) -> Result<Self> {
    let engine = SchemaDiscoveryEngine::new(
        config.discovery_config.clone(),
        Arc::clone(&platform),
        core_config.clone(),
    )
    .await?;
    Ok(Self {
        discovery_engine: Arc::new(engine),
        validator: Arc::new(SchemaValidator::new()),
        udt_registry: Arc::new(RwLock::new(UdtRegistry::new())),
        schemas: Arc::new(RwLock::new(HashMap::new())),
        version_history: Arc::new(RwLock::new(HashMap::new())),
        _platform: platform,
        _core_config: core_config,
        config,
    })
}
pub async fn discover_schema(
&self,
keyspace: &str,
table: &str,
sstable_files: &[PathBuf],
) -> Result<TableSchema> {
if !self.config.enable_auto_discovery {
return Err(Error::Schema("Auto-discovery is disabled".to_string()));
}
let schema_info = self
.discovery_engine
.discover_schema(keyspace, table, sstable_files)
.await?;
let table_schema = self.convert_schema_info_to_table_schema(&schema_info)?;
self.register_discovered_schema(
table_schema.clone(),
Some(schema_info),
sstable_files.to_vec(),
)
.await?;
Ok(table_schema)
}
/// Stores `schema` under `"keyspace.table"`, replacing any existing entry.
///
/// When `enable_validation` is set, the schema is checked first and marked
/// `Valid` or `Invalid`; otherwise it is marked `NotValidated`. If versioning
/// is enabled and an entry already exists, a `SchemaVersion` is recorded
/// before the replacement.
///
/// # Errors
/// Propagates errors from recording the version.
pub async fn register_schema(&self, schema: TableSchema, source: SchemaSource) -> Result<()> {
    let key = format!("{}.{}", schema.keyspace, schema.table);
    let validation_status = if !self.config.enable_validation {
        SchemaValidationStatus::NotValidated
    } else if self.validator.validate_table_schema(&schema).await.is_ok() {
        SchemaValidationStatus::Valid
    } else {
        SchemaValidationStatus::Invalid
    };
    let registered_at = SystemTime::now();

    let mut schemas = self.schemas.write().await;
    // Version while the write guard is held so the existence check and the
    // history update see the same state.
    if self.config.enable_versioning && schemas.contains_key(&key) {
        self.create_schema_version(&key, &schema).await?;
    }
    schemas.insert(
        key,
        SchemaEntry {
            schema,
            extended_info: None,
            registered_at,
            source,
            validation_status,
            _associated_files: Vec::new(),
        },
    );
    Ok(())
}
pub async fn get_schema(&self, keyspace: &str, table: &str) -> Result<TableSchema> {
let table_id = format!("{}.{}", keyspace, table);
let schemas = self.schemas.read().await;
match schemas.get(&table_id) {
Some(entry) => {
if self.is_entry_expired(entry) {
drop(schemas); return self.refresh_schema(keyspace, table).await;
}
Ok(entry.schema.clone())
}
None => {
drop(schemas); if self.config.enable_auto_discovery {
self.auto_discover_schema(keyspace, table).await
} else {
Err(Error::Schema(format!(
"Schema not found: {}.{}",
keyspace, table
)))
}
}
}
}
/// Returns the discovery-time `SchemaInfo` for `keyspace.table`, or `None`
/// if the table is unknown or was not registered through discovery.
pub async fn get_schema_info(&self, keyspace: &str, table: &str) -> Result<Option<SchemaInfo>> {
    let key = format!("{}.{}", keyspace, table);
    let info = self
        .schemas
        .read()
        .await
        .get(&key)
        .and_then(|entry| entry.extended_info.clone());
    Ok(info)
}
pub async fn list_schemas(&self, query: Option<SchemaQuery>) -> Result<Vec<TableSchema>> {
let schemas = self.schemas.read().await;
let mut results = Vec::new();
for (_table_id, entry) in schemas.iter() {
if let Some(ref q) = query {
if !self.matches_query(&entry.schema, q) {
continue;
}
}
results.push(entry.schema.clone());
}
results.sort_by(|a, b| {
a.keyspace
.cmp(&b.keyspace)
.then_with(|| a.table.cmp(&b.table))
});
Ok(results)
}
#[allow(dead_code)]
pub async fn validate_schema(&self, keyspace: &str, table: &str) -> Result<ValidationReport> {
let schema = self.get_schema(keyspace, table).await?;
let table_id = format!("{}.{}", keyspace, table);
let mut errors = Vec::new();
let mut warnings = Vec::new();
let mut recommendations = Vec::new();
if let Err(e) = schema.validate() {
errors.push(ValidationError {
code: "SCHEMA_INVALID".to_string(),
message: e.to_string(),
component: None,
severity: ErrorSeverity::Critical,
});
}
self.validate_schema_udts(&schema, &mut errors, &mut warnings)
.await;
self.validate_column_types(&schema, &mut errors, &mut warnings)
.await;
self.generate_performance_recommendations(&schema, &mut recommendations)
.await;
let status = if !errors.is_empty() {
SchemaValidationStatus::Invalid
} else if !warnings.is_empty() {
SchemaValidationStatus::ValidWithWarnings
} else {
SchemaValidationStatus::Valid
};
{
let mut schemas = self.schemas.write().await;
if let Some(entry) = schemas.get_mut(&table_id) {
entry.validation_status = status.clone();
}
}
Ok(ValidationReport {
table_id,
status,
errors,
warnings,
recommendations,
validated_at: SystemTime::now(),
})
}
/// Returns the retained versions of `keyspace.table` (oldest first), or an
/// empty list if none were recorded.
///
/// # Errors
/// `Error::Schema` when versioning is disabled.
pub async fn get_schema_history(
    &self,
    keyspace: &str,
    table: &str,
) -> Result<Vec<SchemaVersion>> {
    if !self.config.enable_versioning {
        return Err(Error::Schema("Schema versioning is disabled".to_string()));
    }
    let key = format!("{}.{}", keyspace, table);
    let versions = self
        .version_history
        .read()
        .await
        .get(&key)
        .cloned()
        .unwrap_or_default();
    Ok(versions)
}
/// Removes `keyspace.table` from the registry. Its version history is also
/// dropped when versioning is enabled. Removing an unknown table is not an error.
pub async fn remove_schema(&self, keyspace: &str, table: &str) -> Result<()> {
    let key = format!("{}.{}", keyspace, table);
    self.schemas.write().await.remove(&key);
    if self.config.enable_versioning {
        self.version_history.write().await.remove(&key);
    }
    Ok(())
}
/// Renders a `CREATE TABLE` statement for `keyspace.table`.
///
/// Uses the discovery engine when extended `SchemaInfo` is available, and
/// otherwise falls back to `generate_basic_cql` on the registered schema.
pub async fn generate_cql(&self, keyspace: &str, table: &str) -> Result<String> {
    match self.get_schema_info(keyspace, table).await? {
        Some(info) => self.discovery_engine.generate_cql(&info).await,
        None => {
            let schema = self.get_schema(keyspace, table).await?;
            Ok(self.generate_basic_cql(&schema))
        }
    }
}
#[cfg(feature = "experimental")]
pub async fn export_schema_json(&self, keyspace: &str, table: &str) -> Result<String> {
self.export_schema_json_with_config(
keyspace,
table,
&crate::schema::json_exporter::JsonExportConfig::default(),
)
.await
}
#[cfg(not(feature = "experimental"))]
pub async fn export_schema_json(&self, _keyspace: &str, _table: &str) -> Result<String> {
Err(crate::error::Error::unsupported_format(
"JSON export requires experimental feature",
))
}
#[cfg(feature = "experimental")]
pub async fn export_schema_json_with_config(
&self,
keyspace: &str,
table: &str,
config: &crate::schema::json_exporter::JsonExportConfig,
) -> Result<String> {
if let Some(schema_info) = self.get_schema_info(keyspace, table).await? {
return self
.discovery_engine
.export_json_with_config(&schema_info, config)
.await;
}
let schema = self.get_schema(keyspace, table).await?;
let exporter = crate::schema::json_exporter::JsonExporter::with_config(config.clone());
exporter.export_table_schema(&schema)
}
#[cfg(not(feature = "experimental"))]
pub async fn export_schema_json_with_config<T>(
&self,
_keyspace: &str,
_table: &str,
_config: &T,
) -> Result<String> {
Err(crate::error::Error::unsupported_format(
"JSON export requires experimental feature",
))
}
#[cfg(feature = "experimental")]
pub async fn export_schema_json_compact(&self, keyspace: &str, table: &str) -> Result<String> {
let config = crate::schema::json_exporter::JsonExportConfig {
format_variant: crate::schema::json_exporter::JsonFormat::Compact,
include_metadata: false,
include_performance_metrics: false,
include_type_details: false,
pretty_format: false,
..Default::default()
};
self.export_schema_json_with_config(keyspace, table, &config)
.await
}
#[cfg(not(feature = "experimental"))]
pub async fn export_schema_json_compact(
&self,
_keyspace: &str,
_table: &str,
) -> Result<String> {
Err(crate::error::Error::unsupported_format(
"JSON export requires experimental feature",
))
}
#[cfg(feature = "experimental")]
pub async fn export_schema_json_openapi(&self, keyspace: &str, table: &str) -> Result<String> {
let config = crate::schema::json_exporter::JsonExportConfig {
format_variant: crate::schema::json_exporter::JsonFormat::OpenApi,
include_documentation: true,
include_type_details: true,
include_metadata: false,
..Default::default()
};
self.export_schema_json_with_config(keyspace, table, &config)
.await
}
#[cfg(not(feature = "experimental"))]
pub async fn export_schema_json_openapi(
&self,
_keyspace: &str,
_table: &str,
) -> Result<String> {
Err(crate::error::Error::unsupported_format(
"JSON export requires experimental feature",
))
}
#[cfg(feature = "experimental")]
pub async fn export_schema_json_pipeline(&self, keyspace: &str, table: &str) -> Result<String> {
let config = crate::schema::json_exporter::JsonExportConfig {
format_variant: crate::schema::json_exporter::JsonFormat::DataPipeline,
include_type_details: true,
include_table_options: false,
include_performance_metrics: true,
..Default::default()
};
self.export_schema_json_with_config(keyspace, table, &config)
.await
}
#[cfg(not(feature = "experimental"))]
pub async fn export_schema_json_pipeline(
&self,
_keyspace: &str,
_table: &str,
) -> Result<String> {
Err(crate::error::Error::unsupported_format(
"JSON export requires experimental feature",
))
}
#[cfg(feature = "experimental")]
pub async fn export_multiple_schemas_json(
&self,
schema_infos: &[SchemaInfo],
) -> Result<String> {
let exporter = crate::schema::json_exporter::JsonExporter::new();
exporter.export_multiple_schemas(schema_infos)
}
#[cfg(not(feature = "experimental"))]
pub async fn export_multiple_schemas_json(
&self,
_schema_infos: &[SchemaInfo],
) -> Result<String> {
Err(crate::error::Error::unsupported_format(
"JSON export requires experimental feature",
))
}
/// Exports, as one JSON document, every schema in `keyspace` that carries
/// discovered `SchemaInfo`. Schemas are ordered by table name so the output
/// is deterministic.
///
/// # Errors
/// `Error::NotFound` when the keyspace has no such schema; otherwise the
/// exporter's error is propagated.
#[cfg(feature = "experimental")]
pub async fn export_keyspace_schemas_json(&self, keyspace: &str) -> Result<String> {
    // Read `extended_info` straight from the entries under one short-lived
    // guard. Re-acquiring `self.schemas` (e.g. through `get_schema_info`)
    // while a read guard is held can deadlock behind a queued writer, since
    // tokio's `RwLock` is fair.
    let schemas = self.schemas.read().await;
    let mut schema_infos: Vec<SchemaInfo> = schemas
        .values()
        .filter(|entry| entry.schema.keyspace == keyspace)
        .filter_map(|entry| entry.extended_info.clone())
        .collect();
    drop(schemas);

    if schema_infos.is_empty() {
        return Err(Error::NotFound(format!(
            "No schemas found in keyspace '{}'",
            keyspace
        )));
    }
    // HashMap iteration order is arbitrary; sort for stable output.
    schema_infos.sort_by(|a, b| a.table.cmp(&b.table));
    self.export_multiple_schemas_json(&schema_infos).await
}
/// Placeholder used when the `experimental` feature is disabled.
///
/// # Errors
/// Always returns the `unsupported_format` error.
#[cfg(not(feature = "experimental"))]
pub async fn export_keyspace_schemas_json(&self, _keyspace: &str) -> Result<String> {
    Err(crate::error::Error::unsupported_format(
        "JSON export requires experimental feature",
    ))
}
/// Adds `udt_def` to the registry's UDT store.
pub async fn register_udt(&self, udt_def: UdtTypeDef) -> Result<()> {
    self.udt_registry.write().await.register_udt(udt_def);
    Ok(())
}
/// Returns a copy of the UDT `name` in `keyspace`, if one is registered.
pub async fn get_udt(&self, keyspace: &str, name: &str) -> Result<Option<UdtTypeDef>> {
    let found = self
        .udt_registry
        .read()
        .await
        .get_udt(keyspace, name)
        .cloned();
    Ok(found)
}
/// Hands out a shared handle to the UDT store for crate-internal users.
pub(crate) fn get_udt_registry(&self) -> Arc<RwLock<UdtRegistry>> {
    Arc::clone(&self.udt_registry)
}
pub async fn get_column_comparator(
&self,
keyspace: &str,
table: &str,
column: &str,
) -> Result<ComparatorType> {
let schema = self.get_schema(keyspace, table).await?;
let column_def = schema
.columns
.iter()
.find(|c| c.name == column)
.ok_or_else(|| {
Error::Schema(format!(
"Column '{}' not found in table '{}.{}'",
column, keyspace, table
))
})?;
let cql_type = CqlType::parse(&column_def.data_type)?;
ComparatorType::from_cql_type(&cql_type)
}
/// Maps every entry of `schema.columns` to its comparator. On the first
/// failure, the parse or comparator error is returned.
pub async fn get_table_comparators(
    &self,
    keyspace: &str,
    table: &str,
) -> Result<HashMap<String, ComparatorType>> {
    let schema = self.get_schema(keyspace, table).await?;
    let comparators = schema
        .columns
        .iter()
        .map(|col| -> Result<(String, ComparatorType)> {
            let cql_type = CqlType::parse(&col.data_type)?;
            Ok((col.name.clone(), ComparatorType::from_cql_type(&cql_type)?))
        })
        .collect::<Result<HashMap<_, _>>>()?;
    Ok(comparators)
}
/// Returns one comparator per partition-key column, in the order given by
/// `ordered_partition_keys`.
pub async fn get_partition_key_comparator(
    &self,
    keyspace: &str,
    table: &str,
) -> Result<Vec<ComparatorType>> {
    let schema = self.get_schema(keyspace, table).await?;
    let ordered = schema.ordered_partition_keys();
    let comparators = ordered
        .iter()
        .map(|key| -> Result<ComparatorType> {
            let cql_type = CqlType::parse(&key.data_type)?;
            ComparatorType::from_cql_type(&cql_type)
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(comparators)
}
pub async fn get_parsing_context(&self, keyspace: &str, table: &str) -> Result<ParsingContext> {
let schema = self.get_schema(keyspace, table).await?;
let partition_comparators = self.get_partition_key_comparator(keyspace, table).await?;
let clustering_comparators = self.get_clustering_key_comparator(keyspace, table).await?;
let column_comparators = self.get_table_comparators(keyspace, table).await?;
Ok(ParsingContext {
schema,
partition_comparators,
clustering_comparators,
column_comparators,
})
}
/// Returns one comparator per clustering-key column, in the order given by
/// `ordered_clustering_keys`.
pub async fn get_clustering_key_comparator(
    &self,
    keyspace: &str,
    table: &str,
) -> Result<Vec<ComparatorType>> {
    let schema = self.get_schema(keyspace, table).await?;
    let ordered = schema.ordered_clustering_keys();
    let comparators = ordered
        .iter()
        .map(|key| -> Result<ComparatorType> {
            let cql_type = CqlType::parse(&key.data_type)?;
            ComparatorType::from_cql_type(&cql_type)
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(comparators)
}
/// Reports whether `column` of `keyspace.table` is compatible with the CQL
/// type `expected_type`, as judged by `comparators_are_compatible`.
///
/// # Errors
/// Errors from the column lookup are propagated first, then errors from
/// parsing `expected_type`.
pub async fn validate_column_type_compatibility(
    &self,
    keyspace: &str,
    table: &str,
    column: &str,
    expected_type: &str,
) -> Result<bool> {
    let actual = self.get_column_comparator(keyspace, table, column).await?;
    let expected = ComparatorType::from_cql_type(&CqlType::parse(expected_type)?)?;
    Ok(self.comparators_are_compatible(&actual, &expected))
}
/// Structural compatibility between two comparators.
///
/// Scalar comparators match only the identical variant. Lists, sets, maps
/// and frozen types are compared element-wise. Tuples must also have equal
/// arity. UDTs match on type name and keyspace; custom comparators match on
/// name. Any other pairing is incompatible.
// `self` is unused apart from recursion; kept for method-call ergonomics.
#[allow(clippy::only_used_in_recursion)]
fn comparators_are_compatible(&self, left: &ComparatorType, right: &ComparatorType) -> bool {
    match (left, right) {
        (ComparatorType::Boolean, ComparatorType::Boolean)
        | (ComparatorType::TinyInt, ComparatorType::TinyInt)
        | (ComparatorType::SmallInt, ComparatorType::SmallInt)
        | (ComparatorType::Int, ComparatorType::Int)
        | (ComparatorType::BigInt, ComparatorType::BigInt)
        | (ComparatorType::Float32, ComparatorType::Float32)
        | (ComparatorType::Float, ComparatorType::Float)
        | (ComparatorType::Text, ComparatorType::Text)
        | (ComparatorType::Blob, ComparatorType::Blob)
        | (ComparatorType::Timestamp, ComparatorType::Timestamp)
        | (ComparatorType::Uuid, ComparatorType::Uuid)
        | (ComparatorType::Json, ComparatorType::Json) => true,
        (ComparatorType::List(a), ComparatorType::List(b)) => self.comparators_are_compatible(a, b),
        (ComparatorType::Set(a), ComparatorType::Set(b)) => self.comparators_are_compatible(a, b),
        (ComparatorType::Frozen(a), ComparatorType::Frozen(b)) => {
            self.comparators_are_compatible(a, b)
        }
        (ComparatorType::Map(ka, va), ComparatorType::Map(kb, vb)) => {
            self.comparators_are_compatible(ka, kb) && self.comparators_are_compatible(va, vb)
        }
        (ComparatorType::Tuple(lhs), ComparatorType::Tuple(rhs)) => {
            if lhs.len() != rhs.len() {
                return false;
            }
            lhs.iter()
                .zip(rhs.iter())
                .all(|(a, b)| self.comparators_are_compatible(a, b))
        }
        (
            ComparatorType::Udt {
                type_name: l_name,
                keyspace: l_ks,
                ..
            },
            ComparatorType::Udt {
                type_name: r_name,
                keyspace: r_ks,
                ..
            },
        ) => l_name == r_name && l_ks == r_ks,
        (ComparatorType::Custom(a), ComparatorType::Custom(b)) => a == b,
        _ => false,
    }
}
/// Summarises the registry's contents.
///
/// Counts schemas per keyspace, per validation status, and per source
/// (`Discovered` versus everything else). Also reports the number of UDTs
/// and stored versions. `cache_hit_rate` is not tracked and is always `0.0`.
pub async fn get_statistics(&self) -> Result<RegistryStatistics> {
    let schemas = self.schemas.read().await;
    let udts = self.udt_registry.read().await;
    let history = self.version_history.read().await;

    let mut schemas_by_keyspace: HashMap<String, usize> = HashMap::new();
    let mut validated_schemas = 0;
    let mut schemas_with_warnings = 0;
    let mut invalid_schemas = 0;
    let mut auto_discovered_schemas = 0;
    let mut manually_registered_schemas = 0;

    for entry in schemas.values() {
        *schemas_by_keyspace
            .entry(entry.schema.keyspace.clone())
            .or_default() += 1;
        match entry.validation_status {
            SchemaValidationStatus::Valid => validated_schemas += 1,
            SchemaValidationStatus::ValidWithWarnings => schemas_with_warnings += 1,
            SchemaValidationStatus::Invalid => invalid_schemas += 1,
            SchemaValidationStatus::NotValidated => {}
        }
        if matches!(entry.source, SchemaSource::Discovered(_)) {
            auto_discovered_schemas += 1;
        } else {
            manually_registered_schemas += 1;
        }
    }

    Ok(RegistryStatistics {
        total_schemas: schemas.len(),
        schemas_by_keyspace,
        validated_schemas,
        schemas_with_warnings,
        invalid_schemas,
        total_udts: udts.total_udts(),
        total_versions: history.values().map(Vec::len).sum(),
        auto_discovered_schemas,
        manually_registered_schemas,
        cache_hit_rate: 0.0,
    })
}
/// Stores a schema produced by discovery and records the SSTables it was
/// discovered from.
///
/// Validation follows `register_schema`. When `enable_validation` is set,
/// the `SchemaValidator` decides between `Valid` and `Invalid`; otherwise
/// the entry is marked `NotValidated`. Before this change the status was
/// always `Valid`, even when no validation ran.
async fn register_discovered_schema(
    &self,
    schema: TableSchema,
    schema_info: Option<SchemaInfo>,
    sstable_files: Vec<PathBuf>,
) -> Result<()> {
    let table_id = format!("{}.{}", schema.keyspace, schema.table);
    let validation_status = if !self.config.enable_validation {
        SchemaValidationStatus::NotValidated
    } else if self.validator.validate_table_schema(&schema).await.is_ok() {
        SchemaValidationStatus::Valid
    } else {
        SchemaValidationStatus::Invalid
    };
    let entry = SchemaEntry {
        source: SchemaSource::Discovered(sstable_files.clone()),
        schema,
        extended_info: schema_info,
        registered_at: SystemTime::now(),
        validation_status,
        _associated_files: sstable_files,
    };
    self.schemas.write().await.insert(table_id, entry);
    Ok(())
}
/// Converts discovery output into a `TableSchema`.
///
/// Partition keys get their position from `schema_info.partition_key` order.
/// Clustering keys are copied as-is. `columns` holds the complete column
/// list: partition keys, then clustering keys (all non-nullable), then
/// regular columns, then static columns. This matches schemas registered
/// elsewhere (e.g. the `simple_schema` test fixture lists its key in
/// `columns`). It also lets column-level consumers such as
/// `get_table_comparators`, `validate_column_types` and `generate_basic_cql`
/// see key columns. A regular or static column whose name is already present
/// is not added twice.
fn convert_schema_info_to_table_schema(&self, schema_info: &SchemaInfo) -> Result<TableSchema> {
    let partition_keys: Vec<crate::schema::KeyColumn> = schema_info
        .partition_key
        .iter()
        .enumerate()
        .map(|(position, pk)| crate::schema::KeyColumn {
            name: pk.name.clone(),
            data_type: pk.data_type.clone(),
            position,
        })
        .collect();

    let mut clustering_keys = Vec::new();
    for ck in &schema_info.clustering_keys {
        clustering_keys.push(ck.clone());
    }

    let mut columns: Vec<crate::schema::Column> = Vec::new();
    for key in &partition_keys {
        columns.push(crate::schema::Column {
            name: key.name.clone(),
            data_type: key.data_type.clone(),
            nullable: false,
            default: None,
            is_static: false,
        });
    }
    for key in &clustering_keys {
        columns.push(crate::schema::Column {
            name: key.name.clone(),
            data_type: key.data_type.clone(),
            nullable: false,
            default: None,
            is_static: false,
        });
    }
    for col in &schema_info.regular_columns {
        if columns.iter().any(|c| c.name == col.name) {
            continue;
        }
        columns.push(crate::schema::Column {
            name: col.name.clone(),
            data_type: col.data_type.clone(),
            nullable: col.nullable,
            default: None,
            is_static: false,
        });
    }
    for col in &schema_info.static_columns {
        if columns.iter().any(|c| c.name == col.name) {
            continue;
        }
        columns.push(crate::schema::Column {
            name: col.name.clone(),
            data_type: col.data_type.clone(),
            nullable: col.nullable,
            default: None,
            is_static: true,
        });
    }

    Ok(TableSchema {
        keyspace: schema_info.keyspace.clone(),
        table: schema_info.table.clone(),
        partition_keys,
        clustering_keys,
        columns,
        comments: HashMap::new(),
    })
}
/// Reports whether `entry` is older than the cache TTL.
///
/// Always `false` when caching is disabled. If the clock moved backwards
/// (so `elapsed` fails), the age counts as zero.
fn is_entry_expired(&self, entry: &SchemaEntry) -> bool {
    if self.config.enable_caching {
        let age = entry.registered_at.elapsed().unwrap_or_default();
        age > std::time::Duration::from_secs(self.config.cache_ttl_seconds)
    } else {
        false
    }
}
/// Re-resolves an expired entry.
async fn refresh_schema(&self, keyspace: &str, table: &str) -> Result<TableSchema> {
    // There is no incremental refresh yet: rerun discovery from scratch.
    // Note that this path does not consult `enable_auto_discovery`.
    self.auto_discover_schema(keyspace, table).await
}
async fn auto_discover_schema(&self, keyspace: &str, table: &str) -> Result<TableSchema> {
let sstable_files = self.find_sstable_files(keyspace, table).await?;
if sstable_files.is_empty() {
return Err(Error::Schema(format!(
"No SSTables found for {}.{}",
keyspace, table
)));
}
self.discover_schema(keyspace, table, &sstable_files).await
}
/// Locates SSTable files for a table.
///
/// NOTE(review): placeholder. It always returns an empty list, so
/// `auto_discover_schema` (and `refresh_schema`) always fails with
/// "No SSTables found" until this is implemented.
async fn find_sstable_files(&self, _keyspace: &str, _table: &str) -> Result<Vec<PathBuf>> {
    let found: Vec<PathBuf> = vec![];
    Ok(found)
}
/// Checks `schema` against the query's `keyspace` (exact match) and
/// `table_pattern` (via `matches_pattern`). Absent filters always pass.
fn matches_query(&self, schema: &TableSchema, query: &SchemaQuery) -> bool {
    let keyspace_ok = query
        .keyspace
        .as_ref()
        .map_or(true, |ks| &schema.keyspace == ks);
    keyspace_ok
        && query
            .table_pattern
            .as_ref()
            .map_or(true, |pattern| self.matches_pattern(&schema.table, pattern))
}
/// Returns whether the table name `text` matches `pattern`.
///
/// * A pattern containing `*` is a glob anchored at both ends, where each `*`
///   matches any (possibly empty) run of characters. For example, `"user_*"`
///   matches `"user_events"` but not `"old_user_events"`. `"*"` matches
///   everything.
/// * A pattern without `*` matches when it occurs anywhere in `text`
///   (substring match, as before).
fn matches_pattern(&self, text: &str, pattern: &str) -> bool {
    if !pattern.contains('*') {
        return text.contains(pattern);
    }
    // At least one '*' is present, so there are at least two segments.
    let segments: Vec<&str> = pattern.split('*').collect();
    let first = segments[0];
    let last = segments[segments.len() - 1];
    let mut rest = match text.strip_prefix(first) {
        Some(remaining) => remaining,
        None => return false,
    };
    // Inner segments must appear in order. Taking the leftmost occurrence of
    // each leaves the most room for the following segments.
    for &middle in &segments[1..segments.len() - 1] {
        match rest.find(middle) {
            Some(idx) => rest = &rest[idx + middle.len()..],
            None => return false,
        }
    }
    rest.ends_with(last)
}
/// Appends a `SchemaVersion` for `table_id` holding `new_schema`.
///
/// The first version carries a single `"initial"` marker change. Later
/// versions list the differences from the newest retained version. Once
/// the history exceeds `max_versions_per_schema`, the oldest versions are
/// dropped.
async fn create_schema_version(&self, table_id: &str, new_schema: &TableSchema) -> Result<()> {
    let mut version_history = self.version_history.write().await;
    let versions = version_history.entry(table_id.to_string()).or_default();

    // Number from the newest retained version rather than `len()`. After
    // pruning, `len()` stops growing, and using it would hand out duplicate
    // version numbers.
    let (version_number, changes) = match versions.last() {
        Some(previous) => (
            previous.version + 1,
            self.detect_schema_changes(&previous.schema, new_schema),
        ),
        None => (
            1,
            vec![SchemaChange {
                change_type: SchemaChangeType::ColumnAdded,
                component: "initial".to_string(),
                description: "Initial schema version".to_string(),
                old_value: None,
                new_value: None,
            }],
        ),
    };

    versions.push(SchemaVersion {
        version: version_number,
        created_at: SystemTime::now(),
        schema: new_schema.clone(),
        changes,
        source: "registry".to_string(),
    });

    let excess = versions
        .len()
        .saturating_sub(self.config.max_versions_per_schema);
    versions.drain(..excess);
    Ok(())
}
/// Lists column-level differences between `old_schema` and `new_schema`.
///
/// Changes are grouped as: added columns (in new-schema order), removed
/// columns (in old-schema order), then type changes (in new-schema order).
/// The order is deterministic, unlike iterating a `HashMap`. Removed columns
/// report their former type in `old_value`.
fn detect_schema_changes(
    &self,
    old_schema: &TableSchema,
    new_schema: &TableSchema,
) -> Vec<SchemaChange> {
    let old_by_name: HashMap<&str, _> = old_schema
        .columns
        .iter()
        .map(|c| (c.name.as_str(), c))
        .collect();
    let new_by_name: HashMap<&str, _> = new_schema
        .columns
        .iter()
        .map(|c| (c.name.as_str(), c))
        .collect();
    let mut changes = Vec::new();

    for column in &new_schema.columns {
        if !old_by_name.contains_key(column.name.as_str()) {
            changes.push(SchemaChange {
                change_type: SchemaChangeType::ColumnAdded,
                component: column.name.clone(),
                description: format!(
                    "Column '{}' added with type '{}'",
                    column.name, column.data_type
                ),
                old_value: None,
                new_value: Some(column.data_type.clone()),
            });
        }
    }

    for column in &old_schema.columns {
        if !new_by_name.contains_key(column.name.as_str()) {
            changes.push(SchemaChange {
                change_type: SchemaChangeType::ColumnRemoved,
                component: column.name.clone(),
                description: format!("Column '{}' removed", column.name),
                old_value: Some(column.data_type.clone()),
                new_value: None,
            });
        }
    }

    for column in &new_schema.columns {
        if let Some(previous) = old_by_name.get(column.name.as_str()) {
            if previous.data_type != column.data_type {
                changes.push(SchemaChange {
                    change_type: SchemaChangeType::ColumnTypeChanged,
                    component: column.name.clone(),
                    description: format!("Column '{}' type changed", column.name),
                    old_value: Some(previous.data_type.clone()),
                    new_value: Some(column.data_type.clone()),
                });
            }
        }
    }

    changes
}
/// Checks that every UDT referenced by a parseable column type in
/// `schema.columns` is registered in the schema's keyspace. Columns whose
/// type fails to parse are skipped here; `validate_column_types` reports them.
async fn validate_schema_udts(
    &self,
    schema: &TableSchema,
    errors: &mut Vec<ValidationError>,
    warnings: &mut Vec<ValidationWarning>,
) {
    let registry = self.udt_registry.read().await;
    let parsed_types = schema
        .columns
        .iter()
        .filter_map(|column| CqlType::parse(&column.data_type).ok());
    for cql_type in parsed_types {
        self.validate_cql_type_udts(&cql_type, &schema.keyspace, &registry, errors, warnings);
    }
}
/// Recursively reports UDT references in `cql_type` that are not registered
/// in `keyspace`.
///
/// Each missing UDT adds a `UDT_NOT_FOUND` error with severity `High`. The
/// walk descends into list/set/frozen, map and tuple types; every other type
/// is accepted as-is. `_warnings` is passed along but never written.
// `self` is unused apart from recursion; kept for method-call ergonomics.
#[allow(clippy::only_used_in_recursion)]
fn validate_cql_type_udts(
    &self,
    cql_type: &CqlType,
    keyspace: &str,
    udt_registry: &UdtRegistry,
    errors: &mut Vec<ValidationError>,
    _warnings: &mut Vec<ValidationWarning>,
) {
    match cql_type {
        CqlType::Udt(udt_name, _) => {
            if udt_registry.contains_udt(keyspace, udt_name) {
                return;
            }
            errors.push(ValidationError {
                code: "UDT_NOT_FOUND".to_string(),
                message: format!("UDT '{}' not found in keyspace '{}'", udt_name, keyspace),
                component: Some(udt_name.clone()),
                severity: ErrorSeverity::High,
            });
        }
        CqlType::List(inner) | CqlType::Set(inner) | CqlType::Frozen(inner) => {
            self.validate_cql_type_udts(inner, keyspace, udt_registry, errors, _warnings);
        }
        CqlType::Map(key_type, value_type) => {
            for nested in [key_type, value_type] {
                self.validate_cql_type_udts(nested, keyspace, udt_registry, errors, _warnings);
            }
        }
        CqlType::Tuple(members) => {
            for member in members {
                self.validate_cql_type_udts(member, keyspace, udt_registry, errors, _warnings);
            }
        }
        _ => {}
    }
}
/// Adds an `INVALID_COLUMN_TYPE` error (severity `High`) for every column in
/// `schema.columns` whose type string does not parse as a `CqlType`.
async fn validate_column_types(
    &self,
    schema: &TableSchema,
    errors: &mut Vec<ValidationError>,
    _warnings: &mut [ValidationWarning],
) {
    let unparsable = schema.columns.iter().filter_map(|column| {
        CqlType::parse(&column.data_type)
            .err()
            .map(|e| ValidationError {
                code: "INVALID_COLUMN_TYPE".to_string(),
                message: format!("Invalid column type '{}': {}", column.data_type, e),
                component: Some(column.name.clone()),
                severity: ErrorSeverity::High,
            })
    });
    errors.extend(unparsable);
}
/// Appends heuristic performance advice. Triggers are more than 3 partition
/// key columns, more than 5 clustering keys, or more than 50 columns.
async fn generate_performance_recommendations(
    &self,
    schema: &TableSchema,
    recommendations: &mut Vec<String>,
) {
    let rules = [
        (
            schema.partition_keys.len() > 3,
            "Consider reducing the number of partition key columns for better performance",
        ),
        (
            schema.clustering_keys.len() > 5,
            "Large number of clustering keys may impact query performance",
        ),
        (
            schema.columns.len() > 50,
            "Consider using UDTs or denormalizing wide tables for better performance",
        ),
    ];
    recommendations.extend(
        rules
            .iter()
            .filter(|(triggered, _)| *triggered)
            .map(|(_, advice)| advice.to_string()),
    );
}
/// Renders a minimal `CREATE TABLE` statement from a `TableSchema`.
///
/// `generate_cql` uses this when no discovered `SchemaInfo` is available.
/// Key columns absent from `schema.columns` are declared first. Then every
/// entry of `schema.columns` follows, with `static` appended where
/// `is_static` is set. Last comes a `PRIMARY KEY` clause built from the key
/// columns in their ordered positions. Table options and clustering order
/// are not emitted. Definitions are joined with `",\n"`, so no stray
/// separator appears when `columns` is empty.
fn generate_basic_cql(&self, schema: &TableSchema) -> String {
    let partition_keys = schema.ordered_partition_keys();
    let clustering_keys = schema.ordered_clustering_keys();
    let mut definitions: Vec<String> = Vec::new();

    // CQL requires key columns to be declared even when a schema keeps them
    // only in `partition_keys` / `clustering_keys`.
    for key in partition_keys.iter() {
        if !schema.columns.iter().any(|c| c.name == key.name) {
            definitions.push(format!(" {} {}", key.name, key.data_type));
        }
    }
    for key in clustering_keys.iter() {
        if !schema.columns.iter().any(|c| c.name == key.name) {
            definitions.push(format!(" {} {}", key.name, key.data_type));
        }
    }
    for column in &schema.columns {
        let static_marker = if column.is_static { " static" } else { "" };
        definitions.push(format!(
            " {} {}{}",
            column.name, column.data_type, static_marker
        ));
    }

    if !partition_keys.is_empty() {
        let partition_list = partition_keys
            .iter()
            .map(|key| key.name.clone())
            .collect::<Vec<_>>()
            .join(", ");
        // A lone partition key with no clustering keys needs no inner parens.
        let mut primary_key = if partition_keys.len() == 1 && clustering_keys.is_empty() {
            partition_list
        } else {
            format!("({})", partition_list)
        };
        for key in clustering_keys.iter() {
            primary_key.push_str(", ");
            primary_key.push_str(&key.name);
        }
        definitions.push(format!(" PRIMARY KEY ({})", primary_key));
    }

    format!(
        "CREATE TABLE {}.{} (\n{}\n);",
        schema.keyspace,
        schema.table,
        definitions.join(",\n")
    )
}
}
/// Snapshot of registry contents produced by `SchemaRegistry::get_statistics`.
#[derive(Debug, Clone)]
pub struct RegistryStatistics {
/// Number of registered schemas.
pub total_schemas: usize,
/// Registered schemas per keyspace.
pub schemas_by_keyspace: HashMap<String, usize>,
/// Schemas whose status is `Valid`.
pub validated_schemas: usize,
/// Schemas whose status is `ValidWithWarnings`.
pub schemas_with_warnings: usize,
/// Schemas whose status is `Invalid`.
pub invalid_schemas: usize,
/// UDTs reported by the UDT registry.
pub total_udts: usize,
/// Stored versions across all tables.
pub total_versions: usize,
/// Schemas whose source is `SchemaSource::Discovered`.
pub auto_discovered_schemas: usize,
/// Schemas from any other source.
pub manually_registered_schemas: usize,
/// Not tracked; `get_statistics` always reports `0.0`.
pub cache_hit_rate: f64,
}
/// A schema bundled with the comparators needed to order its keys and
/// compare column values; built by `SchemaRegistry::get_parsing_context`.
#[derive(Debug, Clone)]
pub struct ParsingContext {
/// The table schema.
pub schema: TableSchema,
/// One comparator per partition-key column, in key order.
pub partition_comparators: Vec<ComparatorType>,
/// One comparator per clustering-key column, in key order.
pub clustering_comparators: Vec<ComparatorType>,
/// Comparator for each entry of `schema.columns`, keyed by column name.
pub column_comparators: HashMap<String, ComparatorType>,
}
impl ParsingContext {
    /// Returns the comparator for `column_name`, if that column is known.
    pub fn get_column_comparator(&self, column_name: &str) -> Option<&ComparatorType> {
        self.column_comparators.get(column_name)
    }

    /// Returns `true` when partition keys can be ordered. That requires at
    /// least one partition key in the schema and exactly one partition
    /// comparator per key.
    ///
    /// Requiring both halves ensures the comparators are actually present.
    /// With `||`, a context with keys but no comparators would count as
    /// complete.
    pub fn is_complete(&self) -> bool {
        let expected = self.schema.partition_keys.len();
        expected > 0 && self.partition_comparators.len() == expected
    }

    /// Names of all primary-key columns: partition keys followed by
    /// clustering keys, each in the order of `ordered_*_keys`.
    pub fn get_all_key_column_names(&self) -> Vec<String> {
        let mut names = Vec::new();
        names.extend(
            self.schema
                .ordered_partition_keys()
                .iter()
                .map(|k| k.name.clone()),
        );
        names.extend(
            self.schema
                .ordered_clustering_keys()
                .iter()
                .map(|k| k.name.clone()),
        );
        names
    }
}
/// Stateless validator for table schemas; delegates to `TableSchema::validate`.
#[derive(Debug)]
pub struct SchemaValidator;
impl Default for SchemaValidator {
fn default() -> Self {
Self::new()
}
}
impl SchemaValidator {
pub fn new() -> Self {
Self
}
pub async fn validate_table_schema(&self, schema: &TableSchema) -> Result<()> {
schema.validate()
}
}
#[cfg(test)]
// Unit tests for `SchemaRegistry`. Each registry is built by `make_registry`,
// which creates a real `Platform` from `Config::default()`.
mod tests {
use super::*;
use std::collections::HashMap;
// Builds a registry from `reg_config`. NOTE: it always forces
// `enable_auto_discovery` to `false`, overriding whatever the caller set.
async fn make_registry(mut reg_config: SchemaRegistryConfig) -> SchemaRegistry {
reg_config.enable_auto_discovery = false;
let core_config = Config::default();
let platform = Arc::new(Platform::new(&core_config).await.expect("platform"));
SchemaRegistry::new(reg_config, platform, core_config)
.await
.expect("registry")
}
// Minimal schema `test_ks.<name>` with a single `uuid` partition key `id`
// that is also listed in `columns`.
fn simple_schema(name: &str) -> TableSchema {
TableSchema {
keyspace: "test_ks".to_string(),
table: name.to_string(),
partition_keys: vec![crate::schema::KeyColumn {
name: "id".to_string(),
data_type: "uuid".to_string(),
position: 0,
}],
clustering_keys: vec![],
columns: vec![crate::schema::Column {
name: "id".to_string(),
data_type: "uuid".to_string(),
nullable: false,
default: None,
is_static: false,
}],
comments: HashMap::new(),
}
}
// A fresh registry reports no schemas.
#[tokio::test]
async fn test_schema_registry_creation() {
let registry = make_registry(SchemaRegistryConfig::default()).await;
let stats = registry.get_statistics().await.unwrap();
assert_eq!(stats.total_schemas, 0);
}
// Only checks that `SchemaQuery` stores the given values; no pattern
// matching is exercised here.
#[test]
fn test_schema_query_creation() {
let query = SchemaQuery {
keyspace: Some("test_ks".to_string()),
table_pattern: Some("user_*".to_string()),
source_types: None,
validated_only: false,
include_history: false,
};
assert_eq!(query.keyspace.as_ref().unwrap(), "test_ks");
assert_eq!(query.table_pattern.as_ref().unwrap(), "user_*");
}
// A manually registered schema can be read back before its TTL expires.
#[tokio::test]
async fn register_and_retrieve_schema() {
let registry = make_registry(SchemaRegistryConfig::default()).await;
let schema = simple_schema("users");
registry
.register_schema(schema.clone(), SchemaSource::Manual)
.await
.expect("register schema");
let fetched = registry
.get_schema("test_ks", "users")
.await
.expect("fetch schema");
assert_eq!(fetched.table, "users");
assert_eq!(fetched.partition_keys.len(), 1);
}
// The first registration records no version (nothing is replaced). The
// second records version 1, whose change list holds the `ColumnAdded`
// "initial" marker.
#[tokio::test]
async fn schema_version_history_tracks_changes() {
let registry = make_registry(SchemaRegistryConfig::default()).await;
let mut schema = simple_schema("accounts");
registry
.register_schema(schema.clone(), SchemaSource::Manual)
.await
.expect("register v1");
schema.columns.push(crate::schema::Column {
name: "status".to_string(),
data_type: "text".to_string(),
nullable: true,
default: None,
is_static: false,
});
registry
.register_schema(schema.clone(), SchemaSource::Manual)
.await
.expect("register v2");
let history = registry
.get_schema_history("test_ks", "accounts")
.await
.expect("history");
assert_eq!(
history.len(),
1,
"Second registration should emit first version"
);
assert!(history[0]
.changes
.iter()
.any(|change| matches!(change.change_type, SchemaChangeType::ColumnAdded)));
}
// With a zero TTL the entry expires immediately. `get_schema` then goes
// through `refresh_schema` -> `auto_discover_schema`, which fails because
// `find_sstable_files` returns no files.
// NOTE(review): `make_registry` resets `enable_auto_discovery` to false, so
// the assignment below has no effect. The refresh path does not consult
// that flag.
#[tokio::test]
async fn expired_cached_schema_invokes_discovery_path() {
let mut config = SchemaRegistryConfig::default();
config.cache_ttl_seconds = 0;
config.enable_auto_discovery = true;
let registry = make_registry(config).await;
registry
.register_schema(simple_schema("events"), SchemaSource::Manual)
.await
.expect("register events schema");
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let err = registry
.get_schema("test_ks", "events")
.await
.expect_err("expired schema should attempt discovery");
assert!(matches!(err, Error::Schema(message) if message.contains("No SSTables found")));
}
}