use crate::{Error, Result, Slice, ValueType, key::InternalKey, value::InternalValue};
use alloc::vec::Vec;
/// Physical cell layout of a column.
///
/// On the wire a tag is a `(tag, width)` byte pair: `Fixed(w)` is written as
/// `(0, w)` and `Bytes` as `(1, 0)`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum TypeTag {
/// Fixed-width cells; the payload is the width of one cell in bytes.
/// A width of zero is rejected when a column is validated or decoded.
Fixed(u8),
/// Variable-length cells: a table of `row_count + 1` little-endian `u32`
/// offsets followed by the concatenated cell bytes.
Bytes,
}
impl TypeTag {
const fn to_wire(self) -> (u8, u8) {
match self {
Self::Fixed(width) => (0, width),
Self::Bytes => (1, 0),
}
}
fn from_wire(tag: u8, width: u8) -> Result<Self> {
match tag {
0 => {
if width == 0 {
return Err(Error::InvalidHeader("columnar: fixed column width is zero"));
}
Ok(Self::Fixed(width))
}
1 => {
if width != 0 {
return Err(Error::InvalidHeader(
"columnar: bytes column width must be zero",
));
}
Ok(Self::Bytes)
}
_ => Err(Error::InvalidTag(("ColumnTypeTag", tag))),
}
}
}
/// Per-column payload codec, stored as one byte in each column header.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum CodecId {
/// Column bytes are stored unchanged (wire id `0`).
Plain,
/// Little-endian `u64` cells stored as wrapping differences from the previous
/// cell (wire id `1`); only accepted for `Fixed(8)` columns.
Delta,
}
/// Maps a codec to the byte written in the column header.
impl From<CodecId> for u8 {
    fn from(codec: CodecId) -> Self {
        // Kept as an exhaustive match so a new codec forces a wire id choice.
        match codec {
            CodecId::Delta => 1,
            CodecId::Plain => 0,
        }
    }
}
impl TryFrom<u8> for CodecId {
type Error = Error;
fn try_from(v: u8) -> Result<Self> {
match v {
0 => Ok(Self::Plain),
1 => Ok(Self::Delta),
_ => Err(Error::InvalidTag(("ColumnCodecId", v))),
}
}
}
const fn auto_codec(column_id: u16, type_tag: TypeTag) -> Option<CodecId> {
match (column_id, type_tag) {
(COL_SEQNO, TypeTag::Fixed(8)) => Some(CodecId::Delta),
_ => None,
}
}
/// Interprets up to the first eight bytes of `chunk` as a little-endian `u64`.
///
/// A chunk shorter than eight bytes is treated as zero-extended; bytes past
/// the eighth are ignored.
fn read_u64_le(chunk: &[u8]) -> u64 {
    chunk
        .iter()
        .take(8)
        .enumerate()
        // Byte `pos` contributes bits `8 * pos ..`; `pos` is at most 7, so the
        // shift never exceeds 56.
        .fold(0u64, |value, (pos, &byte)| value | (u64::from(byte) << (8 * pos)))
}
fn delta_encode_u64(data: &[u8]) -> Vec<u8> {
debug_assert!(
data.len().is_multiple_of(8),
"delta_encode_u64: input length is not a multiple of 8"
);
let mut out = Vec::with_capacity(data.len());
let mut prev = 0u64;
for chunk in data.chunks_exact(8) {
let v = read_u64_le(chunk);
out.extend_from_slice(&v.wrapping_sub(prev).to_le_bytes());
prev = v;
}
out
}
fn delta_decode_u64(data: &[u8]) -> Result<Vec<u8>> {
if !data.len().is_multiple_of(8) {
return Err(Error::InvalidHeader(
"columnar: delta column length is not a multiple of 8",
));
}
let mut out = Vec::with_capacity(data.len());
let mut acc = 0u64;
for chunk in data.chunks_exact(8) {
acc = acc.wrapping_add(read_u64_le(chunk));
out.extend_from_slice(&acc.to_le_bytes());
}
Ok(out)
}
fn codec_encode(codec: CodecId, type_tag: TypeTag, data: &[u8]) -> Result<Vec<u8>> {
match codec {
CodecId::Plain => Ok(data.to_vec()),
CodecId::Delta if matches!(type_tag, TypeTag::Fixed(8)) => Ok(delta_encode_u64(data)),
CodecId::Delta => Err(Error::InvalidHeader(
"columnar: delta codec requires a fixed-8 column",
)),
}
}
fn codec_decode(codec: CodecId, type_tag: TypeTag, data: &[u8]) -> Result<Vec<u8>> {
match codec {
CodecId::Plain => Ok(data.to_vec()),
CodecId::Delta if matches!(type_tag, TypeTag::Fixed(8)) => delta_decode_u64(data),
CodecId::Delta => Err(Error::InvalidHeader(
"columnar: delta codec requires a fixed-8 column",
)),
}
}
/// One decoded column of a [`ColumnBatch`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Column {
/// Identifier of the column (see the `COL_*` constants for the intrinsic ids).
pub column_id: u16,
/// Cell layout of `data`.
pub type_tag: TypeTag,
/// Optional presence bitmap, one bit per row (row `r` is bit `r % 8` of byte
/// `r / 8`); a clear bit marks the cell as null. `None` means every row is
/// present.
pub validity: Option<Vec<u8>>,
/// Raw (codec-decoded) cell bytes: `row_count * width` bytes for a fixed
/// column, or an offset table followed by the payload for a bytes column.
pub data: Vec<u8>,
}
/// A set of equally long columns describing `row_count` rows.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ColumnBatch {
/// Number of rows every column must hold.
pub row_count: u32,
/// The columns, in wire order.
pub columns: Vec<Column>,
}
/// Number of bytes a validity bitmap needs to hold one bit per row, i.e.
/// `ceil(row_count / 8)`.
const fn validity_len(row_count: u32) -> usize {
    let rows = row_count as usize;
    rows.div_ceil(8)
}
fn check_validity(v: &[u8], row_count: u32) -> Result<()> {
if v.len() != validity_len(row_count) {
return Err(Error::InvalidHeader(
"columnar: validity bitmap length is not ceil(row_count / 8)",
));
}
let used = row_count % 8;
if used != 0 {
let last = v.last().copied().unwrap_or(0);
let valid_mask = (1u8 << used) - 1;
if last & !valid_mask != 0 {
return Err(Error::InvalidHeader(
"columnar: validity padding bits above row_count must be zero",
));
}
}
Ok(())
}
fn check_bytes_framing(data: &[u8], row_count: u32) -> Result<()> {
let off_count = (row_count as usize)
.checked_add(1)
.ok_or(Error::InvalidHeader(
"columnar: bytes offset count overflow",
))?;
let off_bytes = off_count.checked_mul(4).ok_or(Error::InvalidHeader(
"columnar: bytes offset table overflow",
))?;
let table = data.get(..off_bytes).ok_or(Error::InvalidHeader(
"columnar: bytes column shorter than its offset table",
))?;
let payload_len = data.len() - off_bytes;
let mut prev = 0usize;
for (i, chunk) in table.chunks_exact(4).enumerate() {
let off = u32::from_le_bytes(
chunk
.try_into()
.map_err(|_| Error::InvalidHeader("columnar: short bytes offset"))?,
) as usize;
if i == 0 && off != 0 {
return Err(Error::InvalidHeader(
"columnar: first bytes offset must be zero",
));
}
if off < prev {
return Err(Error::InvalidHeader(
"columnar: bytes offsets must be non-decreasing",
));
}
if off > payload_len {
return Err(Error::InvalidHeader(
"columnar: bytes offset past payload end",
));
}
prev = off;
}
if prev != payload_len {
return Err(Error::InvalidHeader(
"columnar: final bytes offset must equal the payload length",
));
}
Ok(())
}
impl Column {
    /// Checks that the column's data and validity bitmap are consistent with
    /// `row_count` rows.
    ///
    /// # Errors
    ///
    /// `InvalidHeader` when a fixed column has width zero or a byte length
    /// other than `row_count * width`, when a bytes column is mis-framed, or
    /// when the validity bitmap is malformed.
    fn validate(&self, row_count: u32) -> Result<()> {
        match self.type_tag {
            TypeTag::Bytes => check_bytes_framing(&self.data, row_count)?,
            TypeTag::Fixed(0) => {
                return Err(Error::InvalidHeader("columnar: fixed column width is zero"));
            }
            TypeTag::Fixed(width) => {
                let Some(expected) = (row_count as usize).checked_mul(usize::from(width)) else {
                    return Err(Error::InvalidHeader(
                        "columnar: fixed column length overflow",
                    ));
                };
                if self.data.len() != expected {
                    return Err(Error::InvalidHeader(
                        "columnar: fixed column byte length is not row_count * width",
                    ));
                }
            }
        }
        // The data layout is checked before the bitmap so the data error wins
        // when both are wrong.
        match self.validity.as_deref() {
            Some(bitmap) => check_validity(bitmap, row_count),
            None => Ok(()),
        }
    }
}
impl ColumnBatch {
/// Returns the user key of the first row, or `None` for an empty batch.
///
/// # Errors
///
/// `InvalidHeader` when the first column is not the non-nullable bytes
/// user-key column, or when that column fails validation.
pub(crate) fn first_user_key(&self) -> Result<Option<&[u8]>> {
    if self.row_count == 0 {
        return Ok(None);
    }
    let key_col = match self.columns.first() {
        Some(col) if col.column_id == COL_USER_KEY => col,
        _ => {
            return Err(Error::InvalidHeader(
                "columnar: first column is not the user-key column",
            ));
        }
    };
    let is_plain_key = matches!(key_col.type_tag, TypeTag::Bytes) && key_col.validity.is_none();
    if !is_plain_key {
        return Err(Error::InvalidHeader(
            "columnar: first column is not the non-null user-key column",
        ));
    }
    key_col.validate(self.row_count)?;
    let key = bytes_column_row(&key_col.data, self.row_count, 0)?;
    Ok(Some(key))
}
/// Returns the user key of the last row, or `None` for an empty batch.
///
/// # Errors
///
/// `InvalidHeader` when the first column is not the non-nullable bytes
/// user-key column, or when that column fails validation.
pub(crate) fn last_user_key(&self) -> Result<Option<&[u8]>> {
    let last_row = match self.row_count.checked_sub(1) {
        Some(row) => row,
        None => return Ok(None),
    };
    let key_col = match self.columns.first() {
        Some(col) if col.column_id == COL_USER_KEY => col,
        _ => {
            return Err(Error::InvalidHeader(
                "columnar: first column is not the user-key column",
            ));
        }
    };
    let is_plain_key = matches!(key_col.type_tag, TypeTag::Bytes) && key_col.validity.is_none();
    if !is_plain_key {
        return Err(Error::InvalidHeader(
            "columnar: first column is not the non-null user-key column",
        ));
    }
    key_col.validate(self.row_count)?;
    let key = bytes_column_row(&key_col.data, self.row_count, last_row)?;
    Ok(Some(key))
}
/// Reports whether both batches have the same number of columns with
/// pairwise-equal column ids and type tags (row data is not compared).
#[must_use]
pub(crate) fn same_layout(&self, other: &Self) -> bool {
    if self.columns.len() != other.columns.len() {
        return false;
    }
    self.columns
        .iter()
        .zip(&other.columns)
        .all(|(lhs, rhs)| (lhs.column_id, lhs.type_tag) == (rhs.column_id, rhs.type_tag))
}
/// Total number of bytes held by all columns: each column's data plus its
/// validity bitmap, when it has one.
#[must_use]
pub(crate) fn data_size(&self) -> usize {
    let mut total = 0usize;
    for col in &self.columns {
        total += col.data.len();
        if let Some(bitmap) = &col.validity {
            total += bitmap.len();
        }
    }
    total
}
/// Appends every row of `other` after the rows of `self`.
///
/// Both batches must have the same number of columns with pairwise-equal
/// column ids and type tags. All columns of both batches are validated and
/// the merged columns are fully built before `self` is modified, so `self`
/// is left untouched when an error is returned.
///
/// # Errors
///
/// `InvalidHeader` on a layout mismatch, when the combined row count does
/// not fit in `u32`, when any column fails validation, or when a merged
/// bytes column cannot be re-encoded.
pub(crate) fn append(&mut self, other: &Self) -> Result<()> {
if self.columns.len() != other.columns.len() {
return Err(Error::InvalidHeader(
"columnar: append column-count mismatch",
));
}
let old_rows = self.row_count;
let combined_rows = old_rows
.checked_add(other.row_count)
.ok_or(Error::InvalidHeader(
"columnar: appended row count exceeds u32",
))?;
// Pass 1: check layout compatibility and validate both sides up front.
for (a, b) in self.columns.iter().zip(&other.columns) {
if a.column_id != b.column_id || a.type_tag != b.type_tag {
return Err(Error::InvalidHeader(
"columnar: append column layout mismatch",
));
}
a.validate(old_rows)?;
b.validate(other.row_count)?;
}
// Pass 2: build the merged data/validity for every column without touching `self`.
let mut merged: Vec<(Vec<u8>, Option<Vec<u8>>)> = Vec::with_capacity(self.columns.len());
for (a, b) in self.columns.iter().zip(&other.columns) {
// If only one side carries a bitmap, the other side's rows count as present.
let new_validity = combine_validity(
a.validity.as_deref(),
old_rows,
b.validity.as_deref(),
other.row_count,
)?;
let new_data = match a.type_tag {
// Fixed-width cells can simply be concatenated.
TypeTag::Fixed(_) => {
let mut data = Vec::with_capacity(a.data.len() + b.data.len());
data.extend_from_slice(&a.data);
data.extend_from_slice(&b.data);
data
}
// Bytes columns need a fresh offset table, so the cells are collected
// and re-encoded as one column.
TypeTag::Bytes => {
let mut cells: Vec<&[u8]> = Vec::with_capacity(combined_rows as usize);
for i in 0..old_rows {
cells.push(bytes_column_row(&a.data, old_rows, i)?);
}
for j in 0..other.row_count {
cells.push(bytes_column_row(&b.data, other.row_count, j)?);
}
encode_bytes_column(&cells)?
}
};
merged.push((new_data, new_validity));
}
// Pass 3: commit; nothing below can fail.
for (col, (data, validity)) in self.columns.iter_mut().zip(merged) {
col.data = data;
col.validity = validity;
}
self.row_count = combined_rows;
Ok(())
}
#[expect(
clippy::cast_possible_truncation,
reason = "column count and per-column chunk length are bounded by the block size policy, far below u32::MAX"
)]
pub fn encode(&self, codec: CodecId) -> Result<Vec<u8>> {
let mut out = Vec::new();
out.extend_from_slice(&self.row_count.to_le_bytes());
out.extend_from_slice(&(self.columns.len() as u32).to_le_bytes());
for col in &self.columns {
col.validate(self.row_count)?;
let col_codec = auto_codec(col.column_id, col.type_tag).unwrap_or(codec);
let encoded = codec_encode(col_codec, col.type_tag, &col.data)?;
let (type_tag, width) = col.type_tag.to_wire();
out.extend_from_slice(&col.column_id.to_le_bytes());
out.push(type_tag);
out.push(width);
out.push(col_codec.into());
out.push(u8::from(col.validity.is_some()));
out.extend_from_slice(&(encoded.len() as u32).to_le_bytes());
if let Some(v) = &col.validity {
out.extend_from_slice(v);
}
out.extend_from_slice(&encoded);
}
Ok(out)
}
/// Decodes a serialized block into a batch containing every column.
///
/// # Errors
///
/// Propagates any error from `decode_inner` (malformed header, unknown tag or
/// codec, truncated payload, invalid column, trailing bytes).
pub fn decode(bytes: &[u8]) -> Result<Self> {
Self::decode_inner(bytes, None)
}
/// Decodes a serialized block, keeping only the columns whose id appears in
/// `wanted`.
///
/// Columns that are not wanted are skipped over: their headers are parsed and
/// their bytes consumed, but the data is neither codec-decoded nor validated.
/// The returned batch keeps the block's `row_count`.
///
/// # Errors
///
/// Propagates any error from `decode_inner`.
pub fn decode_projected(bytes: &[u8], wanted: &[u16]) -> Result<Self> {
Self::decode_inner(bytes, Some(wanted))
}
/// Shared decoder behind `decode` and `decode_projected`.
///
/// `wanted == None` keeps every column; `Some(ids)` keeps only columns whose
/// id is listed. The whole input must be consumed exactly.
///
/// # Errors
///
/// `InvalidHeader` / `InvalidTag` when the header is truncated, the declared
/// column count cannot fit in the remaining bytes, a type tag, codec id or
/// validity flag is unknown, a payload is truncated, a kept column fails
/// codec decoding or validation, or bytes remain after the last column.
fn decode_inner(bytes: &[u8], wanted: Option<&[u16]>) -> Result<Self> {
// Size of one column header: id (2) + tag (1) + width (1) + codec (1)
// + validity flag (1) + data length (4).
const MIN_COLUMN_BYTES: usize = 10;
let mut cur = Cursor::new(bytes);
let row_count = cur.read_u32()?;
let column_count = cur.read_u32()? as usize;
// Reject impossible column counts before they are used as an allocation size.
if column_count > cur.remaining() / MIN_COLUMN_BYTES {
return Err(Error::InvalidHeader(
"columnar: declared column count exceeds payload size",
));
}
let mut columns = Vec::with_capacity(column_count);
for _ in 0..column_count {
let column_id = cur.read_u16()?;
let type_tag = cur.read_u8()?;
let width = cur.read_u8()?;
let codec = CodecId::try_from(cur.read_u8()?)?;
let has_validity = match cur.read_u8()? {
0 => false,
1 => true,
_ => {
return Err(Error::InvalidHeader(
"columnar: validity flag must be 0 or 1",
));
}
};
let data_len = cur.read_u32()? as usize;
let type_tag = TypeTag::from_wire(type_tag, width)?;
let want = wanted.is_none_or(|w| w.contains(&column_id));
// The bitmap is always consumed so the cursor stays aligned, but it is
// only copied for columns that are kept.
let validity = if has_validity {
let v = cur.read_bytes(validity_len(row_count))?;
if want { Some(v.to_vec()) } else { None }
} else {
None
};
let raw = cur.read_bytes(data_len)?;
// Projected-out columns are skipped without decoding or validation.
if !want {
continue;
}
let data = codec_decode(codec, type_tag, raw)?;
let column = Column {
column_id,
type_tag,
validity,
data,
};
column.validate(row_count)?;
columns.push(column);
}
if !cur.is_empty() {
return Err(Error::InvalidHeader(
"columnar: trailing bytes after the last column",
));
}
Ok(Self { row_count, columns })
}
}
/// Forward-only reader over a byte slice used while decoding a block.
struct Cursor<'a> {
/// The full input being decoded.
buf: &'a [u8],
/// Index of the next unread byte in `buf`.
pos: usize,
}
impl<'a> Cursor<'a> {
const fn new(buf: &'a [u8]) -> Self {
Self { buf, pos: 0 }
}
const fn remaining(&self) -> usize {
self.buf.len() - self.pos
}
const fn is_empty(&self) -> bool {
self.pos == self.buf.len()
}
fn read_bytes(&mut self, n: usize) -> Result<&'a [u8]> {
let end = self
.pos
.checked_add(n)
.ok_or(Error::InvalidHeader("columnar: read length overflow"))?;
let slice = self
.buf
.get(self.pos..end)
.ok_or(Error::InvalidHeader("columnar: truncated block payload"))?;
self.pos = end;
Ok(slice)
}
fn read_array<const N: usize>(&mut self) -> Result<[u8; N]> {
let arr: [u8; N] = self
.read_bytes(N)?
.try_into()
.map_err(|_| Error::InvalidHeader("columnar: short fixed-width read"))?;
Ok(arr)
}
fn read_u8(&mut self) -> Result<u8> {
let [b] = self.read_array::<1>()?;
Ok(b)
}
fn read_u16(&mut self) -> Result<u16> {
Ok(u16::from_le_bytes(self.read_array()?))
}
fn read_u32(&mut self) -> Result<u32> {
Ok(u32::from_le_bytes(self.read_array()?))
}
}
/// Column id of the user-key column (bytes, non-nullable, always first).
pub const COL_USER_KEY: u16 = 0;
/// Column id of the sequence-number column (`Fixed(8)`, non-nullable).
pub const COL_SEQNO: u16 = 1;
/// Column id of the value-type column (`Fixed(1)`, non-nullable).
pub const COL_VALUE_TYPE: u16 = 2;
/// Column id of the value column; value sub-columns use ids at or above this.
pub const COL_VALUE: u16 = 3;
fn build_bytes_column<'a>(rows: impl Iterator<Item = &'a [u8]>) -> Result<Vec<u8>> {
let mut offsets = Vec::new();
let mut payload = Vec::new();
let mut off: u32 = 0;
offsets.extend_from_slice(&off.to_le_bytes());
for r in rows {
let len = u32::try_from(r.len())
.map_err(|_| Error::InvalidHeader("columnar: column value exceeds u32"))?;
off = off
.checked_add(len)
.ok_or(Error::InvalidHeader("columnar: column payload exceeds u32"))?;
payload.extend_from_slice(r);
offsets.extend_from_slice(&off.to_le_bytes());
}
offsets.extend_from_slice(&payload);
Ok(offsets)
}
fn bytes_column_row(data: &[u8], row_count: u32, i: u32) -> Result<&[u8]> {
let off_bytes = (row_count as usize + 1) * 4;
let read_off = |idx: u32| -> Result<usize> {
let base = idx as usize * 4;
let b = data
.get(base..base + 4)
.ok_or(Error::InvalidHeader("columnar: bytes offset truncated"))?;
let arr: [u8; 4] = b
.try_into()
.map_err(|_| Error::InvalidHeader("columnar: short bytes offset"))?;
Ok(u32::from_le_bytes(arr) as usize)
};
let start = read_off(i)?;
let end = read_off(i + 1)?;
let payload = data
.get(off_bytes..)
.ok_or(Error::InvalidHeader("columnar: bytes payload truncated"))?;
payload
.get(start..end)
.ok_or(Error::InvalidHeader("columnar: bytes row out of range"))
}
fn encode_bytes_column(cells: &[&[u8]]) -> Result<Vec<u8>> {
let mut offsets = Vec::with_capacity((cells.len() + 1) * 4);
let mut acc = 0u32;
offsets.extend_from_slice(&acc.to_le_bytes());
for c in cells {
let len = u32::try_from(c.len())
.map_err(|_| Error::InvalidHeader("columnar: bytes cell exceeds u32"))?;
acc = acc
.checked_add(len)
.ok_or(Error::InvalidHeader("columnar: bytes payload exceeds u32"))?;
offsets.extend_from_slice(&acc.to_le_bytes());
}
let mut out = offsets;
out.reserve(acc as usize);
for c in cells {
out.extend_from_slice(c);
}
Ok(out)
}
/// Reads the presence bit of `row` from a validity bitmap.
///
/// Rows beyond the end of the bitmap read as not present.
fn validity_bit(bitmap: &[u8], row: u32) -> bool {
    let mask = 1u8 << (row % 8);
    match bitmap.get((row / 8) as usize) {
        Some(byte) => byte & mask != 0,
        None => false,
    }
}
fn combine_validity(
a: Option<&[u8]>,
a_rows: u32,
b: Option<&[u8]>,
b_rows: u32,
) -> Result<Option<Vec<u8>>> {
if a.is_none() && b.is_none() {
return Ok(None);
}
let total = a_rows.checked_add(b_rows).ok_or(Error::InvalidHeader(
"columnar: combined row count exceeds u32",
))?;
let mut out = alloc::vec![0u8; validity_len(total)];
for idx in 0..total {
let present = if idx < a_rows {
a.is_none_or(|v| validity_bit(v, idx))
} else {
b.is_none_or(|v| validity_bit(v, idx - a_rows))
};
if present && let Some(byte) = out.get_mut((idx / 8) as usize) {
*byte |= 1u8 << (idx % 8);
}
}
Ok(Some(out))
}
/// Reads row `i` of a `Fixed(8)` column as a little-endian `u64`.
///
/// # Errors
///
/// `InvalidHeader` when `data` does not contain a complete eight-byte cell at
/// row `i`.
fn fixed_u64_row(data: &[u8], i: u32) -> Result<u64> {
    // `chunks_exact(8).nth(i)` yields the cell at byte offset `i * 8` only
    // when it is fully present, and avoids the unchecked `i * 8` / `base + 8`
    // arithmetic, which could overflow `usize` on 32-bit targets.
    let cell = data
        .chunks_exact(8)
        .nth(i as usize)
        .ok_or(Error::InvalidHeader("columnar: fixed8 row truncated"))?;
    let arr: [u8; 8] = cell
        .try_into()
        .map_err(|_| Error::InvalidHeader("columnar: short fixed8 row"))?;
    Ok(u64::from_le_bytes(arr))
}
pub fn entries_to_column_batch(entries: &[InternalValue]) -> Result<ColumnBatch> {
let row_count = u32::try_from(entries.len())
.map_err(|_| Error::InvalidHeader("columnar: row count exceeds u32"))?;
let key_data = build_bytes_column(entries.iter().map(|e| e.key.user_key.as_ref()))?;
let value_data = build_bytes_column(entries.iter().map(|e| e.value.as_ref()))?;
let mut seqno_data = Vec::with_capacity(entries.len() * 8);
let mut vt_data = Vec::with_capacity(entries.len());
for e in entries {
seqno_data.extend_from_slice(&e.key.seqno.to_le_bytes());
vt_data.push(u8::from(e.key.value_type));
}
let columns = alloc::vec![
Column {
column_id: COL_USER_KEY,
type_tag: TypeTag::Bytes,
validity: None,
data: key_data,
},
Column {
column_id: COL_SEQNO,
type_tag: TypeTag::Fixed(8),
validity: None,
data: seqno_data,
},
Column {
column_id: COL_VALUE_TYPE,
type_tag: TypeTag::Fixed(1),
validity: None,
data: vt_data,
},
Column {
column_id: COL_VALUE,
type_tag: TypeTag::Bytes,
validity: None,
data: value_data,
},
];
Ok(ColumnBatch { row_count, columns })
}
fn fixed_column_row(data: &[u8], width: u8, row: u32) -> Result<&[u8]> {
let w = width as usize;
let start = (row as usize).checked_mul(w).ok_or(Error::InvalidHeader(
"columnar: fixed column offset overflow",
))?;
let end = start.checked_add(w).ok_or(Error::InvalidHeader(
"columnar: fixed column offset overflow",
))?;
data.get(start..end)
.ok_or(Error::InvalidHeader("columnar: fixed column row truncated"))
}
/// Returns the raw bytes of row `row` of `col`, ignoring its validity bitmap.
///
/// Fixed columns are sliced by width; bytes columns are looked up through
/// their offset table.
fn column_cell(col: &Column, row_count: u32, row: u32) -> Result<&[u8]> {
    if let TypeTag::Fixed(width) = col.type_tag {
        return fixed_column_row(&col.data, width, row);
    }
    bytes_column_row(&col.data, row_count, row)
}
/// Rebuilds the value of one row from its value sub-columns.
///
/// * If any sub-column has a validity bitmap, the cells are framed with the
///   nullable framing (presence bitmap first).
/// * Otherwise a single sub-column yields its cell bytes unchanged.
/// * Otherwise the cells are framed with the non-nullable framing.
///
/// # Errors
///
/// Propagates cell lookup and framing errors.
fn reconstruct_row_value(value_cols: &[Column], row_count: u32, row: u32) -> Result<Slice> {
    let any_nullable = value_cols.iter().any(|col| col.validity.is_some());
    let framed = if any_nullable {
        let mut cells = Vec::with_capacity(value_cols.len());
        for col in value_cols {
            let cell = column_value_cell(col, row_count, row)?;
            cells.push((col.type_tag, cell));
        }
        frame_value_cells_nullable(&cells)?
    } else if let [only] = value_cols {
        // A lone non-nullable column is stored unframed.
        return column_cell(only, row_count, row).map(Slice::from);
    } else {
        let mut cells = Vec::with_capacity(value_cols.len());
        for col in value_cols {
            let cell = column_cell(col, row_count, row)?;
            cells.push((col.type_tag, cell));
        }
        frame_value_cells(&cells)?
    };
    Ok(Slice::from(framed))
}
/// Checks the column layout of a batch and splits it into the three intrinsic
/// columns (user key, seqno, value type) and the value sub-columns.
///
/// # Errors
///
/// `InvalidHeader` when fewer than three columns or no value column is
/// present, when an intrinsic column has the wrong id or type or carries a
/// validity bitmap, when a value sub-column id is duplicated or below
/// `COL_VALUE`, or when any column fails validation.
fn validate_columnar_columns(
    batch: &ColumnBatch,
) -> Result<(&Column, &Column, &Column, &[Column])> {
    let (key_col, seqno_col, vt_col, value_cols) = match batch.columns.as_slice() {
        [key, seqno, vt, rest @ ..] => (key, seqno, vt, rest),
        _ => {
            return Err(Error::InvalidHeader(
                "columnar: batch missing the intrinsic columns",
            ));
        }
    };
    if value_cols.is_empty() {
        return Err(Error::InvalidHeader(
            "columnar: batch carries no value column",
        ));
    }

    // Each intrinsic column paired with the id and type it must have.
    let intrinsic = [
        (key_col, COL_USER_KEY, TypeTag::Bytes),
        (seqno_col, COL_SEQNO, TypeTag::Fixed(8)),
        (vt_col, COL_VALUE_TYPE, TypeTag::Fixed(1)),
    ];
    let layout_ok = intrinsic
        .iter()
        .all(|(col, id, tag)| col.column_id == *id && col.type_tag == *tag);
    if !layout_ok {
        return Err(Error::InvalidHeader(
            "columnar: unexpected intrinsic column layout",
        ));
    }
    for (col, _, _) in intrinsic {
        if col.validity.is_some() {
            return Err(Error::InvalidHeader(
                "columnar: intrinsic columns must not be nullable",
            ));
        }
        col.validate(batch.row_count)?;
    }

    for (idx, col) in value_cols.iter().enumerate() {
        // An id is a duplicate when an earlier value sub-column already used it.
        let reserved = col.column_id < COL_VALUE;
        let duplicate = || {
            value_cols
                .iter()
                .take(idx)
                .any(|earlier| earlier.column_id == col.column_id)
        };
        if reserved || duplicate() {
            return Err(Error::InvalidHeader(
                "columnar: value sub-column ids must be unique and must not overlap intrinsic columns",
            ));
        }
        col.validate(batch.row_count)?;
    }
    Ok((key_col, seqno_col, vt_col, value_cols))
}
pub fn validate_columnar_ingest_batch(
batch: &ColumnBatch,
comparator: &crate::SharedComparator,
) -> Result<()> {
let (key_col, seqno_col, vt_col, _value_cols) = validate_columnar_columns(batch)?;
for &vt_byte in &vt_col.data {
ValueType::try_from(vt_byte).map_err(|()| Error::InvalidTag(("ValueType", vt_byte)))?;
}
for i in 0..batch.row_count {
if fixed_u64_row(&seqno_col.data, i)? != 0 {
return Err(Error::FeatureUnsupported(
"columnar batch ingest requires every row seqno to be 0 (the ingestion assigns the sequence number)",
));
}
}
let mut prev: Option<&[u8]> = None;
for i in 0..batch.row_count {
let key = bytes_column_row(&key_col.data, batch.row_count, i)?;
if key.is_empty() || key.len() > u16::MAX as usize {
return Err(Error::InvalidHeader(
"columnar: user key is empty or longer than u16::MAX",
));
}
if let Some(p) = prev
&& comparator.compare(p, key) != core::cmp::Ordering::Less
{
return Err(Error::InvalidHeader(
"columnar batch ingest requires strictly increasing keys",
));
}
prev = Some(key);
}
Ok(())
}
pub fn column_batch_to_entries(batch: &ColumnBatch) -> Result<Vec<InternalValue>> {
let (key_col, seqno_col, vt_col, value_cols) = validate_columnar_columns(batch)?;
let mut out = Vec::with_capacity(batch.row_count as usize);
for i in 0..batch.row_count {
let user_key = bytes_column_row(&key_col.data, batch.row_count, i)?;
if user_key.is_empty() || user_key.len() > u16::MAX as usize {
return Err(Error::InvalidHeader(
"columnar: user key is empty or longer than u16::MAX",
));
}
let seqno = fixed_u64_row(&seqno_col.data, i)?;
let vt_byte = vt_col
.data
.get(i as usize)
.copied()
.ok_or(Error::InvalidHeader("columnar: value-type row truncated"))?;
let value_type =
ValueType::try_from(vt_byte).map_err(|()| Error::InvalidTag(("ValueType", vt_byte)))?;
let value = reconstruct_row_value(value_cols, batch.row_count, i)?;
out.push(InternalValue {
key: InternalKey {
user_key: Slice::from(user_key),
seqno,
value_type,
},
value,
});
}
Ok(out)
}
/// Converts a batch into row-oriented entries, consuming the batch.
///
/// The key column's buffer is moved into a single `Slice` and every user key
/// is produced with `Slice::slice` over it; the same is done for the value
/// when the batch has exactly one non-nullable bytes value column.
/// NOTE(review): this presumably lets rows share the column buffer instead of
/// copying — confirm against `Slice::slice`.
/// In every other case values are rebuilt per row with `reconstruct_row_value`.
///
/// # Errors
///
/// `InvalidHeader` / `InvalidTag` when the batch layout is invalid, a user
/// key is empty or longer than `u16::MAX`, a value-type byte is unknown, or a
/// row cannot be read.
pub fn column_batch_into_entries(batch: ColumnBatch) -> Result<Vec<InternalValue>> {
// Validate while the batch is still borrowed; the columns are moved out below.
validate_columnar_columns(&batch)?;
let row_count = batch.row_count;
let mut cols = batch.columns.into_iter();
let key_col = cols
.next()
.ok_or(Error::InvalidHeader("columnar: missing key column"))?;
let seqno_col = cols
.next()
.ok_or(Error::InvalidHeader("columnar: missing seqno column"))?;
let vt_col = cols
.next()
.ok_or(Error::InvalidHeader("columnar: missing value-type column"))?;
// Everything after the three intrinsic columns is a value sub-column.
let value_cols: Vec<Column> = cols.collect();
let key_data = Slice::from(key_col.data);
// A lone non-nullable bytes column holds the values unframed, so rows can
// be sliced straight out of its buffer.
let single_bytes_value = matches!(
value_cols.as_slice(),
[c] if c.type_tag == TypeTag::Bytes && c.validity.is_none()
);
let value_source = if single_bytes_value {
let single = value_cols
.into_iter()
.next()
.ok_or(Error::InvalidHeader("columnar: value column vanished"))?;
ValueSource::SharedBytes(Slice::from(single.data))
} else {
ValueSource::Reconstruct(value_cols)
};
let mut out = Vec::with_capacity(row_count as usize);
for i in 0..row_count {
let user_key = bytes_row_slice(&key_data, row_count, i)?;
if user_key.is_empty() || user_key.len() > u16::MAX as usize {
return Err(Error::InvalidHeader(
"columnar: user key is empty or longer than u16::MAX",
));
}
let seqno = fixed_u64_row(&seqno_col.data, i)?;
let vt_byte = vt_col
.data
.get(i as usize)
.copied()
.ok_or(Error::InvalidHeader("columnar: value-type row truncated"))?;
let value_type =
ValueType::try_from(vt_byte).map_err(|()| Error::InvalidTag(("ValueType", vt_byte)))?;
let value = match &value_source {
ValueSource::SharedBytes(data) => bytes_row_slice(data, row_count, i)?,
ValueSource::Reconstruct(cols) => reconstruct_row_value(cols, row_count, i)?,
};
out.push(InternalValue {
key: InternalKey {
user_key,
seqno,
value_type,
},
value,
});
}
Ok(out)
}
/// How `column_batch_into_entries` obtains each row's value.
enum ValueSource {
/// The raw data of a single non-nullable bytes value column; each row's
/// value is sliced out of it.
SharedBytes(Slice),
/// The value sub-columns; each row's value is rebuilt (and framed when
/// needed) from them.
Reconstruct(Vec<Column>),
}
fn bytes_row_slice(data: &Slice, row_count: u32, i: u32) -> Result<Slice> {
let bytes: &[u8] = data.as_ref();
let off_bytes = (row_count as usize + 1) * 4;
let read_off = |idx: u32| -> Result<usize> {
let base = idx as usize * 4;
let b = bytes
.get(base..base + 4)
.ok_or(Error::InvalidHeader("columnar: bytes offset truncated"))?;
let arr: [u8; 4] = b
.try_into()
.map_err(|_| Error::InvalidHeader("columnar: short bytes offset"))?;
Ok(u32::from_le_bytes(arr) as usize)
};
let start = read_off(i)?;
let end = read_off(i + 1)?;
let payload_start = off_bytes.checked_add(start).ok_or(Error::InvalidHeader(
"columnar: bytes payload offset overflow",
))?;
let payload_end = off_bytes.checked_add(end).ok_or(Error::InvalidHeader(
"columnar: bytes payload offset overflow",
))?;
if start > end || payload_end > bytes.len() {
return Err(Error::InvalidHeader("columnar: bytes row out of range"));
}
Ok(data.slice(payload_start..payload_end))
}
/// Collects the entries of `batch` whose user key compares equal to `needle`.
///
/// A binary search finds the first row whose key is not less than `needle`;
/// rows are then scanned forward while their key compares equal.
/// NOTE(review): this assumes the key column is sorted under `comparator` —
/// confirm against the callers.
///
/// When `deletes` is `Some((bitmap, start))`, a row is skipped if the bitmap
/// contains `start + row`.
///
/// # Errors
///
/// `InvalidHeader` / `InvalidTag` when the batch layout is invalid, the batch
/// has no rows, a row position overflows `u32`, a returned key is empty or
/// longer than `u16::MAX`, a value-type byte is unknown, or a row cannot be
/// read.
pub fn column_batch_match_entries(
batch: &ColumnBatch,
needle: &[u8],
comparator: &crate::comparator::SharedComparator,
deletes: Option<(&crate::table::delete_bitmap::DeleteBitmap, u32)>,
) -> Result<Vec<InternalValue>> {
let (key_col, seqno_col, vt_col, value_cols) = validate_columnar_columns(batch)?;
let row_count = batch.row_count;
if row_count == 0 {
return Err(Error::InvalidHeader(
"columnar: empty reconstructed data block",
));
}
// Lower-bound binary search: afterwards `lo` is the first row whose key is
// not less than `needle` (or `row_count` when there is none).
let mut lo = 0u32;
let mut hi = row_count;
while lo < hi {
let mid = lo + (hi - lo) / 2;
let k = bytes_column_row(&key_col.data, row_count, mid)?;
if comparator.compare(k, needle) == core::cmp::Ordering::Less {
lo = mid + 1;
} else {
hi = mid;
}
}
let mut out = Vec::new();
let mut row = lo;
// Walk the run of rows whose key equals `needle`.
while row < row_count {
let k = bytes_column_row(&key_col.data, row_count, row)?;
if comparator.compare(k, needle) != core::cmp::Ordering::Equal {
break;
}
// The bitmap is addressed by `start + row`, not by the row index alone.
let masked = if let Some((bitmap, start)) = deletes {
let pos = start.checked_add(row).ok_or(Error::InvalidHeader(
"columnar: row position exceeds u32::MAX",
))?;
bitmap.contains(pos)
} else {
false
};
// Masked rows are skipped entirely, including the key-length check.
if !masked {
if k.is_empty() || k.len() > u16::MAX as usize {
return Err(Error::InvalidHeader(
"columnar: user key is empty or longer than u16::MAX",
));
}
let seqno = fixed_u64_row(&seqno_col.data, row)?;
let vt_byte = vt_col
.data
.get(row as usize)
.copied()
.ok_or(Error::InvalidHeader("columnar: value-type row truncated"))?;
let value_type = ValueType::try_from(vt_byte)
.map_err(|()| Error::InvalidTag(("ValueType", vt_byte)))?;
let value = reconstruct_row_value(value_cols, row_count, row)?;
out.push(InternalValue {
key: InternalKey {
user_key: Slice::from(k),
seqno,
value_type,
},
value,
});
}
row += 1;
}
Ok(out)
}
/// Packs the sub-cells of one row value into a single blob.
///
/// Fixed cells are written as-is; bytes cells are prefixed with their length
/// as a little-endian `u32`.
///
/// # Errors
///
/// `InvalidHeader` when a fixed cell's length differs from its declared
/// width, or a bytes cell is longer than `u32::MAX`.
pub fn frame_value_cells(cells: &[(TypeTag, &[u8])]) -> Result<Vec<u8>> {
    let mut framed = Vec::new();
    for &(tag, cell) in cells {
        match tag {
            TypeTag::Bytes => {
                let Ok(len) = u32::try_from(cell.len()) else {
                    return Err(Error::InvalidHeader(
                        "columnar: framed value sub-cell exceeds u32",
                    ));
                };
                framed.extend_from_slice(&len.to_le_bytes());
                framed.extend_from_slice(cell);
            }
            TypeTag::Fixed(width) if cell.len() == usize::from(width) => {
                framed.extend_from_slice(cell);
            }
            TypeTag::Fixed(_) => {
                return Err(Error::InvalidHeader(
                    "columnar: fixed value sub-cell length does not match its type tag",
                ));
            }
        }
    }
    Ok(framed)
}
pub fn unframe_value_cells<'a>(blob: &'a [u8], type_tags: &[TypeTag]) -> Result<Vec<&'a [u8]>> {
let mut out = Vec::with_capacity(type_tags.len());
let mut pos = 0usize;
for tag in type_tags {
match tag {
TypeTag::Fixed(width) => {
let end = pos
.checked_add(*width as usize)
.ok_or(Error::InvalidHeader("columnar: framed value overflow"))?;
let cell = blob.get(pos..end).ok_or(Error::InvalidHeader(
"columnar: framed value truncated (fixed)",
))?;
out.push(cell);
pos = end;
}
TypeTag::Bytes => {
let len_end = pos
.checked_add(4)
.ok_or(Error::InvalidHeader("columnar: framed value overflow"))?;
let len_bytes = blob.get(pos..len_end).ok_or(Error::InvalidHeader(
"columnar: framed value truncated (length)",
))?;
let len = u32::from_le_bytes(
<[u8; 4]>::try_from(len_bytes)
.map_err(|_| Error::InvalidHeader("columnar: framed value length"))?,
) as usize;
let end = len_end
.checked_add(len)
.ok_or(Error::InvalidHeader("columnar: framed value overflow"))?;
let cell = blob.get(len_end..end).ok_or(Error::InvalidHeader(
"columnar: framed value truncated (bytes)",
))?;
out.push(cell);
pos = end;
}
}
}
if pos != blob.len() {
return Err(Error::InvalidHeader(
"columnar: framed value has trailing bytes",
));
}
Ok(out)
}
/// Packs the sub-cells of one row value into a blob that can represent
/// missing cells.
///
/// The blob starts with a presence bitmap of `ceil(cells.len() / 8)` bytes
/// (cell `i` is bit `i % 8` of byte `i / 8`). Present cells follow in order,
/// framed as in [`frame_value_cells`]; absent cells contribute no bytes.
///
/// # Errors
///
/// `InvalidHeader` when a fixed cell's length differs from its declared
/// width, or a bytes cell is longer than `u32::MAX`.
pub fn frame_value_cells_nullable(cells: &[(TypeTag, Option<&[u8]>)]) -> Result<Vec<u8>> {
    let mut framed = alloc::vec![0u8; cells.len().div_ceil(8)];
    for (idx, &(tag, cell)) in cells.iter().enumerate() {
        // Absent cells only leave their presence bit clear.
        let Some(bytes) = cell else {
            continue;
        };
        match framed.get_mut(idx / 8) {
            Some(slot) => *slot |= 1u8 << (idx % 8),
            None => {
                return Err(Error::InvalidHeader(
                    "columnar: presence bitmap index out of range",
                ));
            }
        }
        match tag {
            TypeTag::Bytes => {
                let Ok(len) = u32::try_from(bytes.len()) else {
                    return Err(Error::InvalidHeader(
                        "columnar: framed value sub-cell exceeds u32",
                    ));
                };
                framed.extend_from_slice(&len.to_le_bytes());
                framed.extend_from_slice(bytes);
            }
            TypeTag::Fixed(width) if bytes.len() == usize::from(width) => {
                framed.extend_from_slice(bytes);
            }
            TypeTag::Fixed(_) => {
                return Err(Error::InvalidHeader(
                    "columnar: fixed value sub-cell length does not match its type tag",
                ));
            }
        }
    }
    Ok(framed)
}
pub fn unframe_value_cells_nullable<'a>(
blob: &'a [u8],
type_tags: &[TypeTag],
) -> Result<Vec<Option<&'a [u8]>>> {
let bitmap_len = type_tags.len().div_ceil(8);
let bitmap = blob.get(0..bitmap_len).ok_or(Error::InvalidHeader(
"columnar: nullable framed value truncated (presence bitmap)",
))?;
let mut pos = bitmap_len;
let mut out = Vec::with_capacity(type_tags.len());
for (i, tag) in type_tags.iter().enumerate() {
let present = bitmap.get(i / 8).is_some_and(|b| (b >> (i % 8)) & 1 == 1);
if !present {
out.push(None);
continue;
}
match tag {
TypeTag::Fixed(width) => {
let end = pos
.checked_add(*width as usize)
.ok_or(Error::InvalidHeader("columnar: framed value overflow"))?;
let cell = blob.get(pos..end).ok_or(Error::InvalidHeader(
"columnar: nullable framed value truncated (fixed)",
))?;
out.push(Some(cell));
pos = end;
}
TypeTag::Bytes => {
let len_end = pos
.checked_add(4)
.ok_or(Error::InvalidHeader("columnar: framed value overflow"))?;
let len_bytes = blob.get(pos..len_end).ok_or(Error::InvalidHeader(
"columnar: nullable framed value truncated (length)",
))?;
let len = u32::from_le_bytes(
<[u8; 4]>::try_from(len_bytes)
.map_err(|_| Error::InvalidHeader("columnar: framed value length"))?,
) as usize;
let end = len_end
.checked_add(len)
.ok_or(Error::InvalidHeader("columnar: framed value overflow"))?;
let cell = blob.get(len_end..end).ok_or(Error::InvalidHeader(
"columnar: nullable framed value truncated (bytes)",
))?;
out.push(Some(cell));
pos = end;
}
}
}
if pos != blob.len() {
return Err(Error::InvalidHeader(
"columnar: nullable framed value has trailing bytes",
));
}
Ok(out)
}
/// Unframes a nullable blob and substitutes each column's default for cells
/// that are absent.
///
/// `columns` pairs every sub-column's type tag with the default bytes to use
/// when the cell is missing.
///
/// # Errors
///
/// Propagates any error from [`unframe_value_cells_nullable`].
pub fn unframe_value_cells_with_defaults<'a>(
    blob: &'a [u8],
    columns: &[(TypeTag, &'a [u8])],
) -> Result<Vec<&'a [u8]>> {
    let tags: Vec<TypeTag> = columns.iter().map(|&(tag, _)| tag).collect();
    let decoded = unframe_value_cells_nullable(blob, &tags)?;
    let mut resolved = Vec::with_capacity(decoded.len());
    for (cell, &(_, default)) in decoded.into_iter().zip(columns) {
        resolved.push(match cell {
            Some(bytes) => bytes,
            None => default,
        });
    }
    Ok(resolved)
}
fn column_value_cell(col: &Column, row_count: u32, row: u32) -> Result<Option<&[u8]>> {
if let Some(validity) = &col.validity {
let byte = *validity
.get((row / 8) as usize)
.ok_or(Error::InvalidHeader(
"columnar: validity bitmap shorter than row count",
))?;
if (byte >> (row % 8)) & 1 == 0 {
return Ok(None); }
}
Ok(Some(column_cell(col, row_count, row)?))
}
#[expect(clippy::expect_used, clippy::indexing_slicing, reason = "test code")]
#[cfg(test)]
mod tests;