cqlite_core/storage/sstable/
statistics_reader.rs1use crate::{
7 error::{Error, Result},
8 parser::enhanced_statistics_parser::parse_statistics_with_fallback,
9 parser::statistics::{SSTableStatistics, StatisticsAnalyzer, StatisticsSummary},
10 platform::Platform,
11};
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use tokio::fs::File;
15use tokio::io::AsyncReadExt;
16
/// Computes a CRC-32 checksum of `data` one bit at a time.
///
/// The register starts at all ones, each input byte is XORed into the low
/// bits and shifted out through the reflected polynomial `0xEDB8_8320`, and
/// the final register value is bit-inverted. Empty input yields `0`.
fn crc32_checksum(data: &[u8]) -> u32 {
    const REFLECTED_POLY: u32 = 0xedb8_8320;

    let register = data.iter().fold(u32::MAX, |state, &byte| {
        // Mix the byte in, then run eight shift/conditional-XOR rounds.
        (0..8).fold(state ^ u32::from(byte), |reg, _| {
            let shifted = reg >> 1;
            if reg & 1 == 0 {
                shifted
            } else {
                shifted ^ REFLECTED_POLY
            }
        })
    });

    !register
}
34
35pub struct StatisticsReader {
37 file_path: PathBuf,
39 statistics: SSTableStatistics,
41 #[allow(dead_code)]
43 platform: Arc<Platform>,
44}
45
46impl StatisticsReader {
47 pub async fn open(path: &Path, platform: Arc<Platform>) -> Result<Self> {
49 if !platform.fs().exists(path).await? {
50 return Err(Error::not_found(format!(
51 "Statistics.db file not found: {}",
52 path.display()
53 )));
54 }
55
56 let mut file = File::open(path).await?;
58 let mut buffer = Vec::new();
59 file.read_to_end(&mut buffer).await?;
60
61 let statistics = match parse_statistics_with_fallback(&buffer, None) {
66 Ok((_, stats)) => stats,
67 Err(e) => {
68 return Err(Error::corruption(format!(
69 "Failed to parse Statistics.db with enhanced parser: {:?}",
70 e
71 )));
72 }
73 };
74
75 if statistics.header.checksum != 0 {
89 }
91
92 Ok(Self {
93 file_path: path.to_path_buf(),
94 statistics,
95 platform,
96 })
97 }
98
99 pub fn statistics(&self) -> &SSTableStatistics {
101 &self.statistics
102 }
103
104 pub fn analyze(&self) -> StatisticsSummary {
106 StatisticsAnalyzer::analyze(&self.statistics)
107 }
108
109 pub fn file_path(&self) -> &Path {
111 &self.file_path
112 }
113
114 pub async fn validate_checksum(&self) -> Result<bool> {
116 let mut file = File::open(&self.file_path).await?;
118 let mut buffer = Vec::new();
119 file.read_to_end(&mut buffer).await?;
120
121 if buffer.len() < 4 {
122 return Err(Error::corruption(
123 "Statistics file too small for checksum validation".to_string(),
124 ));
125 }
126
127 let stored_checksum = self.statistics.header.checksum;
129
130 let data_section = if buffer.len() >= 32 {
132 &buffer[28..buffer.len() - 4] } else {
134 return Err(Error::corruption(
135 "Invalid Statistics file format for checksum validation".to_string(),
136 ));
137 };
138
139 let calculated_checksum = crc32_checksum(data_section);
140
141 Ok(calculated_checksum == stored_checksum)
142 }
143
144 pub fn matches_table(&self, table_id: &[u8; 16]) -> bool {
146 if let Some(ref stats_table_id) = self.statistics.header.table_id {
147 stats_table_id == table_id
148 } else {
149 false }
151 }
152
153 pub fn row_count(&self) -> u64 {
155 self.statistics.row_stats.total_rows
156 }
157
158 pub fn live_row_count(&self) -> u64 {
160 self.statistics.row_stats.live_rows
161 }
162
163 pub fn timestamp_range(&self) -> (i64, i64) {
165 (
166 self.statistics.timestamp_stats.min_timestamp,
167 self.statistics.timestamp_stats.max_timestamp,
168 )
169 }
170
171 pub fn compression_info(&self) -> (&str, f64) {
173 (
174 &self.statistics.compression_stats.algorithm,
175 self.statistics.compression_stats.ratio,
176 )
177 }
178
179 pub fn partition_info(&self) -> (u64, f64, u64) {
181 (
182 self.statistics.partition_stats.min_partition_size,
183 self.statistics.partition_stats.avg_partition_size,
184 self.statistics.partition_stats.max_partition_size,
185 )
186 }
187
188 pub fn column_stats(
190 &self,
191 column_name: &str,
192 ) -> Option<&crate::parser::statistics::ColumnStatistics> {
193 self.statistics
194 .column_stats
195 .iter()
196 .find(|col| col.name == column_name)
197 }
198
199 pub fn column_names(&self) -> Vec<&str> {
201 self.statistics
202 .column_stats
203 .iter()
204 .map(|col| col.name.as_str())
205 .collect()
206 }
207
208 pub fn has_ttl_data(&self) -> bool {
210 self.statistics.timestamp_stats.rows_with_ttl > 0
211 }
212
213 pub fn disk_usage(&self) -> (u64, u64, f64) {
215 (
216 self.statistics.table_stats.compressed_size,
217 self.statistics.table_stats.uncompressed_size,
218 self.statistics.table_stats.compression_ratio,
219 )
220 }
221
222 pub fn generate_report(&self, include_column_details: bool) -> String {
224 let mut report = String::new();
225 let summary = self.analyze();
226
227 report.push_str("# SSTable Statistics Report\n\n");
228
229 report.push_str("## Overview\n");
231 report.push_str(&format!("- **Total Rows**: {}\n", summary.total_rows));
232 report.push_str(&format!(
233 "- **Live Data**: {:.2}%\n",
234 summary.live_data_percentage
235 ));
236 report.push_str(&format!(
237 "- **Compression Efficiency**: {:.2}%\n",
238 summary.compression_efficiency
239 ));
240 report.push_str(&format!(
241 "- **Time Range**: {:.1} days\n",
242 summary.timestamp_range_days
243 ));
244 report.push_str(&format!(
245 "- **Largest Partition**: {:.2} MB\n",
246 summary.largest_partition_mb
247 ));
248 report.push_str(&format!(
249 "- **Health Score**: {:.1}/100\n\n",
250 summary.health_score
251 ));
252
253 report.push_str("## Row Statistics\n");
255 report.push_str(&format!(
256 "- Total rows: {}\n",
257 self.statistics.row_stats.total_rows
258 ));
259 report.push_str(&format!(
260 "- Live rows: {}\n",
261 self.statistics.row_stats.live_rows
262 ));
263 report.push_str(&format!(
264 "- Tombstones: {}\n",
265 self.statistics.row_stats.tombstone_count
266 ));
267 report.push_str(&format!(
268 "- Partitions: {}\n",
269 self.statistics.row_stats.partition_count
270 ));
271 report.push_str(&format!(
272 "- Average rows per partition: {:.1}\n\n",
273 self.statistics.row_stats.avg_rows_per_partition
274 ));
275
276 if self.statistics.timestamp_stats.min_timestamp != 0 {
278 let min_time = chrono::DateTime::from_timestamp_micros(
279 self.statistics.timestamp_stats.min_timestamp,
280 );
281 let max_time = chrono::DateTime::from_timestamp_micros(
282 self.statistics.timestamp_stats.max_timestamp,
283 );
284
285 report.push_str("## Timestamp Range\n");
286 if let (Some(min), Some(max)) = (min_time, max_time) {
287 report.push_str(&format!(
288 "- From: {}\n",
289 min.format("%Y-%m-%d %H:%M:%S UTC")
290 ));
291 report.push_str(&format!("- To: {}\n", max.format("%Y-%m-%d %H:%M:%S UTC")));
292 } else {
293 report.push_str(&format!(
294 "- Min timestamp: {}\n",
295 self.statistics.timestamp_stats.min_timestamp
296 ));
297 report.push_str(&format!(
298 "- Max timestamp: {}\n",
299 self.statistics.timestamp_stats.max_timestamp
300 ));
301 }
302
303 if self.has_ttl_data() {
304 report.push_str(&format!(
305 "- Rows with TTL: {}\n",
306 self.statistics.timestamp_stats.rows_with_ttl
307 ));
308 }
309 report.push('\n');
310 }
311
312 report.push_str("## Compression\n");
314 report.push_str(&format!(
315 "- Algorithm: {}\n",
316 self.statistics.compression_stats.algorithm
317 ));
318 report.push_str(&format!(
319 "- Original size: {:.2} MB\n",
320 self.statistics.compression_stats.original_size as f64 / 1_048_576.0
321 ));
322 report.push_str(&format!(
323 "- Compressed size: {:.2} MB\n",
324 self.statistics.compression_stats.compressed_size as f64 / 1_048_576.0
325 ));
326 report.push_str(&format!(
327 "- Ratio: {:.2}%\n",
328 self.statistics.compression_stats.ratio * 100.0
329 ));
330 report.push_str(&format!(
331 "- Speed: {:.1} MB/s (compress), {:.1} MB/s (decompress)\n\n",
332 self.statistics.compression_stats.compression_speed,
333 self.statistics.compression_stats.decompression_speed
334 ));
335
336 report.push_str("## Partition Distribution\n");
338 report.push_str(&format!(
339 "- Average size: {:.2} KB\n",
340 self.statistics.partition_stats.avg_partition_size / 1024.0
341 ));
342 report.push_str(&format!(
343 "- Range: {:.2} KB - {:.2} MB\n",
344 self.statistics.partition_stats.min_partition_size as f64 / 1024.0,
345 self.statistics.partition_stats.max_partition_size as f64 / 1_048_576.0
346 ));
347 report.push_str(&format!(
348 "- Large partitions (>1MB): {:.1}%\n\n",
349 self.statistics.partition_stats.large_partition_percentage
350 ));
351
352 if include_column_details && !self.statistics.column_stats.is_empty() {
354 report.push_str("## Column Statistics\n");
355 for column in &self.statistics.column_stats {
356 report.push_str(&format!("### {}\n", column.name));
357 report.push_str(&format!("- Type: {}\n", column.column_type));
358 report.push_str(&format!("- Values: {}\n", column.value_count));
359 report.push_str(&format!("- Nulls: {}\n", column.null_count));
360 report.push_str(&format!("- Average size: {:.1} bytes\n", column.avg_size));
361 report.push_str(&format!("- Cardinality: {}\n", column.cardinality));
362 if column.has_index {
363 report.push_str("- **Indexed**: Yes\n");
364 }
365 report.push('\n');
366 }
367 }
368
369 if !summary.query_performance_hints.is_empty() {
371 report.push_str("## Query Performance Hints\n");
372 for hint in &summary.query_performance_hints {
373 report.push_str(&format!("- {}\n", hint));
374 }
375 report.push('\n');
376 }
377
378 if !summary.storage_recommendations.is_empty() {
380 report.push_str("## Storage Recommendations\n");
381 for rec in &summary.storage_recommendations {
382 report.push_str(&format!("- {}\n", rec));
383 }
384 report.push('\n');
385 }
386
387 report
388 }
389
390 pub fn compact_summary(&self) -> String {
392 let summary = self.analyze();
393 format!(
394 "Rows: {} ({:.1}% live) | Compression: {:.1}% | Health: {:.0}/100 | Size: {:.2} MB",
395 summary.total_rows,
396 summary.live_data_percentage,
397 summary.compression_efficiency,
398 summary.health_score,
399 self.statistics.table_stats.disk_size as f64 / 1_048_576.0
400 )
401 }
402}
403
404pub async fn find_statistics_file(data_db_path: &Path) -> Option<PathBuf> {
406 if let Some(parent) = data_db_path.parent() {
407 if let Some(stem) = data_db_path.file_stem() {
408 if let Some(stem_str) = stem.to_str() {
409 let stats_name = stem_str.replace("-Data", "-Statistics") + ".db";
411 let stats_path = parent.join(stats_name);
412
413 if tokio::fs::metadata(&stats_path).await.is_ok() {
414 return Some(stats_path);
415 }
416 }
417 }
418 }
419 None
420}
421
422pub async fn check_statistics_availability(sstable_dir: &Path) -> Result<Vec<PathBuf>> {
424 let mut stats_files = Vec::new();
425
426 let mut dir_entries = tokio::fs::read_dir(sstable_dir).await?;
427 while let Some(entry) = dir_entries.next_entry().await? {
428 let path = entry.path();
429 if let Some(file_name) = path.file_name() {
430 if let Some(name_str) = file_name.to_str() {
431 if name_str.contains("-Statistics.db") {
432 stats_files.push(path);
433 }
434 }
435 }
436 }
437
438 Ok(stats_files)
439}
440
#[cfg(test)]
mod tests {
    use super::{crc32_checksum, find_statistics_file};
    use std::path::PathBuf;

    /// The checksum helper reproduces well-known CRC-32 check values.
    #[test]
    fn test_crc32_checksum_known_values() {
        assert_eq!(crc32_checksum(b""), 0);
        assert_eq!(crc32_checksum(b"123456789"), 0xcbf4_3926);
    }

    /// A Data.db file with a Statistics.db sibling resolves to that sibling.
    #[tokio::test]
    async fn test_find_statistics_file() {
        // Per-process directory name so parallel test runs do not collide.
        let dir = std::env::temp_dir().join(format!(
            "cqlite-statistics-reader-test-{}",
            std::process::id()
        ));
        std::fs::create_dir_all(&dir).expect("temp directory should be creatable");

        let data_path = dir.join("users-123abc-Data.db");
        let stats_path = dir.join("users-123abc-Statistics.db");
        std::fs::write(&data_path, b"").expect("data file should be writable");
        std::fs::write(&stats_path, b"").expect("statistics file should be writable");

        let found = find_statistics_file(&data_path).await;

        // Clean up before asserting so a failure does not leak the directory.
        let _ = std::fs::remove_dir_all(&dir);
        assert_eq!(found, Some(stats_path));
    }

    /// A Data.db path without a Statistics.db sibling resolves to nothing.
    #[tokio::test]
    async fn test_find_statistics_file_missing() {
        let data_path = PathBuf::from("/path/to/sstables/users-123abc-Data.db");
        assert_eq!(find_statistics_file(&data_path).await, None);
    }
}