Skip to main content

hermes_core/structures/fast_field/
mod.rs

1//! Fast field columnar storage for efficient filtering and sorting.
2//!
3//! Stores one column per fast-field, indexed by doc_id for O(1) access.
4//! Supports u64, i64, f64, and text (dictionary-encoded ordinal) columns.
5//! Both single-valued and multi-valued columns are supported.
6//!
7//! ## File format (`.fast` — version FST2)
8//!
9//! ```text
10//! [column 0 blocked data] [column 1 blocked data] ... [column N blocked data]
11//! [TOC: FastFieldTocEntry × num_columns]
12//! [footer: toc_offset(8) + num_columns(4) + magic(4)]  = 16 bytes
13//! ```
14//!
15//! ## Blocked column format
16//!
17//! Each column's data region is a sequence of independently-decodable blocks:
18//!
19//! ```text
20//! [num_blocks: u32]
21//! [block_index: BlockIndexEntry × num_blocks]   (16 bytes each)
22//! [block_0 data] [block_0 dict?] [block_1 data] [block_1 dict?] ...
23//! ```
24//!
25//! `BlockIndexEntry`: num_docs(4) + data_len(4) + dict_count(4) + dict_len(4)
26//!
27//! Fresh segments produce a single block. Merges stack blocks from source
28//! segments via raw byte copy (memcpy) — no per-value decode/re-encode.
29//!
30//! ## Codecs (auto-selected per block at build time)
31//!
32//! | ID | Codec           | Description                               |
33//! |----|-----------------|-------------------------------------------|
34//! |  0 | Constant        | All values identical — 0 data bytes       |
35//! |  1 | Bitpacked       | min-subtract + global bitpack             |
36//! |  2 | Linear          | Regression line + bitpacked residuals     |
37//! |  3 | BlockwiseLinear | Per-512-block linear + residuals          |
38
39pub mod codec;
40
41use std::collections::BTreeMap;
42use std::io::{self, Read, Write};
43
44use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
45
46// ── Constants ─────────────────────────────────────────────────────────────
47
48/// Magic number for `.fast` file footer — FST2 (auto-codec + multi-value)
49pub const FAST_FIELD_MAGIC: u32 = 0x32545346;
50
51/// Footer size: toc_offset(8) + num_columns(4) + magic(4) = 16
52pub const FAST_FIELD_FOOTER_SIZE: u64 = 16;
53
54/// Sentinel for missing / absent values in any fast-field column type.
55///
56/// - **Text**: document has no value → ordinal stored as `u64::MAX`
57/// - **Numeric (u64/i64/f64)**: document has no value → raw stored as `u64::MAX`
58///
59/// Callers should check `raw != FAST_FIELD_MISSING` before interpreting
60/// the value as a real number or ordinal.
61pub const FAST_FIELD_MISSING: u64 = u64::MAX;
62
63// ── Column type ───────────────────────────────────────────────────────────
64
65/// Type of a fast-field column (stored in TOC).
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67#[repr(u8)]
68pub enum FastFieldColumnType {
69    U64 = 0,
70    I64 = 1,
71    F64 = 2,
72    TextOrdinal = 3,
73}
74
75impl FastFieldColumnType {
76    pub fn from_u8(v: u8) -> Option<Self> {
77        match v {
78            0 => Some(Self::U64),
79            1 => Some(Self::I64),
80            2 => Some(Self::F64),
81            3 => Some(Self::TextOrdinal),
82            _ => None,
83        }
84    }
85}
86
87// ── Encoding helpers ──────────────────────────────────────────────────────
88
89/// Zigzag-encode an i64 to u64 (small absolute values → small u64).
90#[inline]
91pub fn zigzag_encode(v: i64) -> u64 {
92    ((v << 1) ^ (v >> 63)) as u64
93}
94
95/// Zigzag-decode a u64 back to i64.
96#[inline]
97pub fn zigzag_decode(v: u64) -> i64 {
98    ((v >> 1) as i64) ^ -((v & 1) as i64)
99}
100
101/// Encode f64 to u64 preserving total order.
102/// Positive floats: flip sign bit (so they sort above negatives).
103/// Negative floats: flip all bits (so they sort in reverse magnitude).
104#[inline]
105pub fn f64_to_sortable_u64(f: f64) -> u64 {
106    let bits = f.to_bits();
107    if (bits >> 63) == 0 {
108        bits ^ (1u64 << 63) // positive: flip sign bit
109    } else {
110        !bits // negative: flip all bits
111    }
112}
113
114/// Decode sortable u64 back to f64.
115#[inline]
116pub fn sortable_u64_to_f64(v: u64) -> f64 {
117    let bits = if (v >> 63) != 0 {
118        v ^ (1u64 << 63) // was positive: unflip sign bit
119    } else {
120        !v // was negative: unflip all bits
121    };
122    f64::from_bits(bits)
123}
124
125/// Minimum number of bits needed to represent `val`.
126#[inline]
127pub fn bits_needed_u64(val: u64) -> u8 {
128    if val == 0 {
129        0
130    } else {
131        64 - val.leading_zeros() as u8
132    }
133}
134
135// ── Bit-packing ───────────────────────────────────────────────────────────
136
137/// Pack `values` at `bits_per_value` bits each into `out`.
138/// `out` must be large enough: `ceil(values.len() * bits_per_value / 8)` bytes.
139pub fn bitpack_write(values: &[u64], bits_per_value: u8, out: &mut Vec<u8>) {
140    if bits_per_value == 0 {
141        return; // all values are the same (constant column)
142    }
143    let bpv = bits_per_value as usize;
144    let total_bits = values.len() * bpv;
145    let total_bytes = total_bits.div_ceil(8);
146    out.reserve(total_bytes);
147
148    let start = out.len();
149    out.resize(start + total_bytes, 0);
150    let buf = &mut out[start..];
151
152    for (i, &val) in values.iter().enumerate() {
153        let bit_offset = i * bpv;
154        let byte_offset = bit_offset / 8;
155        let bit_shift = bit_offset % 8;
156
157        // Write across byte boundaries (up to 9 bytes for 64-bit values)
158        let mut remaining_bits = bpv;
159        let mut v = val;
160        let mut bo = byte_offset;
161        let mut bs = bit_shift;
162
163        while remaining_bits > 0 {
164            let can_write = (8 - bs).min(remaining_bits);
165            let mask = (1u64 << can_write) - 1;
166            buf[bo] |= ((v & mask) << bs) as u8;
167            v >>= can_write;
168            remaining_bits -= can_write;
169            bo += 1;
170            bs = 0;
171        }
172    }
173}
174
175/// Read value at `index` from bit-packed data.
176///
177/// Fast path: reads a single unaligned u64 (LE) covering the target bits,
178/// shifts and masks. This compiles to ~4 instructions on x86/ARM and avoids
179/// the per-byte loop entirely for bpv ≤ 56.
180#[inline]
181pub fn bitpack_read(data: &[u8], bits_per_value: u8, index: usize) -> u64 {
182    if bits_per_value == 0 {
183        return 0;
184    }
185    let bpv = bits_per_value as usize;
186    let bit_offset = index * bpv;
187    let byte_offset = bit_offset / 8;
188    let bit_shift = bit_offset % 8;
189
190    // Fast path: single unaligned LE u64 load, shift, and mask.
191    // Valid when all needed bits fit within 8 bytes: bit_shift + bpv ≤ 64.
192    if bit_shift + bpv <= 64 && byte_offset + 8 <= data.len() {
193        let raw = u64::from_le_bytes(data[byte_offset..byte_offset + 8].try_into().unwrap());
194        let mask = if bpv >= 64 {
195            u64::MAX
196        } else {
197            (1u64 << bpv) - 1
198        };
199        return (raw >> bit_shift) & mask;
200    }
201
202    // Slow path for the last few values near the end of the buffer
203    let mut result: u64 = 0;
204    let mut remaining_bits = bpv;
205    let mut bo = byte_offset;
206    let mut bs = bit_shift;
207    let mut out_shift = 0;
208
209    while remaining_bits > 0 {
210        let can_read = (8 - bs).min(remaining_bits);
211        let mask = ((1u64 << can_read) - 1) as u8;
212        let byte_val = if bo < data.len() { data[bo] } else { 0 };
213        result |= (((byte_val >> bs) & mask) as u64) << out_shift;
214        remaining_bits -= can_read;
215        out_shift += can_read;
216        bo += 1;
217        bs = 0;
218    }
219
220    result
221}
222
223// ── TOC entry ─────────────────────────────────────────────────────────────
224
225/// On-disk TOC entry for a fast-field column (FST2 format).
226///
227/// Wire: field_id(4) + column_type(1) + flags(1) + data_offset(8) + data_len(8) +
228///       num_docs(4) + dict_offset(8) + dict_count(4) = 38 bytes
229///
230/// The `flags` byte encodes:
231///   bit 0: multi-valued column (offset+value sub-columns)
232///
233/// For multi-valued columns, the data region contains:
234///   [offset column (auto-codec)] [value column (auto-codec)]
235///   with a 4-byte length prefix for the offset column so the reader knows where
236///   the value column starts.
237#[derive(Debug, Clone)]
238pub struct FastFieldTocEntry {
239    pub field_id: u32,
240    pub column_type: FastFieldColumnType,
241    pub multi: bool,
242    pub data_offset: u64,
243    pub data_len: u64,
244    pub num_docs: u32,
245    /// Byte offset of the text dictionary section (0 for numeric columns).
246    pub dict_offset: u64,
247    /// Number of entries in the text dictionary (0 for numeric columns).
248    pub dict_count: u32,
249}
250
251/// FST2 TOC entry size: field_id(4)+column_type(1)+flags(1)+data_offset(8)+data_len(8)+num_docs(4)+dict_offset(8)+dict_count(4) = 38
252pub const FAST_FIELD_TOC_ENTRY_SIZE: usize = 4 + 1 + 1 + 8 + 8 + 4 + 8 + 4; // 38
253
254// ── Block index entry ─────────────────────────────────────────────────────
255
256/// On-disk index entry for one block within a blocked column.
257///
258/// Wire: num_docs(4) + data_len(4) + dict_count(4) + dict_len(4) = 16 bytes
259#[derive(Debug, Clone)]
260pub struct BlockIndexEntry {
261    pub num_docs: u32,
262    pub data_len: u32,
263    pub dict_count: u32,
264    pub dict_len: u32,
265}
266
267pub const BLOCK_INDEX_ENTRY_SIZE: usize = 16;
268
269impl BlockIndexEntry {
270    pub fn write_to(&self, w: &mut dyn Write) -> io::Result<()> {
271        w.write_u32::<LittleEndian>(self.num_docs)?;
272        w.write_u32::<LittleEndian>(self.data_len)?;
273        w.write_u32::<LittleEndian>(self.dict_count)?;
274        w.write_u32::<LittleEndian>(self.dict_len)?;
275        Ok(())
276    }
277
278    pub fn read_from(r: &mut dyn Read) -> io::Result<Self> {
279        let num_docs = r.read_u32::<LittleEndian>()?;
280        let data_len = r.read_u32::<LittleEndian>()?;
281        let dict_count = r.read_u32::<LittleEndian>()?;
282        let dict_len = r.read_u32::<LittleEndian>()?;
283        Ok(Self {
284            num_docs,
285            data_len,
286            dict_count,
287            dict_len,
288        })
289    }
290}
291
292impl FastFieldTocEntry {
293    pub fn write_to(&self, w: &mut dyn Write) -> io::Result<()> {
294        w.write_u32::<LittleEndian>(self.field_id)?;
295        w.write_u8(self.column_type as u8)?;
296        let flags: u8 = if self.multi { 1 } else { 0 };
297        w.write_u8(flags)?;
298        w.write_u64::<LittleEndian>(self.data_offset)?;
299        w.write_u64::<LittleEndian>(self.data_len)?;
300        w.write_u32::<LittleEndian>(self.num_docs)?;
301        w.write_u64::<LittleEndian>(self.dict_offset)?;
302        w.write_u32::<LittleEndian>(self.dict_count)?;
303        Ok(())
304    }
305
306    pub fn read_from(r: &mut dyn Read) -> io::Result<Self> {
307        let field_id = r.read_u32::<LittleEndian>()?;
308        let ct = r.read_u8()?;
309        let column_type = FastFieldColumnType::from_u8(ct)
310            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "bad column type"))?;
311        let flags = r.read_u8()?;
312        let multi = (flags & 1) != 0;
313        let data_offset = r.read_u64::<LittleEndian>()?;
314        let data_len = r.read_u64::<LittleEndian>()?;
315        let num_docs = r.read_u32::<LittleEndian>()?;
316        let dict_offset = r.read_u64::<LittleEndian>()?;
317        let dict_count = r.read_u32::<LittleEndian>()?;
318        Ok(Self {
319            field_id,
320            column_type,
321            multi,
322            data_offset,
323            data_len,
324            num_docs,
325            dict_offset,
326            dict_count,
327        })
328    }
329}
330
331// ── Writer ────────────────────────────────────────────────────────────────
332
333/// Collects values during indexing and serializes a single fast-field column.
334///
335/// Supports both single-valued and multi-valued columns.
336/// For multi-valued columns, values are stored in a flat array with an
337/// offset column that maps doc_id → value range.
338pub struct FastFieldWriter {
339    pub column_type: FastFieldColumnType,
340    /// Whether this is a multi-valued column.
341    pub multi: bool,
342
343    // ── Single-valued state ──
344    /// Raw u64 values indexed by local doc_id (single-value mode).
345    values: Vec<u64>,
346
347    // ── Multi-valued state ──
348    /// Flat list of all values (multi-value mode).
349    multi_values: Vec<u64>,
350    /// Per-doc cumulative offset into `multi_values`. Length = num_docs + 1.
351    /// offsets[doc_id]..offsets[doc_id+1] is the value range for doc_id.
352    multi_offsets: Vec<u32>,
353    /// Current doc_id being filled (for multi-value sequential writes).
354    multi_current_doc: u32,
355
356    // ── Text state (shared) ──
357    /// For TextOrdinal: maps original string → insertion order.
358    text_values: Option<BTreeMap<String, u32>>,
359    /// For TextOrdinal single-value: per-doc string values (parallel to `values`).
360    text_per_doc: Option<Vec<Option<String>>>,
361    /// For TextOrdinal multi-value: per-value strings (parallel to `multi_values`).
362    text_multi_values: Option<Vec<String>>,
363}
364
365impl FastFieldWriter {
366    /// Create a writer for a single-valued numeric column (u64/i64/f64).
367    pub fn new_numeric(column_type: FastFieldColumnType) -> Self {
368        debug_assert!(matches!(
369            column_type,
370            FastFieldColumnType::U64 | FastFieldColumnType::I64 | FastFieldColumnType::F64
371        ));
372        Self {
373            column_type,
374            multi: false,
375            values: Vec::new(),
376            multi_values: Vec::new(),
377            multi_offsets: vec![0],
378            multi_current_doc: 0,
379            text_values: None,
380            text_per_doc: None,
381            text_multi_values: None,
382        }
383    }
384
385    /// Create a writer for a multi-valued numeric column.
386    pub fn new_numeric_multi(column_type: FastFieldColumnType) -> Self {
387        debug_assert!(matches!(
388            column_type,
389            FastFieldColumnType::U64 | FastFieldColumnType::I64 | FastFieldColumnType::F64
390        ));
391        Self {
392            column_type,
393            multi: true,
394            values: Vec::new(),
395            multi_values: Vec::new(),
396            multi_offsets: vec![0],
397            multi_current_doc: 0,
398            text_values: None,
399            text_per_doc: None,
400            text_multi_values: None,
401        }
402    }
403
404    /// Create a writer for a single-valued text ordinal column.
405    pub fn new_text() -> Self {
406        Self {
407            column_type: FastFieldColumnType::TextOrdinal,
408            multi: false,
409            values: Vec::new(),
410            multi_values: Vec::new(),
411            multi_offsets: vec![0],
412            multi_current_doc: 0,
413            text_values: Some(BTreeMap::new()),
414            text_per_doc: Some(Vec::new()),
415            text_multi_values: None,
416        }
417    }
418
419    /// Create a writer for a multi-valued text ordinal column.
420    pub fn new_text_multi() -> Self {
421        Self {
422            column_type: FastFieldColumnType::TextOrdinal,
423            multi: true,
424            values: Vec::new(),
425            multi_values: Vec::new(),
426            multi_offsets: vec![0],
427            multi_current_doc: 0,
428            text_values: Some(BTreeMap::new()),
429            text_per_doc: None,
430            text_multi_values: Some(Vec::new()),
431        }
432    }
433
434    /// Record a numeric value for `doc_id`. Fills gaps with 0.
435    /// For single-value mode only.
436    pub fn add_u64(&mut self, doc_id: u32, value: u64) {
437        if self.multi {
438            self.add_multi_u64(doc_id, value);
439            return;
440        }
441        let idx = doc_id as usize;
442        if idx >= self.values.len() {
443            self.values.resize(idx + 1, FAST_FIELD_MISSING);
444            if let Some(ref mut tpd) = self.text_per_doc {
445                tpd.resize(idx + 1, None);
446            }
447        }
448        self.values[idx] = value;
449    }
450
451    /// Record a value in multi-value mode.
452    fn add_multi_u64(&mut self, doc_id: u32, value: u64) {
453        // Pad offsets for any skipped doc_ids
454        while self.multi_current_doc < doc_id {
455            self.multi_current_doc += 1;
456            self.multi_offsets.push(self.multi_values.len() as u32);
457        }
458        // Ensure offset exists for current doc
459        if self.multi_current_doc == doc_id && self.multi_offsets.len() == doc_id as usize + 1 {
460            // offset for doc_id already exists as the last entry
461        }
462        self.multi_values.push(value);
463    }
464
465    /// Record an i64 value (zigzag-encoded).
466    pub fn add_i64(&mut self, doc_id: u32, value: i64) {
467        self.add_u64(doc_id, zigzag_encode(value));
468    }
469
470    /// Record an f64 value (sortable-encoded).
471    pub fn add_f64(&mut self, doc_id: u32, value: f64) {
472        self.add_u64(doc_id, f64_to_sortable_u64(value));
473    }
474
475    /// Record a text value (dictionary-encoded at build time).
476    pub fn add_text(&mut self, doc_id: u32, value: &str) {
477        if let Some(ref mut dict) = self.text_values {
478            let next_id = dict.len() as u32;
479            dict.entry(value.to_string()).or_insert(next_id);
480        }
481
482        if self.multi {
483            if let Some(ref mut tmv) = self.text_multi_values {
484                // Pad offsets for skipped docs
485                while self.multi_current_doc < doc_id {
486                    self.multi_current_doc += 1;
487                    self.multi_offsets.push(self.multi_values.len() as u32);
488                }
489                if self.multi_current_doc == doc_id
490                    && self.multi_offsets.len() == doc_id as usize + 1
491                {
492                    // offset already exists
493                }
494                self.multi_values.push(0); // placeholder, resolved later
495                tmv.push(value.to_string());
496            }
497        } else {
498            let idx = doc_id as usize;
499            if idx >= self.values.len() {
500                self.values.resize(idx + 1, FAST_FIELD_MISSING);
501            }
502            if let Some(ref mut tpd) = self.text_per_doc {
503                if idx >= tpd.len() {
504                    tpd.resize(idx + 1, None);
505                }
506                tpd[idx] = Some(value.to_string());
507            }
508        }
509    }
510
511    /// Ensure the column covers `num_docs` entries.
512    ///
513    /// Absent entries are filled with [`FAST_FIELD_MISSING`] for single-value
514    /// columns, or with empty offset ranges for multi-value columns.
515    pub fn pad_to(&mut self, num_docs: u32) {
516        let n = num_docs as usize;
517        if self.multi {
518            while (self.multi_offsets.len() as u32) <= num_docs {
519                self.multi_offsets.push(self.multi_values.len() as u32);
520            }
521            self.multi_current_doc = num_docs;
522        } else {
523            if self.values.len() < n {
524                self.values.resize(n, FAST_FIELD_MISSING);
525                if let Some(ref mut tpd) = self.text_per_doc {
526                    tpd.resize(n, None);
527                }
528            }
529        }
530    }
531
532    /// Number of documents in this column.
533    pub fn num_docs(&self) -> u32 {
534        if self.multi {
535            // offsets has num_docs+1 entries
536            (self.multi_offsets.len() as u32).saturating_sub(1)
537        } else {
538            self.values.len() as u32
539        }
540    }
541
542    /// Serialize column data using blocked format with auto-selecting codec.
543    ///
544    /// Writes a single block: [num_blocks(4)] [BlockIndexEntry] [block_data] [block_dict?]
545    /// Returns `(toc_entry, total_bytes_written)`.
546    pub fn serialize(
547        &mut self,
548        writer: &mut dyn Write,
549        data_offset: u64,
550    ) -> io::Result<(FastFieldTocEntry, u64)> {
551        // For text ordinal: resolve strings to sorted ordinals
552        if self.column_type == FastFieldColumnType::TextOrdinal {
553            self.resolve_text_ordinals();
554        }
555
556        let num_docs = self.num_docs();
557
558        // Serialize block data into a temp buffer to measure lengths
559        let mut block_data = Vec::new();
560        if self.multi {
561            // Multi-value: write [offset_col_len(4)] [offset_col] [value_col]
562            let offsets_u64: Vec<u64> = self.multi_offsets.iter().map(|&v| v as u64).collect();
563            let mut offset_buf = Vec::new();
564            codec::serialize_auto(&offsets_u64, &mut offset_buf)?;
565
566            block_data.write_u32::<LittleEndian>(offset_buf.len() as u32)?;
567            block_data.write_all(&offset_buf)?;
568
569            codec::serialize_auto(&self.multi_values, &mut block_data)?;
570        } else {
571            codec::serialize_auto(&self.values, &mut block_data)?;
572        }
573
574        // Serialize text dictionary into temp buffer
575        let mut dict_buf = Vec::new();
576        let dict_count = if self.column_type == FastFieldColumnType::TextOrdinal {
577            let (count, _) = self.write_text_dictionary(&mut dict_buf)?;
578            count
579        } else {
580            0u32
581        };
582
583        // Build block index entry
584        let block_entry = BlockIndexEntry {
585            num_docs,
586            data_len: block_data.len() as u32,
587            dict_count,
588            dict_len: dict_buf.len() as u32,
589        };
590
591        // Write: num_blocks + block_index + block_data + block_dict
592        let mut total_bytes = 0u64;
593
594        writer.write_u32::<LittleEndian>(1u32)?; // num_blocks
595        total_bytes += 4;
596
597        block_entry.write_to(writer)?;
598        total_bytes += BLOCK_INDEX_ENTRY_SIZE as u64;
599
600        writer.write_all(&block_data)?;
601        total_bytes += block_data.len() as u64;
602
603        writer.write_all(&dict_buf)?;
604        total_bytes += dict_buf.len() as u64;
605
606        let toc = FastFieldTocEntry {
607            field_id: 0, // set by caller
608            column_type: self.column_type,
609            multi: self.multi,
610            data_offset,
611            data_len: total_bytes,
612            num_docs,
613            dict_offset: 0, // no longer used at TOC level (per-block dicts)
614            dict_count: 0,
615        };
616
617        Ok((toc, total_bytes))
618    }
619
620    /// Resolve text per-doc values to sorted ordinals.
621    fn resolve_text_ordinals(&mut self) {
622        let dict = self.text_values.as_ref().expect("text_values required");
623
624        // Build sorted ordinal map: BTreeMap iterates in sorted order
625        let sorted_ordinals: BTreeMap<&str, u64> = dict
626            .keys()
627            .enumerate()
628            .map(|(ord, key)| (key.as_str(), ord as u64))
629            .collect();
630
631        if self.multi {
632            // Multi-value: resolve multi_values via text_multi_values
633            if let Some(ref tmv) = self.text_multi_values {
634                for (i, text) in tmv.iter().enumerate() {
635                    self.multi_values[i] = sorted_ordinals[text.as_str()];
636                }
637            }
638        } else {
639            // Single-value: resolve values via text_per_doc
640            let tpd = self.text_per_doc.as_ref().expect("text_per_doc required");
641            for (i, doc_text) in tpd.iter().enumerate() {
642                match doc_text {
643                    Some(text) => {
644                        self.values[i] = sorted_ordinals[text.as_str()];
645                    }
646                    None => {
647                        self.values[i] = FAST_FIELD_MISSING;
648                    }
649                }
650            }
651        }
652    }
653
654    /// Write len-prefixed sorted strings. Returns (dict_count, bytes_written).
655    fn write_text_dictionary(&self, writer: &mut dyn Write) -> io::Result<(u32, u64)> {
656        let dict = self.text_values.as_ref().expect("text_values required");
657        let mut bytes_written = 0u64;
658
659        // BTreeMap keys are already sorted
660        let count = dict.len() as u32;
661        for key in dict.keys() {
662            let key_bytes = key.as_bytes();
663            writer.write_u32::<LittleEndian>(key_bytes.len() as u32)?;
664            writer.write_all(key_bytes)?;
665            bytes_written += 4 + key_bytes.len() as u64;
666        }
667
668        Ok((count, bytes_written))
669    }
670}
671
672// ── Reader ────────────────────────────────────────────────────────────────
673
674use crate::directories::OwnedBytes;
675
676/// One independently-decodable block within a blocked column.
677///
678/// All byte slices are zero-copy borrows from the mmap'd `.fast` file.
679pub struct ColumnBlock {
680    /// Number of docs before this block (for doc_id → block lookup).
681    pub cumulative_docs: u32,
682    /// Number of docs in this block.
683    pub num_docs: u32,
684    /// Auto-codec encoded data for this block (single-value or raw multi-value region).
685    pub data: OwnedBytes,
686    /// For multi-value blocks: offset sub-column.
687    pub offset_data: OwnedBytes,
688    /// For multi-value blocks: value sub-column.
689    pub value_data: OwnedBytes,
690    /// Per-block text dictionary (text columns only).
691    pub dict: Option<TextDictReader>,
692    /// Block-local ordinal → global ordinal mapping (text columns with >1 block).
693    pub ordinal_map: Vec<u32>,
694    /// Raw dictionary bytes for this block (for merge: memcpy).
695    pub raw_dict: OwnedBytes,
696}
697
698/// Reads a single fast-field column from mmap/buffer with O(1) doc_id access.
699///
700/// A column is a sequence of independently-decodable blocks. Fresh segments
701/// have one block; merged segments may have multiple (one per source segment).
702///
703/// **Zero-copy**: all data is borrowed from the underlying mmap / `OwnedBytes`.
704pub struct FastFieldReader {
705    pub column_type: FastFieldColumnType,
706    pub num_docs: u32,
707    pub multi: bool,
708
709    /// Blocks in doc_id order.
710    blocks: Vec<ColumnBlock>,
711
712    /// Global merged text dictionary (for text_ordinal lookups and filters).
713    /// Built at open time by merging per-block dictionaries.
714    global_text_dict: Option<TextDictReader>,
715}
716
717impl FastFieldReader {
718    /// Open a blocked column from an `OwnedBytes` file buffer using a TOC entry.
719    pub fn open(file_data: &OwnedBytes, toc: &FastFieldTocEntry) -> io::Result<Self> {
720        let region_start = toc.data_offset as usize;
721        let region_end = region_start + toc.data_len as usize;
722
723        if region_end > file_data.len() {
724            return Err(io::Error::new(
725                io::ErrorKind::UnexpectedEof,
726                "fast field data out of bounds",
727            ));
728        }
729
730        let raw = file_data.as_slice();
731
732        // Read num_blocks
733        let mut pos = region_start;
734        if pos + 4 > region_end {
735            return Err(io::Error::new(
736                io::ErrorKind::UnexpectedEof,
737                "fast field: missing num_blocks",
738            ));
739        }
740        let num_blocks = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
741        pos += 4;
742
743        // Read block index
744        let idx_size = num_blocks as usize * BLOCK_INDEX_ENTRY_SIZE;
745        if pos + idx_size > region_end {
746            return Err(io::Error::new(
747                io::ErrorKind::UnexpectedEof,
748                "fast field: block index truncated",
749            ));
750        }
751        let mut block_entries = Vec::with_capacity(num_blocks as usize);
752        {
753            let mut cursor = std::io::Cursor::new(&raw[pos..pos + idx_size]);
754            for _ in 0..num_blocks {
755                block_entries.push(BlockIndexEntry::read_from(&mut cursor)?);
756            }
757        }
758        pos += idx_size;
759
760        let empty = OwnedBytes::new(Vec::new());
761
762        // Parse each block's data + dict slices
763        let mut blocks = Vec::with_capacity(num_blocks as usize);
764        let mut cumulative = 0u32;
765
766        for entry in &block_entries {
767            let data_start = pos;
768            let data_end = data_start + entry.data_len as usize;
769            let dict_start = data_end;
770            let dict_end = dict_start + entry.dict_len as usize;
771
772            if dict_end > file_data.len() {
773                return Err(io::Error::new(
774                    io::ErrorKind::UnexpectedEof,
775                    "fast field: block data/dict truncated",
776                ));
777            }
778
779            // Parse multi-value sub-columns from block data
780            let (block_data, offset_data, value_data) = if toc.multi {
781                let block_raw = &raw[data_start..data_end];
782                if block_raw.len() < 4 {
783                    (empty.clone(), empty.clone(), empty.clone())
784                } else {
785                    let offset_col_len =
786                        u32::from_le_bytes(block_raw[0..4].try_into().unwrap()) as usize;
787                    let o_start = data_start + 4;
788                    let o_end = o_start + offset_col_len;
789                    let v_start = o_end;
790                    let v_end = data_end;
791                    (
792                        file_data.slice(data_start..data_end),
793                        file_data.slice(o_start..o_end),
794                        file_data.slice(v_start..v_end),
795                    )
796                }
797            } else {
798                (
799                    file_data.slice(data_start..data_end),
800                    empty.clone(),
801                    empty.clone(),
802                )
803            };
804
805            // Parse block dict
806            let dict = if entry.dict_count > 0 {
807                Some(TextDictReader::open(
808                    file_data,
809                    dict_start,
810                    entry.dict_count,
811                )?)
812            } else {
813                None
814            };
815
816            let raw_dict = if entry.dict_len > 0 {
817                file_data.slice(dict_start..dict_end)
818            } else {
819                empty.clone()
820            };
821
822            blocks.push(ColumnBlock {
823                cumulative_docs: cumulative,
824                num_docs: entry.num_docs,
825                data: block_data,
826                offset_data,
827                value_data,
828                dict,
829                ordinal_map: Vec::new(),
830                raw_dict,
831            });
832
833            cumulative += entry.num_docs;
834            pos = dict_end;
835        }
836
837        // Build global text dictionary for multi-block text columns
838        let global_text_dict = if toc.column_type == FastFieldColumnType::TextOrdinal {
839            Some(Self::build_global_text_dict(&mut blocks)?)
840        } else {
841            None
842        };
843
844        Ok(Self {
845            column_type: toc.column_type,
846            num_docs: toc.num_docs,
847            multi: toc.multi,
848            blocks,
849            global_text_dict,
850        })
851    }
852
853    /// Build a global merged dictionary from all per-block dictionaries.
854    /// Also populates `ordinal_map` on each block (block-local → global ordinal).
855    fn build_global_text_dict(blocks: &mut [ColumnBlock]) -> io::Result<TextDictReader> {
856        use std::collections::BTreeSet;
857
858        // Collect all unique strings from all blocks
859        let mut all_strings = BTreeSet::new();
860        for block in blocks.iter() {
861            if let Some(ref dict) = block.dict {
862                for s in dict.iter() {
863                    all_strings.insert(s.to_string());
864                }
865            }
866        }
867
868        // Build global sorted list
869        let global_sorted: Vec<String> = all_strings.into_iter().collect();
870
871        // Build per-block ordinal maps: block-local ordinal → global ordinal
872        for block in blocks.iter_mut() {
873            if let Some(ref dict) = block.dict {
874                let mut map = Vec::with_capacity(dict.len() as usize);
875                for local_ord in 0..dict.len() {
876                    let text = dict.get(local_ord).ok_or_else(|| {
877                        io::Error::new(
878                            io::ErrorKind::InvalidData,
879                            format!(
880                                "block dict ordinal {} out of range (dict len {})",
881                                local_ord,
882                                dict.len()
883                            ),
884                        )
885                    })?;
886                    let global_ord = global_sorted
887                        .binary_search_by(|s| s.as_str().cmp(text))
888                        .map_err(|_| {
889                            io::Error::new(
890                                io::ErrorKind::InvalidData,
891                                format!(
892                                    "block dict entry {:?} not found in merged global dict",
893                                    text
894                                ),
895                            )
896                        })? as u32;
897                    map.push(global_ord);
898                }
899                block.ordinal_map = map;
900            }
901        }
902
903        // Serialize global dict into a buffer and create a TextDictReader
904        let mut dict_buf = Vec::new();
905        for s in &global_sorted {
906            let bytes = s.as_bytes();
907            dict_buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
908            dict_buf.extend_from_slice(bytes);
909        }
910        let dict_data = OwnedBytes::new(dict_buf);
911        TextDictReader::open(&dict_data, 0, global_sorted.len() as u32)
912    }
913
914    /// Remap a block-local raw ordinal to a global ordinal using the block's ordinal_map.
915    /// Returns raw unchanged for non-text columns, missing ordinals, or if no map exists.
916    #[inline]
917    fn remap_ordinal(&self, block: &ColumnBlock, raw: u64) -> u64 {
918        if self.column_type == FastFieldColumnType::TextOrdinal
919            && raw != FAST_FIELD_MISSING
920            && !block.ordinal_map.is_empty()
921        {
922            let idx = raw as usize;
923            if idx < block.ordinal_map.len() {
924                block.ordinal_map[idx] as u64
925            } else {
926                // Corrupt ordinal — treat as missing
927                FAST_FIELD_MISSING
928            }
929        } else {
930            raw
931        }
932    }
933
934    /// Find the block containing `doc_id`. Returns (block_index, local_doc_id).
935    #[inline]
936    fn find_block(&self, doc_id: u32) -> (usize, u32) {
937        debug_assert!(!self.blocks.is_empty());
938        // Single block fast path (common: fresh segments)
939        if self.blocks.len() == 1 {
940            return (0, doc_id);
941        }
942        // Binary search: find the last block whose cumulative_docs <= doc_id
943        let bi = self
944            .blocks
945            .partition_point(|b| b.cumulative_docs <= doc_id)
946            .saturating_sub(1);
947        (bi, doc_id - self.blocks[bi].cumulative_docs)
948    }
949
950    /// Get raw u64 value for a doc_id.
951    ///
952    /// Returns [`FAST_FIELD_MISSING`] for out-of-range doc_ids **and** for docs
953    /// that were never assigned a value (absent docs).
954    ///
955    /// For text columns, returns the global ordinal (remapped from block-local).
956    /// For multi-valued columns, returns the first value (or `FAST_FIELD_MISSING` if empty).
957    #[inline]
958    pub fn get_u64(&self, doc_id: u32) -> u64 {
959        if doc_id >= self.num_docs {
960            return FAST_FIELD_MISSING;
961        }
962        let (bi, local) = self.find_block(doc_id);
963        let block = &self.blocks[bi];
964
965        if self.multi {
966            let start = codec::auto_read(block.offset_data.as_slice(), local as usize) as u32;
967            let end = codec::auto_read(block.offset_data.as_slice(), local as usize + 1) as u32;
968            if start >= end {
969                return FAST_FIELD_MISSING;
970            }
971            let raw = codec::auto_read(block.value_data.as_slice(), start as usize);
972            return self.remap_ordinal(block, raw);
973        }
974
975        let raw = codec::auto_read(block.data.as_slice(), local as usize);
976        self.remap_ordinal(block, raw)
977    }
978
979    /// Get the value range for a multi-valued column within its block.
980    /// Returns (block_index, start_index, end_index) into the block's flat value array.
981    #[inline]
982    fn block_value_range(&self, doc_id: u32) -> (usize, u32, u32) {
983        if !self.multi || doc_id >= self.num_docs {
984            return (0, 0, 0);
985        }
986        let (bi, local) = self.find_block(doc_id);
987        let block = &self.blocks[bi];
988        let start = codec::auto_read(block.offset_data.as_slice(), local as usize) as u32;
989        let end = codec::auto_read(block.offset_data.as_slice(), local as usize + 1) as u32;
990        (bi, start, end)
991    }
992
993    /// Get the value range for a multi-valued column.
994    /// Returns (start_index, end_index) — for single-block columns these are
995    /// direct indices; for multi-block, use `get_multi_values` instead.
996    #[inline]
997    pub fn value_range(&self, doc_id: u32) -> (u32, u32) {
998        let (_, start, end) = self.block_value_range(doc_id);
999        (start, end)
1000    }
1001
1002    /// Get a specific value from the flat value array (multi-value mode).
1003    /// For single-block columns only. For multi-block, use `get_multi_values`.
1004    #[inline]
1005    pub fn get_value_at(&self, index: u32) -> u64 {
1006        // For single-block (common case), delegate directly
1007        if self.blocks.len() == 1 {
1008            let raw = codec::auto_read(self.blocks[0].value_data.as_slice(), index as usize);
1009            return self.remap_ordinal(&self.blocks[0], raw);
1010        }
1011        // Multi-block fallback — index is block-local, caller should use get_multi_values
1012        0
1013    }
1014
1015    /// Get all values for a multi-valued doc_id. Handles multi-block correctly.
1016    pub fn get_multi_values(&self, doc_id: u32) -> Vec<u64> {
1017        let (bi, start, end) = self.block_value_range(doc_id);
1018        if start >= end {
1019            return Vec::new();
1020        }
1021        let block = &self.blocks[bi];
1022        (start..end)
1023            .map(|idx| {
1024                let raw = codec::auto_read(block.value_data.as_slice(), idx as usize);
1025                self.remap_ordinal(block, raw)
1026            })
1027            .collect()
1028    }
1029
1030    /// Iterate multi-values for a doc, calling `f` for each. Returns true if `f` ever returns true (short-circuit).
1031    /// Handles multi-block columns correctly by finding the right block.
1032    #[inline]
1033    pub fn for_each_multi_value(&self, doc_id: u32, mut f: impl FnMut(u64) -> bool) -> bool {
1034        let (bi, start, end) = self.block_value_range(doc_id);
1035        if start >= end {
1036            return false;
1037        }
1038        let block = &self.blocks[bi];
1039        for idx in start..end {
1040            let raw = codec::auto_read(block.value_data.as_slice(), idx as usize);
1041            if f(self.remap_ordinal(block, raw)) {
1042                return true;
1043            }
1044        }
1045        false
1046    }
1047
1048    /// Batch-scan all values in a single-value column, calling `f(doc_id, raw_value)` for each.
1049    ///
1050    /// Uses `auto_read_batch` internally (one codec dispatch per block, not per value),
1051    /// enabling compiler auto-vectorization for byte-aligned bitpacked columns.
1052    /// For text columns, returned values are global ordinals (remapped).
1053    /// For multi-value columns, use `for_each_multi_value` instead.
1054    pub fn scan_single_values(&self, mut f: impl FnMut(u32, u64)) {
1055        if self.multi {
1056            return;
1057        }
1058        const BATCH: usize = 256;
1059        let mut buf = [0u64; BATCH];
1060        let needs_remap = self.column_type == FastFieldColumnType::TextOrdinal;
1061
1062        for block in &self.blocks {
1063            let n = block.num_docs as usize;
1064            let mut pos = 0;
1065            while pos < n {
1066                let chunk = (n - pos).min(BATCH);
1067                codec::auto_read_batch(block.data.as_slice(), pos, &mut buf[..chunk]);
1068
1069                if needs_remap && !block.ordinal_map.is_empty() {
1070                    for (i, &raw) in buf[..chunk].iter().enumerate() {
1071                        let val = if raw != FAST_FIELD_MISSING {
1072                            let idx = raw as usize;
1073                            if idx < block.ordinal_map.len() {
1074                                block.ordinal_map[idx] as u64
1075                            } else {
1076                                FAST_FIELD_MISSING
1077                            }
1078                        } else {
1079                            raw
1080                        };
1081                        f(block.cumulative_docs + pos as u32 + i as u32, val);
1082                    }
1083                } else {
1084                    for (i, &val) in buf[..chunk].iter().enumerate() {
1085                        f(block.cumulative_docs + pos as u32 + i as u32, val);
1086                    }
1087                }
1088                pos += chunk;
1089            }
1090        }
1091    }
1092
1093    /// Check if this doc has a value (not [`FAST_FIELD_MISSING`]).
1094    ///
1095    /// For single-value columns, checks the raw sentinel.
1096    /// For multi-value columns, checks if the offset range is non-empty.
1097    #[inline]
1098    pub fn has_value(&self, doc_id: u32) -> bool {
1099        if !self.multi {
1100            return doc_id < self.num_docs && self.get_u64(doc_id) != FAST_FIELD_MISSING;
1101        }
1102        let (_, start, end) = self.block_value_range(doc_id);
1103        start < end
1104    }
1105
1106    /// Get decoded i64 value (zigzag-decoded).
1107    ///
1108    /// Returns `i64::MIN` for absent docs (zigzag_decode of `FAST_FIELD_MISSING`).
1109    /// Use [`has_value`](Self::has_value) to distinguish absent from real values.
1110    #[inline]
1111    pub fn get_i64(&self, doc_id: u32) -> i64 {
1112        zigzag_decode(self.get_u64(doc_id))
1113    }
1114
1115    /// Get decoded f64 value (sortable-decoded).
1116    ///
1117    /// Returns `NaN` for absent docs (`sortable_u64_to_f64(FAST_FIELD_MISSING)`).
1118    /// Use [`has_value`](Self::has_value) to distinguish absent from real values.
1119    #[inline]
1120    pub fn get_f64(&self, doc_id: u32) -> f64 {
1121        sortable_u64_to_f64(self.get_u64(doc_id))
1122    }
1123
1124    /// Get the text ordinal for a doc_id. Returns FAST_FIELD_MISSING if missing.
1125    #[inline]
1126    pub fn get_ordinal(&self, doc_id: u32) -> u64 {
1127        self.get_u64(doc_id)
1128    }
1129
1130    /// Get the text string for a doc_id (looks up ordinal in block-local dictionary).
1131    /// Returns None if the doc has no value or ordinal is missing.
1132    pub fn get_text(&self, doc_id: u32) -> Option<&str> {
1133        if doc_id >= self.num_docs {
1134            return None;
1135        }
1136        let (bi, local) = self.find_block(doc_id);
1137        let block = &self.blocks[bi];
1138        let raw_ordinal = if self.multi {
1139            let start = codec::auto_read(block.offset_data.as_slice(), local as usize) as u32;
1140            let end = codec::auto_read(block.offset_data.as_slice(), local as usize + 1) as u32;
1141            if start >= end {
1142                return None;
1143            }
1144            codec::auto_read(block.value_data.as_slice(), start as usize)
1145        } else {
1146            codec::auto_read(block.data.as_slice(), local as usize)
1147        };
1148        if raw_ordinal == FAST_FIELD_MISSING {
1149            return None;
1150        }
1151        block.dict.as_ref().and_then(|d| d.get(raw_ordinal as u32))
1152    }
1153
1154    /// Look up text string → global ordinal. Returns None if not found.
1155    pub fn text_ordinal(&self, text: &str) -> Option<u64> {
1156        self.global_text_dict.as_ref().and_then(|d| d.ordinal(text))
1157    }
1158
1159    /// Access the global text dictionary reader (if this is a text column).
1160    pub fn text_dict(&self) -> Option<&TextDictReader> {
1161        self.global_text_dict.as_ref()
1162    }
1163
1164    /// Number of blocks in this column.
1165    pub fn num_blocks(&self) -> usize {
1166        self.blocks.len()
1167    }
1168
1169    /// Access blocks for raw stacking during merge.
1170    pub fn blocks(&self) -> &[ColumnBlock] {
1171        &self.blocks
1172    }
1173}
1174
1175// ── Text dictionary ───────────────────────────────────────────────────────
1176
1177/// Sorted dictionary for text ordinal columns.
1178///
1179/// **Zero-copy**: the dictionary data is a shared slice of the `.fast` file.
1180/// An offset table (one u32 per entry) is built at open time so that
1181/// individual strings can be accessed without scanning.
1182pub struct TextDictReader {
1183    /// The raw dictionary bytes from the `.fast` file (zero-copy).
1184    data: OwnedBytes,
1185    /// Per-entry (offset, len) pairs into `data` — built at open time.
1186    offsets: Vec<(u32, u32)>,
1187}
1188
1189impl TextDictReader {
1190    /// Open a zero-copy text dictionary from `file_data` starting at `dict_start`.
1191    pub fn open(file_data: &OwnedBytes, dict_start: usize, count: u32) -> io::Result<Self> {
1192        // First pass: scan len-prefixed entries to build offset table
1193        let dict_slice = file_data.as_slice();
1194        let mut pos = dict_start;
1195        let mut offsets = Vec::with_capacity(count as usize);
1196
1197        for _ in 0..count {
1198            if pos + 4 > dict_slice.len() {
1199                return Err(io::Error::new(
1200                    io::ErrorKind::UnexpectedEof,
1201                    "text dict truncated",
1202                ));
1203            }
1204            let len = u32::from_le_bytes([
1205                dict_slice[pos],
1206                dict_slice[pos + 1],
1207                dict_slice[pos + 2],
1208                dict_slice[pos + 3],
1209            ]) as usize;
1210            pos += 4;
1211            if pos + len > dict_slice.len() {
1212                return Err(io::Error::new(
1213                    io::ErrorKind::UnexpectedEof,
1214                    "text dict entry truncated",
1215                ));
1216            }
1217            // Validate UTF-8 eagerly
1218            std::str::from_utf8(&dict_slice[pos..pos + len]).map_err(|e| {
1219                io::Error::new(io::ErrorKind::InvalidData, format!("invalid utf8: {}", e))
1220            })?;
1221            offsets.push((pos as u32, len as u32));
1222            pos += len;
1223        }
1224
1225        // Slice the full dict region (dict_start..pos) zero-copy
1226        let data = file_data.slice(dict_start..pos);
1227
1228        // Adjust offsets to be relative to `data` (subtract dict_start)
1229        for entry in offsets.iter_mut() {
1230            entry.0 -= dict_start as u32;
1231        }
1232
1233        Ok(Self { data, offsets })
1234    }
1235
1236    /// Get string by ordinal — zero-copy borrow from the underlying file data.
1237    pub fn get(&self, ordinal: u32) -> Option<&str> {
1238        let &(off, len) = self.offsets.get(ordinal as usize)?;
1239        let slice = &self.data.as_slice()[off as usize..off as usize + len as usize];
1240        // Safety: we validated UTF-8 in open()
1241        Some(unsafe { std::str::from_utf8_unchecked(slice) })
1242    }
1243
1244    /// Binary search for a string → ordinal.
1245    pub fn ordinal(&self, text: &str) -> Option<u64> {
1246        self.offsets
1247            .binary_search_by(|&(off, len)| {
1248                let slice = &self.data.as_slice()[off as usize..off as usize + len as usize];
1249                // Safety: validated UTF-8 in open()
1250                let entry = unsafe { std::str::from_utf8_unchecked(slice) };
1251                entry.cmp(text)
1252            })
1253            .ok()
1254            .map(|i| i as u64)
1255    }
1256
1257    /// Number of entries in the dictionary.
1258    pub fn len(&self) -> u32 {
1259        self.offsets.len() as u32
1260    }
1261
1262    /// Whether the dictionary is empty.
1263    pub fn is_empty(&self) -> bool {
1264        self.offsets.is_empty()
1265    }
1266
1267    /// Iterate all entries.
1268    pub fn iter(&self) -> impl Iterator<Item = &str> {
1269        self.offsets.iter().map(|&(off, len)| {
1270            let slice = &self.data.as_slice()[off as usize..off as usize + len as usize];
1271            unsafe { std::str::from_utf8_unchecked(slice) }
1272        })
1273    }
1274}
1275
1276// ── File-level write/read ─────────────────────────────────────────────────
1277
1278/// Write fast-field TOC + footer.
1279pub fn write_fast_field_toc_and_footer(
1280    writer: &mut dyn Write,
1281    toc_offset: u64,
1282    entries: &[FastFieldTocEntry],
1283) -> io::Result<()> {
1284    for e in entries {
1285        e.write_to(writer)?;
1286    }
1287    writer.write_u64::<LittleEndian>(toc_offset)?;
1288    writer.write_u32::<LittleEndian>(entries.len() as u32)?;
1289    writer.write_u32::<LittleEndian>(FAST_FIELD_MAGIC)?;
1290    Ok(())
1291}
1292
1293/// Read fast-field footer from the last 16 bytes.
1294/// Returns (toc_offset, num_columns).
1295pub fn read_fast_field_footer(file_data: &[u8]) -> io::Result<(u64, u32)> {
1296    let len = file_data.len();
1297    if len < FAST_FIELD_FOOTER_SIZE as usize {
1298        return Err(io::Error::new(
1299            io::ErrorKind::UnexpectedEof,
1300            "fast field file too small for footer",
1301        ));
1302    }
1303    let footer = &file_data[len - FAST_FIELD_FOOTER_SIZE as usize..];
1304    let mut cursor = std::io::Cursor::new(footer);
1305    let toc_offset = cursor.read_u64::<LittleEndian>()?;
1306    let num_columns = cursor.read_u32::<LittleEndian>()?;
1307    let magic = cursor.read_u32::<LittleEndian>()?;
1308    if magic != FAST_FIELD_MAGIC {
1309        return Err(io::Error::new(
1310            io::ErrorKind::InvalidData,
1311            format!("bad fast field magic: 0x{:08x}", magic),
1312        ));
1313    }
1314    Ok((toc_offset, num_columns))
1315}
1316
1317/// Read all TOC entries from file data (FST2 format).
1318pub fn read_fast_field_toc(
1319    file_data: &[u8],
1320    toc_offset: u64,
1321    num_columns: u32,
1322) -> io::Result<Vec<FastFieldTocEntry>> {
1323    let start = toc_offset as usize;
1324    let expected = num_columns as usize * FAST_FIELD_TOC_ENTRY_SIZE;
1325    if start + expected > file_data.len() {
1326        return Err(io::Error::new(
1327            io::ErrorKind::UnexpectedEof,
1328            "fast field TOC out of bounds",
1329        ));
1330    }
1331    let mut cursor = std::io::Cursor::new(&file_data[start..start + expected]);
1332    let mut entries = Vec::with_capacity(num_columns as usize);
1333    for _ in 0..num_columns {
1334        entries.push(FastFieldTocEntry::read_from(&mut cursor)?);
1335    }
1336    Ok(entries)
1337}
1338
1339// ── Tests ─────────────────────────────────────────────────────────────────
1340
1341#[cfg(test)]
1342mod tests {
1343    use super::*;
1344
1345    #[test]
1346    fn test_zigzag_roundtrip() {
1347        for v in [0i64, 1, -1, 42, -42, i64::MAX, i64::MIN] {
1348            assert_eq!(zigzag_decode(zigzag_encode(v)), v);
1349        }
1350    }
1351
1352    #[test]
1353    fn test_f64_sortable_roundtrip() {
1354        for v in [0.0f64, 1.0, -1.0, f64::MAX, f64::MIN, f64::MIN_POSITIVE] {
1355            assert_eq!(sortable_u64_to_f64(f64_to_sortable_u64(v)), v);
1356        }
1357    }
1358
1359    #[test]
1360    fn test_f64_sortable_order() {
1361        let values = [-100.0f64, -1.0, -0.0, 0.0, 0.5, 1.0, 100.0];
1362        let encoded: Vec<u64> = values.iter().map(|&v| f64_to_sortable_u64(v)).collect();
1363        for i in 1..encoded.len() {
1364            assert!(
1365                encoded[i] >= encoded[i - 1],
1366                "{} >= {} failed for {} vs {}",
1367                encoded[i],
1368                encoded[i - 1],
1369                values[i],
1370                values[i - 1]
1371            );
1372        }
1373    }
1374
1375    #[test]
1376    fn test_bitpack_roundtrip() {
1377        let values: Vec<u64> = vec![0, 3, 7, 15, 0, 1, 6, 12];
1378        let bpv = 4u8;
1379        let mut packed = Vec::new();
1380        bitpack_write(&values, bpv, &mut packed);
1381
1382        for (i, &expected) in values.iter().enumerate() {
1383            let got = bitpack_read(&packed, bpv, i);
1384            assert_eq!(got, expected, "index {}", i);
1385        }
1386    }
1387
1388    #[test]
1389    fn test_bitpack_high_bpv_regression() {
1390        // Regression: bpv > 56 with non-zero bit_shift used to read wrong bits
1391        // because the old 8-byte fast path didn't check bit_shift + bpv <= 64.
1392        for bpv in [57u8, 58, 59, 60, 63, 64] {
1393            let max_val = if bpv == 64 {
1394                u64::MAX
1395            } else {
1396                (1u64 << bpv) - 1
1397            };
1398            let values: Vec<u64> = (0..32)
1399                .map(|i: u64| {
1400                    if max_val == u64::MAX {
1401                        i * 7
1402                    } else {
1403                        (i * 7) % (max_val + 1)
1404                    }
1405                })
1406                .collect();
1407            let mut packed = Vec::new();
1408            bitpack_write(&values, bpv, &mut packed);
1409            for (i, &expected) in values.iter().enumerate() {
1410                let got = bitpack_read(&packed, bpv, i);
1411                assert_eq!(got, expected, "high bpv={} index={}", bpv, i);
1412            }
1413        }
1414    }
1415
1416    #[test]
1417    fn test_bitpack_various_widths() {
1418        for bpv in [1u8, 2, 3, 5, 7, 8, 13, 16, 32, 64] {
1419            let max_val = if bpv == 64 {
1420                u64::MAX
1421            } else {
1422                (1u64 << bpv) - 1
1423            };
1424            let values: Vec<u64> = (0..100)
1425                .map(|i: u64| {
1426                    if max_val == u64::MAX {
1427                        i
1428                    } else {
1429                        i % (max_val + 1)
1430                    }
1431                })
1432                .collect();
1433            let mut packed = Vec::new();
1434            bitpack_write(&values, bpv, &mut packed);
1435
1436            for (i, &expected) in values.iter().enumerate() {
1437                let got = bitpack_read(&packed, bpv, i);
1438                assert_eq!(got, expected, "bpv={} index={}", bpv, i);
1439            }
1440        }
1441    }
1442
1443    /// Helper: wrap a Vec<u8> in OwnedBytes for tests.
1444    fn owned(buf: Vec<u8>) -> OwnedBytes {
1445        OwnedBytes::new(buf)
1446    }
1447
1448    #[test]
1449    fn test_writer_reader_u64_roundtrip() {
1450        let mut writer = FastFieldWriter::new_numeric(FastFieldColumnType::U64);
1451        writer.add_u64(0, 100);
1452        writer.add_u64(1, 200);
1453        writer.add_u64(2, 150);
1454        writer.add_u64(4, 300); // gap at doc_id=3
1455        writer.pad_to(5);
1456
1457        let mut buf = Vec::new();
1458        let (mut toc, _bytes) = writer.serialize(&mut buf, 0).unwrap();
1459        toc.field_id = 42;
1460
1461        // Write TOC + footer
1462        let toc_offset = buf.len() as u64;
1463        write_fast_field_toc_and_footer(&mut buf, toc_offset, &[toc]).unwrap();
1464
1465        // Read back
1466        let ob = owned(buf);
1467        let (toc_off, num_cols) = read_fast_field_footer(&ob).unwrap();
1468        assert_eq!(num_cols, 1);
1469        let tocs = read_fast_field_toc(&ob, toc_off, num_cols).unwrap();
1470        assert_eq!(tocs.len(), 1);
1471        assert_eq!(tocs[0].field_id, 42);
1472
1473        let reader = FastFieldReader::open(&ob, &tocs[0]).unwrap();
1474        assert_eq!(reader.get_u64(0), 100);
1475        assert_eq!(reader.get_u64(1), 200);
1476        assert_eq!(reader.get_u64(2), 150);
1477        assert_eq!(reader.get_u64(3), FAST_FIELD_MISSING); // gap → absent sentinel
1478        assert_eq!(reader.get_u64(4), 300);
1479    }
1480
1481    #[test]
1482    fn test_writer_reader_i64_roundtrip() {
1483        let mut writer = FastFieldWriter::new_numeric(FastFieldColumnType::I64);
1484        writer.add_i64(0, -100);
1485        writer.add_i64(1, 50);
1486        writer.add_i64(2, 0);
1487        writer.pad_to(3);
1488
1489        let mut buf = Vec::new();
1490        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1491        let ob = owned(buf);
1492        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1493        assert_eq!(reader.get_i64(0), -100);
1494        assert_eq!(reader.get_i64(1), 50);
1495        assert_eq!(reader.get_i64(2), 0);
1496    }
1497
1498    #[test]
1499    fn test_writer_reader_f64_roundtrip() {
1500        let mut writer = FastFieldWriter::new_numeric(FastFieldColumnType::F64);
1501        writer.add_f64(0, -1.5);
1502        writer.add_f64(1, 3.15);
1503        writer.add_f64(2, 0.0);
1504        writer.pad_to(3);
1505
1506        let mut buf = Vec::new();
1507        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1508        let ob = owned(buf);
1509        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1510        assert_eq!(reader.get_f64(0), -1.5);
1511        assert_eq!(reader.get_f64(1), 3.15);
1512        assert_eq!(reader.get_f64(2), 0.0);
1513    }
1514
1515    #[test]
1516    fn test_writer_reader_text_roundtrip() {
1517        let mut writer = FastFieldWriter::new_text();
1518        writer.add_text(0, "banana");
1519        writer.add_text(1, "apple");
1520        writer.add_text(2, "cherry");
1521        writer.add_text(3, "apple"); // duplicate
1522        // doc_id=4 has no value
1523        writer.pad_to(5);
1524
1525        let mut buf = Vec::new();
1526        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1527        let ob = owned(buf);
1528        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1529
1530        // Dictionary is sorted: apple=0, banana=1, cherry=2
1531        assert_eq!(reader.get_text(0), Some("banana"));
1532        assert_eq!(reader.get_text(1), Some("apple"));
1533        assert_eq!(reader.get_text(2), Some("cherry"));
1534        assert_eq!(reader.get_text(3), Some("apple"));
1535        assert_eq!(reader.get_text(4), None); // missing
1536
1537        // Ordinal lookups
1538        assert_eq!(reader.text_ordinal("apple"), Some(0));
1539        assert_eq!(reader.text_ordinal("banana"), Some(1));
1540        assert_eq!(reader.text_ordinal("cherry"), Some(2));
1541        assert_eq!(reader.text_ordinal("durian"), None);
1542    }
1543
1544    #[test]
1545    fn test_constant_column() {
1546        let mut writer = FastFieldWriter::new_numeric(FastFieldColumnType::U64);
1547        for i in 0..100 {
1548            writer.add_u64(i, 42);
1549        }
1550
1551        let mut buf = Vec::new();
1552        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1553
1554        let ob = owned(buf);
1555        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1556        for i in 0..100 {
1557            assert_eq!(reader.get_u64(i), 42);
1558        }
1559    }
1560
1561    // ── Multi-value tests ──
1562
1563    #[test]
1564    fn test_multi_value_u64_roundtrip() {
1565        let mut writer = FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64);
1566        // doc 0: [10, 20, 30]
1567        writer.add_u64(0, 10);
1568        writer.add_u64(0, 20);
1569        writer.add_u64(0, 30);
1570        // doc 1: [] (empty)
1571        // doc 2: [100]
1572        writer.add_u64(2, 100);
1573        // doc 3: [5, 15]
1574        writer.add_u64(3, 5);
1575        writer.add_u64(3, 15);
1576        writer.pad_to(4);
1577
1578        let mut buf = Vec::new();
1579        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1580        assert!(toc.multi);
1581        assert_eq!(toc.num_docs, 4);
1582
1583        let ob = owned(buf);
1584        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1585        assert!(reader.multi);
1586
1587        // doc 0: first value
1588        assert_eq!(reader.get_u64(0), 10);
1589        let (s, e) = reader.value_range(0);
1590        assert_eq!(e - s, 3);
1591        assert_eq!(reader.get_value_at(s), 10);
1592        assert_eq!(reader.get_value_at(s + 1), 20);
1593        assert_eq!(reader.get_value_at(s + 2), 30);
1594
1595        // doc 1: empty → sentinel
1596        assert_eq!(reader.get_u64(1), FAST_FIELD_MISSING);
1597        let (s, e) = reader.value_range(1);
1598        assert_eq!(s, e);
1599        assert!(!reader.has_value(1));
1600
1601        // doc 2: [100]
1602        assert_eq!(reader.get_u64(2), 100);
1603        assert!(reader.has_value(2));
1604
1605        // doc 3: [5, 15]
1606        assert_eq!(reader.get_u64(3), 5);
1607        let (s, e) = reader.value_range(3);
1608        assert_eq!(e - s, 2);
1609        assert_eq!(reader.get_value_at(s), 5);
1610        assert_eq!(reader.get_value_at(s + 1), 15);
1611    }
1612
1613    #[test]
1614    fn test_multi_value_text_roundtrip() {
1615        let mut writer = FastFieldWriter::new_text_multi();
1616        // doc 0: ["banana", "apple"]
1617        writer.add_text(0, "banana");
1618        writer.add_text(0, "apple");
1619        // doc 1: ["cherry"]
1620        writer.add_text(1, "cherry");
1621        // doc 2: [] empty
1622        writer.pad_to(3);
1623
1624        let mut buf = Vec::new();
1625        let (toc, _) = writer.serialize(&mut buf, 0).unwrap();
1626        assert!(toc.multi);
1627
1628        let ob = owned(buf);
1629        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1630
1631        // doc 0: first value ordinal → banana is ordinal 1 (apple=0, banana=1, cherry=2)
1632        let (s, e) = reader.value_range(0);
1633        assert_eq!(e - s, 2);
1634        let ord0 = reader.get_value_at(s);
1635        let ord1 = reader.get_value_at(s + 1);
1636        assert_eq!(reader.text_dict().unwrap().get(ord0 as u32), Some("banana"));
1637        assert_eq!(reader.text_dict().unwrap().get(ord1 as u32), Some("apple"));
1638
1639        // doc 1: cherry
1640        let (s, e) = reader.value_range(1);
1641        assert_eq!(e - s, 1);
1642        let ord = reader.get_value_at(s);
1643        assert_eq!(reader.text_dict().unwrap().get(ord as u32), Some("cherry"));
1644
1645        // doc 2: empty
1646        assert!(!reader.has_value(2));
1647    }
1648
1649    #[test]
1650    fn test_multi_value_full_toc_roundtrip() {
1651        let mut writer = FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64);
1652        writer.add_u64(0, 1);
1653        writer.add_u64(0, 2);
1654        writer.add_u64(1, 3);
1655        writer.pad_to(2);
1656
1657        let mut buf = Vec::new();
1658        let (mut toc, _) = writer.serialize(&mut buf, 0).unwrap();
1659        toc.field_id = 7;
1660
1661        let toc_offset = buf.len() as u64;
1662        write_fast_field_toc_and_footer(&mut buf, toc_offset, &[toc]).unwrap();
1663
1664        let ob = owned(buf);
1665        let (toc_off, num_cols) = read_fast_field_footer(&ob).unwrap();
1666        let tocs = read_fast_field_toc(&ob, toc_off, num_cols).unwrap();
1667        assert_eq!(tocs[0].field_id, 7);
1668        assert!(tocs[0].multi);
1669
1670        let reader = FastFieldReader::open(&ob, &tocs[0]).unwrap();
1671        assert_eq!(reader.get_u64(0), 1);
1672        assert_eq!(reader.get_u64(1), 3);
1673    }
1674
1675    /// Helper: serialize a writer into a blocked column, return (block_data, block_dict, block_index_entry)
1676    /// by stripping the blocked header.
1677    fn serialize_single_block(writer: &mut FastFieldWriter) -> (Vec<u8>, Vec<u8>, BlockIndexEntry) {
1678        let mut buf = Vec::new();
1679        let (_toc, _) = writer.serialize(&mut buf, 0).unwrap();
1680        // Strip: [num_blocks(4)] [BlockIndexEntry(16)] [data...] [dict...]
1681        let mut cursor = std::io::Cursor::new(&buf[4..4 + BLOCK_INDEX_ENTRY_SIZE]);
1682        let entry = BlockIndexEntry::read_from(&mut cursor).unwrap();
1683        let data_start = 4 + BLOCK_INDEX_ENTRY_SIZE;
1684        let data_end = data_start + entry.data_len as usize;
1685        let dict_end = data_end + entry.dict_len as usize;
1686        let data = buf[data_start..data_end].to_vec();
1687        let dict = if dict_end > data_end {
1688            buf[data_end..dict_end].to_vec()
1689        } else {
1690            Vec::new()
1691        };
1692        (data, dict, entry)
1693    }
1694
1695    /// Manually assemble a multi-block column from individual block payloads.
1696    fn assemble_blocked_column(
1697        field_id: u32,
1698        column_type: FastFieldColumnType,
1699        multi: bool,
1700        blocks: &[(u32, &[u8], u32, &[u8])], // (num_docs, data, dict_count, dict)
1701    ) -> (Vec<u8>, FastFieldTocEntry) {
1702        use byteorder::{LittleEndian, WriteBytesExt};
1703
1704        let mut buf = Vec::new();
1705        let num_blocks = blocks.len() as u32;
1706
1707        // num_blocks
1708        buf.write_u32::<LittleEndian>(num_blocks).unwrap();
1709
1710        // block index
1711        for &(num_docs, data, dict_count, dict) in blocks {
1712            let entry = BlockIndexEntry {
1713                num_docs,
1714                data_len: data.len() as u32,
1715                dict_count,
1716                dict_len: dict.len() as u32,
1717            };
1718            entry.write_to(&mut buf).unwrap();
1719        }
1720
1721        // block data + dicts
1722        let mut total_docs = 0u32;
1723        for &(num_docs, data, _, dict) in blocks {
1724            buf.extend_from_slice(data);
1725            buf.extend_from_slice(dict);
1726            total_docs += num_docs;
1727        }
1728
1729        let data_len = buf.len() as u64;
1730
1731        // Write TOC + footer
1732        let toc = FastFieldTocEntry {
1733            field_id,
1734            column_type,
1735            multi,
1736            data_offset: 0,
1737            data_len,
1738            num_docs: total_docs,
1739            dict_offset: 0,
1740            dict_count: 0,
1741        };
1742
1743        let toc_offset = buf.len() as u64;
1744        write_fast_field_toc_and_footer(&mut buf, toc_offset, std::slice::from_ref(&toc)).unwrap();
1745
1746        (buf, toc)
1747    }
1748
1749    #[test]
1750    fn test_multi_block_numeric_roundtrip() {
1751        // Block A: 3 docs [10, 20, 30]
1752        let mut wa = FastFieldWriter::new_numeric(FastFieldColumnType::U64);
1753        wa.add_u64(0, 10);
1754        wa.add_u64(1, 20);
1755        wa.add_u64(2, 30);
1756        let (data_a, dict_a, entry_a) = serialize_single_block(&mut wa);
1757
1758        // Block B: 2 docs [40, 50]
1759        let mut wb = FastFieldWriter::new_numeric(FastFieldColumnType::U64);
1760        wb.add_u64(0, 40);
1761        wb.add_u64(1, 50);
1762        let (data_b, dict_b, entry_b) = serialize_single_block(&mut wb);
1763
1764        let (buf, toc) = assemble_blocked_column(
1765            1,
1766            FastFieldColumnType::U64,
1767            false,
1768            &[
1769                (entry_a.num_docs, &data_a, entry_a.dict_count, &dict_a),
1770                (entry_b.num_docs, &data_b, entry_b.dict_count, &dict_b),
1771            ],
1772        );
1773
1774        let ob = owned(buf);
1775        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1776
1777        assert_eq!(reader.num_docs, 5);
1778        assert_eq!(reader.num_blocks(), 2);
1779        assert_eq!(reader.get_u64(0), 10);
1780        assert_eq!(reader.get_u64(1), 20);
1781        assert_eq!(reader.get_u64(2), 30);
1782        assert_eq!(reader.get_u64(3), 40);
1783        assert_eq!(reader.get_u64(4), 50);
1784    }
1785
1786    #[test]
1787    fn test_multi_block_text_roundtrip() {
1788        // Block A: 2 docs ["alpha", "beta"]
1789        let mut wa = FastFieldWriter::new_text();
1790        wa.add_text(0, "alpha");
1791        wa.add_text(1, "beta");
1792        let (data_a, dict_a, entry_a) = serialize_single_block(&mut wa);
1793
1794        // Block B: 2 docs ["gamma", "alpha"]  (alpha shared with block A)
1795        let mut wb = FastFieldWriter::new_text();
1796        wb.add_text(0, "gamma");
1797        wb.add_text(1, "alpha");
1798        let (data_b, dict_b, entry_b) = serialize_single_block(&mut wb);
1799
1800        let (buf, toc) = assemble_blocked_column(
1801            2,
1802            FastFieldColumnType::TextOrdinal,
1803            false,
1804            &[
1805                (entry_a.num_docs, &data_a, entry_a.dict_count, &dict_a),
1806                (entry_b.num_docs, &data_b, entry_b.dict_count, &dict_b),
1807            ],
1808        );
1809
1810        let ob = owned(buf);
1811        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1812
1813        assert_eq!(reader.num_docs, 4);
1814        assert_eq!(reader.num_blocks(), 2);
1815
1816        // Global dict should be: alpha(0), beta(1), gamma(2)
1817        assert_eq!(reader.text_dict().unwrap().len(), 3);
1818
1819        // Block A: alpha=local0→global0, beta=local1→global1
1820        assert_eq!(reader.get_text(0), Some("alpha"));
1821        assert_eq!(reader.get_text(1), Some("beta"));
1822
1823        // Block B: gamma=local1→global2, alpha=local0→global0
1824        assert_eq!(reader.get_text(2), Some("gamma"));
1825        assert_eq!(reader.get_text(3), Some("alpha"));
1826
1827        // Global ordinal lookups
1828        assert_eq!(reader.text_ordinal("alpha"), Some(0));
1829        assert_eq!(reader.text_ordinal("beta"), Some(1));
1830        assert_eq!(reader.text_ordinal("gamma"), Some(2));
1831
1832        // get_u64 returns global ordinals
1833        assert_eq!(reader.get_u64(0), 0); // alpha
1834        assert_eq!(reader.get_u64(1), 1); // beta
1835        assert_eq!(reader.get_u64(2), 2); // gamma
1836        assert_eq!(reader.get_u64(3), 0); // alpha
1837    }
1838
1839    #[test]
1840    fn test_multi_block_multi_value_numeric() {
1841        // Block A: doc0=[1,2], doc1=[3]
1842        let mut wa = FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64);
1843        wa.add_u64(0, 1);
1844        wa.add_u64(0, 2);
1845        wa.add_u64(1, 3);
1846        wa.pad_to(2);
1847        let (data_a, dict_a, entry_a) = serialize_single_block(&mut wa);
1848
1849        // Block B: doc0=[4,5,6], doc1=[]
1850        let mut wb = FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64);
1851        wb.add_u64(0, 4);
1852        wb.add_u64(0, 5);
1853        wb.add_u64(0, 6);
1854        wb.pad_to(2);
1855        let (data_b, dict_b, entry_b) = serialize_single_block(&mut wb);
1856
1857        let (buf, toc) = assemble_blocked_column(
1858            3,
1859            FastFieldColumnType::U64,
1860            true,
1861            &[
1862                (entry_a.num_docs, &data_a, entry_a.dict_count, &dict_a),
1863                (entry_b.num_docs, &data_b, entry_b.dict_count, &dict_b),
1864            ],
1865        );
1866
1867        let ob = owned(buf);
1868        let reader = FastFieldReader::open(&ob, &toc).unwrap();
1869
1870        assert_eq!(reader.num_docs, 4);
1871        assert_eq!(reader.num_blocks(), 2);
1872
1873        // doc0 (block A): [1, 2]
1874        assert_eq!(reader.get_multi_values(0), vec![1, 2]);
1875        // doc1 (block A): [3]
1876        assert_eq!(reader.get_multi_values(1), vec![3]);
1877        // doc2 (block B, local 0): [4, 5, 6]
1878        assert_eq!(reader.get_multi_values(2), vec![4, 5, 6]);
1879        // doc3 (block B, local 1): []
1880        assert_eq!(reader.get_multi_values(3), Vec::<u64>::new());
1881    }
1882}