use std::path::Path;
use crate::{
parser::header::{
parse_sstable_header, CassandraVersion, ColumnInfo, CompressionInfo, SSTableHeader,
SSTableStats, SUPPORTED_MAGIC_NUMBERS,
},
storage::sstable::version_gate::VersionGates,
Error, Result,
};
use super::super::header_spec::{get_global_registry, ParsedHeader};
pub(crate) use super::header_helpers::{
calculate_actual_header_size, extract_generation_from_path,
};
/// Derives the keyspace name from an SSTable path shaped like
/// `<keyspace>/<table dir>/<file>`: the name of the file's grandparent directory.
///
/// Returns `"unknown"` when the path has no grandparent directory name or that name is not
/// valid UTF-8.
fn extract_keyspace_from_path(path: &Path) -> String {
    let keyspace_dir = path.parent().and_then(Path::parent);
    match keyspace_dir
        .and_then(Path::file_name)
        .and_then(|name| name.to_str())
    {
        Some(keyspace) => keyspace.to_string(),
        None => "unknown".to_string(),
    }
}
/// Derives the table name from the SSTable's parent directory, which is expected to be
/// named `<table>-<id>`; the result is everything before the last `-`.
///
/// Returns `"unknown"` when there is no parent directory name, it is not valid UTF-8, or it
/// contains no `-`.
fn extract_table_name_from_path(path: &Path) -> String {
    let dir_name = path
        .parent()
        .and_then(Path::file_name)
        .and_then(|name| name.to_str());
    let table = dir_name
        .and_then(|dir| dir.rsplit_once('-'))
        .map(|(table, _id)| table);
    table.unwrap_or("unknown").to_owned()
}
/// Reports whether a 32-bit value read from a header looks like ASCII text rather than
/// binary data.
///
/// A value qualifies when at least three of its four big-endian bytes are printable ASCII
/// (`0x20..=0x7E`). Two specific values are also listed explicitly: `0x64617461` (`"data"`)
/// and `0xB062696E` (`0xB0` followed by `"bin"`). Both of these also satisfy the
/// three-printable-bytes rule.
pub(crate) fn is_ascii_corruption_value(value: u32) -> bool {
    const KNOWN_CORRUPTION_VALUES: [u32; 2] = [2_959_239_534, 1_684_108_385];
    if KNOWN_CORRUPTION_VALUES.contains(&value) {
        return true;
    }
    let printable = value
        .to_be_bytes()
        .iter()
        .filter(|byte| matches!(**byte, 0x20..=0x7E))
        .count();
    printable >= 3
}
/// Reports whether the first four bytes of `header` look like ASCII text.
///
/// Returns `false` for buffers shorter than four bytes. Otherwise returns `true` in either
/// of two cases:
/// - the prefix is one of a few path-like words (`data`, `node`, `temp`, `logs`, `meta`,
///   `home`, `root`);
/// - at least three of the four bytes are printable ASCII (`0x20..=0x7E`).
pub(crate) fn detect_ascii_header_corruption(header: &[u8]) -> bool {
    const PATH_LIKE_PREFIXES: [&[u8; 4]; 7] =
        [b"data", b"node", b"temp", b"logs", b"meta", b"home", b"root"];

    let prefix = match header.get(..4) {
        Some(prefix) => prefix,
        None => return false,
    };
    if PATH_LIKE_PREFIXES.iter().any(|word| prefix == &word[..]) {
        return true;
    }
    prefix
        .iter()
        .filter(|byte| (0x20..=0x7E).contains(*byte))
        .count()
        >= 3
}
/// Parses the header at the start of a `Data.db` buffer and detects which on-disk layout
/// the file uses.
///
/// Detection runs in this order:
///
/// 1. **BIG format (`nb`/`oa`).** Chosen when `gates` is `VersionGates::Big(_)`, or when
///    the file name contains `nb-`/`oa-` together with `-big-`. The file is treated as
///    headerless, and a minimal header is built by `create_minimal_nb_header`, when either:
///    - its first four bytes are not a known magic number; or
///    - they match the `V5_0WideRows`/`V5_0StaticColumns` magic while the first byte has
///      its high bit set (treated as a Snappy varint collision).
///
///    Otherwise parsing continues with the steps below.
/// 2. **`V5_0Uncompressed` collision.** That magic number with no `CompressionInfo.db`
///    beside the file is treated as partition data, and a minimal uncompressed header is
///    returned.
/// 3. **CRC32 prefix.** If the first four bytes are not a known magic number, they are
///    treated as a big-endian CRC32 of the rest of the buffer (see `strip_crc32_prefix`).
///    The remaining bytes are parsed either way.
/// 4. **Parsing.** The spec-driven registry parser is tried first. If it fails:
///    - the magic number is checked against `SUPPORTED_MAGIC_NUMBERS`;
///    - `parse_sstable_header` is used;
///    - legacy files fall back to a minimal header when the `legacy-heuristics` feature is
///      enabled.
///
/// Keyspace, table name and generation are kept consistent across code paths:
/// - the legacy parser's keyspace/table name are replaced with path-derived values;
/// - spec-driven placeholders (`"unknown"` strings, generation `0`) are filled from `path`.
///
/// # Errors
///
/// * `Error::corruption` in any of these cases:
///   - the buffer is shorter than 8 bytes;
///   - fewer than 4 bytes follow a CRC32 prefix;
///   - a non-legacy header is rejected by `parse_sstable_header`.
/// * `Error::unsupported_format` in either of these cases:
///   - the magic number (after any CRC32 prefix) is not in `SUPPORTED_MAGIC_NUMBERS`;
///   - the header is legacy and `legacy-heuristics` is disabled.
/// * Errors propagated from `create_minimal_nb_header`,
///   `create_minimal_uncompressed_header`, `convert_parsed_header_to_sstable_header`, or
///   `parse_minimal_legacy_header`.
pub(crate) async fn parse_header_with_version_detection(
    header_buffer: &[u8],
    path: &Path,
    gates: &VersionGates,
) -> Result<SSTableHeader> {
    if header_buffer.len() < 8 {
        return Err(Error::corruption(format!(
            "Header buffer too small for parsing: {} bytes (minimum 8 bytes required). \
             File: {}",
            header_buffer.len(),
            path.display()
        )));
    }

    // The length guard above guarantees these four bytes exist; decode them once and reuse
    // the detected version for every check below.
    let first_4_bytes = u32::from_be_bytes([
        header_buffer[0],
        header_buffer[1],
        header_buffer[2],
        header_buffer[3],
    ]);
    let detected_version = CassandraVersion::from_magic_number(first_4_bytes);

    let is_nb_or_oa_format = matches!(gates, VersionGates::Big(_))
        || path
            .file_name()
            .and_then(|n| n.to_str())
            .map(|s| (s.contains("nb-") || s.contains("oa-")) && s.contains("-big-"))
            .unwrap_or(false);
    if is_nb_or_oa_format {
        log::debug!(
            "Detected BIG format (nb/oa) from filename '{}' — checking for embedded header presence",
            path.display()
        );
        // A leading byte with the high bit set is treated as the start of a Snappy varint
        // that merely happens to coincide with these two magic numbers.
        let is_snappy_varint_collision = matches!(
            detected_version,
            Some(CassandraVersion::V5_0WideRows) | Some(CassandraVersion::V5_0StaticColumns)
        ) && (header_buffer[0] & 0x80) != 0;
        if is_snappy_varint_collision {
            log::debug!(
                "NB format file '{}' has Snappy varint collision with magic 0x{:08x} - treating as headerless",
                path.display(),
                first_4_bytes
            );
            return create_minimal_nb_header(path).await;
        }
        if detected_version.is_none() {
            log::debug!(
                "NB format file '{}' is headerless (first bytes: 0x{:08x}) - loading CompressionInfo.db",
                path.display(),
                first_4_bytes
            );
            return create_minimal_nb_header(path).await;
        }
        log::debug!(
            "NB format file '{}' has embedded header (magic: 0x{:08x}) - using standard header parsing",
            path.display(),
            first_4_bytes
        );
    }

    if matches!(detected_version, Some(CassandraVersion::V5_0Uncompressed)) {
        let parent_dir = path.parent().unwrap_or(Path::new("."));
        if !check_compression_info_exists(path, parent_dir) {
            log::debug!(
                "Detected V5_0Uncompressed magic (0x{:08x}) but no CompressionInfo.db file exists - \
                 treating as headerless uncompressed format (partition data collision). File: '{}'",
                first_4_bytes,
                path.display()
            );
            return create_minimal_uncompressed_header(path).await;
        }
        log::debug!(
            "Detected V5_0Uncompressed magic (0x{:08x}) with CompressionInfo.db present - \
             parsing as standard header. File: '{}'",
            first_4_bytes,
            path.display()
        );
    }

    // An unrecognized leading word is interpreted as a CRC32 prefix in front of the header.
    let actual_header = if detected_version.is_none() {
        strip_crc32_prefix(header_buffer, first_4_bytes, path)?
    } else {
        header_buffer
    };

    let registry = get_global_registry();
    match registry.parse_data_header(actual_header) {
        Ok(parsed_header) => {
            log::debug!(
                "Successfully parsed Data.db header using spec-driven approach for file '{}' \
                 with version: {:?}",
                path.display(),
                parsed_header.cassandra_version
            );
            let mut header =
                convert_parsed_header_to_sstable_header(parsed_header, actual_header)?;
            // The converter cannot see the path; fill its placeholders here so this path
            // reports the same metadata as the other branches.
            fill_missing_metadata_from_path(&mut header, path);
            return Ok(header);
        }
        Err(spec_error) => {
            log::debug!(
                "Spec-driven parsing failed for file '{}', falling back to legacy parser: {}",
                path.display(),
                spec_error
            );
        }
    }

    // `actual_header` holds at least 4 bytes: either the >= 8-byte buffer or the CRC-stripped
    // remainder, whose length `strip_crc32_prefix` checked.
    let magic = u32::from_be_bytes([
        actual_header[0],
        actual_header[1],
        actual_header[2],
        actual_header[3],
    ]);
    if !SUPPORTED_MAGIC_NUMBERS.contains(&magic) {
        return Err(Error::unsupported_format(format!(
            "Unsupported SSTable format: magic number 0x{:08x} not recognized. \
             Supported formats: {:?}. File: {}. \
             This may indicate file corruption or an unsupported Cassandra version.",
            magic,
            SUPPORTED_MAGIC_NUMBERS
                .iter()
                .map(|m| format!("0x{:08x}", m))
                .collect::<Vec<_>>(),
            path.display()
        )));
    }
    let cassandra_version = CassandraVersion::from_magic_number(magic).ok_or_else(|| {
        Error::corruption(format!(
            "Failed to map magic number 0x{:08x} to Cassandra version. File: {}",
            magic,
            path.display()
        ))
    })?;
    match parse_sstable_header(actual_header) {
        Ok((_, mut header)) => {
            log::debug!(
                "Successfully parsed header for file '{}' with version: {:?}",
                path.display(),
                header.cassandra_version
            );
            header.keyspace = extract_keyspace_from_path(path);
            header.table_name = extract_table_name_from_path(path);
            Ok(header)
        }
        Err(parse_error) => {
            if cassandra_version == CassandraVersion::Legacy {
                #[cfg(feature = "legacy-heuristics")]
                {
                    log::warn!(
                        "Failed to parse full header for legacy format file '{}', \
                         attempting minimal legacy header parsing: {:?}",
                        path.display(),
                        parse_error
                    );
                    parse_minimal_legacy_header(actual_header, path, cassandra_version)
                }
                #[cfg(not(feature = "legacy-heuristics"))]
                {
                    Err(Error::unsupported_format(format!(
                        "Legacy SSTable format detected but legacy-heuristics feature is disabled. \
                         Enable feature for backward compatibility. File: {}. Parse error: {:?}",
                        path.display(),
                        parse_error
                    )))
                }
            } else {
                Err(Error::corruption(format!(
                    "Failed to parse header for modern format {:?} file '{}': {:?}. \
                     This indicates file corruption or format incompatibility.",
                    cassandra_version,
                    path.display(),
                    parse_error
                )))
            }
        }
    }
}

/// Treats the first four bytes of `header_buffer` as a big-endian CRC32 (`expected_checksum`)
/// and returns the bytes that follow it.
///
/// The checksum is computed over every byte after the prefix. A mismatch is only logged,
/// never turned into an error, because the exact byte range the prefix covers is not
/// certain.
///
/// # Errors
///
/// `Error::corruption` when fewer than 4 bytes follow the prefix.
fn strip_crc32_prefix<'a>(
    header_buffer: &'a [u8],
    expected_checksum: u32,
    path: &Path,
) -> Result<&'a [u8]> {
    log::debug!(
        "Detected CRC32 checksum prefix: 0x{:08x} in file '{}'",
        expected_checksum,
        path.display()
    );
    let header_data = header_buffer.get(4..).unwrap_or(&[]);
    if header_data.len() < 4 {
        return Err(Error::corruption(format!(
            "Insufficient data after CRC32 prefix: {} bytes. File: {}",
            header_data.len(),
            path.display()
        )));
    }
    let computed_checksum = crc32fast::hash(header_data);
    if computed_checksum != expected_checksum {
        log::warn!(
            "Header CRC32 checksum mismatch for file '{}' \
             (Expected: 0x{:08x}, Computed: 0x{:08x}). \
             Proceeding with parsing - checksum validation may use different algorithm.",
            path.display(),
            expected_checksum,
            computed_checksum
        );
    } else {
        log::info!(
            "Header CRC32 validated (0x{:08x}) for file '{}'",
            expected_checksum,
            path.display()
        );
    }
    Ok(header_data)
}

/// Replaces the placeholders that `convert_parsed_header_to_sstable_header` uses for
/// missing fields with values derived from the SSTable's path:
/// - keyspace `"unknown"`;
/// - table name `"unknown"`;
/// - generation `0`.
fn fill_missing_metadata_from_path(header: &mut SSTableHeader, path: &Path) {
    if header.keyspace == "unknown" {
        header.keyspace = extract_keyspace_from_path(path);
    }
    if header.table_name == "unknown" {
        header.table_name = extract_table_name_from_path(path);
    }
    if header.generation == 0 {
        header.generation = extract_generation_from_path(path);
    }
}
/// Converts a spec-driven `ParsedHeader` into an `SSTableHeader`.
///
/// Only these fields of `parsed_header.fields` are read:
/// - `table_id`: used when it is exactly 16 bytes, otherwise all zeros;
/// - `keyspace` and `table_name`: default `"unknown"`;
/// - `generation`: default `0`.
///
/// The remaining header fields are not taken from `parsed_header`:
/// - compression is reported as `"NONE"` with a 4096-byte chunk size;
/// - statistics are zeroed (compression ratio 1.0);
/// - columns and properties are empty.
///
/// `_header_buffer` is currently unused.
///
/// # Errors
///
/// Never returns `Err` in the current implementation.
pub(crate) fn convert_parsed_header_to_sstable_header(
    parsed_header: ParsedHeader,
    _header_buffer: &[u8],
) -> Result<SSTableHeader> {
    use std::collections::HashMap;

    let fields = &parsed_header.fields;
    // String-valued fields share the same lookup-or-"unknown" rule.
    let text_field = |key: &str| -> String {
        match fields.get(key).and_then(|value| value.as_string().ok()) {
            Some(text) => text.to_string(),
            None => "unknown".to_string(),
        }
    };

    let mut table_id = [0u8; 16];
    if let Some(bytes) = fields
        .get("table_id")
        .and_then(|value| value.as_bytes().ok())
    {
        // A table id of any other length is ignored rather than truncated or padded.
        if bytes.len() == table_id.len() {
            table_id.copy_from_slice(bytes);
        }
    }
    let keyspace = text_field("keyspace");
    let table_name = text_field("table_name");
    let generation = fields
        .get("generation")
        .and_then(|value| value.as_u64().ok())
        .unwrap_or(0);

    let header = SSTableHeader {
        cassandra_version: parsed_header.cassandra_version,
        version: parsed_header.format_version as u16,
        table_id,
        keyspace,
        table_name,
        generation,
        compression: CompressionInfo {
            algorithm: "NONE".to_string(),
            chunk_size: 4096,
            parameters: HashMap::new(),
        },
        stats: SSTableStats {
            row_count: 0,
            min_timestamp: 0,
            max_timestamp: 0,
            max_deletion_time: 0,
            compression_ratio: 1.0,
            row_size_histogram: Vec::new(),
        },
        columns: Vec::<ColumnInfo>::new(),
        properties: HashMap::new(),
    };
    Ok(header)
}
/// Reports whether a `CompressionInfo.db` file exists in `parent_dir` for the SSTable at
/// `data_db_path`.
///
/// Candidates are checked in order, and the first one found wins:
/// 1. `<base>-CompressionInfo.db`, only when a base name can be derived from
///    `data_db_path`;
/// 2. a plain `CompressionInfo.db`.
fn check_compression_info_exists(data_db_path: &Path, parent_dir: &Path) -> bool {
    use super::compression::extract_sstable_base_name;

    let specific = extract_sstable_base_name(data_db_path)
        .map(|base_name| parent_dir.join(format!("{}-CompressionInfo.db", base_name)));
    let generic = parent_dir.join("CompressionInfo.db");
    specific
        .into_iter()
        .chain(std::iter::once(generic))
        .any(|candidate| candidate.exists())
}
/// Builds a placeholder header for a `Data.db` file that has no embedded header and no
/// `CompressionInfo.db`.
///
/// Header contents:
/// - version `V5_0Uncompressed` with format version 0;
/// - an all-zero table id;
/// - compression `"NONE"` with chunk size 0;
/// - zeroed statistics (compression ratio 1.0);
/// - no columns or properties.
///
/// Keyspace, table name and generation are derived from `path`.
///
/// # Errors
///
/// Never returns `Err`.
async fn create_minimal_uncompressed_header(path: &Path) -> Result<SSTableHeader> {
    log::info!(
        "Creating minimal uncompressed header for headerless file: {}",
        path.display()
    );

    let compression = CompressionInfo {
        algorithm: "NONE".to_string(),
        chunk_size: 0,
        parameters: std::collections::HashMap::new(),
    };
    let stats = SSTableStats {
        row_count: 0,
        min_timestamp: 0,
        max_timestamp: 0,
        max_deletion_time: 0,
        compression_ratio: 1.0,
        row_size_histogram: Vec::new(),
    };

    let header = SSTableHeader {
        cassandra_version: CassandraVersion::V5_0Uncompressed,
        version: 0,
        table_id: [0; 16],
        keyspace: extract_keyspace_from_path(path),
        table_name: extract_table_name_from_path(path),
        generation: extract_generation_from_path(path),
        compression,
        stats,
        columns: Vec::new(),
        properties: std::collections::HashMap::new(),
    };
    Ok(header)
}
/// Builds a header for a headerless BIG-format (`nb`/`oa`) `Data.db` file.
///
/// When the sibling `<base>-CompressionInfo.db` loads successfully (via
/// `load_nb_compression_info`), its compression algorithm and chunk length are used. The
/// header then reflects the file's actual chunking.
///
/// The header falls back to a chunk size of 16384 bytes when:
/// - loading fails (the algorithm is then `"NONE"`); or
/// - the loaded chunk length does not fit the header's `chunk_size` type.
///
/// Other header contents:
/// - version `V5_0NewBig` with format version 0;
/// - an all-zero table id;
/// - zeroed statistics (compression ratio 1.0);
/// - no columns or properties.
///
/// Keyspace, table name and generation are derived from `path`.
///
/// # Errors
///
/// Never returns `Err`. A missing or unreadable `CompressionInfo.db` is logged and treated
/// as uncompressed.
async fn create_minimal_nb_header(path: &Path) -> Result<SSTableHeader> {
    let (compression_algorithm, loaded_chunk_length) = match load_nb_compression_info(path).await
    {
        Ok(info) => {
            log::info!(
                "Loaded CompressionInfo.db for NB format: algorithm={}, chunk_length={}, chunks={}",
                info.algorithm,
                info.chunk_length,
                info.chunk_offsets.len()
            );
            let chunk_length = info.chunk_length;
            (info.algorithm, Some(chunk_length))
        }
        Err(e) => {
            log::warn!(
                "Could not load CompressionInfo.db for NB format file '{}': {}. Assuming no compression.",
                path.display(),
                e
            );
            ("NONE".to_string(), None)
        }
    };
    // Previously the loaded chunk length was only logged and 16384 was always reported;
    // use the real value and keep 16384 purely as the fallback.
    let chunk_size = loaded_chunk_length
        .and_then(|length| std::convert::TryFrom::try_from(length).ok())
        .unwrap_or(16384);

    Ok(SSTableHeader {
        cassandra_version: CassandraVersion::V5_0NewBig,
        version: 0,
        table_id: [0; 16],
        keyspace: extract_keyspace_from_path(path),
        table_name: extract_table_name_from_path(path),
        generation: extract_generation_from_path(path),
        compression: CompressionInfo {
            algorithm: compression_algorithm,
            chunk_size,
            parameters: std::collections::HashMap::new(),
        },
        stats: SSTableStats {
            row_count: 0,
            min_timestamp: 0,
            max_timestamp: 0,
            max_deletion_time: 0,
            compression_ratio: 1.0,
            row_size_histogram: vec![],
        },
        columns: vec![],
        properties: std::collections::HashMap::new(),
    })
}
async fn load_nb_compression_info(
data_db_path: &Path,
) -> Result<crate::storage::sstable::compression_info::CompressionInfo> {
use super::compression::extract_sstable_base_name;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
let base_name = extract_sstable_base_name(data_db_path).ok_or_else(|| {
Error::InvalidFormat(format!("Cannot extract base name from {:?}", data_db_path))
})?;
let parent_dir = data_db_path.parent().unwrap_or(Path::new("."));
let compression_info_path = parent_dir.join(format!("{}-CompressionInfo.db", base_name));
let mut file = File::open(&compression_info_path).await.map_err(|e| {
Error::InvalidFormat(format!(
"Failed to open CompressionInfo.db at {:?}: {}. NB format requires CompressionInfo.db",
compression_info_path, e
))
})?;
let mut data = Vec::new();
file.read_to_end(&mut data).await.map_err(|e| {
Error::InvalidFormat(format!(
"Failed to read CompressionInfo.db at {:?}: {}",
compression_info_path, e
))
})?;
crate::storage::sstable::compression_info::CompressionInfo::parse(&data)
}
/// Builds a best-effort header for a legacy-format file whose full header could not be
/// parsed.
///
/// The format version is read as a big-endian `u16` from bytes 4..6 of `header_buffer`. If
/// the buffer is shorter than 6 bytes, a warning is logged and `SUPPORTED_VERSION` is used.
///
/// Metadata comes from `path`:
/// - keyspace: the parent directory's name up to its first `-`. NOTE(review): when the
///   parent directory is a per-table directory this yields the table name rather than the
///   keyspace; confirm which layout is intended.
/// - table name: the file stem;
/// - generation: `extract_generation_from_path`.
///
/// Compression is `"NONE"` with chunk size 0, statistics are zeroed (compression ratio
/// 1.0), and columns and properties are empty.
///
/// # Errors
///
/// `Error::corruption` when the extracted version is greater than 100.
#[cfg(feature = "legacy-heuristics")]
pub(crate) fn parse_minimal_legacy_header(
    header_buffer: &[u8],
    path: &Path,
    cassandra_version: CassandraVersion,
) -> Result<SSTableHeader> {
    use crate::parser::header::SUPPORTED_VERSION;

    let version = match header_buffer.get(4..6) {
        Some(&[high, low]) => u16::from_be_bytes([high, low]),
        _ => {
            log::warn!(
                "Legacy header too short for version extraction, using default version. File: {}",
                path.display()
            );
            SUPPORTED_VERSION
        }
    };
    if version > 100 {
        return Err(Error::corruption(format!(
            "Invalid version {} in legacy header. File: {}",
            version,
            path.display()
        )));
    }
    log::info!(
        "Creating minimal legacy header for file '{}' with version {}",
        path.display(),
        version
    );

    let parent_dir_name = path
        .parent()
        .and_then(|dir| dir.file_name())
        .and_then(|name| name.to_str());
    let keyspace = match parent_dir_name {
        Some(name) => name.split('-').next().unwrap_or("unknown").to_string(),
        None => "unknown".to_string(),
    };
    let table_name = path
        .file_stem()
        .and_then(|stem| stem.to_str())
        .map_or_else(|| "unknown".to_string(), str::to_string);

    Ok(SSTableHeader {
        cassandra_version,
        version,
        table_id: [0; 16],
        keyspace,
        table_name,
        generation: extract_generation_from_path(path),
        compression: CompressionInfo {
            algorithm: "NONE".to_string(),
            chunk_size: 0,
            parameters: std::collections::HashMap::new(),
        },
        stats: SSTableStats {
            row_count: 0,
            min_timestamp: 0,
            max_timestamp: 0,
            max_deletion_time: 0,
            compression_ratio: 1.0,
            row_size_histogram: Vec::new(),
        },
        columns: Vec::new(),
        properties: std::collections::HashMap::new(),
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes the first four bytes of `buffer` as a big-endian `u32`, the same way the
    /// header parser reads a magic number.
    fn leading_u32(buffer: &[u8]) -> u32 {
        u32::from_be_bytes([buffer[0], buffer[1], buffer[2], buffer[3]])
    }

    /// Mirrors the production predicate: a `V5_0WideRows` or `V5_0StaticColumns` magic whose
    /// first byte has the high bit set is treated as a Snappy varint collision.
    fn is_snappy_collision(detected: &Option<CassandraVersion>, first_byte: u8) -> bool {
        matches!(
            detected,
            Some(CassandraVersion::V5_0WideRows) | Some(CassandraVersion::V5_0StaticColumns)
        ) && (first_byte & 0x80) != 0
    }

    #[test]
    fn test_snappy_varint_collision_detection() {
        let header_buffer = [0xF0, 0x7C, 0x5C, 0x00, 0x10, 0x30, 0xB5, 0x68];
        assert_eq!(
            header_buffer[0] & 0x80,
            0x80,
            "First byte should have high bit set"
        );
        let first_4_bytes = leading_u32(&header_buffer);
        assert_eq!(first_4_bytes, 0xF07C5C00, "Should match V5_0WideRows magic");
        let detected = CassandraVersion::from_magic_number(first_4_bytes);
        assert_eq!(
            detected,
            Some(CassandraVersion::V5_0WideRows),
            "Should detect V5_0WideRows"
        );
        assert!(
            is_snappy_collision(&detected, header_buffer[0]),
            "Should detect Snappy collision"
        );
    }

    #[test]
    fn test_genuine_magic_number_not_collision() {
        let header_buffer = [0x0F, 0x3C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
        assert_eq!(
            header_buffer[0] & 0x80,
            0x00,
            "First byte should NOT have high bit"
        );
        let detected = CassandraVersion::from_magic_number(leading_u32(&header_buffer));
        assert_eq!(
            detected,
            Some(CassandraVersion::V5_0TypedCollections),
            "Should detect V5_0TypedCollections"
        );
        assert!(
            !is_snappy_collision(&detected, header_buffer[0]),
            "V5_0TypedCollections should not be flagged as collision"
        );
    }

    #[test]
    fn test_snappy_varint_collision_v5_0_static_columns() {
        let header_buffer = [0xC0, 0x51, 0x5C, 0x00, 0x10, 0x30, 0xB5, 0x68];
        assert_eq!(
            header_buffer[0] & 0x80,
            0x80,
            "First byte should have high bit set"
        );
        let first_4_bytes = leading_u32(&header_buffer);
        assert_eq!(
            first_4_bytes, 0xC051_5C00,
            "Should match V5_0StaticColumns magic"
        );
        let detected = CassandraVersion::from_magic_number(first_4_bytes);
        assert_eq!(
            detected,
            Some(CassandraVersion::V5_0StaticColumns),
            "Should detect V5_0StaticColumns"
        );
        assert!(
            is_snappy_collision(&detected, header_buffer[0]),
            "Should detect Snappy collision for V5_0StaticColumns"
        );
    }

    #[test]
    fn test_other_high_bit_magic_not_collision() {
        let header_buffer = [0x82, 0x36, 0x5C, 0x00, 0x00, 0x00, 0x00, 0x00];
        assert_eq!(
            header_buffer[0] & 0x80,
            0x80,
            "First byte should have high bit"
        );
        let detected = CassandraVersion::from_magic_number(leading_u32(&header_buffer));
        assert_eq!(
            detected,
            Some(CassandraVersion::V5_0ComplexTypes),
            "Should detect V5_0ComplexTypes"
        );
        assert!(
            !is_snappy_collision(&detected, header_buffer[0]),
            "V5_0ComplexTypes should not be flagged as collision"
        );
    }

    #[test]
    fn test_unrecognized_bytes_not_collision() {
        let header_buffer = [0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00, 0x00, 0x00];
        let detected = CassandraVersion::from_magic_number(leading_u32(&header_buffer));
        assert_eq!(detected, None, "Should not detect any version");
        assert!(
            !is_snappy_collision(&detected, header_buffer[0]),
            "Random bytes should not be flagged"
        );
    }

    #[test]
    fn test_v5_uncompressed_magic_collision() {
        let header_buffer = [0x00, 0x10, 0x04, 0x5e, 0x63, 0xfc, 0x4e, 0x93];
        let first_4_bytes = leading_u32(&header_buffer);
        assert_eq!(
            first_4_bytes, 0x0010045e,
            "Should match V5_0Uncompressed magic"
        );
        let detected = CassandraVersion::from_magic_number(first_4_bytes);
        assert_eq!(
            detected,
            Some(CassandraVersion::V5_0Uncompressed),
            "Should detect V5_0Uncompressed"
        );
    }
}