Skip to main content

cqlite_core/schema/
registry.rs

1//! Schema Registry for Centralized Schema Management
2//!
3//! This module provides a centralized registry for managing table schemas, UDTs,
4//! and other schema-related information with support for schema discovery,
5//! validation, caching, and version management.
6
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::SystemTime;
11
12use serde::{Deserialize, Serialize};
13use tokio::sync::RwLock;
14
15use crate::{
16    platform::Platform,
17    schema::{
18        discovery::{SchemaDiscoveryConfig, SchemaDiscoveryEngine, SchemaInfo},
19        CqlType, TableSchema, UdtRegistry,
20    },
21    types::{ComparatorType, UdtTypeDef},
22    Config, Error, Result,
23};
24
/// Configuration for [`SchemaRegistry`].
///
/// The `Default` impl enables discovery, caching, versioning and validation, and
/// leaves auto-refresh on SSTable changes disabled.
#[derive(Debug, Clone)]
pub struct SchemaRegistryConfig {
    /// Enable automatic schema discovery.
    ///
    /// When `false`, `discover_schema` returns an error and `get_schema` does not
    /// fall back to auto-discovery for tables that are not registered.
    pub enable_auto_discovery: bool,
    /// Enable schema caching
    // NOTE(review): not read by any code visible in this chunk — confirm where it is honored.
    pub enable_caching: bool,
    /// Cache TTL in seconds (default: 3600, i.e. one hour)
    // NOTE(review): presumably consumed by `is_entry_expired`, which is not visible here — confirm.
    pub cache_ttl_seconds: u64,
    /// Enable schema versioning.
    ///
    /// When `true`, re-registering an existing table records a version and
    /// `get_schema_history` is available. When `false`, that call returns an error.
    pub enable_versioning: bool,
    /// Maximum versions to keep per schema (default: 5)
    pub max_versions_per_schema: usize,
    /// Enable schema validation at registration time.
    ///
    /// When `false`, `register_schema` stores entries as `NotValidated`.
    pub enable_validation: bool,
    /// Auto-refresh schemas on SSTable changes (default: `false`)
    pub auto_refresh_on_changes: bool,
    /// Configuration forwarded to the [`SchemaDiscoveryEngine`] when the registry is built
    pub discovery_config: SchemaDiscoveryConfig,
}
45
46impl Default for SchemaRegistryConfig {
47    fn default() -> Self {
48        Self {
49            enable_auto_discovery: true,
50            enable_caching: true,
51            cache_ttl_seconds: 3600, // 1 hour
52            enable_versioning: true,
53            max_versions_per_schema: 5,
54            enable_validation: true,
55            auto_refresh_on_changes: false, // Disabled by default for performance
56            discovery_config: SchemaDiscoveryConfig::default(),
57        }
58    }
59}
60
/// Centralized schema registry.
///
/// Holds table schemas keyed by `"keyspace.table"`, a shared UDT registry, the
/// discovery engine, the validator, and per-table version history. The mutable
/// stores are wrapped in `Arc<tokio::sync::RwLock<..>>`, so methods take `&self`
/// and lock internally.
#[derive(Debug)]
pub struct SchemaRegistry {
    /// Registry configuration
    config: SchemaRegistryConfig,
    /// Platform abstraction (passed to the discovery engine at construction; not read
    /// elsewhere in the code visible here)
    _platform: Arc<Platform>,
    /// Core configuration (passed to the discovery engine at construction; not read
    /// elsewhere in the code visible here)
    _core_config: Config,
    /// Registered table schemas, keyed by `"keyspace.table"`
    schemas: Arc<RwLock<HashMap<String, SchemaEntry>>>,
    /// UDT registry for managing user-defined types (also handed out via `get_udt_registry`)
    udt_registry: Arc<RwLock<UdtRegistry>>,
    /// Schema discovery engine
    discovery_engine: Arc<SchemaDiscoveryEngine>,
    /// Schema validator used by `register_schema`
    validator: Arc<SchemaValidator>,
    /// Schema version history, keyed by `"keyspace.table"`
    version_history: Arc<RwLock<HashMap<String, Vec<SchemaVersion>>>>,
}
81
/// Schema entry in the registry.
///
/// Pairs a table schema with bookkeeping about where it came from and whether it
/// has been validated.
#[derive(Debug, Clone)]
struct SchemaEntry {
    /// The table schema
    schema: TableSchema,
    /// Extended schema information if available (`register_schema` always stores `None`)
    extended_info: Option<SchemaInfo>,
    /// When the schema was registered/updated
    // NOTE(review): presumably what `is_entry_expired` compares against the cache TTL — confirm.
    registered_at: SystemTime,
    /// Source of the schema
    source: SchemaSource,
    /// Validation status, set at registration and overwritten by `validate_schema`
    validation_status: SchemaValidationStatus,
    /// Associated SSTable files (empty for entries created by `register_schema`)
    _associated_files: Vec<PathBuf>,
}
98
/// Source of schema information, i.e. where a registered schema came from.
#[derive(Debug, Clone)]
pub enum SchemaSource {
    /// Discovered from the listed SSTable files
    Discovered(Vec<PathBuf>),
    /// Loaded from an external definition at the given path
    External(PathBuf),
    /// Parsed from CQL DDL (presumably carries the DDL text — confirm with producers)
    Cql(String),
    /// Manually registered
    Manual,
}
111
/// Schema validation status recorded on each registry entry.
///
/// `register_schema` only ever produces `Valid`, `Invalid` or `NotValidated`.
/// `validate_schema` can additionally produce `ValidWithWarnings`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchemaValidationStatus {
    /// Schema is valid
    Valid,
    /// Schema has warnings but is usable
    ValidWithWarnings,
    /// Schema is invalid
    Invalid,
    /// Not yet validated (validation disabled at registration time)
    NotValidated,
}
124
/// One entry in a table's schema version history (see `get_schema_history`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaVersion {
    /// Version number
    pub version: u32,
    /// When this version was created
    pub created_at: SystemTime,
    /// Schema at this version
    pub schema: TableSchema,
    /// Changes from the previous version
    pub changes: Vec<SchemaChange>,
    /// Source of this version, as free-form text
    pub source: String,
}
139
/// Description of a single change between two schema versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaChange {
    /// Type of change
    pub change_type: SchemaChangeType,
    /// Component affected (e.g. a column or UDT name)
    pub component: String,
    /// Human-readable description of the change
    pub description: String,
    /// Old value, if applicable (e.g. `None` for an addition)
    pub old_value: Option<String>,
    /// New value, if applicable (e.g. `None` for a removal)
    pub new_value: Option<String>,
}
154
/// Kinds of change recorded in a [`SchemaChange`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SchemaChangeType {
    /// Column added
    ColumnAdded,
    /// Column removed
    ColumnRemoved,
    /// Column type changed
    ColumnTypeChanged,
    /// Column renamed
    ColumnRenamed,
    /// Index added
    IndexAdded,
    /// Index removed
    IndexRemoved,
    /// UDT added
    UdtAdded,
    /// UDT modified
    UdtModified,
    /// UDT removed
    UdtRemoved,
    /// Table option changed
    TableOptionChanged,
}
179
/// Result of `SchemaRegistry::validate_schema` for one table.
#[derive(Debug, Clone)]
pub struct ValidationReport {
    /// Table identifier in `"keyspace.table"` form
    pub table_id: String,
    /// Overall status: `Invalid` if any error, `ValidWithWarnings` if only warnings,
    /// otherwise `Valid`
    pub status: SchemaValidationStatus,
    /// Validation errors
    pub errors: Vec<ValidationError>,
    /// Validation warnings
    pub warnings: Vec<ValidationWarning>,
    /// Performance/design recommendations
    pub recommendations: Vec<String>,
    /// When the report was produced
    pub validated_at: SystemTime,
}
196
/// A single validation error inside a [`ValidationReport`].
#[derive(Debug, Clone)]
pub struct ValidationError {
    /// Machine-readable error code (e.g. `"SCHEMA_INVALID"`)
    pub code: String,
    /// Human-readable error message
    pub message: String,
    /// Affected component, if the error is specific to one
    pub component: Option<String>,
    /// Severity level
    pub severity: ErrorSeverity,
}
209
/// Severity of a [`ValidationError`], listed from most to least severe.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorSeverity {
    /// Used by `validate_schema` for `SCHEMA_INVALID` structural failures
    Critical,
    /// High severity
    High,
    /// Medium severity
    Medium,
    /// Low severity
    Low,
}
218
/// A single non-fatal finding inside a [`ValidationReport`].
#[derive(Debug, Clone)]
pub struct ValidationWarning {
    /// Machine-readable warning code
    pub code: String,
    /// Human-readable warning message
    pub message: String,
    /// Affected component, if the warning is specific to one
    pub component: Option<String>,
}
229
/// Filter passed to `SchemaRegistry::list_schemas`.
// NOTE(review): `list_schemas` hands only the entry's `TableSchema` to `matches_query`,
// so `source_types` / `validated_only` (which live on the registry entry) may not be
// enforceable there — confirm against `matches_query`.
#[derive(Debug, Clone)]
pub struct SchemaQuery {
    /// Keyspace filter (optional)
    pub keyspace: Option<String>,
    /// Table name pattern (supports wildcards)
    pub table_pattern: Option<String>,
    /// Include schemas with specific source types
    pub source_types: Option<Vec<SchemaSource>>,
    /// Include only validated schemas
    pub validated_only: bool,
    /// Include version history
    pub include_history: bool,
}
244
245impl SchemaRegistry {
246    /// Create a new schema registry
247    pub async fn new(
248        config: SchemaRegistryConfig,
249        platform: Arc<Platform>,
250        core_config: Config,
251    ) -> Result<Self> {
252        let discovery_engine = Arc::new(
253            SchemaDiscoveryEngine::new(
254                config.discovery_config.clone(),
255                platform.clone(),
256                core_config.clone(),
257            )
258            .await?,
259        );
260
261        let validator = Arc::new(SchemaValidator::new());
262        let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
263
264        Ok(Self {
265            config,
266            _platform: platform,
267            _core_config: core_config,
268            schemas: Arc::new(RwLock::new(HashMap::new())),
269            udt_registry,
270            discovery_engine,
271            validator,
272            version_history: Arc::new(RwLock::new(HashMap::new())),
273        })
274    }
275
276    /// Discover and register schema from SSTable files
277    pub async fn discover_schema(
278        &self,
279        keyspace: &str,
280        table: &str,
281        sstable_files: &[PathBuf],
282    ) -> Result<TableSchema> {
283        if !self.config.enable_auto_discovery {
284            return Err(Error::Schema("Auto-discovery is disabled".to_string()));
285        }
286
287        // Use discovery engine to analyze SSTable files
288        let schema_info = self
289            .discovery_engine
290            .discover_schema(keyspace, table, sstable_files)
291            .await?;
292
293        // Convert to TableSchema format for compatibility
294        let table_schema = self.convert_schema_info_to_table_schema(&schema_info)?;
295
296        // Register the discovered schema
297        self.register_discovered_schema(
298            table_schema.clone(),
299            Some(schema_info),
300            sstable_files.to_vec(),
301        )
302        .await?;
303
304        Ok(table_schema)
305    }
306
    /// Register (or replace) a schema obtained from an external source.
    ///
    /// The entry is keyed by `"{keyspace}.{table}"` and stored without extended
    /// info or associated files. When validation is enabled, the schema is checked
    /// with the registry's validator and stored as `Valid` or `Invalid`. Otherwise
    /// it is stored as `NotValidated`. A validation failure only affects the stored
    /// status; it does not make this call fail.
    ///
    /// # Errors
    /// Propagates errors from `create_schema_version`, which is called only when
    /// versioning is enabled and the table was already registered. In that case
    /// the new schema is NOT inserted.
    pub async fn register_schema(&self, schema: TableSchema, source: SchemaSource) -> Result<()> {
        let table_id = format!("{}.{}", schema.keyspace, schema.table);

        // Validate schema if validation is enabled. Only the pass/fail outcome is
        // kept; the validator's error detail is discarded.
        let validation_status = if self.config.enable_validation {
            match self.validator.validate_table_schema(&schema).await {
                Ok(_) => SchemaValidationStatus::Valid,
                Err(_) => SchemaValidationStatus::Invalid,
            }
        } else {
            SchemaValidationStatus::NotValidated
        };

        // Create schema entry
        let entry = SchemaEntry {
            schema: schema.clone(),
            extended_info: None,
            registered_at: SystemTime::now(),
            source,
            validation_status,
            _associated_files: Vec::new(),
        };

        // Store in registry
        {
            let mut schemas = self.schemas.write().await;

            // Check if we need to create a new version (only when replacing an
            // existing entry).
            // NOTE(review): the `schemas` write guard is still held across this await.
            // If `create_schema_version` (not visible here) ever locks `self.schemas`,
            // this deadlocks — confirm it only touches `version_history`.
            if self.config.enable_versioning && schemas.contains_key(&table_id) {
                self.create_schema_version(&table_id, &schema).await?;
            }

            schemas.insert(table_id, entry);
        }

        Ok(())
    }
345
346    /// Get schema by keyspace and table name
347    pub async fn get_schema(&self, keyspace: &str, table: &str) -> Result<TableSchema> {
348        let table_id = format!("{}.{}", keyspace, table);
349        let schemas = self.schemas.read().await;
350
351        match schemas.get(&table_id) {
352            Some(entry) => {
353                // Check if schema is still valid (cache TTL)
354                if self.is_entry_expired(entry) {
355                    drop(schemas); // Release read lock
356                    return self.refresh_schema(keyspace, table).await;
357                }
358                Ok(entry.schema.clone())
359            }
360            None => {
361                drop(schemas); // Release read lock
362                               // Try to discover schema if auto-discovery is enabled
363                if self.config.enable_auto_discovery {
364                    self.auto_discover_schema(keyspace, table).await
365                } else {
366                    Err(Error::Schema(format!(
367                        "Schema not found: {}.{}",
368                        keyspace, table
369                    )))
370                }
371            }
372        }
373    }
374
375    /// Get extended schema information
376    pub async fn get_schema_info(&self, keyspace: &str, table: &str) -> Result<Option<SchemaInfo>> {
377        let table_id = format!("{}.{}", keyspace, table);
378        let schemas = self.schemas.read().await;
379
380        match schemas.get(&table_id) {
381            Some(entry) => Ok(entry.extended_info.clone()),
382            None => Ok(None),
383        }
384    }
385
386    /// List all registered schemas
387    pub async fn list_schemas(&self, query: Option<SchemaQuery>) -> Result<Vec<TableSchema>> {
388        let schemas = self.schemas.read().await;
389        let mut results = Vec::new();
390
391        for (_table_id, entry) in schemas.iter() {
392            // Apply query filters if provided
393            if let Some(ref q) = query {
394                if !self.matches_query(&entry.schema, q) {
395                    continue;
396                }
397            }
398
399            results.push(entry.schema.clone());
400        }
401
402        // Sort by keyspace, then table name
403        results.sort_by(|a, b| {
404            a.keyspace
405                .cmp(&b.keyspace)
406                .then_with(|| a.table.cmp(&b.table))
407        });
408
409        Ok(results)
410    }
411
412    /// Validate a schema
413    #[allow(dead_code)]
414    pub async fn validate_schema(&self, keyspace: &str, table: &str) -> Result<ValidationReport> {
415        let schema = self.get_schema(keyspace, table).await?;
416        let table_id = format!("{}.{}", keyspace, table);
417
418        // Perform comprehensive validation
419        let mut errors = Vec::new();
420        let mut warnings = Vec::new();
421        let mut recommendations = Vec::new();
422
423        // Basic schema structure validation
424        if let Err(e) = schema.validate() {
425            errors.push(ValidationError {
426                code: "SCHEMA_INVALID".to_string(),
427                message: e.to_string(),
428                component: None,
429                severity: ErrorSeverity::Critical,
430            });
431        }
432
433        // UDT validation
434        self.validate_schema_udts(&schema, &mut errors, &mut warnings)
435            .await;
436
437        // Column type validation
438        self.validate_column_types(&schema, &mut errors, &mut warnings)
439            .await;
440
441        // Performance recommendations
442        self.generate_performance_recommendations(&schema, &mut recommendations)
443            .await;
444
445        // Determine overall status
446        let status = if !errors.is_empty() {
447            SchemaValidationStatus::Invalid
448        } else if !warnings.is_empty() {
449            SchemaValidationStatus::ValidWithWarnings
450        } else {
451            SchemaValidationStatus::Valid
452        };
453
454        // Update validation status in registry
455        {
456            let mut schemas = self.schemas.write().await;
457            if let Some(entry) = schemas.get_mut(&table_id) {
458                entry.validation_status = status.clone();
459            }
460        }
461
462        Ok(ValidationReport {
463            table_id,
464            status,
465            errors,
466            warnings,
467            recommendations,
468            validated_at: SystemTime::now(),
469        })
470    }
471
472    /// Get schema version history
473    pub async fn get_schema_history(
474        &self,
475        keyspace: &str,
476        table: &str,
477    ) -> Result<Vec<SchemaVersion>> {
478        if !self.config.enable_versioning {
479            return Err(Error::Schema("Schema versioning is disabled".to_string()));
480        }
481
482        let table_id = format!("{}.{}", keyspace, table);
483        let history = self.version_history.read().await;
484
485        Ok(history.get(&table_id).cloned().unwrap_or_default())
486    }
487
488    /// Remove schema from registry
489    pub async fn remove_schema(&self, keyspace: &str, table: &str) -> Result<()> {
490        let table_id = format!("{}.{}", keyspace, table);
491
492        {
493            let mut schemas = self.schemas.write().await;
494            schemas.remove(&table_id);
495        }
496
497        // Also remove from version history if versioning is enabled
498        if self.config.enable_versioning {
499            let mut history = self.version_history.write().await;
500            history.remove(&table_id);
501        }
502
503        Ok(())
504    }
505
506    /// Generate CQL CREATE statement for schema
507    pub async fn generate_cql(&self, keyspace: &str, table: &str) -> Result<String> {
508        // First try to get extended schema info for better CQL generation
509        if let Some(schema_info) = self.get_schema_info(keyspace, table).await? {
510            return self.discovery_engine.generate_cql(&schema_info).await;
511        }
512
513        // Fallback to basic TableSchema CQL generation
514        let schema = self.get_schema(keyspace, table).await?;
515        Ok(self.generate_basic_cql(&schema))
516    }
517
518    /// Export schema as JSON
519    #[cfg(feature = "experimental")]
520    pub async fn export_schema_json(&self, keyspace: &str, table: &str) -> Result<String> {
521        self.export_schema_json_with_config(
522            keyspace,
523            table,
524            &crate::schema::json_exporter::JsonExportConfig::default(),
525        )
526        .await
527    }
528
529    #[cfg(not(feature = "experimental"))]
530    pub async fn export_schema_json(&self, _keyspace: &str, _table: &str) -> Result<String> {
531        Err(crate::error::Error::unsupported_format(
532            "JSON export requires experimental feature",
533        ))
534    }
535
536    /// Export schema as JSON with custom configuration
537    #[cfg(feature = "experimental")]
538    pub async fn export_schema_json_with_config(
539        &self,
540        keyspace: &str,
541        table: &str,
542        config: &crate::schema::json_exporter::JsonExportConfig,
543    ) -> Result<String> {
544        // Try extended schema info first
545        if let Some(schema_info) = self.get_schema_info(keyspace, table).await? {
546            return self
547                .discovery_engine
548                .export_json_with_config(&schema_info, config)
549                .await;
550        }
551
552        // Fallback to basic TableSchema JSON
553        let schema = self.get_schema(keyspace, table).await?;
554        let exporter = crate::schema::json_exporter::JsonExporter::with_config(config.clone());
555        exporter.export_table_schema(&schema)
556    }
557
558    #[cfg(not(feature = "experimental"))]
559    pub async fn export_schema_json_with_config<T>(
560        &self,
561        _keyspace: &str,
562        _table: &str,
563        _config: &T,
564    ) -> Result<String> {
565        Err(crate::error::Error::unsupported_format(
566            "JSON export requires experimental feature",
567        ))
568    }
569
570    /// Export schema as compact JSON (minimal format)
571    #[cfg(feature = "experimental")]
572    pub async fn export_schema_json_compact(&self, keyspace: &str, table: &str) -> Result<String> {
573        let config = crate::schema::json_exporter::JsonExportConfig {
574            format_variant: crate::schema::json_exporter::JsonFormat::Compact,
575            include_metadata: false,
576            include_performance_metrics: false,
577            include_type_details: false,
578            pretty_format: false,
579            ..Default::default()
580        };
581        self.export_schema_json_with_config(keyspace, table, &config)
582            .await
583    }
584
585    #[cfg(not(feature = "experimental"))]
586    pub async fn export_schema_json_compact(
587        &self,
588        _keyspace: &str,
589        _table: &str,
590    ) -> Result<String> {
591        Err(crate::error::Error::unsupported_format(
592            "JSON export requires experimental feature",
593        ))
594    }
595
596    /// Export schema for API documentation (OpenAPI-compatible format)
597    #[cfg(feature = "experimental")]
598    pub async fn export_schema_json_openapi(&self, keyspace: &str, table: &str) -> Result<String> {
599        let config = crate::schema::json_exporter::JsonExportConfig {
600            format_variant: crate::schema::json_exporter::JsonFormat::OpenApi,
601            include_documentation: true,
602            include_type_details: true,
603            include_metadata: false,
604            ..Default::default()
605        };
606        self.export_schema_json_with_config(keyspace, table, &config)
607            .await
608    }
609
610    #[cfg(not(feature = "experimental"))]
611    pub async fn export_schema_json_openapi(
612        &self,
613        _keyspace: &str,
614        _table: &str,
615    ) -> Result<String> {
616        Err(crate::error::Error::unsupported_format(
617            "JSON export requires experimental feature",
618        ))
619    }
620
621    /// Export schema for data pipeline tools
622    #[cfg(feature = "experimental")]
623    pub async fn export_schema_json_pipeline(&self, keyspace: &str, table: &str) -> Result<String> {
624        let config = crate::schema::json_exporter::JsonExportConfig {
625            format_variant: crate::schema::json_exporter::JsonFormat::DataPipeline,
626            include_type_details: true,
627            include_table_options: false,
628            include_performance_metrics: true,
629            ..Default::default()
630        };
631        self.export_schema_json_with_config(keyspace, table, &config)
632            .await
633    }
634
635    #[cfg(not(feature = "experimental"))]
636    pub async fn export_schema_json_pipeline(
637        &self,
638        _keyspace: &str,
639        _table: &str,
640    ) -> Result<String> {
641        Err(crate::error::Error::unsupported_format(
642            "JSON export requires experimental feature",
643        ))
644    }
645
646    /// Export multiple schemas as a JSON collection
647    #[cfg(feature = "experimental")]
648    pub async fn export_multiple_schemas_json(
649        &self,
650        schema_infos: &[SchemaInfo],
651    ) -> Result<String> {
652        let exporter = crate::schema::json_exporter::JsonExporter::new();
653        exporter.export_multiple_schemas(schema_infos)
654    }
655
656    #[cfg(not(feature = "experimental"))]
657    pub async fn export_multiple_schemas_json(
658        &self,
659        _schema_infos: &[SchemaInfo],
660    ) -> Result<String> {
661        Err(crate::error::Error::unsupported_format(
662            "JSON export requires experimental feature",
663        ))
664    }
665
666    /// Export all schemas in a keyspace as JSON collection
667    #[cfg(feature = "experimental")]
668    pub async fn export_keyspace_schemas_json(&self, keyspace: &str) -> Result<String> {
669        let mut schema_infos = Vec::new();
670
671        // Get all schemas in the keyspace
672        for (_table_id, entry) in self.schemas.read().await.iter() {
673            if entry.schema.keyspace == keyspace {
674                // Try to get extended schema info
675                if let Ok(Some(schema_info)) = self
676                    .get_schema_info(&entry.schema.keyspace, &entry.schema.table)
677                    .await
678                {
679                    schema_infos.push(schema_info);
680                }
681            }
682        }
683
684        if schema_infos.is_empty() {
685            return Err(Error::NotFound(format!(
686                "No schemas found in keyspace '{}'",
687                keyspace
688            )));
689        }
690
691        self.export_multiple_schemas_json(&schema_infos).await
692    }
693
694    #[cfg(not(feature = "experimental"))]
695    pub async fn export_keyspace_schemas_json(&self, _keyspace: &str) -> Result<String> {
696        Err(crate::error::Error::unsupported_format(
697            "JSON export requires experimental feature",
698        ))
699    }
700
701    /// Register UDT in the registry
702    pub async fn register_udt(&self, udt_def: UdtTypeDef) -> Result<()> {
703        let mut registry = self.udt_registry.write().await;
704        registry.register_udt(udt_def);
705        Ok(())
706    }
707
708    /// Get UDT definition
709    pub async fn get_udt(&self, keyspace: &str, name: &str) -> Result<Option<UdtTypeDef>> {
710        let registry = self.udt_registry.read().await;
711        Ok(registry.get_udt(keyspace, name).cloned())
712    }
713
714    /// Get the internal UDT registry (crate-only access for schema manager)
715    ///
716    /// This method is used by SchemaManager when initialized with a pre-loaded registry
717    /// to preserve the UDT definitions loaded during ingestion.
718    pub(crate) fn get_udt_registry(&self) -> Arc<RwLock<UdtRegistry>> {
719        self.udt_registry.clone()
720    }
721
722    /// Get ComparatorType for a specific column in a table
723    pub async fn get_column_comparator(
724        &self,
725        keyspace: &str,
726        table: &str,
727        column: &str,
728    ) -> Result<ComparatorType> {
729        let schema = self.get_schema(keyspace, table).await?;
730
731        // Find the column
732        let column_def = schema
733            .columns
734            .iter()
735            .find(|c| c.name == column)
736            .ok_or_else(|| {
737                Error::Schema(format!(
738                    "Column '{}' not found in table '{}.{}'",
739                    column, keyspace, table
740                ))
741            })?;
742
743        // Parse the column type and create comparator
744        let cql_type = CqlType::parse(&column_def.data_type)?;
745        ComparatorType::from_cql_type(&cql_type)
746    }
747
748    /// Get ComparatorType for all columns in a table
749    pub async fn get_table_comparators(
750        &self,
751        keyspace: &str,
752        table: &str,
753    ) -> Result<HashMap<String, ComparatorType>> {
754        let schema = self.get_schema(keyspace, table).await?;
755        let mut comparators = HashMap::new();
756
757        for column in &schema.columns {
758            let cql_type = CqlType::parse(&column.data_type)?;
759            let comparator = ComparatorType::from_cql_type(&cql_type)?;
760            comparators.insert(column.name.clone(), comparator);
761        }
762
763        Ok(comparators)
764    }
765
766    /// Get ComparatorType for partition key columns (for key comparison)
767    pub async fn get_partition_key_comparator(
768        &self,
769        keyspace: &str,
770        table: &str,
771    ) -> Result<Vec<ComparatorType>> {
772        let schema = self.get_schema(keyspace, table).await?;
773        let mut comparators = Vec::new();
774
775        // Get partition keys in order
776        let ordered_keys = schema.ordered_partition_keys();
777        for key_column in ordered_keys {
778            let cql_type = CqlType::parse(&key_column.data_type)?;
779            let comparator = ComparatorType::from_cql_type(&cql_type)?;
780            comparators.push(comparator);
781        }
782
783        Ok(comparators)
784    }
785
786    /// Get the complete schema context for parsing operations
787    pub async fn get_parsing_context(&self, keyspace: &str, table: &str) -> Result<ParsingContext> {
788        let schema = self.get_schema(keyspace, table).await?;
789        let partition_comparators = self.get_partition_key_comparator(keyspace, table).await?;
790        let clustering_comparators = self.get_clustering_key_comparator(keyspace, table).await?;
791        let column_comparators = self.get_table_comparators(keyspace, table).await?;
792
793        Ok(ParsingContext {
794            schema,
795            partition_comparators,
796            clustering_comparators,
797            column_comparators,
798        })
799    }
800
801    /// Get ComparatorType for clustering key columns (for clustering comparison)
802    pub async fn get_clustering_key_comparator(
803        &self,
804        keyspace: &str,
805        table: &str,
806    ) -> Result<Vec<ComparatorType>> {
807        let schema = self.get_schema(keyspace, table).await?;
808        let mut comparators = Vec::new();
809
810        // Get clustering keys in order
811        let ordered_keys = schema.ordered_clustering_keys();
812        for key_column in ordered_keys {
813            let cql_type = CqlType::parse(&key_column.data_type)?;
814            let comparator = ComparatorType::from_cql_type(&cql_type)?;
815            comparators.push(comparator);
816        }
817
818        Ok(comparators)
819    }
820
821    /// Validate column type compatibility using ComparatorType
822    pub async fn validate_column_type_compatibility(
823        &self,
824        keyspace: &str,
825        table: &str,
826        column: &str,
827        expected_type: &str,
828    ) -> Result<bool> {
829        let column_comparator = self.get_column_comparator(keyspace, table, column).await?;
830        let expected_cql_type = CqlType::parse(expected_type)?;
831        let expected_comparator = ComparatorType::from_cql_type(&expected_cql_type)?;
832
833        // Check if comparators are compatible (same type structure)
834        Ok(self.comparators_are_compatible(&column_comparator, &expected_comparator))
835    }
836
837    /// Check if two ComparatorTypes are compatible
838    #[allow(clippy::only_used_in_recursion)]
839    fn comparators_are_compatible(&self, left: &ComparatorType, right: &ComparatorType) -> bool {
840        match (left, right) {
841            // Exact matches
842            (ComparatorType::Boolean, ComparatorType::Boolean) => true,
843            (ComparatorType::TinyInt, ComparatorType::TinyInt) => true,
844            (ComparatorType::SmallInt, ComparatorType::SmallInt) => true,
845            (ComparatorType::Int, ComparatorType::Int) => true,
846            (ComparatorType::BigInt, ComparatorType::BigInt) => true,
847            (ComparatorType::Float32, ComparatorType::Float32) => true,
848            (ComparatorType::Float, ComparatorType::Float) => true,
849            (ComparatorType::Text, ComparatorType::Text) => true,
850            (ComparatorType::Blob, ComparatorType::Blob) => true,
851            (ComparatorType::Timestamp, ComparatorType::Timestamp) => true,
852            (ComparatorType::Uuid, ComparatorType::Uuid) => true,
853            (ComparatorType::Json, ComparatorType::Json) => true,
854
855            // Collection types
856            (ComparatorType::List(l_elem), ComparatorType::List(r_elem)) => {
857                self.comparators_are_compatible(l_elem, r_elem)
858            }
859            (ComparatorType::Set(l_elem), ComparatorType::Set(r_elem)) => {
860                self.comparators_are_compatible(l_elem, r_elem)
861            }
862            (ComparatorType::Map(l_key, l_val), ComparatorType::Map(r_key, r_val)) => {
863                self.comparators_are_compatible(l_key, r_key)
864                    && self.comparators_are_compatible(l_val, r_val)
865            }
866
867            // Tuple types
868            (ComparatorType::Tuple(l_fields), ComparatorType::Tuple(r_fields)) => {
869                l_fields.len() == r_fields.len()
870                    && l_fields
871                        .iter()
872                        .zip(r_fields.iter())
873                        .all(|(l, r)| self.comparators_are_compatible(l, r))
874            }
875
876            // UDT types
877            (
878                ComparatorType::Udt {
879                    type_name: l_name,
880                    keyspace: l_ks,
881                    ..
882                },
883                ComparatorType::Udt {
884                    type_name: r_name,
885                    keyspace: r_ks,
886                    ..
887                },
888            ) => l_name == r_name && l_ks == r_ks,
889
890            // Frozen types
891            (ComparatorType::Frozen(l_inner), ComparatorType::Frozen(r_inner)) => {
892                self.comparators_are_compatible(l_inner, r_inner)
893            }
894
895            // Custom types
896            (ComparatorType::Custom(l_name), ComparatorType::Custom(r_name)) => l_name == r_name,
897
898            // No other combinations are compatible
899            _ => false,
900        }
901    }
902
903    /// Get registry statistics
904    pub async fn get_statistics(&self) -> Result<RegistryStatistics> {
905        let schemas = self.schemas.read().await;
906        let udt_registry = self.udt_registry.read().await;
907        let version_history = self.version_history.read().await;
908
909        let mut stats = RegistryStatistics {
910            total_schemas: schemas.len(),
911            schemas_by_keyspace: HashMap::new(),
912            validated_schemas: 0,
913            schemas_with_warnings: 0,
914            invalid_schemas: 0,
915            total_udts: udt_registry.total_udts(),
916            total_versions: version_history.values().map(|v| v.len()).sum(),
917            auto_discovered_schemas: 0,
918            manually_registered_schemas: 0,
919            cache_hit_rate: 0.0, // TODO: Implement cache metrics
920        };
921
922        // Analyze schema distribution and status
923        for entry in schemas.values() {
924            let keyspace = &entry.schema.keyspace;
925            *stats
926                .schemas_by_keyspace
927                .entry(keyspace.clone())
928                .or_insert(0) += 1;
929
930            match entry.validation_status {
931                SchemaValidationStatus::Valid => stats.validated_schemas += 1,
932                SchemaValidationStatus::ValidWithWarnings => stats.schemas_with_warnings += 1,
933                SchemaValidationStatus::Invalid => stats.invalid_schemas += 1,
934                SchemaValidationStatus::NotValidated => {}
935            }
936
937            match entry.source {
938                SchemaSource::Discovered(_) => stats.auto_discovered_schemas += 1,
939                _ => stats.manually_registered_schemas += 1,
940            }
941        }
942
943        Ok(stats)
944    }
945
946    // Private helper methods
947
948    async fn register_discovered_schema(
949        &self,
950        schema: TableSchema,
951        schema_info: Option<SchemaInfo>,
952        sstable_files: Vec<PathBuf>,
953    ) -> Result<()> {
954        let table_id = format!("{}.{}", schema.keyspace, schema.table);
955        let source = SchemaSource::Discovered(sstable_files.clone());
956
957        let entry = SchemaEntry {
958            schema,
959            extended_info: schema_info,
960            registered_at: SystemTime::now(),
961            source,
962            validation_status: SchemaValidationStatus::Valid, // Discovery implies validation
963            _associated_files: sstable_files,
964        };
965
966        let mut schemas = self.schemas.write().await;
967        schemas.insert(table_id, entry);
968
969        Ok(())
970    }
971
972    fn convert_schema_info_to_table_schema(&self, schema_info: &SchemaInfo) -> Result<TableSchema> {
973        let mut columns = Vec::new();
974        let mut partition_keys = Vec::new();
975        let mut clustering_keys = Vec::new();
976
977        // Convert partition keys
978        for (pos, pk) in schema_info.partition_key.iter().enumerate() {
979            partition_keys.push(crate::schema::KeyColumn {
980                name: pk.name.clone(),
981                data_type: pk.data_type.clone(),
982                position: pos,
983            });
984        }
985
986        // Convert clustering keys
987        for ck in &schema_info.clustering_keys {
988            clustering_keys.push(ck.clone());
989        }
990
991        // Convert all columns
992        for col in &schema_info.regular_columns {
993            columns.push(crate::schema::Column {
994                name: col.name.clone(),
995                data_type: col.data_type.clone(),
996                nullable: col.nullable,
997                default: None, // ColumnDefinition doesn't have default_value
998                is_static: false,
999            });
1000        }
1001
1002        // Add static columns
1003        for col in &schema_info.static_columns {
1004            columns.push(crate::schema::Column {
1005                name: col.name.clone(),
1006                data_type: col.data_type.clone(),
1007                nullable: col.nullable,
1008                default: None, // ColumnDefinition doesn't have default_value
1009                is_static: true,
1010            });
1011        }
1012
1013        Ok(TableSchema {
1014            keyspace: schema_info.keyspace.clone(),
1015            table: schema_info.table.clone(),
1016            partition_keys,
1017            clustering_keys,
1018            columns,
1019            comments: HashMap::new(),
1020        })
1021    }
1022
1023    fn is_entry_expired(&self, entry: &SchemaEntry) -> bool {
1024        if !self.config.enable_caching {
1025            return false;
1026        }
1027
1028        let ttl = std::time::Duration::from_secs(self.config.cache_ttl_seconds);
1029        entry
1030            .registered_at
1031            .elapsed()
1032            .unwrap_or(std::time::Duration::ZERO)
1033            > ttl
1034    }
1035
1036    async fn refresh_schema(&self, keyspace: &str, table: &str) -> Result<TableSchema> {
1037        // Implementation for refreshing expired schema
1038        // For now, just try auto-discovery
1039        self.auto_discover_schema(keyspace, table).await
1040    }
1041
1042    async fn auto_discover_schema(&self, keyspace: &str, table: &str) -> Result<TableSchema> {
1043        // Try to find SSTable files for this table
1044        // This is a placeholder - in practice, you'd scan the data directory
1045        let sstable_files = self.find_sstable_files(keyspace, table).await?;
1046
1047        if sstable_files.is_empty() {
1048            return Err(Error::Schema(format!(
1049                "No SSTables found for {}.{}",
1050                keyspace, table
1051            )));
1052        }
1053
1054        self.discover_schema(keyspace, table, &sstable_files).await
1055    }
1056
1057    async fn find_sstable_files(&self, _keyspace: &str, _table: &str) -> Result<Vec<PathBuf>> {
1058        // Placeholder implementation
1059        // In practice, this would scan the data directory structure
1060        Ok(Vec::new())
1061    }
1062
1063    fn matches_query(&self, schema: &TableSchema, query: &SchemaQuery) -> bool {
1064        // Apply keyspace filter
1065        if let Some(ref ks) = query.keyspace {
1066            if &schema.keyspace != ks {
1067                return false;
1068            }
1069        }
1070
1071        // Apply table pattern filter
1072        if let Some(ref pattern) = query.table_pattern {
1073            if !self.matches_pattern(&schema.table, pattern) {
1074                return false;
1075            }
1076        }
1077
1078        // Other filters would be applied here
1079        true
1080    }
1081
1082    fn matches_pattern(&self, text: &str, pattern: &str) -> bool {
1083        // Simple wildcard matching (can be enhanced)
1084        if pattern == "*" {
1085            return true;
1086        }
1087
1088        // For now, just exact match or contains
1089        text == pattern || text.contains(pattern)
1090    }
1091
1092    async fn create_schema_version(&self, table_id: &str, new_schema: &TableSchema) -> Result<()> {
1093        let mut version_history = self.version_history.write().await;
1094        let versions = version_history
1095            .entry(table_id.to_string())
1096            .or_insert_with(Vec::new);
1097
1098        let version_number = versions.len() as u32 + 1;
1099        let changes = if versions.is_empty() {
1100            vec![SchemaChange {
1101                change_type: SchemaChangeType::ColumnAdded,
1102                component: "initial".to_string(),
1103                description: "Initial schema version".to_string(),
1104                old_value: None,
1105                new_value: None,
1106            }]
1107        } else {
1108            // Compare with previous version to detect changes
1109            self.detect_schema_changes(&versions.last().unwrap().schema, new_schema)
1110        };
1111
1112        let new_version = SchemaVersion {
1113            version: version_number,
1114            created_at: SystemTime::now(),
1115            schema: new_schema.clone(),
1116            changes,
1117            source: "registry".to_string(),
1118        };
1119
1120        versions.push(new_version);
1121
1122        // Limit version history size
1123        if versions.len() > self.config.max_versions_per_schema {
1124            versions.remove(0);
1125        }
1126
1127        Ok(())
1128    }
1129
1130    fn detect_schema_changes(
1131        &self,
1132        old_schema: &TableSchema,
1133        new_schema: &TableSchema,
1134    ) -> Vec<SchemaChange> {
1135        let mut changes = Vec::new();
1136
1137        // Compare columns
1138        let old_columns: HashMap<_, _> = old_schema.columns.iter().map(|c| (&c.name, c)).collect();
1139        let new_columns: HashMap<_, _> = new_schema.columns.iter().map(|c| (&c.name, c)).collect();
1140
1141        // Find added columns
1142        for (name, column) in &new_columns {
1143            if !old_columns.contains_key(name) {
1144                changes.push(SchemaChange {
1145                    change_type: SchemaChangeType::ColumnAdded,
1146                    component: name.to_string(),
1147                    description: format!(
1148                        "Column '{}' added with type '{}'",
1149                        name, column.data_type
1150                    ),
1151                    old_value: None,
1152                    new_value: Some(column.data_type.clone()),
1153                });
1154            }
1155        }
1156
1157        // Find removed columns
1158        for name in old_columns.keys() {
1159            if !new_columns.contains_key(name) {
1160                changes.push(SchemaChange {
1161                    change_type: SchemaChangeType::ColumnRemoved,
1162                    component: name.to_string(),
1163                    description: format!("Column '{}' removed", name),
1164                    old_value: None,
1165                    new_value: None,
1166                });
1167            }
1168        }
1169
1170        // Find type changes
1171        for (name, new_column) in &new_columns {
1172            if let Some(old_column) = old_columns.get(name) {
1173                if old_column.data_type != new_column.data_type {
1174                    changes.push(SchemaChange {
1175                        change_type: SchemaChangeType::ColumnTypeChanged,
1176                        component: name.to_string(),
1177                        description: format!("Column '{}' type changed", name),
1178                        old_value: Some(old_column.data_type.clone()),
1179                        new_value: Some(new_column.data_type.clone()),
1180                    });
1181                }
1182            }
1183        }
1184
1185        changes
1186    }
1187
1188    async fn validate_schema_udts(
1189        &self,
1190        schema: &TableSchema,
1191        errors: &mut Vec<ValidationError>,
1192        warnings: &mut Vec<ValidationWarning>,
1193    ) {
1194        let udt_registry = self.udt_registry.read().await;
1195
1196        for column in &schema.columns {
1197            // Check if column type references a UDT
1198            if let Ok(cql_type) = CqlType::parse(&column.data_type) {
1199                self.validate_cql_type_udts(
1200                    &cql_type,
1201                    &schema.keyspace,
1202                    &udt_registry,
1203                    errors,
1204                    warnings,
1205                );
1206            }
1207        }
1208    }
1209
1210    #[allow(clippy::only_used_in_recursion)]
1211    fn validate_cql_type_udts(
1212        &self,
1213        cql_type: &CqlType,
1214        keyspace: &str,
1215        udt_registry: &UdtRegistry,
1216        errors: &mut Vec<ValidationError>,
1217        _warnings: &mut Vec<ValidationWarning>,
1218    ) {
1219        match cql_type {
1220            CqlType::Udt(udt_name, _) => {
1221                if !udt_registry.contains_udt(keyspace, udt_name) {
1222                    errors.push(ValidationError {
1223                        code: "UDT_NOT_FOUND".to_string(),
1224                        message: format!("UDT '{}' not found in keyspace '{}'", udt_name, keyspace),
1225                        component: Some(udt_name.clone()),
1226                        severity: ErrorSeverity::High,
1227                    });
1228                }
1229            }
1230            CqlType::List(inner) | CqlType::Set(inner) | CqlType::Frozen(inner) => {
1231                self.validate_cql_type_udts(inner, keyspace, udt_registry, errors, _warnings);
1232            }
1233            CqlType::Map(key_type, value_type) => {
1234                self.validate_cql_type_udts(key_type, keyspace, udt_registry, errors, _warnings);
1235                self.validate_cql_type_udts(value_type, keyspace, udt_registry, errors, _warnings);
1236            }
1237            CqlType::Tuple(types) => {
1238                for t in types {
1239                    self.validate_cql_type_udts(t, keyspace, udt_registry, errors, _warnings);
1240                }
1241            }
1242            _ => {} // Primitive types don't need UDT validation
1243        }
1244    }
1245
1246    async fn validate_column_types(
1247        &self,
1248        schema: &TableSchema,
1249        errors: &mut Vec<ValidationError>,
1250        _warnings: &mut [ValidationWarning],
1251    ) {
1252        for column in &schema.columns {
1253            if let Err(e) = CqlType::parse(&column.data_type) {
1254                errors.push(ValidationError {
1255                    code: "INVALID_COLUMN_TYPE".to_string(),
1256                    message: format!("Invalid column type '{}': {}", column.data_type, e),
1257                    component: Some(column.name.clone()),
1258                    severity: ErrorSeverity::High,
1259                });
1260            }
1261        }
1262    }
1263
1264    async fn generate_performance_recommendations(
1265        &self,
1266        schema: &TableSchema,
1267        recommendations: &mut Vec<String>,
1268    ) {
1269        // Check for potential performance issues
1270
1271        // Large partition keys
1272        if schema.partition_keys.len() > 3 {
1273            recommendations.push(
1274                "Consider reducing the number of partition key columns for better performance"
1275                    .to_string(),
1276            );
1277        }
1278
1279        // Many clustering keys
1280        if schema.clustering_keys.len() > 5 {
1281            recommendations
1282                .push("Large number of clustering keys may impact query performance".to_string());
1283        }
1284
1285        // Column count
1286        if schema.columns.len() > 50 {
1287            recommendations.push(
1288                "Consider using UDTs or denormalizing wide tables for better performance"
1289                    .to_string(),
1290            );
1291        }
1292    }
1293
1294    fn generate_basic_cql(&self, schema: &TableSchema) -> String {
1295        let mut cql = format!("CREATE TABLE {}.{} (\n", schema.keyspace, schema.table);
1296
1297        // Add columns
1298        for (i, column) in schema.columns.iter().enumerate() {
1299            if i > 0 {
1300                cql.push_str(",\n");
1301            }
1302            cql.push_str(&format!("  {} {}", column.name, column.data_type));
1303        }
1304
1305        // Add primary key
1306        if !schema.partition_keys.is_empty() {
1307            cql.push_str(",\n  PRIMARY KEY (");
1308
1309            if schema.partition_keys.len() == 1 && schema.clustering_keys.is_empty() {
1310                cql.push_str(&schema.partition_keys[0].name);
1311            } else {
1312                // Composite primary key
1313                cql.push('(');
1314                for (i, pk) in schema.partition_keys.iter().enumerate() {
1315                    if i > 0 {
1316                        cql.push_str(", ");
1317                    }
1318                    cql.push_str(&pk.name);
1319                }
1320                cql.push(')');
1321
1322                if !schema.clustering_keys.is_empty() {
1323                    for ck in &schema.clustering_keys {
1324                        cql.push_str(", ");
1325                        cql.push_str(&ck.name);
1326                    }
1327                }
1328            }
1329
1330            cql.push(')');
1331        }
1332
1333        cql.push_str("\n);");
1334        cql
1335    }
1336}
1337
/// Registry statistics, as reported by `SchemaRegistry::get_statistics`.
///
/// `Default` yields an all-zero snapshot with an empty keyspace breakdown.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct RegistryStatistics {
    /// Total number of registered schemas
    pub total_schemas: usize,
    /// Number of registered schemas per keyspace name
    pub schemas_by_keyspace: HashMap<String, usize>,
    /// Schemas whose validation status is `Valid` (warning-free)
    pub validated_schemas: usize,
    /// Schemas whose validation status is `ValidWithWarnings`
    pub schemas_with_warnings: usize,
    /// Schemas whose validation status is `Invalid`
    pub invalid_schemas: usize,
    /// Total UDTs registered
    pub total_udts: usize,
    /// Total schema versions stored, summed over all tables
    pub total_versions: usize,
    /// Schemas whose source is `SchemaSource::Discovered`
    pub auto_discovered_schemas: usize,
    /// Schemas registered from any non-discovery source
    pub manually_registered_schemas: usize,
    /// Cache hit rate; currently always `0.0` because cache metrics are not collected yet
    pub cache_hit_rate: f64,
}
1362
/// Schema-driven parsing context containing all necessary type information
///
/// Pairs a `TableSchema` with the comparator types for its partition key
/// components, its clustering key components and (by name) its columns.
#[derive(Debug, Clone)]
pub struct ParsingContext {
    /// The complete table schema
    pub schema: TableSchema,
    /// Comparators for partition key components
    ///
    /// NOTE(review): presumably one entry per partition key, in key order —
    /// confirm against the code that builds this context.
    pub partition_comparators: Vec<ComparatorType>,
    /// Comparators for clustering key components
    ///
    /// NOTE(review): presumably one entry per clustering key, in key order — confirm.
    pub clustering_comparators: Vec<ComparatorType>,
    /// Comparators for all columns, keyed by column name
    pub column_comparators: HashMap<String, ComparatorType>,
}
1375
1376impl ParsingContext {
1377    /// Get comparator for a specific column
1378    pub fn get_column_comparator(&self, column_name: &str) -> Option<&ComparatorType> {
1379        self.column_comparators.get(column_name)
1380    }
1381
1382    /// Check if schema-driven parsing is fully configured
1383    pub fn is_complete(&self) -> bool {
1384        !self.partition_comparators.is_empty() || !self.schema.partition_keys.is_empty()
1385    }
1386
1387    /// Get all key columns (partition + clustering) names in order
1388    pub fn get_all_key_column_names(&self) -> Vec<String> {
1389        let mut names = Vec::new();
1390        names.extend(
1391            self.schema
1392                .ordered_partition_keys()
1393                .iter()
1394                .map(|k| k.name.clone()),
1395        );
1396        names.extend(
1397            self.schema
1398                .ordered_clustering_keys()
1399                .iter()
1400                .map(|k| k.name.clone()),
1401        );
1402        names
1403    }
1404}
1405
1406/// Schema validator for comprehensive validation
1407#[derive(Debug)]
1408pub struct SchemaValidator;
1409
1410impl Default for SchemaValidator {
1411    fn default() -> Self {
1412        Self::new()
1413    }
1414}
1415
1416impl SchemaValidator {
1417    pub fn new() -> Self {
1418        Self
1419    }
1420
1421    pub async fn validate_table_schema(&self, schema: &TableSchema) -> Result<()> {
1422        schema.validate()
1423    }
1424}
1425
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build a registry for a test from `reg_config`.
    ///
    /// `enable_auto_discovery` is always forced to `false` before the registry is
    /// constructed, whatever the caller passes in.
    async fn make_registry(mut reg_config: SchemaRegistryConfig) -> SchemaRegistry {
        reg_config.enable_auto_discovery = false;
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.expect("platform"));
        SchemaRegistry::new(reg_config, platform, core_config)
            .await
            .expect("registry")
    }

    /// Minimal `test_ks.<name>` schema: a single `uuid` partition key `id`, which
    /// is also listed in `columns`.
    fn simple_schema(name: &str) -> TableSchema {
        TableSchema {
            keyspace: "test_ks".to_string(),
            table: name.to_string(),
            partition_keys: vec![crate::schema::KeyColumn {
                name: "id".to_string(),
                data_type: "uuid".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![crate::schema::Column {
                name: "id".to_string(),
                data_type: "uuid".to_string(),
                nullable: false,
                default: None,
                is_static: false,
            }],
            comments: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_schema_registry_creation() {
        let registry = make_registry(SchemaRegistryConfig::default()).await;
        let stats = registry.get_statistics().await.unwrap();
        assert_eq!(stats.total_schemas, 0);
    }

    #[test]
    fn test_schema_query_creation() {
        let query = SchemaQuery {
            keyspace: Some("test_ks".to_string()),
            table_pattern: Some("user_*".to_string()),
            source_types: None,
            validated_only: false,
            include_history: false,
        };

        assert_eq!(query.keyspace.as_ref().unwrap(), "test_ks");
        assert_eq!(query.table_pattern.as_ref().unwrap(), "user_*");
    }

    #[tokio::test]
    async fn register_and_retrieve_schema() {
        let registry = make_registry(SchemaRegistryConfig::default()).await;
        let schema = simple_schema("users");

        registry
            .register_schema(schema.clone(), SchemaSource::Manual)
            .await
            .expect("register schema");

        let fetched = registry
            .get_schema("test_ks", "users")
            .await
            .expect("fetch schema");
        assert_eq!(fetched.table, "users");
        assert_eq!(fetched.partition_keys.len(), 1);
    }

    #[tokio::test]
    async fn schema_version_history_tracks_changes() {
        let registry = make_registry(SchemaRegistryConfig::default()).await;
        let mut schema = simple_schema("accounts");

        registry
            .register_schema(schema.clone(), SchemaSource::Manual)
            .await
            .expect("register v1");

        schema.columns.push(crate::schema::Column {
            name: "status".to_string(),
            data_type: "text".to_string(),
            nullable: true,
            default: None,
            is_static: false,
        });

        registry
            .register_schema(schema.clone(), SchemaSource::Manual)
            .await
            .expect("register v2");

        let history = registry
            .get_schema_history("test_ks", "accounts")
            .await
            .expect("history");

        assert_eq!(
            history.len(),
            1,
            "Second registration should emit first version"
        );
        assert!(history[0]
            .changes
            .iter()
            .any(|change| matches!(change.change_type, SchemaChangeType::ColumnAdded)));
    }

    #[tokio::test]
    async fn expired_cached_schema_invokes_discovery_path() {
        // A zero TTL makes the entry expire as soon as any time has passed.
        // `make_registry` forces `enable_auto_discovery` off, so it is not requested
        // here. The expected "No SSTables found" error comes from the discovery
        // attempt made when the expired entry is looked up.
        let config = SchemaRegistryConfig {
            cache_ttl_seconds: 0,
            ..SchemaRegistryConfig::default()
        };
        let registry = make_registry(config).await;

        registry
            .register_schema(simple_schema("events"), SchemaSource::Manual)
            .await
            .expect("register events schema");

        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        let err = registry
            .get_schema("test_ks", "events")
            .await
            .expect_err("expired schema should attempt discovery");

        assert!(matches!(err, Error::Schema(message) if message.contains("No SSTables found")));
    }
}