use crate::error::{Error, Result};
use crate::parser::vint::encode_vuint;
use crate::schema::TableSchema;
use std::io::Write;
use std::path::PathBuf;
/// Base subtracted from the minimum local deletion time when writing EncodingStats
/// (1_442_880_000 seconds after the Unix epoch, i.e. 2015-09-22T00:00:00Z).
const DELETION_TIME_EPOCH: i32 = 1_442_880_000;
/// Base subtracted from the minimum timestamp when writing EncodingStats.
/// Equal to `DELETION_TIME_EPOCH * 1_000_000`, i.e. the same instant expressed in
/// microseconds (presumably Cassandra's write-timestamp unit — verify).
const TIMESTAMP_EPOCH: i64 = DELETION_TIME_EPOCH as i64 * 1_000_000;
/// Base subtracted from the minimum TTL when writing EncodingStats.
const TTL_EPOCH: i32 = 0;

/// Number of metadata components written to the table of contents.
const NUM_COMPONENTS: u32 = 4;

// Metadata component type ids, written into the TOC in this order.
const METADATA_TYPE_VALIDATION: u32 = 0;
const METADATA_TYPE_COMPACTION: u32 = 1;
const METADATA_TYPE_STATS: u32 = 2;
const METADATA_TYPE_SERIALIZATION_HEADER: u32 = 3;
/// Running statistics collected while writing an SSTable, later serialized into
/// `Statistics.db` by `StatisticsWriter`.
///
/// The min/max fields start at "unset" sentinels (see the `Default` impl) and are
/// widened by the `update_*` methods; `finalize` replaces sentinels that were never
/// overwritten with 0.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StatisticsMetadata {
    /// Smallest timestamp recorded; `i64::MAX` while unset.
    pub min_timestamp: i64,
    /// Largest timestamp recorded; `i64::MIN` while unset.
    pub max_timestamp: i64,
    /// Smallest local deletion time recorded; `i32::MAX` while unset.
    pub min_local_deletion_time: i32,
    /// Largest local deletion time recorded; `i32::MIN` while unset.
    pub max_local_deletion_time: i32,
    /// Smallest positive TTL recorded; `i32::MAX` while unset.
    pub min_ttl: i32,
    /// Largest positive TTL recorded; 0 while unset.
    pub max_ttl: i32,
    /// Number of partitions counted so far.
    pub partition_count: u64,
    /// Number of rows counted so far.
    pub row_count: u64,
    /// Total number of columns counted so far.
    pub column_count: u64,
    /// Accumulated size of all rows, in whatever unit the caller reports.
    pub total_rows_size: u64,
}
impl Default for StatisticsMetadata {
    /// Returns an empty accumulator.
    ///
    /// Counters start at zero. Each min/max pair starts "inverted" (min at the type's
    /// maximum, max at its minimum) so the first recorded value replaces both bounds.
    /// The exception is `max_ttl`, which starts at 0 because only positive TTLs are
    /// ever recorded.
    fn default() -> Self {
        Self {
            partition_count: 0,
            row_count: 0,
            column_count: 0,
            total_rows_size: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            min_local_deletion_time: i32::MAX,
            max_local_deletion_time: i32::MIN,
            min_ttl: i32::MAX,
            max_ttl: 0,
        }
    }
}
impl StatisticsMetadata {
    /// Creates an empty accumulator; identical to `Default::default()`.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Widens the tracked timestamp range so that it includes `timestamp`.
    pub fn update_timestamp(&mut self, timestamp: i64) {
        if timestamp < self.min_timestamp {
            self.min_timestamp = timestamp;
        }
        if timestamp > self.max_timestamp {
            self.max_timestamp = timestamp;
        }
    }

    /// Widens the tracked local-deletion-time range so that it includes `deletion_time`.
    pub fn update_local_deletion_time(&mut self, deletion_time: i32) {
        if deletion_time < self.min_local_deletion_time {
            self.min_local_deletion_time = deletion_time;
        }
        if deletion_time > self.max_local_deletion_time {
            self.max_local_deletion_time = deletion_time;
        }
    }

    /// Widens the tracked TTL range so that it includes `ttl`.
    ///
    /// Zero or negative values are ignored and leave the range untouched.
    pub fn update_ttl(&mut self, ttl: i32) {
        if ttl <= 0 {
            return;
        }
        if ttl < self.min_ttl {
            self.min_ttl = ttl;
        }
        if ttl > self.max_ttl {
            self.max_ttl = ttl;
        }
    }

    /// Counts one more partition.
    pub fn increment_partition_count(&mut self) {
        self.partition_count += 1;
    }

    /// Counts one more row.
    pub fn increment_row_count(&mut self) {
        self.row_count += 1;
    }

    /// Adds `count` to the running column total.
    pub fn add_column_count(&mut self, count: u64) {
        self.column_count += count;
    }

    /// Adds `size` to the running row-size total.
    pub fn add_rows_size(&mut self, size: u64) {
        self.total_rows_size += size;
    }

    /// Replaces every min/max bound that still holds its "unset" sentinel with 0.
    ///
    /// `max_ttl` is left alone because its initial value is already 0.
    pub fn finalize(&mut self) {
        fn zero_if_unset<T: Copy + PartialEq + Default>(slot: &mut T, sentinel: T) {
            if *slot == sentinel {
                *slot = T::default();
            }
        }

        zero_if_unset(&mut self.min_timestamp, i64::MAX);
        zero_if_unset(&mut self.max_timestamp, i64::MIN);
        zero_if_unset(&mut self.min_local_deletion_time, i32::MAX);
        zero_if_unset(&mut self.max_local_deletion_time, i32::MIN);
        zero_if_unset(&mut self.min_ttl, i32::MAX);
    }
}
/// Converts a CQL type name (e.g. `map<text, int>`) into the fully-qualified
/// `org.apache.cassandra.db.marshal.*` class string used in the serialization header.
///
/// Matching is case-insensitive and ignores surrounding whitespace. Collection,
/// frozen and tuple wrappers are converted recursively. A `map` whose argument list
/// does not split into exactly two types, and any unrecognised name, falls back to
/// `BytesType`.
fn cql_type_to_marshal_type(cql_type: &str) -> String {
    const MARSHAL: &str = "org.apache.cassandra.db.marshal.";

    let normalized = cql_type.trim().to_lowercase();
    let ty = normalized.as_str();

    if let Some(element) = strip_cql_wrapper(ty, "list") {
        return format!("{MARSHAL}ListType({})", cql_type_to_marshal_type(element));
    }
    if let Some(element) = strip_cql_wrapper(ty, "set") {
        return format!("{MARSHAL}SetType({})", cql_type_to_marshal_type(element));
    }
    if let Some(inner) = strip_cql_wrapper(ty, "map") {
        let pair = split_cql_type_args(inner);
        if let [key, value] = pair.as_slice() {
            return format!(
                "{MARSHAL}MapType({},{})",
                cql_type_to_marshal_type(key),
                cql_type_to_marshal_type(value)
            );
        }
    }
    if let Some(inner) = strip_cql_wrapper(ty, "frozen") {
        return format!("{MARSHAL}FrozenType({})", cql_type_to_marshal_type(inner));
    }
    if let Some(inner) = strip_cql_wrapper(ty, "tuple") {
        let members: Vec<String> = split_cql_type_args(inner)
            .into_iter()
            .map(cql_type_to_marshal_type)
            .collect();
        return format!("{MARSHAL}TupleType({})", members.join(","));
    }

    let simple_name = match ty {
        "text" | "varchar" => "UTF8Type",
        "int" => "Int32Type",
        "bigint" => "LongType",
        "smallint" => "ShortType",
        "tinyint" => "ByteType",
        "float" => "FloatType",
        "double" => "DoubleType",
        "boolean" => "BooleanType",
        "blob" => "BytesType",
        "uuid" => "UUIDType",
        "timeuuid" => "TimeUUIDType",
        "timestamp" => "TimestampType",
        "date" => "SimpleDateType",
        "time" => "TimeType",
        "duration" => "DurationType",
        "inet" => "InetAddressType",
        "ascii" => "AsciiType",
        "decimal" => "DecimalType",
        "varint" => "IntegerType",
        "counter" => "CounterColumnType",
        _ => "BytesType",
    };
    format!("{MARSHAL}{simple_name}")
}
/// If `cql_type` starts with `wrapper` followed by an angle-bracketed argument list
/// (e.g. `list<int>` for wrapper `list`), returns the trimmed text between the outer
/// `<` and its matching `>`.
///
/// Whitespace between the wrapper keyword and `<` is accepted because CQL allows it
/// (`frozen <list<int>>`); an exact `"{wrapper}<"` match would silently miss such
/// types. Returns `None` if the prefix does not match (including when the keyword is
/// only a prefix of a longer word, e.g. `listy<int>`) or the brackets never balance.
/// Any text after the matching `>` is ignored.
fn strip_cql_wrapper<'a>(cql_type: &'a str, wrapper: &str) -> Option<&'a str> {
    let rest = cql_type
        .strip_prefix(wrapper)?
        .trim_start()
        .strip_prefix('<')?;

    // We are already inside the opening '<'; depth never underflows because we
    // return as soon as it reaches zero.
    let mut depth: usize = 1;
    for (i, ch) in rest.char_indices() {
        match ch {
            '<' => depth += 1,
            '>' => {
                depth -= 1;
                if depth == 0 {
                    return Some(rest[..i].trim());
                }
            }
            _ => {}
        }
    }
    None
}
/// Splits a comma-separated CQL type argument list at top-level commas only, so
/// commas nested inside `<...>` stay part of their argument.
///
/// Every piece is trimmed. Empty pieces between two commas are kept, but a trailing
/// piece that is empty after trimming is dropped (so `""` yields an empty vector).
fn split_cql_type_args(s: &str) -> Vec<&str> {
    let mut pieces: Vec<&str> = Vec::new();
    let mut nesting = 0;
    let mut piece_start = 0usize;

    for (idx, c) in s.char_indices() {
        if c == '<' {
            nesting += 1;
        } else if c == '>' {
            nesting -= 1;
        } else if c == ',' && nesting == 0 {
            pieces.push(s[piece_start..idx].trim());
            piece_start = idx + ','.len_utf8();
        }
    }

    match s[piece_start..].trim() {
        "" => {}
        tail => pieces.push(tail),
    }
    pieces
}
/// Writes a `Statistics.db` SSTable component to a fixed destination path.
///
/// The writer only remembers where to write; all serialization happens in
/// `StatisticsWriter::write`.
#[derive(Debug)]
pub struct StatisticsWriter {
    /// Destination file, replaced on every successful `write`.
    path: PathBuf,
}
impl StatisticsWriter {
pub fn new(path: PathBuf) -> Self {
Self { path }
}
pub fn write(&self, metadata: &StatisticsMetadata, schema: Option<&TableSchema>) -> Result<()> {
let mut meta = metadata.clone();
meta.finalize();
let validation_data = self.build_validation_component()?;
let compaction_data = self.build_compaction_component()?;
let stats_data = self.build_stats_component(&meta)?;
let header_data = self.build_serialization_header_component(schema, metadata)?;
let toc_size = 4 + 4 + (NUM_COMPONENTS as usize * 8) + 4;
let mut offset = toc_size;
let validation_offset = offset;
offset += validation_data.len() + 4;
let compaction_offset = offset;
offset += compaction_data.len() + 4;
let stats_offset = offset;
offset += stats_data.len() + 4;
let header_offset = offset;
if offset > u32::MAX as usize {
return Err(Error::Storage(format!(
"Statistics.db too large: {} bytes exceeds u32::MAX",
offset
)));
}
let mut buffer = Vec::new();
let mut crc = crc32fast::Hasher::new();
buffer.write_all(&NUM_COMPONENTS.to_be_bytes())?;
self.update_checksum_int(&mut crc, NUM_COMPONENTS);
let checksum1 = crc.clone().finalize();
buffer.write_all(&checksum1.to_be_bytes())?;
crc = crc32fast::Hasher::new();
self.update_checksum_int(&mut crc, NUM_COMPONENTS);
self.write_toc_entry(
&mut buffer,
&mut crc,
METADATA_TYPE_VALIDATION,
validation_offset as u32,
)?;
self.write_toc_entry(
&mut buffer,
&mut crc,
METADATA_TYPE_COMPACTION,
compaction_offset as u32,
)?;
self.write_toc_entry(
&mut buffer,
&mut crc,
METADATA_TYPE_STATS,
stats_offset as u32,
)?;
self.write_toc_entry(
&mut buffer,
&mut crc,
METADATA_TYPE_SERIALIZATION_HEADER,
header_offset as u32,
)?;
let toc_checksum = crc.finalize();
buffer.write_all(&toc_checksum.to_be_bytes())?;
self.write_component(&mut buffer, &validation_data)?;
self.write_component(&mut buffer, &compaction_data)?;
self.write_component(&mut buffer, &stats_data)?;
self.write_component(&mut buffer, &header_data)?;
std::fs::write(&self.path, buffer).map_err(|e| {
Error::Storage(format!(
"Failed to write Statistics.db to {}: {}",
self.path.display(),
e
))
})?;
Ok(())
}
fn update_checksum_int(&self, crc: &mut crc32fast::Hasher, value: u32) {
crc.update(&value.to_be_bytes());
}
fn write_toc_entry(
&self,
buffer: &mut Vec<u8>,
crc: &mut crc32fast::Hasher,
component_type: u32,
offset: u32,
) -> Result<()> {
buffer.write_all(&component_type.to_be_bytes())?;
self.update_checksum_int(crc, component_type);
buffer.write_all(&offset.to_be_bytes())?;
self.update_checksum_int(crc, offset);
Ok(())
}
fn write_component(&self, buffer: &mut Vec<u8>, data: &[u8]) -> Result<()> {
buffer.write_all(data)?;
let checksum = crc32fast::hash(data);
buffer.write_all(&checksum.to_be_bytes())?;
Ok(())
}
fn build_validation_component(&self) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
let partitioner = b"org.apache.cassandra.dht.Murmur3Partitioner";
let len = partitioner.len() as u16;
buffer.write_all(&len.to_be_bytes())?;
buffer.write_all(partitioner)?;
let fp_chance = 0.01f64;
buffer.write_all(&fp_chance.to_be_bytes())?;
Ok(buffer)
}
fn build_compaction_component(&self) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
const HLL_DATA: [u8; 15] = [
0xFF, 0xFF, 0xFF, 0xFE, 0x0B, 0x19, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ];
buffer.write_all(&(HLL_DATA.len() as i32).to_be_bytes())?;
buffer.write_all(&HLL_DATA)?;
Ok(buffer)
}
fn build_stats_component(&self, metadata: &StatisticsMetadata) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
self.write_estimated_histogram(&mut buffer)?;
self.write_estimated_histogram(&mut buffer)?;
buffer.write_all(&(-1i64).to_be_bytes())?; buffer.write_all(&0i32.to_be_bytes())?;
buffer.write_all(&metadata.min_timestamp.to_be_bytes())?;
buffer.write_all(&metadata.max_timestamp.to_be_bytes())?;
let min_del_time = if metadata.min_local_deletion_time == 0 {
i32::MAX
} else {
metadata.min_local_deletion_time
};
buffer.write_all(&min_del_time.to_be_bytes())?;
let max_del_time = if metadata.max_local_deletion_time == 0 {
i32::MAX
} else {
metadata.max_local_deletion_time
};
buffer.write_all(&max_del_time.to_be_bytes())?;
buffer.write_all(&metadata.min_ttl.to_be_bytes())?;
buffer.write_all(&metadata.max_ttl.to_be_bytes())?;
buffer.write_all(&(-1.0f64).to_be_bytes())?;
self.write_tombstone_histogram(&mut buffer)?;
buffer.write_all(&0i32.to_be_bytes())?;
buffer.write_all(&0i64.to_be_bytes())?;
buffer.write_all(&0i32.to_be_bytes())?;
buffer.write_all(&0i32.to_be_bytes())?;
buffer.write_all(&[0x00])?;
buffer.write_all(&metadata.column_count.to_be_bytes())?;
buffer.write_all(&metadata.row_count.to_be_bytes())?;
buffer.write_all(&(-1i64).to_be_bytes())?; buffer.write_all(&0i32.to_be_bytes())?;
buffer.write_all(&0i32.to_be_bytes())?;
buffer.write_all(&[0x00])?;
buffer.write_all(&[0x00])?;
buffer.write_all(&[0x00])?;
Ok(buffer)
}
fn write_estimated_histogram(&self, buffer: &mut Vec<u8>) -> Result<()> {
buffer.write_all(&2i32.to_be_bytes())?;
buffer.write_all(&1i64.to_be_bytes())?; buffer.write_all(&0i64.to_be_bytes())?;
buffer.write_all(&1i64.to_be_bytes())?; buffer.write_all(&0i64.to_be_bytes())?;
Ok(())
}
fn write_tombstone_histogram(&self, buffer: &mut Vec<u8>) -> Result<()> {
buffer.write_all(&0i32.to_be_bytes())?; buffer.write_all(&0i32.to_be_bytes())?; Ok(())
}
fn build_serialization_header_component(
&self,
schema: Option<&TableSchema>,
metadata: &StatisticsMetadata,
) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
let min_ts = if metadata.min_timestamp == i64::MAX {
TIMESTAMP_EPOCH as u64
} else {
metadata.min_timestamp as u64
};
let min_ts_delta = min_ts.wrapping_sub(TIMESTAMP_EPOCH as u64);
buffer.write_all(&encode_vuint(min_ts_delta))?;
let min_ldt = if metadata.min_local_deletion_time == i32::MAX {
i32::MAX as u64
} else {
metadata.min_local_deletion_time as u64
};
let min_del_delta = min_ldt.wrapping_sub(DELETION_TIME_EPOCH as u64);
buffer.write_all(&encode_vuint(min_del_delta))?;
let min_ttl = if metadata.min_ttl == i32::MAX {
0u64
} else {
metadata.min_ttl as u64
};
let min_ttl_delta = min_ttl.wrapping_sub(TTL_EPOCH as u64);
buffer.write_all(&encode_vuint(min_ttl_delta))?;
match schema {
Some(s) => {
let key_marshal = if s.partition_keys.len() > 1 {
let inner: Vec<String> = s
.partition_keys
.iter()
.map(|pk| cql_type_to_marshal_type(&pk.data_type))
.collect();
format!(
"org.apache.cassandra.db.marshal.CompositeType({})",
inner.join(",")
)
} else if !s.partition_keys.is_empty() {
cql_type_to_marshal_type(&s.partition_keys[0].data_type)
} else {
"org.apache.cassandra.db.marshal.BytesType".to_string()
};
buffer.write_all(&encode_vuint(key_marshal.len() as u64))?;
buffer.write_all(key_marshal.as_bytes())?;
buffer.write_all(&encode_vuint(s.clustering_keys.len() as u64))?;
for ck in &s.clustering_keys {
let ck_marshal = cql_type_to_marshal_type(&ck.data_type);
buffer.write_all(&encode_vuint(ck_marshal.len() as u64))?;
buffer.write_all(ck_marshal.as_bytes())?;
}
let pk_names: std::collections::HashSet<&str> =
s.partition_keys.iter().map(|k| k.name.as_str()).collect();
let ck_names: std::collections::HashSet<&str> =
s.clustering_keys.iter().map(|k| k.name.as_str()).collect();
let mut static_cols: Vec<_> = s
.columns
.iter()
.filter(|c| {
c.is_static
&& !pk_names.contains(c.name.as_str())
&& !ck_names.contains(c.name.as_str())
})
.collect();
static_cols.sort_by(|a, b| a.name.cmp(&b.name));
buffer.write_all(&encode_vuint(static_cols.len() as u64))?;
for col in &static_cols {
buffer.write_all(&encode_vuint(col.name.len() as u64))?;
buffer.write_all(col.name.as_bytes())?;
let col_marshal = cql_type_to_marshal_type(&col.data_type);
buffer.write_all(&encode_vuint(col_marshal.len() as u64))?;
buffer.write_all(col_marshal.as_bytes())?;
}
let mut regular_cols: Vec<_> = s
.columns
.iter()
.filter(|c| {
!c.is_static
&& !pk_names.contains(c.name.as_str())
&& !ck_names.contains(c.name.as_str())
})
.collect();
regular_cols.sort_by(|a, b| a.name.cmp(&b.name));
buffer.write_all(&encode_vuint(regular_cols.len() as u64))?;
for col in ®ular_cols {
buffer.write_all(&encode_vuint(col.name.len() as u64))?;
buffer.write_all(col.name.as_bytes())?;
let col_marshal = cql_type_to_marshal_type(&col.data_type);
buffer.write_all(&encode_vuint(col_marshal.len() as u64))?;
buffer.write_all(col_marshal.as_bytes())?;
}
}
None => {
let key_type = b"org.apache.cassandra.db.marshal.BytesType";
buffer.write_all(&encode_vuint(key_type.len() as u64))?;
buffer.write_all(key_type)?;
buffer.write_all(&encode_vuint(0))?;
buffer.write_all(&encode_vuint(0))?;
buffer.write_all(&encode_vuint(0))?;
}
}
Ok(buffer)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_statistics_metadata_default() {
        let meta = StatisticsMetadata::new();
        assert_eq!(meta.partition_count, 0);
        assert_eq!(meta.row_count, 0);
    }

    #[test]
    fn test_statistics_metadata_update_timestamp() {
        let mut meta = StatisticsMetadata::new();
        meta.update_timestamp(1000000);
        meta.update_timestamp(2000000);
        meta.update_timestamp(500000);
        assert_eq!(meta.min_timestamp, 500000);
        assert_eq!(meta.max_timestamp, 2000000);
    }

    #[test]
    fn test_statistics_metadata_update_ttl() {
        let mut meta = StatisticsMetadata::new();
        meta.update_ttl(3600);
        meta.update_ttl(86400);
        meta.update_ttl(1800);
        assert_eq!(meta.min_ttl, 1800);
        assert_eq!(meta.max_ttl, 86400);
    }

    #[test]
    fn test_statistics_metadata_finalize() {
        // A never-updated accumulator should finalize every sentinel to 0.
        let mut meta = StatisticsMetadata::new();
        meta.finalize();
        assert_eq!(meta.min_timestamp, 0);
        assert_eq!(meta.max_timestamp, 0);
        assert_eq!(meta.min_local_deletion_time, 0);
        assert_eq!(meta.max_local_deletion_time, 0);
        assert_eq!(meta.min_ttl, 0);
    }

    #[test]
    fn test_statistics_writer_basic() {
        let temp_dir = TempDir::new().unwrap();
        let stats_path = temp_dir.path().join("test-Statistics.db");
        let writer = StatisticsWriter::new(stats_path.clone());
        let mut meta = StatisticsMetadata::new();
        meta.update_timestamp(1000000);
        meta.update_timestamp(2000000);
        meta.min_local_deletion_time = 0;
        meta.max_local_deletion_time = 0;
        meta.min_ttl = 0;
        meta.max_ttl = 0;
        meta.partition_count = 10;
        meta.row_count = 100;
        let result = writer.write(&meta, None);
        assert!(result.is_ok(), "Write should succeed: {:?}", result);
        assert!(stats_path.exists());
        let file_size = std::fs::metadata(&stats_path).unwrap().len();
        assert!(file_size > 0, "Statistics.db should not be empty");
        let file_data = std::fs::read(&stats_path).unwrap();
        assert!(
            file_data.len() >= 44,
            "File should have at least 44 bytes (TOC)"
        );
        let num_components =
            u32::from_be_bytes([file_data[0], file_data[1], file_data[2], file_data[3]]);
        assert_eq!(num_components, 4, "Should have num_components=4");
        let checksum1 =
            u32::from_be_bytes([file_data[4], file_data[5], file_data[6], file_data[7]]);
        let expected_checksum1 = crc32fast::hash(&num_components.to_be_bytes());
        assert_eq!(
            checksum1, expected_checksum1,
            "First checksum should match CRC32(num_components)"
        );
        // The TOC checksum at bytes 40..44 is cumulative: it covers the component
        // count (bytes 0..4) and all TOC entries (bytes 8..40), skipping the count's
        // own checksum at bytes 4..8.
        let mut toc_crc = crc32fast::Hasher::new();
        toc_crc.update(&file_data[0..4]);
        toc_crc.update(&file_data[8..40]);
        let toc_checksum =
            u32::from_be_bytes([file_data[40], file_data[41], file_data[42], file_data[43]]);
        assert_eq!(
            toc_checksum,
            toc_crc.finalize(),
            "TOC checksum at byte 40 should cover the count and all TOC entries"
        );
    }

    #[test]
    fn test_build_validation_component() {
        let writer = StatisticsWriter::new(PathBuf::from("test.db"));
        let result = writer.build_validation_component();
        assert!(result.is_ok());
        let bytes = result.unwrap();
        assert!(!bytes.is_empty());
        let partitioner = b"org.apache.cassandra.dht.Murmur3Partitioner";
        assert!(bytes.windows(partitioner.len()).any(|w| w == partitioner));
        // 2-byte length + 43-byte partitioner name + 8-byte f64.
        assert_eq!(bytes.len(), 53);
    }

    #[test]
    fn test_build_stats_component() {
        let writer = StatisticsWriter::new(PathBuf::from("test.db"));
        let mut meta = StatisticsMetadata::new();
        meta.min_timestamp = 1000000;
        meta.max_timestamp = 2000000;
        meta.min_local_deletion_time = 0;
        meta.max_local_deletion_time = 0;
        meta.min_ttl = 0;
        meta.max_ttl = 0;
        meta.partition_count = 100;
        meta.row_count = 100;
        meta.column_count = 200;
        let result = writer.build_stats_component(&meta);
        assert!(result.is_ok());
        let data = result.unwrap();
        assert!(!data.is_empty());
        assert_eq!(data.len(), 188);
        // row_count follows column_count, which starts at byte 153.
        let row_count_offset = 161;
        let row_count_bytes = &data[row_count_offset..row_count_offset + 8];
        let row_count = u64::from_be_bytes(row_count_bytes.try_into().unwrap());
        assert_eq!(row_count, 100);
    }

    #[test]
    fn test_checksums_format() {
        let temp_dir = TempDir::new().unwrap();
        let stats_path = temp_dir.path().join("test-Statistics.db");
        let writer = StatisticsWriter::new(stats_path.clone());
        let mut meta = StatisticsMetadata::new();
        meta.min_timestamp = 1000000;
        meta.partition_count = 10;
        writer.write(&meta, None).unwrap();
        let file_data = std::fs::read(&stats_path).unwrap();
        let num_components =
            u32::from_be_bytes([file_data[0], file_data[1], file_data[2], file_data[3]]);
        let checksum1 =
            u32::from_be_bytes([file_data[4], file_data[5], file_data[6], file_data[7]]);
        let mut crc = crc32fast::Hasher::new();
        crc.update(&num_components.to_be_bytes());
        let expected_checksum1 = crc.finalize();
        assert_eq!(checksum1, expected_checksum1, "Count checksum should match");
        // Re-derive the cumulative TOC checksum from the parsed entries.
        let mut crc = crc32fast::Hasher::new();
        crc.update(&num_components.to_be_bytes());
        for i in 0..num_components {
            let offset = 8 + (i as usize * 8);
            let comp_type = u32::from_be_bytes([
                file_data[offset],
                file_data[offset + 1],
                file_data[offset + 2],
                file_data[offset + 3],
            ]);
            let comp_offset = u32::from_be_bytes([
                file_data[offset + 4],
                file_data[offset + 5],
                file_data[offset + 6],
                file_data[offset + 7],
            ]);
            crc.update(&comp_type.to_be_bytes());
            crc.update(&comp_offset.to_be_bytes());
        }
        let toc_checksum =
            u32::from_be_bytes([file_data[40], file_data[41], file_data[42], file_data[43]]);
        let expected_toc_checksum = crc.finalize();
        assert_eq!(
            toc_checksum, expected_toc_checksum,
            "TOC checksum should match cumulative CRC32"
        );
    }

    #[test]
    fn test_component_checksums() {
        let temp_dir = TempDir::new().unwrap();
        let stats_path = temp_dir.path().join("test-Statistics.db");
        let writer = StatisticsWriter::new(stats_path.clone());
        let mut meta = StatisticsMetadata::new();
        meta.min_timestamp = 1000000;
        meta.partition_count = 100;
        writer.write(&meta, None).unwrap();
        let file_data = std::fs::read(&stats_path).unwrap();
        let num_components =
            u32::from_be_bytes([file_data[0], file_data[1], file_data[2], file_data[3]]);
        assert_eq!(num_components, 4);
        let mut component_offsets = Vec::new();
        for i in 0..num_components {
            // +4 skips the entry's type field to reach its offset field.
            let offset = 8 + (i as usize * 8) + 4;
            let comp_offset = u32::from_be_bytes([
                file_data[offset],
                file_data[offset + 1],
                file_data[offset + 2],
                file_data[offset + 3],
            ]);
            component_offsets.push(comp_offset as usize);
        }
        for i in 0..num_components as usize {
            let comp_start = component_offsets[i];
            let comp_end = if i < component_offsets.len() - 1 {
                component_offsets[i + 1]
            } else {
                file_data.len()
            };
            // Each component is followed by its 4-byte CRC32.
            let comp_length = comp_end - comp_start - 4;
            let component_data = &file_data[comp_start..comp_start + comp_length];
            let stored_checksum = u32::from_be_bytes([
                file_data[comp_start + comp_length],
                file_data[comp_start + comp_length + 1],
                file_data[comp_start + comp_length + 2],
                file_data[comp_start + comp_length + 3],
            ]);
            let computed_checksum = crc32fast::hash(component_data);
            assert_eq!(
                stored_checksum, computed_checksum,
                "Component {} checksum mismatch",
                i
            );
        }
    }

    #[test]
    fn test_component_binary_formats() {
        let temp_dir = TempDir::new().unwrap();
        let stats_path = temp_dir.path().join("test-Statistics.db");
        let writer = StatisticsWriter::new(stats_path.clone());
        let mut meta = StatisticsMetadata::new();
        meta.min_timestamp = TIMESTAMP_EPOCH;
        meta.max_timestamp = TIMESTAMP_EPOCH + 1000000;
        meta.min_local_deletion_time = DELETION_TIME_EPOCH;
        meta.max_local_deletion_time = DELETION_TIME_EPOCH + 100;
        meta.min_ttl = 0;
        meta.max_ttl = 200;
        meta.partition_count = 50;
        meta.row_count = 150;
        meta.column_count = 300;
        writer.write(&meta, None).unwrap();
        let file_data = std::fs::read(&stats_path).unwrap();
        let num_components =
            u32::from_be_bytes([file_data[0], file_data[1], file_data[2], file_data[3]]);
        assert_eq!(num_components, 4, "Should have 4 components");
        // Offsets live in the second half of each 8-byte TOC entry starting at byte 8.
        let validation_offset =
            u32::from_be_bytes([file_data[12], file_data[13], file_data[14], file_data[15]])
                as usize;
        let compaction_offset =
            u32::from_be_bytes([file_data[20], file_data[21], file_data[22], file_data[23]])
                as usize;
        let stats_offset =
            u32::from_be_bytes([file_data[28], file_data[29], file_data[30], file_data[31]])
                as usize;
        let header_offset =
            u32::from_be_bytes([file_data[36], file_data[37], file_data[38], file_data[39]])
                as usize;
        let partitioner_len = u16::from_be_bytes([
            file_data[validation_offset],
            file_data[validation_offset + 1],
        ]);
        assert_eq!(
            partitioner_len, 43,
            "Partitioner string length should be 43"
        );
        let hll_len = i32::from_be_bytes([
            file_data[compaction_offset],
            file_data[compaction_offset + 1],
            file_data[compaction_offset + 2],
            file_data[compaction_offset + 3],
        ]);
        assert_eq!(hll_len, 15, "HLL data length should be 15 bytes");
        let hll_version = i32::from_be_bytes([
            file_data[compaction_offset + 4],
            file_data[compaction_offset + 5],
            file_data[compaction_offset + 6],
            file_data[compaction_offset + 7],
        ]);
        assert_eq!(hll_version, -2, "HLL version should be -2");
        let stats_end = header_offset;
        // Subtract the 4-byte CRC that trails the stats component.
        let stats_size = stats_end - stats_offset - 4;
        assert_eq!(stats_size, 188, "STATS component should be 188 bytes");
        // min_timestamp follows two 36-byte histograms and a 12-byte position.
        let ts_offset = stats_offset + 84;
        let min_ts = i64::from_be_bytes([
            file_data[ts_offset],
            file_data[ts_offset + 1],
            file_data[ts_offset + 2],
            file_data[ts_offset + 3],
            file_data[ts_offset + 4],
            file_data[ts_offset + 5],
            file_data[ts_offset + 6],
            file_data[ts_offset + 7],
        ]);
        assert_eq!(min_ts, TIMESTAMP_EPOCH, "Min timestamp should be preserved");
        assert_eq!(
            file_data[header_offset], 0x00,
            "EncodingStats minTimestamp delta should be 0"
        );
        assert_eq!(
            file_data[header_offset + 1],
            0x00,
            "EncodingStats minLocalDeletionTime delta should be 0"
        );
        assert_eq!(
            file_data[header_offset + 2],
            0x00,
            "EncodingStats minTTL delta should be 0"
        );
    }

    #[test]
    fn test_cql_type_to_marshal_type() {
        assert_eq!(
            cql_type_to_marshal_type("text"),
            "org.apache.cassandra.db.marshal.UTF8Type"
        );
        assert_eq!(
            cql_type_to_marshal_type("int"),
            "org.apache.cassandra.db.marshal.Int32Type"
        );
        assert_eq!(
            cql_type_to_marshal_type("bigint"),
            "org.apache.cassandra.db.marshal.LongType"
        );
        assert_eq!(
            cql_type_to_marshal_type("uuid"),
            "org.apache.cassandra.db.marshal.UUIDType"
        );
        assert_eq!(
            cql_type_to_marshal_type("blob"),
            "org.apache.cassandra.db.marshal.BytesType"
        );
        assert_eq!(
            cql_type_to_marshal_type("timestamp"),
            "org.apache.cassandra.db.marshal.TimestampType"
        );
        assert_eq!(
            cql_type_to_marshal_type("boolean"),
            "org.apache.cassandra.db.marshal.BooleanType"
        );
        assert_eq!(
            cql_type_to_marshal_type("varint"),
            "org.apache.cassandra.db.marshal.IntegerType"
        );
        assert_eq!(
            cql_type_to_marshal_type("unknown_type"),
            "org.apache.cassandra.db.marshal.BytesType"
        );
        assert_eq!(
            cql_type_to_marshal_type("list<int>"),
            "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.Int32Type)"
        );
        assert_eq!(
            cql_type_to_marshal_type("set<text>"),
            "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type)"
        );
        assert_eq!(
            cql_type_to_marshal_type("map<text, int>"),
            "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.Int32Type)"
        );
        assert_eq!(
            cql_type_to_marshal_type("frozen<list<int>>"),
            "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.Int32Type))"
        );
        assert_eq!(
            cql_type_to_marshal_type("tuple<int, text>"),
            "org.apache.cassandra.db.marshal.TupleType(org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.UTF8Type)"
        );
    }

    #[test]
    fn test_serialization_header_with_schema() {
        use crate::schema::{Column, KeyColumn, TableSchema};
        use std::collections::HashMap;
        let schema = TableSchema {
            keyspace: "test_ks".to_string(),
            table: "test_table".to_string(),
            partition_keys: vec![KeyColumn {
                name: "id".to_string(),
                data_type: "uuid".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![
                Column {
                    name: "id".to_string(),
                    data_type: "uuid".to_string(),
                    nullable: false,
                    default: None,
                    is_static: false,
                },
                Column {
                    name: "name".to_string(),
                    data_type: "text".to_string(),
                    nullable: true,
                    default: None,
                    is_static: false,
                },
                Column {
                    name: "age".to_string(),
                    data_type: "int".to_string(),
                    nullable: true,
                    default: None,
                    is_static: false,
                },
            ],
            comments: HashMap::new(),
        };
        let writer = StatisticsWriter::new(PathBuf::from("test.db"));
        let meta = StatisticsMetadata::new();
        let result = writer.build_serialization_header_component(Some(&schema), &meta);
        assert!(result.is_ok());
        let bytes = result.unwrap();
        let header_str = String::from_utf8_lossy(&bytes);
        assert!(
            header_str.contains("UUIDType"),
            "Header should contain UUIDType for uuid partition key"
        );
        assert!(
            header_str.contains("name"),
            "Header should contain column 'name'"
        );
        assert!(
            header_str.contains("age"),
            "Header should contain column 'age'"
        );
        assert!(
            header_str.contains("UTF8Type"),
            "Header should contain UTF8Type for text column"
        );
        assert!(
            header_str.contains("Int32Type"),
            "Header should contain Int32Type for int column"
        );
    }

    #[test]
    fn test_serialization_header_composite_partition_key() {
        use crate::schema::{Column, KeyColumn, TableSchema};
        use std::collections::HashMap;
        let schema = TableSchema {
            keyspace: "test_ks".to_string(),
            table: "composite_table".to_string(),
            partition_keys: vec![
                KeyColumn {
                    name: "tenant".to_string(),
                    data_type: "text".to_string(),
                    position: 0,
                },
                KeyColumn {
                    name: "id".to_string(),
                    data_type: "uuid".to_string(),
                    position: 1,
                },
            ],
            clustering_keys: vec![],
            columns: vec![
                Column {
                    name: "tenant".to_string(),
                    data_type: "text".to_string(),
                    nullable: false,
                    default: None,
                    is_static: false,
                },
                Column {
                    name: "id".to_string(),
                    data_type: "uuid".to_string(),
                    nullable: false,
                    default: None,
                    is_static: false,
                },
                Column {
                    name: "value".to_string(),
                    data_type: "int".to_string(),
                    nullable: true,
                    default: None,
                    is_static: false,
                },
            ],
            comments: HashMap::new(),
        };
        let writer = StatisticsWriter::new(PathBuf::from("test.db"));
        let meta = StatisticsMetadata::new();
        let bytes = writer
            .build_serialization_header_component(Some(&schema), &meta)
            .unwrap();
        let header_str = String::from_utf8_lossy(&bytes);
        assert!(
            header_str.contains("CompositeType("),
            "Composite PK should produce CompositeType wrapper"
        );
        assert!(
            header_str.contains("UTF8Type"),
            "CompositeType should contain UTF8Type for text PK"
        );
        assert!(
            header_str.contains("UUIDType"),
            "CompositeType should contain UUIDType for uuid PK"
        );
    }
}