1use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::SystemTime;
11
12use serde::{Deserialize, Serialize};
13use tokio::sync::RwLock;
14
15use crate::{
16 platform::Platform,
17 schema::{
18 discovery::{SchemaDiscoveryConfig, SchemaDiscoveryEngine, SchemaInfo},
19 CqlType, TableSchema, UdtRegistry,
20 },
21 types::{ComparatorType, UdtTypeDef},
22 Config, Error, Result,
23};
24
/// Tunable behaviour of a [`SchemaRegistry`].
#[derive(Debug, Clone)]
pub struct SchemaRegistryConfig {
    /// When `false`, `SchemaRegistry::discover_schema` returns an error and
    /// `get_schema` does not fall back to discovery for unknown tables.
    pub enable_auto_discovery: bool,
    /// When `false`, registered entries are never treated as expired.
    pub enable_caching: bool,
    /// Age in seconds after which a registered entry counts as expired
    /// (only consulted while `enable_caching` is `true`).
    pub cache_ttl_seconds: u64,
    /// Record a history entry when an already-registered table is registered
    /// again; also gates `get_schema_history`.
    pub enable_versioning: bool,
    /// Upper bound on stored history entries per table; the oldest entry is
    /// dropped once the bound is exceeded.
    pub max_versions_per_schema: usize,
    /// Run the schema validator in `register_schema`; when `false` entries are
    /// stored as `SchemaValidationStatus::NotValidated`.
    pub enable_validation: bool,
    /// NOTE(review): not read anywhere in the visible part of this file —
    /// presumably enables refreshing when underlying files change; confirm.
    pub auto_refresh_on_changes: bool,
    /// Settings handed to the `SchemaDiscoveryEngine` built in `SchemaRegistry::new`.
    pub discovery_config: SchemaDiscoveryConfig,
}
45
46impl Default for SchemaRegistryConfig {
47 fn default() -> Self {
48 Self {
49 enable_auto_discovery: true,
50 enable_caching: true,
51 cache_ttl_seconds: 3600, enable_versioning: true,
53 max_versions_per_schema: 5,
54 enable_validation: true,
55 auto_refresh_on_changes: false, discovery_config: SchemaDiscoveryConfig::default(),
57 }
58 }
59}
60
/// In-memory registry of table schemas, UDT definitions and schema history.
///
/// Tables are identified by the string `"<keyspace>.<table>"`.
#[derive(Debug)]
pub struct SchemaRegistry {
    /// Behaviour switches and limits for this registry.
    config: SchemaRegistryConfig,
    /// Platform handle passed to `new`; stored but not read in the visible code.
    _platform: Arc<Platform>,
    /// Core configuration passed to `new`; stored but not read in the visible code.
    _core_config: Config,
    /// Registered schemas keyed by `"<keyspace>.<table>"`.
    schemas: Arc<RwLock<HashMap<String, SchemaEntry>>>,
    /// Registry of user-defined types.
    udt_registry: Arc<RwLock<UdtRegistry>>,
    /// Engine used to discover schemas from SSTable files and to render CQL/JSON.
    discovery_engine: Arc<SchemaDiscoveryEngine>,
    /// Validator applied in `register_schema` when validation is enabled.
    validator: Arc<SchemaValidator>,
    /// Per-table version history keyed by `"<keyspace>.<table>"`.
    version_history: Arc<RwLock<HashMap<String, Vec<SchemaVersion>>>>,
}
81
/// One registered schema together with its bookkeeping data.
#[derive(Debug, Clone)]
struct SchemaEntry {
    /// The table schema returned to callers.
    schema: TableSchema,
    /// Discovery output for this table; `None` for entries stored via `register_schema`.
    extended_info: Option<SchemaInfo>,
    /// Wall-clock time at which the entry was stored; used for TTL expiry.
    registered_at: SystemTime,
    /// Where the schema came from.
    source: SchemaSource,
    /// Result of the most recent validation.
    validation_status: SchemaValidationStatus,
    /// SSTable files the schema was discovered from (empty for `register_schema`);
    /// not read in the visible code.
    _associated_files: Vec<PathBuf>,
}
98
/// Origin of a registered schema.
#[derive(Debug, Clone)]
pub enum SchemaSource {
    /// Discovered from the listed SSTable files (set by the discovery path).
    Discovered(Vec<PathBuf>),
    /// NOTE(review): presumably loaded from an external schema file at this path — confirm.
    External(PathBuf),
    /// NOTE(review): presumably parsed from this CQL text — confirm.
    Cql(String),
    /// Supplied directly by the caller.
    Manual,
}
111
/// Outcome of validating a registered schema.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchemaValidationStatus {
    /// Validation produced neither errors nor warnings.
    Valid,
    /// Validation produced warnings but no errors.
    ValidWithWarnings,
    /// Validation produced at least one error.
    Invalid,
    /// Validation was not run (validation disabled at registration time).
    NotValidated,
}
124
/// One entry in a table's schema history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaVersion {
    /// Version number assigned when the entry was recorded.
    pub version: u32,
    /// Wall-clock time at which the entry was recorded.
    pub created_at: SystemTime,
    /// Snapshot of the schema for this version.
    pub schema: TableSchema,
    /// Differences relative to the previously recorded version.
    pub changes: Vec<SchemaChange>,
    /// Free-form label of what recorded the version (the registry writes `"registry"`).
    pub source: String,
}
139
/// A single difference between two schema versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaChange {
    /// Kind of change.
    pub change_type: SchemaChangeType,
    /// Name of the affected component (the column name for column changes).
    pub component: String,
    /// Human-readable description of the change.
    pub description: String,
    /// Previous value, when one applies (e.g. the old column type).
    pub old_value: Option<String>,
    /// New value, when one applies (e.g. the new column type).
    pub new_value: Option<String>,
}
154
/// Kinds of schema change that can be recorded in a [`SchemaChange`].
///
/// Only the `Column*` variants `ColumnAdded`, `ColumnRemoved` and
/// `ColumnTypeChanged` are produced by the change detection visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SchemaChangeType {
    /// A column exists in the new schema but not in the old one.
    ColumnAdded,
    /// A column exists in the old schema but not in the new one.
    ColumnRemoved,
    /// A column kept its name but its data type differs.
    ColumnTypeChanged,
    /// A column was renamed.
    ColumnRenamed,
    /// An index was added.
    IndexAdded,
    /// An index was removed.
    IndexRemoved,
    /// A user-defined type was added.
    UdtAdded,
    /// A user-defined type was modified.
    UdtModified,
    /// A user-defined type was removed.
    UdtRemoved,
    /// A table option changed.
    TableOptionChanged,
}
179
/// Result of `SchemaRegistry::validate_schema` for one table.
#[derive(Debug, Clone)]
pub struct ValidationReport {
    /// Table identifier in the form `"<keyspace>.<table>"`.
    pub table_id: String,
    /// Overall status derived from `errors` and `warnings`.
    pub status: SchemaValidationStatus,
    /// Problems that make the schema invalid.
    pub errors: Vec<ValidationError>,
    /// Non-fatal findings.
    pub warnings: Vec<ValidationWarning>,
    /// Performance-oriented suggestions.
    pub recommendations: Vec<String>,
    /// Wall-clock time at which the report was produced.
    pub validated_at: SystemTime,
}
196
/// A validation failure reported for a schema.
#[derive(Debug, Clone)]
pub struct ValidationError {
    /// Machine-readable code, e.g. `"SCHEMA_INVALID"` or `"UDT_NOT_FOUND"`.
    pub code: String,
    /// Human-readable explanation.
    pub message: String,
    /// Name of the affected column or UDT, when one applies.
    pub component: Option<String>,
    /// How serious the failure is.
    pub severity: ErrorSeverity,
}
209
/// Severity attached to a [`ValidationError`], listed from most to least severe.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorSeverity {
    /// Used for failures of the schema's own `validate()` check.
    Critical,
    /// Used for unknown UDTs and unparsable column types.
    High,
    /// Not produced by the code visible in this file.
    Medium,
    /// Not produced by the code visible in this file.
    Low,
}
218
/// A non-fatal validation finding.
#[derive(Debug, Clone)]
pub struct ValidationWarning {
    /// Machine-readable code.
    pub code: String,
    /// Human-readable explanation.
    pub message: String,
    /// Name of the affected component, when one applies.
    pub component: Option<String>,
}
229
/// Filter accepted by `SchemaRegistry::list_schemas`.
#[derive(Debug, Clone)]
pub struct SchemaQuery {
    /// Keep only schemas in exactly this keyspace.
    pub keyspace: Option<String>,
    /// Keep only tables whose name contains this text; `"*"` matches every table.
    pub table_pattern: Option<String>,
    /// NOTE(review): presumably restricts results to these source kinds — confirm
    /// how the listing code treats it.
    pub source_types: Option<Vec<SchemaSource>>,
    /// NOTE(review): presumably restricts results to schemas that passed
    /// validation — confirm how the listing code treats it.
    pub validated_only: bool,
    /// NOTE(review): not read anywhere in the visible part of this file.
    pub include_history: bool,
}
244
245impl SchemaRegistry {
246 pub async fn new(
248 config: SchemaRegistryConfig,
249 platform: Arc<Platform>,
250 core_config: Config,
251 ) -> Result<Self> {
252 let discovery_engine = Arc::new(
253 SchemaDiscoveryEngine::new(
254 config.discovery_config.clone(),
255 platform.clone(),
256 core_config.clone(),
257 )
258 .await?,
259 );
260
261 let validator = Arc::new(SchemaValidator::new());
262 let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
263
264 Ok(Self {
265 config,
266 _platform: platform,
267 _core_config: core_config,
268 schemas: Arc::new(RwLock::new(HashMap::new())),
269 udt_registry,
270 discovery_engine,
271 validator,
272 version_history: Arc::new(RwLock::new(HashMap::new())),
273 })
274 }
275
276 pub async fn discover_schema(
278 &self,
279 keyspace: &str,
280 table: &str,
281 sstable_files: &[PathBuf],
282 ) -> Result<TableSchema> {
283 if !self.config.enable_auto_discovery {
284 return Err(Error::Schema("Auto-discovery is disabled".to_string()));
285 }
286
287 let schema_info = self
289 .discovery_engine
290 .discover_schema(keyspace, table, sstable_files)
291 .await?;
292
293 let table_schema = self.convert_schema_info_to_table_schema(&schema_info)?;
295
296 self.register_discovered_schema(
298 table_schema.clone(),
299 Some(schema_info),
300 sstable_files.to_vec(),
301 )
302 .await?;
303
304 Ok(table_schema)
305 }
306
307 pub async fn register_schema(&self, schema: TableSchema, source: SchemaSource) -> Result<()> {
309 let table_id = format!("{}.{}", schema.keyspace, schema.table);
310
311 let validation_status = if self.config.enable_validation {
313 match self.validator.validate_table_schema(&schema).await {
314 Ok(_) => SchemaValidationStatus::Valid,
315 Err(_) => SchemaValidationStatus::Invalid,
316 }
317 } else {
318 SchemaValidationStatus::NotValidated
319 };
320
321 let entry = SchemaEntry {
323 schema: schema.clone(),
324 extended_info: None,
325 registered_at: SystemTime::now(),
326 source,
327 validation_status,
328 _associated_files: Vec::new(),
329 };
330
331 {
333 let mut schemas = self.schemas.write().await;
334
335 if self.config.enable_versioning && schemas.contains_key(&table_id) {
337 self.create_schema_version(&table_id, &schema).await?;
338 }
339
340 schemas.insert(table_id, entry);
341 }
342
343 Ok(())
344 }
345
346 pub async fn get_schema(&self, keyspace: &str, table: &str) -> Result<TableSchema> {
348 let table_id = format!("{}.{}", keyspace, table);
349 let schemas = self.schemas.read().await;
350
351 match schemas.get(&table_id) {
352 Some(entry) => {
353 if self.is_entry_expired(entry) {
355 drop(schemas); return self.refresh_schema(keyspace, table).await;
357 }
358 Ok(entry.schema.clone())
359 }
360 None => {
361 drop(schemas); if self.config.enable_auto_discovery {
364 self.auto_discover_schema(keyspace, table).await
365 } else {
366 Err(Error::Schema(format!(
367 "Schema not found: {}.{}",
368 keyspace, table
369 )))
370 }
371 }
372 }
373 }
374
375 pub async fn get_schema_info(&self, keyspace: &str, table: &str) -> Result<Option<SchemaInfo>> {
377 let table_id = format!("{}.{}", keyspace, table);
378 let schemas = self.schemas.read().await;
379
380 match schemas.get(&table_id) {
381 Some(entry) => Ok(entry.extended_info.clone()),
382 None => Ok(None),
383 }
384 }
385
386 pub async fn list_schemas(&self, query: Option<SchemaQuery>) -> Result<Vec<TableSchema>> {
388 let schemas = self.schemas.read().await;
389 let mut results = Vec::new();
390
391 for (_table_id, entry) in schemas.iter() {
392 if let Some(ref q) = query {
394 if !self.matches_query(&entry.schema, q) {
395 continue;
396 }
397 }
398
399 results.push(entry.schema.clone());
400 }
401
402 results.sort_by(|a, b| {
404 a.keyspace
405 .cmp(&b.keyspace)
406 .then_with(|| a.table.cmp(&b.table))
407 });
408
409 Ok(results)
410 }
411
412 #[allow(dead_code)]
414 pub async fn validate_schema(&self, keyspace: &str, table: &str) -> Result<ValidationReport> {
415 let schema = self.get_schema(keyspace, table).await?;
416 let table_id = format!("{}.{}", keyspace, table);
417
418 let mut errors = Vec::new();
420 let mut warnings = Vec::new();
421 let mut recommendations = Vec::new();
422
423 if let Err(e) = schema.validate() {
425 errors.push(ValidationError {
426 code: "SCHEMA_INVALID".to_string(),
427 message: e.to_string(),
428 component: None,
429 severity: ErrorSeverity::Critical,
430 });
431 }
432
433 self.validate_schema_udts(&schema, &mut errors, &mut warnings)
435 .await;
436
437 self.validate_column_types(&schema, &mut errors, &mut warnings)
439 .await;
440
441 self.generate_performance_recommendations(&schema, &mut recommendations)
443 .await;
444
445 let status = if !errors.is_empty() {
447 SchemaValidationStatus::Invalid
448 } else if !warnings.is_empty() {
449 SchemaValidationStatus::ValidWithWarnings
450 } else {
451 SchemaValidationStatus::Valid
452 };
453
454 {
456 let mut schemas = self.schemas.write().await;
457 if let Some(entry) = schemas.get_mut(&table_id) {
458 entry.validation_status = status.clone();
459 }
460 }
461
462 Ok(ValidationReport {
463 table_id,
464 status,
465 errors,
466 warnings,
467 recommendations,
468 validated_at: SystemTime::now(),
469 })
470 }
471
472 pub async fn get_schema_history(
474 &self,
475 keyspace: &str,
476 table: &str,
477 ) -> Result<Vec<SchemaVersion>> {
478 if !self.config.enable_versioning {
479 return Err(Error::Schema("Schema versioning is disabled".to_string()));
480 }
481
482 let table_id = format!("{}.{}", keyspace, table);
483 let history = self.version_history.read().await;
484
485 Ok(history.get(&table_id).cloned().unwrap_or_default())
486 }
487
488 pub async fn remove_schema(&self, keyspace: &str, table: &str) -> Result<()> {
490 let table_id = format!("{}.{}", keyspace, table);
491
492 {
493 let mut schemas = self.schemas.write().await;
494 schemas.remove(&table_id);
495 }
496
497 if self.config.enable_versioning {
499 let mut history = self.version_history.write().await;
500 history.remove(&table_id);
501 }
502
503 Ok(())
504 }
505
506 pub async fn generate_cql(&self, keyspace: &str, table: &str) -> Result<String> {
508 if let Some(schema_info) = self.get_schema_info(keyspace, table).await? {
510 return self.discovery_engine.generate_cql(&schema_info).await;
511 }
512
513 let schema = self.get_schema(keyspace, table).await?;
515 Ok(self.generate_basic_cql(&schema))
516 }
517
518 #[cfg(feature = "experimental")]
520 pub async fn export_schema_json(&self, keyspace: &str, table: &str) -> Result<String> {
521 self.export_schema_json_with_config(
522 keyspace,
523 table,
524 &crate::schema::json_exporter::JsonExportConfig::default(),
525 )
526 .await
527 }
528
    /// Stand-in compiled when the `experimental` feature is off.
    ///
    /// Ignores its arguments and always returns the error built by
    /// `Error::unsupported_format`.
    #[cfg(not(feature = "experimental"))]
    pub async fn export_schema_json(&self, _keyspace: &str, _table: &str) -> Result<String> {
        Err(crate::error::Error::unsupported_format(
            "JSON export requires experimental feature",
        ))
    }
535
536 #[cfg(feature = "experimental")]
538 pub async fn export_schema_json_with_config(
539 &self,
540 keyspace: &str,
541 table: &str,
542 config: &crate::schema::json_exporter::JsonExportConfig,
543 ) -> Result<String> {
544 if let Some(schema_info) = self.get_schema_info(keyspace, table).await? {
546 return self
547 .discovery_engine
548 .export_json_with_config(&schema_info, config)
549 .await;
550 }
551
552 let schema = self.get_schema(keyspace, table).await?;
554 let exporter = crate::schema::json_exporter::JsonExporter::with_config(config.clone());
555 exporter.export_table_schema(&schema)
556 }
557
    /// Stand-in compiled when the `experimental` feature is off.
    ///
    /// Generic over the config type so call sites compile without the exporter
    /// module; ignores its arguments and always returns the error built by
    /// `Error::unsupported_format`.
    #[cfg(not(feature = "experimental"))]
    pub async fn export_schema_json_with_config<T>(
        &self,
        _keyspace: &str,
        _table: &str,
        _config: &T,
    ) -> Result<String> {
        Err(crate::error::Error::unsupported_format(
            "JSON export requires experimental feature",
        ))
    }
569
570 #[cfg(feature = "experimental")]
572 pub async fn export_schema_json_compact(&self, keyspace: &str, table: &str) -> Result<String> {
573 let config = crate::schema::json_exporter::JsonExportConfig {
574 format_variant: crate::schema::json_exporter::JsonFormat::Compact,
575 include_metadata: false,
576 include_performance_metrics: false,
577 include_type_details: false,
578 pretty_format: false,
579 ..Default::default()
580 };
581 self.export_schema_json_with_config(keyspace, table, &config)
582 .await
583 }
584
    /// Stand-in compiled when the `experimental` feature is off.
    ///
    /// Ignores its arguments and always returns the error built by
    /// `Error::unsupported_format`.
    #[cfg(not(feature = "experimental"))]
    pub async fn export_schema_json_compact(
        &self,
        _keyspace: &str,
        _table: &str,
    ) -> Result<String> {
        Err(crate::error::Error::unsupported_format(
            "JSON export requires experimental feature",
        ))
    }
595
596 #[cfg(feature = "experimental")]
598 pub async fn export_schema_json_openapi(&self, keyspace: &str, table: &str) -> Result<String> {
599 let config = crate::schema::json_exporter::JsonExportConfig {
600 format_variant: crate::schema::json_exporter::JsonFormat::OpenApi,
601 include_documentation: true,
602 include_type_details: true,
603 include_metadata: false,
604 ..Default::default()
605 };
606 self.export_schema_json_with_config(keyspace, table, &config)
607 .await
608 }
609
    /// Stand-in compiled when the `experimental` feature is off.
    ///
    /// Ignores its arguments and always returns the error built by
    /// `Error::unsupported_format`.
    #[cfg(not(feature = "experimental"))]
    pub async fn export_schema_json_openapi(
        &self,
        _keyspace: &str,
        _table: &str,
    ) -> Result<String> {
        Err(crate::error::Error::unsupported_format(
            "JSON export requires experimental feature",
        ))
    }
620
621 #[cfg(feature = "experimental")]
623 pub async fn export_schema_json_pipeline(&self, keyspace: &str, table: &str) -> Result<String> {
624 let config = crate::schema::json_exporter::JsonExportConfig {
625 format_variant: crate::schema::json_exporter::JsonFormat::DataPipeline,
626 include_type_details: true,
627 include_table_options: false,
628 include_performance_metrics: true,
629 ..Default::default()
630 };
631 self.export_schema_json_with_config(keyspace, table, &config)
632 .await
633 }
634
    /// Stand-in compiled when the `experimental` feature is off.
    ///
    /// Ignores its arguments and always returns the error built by
    /// `Error::unsupported_format`.
    #[cfg(not(feature = "experimental"))]
    pub async fn export_schema_json_pipeline(
        &self,
        _keyspace: &str,
        _table: &str,
    ) -> Result<String> {
        Err(crate::error::Error::unsupported_format(
            "JSON export requires experimental feature",
        ))
    }
645
646 #[cfg(feature = "experimental")]
648 pub async fn export_multiple_schemas_json(
649 &self,
650 schema_infos: &[SchemaInfo],
651 ) -> Result<String> {
652 let exporter = crate::schema::json_exporter::JsonExporter::new();
653 exporter.export_multiple_schemas(schema_infos)
654 }
655
    /// Stand-in compiled when the `experimental` feature is off.
    ///
    /// Ignores its argument and always returns the error built by
    /// `Error::unsupported_format`.
    #[cfg(not(feature = "experimental"))]
    pub async fn export_multiple_schemas_json(
        &self,
        _schema_infos: &[SchemaInfo],
    ) -> Result<String> {
        Err(crate::error::Error::unsupported_format(
            "JSON export requires experimental feature",
        ))
    }
665
666 #[cfg(feature = "experimental")]
668 pub async fn export_keyspace_schemas_json(&self, keyspace: &str) -> Result<String> {
669 let mut schema_infos = Vec::new();
670
671 for (_table_id, entry) in self.schemas.read().await.iter() {
673 if entry.schema.keyspace == keyspace {
674 if let Ok(Some(schema_info)) = self
676 .get_schema_info(&entry.schema.keyspace, &entry.schema.table)
677 .await
678 {
679 schema_infos.push(schema_info);
680 }
681 }
682 }
683
684 if schema_infos.is_empty() {
685 return Err(Error::NotFound(format!(
686 "No schemas found in keyspace '{}'",
687 keyspace
688 )));
689 }
690
691 self.export_multiple_schemas_json(&schema_infos).await
692 }
693
    /// Stand-in compiled when the `experimental` feature is off.
    ///
    /// Ignores its argument and always returns the error built by
    /// `Error::unsupported_format`.
    #[cfg(not(feature = "experimental"))]
    pub async fn export_keyspace_schemas_json(&self, _keyspace: &str) -> Result<String> {
        Err(crate::error::Error::unsupported_format(
            "JSON export requires experimental feature",
        ))
    }
700
701 pub async fn register_udt(&self, udt_def: UdtTypeDef) -> Result<()> {
703 let mut registry = self.udt_registry.write().await;
704 registry.register_udt(udt_def);
705 Ok(())
706 }
707
708 pub async fn get_udt(&self, keyspace: &str, name: &str) -> Result<Option<UdtTypeDef>> {
710 let registry = self.udt_registry.read().await;
711 Ok(registry.get_udt(keyspace, name).cloned())
712 }
713
714 pub(crate) fn get_udt_registry(&self) -> Arc<RwLock<UdtRegistry>> {
719 self.udt_registry.clone()
720 }
721
722 pub async fn get_column_comparator(
724 &self,
725 keyspace: &str,
726 table: &str,
727 column: &str,
728 ) -> Result<ComparatorType> {
729 let schema = self.get_schema(keyspace, table).await?;
730
731 let column_def = schema
733 .columns
734 .iter()
735 .find(|c| c.name == column)
736 .ok_or_else(|| {
737 Error::Schema(format!(
738 "Column '{}' not found in table '{}.{}'",
739 column, keyspace, table
740 ))
741 })?;
742
743 let cql_type = CqlType::parse(&column_def.data_type)?;
745 ComparatorType::from_cql_type(&cql_type)
746 }
747
748 pub async fn get_table_comparators(
750 &self,
751 keyspace: &str,
752 table: &str,
753 ) -> Result<HashMap<String, ComparatorType>> {
754 let schema = self.get_schema(keyspace, table).await?;
755 let mut comparators = HashMap::new();
756
757 for column in &schema.columns {
758 let cql_type = CqlType::parse(&column.data_type)?;
759 let comparator = ComparatorType::from_cql_type(&cql_type)?;
760 comparators.insert(column.name.clone(), comparator);
761 }
762
763 Ok(comparators)
764 }
765
766 pub async fn get_partition_key_comparator(
768 &self,
769 keyspace: &str,
770 table: &str,
771 ) -> Result<Vec<ComparatorType>> {
772 let schema = self.get_schema(keyspace, table).await?;
773 let mut comparators = Vec::new();
774
775 let ordered_keys = schema.ordered_partition_keys();
777 for key_column in ordered_keys {
778 let cql_type = CqlType::parse(&key_column.data_type)?;
779 let comparator = ComparatorType::from_cql_type(&cql_type)?;
780 comparators.push(comparator);
781 }
782
783 Ok(comparators)
784 }
785
786 pub async fn get_parsing_context(&self, keyspace: &str, table: &str) -> Result<ParsingContext> {
788 let schema = self.get_schema(keyspace, table).await?;
789 let partition_comparators = self.get_partition_key_comparator(keyspace, table).await?;
790 let clustering_comparators = self.get_clustering_key_comparator(keyspace, table).await?;
791 let column_comparators = self.get_table_comparators(keyspace, table).await?;
792
793 Ok(ParsingContext {
794 schema,
795 partition_comparators,
796 clustering_comparators,
797 column_comparators,
798 })
799 }
800
801 pub async fn get_clustering_key_comparator(
803 &self,
804 keyspace: &str,
805 table: &str,
806 ) -> Result<Vec<ComparatorType>> {
807 let schema = self.get_schema(keyspace, table).await?;
808 let mut comparators = Vec::new();
809
810 let ordered_keys = schema.ordered_clustering_keys();
812 for key_column in ordered_keys {
813 let cql_type = CqlType::parse(&key_column.data_type)?;
814 let comparator = ComparatorType::from_cql_type(&cql_type)?;
815 comparators.push(comparator);
816 }
817
818 Ok(comparators)
819 }
820
821 pub async fn validate_column_type_compatibility(
823 &self,
824 keyspace: &str,
825 table: &str,
826 column: &str,
827 expected_type: &str,
828 ) -> Result<bool> {
829 let column_comparator = self.get_column_comparator(keyspace, table, column).await?;
830 let expected_cql_type = CqlType::parse(expected_type)?;
831 let expected_comparator = ComparatorType::from_cql_type(&expected_cql_type)?;
832
833 Ok(self.comparators_are_compatible(&column_comparator, &expected_comparator))
835 }
836
837 #[allow(clippy::only_used_in_recursion)]
839 fn comparators_are_compatible(&self, left: &ComparatorType, right: &ComparatorType) -> bool {
840 match (left, right) {
841 (ComparatorType::Boolean, ComparatorType::Boolean) => true,
843 (ComparatorType::TinyInt, ComparatorType::TinyInt) => true,
844 (ComparatorType::SmallInt, ComparatorType::SmallInt) => true,
845 (ComparatorType::Int, ComparatorType::Int) => true,
846 (ComparatorType::BigInt, ComparatorType::BigInt) => true,
847 (ComparatorType::Float32, ComparatorType::Float32) => true,
848 (ComparatorType::Float, ComparatorType::Float) => true,
849 (ComparatorType::Text, ComparatorType::Text) => true,
850 (ComparatorType::Blob, ComparatorType::Blob) => true,
851 (ComparatorType::Timestamp, ComparatorType::Timestamp) => true,
852 (ComparatorType::Uuid, ComparatorType::Uuid) => true,
853 (ComparatorType::Json, ComparatorType::Json) => true,
854
855 (ComparatorType::List(l_elem), ComparatorType::List(r_elem)) => {
857 self.comparators_are_compatible(l_elem, r_elem)
858 }
859 (ComparatorType::Set(l_elem), ComparatorType::Set(r_elem)) => {
860 self.comparators_are_compatible(l_elem, r_elem)
861 }
862 (ComparatorType::Map(l_key, l_val), ComparatorType::Map(r_key, r_val)) => {
863 self.comparators_are_compatible(l_key, r_key)
864 && self.comparators_are_compatible(l_val, r_val)
865 }
866
867 (ComparatorType::Tuple(l_fields), ComparatorType::Tuple(r_fields)) => {
869 l_fields.len() == r_fields.len()
870 && l_fields
871 .iter()
872 .zip(r_fields.iter())
873 .all(|(l, r)| self.comparators_are_compatible(l, r))
874 }
875
876 (
878 ComparatorType::Udt {
879 type_name: l_name,
880 keyspace: l_ks,
881 ..
882 },
883 ComparatorType::Udt {
884 type_name: r_name,
885 keyspace: r_ks,
886 ..
887 },
888 ) => l_name == r_name && l_ks == r_ks,
889
890 (ComparatorType::Frozen(l_inner), ComparatorType::Frozen(r_inner)) => {
892 self.comparators_are_compatible(l_inner, r_inner)
893 }
894
895 (ComparatorType::Custom(l_name), ComparatorType::Custom(r_name)) => l_name == r_name,
897
898 _ => false,
900 }
901 }
902
903 pub async fn get_statistics(&self) -> Result<RegistryStatistics> {
905 let schemas = self.schemas.read().await;
906 let udt_registry = self.udt_registry.read().await;
907 let version_history = self.version_history.read().await;
908
909 let mut stats = RegistryStatistics {
910 total_schemas: schemas.len(),
911 schemas_by_keyspace: HashMap::new(),
912 validated_schemas: 0,
913 schemas_with_warnings: 0,
914 invalid_schemas: 0,
915 total_udts: udt_registry.total_udts(),
916 total_versions: version_history.values().map(|v| v.len()).sum(),
917 auto_discovered_schemas: 0,
918 manually_registered_schemas: 0,
919 cache_hit_rate: 0.0, };
921
922 for entry in schemas.values() {
924 let keyspace = &entry.schema.keyspace;
925 *stats
926 .schemas_by_keyspace
927 .entry(keyspace.clone())
928 .or_insert(0) += 1;
929
930 match entry.validation_status {
931 SchemaValidationStatus::Valid => stats.validated_schemas += 1,
932 SchemaValidationStatus::ValidWithWarnings => stats.schemas_with_warnings += 1,
933 SchemaValidationStatus::Invalid => stats.invalid_schemas += 1,
934 SchemaValidationStatus::NotValidated => {}
935 }
936
937 match entry.source {
938 SchemaSource::Discovered(_) => stats.auto_discovered_schemas += 1,
939 _ => stats.manually_registered_schemas += 1,
940 }
941 }
942
943 Ok(stats)
944 }
945
946 async fn register_discovered_schema(
949 &self,
950 schema: TableSchema,
951 schema_info: Option<SchemaInfo>,
952 sstable_files: Vec<PathBuf>,
953 ) -> Result<()> {
954 let table_id = format!("{}.{}", schema.keyspace, schema.table);
955 let source = SchemaSource::Discovered(sstable_files.clone());
956
957 let entry = SchemaEntry {
958 schema,
959 extended_info: schema_info,
960 registered_at: SystemTime::now(),
961 source,
962 validation_status: SchemaValidationStatus::Valid, _associated_files: sstable_files,
964 };
965
966 let mut schemas = self.schemas.write().await;
967 schemas.insert(table_id, entry);
968
969 Ok(())
970 }
971
972 fn convert_schema_info_to_table_schema(&self, schema_info: &SchemaInfo) -> Result<TableSchema> {
973 let mut columns = Vec::new();
974 let mut partition_keys = Vec::new();
975 let mut clustering_keys = Vec::new();
976
977 for (pos, pk) in schema_info.partition_key.iter().enumerate() {
979 partition_keys.push(crate::schema::KeyColumn {
980 name: pk.name.clone(),
981 data_type: pk.data_type.clone(),
982 position: pos,
983 });
984 }
985
986 for ck in &schema_info.clustering_keys {
988 clustering_keys.push(ck.clone());
989 }
990
991 for col in &schema_info.regular_columns {
993 columns.push(crate::schema::Column {
994 name: col.name.clone(),
995 data_type: col.data_type.clone(),
996 nullable: col.nullable,
997 default: None, is_static: false,
999 });
1000 }
1001
1002 for col in &schema_info.static_columns {
1004 columns.push(crate::schema::Column {
1005 name: col.name.clone(),
1006 data_type: col.data_type.clone(),
1007 nullable: col.nullable,
1008 default: None, is_static: true,
1010 });
1011 }
1012
1013 Ok(TableSchema {
1014 keyspace: schema_info.keyspace.clone(),
1015 table: schema_info.table.clone(),
1016 partition_keys,
1017 clustering_keys,
1018 columns,
1019 comments: HashMap::new(),
1020 })
1021 }
1022
1023 fn is_entry_expired(&self, entry: &SchemaEntry) -> bool {
1024 if !self.config.enable_caching {
1025 return false;
1026 }
1027
1028 let ttl = std::time::Duration::from_secs(self.config.cache_ttl_seconds);
1029 entry
1030 .registered_at
1031 .elapsed()
1032 .unwrap_or(std::time::Duration::ZERO)
1033 > ttl
1034 }
1035
    /// Refreshes the schema of `keyspace.table`.
    ///
    /// Currently a plain delegation to `auto_discover_schema`, so it returns
    /// whatever that call returns, including its errors.
    async fn refresh_schema(&self, keyspace: &str, table: &str) -> Result<TableSchema> {
        self.auto_discover_schema(keyspace, table).await
    }
1041
1042 async fn auto_discover_schema(&self, keyspace: &str, table: &str) -> Result<TableSchema> {
1043 let sstable_files = self.find_sstable_files(keyspace, table).await?;
1046
1047 if sstable_files.is_empty() {
1048 return Err(Error::Schema(format!(
1049 "No SSTables found for {}.{}",
1050 keyspace, table
1051 )));
1052 }
1053
1054 self.discover_schema(keyspace, table, &sstable_files).await
1055 }
1056
    /// Looks up the SSTable files belonging to `keyspace.table`.
    ///
    /// Stub: both arguments are ignored and an empty list is always returned,
    /// so callers that require at least one file will fail.
    async fn find_sstable_files(&self, _keyspace: &str, _table: &str) -> Result<Vec<PathBuf>> {
        Ok(Vec::new())
    }
1062
1063 fn matches_query(&self, schema: &TableSchema, query: &SchemaQuery) -> bool {
1064 if let Some(ref ks) = query.keyspace {
1066 if &schema.keyspace != ks {
1067 return false;
1068 }
1069 }
1070
1071 if let Some(ref pattern) = query.table_pattern {
1073 if !self.matches_pattern(&schema.table, pattern) {
1074 return false;
1075 }
1076 }
1077
1078 true
1080 }
1081
1082 fn matches_pattern(&self, text: &str, pattern: &str) -> bool {
1083 if pattern == "*" {
1085 return true;
1086 }
1087
1088 text == pattern || text.contains(pattern)
1090 }
1091
1092 async fn create_schema_version(&self, table_id: &str, new_schema: &TableSchema) -> Result<()> {
1093 let mut version_history = self.version_history.write().await;
1094 let versions = version_history
1095 .entry(table_id.to_string())
1096 .or_insert_with(Vec::new);
1097
1098 let version_number = versions.len() as u32 + 1;
1099 let changes = if versions.is_empty() {
1100 vec![SchemaChange {
1101 change_type: SchemaChangeType::ColumnAdded,
1102 component: "initial".to_string(),
1103 description: "Initial schema version".to_string(),
1104 old_value: None,
1105 new_value: None,
1106 }]
1107 } else {
1108 self.detect_schema_changes(&versions.last().unwrap().schema, new_schema)
1110 };
1111
1112 let new_version = SchemaVersion {
1113 version: version_number,
1114 created_at: SystemTime::now(),
1115 schema: new_schema.clone(),
1116 changes,
1117 source: "registry".to_string(),
1118 };
1119
1120 versions.push(new_version);
1121
1122 if versions.len() > self.config.max_versions_per_schema {
1124 versions.remove(0);
1125 }
1126
1127 Ok(())
1128 }
1129
1130 fn detect_schema_changes(
1131 &self,
1132 old_schema: &TableSchema,
1133 new_schema: &TableSchema,
1134 ) -> Vec<SchemaChange> {
1135 let mut changes = Vec::new();
1136
1137 let old_columns: HashMap<_, _> = old_schema.columns.iter().map(|c| (&c.name, c)).collect();
1139 let new_columns: HashMap<_, _> = new_schema.columns.iter().map(|c| (&c.name, c)).collect();
1140
1141 for (name, column) in &new_columns {
1143 if !old_columns.contains_key(name) {
1144 changes.push(SchemaChange {
1145 change_type: SchemaChangeType::ColumnAdded,
1146 component: name.to_string(),
1147 description: format!(
1148 "Column '{}' added with type '{}'",
1149 name, column.data_type
1150 ),
1151 old_value: None,
1152 new_value: Some(column.data_type.clone()),
1153 });
1154 }
1155 }
1156
1157 for name in old_columns.keys() {
1159 if !new_columns.contains_key(name) {
1160 changes.push(SchemaChange {
1161 change_type: SchemaChangeType::ColumnRemoved,
1162 component: name.to_string(),
1163 description: format!("Column '{}' removed", name),
1164 old_value: None,
1165 new_value: None,
1166 });
1167 }
1168 }
1169
1170 for (name, new_column) in &new_columns {
1172 if let Some(old_column) = old_columns.get(name) {
1173 if old_column.data_type != new_column.data_type {
1174 changes.push(SchemaChange {
1175 change_type: SchemaChangeType::ColumnTypeChanged,
1176 component: name.to_string(),
1177 description: format!("Column '{}' type changed", name),
1178 old_value: Some(old_column.data_type.clone()),
1179 new_value: Some(new_column.data_type.clone()),
1180 });
1181 }
1182 }
1183 }
1184
1185 changes
1186 }
1187
1188 async fn validate_schema_udts(
1189 &self,
1190 schema: &TableSchema,
1191 errors: &mut Vec<ValidationError>,
1192 warnings: &mut Vec<ValidationWarning>,
1193 ) {
1194 let udt_registry = self.udt_registry.read().await;
1195
1196 for column in &schema.columns {
1197 if let Ok(cql_type) = CqlType::parse(&column.data_type) {
1199 self.validate_cql_type_udts(
1200 &cql_type,
1201 &schema.keyspace,
1202 &udt_registry,
1203 errors,
1204 warnings,
1205 );
1206 }
1207 }
1208 }
1209
1210 #[allow(clippy::only_used_in_recursion)]
1211 fn validate_cql_type_udts(
1212 &self,
1213 cql_type: &CqlType,
1214 keyspace: &str,
1215 udt_registry: &UdtRegistry,
1216 errors: &mut Vec<ValidationError>,
1217 _warnings: &mut Vec<ValidationWarning>,
1218 ) {
1219 match cql_type {
1220 CqlType::Udt(udt_name, _) => {
1221 if !udt_registry.contains_udt(keyspace, udt_name) {
1222 errors.push(ValidationError {
1223 code: "UDT_NOT_FOUND".to_string(),
1224 message: format!("UDT '{}' not found in keyspace '{}'", udt_name, keyspace),
1225 component: Some(udt_name.clone()),
1226 severity: ErrorSeverity::High,
1227 });
1228 }
1229 }
1230 CqlType::List(inner) | CqlType::Set(inner) | CqlType::Frozen(inner) => {
1231 self.validate_cql_type_udts(inner, keyspace, udt_registry, errors, _warnings);
1232 }
1233 CqlType::Map(key_type, value_type) => {
1234 self.validate_cql_type_udts(key_type, keyspace, udt_registry, errors, _warnings);
1235 self.validate_cql_type_udts(value_type, keyspace, udt_registry, errors, _warnings);
1236 }
1237 CqlType::Tuple(types) => {
1238 for t in types {
1239 self.validate_cql_type_udts(t, keyspace, udt_registry, errors, _warnings);
1240 }
1241 }
1242 _ => {} }
1244 }
1245
1246 async fn validate_column_types(
1247 &self,
1248 schema: &TableSchema,
1249 errors: &mut Vec<ValidationError>,
1250 _warnings: &mut [ValidationWarning],
1251 ) {
1252 for column in &schema.columns {
1253 if let Err(e) = CqlType::parse(&column.data_type) {
1254 errors.push(ValidationError {
1255 code: "INVALID_COLUMN_TYPE".to_string(),
1256 message: format!("Invalid column type '{}': {}", column.data_type, e),
1257 component: Some(column.name.clone()),
1258 severity: ErrorSeverity::High,
1259 });
1260 }
1261 }
1262 }
1263
1264 async fn generate_performance_recommendations(
1265 &self,
1266 schema: &TableSchema,
1267 recommendations: &mut Vec<String>,
1268 ) {
1269 if schema.partition_keys.len() > 3 {
1273 recommendations.push(
1274 "Consider reducing the number of partition key columns for better performance"
1275 .to_string(),
1276 );
1277 }
1278
1279 if schema.clustering_keys.len() > 5 {
1281 recommendations
1282 .push("Large number of clustering keys may impact query performance".to_string());
1283 }
1284
1285 if schema.columns.len() > 50 {
1287 recommendations.push(
1288 "Consider using UDTs or denormalizing wide tables for better performance"
1289 .to_string(),
1290 );
1291 }
1292 }
1293
1294 fn generate_basic_cql(&self, schema: &TableSchema) -> String {
1295 let mut cql = format!("CREATE TABLE {}.{} (\n", schema.keyspace, schema.table);
1296
1297 for (i, column) in schema.columns.iter().enumerate() {
1299 if i > 0 {
1300 cql.push_str(",\n");
1301 }
1302 cql.push_str(&format!(" {} {}", column.name, column.data_type));
1303 }
1304
1305 if !schema.partition_keys.is_empty() {
1307 cql.push_str(",\n PRIMARY KEY (");
1308
1309 if schema.partition_keys.len() == 1 && schema.clustering_keys.is_empty() {
1310 cql.push_str(&schema.partition_keys[0].name);
1311 } else {
1312 cql.push('(');
1314 for (i, pk) in schema.partition_keys.iter().enumerate() {
1315 if i > 0 {
1316 cql.push_str(", ");
1317 }
1318 cql.push_str(&pk.name);
1319 }
1320 cql.push(')');
1321
1322 if !schema.clustering_keys.is_empty() {
1323 for ck in &schema.clustering_keys {
1324 cql.push_str(", ");
1325 cql.push_str(&ck.name);
1326 }
1327 }
1328 }
1329
1330 cql.push(')');
1331 }
1332
1333 cql.push_str("\n);");
1334 cql
1335 }
1336}
1337
#[derive(Debug, Clone)]
/// Aggregate counters describing the contents of a schema registry.
///
/// NOTE(review): the code that fills this struct is outside this excerpt, so
/// the per-field descriptions below are derived from the field names and
/// types only; confirm them against the producer (the tests in this file read
/// `total_schemas` from the value returned by `get_statistics`).
pub struct RegistryStatistics {
    /// Number of schemas; presumably every schema currently registered.
    pub total_schemas: usize,
    /// Schema counts keyed by a string; presumably the keyspace name.
    pub schemas_by_keyspace: HashMap<String, usize>,
    /// Number of schemas counted as validated.
    pub validated_schemas: usize,
    /// Number of schemas counted as having validation warnings.
    pub schemas_with_warnings: usize,
    /// Number of schemas counted as invalid.
    pub invalid_schemas: usize,
    /// Number of user-defined types; presumably those known to the registry.
    pub total_udts: usize,
    /// Number of schema versions; presumably summed over all schemas (TODO confirm).
    pub total_versions: usize,
    /// Number of schemas that were auto-discovered.
    pub auto_discovered_schemas: usize,
    /// Number of schemas that were registered manually.
    pub manually_registered_schemas: usize,
    /// Cache hit rate. NOTE(review): whether this is a 0.0–1.0 fraction or a
    /// percentage is not visible here; confirm before displaying it.
    pub cache_hit_rate: f64,
}
1362
#[derive(Debug, Clone)]
/// A table schema bundled with comparator information for its columns.
///
/// NOTE(review): the name suggests this is handed to data-parsing code;
/// the construction site is outside this excerpt, so confirm there.
pub struct ParsingContext {
    /// The table schema this context was built for.
    pub schema: TableSchema,
    /// Comparators for the partition key; presumably one per partition key
    /// column, in key order (TODO confirm against the builder).
    pub partition_comparators: Vec<ComparatorType>,
    /// Comparators for the clustering key; presumably one per clustering
    /// column, in key order (TODO confirm against the builder).
    pub clustering_comparators: Vec<ComparatorType>,
    /// Comparators looked up by column name (see `get_column_comparator`).
    pub column_comparators: HashMap<String, ComparatorType>,
}
1375
1376impl ParsingContext {
1377 pub fn get_column_comparator(&self, column_name: &str) -> Option<&ComparatorType> {
1379 self.column_comparators.get(column_name)
1380 }
1381
1382 pub fn is_complete(&self) -> bool {
1384 !self.partition_comparators.is_empty() || !self.schema.partition_keys.is_empty()
1385 }
1386
1387 pub fn get_all_key_column_names(&self) -> Vec<String> {
1389 let mut names = Vec::new();
1390 names.extend(
1391 self.schema
1392 .ordered_partition_keys()
1393 .iter()
1394 .map(|k| k.name.clone()),
1395 );
1396 names.extend(
1397 self.schema
1398 .ordered_clustering_keys()
1399 .iter()
1400 .map(|k| k.name.clone()),
1401 );
1402 names
1403 }
1404}
1405
#[derive(Debug)]
/// Stateless validator for table schemas.
///
/// The type carries no data; its `validate_table_schema` method forwards to
/// `TableSchema::validate`.
pub struct SchemaValidator;
1409
1410impl Default for SchemaValidator {
1411 fn default() -> Self {
1412 Self::new()
1413 }
1414}
1415
1416impl SchemaValidator {
1417 pub fn new() -> Self {
1418 Self
1419 }
1420
1421 pub async fn validate_table_schema(&self, schema: &TableSchema) -> Result<()> {
1422 schema.validate()
1423 }
1424}
1425
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a registry for tests from `reg_config`.
    ///
    /// `enable_auto_discovery` is always forced to `false` before the
    /// registry is constructed, whatever the caller passed in.
    async fn make_registry(reg_config: SchemaRegistryConfig) -> SchemaRegistry {
        let reg_config = SchemaRegistryConfig {
            enable_auto_discovery: false,
            ..reg_config
        };
        let core_config = Config::default();
        let platform = Platform::new(&core_config).await.expect("platform");

        SchemaRegistry::new(reg_config, Arc::new(platform), core_config)
            .await
            .expect("registry")
    }

    /// Builds a non-static column with no default value.
    fn column(name: &str, data_type: &str, nullable: bool) -> crate::schema::Column {
        crate::schema::Column {
            name: name.to_string(),
            data_type: data_type.to_string(),
            nullable,
            default: None,
            is_static: false,
        }
    }

    /// Builds `test_ks.<name>` with a single `id uuid` partition key and no
    /// clustering keys.
    fn simple_schema(name: &str) -> TableSchema {
        let id_key = crate::schema::KeyColumn {
            name: "id".to_string(),
            data_type: "uuid".to_string(),
            position: 0,
        };

        TableSchema {
            keyspace: "test_ks".to_string(),
            table: name.to_string(),
            partition_keys: vec![id_key],
            clustering_keys: Vec::new(),
            columns: vec![column("id", "uuid", false)],
            comments: HashMap::new(),
        }
    }

    /// A freshly built registry reports zero schemas.
    #[tokio::test]
    async fn test_schema_registry_creation() {
        let registry = make_registry(SchemaRegistryConfig::default()).await;
        let total = registry.get_statistics().await.unwrap().total_schemas;
        assert_eq!(total, 0);
    }

    /// `SchemaQuery` keeps the filter values it was built with.
    #[test]
    fn test_schema_query_creation() {
        let query = SchemaQuery {
            keyspace: Some("test_ks".to_string()),
            table_pattern: Some("user_*".to_string()),
            source_types: None,
            validated_only: false,
            include_history: false,
        };

        assert_eq!(query.keyspace.as_deref(), Some("test_ks"));
        assert_eq!(query.table_pattern.as_deref(), Some("user_*"));
    }

    /// A manually registered schema can be fetched back by keyspace and table.
    #[tokio::test]
    async fn register_and_retrieve_schema() {
        let registry = make_registry(SchemaRegistryConfig::default()).await;

        registry
            .register_schema(simple_schema("users"), SchemaSource::Manual)
            .await
            .expect("register schema");

        let fetched = registry
            .get_schema("test_ks", "users")
            .await
            .expect("fetch schema");
        assert_eq!(fetched.table, "users");
        assert_eq!(fetched.partition_keys.len(), 1);
    }

    /// Re-registering a table with an extra column records one history
    /// entry that contains a `ColumnAdded` change.
    #[tokio::test]
    async fn schema_version_history_tracks_changes() {
        let registry = make_registry(SchemaRegistryConfig::default()).await;

        let first = simple_schema("accounts");
        let mut second = first.clone();
        second.columns.push(column("status", "text", true));

        registry
            .register_schema(first, SchemaSource::Manual)
            .await
            .expect("register v1");
        registry
            .register_schema(second, SchemaSource::Manual)
            .await
            .expect("register v2");

        let history = registry
            .get_schema_history("test_ks", "accounts")
            .await
            .expect("history");

        assert_eq!(
            history.len(),
            1,
            "Second registration should emit first version"
        );

        let saw_column_added = history[0]
            .changes
            .iter()
            .any(|change| matches!(change.change_type, SchemaChangeType::ColumnAdded));
        assert!(saw_column_added);
    }

    /// With a zero TTL, fetching a previously registered schema fails with
    /// the "No SSTables found" schema error.
    #[tokio::test]
    async fn expired_cached_schema_invokes_discovery_path() {
        // NOTE(review): `make_registry` resets `enable_auto_discovery` to
        // `false`, so the `true` below never reaches the registry config.
        // Confirm whether that is intended.
        let config = SchemaRegistryConfig {
            cache_ttl_seconds: 0,
            enable_auto_discovery: true,
            ..SchemaRegistryConfig::default()
        };
        let registry = make_registry(config).await;

        registry
            .register_schema(simple_schema("events"), SchemaSource::Manual)
            .await
            .expect("register events schema");

        // Let a little wall-clock time pass between registration and lookup.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        let err = registry
            .get_schema("test_ks", "events")
            .await
            .expect_err("expired schema should attempt discovery");

        let hit_discovery =
            matches!(&err, Error::Schema(message) if message.contains("No SSTables found"));
        assert!(hit_discovery);
    }
}