use super::vint::{parse_vint, parse_vint_length};
use crate::error::{Error, Result};
use nom::{
bytes::complete::take,
multi::count,
number::complete::{be_f64, be_i64, be_u32, be_u64, be_u8},
IResult,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Header found at the start of a statistics file, as produced by
/// `parse_statistics_header`.
///
/// One struct serves both on-disk layouts; which fields carry file data
/// depends on `version` (see the per-field notes).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatisticsHeader {
/// First big-endian `u32` of the file. 4 selects the "nb" layout, 1..=3 the
/// legacy layout; any other value is rejected by `parse_statistics_header`.
pub version: u32,
/// nb layout: the `u32` following the version. Legacy layout: always 0.
pub statistics_kind: u32,
/// nb layout: the `u32` after one reserved word. Legacy layout: holds the
/// header's section count. `parse_statistics_file` passes this value to
/// `parse_column_statistics` as the number of column entries.
pub data_length: u32,
/// nb layout: raw `u32` from the file. Legacy layout: high 32 bits of the
/// 64-bit file size.
pub metadata1: u32,
/// nb layout: raw `u32` from the file. Legacy layout: low 32 bits of the
/// 64-bit file size.
pub metadata2: u32,
/// nb layout: raw `u32` from the file. Legacy layout: always 0.
pub metadata3: u32,
/// Last `u32` of the header in both layouts; the header parsers store it
/// without checking it.
pub checksum: u32,
/// Legacy layout: the 16 bytes that follow the version. `None` for the nb
/// layout.
pub table_id: Option<[u8; 16]>,
}
/// Everything `parse_statistics_file` extracts from one statistics file.
///
/// The sections are parsed in this order: header, row, timestamp, column,
/// table, partition, compression, metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SSTableStatistics {
/// File header (either layout).
pub header: StatisticsHeader,
/// Row counts and row-size histogram.
pub row_stats: RowStatistics,
/// Timestamp, deletion-time and TTL bounds.
pub timestamp_stats: TimestampStatistics,
/// One entry per column; the entry count comes from `header.data_length`.
pub column_stats: Vec<ColumnStatistics>,
/// Table-level size figures.
pub table_stats: TableStatistics,
/// Partition size figures and histogram.
pub partition_stats: PartitionStatistics,
/// Compression figures.
pub compression_stats: CompressionStatistics,
/// Key/value pairs from the trailing metadata section.
pub metadata: HashMap<String, String>,
/// Left empty by `parse_statistics_file`; defaults to empty when missing
/// from deserialized data.
#[serde(default)]
pub serialization_header_columns: Vec<super::header::ColumnInfo>,
/// Left empty by `parse_statistics_file`; defaults to empty when missing
/// from deserialized data.
#[serde(default)]
pub serialization_header_partition_keys: Vec<super::header::ColumnInfo>,
/// Left empty by `parse_statistics_file`; defaults to empty when missing
/// from deserialized data.
#[serde(default)]
pub serialization_header_clustering_keys: Vec<super::header::ColumnInfo>,
}
/// Row-level counters as read by `parse_row_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RowStatistics {
/// Total row count (vint on disk). `StatisticsAnalyzer` uses it as the
/// denominator for its live-row and tombstone ratios.
pub total_rows: u64,
/// Live row count (vint on disk).
pub live_rows: u64,
/// Tombstone count (vint on disk).
pub tombstone_count: u64,
/// Partition count (vint on disk).
pub partition_count: u64,
/// Read from the file as a big-endian `f64`; it is not recomputed from the
/// counters above.
pub avg_rows_per_partition: f64,
/// Histogram buckets; the bucket count is a `u32` prefix on disk.
pub row_size_histogram: Vec<RowSizeBucket>,
}
/// Timestamp bounds as read by `parse_timestamp_statistics`.
///
/// `StatisticsAnalyzer` treats `max_timestamp - min_timestamp` as a number of
/// microseconds; the unit of the deletion times is not established by this
/// file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimestampStatistics {
/// Big-endian `i64` on disk.
pub min_timestamp: i64,
/// Big-endian `i64` on disk.
pub max_timestamp: i64,
/// Big-endian `i64` on disk.
pub min_deletion_time: i64,
/// Big-endian `i64` on disk.
pub max_deletion_time: i64,
/// `None` when the on-disk TTL flag byte is 0.
pub min_ttl: Option<i64>,
/// `None` when the on-disk TTL flag byte is 0.
pub max_ttl: Option<i64>,
/// Vint on disk; set to 0 when the TTL flag byte is 0.
pub rows_with_ttl: u64,
}
/// Per-column statistics as read by `parse_single_column_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnStatistics {
/// Column name, decoded lossily from length-prefixed bytes.
pub name: String,
/// Column type name, decoded lossily from length-prefixed bytes.
pub column_type: String,
/// Vint on disk.
pub value_count: u64,
/// Vint on disk.
pub null_count: u64,
/// Raw bytes of the minimum value; `None` when the min/max flag byte is 0.
pub min_value: Option<Vec<u8>>,
/// Raw bytes of the maximum value; `None` when the min/max flag byte is 0.
pub max_value: Option<Vec<u8>>,
/// Big-endian `f64` on disk.
pub avg_size: f64,
/// Vint on disk.
pub cardinality: u64,
/// Value-frequency entries; the entry count is a `u32` prefix on disk.
pub value_histogram: Vec<ValueFrequency>,
/// `true` when the trailing flag byte is non-zero.
pub has_index: bool,
}
/// Table-level figures as read by `parse_table_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableStatistics {
/// Big-endian `u64` on disk. `StatisticsAnalyzer` recommends more frequent
/// compaction when this exceeds 1_073_741_824.
pub disk_size: u64,
/// Big-endian `u64` on disk.
pub uncompressed_size: u64,
/// Not read from the file: the parser copies `disk_size` into this field.
pub compressed_size: u64,
/// Big-endian `f64` on disk. `StatisticsAnalyzer` emits a hint when this is
/// below 0.5.
pub compression_ratio: f64,
/// Vint on disk.
pub block_count: u64,
/// Big-endian `f64` on disk.
pub avg_block_size: f64,
/// Big-endian `u64` on disk.
pub index_size: u64,
/// Big-endian `u64` on disk.
pub bloom_filter_size: u64,
/// Big-endian `u32` on disk.
pub level_count: u32,
}
/// Partition-size figures as read by `parse_partition_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionStatistics {
/// Big-endian `f64` on disk.
pub avg_partition_size: f64,
/// Big-endian `u64` on disk.
pub min_partition_size: u64,
/// Big-endian `u64` on disk. `StatisticsAnalyzer` divides it by 1_048_576
/// to report megabytes, so it is presumably a byte count.
pub max_partition_size: u64,
/// Histogram buckets. On disk they follow `large_partition_percentage`,
/// prefixed by a `u32` bucket count.
pub size_histogram: Vec<PartitionSizeBucket>,
/// Big-endian `f64` on disk. `StatisticsAnalyzer` divides it by 100 and
/// subtracts it directly from the health score, i.e. it is used on a
/// 0..=100 scale.
pub large_partition_percentage: f64,
}
/// Compression figures as read by `parse_compression_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionStatistics {
/// Algorithm name, decoded lossily from length-prefixed bytes.
pub algorithm: String,
/// Big-endian `u64` on disk.
pub original_size: u64,
/// Big-endian `u64` on disk.
pub compressed_size: u64,
/// Big-endian `f64` on disk. `StatisticsAnalyzer` multiplies it by 100 for
/// its "compression efficiency" and penalises values below 0.5.
pub ratio: f64,
/// Big-endian `f64` on disk; unit not established by this file.
pub compression_speed: f64,
/// Big-endian `f64` on disk; unit not established by this file.
pub decompression_speed: f64,
/// Vint on disk.
pub compressed_blocks: u64,
}
/// One bucket of the row-size histogram, as read by `parse_row_size_bucket`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RowSizeBucket {
/// Lower bound of the bucket (vint on disk); inclusivity is not
/// established by this file.
pub size_start: u64,
/// Upper bound of the bucket (vint on disk); inclusivity is not
/// established by this file.
pub size_end: u64,
/// Number of rows in the bucket (vint on disk).
pub count: u64,
/// Big-endian `f64` on disk; stored as read, not recomputed.
pub percentage: f64,
}
/// One entry of a column's value histogram, as read by
/// `parse_value_frequency`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValueFrequency {
/// Raw, length-prefixed value bytes.
pub value: Vec<u8>,
/// Vint on disk.
pub frequency: u64,
/// Big-endian `f64` on disk; stored as read, not recomputed.
pub percentage: f64,
}
/// One bucket of the partition-size histogram, as read by
/// `parse_partition_size_bucket`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionSizeBucket {
/// Lower bound of the bucket (vint on disk); inclusivity is not
/// established by this file.
pub size_start: u64,
/// Upper bound of the bucket (vint on disk); inclusivity is not
/// established by this file.
pub size_end: u64,
/// Number of partitions in the bucket (vint on disk).
pub count: u64,
/// Big-endian `f64` on disk; stored as read, not recomputed.
pub cumulative_percentage: f64,
}
/// Parses a whole statistics file into an [`SSTableStatistics`].
///
/// Sections are consumed strictly in order: header, row statistics,
/// timestamp statistics, column statistics, table statistics, partition
/// statistics, compression statistics and finally the metadata map. The
/// first section that fails to parse aborts the whole parse with its error.
///
/// Returns the unconsumed tail of `input` together with the parsed value.
/// The three `serialization_header_*` vectors are always returned empty.
pub fn parse_statistics_file(input: &[u8]) -> IResult<&[u8], SSTableStatistics> {
    let (rest, header) = parse_statistics_header(input)?;
    let (rest, row_stats) = parse_row_statistics(rest)?;
    let (rest, timestamp_stats) = parse_timestamp_statistics(rest)?;

    // The header's `data_length` is what drives how many column entries are read.
    let column_entries = header.data_length;
    let (rest, column_stats) = parse_column_statistics(rest, column_entries)?;

    let (rest, table_stats) = parse_table_statistics(rest)?;
    let (rest, partition_stats) = parse_partition_statistics(rest)?;
    let (rest, compression_stats) = parse_compression_statistics(rest)?;
    let (rest, metadata) = parse_metadata_section(rest)?;

    let statistics = SSTableStatistics {
        header,
        row_stats,
        timestamp_stats,
        column_stats,
        table_stats,
        partition_stats,
        compression_stats,
        metadata,
        serialization_header_columns: Vec::new(),
        serialization_header_partition_keys: Vec::new(),
        serialization_header_clustering_keys: Vec::new(),
    };
    Ok((rest, statistics))
}
pub fn parse_statistics_header(input: &[u8]) -> IResult<&[u8], StatisticsHeader> {
let (remaining, version) = be_u32(input)?;
match version {
4 => parse_nb_format_header(remaining, version),
1..=3 => parse_legacy_format_header(remaining, version),
_ => Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
))),
}
}
/// Parses the version-4 ("nb") header body that follows the version word.
///
/// The body is seven consecutive big-endian `u32` words: statistics kind, a
/// reserved word that is read and discarded, data length, three metadata
/// words and the checksum. `table_id` is always `None` for this layout.
fn parse_nb_format_header(input: &[u8], version: u32) -> IResult<&[u8], StatisticsHeader> {
    let (rest, statistics_kind) = be_u32(input)?;
    // Reserved word: consumed to keep the cursor aligned, value ignored.
    let (rest, _) = be_u32(rest)?;
    let (rest, data_length) = be_u32(rest)?;
    let (rest, metadata1) = be_u32(rest)?;
    let (rest, metadata2) = be_u32(rest)?;
    let (rest, metadata3) = be_u32(rest)?;
    let (rest, checksum) = be_u32(rest)?;

    let header = StatisticsHeader {
        version,
        statistics_kind,
        data_length,
        metadata1,
        metadata2,
        metadata3,
        checksum,
        table_id: None,
    };
    Ok((rest, header))
}
/// Parses the legacy (versions 1 through 3) header body that follows the
/// version word.
///
/// Layout: 16 raw table-id bytes, a big-endian `u32` section count, a
/// big-endian `u64` file size and a big-endian `u32` checksum.
///
/// The values are folded into the shared [`StatisticsHeader`] shape: the
/// section count lands in `data_length`, the file size is split into
/// `metadata1` (high 32 bits) and `metadata2` (low 32 bits), and
/// `statistics_kind` / `metadata3` are set to 0.
fn parse_legacy_format_header(input: &[u8], version: u32) -> IResult<&[u8], StatisticsHeader> {
    let (rest, id_bytes) = take(16u8)(input)?;
    let (rest, section_count) = be_u32(rest)?;
    let (rest, file_size) = be_u64(rest)?;
    let (rest, checksum) = be_u32(rest)?;

    // `take(16)` guarantees exactly 16 bytes, so the copy cannot panic.
    let mut table_id = [0u8; 16];
    table_id.copy_from_slice(id_bytes);

    let size_high = (file_size >> 32) as u32;
    let size_low = file_size as u32;

    let header = StatisticsHeader {
        version,
        statistics_kind: 0,
        data_length: section_count,
        metadata1: size_high,
        metadata2: size_low,
        metadata3: 0,
        checksum,
        table_id: Some(table_id),
    };
    Ok((rest, header))
}
/// Parses the row-statistics section.
///
/// Layout: four vints (total rows, live rows, tombstones, partitions), a
/// big-endian `f64` average rows per partition, a big-endian `u32` bucket
/// count and then that many row-size buckets.
pub fn parse_row_statistics(input: &[u8]) -> IResult<&[u8], RowStatistics> {
    let (rest, total_rows) = parse_vint_as_u64(input)?;
    let (rest, live_rows) = parse_vint_as_u64(rest)?;
    let (rest, tombstone_count) = parse_vint_as_u64(rest)?;
    let (rest, partition_count) = parse_vint_as_u64(rest)?;
    let (rest, avg_rows_per_partition) = be_f64(rest)?;

    let (rest, bucket_total) = be_u32(rest)?;
    let (rest, row_size_histogram) = count(parse_row_size_bucket, bucket_total as usize)(rest)?;

    let row_stats = RowStatistics {
        total_rows,
        live_rows,
        tombstone_count,
        partition_count,
        avg_rows_per_partition,
        row_size_histogram,
    };
    Ok((rest, row_stats))
}
/// Parses the timestamp-statistics section.
///
/// Layout: four big-endian `i64` values (min/max timestamp, min/max deletion
/// time) followed by a one-byte TTL flag. When the flag is non-zero, two more
/// big-endian `i64` values (min/max TTL) and a vint (rows with TTL) follow;
/// when it is zero, the TTL bounds are `None` and `rows_with_ttl` is 0.
pub fn parse_timestamp_statistics(input: &[u8]) -> IResult<&[u8], TimestampStatistics> {
    let (rest, min_timestamp) = be_i64(input)?;
    let (rest, max_timestamp) = be_i64(rest)?;
    let (rest, min_deletion_time) = be_i64(rest)?;
    let (rest, max_deletion_time) = be_i64(rest)?;
    let (rest, ttl_flag) = be_u8(rest)?;

    // Optional TTL block, present only when the flag byte is set.
    let (rest, ttl_block) = if ttl_flag == 0 {
        (rest, None)
    } else {
        let (rest, lowest) = be_i64(rest)?;
        let (rest, highest) = be_i64(rest)?;
        let (rest, rows) = parse_vint_as_u64(rest)?;
        (rest, Some((lowest, highest, rows)))
    };

    let (min_ttl, max_ttl, rows_with_ttl) = match ttl_block {
        Some((lowest, highest, rows)) => (Some(lowest), Some(highest), rows),
        None => (None, None, 0),
    };

    let timestamp_stats = TimestampStatistics {
        min_timestamp,
        max_timestamp,
        min_deletion_time,
        max_deletion_time,
        min_ttl,
        max_ttl,
        rows_with_ttl,
    };
    Ok((rest, timestamp_stats))
}
/// Parses exactly `column_count` consecutive column-statistics entries.
///
/// A `column_count` of 0 consumes nothing and yields an empty vector. Any
/// entry that fails to parse fails the whole call.
pub fn parse_column_statistics(
    input: &[u8],
    column_count: u32,
) -> IResult<&[u8], Vec<ColumnStatistics>> {
    let entries = column_count as usize;
    let mut parse_all = count(parse_single_column_statistics, entries);
    parse_all(input)
}
/// Parses one column-statistics entry.
///
/// Layout, in order:
/// 1. length-prefixed column name and column type (decoded lossily as UTF-8),
/// 2. value count and null count as vints,
/// 3. a one-byte min/max flag; when non-zero, a length-prefixed minimum value
///    followed by a length-prefixed maximum value (both kept as raw bytes),
/// 4. big-endian `f64` average size and a vint cardinality,
/// 5. a big-endian `u32` histogram length and that many value-frequency
///    entries,
/// 6. a one-byte index flag (non-zero means `has_index`).
pub fn parse_single_column_statistics(input: &[u8]) -> IResult<&[u8], ColumnStatistics> {
    let (rest, name_len) = parse_vint_length(input)?;
    let (rest, raw_name) = take(name_len)(rest)?;
    let (rest, type_len) = parse_vint_length(rest)?;
    let (rest, raw_type) = take(type_len)(rest)?;

    let (rest, value_count) = parse_vint_as_u64(rest)?;
    let (rest, null_count) = parse_vint_as_u64(rest)?;

    let (rest, min_max_flag) = be_u8(rest)?;
    let (rest, bounds) = if min_max_flag == 0 {
        (rest, None)
    } else {
        let (rest, low_len) = parse_vint_length(rest)?;
        let (rest, low) = take(low_len)(rest)?;
        let (rest, high_len) = parse_vint_length(rest)?;
        let (rest, high) = take(high_len)(rest)?;
        (rest, Some((low.to_vec(), high.to_vec())))
    };
    let (min_value, max_value) = match bounds {
        Some((low, high)) => (Some(low), Some(high)),
        None => (None, None),
    };

    let (rest, avg_size) = be_f64(rest)?;
    let (rest, cardinality) = parse_vint_as_u64(rest)?;
    let (rest, histogram_len) = be_u32(rest)?;
    let (rest, value_histogram) = count(parse_value_frequency, histogram_len as usize)(rest)?;
    let (rest, index_flag) = be_u8(rest)?;

    let column = ColumnStatistics {
        name: String::from_utf8_lossy(raw_name).into_owned(),
        column_type: String::from_utf8_lossy(raw_type).into_owned(),
        value_count,
        null_count,
        min_value,
        max_value,
        avg_size,
        cardinality,
        value_histogram,
        has_index: index_flag != 0,
    };
    Ok((rest, column))
}
/// Parses the table-statistics section.
///
/// Layout: big-endian `u64` disk size and uncompressed size, big-endian `f64`
/// compression ratio, vint block count, big-endian `f64` average block size,
/// big-endian `u64` index size and bloom-filter size, big-endian `u32` level
/// count.
///
/// The section carries no separate compressed size; `compressed_size` is
/// filled with the same value as `disk_size`.
pub fn parse_table_statistics(input: &[u8]) -> IResult<&[u8], TableStatistics> {
    let (rest, disk_size) = be_u64(input)?;
    let (rest, uncompressed_size) = be_u64(rest)?;
    let (rest, compression_ratio) = be_f64(rest)?;
    let (rest, block_count) = parse_vint_as_u64(rest)?;
    let (rest, avg_block_size) = be_f64(rest)?;
    let (rest, index_size) = be_u64(rest)?;
    let (rest, bloom_filter_size) = be_u64(rest)?;
    let (rest, level_count) = be_u32(rest)?;

    let table_stats = TableStatistics {
        disk_size,
        uncompressed_size,
        // Mirrors the on-disk size; no dedicated field exists in the section.
        compressed_size: disk_size,
        compression_ratio,
        block_count,
        avg_block_size,
        index_size,
        bloom_filter_size,
        level_count,
    };
    Ok((rest, table_stats))
}
/// Parses the partition-statistics section.
///
/// Layout: big-endian `f64` average partition size, big-endian `u64` minimum
/// and maximum partition size, big-endian `f64` large-partition percentage,
/// a big-endian `u32` bucket count and then that many partition-size buckets.
pub fn parse_partition_statistics(input: &[u8]) -> IResult<&[u8], PartitionStatistics> {
    let (rest, avg_partition_size) = be_f64(input)?;
    let (rest, min_partition_size) = be_u64(rest)?;
    let (rest, max_partition_size) = be_u64(rest)?;
    let (rest, large_partition_percentage) = be_f64(rest)?;

    let (rest, bucket_total) = be_u32(rest)?;
    let (rest, size_histogram) =
        count(parse_partition_size_bucket, bucket_total as usize)(rest)?;

    let partition_stats = PartitionStatistics {
        avg_partition_size,
        min_partition_size,
        max_partition_size,
        size_histogram,
        large_partition_percentage,
    };
    Ok((rest, partition_stats))
}
/// Parses the compression-statistics section.
///
/// Layout: a length-prefixed algorithm name (decoded lossily as UTF-8),
/// big-endian `u64` original and compressed sizes, three big-endian `f64`
/// values (ratio, compression speed, decompression speed) and a vint count of
/// compressed blocks.
pub fn parse_compression_statistics(input: &[u8]) -> IResult<&[u8], CompressionStatistics> {
    let (rest, name_len) = parse_vint_length(input)?;
    let (rest, raw_name) = take(name_len)(rest)?;
    let (rest, original_size) = be_u64(rest)?;
    let (rest, compressed_size) = be_u64(rest)?;
    let (rest, ratio) = be_f64(rest)?;
    let (rest, compression_speed) = be_f64(rest)?;
    let (rest, decompression_speed) = be_f64(rest)?;
    let (rest, compressed_blocks) = parse_vint_as_u64(rest)?;

    let compression_stats = CompressionStatistics {
        algorithm: String::from_utf8_lossy(raw_name).into_owned(),
        original_size,
        compressed_size,
        ratio,
        compression_speed,
        decompression_speed,
        compressed_blocks,
    };
    Ok((rest, compression_stats))
}
/// Parses the trailing metadata section into a string map.
///
/// Layout: a big-endian `u32` entry count followed by that many entries, each
/// a length-prefixed key and a length-prefixed value. Keys and values are
/// decoded lossily as UTF-8. If a key occurs more than once, the last
/// occurrence wins.
pub fn parse_metadata_section(input: &[u8]) -> IResult<&[u8], HashMap<String, String>> {
    let (mut cursor, entry_total) = be_u32(input)?;
    let mut entries = HashMap::new();

    for _ in 0..entry_total {
        let (after_key_len, key_len) = parse_vint_length(cursor)?;
        let (after_key, raw_key) = take(key_len)(after_key_len)?;
        let (after_value_len, value_len) = parse_vint_length(after_key)?;
        let (after_value, raw_value) = take(value_len)(after_value_len)?;

        entries.insert(
            String::from_utf8_lossy(raw_key).into_owned(),
            String::from_utf8_lossy(raw_value).into_owned(),
        );
        cursor = after_value;
    }

    Ok((cursor, entries))
}
/// Parses one row-size histogram bucket: three vints (start, end, row count)
/// followed by a big-endian `f64` percentage.
pub fn parse_row_size_bucket(input: &[u8]) -> IResult<&[u8], RowSizeBucket> {
    let (rest, lower) = parse_vint_as_u64(input)?;
    let (rest, upper) = parse_vint_as_u64(rest)?;
    let (rest, rows_in_bucket) = parse_vint_as_u64(rest)?;
    let (rest, share) = be_f64(rest)?;

    let bucket = RowSizeBucket {
        size_start: lower,
        size_end: upper,
        count: rows_in_bucket,
        percentage: share,
    };
    Ok((rest, bucket))
}
/// Parses one partition-size histogram bucket: three vints (start, end,
/// partition count) followed by a big-endian `f64` cumulative percentage.
pub fn parse_partition_size_bucket(input: &[u8]) -> IResult<&[u8], PartitionSizeBucket> {
    let (rest, lower) = parse_vint_as_u64(input)?;
    let (rest, upper) = parse_vint_as_u64(rest)?;
    let (rest, partitions_in_bucket) = parse_vint_as_u64(rest)?;
    let (rest, running_share) = be_f64(rest)?;

    let bucket = PartitionSizeBucket {
        size_start: lower,
        size_end: upper,
        count: partitions_in_bucket,
        cumulative_percentage: running_share,
    };
    Ok((rest, bucket))
}
/// Parses one value-histogram entry: a length-prefixed raw value, a vint
/// frequency and a big-endian `f64` percentage.
pub fn parse_value_frequency(input: &[u8]) -> IResult<&[u8], ValueFrequency> {
    let (rest, raw_len) = parse_vint_length(input)?;
    let (rest, raw_value) = take(raw_len)(rest)?;
    let (rest, frequency) = parse_vint_as_u64(rest)?;
    let (rest, percentage) = be_f64(rest)?;

    let entry = ValueFrequency {
        // Copied out of the input so the entry owns its bytes.
        value: raw_value.to_vec(),
        frequency,
        percentage,
    };
    Ok((rest, entry))
}
/// Reads one vint with `parse_vint` and reinterprets it as `u64` with an `as`
/// cast.
///
/// NOTE(review): `parse_vint` presumably yields a signed value; if so, a
/// negative vint becomes a very large `u64` here — confirm that is intended.
fn parse_vint_as_u64(input: &[u8]) -> IResult<&[u8], u64> {
    parse_vint(input).map(|(rest, raw)| (rest, raw as u64))
}
/// Stateless namespace for turning parsed [`SSTableStatistics`] into a
/// [`StatisticsSummary`] of derived figures and advisory messages.
pub struct StatisticsAnalyzer;

impl StatisticsAnalyzer {
    /// Builds the full summary for `stats`.
    ///
    /// All ratios that use `total_rows` as denominator are defined as 0 when
    /// the table has no rows, so every numeric field of the result is finite
    /// as long as the floating-point inputs themselves are finite.
    pub fn analyze(stats: &SSTableStatistics) -> StatisticsSummary {
        let data_efficiency = Self::calculate_data_efficiency(stats);
        let query_performance_hints = Self::generate_query_hints(stats);
        let storage_recommendations = Self::generate_storage_recommendations(stats);
        let health_score = Self::calculate_health_score(stats);
        StatisticsSummary {
            total_rows: stats.row_stats.total_rows,
            live_data_percentage: Self::share_of_total_rows(stats, stats.row_stats.live_rows)
                * 100.0,
            compression_efficiency: stats.compression_stats.ratio * 100.0,
            timestamp_range_days: Self::calculate_timestamp_range_days(stats),
            largest_partition_mb: stats.partition_stats.max_partition_size as f64 / 1_048_576.0,
            data_efficiency,
            query_performance_hints,
            storage_recommendations,
            health_score,
        }
    }

    /// Returns `part / total_rows` as a fraction, or 0.0 for a table without
    /// rows (a bare division would yield NaN there).
    fn share_of_total_rows(stats: &SSTableStatistics, part: u64) -> f64 {
        let total = stats.row_stats.total_rows;
        if total == 0 {
            0.0
        } else {
            part as f64 / total as f64
        }
    }

    /// Mean of three fractions — live-row share, compression ratio and the
    /// share of partitions that are not "large" — scaled to a percentage.
    fn calculate_data_efficiency(stats: &SSTableStatistics) -> f64 {
        let live_ratio = Self::share_of_total_rows(stats, stats.row_stats.live_rows);
        let compression_ratio = stats.compression_stats.ratio;
        let partition_efficiency = 1.0 - (stats.partition_stats.large_partition_percentage / 100.0);
        (live_ratio + compression_ratio + partition_efficiency) / 3.0 * 100.0
    }

    /// Collects advisory messages about partitioning, tombstones and
    /// compression.
    ///
    /// NOTE(review): the compression check reads `table_stats.compression_ratio`
    /// while the health score reads `compression_stats.ratio` — confirm the
    /// two sources are meant to differ.
    fn generate_query_hints(stats: &SSTableStatistics) -> Vec<String> {
        let mut hints = Vec::new();
        if stats.partition_stats.large_partition_percentage > 10.0 {
            hints.push("Consider reviewing partition key design - high percentage of large partitions detected".to_string());
        }
        // Integer division: fires once tombstones exceed a quarter of the live rows.
        if stats.row_stats.tombstone_count > stats.row_stats.live_rows / 4 {
            hints.push("High tombstone ratio - consider running compaction".to_string());
        }
        if stats.table_stats.compression_ratio < 0.5 {
            hints.push("Low compression ratio - data may not be well-suited for current compression algorithm".to_string());
        }
        hints
    }

    /// Collects storage recommendations based on the on-disk size and the
    /// average number of rows per partition.
    fn generate_storage_recommendations(stats: &SSTableStatistics) -> Vec<String> {
        let mut recommendations = Vec::new();
        if stats.table_stats.disk_size > 1_073_741_824 {
            recommendations
                .push("Large SSTable detected - consider more frequent compaction".to_string());
        }
        if stats.row_stats.avg_rows_per_partition < 10.0 {
            recommendations.push(
                "Low average rows per partition - partition key may be too granular".to_string(),
            );
        }
        recommendations
    }

    /// Starts at 100 and subtracts up to 30 points for the tombstone share, a
    /// flat 20 points when the compression ratio is below 0.5, and the
    /// large-partition percentage itself. The result is floored at 0.
    ///
    /// A table without rows has a tombstone share of 0, so it is no longer
    /// scored 0 merely because the ratio was undefined.
    fn calculate_health_score(stats: &SSTableStatistics) -> f64 {
        let mut score = 100.0;
        let tombstone_ratio = Self::share_of_total_rows(stats, stats.row_stats.tombstone_count);
        score -= tombstone_ratio * 30.0;
        if stats.compression_stats.ratio < 0.5 {
            score -= 20.0;
        }
        score -= stats.partition_stats.large_partition_percentage;
        score.max(0.0)
    }

    /// Converts `max_timestamp - min_timestamp`, taken as microseconds, into
    /// days. The result is negative when `max_timestamp < min_timestamp`.
    fn calculate_timestamp_range_days(stats: &SSTableStatistics) -> f64 {
        // Widen before subtracting: the difference of two arbitrary `i64`
        // values (e.g. `i64::MIN` and `i64::MAX`) does not fit in an `i64`.
        let range_micros = i128::from(stats.timestamp_stats.max_timestamp)
            - i128::from(stats.timestamp_stats.min_timestamp);
        range_micros as f64 / (1_000_000.0 * 60.0 * 60.0 * 24.0)
    }
}
/// Derived figures and advisory messages produced by
/// `StatisticsAnalyzer::analyze`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatisticsSummary {
/// Copied from `row_stats.total_rows`.
pub total_rows: u64,
/// `live_rows / total_rows`, times 100.
pub live_data_percentage: f64,
/// `compression_stats.ratio`, times 100.
pub compression_efficiency: f64,
/// `max_timestamp - min_timestamp`, interpreted as microseconds and
/// converted to days.
pub timestamp_range_days: f64,
/// `partition_stats.max_partition_size` divided by 1_048_576.
pub largest_partition_mb: f64,
/// Mean of the live-row fraction, the compression ratio and
/// `1 - large_partition_percentage / 100`, times 100.
pub data_efficiency: f64,
/// Messages about partitioning, tombstones and compression.
pub query_performance_hints: Vec<String>,
/// Messages about SSTable size and partition granularity.
pub storage_recommendations: Vec<String>,
/// Starts at 100 with penalties subtracted; floored at 0.
pub health_score: f64,
}
pub fn serialize_statistics(_stats: &SSTableStatistics) -> Result<Vec<u8>> {
Err(Error::corruption(
"Statistics serialization not yet implemented",
))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_statistics_header_parsing() {
let test_data = vec![
0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
0x0F, 0x10, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x12, 0x34, 0x56, 0x78, ];
let result = parse_statistics_header(&test_data);
assert!(result.is_ok());
let (_, header) = result.unwrap();
assert_eq!(header.version, 1);
assert_eq!(header.checksum, 0x12345678);
}
#[test]
fn test_nb_format_authoritative_detection() {
let nb_data = vec![
0x00, 0x00, 0x00, 0x04, 0x26, 0x29, 0x1b, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2c, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x65, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x14, 0xd4, ];
let result = parse_statistics_header(&nb_data);
assert!(result.is_ok());
let (_, header) = result.unwrap();
assert_eq!(header.version, 4);
assert_eq!(header.statistics_kind, 0x26291b05);
assert_eq!(header.data_length, 44);
assert!(header.table_id.is_none()); }
#[test]
fn test_legacy_format_authoritative_detection() {
let legacy_data = vec![
0x00, 0x00, 0x00, 0x02, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE,
0xFF, 0x00, 0x00, 0x00, 0x00, 0x0A, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0xAB, 0xCD, 0xEF, 0x12, ];
let result = parse_statistics_header(&legacy_data);
assert!(result.is_ok());
let (_, header) = result.unwrap();
assert_eq!(header.version, 2);
assert_eq!(header.statistics_kind, 0); assert!(header.table_id.is_some()); }
#[test]
fn test_unsupported_version_rejection() {
let invalid_v0 = vec![
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ];
assert!(parse_statistics_header(&invalid_v0).is_err());
let invalid_v5 = vec![
0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, ];
assert!(parse_statistics_header(&invalid_v5).is_err());
let invalid_v255 = vec![
0x00, 0x00, 0x00, 0xFF, 0x00, 0x00, 0x00, 0x00, ];
assert!(parse_statistics_header(&invalid_v255).is_err());
}
#[test]
fn test_no_heuristics_version_4_with_short_input() {
let short_nb_data = vec![
0x00, 0x00, 0x00, 0x04, 0x26, 0x29, 0x1b, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x2c, ];
let result = parse_statistics_header(&short_nb_data);
assert!(result.is_err());
}
#[test]
fn test_statistics_analyzer() {
let stats = create_test_statistics();
let summary = StatisticsAnalyzer::analyze(&stats);
assert!(summary.total_rows > 0);
assert!(summary.health_score >= 0.0 && summary.health_score <= 100.0);
assert!(summary.live_data_percentage >= 0.0 && summary.live_data_percentage <= 100.0);
}
#[test]
fn test_parse_row_statistics() {
use super::super::vint::encode_vint;
let mut data = Vec::new();
data.extend_from_slice(&encode_vint(1000));
data.extend_from_slice(&encode_vint(900));
data.extend_from_slice(&encode_vint(100));
data.extend_from_slice(&encode_vint(50));
data.extend_from_slice(&20.0f64.to_be_bytes());
data.extend_from_slice(&0u32.to_be_bytes());
let result = parse_row_statistics(&data);
assert!(result.is_ok());
let (remaining, row_stats) = result.unwrap();
assert!(remaining.is_empty());
assert_eq!(row_stats.total_rows, 1000);
assert_eq!(row_stats.live_rows, 900);
assert_eq!(row_stats.tombstone_count, 100);
assert_eq!(row_stats.partition_count, 50);
assert_eq!(row_stats.avg_rows_per_partition, 20.0);
assert_eq!(row_stats.row_size_histogram.len(), 0);
}
#[test]
fn test_parse_row_statistics_with_histogram() {
use super::super::vint::encode_vint;
let mut data = Vec::new();
data.extend_from_slice(&encode_vint(1000));
data.extend_from_slice(&encode_vint(900));
data.extend_from_slice(&encode_vint(100));
data.extend_from_slice(&encode_vint(50));
data.extend_from_slice(&20.0f64.to_be_bytes());
data.extend_from_slice(&2u32.to_be_bytes());
data.extend_from_slice(&encode_vint(0)); data.extend_from_slice(&encode_vint(1024)); data.extend_from_slice(&encode_vint(500)); data.extend_from_slice(&50.0f64.to_be_bytes()); data.extend_from_slice(&encode_vint(1024)); data.extend_from_slice(&encode_vint(10240)); data.extend_from_slice(&encode_vint(500)); data.extend_from_slice(&50.0f64.to_be_bytes());
let result = parse_row_statistics(&data);
assert!(result.is_ok());
let (_, row_stats) = result.unwrap();
assert_eq!(row_stats.row_size_histogram.len(), 2);
assert_eq!(row_stats.row_size_histogram[0].size_start, 0);
assert_eq!(row_stats.row_size_histogram[0].size_end, 1024);
assert_eq!(row_stats.row_size_histogram[0].count, 500);
assert_eq!(row_stats.row_size_histogram[0].percentage, 50.0);
}
#[test]
fn test_parse_timestamp_statistics_no_ttl() {
let mut data = Vec::new();
data.extend_from_slice(&1000000i64.to_be_bytes()); data.extend_from_slice(&2000000i64.to_be_bytes()); data.extend_from_slice(&0i64.to_be_bytes()); data.extend_from_slice(&0i64.to_be_bytes()); data.push(0);
let result = parse_timestamp_statistics(&data);
assert!(result.is_ok());
let (remaining, ts_stats) = result.unwrap();
assert!(remaining.is_empty());
assert_eq!(ts_stats.min_timestamp, 1000000);
assert_eq!(ts_stats.max_timestamp, 2000000);
assert_eq!(ts_stats.min_deletion_time, 0);
assert_eq!(ts_stats.max_deletion_time, 0);
assert!(ts_stats.min_ttl.is_none());
assert!(ts_stats.max_ttl.is_none());
assert_eq!(ts_stats.rows_with_ttl, 0);
}
#[test]
fn test_parse_timestamp_statistics_with_ttl() {
use super::super::vint::encode_vint;
let mut data = Vec::new();
data.extend_from_slice(&1000000i64.to_be_bytes());
data.extend_from_slice(&2000000i64.to_be_bytes());
data.extend_from_slice(&0i64.to_be_bytes());
data.extend_from_slice(&0i64.to_be_bytes());
data.push(1); data.extend_from_slice(&3600i64.to_be_bytes()); data.extend_from_slice(&86400i64.to_be_bytes()); data.extend_from_slice(&encode_vint(250));
let result = parse_timestamp_statistics(&data);
assert!(result.is_ok());
let (_, ts_stats) = result.unwrap();
assert_eq!(ts_stats.min_ttl, Some(3600));
assert_eq!(ts_stats.max_ttl, Some(86400));
assert_eq!(ts_stats.rows_with_ttl, 250);
}
#[test]
fn test_parse_column_statistics_empty() {
let data = Vec::new();
let result = parse_column_statistics(&data, 0);
assert!(result.is_ok());
let (remaining, col_stats) = result.unwrap();
assert!(remaining.is_empty());
assert_eq!(col_stats.len(), 0);
}
#[test]
fn test_parse_column_statistics_single_column() {
use super::super::vint::encode_vint;
let mut data = Vec::new();
let name = b"user_id";
data.extend_from_slice(&encode_vint(name.len() as i64));
data.extend_from_slice(name);
let col_type = b"int";
data.extend_from_slice(&encode_vint(col_type.len() as i64));
data.extend_from_slice(col_type);
data.extend_from_slice(&encode_vint(1000));
data.extend_from_slice(&encode_vint(0));
data.push(0);
data.extend_from_slice(&4.0f64.to_be_bytes());
data.extend_from_slice(&encode_vint(500));
data.extend_from_slice(&0u32.to_be_bytes());
data.push(1);
let result = parse_column_statistics(&data, 1);
assert!(result.is_ok());
let (_, col_stats) = result.unwrap();
assert_eq!(col_stats.len(), 1);
assert_eq!(col_stats[0].name, "user_id");
assert_eq!(col_stats[0].column_type, "int");
assert_eq!(col_stats[0].value_count, 1000);
assert_eq!(col_stats[0].null_count, 0);
assert!(col_stats[0].min_value.is_none());
assert!(col_stats[0].max_value.is_none());
assert_eq!(col_stats[0].avg_size, 4.0);
assert_eq!(col_stats[0].cardinality, 500);
assert_eq!(col_stats[0].value_histogram.len(), 0);
assert!(col_stats[0].has_index);
}
#[test]
fn test_parse_column_statistics_with_min_max() {
use super::super::vint::encode_vint;
let mut data = Vec::new();
let name = b"score";
data.extend_from_slice(&encode_vint(name.len() as i64));
data.extend_from_slice(name);
let col_type = b"int";
data.extend_from_slice(&encode_vint(col_type.len() as i64));
data.extend_from_slice(col_type);
data.extend_from_slice(&encode_vint(500));
data.extend_from_slice(&encode_vint(10));
data.push(1);
let min_val = vec![0x00, 0x00, 0x00, 0x01];
data.extend_from_slice(&encode_vint(min_val.len() as i64));
data.extend_from_slice(&min_val);
let max_val = vec![0x00, 0x00, 0x03, 0xE8];
data.extend_from_slice(&encode_vint(max_val.len() as i64));
data.extend_from_slice(&max_val);
data.extend_from_slice(&4.0f64.to_be_bytes());
data.extend_from_slice(&encode_vint(400));
data.extend_from_slice(&0u32.to_be_bytes());
data.push(0);
let result = parse_column_statistics(&data, 1);
assert!(result.is_ok());
let (_, col_stats) = result.unwrap();
assert_eq!(col_stats.len(), 1);
assert!(col_stats[0].min_value.is_some());
assert!(col_stats[0].max_value.is_some());
assert_eq!(col_stats[0].min_value.as_ref().unwrap(), &min_val);
assert_eq!(col_stats[0].max_value.as_ref().unwrap(), &max_val);
}
#[test]
fn test_parse_table_statistics() {
use super::super::vint::encode_vint;
let mut data = Vec::new();
data.extend_from_slice(&(1024 * 1024u64).to_be_bytes()); data.extend_from_slice(&(2048 * 1024u64).to_be_bytes()); data.extend_from_slice(&0.5f64.to_be_bytes()); data.extend_from_slice(&encode_vint(100)); data.extend_from_slice(&1024.0f64.to_be_bytes()); data.extend_from_slice(&1024u64.to_be_bytes()); data.extend_from_slice(&512u64.to_be_bytes()); data.extend_from_slice(&1u32.to_be_bytes());
let result = parse_table_statistics(&data);
assert!(result.is_ok());
let (remaining, table_stats) = result.unwrap();
assert!(remaining.is_empty());
assert_eq!(table_stats.disk_size, 1024 * 1024);
assert_eq!(table_stats.uncompressed_size, 2048 * 1024);
assert_eq!(table_stats.compression_ratio, 0.5);
assert_eq!(table_stats.block_count, 100);
assert_eq!(table_stats.avg_block_size, 1024.0);
assert_eq!(table_stats.index_size, 1024);
assert_eq!(table_stats.bloom_filter_size, 512);
assert_eq!(table_stats.level_count, 1);
}
#[test]
fn test_parse_partition_statistics() {
let mut data = Vec::new();
data.extend_from_slice(&20480.0f64.to_be_bytes()); data.extend_from_slice(&1024u64.to_be_bytes()); data.extend_from_slice(&1048576u64.to_be_bytes()); data.extend_from_slice(&5.0f64.to_be_bytes()); data.extend_from_slice(&0u32.to_be_bytes());
let result = parse_partition_statistics(&data);
assert!(result.is_ok());
let (remaining, part_stats) = result.unwrap();
assert!(remaining.is_empty());
assert_eq!(part_stats.avg_partition_size, 20480.0);
assert_eq!(part_stats.min_partition_size, 1024);
assert_eq!(part_stats.max_partition_size, 1048576);
assert_eq!(part_stats.large_partition_percentage, 5.0);
assert_eq!(part_stats.size_histogram.len(), 0);
}
#[test]
fn test_parse_partition_statistics_with_histogram() {
use super::super::vint::encode_vint;
let mut data = Vec::new();
data.extend_from_slice(&20480.0f64.to_be_bytes());
data.extend_from_slice(&1024u64.to_be_bytes());
data.extend_from_slice(&1048576u64.to_be_bytes());
data.extend_from_slice(&5.0f64.to_be_bytes());
data.extend_from_slice(&2u32.to_be_bytes());
data.extend_from_slice(&encode_vint(0));
data.extend_from_slice(&encode_vint(10240));
data.extend_from_slice(&encode_vint(30));
data.extend_from_slice(&60.0f64.to_be_bytes()); data.extend_from_slice(&encode_vint(10240));
data.extend_from_slice(&encode_vint(1048576));
data.extend_from_slice(&encode_vint(20));
data.extend_from_slice(&100.0f64.to_be_bytes());
let result = parse_partition_statistics(&data);
assert!(result.is_ok());
let (_, part_stats) = result.unwrap();
assert_eq!(part_stats.size_histogram.len(), 2);
assert_eq!(part_stats.size_histogram[0].size_start, 0);
assert_eq!(part_stats.size_histogram[0].cumulative_percentage, 60.0);
assert_eq!(part_stats.size_histogram[1].cumulative_percentage, 100.0);
}
#[test]
fn test_parse_compression_statistics() {
use super::super::vint::encode_vint;
let mut data = Vec::new();
let algo = b"LZ4";
data.extend_from_slice(&encode_vint(algo.len() as i64));
data.extend_from_slice(algo);
data.extend_from_slice(&(2048 * 1024u64).to_be_bytes());
data.extend_from_slice(&(1024 * 1024u64).to_be_bytes());
data.extend_from_slice(&0.5f64.to_be_bytes());
data.extend_from_slice(&100.0f64.to_be_bytes());
data.extend_from_slice(&200.0f64.to_be_bytes());
data.extend_from_slice(&encode_vint(100));
let result = parse_compression_statistics(&data);
assert!(result.is_ok());
let (remaining, comp_stats) = result.unwrap();
assert!(remaining.is_empty());
assert_eq!(comp_stats.algorithm, "LZ4");
assert_eq!(comp_stats.original_size, 2048 * 1024);
assert_eq!(comp_stats.compressed_size, 1024 * 1024);
assert_eq!(comp_stats.ratio, 0.5);
assert_eq!(comp_stats.compression_speed, 100.0);
assert_eq!(comp_stats.decompression_speed, 200.0);
assert_eq!(comp_stats.compressed_blocks, 100);
}
#[test]
fn test_parse_compression_statistics_different_algorithms() {
use super::super::vint::encode_vint;
for algorithm in &["LZ4", "Snappy", "Deflate", "Zstd"] {
let mut data = Vec::new();
data.extend_from_slice(&encode_vint(algorithm.len() as i64));
data.extend_from_slice(algorithm.as_bytes());
data.extend_from_slice(&(1000000u64).to_be_bytes());
data.extend_from_slice(&(500000u64).to_be_bytes());
data.extend_from_slice(&0.5f64.to_be_bytes());
data.extend_from_slice(&100.0f64.to_be_bytes());
data.extend_from_slice(&200.0f64.to_be_bytes());
data.extend_from_slice(&encode_vint(50));
let result = parse_compression_statistics(&data);
assert!(result.is_ok());
let (_, comp_stats) = result.unwrap();
assert_eq!(comp_stats.algorithm, *algorithm);
}
}
#[test]
fn test_parse_metadata_section_empty() {
let mut data = Vec::new();
data.extend_from_slice(&0u32.to_be_bytes());
let result = parse_metadata_section(&data);
assert!(result.is_ok());
let (remaining, metadata) = result.unwrap();
assert!(remaining.is_empty());
assert_eq!(metadata.len(), 0);
}
#[test]
fn test_parse_metadata_section_with_entries() {
use super::super::vint::encode_vint;
let mut data = Vec::new();
data.extend_from_slice(&2u32.to_be_bytes());
let key1 = b"compaction_strategy";
data.extend_from_slice(&encode_vint(key1.len() as i64));
data.extend_from_slice(key1);
let val1 = b"LeveledCompactionStrategy";
data.extend_from_slice(&encode_vint(val1.len() as i64));
data.extend_from_slice(val1);
let key2 = b"sstable_format";
data.extend_from_slice(&encode_vint(key2.len() as i64));
data.extend_from_slice(key2);
let val2 = b"nb";
data.extend_from_slice(&encode_vint(val2.len() as i64));
data.extend_from_slice(val2);
let result = parse_metadata_section(&data);
assert!(result.is_ok());
let (remaining, metadata) = result.unwrap();
assert!(remaining.is_empty());
assert_eq!(metadata.len(), 2);
assert_eq!(
metadata.get("compaction_strategy"),
Some(&"LeveledCompactionStrategy".to_string())
);
assert_eq!(metadata.get("sstable_format"), Some(&"nb".to_string()));
}
/// Feeds a hand-assembled byte payload to `parse_statistics_file` and checks
/// that the parser consumes every byte and reports the values that were written.
///
/// NOTE(review): the section labels in the body are inferred from the order of
/// the writes and from the assertions at the end — confirm against the parser.
#[test]
fn test_parse_statistics_file() {
    use super::super::vint::encode_vint;

    let mut buf: Vec<u8> = Vec::new();

    // Leading header bytes; the first u32 is what `stats.header.version` is
    // later asserted against.
    buf.extend_from_slice(&1u32.to_be_bytes());
    buf.extend_from_slice(&[1u8; 16]);
    buf.extend_from_slice(&0u32.to_be_bytes());
    buf.extend_from_slice(&4096u64.to_be_bytes());
    buf.extend_from_slice(&0x12345678u32.to_be_bytes());

    // Presumably the row statistics: four vint counters, an f64 average and
    // a zero-length histogram.
    for counter in [1000, 900, 100, 50] {
        buf.extend_from_slice(&encode_vint(counter));
    }
    buf.extend_from_slice(&20.0f64.to_be_bytes());
    buf.extend_from_slice(&0u32.to_be_bytes());

    // Presumably the timestamp statistics: four big-endian i64 values followed
    // by a single zero byte.
    for stamp in [1_000_000i64, 2_000_000, 0, 0] {
        buf.extend_from_slice(&stamp.to_be_bytes());
    }
    buf.push(0);

    // Presumably the table statistics; the first u64 is asserted as `disk_size`.
    for size in [1024 * 1024u64, 2048 * 1024u64] {
        buf.extend_from_slice(&size.to_be_bytes());
    }
    buf.extend_from_slice(&0.5f64.to_be_bytes());
    buf.extend_from_slice(&encode_vint(100));
    buf.extend_from_slice(&1024.0f64.to_be_bytes());
    for size in [1024u64, 512] {
        buf.extend_from_slice(&size.to_be_bytes());
    }
    buf.extend_from_slice(&1u32.to_be_bytes());

    // Presumably the partition statistics; the leading f64 is asserted as
    // `avg_partition_size`.
    buf.extend_from_slice(&20480.0f64.to_be_bytes());
    for size in [1024u64, 1_048_576] {
        buf.extend_from_slice(&size.to_be_bytes());
    }
    buf.extend_from_slice(&5.0f64.to_be_bytes());
    buf.extend_from_slice(&0u32.to_be_bytes());

    // Compression section: vint-length-prefixed algorithm name, two u64 sizes,
    // three f64 values and a vint counter.
    let codec = b"LZ4";
    buf.extend_from_slice(&encode_vint(codec.len() as i64));
    buf.extend_from_slice(codec);
    for size in [2048 * 1024u64, 1024 * 1024u64] {
        buf.extend_from_slice(&size.to_be_bytes());
    }
    for value in [0.5f64, 100.0, 200.0] {
        buf.extend_from_slice(&value.to_be_bytes());
    }
    buf.extend_from_slice(&encode_vint(100));

    // Trailing zero u32; the parsed metadata map is asserted to be empty.
    buf.extend_from_slice(&0u32.to_be_bytes());

    let parsed = parse_statistics_file(&buf);
    assert!(parsed.is_ok());
    let (rest, stats) = parsed.unwrap();

    // Nothing may be left over once the payload has been parsed.
    assert!(rest.is_empty());
    assert_eq!(stats.header.version, 1);
    assert_eq!(stats.row_stats.total_rows, 1000);
    assert_eq!(stats.timestamp_stats.min_timestamp, 1_000_000);
    assert_eq!(stats.table_stats.disk_size, 1024 * 1024);
    assert_eq!(stats.partition_stats.avg_partition_size, 20480.0);
    assert_eq!(stats.compression_stats.algorithm, "LZ4");
    assert!(stats.metadata.is_empty());
}
/// Verifies that `parse_statistics_file` stops after the statistics payload and
/// hands back any bytes that follow it untouched.
///
/// NOTE(review): the section labels in the body are inferred from the order of
/// the writes — confirm against the parser.
#[test]
fn test_parse_statistics_file_with_extra_data() {
    use super::super::vint::encode_vint;

    let mut buf: Vec<u8> = Vec::new();

    // Leading header bytes.
    buf.extend_from_slice(&1u32.to_be_bytes());
    buf.extend_from_slice(&[1u8; 16]);
    buf.extend_from_slice(&0u32.to_be_bytes());
    buf.extend_from_slice(&4096u64.to_be_bytes());
    buf.extend_from_slice(&0x12345678u32.to_be_bytes());

    // Presumably the row statistics: four vint counters, an f64 average and
    // a zero-length histogram.
    for counter in [100, 90, 10, 10] {
        buf.extend_from_slice(&encode_vint(counter));
    }
    buf.extend_from_slice(&10.0f64.to_be_bytes());
    buf.extend_from_slice(&0u32.to_be_bytes());

    // Presumably the timestamp statistics: four big-endian i64 values followed
    // by a single zero byte.
    for stamp in [0i64, 1_000_000, 0, 0] {
        buf.extend_from_slice(&stamp.to_be_bytes());
    }
    buf.push(0);

    // Presumably the table statistics.
    for size in [1024u64, 2048] {
        buf.extend_from_slice(&size.to_be_bytes());
    }
    buf.extend_from_slice(&0.5f64.to_be_bytes());
    buf.extend_from_slice(&encode_vint(10));
    buf.extend_from_slice(&100.0f64.to_be_bytes());
    for size in [100u64, 50] {
        buf.extend_from_slice(&size.to_be_bytes());
    }
    buf.extend_from_slice(&1u32.to_be_bytes());

    // Presumably the partition statistics.
    buf.extend_from_slice(&1000.0f64.to_be_bytes());
    for size in [100u64, 10_000] {
        buf.extend_from_slice(&size.to_be_bytes());
    }
    buf.extend_from_slice(&1.0f64.to_be_bytes());
    buf.extend_from_slice(&0u32.to_be_bytes());

    // Compression section: vint-length-prefixed algorithm name, two u64 sizes,
    // three f64 values and a vint counter.
    let codec = b"Snappy";
    buf.extend_from_slice(&encode_vint(codec.len() as i64));
    buf.extend_from_slice(codec);
    for size in [2048u64, 1024] {
        buf.extend_from_slice(&size.to_be_bytes());
    }
    for value in [0.5f64, 100.0, 200.0] {
        buf.extend_from_slice(&value.to_be_bytes());
    }
    buf.extend_from_slice(&encode_vint(10));

    // Trailing zero u32, then bytes that do not belong to the payload.
    buf.extend_from_slice(&0u32.to_be_bytes());
    buf.extend_from_slice(b"extra_data");

    let parsed = parse_statistics_file(&buf);
    assert!(parsed.is_ok());
    let (rest, stats) = parsed.unwrap();

    // The surplus bytes must come back as the unconsumed remainder.
    assert_eq!(rest, b"extra_data");
    assert_eq!(stats.compression_stats.algorithm, "Snappy");
}
/// Builds a fixed `SSTableStatistics` fixture for the tests in this module.
///
/// Every section is populated with constant sample values; all histograms,
/// the column statistics, the metadata map and the serialization-header lists
/// are left empty.
fn create_test_statistics() -> SSTableStatistics {
    let header = StatisticsHeader {
        version: 1,
        statistics_kind: 3,
        data_length: 1024,
        metadata1: 0,
        metadata2: 0,
        metadata3: 0,
        checksum: 0x12345678,
        table_id: Some([1u8; 16]),
    };

    let row_stats = RowStatistics {
        total_rows: 1000,
        live_rows: 900,
        tombstone_count: 100,
        partition_count: 50,
        avg_rows_per_partition: 20.0,
        row_size_histogram: Vec::new(),
    };

    let timestamp_stats = TimestampStatistics {
        min_timestamp: 1_000_000,
        max_timestamp: 2_000_000,
        min_deletion_time: 0,
        max_deletion_time: 0,
        min_ttl: None,
        max_ttl: None,
        rows_with_ttl: 0,
    };

    let table_stats = TableStatistics {
        disk_size: 1024 * 1024,
        uncompressed_size: 2048 * 1024,
        compressed_size: 1024 * 1024,
        compression_ratio: 0.5,
        block_count: 100,
        avg_block_size: 1024.0,
        index_size: 1024,
        bloom_filter_size: 512,
        level_count: 1,
    };

    let partition_stats = PartitionStatistics {
        avg_partition_size: 20480.0,
        min_partition_size: 1024,
        max_partition_size: 1_048_576,
        size_histogram: Vec::new(),
        large_partition_percentage: 5.0,
    };

    let compression_stats = CompressionStatistics {
        algorithm: String::from("LZ4"),
        original_size: 2048 * 1024,
        compressed_size: 1024 * 1024,
        ratio: 0.5,
        compression_speed: 100.0,
        decompression_speed: 200.0,
        compressed_blocks: 100,
    };

    SSTableStatistics {
        header,
        row_stats,
        timestamp_stats,
        column_stats: Vec::new(),
        table_stats,
        partition_stats,
        compression_stats,
        metadata: HashMap::new(),
        serialization_header_columns: Vec::new(),
        serialization_header_partition_keys: Vec::new(),
        serialization_header_clustering_keys: Vec::new(),
    }
}
}