Skip to main content

hermes_core/structures/fast_field/
mod.rs

1//! Fast field columnar storage for efficient filtering and sorting.
2//!
3//! Stores one column per fast-field, indexed by doc_id for O(1) access.
4//! Supports u64, i64, f64, and text (dictionary-encoded ordinal) columns.
5//! Both single-valued and multi-valued columns are supported.
6//!
7//! ## File format (`.fast` — version FST2)
8//!
9//! ```text
10//! [column 0 blocked data] [column 1 blocked data] ... [column N blocked data]
11//! [TOC: FastFieldTocEntry × num_columns]
12//! [footer: toc_offset(8) + num_columns(4) + magic(4)]  = 16 bytes
13//! ```
14//!
15//! ## Blocked column format
16//!
17//! Each column's data region is a sequence of independently-decodable blocks:
18//!
19//! ```text
20//! [num_blocks: u32]
21//! [block_index: BlockIndexEntry × num_blocks]   (16 bytes each)
22//! [block_0 data] [block_0 dict?] [block_1 data] [block_1 dict?] ...
23//! ```
24//!
25//! `BlockIndexEntry`: num_docs(4) + data_len(4) + dict_count(4) + dict_len(4)
26//!
27//! Fresh segments produce a single block. Merges stack blocks from source
28//! segments via raw byte copy (memcpy) — no per-value decode/re-encode.
29//!
30//! ## Codecs (auto-selected per block at build time)
31//!
32//! | ID | Codec           | Description                               |
33//! |----|-----------------|-------------------------------------------|
34//! |  0 | Constant        | All values identical — 0 data bytes       |
35//! |  1 | Bitpacked       | min-subtract + global bitpack             |
36//! |  2 | Linear          | Regression line + bitpacked residuals     |
37//! |  3 | BlockwiseLinear | Per-512-block linear + residuals          |
38
39pub mod codec;
40
41use std::collections::BTreeMap;
42use std::io::{self, Read, Write};
43use std::sync::OnceLock;
44
45use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
46
47// ── Constants ─────────────────────────────────────────────────────────────
48
49/// Magic number for `.fast` file footer — FST2 (auto-codec + multi-value)
50pub const FAST_FIELD_MAGIC: u32 = 0x32545346;
51
52/// Footer size: toc_offset(8) + num_columns(4) + magic(4) = 16
53pub const FAST_FIELD_FOOTER_SIZE: u64 = 16;
54
55/// Sentinel for missing / absent values in any fast-field column type.
56///
57/// - **Text**: document has no value → ordinal stored as `u64::MAX`
58/// - **Numeric (u64/i64/f64)**: document has no value → raw stored as `u64::MAX`
59///
60/// Callers should check `raw != FAST_FIELD_MISSING` before interpreting
61/// the value as a real number or ordinal.
62pub const FAST_FIELD_MISSING: u64 = u64::MAX;
63
64// ── Column type ───────────────────────────────────────────────────────────
65
66/// Type of a fast-field column (stored in TOC).
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
68#[repr(u8)]
69pub enum FastFieldColumnType {
70    U64 = 0,
71    I64 = 1,
72    F64 = 2,
73    TextOrdinal = 3,
74}
75
76impl FastFieldColumnType {
77    pub fn from_u8(v: u8) -> Option<Self> {
78        match v {
79            0 => Some(Self::U64),
80            1 => Some(Self::I64),
81            2 => Some(Self::F64),
82            3 => Some(Self::TextOrdinal),
83            _ => None,
84        }
85    }
86}
87
88// ── Encoding helpers ──────────────────────────────────────────────────────
89
90/// Zigzag-encode an i64 to u64 (small absolute values → small u64).
91#[inline]
92pub fn zigzag_encode(v: i64) -> u64 {
93    ((v << 1) ^ (v >> 63)) as u64
94}
95
96/// Zigzag-decode a u64 back to i64.
97#[inline]
98pub fn zigzag_decode(v: u64) -> i64 {
99    ((v >> 1) as i64) ^ -((v & 1) as i64)
100}
101
102/// Encode f64 to u64 preserving total order.
103/// Positive floats: flip sign bit (so they sort above negatives).
104/// Negative floats: flip all bits (so they sort in reverse magnitude).
105#[inline]
106pub fn f64_to_sortable_u64(f: f64) -> u64 {
107    let bits = f.to_bits();
108    if (bits >> 63) == 0 {
109        bits ^ (1u64 << 63) // positive: flip sign bit
110    } else {
111        !bits // negative: flip all bits
112    }
113}
114
115/// Decode sortable u64 back to f64.
116#[inline]
117pub fn sortable_u64_to_f64(v: u64) -> f64 {
118    let bits = if (v >> 63) != 0 {
119        v ^ (1u64 << 63) // was positive: unflip sign bit
120    } else {
121        !v // was negative: unflip all bits
122    };
123    f64::from_bits(bits)
124}
125
126/// Minimum number of bits needed to represent `val`.
127#[inline]
128pub fn bits_needed_u64(val: u64) -> u8 {
129    if val == 0 {
130        0
131    } else {
132        64 - val.leading_zeros() as u8
133    }
134}
135
136// ── Bit-packing ───────────────────────────────────────────────────────────
137
138/// Pack `values` at `bits_per_value` bits each into `out`.
139/// `out` must be large enough: `ceil(values.len() * bits_per_value / 8)` bytes.
140pub fn bitpack_write(values: &[u64], bits_per_value: u8, out: &mut Vec<u8>) {
141    if bits_per_value == 0 {
142        return; // all values are the same (constant column)
143    }
144    let bpv = bits_per_value as usize;
145    let total_bits = values.len() * bpv;
146    let total_bytes = total_bits.div_ceil(8);
147    out.reserve(total_bytes);
148
149    let start = out.len();
150    out.resize(start + total_bytes, 0);
151    let buf = &mut out[start..];
152
153    for (i, &val) in values.iter().enumerate() {
154        let bit_offset = i * bpv;
155        let byte_offset = bit_offset / 8;
156        let bit_shift = bit_offset % 8;
157
158        // Write across byte boundaries (up to 9 bytes for 64-bit values)
159        let mut remaining_bits = bpv;
160        let mut v = val;
161        let mut bo = byte_offset;
162        let mut bs = bit_shift;
163
164        while remaining_bits > 0 {
165            let can_write = (8 - bs).min(remaining_bits);
166            let mask = (1u64 << can_write) - 1;
167            buf[bo] |= ((v & mask) << bs) as u8;
168            v >>= can_write;
169            remaining_bits -= can_write;
170            bo += 1;
171            bs = 0;
172        }
173    }
174}
175
176/// Read value at `index` from bit-packed data.
177///
178/// Fast path: reads a single unaligned u64 (LE) covering the target bits,
179/// shifts and masks. This compiles to ~4 instructions on x86/ARM and avoids
180/// the per-byte loop entirely for bpv ≤ 56.
181#[inline]
182pub fn bitpack_read(data: &[u8], bits_per_value: u8, index: usize) -> u64 {
183    if bits_per_value == 0 {
184        return 0;
185    }
186    let bpv = bits_per_value as usize;
187    let bit_offset = index * bpv;
188    let byte_offset = bit_offset / 8;
189    let bit_shift = bit_offset % 8;
190
191    // Fast path: single unaligned LE u64 load, shift, and mask.
192    // Valid when all needed bits fit within 8 bytes: bit_shift + bpv ≤ 64.
193    if bit_shift + bpv <= 64 && byte_offset + 8 <= data.len() {
194        let raw = u64::from_le_bytes(data[byte_offset..byte_offset + 8].try_into().unwrap());
195        let mask = if bpv >= 64 {
196            u64::MAX
197        } else {
198            (1u64 << bpv) - 1
199        };
200        return (raw >> bit_shift) & mask;
201    }
202
203    // Slow path for the last few values near the end of the buffer
204    let mut result: u64 = 0;
205    let mut remaining_bits = bpv;
206    let mut bo = byte_offset;
207    let mut bs = bit_shift;
208    let mut out_shift = 0;
209
210    while remaining_bits > 0 {
211        let can_read = (8 - bs).min(remaining_bits);
212        let mask = ((1u64 << can_read) - 1) as u8;
213        let byte_val = if bo < data.len() { data[bo] } else { 0 };
214        result |= (((byte_val >> bs) & mask) as u64) << out_shift;
215        remaining_bits -= can_read;
216        out_shift += can_read;
217        bo += 1;
218        bs = 0;
219    }
220
221    result
222}
223
224// ── TOC entry ─────────────────────────────────────────────────────────────
225
226/// On-disk TOC entry for a fast-field column (FST2 format).
227///
228/// Wire: field_id(4) + column_type(1) + flags(1) + data_offset(8) + data_len(8) +
229///       num_docs(4) + dict_offset(8) + dict_count(4) = 38 bytes
230///
231/// The `flags` byte encodes:
232///   bit 0: multi-valued column (offset+value sub-columns)
233///
234/// For multi-valued columns, the data region contains:
235///   [offset column (auto-codec)] [value column (auto-codec)]
236///   with a 4-byte length prefix for the offset column so the reader knows where
237///   the value column starts.
238#[derive(Debug, Clone)]
239pub struct FastFieldTocEntry {
240    pub field_id: u32,
241    pub column_type: FastFieldColumnType,
242    pub multi: bool,
243    pub data_offset: u64,
244    pub data_len: u64,
245    pub num_docs: u32,
246    /// Byte offset of the text dictionary section (0 for numeric columns).
247    pub dict_offset: u64,
248    /// Number of entries in the text dictionary (0 for numeric columns).
249    pub dict_count: u32,
250}
251
252/// FST2 TOC entry size: field_id(4)+column_type(1)+flags(1)+data_offset(8)+data_len(8)+num_docs(4)+dict_offset(8)+dict_count(4) = 38
253pub const FAST_FIELD_TOC_ENTRY_SIZE: usize = 4 + 1 + 1 + 8 + 8 + 4 + 8 + 4; // 38
254
255// ── Block index entry ─────────────────────────────────────────────────────
256
257/// On-disk index entry for one block within a blocked column.
258///
259/// Wire: num_docs(4) + data_len(4) + dict_count(4) + dict_len(4) = 16 bytes
260#[derive(Debug, Clone)]
261pub struct BlockIndexEntry {
262    pub num_docs: u32,
263    pub data_len: u32,
264    pub dict_count: u32,
265    pub dict_len: u32,
266}
267
268pub const BLOCK_INDEX_ENTRY_SIZE: usize = 16;
269
270impl BlockIndexEntry {
271    pub fn write_to(&self, w: &mut dyn Write) -> io::Result<()> {
272        w.write_u32::<LittleEndian>(self.num_docs)?;
273        w.write_u32::<LittleEndian>(self.data_len)?;
274        w.write_u32::<LittleEndian>(self.dict_count)?;
275        w.write_u32::<LittleEndian>(self.dict_len)?;
276        Ok(())
277    }
278
279    pub fn read_from(r: &mut dyn Read) -> io::Result<Self> {
280        let num_docs = r.read_u32::<LittleEndian>()?;
281        let data_len = r.read_u32::<LittleEndian>()?;
282        let dict_count = r.read_u32::<LittleEndian>()?;
283        let dict_len = r.read_u32::<LittleEndian>()?;
284        Ok(Self {
285            num_docs,
286            data_len,
287            dict_count,
288            dict_len,
289        })
290    }
291}
292
293impl FastFieldTocEntry {
294    pub fn write_to(&self, w: &mut dyn Write) -> io::Result<()> {
295        w.write_u32::<LittleEndian>(self.field_id)?;
296        w.write_u8(self.column_type as u8)?;
297        let flags: u8 = if self.multi { 1 } else { 0 };
298        w.write_u8(flags)?;
299        w.write_u64::<LittleEndian>(self.data_offset)?;
300        w.write_u64::<LittleEndian>(self.data_len)?;
301        w.write_u32::<LittleEndian>(self.num_docs)?;
302        w.write_u64::<LittleEndian>(self.dict_offset)?;
303        w.write_u32::<LittleEndian>(self.dict_count)?;
304        Ok(())
305    }
306
307    pub fn read_from(r: &mut dyn Read) -> io::Result<Self> {
308        let field_id = r.read_u32::<LittleEndian>()?;
309        let ct = r.read_u8()?;
310        let column_type = FastFieldColumnType::from_u8(ct)
311            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "bad column type"))?;
312        let flags = r.read_u8()?;
313        let multi = (flags & 1) != 0;
314        let data_offset = r.read_u64::<LittleEndian>()?;
315        let data_len = r.read_u64::<LittleEndian>()?;
316        let num_docs = r.read_u32::<LittleEndian>()?;
317        let dict_offset = r.read_u64::<LittleEndian>()?;
318        let dict_count = r.read_u32::<LittleEndian>()?;
319        Ok(Self {
320            field_id,
321            column_type,
322            multi,
323            data_offset,
324            data_len,
325            num_docs,
326            dict_offset,
327            dict_count,
328        })
329    }
330}
331
332// ── Writer ────────────────────────────────────────────────────────────────
333
334/// Collects values during indexing and serializes a single fast-field column.
335///
336/// Supports both single-valued and multi-valued columns.
337/// For multi-valued columns, values are stored in a flat array with an
338/// offset column that maps doc_id → value range.
339pub struct FastFieldWriter {
340    pub column_type: FastFieldColumnType,
341    /// Whether this is a multi-valued column.
342    pub multi: bool,
343
344    // ── Single-valued state ──
345    /// Raw u64 values indexed by local doc_id (single-value mode).
346    values: Vec<u64>,
347
348    // ── Multi-valued state ──
349    /// Flat list of all values (multi-value mode).
350    multi_values: Vec<u64>,
351    /// Per-doc cumulative offset into `multi_values`. Length = num_docs + 1.
352    /// offsets[doc_id]..offsets[doc_id+1] is the value range for doc_id.
353    multi_offsets: Vec<u32>,
354    /// Current doc_id being filled (for multi-value sequential writes).
355    multi_current_doc: u32,
356
357    // ── Text state (shared) ──
358    /// For TextOrdinal: maps original string → insertion order.
359    text_values: Option<BTreeMap<String, u32>>,
360    /// For TextOrdinal single-value: per-doc string values (parallel to `values`).
361    text_per_doc: Option<Vec<Option<String>>>,
362    /// For TextOrdinal multi-value: per-value strings (parallel to `multi_values`).
363    text_multi_values: Option<Vec<String>>,
364}
365
366impl FastFieldWriter {
367    /// Create a writer for a single-valued numeric column (u64/i64/f64).
368    pub fn new_numeric(column_type: FastFieldColumnType) -> Self {
369        debug_assert!(matches!(
370            column_type,
371            FastFieldColumnType::U64 | FastFieldColumnType::I64 | FastFieldColumnType::F64
372        ));
373        Self {
374            column_type,
375            multi: false,
376            values: Vec::new(),
377            multi_values: Vec::new(),
378            multi_offsets: vec![0],
379            multi_current_doc: 0,
380            text_values: None,
381            text_per_doc: None,
382            text_multi_values: None,
383        }
384    }
385
386    /// Create a writer for a multi-valued numeric column.
387    pub fn new_numeric_multi(column_type: FastFieldColumnType) -> Self {
388        debug_assert!(matches!(
389            column_type,
390            FastFieldColumnType::U64 | FastFieldColumnType::I64 | FastFieldColumnType::F64
391        ));
392        Self {
393            column_type,
394            multi: true,
395            values: Vec::new(),
396            multi_values: Vec::new(),
397            multi_offsets: vec![0],
398            multi_current_doc: 0,
399            text_values: None,
400            text_per_doc: None,
401            text_multi_values: None,
402        }
403    }
404
405    /// Create a writer for a single-valued text ordinal column.
406    pub fn new_text() -> Self {
407        Self {
408            column_type: FastFieldColumnType::TextOrdinal,
409            multi: false,
410            values: Vec::new(),
411            multi_values: Vec::new(),
412            multi_offsets: vec![0],
413            multi_current_doc: 0,
414            text_values: Some(BTreeMap::new()),
415            text_per_doc: Some(Vec::new()),
416            text_multi_values: None,
417        }
418    }
419
420    /// Create a writer for a multi-valued text ordinal column.
421    pub fn new_text_multi() -> Self {
422        Self {
423            column_type: FastFieldColumnType::TextOrdinal,
424            multi: true,
425            values: Vec::new(),
426            multi_values: Vec::new(),
427            multi_offsets: vec![0],
428            multi_current_doc: 0,
429            text_values: Some(BTreeMap::new()),
430            text_per_doc: None,
431            text_multi_values: Some(Vec::new()),
432        }
433    }
434
435    /// Record a numeric value for `doc_id`. Fills gaps with 0.
436    /// For single-value mode only.
437    pub fn add_u64(&mut self, doc_id: u32, value: u64) {
438        if self.multi {
439            self.add_multi_u64(doc_id, value);
440            return;
441        }
442        let idx = doc_id as usize;
443        if idx >= self.values.len() {
444            self.values.resize(idx + 1, FAST_FIELD_MISSING);
445            if let Some(ref mut tpd) = self.text_per_doc {
446                tpd.resize(idx + 1, None);
447            }
448        }
449        self.values[idx] = value;
450    }
451
452    /// Record a value in multi-value mode.
453    fn add_multi_u64(&mut self, doc_id: u32, value: u64) {
454        // Pad offsets for any skipped doc_ids
455        while self.multi_current_doc < doc_id {
456            self.multi_current_doc += 1;
457            self.multi_offsets.push(self.multi_values.len() as u32);
458        }
459        // Ensure offset exists for current doc
460        if self.multi_current_doc == doc_id && self.multi_offsets.len() == doc_id as usize + 1 {
461            // offset for doc_id already exists as the last entry
462        }
463        self.multi_values.push(value);
464    }
465
466    /// Record an i64 value (zigzag-encoded).
467    pub fn add_i64(&mut self, doc_id: u32, value: i64) {
468        self.add_u64(doc_id, zigzag_encode(value));
469    }
470
471    /// Record an f64 value (sortable-encoded).
472    pub fn add_f64(&mut self, doc_id: u32, value: f64) {
473        self.add_u64(doc_id, f64_to_sortable_u64(value));
474    }
475
476    /// Record a text value (dictionary-encoded at build time).
477    pub fn add_text(&mut self, doc_id: u32, value: &str) {
478        if let Some(ref mut dict) = self.text_values {
479            let next_id = dict.len() as u32;
480            dict.entry(value.to_string()).or_insert(next_id);
481        }
482
483        if self.multi {
484            if let Some(ref mut tmv) = self.text_multi_values {
485                // Pad offsets for skipped docs
486                while self.multi_current_doc < doc_id {
487                    self.multi_current_doc += 1;
488                    self.multi_offsets.push(self.multi_values.len() as u32);
489                }
490                if self.multi_current_doc == doc_id
491                    && self.multi_offsets.len() == doc_id as usize + 1
492                {
493                    // offset already exists
494                }
495                self.multi_values.push(0); // placeholder, resolved later
496                tmv.push(value.to_string());
497            }
498        } else {
499            let idx = doc_id as usize;
500            if idx >= self.values.len() {
501                self.values.resize(idx + 1, FAST_FIELD_MISSING);
502            }
503            if let Some(ref mut tpd) = self.text_per_doc {
504                if idx >= tpd.len() {
505                    tpd.resize(idx + 1, None);
506                }
507                tpd[idx] = Some(value.to_string());
508            }
509        }
510    }
511
512    /// Ensure the column covers `num_docs` entries.
513    ///
514    /// Absent entries are filled with [`FAST_FIELD_MISSING`] for single-value
515    /// columns, or with empty offset ranges for multi-value columns.
516    pub fn pad_to(&mut self, num_docs: u32) {
517        let n = num_docs as usize;
518        if self.multi {
519            while (self.multi_offsets.len() as u32) <= num_docs {
520                self.multi_offsets.push(self.multi_values.len() as u32);
521            }
522            self.multi_current_doc = num_docs;
523        } else {
524            if self.values.len() < n {
525                self.values.resize(n, FAST_FIELD_MISSING);
526                if let Some(ref mut tpd) = self.text_per_doc {
527                    tpd.resize(n, None);
528                }
529            }
530        }
531    }
532
533    /// Number of documents in this column.
534    pub fn num_docs(&self) -> u32 {
535        if self.multi {
536            // offsets has num_docs+1 entries
537            (self.multi_offsets.len() as u32).saturating_sub(1)
538        } else {
539            self.values.len() as u32
540        }
541    }
542
543    /// Serialize column data using blocked format with auto-selecting codec.
544    ///
545    /// Writes a single block: [num_blocks(4)] [BlockIndexEntry] [block_data] [block_dict?]
546    /// Returns `(toc_entry, total_bytes_written)`.
547    pub fn serialize(
548        &mut self,
549        writer: &mut dyn Write,
550        data_offset: u64,
551    ) -> io::Result<(FastFieldTocEntry, u64)> {
552        // For text ordinal: resolve strings to sorted ordinals
553        if self.column_type == FastFieldColumnType::TextOrdinal {
554            self.resolve_text_ordinals();
555        }
556
557        let num_docs = self.num_docs();
558
559        // Serialize block data into a temp buffer to measure lengths
560        let mut block_data = Vec::new();
561        if self.multi {
562            // Multi-value: write [offset_col_len(4)] [offset_col] [value_col]
563            let offsets_u64: Vec<u64> = self.multi_offsets.iter().map(|&v| v as u64).collect();
564            let mut offset_buf = Vec::new();
565            codec::serialize_auto(&offsets_u64, &mut offset_buf)?;
566
567            block_data.write_u32::<LittleEndian>(offset_buf.len() as u32)?;
568            block_data.write_all(&offset_buf)?;
569
570            codec::serialize_auto(&self.multi_values, &mut block_data)?;
571        } else {
572            codec::serialize_auto(&self.values, &mut block_data)?;
573        }
574
575        // Serialize text dictionary into temp buffer
576        let mut dict_buf = Vec::new();
577        let dict_count = if self.column_type == FastFieldColumnType::TextOrdinal {
578            let (count, _) = self.write_text_dictionary(&mut dict_buf)?;
579            count
580        } else {
581            0u32
582        };
583
584        // Build block index entry
585        let block_entry = BlockIndexEntry {
586            num_docs,
587            data_len: block_data.len() as u32,
588            dict_count,
589            dict_len: dict_buf.len() as u32,
590        };
591
592        // Write: num_blocks + block_index + block_data + block_dict
593        let mut total_bytes = 0u64;
594
595        writer.write_u32::<LittleEndian>(1u32)?; // num_blocks
596        total_bytes += 4;
597
598        block_entry.write_to(writer)?;
599        total_bytes += BLOCK_INDEX_ENTRY_SIZE as u64;
600
601        writer.write_all(&block_data)?;
602        total_bytes += block_data.len() as u64;
603
604        writer.write_all(&dict_buf)?;
605        total_bytes += dict_buf.len() as u64;
606
607        let toc = FastFieldTocEntry {
608            field_id: 0, // set by caller
609            column_type: self.column_type,
610            multi: self.multi,
611            data_offset,
612            data_len: total_bytes,
613            num_docs,
614            dict_offset: 0, // no longer used at TOC level (per-block dicts)
615            dict_count: 0,
616        };
617
618        Ok((toc, total_bytes))
619    }
620
621    /// Resolve text per-doc values to sorted ordinals.
622    fn resolve_text_ordinals(&mut self) {
623        let dict = self.text_values.as_ref().expect("text_values required");
624
625        // Build sorted ordinal map: BTreeMap iterates in sorted order
626        let sorted_ordinals: BTreeMap<&str, u64> = dict
627            .keys()
628            .enumerate()
629            .map(|(ord, key)| (key.as_str(), ord as u64))
630            .collect();
631
632        if self.multi {
633            // Multi-value: resolve multi_values via text_multi_values
634            if let Some(ref tmv) = self.text_multi_values {
635                for (i, text) in tmv.iter().enumerate() {
636                    self.multi_values[i] = sorted_ordinals[text.as_str()];
637                }
638            }
639        } else {
640            // Single-value: resolve values via text_per_doc
641            let tpd = self.text_per_doc.as_ref().expect("text_per_doc required");
642            for (i, doc_text) in tpd.iter().enumerate() {
643                match doc_text {
644                    Some(text) => {
645                        self.values[i] = sorted_ordinals[text.as_str()];
646                    }
647                    None => {
648                        self.values[i] = FAST_FIELD_MISSING;
649                    }
650                }
651            }
652        }
653    }
654
655    /// Write len-prefixed sorted strings. Returns (dict_count, bytes_written).
656    fn write_text_dictionary(&self, writer: &mut dyn Write) -> io::Result<(u32, u64)> {
657        let dict = self.text_values.as_ref().expect("text_values required");
658        let mut bytes_written = 0u64;
659
660        // BTreeMap keys are already sorted
661        let count = dict.len() as u32;
662        for key in dict.keys() {
663            let key_bytes = key.as_bytes();
664            writer.write_u32::<LittleEndian>(key_bytes.len() as u32)?;
665            writer.write_all(key_bytes)?;
666            bytes_written += 4 + key_bytes.len() as u64;
667        }
668
669        Ok((count, bytes_written))
670    }
671}
672
673// ── Reader ────────────────────────────────────────────────────────────────
674
675use crate::directories::OwnedBytes;
676
677/// One independently-decodable block within a blocked column.
678///
679/// All byte slices are zero-copy borrows from the mmap'd `.fast` file.
680pub struct ColumnBlock {
681    /// Number of docs before this block (for doc_id → block lookup).
682    pub cumulative_docs: u32,
683    /// Number of docs in this block.
684    pub num_docs: u32,
685    /// Auto-codec encoded data for this block (single-value or raw multi-value region).
686    pub data: OwnedBytes,
687    /// For multi-value blocks: offset sub-column.
688    pub offset_data: OwnedBytes,
689    /// For multi-value blocks: value sub-column.
690    pub value_data: OwnedBytes,
691    /// Per-block text dictionary (text columns only). Lazy — offsets built on first access.
692    pub dict: Option<TextDictReader>,
693    /// Raw dictionary bytes for this block (for merge: memcpy).
694    pub raw_dict: OwnedBytes,
695}
696
697/// Reads a single fast-field column from mmap/buffer with O(1) doc_id access.
698///
699/// A column is a sequence of independently-decodable blocks. Fresh segments
700/// have one block; merged segments may have multiple (one per source segment).
701///
702/// **Zero-copy**: all data is borrowed from the underlying mmap / `OwnedBytes`.
703///
704/// **Lazy text state**: for text-ordinal columns, the global merged dictionary
705/// and per-block ordinal maps are built lazily on first access (not at load time).
706/// This avoids scanning all dictionary pages from mmap during segment loading.
707pub struct FastFieldReader {
708    pub column_type: FastFieldColumnType,
709    pub num_docs: u32,
710    pub multi: bool,
711
712    /// Blocks in doc_id order.
713    blocks: Vec<ColumnBlock>,
714
715    /// Lazy-initialized text state (global dict + ordinal maps).
716    /// Built on first text-related access, not at load time.
717    text_state: OnceLock<TextState>,
718}
719
720/// Lazily-built state for text-ordinal columns.
721struct TextState {
722    /// Global merged dictionary across all blocks.
723    global_dict: TextDictReader,
724    /// Per-block ordinal maps: `ordinal_maps[block_idx][local_ord] → global_ord`.
725    /// Empty Vec for blocks without dicts or single-block columns (identity mapping).
726    ordinal_maps: Vec<Vec<u32>>,
727}
728
729impl FastFieldReader {
730    /// Open a blocked column from an `OwnedBytes` file buffer using a TOC entry.
731    ///
732    /// For text-ordinal columns, dictionary scanning and global dict merging are
733    /// deferred to first access — no mmap pages are touched for dict data here.
734    pub fn open(file_data: &OwnedBytes, toc: &FastFieldTocEntry) -> io::Result<Self> {
735        let region_start = toc.data_offset as usize;
736        let region_end = region_start + toc.data_len as usize;
737
738        if region_end > file_data.len() {
739            return Err(io::Error::new(
740                io::ErrorKind::UnexpectedEof,
741                "fast field data out of bounds",
742            ));
743        }
744
745        let raw = file_data.as_slice();
746
747        // Read num_blocks
748        let mut pos = region_start;
749        if pos + 4 > region_end {
750            return Err(io::Error::new(
751                io::ErrorKind::UnexpectedEof,
752                "fast field: missing num_blocks",
753            ));
754        }
755        let num_blocks = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
756        pos += 4;
757
758        // Read block index
759        let idx_size = num_blocks as usize * BLOCK_INDEX_ENTRY_SIZE;
760        if pos + idx_size > region_end {
761            return Err(io::Error::new(
762                io::ErrorKind::UnexpectedEof,
763                "fast field: block index truncated",
764            ));
765        }
766        let mut block_entries = Vec::with_capacity(num_blocks as usize);
767        {
768            let mut cursor = std::io::Cursor::new(&raw[pos..pos + idx_size]);
769            for _ in 0..num_blocks {
770                block_entries.push(BlockIndexEntry::read_from(&mut cursor)?);
771            }
772        }
773        pos += idx_size;
774
775        let empty = OwnedBytes::new(Vec::new());
776
777        // Parse each block's data + dict slices
778        let mut blocks = Vec::with_capacity(num_blocks as usize);
779        let mut cumulative = 0u32;
780
781        for entry in &block_entries {
782            let data_start = pos;
783            let data_end = data_start + entry.data_len as usize;
784            let dict_start = data_end;
785            let dict_end = dict_start + entry.dict_len as usize;
786
787            if dict_end > file_data.len() {
788                return Err(io::Error::new(
789                    io::ErrorKind::UnexpectedEof,
790                    "fast field: block data/dict truncated",
791                ));
792            }
793
794            // Parse multi-value sub-columns from block data
795            let (block_data, offset_data, value_data) = if toc.multi {
796                let block_raw = &raw[data_start..data_end];
797                if block_raw.len() < 4 {
798                    (empty.clone(), empty.clone(), empty.clone())
799                } else {
800                    let offset_col_len =
801                        u32::from_le_bytes(block_raw[0..4].try_into().unwrap()) as usize;
802                    let o_start = data_start + 4;
803                    let o_end = o_start + offset_col_len;
804                    let v_start = o_end;
805                    let v_end = data_end;
806                    (
807                        file_data.slice(data_start..data_end),
808                        file_data.slice(o_start..o_end),
809                        file_data.slice(v_start..v_end),
810                    )
811                }
812            } else {
813                (
814                    file_data.slice(data_start..data_end),
815                    empty.clone(),
816                    empty.clone(),
817                )
818            };
819
820            // Create lazy block dict — no scanning, just stores the data slice + count
821            let dict = if entry.dict_count > 0 {
822                Some(TextDictReader::new_lazy(
823                    file_data.slice(dict_start..dict_end),
824                    entry.dict_count,
825                ))
826            } else {
827                None
828            };
829
830            let raw_dict = if entry.dict_len > 0 {
831                file_data.slice(dict_start..dict_end)
832            } else {
833                empty.clone()
834            };
835
836            blocks.push(ColumnBlock {
837                cumulative_docs: cumulative,
838                num_docs: entry.num_docs,
839                data: block_data,
840                offset_data,
841                value_data,
842                dict,
843                raw_dict,
844            });
845
846            cumulative += entry.num_docs;
847            pos = dict_end;
848        }
849
850        Ok(Self {
851            column_type: toc.column_type,
852            num_docs: toc.num_docs,
853            multi: toc.multi,
854            blocks,
855            text_state: OnceLock::new(),
856        })
857    }
858
859    /// Lazily initialize and return the text state (global dict + ordinal maps).
860    /// Only called for text-ordinal columns.
861    fn ensure_text_state(&self) -> &TextState {
862        self.text_state
863            .get_or_init(|| Self::build_text_state(&self.blocks))
864    }
865
866    /// Build text state: global merged dictionary + per-block ordinal maps.
867    /// Called lazily on first text-related access (not at segment load time).
868    fn build_text_state(blocks: &[ColumnBlock]) -> TextState {
869        // Fast path: single block → block-local ordinals ARE global ordinals.
870        // No merging, no cloning, no ordinal map needed.
871        let blocks_with_dict = blocks.iter().filter(|b| b.dict.is_some()).count();
872        if blocks_with_dict <= 1 {
873            for block in blocks.iter() {
874                if let Some(ref dict) = block.dict {
875                    // Re-use the existing dict — no ordinal_map needed (identity mapping)
876                    return TextState {
877                        global_dict: TextDictReader::new_lazy(block.raw_dict.clone(), dict.len()),
878                        ordinal_maps: vec![Vec::new(); blocks.len()],
879                    };
880                }
881            }
882            // No blocks have dicts — return empty
883            return TextState {
884                global_dict: TextDictReader::new_lazy(OwnedBytes::new(Vec::new()), 0),
885                ordinal_maps: vec![Vec::new(); blocks.len()],
886            };
887        }
888
889        // Multi-block: merge sorted block dictionaries.
890        // Each block dict is already sorted, so we k-way merge in O(total_entries).
891        // Uses a BTreeMap to deduplicate and assign global ordinals.
892
893        // Phase 1: Collect unique strings → assign global ordinals.
894        //
895        // BTreeMap is sorted by key, so ordinals assigned by iterating values_mut()
896        // match the order that Phase 3 writes the dictionary (also key-sorted).
897        // This is critical: TextDictReader::ordinal() does binary search by position,
898        // so the ordinal_map values MUST equal the sorted position, not insertion order.
899        let mut unique_map: BTreeMap<String, u32> = BTreeMap::new();
900        for block in blocks.iter() {
901            if let Some(ref dict) = block.dict {
902                for ord in 0..dict.len() {
903                    if let Some(text) = dict.get(ord) {
904                        unique_map.entry(text.to_string()).or_insert(0);
905                    }
906                }
907            }
908        }
909        // Assign ordinals by sorted position (BTreeMap iterates keys in order).
910        for (i, value) in unique_map.values_mut().enumerate() {
911            *value = i as u32;
912        }
913
914        // Phase 2: Build per-block ordinal maps
915        let mut ordinal_maps = Vec::with_capacity(blocks.len());
916        for block in blocks.iter() {
917            if let Some(ref dict) = block.dict {
918                let mut map = Vec::with_capacity(dict.len() as usize);
919                for local_ord in 0..dict.len() {
920                    let text = dict
921                        .get(local_ord)
922                        .expect("block dict ordinal out of range");
923                    let global_ord = *unique_map
924                        .get(text)
925                        .expect("block dict entry not found in merged global dict");
926                    map.push(global_ord);
927                }
928                ordinal_maps.push(map);
929            } else {
930                ordinal_maps.push(Vec::new());
931            }
932        }
933
934        // Phase 3: Serialize global dict (sorted) into a buffer
935        let mut dict_buf = Vec::new();
936        let count = unique_map.len() as u32;
937        for s in unique_map.keys() {
938            let bytes = s.as_bytes();
939            dict_buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
940            dict_buf.extend_from_slice(bytes);
941        }
942
943        TextState {
944            global_dict: TextDictReader::new_lazy(OwnedBytes::new(dict_buf), count),
945            ordinal_maps,
946        }
947    }
948
949    /// Remap a block-local raw ordinal to a global ordinal using the ordinal map.
950    /// Returns raw unchanged for non-text columns, single-block columns, or missing ordinals.
951    #[inline]
952    fn remap_ordinal(&self, block_idx: usize, raw: u64) -> u64 {
953        if self.column_type == FastFieldColumnType::TextOrdinal
954            && raw != FAST_FIELD_MISSING
955            && self.blocks.len() > 1
956        {
957            let state = self.ensure_text_state();
958            let map = &state.ordinal_maps[block_idx];
959            if !map.is_empty() {
960                let idx = raw as usize;
961                if idx < map.len() {
962                    map[idx] as u64
963                } else {
964                    FAST_FIELD_MISSING
965                }
966            } else {
967                raw
968            }
969        } else {
970            raw
971        }
972    }
973
974    /// Find the block containing `doc_id`. Returns (block_index, local_doc_id).
975    #[inline]
976    fn find_block(&self, doc_id: u32) -> (usize, u32) {
977        debug_assert!(!self.blocks.is_empty());
978        // Single block fast path (common: fresh segments)
979        if self.blocks.len() == 1 {
980            return (0, doc_id);
981        }
982        // Binary search: find the last block whose cumulative_docs <= doc_id
983        let bi = self
984            .blocks
985            .partition_point(|b| b.cumulative_docs <= doc_id)
986            .saturating_sub(1);
987        (bi, doc_id - self.blocks[bi].cumulative_docs)
988    }
989
990    /// Get raw u64 value for a doc_id.
991    ///
992    /// Returns [`FAST_FIELD_MISSING`] for out-of-range doc_ids **and** for docs
993    /// that were never assigned a value (absent docs).
994    ///
995    /// For text columns, returns the global ordinal (remapped from block-local).
996    /// For multi-valued columns, returns the first value (or `FAST_FIELD_MISSING` if empty).
997    #[inline]
998    pub fn get_u64(&self, doc_id: u32) -> u64 {
999        if doc_id >= self.num_docs {
1000            return FAST_FIELD_MISSING;
1001        }
1002        let (bi, local) = self.find_block(doc_id);
1003        let block = &self.blocks[bi];
1004
1005        if self.multi {
1006            let start = codec::auto_read(block.offset_data.as_slice(), local as usize) as u32;
1007            let end = codec::auto_read(block.offset_data.as_slice(), local as usize + 1) as u32;
1008            if start >= end {
1009                return FAST_FIELD_MISSING;
1010            }
1011            let raw = codec::auto_read(block.value_data.as_slice(), start as usize);
1012            return self.remap_ordinal(bi, raw);
1013        }
1014
1015        let raw = codec::auto_read(block.data.as_slice(), local as usize);
1016        self.remap_ordinal(bi, raw)
1017    }
1018
1019    /// Get the value range for a multi-valued column within its block.
1020    /// Returns (block_index, start_index, end_index) into the block's flat value array.
1021    #[inline]
1022    fn block_value_range(&self, doc_id: u32) -> (usize, u32, u32) {
1023        if !self.multi || doc_id >= self.num_docs {
1024            return (0, 0, 0);
1025        }
1026        let (bi, local) = self.find_block(doc_id);
1027        let block = &self.blocks[bi];
1028        let start = codec::auto_read(block.offset_data.as_slice(), local as usize) as u32;
1029        let end = codec::auto_read(block.offset_data.as_slice(), local as usize + 1) as u32;
1030        (bi, start, end)
1031    }
1032
1033    /// Get the value range for a multi-valued column.
1034    /// Returns (start_index, end_index) — for single-block columns these are
1035    /// direct indices; for multi-block, use `get_multi_values` instead.
1036    #[inline]
1037    pub fn value_range(&self, doc_id: u32) -> (u32, u32) {
1038        let (_, start, end) = self.block_value_range(doc_id);
1039        (start, end)
1040    }
1041
1042    /// Get a specific value from the flat value array (multi-value mode).
1043    /// For single-block columns only. For multi-block, use `get_multi_values`.
1044    #[inline]
1045    pub fn get_value_at(&self, index: u32) -> u64 {
1046        // For single-block (common case), delegate directly
1047        if self.blocks.len() == 1 {
1048            let raw = codec::auto_read(self.blocks[0].value_data.as_slice(), index as usize);
1049            return self.remap_ordinal(0, raw);
1050        }
1051        // Multi-block fallback — index is block-local, caller should use get_multi_values
1052        0
1053    }
1054
1055    /// Get all values for a multi-valued doc_id. Handles multi-block correctly.
1056    pub fn get_multi_values(&self, doc_id: u32) -> Vec<u64> {
1057        let (bi, start, end) = self.block_value_range(doc_id);
1058        if start >= end {
1059            return Vec::new();
1060        }
1061        let block = &self.blocks[bi];
1062        (start..end)
1063            .map(|idx| {
1064                let raw = codec::auto_read(block.value_data.as_slice(), idx as usize);
1065                self.remap_ordinal(bi, raw)
1066            })
1067            .collect()
1068    }
1069
1070    /// Iterate multi-values for a doc, calling `f` for each. Returns true if `f` ever returns true (short-circuit).
1071    /// Handles multi-block columns correctly by finding the right block.
1072    #[inline]
1073    pub fn for_each_multi_value(&self, doc_id: u32, mut f: impl FnMut(u64) -> bool) -> bool {
1074        let (bi, start, end) = self.block_value_range(doc_id);
1075        if start >= end {
1076            return false;
1077        }
1078        let block = &self.blocks[bi];
1079        for idx in start..end {
1080            let raw = codec::auto_read(block.value_data.as_slice(), idx as usize);
1081            if f(self.remap_ordinal(bi, raw)) {
1082                return true;
1083            }
1084        }
1085        false
1086    }
1087
1088    /// Batch-scan all values in a single-value column, calling `f(doc_id, raw_value)` for each.
1089    ///
1090    /// Uses `auto_read_batch` internally (one codec dispatch per block, not per value),
1091    /// enabling compiler auto-vectorization for byte-aligned bitpacked columns.
1092    /// For text columns, returned values are global ordinals (remapped).
1093    /// For multi-value columns, use `for_each_multi_value` instead.
1094    pub fn scan_single_values(&self, mut f: impl FnMut(u32, u64)) {
1095        if self.multi {
1096            return;
1097        }
1098        const BATCH: usize = 256;
1099        let mut buf = [0u64; BATCH];
1100        let needs_remap =
1101            self.column_type == FastFieldColumnType::TextOrdinal && self.blocks.len() > 1;
1102
1103        // Pre-fetch ordinal maps once (only for multi-block text columns)
1104        let ordinal_maps = if needs_remap {
1105            Some(&self.ensure_text_state().ordinal_maps)
1106        } else {
1107            None
1108        };
1109
1110        for (block_idx, block) in self.blocks.iter().enumerate() {
1111            let n = block.num_docs as usize;
1112            let mut pos = 0;
1113
1114            let map = ordinal_maps.map(|maps| &maps[block_idx]);
1115            let has_map = map.is_some_and(|m| !m.is_empty());
1116
1117            while pos < n {
1118                let chunk = (n - pos).min(BATCH);
1119                codec::auto_read_batch(block.data.as_slice(), pos, &mut buf[..chunk]);
1120
1121                if has_map {
1122                    let map = map.unwrap();
1123                    for (i, &raw) in buf[..chunk].iter().enumerate() {
1124                        let val = if raw != FAST_FIELD_MISSING {
1125                            let idx = raw as usize;
1126                            if idx < map.len() {
1127                                map[idx] as u64
1128                            } else {
1129                                FAST_FIELD_MISSING
1130                            }
1131                        } else {
1132                            raw
1133                        };
1134                        f(block.cumulative_docs + pos as u32 + i as u32, val);
1135                    }
1136                } else {
1137                    for (i, &val) in buf[..chunk].iter().enumerate() {
1138                        f(block.cumulative_docs + pos as u32 + i as u32, val);
1139                    }
1140                }
1141                pos += chunk;
1142            }
1143        }
1144    }
1145
1146    /// Check if this doc has a value (not [`FAST_FIELD_MISSING`]).
1147    ///
1148    /// For single-value columns, checks the raw sentinel.
1149    /// For multi-value columns, checks if the offset range is non-empty.
1150    #[inline]
1151    pub fn has_value(&self, doc_id: u32) -> bool {
1152        if !self.multi {
1153            return doc_id < self.num_docs && self.get_u64(doc_id) != FAST_FIELD_MISSING;
1154        }
1155        let (_, start, end) = self.block_value_range(doc_id);
1156        start < end
1157    }
1158
1159    /// Get decoded i64 value (zigzag-decoded).
1160    ///
1161    /// Returns `i64::MIN` for absent docs (zigzag_decode of `FAST_FIELD_MISSING`).
1162    /// Use [`has_value`](Self::has_value) to distinguish absent from real values.
1163    #[inline]
1164    pub fn get_i64(&self, doc_id: u32) -> i64 {
1165        zigzag_decode(self.get_u64(doc_id))
1166    }
1167
1168    /// Get decoded f64 value (sortable-decoded).
1169    ///
1170    /// Returns `NaN` for absent docs (`sortable_u64_to_f64(FAST_FIELD_MISSING)`).
1171    /// Use [`has_value`](Self::has_value) to distinguish absent from real values.
1172    #[inline]
1173    pub fn get_f64(&self, doc_id: u32) -> f64 {
1174        sortable_u64_to_f64(self.get_u64(doc_id))
1175    }
1176
1177    /// Get the text ordinal for a doc_id. Returns FAST_FIELD_MISSING if missing.
1178    #[inline]
1179    pub fn get_ordinal(&self, doc_id: u32) -> u64 {
1180        self.get_u64(doc_id)
1181    }
1182
1183    /// Get the text string for a doc_id (looks up ordinal in block-local dictionary).
1184    /// Returns None if the doc has no value or ordinal is missing.
1185    pub fn get_text(&self, doc_id: u32) -> Option<&str> {
1186        if doc_id >= self.num_docs {
1187            return None;
1188        }
1189        let (bi, local) = self.find_block(doc_id);
1190        let block = &self.blocks[bi];
1191        let raw_ordinal = if self.multi {
1192            let start = codec::auto_read(block.offset_data.as_slice(), local as usize) as u32;
1193            let end = codec::auto_read(block.offset_data.as_slice(), local as usize + 1) as u32;
1194            if start >= end {
1195                return None;
1196            }
1197            codec::auto_read(block.value_data.as_slice(), start as usize)
1198        } else {
1199            codec::auto_read(block.data.as_slice(), local as usize)
1200        };
1201        if raw_ordinal == FAST_FIELD_MISSING {
1202            return None;
1203        }
1204        block.dict.as_ref().and_then(|d| d.get(raw_ordinal as u32))
1205    }
1206
1207    /// Look up text string → global ordinal. Returns None if not found.
1208    pub fn text_ordinal(&self, text: &str) -> Option<u64> {
1209        if self.column_type != FastFieldColumnType::TextOrdinal {
1210            return None;
1211        }
1212        self.ensure_text_state().global_dict.ordinal(text)
1213    }
1214
1215    /// Access the global text dictionary reader (if this is a text column).
1216    pub fn text_dict(&self) -> Option<&TextDictReader> {
1217        if self.column_type != FastFieldColumnType::TextOrdinal {
1218            return None;
1219        }
1220        Some(&self.ensure_text_state().global_dict)
1221    }
1222
1223    /// Number of blocks in this column.
1224    pub fn num_blocks(&self) -> usize {
1225        self.blocks.len()
1226    }
1227
1228    /// Access blocks for raw stacking during merge.
1229    pub fn blocks(&self) -> &[ColumnBlock] {
1230        &self.blocks
1231    }
1232}
1233
1234// ── Text dictionary ───────────────────────────────────────────────────────
1235
1236/// Sorted dictionary for text ordinal columns.
1237///
1238/// **Zero-copy**: the dictionary data is a shared slice of the `.fast` file.
1239/// **Lazy**: the offset table is built on first access (not at load time),
1240/// avoiding mmap page faults during segment loading.
1241pub struct TextDictReader {
1242    /// The raw dictionary bytes from the `.fast` file (zero-copy).
1243    data: OwnedBytes,
1244    /// Number of entries in this dictionary.
1245    count: u32,
1246    /// Per-entry (offset, len) pairs into `data` — built lazily on first access.
1247    offsets: OnceLock<Vec<(u32, u32)>>,
1248}
1249
1250impl TextDictReader {
1251    /// Create a lazy text dictionary from pre-sliced data.
1252    /// No scanning is performed — offsets are built on first `get()`/`ordinal()` call.
1253    pub fn new_lazy(data: OwnedBytes, count: u32) -> Self {
1254        Self {
1255            data,
1256            count,
1257            offsets: OnceLock::new(),
1258        }
1259    }
1260
1261    /// Open a zero-copy text dictionary from `file_data` starting at `dict_start`.
1262    /// Scans to find the dict end position for slicing, but defers offset building.
1263    pub fn open(file_data: &OwnedBytes, dict_start: usize, count: u32) -> io::Result<Self> {
1264        if count == 0 {
1265            return Ok(Self::new_lazy(OwnedBytes::new(Vec::new()), 0));
1266        }
1267        // Scan to find end position (need to know the slice range)
1268        let dict_slice = file_data.as_slice();
1269        let mut pos = dict_start;
1270        for _ in 0..count {
1271            if pos + 4 > dict_slice.len() {
1272                return Err(io::Error::new(
1273                    io::ErrorKind::UnexpectedEof,
1274                    "text dict truncated",
1275                ));
1276            }
1277            let len = u32::from_le_bytes(dict_slice[pos..pos + 4].try_into().unwrap()) as usize;
1278            pos += 4;
1279            if pos + len > dict_slice.len() {
1280                return Err(io::Error::new(
1281                    io::ErrorKind::UnexpectedEof,
1282                    "text dict entry truncated",
1283                ));
1284            }
1285            pos += len;
1286        }
1287        let data = file_data.slice(dict_start..pos);
1288        Ok(Self::new_lazy(data, count))
1289    }
1290
1291    /// Open from raw dict bytes (already length-prefixed entries).
1292    pub fn open_from_raw(raw_dict: &OwnedBytes, count: u32) -> io::Result<Self> {
1293        Ok(Self::new_lazy(raw_dict.clone(), count))
1294    }
1295
1296    /// Build offset table lazily on first access.
1297    #[inline]
1298    fn ensure_offsets(&self) -> &[(u32, u32)] {
1299        self.offsets.get_or_init(|| {
1300            let dict_slice = self.data.as_slice();
1301            let mut pos = 0usize;
1302            let mut offsets = Vec::with_capacity(self.count as usize);
1303            for _ in 0..self.count {
1304                debug_assert!(
1305                    pos + 4 <= dict_slice.len(),
1306                    "text dict truncated during lazy init"
1307                );
1308                let len = u32::from_le_bytes(dict_slice[pos..pos + 4].try_into().unwrap()) as usize;
1309                pos += 4;
1310                debug_assert!(
1311                    pos + len <= dict_slice.len(),
1312                    "text dict entry truncated during lazy init"
1313                );
1314                offsets.push((pos as u32, len as u32));
1315                pos += len;
1316            }
1317            offsets
1318        })
1319    }
1320
1321    /// Get string by ordinal — zero-copy borrow from the underlying file data.
1322    pub fn get(&self, ordinal: u32) -> Option<&str> {
1323        let offsets = self.ensure_offsets();
1324        let &(off, len) = offsets.get(ordinal as usize)?;
1325        let slice = &self.data.as_slice()[off as usize..off as usize + len as usize];
1326        Some(unsafe { std::str::from_utf8_unchecked(slice) })
1327    }
1328
1329    /// Binary search for a string → ordinal.
1330    pub fn ordinal(&self, text: &str) -> Option<u64> {
1331        let offsets = self.ensure_offsets();
1332        offsets
1333            .binary_search_by(|&(off, len)| {
1334                let slice = &self.data.as_slice()[off as usize..off as usize + len as usize];
1335                let entry = unsafe { std::str::from_utf8_unchecked(slice) };
1336                entry.cmp(text)
1337            })
1338            .ok()
1339            .map(|i| i as u64)
1340    }
1341
1342    /// Number of entries in the dictionary.
1343    pub fn len(&self) -> u32 {
1344        self.count
1345    }
1346
1347    /// Whether the dictionary is empty.
1348    pub fn is_empty(&self) -> bool {
1349        self.count == 0
1350    }
1351
1352    /// Iterate all entries.
1353    pub fn iter(&self) -> impl Iterator<Item = &str> {
1354        let offsets = self.ensure_offsets();
1355        offsets.iter().map(|&(off, len)| {
1356            let slice = &self.data.as_slice()[off as usize..off as usize + len as usize];
1357            unsafe { std::str::from_utf8_unchecked(slice) }
1358        })
1359    }
1360}
1361
1362// ── File-level write/read ─────────────────────────────────────────────────
1363
1364/// Write fast-field TOC + footer.
1365pub fn write_fast_field_toc_and_footer(
1366    writer: &mut dyn Write,
1367    toc_offset: u64,
1368    entries: &[FastFieldTocEntry],
1369) -> io::Result<()> {
1370    for e in entries {
1371        e.write_to(writer)?;
1372    }
1373    writer.write_u64::<LittleEndian>(toc_offset)?;
1374    writer.write_u32::<LittleEndian>(entries.len() as u32)?;
1375    writer.write_u32::<LittleEndian>(FAST_FIELD_MAGIC)?;
1376    Ok(())
1377}
1378
1379/// Read fast-field footer from the last 16 bytes.
1380/// Returns (toc_offset, num_columns).
1381pub fn read_fast_field_footer(file_data: &[u8]) -> io::Result<(u64, u32)> {
1382    let len = file_data.len();
1383    if len < FAST_FIELD_FOOTER_SIZE as usize {
1384        return Err(io::Error::new(
1385            io::ErrorKind::UnexpectedEof,
1386            "fast field file too small for footer",
1387        ));
1388    }
1389    let footer = &file_data[len - FAST_FIELD_FOOTER_SIZE as usize..];
1390    let mut cursor = std::io::Cursor::new(footer);
1391    let toc_offset = cursor.read_u64::<LittleEndian>()?;
1392    let num_columns = cursor.read_u32::<LittleEndian>()?;
1393    let magic = cursor.read_u32::<LittleEndian>()?;
1394    if magic != FAST_FIELD_MAGIC {
1395        return Err(io::Error::new(
1396            io::ErrorKind::InvalidData,
1397            format!("bad fast field magic: 0x{:08x}", magic),
1398        ));
1399    }
1400    Ok((toc_offset, num_columns))
1401}
1402
1403/// Read all TOC entries from file data (FST2 format).
1404pub fn read_fast_field_toc(
1405    file_data: &[u8],
1406    toc_offset: u64,
1407    num_columns: u32,
1408) -> io::Result<Vec<FastFieldTocEntry>> {
1409    let start = toc_offset as usize;
1410    let expected = num_columns as usize * FAST_FIELD_TOC_ENTRY_SIZE;
1411    if start + expected > file_data.len() {
1412        return Err(io::Error::new(
1413            io::ErrorKind::UnexpectedEof,
1414            "fast field TOC out of bounds",
1415        ));
1416    }
1417    let mut cursor = std::io::Cursor::new(&file_data[start..start + expected]);
1418    let mut entries = Vec::with_capacity(num_columns as usize);
1419    for _ in 0..num_columns {
1420        entries.push(FastFieldTocEntry::read_from(&mut cursor)?);
1421    }
1422    Ok(entries)
1423}
1424
1425// ── Tests ─────────────────────────────────────────────────────────────────
1426
1427#[cfg(test)]
1428mod tests {
1429    use super::*;
1430
1431    #[test]
1432    fn test_zigzag_roundtrip() {
1433        for v in [0i64, 1, -1, 42, -42, i64::MAX, i64::MIN] {
1434            assert_eq!(zigzag_decode(zigzag_encode(v)), v);
1435        }
1436    }
1437
1438    #[test]
1439    fn test_f64_sortable_roundtrip() {
1440        for v in [0.0f64, 1.0, -1.0, f64::MAX, f64::MIN, f64::MIN_POSITIVE] {
1441            assert_eq!(sortable_u64_to_f64(f64_to_sortable_u64(v)), v);
1442        }
1443    }
1444
1445    #[test]
1446    fn test_f64_sortable_order() {
1447        let values = [-100.0f64, -1.0, -0.0, 0.0, 0.5, 1.0, 100.0];
1448        let encoded: Vec<u64> = values.iter().map(|&v| f64_to_sortable_u64(v)).collect();
1449        for i in 1..encoded.len() {
1450            assert!(
1451                encoded[i] >= encoded[i - 1],
1452                "{} >= {} failed for {} vs {}",
1453                encoded[i],
1454                encoded[i - 1],
1455                values[i],
1456                values[i - 1]
1457            );
1458        }
1459    }
1460
1461    #[test]
1462    fn test_bitpack_roundtrip() {
1463        let values: Vec<u64> = vec![0, 3, 7, 15, 0, 1, 6, 12];
1464        let bpv = 4u8;
1465        let mut packed = Vec::new();
1466        bitpack_write(&values, bpv, &mut packed);
1467
1468        for (i, &expected) in values.iter().enumerate() {
1469            let got = bitpack_read(&packed, bpv, i);
1470            assert_eq!(got, expected, "index {}", i);
1471        }
1472    }
1473
1474    #[test]
1475    fn test_bitpack_high_bpv_regression() {
1476        // Regression: bpv > 56 with non-zero bit_shift used to read wrong bits
1477        // because the old 8-byte fast path didn't check bit_shift + bpv <= 64.
1478        for bpv in [57u8, 58, 59, 60, 63, 64] {
1479            let max_val = if bpv == 64 {
1480                u64::MAX
1481            } else {
1482                (1u64 << bpv) - 1
1483            };
1484            let values: Vec<u64> = (0..32)
1485                .map(|i: u64| {
1486                    if max_val == u64::MAX {
1487                        i * 7
1488                    } else {
1489                        (i * 7) % (max_val + 1)
1490                    }
1491                })
1492                .collect();
1493            let mut packed = Vec::new();
1494            bitpack_write(&values, bpv, &mut packed);
1495            for (i, &expected) in values.iter().enumerate() {
1496                let got = bitpack_read(&packed, bpv, i);
1497                assert_eq!(got, expected, "high bpv={} index={}", bpv, i);
1498            }
1499        }
1500    }
1501
1502    #[test]
1503    fn test_bitpack_various_widths() {
1504        for bpv in [1u8, 2, 3, 5, 7, 8, 13, 16, 32, 64] {
1505            let max_val = if bpv == 64 {
1506                u64::MAX
1507            } else {
1508                (1u64 << bpv) - 1
1509            };
1510            let values: Vec<u64> = (0..100)
1511                .map(|i: u64| {
1512                    if max_val == u64::MAX {
1513                        i
1514                    } else {
1515                        i % (max_val + 1)
1516                    }
1517                })
1518                .collect();
1519            let mut packed = Vec::new();
1520            bitpack_write(&values, bpv, &mut packed);
1521
1522            for (i, &expected) in values.iter().enumerate() {
1523                let got = bitpack_read(&packed, bpv, i);
1524                assert_eq!(got, expected, "bpv={} index={}", bpv, i);
1525            }
1526        }
1527    }
1528
1529    /// Helper: wrap a Vec<u8> in OwnedBytes for tests.
1530    fn owned(buf: Vec<u8>) -> OwnedBytes {
1531        OwnedBytes::new(buf)
1532    }
1533
1534    #[test]
1535    fn test_writer_reader_u64_roundtrip() {
1536        let mut writer = FastFieldWriter::new_numeric(FastFieldColumnType::U64);
1537        writer.add_u64(0, 100);
1538        writer.add_u64(1, 200);
1539        writer.add_u64(2, 150);
1540        writer.add_u64(4, 300); // gap at doc_id=3
1541        writer.pad_to(5);
1542
1543        let mut buf = Vec::new();
1544        let (mut toc, _bytes) = writer.serialize(&mut buf, 0).unwrap();
1545        toc.field_id = 42;
1546
1547        // Write TOC + footer
1548        let toc_offset = buf.len() as u64;
1549        write_fast_field_toc_and_footer(&mut buf, toc_offset, &[toc]).unwrap();
1550
1551        // Read back
1552        let ob = owned(buf);
1553        let (toc_off, num_cols) = read_fast_field_footer(&ob).unwrap();
1554        assert_eq!(num_cols, 1);
1555        let tocs = read_fast_field_toc(&ob, toc_off, num_cols).unwrap();
1556        assert_eq!(tocs.len(), 1);
1557        assert_eq!(tocs[0].field_id, 42);
1558
1559        let reader = FastFieldReader::open(&ob, &tocs[0]).unwrap();
1560        assert_eq!(reader.get_u64(0), 100);
1561        assert_eq!(reader.get_u64(1), 200);
1562        assert_eq!(reader.get_u64(2), 150);
1563        assert_eq!(reader.get_u64(3), FAST_FIELD_MISSING); // gap → absent sentinel
1564        assert_eq!(reader.get_u64(4), 300);
1565    }
1566
1567    #[test]
1568    fn test_writer_reader_i64_roundtrip() {
1569        let mut writer = FastFieldWriter::new_numeric(FastFieldColumnType::I64);
1570        writer.add_i64(0, -100);
1571        writer.add_i64(1, 50);
1572        writer.add_i64(2, 0);
1573        writer.pad_to(3);
1574
1575        let mut buf = Vec::new();
1576        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1577        let ob = owned(buf);
1578        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1579        assert_eq!(reader.get_i64(0), -100);
1580        assert_eq!(reader.get_i64(1), 50);
1581        assert_eq!(reader.get_i64(2), 0);
1582    }
1583
1584    #[test]
1585    fn test_writer_reader_f64_roundtrip() {
1586        let mut writer = FastFieldWriter::new_numeric(FastFieldColumnType::F64);
1587        writer.add_f64(0, -1.5);
1588        writer.add_f64(1, 3.15);
1589        writer.add_f64(2, 0.0);
1590        writer.pad_to(3);
1591
1592        let mut buf = Vec::new();
1593        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1594        let ob = owned(buf);
1595        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1596        assert_eq!(reader.get_f64(0), -1.5);
1597        assert_eq!(reader.get_f64(1), 3.15);
1598        assert_eq!(reader.get_f64(2), 0.0);
1599    }
1600
1601    #[test]
1602    fn test_writer_reader_text_roundtrip() {
1603        let mut writer = FastFieldWriter::new_text();
1604        writer.add_text(0, "banana");
1605        writer.add_text(1, "apple");
1606        writer.add_text(2, "cherry");
1607        writer.add_text(3, "apple"); // duplicate
1608        // doc_id=4 has no value
1609        writer.pad_to(5);
1610
1611        let mut buf = Vec::new();
1612        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1613        let ob = owned(buf);
1614        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1615
1616        // Dictionary is sorted: apple=0, banana=1, cherry=2
1617        assert_eq!(reader.get_text(0), Some("banana"));
1618        assert_eq!(reader.get_text(1), Some("apple"));
1619        assert_eq!(reader.get_text(2), Some("cherry"));
1620        assert_eq!(reader.get_text(3), Some("apple"));
1621        assert_eq!(reader.get_text(4), None); // missing
1622
1623        // Ordinal lookups
1624        assert_eq!(reader.text_ordinal("apple"), Some(0));
1625        assert_eq!(reader.text_ordinal("banana"), Some(1));
1626        assert_eq!(reader.text_ordinal("cherry"), Some(2));
1627        assert_eq!(reader.text_ordinal("durian"), None);
1628    }
1629
1630    #[test]
1631    fn test_constant_column() {
1632        let mut writer = FastFieldWriter::new_numeric(FastFieldColumnType::U64);
1633        for i in 0..100 {
1634            writer.add_u64(i, 42);
1635        }
1636
1637        let mut buf = Vec::new();
1638        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1639
1640        let ob = owned(buf);
1641        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1642        for i in 0..100 {
1643            assert_eq!(reader.get_u64(i), 42);
1644        }
1645    }
1646
1647    // ── Multi-value tests ──
1648
1649    #[test]
1650    fn test_multi_value_u64_roundtrip() {
1651        let mut writer = FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64);
1652        // doc 0: [10, 20, 30]
1653        writer.add_u64(0, 10);
1654        writer.add_u64(0, 20);
1655        writer.add_u64(0, 30);
1656        // doc 1: [] (empty)
1657        // doc 2: [100]
1658        writer.add_u64(2, 100);
1659        // doc 3: [5, 15]
1660        writer.add_u64(3, 5);
1661        writer.add_u64(3, 15);
1662        writer.pad_to(4);
1663
1664        let mut buf = Vec::new();
1665        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1666        assert!(toc.multi);
1667        assert_eq!(toc.num_docs, 4);
1668
1669        let ob = owned(buf);
1670        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1671        assert!(reader.multi);
1672
1673        // doc 0: first value
1674        assert_eq!(reader.get_u64(0), 10);
1675        let (s, e) = reader.value_range(0);
1676        assert_eq!(e - s, 3);
1677        assert_eq!(reader.get_value_at(s), 10);
1678        assert_eq!(reader.get_value_at(s + 1), 20);
1679        assert_eq!(reader.get_value_at(s + 2), 30);
1680
1681        // doc 1: empty → sentinel
1682        assert_eq!(reader.get_u64(1), FAST_FIELD_MISSING);
1683        let (s, e) = reader.value_range(1);
1684        assert_eq!(s, e);
1685        assert!(!reader.has_value(1));
1686
1687        // doc 2: [100]
1688        assert_eq!(reader.get_u64(2), 100);
1689        assert!(reader.has_value(2));
1690
1691        // doc 3: [5, 15]
1692        assert_eq!(reader.get_u64(3), 5);
1693        let (s, e) = reader.value_range(3);
1694        assert_eq!(e - s, 2);
1695        assert_eq!(reader.get_value_at(s), 5);
1696        assert_eq!(reader.get_value_at(s + 1), 15);
1697    }
1698
1699    #[test]
1700    fn test_multi_value_text_roundtrip() {
1701        let mut writer = FastFieldWriter::new_text_multi();
1702        // doc 0: ["banana", "apple"]
1703        writer.add_text(0, "banana");
1704        writer.add_text(0, "apple");
1705        // doc 1: ["cherry"]
1706        writer.add_text(1, "cherry");
1707        // doc 2: [] empty
1708        writer.pad_to(3);
1709
1710        let mut buf = Vec::new();
1711        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1712        assert!(toc.multi);
1713
1714        let ob = owned(buf);
1715        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1716
1717        // doc 0: first value ordinal → banana is ordinal 1 (apple=0, banana=1, cherry=2)
1718        let (s, e) = reader.value_range(0);
1719        assert_eq!(e - s, 2);
1720        let ord0 = reader.get_value_at(s);
1721        let ord1 = reader.get_value_at(s + 1);
1722        assert_eq!(reader.text_dict().unwrap().get(ord0 as u32), Some("banana"));
1723        assert_eq!(reader.text_dict().unwrap().get(ord1 as u32), Some("apple"));
1724
1725        // doc 1: cherry
1726        let (s, e) = reader.value_range(1);
1727        assert_eq!(e - s, 1);
1728        let ord = reader.get_value_at(s);
1729        assert_eq!(reader.text_dict().unwrap().get(ord as u32), Some("cherry"));
1730
1731        // doc 2: empty
1732        assert!(!reader.has_value(2));
1733    }
1734
1735    #[test]
1736    fn test_multi_value_full_toc_roundtrip() {
1737        let mut writer = FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64);
1738        writer.add_u64(0, 1);
1739        writer.add_u64(0, 2);
1740        writer.add_u64(1, 3);
1741        writer.pad_to(2);
1742
1743        let mut buf = Vec::new();
1744        let (mut toc, _) = writer.serialize(&mut buf, 0).unwrap();
1745        toc.field_id = 7;
1746
1747        let toc_offset = buf.len() as u64;
1748        write_fast_field_toc_and_footer(&mut buf, toc_offset, &[toc]).unwrap();
1749
1750        let ob = owned(buf);
1751        let (toc_off, num_cols) = read_fast_field_footer(&ob).unwrap();
1752        let tocs = read_fast_field_toc(&ob, toc_off, num_cols).unwrap();
1753        assert_eq!(tocs[0].field_id, 7);
1754        assert!(tocs[0].multi);
1755
1756        let reader = FastFieldReader::open(&ob, &tocs[0]).unwrap();
1757        assert_eq!(reader.get_u64(0), 1);
1758        assert_eq!(reader.get_u64(1), 3);
1759    }
1760
1761    /// Helper: serialize a writer into a blocked column, return (block_data, block_dict, block_index_entry)
1762    /// by stripping the blocked header.
1763    fn serialize_single_block(writer: &mut FastFieldWriter) -> (Vec<u8>, Vec<u8>, BlockIndexEntry) {
1764        let mut buf = Vec::new();
1765        let (_toc, _) = writer.serialize(&mut buf, 0).unwrap();
1766        // Strip: [num_blocks(4)] [BlockIndexEntry(16)] [data...] [dict...]
1767        let mut cursor = std::io::Cursor::new(&buf[4..4 + BLOCK_INDEX_ENTRY_SIZE]);
1768        let entry = BlockIndexEntry::read_from(&mut cursor).unwrap();
1769        let data_start = 4 + BLOCK_INDEX_ENTRY_SIZE;
1770        let data_end = data_start + entry.data_len as usize;
1771        let dict_end = data_end + entry.dict_len as usize;
1772        let data = buf[data_start..data_end].to_vec();
1773        let dict = if dict_end > data_end {
1774            buf[data_end..dict_end].to_vec()
1775        } else {
1776            Vec::new()
1777        };
1778        (data, dict, entry)
1779    }
1780
1781    /// Manually assemble a multi-block column from individual block payloads.
1782    fn assemble_blocked_column(
1783        field_id: u32,
1784        column_type: FastFieldColumnType,
1785        multi: bool,
1786        blocks: &[(u32, &[u8], u32, &[u8])], // (num_docs, data, dict_count, dict)
1787    ) -> (Vec<u8>, FastFieldTocEntry) {
1788        use byteorder::{LittleEndian, WriteBytesExt};
1789
1790        let mut buf = Vec::new();
1791        let num_blocks = blocks.len() as u32;
1792
1793        // num_blocks
1794        buf.write_u32::<LittleEndian>(num_blocks).unwrap();
1795
1796        // block index
1797        for &(num_docs, data, dict_count, dict) in blocks {
1798            let entry = BlockIndexEntry {
1799                num_docs,
1800                data_len: data.len() as u32,
1801                dict_count,
1802                dict_len: dict.len() as u32,
1803            };
1804            entry.write_to(&mut buf).unwrap();
1805        }
1806
1807        // block data + dicts
1808        let mut total_docs = 0u32;
1809        for &(num_docs, data, _, dict) in blocks {
1810            buf.extend_from_slice(data);
1811            buf.extend_from_slice(dict);
1812            total_docs += num_docs;
1813        }
1814
1815        let data_len = buf.len() as u64;
1816
1817        // Write TOC + footer
1818        let toc = FastFieldTocEntry {
1819            field_id,
1820            column_type,
1821            multi,
1822            data_offset: 0,
1823            data_len,
1824            num_docs: total_docs,
1825            dict_offset: 0,
1826            dict_count: 0,
1827        };
1828
1829        let toc_offset = buf.len() as u64;
1830        write_fast_field_toc_and_footer(&mut buf, toc_offset, std::slice::from_ref(&toc)).unwrap();
1831
1832        (buf, toc)
1833    }
1834
1835    #[test]
1836    fn test_multi_block_numeric_roundtrip() {
1837        // Block A: 3 docs [10, 20, 30]
1838        let mut wa = FastFieldWriter::new_numeric(FastFieldColumnType::U64);
1839        wa.add_u64(0, 10);
1840        wa.add_u64(1, 20);
1841        wa.add_u64(2, 30);
1842        let (data_a, dict_a, entry_a) = serialize_single_block(&mut wa);
1843
1844        // Block B: 2 docs [40, 50]
1845        let mut wb = FastFieldWriter::new_numeric(FastFieldColumnType::U64);
1846        wb.add_u64(0, 40);
1847        wb.add_u64(1, 50);
1848        let (data_b, dict_b, entry_b) = serialize_single_block(&mut wb);
1849
1850        let (buf, toc) = assemble_blocked_column(
1851            1,
1852            FastFieldColumnType::U64,
1853            false,
1854            &[
1855                (entry_a.num_docs, &data_a, entry_a.dict_count, &dict_a),
1856                (entry_b.num_docs, &data_b, entry_b.dict_count, &dict_b),
1857            ],
1858        );
1859
1860        let ob = owned(buf);
1861        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1862
1863        assert_eq!(reader.num_docs, 5);
1864        assert_eq!(reader.num_blocks(), 2);
1865        assert_eq!(reader.get_u64(0), 10);
1866        assert_eq!(reader.get_u64(1), 20);
1867        assert_eq!(reader.get_u64(2), 30);
1868        assert_eq!(reader.get_u64(3), 40);
1869        assert_eq!(reader.get_u64(4), 50);
1870    }
1871
1872    #[test]
1873    fn test_multi_block_text_roundtrip() {
1874        // Block A: 2 docs ["alpha", "beta"]
1875        let mut wa = FastFieldWriter::new_text();
1876        wa.add_text(0, "alpha");
1877        wa.add_text(1, "beta");
1878        let (data_a, dict_a, entry_a) = serialize_single_block(&mut wa);
1879
1880        // Block B: 2 docs ["gamma", "alpha"]  (alpha shared with block A)
1881        let mut wb = FastFieldWriter::new_text();
1882        wb.add_text(0, "gamma");
1883        wb.add_text(1, "alpha");
1884        let (data_b, dict_b, entry_b) = serialize_single_block(&mut wb);
1885
1886        let (buf, toc) = assemble_blocked_column(
1887            2,
1888            FastFieldColumnType::TextOrdinal,
1889            false,
1890            &[
1891                (entry_a.num_docs, &data_a, entry_a.dict_count, &dict_a),
1892                (entry_b.num_docs, &data_b, entry_b.dict_count, &dict_b),
1893            ],
1894        );
1895
1896        let ob = owned(buf);
1897        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1898
1899        assert_eq!(reader.num_docs, 4);
1900        assert_eq!(reader.num_blocks(), 2);
1901
1902        // Global dict should be: alpha(0), beta(1), gamma(2)
1903        assert_eq!(reader.text_dict().unwrap().len(), 3);
1904
1905        // Block A: alpha=local0→global0, beta=local1→global1
1906        assert_eq!(reader.get_text(0), Some("alpha"));
1907        assert_eq!(reader.get_text(1), Some("beta"));
1908
1909        // Block B: gamma=local1→global2, alpha=local0→global0
1910        assert_eq!(reader.get_text(2), Some("gamma"));
1911        assert_eq!(reader.get_text(3), Some("alpha"));
1912
1913        // Global ordinal lookups
1914        assert_eq!(reader.text_ordinal("alpha"), Some(0));
1915        assert_eq!(reader.text_ordinal("beta"), Some(1));
1916        assert_eq!(reader.text_ordinal("gamma"), Some(2));
1917
1918        // get_u64 returns global ordinals
1919        assert_eq!(reader.get_u64(0), 0); // alpha
1920        assert_eq!(reader.get_u64(1), 1); // beta
1921        assert_eq!(reader.get_u64(2), 2); // gamma
1922        assert_eq!(reader.get_u64(3), 0); // alpha
1923    }
1924
1925    /// Regression test: ordinal mismatch when blocks have disjoint dicts
1926    /// that arrive in non-sorted order.
1927    ///
1928    /// Block A has ["book","wiki"], Block B has ["apple","wiki"].
1929    /// "apple" < "book" < "wiki" alphabetically, but "book" is encountered
1930    /// first. Before the fix, insertion-order ordinals were used instead of
1931    /// sorted-position ordinals, causing text_ordinal() and get_u64() to
1932    /// disagree — wrong documents would pass fast-field predicates.
1933    #[test]
1934    fn test_multi_block_text_ordinal_mismatch_regression() {
1935        // Block A: 2 docs ["book", "wiki"]
1936        let mut wa = FastFieldWriter::new_text();
1937        wa.add_text(0, "book");
1938        wa.add_text(1, "wiki");
1939        let (data_a, dict_a, entry_a) = serialize_single_block(&mut wa);
1940
1941        // Block B: 2 docs ["apple", "wiki"]  ("apple" < "book" alphabetically)
1942        let mut wb = FastFieldWriter::new_text();
1943        wb.add_text(0, "apple");
1944        wb.add_text(1, "wiki");
1945        let (data_b, dict_b, entry_b) = serialize_single_block(&mut wb);
1946
1947        let (buf, toc) = assemble_blocked_column(
1948            2,
1949            FastFieldColumnType::TextOrdinal,
1950            false,
1951            &[
1952                (entry_a.num_docs, &data_a, entry_a.dict_count, &dict_a),
1953                (entry_b.num_docs, &data_b, entry_b.dict_count, &dict_b),
1954            ],
1955        );
1956
1957        let ob = owned(buf);
1958        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1959
1960        // Global dict should be sorted: apple(0), book(1), wiki(2)
1961        assert_eq!(reader.text_dict().unwrap().len(), 3);
1962        assert_eq!(reader.text_ordinal("apple"), Some(0));
1963        assert_eq!(reader.text_ordinal("book"), Some(1));
1964        assert_eq!(reader.text_ordinal("wiki"), Some(2));
1965
1966        // get_u64 must return the SAME global ordinals that text_ordinal returns
1967        assert_eq!(reader.get_u64(0), 1); // doc0 in block A = "book" → global 1
1968        assert_eq!(reader.get_u64(1), 2); // doc1 in block A = "wiki" → global 2
1969        assert_eq!(reader.get_u64(2), 0); // doc0 in block B = "apple" → global 0
1970        assert_eq!(reader.get_u64(3), 2); // doc1 in block B = "wiki" → global 2
1971
1972        // Simulate TermQuery predicate: text_ordinal("wiki") == get_u64(doc_id)
1973        let wiki_ord = reader.text_ordinal("wiki").unwrap();
1974        assert_eq!(reader.get_u64(1), wiki_ord, "wiki doc should match");
1975        assert_eq!(reader.get_u64(3), wiki_ord, "wiki doc should match");
1976        assert_ne!(reader.get_u64(0), wiki_ord, "book doc must NOT match wiki");
1977        assert_ne!(reader.get_u64(2), wiki_ord, "apple doc must NOT match wiki");
1978    }
1979
1980    #[test]
1981    fn test_multi_block_multi_value_numeric() {
1982        // Block A: doc0=[1,2], doc1=[3]
1983        let mut wa = FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64);
1984        wa.add_u64(0, 1);
1985        wa.add_u64(0, 2);
1986        wa.add_u64(1, 3);
1987        wa.pad_to(2);
1988        let (data_a, dict_a, entry_a) = serialize_single_block(&mut wa);
1989
1990        // Block B: doc0=[4,5,6], doc1=[]
1991        let mut wb = FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64);
1992        wb.add_u64(0, 4);
1993        wb.add_u64(0, 5);
1994        wb.add_u64(0, 6);
1995        wb.pad_to(2);
1996        let (data_b, dict_b, entry_b) = serialize_single_block(&mut wb);
1997
1998        let (buf, toc) = assemble_blocked_column(
1999            3,
2000            FastFieldColumnType::U64,
2001            true,
2002            &[
2003                (entry_a.num_docs, &data_a, entry_a.dict_count, &dict_a),
2004                (entry_b.num_docs, &data_b, entry_b.dict_count, &dict_b),
2005            ],
2006        );
2007
2008        let ob = owned(buf);
2009        let reader = FastFieldReader::open(&ob, &toc).unwrap();
2010
2011        assert_eq!(reader.num_docs, 4);
2012        assert_eq!(reader.num_blocks(), 2);
2013
2014        // doc0 (block A): [1, 2]
2015        assert_eq!(reader.get_multi_values(0), vec![1, 2]);
2016        // doc1 (block A): [3]
2017        assert_eq!(reader.get_multi_values(1), vec![3]);
2018        // doc2 (block B, local 0): [4, 5, 6]
2019        assert_eq!(reader.get_multi_values(2), vec![4, 5, 6]);
2020        // doc3 (block B, local 1): []
2021        assert_eq!(reader.get_multi_values(3), Vec::<u64>::new());
2022    }
2023}