Skip to main content

cqlite_core/schema/
mod.rs

1//! Schema definition and parsing for CQLite
2//!
3//! This module handles schema definitions that describe the structure of
4//! Cassandra tables for schema-aware SSTable reading. It supports both
5//! JSON-based schema definitions and CQL CREATE TABLE statement parsing.
6
7pub mod aggregator;
8pub mod cql_parser;
9pub mod discovery;
10#[cfg(feature = "experimental")]
11pub mod json_exporter;
12pub mod parser;
13pub mod registry;
14
15// Re-export aggregator components
16pub use aggregator::{
17    AggregatorConfig, LoadErrorType, LoadResult, SchemaAggregator, SchemaLoadError,
18    SchemaLoadWarning,
19};
20
21// Re-export CQL parsing functions
22pub use cql_parser::{
23    cql_type_to_type_id, extract_table_name, parse_cql_schema, parse_cql_schema_with_visitor,
24    parse_create_table, table_name_matches,
25};
26
27// Re-export discovery and registry components
28pub use discovery::{
29    ColumnDefinition, DiscoveryMethod, IndexDefinition, SchemaDiscoveryConfig,
30    SchemaDiscoveryEngine, SchemaInfo, SchemaMetadata, TableOptions, TypeInfo, UDTDefinition,
31    ValidationError, ValidationResults, ValidationStatus, ValidationWarning,
32};
33
34pub use registry::{
35    ParsingContext, RegistryStatistics, SchemaChange, SchemaChangeType, SchemaQuery,
36    SchemaRegistry, SchemaRegistryConfig, SchemaSource, SchemaValidationStatus, SchemaValidator,
37    SchemaVersion, ValidationReport,
38};
39
40pub use parser::SchemaParser;
41
42#[cfg(feature = "experimental")]
43pub use json_exporter::{
44    JsonClusteringKey, JsonColumn, JsonExportConfig, JsonExporter, JsonFormat, JsonIndex,
45    JsonMetadata, JsonPerformanceMetrics, JsonPrimaryKey, JsonSchema, JsonTable, JsonTableOptions,
46    JsonUDT, JsonValidationResults,
47};
48
/// Backward-compatibility alias: `ColumnSpec` names the same type as [`Column`].
pub type ColumnSpec = Column;
51
52use crate::error::{Error, Result};
53use crate::parser::header::SSTableHeader;
54use crate::parser::types::CqlTypeId;
55use crate::storage::StorageEngine;
56use crate::types::{ComparatorType, UdtTypeDef};
57use crate::Config;
58use serde::{Deserialize, Serialize};
59use std::collections::HashMap;
60use std::fs;
61use std::path::Path;
62use std::sync::Arc;
63use tokio::sync::RwLock;
64
/// Table schema definition loaded from JSON
///
/// Describes one table: its keyspace, name, key layout and full column list.
/// The type is (de)serializable with serde; `TableSchema::validate` checks the
/// structural invariants (non-empty names, contiguous key positions, parseable
/// column types, key columns present in `columns`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableSchema {
    /// Keyspace name
    pub keyspace: String,

    /// Table name
    pub table: String,

    /// Partition key columns (ordered by their `position` field)
    pub partition_keys: Vec<KeyColumn>,

    /// Clustering key columns (ordered by their `position` field)
    pub clustering_keys: Vec<ClusteringColumn>,

    /// All columns in the table, including the key columns
    pub columns: Vec<Column>,

    /// Optional metadata; an absent field deserializes to an empty map
    #[serde(default)]
    pub comments: HashMap<String, String>,
}
87
/// Partition key column definition
///
/// One component of a (possibly composite) partition key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeyColumn {
    /// Column name
    pub name: String,

    /// CQL data type, stored as text; (de)serialized under the key `type`
    #[serde(rename = "type")]
    pub data_type: String,

    /// Position in composite key (0-based)
    pub position: usize,
}
101
/// Clustering key column with ordering
///
/// One component of the clustering key together with its sort direction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusteringColumn {
    /// Column name
    pub name: String,

    /// CQL data type, stored as text; (de)serialized under the key `type`
    #[serde(rename = "type")]
    pub data_type: String,

    /// Position in clustering key (0-based)
    pub position: usize,

    /// Sort order (ASC or DESC); an absent field deserializes to the
    /// `ClusteringOrder` default, which is `Asc`
    #[serde(default)]
    pub order: ClusteringOrder,
}
119
/// Clustering order enum for sorting
///
/// `Asc` is the `Default` variant.
///
/// NOTE(review): no serde rename attribute is present, so the serialized form
/// presumably uses the variant identifiers (`Asc` / `Desc`) rather than the
/// `ASC` / `DESC` text produced by `Display` — confirm schema files agree.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ClusteringOrder {
    /// Ascending order
    #[default]
    Asc,
    /// Descending order
    Desc,
}
129
130impl From<&str> for ClusteringOrder {
131    fn from(s: &str) -> Self {
132        match s.to_uppercase().as_str() {
133            "DESC" => ClusteringOrder::Desc,
134            _ => ClusteringOrder::Asc,
135        }
136    }
137}
138
139impl std::fmt::Display for ClusteringOrder {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        match self {
142            ClusteringOrder::Asc => write!(f, "ASC"),
143            ClusteringOrder::Desc => write!(f, "DESC"),
144        }
145    }
146}
147
/// Regular column definition
///
/// Every optional attribute carries `#[serde(default)]`, so a minimal JSON
/// entry only needs `name` and `type`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Column {
    /// Column name
    pub name: String,

    /// CQL data type (e.g., "text", "bigint", "list<int>");
    /// (de)serialized under the key `type`
    #[serde(rename = "type")]
    pub data_type: String,

    /// Whether column can be null (`false` when absent from the input)
    #[serde(default)]
    pub nullable: bool,

    /// Default value (if any); `None` when absent from the input
    #[serde(default)]
    pub default: Option<serde_json::Value>,

    /// Whether this is a STATIC column (`false` when absent from the input)
    #[serde(default)]
    pub is_static: bool,
}
170
/// Parsed CQL data type
///
/// Structured form of the textual type names stored in [`Column`],
/// [`KeyColumn`] and [`ClusteringColumn`]. Nested types own their element
/// types, so a value of this enum is a self-contained type tree.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum CqlType {
    // Primitive types
    Boolean,
    TinyInt,
    SmallInt,
    Int,
    BigInt,
    Counter,
    Float,
    Double,
    Decimal,
    Text,
    Ascii,
    Varchar,
    Blob,
    Timestamp,
    Date,
    Time,
    Uuid,
    TimeUuid,
    Inet,
    Duration,
    Varint,

    // Collection types (tuple variants holding the boxed element type(s))
    List(Box<CqlType>),
    Set(Box<CqlType>),
    // Map carries (key type, value type)
    Map(Box<CqlType>, Box<CqlType>),

    // Complex types
    Tuple(Vec<CqlType>),
    Udt(String, Vec<(String, CqlType)>), // name, fields
    Frozen(Box<CqlType>),

    // Custom/Unknown: carries the raw type name
    Custom(String),
}
210
/// UDT Schema Registry for managing User Defined Type definitions
///
/// Definitions are grouped per keyspace; within a keyspace the type name is
/// the key, so registering a second definition with the same keyspace and
/// name replaces the first.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UdtRegistry {
    /// Registered UDT type definitions: outer key is the keyspace, inner key
    /// is the type name
    udts: HashMap<String, HashMap<String, UdtTypeDef>>,
}
217
218impl UdtRegistry {
219    /// Create a new UDT registry
220    pub fn new() -> Self {
221        Self {
222            udts: HashMap::new(),
223        }
224    }
225
226    /// Create a new UDT registry with enhanced Cassandra 5.0 defaults
227    pub fn with_cassandra5_defaults() -> Self {
228        let mut registry = Self::new();
229        registry.load_cassandra5_system_udts();
230        registry
231    }
232
233    /// Register a UDT type definition
234    pub fn register_udt(&mut self, udt_def: UdtTypeDef) {
235        let keyspace_udts = self.udts.entry(udt_def.keyspace.clone()).or_default();
236        keyspace_udts.insert(udt_def.name.clone(), udt_def);
237    }
238
239    /// Get a UDT definition by keyspace and name
240    pub fn get_udt(&self, keyspace: &str, name: &str) -> Option<&UdtTypeDef> {
241        self.udts.get(keyspace)?.get(name)
242    }
243
    /// Get all UDTs in a keyspace
    ///
    /// Returns the name-to-definition map for `keyspace`, or `None` when
    /// nothing has been registered under that keyspace.
    pub fn get_keyspace_udts(&self, keyspace: &str) -> Option<&HashMap<String, UdtTypeDef>> {
        self.udts.get(keyspace)
    }
248
249    /// List all registered UDT names in a keyspace
250    pub fn list_udt_names(&self, keyspace: &str) -> Vec<&str> {
251        self.udts
252            .get(keyspace)
253            .map(|udts| udts.keys().map(|s| s.as_str()).collect())
254            .unwrap_or_default()
255    }
256
257    /// Check if a UDT is registered
258    pub fn contains_udt(&self, keyspace: &str, name: &str) -> bool {
259        self.udts
260            .get(keyspace)
261            .map(|udts| udts.contains_key(name))
262            .unwrap_or(false)
263    }
264
265    /// Remove a UDT definition
266    pub fn remove_udt(&mut self, keyspace: &str, name: &str) -> Option<UdtTypeDef> {
267        self.udts.get_mut(keyspace)?.remove(name)
268    }
269
    /// Clear all UDTs in a keyspace
    ///
    /// Drops the keyspace entry together with every definition stored under
    /// it; a keyspace that was never registered is a no-op.
    pub fn clear_keyspace(&mut self, keyspace: &str) {
        self.udts.remove(keyspace);
    }
274
275    /// Get total number of registered UDTs
276    pub fn total_udts(&self) -> usize {
277        self.udts.values().map(|udts| udts.len()).sum()
278    }
279
    /// Load enhanced Cassandra 5.0 system UDTs with complex nested structures
    ///
    /// Registers three definitions (`address`, `person` and `contact_info`),
    /// all created with the string "system" as the first constructor argument
    /// (presumably the keyspace; confirm against `UdtTypeDef::new`).
    ///
    /// Registration goes through `register_udt`, i.e. without dependency or
    /// cycle validation. The definitions are registered in dependency order:
    /// `person` references `address`, and `contact_info` references both.
    ///
    /// NOTE(review): the third `with_field` argument looks like a
    /// "nullable" flag — confirm against `UdtTypeDef::with_field`.
    fn load_cassandra5_system_udts(&mut self) {
        // Enhanced address UDT for Cassandra 5.0 compatibility
        let address_udt = UdtTypeDef::new("system".to_string(), "address".to_string())
            .with_field("street".to_string(), CqlType::Text, true)
            .with_field("street2".to_string(), CqlType::Text, true)
            .with_field("city".to_string(), CqlType::Text, true)
            .with_field("state".to_string(), CqlType::Text, true)
            .with_field("zip_code".to_string(), CqlType::Text, true)
            .with_field("country".to_string(), CqlType::Text, true)
            .with_field(
                "coordinates".to_string(),
                CqlType::Tuple(vec![CqlType::Double, CqlType::Double]),
                true,
            );

        self.register_udt(address_udt);

        // Enhanced person UDT with collections and nested types
        let person_udt = UdtTypeDef::new("system".to_string(), "person".to_string())
            .with_field("id".to_string(), CqlType::Uuid, false)
            .with_field("first_name".to_string(), CqlType::Text, false)
            .with_field("last_name".to_string(), CqlType::Text, false)
            .with_field("middle_name".to_string(), CqlType::Text, true)
            .with_field("age".to_string(), CqlType::Int, true)
            .with_field("email".to_string(), CqlType::Text, true)
            .with_field(
                "phone_numbers".to_string(),
                CqlType::Set(Box::new(CqlType::Text)),
                true,
            )
            // UDT references carry only the type name; the inline field list
            // is left empty and the definition is looked up by name.
            .with_field(
                "addresses".to_string(),
                CqlType::List(Box::new(CqlType::Udt("address".to_string(), vec![]))),
                true,
            )
            .with_field(
                "metadata".to_string(),
                CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::Text)),
                true,
            );

        self.register_udt(person_udt);

        // Contact info UDT for complex nested scenarios
        let contact_info_udt = UdtTypeDef::new("system".to_string(), "contact_info".to_string())
            .with_field(
                "person".to_string(),
                CqlType::Udt("person".to_string(), vec![]),
                false,
            )
            .with_field(
                "primary_address".to_string(),
                CqlType::Udt("address".to_string(), vec![]),
                true,
            )
            .with_field(
                "emergency_contacts".to_string(),
                CqlType::List(Box::new(CqlType::Udt("person".to_string(), vec![]))),
                true,
            )
            .with_field("last_updated".to_string(), CqlType::Timestamp, true);

        self.register_udt(contact_info_udt);
    }
345
346    /// Resolve UDT with full dependency chain
347    pub fn resolve_udt_with_dependencies(
348        &self,
349        keyspace: &str,
350        name: &str,
351    ) -> crate::Result<&UdtTypeDef> {
352        let udt = self.get_udt(keyspace, name).ok_or_else(|| {
353            crate::Error::schema(format!(
354                "UDT '{}' not found in keyspace '{}'",
355                name, keyspace
356            ))
357        })?;
358
359        // Validate all field dependencies exist
360        for field in &udt.fields {
361            self.validate_field_type_dependencies(&field.field_type, keyspace)?;
362        }
363
364        Ok(udt)
365    }
366
367    /// Validate that all UDT field type dependencies exist in the registry
368    fn validate_field_type_dependencies(
369        &self,
370        field_type: &CqlType,
371        keyspace: &str,
372    ) -> crate::Result<()> {
373        match field_type {
374            CqlType::Udt(udt_name, _) => {
375                if !self.contains_udt(keyspace, udt_name) {
376                    return Err(crate::Error::schema(format!(
377                        "UDT dependency '{}' not found in keyspace '{}'",
378                        udt_name, keyspace
379                    )));
380                }
381            }
382            CqlType::List(inner) | CqlType::Set(inner) | CqlType::Frozen(inner) => {
383                self.validate_field_type_dependencies(inner, keyspace)?;
384            }
385            CqlType::Map(key_type, value_type) => {
386                self.validate_field_type_dependencies(key_type, keyspace)?;
387                self.validate_field_type_dependencies(value_type, keyspace)?;
388            }
389            CqlType::Tuple(field_types) => {
390                for tuple_field_type in field_types {
391                    self.validate_field_type_dependencies(tuple_field_type, keyspace)?;
392                }
393            }
394            _ => {} // Primitive types don't need validation
395        }
396        Ok(())
397    }
398
399    /// Get all UDTs that depend on a given UDT (for cascade operations)
400    pub fn get_dependent_udts(&self, keyspace: &str, udt_name: &str) -> Vec<&UdtTypeDef> {
401        let mut dependents = Vec::new();
402
403        if let Some(keyspace_udts) = self.udts.get(keyspace) {
404            for udt in keyspace_udts.values() {
405                if udt.name == udt_name {
406                    continue; // Skip self
407                }
408
409                // Check if this UDT depends on the target UDT
410                if self.udt_depends_on(udt, udt_name) {
411                    dependents.push(udt);
412                }
413            }
414        }
415
416        dependents
417    }
418
419    /// Check if a UDT depends on another UDT (recursively)
420    fn udt_depends_on(&self, udt: &UdtTypeDef, target_udt: &str) -> bool {
421        for field in &udt.fields {
422            if self.field_type_depends_on(&field.field_type, target_udt) {
423                return true;
424            }
425        }
426        false
427    }
428
429    /// Check if a field type depends on a UDT
430    #[allow(clippy::only_used_in_recursion)]
431    fn field_type_depends_on(&self, field_type: &CqlType, target_udt: &str) -> bool {
432        match field_type {
433            CqlType::Udt(udt_name, _) => udt_name == target_udt,
434            CqlType::List(inner) | CqlType::Set(inner) | CqlType::Frozen(inner) => {
435                self.field_type_depends_on(inner, target_udt)
436            }
437            CqlType::Map(key_type, value_type) => {
438                self.field_type_depends_on(key_type, target_udt)
439                    || self.field_type_depends_on(value_type, target_udt)
440            }
441            CqlType::Tuple(field_types) => field_types
442                .iter()
443                .any(|ft| self.field_type_depends_on(ft, target_udt)),
444            _ => false,
445        }
446    }
447
448    /// Register UDT with dependency validation
449    pub fn register_udt_with_validation(&mut self, udt_def: UdtTypeDef) -> crate::Result<()> {
450        // Validate dependencies exist
451        for field in &udt_def.fields {
452            self.validate_field_type_dependencies(&field.field_type, &udt_def.keyspace)?;
453        }
454
455        // Check for circular dependencies
456        if self.would_create_circular_dependency(&udt_def) {
457            return Err(crate::Error::schema(format!(
458                "Registering UDT '{}' would create circular dependency",
459                udt_def.name
460            )));
461        }
462
463        self.register_udt(udt_def);
464        Ok(())
465    }
466
467    /// Check if registering a UDT would create circular dependencies
468    fn would_create_circular_dependency(&self, udt_def: &UdtTypeDef) -> bool {
469        // This is complex - for now, just check direct self-reference
470        for field in &udt_def.fields {
471            if self.field_type_depends_on(&field.field_type, &udt_def.name) {
472                return true;
473            }
474        }
475        false
476    }
477
478    /// Export UDT definitions for debugging
479    pub fn export_definitions(&self, keyspace: &str) -> Vec<String> {
480        let mut definitions = Vec::new();
481
482        if let Some(keyspace_udts) = self.udts.get(keyspace) {
483            for udt in keyspace_udts.values() {
484                let mut def = format!("CREATE TYPE {}.{} (\n", keyspace, udt.name);
485
486                for (i, field) in udt.fields.iter().enumerate() {
487                    if i > 0 {
488                        def.push_str(",\n");
489                    }
490                    def.push_str(&format!(
491                        "  {} {}",
492                        field.name,
493                        self.format_cql_type(&field.field_type)
494                    ));
495                }
496
497                def.push_str("\n);");
498                definitions.push(def);
499            }
500        }
501
502        definitions
503    }
504
505    /// Format CQL type for CREATE TYPE statements
506    #[allow(clippy::only_used_in_recursion)]
507    fn format_cql_type(&self, cql_type: &CqlType) -> String {
508        match cql_type {
509            CqlType::Boolean => "boolean".to_string(),
510            CqlType::TinyInt => "tinyint".to_string(),
511            CqlType::SmallInt => "smallint".to_string(),
512            CqlType::Int => "int".to_string(),
513            CqlType::BigInt => "bigint".to_string(),
514            CqlType::Counter => "counter".to_string(),
515            CqlType::Float => "float".to_string(),
516            CqlType::Double => "double".to_string(),
517            CqlType::Text | CqlType::Varchar => "text".to_string(),
518            CqlType::Ascii => "ascii".to_string(),
519            CqlType::Blob => "blob".to_string(),
520            CqlType::Timestamp => "timestamp".to_string(),
521            CqlType::Date => "date".to_string(),
522            CqlType::Time => "time".to_string(),
523            CqlType::Uuid => "uuid".to_string(),
524            CqlType::TimeUuid => "timeuuid".to_string(),
525            CqlType::Inet => "inet".to_string(),
526            CqlType::Duration => "duration".to_string(),
527            CqlType::Varint => "varint".to_string(),
528            CqlType::Decimal => "decimal".to_string(),
529            CqlType::List(inner) => format!("list<{}>", self.format_cql_type(inner)),
530            CqlType::Set(inner) => format!("set<{}>", self.format_cql_type(inner)),
531            CqlType::Map(key, value) => format!(
532                "map<{}, {}>",
533                self.format_cql_type(key),
534                self.format_cql_type(value)
535            ),
536            CqlType::Udt(name, _) => name.clone(),
537            CqlType::Tuple(types) => {
538                let type_strs: Vec<String> =
539                    types.iter().map(|t| self.format_cql_type(t)).collect();
540                format!("tuple<{}>", type_strs.join(", "))
541            }
542            CqlType::Frozen(inner) => format!("frozen<{}>", self.format_cql_type(inner)),
543            CqlType::Custom(name) => name.clone(),
544        }
545    }
546}
547
548impl TableSchema {
549    /// Extract schema from SSTable header column metadata
550    ///
551    /// This method constructs a TableSchema from the column information
552    /// embedded in the SSTable header's SerializationHeader.
553    pub fn from_sstable_header(header: &SSTableHeader) -> Result<Self> {
554        // Separate columns by role
555        let mut partition_keys = Vec::new();
556        let mut clustering_keys = Vec::new();
557        let mut regular_columns = Vec::new();
558
559        for col_info in &header.columns {
560            if col_info.is_primary_key {
561                if col_info.is_clustering {
562                    clustering_keys.push(col_info);
563                } else {
564                    partition_keys.push(col_info);
565                }
566            } else {
567                regular_columns.push(col_info);
568            }
569        }
570
571        // Validate all partition keys have positions
572        for col_info in &partition_keys {
573            if col_info.key_position.is_none() {
574                return Err(Error::schema(format!(
575                    "Partition key column '{}' missing key_position in SSTable header",
576                    col_info.name
577                )));
578            }
579        }
580
581        // Validate all clustering keys have positions
582        for col_info in &clustering_keys {
583            if col_info.key_position.is_none() {
584                return Err(Error::schema(format!(
585                    "Clustering key column '{}' missing key_position in SSTable header",
586                    col_info.name
587                )));
588            }
589        }
590
591        // Sort by header's key_position to establish canonical ordering
592        partition_keys.sort_by_key(|c| c.key_position.unwrap());
593        clustering_keys.sort_by_key(|c| c.key_position.unwrap());
594
595        // Build KeyColumn with contiguous 0-based positions for CQLite's internal representation
596        // (SSTable key_position values may have gaps; we normalize to [0,1,2,...])
597        let partition_keys: Vec<KeyColumn> = partition_keys
598            .iter()
599            .enumerate()
600            .map(|(pos, col)| KeyColumn {
601                name: col.name.clone(),
602                data_type: col.column_type.clone(),
603                position: pos, // Contiguous internal position, not header key_position
604            })
605            .collect();
606
607        // Build ClusteringColumn with contiguous positions
608        let clustering_keys: Vec<ClusteringColumn> = clustering_keys
609            .iter()
610            .enumerate()
611            .map(|(pos, col)| ClusteringColumn {
612                name: col.name.clone(),
613                data_type: col.column_type.clone(),
614                position: pos, // Contiguous internal position, not header key_position
615                order: ClusteringOrder::Asc, // TODO(Future): Extract clustering order from header properties when format documented
616            })
617            .collect();
618
619        // All columns including keys
620        let columns: Vec<Column> = header
621            .columns
622            .iter()
623            .map(|col| Column {
624                name: col.name.clone(),
625                data_type: col.column_type.clone(),
626                nullable: !col.is_primary_key, // Primary keys are non-nullable
627                default: None,
628                is_static: false, // TODO: Header format doesn't track static columns yet
629            })
630            .collect();
631
632        if partition_keys.is_empty() {
633            return Err(Error::schema(
634                "No partition keys found in SSTable header".to_string(),
635            ));
636        }
637
638        let schema = TableSchema {
639            keyspace: header.keyspace.clone(),
640            table: header.table_name.clone(),
641            partition_keys,
642            clustering_keys,
643            columns,
644            comments: HashMap::new(),
645        };
646
647        schema.validate()?;
648        Ok(schema)
649    }
650
651    /// Load schema from JSON file
652    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
653        let content = fs::read_to_string(path)
654            .map_err(|e| Error::schema(format!("Failed to read schema file: {}", e)))?;
655
656        Self::from_json(&content)
657    }
658
659    /// Parse schema from JSON string
660    pub fn from_json(json: &str) -> Result<Self> {
661        let schema: TableSchema = serde_json::from_str(json)
662            .map_err(|e| Error::schema(format!("Invalid JSON schema: {}", e)))?;
663
664        schema.validate()?;
665        Ok(schema)
666    }
667
668    /// Save schema to JSON file
669    pub fn to_file<P: AsRef<Path>>(&self, path: P) -> Result<()> {
670        let json = serde_json::to_string_pretty(self)
671            .map_err(|e| Error::serialization(format!("Failed to serialize schema: {}", e)))?;
672
673        fs::write(path, json)
674            .map_err(|e| Error::schema(format!("Failed to write schema file: {}", e)))?;
675
676        Ok(())
677    }
678
679    /// Validate schema consistency
680    pub fn validate(&self) -> Result<()> {
681        // Validate keyspace and table names
682        if self.keyspace.is_empty() {
683            return Err(Error::schema("Keyspace name cannot be empty".to_string()));
684        }
685
686        if self.table.is_empty() {
687            return Err(Error::schema("Table name cannot be empty".to_string()));
688        }
689
690        // Must have at least one partition key
691        if self.partition_keys.is_empty() {
692            return Err(Error::schema(
693                "Table must have at least one partition key".to_string(),
694            ));
695        }
696
697        // Validate partition key positions are contiguous
698        let mut positions: Vec<_> = self.partition_keys.iter().map(|k| k.position).collect();
699        positions.sort();
700        for (i, &pos) in positions.iter().enumerate() {
701            if pos != i {
702                return Err(Error::schema(format!(
703                    "Partition key positions must be contiguous starting from 0, found gap at position {}",
704                    i
705                )));
706            }
707        }
708
709        // Validate clustering key positions (if any)
710        if !self.clustering_keys.is_empty() {
711            let mut positions: Vec<_> = self.clustering_keys.iter().map(|k| k.position).collect();
712            positions.sort();
713            for (i, &pos) in positions.iter().enumerate() {
714                if pos != i {
715                    return Err(Error::schema(format!(
716                        "Clustering key positions must be contiguous starting from 0, found gap at position {}",
717                        i
718                    )));
719                }
720            }
721        }
722
723        // Validate data types
724        for column in &self.columns {
725            CqlType::parse(&column.data_type).map_err(|e| {
726                Error::schema(format!(
727                    "Invalid data type '{}' for column '{}': {}",
728                    column.data_type, column.name, e
729                ))
730            })?;
731        }
732
733        // TODO: Add UDT type validation - check that referenced UDTs exist in registry
734
735        // Validate all key columns exist in columns list
736        for key in &self.partition_keys {
737            if !self.columns.iter().any(|c| c.name == key.name) {
738                return Err(Error::schema(format!(
739                    "Partition key '{}' not found in columns list",
740                    key.name
741                )));
742            }
743        }
744
745        for key in &self.clustering_keys {
746            if !self.columns.iter().any(|c| c.name == key.name) {
747                return Err(Error::schema(format!(
748                    "Clustering key '{}' not found in columns list",
749                    key.name
750                )));
751            }
752        }
753
754        Ok(())
755    }
756
    /// Get column by name
    ///
    /// Returns the first entry of `columns` whose name equals `name` exactly,
    /// or `None` when there is no such column.
    pub fn get_column(&self, name: &str) -> Option<&Column> {
        self.columns.iter().find(|c| c.name == name)
    }
761
    /// Check if column is a partition key
    ///
    /// The comparison against the partition key names is an exact string match.
    pub fn is_partition_key(&self, name: &str) -> bool {
        self.partition_keys.iter().any(|k| k.name == name)
    }
766
    /// Check if column is a clustering key
    ///
    /// The comparison against the clustering key names is an exact string match.
    pub fn is_clustering_key(&self, name: &str) -> bool {
        self.clustering_keys.iter().any(|k| k.name == name)
    }
771
772    /// Get partition key columns in order
773    pub fn ordered_partition_keys(&self) -> Vec<&KeyColumn> {
774        let mut keys = self.partition_keys.iter().collect::<Vec<_>>();
775        keys.sort_by_key(|k| k.position);
776        keys
777    }
778
779    /// Get clustering key columns in order
780    pub fn ordered_clustering_keys(&self) -> Vec<&ClusteringColumn> {
781        let mut keys = self.clustering_keys.iter().collect::<Vec<_>>();
782        keys.sort_by_key(|k| k.position);
783        keys
784    }
785
786    /// Get ComparatorType for a specific column
787    pub fn get_column_comparator(&self, column_name: &str) -> Result<ComparatorType> {
788        let column = self
789            .get_column(column_name)
790            .ok_or_else(|| Error::Schema(format!("Column '{}' not found", column_name)))?;
791
792        let cql_type = CqlType::parse(&column.data_type)?;
793        ComparatorType::from_cql_type(&cql_type)
794    }
795
796    /// Get ComparatorTypes for all columns
797    pub fn get_all_comparators(&self) -> Result<HashMap<String, ComparatorType>> {
798        let mut comparators = HashMap::new();
799
800        for column in &self.columns {
801            let cql_type = CqlType::parse(&column.data_type)?;
802            let comparator = ComparatorType::from_cql_type(&cql_type)?;
803            comparators.insert(column.name.clone(), comparator);
804        }
805
806        Ok(comparators)
807    }
808
809    /// Get ComparatorTypes for partition key columns in order
810    pub fn get_partition_key_comparators(&self) -> Result<Vec<ComparatorType>> {
811        let mut comparators = Vec::new();
812        let ordered_keys = self.ordered_partition_keys();
813
814        for key_column in ordered_keys {
815            let cql_type = CqlType::parse(&key_column.data_type)?;
816            let comparator = ComparatorType::from_cql_type(&cql_type)?;
817            comparators.push(comparator);
818        }
819
820        Ok(comparators)
821    }
822
823    /// Get ComparatorTypes for clustering key columns in order
824    pub fn get_clustering_key_comparators(&self) -> Result<Vec<ComparatorType>> {
825        let mut comparators = Vec::new();
826        let ordered_keys = self.ordered_clustering_keys();
827
828        for key_column in ordered_keys {
829            let cql_type = CqlType::parse(&key_column.data_type)?;
830            let comparator = ComparatorType::from_cql_type(&cql_type)?;
831            comparators.push(comparator);
832        }
833
834        Ok(comparators)
835    }
836
837    /// Check if a column type is compatible with an expected type
838    pub fn is_column_type_compatible(
839        &self,
840        column_name: &str,
841        expected_type: &str,
842    ) -> Result<bool> {
843        let column_comparator = self.get_column_comparator(column_name)?;
844        let expected_cql_type = CqlType::parse(expected_type)?;
845        let expected_comparator = ComparatorType::from_cql_type(&expected_cql_type)?;
846
847        Ok(self.comparators_are_compatible(&column_comparator, &expected_comparator))
848    }
849
850    /// Check if two ComparatorTypes are compatible (helper method)
851    #[allow(clippy::only_used_in_recursion)]
852    fn comparators_are_compatible(&self, left: &ComparatorType, right: &ComparatorType) -> bool {
853        match (left, right) {
854            // Exact matches
855            (ComparatorType::Boolean, ComparatorType::Boolean) => true,
856            (ComparatorType::TinyInt, ComparatorType::TinyInt) => true,
857            (ComparatorType::SmallInt, ComparatorType::SmallInt) => true,
858            (ComparatorType::Int, ComparatorType::Int) => true,
859            (ComparatorType::BigInt, ComparatorType::BigInt) => true,
860            (ComparatorType::Float32, ComparatorType::Float32) => true,
861            (ComparatorType::Float, ComparatorType::Float) => true,
862            (ComparatorType::Text, ComparatorType::Text) => true,
863            (ComparatorType::Blob, ComparatorType::Blob) => true,
864            (ComparatorType::Timestamp, ComparatorType::Timestamp) => true,
865            (ComparatorType::Uuid, ComparatorType::Uuid) => true,
866            (ComparatorType::Json, ComparatorType::Json) => true,
867
868            // Collection types
869            (ComparatorType::List(l_elem), ComparatorType::List(r_elem)) => {
870                self.comparators_are_compatible(l_elem, r_elem)
871            }
872            (ComparatorType::Set(l_elem), ComparatorType::Set(r_elem)) => {
873                self.comparators_are_compatible(l_elem, r_elem)
874            }
875            (ComparatorType::Map(l_key, l_val), ComparatorType::Map(r_key, r_val)) => {
876                self.comparators_are_compatible(l_key, r_key)
877                    && self.comparators_are_compatible(l_val, r_val)
878            }
879
880            // Tuple types
881            (ComparatorType::Tuple(l_fields), ComparatorType::Tuple(r_fields)) => {
882                l_fields.len() == r_fields.len()
883                    && l_fields
884                        .iter()
885                        .zip(r_fields.iter())
886                        .all(|(l, r)| self.comparators_are_compatible(l, r))
887            }
888
889            // UDT types
890            (
891                ComparatorType::Udt {
892                    type_name: l_name,
893                    keyspace: l_ks,
894                    ..
895                },
896                ComparatorType::Udt {
897                    type_name: r_name,
898                    keyspace: r_ks,
899                    ..
900                },
901            ) => l_name == r_name && l_ks == r_ks,
902
903            // Frozen types
904            (ComparatorType::Frozen(l_inner), ComparatorType::Frozen(r_inner)) => {
905                self.comparators_are_compatible(l_inner, r_inner)
906            }
907
908            // Custom types
909            (ComparatorType::Custom(l_name), ComparatorType::Custom(r_name)) => l_name == r_name,
910
911            // No other combinations are compatible
912            _ => false,
913        }
914    }
915
916    /// Create a minimal test schema (for testing only)
917    #[cfg(test)]
918    pub fn new_for_testing(keyspace: &str, table: &str) -> Self {
919        Self {
920            keyspace: keyspace.to_string(),
921            table: table.to_string(),
922            partition_keys: vec![KeyColumn {
923                name: "id".to_string(),
924                data_type: "int".to_string(),
925                position: 0,
926            }],
927            clustering_keys: vec![],
928            columns: vec![Column {
929                name: "id".to_string(),
930                data_type: "int".to_string(),
931                nullable: false,
932                default: None,
933                is_static: false,
934            }],
935            comments: HashMap::new(),
936        }
937    }
938}
939
940impl CqlType {
941    fn split_top_level_types(type_str: &str) -> Result<Vec<&str>> {
942        let mut parts = Vec::new();
943        let mut depth = 0usize;
944        let mut start = 0usize;
945
946        for (index, ch) in type_str.char_indices() {
947            match ch {
948                '<' => depth += 1,
949                '>' => {
950                    if depth == 0 {
951                        return Err(Error::schema(format!(
952                            "Invalid nested type syntax: {}",
953                            type_str
954                        )));
955                    }
956                    depth -= 1;
957                }
958                ',' if depth == 0 => {
959                    parts.push(type_str[start..index].trim());
960                    start = index + ch.len_utf8();
961                }
962                _ => {}
963            }
964        }
965
966        if depth != 0 {
967            return Err(Error::schema(format!(
968                "Unbalanced nested type syntax: {}",
969                type_str
970            )));
971        }
972
973        parts.push(type_str[start..].trim());
974        Ok(parts.into_iter().filter(|part| !part.is_empty()).collect())
975    }
976
977    /// Parse CQL type string into structured type
978    pub fn parse(type_str: &str) -> Result<Self> {
979        let type_str = type_str.trim();
980
981        // Handle frozen types
982        if let Some(inner) = type_str.strip_prefix("frozen<") {
983            if let Some(inner) = inner.strip_suffix('>') {
984                return Ok(CqlType::Frozen(Box::new(Self::parse(inner)?)));
985            }
986        }
987
988        // Handle collection types
989        if let Some(inner) = type_str.strip_prefix("list<") {
990            if let Some(inner) = inner.strip_suffix('>') {
991                return Ok(CqlType::List(Box::new(Self::parse(inner)?)));
992            }
993        }
994
995        if let Some(inner) = type_str.strip_prefix("set<") {
996            if let Some(inner) = inner.strip_suffix('>') {
997                return Ok(CqlType::Set(Box::new(Self::parse(inner)?)));
998            }
999        }
1000
1001        if let Some(inner) = type_str.strip_prefix("map<") {
1002            if let Some(inner) = inner.strip_suffix('>') {
1003                let parts = Self::split_top_level_types(inner)?;
1004                if parts.len() != 2 {
1005                    return Err(Error::schema(format!("Invalid map type: {}", type_str)));
1006                }
1007                return Ok(CqlType::Map(
1008                    Box::new(Self::parse(parts[0].trim())?),
1009                    Box::new(Self::parse(parts[1].trim())?),
1010                ));
1011            }
1012        }
1013
1014        // Handle tuple types
1015        if let Some(inner) = type_str.strip_prefix("tuple<") {
1016            if let Some(inner) = inner.strip_suffix('>') {
1017                let parts = Self::split_top_level_types(inner)?;
1018                let mut types = Vec::new();
1019                for part in parts {
1020                    types.push(Self::parse(part.trim())?);
1021                }
1022                return Ok(CqlType::Tuple(types));
1023            }
1024        }
1025
1026        // Handle UDT types - format: udt_name or keyspace.udt_name
1027        // But first check if it's not a primitive type in uppercase
1028        let lowercase_type = type_str.to_lowercase();
1029        let is_primitive = matches!(
1030            lowercase_type.as_str(),
1031            "boolean"
1032                | "bool"
1033                | "tinyint"
1034                | "smallint"
1035                | "int"
1036                | "integer"
1037                | "bigint"
1038                | "long"
1039                | "counter"
1040                | "float"
1041                | "double"
1042                | "decimal"
1043                | "text"
1044                | "varchar"
1045                | "ascii"
1046                | "blob"
1047                | "timestamp"
1048                | "date"
1049                | "time"
1050                | "uuid"
1051                | "timeuuid"
1052                | "inet"
1053                | "duration"
1054        );
1055
1056        if !is_primitive
1057            && type_str
1058                .chars()
1059                .all(|c| c.is_alphanumeric() || c == '_' || c == '.')
1060            && !type_str.chars().all(|c| c.is_ascii_lowercase())
1061        {
1062            // This might be a UDT name - store as custom type for now
1063            // Full validation requires UDT registry context
1064            return Ok(CqlType::Custom(format!("udt:{}", type_str)));
1065        }
1066
1067        // Primitive types
1068        match type_str.to_lowercase().as_str() {
1069            "boolean" | "bool" => Ok(CqlType::Boolean),
1070            "tinyint" => Ok(CqlType::TinyInt),
1071            "smallint" => Ok(CqlType::SmallInt),
1072            "int" | "integer" => Ok(CqlType::Int),
1073            "bigint" | "long" => Ok(CqlType::BigInt),
1074            "counter" => Ok(CqlType::Counter),
1075            "float" => Ok(CqlType::Float),
1076            "double" => Ok(CqlType::Double),
1077            "decimal" => Ok(CqlType::Decimal),
1078            "text" | "varchar" => Ok(CqlType::Text),
1079            "ascii" => Ok(CqlType::Ascii),
1080            "blob" => Ok(CqlType::Blob),
1081            "timestamp" => Ok(CqlType::Timestamp),
1082            "date" => Ok(CqlType::Date),
1083            "time" => Ok(CqlType::Time),
1084            "uuid" => Ok(CqlType::Uuid),
1085            "timeuuid" => Ok(CqlType::TimeUuid),
1086            "inet" => Ok(CqlType::Inet),
1087            "duration" => Ok(CqlType::Duration),
1088            "varint" => Ok(CqlType::Varint),
1089            _ => Ok(CqlType::Custom(type_str.to_string())),
1090        }
1091    }
1092
1093    /// Get the expected byte size for fixed-size types
1094    pub fn fixed_size(&self) -> Option<usize> {
1095        match self {
1096            CqlType::Boolean => Some(1),
1097            CqlType::TinyInt => Some(1),
1098            CqlType::SmallInt => Some(2),
1099            CqlType::Int => Some(4),
1100            CqlType::BigInt => Some(8),
1101            CqlType::Counter => Some(8),
1102            CqlType::Float => Some(4),
1103            CqlType::Double => Some(8),
1104            CqlType::Timestamp => Some(8),
1105            CqlType::Date => Some(4),
1106            CqlType::Time => Some(8),
1107            CqlType::Uuid | CqlType::TimeUuid => Some(16),
1108            CqlType::Inet => Some(16), // IPv6, IPv4 is variable
1109            // Variable size types
1110            CqlType::Text
1111            | CqlType::Ascii
1112            | CqlType::Varchar
1113            | CqlType::Blob
1114            | CqlType::Decimal
1115            | CqlType::Duration
1116            | CqlType::Varint => None,
1117            // Collections and complex types are variable
1118            CqlType::List(_)
1119            | CqlType::Set(_)
1120            | CqlType::Map(_, _)
1121            | CqlType::Tuple(_)
1122            | CqlType::Udt(_, _) => None,
1123            CqlType::Frozen(inner) => inner.fixed_size(),
1124            CqlType::Custom(_) => None,
1125        }
1126    }
1127
1128    /// Check if this type is a collection
1129    pub fn is_collection(&self) -> bool {
1130        matches!(
1131            self,
1132            CqlType::List(_) | CqlType::Set(_) | CqlType::Map(_, _)
1133        )
1134    }
1135}
1136
/// Schema management service for handling table schemas and UDT definitions
///
/// Table schemas are cached in an in-memory map behind an async `RwLock`;
/// UDT definitions live in a separately locked `UdtRegistry`. Both are held
/// in `Arc`s.
#[derive(Debug)]
pub struct SchemaManager {
    /// Storage engine handle supplied to (or opened by) the constructors.
    /// No method in this `impl` reads it, hence the `dead_code` allowance.
    #[allow(dead_code)]
    storage: Arc<StorageEngine>,
    /// Cached table schemas. Keys are `"<keyspace>.<table>"` for schemas
    /// registered from CQL or a pre-loaded registry, and the caller-supplied
    /// table name for default schemas created by `load_schema`.
    schemas: Arc<RwLock<HashMap<String, TableSchema>>>,
    /// UDT registry for managing User Defined Types (internal, use accessor methods)
    pub(crate) udt_registry: Arc<RwLock<UdtRegistry>>,
}
1146
1147impl SchemaManager {
1148    /// Create a new schema manager from a path
1149    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
1150        // Create temporary storage engine (not actually used in this context)
1151        let config = Config::default();
1152        let platform = Arc::new(crate::platform::Platform::new(&config).await?);
1153        let storage = Arc::new(
1154            StorageEngine::open(
1155                path.as_ref(),
1156                &config,
1157                platform,
1158                #[cfg(feature = "state_machine")]
1159                None,
1160            )
1161            .await?,
1162        );
1163
1164        Ok(Self {
1165            storage,
1166            schemas: Arc::new(RwLock::new(HashMap::new())),
1167            udt_registry: Arc::new(RwLock::new(UdtRegistry::new())),
1168        })
1169    }
1170
1171    /// Create a new schema manager with storage
1172    pub async fn new_with_storage(storage: Arc<StorageEngine>, _config: &Config) -> Result<Self> {
1173        let manager = Self {
1174            storage,
1175            schemas: Arc::new(RwLock::new(HashMap::new())),
1176            udt_registry: Arc::new(RwLock::new(UdtRegistry::new())),
1177        };
1178
1179        // Load built-in UDT definitions for Cassandra 5.0 compatibility
1180        manager.load_default_udts().await;
1181
1182        Ok(manager)
1183    }
1184
1185    /// Create a new schema manager with a pre-loaded SchemaRegistry
1186    ///
1187    /// This constructor is used when schemas are loaded from external .cql files
1188    /// during ingestion, allowing the pre-loaded schemas to be used by the query engine.
1189    ///
1190    /// # Arguments
1191    ///
1192    /// * `storage` - The storage engine instance
1193    /// * `registry` - Pre-loaded schema registry from ingestion
1194    /// * `_config` - Database configuration (currently unused)
1195    pub async fn new_with_registry(
1196        storage: Arc<StorageEngine>,
1197        registry: Arc<tokio::sync::RwLock<registry::SchemaRegistry>>,
1198        _config: &Config,
1199    ) -> Result<Self> {
1200        // Acquire both schemas and UDT registry in a single lock scope to prevent deadlocks
1201        let (loaded_schemas, udt_registry) = {
1202            let registry_guard = registry.read().await;
1203            let schemas = registry_guard.list_schemas(None).await?;
1204            let udt_reg = registry_guard.get_udt_registry();
1205            (schemas, udt_reg)
1206        }; // Lock is dropped here before further processing
1207
1208        // Populate internal schemas map
1209        let mut schemas_map = HashMap::new();
1210        for schema in loaded_schemas {
1211            let table_id = format!("{}.{}", schema.keyspace, schema.table);
1212            schemas_map.insert(table_id, schema);
1213        }
1214
1215        let manager = Self {
1216            storage,
1217            schemas: Arc::new(RwLock::new(schemas_map)),
1218            udt_registry,
1219        };
1220
1221        Ok(manager)
1222    }
1223
1224    /// Load default UDT definitions that are commonly used in Cassandra
1225    async fn load_default_udts(&self) {
1226        // Common address UDT used in many Cassandra schemas
1227        let address_udt = UdtTypeDef::new("test_keyspace".to_string(), "address".to_string())
1228            .with_field("street".to_string(), CqlType::Text, true)
1229            .with_field("city".to_string(), CqlType::Text, true)
1230            .with_field("state".to_string(), CqlType::Text, true)
1231            .with_field("zip_code".to_string(), CqlType::Text, true)
1232            .with_field("country".to_string(), CqlType::Text, true);
1233
1234        self.udt_registry.write().await.register_udt(address_udt);
1235
1236        // Enhanced person UDT with nested address
1237        let person_udt = UdtTypeDef::new("test_keyspace".to_string(), "person".to_string())
1238            .with_field("name".to_string(), CqlType::Text, true)
1239            .with_field("age".to_string(), CqlType::Int, true)
1240            .with_field("email".to_string(), CqlType::Text, true)
1241            .with_field(
1242                "addresses".to_string(),
1243                CqlType::List(Box::new(CqlType::Udt(
1244                    "address".to_string(),
1245                    vec![
1246                        ("street".to_string(), CqlType::Text),
1247                        ("city".to_string(), CqlType::Text),
1248                        ("state".to_string(), CqlType::Text),
1249                        ("zip_code".to_string(), CqlType::Text),
1250                        ("country".to_string(), CqlType::Text),
1251                    ],
1252                ))),
1253                true,
1254            )
1255            .with_field(
1256                "contact_info".to_string(),
1257                CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::Text)),
1258                true,
1259            );
1260
1261        self.udt_registry.write().await.register_udt(person_udt);
1262
1263        // Company UDT with nested person and address relationships
1264        let company_udt = UdtTypeDef::new("test_keyspace".to_string(), "company".to_string())
1265            .with_field("name".to_string(), CqlType::Text, false)
1266            .with_field(
1267                "headquarters".to_string(),
1268                CqlType::Udt(
1269                    "address".to_string(),
1270                    vec![
1271                        ("street".to_string(), CqlType::Text),
1272                        ("city".to_string(), CqlType::Text),
1273                        ("state".to_string(), CqlType::Text),
1274                        ("zip_code".to_string(), CqlType::Text),
1275                        ("country".to_string(), CqlType::Text),
1276                    ],
1277                ),
1278                true,
1279            )
1280            .with_field(
1281                "employees".to_string(),
1282                CqlType::Set(Box::new(CqlType::Udt("person".to_string(), vec![]))),
1283                true,
1284            )
1285            .with_field("founded_year".to_string(), CqlType::Int, true);
1286
1287        self.udt_registry.write().await.register_udt(company_udt);
1288    }
1289
1290    /// Register a new UDT type definition
1291    pub async fn register_udt(&self, udt_def: UdtTypeDef) {
1292        self.udt_registry.write().await.register_udt(udt_def);
1293    }
1294
1295    /// Get a UDT definition (returns a cloned UdtTypeDef)
1296    pub async fn get_udt(&self, keyspace: &str, name: &str) -> Option<UdtTypeDef> {
1297        self.udt_registry
1298            .read()
1299            .await
1300            .get_udt(keyspace, name)
1301            .cloned()
1302    }
1303
1304    /// Load schema for a table
1305    pub async fn load_schema(&self, table_name: &str) -> Result<TableSchema> {
1306        // Read lock first to check if schema exists
1307        let schemas = self.schemas.read().await;
1308        if let Some(schema) = schemas.get(table_name) {
1309            return Ok(schema.clone());
1310        }
1311        drop(schemas); // Explicit drop before write lock
1312
1313        // Create default schema
1314        let schema = self.create_default_schema(table_name);
1315
1316        // Write lock to insert
1317        self.schemas
1318            .write()
1319            .await
1320            .insert(table_name.to_string(), schema.clone());
1321        Ok(schema)
1322    }
1323
1324    /// Create a default schema for unknown tables
1325    fn create_default_schema(&self, table_name: &str) -> TableSchema {
1326        TableSchema {
1327            keyspace: "default".to_string(),
1328            table: table_name.to_string(),
1329            partition_keys: vec![KeyColumn {
1330                name: "id".to_string(),
1331                data_type: "uuid".to_string(),
1332                position: 0,
1333            }],
1334            clustering_keys: vec![],
1335            columns: vec![Column {
1336                name: "id".to_string(),
1337                data_type: "uuid".to_string(),
1338                nullable: false,
1339                default: None,
1340                is_static: false,
1341            }],
1342            comments: HashMap::new(),
1343        }
1344    }
1345
1346    /// Parse and register a schema from a CQL CREATE TABLE statement
1347    pub async fn parse_and_register_cql_schema(&self, cql: &str) -> Result<TableSchema> {
1348        let schema = cql_parser::parse_cql_schema(cql)?;
1349        let table_key = format!("{}.{}", schema.keyspace, schema.table);
1350        self.schemas
1351            .write()
1352            .await
1353            .insert(table_key.clone(), schema.clone());
1354        Ok(schema)
1355    }
1356
1357    /// Find schema by table name with optional keyspace matching
1358    pub async fn find_schema_by_table(
1359        &self,
1360        keyspace: &Option<String>,
1361        table: &str,
1362    ) -> Option<TableSchema> {
1363        let schemas = self.schemas.read().await;
1364
1365        // First try exact match if keyspace provided
1366        if let Some(ks) = keyspace {
1367            let key = format!("{}.{}", ks, table);
1368            if let Some(schema) = schemas.get(&key) {
1369                return Some(schema.clone());
1370            }
1371        }
1372
1373        // Then try to find any schema matching the table name
1374        schemas
1375            .values()
1376            .find(|schema| {
1377                cql_parser::table_name_matches(
1378                    &Some(schema.keyspace.clone()),
1379                    &schema.table,
1380                    keyspace,
1381                    table,
1382                )
1383            })
1384            .cloned()
1385    }
1386
1387    /// Extract table information from CQL without full parsing
1388    pub fn extract_table_info(&self, cql: &str) -> Result<(Option<String>, String)> {
1389        cql_parser::extract_table_name(cql)
1390    }
1391
1392    /// Convert CQL type string to internal type ID
1393    pub fn cql_type_to_internal(&self, cql_type: &str) -> Result<CqlTypeId> {
1394        cql_parser::cql_type_to_type_id(cql_type)
1395    }
1396
1397    /// Get table schema by name (async for compatibility)
1398    pub async fn get_table_schema(&self, table_name: &str) -> Result<TableSchema> {
1399        // Try to find schema by table name
1400        if let Some(schema) = self.find_schema_by_table(&None, table_name).await {
1401            Ok(schema)
1402        } else {
1403            Err(Error::Schema(format!(
1404                "Table schema not found: {}",
1405                table_name
1406            )))
1407        }
1408    }
1409}
1410
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed JSON schema loads with the expected names and counts.
    #[test]
    fn test_schema_validation() {
        let schema_json = r#"
        {
            "keyspace": "test",
            "table": "users",
            "partition_keys": [
                {"name": "id", "type": "bigint", "position": 0}
            ],
            "clustering_keys": [],
            "columns": [
                {"name": "id", "type": "bigint", "nullable": false},
                {"name": "name", "type": "text", "nullable": true}
            ]
        }
        "#;

        let parsed = TableSchema::from_json(schema_json).unwrap();

        assert_eq!(parsed.keyspace, "test");
        assert_eq!(parsed.table, "users");
        assert_eq!(parsed.partition_keys.len(), 1);
        assert_eq!(parsed.columns.len(), 2);
    }

    /// Primitive, collection and nested tuple type strings parse structurally.
    #[test]
    fn test_cql_type_parsing() {
        // Primitives
        assert_eq!(CqlType::parse("text").unwrap(), CqlType::Text);
        assert_eq!(CqlType::parse("bigint").unwrap(), CqlType::BigInt);

        // Single-parameter collection
        assert_eq!(
            CqlType::parse("list<int>").unwrap(),
            CqlType::List(Box::new(CqlType::Int))
        );

        // Two-parameter collection
        assert_eq!(
            CqlType::parse("map<text, bigint>").unwrap(),
            CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::BigInt))
        );

        // Tuple with nested collections: commas inside `<...>` must not split.
        let expected_tuple = CqlType::Tuple(vec![
            CqlType::Text,
            CqlType::List(Box::new(CqlType::Int)),
            CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::Text)),
        ]);
        assert_eq!(
            CqlType::parse("tuple<text, list<int>, map<text, text>>").unwrap(),
            expected_tuple
        );
    }

    /// Structurally invalid schemas are rejected; unknown type names are not.
    #[test]
    fn test_schema_validation_failures() {
        // Missing partition key
        let invalid_schema = r#"
        {
            "keyspace": "test",
            "table": "users", 
            "partition_keys": [],
            "clustering_keys": [],
            "columns": []
        }
        "#;
        let missing_key_result = TableSchema::from_json(invalid_schema);
        assert!(missing_key_result.is_err());

        // Invalid type
        let invalid_type = r#"
        {
            "keyspace": "test",
            "table": "users",
            "partition_keys": [
                {"name": "id", "type": "invalid_type", "position": 0}
            ],
            "clustering_keys": [],
            "columns": [
                {"name": "id", "type": "invalid_type", "nullable": false}
            ]
        }
        "#;

        // This should succeed as we allow custom types
        let unknown_type_result = TableSchema::from_json(invalid_type);
        assert!(unknown_type_result.is_ok());
    }

    /// Concurrent `load_schema` calls for overlapping tables all succeed and
    /// leave one cached entry per distinct table.
    #[tokio::test]
    async fn test_concurrent_schema_access() {
        // Create a SchemaManager for testing concurrent access
        let config = Config::default();
        let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
        let temp_dir = tempfile::tempdir().unwrap();
        let engine = StorageEngine::open(
            temp_dir.path(),
            &config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();
        let storage = Arc::new(engine);

        let manager = SchemaManager::new_with_storage(storage, &config)
            .await
            .unwrap();
        let manager = Arc::new(manager);

        // Spawn 10 concurrent tasks accessing 3 different tables
        let handles: Vec<_> = (0..10)
            .map(|task_index| {
                let shared = Arc::clone(&manager);
                tokio::spawn(async move {
                    // 3 different tables, concurrent access
                    let table = format!("table_{}", task_index % 3);
                    shared.load_schema(&table).await.unwrap()
                })
            })
            .collect();

        // Wait for all tasks to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // Verify schemas were created
        let schemas = manager.schemas.read().await;
        assert!(schemas.len() <= 3); // At most 3 unique tables
        for expected_table in ["table_0", "table_1", "table_2"].iter() {
            assert!(schemas.contains_key(*expected_table));
        }
    }

    /// A schema derived from an SSTable header keeps the keyspace, table,
    /// partition key and column count described by the header.
    #[test]
    fn test_schema_from_sstable_header() {
        use crate::parser::header::{
            CassandraVersion, ColumnInfo, CompressionInfo, SSTableHeader, SSTableStats,
        };
        use std::collections::HashMap;

        // A column is a primary-key column exactly when it has a key position.
        let column = |name: &str, column_type: &str, key_position: Option<usize>| ColumnInfo {
            name: name.to_string(),
            column_type: column_type.to_string(),
            is_primary_key: key_position.is_some(),
            key_position,
            is_static: false,
            is_clustering: false,
        };

        let columns = vec![column("id", "int", Some(0)), column("name", "text", None)];

        let compression = CompressionInfo {
            algorithm: "NONE".to_string(),
            chunk_size: 0,
            parameters: HashMap::new(),
        };

        let header = SSTableHeader {
            cassandra_version: CassandraVersion::V5_0Bti,
            version: 1,
            table_id: [0; 16],
            keyspace: "test_ks".to_string(),
            table_name: "test_table".to_string(),
            generation: 1,
            compression,
            stats: SSTableStats::default(),
            columns,
            properties: HashMap::new(),
        };

        let schema = TableSchema::from_sstable_header(&header).unwrap();

        assert_eq!(schema.keyspace, "test_ks");
        assert_eq!(schema.table, "test_table");
        assert_eq!(schema.partition_keys.len(), 1);
        assert_eq!(schema.partition_keys[0].name, "id");
        assert_eq!(schema.columns.len(), 2);
    }
}