Skip to main content

cqlite_core/storage/sstable/
statistics_reader.rs

1//! Statistics.db file reader for enhanced SSTable metadata
2//!
3//! This module provides a high-level interface for reading and analyzing
4//! Statistics.db files that accompany SSTable Data.db files in Cassandra 5+.
5
6use crate::{
7    error::{Error, Result},
8    parser::enhanced_statistics_parser::parse_statistics_with_fallback,
9    parser::statistics::{SSTableStatistics, StatisticsAnalyzer, StatisticsSummary},
10    platform::Platform,
11};
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use tokio::fs::File;
15use tokio::io::AsyncReadExt;
16
/// Compute a CRC-32 over `data`, one bit at a time.
///
/// Uses the reflected polynomial `0xEDB88320`, starts the register at
/// `0xFFFF_FFFF`, and inverts every bit of the register before returning it.
/// An empty slice therefore yields `0`.
fn crc32_checksum(data: &[u8]) -> u32 {
    const POLYNOMIAL: u32 = 0xedb8_8320;

    let register = data.iter().fold(0xffff_ffff_u32, |acc, &byte| {
        // Mix the byte into the low bits, then shift out eight bits, applying
        // the polynomial whenever the bit shifted out was set.
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            let shifted = bits >> 1;
            if bits & 1 == 0 {
                shifted
            } else {
                shifted ^ POLYNOMIAL
            }
        })
    });

    !register
}
34
35/// High-level Statistics.db file reader
36pub struct StatisticsReader {
37    /// Path to the Statistics.db file
38    file_path: PathBuf,
39    /// Parsed statistics data
40    statistics: SSTableStatistics,
41    /// Platform abstraction for file operations
42    #[allow(dead_code)]
43    platform: Arc<Platform>,
44}
45
46impl StatisticsReader {
47    /// Open and parse a Statistics.db file
48    pub async fn open(path: &Path, platform: Arc<Platform>) -> Result<Self> {
49        if !platform.fs().exists(path).await? {
50            return Err(Error::not_found(format!(
51                "Statistics.db file not found: {}",
52                path.display()
53            )));
54        }
55
56        // Read the entire file (Statistics.db files are typically small)
57        let mut file = File::open(path).await?;
58        let mut buffer = Vec::new();
59        file.read_to_end(&mut buffer).await?;
60
61        // Parse the statistics data using enhanced parser with fallback.
62        // Gates are not available at StatisticsReader construction time (the reader
63        // is opened before SSTableReader has gates). Pass None here; nb-compatible
64        // defaults apply. VG3 will wire gates through SSTableReader instead.
65        let statistics = match parse_statistics_with_fallback(&buffer, None) {
66            Ok((_, stats)) => stats,
67            Err(e) => {
68                return Err(Error::corruption(format!(
69                    "Failed to parse Statistics.db with enhanced parser: {:?}",
70                    e
71                )));
72            }
73        };
74
75        // Checksum validation: DEFERRED TO M2 MILESTONE
76        //
77        // The nb-format Statistics.db checksum algorithm is not yet implemented (Issue #28).
78        // The checksum field (header.checksum) contains a value but we cannot validate it
79        // without knowing the exact algorithm Cassandra 5.0+ uses for this file format.
80        //
81        // Known limitations:
82        // - Files with corrupt data may be silently accepted
83        // - No data integrity guarantee for Statistics.db parsing
84        // - M2 milestone will implement proper CRC32/Adler32/other validation
85        //
86        // This limitation is explicitly documented rather than silently ignored.
87        // Statistics.db is metadata-only (not critical path) so risk is acceptable for M1.
88        if statistics.header.checksum != 0 {
89            // Checksum present but not validated - this is a known M1 limitation
90        }
91
92        Ok(Self {
93            file_path: path.to_path_buf(),
94            statistics,
95            platform,
96        })
97    }
98
99    /// Get the raw statistics data
100    pub fn statistics(&self) -> &SSTableStatistics {
101        &self.statistics
102    }
103
104    /// Get a human-readable summary analysis
105    pub fn analyze(&self) -> StatisticsSummary {
106        StatisticsAnalyzer::analyze(&self.statistics)
107    }
108
109    /// Get the file path
110    pub fn file_path(&self) -> &Path {
111        &self.file_path
112    }
113
114    /// Validate checksum for parsed statistics
115    pub async fn validate_checksum(&self) -> Result<bool> {
116        // Read the raw file data for checksum calculation
117        let mut file = File::open(&self.file_path).await?;
118        let mut buffer = Vec::new();
119        file.read_to_end(&mut buffer).await?;
120
121        if buffer.len() < 4 {
122            return Err(Error::corruption(
123                "Statistics file too small for checksum validation".to_string(),
124            ));
125        }
126
127        // Extract the stored checksum from header
128        let stored_checksum = self.statistics.header.checksum;
129
130        // Calculate CRC32 checksum of the data section (excluding the checksum field itself)
131        let data_section = if buffer.len() >= 32 {
132            &buffer[28..buffer.len() - 4] // Skip header to checksum field, then skip checksum
133        } else {
134            return Err(Error::corruption(
135                "Invalid Statistics file format for checksum validation".to_string(),
136            ));
137        };
138
139        let calculated_checksum = crc32_checksum(data_section);
140
141        Ok(calculated_checksum == stored_checksum)
142    }
143
144    /// Check if the Statistics.db corresponds to a specific table
145    pub fn matches_table(&self, table_id: &[u8; 16]) -> bool {
146        if let Some(ref stats_table_id) = self.statistics.header.table_id {
147            stats_table_id == table_id
148        } else {
149            false // Cannot match without table ID
150        }
151    }
152
153    /// Get row count information
154    pub fn row_count(&self) -> u64 {
155        self.statistics.row_stats.total_rows
156    }
157
158    /// Get live row count (excluding tombstones)
159    pub fn live_row_count(&self) -> u64 {
160        self.statistics.row_stats.live_rows
161    }
162
163    /// Get timestamp range in microseconds
164    pub fn timestamp_range(&self) -> (i64, i64) {
165        (
166            self.statistics.timestamp_stats.min_timestamp,
167            self.statistics.timestamp_stats.max_timestamp,
168        )
169    }
170
171    /// Get compression information
172    pub fn compression_info(&self) -> (&str, f64) {
173        (
174            &self.statistics.compression_stats.algorithm,
175            self.statistics.compression_stats.ratio,
176        )
177    }
178
179    /// Get partition statistics
180    pub fn partition_info(&self) -> (u64, f64, u64) {
181        (
182            self.statistics.partition_stats.min_partition_size,
183            self.statistics.partition_stats.avg_partition_size,
184            self.statistics.partition_stats.max_partition_size,
185        )
186    }
187
188    /// Get column statistics by name
189    pub fn column_stats(
190        &self,
191        column_name: &str,
192    ) -> Option<&crate::parser::statistics::ColumnStatistics> {
193        self.statistics
194            .column_stats
195            .iter()
196            .find(|col| col.name == column_name)
197    }
198
199    /// Get all column names with statistics
200    pub fn column_names(&self) -> Vec<&str> {
201        self.statistics
202            .column_stats
203            .iter()
204            .map(|col| col.name.as_str())
205            .collect()
206    }
207
208    /// Check if data has TTL information
209    pub fn has_ttl_data(&self) -> bool {
210        self.statistics.timestamp_stats.rows_with_ttl > 0
211    }
212
213    /// Get disk space usage information
214    pub fn disk_usage(&self) -> (u64, u64, f64) {
215        (
216            self.statistics.table_stats.compressed_size,
217            self.statistics.table_stats.uncompressed_size,
218            self.statistics.table_stats.compression_ratio,
219        )
220    }
221
222    /// Generate a detailed report
223    pub fn generate_report(&self, include_column_details: bool) -> String {
224        let mut report = String::new();
225        let summary = self.analyze();
226
227        report.push_str("# SSTable Statistics Report\n\n");
228
229        // Overview section
230        report.push_str("## Overview\n");
231        report.push_str(&format!("- **Total Rows**: {}\n", summary.total_rows));
232        report.push_str(&format!(
233            "- **Live Data**: {:.2}%\n",
234            summary.live_data_percentage
235        ));
236        report.push_str(&format!(
237            "- **Compression Efficiency**: {:.2}%\n",
238            summary.compression_efficiency
239        ));
240        report.push_str(&format!(
241            "- **Time Range**: {:.1} days\n",
242            summary.timestamp_range_days
243        ));
244        report.push_str(&format!(
245            "- **Largest Partition**: {:.2} MB\n",
246            summary.largest_partition_mb
247        ));
248        report.push_str(&format!(
249            "- **Health Score**: {:.1}/100\n\n",
250            summary.health_score
251        ));
252
253        // Row statistics
254        report.push_str("## Row Statistics\n");
255        report.push_str(&format!(
256            "- Total rows: {}\n",
257            self.statistics.row_stats.total_rows
258        ));
259        report.push_str(&format!(
260            "- Live rows: {}\n",
261            self.statistics.row_stats.live_rows
262        ));
263        report.push_str(&format!(
264            "- Tombstones: {}\n",
265            self.statistics.row_stats.tombstone_count
266        ));
267        report.push_str(&format!(
268            "- Partitions: {}\n",
269            self.statistics.row_stats.partition_count
270        ));
271        report.push_str(&format!(
272            "- Average rows per partition: {:.1}\n\n",
273            self.statistics.row_stats.avg_rows_per_partition
274        ));
275
276        // Timestamp information
277        if self.statistics.timestamp_stats.min_timestamp != 0 {
278            let min_time = chrono::DateTime::from_timestamp_micros(
279                self.statistics.timestamp_stats.min_timestamp,
280            );
281            let max_time = chrono::DateTime::from_timestamp_micros(
282                self.statistics.timestamp_stats.max_timestamp,
283            );
284
285            report.push_str("## Timestamp Range\n");
286            if let (Some(min), Some(max)) = (min_time, max_time) {
287                report.push_str(&format!(
288                    "- From: {}\n",
289                    min.format("%Y-%m-%d %H:%M:%S UTC")
290                ));
291                report.push_str(&format!("- To: {}\n", max.format("%Y-%m-%d %H:%M:%S UTC")));
292            } else {
293                report.push_str(&format!(
294                    "- Min timestamp: {}\n",
295                    self.statistics.timestamp_stats.min_timestamp
296                ));
297                report.push_str(&format!(
298                    "- Max timestamp: {}\n",
299                    self.statistics.timestamp_stats.max_timestamp
300                ));
301            }
302
303            if self.has_ttl_data() {
304                report.push_str(&format!(
305                    "- Rows with TTL: {}\n",
306                    self.statistics.timestamp_stats.rows_with_ttl
307                ));
308            }
309            report.push('\n');
310        }
311
312        // Compression statistics
313        report.push_str("## Compression\n");
314        report.push_str(&format!(
315            "- Algorithm: {}\n",
316            self.statistics.compression_stats.algorithm
317        ));
318        report.push_str(&format!(
319            "- Original size: {:.2} MB\n",
320            self.statistics.compression_stats.original_size as f64 / 1_048_576.0
321        ));
322        report.push_str(&format!(
323            "- Compressed size: {:.2} MB\n",
324            self.statistics.compression_stats.compressed_size as f64 / 1_048_576.0
325        ));
326        report.push_str(&format!(
327            "- Ratio: {:.2}%\n",
328            self.statistics.compression_stats.ratio * 100.0
329        ));
330        report.push_str(&format!(
331            "- Speed: {:.1} MB/s (compress), {:.1} MB/s (decompress)\n\n",
332            self.statistics.compression_stats.compression_speed,
333            self.statistics.compression_stats.decompression_speed
334        ));
335
336        // Partition statistics
337        report.push_str("## Partition Distribution\n");
338        report.push_str(&format!(
339            "- Average size: {:.2} KB\n",
340            self.statistics.partition_stats.avg_partition_size / 1024.0
341        ));
342        report.push_str(&format!(
343            "- Range: {:.2} KB - {:.2} MB\n",
344            self.statistics.partition_stats.min_partition_size as f64 / 1024.0,
345            self.statistics.partition_stats.max_partition_size as f64 / 1_048_576.0
346        ));
347        report.push_str(&format!(
348            "- Large partitions (>1MB): {:.1}%\n\n",
349            self.statistics.partition_stats.large_partition_percentage
350        ));
351
352        // Column statistics
353        if include_column_details && !self.statistics.column_stats.is_empty() {
354            report.push_str("## Column Statistics\n");
355            for column in &self.statistics.column_stats {
356                report.push_str(&format!("### {}\n", column.name));
357                report.push_str(&format!("- Type: {}\n", column.column_type));
358                report.push_str(&format!("- Values: {}\n", column.value_count));
359                report.push_str(&format!("- Nulls: {}\n", column.null_count));
360                report.push_str(&format!("- Average size: {:.1} bytes\n", column.avg_size));
361                report.push_str(&format!("- Cardinality: {}\n", column.cardinality));
362                if column.has_index {
363                    report.push_str("- **Indexed**: Yes\n");
364                }
365                report.push('\n');
366            }
367        }
368
369        // Performance hints
370        if !summary.query_performance_hints.is_empty() {
371            report.push_str("## Query Performance Hints\n");
372            for hint in &summary.query_performance_hints {
373                report.push_str(&format!("- {}\n", hint));
374            }
375            report.push('\n');
376        }
377
378        // Storage recommendations
379        if !summary.storage_recommendations.is_empty() {
380            report.push_str("## Storage Recommendations\n");
381            for rec in &summary.storage_recommendations {
382                report.push_str(&format!("- {}\n", rec));
383            }
384            report.push('\n');
385        }
386
387        report
388    }
389
390    /// Get a compact summary for CLI display
391    pub fn compact_summary(&self) -> String {
392        let summary = self.analyze();
393        format!(
394            "Rows: {} ({:.1}% live) | Compression: {:.1}% | Health: {:.0}/100 | Size: {:.2} MB",
395            summary.total_rows,
396            summary.live_data_percentage,
397            summary.compression_efficiency,
398            summary.health_score,
399            self.statistics.table_stats.disk_size as f64 / 1_048_576.0
400        )
401    }
402}
403
404/// Utility function to find Statistics.db file for a given Data.db file
405pub async fn find_statistics_file(data_db_path: &Path) -> Option<PathBuf> {
406    if let Some(parent) = data_db_path.parent() {
407        if let Some(stem) = data_db_path.file_stem() {
408            if let Some(stem_str) = stem.to_str() {
409                // Replace "Data.db" with "Statistics.db"
410                let stats_name = stem_str.replace("-Data", "-Statistics") + ".db";
411                let stats_path = parent.join(stats_name);
412
413                if tokio::fs::metadata(&stats_path).await.is_ok() {
414                    return Some(stats_path);
415                }
416            }
417        }
418    }
419    None
420}
421
422/// Utility function to check if a Statistics.db file exists for an SSTable directory
423pub async fn check_statistics_availability(sstable_dir: &Path) -> Result<Vec<PathBuf>> {
424    let mut stats_files = Vec::new();
425
426    let mut dir_entries = tokio::fs::read_dir(sstable_dir).await?;
427    while let Some(entry) = dir_entries.next_entry().await? {
428        let path = entry.path();
429        if let Some(file_name) = path.file_name() {
430            if let Some(name_str) = file_name.to_str() {
431                if name_str.contains("-Statistics.db") {
432                    stats_files.push(path);
433                }
434            }
435        }
436    }
437
438    Ok(stats_files)
439}
440
#[cfg(test)]
mod tests {
    use super::{check_statistics_availability, crc32_checksum, find_statistics_file};
    use std::path::PathBuf;

    /// Create an empty scratch directory that is unique per process and test.
    async fn scratch_dir(label: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "cqlite_statistics_reader_{}_{}",
            std::process::id(),
            label
        ));
        // Ignore the result: the directory normally does not exist yet.
        let _ = tokio::fs::remove_dir_all(&dir).await;
        tokio::fs::create_dir_all(&dir)
            .await
            .expect("failed to create scratch directory");
        dir
    }

    // Opening a StatisticsReader needs a real Statistics.db file and a
    // Platform, so only the free functions are covered here.

    #[test]
    fn test_crc32_known_vectors() {
        assert_eq!(crc32_checksum(b""), 0);
        assert_eq!(crc32_checksum(b"123456789"), 0xCBF4_3926);
    }

    #[tokio::test]
    async fn test_find_statistics_file() {
        let dir = scratch_dir("find_present").await;
        let data_path = dir.join("users-123abc-Data.db");
        let stats_path = dir.join("users-123abc-Statistics.db");
        tokio::fs::write(&data_path, b"").await.unwrap();
        tokio::fs::write(&stats_path, b"").await.unwrap();

        assert_eq!(find_statistics_file(&data_path).await, Some(stats_path));

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    #[tokio::test]
    async fn test_find_statistics_file_missing_sibling() {
        let dir = scratch_dir("find_missing").await;
        let data_path = dir.join("users-123abc-Data.db");
        tokio::fs::write(&data_path, b"").await.unwrap();

        assert_eq!(find_statistics_file(&data_path).await, None);

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    #[tokio::test]
    async fn test_check_statistics_availability() {
        let dir = scratch_dir("availability").await;
        let stats_path = dir.join("users-123abc-Statistics.db");
        tokio::fs::write(dir.join("users-123abc-Data.db"), b"")
            .await
            .unwrap();
        tokio::fs::write(&stats_path, b"").await.unwrap();

        let found = check_statistics_availability(&dir).await.unwrap();
        assert_eq!(found, vec![stats_path]);

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }
}