use crate::{
error::{Error, Result},
parser::enhanced_statistics_parser::parse_statistics_with_fallback,
parser::statistics::{SSTableStatistics, StatisticsAnalyzer, StatisticsSummary},
platform::Platform,
};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
/// Computes the CRC-32 of `data` using the reflected polynomial `0xEDB88320`,
/// processing one bit at a time.
///
/// The register is seeded with all ones and the final value is bit-inverted,
/// i.e. the common zlib/PNG CRC-32 parameterisation (check value for
/// `b"123456789"` is `0xCBF43926`).
fn crc32_checksum(data: &[u8]) -> u32 {
    const REFLECTED_POLY: u32 = 0xedb8_8320;
    let register = data.iter().fold(u32::MAX, |acc, &byte| {
        // Mix the next byte into the low bits, then shift it out bit by bit.
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            let shifted = bits >> 1;
            if bits & 1 == 0 {
                shifted
            } else {
                shifted ^ REFLECTED_POLY
            }
        })
    });
    !register
}
pub struct StatisticsReader {
file_path: PathBuf,
statistics: SSTableStatistics,
#[allow(dead_code)]
platform: Arc<Platform>,
}
impl StatisticsReader {
pub async fn open(path: &Path, platform: Arc<Platform>) -> Result<Self> {
if !platform.fs().exists(path).await? {
return Err(Error::not_found(format!(
"Statistics.db file not found: {}",
path.display()
)));
}
let mut file = File::open(path).await?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
let statistics = match parse_statistics_with_fallback(&buffer, None) {
Ok((_, stats)) => stats,
Err(e) => {
return Err(Error::corruption(format!(
"Failed to parse Statistics.db with enhanced parser: {:?}",
e
)));
}
};
if statistics.header.checksum != 0 {
}
Ok(Self {
file_path: path.to_path_buf(),
statistics,
platform,
})
}
pub fn statistics(&self) -> &SSTableStatistics {
&self.statistics
}
pub fn analyze(&self) -> StatisticsSummary {
StatisticsAnalyzer::analyze(&self.statistics)
}
pub fn file_path(&self) -> &Path {
&self.file_path
}
pub async fn validate_checksum(&self) -> Result<bool> {
let mut file = File::open(&self.file_path).await?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
if buffer.len() < 4 {
return Err(Error::corruption(
"Statistics file too small for checksum validation".to_string(),
));
}
let stored_checksum = self.statistics.header.checksum;
let data_section = if buffer.len() >= 32 {
&buffer[28..buffer.len() - 4] } else {
return Err(Error::corruption(
"Invalid Statistics file format for checksum validation".to_string(),
));
};
let calculated_checksum = crc32_checksum(data_section);
Ok(calculated_checksum == stored_checksum)
}
pub fn matches_table(&self, table_id: &[u8; 16]) -> bool {
if let Some(ref stats_table_id) = self.statistics.header.table_id {
stats_table_id == table_id
} else {
false }
}
pub fn row_count(&self) -> u64 {
self.statistics.row_stats.total_rows
}
pub fn live_row_count(&self) -> u64 {
self.statistics.row_stats.live_rows
}
pub fn timestamp_range(&self) -> (i64, i64) {
(
self.statistics.timestamp_stats.min_timestamp,
self.statistics.timestamp_stats.max_timestamp,
)
}
pub fn compression_info(&self) -> (&str, f64) {
(
&self.statistics.compression_stats.algorithm,
self.statistics.compression_stats.ratio,
)
}
pub fn partition_info(&self) -> (u64, f64, u64) {
(
self.statistics.partition_stats.min_partition_size,
self.statistics.partition_stats.avg_partition_size,
self.statistics.partition_stats.max_partition_size,
)
}
pub fn column_stats(
&self,
column_name: &str,
) -> Option<&crate::parser::statistics::ColumnStatistics> {
self.statistics
.column_stats
.iter()
.find(|col| col.name == column_name)
}
pub fn column_names(&self) -> Vec<&str> {
self.statistics
.column_stats
.iter()
.map(|col| col.name.as_str())
.collect()
}
pub fn has_ttl_data(&self) -> bool {
self.statistics.timestamp_stats.rows_with_ttl > 0
}
pub fn disk_usage(&self) -> (u64, u64, f64) {
(
self.statistics.table_stats.compressed_size,
self.statistics.table_stats.uncompressed_size,
self.statistics.table_stats.compression_ratio,
)
}
pub fn generate_report(&self, include_column_details: bool) -> String {
let mut report = String::new();
let summary = self.analyze();
report.push_str("# SSTable Statistics Report\n\n");
report.push_str("## Overview\n");
report.push_str(&format!("- **Total Rows**: {}\n", summary.total_rows));
report.push_str(&format!(
"- **Live Data**: {:.2}%\n",
summary.live_data_percentage
));
report.push_str(&format!(
"- **Compression Efficiency**: {:.2}%\n",
summary.compression_efficiency
));
report.push_str(&format!(
"- **Time Range**: {:.1} days\n",
summary.timestamp_range_days
));
report.push_str(&format!(
"- **Largest Partition**: {:.2} MB\n",
summary.largest_partition_mb
));
report.push_str(&format!(
"- **Health Score**: {:.1}/100\n\n",
summary.health_score
));
report.push_str("## Row Statistics\n");
report.push_str(&format!(
"- Total rows: {}\n",
self.statistics.row_stats.total_rows
));
report.push_str(&format!(
"- Live rows: {}\n",
self.statistics.row_stats.live_rows
));
report.push_str(&format!(
"- Tombstones: {}\n",
self.statistics.row_stats.tombstone_count
));
report.push_str(&format!(
"- Partitions: {}\n",
self.statistics.row_stats.partition_count
));
report.push_str(&format!(
"- Average rows per partition: {:.1}\n\n",
self.statistics.row_stats.avg_rows_per_partition
));
if self.statistics.timestamp_stats.min_timestamp != 0 {
let min_time = chrono::DateTime::from_timestamp_micros(
self.statistics.timestamp_stats.min_timestamp,
);
let max_time = chrono::DateTime::from_timestamp_micros(
self.statistics.timestamp_stats.max_timestamp,
);
report.push_str("## Timestamp Range\n");
if let (Some(min), Some(max)) = (min_time, max_time) {
report.push_str(&format!(
"- From: {}\n",
min.format("%Y-%m-%d %H:%M:%S UTC")
));
report.push_str(&format!("- To: {}\n", max.format("%Y-%m-%d %H:%M:%S UTC")));
} else {
report.push_str(&format!(
"- Min timestamp: {}\n",
self.statistics.timestamp_stats.min_timestamp
));
report.push_str(&format!(
"- Max timestamp: {}\n",
self.statistics.timestamp_stats.max_timestamp
));
}
if self.has_ttl_data() {
report.push_str(&format!(
"- Rows with TTL: {}\n",
self.statistics.timestamp_stats.rows_with_ttl
));
}
report.push('\n');
}
report.push_str("## Compression\n");
report.push_str(&format!(
"- Algorithm: {}\n",
self.statistics.compression_stats.algorithm
));
report.push_str(&format!(
"- Original size: {:.2} MB\n",
self.statistics.compression_stats.original_size as f64 / 1_048_576.0
));
report.push_str(&format!(
"- Compressed size: {:.2} MB\n",
self.statistics.compression_stats.compressed_size as f64 / 1_048_576.0
));
report.push_str(&format!(
"- Ratio: {:.2}%\n",
self.statistics.compression_stats.ratio * 100.0
));
report.push_str(&format!(
"- Speed: {:.1} MB/s (compress), {:.1} MB/s (decompress)\n\n",
self.statistics.compression_stats.compression_speed,
self.statistics.compression_stats.decompression_speed
));
report.push_str("## Partition Distribution\n");
report.push_str(&format!(
"- Average size: {:.2} KB\n",
self.statistics.partition_stats.avg_partition_size / 1024.0
));
report.push_str(&format!(
"- Range: {:.2} KB - {:.2} MB\n",
self.statistics.partition_stats.min_partition_size as f64 / 1024.0,
self.statistics.partition_stats.max_partition_size as f64 / 1_048_576.0
));
report.push_str(&format!(
"- Large partitions (>1MB): {:.1}%\n\n",
self.statistics.partition_stats.large_partition_percentage
));
if include_column_details && !self.statistics.column_stats.is_empty() {
report.push_str("## Column Statistics\n");
for column in &self.statistics.column_stats {
report.push_str(&format!("### {}\n", column.name));
report.push_str(&format!("- Type: {}\n", column.column_type));
report.push_str(&format!("- Values: {}\n", column.value_count));
report.push_str(&format!("- Nulls: {}\n", column.null_count));
report.push_str(&format!("- Average size: {:.1} bytes\n", column.avg_size));
report.push_str(&format!("- Cardinality: {}\n", column.cardinality));
if column.has_index {
report.push_str("- **Indexed**: Yes\n");
}
report.push('\n');
}
}
if !summary.query_performance_hints.is_empty() {
report.push_str("## Query Performance Hints\n");
for hint in &summary.query_performance_hints {
report.push_str(&format!("- {}\n", hint));
}
report.push('\n');
}
if !summary.storage_recommendations.is_empty() {
report.push_str("## Storage Recommendations\n");
for rec in &summary.storage_recommendations {
report.push_str(&format!("- {}\n", rec));
}
report.push('\n');
}
report
}
pub fn compact_summary(&self) -> String {
let summary = self.analyze();
format!(
"Rows: {} ({:.1}% live) | Compression: {:.1}% | Health: {:.0}/100 | Size: {:.2} MB",
summary.total_rows,
summary.live_data_percentage,
summary.compression_efficiency,
summary.health_score,
self.statistics.table_stats.disk_size as f64 / 1_048_576.0
)
}
}
/// Locates the `Statistics.db` component belonging to an SSTable data file.
///
/// The statistics file is looked up next to `data_db_path`. Its name is the
/// data file's stem with the trailing `-Data` replaced by `-Statistics`, plus
/// `.db` (e.g. `nb-1-big-Data.db` → `nb-1-big-Statistics.db`).
///
/// Returns `None` in any of these cases:
/// - the path has no parent or no UTF-8 file stem;
/// - the stem does not end in `-Data`;
/// - the derived statistics file cannot be stat'ed.
pub async fn find_statistics_file(data_db_path: &Path) -> Option<PathBuf> {
    let parent = data_db_path.parent()?;
    let stem = data_db_path.file_stem()?.to_str()?;
    // Only the `-Data` suffix is rewritten. A blanket `replace` would also alter
    // `-Data` elsewhere in the name. It would also map a non-Data file (e.g.
    // `*-Index.db`) back onto itself, which exists and would be returned.
    let prefix = stem.strip_suffix("-Data")?;
    let stats_path = parent.join(format!("{}-Statistics.db", prefix));
    if tokio::fs::metadata(&stats_path).await.is_ok() {
        Some(stats_path)
    } else {
        None
    }
}
/// Lists the files directly inside `sstable_dir` whose names end in
/// `-Statistics.db`.
///
/// Entries whose names are not valid UTF-8 are skipped. The result is sorted,
/// so callers get a stable order regardless of the platform's directory
/// iteration order.
///
/// # Errors
/// Propagates I/O errors from opening or iterating the directory.
pub async fn check_statistics_availability(sstable_dir: &Path) -> Result<Vec<PathBuf>> {
    let mut stats_files = Vec::new();
    let mut dir_entries = tokio::fs::read_dir(sstable_dir).await?;
    while let Some(entry) = dir_entries.next_entry().await? {
        let path = entry.path();
        // `ends_with` rather than `contains`: names such as
        // `x-Statistics.db.tmp` or `x-Statistics.db.bak` are not components.
        let is_statistics = matches!(
            path.file_name().and_then(|name| name.to_str()),
            Some(name) if name.ends_with("-Statistics.db")
        );
        if is_statistics {
            stats_files.push(path);
        }
    }
    stats_files.sort();
    Ok(stats_files)
}
#[cfg(test)]
mod tests {
    use super::{check_statistics_availability, crc32_checksum, find_statistics_file};
    use std::path::PathBuf;

    /// Creates a fresh, empty scratch directory unique to this process and test.
    async fn scratch_dir(test_name: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "statistics_reader_{}_{}",
            std::process::id(),
            test_name
        ));
        // A leftover directory from an earlier aborted run may or may not exist.
        let _ = tokio::fs::remove_dir_all(&dir).await;
        tokio::fs::create_dir_all(&dir)
            .await
            .expect("failed to create scratch directory");
        dir
    }

    #[test]
    fn test_crc32_checksum_known_vectors() {
        assert_eq!(crc32_checksum(b""), 0);
        assert_eq!(crc32_checksum(b"123456789"), 0xCBF4_3926);
        assert_eq!(crc32_checksum(b"abc"), 0x3524_41C2);
    }

    #[tokio::test]
    async fn test_find_statistics_file() {
        let dir = scratch_dir("find").await;
        let data_path = dir.join("nb-1-big-Data.db");
        tokio::fs::write(&data_path, b"").await.unwrap();

        // Without a Statistics component on disk nothing is found.
        assert_eq!(find_statistics_file(&data_path).await, None);

        let stats_path = dir.join("nb-1-big-Statistics.db");
        tokio::fs::write(&stats_path, b"").await.unwrap();
        assert_eq!(find_statistics_file(&data_path).await, Some(stats_path));

        tokio::fs::remove_dir_all(&dir).await.unwrap();
    }

    #[tokio::test]
    async fn test_check_statistics_availability() {
        let dir = scratch_dir("availability").await;
        for name in [
            "nb-1-big-Data.db",
            "nb-1-big-Statistics.db",
            "nb-2-big-Statistics.db",
        ]
        .iter()
        {
            tokio::fs::write(dir.join(name), b"").await.unwrap();
        }

        let mut found = match check_statistics_availability(&dir).await {
            Ok(files) => files,
            Err(_) => panic!("listing the scratch directory failed"),
        };
        found.sort();
        assert_eq!(
            found,
            vec![
                dir.join("nb-1-big-Statistics.db"),
                dir.join("nb-2-big-Statistics.db"),
            ]
        );

        tokio::fs::remove_dir_all(&dir).await.unwrap();
    }
}