use crate::types::{MeruError, Result};
use bytes::Bytes;
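// On-disk layout (all integers little-endian), as produced by `build`:
//
//   header (24 bytes):
//     [0]      version (u8)
//     [1..4]   zero padding
//     [4..8]   num_entries (u32)
//     [8..12]  restart_interval (u32)
//     [12..16] entries_size (u32, byte length of the entries section)
//     [16..20] num_restarts (u32)
//     [20..24] reserved (u32, zero)
//   entries section (entries_size bytes), one record per entry:
//     shared (u16), suffix_len (u16), suffix bytes,
//     page_offset (u64), page_size (u32), first_row_index (u64)
//   restart table (num_restarts * 4 bytes):
//     u32 byte offsets of restart entries, relative to the entries section.
//
// Keys are prefix-compressed: each entry stores only the suffix that differs
// from the previous key, except restart entries, which store the full key
// (shared = 0) so binary search can seek without decoding predecessors.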
pub const KV_INDEX_VERSION: u8 = 1;
pub const KV_INDEX_FOOTER_KEY: &str = "merutable.kv_index.v1";
pub const DEFAULT_RESTART_INTERVAL: u32 = 16;
const HEADER_SIZE: usize = 24;
const ENTRY_FIXED_TAIL: usize = 8 + 4 + 8; // page_offset: u64, page_size: u32, first_row_index: u64
const ENTRY_HEADER: usize = 2 + 2; // shared: u16, suffix_len: u16
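/// Location of one data page: its byte range in the file and the row index
/// of its first row.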
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PageLocation {
pub page_offset: u64,
pub page_size: u32,
pub first_row_index: u64,
}
pub const MAX_KEY_LEN: usize = u16::MAX as usize;
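/// Serializes a sparse index over `entries`. Keys must be strictly ascending
/// and at most `MAX_KEY_LEN` bytes each; a `restart_interval` of 0 is
/// clamped to 1.
///
/// A minimal usage sketch (import paths assumed, so the doctest is
/// `ignore`d):
///
/// ```ignore
/// let entries = vec![(b"banana".to_vec(), PageLocation {
///     page_offset: 100,
///     page_size: 8192,
///     first_row_index: 0,
/// })];
/// let bytes = build(&entries, DEFAULT_RESTART_INTERVAL)?;
/// let idx = KvSparseIndex::from_bytes(bytes)?;
/// assert_eq!(idx.find_page(b"cherry").map(|l| l.page_offset), Some(100));
/// ```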
pub fn build(entries: &[(Vec<u8>, PageLocation)], restart_interval: u32) -> Result<Bytes> {
let restart_interval = restart_interval.max(1);
let num_entries = entries.len() as u32;
let num_restarts = if num_entries == 0 {
0
} else {
num_entries.div_ceil(restart_interval)
};
let mut entries_buf: Vec<u8> = Vec::with_capacity(
entries
.iter()
.map(|(k, _)| ENTRY_HEADER + k.len() + ENTRY_FIXED_TAIL)
.sum(),
);
let mut restart_offsets: Vec<u32> = Vec::with_capacity(num_restarts as usize);
let mut prev_key: &[u8] = &[];
for (i, (key, loc)) in entries.iter().enumerate() {
if key.len() > MAX_KEY_LEN {
return Err(MeruError::Parquet(format!(
"kv_index::build: key {i} has length {} bytes, exceeds MAX_KEY_LEN ({MAX_KEY_LEN})",
key.len()
)));
}
if i > 0 && key.as_slice() <= prev_key {
return Err(MeruError::Parquet(format!(
"kv_index::build requires strictly ascending keys; entry {i} is not > entry {}",
i - 1
)));
}
let is_restart = (i as u32).is_multiple_of(restart_interval);
let shared = if is_restart {
0
} else {
shared_prefix_len(prev_key, key)
};
let suffix = &key[shared..];
if is_restart {
restart_offsets.push(entries_buf.len() as u32);
}
entries_buf.extend_from_slice(&(shared as u16).to_le_bytes());
entries_buf.extend_from_slice(&(suffix.len() as u16).to_le_bytes());
entries_buf.extend_from_slice(suffix);
entries_buf.extend_from_slice(&loc.page_offset.to_le_bytes());
entries_buf.extend_from_slice(&loc.page_size.to_le_bytes());
entries_buf.extend_from_slice(&loc.first_row_index.to_le_bytes());
prev_key = key;
}
let entries_size = entries_buf.len() as u32;
let total_size = HEADER_SIZE + entries_size as usize + 4 * restart_offsets.len();
let mut out = Vec::with_capacity(total_size);
out.push(KV_INDEX_VERSION);
out.extend_from_slice(&[0u8, 0, 0]); // padding after the version byte
out.extend_from_slice(&num_entries.to_le_bytes());
out.extend_from_slice(&restart_interval.to_le_bytes());
out.extend_from_slice(&entries_size.to_le_bytes());
out.extend_from_slice(&(restart_offsets.len() as u32).to_le_bytes());
out.extend_from_slice(&0u32.to_le_bytes()); // reserved
debug_assert_eq!(out.len(), HEADER_SIZE);
out.extend_from_slice(&entries_buf);
for ro in &restart_offsets {
out.extend_from_slice(&ro.to_le_bytes());
}
Ok(Bytes::from(out))
}
fn shared_prefix_len(a: &[u8], b: &[u8]) -> usize {
let n = a.len().min(b.len());
let mut i = 0;
while i < n && a[i] == b[i] {
i += 1;
}
i
}
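/// A parsed, validated sparse index. Holds the serialized bytes and decodes
/// entries lazily on lookup; only the restart table is materialized up front.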
#[derive(Debug, Clone)]
pub struct KvSparseIndex {
bytes: Bytes,
num_entries: u32,
restart_interval: u32,
entries_offset: usize,
entries_size: usize,
restart_offsets: Vec<u32>,
}
impl KvSparseIndex {
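/// Parses and fully validates a serialized index. Beyond the header checks,
/// this walks every entry once to verify framing, restart offsets, and
/// shared-prefix lengths, so later lookups can decode without re-checking
/// bounds.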
pub fn from_bytes(bytes: Bytes) -> Result<Self> {
if bytes.len() < HEADER_SIZE {
return Err(MeruError::Corruption(format!(
"kv_index: buffer too small ({} < {HEADER_SIZE})",
bytes.len()
)));
}
let version = bytes[0];
if version != KV_INDEX_VERSION {
return Err(MeruError::Corruption(format!(
"kv_index: unsupported version {version} (expected {KV_INDEX_VERSION})"
)));
}
let num_entries = u32_at(&bytes, 4);
let restart_interval = u32_at(&bytes, 8);
let entries_size = u32_at(&bytes, 12) as usize;
let num_restarts = u32_at(&bytes, 16) as usize;
if restart_interval == 0 {
return Err(MeruError::Corruption(
"kv_index: restart_interval is 0".into(),
));
}
let entries_offset = HEADER_SIZE;
let restart_table_bytes = num_restarts
.checked_mul(4)
.ok_or_else(|| MeruError::Corruption("kv_index: num_restarts overflow".into()))?;
let restart_section_offset = entries_offset
.checked_add(entries_size)
.ok_or_else(|| MeruError::Corruption("kv_index: entries_size overflow".into()))?;
let expected_total = restart_section_offset
.checked_add(restart_table_bytes)
.ok_or_else(|| MeruError::Corruption("kv_index: total size overflow".into()))?;
if bytes.len() != expected_total {
return Err(MeruError::Corruption(format!(
"kv_index: buffer size mismatch (have {}, need exactly {expected_total})",
bytes.len()
)));
}
let mut restart_offsets: Vec<u32> = Vec::with_capacity(num_restarts);
let mut prev_ro: Option<u32> = None;
for i in 0..num_restarts {
let ro = u32_at(&bytes, restart_section_offset + 4 * i);
if (ro as usize) > entries_size {
return Err(MeruError::Corruption(format!(
"kv_index: restart_offset[{i}] = {ro} exceeds entries_size {entries_size}"
)));
}
if let Some(prev) = prev_ro {
if ro <= prev {
return Err(MeruError::Corruption(format!(
"kv_index: restart_offset[{i}] = {ro} is not > previous {prev}"
)));
}
}
prev_ro = Some(ro);
restart_offsets.push(ro);
}
let expected_restarts = if num_entries == 0 {
0
} else {
num_entries.div_ceil(restart_interval) as usize
};
if num_restarts != expected_restarts {
return Err(MeruError::Corruption(format!(
"kv_index: num_restarts {num_restarts} inconsistent with num_entries \
{num_entries} / restart_interval {restart_interval} (expected {expected_restarts})"
)));
}
{
let mut cursor = 0usize;
let mut prev_key_len: usize = 0;
let entries_end = entries_size;
for i in 0..num_entries as usize {
let abs = entries_offset + cursor;
if cursor + ENTRY_HEADER > entries_end {
return Err(MeruError::Corruption(format!(
"kv_index: entry {i} header overruns entries section"
)));
}
let shared = u16_at(&bytes, abs) as usize;
let suffix_len = u16_at(&bytes, abs + 2) as usize;
let is_restart = (i as u32).is_multiple_of(restart_interval);
if is_restart && shared != 0 {
return Err(MeruError::Corruption(format!(
"kv_index: restart entry {i} has shared={shared} (expected 0)"
)));
}
if shared > prev_key_len {
return Err(MeruError::Corruption(format!(
"kv_index: entry {i} shared={shared} exceeds previous key length {prev_key_len}"
)));
}
let need = ENTRY_HEADER + suffix_len + ENTRY_FIXED_TAIL;
if cursor + need > entries_end {
return Err(MeruError::Corruption(format!(
"kv_index: entry {i} body overruns entries section \
(need {need} at cursor {cursor}, entries_size {entries_end})"
)));
}
if is_restart {
let restart_idx = (i as u32 / restart_interval) as usize;
let expected_ro = cursor as u32;
if restart_offsets[restart_idx] != expected_ro {
return Err(MeruError::Corruption(format!(
"kv_index: restart_offset[{restart_idx}] = {} does not point at \
entry {i} (expected byte offset {expected_ro})",
restart_offsets[restart_idx]
)));
}
}
prev_key_len = shared + suffix_len;
cursor += need;
}
if cursor != entries_end {
return Err(MeruError::Corruption(format!(
"kv_index: trailing bytes in entries section (walked {cursor}, \
entries_size {entries_end})"
)));
}
}
Ok(Self {
bytes,
num_entries,
restart_interval,
entries_offset,
entries_size,
restart_offsets,
})
}
pub fn len(&self) -> usize {
self.num_entries as usize
}
pub fn is_empty(&self) -> bool {
self.num_entries == 0
}
pub fn restart_interval(&self) -> u32 {
self.restart_interval
}
pub fn encoded_size(&self) -> usize {
HEADER_SIZE + self.entries_size + 4 * self.restart_offsets.len()
}
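/// Like `find_page`, but also returns the `first_row_index` of the page
/// immediately after the matched one (`None` when the match is the last
/// page in the index).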
pub fn find_page_with_next(&self, target: &[u8]) -> Option<(PageLocation, Option<u64>)> {
self.find_page_inner(target)
}
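/// Returns the location of the page that may contain `target`: the entry
/// with the greatest key <= `target`, or `None` if every key is greater.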
pub fn find_page(&self, target: &[u8]) -> Option<PageLocation> {
self.find_page_inner(target).map(|(loc, _)| loc)
}
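// Binary search over restart keys (stored in full, shared = 0) for the
// greatest restart key <= target, then scan forward decoding the
// prefix-compressed entries until one compares greater than target.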
fn find_page_inner(&self, target: &[u8]) -> Option<(PageLocation, Option<u64>)> {
if self.num_entries == 0 {
return None;
}
let restarts = &self.restart_offsets;
let mut lo: i64 = -1;
let mut hi: i64 = restarts.len() as i64;
while hi - lo > 1 {
let mid = ((lo + hi) / 2) as usize;
let mid_key = self.read_restart_key(mid);
match mid_key.cmp(target) {
std::cmp::Ordering::Less | std::cmp::Ordering::Equal => lo = mid as i64,
std::cmp::Ordering::Greater => hi = mid as i64,
}
}
let scan_start_pos = if lo < 0 {
0
} else {
restarts[lo as usize] as usize
};
let mut cursor = scan_start_pos;
let mut prev_key: Vec<u8> = Vec::new();
let mut best: Option<PageLocation> = None;
let mut next_first_row: Option<u64> = None;
loop {
if cursor >= self.entries_size {
break;
}
let (entry_key, loc, next_cursor) = self.decode_entry_at(cursor, &prev_key);
match entry_key.as_slice().cmp(target) {
std::cmp::Ordering::Greater => {
if best.is_some() {
next_first_row = Some(loc.first_row_index);
}
break;
}
_ => {
best = Some(loc);
prev_key = entry_key;
cursor = next_cursor;
}
}
}
best.map(|loc| (loc, next_first_row))
}
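// Restart entries always store the full key (shared = 0), so the suffix
// alone is the key; this is what makes binary search over restarts possible.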
fn read_restart_key(&self, idx: usize) -> &[u8] {
let pos = self.entries_offset + self.restart_offsets[idx] as usize;
let suffix_len = u16_at(&self.bytes, pos + 2) as usize;
let key_start = pos + ENTRY_HEADER;
&self.bytes[key_start..key_start + suffix_len]
}
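// Decodes the entry at `cursor` (relative to the entries section),
// reconstructing the full key from `prev_key`'s shared prefix plus the
// stored suffix. Bounds were verified once in `from_bytes`.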
fn decode_entry_at(&self, cursor: usize, prev_key: &[u8]) -> (Vec<u8>, PageLocation, usize) {
let abs = self.entries_offset + cursor;
let shared = u16_at(&self.bytes, abs) as usize;
let suffix_len = u16_at(&self.bytes, abs + 2) as usize;
let suffix_start = abs + ENTRY_HEADER;
let suffix = &self.bytes[suffix_start..suffix_start + suffix_len];
let mut key = Vec::with_capacity(shared + suffix_len);
key.extend_from_slice(&prev_key[..shared]);
key.extend_from_slice(suffix);
let tail = suffix_start + suffix_len;
let page_offset = u64_at(&self.bytes, tail);
let page_size = u32_at(&self.bytes, tail + 8);
let first_row_index = u64_at(&self.bytes, tail + 12);
let next_cursor = (tail + ENTRY_FIXED_TAIL) - self.entries_offset;
(
key,
PageLocation {
page_offset,
page_size,
first_row_index,
},
next_cursor,
)
}
pub fn iter(&self) -> KvSparseIndexIter<'_> {
KvSparseIndexIter {
index: self,
cursor: 0,
prev_key: Vec::new(),
remaining: self.num_entries,
}
}
}
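/// In-order iterator over `(key, PageLocation)` pairs, decoding the
/// prefix-compressed entries sequentially.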
pub struct KvSparseIndexIter<'a> {
index: &'a KvSparseIndex,
cursor: usize,
prev_key: Vec<u8>,
remaining: u32,
}
impl Iterator for KvSparseIndexIter<'_> {
type Item = (Vec<u8>, PageLocation);
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
}
let (key, loc, next) = self.index.decode_entry_at(self.cursor, &self.prev_key);
self.cursor = next;
self.prev_key = key.clone();
self.remaining -= 1;
Some((key, loc))
}
}
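// Fixed-width little-endian reads. Callers guarantee bounds: `from_bytes`
// validates every offset before lookups and iteration touch the buffer.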
#[inline]
fn u16_at(buf: &[u8], pos: usize) -> u16 {
u16::from_le_bytes([buf[pos], buf[pos + 1]])
}
#[inline]
fn u32_at(buf: &[u8], pos: usize) -> u32 {
u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
}
#[inline]
fn u64_at(buf: &[u8], pos: usize) -> u64 {
u64::from_le_bytes([
buf[pos],
buf[pos + 1],
buf[pos + 2],
buf[pos + 3],
buf[pos + 4],
buf[pos + 5],
buf[pos + 6],
buf[pos + 7],
])
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeMap;
fn loc(offset: u64, size: u32, row: u64) -> PageLocation {
PageLocation {
page_offset: offset,
page_size: size,
first_row_index: row,
}
}
#[test]
fn empty_index_round_trip() {
let bytes = build(&[], DEFAULT_RESTART_INTERVAL).unwrap();
let idx = KvSparseIndex::from_bytes(bytes).unwrap();
assert_eq!(idx.len(), 0);
assert!(idx.is_empty());
assert_eq!(idx.find_page(b"anything"), None);
assert_eq!(idx.iter().count(), 0);
}
#[test]
fn single_entry_returns_for_anything_geq() {
let entries = vec![(b"banana".to_vec(), loc(100, 8192, 0))];
let bytes = build(&entries, DEFAULT_RESTART_INTERVAL).unwrap();
let idx = KvSparseIndex::from_bytes(bytes).unwrap();
assert_eq!(idx.len(), 1);
assert_eq!(idx.find_page(b"apple"), None); assert_eq!(idx.find_page(b"banana"), Some(loc(100, 8192, 0)));
assert_eq!(idx.find_page(b"cherry"), Some(loc(100, 8192, 0)));
}
#[test]
fn small_ordered_round_trip() {
let raw: Vec<(&[u8], PageLocation)> = vec![
(b"k/0001/aaaa", loc(0, 8192, 0)),
(b"k/0001/bbbb", loc(8192, 8192, 100)),
(b"k/0001/cccc", loc(16384, 8192, 200)),
(b"k/0002/aaaa", loc(24576, 8192, 300)),
(b"k/0002/bbbb", loc(32768, 8192, 400)),
];
let entries: Vec<(Vec<u8>, PageLocation)> =
raw.iter().map(|(k, l)| (k.to_vec(), *l)).collect();
let bytes = build(&entries, 2).unwrap();
let idx = KvSparseIndex::from_bytes(bytes).unwrap();
assert_eq!(idx.len(), 5);
assert_eq!(idx.restart_interval(), 2);
let collected: Vec<(Vec<u8>, PageLocation)> = idx.iter().collect();
assert_eq!(collected, entries);
for (k, l) in &entries {
assert_eq!(idx.find_page(k), Some(*l), "exact lookup failed for {k:?}");
}
assert_eq!(
idx.find_page(b"k/0001/aaab"),
Some(loc(0, 8192, 0)),
"predecessor of /aaab should be /aaaa"
);
assert_eq!(idx.find_page(b"k/0001/cccd"), Some(loc(16384, 8192, 200)));
assert_eq!(idx.find_page(b"k/9999/zzzz"), Some(loc(32768, 8192, 400)));
assert_eq!(idx.find_page(b"k/0000/zzzz"), None);
}
#[test]
fn matches_btreemap_oracle() {
let mut rng_state: u64 = 0xdeadbeefcafebabe;
let mut next = || {
rng_state = rng_state
.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
rng_state
};
let mut oracle: BTreeMap<Vec<u8>, PageLocation> = BTreeMap::new();
for i in 0..1024u64 {
let r = next();
let key = format!("tenant/0001/table/users/pk/{:016x}/{:016x}", i, r).into_bytes();
oracle.insert(key, loc(i * 8192, 8192, i * 100));
}
let entries: Vec<(Vec<u8>, PageLocation)> =
oracle.iter().map(|(k, v)| (k.clone(), *v)).collect();
let bytes = build(&entries, DEFAULT_RESTART_INTERVAL).unwrap();
let idx = KvSparseIndex::from_bytes(bytes).unwrap();
for (k, expected) in &oracle {
let got = idx.find_page(k);
assert_eq!(got, Some(*expected), "exact lookup failed for {k:?}");
}
for _ in 0..1024 {
let r = next();
let probe =
format!("tenant/0001/table/users/pk/{:016x}/{:016x}", r % 2048, r).into_bytes();
let oracle_answer: Option<PageLocation> =
oracle.range(..=probe.clone()).next_back().map(|(_, v)| *v);
let idx_answer = idx.find_page(&probe);
assert_eq!(
idx_answer, oracle_answer,
"oracle disagreement for probe {probe:?}"
);
}
}
#[test]
fn prefix_compression_meaningfully_shrinks_index() {
let entries: Vec<(Vec<u8>, PageLocation)> = (0..512u64)
.map(|i| {
(
format!("tenant/0001/table/users/pk/{:032x}", i).into_bytes(),
loc(i * 8192, 8192, i * 100),
)
})
.collect();
let raw_key_bytes: usize = entries.iter().map(|(k, _)| k.len()).sum();
let bytes = build(&entries, DEFAULT_RESTART_INTERVAL).unwrap();
let idx = KvSparseIndex::from_bytes(bytes.clone()).unwrap();
let on_disk = idx.encoded_size();
assert_eq!(on_disk, bytes.len());
assert!(
on_disk * 2 < raw_key_bytes + 24 * entries.len(),
"kv_index expected ≥2x compression vs raw keys plus fixed per-entry overhead; got {on_disk} bytes for {raw_key_bytes} raw key bytes ({} entries)",
entries.len()
);
}
#[test]
fn truncated_buffer_is_rejected() {
let entries: Vec<(Vec<u8>, PageLocation)> = (0..32u64)
.map(|i| (format!("k{i:04}").into_bytes(), loc(i, 1, i)))
.collect();
let bytes = build(&entries, DEFAULT_RESTART_INTERVAL).unwrap();
let truncated = bytes.slice(..bytes.len() - 8);
let result = KvSparseIndex::from_bytes(truncated);
assert!(result.is_err(), "truncated buffer should be rejected");
let tiny = bytes.slice(..4);
let result = KvSparseIndex::from_bytes(tiny);
assert!(result.is_err(), "tiny buffer should be rejected");
}
#[test]
fn wrong_version_is_rejected() {
let entries: Vec<(Vec<u8>, PageLocation)> = vec![(b"hello".to_vec(), loc(0, 1, 0))];
let bytes = build(&entries, DEFAULT_RESTART_INTERVAL).unwrap();
let mut tampered = bytes.to_vec();
tampered[0] = 99;
let result = KvSparseIndex::from_bytes(Bytes::from(tampered));
assert!(matches!(result, Err(MeruError::Corruption(_))));
}
#[test]
fn restart_interval_of_one_works() {
let entries: Vec<(Vec<u8>, PageLocation)> = (0..16u64)
.map(|i| (format!("k{i:04}").into_bytes(), loc(i * 100, 50, i)))
.collect();
let bytes = build(&entries, 1).unwrap();
let idx = KvSparseIndex::from_bytes(bytes).unwrap();
for (k, l) in &entries {
assert_eq!(idx.find_page(k), Some(*l));
}
}
#[test]
fn find_page_with_next_reports_successor_first_row() {
let entries: Vec<(Vec<u8>, PageLocation)> = vec![
(b"k01".to_vec(), loc(0, 8192, 0)),
(b"k02".to_vec(), loc(8192, 8192, 100)),
(b"k03".to_vec(), loc(16384, 8192, 250)),
(b"k04".to_vec(), loc(24576, 8192, 400)),
];
let bytes = build(&entries, 2).unwrap();
let idx = KvSparseIndex::from_bytes(bytes).unwrap();
let (got, next) = idx.find_page_with_next(b"k02").unwrap();
assert_eq!(got, loc(8192, 8192, 100));
assert_eq!(next, Some(250));
let (got, next) = idx.find_page_with_next(b"k02zzz").unwrap();
assert_eq!(got, loc(8192, 8192, 100));
assert_eq!(next, Some(250));
let (got, next) = idx.find_page_with_next(b"k01").unwrap();
assert_eq!(got, loc(0, 8192, 0));
assert_eq!(next, Some(100));
let (got, next) = idx.find_page_with_next(b"k04").unwrap();
assert_eq!(got, loc(24576, 8192, 400));
assert_eq!(next, None);
let (got, next) = idx.find_page_with_next(b"k99").unwrap();
assert_eq!(got, loc(24576, 8192, 400));
assert_eq!(next, None);
assert!(idx.find_page_with_next(b"k00").is_none());
}
#[test]
fn iterator_yields_in_order_for_random_lengths() {
let raw_keys: Vec<&[u8]> = vec![
b"a",
b"aa",
b"aaa",
b"aab",
b"ab",
b"abcdefghijklmnopqrstuvwxyz",
b"abcdefghijklmnopqrstuvwxyz1",
b"b",
b"ba",
b"baz",
];
let entries: Vec<(Vec<u8>, PageLocation)> = raw_keys
.iter()
.enumerate()
.map(|(i, k)| (k.to_vec(), loc(i as u64 * 8, 8, i as u64)))
.collect();
let bytes = build(&entries, 4).unwrap();
let idx = KvSparseIndex::from_bytes(bytes).unwrap();
let yielded: Vec<Vec<u8>> = idx.iter().map(|(k, _)| k).collect();
assert_eq!(
yielded,
raw_keys.iter().map(|k| k.to_vec()).collect::<Vec<_>>()
);
}
#[test]
fn build_rejects_key_longer_than_u16() {
let oversized = vec![0x41u8; MAX_KEY_LEN + 1];
let entries = vec![(oversized, loc(0, 1, 0))];
let err = build(&entries, DEFAULT_RESTART_INTERVAL).unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains("MAX_KEY_LEN") || msg.contains("exceeds"),
"error should name the length limit: {msg}"
);
}
#[test]
fn build_accepts_key_at_u16_max() {
let at_limit = vec![0x42u8; MAX_KEY_LEN];
let entries = vec![(at_limit.clone(), loc(42, 1, 7))];
let bytes = build(&entries, DEFAULT_RESTART_INTERVAL).unwrap();
let idx = KvSparseIndex::from_bytes(bytes).unwrap();
assert_eq!(idx.len(), 1);
assert_eq!(idx.find_page(&at_limit), Some(loc(42, 1, 7)));
}
#[test]
fn build_rejects_unsorted_input() {
let entries = vec![
(b"zebra".to_vec(), loc(0, 1, 0)),
(b"apple".to_vec(), loc(1, 1, 1)),
];
let err = build(&entries, DEFAULT_RESTART_INTERVAL).unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains("strictly ascending") || msg.contains("not >"),
"{msg}"
);
}
#[test]
fn build_rejects_duplicate_adjacent_keys() {
let entries = vec![
(b"dup".to_vec(), loc(0, 1, 0)),
(b"dup".to_vec(), loc(1, 1, 1)),
];
assert!(build(&entries, DEFAULT_RESTART_INTERVAL).is_err());
}
#[test]
fn from_bytes_rejects_trailing_garbage() {
let entries: Vec<(Vec<u8>, PageLocation)> = (0..8u64)
.map(|i| (format!("k{i:04}").into_bytes(), loc(i, 1, i)))
.collect();
let bytes = build(&entries, DEFAULT_RESTART_INTERVAL).unwrap();
let mut padded = bytes.to_vec();
padded.extend_from_slice(b"trailing garbage");
let err = KvSparseIndex::from_bytes(Bytes::from(padded)).unwrap_err();
assert!(matches!(err, MeruError::Corruption(_)));
}
#[test]
fn from_bytes_rejects_restart_offset_out_of_range() {
let entries: Vec<(Vec<u8>, PageLocation)> = (0..4u64)
.map(|i| (format!("k{i:04}").into_bytes(), loc(i, 1, i)))
.collect();
let bytes = build(&entries, 2).unwrap();
let entries_size = u32_at(&bytes, 12) as usize;
let num_restarts = u32_at(&bytes, 16) as usize;
assert!(num_restarts >= 1);
let restart_table_start = HEADER_SIZE + entries_size;
let mut tampered = bytes.to_vec();
let bad = (entries_size as u32 + 9999).to_le_bytes();
tampered[restart_table_start..restart_table_start + 4].copy_from_slice(&bad);
let err = KvSparseIndex::from_bytes(Bytes::from(tampered)).unwrap_err();
assert!(matches!(err, MeruError::Corruption(_)));
}
#[test]
fn from_bytes_rejects_shared_exceeding_prev_key() {
let entries = vec![
(b"aa".to_vec(), loc(0, 1, 0)),
(b"ab".to_vec(), loc(1, 1, 1)),
];
let bytes = build(&entries, 4).unwrap();
let mut tampered = bytes.to_vec();
// entry 0 occupies ENTRY_HEADER (4) + suffix "aa" (2) + ENTRY_FIXED_TAIL (20) = 26 bytes
let entry1_off = HEADER_SIZE + ENTRY_HEADER + 2 + ENTRY_FIXED_TAIL;
tampered[entry1_off..entry1_off + 2].copy_from_slice(&99u16.to_le_bytes());
let err = KvSparseIndex::from_bytes(Bytes::from(tampered)).unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains("shared") && msg.contains("exceeds"),
"error should name the shared/prev-key mismatch: {msg}"
);
}
#[test]
fn from_bytes_rejects_restart_entry_with_nonzero_shared() {
let entries = vec![
(b"aa".to_vec(), loc(0, 1, 0)),
(b"ab".to_vec(), loc(1, 1, 1)),
];
let bytes = build(&entries, 1).unwrap();
let mut tampered = bytes.to_vec();
// with restart_interval 1 both entries are restarts; entry 0 still occupies 26 bytes
let entry1_off = HEADER_SIZE + ENTRY_HEADER + 2 + ENTRY_FIXED_TAIL;
tampered[entry1_off..entry1_off + 2].copy_from_slice(&1u16.to_le_bytes());
let err = KvSparseIndex::from_bytes(Bytes::from(tampered)).unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains("restart entry") && msg.contains("shared"),
"{msg}"
);
}
#[test]
fn from_bytes_rejects_inconsistent_num_restarts() {
let entries: Vec<(Vec<u8>, PageLocation)> = (0..8u64)
.map(|i| (format!("k{i:04}").into_bytes(), loc(i, 1, i)))
.collect();
let bytes = build(&entries, 4).unwrap();
let mut tampered = bytes.to_vec();
tampered[16..20].copy_from_slice(&99u32.to_le_bytes());
let err = KvSparseIndex::from_bytes(Bytes::from(tampered)).unwrap_err();
assert!(matches!(err, MeruError::Corruption(_)));
}
}