1use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime};
11
12use serde::{Deserialize, Serialize};
13use tokio::sync::RwLock;
14
15use crate::{
16 parser::header::{CassandraVersion, SSTableHeader},
17 platform::Platform,
18 schema::{ClusteringColumn, UdtRegistry},
19 types::Value,
20 Config, Result,
21};
22
/// Tunables for [`SchemaDiscoveryEngine`].
#[derive(Debug, Clone)]
pub struct SchemaDiscoveryConfig {
    /// Upper bound on rows sampled (across all SSTable files) for type inference.
    pub max_sample_rows: usize,
    /// Enables more aggressive type inference. NOTE(review): not read in the visible code.
    pub aggressive_inference: bool,
    /// Enables the engine's in-memory schema cache, keyed by `keyspace.table`.
    pub enable_schema_cache: bool,
    /// How long a cached schema stays valid, in seconds.
    pub cache_ttl_seconds: u64,
    /// Enables schema versioning.
    pub enable_versioning: bool,
    /// Maximum number of retained schema versions (presumably per table — TODO confirm).
    pub max_versions: usize,
    /// Runs the UDT discovery stage of `discover_schema`.
    pub enable_udt_discovery: bool,
    /// Runs the collection-type analysis stage of `discover_schema`.
    pub enable_collection_analysis: bool,
    /// Runs the index discovery stage of `discover_schema`.
    pub enable_index_discovery: bool,
    /// Enables schema validation across SSTable files.
    pub enable_cross_file_validation: bool,
    /// Minimum confidence for an inferred type (presumably compared against
    /// `ColumnDefinition::confidence` — TODO confirm).
    pub min_confidence_threshold: f64,
}

impl Default for SchemaDiscoveryConfig {
    /// Everything enabled, 2000 sample rows, a one-hour cache TTL, up to 10 versions and a
    /// 0.7 confidence threshold.
    fn default() -> Self {
        let one_hour_secs = 60 * 60;
        Self {
            // Sampling and inference.
            max_sample_rows: 2_000,
            aggressive_inference: true,
            min_confidence_threshold: 0.7,
            // Caching and versioning.
            enable_schema_cache: true,
            cache_ttl_seconds: one_hour_secs,
            enable_versioning: true,
            max_versions: 10,
            // Optional discovery stages.
            enable_udt_discovery: true,
            enable_collection_analysis: true,
            enable_index_discovery: true,
            enable_cross_file_validation: true,
        }
    }
}
67
/// A table schema reconstructed from SSTable files.
///
/// Produced by [`SchemaDiscoveryEngine::discover_schema`] and consumed by the CQL/JSON
/// exporters and the schema comparison in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaInfo {
    /// Keyspace the table belongs to.
    pub keyspace: String,
    /// Table name; generated CQL addresses the table as `keyspace.table`.
    pub table: String,
    /// Partition key columns, in the order they appear in the `PRIMARY KEY`.
    pub partition_key: Vec<ColumnDefinition>,
    /// Clustering columns, in the order they appear in the `PRIMARY KEY`.
    pub clustering_keys: Vec<ClusteringColumn>,
    /// Regular (non-key) columns.
    pub regular_columns: Vec<ColumnDefinition>,
    /// Static columns.
    pub static_columns: Vec<ColumnDefinition>,
    /// Collection type details (presumably keyed by column name — TODO confirm).
    pub collection_types: HashMap<String, CollectionType>,
    /// User-defined type definitions associated with the table.
    pub user_defined_types: Vec<UDTDefinition>,
    /// Indexes discovered for the table.
    pub indexes: Vec<IndexDefinition>,
    /// Table-level options (compaction, compression, TTL, ...).
    pub table_options: TableOptions,
    /// Provenance, validation and timing information about the discovery run.
    pub metadata: SchemaMetadata,
}
94
/// A partition-key, static or regular column of a discovered schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDefinition {
    /// Column name.
    pub name: String,
    /// Type as a plain string. Note: the CQL exporter renders `type_info`, not this field.
    pub data_type: String,
    /// Structured type description, including nested collection/tuple/UDT types.
    pub type_info: TypeInfo,
    /// Whether the column may hold null.
    pub nullable: bool,
    /// Whether the column is flagged static.
    pub is_static: bool,
    /// Default value, if one was determined.
    pub default_value: Option<Value>,
    /// Column position (presumably its ordinal in the table definition — TODO confirm).
    pub position: usize,
    /// Confidence in the inferred type (presumably in [0, 1]; not enforced here).
    pub confidence: f64,
}
115
/// Structured description of a (possibly nested) column type.
///
/// Which optional fields are populated depends on `type_id`. The inference engine fills:
/// - `element_type` for `list`/`set`,
/// - `key_type`/`value_type` for `map`,
/// - `tuple_elements` for `tuple`,
/// - `udt_fields` for `udt`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeInfo {
    /// Base type name, e.g. `int`, `text`, `list`, `map`, `tuple`, `udt`.
    pub type_id: String,
    /// `type_id`s of the direct type parameters (element, key/value, or tuple members).
    pub type_params: Vec<String>,
    /// Whether the type is frozen.
    pub is_frozen: bool,
    /// Element type of a `list`/`set` (the CQL exporter also reads it for a `frozen` wrapper).
    pub element_type: Option<Box<TypeInfo>>,
    /// Key type of a `map`.
    pub key_type: Option<Box<TypeInfo>>,
    /// Value type of a `map`.
    pub value_type: Option<Box<TypeInfo>>,
    /// Fields of a user-defined type.
    pub udt_fields: Option<Vec<UdtFieldInfo>>,
    /// Member types of a `tuple`, in position order.
    pub tuple_elements: Option<Vec<TypeInfo>>,
}
136
/// One field of a user-defined type inside a [`TypeInfo`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdtFieldInfo {
    /// Field name.
    pub name: String,
    /// Inferred type of the field.
    pub field_type: TypeInfo,
    /// Whether the field may be null.
    pub nullable: bool,
}
147
/// String-level summary of a collection (or tuple) column type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionType {
    /// Kind of collection.
    pub kind: CollectionKind,
    /// Element type name for list/set kinds.
    pub element_type: Option<String>,
    /// Key type name for maps.
    pub key_type: Option<String>,
    /// Value type name for maps.
    pub value_type: Option<String>,
    /// Whether the collection is frozen.
    pub is_frozen: bool,
}
162
/// Kinds of multi-valued column types tracked by [`CollectionType`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CollectionKind {
    List,
    Set,
    Map,
    // Tuples are grouped with collections here, although CQL treats them as a separate type.
    Tuple,
}
171
/// A user-defined type discovered for a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UDTDefinition {
    /// Type name.
    pub name: String,
    /// Keyspace the type is defined in.
    pub keyspace: String,
    /// Fields of the type (each carries its own `position`).
    pub fields: Vec<UdtFieldDefinition>,
    /// Optional version number of the definition.
    pub version: Option<u32>,
}
184
/// One field of a [`UDTDefinition`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdtFieldDefinition {
    /// Field name.
    pub name: String,
    /// Field type as a string.
    pub field_type: String,
    /// Position of the field within the type.
    pub position: usize,
    /// Whether the field may be null.
    pub nullable: bool,
}
197
/// An index defined on a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexDefinition {
    /// Index name.
    pub name: String,
    /// Column the index targets.
    pub target_column: String,
    /// Kind of index.
    pub index_type: IndexType,
    /// Free-form index options.
    pub options: HashMap<String, String>,
}
210
/// Kind of an [`IndexDefinition`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
    /// Built-in secondary index.
    Secondary,
    /// Index over several columns (presumably — TODO confirm).
    Composite,
    /// Custom index; the payload is presumably the implementing class name — TODO confirm.
    Custom(String),
}
221
/// Table-level options; `None` means the option was not discovered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableOptions {
    /// Compaction strategy.
    pub compaction: Option<CompactionStrategy>,
    /// Compression settings.
    pub compression: Option<CompressionOptions>,
    /// Caching settings.
    pub caching: Option<CachingOptions>,
    /// Bloom filter false-positive chance.
    pub bloom_filter_fp_chance: Option<f64>,
    /// Tombstone grace period, in seconds.
    pub gc_grace_seconds: Option<u32>,
    /// Default TTL for written data, in seconds.
    pub default_time_to_live: Option<u32>,
    /// Memtable flush period, in milliseconds.
    pub memtable_flush_period_in_ms: Option<u32>,
    /// Any other options, as raw key/value strings.
    pub additional_properties: HashMap<String, String>,
}
242
/// Compaction strategy of a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionStrategy {
    /// Strategy class, emitted as the compaction `'class'` in generated CQL.
    pub class: String,
    /// Additional strategy sub-options.
    pub options: HashMap<String, String>,
}
251
/// Compression settings of a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionOptions {
    /// Compressor name (e.g. a compressor class name — TODO confirm the exact format).
    pub algorithm: String,
    /// Compression chunk length, in KiB.
    pub chunk_length_kb: Option<u32>,
    /// Probability of verifying checksums on read.
    pub crc_check_chance: Option<f32>,
}
262
/// Caching settings of a table, kept as raw option strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachingOptions {
    /// Key cache setting.
    pub keys: String,
    /// Row cache setting per partition.
    pub rows_per_partition: String,
}
271
/// Provenance and diagnostics attached to a [`SchemaInfo`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaMetadata {
    /// When the schema was discovered.
    pub discovered_at: SystemTime,
    /// SSTable files the schema was derived from.
    pub source_files: Vec<PathBuf>,
    /// Number of rows sampled for type inference.
    pub total_rows_sampled: usize,
    /// Cassandra version read from an SSTable header, if any.
    pub cassandra_version: Option<CassandraVersion>,
    /// How the schema was obtained.
    pub discovery_method: DiscoveryMethod,
    /// Schema version number.
    pub version: u32,
    /// Outcome of schema validation.
    pub validation_results: ValidationResults,
    /// Timing and memory figures for the discovery run.
    pub performance_metrics: DiscoveryMetrics,
}
292
/// Source of the information a schema was built from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DiscoveryMethod {
    /// SSTable header metadata.
    HeaderMetadata,
    /// Sampled row data.
    DataSampling,
    /// Both header metadata and sampled data.
    Hybrid,
    /// Supplied from outside the discovery engine (presumably — TODO confirm).
    External,
}
305
/// Result of validating a discovered schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationResults {
    /// Overall outcome.
    pub status: ValidationStatus,
    /// Errors found.
    pub errors: Vec<ValidationError>,
    /// Non-fatal warnings.
    pub warnings: Vec<ValidationWarning>,
    /// Cross-file consistency findings.
    pub consistency_results: ConsistencyResults,
}
318
/// Overall outcome of schema validation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ValidationStatus {
    /// No errors or warnings.
    Valid,
    /// No errors, but at least one warning.
    ValidWithWarnings,
    /// At least one error.
    Invalid,
    /// Validation itself could not complete (presumably — TODO confirm vs. `Invalid`).
    ValidationFailed,
}
331
/// A single validation error.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationError {
    /// Error category.
    pub error_type: ValidationErrorType,
    /// Human-readable description.
    pub message: String,
    /// Affected schema component (e.g. a column name), if known.
    pub component: Option<String>,
    /// SSTable file the error relates to, if any.
    pub source_file: Option<PathBuf>,
}
344
/// Categories of [`ValidationError`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ValidationErrorType {
    /// Conflicting or unexpected types.
    TypeMismatch,
    /// A required schema component is absent.
    MissingComponent,
    /// A type definition could not be interpreted.
    InvalidTypeDefinition,
    /// A schema constraint is violated.
    ConstraintViolation,
    /// UDT definitions disagree.
    UdtInconsistency,
    /// Problem with an index definition.
    IndexError,
}
361
/// A single non-fatal validation finding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationWarning {
    /// Warning category.
    pub warning_type: ValidationWarningType,
    /// Human-readable description.
    pub message: String,
    /// Affected schema component, if known.
    pub component: Option<String>,
}
372
/// Categories of [`ValidationWarning`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ValidationWarningType {
    /// An inferred type has low confidence.
    LowConfidence,
    /// Use of a deprecated feature.
    DeprecatedFeature,
    /// Possible incompatibility with some Cassandra version.
    VersionCompatibility,
    /// Suggested performance-related change.
    PerformanceRecommendation,
}
385
/// Cross-file consistency findings for a schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsistencyResults {
    /// Number of SSTable files compared.
    pub files_analyzed: usize,
    /// Number of schema mismatches found.
    pub schema_mismatches: usize,
    /// Columns whose type differs between files.
    pub type_inconsistencies: Vec<TypeInconsistency>,
    /// UDTs whose definition differs between files.
    pub udt_conflicts: Vec<UdtConflict>,
}
398
/// A column observed with different types in different files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeInconsistency {
    /// Affected column.
    pub column_name: String,
    /// The differing type names.
    pub conflicting_types: Vec<String>,
    /// Files involved in the conflict.
    pub conflicting_files: Vec<PathBuf>,
}
409
/// A UDT whose field definitions differ between files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UdtConflict {
    /// Affected UDT.
    pub udt_name: String,
    /// Per-field type conflicts.
    pub field_conflicts: Vec<FieldConflict>,
    /// Files involved in the conflict.
    pub conflicting_files: Vec<PathBuf>,
}
420
/// A UDT field observed with different types.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldConflict {
    /// Affected field.
    pub field_name: String,
    /// The differing type names.
    pub conflicting_types: Vec<String>,
}
429
/// Timing and memory figures collected during schema discovery.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveryMetrics {
    /// Total discovery time, in milliseconds.
    pub total_time_ms: u64,
    /// Time spent parsing SSTable headers, in milliseconds.
    pub header_parsing_time_ms: u64,
    /// Time spent sampling row data, in milliseconds.
    pub data_sampling_time_ms: u64,
    /// Time spent inferring types, in milliseconds.
    pub type_inference_time_ms: u64,
    /// Time spent validating, in milliseconds.
    pub validation_time_ms: u64,
    /// Peak memory usage, in bytes.
    pub peak_memory_usage_bytes: usize,
}
446
447#[derive(Debug)]
449pub struct SchemaDiscoveryEngine {
450 config: SchemaDiscoveryConfig,
452 #[allow(dead_code)]
454 platform: Arc<Platform>,
455 #[allow(dead_code)]
457 core_config: Config,
458 schema_cache: Arc<RwLock<HashMap<String, (SchemaInfo, SystemTime)>>>,
460 #[allow(dead_code)]
462 udt_registry: Arc<RwLock<UdtRegistry>>,
463 #[allow(dead_code)]
465 type_inference: Arc<TypeInferenceEngine>,
466 #[allow(dead_code)]
468 validator: Arc<SchemaValidator>,
469 exporter: Arc<SchemaExporter>,
471}
472
473impl SchemaDiscoveryEngine {
474 pub async fn new(
476 config: SchemaDiscoveryConfig,
477 platform: Arc<Platform>,
478 core_config: Config,
479 ) -> Result<Self> {
480 let udt_registry = Arc::new(RwLock::new(UdtRegistry::new()));
481 let type_inference = Arc::new(TypeInferenceEngine::new());
482 let validator = Arc::new(SchemaValidator::new());
483 let exporter = Arc::new(SchemaExporter::new());
484
485 Ok(Self {
486 config,
487 platform,
488 core_config,
489 schema_cache: Arc::new(RwLock::new(HashMap::new())),
490 udt_registry,
491 type_inference,
492 validator,
493 exporter,
494 })
495 }
496
497 pub async fn discover_schema(
499 &self,
500 keyspace: &str,
501 table: &str,
502 sstable_files: &[PathBuf],
503 ) -> Result<SchemaInfo> {
504 let cache_key = format!("{}.{}", keyspace, table);
505 let start_time = SystemTime::now();
506
507 if self.config.enable_schema_cache {
509 if let Some(cached_schema) = self.get_cached_schema(&cache_key).await {
510 return Ok(cached_schema);
511 }
512 }
513
514 let mut discovery_context = DiscoveryContext::new(keyspace, table, sstable_files);
516
517 self.extract_header_metadata(&mut discovery_context).await?;
519
520 self.sample_data_for_inference(&mut discovery_context)
522 .await?;
523
524 if self.config.enable_udt_discovery {
526 self.discover_udts(&mut discovery_context).await?;
527 }
528
529 if self.config.enable_collection_analysis {
531 self.analyze_collection_types(&mut discovery_context)
532 .await?;
533 }
534
535 if self.config.enable_index_discovery {
537 self.discover_indexes(&mut discovery_context).await?;
538 }
539
540 let schema_info = self.build_schema_info(&mut discovery_context).await?;
542
543 let validated_schema = schema_info;
545
546 let discovery_time = start_time.elapsed().unwrap_or(Duration::ZERO);
548 let final_schema =
549 self.add_performance_metrics(validated_schema, discovery_time, &discovery_context);
550
551 if self.config.enable_schema_cache {
553 self.cache_schema(cache_key, final_schema.clone()).await;
554 }
555
556 Ok(final_schema)
557 }
558
559 pub async fn generate_cql(&self, schema: &SchemaInfo) -> Result<String> {
561 self.exporter.generate_cql(schema).await
562 }
563
564 #[cfg(feature = "experimental")]
566 pub async fn export_json(&self, schema: &SchemaInfo) -> Result<String> {
567 self.exporter.export_json(schema).await
568 }
569
570 #[cfg(not(feature = "experimental"))]
571 pub async fn export_json(&self, _schema: &SchemaInfo) -> Result<String> {
572 Err(crate::error::Error::unsupported_format(
573 "JSON export requires experimental feature",
574 ))
575 }
576
577 #[cfg(feature = "experimental")]
579 pub async fn export_json_with_config(
580 &self,
581 schema: &SchemaInfo,
582 config: &crate::schema::json_exporter::JsonExportConfig,
583 ) -> Result<String> {
584 self.exporter.export_json_with_config(schema, config).await
585 }
586
587 #[cfg(not(feature = "experimental"))]
588 pub async fn export_json_with_config<T>(
589 &self,
590 _schema: &SchemaInfo,
591 _config: &T,
592 ) -> Result<String> {
593 Err(crate::error::Error::unsupported_format(
594 "JSON export requires experimental feature",
595 ))
596 }
597
598 pub async fn compare_schemas(
600 &self,
601 schema1: &SchemaInfo,
602 schema2: &SchemaInfo,
603 ) -> Result<String> {
604 self.exporter
605 .generate_comparison_report(schema1, schema2)
606 .await
607 }
608
609 async fn get_cached_schema(&self, cache_key: &str) -> Option<SchemaInfo> {
612 let cache = self.schema_cache.read().await;
613 if let Some((schema, cached_at)) = cache.get(cache_key) {
614 let ttl = Duration::from_secs(self.config.cache_ttl_seconds);
615 if cached_at.elapsed().unwrap_or(Duration::MAX) < ttl {
616 return Some(schema.clone());
617 }
618 }
619 None
620 }
621
622 async fn cache_schema(&self, cache_key: String, schema: SchemaInfo) {
623 let mut cache = self.schema_cache.write().await;
624 cache.insert(cache_key, (schema, SystemTime::now()));
625
626 if cache.len() > 100 {
628 let oldest_key = cache
629 .iter()
630 .min_by_key(|(_, (_, time))| time)
631 .map(|(key, _)| key.clone());
632
633 if let Some(key) = oldest_key {
634 cache.remove(&key);
635 }
636 }
637 }
638}
639
640#[derive(Debug)]
642struct DiscoveryContext {
643 #[allow(dead_code)]
644 keyspace: String,
645 #[allow(dead_code)]
646 table: String,
647 #[allow(dead_code)]
648 source_files: Vec<PathBuf>,
649 #[allow(dead_code)]
650 headers: Vec<SSTableHeader>,
651 #[allow(dead_code)]
652 column_samples: HashMap<String, Vec<Value>>,
653 #[allow(dead_code)]
654 discovered_udts: HashMap<String, UDTDefinition>,
655 #[allow(dead_code)]
656 collection_types: HashMap<String, CollectionType>,
657 #[allow(dead_code)]
658 indexes: Vec<IndexDefinition>,
659 #[allow(dead_code)]
660 table_options: TableOptions,
661 #[allow(dead_code)]
662 total_rows_sampled: usize,
663 #[allow(dead_code)]
664 cassandra_version: Option<CassandraVersion>,
665}
666
667impl DiscoveryContext {
668 fn new(keyspace: &str, table: &str, files: &[PathBuf]) -> Self {
669 Self {
670 keyspace: keyspace.to_string(),
671 table: table.to_string(),
672 source_files: files.to_vec(),
673 headers: Vec::new(),
674 column_samples: HashMap::new(),
675 discovered_udts: HashMap::new(),
676 collection_types: HashMap::new(),
677 indexes: Vec::new(),
678 table_options: TableOptions {
679 compaction: None,
680 compression: None,
681 caching: None,
682 bloom_filter_fp_chance: None,
683 gc_grace_seconds: None,
684 default_time_to_live: None,
685 memtable_flush_period_in_ms: None,
686 additional_properties: HashMap::new(),
687 },
688 total_rows_sampled: 0,
689 cassandra_version: None,
690 }
691 }
692}
693
/// Stateless helper that infers CQL column types from sampled [`Value`]s.
#[derive(Debug)]
pub struct TypeInferenceEngine {}
699
700impl TypeInferenceEngine {
701 fn new() -> Self {
702 Self {}
703 }
704
705 #[allow(dead_code)]
707 fn infer_column_type<'a>(
708 &'a self,
709 samples: &'a [Value],
710 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<TypeInfo>> + 'a>> {
711 Box::pin(async move {
712 if samples.is_empty() {
713 return Ok(TypeInfo {
714 type_id: "text".to_string(),
715 type_params: vec![],
716 is_frozen: false,
717 element_type: None,
718 key_type: None,
719 value_type: None,
720 udt_fields: None,
721 tuple_elements: None,
722 });
723 }
724
725 let mut type_counts = HashMap::new();
727 let mut has_complex_types = false;
728
729 for sample in samples {
730 let type_name = self.get_value_type_name(sample);
731 *type_counts.entry(type_name.clone()).or_insert(0) += 1;
732
733 if matches!(
735 sample,
736 Value::List(_)
737 | Value::Set(_)
738 | Value::Map(_)
739 | Value::Tuple(_)
740 | Value::Udt(_)
741 ) {
742 has_complex_types = true;
743 }
744 }
745
746 let most_common_type = type_counts
748 .iter()
749 .max_by_key(|(_, count)| *count)
750 .map(|(type_name, _)| type_name.clone())
751 .unwrap_or_else(|| "text".to_string());
752
753 if has_complex_types {
755 return self.infer_complex_type(samples, &most_common_type).await;
756 }
757
758 Ok(TypeInfo {
760 type_id: self.normalize_type_name(&most_common_type),
761 type_params: vec![],
762 is_frozen: false,
763 element_type: None,
764 key_type: None,
765 value_type: None,
766 udt_fields: None,
767 tuple_elements: None,
768 })
769 })
770 }
771
772 fn get_value_type_name(&self, value: &Value) -> String {
774 match value {
775 Value::Null => "null".to_string(),
776 Value::Text(_) => "text".to_string(),
777 Value::Integer(_) => "int".to_string(),
778 Value::BigInt(_) => "bigint".to_string(),
779 Value::Counter(_) => "counter".to_string(),
780 Value::Float(_) => "double".to_string(),
781 Value::Boolean(_) => "boolean".to_string(),
782 Value::Uuid(_) => "uuid".to_string(),
783 Value::Timestamp(_) => "timestamp".to_string(),
784 Value::Date(_) => "date".to_string(),
785 Value::Time(_) => "time".to_string(),
786 Value::Inet(_) => "inet".to_string(),
787 Value::Blob(_) => "blob".to_string(),
788 Value::List(_) => "list".to_string(),
789 Value::Set(_) => "set".to_string(),
790 Value::Map(_) => "map".to_string(),
791 Value::Json(_) => "text".to_string(), Value::TinyInt(_) => "tinyint".to_string(),
793 Value::SmallInt(_) => "smallint".to_string(),
794 Value::Float32(_) => "float".to_string(),
795 Value::Tuple(_) => "tuple".to_string(),
796 Value::Udt(_) => "udt".to_string(),
797 Value::Frozen(_) => "frozen".to_string(),
798 Value::Varint(_) => "varint".to_string(),
799 Value::Decimal { .. } => "decimal".to_string(),
800 Value::Duration { .. } => "duration".to_string(),
801 Value::Tombstone(_) => "tombstone".to_string(),
802 }
803 }
804
805 fn normalize_type_name(&self, type_name: &str) -> String {
807 match type_name.to_lowercase().as_str() {
808 "int" | "integer" => "int".to_string(),
809 "bigint" | "biginteger" => "bigint".to_string(),
810 "double" | "float64" => "double".to_string(),
811 "float" | "float32" => "float".to_string(),
812 "text" | "varchar" | "string" => "text".to_string(),
813 "bool" | "boolean" => "boolean".to_string(),
814 "timestamp" | "datetime" => "timestamp".to_string(),
815 "blob" | "bytes" => "blob".to_string(),
816 "uuid" => "uuid".to_string(),
817 "decimal" => "decimal".to_string(),
818 "varint" => "varint".to_string(),
819 "tinyint" => "tinyint".to_string(),
820 "smallint" => "smallint".to_string(),
821 "duration" => "duration".to_string(),
822 _ => type_name.to_string(),
823 }
824 }
825
826 async fn infer_complex_type(&self, samples: &[Value], base_type: &str) -> Result<TypeInfo> {
828 match base_type {
829 "list" => self.infer_list_type(samples).await,
830 "set" => self.infer_set_type(samples).await,
831 "map" => self.infer_map_type(samples).await,
832 "tuple" => self.infer_tuple_type(samples).await,
833 "udt" => self.infer_udt_type(samples).await,
834 _ => Ok(TypeInfo {
835 type_id: self.normalize_type_name(base_type),
836 type_params: vec![],
837 is_frozen: false,
838 element_type: None,
839 key_type: None,
840 value_type: None,
841 udt_fields: None,
842 tuple_elements: None,
843 }),
844 }
845 }
846
847 async fn infer_list_type(&self, samples: &[Value]) -> Result<TypeInfo> {
849 let mut element_samples = Vec::new();
850
851 for sample in samples {
852 if let Value::List(elements) = sample {
853 element_samples.extend(elements.iter().cloned());
854 }
855 }
856
857 let element_type = if !element_samples.is_empty() {
858 Box::new(self.infer_column_type(&element_samples).await?)
859 } else {
860 Box::new(TypeInfo {
861 type_id: "text".to_string(),
862 type_params: vec![],
863 is_frozen: false,
864 element_type: None,
865 key_type: None,
866 value_type: None,
867 udt_fields: None,
868 tuple_elements: None,
869 })
870 };
871
872 Ok(TypeInfo {
873 type_id: "list".to_string(),
874 type_params: vec![element_type.type_id.clone()],
875 is_frozen: false,
876 element_type: Some(element_type),
877 key_type: None,
878 value_type: None,
879 udt_fields: None,
880 tuple_elements: None,
881 })
882 }
883
884 async fn infer_set_type(&self, samples: &[Value]) -> Result<TypeInfo> {
886 let mut element_samples = Vec::new();
887
888 for sample in samples {
889 if let Value::Set(elements) = sample {
890 element_samples.extend(elements.iter().cloned());
891 }
892 }
893
894 let element_type = if !element_samples.is_empty() {
895 Box::new(self.infer_column_type(&element_samples).await?)
896 } else {
897 Box::new(TypeInfo {
898 type_id: "text".to_string(),
899 type_params: vec![],
900 is_frozen: false,
901 element_type: None,
902 key_type: None,
903 value_type: None,
904 udt_fields: None,
905 tuple_elements: None,
906 })
907 };
908
909 Ok(TypeInfo {
910 type_id: "set".to_string(),
911 type_params: vec![element_type.type_id.clone()],
912 is_frozen: false,
913 element_type: Some(element_type),
914 key_type: None,
915 value_type: None,
916 udt_fields: None,
917 tuple_elements: None,
918 })
919 }
920
921 async fn infer_map_type(&self, samples: &[Value]) -> Result<TypeInfo> {
923 let mut key_samples = Vec::new();
924 let mut value_samples = Vec::new();
925
926 for sample in samples {
927 if let Value::Map(map) = sample {
928 for (key, value) in map {
929 key_samples.push(key.clone());
930 value_samples.push(value.clone());
931 }
932 }
933 }
934
935 let key_type = if !key_samples.is_empty() {
936 Box::new(self.infer_column_type(&key_samples).await?)
937 } else {
938 Box::new(TypeInfo {
939 type_id: "text".to_string(),
940 type_params: vec![],
941 is_frozen: false,
942 element_type: None,
943 key_type: None,
944 value_type: None,
945 udt_fields: None,
946 tuple_elements: None,
947 })
948 };
949
950 let value_type = if !value_samples.is_empty() {
951 Box::new(self.infer_column_type(&value_samples).await?)
952 } else {
953 Box::new(TypeInfo {
954 type_id: "text".to_string(),
955 type_params: vec![],
956 is_frozen: false,
957 element_type: None,
958 key_type: None,
959 value_type: None,
960 udt_fields: None,
961 tuple_elements: None,
962 })
963 };
964
965 Ok(TypeInfo {
966 type_id: "map".to_string(),
967 type_params: vec![key_type.type_id.clone(), value_type.type_id.clone()],
968 is_frozen: false,
969 element_type: None,
970 key_type: Some(key_type),
971 value_type: Some(value_type),
972 udt_fields: None,
973 tuple_elements: None,
974 })
975 }
976
977 async fn infer_tuple_type(&self, samples: &[Value]) -> Result<TypeInfo> {
979 let mut max_elements = 0;
980 let mut element_positions: Vec<Vec<Value>> = Vec::new();
981
982 for sample in samples {
984 if let Value::Tuple(elements) = sample {
985 max_elements = max_elements.max(elements.len());
986
987 while element_positions.len() < elements.len() {
989 element_positions.push(Vec::new());
990 }
991
992 for (i, element) in elements.iter().enumerate() {
994 element_positions[i].push(element.clone());
995 }
996 }
997 }
998
999 let mut tuple_elements = Vec::new();
1001 for position_samples in element_positions {
1002 if !position_samples.is_empty() {
1003 let element_type = self.infer_column_type(&position_samples).await?;
1004 tuple_elements.push(element_type);
1005 }
1006 }
1007
1008 let type_params: Vec<String> = tuple_elements.iter().map(|t| t.type_id.clone()).collect();
1009
1010 Ok(TypeInfo {
1011 type_id: "tuple".to_string(),
1012 type_params,
1013 is_frozen: false,
1014 element_type: None,
1015 key_type: None,
1016 value_type: None,
1017 udt_fields: None,
1018 tuple_elements: Some(tuple_elements),
1019 })
1020 }
1021
1022 async fn infer_udt_type(&self, samples: &[Value]) -> Result<TypeInfo> {
1024 let mut field_map: HashMap<String, Vec<Value>> = HashMap::new();
1027
1028 for sample in samples {
1029 if let Value::Udt(udt_value) = sample {
1030 for field in &udt_value.fields {
1031 if let Some(field_value) = &field.value {
1032 field_map
1033 .entry(field.name.clone())
1034 .or_default()
1035 .push(field_value.clone());
1036 }
1037 }
1038 }
1039 }
1040
1041 let mut udt_fields = Vec::new();
1042 for (field_name, field_samples) in field_map {
1043 let field_type = self.infer_column_type(&field_samples).await?;
1044 udt_fields.push(UdtFieldInfo {
1045 name: field_name,
1046 field_type,
1047 nullable: true, });
1049 }
1050
1051 Ok(TypeInfo {
1052 type_id: "udt".to_string(),
1053 type_params: vec![],
1054 is_frozen: false,
1055 element_type: None,
1056 key_type: None,
1057 value_type: None,
1058 udt_fields: Some(udt_fields),
1059 tuple_elements: None,
1060 })
1061 }
1062}
1063
/// Placeholder for schema validation logic; currently carries no state.
#[derive(Debug)]
pub struct SchemaValidator {}

impl SchemaValidator {
    /// Returns a new (stateless) validator.
    fn new() -> SchemaValidator {
        SchemaValidator {}
    }
}
1075
/// Stateless helper that renders a [`SchemaInfo`] as CQL or JSON and compares schemas.
#[derive(Debug)]
pub struct SchemaExporter {}
1081
1082impl SchemaExporter {
1083 fn new() -> Self {
1084 Self {}
1085 }
1086
1087 async fn generate_cql(&self, schema: &SchemaInfo) -> Result<String> {
1089 let mut cql = String::new();
1090
1091 cql.push_str(&format!(
1093 "CREATE TABLE {}.{} (\n",
1094 schema.keyspace, schema.table
1095 ));
1096
1097 for column in &schema.partition_key {
1099 cql.push_str(&format!(
1100 " {} {},\n",
1101 column.name,
1102 self.format_column_type(&column.type_info)
1103 ));
1104 }
1105
1106 for clustering in &schema.clustering_keys {
1108 cql.push_str(&format!(
1109 " {} {},\n",
1110 clustering.name, clustering.data_type
1111 ));
1112 }
1113
1114 for column in &schema.static_columns {
1116 let static_modifier = if column.is_static { " STATIC" } else { "" };
1117 cql.push_str(&format!(
1118 " {} {}{},\n",
1119 column.name,
1120 self.format_column_type(&column.type_info),
1121 static_modifier
1122 ));
1123 }
1124
1125 for column in &schema.regular_columns {
1127 cql.push_str(&format!(
1128 " {} {},\n",
1129 column.name,
1130 self.format_column_type(&column.type_info)
1131 ));
1132 }
1133
1134 if !schema.partition_key.is_empty() {
1136 let partition_keys: Vec<String> = schema
1137 .partition_key
1138 .iter()
1139 .map(|col| col.name.clone())
1140 .collect();
1141
1142 if schema.clustering_keys.is_empty() {
1143 cql.push_str(&format!(" PRIMARY KEY ({})", partition_keys.join(", ")));
1144 } else {
1145 let clustering_keys: Vec<String> = schema
1146 .clustering_keys
1147 .iter()
1148 .map(|col| col.name.clone())
1149 .collect();
1150
1151 if partition_keys.len() == 1 {
1152 cql.push_str(&format!(
1153 " PRIMARY KEY ({}, {})",
1154 partition_keys[0],
1155 clustering_keys.join(", ")
1156 ));
1157 } else {
1158 cql.push_str(&format!(
1159 " PRIMARY KEY (({}) {})",
1160 partition_keys.join(", "),
1161 clustering_keys.join(", ")
1162 ));
1163 }
1164 }
1165 }
1166
1167 cql.push_str("\n);");
1168
1169 if !schema.clustering_keys.is_empty() {
1171 let mut clustering_order = Vec::new();
1172 for clustering in &schema.clustering_keys {
1173 let order = match clustering.order {
1174 crate::schema::ClusteringOrder::Asc => "ASC",
1175 crate::schema::ClusteringOrder::Desc => "DESC",
1176 };
1177 clustering_order.push(format!("{} {}", clustering.name, order));
1178 }
1179
1180 if clustering_order.iter().any(|o| o.contains("DESC")) {
1181 cql.push_str(&format!(
1182 "\nWITH CLUSTERING ORDER BY ({});",
1183 clustering_order.join(", ")
1184 ));
1185 }
1186 }
1187
1188 let mut options = Vec::new();
1190
1191 if let Some(compaction) = &schema.table_options.compaction {
1192 options.push(format!("compaction = {{'class': '{}'}}", compaction.class));
1193 }
1194
1195 if let Some(compression) = &schema.table_options.compression {
1196 options.push(format!(
1197 "compression = {{'algorithm': '{}'}}",
1198 compression.algorithm
1199 ));
1200 }
1201
1202 if let Some(gc_grace) = schema.table_options.gc_grace_seconds {
1203 options.push(format!("gc_grace_seconds = {}", gc_grace));
1204 }
1205
1206 if let Some(ttl) = schema.table_options.default_time_to_live {
1207 options.push(format!("default_time_to_live = {}", ttl));
1208 }
1209
1210 if !options.is_empty() {
1211 if !cql.ends_with(';') {
1212 cql.push_str(" WITH ");
1213 } else {
1214 cql.pop(); cql.push_str("\nWITH ");
1216 }
1217 cql.push_str(&options.join("\n AND "));
1218 cql.push(';');
1219 }
1220
1221 Ok(cql)
1222 }
1223
1224 #[allow(clippy::only_used_in_recursion)]
1226 fn format_column_type(&self, type_info: &TypeInfo) -> String {
1227 match type_info.type_id.as_str() {
1228 "list" => {
1229 if let Some(element_type) = &type_info.element_type {
1230 format!("list<{}>", self.format_column_type(element_type))
1231 } else {
1232 "list<text>".to_string()
1233 }
1234 }
1235 "set" => {
1236 if let Some(element_type) = &type_info.element_type {
1237 format!("set<{}>", self.format_column_type(element_type))
1238 } else {
1239 "set<text>".to_string()
1240 }
1241 }
1242 "map" => {
1243 if let (Some(key_type), Some(value_type)) =
1244 (&type_info.key_type, &type_info.value_type)
1245 {
1246 format!(
1247 "map<{}, {}>",
1248 self.format_column_type(key_type),
1249 self.format_column_type(value_type)
1250 )
1251 } else {
1252 "map<text, text>".to_string()
1253 }
1254 }
1255 "tuple" => {
1256 if let Some(tuple_elements) = &type_info.tuple_elements {
1257 let element_types: Vec<String> = tuple_elements
1258 .iter()
1259 .map(|t| self.format_column_type(t))
1260 .collect();
1261 format!("tuple<{}>", element_types.join(", "))
1262 } else {
1263 "tuple<text>".to_string()
1264 }
1265 }
1266 "frozen" => {
1267 if let Some(element_type) = &type_info.element_type {
1268 format!("frozen<{}>", self.format_column_type(element_type))
1269 } else {
1270 "frozen<text>".to_string()
1271 }
1272 }
1273 _ => type_info.type_id.clone(),
1274 }
1275 }
1276
1277 #[cfg(feature = "experimental")]
1279 async fn export_json(&self, schema: &SchemaInfo) -> Result<String> {
1280 self.export_json_with_config(
1281 schema,
1282 &crate::schema::json_exporter::JsonExportConfig::default(),
1283 )
1284 .await
1285 }
1286
1287 #[cfg(feature = "experimental")]
1289 async fn export_json_with_config(
1290 &self,
1291 schema: &SchemaInfo,
1292 config: &crate::schema::json_exporter::JsonExportConfig,
1293 ) -> Result<String> {
1294 let exporter = crate::schema::json_exporter::JsonExporter::with_config(config.clone());
1295 exporter.export_schema_info(schema)
1296 }
1297
1298 #[cfg(not(feature = "experimental"))]
1299 #[allow(dead_code)]
1300 async fn export_json(&self, _schema: &SchemaInfo) -> Result<String> {
1301 Err(crate::error::Error::unsupported_format(
1302 "JSON export requires experimental feature",
1303 ))
1304 }
1305
1306 #[cfg(not(feature = "experimental"))]
1307 #[allow(dead_code)]
1308 async fn export_json_with_config<T>(
1309 &self,
1310 _schema: &SchemaInfo,
1311 _config: &T, ) -> Result<String> {
1313 Err(crate::error::Error::unsupported_format(
1314 "JSON export requires experimental feature",
1315 ))
1316 }
1317
1318 #[allow(dead_code)]
1320 #[cfg(feature = "experimental")]
1321 async fn export_json_compact(&self, schema: &SchemaInfo) -> Result<String> {
1322 let config = crate::schema::json_exporter::JsonExportConfig {
1323 format_variant: crate::schema::json_exporter::JsonFormat::Compact,
1324 include_metadata: false,
1325 include_performance_metrics: false,
1326 include_type_details: false,
1327 pretty_format: false,
1328 ..Default::default()
1329 };
1330 self.export_json_with_config(schema, &config).await
1331 }
1332
1333 #[allow(dead_code)]
1335 #[cfg(feature = "experimental")]
1336 async fn export_json_openapi(&self, schema: &SchemaInfo) -> Result<String> {
1337 let config = crate::schema::json_exporter::JsonExportConfig {
1338 format_variant: crate::schema::json_exporter::JsonFormat::OpenApi,
1339 include_documentation: true,
1340 include_type_details: true,
1341 include_metadata: false,
1342 ..Default::default()
1343 };
1344 self.export_json_with_config(schema, &config).await
1345 }
1346
1347 #[allow(dead_code)]
1349 #[cfg(feature = "experimental")]
1350 async fn export_json_pipeline(&self, schema: &SchemaInfo) -> Result<String> {
1351 let config = crate::schema::json_exporter::JsonExportConfig {
1352 format_variant: crate::schema::json_exporter::JsonFormat::DataPipeline,
1353 include_type_details: true,
1354 include_table_options: false,
1355 include_performance_metrics: true,
1356 ..Default::default()
1357 };
1358 self.export_json_with_config(schema, &config).await
1359 }
1360
1361 async fn generate_comparison_report(
1363 &self,
1364 _schema1: &SchemaInfo,
1365 _schema2: &SchemaInfo,
1366 ) -> Result<String> {
1367 Ok(
1371 "Schema comparison not yet implemented. Both schemas analyzed as equivalent."
1372 .to_string(),
1373 )
1374 }
1375}
1376
1377impl SchemaDiscoveryEngine {
1379 async fn extract_header_metadata(&self, context: &mut DiscoveryContext) -> Result<()> {
1380 for file_path in &context.source_files.clone() {
1382 match self.parse_sstable_header(file_path).await {
1383 Ok(header) => {
1384 context.headers.push(header.clone());
1385
1386 if context.cassandra_version.is_none() {
1388 context.cassandra_version = Some(header.cassandra_version);
1389 }
1390 }
1391 Err(e) => {
1392 log::warn!("Failed to parse header from {:?}: {}", file_path, e);
1394 }
1395 }
1396 }
1397
1398 Ok(())
1399 }
1400
1401 async fn parse_sstable_header(&self, file_path: &std::path::Path) -> Result<SSTableHeader> {
1403 use crate::storage::sstable::reader::SSTableReader;
1404
1405 let reader =
1406 SSTableReader::open(file_path, &self.core_config, self.platform.clone()).await?;
1407 Ok(reader.header().clone())
1408 }
1409
1410 async fn sample_data_for_inference(&self, context: &mut DiscoveryContext) -> Result<()> {
1411 let mut total_sampled = 0;
1412
1413 for file_path in &context.source_files.clone() {
1414 if total_sampled >= self.config.max_sample_rows {
1415 break;
1416 }
1417
1418 match self
1419 .sample_sstable_data(file_path, self.config.max_sample_rows - total_sampled)
1420 .await
1421 {
1422 Ok(samples) => {
1423 total_sampled += samples.len();
1424
1425 for row in samples {
1427 for (column_name, value) in row {
1428 context
1429 .column_samples
1430 .entry(column_name)
1431 .or_default()
1432 .push(value);
1433 }
1434 }
1435 }
1436 Err(e) => {
1437 log::warn!("Failed to sample data from {:?}: {}", file_path, e);
1438 }
1439 }
1440 }
1441
1442 context.total_rows_sampled = total_sampled;
1443 Ok(())
1444 }
1445
1446 async fn sample_sstable_data(
1448 &self,
1449 file_path: &std::path::Path,
1450 max_rows: usize,
1451 ) -> Result<Vec<HashMap<String, Value>>> {
1452 use crate::storage::sstable::reader::SSTableReader;
1453
1454 let reader =
1455 SSTableReader::open(file_path, &self.core_config, self.platform.clone()).await?;
1456 let header = reader.header();
1457 let column_names: Vec<String> = header.columns.iter().map(|col| col.name.clone()).collect();
1458
1459 let all_entries = reader.get_all_entries().await?;
1461
1462 let samples: Vec<HashMap<String, Value>> = all_entries
1469 .into_iter()
1470 .take(max_rows)
1471 .filter_map(|(_table_id, _row_key, value)| {
1472 let mut row_data = HashMap::new();
1473
1474 if !column_names.is_empty() {
1477 row_data.insert(column_names[0].clone(), value);
1478 } else {
1479 return None;
1481 }
1482
1483 Some(row_data)
1484 })
1485 .collect();
1486
1487 Ok(samples)
1488 }
1489
1490 async fn discover_udts(&self, context: &mut DiscoveryContext) -> Result<()> {
1491 for header in &context.headers {
1493 for column_def in &header.columns {
1494 if self.is_udt_type(&column_def.column_type) {
1496 let udt_name = self.extract_udt_name(&column_def.column_type);
1497
1498 if !context.discovered_udts.contains_key(&udt_name) {
1499 let udt_def = UDTDefinition {
1501 name: udt_name.clone(),
1502 keyspace: context.keyspace.clone(),
1503 fields: self.parse_udt_fields(&column_def.column_type),
1504 version: Some(1),
1505 };
1506
1507 context.discovered_udts.insert(udt_name, udt_def);
1508 }
1509 }
1510 }
1511 }
1512
1513 for (column_name, samples) in &context.column_samples {
1515 for sample in samples {
1516 if let Value::Udt(udt_map) = sample {
1517 let udt_name = format!("{}_udt", column_name); if !context.discovered_udts.contains_key(&udt_name) {
1521 let mut fields = Vec::new();
1522
1523 for (position, field) in udt_map.fields.iter().enumerate() {
1524 let field_def = UdtFieldDefinition {
1525 name: field.name.clone(),
1526 field_type: "text".to_string(), position,
1528 nullable: true,
1529 };
1530 fields.push(field_def);
1531 }
1532
1533 let udt_def = UDTDefinition {
1534 name: udt_name.clone(),
1535 keyspace: context.keyspace.clone(),
1536 fields,
1537 version: Some(1),
1538 };
1539
1540 context.discovered_udts.insert(udt_name, udt_def);
1541 }
1542 }
1543 }
1544 }
1545
1546 Ok(())
1547 }
1548
1549 fn is_udt_type(&self, type_str: &str) -> bool {
1551 !matches!(
1553 type_str.to_lowercase().as_str(),
1554 "text"
1555 | "varchar"
1556 | "ascii"
1557 | "int"
1558 | "bigint"
1559 | "smallint"
1560 | "tinyint"
1561 | "float"
1562 | "double"
1563 | "boolean"
1564 | "timestamp"
1565 | "date"
1566 | "time"
1567 | "uuid"
1568 | "timeuuid"
1569 | "blob"
1570 | "varint"
1571 | "decimal"
1572 | "duration"
1573 | "inet"
1574 | "counter"
1575 ) && !type_str.starts_with("list<")
1576 && !type_str.starts_with("set<")
1577 && !type_str.starts_with("map<")
1578 && !type_str.starts_with("tuple<")
1579 && !type_str.starts_with("frozen<")
1580 }
1581
1582 fn extract_udt_name(&self, type_str: &str) -> String {
1584 type_str.split('<').next().unwrap_or(type_str).to_string()
1586 }
1587
1588 fn parse_udt_fields(&self, _type_str: &str) -> Vec<UdtFieldDefinition> {
1590 vec![UdtFieldDefinition {
1593 name: "field".to_string(),
1594 field_type: "text".to_string(),
1595 position: 0,
1596 nullable: true,
1597 }]
1598 }
1599
1600 async fn analyze_collection_types(&self, context: &mut DiscoveryContext) -> Result<()> {
1601 for header in &context.headers {
1603 for column_def in &header.columns {
1604 if let Some(collection_type) = self.parse_collection_type(&column_def.column_type) {
1605 context
1606 .collection_types
1607 .insert(column_def.name.clone(), collection_type);
1608 }
1609 }
1610 }
1611
1612 for (column_name, samples) in &context.column_samples {
1614 let mut detected_collections = Vec::new();
1615
1616 for sample in samples {
1617 match sample {
1618 Value::List(elements) => {
1619 let element_type = if !elements.is_empty() {
1620 self.infer_element_type(elements)
1621 } else {
1622 "text".to_string()
1623 };
1624
1625 detected_collections.push(CollectionType {
1626 kind: CollectionKind::List,
1627 element_type: Some(element_type),
1628 key_type: None,
1629 value_type: None,
1630 is_frozen: false,
1631 });
1632 }
1633 Value::Set(elements) => {
1634 let element_type = if !elements.is_empty() {
1635 self.infer_element_type(elements)
1636 } else {
1637 "text".to_string()
1638 };
1639
1640 detected_collections.push(CollectionType {
1641 kind: CollectionKind::Set,
1642 element_type: Some(element_type),
1643 key_type: None,
1644 value_type: None,
1645 is_frozen: false,
1646 });
1647 }
1648 Value::Map(map) => {
1649 let (key_type, value_type) = if !map.is_empty() {
1650 let keys: Vec<Value> = map.iter().map(|(k, _)| k.clone()).collect();
1651 let values: Vec<Value> = map.iter().map(|(_, v)| v.clone()).collect();
1652 (
1653 self.infer_element_type(&keys),
1654 self.infer_element_type(&values),
1655 )
1656 } else {
1657 ("text".to_string(), "text".to_string())
1658 };
1659
1660 detected_collections.push(CollectionType {
1661 kind: CollectionKind::Map,
1662 element_type: None,
1663 key_type: Some(key_type),
1664 value_type: Some(value_type),
1665 is_frozen: false,
1666 });
1667 }
1668 Value::Tuple(_) => {
1669 detected_collections.push(CollectionType {
1670 kind: CollectionKind::Tuple,
1671 element_type: None,
1672 key_type: None,
1673 value_type: None,
1674 is_frozen: false,
1675 });
1676 }
1677 _ => {}
1678 }
1679 }
1680
1681 if let Some(collection_type) =
1683 self.select_most_common_collection_type(detected_collections)
1684 {
1685 context
1686 .collection_types
1687 .insert(column_name.clone(), collection_type);
1688 }
1689 }
1690
1691 Ok(())
1692 }
1693
1694 fn parse_collection_type(&self, type_str: &str) -> Option<CollectionType> {
1696 let lower_type = type_str.to_lowercase();
1697
1698 if lower_type.starts_with("list<") {
1699 let element_type = self.extract_inner_type(type_str, "list<");
1700 Some(CollectionType {
1701 kind: CollectionKind::List,
1702 element_type: Some(element_type),
1703 key_type: None,
1704 value_type: None,
1705 is_frozen: false,
1706 })
1707 } else if lower_type.starts_with("set<") {
1708 let element_type = self.extract_inner_type(type_str, "set<");
1709 Some(CollectionType {
1710 kind: CollectionKind::Set,
1711 element_type: Some(element_type),
1712 key_type: None,
1713 value_type: None,
1714 is_frozen: false,
1715 })
1716 } else if lower_type.starts_with("map<") {
1717 let (key_type, value_type) = self.extract_map_types(type_str);
1718 Some(CollectionType {
1719 kind: CollectionKind::Map,
1720 element_type: None,
1721 key_type: Some(key_type),
1722 value_type: Some(value_type),
1723 is_frozen: false,
1724 })
1725 } else if lower_type.starts_with("tuple<") {
1726 Some(CollectionType {
1727 kind: CollectionKind::Tuple,
1728 element_type: None,
1729 key_type: None,
1730 value_type: None,
1731 is_frozen: false,
1732 })
1733 } else if lower_type.starts_with("frozen<") {
1734 if let Some(mut inner_collection) =
1736 self.parse_collection_type(&self.extract_inner_type(type_str, "frozen<"))
1737 {
1738 inner_collection.is_frozen = true;
1739 Some(inner_collection)
1740 } else {
1741 None
1742 }
1743 } else {
1744 None
1745 }
1746 }
1747
1748 fn extract_inner_type(&self, type_str: &str, _prefix: &str) -> String {
1750 if let Some(start) = type_str.find('<') {
1751 if let Some(end) = type_str.rfind('>') {
1752 return type_str[start + 1..end].to_string();
1753 }
1754 }
1755 "text".to_string()
1756 }
1757
1758 fn extract_map_types(&self, type_str: &str) -> (String, String) {
1760 if let Some(start) = type_str.find('<') {
1761 if let Some(end) = type_str.rfind('>') {
1762 let inner = &type_str[start + 1..end];
1763 if let Some(comma_pos) = inner.find(',') {
1764 let key_type = inner[..comma_pos].trim().to_string();
1765 let value_type = inner[comma_pos + 1..].trim().to_string();
1766 return (key_type, value_type);
1767 }
1768 }
1769 }
1770 ("text".to_string(), "text".to_string())
1771 }
1772
1773 fn infer_element_type(&self, elements: &[Value]) -> String {
1775 if elements.is_empty() {
1776 return "text".to_string();
1777 }
1778
1779 let mut type_counts = HashMap::new();
1781 for element in elements {
1782 let type_name = match element {
1783 Value::Text(_) => "text",
1784 Value::Integer(_) => "int",
1785 Value::BigInt(_) => "bigint",
1786 Value::Float(_) => "double",
1787 Value::Boolean(_) => "boolean",
1788 Value::Uuid(_) => "uuid",
1789 Value::Timestamp(_) => "timestamp",
1790 Value::Blob(_) => "blob",
1791 _ => "text",
1792 };
1793 *type_counts.entry(type_name).or_insert(0) += 1;
1794 }
1795
1796 type_counts
1798 .into_iter()
1799 .max_by_key(|(_, count)| *count)
1800 .map(|(type_name, _)| type_name.to_string())
1801 .unwrap_or_else(|| "text".to_string())
1802 }
1803
1804 fn select_most_common_collection_type(
1806 &self,
1807 mut types: Vec<CollectionType>,
1808 ) -> Option<CollectionType> {
1809 if types.is_empty() {
1810 return None;
1811 }
1812
1813 types.sort_by(|a, b| format!("{:?}", a.kind).cmp(&format!("{:?}", b.kind)));
1816 types.into_iter().next()
1817 }
1818
1819 async fn discover_indexes(&self, context: &mut DiscoveryContext) -> Result<()> {
1820 for header in &context.headers {
1822 if let Some(index_info) = self.extract_index_info_from_header(header) {
1824 context.indexes.extend(index_info);
1825 }
1826 }
1827
1828 for (column_name, samples) in &context.column_samples {
1830 if self.should_suggest_index(column_name, samples) {
1832 let index_def = IndexDefinition {
1833 name: format!("{}_idx", column_name),
1834 target_column: column_name.clone(),
1835 index_type: IndexType::Secondary,
1836 options: HashMap::new(),
1837 };
1838
1839 if !context
1841 .indexes
1842 .iter()
1843 .any(|idx| idx.target_column == *column_name)
1844 {
1845 context.indexes.push(index_def);
1846 }
1847 }
1848 }
1849
1850 Ok(())
1851 }
1852
1853 fn extract_index_info_from_header(
1855 &self,
1856 _header: &SSTableHeader,
1857 ) -> Option<Vec<IndexDefinition>> {
1858 None
1862 }
1863
1864 fn should_suggest_index(&self, column_name: &str, samples: &[Value]) -> bool {
1866 if column_name.to_lowercase().contains("id") {
1870 return true;
1871 }
1872
1873 let unique_values: std::collections::HashSet<_> = samples.iter().collect();
1875 let cardinality_ratio = unique_values.len() as f64 / samples.len() as f64;
1876 if cardinality_ratio > 0.7 && samples.len() > 10 {
1877 return true;
1878 }
1879
1880 if column_name.to_lowercase().ends_with("_id")
1882 || column_name.to_lowercase().ends_with("_ref")
1883 || column_name.to_lowercase().contains("email")
1884 || column_name.to_lowercase().contains("username")
1885 {
1886 return true;
1887 }
1888
1889 if samples.iter().any(|v| matches!(v, Value::Uuid(_))) {
1891 return true;
1892 }
1893
1894 false
1895 }
1896
1897 async fn build_schema_info(&self, context: &mut DiscoveryContext) -> Result<SchemaInfo> {
1898 let mut partition_key = Vec::new();
1899 let mut clustering_keys = Vec::new();
1900 let mut regular_columns = Vec::new();
1901 let mut static_columns = Vec::new();
1902
1903 let mut position = 0;
1905 for header in &context.headers {
1906 for column_def in &header.columns {
1907 let samples = context
1909 .column_samples
1910 .get(&column_def.name)
1911 .cloned()
1912 .unwrap_or_default();
1913 let type_info = self
1914 .type_inference
1915 .infer_column_type(&samples)
1916 .await
1917 .unwrap_or_else(|_| TypeInfo {
1918 type_id: column_def.column_type.clone(),
1919 type_params: vec![],
1920 is_frozen: false,
1921 element_type: None,
1922 key_type: None,
1923 value_type: None,
1924 udt_fields: None,
1925 tuple_elements: None,
1926 });
1927
1928 let confidence = self.calculate_type_confidence(&samples, &type_info);
1929
1930 let column = ColumnDefinition {
1931 name: column_def.name.clone(),
1932 data_type: column_def.column_type.clone(),
1933 type_info,
1934 nullable: true, is_static: false, default_value: None,
1937 position,
1938 confidence,
1939 };
1940
1941 if self.is_partition_key_column(&column_def.name, position) {
1943 partition_key.push(column);
1944 } else if self.is_clustering_column(&column_def.name, position) {
1945 clustering_keys.push(ClusteringColumn {
1946 name: column_def.name.clone(),
1947 data_type: column_def.column_type.clone(),
1948 position: clustering_keys.len(), order: crate::schema::ClusteringOrder::Asc, });
1951 } else if column.is_static {
1952 static_columns.push(column);
1953 } else {
1954 regular_columns.push(column);
1955 }
1956
1957 position += 1;
1958 }
1959 }
1960
1961 if partition_key.is_empty() && !regular_columns.is_empty() {
1963 let first_column = regular_columns.remove(0);
1964 partition_key.push(first_column);
1965 }
1966
1967 let validation_results = ValidationResults {
1969 status: self.determine_validation_status(&partition_key, ®ular_columns),
1970 errors: Vec::new(), warnings: self.generate_validation_warnings(&partition_key, ®ular_columns),
1972 consistency_results: ConsistencyResults {
1973 files_analyzed: context.source_files.len(),
1974 schema_mismatches: 0,
1975 type_inconsistencies: Vec::new(),
1976 udt_conflicts: Vec::new(),
1977 },
1978 };
1979
1980 Ok(SchemaInfo {
1981 keyspace: context.keyspace.clone(),
1982 table: context.table.clone(),
1983 partition_key,
1984 clustering_keys,
1985 regular_columns,
1986 static_columns,
1987 collection_types: context.collection_types.clone(),
1988 user_defined_types: context.discovered_udts.values().cloned().collect(),
1989 indexes: context.indexes.clone(),
1990 table_options: context.table_options.clone(),
1991 metadata: SchemaMetadata {
1992 discovered_at: std::time::SystemTime::now(),
1993 source_files: context.source_files.clone(),
1994 total_rows_sampled: context.total_rows_sampled,
1995 cassandra_version: context.cassandra_version,
1996 discovery_method: DiscoveryMethod::Hybrid,
1997 version: 1,
1998 validation_results,
1999 performance_metrics: DiscoveryMetrics {
2000 total_time_ms: 0, header_parsing_time_ms: 0,
2002 data_sampling_time_ms: 0,
2003 type_inference_time_ms: 0,
2004 validation_time_ms: 0,
2005 peak_memory_usage_bytes: 0,
2006 },
2007 },
2008 })
2009 }
2010
2011 fn calculate_type_confidence(&self, samples: &[Value], type_info: &TypeInfo) -> f64 {
2013 if samples.is_empty() {
2014 return 0.0;
2015 }
2016
2017 let matching_samples = samples
2018 .iter()
2019 .filter(|sample| self.value_matches_type(sample, type_info))
2020 .count();
2021
2022 matching_samples as f64 / samples.len() as f64
2023 }
2024
2025 fn value_matches_type(&self, value: &Value, type_info: &TypeInfo) -> bool {
2027 #[allow(clippy::match_like_matches_macro)]
2028 match (value, type_info.type_id.as_str()) {
2029 (Value::Text(_), "text") => true,
2030 (Value::Integer(_), "int") => true,
2031 (Value::BigInt(_), "bigint") => true,
2032 (Value::Float(_), "double") => true,
2033 (Value::Boolean(_), "boolean") => true,
2034 (Value::Uuid(_), "uuid") => true,
2035 (Value::Timestamp(_), "timestamp") => true,
2036 (Value::Blob(_), "blob") => true,
2037 (Value::List(_), "list") => true,
2038 (Value::Set(_), "set") => true,
2039 (Value::Map(_), "map") => true,
2040 (Value::Tuple(_), "tuple") => true,
2041 (Value::Udt(_), "udt") => true,
2042 _ => false,
2043 }
2044 }
2045
2046 fn is_partition_key_column(&self, column_name: &str, position: usize) -> bool {
2048 position == 0
2050 || column_name.to_lowercase().contains("key")
2051 || column_name.to_lowercase() == "id"
2052 || column_name.to_lowercase().ends_with("_id")
2053 }
2054
2055 fn is_clustering_column(&self, column_name: &str, position: usize) -> bool {
2057 (position == 1 && !self.is_partition_key_column(column_name, position))
2059 || column_name.to_lowercase().contains("time")
2060 || column_name.to_lowercase().contains("date")
2061 || column_name.to_lowercase().contains("order")
2062 }
2063
2064 fn determine_validation_status(
2066 &self,
2067 partition_key: &[ColumnDefinition],
2068 regular_columns: &[ColumnDefinition],
2069 ) -> ValidationStatus {
2070 if partition_key.is_empty() {
2072 return ValidationStatus::Invalid;
2073 }
2074
2075 let all_columns: Vec<_> = partition_key.iter().chain(regular_columns.iter()).collect();
2077 let low_confidence_count = all_columns
2078 .iter()
2079 .filter(|col| col.confidence < 0.7)
2080 .count();
2081
2082 if low_confidence_count > all_columns.len() / 2 {
2083 ValidationStatus::Invalid
2084 } else if low_confidence_count > 0 {
2085 ValidationStatus::ValidWithWarnings
2086 } else {
2087 ValidationStatus::Valid
2088 }
2089 }
2090
2091 fn generate_validation_warnings(
2093 &self,
2094 _partition_key: &[ColumnDefinition],
2095 regular_columns: &[ColumnDefinition],
2096 ) -> Vec<ValidationWarning> {
2097 let mut warnings = Vec::new();
2098
2099 for column in regular_columns {
2101 if column.confidence < 0.7 {
2102 warnings.push(ValidationWarning {
2103 warning_type: ValidationWarningType::LowConfidence,
2104 message: format!(
2105 "Low confidence type inference for column '{}': {:.2}",
2106 column.name, column.confidence
2107 ),
2108 component: Some(column.name.clone()),
2109 });
2110 }
2111 }
2112
2113 warnings
2114 }
2115
2116 fn add_performance_metrics(
2117 &self,
2118 mut schema: SchemaInfo,
2119 discovery_time: Duration,
2120 _context: &DiscoveryContext,
2121 ) -> SchemaInfo {
2122 schema.metadata.performance_metrics = DiscoveryMetrics {
2123 total_time_ms: discovery_time.as_millis() as u64,
2124 header_parsing_time_ms: 0, data_sampling_time_ms: 0,
2126 type_inference_time_ms: 0,
2127 validation_time_ms: 0,
2128 peak_memory_usage_bytes: 0, };
2130 schema
2131 }
2132}
2133
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `test.users` schema shared by several tests.
    ///
    /// It has no columns, default-empty table options, a `Valid` validation result
    /// and zeroed performance metrics.
    fn empty_users_schema() -> SchemaInfo {
        let table_options = TableOptions {
            compaction: None,
            compression: None,
            caching: None,
            bloom_filter_fp_chance: None,
            gc_grace_seconds: None,
            default_time_to_live: None,
            memtable_flush_period_in_ms: None,
            additional_properties: HashMap::new(),
        };

        let validation_results = ValidationResults {
            status: ValidationStatus::Valid,
            errors: Vec::new(),
            warnings: Vec::new(),
            consistency_results: ConsistencyResults {
                files_analyzed: 0,
                schema_mismatches: 0,
                type_inconsistencies: Vec::new(),
                udt_conflicts: Vec::new(),
            },
        };

        let performance_metrics = DiscoveryMetrics {
            total_time_ms: 0,
            header_parsing_time_ms: 0,
            data_sampling_time_ms: 0,
            type_inference_time_ms: 0,
            validation_time_ms: 0,
            peak_memory_usage_bytes: 0,
        };

        SchemaInfo {
            keyspace: "test".to_string(),
            table: "users".to_string(),
            partition_key: Vec::new(),
            clustering_keys: Vec::new(),
            regular_columns: Vec::new(),
            static_columns: Vec::new(),
            collection_types: HashMap::new(),
            user_defined_types: Vec::new(),
            indexes: Vec::new(),
            table_options,
            metadata: SchemaMetadata {
                discovered_at: std::time::UNIX_EPOCH,
                source_files: Vec::new(),
                total_rows_sampled: 0,
                cassandra_version: None,
                discovery_method: DiscoveryMethod::HeaderMetadata,
                version: 1,
                validation_results,
                performance_metrics,
            },
        }
    }

    /// A freshly built engine starts with an empty schema cache.
    #[tokio::test]
    async fn test_schema_discovery_engine_creation() {
        let discovery_config = SchemaDiscoveryConfig::default();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());

        let engine = SchemaDiscoveryEngine::new(discovery_config, platform, core_config)
            .await
            .unwrap();

        let cache = engine.schema_cache.read().await;
        assert!(cache.is_empty());
    }

    /// The context keeps the keyspace, table and file list it was created with.
    #[test]
    fn test_discovery_context_creation() {
        let sstable_files = vec![PathBuf::from("test.sst")];
        let ctx = DiscoveryContext::new("test_ks", "test_table", &sstable_files);

        assert_eq!(ctx.keyspace, "test_ks");
        assert_eq!(ctx.table, "test_table");
        assert_eq!(ctx.source_files.len(), 1);
    }

    /// `SchemaInfo` survives a JSON round trip.
    #[test]
    fn test_schema_info_serialization() {
        let original = empty_users_schema();

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: SchemaInfo = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.keyspace, "test");
        assert_eq!(decoded.table, "users");
    }

    /// With no source files, header extraction succeeds without doing anything.
    #[tokio::test]
    async fn test_extract_header_metadata_stub() {
        let discovery_config = SchemaDiscoveryConfig::default();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());

        let engine = SchemaDiscoveryEngine::new(discovery_config, platform, core_config)
            .await
            .unwrap();

        let mut ctx = DiscoveryContext::new("test_ks", "test_table", &[]);
        let outcome = engine.extract_header_metadata(&mut ctx).await;

        assert!(
            outcome.is_ok(),
            "extract_header_metadata stub should return Ok(())"
        );
    }

    /// With no source files, sampling succeeds without doing anything.
    #[tokio::test]
    async fn test_sample_data_for_inference_stub() {
        let discovery_config = SchemaDiscoveryConfig::default();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());

        let engine = SchemaDiscoveryEngine::new(discovery_config, platform, core_config)
            .await
            .unwrap();

        let mut ctx = DiscoveryContext::new("test_ks", "test_table", &[]);
        let outcome = engine.sample_data_for_inference(&mut ctx).await;

        assert!(
            outcome.is_ok(),
            "sample_data_for_inference stub should return Ok(())"
        );
    }

    /// The comparison stub returns Ok with its "not yet implemented" notice.
    #[tokio::test]
    async fn test_generate_comparison_report_stub() {
        let exporter = SchemaExporter::new();
        let schema = empty_users_schema();

        let outcome = exporter.generate_comparison_report(&schema, &schema).await;
        assert!(
            outcome.is_ok(),
            "generate_comparison_report stub should return Ok"
        );

        let report = outcome.unwrap();
        assert!(
            report.contains("Schema comparison not yet implemented"),
            "Report should contain stub message"
        );
    }
}