Skip to main content

luci/columnar/
reader.rs

1//! Columnar store reader: access column values by doc_id.
2//!
3//! See [[columnar-storage]] and [[feature-aggregations-v010#Step 1]].
4
5#[cfg(test)]
6use std::cell::Cell;
7use std::cell::OnceCell;
8
9use super::writer::{ColumnType, DICT_BLOCK_MASK, DICT_BLOCK_SHIFT, DICT_BLOCK_SIZE, unpack_i64};
10use crate::core::FieldId;
11
/// Per-column statistics for numeric columns (zonemaps).
///
/// Decoded by `ColumnReader::open` from the 20-byte record that sits between
/// the null bitset and the body of a numeric column.
#[derive(Clone, Debug)]
pub struct ColumnStats {
    /// Null count as recorded in the column: little-endian `u32`, first 4
    /// bytes of the stats record.
    pub null_count: u32,
    /// Recorded minimum: little-endian `f64`, bytes 4..12 of the stats record.
    pub min: f64,
    /// Recorded maximum: little-endian `f64`, bytes 12..20 of the stats record.
    pub max: f64,
}
19
/// Dispatched keyword-dictionary representation, replacing the eager
/// `dict: Vec<&'a str>` so a `KeywordBlocked` column can resolve `ordinal →
/// string` by random block seek without materializing the whole dictionary.
/// See [[optimization-keyword-dict-offset-index]].
enum KeywordDict<'a> {
    /// Non-keyword (or empty) column — no dictionary.
    None,
    /// Legacy `Keyword = 1` column: eagerly parsed at open (back-compat).
    Eager(Vec<&'a str>),
    /// `KeywordBlocked = 8` column: random-access via the block-address array.
    /// `dict` is the lazily-materialized eager `Vec`, built by `ensure_dict()`
    /// only when a bulk consumer (sort/collapse/terms-agg) opts in. Random
    /// consumers (`hit.id`, `fields=`) leave it empty and block-seek. `OnceCell`
    /// is `!Sync` but `Send` (its `T = Vec<&str>` is `Send`), which suffices:
    /// every bulk reader is thread-local under `par_iter`, never shared.
    Blocked {
        /// Offset into the column bytes of the first dictionary entry
        /// (`[len: u16 LE][bytes]`), i.e. just past `num_entries` and `body_len`.
        dict_body_start: usize,
        /// Offset into the column bytes of the block-address array: one
        /// little-endian `u64` per block, each relative to `dict_body_start`.
        block_addrs_start: usize,
        /// Number of dictionary entries, read from the column header at open.
        num_entries: u32,
        /// Lazily built ordinal-indexed view of the dictionary; empty until
        /// `ensure_dict()` runs.
        dict: OnceCell<Vec<&'a str>>,
    },
}
42
/// Reads a single column's values from serialized bytes.
///
/// All offsets below are byte offsets into `data`.
pub struct ColumnReader<'a> {
    /// The serialized column, header included.
    data: &'a [u8],
    /// Field id decoded from header bytes 0..2.
    field_id: FieldId,
    /// Encoding decoded from header byte 2 (unknown bytes map to `Empty`).
    col_type: ColumnType,
    /// Document count decoded from header bytes 3..7; forced to 0 for empty
    /// columns.
    doc_count: u32,
    /// Start of the null bitset (one bit per doc, LSB first; a set bit means
    /// null).
    null_bitset_start: usize,
    /// Start of the type-specific body, past the null bitset and, when
    /// present, the 20-byte numeric stats record.
    body_start: usize,
    /// For keyword columns: the dictionary (eager, or block-addressed + lazy).
    keyword: KeywordDict<'a>,
    /// For keyword columns: start of ordinal array
    /// (one little-endian `u32` per doc; `u32::MAX` is read back as "no value").
    ordinals_start: usize,
    /// For numeric columns: precomputed stats (zonemaps)
    stats: Option<ColumnStats>,
}
58
59// Lock in the auto-trait invariant: the `OnceCell` in `KeywordDict::Blocked` is
60// `Send` (its `T` is) but drops `Sync`, so `ColumnReader`/`OwnedColumn` stay
61// Send-but-not-Sync. `!Sync` is sound — every bulk reader is stack-local within a
62// per-segment `par_iter` closure and finished there; only string-keyed results
63// (sort keys, finished agg buckets) cross threads, never the reader. `Send` is
64// required transitively by the `Aggregator: Send` supertrait (`agg/mod.rs`);
65// assert it here as a tripwire so a future change making the reader `!Send` fails
66// *here*, loudly, instead of being papered over by one of the `unsafe impl Send`
67// collectors (`bucket.rs`, `hll.rs`) and miscompiling. `Sync` is intentionally
68// NOT asserted. See [[optimization-keyword-dict-offset-index]].
69const _: fn() = || {
70    fn assert_send<T: Send>() {}
71    assert_send::<ColumnReader<'static>>();
72};
73
#[cfg(test)]
thread_local! {
    /// Test-only counter of real lazy-dictionary **builds** (not calls). Lives
    /// at module scope so `ensure_dict()` can bump it from inside its
    /// `get_or_init` closure; `mod tests` reads it via `super::DICT_BUILDS`.
    /// Distinct from `owned.rs`'s `COLUMN_OPENS` (which counts
    /// `OwnedColumn::new`). See Test 10 in
    /// [[optimization-keyword-dict-offset-index]].
    ///
    /// Being thread-local, the count is per test thread; a test that asserts on
    /// it resets it first with `DICT_BUILDS.with(|c| c.set(0))`.
    static DICT_BUILDS: Cell<u32> = const { Cell::new(0) };
}
84
85impl<'a> ColumnReader<'a> {
86    pub fn open(data: &'a [u8]) -> Self {
87        if data.len() < 7 {
88            return Self::empty(data);
89        }
90
91        let field_id = FieldId::new(u16::from_le_bytes([data[0], data[1]]));
92        let col_type_byte = data[2];
93        let doc_count = u32::from_le_bytes([data[3], data[4], data[5], data[6]]);
94
95        let col_type = match col_type_byte {
96            0 => ColumnType::Empty,
97            1 => ColumnType::Keyword,
98            2 => ColumnType::I64,
99            3 => ColumnType::F64,
100            4 => ColumnType::Bool,
101            5 => ColumnType::ConstantI64,
102            6 => ColumnType::ConstantF64,
103            7 => ColumnType::BitpackedI64,
104            8 => ColumnType::KeywordBlocked,
105            _ => ColumnType::Empty,
106        };
107
108        if doc_count == 0 || col_type == ColumnType::Empty {
109            return Self {
110                data,
111                field_id,
112                col_type,
113                doc_count: 0,
114                null_bitset_start: 7,
115                body_start: 7,
116                keyword: KeywordDict::None,
117                ordinals_start: 0,
118                stats: None,
119            };
120        }
121
122        let null_bytes = (doc_count as usize + 7) / 8;
123        let null_bitset_start = 7;
124        let mut body_start = null_bitset_start + null_bytes;
125
126        // Parse numeric stats if present
127        let stats = if col_type.is_numeric() && body_start + 20 <= data.len() {
128            let null_count =
129                u32::from_le_bytes(data[body_start..body_start + 4].try_into().unwrap());
130            let min = f64::from_le_bytes(data[body_start + 4..body_start + 12].try_into().unwrap());
131            let max =
132                f64::from_le_bytes(data[body_start + 12..body_start + 20].try_into().unwrap());
133            body_start += 20; // skip past stats
134            Some(ColumnStats {
135                null_count,
136                min,
137                max,
138            })
139        } else {
140            None
141        };
142
143        let mut reader = Self {
144            data,
145            field_id,
146            col_type,
147            doc_count,
148            null_bitset_start,
149            body_start,
150            keyword: KeywordDict::None,
151            ordinals_start: 0,
152            stats,
153        };
154
155        match col_type {
156            // Legacy: eager parse (back-compat) — sets KeywordDict::Eager.
157            ColumnType::Keyword => reader.parse_keyword_dict(),
158            // New: record offsets, NO full parse. The dictionary body is walked
159            // lazily by ensure_dict(); random lookups block-seek directly.
160            ColumnType::KeywordBlocked => {
161                let mut pos = reader.body_start;
162                let num_entries = u32::from_le_bytes(reader.data[pos..pos + 4].try_into().unwrap());
163                pos += 4;
164                let body_len =
165                    u64::from_le_bytes(reader.data[pos..pos + 8].try_into().unwrap()) as usize;
166                pos += 8;
167                // Unreachable on supported (64-bit usize) targets; makes the
168                // "u64 prevents overflow" claim literally true. A >4 GiB body
169                // cannot be mmap'd into a &[u8] on 32-bit anyway.
170                debug_assert!(body_len as u64 <= usize::MAX as u64);
171                let dict_body_start = pos;
172                let block_addrs_start = dict_body_start + body_len;
173                let num_blocks = (num_entries as usize).div_ceil(DICT_BLOCK_SIZE);
174                reader.ordinals_start = block_addrs_start + num_blocks * 8;
175                reader.keyword = KeywordDict::Blocked {
176                    dict_body_start,
177                    block_addrs_start,
178                    num_entries,
179                    dict: OnceCell::new(),
180                };
181            }
182            _ => {}
183        }
184
185        reader
186    }
187
188    fn empty(data: &'a [u8]) -> Self {
189        Self {
190            data,
191            field_id: FieldId::new(0),
192            col_type: ColumnType::Empty,
193            doc_count: 0,
194            null_bitset_start: 0,
195            body_start: 0,
196            keyword: KeywordDict::None,
197            ordinals_start: 0,
198            stats: None,
199        }
200    }
201
202    fn parse_keyword_dict(&mut self) {
203        let mut pos = self.body_start;
204        let num_entries = u32::from_le_bytes(self.data[pos..pos + 4].try_into().unwrap()) as usize;
205        pos += 4;
206
207        let mut dict = Vec::with_capacity(num_entries);
208        for _ in 0..num_entries {
209            let len = u16::from_le_bytes(self.data[pos..pos + 2].try_into().unwrap()) as usize;
210            pos += 2;
211            let s = std::str::from_utf8(&self.data[pos..pos + len]).unwrap();
212            pos += len;
213            dict.push(s);
214        }
215        self.ordinals_start = pos;
216        self.keyword = KeywordDict::Eager(dict);
217    }
218
    /// Field id decoded from the first two header bytes (little-endian `u16`);
    /// `FieldId::new(0)` when the input was too short to hold a header.
    pub fn field_id(&self) -> FieldId {
        self.field_id
    }
222
    /// Column encoding decoded from the header's type byte. Unknown type bytes
    /// and too-short inputs report `ColumnType::Empty`.
    pub(crate) fn col_type(&self) -> ColumnType {
        self.col_type
    }
226
    /// Number of documents this reader serves. Always 0 for empty, unknown-type
    /// or too-short columns, regardless of what the header bytes said.
    pub fn doc_count(&self) -> u32 {
        self.doc_count
    }
230
231    pub fn is_null(&self, doc_id: u32) -> bool {
232        if doc_id >= self.doc_count {
233            return true;
234        }
235        let byte_idx = self.null_bitset_start + (doc_id as usize / 8);
236        let bit_idx = doc_id as usize % 8;
237        (self.data[byte_idx] >> bit_idx) & 1 == 1
238    }
239
240    /// Get the raw dictionary ordinal for a keyword field (no string decode).
241    pub fn keyword_ordinal(&self, doc_id: u32) -> Option<u32> {
242        if doc_id >= self.doc_count || self.is_null(doc_id) {
243            return None;
244        }
245        let pos = self.ordinals_start + doc_id as usize * 4;
246        let ordinal = u32::from_le_bytes(self.data[pos..pos + 4].try_into().unwrap());
247        if ordinal == u32::MAX {
248            None
249        } else {
250            Some(ordinal)
251        }
252    }
253
254    /// Number of unique values in the keyword dictionary.
255    pub fn dict_size(&self) -> usize {
256        match &self.keyword {
257            KeywordDict::None => 0,
258            KeywordDict::Eager(dict) => dict.len(),
259            // Read num_entries DIRECTLY, never the lazy OnceCell: dict_size() is
260            // called at collector construction (bucket.rs) BEFORE any
261            // ensure_dict(), so reading the unbuilt OnceCell would return 0 and
262            // silently degrade the terms-agg Vec::with_capacity pre-size.
263            KeywordDict::Blocked { num_entries, .. } => *num_entries as usize,
264        }
265    }
266
267    /// Resolve an ordinal back to its string value. `None` means "no such
268    /// ordinal"; invalid UTF-8 (corruption only) panics rather than returning
269    /// `None`, so the two are never conflated ([[code-must-not-lie]]).
270    pub fn ordinal_to_string(&self, ordinal: u32) -> Option<&'a str> {
271        match &self.keyword {
272            KeywordDict::None => None,
273            KeywordDict::Eager(dict) => dict.get(ordinal as usize).copied(),
274            KeywordDict::Blocked {
275                dict_body_start,
276                block_addrs_start,
277                num_entries,
278                dict,
279            } => {
280                if ordinal >= *num_entries {
281                    return None;
282                }
283                // BULK fast path: if a bulk consumer materialized the eager Vec
284                // via ensure_dict(), serve O(1) from it. Random consumers never
285                // populate it and fall through to the block seek below.
286                if let Some(v) = dict.get() {
287                    return v.get(ordinal as usize).copied();
288                }
289                // RANDOM path: block seek + ≤DICT_BLOCK_MASK-step walk, no Vec.
290                let block = (ordinal >> DICT_BLOCK_SHIFT) as usize;
291                let off = block_addrs_start + block * 8;
292                let addr = u64::from_le_bytes(self.data[off..off + 8].try_into().unwrap()) as usize;
293                let mut pos = dict_body_start + addr;
294                for _ in 0..(ordinal & DICT_BLOCK_MASK) {
295                    let len =
296                        u16::from_le_bytes(self.data[pos..pos + 2].try_into().unwrap()) as usize;
297                    pos += 2 + len;
298                }
299                let len = u16::from_le_bytes(self.data[pos..pos + 2].try_into().unwrap()) as usize;
300                pos += 2;
301                Some(
302                    std::str::from_utf8(&self.data[pos..pos + len])
303                        .expect("keyword dict bytes are valid UTF-8 by construction"),
304                )
305            }
306        }
307    }
308
309    /// Bulk opt-in: materialize the eager `Vec` once into the `OnceCell` so
310    /// subsequent `ordinal_to_string`/`keyword_value` calls take the O(1)
311    /// populated branch. No-op for `Eager`/`None`; idempotent for `Blocked`
312    /// (the `OnceCell` guarantees the O(N) body walk runs at most once). Bulk
313    /// consumers (sort/collapse/terms-agg `finish`) call this before their
314    /// resolve loop so N lookups are O(1) instead of O(DICT_BLOCK_SIZE) seeks.
315    pub fn ensure_dict(&self) {
316        if let KeywordDict::Blocked {
317            dict_body_start,
318            num_entries,
319            dict,
320            ..
321        } = &self.keyword
322        {
323            dict.get_or_init(|| {
324                #[cfg(test)]
325                DICT_BUILDS.with(|c| c.set(c.get() + 1));
326                let mut v = Vec::with_capacity(*num_entries as usize);
327                let mut pos = *dict_body_start;
328                for _ in 0..*num_entries {
329                    let len =
330                        u16::from_le_bytes(self.data[pos..pos + 2].try_into().unwrap()) as usize;
331                    pos += 2;
332                    v.push(
333                        std::str::from_utf8(&self.data[pos..pos + len])
334                            .expect("keyword dict bytes are valid UTF-8 by construction"),
335                    );
336                    pos += len;
337                }
338                v
339            });
340        }
341    }
342
343    /// Resolve a doc's keyword value: ordinal lookup then string decode (both
344    /// O(1) on a populated/eager dict, block-seek otherwise). Routes through the
345    /// bounds-checked `ordinal_to_string`, so a (corruption-only) out-of-range
346    /// ordinal yields `None` instead of panicking — making it consistent with
347    /// `ordinal_to_string`; for all writer-emitted data the values are identical.
348    pub fn keyword_value(&self, doc_id: u32) -> Option<&'a str> {
349        self.keyword_ordinal(doc_id)
350            .and_then(|o| self.ordinal_to_string(o))
351    }
352
353    pub fn i64_value(&self, doc_id: u32) -> Option<i64> {
354        if doc_id >= self.doc_count || self.is_null(doc_id) {
355            return None;
356        }
357        if self.col_type == ColumnType::BitpackedI64 {
358            let min_val = i64::from_le_bytes(
359                self.data[self.body_start..self.body_start + 8]
360                    .try_into()
361                    .unwrap(),
362            );
363            let bit_width = self.data[self.body_start + 8];
364            let packed_start = self.body_start + 9;
365            return Some(unpack_i64(
366                &self.data[packed_start..],
367                doc_id as usize,
368                min_val,
369                bit_width,
370            ));
371        }
372        let pos = self.body_start + doc_id as usize * 8;
373        Some(i64::from_le_bytes(
374            self.data[pos..pos + 8].try_into().unwrap(),
375        ))
376    }
377
378    pub fn f64_value(&self, doc_id: u32) -> Option<f64> {
379        if doc_id >= self.doc_count || self.is_null(doc_id) {
380            return None;
381        }
382        let pos = self.body_start + doc_id as usize * 8;
383        Some(f64::from_le_bytes(
384            self.data[pos..pos + 8].try_into().unwrap(),
385        ))
386    }
387
388    pub fn bool_value(&self, doc_id: u32) -> Option<bool> {
389        if doc_id >= self.doc_count || self.is_null(doc_id) {
390            return None;
391        }
392        let byte_idx = self.body_start + (doc_id as usize / 8);
393        let bit_idx = doc_id as usize % 8;
394        Some((self.data[byte_idx] >> bit_idx) & 1 == 1)
395    }
396
397    /// Get the numeric value as f64, regardless of underlying type.
398    /// Converts i64 → f64. Returns None for non-numeric, null, or constant
399    /// columns. For constant columns, use `is_constant()` + `constant_value()`.
400    pub fn numeric_value(&self, doc_id: u32) -> Option<f64> {
401        match self.col_type {
402            ColumnType::I64 | ColumnType::BitpackedI64 => self.i64_value(doc_id).map(|v| v as f64),
403            ColumnType::F64 => self.f64_value(doc_id),
404            _ => None,
405        }
406    }
407
408    /// Returns true if this column uses constant encoding (all non-null
409    /// values are identical). Enables O(1) aggregation fast-paths.
410    pub fn is_constant(&self) -> bool {
411        matches!(
412            self.col_type,
413            ColumnType::ConstantI64 | ColumnType::ConstantF64
414        )
415    }
416
417    /// Get the constant value if this is a constant-encoded column.
418    pub fn constant_value(&self) -> Option<f64> {
419        match self.col_type {
420            ColumnType::ConstantI64 => Some(i64::from_le_bytes(
421                self.data[self.body_start..self.body_start + 8]
422                    .try_into()
423                    .unwrap(),
424            ) as f64),
425            ColumnType::ConstantF64 => Some(f64::from_le_bytes(
426                self.data[self.body_start..self.body_start + 8]
427                    .try_into()
428                    .unwrap(),
429            )),
430            _ => None,
431        }
432    }
433
    /// Get precomputed column statistics (min, max, null_count) for numeric
    /// columns. Returns None for non-numeric types, for empty columns, and when
    /// fewer than 20 bytes followed the null bitset at open. Enables O(1)
    /// aggregation pushdown for min/max/count on unfiltered queries.
    pub fn stats(&self) -> Option<&ColumnStats> {
        self.stats.as_ref()
    }
440}
441
/// Reads the columnar component of a segment — multiple columns.
///
/// `open` walks the serialized columns once and records each one's byte extent;
/// `column` then opens a `ColumnReader` over the matching extent on demand.
pub struct ColumnarReader<'a> {
    /// The whole serialized columnar component.
    data: &'a [u8],
    /// One entry per column found by `open`, in file order.
    columns: Vec<(FieldId, usize, usize)>, // (field_id, start_offset, end_offset)
}
447
448impl<'a> ColumnarReader<'a> {
449    pub fn open(data: &'a [u8]) -> Self {
450        if data.len() < 2 {
451            return Self {
452                data,
453                columns: Vec::new(),
454            };
455        }
456
457        let num_columns = u16::from_le_bytes([data[0], data[1]]) as usize;
458        let mut pos = 2usize;
459        let mut columns = Vec::with_capacity(num_columns);
460
461        for _ in 0..num_columns {
462            let start = pos;
463            // Parse enough to find the end of this column
464            if pos + 7 > data.len() {
465                break;
466            }
467            let field_id = FieldId::new(u16::from_le_bytes([data[pos], data[pos + 1]]));
468            let col_type = data[pos + 2];
469            let doc_count = u32::from_le_bytes(data[pos + 3..pos + 7].try_into().unwrap()) as usize;
470            pos += 7;
471
472            if doc_count == 0 || col_type == 0 {
473                columns.push((field_id, start, pos));
474                continue;
475            }
476
477            // Skip null bitset
478            let null_bytes = (doc_count + 7) / 8;
479            pos += null_bytes;
480
481            // Skip numeric stats (null_count + min + max = 20 bytes)
482            let is_numeric = matches!(col_type, 2 | 3 | 5 | 6 | 7);
483            if is_numeric {
484                pos += 20;
485            }
486
487            // Skip body based on type
488            match col_type {
489                1 => {
490                    // Keyword: dictionary + ordinals
491                    let num_entries =
492                        u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
493                    pos += 4;
494                    for _ in 0..num_entries {
495                        let len =
496                            u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
497                        pos += 2 + len;
498                    }
499                    pos += doc_count * 4; // ordinals
500                }
501                8 => {
502                    // KeywordBlocked: O(1) skip past dict_count, body_len, the
503                    // dictionary body, the block-address array, and ordinals.
504                    let num_entries =
505                        u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
506                    pos += 4;
507                    let body_len =
508                        u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as usize;
509                    pos += 8;
510                    pos += body_len; // dict body
511                    pos += num_entries.div_ceil(DICT_BLOCK_SIZE) * 8; // block-address array
512                    pos += doc_count * 4; // ordinals
513                }
514                2 | 3 => {
515                    // i64 or f64: 8 bytes per doc
516                    pos += doc_count * 8;
517                }
518                4 => {
519                    // Bool: bitset
520                    pos += (doc_count + 7) / 8;
521                }
522                5 | 6 => {
523                    // ConstantI64 or ConstantF64: single 8-byte value
524                    pos += 8;
525                }
526                7 => {
527                    // BitpackedI64: min(8) + bit_width(1) + packed data
528                    let bit_width = data[pos + 8] as usize;
529                    pos += 9; // min + bit_width
530                    pos += (doc_count * bit_width + 7) / 8; // packed residuals
531                }
532                _ => {}
533            }
534
535            columns.push((field_id, start, pos));
536        }
537
538        Self { data, columns }
539    }
540
541    /// Get a column reader for a specific field.
542    pub fn column(&self, field_id: FieldId) -> Option<ColumnReader<'a>> {
543        for &(fid, start, end) in &self.columns {
544            if fid == field_id {
545                return Some(ColumnReader::open(&self.data[start..end]));
546            }
547        }
548        None
549    }
550}
551
552#[cfg(test)]
553mod tests {
554    use super::{ColumnReader, ColumnarReader, DICT_BUILDS, KeywordDict};
555    use crate::columnar::writer::{
556        ColumnType, ColumnValue, ColumnWriter, ColumnarWriter, DICT_BLOCK_SIZE,
557    };
558    use crate::core::FieldId;
559    use std::collections::HashMap;
560
561    /// Emit the pre-v3 `Keyword = 1` layout (no offset index) to exercise the
562    /// `Eager` back-compat read path. No production code emits this anymore, so
563    /// this helper is the only fixture for the legacy path. Layout:
564    /// `[field_id][type=1][doc_count][null_bitset][dict_count][(len:u16)(bytes)×N][ordinals:u32×doc_count]`.
565    fn write_legacy_keyword_column(field_id: u16, values: &[&str]) -> Vec<u8> {
566        let mut dict: Vec<String> = values.iter().map(|s| s.to_string()).collect();
567        dict.sort();
568        dict.dedup();
569        let mut ord_of: HashMap<String, u32> = HashMap::new();
570        for (i, t) in dict.iter().enumerate() {
571            ord_of.insert(t.clone(), i as u32);
572        }
573        let mut buf = Vec::new();
574        buf.extend_from_slice(&field_id.to_le_bytes());
575        buf.push(1u8); // ColumnType::Keyword
576        buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
577        let null_bytes = values.len().div_ceil(8);
578        buf.resize(buf.len() + null_bytes, 0u8); // no nulls
579        buf.extend_from_slice(&(dict.len() as u32).to_le_bytes());
580        for t in &dict {
581            buf.extend_from_slice(&(t.len() as u16).to_le_bytes());
582            buf.extend_from_slice(t.as_bytes());
583        }
584        for v in values {
585            buf.extend_from_slice(&ord_of[*v].to_le_bytes());
586        }
587        buf
588    }
589
590    /// Test 1: read every ordinal of a >3-block dictionary back via the block
591    /// seek + intra-block walk, at every inner offset and across boundaries.
592    #[test]
593    fn keyword_offset_index_roundtrip() {
594        // Zero-padded so sorted dict order == numeric order: ordinal i == values[i].
595        let values: Vec<String> = (0..200).map(|i| format!("key_{i:04}")).collect();
596        let mut w = ColumnWriter::new(FieldId::new(0));
597        for v in &values {
598            w.add(ColumnValue::Keyword(v.clone()));
599        }
600        let data = w.finish();
601        let r = ColumnReader::open(&data);
602        assert_eq!(r.dict_size(), 200);
603        for (i, v) in values.iter().enumerate() {
604            assert_eq!(
605                r.ordinal_to_string(i as u32),
606                Some(v.as_str()),
607                "ordinal {i} mismatch"
608            );
609        }
610        assert_eq!(r.ordinal_to_string(200), None);
611    }
612
613    /// Test 2: per-doc keyword_value matches the source over 1000 docs with nulls.
614    #[test]
615    fn keyword_value_matches_doc() {
616        let mut w = ColumnWriter::new(FieldId::new(0));
617        let mut expected: Vec<Option<String>> = Vec::new();
618        for doc in 0..1000u32 {
619            if doc % 7 == 3 {
620                w.add(ColumnValue::Null);
621                expected.push(None);
622            } else {
623                let v = format!("val_{:03}", (doc.wrapping_mul(31).wrapping_add(17)) % 80);
624                w.add(ColumnValue::Keyword(v.clone()));
625                expected.push(Some(v));
626            }
627        }
628        let data = w.finish();
629        let r = ColumnReader::open(&data);
630        for (doc, exp) in expected.iter().enumerate() {
631            assert_eq!(
632                r.keyword_value(doc as u32),
633                exp.as_deref(),
634                "doc {doc} mismatch"
635            );
636        }
637    }
638
639    /// Test 4: single-entry dict (one block of size 1).
640    #[test]
641    fn keyword_single_entry() {
642        let mut w = ColumnWriter::new(FieldId::new(0));
643        w.add(ColumnValue::Keyword("only".into()));
644        let data = w.finish();
645        let r = ColumnReader::open(&data);
646        assert_eq!(r.dict_size(), 1);
647        assert_eq!(r.ordinal_to_string(0), Some("only"));
648        assert_eq!(r.ordinal_to_string(1), None);
649        assert_eq!(r.keyword_value(0), Some("only"));
650    }
651
652    /// Test 5: a zero-doc column reaches `KeywordDict::None`; and the extent
653    /// walk skips a blocked column correctly (the `8 =>` arm) to reach a later
654    /// column.
655    #[test]
656    fn keyword_none_column() {
657        let w = ColumnWriter::new(FieldId::new(0));
658        let data = w.finish(); // no values → write_empty → doc_count 0
659        let r = ColumnReader::open(&data);
660        assert!(matches!(r.keyword, KeywordDict::None));
661        assert_eq!(r.dict_size(), 0);
662        assert_eq!(r.ordinal_to_string(0), None);
663        assert_eq!(r.keyword_value(0), None);
664
665        // Extent walk must skip a blocked column (the `8 =>` arm) to land
666        // exactly on the next column.
667        let mut cw = ColumnarWriter::new();
668        cw.add(FieldId::new(0), ColumnValue::Keyword("alpha".into()));
669        cw.add(FieldId::new(1), ColumnValue::Keyword("beta".into()));
670        cw.pad_to(1);
671        let cdata = cw.finish();
672        let cr = ColumnarReader::open(&cdata);
673        assert_eq!(
674            cr.column(FieldId::new(0)).unwrap().keyword_value(0),
675            Some("alpha")
676        );
677        assert_eq!(
678            cr.column(FieldId::new(1)).unwrap().keyword_value(0),
679            Some("beta")
680        );
681    }
682
683    /// Test 6: a hand-emitted legacy `Keyword = 1` column reads via the `Eager`
684    /// back-compat path.
685    #[test]
686    fn legacy_keyword_column_still_reads() {
687        let data = write_legacy_keyword_column(0, &["cherry", "apple", "banana", "apple"]);
688        let r = ColumnReader::open(&data);
689        assert_eq!(r.col_type(), ColumnType::Keyword);
690        assert!(matches!(r.keyword, KeywordDict::Eager(_)));
691        assert_eq!(r.dict_size(), 3); // apple, banana, cherry (sorted, deduped)
692        assert_eq!(r.keyword_value(0), Some("cherry"));
693        assert_eq!(r.keyword_value(1), Some("apple"));
694        assert_eq!(r.keyword_value(2), Some("banana"));
695        assert_eq!(r.keyword_value(3), Some("apple"));
696        assert_eq!(r.ordinal_to_string(0), Some("apple"));
697        assert_eq!(r.ordinal_to_string(2), Some("cherry"));
698    }
699
700    /// Test 8: exhaustive boundaries at N ∈ {K, K+1, 2K} catch off-by-one in
701    /// `ord >> SHIFT` / `& MASK` at exact-multiple and partial-block edges.
702    #[test]
703    fn block_boundary_exhaustive() {
704        let k = DICT_BLOCK_SIZE;
705        for &n in &[k, k + 1, 2 * k] {
706            let values: Vec<String> = (0..n).map(|i| format!("k{i:05}")).collect();
707            let mut w = ColumnWriter::new(FieldId::new(0));
708            for v in &values {
709                w.add(ColumnValue::Keyword(v.clone()));
710            }
711            let data = w.finish();
712            let r = ColumnReader::open(&data);
713            assert_eq!(r.dict_size(), n, "N={n} dict_size");
714            // Every ordinal round-trips, including the block boundaries.
715            for (i, v) in values.iter().enumerate() {
716                assert_eq!(
717                    r.ordinal_to_string(i as u32),
718                    Some(v.as_str()),
719                    "N={n} ordinal {i}"
720                );
721            }
722            assert_eq!(r.ordinal_to_string(n as u32), None, "N={n} out-of-range");
723        }
724    }
725
726    /// Test 10: the lazy dictionary builds at most once (the Option-B
727    /// load-bearing invariant), and `dict_size()` is correct before any build.
728    #[test]
729    fn ensure_dict_builds_once() {
730        let values: Vec<String> = (0..200).map(|i| format!("v{i:04}")).collect();
731        let mut w = ColumnWriter::new(FieldId::new(0));
732        for v in &values {
733            w.add(ColumnValue::Keyword(v.clone()));
734        }
735        let data = w.finish();
736        let r = ColumnReader::open(&data);
737
738        // (c) dict_size() reads num_entries directly — correct BEFORE any build.
739        assert_eq!(r.dict_size(), 200);
740
741        // (a) idempotent: 1000 ensure_dict() calls → exactly one build.
742        DICT_BUILDS.with(|c| c.set(0));
743        for _ in 0..1000 {
744            r.ensure_dict();
745        }
746        assert_eq!(DICT_BUILDS.with(|c| c.get()), 1);
747
748        // (b) a 5000-lookup bulk pass serves from the populated Vec, no rebuild.
749        for i in 0..5000u32 {
750            let ord = i % 200;
751            assert_eq!(
752                r.ordinal_to_string(ord),
753                Some(values[ord as usize].as_str())
754            );
755        }
756        assert_eq!(DICT_BUILDS.with(|c| c.get()), 1);
757    }
758
759    /// Test 12: reading a corrupt (invalid-UTF-8) dict entry fails LOUD — never
760    /// a silent `None` (which is reserved for "no such ordinal").
761    #[test]
762    #[should_panic(expected = "valid UTF-8")]
763    fn blocked_corrupt_entry_fails_loud() {
764        let mut w = ColumnWriter::new(FieldId::new(0));
765        for i in 0..10 {
766            w.add(ColumnValue::Keyword(format!("value{i}")));
767        }
768        let mut data = w.finish();
769        // Flip the first 'v' (in "value0", the body's first entry) to an
770        // invalid UTF-8 start byte. Length prefixes precede it and are not 'v'.
771        let pos = data
772            .iter()
773            .position(|&b| b == b'v')
774            .expect("dict body contains 'v'");
775        data[pos] = 0xFF;
776        let r = ColumnReader::open(&data);
777        for ord in 0..r.dict_size() as u32 {
778            let _ = r.ordinal_to_string(ord);
779        }
780    }
781}