Skip to main content

cqlite_core/parser/
statistics.rs

//! Statistics.db parser for Cassandra SSTables (legacy header versions 1-3 and the Cassandra 5+ `nb` format)
//!
//! This module provides comprehensive parsing of Statistics.db files, which contain
//! detailed metadata about SSTable contents, including row counts, min/max timestamps,
//! column statistics, and other metadata for efficient query planning.
6
7use super::vint::{parse_vint, parse_vint_length};
8use crate::error::{Error, Result};
9use nom::{
10    bytes::complete::take,
11    multi::count,
12    number::complete::{be_f64, be_i64, be_u32, be_u64, be_u8},
13    IResult,
14};
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17
/// Statistics.db file header with version and metadata
/// Updated to support both legacy and enhanced formats
///
/// One struct is shared by both on-disk layouts. When the header is produced by
/// `parse_legacy_format_header` (versions 1-3) several fields are repurposed:
/// `data_length` carries the section count, `metadata1` / `metadata2` carry the
/// high / low 32 bits of the 64-bit file size, and `statistics_kind` and
/// `metadata3` are set to 0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatisticsHeader {
    /// Format version/type identifier (4 selects the nb layout, 1-3 the legacy layout)
    pub version: u32,
    /// Statistics type/kind identifier for the enhanced (nb) format; always 0 for
    /// headers parsed from the legacy layout
    pub statistics_kind: u32,
    /// Data length or offset (nb layout); the section count for the legacy layout
    pub data_length: u32,
    /// Additional metadata field (legacy layout: upper 32 bits of the file size)
    pub metadata1: u32,
    /// Additional metadata field (legacy layout: lower 32 bits of the file size)
    pub metadata2: u32,
    /// Additional metadata field (legacy layout: always 0)
    pub metadata3: u32,
    /// CRC32 checksum of the statistics data, stored as read from the header
    pub checksum: u32,
    /// Table UUID for validation: `Some` for the legacy layout, `None` for the
    /// nb layout, whose header carries no table id
    pub table_id: Option<[u8; 16]>,
}
39
/// Comprehensive SSTable statistics extracted from Statistics.db
///
/// Aggregates the header plus every statistics section. The section fields are
/// listed in the same order in which `parse_statistics_file` reads them from
/// the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SSTableStatistics {
    /// Header information
    pub header: StatisticsHeader,
    /// Row count statistics
    pub row_stats: RowStatistics,
    /// Timestamp range information
    pub timestamp_stats: TimestampStatistics,
    /// Column-level statistics, one entry per column record in the file
    pub column_stats: Vec<ColumnStatistics>,
    /// Table-level aggregated statistics
    pub table_stats: TableStatistics,
    /// Partition size distribution
    pub partition_stats: PartitionStatistics,
    /// Compression statistics
    pub compression_stats: CompressionStatistics,
    /// Additional metadata as key/value string pairs
    pub metadata: HashMap<String, String>,
    /// SerializationHeader columns (Issue #163)
    ///
    /// Column definitions parsed from SerializationHeader embedded in nb-format
    /// Statistics.db files. Used for schema extraction in V5CompressedLegacy format.
    /// Empty if SerializationHeader not found in Statistics.db.
    /// `parse_statistics_file` always leaves this empty; `#[serde(default)]`
    /// lets payloads serialized without the field still deserialize.
    #[serde(default)]
    pub serialization_header_columns: Vec<super::header::ColumnInfo>,
    /// Partition key definitions extracted from SerializationHeader (Issue #195)
    /// (left empty by `parse_statistics_file`)
    #[serde(default)]
    pub serialization_header_partition_keys: Vec<super::header::ColumnInfo>,
    /// Clustering key definitions extracted from SerializationHeader (Issue #195)
    /// (left empty by `parse_statistics_file`)
    #[serde(default)]
    pub serialization_header_clustering_keys: Vec<super::header::ColumnInfo>,
}
73
/// Row count and distribution statistics
///
/// Populated by `parse_row_statistics`. The four counters are decoded from
/// VInts; the average is stored in the file as a big-endian `f64` and is taken
/// as-is rather than recomputed from the counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RowStatistics {
    /// Total number of rows in the SSTable
    pub total_rows: u64,
    /// Number of live (non-tombstone) rows
    pub live_rows: u64,
    /// Number of tombstone markers
    pub tombstone_count: u64,
    /// Estimated number of partitions
    pub partition_count: u64,
    /// Average rows per partition (value read from the file)
    pub avg_rows_per_partition: f64,
    /// Row size distribution histogram; its length comes from a `u32` bucket
    /// count stored ahead of the buckets
    pub row_size_histogram: Vec<RowSizeBucket>,
}
90
/// Timestamp range and TTL statistics
///
/// Populated by `parse_timestamp_statistics`. The TTL fields are optional on
/// disk: a single flag byte decides whether they are present.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimestampStatistics {
    /// Minimum timestamp in the SSTable (microseconds since epoch)
    pub min_timestamp: i64,
    /// Maximum timestamp in the SSTable (microseconds since epoch)
    pub max_timestamp: i64,
    /// Minimum deletion time (for tombstones)
    pub min_deletion_time: i64,
    /// Maximum deletion time (for tombstones)
    pub max_deletion_time: i64,
    /// Minimum TTL value; `None` when the file's TTL flag byte is zero
    pub min_ttl: Option<i64>,
    /// Maximum TTL value; `None` when the file's TTL flag byte is zero
    pub max_ttl: Option<i64>,
    /// Number of rows with TTL; set to 0 when the TTL flag byte is zero
    pub rows_with_ttl: u64,
}
109
/// Per-column statistics for query optimization
///
/// Populated by `parse_single_column_statistics`, one instance per column
/// record in the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnStatistics {
    /// Column name (decoded lossily from UTF-8: invalid sequences are replaced,
    /// not rejected)
    pub name: String,
    /// Column type (CQL type), decoded lossily from UTF-8 like `name`
    pub column_type: String,
    /// Number of non-null values
    pub value_count: u64,
    /// Number of null values
    pub null_count: u64,
    /// Minimum value (serialized as bytes); `None` when the record's min/max
    /// flag byte is zero
    pub min_value: Option<Vec<u8>>,
    /// Maximum value (serialized as bytes); present exactly when `min_value` is,
    /// because a single flag byte controls both
    pub max_value: Option<Vec<u8>>,
    /// Average serialized size in bytes
    pub avg_size: f64,
    /// Estimated cardinality (distinct values)
    pub cardinality: u64,
    /// Value frequency histogram for common values
    pub value_histogram: Vec<ValueFrequency>,
    /// Whether this column has an index (any non-zero flag byte means `true`)
    pub has_index: bool,
}
134
/// Table-level aggregated statistics
///
/// Populated by `parse_table_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableStatistics {
    /// Total disk space used by the SSTable
    pub disk_size: u64,
    /// Uncompressed size
    pub uncompressed_size: u64,
    /// Compressed size. Not read from the file: `parse_table_statistics`
    /// currently fills it with a copy of `disk_size`.
    pub compressed_size: u64,
    /// Compression ratio (value read from the file, not derived from the sizes)
    pub compression_ratio: f64,
    /// Number of blocks in the SSTable
    pub block_count: u64,
    /// Average block size
    pub avg_block_size: f64,
    /// Index size in bytes
    pub index_size: u64,
    /// Bloom filter size in bytes
    pub bloom_filter_size: u64,
    /// Number of levels in LSM tree
    pub level_count: u32,
}
157
/// Partition size distribution for efficient range queries
///
/// Populated by `parse_partition_statistics`. On disk the scalar fields come
/// first, followed by a `u32` bucket count and the histogram buckets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionStatistics {
    /// Average partition size in bytes
    pub avg_partition_size: f64,
    /// Minimum partition size
    pub min_partition_size: u64,
    /// Maximum partition size
    pub max_partition_size: u64,
    /// Partition size distribution
    pub size_histogram: Vec<PartitionSizeBucket>,
    /// Percentage of large partitions (>1MB). `StatisticsAnalyzer` treats this
    /// as a value on a 0-100 scale (it divides by 100 to obtain a fraction).
    pub large_partition_percentage: f64,
}
172
/// Compression algorithm performance statistics
///
/// Populated by `parse_compression_statistics`; every value, including the
/// ratio and the two speeds, is taken directly from the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionStatistics {
    /// Compression algorithm used (decoded lossily from UTF-8)
    pub algorithm: String,
    /// Original size before compression
    pub original_size: u64,
    /// Compressed size
    pub compressed_size: u64,
    /// Compression ratio (compressed/original)
    pub ratio: f64,
    /// Compression speed in MB/s
    pub compression_speed: f64,
    /// Decompression speed in MB/s
    pub decompression_speed: f64,
    /// Number of compressed blocks
    pub compressed_blocks: u64,
}
191
/// Row size distribution bucket
///
/// Populated by `parse_row_size_bucket`: three VInts followed by a big-endian `f64`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RowSizeBucket {
    /// Size range start (inclusive)
    pub size_start: u64,
    /// Size range end (exclusive)
    pub size_end: u64,
    /// Number of rows in this bucket
    pub count: u64,
    /// Percentage of total rows (value stored in the file, not recomputed)
    pub percentage: f64,
}
204
/// Value frequency information for column statistics
///
/// Populated by `parse_value_frequency`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValueFrequency {
    /// Serialized value (truncated for large values); an owned copy of the
    /// length-prefixed bytes found in the file
    pub value: Vec<u8>,
    /// Number of occurrences
    pub frequency: u64,
    /// Percentage of total non-null values (value stored in the file)
    pub percentage: f64,
}
215
/// Partition size distribution bucket
///
/// Populated by `parse_partition_size_bucket`: three VInts followed by a
/// big-endian `f64`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionSizeBucket {
    /// Size range start (inclusive)
    pub size_start: u64,
    /// Size range end (exclusive)
    pub size_end: u64,
    /// Number of partitions in this bucket
    pub count: u64,
    /// Cumulative percentage (value stored in the file, not recomputed)
    pub cumulative_percentage: f64,
}
228
229/// Parse the complete Statistics.db file
230pub fn parse_statistics_file(input: &[u8]) -> IResult<&[u8], SSTableStatistics> {
231    let (input, header) = parse_statistics_header(input)?;
232    let (input, row_stats) = parse_row_statistics(input)?;
233    let (input, timestamp_stats) = parse_timestamp_statistics(input)?;
234    let (input, column_stats) = parse_column_statistics(input, header.data_length)?;
235    let (input, table_stats) = parse_table_statistics(input)?;
236    let (input, partition_stats) = parse_partition_statistics(input)?;
237    let (input, compression_stats) = parse_compression_statistics(input)?;
238    let (input, metadata) = parse_metadata_section(input)?;
239
240    Ok((
241        input,
242        SSTableStatistics {
243            header,
244            row_stats,
245            timestamp_stats,
246            column_stats,
247            table_stats,
248            partition_stats,
249            compression_stats,
250            metadata,
251            serialization_header_columns: vec![], // Not available in legacy format
252            serialization_header_partition_keys: vec![],
253            serialization_header_clustering_keys: vec![],
254        },
255    ))
256}
257
258/// Parse the Statistics.db file header with authoritative format detection
259///
260/// Statistics.db format is definitively identified by the version field:
261/// - **Version 4**: 'nb' (new big) format - Cassandra 5.0+ enhanced statistics
262///     - Structure: version(4) + statistics_kind(4) + reserved(4) + data_length(4) +
263///       metadata1(4) + metadata2(4) + metadata3(4) + checksum(4) = 32 bytes
264///     - Authoritative marker: version == 4
265///     - Used by: Cassandra 5.0+ with 'nb' SSTable format
266///
267/// - **Versions 1-3**: Legacy format - pre-Cassandra 5.0 statistics
268///     - Structure: version(4) + table_id(16) + section_count(4) + file_size(8) + checksum(4) = 36 bytes
269///     - Authoritative marker: version in range 1..=3
270///     - Used by: Cassandra 3.x and 4.x
271///
272/// Any other version number is unsupported and results in a parse error.
273pub fn parse_statistics_header(input: &[u8]) -> IResult<&[u8], StatisticsHeader> {
274    let (remaining, version) = be_u32(input)?;
275
276    match version {
277        // nb-format: Cassandra 5.0+ enhanced statistics (version 4)
278        // This is the authoritative format identifier - no heuristics needed
279        4 => parse_nb_format_header(remaining, version),
280
281        // Legacy format: Cassandra 3.x/4.x statistics (versions 1-3)
282        // Definitively identified by version range
283        1..=3 => parse_legacy_format_header(remaining, version),
284
285        // Unknown/unsupported version - fail explicitly
286        // This ensures we never silently misparse corrupt or future formats
287        _ => Err(nom::Err::Error(nom::error::Error::new(
288            input,
289            nom::error::ErrorKind::Verify,
290        ))),
291    }
292}
293
294/// Parse nb-format (version 4) Statistics.db header
295///
296/// Format structure (Cassandra 5.0+):
297/// ```text
298/// [0..4]   version: u32          = 4 (nb-format identifier)
299/// [4..8]   statistics_kind: u32  (statistics type/kind identifier)
300/// [8..12]  reserved: u32         (reserved field, typically 0)
301/// [12..16] data_length: u32      (length of statistics data section)
302/// [16..20] metadata1: u32        (metadata field 1)
303/// [20..24] metadata2: u32        (metadata field 2)
304/// [24..28] metadata3: u32        (metadata field 3)
305/// [28..32] checksum: u32         (CRC32 checksum)
306/// ```
307fn parse_nb_format_header(input: &[u8], version: u32) -> IResult<&[u8], StatisticsHeader> {
308    let (input, statistics_kind) = be_u32(input)?;
309    let (input, _reserved) = be_u32(input)?;
310    let (input, data_length) = be_u32(input)?;
311    let (input, metadata1) = be_u32(input)?;
312    let (input, metadata2) = be_u32(input)?;
313    let (input, metadata3) = be_u32(input)?;
314    let (input, checksum) = be_u32(input)?;
315
316    Ok((
317        input,
318        StatisticsHeader {
319            version,
320            statistics_kind,
321            data_length,
322            metadata1,
323            metadata2,
324            metadata3,
325            checksum,
326            table_id: None, // nb-format does not include table_id in header
327        },
328    ))
329}
330
331/// Parse legacy format (versions 1-3) Statistics.db header
332///
333/// Format structure (Cassandra 3.x/4.x):
334/// ```text
335/// [0..4]   version: u32          = 1, 2, or 3 (legacy format identifier)
336/// [4..20]  table_id: [u8; 16]    (UUID of the table)
337/// [20..24] section_count: u32    (number of statistics sections)
338/// [24..32] file_size: u64        (total file size)
339/// [32..36] checksum: u32         (CRC32 checksum)
340/// ```
341fn parse_legacy_format_header(input: &[u8], version: u32) -> IResult<&[u8], StatisticsHeader> {
342    let (input, table_id_raw) = take(16u8)(input)?;
343    let mut table_id_array = [0u8; 16];
344    table_id_array.copy_from_slice(table_id_raw);
345
346    let (input, section_count) = be_u32(input)?;
347    let (input, file_size) = be_u64(input)?;
348    let (input, checksum) = be_u32(input)?;
349
350    Ok((
351        input,
352        StatisticsHeader {
353            version,
354            statistics_kind: 0, // Not used in legacy format
355            data_length: section_count,
356            metadata1: (file_size >> 32) as u32,
357            metadata2: file_size as u32,
358            metadata3: 0,
359            checksum,
360            table_id: Some(table_id_array),
361        },
362    ))
363}
364
365/// Parse row count and distribution statistics
366pub fn parse_row_statistics(input: &[u8]) -> IResult<&[u8], RowStatistics> {
367    let (input, total_rows) = parse_vint_as_u64(input)?;
368    let (input, live_rows) = parse_vint_as_u64(input)?;
369    let (input, tombstone_count) = parse_vint_as_u64(input)?;
370    let (input, partition_count) = parse_vint_as_u64(input)?;
371    let (input, avg_rows_per_partition) = be_f64(input)?;
372    let (input, histogram_count) = be_u32(input)?;
373    let (input, row_size_histogram) =
374        count(parse_row_size_bucket, histogram_count as usize)(input)?;
375
376    Ok((
377        input,
378        RowStatistics {
379            total_rows,
380            live_rows,
381            tombstone_count,
382            partition_count,
383            avg_rows_per_partition,
384            row_size_histogram,
385        },
386    ))
387}
388
389/// Parse timestamp range statistics
390pub fn parse_timestamp_statistics(input: &[u8]) -> IResult<&[u8], TimestampStatistics> {
391    let (input, min_timestamp) = be_i64(input)?;
392    let (input, max_timestamp) = be_i64(input)?;
393    let (input, min_deletion_time) = be_i64(input)?;
394    let (input, max_deletion_time) = be_i64(input)?;
395    let (input, has_ttl) = be_u8(input)?;
396    let (input, min_ttl, max_ttl, rows_with_ttl) = if has_ttl != 0 {
397        let (input, min_ttl) = be_i64(input)?;
398        let (input, max_ttl) = be_i64(input)?;
399        let (input, rows_with_ttl) = parse_vint_as_u64(input)?;
400        (input, Some(min_ttl), Some(max_ttl), rows_with_ttl)
401    } else {
402        (input, None, None, 0)
403    };
404
405    Ok((
406        input,
407        TimestampStatistics {
408            min_timestamp,
409            max_timestamp,
410            min_deletion_time,
411            max_deletion_time,
412            min_ttl,
413            max_ttl,
414            rows_with_ttl,
415        },
416    ))
417}
418
419/// Parse column-level statistics
420pub fn parse_column_statistics(
421    input: &[u8],
422    column_count: u32,
423) -> IResult<&[u8], Vec<ColumnStatistics>> {
424    count(parse_single_column_statistics, column_count as usize)(input)
425}
426
427/// Parse statistics for a single column
428pub fn parse_single_column_statistics(input: &[u8]) -> IResult<&[u8], ColumnStatistics> {
429    let (input, name_len) = parse_vint_length(input)?;
430    let (input, name_bytes) = take(name_len)(input)?;
431    let name = String::from_utf8_lossy(name_bytes).to_string();
432
433    let (input, type_len) = parse_vint_length(input)?;
434    let (input, type_bytes) = take(type_len)(input)?;
435    let column_type = String::from_utf8_lossy(type_bytes).to_string();
436
437    let (input, value_count) = parse_vint_as_u64(input)?;
438    let (input, null_count) = parse_vint_as_u64(input)?;
439
440    let (input, has_min_max) = be_u8(input)?;
441    let (input, min_value, max_value) = if has_min_max != 0 {
442        let (input, min_len) = parse_vint_length(input)?;
443        let (input, min_bytes) = take(min_len)(input)?;
444        let (input, max_len) = parse_vint_length(input)?;
445        let (input, max_bytes) = take(max_len)(input)?;
446        (input, Some(min_bytes.to_vec()), Some(max_bytes.to_vec()))
447    } else {
448        (input, None, None)
449    };
450
451    let (input, avg_size) = be_f64(input)?;
452    let (input, cardinality) = parse_vint_as_u64(input)?;
453
454    let (input, histogram_count) = be_u32(input)?;
455    let (input, value_histogram) = count(parse_value_frequency, histogram_count as usize)(input)?;
456
457    let (input, has_index) = be_u8(input)?;
458
459    Ok((
460        input,
461        ColumnStatistics {
462            name,
463            column_type,
464            value_count,
465            null_count,
466            min_value,
467            max_value,
468            avg_size,
469            cardinality,
470            value_histogram,
471            has_index: has_index != 0,
472        },
473    ))
474}
475
476/// Parse table-level statistics
477pub fn parse_table_statistics(input: &[u8]) -> IResult<&[u8], TableStatistics> {
478    let (input, disk_size) = be_u64(input)?;
479    let (input, uncompressed_size) = be_u64(input)?;
480    let (input, compression_ratio) = be_f64(input)?;
481    let (input, block_count) = parse_vint_as_u64(input)?;
482    let (input, avg_block_size) = be_f64(input)?;
483    let (input, index_size) = be_u64(input)?;
484    let (input, bloom_filter_size) = be_u64(input)?;
485    let (input, level_count) = be_u32(input)?;
486
487    Ok((
488        input,
489        TableStatistics {
490            disk_size,
491            uncompressed_size,
492            compressed_size: disk_size, // For now, assume disk_size is compressed_size
493            compression_ratio,
494            block_count,
495            avg_block_size,
496            index_size,
497            bloom_filter_size,
498            level_count,
499        },
500    ))
501}
502
503/// Parse partition size distribution statistics
504pub fn parse_partition_statistics(input: &[u8]) -> IResult<&[u8], PartitionStatistics> {
505    let (input, avg_partition_size) = be_f64(input)?;
506    let (input, min_partition_size) = be_u64(input)?;
507    let (input, max_partition_size) = be_u64(input)?;
508    let (input, large_partition_percentage) = be_f64(input)?;
509
510    let (input, histogram_count) = be_u32(input)?;
511    let (input, size_histogram) =
512        count(parse_partition_size_bucket, histogram_count as usize)(input)?;
513
514    Ok((
515        input,
516        PartitionStatistics {
517            avg_partition_size,
518            min_partition_size,
519            max_partition_size,
520            size_histogram,
521            large_partition_percentage,
522        },
523    ))
524}
525
526/// Parse compression performance statistics
527pub fn parse_compression_statistics(input: &[u8]) -> IResult<&[u8], CompressionStatistics> {
528    let (input, algorithm_len) = parse_vint_length(input)?;
529    let (input, algorithm_bytes) = take(algorithm_len)(input)?;
530    let algorithm = String::from_utf8_lossy(algorithm_bytes).to_string();
531
532    let (input, original_size) = be_u64(input)?;
533    let (input, compressed_size) = be_u64(input)?;
534    let (input, ratio) = be_f64(input)?;
535    let (input, compression_speed) = be_f64(input)?;
536    let (input, decompression_speed) = be_f64(input)?;
537    let (input, compressed_blocks) = parse_vint_as_u64(input)?;
538
539    Ok((
540        input,
541        CompressionStatistics {
542            algorithm,
543            original_size,
544            compressed_size,
545            ratio,
546            compression_speed,
547            decompression_speed,
548            compressed_blocks,
549        },
550    ))
551}
552
553/// Parse additional metadata section
554pub fn parse_metadata_section(input: &[u8]) -> IResult<&[u8], HashMap<String, String>> {
555    let (input, metadata_count) = be_u32(input)?;
556    let mut metadata = HashMap::new();
557
558    let mut remaining = input;
559    for _ in 0..metadata_count {
560        let (next, key_len) = parse_vint_length(remaining)?;
561        let (next, key_bytes) = take(key_len)(next)?;
562        let key = String::from_utf8_lossy(key_bytes).to_string();
563
564        let (next, value_len) = parse_vint_length(next)?;
565        let (next, value_bytes) = take(value_len)(next)?;
566        let value = String::from_utf8_lossy(value_bytes).to_string();
567
568        metadata.insert(key, value);
569        remaining = next;
570    }
571
572    Ok((remaining, metadata))
573}
574
575/// Parse a row size histogram bucket
576pub fn parse_row_size_bucket(input: &[u8]) -> IResult<&[u8], RowSizeBucket> {
577    let (input, size_start) = parse_vint_as_u64(input)?;
578    let (input, size_end) = parse_vint_as_u64(input)?;
579    let (input, count) = parse_vint_as_u64(input)?;
580    let (input, percentage) = be_f64(input)?;
581
582    Ok((
583        input,
584        RowSizeBucket {
585            size_start,
586            size_end,
587            count,
588            percentage,
589        },
590    ))
591}
592
593/// Parse a partition size histogram bucket
594pub fn parse_partition_size_bucket(input: &[u8]) -> IResult<&[u8], PartitionSizeBucket> {
595    let (input, size_start) = parse_vint_as_u64(input)?;
596    let (input, size_end) = parse_vint_as_u64(input)?;
597    let (input, count) = parse_vint_as_u64(input)?;
598    let (input, cumulative_percentage) = be_f64(input)?;
599
600    Ok((
601        input,
602        PartitionSizeBucket {
603            size_start,
604            size_end,
605            count,
606            cumulative_percentage,
607        },
608    ))
609}
610
611/// Parse a value frequency entry
612pub fn parse_value_frequency(input: &[u8]) -> IResult<&[u8], ValueFrequency> {
613    let (input, value_len) = parse_vint_length(input)?;
614    let (input, value_bytes) = take(value_len)(input)?;
615    let (input, frequency) = parse_vint_as_u64(input)?;
616    let (input, percentage) = be_f64(input)?;
617
618    Ok((
619        input,
620        ValueFrequency {
621            value: value_bytes.to_vec(),
622            frequency,
623            percentage,
624        },
625    ))
626}
627
628/// Helper function to parse VInt as u64
629fn parse_vint_as_u64(input: &[u8]) -> IResult<&[u8], u64> {
630    let (input, value) = parse_vint(input)?;
631    Ok((input, value as u64))
632}
633
634/// Statistics analyzer for enhanced reporting
635pub struct StatisticsAnalyzer;
636
637impl StatisticsAnalyzer {
638    /// Analyze statistics and generate human-readable summary
639    pub fn analyze(stats: &SSTableStatistics) -> StatisticsSummary {
640        let data_efficiency = Self::calculate_data_efficiency(stats);
641        let query_performance_hints = Self::generate_query_hints(stats);
642        let storage_recommendations = Self::generate_storage_recommendations(stats);
643        let health_score = Self::calculate_health_score(stats);
644
645        StatisticsSummary {
646            total_rows: stats.row_stats.total_rows,
647            live_data_percentage: (stats.row_stats.live_rows as f64
648                / stats.row_stats.total_rows as f64)
649                * 100.0,
650            compression_efficiency: stats.compression_stats.ratio * 100.0,
651            timestamp_range_days: Self::calculate_timestamp_range_days(stats),
652            largest_partition_mb: stats.partition_stats.max_partition_size as f64 / 1_048_576.0,
653            data_efficiency,
654            query_performance_hints,
655            storage_recommendations,
656            health_score,
657        }
658    }
659
660    fn calculate_data_efficiency(stats: &SSTableStatistics) -> f64 {
661        let live_ratio = stats.row_stats.live_rows as f64 / stats.row_stats.total_rows as f64;
662        let compression_ratio = stats.compression_stats.ratio;
663        let partition_efficiency = 1.0 - (stats.partition_stats.large_partition_percentage / 100.0);
664
665        (live_ratio + compression_ratio + partition_efficiency) / 3.0 * 100.0
666    }
667
668    fn generate_query_hints(stats: &SSTableStatistics) -> Vec<String> {
669        let mut hints = Vec::new();
670
671        if stats.partition_stats.large_partition_percentage > 10.0 {
672            hints.push("Consider reviewing partition key design - high percentage of large partitions detected".to_string());
673        }
674
675        if stats.row_stats.tombstone_count > stats.row_stats.live_rows / 4 {
676            hints.push("High tombstone ratio - consider running compaction".to_string());
677        }
678
679        if stats.table_stats.compression_ratio < 0.5 {
680            hints.push("Low compression ratio - data may not be well-suited for current compression algorithm".to_string());
681        }
682
683        hints
684    }
685
686    fn generate_storage_recommendations(stats: &SSTableStatistics) -> Vec<String> {
687        let mut recommendations = Vec::new();
688
689        if stats.table_stats.disk_size > 1_073_741_824 {
690            recommendations
691                .push("Large SSTable detected - consider more frequent compaction".to_string());
692        }
693
694        if stats.row_stats.avg_rows_per_partition < 10.0 {
695            recommendations.push(
696                "Low average rows per partition - partition key may be too granular".to_string(),
697            );
698        }
699
700        recommendations
701    }
702
703    fn calculate_health_score(stats: &SSTableStatistics) -> f64 {
704        let mut score = 100.0;
705
706        // Deduct for high tombstone ratio
707        let tombstone_ratio =
708            stats.row_stats.tombstone_count as f64 / stats.row_stats.total_rows as f64;
709        score -= tombstone_ratio * 30.0;
710
711        // Deduct for poor compression
712        if stats.compression_stats.ratio < 0.5 {
713            score -= 20.0;
714        }
715
716        // Deduct for large partitions
717        score -= stats.partition_stats.large_partition_percentage;
718
719        score.max(0.0)
720    }
721
722    fn calculate_timestamp_range_days(stats: &SSTableStatistics) -> f64 {
723        let range_micros =
724            stats.timestamp_stats.max_timestamp - stats.timestamp_stats.min_timestamp;
725        range_micros as f64 / (1_000_000.0 * 60.0 * 60.0 * 24.0)
726    }
727}
728
/// Human-readable statistics summary
///
/// Produced by `StatisticsAnalyzer::analyze` from an `SSTableStatistics` value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatisticsSummary {
    /// Total row count, copied from the row statistics
    pub total_rows: u64,
    /// Live rows as a percentage of total rows
    pub live_data_percentage: f64,
    /// Compression ratio from the compression statistics, multiplied by 100
    pub compression_efficiency: f64,
    /// Span between the minimum and maximum timestamps, in days (timestamps
    /// are in microseconds)
    pub timestamp_range_days: f64,
    /// Maximum partition size divided by 1,048,576 (bytes to MB)
    pub largest_partition_mb: f64,
    /// Mean of the live-row ratio, the compression ratio and the share of
    /// non-large partitions, expressed as a percentage
    pub data_efficiency: f64,
    /// Threshold-based hints about partition design, tombstones and compression
    pub query_performance_hints: Vec<String>,
    /// Threshold-based recommendations about SSTable size and partition granularity
    pub storage_recommendations: Vec<String>,
    /// Score that starts at 100 and is reduced for tombstones, poor compression
    /// and large partitions; never below 0
    pub health_score: f64,
}
742
743/// Serialize Statistics structure to bytes (for testing and validation)
744pub fn serialize_statistics(_stats: &SSTableStatistics) -> Result<Vec<u8>> {
745    // This would implement the reverse of parsing for complete round-trip testing
746    // For now, return an error indicating this is not implemented
747    Err(Error::corruption(
748        "Statistics serialization not yet implemented",
749    ))
750}
751
752#[cfg(test)]
753mod tests {
754    use super::*;
755
756    #[test]
757    fn test_statistics_header_parsing() {
758        let test_data = vec![
759            0x00, 0x00, 0x00, 0x01, // version = 1
760            // table_id (16 bytes)
761            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
762            0x0F, 0x10, 0x00, 0x00, 0x00, 0x05, // section_count = 5
763            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, // file_size = 4096
764            0x12, 0x34, 0x56, 0x78, // checksum
765        ];
766
767        let result = parse_statistics_header(&test_data);
768        assert!(result.is_ok());
769
770        let (_, header) = result.unwrap();
771        assert_eq!(header.version, 1);
772        // assert_eq!(header.section_count, 5); // Field not available
773        // assert_eq!(header.file_size, 4096); // Field not available
774        assert_eq!(header.checksum, 0x12345678);
775    }
776
777    #[test]
778    fn test_nb_format_authoritative_detection() {
779        // nb-format (version 4) - should parse as nb-format
780        let nb_data = vec![
781            0x00, 0x00, 0x00, 0x04, // version = 4 (authoritative nb-format marker)
782            0x26, 0x29, 0x1b, 0x05, // statistics_kind
783            0x00, 0x00, 0x00, 0x00, // reserved
784            0x00, 0x00, 0x00, 0x2c, // data_length = 44
785            0x00, 0x00, 0x00, 0x01, // metadata1 = 1
786            0x00, 0x00, 0x00, 0x65, // metadata2 = 101
787            0x00, 0x00, 0x00, 0x02, // metadata3 = 2
788            0x00, 0x00, 0x14, 0xd4, // checksum = 5332
789        ];
790
791        let result = parse_statistics_header(&nb_data);
792        assert!(result.is_ok());
793
794        let (_, header) = result.unwrap();
795        assert_eq!(header.version, 4);
796        assert_eq!(header.statistics_kind, 0x26291b05);
797        assert_eq!(header.data_length, 44);
798        assert!(header.table_id.is_none()); // nb-format has no table_id
799    }
800
801    #[test]
802    fn test_legacy_format_authoritative_detection() {
803        // Legacy format (version 2) - should parse as legacy
804        let legacy_data = vec![
805            0x00, 0x00, 0x00, 0x02, // version = 2 (authoritative legacy marker)
806            // table_id (16 bytes)
807            0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE,
808            0xFF, 0x00, 0x00, 0x00, 0x00, 0x0A, // section_count = 10
809            0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, // file_size = 65536
810            0xAB, 0xCD, 0xEF, 0x12, // checksum
811        ];
812
813        let result = parse_statistics_header(&legacy_data);
814        assert!(result.is_ok());
815
816        let (_, header) = result.unwrap();
817        assert_eq!(header.version, 2);
818        assert_eq!(header.statistics_kind, 0); // legacy format doesn't use this
819        assert!(header.table_id.is_some()); // legacy format has table_id
820    }
821
822    #[test]
823    fn test_unsupported_version_rejection() {
824        // Version 0 - should be rejected
825        let invalid_v0 = vec![
826            0x00, 0x00, 0x00, 0x00, // version = 0 (invalid)
827            0x00, 0x00, 0x00, 0x00, // ...rest doesn't matter
828        ];
829        assert!(parse_statistics_header(&invalid_v0).is_err());
830
831        // Version 5 - should be rejected (future/unknown version)
832        let invalid_v5 = vec![
833            0x00, 0x00, 0x00, 0x05, // version = 5 (unsupported)
834            0x00, 0x00, 0x00, 0x00, // ...rest doesn't matter
835        ];
836        assert!(parse_statistics_header(&invalid_v5).is_err());
837
838        // Version 255 - should be rejected
839        let invalid_v255 = vec![
840            0x00, 0x00, 0x00, 0xFF, // version = 255 (unsupported)
841            0x00, 0x00, 0x00, 0x00, // ...rest doesn't matter
842        ];
843        assert!(parse_statistics_header(&invalid_v255).is_err());
844    }
845
846    #[test]
847    fn test_no_heuristics_version_4_with_short_input() {
848        // Previous implementation used heuristic: version == 4 && input.len() >= 28
849        // New implementation uses ONLY version number - no length check heuristic
850        // This test ensures we don't fall back to legacy parsing with short input
851
852        let short_nb_data = vec![
853            0x00, 0x00, 0x00, 0x04, // version = 4 (authoritative nb-format)
854            0x26, 0x29, 0x1b, 0x05, // statistics_kind
855            0x00, 0x00, 0x00, 0x00, // reserved
856            0x00, 0x00, 0x00,
857            0x2c, // data_length = 44
858                  // Missing remaining fields - should fail parsing, not switch formats
859        ];
860
861        let result = parse_statistics_header(&short_nb_data);
862        // Should fail because version 4 DEFINITIVELY means nb-format
863        // and nb-format requires 32 bytes. This is NOT a heuristic,
864        // it's the authoritative format specification.
865        assert!(result.is_err());
866    }
867
868    #[test]
869    fn test_statistics_analyzer() {
870        let stats = create_test_statistics();
871        let summary = StatisticsAnalyzer::analyze(&stats);
872
873        assert!(summary.total_rows > 0);
874        assert!(summary.health_score >= 0.0 && summary.health_score <= 100.0);
875        assert!(summary.live_data_percentage >= 0.0 && summary.live_data_percentage <= 100.0);
876    }
877
878    #[test]
879    fn test_parse_row_statistics() {
880        use super::super::vint::encode_vint;
881
882        let mut data = Vec::new();
883        // total_rows: VInt
884        data.extend_from_slice(&encode_vint(1000));
885        // live_rows: VInt
886        data.extend_from_slice(&encode_vint(900));
887        // tombstone_count: VInt
888        data.extend_from_slice(&encode_vint(100));
889        // partition_count: VInt
890        data.extend_from_slice(&encode_vint(50));
891        // avg_rows_per_partition: f64
892        data.extend_from_slice(&20.0f64.to_be_bytes());
893        // histogram_count: u32
894        data.extend_from_slice(&0u32.to_be_bytes());
895
896        let result = parse_row_statistics(&data);
897        assert!(result.is_ok());
898
899        let (remaining, row_stats) = result.unwrap();
900        assert!(remaining.is_empty());
901        assert_eq!(row_stats.total_rows, 1000);
902        assert_eq!(row_stats.live_rows, 900);
903        assert_eq!(row_stats.tombstone_count, 100);
904        assert_eq!(row_stats.partition_count, 50);
905        assert_eq!(row_stats.avg_rows_per_partition, 20.0);
906        assert_eq!(row_stats.row_size_histogram.len(), 0);
907    }
908
909    #[test]
910    fn test_parse_row_statistics_with_histogram() {
911        use super::super::vint::encode_vint;
912
913        let mut data = Vec::new();
914        data.extend_from_slice(&encode_vint(1000));
915        data.extend_from_slice(&encode_vint(900));
916        data.extend_from_slice(&encode_vint(100));
917        data.extend_from_slice(&encode_vint(50));
918        data.extend_from_slice(&20.0f64.to_be_bytes());
919        // histogram_count: 2 buckets
920        data.extend_from_slice(&2u32.to_be_bytes());
921        // Bucket 1
922        data.extend_from_slice(&encode_vint(0)); // size_start
923        data.extend_from_slice(&encode_vint(1024)); // size_end
924        data.extend_from_slice(&encode_vint(500)); // count
925        data.extend_from_slice(&50.0f64.to_be_bytes()); // percentage
926                                                        // Bucket 2
927        data.extend_from_slice(&encode_vint(1024)); // size_start
928        data.extend_from_slice(&encode_vint(10240)); // size_end
929        data.extend_from_slice(&encode_vint(500)); // count
930        data.extend_from_slice(&50.0f64.to_be_bytes()); // percentage
931
932        let result = parse_row_statistics(&data);
933        assert!(result.is_ok());
934
935        let (_, row_stats) = result.unwrap();
936        assert_eq!(row_stats.row_size_histogram.len(), 2);
937        assert_eq!(row_stats.row_size_histogram[0].size_start, 0);
938        assert_eq!(row_stats.row_size_histogram[0].size_end, 1024);
939        assert_eq!(row_stats.row_size_histogram[0].count, 500);
940        assert_eq!(row_stats.row_size_histogram[0].percentage, 50.0);
941    }
942
943    #[test]
944    fn test_parse_timestamp_statistics_no_ttl() {
945        let mut data = Vec::new();
946        data.extend_from_slice(&1000000i64.to_be_bytes()); // min_timestamp
947        data.extend_from_slice(&2000000i64.to_be_bytes()); // max_timestamp
948        data.extend_from_slice(&0i64.to_be_bytes()); // min_deletion_time
949        data.extend_from_slice(&0i64.to_be_bytes()); // max_deletion_time
950        data.push(0); // has_ttl = false
951
952        let result = parse_timestamp_statistics(&data);
953        assert!(result.is_ok());
954
955        let (remaining, ts_stats) = result.unwrap();
956        assert!(remaining.is_empty());
957        assert_eq!(ts_stats.min_timestamp, 1000000);
958        assert_eq!(ts_stats.max_timestamp, 2000000);
959        assert_eq!(ts_stats.min_deletion_time, 0);
960        assert_eq!(ts_stats.max_deletion_time, 0);
961        assert!(ts_stats.min_ttl.is_none());
962        assert!(ts_stats.max_ttl.is_none());
963        assert_eq!(ts_stats.rows_with_ttl, 0);
964    }
965
966    #[test]
967    fn test_parse_timestamp_statistics_with_ttl() {
968        use super::super::vint::encode_vint;
969
970        let mut data = Vec::new();
971        data.extend_from_slice(&1000000i64.to_be_bytes());
972        data.extend_from_slice(&2000000i64.to_be_bytes());
973        data.extend_from_slice(&0i64.to_be_bytes());
974        data.extend_from_slice(&0i64.to_be_bytes());
975        data.push(1); // has_ttl = true
976        data.extend_from_slice(&3600i64.to_be_bytes()); // min_ttl
977        data.extend_from_slice(&86400i64.to_be_bytes()); // max_ttl
978        data.extend_from_slice(&encode_vint(250)); // rows_with_ttl
979
980        let result = parse_timestamp_statistics(&data);
981        assert!(result.is_ok());
982
983        let (_, ts_stats) = result.unwrap();
984        assert_eq!(ts_stats.min_ttl, Some(3600));
985        assert_eq!(ts_stats.max_ttl, Some(86400));
986        assert_eq!(ts_stats.rows_with_ttl, 250);
987    }
988
989    #[test]
990    fn test_parse_column_statistics_empty() {
991        let data = Vec::new();
992        let result = parse_column_statistics(&data, 0);
993        assert!(result.is_ok());
994
995        let (remaining, col_stats) = result.unwrap();
996        assert!(remaining.is_empty());
997        assert_eq!(col_stats.len(), 0);
998    }
999
1000    #[test]
1001    fn test_parse_column_statistics_single_column() {
1002        use super::super::vint::encode_vint;
1003
1004        let mut data = Vec::new();
1005        // Column name
1006        let name = b"user_id";
1007        data.extend_from_slice(&encode_vint(name.len() as i64));
1008        data.extend_from_slice(name);
1009        // Column type
1010        let col_type = b"int";
1011        data.extend_from_slice(&encode_vint(col_type.len() as i64));
1012        data.extend_from_slice(col_type);
1013        // value_count
1014        data.extend_from_slice(&encode_vint(1000));
1015        // null_count
1016        data.extend_from_slice(&encode_vint(0));
1017        // has_min_max = false
1018        data.push(0);
1019        // avg_size
1020        data.extend_from_slice(&4.0f64.to_be_bytes());
1021        // cardinality
1022        data.extend_from_slice(&encode_vint(500));
1023        // histogram_count
1024        data.extend_from_slice(&0u32.to_be_bytes());
1025        // has_index
1026        data.push(1);
1027
1028        let result = parse_column_statistics(&data, 1);
1029        assert!(result.is_ok());
1030
1031        let (_, col_stats) = result.unwrap();
1032        assert_eq!(col_stats.len(), 1);
1033        assert_eq!(col_stats[0].name, "user_id");
1034        assert_eq!(col_stats[0].column_type, "int");
1035        assert_eq!(col_stats[0].value_count, 1000);
1036        assert_eq!(col_stats[0].null_count, 0);
1037        assert!(col_stats[0].min_value.is_none());
1038        assert!(col_stats[0].max_value.is_none());
1039        assert_eq!(col_stats[0].avg_size, 4.0);
1040        assert_eq!(col_stats[0].cardinality, 500);
1041        assert_eq!(col_stats[0].value_histogram.len(), 0);
1042        assert!(col_stats[0].has_index);
1043    }
1044
1045    #[test]
1046    fn test_parse_column_statistics_with_min_max() {
1047        use super::super::vint::encode_vint;
1048
1049        let mut data = Vec::new();
1050        let name = b"score";
1051        data.extend_from_slice(&encode_vint(name.len() as i64));
1052        data.extend_from_slice(name);
1053        let col_type = b"int";
1054        data.extend_from_slice(&encode_vint(col_type.len() as i64));
1055        data.extend_from_slice(col_type);
1056        data.extend_from_slice(&encode_vint(500));
1057        data.extend_from_slice(&encode_vint(10));
1058        // has_min_max = true
1059        data.push(1);
1060        // min_value
1061        let min_val = vec![0x00, 0x00, 0x00, 0x01];
1062        data.extend_from_slice(&encode_vint(min_val.len() as i64));
1063        data.extend_from_slice(&min_val);
1064        // max_value
1065        let max_val = vec![0x00, 0x00, 0x03, 0xE8];
1066        data.extend_from_slice(&encode_vint(max_val.len() as i64));
1067        data.extend_from_slice(&max_val);
1068        data.extend_from_slice(&4.0f64.to_be_bytes());
1069        data.extend_from_slice(&encode_vint(400));
1070        data.extend_from_slice(&0u32.to_be_bytes());
1071        data.push(0);
1072
1073        let result = parse_column_statistics(&data, 1);
1074        assert!(result.is_ok());
1075
1076        let (_, col_stats) = result.unwrap();
1077        assert_eq!(col_stats.len(), 1);
1078        assert!(col_stats[0].min_value.is_some());
1079        assert!(col_stats[0].max_value.is_some());
1080        assert_eq!(col_stats[0].min_value.as_ref().unwrap(), &min_val);
1081        assert_eq!(col_stats[0].max_value.as_ref().unwrap(), &max_val);
1082    }
1083
1084    #[test]
1085    fn test_parse_table_statistics() {
1086        use super::super::vint::encode_vint;
1087
1088        let mut data = Vec::new();
1089        data.extend_from_slice(&(1024 * 1024u64).to_be_bytes()); // disk_size
1090        data.extend_from_slice(&(2048 * 1024u64).to_be_bytes()); // uncompressed_size
1091        data.extend_from_slice(&0.5f64.to_be_bytes()); // compression_ratio
1092        data.extend_from_slice(&encode_vint(100)); // block_count
1093        data.extend_from_slice(&1024.0f64.to_be_bytes()); // avg_block_size
1094        data.extend_from_slice(&1024u64.to_be_bytes()); // index_size
1095        data.extend_from_slice(&512u64.to_be_bytes()); // bloom_filter_size
1096        data.extend_from_slice(&1u32.to_be_bytes()); // level_count
1097
1098        let result = parse_table_statistics(&data);
1099        assert!(result.is_ok());
1100
1101        let (remaining, table_stats) = result.unwrap();
1102        assert!(remaining.is_empty());
1103        assert_eq!(table_stats.disk_size, 1024 * 1024);
1104        assert_eq!(table_stats.uncompressed_size, 2048 * 1024);
1105        assert_eq!(table_stats.compression_ratio, 0.5);
1106        assert_eq!(table_stats.block_count, 100);
1107        assert_eq!(table_stats.avg_block_size, 1024.0);
1108        assert_eq!(table_stats.index_size, 1024);
1109        assert_eq!(table_stats.bloom_filter_size, 512);
1110        assert_eq!(table_stats.level_count, 1);
1111    }
1112
1113    #[test]
1114    fn test_parse_partition_statistics() {
1115        let mut data = Vec::new();
1116        data.extend_from_slice(&20480.0f64.to_be_bytes()); // avg_partition_size
1117        data.extend_from_slice(&1024u64.to_be_bytes()); // min_partition_size
1118        data.extend_from_slice(&1048576u64.to_be_bytes()); // max_partition_size
1119        data.extend_from_slice(&5.0f64.to_be_bytes()); // large_partition_percentage
1120        data.extend_from_slice(&0u32.to_be_bytes()); // histogram_count
1121
1122        let result = parse_partition_statistics(&data);
1123        assert!(result.is_ok());
1124
1125        let (remaining, part_stats) = result.unwrap();
1126        assert!(remaining.is_empty());
1127        assert_eq!(part_stats.avg_partition_size, 20480.0);
1128        assert_eq!(part_stats.min_partition_size, 1024);
1129        assert_eq!(part_stats.max_partition_size, 1048576);
1130        assert_eq!(part_stats.large_partition_percentage, 5.0);
1131        assert_eq!(part_stats.size_histogram.len(), 0);
1132    }
1133
1134    #[test]
1135    fn test_parse_partition_statistics_with_histogram() {
1136        use super::super::vint::encode_vint;
1137
1138        let mut data = Vec::new();
1139        data.extend_from_slice(&20480.0f64.to_be_bytes());
1140        data.extend_from_slice(&1024u64.to_be_bytes());
1141        data.extend_from_slice(&1048576u64.to_be_bytes());
1142        data.extend_from_slice(&5.0f64.to_be_bytes());
1143        // histogram_count: 2 buckets
1144        data.extend_from_slice(&2u32.to_be_bytes());
1145        // Bucket 1
1146        data.extend_from_slice(&encode_vint(0));
1147        data.extend_from_slice(&encode_vint(10240));
1148        data.extend_from_slice(&encode_vint(30));
1149        data.extend_from_slice(&60.0f64.to_be_bytes()); // cumulative_percentage
1150                                                        // Bucket 2
1151        data.extend_from_slice(&encode_vint(10240));
1152        data.extend_from_slice(&encode_vint(1048576));
1153        data.extend_from_slice(&encode_vint(20));
1154        data.extend_from_slice(&100.0f64.to_be_bytes());
1155
1156        let result = parse_partition_statistics(&data);
1157        assert!(result.is_ok());
1158
1159        let (_, part_stats) = result.unwrap();
1160        assert_eq!(part_stats.size_histogram.len(), 2);
1161        assert_eq!(part_stats.size_histogram[0].size_start, 0);
1162        assert_eq!(part_stats.size_histogram[0].cumulative_percentage, 60.0);
1163        assert_eq!(part_stats.size_histogram[1].cumulative_percentage, 100.0);
1164    }
1165
1166    #[test]
1167    fn test_parse_compression_statistics() {
1168        use super::super::vint::encode_vint;
1169
1170        let mut data = Vec::new();
1171        // algorithm
1172        let algo = b"LZ4";
1173        data.extend_from_slice(&encode_vint(algo.len() as i64));
1174        data.extend_from_slice(algo);
1175        // original_size
1176        data.extend_from_slice(&(2048 * 1024u64).to_be_bytes());
1177        // compressed_size
1178        data.extend_from_slice(&(1024 * 1024u64).to_be_bytes());
1179        // ratio
1180        data.extend_from_slice(&0.5f64.to_be_bytes());
1181        // compression_speed
1182        data.extend_from_slice(&100.0f64.to_be_bytes());
1183        // decompression_speed
1184        data.extend_from_slice(&200.0f64.to_be_bytes());
1185        // compressed_blocks
1186        data.extend_from_slice(&encode_vint(100));
1187
1188        let result = parse_compression_statistics(&data);
1189        assert!(result.is_ok());
1190
1191        let (remaining, comp_stats) = result.unwrap();
1192        assert!(remaining.is_empty());
1193        assert_eq!(comp_stats.algorithm, "LZ4");
1194        assert_eq!(comp_stats.original_size, 2048 * 1024);
1195        assert_eq!(comp_stats.compressed_size, 1024 * 1024);
1196        assert_eq!(comp_stats.ratio, 0.5);
1197        assert_eq!(comp_stats.compression_speed, 100.0);
1198        assert_eq!(comp_stats.decompression_speed, 200.0);
1199        assert_eq!(comp_stats.compressed_blocks, 100);
1200    }
1201
1202    #[test]
1203    fn test_parse_compression_statistics_different_algorithms() {
1204        use super::super::vint::encode_vint;
1205
1206        for algorithm in &["LZ4", "Snappy", "Deflate", "Zstd"] {
1207            let mut data = Vec::new();
1208            data.extend_from_slice(&encode_vint(algorithm.len() as i64));
1209            data.extend_from_slice(algorithm.as_bytes());
1210            data.extend_from_slice(&(1000000u64).to_be_bytes());
1211            data.extend_from_slice(&(500000u64).to_be_bytes());
1212            data.extend_from_slice(&0.5f64.to_be_bytes());
1213            data.extend_from_slice(&100.0f64.to_be_bytes());
1214            data.extend_from_slice(&200.0f64.to_be_bytes());
1215            data.extend_from_slice(&encode_vint(50));
1216
1217            let result = parse_compression_statistics(&data);
1218            assert!(result.is_ok());
1219            let (_, comp_stats) = result.unwrap();
1220            assert_eq!(comp_stats.algorithm, *algorithm);
1221        }
1222    }
1223
1224    #[test]
1225    fn test_parse_metadata_section_empty() {
1226        let mut data = Vec::new();
1227        data.extend_from_slice(&0u32.to_be_bytes()); // metadata_count = 0
1228
1229        let result = parse_metadata_section(&data);
1230        assert!(result.is_ok());
1231
1232        let (remaining, metadata) = result.unwrap();
1233        assert!(remaining.is_empty());
1234        assert_eq!(metadata.len(), 0);
1235    }
1236
1237    #[test]
1238    fn test_parse_metadata_section_with_entries() {
1239        use super::super::vint::encode_vint;
1240
1241        let mut data = Vec::new();
1242        data.extend_from_slice(&2u32.to_be_bytes()); // metadata_count = 2
1243
1244        // Entry 1: "compaction_strategy" -> "LeveledCompactionStrategy"
1245        let key1 = b"compaction_strategy";
1246        data.extend_from_slice(&encode_vint(key1.len() as i64));
1247        data.extend_from_slice(key1);
1248        let val1 = b"LeveledCompactionStrategy";
1249        data.extend_from_slice(&encode_vint(val1.len() as i64));
1250        data.extend_from_slice(val1);
1251
1252        // Entry 2: "sstable_format" -> "nb"
1253        let key2 = b"sstable_format";
1254        data.extend_from_slice(&encode_vint(key2.len() as i64));
1255        data.extend_from_slice(key2);
1256        let val2 = b"nb";
1257        data.extend_from_slice(&encode_vint(val2.len() as i64));
1258        data.extend_from_slice(val2);
1259
1260        let result = parse_metadata_section(&data);
1261        assert!(result.is_ok());
1262
1263        let (remaining, metadata) = result.unwrap();
1264        assert!(remaining.is_empty());
1265        assert_eq!(metadata.len(), 2);
1266        assert_eq!(
1267            metadata.get("compaction_strategy"),
1268            Some(&"LeveledCompactionStrategy".to_string())
1269        );
1270        assert_eq!(metadata.get("sstable_format"), Some(&"nb".to_string()));
1271    }
1272
1273    #[test]
1274    fn test_parse_statistics_file() {
1275        use super::super::vint::encode_vint;
1276
1277        let mut data = Vec::new();
1278
1279        // Header (legacy format)
1280        data.extend_from_slice(&1u32.to_be_bytes()); // version
1281        data.extend_from_slice(&[1u8; 16]); // table_id
1282        data.extend_from_slice(&0u32.to_be_bytes()); // section_count
1283        data.extend_from_slice(&4096u64.to_be_bytes()); // file_size
1284        data.extend_from_slice(&0x12345678u32.to_be_bytes()); // checksum
1285
1286        // Row statistics
1287        data.extend_from_slice(&encode_vint(1000));
1288        data.extend_from_slice(&encode_vint(900));
1289        data.extend_from_slice(&encode_vint(100));
1290        data.extend_from_slice(&encode_vint(50));
1291        data.extend_from_slice(&20.0f64.to_be_bytes());
1292        data.extend_from_slice(&0u32.to_be_bytes()); // histogram_count
1293
1294        // Timestamp statistics
1295        data.extend_from_slice(&1000000i64.to_be_bytes());
1296        data.extend_from_slice(&2000000i64.to_be_bytes());
1297        data.extend_from_slice(&0i64.to_be_bytes());
1298        data.extend_from_slice(&0i64.to_be_bytes());
1299        data.push(0); // has_ttl = false
1300
1301        // Column statistics (0 columns)
1302        // (no data needed since column_count from header.data_length is 0)
1303
1304        // Table statistics
1305        data.extend_from_slice(&(1024 * 1024u64).to_be_bytes());
1306        data.extend_from_slice(&(2048 * 1024u64).to_be_bytes());
1307        data.extend_from_slice(&0.5f64.to_be_bytes());
1308        data.extend_from_slice(&encode_vint(100));
1309        data.extend_from_slice(&1024.0f64.to_be_bytes());
1310        data.extend_from_slice(&1024u64.to_be_bytes());
1311        data.extend_from_slice(&512u64.to_be_bytes());
1312        data.extend_from_slice(&1u32.to_be_bytes());
1313
1314        // Partition statistics
1315        data.extend_from_slice(&20480.0f64.to_be_bytes());
1316        data.extend_from_slice(&1024u64.to_be_bytes());
1317        data.extend_from_slice(&1048576u64.to_be_bytes());
1318        data.extend_from_slice(&5.0f64.to_be_bytes());
1319        data.extend_from_slice(&0u32.to_be_bytes()); // histogram_count
1320
1321        // Compression statistics
1322        let algo = b"LZ4";
1323        data.extend_from_slice(&encode_vint(algo.len() as i64));
1324        data.extend_from_slice(algo);
1325        data.extend_from_slice(&(2048 * 1024u64).to_be_bytes());
1326        data.extend_from_slice(&(1024 * 1024u64).to_be_bytes());
1327        data.extend_from_slice(&0.5f64.to_be_bytes());
1328        data.extend_from_slice(&100.0f64.to_be_bytes());
1329        data.extend_from_slice(&200.0f64.to_be_bytes());
1330        data.extend_from_slice(&encode_vint(100));
1331
1332        // Metadata section
1333        data.extend_from_slice(&0u32.to_be_bytes()); // metadata_count
1334
1335        let result = parse_statistics_file(&data);
1336        assert!(result.is_ok());
1337
1338        let (remaining, stats) = result.unwrap();
1339        assert!(remaining.is_empty());
1340        assert_eq!(stats.header.version, 1);
1341        assert_eq!(stats.row_stats.total_rows, 1000);
1342        assert_eq!(stats.timestamp_stats.min_timestamp, 1000000);
1343        assert_eq!(stats.table_stats.disk_size, 1024 * 1024);
1344        assert_eq!(stats.partition_stats.avg_partition_size, 20480.0);
1345        assert_eq!(stats.compression_stats.algorithm, "LZ4");
1346        assert_eq!(stats.metadata.len(), 0);
1347    }
1348
1349    #[test]
1350    fn test_parse_statistics_file_with_extra_data() {
1351        use super::super::vint::encode_vint;
1352
1353        let mut data = Vec::new();
1354
1355        // Header
1356        data.extend_from_slice(&1u32.to_be_bytes());
1357        data.extend_from_slice(&[1u8; 16]);
1358        data.extend_from_slice(&0u32.to_be_bytes());
1359        data.extend_from_slice(&4096u64.to_be_bytes());
1360        data.extend_from_slice(&0x12345678u32.to_be_bytes());
1361
1362        // Minimal required sections
1363        data.extend_from_slice(&encode_vint(100));
1364        data.extend_from_slice(&encode_vint(90));
1365        data.extend_from_slice(&encode_vint(10));
1366        data.extend_from_slice(&encode_vint(10));
1367        data.extend_from_slice(&10.0f64.to_be_bytes());
1368        data.extend_from_slice(&0u32.to_be_bytes());
1369
1370        data.extend_from_slice(&0i64.to_be_bytes());
1371        data.extend_from_slice(&1000000i64.to_be_bytes());
1372        data.extend_from_slice(&0i64.to_be_bytes());
1373        data.extend_from_slice(&0i64.to_be_bytes());
1374        data.push(0);
1375
1376        data.extend_from_slice(&1024u64.to_be_bytes());
1377        data.extend_from_slice(&2048u64.to_be_bytes());
1378        data.extend_from_slice(&0.5f64.to_be_bytes());
1379        data.extend_from_slice(&encode_vint(10));
1380        data.extend_from_slice(&100.0f64.to_be_bytes());
1381        data.extend_from_slice(&100u64.to_be_bytes());
1382        data.extend_from_slice(&50u64.to_be_bytes());
1383        data.extend_from_slice(&1u32.to_be_bytes());
1384
1385        data.extend_from_slice(&1000.0f64.to_be_bytes());
1386        data.extend_from_slice(&100u64.to_be_bytes());
1387        data.extend_from_slice(&10000u64.to_be_bytes());
1388        data.extend_from_slice(&1.0f64.to_be_bytes());
1389        data.extend_from_slice(&0u32.to_be_bytes());
1390
1391        let algo = b"Snappy";
1392        data.extend_from_slice(&encode_vint(algo.len() as i64));
1393        data.extend_from_slice(algo);
1394        data.extend_from_slice(&2048u64.to_be_bytes());
1395        data.extend_from_slice(&1024u64.to_be_bytes());
1396        data.extend_from_slice(&0.5f64.to_be_bytes());
1397        data.extend_from_slice(&100.0f64.to_be_bytes());
1398        data.extend_from_slice(&200.0f64.to_be_bytes());
1399        data.extend_from_slice(&encode_vint(10));
1400
1401        data.extend_from_slice(&0u32.to_be_bytes());
1402
1403        // Extra trailing data
1404        data.extend_from_slice(b"extra_data");
1405
1406        let result = parse_statistics_file(&data);
1407        assert!(result.is_ok());
1408
1409        let (remaining, stats) = result.unwrap();
1410        // Should have consumed all except the extra trailing data
1411        assert_eq!(remaining, b"extra_data");
1412        assert_eq!(stats.compression_stats.algorithm, "Snappy");
1413    }
1414
1415    fn create_test_statistics() -> SSTableStatistics {
1416        SSTableStatistics {
1417            header: StatisticsHeader {
1418                version: 1,
1419                statistics_kind: 3,
1420                data_length: 1024,
1421                metadata1: 0,
1422                metadata2: 0,
1423                metadata3: 0,
1424                checksum: 0x12345678,
1425                table_id: Some([1; 16]),
1426            },
1427            row_stats: RowStatistics {
1428                total_rows: 1000,
1429                live_rows: 900,
1430                tombstone_count: 100,
1431                partition_count: 50,
1432                avg_rows_per_partition: 20.0,
1433                row_size_histogram: vec![],
1434            },
1435            timestamp_stats: TimestampStatistics {
1436                min_timestamp: 1000000,
1437                max_timestamp: 2000000,
1438                min_deletion_time: 0,
1439                max_deletion_time: 0,
1440                min_ttl: None,
1441                max_ttl: None,
1442                rows_with_ttl: 0,
1443            },
1444            column_stats: vec![],
1445            table_stats: TableStatistics {
1446                disk_size: 1024 * 1024,
1447                uncompressed_size: 2048 * 1024,
1448                compressed_size: 1024 * 1024,
1449                compression_ratio: 0.5,
1450                block_count: 100,
1451                avg_block_size: 1024.0,
1452                index_size: 1024,
1453                bloom_filter_size: 512,
1454                level_count: 1,
1455            },
1456            partition_stats: PartitionStatistics {
1457                avg_partition_size: 20480.0,
1458                min_partition_size: 1024,
1459                max_partition_size: 1048576,
1460                size_histogram: vec![],
1461                large_partition_percentage: 5.0,
1462            },
1463            compression_stats: CompressionStatistics {
1464                algorithm: "LZ4".to_string(),
1465                original_size: 2048 * 1024,
1466                compressed_size: 1024 * 1024,
1467                ratio: 0.5,
1468                compression_speed: 100.0,
1469                decompression_speed: 200.0,
1470                compressed_blocks: 100,
1471            },
1472            metadata: HashMap::new(),
1473            serialization_header_columns: vec![],
1474            serialization_header_partition_keys: vec![],
1475            serialization_header_clustering_keys: vec![],
1476        }
1477    }
1478}