use crate::{
CompressionType, InternalValue, KeyRange, SeqNo, Slice,
checksum::ChecksumType,
coding::{Decode, Encode},
comparator::default_comparator,
table::{Block, DataBlock},
vlog::BlobFileId,
};
use byteorder::{LittleEndian, ReadBytesExt};
use std::io::{Read, Write};
macro_rules! read_u64 {
($block:expr, $name:expr, $cmp:expr) => {{
let bytes = $block
.point_read($name, SeqNo::MAX, $cmp)?
.ok_or(crate::Error::InvalidHeader("BlobFileMeta"))?;
let mut bytes = &bytes.value[..];
bytes.read_u64::<LittleEndian>()?
}};
}
macro_rules! read_u128 {
($block:expr, $name:expr, $cmp:expr) => {{
let bytes = $block
.point_read($name, SeqNo::MAX, $cmp)?
.ok_or(crate::Error::InvalidHeader("BlobFileMeta"))?;
let mut bytes = &bytes.value[..];
bytes.read_u128::<LittleEndian>()?
}};
}
pub const METADATA_HEADER_MAGIC: &[u8] = b"META";
#[derive(Debug, PartialEq, Eq)]
pub struct Metadata {
pub id: BlobFileId,
pub version: u8,
pub created_at: u128,
pub item_count: u64,
pub total_compressed_bytes: u64,
pub total_uncompressed_bytes: u64,
pub key_range: KeyRange,
pub compression: CompressionType,
}
impl Metadata {
pub fn encode_into<W: Write>(&self, writer: &mut W) -> crate::Result<()> {
fn meta(key: &str, value: &[u8]) -> InternalValue {
InternalValue::from_components(key, value, 0, crate::ValueType::Value)
}
writer.write_all(METADATA_HEADER_MAGIC)?;
#[rustfmt::skip]
let meta_items = [
meta("blob_file_version", &[self.version]),
meta("checksum_type", &[u8::from(ChecksumType::Xxh3)]),
meta("compression", &self.compression.encode_into_vec()),
meta("crate_version", env!("CARGO_PKG_VERSION").as_bytes()),
meta("created_at", &self.created_at.to_le_bytes()),
meta("file_size", &self.total_compressed_bytes.to_le_bytes()),
meta("id", &self.id.to_le_bytes()),
meta("item_count", &self.item_count.to_le_bytes()),
meta("key#max", self.key_range.max()),
meta("key#min", self.key_range.min()),
meta("uncompressed_size", &self.total_uncompressed_bytes.to_le_bytes()),
];
#[cfg(debug_assertions)]
{
let is_sorted = meta_items.iter().is_sorted_by_key(|kv| &kv.key);
assert!(is_sorted, "meta items not sorted correctly");
}
let buf = DataBlock::encode_into_vec(&meta_items, 1, 0.0)?;
Block::write_into(
writer,
&buf,
crate::table::block::BlockType::Meta,
CompressionType::None,
None,
#[cfg(zstd_any)]
None,
)?;
Ok(())
}
pub fn from_slice(slice: &Slice) -> crate::Result<Self> {
let reader = &mut &slice[..];
let mut magic = [0u8; METADATA_HEADER_MAGIC.len()];
reader.read_exact(&mut magic)?;
if magic != METADATA_HEADER_MAGIC {
return Err(crate::Error::InvalidHeader("BlobFileMeta"));
}
let block = Block::from_reader(
reader,
CompressionType::None,
None,
#[cfg(zstd_any)]
None,
)?;
let block = DataBlock::new(block);
let cmp = default_comparator();
let version = {
let bytes = block
.point_read(b"blob_file_version", SeqNo::MAX, &cmp)?
.ok_or(crate::Error::InvalidHeader("BlobFileMeta"))?;
*bytes
.value
.first()
.ok_or(crate::Error::InvalidHeader("BlobFileMeta"))?
};
match version {
3 | 4 => {}
_ => return Err(crate::Error::InvalidHeader("BlobFileMeta")),
}
let id = read_u64!(block, b"id", &cmp);
let created_at = read_u128!(block, b"created_at", &cmp);
let item_count = read_u64!(block, b"item_count", &cmp);
let file_size = read_u64!(block, b"file_size", &cmp);
let total_uncompressed_bytes = read_u64!(block, b"uncompressed_size", &cmp);
let compression = {
let bytes = block
.point_read(b"compression", SeqNo::MAX, &cmp)?
.ok_or(crate::Error::InvalidHeader("BlobFileMeta"))?;
let mut bytes = &bytes.value[..];
CompressionType::decode_from(&mut bytes)?
};
let key_range = KeyRange::new((
block
.point_read(b"key#min", SeqNo::MAX, &cmp)?
.ok_or(crate::Error::InvalidHeader("BlobFileMeta"))?
.value,
block
.point_read(b"key#max", SeqNo::MAX, &cmp)?
.ok_or(crate::Error::InvalidHeader("BlobFileMeta"))?
.value,
));
Ok(Self {
id,
version,
created_at,
compression,
item_count,
total_compressed_bytes: file_size,
total_uncompressed_bytes,
key_range,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use test_log::test;
#[test]
fn test_blob_file_meta_truncated_returns_err() {
let buf = Slice::from(METADATA_HEADER_MAGIC.to_vec());
assert!(Metadata::from_slice(&buf).is_err());
}
#[test]
#[expect(clippy::unwrap_used)]
fn test_blob_file_meta_missing_field_returns_err() {
use crate::table::block::BlockType;
use std::io::Write;
fn meta(key: &str, value: &[u8]) -> InternalValue {
InternalValue::from_components(key, value, 0, crate::ValueType::Value)
}
#[rustfmt::skip]
let meta_items = [
meta("blob_file_version", &[4u8]),
meta("checksum_type", &[u8::from(ChecksumType::Xxh3)]),
meta("crate_version", env!("CARGO_PKG_VERSION").as_bytes()),
meta("created_at", &1_234_567_890u128.to_le_bytes()),
meta("file_size", &1024u64.to_le_bytes()),
meta("id", &0u64.to_le_bytes()),
meta("item_count", &100u64.to_le_bytes()),
meta("key#max", b"z"),
meta("key#min", b"a"),
meta("uncompressed_size", &2048u64.to_le_bytes()),
];
let encoded = DataBlock::encode_into_vec(&meta_items, 1, 0.0).unwrap();
let mut buf = Vec::new();
buf.write_all(METADATA_HEADER_MAGIC).unwrap();
Block::write_into(
&mut buf,
&encoded,
BlockType::Meta,
CompressionType::None,
None,
#[cfg(zstd_any)]
None,
)
.unwrap();
let buf = Slice::from(buf);
let result = Metadata::from_slice(&buf);
assert!(
matches!(result, Err(crate::Error::InvalidHeader("BlobFileMeta"))),
"expected Err(InvalidHeader(\"BlobFileMeta\")), got {result:?}",
);
}
#[test]
#[expect(clippy::unwrap_used)]
fn test_blob_file_meta_corrupted_trailer_returns_err() {
let meta = Metadata {
id: 0,
version: 4,
created_at: 1_234_567_890,
compression: CompressionType::None,
item_count: 100,
total_compressed_bytes: 1024,
total_uncompressed_bytes: 2048,
key_range: KeyRange::new((b"a".into(), b"z".into())),
};
let mut buf = Vec::new();
meta.encode_into(&mut buf).unwrap();
let len = buf.len();
assert!(len >= 4, "buffer too small for corruption");
#[expect(clippy::indexing_slicing, reason = "length checked above")]
for b in &mut buf[len - 4..] {
*b ^= 0xFF;
}
let buf = Slice::from(buf);
let result = Metadata::from_slice(&buf);
assert!(
result.is_err(),
"corrupted trailer must produce Err, got {result:?}",
);
}
#[test]
#[expect(clippy::unwrap_used)]
fn test_blob_file_meta_roundtrip() {
let meta = Metadata {
id: 0,
version: 4,
created_at: 1_234_567_890,
compression: CompressionType::None,
item_count: 100,
total_compressed_bytes: 1024,
total_uncompressed_bytes: 2048,
key_range: KeyRange::new((b"a".into(), b"z".into())),
};
let mut buf = Vec::new();
meta.encode_into(&mut buf).unwrap();
let buf = Slice::from(buf);
let meta2 = Metadata::from_slice(&buf).unwrap();
assert_eq!(meta, meta2);
}
}