use super::*;
use crate::{Slice, fs::StdFs, vlog::blob_file::writer::Writer as BlobFileWriter};
use tempfile::tempdir;
use test_log::test;
#[test]
fn blob_scanner() -> crate::Result<()> {
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
let keys = [b"a", b"b", b"c", b"d", b"e"];
{
let mut writer = BlobFileWriter::new(&blob_file_path, 0, 0, &StdFs)?;
for key in keys {
writer.write(key, 0, &key.repeat(100))?;
}
writer.finish()?;
}
{
let mut scanner = Scanner::new(&blob_file_path, &StdFs, 0)?;
for key in keys {
assert_eq!(
(Slice::from(key), Slice::from(key.repeat(100))),
scanner
.next()
.map(|result| result.map(|entry| { (entry.key, entry.value) }))
.unwrap()?,
);
}
assert!(scanner.next().is_none());
}
Ok(())
}
#[test]
fn blob_scanner_resume_reads_suffix_and_rejects_bad_offset() -> crate::Result<()> {
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
let keys = [b"a", b"b", b"c", b"d", b"e"];
{
let mut writer = BlobFileWriter::new(&blob_file_path, 0, 0, &StdFs)?;
for key in keys {
writer.write(key, 0, &key.repeat(100))?;
}
writer.finish()?;
}
let resume_at = {
let mut scanner = Scanner::new(&blob_file_path, &StdFs, 0)?;
let _a = scanner.next().unwrap()?;
let b = scanner.next().unwrap()?;
b.frame_end
};
{
let mut scanner = Scanner::resume(&blob_file_path, &StdFs, 0, resume_at)?;
for key in [b"c", b"d", b"e"] {
assert_eq!(
Slice::from(&key[..]),
scanner.next().map(|r| r.map(|e| e.key)).unwrap()?,
);
}
assert!(scanner.next().is_none());
}
assert!(
matches!(
Scanner::resume(&blob_file_path, &StdFs, 0, u64::MAX),
Err(crate::Error::InvalidHeader("BlobFile")),
),
"resume offset past the data section must error",
);
Ok(())
}
#[test]
fn blob_scanner_v4_corrupted_seqno_detected_by_header_crc() -> crate::Result<()> {
use crate::vlog::blob_file::writer::BLOB_HEADER_MAGIC_V4;
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
{
let mut writer = BlobFileWriter::new(&blob_file_path, 0, 0, &StdFs)?;
writer.write(b"key", 42, &b"v".repeat(100))?;
writer.finish()?;
}
let mut raw = std::fs::read(&blob_file_path)?;
let frame_start = 0usize;
let seqno_offset = frame_start + BLOB_HEADER_MAGIC_V4.len() + std::mem::size_of::<u128>();
let seqno_len = std::mem::size_of::<u64>();
raw[seqno_offset..seqno_offset + seqno_len].copy_from_slice(&99u64.to_le_bytes()[..seqno_len]);
std::fs::write(&blob_file_path, &raw)?;
let mut scanner = Scanner::new(&blob_file_path, &StdFs, 0)?;
let result = scanner.next().unwrap();
assert!(
matches!(result, Err(crate::Error::HeaderCrcMismatch { .. })),
"expected HeaderCrcMismatch for corrupted seqno, got: {result:?}",
);
Ok(())
}
#[test]
fn blob_scanner_corrupted_value_detected_by_data_checksum() -> crate::Result<()> {
use crate::vlog::blob_file::writer::BLOB_HEADER_LEN_V4;
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
{
let mut writer = BlobFileWriter::new(&blob_file_path, 0, 0, &StdFs)?;
writer.write(b"key", 0, &b"v".repeat(100))?;
writer.finish()?;
}
let mut raw = std::fs::read(&blob_file_path)?;
let frame_start = 0usize;
let key = b"key";
let value_offset = frame_start + BLOB_HEADER_LEN_V4 + key.len();
raw[value_offset] ^= 0xFF;
std::fs::write(&blob_file_path, &raw)?;
let mut scanner = Scanner::new(&blob_file_path, &StdFs, 0)?;
let result = scanner.next().unwrap();
assert!(
matches!(result, Err(crate::Error::ChecksumMismatch { .. })),
"expected ChecksumMismatch for corrupted value, got: {result:?}",
);
Ok(())
}
#[test]
fn blob_scanner_reads_v3_format() -> crate::Result<()> {
use crate::io::{LittleEndian, WriteBytesExt};
use std::io::Write;
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
let key = b"abc";
let value = b"hello_v3";
let checksum = {
let mut hasher = xxhash_rust::xxh3::Xxh3::default();
hasher.update(key);
hasher.update(value);
hasher.digest128()
};
{
let file = std::fs::File::create(&blob_file_path)?;
let mut sfa_writer = crate::sfa::Writer::from_writer(file);
sfa_writer.start("data")?;
sfa_writer.write_all(b"BLOB")?;
sfa_writer.write_u128::<LittleEndian>(checksum)?;
sfa_writer.write_u64::<LittleEndian>(42)?; #[expect(
clippy::cast_possible_truncation,
reason = "test key length fits in u16"
)]
sfa_writer.write_u16::<LittleEndian>(key.len() as u16)?;
#[expect(
clippy::cast_possible_truncation,
reason = "test value length fits in u32"
)]
sfa_writer.write_u32::<LittleEndian>(value.len() as u32)?; #[expect(
clippy::cast_possible_truncation,
reason = "test value length fits in u32"
)]
sfa_writer.write_u32::<LittleEndian>(value.len() as u32)?; sfa_writer.write_all(key)?;
sfa_writer.write_all(value)?;
sfa_writer.start("meta")?;
let metadata = crate::vlog::blob_file::meta::Metadata {
id: 0,
version: 3,
created_at: 0,
item_count: 1,
total_compressed_bytes: value.len() as u64,
total_uncompressed_bytes: value.len() as u64,
key_range: crate::KeyRange::new((key[..].into(), key[..].into())),
compression: crate::CompressionType::None,
};
metadata.encode_into(&mut sfa_writer)?;
let inner = sfa_writer.into_inner()?;
inner.sync_all()?;
}
let mut scanner = Scanner::new(&blob_file_path, &StdFs, 0)?;
let entry = scanner.next().unwrap()?;
assert_eq!(entry.key, Slice::from(&key[..]));
assert_eq!(entry.value, Slice::from(&value[..]));
assert_eq!(entry.seqno, 42);
assert!(scanner.next().is_none());
Ok(())
}
#[test]
fn blob_scanner_rejects_oversized_on_disk_len() -> crate::Result<()> {
use crate::io::{LittleEndian, WriteBytesExt};
use std::io::Write;
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
let key = b"abc";
let value = b"hi";
{
let file = std::fs::File::create(&blob_file_path)?;
let mut sfa_writer = crate::sfa::Writer::from_writer(file);
sfa_writer.start("data")?;
sfa_writer.write_all(b"BLOB")?;
sfa_writer.write_u128::<LittleEndian>(0)?; sfa_writer.write_u64::<LittleEndian>(1)?; #[expect(clippy::cast_possible_truncation, reason = "test key fits u16")]
sfa_writer.write_u16::<LittleEndian>(key.len() as u16)?;
sfa_writer.write_u32::<LittleEndian>(2)?; sfa_writer.write_u32::<LittleEndian>(u32::MAX)?;
sfa_writer.write_all(key)?;
sfa_writer.write_all(value)?;
sfa_writer.start("meta")?;
let metadata = crate::vlog::blob_file::meta::Metadata {
id: 0,
version: 3,
created_at: 0,
item_count: 1,
total_compressed_bytes: 2,
total_uncompressed_bytes: 2,
key_range: crate::KeyRange::new((key[..].into(), key[..].into())),
compression: crate::CompressionType::None,
};
metadata.encode_into(&mut sfa_writer)?;
sfa_writer.into_inner()?.sync_all()?;
}
let mut scanner = Scanner::new(&blob_file_path, &StdFs, 0)?;
let result = scanner.next().unwrap();
assert!(
matches!(result, Err(crate::Error::InvalidHeader("Blob"))),
"an oversized on_disk_val_len must be rejected, got: {result:?}",
);
Ok(())
}
#[test]
fn blob_scanner_rejects_invalid_magic() -> crate::Result<()> {
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
{
let mut writer = BlobFileWriter::new(&blob_file_path, 0, 0, &StdFs)?;
writer.write(b"key", 0, b"value")?;
writer.finish()?;
}
let mut raw = std::fs::read(&blob_file_path)?;
raw[0..4].copy_from_slice(b"XXXX");
std::fs::write(&blob_file_path, &raw)?;
let mut scanner = Scanner::new(&blob_file_path, &StdFs, 0)?;
let result = scanner.next().unwrap();
assert!(
matches!(result, Err(crate::Error::InvalidHeader("Blob"))),
"expected InvalidHeader for bad magic, got: {result:?}",
);
assert!(scanner.next().is_none());
Ok(())
}
#[test]
fn blob_scanner_meta_corruption_is_not_silent_eof() -> crate::Result<()> {
use crate::vlog::blob_file::writer::BLOB_HEADER_LEN_V4;
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
{
let mut writer = BlobFileWriter::new(&blob_file_path, 0, 0, &StdFs)?;
writer.write(b"a", 0, &b"v".repeat(50))?;
writer.write(b"b", 1, &b"w".repeat(50))?;
writer.finish()?;
}
let data_start = {
let sfa_reader = crate::sfa::Reader::new(&blob_file_path)?;
let section = sfa_reader.toc().section(b"data").unwrap();
#[expect(
clippy::cast_possible_truncation,
reason = "test blob file is tiny, pos fits in usize"
)]
{
section.pos() as usize
}
};
let mut raw = std::fs::read(&blob_file_path)?;
let second_frame_offset = data_start + BLOB_HEADER_LEN_V4 + 1 + 50;
raw.get_mut(second_frame_offset..second_frame_offset + 4)
.unwrap()
.copy_from_slice(b"META");
std::fs::write(&blob_file_path, &raw)?;
let mut scanner = Scanner::new(&blob_file_path, &StdFs, 0)?;
let first = scanner.next().unwrap();
assert!(first.is_ok(), "first frame should be OK: {first:?}");
let second = scanner.next().unwrap();
assert!(
matches!(second, Err(crate::Error::InvalidHeader("Blob"))),
"expected InvalidHeader for META-corrupted magic, got: {second:?}",
);
Ok(())
}
#[test]
fn blob_scanner_rejects_missing_data_section() -> crate::Result<()> {
use std::io::Write;
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
{
let file = std::fs::File::create(&blob_file_path)?;
let mut sfa_writer = crate::sfa::Writer::from_writer(file);
sfa_writer.start("meta")?;
sfa_writer.write_all(b"dummy")?;
sfa_writer.finish()?;
}
let result = Scanner::new(&blob_file_path, &StdFs, 0);
assert!(result.is_err(), "expected error for missing data section");
let err = result.err().unwrap();
assert!(
matches!(err, crate::Error::InvalidHeader("BlobFile")),
"expected InvalidHeader for missing data section, got: {err:?}",
);
Ok(())
}
#[test]
fn blob_scanner_rejects_data_section_offset_overflow() -> crate::Result<()> {
use crate::io::{LittleEndian, WriteBytesExt};
use std::io::Write;
let dir = tempdir()?;
let blob_file_path = dir.path().join("0");
{
let mut file = std::fs::File::create(&blob_file_path)?;
file.write_all(b"\x00")?;
let toc_pos: u64 = 1;
let mut toc_buf = Vec::new();
toc_buf.write_all(b"TOC!")?;
toc_buf.write_u32::<LittleEndian>(1)?; toc_buf.write_u64::<LittleEndian>(1)?; toc_buf.write_u64::<LittleEndian>(u64::MAX)?; toc_buf.write_u16::<LittleEndian>(4)?; toc_buf.write_all(b"data")?;
let toc_checksum = xxhash_rust::xxh3::xxh3_128(&toc_buf);
let toc_len = toc_buf.len() as u64;
file.write_all(&toc_buf)?;
file.write_all(b"SFA!")?;
file.write_u8(0x1)?; file.write_u8(0x0)?; file.write_u128::<LittleEndian>(toc_checksum)?;
file.write_u64::<LittleEndian>(toc_pos)?;
file.write_u64::<LittleEndian>(toc_len)?;
file.sync_all()?;
}
let result = Scanner::new(&blob_file_path, &StdFs, 0);
assert!(
result.is_err(),
"expected error for overflowing data section"
);
let err = result.err().unwrap();
assert!(
matches!(err, crate::Error::InvalidHeader("BlobFile")),
"expected InvalidHeader(\"BlobFile\") for overflow, got: {err:?}",
);
Ok(())
}