use super::statistics::*;
use super::vint::parse_vuint;
use crate::error::{Error, Result};
use crate::storage::sstable::version_gate::VersionGates;
use nom::{bytes::complete::take, number::complete::be_u32, IResult};
// Component type tags that appear in the Statistics.db table of contents.
// Only METADATA_TYPE_HEADER is consulted by the TOC scan in this file; the
// other three are kept for reference, hence the dead_code allowances.
#[allow(dead_code)]
const METADATA_TYPE_VALIDATION: u32 = 0;
#[allow(dead_code)]
const METADATA_TYPE_COMPACTION: u32 = 1;
#[allow(dead_code)]
const METADATA_TYPE_STATS: u32 = 2;
const METADATA_TYPE_HEADER: u32 = 3;
// NOTE(review): presumably the base values that delta-encoded timestamps,
// deletion times and TTLs are measured against (the first two look like the
// same instant expressed in microseconds and in seconds) — their use is not
// visible in this part of the file; TODO confirm against the callers.
const TIMESTAMP_EPOCH: i64 = 1_442_880_000_000_000; const DELETION_TIME_EPOCH: i64 = 1_442_880_000; const TTL_EPOCH: i64 = 0;
// Tuple produced by the minimal EncodingStats parse, in the order it is
// destructured by `parse_nb_format_statistics_data`:
// (min_timestamp, min_deletion_time, min_ttl,
//  partition_columns, clustering_columns, regular_columns).
type EncodingStatsResult = (
i64,
i64,
Option<i64>,
Vec<super::header::ColumnInfo>,
Vec<super::header::ColumnInfo>,
Vec<super::header::ColumnInfo>,
);
// Output of the SerializationHeader parsers:
// (partition key type strings, clustering key type strings, columns).
type SerializationHeaderResult = (Vec<String>, Vec<String>, Vec<super::header::ColumnInfo>);
/// Parses the fixed-size header at the start of an nb-format Statistics.db.
///
/// The header is read as eight consecutive big-endian `u32` words. The third
/// word is consumed but not stored, and `table_id` is always left as `None`.
///
/// # Errors
///
/// Propagates the `be_u32` parser error if fewer than 32 bytes are available.
pub fn parse_nb_format_header(input: &[u8]) -> IResult<&[u8], StatisticsHeader> {
    let mut words = [0u32; 8];
    let mut rest = input;
    for slot in words.iter_mut() {
        let (tail, word) = be_u32(rest)?;
        *slot = word;
        rest = tail;
    }

    let [version, statistics_kind, _reserved1, data_length, metadata1, metadata2, metadata3, checksum] =
        words;

    let header = StatisticsHeader {
        version,
        statistics_kind,
        data_length,
        metadata1,
        metadata2,
        metadata3,
        checksum,
        table_id: None,
    };
    Ok((rest, header))
}
/// Scans the Statistics.db table of contents for the HEADER component.
///
/// The first four bytes are read as a big-endian component count; entries of
/// eight bytes each (big-endian type, big-endian offset) start at byte 8.
///
/// Returns the offset stored in the first entry whose type equals
/// `METADATA_TYPE_HEADER`, or `None` when the buffer is too small, the
/// component count exceeds 100, or no such entry exists. The returned offset
/// is not validated against the buffer length here.
fn parse_statistics_toc_for_header_offset(input: &[u8]) -> Option<usize> {
    const TOC_START: usize = 8;
    const TOC_ENTRY_SIZE: usize = 8;

    /// Reads a big-endian u32 at `at`; the caller guarantees 4 bytes exist.
    fn be_u32_at(bytes: &[u8], at: usize) -> u32 {
        u32::from_be_bytes([bytes[at], bytes[at + 1], bytes[at + 2], bytes[at + 3]])
    }

    if input.len() < 8 {
        log::debug!("Statistics.db too small for TOC: {} bytes", input.len());
        return None;
    }

    let num_components = be_u32_at(input, 0);
    log::debug!("Statistics.db TOC: {} components", num_components);
    if num_components > 100 {
        log::warn!(
            "Suspicious num_components={} in Statistics.db TOC (expected <=4)",
            num_components
        );
        return None;
    }

    let toc_size = (num_components as usize)
        .checked_mul(TOC_ENTRY_SIZE)
        .and_then(|size| size.checked_add(TOC_START))?;
    if input.len() < toc_size {
        log::debug!(
            "Statistics.db too small for {} TOC entries: {} bytes (need {})",
            num_components,
            input.len(),
            toc_size
        );
        return None;
    }

    // The region [TOC_START, toc_size) holds exactly `num_components` entries.
    let entries = input[TOC_START..toc_size].chunks_exact(TOC_ENTRY_SIZE);
    for (i, entry) in entries.enumerate() {
        let component_type = be_u32_at(entry, 0);
        let component_offset = be_u32_at(entry, 4) as usize;
        log::debug!(
            "TOC entry {}: type={} offset=0x{:x}",
            i,
            component_type,
            component_offset
        );
        if component_type == METADATA_TYPE_HEADER {
            log::debug!(
                "Found HEADER component at offset 0x{:x} ({})",
                component_offset,
                component_offset
            );
            return Some(component_offset);
        }
    }

    log::debug!("HEADER component not found in Statistics.db TOC");
    None
}
#[allow(clippy::type_complexity)]
pub fn parse_nb_format_statistics_data(
input: &[u8],
header: &StatisticsHeader,
full_input: &[u8],
gates: Option<&VersionGates>,
) -> Result<(
RowStatistics,
TimestampStatistics,
TableStatistics,
PartitionStatistics,
CompressionStatistics,
Vec<super::header::ColumnInfo>,
Vec<super::header::ColumnInfo>,
Vec<super::header::ColumnInfo>,
)> {
let header_offset = parse_statistics_toc_for_header_offset(full_input);
let result = parse_minimal_encoding_stats(input, full_input, header_offset, gates);
match result {
Ok((
_,
(
min_timestamp,
min_deletion_time,
min_ttl,
partition_columns,
clustering_columns,
regular_columns,
),
)) => {
let row_stats = RowStatistics {
total_rows: 0,
live_rows: 0,
tombstone_count: 0,
partition_count: 0,
avg_rows_per_partition: 0.0,
row_size_histogram: vec![],
};
let timestamp_stats = TimestampStatistics {
min_timestamp,
max_timestamp: min_timestamp, min_deletion_time,
max_deletion_time: min_deletion_time,
min_ttl,
max_ttl: min_ttl,
rows_with_ttl: 0,
};
let table_stats = TableStatistics {
disk_size: 0,
uncompressed_size: 0,
compressed_size: 0,
compression_ratio: 1.0,
block_count: 0,
avg_block_size: 0.0,
index_size: 0,
bloom_filter_size: 0,
level_count: 0,
};
let partition_stats = PartitionStatistics {
avg_partition_size: 0.0,
min_partition_size: 0,
max_partition_size: 0,
large_partition_percentage: 0.0,
size_histogram: vec![],
};
let compression_stats = CompressionStatistics {
algorithm: "unknown".to_string(),
original_size: 0,
compressed_size: 0,
ratio: 1.0,
compression_speed: 0.0,
decompression_speed: 0.0,
compressed_blocks: 0,
};
Ok((
row_stats,
timestamp_stats,
table_stats,
partition_stats,
compression_stats,
partition_columns,
clustering_columns,
regular_columns,
))
}
Err(e) => {
log::debug!(
"Failed to parse minimal EncodingStats from Statistics.db: {:?}",
e
);
Err(Error::UnsupportedFormat(format!(
"Failed to parse minimal nb-format Statistics.db EncodingStats: {:?}. \
This is required for delta-coded timestamp decoding. \
Header checksum: 0x{:08x}, data_length: {}",
e, header.checksum, header.data_length
)))
}
}
}
/// Heuristically locates and parses the SerializationHeader inside `input`.
///
/// Strategies are tried in this order:
/// 1. Scan (up to offset 8192) for the `org.apache.cassandra.db.marshal`
///    prefix. For each hit, look back 1..=15 bytes for a plausible length
///    prefix and try the sequential parser, or a `0x00 0x00` marker and try
///    the marker-based parser.
/// 2. Scan for a regular-column section via `parse_regular_columns`.
/// 3. Fall back to the ASCII pattern scan.
///
/// If nothing matches, three empty vectors are returned with the input
/// untouched. The only error propagated is one from `parse_regular_columns`.
fn parse_serialization_header(input: &[u8]) -> IResult<&[u8], SerializationHeaderResult> {
    const MARSHAL_PREFIX: &[u8] = b"org.apache.cassandra.db.marshal";
    const MAX_LOOKBACK: usize = 15;

    log::debug!(
        "Searching for SerializationHeader in {} bytes (max search: 8KB)",
        input.len()
    );
    let preview_hex = input
        .iter()
        .take(64)
        .map(|b| format!("{:02x}", b))
        .collect::<Vec<_>>()
        .join(" ");
    log::debug!(
        "Input buffer size: {} bytes, first 64 bytes: {}",
        input.len(),
        preview_hex
    );

    let mut search_offset = 0;
    while search_offset + MARSHAL_PREFIX.len() < input.len() && search_offset < 8192 {
        if input[search_offset..].starts_with(MARSHAL_PREFIX) {
            let context_start = search_offset.saturating_sub(10);
            let context_end = input.len().min(search_offset + 50);
            log::debug!(
                "Found 'org.apache.cassandra.db.marshal' at offset {}, context (offset-10 to offset+50): {:02x?}",
                search_offset,
                &input[context_start..context_end]
            );

            // Never look back past the start of the buffer.
            for lookback in 1..=MAX_LOOKBACK.min(search_offset) {
                let type_len_offset = search_offset - lookback;
                let first_byte = input[type_len_offset];

                // Either a printable single-byte length or the lead byte of a
                // multi-byte vint.
                let plausible_length_prefix =
                    matches!(first_byte, 0x20..=0x7F) || first_byte >= 0x80;
                if plausible_length_prefix {
                    if let Ok((remaining, (pk_types, ck_types, cols))) =
                        parse_serialization_header_sequential(&input[type_len_offset..])
                    {
                        let first_is_marshal = pk_types
                            .first()
                            .map_or(false, |t| t.contains("org.apache.cassandra.db.marshal"));
                        if first_is_marshal {
                            log::debug!(
                                "Successfully parsed SerializationHeader at offset {} (lookback: {}): pk_type={}",
                                type_len_offset,
                                lookback,
                                pk_types[0]
                            );
                            return Ok((remaining, (pk_types, ck_types, cols)));
                        }
                    }
                }

                // Legacy layout: a 0x00 0x00 marker ending at this position.
                if type_len_offset > 0
                    && first_byte == 0x00
                    && input[type_len_offset - 1] == 0x00
                {
                    let prev_offset = type_len_offset - 1;
                    if let Ok(parsed) =
                        parse_serialization_header_at_offset(&input[prev_offset..])
                    {
                        log::debug!(
                            "Successfully parsed SerializationHeader at legacy marker offset {}",
                            prev_offset
                        );
                        return Ok(parsed);
                    }
                }
            }
        }
        search_offset += 1;
    }

    log::debug!(
        "Search completed: searched {} bytes, no partition key type found",
        search_offset
    );
    log::debug!("Attempting to parse regular columns without partition key metadata");

    let (remaining, (partition_keys, columns)) = parse_regular_columns(input)?;
    if !columns.is_empty() {
        log::debug!(
            "Successfully parsed {} regular columns, {} partition keys via backtracking",
            columns.len(),
            partition_keys.len()
        );
        return Ok((remaining, (partition_keys, Vec::new(), columns)));
    }

    log::warn!(
        "Failed to locate SerializationHeader or regular columns: searched {} bytes",
        search_offset
    );

    match fallback_parse_serialization_header_ascii(input) {
        Some((pk_types, ck_types, cols)) => {
            log::debug!(
                "ASCII fallback extracted SerializationHeader: {} partition keys, {} clustering keys, {} regular columns",
                pk_types.len(),
                ck_types.len(),
                cols.len()
            );
            Ok((input, (pk_types, ck_types, cols)))
        }
        None => Ok((input, (Vec::new(), Vec::new(), Vec::new()))),
    }
}
fn parse_serialization_header_at_offset(input: &[u8]) -> IResult<&[u8], SerializationHeaderResult> {
use nom::bytes::complete::tag;
use nom::number::complete::u8 as parse_u8;
let _original_input = input;
let (input, _) = tag(b"\x00\x00")(input)?;
log::debug!("Found 0x00 0x00 marker");
let (input, partition_type_len) = parse_u8(input)?;
log::debug!("Partition key type length: {} bytes", partition_type_len);
let (input, partition_type_bytes) =
nom::bytes::complete::take(partition_type_len as usize)(input)?;
let partition_key_type = std::str::from_utf8(partition_type_bytes)
.map_err(|_| nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify)))?
.to_string();
log::debug!("Partition key type: {}", partition_key_type);
let (input, clustering_count) = parse_u8(input)?;
log::debug!("Clustering key count: {}", clustering_count);
let mut clustering_key_types = Vec::with_capacity(clustering_count as usize);
let mut input = input;
for idx in 0..clustering_count {
let (remaining, type_len) = parse_u8(input)?;
log::debug!("Clustering key {} type length: {} bytes", idx, type_len);
let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
let clustering_type = std::str::from_utf8(type_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
log::debug!("Clustering key {} type: {}", idx, clustering_type);
clustering_key_types.push(clustering_type);
input = remaining;
}
let (input, static_count) = parse_u8(input)?;
log::debug!("Static column count: {}", static_count);
let mut static_columns = Vec::with_capacity(static_count as usize);
let mut input = input;
for static_idx in 0..static_count {
let (remaining, name_len) = parse_u8(input)?;
log::debug!(
"Static column {} name length: {} bytes",
static_idx,
name_len
);
if name_len == 0 || name_len > 200 {
log::debug!(
"Static column {} name_len sanity check failed: {}",
static_idx,
name_len
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
let column_name = std::str::from_utf8(name_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
let (remaining, type_len_u64) = parse_vuint(remaining)?;
log::debug!(
"Static column {} ('{}') type length: {} bytes",
static_idx,
column_name,
type_len_u64
);
if type_len_u64 == 0 || type_len_u64 > 5000 {
log::debug!(
"Static column {} ('{}') type_len sanity check failed: {}",
static_idx,
column_name,
type_len_u64
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
if type_len_u64 > 1000 {
log::warn!(
"Unusually long static column type string: {} bytes (typical <1000)",
type_len_u64
);
}
let (remaining, type_bytes) = nom::bytes::complete::take(type_len_u64 as usize)(remaining)?;
let internal_type = std::str::from_utf8(type_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
let cql_type = convert_marshal_type_to_cql(&internal_type);
log::debug!(
"Static column {}: name='{}', type='{}' (CQL: '{}')",
static_idx,
column_name,
internal_type,
cql_type
);
static_columns.push(super::header::ColumnInfo {
name: column_name,
column_type: cql_type,
is_primary_key: false,
key_position: None,
is_static: true, is_clustering: false,
});
input = remaining;
}
log::debug!("Parsed {} static columns", static_columns.len());
let (mut input, column_count) = parse_u8(input)?;
log::debug!("Regular column count: {}", column_count);
let mut columns = Vec::with_capacity(column_count as usize + static_columns.len());
for col_idx in 0..column_count {
let (remaining, name_len) = parse_u8(input)?;
log::debug!("Column {} name length: {} bytes", col_idx, name_len);
let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
let column_name = std::str::from_utf8(name_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
let (remaining, type_len_u64) = parse_vuint(remaining)?;
log::debug!(
"Column {} ('{}') type length: {} bytes",
col_idx,
column_name,
type_len_u64
);
if type_len_u64 == 0 || type_len_u64 > 5000 {
log::debug!(
"Column {} ('{}') type_len validation failed: {}",
col_idx,
column_name,
type_len_u64
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
if type_len_u64 > 1000 {
log::warn!(
"Unusually long column type string: {} bytes (typical <1000)",
type_len_u64
);
}
let (remaining, type_bytes) = nom::bytes::complete::take(type_len_u64 as usize)(remaining)?;
let internal_type = std::str::from_utf8(type_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
input = remaining;
let cql_type = convert_marshal_type_to_cql(&internal_type);
log::debug!(
"Column {}: name='{}', type='{}' (CQL: '{}')",
col_idx,
column_name,
internal_type,
cql_type
);
columns.push(super::header::ColumnInfo {
name: column_name,
column_type: cql_type,
is_primary_key: false,
key_position: None,
is_static: false,
is_clustering: false,
});
}
let mut all_columns = static_columns;
all_columns.append(&mut columns);
log::debug!(
"Successfully parsed SerializationHeader: {} partition keys, {} clustering keys, {} static columns, {} regular columns ({} total)",
1, clustering_key_types.len(),
all_columns.iter().filter(|c| c.is_static).count(),
all_columns.iter().filter(|c| !c.is_static).count(),
all_columns.len()
);
Ok((
input,
(vec![partition_key_type], clustering_key_types, all_columns),
))
}
/// Looks backwards from `marker_offset` for a vuint-length-prefixed type
/// string that ends exactly at the marker.
///
/// Candidate start positions are tried from the byte just before the marker
/// back to at most 210 bytes earlier. A candidate is accepted when its
/// decoded length is in 10..200, the string ends at `marker_offset`, is
/// valid UTF-8 and contains `org.apache.cassandra`.
///
/// Returns `None` when `marker_offset < 3` or no candidate qualifies.
/// `marker_offset` must not exceed `input.len()`.
fn extract_partition_key_before_marker(input: &[u8], marker_offset: usize) -> Option<String> {
    const MAX_LOOKBACK: usize = 210;

    if marker_offset < 3 {
        return None;
    }
    log::debug!(
        "Backtracking from marker at offset {} (input len: {})",
        marker_offset,
        input.len()
    );

    let search_start = marker_offset.saturating_sub(MAX_LOOKBACK);
    log::debug!(
        "Searching for VInt from offset {} to {} ({} positions)",
        search_start,
        marker_offset,
        marker_offset - search_start
    );

    for vint_start in (search_start..marker_offset).rev() {
        let (after_vint, type_len) = match parse_vuint(&input[vint_start..marker_offset]) {
            Ok(parsed) => parsed,
            Err(_) => continue,
        };
        if type_len < 10 || type_len >= 200 {
            continue;
        }

        // The vint was parsed from a window ending at the marker, so whatever
        // is left over sits immediately before the marker.
        let type_start = marker_offset - after_vint.len();
        let type_len_usize = type_len as usize;
        if type_start > input.len() || type_len_usize > input.len() - type_start {
            continue;
        }
        let type_end = type_start + type_len_usize;
        if type_end != marker_offset {
            continue;
        }

        let type_str = match std::str::from_utf8(&input[type_start..type_end]) {
            Ok(text) => text,
            Err(_) => {
                log::debug!(
                    "Rejected candidate at vint_start={}: not valid UTF-8",
                    vint_start
                );
                continue;
            }
        };
        log::debug!(
            "Candidate at vint_start={}: type_len={}, type_start={}, type_end={}, str={}",
            vint_start, type_len, type_start, type_end, type_str
        );

        if type_str.contains("org.apache.cassandra") {
            log::debug!(
                "Found partition key type at offset {}: length={}, type={}",
                vint_start,
                type_len,
                type_str
            );
            return Some(type_str.to_string());
        }
        log::debug!(
            "Rejected candidate (starts_with='(': {}, contains 'org.apache.cassandra': {})",
            type_str.starts_with('('),
            type_str.contains("org.apache.cassandra")
        );
    }

    None
}
/// Heuristically scans `input` (up to offset 8192) for a run of column
/// definitions and parses it.
///
/// A candidate section starts at a `0x00` byte ("marker"). If the next byte
/// is also `0x00` the column count is the byte after that, otherwise it is
/// the byte right after the marker. Counts outside 1..=50 are skipped. Each
/// column entry is `u8` name length (1..=200) + UTF-8 name, then a vuint
/// type length (1..=5000) + UTF-8 marshal type string.
///
/// For every candidate, a partition key type is also looked for immediately
/// before the marker. Only the type found at the marker that is finally
/// accepted is returned: a type found next to a rejected marker is discarded
/// rather than leaking into the result of a later, unrelated marker.
///
/// Returns the input remaining after the parsed section together with the
/// partition key types (zero or one entry) and the parsed columns. When no
/// section parses cleanly, the untouched input and two empty vectors are
/// returned; this function never returns `Err`.
fn parse_regular_columns(
    input: &[u8],
) -> IResult<&[u8], (Vec<String>, Vec<super::header::ColumnInfo>)> {
    /// Renders bytes as space-separated lowercase hex for diagnostics.
    fn hex_dump(bytes: &[u8]) -> String {
        bytes
            .iter()
            .map(|b| format!("{:02x}", b))
            .collect::<Vec<_>>()
            .join(" ")
    }

    /// Parses `column_count` consecutive column entries starting at `start`.
    /// Returns the columns and the offset just past the last entry, or `None`
    /// (after logging the reason) as soon as any entry is implausible.
    fn parse_column_run(
        input: &[u8],
        start: usize,
        column_count: usize,
        marker_offset: usize,
    ) -> Option<(Vec<super::header::ColumnInfo>, usize)> {
        let mut pos = start;
        let mut parsed_columns = Vec::with_capacity(column_count);

        for col_idx in 0..column_count {
            if pos >= input.len() {
                log::debug!(
                    "Column {} parsing failed at offset {}: position {} exceeds buffer length {}",
                    col_idx,
                    marker_offset,
                    pos,
                    input.len()
                );
                return None;
            }

            // Column name: one length byte followed by the UTF-8 name.
            let name_len = input[pos] as usize;
            pos += 1;
            if name_len == 0 || name_len > 200 || pos + name_len > input.len() {
                log::debug!(
                    "Column {} parsing failed at offset {}: name_len sanity check failed (name_len={}, pos={}, buffer_len={})",
                    col_idx,
                    marker_offset,
                    name_len,
                    pos,
                    input.len()
                );
                return None;
            }
            let name_bytes = &input[pos..pos + name_len];
            let column_name = match std::str::from_utf8(name_bytes) {
                Ok(s) => s.to_string(),
                Err(e) => {
                    log::debug!(
                        "Column {} parsing failed at offset {}: UTF-8 decode error for column name at pos {} (len={}): {:?}, bytes: {}",
                        col_idx,
                        marker_offset,
                        pos,
                        name_len,
                        e,
                        hex_dump(name_bytes)
                    );
                    return None;
                }
            };
            pos += name_len;

            // Column type: vuint length followed by the UTF-8 marshal type.
            if pos >= input.len() {
                log::debug!(
                    "Column {} ('{}') parsing failed at offset {}: no data available for type length byte (pos={}, len={})",
                    col_idx,
                    column_name,
                    marker_offset,
                    pos,
                    input.len()
                );
                return None;
            }
            let (type_remaining, type_len_u64) = match parse_vuint(&input[pos..]) {
                Ok(parsed) => parsed,
                Err(_) => {
                    log::debug!(
                        "Column {} ('{}') parsing failed at offset {}: VInt parse error at pos {}",
                        col_idx,
                        column_name,
                        marker_offset,
                        pos
                    );
                    return None;
                }
            };
            let type_len = type_len_u64 as usize;
            pos = input.len() - type_remaining.len();
            if type_len == 0 || type_len > 5000 || pos + type_len > input.len() {
                log::debug!(
                    "Column {} ('{}') parsing failed at offset {}: type_len sanity check failed (type_len={}, pos={}, buffer_len={})",
                    col_idx,
                    column_name,
                    marker_offset,
                    type_len,
                    pos,
                    input.len()
                );
                return None;
            }
            let type_bytes = &input[pos..pos + type_len];
            let internal_type = match std::str::from_utf8(type_bytes) {
                Ok(s) => s.to_string(),
                Err(e) => {
                    log::debug!(
                        "Column {} ('{}') parsing failed at offset {}: UTF-8 decode error for column type at pos {} (len={}): {:?}, bytes: {}",
                        col_idx,
                        column_name,
                        marker_offset,
                        pos,
                        type_len,
                        e,
                        hex_dump(type_bytes)
                    );
                    return None;
                }
            };
            pos += type_len;

            parsed_columns.push(super::header::ColumnInfo {
                name: column_name,
                column_type: convert_marshal_type_to_cql(&internal_type),
                is_primary_key: false,
                key_position: None,
                is_static: false,
                is_clustering: false,
            });
        }

        Some((parsed_columns, pos))
    }

    let mut search_offset = 0;
    // The `+ 2` guard guarantees that both bytes after the marker exist, so
    // the count byte can always be read without a further bounds check.
    while search_offset + 2 < input.len() && search_offset < 8192 {
        let marker_offset = search_offset;
        search_offset += 1;

        if input[marker_offset] != 0x00 {
            continue;
        }
        let count_offset = if input[marker_offset + 1] == 0x00 {
            marker_offset + 2
        } else {
            marker_offset + 1
        };
        let column_count = input[count_offset] as usize;
        if column_count == 0 || column_count > 50 {
            continue;
        }

        log::debug!(
            "Attempting to extract partition key by backtracking from marker at offset {}",
            marker_offset
        );
        // Scoped to this marker so a rejected candidate cannot contribute a
        // partition key type to a later one.
        let partition_key_types: Vec<String> =
            match extract_partition_key_before_marker(input, marker_offset) {
                Some(pk_type) => {
                    log::debug!("Found partition key type before marker: {}", pk_type);
                    vec![pk_type]
                }
                None => {
                    log::debug!(
                        "No partition key type found via backtracking at offset {}",
                        marker_offset
                    );
                    Vec::new()
                }
            };

        // Building the hex dump is only worth it when it will be printed.
        if log::log_enabled!(log::Level::Debug) {
            let context_len = std::cmp::min(128, input.len() - marker_offset);
            log::debug!(
                "Pattern found at offset {}: count={}, next 128 bytes: {}",
                marker_offset,
                column_count,
                hex_dump(&input[marker_offset..marker_offset + context_len])
            );
        }

        let (parsed_columns, end) =
            match parse_column_run(input, count_offset + 1, column_count, marker_offset) {
                Some(run) => run,
                None => continue,
            };

        let column_names: Vec<&str> = parsed_columns.iter().map(|c| c.name.as_str()).collect();
        log::debug!(
            "Successfully parsed {} columns at offset {}: {:?}",
            parsed_columns.len(),
            marker_offset,
            column_names
        );
        if !partition_key_types.is_empty() {
            log::debug!(
                "Extracted {} partition key types via backtracking: {:?}",
                partition_key_types.len(),
                partition_key_types
            );
        }
        return Ok((&input[end..], (partition_key_types, parsed_columns)));
    }

    log::debug!(
        "Regular column section not found: searched {} bytes",
        search_offset
    );
    Ok((input, (Vec::new(), Vec::new())))
}
/// Last-resort extraction of SerializationHeader contents by scanning for
/// ASCII patterns.
///
/// Steps:
/// 1. Find the first `CompositeType(`; the text up to the next `)` (or the
///    end of input if there is none) is split on commas into partition key
///    types.
/// 2. After skipping control bytes (< 0x20), if a `(` follows, the printable
///    run after it is split on commas into clustering types, provided it
///    contains `org.apache.cassandra.db.marshal`.
/// 3. The rest is scanned for column entries shaped as: one length byte
///    (1..=64), that many `[A-Za-z0-9_]` name bytes, a `(` byte, then a
///    printable run containing `org.apach` which is taken as the type.
///
/// Step 3 previously compared 10-byte windows against the 9-byte literal
/// `org.apach`, which can never be equal, so no column was ever produced;
/// it now performs a real subsequence search.
///
/// Returns `None` when neither partition key types nor columns were found.
fn fallback_parse_serialization_header_ascii(
    input: &[u8],
) -> Option<(Vec<String>, Vec<String>, Vec<super::header::ColumnInfo>)> {
    use super::header::ColumnInfo;

    const COMPOSITE_PREFIX: &[u8] = b"CompositeType(";
    const MARSHAL_HINT: &[u8] = b"org.apach";

    fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
        haystack
            .windows(needle.len())
            .position(|window| window == needle)
    }

    /// Splits a comma-separated type list, trimming and dropping empties.
    fn split_type_list(list: &str) -> Vec<String> {
        list.split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect()
    }

    /// First index at or after `from` holding a byte >= 0x20.
    fn skip_control_bytes(input: &[u8], mut from: usize) -> usize {
        while from < input.len() && input[from] < 0x20 {
            from += 1;
        }
        from
    }

    /// First index at or after `from` holding a byte < 0x20.
    fn printable_run_end(input: &[u8], mut from: usize) -> usize {
        while from < input.len() && input[from] >= 0x20 {
            from += 1;
        }
        from
    }

    // Without a CompositeType nothing else is attempted.
    let comp_idx = find_subsequence(input, COMPOSITE_PREFIX)?;
    let start = comp_idx + COMPOSITE_PREFIX.len();
    let end = input[start..]
        .iter()
        .position(|&b| b == b')')
        .map_or(input.len(), |rel| start + rel);

    let mut partition_types = Vec::new();
    let mut clustering_types = Vec::new();
    let mut columns = Vec::new();

    if let Ok(inner) = std::str::from_utf8(&input[start..end]) {
        partition_types = split_type_list(inner);
    }

    let cursor = skip_control_bytes(input, end + 1);
    if cursor < input.len() && input[cursor] == b'(' {
        let cluster_start = cursor + 1;
        let cluster_end = printable_run_end(input, cluster_start);
        if cluster_end > cluster_start {
            if let Ok(cluster_str) = std::str::from_utf8(&input[cluster_start..cluster_end]) {
                if cluster_str.contains("org.apache.cassandra.db.marshal") {
                    clustering_types = split_type_list(cluster_str);
                }
            }
        }

        let mut idx = skip_control_bytes(input, cluster_end);
        while idx < input.len() {
            let name_len = input[idx] as usize;
            if name_len == 0 || name_len > 64 {
                idx += 1;
                continue;
            }
            let name_start = idx + 1;
            let name_end = name_start + name_len;
            if name_end > input.len() {
                break;
            }
            let name_bytes = &input[name_start..name_end];
            let name_is_identifier = name_bytes
                .iter()
                .all(|b| b.is_ascii_alphanumeric() || *b == b'_');
            if !name_is_identifier || name_end >= input.len() || input[name_end] != b'(' {
                idx += 1;
                continue;
            }

            let type_start = name_end + 1;
            let type_end = printable_run_end(input, type_start);
            let type_bytes = &input[type_start..type_end];
            // An empty run cannot contain the hint, so it is rejected here too.
            if find_subsequence(type_bytes, MARSHAL_HINT).is_none() {
                idx += 1;
                continue;
            }

            let decoded = std::str::from_utf8(name_bytes)
                .ok()
                .zip(std::str::from_utf8(type_bytes).ok());
            let (column_name, internal_type) = match decoded {
                Some((name, ty)) => (name.to_string(), ty.trim().to_string()),
                None => {
                    idx += 1;
                    continue;
                }
            };

            columns.push(ColumnInfo {
                name: column_name,
                column_type: convert_marshal_type_to_cql(&internal_type),
                is_primary_key: false,
                key_position: None,
                is_static: false,
                is_clustering: false,
            });
            idx = skip_control_bytes(input, type_end);
        }
    }

    if partition_types.is_empty() && columns.is_empty() {
        return None;
    }
    Some((partition_types, clustering_types, columns))
}
/// Returns the text before the parenthesis that closes an already-open `(`.
///
/// The argument is what follows an opening parenthesis, e.g. `A(B),C)tail`;
/// nested parentheses are balanced, and the slice up to (not including) the
/// matching `)` is returned — `A(B),C` in that example.
///
/// Returns `None` when the closing parenthesis is the very first character
/// (empty contents) or when no matching `)` exists.
fn extract_inner_type(type_with_close_paren: &str) -> Option<&str> {
    // One parenthesis is already open when scanning starts.
    let mut open_parens = 1usize;
    // '(' and ')' are ASCII, so scanning bytes yields valid char boundaries.
    for (offset, byte) in type_with_close_paren.bytes().enumerate() {
        if byte == b'(' {
            open_parens += 1;
        } else if byte == b')' {
            open_parens -= 1;
            if open_parens == 0 {
                return if offset == 0 {
                    None
                } else {
                    Some(&type_with_close_paren[..offset])
                };
            }
        }
    }
    None
}
/// Splits a type argument list on top-level commas.
///
/// Commas nested inside parentheses do not split. Each piece is trimmed and
/// empty pieces are dropped. A `)` with no matching `(` is ignored for
/// nesting purposes and reported with a warning.
fn split_type_arguments(input: &str) -> Vec<&str> {
    let mut pieces = Vec::new();
    let mut nesting = 0usize;
    let mut piece_start = 0usize;

    // All delimiters are ASCII, so byte offsets are valid slice boundaries.
    for (pos, byte) in input.bytes().enumerate() {
        match byte {
            b'(' => nesting += 1,
            b')' if nesting > 0 => nesting -= 1,
            b')' => {
                log::warn!(
                    "Unmatched closing parenthesis at position {} in type arguments: '{}'",
                    pos,
                    input
                );
            }
            b',' if nesting == 0 => {
                let piece = input[piece_start..pos].trim();
                if !piece.is_empty() {
                    pieces.push(piece);
                }
                piece_start = pos + 1;
            }
            _ => {}
        }
    }

    let last = input[piece_start..].trim();
    if !last.is_empty() {
        pieces.push(last);
    }
    pieces
}
/// Converts a Cassandra marshal type string into a CQL type name.
///
/// Handling, in order:
/// - wrapping parentheses around the whole string are stripped;
/// - anything containing a fully qualified `UserType(` is returned verbatim;
/// - `ReversedType(X)` converts as `X`;
/// - `FrozenType(X)`, `ListType(X)`, `SetType(X)` become `frozen<..>`,
///   `list<..>`, `set<..>`;
/// - `MapType(K,V)` becomes `map<k, v>`; with a single argument the key is
///   assumed to be `text`;
/// - otherwise the namespace and trailing `Type` are removed and the base
///   name is looked up in the table below, falling back to its lowercase
///   form for unknown names.
///
/// Wrapper names are accepted both fully qualified
/// (`org.apache.cassandra.db.marshal.`) and bare.
///
/// `IntegerType` maps to `varint` (Cassandra's arbitrary-precision integer);
/// only `Int32Type` is the 4-byte CQL `int`.
fn convert_marshal_type_to_cql(marshal_type: &str) -> String {
    const MARSHAL_NAMESPACE: &str = "org.apache.cassandra.db.marshal.";

    fn strip_wrapping_parens(mut value: &str) -> &str {
        loop {
            let trimmed = value.trim();
            if trimmed.starts_with('(') && trimmed.ends_with(')') && trimmed.len() > 2 {
                value = &trimmed[1..trimmed.len() - 1];
            } else {
                return trimmed;
            }
        }
    }
    fn strip_namespace(type_name: &str) -> &str {
        type_name.rsplit('.').next().unwrap_or(type_name)
    }
    fn strip_type_suffix(name: &str) -> &str {
        name.trim_end_matches("Type")
    }
    /// If `value` is `<wrapper>(...)`, optionally namespace-qualified,
    /// returns the balanced contents of the parentheses.
    fn wrapper_params<'a>(value: &'a str, wrapper: &str) -> Option<&'a str> {
        let unqualified = value.strip_prefix(MARSHAL_NAMESPACE).unwrap_or(value);
        let after_open = unqualified.strip_prefix(wrapper)?.strip_prefix('(')?;
        extract_inner_type(after_open)
    }

    let cleaned = strip_wrapping_parens(marshal_type);

    // User-defined types are passed through untouched.
    if cleaned.contains("org.apache.cassandra.db.marshal.UserType(") {
        return marshal_type.to_string();
    }

    // Reversal only affects sort order, not the CQL type.
    if let Some(inner) = wrapper_params(cleaned, "ReversedType") {
        return convert_marshal_type_to_cql(inner);
    }

    for (wrapper, cql_name) in [
        ("FrozenType", "frozen"),
        ("ListType", "list"),
        ("SetType", "set"),
    ] {
        if let Some(inner) = wrapper_params(cleaned, wrapper) {
            return format!("{}<{}>", cql_name, convert_marshal_type_to_cql(inner));
        }
    }

    if let Some(inner) = wrapper_params(cleaned, "MapType") {
        let args = split_type_arguments(inner);
        match args.as_slice() {
            [key, value] => {
                return format!(
                    "map<{}, {}>",
                    convert_marshal_type_to_cql(key),
                    convert_marshal_type_to_cql(value)
                );
            }
            [value] => {
                return format!("map<text, {}>", convert_marshal_type_to_cql(value));
            }
            // Any other arity falls through to the plain-name lookup.
            _ => {}
        }
    }

    let cleaned = strip_wrapping_parens(cleaned);
    let base = strip_type_suffix(strip_namespace(cleaned)).trim_end_matches(')');
    match base {
        "UTF8" => "text".to_string(),
        "Int32" => "int".to_string(),
        "Integer" => "varint".to_string(),
        "Long" => "bigint".to_string(),
        "Short" => "smallint".to_string(),
        "Byte" => "tinyint".to_string(),
        "SimpleDate" => "date".to_string(),
        "Timestamp" => "timestamp".to_string(),
        "Boolean" => "boolean".to_string(),
        "Decimal" => "decimal".to_string(),
        "Float" => "float".to_string(),
        "Double" => "double".to_string(),
        "Bytes" => "blob".to_string(),
        "Ascii" => "ascii".to_string(),
        "InetAddress" => "inet".to_string(),
        "UUID" => "uuid".to_string(),
        "TimeUUID" => "timeuuid".to_string(),
        "Duration" => "duration".to_string(),
        "Time" => "time".to_string(),
        "Counter" | "CounterColumn" => "counter".to_string(),
        other => other.to_lowercase(),
    }
}
/// Creates placeholder `ColumnInfo` entries for partition key components.
///
/// Column names are synthesized: a single key is named `id` when its CQL
/// type is `uuid` or `timeuuid` and `partition_key` otherwise; multiple keys
/// are named `partition_key_<index>`. `key_position` is the component index.
fn build_partition_key_columns(partition_types: &[String]) -> Vec<super::header::ColumnInfo> {
    let single_key = partition_types.len() == 1;
    let mut key_columns = Vec::with_capacity(partition_types.len());

    for (position, marshal_type) in partition_types.iter().enumerate() {
        let cql_type = convert_marshal_type_to_cql(marshal_type);
        let name = if !single_key {
            format!("partition_key_{}", position)
        } else if cql_type == "uuid" || cql_type == "timeuuid" {
            "id".to_string()
        } else {
            "partition_key".to_string()
        };
        key_columns.push(super::header::ColumnInfo {
            name,
            column_type: cql_type,
            is_primary_key: true,
            key_position: Some(position as u16),
            is_static: false,
            is_clustering: false,
        });
    }

    key_columns
}
/// Creates placeholder `ColumnInfo` entries for clustering key components.
///
/// Column names are synthesized: `clustering_key` for a single key,
/// `clustering_key_<index>` otherwise. Every entry is flagged as both a
/// primary key and a clustering column, with `key_position` set to its index.
fn build_clustering_key_columns(clustering_types: &[String]) -> Vec<super::header::ColumnInfo> {
    let single_key = clustering_types.len() == 1;
    let mut key_columns = Vec::with_capacity(clustering_types.len());

    for (position, marshal_type) in clustering_types.iter().enumerate() {
        let name = if single_key {
            "clustering_key".to_string()
        } else {
            format!("clustering_key_{}", position)
        };
        key_columns.push(super::header::ColumnInfo {
            name,
            column_type: convert_marshal_type_to_cql(marshal_type),
            is_primary_key: true,
            key_position: Some(position as u16),
            is_static: false,
            is_clustering: true,
        });
    }

    key_columns
}
fn parse_serialization_header_sequential(
input: &[u8],
) -> IResult<&[u8], SerializationHeaderResult> {
let (input, pk_type_len) = parse_vuint(input)?;
if pk_type_len == 0 || pk_type_len > 5000 {
log::debug!(
"Invalid partition key type length: {} (expected 1-2000)",
pk_type_len
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
let (input, pk_type_bytes) = nom::bytes::complete::take(pk_type_len as usize)(input)?;
let partition_key_type = std::str::from_utf8(pk_type_bytes)
.map_err(|_| nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify)))?
.to_string();
log::debug!(
"Sequential parser: partition key type (len={}): {}",
pk_type_len,
partition_key_type
);
let (input, clustering_count) = parse_vuint(input)?;
let clustering_count = clustering_count as usize;
if clustering_count > 100 {
log::debug!(
"Invalid clustering key count: {} (expected 0-100)",
clustering_count
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
log::debug!(
"Sequential parser: clustering key count: {}",
clustering_count
);
let mut clustering_key_types = Vec::with_capacity(clustering_count);
let mut input = input;
for idx in 0..clustering_count {
let (remaining, type_len) = parse_vuint(input)?;
if type_len == 0 || type_len > 5000 {
log::debug!("Invalid clustering key {} type length: {}", idx, type_len);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
let clustering_type = std::str::from_utf8(type_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
log::debug!(
"Sequential parser: clustering key {} type (len={}): {}",
idx,
type_len,
clustering_type
);
clustering_key_types.push(clustering_type);
input = remaining;
}
let (input, static_count) = parse_vuint(input)?;
let static_count = static_count as usize;
if static_count > 200 {
log::debug!(
"Invalid static column count: {} (expected 0-200)",
static_count
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
log::debug!("Sequential parser: static column count: {}", static_count);
let mut static_columns = Vec::with_capacity(static_count);
let mut input = input;
for idx in 0..static_count {
let (remaining, name_len) = parse_vuint(input)?;
if name_len == 0 || name_len > 200 {
log::debug!("Invalid static column {} name length: {}", idx, name_len);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
let column_name = std::str::from_utf8(name_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
let (remaining, type_len) = parse_vuint(remaining)?;
if type_len == 0 || type_len > 5000 {
log::debug!(
"Invalid static column '{}' type length: {}",
column_name,
type_len
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
let internal_type = std::str::from_utf8(type_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
let cql_type = convert_marshal_type_to_cql(&internal_type);
log::debug!(
"Sequential parser: static column {}: name='{}', type='{}'",
idx,
column_name,
cql_type
);
static_columns.push(super::header::ColumnInfo {
name: column_name,
column_type: cql_type,
is_primary_key: false,
key_position: None,
is_static: true,
is_clustering: false,
});
input = remaining;
}
let (input, regular_count) = parse_vuint(input)?;
let regular_count = regular_count as usize;
if regular_count > 500 {
log::debug!(
"Invalid regular column count: {} (expected 0-500)",
regular_count
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
log::debug!("Sequential parser: regular column count: {}", regular_count);
let mut regular_columns = Vec::with_capacity(regular_count);
let mut input = input;
for idx in 0..regular_count {
let (remaining, name_len) = parse_vuint(input)?;
if name_len == 0 || name_len > 200 {
log::debug!("Invalid regular column {} name length: {}", idx, name_len);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
let (remaining, name_bytes) = nom::bytes::complete::take(name_len as usize)(remaining)?;
let column_name = std::str::from_utf8(name_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
let (remaining, type_len) = parse_vuint(remaining)?;
if type_len == 0 || type_len > 5000 {
log::debug!(
"Invalid regular column '{}' type length: {}",
column_name,
type_len
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
let (remaining, type_bytes) = nom::bytes::complete::take(type_len as usize)(remaining)?;
let internal_type = std::str::from_utf8(type_bytes)
.map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Verify))
})?
.to_string();
let cql_type = convert_marshal_type_to_cql(&internal_type);
log::debug!(
"Sequential parser: regular column {}: name='{}', type='{}'",
idx,
column_name,
cql_type
);
regular_columns.push(super::header::ColumnInfo {
name: column_name,
column_type: cql_type,
is_primary_key: false,
key_position: None,
is_static: false,
is_clustering: false,
});
input = remaining;
}
let mut all_columns = static_columns;
all_columns.extend(regular_columns);
log::debug!(
"Sequential parser complete: partition_key='{}', {} clustering keys, {} total columns",
partition_key_type,
clustering_key_types.len(),
all_columns.len()
);
Ok((
input,
(vec![partition_key_type], clustering_key_types, all_columns),
))
}
fn parse_serialization_header_schema(input: &[u8]) -> IResult<&[u8], SerializationHeaderResult> {
let (input, pk_type_len) = parse_vuint(input)?;
if pk_type_len == 0 || pk_type_len > 5000 {
log::debug!("Invalid pk_type_len: {}", pk_type_len);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
if pk_type_len > 1000 {
log::warn!(
"Unusually long partition key type string: {} bytes (typical <1000)",
pk_type_len
);
}
let (input, pk_type_bytes) = take(pk_type_len as usize)(input)?;
let partition_key_type = match std::str::from_utf8(pk_type_bytes) {
Ok(s) => convert_marshal_type_to_cql(s),
Err(_) => {
log::debug!("Invalid UTF-8 in partition key type");
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
};
log::debug!(
"HEADER: Partition key type: {} ({} bytes)",
partition_key_type,
pk_type_len
);
let (input, clustering_count) = parse_vuint(input)?;
if clustering_count > 1000 {
log::warn!(
"Suspicious clustering_count={} in SerializationHeader (expected <100)",
clustering_count
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
log::debug!("HEADER: {} clustering key types", clustering_count);
let mut input = input;
let mut clustering_key_types = Vec::with_capacity(clustering_count as usize);
for i in 0..clustering_count {
let (remaining, ck_type_len) = parse_vuint(input)?;
if ck_type_len == 0 || ck_type_len > 5000 {
log::debug!("Invalid clustering key type length: {}", ck_type_len);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
if ck_type_len > 1000 {
log::warn!(
"Unusually long clustering key type string: {} bytes (typical <1000)",
ck_type_len
);
}
let (remaining, ck_type_bytes) = take(ck_type_len as usize)(remaining)?;
let ck_type = match std::str::from_utf8(ck_type_bytes) {
Ok(s) => convert_marshal_type_to_cql(s),
Err(_) => {
log::debug!("Invalid UTF-8 in clustering key type {}", i);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
};
log::debug!(
"HEADER: Clustering key {}: {} ({} bytes)",
i,
ck_type,
ck_type_len
);
clustering_key_types.push(ck_type);
input = remaining;
}
let (input, static_count) = parse_vuint(input)?;
if static_count > 10000 {
log::warn!(
"Suspicious static_count={} in SerializationHeader (expected <1000)",
static_count
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
log::debug!("HEADER: {} static columns", static_count);
let mut input = input;
let mut static_columns = Vec::with_capacity(static_count as usize);
for i in 0..static_count {
let (remaining, name_len) = parse_vuint(input)?;
if name_len == 0 || name_len > 200 {
log::debug!("Invalid static column name length: {}", name_len);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
let (remaining, name_bytes) = take(name_len as usize)(remaining)?;
let column_name = match std::str::from_utf8(name_bytes) {
Ok(s) => s.to_string(),
Err(_) => {
log::debug!("Invalid UTF-8 in static column name {}", i);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
};
let (remaining, type_len) = parse_vuint(remaining)?;
if type_len == 0 || type_len > 5000 {
log::debug!("Invalid static column type length: {}", type_len);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
if type_len > 1000 {
log::warn!(
"Unusually long static column type string: {} bytes (typical <1000)",
type_len
);
}
let (remaining, type_bytes) = take(type_len as usize)(remaining)?;
let cql_type = match std::str::from_utf8(type_bytes) {
Ok(s) => convert_marshal_type_to_cql(s),
Err(_) => {
log::debug!("Invalid UTF-8 in static column type {}", i);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
};
log::debug!(
"HEADER: Static column '{}': {} ({} bytes)",
column_name,
cql_type,
type_len
);
static_columns.push(super::header::ColumnInfo {
name: column_name,
column_type: cql_type,
is_primary_key: false,
key_position: None,
is_static: true,
is_clustering: false,
});
input = remaining;
}
let (input, regular_count) = parse_vuint(input)?;
if regular_count > 10000 {
log::warn!(
"Suspicious regular_count={} in SerializationHeader (expected <1000)",
regular_count
);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
log::debug!("HEADER: {} regular columns", regular_count);
let mut input = input;
let mut regular_columns = Vec::with_capacity(regular_count as usize);
for i in 0..regular_count {
let (remaining, name_len) = parse_vuint(input)?;
if name_len == 0 || name_len > 200 {
log::debug!("Invalid regular column name length: {}", name_len);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
let (remaining, name_bytes) = take(name_len as usize)(remaining)?;
let column_name = match std::str::from_utf8(name_bytes) {
Ok(s) => s.to_string(),
Err(_) => {
log::debug!("Invalid UTF-8 in regular column name {}", i);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
};
let (remaining, type_len) = parse_vuint(remaining)?;
if type_len == 0 || type_len > 5000 {
log::debug!("Invalid regular column type length: {}", type_len);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
if type_len > 1000 {
log::warn!(
"Unusually long regular column type string: {} bytes (typical <1000)",
type_len
);
}
let (remaining, type_bytes) = take(type_len as usize)(remaining)?;
let cql_type = match std::str::from_utf8(type_bytes) {
Ok(s) => convert_marshal_type_to_cql(s),
Err(_) => {
log::debug!("Invalid UTF-8 in regular column type {}", i);
return Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)));
}
};
log::debug!(
"HEADER: Regular column '{}': {} ({} bytes)",
column_name,
cql_type,
type_len
);
regular_columns.push(super::header::ColumnInfo {
name: column_name,
column_type: cql_type,
is_primary_key: false,
key_position: None,
is_static: false,
is_clustering: false,
});
input = remaining;
}
let mut all_columns = static_columns;
all_columns.extend(regular_columns);
log::debug!(
"HEADER parsing complete: partition_key='{}', {} clustering keys, {} total columns",
partition_key_type,
clustering_key_types.len(),
all_columns.len()
);
Ok((
input,
(vec![partition_key_type], clustering_key_types, all_columns),
))
}
/// Extracts the EncodingStats values and the schema of the SerializationHeader.
///
/// When `header_offset` is `Some` and lies inside `full_input`, parsing starts at that offset:
/// first the three EncodingStats vuints, then the schema. If the schema cannot be parsed
/// there, `parse_serialization_header` is run over `input` instead. When the offset is
/// missing or out of range, the whole job is delegated to `parse_encoding_stats_fallback`.
///
/// On success the returned remaining slice is `input` itself, unchanged.
fn parse_minimal_encoding_stats<'a>(
    input: &'a [u8],
    full_input: &'a [u8],
    header_offset: Option<usize>,
    gates: Option<&VersionGates>,
) -> IResult<&'a [u8], EncodingStatsResult> {
    let offset = match header_offset {
        Some(offset) => offset,
        None => {
            log::debug!("No HEADER TOC offset, using fallback EncodingStats parsing");
            return parse_encoding_stats_fallback(input, gates);
        }
    };

    let total_len = full_input.len();
    if total_len <= offset {
        log::warn!(
            "TOC offset 0x{:x} exceeds input length {}, using fallback",
            offset,
            total_len
        );
        return parse_encoding_stats_fallback(input, gates);
    }

    let header_data = &full_input[offset..];
    log::debug!(
        "Parsing EncodingStats + SerializationHeader at TOC offset 0x{:x} ({} bytes available)",
        offset,
        header_data.len()
    );

    let (after_stats, stats) = parse_encoding_stats_vuints(header_data, gates)?;
    let (min_timestamp, min_deletion_time, min_ttl) = stats;
    log::debug!(
        "EncodingStats from HEADER: min_timestamp={}, min_deletion_time={}, min_ttl={:?}",
        min_timestamp,
        min_deletion_time,
        min_ttl
    );

    // Prefer the schema that directly follows the stats; otherwise search `input` for it.
    let schema = match parse_serialization_header_schema(after_stats) {
        Ok((_, schema)) => schema,
        Err(e) => {
            log::warn!(
                "Schema parsing after EncodingStats failed: {:?}, falling back to marker search",
                e
            );
            let (_, schema) = parse_serialization_header(input)?;
            schema
        }
    };
    let (partition_types, clustering_types, columns) = schema;

    let (partition_key_columns, clustering_key_columns) =
        build_column_infos(&partition_types, &clustering_types);

    let stats_result = (
        min_timestamp,
        min_deletion_time,
        min_ttl,
        partition_key_columns,
        clustering_key_columns,
        columns,
    );
    Ok((input, stats_result))
}
/// Reads the three EncodingStats deltas (timestamp, local deletion time, TTL) as consecutive
/// vuints and rebases each one on its epoch constant.
///
/// # Parameters
///
/// * `input` - bytes positioned at the first delta.
/// * `_gates` - currently unused.
///
/// # Returns
///
/// The unconsumed input and `(min_timestamp, min_deletion_time, Some(min_ttl))`.
///
/// # Errors
///
/// Propagates any error produced by `parse_vuint`.
fn parse_encoding_stats_vuints<'a>(
    input: &'a [u8],
    _gates: Option<&VersionGates>,
) -> IResult<&'a [u8], (i64, i64, Option<i64>)> {
    let (rest, min_ts_delta) = parse_vuint(input)?;
    let (rest, min_ldt_delta) = parse_vuint(rest)?;
    let (rest, min_ttl_delta) = parse_vuint(rest)?;

    // The deltas are reinterpreted as signed 64-bit values. A plain `+` would panic in builds
    // with overflow checks whenever the reinterpreted delta is within an epoch of `i64::MAX`
    // (corrupt data, or presumably a writer that stored `value - epoch` with wrap-around —
    // TODO confirm against the file format). `wrapping_add` yields the two's-complement
    // result in every build profile.
    let min_timestamp = (min_ts_delta as i64).wrapping_add(TIMESTAMP_EPOCH);
    let min_deletion_time = (min_ldt_delta as i64).wrapping_add(DELETION_TIME_EPOCH);
    let min_ttl = (min_ttl_delta as i64).wrapping_add(TTL_EPOCH);

    Ok((rest, (min_timestamp, min_deletion_time, Some(min_ttl))))
}
/// Turns the partition and clustering key type lists of a SerializationHeader into
/// `ColumnInfo` vectors by delegating to `build_partition_key_columns` and
/// `build_clustering_key_columns`.
///
/// Returns `(partition key columns, clustering key columns)`.
fn build_column_infos(
    partition_types: &[String],
    clustering_types: &[String],
) -> (
    Vec<super::header::ColumnInfo>,
    Vec<super::header::ColumnInfo>,
) {
    let key_columns = (
        build_partition_key_columns(partition_types),
        build_clustering_key_columns(clustering_types),
    );
    log::debug!(
        "Constructed ColumnInfo entries from SerializationHeader: {} partition keys, {} clustering keys",
        key_columns.0.len(),
        key_columns.1.len()
    );
    key_columns
}
/// Fallback path used when no usable HEADER offset is available.
///
/// Skips, in order, a big-endian `u32`, a vuint, a vuint-length-prefixed byte run and two
/// more vuints, then reads the EncodingStats deltas and hands the rest of the data to
/// `parse_serialization_header` to obtain the schema.
///
/// On success the returned remaining slice is `input` itself, unchanged.
fn parse_encoding_stats_fallback<'a>(
    input: &'a [u8],
    gates: Option<&VersionGates>,
) -> IResult<&'a [u8], EncodingStatsResult> {
    // Leading fields whose values are not needed here.
    let (cursor, _metadata_type) = be_u32(input)?;
    let (cursor, _data_length) = parse_vuint(cursor)?;

    // Partitioner name: only its length matters, the bytes are skipped.
    let (cursor, partitioner_len) = parse_vuint(cursor)?;
    let (cursor, _) = take(partitioner_len as usize)(cursor)?;

    let (cursor, _metadata1) = parse_vuint(cursor)?;
    let (cursor, _metadata2) = parse_vuint(cursor)?;

    let (cursor, stats) = parse_encoding_stats_vuints(cursor, gates)?;
    let (min_timestamp, min_deletion_time, min_ttl) = stats;

    let (_, schema) = parse_serialization_header(cursor)?;
    let (partition_types, clustering_types, columns) = schema;

    let (partition_key_columns, clustering_key_columns) =
        build_column_infos(&partition_types, &clustering_types);

    let stats_result = (
        min_timestamp,
        min_deletion_time,
        min_ttl,
        partition_key_columns,
        clustering_key_columns,
        columns,
    );
    Ok((input, stats_result))
}
pub fn parse_enhanced_statistics_file<'a>(
input: &'a [u8],
gates: Option<&VersionGates>,
) -> IResult<&'a [u8], SSTableStatistics> {
let (remaining, header) = parse_nb_format_header(input)?;
let result = parse_nb_format_statistics_data(remaining, &header, input, gates);
match result {
Ok((
row_stats,
timestamp_stats,
table_stats,
partition_stats,
compression_stats,
partition_columns,
clustering_columns,
columns,
)) => {
log::debug!(
"Successfully parsed Statistics.db serialization header: {} partition keys, {} clustering keys, {} regular columns",
partition_columns.len(),
clustering_columns.len(),
columns.len()
);
let statistics = SSTableStatistics {
header,
row_stats,
timestamp_stats,
column_stats: vec![],
table_stats,
partition_stats,
compression_stats,
metadata: std::collections::HashMap::new(),
serialization_header_columns: columns,
serialization_header_partition_keys: partition_columns,
serialization_header_clustering_keys: clustering_columns,
};
Ok((remaining, statistics))
}
Err(e) => {
log::warn!("Failed to parse nb-format Statistics.db: {}", e);
Err(nom::Err::Error(nom::error::Error::new(
input,
nom::error::ErrorKind::Verify,
)))
}
}
}
/// Entry point for parsing a `Statistics.db` buffer.
///
/// Forwards both arguments to [`parse_enhanced_statistics_file`] and returns its result
/// unchanged; no other parser is attempted here.
pub fn parse_statistics_with_fallback<'a>(
    input: &'a [u8],
    gates: Option<&VersionGates>,
) -> IResult<&'a [u8], SSTableStatistics> {
    // Single delegation: success and failure are both passed straight through.
    parse_enhanced_statistics_file(input, gates)
}
// Unit tests for the Statistics.db parsers. Every fixture is assembled byte by byte; the
// single-byte values pushed before each string equal that string's length.
#[cfg(test)]
mod tests {
use super::*;
// A header with one partition key type, no clustering keys and two columns ("id", "name")
// placed after 100 bytes of 0xFF padding must be found and decoded.
#[test]
fn test_serialization_header_with_no_clustering_keys() {
let mut test_data = vec![];
// Note: the partition type literal starts with '(' and its length byte includes it.
let partition_type = b"(org.apache.cassandra.db.marshal.UUIDType";
// Two zero bytes precede the length-prefixed partition type (their meaning is defined by
// parse_serialization_header, which is outside this module).
test_data.extend_from_slice(&[0x00, 0x00]); test_data.push(partition_type.len() as u8);
test_data.extend_from_slice(partition_type);
// Presumably clustering key count = 0 (matches the assertion below).
test_data.push(0x00);
// Presumably static column count = 0, regular column count = 2.
test_data.push(0x00); test_data.push(0x02);
// Column entries: length-prefixed name, then length-prefixed marshal type (0x28 = 40 bytes).
test_data.push(0x02); test_data.extend_from_slice(b"id");
test_data.push(0x28); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UUIDType");
test_data.push(0x04); test_data.extend_from_slice(b"name");
test_data.push(0x28); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
// Prefix the header with 100 bytes of 0xFF so it does not start at offset 0.
let mut full_data = vec![0xFF; 100];
full_data.extend_from_slice(&test_data);
let result = parse_serialization_header(&full_data);
assert!(
result.is_ok(),
"Failed to parse SerializationHeader: {:?}",
result.as_ref().err()
);
let (_remaining, (partition_types, clustering_types, columns)) = result.unwrap();
assert_eq!(partition_types.len(), 1, "Expected 1 partition key");
assert!(partition_types[0].contains("UUIDType"));
assert_eq!(clustering_types.len(), 0, "Expected 0 clustering keys");
assert_eq!(columns.len(), 2, "Expected 2 columns");
assert_eq!(columns[0].name, "id");
assert_eq!(columns[0].column_type, "uuid");
assert_eq!(columns[1].name, "name");
assert_eq!(columns[1].column_type, "text");
}
// Same layout as above, but with two clustering key types (a ReversedType wrapper and a
// UTF8Type) between the partition key and the columns.
#[test]
fn test_serialization_header_with_clustering_keys() {
let mut test_data = vec![];
let partition_type = b"(org.apache.cassandra.db.marshal.UUIDType";
test_data.extend_from_slice(&[0x00, 0x00]); test_data.push(partition_type.len() as u8);
test_data.extend_from_slice(partition_type);
// Presumably clustering key count = 2, followed by two length-prefixed type strings.
test_data.push(0x02);
let ck1 =
b"[org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimestampType)";
test_data.push(ck1.len() as u8);
test_data.extend_from_slice(ck1);
let ck2 = b"(org.apache.cassandra.db.marshal.UTF8Type)";
test_data.push(ck2.len() as u8);
test_data.extend_from_slice(ck2);
// Presumably static column count = 0, regular column count = 2.
test_data.push(0x00); test_data.push(0x02);
test_data.push(0x04); test_data.extend_from_slice(b"data");
test_data.push(0x28); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
test_data.push(0x05); test_data.extend_from_slice(b"value");
// 0x29 = 41 bytes, the length of the Int32Type marshal name.
test_data.push(0x29); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.Int32Type");
let mut full_data = vec![0xFF; 100];
full_data.extend_from_slice(&test_data);
let result = parse_serialization_header(&full_data);
assert!(
result.is_ok(),
"Failed to parse SerializationHeader with clustering keys: {:?}",
result.err()
);
let (_remaining, (partition_types, clustering_types, columns)) = result.unwrap();
assert_eq!(partition_types.len(), 1);
assert!(partition_types[0].contains("UUIDType"));
assert_eq!(clustering_types.len(), 2, "Expected 2 clustering keys");
assert!(clustering_types[0].contains("ReversedType"));
assert!(clustering_types[0].contains("TimestampType"));
assert!(clustering_types[1].contains("UTF8Type"));
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].name, "data");
assert_eq!(columns[0].column_type, "text");
assert_eq!(columns[1].name, "value");
assert_eq!(columns[1].column_type, "int");
}
// One clustering key, one static column and two regular columns: the static column must
// come first in the result and be the only one flagged `is_static`.
#[test]
fn test_serialization_header_with_static_columns() {
let mut test_data = vec![];
test_data.extend_from_slice(&[0x00, 0x00]);
let partition_type = b"org.apache.cassandra.db.marshal.UUIDType";
test_data.push(partition_type.len() as u8);
test_data.extend_from_slice(partition_type);
// Presumably clustering key count = 1.
test_data.push(0x01);
let ck1 = b"org.apache.cassandra.db.marshal.TimestampType";
test_data.push(ck1.len() as u8);
test_data.extend_from_slice(ck1);
// Presumably static column count = 1, followed by the static column entry.
test_data.push(0x01);
test_data.push(0x0b); test_data.extend_from_slice(b"static_data");
test_data.push(0x28); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
// Presumably regular column count = 2, followed by the two regular column entries.
test_data.push(0x02);
test_data.push(0x08); test_data.extend_from_slice(b"row_data");
test_data.push(0x28); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
test_data.push(0x09); test_data.extend_from_slice(b"row_value");
test_data.push(0x29); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.Int32Type");
let mut full_data = vec![0xFF; 100];
full_data.extend_from_slice(&test_data);
let result = parse_serialization_header(&full_data);
assert!(
result.is_ok(),
"Failed to parse SerializationHeader with static columns: {:?}",
result.err()
);
let (_remaining, (partition_types, clustering_types, columns)) = result.unwrap();
assert_eq!(partition_types.len(), 1);
assert!(partition_types[0].contains("UUIDType"));
assert_eq!(clustering_types.len(), 1);
assert!(clustering_types[0].contains("TimestampType"));
assert_eq!(
columns.len(),
3,
"Expected 3 columns (1 static + 2 regular)"
);
assert_eq!(columns[0].name, "static_data");
assert_eq!(columns[0].column_type, "text");
assert!(
columns[0].is_static,
"static_data should be marked as static"
);
assert_eq!(columns[1].name, "row_data");
assert_eq!(columns[1].column_type, "text");
assert!(
!columns[1].is_static,
"row_data should NOT be marked as static"
);
assert_eq!(columns[2].name, "row_value");
assert_eq!(columns[2].column_type, "int");
assert!(
!columns[2].is_static,
"row_value should NOT be marked as static"
);
}
// Pins the marshal-class-name to CQL-type mapping, including an unknown "...Type" name and
// UserType definitions, which must survive conversion (also when nested).
#[test]
fn test_marshal_type_conversion() {
assert_eq!(
convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.Int32Type"),
"int"
);
assert_eq!(
convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.UTF8Type"),
"text"
);
assert_eq!(
convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.UUIDType"),
"uuid"
);
assert_eq!(
convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.TimestampType"),
"timestamp"
);
assert_eq!(
convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.DecimalType"),
"decimal"
);
// A class name that is not a known type is expected to come back as "simpledata".
assert_eq!(
convert_marshal_type_to_cql("org.apache.cassandra.db.marshal.SimpleDataType"),
"simpledata"
);
// A bare UserType definition must be returned verbatim.
let udt = "org.apache.cassandra.db.marshal.UserType(test_collections,616464726573735f74797065,737472656574:org.apache.cassandra.db.marshal.UTF8Type,63697479:org.apache.cassandra.db.marshal.UTF8Type)";
assert_eq!(
convert_marshal_type_to_cql(udt),
udt,
"UserType definitions must be preserved to retain keyspace, type name, and field metadata"
);
// Nested inside FrozenType / ListType, only the presence of "UserType(" is checked.
let frozen_udt = "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(test_collections,616464726573735f74797065,737472656574:org.apache.cassandra.db.marshal.UTF8Type))";
assert!(
convert_marshal_type_to_cql(frozen_udt).contains("UserType("),
"UserType inside FrozenType should be preserved"
);
let list_udt = "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.UserType(test_collections,616464726573735f74797065,737472656574:org.apache.cassandra.db.marshal.UTF8Type)))";
assert!(
convert_marshal_type_to_cql(list_udt).contains("UserType("),
"UserType inside List should be preserved"
);
}
// The fixed header is eight big-endian u32 values (32 bytes); the third one is read but
// not stored in StatisticsHeader.
#[test]
fn test_nb_format_header_parsing() {
let test_data = vec![
0x00, 0x00, 0x00, 0x04, 0x26, 0x29, 0x1b, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2c, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x65, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x14, 0xd4, ];
let result = parse_nb_format_header(&test_data);
assert!(result.is_ok());
let (_, header) = result.unwrap();
assert_eq!(header.version, 4);
assert_eq!(header.statistics_kind, 0x2629_1b05);
assert_eq!(header.data_length, 44);
assert_eq!(header.metadata1, 1);
assert_eq!(header.metadata2, 101);
assert_eq!(header.metadata3, 2);
assert_eq!(header.checksum, 0x14d4);
}
// Ten 0xFF bytes are not valid statistics data, so the data parser must return an error.
#[test]
fn test_statistics_data_extraction_with_invalid_data() {
let header = StatisticsHeader {
version: 4,
statistics_kind: 0x2629_1b05,
data_length: 44,
metadata1: 1,
metadata2: 101,
metadata3: 2,
checksum: 0x14d4,
table_id: None,
};
let dummy_data = vec![0xFF; 10]; let result = parse_nb_format_statistics_data(&dummy_data, &header, &dummy_data, None);
assert!(result.is_err());
}
// Only the 32-byte fixed header is supplied, with no data after it: parsing must fail.
#[test]
fn test_enhanced_statistics_file_with_incomplete_data() {
let test_data = vec![
0x00, 0x00, 0x00, 0x04, 0x26, 0x29, 0x1b, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2c, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x65, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x14,
0xd4, ];
let result = parse_enhanced_statistics_file(&test_data, None);
assert!(result.is_err());
}
// Same header-only input, routed through the public wrapper: the error must surface there too.
#[test]
fn test_parser_fallback_with_incomplete_data() {
let test_data = vec![
0x00, 0x00, 0x00, 0x04, 0x26, 0x29, 0x1b, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2c, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x65, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x14, 0xd4, ];
let result = parse_statistics_with_fallback(&test_data, None);
assert!(result.is_err());
}
// Ten bytes are fewer than the 32 bytes the fixed header reads, so parsing must fail.
#[test]
fn test_invalid_data_returns_error() {
let invalid_data = vec![0xFF; 10];
let result = parse_statistics_with_fallback(&invalid_data, None);
assert!(result.is_err(), "Invalid data should fail to parse");
}
// parse_regular_columns is expected to return the partition key type that precedes the
// column section (still in marshal form) together with the two regular columns.
#[test]
fn test_partition_key_extraction_via_backtracking() {
let mut test_data = vec![];
test_data.extend_from_slice(&[0xFF; 50]);
// 0x28 is the 40-byte length of the type string; the role of the 0x80 byte before it is
// defined by parse_regular_columns, which is outside this module.
test_data.extend_from_slice(&[0x80, 0x28]); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UUIDType");
// Presumably a zero count followed by the regular column count (2).
test_data.push(0x00); test_data.push(0x02);
test_data.push(0x0E); test_data.extend_from_slice(b"expiring_value");
test_data.push(0x29); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.Int32Type");
test_data.push(0x0C); test_data.extend_from_slice(b"session_info");
test_data.push(0x28); test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
let result = parse_regular_columns(&test_data);
assert!(
result.is_ok(),
"Failed to parse columns with backtracking: {:?}",
result.err()
);
let (_remaining, (partition_keys, columns)) = result.unwrap();
assert_eq!(
partition_keys.len(),
1,
"Expected 1 partition key via backtracking"
);
assert_eq!(
partition_keys[0],
"org.apache.cassandra.db.marshal.UUIDType"
);
assert_eq!(columns.len(), 2, "Expected 2 regular columns");
assert_eq!(columns[0].name, "expiring_value");
assert_eq!(columns[0].column_type, "int");
assert!(!columns[0].is_primary_key);
assert_eq!(columns[1].name, "session_info");
assert_eq!(columns[1].column_type, "text");
assert!(!columns[1].is_primary_key);
}
// A longer composite partition key type must be returned verbatim, including the leading
// '(' that is part of the fixture string.
#[test]
fn test_partition_key_extraction_with_longer_type() {
let mut test_data = vec![0xFF; 100];
let composite_type =
"(org.apache.cassandra.db.marshal.CompositeType(UTF8Type,Int32Type,UUIDType)";
let type_len = composite_type.len() as u8;
test_data.push(type_len);
test_data.extend_from_slice(composite_type.as_bytes());
// Presumably a zero count followed by the regular column count (1).
test_data.push(0x00); test_data.push(0x01);
test_data.push(0x04);
test_data.extend_from_slice(b"data");
test_data.push(0x28);
test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
let result = parse_regular_columns(&test_data);
assert!(result.is_ok(), "Failed to parse: {:?}", result.err());
let (_remaining, (partition_keys, columns)) = result.unwrap();
assert_eq!(partition_keys.len(), 1);
assert_eq!(partition_keys[0], composite_type);
assert_eq!(columns.len(), 1);
assert_eq!(columns[0].name, "data");
assert!(!columns[0].is_primary_key);
}
// With nothing in front of the column section there is no partition key to recover, but
// the single column must still be parsed.
#[test]
fn test_backtracking_with_no_partition_key() {
let mut test_data = vec![];
test_data.push(0x00); test_data.push(0x01);
test_data.push(0x04);
test_data.extend_from_slice(b"name");
test_data.push(0x28);
test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
let result = parse_regular_columns(&test_data);
assert!(result.is_ok());
let (_remaining, (partition_keys, columns)) = result.unwrap();
assert_eq!(partition_keys.len(), 0, "Should have no partition keys");
assert_eq!(columns.len(), 1);
assert_eq!(columns[0].name, "name");
}
// A length-prefixed string that is not a marshal type name (0x15 = 21 bytes,
// "InvalidTypeDescriptor") must not be reported as a partition key.
#[test]
fn test_backtracking_rejects_invalid_types() {
let mut test_data = vec![0xFF; 50];
test_data.push(0x15); test_data.extend_from_slice(b"InvalidTypeDescriptor");
test_data.extend_from_slice(&[0x00, 0x00, 0x01]);
test_data.push(0x04);
test_data.extend_from_slice(b"test");
test_data.push(0x28);
test_data.extend_from_slice(b"org.apache.cassandra.db.marshal.UTF8Type");
let result = parse_regular_columns(&test_data);
assert!(result.is_ok());
let (_remaining, (partition_keys, _columns)) = result.unwrap();
assert_eq!(
partition_keys.len(),
0,
"Should reject invalid type pattern"
);
}
}