1use std::collections::{BTreeMap, HashMap};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::{Duration, Instant, SystemTime};
11
12use serde::{Deserialize, Serialize};
13use tokio::sync::RwLock;
14
15use crate::{
16 parser::header::{CassandraVersion, ColumnInfo},
17 platform::Platform,
18 schema::{Column, TableSchema},
19 storage::sstable::reader::SSTableReader,
20 types::{DataType, Value},
21 Config, Result,
22};
23
#[derive(Debug, Clone)]
/// Options controlling how schema discovery samples SSTables and caches results.
pub struct SchemaDiscoveryConfig {
    /// Maximum number of rows taken from one SSTable while sampling; discovery
    /// also stops visiting further files once the running total reaches it.
    pub max_sample_rows: usize,
    /// Not read by the code visible here — presumably enables looser type
    /// guessing; TODO confirm against the rest of the crate.
    pub aggressive_inference: bool,
    /// When `true`, `discover_table_schema` consults and fills the in-memory cache.
    pub cache_schemas: bool,
    /// Age in seconds after which a cached schema is no longer served.
    pub cache_ttl_seconds: u64,
    /// Not read by the code visible here — presumably toggles schema version
    /// tracking; TODO confirm.
    pub enable_versioning: bool,
    /// Not read by the code visible here — presumably the number of schema
    /// versions to retain; TODO confirm.
    pub max_versions: usize,
}
40
41impl Default for SchemaDiscoveryConfig {
42 fn default() -> Self {
43 Self {
44 max_sample_rows: 1000,
45 aggressive_inference: true,
46 cache_schemas: true,
47 cache_ttl_seconds: 3600, enable_versioning: true,
49 max_versions: 10,
50 }
51 }
52}
53
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Result of one discovery run: the inferred schema plus supporting evidence.
pub struct DiscoveredSchema {
    /// The inferred table schema.
    pub schema: TableSchema,
    /// Provenance of the discovery run (time, files, method, ...).
    pub metadata: SchemaMetadata,
    /// Per-column statistics, keyed by column name.
    pub column_stats: HashMap<String, ColumnStatistics>,
    /// Mean of the per-column `type_confidence` values; `0.0` when there are no columns.
    pub inference_confidence: f64,
    /// Verdict produced by the schema validator.
    pub validation_status: ValidationStatus,
}
68
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Provenance information recorded alongside a discovered schema.
pub struct SchemaMetadata {
    /// Wall-clock time captured when the discovery run started.
    pub discovered_at: SystemTime,
    /// SSTable files for which a reader could be opened during discovery.
    pub source_files: Vec<PathBuf>,
    /// Total number of sampled rows across all visited files.
    pub rows_sampled: usize,
    /// Cassandra version taken from the header of the first file that opened, if any.
    pub cassandra_version: Option<CassandraVersion>,
    /// Which evidence (header, samples, both, neither) the schema is based on.
    pub discovery_method: DiscoveryMethod,
    /// Schema version number; the discovery code in this file always writes `1`.
    pub version: u32,
}
85
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Summary statistics gathered for a single column during discovery.
pub struct ColumnStatistics {
    /// Column name.
    pub name: String,
    /// String form of the data type inferred for the column.
    pub inferred_type: String,
    /// Share of non-null samples that had the most frequent value type
    /// (`0.0` when no non-null sample was seen).
    pub type_confidence: f64,
    /// Share of samples that were null, expressed as a fraction in `0.0..=1.0`
    /// (despite the name it is not multiplied by 100).
    pub null_percentage: f64,
    /// Number of distinct non-null values tracked; the accumulator stops
    /// adding new distinct values once it holds 1000.
    pub unique_values: usize,
    /// Mean estimated size in bytes of the non-null samples.
    pub avg_size_bytes: f64,
    /// Smallest non-null sample seen, if any.
    pub min_value: Option<Value>,
    /// Largest non-null sample seen, if any.
    pub max_value: Option<Value>,
    /// Detected value patterns; nothing in the code visible here populates
    /// this list, so it is presumably filled elsewhere or reserved — TODO confirm.
    pub patterns: Vec<String>,
}
107
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
/// Outcome of validating a discovered schema.
pub enum ValidationStatus {
    /// No column has a type confidence below 0.5.
    Valid,
    /// At least one column is below 0.5 confidence, but none is below 0.3.
    WarningsPresent,
    /// At least one column has a type confidence below 0.3.
    Invalid,
    /// Never produced by the validator visible in this file — presumably a
    /// placeholder for schemas that were not validated; TODO confirm.
    Unknown,
}
120
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Kind of evidence a discovered schema was derived from.
pub enum DiscoveryMethod {
    /// Some column carries header information and no rows were sampled.
    HeaderMetadata,
    /// Files were opened but no column carries header information.
    DataSampling,
    /// Some column carries header information and at least one row was sampled.
    Hybrid,
    /// Recorded by the discovery code when no source file could be opened.
    /// NOTE(review): the name suggests an externally supplied schema — confirm intent.
    External,
}
133
#[allow(dead_code)]
/// Service that infers table schemas from SSTable files and caches the results.
pub struct SchemaDiscovery {
    /// Sampling and caching options.
    config: SchemaDiscoveryConfig,
    /// Platform handle passed to every `SSTableReader::open` call.
    platform: Arc<Platform>,
    /// Core configuration passed to every `SSTableReader::open` call.
    core_config: Config,
    /// Cached schemas keyed by `"keyspace.table"`, each paired with its insertion time.
    schema_cache: Arc<RwLock<HashMap<String, (DiscoveredSchema, Instant)>>>,
    /// Maps header type names and sampled value types to `DataType`.
    type_inference: Arc<TypeInferenceEngine>,
    /// Grades a schema from its column statistics.
    validator: Arc<SchemaValidator>,
}
150
151impl SchemaDiscovery {
152 pub async fn new(
154 config: SchemaDiscoveryConfig,
155 platform: Arc<Platform>,
156 core_config: Config,
157 ) -> Result<Self> {
158 let type_inference = Arc::new(TypeInferenceEngine::new());
159 let validator = Arc::new(SchemaValidator::new());
160
161 Ok(Self {
162 config,
163 platform,
164 core_config,
165 schema_cache: Arc::new(RwLock::new(HashMap::new())),
166 type_inference,
167 validator,
168 })
169 }
170
171 pub async fn discover_table_schema(
173 &self,
174 keyspace: &str,
175 table: &str,
176 sstable_files: &[PathBuf],
177 ) -> Result<DiscoveredSchema> {
178 let cache_key = format!("{}.{}", keyspace, table);
179
180 if self.config.cache_schemas {
182 if let Some(cached) = self.get_cached_schema(&cache_key).await {
183 return Ok(cached);
184 }
185 }
186
187 let discovered = self
189 .perform_schema_discovery(keyspace, table, sstable_files)
190 .await?;
191
192 if self.config.cache_schemas {
194 self.cache_schema(cache_key, discovered.clone()).await;
195 }
196
197 Ok(discovered)
198 }
199
200 async fn perform_schema_discovery(
202 &self,
203 keyspace: &str,
204 table: &str,
205 sstable_files: &[PathBuf],
206 ) -> Result<DiscoveredSchema> {
207 let start_time = SystemTime::now();
208 let mut source_files = Vec::new();
209 let mut all_column_data = HashMap::new();
210 let mut total_rows_sampled = 0;
211 let mut cassandra_version = None;
212
213 for file_path in sstable_files {
215 if let Ok(reader) = self.create_reader(file_path).await {
216 source_files.push(file_path.clone());
217
218 if let Ok(header_schema) = self.extract_schema_from_header(&reader).await {
220 if cassandra_version.is_none() {
221 let header = reader.header();
222 cassandra_version = Some(header.cassandra_version);
223 }
224
225 self.merge_header_schema(&mut all_column_data, header_schema);
227 }
228
229 let sampled_data = self.sample_table_data(&reader).await?;
231 total_rows_sampled += sampled_data.len();
232
233 self.analyze_sampled_data(&mut all_column_data, sampled_data);
235
236 if total_rows_sampled >= self.config.max_sample_rows {
238 break;
239 }
240 }
241 }
242
243 let schema = self
245 .infer_table_schema(keyspace, table, &all_column_data)
246 .await?;
247
248 let column_stats = self.calculate_column_statistics(&all_column_data).await;
250
251 let inference_confidence = self.calculate_inference_confidence(&column_stats);
253
254 let validation_status = self.validator.validate_schema(&schema, &column_stats).await;
256
257 let discovery_method = if source_files.is_empty() {
259 DiscoveryMethod::External
260 } else if all_column_data.values().any(|cd| cd.header_info.is_some()) {
261 if total_rows_sampled > 0 {
262 DiscoveryMethod::Hybrid
263 } else {
264 DiscoveryMethod::HeaderMetadata
265 }
266 } else {
267 DiscoveryMethod::DataSampling
268 };
269
270 let metadata = SchemaMetadata {
271 discovered_at: start_time,
272 source_files,
273 rows_sampled: total_rows_sampled,
274 cassandra_version,
275 discovery_method,
276 version: 1,
277 };
278
279 Ok(DiscoveredSchema {
280 schema,
281 metadata,
282 column_stats,
283 inference_confidence,
284 validation_status,
285 })
286 }
287
288 async fn create_reader(&self, file_path: &Path) -> Result<SSTableReader> {
290 SSTableReader::open(file_path, &self.core_config, self.platform.clone()).await
291 }
292
293 async fn extract_schema_from_header(
295 &self,
296 reader: &SSTableReader,
297 ) -> Result<HashMap<String, ColumnInfo>> {
298 let header = reader.header();
299 let mut columns = HashMap::new();
300
301 for column_def in &header.columns {
302 columns.insert(column_def.name.clone(), column_def.clone());
303 }
304
305 Ok(columns)
306 }
307
308 async fn sample_table_data(
310 &self,
311 reader: &SSTableReader,
312 ) -> Result<Vec<HashMap<String, Value>>> {
313 let header = reader.header();
315 let column_names: Vec<String> = header.columns.iter().map(|col| col.name.clone()).collect();
316
317 let all_entries = reader.get_all_entries().await?;
319
320 let samples: Vec<HashMap<String, Value>> = all_entries
325 .into_iter()
326 .take(self.config.max_sample_rows)
327 .filter_map(|(_table_id, _row_key, value)| {
328 let mut row_data = HashMap::new();
330
331 if !column_names.is_empty() {
332 row_data.insert(column_names[0].clone(), value);
334 Some(row_data)
335 } else {
336 None
338 }
339 })
340 .collect();
341
342 Ok(samples)
343 }
344
345 async fn infer_table_schema(
347 &self,
348 keyspace: &str,
349 table: &str,
350 column_data: &HashMap<String, ColumnData>,
351 ) -> Result<TableSchema> {
352 let mut columns = Vec::new();
353
354 for (name, data) in column_data {
355 let data_type = self.type_inference.infer_column_type(data).await;
356 let column = Column {
357 name: name.clone(),
358 data_type: data_type.to_string(),
359 nullable: true,
360 default: None,
361 is_static: false,
362 };
363 columns.push(column);
364 }
365
366 columns.sort_by(|a, b| a.name.cmp(&b.name));
368
369 Ok(TableSchema {
370 keyspace: keyspace.to_string(),
371 table: table.to_string(),
372 partition_keys: vec![], clustering_keys: vec![],
374 columns,
375 comments: HashMap::new(),
376 })
377 }
378
379 async fn calculate_column_statistics(
381 &self,
382 column_data: &HashMap<String, ColumnData>,
383 ) -> HashMap<String, ColumnStatistics> {
384 let mut stats = HashMap::new();
385
386 for (name, data) in column_data {
387 let stat = ColumnStatistics {
388 name: name.clone(),
389 inferred_type: self
390 .type_inference
391 .infer_column_type(data)
392 .await
393 .to_string(),
394 type_confidence: data.calculate_type_confidence(),
395 null_percentage: data.calculate_null_percentage(),
396 unique_values: data.unique_values.len(),
397 avg_size_bytes: data.calculate_average_size(),
398 min_value: data.min_value.clone(),
399 max_value: data.max_value.clone(),
400 patterns: data.detected_patterns.clone(),
401 };
402 stats.insert(name.clone(), stat);
403 }
404
405 stats
406 }
407
408 fn calculate_inference_confidence(
410 &self,
411 column_stats: &HashMap<String, ColumnStatistics>,
412 ) -> f64 {
413 if column_stats.is_empty() {
414 return 0.0;
415 }
416
417 let total_confidence: f64 = column_stats.values().map(|stat| stat.type_confidence).sum();
418
419 total_confidence / column_stats.len() as f64
420 }
421
422 async fn get_cached_schema(&self, cache_key: &str) -> Option<DiscoveredSchema> {
425 let cache = self.schema_cache.read().await;
426 if let Some((schema, cached_at)) = cache.get(cache_key) {
427 let ttl = Duration::from_secs(self.config.cache_ttl_seconds);
428 if cached_at.elapsed() < ttl {
429 return Some(schema.clone());
430 }
431 }
432 None
433 }
434
435 async fn cache_schema(&self, cache_key: String, schema: DiscoveredSchema) {
436 let mut cache = self.schema_cache.write().await;
437 cache.insert(cache_key, (schema, Instant::now()));
438
439 if cache.len() > 100 {
441 let oldest_key = cache
442 .iter()
443 .min_by_key(|(_, (_, time))| time)
444 .map(|(key, _)| key.clone());
445
446 if let Some(key) = oldest_key {
447 cache.remove(&key);
448 }
449 }
450 }
451
452 fn merge_header_schema(
455 &self,
456 column_data: &mut HashMap<String, ColumnData>,
457 header_columns: HashMap<String, ColumnInfo>,
458 ) {
459 for (name, column_info) in header_columns {
460 let entry = column_data.entry(name).or_insert_with(ColumnData::new);
461 entry.header_info = Some(column_info);
462 }
463 }
464
465 fn analyze_sampled_data(
466 &self,
467 column_data: &mut HashMap<String, ColumnData>,
468 samples: Vec<HashMap<String, Value>>,
469 ) {
470 for sample in samples {
471 for (column_name, value) in sample {
472 let entry = column_data
473 .entry(column_name)
474 .or_insert_with(ColumnData::new);
475 entry.add_sample_value(value);
476 }
477 }
478 }
479}
480
#[derive(Debug)]
/// Accumulator for everything learned about one column during discovery.
struct ColumnData {
    /// Column definition from an SSTable header, when one was found.
    header_info: Option<ColumnInfo>,
    /// Every non-null sampled value, in arrival order.
    sample_values: Vec<Value>,
    /// Debug-formatted distinct values mapped to an occurrence count
    /// (no new keys are added once 1000 are tracked).
    unique_values: BTreeMap<String, usize>,
    /// Number of null samples seen.
    null_count: usize,
    /// Smallest non-null sample seen so far.
    min_value: Option<Value>,
    /// Largest non-null sample seen so far.
    max_value: Option<Value>,
    /// Detected value patterns; not populated by any code visible in this file.
    detected_patterns: Vec<String>,
    /// Count of non-null samples per value type name.
    type_frequency: HashMap<String, usize>,
}
500
501impl ColumnData {
502 fn new() -> Self {
503 Self {
504 header_info: None,
505 sample_values: Vec::new(),
506 unique_values: BTreeMap::new(),
507 null_count: 0,
508 min_value: None,
509 max_value: None,
510 detected_patterns: Vec::new(),
511 type_frequency: HashMap::new(),
512 }
513 }
514
515 fn add_sample_value(&mut self, value: Value) {
516 if value == Value::Null {
517 self.null_count += 1;
518 } else {
519 let type_name = value.type_name();
521 *self.type_frequency.entry(type_name).or_insert(0) += 1;
522
523 if self.unique_values.len() < 1000 {
525 let value_str = format!("{:?}", value);
526 *self.unique_values.entry(value_str).or_insert(0) += 1;
527 }
528
529 if self.min_value.is_none() || Some(&value) < self.min_value.as_ref() {
531 self.min_value = Some(value.clone());
532 }
533 if self.max_value.is_none() || Some(&value) > self.max_value.as_ref() {
534 self.max_value = Some(value.clone());
535 }
536
537 self.sample_values.push(value);
538 }
539 }
540
541 fn calculate_type_confidence(&self) -> f64 {
542 if self.type_frequency.is_empty() {
543 return 0.0;
544 }
545
546 let total_samples = self.type_frequency.values().sum::<usize>();
547 let max_frequency = *self.type_frequency.values().max().unwrap_or(&0);
548
549 max_frequency as f64 / total_samples as f64
550 }
551
552 fn calculate_null_percentage(&self) -> f64 {
553 let total = self.sample_values.len() + self.null_count;
554 if total == 0 {
555 0.0
556 } else {
557 self.null_count as f64 / total as f64
558 }
559 }
560
561 fn calculate_average_size(&self) -> f64 {
562 if self.sample_values.is_empty() {
563 0.0
564 } else {
565 let total_size: usize = self.sample_values.iter().map(|v| v.estimate_size()).sum();
566 total_size as f64 / self.sample_values.len() as f64
567 }
568 }
569}
570
/// Stateless helper that turns header type names and sampled value type
/// names into `DataType` values.
struct TypeInferenceEngine;
573
574impl TypeInferenceEngine {
575 fn new() -> Self {
576 Self
577 }
578
579 async fn infer_column_type(&self, column_data: &ColumnData) -> DataType {
580 if let Some(ref header_info) = column_data.header_info {
582 return self.convert_cql_type_to_data_type(&header_info.column_type);
583 }
584
585 if let Some(most_common_type) = column_data
587 .type_frequency
588 .iter()
589 .max_by_key(|(_, count)| *count)
590 .map(|(type_name, _)| type_name)
591 {
592 return self.string_to_data_type(most_common_type);
593 }
594
595 DataType::Text }
597
598 fn convert_cql_type_to_data_type(&self, type_name: &str) -> DataType {
599 match type_name.to_lowercase().as_str() {
600 "text" | "varchar" | "ascii" => DataType::Text,
601 "int" => DataType::Integer,
602 "bigint" => DataType::BigInt,
603 "boolean" => DataType::Boolean,
604 "double" => DataType::Float,
605 "float" => DataType::Float,
606 "uuid" => DataType::Uuid,
607 "timestamp" => DataType::Timestamp,
608 "blob" => DataType::Blob,
609 _ => DataType::Text,
610 }
611 }
612
613 fn string_to_data_type(&self, type_name: &str) -> DataType {
614 match type_name {
615 "Text" => DataType::Text,
616 "Integer" => DataType::Integer,
617 "Float" => DataType::Float,
618 "Boolean" => DataType::Boolean,
619 _ => DataType::Text,
620 }
621 }
622}
623
/// Stateless helper that grades a discovered schema from its column statistics.
struct SchemaValidator;
626
627impl SchemaValidator {
628 fn new() -> Self {
629 Self
630 }
631
632 async fn validate_schema(
633 &self,
634 _schema: &TableSchema,
635 column_stats: &HashMap<String, ColumnStatistics>,
636 ) -> ValidationStatus {
637 let mut warnings = 0;
638 let mut errors = 0;
639
640 for stat in column_stats.values() {
642 if stat.type_confidence < 0.5 {
643 warnings += 1;
644 }
645 if stat.type_confidence < 0.3 {
646 errors += 1;
647 }
648 }
649
650 if errors > 0 {
651 ValidationStatus::Invalid
652 } else if warnings > 0 {
653 ValidationStatus::WarningsPresent
654 } else {
655 ValidationStatus::Valid
656 }
657 }
658}
659
/// Discovery-specific helpers layered onto `Value`.
trait ValueExt {
    /// Short label naming the value's variant (for example `"Text"` or `"BigInteger"`).
    fn type_name(&self) -> String;
    /// Rough size of the value in bytes, used for the average-size statistic.
    fn estimate_size(&self) -> usize;
}
665
666impl ValueExt for Value {
667 fn type_name(&self) -> String {
668 match self {
669 Value::Null => "Null".to_string(),
670 Value::Text(_) => "Text".to_string(),
671 Value::Integer(_) => "Integer".to_string(),
672 Value::BigInt(_) => "BigInteger".to_string(),
673 Value::Counter(_) => "Counter".to_string(),
674 Value::Float(_) => "Float".to_string(),
675 Value::Boolean(_) => "Boolean".to_string(),
676 Value::Uuid(_) => "UUID".to_string(),
677 Value::Timestamp(_) => "Timestamp".to_string(),
678 Value::Date(_) => "Date".to_string(),
679 Value::Time(_) => "Time".to_string(),
680 Value::Inet(_) => "Inet".to_string(),
681 Value::Blob(_) => "Blob".to_string(),
682 Value::List(_) => "List".to_string(),
683 Value::Set(_) => "Set".to_string(),
684 Value::Map(_) => "Map".to_string(),
685 Value::Json(_) => "JSON".to_string(),
686 Value::TinyInt(_) => "TinyInt".to_string(),
687 Value::SmallInt(_) => "SmallInt".to_string(),
688 Value::Float32(_) => "Float32".to_string(),
689 Value::Tuple(_) => "Tuple".to_string(),
690 Value::Udt(_) => "UDT".to_string(),
691 Value::Frozen(_) => "Frozen".to_string(),
692 Value::Varint(_) => "Varint".to_string(),
693 Value::Decimal { .. } => "Decimal".to_string(),
694 Value::Duration { .. } => "Duration".to_string(),
695 Value::Tombstone(_) => "Tombstone".to_string(),
696 }
697 }
698
699 fn estimate_size(&self) -> usize {
700 match self {
701 Value::Null => 0,
702 Value::Text(s) => s.len(),
703 Value::Integer(_) => 4,
704 Value::BigInt(_) => 8,
705 Value::Counter(_) => 8,
706 Value::Float(_) => 8,
707 Value::Boolean(_) => 1,
708 Value::Uuid(_) => 16,
709 Value::Timestamp(_) => 8,
710 Value::Date(_) => 4,
711 Value::Time(_) => 8,
712 Value::Inet(bytes) => bytes.len(),
713 Value::Blob(b) => b.len(),
714 Value::List(items) => items.iter().map(|v| v.estimate_size()).sum::<usize>() + 8,
715 Value::Set(items) => items.iter().map(|v| v.estimate_size()).sum::<usize>() + 8,
716 Value::Map(map) => {
717 map.iter()
718 .map(|(k, v)| k.estimate_size() + v.estimate_size())
719 .sum::<usize>()
720 + 16
721 }
722 Value::Json(_) => 64, Value::TinyInt(_) => 1,
724 Value::SmallInt(_) => 2,
725 Value::Float32(_) => 4,
726 Value::Tuple(t) => t.iter().map(|v| v.estimate_size()).sum::<usize>() + 8,
727 Value::Udt(_) => 32, Value::Frozen(f) => f.estimate_size(), Value::Varint(data) => data.len(),
730 Value::Decimal { unscaled, .. } => 4 + unscaled.len(), Value::Duration { .. } => 12, Value::Tombstone(_) => 8, }
734 }
735}
736
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A freshly built discovery service must not have anything cached.
    #[tokio::test]
    async fn test_schema_discovery_creation() {
        let _temp_dir = TempDir::new().unwrap();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());

        let discovery =
            SchemaDiscovery::new(SchemaDiscoveryConfig::default(), platform, core_config)
                .await
                .unwrap();

        let cache_is_empty = discovery.schema_cache.read().await.is_empty();
        assert!(!discovery.config.cache_schemas || cache_is_empty);
    }

    /// Three distinct text values and one null give a 25% null share,
    /// three unique values and a dominant-type confidence above 0.7.
    #[test]
    fn test_column_data_analysis() {
        let samples = vec![
            Value::Text("test1".to_string()),
            Value::Text("test2".to_string()),
            Value::Null,
            Value::Text("test3".to_string()),
        ];

        let mut column_data = ColumnData::new();
        for sample in samples {
            column_data.add_sample_value(sample);
        }

        assert_eq!(column_data.calculate_null_percentage(), 0.25);
        assert_eq!(column_data.unique_values.len(), 3);
        assert!(column_data.calculate_type_confidence() > 0.7);
    }
}