1pub mod aggregator;
8pub mod cql_parser;
9pub mod discovery;
10#[cfg(feature = "experimental")]
11pub mod json_exporter;
12pub mod parser;
13pub mod registry;
14
15pub use aggregator::{
17 AggregatorConfig, LoadErrorType, LoadResult, SchemaAggregator, SchemaLoadError,
18 SchemaLoadWarning,
19};
20
21pub use cql_parser::{
23 cql_type_to_type_id, extract_table_name, parse_cql_schema, parse_cql_schema_with_visitor,
24 parse_create_table, table_name_matches,
25};
26
27pub use discovery::{
29 ColumnDefinition, DiscoveryMethod, IndexDefinition, SchemaDiscoveryConfig,
30 SchemaDiscoveryEngine, SchemaInfo, SchemaMetadata, TableOptions, TypeInfo, UDTDefinition,
31 ValidationError, ValidationResults, ValidationStatus, ValidationWarning,
32};
33
34pub use registry::{
35 ParsingContext, RegistryStatistics, SchemaChange, SchemaChangeType, SchemaQuery,
36 SchemaRegistry, SchemaRegistryConfig, SchemaSource, SchemaValidationStatus, SchemaValidator,
37 SchemaVersion, ValidationReport,
38};
39
40pub use parser::SchemaParser;
41
42#[cfg(feature = "experimental")]
43pub use json_exporter::{
44 JsonClusteringKey, JsonColumn, JsonExportConfig, JsonExporter, JsonFormat, JsonIndex,
45 JsonMetadata, JsonPerformanceMetrics, JsonPrimaryKey, JsonSchema, JsonTable, JsonTableOptions,
46 JsonUDT, JsonValidationResults,
47};
48
49pub type ColumnSpec = Column;
51
52use crate::error::{Error, Result};
53use crate::parser::header::SSTableHeader;
54use crate::parser::types::CqlTypeId;
55use crate::storage::StorageEngine;
56use crate::types::{ComparatorType, UdtTypeDef};
57use crate::Config;
58use serde::{Deserialize, Serialize};
59use std::collections::HashMap;
60use std::fs;
61use std::path::Path;
62use std::sync::Arc;
63use tokio::sync::RwLock;
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct TableSchema {
68 pub keyspace: String,
70
71 pub table: String,
73
74 pub partition_keys: Vec<KeyColumn>,
76
77 pub clustering_keys: Vec<ClusteringColumn>,
79
80 pub columns: Vec<Column>,
82
83 #[serde(default)]
85 pub comments: HashMap<String, String>,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct KeyColumn {
91 pub name: String,
93
94 #[serde(rename = "type")]
96 pub data_type: String,
97
98 pub position: usize,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct ClusteringColumn {
105 pub name: String,
107
108 #[serde(rename = "type")]
110 pub data_type: String,
111
112 pub position: usize,
114
115 #[serde(default)]
117 pub order: ClusteringOrder,
118}
119
120#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
122pub enum ClusteringOrder {
123 #[default]
125 Asc,
126 Desc,
128}
129
130impl From<&str> for ClusteringOrder {
131 fn from(s: &str) -> Self {
132 match s.to_uppercase().as_str() {
133 "DESC" => ClusteringOrder::Desc,
134 _ => ClusteringOrder::Asc,
135 }
136 }
137}
138
139impl std::fmt::Display for ClusteringOrder {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 match self {
142 ClusteringOrder::Asc => write!(f, "ASC"),
143 ClusteringOrder::Desc => write!(f, "DESC"),
144 }
145 }
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct Column {
151 pub name: String,
153
154 #[serde(rename = "type")]
156 pub data_type: String,
157
158 #[serde(default)]
160 pub nullable: bool,
161
162 #[serde(default)]
164 pub default: Option<serde_json::Value>,
165
166 #[serde(default)]
168 pub is_static: bool,
169}
170
171#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
173pub enum CqlType {
174 Boolean,
176 TinyInt,
177 SmallInt,
178 Int,
179 BigInt,
180 Counter,
181 Float,
182 Double,
183 Decimal,
184 Text,
185 Ascii,
186 Varchar,
187 Blob,
188 Timestamp,
189 Date,
190 Time,
191 Uuid,
192 TimeUuid,
193 Inet,
194 Duration,
195 Varint,
196
197 List(Box<CqlType>),
199 Set(Box<CqlType>),
200 Map(Box<CqlType>, Box<CqlType>),
201
202 Tuple(Vec<CqlType>),
204 Udt(String, Vec<(String, CqlType)>), Frozen(Box<CqlType>),
206
207 Custom(String),
209}
210
211#[derive(Debug, Clone, Default, Serialize, Deserialize)]
213pub struct UdtRegistry {
214 udts: HashMap<String, HashMap<String, UdtTypeDef>>,
216}
217
218impl UdtRegistry {
219 pub fn new() -> Self {
221 Self {
222 udts: HashMap::new(),
223 }
224 }
225
226 pub fn with_cassandra5_defaults() -> Self {
228 let mut registry = Self::new();
229 registry.load_cassandra5_system_udts();
230 registry
231 }
232
233 pub fn register_udt(&mut self, udt_def: UdtTypeDef) {
235 let keyspace_udts = self.udts.entry(udt_def.keyspace.clone()).or_default();
236 keyspace_udts.insert(udt_def.name.clone(), udt_def);
237 }
238
239 pub fn get_udt(&self, keyspace: &str, name: &str) -> Option<&UdtTypeDef> {
241 self.udts.get(keyspace)?.get(name)
242 }
243
244 pub fn get_keyspace_udts(&self, keyspace: &str) -> Option<&HashMap<String, UdtTypeDef>> {
246 self.udts.get(keyspace)
247 }
248
249 pub fn list_udt_names(&self, keyspace: &str) -> Vec<&str> {
251 self.udts
252 .get(keyspace)
253 .map(|udts| udts.keys().map(|s| s.as_str()).collect())
254 .unwrap_or_default()
255 }
256
257 pub fn contains_udt(&self, keyspace: &str, name: &str) -> bool {
259 self.udts
260 .get(keyspace)
261 .map(|udts| udts.contains_key(name))
262 .unwrap_or(false)
263 }
264
265 pub fn remove_udt(&mut self, keyspace: &str, name: &str) -> Option<UdtTypeDef> {
267 self.udts.get_mut(keyspace)?.remove(name)
268 }
269
270 pub fn clear_keyspace(&mut self, keyspace: &str) {
272 self.udts.remove(keyspace);
273 }
274
275 pub fn total_udts(&self) -> usize {
277 self.udts.values().map(|udts| udts.len()).sum()
278 }
279
280 fn load_cassandra5_system_udts(&mut self) {
282 let address_udt = UdtTypeDef::new("system".to_string(), "address".to_string())
284 .with_field("street".to_string(), CqlType::Text, true)
285 .with_field("street2".to_string(), CqlType::Text, true)
286 .with_field("city".to_string(), CqlType::Text, true)
287 .with_field("state".to_string(), CqlType::Text, true)
288 .with_field("zip_code".to_string(), CqlType::Text, true)
289 .with_field("country".to_string(), CqlType::Text, true)
290 .with_field(
291 "coordinates".to_string(),
292 CqlType::Tuple(vec![CqlType::Double, CqlType::Double]),
293 true,
294 );
295
296 self.register_udt(address_udt);
297
298 let person_udt = UdtTypeDef::new("system".to_string(), "person".to_string())
300 .with_field("id".to_string(), CqlType::Uuid, false)
301 .with_field("first_name".to_string(), CqlType::Text, false)
302 .with_field("last_name".to_string(), CqlType::Text, false)
303 .with_field("middle_name".to_string(), CqlType::Text, true)
304 .with_field("age".to_string(), CqlType::Int, true)
305 .with_field("email".to_string(), CqlType::Text, true)
306 .with_field(
307 "phone_numbers".to_string(),
308 CqlType::Set(Box::new(CqlType::Text)),
309 true,
310 )
311 .with_field(
312 "addresses".to_string(),
313 CqlType::List(Box::new(CqlType::Udt("address".to_string(), vec![]))),
314 true,
315 )
316 .with_field(
317 "metadata".to_string(),
318 CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::Text)),
319 true,
320 );
321
322 self.register_udt(person_udt);
323
324 let contact_info_udt = UdtTypeDef::new("system".to_string(), "contact_info".to_string())
326 .with_field(
327 "person".to_string(),
328 CqlType::Udt("person".to_string(), vec![]),
329 false,
330 )
331 .with_field(
332 "primary_address".to_string(),
333 CqlType::Udt("address".to_string(), vec![]),
334 true,
335 )
336 .with_field(
337 "emergency_contacts".to_string(),
338 CqlType::List(Box::new(CqlType::Udt("person".to_string(), vec![]))),
339 true,
340 )
341 .with_field("last_updated".to_string(), CqlType::Timestamp, true);
342
343 self.register_udt(contact_info_udt);
344 }
345
346 pub fn resolve_udt_with_dependencies(
348 &self,
349 keyspace: &str,
350 name: &str,
351 ) -> crate::Result<&UdtTypeDef> {
352 let udt = self.get_udt(keyspace, name).ok_or_else(|| {
353 crate::Error::schema(format!(
354 "UDT '{}' not found in keyspace '{}'",
355 name, keyspace
356 ))
357 })?;
358
359 for field in &udt.fields {
361 self.validate_field_type_dependencies(&field.field_type, keyspace)?;
362 }
363
364 Ok(udt)
365 }
366
367 fn validate_field_type_dependencies(
369 &self,
370 field_type: &CqlType,
371 keyspace: &str,
372 ) -> crate::Result<()> {
373 match field_type {
374 CqlType::Udt(udt_name, _) => {
375 if !self.contains_udt(keyspace, udt_name) {
376 return Err(crate::Error::schema(format!(
377 "UDT dependency '{}' not found in keyspace '{}'",
378 udt_name, keyspace
379 )));
380 }
381 }
382 CqlType::List(inner) | CqlType::Set(inner) | CqlType::Frozen(inner) => {
383 self.validate_field_type_dependencies(inner, keyspace)?;
384 }
385 CqlType::Map(key_type, value_type) => {
386 self.validate_field_type_dependencies(key_type, keyspace)?;
387 self.validate_field_type_dependencies(value_type, keyspace)?;
388 }
389 CqlType::Tuple(field_types) => {
390 for tuple_field_type in field_types {
391 self.validate_field_type_dependencies(tuple_field_type, keyspace)?;
392 }
393 }
394 _ => {} }
396 Ok(())
397 }
398
399 pub fn get_dependent_udts(&self, keyspace: &str, udt_name: &str) -> Vec<&UdtTypeDef> {
401 let mut dependents = Vec::new();
402
403 if let Some(keyspace_udts) = self.udts.get(keyspace) {
404 for udt in keyspace_udts.values() {
405 if udt.name == udt_name {
406 continue; }
408
409 if self.udt_depends_on(udt, udt_name) {
411 dependents.push(udt);
412 }
413 }
414 }
415
416 dependents
417 }
418
419 fn udt_depends_on(&self, udt: &UdtTypeDef, target_udt: &str) -> bool {
421 for field in &udt.fields {
422 if self.field_type_depends_on(&field.field_type, target_udt) {
423 return true;
424 }
425 }
426 false
427 }
428
429 #[allow(clippy::only_used_in_recursion)]
431 fn field_type_depends_on(&self, field_type: &CqlType, target_udt: &str) -> bool {
432 match field_type {
433 CqlType::Udt(udt_name, _) => udt_name == target_udt,
434 CqlType::List(inner) | CqlType::Set(inner) | CqlType::Frozen(inner) => {
435 self.field_type_depends_on(inner, target_udt)
436 }
437 CqlType::Map(key_type, value_type) => {
438 self.field_type_depends_on(key_type, target_udt)
439 || self.field_type_depends_on(value_type, target_udt)
440 }
441 CqlType::Tuple(field_types) => field_types
442 .iter()
443 .any(|ft| self.field_type_depends_on(ft, target_udt)),
444 _ => false,
445 }
446 }
447
448 pub fn register_udt_with_validation(&mut self, udt_def: UdtTypeDef) -> crate::Result<()> {
450 for field in &udt_def.fields {
452 self.validate_field_type_dependencies(&field.field_type, &udt_def.keyspace)?;
453 }
454
455 if self.would_create_circular_dependency(&udt_def) {
457 return Err(crate::Error::schema(format!(
458 "Registering UDT '{}' would create circular dependency",
459 udt_def.name
460 )));
461 }
462
463 self.register_udt(udt_def);
464 Ok(())
465 }
466
467 fn would_create_circular_dependency(&self, udt_def: &UdtTypeDef) -> bool {
469 for field in &udt_def.fields {
471 if self.field_type_depends_on(&field.field_type, &udt_def.name) {
472 return true;
473 }
474 }
475 false
476 }
477
478 pub fn export_definitions(&self, keyspace: &str) -> Vec<String> {
480 let mut definitions = Vec::new();
481
482 if let Some(keyspace_udts) = self.udts.get(keyspace) {
483 for udt in keyspace_udts.values() {
484 let mut def = format!("CREATE TYPE {}.{} (\n", keyspace, udt.name);
485
486 for (i, field) in udt.fields.iter().enumerate() {
487 if i > 0 {
488 def.push_str(",\n");
489 }
490 def.push_str(&format!(
491 " {} {}",
492 field.name,
493 self.format_cql_type(&field.field_type)
494 ));
495 }
496
497 def.push_str("\n);");
498 definitions.push(def);
499 }
500 }
501
502 definitions
503 }
504
505 #[allow(clippy::only_used_in_recursion)]
507 fn format_cql_type(&self, cql_type: &CqlType) -> String {
508 match cql_type {
509 CqlType::Boolean => "boolean".to_string(),
510 CqlType::TinyInt => "tinyint".to_string(),
511 CqlType::SmallInt => "smallint".to_string(),
512 CqlType::Int => "int".to_string(),
513 CqlType::BigInt => "bigint".to_string(),
514 CqlType::Counter => "counter".to_string(),
515 CqlType::Float => "float".to_string(),
516 CqlType::Double => "double".to_string(),
517 CqlType::Text | CqlType::Varchar => "text".to_string(),
518 CqlType::Ascii => "ascii".to_string(),
519 CqlType::Blob => "blob".to_string(),
520 CqlType::Timestamp => "timestamp".to_string(),
521 CqlType::Date => "date".to_string(),
522 CqlType::Time => "time".to_string(),
523 CqlType::Uuid => "uuid".to_string(),
524 CqlType::TimeUuid => "timeuuid".to_string(),
525 CqlType::Inet => "inet".to_string(),
526 CqlType::Duration => "duration".to_string(),
527 CqlType::Varint => "varint".to_string(),
528 CqlType::Decimal => "decimal".to_string(),
529 CqlType::List(inner) => format!("list<{}>", self.format_cql_type(inner)),
530 CqlType::Set(inner) => format!("set<{}>", self.format_cql_type(inner)),
531 CqlType::Map(key, value) => format!(
532 "map<{}, {}>",
533 self.format_cql_type(key),
534 self.format_cql_type(value)
535 ),
536 CqlType::Udt(name, _) => name.clone(),
537 CqlType::Tuple(types) => {
538 let type_strs: Vec<String> =
539 types.iter().map(|t| self.format_cql_type(t)).collect();
540 format!("tuple<{}>", type_strs.join(", "))
541 }
542 CqlType::Frozen(inner) => format!("frozen<{}>", self.format_cql_type(inner)),
543 CqlType::Custom(name) => name.clone(),
544 }
545 }
546}
547
548impl TableSchema {
549 pub fn from_sstable_header(header: &SSTableHeader) -> Result<Self> {
554 let mut partition_keys = Vec::new();
556 let mut clustering_keys = Vec::new();
557 let mut regular_columns = Vec::new();
558
559 for col_info in &header.columns {
560 if col_info.is_primary_key {
561 if col_info.is_clustering {
562 clustering_keys.push(col_info);
563 } else {
564 partition_keys.push(col_info);
565 }
566 } else {
567 regular_columns.push(col_info);
568 }
569 }
570
571 for col_info in &partition_keys {
573 if col_info.key_position.is_none() {
574 return Err(Error::schema(format!(
575 "Partition key column '{}' missing key_position in SSTable header",
576 col_info.name
577 )));
578 }
579 }
580
581 for col_info in &clustering_keys {
583 if col_info.key_position.is_none() {
584 return Err(Error::schema(format!(
585 "Clustering key column '{}' missing key_position in SSTable header",
586 col_info.name
587 )));
588 }
589 }
590
591 partition_keys.sort_by_key(|c| c.key_position.unwrap());
593 clustering_keys.sort_by_key(|c| c.key_position.unwrap());
594
595 let partition_keys: Vec<KeyColumn> = partition_keys
598 .iter()
599 .enumerate()
600 .map(|(pos, col)| KeyColumn {
601 name: col.name.clone(),
602 data_type: col.column_type.clone(),
603 position: pos, })
605 .collect();
606
607 let clustering_keys: Vec<ClusteringColumn> = clustering_keys
609 .iter()
610 .enumerate()
611 .map(|(pos, col)| ClusteringColumn {
612 name: col.name.clone(),
613 data_type: col.column_type.clone(),
614 position: pos, order: ClusteringOrder::Asc, })
617 .collect();
618
619 let columns: Vec<Column> = header
621 .columns
622 .iter()
623 .map(|col| Column {
624 name: col.name.clone(),
625 data_type: col.column_type.clone(),
626 nullable: !col.is_primary_key, default: None,
628 is_static: false, })
630 .collect();
631
632 if partition_keys.is_empty() {
633 return Err(Error::schema(
634 "No partition keys found in SSTable header".to_string(),
635 ));
636 }
637
638 let schema = TableSchema {
639 keyspace: header.keyspace.clone(),
640 table: header.table_name.clone(),
641 partition_keys,
642 clustering_keys,
643 columns,
644 comments: HashMap::new(),
645 };
646
647 schema.validate()?;
648 Ok(schema)
649 }
650
651 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
653 let content = fs::read_to_string(path)
654 .map_err(|e| Error::schema(format!("Failed to read schema file: {}", e)))?;
655
656 Self::from_json(&content)
657 }
658
659 pub fn from_json(json: &str) -> Result<Self> {
661 let schema: TableSchema = serde_json::from_str(json)
662 .map_err(|e| Error::schema(format!("Invalid JSON schema: {}", e)))?;
663
664 schema.validate()?;
665 Ok(schema)
666 }
667
668 pub fn to_file<P: AsRef<Path>>(&self, path: P) -> Result<()> {
670 let json = serde_json::to_string_pretty(self)
671 .map_err(|e| Error::serialization(format!("Failed to serialize schema: {}", e)))?;
672
673 fs::write(path, json)
674 .map_err(|e| Error::schema(format!("Failed to write schema file: {}", e)))?;
675
676 Ok(())
677 }
678
679 pub fn validate(&self) -> Result<()> {
681 if self.keyspace.is_empty() {
683 return Err(Error::schema("Keyspace name cannot be empty".to_string()));
684 }
685
686 if self.table.is_empty() {
687 return Err(Error::schema("Table name cannot be empty".to_string()));
688 }
689
690 if self.partition_keys.is_empty() {
692 return Err(Error::schema(
693 "Table must have at least one partition key".to_string(),
694 ));
695 }
696
697 let mut positions: Vec<_> = self.partition_keys.iter().map(|k| k.position).collect();
699 positions.sort();
700 for (i, &pos) in positions.iter().enumerate() {
701 if pos != i {
702 return Err(Error::schema(format!(
703 "Partition key positions must be contiguous starting from 0, found gap at position {}",
704 i
705 )));
706 }
707 }
708
709 if !self.clustering_keys.is_empty() {
711 let mut positions: Vec<_> = self.clustering_keys.iter().map(|k| k.position).collect();
712 positions.sort();
713 for (i, &pos) in positions.iter().enumerate() {
714 if pos != i {
715 return Err(Error::schema(format!(
716 "Clustering key positions must be contiguous starting from 0, found gap at position {}",
717 i
718 )));
719 }
720 }
721 }
722
723 for column in &self.columns {
725 CqlType::parse(&column.data_type).map_err(|e| {
726 Error::schema(format!(
727 "Invalid data type '{}' for column '{}': {}",
728 column.data_type, column.name, e
729 ))
730 })?;
731 }
732
733 for key in &self.partition_keys {
737 if !self.columns.iter().any(|c| c.name == key.name) {
738 return Err(Error::schema(format!(
739 "Partition key '{}' not found in columns list",
740 key.name
741 )));
742 }
743 }
744
745 for key in &self.clustering_keys {
746 if !self.columns.iter().any(|c| c.name == key.name) {
747 return Err(Error::schema(format!(
748 "Clustering key '{}' not found in columns list",
749 key.name
750 )));
751 }
752 }
753
754 Ok(())
755 }
756
757 pub fn get_column(&self, name: &str) -> Option<&Column> {
759 self.columns.iter().find(|c| c.name == name)
760 }
761
762 pub fn is_partition_key(&self, name: &str) -> bool {
764 self.partition_keys.iter().any(|k| k.name == name)
765 }
766
767 pub fn is_clustering_key(&self, name: &str) -> bool {
769 self.clustering_keys.iter().any(|k| k.name == name)
770 }
771
772 pub fn ordered_partition_keys(&self) -> Vec<&KeyColumn> {
774 let mut keys = self.partition_keys.iter().collect::<Vec<_>>();
775 keys.sort_by_key(|k| k.position);
776 keys
777 }
778
779 pub fn ordered_clustering_keys(&self) -> Vec<&ClusteringColumn> {
781 let mut keys = self.clustering_keys.iter().collect::<Vec<_>>();
782 keys.sort_by_key(|k| k.position);
783 keys
784 }
785
786 pub fn get_column_comparator(&self, column_name: &str) -> Result<ComparatorType> {
788 let column = self
789 .get_column(column_name)
790 .ok_or_else(|| Error::Schema(format!("Column '{}' not found", column_name)))?;
791
792 let cql_type = CqlType::parse(&column.data_type)?;
793 ComparatorType::from_cql_type(&cql_type)
794 }
795
796 pub fn get_all_comparators(&self) -> Result<HashMap<String, ComparatorType>> {
798 let mut comparators = HashMap::new();
799
800 for column in &self.columns {
801 let cql_type = CqlType::parse(&column.data_type)?;
802 let comparator = ComparatorType::from_cql_type(&cql_type)?;
803 comparators.insert(column.name.clone(), comparator);
804 }
805
806 Ok(comparators)
807 }
808
809 pub fn get_partition_key_comparators(&self) -> Result<Vec<ComparatorType>> {
811 let mut comparators = Vec::new();
812 let ordered_keys = self.ordered_partition_keys();
813
814 for key_column in ordered_keys {
815 let cql_type = CqlType::parse(&key_column.data_type)?;
816 let comparator = ComparatorType::from_cql_type(&cql_type)?;
817 comparators.push(comparator);
818 }
819
820 Ok(comparators)
821 }
822
823 pub fn get_clustering_key_comparators(&self) -> Result<Vec<ComparatorType>> {
825 let mut comparators = Vec::new();
826 let ordered_keys = self.ordered_clustering_keys();
827
828 for key_column in ordered_keys {
829 let cql_type = CqlType::parse(&key_column.data_type)?;
830 let comparator = ComparatorType::from_cql_type(&cql_type)?;
831 comparators.push(comparator);
832 }
833
834 Ok(comparators)
835 }
836
837 pub fn is_column_type_compatible(
839 &self,
840 column_name: &str,
841 expected_type: &str,
842 ) -> Result<bool> {
843 let column_comparator = self.get_column_comparator(column_name)?;
844 let expected_cql_type = CqlType::parse(expected_type)?;
845 let expected_comparator = ComparatorType::from_cql_type(&expected_cql_type)?;
846
847 Ok(self.comparators_are_compatible(&column_comparator, &expected_comparator))
848 }
849
850 #[allow(clippy::only_used_in_recursion)]
852 fn comparators_are_compatible(&self, left: &ComparatorType, right: &ComparatorType) -> bool {
853 match (left, right) {
854 (ComparatorType::Boolean, ComparatorType::Boolean) => true,
856 (ComparatorType::TinyInt, ComparatorType::TinyInt) => true,
857 (ComparatorType::SmallInt, ComparatorType::SmallInt) => true,
858 (ComparatorType::Int, ComparatorType::Int) => true,
859 (ComparatorType::BigInt, ComparatorType::BigInt) => true,
860 (ComparatorType::Float32, ComparatorType::Float32) => true,
861 (ComparatorType::Float, ComparatorType::Float) => true,
862 (ComparatorType::Text, ComparatorType::Text) => true,
863 (ComparatorType::Blob, ComparatorType::Blob) => true,
864 (ComparatorType::Timestamp, ComparatorType::Timestamp) => true,
865 (ComparatorType::Uuid, ComparatorType::Uuid) => true,
866 (ComparatorType::Json, ComparatorType::Json) => true,
867
868 (ComparatorType::List(l_elem), ComparatorType::List(r_elem)) => {
870 self.comparators_are_compatible(l_elem, r_elem)
871 }
872 (ComparatorType::Set(l_elem), ComparatorType::Set(r_elem)) => {
873 self.comparators_are_compatible(l_elem, r_elem)
874 }
875 (ComparatorType::Map(l_key, l_val), ComparatorType::Map(r_key, r_val)) => {
876 self.comparators_are_compatible(l_key, r_key)
877 && self.comparators_are_compatible(l_val, r_val)
878 }
879
880 (ComparatorType::Tuple(l_fields), ComparatorType::Tuple(r_fields)) => {
882 l_fields.len() == r_fields.len()
883 && l_fields
884 .iter()
885 .zip(r_fields.iter())
886 .all(|(l, r)| self.comparators_are_compatible(l, r))
887 }
888
889 (
891 ComparatorType::Udt {
892 type_name: l_name,
893 keyspace: l_ks,
894 ..
895 },
896 ComparatorType::Udt {
897 type_name: r_name,
898 keyspace: r_ks,
899 ..
900 },
901 ) => l_name == r_name && l_ks == r_ks,
902
903 (ComparatorType::Frozen(l_inner), ComparatorType::Frozen(r_inner)) => {
905 self.comparators_are_compatible(l_inner, r_inner)
906 }
907
908 (ComparatorType::Custom(l_name), ComparatorType::Custom(r_name)) => l_name == r_name,
910
911 _ => false,
913 }
914 }
915
916 #[cfg(test)]
918 pub fn new_for_testing(keyspace: &str, table: &str) -> Self {
919 Self {
920 keyspace: keyspace.to_string(),
921 table: table.to_string(),
922 partition_keys: vec![KeyColumn {
923 name: "id".to_string(),
924 data_type: "int".to_string(),
925 position: 0,
926 }],
927 clustering_keys: vec![],
928 columns: vec![Column {
929 name: "id".to_string(),
930 data_type: "int".to_string(),
931 nullable: false,
932 default: None,
933 is_static: false,
934 }],
935 comments: HashMap::new(),
936 }
937 }
938}
939
940impl CqlType {
941 fn split_top_level_types(type_str: &str) -> Result<Vec<&str>> {
942 let mut parts = Vec::new();
943 let mut depth = 0usize;
944 let mut start = 0usize;
945
946 for (index, ch) in type_str.char_indices() {
947 match ch {
948 '<' => depth += 1,
949 '>' => {
950 if depth == 0 {
951 return Err(Error::schema(format!(
952 "Invalid nested type syntax: {}",
953 type_str
954 )));
955 }
956 depth -= 1;
957 }
958 ',' if depth == 0 => {
959 parts.push(type_str[start..index].trim());
960 start = index + ch.len_utf8();
961 }
962 _ => {}
963 }
964 }
965
966 if depth != 0 {
967 return Err(Error::schema(format!(
968 "Unbalanced nested type syntax: {}",
969 type_str
970 )));
971 }
972
973 parts.push(type_str[start..].trim());
974 Ok(parts.into_iter().filter(|part| !part.is_empty()).collect())
975 }
976
977 pub fn parse(type_str: &str) -> Result<Self> {
979 let type_str = type_str.trim();
980
981 if let Some(inner) = type_str.strip_prefix("frozen<") {
983 if let Some(inner) = inner.strip_suffix('>') {
984 return Ok(CqlType::Frozen(Box::new(Self::parse(inner)?)));
985 }
986 }
987
988 if let Some(inner) = type_str.strip_prefix("list<") {
990 if let Some(inner) = inner.strip_suffix('>') {
991 return Ok(CqlType::List(Box::new(Self::parse(inner)?)));
992 }
993 }
994
995 if let Some(inner) = type_str.strip_prefix("set<") {
996 if let Some(inner) = inner.strip_suffix('>') {
997 return Ok(CqlType::Set(Box::new(Self::parse(inner)?)));
998 }
999 }
1000
1001 if let Some(inner) = type_str.strip_prefix("map<") {
1002 if let Some(inner) = inner.strip_suffix('>') {
1003 let parts = Self::split_top_level_types(inner)?;
1004 if parts.len() != 2 {
1005 return Err(Error::schema(format!("Invalid map type: {}", type_str)));
1006 }
1007 return Ok(CqlType::Map(
1008 Box::new(Self::parse(parts[0].trim())?),
1009 Box::new(Self::parse(parts[1].trim())?),
1010 ));
1011 }
1012 }
1013
1014 if let Some(inner) = type_str.strip_prefix("tuple<") {
1016 if let Some(inner) = inner.strip_suffix('>') {
1017 let parts = Self::split_top_level_types(inner)?;
1018 let mut types = Vec::new();
1019 for part in parts {
1020 types.push(Self::parse(part.trim())?);
1021 }
1022 return Ok(CqlType::Tuple(types));
1023 }
1024 }
1025
1026 let lowercase_type = type_str.to_lowercase();
1029 let is_primitive = matches!(
1030 lowercase_type.as_str(),
1031 "boolean"
1032 | "bool"
1033 | "tinyint"
1034 | "smallint"
1035 | "int"
1036 | "integer"
1037 | "bigint"
1038 | "long"
1039 | "counter"
1040 | "float"
1041 | "double"
1042 | "decimal"
1043 | "text"
1044 | "varchar"
1045 | "ascii"
1046 | "blob"
1047 | "timestamp"
1048 | "date"
1049 | "time"
1050 | "uuid"
1051 | "timeuuid"
1052 | "inet"
1053 | "duration"
1054 );
1055
1056 if !is_primitive
1057 && type_str
1058 .chars()
1059 .all(|c| c.is_alphanumeric() || c == '_' || c == '.')
1060 && !type_str.chars().all(|c| c.is_ascii_lowercase())
1061 {
1062 return Ok(CqlType::Custom(format!("udt:{}", type_str)));
1065 }
1066
1067 match type_str.to_lowercase().as_str() {
1069 "boolean" | "bool" => Ok(CqlType::Boolean),
1070 "tinyint" => Ok(CqlType::TinyInt),
1071 "smallint" => Ok(CqlType::SmallInt),
1072 "int" | "integer" => Ok(CqlType::Int),
1073 "bigint" | "long" => Ok(CqlType::BigInt),
1074 "counter" => Ok(CqlType::Counter),
1075 "float" => Ok(CqlType::Float),
1076 "double" => Ok(CqlType::Double),
1077 "decimal" => Ok(CqlType::Decimal),
1078 "text" | "varchar" => Ok(CqlType::Text),
1079 "ascii" => Ok(CqlType::Ascii),
1080 "blob" => Ok(CqlType::Blob),
1081 "timestamp" => Ok(CqlType::Timestamp),
1082 "date" => Ok(CqlType::Date),
1083 "time" => Ok(CqlType::Time),
1084 "uuid" => Ok(CqlType::Uuid),
1085 "timeuuid" => Ok(CqlType::TimeUuid),
1086 "inet" => Ok(CqlType::Inet),
1087 "duration" => Ok(CqlType::Duration),
1088 "varint" => Ok(CqlType::Varint),
1089 _ => Ok(CqlType::Custom(type_str.to_string())),
1090 }
1091 }
1092
1093 pub fn fixed_size(&self) -> Option<usize> {
1095 match self {
1096 CqlType::Boolean => Some(1),
1097 CqlType::TinyInt => Some(1),
1098 CqlType::SmallInt => Some(2),
1099 CqlType::Int => Some(4),
1100 CqlType::BigInt => Some(8),
1101 CqlType::Counter => Some(8),
1102 CqlType::Float => Some(4),
1103 CqlType::Double => Some(8),
1104 CqlType::Timestamp => Some(8),
1105 CqlType::Date => Some(4),
1106 CqlType::Time => Some(8),
1107 CqlType::Uuid | CqlType::TimeUuid => Some(16),
1108 CqlType::Inet => Some(16), CqlType::Text
1111 | CqlType::Ascii
1112 | CqlType::Varchar
1113 | CqlType::Blob
1114 | CqlType::Decimal
1115 | CqlType::Duration
1116 | CqlType::Varint => None,
1117 CqlType::List(_)
1119 | CqlType::Set(_)
1120 | CqlType::Map(_, _)
1121 | CqlType::Tuple(_)
1122 | CqlType::Udt(_, _) => None,
1123 CqlType::Frozen(inner) => inner.fixed_size(),
1124 CqlType::Custom(_) => None,
1125 }
1126 }
1127
1128 pub fn is_collection(&self) -> bool {
1130 matches!(
1131 self,
1132 CqlType::List(_) | CqlType::Set(_) | CqlType::Map(_, _)
1133 )
1134 }
1135}
1136
1137#[derive(Debug)]
1139pub struct SchemaManager {
1140 #[allow(dead_code)]
1141 storage: Arc<StorageEngine>,
1142 schemas: Arc<RwLock<HashMap<String, TableSchema>>>,
1143 pub(crate) udt_registry: Arc<RwLock<UdtRegistry>>,
1145}
1146
1147impl SchemaManager {
1148 pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
1150 let config = Config::default();
1152 let platform = Arc::new(crate::platform::Platform::new(&config).await?);
1153 let storage = Arc::new(
1154 StorageEngine::open(
1155 path.as_ref(),
1156 &config,
1157 platform,
1158 #[cfg(feature = "state_machine")]
1159 None,
1160 )
1161 .await?,
1162 );
1163
1164 Ok(Self {
1165 storage,
1166 schemas: Arc::new(RwLock::new(HashMap::new())),
1167 udt_registry: Arc::new(RwLock::new(UdtRegistry::new())),
1168 })
1169 }
1170
1171 pub async fn new_with_storage(storage: Arc<StorageEngine>, _config: &Config) -> Result<Self> {
1173 let manager = Self {
1174 storage,
1175 schemas: Arc::new(RwLock::new(HashMap::new())),
1176 udt_registry: Arc::new(RwLock::new(UdtRegistry::new())),
1177 };
1178
1179 manager.load_default_udts().await;
1181
1182 Ok(manager)
1183 }
1184
1185 pub async fn new_with_registry(
1196 storage: Arc<StorageEngine>,
1197 registry: Arc<tokio::sync::RwLock<registry::SchemaRegistry>>,
1198 _config: &Config,
1199 ) -> Result<Self> {
1200 let (loaded_schemas, udt_registry) = {
1202 let registry_guard = registry.read().await;
1203 let schemas = registry_guard.list_schemas(None).await?;
1204 let udt_reg = registry_guard.get_udt_registry();
1205 (schemas, udt_reg)
1206 }; let mut schemas_map = HashMap::new();
1210 for schema in loaded_schemas {
1211 let table_id = format!("{}.{}", schema.keyspace, schema.table);
1212 schemas_map.insert(table_id, schema);
1213 }
1214
1215 let manager = Self {
1216 storage,
1217 schemas: Arc::new(RwLock::new(schemas_map)),
1218 udt_registry,
1219 };
1220
1221 Ok(manager)
1222 }
1223
1224 async fn load_default_udts(&self) {
1226 let address_udt = UdtTypeDef::new("test_keyspace".to_string(), "address".to_string())
1228 .with_field("street".to_string(), CqlType::Text, true)
1229 .with_field("city".to_string(), CqlType::Text, true)
1230 .with_field("state".to_string(), CqlType::Text, true)
1231 .with_field("zip_code".to_string(), CqlType::Text, true)
1232 .with_field("country".to_string(), CqlType::Text, true);
1233
1234 self.udt_registry.write().await.register_udt(address_udt);
1235
1236 let person_udt = UdtTypeDef::new("test_keyspace".to_string(), "person".to_string())
1238 .with_field("name".to_string(), CqlType::Text, true)
1239 .with_field("age".to_string(), CqlType::Int, true)
1240 .with_field("email".to_string(), CqlType::Text, true)
1241 .with_field(
1242 "addresses".to_string(),
1243 CqlType::List(Box::new(CqlType::Udt(
1244 "address".to_string(),
1245 vec![
1246 ("street".to_string(), CqlType::Text),
1247 ("city".to_string(), CqlType::Text),
1248 ("state".to_string(), CqlType::Text),
1249 ("zip_code".to_string(), CqlType::Text),
1250 ("country".to_string(), CqlType::Text),
1251 ],
1252 ))),
1253 true,
1254 )
1255 .with_field(
1256 "contact_info".to_string(),
1257 CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::Text)),
1258 true,
1259 );
1260
1261 self.udt_registry.write().await.register_udt(person_udt);
1262
1263 let company_udt = UdtTypeDef::new("test_keyspace".to_string(), "company".to_string())
1265 .with_field("name".to_string(), CqlType::Text, false)
1266 .with_field(
1267 "headquarters".to_string(),
1268 CqlType::Udt(
1269 "address".to_string(),
1270 vec![
1271 ("street".to_string(), CqlType::Text),
1272 ("city".to_string(), CqlType::Text),
1273 ("state".to_string(), CqlType::Text),
1274 ("zip_code".to_string(), CqlType::Text),
1275 ("country".to_string(), CqlType::Text),
1276 ],
1277 ),
1278 true,
1279 )
1280 .with_field(
1281 "employees".to_string(),
1282 CqlType::Set(Box::new(CqlType::Udt("person".to_string(), vec![]))),
1283 true,
1284 )
1285 .with_field("founded_year".to_string(), CqlType::Int, true);
1286
1287 self.udt_registry.write().await.register_udt(company_udt);
1288 }
1289
1290 pub async fn register_udt(&self, udt_def: UdtTypeDef) {
1292 self.udt_registry.write().await.register_udt(udt_def);
1293 }
1294
1295 pub async fn get_udt(&self, keyspace: &str, name: &str) -> Option<UdtTypeDef> {
1297 self.udt_registry
1298 .read()
1299 .await
1300 .get_udt(keyspace, name)
1301 .cloned()
1302 }
1303
1304 pub async fn load_schema(&self, table_name: &str) -> Result<TableSchema> {
1306 let schemas = self.schemas.read().await;
1308 if let Some(schema) = schemas.get(table_name) {
1309 return Ok(schema.clone());
1310 }
1311 drop(schemas); let schema = self.create_default_schema(table_name);
1315
1316 self.schemas
1318 .write()
1319 .await
1320 .insert(table_name.to_string(), schema.clone());
1321 Ok(schema)
1322 }
1323
1324 fn create_default_schema(&self, table_name: &str) -> TableSchema {
1326 TableSchema {
1327 keyspace: "default".to_string(),
1328 table: table_name.to_string(),
1329 partition_keys: vec![KeyColumn {
1330 name: "id".to_string(),
1331 data_type: "uuid".to_string(),
1332 position: 0,
1333 }],
1334 clustering_keys: vec![],
1335 columns: vec![Column {
1336 name: "id".to_string(),
1337 data_type: "uuid".to_string(),
1338 nullable: false,
1339 default: None,
1340 is_static: false,
1341 }],
1342 comments: HashMap::new(),
1343 }
1344 }
1345
1346 pub async fn parse_and_register_cql_schema(&self, cql: &str) -> Result<TableSchema> {
1348 let schema = cql_parser::parse_cql_schema(cql)?;
1349 let table_key = format!("{}.{}", schema.keyspace, schema.table);
1350 self.schemas
1351 .write()
1352 .await
1353 .insert(table_key.clone(), schema.clone());
1354 Ok(schema)
1355 }
1356
1357 pub async fn find_schema_by_table(
1359 &self,
1360 keyspace: &Option<String>,
1361 table: &str,
1362 ) -> Option<TableSchema> {
1363 let schemas = self.schemas.read().await;
1364
1365 if let Some(ks) = keyspace {
1367 let key = format!("{}.{}", ks, table);
1368 if let Some(schema) = schemas.get(&key) {
1369 return Some(schema.clone());
1370 }
1371 }
1372
1373 schemas
1375 .values()
1376 .find(|schema| {
1377 cql_parser::table_name_matches(
1378 &Some(schema.keyspace.clone()),
1379 &schema.table,
1380 keyspace,
1381 table,
1382 )
1383 })
1384 .cloned()
1385 }
1386
1387 pub fn extract_table_info(&self, cql: &str) -> Result<(Option<String>, String)> {
1389 cql_parser::extract_table_name(cql)
1390 }
1391
1392 pub fn cql_type_to_internal(&self, cql_type: &str) -> Result<CqlTypeId> {
1394 cql_parser::cql_type_to_type_id(cql_type)
1395 }
1396
1397 pub async fn get_table_schema(&self, table_name: &str) -> Result<TableSchema> {
1399 if let Some(schema) = self.find_schema_by_table(&None, table_name).await {
1401 Ok(schema)
1402 } else {
1403 Err(Error::Schema(format!(
1404 "Table schema not found: {}",
1405 table_name
1406 )))
1407 }
1408 }
1409}
1410
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed JSON schema is accepted and its parts are exposed.
    #[test]
    fn test_schema_validation() {
        let users_json = r#"
        {
            "keyspace": "test",
            "table": "users",
            "partition_keys": [
                {"name": "id", "type": "bigint", "position": 0}
            ],
            "clustering_keys": [],
            "columns": [
                {"name": "id", "type": "bigint", "nullable": false},
                {"name": "name", "type": "text", "nullable": true}
            ]
        }
        "#;

        let parsed = TableSchema::from_json(users_json).unwrap();

        assert_eq!(parsed.keyspace, "test");
        assert_eq!(parsed.table, "users");
        assert_eq!(parsed.partition_keys.len(), 1);
        assert_eq!(parsed.columns.len(), 2);
    }

    /// Primitive, collection and nested tuple types parse to the expected
    /// `CqlType` values.
    #[test]
    fn test_cql_type_parsing() {
        // Primitives.
        assert_eq!(CqlType::parse("text").unwrap(), CqlType::Text);
        assert_eq!(CqlType::parse("bigint").unwrap(), CqlType::BigInt);

        // list<T>
        if let CqlType::List(element) = CqlType::parse("list<int>").unwrap() {
            assert_eq!(*element, CqlType::Int);
        } else {
            panic!("Expected List type");
        }

        // map<K, V>
        if let CqlType::Map(key, value) = CqlType::parse("map<text, bigint>").unwrap() {
            assert_eq!(*key, CqlType::Text);
            assert_eq!(*value, CqlType::BigInt);
        } else {
            panic!("Expected Map type");
        }

        // tuple<...> with nested collections.
        let nested = CqlType::parse("tuple<text, list<int>, map<text, text>>").unwrap();
        if let CqlType::Tuple(fields) = nested {
            assert_eq!(fields.len(), 3);
            assert_eq!(fields[0], CqlType::Text);
            assert_eq!(fields[1], CqlType::List(Box::new(CqlType::Int)));
            assert_eq!(
                fields[2],
                CqlType::Map(Box::new(CqlType::Text), Box::new(CqlType::Text))
            );
        } else {
            panic!("Expected Tuple type");
        }
    }

    /// Pins what `from_json` rejects and what it lets through.
    #[test]
    fn test_schema_validation_failures() {
        // No partition keys and no columns: rejected.
        let empty_schema = r#"
        {
            "keyspace": "test",
            "table": "users",
            "partition_keys": [],
            "clustering_keys": [],
            "columns": []
        }
        "#;
        assert!(TableSchema::from_json(empty_schema).is_err());

        // Unknown type name: this test expects `from_json` to accept it.
        // NOTE(review): the reason for the leniency is not visible here.
        let unknown_type_schema = r#"
        {
            "keyspace": "test",
            "table": "users",
            "partition_keys": [
                {"name": "id", "type": "invalid_type", "position": 0}
            ],
            "clustering_keys": [],
            "columns": [
                {"name": "id", "type": "invalid_type", "nullable": false}
            ]
        }
        "#;
        assert!(TableSchema::from_json(unknown_type_schema).is_ok());
    }

    /// Ten concurrent `load_schema` calls over three table names all succeed
    /// and leave one cache entry per name.
    #[tokio::test]
    async fn test_concurrent_schema_access() {
        let config = Config::default();
        let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
        let temp_dir = tempfile::tempdir().unwrap();

        let engine = StorageEngine::open(
            temp_dir.path(),
            &config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();
        let storage = Arc::new(engine);

        let manager = SchemaManager::new_with_storage(storage, &config)
            .await
            .unwrap();
        let manager = Arc::new(manager);

        // Spawn every task up front; the names cycle through
        // table_0 / table_1 / table_2.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let worker = Arc::clone(&manager);
                tokio::spawn(async move {
                    let table = format!("table_{}", i % 3);
                    worker.load_schema(&table).await.unwrap()
                })
            })
            .collect();

        // Every task must finish without panicking.
        for handle in handles {
            handle.await.unwrap();
        }

        let schemas = manager.schemas.read().await;
        assert!(schemas.len() <= 3);
        for expected in &["table_0", "table_1", "table_2"] {
            assert!(schemas.contains_key(*expected));
        }
    }

    /// A schema derived from an SSTable header carries over the keyspace,
    /// table name, partition key and column count.
    #[test]
    fn test_schema_from_sstable_header() {
        use crate::parser::header::{
            CassandraVersion, ColumnInfo, CompressionInfo, SSTableHeader, SSTableStats,
        };
        use std::collections::HashMap;

        // Partition key column.
        let id_column = ColumnInfo {
            name: "id".to_string(),
            column_type: "int".to_string(),
            is_primary_key: true,
            key_position: Some(0),
            is_static: false,
            is_clustering: false,
        };

        // Regular column.
        let name_column = ColumnInfo {
            name: "name".to_string(),
            column_type: "text".to_string(),
            is_primary_key: false,
            key_position: None,
            is_static: false,
            is_clustering: false,
        };

        let no_compression = CompressionInfo {
            algorithm: "NONE".to_string(),
            chunk_size: 0,
            parameters: HashMap::new(),
        };

        let header = SSTableHeader {
            cassandra_version: CassandraVersion::V5_0Bti,
            version: 1,
            table_id: [0; 16],
            keyspace: "test_ks".to_string(),
            table_name: "test_table".to_string(),
            generation: 1,
            compression: no_compression,
            stats: SSTableStats::default(),
            columns: vec![id_column, name_column],
            properties: HashMap::new(),
        };

        let derived = TableSchema::from_sstable_header(&header).unwrap();

        assert_eq!(derived.keyspace, "test_ks");
        assert_eq!(derived.table, "test_table");
        assert_eq!(derived.partition_keys.len(), 1);
        assert_eq!(derived.partition_keys[0].name, "id");
        assert_eq!(derived.columns.len(), 2);
    }
}