use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::sync::Mutex;
use super::header::{detect_ascii_header_corruption, is_ascii_corruption_value};
use super::source::BlockSource;
use super::types::SSTableReaderConfig;
use crate::{Error, Result};
/// Reads the next block from `file`, making up to three attempts before giving up.
///
/// This is the crate-facing entry point; all format dispatch happens in
/// `read_next_block_impl`, and the retry policy lives in `read_next_block_with_retry`.
///
/// Returns `Ok(None)` once the underlying reader reports end of data.
pub(crate) async fn read_next_block(
    file: &Arc<Mutex<BlockSource>>,
    cassandra_version: &crate::parser::header::CassandraVersion,
    config: &SSTableReaderConfig,
    compression_info: &Option<Arc<crate::storage::sstable::compression_info::CompressionInfo>>,
    current_chunk_index: &std::sync::atomic::AtomicUsize,
    header_offset: u64,
) -> Result<Option<Vec<u8>>> {
    /// Total number of read attempts before the last error is returned.
    const READ_ATTEMPTS: usize = 3;

    read_next_block_with_retry(
        file,
        cassandra_version,
        config,
        compression_info,
        current_chunk_index,
        header_offset,
        READ_ATTEMPTS,
    )
    .await
}
async fn read_next_block_with_retry(
file: &Arc<Mutex<BlockSource>>,
cassandra_version: &crate::parser::header::CassandraVersion,
config: &SSTableReaderConfig,
compression_info: &Option<Arc<crate::storage::sstable::compression_info::CompressionInfo>>,
current_chunk_index: &std::sync::atomic::AtomicUsize,
header_offset: u64,
max_retries: usize,
) -> Result<Option<Vec<u8>>> {
let mut retry_count = 0;
loop {
match read_next_block_impl(
file,
cassandra_version,
config,
compression_info,
current_chunk_index,
header_offset,
)
.await
{
Ok(result) => return Ok(result),
Err(e) => {
retry_count += 1;
if retry_count >= max_retries {
log::error!("Failed to read block after {} retries: {}", max_retries, e);
return Err(e);
}
log::warn!(
"Block read failed (attempt {}/{}): {}, retrying...",
retry_count,
max_retries,
e
);
tokio::time::sleep(tokio::time::Duration::from_millis(10 * retry_count as u64))
.await;
}
}
}
}
/// Performs a single block read, choosing the reader by `cassandra_version`.
///
/// - `V5_0Uncompressed`: returns everything from the current position to end of file.
/// - NB formats (`is_nb_format()`): reads the next compressed chunk described by
///   `compression_info`. The chunk reader is called with a header offset of 0;
///   `_header_offset` is currently unused.
/// - `V5_0Bti`: parses a 12-byte block header and then its payload.
/// - Anything else: parses an 8-byte legacy block header and then its payload.
///
/// For header-based formats, a declared size above 64 MiB, or a size that
/// `is_ascii_corruption_value` flags, is rejected as corruption. A declared size
/// of 0 yields an empty block. When `config.validate_checksums` is set and the
/// header checksum is non-zero, the payload's CRC32 must match it.
///
/// Returns `Ok(None)` at end of data.
async fn read_next_block_impl(
    file: &Arc<Mutex<BlockSource>>,
    cassandra_version: &crate::parser::header::CassandraVersion,
    config: &SSTableReaderConfig,
    compression_info: &Option<Arc<crate::storage::sstable::compression_info::CompressionInfo>>,
    current_chunk_index: &std::sync::atomic::AtomicUsize,
    _header_offset: u64,
) -> Result<Option<Vec<u8>>> {
    /// Largest block size accepted from a block header; anything larger is treated as corruption.
    const MAX_BLOCK_SIZE: u32 = 64 * 1024 * 1024;

    log::debug!("block_io::read_next_block_impl: Starting block read");
    log::debug!(
        "block_io::read_next_block_impl: Cassandra version: {:?}",
        cassandra_version
    );
    if matches!(
        cassandra_version,
        crate::parser::header::CassandraVersion::V5_0Uncompressed
    ) {
        log::debug!("block_io::read_next_block_impl: Using uncompressed direct read");
        return read_uncompressed_data_block(file, config).await;
    }
    if cassandra_version.is_nb_format() {
        log::debug!("block_io::read_next_block_impl: Using NB format chunk reader");
        // Measure the file length without disturbing the current read position.
        let file_size = {
            let mut file_guard = file.lock().await;
            let current = file_guard.stream_position().await?;
            file_guard.seek(std::io::SeekFrom::End(0)).await?;
            let size = file_guard.stream_position().await?;
            file_guard.seek(std::io::SeekFrom::Start(current)).await?;
            size
        };
        return read_nb_format_chunk_data(
            file,
            config,
            compression_info,
            current_chunk_index,
            file_size,
            0,
        )
        .await;
    }
    let block_header = match cassandra_version {
        crate::parser::header::CassandraVersion::V5_0Bti => {
            log::debug!("block_io::read_next_block_impl: Using BTI format block header reader");
            read_bti_format_block_header(file).await?
        }
        _ => {
            log::debug!("block_io::read_next_block_impl: Using legacy format block header reader");
            read_legacy_format_block_header(file).await?
        }
    };
    let Some((compressed_size, checksum, current_pos)) = block_header else {
        log::debug!("block_io::read_next_block_impl: Block header returned None (EOF)");
        return Ok(None);
    };
    log::debug!(
        "block_io::read_next_block_impl: Block header: compressed_size={}, checksum={}, pos={}",
        compressed_size,
        checksum,
        current_pos
    );
    if compressed_size > MAX_BLOCK_SIZE {
        return Err(Error::corruption(format!(
            "Block size too large: {} bytes (limit: 64MB)",
            compressed_size
        )));
    }
    if is_ascii_corruption_value(compressed_size) {
        return Err(Error::corruption(format!(
            "Block size appears to be ASCII corruption: {} (0x{:08x}) - likely misaligned file reading",
            compressed_size, compressed_size
        )));
    }
    if compressed_size == 0 {
        log::info!("Encountered empty block at position {}", current_pos);
        return Ok(Some(Vec::new()));
    }
    // Compare in usize space: casting `read_buffer_size` down to u32 would truncate
    // very large buffer sizes. `compressed_size` is <= 64 MiB here, so widening is lossless.
    let block_len = compressed_size as usize;
    let block_data = if block_len > config.read_buffer_size {
        read_large_block_streaming(file, block_len, config).await?
    } else {
        read_block_direct(file, block_len).await?
    };
    if config.validate_checksums && checksum != 0 {
        let computed_checksum = crc32fast::hash(&block_data);
        if computed_checksum != checksum {
            return Err(Error::corruption(format!(
                "Block checksum mismatch at position {}: expected 0x{:08x}, got 0x{:08x}",
                current_pos, checksum, computed_checksum
            )));
        }
        log::debug!("Block checksum validated: 0x{:08x}", checksum);
    }
    log::debug!(
        "Successfully read block: {} bytes at position {}",
        block_data.len(),
        current_pos
    );
    Ok(Some(block_data))
}
/// Reads the next compressed chunk of an NB-format Data.db using `compression_info`.
///
/// The chunk to read is `current_chunk_index`; it is advanced by one only after
/// a successful read. On disk each chunk is its compressed bytes followed by a
/// 4-byte big-endian CRC32 of those bytes. The CRC is always validated here,
/// independent of `config.validate_checksums`.
///
/// If `compression_info` is `None`, the remainder of the file is returned as a
/// single uncompressed block instead. Returns `Ok(None)` once every chunk listed
/// in `compression_info` has been consumed.
///
/// `header_offset` is added to each chunk offset to form the absolute file offset
/// that is seeked to; error messages report that absolute offset.
///
/// # Errors
/// `Error::InvalidFormat` for a missing or undersized chunk, an overflowing offset,
/// or a CRC mismatch; `Error::Io` for seek/read failures.
async fn read_nb_format_chunk_data(
    file: &Arc<Mutex<BlockSource>>,
    config: &SSTableReaderConfig,
    compression_info: &Option<Arc<crate::storage::sstable::compression_info::CompressionInfo>>,
    current_chunk_index: &std::sync::atomic::AtomicUsize,
    file_size: u64,
    header_offset: u64,
) -> Result<Option<Vec<u8>>> {
    log::debug!("read_nb_format_chunk_data: Starting chunk read");
    let Some(comp_info) = compression_info else {
        log::debug!(
            "read_nb_format_chunk_data: No CompressionInfo.db, falling back to raw data read"
        );
        return read_uncompressed_data_block(file, config).await;
    };
    let chunk_idx = current_chunk_index.load(std::sync::atomic::Ordering::Relaxed);
    if chunk_idx >= comp_info.chunk_offsets.len() {
        log::debug!(
            "read_nb_format_chunk_data: All chunks read ({}/{})",
            chunk_idx,
            comp_info.chunk_offsets.len()
        );
        return Ok(None);
    }
    log::debug!(
        "read_nb_format_chunk_data: Reading chunk {}/{}",
        chunk_idx,
        comp_info.chunk_offsets.len()
    );
    let chunk_offset = comp_info
        .compressed_chunk_offset(chunk_idx)
        .ok_or_else(|| Error::InvalidFormat(format!("No offset for chunk {}", chunk_idx)))?;
    log::debug!(
        "read_nb_format_chunk_data: Chunk {} offset: 0x{:x}",
        chunk_idx,
        chunk_offset
    );
    let total_chunk_size = comp_info
        .compressed_chunk_size(chunk_idx, file_size)
        .ok_or_else(|| {
            Error::InvalidFormat(format!(
                "Cannot determine size for chunk {} (file_size={})",
                chunk_idx, file_size
            ))
        })?;
    if total_chunk_size < 4 {
        return Err(Error::InvalidFormat(format!(
            "Chunk {} size too small: {} bytes (minimum 4 for CRC)",
            chunk_idx, total_chunk_size
        )));
    }
    // The trailing 4 bytes of each chunk hold its CRC32.
    let chunk_data_size = (total_chunk_size - 4) as usize;
    log::debug!(
        "read_nb_format_chunk_data: Chunk {} total_size={}, data_size={}, offset=0x{:x}",
        chunk_idx,
        total_chunk_size,
        chunk_data_size,
        chunk_offset
    );
    // Checked so that corrupt offsets produce an error instead of a debug-build panic.
    let absolute_offset = chunk_offset.checked_add(header_offset).ok_or_else(|| {
        Error::InvalidFormat(format!(
            "Chunk {} offset 0x{:x} overflows when adding header_offset={}",
            chunk_idx, chunk_offset, header_offset
        ))
    })?;
    let (chunk_data, expected_crc) = {
        let mut file_guard = file.lock().await;
        file_guard
            .seek(std::io::SeekFrom::Start(absolute_offset))
            .await
            .map_err(|e| {
                Error::Io(std::io::Error::new(
                    e.kind(),
                    format!(
                        "Failed to seek to chunk {} at offset 0x{:x} (header_offset={}): {}",
                        chunk_idx, absolute_offset, header_offset, e
                    ),
                ))
            })?;
        let mut chunk_data = vec![0u8; chunk_data_size];
        file_guard.read_exact(&mut chunk_data).await.map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!(
                    "Failed to read chunk {} data ({} bytes at offset 0x{:x}): {}",
                    chunk_idx, chunk_data_size, absolute_offset, e
                ),
            ))
        })?;
        let mut crc_bytes = [0u8; 4];
        file_guard.read_exact(&mut crc_bytes).await.map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!(
                    "Failed to read CRC32 for chunk {} at offset 0x{:x}: {}",
                    chunk_idx,
                    absolute_offset + chunk_data_size as u64,
                    e
                ),
            ))
        })?;
        (chunk_data, u32::from_be_bytes(crc_bytes))
    };
    let computed_crc = crc32fast::hash(&chunk_data);
    if computed_crc != expected_crc {
        return Err(Error::InvalidFormat(format!(
            "CRC32 mismatch for chunk {} at offset 0x{:x}: expected=0x{:08x}, computed=0x{:08x}, chunk_size={}",
            chunk_idx, absolute_offset, expected_crc, computed_crc, chunk_data_size
        )));
    }
    log::debug!(
        "read_nb_format_chunk_data: CRC32 validated for chunk {}: 0x{:08x}",
        chunk_idx,
        expected_crc
    );
    log::debug!(
        "read_nb_format_chunk_data: Successfully read chunk {}: {} bytes (compressed)",
        chunk_idx,
        chunk_data.len()
    );
    // Only advance after the chunk has been fully read and validated, so a failed
    // attempt can be retried on the same chunk.
    current_chunk_index.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    Ok(Some(chunk_data))
}
/// Reads a 12-byte BTI block header at the current stream position.
///
/// Layout used here: bytes 0..4 are the big-endian compressed size, bytes 8..12
/// the big-endian checksum; bytes 4..8 are skipped.
///
/// Returns `(compressed_size, checksum, header_position)`, or `None` if fewer than
/// 12 bytes remain. If the position cannot be queried, it is reported as 0.
///
/// # Errors
/// `Error::Io` for non-EOF read failures; a corruption error when
/// `detect_ascii_header_corruption` flags the header bytes.
async fn read_bti_format_block_header(
    file: &Arc<Mutex<BlockSource>>,
) -> Result<Option<(u32, u32, u64)>> {
    let mut raw = [0u8; 12];
    let header_pos = {
        let mut guard = file.lock().await;
        let pos = guard.stream_position().await.unwrap_or(0);
        if let Err(err) = guard.read_exact(&mut raw).await {
            // A truncated trailing header is treated as a clean end of data.
            if err.kind() == std::io::ErrorKind::UnexpectedEof {
                return Ok(None);
            }
            return Err(Error::Io(std::io::Error::other(format!(
                "Failed to read BTI block header: {}",
                err
            ))));
        }
        pos
    };
    if detect_ascii_header_corruption(&raw) {
        return Err(Error::corruption(format!(
            "BTI block header appears to contain ASCII corruption at position {}: {:?}",
            header_pos,
            String::from_utf8_lossy(&raw[0..4])
        )));
    }
    let [s0, s1, s2, s3, _, _, _, _, c0, c1, c2, c3] = raw;
    let compressed_size = u32::from_be_bytes([s0, s1, s2, s3]);
    let checksum = u32::from_be_bytes([c0, c1, c2, c3]);
    Ok(Some((compressed_size, checksum, header_pos)))
}
/// Reads an 8-byte legacy block header at the current stream position.
///
/// Layout used here: bytes 0..4 are the big-endian compressed size, bytes 4..8
/// the big-endian checksum.
///
/// Returns `(compressed_size, checksum, header_position)`, or `None` if fewer than
/// 8 bytes remain. If the position cannot be queried, it is reported as 0.
///
/// # Errors
/// `Error::Io` for non-EOF read failures.
async fn read_legacy_format_block_header(
    file: &Arc<Mutex<BlockSource>>,
) -> Result<Option<(u32, u32, u64)>> {
    let mut raw = [0u8; 8];
    let header_pos = {
        let mut guard = file.lock().await;
        let pos = guard.stream_position().await.unwrap_or(0);
        if let Err(err) = guard.read_exact(&mut raw).await {
            // A truncated trailing header is treated as a clean end of data.
            if err.kind() == std::io::ErrorKind::UnexpectedEof {
                return Ok(None);
            }
            return Err(Error::Io(std::io::Error::other(format!(
                "Failed to read legacy block header: {}",
                err
            ))));
        }
        pos
    };
    let [s0, s1, s2, s3, c0, c1, c2, c3] = raw;
    let compressed_size = u32::from_be_bytes([s0, s1, s2, s3]);
    let checksum = u32::from_be_bytes([c0, c1, c2, c3]);
    Ok(Some((compressed_size, checksum, header_pos)))
}
async fn read_block_direct(file: &Arc<Mutex<BlockSource>>, size: usize) -> Result<Vec<u8>> {
let mut block_data = vec![0u8; size];
{
let mut file_guard = file.lock().await;
file_guard.read_exact(&mut block_data).await.map_err(|e| {
Error::Io(std::io::Error::other(format!(
"Failed to read block data ({}): {}",
size, e
)))
})?;
}
Ok(block_data)
}
/// Reads exactly `size` bytes from `reader` into a new `Vec`, never asking the
/// reader for more than `buffer_size` bytes at a time (a `buffer_size` of 0 is
/// treated as 1).
///
/// The output vector is allocated once with capacity `size`; only the scratch
/// buffer is bounded by `buffer_size`. After roughly every 1 MiB read, the task
/// yields back to the tokio scheduler so long reads do not starve other tasks.
///
/// # Errors
/// Propagates the reader's error, including `UnexpectedEof` if fewer than `size`
/// bytes are available.
async fn read_into_vec_capped<R>(
    reader: &mut R,
    size: usize,
    buffer_size: usize,
) -> std::io::Result<Vec<u8>>
where
    R: AsyncReadExt + Unpin,
{
    /// Number of bytes read between cooperative yields.
    const YIELD_INTERVAL: usize = 1024 * 1024;

    let mut out = Vec::with_capacity(size);
    if size == 0 {
        return Ok(out);
    }
    let cap = buffer_size.clamp(1, size);
    let mut scratch = vec![0u8; cap];
    let mut remaining = size;
    // Counting bytes since the last yield (rather than testing `out.len()` for an
    // exact multiple of 1 MiB) keeps yielding regular for buffer sizes that do
    // not divide 1 MiB.
    let mut since_yield = 0usize;
    while remaining > 0 {
        let to_read = remaining.min(cap);
        reader.read_exact(&mut scratch[..to_read]).await?;
        out.extend_from_slice(&scratch[..to_read]);
        remaining -= to_read;
        since_yield += to_read;
        if remaining > 0 && since_yield >= YIELD_INTERVAL {
            since_yield = 0;
            tokio::task::yield_now().await;
        }
    }
    Ok(out)
}
async fn read_large_block_streaming(
file: &Arc<Mutex<BlockSource>>,
size: usize,
config: &SSTableReaderConfig,
) -> Result<Vec<u8>> {
let buffer_size = config.read_buffer_size.min(size.max(1));
log::info!(
"Reading large block ({} bytes) using streaming with {} byte buffer",
size,
buffer_size
);
let mut file_guard = file.lock().await;
read_into_vec_capped(&mut *file_guard, size, config.read_buffer_size)
.await
.map_err(|e| {
Error::Io(std::io::Error::other(format!(
"Failed to read block chunk: {}",
e
)))
})
}
/// Returns all bytes from the current stream position to end of file as one block.
///
/// Returns `Ok(None)` if the stream is already at (or past) end of file. The read
/// is done in slices of at most `config.read_buffer_size` bytes.
///
/// The file lock is held across measuring the position, measuring the file size,
/// and reading. This guarantees the read starts at the position that was measured,
/// even if the `Arc<Mutex<_>>` is shared.
///
/// # Errors
/// `Error::Io` for seek/position/read failures, or if the remaining length does
/// not fit in `usize` on this platform.
async fn read_uncompressed_data_block(
    file: &Arc<Mutex<BlockSource>>,
    config: &SSTableReaderConfig,
) -> Result<Option<Vec<u8>>> {
    let mut file_guard = file.lock().await;
    let current_pos = file_guard.stream_position().await.map_err(|e| {
        Error::Io(std::io::Error::other(format!(
            "Failed to get stream position: {}",
            e
        )))
    })?;
    file_guard
        .seek(std::io::SeekFrom::End(0))
        .await
        .map_err(|e| {
            Error::Io(std::io::Error::other(format!(
                "Failed to seek to end: {}",
                e
            )))
        })?;
    let file_size = file_guard.stream_position().await.map_err(|e| {
        Error::Io(std::io::Error::other(format!(
            "Failed to get file size: {}",
            e
        )))
    })?;
    file_guard
        .seek(std::io::SeekFrom::Start(current_pos))
        .await
        .map_err(|e| {
            Error::Io(std::io::Error::other(format!(
                "Failed to seek back to position: {}",
                e
            )))
        })?;
    let remaining_bytes = file_size.saturating_sub(current_pos);
    if remaining_bytes == 0 {
        log::debug!(
            "read_uncompressed_data_block: EOF reached at position {}",
            current_pos
        );
        return Ok(None);
    }
    // A plain `as usize` would silently truncate on 32-bit targets; a multiple of
    // 4 GiB would even turn into 0 and be reported as EOF.
    let remaining = usize::try_from(remaining_bytes).map_err(|_| {
        Error::Io(std::io::Error::other(format!(
            "Uncompressed data block too large for this platform: {} bytes",
            remaining_bytes
        )))
    })?;
    log::debug!(
        "read_uncompressed_data_block: Reading {} bytes from position {}",
        remaining,
        current_pos
    );
    let data = read_into_vec_capped(&mut *file_guard, remaining, config.read_buffer_size)
        .await
        .map_err(|e| {
            Error::Io(std::io::Error::other(format!(
                "Failed to read uncompressed data block ({} bytes): {}",
                remaining, e
            )))
        })?;
    drop(file_guard);
    log::debug!(
        "read_uncompressed_data_block: Successfully read {} bytes",
        data.len()
    );
    Ok(Some(data))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::sync::atomic::AtomicUsize;

    /// Builds a fixture path in the system temp dir that is unique to this test
    /// process, so concurrent test runs cannot overwrite each other's files.
    fn temp_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!("block_io_{}_{}", std::process::id(), name))
    }

    #[test]
    fn test_is_ascii_corruption_value_known_patterns() {
        assert!(is_ascii_corruption_value(2959239534));
        assert!(is_ascii_corruption_value(1684108385));
    }

    #[test]
    fn test_is_ascii_corruption_value_normal_values() {
        assert!(!is_ascii_corruption_value(4096));
        assert!(!is_ascii_corruption_value(65536));
        assert!(!is_ascii_corruption_value(1048576));
    }

    #[test]
    fn test_detect_ascii_header_corruption_ascii_text() {
        let header = b"DATA1234";
        assert!(detect_ascii_header_corruption(header));
        let header2 = b"bindata!";
        assert!(detect_ascii_header_corruption(header2));
    }

    #[test]
    fn test_detect_ascii_header_corruption_binary() {
        let header = [0x00, 0x00, 0x10, 0x00, 0x12, 0x34, 0x56, 0x78];
        assert!(!detect_ascii_header_corruption(&header));
    }

    #[test]
    fn test_block_size_limit() {
        let limit = 64 * 1024 * 1024;
        assert!(4096 <= limit);
        assert!(64 * 1024 * 1024 <= limit);
        assert!(65 * 1024 * 1024 > limit);
    }

    #[test]
    fn test_empty_block_handling() {
        let size = 0u32;
        assert_eq!(size, 0);
    }

    #[test]
    fn test_crc32_calculation() {
        let data = b"test data for CRC";
        let crc = crc32fast::hash(data);
        assert_eq!(crc, crc32fast::hash(data));
        let data2 = b"different test data";
        assert_ne!(crc, crc32fast::hash(data2));
    }

    #[test]
    fn test_crc32_empty_data() {
        let data: &[u8] = b"";
        let crc = crc32fast::hash(data);
        assert_eq!(crc, 0);
    }

    #[test]
    fn test_block_header_parsing_big_endian() {
        let header_buffer = [0x00, 0x00, 0x10, 0x00, 0x12, 0x34, 0x56, 0x78];
        let compressed_size = u32::from_be_bytes([
            header_buffer[0],
            header_buffer[1],
            header_buffer[2],
            header_buffer[3],
        ]);
        let checksum = u32::from_be_bytes([
            header_buffer[4],
            header_buffer[5],
            header_buffer[6],
            header_buffer[7],
        ]);
        assert_eq!(compressed_size, 4096);
        assert_eq!(checksum, 0x12345678);
    }

    #[test]
    fn test_bti_header_parsing() {
        let header_buffer = [
            0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x10, 0x00, 0xAB, 0xCD, 0xEF, 0x12,
        ];
        let compressed_size = u32::from_be_bytes([
            header_buffer[0],
            header_buffer[1],
            header_buffer[2],
            header_buffer[3],
        ]);
        let checksum = u32::from_be_bytes([
            header_buffer[8],
            header_buffer[9],
            header_buffer[10],
            header_buffer[11],
        ]);
        assert_eq!(compressed_size, 2048);
        assert_eq!(checksum, 0xABCDEF12);
    }

    #[test]
    fn test_atomic_chunk_index_increment() {
        let index = AtomicUsize::new(0);
        assert_eq!(index.load(std::sync::atomic::Ordering::Relaxed), 0);
        index.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        assert_eq!(index.load(std::sync::atomic::Ordering::Relaxed), 1);
        index.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        assert_eq!(index.load(std::sync::atomic::Ordering::Relaxed), 2);
    }

    #[tokio::test]
    async fn test_read_block_direct_empty() {
        let temp_file = temp_path("test_empty_block.bin");
        tokio::fs::write(&temp_file, b"").await.unwrap();
        let file = tokio::fs::File::open(&temp_file).await.unwrap();
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        let result = read_block_direct(&file, 0).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap().len(), 0);
        tokio::fs::remove_file(&temp_file).await.ok();
    }

    #[tokio::test]
    async fn test_read_block_direct_small() {
        let temp_file = temp_path("test_small_block.bin");
        let test_data = b"Hello, World! This is test data.";
        tokio::fs::write(&temp_file, test_data).await.unwrap();
        let file = tokio::fs::File::open(&temp_file).await.unwrap();
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        let result = read_block_direct(&file, test_data.len()).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), test_data);
        tokio::fs::remove_file(&temp_file).await.ok();
    }

    #[tokio::test]
    async fn test_read_uncompressed_data_block() {
        let temp_file = temp_path("test_uncompressed_block.bin");
        let test_data = b"Uncompressed test data block content";
        tokio::fs::write(&temp_file, test_data).await.unwrap();
        let file = tokio::fs::File::open(&temp_file).await.unwrap();
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        let config = SSTableReaderConfig::default();
        let result = read_uncompressed_data_block(&file, &config).await;
        assert!(result.is_ok());
        let data = result.unwrap();
        assert!(data.is_some());
        assert_eq!(data.unwrap(), test_data);
        tokio::fs::remove_file(&temp_file).await.ok();
    }

    #[tokio::test]
    async fn test_read_uncompressed_data_block_eof() {
        let temp_file = temp_path("test_uncompressed_eof.bin");
        tokio::fs::write(&temp_file, b"").await.unwrap();
        let file = tokio::fs::File::open(&temp_file).await.unwrap();
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        let config = SSTableReaderConfig::default();
        let result = read_uncompressed_data_block(&file, &config).await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
        tokio::fs::remove_file(&temp_file).await.ok();
    }

    #[tokio::test]
    async fn test_read_legacy_format_block_header_eof() {
        let temp_file = temp_path("test_legacy_header_eof.bin");
        tokio::fs::write(&temp_file, &[0x00, 0x00, 0x10, 0x00])
            .await
            .unwrap();
        let file = tokio::fs::File::open(&temp_file).await.unwrap();
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        let result = read_legacy_format_block_header(&file).await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
        tokio::fs::remove_file(&temp_file).await.ok();
    }

    #[tokio::test]
    async fn test_read_legacy_format_block_header_valid() {
        let temp_file = temp_path("test_legacy_header_valid.bin");
        let header = [0x00, 0x00, 0x10, 0x00, 0x12, 0x34, 0x56, 0x78];
        tokio::fs::write(&temp_file, &header).await.unwrap();
        let file = tokio::fs::File::open(&temp_file).await.unwrap();
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        let result = read_legacy_format_block_header(&file).await;
        assert!(result.is_ok());
        let (size, checksum, pos) = result.unwrap().unwrap();
        assert_eq!(size, 4096);
        assert_eq!(checksum, 0x12345678);
        assert_eq!(pos, 0);
        tokio::fs::remove_file(&temp_file).await.ok();
    }

    #[tokio::test]
    async fn test_read_bti_format_block_header_valid() {
        let temp_file = temp_path("test_bti_header_valid.bin");
        let header = [
            0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x10, 0x00, 0xAB, 0xCD, 0xEF, 0x12,
        ];
        tokio::fs::write(&temp_file, &header).await.unwrap();
        let file = tokio::fs::File::open(&temp_file).await.unwrap();
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        let result = read_bti_format_block_header(&file).await;
        assert!(result.is_ok());
        let (size, checksum, pos) = result.unwrap().unwrap();
        assert_eq!(size, 2048);
        assert_eq!(checksum, 0xABCDEF12);
        assert_eq!(pos, 0);
        tokio::fs::remove_file(&temp_file).await.ok();
    }

    #[tokio::test]
    async fn test_read_large_block_streaming() {
        let temp_file = temp_path("test_large_block.bin");
        let size = 128 * 1024;
        let test_data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
        tokio::fs::write(&temp_file, &test_data).await.unwrap();
        let file = tokio::fs::File::open(&temp_file).await.unwrap();
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        let config = SSTableReaderConfig {
            read_buffer_size: 4096,
            validate_checksums: true,
            ..Default::default()
        };
        let result = read_large_block_streaming(&file, size, &config).await;
        assert!(result.is_ok());
        let data = result.unwrap();
        assert_eq!(data.len(), size);
        assert_eq!(data, test_data);
        tokio::fs::remove_file(&temp_file).await.ok();
    }

    #[tokio::test]
    async fn read_into_vec_capped_bounds_scratch_buffer() {
        use std::pin::Pin;
        use std::sync::atomic::Ordering;
        use std::task::{Context, Poll};
        use tokio::io::ReadBuf;

        /// Reader that records the largest read request it receives.
        struct MaxReadRecorder {
            data: std::io::Cursor<Vec<u8>>,
            max_request: Arc<AtomicUsize>,
        }

        impl tokio::io::AsyncRead for MaxReadRecorder {
            fn poll_read(
                mut self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                buf: &mut ReadBuf<'_>,
            ) -> Poll<std::io::Result<()>> {
                self.max_request
                    .fetch_max(buf.remaining(), Ordering::Relaxed);
                let pos = self.data.position() as usize;
                let inner = self.data.get_ref();
                let avail = &inner[pos.min(inner.len())..];
                let n = avail.len().min(buf.remaining());
                buf.put_slice(&avail[..n]);
                self.data.set_position((pos + n) as u64);
                Poll::Ready(Ok(()))
            }
        }

        let size = 4 * 1024 * 1024;
        let buffer_size = 64 * 1024;
        let data: Vec<u8> = (0..size).map(|i| (i % 251) as u8).collect();
        let max_request = Arc::new(AtomicUsize::new(0));
        let mut reader = MaxReadRecorder {
            data: std::io::Cursor::new(data.clone()),
            max_request: Arc::clone(&max_request),
        };
        let out = read_into_vec_capped(&mut reader, size, buffer_size)
            .await
            .expect("capped read should succeed");
        assert_eq!(out.len(), size);
        assert_eq!(out, data);
        let observed = max_request.load(Ordering::Relaxed);
        assert!(
            observed <= buffer_size,
            "scratch read request {} exceeded cap {} — allocation is scaling with block size",
            observed,
            buffer_size
        );
    }

    #[tokio::test]
    async fn read_into_vec_capped_handles_buffer_sizes_that_do_not_divide_block() {
        let data: Vec<u8> = (0..10_000u32).map(|i| (i % 253) as u8).collect();
        for buffer_size in [0usize, 1, 3000, 9_999, 10_000, 50_000] {
            let mut reader = std::io::Cursor::new(data.clone());
            let out = read_into_vec_capped(&mut reader, data.len(), buffer_size)
                .await
                .expect("capped read should succeed");
            assert_eq!(out, data, "buffer_size={}", buffer_size);
        }
    }

    #[tokio::test]
    async fn read_into_vec_capped_reports_short_input() {
        let mut reader = std::io::Cursor::new(vec![1u8, 2, 3]);
        let err = read_into_vec_capped(&mut reader, 10, 4)
            .await
            .expect_err("reading past the end must fail");
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[tokio::test]
    async fn read_into_vec_capped_zero_size_reads_nothing() {
        let mut reader = std::io::Cursor::new(vec![9u8; 4]);
        let out = read_into_vec_capped(&mut reader, 0, 0)
            .await
            .expect("zero-size read should succeed");
        assert!(out.is_empty());
        assert_eq!(reader.position(), 0);
    }

    #[tokio::test]
    async fn uncompressed_data_block_streams_large_block_byte_identical() {
        let temp_file = temp_path("issue_592_uncompressed_large.bin");
        let size = 256 * 1024;
        let test_data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
        tokio::fs::write(&temp_file, &test_data).await.unwrap();
        let file = tokio::fs::File::open(&temp_file).await.unwrap();
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        let config = SSTableReaderConfig {
            read_buffer_size: 8 * 1024,
            ..Default::default()
        };
        let data = read_uncompressed_data_block(&file, &config)
            .await
            .expect("read should succeed")
            .expect("non-empty block");
        assert_eq!(data.len(), size);
        assert_eq!(data, test_data);
        tokio::fs::remove_file(&temp_file).await.ok();
    }

    #[tokio::test]
    async fn test_read_with_real_sstable_data() {
        let datasets_root = match std::env::var("CQLITE_DATASETS_ROOT") {
            Ok(root) => PathBuf::from(root),
            Err(_) => {
                eprintln!("CQLITE_DATASETS_ROOT not set, skipping real data test");
                return;
            }
        };
        let simple_table_dir = datasets_root.join("sstables/test_basic");
        if !simple_table_dir.exists() {
            eprintln!("test_basic not found, skipping real data test");
            return;
        }
        let table_dir = std::fs::read_dir(&simple_table_dir)
            .ok()
            .and_then(|entries| {
                entries
                    .filter_map(|e| e.ok())
                    .find(|e| {
                        e.file_name()
                            .to_str()
                            .map(|n| n.starts_with("simple_table"))
                            .unwrap_or(false)
                    })
                    .map(|e| e.path())
            });
        let Some(table_path) = table_dir else {
            eprintln!("simple_table not found, skipping");
            return;
        };
        let data_file = std::fs::read_dir(&table_path).ok().and_then(|entries| {
            entries
                .filter_map(|e| e.ok())
                .find(|e| {
                    e.file_name()
                        .to_str()
                        .map(|n| n.ends_with("-Data.db"))
                        .unwrap_or(false)
                })
                .map(|e| e.path())
        });
        let Some(data_path) = data_file else {
            eprintln!("Data.db not found, skipping");
            return;
        };
        let file = tokio::fs::File::open(&data_path).await.unwrap();
        let metadata = file.metadata().await.unwrap();
        eprintln!(
            "Opened real SSTable Data.db: {} ({} bytes)",
            data_path.display(),
            metadata.len()
        );
        let file = Arc::new(Mutex::new(BlockSource::buffered(file)));
        if metadata.len() > 100 {
            let result = read_block_direct(&file, 100).await;
            assert!(result.is_ok(), "Should read first 100 bytes of real file");
            let data = result.unwrap();
            assert_eq!(data.len(), 100);
            eprintln!("Successfully read first 100 bytes from real SSTable");
        }
    }
}