use log::{debug, warn};
use std::path::Path;
use crate::{parser::SSTableHeader, Result};
use super::super::compression::{CompressionAlgorithm, CompressionInfo, CompressionReader};
/// Determines which compression algorithm (if any) applies to the SSTable at `path`.
///
/// Detection order:
/// 1. The algorithm named in `header.compression.algorithm`, when it is not
///    `"NONE"` and maps to a concrete [`CompressionAlgorithm`].
/// 2. A `*CompressionInfo.db` companion file located next to `path`
///    (see `discover_compression_info`).
/// 3. Only with the `legacy-heuristics` feature: version-based header heuristics,
///    then substrings of the file name.
///
/// Returns `Ok(None)` when none of the above identifies an algorithm.
///
/// # Errors
/// Propagates any error returned by `discover_compression_info` and, with the
/// `legacy-heuristics` feature, by `detect_compression_heuristic`.
pub(crate) async fn detect_and_initialize_compression(
    header: &SSTableHeader,
    path: &Path,
) -> Result<Option<CompressionReader>> {
    if header.compression.algorithm != "NONE" {
        let algorithm = CompressionAlgorithm::from(header.compression.algorithm.as_str());
        debug!("Header indicates compression: {:?}", algorithm);
        match algorithm {
            CompressionAlgorithm::Lz4
            | CompressionAlgorithm::Snappy
            | CompressionAlgorithm::Deflate
            | CompressionAlgorithm::Zstd => {
                return Ok(Some(CompressionReader::new(algorithm)));
            }
            // The header named something that did not map to a known algorithm;
            // fall through to discovery from companion files.
            CompressionAlgorithm::None => {}
        }
    }

    // `Path::parent` yields `Some("")` (not `None`) for a bare file name such as
    // `nb-1-big-Data.db`. An empty path cannot be listed by `read_dir`, which would
    // make the directory scan fail, so treat it as the current directory.
    let parent_dir = path
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or(Path::new("."));

    if let Some(compression_reader) = discover_compression_info(path, parent_dir).await? {
        return Ok(Some(compression_reader));
    }

    #[cfg(feature = "legacy-heuristics")]
    {
        if let Some(algorithm) = detect_compression_heuristic(header, path).await? {
            debug!("Heuristic detection found compression: {:?}", algorithm);
            return Ok(Some(CompressionReader::new(algorithm)));
        }
        if let Some(algorithm) = detect_compression_from_filename(path) {
            debug!("Filename pattern suggests compression: {:?}", algorithm);
            return Ok(Some(CompressionReader::new(algorithm)));
        }
    }

    debug!("No compression detected for {:?}", path);
    Ok(None)
}
#[cfg(feature = "legacy-heuristics")]
/// Best-effort guess of the compression algorithm from header metadata alone.
///
/// * For the listed Cassandra 5.0 formats this heuristic should not be used: it logs
///   an error and returns `Ok(None)`.
/// * For `CassandraVersion::Legacy` it logs a warning and maps
///   `header.compression.algorithm` (compared case-insensitively) to an algorithm.
///   Any casing of `"NONE"` yields `Ok(None)`. An unrecognised name is logged and
///   also yields `Ok(None)`.
///
/// `_path` is currently unused.
async fn detect_compression_heuristic(
    header: &SSTableHeader,
    _path: &Path,
) -> Result<Option<CompressionAlgorithm>> {
    use crate::parser::header::CassandraVersion;

    match header.cassandra_version {
        CassandraVersion::V5_0NewBig
        | CassandraVersion::V5_0Bti
        | CassandraVersion::V5_0Alpha
        | CassandraVersion::V5_0Beta
        | CassandraVersion::V5_0Release
        | CassandraVersion::V5_0DataFormat
        | CassandraVersion::V5_0FormatC
        | CassandraVersion::V5_0FormatD
        | CassandraVersion::V5_0FormatE
        | CassandraVersion::V5_0FormatF
        | CassandraVersion::V5_0FormatG
        | CassandraVersion::V5_0StaticColumns
        | CassandraVersion::V5_0Uncompressed
        | CassandraVersion::V5_0ComplexTypes
        | CassandraVersion::V5_0TypedCollections
        | CassandraVersion::V5_0WideRows
        | CassandraVersion::V5_0NewBigFormat => {
            log::error!(
                "Heuristic compression detection called for modern format: {:?}",
                header.cassandra_version
            );
            Ok(None)
        }
        CassandraVersion::Legacy => {
            log::warn!("Using unreliable heuristic compression detection for legacy format");
            match header.compression.algorithm.to_uppercase().as_str() {
                // Any casing of NONE explicitly declares "no compression". Previously
                // only the exact string "NONE" was recognised, and e.g. "none" was
                // reported as an unknown algorithm.
                "NONE" => Ok(None),
                "LZ4" => Ok(Some(CompressionAlgorithm::Lz4)),
                "SNAPPY" => Ok(Some(CompressionAlgorithm::Snappy)),
                "ZSTD" => Ok(Some(CompressionAlgorithm::Zstd)),
                "DEFLATE" => Ok(Some(CompressionAlgorithm::Deflate)),
                _ => {
                    log::warn!(
                        "Unknown compression algorithm in header: {}",
                        header.compression.algorithm
                    );
                    Ok(None)
                }
            }
        }
    }
}
#[cfg(feature = "legacy-heuristics")]
/// Guesses the compression algorithm from substrings of the file name.
///
/// Matching is ASCII case-insensitive (so `lz4`, `LZ4` and `Lz4` all match) and is
/// checked in the fixed priority order lz4, snappy, deflate, zstd. Returns `None`
/// when the path has no file name or none of the substrings occur.
fn detect_compression_from_filename(path: &Path) -> Option<CompressionAlgorithm> {
    // Lossy conversion keeps the ASCII substrings searchable even for file names
    // that are not valid UTF-8.
    let filename = path
        .file_name()
        .map(|name| name.to_string_lossy().to_ascii_lowercase())
        .unwrap_or_default();

    if filename.contains("lz4") {
        Some(CompressionAlgorithm::Lz4)
    } else if filename.contains("snappy") {
        Some(CompressionAlgorithm::Snappy)
    } else if filename.contains("deflate") {
        Some(CompressionAlgorithm::Deflate)
    } else if filename.contains("zstd") {
        Some(CompressionAlgorithm::Zstd)
    } else {
        None
    }
}
/// Looks for a `CompressionInfo.db` companion of `sstable_path` inside `parent_dir`.
///
/// The candidate names from `get_standard_compression_patterns` are probed first,
/// in order. If none of them yields a usable result, the whole directory is scanned
/// via `scan_directory_for_compression_files`. The first file that loads and reports
/// an algorithm other than `CompressionAlgorithm::None` produces the reader.
///
/// Load failures and a failed directory scan are logged with `warn!` and treated as
/// "not found". In the code shown here every error is handled that way, so the
/// function returns `Ok(None)` rather than `Err` when nothing is found.
async fn discover_compression_info(
    sstable_path: &Path,
    parent_dir: &Path,
) -> Result<Option<CompressionReader>> {
    for candidate in get_standard_compression_patterns(sstable_path) {
        let info_path = parent_dir.join(&candidate);
        if !info_path.exists() {
            continue;
        }

        let info = match load_compression_info(&info_path).await {
            Ok(info) => info,
            Err(e) => {
                warn!("Failed to load CompressionInfo from {:?}: {}", info_path, e);
                continue;
            }
        };

        let algorithm = info.get_algorithm();
        debug!(
            "Found CompressionInfo at {:?} with algorithm: {:?}, chunks: {}",
            info_path,
            algorithm,
            info.chunk_count()
        );
        if algorithm != CompressionAlgorithm::None {
            return Ok(Some(CompressionReader::new(algorithm)));
        }
    }

    // No well-known name produced an algorithm; fall back to listing the directory.
    match scan_directory_for_compression_files(parent_dir, sstable_path).await {
        Ok(found) => Ok(found),
        Err(e) => {
            warn!("Directory scan failed: {}", e);
            Ok(None)
        }
    }
}
/// Builds the ordered list of candidate `CompressionInfo.db` file names to probe for
/// `sstable_path`, most specific first:
///
/// 1. `<base>-CompressionInfo.db`, where `<base>` comes from
///    `extract_sstable_base_name` (e.g. `nb-1-big`), when it can be extracted;
/// 2. `nb-<gen>-big-CompressionInfo.db` for each fixed fallback generation;
/// 3. the bare `CompressionInfo.db`;
/// 4. `<stem>-CompressionInfo.db`, where `<stem>` is the file name without its
///    extension.
///
/// Duplicates are removed, keeping the first (highest-priority) occurrence. For
/// example, `nb-1-big-Data.db` would otherwise list `nb-1-big-CompressionInfo.db`
/// twice, and callers would load and parse the same file twice.
fn get_standard_compression_patterns(sstable_path: &Path) -> Vec<String> {
    // Generations probed when the SSTable's own companion file is absent.
    // NOTE(review): this set looks hand-picked; confirm it is still wanted.
    const FALLBACK_GENERATIONS: &[u32] = &[
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 35, 40, 45, 46, 47, 50, 55,
    ];

    let candidates = extract_sstable_base_name(sstable_path)
        .map(|base| format!("{}-CompressionInfo.db", base))
        .into_iter()
        .chain(
            FALLBACK_GENERATIONS
                .iter()
                .map(|generation| format!("nb-{}-big-CompressionInfo.db", generation)),
        )
        .chain(std::iter::once("CompressionInfo.db".to_string()))
        .chain(
            sstable_path
                .file_stem()
                .and_then(|s| s.to_str())
                .map(|stem| format!("{}-CompressionInfo.db", stem)),
        );

    let mut patterns: Vec<String> = Vec::new();
    for name in candidates {
        // The list holds only a couple of dozen entries, so a linear check is fine.
        if !patterns.contains(&name) {
            patterns.push(name);
        }
    }
    patterns
}
/// Scans `dir` for any `*CompressionInfo.db` file and returns a reader for the first
/// one, in best-match order, that loads and reports an algorithm other than
/// `CompressionAlgorithm::None`.
///
/// Candidates are ranked by `score_compression_file_match` (highest first). Equal
/// scores are ordered by path, so the choice does not depend on the platform's
/// directory-listing order.
///
/// An unreadable directory, unreadable individual entries, and files that fail to
/// load are logged with `warn!` and skipped. Returns `Ok(None)` when nothing usable
/// is found.
///
/// NOTE(review): this uses blocking `std::fs` calls inside an `async fn`; consider
/// `tokio::fs` or `spawn_blocking` if directories can be large or slow.
async fn scan_directory_for_compression_files(
    dir: &Path,
    sstable_path: &Path,
) -> Result<Option<CompressionReader>> {
    use std::cmp::Reverse;
    use std::fs;

    let entries = match fs::read_dir(dir) {
        Ok(entries) => entries,
        Err(e) => {
            warn!("Cannot read directory {:?}: {}", dir, e);
            return Ok(None);
        }
    };

    let mut compression_files: Vec<_> = entries
        .filter_map(|entry| match entry {
            Ok(entry) => Some(entry.path()),
            Err(e) => {
                // One bad entry should not abort the whole scan; this is consistent
                // with how an unreadable directory is handled above.
                warn!("Skipping unreadable entry in {:?}: {}", dir, e);
                None
            }
        })
        // Check the (cheap) name before the `is_file` stat.
        .filter(|path| {
            path.file_name()
                .and_then(|n| n.to_str())
                .map_or(false, |name| name.ends_with("CompressionInfo.db"))
                && path.is_file()
        })
        .collect();

    // Score each file once rather than on every comparison, best match first. Ties
    // are broken by path for a deterministic order.
    compression_files.sort_by_cached_key(|path| {
        (
            Reverse(score_compression_file_match(path, sstable_path)),
            path.clone(),
        )
    });

    for compression_path in compression_files {
        match load_compression_info(&compression_path).await {
            Ok(compression_info) => {
                let algorithm = compression_info.get_algorithm();
                log::debug!(
                    "Found CompressionInfo via directory scan at {:?} with algorithm: {:?}, chunks: {}",
                    compression_path,
                    algorithm,
                    compression_info.chunk_count()
                );
                if algorithm != CompressionAlgorithm::None {
                    return Ok(Some(CompressionReader::new(algorithm)));
                }
            }
            Err(e) => {
                log::warn!(
                    "Failed to load CompressionInfo from {:?}: {}",
                    compression_path,
                    e
                );
            }
        }
    }
    Ok(None)
}
/// Ranks how likely `compression_path` is to be the `CompressionInfo.db` that belongs
/// to `sstable_path`; a higher value is a better match.
///
/// The score is the sum of:
/// * 100 if the compression file name starts with the SSTable's base name
///   (as returned by `extract_sstable_base_name`);
/// * 50 if both names carry the same `nb-<generation>-` number;
/// * 25 if both names contain both `nb-` and `-big-`;
/// * 1 if the compression file is exactly `CompressionInfo.db`.
///
/// Returns 0 when either path lacks a UTF-8 file name.
fn score_compression_file_match(compression_path: &Path, sstable_path: &Path) -> i32 {
    fn utf8_file_name(path: &Path) -> Option<&str> {
        path.file_name()?.to_str()
    }

    let comp_name = match utf8_file_name(compression_path) {
        Some(name) => name,
        None => return 0,
    };
    let sstable_name = match utf8_file_name(sstable_path) {
        Some(name) => name,
        None => return 0,
    };

    let base_bonus = match extract_sstable_base_name(sstable_path) {
        Some(base) if comp_name.starts_with(&base) => 100,
        _ => 0,
    };

    let generation_bonus = match (
        extract_generation_number(sstable_name),
        extract_generation_number(comp_name),
    ) {
        (Some(sstable_gen), Some(comp_gen)) if sstable_gen == comp_gen => 50,
        _ => 0,
    };

    let looks_nb_big = |name: &str| name.contains("nb-") && name.contains("-big-");
    let format_bonus = if looks_nb_big(sstable_name) && looks_nb_big(comp_name) {
        25
    } else {
        0
    };

    let generic_bonus = if comp_name == "CompressionInfo.db" { 1 } else { 0 };

    base_bonus + generation_bonus + format_bonus + generic_bonus
}
/// Parses the generation number that follows the first `nb-` in `filename`.
///
/// The number is the text between that `nb-` and the next `-`, parsed as `u32`.
/// Returns `None` if there is no `nb-`, no `-` after it, or the text between them
/// is not a valid `u32` (e.g. `"nb-1"` or `"nb-abc-big-Data.db"`).
fn extract_generation_number(filename: &str) -> Option<u32> {
    let (_, after_prefix) = filename.split_once("nb-")?;
    let (generation, _) = after_prefix.split_once('-')?;
    generation.parse().ok()
}
/// Reads the whole file at `path` and parses it with `CompressionInfo::parse_binary`.
///
/// # Errors
/// Fails if the file cannot be opened or read (the I/O error is converted with `?`),
/// or if `CompressionInfo::parse_binary` rejects the bytes.
async fn load_compression_info(path: &Path) -> Result<CompressionInfo> {
    let contents = tokio::fs::read(path).await?;
    CompressionInfo::parse_binary(&contents)
}
/// Returns the SSTable base name: the first three dash-separated components of the
/// file name, e.g. `nb-1-big` for `.../nb-1-big-Data.db`.
///
/// Returns `None` when the path has no UTF-8 file name or the name does not end in
/// `.db`. Also returns `None` when the name before `.db` has fewer than four
/// dash-separated components; only this last case logs a warning.
pub fn extract_sstable_base_name(path: &Path) -> Option<String> {
    let filename = path.file_name()?.to_str()?;
    let without_ext = filename.strip_suffix(".db")?;

    // At most four pieces: three base components plus "everything else".
    let pieces: Vec<&str> = without_ext.splitn(4, '-').collect();
    match pieces.as_slice() {
        [version, generation, format, _component] => {
            Some(format!("{}-{}-{}", version, generation, format))
        }
        _ => {
            log::warn!("Non-standard SSTable filename pattern: {}", filename);
            None
        }
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the pure file-name helpers used during compression discovery.
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn test_extract_sstable_base_name_standard_data_file() {
        let path = PathBuf::from("nb-1-big-Data.db");
        assert_eq!(
            extract_sstable_base_name(&path),
            Some("nb-1-big".to_string())
        );
    }

    #[test]
    fn test_extract_sstable_base_name_index_file() {
        let path = PathBuf::from("nb-2-big-Index.db");
        assert_eq!(
            extract_sstable_base_name(&path),
            Some("nb-2-big".to_string())
        );
    }

    #[test]
    fn test_extract_sstable_base_name_compression_info() {
        let path = PathBuf::from("nb-45-big-CompressionInfo.db");
        assert_eq!(
            extract_sstable_base_name(&path),
            Some("nb-45-big".to_string())
        );
    }

    #[test]
    fn test_extract_sstable_base_name_with_full_path() {
        let path = PathBuf::from("/var/lib/cassandra/data/keyspace/table-uuid/nb-1-big-Data.db");
        assert_eq!(
            extract_sstable_base_name(&path),
            Some("nb-1-big".to_string())
        );
    }

    #[test]
    fn test_extract_sstable_base_name_statistics() {
        let path = PathBuf::from("nb-100-big-Statistics.db");
        assert_eq!(
            extract_sstable_base_name(&path),
            Some("nb-100-big".to_string())
        );
    }

    #[test]
    fn test_extract_sstable_base_name_too_few_parts() {
        let path = PathBuf::from("invalid.db");
        assert_eq!(extract_sstable_base_name(&path), None);
    }

    #[test]
    fn test_extract_sstable_base_name_no_extension() {
        let path = PathBuf::from("nb-1-big-Data");
        assert_eq!(extract_sstable_base_name(&path), None);
    }

    #[test]
    fn test_extract_sstable_base_name_wrong_extension() {
        let path = PathBuf::from("nb-1-big-Data.txt");
        assert_eq!(extract_sstable_base_name(&path), None);
    }

    #[test]
    fn test_extract_sstable_base_name_three_parts() {
        let path = PathBuf::from("nb-1-big.db");
        assert_eq!(extract_sstable_base_name(&path), None);
    }

    #[test]
    fn test_extract_generation_number_standard() {
        assert_eq!(extract_generation_number("nb-1-big-Data.db"), Some(1));
        assert_eq!(extract_generation_number("nb-45-big-Index.db"), Some(45));
        assert_eq!(
            extract_generation_number("nb-100-big-CompressionInfo.db"),
            Some(100)
        );
    }

    #[test]
    fn test_extract_generation_number_large() {
        assert_eq!(
            extract_generation_number("nb-999999-big-Data.db"),
            Some(999999)
        );
    }

    #[test]
    fn test_extract_generation_number_no_match() {
        assert_eq!(extract_generation_number("other-format.db"), None);
        assert_eq!(extract_generation_number("Data.db"), None);
    }

    #[test]
    fn test_extract_generation_number_malformed() {
        assert_eq!(extract_generation_number("nb-1"), None);
        assert_eq!(extract_generation_number("nb-abc-big-Data.db"), None);
    }

    #[test]
    fn test_score_compression_file_match_exact_base() {
        let sstable = PathBuf::from("nb-1-big-Data.db");
        let compression = PathBuf::from("nb-1-big-CompressionInfo.db");
        let score = score_compression_file_match(&compression, &sstable);
        assert!(
            score >= 100,
            "Expected high score for exact base match, got {}",
            score
        );
    }

    #[test]
    fn test_score_compression_file_match_generation_only() {
        let sstable = PathBuf::from("nb-45-big-Data.db");
        let compression = PathBuf::from("nb-45-other-CompressionInfo.db");
        let score = score_compression_file_match(&compression, &sstable);
        assert!(
            score >= 50,
            "Expected score for generation match, got {}",
            score
        );
    }

    #[test]
    fn test_score_compression_file_match_generic() {
        let sstable = PathBuf::from("nb-1-big-Data.db");
        let compression = PathBuf::from("CompressionInfo.db");
        let score = score_compression_file_match(&compression, &sstable);
        assert_eq!(score, 1, "Expected score 1 for generic CompressionInfo.db");
    }

    #[test]
    fn test_score_compression_file_match_no_filename() {
        let sstable = PathBuf::from("");
        let compression = PathBuf::from("");
        let score = score_compression_file_match(&compression, &sstable);
        assert_eq!(score, 0, "Expected 0 for empty paths");
    }

    #[test]
    fn test_score_compression_file_match_different_generation() {
        let sstable = PathBuf::from("nb-1-big-Data.db");
        let compression = PathBuf::from("nb-99-big-CompressionInfo.db");
        let score = score_compression_file_match(&compression, &sstable);
        assert!(
            score < 100,
            "Expected lower score for different generation, got {}",
            score
        );
    }

    #[test]
    fn test_score_compression_file_match_ranks_candidates() {
        // The SSTable's own companion must outrank a sibling's, which must outrank
        // the generic fallback.
        let sstable = PathBuf::from("nb-1-big-Data.db");
        let own = score_compression_file_match(
            &PathBuf::from("nb-1-big-CompressionInfo.db"),
            &sstable,
        );
        let sibling = score_compression_file_match(
            &PathBuf::from("nb-2-big-CompressionInfo.db"),
            &sstable,
        );
        let generic =
            score_compression_file_match(&PathBuf::from("CompressionInfo.db"), &sstable);
        assert!(
            own > sibling && sibling > generic,
            "Expected own ({}) > sibling ({}) > generic ({})",
            own,
            sibling,
            generic
        );
    }

    #[test]
    fn test_get_standard_compression_patterns_includes_base_name() {
        let path = PathBuf::from("nb-1-big-Data.db");
        let patterns = get_standard_compression_patterns(&path);
        assert!(
            patterns.contains(&"nb-1-big-CompressionInfo.db".to_string()),
            "Should include pattern for base name: {:?}",
            patterns
        );
    }

    #[test]
    fn test_get_standard_compression_patterns_base_name_first() {
        let path = PathBuf::from("nb-7-big-Data.db");
        let patterns = get_standard_compression_patterns(&path);
        assert_eq!(
            patterns.first().map(String::as_str),
            Some("nb-7-big-CompressionInfo.db"),
            "The SSTable's own companion should be probed first: {:?}",
            patterns
        );
    }

    #[test]
    fn test_get_standard_compression_patterns_includes_generations() {
        // Generation 99 is not in the fallback list, so nb-1 and nb-45 can only come
        // from the generation fallbacks, not from this SSTable's own base name.
        let path = PathBuf::from("nb-99-big-Data.db");
        let patterns = get_standard_compression_patterns(&path);
        assert!(
            patterns.contains(&"nb-1-big-CompressionInfo.db".to_string()),
            "Should include generation 1: {:?}",
            patterns
        );
        assert!(
            patterns.contains(&"nb-45-big-CompressionInfo.db".to_string()),
            "Should include generation 45: {:?}",
            patterns
        );
    }

    #[test]
    fn test_get_standard_compression_patterns_includes_fallback() {
        let path = PathBuf::from("nb-1-big-Data.db");
        let patterns = get_standard_compression_patterns(&path);
        assert!(
            patterns.contains(&"CompressionInfo.db".to_string()),
            "Should include generic fallback: {:?}",
            patterns
        );
    }

    #[test]
    fn test_get_standard_compression_patterns_non_standard_path() {
        let path = PathBuf::from("weird-file.db");
        let patterns = get_standard_compression_patterns(&path);
        assert!(
            patterns.contains(&"CompressionInfo.db".to_string()),
            "Should include generic fallback for non-standard path: {:?}",
            patterns
        );
    }
}