use super::compression_info::CompressionInfo;
use crate::parser::header::CassandraVersion;
use crate::{Error, Result};
use std::collections::HashMap;
use std::io::{Read, Seek, SeekFrom};
/// Reads logical (uncompressed) byte ranges out of a chunk-compressed data file.
///
/// Chunk boundaries and the compression algorithm come from a [`CompressionInfo`];
/// decompressed chunks are kept in a small in-memory cache keyed by chunk index.
pub struct ChunkDecompressor {
/// Compression metadata (algorithm, chunk length, chunk offsets, ...).
compression_info: CompressionInfo,
/// Decompressed chunk bytes, keyed by chunk index.
chunk_cache: HashMap<usize, Vec<u8>>,
/// Cache size at which one entry is evicted before a new one is inserted.
max_cached_chunks: usize,
/// Optional data-file path; only used to enrich CRC and LZ4 error messages.
data_file_path: Option<String>,
}
impl ChunkDecompressor {
/// Builds a decompressor with an empty chunk cache (limit: 16 chunks) and no
/// data-file path attached.
///
/// `_cassandra_version` is accepted for API compatibility but is not used here.
///
/// # Errors
/// Propagates whatever `CompressionInfo::validate` reports for `compression_info`.
pub fn new(
    compression_info: CompressionInfo,
    _cassandra_version: CassandraVersion,
) -> Result<Self> {
    // Reject unusable metadata before constructing anything.
    compression_info.validate()?;

    let decompressor = Self {
        compression_info,
        chunk_cache: HashMap::new(),
        max_cached_chunks: 16,
        data_file_path: None,
    };
    Ok(decompressor)
}
pub fn new_with_path(
compression_info: CompressionInfo,
_cassandra_version: CassandraVersion,
data_file_path: String,
) -> Result<Self> {
compression_info.validate()?;
Ok(Self {
compression_info,
chunk_cache: HashMap::new(),
max_cached_chunks: 16,
data_file_path: Some(data_file_path),
})
}
/// Returns `length` uncompressed bytes starting at uncompressed `offset`.
///
/// The range is assembled chunk by chunk: each iteration maps the current
/// uncompressed position to a chunk index and an offset inside that chunk,
/// fetches the decompressed chunk (possibly from the cache) and copies as
/// much of it as is still needed.
///
/// # Errors
/// - Any error from fetching/decompressing a chunk.
/// - `Error::InvalidFormat` when the position inside a chunk lies at or past
///   the end of that chunk's decompressed bytes.
pub fn read_data<R: Read + Seek>(
    &mut self,
    reader: &mut R,
    offset: u64,
    length: usize,
) -> Result<Vec<u8>> {
    let mut output = Vec::with_capacity(length);
    let mut cursor = offset;
    let mut bytes_left = length;

    while bytes_left > 0 {
        let chunk_index = self.compression_info.chunk_for_offset(cursor);
        let start = self.compression_info.offset_within_chunk(cursor) as usize;
        let chunk = self.get_decompressed_chunk(reader, chunk_index)?;

        // Never read past the end of this chunk; the rest comes from the next one.
        let end = chunk.len().min(start + bytes_left);
        if start >= chunk.len() {
            return Err(Error::InvalidFormat(format!(
                "Offset {} beyond chunk {} size {}",
                start,
                chunk_index,
                chunk.len()
            )));
        }

        let piece = &chunk[start..end];
        output.extend_from_slice(piece);

        let copied = piece.len();
        bytes_left -= copied;
        cursor += copied as u64;
    }

    Ok(output)
}
/// Returns an owned copy of the decompressed bytes of `chunk_index`, using the
/// cache when possible.
///
/// On a miss the chunk is decompressed from `reader` and stored in the cache.
/// If the cache is already at `max_cached_chunks`, one entry is removed first;
/// the victim is simply the first key the `HashMap` iterator yields, so the
/// eviction order is arbitrary (not LRU).
///
/// # Errors
/// Propagates any error from `decompress_chunk`.
fn get_decompressed_chunk<R: Read + Seek>(
    &mut self,
    reader: &mut R,
    chunk_index: usize,
) -> Result<Vec<u8>> {
    if let Some(hit) = self.chunk_cache.get(&chunk_index) {
        return Ok(hit.clone());
    }

    let fresh = self.decompress_chunk(reader, chunk_index)?;

    let cache_is_full = self.chunk_cache.len() >= self.max_cached_chunks;
    if cache_is_full {
        // Copy the key out first so the map is no longer borrowed when we remove.
        let victim = self.chunk_cache.keys().next().copied();
        if let Some(key) = victim {
            self.chunk_cache.remove(&key);
        }
    }

    self.chunk_cache.insert(chunk_index, fresh.clone());
    Ok(fresh)
}
/// Reads, CRC-checks and decompresses a single chunk directly from `reader`
/// (the cache is not consulted).
///
/// The on-disk record, as read here, is `record_size - 4` payload bytes
/// followed by a 4-byte big-endian CRC32 computed over that payload.
///
/// Steps:
/// 1. Look up the chunk's file offset and record size (the record size lookup
///    needs the total file size, obtained by seeking to the end and back).
/// 2. Read the payload and the trailing CRC, and verify the CRC.
/// 3. If the payload length is `>= max_compressed_length`, the chunk is treated
///    as stored uncompressed and the payload is returned unchanged.
/// 4. Otherwise dispatch on the algorithm name and, for every chunk except the
///    last, require the decompressed size to equal `chunk_length`.
///
/// # Errors
/// - `Error::InvalidFormat` for a missing offset/size, a record shorter than
///   4 bytes, a CRC mismatch, a decompression failure or a size mismatch.
/// - `Error::UnsupportedFormat` for an unknown algorithm name.
/// - `Error::Io` for any seek/read failure.
fn decompress_chunk<R: Read + Seek>(
    &self,
    reader: &mut R,
    chunk_index: usize,
) -> Result<Vec<u8>> {
    let compressed_offset = self
        .compression_info
        .compressed_chunk_offset(chunk_index)
        .ok_or_else(|| Error::InvalidFormat(format!("No offset for chunk {}", chunk_index)))?;

    // Determine the file size without disturbing the caller-visible position.
    let current_pos = reader.stream_position().map_err(Error::Io)?;
    let file_size = reader.seek(SeekFrom::End(0)).map_err(Error::Io)?;
    reader
        .seek(SeekFrom::Start(current_pos))
        .map_err(Error::Io)?;

    let record_size = self
        .compression_info
        .compressed_chunk_size(chunk_index, file_size)
        .ok_or_else(|| {
            Error::InvalidFormat(format!("Cannot determine size for chunk {}", chunk_index))
        })?;
    if record_size < 4 {
        return Err(Error::InvalidFormat(format!(
            "Chunk {} record size {} is too small (minimum 4 bytes for inline CRC)",
            chunk_index, record_size
        )));
    }
    // The last 4 bytes of the record are the CRC, not payload.
    let compressed_len = (record_size - 4) as usize;

    reader
        .seek(SeekFrom::Start(compressed_offset))
        .map_err(Error::Io)?;
    let mut compressed_data = vec![0u8; compressed_len];
    reader.read_exact(&mut compressed_data).map_err(Error::Io)?;
    let mut crc_bytes = [0u8; 4];
    reader.read_exact(&mut crc_bytes).map_err(Error::Io)?;

    let stored_crc = u32::from_be_bytes(crc_bytes);
    let computed_crc = crc32fast::hash(&compressed_data);
    if stored_crc != computed_crc {
        let file_info = match &self.data_file_path {
            Some(path) => format!(" in file {}", path),
            None => String::new(),
        };
        return Err(Error::InvalidFormat(format!(
            "CRC32 mismatch for chunk {} at offset 0x{:x}{}: stored=0x{:08x}, computed=0x{:08x}, compressed_len={}",
            chunk_index, compressed_offset, file_info, stored_crc, computed_crc, compressed_len
        )));
    }
    log::debug!(
        "Reading chunk {} at offset {} ({} bytes compressed, CRC OK)",
        chunk_index,
        compressed_offset,
        compressed_len
    );

    // A payload at least as large as the threshold was stored as-is.
    let max_compressed_length = self.compression_info.max_compressed_length as usize;
    if compressed_len >= max_compressed_length {
        log::debug!(
            "Chunk {} is incompressible (compressed_len={} >= max_compressed_length={}), returning raw bytes",
            chunk_index, compressed_len, max_compressed_length
        );
        return Ok(compressed_data);
    }

    let decompressed = match self.compression_info.algorithm.as_str() {
        "LZ4Compressor" => self.decompress_lz4_chunk(&compressed_data, chunk_index),
        "SnappyCompressor" => self.decompress_snappy_chunk(&compressed_data, chunk_index),
        "DeflateCompressor" => self.decompress_deflate_chunk(&compressed_data, chunk_index),
        "ZstdCompressor" => self.decompress_zstd_chunk(&compressed_data, chunk_index),
        algorithm => Err(Error::UnsupportedFormat(format!(
            "Unknown compression algorithm: {}",
            algorithm
        ))),
    }?;

    // Only the final chunk may be shorter than `chunk_length`.
    // `saturating_sub` keeps this check from underflowing (panic in debug,
    // wrap-around in release) should the offset table ever be empty.
    let last_chunk_index = self.compression_info.chunk_offsets.len().saturating_sub(1);
    if chunk_index < last_chunk_index {
        let expected_size = self.compression_info.chunk_length as usize;
        if decompressed.len() != expected_size {
            return Err(Error::InvalidFormat(format!(
                "Decompressed chunk {} size mismatch: expected {}, got {}",
                chunk_index,
                expected_size,
                decompressed.len()
            )));
        }
    }
    Ok(decompressed)
}
fn decompress_lz4_chunk(&self, compressed_data: &[u8], chunk_index: usize) -> Result<Vec<u8>> {
let file_info = match &self.data_file_path {
Some(path) => format!(" in file {}", path),
None => String::new(),
};
if compressed_data.len() < 4 {
return Err(Error::InvalidFormat(format!(
"LZ4 compressed data too short for chunk {}{} (need at least 4 bytes for length prefix, got {})",
chunk_index, file_info, compressed_data.len()
)));
}
let decompressed_length = u32::from_le_bytes([
compressed_data[0],
compressed_data[1],
compressed_data[2],
compressed_data[3],
]) as usize;
let expected_size = self.compression_info.chunk_length as usize;
if chunk_index < self.compression_info.chunk_offsets.len() - 1
&& decompressed_length != expected_size
{
return Err(Error::InvalidFormat(format!(
"LZ4 length prefix mismatch for chunk {} at offset 0x{:x}: expected {}, got {} (first 4 bytes: {:02x} {:02x} {:02x} {:02x}){}",
chunk_index,
self.compression_info
.chunk_offsets
.get(chunk_index)
.unwrap_or(&0),
expected_size,
decompressed_length,
compressed_data[0],
compressed_data[1],
compressed_data[2],
compressed_data[3],
file_info
)));
}
let lz4_data = &compressed_data[4..];
match lz4_flex::decompress(lz4_data, decompressed_length) {
Ok(decompressed) => {
if decompressed.len() != decompressed_length {
return Err(Error::InvalidFormat(format!(
"LZ4 decompression size mismatch for chunk {} at offset 0x{:x}: expected {}, got {}{}",
chunk_index,
self.compression_info
.chunk_offsets
.get(chunk_index)
.unwrap_or(&0),
decompressed_length,
decompressed.len(),
file_info
)));
}
Ok(decompressed)
}
Err(e) => Err(Error::InvalidFormat(format!(
"LZ4 decompression failed for chunk {} at offset 0x{:x}: {}{}",
chunk_index,
self.compression_info
.chunk_offsets
.get(chunk_index)
.unwrap_or(&0),
e,
file_info
))),
}
}
fn decompress_snappy_chunk(
&self,
compressed_data: &[u8],
chunk_index: usize,
) -> Result<Vec<u8>> {
#[cfg(feature = "snappy")]
{
use snap::raw::Decoder;
let mut decoder = Decoder::new();
match decoder.decompress_vec(compressed_data) {
Ok(decompressed) => Ok(decompressed),
Err(e) => Err(Error::InvalidFormat(format!(
"Snappy decompression failed for chunk {} at offset 0x{:x}: {}. No fallback allowed for modern formats.",
chunk_index,
self.compression_info
.chunk_offsets
.get(chunk_index)
.unwrap_or(&0),
e
))),
}
}
#[cfg(not(feature = "snappy"))]
{
let _ = (compressed_data, chunk_index); Err(Error::UnsupportedFormat(
"Snappy support not compiled in".to_string(),
))
}
}
fn decompress_deflate_chunk(
&self,
compressed_data: &[u8],
chunk_index: usize,
) -> Result<Vec<u8>> {
#[cfg(feature = "deflate")]
{
use flate2::read::DeflateDecoder;
use std::io::Read;
let mut decoder = DeflateDecoder::new(compressed_data);
let mut decompressed = Vec::new();
match decoder.read_to_end(&mut decompressed) {
Ok(_) => Ok(decompressed),
Err(e) => Err(Error::InvalidFormat(format!(
"Deflate decompression failed for chunk {} at offset 0x{:x}: {}. No fallback allowed for modern formats.",
chunk_index,
self.compression_info
.chunk_offsets
.get(chunk_index)
.unwrap_or(&0),
e
))),
}
}
#[cfg(not(feature = "deflate"))]
{
let _ = (compressed_data, chunk_index); Err(Error::UnsupportedFormat(
"Deflate support not compiled in".to_string(),
))
}
}
fn decompress_zstd_chunk(&self, compressed_data: &[u8], chunk_index: usize) -> Result<Vec<u8>> {
#[cfg(feature = "zstd")]
{
match zstd::decode_all(compressed_data) {
Ok(decompressed) => Ok(decompressed),
Err(e) => Err(Error::InvalidFormat(format!(
"Zstd decompression failed for chunk {} at offset 0x{:x}: {}. No fallback allowed for modern formats.",
chunk_index,
self.compression_info
.chunk_offsets
.get(chunk_index)
.unwrap_or(&0),
e
))),
}
}
#[cfg(not(feature = "zstd"))]
{
let _ = (compressed_data, chunk_index); Err(Error::UnsupportedFormat(
"Zstd support not compiled in".to_string(),
))
}
}
/// Removes every decompressed chunk currently held in the cache.
pub fn clear_cache(&mut self) {
self.chunk_cache.clear();
}
/// Returns `(cached, limit)`: the number of chunks currently cached and the
/// configured `max_cached_chunks`.
pub fn cache_stats(&self) -> (usize, usize) {
    let cached = self.chunk_cache.len();
    let limit = self.max_cached_chunks;
    (cached, limit)
}
/// Reads the whole uncompressed stream: `data_length` bytes starting at
/// uncompressed offset 0.
///
/// # Errors
/// Propagates any error from `read_data`.
pub fn read_all_data<R: Read + Seek>(&mut self, reader: &mut R) -> Result<Vec<u8>> {
    let total_len = self.compression_info.data_length as usize;
    self.read_data(reader, 0, total_len)
}
/// Borrows the compression metadata this decompressor was built with.
pub fn compression_info(&self) -> &CompressionInfo {
&self.compression_info
}
}
pub fn create_decompressor_from_file(
compression_info_path: &std::path::Path,
) -> Result<ChunkDecompressor> {
let compression_data = std::fs::read(compression_info_path).map_err(Error::Io)?;
let compression_info = CompressionInfo::parse(&compression_data)?;
log::info!("Loaded compression info:");
log::info!(" Algorithm: {}", compression_info.algorithm);
log::info!(" Chunk Length: {} bytes", compression_info.chunk_length);
log::info!(" Data Length: {} bytes", compression_info.data_length);
log::info!(" Chunk Count: {}", compression_info.chunk_offsets.len());
ChunkDecompressor::new(compression_info, CassandraVersion::V5_0Release)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Base fixture: LZ4 metadata describing a single 16 KiB chunk.
    fn single_chunk_lz4_info() -> CompressionInfo {
        CompressionInfo {
            algorithm: "LZ4Compressor".to_string(),
            chunk_length: 16384,
            data_length: 16384,
            chunk_offsets: vec![0],
            option_pairs: vec![],
            max_compressed_length: i32::MAX as u32,
        }
    }

    /// Construction keeps the supplied metadata intact.
    #[test]
    fn test_chunk_decompressor_creation() {
        let info = CompressionInfo {
            data_length: 32768,
            chunk_offsets: vec![0, 8192, 16384],
            ..single_chunk_lz4_info()
        };

        let decompressor =
            ChunkDecompressor::new(info, CassandraVersion::V5_0Release).unwrap();

        let stored = &decompressor.compression_info;
        assert_eq!(stored.algorithm, "LZ4Compressor");
        assert_eq!(stored.chunk_length, 16384);
        assert_eq!(stored.chunk_offsets.len(), 3);
    }

    /// A new decompressor starts with an empty cache of capacity 16, and
    /// clearing an empty cache leaves it empty.
    #[test]
    fn test_chunk_cache() {
        let mut decompressor =
            ChunkDecompressor::new(single_chunk_lz4_info(), CassandraVersion::V5_0Release)
                .unwrap();

        let (cached, max) = decompressor.cache_stats();
        assert_eq!(cached, 0);
        assert_eq!(max, 16);

        decompressor.clear_cache();
        let (cached_after_clear, _) = decompressor.cache_stats();
        assert_eq!(cached_after_clear, 0);
    }
}