#![cfg_attr(
not(test),
expect(
dead_code,
reason = "cursor API consumed by the columnar block-skip scan"
)
)]
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use super::BlockOffset;
/// Statistics recorded for one column of one block in a zone map.
///
/// `encode_zone_map` writes the fields in declaration order (integers
/// little-endian, `min` and `max` each preceded by a `u32` byte length) and
/// `ZoneMap::decode` reads them back in the same order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnStats {
/// Identifier of the column these statistics describe.
pub column_id: u32,
/// One-byte type tag. The set of valid tags is not defined in this file.
pub type_tag: u8,
/// One-byte codec identifier. The set of valid codecs is not defined in this file.
pub codec_id: u8,
/// Null count; presumably the number of null values of this column in the
/// block (TODO confirm against the writer).
pub null_count: u32,
/// Row count; presumably the number of rows of this column in the block
/// (TODO confirm against the writer).
pub row_count: u32,
/// Raw bytes of the minimum value. The value encoding is not visible here;
/// presumably it depends on `type_tag` (TODO confirm).
pub min: Vec<u8>,
/// Raw bytes of the maximum value, with the same caveat as `min`.
pub max: Vec<u8>,
}
/// Appends the serialized zone map for `blocks` to `out`.
///
/// Layout (every integer little-endian):
///
/// - `u32` number of blocks;
/// - per block: `u64` block offset, `u16` number of columns;
/// - per column: `u32` column id, `u8` type tag, `u8` codec id,
///   `u32` null count, `u32` row count, `u32` length of `min` followed by the
///   `min` bytes, `u32` length of `max` followed by the `max` bytes.
///
/// Block offsets are written as given. This function does not check that they
/// are strictly ascending, although `ZoneMap::decode` rejects input where they
/// are not.
///
/// # Errors
///
/// Returns `Error::InvalidHeader("ZoneMap")` when the number of blocks does not
/// fit in a `u32`, a block has more columns than fit in a `u16`, or a `min` /
/// `max` value is longer than `u32::MAX` bytes. On error `out` is truncated
/// back to the length it had on entry, so no partial zone map is left behind.
pub fn encode_zone_map(
    out: &mut Vec<u8>,
    blocks: &[(BlockOffset, Vec<ColumnStats>)],
) -> crate::Result<()> {
    // Writes the whole map; may fail after some bytes were already appended.
    fn write_all(
        out: &mut Vec<u8>,
        blocks: &[(BlockOffset, Vec<ColumnStats>)],
    ) -> crate::Result<()> {
        const ERR: crate::Error = crate::Error::InvalidHeader("ZoneMap");

        let count = u32::try_from(blocks.len()).map_err(|_| ERR)?;
        out.extend_from_slice(&count.to_le_bytes());
        for (offset, columns) in blocks {
            // Validate the column count before emitting anything for this block.
            let n_columns = u16::try_from(columns.len()).map_err(|_| ERR)?;
            out.extend_from_slice(&offset.0.to_le_bytes());
            out.extend_from_slice(&n_columns.to_le_bytes());
            for c in columns {
                let min_len = u32::try_from(c.min.len()).map_err(|_| ERR)?;
                let max_len = u32::try_from(c.max.len()).map_err(|_| ERR)?;
                out.extend_from_slice(&c.column_id.to_le_bytes());
                out.push(c.type_tag);
                out.push(c.codec_id);
                out.extend_from_slice(&c.null_count.to_le_bytes());
                out.extend_from_slice(&c.row_count.to_le_bytes());
                out.extend_from_slice(&min_len.to_le_bytes());
                out.extend_from_slice(&c.min);
                out.extend_from_slice(&max_len.to_le_bytes());
                out.extend_from_slice(&c.max);
            }
        }
        Ok(())
    }

    let start = out.len();
    let result = write_all(out, blocks);
    if result.is_err() {
        // Roll back the partially written map so the caller's buffer is unchanged.
        out.truncate(start);
    }
    result
}
/// Decoded zone map: the column statistics of each block, keyed by block offset.
///
/// `decode` only accepts strictly ascending offsets, so a decoded map holds
/// its entries sorted by offset; the binary-search lookups rely on that order.
#[derive(Debug, Default, Clone)]
pub struct ZoneMap {
// `(block offset, statistics of that block's columns)` pairs.
entries: Vec<(u64, Vec<ColumnStats>)>,
}
/// Smallest encoded size of one block entry in bytes: the 8-byte offset plus
/// the 2-byte column count (an entry with zero columns). `ZoneMap::decode`
/// uses it to reject a declared block count the remaining input cannot hold
/// before reserving memory for the entries.
const MIN_ENTRY_SIZE: usize = 8 + 2;
impl ZoneMap {
pub fn decode(bytes: &[u8]) -> crate::Result<Self> {
const ERR: crate::Error = crate::Error::InvalidHeader("ZoneMap");
fn take<'a>(r: &mut &'a [u8], n: usize) -> Option<&'a [u8]> {
if r.len() < n {
return None;
}
let (head, tail) = r.split_at(n);
*r = tail;
Some(head)
}
fn read_u8(r: &mut &[u8]) -> Option<u8> {
take(r, 1)?.first().copied()
}
fn read_u16(r: &mut &[u8]) -> Option<u16> {
let b: [u8; 2] = take(r, 2)?.try_into().ok()?;
Some(u16::from_le_bytes(b))
}
fn read_u32(r: &mut &[u8]) -> Option<u32> {
let b: [u8; 4] = take(r, 4)?.try_into().ok()?;
Some(u32::from_le_bytes(b))
}
fn read_u64(r: &mut &[u8]) -> Option<u64> {
let b: [u8; 8] = take(r, 8)?.try_into().ok()?;
Some(u64::from_le_bytes(b))
}
let mut r = bytes;
let count = read_u32(&mut r).ok_or(ERR)?;
match (count as usize).checked_mul(MIN_ENTRY_SIZE) {
Some(needed) if needed <= r.len() => {}
_ => return Err(ERR),
}
let mut entries: Vec<(u64, Vec<ColumnStats>)> = Vec::with_capacity(count as usize);
let mut prev: Option<u64> = None;
for _ in 0..count {
let offset = read_u64(&mut r).ok_or(ERR)?;
if prev.is_some_and(|p| offset <= p) {
return Err(ERR);
}
prev = Some(offset);
let n_columns = read_u16(&mut r).ok_or(ERR)?;
let mut columns = Vec::with_capacity(n_columns as usize);
for _ in 0..n_columns {
let column_id = read_u32(&mut r).ok_or(ERR)?;
let type_tag = read_u8(&mut r).ok_or(ERR)?;
let codec_id = read_u8(&mut r).ok_or(ERR)?;
let null_count = read_u32(&mut r).ok_or(ERR)?;
let row_count = read_u32(&mut r).ok_or(ERR)?;
let min_len = read_u32(&mut r).ok_or(ERR)? as usize;
let min = take(&mut r, min_len).ok_or(ERR)?.to_vec();
let max_len = read_u32(&mut r).ok_or(ERR)? as usize;
let max = take(&mut r, max_len).ok_or(ERR)?.to_vec();
columns.push(ColumnStats {
column_id,
type_tag,
codec_id,
null_count,
row_count,
min,
max,
});
}
entries.push((offset, columns));
}
if !r.is_empty() {
return Err(ERR);
}
Ok(Self { entries })
}
#[must_use]
pub fn columns_for(&self, offset: u64) -> Option<&[ColumnStats]> {
let idx = self
.entries
.binary_search_by_key(&offset, |(o, _)| *o)
.ok()?;
self.entries.get(idx).map(|(_, c)| c.as_slice())
}
#[must_use]
pub fn cursor(&self) -> ZoneMapCursor<'_> {
ZoneMapCursor {
entries: &self.entries,
pos: 0,
}
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
#[must_use]
pub fn len(&self) -> usize {
self.entries.len()
}
}
/// Stateful lookup cursor over the entries of a `ZoneMap`, created by
/// `ZoneMap::cursor`.
///
/// It remembers where the previous lookup stopped, so a lookup at an offset at
/// or beyond the current entry walks forward from there instead of searching
/// the whole map again.
pub struct ZoneMapCursor<'a> {
// Borrowed `(block offset, column statistics)` entries of the originating map.
entries: &'a [(u64, Vec<ColumnStats>)],
// Index into `entries` left by the previous lookup; may equal `entries.len()`.
pos: usize,
}
impl<'a> ZoneMapCursor<'a> {
    /// Looks up the column statistics of the block stored at exactly `offset`.
    ///
    /// When the entry under the cursor has an offset less than or equal to
    /// `offset`, the cursor steps forward entry by entry until it reaches the
    /// first entry whose offset is not below `offset`. Otherwise (the cursor is
    /// past the end, or its entry lies beyond `offset`) the cursor is
    /// repositioned with a binary search over all entries.
    ///
    /// Returns `None` when no entry has that offset; the cursor is then left at
    /// the position where such an entry would be.
    pub fn columns_for(&mut self, offset: u64) -> Option<&'a [ColumnStats]> {
        let entries = self.entries;

        match entries.get(self.pos) {
            Some((current, _)) if *current <= offset => {
                // Forward walk: skip every entry strictly below the target.
                let behind = entries
                    .iter()
                    .skip(self.pos)
                    .take_while(|(o, _)| *o < offset)
                    .count();
                self.pos += behind;
            }
            _ => {
                // Backward seek or exhausted cursor: search the whole map.
                let located = entries.binary_search_by_key(&offset, |(o, _)| *o);
                self.pos = match located {
                    Ok(i) | Err(i) => i,
                };
                if located.is_err() {
                    return None;
                }
            }
        }

        match entries.get(self.pos) {
            Some((o, columns)) if *o == offset => Some(columns.as_slice()),
            _ => None,
        }
    }
}
#[cfg(test)]
#[expect(clippy::expect_used, reason = "test code")]
mod tests;