use crate::core::{FieldId, LuciError, Result};
/// log2 of the number of dictionary terms grouped into one keyword-dictionary block.
pub(crate) const DICT_BLOCK_SHIFT: u32 = 6;
/// Terms per dictionary block (64): the writer records one block address every
/// `DICT_BLOCK_SIZE` terms.
pub(crate) const DICT_BLOCK_SIZE: usize = 1usize << DICT_BLOCK_SHIFT;
/// `DICT_BLOCK_SIZE - 1`; for a term ordinal `i`, `i & DICT_BLOCK_MASK == i % DICT_BLOCK_SIZE`.
pub(crate) const DICT_BLOCK_MASK: u32 = (DICT_BLOCK_SIZE - 1) as u32;
/// One document's value for a column, as buffered by [`ColumnWriter`].
///
/// Prefer [`ColumnValue::keyword`] to building `Keyword` directly: it enforces the
/// `u16::MAX`-byte limit imposed by the keyword dictionary's `u16` length prefix.
#[derive(Clone, Debug)]
pub enum ColumnValue {
    /// A string term; keyword columns store it in a sorted dictionary.
    Keyword(String),
    /// A signed 64-bit integer.
    I64(i64),
    /// A 64-bit float.
    F64(f64),
    /// A boolean.
    Bool(bool),
    /// The document has no value; recorded in the column's null bitset.
    Null,
}

impl ColumnValue {
    /// Wraps `s` as a [`ColumnValue::Keyword`] after checking its length.
    ///
    /// # Errors
    ///
    /// Returns `LuciError::InvalidValue` if `s` is longer than `u16::MAX` bytes.
    pub fn keyword(s: String) -> Result<ColumnValue> {
        let len = s.len();
        if len <= u16::MAX as usize {
            Ok(ColumnValue::Keyword(s))
        } else {
            Err(LuciError::InvalidValue(format!(
                "keyword value is {} bytes, exceeds the maximum of {} bytes",
                len,
                u16::MAX
            )))
        }
    }
}
pub struct ColumnWriter {
field_id: FieldId,
values: Vec<ColumnValue>,
}
impl ColumnWriter {
pub fn new(field_id: FieldId) -> Self {
Self {
field_id,
values: Vec::new(),
}
}
pub fn add(&mut self, value: ColumnValue) {
self.values.push(value);
}
pub fn doc_count(&self) -> u32 {
self.values.len() as u32
}
pub fn finish(self) -> Vec<u8> {
if self.values.is_empty() {
return self.write_empty();
}
let col_type = self.detect_type();
match col_type {
ColumnType::Keyword | ColumnType::KeywordBlocked => self.write_keyword_column(),
ColumnType::I64 | ColumnType::ConstantI64 | ColumnType::BitpackedI64 => {
self.write_i64_column()
}
ColumnType::F64 | ColumnType::ConstantF64 => self.write_f64_column(),
ColumnType::Bool => self.write_bool_column(),
ColumnType::Empty => self.write_empty(),
}
}
fn detect_type(&self) -> ColumnType {
for v in &self.values {
match v {
ColumnValue::Keyword(_) => return ColumnType::Keyword,
ColumnValue::I64(_) => return ColumnType::I64,
ColumnValue::F64(_) => return ColumnType::F64,
ColumnValue::Bool(_) => return ColumnType::Bool,
ColumnValue::Null => continue,
}
}
ColumnType::Empty
}
fn write_header(&self, col_type: ColumnType) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(&self.field_id.as_u16().to_le_bytes());
buf.push(col_type as u8);
buf.extend_from_slice(&(self.values.len() as u32).to_le_bytes());
let null_bytes = (self.values.len() + 7) / 8;
let mut null_bitset = vec![0u8; null_bytes];
let mut null_count: u32 = 0;
for (i, v) in self.values.iter().enumerate() {
if matches!(v, ColumnValue::Null) {
null_bitset[i / 8] |= 1 << (i % 8);
null_count += 1;
}
}
buf.extend_from_slice(&null_bitset);
if col_type.is_numeric() {
buf.extend_from_slice(&null_count.to_le_bytes());
let (min_val, max_val) = self.numeric_range();
buf.extend_from_slice(&min_val.to_le_bytes());
buf.extend_from_slice(&max_val.to_le_bytes());
}
buf
}
fn numeric_range(&self) -> (f64, f64) {
let mut min_val = f64::INFINITY;
let mut max_val = f64::NEG_INFINITY;
for v in &self.values {
let n = match v {
ColumnValue::I64(n) => *n as f64,
ColumnValue::F64(n) => *n,
_ => continue,
};
if n < min_val {
min_val = n;
}
if n > max_val {
max_val = n;
}
}
(min_val, max_val)
}
fn write_empty(&self) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(&self.field_id.as_u16().to_le_bytes());
buf.push(ColumnType::Empty as u8);
buf.extend_from_slice(&0u32.to_le_bytes());
buf
}
fn write_keyword_column(self) -> Vec<u8> {
let mut buf = self.write_header(ColumnType::KeywordBlocked);
let mut dict: Vec<String> = Vec::new();
let mut seen = std::collections::HashMap::new();
for v in &self.values {
if let ColumnValue::Keyword(s) = v {
if !seen.contains_key(s.as_str()) {
seen.insert(s.clone(), 0u32); dict.push(s.clone());
}
}
}
dict.sort();
for (i, term) in dict.iter().enumerate() {
seen.insert(term.clone(), i as u32);
}
buf.extend_from_slice(&(dict.len() as u32).to_le_bytes()); let body_len_pos = buf.len();
buf.extend_from_slice(&0u64.to_le_bytes()); let body_start = buf.len();
let mut block_addrs: Vec<u64> = Vec::with_capacity(dict.len().div_ceil(DICT_BLOCK_SIZE));
for (i, term) in dict.iter().enumerate() {
if i % DICT_BLOCK_SIZE == 0 {
block_addrs.push((buf.len() - body_start) as u64);
}
let bytes = term.as_bytes();
debug_assert!(bytes.len() <= u16::MAX as usize);
buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
buf.extend_from_slice(bytes);
}
let body_len = (buf.len() - body_start) as u64;
buf[body_len_pos..body_len_pos + 8].copy_from_slice(&body_len.to_le_bytes());
for a in &block_addrs {
buf.extend_from_slice(&a.to_le_bytes());
}
for v in &self.values {
let ordinal = match v {
ColumnValue::Keyword(s) => *seen.get(s.as_str()).unwrap(),
_ => u32::MAX, };
buf.extend_from_slice(&ordinal.to_le_bytes());
}
buf
}
fn write_i64_column(self) -> Vec<u8> {
if let Some(constant) = self.constant_i64() {
let mut buf = self.write_header(ColumnType::ConstantI64);
buf.extend_from_slice(&constant.to_le_bytes());
return buf;
}
let (min_val, max_val) = self.i64_range();
let range = (max_val as u128).wrapping_sub(min_val as u128);
let bit_width = if range == 0 {
0
} else {
128 - range.leading_zeros()
} as u8;
let raw_bytes = self.values.len() * 8;
let packed_bytes = (self.values.len() * bit_width as usize + 7) / 8;
if bit_width < 64 && packed_bytes + 9 < raw_bytes {
let mut buf = self.write_header(ColumnType::BitpackedI64);
buf.extend_from_slice(&min_val.to_le_bytes());
buf.push(bit_width);
bitpack_i64(&self.values, min_val, bit_width, &mut buf);
return buf;
}
let mut buf = self.write_header(ColumnType::I64);
for v in &self.values {
let val = match v {
ColumnValue::I64(n) => *n,
_ => 0,
};
buf.extend_from_slice(&val.to_le_bytes());
}
buf
}
fn i64_range(&self) -> (i64, i64) {
let mut min_val = i64::MAX;
let mut max_val = i64::MIN;
for v in &self.values {
if let ColumnValue::I64(n) = v {
if *n < min_val {
min_val = *n;
}
if *n > max_val {
max_val = *n;
}
}
}
(min_val, max_val)
}
fn write_f64_column(self) -> Vec<u8> {
if let Some(constant) = self.constant_f64() {
let mut buf = self.write_header(ColumnType::ConstantF64);
buf.extend_from_slice(&constant.to_le_bytes());
return buf;
}
let mut buf = self.write_header(ColumnType::F64);
for v in &self.values {
let val = match v {
ColumnValue::F64(n) => *n,
_ => 0.0,
};
buf.extend_from_slice(&val.to_le_bytes());
}
buf
}
fn constant_i64(&self) -> Option<i64> {
let mut constant: Option<i64> = None;
for v in &self.values {
if let ColumnValue::I64(n) = v {
match constant {
None => constant = Some(*n),
Some(c) if c != *n => return None,
_ => {}
}
}
}
constant
}
fn constant_f64(&self) -> Option<f64> {
let mut constant: Option<f64> = None;
for v in &self.values {
if let ColumnValue::F64(n) = v {
match constant {
None => constant = Some(*n),
Some(c) if c != *n => return None,
_ => {}
}
}
}
constant
}
fn write_bool_column(self) -> Vec<u8> {
let mut buf = self.write_header(ColumnType::Bool);
let bool_bytes = (self.values.len() + 7) / 8;
let mut bitset = vec![0u8; bool_bytes];
for (i, v) in self.values.iter().enumerate() {
if let ColumnValue::Bool(true) = v {
bitset[i / 8] |= 1 << (i % 8);
}
}
buf.extend_from_slice(&bitset);
buf
}
}
/// Collects one [`ColumnWriter`] per field and serializes them together as a single
/// columnar block.
pub struct ColumnarWriter {
    /// One writer per field, created on the field's first value.
    columns: std::collections::HashMap<FieldId, ColumnWriter>,
}

impl ColumnarWriter {
    /// Creates a writer with no columns.
    pub fn new() -> Self {
        Self {
            columns: std::collections::HashMap::new(),
        }
    }

    /// Appends `value` at the next document position of `field_id`'s column, creating
    /// that column on first use.
    pub fn add(&mut self, field_id: FieldId, value: ColumnValue) {
        self.columns
            .entry(field_id)
            .or_insert_with(|| ColumnWriter::new(field_id))
            .add(value);
    }

    /// Appends `Null` to every existing column until it holds `doc_count` values.
    ///
    /// Columns that already hold `doc_count` or more values are left unchanged.
    pub fn pad_to(&mut self, doc_count: u32) {
        for writer in self.columns.values_mut() {
            let missing = doc_count.saturating_sub(writer.doc_count());
            for _ in 0..missing {
                writer.add(ColumnValue::Null);
            }
        }
    }

    /// Returns `true` if no value has been added for any field.
    pub fn is_empty(&self) -> bool {
        self.columns.is_empty()
    }

    /// Serializes all columns.
    ///
    /// Output is a little-endian `u16` column count, followed by each column's
    /// encoding (from [`ColumnWriter::finish`]) in ascending field-id order.
    ///
    /// # Panics
    ///
    /// Panics if there are more than `u16::MAX` columns. The count prefix cannot
    /// represent that many, and truncating it would make the block unreadable.
    pub fn finish(self) -> Vec<u8> {
        let mut entries: Vec<(FieldId, ColumnWriter)> = self.columns.into_iter().collect();
        // Field ids are unique map keys, so an unstable sort still gives a fixed order.
        entries.sort_unstable_by_key(|(fid, _)| *fid);
        assert!(
            entries.len() <= u16::MAX as usize,
            "columnar block has {} columns, exceeds the maximum of {}",
            entries.len(),
            u16::MAX
        );
        let mut buf = Vec::new();
        buf.extend_from_slice(&(entries.len() as u16).to_le_bytes());
        for (_, writer) in entries {
            buf.extend_from_slice(&writer.finish());
        }
        buf
    }
}

impl Default for ColumnarWriter {
    /// Same as [`ColumnarWriter::new`]: a writer with no columns.
    fn default() -> Self {
        Self::new()
    }
}
/// Appends `values` to `buf` as fixed-width residuals `value - min_val`, `bit_width`
/// bits each, packed back to back least-significant bit first.
///
/// Appends exactly `ceil(values.len() * bit_width / 8)` zero-initialized bytes and then
/// fills them; a `bit_width` of 0 appends nothing. Entries that are not
/// `ColumnValue::I64` (nulls included) are packed as a residual of 0. Only the low
/// `bit_width` bits of each residual are kept, so the caller must choose a width
/// wide enough for the largest residual.
fn bitpack_i64(values: &[ColumnValue], min_val: i64, bit_width: u8, buf: &mut Vec<u8>) {
    if bit_width == 0 {
        return;
    }
    let width = bit_width as usize;
    let base = buf.len();
    buf.resize(base + (values.len() * width).div_ceil(8), 0);
    for (slot, value) in values.iter().enumerate() {
        let raw = if let ColumnValue::I64(n) = value { *n } else { min_val };
        let mut pending = (raw - min_val) as u64;
        let mut cursor = slot * width;
        let end = cursor + width;
        // Emit the residual in chunks that never straddle a byte boundary.
        while cursor < end {
            let offset = cursor % 8;
            let take = (8 - offset).min(end - cursor);
            let low = (pending as u8) & (((1u64 << take) - 1) as u8);
            buf[base + cursor / 8] |= low << offset;
            pending >>= take;
            cursor += take;
        }
    }
}
/// Reads entry `index` of a bit-packed i64 sequence, the layout written by `bitpack_i64`.
///
/// Entry `index` occupies bits `index * bit_width .. (index + 1) * bit_width` of `data`,
/// least-significant bit first. Its value is `min_val` plus that residual. A `bit_width`
/// of 0 always yields `min_val`.
///
/// # Panics
///
/// Panics if `data` does not contain every bit of entry `index`.
pub(crate) fn unpack_i64(data: &[u8], index: usize, min_val: i64, bit_width: u8) -> i64 {
    if bit_width == 0 {
        return min_val;
    }
    let width = bit_width as usize;
    let mut cursor = index * width;
    let end = cursor + width;
    let mut residual: u64 = 0;
    let mut filled = 0;
    // Gather the residual in chunks that never straddle a byte boundary.
    while cursor < end {
        let offset = cursor % 8;
        let take = (8 - offset).min(end - cursor);
        let chunk = (data[cursor / 8] >> offset) & (((1u64 << take) - 1) as u8);
        residual |= (chunk as u64) << filled;
        cursor += take;
        filled += take;
    }
    min_val + residual as i64
}
/// Type tag of a serialized column, written as the byte after the field id.
///
/// The discriminants are written to the output (`col_type as u8`), so changing any of
/// them changes the serialized format.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum ColumnType {
    /// Header-only column: field id, tag, and a doc count of 0.
    Empty = 0,
    /// Plain keyword tag. `ColumnWriter` uses it only during type detection and writes
    /// `KeywordBlocked` instead. Presumably kept for older data; confirm with the reader.
    Keyword = 1,
    /// One raw little-endian `i64` per document.
    I64 = 2,
    /// One raw little-endian `f64` per document.
    F64 = 3,
    /// One bit per document in a bitset.
    Bool = 4,
    /// A single `i64` shared by every non-null document.
    ConstantI64 = 5,
    /// A single `f64` shared by every non-null document.
    ConstantF64 = 6,
    /// Minimum value, bit width, then packed residuals (see `bitpack_i64`).
    BitpackedI64 = 7,
    /// Sorted term dictionary with one block address per `DICT_BLOCK_SIZE` terms,
    /// followed by a `u32` ordinal per document.
    KeywordBlocked = 8,
}

impl ColumnType {
    /// Returns `true` for column types whose header carries numeric statistics
    /// (null count, min, max).
    pub(crate) fn is_numeric(self) -> bool {
        // Exhaustive on purpose: a new variant must decide whether it carries stats.
        match self {
            ColumnType::I64 | ColumnType::ConstantI64 | ColumnType::BitpackedI64 => true,
            ColumnType::F64 | ColumnType::ConstantF64 => true,
            ColumnType::Empty
            | ColumnType::Keyword
            | ColumnType::Bool
            | ColumnType::KeywordBlocked => false,
        }
    }
}
#[cfg(test)]
mod tests {
    // Round-trip tests: data written by `ColumnWriter` is read back through `ColumnReader`.
    use super::*;
    use crate::columnar::reader::ColumnReader;

    #[test]
    fn keyword_column_round_trip() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::Keyword("hello".into()));
        w.add(ColumnValue::Keyword("world".into()));
        w.add(ColumnValue::Keyword("hello".into()));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 3);
        assert_eq!(r.keyword_value(0), Some("hello"));
        assert_eq!(r.keyword_value(1), Some("world"));
        assert_eq!(r.keyword_value(2), Some("hello"));
    }

    #[test]
    fn i64_column_round_trip() {
        let mut w = ColumnWriter::new(FieldId::new(1));
        w.add(ColumnValue::I64(42));
        w.add(ColumnValue::I64(-7));
        w.add(ColumnValue::I64(0));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(0), Some(42));
        assert_eq!(r.i64_value(1), Some(-7));
        assert_eq!(r.i64_value(2), Some(0));
    }

    #[test]
    fn f64_column_round_trip() {
        // 2.5 is exact in binary and avoids clippy's `approx_constant` (3.14 ≈ PI).
        let mut w = ColumnWriter::new(FieldId::new(2));
        w.add(ColumnValue::F64(2.5));
        w.add(ColumnValue::F64(-1.5));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.f64_value(0), Some(2.5));
        assert_eq!(r.f64_value(1), Some(-1.5));
    }

    #[test]
    fn bool_column_round_trip() {
        let mut w = ColumnWriter::new(FieldId::new(3));
        w.add(ColumnValue::Bool(true));
        w.add(ColumnValue::Bool(false));
        w.add(ColumnValue::Bool(true));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.bool_value(0), Some(true));
        assert_eq!(r.bool_value(1), Some(false));
        assert_eq!(r.bool_value(2), Some(true));
    }

    #[test]
    fn null_handling() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::Keyword("a".into()));
        w.add(ColumnValue::Null);
        w.add(ColumnValue::Keyword("b".into()));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.keyword_value(0), Some("a"));
        assert!(r.is_null(1));
        assert_eq!(r.keyword_value(1), None);
        assert_eq!(r.keyword_value(2), Some("b"));
    }

    #[test]
    fn dict_encoding_sorted() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::Keyword("cherry".into()));
        w.add(ColumnValue::Keyword("apple".into()));
        w.add(ColumnValue::Keyword("banana".into()));
        w.add(ColumnValue::Keyword("apple".into()));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.keyword_value(0), Some("cherry"));
        assert_eq!(r.keyword_value(1), Some("apple"));
        assert_eq!(r.keyword_value(2), Some("banana"));
        assert_eq!(r.keyword_value(3), Some("apple"));
    }

    #[test]
    fn empty_column() {
        let w = ColumnWriter::new(FieldId::new(0));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 0);
    }

    #[test]
    fn out_of_range() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::I64(1));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(99), None);
    }

    #[test]
    fn constant_i64_encoding() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        for _ in 0..100 {
            w.add(ColumnValue::I64(42));
        }
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 100);
        assert!(r.is_constant());
        assert_eq!(r.constant_value(), Some(42.0));
        assert!(
            data.len() < 100,
            "constant encoding should be compact: {} bytes",
            data.len()
        );
    }

    #[test]
    fn constant_f64_encoding() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        for _ in 0..50 {
            w.add(ColumnValue::F64(1.25));
        }
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert!(r.is_constant());
        assert_eq!(r.constant_value(), Some(1.25));
    }

    #[test]
    fn constant_with_nulls() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::I64(7));
        w.add(ColumnValue::Null);
        w.add(ColumnValue::I64(7));
        w.add(ColumnValue::Null);
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert!(r.is_constant());
        assert_eq!(r.constant_value(), Some(7.0));
        assert!(r.is_null(1));
        assert!(r.is_null(3));
        assert!(!r.is_null(0));
        assert!(!r.is_null(2));
    }

    #[test]
    fn non_constant_stays_raw() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::I64(1));
        w.add(ColumnValue::I64(2));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert!(!r.is_constant());
        assert_eq!(r.i64_value(0), Some(1));
        assert_eq!(r.i64_value(1), Some(2));
    }

    #[test]
    fn bitpacked_narrow_range() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        for i in 0..1000 {
            w.add(ColumnValue::I64(100 + (i % 16)));
        }
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 1000);
        assert_eq!(r.i64_value(0), Some(100));
        assert_eq!(r.i64_value(1), Some(101));
        assert_eq!(r.i64_value(15), Some(115));
        assert_eq!(r.i64_value(16), Some(100));
        // 999 % 16 == 7
        assert_eq!(r.numeric_value(999), Some(107.0));
        assert!(
            data.len() < 1000,
            "bitpacked should be compact: {} bytes",
            data.len()
        );
    }

    #[test]
    fn bitpacked_with_nulls() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::I64(10));
        w.add(ColumnValue::Null);
        w.add(ColumnValue::I64(13));
        w.add(ColumnValue::Null);
        w.add(ColumnValue::I64(11));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(0), Some(10));
        assert_eq!(r.i64_value(1), None);
        assert_eq!(r.i64_value(2), Some(13));
        assert_eq!(r.i64_value(3), None);
        assert_eq!(r.i64_value(4), Some(11));
    }

    #[test]
    fn wide_range_stays_raw() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::I64(i64::MIN));
        w.add(ColumnValue::I64(i64::MAX));
        let data = w.finish();
        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(0), Some(i64::MIN));
        assert_eq!(r.i64_value(1), Some(i64::MAX));
    }

    #[test]
    fn keyword_length_limit() {
        let max = u16::MAX as usize;
        assert!(ColumnValue::keyword("k".repeat(max)).is_ok());
        assert!(ColumnValue::keyword("k".repeat(max + 1)).is_err());
    }

    #[test]
    fn pad_to_fills_missing_docs_with_nulls() {
        let mut w = ColumnarWriter::new();
        assert!(w.is_empty());
        w.add(FieldId::new(0), ColumnValue::I64(1));
        w.add(FieldId::new(1), ColumnValue::Bool(true));
        w.add(FieldId::new(1), ColumnValue::Bool(false));
        w.pad_to(3);
        for writer in w.columns.values() {
            assert_eq!(writer.doc_count(), 3);
        }
        assert!(!w.is_empty());
        let data = w.finish();
        // The block starts with the little-endian column count.
        assert_eq!(u16::from_le_bytes([data[0], data[1]]), 2);
    }
}