#[cfg(zstd_any)]
use crate::compression::CompressionProvider as _;
use crate::{
BlobFile, Checksum, CompressionType, UserValue,
fs::FsFile,
vlog::{
ValueHandle,
blob_file::writer::{
BLOB_HEADER_LEN_V4, BLOB_HEADER_MAGIC_V3, BLOB_HEADER_MAGIC_V4, validate_header_crc,
},
},
};
use byteorder::{LittleEndian, ReadBytesExt};
use std::io::{Cursor, Read};
/// Hard upper bound (256 MiB) on the decompressed size of a single blob value.
/// Declared lengths beyond this are rejected before any allocation happens.
const MAX_DECOMPRESSION_SIZE: usize = 256 * 1024 * 1024;
/// Reads single blobs out of a blob file, validating headers and checksums.
pub struct Reader<'a> {
// Descriptor of the blob file being read (metadata, compression type).
blob_file: &'a BlobFile,
// Open handle used for the actual positional reads.
file: &'a dyn FsFile,
}
impl<'a> Reader<'a> {
    /// Creates a reader over `blob_file`, performing I/O through `file`.
    pub fn new(blob_file: &'a BlobFile, file: &'a dyn FsFile) -> Self {
        Self { blob_file, file }
    }

    /// Reads and validates the blob addressed by `vhandle`, returning the
    /// (decompressed) user value.
    ///
    /// Validation performed, in order:
    /// 1. Size sanity: the total read may not exceed `MAX_DECOMPRESSION_SIZE`
    ///    plus the fixed header + key overhead.
    /// 2. Frame magic (accepts both V3 and V4 frames).
    /// 3. V4 only: header CRC over (seqno, key_len, real_val_len,
    ///    on_disk_val_len).
    /// 4. Cross-check of the header's key length and on-disk value length
    ///    against the caller-supplied key and handle.
    /// 5. The on-disk key must equal the caller's key.
    /// 6. XXH3-128 content checksum over key + raw value (+ header CRC for V4).
    /// 7. Decompression according to the blob file's compression type, with the
    ///    output length checked against the declared `real_val_len`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidHeader`, `HeaderCrcMismatch`, `ChecksumMismatch`,
    /// `DecompressedSizeTooLarge`, `Decompress`, or an I/O error.
    #[expect(
        clippy::too_many_lines,
        reason = "blob read/validation path is kept in one function so error handling and size checks stay co-located"
    )]
    pub fn get(&self, key: &'a [u8], vhandle: &'a ValueHandle) -> crate::Result<UserValue> {
        debug_assert_eq!(vhandle.blob_file_id, self.blob_file.id());

        // Key lengths are stored as u16 on disk; anything longer cannot match.
        if key.len() > u16::MAX as usize {
            return Err(crate::Error::InvalidHeader("Blob"));
        }

        // Upper-bound the read *before* touching the file so a corrupted
        // handle cannot trigger a huge allocation.
        let add_size = (BLOB_HEADER_LEN_V4 as u64) + (key.len() as u64);
        let max_total_read_size = (MAX_DECOMPRESSION_SIZE as u64).saturating_add(add_size);
        let total_read_size = u64::from(vhandle.on_disk_size) + add_size;

        if total_read_size > max_total_read_size {
            return Err(crate::Error::DecompressedSizeTooLarge {
                declared: total_read_size,
                limit: max_total_read_size,
            });
        }

        #[expect(
            clippy::cast_possible_truncation,
            reason = "bounded to MAX_DECOMPRESSION_SIZE + overhead by the check above"
        )]
        let read_len = total_read_size as usize;

        // Single read covering header + key + value payload. For V3 frames
        // this reads a few extra trailing bytes (the V4 header is larger),
        // which is harmless: the slicing below uses the per-version header
        // length.
        let value = crate::file::read_exact(self.file, vhandle.offset, read_len)?;

        let mut reader = Cursor::new(&value[..]);

        let mut magic = [0u8; 4];
        reader.read_exact(&mut magic)?;

        let frame_is_v4 = magic == BLOB_HEADER_MAGIC_V4;
        if !frame_is_v4 && magic != BLOB_HEADER_MAGIC_V3 {
            return Err(crate::Error::InvalidHeader("Blob"));
        }

        let expected_checksum = reader.read_u128::<LittleEndian>()?;
        let seqno = reader.read_u64::<LittleEndian>()?;
        let key_len = reader.read_u16::<LittleEndian>()?;
        let real_val_len = reader.read_u32::<LittleEndian>()? as usize;
        let on_disk_val_len = reader.read_u32::<LittleEndian>()?;

        // V4 frames carry a CRC over the header fields, so corrupted length
        // fields are caught before they can drive bogus slicing below.
        let stored_header_crc = if frame_is_v4 {
            let crc = reader.read_u32::<LittleEndian>()?;

            #[expect(
                clippy::cast_possible_truncation,
                reason = "real_val_len originates as u32, round-tripped through usize; lossless on supported targets"
            )]
            validate_header_crc(seqno, key_len, real_val_len as u32, on_disk_val_len, crc)?;

            Some(crc)
        } else {
            // V3 frames have no header CRC; `seqno` is only validated for V4.
            None
        };

        // Cross-check the header against what the caller expects.
        if key_len as usize != key.len() || on_disk_val_len != vhandle.on_disk_size {
            return Err(crate::Error::InvalidHeader("Blob"));
        }

        if real_val_len > MAX_DECOMPRESSION_SIZE {
            return Err(crate::Error::DecompressedSizeTooLarge {
                declared: real_val_len as u64,
                limit: MAX_DECOMPRESSION_SIZE as u64,
            });
        }

        let header_len = if frame_is_v4 {
            BLOB_HEADER_LEN_V4
        } else {
            crate::vlog::blob_file::writer::BLOB_HEADER_LEN_V3
        };

        let on_disk_key = value.slice(header_len..header_len + key_len as usize);
        if on_disk_key != key {
            return Err(crate::Error::InvalidHeader("Blob"));
        }

        let data_offset = header_len + key.len();
        let raw_data = value.slice(data_offset..data_offset + on_disk_val_len as usize);

        // Verify the content checksum before attempting decompression.
        {
            let checksum = {
                let mut hasher = xxhash_rust::xxh3::Xxh3::default();
                hasher.update(&on_disk_key);
                hasher.update(&raw_data);

                // V4 folds the header CRC into the content checksum as well.
                if let Some(hcrc) = stored_header_crc {
                    hasher.update(&hcrc.to_le_bytes());
                }

                hasher.digest128()
            };

            if expected_checksum != checksum {
                log::error!(
                    "Checksum mismatch for blob {vhandle:?}, got={checksum}, expected={expected_checksum}",
                );

                return Err(crate::Error::ChecksumMismatch {
                    got: Checksum::from_raw(checksum),
                    expected: Checksum::from_raw(expected_checksum),
                });
            }
        }

        let value = match &self.blob_file.0.meta.compression {
            CompressionType::None => {
                // Uncompressed: the stored and real lengths must agree.
                if real_val_len != raw_data.len() {
                    return Err(crate::Error::InvalidHeader("Blob"));
                }
                raw_data
            }

            #[cfg(feature = "lz4")]
            CompressionType::Lz4 => {
                let mut buf = vec![0u8; real_val_len];

                let bytes_written = lz4_flex::decompress_into(&raw_data, &mut buf)
                    .map_err(|_| crate::Error::Decompress(self.blob_file.0.meta.compression))?;

                if bytes_written != real_val_len {
                    return Err(crate::Error::Decompress(self.blob_file.0.meta.compression));
                }

                UserValue::from(buf)
            }

            #[cfg(zstd_any)]
            CompressionType::Zstd(_) => {
                let decompressed =
                    crate::compression::ZstdBackend::decompress(&raw_data, real_val_len)
                        .map_err(|_| crate::Error::Decompress(self.blob_file.0.meta.compression))?;

                if decompressed.len() != real_val_len {
                    return Err(crate::Error::Decompress(self.blob_file.0.meta.compression));
                }

                UserValue::from(decompressed)
            }

            #[cfg(zstd_any)]
            CompressionType::ZstdDict { .. } => {
                return Err(crate::Error::Io(std::io::Error::new(
                    std::io::ErrorKind::Unsupported,
                    "zstd dictionary compression is not supported for blob files",
                )));
            }
        };

        debug_assert_eq!(real_val_len, value.len());

        Ok(value)
    }
}
#[cfg(test)]
#[expect(clippy::unwrap_used, clippy::indexing_slicing, reason = "test code")]
mod tests {
    use super::*;
    use crate::SequenceNumberCounter;
    use crate::fs::StdFs;
    use crate::vlog::blob_file::writer::BLOB_HEADER_LEN_V3;
    use std::fs::File;
    use std::sync::Arc;
    use test_log::test;

    // Header layout (V4): magic(4) + checksum(16) + seqno(8) + key_len(2)
    //                     + real_val_len(4) + on_disk_val_len(4) + header_crc(4)

    // Write one uncompressed blob and read it back.
    #[test]
    fn blob_reader_roundtrip() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX);

        let handle = writer.write(b"a", 0, b"abcdef")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        assert_eq!(reader.get(b"a", &handle)?, b"abcdef");

        Ok(())
    }

    // Write two LZ4-compressed blobs and read both back.
    #[test]
    #[cfg(feature = "lz4")]
    fn blob_reader_roundtrip_lz4() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX)
        .use_compression(CompressionType::Lz4);

        let handle0 = writer.write(b"a", 0, b"abcdef")?;
        let handle1 = writer.write(b"b", 0, b"ghi")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        assert_eq!(reader.get(b"a", &handle0)?, b"abcdef");
        assert_eq!(reader.get(b"b", &handle1)?, b"ghi");

        Ok(())
    }

    // A real_val_len of u32::MAX must be rejected; the header CRC catches the
    // corruption before the length is ever trusted.
    #[test]
    #[cfg(feature = "lz4")]
    fn blob_reader_reject_absurd_real_val_len() {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir().unwrap();

        let mut writer =
            crate::vlog::BlobFileWriter::new(id_generator, folder.path(), 0, None, Arc::new(StdFs))
                .unwrap()
                .use_target_size(u64::MAX)
                .use_compression(CompressionType::Lz4);

        let handle = writer.write_raw(b"k", 0, b"value", 5).unwrap();

        let blob_file = writer.finish().unwrap();
        let blob_file = blob_file.first().unwrap();

        // real_val_len sits at magic(4) + checksum(16) + seqno(8) + key_len(2) = 30.
        let mut raw = std::fs::read(&blob_file.0.path).unwrap();
        let real_val_len_offset = usize::try_from(handle.offset).unwrap() + 30;
        raw[real_val_len_offset..real_val_len_offset + 4].copy_from_slice(&u32::MAX.to_le_bytes());
        std::fs::write(&blob_file.0.path, &raw).unwrap();

        let file = File::open(&blob_file.0.path).unwrap();
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"k", &handle);
        assert!(
            matches!(result, Err(crate::Error::HeaderCrcMismatch { .. })),
            "expected HeaderCrcMismatch, got: {result:?}",
        );
    }

    // A declared real_val_len of 0 with a non-empty payload must fail
    // decompression rather than silently return an empty value.
    #[test]
    #[cfg(feature = "lz4")]
    fn blob_reader_zero_real_val_len_with_data_fails_decompress() {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir().unwrap();

        let mut writer =
            crate::vlog::BlobFileWriter::new(id_generator, folder.path(), 0, None, Arc::new(StdFs))
                .unwrap()
                .use_target_size(u64::MAX)
                .use_compression(CompressionType::Lz4);

        let handle = writer.write_raw(b"k", 0, b"value", 0).unwrap();

        let blob_file = writer.finish().unwrap();
        let blob_file = blob_file.first().unwrap();

        let file = File::open(&blob_file.0.path).unwrap();
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"k", &handle);
        assert!(
            matches!(result, Err(crate::Error::Decompress(_))),
            "expected Decompress error, got: {result:?}",
        );
    }

    // Tampering with real_val_len on an LZ4 frame trips the header CRC.
    #[test]
    #[cfg(feature = "lz4")]
    fn blob_reader_lz4_corrupted_real_val_len_triggers_header_crc_mismatch() -> crate::Result<()> {
        use byteorder::WriteBytesExt;

        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX)
        .use_compression(CompressionType::Lz4);

        let handle = writer.write(b"a", 0, b"abcdef")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        // magic(4) + checksum(16) + seqno(8) + key_len(2)
        let real_val_len_offset = handle.offset + 4 + 16 + 8 + 2;

        {
            use std::io::{Seek, Write};

            let mut file = std::fs::OpenOptions::new()
                .write(true)
                .open(&blob_file.0.path)?;
            file.seek(std::io::SeekFrom::Start(real_val_len_offset))?;
            file.write_u32::<LittleEndian>(u32::try_from(b"abcdef".len()).unwrap() + 1)?;
            file.flush()?;
        }

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        match reader.get(b"a", &handle) {
            Err(crate::Error::HeaderCrcMismatch { .. }) => {}
            Ok(_) => panic!("expected HeaderCrcMismatch, but got Ok"),
            Err(other) => panic!("expected HeaderCrcMismatch, got: {other:?}"),
        }

        Ok(())
    }

    // A handle claiming an absurd on-disk size must be rejected before reading.
    #[test]
    fn blob_reader_reject_oversized_on_disk_size() {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir().unwrap();

        let mut writer =
            crate::vlog::BlobFileWriter::new(id_generator, folder.path(), 0, None, Arc::new(StdFs))
                .unwrap()
                .use_target_size(u64::MAX);

        let mut handle = writer.write(b"a", 0, b"hello").unwrap();

        let blob_file = writer.finish().unwrap();
        let blob_file = blob_file.first().unwrap();

        handle.on_disk_size = u32::MAX;

        let file = File::open(&blob_file.0.path).unwrap();
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"a", &handle);
        assert!(
            matches!(result, Err(crate::Error::DecompressedSizeTooLarge { .. })),
            "expected DecompressedSizeTooLarge, got: {result:?}",
        );
    }

    // Tampering with real_val_len on a zstd frame trips the header CRC.
    #[test]
    #[cfg(zstd_any)]
    fn blob_reader_zstd_corrupted_real_val_len_triggers_header_crc_mismatch() -> crate::Result<()> {
        use byteorder::WriteBytesExt;

        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX)
        .use_compression(CompressionType::Zstd(3));

        let handle = writer.write(b"a", 0, b"abcdef")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        // magic(4) + checksum(16) + seqno(8) + key_len(2)
        let real_val_len_offset = handle.offset + 4 + 16 + 8 + 2;

        {
            use std::io::{Seek, Write};

            let mut file = std::fs::OpenOptions::new()
                .write(true)
                .open(&blob_file.0.path)?;
            file.seek(std::io::SeekFrom::Start(real_val_len_offset))?;
            file.write_u32::<LittleEndian>(u32::try_from(b"abcdef".len()).unwrap() + 1)?;
            file.flush()?;
        }

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        match reader.get(b"a", &handle) {
            Err(crate::Error::HeaderCrcMismatch { .. }) => {}
            Ok(_) => panic!("expected HeaderCrcMismatch, but got Ok"),
            Err(other) => panic!("expected HeaderCrcMismatch, got: {other:?}"),
        }

        Ok(())
    }

    // Writing a real_val_len above MAX_DECOMPRESSION_SIZE is caught by the
    // header CRC before the size limit is even consulted.
    #[test]
    fn blob_reader_rejects_oversized_real_val_len() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX);

        let handle = writer.write(b"a", 0, b"abcdef")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let mut raw = std::fs::read(&blob_file.0.path)?;

        // magic(4) + checksum(16) + seqno(8) + key_len(2)
        let real_val_len_offset = usize::try_from(handle.offset).unwrap() + 4 + 16 + 8 + 2;
        let oversize = u32::try_from(MAX_DECOMPRESSION_SIZE).unwrap() + 1;
        raw[real_val_len_offset..real_val_len_offset + 4].copy_from_slice(&oversize.to_le_bytes());
        std::fs::write(&blob_file.0.path, &raw)?;

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"a", &handle);
        assert!(
            matches!(result, Err(crate::Error::HeaderCrcMismatch { .. })),
            "expected HeaderCrcMismatch, got: {result:?}",
        );

        Ok(())
    }

    // Write two zstd-compressed blobs and read both back.
    #[test]
    #[cfg(zstd_any)]
    fn blob_reader_roundtrip_zstd() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX)
        .use_compression(CompressionType::Zstd(3));

        let handle0 = writer.write(b"a", 0, b"abcdef")?;
        let handle1 = writer.write(b"b", 0, b"ghi")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        assert_eq!(reader.get(b"a", &handle0)?, b"abcdef");
        assert_eq!(reader.get(b"b", &handle1)?, b"ghi");

        Ok(())
    }

    // Corrupting the on-disk key is caught twice: once by the key cross-check
    // (original key no longer matches) and once by the content checksum (when
    // queried with the corrupted key itself).
    #[test]
    fn blob_reader_corrupted_on_disk_key_detected_by_cross_check_and_checksum() -> crate::Result<()>
    {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX);

        let handle = writer.write(b"abc", 0, b"value")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let key_offset = usize::try_from(handle.offset).unwrap() + BLOB_HEADER_LEN_V4;

        let mut raw = std::fs::read(&blob_file.0.path)?;
        raw[key_offset] ^= 0xFF;
        let corrupted_key = raw[key_offset..key_offset + 3].to_vec();
        std::fs::write(&blob_file.0.path, &raw)?;

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"abc", &handle);
        assert!(
            matches!(result, Err(crate::Error::InvalidHeader("Blob"))),
            "expected InvalidHeader(Blob) from key cross-check, got: {result:?}",
        );

        let result = reader.get(&corrupted_key, &handle);
        assert!(
            matches!(result, Err(crate::Error::ChecksumMismatch { .. })),
            "expected ChecksumMismatch for tampered on-disk key, got: {result:?}",
        );

        Ok(())
    }

    // Asking for the wrong key (same length as the stored one) must fail the
    // on-disk key comparison.
    #[test]
    fn blob_reader_wrong_caller_key_same_length_returns_invalid_header() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX);

        let handle = writer.write(b"aaa", 0, b"value")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        assert_eq!(reader.get(b"aaa", &handle)?, b"value");

        let result = reader.get(b"bbb", &handle);
        assert!(
            matches!(result, Err(crate::Error::InvalidHeader("Blob"))),
            "expected InvalidHeader(Blob) for wrong caller key, got: {result:?}",
        );

        Ok(())
    }

    // Asking for a shorter or longer key must fail the key-length cross-check.
    #[test]
    fn blob_reader_wrong_caller_key_different_length_returns_invalid_header() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX);

        let handle = writer.write(b"abc", 0, b"value")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"ab", &handle);
        assert!(
            matches!(result, Err(crate::Error::InvalidHeader("Blob"))),
            "expected InvalidHeader for shorter key, got: {result:?}",
        );

        let result = reader.get(b"abcd", &handle);
        assert!(
            matches!(result, Err(crate::Error::InvalidHeader("Blob"))),
            "expected InvalidHeader for longer key, got: {result:?}",
        );

        Ok(())
    }

    // Flipping a byte in the value payload must trip the content checksum.
    #[test]
    fn blob_reader_corrupted_value_payload_triggers_checksum_mismatch() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX);

        let handle = writer.write(b"key", 0, b"payload_data")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let payload_offset =
            usize::try_from(handle.offset).unwrap() + BLOB_HEADER_LEN_V4 + b"key".len();

        let mut raw = std::fs::read(&blob_file.0.path)?;
        raw[payload_offset] ^= 0xFF;
        std::fs::write(&blob_file.0.path, &raw)?;

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"key", &handle);
        assert!(
            matches!(result, Err(crate::Error::ChecksumMismatch { .. })),
            "expected ChecksumMismatch for corrupted value, got: {result:?}",
        );

        Ok(())
    }

    // Corrupted on-disk key on an LZ4 frame fails the key cross-check.
    #[test]
    #[cfg(feature = "lz4")]
    fn blob_reader_corrupted_on_disk_key_lz4_returns_invalid_header() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX)
        .use_compression(CompressionType::Lz4);

        let handle = writer.write(b"abc", 0, b"value")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let key_offset = usize::try_from(handle.offset).unwrap() + BLOB_HEADER_LEN_V4;

        let mut raw = std::fs::read(&blob_file.0.path)?;
        raw[key_offset] ^= 0xFF;
        std::fs::write(&blob_file.0.path, &raw)?;

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"abc", &handle);
        assert!(
            matches!(result, Err(crate::Error::InvalidHeader("Blob"))),
            "expected InvalidHeader for corrupted lz4 key, got: {result:?}",
        );

        Ok(())
    }

    // Corrupted on-disk key on a zstd frame fails the key cross-check.
    #[test]
    #[cfg(zstd_any)]
    fn blob_reader_corrupted_on_disk_key_zstd_returns_invalid_header() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX)
        .use_compression(CompressionType::Zstd(3));

        let handle = writer.write(b"abc", 0, b"value")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        let key_offset = usize::try_from(handle.offset).unwrap() + BLOB_HEADER_LEN_V4;

        let mut raw = std::fs::read(&blob_file.0.path)?;
        raw[key_offset] ^= 0xFF;
        std::fs::write(&blob_file.0.path, &raw)?;

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"abc", &handle);
        assert!(
            matches!(result, Err(crate::Error::InvalidHeader("Blob"))),
            "expected InvalidHeader for corrupted zstd key, got: {result:?}",
        );

        Ok(())
    }

    // Tampering with the seqno field in a V4 header trips the header CRC.
    #[test]
    fn blob_reader_v4_corrupted_seqno_detected_by_header_crc() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX);

        let handle = writer.write(b"key", 42, b"value")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        // seqno sits at magic(4) + checksum(16) = 20.
        let seqno_offset = usize::try_from(handle.offset).unwrap() + 20;

        let mut raw = std::fs::read(&blob_file.0.path)?;
        raw[seqno_offset..seqno_offset + 8].copy_from_slice(&99u64.to_le_bytes());
        std::fs::write(&blob_file.0.path, &raw)?;

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"key", &handle);
        assert!(
            matches!(result, Err(crate::Error::HeaderCrcMismatch { .. })),
            "expected HeaderCrcMismatch for corrupted seqno, got: {result:?}",
        );

        Ok(())
    }

    // Corrupting the stored header-CRC field itself must also be detected.
    #[test]
    fn blob_reader_v4_corrupted_header_crc_field_detected() -> crate::Result<()> {
        let id_generator = SequenceNumberCounter::default();
        let folder = tempfile::tempdir()?;

        let mut writer = crate::vlog::BlobFileWriter::new(
            id_generator,
            folder.path(),
            0,
            None,
            Arc::new(StdFs),
        )?
        .use_target_size(u64::MAX);

        let handle = writer.write(b"key", 0, b"value")?;

        let blob_file = writer.finish()?;
        let blob_file = blob_file.first().unwrap();

        // magic(4) + checksum(16) + seqno(8) + key_len(2) + real_val_len(4) + on_disk_val_len(4)
        let header_crc_offset = usize::try_from(handle.offset).unwrap() + 4 + 16 + 8 + 2 + 4 + 4;

        let mut raw = std::fs::read(&blob_file.0.path)?;
        raw[header_crc_offset] ^= 0xFF;
        std::fs::write(&blob_file.0.path, &raw)?;

        let file = File::open(&blob_file.0.path)?;
        let reader = Reader::new(blob_file, &file);

        let result = reader.get(b"key", &handle);
        assert!(
            matches!(result, Err(crate::Error::HeaderCrcMismatch { .. })),
            "expected HeaderCrcMismatch for corrupted header_crc field, got: {result:?}",
        );

        Ok(())
    }

    // Pin down the on-disk header sizes the offset math above depends on.
    #[test]
    fn blob_header_len_v4_is_42() {
        assert_eq!(BLOB_HEADER_LEN_V4, 42);
        assert_eq!(BLOB_HEADER_LEN_V3, 38);
    }

    // Hand-craft a V3 frame (no header CRC) and confirm the reader still
    // accepts it for backward compatibility.
    #[test]
    fn blob_reader_v3_backward_compat_roundtrip() -> crate::Result<()> {
        use crate::file_accessor::FileAccessor;
        use crate::vlog::{ValueHandle, blob_file::Inner as BlobFileInner};
        use byteorder::WriteBytesExt;
        use std::io::Write;
        use std::sync::{Arc, atomic::AtomicBool};

        let folder = tempfile::tempdir()?;
        let blob_file_path = folder.path().join("0");

        let key = b"abc";
        let value = b"hello_v3";

        // V3 content checksum covers key + value only (no header CRC).
        let checksum = {
            let mut hasher = xxhash_rust::xxh3::Xxh3::default();
            hasher.update(key);
            hasher.update(value);
            hasher.digest128()
        };

        {
            let file = std::fs::File::create(&blob_file_path)?;
            let mut sfa_writer = sfa::Writer::from_writer(file);
            sfa_writer.start("data")?;

            sfa_writer.write_all(b"BLOB")?;
            sfa_writer.write_u128::<byteorder::LittleEndian>(checksum)?;
            sfa_writer.write_u64::<byteorder::LittleEndian>(42)?;

            #[expect(
                clippy::cast_possible_truncation,
                reason = "test key length fits in u16"
            )]
            sfa_writer.write_u16::<byteorder::LittleEndian>(key.len() as u16)?;

            #[expect(
                clippy::cast_possible_truncation,
                reason = "test value length fits in u32"
            )]
            sfa_writer.write_u32::<byteorder::LittleEndian>(value.len() as u32)?;

            #[expect(
                clippy::cast_possible_truncation,
                reason = "test value length fits in u32"
            )]
            sfa_writer.write_u32::<byteorder::LittleEndian>(value.len() as u32)?;

            sfa_writer.write_all(key)?;
            sfa_writer.write_all(value)?;

            sfa_writer.start("meta")?;
            let metadata = crate::vlog::blob_file::meta::Metadata {
                id: 0,
                version: 3,
                created_at: 0,
                item_count: 1,
                total_compressed_bytes: value.len() as u64,
                total_uncompressed_bytes: value.len() as u64,
                key_range: crate::KeyRange::new((key[..].into(), key[..].into())),
                compression: CompressionType::None,
            };
            metadata.encode_into(&mut sfa_writer)?;

            let inner = sfa_writer.into_inner()?;
            inner.sync_all()?;
        }

        let file = File::open(&blob_file_path)?;
        let file2 = File::open(&blob_file_path)?;

        let blob_file = crate::BlobFile(Arc::new(BlobFileInner {
            id: 0,
            tree_id: 0,
            path: blob_file_path,
            meta: crate::vlog::blob_file::meta::Metadata {
                id: 0,
                version: 3,
                created_at: 0,
                item_count: 1,
                total_compressed_bytes: value.len() as u64,
                total_uncompressed_bytes: value.len() as u64,
                key_range: crate::KeyRange::new((key[..].into(), key[..].into())),
                compression: CompressionType::None,
            },
            is_deleted: AtomicBool::new(false),
            checksum: crate::Checksum::from_raw(0),
            file_accessor: FileAccessor::File(Arc::new(file2)),
        }));

        let reader = Reader::new(&blob_file, &file);

        // Locate where the hand-written "data" section starts inside the file.
        let sfa_reader = sfa::Reader::new(&blob_file.0.path)?;
        let data_section = sfa_reader.toc().section(b"data").unwrap();
        let data_start = data_section.pos();

        let handle = ValueHandle {
            blob_file_id: 0,
            offset: data_start,
            #[expect(
                clippy::cast_possible_truncation,
                reason = "test value length fits in u32"
            )]
            on_disk_size: value.len() as u32,
        };

        let result = reader.get(key, &handle)?;
        assert_eq!(result, value);

        Ok(())
    }
}