Skip to main content

luci/columnar/
writer.rs

1//! Columnar store writer: accumulate column values during segment building.
2//!
3//! See [[columnar-storage]] and [[feature-aggregations-v010#Step 1]].
4
5use crate::core::{FieldId, LuciError, Result};
6
/// Block size for the keyword dictionary offset index: every `DICT_BLOCK_SIZE`-th
/// dictionary entry gets a persisted byte address, so `ordinal → string` is a
/// block seek plus a bounded ≤`DICT_BLOCK_MASK`-step walk rather than an O(N)
/// scan. Named `DICT_BLOCK_*` to avoid confusion with the unrelated storage-layer
/// `BLOCK_SIZE` constants (`doc_store.rs` / `postings.rs` / `luci-storage`).
/// Power-of-two so the writer groups by `% DICT_BLOCK_SIZE` while the reader
/// seeks by `ord >> DICT_BLOCK_SHIFT` and walks `ord & DICT_BLOCK_MASK` with no
/// search. `DICT_BLOCK_SHIFT` is the single source of truth for the block
/// size: `DICT_BLOCK_SIZE` and `DICT_BLOCK_MASK` are derived from it, and the
/// reader is meant to import these constants rather than re-declare them, so
/// changing the shift cannot introduce an off-by-one.
/// See [[optimization-keyword-dict-offset-index]].
pub(crate) const DICT_BLOCK_SHIFT: u32 = 6;
/// Number of dictionary entries per block: `1 << DICT_BLOCK_SHIFT` (64).
pub(crate) const DICT_BLOCK_SIZE: usize = 1 << DICT_BLOCK_SHIFT;
/// Low-bit mask giving an ordinal's position inside its block:
/// `DICT_BLOCK_SIZE - 1` (63).
pub(crate) const DICT_BLOCK_MASK: u32 = DICT_BLOCK_SIZE as u32 - 1;
20
/// Column value for a single document.
///
/// A column's type is taken from its first non-null value when the column is
/// serialized. `Keyword` values should normally be built through
/// [`ColumnValue::keyword`], which enforces the dictionary length limit.
#[derive(Clone, Debug)]
pub enum ColumnValue {
    /// String value; keyword columns are dictionary-encoded on write.
    Keyword(String),
    /// Signed 64-bit integer value.
    I64(i64),
    /// 64-bit floating-point value.
    F64(f64),
    /// Boolean value; bool columns are written as one bit per document.
    Bool(bool),
    /// No value for this document; recorded in the column's null bitset.
    Null,
}
30
31impl ColumnValue {
32    /// Construct a keyword value, rejecting any string whose UTF-8 length
33    /// exceeds the 65535-byte columnar dictionary limit.
34    ///
35    /// The blocked keyword column stores each dictionary entry's length as a
36    /// `u16` (see [[optimization-keyword-dict-offset-index]]). Silently
37    /// truncating an over-length value would corrupt the reader's block
38    /// arithmetic and cross-contaminate neighboring `_id`s, so we fail loud at
39    /// construction — before any persistent buffer mutation on the `add` path
40    /// ([[code-must-not-lie]]). A `doc_values: false` keyword never reaches
41    /// here (it has no columnar column) and keeps the uncapped FST path.
42    pub fn keyword(s: String) -> Result<ColumnValue> {
43        if s.len() > u16::MAX as usize {
44            return Err(LuciError::InvalidValue(format!(
45                "keyword value is {} bytes, exceeds the maximum of {} bytes",
46                s.len(),
47                u16::MAX
48            )));
49        }
50        Ok(ColumnValue::Keyword(s))
51    }
52}
53
/// Writes a single column's values during segment building.
///
/// Values are buffered in memory in insertion order (one entry per document)
/// and only encoded when `finish` is called.
pub struct ColumnWriter {
    /// Identifier of the field this column belongs to; written as a `u16` at
    /// the start of the serialized column.
    field_id: FieldId,
    /// One entry per document, in the order the documents were added.
    values: Vec<ColumnValue>,
}
59
60impl ColumnWriter {
61    pub fn new(field_id: FieldId) -> Self {
62        Self {
63            field_id,
64            values: Vec::new(),
65        }
66    }
67
    /// Append the value for the next document. Documents are identified by
    /// their insertion position, so callers add exactly one value (possibly
    /// `ColumnValue::Null`) per document.
    pub fn add(&mut self, value: ColumnValue) {
        self.values.push(value);
    }
71
    /// Number of values buffered so far (one per document, nulls included).
    ///
    /// The length is narrowed with `as u32`, so the result would wrap if more
    /// than `u32::MAX` values were ever added.
    pub fn doc_count(&self) -> u32 {
        self.values.len() as u32
    }
75
76    /// Serialize the column to bytes.
77    ///
78    /// Format:
79    /// ```text
80    /// [field_id: u16] [column_type: u8] [doc_count: u32]
81    /// Type-specific data follows.
82    /// ```
83    pub fn finish(self) -> Vec<u8> {
84        if self.values.is_empty() {
85            return self.write_empty();
86        }
87
88        // Detect column type from first non-null value
89        let col_type = self.detect_type();
90        match col_type {
91            // KeywordBlocked is dead here (detect_type only ever yields
92            // Keyword), but the exhaustive match forces an honest arm.
93            ColumnType::Keyword | ColumnType::KeywordBlocked => self.write_keyword_column(),
94            ColumnType::I64 | ColumnType::ConstantI64 | ColumnType::BitpackedI64 => {
95                self.write_i64_column()
96            }
97            ColumnType::F64 | ColumnType::ConstantF64 => self.write_f64_column(),
98            ColumnType::Bool => self.write_bool_column(),
99            ColumnType::Empty => self.write_empty(),
100        }
101    }
102
103    fn detect_type(&self) -> ColumnType {
104        for v in &self.values {
105            match v {
106                ColumnValue::Keyword(_) => return ColumnType::Keyword,
107                ColumnValue::I64(_) => return ColumnType::I64,
108                ColumnValue::F64(_) => return ColumnType::F64,
109                ColumnValue::Bool(_) => return ColumnType::Bool,
110                ColumnValue::Null => continue,
111            }
112        }
113        ColumnType::Empty
114    }
115
116    fn write_header(&self, col_type: ColumnType) -> Vec<u8> {
117        let mut buf = Vec::new();
118        buf.extend_from_slice(&self.field_id.as_u16().to_le_bytes());
119        buf.push(col_type as u8);
120        buf.extend_from_slice(&(self.values.len() as u32).to_le_bytes());
121
122        // Null bitset: 1 bit per doc, 0 = non-null, 1 = null
123        let null_bytes = (self.values.len() + 7) / 8;
124        let mut null_bitset = vec![0u8; null_bytes];
125        let mut null_count: u32 = 0;
126        for (i, v) in self.values.iter().enumerate() {
127            if matches!(v, ColumnValue::Null) {
128                null_bitset[i / 8] |= 1 << (i % 8);
129                null_count += 1;
130            }
131        }
132        buf.extend_from_slice(&null_bitset);
133
134        // Column statistics for numeric types: [null_count: u32][min: f64][max: f64]
135        // Enables segment-level aggregation pushdown (O(1) min/max/count).
136        if col_type.is_numeric() {
137            buf.extend_from_slice(&null_count.to_le_bytes());
138            let (min_val, max_val) = self.numeric_range();
139            buf.extend_from_slice(&min_val.to_le_bytes());
140            buf.extend_from_slice(&max_val.to_le_bytes());
141        }
142
143        buf
144    }
145
146    /// Compute min and max as f64 across all non-null numeric values.
147    fn numeric_range(&self) -> (f64, f64) {
148        let mut min_val = f64::INFINITY;
149        let mut max_val = f64::NEG_INFINITY;
150        for v in &self.values {
151            let n = match v {
152                ColumnValue::I64(n) => *n as f64,
153                ColumnValue::F64(n) => *n,
154                _ => continue,
155            };
156            if n < min_val {
157                min_val = n;
158            }
159            if n > max_val {
160                max_val = n;
161            }
162        }
163        (min_val, max_val)
164    }
165
166    fn write_empty(&self) -> Vec<u8> {
167        let mut buf = Vec::new();
168        buf.extend_from_slice(&self.field_id.as_u16().to_le_bytes());
169        buf.push(ColumnType::Empty as u8);
170        buf.extend_from_slice(&0u32.to_le_bytes());
171        buf
172    }
173
174    fn write_keyword_column(self) -> Vec<u8> {
175        let mut buf = self.write_header(ColumnType::KeywordBlocked);
176
177        // Dictionary encoding: collect unique values, sort, assign ordinals
178        let mut dict: Vec<String> = Vec::new();
179        let mut seen = std::collections::HashMap::new();
180        for v in &self.values {
181            if let ColumnValue::Keyword(s) = v {
182                if !seen.contains_key(s.as_str()) {
183                    seen.insert(s.clone(), 0u32); // placeholder
184                    dict.push(s.clone());
185                }
186            }
187        }
188        dict.sort();
189        for (i, term) in dict.iter().enumerate() {
190            seen.insert(term.clone(), i as u32);
191        }
192
193        // Write dictionary in the blocked layout:
194        //   [dict_count: u32]
195        //   [dict_body_len: u64]                       (lets open() skip body in O(1))
196        //   [ (len: u16)(UTF-8 bytes) × dict_count ]   (sorted, unchanged encoding)
197        //   [ block_addrs: u64 × ceil(dict_count/DICT_BLOCK_SIZE) ]
198        // block_addrs[b] is block b's first-entry byte offset relative to the
199        // dictionary body start; block_addrs[0] == 0 always.
200        buf.extend_from_slice(&(dict.len() as u32).to_le_bytes()); // dict_count
201        let body_len_pos = buf.len();
202        buf.extend_from_slice(&0u64.to_le_bytes()); // dict_body_len placeholder
203        let body_start = buf.len();
204        let mut block_addrs: Vec<u64> = Vec::with_capacity(dict.len().div_ceil(DICT_BLOCK_SIZE));
205        for (i, term) in dict.iter().enumerate() {
206            if i % DICT_BLOCK_SIZE == 0 {
207                block_addrs.push((buf.len() - body_start) as u64);
208            }
209            let bytes = term.as_bytes();
210            // Upstream ColumnValue::keyword() rejects > u16::MAX bytes, so this
211            // never fires; it guards against a future bypass of that constructor.
212            debug_assert!(bytes.len() <= u16::MAX as usize);
213            buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
214            buf.extend_from_slice(bytes);
215        }
216        let body_len = (buf.len() - body_start) as u64;
217        buf[body_len_pos..body_len_pos + 8].copy_from_slice(&body_len.to_le_bytes());
218        for a in &block_addrs {
219            buf.extend_from_slice(&a.to_le_bytes());
220        }
221
222        // Write ordinal array: one u32 per doc (u32::MAX for null)
223        for v in &self.values {
224            let ordinal = match v {
225                ColumnValue::Keyword(s) => *seen.get(s.as_str()).unwrap(),
226                _ => u32::MAX, // null
227            };
228            buf.extend_from_slice(&ordinal.to_le_bytes());
229        }
230
231        buf
232    }
233
234    fn write_i64_column(self) -> Vec<u8> {
235        // Check for constant encoding
236        if let Some(constant) = self.constant_i64() {
237            let mut buf = self.write_header(ColumnType::ConstantI64);
238            buf.extend_from_slice(&constant.to_le_bytes());
239            return buf;
240        }
241
242        // Check for bitpacking: compute range and required bit width
243        let (min_val, max_val) = self.i64_range();
244        let range = (max_val as u128).wrapping_sub(min_val as u128);
245        let bit_width = if range == 0 {
246            0
247        } else {
248            128 - range.leading_zeros()
249        } as u8;
250
251        // Bitpack if it saves space: packed bytes < raw bytes (8 per doc)
252        let raw_bytes = self.values.len() * 8;
253        let packed_bytes = (self.values.len() * bit_width as usize + 7) / 8;
254        // Overhead: 8 (min) + 1 (bit_width)
255        if bit_width < 64 && packed_bytes + 9 < raw_bytes {
256            let mut buf = self.write_header(ColumnType::BitpackedI64);
257            buf.extend_from_slice(&min_val.to_le_bytes());
258            buf.push(bit_width);
259            bitpack_i64(&self.values, min_val, bit_width, &mut buf);
260            return buf;
261        }
262
263        // Fall back to raw encoding
264        let mut buf = self.write_header(ColumnType::I64);
265        for v in &self.values {
266            let val = match v {
267                ColumnValue::I64(n) => *n,
268                _ => 0,
269            };
270            buf.extend_from_slice(&val.to_le_bytes());
271        }
272        buf
273    }
274
275    /// Compute the min and max of non-null i64 values.
276    fn i64_range(&self) -> (i64, i64) {
277        let mut min_val = i64::MAX;
278        let mut max_val = i64::MIN;
279        for v in &self.values {
280            if let ColumnValue::I64(n) = v {
281                if *n < min_val {
282                    min_val = *n;
283                }
284                if *n > max_val {
285                    max_val = *n;
286                }
287            }
288        }
289        (min_val, max_val)
290    }
291
292    fn write_f64_column(self) -> Vec<u8> {
293        // Check for constant encoding
294        if let Some(constant) = self.constant_f64() {
295            let mut buf = self.write_header(ColumnType::ConstantF64);
296            buf.extend_from_slice(&constant.to_le_bytes());
297            return buf;
298        }
299
300        let mut buf = self.write_header(ColumnType::F64);
301        for v in &self.values {
302            let val = match v {
303                ColumnValue::F64(n) => *n,
304                _ => 0.0,
305            };
306            buf.extend_from_slice(&val.to_le_bytes());
307        }
308        buf
309    }
310
311    /// If all non-null i64 values are identical, return the constant.
312    fn constant_i64(&self) -> Option<i64> {
313        let mut constant: Option<i64> = None;
314        for v in &self.values {
315            if let ColumnValue::I64(n) = v {
316                match constant {
317                    None => constant = Some(*n),
318                    Some(c) if c != *n => return None,
319                    _ => {}
320                }
321            }
322        }
323        constant
324    }
325
326    /// If all non-null f64 values are identical, return the constant.
327    fn constant_f64(&self) -> Option<f64> {
328        let mut constant: Option<f64> = None;
329        for v in &self.values {
330            if let ColumnValue::F64(n) = v {
331                match constant {
332                    None => constant = Some(*n),
333                    Some(c) if c != *n => return None,
334                    _ => {}
335                }
336            }
337        }
338        constant
339    }
340
341    fn write_bool_column(self) -> Vec<u8> {
342        let mut buf = self.write_header(ColumnType::Bool);
343        let bool_bytes = (self.values.len() + 7) / 8;
344        let mut bitset = vec![0u8; bool_bytes];
345        for (i, v) in self.values.iter().enumerate() {
346            if let ColumnValue::Bool(true) = v {
347                bitset[i / 8] |= 1 << (i % 8);
348            }
349        }
350        buf.extend_from_slice(&bitset);
351        buf
352    }
353}
354
/// Accumulates columns for all doc_values fields in a segment.
///
/// Columns are created lazily, the first time a value is added for a field.
pub struct ColumnarWriter {
    /// One column writer per field that has received at least one value.
    columns: std::collections::HashMap<FieldId, ColumnWriter>,
}
359
360impl ColumnarWriter {
361    pub fn new() -> Self {
362        Self {
363            columns: std::collections::HashMap::new(),
364        }
365    }
366
367    pub fn add(&mut self, field_id: FieldId, value: ColumnValue) {
368        self.columns
369            .entry(field_id)
370            .or_insert_with(|| ColumnWriter::new(field_id))
371            .add(value);
372    }
373
374    /// Ensure all columns have the same doc count by padding with nulls.
375    pub fn pad_to(&mut self, doc_count: u32) {
376        for writer in self.columns.values_mut() {
377            while writer.doc_count() < doc_count {
378                writer.add(ColumnValue::Null);
379            }
380        }
381    }
382
383    pub fn is_empty(&self) -> bool {
384        self.columns.is_empty()
385    }
386
387    /// Serialize all columns.
388    ///
389    /// Format: [num_columns: u16] [column_data...]
390    pub fn finish(self) -> Vec<u8> {
391        let mut buf = Vec::new();
392        let mut entries: Vec<(FieldId, ColumnWriter)> = self.columns.into_iter().collect();
393        entries.sort_by_key(|(fid, _)| *fid);
394
395        buf.extend_from_slice(&(entries.len() as u16).to_le_bytes());
396        for (_, writer) in entries {
397            buf.extend_from_slice(&writer.finish());
398        }
399        buf
400    }
401}
402
/// `Default` is the same as [`ColumnarWriter::new`]: a writer with no columns.
impl Default for ColumnarWriter {
    fn default() -> Self {
        Self::new()
    }
}
408
409/// Bitpack i64 values as (value - min) residuals into `bit_width` bits each.
410fn bitpack_i64(values: &[ColumnValue], min_val: i64, bit_width: u8, buf: &mut Vec<u8>) {
411    if bit_width == 0 {
412        return; // All values equal min (should have been caught by constant encoding)
413    }
414    let num_bytes = (values.len() * bit_width as usize + 7) / 8;
415    let start = buf.len();
416    buf.resize(start + num_bytes, 0);
417
418    let mut bit_pos: usize = 0;
419    for v in values {
420        let val = match v {
421            ColumnValue::I64(n) => *n,
422            _ => min_val, // null → treated as min (won't be read due to null bitset)
423        };
424        let residual = (val - min_val) as u64;
425
426        // Write `bit_width` bits of `residual` starting at `bit_pos`
427        let mut remaining = bit_width as usize;
428        let mut bits = residual;
429        let mut pos = bit_pos;
430        while remaining > 0 {
431            let byte_idx = start + pos / 8;
432            let bit_offset = pos % 8;
433            let can_write = (8 - bit_offset).min(remaining);
434            let mask = ((1u64 << can_write) - 1) as u8;
435            buf[byte_idx] |= ((bits as u8) & mask) << bit_offset;
436            bits >>= can_write;
437            pos += can_write;
438            remaining -= can_write;
439        }
440        bit_pos += bit_width as usize;
441    }
442}
443
/// Read the bitpacked value at `index` from `data`.
///
/// `data` is the packed region produced by `bitpack_i64` (starting at the
/// first packed byte), `min_val` the base that was subtracted when packing and
/// `bit_width` the number of bits per value. A `bit_width` of 0 means every
/// value equals `min_val`, and `data` is not read.
///
/// The residual is added back with wrapping arithmetic, the exact inverse of
/// the packer's subtraction, so full-width residuals (up to 64 bits) decode
/// correctly instead of overflowing.
///
/// # Panics
///
/// Panics if `data` is too short to contain the requested value.
pub(crate) fn unpack_i64(data: &[u8], index: usize, min_val: i64, bit_width: u8) -> i64 {
    if bit_width == 0 {
        return min_val;
    }
    let mut residual: u64 = 0;
    let mut remaining = bit_width as usize;
    let mut pos = index * bit_width as usize;
    let mut shift = 0usize;
    // Gather the value piecewise, taking at most the rest of one byte per step.
    while remaining > 0 {
        let bit_offset = pos % 8;
        let can_read = (8 - bit_offset).min(remaining);
        let mask = ((1u64 << can_read) - 1) as u8;
        let bits = (data[pos / 8] >> bit_offset) & mask;
        residual |= (bits as u64) << shift;
        pos += can_read;
        shift += can_read;
        remaining -= can_read;
    }
    min_val.wrapping_add(residual as i64)
}
467
/// On-disk column encoding tag, written as the `column_type: u8` byte of the
/// column header. The discriminants are part of the file format.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum ColumnType {
    /// Column with no non-null value. Written as the bare
    /// `[field_id][type][doc_count = 0]` prefix with no bitset or body.
    Empty = 0,
    /// Eager keyword dictionary (pre-v3 layout):
    /// `[dict_count][(len: u16)(bytes)×N][ordinals]`, no offset index. Retained
    /// read-only for back-compat; the writer emits `KeywordBlocked` now.
    /// See [[optimization-keyword-dict-offset-index]].
    Keyword = 1,
    /// Raw i64: one little-endian `i64` per document.
    I64 = 2,
    /// Raw f64: one little-endian `f64` per document.
    F64 = 3,
    /// Boolean bitset: one bit per document, set for `true`.
    Bool = 4,
    /// All non-null i64 values are identical. Body: [value: i64].
    ConstantI64 = 5,
    /// All non-null f64 values are identical. Body: [value: f64].
    ConstantF64 = 6,
    /// Bitpacked i64: [min: i64][bit_width: u8][packed residuals].
    /// Each value stored as (value - min) in `bit_width` bits.
    BitpackedI64 = 7,
    /// Keyword column with a persisted per-block dictionary offset index for
    /// O(1) `ordinal → string` lookup. Body is the `Keyword` layout plus a
    /// `[dict_body_len: u64]` and a
    /// `[block_addrs: u64 × ceil(dict_count/DICT_BLOCK_SIZE)]` array. The writer
    /// emits this for every keyword column as of format v3. Non-numeric, so
    /// `is_numeric()` stays false and no stats block is written.
    /// See [[optimization-keyword-dict-offset-index]].
    KeywordBlocked = 8,
}
496
497impl ColumnType {
498    /// True for types that store numeric stats (null_count, min, max) in the header.
499    pub(crate) fn is_numeric(self) -> bool {
500        matches!(
501            self,
502            ColumnType::I64
503                | ColumnType::F64
504                | ColumnType::ConstantI64
505                | ColumnType::ConstantF64
506                | ColumnType::BitpackedI64
507        )
508    }
509}
510
/// Writer/reader round-trip tests: each test serializes a column with
/// `ColumnWriter` and checks what `ColumnReader` reads back.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::columnar::reader::ColumnReader;

    /// Keyword values, including a repeated one, read back per document.
    #[test]
    fn keyword_column_round_trip() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::Keyword("hello".into()));
        w.add(ColumnValue::Keyword("world".into()));
        w.add(ColumnValue::Keyword("hello".into()));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 3);
        assert_eq!(r.keyword_value(0), Some("hello"));
        assert_eq!(r.keyword_value(1), Some("world"));
        assert_eq!(r.keyword_value(2), Some("hello"));
    }

    /// Positive, negative and zero i64 values read back unchanged. With a
    /// span of 49 over three values the writer's size check selects the
    /// bit-packed encoding here.
    #[test]
    fn i64_column_round_trip() {
        let mut w = ColumnWriter::new(FieldId::new(1));
        w.add(ColumnValue::I64(42));
        w.add(ColumnValue::I64(-7));
        w.add(ColumnValue::I64(0));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(0), Some(42));
        assert_eq!(r.i64_value(1), Some(-7));
        assert_eq!(r.i64_value(2), Some(0));
    }

    /// Two distinct f64 values are stored raw and read back exactly.
    #[test]
    fn f64_column_round_trip() {
        let mut w = ColumnWriter::new(FieldId::new(2));
        w.add(ColumnValue::F64(3.14));
        w.add(ColumnValue::F64(-1.5));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.f64_value(0), Some(3.14));
        assert_eq!(r.f64_value(1), Some(-1.5));
    }

    /// Bool values survive the bitset encoding.
    #[test]
    fn bool_column_round_trip() {
        let mut w = ColumnWriter::new(FieldId::new(3));
        w.add(ColumnValue::Bool(true));
        w.add(ColumnValue::Bool(false));
        w.add(ColumnValue::Bool(true));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.bool_value(0), Some(true));
        assert_eq!(r.bool_value(1), Some(false));
        assert_eq!(r.bool_value(2), Some(true));
    }

    /// A null between two keywords is reported as null and reads as `None`.
    #[test]
    fn null_handling() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::Keyword("a".into()));
        w.add(ColumnValue::Null);
        w.add(ColumnValue::Keyword("b".into()));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.keyword_value(0), Some("a"));
        assert!(r.is_null(1));
        assert_eq!(r.keyword_value(1), None);
        assert_eq!(r.keyword_value(2), Some("b"));
    }

    /// The dictionary is sorted on write, but each document must still map
    /// back to its own value regardless of insertion order.
    #[test]
    fn dict_encoding_sorted() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::Keyword("cherry".into()));
        w.add(ColumnValue::Keyword("apple".into()));
        w.add(ColumnValue::Keyword("banana".into()));
        w.add(ColumnValue::Keyword("apple".into()));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.keyword_value(0), Some("cherry"));
        assert_eq!(r.keyword_value(1), Some("apple"));
        assert_eq!(r.keyword_value(2), Some("banana"));
        assert_eq!(r.keyword_value(3), Some("apple"));
    }

    /// A writer that never received a value serializes to a zero-doc column.
    #[test]
    fn empty_column() {
        let w = ColumnWriter::new(FieldId::new(0));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 0);
    }

    /// Reading past the last document yields `None` rather than panicking.
    #[test]
    fn out_of_range() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::I64(1));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(99), None);
    }

    /// 100 identical i64 values collapse to the constant encoding.
    #[test]
    fn constant_i64_encoding() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        for _ in 0..100 {
            w.add(ColumnValue::I64(42));
        }
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 100);
        assert!(r.is_constant());
        assert_eq!(r.constant_value(), Some(42.0));
        // Callers use is_constant() + constant_value() to short-circuit,
        // NOT i64_value() per doc (which doesn't handle constant encoding).
        assert!(
            data.len() < 100,
            "constant encoding should be compact: {} bytes",
            data.len()
        );
    }

    /// 50 identical f64 values collapse to the constant encoding.
    #[test]
    fn constant_f64_encoding() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        for _ in 0..50 {
            w.add(ColumnValue::F64(3.14));
        }
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert!(r.is_constant());
        assert_eq!(r.constant_value(), Some(3.14));
    }

    /// Nulls do not break constant detection and stay visible in the null
    /// bitset.
    #[test]
    fn constant_with_nulls() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::I64(7));
        w.add(ColumnValue::Null);
        w.add(ColumnValue::I64(7));
        w.add(ColumnValue::Null);
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert!(r.is_constant());
        assert_eq!(r.constant_value(), Some(7.0));
        assert!(r.is_null(1));
        assert!(r.is_null(3));
        assert!(!r.is_null(0));
        assert!(!r.is_null(2));
    }

    /// Two different values must not be treated as constant.
    ///
    /// NOTE: despite the name, the writer does not emit the raw `I64`
    /// encoding for this input. The span is 1, so `bit_width` is 1 and the
    /// packed size (1 byte + 9 bytes overhead) beats the 16 raw bytes, which
    /// selects `BitpackedI64`. The assertions only pin "not constant" and the
    /// round-tripped values.
    #[test]
    fn non_constant_stays_raw() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::I64(1));
        w.add(ColumnValue::I64(2));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert!(!r.is_constant());
        assert_eq!(r.i64_value(0), Some(1));
        assert_eq!(r.i64_value(1), Some(2));
    }

    /// A narrow value range is bit-packed and is much smaller than raw.
    #[test]
    fn bitpacked_narrow_range() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        // Values 100-115: range=15, needs 4 bits (vs 64 bits raw)
        for i in 0..1000 {
            w.add(ColumnValue::I64(100 + (i % 16)));
        }
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.doc_count(), 1000);
        assert_eq!(r.i64_value(0), Some(100));
        assert_eq!(r.i64_value(1), Some(101));
        assert_eq!(r.i64_value(15), Some(115));
        assert_eq!(r.i64_value(16), Some(100));
        assert_eq!(r.numeric_value(999), Some(107.0)); // 100 + (999 % 16)
        // Bitpacked: header(7) + null_bitset(125) + stats(20) + min(8)
        //            + bit_width(1) + 500 packed bytes = 661
        // Raw: header(7) + null_bitset(125) + stats(20) + 8000 bytes = 8152
        assert!(
            data.len() < 1000,
            "bitpacked should be compact: {} bytes",
            data.len()
        );
    }

    /// Nulls inside a bit-packed column read back as `None`; their packed
    /// slots hold a zero residual.
    #[test]
    fn bitpacked_with_nulls() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        w.add(ColumnValue::I64(10));
        w.add(ColumnValue::Null);
        w.add(ColumnValue::I64(13));
        w.add(ColumnValue::Null);
        w.add(ColumnValue::I64(11));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(0), Some(10));
        assert_eq!(r.i64_value(1), None); // null
        assert_eq!(r.i64_value(2), Some(13));
        assert_eq!(r.i64_value(3), None); // null
        assert_eq!(r.i64_value(4), Some(11));
    }

    /// The full i64 span needs 64 bits per value, so the writer falls back to
    /// the raw encoding.
    #[test]
    fn wide_range_stays_raw() {
        let mut w = ColumnWriter::new(FieldId::new(0));
        // Range too wide for bitpacking to save space
        w.add(ColumnValue::I64(i64::MIN));
        w.add(ColumnValue::I64(i64::MAX));
        let data = w.finish();

        let r = ColumnReader::open(&data);
        assert_eq!(r.i64_value(0), Some(i64::MIN));
        assert_eq!(r.i64_value(1), Some(i64::MAX));
    }
}
741}