use alloc::vec::Vec;
use crate::InternalValue;
use crate::runtime_config::ChecksumAlgorithm;
/// Byte length of the fixed-size tail that ends every digest footer:
/// one algorithm wire-tag byte followed by the digest count as a 4-byte
/// little-endian `u32`.
pub const FOOTER_TAIL_LEN: usize = 1 + 4;
/// Computes the per-entry digest of `item` with `algo`.
///
/// The digest input consists of three chunks, in this order:
///
/// 1. a 13-byte header: the value-type byte, the sequence number as 8
///    little-endian bytes, and the user-key length as 4 little-endian bytes;
/// 2. the user key;
/// 3. the value.
///
/// Because the key length is part of the header, shifting bytes between the
/// key and the value changes the digest input.
///
/// Returns whatever `ChecksumAlgorithm::compute_chunks` yields for those
/// chunks.
#[must_use]
pub fn kv_digest(item: &InternalValue, algo: ChecksumAlgorithm) -> Option<u64> {
    // Header layout: [value_type: 1][seqno: 8][user_key_len: 4].
    const SEQNO_AT: usize = 1;
    const KEY_LEN_AT: usize = SEQNO_AT + 8;
    const HEADER_LEN: usize = KEY_LEN_AT + 4;

    let type_byte = u8::from(item.key.value_type);
    let seqno_le = item.key.seqno.to_le_bytes();
    #[expect(
        clippy::cast_possible_truncation,
        reason = "user keys are bounded well below u32::MAX"
    )]
    let key_len_le = (item.key.user_key.len() as u32).to_le_bytes();

    let mut header = [0u8; HEADER_LEN];
    header[0] = type_byte;
    header[SEQNO_AT..KEY_LEN_AT].copy_from_slice(&seqno_le);
    header[KEY_LEN_AT..].copy_from_slice(&key_len_le);

    algo.compute_chunks(&[&header, &item.key.user_key, &item.value])
}
/// Appends the digest footer to `payload`.
///
/// Bytes are written in this order:
///
/// 1. each entry of `digests`, truncated to its low `algo.digest_size()`
///    bytes in little-endian order;
/// 2. one byte holding `algo.wire_tag()`;
/// 3. `digests.len()` as a 4-byte little-endian `u32`.
///
/// # Panics
///
/// Panics if `algo.digest_size()` is larger than 8, because each digest is
/// carried in a `u64`.
pub fn append_footer(payload: &mut Vec<u8>, digests: &[u64], algo: ChecksumAlgorithm) {
    let width = algo.digest_size();
    assert!(width <= 8, "digest_size must fit in u64 LE bytes");

    for digest in digests {
        // Little-endian places the low-order bytes first, so the first
        // `width` bytes are exactly the low `width` bytes of the digest.
        payload.extend(digest.to_le_bytes().iter().take(width));
    }

    #[expect(
        clippy::cast_possible_truncation,
        reason = "a data block never holds more than u32::MAX entries"
    )]
    let count = digests.len() as u32;

    payload.push(algo.wire_tag());
    payload.extend_from_slice(&count.to_le_bytes());
}
/// Encodes an optional footer algorithm as a single descriptor byte.
///
/// `None` is encoded as `0`; `Some(algo)` is encoded as
/// `1 + algo.wire_tag()`, which keeps `0` reserved for "no footer".
#[must_use]
pub fn descriptor_byte(algo: Option<ChecksumAlgorithm>) -> u8 {
    algo.map_or(0, |present| 1 + present.wire_tag())
}
pub fn descriptor_from_byte(byte: u8) -> crate::Result<Option<ChecksumAlgorithm>> {
if byte == 0 {
return Ok(None);
}
ChecksumAlgorithm::from_wire_tag(byte - 1)
.map(Some)
.ok_or(crate::Error::InvalidTrailer)
}
pub fn split_inner(bytes: &[u8]) -> crate::Result<&[u8]> {
let total = bytes.len();
if total < FOOTER_TAIL_LEN {
return Err(crate::Error::InvalidTrailer);
}
let tail_start = total - FOOTER_TAIL_LEN;
let tail = bytes
.get(tail_start..total)
.ok_or(crate::Error::InvalidTrailer)?;
let algo_tag = *tail.first().ok_or(crate::Error::InvalidTrailer)?;
let algo = ChecksumAlgorithm::from_wire_tag(algo_tag).ok_or(crate::Error::InvalidTrailer)?;
let count = u32::from_le_bytes(
tail.get(1..)
.and_then(|s| s.try_into().ok())
.ok_or(crate::Error::InvalidTrailer)?,
) as usize;
let array_len = count
.checked_mul(algo.digest_size())
.ok_or(crate::Error::InvalidTrailer)?;
if array_len > tail_start {
return Err(crate::Error::InvalidTrailer);
}
let array_start = tail_start - array_len;
bytes.get(..array_start).ok_or(crate::Error::InvalidTrailer)
}
/// Borrowed view of a footer-bearing block, as produced by `split_full`:
/// the inner payload plus the parsed digest footer.
pub struct SplitFull<'a> {
/// Bytes that precede the digest array (the payload without its footer).
pub inner: &'a [u8],
// Packed digest array: `count` entries of `algo.digest_size()` bytes each,
// read back as little-endian by `digest`.
digest_array: &'a [u8],
/// Checksum algorithm decoded from the footer's wire-tag byte.
pub algo: ChecksumAlgorithm,
// Number of digests declared by the footer's count field.
count: usize,
}
impl SplitFull<'_> {
    /// Returns the number of digests declared by the footer.
    #[must_use]
    pub fn count(&self) -> usize {
        self.count
    }

    /// Returns the digest stored at position `index`, zero-extended to `u64`.
    ///
    /// Each digest occupies `algo.digest_size()` little-endian bytes in the
    /// digest array. Returns `None` when the requested entry lies outside
    /// the array, when the offset computation overflows, or when the digest
    /// width exceeds 8 bytes.
    #[must_use]
    pub fn digest(&self, index: usize) -> Option<u64> {
        let width = self.algo.digest_size();

        // Destination first: a width above 8 bytes cannot be represented.
        let mut padded = [0u8; 8];
        let low_bytes = padded.get_mut(..width)?;

        let start = index.checked_mul(width)?;
        let stop = start.checked_add(width)?;
        let stored = self.digest_array.get(start..stop)?;

        // The untouched high bytes stay zero, which zero-extends the value.
        low_bytes.copy_from_slice(stored);
        Some(u64::from_le_bytes(padded))
    }
}
/// Parses the digest footer at the end of `bytes` and returns both the inner
/// payload and a view over the digest array.
///
/// The last [`FOOTER_TAIL_LEN`] bytes hold the algorithm wire tag and the
/// little-endian `u32` digest count; the digest array
/// (`count * digest_size` bytes) sits immediately before that tail, and the
/// inner payload is everything before the digest array.
///
/// # Errors
///
/// Returns `Error::InvalidTrailer` when `bytes` is shorter than the tail, the
/// wire tag is unknown, or the declared digest array does not fit in the
/// bytes preceding the tail.
pub fn split_full(bytes: &[u8]) -> crate::Result<SplitFull<'_>> {
    let Some(tail_start) = bytes.len().checked_sub(FOOTER_TAIL_LEN) else {
        return Err(crate::Error::InvalidTrailer);
    };
    let (Some(body), Some(tail)) = (bytes.get(..tail_start), bytes.get(tail_start..)) else {
        return Err(crate::Error::InvalidTrailer);
    };

    // Tail layout: [wire tag: 1 byte][digest count: u32 LE].
    let Some((&tag, count_bytes)) = tail.split_first() else {
        return Err(crate::Error::InvalidTrailer);
    };
    let Some(algo) = ChecksumAlgorithm::from_wire_tag(tag) else {
        return Err(crate::Error::InvalidTrailer);
    };
    let Ok(count_le) = <[u8; 4]>::try_from(count_bytes) else {
        return Err(crate::Error::InvalidTrailer);
    };
    let count = u32::from_le_bytes(count_le) as usize;

    // The digest array must fit entirely in front of the tail.
    let Some(array_start) = count
        .checked_mul(algo.digest_size())
        .and_then(|array_len| tail_start.checked_sub(array_len))
    else {
        return Err(crate::Error::InvalidTrailer);
    };
    let (Some(inner), Some(digest_array)) = (body.get(..array_start), body.get(array_start..))
    else {
        return Err(crate::Error::InvalidTrailer);
    };

    Ok(SplitFull {
        inner,
        digest_array,
        algo,
        count,
    })
}
#[cfg(test)]
#[expect(clippy::expect_used, reason = "test code")]
mod tests {
    use super::*;
    use crate::ValueType;

    /// Payload bytes shared by the footer round-trip tests.
    const INNER: &[u8] = b"standard data block payload bytes";

    /// Algorithms exercised by the footer round-trip tests.
    const FOOTER_ALGOS: [ChecksumAlgorithm; 2] =
        [ChecksumAlgorithm::Xxh3_64, ChecksumAlgorithm::Xxh3Low32];

    /// Builds an `InternalValue` from borrowed key/value bytes.
    fn val(user_key: &[u8], value: &[u8], seqno: u64, vt: ValueType) -> InternalValue {
        InternalValue::from_components(user_key.to_vec(), value.to_vec(), seqno, vt)
    }

    /// Five distinct digests whose high and low halves are both non-zero.
    fn sample_digests() -> Vec<u64> {
        (0..5u64).map(|i| 0x0102_0304_0506_0708 ^ i).collect()
    }

    /// Returns `inner` with a well-formed footer for `digests` appended.
    fn with_footer(inner: &[u8], digests: &[u64], algo: ChecksumAlgorithm) -> Vec<u8> {
        let mut payload = inner.to_vec();
        append_footer(&mut payload, digests, algo);
        payload
    }

    /// Returns `inner` followed by a raw footer tail (tag byte + LE count)
    /// and no digest array.
    fn with_raw_tail(inner: &[u8], tag: u8, count: u32) -> Vec<u8> {
        let mut payload = inner.to_vec();
        payload.push(tag);
        payload.extend_from_slice(&count.to_le_bytes());
        payload
    }

    #[test]
    fn kv_digest_is_invariant_to_callsite_but_sensitive_to_content() {
        let algo = ChecksumAlgorithm::Xxh3_64;
        let baseline = kv_digest(&val(b"key", b"value", 7, ValueType::Value), algo);

        // An identically-built item must hash the same.
        assert_eq!(
            baseline,
            kv_digest(&val(b"key", b"value", 7, ValueType::Value), algo)
        );

        // Changing any single component must change the digest.
        let variants = [
            (
                val(b"KEY", b"value", 7, ValueType::Value),
                "user_key must matter",
            ),
            (
                val(b"key", b"VALUE", 7, ValueType::Value),
                "value must matter",
            ),
            (
                val(b"key", b"value", 8, ValueType::Value),
                "seqno must matter",
            ),
            (
                val(b"key", b"value", 7, ValueType::Tombstone),
                "value_type must matter",
            ),
        ];
        for (item, why) in variants {
            assert_ne!(baseline, kv_digest(&item, algo), "{why}");
        }
    }

    #[test]
    fn kv_digest_is_injective_across_key_value_boundary() {
        let algo = ChecksumAlgorithm::Xxh3_64;
        // Same concatenated bytes, different key/value split.
        let short_key = val(b"a", b"bc", 7, ValueType::Value);
        let long_key = val(b"ab", b"c", 7, ValueType::Value);
        assert_ne!(
            kv_digest(&short_key, algo),
            kv_digest(&long_key, algo),
            "key/value boundary must be unambiguous in the digest domain",
        );
    }

    #[test]
    fn footer_roundtrips_inner_payload() {
        for algo in FOOTER_ALGOS {
            let payload = with_footer(INNER, &sample_digests(), algo);
            assert!(
                payload.len() > INNER.len(),
                "footer must add bytes for {algo:?}"
            );

            let recovered = split_inner(&payload).expect("well-formed footer must split");
            assert_eq!(recovered, INNER, "inner payload must round-trip");
        }
    }

    #[test]
    fn split_inner_rejects_too_short() {
        let too_short = [0u8; FOOTER_TAIL_LEN - 1];
        assert!(split_inner(&too_short).is_err());
    }

    #[test]
    fn split_inner_rejects_unknown_algorithm_tag() {
        let payload = with_raw_tail(b"inner", 0xFE, 0);
        assert!(split_inner(&payload).is_err());
    }

    #[test]
    fn split_inner_rejects_count_exceeding_available_bytes() {
        // One payload byte cannot hold the 1000 digests the tail claims.
        let payload = with_raw_tail(b"x", ChecksumAlgorithm::Xxh3_64.wire_tag(), 1000);
        assert!(split_inner(&payload).is_err());
    }

    #[test]
    fn split_full_recovers_digests() {
        for algo in FOOTER_ALGOS {
            let digests = sample_digests();
            let payload = with_footer(INNER, &digests, algo);

            let split = split_full(&payload).expect("well-formed footer must split");
            assert_eq!(split.inner, INNER, "inner payload must round-trip");
            assert_eq!(split.algo, algo, "algorithm tag must round-trip");

            // Narrow algorithms only keep the low bytes of each digest.
            let mask = match algo.digest_size() {
                8 => u64::MAX,
                _ => 0xFFFF_FFFF,
            };
            let expected: Vec<u64> = digests.iter().map(|digest| digest & mask).collect();
            assert_eq!(
                split.count(),
                expected.len(),
                "digest count must round-trip"
            );

            let recovered: Vec<u64> = (0..split.count())
                .map(|index| split.digest(index).expect("index < count is in range"))
                .collect();
            assert_eq!(
                recovered, expected,
                "digests must round-trip (masked to width) for {algo:?}"
            );
        }
    }

    #[test]
    fn descriptor_byte_roundtrips_none_and_every_algorithm() {
        assert_eq!(descriptor_byte(None), 0);
        let absent = descriptor_from_byte(0).expect("zero decodes");
        assert_eq!(absent, None, "0 must mean no footer");

        let every_algorithm = [
            ChecksumAlgorithm::Xxh3_64,
            ChecksumAlgorithm::Xxh3Low32,
            ChecksumAlgorithm::Crc32c,
        ];
        for algo in every_algorithm {
            let byte = descriptor_byte(Some(algo));
            assert_ne!(byte, 0, "present footer must not encode as 0 for {algo:?}");

            let decoded = descriptor_from_byte(byte).expect("present byte decodes");
            assert_eq!(
                decoded,
                Some(algo),
                "algorithm must round-trip for {algo:?}"
            );
        }
    }

    #[test]
    fn descriptor_from_byte_rejects_unknown_nonzero_tag() {
        let decoded = descriptor_from_byte(0xFE);
        assert!(decoded.is_err());
    }
}