Skip to main content

luci/segment/
reader.rs

//! Segment reader: parse segment bytes, provide typed component accessors.
//!
//! Parses a segment produced by [`super::builder::SegmentBuilder`], validates
//! the segment header (magic and checksum), and provides accessors for term
//! lookups, posting list iteration, norms, and document store retrieval.
//!
//! See [[architecture-segment-layout]] and [[architecture-query-execution]].
8
use std::collections::HashMap;
use std::sync::PoisonError;
use std::sync::RwLock;

use crate::core::{FieldId, Result, SegmentId};

use crate::inverted::norms::FieldNormsReader;
use crate::inverted::postings::{
    BlockMaxPostingListReader, PositionPostingListReader, PostingListReader, has_block_max,
    has_positions,
};
use crate::inverted::term_dict::TermDict;
use crate::segment::format::{ComponentType, SegmentHeader};
use crate::spatial::geo::GeoPointStore;
use crate::store::doc_store::DocStoreReader;
23
24/// Per-field inverted index data slices.
25struct FieldIndex<'a> {
26    term_dict: TermDict<'a>,
27    postings_data: &'a [u8],
28    norms_data: Option<&'a [u8]>,
29}
30
31/// Reads a segment produced by [`super::builder::SegmentBuilder`].
32///
33/// Owns the segment data and provides typed accessors for all components.
34pub struct SegmentReader {
35    data: Vec<u8>,
36    header: SegmentHeader,
37    #[allow(dead_code)]
38    header_size: usize,
39    /// Cached geo point stores (deserialized lazily, with lat-sorted index).
40    geo_cache: RwLock<HashMap<FieldId, GeoPointStore>>,
41    /// Cached geo shape stores (deserialized lazily).
42    geo_shape_cache: RwLock<HashMap<FieldId, crate::spatial::shape::GeoShapeStore>>,
43}
44
45impl SegmentReader {
46    /// Open a segment from its raw bytes.
47    ///
48    /// Validates the header magic and checksum. Component checksums are
49    /// validated lazily on access.
50    pub fn open(data: Vec<u8>) -> Result<Self> {
51        let (header, header_size) = SegmentHeader::from_bytes(&data)?;
52        Ok(Self {
53            data,
54            header,
55            header_size,
56            geo_cache: RwLock::new(HashMap::new()),
57            geo_shape_cache: RwLock::new(HashMap::new()),
58        })
59    }
60
61    pub fn segment_id(&self) -> SegmentId {
62        self.header.segment_id
63    }
64
65    pub fn doc_count(&self) -> u32 {
66        self.header.doc_count
67    }
68
69    pub fn max_doc(&self) -> u32 {
70        self.header.max_doc
71    }
72
73    pub fn header(&self) -> &SegmentHeader {
74        &self.header
75    }
76
77    /// Get the document store reader.
78    pub fn doc_store(&self) -> DocStoreReader<'_> {
79        let comp = self
80            .header
81            .component(ComponentType::DocStore)
82            .expect("segment must have a DocStore component");
83        let start = comp.offset as usize;
84        let end = start + comp.length as usize;
85        DocStoreReader::open(&self.data[start..end])
86    }
87
88    /// Look up a term in a field's inverted index and return a posting list reader.
89    ///
90    /// Works for both position-aware and non-position formats. Positions are
91    /// skipped — only doc_id and tf are returned. Use `postings_with_positions`
92    /// for position-aware reading (phrase queries).
93    pub fn postings(&self, field_id: FieldId, term: &str) -> Option<PostingListReader<'_>> {
94        let field_index = self.field_index(field_id)?;
95        let posting_offset = field_index.term_dict.get(term)?;
96        let postings_data = &field_index.postings_data[posting_offset as usize..];
97        Some(PostingListReader::new(postings_data))
98    }
99
100    /// Look up a term and return a block-max posting list reader (if the posting
101    /// list uses the block-max format). Returns `None` for old-format postings.
102    pub fn postings_block_max(
103        &self,
104        field_id: FieldId,
105        term: &str,
106    ) -> Option<BlockMaxPostingListReader<'_>> {
107        let field_index = self.field_index(field_id)?;
108        let posting_offset = field_index.term_dict.get(term)?;
109        let postings_data = &field_index.postings_data[posting_offset as usize..];
110        if has_block_max(postings_data) {
111            Some(BlockMaxPostingListReader::new(postings_data))
112        } else {
113            None
114        }
115    }
116
117    /// Look up a term and return a position-aware posting list reader.
118    ///
119    /// Returns `None` if the field/term doesn't exist or the postings don't
120    /// have positions encoded.
121    pub fn postings_with_positions(
122        &self,
123        field_id: FieldId,
124        term: &str,
125    ) -> Option<PositionPostingListReader<'_>> {
126        let field_index = self.field_index(field_id)?;
127        let posting_offset = field_index.term_dict.get(term)?;
128        let postings_data = &field_index.postings_data[posting_offset as usize..];
129        if has_positions(postings_data) {
130            Some(PositionPostingListReader::new(postings_data))
131        } else {
132            None
133        }
134    }
135
136    /// Get all terms with a given prefix and their doc frequencies.
137    pub fn terms_with_prefix(&self, field_id: FieldId, prefix: &str) -> Vec<(String, u32)> {
138        let Some(field_index) = self.field_index(field_id) else {
139            return Vec::new();
140        };
141        field_index
142            .term_dict
143            .prefix_iter(prefix)
144            .into_iter()
145            .map(|(term, offset)| {
146                let postings_data = &field_index.postings_data[offset as usize..];
147                let reader = PostingListReader::new(postings_data);
148                (term, reader.len())
149            })
150            .collect()
151    }
152
153    /// Search terms using an FST automaton. Returns matching (term, doc_count) pairs.
154    /// The automaton prunes non-matching FST subtrees for O(matching) complexity.
155    pub fn automaton_search<A: fst::Automaton>(
156        &self,
157        field_id: FieldId,
158        aut: A,
159    ) -> Vec<(String, u32)> {
160        let Some(field_index) = self.field_index(field_id) else {
161            return Vec::new();
162        };
163        field_index
164            .term_dict
165            .automaton_search(aut)
166            .into_iter()
167            .map(|(term, offset)| {
168                let postings_data = &field_index.postings_data[offset as usize..];
169                let reader = PostingListReader::new(postings_data);
170                (term, reader.len())
171            })
172            .collect()
173    }
174
175    /// Get the term count for a specific term in a field (number of docs containing it).
176    pub fn doc_freq(&self, field_id: FieldId, term: &str) -> u32 {
177        match self.postings(field_id, term) {
178            Some(reader) => reader.len(),
179            None => 0,
180        }
181    }
182
183    /// Get the parent bitset (if this segment has nested documents).
184    pub fn parent_bitset(&self) -> Option<&[bool]> {
185        self.header.parent_bitset.as_deref()
186    }
187
188    /// Get geo point store for a geo_point field.
189    ///
190    /// The store is deserialized and lat-sorted on first access, then cached
191    /// for subsequent queries on the same segment.
192    pub fn geo_points(&self, field_id: FieldId) -> Option<GeoPointStore> {
193        // Return from cache if available
194        {
195            let cache = self.geo_cache.read().unwrap();
196            if let Some(store) = cache.get(&field_id) {
197                return Some(store.clone());
198            }
199        }
200
201        // Deserialize from segment data
202        let comp = self.header.component(ComponentType::Spatial)?;
203        let start = comp.offset as usize;
204        let spatial_data = &self.data[start..start + comp.length as usize];
205
206        let num_fields = u16::from_le_bytes([spatial_data[0], spatial_data[1]]) as usize;
207        let mut pos = 2;
208        for _ in 0..num_fields {
209            let fid = FieldId::new(u16::from_le_bytes([
210                spatial_data[pos],
211                spatial_data[pos + 1],
212            ]));
213            pos += 2;
214            let sub_type = spatial_data[pos];
215            pos += 1;
216            let data_len =
217                u32::from_le_bytes(spatial_data[pos..pos + 4].try_into().unwrap()) as usize;
218            pos += 4;
219
220            if fid == field_id && sub_type == 0 {
221                let store = GeoPointStore::from_bytes(&spatial_data[pos..pos + data_len]);
222                self.geo_cache
223                    .write()
224                    .unwrap()
225                    .insert(field_id, store.clone());
226                return Some(store);
227            }
228            pos += data_len;
229        }
230        None
231    }
232
233    /// Get geo shape store for a geo_shape field.
234    ///
235    /// The store is deserialized on first access, then cached.
236    pub fn geo_shapes(&self, field_id: FieldId) -> Option<crate::spatial::shape::GeoShapeStore> {
237        {
238            let cache = self.geo_shape_cache.read().unwrap();
239            if let Some(store) = cache.get(&field_id) {
240                return Some(store.clone());
241            }
242        }
243
244        let comp = self.header.component(ComponentType::Spatial)?;
245        let start = comp.offset as usize;
246        let spatial_data = &self.data[start..start + comp.length as usize];
247
248        let num_fields = u16::from_le_bytes([spatial_data[0], spatial_data[1]]) as usize;
249        let mut pos = 2;
250        for _ in 0..num_fields {
251            let fid = FieldId::new(u16::from_le_bytes([
252                spatial_data[pos],
253                spatial_data[pos + 1],
254            ]));
255            pos += 2;
256            let sub_type = spatial_data[pos];
257            pos += 1;
258            let data_len =
259                u32::from_le_bytes(spatial_data[pos..pos + 4].try_into().unwrap()) as usize;
260            pos += 4;
261
262            if fid == field_id && sub_type == 1 {
263                let store = crate::spatial::shape::GeoShapeStore::from_bytes(
264                    &spatial_data[pos..pos + data_len],
265                );
266                self.geo_shape_cache
267                    .write()
268                    .unwrap()
269                    .insert(field_id, store.clone());
270                return Some(store);
271            }
272            pos += data_len;
273        }
274        None
275    }
276
277    /// Get a column reader for a doc_values field.
278    pub fn column(&self, field_id: FieldId) -> Option<crate::columnar::reader::ColumnReader<'_>> {
279        let comp = self.header.component(ComponentType::Columnar)?;
280        let start = comp.offset as usize;
281        let end = start + comp.length as usize;
282        let columnar = crate::columnar::reader::ColumnarReader::open(&self.data[start..end]);
283        columnar.column(field_id)
284    }
285
286    /// Get the field norms reader for a field.
287    pub fn norms(&self, field_id: FieldId) -> Option<FieldNormsReader<'_>> {
288        let field_index = self.field_index(field_id)?;
289        let norms_data = field_index.norms_data?;
290        Some(FieldNormsReader::open(norms_data))
291    }
292
293    /// Compute the average field length for a field (for BM25).
294    pub fn avg_field_length(&self, field_id: FieldId) -> f32 {
295        match self.norms(field_id) {
296            Some(norms_reader) => {
297                if norms_reader.doc_count() == 0 {
298                    return 0.0;
299                }
300                let mut total = 0.0f64;
301                for i in 0..norms_reader.doc_count() {
302                    total += norms_reader.norm(crate::core::DocId::new(i)) as f64;
303                }
304                (total / norms_reader.doc_count() as f64) as f32
305            }
306            None => 0.0,
307        }
308    }
309
310    /// Parse the inverted index component and locate a specific field's data.
311    fn field_index(&self, field_id: FieldId) -> Option<FieldIndex<'_>> {
312        let comp = self.header.component(ComponentType::InvertedIndex)?;
313        let inv_start = comp.offset as usize;
314        let inv_data = &self.data[inv_start..inv_start + comp.length as usize];
315
316        // Parse inverted index internal format
317        let num_fields = u16::from_le_bytes([inv_data[0], inv_data[1]]) as usize;
318        let mut pos = 2;
319
320        for _ in 0..num_fields {
321            let fid = FieldId::new(u16::from_le_bytes([inv_data[pos], inv_data[pos + 1]]));
322            pos += 2;
323
324            // Term dict
325            let td_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
326            pos += 4;
327            let td_data = &inv_data[pos..pos + td_len];
328            pos += td_len;
329
330            // Postings data
331            let pd_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
332            pos += 4;
333            let pd_data = &inv_data[pos..pos + pd_len];
334            pos += pd_len;
335
336            // Norms
337            let has_norms = inv_data[pos] != 0;
338            pos += 1;
339            let norms_data = if has_norms {
340                let n_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
341                pos += 4;
342                let nd = &inv_data[pos..pos + n_len];
343                pos += n_len;
344                Some(nd)
345            } else {
346                None
347            };
348
349            if fid == field_id {
350                return Some(FieldIndex {
351                    term_dict: TermDict::open(td_data),
352                    postings_data: pd_data,
353                    norms_data,
354                });
355            }
356        }
357
358        None
359    }
360
361    /// Get all terms in a field (useful for testing/debugging).
362    pub fn terms(&self, field_id: FieldId) -> Vec<String> {
363        let Some(field_index) = self.field_index(field_id) else {
364            return Vec::new();
365        };
366        // Walk the term dict — we need to iterate all entries.
367        // The term dict doesn't expose an iterator, so we use the internal format.
368        // For now, collect by trying common patterns. This is a test helper.
369        // A proper approach would add an iterator to TermDict, but for M1
370        // we keep it simple.
371        let td = &field_index.term_dict;
372        let mut result = Vec::new();
373
374        // Binary-search-based term dict doesn't expose iteration.
375        // We'll use the raw data to iterate.
376        if td.len() == 0 {
377            return result;
378        }
379
380        // Access the underlying data through the term dict's known format:
381        // [num_terms: u32] [term_data...] [offset_index: u32 * num_terms]
382        // We can get the term dict data from the field index.
383        let comp = self.header.component(ComponentType::InvertedIndex).unwrap();
384        let inv_start = comp.offset as usize;
385        let inv_data = &self.data[inv_start..inv_start + comp.length as usize];
386        let mut pos = 2;
387
388        for _ in 0..u16::from_le_bytes([inv_data[0], inv_data[1]]) {
389            let fid = FieldId::new(u16::from_le_bytes([inv_data[pos], inv_data[pos + 1]]));
390            pos += 2;
391
392            let td_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
393            pos += 4;
394            let td_data = &inv_data[pos..pos + td_len];
395            pos += td_len;
396
397            let pd_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
398            pos += 4 + pd_len;
399
400            let has_norms = inv_data[pos] != 0;
401            pos += 1;
402            if has_norms {
403                let n_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
404                pos += 4 + n_len;
405            }
406
407            if fid == field_id {
408                // Parse term dict via FST API
409                let td = crate::inverted::term_dict::TermDict::open(td_data);
410                for (term, _) in td.prefix_iter("") {
411                    result.push(term);
412                }
413                break;
414            }
415        }
416
417        result
418    }
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analysis::Token;
    use crate::core::DocId;
    use crate::mapping::{FieldType, Mapping};
    use crate::segment::builder::SegmentBuilder;

    /// Build tokens with consecutive positions; offsets are not meaningful here.
    fn make_tokens(terms: &[&str]) -> Vec<Token> {
        terms
            .iter()
            .enumerate()
            .map(|(i, t)| Token::new(*t, 0, t.len(), i as u32))
            .collect()
    }

    /// One document with a single text field (id 0) containing "hello world".
    fn build_single_doc_segment() -> Vec<u8> {
        let schema = Mapping::builder().field("title", FieldType::Text).build();
        let mut builder = SegmentBuilder::new(SegmentId::new(1), &schema);
        builder.add_document(
            &[(FieldId::new(0), make_tokens(&["hello", "world"]))],
            br#"{"title":"hello world"}"#,
        );
        builder.build()
    }

    #[test]
    fn open_valid_segment() {
        let data = build_single_doc_segment();
        let reader = SegmentReader::open(data).unwrap();
        assert_eq!(reader.segment_id(), SegmentId::new(1));
        assert_eq!(reader.doc_count(), 1);
    }

    #[test]
    fn reject_invalid_magic() {
        let mut data = build_single_doc_segment();
        data[0] = b'X';
        assert!(SegmentReader::open(data).is_err());
    }

    #[test]
    fn reject_bad_checksum() {
        let mut data = build_single_doc_segment();
        // Corrupt a byte in the header (after magic but before checksum range ends)
        data[12] ^= 0xFF; // doc_count byte
        assert!(SegmentReader::open(data).is_err());
    }

    #[test]
    fn term_lookup() {
        let data = build_single_doc_segment();
        let reader = SegmentReader::open(data).unwrap();

        // "hello" and "world" should be found
        let terms = reader.terms(FieldId::new(0));
        assert!(terms.contains(&"hello".to_string()));
        assert!(terms.contains(&"world".to_string()));
    }

    #[test]
    fn terms_of_missing_field_is_empty() {
        let reader = SegmentReader::open(build_single_doc_segment()).unwrap();
        assert!(reader.terms(FieldId::new(99)).is_empty());
    }

    #[test]
    fn terms_with_prefix_reports_doc_counts() {
        let reader = SegmentReader::open(build_single_doc_segment()).unwrap();
        assert_eq!(
            reader.terms_with_prefix(FieldId::new(0), "he"),
            vec![("hello".to_string(), 1)]
        );
        assert!(reader.terms_with_prefix(FieldId::new(0), "zz").is_empty());
        assert!(reader.terms_with_prefix(FieldId::new(99), "he").is_empty());
    }

    #[test]
    fn posting_iteration() {
        let data = build_single_doc_segment();
        let reader = SegmentReader::open(data).unwrap();

        let mut postings = reader.postings(FieldId::new(0), "hello").unwrap();
        let (doc_id, tf) = postings.next().unwrap();
        assert_eq!(doc_id, DocId::new(0));
        assert_eq!(tf, 1);
        assert!(postings.next().is_none());
    }

    #[test]
    fn doc_retrieval() {
        let data = build_single_doc_segment();
        let reader = SegmentReader::open(data).unwrap();
        let store = reader.doc_store();
        let doc = store.get(0).unwrap();
        assert_eq!(doc, br#"{"title":"hello world"}"#);
    }

    #[test]
    fn norms_lookup() {
        let data = build_single_doc_segment();
        let reader = SegmentReader::open(data).unwrap();

        let norms = reader.norms(FieldId::new(0)).unwrap();
        // Document has 2 tokens in the title field
        assert_eq!(norms.norm(DocId::new(0)), 2.0);
    }

    #[test]
    fn avg_field_length_of_single_doc() {
        let reader = SegmentReader::open(build_single_doc_segment()).unwrap();
        // One document with 2 title tokens.
        assert_eq!(reader.avg_field_length(FieldId::new(0)), 2.0);
        // Unknown field: no norms, so the average is defined as 0.
        assert_eq!(reader.avg_field_length(FieldId::new(99)), 0.0);
    }

    #[test]
    fn geo_lookups_on_non_geo_field_return_none() {
        let reader = SegmentReader::open(build_single_doc_segment()).unwrap();
        assert!(reader.geo_points(FieldId::new(0)).is_none());
        assert!(reader.geo_shapes(FieldId::new(0)).is_none());
    }

    #[test]
    fn missing_term_returns_none() {
        let data = build_single_doc_segment();
        let reader = SegmentReader::open(data).unwrap();
        assert!(reader.postings(FieldId::new(0), "nonexistent").is_none());
    }

    #[test]
    fn missing_field_returns_none() {
        let data = build_single_doc_segment();
        let reader = SegmentReader::open(data).unwrap();
        assert!(reader.postings(FieldId::new(99), "hello").is_none());
    }

    #[test]
    fn end_to_end_multi_doc() {
        let schema = Mapping::builder()
            .field("body", FieldType::Text)
            .field("tag", FieldType::Keyword)
            .build();
        let mut builder = SegmentBuilder::new(SegmentId::new(42), &schema);

        builder.add_document(
            &[
                (
                    FieldId::new(0),
                    make_tokens(&["the", "quick", "brown", "fox"]),
                ),
                (FieldId::new(1), make_tokens(&["animal"])),
            ],
            br#"{"body":"the quick brown fox","tag":"animal"}"#,
        );
        builder.add_document(
            &[
                (FieldId::new(0), make_tokens(&["the", "lazy", "dog"])),
                (FieldId::new(1), make_tokens(&["animal"])),
            ],
            br#"{"body":"the lazy dog","tag":"animal"}"#,
        );
        builder.add_document(
            &[
                (FieldId::new(0), make_tokens(&["quick", "search", "engine"])),
                (FieldId::new(1), make_tokens(&["tech"])),
            ],
            br#"{"body":"quick search engine","tag":"tech"}"#,
        );

        let data = builder.build();
        let reader = SegmentReader::open(data).unwrap();

        assert_eq!(reader.doc_count(), 3);

        // "the" appears in docs 0, 1
        let mut postings = reader.postings(FieldId::new(0), "the").unwrap();
        assert_eq!(postings.next().unwrap().0, DocId::new(0));
        assert_eq!(postings.next().unwrap().0, DocId::new(1));
        assert!(postings.next().is_none());

        // "quick" appears in docs 0, 2
        let mut postings = reader.postings(FieldId::new(0), "quick").unwrap();
        assert_eq!(postings.next().unwrap().0, DocId::new(0));
        assert_eq!(postings.next().unwrap().0, DocId::new(2));
        assert!(postings.next().is_none());

        // "animal" tag in docs 0, 1
        let mut postings = reader.postings(FieldId::new(1), "animal").unwrap();
        assert_eq!(postings.next().unwrap().0, DocId::new(0));
        assert_eq!(postings.next().unwrap().0, DocId::new(1));
        assert!(postings.next().is_none());

        // Doc store
        let store = reader.doc_store();
        let doc0: serde_json::Value = serde_json::from_slice(&store.get(0).unwrap()).unwrap();
        assert_eq!(doc0["tag"], "animal");
        let doc2: serde_json::Value = serde_json::from_slice(&store.get(2).unwrap()).unwrap();
        assert_eq!(doc2["tag"], "tech");

        // Norms — body field: doc0=4, doc1=3, doc2=3
        let norms = reader.norms(FieldId::new(0)).unwrap();
        assert_eq!(norms.norm(DocId::new(0)), 4.0);
        assert_eq!(norms.norm(DocId::new(1)), 3.0);
        assert_eq!(norms.norm(DocId::new(2)), 3.0);

        // doc_freq
        assert_eq!(reader.doc_freq(FieldId::new(0), "the"), 2);
        assert_eq!(reader.doc_freq(FieldId::new(0), "quick"), 2);
        assert_eq!(reader.doc_freq(FieldId::new(0), "fox"), 1);
        assert_eq!(reader.doc_freq(FieldId::new(0), "missing"), 0);
    }
}