use crate::storage::sstable::compression_info::CompressionInfo;
use crate::{Error, Result};
use std::io::{Read, Seek, SeekFrom};
/// Reads checksummed chunks out of a seekable byte source.
///
/// Chunk locations and sizes are looked up in the accompanying
/// [`CompressionInfo`]; the reader itself only seeks, reads, and verifies.
pub struct ChunkReader<R>
where
    R: Read + Seek,
{
    /// Seekable source the chunk bytes are read from.
    reader: R,
    /// Metadata queried for each chunk's offset and size.
    compression_info: CompressionInfo,
    /// Caller-supplied file size, forwarded to `CompressionInfo` when a
    /// chunk's size is computed.
    total_file_size: u64,
}
impl<R: Read + Seek> ChunkReader<R> {
pub fn new(reader: R, compression_info: CompressionInfo, total_file_size: u64) -> Self {
Self {
reader,
compression_info,
total_file_size,
}
}
pub fn read_chunk(&mut self, chunk_index: usize) -> Result<Vec<u8>> {
let offset = self
.compression_info
.compressed_chunk_offset(chunk_index)
.ok_or_else(|| {
Error::InvalidFormat(format!(
"Chunk {} not found in CompressionInfo (total chunks: {})",
chunk_index,
self.compression_info.chunk_offsets.len()
))
})?;
let total_chunk_size = self
.compression_info
.compressed_chunk_size(chunk_index, self.total_file_size)
.ok_or_else(|| {
Error::InvalidFormat(format!(
"Cannot determine size for chunk {} (file_size={})",
chunk_index, self.total_file_size
))
})?;
self.reader.seek(SeekFrom::Start(offset)).map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to seek to chunk {} at offset 0x{:x}: {}",
chunk_index, offset, e
),
))
})?;
if total_chunk_size < 4 {
return Err(Error::InvalidFormat(format!(
"Chunk {} size too small: {} bytes (minimum 4 for CRC)",
chunk_index, total_chunk_size
)));
}
let chunk_size = (total_chunk_size - 4) as usize;
let mut chunk_data = vec![0u8; chunk_size];
self.reader.read_exact(&mut chunk_data).map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to read chunk {} data ({} bytes at offset 0x{:x}): {}",
chunk_index, chunk_size, offset, e
),
))
})?;
let mut crc_bytes = [0u8; 4];
self.reader.read_exact(&mut crc_bytes).map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to read CRC32 for chunk {} at offset 0x{:x}: {}",
chunk_index,
offset + chunk_size as u64,
e
),
))
})?;
let expected_crc = u32::from_be_bytes(crc_bytes);
let computed_crc = crc32fast::hash(&chunk_data);
if computed_crc != expected_crc {
return Err(Error::InvalidFormat(format!(
"CRC32 mismatch for chunk {} at offset 0x{:x}: expected=0x{:08x}, computed=0x{:08x}, chunk_size={}",
chunk_index, offset, expected_crc, computed_crc, chunk_size
)));
}
Ok(chunk_data)
}
pub fn read_all_chunks(&mut self) -> Result<Vec<Vec<u8>>> {
let chunk_count = self.compression_info.chunk_offsets.len();
let mut chunks = Vec::with_capacity(chunk_count);
for i in 0..chunk_count {
let chunk = self.read_chunk(i)?;
chunks.push(chunk);
}
Ok(chunks)
}
pub fn chunk_count(&self) -> usize {
self.compression_info.chunk_offsets.len()
}
pub fn compression_algorithm(&self) -> &str {
&self.compression_info.algorithm
}
pub fn chunk_length(&self) -> u32 {
self.compression_info.chunk_length
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Appends `payload` followed by `crc` (big-endian) to `sink`.
    fn push_frame(sink: &mut Vec<u8>, payload: &[u8], crc: u32) {
        sink.extend_from_slice(payload);
        sink.extend_from_slice(&crc.to_be_bytes());
    }

    /// Builds a `CompressionInfo` with the fields the tests vary; the rest use
    /// the same fixed values in every test.
    fn make_info(
        algorithm: &str,
        chunk_length: u32,
        data_length: u64,
        chunk_offsets: Vec<u64>,
    ) -> CompressionInfo {
        CompressionInfo {
            algorithm: algorithm.to_string(),
            chunk_length,
            data_length,
            chunk_offsets,
            option_pairs: vec![],
            max_compressed_length: i32::MAX as u32,
        }
    }

    /// Wraps `bytes` in a cursor-backed reader whose file size is `bytes.len()`.
    fn reader_over(bytes: Vec<u8>, info: CompressionInfo) -> ChunkReader<Cursor<Vec<u8>>> {
        let file_size = bytes.len() as u64;
        ChunkReader::new(Cursor::new(bytes), info, file_size)
    }

    #[test]
    fn test_read_chunk_with_valid_crc() {
        let compressed_data = b"test compressed chunk data";
        let mut bytes = Vec::new();
        push_frame(&mut bytes, compressed_data, crc32fast::hash(compressed_data));

        let info = make_info(
            "LZ4Compressor",
            16384,
            compressed_data.len() as u64,
            vec![0],
        );
        let mut reader = reader_over(bytes, info);

        let chunk = reader.read_chunk(0).expect("chunk 0 should be readable");
        assert_eq!(chunk, compressed_data);
    }

    #[test]
    fn test_read_chunk_with_invalid_crc() {
        let compressed_data = b"test compressed chunk data";
        let mut bytes = Vec::new();
        push_frame(&mut bytes, compressed_data, 0xDEADBEEFu32);

        let info = make_info(
            "LZ4Compressor",
            16384,
            compressed_data.len() as u64,
            vec![0],
        );
        let mut reader = reader_over(bytes, info);

        let message = reader
            .read_chunk(0)
            .expect_err("a wrong CRC must be rejected")
            .to_string();
        assert!(message.contains("CRC32 mismatch"));
        assert!(message.contains("0xdeadbeef"));
    }

    #[test]
    fn test_read_multiple_chunks() {
        let chunk1_data = b"first chunk data";
        let chunk2_data = b"second chunk data with more content";

        let mut bytes = Vec::new();
        push_frame(&mut bytes, chunk1_data, crc32fast::hash(chunk1_data));
        // The second chunk starts right after the first payload and its CRC.
        let second_offset = bytes.len() as u64;
        push_frame(&mut bytes, chunk2_data, crc32fast::hash(chunk2_data));

        let info = make_info(
            "SnappyCompressor",
            16384,
            (chunk1_data.len() + chunk2_data.len()) as u64,
            vec![0, second_offset],
        );
        let mut reader = reader_over(bytes, info);

        let first = reader.read_chunk(0).expect("chunk 0 should be readable");
        assert_eq!(first, chunk1_data);
        let second = reader.read_chunk(1).expect("chunk 1 should be readable");
        assert_eq!(second, chunk2_data);
    }

    #[test]
    fn test_read_all_chunks() {
        let payloads = vec![b"chunk1".to_vec(), b"chunk2data".to_vec(), b"c3".to_vec()];

        let mut bytes = Vec::new();
        let mut offsets = Vec::with_capacity(payloads.len());
        for payload in &payloads {
            // Record where this frame begins before appending it.
            offsets.push(bytes.len() as u64);
            push_frame(&mut bytes, payload, crc32fast::hash(payload));
        }
        let total_uncompressed = payloads.iter().map(Vec::len).sum::<usize>() as u64;

        let info = make_info("LZ4Compressor", 16384, total_uncompressed, offsets);
        let mut reader = reader_over(bytes, info);

        let all_chunks = reader
            .read_all_chunks()
            .expect("all chunks should be readable");
        assert_eq!(all_chunks.len(), 3);
        for (actual, expected) in all_chunks.iter().zip(&payloads) {
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_invalid_chunk_index() {
        let compressed_data = b"test data";
        let mut bytes = Vec::new();
        push_frame(&mut bytes, compressed_data, crc32fast::hash(compressed_data));

        let info = make_info(
            "LZ4Compressor",
            16384,
            compressed_data.len() as u64,
            vec![0],
        );
        let mut reader = reader_over(bytes, info);

        let message = reader
            .read_chunk(1)
            .expect_err("index 1 is out of range")
            .to_string();
        assert!(message.contains("Chunk 1 not found"));
    }

    #[test]
    fn test_chunk_size_too_small() {
        // Two bytes cannot even hold the 4-byte CRC trailer.
        let bytes = vec![0xAB, 0xCD];
        let info = make_info("LZ4Compressor", 16384, 0, vec![0]);
        let mut reader = reader_over(bytes, info);

        let message = reader
            .read_chunk(0)
            .expect_err("an undersized chunk must be rejected")
            .to_string();
        assert!(message.contains("size too small"));
    }

    #[test]
    fn test_accessor_methods() {
        let info = make_info("SnappyCompressor", 32768, 65536, vec![0, 16384, 32768]);
        let reader = reader_over(Vec::new(), info);

        assert_eq!(reader.chunk_count(), 3);
        assert_eq!(reader.compression_algorithm(), "SnappyCompressor");
        assert_eq!(reader.chunk_length(), 32768);
    }
}