use std::sync::Arc;
use arcstr::ArcStr;
use grafeo_common::types::Value;
use crate::codec::{BitPackedInts, BitVector, DictionaryEncoding};
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ColumnCodec {
BitPacked(BitPackedInts),
Dict(DictionaryEncoding),
Bitmap(BitVector),
Int8Vector {
data: Vec<i8>,
dimensions: u16,
},
}
impl ColumnCodec {
#[inline]
#[must_use]
pub fn get(&self, index: usize) -> Option<Value> {
match self {
Self::BitPacked(bp) => bp.get(index).map(|v| {
#[allow(clippy::cast_possible_wrap)]
let val = Value::Int64(v as i64);
val
}),
Self::Dict(dict) => dict.get(index).map(|s| Value::String(ArcStr::from(s))),
Self::Bitmap(bv) => bv.get(index).map(Value::Bool),
Self::Int8Vector { data, dimensions } => {
let dims = *dimensions as usize;
if dims == 0 {
return None;
}
let start = index.checked_mul(dims)?;
let end = start.checked_add(dims)?;
if end > data.len() {
return None;
}
let values: Vec<Value> = data[start..end]
.iter()
.map(|&v| Value::Int64(v as i64))
.collect();
Some(Value::List(Arc::from(values)))
}
}
}
#[inline]
#[must_use]
pub fn get_raw_u64(&self, index: usize) -> Option<u64> {
match self {
Self::BitPacked(bp) => bp.get(index),
_ => None,
}
}
#[must_use]
pub fn get_int8_vector(&self, index: usize) -> Option<&[i8]> {
match self {
Self::Int8Vector { data, dimensions } => {
let dims = *dimensions as usize;
if dims == 0 {
return None;
}
let start = index.checked_mul(dims)?;
let end = start.checked_add(dims)?;
if end > data.len() {
return None;
}
Some(&data[start..end])
}
_ => None,
}
}
#[must_use]
pub fn len(&self) -> usize {
match self {
Self::BitPacked(bp) => bp.len(),
Self::Dict(dict) => dict.len(),
Self::Bitmap(bv) => bv.len(),
Self::Int8Vector { data, dimensions } => {
let dims = *dimensions as usize;
if dims == 0 { 0 } else { data.len() / dims }
}
}
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn find_eq(&self, target: &Value) -> Vec<usize> {
match (self, target) {
(Self::BitPacked(bp), &Value::Int64(v)) => {
if v < 0 {
return Vec::new();
}
#[allow(clippy::cast_sign_loss)]
let target_u64 = v as u64;
(0..bp.len())
.filter(|&i| bp.get(i) == Some(target_u64))
.collect()
}
(Self::Dict(dict), Value::String(s)) => match dict.encode(s.as_str()) {
Some(code) => dict.filter_by_code(|c| c == code),
None => Vec::new(),
},
(Self::Bitmap(bv), &Value::Bool(target_bool)) => (0..bv.len())
.filter(|&i| bv.get(i) == Some(target_bool))
.collect(),
_ => (0..self.len())
.filter(|&i| self.get(i).as_ref() == Some(target))
.collect(),
}
}
pub fn find_in_range(
&self,
min: Option<&Value>,
max: Option<&Value>,
min_inclusive: bool,
max_inclusive: bool,
) -> Vec<usize> {
if let Self::BitPacked(bp) = self {
let min_u64 = match min {
#[allow(clippy::cast_sign_loss)]
Some(&Value::Int64(v)) if v >= 0 => Some(v as u64),
Some(&Value::Int64(_)) => Some(0),
None => None,
_ => return self.find_in_range_fallback(min, max, min_inclusive, max_inclusive),
};
let max_u64 = match max {
#[allow(clippy::cast_sign_loss)]
Some(&Value::Int64(v)) if v >= 0 => Some(v as u64),
Some(&Value::Int64(v)) if v < 0 => return Vec::new(),
None => None,
_ => return self.find_in_range_fallback(min, max, min_inclusive, max_inclusive),
};
return (0..bp.len())
.filter(|&i| {
if let Some(v) = bp.get(i) {
let above_min = match min_u64 {
Some(lo) if min_inclusive => v >= lo,
Some(lo) => v > lo,
None => true,
};
let below_max = match max_u64 {
Some(hi) if max_inclusive => v <= hi,
Some(hi) => v < hi,
None => true,
};
above_min && below_max
} else {
false
}
})
.collect();
}
self.find_in_range_fallback(min, max, min_inclusive, max_inclusive)
}
fn find_in_range_fallback(
&self,
min: Option<&Value>,
max: Option<&Value>,
min_inclusive: bool,
max_inclusive: bool,
) -> Vec<usize> {
use super::zone_map::compare_values;
(0..self.len())
.filter(|&i| {
let Some(v) = self.get(i) else {
return false;
};
if let Some(min_val) = min {
match compare_values(&v, min_val) {
Some(std::cmp::Ordering::Less) => return false,
Some(std::cmp::Ordering::Equal) if !min_inclusive => return false,
None => return false,
_ => {}
}
}
if let Some(max_val) = max {
match compare_values(&v, max_val) {
Some(std::cmp::Ordering::Greater) => return false,
Some(std::cmp::Ordering::Equal) if !max_inclusive => return false,
None => return false,
_ => {}
}
}
true
})
.collect()
}
pub fn write_to(&self, buf: &mut Vec<u8>) {
match self {
Self::BitPacked(bp) => {
buf.push(0); buf.push(bp.bits_per_value());
write_usize_as_u32(buf, bp.len());
let data = bp.data();
write_usize_as_u32(buf, data.len());
for &word in data {
buf.extend_from_slice(&word.to_le_bytes());
}
}
Self::Dict(dict) => {
buf.push(1); let dict_entries = dict.dictionary();
write_usize_as_u32(buf, dict_entries.len());
for entry in dict_entries.iter() {
let s = entry.as_ref().as_bytes();
write_usize_as_u32(buf, s.len());
buf.extend_from_slice(s);
}
let codes = dict.codes();
write_usize_as_u32(buf, codes.len());
for &code in codes {
buf.extend_from_slice(&code.to_le_bytes());
}
}
Self::Bitmap(bv) => {
buf.push(2); write_usize_as_u32(buf, bv.len());
let data = bv.data();
write_usize_as_u32(buf, data.len());
for &word in data {
buf.extend_from_slice(&word.to_le_bytes());
}
}
Self::Int8Vector { data, dimensions } => {
buf.push(3); buf.extend_from_slice(&dimensions.to_le_bytes());
write_usize_as_u32(buf, data.len());
for &v in data {
buf.push(v.to_le_bytes()[0]);
}
}
}
}
pub fn read_from(data: &[u8], pos: &mut usize) -> Result<Self, &'static str> {
let discriminant = *data.get(*pos).ok_or("truncated codec discriminant")?;
*pos += 1;
match discriminant {
0 => {
let bits = *data.get(*pos).ok_or("truncated bits_per_value")?;
*pos += 1;
let count = read_u32_le(data, pos)? as usize;
let data_len = read_u32_le(data, pos)? as usize;
let mut words = Vec::with_capacity(data_len);
for _ in 0..data_len {
words.push(read_u64_le(data, pos)?);
}
Ok(Self::BitPacked(BitPackedInts::from_raw_parts(
words, bits, count,
)))
}
1 => {
let dict_len = read_u32_le(data, pos)? as usize;
let mut entries: Vec<Arc<str>> = Vec::with_capacity(dict_len);
for _ in 0..dict_len {
let slen = read_u32_le(data, pos)? as usize;
if *pos + slen > data.len() {
return Err("truncated dict string");
}
let s = std::str::from_utf8(&data[*pos..*pos + slen])
.map_err(|_| "invalid UTF-8 in dict")?;
entries.push(Arc::from(s));
*pos += slen;
}
let codes_len = read_u32_le(data, pos)? as usize;
let mut codes = Vec::with_capacity(codes_len);
for _ in 0..codes_len {
codes.push(read_u32_le(data, pos)?);
}
Ok(Self::Dict(DictionaryEncoding::new(
Arc::from(entries.into_boxed_slice()),
codes,
)))
}
2 => {
let bit_len = read_u32_le(data, pos)? as usize;
let data_len = read_u32_le(data, pos)? as usize;
let mut words = Vec::with_capacity(data_len);
for _ in 0..data_len {
words.push(read_u64_le(data, pos)?);
}
Ok(Self::Bitmap(BitVector::from_raw_parts(words, bit_len)))
}
3 => {
let dimensions = read_u16_le(data, pos)?;
let data_len = read_u32_le(data, pos)? as usize;
if *pos + data_len > data.len() {
return Err("truncated Int8Vector data");
}
let bytes = &data[*pos..*pos + data_len];
let i8_data: Vec<i8> = bytes.iter().map(|&b| i8::from_le_bytes([b])).collect();
*pos += data_len;
Ok(Self::Int8Vector {
data: i8_data,
dimensions,
})
}
_ => Err("unknown codec discriminant"),
}
}
#[must_use]
pub fn heap_bytes(&self) -> usize {
match self {
Self::BitPacked(bp) => bp.data().len() * std::mem::size_of::<u64>(),
Self::Dict(d) => {
let codes_bytes = d.codes().len() * std::mem::size_of::<u32>();
let dict_bytes: usize = d.dictionary().iter().map(|s| s.len()).sum();
codes_bytes + dict_bytes
}
Self::Bitmap(bv) => bv.data().len() * std::mem::size_of::<u64>(),
Self::Int8Vector { data, .. } => data.len(),
}
}
}
fn write_usize_as_u32(buf: &mut Vec<u8>, v: usize) {
let n = u32::try_from(v).expect("value exceeds u32::MAX in compact codec serialization");
buf.extend_from_slice(&n.to_le_bytes());
}
fn read_u16_le(data: &[u8], pos: &mut usize) -> Result<u16, &'static str> {
if *pos + 2 > data.len() {
return Err("truncated u16");
}
let v = u16::from_le_bytes([data[*pos], data[*pos + 1]]);
*pos += 2;
Ok(v)
}
fn read_u32_le(data: &[u8], pos: &mut usize) -> Result<u32, &'static str> {
if *pos + 4 > data.len() {
return Err("truncated u32");
}
let v = u32::from_le_bytes([data[*pos], data[*pos + 1], data[*pos + 2], data[*pos + 3]]);
*pos += 4;
Ok(v)
}
fn read_u64_le(data: &[u8], pos: &mut usize) -> Result<u64, &'static str> {
if *pos + 8 > data.len() {
return Err("truncated u64");
}
let v = u64::from_le_bytes(data[*pos..*pos + 8].try_into().unwrap());
*pos += 8;
Ok(v)
}
#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
use super::*;
use crate::codec::{BitPackedInts, BitVector, DictionaryBuilder};
#[test]
fn test_bitpacked_round_trip() {
let values = vec![0u64, 5, 10, 15, 3, 7];
let bp = BitPackedInts::pack(&values);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.len(), 6);
assert!(!col.is_empty());
for (i, &expected) in values.iter().enumerate() {
let v = col.get(i).unwrap();
assert_eq!(v, Value::Int64(expected as i64));
}
}
#[test]
fn test_dict_round_trip() {
let mut builder = DictionaryBuilder::new();
builder.add("alpha");
builder.add("beta");
builder.add("alpha");
let dict = builder.build();
let col = ColumnCodec::Dict(dict);
assert_eq!(col.len(), 3);
assert_eq!(col.get(0), Some(Value::String(ArcStr::from("alpha"))));
assert_eq!(col.get(1), Some(Value::String(ArcStr::from("beta"))));
assert_eq!(col.get(2), Some(Value::String(ArcStr::from("alpha"))));
}
#[test]
fn test_bitmap_round_trip() {
let bools = vec![true, false, true, true, false];
let bv = BitVector::from_bools(&bools);
let col = ColumnCodec::Bitmap(bv);
assert_eq!(col.len(), 5);
assert_eq!(col.get(0), Some(Value::Bool(true)));
assert_eq!(col.get(1), Some(Value::Bool(false)));
assert_eq!(col.get(2), Some(Value::Bool(true)));
assert_eq!(col.get(3), Some(Value::Bool(true)));
assert_eq!(col.get(4), Some(Value::Bool(false)));
}
#[test]
fn test_int8_vector_round_trip() {
let data = vec![1i8, 2, 3, -4, -5, -6];
let col = ColumnCodec::Int8Vector {
data,
dimensions: 3,
};
assert_eq!(col.len(), 2);
let v0 = col.get(0).unwrap();
let expected0: Vec<Value> = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
assert_eq!(v0, Value::List(Arc::from(expected0)));
let v1 = col.get(1).unwrap();
let expected1: Vec<Value> = vec![Value::Int64(-4), Value::Int64(-5), Value::Int64(-6)];
assert_eq!(v1, Value::List(Arc::from(expected1)));
}
#[test]
fn test_get_raw_u64_on_bitpacked() {
let values = vec![100u64, 200, 300];
let bp = BitPackedInts::pack(&values);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.get_raw_u64(0), Some(100));
assert_eq!(col.get_raw_u64(1), Some(200));
assert_eq!(col.get_raw_u64(2), Some(300));
assert_eq!(col.get_raw_u64(3), None);
let bv = BitVector::from_bools(&[true]);
let bm_col = ColumnCodec::Bitmap(bv);
assert_eq!(bm_col.get_raw_u64(0), None);
}
#[test]
fn test_get_int8_vector_slice() {
let data = vec![10i8, 20, 30, 40, 50, 60];
let col = ColumnCodec::Int8Vector {
data,
dimensions: 3,
};
assert_eq!(col.get_int8_vector(0), Some(&[10i8, 20, 30][..]));
assert_eq!(col.get_int8_vector(1), Some(&[40i8, 50, 60][..]));
assert_eq!(col.get_int8_vector(2), None);
let bp = BitPackedInts::pack(&[1u64]);
let bp_col = ColumnCodec::BitPacked(bp);
assert_eq!(bp_col.get_int8_vector(0), None);
}
#[test]
fn test_out_of_bounds_returns_none() {
let bp = BitPackedInts::pack(&[1u64, 2, 3]);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.get(999), None);
assert_eq!(col.get_raw_u64(999), None);
let bv = BitVector::from_bools(&[true]);
let bm = ColumnCodec::Bitmap(bv);
assert_eq!(bm.get(5), None);
let mut builder = DictionaryBuilder::new();
builder.add("x");
let dict = builder.build();
let dc = ColumnCodec::Dict(dict);
assert_eq!(dc.get(10), None);
let vec_col = ColumnCodec::Int8Vector {
data: vec![1, 2],
dimensions: 2,
};
assert_eq!(vec_col.get(1), None);
assert_eq!(vec_col.get_int8_vector(1), None);
}
#[test]
fn test_find_eq_bitpacked() {
let values = vec![0u64, 5, 10, 5, 3, 5];
let bp = BitPackedInts::pack(&values);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.find_eq(&Value::Int64(5)), vec![1, 3, 5]);
assert_eq!(col.find_eq(&Value::Int64(0)), vec![0]);
assert_eq!(col.find_eq(&Value::Int64(99)), Vec::<usize>::new());
assert_eq!(col.find_eq(&Value::Int64(-1)), Vec::<usize>::new());
}
#[test]
fn test_find_eq_dict() {
let mut builder = DictionaryBuilder::new();
for name in ["Vincent", "Jules", "Vincent", "Mia", "Jules"] {
builder.add(name);
}
let col = ColumnCodec::Dict(builder.build());
assert_eq!(col.find_eq(&Value::String("Vincent".into())), vec![0, 2]);
assert_eq!(col.find_eq(&Value::String("Mia".into())), vec![3]);
assert_eq!(
col.find_eq(&Value::String("Butch".into())),
Vec::<usize>::new()
);
}
#[test]
fn test_find_eq_bitmap() {
let bools = vec![true, false, true, true, false];
let col = ColumnCodec::Bitmap(BitVector::from_bools(&bools));
assert_eq!(col.find_eq(&Value::Bool(true)), vec![0, 2, 3]);
assert_eq!(col.find_eq(&Value::Bool(false)), vec![1, 4]);
}
#[test]
fn test_find_eq_type_mismatch_uses_fallback() {
let values = vec![1u64, 2, 3];
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
assert_eq!(
col.find_eq(&Value::String("hello".into())),
Vec::<usize>::new()
);
}
#[test]
fn test_find_eq_int8_vector_uses_fallback() {
let data = vec![1i8, 2, 3, 4, 5, 6];
let col = ColumnCodec::Int8Vector {
data,
dimensions: 3,
};
let target_vec: Vec<Value> = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
let target = Value::List(Arc::from(target_vec));
let matches = col.find_eq(&target);
assert_eq!(matches, vec![0]);
}
#[test]
fn test_int8_vector_zero_dimensions_get() {
let col = ColumnCodec::Int8Vector {
data: vec![1, 2, 3],
dimensions: 0,
};
assert_eq!(col.get(0), None);
}
#[test]
fn test_int8_vector_zero_dimensions_get_int8_vector() {
let col = ColumnCodec::Int8Vector {
data: vec![1, 2, 3],
dimensions: 0,
};
assert_eq!(col.get_int8_vector(0), None);
}
#[test]
fn test_int8_vector_zero_dimensions_len_and_is_empty() {
let col = ColumnCodec::Int8Vector {
data: vec![1, 2, 3],
dimensions: 0,
};
assert_eq!(col.len(), 0);
assert!(col.is_empty());
}
#[test]
fn test_heap_bytes_bitpacked() {
let values = vec![0u64, 5, 10, 15];
let bp = BitPackedInts::pack(&values);
let col = ColumnCodec::BitPacked(bp);
assert!(col.heap_bytes() > 0);
}
#[test]
fn test_heap_bytes_dict() {
let mut builder = DictionaryBuilder::new();
builder.add("Amsterdam");
builder.add("Berlin");
builder.add("Paris");
let dict = builder.build();
let col = ColumnCodec::Dict(dict);
assert!(col.heap_bytes() > 0);
}
#[test]
fn test_heap_bytes_bitmap() {
let bools = vec![true, false, true, true, false];
let bv = BitVector::from_bools(&bools);
let col = ColumnCodec::Bitmap(bv);
assert!(col.heap_bytes() > 0);
}
#[test]
fn test_heap_bytes_int8_vector() {
let data = vec![1i8, 2, 3, 4, 5, 6];
let col = ColumnCodec::Int8Vector {
data,
dimensions: 3,
};
assert_eq!(col.heap_bytes(), 6);
}
#[test]
fn test_find_in_range_bitpacked_inclusive() {
let values: Vec<u64> = (0..10).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(Some(&Value::Int64(3)), Some(&Value::Int64(6)), true, true);
assert_eq!(result, vec![3, 4, 5, 6]);
}
#[test]
fn test_find_in_range_bitpacked_exclusive() {
let values: Vec<u64> = (0..10).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result =
col.find_in_range(Some(&Value::Int64(3)), Some(&Value::Int64(6)), false, false);
assert_eq!(result, vec![4, 5]);
}
#[test]
fn test_find_in_range_bitpacked_open_ended() {
let values: Vec<u64> = (0..10).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(Some(&Value::Int64(7)), None, false, false);
assert_eq!(result, vec![8, 9]);
let result = col.find_in_range(None, Some(&Value::Int64(2)), false, true);
assert_eq!(result, vec![0, 1, 2]);
}
#[test]
fn test_find_in_range_fallback_for_dict() {
let mut builder = DictionaryBuilder::new();
for name in ["Amsterdam", "Berlin", "Paris", "Prague"] {
builder.add(name);
}
let col = ColumnCodec::Dict(builder.build());
let result = col.find_in_range(
Some(&Value::String("Berlin".into())),
Some(&Value::String("Prague".into())),
true,
true,
);
assert_eq!(result, vec![1, 2, 3]);
}
#[test]
fn test_find_in_range_negative_max() {
let values: Vec<u64> = (0..10).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(None, Some(&Value::Int64(-1)), false, true);
assert!(result.is_empty());
}
#[test]
fn test_find_in_range_negative_min() {
let values: Vec<u64> = (0..5).collect();
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(Some(&Value::Int64(-10)), None, true, true);
assert_eq!(result, vec![0, 1, 2, 3, 4]);
}
#[test]
fn test_find_in_range_type_mismatch_uses_fallback() {
let values = vec![1u64, 2, 3];
let col = ColumnCodec::BitPacked(BitPackedInts::pack(&values));
let result = col.find_in_range(
Some(&Value::String("a".into())),
Some(&Value::String("z".into())),
true,
true,
);
assert!(result.is_empty());
}
#[test]
fn test_find_in_range_int8_vector_uses_fallback() {
let data = vec![1i8, 2, 3, 4, 5, 6];
let col = ColumnCodec::Int8Vector {
data,
dimensions: 3,
};
let result = col.find_in_range(Some(&Value::Int64(0)), Some(&Value::Int64(10)), true, true);
assert!(result.is_empty());
}
#[test]
fn test_get_out_of_bounds_all_codecs() {
let bp = BitPackedInts::pack(&[1u64, 2, 3]);
let col = ColumnCodec::BitPacked(bp);
assert_eq!(col.get(3), None);
let mut builder = DictionaryBuilder::new();
builder.add("Alix");
let col = ColumnCodec::Dict(builder.build());
assert_eq!(col.get(1), None);
let bv = BitVector::from_bools(&[true]);
let col = ColumnCodec::Bitmap(bv);
assert_eq!(col.get(1), None);
let col = ColumnCodec::Int8Vector {
data: vec![1, 2, 3],
dimensions: 3,
};
assert_eq!(col.get(1), None);
assert_eq!(col.get_int8_vector(1), None);
}
}