#[cfg(zstd_any)]
use crate::compression::CompressionProvider as _;
#[cfg(not(feature = "std"))]
use crate::io::{Cursor, Read};
use crate::io::{LittleEndian, ReadBytesExt};
use crate::{
BlobFile, Checksum, CompressionType, UserValue,
fs::FsFile,
vlog::{
ValueHandle,
blob_file::writer::{
BLOB_HEADER_LEN_V4, BLOB_HEADER_MAGIC_V3, BLOB_HEADER_MAGIC_V4, validate_header_crc,
},
},
};
#[cfg(feature = "std")]
use std::io::{Cursor, Read};
/// Size ceiling of 256 MiB, in bytes, applied while reading a blob.
///
/// `Reader::get` rejects a blob whose on-disk payload or whose declared
/// uncompressed length exceeds this value.
const MAX_DECOMPRESSION_SIZE: usize = 256 << 20;
/// Reads individual blobs out of one blob file.
///
/// Holds only borrows (all valid for `'a`); it owns neither the blob file
/// descriptor nor the file handle.
pub struct Reader<'a> {
/// Blob file being read; `get` consults its ID and compression setting.
blob_file: &'a BlobFile,
/// File handle that blob frames are read from.
file: &'a dyn FsFile,
/// Dictionary used by `get` for `CompressionType::ZstdDict` blobs.
/// `None` until one is supplied through `with_dict`.
#[cfg(zstd_any)]
zstd_dictionary: Option<&'a crate::compression::ZstdDictionary>,
}
impl<'a> Reader<'a> {
    /// Builds a reader that serves blobs of `blob_file` by reading from `file`.
    ///
    /// When zstd support is compiled in, the reader starts without a
    /// dictionary; attach one with `with_dict`.
    pub fn new(blob_file: &'a BlobFile, file: &'a dyn FsFile) -> Self {
        Self {
            blob_file,
            file,
            #[cfg(zstd_any)]
            zstd_dictionary: None,
        }
    }

    /// Returns this reader with its zstd dictionary replaced by `dict`.
    ///
    /// Passing `None` clears any previously attached dictionary.
    #[cfg(zstd_any)]
    #[must_use]
    pub fn with_dict(self, dict: Option<&'a crate::compression::ZstdDictionary>) -> Self {
        Self {
            zstd_dictionary: dict,
            ..self
        }
    }

    /// Reads, validates and (if needed) decompresses the blob that `vhandle`
    /// points at, returning the stored value.
    ///
    /// The frame is fetched with a single read and checked in this order:
    /// magic, header CRC (V4 frames only), header fields against `key` and
    /// `vhandle`, declared uncompressed length, on-disk key bytes, and the
    /// 128-bit checksum. Only then is the payload decompressed according to
    /// the blob file's compression setting.
    ///
    /// # Errors
    ///
    /// - `Error::InvalidHeader("Blob")` if `key` is longer than `u16::MAX`,
    ///   the magic is neither the V3 nor the V4 one, the header's key length
    ///   or on-disk length disagrees with `key` / `vhandle`, the on-disk key
    ///   differs from `key`, or an uncompressed payload's length differs from
    ///   the declared length.
    /// - `Error::DecompressedSizeTooLarge` if the on-disk size or the declared
    ///   uncompressed size exceeds `MAX_DECOMPRESSION_SIZE`.
    /// - `Error::ChecksumMismatch` if the recomputed checksum differs from the
    ///   one stored in the header.
    /// - `Error::ZstdDictMismatch` if a dictionary-compressed blob is read
    ///   without a dictionary, or with one whose ID differs.
    /// - `Error::Decompress` if decompression fails or produces a length other
    ///   than the declared one.
    /// - Whatever `validate_header_crc` and the underlying reads return.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions, panics if `vhandle` refers to a
    /// different blob file than this reader's.
    #[expect(
        clippy::too_many_lines,
        reason = "blob read/validation path is kept in one function so error handling and size checks stay co-located"
    )]
    pub fn get(&self, key: &'a [u8], vhandle: &'a ValueHandle) -> crate::Result<UserValue> {
        debug_assert_eq!(vhandle.blob_file_id, self.blob_file.id());

        // The header stores the key length as a u16, so a longer key can never match.
        let key_size = key.len();
        if key_size > usize::from(u16::MAX) {
            return Err(crate::Error::InvalidHeader("Blob"));
        }

        // The read is sized for the V4 header length regardless of which
        // version the frame turns out to be; the version is only known once
        // the magic has been read.
        let overhead = (BLOB_HEADER_LEN_V4 as u64) + (key_size as u64);
        let read_limit = (MAX_DECOMPRESSION_SIZE as u64) + overhead;
        let frame_size = u64::from(vhandle.on_disk_size) + overhead;
        if frame_size > read_limit {
            return Err(crate::Error::DecompressedSizeTooLarge {
                declared: frame_size,
                limit: read_limit,
            });
        }

        #[expect(
            clippy::cast_possible_truncation,
            reason = "bounded to MAX_DECOMPRESSION_SIZE + overhead by the check above"
        )]
        let frame_len = frame_size as usize;
        let frame = crate::file::read_exact(self.file, vhandle.offset, frame_len)?;
        let mut header = Cursor::new(&frame[..]);

        // The magic selects the frame version and, with it, the header length.
        let mut magic = [0u8; 4];
        header.read_exact(&mut magic)?;
        let (frame_is_v4, header_len) = if magic == BLOB_HEADER_MAGIC_V4 {
            (true, BLOB_HEADER_LEN_V4)
        } else if magic == BLOB_HEADER_MAGIC_V3 {
            (false, crate::vlog::blob_file::writer::BLOB_HEADER_LEN_V3)
        } else {
            return Err(crate::Error::InvalidHeader("Blob"));
        };

        // Fixed header fields, all little-endian, in on-disk order.
        let expected_checksum = header.read_u128::<LittleEndian>()?;
        let seqno = header.read_u64::<LittleEndian>()?;
        let key_len = header.read_u16::<LittleEndian>()?;
        let real_val_len = header.read_u32::<LittleEndian>()? as usize;
        let on_disk_val_len = header.read_u32::<LittleEndian>()?;

        // Only V4 frames carry a header CRC; V3 frames leave this as `None`.
        let stored_header_crc = if frame_is_v4 {
            Some(header.read_u32::<LittleEndian>()?)
        } else {
            None
        };
        if let Some(crc) = stored_header_crc {
            #[expect(
                clippy::cast_possible_truncation,
                reason = "real_val_len originates as u32, round-tripped through usize; lossless on supported targets"
            )]
            validate_header_crc(seqno, key_len, real_val_len as u32, on_disk_val_len, crc)?;
        }

        // The header must describe exactly the blob the caller asked for.
        if usize::from(key_len) != key_size || on_disk_val_len != vhandle.on_disk_size {
            return Err(crate::Error::InvalidHeader("Blob"));
        }

        // Refuse oversized declared lengths before any output buffer is allocated.
        if real_val_len > MAX_DECOMPRESSION_SIZE {
            return Err(crate::Error::DecompressedSizeTooLarge {
                declared: real_val_len as u64,
                limit: MAX_DECOMPRESSION_SIZE as u64,
            });
        }

        // The key bytes sit right after the header; the payload right after the key.
        let key_end = header_len + key_size;
        let on_disk_key = frame.slice(header_len..key_end);
        if on_disk_key != key {
            return Err(crate::Error::InvalidHeader("Blob"));
        }
        let raw_data = frame.slice(key_end..key_end + on_disk_val_len as usize);

        // The checksum covers the key, the on-disk payload and, for V4 frames,
        // the little-endian bytes of the header CRC.
        let mut hasher = xxhash_rust::xxh3::Xxh3::default();
        hasher.update(&on_disk_key);
        hasher.update(&raw_data);
        if let Some(hcrc) = stored_header_crc {
            hasher.update(&hcrc.to_le_bytes());
        }
        let checksum = hasher.digest128();

        if expected_checksum != checksum {
            log::error!(
                "Checksum mismatch for blob {vhandle:?}, got={checksum}, expected={expected_checksum}",
            );
            return Err(crate::Error::ChecksumMismatch {
                got: Checksum::from_raw(checksum),
                expected: Checksum::from_raw(expected_checksum),
            });
        }

        let compression = &self.blob_file.0.meta.compression;

        #[warn(clippy::match_single_binding)]
        let value = match compression {
            CompressionType::None => {
                // Uncompressed: the payload is the value, so the lengths must agree.
                if raw_data.len() != real_val_len {
                    return Err(crate::Error::InvalidHeader("Blob"));
                }
                raw_data
            }
            #[cfg(feature = "lz4")]
            CompressionType::Lz4 => {
                let mut buf = vec![0u8; real_val_len];
                match lz4_flex::decompress_into(&raw_data, &mut buf) {
                    Ok(written) if written == real_val_len => UserValue::from(buf),
                    _ => return Err(crate::Error::Decompress(*compression)),
                }
            }
            #[cfg(zstd_any)]
            CompressionType::Zstd(_) => {
                match crate::compression::ZstdBackend::decompress(&raw_data, real_val_len) {
                    Ok(bytes) if bytes.len() == real_val_len => UserValue::from(bytes),
                    _ => return Err(crate::Error::Decompress(*compression)),
                }
            }
            #[cfg(zstd_any)]
            CompressionType::ZstdDict { dict_id, .. } => {
                // A dictionary must be attached, and it must be the one the blob file names.
                let Some(dict) = self.zstd_dictionary else {
                    return Err(crate::Error::ZstdDictMismatch {
                        expected: *dict_id,
                        got: None,
                    });
                };
                let loaded_id = dict.id();
                if loaded_id != *dict_id {
                    return Err(crate::Error::ZstdDictMismatch {
                        expected: *dict_id,
                        got: Some(loaded_id),
                    });
                }

                let outcome = crate::compression::ZstdBackend::decompress_with_dict(
                    &raw_data,
                    dict,
                    real_val_len,
                );
                match outcome {
                    Ok(bytes) if bytes.len() == real_val_len => UserValue::from(bytes),
                    _ => return Err(crate::Error::Decompress(*compression)),
                }
            }
        };

        debug_assert_eq!(real_val_len, value.len());
        Ok(value)
    }
}
// Test module, compiled only under `cfg(test)`; its body lives in a separate file.
#[cfg(test)]
#[expect(clippy::unwrap_used, clippy::indexing_slicing, reason = "test code")]
mod tests;