#[cfg(zstd_any)]
use crate::compression::CompressionProvider as _;
use super::meta::Metadata;
use crate::{
Checksum, CompressionType, KeyRange, SeqNo, TreeId, UserKey,
checksum::ChecksummedWriter,
fs::{Fs, FsFile, FsOpenOptions},
time::unix_timestamp,
vlog::BlobFileId,
};
use byteorder::{LittleEndian, WriteBytesExt};
use std::{
io::{BufWriter, Write},
path::{Path, PathBuf},
};
const MAX_DECOMPRESSION_SIZE: usize = 256 * 1024 * 1024;
fn check_size_cap(len: usize) -> crate::Result<()> {
if len > MAX_DECOMPRESSION_SIZE {
return Err(crate::Error::DecompressedSizeTooLarge {
declared: len as u64,
limit: MAX_DECOMPRESSION_SIZE as u64,
});
}
Ok(())
}
/// Magic bytes that open a v3 blob entry header.
pub const BLOB_HEADER_MAGIC_V3: &[u8] = b"BLOB";

/// Magic bytes that open a v4 blob entry header.
pub const BLOB_HEADER_MAGIC_V4: &[u8] = b"BLO4";

/// Byte length of a v3 blob entry header:
/// magic + checksum (u128) + seqno (u64) + key length (u16)
/// + real (uncompressed) value length (u32) + on-disk value length (u32).
pub const BLOB_HEADER_LEN_V3: usize = BLOB_HEADER_MAGIC_V3.len()
    + std::mem::size_of::<u128>() + std::mem::size_of::<u64>() + std::mem::size_of::<u16>() + std::mem::size_of::<u32>() + std::mem::size_of::<u32>();

/// Byte length of a v4 blob entry header: the v3 layout plus a u32 header CRC.
pub const BLOB_HEADER_LEN_V4: usize = BLOB_HEADER_LEN_V3 + std::mem::size_of::<u32>();
/// Computes the 4-byte header CRC over the little-endian encodings of the
/// header fields, hashed in the same order the fields are written to disk.
#[expect(
    clippy::cast_possible_truncation,
    reason = "intentionally truncated to 4-byte CRC"
)]
pub(super) fn compute_header_crc(
    seqno: u64,
    key_len: u16,
    real_val_len: u32,
    on_disk_val_len: u32,
) -> u32 {
    // Serialize all fields into one buffer and hash it in a single step;
    // xxh3 streaming is split-invariant, so this digest matches hashing
    // each field separately.
    let mut buf = [0u8; 8 + 2 + 4 + 4];
    buf[0..8].copy_from_slice(&seqno.to_le_bytes());
    buf[8..10].copy_from_slice(&key_len.to_le_bytes());
    buf[10..14].copy_from_slice(&real_val_len.to_le_bytes());
    buf[14..18].copy_from_slice(&on_disk_val_len.to_le_bytes());

    let mut hasher = xxhash_rust::xxh3::Xxh3::default();
    hasher.update(&buf);
    hasher.digest() as u32
}
pub(super) fn validate_header_crc(
seqno: u64,
key_len: u16,
real_val_len: u32,
on_disk_val_len: u32,
stored_crc: u32,
) -> crate::Result<()> {
let recomputed_crc = compute_header_crc(seqno, key_len, real_val_len, on_disk_val_len);
if stored_crc != recomputed_crc {
return Err(crate::Error::HeaderCrcMismatch {
recomputed: recomputed_crc,
stored: stored_crc,
});
}
Ok(())
}
/// Streaming writer for a single blob file.
pub struct Writer {
    /// ID of the tree this blob file belongs to.
    pub(crate) tree_id: TreeId,

    /// Path of the blob file being written.
    pub path: PathBuf,

    /// ID of this blob file.
    pub(crate) blob_file_id: BlobFileId,

    /// Underlying sfa archive writer over a checksummed, buffered file.
    #[expect(clippy::struct_field_names)]
    writer: sfa::Writer<ChecksummedWriter<BufWriter<Box<dyn FsFile>>>>,

    /// Bytes written to the data section so far (headers + keys + values).
    offset: u64,

    /// Number of blob entries written.
    pub(crate) item_count: u64,

    /// Total on-disk (possibly compressed) value bytes written.
    pub(crate) written_blob_bytes: u64,

    /// Total declared uncompressed value bytes.
    pub(crate) uncompressed_bytes: u64,

    /// First key written, if any (keys are expected in ascending order —
    /// TODO confirm; this writer only records first/last as seen).
    pub(crate) first_key: Option<UserKey>,

    /// Most recently written key, if any.
    pub(crate) last_key: Option<UserKey>,

    /// Compression applied to values (default: none).
    pub(crate) compression: CompressionType,
}
impl Writer {
    /// Creates a writer for a fresh blob file at `path`.
    ///
    /// The file is created exclusively (`create_new(true)`), so the call
    /// fails if a file already exists at `path`. Output is buffered,
    /// checksummed, and framed as an `sfa` archive whose "data" section
    /// is started immediately.
    #[doc(hidden)]
    pub fn new<P: AsRef<Path>>(
        path: P,
        blob_file_id: BlobFileId,
        tree_id: TreeId,
        fs: &dyn Fs,
    ) -> crate::Result<Self> {
        let path = path.as_ref();

        let file = fs.open(path, &FsOpenOptions::new().write(true).create_new(true))?;
        let writer = BufWriter::new(file);
        let writer = ChecksummedWriter::new(writer);
        let mut writer = sfa::Writer::from_writer(writer);

        // All blob entries go into the "data" section; the "meta" section
        // is appended later by `finish`.
        writer.start("data")?;

        Ok(Self {
            tree_id,
            path: path.into(),
            blob_file_id,
            writer,
            offset: 0,
            item_count: 0,
            written_blob_bytes: 0,
            uncompressed_bytes: 0,
            first_key: None,
            last_key: None,
            compression: CompressionType::None,
        })
    }

    /// Sets the compression type used for subsequently written values.
    pub fn use_compression(mut self, compressor: CompressionType) -> Self {
        self.compression = compressor;
        self
    }

    /// Current write offset into the data section, in bytes.
    #[must_use]
    pub(crate) fn offset(&self) -> u64 {
        self.offset
    }

    /// ID of the blob file being written.
    #[must_use]
    pub(crate) fn blob_file_id(&self) -> BlobFileId {
        self.blob_file_id
    }

    /// Writes one blob entry (v4 header + key + value) to the data section.
    ///
    /// `value` is compressed here according to `self.compression`;
    /// `uncompressed_len` is the real (decompressed) value length recorded in
    /// the header, supplied by the caller (it may differ from `value.len()`,
    /// e.g. when re-writing an already known entry — TODO confirm callers).
    ///
    /// Both the declared uncompressed length and the on-disk length are
    /// bounded by `check_size_cap`.
    ///
    /// Returns the on-disk (post-compression) value length.
    ///
    /// # Panics
    ///
    /// Panics if the key is empty, the key is longer than `u16::MAX`, or
    /// the input value is longer than `u32::MAX`.
    pub(crate) fn write_raw(
        &mut self,
        key: &[u8],
        seqno: SeqNo,
        value: &[u8],
        uncompressed_len: u32,
    ) -> crate::Result<u32> {
        assert!(!key.is_empty());
        assert!(u16::try_from(key.len()).is_ok());
        assert!(u32::try_from(value.len()).is_ok());

        // Enforce the size cap on both the declared uncompressed size and
        // the raw input before doing any compression work.
        check_size_cap(uncompressed_len as usize)?;
        check_size_cap(value.len())?;

        let value = match &self.compression {
            CompressionType::None => std::borrow::Cow::Borrowed(value),

            #[cfg(feature = "lz4")]
            CompressionType::Lz4 => {
                let compressed = lz4_flex::compress(value);
                // Compression can expand incompressible data, so re-check.
                check_size_cap(compressed.len())?;
                std::borrow::Cow::Owned(compressed)
            }

            #[cfg(zstd_any)]
            CompressionType::Zstd(level) => {
                let compressed = crate::compression::ZstdBackend::compress(value, *level)?;
                check_size_cap(compressed.len())?;
                std::borrow::Cow::Owned(compressed)
            }

            #[cfg(zstd_any)]
            CompressionType::ZstdDict { .. } => {
                return Err(crate::Error::Io(std::io::Error::new(
                    std::io::ErrorKind::Unsupported,
                    "zstd dictionary compression is not supported for blob files",
                )));
            }
        };

        let compressed_len_u32 = u32::try_from(value.len())
            .map_err(|_| std::io::Error::other("compressed value length exceeds u32::MAX"))?;

        if self.first_key.is_none() {
            self.first_key = Some(key.into());
        }
        self.last_key = Some(key.into());

        self.uncompressed_bytes += u64::from(uncompressed_len);

        // CRC over the header fields (seqno, key len, both value lengths).
        #[expect(clippy::cast_possible_truncation, reason = "keys are u16 length max")]
        let header_crc = compute_header_crc(
            seqno,
            key.len() as u16,
            uncompressed_len,
            compressed_len_u32,
        );

        // Entry checksum covers the key, the on-disk value bytes, and the
        // header CRC (little-endian).
        let checksum = {
            let mut hasher = xxhash_rust::xxh3::Xxh3::default();
            hasher.update(key);
            hasher.update(&value);
            hasher.update(&header_crc.to_le_bytes());
            hasher.digest128()
        };

        // v4 header layout — keep in sync with BLOB_HEADER_LEN_V4:
        // magic, checksum (u128), seqno (u64), key len (u16),
        // uncompressed len (u32), on-disk len (u32), header CRC (u32).
        self.writer.write_all(BLOB_HEADER_MAGIC_V4)?;
        self.writer.write_u128::<LittleEndian>(checksum)?;
        self.writer.write_u64::<LittleEndian>(seqno)?;

        #[expect(clippy::cast_possible_truncation, reason = "keys are u16 length max")]
        self.writer.write_u16::<LittleEndian>(key.len() as u16)?;

        self.writer.write_u32::<LittleEndian>(uncompressed_len)?;
        self.writer.write_u32::<LittleEndian>(compressed_len_u32)?;
        self.writer.write_u32::<LittleEndian>(header_crc)?;
        self.writer.write_all(key)?;
        self.writer.write_all(&value)?;

        // Advance the offset by exactly what was written above.
        self.offset += BLOB_HEADER_LEN_V4 as u64;
        self.offset += key.len() as u64;
        self.offset += value.len() as u64;

        self.written_blob_bytes += value.len() as u64;
        self.item_count += 1;

        Ok(compressed_len_u32)
    }

    /// Writes one blob entry whose uncompressed length is simply
    /// `value.len()` (the common case; see `write_raw`).
    pub fn write(&mut self, key: &[u8], seqno: SeqNo, value: &[u8]) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 max")]
        self.write_raw(key, seqno, value, value.len() as u32)
    }

    /// Finalizes the blob file: writes the "meta" section, syncs the file,
    /// and returns the file's metadata together with its checksum.
    ///
    /// # Panics
    ///
    /// Panics if no item was ever written (the key range would be undefined).
    pub(crate) fn finish(mut self) -> crate::Result<(Metadata, Checksum)> {
        self.writer.start("meta")?;

        let metadata = Metadata {
            id: self.blob_file_id,
            version: 4,
            created_at: unix_timestamp().as_nanos(),
            item_count: self.item_count,
            total_compressed_bytes: self.written_blob_bytes,
            total_uncompressed_bytes: self.uncompressed_bytes,
            #[expect(clippy::expect_used, reason = "should have written at least 1 item")]
            key_range: KeyRange::new((
                self.first_key
                    .clone()
                    .expect("should have written at least 1 item"),
                self.last_key
                    .clone()
                    .expect("should have written at least 1 item"),
            )),
            compression: self.compression,
        };

        metadata.encode_into(&mut self.writer)?;

        // Unwrap the sfa writer (which finalizes the archive — TODO confirm
        // it flushes the BufWriter), then sync the underlying file handle.
        let mut checksum = self.writer.into_inner()?;
        FsFile::sync_all(&**checksum.inner_mut().get_mut())?;

        let checksum = checksum.checksum();

        Ok((metadata, checksum))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs::StdFs;

    /// Sets up a [`Writer`] over a fresh temporary blob file.
    ///
    /// The [`tempfile::TempDir`] is returned alongside the writer so the
    /// directory outlives it.
    fn setup() -> crate::Result<(tempfile::TempDir, Writer)> {
        let dir = tempfile::tempdir()?;
        let writer = Writer::new(dir.path().join("test.blob"), 0, 0, &StdFs)?;
        Ok((dir, writer))
    }

    #[test]
    fn blob_write_rejects_oversized_value() -> crate::Result<()> {
        let (_dir, mut writer) = setup()?;

        #[expect(
            clippy::cast_possible_truncation,
            reason = "MAX_DECOMPRESSION_SIZE fits in u32"
        )]
        let declared = MAX_DECOMPRESSION_SIZE as u32 + 1;

        // Declared uncompressed size is one over the cap -> must be rejected.
        let outcome = writer.write_raw(b"key", 0, b"small-on-disk", declared);
        assert!(
            matches!(outcome, Err(crate::Error::DecompressedSizeTooLarge { .. })),
            "expected DecompressedSizeTooLarge, got: {outcome:?}",
        );

        Ok(())
    }

    #[test]
    fn blob_write_accepts_max_size_value() -> crate::Result<()> {
        let (_dir, mut writer) = setup()?;

        #[expect(
            clippy::cast_possible_truncation,
            reason = "MAX_DECOMPRESSION_SIZE fits in u32"
        )]
        let declared = MAX_DECOMPRESSION_SIZE as u32;

        // Exactly at the cap -> accepted.
        let outcome = writer.write_raw(b"key", 0, b"small-on-disk", declared);
        assert!(outcome.is_ok(), "expected Ok, got: {outcome:?}");

        Ok(())
    }

    #[test]
    fn blob_write_rejects_oversized_value_none_compression() -> crate::Result<()> {
        let (_dir, mut writer) = setup()?;

        // With no compression, the raw value itself exceeds the cap.
        let big = vec![0u8; MAX_DECOMPRESSION_SIZE + 1];

        #[expect(
            clippy::cast_possible_truncation,
            reason = "MAX_DECOMPRESSION_SIZE fits in u32"
        )]
        let outcome = writer.write_raw(b"key", 0, &big, MAX_DECOMPRESSION_SIZE as u32);
        assert!(
            matches!(outcome, Err(crate::Error::DecompressedSizeTooLarge { .. })),
            "expected DecompressedSizeTooLarge, got: {outcome:?}",
        );

        Ok(())
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn blob_write_lz4_accepts_small_value() -> crate::Result<()> {
        let (_dir, writer) = setup()?;
        let mut writer = writer.use_compression(CompressionType::Lz4);

        let payload = b"hello world lz4 test data";

        #[expect(clippy::cast_possible_truncation, reason = "test value is 25 bytes")]
        let outcome = writer.write_raw(b"key", 0, payload, payload.len() as u32);
        assert!(outcome.is_ok(), "expected Ok, got: {outcome:?}");

        Ok(())
    }

    #[test]
    fn check_size_cap_rejects_over_limit() {
        let outcome = super::check_size_cap(MAX_DECOMPRESSION_SIZE + 1);
        assert!(
            matches!(outcome, Err(crate::Error::DecompressedSizeTooLarge { .. })),
            "expected DecompressedSizeTooLarge, got: {outcome:?}",
        );
    }

    #[test]
    fn check_size_cap_accepts_at_limit() {
        // Boundary and zero are both fine.
        assert!(super::check_size_cap(MAX_DECOMPRESSION_SIZE).is_ok());
        assert!(super::check_size_cap(0).is_ok());
    }

    #[test]
    #[cfg(zstd_any)]
    fn blob_write_zstd_dict_unsupported() -> crate::Result<()> {
        let (_dir, writer) = setup()?;

        let dict = crate::compression::ZstdDictionary::new(b"test dictionary");
        let compression = CompressionType::ZstdDict {
            level: 3,
            dict_id: dict.id(),
        };
        let mut writer = writer.use_compression(compression);

        let outcome = writer.write(b"key", 0, b"value");
        assert!(
            matches!(&outcome, Err(crate::Error::Io(e)) if e.kind() == std::io::ErrorKind::Unsupported),
            "expected Io(Unsupported), got: {outcome:?}",
        );

        Ok(())
    }
}