#[cfg(zstd_any)]
use crate::compression::CompressionProvider as _;
#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
use super::meta::Metadata;
use crate::io::BufWriter;
#[cfg(not(feature = "std"))]
use crate::io::Write;
use crate::io::{LittleEndian, WriteBytesExt};
use crate::path::{Path, PathBuf};
use crate::{
Checksum, CompressionType, KeyRange, SeqNo, TreeId, UserKey,
checksum::ChecksummedWriter,
fs::{Fs, FsFile, FsOpenOptions, SyncMode},
time::unix_timestamp,
vlog::BlobFileId,
};
#[cfg(feature = "std")]
use std::io::Write;
/// Largest length in bytes (256 MiB) that `check_size_cap` accepts.
///
/// NOTE(review): the name and the `DecompressedSizeTooLarge` error suggest this mirrors a
/// read-side decompression limit — confirm against the reader.
const MAX_DECOMPRESSION_SIZE: usize = 256 * 1024 * 1024;
fn check_size_cap(len: usize) -> crate::Result<()> {
if len > MAX_DECOMPRESSION_SIZE {
return Err(crate::Error::DecompressedSizeTooLarge {
declared: len as u64,
limit: MAX_DECOMPRESSION_SIZE as u64,
});
}
Ok(())
}
/// Magic bytes of a V3 blob record header.
///
/// NOTE(review): `Writer::write_raw` only emits the V4 magic; presumably this is kept so
/// older files can still be recognized — confirm against the reader.
pub const BLOB_HEADER_MAGIC_V3: &[u8] = b"BLOB";
/// Magic bytes written at the start of every V4 blob record header.
pub const BLOB_HEADER_MAGIC_V4: &[u8] = b"BLO4";
/// Size in bytes of a V3 blob record header:
/// magic + `u128` + `u64` + `u16` + `u32` + `u32`.
///
/// NOTE(review): judging by the V4 layout written in `Writer::write_raw`, these fields are
/// presumably checksum, seqno, key length, uncompressed value length and on-disk value
/// length — confirm for V3.
pub const BLOB_HEADER_LEN_V3: usize = BLOB_HEADER_MAGIC_V3.len()
+ core::mem::size_of::<u128>() + core::mem::size_of::<u64>() + core::mem::size_of::<u16>() + core::mem::size_of::<u32>() + core::mem::size_of::<u32>();
/// Size in bytes of a V4 blob record header: the V3 size plus one trailing `u32`
/// (the header CRC written by `Writer::write_raw`).
pub const BLOB_HEADER_LEN_V4: usize = BLOB_HEADER_LEN_V3 + core::mem::size_of::<u32>();
/// Computes the 32-bit integrity code covering the fixed-size fields of a blob header.
///
/// The little-endian encodings of `seqno`, `key_len`, `real_val_len` and `on_disk_val_len`
/// are fed, in that order, into an xxh3 hasher; the low 32 bits of the 64-bit digest are
/// returned.
#[expect(
    clippy::cast_possible_truncation,
    reason = "intentionally truncated to 4-byte CRC"
)]
pub(super) fn compute_header_crc(
    seqno: u64,
    key_len: u16,
    real_val_len: u32,
    on_disk_val_len: u32,
) -> u32 {
    let seqno_bytes = seqno.to_le_bytes();
    let key_len_bytes = key_len.to_le_bytes();
    let real_len_bytes = real_val_len.to_le_bytes();
    let on_disk_len_bytes = on_disk_val_len.to_le_bytes();

    let mut state = xxhash_rust::xxh3::Xxh3::default();

    // The feed order determines the resulting value, so it must not be changed.
    for field in [
        &seqno_bytes[..],
        &key_len_bytes[..],
        &real_len_bytes[..],
        &on_disk_len_bytes[..],
    ] {
        state.update(field);
    }

    let digest64 = state.digest();
    digest64 as u32
}
pub(super) fn validate_header_crc(
seqno: u64,
key_len: u16,
real_val_len: u32,
on_disk_val_len: u32,
stored_crc: u32,
) -> crate::Result<()> {
let recomputed_crc = compute_header_crc(seqno, key_len, real_val_len, on_disk_val_len);
if stored_crc != recomputed_crc {
return Err(crate::Error::HeaderCrcMismatch {
recomputed: recomputed_crc,
stored: stored_crc,
});
}
Ok(())
}
/// Writer for a single blob file.
///
/// Blob records are appended with `write` / `write_raw`; `finish` appends the metadata
/// section and syncs the file.
pub struct Writer {
    /// Tree ID handed to `Writer::new`; stored as-is (not read by the methods of this impl).
    pub(crate) tree_id: TreeId,
    /// Path the blob file was created at.
    pub path: PathBuf,
    /// ID of this blob file; recorded in the metadata by `finish`.
    pub(crate) blob_file_id: BlobFileId,
    /// Output stack: `sfa::Writer` over a `ChecksummedWriter` over a buffered file handle.
    #[expect(clippy::struct_field_names)]
    writer: crate::sfa::Writer<ChecksummedWriter<BufWriter<Box<dyn FsFile>>>>,
    /// Running total of blob record bytes written (header + key + on-disk value).
    offset: u64,
    /// Number of blob records written so far.
    pub(crate) item_count: u64,
    /// Sum of the on-disk (possibly compressed) value lengths written so far.
    pub(crate) written_blob_bytes: u64,
    /// Sum of the declared uncompressed value lengths written so far.
    pub(crate) uncompressed_bytes: u64,
    /// Key of the first record written, if any.
    pub(crate) first_key: Option<UserKey>,
    /// Key of the most recently written record, if any.
    pub(crate) last_key: Option<UserKey>,
    /// Compression applied to each value by `write_raw`.
    pub(crate) compression: CompressionType,
    /// When set, recorded in the metadata in place of `compression`.
    pub(crate) metadata_compression_override: Option<CompressionType>,
    /// Sync mode passed to the file when `finish` syncs it.
    pub(crate) sync_mode: SyncMode,
    /// Dictionary required when `compression` is `CompressionType::ZstdDict`.
    #[cfg(zstd_any)]
    pub(crate) zstd_dictionary: Option<alloc::sync::Arc<crate::compression::ZstdDictionary>>,
}
impl Writer {
    /// Creates the blob file at `path` through `fs` and starts its `"data"` section.
    ///
    /// The writer starts without compression, without a metadata compression override,
    /// with `SyncMode::Normal` and (when zstd is enabled) without a dictionary.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened (it is opened with `create_new`) or
    /// if the `"data"` section cannot be started.
    #[doc(hidden)]
    pub fn new<P: AsRef<Path>>(
        path: P,
        blob_file_id: BlobFileId,
        tree_id: TreeId,
        fs: &dyn Fs,
    ) -> crate::Result<Self> {
        let path = path.as_ref();

        let file = fs.open(path, &FsOpenOptions::new().write(true).create_new(true))?;
        let writer = BufWriter::new(file);
        let writer = ChecksummedWriter::new(writer);
        let mut writer = crate::sfa::Writer::from_writer(writer);
        writer.start("data")?;

        Ok(Self {
            tree_id,
            path: path.into(),
            blob_file_id,
            writer,
            offset: 0,
            item_count: 0,
            written_blob_bytes: 0,
            uncompressed_bytes: 0,
            first_key: None,
            last_key: None,
            compression: CompressionType::None,
            metadata_compression_override: None,
            sync_mode: SyncMode::Normal,
            #[cfg(zstd_any)]
            zstd_dictionary: None,
        })
    }

    /// Sets the compression applied to values by [`Writer::write_raw`].
    ///
    /// Consumes and returns the writer; dropping the return value drops the writer.
    #[must_use]
    pub fn use_compression(mut self, compressor: CompressionType) -> Self {
        self.compression = compressor;
        self
    }

    /// Sets the sync mode used when [`Writer::finish`] syncs the file.
    #[must_use]
    pub fn use_sync_mode(mut self, sync_mode: SyncMode) -> Self {
        self.sync_mode = sync_mode;
        self
    }

    /// Sets (or clears, with `None`) the dictionary used for `CompressionType::ZstdDict`.
    #[cfg(zstd_any)]
    #[must_use]
    pub fn use_zstd_dictionary(
        mut self,
        dict: Option<alloc::sync::Arc<crate::compression::ZstdDictionary>>,
    ) -> Self {
        self.zstd_dictionary = dict;
        self
    }

    /// Returns the number of blob record bytes written so far
    /// (header + key + on-disk value of every record).
    #[must_use]
    pub(crate) fn offset(&self) -> u64 {
        self.offset
    }

    /// Returns the ID of the blob file being written.
    #[must_use]
    pub(crate) fn blob_file_id(&self) -> BlobFileId {
        self.blob_file_id
    }

    /// Appends one V4 blob record, compressing `value` according to `self.compression`.
    ///
    /// `uncompressed_len` is stored in the header as the value's uncompressed length and
    /// added to the uncompressed byte counter; it is not derived from `value` here.
    ///
    /// Record layout, all integers little-endian: magic, 128-bit checksum, seqno (`u64`),
    /// key length (`u16`), uncompressed length (`u32`), on-disk length (`u32`),
    /// header CRC (`u32`), key bytes, on-disk value bytes.
    ///
    /// Returns the on-disk (possibly compressed) length of the value.
    ///
    /// # Errors
    ///
    /// - `Error::DecompressedSizeTooLarge` if `uncompressed_len`, `value.len()` or the
    ///   compressed length exceeds the size cap.
    /// - `Error::ZstdDictMismatch` if dictionary compression is configured but no
    ///   dictionary is set, or the configured dictionary has a different ID.
    /// - Any compression or I/O error.
    ///
    /// # Panics
    ///
    /// Panics if `key` is empty or its length does not fit in a `u16`.
    pub(crate) fn write_raw(
        &mut self,
        key: &[u8],
        seqno: SeqNo,
        value: &[u8],
        uncompressed_len: u32,
    ) -> crate::Result<u32> {
        assert!(!key.is_empty());
        assert!(u16::try_from(key.len()).is_ok());

        // Apply the size cap before the `u32` invariant below, so that oversized values
        // surface as an error instead of a panic.
        check_size_cap(uncompressed_len as usize)?;
        check_size_cap(value.len())?;
        assert!(u32::try_from(value.len()).is_ok());

        let value = match &self.compression {
            CompressionType::None => alloc::borrow::Cow::Borrowed(value),
            #[cfg(feature = "lz4")]
            CompressionType::Lz4 => {
                let compressed = lz4_flex::compress(value);
                check_size_cap(compressed.len())?;
                alloc::borrow::Cow::Owned(compressed)
            }
            #[cfg(zstd_any)]
            CompressionType::Zstd(level) => {
                let compressed = crate::compression::ZstdBackend::compress(value, *level)?;
                check_size_cap(compressed.len())?;
                alloc::borrow::Cow::Owned(compressed)
            }
            #[cfg(zstd_any)]
            CompressionType::ZstdDict { level, dict_id } => {
                // The configured dictionary must exist and match the requested ID.
                let dict =
                    self.zstd_dictionary
                        .as_deref()
                        .ok_or(crate::Error::ZstdDictMismatch {
                            expected: *dict_id,
                            got: None,
                        })?;
                if dict.id() != *dict_id {
                    return Err(crate::Error::ZstdDictMismatch {
                        expected: *dict_id,
                        got: Some(dict.id()),
                    });
                }
                let compressed =
                    crate::compression::ZstdBackend::compress_with_dict(value, *level, dict.raw())?;
                check_size_cap(compressed.len())?;
                alloc::borrow::Cow::Owned(compressed)
            }
        };

        let compressed_len_u32 = u32::try_from(value.len())
            .map_err(|_| crate::io::Error::other("compressed value length exceeds u32::MAX"))?;

        if self.first_key.is_none() {
            self.first_key = Some(key.into());
        }
        self.last_key = Some(key.into());

        self.uncompressed_bytes += u64::from(uncompressed_len);

        #[expect(clippy::cast_possible_truncation, reason = "keys are u16 length max")]
        let header_crc = compute_header_crc(
            seqno,
            key.len() as u16,
            uncompressed_len,
            compressed_len_u32,
        );

        // The record checksum covers the key, the on-disk value bytes and the header CRC,
        // in that order.
        let checksum = {
            let mut hasher = xxhash_rust::xxh3::Xxh3::default();
            hasher.update(key);
            hasher.update(&value);
            hasher.update(&header_crc.to_le_bytes());
            hasher.digest128()
        };

        self.writer.write_all(BLOB_HEADER_MAGIC_V4)?;
        self.writer.write_u128::<LittleEndian>(checksum)?;
        self.writer.write_u64::<LittleEndian>(seqno)?;
        #[expect(clippy::cast_possible_truncation, reason = "keys are u16 length max")]
        self.writer.write_u16::<LittleEndian>(key.len() as u16)?;
        self.writer.write_u32::<LittleEndian>(uncompressed_len)?;
        self.writer.write_u32::<LittleEndian>(compressed_len_u32)?;
        self.writer.write_u32::<LittleEndian>(header_crc)?;
        self.writer.write_all(key)?;
        self.writer.write_all(&value)?;

        self.offset += BLOB_HEADER_LEN_V4 as u64;
        self.offset += key.len() as u64;
        self.offset += value.len() as u64;

        self.written_blob_bytes += value.len() as u64;
        self.item_count += 1;

        Ok(compressed_len_u32)
    }

    /// Appends one blob record whose uncompressed length is `value.len()`.
    ///
    /// Returns the on-disk (possibly compressed) length of the value.
    ///
    /// # Errors
    ///
    /// Returns `Error::DecompressedSizeTooLarge` if `value` exceeds the size cap, and
    /// otherwise any error from [`Writer::write_raw`].
    ///
    /// # Panics
    ///
    /// Panics if `key` is empty or its length does not fit in a `u16`.
    pub fn write(&mut self, key: &[u8], seqno: SeqNo, value: &[u8]) -> crate::Result<u32> {
        // Check the cap before narrowing: a plain `as u32` cast would silently wrap for
        // values longer than `u32::MAX`.
        check_size_cap(value.len())?;
        let uncompressed_len = u32::try_from(value.len())
            .map_err(|_| crate::io::Error::other("value length exceeds u32::MAX"))?;

        self.write_raw(key, seqno, value, uncompressed_len)
    }

    /// Writes the `"meta"` section, syncs the file and returns the metadata together with
    /// the checksum reported by the checksumming writer.
    ///
    /// The recorded compression is `metadata_compression_override` if set, otherwise
    /// `compression`.
    ///
    /// # Errors
    ///
    /// Returns an error if starting the section, encoding the metadata, unwrapping the
    /// section writer or syncing the file fails.
    ///
    /// # Panics
    ///
    /// Panics if no record was written, because the key range would be undefined.
    pub(crate) fn finish(mut self) -> crate::Result<(Metadata, Checksum)> {
        self.writer.start("meta")?;

        let metadata = Metadata {
            id: self.blob_file_id,
            version: 4,
            created_at: unix_timestamp().as_nanos(),
            item_count: self.item_count,
            total_compressed_bytes: self.written_blob_bytes,
            total_uncompressed_bytes: self.uncompressed_bytes,
            #[expect(clippy::expect_used, reason = "should have written at least 1 item")]
            key_range: KeyRange::new((
                self.first_key
                    .clone()
                    .expect("should have written at least 1 item"),
                self.last_key
                    .clone()
                    .expect("should have written at least 1 item"),
            )),
            compression: self
                .metadata_compression_override
                .unwrap_or(self.compression),
        };
        metadata.encode_into(&mut self.writer)?;

        let mut checksum = self.writer.into_inner()?;
        // Sync the underlying file handle according to the configured sync mode.
        FsFile::sync_all_with(&**checksum.inner_mut().get_mut(), self.sync_mode)?;
        let checksum = checksum.checksum();

        Ok((metadata, checksum))
    }
}
#[cfg(test)]
mod tests;