use std::sync::Arc;
use arcstr::ArcStr;
use bytes::{Bytes, BytesMut};
use grafeo_common::types::Value;
use crate::codec::{BitPackedInts, BitVector, BlockEntry, DictionaryEncoding};
/// Decodes a little-endian `i64` from the 8 bytes starting at `byte_idx`.
///
/// Returns `None` when fewer than 8 bytes are available from `byte_idx`,
/// including when `byte_idx` lies past the end of the buffer.
#[inline]
fn read_le_i64(bytes: &Bytes, byte_idx: usize) -> Option<i64> {
    let tail = bytes.get(byte_idx..)?;
    tail.first_chunk::<8>().copied().map(i64::from_le_bytes)
}
/// Decodes a little-endian `f64` from the 8 bytes starting at `byte_idx`.
///
/// Returns `None` when fewer than 8 bytes are available from `byte_idx`,
/// including when `byte_idx` lies past the end of the buffer.
#[inline]
fn read_le_f64(bytes: &Bytes, byte_idx: usize) -> Option<f64> {
    let tail = bytes.get(byte_idx..)?;
    tail.first_chunk::<8>().copied().map(f64::from_le_bytes)
}
/// Decodes a little-endian `f32` from the 4 bytes starting at `byte_idx`.
///
/// Returns `None` when fewer than 4 bytes are available from `byte_idx`,
/// including when `byte_idx` lies past the end of the buffer.
#[inline]
fn read_le_f32(bytes: &Bytes, byte_idx: usize) -> Option<f32> {
    let tail = bytes.get(byte_idx..)?;
    tail.first_chunk::<4>().copied().map(f32::from_le_bytes)
}
/// Reads the byte at `byte_idx` and reinterprets it as a signed `i8`.
///
/// Returns `None` when `byte_idx` is out of bounds.
#[inline]
fn read_i8(bytes: &Bytes, byte_idx: usize) -> Option<i8> {
    let raw = *bytes.get(byte_idx)?;
    Some(raw.cast_signed())
}
/// Serializes `values` as consecutive 8-byte little-endian integers.
fn vec_to_bytes_i64(values: &[i64]) -> Bytes {
    let mut out = BytesMut::with_capacity(values.len() * 8);
    values
        .iter()
        .for_each(|value| out.extend_from_slice(&value.to_le_bytes()));
    out.freeze()
}
/// Serializes `values` as consecutive 8-byte little-endian floats.
fn vec_to_bytes_f64(values: &[f64]) -> Bytes {
    let mut out = BytesMut::with_capacity(values.len() * 8);
    values
        .iter()
        .for_each(|value| out.extend_from_slice(&value.to_le_bytes()));
    out.freeze()
}
/// Serializes `values` as consecutive 4-byte little-endian floats.
fn vec_to_bytes_f32(values: &[f32]) -> Bytes {
    let mut out = BytesMut::with_capacity(values.len() * 4);
    values
        .iter()
        .for_each(|value| out.extend_from_slice(&value.to_le_bytes()));
    out.freeze()
}
/// Serializes `values` one byte per element, reinterpreting each `i8` as `u8`.
fn vec_to_bytes_i8(values: &[i8]) -> Bytes {
    let mut out = BytesMut::with_capacity(values.len());
    for value in values.iter().copied() {
        out.extend_from_slice(&[value.cast_unsigned()]);
    }
    out.freeze()
}
/// Backing storage for a column of `i64` values.
#[derive(Debug, Clone)]
pub enum I64Store {
/// Values held as a native `Vec<i64>`.
Inline(Vec<i64>),
/// Values held as raw bytes; each element is read as an 8-byte
/// little-endian integer.
Mapped(Bytes),
}
impl I64Store {
    /// Number of whole `i64` elements in the store.
    ///
    /// For byte-backed storage a trailing partial element is not counted.
    #[inline]
    #[must_use]
    pub fn len_elements(&self) -> usize {
        match self {
            Self::Inline(values) => values.len(),
            Self::Mapped(raw) => raw.len() / 8,
        }
    }

    /// Borrows the values as a native slice.
    ///
    /// Only the `Inline` variant can do this; `Mapped` returns `None`.
    #[inline]
    #[must_use]
    pub fn as_slice(&self) -> Option<&[i64]> {
        if let Self::Inline(values) = self {
            Some(values.as_slice())
        } else {
            None
        }
    }

    /// Returns the element at `idx`, or `None` when it is out of range.
    #[inline]
    #[must_use]
    pub fn get(&self, idx: usize) -> Option<i64> {
        match self {
            Self::Inline(values) => values.get(idx).copied(),
            Self::Mapped(raw) => {
                let byte_offset = idx.checked_mul(8)?;
                read_le_i64(raw, byte_offset)
            }
        }
    }

    /// Returns the values as little-endian bytes.
    ///
    /// `Inline` values are encoded into a fresh buffer; `Mapped` bytes are
    /// returned via `Bytes::clone`.
    #[must_use]
    pub fn to_bytes(&self) -> Bytes {
        match self {
            Self::Mapped(raw) => raw.clone(),
            Self::Inline(values) => vec_to_bytes_i64(values),
        }
    }

    /// Size of the stored values in bytes.
    #[must_use]
    pub fn byte_len(&self) -> usize {
        match self {
            Self::Mapped(raw) => raw.len(),
            Self::Inline(values) => values.len() * 8,
        }
    }
}
/// Backing storage for a column of `f64` values.
#[derive(Debug, Clone)]
pub enum F64Store {
/// Values held as a native `Vec<f64>`.
Inline(Vec<f64>),
/// Values held as raw bytes; each element is read as an 8-byte
/// little-endian float.
Mapped(Bytes),
}
impl F64Store {
    /// Number of whole `f64` elements in the store.
    ///
    /// For byte-backed storage a trailing partial element is not counted.
    #[inline]
    #[must_use]
    pub fn len_elements(&self) -> usize {
        match self {
            Self::Inline(values) => values.len(),
            Self::Mapped(raw) => raw.len() / 8,
        }
    }

    /// Borrows the values as a native slice.
    ///
    /// Only the `Inline` variant can do this; `Mapped` returns `None`.
    #[inline]
    #[must_use]
    pub fn as_slice(&self) -> Option<&[f64]> {
        if let Self::Inline(values) = self {
            Some(values.as_slice())
        } else {
            None
        }
    }

    /// Returns the element at `idx`, or `None` when it is out of range.
    #[inline]
    #[must_use]
    pub fn get(&self, idx: usize) -> Option<f64> {
        match self {
            Self::Inline(values) => values.get(idx).copied(),
            Self::Mapped(raw) => {
                let byte_offset = idx.checked_mul(8)?;
                read_le_f64(raw, byte_offset)
            }
        }
    }

    /// Returns the values as little-endian bytes.
    ///
    /// `Inline` values are encoded into a fresh buffer; `Mapped` bytes are
    /// returned via `Bytes::clone`.
    #[must_use]
    pub fn to_bytes(&self) -> Bytes {
        match self {
            Self::Mapped(raw) => raw.clone(),
            Self::Inline(values) => vec_to_bytes_f64(values),
        }
    }

    /// Size of the stored values in bytes.
    #[must_use]
    pub fn byte_len(&self) -> usize {
        match self {
            Self::Mapped(raw) => raw.len(),
            Self::Inline(values) => values.len() * 8,
        }
    }
}
/// Encoded storage for one column; each variant is a different physical
/// encoding. The variant order here does not match the serialization tags
/// (see `write_to` for the tag of each variant).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ColumnCodec {
/// Bit-packed unsigned integers; rows are surfaced as `Value::Int64`.
BitPacked(BitPackedInts),
/// Dictionary-encoded strings; rows are surfaced as `Value::String`.
Dict(DictionaryEncoding),
/// One bit per row; rows are surfaced as `Value::Bool`.
Bitmap(BitVector),
/// Fixed-width vectors of `i8` components stored row after row.
Int8Vector {
/// Raw component bytes, one byte per component.
bytes: Bytes,
/// Number of components per row.
dimensions: u16,
},
/// Plain `f64` values; rows are surfaced as `Value::Float64`.
Float64(F64Store),
/// Fixed-width vectors of little-endian `f32` components stored row after row.
Float32Vector {
/// Raw component bytes, four bytes per component.
bytes: Bytes,
/// Number of components per row.
dimensions: u16,
},
/// Plain `i64` values; rows are surfaced as `Value::Int64`.
RawI64(I64Store),
}
impl ColumnCodec {
/// Builds a `RawI64` codec that owns `values` as an in-memory vector.
#[must_use]
pub fn raw_i64(values: Vec<i64>) -> Self {
    let store = I64Store::Inline(values);
    Self::RawI64(store)
}
/// Builds a `RawI64` codec over `bytes`, which are read as consecutive
/// 8-byte little-endian integers. The bytes are not validated here.
#[must_use]
pub fn raw_i64_from_bytes(bytes: Bytes) -> Self {
    let store = I64Store::Mapped(bytes);
    Self::RawI64(store)
}
/// Builds a `Float64` codec that owns `values` as an in-memory vector.
#[must_use]
pub fn float64(values: Vec<f64>) -> Self {
    let store = F64Store::Inline(values);
    Self::Float64(store)
}
/// Builds a `Float64` codec over `bytes`, which are read as consecutive
/// 8-byte little-endian floats. The bytes are not validated here.
#[must_use]
pub fn float64_from_bytes(bytes: Bytes) -> Self {
    let store = F64Store::Mapped(bytes);
    Self::Float64(store)
}
/// Borrows the column as `&[i64]` when it is a `RawI64` codec whose store
/// can expose a native slice; every other case yields `None`.
#[must_use]
#[inline]
pub fn as_raw_i64_slice(&self) -> Option<&[i64]> {
    let Self::RawI64(store) = self else {
        return None;
    };
    store.as_slice()
}
/// Borrows the column as `&[f64]` when it is a `Float64` codec whose store
/// can expose a native slice; every other case yields `None`.
#[must_use]
#[inline]
pub fn as_float64_slice(&self) -> Option<&[f64]> {
    let Self::Float64(store) = self else {
        return None;
    };
    store.as_slice()
}
/// Builds an `Int8Vector` codec from flattened components.
///
/// `data` holds the rows back to back, `dimensions` components per row.
#[must_use]
pub fn int8_vector(data: Vec<i8>, dimensions: u16) -> Self {
    let bytes = vec_to_bytes_i8(&data);
    Self::Int8Vector { bytes, dimensions }
}
/// Builds a `Float32Vector` codec from flattened components.
///
/// `data` holds the rows back to back, `dimensions` components per row;
/// components are stored as little-endian bytes.
#[must_use]
pub fn float32_vector(data: Vec<f32>, dimensions: u16) -> Self {
    let bytes = vec_to_bytes_f32(&data);
    Self::Float32Vector { bytes, dimensions }
}
/// Decodes the row at `index` into a `Value`.
///
/// Returns `None` when `index` is out of range, and for vector codecs also
/// when `dimensions` is zero or the row would extend past the stored bytes.
#[inline]
#[must_use]
pub fn get(&self, index: usize) -> Option<Value> {
    match self {
        Self::BitPacked(bp) => {
            let raw = bp.get(index)?;
            #[allow(clippy::cast_possible_wrap)]
            let signed = raw as i64;
            Some(Value::Int64(signed))
        }
        Self::Dict(dict) => {
            let text = dict.get(index)?;
            Some(Value::String(ArcStr::from(text)))
        }
        Self::Bitmap(bv) => Some(Value::Bool(bv.get(index)?)),
        Self::Float64(store) => Some(Value::Float64(store.get(index)?)),
        Self::RawI64(store) => Some(Value::Int64(store.get(index)?)),
        Self::Int8Vector { bytes, dimensions } => {
            let dims = usize::from(*dimensions);
            if dims == 0 {
                return None;
            }
            let start = index.checked_mul(dims)?;
            let end = start.checked_add(dims)?;
            // `get` on the range doubles as the bounds check for the row.
            let row = bytes.get(start..end)?;
            let values: Vec<Value> = row
                .iter()
                .map(|&byte| Value::Int64(i64::from(byte.cast_signed())))
                .collect();
            Some(Value::List(Arc::from(values)))
        }
        Self::Float32Vector { bytes, dimensions } => {
            let dims = usize::from(*dimensions);
            if dims == 0 {
                return None;
            }
            let row_bytes = dims.checked_mul(4)?;
            let start_byte = index.checked_mul(dims)?.checked_mul(4)?;
            let end_byte = start_byte.checked_add(row_bytes)?;
            // The row is exactly `dims * 4` bytes, so `chunks_exact(4)`
            // yields one chunk per component with no remainder.
            let row = bytes.get(start_byte..end_byte)?;
            let values: Vec<f32> = row
                .chunks_exact(4)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                .collect();
            Some(Value::Vector(Arc::from(values.as_slice())))
        }
    }
}
/// Returns the undecoded `u64` at `index` for a `BitPacked` codec.
///
/// Every other codec yields `None`.
#[inline]
#[must_use]
pub fn get_raw_u64(&self, index: usize) -> Option<u64> {
    if let Self::BitPacked(bp) = self {
        bp.get(index)
    } else {
        None
    }
}
/// Borrows row `index` of an `Int8Vector` codec as `&[i8]` without copying.
///
/// Returns `None` for any other codec, when `dimensions` is zero, or when
/// the row would extend past the stored bytes.
#[must_use]
pub fn get_int8_vector(&self, index: usize) -> Option<&[i8]> {
    let Self::Int8Vector { bytes, dimensions } = self else {
        return None;
    };
    let dims = usize::from(*dimensions);
    if dims == 0 {
        return None;
    }
    let start = index.checked_mul(dims)?;
    let end = start.checked_add(dims)?;
    let raw: &[u8] = bytes.get(start..end)?;
    // SAFETY: `u8` and `i8` have the same size and alignment and every bit
    // pattern is valid for both. The pointer and length come from the live
    // slice `raw`, and the returned slice borrows from `self` just as `raw`
    // does.
    #[allow(unsafe_code)]
    let signed: &[i8] =
        unsafe { std::slice::from_raw_parts(raw.as_ptr().cast::<i8>(), raw.len()) };
    Some(signed)
}
/// Number of rows in the column.
///
/// Vector codecs count whole rows only and report 0 when `dimensions` is zero.
#[must_use]
pub fn len(&self) -> usize {
    match self {
        Self::BitPacked(bp) => bp.len(),
        Self::Dict(dict) => dict.len(),
        Self::Bitmap(bv) => bv.len(),
        Self::Float64(store) => store.len_elements(),
        Self::RawI64(store) => store.len_elements(),
        Self::Int8Vector { bytes, dimensions } => match usize::from(*dimensions) {
            0 => 0,
            dims => bytes.len() / dims,
        },
        Self::Float32Vector { bytes, dimensions } => match usize::from(*dimensions) * 4 {
            0 => 0,
            row_bytes => bytes.len() / row_bytes,
        },
    }
}
/// Returns `true` when the column holds no rows.
#[must_use]
pub fn is_empty(&self) -> bool {
    let rows = self.len();
    rows == 0
}
/// Number of blocks of `DEFAULT_BLOCK_ROWS` rows needed to cover the column.
///
/// An empty column still reports one (empty) block.
#[must_use]
pub fn block_count(&self) -> usize {
    let rows = self.len();
    if rows == 0 {
        1
    } else {
        rows.div_ceil(crate::codec::DEFAULT_BLOCK_ROWS as usize)
    }
}
/// Describes block `i`, or returns `None` when `i` is not a valid block index.
///
/// The entry carries the number of rows in the block; the final block may
/// be shorter than `DEFAULT_BLOCK_ROWS`.
#[must_use]
pub fn block_at(&self, i: usize) -> Option<BlockEntry> {
    (i < self.block_count()).then(|| {
        let rows_per_block = crate::codec::DEFAULT_BLOCK_ROWS as usize;
        let first_row = i * rows_per_block;
        let past_last_row = self.len().min(first_row + rows_per_block);
        #[allow(clippy::cast_possible_truncation)]
        let rows_in_block = (past_last_row - first_row) as u32;
        BlockEntry::new(rows_in_block)
    })
}
/// Iterates over the entries of every block, in block order.
pub fn block_iter(&self) -> impl Iterator<Item = BlockEntry> + '_ {
    let total_blocks = self.block_count();
    (0..total_blocks).filter_map(move |block_idx| self.block_at(block_idx))
}
/// Returns the indices of every row equal to `target`, in ascending order.
///
/// Codec/value pairs with a dedicated path are compared on the raw
/// representation; any other pairing decodes each row with `get` and
/// compares the resulting `Value`.
pub fn find_eq(&self, target: &Value) -> Vec<usize> {
    match (self, target) {
        // Bit-packed values are unsigned, so a negative target cannot match.
        (Self::BitPacked(bp), &Value::Int64(wanted)) => match u64::try_from(wanted) {
            Ok(wanted_u64) => bp.scan_eq(wanted_u64),
            Err(_) => Vec::new(),
        },
        // A string absent from the dictionary cannot appear in any row.
        (Self::Dict(dict), Value::String(s)) => dict
            .encode(s.as_str())
            .map_or_else(Vec::new, |code| dict.filter_by_code(|c| c == code)),
        (Self::Bitmap(bv), &Value::Bool(wanted)) => {
            let mut hits = Vec::new();
            for row in 0..bv.len() {
                if bv.get(row) == Some(wanted) {
                    hits.push(row);
                }
            }
            hits
        }
        (Self::Float64(store), &Value::Float64(wanted)) => {
            if let Some(slice) = store.as_slice() {
                slice
                    .iter()
                    .enumerate()
                    .filter_map(|(row, &v)| (v == wanted).then_some(row))
                    .collect()
            } else {
                (0..store.len_elements())
                    .filter(|&row| store.get(row) == Some(wanted))
                    .collect()
            }
        }
        (Self::RawI64(store), &Value::Int64(wanted)) => {
            if let Some(slice) = store.as_slice() {
                slice
                    .iter()
                    .enumerate()
                    .filter_map(|(row, &v)| (v == wanted).then_some(row))
                    .collect()
            } else {
                (0..store.len_elements())
                    .filter(|&row| store.get(row) == Some(wanted))
                    .collect()
            }
        }
        _ => {
            let mut hits = Vec::new();
            for row in 0..self.len() {
                if self.get(row).as_ref() == Some(target) {
                    hits.push(row);
                }
            }
            hits
        }
    }
}
/// Returns the indices of every row whose value lies within the given
/// bounds, in ascending order.
///
/// `min` / `max` of `None` leave that side unbounded; `min_inclusive` and
/// `max_inclusive` choose between `>=`/`>` and `<=`/`<`.
///
/// `BitPacked` and `RawI64` columns with `Int64` (or absent) bounds are
/// scanned on their raw integers. Every other combination goes through
/// `find_in_range_fallback`, which decodes each row.
pub fn find_in_range(
    &self,
    min: Option<&Value>,
    max: Option<&Value>,
    min_inclusive: bool,
    max_inclusive: bool,
) -> Vec<usize> {
    if let Self::BitPacked(bp) = self {
        let min_u64 = match min {
            // Stored values are unsigned, so a negative lower bound admits
            // every row whether or not it is inclusive: treat it as no
            // bound. Clamping it to 0 would make an exclusive bound
            // evaluate `v > 0` and wrongly drop rows holding 0.
            Some(&Value::Int64(v)) => u64::try_from(v).ok(),
            None => None,
            _ => return self.find_in_range_fallback(min, max, min_inclusive, max_inclusive),
        };
        let max_u64 = match max {
            Some(&Value::Int64(v)) => match u64::try_from(v) {
                Ok(hi) => Some(hi),
                // No unsigned value can sit at or below a negative upper bound.
                Err(_) => return Vec::new(),
            },
            None => None,
            _ => return self.find_in_range_fallback(min, max, min_inclusive, max_inclusive),
        };
        let in_range = |v: u64| {
            let above_min = match min_u64 {
                Some(lo) if min_inclusive => v >= lo,
                Some(lo) => v > lo,
                None => true,
            };
            let below_max = match max_u64 {
                Some(hi) if max_inclusive => v <= hi,
                Some(hi) => v < hi,
                None => true,
            };
            above_min && below_max
        };
        return (0..bp.len())
            .filter(|&i| bp.get(i).is_some_and(in_range))
            .collect();
    }
    if let Self::RawI64(store) = self {
        let min_i64 = match min {
            Some(&Value::Int64(v)) => Some(v),
            None => None,
            _ => return self.find_in_range_fallback(min, max, min_inclusive, max_inclusive),
        };
        let max_i64 = match max {
            Some(&Value::Int64(v)) => Some(v),
            None => None,
            _ => return self.find_in_range_fallback(min, max, min_inclusive, max_inclusive),
        };
        let in_range = |v: i64| {
            let above_min = match min_i64 {
                Some(lo) if min_inclusive => v >= lo,
                Some(lo) => v > lo,
                None => true,
            };
            let below_max = match max_i64 {
                Some(hi) if max_inclusive => v <= hi,
                Some(hi) => v < hi,
                None => true,
            };
            above_min && below_max
        };
        return match store.as_slice() {
            // Native slice available: scan it directly.
            Some(slice) => slice
                .iter()
                .enumerate()
                .filter_map(|(i, &v)| in_range(v).then_some(i))
                .collect(),
            // Byte-backed store: decode element by element.
            None => (0..store.len_elements())
                .filter(|&i| store.get(i).is_some_and(in_range))
                .collect(),
        };
    }
    self.find_in_range_fallback(min, max, min_inclusive, max_inclusive)
}
/// Tests whether row `i` lies within the given bounds.
///
/// Returns `false` when the row cannot be decoded or when `compare_values`
/// cannot order the row against a supplied bound.
#[inline]
fn matches_range(
    &self,
    i: usize,
    min: Option<&Value>,
    max: Option<&Value>,
    min_inclusive: bool,
    max_inclusive: bool,
) -> bool {
    use super::zone_map::compare_values;
    use std::cmp::Ordering;

    let Some(value) = self.get(i) else {
        return false;
    };
    let above_min = min.is_none_or(|lo| match compare_values(&value, lo) {
        Some(Ordering::Greater) => true,
        Some(Ordering::Equal) => min_inclusive,
        Some(Ordering::Less) | None => false,
    });
    if !above_min {
        return false;
    }
    max.is_none_or(|hi| match compare_values(&value, hi) {
        Some(Ordering::Less) => true,
        Some(Ordering::Equal) => max_inclusive,
        Some(Ordering::Greater) | None => false,
    })
}
/// Generic range scan: decodes every row and keeps the indices that
/// `matches_range` accepts, in ascending order.
fn find_in_range_fallback(
    &self,
    min: Option<&Value>,
    max: Option<&Value>,
    min_inclusive: bool,
    max_inclusive: bool,
) -> Vec<usize> {
    let mut hits = Vec::new();
    for row in 0..self.len() {
        if self.matches_range(row, min, max, min_inclusive, max_inclusive) {
            hits.push(row);
        }
    }
    hits
}
/// Lazily yields the indices of rows within the given bounds, in ascending
/// order, optionally skipping whole blocks using per-block zone maps.
///
/// `block_zone_maps` is used only when it has exactly one entry per block;
/// otherwise every block is scanned. A block is skipped when its zone map
/// reports that it cannot satisfy the lower or the upper bound. Rows in
/// the remaining blocks are tested individually with `matches_range`.
pub fn range_iter<'a>(
    &'a self,
    block_zone_maps: Option<&'a [super::zone_map::ZoneMap]>,
    min: Option<&'a Value>,
    max: Option<&'a Value>,
    min_inclusive: bool,
    max_inclusive: bool,
) -> Box<dyn Iterator<Item = usize> + 'a> {
    use crate::graph::lpg::CompareOp;

    let rows_per_block = crate::codec::DEFAULT_BLOCK_ROWS as usize;
    let n_blocks = self.block_count();
    let n_rows = self.len();
    // Zone maps that do not line up with the block layout are ignored.
    let usable_maps = block_zone_maps.filter(|maps| maps.len() == n_blocks);

    let surviving_blocks = (0..n_blocks).filter(move |&block_idx| {
        let Some(maps) = usable_maps else {
            return true;
        };
        let zone_map = &maps[block_idx];
        if let Some(lo) = min {
            let op = if min_inclusive {
                CompareOp::Ge
            } else {
                CompareOp::Gt
            };
            if !zone_map.might_match(op, lo) {
                return false;
            }
        }
        if let Some(hi) = max {
            let op = if max_inclusive {
                CompareOp::Le
            } else {
                CompareOp::Lt
            };
            if !zone_map.might_match(op, hi) {
                return false;
            }
        }
        true
    });

    Box::new(surviving_blocks.flat_map(move |block_idx| {
        let first_row = block_idx * rows_per_block;
        let past_last_row = first_row.saturating_add(rows_per_block).min(n_rows);
        (first_row..past_last_row)
            .filter(move |&row| self.matches_range(row, min, max, min_inclusive, max_inclusive))
    }))
}
/// Appends the flat (unblocked) serialization of this codec to `buf`.
///
/// The layout is a one-byte tag followed by a variant-specific header and
/// body. Lengths are written with `write_usize_as_u32`.
///
/// | tag | variant         | header                                   |
/// |-----|-----------------|------------------------------------------|
/// | 0   | `BitPacked`     | bits per value, row count, word count    |
/// | 1   | `Dict`          | string table, then code count            |
/// | 2   | `Bitmap`        | bit length, word count                   |
/// | 3   | `Int8Vector`    | dimensions, byte length                  |
/// | 4   | `Float64`       | element count                            |
/// | 5   | `Float32Vector` | dimensions, component count              |
/// | 6   | `RawI64`        | element count                            |
///
/// For tags 4, 5 and 6 only whole elements are written, so the body always
/// matches the count in the header even if a byte-backed store carries a
/// ragged tail.
pub fn write_to(&self, buf: &mut Vec<u8>) {
    match self {
        Self::BitPacked(bp) => {
            buf.push(0);
            buf.push(bp.bits_per_value());
            write_usize_as_u32(buf, bp.len());
            write_usize_as_u32(buf, bp.word_count());
            buf.extend_from_slice(bp.data_bytes().as_ref());
        }
        Self::Dict(dict) => {
            buf.push(1);
            let dict_entries = dict.dictionary();
            write_usize_as_u32(buf, dict_entries.len());
            // Each dictionary string is length-prefixed.
            for entry in dict_entries.iter() {
                let s = entry.as_ref().as_bytes();
                write_usize_as_u32(buf, s.len());
                buf.extend_from_slice(s);
            }
            write_usize_as_u32(buf, dict.code_count());
            buf.extend_from_slice(dict.codes_bytes().as_ref());
        }
        Self::Bitmap(bv) => {
            buf.push(2);
            write_usize_as_u32(buf, bv.len());
            write_usize_as_u32(buf, bv.word_count());
            buf.extend_from_slice(bv.data_bytes());
        }
        Self::Int8Vector { bytes, dimensions } => {
            buf.push(3);
            buf.extend_from_slice(&dimensions.to_le_bytes());
            write_usize_as_u32(buf, bytes.len());
            buf.extend_from_slice(bytes);
        }
        Self::Float64(store) => {
            buf.push(4);
            let body = store.to_bytes();
            let count = body.len() / 8;
            write_usize_as_u32(buf, count);
            // Emit exactly `count` elements so the reader stays in sync.
            buf.extend_from_slice(&body[..count * 8]);
        }
        Self::Float32Vector { bytes, dimensions } => {
            buf.push(5);
            buf.extend_from_slice(&dimensions.to_le_bytes());
            let total_components = bytes.len() / 4;
            write_usize_as_u32(buf, total_components);
            // Emit exactly `total_components` floats so the reader stays in sync.
            buf.extend_from_slice(&bytes[..total_components * 4]);
        }
        Self::RawI64(store) => {
            buf.push(6);
            let body = store.to_bytes();
            let count = body.len() / 8;
            write_usize_as_u32(buf, count);
            // Emit exactly `count` elements so the reader stays in sync.
            buf.extend_from_slice(&body[..count * 8]);
        }
    }
}
/// Decodes one codec in the flat layout produced by `write_to`, starting at
/// `*pos` in `data`.
///
/// On success `*pos` is advanced past the codec. Bodies are taken as
/// slices of `data` via `Bytes::slice` rather than decoded element by
/// element.
///
/// # Errors
///
/// Returns a static message when the input is truncated, a length
/// overflows, a dictionary string is not valid UTF-8, or the tag byte is
/// unknown. `*pos` may have been partially advanced when an error is
/// returned.
pub fn read_from(data: &Bytes, pos: &mut usize) -> Result<Self, &'static str> {
    let bytes: &[u8] = data.as_ref();

    // Carves `need` bytes off the input at `*pos` and advances the cursor.
    // The addition is checked so a huge length cannot wrap past the bounds
    // test.
    let take = |pos: &mut usize, need: usize, err: &'static str| -> Result<Bytes, &'static str> {
        let end = pos
            .checked_add(need)
            .filter(|&end| end <= bytes.len())
            .ok_or(err)?;
        let chunk = data.slice(*pos..end);
        *pos = end;
        Ok(chunk)
    };

    let discriminant = *bytes.get(*pos).ok_or("truncated codec discriminant")?;
    *pos += 1;
    match discriminant {
        0 => {
            let bits = *bytes.get(*pos).ok_or("truncated bits_per_value")?;
            *pos += 1;
            let count = read_u32_le(bytes, pos)? as usize;
            let word_count = read_u32_le(bytes, pos)? as usize;
            let need = word_count
                .checked_mul(8)
                .ok_or("BitPacked word count overflow")?;
            let storage = take(&mut *pos, need, "truncated BitPacked data")?;
            Ok(Self::BitPacked(BitPackedInts::from_bytes_storage(
                storage, bits, count,
            )))
        }
        1 => {
            let dict_len = read_u32_le(bytes, pos)? as usize;
            // `dict_len` comes from the input. Every entry needs at least a
            // 4-byte length prefix, so the remaining input bounds how many
            // entries can exist; never pre-allocate more than that.
            let plausible = bytes.len().saturating_sub(*pos) / 4;
            let mut entries: Vec<Arc<str>> = Vec::with_capacity(dict_len.min(plausible));
            for _ in 0..dict_len {
                let slen = read_u32_le(bytes, pos)? as usize;
                let end = pos
                    .checked_add(slen)
                    .filter(|&end| end <= bytes.len())
                    .ok_or("truncated dict string")?;
                let s = std::str::from_utf8(&bytes[*pos..end])
                    .map_err(|_| "invalid UTF-8 in dict")?;
                entries.push(Arc::from(s));
                *pos = end;
            }
            let codes_len = read_u32_le(bytes, pos)? as usize;
            let need = codes_len.checked_mul(4).ok_or("Dict codes overflow")?;
            let codes_bytes = take(&mut *pos, need, "truncated Dict codes")?;
            Ok(Self::Dict(DictionaryEncoding::from_bytes_storage(
                Arc::from(entries.into_boxed_slice()),
                codes_bytes,
                codes_len,
            )))
        }
        2 => {
            let bit_len = read_u32_le(bytes, pos)? as usize;
            let word_count = read_u32_le(bytes, pos)? as usize;
            let need = word_count
                .checked_mul(8)
                .ok_or("Bitmap word count overflow")?;
            let storage = take(&mut *pos, need, "truncated Bitmap data")?;
            Ok(Self::Bitmap(BitVector::from_bytes_storage(
                storage, bit_len,
            )))
        }
        3 => {
            let dimensions = read_u16_le(bytes, pos)?;
            let data_len = read_u32_le(bytes, pos)? as usize;
            let storage = take(&mut *pos, data_len, "truncated Int8Vector data")?;
            Ok(Self::Int8Vector {
                bytes: storage,
                dimensions,
            })
        }
        4 => {
            let count = read_u32_le(bytes, pos)? as usize;
            let byte_need = count.checked_mul(8).ok_or("Float64 length overflow")?;
            let storage = take(&mut *pos, byte_need, "truncated Float64 data")?;
            Ok(Self::Float64(F64Store::Mapped(storage)))
        }
        5 => {
            let dimensions = read_u16_le(bytes, pos)?;
            let component_count = read_u32_le(bytes, pos)? as usize;
            let byte_need = component_count
                .checked_mul(4)
                .ok_or("Float32Vector length overflow")?;
            let storage = take(&mut *pos, byte_need, "truncated Float32Vector data")?;
            Ok(Self::Float32Vector {
                bytes: storage,
                dimensions,
            })
        }
        6 => {
            let count = read_u32_le(bytes, pos)? as usize;
            let byte_need = count.checked_mul(8).ok_or("RawI64 length overflow")?;
            let storage = take(&mut *pos, byte_need, "truncated RawI64 data")?;
            Ok(Self::RawI64(I64Store::Mapped(storage)))
        }
        _ => Err("unknown codec discriminant"),
    }
}
/// Appends the blocked (v2) serialization to `buf`: the codec header
/// written by `emit_blocked_codec`, then the block index and the block
/// bodies.
pub fn write_to_v2(&self, buf: &mut Vec<u8>) {
    let blocked = self.emit_blocked_codec(buf);
    write_block_index_and_bodies(buf, &blocked.0, &blocked.1);
}
/// Appends the blocked (v3) serialization to `buf`: like v2, but each block
/// index entry is followed by that block's zone map.
///
/// `stats_hint` is used as-is when it has exactly one zone map per emitted
/// block; otherwise the zone maps are recomputed from the column.
pub fn write_to_v3(&self, buf: &mut Vec<u8>, stats_hint: Option<&[super::zone_map::ZoneMap]>) {
    let (metas, bodies) = self.emit_blocked_codec(buf);
    let usable_hint = stats_hint.filter(|hint| hint.len() == metas.len());
    // Declared up front so a recomputed set outlives the borrow below.
    let recomputed;
    let stats: &[super::zone_map::ZoneMap] = if let Some(hint) = usable_hint {
        hint
    } else {
        recomputed = super::zone_map::compute_block_zone_maps(self);
        &recomputed
    };
    write_block_index_and_bodies_with_stats(buf, &metas, &bodies, stats);
}
/// Writes the codec tag and any column-wide header to `buf`, and splits the
/// rows into blocks of `DEFAULT_BLOCK_ROWS`.
///
/// Returns the per-block index entries together with the concatenated
/// block bodies; the caller is responsible for appending both to `buf`.
/// Offsets in the index are relative to the start of the bodies buffer.
///
/// # Panics
///
/// Panics if a `BitPacked` or `Bitmap` row inside the computed block range
/// cannot be read ("row in range").
// Offsets, lengths and row counts are stored as `u32` in the block index.
#[allow(clippy::cast_possible_truncation)]
fn emit_blocked_codec(&self, buf: &mut Vec<u8>) -> (Vec<BlockMeta>, Vec<u8>) {
    let block_count = self.block_count();
    let block_rows = crate::codec::DEFAULT_BLOCK_ROWS as usize;
    let mut bodies: Vec<u8> = Vec::new();
    let mut metas: Vec<BlockMeta> = Vec::with_capacity(block_count);

    // Row range `[start, end)` of block `block`, plus its row count.
    let span = |block: usize, total_rows: usize| {
        let start = block * block_rows;
        let end = (start + block_rows).min(total_rows);
        (start, end, end - start)
    };
    // Records the index entry for a block whose body now ends at `body_end`.
    let mut seal = |byte_offset: u32, body_end: usize, rows: usize| {
        let byte_len = (body_end as u32) - byte_offset;
        metas.push(BlockMeta {
            byte_offset,
            byte_len,
            row_count: rows as u32,
        });
    };

    match self {
        Self::BitPacked(bp) => {
            let bits_per_value = bp.bits_per_value();
            buf.push(0);
            buf.push(bits_per_value);
            for block in 0..block_count {
                let (start, end, rows) = span(block, bp.len());
                let byte_offset = bodies.len() as u32;
                // Each block is re-packed on its own, with a word-count prefix.
                let row_values: Vec<u64> = (start..end)
                    .map(|row| bp.get(row).expect("row in range"))
                    .collect();
                let block_packed =
                    crate::codec::BitPackedInts::pack_with_bits(&row_values, bits_per_value);
                write_usize_as_u32(&mut bodies, block_packed.word_count());
                bodies.extend_from_slice(block_packed.data_bytes().as_ref());
                seal(byte_offset, bodies.len(), rows);
            }
        }
        Self::Dict(dict) => {
            buf.push(1);
            // The string table is column-wide and lives in the header.
            let entries = dict.dictionary();
            write_usize_as_u32(buf, entries.len());
            for entry in entries.iter() {
                let text = entry.as_ref().as_bytes();
                write_usize_as_u32(buf, text.len());
                buf.extend_from_slice(text);
            }
            let codes_bytes = dict.codes_bytes();
            let total_codes = dict.code_count();
            for block in 0..block_count {
                let (start, end, rows) = span(block, total_codes);
                let byte_offset = bodies.len() as u32;
                bodies.extend_from_slice(&codes_bytes[start * 4..end * 4]);
                seal(byte_offset, bodies.len(), rows);
            }
        }
        Self::Bitmap(bv) => {
            buf.push(2);
            for block in 0..block_count {
                let (start, end, rows) = span(block, bv.len());
                let byte_offset = bodies.len() as u32;
                // Each block is rebuilt on its own, with a word-count prefix.
                let bits: Vec<bool> = (start..end)
                    .map(|row| bv.get(row).expect("row in range"))
                    .collect();
                let block_bv = crate::codec::BitVector::from_bools(&bits);
                write_usize_as_u32(&mut bodies, block_bv.word_count());
                bodies.extend_from_slice(block_bv.data_bytes());
                seal(byte_offset, bodies.len(), rows);
            }
        }
        Self::Int8Vector { bytes, dimensions } => {
            buf.push(3);
            buf.extend_from_slice(&dimensions.to_le_bytes());
            let dims = *dimensions as usize;
            let row_count_total = bytes.len().checked_div(dims).unwrap_or(0);
            for block in 0..block_count {
                let (start_row, end_row, rows) = span(block, row_count_total);
                let byte_offset = bodies.len() as u32;
                if dims > 0 {
                    bodies.extend_from_slice(&bytes[start_row * dims..end_row * dims]);
                }
                seal(byte_offset, bodies.len(), rows);
            }
        }
        Self::Float64(store) => {
            buf.push(4);
            let body = store.to_bytes();
            let total_rows = body.len() / 8;
            for block in 0..block_count {
                let (start, end, rows) = span(block, total_rows);
                let byte_offset = bodies.len() as u32;
                bodies.extend_from_slice(&body[start * 8..end * 8]);
                seal(byte_offset, bodies.len(), rows);
            }
        }
        Self::Float32Vector { bytes, dimensions } => {
            buf.push(5);
            buf.extend_from_slice(&dimensions.to_le_bytes());
            let dims = *dimensions as usize;
            let row_byte_size = dims.checked_mul(4).unwrap_or(0);
            let row_count_total = bytes.len().checked_div(row_byte_size).unwrap_or(0);
            for block in 0..block_count {
                let (start_row, end_row, rows) = span(block, row_count_total);
                let byte_offset = bodies.len() as u32;
                if row_byte_size > 0 {
                    let start = start_row * row_byte_size;
                    let end = end_row * row_byte_size;
                    bodies.extend_from_slice(&bytes[start..end]);
                }
                seal(byte_offset, bodies.len(), rows);
            }
        }
        Self::RawI64(store) => {
            buf.push(6);
            let body = store.to_bytes();
            let total_rows = body.len() / 8;
            for block in 0..block_count {
                let (start, end, rows) = span(block, total_rows);
                let byte_offset = bodies.len() as u32;
                bodies.extend_from_slice(&body[start * 8..end * 8]);
                seal(byte_offset, bodies.len(), rows);
            }
        }
    }
    (metas, bodies)
}
/// Decodes one codec in the blocked (v2) layout produced by `write_to_v2`,
/// starting at `*pos` in `data`.
///
/// `BitPacked` and `Bitmap` blocks are decoded and re-packed into a single
/// contiguous codec. For the other variants the concatenated block bodies
/// are taken as one slice of `data`.
///
/// On success `*pos` is advanced to the end of the block bodies.
///
/// # Errors
///
/// Returns a static message when the input is truncated, a block lies
/// outside the input, a dictionary string is not valid UTF-8, or the tag
/// byte is unknown. `*pos` may have been partially advanced when an error
/// is returned.
pub fn read_from_v2(data: &Bytes, pos: &mut usize) -> Result<Self, &'static str> {
    let bytes: &[u8] = data.as_ref();

    // Reads the length-prefixed dictionary string table at `*pos`.
    let read_dict_entries = |pos: &mut usize| -> Result<Vec<Arc<str>>, &'static str> {
        let dict_len = read_u32_le(bytes, pos)? as usize;
        // `dict_len` comes from the input. Every entry needs at least a
        // 4-byte length prefix, so cap the pre-allocation by what the
        // remaining input could possibly hold.
        let plausible = bytes.len().saturating_sub(*pos) / 4;
        let mut entries: Vec<Arc<str>> = Vec::with_capacity(dict_len.min(plausible));
        for _ in 0..dict_len {
            let slen = read_u32_le(bytes, pos)? as usize;
            let end = pos
                .checked_add(slen)
                .filter(|&end| end <= bytes.len())
                .ok_or("truncated dict string")?;
            let s = std::str::from_utf8(&bytes[*pos..end]).map_err(|_| "invalid UTF-8 in dict")?;
            entries.push(Arc::from(s));
            *pos = end;
        }
        Ok(entries)
    };

    // Resolves a block's start offset, checking that the whole block lies
    // inside the input.
    let block_start = |bodies_start: usize,
                       byte_offset: u32,
                       byte_len: u32,
                       err: &'static str|
     -> Result<usize, &'static str> {
        let start = bodies_start
            .checked_add(byte_offset as usize)
            .ok_or(err)?;
        let end = start.checked_add(byte_len as usize).ok_or(err)?;
        if end > bytes.len() {
            return Err(err);
        }
        Ok(start)
    };

    // Reads a word-count prefix followed by that many `u64` words.
    let read_words = |body_start: usize| -> Result<Vec<u64>, &'static str> {
        let mut cursor = body_start;
        let word_count = read_u32_le(bytes, &mut cursor)? as usize;
        // Cap the pre-allocation: a corrupt count must not reserve more
        // words than the remaining input can contain.
        let plausible = bytes.len().saturating_sub(cursor) / 8;
        let mut words: Vec<u64> = Vec::with_capacity(word_count.min(plausible));
        for _ in 0..word_count {
            words.push(read_u64_le(bytes, &mut cursor)?);
        }
        Ok(words)
    };

    // Slices `total` bytes of block bodies and returns them with the end offset.
    let slice_bodies = |bodies_start: usize,
                        total: usize,
                        err: &'static str|
     -> Result<(Bytes, usize), &'static str> {
        let end = bodies_start
            .checked_add(total)
            .filter(|&end| end <= bytes.len())
            .ok_or(err)?;
        Ok((data.slice(bodies_start..end), end))
    };

    let discriminant = *bytes.get(*pos).ok_or("truncated codec discriminant")?;
    *pos += 1;
    match discriminant {
        0 => {
            let bits = *bytes.get(*pos).ok_or("truncated bits_per_value")?;
            *pos += 1;
            let (metas, bodies_start) = read_block_index(bytes, pos)?;
            let mut all_values: Vec<u64> = Vec::new();
            for meta in &metas {
                let body_start = block_start(
                    bodies_start,
                    meta.byte_offset,
                    meta.byte_len,
                    "BitPacked block body out of bounds",
                )?;
                let words = read_words(body_start)?;
                let block_bp = crate::codec::BitPackedInts::from_raw_parts(
                    words,
                    bits,
                    meta.row_count as usize,
                );
                for j in 0..meta.row_count as usize {
                    all_values.push(
                        block_bp
                            .get(j)
                            .ok_or("BitPacked block index out of range")?,
                    );
                }
            }
            *pos = bodies_start + total_bodies_len(&metas);
            Ok(Self::BitPacked(
                crate::codec::BitPackedInts::pack_with_bits(&all_values, bits),
            ))
        }
        1 => {
            let entries = read_dict_entries(&mut *pos)?;
            let (metas, bodies_start) = read_block_index(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (codes_bytes, end) =
                slice_bodies(bodies_start, total, "Dict v2 bodies out of bounds")?;
            *pos = end;
            Ok(Self::Dict(DictionaryEncoding::from_bytes_storage(
                Arc::from(entries.into_boxed_slice()),
                codes_bytes,
                total / 4,
            )))
        }
        2 => {
            let (metas, bodies_start) = read_block_index(bytes, pos)?;
            let mut all_bits: Vec<bool> = Vec::new();
            for meta in &metas {
                // Same bounds check the BitPacked arm performs, so the
                // cursor returned below cannot point past the input.
                let body_start = block_start(
                    bodies_start,
                    meta.byte_offset,
                    meta.byte_len,
                    "Bitmap block body out of bounds",
                )?;
                let words = read_words(body_start)?;
                let block_bv =
                    crate::codec::BitVector::from_raw_parts(words, meta.row_count as usize);
                for j in 0..meta.row_count as usize {
                    all_bits.push(block_bv.get(j).ok_or("Bitmap block index out of range")?);
                }
            }
            *pos = bodies_start + total_bodies_len(&metas);
            Ok(Self::Bitmap(crate::codec::BitVector::from_bools(&all_bits)))
        }
        3 => {
            let dimensions = read_u16_le(bytes, pos)?;
            let (metas, bodies_start) = read_block_index(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (storage, end) =
                slice_bodies(bodies_start, total, "Int8Vector v2 bodies out of bounds")?;
            *pos = end;
            Ok(Self::Int8Vector {
                bytes: storage,
                dimensions,
            })
        }
        4 => {
            let (metas, bodies_start) = read_block_index(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (storage, end) =
                slice_bodies(bodies_start, total, "Float64 v2 bodies out of bounds")?;
            *pos = end;
            Ok(Self::Float64(F64Store::Mapped(storage)))
        }
        5 => {
            let dimensions = read_u16_le(bytes, pos)?;
            let (metas, bodies_start) = read_block_index(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (storage, end) =
                slice_bodies(bodies_start, total, "Float32Vector v2 bodies out of bounds")?;
            *pos = end;
            Ok(Self::Float32Vector {
                bytes: storage,
                dimensions,
            })
        }
        6 => {
            let (metas, bodies_start) = read_block_index(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (storage, end) =
                slice_bodies(bodies_start, total, "RawI64 v2 bodies out of bounds")?;
            *pos = end;
            Ok(Self::RawI64(I64Store::Mapped(storage)))
        }
        _ => Err("unknown codec discriminant"),
    }
}
/// Decodes one codec in the blocked (v3) layout produced by `write_to_v3`,
/// starting at `*pos` in `data`, and returns it together with the zone
/// maps stored in the block index.
///
/// `BitPacked` and `Bitmap` blocks are decoded and re-packed into a single
/// contiguous codec. For the other variants the concatenated block bodies
/// are taken as one slice of `data`.
///
/// On success `*pos` is advanced to the end of the block bodies.
///
/// # Errors
///
/// Returns a static message when the input is truncated, a block lies
/// outside the input, a dictionary string is not valid UTF-8, or the tag
/// byte is unknown. `*pos` may have been partially advanced when an error
/// is returned.
pub fn read_from_v3(
    data: &Bytes,
    pos: &mut usize,
) -> Result<(Self, Vec<super::zone_map::ZoneMap>), &'static str> {
    let bytes: &[u8] = data.as_ref();

    // Reads the length-prefixed dictionary string table at `*pos`.
    let read_dict_entries = |pos: &mut usize| -> Result<Vec<Arc<str>>, &'static str> {
        let dict_len = read_u32_le(bytes, pos)? as usize;
        // `dict_len` comes from the input. Every entry needs at least a
        // 4-byte length prefix, so cap the pre-allocation by what the
        // remaining input could possibly hold.
        let plausible = bytes.len().saturating_sub(*pos) / 4;
        let mut entries: Vec<Arc<str>> = Vec::with_capacity(dict_len.min(plausible));
        for _ in 0..dict_len {
            let slen = read_u32_le(bytes, pos)? as usize;
            let end = pos
                .checked_add(slen)
                .filter(|&end| end <= bytes.len())
                .ok_or("truncated dict string")?;
            let s = std::str::from_utf8(&bytes[*pos..end]).map_err(|_| "invalid UTF-8 in dict")?;
            entries.push(Arc::from(s));
            *pos = end;
        }
        Ok(entries)
    };

    // Resolves a block's start offset, checking that the whole block lies
    // inside the input.
    let block_start = |bodies_start: usize,
                       byte_offset: u32,
                       byte_len: u32,
                       err: &'static str|
     -> Result<usize, &'static str> {
        let start = bodies_start
            .checked_add(byte_offset as usize)
            .ok_or(err)?;
        let end = start.checked_add(byte_len as usize).ok_or(err)?;
        if end > bytes.len() {
            return Err(err);
        }
        Ok(start)
    };

    // Reads a word-count prefix followed by that many `u64` words.
    let read_words = |body_start: usize| -> Result<Vec<u64>, &'static str> {
        let mut cursor = body_start;
        let word_count = read_u32_le(bytes, &mut cursor)? as usize;
        // Cap the pre-allocation: a corrupt count must not reserve more
        // words than the remaining input can contain.
        let plausible = bytes.len().saturating_sub(cursor) / 8;
        let mut words: Vec<u64> = Vec::with_capacity(word_count.min(plausible));
        for _ in 0..word_count {
            words.push(read_u64_le(bytes, &mut cursor)?);
        }
        Ok(words)
    };

    // Slices `total` bytes of block bodies and returns them with the end offset.
    let slice_bodies = |bodies_start: usize,
                        total: usize,
                        err: &'static str|
     -> Result<(Bytes, usize), &'static str> {
        let end = bodies_start
            .checked_add(total)
            .filter(|&end| end <= bytes.len())
            .ok_or(err)?;
        Ok((data.slice(bodies_start..end), end))
    };

    let discriminant = *bytes.get(*pos).ok_or("truncated codec discriminant")?;
    *pos += 1;
    match discriminant {
        0 => {
            let bits = *bytes.get(*pos).ok_or("truncated bits_per_value")?;
            *pos += 1;
            let (metas, stats, bodies_start) = read_block_index_v3(bytes, pos)?;
            let mut all_values: Vec<u64> = Vec::new();
            for meta in &metas {
                let body_start = block_start(
                    bodies_start,
                    meta.byte_offset,
                    meta.byte_len,
                    "BitPacked block body out of bounds",
                )?;
                let words = read_words(body_start)?;
                let block_bp = crate::codec::BitPackedInts::from_raw_parts(
                    words,
                    bits,
                    meta.row_count as usize,
                );
                for j in 0..meta.row_count as usize {
                    all_values.push(
                        block_bp
                            .get(j)
                            .ok_or("BitPacked block index out of range")?,
                    );
                }
            }
            *pos = bodies_start + total_bodies_len(&metas);
            Ok((
                Self::BitPacked(crate::codec::BitPackedInts::pack_with_bits(
                    &all_values,
                    bits,
                )),
                stats,
            ))
        }
        1 => {
            let entries = read_dict_entries(&mut *pos)?;
            let (metas, stats, bodies_start) = read_block_index_v3(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (codes_bytes, end) =
                slice_bodies(bodies_start, total, "Dict v3 bodies out of bounds")?;
            *pos = end;
            Ok((
                Self::Dict(DictionaryEncoding::from_bytes_storage(
                    Arc::from(entries.into_boxed_slice()),
                    codes_bytes,
                    total / 4,
                )),
                stats,
            ))
        }
        2 => {
            let (metas, stats, bodies_start) = read_block_index_v3(bytes, pos)?;
            let mut all_bits: Vec<bool> = Vec::new();
            for meta in &metas {
                // Same bounds check the BitPacked arm performs, so the
                // cursor returned below cannot point past the input.
                let body_start = block_start(
                    bodies_start,
                    meta.byte_offset,
                    meta.byte_len,
                    "Bitmap block body out of bounds",
                )?;
                let words = read_words(body_start)?;
                let block_bv =
                    crate::codec::BitVector::from_raw_parts(words, meta.row_count as usize);
                for j in 0..meta.row_count as usize {
                    all_bits.push(block_bv.get(j).ok_or("Bitmap block index out of range")?);
                }
            }
            *pos = bodies_start + total_bodies_len(&metas);
            Ok((
                Self::Bitmap(crate::codec::BitVector::from_bools(&all_bits)),
                stats,
            ))
        }
        3 => {
            let dimensions = read_u16_le(bytes, pos)?;
            let (metas, stats, bodies_start) = read_block_index_v3(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (storage, end) =
                slice_bodies(bodies_start, total, "Int8Vector v3 bodies out of bounds")?;
            *pos = end;
            Ok((
                Self::Int8Vector {
                    bytes: storage,
                    dimensions,
                },
                stats,
            ))
        }
        4 => {
            let (metas, stats, bodies_start) = read_block_index_v3(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (storage, end) =
                slice_bodies(bodies_start, total, "Float64 v3 bodies out of bounds")?;
            *pos = end;
            Ok((Self::Float64(F64Store::Mapped(storage)), stats))
        }
        5 => {
            let dimensions = read_u16_le(bytes, pos)?;
            let (metas, stats, bodies_start) = read_block_index_v3(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (storage, end) =
                slice_bodies(bodies_start, total, "Float32Vector v3 bodies out of bounds")?;
            *pos = end;
            Ok((
                Self::Float32Vector {
                    bytes: storage,
                    dimensions,
                },
                stats,
            ))
        }
        6 => {
            let (metas, stats, bodies_start) = read_block_index_v3(bytes, pos)?;
            let total = total_bodies_len(&metas);
            let (storage, end) =
                slice_bodies(bodies_start, total, "RawI64 v3 bodies out of bounds")?;
            *pos = end;
            Ok((Self::RawI64(I64Store::Mapped(storage)), stats))
        }
        _ => Err("unknown codec discriminant"),
    }
}
/// Returns the number of payload bytes held by this column.
///
/// * `Dict`: four bytes per stored code plus the summed byte length of every
///   dictionary string.
/// * `BitPacked` / `Bitmap`: length of the codec's `data_bytes()` buffer.
/// * `Float64` / `RawI64`: the backing store's `byte_len()`.
/// * `Int8Vector` / `Float32Vector`: length of the raw byte buffer.
#[must_use]
pub fn heap_bytes(&self) -> usize {
    match self {
        Self::Dict(dict) => {
            let string_bytes: usize = dict.dictionary().iter().map(|entry| entry.len()).sum();
            // Each code is accounted for as a 4-byte value.
            let code_bytes = dict.code_count() * 4;
            code_bytes + string_bytes
        }
        Self::BitPacked(packed) => packed.data_bytes().len(),
        Self::Bitmap(bits) => bits.data_bytes().len(),
        Self::Float64(values) => values.byte_len(),
        Self::RawI64(values) => values.byte_len(),
        // Both vector variants keep their payload in a flat byte buffer.
        Self::Int8Vector { bytes, .. } | Self::Float32Vector { bytes, .. } => bytes.len(),
    }
}
}
/// Index entry describing one block body inside a serialized column.
///
/// The block-index readers and writers in this file encode each field as a
/// little-endian `u32`, in declaration order.
#[derive(Debug, Clone, Copy)]
struct BlockMeta {
    /// Offset of the block body, relative to the start of the bodies region.
    byte_offset: u32,
    /// Length of the block body in bytes.
    byte_len: u32,
    /// Number of rows stored in the block.
    row_count: u32,
}

/// Serialized size of the three `u32` fields of one [`BlockMeta`] entry.
const BLOCK_META_BYTES: usize = 12;

/// Returns the length of the bodies region described by `metas`.
///
/// The length is the end offset (`byte_offset + byte_len`) of the last entry,
/// or `0` when the index is empty. The sum is computed in `usize` (saturating)
/// rather than `u32`, so an index that has not been through
/// `validate_block_metas` cannot overflow and report a too-small region.
fn total_bodies_len(metas: &[BlockMeta]) -> usize {
    metas.last().map_or(0, |m| {
        (m.byte_offset as usize).saturating_add(m.byte_len as usize)
    })
}
fn write_block_index_and_bodies(buf: &mut Vec<u8>, metas: &[BlockMeta], bodies: &[u8]) {
write_usize_as_u32(buf, metas.len());
for meta in metas {
buf.extend_from_slice(&meta.byte_offset.to_le_bytes());
buf.extend_from_slice(&meta.byte_len.to_le_bytes());
buf.extend_from_slice(&meta.row_count.to_le_bytes());
}
buf.extend_from_slice(bodies);
}
/// Appends a block index with inline per-block statistics, then the bodies.
///
/// Layout written: the entry count as a little-endian `u32`, then for every
/// entry its `byte_offset`, `byte_len` and `row_count` (each a little-endian
/// `u32`) immediately followed by that block's zone map as emitted by
/// `write_inline`, then `bodies` verbatim.
///
/// `stats` is expected to hold one zone map per entry of `metas`; this is
/// checked only in debug builds. Entries are paired with `zip`, so in release
/// builds any surplus on either side is silently not written.
///
/// # Panics
///
/// Panics (via `write_usize_as_u32`) if `metas.len()` does not fit in a `u32`.
fn write_block_index_and_bodies_with_stats(
    buf: &mut Vec<u8>,
    metas: &[BlockMeta],
    bodies: &[u8],
    stats: &[super::zone_map::ZoneMap],
) {
    debug_assert_eq!(metas.len(), stats.len(), "stats must align with metas");
    write_usize_as_u32(buf, metas.len());
    for (block, zone_map) in metas.iter().zip(stats) {
        // Field order is part of the on-disk format.
        for field in [block.byte_offset, block.byte_len, block.row_count] {
            buf.extend_from_slice(&field.to_le_bytes());
        }
        zone_map.write_inline(buf);
    }
    buf.extend_from_slice(bodies);
}
/// Reads a block index (without inline statistics) starting at `*pos`.
///
/// Expects a little-endian `u32` entry count followed by that many
/// `BLOCK_META_BYTES`-sized entries (`byte_offset`, `byte_len`, `row_count`).
/// On success `*pos` is left just past the index and the function returns the
/// parsed entries together with that position, which is where the block
/// bodies begin.
///
/// # Errors
///
/// * `"truncated u32"` if the entry count itself cannot be read.
/// * `"block index overflow"` if the index size does not fit in `usize`.
/// * `"truncated block index"` if the index extends past the end of `data`.
/// * Any error reported by `validate_block_metas`.
fn read_block_index(data: &[u8], pos: &mut usize) -> Result<(Vec<BlockMeta>, usize), &'static str> {
    let block_count = read_u32_le(data, pos)? as usize;
    let index_bytes = block_count
        .checked_mul(BLOCK_META_BYTES)
        .ok_or("block index overflow")?;
    // Checked addition: on targets with a 32-bit `usize` the plain sum can
    // wrap and let a truncated index slip past the bounds test below.
    let index_end = pos
        .checked_add(index_bytes)
        .ok_or("truncated block index")?;
    if index_end > data.len() {
        return Err("truncated block index");
    }
    // The bounds test above guarantees the index fits in `data`, so this
    // pre-allocation is bounded by the input size.
    let mut metas = Vec::with_capacity(block_count);
    for _ in 0..block_count {
        let byte_offset = read_u32_le(data, pos)?;
        let byte_len = read_u32_le(data, pos)?;
        let row_count = read_u32_le(data, pos)?;
        metas.push(BlockMeta {
            byte_offset,
            byte_len,
            row_count,
        });
    }
    validate_block_metas(&metas)?;
    let bodies_start = *pos;
    Ok((metas, bodies_start))
}
/// Checks that the block index describes one gap-free, non-overlapping run of
/// bodies starting at offset zero.
///
/// Every entry must begin exactly where the previous one ended, and the end of
/// the final entry must still be representable as a `u32`.
///
/// # Errors
///
/// * `"non-contiguous block index (gap or overlap)"` if an entry's offset
///   differs from the running end offset.
/// * `"block byte_len overflow"` if the running end offset overflows `u64`.
/// * `"block bodies exceed u32 range"` if the final end offset is larger than
///   `u32::MAX`.
fn validate_block_metas(metas: &[BlockMeta]) -> Result<(), &'static str> {
    // Walk the entries while tracking where the next body has to start.
    let bodies_end = metas.iter().try_fold(0u64, |cursor, block| {
        if u64::from(block.byte_offset) == cursor {
            cursor
                .checked_add(u64::from(block.byte_len))
                .ok_or("block byte_len overflow")
        } else {
            Err("non-contiguous block index (gap or overlap)")
        }
    })?;

    if bodies_end <= u64::from(u32::MAX) {
        Ok(())
    } else {
        Err("block bodies exceed u32 range")
    }
}
fn read_block_index_v3(
data: &[u8],
pos: &mut usize,
) -> Result<(Vec<BlockMeta>, Vec<super::zone_map::ZoneMap>, usize), &'static str> {
let block_count = read_u32_le(data, pos)? as usize;
let mut metas = Vec::with_capacity(block_count);
let mut stats = Vec::with_capacity(block_count);
for _ in 0..block_count {
let byte_offset = read_u32_le(data, pos)?;
let byte_len = read_u32_le(data, pos)?;
let row_count = read_u32_le(data, pos)?;
let zm = super::zone_map::ZoneMap::read_inline(data, pos)?;
metas.push(BlockMeta {
byte_offset,
byte_len,
row_count,
});
stats.push(zm);
}
validate_block_metas(&metas)?;
let bodies_start = *pos;
Ok((metas, stats, bodies_start))
}
/// Appends `v` to `buf` as a little-endian `u32`.
///
/// # Panics
///
/// Panics if `v` is larger than `u32::MAX`.
fn write_usize_as_u32(buf: &mut Vec<u8>, v: usize) {
    let encoded = u32::try_from(v)
        .expect("value exceeds u32::MAX in compact codec serialization")
        .to_le_bytes();
    buf.extend_from_slice(&encoded);
}
/// Reads a little-endian `u16` from `data` at `*pos` and advances `*pos` by 2.
///
/// `*pos` is left unchanged when the read fails.
///
/// # Errors
///
/// Returns `"truncated u16"` if fewer than two bytes are available at `*pos`,
/// including when `*pos` is so large that `*pos + 2` would overflow.
fn read_u16_le(data: &[u8], pos: &mut usize) -> Result<u16, &'static str> {
    const TRUNCATED: &str = "truncated u16";
    let start = *pos;
    // Checked arithmetic plus `get` turns an out-of-range cursor into an
    // error instead of an overflow or indexing panic.
    let end = start.checked_add(2).ok_or(TRUNCATED)?;
    let chunk = data.get(start..end).ok_or(TRUNCATED)?;
    let mut raw = [0u8; 2];
    raw.copy_from_slice(chunk);
    *pos = end;
    Ok(u16::from_le_bytes(raw))
}
/// Reads a little-endian `u32` from `data` at `*pos` and advances `*pos` by 4.
///
/// `*pos` is left unchanged when the read fails.
///
/// # Errors
///
/// Returns `"truncated u32"` if fewer than four bytes are available at `*pos`,
/// including when `*pos` is so large that `*pos + 4` would overflow.
fn read_u32_le(data: &[u8], pos: &mut usize) -> Result<u32, &'static str> {
    const TRUNCATED: &str = "truncated u32";
    let start = *pos;
    // Checked arithmetic plus `get` turns an out-of-range cursor into an
    // error instead of an overflow or indexing panic.
    let end = start.checked_add(4).ok_or(TRUNCATED)?;
    let chunk = data.get(start..end).ok_or(TRUNCATED)?;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(chunk);
    *pos = end;
    Ok(u32::from_le_bytes(raw))
}
/// Reads a little-endian `u64` from `data` at `*pos` and advances `*pos` by 8.
///
/// `*pos` is left unchanged when the read fails.
///
/// # Errors
///
/// Returns `"truncated u64"` if fewer than eight bytes are available at
/// `*pos`, including when `*pos` is so large that `*pos + 8` would overflow.
fn read_u64_le(data: &[u8], pos: &mut usize) -> Result<u64, &'static str> {
    const TRUNCATED: &str = "truncated u64";
    let start = *pos;
    // Checked arithmetic plus `get` turns an out-of-range cursor into an
    // error instead of an overflow or indexing panic, and removes the need
    // for an `unwrap()` on the slice-to-array conversion.
    let end = start.checked_add(8).ok_or(TRUNCATED)?;
    let chunk = data.get(start..end).ok_or(TRUNCATED)?;
    let mut raw = [0u8; 8];
    raw.copy_from_slice(chunk);
    *pos = end;
    Ok(u64::from_le_bytes(raw))
}
#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
use super::*;
use crate::codec::{BitPackedInts, BitVector, DictionaryBuilder};
#[test]
fn test_bitpacked_round_trip() {
let values = vec![0u64, 5, 10, 15, 3, 7];
let bp = BitPackedInts::pack(&values);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.len(), 6);
assert!(!col.is_empty());
for (i, &expected) in values.iter().enumerate() {
let v = col.get(i).unwrap();
assert_eq!(v, Value::Int64(expected as i64));
}
}
#[test]
fn test_dict_round_trip() {
let mut builder = DictionaryBuilder::new();
builder.add("alpha");
builder.add("beta");
builder.add("alpha");
let dict = builder.build();
let col = ColumnCodec::Dict(dict);
assert_eq!(col.len(), 3);
assert_eq!(col.get(0), Some(Value::String(ArcStr::from("alpha"))));
assert_eq!(col.get(1), Some(Value::String(ArcStr::from("beta"))));
assert_eq!(col.get(2), Some(Value::String(ArcStr::from("alpha"))));
}
#[test]
fn test_bitmap_round_trip() {
let bools = vec![true, false, true, true, false];
let bv = BitVector::from_bools(&bools);
let col = ColumnCodec::Bitmap(bv);
assert_eq!(col.len(), 5);
assert_eq!(col.get(0), Some(Value::Bool(true)));
assert_eq!(col.get(1), Some(Value::Bool(false)));
assert_eq!(col.get(2), Some(Value::Bool(true)));
assert_eq!(col.get(3), Some(Value::Bool(true)));
assert_eq!(col.get(4), Some(Value::Bool(false)));
}
#[test]
fn test_int8_vector_round_trip() {
let data = vec![1i8, 2, 3, -4, -5, -6];
let col = ColumnCodec::int8_vector(data, 3);
assert_eq!(col.len(), 2);
let v0 = col.get(0).unwrap();
let expected0: Vec<Value> = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
assert_eq!(v0, Value::List(Arc::from(expected0)));
let v1 = col.get(1).unwrap();
let expected1: Vec<Value> = vec![Value::Int64(-4), Value::Int64(-5), Value::Int64(-6)];
assert_eq!(v1, Value::List(Arc::from(expected1)));
}
#[test]
fn test_get_raw_u64_on_bitpacked() {
let values = vec![100u64, 200, 300];
let bp = BitPackedInts::pack(&values);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.get_raw_u64(0), Some(100));
assert_eq!(col.get_raw_u64(1), Some(200));
assert_eq!(col.get_raw_u64(2), Some(300));
assert_eq!(col.get_raw_u64(3), None);
let bv = BitVector::from_bools(&[true]);
let bm_col = ColumnCodec::Bitmap(bv);
assert_eq!(bm_col.get_raw_u64(0), None);
}
#[test]
fn test_get_int8_vector_slice() {
let data = vec![10i8, 20, 30, 40, 50, 60];
let col = ColumnCodec::int8_vector(data, 3);
assert_eq!(col.get_int8_vector(0), Some(&[10i8, 20, 30][..]));
assert_eq!(col.get_int8_vector(1), Some(&[40i8, 50, 60][..]));
assert_eq!(col.get_int8_vector(2), None);
let bp = BitPackedInts::pack(&[1u64]);
let bp_col = ColumnCodec::BitPacked(bp);
assert_eq!(bp_col.get_int8_vector(0), None);
}
#[test]
fn test_out_of_bounds_returns_none() {
let bp = BitPackedInts::pack(&[1u64, 2, 3]);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.get(999), None);
assert_eq!(col.get_raw_u64(999), None);
let bv = BitVector::from_bools(&[true]);
let bm = ColumnCodec::Bitmap(bv);
assert_eq!(bm.get(5), None);
let mut builder = DictionaryBuilder::new();
builder.add("x");
let dict = builder.build();
let dc = ColumnCodec::Dict(dict);
assert_eq!(dc.get(10), None);
let vec_col = ColumnCodec::int8_vector(vec![1, 2], 2);
assert_eq!(vec_col.get(1), None);
assert_eq!(vec_col.get_int8_vector(1), None);
}
#[test]
fn test_find_eq_bitpacked() {
let values = vec![0u64, 5, 10, 5, 3, 5];
let bp = BitPackedInts::pack(&values);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.find_eq(&Value::Int64(5)), vec![1, 3, 5]);
assert_eq!(col.find_eq(&Value::Int64(0)), vec![0]);
assert_eq!(col.find_eq(&Value::Int64(99)), Vec::<usize>::new());
assert_eq!(col.find_eq(&Value::Int64(-1)), Vec::<usize>::new());
}
#[test]
fn test_find_eq_dict() {
let mut builder = DictionaryBuilder::new();
for name in ["Vincent", "Jules", "Vincent", "Mia", "Jules"] {
builder.add(name);
}
let col = ColumnCodec::Dict(builder.build());
assert_eq!(col.find_eq(&Value::String("Vincent".into())), vec![0, 2]);
assert_eq!(col.find_eq(&Value::String("Mia".into())), vec![3]);
assert_eq!(
col.find_eq(&Value::String("Butch".into())),
Vec::<usize>::new()
);
}
#[test]
fn test_find_eq_bitmap() {
let bools = vec![true, false, true, true, false];
let col = ColumnCodec::Bitmap(BitVector::from_bools(&bools));
assert_eq!(col.find_eq(&Value::Bool(true)), vec![0, 2, 3]);
assert_eq!(col.find_eq(&Value::Bool(false)), vec![1, 4]);
}
#[test]
fn test_find_eq_type_mismatch_uses_fallback() {
let values = vec![1u64, 2, 3];
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
assert_eq!(
col.find_eq(&Value::String("hello".into())),
Vec::<usize>::new()
);
}
#[test]
fn test_find_eq_int8_vector_uses_fallback() {
let data = vec![1i8, 2, 3, 4, 5, 6];
let col = ColumnCodec::int8_vector(data, 3);
let target_vec: Vec<Value> = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
let target = Value::List(Arc::from(target_vec));
let matches = col.find_eq(&target);
assert_eq!(matches, vec![0]);
}
#[test]
fn test_int8_vector_zero_dimensions_get() {
let col = ColumnCodec::int8_vector(vec![1, 2, 3], 0);
assert_eq!(col.get(0), None);
}
#[test]
fn test_int8_vector_zero_dimensions_get_int8_vector() {
let col = ColumnCodec::int8_vector(vec![1, 2, 3], 0);
assert_eq!(col.get_int8_vector(0), None);
}
#[test]
fn test_int8_vector_zero_dimensions_len_and_is_empty() {
let col = ColumnCodec::int8_vector(vec![1, 2, 3], 0);
assert_eq!(col.len(), 0);
assert!(col.is_empty());
}
#[test]
fn test_heap_bytes_bitpacked() {
let values = vec![0u64, 5, 10, 15];
let bp = BitPackedInts::pack(&values);
let col = ColumnCodec::BitPacked(bp);
assert!(col.heap_bytes() > 0);
}
#[test]
fn test_heap_bytes_dict() {
let mut builder = DictionaryBuilder::new();
builder.add("Amsterdam");
builder.add("Berlin");
builder.add("Paris");
let dict = builder.build();
let col = ColumnCodec::Dict(dict);
assert!(col.heap_bytes() > 0);
}
#[test]
fn test_heap_bytes_bitmap() {
let bools = vec![true, false, true, true, false];
let bv = BitVector::from_bools(&bools);
let col = ColumnCodec::Bitmap(bv);
assert!(col.heap_bytes() > 0);
}
#[test]
fn test_heap_bytes_int8_vector() {
let data = vec![1i8, 2, 3, 4, 5, 6];
let col = ColumnCodec::int8_vector(data, 3);
assert_eq!(col.heap_bytes(), 6);
}
#[test]
fn test_find_in_range_bitpacked_inclusive() {
let values: Vec<u64> = (0..10).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(Some(&Value::Int64(3)), Some(&Value::Int64(6)), true, true);
assert_eq!(result, vec![3, 4, 5, 6]);
}
#[test]
fn test_find_in_range_bitpacked_exclusive() {
let values: Vec<u64> = (0..10).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result =
col.find_in_range(Some(&Value::Int64(3)), Some(&Value::Int64(6)), false, false);
assert_eq!(result, vec![4, 5]);
}
#[test]
fn test_find_in_range_bitpacked_open_ended() {
let values: Vec<u64> = (0..10).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(Some(&Value::Int64(7)), None, false, false);
assert_eq!(result, vec![8, 9]);
let result = col.find_in_range(None, Some(&Value::Int64(2)), false, true);
assert_eq!(result, vec![0, 1, 2]);
}
#[test]
fn test_find_in_range_fallback_for_dict() {
let mut builder = DictionaryBuilder::new();
for name in ["Amsterdam", "Berlin", "Paris", "Prague"] {
builder.add(name);
}
let col = ColumnCodec::Dict(builder.build());
let result = col.find_in_range(
Some(&Value::String("Berlin".into())),
Some(&Value::String("Prague".into())),
true,
true,
);
assert_eq!(result, vec![1, 2, 3]);
}
#[test]
fn test_find_in_range_negative_max() {
let values: Vec<u64> = (0..10).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(None, Some(&Value::Int64(-1)), false, true);
assert!(result.is_empty());
}
#[test]
fn test_find_in_range_negative_min() {
let values: Vec<u64> = (0..5).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(Some(&Value::Int64(-10)), None, true, true);
assert_eq!(result, vec![0, 1, 2, 3, 4]);
}
#[test]
fn test_find_in_range_type_mismatch_uses_fallback() {
let values = vec![1u64, 2, 3];
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(
Some(&Value::String("a".into())),
Some(&Value::String("z".into())),
true,
true,
);
assert!(result.is_empty());
}
#[test]
fn test_find_in_range_int8_vector_uses_fallback() {
let data = vec![1i8, 2, 3, 4, 5, 6];
let col = ColumnCodec::int8_vector(data, 3);
let result = col.find_in_range(Some(&Value::Int64(0)), Some(&Value::Int64(10)), true, true);
assert!(result.is_empty());
}
#[test]
fn test_get_out_of_bounds_all_codecs() {
let bp = BitPackedInts::pack(&[1u64, 2, 3]);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.get(3), None);
let mut builder = DictionaryBuilder::new();
builder.add("Alix");
let col = ColumnCodec::Dict(builder.build());
assert_eq!(col.get(1), None);
let bv = BitVector::from_bools(&[true]);
let col = ColumnCodec::Bitmap(bv);
assert_eq!(col.get(1), None);
let col = ColumnCodec::int8_vector(vec![1, 2, 3], 3);
assert_eq!(col.get(1), None);
assert_eq!(col.get_int8_vector(1), None);
}
#[test]
fn test_column_int8_vector_roundtrip() {
let dims: u16 = 384;
let rows = 100usize;
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let data: Vec<i8> = (0..rows * dims as usize)
.map(|idx| (((idx * 7) % 251) as i64 - 120) as i8)
.collect();
let col = ColumnCodec::int8_vector(data.clone(), dims);
assert_eq!(col.len(), rows);
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(pos, buf.len(), "read_from should consume the full buffer");
assert_eq!(decoded.len(), rows);
for &row in &[0usize, 1, 50, 99] {
let decoded_slice = decoded.get_int8_vector(row).unwrap();
let start = row * dims as usize;
assert_eq!(decoded_slice, &data[start..start + dims as usize]);
let decoded_value = decoded.get(row).unwrap();
if let Value::List(items) = decoded_value {
assert_eq!(items.len(), dims as usize);
assert_eq!(items[0], Value::Int64(i64::from(decoded_slice[0])));
} else {
panic!("expected Value::List for Int8Vector element");
}
}
}
#[test]
fn test_column_vector_oob_and_zero_dim() {
let col = ColumnCodec::int8_vector(vec![1i8, 2, 3, 4, 5, 6], 3);
assert_eq!(col.len(), 2);
assert!(col.get(2).is_none());
assert!(col.get(5).is_none());
assert!(col.get_int8_vector(2).is_none());
assert!(col.get_int8_vector(5).is_none());
let zero = ColumnCodec::int8_vector(Vec::new(), 0);
assert_eq!(zero.len(), 0);
assert!(zero.is_empty());
assert!(zero.get(0).is_none());
assert!(zero.get_int8_vector(0).is_none());
}
#[test]
fn test_find_in_range_incompatible_types() {
let mut builder = DictionaryBuilder::new();
for city in ["Amsterdam", "Berlin", "Paris", "Prague", "Barcelona"] {
builder.add(city);
}
let col = ColumnCodec::Dict(builder.build());
let result =
col.find_in_range(Some(&Value::Int64(0)), Some(&Value::Int64(100)), true, true);
assert!(
result.is_empty(),
"Int64 bounds on a Dict column should yield no matches"
);
}
#[test]
fn test_column_serde_truncated_buffer() {
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&[1u64, 2, 3, 4, 5]));
let mut buf = Vec::new();
col.write_to(&mut buf);
assert!(buf.len() > 4);
let mut pos = 0;
assert!(ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&[]), &mut pos).is_err());
let mut pos = 0;
assert!(
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf[..1]), &mut pos).is_err()
);
let mut pos = 0;
assert!(
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf[..3]), &mut pos).is_err()
);
let mut pos = 0;
assert!(
ColumnCodec::read_from(
&bytes::Bytes::copy_from_slice(&buf[..buf.len() - 1]),
&mut pos
)
.is_err()
);
let mut pos = 0;
assert!(
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&[0xFFu8]), &mut pos).is_err()
);
let mut bad = vec![3u8];
bad.extend_from_slice(&2u16.to_le_bytes());
bad.extend_from_slice(&4u32.to_le_bytes());
bad.extend_from_slice(&[0u8, 0u8]);
let mut pos = 0;
assert!(ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&bad), &mut pos).is_err());
}
#[test]
fn test_write_read_round_trip_bitpacked() {
let bp = BitPackedInts::pack(&[3u64, 7, 12, 5]);
let col = ColumnCodec::BitPacked(bp);
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(pos, buf.len(), "read should consume entire buffer");
assert_eq!(decoded.len(), 4);
for i in 0..4 {
assert_eq!(decoded.get(i), col.get(i));
}
}
#[test]
fn test_write_read_round_trip_dict() {
let mut b = DictionaryBuilder::new();
for s in ["Amsterdam", "Berlin", "Amsterdam", "Paris"] {
b.add(s);
}
let col = ColumnCodec::Dict(b.build());
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(pos, buf.len());
assert_eq!(decoded.len(), 4);
for i in 0..4 {
assert_eq!(decoded.get(i), col.get(i));
}
}
#[test]
fn test_write_read_round_trip_bitmap() {
let bv = BitVector::from_bools(&[true, false, true, true, false, false, true]);
let col = ColumnCodec::Bitmap(bv);
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(pos, buf.len());
assert_eq!(decoded.len(), 7);
for i in 0..7 {
assert_eq!(decoded.get(i), col.get(i));
}
}
#[test]
fn test_write_read_round_trip_int8_vector() {
let data: Vec<i8> = vec![1, -2, 3, -4, 5, -6, 7, -8];
let col = ColumnCodec::int8_vector(data, 4);
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(pos, buf.len());
assert_eq!(decoded.len(), 2);
assert_eq!(decoded.get_int8_vector(0), Some(&[1i8, -2, 3, -4][..]));
assert_eq!(decoded.get_int8_vector(1), Some(&[5i8, -6, 7, -8][..]));
}
#[test]
fn test_read_from_empty_buffer_errors() {
let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&[]), &mut pos).unwrap_err();
assert_eq!(err, "truncated codec discriminant");
}
#[test]
fn test_read_from_unknown_discriminant_errors() {
let buf = vec![99u8];
let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap_err();
assert_eq!(err, "unknown codec discriminant");
}
#[test]
fn test_read_from_truncated_bitpacked_bits() {
let buf = vec![0u8];
let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap_err();
assert_eq!(err, "truncated bits_per_value");
}
#[test]
fn test_read_from_truncated_bitpacked_count() {
let buf = vec![0u8, 4, 0, 0]; let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap_err();
assert_eq!(err, "truncated u32");
}
#[test]
fn test_read_from_truncated_bitpacked_words() {
let mut buf = vec![0u8, 4];
buf.extend_from_slice(&1u32.to_le_bytes()); buf.extend_from_slice(&2u32.to_le_bytes()); let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap_err();
assert_eq!(err, "truncated BitPacked data");
}
#[test]
fn test_read_from_truncated_dict_string() {
let mut buf = vec![1u8];
buf.extend_from_slice(&1u32.to_le_bytes()); buf.extend_from_slice(&5u32.to_le_bytes()); buf.extend_from_slice(b"abc"); let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap_err();
assert_eq!(err, "truncated dict string");
}
#[test]
fn test_read_from_invalid_utf8_in_dict() {
let mut buf = vec![1u8];
buf.extend_from_slice(&1u32.to_le_bytes()); buf.extend_from_slice(&2u32.to_le_bytes()); buf.extend_from_slice(&[0xFF, 0xFE]); let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap_err();
assert_eq!(err, "invalid UTF-8 in dict");
}
#[test]
fn test_read_from_truncated_bitmap_words() {
let mut buf = vec![2u8];
buf.extend_from_slice(&64u32.to_le_bytes()); buf.extend_from_slice(&1u32.to_le_bytes()); let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap_err();
assert_eq!(err, "truncated Bitmap data");
}
#[test]
fn test_read_from_truncated_int8_vector_dimensions() {
let buf = vec![3u8, 0];
let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap_err();
assert_eq!(err, "truncated u16");
}
#[test]
fn test_read_from_truncated_int8_vector_data() {
let mut buf = vec![3u8];
buf.extend_from_slice(&2u16.to_le_bytes()); buf.extend_from_slice(&4u32.to_le_bytes()); buf.extend_from_slice(&[10u8, 20]); let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap_err();
assert_eq!(err, "truncated Int8Vector data");
}
#[test]
fn test_empty_bitpacked_round_trip() {
let bp = BitPackedInts::pack(&[]);
let col = ColumnCodec::BitPacked(bp);
assert!(col.is_empty());
assert_eq!(col.len(), 0);
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(pos, buf.len());
assert!(decoded.is_empty());
}
#[test]
fn test_empty_dict_round_trip() {
let builder = DictionaryBuilder::new();
let dict = builder.build();
let col = ColumnCodec::Dict(dict);
assert!(col.is_empty());
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(pos, buf.len());
assert!(decoded.is_empty());
}
#[test]
fn test_empty_bitmap_round_trip() {
let bv = BitVector::from_bools(&[]);
let col = ColumnCodec::Bitmap(bv);
assert!(col.is_empty());
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(pos, buf.len());
assert!(decoded.is_empty());
}
#[test]
fn test_empty_int8_vector_round_trip() {
let col = ColumnCodec::int8_vector(Vec::new(), 4);
assert!(col.is_empty());
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(pos, buf.len());
assert!(decoded.is_empty());
}
#[test]
fn test_empty_string_in_dict() {
let mut b = DictionaryBuilder::new();
b.add("");
b.add("Alix");
b.add("");
let col = ColumnCodec::Dict(b.build());
assert_eq!(col.get(0), Some(Value::String(ArcStr::from(""))));
assert_eq!(col.get(1), Some(Value::String(ArcStr::from("Alix"))));
assert_eq!(col.get(2), Some(Value::String(ArcStr::from(""))));
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos).unwrap();
assert_eq!(decoded.get(0), Some(Value::String(ArcStr::from(""))));
assert_eq!(decoded.get(2), Some(Value::String(ArcStr::from(""))));
}
#[test]
fn test_find_in_range_exact_boundaries_inclusive_vs_exclusive() {
let values = vec![10u64, 20, 30, 40, 50];
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let inclusive =
col.find_in_range(Some(&Value::Int64(20)), Some(&Value::Int64(40)), true, true);
assert_eq!(inclusive, vec![1, 2, 3]);
let exclusive = col.find_in_range(
Some(&Value::Int64(20)),
Some(&Value::Int64(40)),
false,
false,
);
assert_eq!(exclusive, vec![2]);
let mixed_a = col.find_in_range(
Some(&Value::Int64(20)),
Some(&Value::Int64(40)),
true,
false,
);
assert_eq!(mixed_a, vec![1, 2]);
let mixed_b = col.find_in_range(
Some(&Value::Int64(20)),
Some(&Value::Int64(40)),
false,
true,
);
assert_eq!(mixed_b, vec![2, 3]);
}
#[test]
fn test_find_in_range_bitpacked_fallback_on_float_min() {
let values = vec![1u64, 2, 3];
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(Some(&Value::Float64(2.5)), None, true, true);
assert_eq!(result, vec![2]);
}
#[test]
fn test_find_in_range_bitpacked_fallback_on_float_max() {
let values = vec![1u64, 2, 3];
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(None, Some(&Value::Float64(2.5)), true, true);
assert_eq!(result, vec![0, 1]);
}
#[test]
fn test_find_in_range_open_both_ends_returns_all() {
let values = vec![1u64, 2, 3, 4, 5];
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let all = col.find_in_range(None, None, true, true);
assert_eq!(all, vec![0, 1, 2, 3, 4]);
}
#[test]
fn test_find_in_range_fallback_dict_exclusive() {
let mut b = DictionaryBuilder::new();
for name in ["Amsterdam", "Berlin", "Paris", "Prague"] {
b.add(name);
}
let col = ColumnCodec::Dict(b.build());
let result = col.find_in_range(Some(&Value::String("Berlin".into())), None, false, true);
assert_eq!(result, vec![2, 3]);
let result = col.find_in_range(None, Some(&Value::String("Prague".into())), true, false);
assert_eq!(result, vec![0, 1, 2]); }
#[test]
fn test_find_in_range_fallback_mismatch_returns_none_for_row() {
let col = ColumnCodec::Bitmap(BitVector::from_bools(&[true, false, true]));
let result = col.find_in_range(Some(&Value::Int64(0)), Some(&Value::Int64(5)), true, true);
assert!(result.is_empty());
}
#[test]
fn test_get_raw_u64_returns_none_for_all_non_bitpacked() {
let mut b = DictionaryBuilder::new();
b.add("x");
assert_eq!(ColumnCodec::Dict(b.build()).get_raw_u64(0), None);
assert_eq!(ColumnCodec::int8_vector(vec![1i8], 1).get_raw_u64(0), None);
}
#[test]
fn test_get_int8_vector_returns_none_for_all_non_vector() {
let bp = BitPackedInts::pack(&[1u64]);
assert_eq!(ColumnCodec::BitPacked(bp).get_int8_vector(0), None);
let mut b = DictionaryBuilder::new();
b.add("x");
assert_eq!(ColumnCodec::Dict(b.build()).get_int8_vector(0), None);
let bv = BitVector::from_bools(&[true]);
assert_eq!(ColumnCodec::Bitmap(bv).get_int8_vector(0), None);
}
#[test]
fn test_heap_bytes_empty_columns() {
let bp = BitPackedInts::pack(&[]);
assert_eq!(ColumnCodec::BitPacked(bp).heap_bytes(), 0);
let builder = DictionaryBuilder::new();
assert_eq!(ColumnCodec::Dict(builder.build()).heap_bytes(), 0);
let col = ColumnCodec::int8_vector(Vec::new(), 4);
assert_eq!(col.heap_bytes(), 0);
}
#[test]
fn test_find_eq_dict_target_not_in_dictionary() {
let mut b = DictionaryBuilder::new();
b.add("Amsterdam");
b.add("Berlin");
let col = ColumnCodec::Dict(b.build());
let result = col.find_eq(&Value::String(ArcStr::from("Prague")));
assert!(result.is_empty());
}
#[test]
fn test_get_raw_u64_on_dict_and_int8_vector_returns_none() {
let mut builder = DictionaryBuilder::new();
builder.add("Vincent");
let dict_col = ColumnCodec::Dict(builder.build());
assert_eq!(dict_col.get_raw_u64(0), None);
let vec_col = ColumnCodec::int8_vector(vec![1i8, 2, 3], 3);
assert_eq!(vec_col.get_raw_u64(0), None);
}
#[test]
fn test_get_int8_vector_on_dict_and_bitmap_returns_none() {
let mut builder = DictionaryBuilder::new();
builder.add("Jules");
let dict_col = ColumnCodec::Dict(builder.build());
assert_eq!(dict_col.get_int8_vector(0), None);
let bm_col = ColumnCodec::Bitmap(BitVector::from_bools(&[true, false]));
assert_eq!(bm_col.get_int8_vector(0), None);
}
#[test]
fn test_find_in_range_dict_exclusive_bounds() {
let mut builder = DictionaryBuilder::new();
for name in ["Amsterdam", "Berlin", "Paris", "Prague"] {
builder.add(name);
}
let col = ColumnCodec::Dict(builder.build());
let result = col.find_in_range(
Some(&Value::String("Amsterdam".into())),
Some(&Value::String("Prague".into())),
false,
false,
);
assert_eq!(result, vec![1, 2]);
}
#[test]
fn test_find_in_range_dict_open_bounds() {
let mut builder = DictionaryBuilder::new();
for name in ["Amsterdam", "Berlin", "Paris", "Prague"] {
builder.add(name);
}
let col = ColumnCodec::Dict(builder.build());
let result = col.find_in_range(None, Some(&Value::String("Berlin".into())), true, true);
assert_eq!(result, vec![0, 1]);
let result = col.find_in_range(Some(&Value::String("Paris".into())), None, true, true);
assert_eq!(result, vec![2, 3]);
}
#[test]
fn test_find_in_range_fallback_uncomparable_skips_rows() {
let data = vec![1i8, 2, 3];
let col = ColumnCodec::int8_vector(data, 3);
let min = Value::Int64(0);
let max = Value::Int64(10);
let result = col.find_in_range(Some(&min), None, true, true);
assert!(result.is_empty());
let result = col.find_in_range(None, Some(&max), true, true);
assert!(result.is_empty());
}
#[test]
fn test_write_to_read_from_bitpacked_round_trip() {
let values = vec![0u64, 5, 10, 15, 3, 7];
let bp = BitPackedInts::pack(&values);
let col = ColumnCodec::BitPacked(bp);
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded = ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos)
.expect("decode should succeed");
assert_eq!(pos, buf.len());
assert_eq!(decoded.len(), col.len());
for i in 0..col.len() {
assert_eq!(decoded.get(i), col.get(i));
}
}
#[test]
fn test_write_to_read_from_dict_round_trip() {
let mut builder = DictionaryBuilder::new();
for name in ["Vincent", "Jules", "Vincent", "Mia"] {
builder.add(name);
}
let col = ColumnCodec::Dict(builder.build());
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded = ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos)
.expect("decode should succeed");
assert_eq!(pos, buf.len());
assert_eq!(decoded.len(), col.len());
for i in 0..col.len() {
assert_eq!(decoded.get(i), col.get(i));
}
}
#[test]
fn test_write_to_read_from_bitmap_round_trip() {
let bools = vec![true, false, true, true, false, false, true];
let col = ColumnCodec::Bitmap(BitVector::from_bools(&bools));
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded = ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos)
.expect("decode should succeed");
assert_eq!(pos, buf.len());
assert_eq!(decoded.len(), col.len());
for i in 0..col.len() {
assert_eq!(decoded.get(i), col.get(i));
}
}
#[test]
fn test_write_to_read_from_int8_vector_round_trip() {
let data = vec![1i8, -2, 3, -4, 5, -6, 7, -8, 9, -10, 11, -12];
let col = ColumnCodec::int8_vector(data, 4);
let mut buf = Vec::new();
col.write_to(&mut buf);
let mut pos = 0;
let decoded = ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(&buf), &mut pos)
.expect("decode should succeed");
assert_eq!(pos, buf.len());
assert_eq!(decoded.len(), col.len());
for i in 0..col.len() {
assert_eq!(decoded.get_int8_vector(i), col.get_int8_vector(i));
}
}
#[test]
fn test_read_from_truncated_discriminant() {
let data: &[u8] = &[];
let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(data), &mut pos).unwrap_err();
assert_eq!(err, "truncated codec discriminant");
}
#[test]
fn test_read_from_unknown_discriminant() {
let data: &[u8] = &[42];
let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(data), &mut pos).unwrap_err();
assert_eq!(err, "unknown codec discriminant");
}
#[test]
fn test_read_from_truncated_bits_per_value() {
let data: &[u8] = &[0];
let mut pos = 0;
let err =
ColumnCodec::read_from(&bytes::Bytes::copy_from_slice(data), &mut pos).unwrap_err();
assert_eq!(err, "truncated bits_per_value");
}
/// A tag-0 payload that ends three bytes into its data must fail with
/// `"truncated BitPacked data"`.
#[test]
fn test_read_from_truncated_bitpacked_word() {
    // NOTE(review): presumably tag, bits-per-value, then two u32 header
    // fields (both 1) followed by an incomplete word — confirm the field
    // meanings against `read_from`.
    let buf = [
        &[0u8, 4][..],
        &1u32.to_le_bytes()[..],
        &1u32.to_le_bytes()[..],
        &[0u8, 0, 0][..],
    ]
    .concat();
    let input = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let outcome = ColumnCodec::read_from(&input, &mut cursor);
    assert_eq!(outcome.unwrap_err(), "truncated BitPacked data");
}
/// A tag-1 payload whose string bytes stop short (two bytes present after a
/// u32 of 5) must fail with `"truncated dict string"`.
#[test]
fn test_read_from_dict_truncated_string() {
    // NOTE(review): the two u32 fields look like an entry count (1) and a
    // string length (5) — confirm against the dictionary reader.
    let mut buf: Vec<u8> = vec![1];
    buf.extend(1u32.to_le_bytes());
    buf.extend(5u32.to_le_bytes());
    buf.extend_from_slice(b"ab");
    let input = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let outcome = ColumnCodec::read_from(&input, &mut cursor);
    assert_eq!(outcome.unwrap_err(), "truncated dict string");
}
/// A tag-1 payload carrying the bytes `0xFF 0xFE` as string data must fail
/// with `"invalid UTF-8 in dict"`.
#[test]
fn test_read_from_dict_invalid_utf8() {
    // NOTE(review): the u32 fields look like an entry count (1) and a
    // string length (2) — confirm against the dictionary reader.
    let buf = [
        &[1u8][..],
        &1u32.to_le_bytes()[..],
        &2u32.to_le_bytes()[..],
        &[0xFFu8, 0xFE][..],
    ]
    .concat();
    let input = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let outcome = ColumnCodec::read_from(&input, &mut cursor);
    assert_eq!(outcome.unwrap_err(), "invalid UTF-8 in dict");
}
/// A tag-3 payload with only three data bytes after a u16 of 2 and a u32 of
/// 6 must fail with `"truncated Int8Vector data"`.
#[test]
fn test_read_from_int8_vector_truncated_data() {
    // NOTE(review): the u16 is presumably the dimension count and the u32
    // the data length — confirm against the Int8Vector reader.
    let mut buf: Vec<u8> = vec![3];
    buf.extend(2u16.to_le_bytes());
    buf.extend(6u32.to_le_bytes());
    buf.extend([1u8, 2, 3]);
    let input = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let outcome = ColumnCodec::read_from(&input, &mut cursor);
    assert_eq!(outcome.unwrap_err(), "truncated Int8Vector data");
}
/// A tag-3 payload with a single byte where a u16 is expected must fail
/// with `"truncated u16"`.
#[test]
fn test_read_from_int8_vector_truncated_dimensions() {
    let raw = [3u8, 0];
    let input = bytes::Bytes::copy_from_slice(&raw);
    let mut cursor = 0;
    let outcome = ColumnCodec::read_from(&input, &mut cursor);
    assert_eq!(outcome.unwrap_err(), "truncated u16");
}
/// A tag-2 payload with only two bytes where a u32 is expected must fail
/// with `"truncated u32"`.
#[test]
fn test_read_from_bitmap_truncated() {
    let raw = [2u8, 0, 0];
    let input = bytes::Bytes::copy_from_slice(&raw);
    let mut cursor = 0;
    let outcome = ColumnCodec::read_from(&input, &mut cursor);
    assert_eq!(outcome.unwrap_err(), "truncated u32");
}
/// An empty `BitPacked` column survives a v1 write/read cycle and decodes
/// to a column that reports zero length and `is_empty()`.
#[test]
fn test_write_to_read_from_empty_bitpacked() {
    let mut buf = Vec::new();
    ColumnCodec::BitPacked(BitPackedInts::pack(&[])).write_to(&mut buf);

    let encoded = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let decoded = ColumnCodec::read_from(&encoded, &mut cursor).expect("decode should succeed");

    assert_eq!(decoded.len(), 0);
    assert!(decoded.is_empty());
}
/// An empty `Bitmap` column survives a v1 write/read cycle and decodes to
/// an empty column.
#[test]
fn test_write_to_read_from_empty_bitmap() {
    let mut buf = Vec::new();
    ColumnCodec::Bitmap(BitVector::from_bools(&[])).write_to(&mut buf);

    let encoded = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let decoded = ColumnCodec::read_from(&encoded, &mut cursor).expect("decode should succeed");

    assert!(decoded.is_empty());
}
/// An `int8_vector` column with no data (dimension 4) survives a v1
/// write/read cycle and decodes to length zero.
#[test]
fn test_write_to_read_from_empty_int8_vector() {
    let mut buf = Vec::new();
    ColumnCodec::int8_vector(Vec::new(), 4).write_to(&mut buf);

    let encoded = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let decoded = ColumnCodec::read_from(&encoded, &mut cursor).expect("decode should succeed");

    assert_eq!(decoded.len(), 0);
}
/// `get` on a `raw_i64` column yields `Value::Int64` for every stored
/// value (including the `i64` extremes) and `None` one past the end.
#[test]
fn test_raw_i64_get_decodes_as_int64() {
    let stored = [-100, 0, 42, i64::MIN, i64::MAX];
    let col = ColumnCodec::raw_i64(stored.to_vec());
    assert_eq!(col.len(), 5);
    for (row, &want) in stored.iter().enumerate() {
        assert_eq!(col.get(row), Some(Value::Int64(want)));
    }
    // Index 5 is the first out-of-range row.
    assert_eq!(col.get(stored.len()), None);
}
/// `find_eq` on a `raw_i64` column returns every matching row offset, an
/// empty list for absent values, and an empty list for a `Float64` probe.
#[test]
fn test_raw_i64_find_eq() {
    let col = ColumnCodec::raw_i64(vec![-50, 10, -50, 20, 0, -50]);
    let cases = [
        (Value::Int64(-50), vec![0, 2, 5]),
        (Value::Int64(10), vec![1]),
        (Value::Int64(0), vec![4]),
        (Value::Int64(999), Vec::<usize>::new()),
        (Value::Float64(10.0), Vec::<usize>::new()),
    ];
    for (needle, expected) in &cases {
        assert_eq!(col.find_eq(needle), *expected);
    }
}
/// `find_in_range` on a `raw_i64` column with negative values: checks
/// closed, open, and half-unbounded queries against the expected offsets.
#[test]
fn test_raw_i64_find_in_range_signed_ordering() {
    let col = ColumnCodec::raw_i64(vec![-10, -5, 0, 5, 10, -100, 100]);
    // Wraps the optional integer bounds in `Value::Int64` and forwards the
    // two boolean flags unchanged.
    let query = |lo: Option<i64>, hi: Option<i64>, lo_flag: bool, hi_flag: bool| {
        let lo = lo.map(Value::Int64);
        let hi = hi.map(Value::Int64);
        col.find_in_range(lo.as_ref(), hi.as_ref(), lo_flag, hi_flag)
    };

    assert_eq!(query(Some(-5), Some(5), true, true), vec![1, 2, 3]);
    assert_eq!(query(Some(-5), Some(5), false, false), vec![2]);
    assert_eq!(query(None, Some(0), false, false), vec![0, 1, 5]);
    assert_eq!(query(Some(10), None, true, true), vec![4, 6]);
}
/// Round-trips a `raw_i64` column (including `i64::MIN`/`i64::MAX`) through
/// the v1 writer/reader and compares cursor, length and every row.
#[test]
fn test_write_to_read_from_raw_i64_round_trip() {
    let col = ColumnCodec::raw_i64(vec![-42, 0, 1, i64::MIN, i64::MAX, -1_000_000_000]);
    let mut buf = Vec::new();
    col.write_to(&mut buf);

    let encoded = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let decoded = ColumnCodec::read_from(&encoded, &mut cursor).expect("decode should succeed");

    assert_eq!(cursor, buf.len());
    assert_eq!(decoded.len(), col.len());
    (0..col.len()).for_each(|row| assert_eq!(decoded.get(row), col.get(row)));
}
/// An empty `raw_i64` column survives a v1 write/read cycle and decodes to
/// length zero.
#[test]
fn test_write_to_read_from_empty_raw_i64() {
    let mut buf = Vec::new();
    ColumnCodec::raw_i64(Vec::new()).write_to(&mut buf);

    let encoded = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let decoded = ColumnCodec::read_from(&encoded, &mut cursor).expect("decode should succeed");

    assert_eq!(decoded.len(), 0);
}
/// `heap_bytes` of a `raw_i64` column equals the size of its stored `i64`
/// values: three values occupy `size_of::<[i64; 3]>()`, none occupy zero.
#[test]
fn test_raw_i64_heap_bytes() {
    let three = ColumnCodec::raw_i64(vec![-1, 2, -3]);
    let none = ColumnCodec::raw_i64(Vec::new());
    assert_eq!(three.heap_bytes(), std::mem::size_of::<[i64; 3]>());
    assert_eq!(none.heap_bytes(), 0);
}
/// Small columns of every codec kind constructed here report exactly one
/// block.
#[test]
fn alix_block_count_is_one_for_every_codec() {
    let mut builder = DictionaryBuilder::new();
    builder.add("x");

    let codecs = [
        ColumnCodec::BitPacked(BitPackedInts::pack(&[1, 2, 3])),
        ColumnCodec::Dict(builder.build()),
        ColumnCodec::Bitmap(BitVector::from_bools(&[true, false])),
        ColumnCodec::int8_vector(vec![1i8, 2, 3, 4], 2),
        ColumnCodec::float64(vec![1.0, 2.0]),
        ColumnCodec::float32_vector(vec![1.0f32, 2.0, 3.0, 4.0], 2),
        ColumnCodec::raw_i64(vec![-1, 2, -3]),
    ];
    for codec in &codecs {
        assert_eq!(codec.block_count(), 1);
    }
}
/// For single-block columns, `block_at(0)` reports a `row_count` equal to
/// the number of rows in the column.
#[test]
fn gus_block_at_zero_carries_full_row_count() {
    let packed = ColumnCodec::BitPacked(BitPackedInts::pack(&[1, 2, 3, 4, 5]));
    let packed_rows = packed.block_at(0).expect("block 0 exists").row_count;
    assert_eq!(packed_rows, 5);

    let raw = ColumnCodec::raw_i64(vec![-1, 2, -3, 4]);
    let raw_rows = raw.block_at(0).expect("block 0 exists").row_count;
    assert_eq!(raw_rows, 4);
}
/// An empty `raw_i64` column still exposes one block, and that block has a
/// `row_count` of zero.
#[test]
fn vincent_empty_column_has_one_zero_row_block() {
    let col = ColumnCodec::raw_i64(Vec::new());
    assert_eq!(col.block_count(), 1);
    let only_block = col.block_at(0).expect("zero-row block at 0");
    assert_eq!(only_block.row_count, 0);
}
/// `block_at` returns `None` for indices past the only block, including
/// `usize::MAX`.
#[test]
fn jules_block_at_out_of_bounds_returns_none() {
    let col = ColumnCodec::BitPacked(BitPackedInts::pack(&[1, 2, 3]));
    for index in [1, usize::MAX] {
        assert!(col.block_at(index).is_none());
    }
}
/// `block_iter` over a four-row `raw_i64` column yields a single entry
/// whose `row_count` is 4.
#[test]
fn mia_block_iter_yields_block_count_entries() {
    let col = ColumnCodec::raw_i64(vec![-1, 2, -3, 4]);
    let blocks: Vec<_> = col.block_iter().collect();
    assert_eq!(blocks.len(), 1);
    let first = &blocks[0];
    assert_eq!(first.row_count, 4);
}
/// For several codecs, the `row_count` values produced by `block_iter`
/// add up to the column's `len()`.
#[test]
fn butch_block_row_counts_sum_to_column_len() {
    let columns = [
        ColumnCodec::BitPacked(BitPackedInts::pack(&[1, 2, 3, 4, 5, 6, 7])),
        ColumnCodec::float64(vec![1.0, 2.0, 3.0]),
        ColumnCodec::raw_i64(vec![-1, 2, -3, 4, -5]),
    ];
    for column in &columns {
        let rows_in_blocks = column.block_iter().map(|block| block.row_count).sum::<u32>();
        assert_eq!(rows_in_blocks as usize, column.len());
    }
}
/// Test helper: writes `col` with `write_to_v2`, reads it back with
/// `read_from_v2`, and asserts that the reader consumed the whole buffer,
/// that the length is unchanged, and that every row compares equal.
///
/// Panics (via `expect`/`assert_eq!`) on any mismatch.
fn assert_round_trip_v2_equals(col: &ColumnCodec) {
    let mut written = Vec::new();
    col.write_to_v2(&mut written);

    let encoded = bytes::Bytes::copy_from_slice(&written);
    let mut pos = 0;
    let recovered = ColumnCodec::read_from_v2(&encoded, &mut pos).expect("v2 round-trip");

    assert_eq!(pos, written.len(), "v2 reader should consume entire buffer");
    assert_eq!(recovered.len(), col.len(), "len after v2 round-trip");
    (0..col.len()).for_each(|i| {
        assert_eq!(
            recovered.get(i),
            col.get(i),
            "value at row {i} after v2 round-trip"
        );
    });
}
/// A five-row `raw_i64` column is a single block and round-trips through
/// the v2 format.
#[test]
fn django_v2_round_trip_raw_i64_single_block() {
    let column = ColumnCodec::raw_i64(vec![-1, 2, -3, 4, -5]);
    let blocks = column.block_count();
    assert_eq!(blocks, 1);
    assert_round_trip_v2_equals(&column);
}
/// A 2049-row `raw_i64` column (values `-1024..=1024`) is expected to span
/// three blocks and must round-trip through the v2 format.
#[test]
fn django_v2_round_trip_raw_i64_multi_block() {
    // The range is already `i64`, so no cast (and no lint suppression) is
    // needed to shift it around zero.
    let values: Vec<i64> = (0..2049i64).map(|i| i - 1024).collect();
    let col = ColumnCodec::raw_i64(values);
    assert_eq!(col.block_count(), 3);
    assert_round_trip_v2_equals(&col);
}
/// A 2500-row `BitPacked` column (values cycling through `0..16`) spans at
/// least two blocks and round-trips through the v2 format.
#[test]
fn django_v2_round_trip_bitpacked_multi_block() {
    let cycling: Vec<u64> = (0..2500u64).map(|n| n % 16).collect();
    let column = ColumnCodec::BitPacked(BitPackedInts::pack(&cycling));
    let blocks = column.block_count();
    assert!(blocks >= 2, "expect multi-block at 2500 rows");
    assert_round_trip_v2_equals(&column);
}
/// A 1500-row dictionary column cycling through three strings is expected
/// to span two blocks and must round-trip through the v2 format.
#[test]
fn django_v2_round_trip_dict_multi_block() {
    let mut builder = DictionaryBuilder::new();
    // Row i receives "alpha", "beta" or "gamma" according to i % 3.
    for label in ["alpha", "beta", "gamma"].iter().copied().cycle().take(1500) {
        builder.add(label);
    }
    let column = ColumnCodec::Dict(builder.build());
    assert_eq!(column.block_count(), 2);
    assert_round_trip_v2_equals(&column);
}
/// An 1100-row `Bitmap` column alternating `true`/`false` (starting with
/// `true`) spans at least two blocks and round-trips through the v2 format.
#[test]
fn django_v2_round_trip_bitmap_multi_block() {
    let alternating: Vec<bool> = [true, false].iter().copied().cycle().take(1100).collect();
    let column = ColumnCodec::Bitmap(BitVector::from_bools(&alternating));
    assert!(column.block_count() >= 2);
    assert_round_trip_v2_equals(&column);
}
/// An 1100-row `float64` column (row `i` holds `i * 0.5`) spans at least
/// two blocks and round-trips through the v2 format.
#[test]
fn django_v2_round_trip_float64_multi_block() {
    let halves = (0..1100u32)
        .map(f64::from)
        .map(|x| x * 0.5)
        .collect::<Vec<f64>>();
    let column = ColumnCodec::float64(halves);
    assert!(column.block_count() >= 2);
    assert_round_trip_v2_equals(&column);
}
/// A 4400-element `int8_vector` column (dimension 4) spans at least two
/// blocks and round-trips through the v2 format.
#[test]
fn django_v2_round_trip_int8_vector_multi_block() {
    // Values 128..200 deliberately wrap into the negative `i8` range.
    #[allow(clippy::cast_possible_wrap)]
    let components: Vec<i8> = (0..4400u32)
        .map(|n| n % 200)
        .map(|m| m as i8)
        .collect();
    let column = ColumnCodec::int8_vector(components, 4);
    assert!(column.block_count() >= 2);
    assert_round_trip_v2_equals(&column);
}
/// A 4400-element `float32_vector` column (dimension 4) spans at least two
/// blocks and round-trips through the v2 format.
#[test]
fn django_v2_round_trip_float32_vector_multi_block() {
    let mut components: Vec<f32> = Vec::with_capacity(4400);
    components.extend((0..4400u32).map(|n| n as f32 * 0.5));
    let column = ColumnCodec::float32_vector(components, 4);
    assert!(column.block_count() >= 2);
    assert_round_trip_v2_equals(&column);
}
/// An empty `raw_i64` column round-trips through the v2 format.
#[test]
fn shosanna_v2_round_trip_empty_column() {
    let no_rows: Vec<i64> = Vec::new();
    assert_round_trip_v2_equals(&ColumnCodec::raw_i64(no_rows));
}
/// Corrupts a multi-block v2 encoding by adding 16 to the little-endian
/// `u32` stored at bytes 17..21 and expects `read_from_v2` to return an
/// error.
#[test]
fn beatrix_v2_block_index_with_gap_is_rejected() {
    // NOTE(review): bytes 17..21 are presumably a block offset inside the
    // v2 block index — confirm against `write_to_v2`.
    const PATCHED: std::ops::Range<usize> = 17..21;

    let row_total = (crate::codec::DEFAULT_BLOCK_ROWS as i64) + 4;
    let col = ColumnCodec::raw_i64((0..row_total).collect());
    assert!(col.block_count() >= 2, "need a multi-block column");

    let mut buf = Vec::new();
    col.write_to_v2(&mut buf);

    let mut field = [0u8; 4];
    field.copy_from_slice(&buf[PATCHED]);
    let bumped = u32::from_le_bytes(field) + 16;
    buf[PATCHED].copy_from_slice(&bumped.to_le_bytes());

    let corrupted = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let result = ColumnCodec::read_from_v2(&corrupted, &mut cursor);
    assert!(
        result.is_err(),
        "reader must reject block index with a gap, got {result:?}"
    );
}
/// Corrupts a multi-block v2 encoding by zeroing the `u32` stored at bytes
/// 17..21 and expects `read_from_v2` to return an error.
#[test]
fn shosanna_v2_block_index_with_overlap_is_rejected() {
    let row_total = (crate::codec::DEFAULT_BLOCK_ROWS as i64) + 8;
    let floats: Vec<f64> = (0..row_total).map(|n| n as f64).collect();
    let col = ColumnCodec::float64(floats);
    assert!(col.block_count() >= 2);

    let mut buf = Vec::new();
    col.write_to_v2(&mut buf);
    // NOTE(review): presumably rewinds a block offset to 0 so that it
    // overlaps the previous block — confirm the field against `write_to_v2`.
    let zeroed = 0u32.to_le_bytes();
    buf[17..21].copy_from_slice(&zeroed);

    let corrupted = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let result = ColumnCodec::read_from_v2(&corrupted, &mut cursor);
    assert!(
        result.is_err(),
        "reader must reject overlapping block index, got {result:?}"
    );
}
/// A `float32_vector` column with no data and dimension 0 has length zero
/// and round-trips through the v2 format to a zero-length column.
#[test]
fn vincent_zero_dimension_float32_vector_v2_round_trip() {
    let column = ColumnCodec::float32_vector(Vec::new(), 0);
    assert_eq!(column.len(), 0);
    assert_round_trip_v2_equals(&column);

    // Decode once more directly to check the recovered length explicitly.
    let mut written = Vec::new();
    column.write_to_v2(&mut written);
    let encoded = bytes::Bytes::copy_from_slice(&written);
    let mut cursor = 0;
    let recovered = ColumnCodec::read_from_v2(&encoded, &mut cursor).expect("v2 round-trip");
    assert_eq!(recovered.len(), 0);
}
/// `write_to` and `write_to_v2` must not produce identical bytes for the
/// same column.
#[test]
fn hans_v1_and_v2_produce_different_bytes() {
    let column = ColumnCodec::raw_i64(vec![1, 2, 3, 4, 5]);
    let (mut v1, mut v2) = (Vec::new(), Vec::new());
    column.write_to(&mut v1);
    column.write_to_v2(&mut v2);
    assert_ne!(v1, v2, "v1 and v2 layouts must differ");
}
/// The v1 `write_to`/`read_from` pair still round-trips a `raw_i64`
/// column: same length and same value in every row.
#[test]
fn beatrix_v1_round_trip_still_works() {
    let column = ColumnCodec::raw_i64(vec![-1, 2, -3, 4, -5]);
    let mut written = Vec::new();
    column.write_to(&mut written);

    let encoded = bytes::Bytes::copy_from_slice(&written);
    let mut cursor = 0;
    let recovered = ColumnCodec::read_from(&encoded, &mut cursor).expect("v1 round-trip");

    assert_eq!(recovered.len(), column.len());
    (0..column.len()).for_each(|row| assert_eq!(recovered.get(row), column.get(row)));
}
/// Round-trips a 200-row `raw_i64` column through `write_to_v3` /
/// `read_from_v3` and checks cursor, length, every row, and that `find_eq`
/// for the value at row 42 agrees (and is non-empty) on both columns.
#[test]
fn test_column_codec_raw_i64_v3_round_trip() {
    let col = ColumnCodec::raw_i64((0..200i64).map(|i| (i * 7 - 100) % 991).collect());
    let mut buf = Vec::new();
    col.write_to_v3(&mut buf, None);

    let encoded = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let (decoded, _stats) = ColumnCodec::read_from_v3(&encoded, &mut cursor).expect("v3 round-trip");

    assert_eq!(cursor, buf.len(), "v3 reader should consume entire buffer");
    assert_eq!(decoded.len(), col.len());
    (0..col.len()).for_each(|i| {
        assert_eq!(decoded.get(i), col.get(i), "value at row {i}");
    });

    let probe = col.get(42).expect("row 42 exists");
    let original_hits = col.find_eq(&probe);
    assert_eq!(decoded.find_eq(&probe), original_hits);
    assert!(!original_hits.is_empty());
}
/// Round-trips a 200-row `float64` column (row `i` holds `i * PI`) through
/// the v3 format and checks cursor, length, every row, and that `find_eq`
/// for `0.0` agrees (and is non-empty) on both columns.
#[test]
fn test_column_codec_float64_v3_round_trip() {
    let multiples_of_pi: Vec<f64> = (0..200u32)
        .map(f64::from)
        .map(|x| x * std::f64::consts::PI)
        .collect();
    let col = ColumnCodec::float64(multiples_of_pi);
    let mut buf = Vec::new();
    col.write_to_v3(&mut buf, None);

    let encoded = bytes::Bytes::copy_from_slice(&buf);
    let mut cursor = 0;
    let (decoded, _stats) = ColumnCodec::read_from_v3(&encoded, &mut cursor).expect("v3 round-trip");

    assert_eq!(cursor, buf.len(), "v3 reader should consume entire buffer");
    assert_eq!(decoded.len(), col.len());
    (0..col.len()).for_each(|i| {
        assert_eq!(decoded.get(i), col.get(i), "value at row {i}");
    });

    // Row 0 holds 0 * PI == 0.0, so the probe must hit at least once.
    let probe = Value::Float64(0.0);
    let original_hits = col.find_eq(&probe);
    assert_eq!(decoded.find_eq(&probe), original_hits);
    assert!(!original_hits.is_empty());
}
/// Values passed to `ColumnCodec::raw_i64` come back unchanged from `get`
/// as `Value::Int64`, and the index one past the end yields `None`.
#[test]
fn test_raw_i64_constructor_round_trip() {
    let inputs = [-100i64, -1, 0, 1, 100, i64::MIN, i64::MAX];
    let col = ColumnCodec::raw_i64(inputs.to_vec());
    assert_eq!(col.len(), inputs.len());
    inputs.iter().copied().enumerate().for_each(|(row, want)| {
        assert_eq!(col.get(row), Some(Value::Int64(want)));
    });
    assert_eq!(col.get(inputs.len()), None);
}
/// Values passed to `ColumnCodec::float64` come back unchanged from `get`
/// as `Value::Float64`.
#[test]
fn test_float64_constructor_round_trip() {
    let inputs = [-1.5_f64, 0.0, 100.25, f64::MIN, f64::MAX];
    let col = ColumnCodec::float64(inputs.to_vec());
    assert_eq!(col.len(), inputs.len());
    inputs.iter().copied().enumerate().for_each(|(row, want)| {
        assert_eq!(col.get(row), Some(Value::Float64(want)));
    });
}
/// Six `i8` components with dimension 3 form two rows: `get(0)` yields a
/// `Value::List` of `Int64`s and `get_int8_vector` exposes each row's raw
/// slice.
#[test]
fn test_int8_vector_constructor_round_trip() {
    let col = ColumnCodec::int8_vector(vec![1i8, 2, 3, -4, -5, -6], 3);
    assert_eq!(col.len(), 2);

    let first_row = col.get(0).unwrap();
    let want: Vec<Value> = [1, 2, 3].iter().map(|&n| Value::Int64(n)).collect();
    assert_eq!(first_row, Value::List(Arc::from(want)));

    assert_eq!(col.get_int8_vector(0), Some([1i8, 2, 3].as_slice()));
    assert_eq!(col.get_int8_vector(1), Some([-4i8, -5, -6].as_slice()));
}
/// Six `f32` components with dimension 3 form two rows, and `get(0)`
/// yields a `Value::Vector` holding the first three components.
#[test]
fn test_float32_vector_constructor_round_trip() {
    let col = ColumnCodec::float32_vector(vec![1.0_f32, 2.0, 3.0, 4.0, 5.0, 6.0], 3);
    assert_eq!(col.len(), 2);

    let other = col.get(0);
    if let Some(Value::Vector(v)) = &other {
        assert_eq!(&**v, &[1.0_f32, 2.0, 3.0]);
    } else {
        panic!("expected Vector, got {other:?}");
    }
}
/// Cloning a 10 000-row `raw_i64` column preserves its length and the
/// values sampled at every 1024th row.
#[test]
fn test_bytes_backed_zero_copy_clone() {
    let original = ColumnCodec::raw_i64((0..10_000).collect::<Vec<i64>>());
    let duplicate = original.clone();
    assert_eq!(original.len(), duplicate.len());
    (0..10_000)
        .step_by(1024)
        .for_each(|row| assert_eq!(original.get(row), duplicate.get(row)));
}
/// A `raw_i64` column decoded from a `Bytes` buffer via the v1 reader has
/// the same length and row values as the column that was written.
#[test]
fn test_raw_i64_v1_round_trip_with_bytes_storage() {
    let column = ColumnCodec::raw_i64(vec![-7, 0, 7, 42]);
    let mut written = Vec::new();
    column.write_to(&mut written);

    let encoded = bytes::Bytes::copy_from_slice(&written);
    let mut cursor = 0;
    let recovered = ColumnCodec::read_from(&encoded, &mut cursor).expect("v1 round-trip");

    assert_eq!(recovered.len(), column.len());
    for row in 0..column.len() {
        let (got, want) = (recovered.get(row), column.get(row));
        assert_eq!(got, want);
    }
}
/// A `float64` column decoded from a `Bytes` buffer via the v1 reader has
/// the same length and row values as the column that was written.
#[test]
fn test_float64_v1_round_trip_with_bytes_storage() {
    let column = ColumnCodec::float64(vec![-2.5, 0.0, 1.0, std::f64::consts::PI]);
    let mut written = Vec::new();
    column.write_to(&mut written);

    let encoded = bytes::Bytes::copy_from_slice(&written);
    let mut cursor = 0;
    let recovered = ColumnCodec::read_from(&encoded, &mut cursor).expect("v1 round-trip");

    assert_eq!(recovered.len(), column.len());
    for row in 0..column.len() {
        let (got, want) = (recovered.get(row), column.get(row));
        assert_eq!(got, want);
    }
}
use crate::graph::compact::zone_map::compute_block_zone_maps;
/// Test helper: builds a `raw_i64` column holding `0, 1, ..., n - 1`, so
/// each row's value equals its offset (empty when `n <= 0`).
fn raw_i64_seq(n: i64) -> ColumnCodec {
    let ascending: Vec<i64> = (0..n).collect();
    ColumnCodec::raw_i64(ascending)
}
/// With zone maps supplied, `range_iter` over `[10, 20]` yields the same
/// offsets as the eager `find_in_range` call with identical arguments.
#[test]
fn alix_range_iter_matches_find_in_range_full_scan() {
    let col = raw_i64_seq(50);
    let zone_maps = compute_block_zone_maps(&col);
    let (lo, hi) = (Value::Int64(10), Value::Int64(20));

    let eager = col.find_in_range(Some(&lo), Some(&hi), true, true);
    let lazy: Vec<usize> = col
        .range_iter(Some(&zone_maps), Some(&lo), Some(&hi), true, true)
        .collect();
    assert_eq!(lazy, eager);
}
/// On a 3072-row sequential column, `range_iter` over `[1500, 1700]` with
/// zone maps yields exactly the offsets `1500..=1700`.
#[test]
fn gus_range_iter_skips_disjoint_blocks() {
    let col = raw_i64_seq(3072);
    let zone_maps = compute_block_zone_maps(&col);
    let (lo, hi) = (Value::Int64(1500), Value::Int64(1700));

    let hits: Vec<usize> = col
        .range_iter(Some(&zone_maps), Some(&lo), Some(&hi), true, true)
        .collect();
    // Values equal offsets in this column, so the hits are the range itself.
    assert_eq!(hits, (1500..=1700).collect::<Vec<usize>>());
}
/// With no lower bound and an upper bound of 10 (flags `false, true`),
/// `range_iter` yields offsets `0..=10`.
#[test]
fn vincent_range_iter_open_min_bound() {
    let col = raw_i64_seq(50);
    let zone_maps = compute_block_zone_maps(&col);
    let upper = Value::Int64(10);

    let hits: Vec<usize> = col
        .range_iter(Some(&zone_maps), None, Some(&upper), false, true)
        .collect();
    assert_eq!(hits, (0..=10).collect::<Vec<usize>>());
}
/// With a lower bound of 15 and no upper bound (flags `true, false`),
/// `range_iter` over a 20-row column yields offsets `15..20`.
#[test]
fn jules_range_iter_open_max_bound() {
    let col = raw_i64_seq(20);
    let zone_maps = compute_block_zone_maps(&col);
    let lower = Value::Int64(15);

    let hits: Vec<usize> = col
        .range_iter(Some(&zone_maps), Some(&lower), None, true, false)
        .collect();
    assert_eq!(hits, (15..20).collect::<Vec<usize>>());
}
/// Without zone maps (`None`), `range_iter` over `[40, 60]` still yields
/// the same offsets as `find_in_range`.
#[test]
fn mia_range_iter_no_zone_maps_falls_back_to_full_scan() {
    let col = raw_i64_seq(100);
    let (lo, hi) = (Value::Int64(40), Value::Int64(60));

    let lazy: Vec<usize> = col
        .range_iter(None, Some(&lo), Some(&hi), true, true)
        .collect();
    let eager = col.find_in_range(Some(&lo), Some(&hi), true, true);
    assert_eq!(lazy, eager);
}
/// `range_iter` over an empty column yields no offsets.
#[test]
fn butch_range_iter_empty_column_yields_nothing() {
    let col = raw_i64_seq(0);
    let zone_maps = compute_block_zone_maps(&col);
    let lower = Value::Int64(0);

    let hits: Vec<usize> = col
        .range_iter(Some(&zone_maps), Some(&lower), None, true, false)
        .collect();
    assert!(hits.is_empty());
}
/// On a dictionary (string) column, `range_iter` over `["b", "c"]` with
/// zone maps agrees with `find_in_range`.
#[test]
fn shosanna_range_iter_string_column() {
    let mut builder = DictionaryBuilder::new();
    ["amsterdam", "berlin", "paris", "prague", "barcelona"]
        .iter()
        .for_each(|city| {
            builder.add(city);
        });
    let col = ColumnCodec::Dict(builder.build());
    let zone_maps = compute_block_zone_maps(&col);
    let (lo, hi) = (Value::from("b"), Value::from("c"));

    let lazy: Vec<usize> = col
        .range_iter(Some(&zone_maps), Some(&lo), Some(&hi), true, true)
        .collect();
    let eager = col.find_in_range(Some(&lo), Some(&hi), true, true);
    assert_eq!(lazy, eager);
}
/// On a `BitPacked` column of `0..20`, a range with a negative lower bound
/// (`[-5, 10]`) gives the same offsets from `range_iter` and
/// `find_in_range`.
#[test]
fn hans_range_iter_bitpacked_negative_min_bound() {
    let unsigned: Vec<u64> = (0..20u64).collect();
    let col = ColumnCodec::BitPacked(BitPackedInts::pack(&unsigned));
    let zone_maps = compute_block_zone_maps(&col);
    let (lo, hi) = (Value::Int64(-5), Value::Int64(10));

    let lazy: Vec<usize> = col
        .range_iter(Some(&zone_maps), Some(&lo), Some(&hi), true, true)
        .collect();
    let eager = col.find_in_range(Some(&lo), Some(&hi), true, true);
    assert_eq!(lazy, eager);
}
/// With both flags `false`, `range_iter` over bounds 10 and 20 excludes
/// the endpoints and yields offsets `11..20`.
#[test]
fn beatrix_range_iter_exclusive_bounds() {
    let col = raw_i64_seq(50);
    let zone_maps = compute_block_zone_maps(&col);
    let (lo, hi) = (Value::Int64(10), Value::Int64(20));

    let hits: Vec<usize> = col
        .range_iter(Some(&zone_maps), Some(&lo), Some(&hi), false, false)
        .collect();
    assert_eq!(hits, (11..20).collect::<Vec<usize>>());
}
/// On a `float64` column containing a NaN at offset 1, `range_iter` over
/// `[0.5, 4.0]` agrees with `find_in_range` and never reports the NaN row.
#[test]
fn django_range_iter_float64_with_nan_in_column() {
    let col = ColumnCodec::float64(vec![1.0, f64::NAN, 2.0, 3.0]);
    let zone_maps = compute_block_zone_maps(&col);
    let (lo, hi) = (Value::Float64(0.5), Value::Float64(4.0));

    let lazy: Vec<usize> = col
        .range_iter(Some(&zone_maps), Some(&lo), Some(&hi), true, true)
        .collect();
    let eager = col.find_in_range(Some(&lo), Some(&hi), true, true);
    assert_eq!(lazy, eager);

    let reports_nan_row = lazy.contains(&1);
    assert!(
        !reports_nan_row,
        "NaN row offset 1 must not appear in range result"
    );
}
/// Offsets produced by `range_iter` over `[500, 1500]` on a 2048-row
/// column come out in ascending order.
#[test]
fn tarantino_range_iter_yields_offsets_in_ascending_order() {
    let col = raw_i64_seq(2048);
    let zone_maps = compute_block_zone_maps(&col);
    let (lo, hi) = (Value::Int64(500), Value::Int64(1500));

    let hits: Vec<usize> = col
        .range_iter(Some(&zone_maps), Some(&lo), Some(&hi), true, true)
        .collect();
    // A sequence equals its sorted copy exactly when no adjacent pair is
    // out of order.
    let ascending = hits.windows(2).all(|pair| pair[0] <= pair[1]);
    assert!(ascending, "iterator output must be sorted ascending");
}
}