1use crate::core::{FieldId, LuciError, Result};
6
/// log2 of the number of dictionary terms grouped into one block.
pub(crate) const DICT_BLOCK_SHIFT: u32 = 6;
/// Number of dictionary terms per block; the keyword writer records one
/// block offset every `DICT_BLOCK_SIZE` terms.
pub(crate) const DICT_BLOCK_SIZE: usize = 1usize << DICT_BLOCK_SHIFT;
/// Bit mask covering the low `DICT_BLOCK_SHIFT` bits (presumably used by
/// readers to get a term's position inside its block — confirm at call sites).
pub(crate) const DICT_BLOCK_MASK: u32 = (1u32 << DICT_BLOCK_SHIFT) - 1;
20
/// One per-document value handed to a [`ColumnWriter`].
#[derive(Clone, Debug)]
pub enum ColumnValue {
    /// String value. Prefer [`ColumnValue::keyword`], which enforces the
    /// length limit the keyword writer relies on.
    Keyword(String),
    /// Signed 64-bit integer value.
    I64(i64),
    /// 64-bit floating point value.
    F64(f64),
    /// Boolean value.
    Bool(bool),
    /// The document has no value for this column.
    Null,
}
30
31impl ColumnValue {
32 pub fn keyword(s: String) -> Result<ColumnValue> {
43 if s.len() > u16::MAX as usize {
44 return Err(LuciError::InvalidValue(format!(
45 "keyword value is {} bytes, exceeds the maximum of {} bytes",
46 s.len(),
47 u16::MAX
48 )));
49 }
50 Ok(ColumnValue::Keyword(s))
51 }
52}
53
54pub struct ColumnWriter {
56 field_id: FieldId,
57 values: Vec<ColumnValue>,
58}
59
60impl ColumnWriter {
61 pub fn new(field_id: FieldId) -> Self {
62 Self {
63 field_id,
64 values: Vec::new(),
65 }
66 }
67
68 pub fn add(&mut self, value: ColumnValue) {
69 self.values.push(value);
70 }
71
72 pub fn doc_count(&self) -> u32 {
73 self.values.len() as u32
74 }
75
76 pub fn finish(self) -> Vec<u8> {
84 if self.values.is_empty() {
85 return self.write_empty();
86 }
87
88 let col_type = self.detect_type();
90 match col_type {
91 ColumnType::Keyword | ColumnType::KeywordBlocked => self.write_keyword_column(),
94 ColumnType::I64 | ColumnType::ConstantI64 | ColumnType::BitpackedI64 => {
95 self.write_i64_column()
96 }
97 ColumnType::F64 | ColumnType::ConstantF64 => self.write_f64_column(),
98 ColumnType::Bool => self.write_bool_column(),
99 ColumnType::Empty => self.write_empty(),
100 }
101 }
102
103 fn detect_type(&self) -> ColumnType {
104 for v in &self.values {
105 match v {
106 ColumnValue::Keyword(_) => return ColumnType::Keyword,
107 ColumnValue::I64(_) => return ColumnType::I64,
108 ColumnValue::F64(_) => return ColumnType::F64,
109 ColumnValue::Bool(_) => return ColumnType::Bool,
110 ColumnValue::Null => continue,
111 }
112 }
113 ColumnType::Empty
114 }
115
116 fn write_header(&self, col_type: ColumnType) -> Vec<u8> {
117 let mut buf = Vec::new();
118 buf.extend_from_slice(&self.field_id.as_u16().to_le_bytes());
119 buf.push(col_type as u8);
120 buf.extend_from_slice(&(self.values.len() as u32).to_le_bytes());
121
122 let null_bytes = (self.values.len() + 7) / 8;
124 let mut null_bitset = vec![0u8; null_bytes];
125 let mut null_count: u32 = 0;
126 for (i, v) in self.values.iter().enumerate() {
127 if matches!(v, ColumnValue::Null) {
128 null_bitset[i / 8] |= 1 << (i % 8);
129 null_count += 1;
130 }
131 }
132 buf.extend_from_slice(&null_bitset);
133
134 if col_type.is_numeric() {
137 buf.extend_from_slice(&null_count.to_le_bytes());
138 let (min_val, max_val) = self.numeric_range();
139 buf.extend_from_slice(&min_val.to_le_bytes());
140 buf.extend_from_slice(&max_val.to_le_bytes());
141 }
142
143 buf
144 }
145
146 fn numeric_range(&self) -> (f64, f64) {
148 let mut min_val = f64::INFINITY;
149 let mut max_val = f64::NEG_INFINITY;
150 for v in &self.values {
151 let n = match v {
152 ColumnValue::I64(n) => *n as f64,
153 ColumnValue::F64(n) => *n,
154 _ => continue,
155 };
156 if n < min_val {
157 min_val = n;
158 }
159 if n > max_val {
160 max_val = n;
161 }
162 }
163 (min_val, max_val)
164 }
165
166 fn write_empty(&self) -> Vec<u8> {
167 let mut buf = Vec::new();
168 buf.extend_from_slice(&self.field_id.as_u16().to_le_bytes());
169 buf.push(ColumnType::Empty as u8);
170 buf.extend_from_slice(&0u32.to_le_bytes());
171 buf
172 }
173
174 fn write_keyword_column(self) -> Vec<u8> {
175 let mut buf = self.write_header(ColumnType::KeywordBlocked);
176
177 let mut dict: Vec<String> = Vec::new();
179 let mut seen = std::collections::HashMap::new();
180 for v in &self.values {
181 if let ColumnValue::Keyword(s) = v {
182 if !seen.contains_key(s.as_str()) {
183 seen.insert(s.clone(), 0u32); dict.push(s.clone());
185 }
186 }
187 }
188 dict.sort();
189 for (i, term) in dict.iter().enumerate() {
190 seen.insert(term.clone(), i as u32);
191 }
192
193 buf.extend_from_slice(&(dict.len() as u32).to_le_bytes()); let body_len_pos = buf.len();
202 buf.extend_from_slice(&0u64.to_le_bytes()); let body_start = buf.len();
204 let mut block_addrs: Vec<u64> = Vec::with_capacity(dict.len().div_ceil(DICT_BLOCK_SIZE));
205 for (i, term) in dict.iter().enumerate() {
206 if i % DICT_BLOCK_SIZE == 0 {
207 block_addrs.push((buf.len() - body_start) as u64);
208 }
209 let bytes = term.as_bytes();
210 debug_assert!(bytes.len() <= u16::MAX as usize);
213 buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
214 buf.extend_from_slice(bytes);
215 }
216 let body_len = (buf.len() - body_start) as u64;
217 buf[body_len_pos..body_len_pos + 8].copy_from_slice(&body_len.to_le_bytes());
218 for a in &block_addrs {
219 buf.extend_from_slice(&a.to_le_bytes());
220 }
221
222 for v in &self.values {
224 let ordinal = match v {
225 ColumnValue::Keyword(s) => *seen.get(s.as_str()).unwrap(),
226 _ => u32::MAX, };
228 buf.extend_from_slice(&ordinal.to_le_bytes());
229 }
230
231 buf
232 }
233
234 fn write_i64_column(self) -> Vec<u8> {
235 if let Some(constant) = self.constant_i64() {
237 let mut buf = self.write_header(ColumnType::ConstantI64);
238 buf.extend_from_slice(&constant.to_le_bytes());
239 return buf;
240 }
241
242 let (min_val, max_val) = self.i64_range();
244 let range = (max_val as u128).wrapping_sub(min_val as u128);
245 let bit_width = if range == 0 {
246 0
247 } else {
248 128 - range.leading_zeros()
249 } as u8;
250
251 let raw_bytes = self.values.len() * 8;
253 let packed_bytes = (self.values.len() * bit_width as usize + 7) / 8;
254 if bit_width < 64 && packed_bytes + 9 < raw_bytes {
256 let mut buf = self.write_header(ColumnType::BitpackedI64);
257 buf.extend_from_slice(&min_val.to_le_bytes());
258 buf.push(bit_width);
259 bitpack_i64(&self.values, min_val, bit_width, &mut buf);
260 return buf;
261 }
262
263 let mut buf = self.write_header(ColumnType::I64);
265 for v in &self.values {
266 let val = match v {
267 ColumnValue::I64(n) => *n,
268 _ => 0,
269 };
270 buf.extend_from_slice(&val.to_le_bytes());
271 }
272 buf
273 }
274
275 fn i64_range(&self) -> (i64, i64) {
277 let mut min_val = i64::MAX;
278 let mut max_val = i64::MIN;
279 for v in &self.values {
280 if let ColumnValue::I64(n) = v {
281 if *n < min_val {
282 min_val = *n;
283 }
284 if *n > max_val {
285 max_val = *n;
286 }
287 }
288 }
289 (min_val, max_val)
290 }
291
292 fn write_f64_column(self) -> Vec<u8> {
293 if let Some(constant) = self.constant_f64() {
295 let mut buf = self.write_header(ColumnType::ConstantF64);
296 buf.extend_from_slice(&constant.to_le_bytes());
297 return buf;
298 }
299
300 let mut buf = self.write_header(ColumnType::F64);
301 for v in &self.values {
302 let val = match v {
303 ColumnValue::F64(n) => *n,
304 _ => 0.0,
305 };
306 buf.extend_from_slice(&val.to_le_bytes());
307 }
308 buf
309 }
310
311 fn constant_i64(&self) -> Option<i64> {
313 let mut constant: Option<i64> = None;
314 for v in &self.values {
315 if let ColumnValue::I64(n) = v {
316 match constant {
317 None => constant = Some(*n),
318 Some(c) if c != *n => return None,
319 _ => {}
320 }
321 }
322 }
323 constant
324 }
325
326 fn constant_f64(&self) -> Option<f64> {
328 let mut constant: Option<f64> = None;
329 for v in &self.values {
330 if let ColumnValue::F64(n) = v {
331 match constant {
332 None => constant = Some(*n),
333 Some(c) if c != *n => return None,
334 _ => {}
335 }
336 }
337 }
338 constant
339 }
340
341 fn write_bool_column(self) -> Vec<u8> {
342 let mut buf = self.write_header(ColumnType::Bool);
343 let bool_bytes = (self.values.len() + 7) / 8;
344 let mut bitset = vec![0u8; bool_bytes];
345 for (i, v) in self.values.iter().enumerate() {
346 if let ColumnValue::Bool(true) = v {
347 bitset[i / 8] |= 1 << (i % 8);
348 }
349 }
350 buf.extend_from_slice(&bitset);
351 buf
352 }
353}
354
355pub struct ColumnarWriter {
357 columns: std::collections::HashMap<FieldId, ColumnWriter>,
358}
359
360impl ColumnarWriter {
361 pub fn new() -> Self {
362 Self {
363 columns: std::collections::HashMap::new(),
364 }
365 }
366
367 pub fn add(&mut self, field_id: FieldId, value: ColumnValue) {
368 self.columns
369 .entry(field_id)
370 .or_insert_with(|| ColumnWriter::new(field_id))
371 .add(value);
372 }
373
374 pub fn pad_to(&mut self, doc_count: u32) {
376 for writer in self.columns.values_mut() {
377 while writer.doc_count() < doc_count {
378 writer.add(ColumnValue::Null);
379 }
380 }
381 }
382
383 pub fn is_empty(&self) -> bool {
384 self.columns.is_empty()
385 }
386
387 pub fn finish(self) -> Vec<u8> {
391 let mut buf = Vec::new();
392 let mut entries: Vec<(FieldId, ColumnWriter)> = self.columns.into_iter().collect();
393 entries.sort_by_key(|(fid, _)| *fid);
394
395 buf.extend_from_slice(&(entries.len() as u16).to_le_bytes());
396 for (_, writer) in entries {
397 buf.extend_from_slice(&writer.finish());
398 }
399 buf
400 }
401}
402
403impl Default for ColumnarWriter {
404 fn default() -> Self {
405 Self::new()
406 }
407}
408
409fn bitpack_i64(values: &[ColumnValue], min_val: i64, bit_width: u8, buf: &mut Vec<u8>) {
411 if bit_width == 0 {
412 return; }
414 let num_bytes = (values.len() * bit_width as usize + 7) / 8;
415 let start = buf.len();
416 buf.resize(start + num_bytes, 0);
417
418 let mut bit_pos: usize = 0;
419 for v in values {
420 let val = match v {
421 ColumnValue::I64(n) => *n,
422 _ => min_val, };
424 let residual = (val - min_val) as u64;
425
426 let mut remaining = bit_width as usize;
428 let mut bits = residual;
429 let mut pos = bit_pos;
430 while remaining > 0 {
431 let byte_idx = start + pos / 8;
432 let bit_offset = pos % 8;
433 let can_write = (8 - bit_offset).min(remaining);
434 let mask = ((1u64 << can_write) - 1) as u8;
435 buf[byte_idx] |= ((bits as u8) & mask) << bit_offset;
436 bits >>= can_write;
437 pos += can_write;
438 remaining -= can_write;
439 }
440 bit_pos += bit_width as usize;
441 }
442}
443
/// Reads the `index`-th value from a buffer of fixed-width residuals packed
/// least-significant-bit first (the layout produced by `bitpack_i64`).
///
/// Returns `min_val` plus the stored residual; with a `bit_width` of 0 every
/// value is `min_val` and `data` is not read. The addition wraps, so residuals
/// up to 64 bits wide (or corrupt input) cannot trigger an overflow panic.
///
/// # Panics
///
/// Panics if `data` is too short to contain the requested value.
pub(crate) fn unpack_i64(data: &[u8], index: usize, min_val: i64, bit_width: u8) -> i64 {
    if bit_width == 0 {
        return min_val;
    }
    let width = usize::from(bit_width);

    let mut residual: u64 = 0;
    let mut pos = index * width;
    let mut shift: usize = 0;
    let mut remaining = width;
    // Gather the value in chunks that never cross a byte boundary.
    while remaining > 0 {
        let bit_offset = pos % 8;
        let can_read = (8 - bit_offset).min(remaining);
        let mask = ((1u64 << can_read) - 1) as u8;
        let chunk = (data[pos / 8] >> bit_offset) & mask;
        residual |= u64::from(chunk) << shift;
        pos += can_read;
        shift += can_read;
        remaining -= can_read;
    }
    min_val.wrapping_add(residual as i64)
}
467
/// Type tag stored in the third byte of every serialized column.
///
/// The numeric discriminants are written to the output verbatim via
/// `as u8`, so they must not be renumbered.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum ColumnType {
    /// No values (or only nulls): field id, tag and a zero document count.
    Empty = 0,
    /// Logical keyword kind reported by type detection. The writer in this
    /// file never emits this tag; it writes `KeywordBlocked` instead
    /// (presumably this tag is an earlier keyword layout — confirm in the reader).
    Keyword = 1,
    /// Raw integers, 8 little-endian bytes per document.
    I64 = 2,
    /// Raw floats, 8 little-endian bytes per document.
    F64 = 3,
    /// Bitset with one bit per document.
    Bool = 4,
    /// A single integer shared by every non-null document.
    ConstantI64 = 5,
    /// A single float shared by every non-null document.
    ConstantF64 = 6,
    /// Minimum value, bit width, then bit-packed residuals.
    BitpackedI64 = 7,
    /// Sorted term dictionary with one offset per block of terms, followed
    /// by one `u32` ordinal per document.
    KeywordBlocked = 8,
}
496
497impl ColumnType {
498 pub(crate) fn is_numeric(self) -> bool {
500 matches!(
501 self,
502 ColumnType::I64
503 | ColumnType::F64
504 | ColumnType::ConstantI64
505 | ColumnType::ConstantF64
506 | ColumnType::BitpackedI64
507 )
508 }
509}
510
#[cfg(test)]
mod tests {
    use super::*;
    use crate::columnar::reader::ColumnReader;

    /// Serializes `values` as one column.
    fn encode(field_id: FieldId, values: Vec<ColumnValue>) -> Vec<u8> {
        let mut w = ColumnWriter::new(field_id);
        for v in values {
            w.add(v);
        }
        w.finish()
    }

    /// Shorthand for an unchecked keyword value.
    fn kw(s: &str) -> ColumnValue {
        ColumnValue::Keyword(s.into())
    }

    /// Keywords, including a repeated term, read back unchanged.
    #[test]
    fn keyword_column_round_trip() {
        let data = encode(FieldId::new(0), vec![kw("hello"), kw("world"), kw("hello")]);

        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 3);
        assert_eq!(r.keyword_value(0), Some("hello"));
        assert_eq!(r.keyword_value(1), Some("world"));
        assert_eq!(r.keyword_value(2), Some("hello"));
    }

    /// Positive, negative and zero integers read back unchanged.
    #[test]
    fn i64_column_round_trip() {
        let data = encode(
            FieldId::new(1),
            vec![
                ColumnValue::I64(42),
                ColumnValue::I64(-7),
                ColumnValue::I64(0),
            ],
        );

        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(0), Some(42));
        assert_eq!(r.i64_value(1), Some(-7));
        assert_eq!(r.i64_value(2), Some(0));
    }

    /// Floats read back unchanged. The literals are exactly representable and
    /// deliberately not an approximation of a well-known constant.
    #[test]
    fn f64_column_round_trip() {
        let data = encode(
            FieldId::new(2),
            vec![ColumnValue::F64(2.75), ColumnValue::F64(-1.5)],
        );

        let r = ColumnReader::open(&data);
        assert_eq!(r.f64_value(0), Some(2.75));
        assert_eq!(r.f64_value(1), Some(-1.5));
    }

    /// Booleans read back unchanged.
    #[test]
    fn bool_column_round_trip() {
        let data = encode(
            FieldId::new(3),
            vec![
                ColumnValue::Bool(true),
                ColumnValue::Bool(false),
                ColumnValue::Bool(true),
            ],
        );

        let r = ColumnReader::open(&data);
        assert_eq!(r.bool_value(0), Some(true));
        assert_eq!(r.bool_value(1), Some(false));
        assert_eq!(r.bool_value(2), Some(true));
    }

    /// A null between two keywords is reported as null and yields no value.
    #[test]
    fn null_handling() {
        let data = encode(
            FieldId::new(0),
            vec![kw("a"), ColumnValue::Null, kw("b")],
        );

        let r = ColumnReader::open(&data);
        assert_eq!(r.keyword_value(0), Some("a"));
        assert!(r.is_null(1));
        assert_eq!(r.keyword_value(1), None);
        assert_eq!(r.keyword_value(2), Some("b"));
    }

    /// The dictionary is sorted on disk, but documents still map back to the
    /// terms they were added with.
    #[test]
    fn dict_encoding_sorted() {
        let data = encode(
            FieldId::new(0),
            vec![kw("cherry"), kw("apple"), kw("banana"), kw("apple")],
        );

        let r = ColumnReader::open(&data);
        assert_eq!(r.keyword_value(0), Some("cherry"));
        assert_eq!(r.keyword_value(1), Some("apple"));
        assert_eq!(r.keyword_value(2), Some("banana"));
        assert_eq!(r.keyword_value(3), Some("apple"));
    }

    /// A writer without values produces a column with zero documents.
    #[test]
    fn empty_column() {
        let data = encode(FieldId::new(0), Vec::new());

        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 0);
    }

    /// Asking for a document past the end yields `None`.
    #[test]
    fn out_of_range() {
        let data = encode(FieldId::new(0), vec![ColumnValue::I64(1)]);

        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(99), None);
    }

    /// 100 identical integers use the constant encoding and stay small.
    #[test]
    fn constant_i64_encoding() {
        let data = encode(FieldId::new(0), vec![ColumnValue::I64(42); 100]);

        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 100);
        assert!(r.is_constant());
        assert_eq!(r.constant_value(), Some(42.0));
        assert!(
            data.len() < 100,
            "constant encoding should be compact: {} bytes",
            data.len()
        );
    }

    /// Identical floats use the constant encoding.
    #[test]
    fn constant_f64_encoding() {
        let data = encode(FieldId::new(0), vec![ColumnValue::F64(2.75); 50]);

        let r = ColumnReader::open(&data);
        assert!(r.is_constant());
        assert_eq!(r.constant_value(), Some(2.75));
    }

    /// Nulls do not prevent the constant encoding and remain visible as nulls.
    #[test]
    fn constant_with_nulls() {
        let data = encode(
            FieldId::new(0),
            vec![
                ColumnValue::I64(7),
                ColumnValue::Null,
                ColumnValue::I64(7),
                ColumnValue::Null,
            ],
        );

        let r = ColumnReader::open(&data);
        assert!(r.is_constant());
        assert_eq!(r.constant_value(), Some(7.0));
        assert!(r.is_null(1));
        assert!(r.is_null(3));
        assert!(!r.is_null(0));
        assert!(!r.is_null(2));
    }

    /// Two different values must not be treated as constant.
    ///
    /// Despite the name, the writer's size heuristic selects the bit-packed
    /// encoding for this input (1 + 9 bytes versus 16 raw bytes); the test
    /// only asserts that the column is non-constant and round-trips.
    #[test]
    fn non_constant_stays_raw() {
        let data = encode(
            FieldId::new(0),
            vec![ColumnValue::I64(1), ColumnValue::I64(2)],
        );

        let r = ColumnReader::open(&data);
        assert!(!r.is_constant());
        assert_eq!(r.i64_value(0), Some(1));
        assert_eq!(r.i64_value(1), Some(2));
    }

    /// 1000 values spanning 16 distinct integers (4 bits each) round-trip and
    /// take well under one byte per value.
    #[test]
    fn bitpacked_narrow_range() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        for i in 0..1000 {
            w.add(ColumnValue::I64(100 + (i % 16)));
        }
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 1000);
        assert_eq!(r.i64_value(0), Some(100));
        assert_eq!(r.i64_value(1), Some(101));
        assert_eq!(r.i64_value(15), Some(115));
        assert_eq!(r.i64_value(16), Some(100));
        // 999 % 16 == 7
        assert_eq!(r.numeric_value(999), Some(107.0));
        assert!(
            data.len() < 1000,
            "bitpacked should be compact: {} bytes",
            data.len()
        );
    }

    /// Nulls inside a narrow-range integer column read back as `None`.
    #[test]
    fn bitpacked_with_nulls() {
        let data = encode(
            FieldId::new(0),
            vec![
                ColumnValue::I64(10),
                ColumnValue::Null,
                ColumnValue::I64(13),
                ColumnValue::Null,
                ColumnValue::I64(11),
            ],
        );

        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(0), Some(10));
        assert_eq!(r.i64_value(1), None);
        assert_eq!(r.i64_value(2), Some(13));
        assert_eq!(r.i64_value(3), None);
        assert_eq!(r.i64_value(4), Some(11));
    }

    /// The full i64 span needs 64 bits per residual, so the writer falls back
    /// to the raw encoding; both extremes must survive.
    #[test]
    fn wide_range_stays_raw() {
        let data = encode(
            FieldId::new(0),
            vec![ColumnValue::I64(i64::MIN), ColumnValue::I64(i64::MAX)],
        );

        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(0), Some(i64::MIN));
        assert_eq!(r.i64_value(1), Some(i64::MAX));
    }
}