#![allow(
clippy::cast_lossless,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::doc_markdown,
clippy::items_after_statements,
clippy::similar_names,
clippy::unreadable_literal
)]
use alloc::format;
use alloc::string::String;
use alloc::vec;
use alloc::vec::Vec;
use core::fmt;
use spg_crypto::crc32::crc32;
use crate::bloom::{BloomError, BloomFilter};
/// Eight-byte magic that opens every v1 segment. `encode_segment` writes it
/// first and `parse_segment_metadata` rejects input that does not start with it.
pub const SEGMENT_MAGIC: [u8; 8] = *b"SPGSEG\x01\x00";
/// Eight-byte magic that opens a v2 envelope (see `wrap_v2_envelope` and
/// `unwrap_v2_envelope`).
pub const SEGMENT_MAGIC_V2: [u8; 8] = *b"SPGSEG\x02\x00";
/// Four-byte tag that opens the optional BRIN sidecar placed in front of the
/// v1 bytes inside a v2 envelope payload.
pub const BRIN_SIDECAR_MAGIC: [u8; 4] = *b"BRIN";
/// Key range observed in one page of a segment.
///
/// Serialised in the BRIN sidecar as 20 little-endian bytes:
/// `page_index` (u32), `min_key` (u64), `max_key` (u64).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BrinSummary {
/// Zero-based index of the page this summary describes.
pub page_index: u32,
/// First key seen in the page when scanning it in order.
pub min_key: u64,
/// Last key seen in the page when scanning it in order.
pub max_key: u64,
}
/// Length of the v2 envelope header: 8-byte magic + 1-byte compression
/// algorithm + 4-byte little-endian length of the uncompressed payload.
pub(crate) const SEGMENT_V2_HEADER_LEN: usize = 8 + 1 + 4;
/// Envelope algorithm byte: payload is stored as-is.
pub const SEGMENT_COMPRESS_ALGO_NONE: u8 = 0;
/// Envelope algorithm byte: payload is LZSS-compressed.
pub const SEGMENT_COMPRESS_ALGO_LZSS: u8 = 1;
/// A page size of 4096 bytes; the tests in this file pass it to
/// `encode_segment` as `page_size_bytes`.
pub const SEGMENT_PAGE_BYTES: u32 = 4096;
/// Length of the fixed v1 header: magic (8) + num_rows (4) + num_pages (4) +
/// page_size_bytes (4) + min_pk (8) + max_pk (8) + bloom length (4).
const HEADER_FIXED_LEN: usize = 8 + 4 + 4 + 4 + 8 + 8 + 4;
/// Length of the v1 footer: one little-endian CRC32.
const FOOTER_LEN: usize = 4;
/// Errors produced while encoding, wrapping, unwrapping or parsing segments.
#[derive(Debug)]
pub enum SegmentError {
/// The input has fewer bytes (`got`) than the section being parsed
/// requires (`need`).
TooShort {
got: usize,
need: usize,
},
/// The first eight bytes (`got`) are not `SEGMENT_MAGIC`.
BadMagic {
got: [u8; 8],
},
/// A structural inconsistency; the string describes what was wrong.
BadShape(String),
/// The CRC stored in the footer (`expected`) differs from the CRC
/// computed over the body (`got`).
BadCrc {
expected: u32,
got: u32,
},
/// The embedded bloom filter could not be decoded.
BloomError(BloomError),
/// The encoder received a key (`next`) that is not strictly greater than
/// the previous key (`prev`).
UnsortedKey {
prev: u64,
next: u64,
},
/// NOTE(review): not constructed anywhere in this file; per its Display
/// text it reports a key missing from its target page.
KeyNotInPage {
key: u64,
},
/// NOTE(review): not constructed anywhere in this file; per its Display
/// text it reports a page index outside `0..num_pages`.
PageOutOfRange {
got: u32,
num_pages: u32,
},
/// LZSS decompression of a v2 envelope payload failed; the string is the
/// `Debug` rendering of the decompressor's error.
CompressionDecodeFailed(String),
/// The v2 envelope carries an algorithm byte that is neither
/// `SEGMENT_COMPRESS_ALGO_NONE` nor `SEGMENT_COMPRESS_ALGO_LZSS`.
UnknownCompressionAlgo(u8),
}
impl fmt::Display for SegmentError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::TooShort { got, need } => write!(
f,
"segment: too short, got {got} bytes, need at least {need}"
),
Self::BadMagic { got } => {
write!(f, "segment: bad magic {got:?}, expected {SEGMENT_MAGIC:?}")
}
Self::BadShape(s) => write!(f, "segment: bad shape: {s}"),
Self::BadCrc { expected, got } => write!(
f,
"segment: crc mismatch, expected 0x{expected:08x}, got 0x{got:08x}"
),
Self::BloomError(e) => write!(f, "segment: bloom decode failed: {e}"),
Self::UnsortedKey { prev, next } => write!(
f,
"segment: writer received unsorted keys (prev={prev}, next={next}); \
the segment contract requires ascending u64 keys"
),
Self::KeyNotInPage { key } => {
write!(f, "segment: key {key} not found in target page")
}
Self::PageOutOfRange { got, num_pages } => write!(
f,
"segment: page index {got} out of range, num_pages = {num_pages}"
),
Self::CompressionDecodeFailed(s) => write!(
f,
"segment v2 envelope: LZSS decompress failed: {s}"
),
Self::UnknownCompressionAlgo(b) => write!(
f,
"segment v2 envelope: unknown compression algo byte {b:#04x}"
),
}
}
}
impl From<BloomError> for SegmentError {
fn from(e: BloomError) -> Self {
Self::BloomError(e)
}
}
/// Header-level facts about a v1 segment, returned by the encoder and
/// recovered by the parser.
#[derive(Debug, Clone)]
pub struct SegmentMeta {
/// Number of rows in the segment (stored on disk as a u32).
pub num_rows: u64,
/// Number of fixed-size pages in the segment.
pub num_pages: u32,
/// Size of every page in bytes.
pub page_size_bytes: u32,
/// Smallest key in the segment (the first key the encoder received).
pub min_pk: u64,
/// Largest key in the segment (the last key the encoder received).
pub max_pk: u64,
/// Total length of the encoded v1 segment, magic and CRC footer included.
pub total_bytes: usize,
}
/// One page-index record; serialised as 12 little-endian bytes
/// (`first_pk` u64, then `file_offset` u32).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PageIndexEntry {
/// Key of the first row stored in the page.
first_pk: u64,
/// Byte offset of the page relative to the start of the pages region
/// (readers add `pages_start_offset` to it).
file_offset: u32,
}
/// Encodes a strictly ascending run of `(key, payload)` rows into a v1 segment.
///
/// Layout written, all integers little-endian:
///
/// ```text
/// magic(8) | num_rows u32 | num_pages u32 | page_size u32 | min_pk u64 | max_pk u64
/// | bloom_len u32 | bloom bytes | page_index_len u32 | page index
/// | num_pages * page_size page bytes | crc32 u32
/// ```
///
/// The CRC covers everything after the magic and before the footer. Each page
/// is `u32 row count | u32 offset per row | rows`, zero-padded to
/// `page_size_bytes`; each row is `key u64 | payload_len u32 | payload`.
///
/// `rows.len()` sizes the bloom filter together with `bloom_target_fp`.
///
/// Returns the encoded bytes and the matching [`SegmentMeta`].
///
/// # Errors
///
/// * [`SegmentError::BadShape`] when `page_size_bytes` is outside
///   `[256, 65536]`, when no rows are supplied, when a single row cannot fit
///   in an empty page, or when a count/length exceeds what the format's u32
///   fields can express.
/// * [`SegmentError::UnsortedKey`] when a key is not strictly greater than its
///   predecessor.
#[allow(clippy::too_many_lines)]
pub fn encode_segment<I>(
    rows: I,
    bloom_target_fp: f64,
    page_size_bytes: u32,
) -> Result<(Vec<u8>, SegmentMeta), SegmentError>
where
    I: ExactSizeIterator<Item = (u64, Vec<u8>)>,
{
    // Fixed per-page cost: the u32 row count that opens every page.
    const PAGE_COUNT_PREFIX: usize = 4;
    // Per-row cost on top of the row bytes: one u32 slot in the offset table.
    const OFFSET_SLOT: usize = 4;

    if !(256..=65_536).contains(&page_size_bytes) {
        return Err(SegmentError::BadShape(format!(
            "page_size_bytes {page_size_bytes} must be in [256, 65536]"
        )));
    }
    let num_rows_hint = rows.len();
    if num_rows_hint == 0 {
        return Err(SegmentError::BadShape(
            "encode_segment: at least one row required".into(),
        ));
    }
    let page_size = page_size_bytes as usize;

    let mut bloom = BloomFilter::with_target_fp_rate(num_rows_hint, bloom_target_fp);
    let mut pages: Vec<Vec<u8>> = Vec::new();
    let mut page_index: Vec<PageIndexEntry> = Vec::new();
    let mut row_bytes_in_page: Vec<Vec<u8>> = Vec::new();
    // Size the open page would have if serialised right now. Tracked
    // incrementally so admitting a row is O(1) instead of re-summing the page.
    let mut open_page_bytes: usize = PAGE_COUNT_PREFIX;
    let mut first_pk_in_page: Option<u64> = None;
    let mut last_key: Option<u64> = None;
    let mut min_pk: Option<u64> = None;
    let mut max_pk: u64 = 0;
    let mut total_rows: u64 = 0;

    for (key, payload) in rows {
        if let Some(prev) = last_key
            && key <= prev
        {
            return Err(SegmentError::UnsortedKey { prev, next: key });
        }
        last_key = Some(key);
        if min_pk.is_none() {
            min_pk = Some(key);
        }
        max_pk = key;
        total_rows += 1;
        bloom.insert(&key.to_le_bytes());

        let plen = u32::try_from(payload.len()).map_err(|_| {
            SegmentError::BadShape(format!(
                "row payload too large: {} bytes > u32::MAX",
                payload.len()
            ))
        })?;
        let mut row_bytes = Vec::with_capacity(12 + payload.len());
        row_bytes.extend_from_slice(&key.to_le_bytes());
        row_bytes.extend_from_slice(&plen.to_le_bytes());
        row_bytes.extend_from_slice(&payload);

        // A row must fit in an *empty* page regardless of what the open page
        // currently holds; otherwise flushing the open page would just move
        // the oversized row into a fresh page that it overflows.
        let row_cost = OFFSET_SLOT + row_bytes.len();
        if PAGE_COUNT_PREFIX + row_cost > page_size {
            return Err(SegmentError::BadShape(format!(
                "row of {} bytes doesn't fit in page of {page_size_bytes} bytes",
                row_bytes.len()
            )));
        }

        if open_page_bytes + row_cost > page_size {
            // The check above guarantees the open page holds at least one row.
            let first_pk = first_pk_in_page.take().expect("page is non-empty");
            push_finished_page(
                &mut pages,
                &mut page_index,
                &row_bytes_in_page,
                first_pk,
                page_size_bytes,
            )?;
            row_bytes_in_page.clear();
            open_page_bytes = PAGE_COUNT_PREFIX;
        }
        if first_pk_in_page.is_none() {
            first_pk_in_page = Some(key);
        }
        open_page_bytes += row_cost;
        row_bytes_in_page.push(row_bytes);
    }

    // Flush the trailing, partially filled page.
    if let Some(first_pk) = first_pk_in_page {
        push_finished_page(
            &mut pages,
            &mut page_index,
            &row_bytes_in_page,
            first_pk,
            page_size_bytes,
        )?;
    }

    // Only reachable if the iterator reported a non-zero length but yielded
    // nothing; treat it like the empty-input case instead of panicking.
    let Some(min_pk) = min_pk else {
        return Err(SegmentError::BadShape(
            "encode_segment: at least one row required".into(),
        ));
    };

    let num_pages = u32::try_from(pages.len()).map_err(|_| {
        SegmentError::BadShape(format!(
            "segment has {} pages, exceeds u32::MAX",
            pages.len()
        ))
    })?;
    let num_rows = total_rows;
    let num_rows_u32 = u32::try_from(num_rows)
        .map_err(|_| SegmentError::BadShape(format!("num_rows {num_rows} exceeds u32::MAX")))?;

    let bloom_bytes = bloom.to_bytes();
    let bloom_len = u32::try_from(bloom_bytes.len()).map_err(|_| {
        SegmentError::BadShape(format!(
            "bloom filter of {} bytes exceeds u32::MAX",
            bloom_bytes.len()
        ))
    })?;
    let page_index_bytes = encode_page_index(&page_index);
    let page_index_len = u32::try_from(page_index_bytes.len()).map_err(|_| {
        SegmentError::BadShape(format!(
            "page index of {} bytes exceeds u32::MAX",
            page_index_bytes.len()
        ))
    })?;

    let mut out = Vec::with_capacity(
        HEADER_FIXED_LEN
            + 4
            + bloom_bytes.len()
            + 4
            + page_index_bytes.len()
            + pages.len() * page_size
            + FOOTER_LEN,
    );
    out.extend_from_slice(&SEGMENT_MAGIC);
    // The CRC covers everything written from here up to the footer.
    let body_start = out.len();
    out.extend_from_slice(&num_rows_u32.to_le_bytes());
    out.extend_from_slice(&num_pages.to_le_bytes());
    out.extend_from_slice(&page_size_bytes.to_le_bytes());
    out.extend_from_slice(&min_pk.to_le_bytes());
    out.extend_from_slice(&max_pk.to_le_bytes());
    out.extend_from_slice(&bloom_len.to_le_bytes());
    out.extend_from_slice(&bloom_bytes);
    out.extend_from_slice(&page_index_len.to_le_bytes());
    out.extend_from_slice(&page_index_bytes);
    for page in &pages {
        debug_assert_eq!(page.len(), page_size, "page is fixed-size");
        out.extend_from_slice(page);
    }
    let crc = crc32(&out[body_start..]);
    out.extend_from_slice(&crc.to_le_bytes());

    let meta = SegmentMeta {
        num_rows,
        num_pages,
        page_size_bytes,
        min_pk,
        max_pk,
        total_bytes: out.len(),
    };
    Ok((out, meta))
}

/// Serialises the rows of the open page, appends the page to `pages`, and
/// records its index entry (first key plus offset within the pages region).
///
/// Fails with `BadShape` when the page's offset no longer fits the format's
/// u32 `file_offset` field.
fn push_finished_page(
    pages: &mut Vec<Vec<u8>>,
    page_index: &mut Vec<PageIndexEntry>,
    rows_in_page: &[Vec<u8>],
    first_pk: u64,
    page_size_bytes: u32,
) -> Result<(), SegmentError> {
    let page_size = page_size_bytes as usize;
    let file_offset = pages
        .len()
        .checked_mul(page_size)
        .and_then(|offset| u32::try_from(offset).ok())
        .ok_or_else(|| {
            SegmentError::BadShape(format!(
                "segment pages exceed the range of u32 page offsets \
                 ({} pages of {page_size_bytes} bytes already written)",
                pages.len()
            ))
        })?;
    page_index.push(PageIndexEntry {
        first_pk,
        file_offset,
    });
    pages.push(serialise_page(rows_in_page, page_size));
    Ok(())
}
/// Lays out one fixed-size page: a little-endian u32 row count, one u32 byte
/// offset per row (measured from the start of the page), the rows themselves
/// back to back, then zero padding up to `page_size_bytes`.
///
/// Panics if the row count or a row offset does not fit in a u32. In debug
/// builds it also asserts that the content fits in `page_size_bytes`.
fn serialise_page(row_bytes: &[Vec<u8>], page_size_bytes: usize) -> Vec<u8> {
    let row_count = u32::try_from(row_bytes.len()).expect("row count fits u32");
    let rows_start = 4 + row_count as usize * 4;

    let mut buf = Vec::with_capacity(page_size_bytes);
    buf.extend_from_slice(&row_count.to_le_bytes());

    // Offset table: each row starts where the previous one ended.
    let mut cursor = rows_start;
    for row in row_bytes {
        let row_offset = u32::try_from(cursor).expect("page < 4 GiB");
        buf.extend_from_slice(&row_offset.to_le_bytes());
        cursor += row.len();
    }

    // Row data, in the same order as the offset table.
    for row in row_bytes {
        buf.extend_from_slice(row);
    }

    debug_assert!(
        buf.len() <= page_size_bytes,
        "page overflow: {} > {page_size_bytes}",
        buf.len()
    );
    buf.resize(page_size_bytes, 0);
    buf
}
fn encode_page_index(index: &[PageIndexEntry]) -> Vec<u8> {
let mut out = Vec::with_capacity(4 + index.len() * 12);
out.extend_from_slice(
&u32::try_from(index.len())
.expect("page count fits u32")
.to_le_bytes(),
);
for entry in index {
out.extend_from_slice(&entry.first_pk.to_le_bytes());
out.extend_from_slice(&entry.file_offset.to_le_bytes());
}
out
}
/// Decodes a serialised page index: a little-endian u32 entry count followed
/// by exactly that many 12-byte records (`first_pk` u64, `file_offset` u32).
///
/// # Errors
///
/// Returns [`SegmentError::BadShape`] when `input` is shorter than the count
/// prefix or its length does not match the declared count exactly.
fn parse_page_index(input: &[u8]) -> Result<Vec<PageIndexEntry>, SegmentError> {
    if input.len() < 4 {
        return Err(SegmentError::BadShape(
            "page index: too short for count prefix".into(),
        ));
    }
    let count = u32::from_le_bytes([input[0], input[1], input[2], input[3]]);
    // The count is untrusted. Compute the expected size in u64 so the
    // multiplication cannot wrap on targets where usize is 32 bits.
    let expected = 4 + u64::from(count) * 12;
    if input.len() as u64 != expected {
        return Err(SegmentError::BadShape(format!(
            "page index: input is {} bytes, expected {} for count {count}",
            input.len(),
            expected
        )));
    }
    // The length check above guarantees the body is a whole number of records.
    let entries = input[4..]
        .chunks_exact(12)
        .map(|record| {
            let mut first_pk = [0u8; 8];
            first_pk.copy_from_slice(&record[..8]);
            let mut file_offset = [0u8; 4];
            file_offset.copy_from_slice(&record[8..]);
            PageIndexEntry {
                first_pk: u64::from_le_bytes(first_pk),
                file_offset: u32::from_le_bytes(file_offset),
            }
        })
        .collect();
    Ok(entries)
}
/// Everything `parse_segment_metadata` extracts from a v1 segment so that
/// lookups and scans can run against the raw bytes.
#[derive(Debug, Clone)]
struct SegmentMetadata {
/// Header fields plus the total byte length of the segment.
meta: SegmentMeta,
/// Bloom filter decoded from the segment.
bloom: BloomFilter,
/// Decoded page index, in the order it was stored.
page_index: Vec<PageIndexEntry>,
/// Byte offset, from the start of the segment, at which the first page begins.
pages_start_offset: usize,
}
/// Parses and validates a v1 segment: fixed header, bloom filter, page index,
/// overall length and CRC footer.
///
/// All section lengths come from the (untrusted) input, so every size
/// computation is overflow-checked before it is used to slice `bytes`.
///
/// # Errors
///
/// * [`SegmentError::TooShort`] when `bytes` ends before a declared section.
/// * [`SegmentError::BadMagic`] when the first eight bytes are not
///   `SEGMENT_MAGIC`.
/// * [`SegmentError::BloomError`] when the bloom filter fails to decode.
/// * [`SegmentError::BadShape`] when the page index is malformed, the total
///   length disagrees with the header, declared lengths overflow, or the page
///   index entry count differs from `num_pages`.
/// * [`SegmentError::BadCrc`] when the stored CRC does not match the body.
fn parse_segment_metadata(bytes: &[u8]) -> Result<SegmentMetadata, SegmentError> {
    /// Reads a little-endian u32; the caller has already bounds-checked `at + 4`.
    fn le_u32(bytes: &[u8], at: usize) -> u32 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&bytes[at..at + 4]);
        u32::from_le_bytes(raw)
    }
    /// Reads a little-endian u64; the caller has already bounds-checked `at + 8`.
    fn le_u64(bytes: &[u8], at: usize) -> u64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&bytes[at..at + 8]);
        u64::from_le_bytes(raw)
    }
    /// Error for declared lengths whose sum or product does not fit in usize.
    fn length_overflow() -> SegmentError {
        SegmentError::BadShape("segment: declared section lengths overflow usize".into())
    }

    let min_len = HEADER_FIXED_LEN + FOOTER_LEN;
    if bytes.len() < min_len {
        return Err(SegmentError::TooShort {
            got: bytes.len(),
            need: min_len,
        });
    }
    let mut magic = [0u8; 8];
    magic.copy_from_slice(&bytes[..8]);
    if magic != SEGMENT_MAGIC {
        return Err(SegmentError::BadMagic { got: magic });
    }

    // Fixed header fields, in on-disk order.
    let num_rows = le_u32(bytes, 8);
    let num_pages = le_u32(bytes, 12);
    let page_size_bytes = le_u32(bytes, 16);
    let min_pk = le_u64(bytes, 20);
    let max_pk = le_u64(bytes, 28);
    let bloom_len = le_u32(bytes, 36) as usize;

    // Bloom filter, followed by the u32 length prefix of the page index.
    let bloom_offset = HEADER_FIXED_LEN;
    let bloom_end = bloom_offset
        .checked_add(bloom_len)
        .ok_or_else(length_overflow)?;
    let page_index_off = bloom_end.checked_add(4).ok_or_else(length_overflow)?;
    if bytes.len() < page_index_off {
        return Err(SegmentError::TooShort {
            got: bytes.len(),
            need: page_index_off,
        });
    }
    let bloom = BloomFilter::from_bytes(&bytes[bloom_offset..bloom_end])?;

    // Page index.
    let page_index_len = le_u32(bytes, bloom_end) as usize;
    let pages_start_offset = page_index_off
        .checked_add(page_index_len)
        .ok_or_else(length_overflow)?;
    if bytes.len() < pages_start_offset {
        return Err(SegmentError::TooShort {
            got: bytes.len(),
            need: pages_start_offset,
        });
    }
    let page_index = parse_page_index(&bytes[page_index_off..pages_start_offset])?;

    // The header must account for every remaining byte: pages, then footer.
    let expected_total = (num_pages as usize)
        .checked_mul(page_size_bytes as usize)
        .and_then(|pages_total| pages_total.checked_add(pages_start_offset))
        .and_then(|total| total.checked_add(FOOTER_LEN))
        .ok_or_else(length_overflow)?;
    if bytes.len() != expected_total {
        return Err(SegmentError::BadShape(format!(
            "segment: input is {} bytes, header implies {expected_total}",
            bytes.len()
        )));
    }

    // The CRC covers everything between the magic and the footer.
    let stored_crc_off = expected_total - FOOTER_LEN;
    let stored_crc = le_u32(bytes, stored_crc_off);
    let computed_crc = crc32(&bytes[8..stored_crc_off]);
    if computed_crc != stored_crc {
        return Err(SegmentError::BadCrc {
            expected: stored_crc,
            got: computed_crc,
        });
    }

    // The encoder writes exactly one index entry per page, and readers index
    // the two interchangeably; reject segments where they disagree.
    if page_index.len() != num_pages as usize {
        return Err(SegmentError::BadShape(format!(
            "segment: page index has {} entries, header declares {num_pages} pages",
            page_index.len()
        )));
    }

    let meta = SegmentMeta {
        num_rows: u64::from(num_rows),
        num_pages,
        page_size_bytes,
        min_pk,
        max_pk,
        total_bytes: bytes.len(),
    };
    Ok(SegmentMetadata {
        meta,
        bloom,
        page_index,
        pages_start_offset,
    })
}
/// Cheap pre-filter for `key`: it must lie within the segment's
/// `[min_pk, max_pk]` range and be reported as present by the bloom filter.
/// `false` means the lookup path will not search any page for `key`.
fn segment_might_contain(metadata: &SegmentMetadata, key: u64) -> bool {
    let key_range = metadata.meta.min_pk..=metadata.meta.max_pk;
    key_range.contains(&key) && metadata.bloom.contains(&key.to_le_bytes())
}
/// Point lookup: returns a copy of the payload stored under `key`, or `None`.
///
/// The key is first screened by `segment_might_contain`. The candidate page is
/// the last one whose `first_pk` is less than or equal to `key`; that page is
/// then searched by `decode_page_lookup`.
///
/// Offsets come from the parsed page index, so all arithmetic is checked. A
/// page that would overflow or reach into the CRC footer yields `None` rather
/// than a panic.
fn segment_lookup(metadata: &SegmentMetadata, bytes: &[u8], key: u64) -> Option<Vec<u8>> {
    if !segment_might_contain(metadata, key) {
        return None;
    }
    let candidate = match metadata
        .page_index
        .binary_search_by_key(&key, |entry| entry.first_pk)
    {
        Ok(exact) => exact,
        // `key` sorts before every page's first key when the insertion
        // point is 0, so no page can hold it.
        Err(insert_at) => insert_at.checked_sub(1)?,
    };
    let entry = metadata.page_index.get(candidate)?;

    let page_off = metadata
        .pages_start_offset
        .checked_add(entry.file_offset as usize)?;
    let page_end = page_off.checked_add(metadata.meta.page_size_bytes as usize)?;
    // Pages end where the CRC footer begins.
    let pages_region_end = bytes.len().checked_sub(FOOTER_LEN)?;
    if page_end > pages_region_end {
        return None;
    }
    decode_page_lookup(&bytes[page_off..page_end], key)
}
/// Lazily yields every `(key, payload)` row of the segment, page by page in
/// storage order. Each page is decoded in full when the iterator reaches it.
///
/// Panics if `bytes` is shorter than the page region described by `metadata`.
fn segment_scan<'a>(
    metadata: &'a SegmentMetadata,
    bytes: &'a [u8],
) -> impl Iterator<Item = (u64, Vec<u8>)> + 'a {
    let stride = metadata.meta.page_size_bytes as usize;
    let first_page_at = metadata.pages_start_offset;
    let page_count = metadata.meta.num_pages as usize;
    (0..page_count).flat_map(move |page_no| {
        let start = first_page_at + page_no * stride;
        decode_page_iter(&bytes[start..start + stride])
    })
}
/// Read-only view over a borrowed v1 segment.
#[derive(Debug)]
pub struct SegmentReader<'a> {
/// The raw v1 segment bytes this reader was opened on.
bytes: &'a [u8],
/// Metadata parsed from `bytes` when the reader was opened.
metadata: SegmentMetadata,
}
/// Derives one [`BrinSummary`] per non-empty page of a v1 segment.
///
/// The segment is scanned once in storage order. A row is attributed to the
/// next page as soon as its key reaches that page's `first_pk` from the page
/// index. For each page the first and last key attributed to it become
/// `min_key` and `max_key`. Pages that receive no rows are omitted.
///
/// # Errors
///
/// Propagates any error from opening `v1_bytes` as a v1 segment.
pub fn derive_brin_summaries(v1_bytes: &[u8]) -> Result<Vec<BrinSummary>, SegmentError> {
    let reader = SegmentReader::open(v1_bytes)?;
    let num_pages = reader.meta().num_pages as usize;
    if num_pages == 0 {
        return Ok(Vec::new());
    }
    let page_index = &reader.metadata.page_index;

    // (first key, last key) seen so far for each page.
    let mut ranges: Vec<Option<(u64, u64)>> = vec![None; num_pages];
    let mut current_page: usize = 0;
    for (key, _) in reader.scan() {
        // `get` rather than indexing: the page index comes from the input and
        // may hold fewer entries than the header's page count.
        while current_page + 1 < num_pages
            && page_index
                .get(current_page + 1)
                .is_some_and(|next| key >= next.first_pk)
        {
            current_page += 1;
        }
        ranges[current_page] = Some(match ranges[current_page] {
            Some((first, _)) => (first, key),
            None => (key, key),
        });
    }

    let summaries = ranges
        .into_iter()
        .enumerate()
        .filter_map(|(page, range)| {
            let (min_key, max_key) = range?;
            Some(BrinSummary {
                page_index: u32::try_from(page).expect("page count fits u32"),
                min_key,
                max_key,
            })
        })
        .collect();
    Ok(summaries)
}
#[must_use]
pub fn wrap_v2_envelope_with_brin(
v1_bytes: Vec<u8>,
summaries: &[BrinSummary],
compress: bool,
) -> Vec<u8> {
if summaries.is_empty() {
return wrap_v2_envelope(v1_bytes, compress);
}
let brin_section_len = 4 + summaries.len() * 20;
let mut inner = Vec::with_capacity(4 + 4 + brin_section_len + v1_bytes.len());
inner.extend_from_slice(&BRIN_SIDECAR_MAGIC);
let n = u32::try_from(summaries.len()).expect("BRIN summary count fits u32");
inner.extend_from_slice(&n.to_le_bytes());
for s in summaries {
inner.extend_from_slice(&s.page_index.to_le_bytes());
inner.extend_from_slice(&s.min_key.to_le_bytes());
inner.extend_from_slice(&s.max_key.to_le_bytes());
}
inner.extend_from_slice(&v1_bytes);
wrap_v2_envelope(inner, compress)
}
/// Optionally wraps a v1 segment in an LZSS-compressed v2 envelope.
///
/// The input is returned unchanged when `compress` is `false`, or when the
/// compressed body plus the 13-byte envelope header would not be strictly
/// smaller than the input. Otherwise the result is
/// `SEGMENT_MAGIC_V2 | SEGMENT_COMPRESS_ALGO_LZSS | input length u32 LE | compressed body`.
///
/// # Panics
///
/// Panics if an envelope is produced for an input longer than `u32::MAX` bytes.
#[must_use]
pub fn wrap_v2_envelope(v1_bytes: Vec<u8>, compress: bool) -> Vec<u8> {
    if compress {
        let packed = spg_crypto::lzss::compress(&v1_bytes);
        let saves_space = packed.len() + SEGMENT_V2_HEADER_LEN < v1_bytes.len();
        if saves_space {
            let declared_len = u32::try_from(v1_bytes.len()).expect("v1 segment < 4 GiB");
            let mut envelope = Vec::with_capacity(SEGMENT_V2_HEADER_LEN + packed.len());
            envelope.extend_from_slice(&SEGMENT_MAGIC_V2);
            envelope.push(SEGMENT_COMPRESS_ALGO_LZSS);
            envelope.extend_from_slice(&declared_len.to_le_bytes());
            envelope.extend_from_slice(&packed);
            return envelope;
        }
    }
    v1_bytes
}
pub(crate) fn unwrap_v2_envelope(
bytes: Vec<u8>,
) -> Result<(Vec<u8>, Vec<BrinSummary>), SegmentError> {
if bytes.len() < 8 || bytes[..8] != SEGMENT_MAGIC_V2 {
return Ok((bytes, Vec::new()));
}
if bytes.len() < SEGMENT_V2_HEADER_LEN {
return Err(SegmentError::TooShort {
got: bytes.len(),
need: SEGMENT_V2_HEADER_LEN,
});
}
let algo = bytes[8];
let inner_len = u32::from_le_bytes([bytes[9], bytes[10], bytes[11], bytes[12]]) as usize;
let inner = &bytes[SEGMENT_V2_HEADER_LEN..];
let decoded = match algo {
SEGMENT_COMPRESS_ALGO_NONE => {
if inner.len() != inner_len {
return Err(SegmentError::BadShape(alloc::format!(
"v2 envelope algo=none: declared inner_len {inner_len} \
differs from body {}",
inner.len()
)));
}
inner.to_vec()
}
SEGMENT_COMPRESS_ALGO_LZSS => {
let decompressed = spg_crypto::lzss::decompress(inner)
.map_err(|e| SegmentError::CompressionDecodeFailed(alloc::format!("{e:?}")))?;
if decompressed.len() != inner_len {
return Err(SegmentError::BadShape(alloc::format!(
"v2 envelope LZSS: decompressed {} bytes, declared {inner_len}",
decompressed.len()
)));
}
decompressed
}
other => return Err(SegmentError::UnknownCompressionAlgo(other)),
};
if decoded.len() >= 4 && decoded[..4] == BRIN_SIDECAR_MAGIC {
return parse_brin_sidecar_then_v1(decoded);
}
Ok((decoded, Vec::new()))
}
/// Splits a decoded v2 payload that starts with a BRIN sidecar into the v1
/// segment bytes and the parsed summaries.
///
/// Expected layout: 4 tag bytes (not re-checked here), a little-endian u32
/// summary count, `count` records of 20 bytes each (`page_index` u32,
/// `min_key` u64, `max_key` u64), then the v1 segment.
///
/// # Errors
///
/// Returns [`SegmentError::BadShape`] when the payload is shorter than the
/// 8-byte sidecar header or than the summaries the header declares.
fn parse_brin_sidecar_then_v1(
    mut decoded: Vec<u8>,
) -> Result<(Vec<u8>, Vec<BrinSummary>), SegmentError> {
    if decoded.len() < 8 {
        return Err(SegmentError::BadShape(format!(
            "BRIN sidecar: truncated header ({}B < 8)",
            decoded.len()
        )));
    }
    let n_summaries = u32::from_le_bytes([decoded[4], decoded[5], decoded[6], decoded[7]]);
    // The count is untrusted. Size the section in u64 so the multiplication
    // cannot wrap on targets where usize is 32 bits.
    let need = 8 + u64::from(n_summaries) * 20;
    if (decoded.len() as u64) < need {
        return Err(SegmentError::BadShape(format!(
            "BRIN sidecar: truncated body (need {need}B, have {}B)",
            decoded.len()
        )));
    }
    // `need <= decoded.len()`, so it fits in usize.
    let summaries_end = need as usize;

    let summaries: Vec<BrinSummary> = decoded[8..summaries_end]
        .chunks_exact(20)
        .map(|record| {
            let mut page_index = [0u8; 4];
            page_index.copy_from_slice(&record[..4]);
            let mut min_key = [0u8; 8];
            min_key.copy_from_slice(&record[4..12]);
            let mut max_key = [0u8; 8];
            max_key.copy_from_slice(&record[12..20]);
            BrinSummary {
                page_index: u32::from_le_bytes(page_index),
                min_key: u64::from_le_bytes(min_key),
                max_key: u64::from_le_bytes(max_key),
            }
        })
        .collect();

    // Drop the sidecar in place so the v1 bytes reuse the existing buffer
    // instead of being copied into a second allocation.
    drop(decoded.drain(..summaries_end));
    Ok((decoded, summaries))
}
impl<'a> SegmentReader<'a> {
    /// Opens a reader over borrowed v1 segment bytes.
    ///
    /// # Errors
    ///
    /// Returns [`SegmentError::BadShape`] when `bytes` is a v2 envelope
    /// (those must go through `OwnedSegment::from_bytes`), and otherwise any
    /// error produced while parsing the v1 metadata.
    pub fn open(bytes: &'a [u8]) -> Result<Self, SegmentError> {
        if bytes.starts_with(&SEGMENT_MAGIC_V2) {
            return Err(SegmentError::BadShape(String::from(
                "v2 envelope: SegmentReader requires the caller to first \
                 unwrap to v1 bytes via OwnedSegment::from_bytes; the \
                 borrowed-slice reader does not allocate.",
            )));
        }
        parse_segment_metadata(bytes).map(|metadata| Self { bytes, metadata })
    }

    /// Header-level metadata of the opened segment.
    #[must_use]
    pub fn meta(&self) -> &SegmentMeta {
        &self.metadata.meta
    }

    /// Range-plus-bloom pre-check; `false` means `lookup(key)` returns `None`.
    #[must_use]
    pub fn might_contain(&self, key: u64) -> bool {
        segment_might_contain(&self.metadata, key)
    }

    /// Returns a copy of the payload stored under `key`, if any.
    pub fn lookup(&self, key: u64) -> Option<Vec<u8>> {
        segment_lookup(&self.metadata, self.bytes, key)
    }

    /// Iterates over all rows in storage order.
    pub fn scan(&self) -> impl Iterator<Item = (u64, Vec<u8>)> + '_ {
        segment_scan(&self.metadata, self.bytes)
    }
}
/// A segment that owns its v1 bytes; built from either v1 bytes or a v2
/// envelope.
#[derive(Debug, Clone)]
pub struct OwnedSegment {
/// The v1 segment bytes (after any v2 envelope has been removed).
bytes: Vec<u8>,
/// Metadata parsed from `bytes` at construction.
metadata: SegmentMetadata,
/// BRIN summaries carried by the v2 envelope; empty when there were none.
brin_summaries: Vec<BrinSummary>,
}
impl OwnedSegment {
    /// Builds an owned segment from v1 bytes or from a v2 envelope.
    ///
    /// A v2 envelope is unwrapped first (decompressing and splitting off any
    /// BRIN sidecar); the resulting v1 bytes are then parsed and validated.
    ///
    /// # Errors
    ///
    /// Returns any error from unwrapping the envelope or parsing the v1
    /// metadata.
    pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, SegmentError> {
        let (v1_bytes, brin_summaries) = unwrap_v2_envelope(bytes)?;
        parse_segment_metadata(&v1_bytes).map(|metadata| Self {
            bytes: v1_bytes,
            metadata,
            brin_summaries,
        })
    }

    /// BRIN summaries that came with the segment; empty if none were stored.
    #[must_use]
    pub fn brin_summaries(&self) -> &[BrinSummary] {
        self.brin_summaries.as_slice()
    }

    /// Header-level metadata of the segment.
    #[must_use]
    pub fn meta(&self) -> &SegmentMeta {
        &self.metadata.meta
    }

    /// Range-plus-bloom pre-check; `false` means `lookup(key)` returns `None`.
    #[must_use]
    pub fn might_contain(&self, key: u64) -> bool {
        segment_might_contain(&self.metadata, key)
    }

    /// Returns a copy of the payload stored under `key`, if any.
    pub fn lookup(&self, key: u64) -> Option<Vec<u8>> {
        segment_lookup(&self.metadata, self.bytes.as_slice(), key)
    }

    /// Iterates over all rows in storage order.
    pub fn scan(&self) -> impl Iterator<Item = (u64, Vec<u8>)> + '_ {
        segment_scan(&self.metadata, self.bytes.as_slice())
    }

    /// The owned v1 segment bytes.
    #[must_use]
    pub fn bytes(&self) -> &[u8] {
        self.bytes.as_slice()
    }
}
/// Binary-searches one page for `key` and returns a copy of its payload.
///
/// Page layout: little-endian u32 row count, one u32 row offset per row
/// (from the start of the page), then rows of `key u64 | payload_len u32 |
/// payload`. Rows are expected in ascending key order.
///
/// Returns `None` when the key is absent or when any count, offset or length
/// read from the page points outside it. All offset arithmetic is checked, so
/// hostile values cannot wrap around.
fn decode_page_lookup(page: &[u8], key: u64) -> Option<Vec<u8>> {
    /// Bounds-checked little-endian u32 read.
    fn le_u32_at(page: &[u8], at: usize) -> Option<u32> {
        let src = page.get(at..at.checked_add(4)?)?;
        let mut raw = [0u8; 4];
        raw.copy_from_slice(src);
        Some(u32::from_le_bytes(raw))
    }
    /// Bounds-checked little-endian u64 read.
    fn le_u64_at(page: &[u8], at: usize) -> Option<u64> {
        let src = page.get(at..at.checked_add(8)?)?;
        let mut raw = [0u8; 8];
        raw.copy_from_slice(src);
        Some(u64::from_le_bytes(raw))
    }

    let num_rows = le_u32_at(page, 0)? as usize;
    let offsets_end = num_rows.checked_mul(4)?.checked_add(4)?;
    if page.len() < offsets_end {
        return None;
    }

    // Offsets are read on demand: a lookup touches O(log n) of them, so there
    // is no need to materialise the whole table.
    let mut lo = 0usize;
    let mut hi = num_rows;
    while lo < hi {
        let mid = usize::midpoint(lo, hi);
        // In bounds: `mid < num_rows` and the offset table was checked above.
        let row_off = le_u32_at(page, 4 + mid * 4)? as usize;
        let row_key = le_u64_at(page, row_off)?;
        match row_key.cmp(&key) {
            core::cmp::Ordering::Less => lo = mid + 1,
            core::cmp::Ordering::Greater => hi = mid,
            core::cmp::Ordering::Equal => {
                let plen = le_u32_at(page, row_off.checked_add(8)?)? as usize;
                let payload_start = row_off.checked_add(12)?;
                let payload_end = payload_start.checked_add(plen)?;
                return page.get(payload_start..payload_end).map(<[u8]>::to_vec);
            }
        }
    }
    None
}
/// Decodes every row of one page, in offset-table order.
///
/// Page layout: little-endian u32 row count, one u32 row offset per row
/// (from the start of the page), then rows of `key u64 | payload_len u32 |
/// payload`.
///
/// Returns an empty vector when the page is too short for its count prefix or
/// offset table. Decoding stops at the first row whose header or payload would
/// fall outside the page; rows decoded before that point are still returned.
/// All offset arithmetic is checked, so hostile values cannot wrap around.
fn decode_page_iter(page: &[u8]) -> Vec<(u64, Vec<u8>)> {
    /// Decodes the row starting at `row_off`, or `None` if it leaves the page.
    fn decode_row(page: &[u8], row_off: usize) -> Option<(u64, Vec<u8>)> {
        let payload_start = row_off.checked_add(12)?;
        let header = page.get(row_off..payload_start)?;
        let mut key = [0u8; 8];
        key.copy_from_slice(&header[..8]);
        let mut plen = [0u8; 4];
        plen.copy_from_slice(&header[8..]);
        let payload_end = payload_start.checked_add(u32::from_le_bytes(plen) as usize)?;
        let payload = page.get(payload_start..payload_end)?;
        Some((u64::from_le_bytes(key), payload.to_vec()))
    }

    if page.len() < 4 {
        return Vec::new();
    }
    let num_rows = u32::from_le_bytes([page[0], page[1], page[2], page[3]]) as usize;
    let Some(offsets_end) = num_rows.checked_mul(4).and_then(|n| n.checked_add(4)) else {
        return Vec::new();
    };
    if page.len() < offsets_end {
        return Vec::new();
    }

    // `num_rows` is bounded by the page length at this point, so the
    // reservation below cannot be inflated by a hostile count.
    let mut out = Vec::with_capacity(num_rows);
    out.extend(
        page[4..offsets_end]
            .chunks_exact(4)
            .map(|raw| u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize)
            .map_while(|row_off| decode_row(page, row_off)),
    );
    out
}
// Unit tests for the segment encoder, the v1 reader, the v2 envelope and the
// BRIN sidecar.
#[cfg(test)]
mod tests {
use super::*;
// Builds `n` rows with odd keys 1, 3, 5, ... and payloads "row-<i>", so every
// even key is guaranteed to be absent.
fn build_rows(n: u64) -> Vec<(u64, Vec<u8>)> {
(0..n)
.map(|i| {
let payload = format!("row-{i}").into_bytes();
(i * 2 + 1, payload) })
.collect()
}
// Derived summaries: one per page, every key covered, ranges strictly
// increasing and non-overlapping.
#[test]
fn brin_summaries_derive_matches_per_page_pk_ranges() {
let rows = build_rows(200);
let expected: Vec<u64> = rows.iter().map(|(k, _)| *k).collect();
let (v1_bytes, meta) =
encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).expect("encode");
let summaries = derive_brin_summaries(&v1_bytes).expect("derive");
assert_eq!(summaries.len(), meta.num_pages as usize);
for k in expected {
let hits = summaries
.iter()
.filter(|s| k >= s.min_key && k <= s.max_key)
.count();
assert!(hits >= 1, "key {k} not covered by any BRIN summary");
}
for w in summaries.windows(2) {
assert!(
w[0].max_key < w[1].min_key,
"summary ranges overlap: page {} max {} >= page {} min {}",
w[0].page_index,
w[0].max_key,
w[1].page_index,
w[1].min_key
);
}
}
// Summaries written into a compressed v2 envelope come back unchanged, and
// the wrapped segment still answers lookups.
#[test]
fn brin_sidecar_round_trips_through_v2_envelope() {
let rows = build_rows(150);
let (v1_bytes, _) =
encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).expect("encode");
let summaries = derive_brin_summaries(&v1_bytes).expect("derive");
assert!(!summaries.is_empty());
let wrapped = wrap_v2_envelope_with_brin(v1_bytes, &summaries, true);
let seg = OwnedSegment::from_bytes(wrapped).expect("v2+brin parses");
assert!(seg.lookup(1).is_some(), "lookup hits a known key");
assert!(seg.lookup(299).is_some(), "lookup hits another known key");
let recovered = seg.brin_summaries();
assert_eq!(recovered.len(), summaries.len());
for (a, b) in summaries.iter().zip(recovered) {
assert_eq!(a, b);
}
}
// Plain v1 bytes and a v2 envelope without a sidecar both report no summaries.
#[test]
fn segment_without_brin_sidecar_returns_empty_summaries() {
let rows = build_rows(50);
let (v1_bytes, _) =
encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).expect("encode");
let seg1 = OwnedSegment::from_bytes(v1_bytes.clone()).expect("v1 parses");
assert!(seg1.brin_summaries().is_empty());
let wrapped = wrap_v2_envelope(v1_bytes, true);
let seg2 = OwnedSegment::from_bytes(wrapped).expect("v2 parses");
assert!(seg2.brin_summaries().is_empty());
}
// A compressed envelope is smaller than its v1 input and parses back to a
// segment with the same row count and keys.
#[test]
fn v2_envelope_round_trips_byte_equal() {
let rows = build_rows(1000);
let (v1_bytes, _) =
encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).expect("encode");
let wrapped = wrap_v2_envelope(v1_bytes.clone(), true);
assert!(
wrapped.len() < v1_bytes.len(),
"v2 envelope should be smaller: {} vs v1 {}",
wrapped.len(),
v1_bytes.len()
);
let seg = OwnedSegment::from_bytes(wrapped).expect("v2 unwrap + parse");
assert_eq!(seg.meta().num_rows, 1000);
assert!(seg.lookup(1).is_some());
assert!(seg.lookup(1999).is_some());
}
// With `compress = false` the wrapper returns its input unchanged.
#[test]
fn v2_envelope_with_compress_false_is_v1_passthrough() {
let rows = build_rows(64);
let (v1_bytes, _) =
encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).expect("encode");
let wrapped = wrap_v2_envelope(v1_bytes.clone(), false);
assert_eq!(wrapped, v1_bytes);
}
// Bytes that start with the v1 magic load through `OwnedSegment::from_bytes`.
#[test]
fn legacy_v1_segments_still_load_via_from_bytes() {
let rows = build_rows(100);
let (v1_bytes, _) =
encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).expect("encode");
assert_eq!(&v1_bytes[..8], &SEGMENT_MAGIC);
let seg = OwnedSegment::from_bytes(v1_bytes).expect("v1 still parses");
assert_eq!(seg.meta().num_rows, 100);
}
// A v2 header with algorithm byte 0x42 (and a zero inner length) is rejected
// as an unknown compression algorithm.
#[test]
fn v2_envelope_invalid_algo_byte_errors_loudly() {
let mut bogus = Vec::new();
bogus.extend_from_slice(&SEGMENT_MAGIC_V2);
bogus.push(0x42); bogus.extend_from_slice(&0u32.to_le_bytes());
let err = OwnedSegment::from_bytes(bogus).unwrap_err();
assert!(matches!(err, SegmentError::UnknownCompressionAlgo(0x42)));
}
// Metadata returned by the encoder matches what the reader parses back.
#[test]
fn encode_then_open_roundtrips_meta() {
let rows = build_rows(1000);
let (bytes, meta) =
encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).expect("encode succeeds");
let reader = SegmentReader::open(&bytes).expect("open succeeds");
assert_eq!(reader.meta().num_rows, meta.num_rows);
assert_eq!(reader.meta().num_pages, meta.num_pages);
assert_eq!(reader.meta().min_pk, 1);
assert_eq!(reader.meta().max_pk, 1999);
assert_eq!(reader.meta().total_bytes, bytes.len());
}
// Every encoded key is found and returns its original payload.
#[test]
fn lookup_finds_every_inserted_key() {
let rows = build_rows(1000);
let expected: Vec<_> = rows.clone();
let (bytes, _) =
encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).expect("encode succeeds");
let reader = SegmentReader::open(&bytes).expect("open succeeds");
for (key, payload) in expected {
assert_eq!(
reader.lookup(key),
Some(payload),
"lookup({key}) returned wrong payload"
);
}
}
// Even (gap) keys and out-of-range keys are never found.
#[test]
fn lookup_returns_none_for_unknown_key() {
let rows = build_rows(1000);
let (bytes, _) =
encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).expect("encode succeeds");
let reader = SegmentReader::open(&bytes).expect("open succeeds");
for k in (0..2000u64).step_by(2) {
assert!(reader.lookup(k).is_none(), "expected None for gap key {k}");
}
assert!(reader.lookup(99_999).is_none());
assert!(reader.lookup(0).is_none());
}
// Keys outside [min_pk, max_pk] are rejected; the boundary keys are accepted.
#[test]
fn might_contain_short_circuits_out_of_range() {
let rows = build_rows(100);
let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
let reader = SegmentReader::open(&bytes).unwrap();
assert!(!reader.might_contain(0));
assert!(!reader.might_contain(200));
assert!(reader.might_contain(1));
assert!(reader.might_contain(199));
}
// A scan returns all rows, in ascending key order, equal to the input.
#[test]
fn scan_yields_rows_in_key_order() {
let rows = build_rows(500);
let expected: Vec<_> = rows.clone();
let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
let reader = SegmentReader::open(&bytes).unwrap();
let scanned: Vec<_> = reader.scan().collect();
assert_eq!(scanned.len(), 500);
for w in scanned.windows(2) {
assert!(
w[0].0 < w[1].0,
"scan out of order: {} >= {}",
w[0].0,
w[1].0
);
}
assert_eq!(scanned, expected);
}
// Flipping the first magic byte makes `open` fail with BadMagic.
#[test]
fn open_rejects_bad_magic() {
let rows = build_rows(10);
let (mut bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
bytes[0] ^= 0xff;
match SegmentReader::open(&bytes) {
Err(SegmentError::BadMagic { .. }) => {}
other => panic!("expected BadMagic, got {other:?}"),
}
}
// Flipping one bit in the middle of the segment makes `open` fail with BadCrc.
#[test]
fn open_rejects_bad_crc() {
let rows = build_rows(10);
let (mut bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
let off = bytes.len() / 2;
bytes[off] ^= 0x01;
match SegmentReader::open(&bytes) {
Err(SegmentError::BadCrc { .. }) => {}
other => panic!("expected BadCrc, got {other:?}"),
}
}
// A descending key pair is reported as UnsortedKey with both keys.
#[test]
fn encode_rejects_unsorted_keys() {
let rows = vec![(10u64, vec![1]), (5u64, vec![2])];
match encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES) {
Err(SegmentError::UnsortedKey { prev: 10, next: 5 }) => {}
other => panic!("expected UnsortedKey, got {other:?}"),
}
}
// Encoding zero rows is a BadShape error.
#[test]
fn encode_rejects_empty_input() {
let rows: Vec<(u64, Vec<u8>)> = vec![];
match encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES) {
Err(SegmentError::BadShape(_)) => {}
other => panic!("expected BadShape for empty input, got {other:?}"),
}
}
// Page sizes below 256 or above 65536 are BadShape errors.
#[test]
fn encode_rejects_bad_page_size() {
let rows = build_rows(1);
match encode_segment(rows.clone().into_iter(), 0.01, 128) {
Err(SegmentError::BadShape(_)) => {}
other => panic!("expected BadShape for tiny page, got {other:?}"),
}
match encode_segment(rows.into_iter(), 0.01, 1_000_000) {
Err(SegmentError::BadShape(_)) => {}
other => panic!("expected BadShape for huge page, got {other:?}"),
}
}
// A single row larger than one page is a BadShape error.
#[test]
fn large_payload_spanning_one_page_each_is_rejected_if_too_big() {
let rows = vec![(1u64, vec![0u8; 8192])];
match encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES) {
Err(SegmentError::BadShape(_)) => {}
other => panic!("expected BadShape for too-large row, got {other:?}"),
}
}
// OwnedSegment and SegmentReader agree on metadata, lookups and scan output.
// Reader results are collected in an inner scope so the borrow of `bytes`
// ends before the bytes are moved into the owned segment.
#[test]
fn owned_segment_lookup_matches_reader_for_every_key() {
let rows = build_rows(500);
let expected: Vec<_> = rows.clone();
let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
let bytes_len = bytes.len();
let (r_meta_num_rows, r_meta_min_pk, r_meta_max_pk, r_lookups, r_scan) = {
let reader = SegmentReader::open(&bytes).unwrap();
let lookups: Vec<_> = expected.iter().map(|(k, _)| reader.lookup(*k)).collect();
let scan: Vec<_> = reader.scan().collect();
(
reader.meta().num_rows,
reader.meta().min_pk,
reader.meta().max_pk,
lookups,
scan,
)
};
let owned = OwnedSegment::from_bytes(bytes).unwrap();
for ((key, expected_payload), reader_payload) in expected.iter().zip(r_lookups.iter()) {
assert_eq!(reader_payload.as_ref(), Some(expected_payload));
assert_eq!(owned.lookup(*key).as_ref(), Some(expected_payload));
}
assert_eq!(r_meta_num_rows, owned.meta().num_rows);
assert_eq!(r_meta_min_pk, owned.meta().min_pk);
assert_eq!(r_meta_max_pk, owned.meta().max_pk);
let o_scan: Vec<_> = owned.scan().collect();
assert_eq!(r_scan, o_scan);
assert_eq!(owned.bytes().len(), bytes_len);
}
// `might_contain` gives the same answer through both reader types.
#[test]
fn owned_segment_might_contain_matches_reader() {
let rows = build_rows(64);
let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
let probes = [0u64, 1, 50, 127, 128, 200];
let reader_results: Vec<bool> = {
let reader = SegmentReader::open(&bytes).unwrap();
probes.iter().map(|k| reader.might_contain(*k)).collect()
};
let owned = OwnedSegment::from_bytes(bytes).unwrap();
for (key, r_hit) in probes.iter().zip(reader_results.iter()) {
assert_eq!(*r_hit, owned.might_contain(*key));
}
}
// A corrupted magic byte is rejected when the owned segment is constructed.
#[test]
fn owned_segment_rejects_bad_bytes_at_construction() {
let rows = build_rows(8);
let (mut bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
bytes[0] ^= 0xff; match OwnedSegment::from_bytes(bytes) {
Err(SegmentError::BadMagic { .. }) => {}
other => panic!("expected BadMagic, got {other:?}"),
}
}
// Keys that were never inserted (even keys and out-of-range keys) return None.
#[test]
fn owned_segment_lookup_returns_none_for_missing_key() {
let rows = build_rows(100); let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
let owned = OwnedSegment::from_bytes(bytes).unwrap();
for key in [0u64, 2, 50, 198, 200, 9999] {
assert!(
owned.lookup(key).is_none(),
"expected None for non-inserted key {key}, got Some"
);
}
}
}