Skip to main content

cqlite_core/storage/
schema_discovery.rs

1//! Schema Discovery and Metadata Management
2//!
3//! This module provides automatic schema detection and metadata management for
4//! SSTable files, enabling the REPL to understand table structures and provide
5//! intelligent data access capabilities.
6
7use std::collections::{BTreeMap, HashMap};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::{Duration, Instant, SystemTime};
11
12use serde::{Deserialize, Serialize};
13use tokio::sync::RwLock;
14
15use crate::{
16    parser::header::{CassandraVersion, ColumnInfo},
17    platform::Platform,
18    schema::{Column, TableSchema},
19    storage::sstable::reader::SSTableReader,
20    types::{DataType, Value},
21    Config, Result,
22};
23
/// Tuning knobs controlling how schemas are discovered and cached.
#[derive(Debug, Clone)]
pub struct SchemaDiscoveryConfig {
    /// Upper bound on the number of rows sampled for type inference.
    pub max_sample_rows: usize,
    /// Whether aggressive type inference is enabled.
    pub aggressive_inference: bool,
    /// Whether discovered schemas are cached.
    pub cache_schemas: bool,
    /// Lifetime of a cached schema, in seconds.
    pub cache_ttl_seconds: u64,
    /// Whether schema versioning is enabled.
    pub enable_versioning: bool,
    /// Maximum number of schema versions to retain.
    pub max_versions: usize,
}

impl Default for SchemaDiscoveryConfig {
    /// Sample up to 1000 rows, cache for one hour, keep up to 10 versions,
    /// with every feature flag switched on.
    fn default() -> Self {
        const ONE_HOUR_IN_SECONDS: u64 = 60 * 60;

        SchemaDiscoveryConfig {
            aggressive_inference: true,
            cache_schemas: true,
            enable_versioning: true,
            max_sample_rows: 1_000,
            cache_ttl_seconds: ONE_HOUR_IN_SECONDS,
            max_versions: 10,
        }
    }
}
53
/// Discovered schema information.
///
/// Produced by `SchemaDiscovery::discover_table_schema` and, when caching is
/// enabled, stored in the engine's schema cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveredSchema {
    /// Inferred table schema. Partition and clustering keys are not inferred
    /// by this module and are left empty.
    pub schema: TableSchema,
    /// Discovery metadata (when, from which files, and how).
    pub metadata: SchemaMetadata,
    /// Per-column statistics, keyed by column name.
    pub column_stats: HashMap<String, ColumnStatistics>,
    /// Mean of the per-column `type_confidence` values; 0.0 when there are
    /// no columns.
    pub inference_confidence: f64,
    /// Result of validating the schema against the column statistics.
    pub validation_status: ValidationStatus,
}
68
/// Schema discovery metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaMetadata {
    /// Wall-clock time at which the discovery run started.
    pub discovered_at: SystemTime,
    /// SSTable files that were successfully opened during discovery.
    pub source_files: Vec<PathBuf>,
    /// Total number of rows sampled across all source files.
    pub rows_sampled: usize,
    /// Cassandra version taken from the first SSTable header that was read,
    /// if any file could be opened.
    pub cassandra_version: Option<CassandraVersion>,
    /// Discovery method used.
    pub discovery_method: DiscoveryMethod,
    /// Schema version. Discovery currently always sets this to 1.
    pub version: u32,
}
85
/// Column statistics for type inference.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnStatistics {
    /// Column name
    pub name: String,
    /// Inferred data type, rendered as a string
    pub inferred_type: String,
    /// Type confidence (0.0 - 1.0), derived from how consistent the sampled
    /// value types are
    pub type_confidence: f64,
    /// Fraction of sampled values that were null. Despite the name this is a
    /// ratio in 0.0 - 1.0, not a 0 - 100 percentage.
    pub null_percentage: f64,
    /// Number of distinct sampled values tracked. Tracking is capped, so for
    /// high-cardinality columns this is a lower bound.
    pub unique_values: usize,
    /// Average estimated size of the non-null sampled values, in bytes
    pub avg_size_bytes: f64,
    /// Min/max values (for applicable types), taken over non-null samples
    pub min_value: Option<Value>,
    pub max_value: Option<Value>,
    /// Common patterns detected. NOTE(review): nothing in this file populates
    /// this yet, so it is currently always empty.
    pub patterns: Vec<String>,
}
107
/// Schema validation status.
///
/// Assigned by the internal `SchemaValidator` from per-column type confidence.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ValidationStatus {
    /// Schema is valid and consistent: no column has type confidence below 0.5.
    Valid,
    /// Schema has minor inconsistencies: some column's confidence is below 0.5
    /// but none is below 0.3.
    WarningsPresent,
    /// Schema has significant issues: some column's confidence is below 0.3.
    Invalid,
    /// Schema could not be validated. Not produced by the validator in this
    /// file.
    Unknown,
}
120
/// Discovery method used.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DiscoveryMethod {
    /// From SSTable header metadata only (no rows were sampled).
    HeaderMetadata,
    /// Inferred from data sampling, without header column information.
    DataSampling,
    /// Combination of header and sampling.
    Hybrid,
    /// From external schema definition. `SchemaDiscovery` also reports this
    /// when none of the given SSTable files could be opened.
    External,
}
133
/// Schema discovery engine.
///
/// Opens SSTables, samples their data, infers a `TableSchema`, and caches
/// results per `keyspace.table`.
// NOTE(review): every field appears to be read by the methods in this file;
// this `dead_code` allowance may be stale — confirm before removing it.
#[allow(dead_code)]
pub struct SchemaDiscovery {
    /// Configuration (sampling limits, caching behavior)
    config: SchemaDiscoveryConfig,
    /// Platform abstraction, passed to `SSTableReader::open`
    platform: Arc<Platform>,
    /// Core configuration, passed to `SSTableReader::open`
    core_config: Config,
    /// Schema cache: `keyspace.table` -> (schema, insertion time)
    schema_cache: Arc<RwLock<HashMap<String, (DiscoveredSchema, Instant)>>>,
    /// Type inference engine
    type_inference: Arc<TypeInferenceEngine>,
    /// Schema validator
    validator: Arc<SchemaValidator>,
}
150
151impl SchemaDiscovery {
152    /// Create a new schema discovery engine
153    pub async fn new(
154        config: SchemaDiscoveryConfig,
155        platform: Arc<Platform>,
156        core_config: Config,
157    ) -> Result<Self> {
158        let type_inference = Arc::new(TypeInferenceEngine::new());
159        let validator = Arc::new(SchemaValidator::new());
160
161        Ok(Self {
162            config,
163            platform,
164            core_config,
165            schema_cache: Arc::new(RwLock::new(HashMap::new())),
166            type_inference,
167            validator,
168        })
169    }
170
171    /// Discover schema for a table from SSTable files
172    pub async fn discover_table_schema(
173        &self,
174        keyspace: &str,
175        table: &str,
176        sstable_files: &[PathBuf],
177    ) -> Result<DiscoveredSchema> {
178        let cache_key = format!("{}.{}", keyspace, table);
179
180        // Check cache first
181        if self.config.cache_schemas {
182            if let Some(cached) = self.get_cached_schema(&cache_key).await {
183                return Ok(cached);
184            }
185        }
186
187        // Perform discovery
188        let discovered = self
189            .perform_schema_discovery(keyspace, table, sstable_files)
190            .await?;
191
192        // Cache the result
193        if self.config.cache_schemas {
194            self.cache_schema(cache_key, discovered.clone()).await;
195        }
196
197        Ok(discovered)
198    }
199
200    /// Perform the actual schema discovery
201    async fn perform_schema_discovery(
202        &self,
203        keyspace: &str,
204        table: &str,
205        sstable_files: &[PathBuf],
206    ) -> Result<DiscoveredSchema> {
207        let start_time = SystemTime::now();
208        let mut source_files = Vec::new();
209        let mut all_column_data = HashMap::new();
210        let mut total_rows_sampled = 0;
211        let mut cassandra_version = None;
212
213        // Analyze each SSTable file
214        for file_path in sstable_files {
215            if let Ok(reader) = self.create_reader(file_path).await {
216                source_files.push(file_path.clone());
217
218                // Try to get schema from header first
219                if let Ok(header_schema) = self.extract_schema_from_header(&reader).await {
220                    if cassandra_version.is_none() {
221                        let header = reader.header();
222                        cassandra_version = Some(header.cassandra_version);
223                    }
224
225                    // Merge with existing column data
226                    self.merge_header_schema(&mut all_column_data, header_schema);
227                }
228
229                // Sample data for type inference
230                let sampled_data = self.sample_table_data(&reader).await?;
231                total_rows_sampled += sampled_data.len();
232
233                // Analyze sampled data
234                self.analyze_sampled_data(&mut all_column_data, sampled_data);
235
236                // Limit total sampling
237                if total_rows_sampled >= self.config.max_sample_rows {
238                    break;
239                }
240            }
241        }
242
243        // Infer final schema
244        let schema = self
245            .infer_table_schema(keyspace, table, &all_column_data)
246            .await?;
247
248        // Calculate column statistics
249        let column_stats = self.calculate_column_statistics(&all_column_data).await;
250
251        // Calculate inference confidence
252        let inference_confidence = self.calculate_inference_confidence(&column_stats);
253
254        // Validate schema
255        let validation_status = self.validator.validate_schema(&schema, &column_stats).await;
256
257        // Determine discovery method
258        let discovery_method = if source_files.is_empty() {
259            DiscoveryMethod::External
260        } else if all_column_data.values().any(|cd| cd.header_info.is_some()) {
261            if total_rows_sampled > 0 {
262                DiscoveryMethod::Hybrid
263            } else {
264                DiscoveryMethod::HeaderMetadata
265            }
266        } else {
267            DiscoveryMethod::DataSampling
268        };
269
270        let metadata = SchemaMetadata {
271            discovered_at: start_time,
272            source_files,
273            rows_sampled: total_rows_sampled,
274            cassandra_version,
275            discovery_method,
276            version: 1,
277        };
278
279        Ok(DiscoveredSchema {
280            schema,
281            metadata,
282            column_stats,
283            inference_confidence,
284            validation_status,
285        })
286    }
287
288    /// Create a reader for the SSTable file
289    async fn create_reader(&self, file_path: &Path) -> Result<SSTableReader> {
290        SSTableReader::open(file_path, &self.core_config, self.platform.clone()).await
291    }
292
293    /// Extract schema information from SSTable header
294    async fn extract_schema_from_header(
295        &self,
296        reader: &SSTableReader,
297    ) -> Result<HashMap<String, ColumnInfo>> {
298        let header = reader.header();
299        let mut columns = HashMap::new();
300
301        for column_def in &header.columns {
302            columns.insert(column_def.name.clone(), column_def.clone());
303        }
304
305        Ok(columns)
306    }
307
308    /// Sample data from the SSTable for type inference
309    async fn sample_table_data(
310        &self,
311        reader: &SSTableReader,
312    ) -> Result<Vec<HashMap<String, Value>>> {
313        // Get column names from the header
314        let header = reader.header();
315        let column_names: Vec<String> = header.columns.iter().map(|col| col.name.clone()).collect();
316
317        // Get all entries and sample up to max_rows
318        let all_entries = reader.get_all_entries().await?;
319
320        // TODO(Issue #190): SSTableReader returns Vec<(TableId, RowKey, Value)> where Value
321        // is typically a single parsed value per entry. For schema discovery type inference,
322        // we map each entry's Value to the first column. Future enhancement: use scan()
323        // with schema-aware parsing for multi-column row data.
324        let samples: Vec<HashMap<String, Value>> = all_entries
325            .into_iter()
326            .take(self.config.max_sample_rows)
327            .filter_map(|(_table_id, _row_key, value)| {
328                // Convert entry to column-value map using actual column names
329                let mut row_data = HashMap::new();
330
331                if !column_names.is_empty() {
332                    // Map the single Value to the first column for type inference
333                    row_data.insert(column_names[0].clone(), value);
334                    Some(row_data)
335                } else {
336                    // If no column names available, skip this entry
337                    None
338                }
339            })
340            .collect();
341
342        Ok(samples)
343    }
344
345    /// Infer table schema from column data
346    async fn infer_table_schema(
347        &self,
348        keyspace: &str,
349        table: &str,
350        column_data: &HashMap<String, ColumnData>,
351    ) -> Result<TableSchema> {
352        let mut columns = Vec::new();
353
354        for (name, data) in column_data {
355            let data_type = self.type_inference.infer_column_type(data).await;
356            let column = Column {
357                name: name.clone(),
358                data_type: data_type.to_string(),
359                nullable: true,
360                default: None,
361                is_static: false,
362            };
363            columns.push(column);
364        }
365
366        // Sort columns by name for consistency
367        columns.sort_by(|a, b| a.name.cmp(&b.name));
368
369        Ok(TableSchema {
370            keyspace: keyspace.to_string(),
371            table: table.to_string(),
372            partition_keys: vec![], // Would need sophisticated analysis
373            clustering_keys: vec![],
374            columns,
375            comments: HashMap::new(),
376        })
377    }
378
379    /// Calculate column statistics
380    async fn calculate_column_statistics(
381        &self,
382        column_data: &HashMap<String, ColumnData>,
383    ) -> HashMap<String, ColumnStatistics> {
384        let mut stats = HashMap::new();
385
386        for (name, data) in column_data {
387            let stat = ColumnStatistics {
388                name: name.clone(),
389                inferred_type: self
390                    .type_inference
391                    .infer_column_type(data)
392                    .await
393                    .to_string(),
394                type_confidence: data.calculate_type_confidence(),
395                null_percentage: data.calculate_null_percentage(),
396                unique_values: data.unique_values.len(),
397                avg_size_bytes: data.calculate_average_size(),
398                min_value: data.min_value.clone(),
399                max_value: data.max_value.clone(),
400                patterns: data.detected_patterns.clone(),
401            };
402            stats.insert(name.clone(), stat);
403        }
404
405        stats
406    }
407
408    /// Calculate overall inference confidence
409    fn calculate_inference_confidence(
410        &self,
411        column_stats: &HashMap<String, ColumnStatistics>,
412    ) -> f64 {
413        if column_stats.is_empty() {
414            return 0.0;
415        }
416
417        let total_confidence: f64 = column_stats.values().map(|stat| stat.type_confidence).sum();
418
419        total_confidence / column_stats.len() as f64
420    }
421
422    // Cache management methods
423
424    async fn get_cached_schema(&self, cache_key: &str) -> Option<DiscoveredSchema> {
425        let cache = self.schema_cache.read().await;
426        if let Some((schema, cached_at)) = cache.get(cache_key) {
427            let ttl = Duration::from_secs(self.config.cache_ttl_seconds);
428            if cached_at.elapsed() < ttl {
429                return Some(schema.clone());
430            }
431        }
432        None
433    }
434
435    async fn cache_schema(&self, cache_key: String, schema: DiscoveredSchema) {
436        let mut cache = self.schema_cache.write().await;
437        cache.insert(cache_key, (schema, Instant::now()));
438
439        // Simple cache eviction
440        if cache.len() > 100 {
441            let oldest_key = cache
442                .iter()
443                .min_by_key(|(_, (_, time))| time)
444                .map(|(key, _)| key.clone());
445
446            if let Some(key) = oldest_key {
447                cache.remove(&key);
448            }
449        }
450    }
451
452    // Helper methods for data processing
453
454    fn merge_header_schema(
455        &self,
456        column_data: &mut HashMap<String, ColumnData>,
457        header_columns: HashMap<String, ColumnInfo>,
458    ) {
459        for (name, column_info) in header_columns {
460            let entry = column_data.entry(name).or_insert_with(ColumnData::new);
461            entry.header_info = Some(column_info);
462        }
463    }
464
465    fn analyze_sampled_data(
466        &self,
467        column_data: &mut HashMap<String, ColumnData>,
468        samples: Vec<HashMap<String, Value>>,
469    ) {
470        for sample in samples {
471            for (column_name, value) in sample {
472                let entry = column_data
473                    .entry(column_name)
474                    .or_insert_with(ColumnData::new);
475                entry.add_sample_value(value);
476            }
477        }
478    }
479}
480
481/// Internal column data for analysis
482#[derive(Debug)]
483struct ColumnData {
484    /// Header information if available
485    header_info: Option<ColumnInfo>,
486    /// Sample values collected
487    sample_values: Vec<Value>,
488    /// Unique values (limited set)
489    unique_values: BTreeMap<String, usize>,
490    /// Null count
491    null_count: usize,
492    /// Min/max values
493    min_value: Option<Value>,
494    max_value: Option<Value>,
495    /// Detected patterns
496    detected_patterns: Vec<String>,
497    /// Type frequency map
498    type_frequency: HashMap<String, usize>,
499}
500
501impl ColumnData {
502    fn new() -> Self {
503        Self {
504            header_info: None,
505            sample_values: Vec::new(),
506            unique_values: BTreeMap::new(),
507            null_count: 0,
508            min_value: None,
509            max_value: None,
510            detected_patterns: Vec::new(),
511            type_frequency: HashMap::new(),
512        }
513    }
514
515    fn add_sample_value(&mut self, value: Value) {
516        if value == Value::Null {
517            self.null_count += 1;
518        } else {
519            // Track type frequency
520            let type_name = value.type_name();
521            *self.type_frequency.entry(type_name).or_insert(0) += 1;
522
523            // Track unique values (limited)
524            if self.unique_values.len() < 1000 {
525                let value_str = format!("{:?}", value);
526                *self.unique_values.entry(value_str).or_insert(0) += 1;
527            }
528
529            // Update min/max
530            if self.min_value.is_none() || Some(&value) < self.min_value.as_ref() {
531                self.min_value = Some(value.clone());
532            }
533            if self.max_value.is_none() || Some(&value) > self.max_value.as_ref() {
534                self.max_value = Some(value.clone());
535            }
536
537            self.sample_values.push(value);
538        }
539    }
540
541    fn calculate_type_confidence(&self) -> f64 {
542        if self.type_frequency.is_empty() {
543            return 0.0;
544        }
545
546        let total_samples = self.type_frequency.values().sum::<usize>();
547        let max_frequency = *self.type_frequency.values().max().unwrap_or(&0);
548
549        max_frequency as f64 / total_samples as f64
550    }
551
552    fn calculate_null_percentage(&self) -> f64 {
553        let total = self.sample_values.len() + self.null_count;
554        if total == 0 {
555            0.0
556        } else {
557            self.null_count as f64 / total as f64
558        }
559    }
560
561    fn calculate_average_size(&self) -> f64 {
562        if self.sample_values.is_empty() {
563            0.0
564        } else {
565            let total_size: usize = self.sample_values.iter().map(|v| v.estimate_size()).sum();
566            total_size as f64 / self.sample_values.len() as f64
567        }
568    }
569}
570
571/// Type inference engine
572struct TypeInferenceEngine;
573
574impl TypeInferenceEngine {
575    fn new() -> Self {
576        Self
577    }
578
579    async fn infer_column_type(&self, column_data: &ColumnData) -> DataType {
580        // If we have header information, use it
581        if let Some(ref header_info) = column_data.header_info {
582            return self.convert_cql_type_to_data_type(&header_info.column_type);
583        }
584
585        // Otherwise, infer from sample data
586        if let Some(most_common_type) = column_data
587            .type_frequency
588            .iter()
589            .max_by_key(|(_, count)| *count)
590            .map(|(type_name, _)| type_name)
591        {
592            return self.string_to_data_type(most_common_type);
593        }
594
595        DataType::Text // Default fallback
596    }
597
598    fn convert_cql_type_to_data_type(&self, type_name: &str) -> DataType {
599        match type_name.to_lowercase().as_str() {
600            "text" | "varchar" | "ascii" => DataType::Text,
601            "int" => DataType::Integer,
602            "bigint" => DataType::BigInt,
603            "boolean" => DataType::Boolean,
604            "double" => DataType::Float,
605            "float" => DataType::Float,
606            "uuid" => DataType::Uuid,
607            "timestamp" => DataType::Timestamp,
608            "blob" => DataType::Blob,
609            _ => DataType::Text,
610        }
611    }
612
613    fn string_to_data_type(&self, type_name: &str) -> DataType {
614        match type_name {
615            "Text" => DataType::Text,
616            "Integer" => DataType::Integer,
617            "Float" => DataType::Float,
618            "Boolean" => DataType::Boolean,
619            _ => DataType::Text,
620        }
621    }
622}
623
624/// Schema validator
625struct SchemaValidator;
626
627impl SchemaValidator {
628    fn new() -> Self {
629        Self
630    }
631
632    async fn validate_schema(
633        &self,
634        _schema: &TableSchema,
635        column_stats: &HashMap<String, ColumnStatistics>,
636    ) -> ValidationStatus {
637        let mut warnings = 0;
638        let mut errors = 0;
639
640        // Check type confidence
641        for stat in column_stats.values() {
642            if stat.type_confidence < 0.5 {
643                warnings += 1;
644            }
645            if stat.type_confidence < 0.3 {
646                errors += 1;
647            }
648        }
649
650        if errors > 0 {
651            ValidationStatus::Invalid
652        } else if warnings > 0 {
653            ValidationStatus::WarningsPresent
654        } else {
655            ValidationStatus::Valid
656        }
657    }
658}
659
660// Extension trait for Value to add helper methods
661trait ValueExt {
662    fn type_name(&self) -> String;
663    fn estimate_size(&self) -> usize;
664}
665
666impl ValueExt for Value {
667    fn type_name(&self) -> String {
668        match self {
669            Value::Null => "Null".to_string(),
670            Value::Text(_) => "Text".to_string(),
671            Value::Integer(_) => "Integer".to_string(),
672            Value::BigInt(_) => "BigInteger".to_string(),
673            Value::Counter(_) => "Counter".to_string(),
674            Value::Float(_) => "Float".to_string(),
675            Value::Boolean(_) => "Boolean".to_string(),
676            Value::Uuid(_) => "UUID".to_string(),
677            Value::Timestamp(_) => "Timestamp".to_string(),
678            Value::Date(_) => "Date".to_string(),
679            Value::Time(_) => "Time".to_string(),
680            Value::Inet(_) => "Inet".to_string(),
681            Value::Blob(_) => "Blob".to_string(),
682            Value::List(_) => "List".to_string(),
683            Value::Set(_) => "Set".to_string(),
684            Value::Map(_) => "Map".to_string(),
685            Value::Json(_) => "JSON".to_string(),
686            Value::TinyInt(_) => "TinyInt".to_string(),
687            Value::SmallInt(_) => "SmallInt".to_string(),
688            Value::Float32(_) => "Float32".to_string(),
689            Value::Tuple(_) => "Tuple".to_string(),
690            Value::Udt(_) => "UDT".to_string(),
691            Value::Frozen(_) => "Frozen".to_string(),
692            Value::Varint(_) => "Varint".to_string(),
693            Value::Decimal { .. } => "Decimal".to_string(),
694            Value::Duration { .. } => "Duration".to_string(),
695            Value::Tombstone(_) => "Tombstone".to_string(),
696        }
697    }
698
699    fn estimate_size(&self) -> usize {
700        match self {
701            Value::Null => 0,
702            Value::Text(s) => s.len(),
703            Value::Integer(_) => 4,
704            Value::BigInt(_) => 8,
705            Value::Counter(_) => 8,
706            Value::Float(_) => 8,
707            Value::Boolean(_) => 1,
708            Value::Uuid(_) => 16,
709            Value::Timestamp(_) => 8,
710            Value::Date(_) => 4,
711            Value::Time(_) => 8,
712            Value::Inet(bytes) => bytes.len(),
713            Value::Blob(b) => b.len(),
714            Value::List(items) => items.iter().map(|v| v.estimate_size()).sum::<usize>() + 8,
715            Value::Set(items) => items.iter().map(|v| v.estimate_size()).sum::<usize>() + 8,
716            Value::Map(map) => {
717                map.iter()
718                    .map(|(k, v)| k.estimate_size() + v.estimate_size())
719                    .sum::<usize>()
720                    + 16
721            }
722            Value::Json(_) => 64, // JSON estimate
723            Value::TinyInt(_) => 1,
724            Value::SmallInt(_) => 2,
725            Value::Float32(_) => 4,
726            Value::Tuple(t) => t.iter().map(|v| v.estimate_size()).sum::<usize>() + 8,
727            Value::Udt(_) => 32,                   // UDT estimate
728            Value::Frozen(f) => f.estimate_size(), // Recursive
729            Value::Varint(data) => data.len(),
730            Value::Decimal { unscaled, .. } => 4 + unscaled.len(), // scale + data
731            Value::Duration { .. } => 12,                          // 3 * 4 bytes
732            Value::Tombstone(_) => 8,                              // Tombstone marker
733        }
734    }
735}
736
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_schema_discovery_creation() {
        let config = SchemaDiscoveryConfig::default();
        let core_config = Config::default();
        let platform = Arc::new(Platform::new(&core_config).await.unwrap());

        let discovery = SchemaDiscovery::new(config, platform, core_config)
            .await
            .unwrap();

        // A freshly constructed engine always starts with an empty cache,
        // regardless of whether caching is enabled.
        assert!(discovery.schema_cache.read().await.is_empty());
    }

    #[test]
    fn test_default_config() {
        let config = SchemaDiscoveryConfig::default();

        assert_eq!(config.max_sample_rows, 1000);
        assert_eq!(config.cache_ttl_seconds, 3600);
        assert_eq!(config.max_versions, 10);
        assert!(config.aggressive_inference);
        assert!(config.cache_schemas);
        assert!(config.enable_versioning);
    }

    #[test]
    fn test_column_data_analysis() {
        let mut column_data = ColumnData::new();

        column_data.add_sample_value(Value::Text("test1".to_string()));
        column_data.add_sample_value(Value::Text("test2".to_string()));
        column_data.add_sample_value(Value::Null);
        column_data.add_sample_value(Value::Text("test3".to_string()));

        assert_eq!(column_data.calculate_null_percentage(), 0.25); // 1 null out of 4 values
        assert_eq!(column_data.unique_values.len(), 3); // 3 unique text values
        assert!(column_data.calculate_type_confidence() > 0.7); // Mostly text values
    }
}