// hermes_core/segment/reader.rs

//! Async segment reader with lazy loading

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

18/// Vector index type - RaBitQ, IVF-RaBitQ, or ScaNN (IVF-PQ)
19#[derive(Clone)]
20#[allow(clippy::upper_case_acronyms)]
21pub enum VectorIndex {
22    /// RaBitQ - binary quantization, good for small datasets
23    RaBitQ(Arc<RaBitQIndex>),
24    /// IVF-RaBitQ - inverted file with RaBitQ, good for medium datasets
25    IVF {
26        index: Arc<IVFRaBitQIndex>,
27        codebook: Arc<RaBitQCodebook>,
28    },
29    /// ScaNN (IVF-PQ) - product quantization with OPQ, best for large datasets
30    ScaNN {
31        index: Arc<IVFPQIndex>,
32        codebook: Arc<PQCodebook>,
33    },
34}
35
36/// Async segment reader with lazy loading
37///
38/// - Term dictionary: only index loaded, blocks loaded on-demand
39/// - Postings: loaded on-demand per term via HTTP range requests
40/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
41pub struct AsyncSegmentReader {
42    meta: SegmentMeta,
43    /// Term dictionary with lazy block loading
44    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
45    /// Postings file handle - fetches ranges on demand
46    postings_handle: LazyFileHandle,
47    /// Document store with lazy block loading
48    store: Arc<AsyncStoreReader>,
49    schema: Arc<Schema>,
50    /// Base doc_id offset for this segment
51    doc_id_offset: DocId,
52    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ)
53    vector_indexes: FxHashMap<u32, VectorIndex>,
54    /// Shared coarse centroids for IVF search (loaded once)
55    coarse_centroids: Option<Arc<CoarseCentroids>>,
56}
57
58impl AsyncSegmentReader {
59    /// Open a segment with lazy loading
60    pub async fn open<D: Directory>(
61        dir: &D,
62        segment_id: SegmentId,
63        schema: Arc<Schema>,
64        doc_id_offset: DocId,
65        cache_blocks: usize,
66    ) -> Result<Self> {
67        let files = SegmentFiles::new(segment_id.0);
68
69        // Read metadata (small, always loaded)
70        let meta_slice = dir.open_read(&files.meta).await?;
71        let meta_bytes = meta_slice.read_bytes().await?;
72        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
73        debug_assert_eq!(meta.id, segment_id.0);
74
75        // Open term dictionary with lazy loading (fetches ranges on demand)
76        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
77        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
78
79        // Get postings file handle (lazy - fetches ranges on demand)
80        let postings_handle = dir.open_lazy(&files.postings).await?;
81
82        // Open store with lazy loading
83        let store_handle = dir.open_lazy(&files.store).await?;
84        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
85
86        // Load dense vector indexes from unified .vectors file
87        let (vector_indexes, coarse_centroids) =
88            Self::load_vectors_file(dir, &files, &schema).await?;
89
90        Ok(Self {
91            meta,
92            term_dict: Arc::new(term_dict),
93            postings_handle,
94            store: Arc::new(store),
95            schema,
96            doc_id_offset,
97            vector_indexes,
98            coarse_centroids,
99        })
100    }
101
102    pub fn meta(&self) -> &SegmentMeta {
103        &self.meta
104    }
105
106    pub fn num_docs(&self) -> u32 {
107        self.meta.num_docs
108    }
109
110    /// Get average field length for BM25F scoring
111    pub fn avg_field_len(&self, field: Field) -> f32 {
112        self.meta.avg_field_len(field)
113    }
114
115    pub fn doc_id_offset(&self) -> DocId {
116        self.doc_id_offset
117    }
118
119    pub fn schema(&self) -> &Schema {
120        &self.schema
121    }
122
123    /// Get term dictionary stats for debugging
124    pub fn term_dict_stats(&self) -> SSTableStats {
125        self.term_dict.stats()
126    }
127
128    /// Get posting list for a term (async - loads on demand)
129    ///
130    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
131    /// and no additional I/O is needed. For larger lists, reads from .post file.
132    pub async fn get_postings(
133        &self,
134        field: Field,
135        term: &[u8],
136    ) -> Result<Option<BlockPostingList>> {
137        log::debug!(
138            "SegmentReader::get_postings field={} term_len={}",
139            field.0,
140            term.len()
141        );
142
143        // Build key: field_id + term
144        let mut key = Vec::with_capacity(4 + term.len());
145        key.extend_from_slice(&field.0.to_le_bytes());
146        key.extend_from_slice(term);
147
148        // Look up in term dictionary
149        let term_info = match self.term_dict.get(&key).await? {
150            Some(info) => {
151                log::debug!("SegmentReader::get_postings found term_info");
152                info
153            }
154            None => {
155                log::debug!("SegmentReader::get_postings term not found");
156                return Ok(None);
157            }
158        };
159
160        // Check if posting list is inlined
161        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
162            // Build BlockPostingList from inline data (no I/O needed!)
163            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
164            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
165                posting_list.push(doc_id, tf);
166            }
167            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
168            return Ok(Some(block_list));
169        }
170
171        // External posting list - read from postings file handle (lazy - HTTP range request)
172        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
173            Error::Corruption("TermInfo has neither inline nor external data".to_string())
174        })?;
175
176        let start = posting_offset as usize;
177        let end = start + posting_len as usize;
178
179        if end > self.postings_handle.len() {
180            return Err(Error::Corruption(
181                "Posting offset out of bounds".to_string(),
182            ));
183        }
184
185        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
186        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
187
188        Ok(Some(block_list))
189    }
190
191    /// Get document by local doc_id (async - loads on demand)
192    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
193        self.store
194            .get(local_doc_id, &self.schema)
195            .await
196            .map_err(Error::from)
197    }
198
199    /// Prefetch term dictionary blocks for a key range
200    pub async fn prefetch_terms(
201        &self,
202        field: Field,
203        start_term: &[u8],
204        end_term: &[u8],
205    ) -> Result<()> {
206        let mut start_key = Vec::with_capacity(4 + start_term.len());
207        start_key.extend_from_slice(&field.0.to_le_bytes());
208        start_key.extend_from_slice(start_term);
209
210        let mut end_key = Vec::with_capacity(4 + end_term.len());
211        end_key.extend_from_slice(&field.0.to_le_bytes());
212        end_key.extend_from_slice(end_term);
213
214        self.term_dict.prefetch_range(&start_key, &end_key).await?;
215        Ok(())
216    }
217
218    /// Check if store uses dictionary compression (incompatible with raw merging)
219    pub fn store_has_dict(&self) -> bool {
220        self.store.has_dict()
221    }
222
223    /// Get raw store blocks for optimized merging
224    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
225        self.store.raw_blocks()
226    }
227
228    /// Get store data slice for raw block access
229    pub fn store_data_slice(&self) -> &LazyFileSlice {
230        self.store.data_slice()
231    }
232
233    /// Get all terms from this segment (for merge)
234    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
235        self.term_dict.all_entries().await.map_err(Error::from)
236    }
237
238    /// Read raw posting bytes at offset
239    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
240        let start = offset as usize;
241        let end = start + len as usize;
242        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
243        Ok(bytes.to_vec())
244    }
245
246    /// Search dense vectors using RaBitQ
247    ///
248    /// Returns (doc_id, distance) pairs sorted by distance (ascending).
249    /// The doc_ids are adjusted by doc_id_offset for this segment.
250    /// If mrl_dim is configured, the query vector is automatically trimmed.
251    pub fn search_dense_vector(
252        &self,
253        field: Field,
254        query: &[f32],
255        k: usize,
256        rerank_factor: usize,
257    ) -> Result<Vec<(DocId, f32)>> {
258        let index = self
259            .vector_indexes
260            .get(&field.0)
261            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;
262
263        // Get mrl_dim from config to trim query vector if needed
264        let mrl_dim = self
265            .schema
266            .get_field_entry(field)
267            .and_then(|e| e.dense_vector_config.as_ref())
268            .and_then(|c| c.mrl_dim);
269
270        // Trim query vector if mrl_dim is set
271        let query_vec: Vec<f32>;
272        let effective_query = if let Some(trim_dim) = mrl_dim {
273            if trim_dim < query.len() {
274                query_vec = query[..trim_dim].to_vec();
275                query_vec.as_slice()
276            } else {
277                query
278            }
279        } else {
280            query
281        };
282
283        let results: Vec<(u32, f32)> = match index {
284            VectorIndex::RaBitQ(rabitq) => rabitq
285                .search(effective_query, k, rerank_factor)
286                .into_iter()
287                .map(|(idx, dist)| (idx as u32, dist))
288                .collect(),
289            VectorIndex::IVF { index, codebook } => {
290                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
291                    Error::Schema("IVF index requires coarse centroids".to_string())
292                })?;
293                let nprobe = rerank_factor.max(32); // Use rerank_factor as nprobe hint
294                index.search(centroids, codebook, effective_query, k, Some(nprobe))
295            }
296            VectorIndex::ScaNN { index, codebook } => {
297                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
298                    Error::Schema("ScaNN index requires coarse centroids".to_string())
299                })?;
300                let nprobe = rerank_factor.max(32);
301                index.search(centroids, codebook, effective_query, k, Some(nprobe))
302            }
303        };
304
305        // Adjust doc_ids by segment offset
306        Ok(results
307            .into_iter()
308            .map(|(idx, dist)| (idx as DocId + self.doc_id_offset, dist))
309            .collect())
310    }
311
312    /// Check if this segment has a dense vector index for the given field
313    pub fn has_dense_vector_index(&self, field: Field) -> bool {
314        self.vector_indexes.contains_key(&field.0)
315    }
316
317    /// Get the dense vector index for a field (if available)
318    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
319        match self.vector_indexes.get(&field.0) {
320            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
321            _ => None,
322        }
323    }
324
325    /// Get the IVF vector index for a field (if available)
326    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
327        match self.vector_indexes.get(&field.0) {
328            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
329            _ => None,
330        }
331    }
332
333    /// Get the ScaNN vector index for a field (if available)
334    pub fn get_scann_vector_index(
335        &self,
336        field: Field,
337    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
338        match self.vector_indexes.get(&field.0) {
339            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
340            _ => None,
341        }
342    }
343
344    /// Get the vector index type for a field
345    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
346        self.vector_indexes.get(&field.0)
347    }
348
349    /// Search for similar sparse vectors using inverted index
350    ///
351    /// Sparse vectors are indexed as terms "dim_{index}" with the weight stored.
352    /// This method accumulates dot product scores across all non-zero dimensions.
353    ///
354    /// Returns (doc_id, score) pairs sorted by score descending.
355    pub async fn search_sparse_vector(
356        &self,
357        field: Field,
358        indices: &[u32],
359        weights: &[f32],
360        k: usize,
361    ) -> Result<Vec<(u32, f32)>> {
362        use rustc_hash::FxHashMap;
363
364        let mut doc_scores: FxHashMap<DocId, f32> = FxHashMap::default();
365
366        // For each non-zero dimension, look up the posting list
367        for (&idx, &weight) in indices.iter().zip(weights.iter()) {
368            // Sparse vector dimensions are indexed as "dim_{index}"
369            let term = format!("dim_{}", idx);
370
371            if let Some(postings) = self.get_postings(field, term.as_bytes()).await? {
372                // Iterate through posting list and accumulate scores
373                // Term frequency represents the stored weight (quantized)
374                let mut iter = postings.iterator();
375                while iter.doc() != crate::TERMINATED {
376                    let doc_id = iter.doc();
377                    let tf = iter.term_freq();
378                    // Dequantize TF back to weight (stored as tf * 1000)
379                    let stored_weight = tf as f32 / 1000.0;
380                    *doc_scores.entry(doc_id).or_insert(0.0) += weight * stored_weight;
381                    iter.advance();
382                }
383            }
384        }
385
386        // Sort by score descending and take top k
387        let mut results: Vec<(u32, f32)> = doc_scores.into_iter().collect();
388        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
389        results.truncate(k);
390
391        Ok(results)
392    }
393
394    /// Load dense vector indexes from unified .vectors file
395    ///
396    /// Supports RaBitQ (type 0), IVF-RaBitQ (type 1), and ScaNN (type 2).
397    /// Also loads coarse centroids and PQ codebook as needed.
398    async fn load_vectors_file<D: Directory>(
399        dir: &D,
400        files: &SegmentFiles,
401        schema: &Schema,
402    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
403        use byteorder::{LittleEndian, ReadBytesExt};
404        use std::io::Cursor;
405
406        let mut indexes = FxHashMap::default();
407        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;
408
409        // Try to open vectors file (may not exist if no vectors were indexed)
410        let handle = match dir.open_lazy(&files.vectors).await {
411            Ok(h) => h,
412            Err(_) => return Ok((indexes, None)),
413        };
414
415        let bytes = match handle.read_bytes().await {
416            Ok(b) => b,
417            Err(_) => return Ok((indexes, None)),
418        };
419
420        if bytes.is_empty() {
421            return Ok((indexes, None));
422        }
423
424        let mut cursor = Cursor::new(bytes.as_slice());
425
426        // Read header
427        let num_fields = cursor.read_u32::<LittleEndian>()?;
428
429        // Read field entries (field_id, index_type, offset, length)
430        let mut entries = Vec::with_capacity(num_fields as usize);
431        for _ in 0..num_fields {
432            let field_id = cursor.read_u32::<LittleEndian>()?;
433            // Try to read index_type - if this fails, assume old format without type
434            let index_type = cursor.read_u8().unwrap_or(255); // 255 = unknown/legacy
435            let offset = cursor.read_u64::<LittleEndian>()?;
436            let length = cursor.read_u64::<LittleEndian>()?;
437            entries.push((field_id, index_type, offset as usize, length as usize));
438        }
439
440        // Load each index based on type
441        for (field_id, index_type, offset, length) in entries {
442            let data = &bytes.as_slice()[offset..offset + length];
443            let field = crate::dsl::Field(field_id);
444
445            match index_type {
446                2 => {
447                    // ScaNN (IVF-PQ)
448                    if let Ok(ivfpq_index) = IVFPQIndex::from_bytes(data) {
449                        // Load coarse centroids if not already loaded
450                        if coarse_centroids.is_none()
451                            && let Some(entry) = schema.get_field_entry(field)
452                            && let Some(ref config) = entry.dense_vector_config
453                            && let Some(ref path) = config.coarse_centroids_path
454                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
455                        {
456                            coarse_centroids = Some(Arc::new(c));
457                        }
458
459                        // Load PQ codebook
460                        if let Some(entry) = schema.get_field_entry(field)
461                            && let Some(ref config) = entry.dense_vector_config
462                            && let Some(ref path) = config.pq_codebook_path
463                            && let Ok(codebook) = PQCodebook::load(std::path::Path::new(path))
464                        {
465                            indexes.insert(
466                                field_id,
467                                VectorIndex::ScaNN {
468                                    index: Arc::new(ivfpq_index),
469                                    codebook: Arc::new(codebook),
470                                },
471                            );
472                        }
473                    }
474                }
475                1 => {
476                    // IVF-RaBitQ
477                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data) {
478                        if coarse_centroids.is_none()
479                            && let Some(entry) = schema.get_field_entry(field)
480                            && let Some(ref config) = entry.dense_vector_config
481                            && let Some(ref path) = config.coarse_centroids_path
482                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
483                        {
484                            coarse_centroids = Some(Arc::new(c));
485                        }
486                        // Create a codebook for the IVF index
487                        let codebook = Arc::new(RaBitQCodebook::new(
488                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
489                        ));
490                        indexes.insert(
491                            field_id,
492                            VectorIndex::IVF {
493                                index: Arc::new(ivf_index),
494                                codebook,
495                            },
496                        );
497                    }
498                }
499                0 => {
500                    // RaBitQ
501                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data) {
502                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
503                    }
504                }
505                _ => {
506                    // Legacy format without type - try to auto-detect
507                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data) {
508                        if coarse_centroids.is_none()
509                            && let Some(entry) = schema.get_field_entry(field)
510                            && let Some(ref config) = entry.dense_vector_config
511                            && let Some(ref path) = config.coarse_centroids_path
512                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
513                        {
514                            coarse_centroids = Some(Arc::new(c));
515                        }
516                        // Create a codebook for the IVF index
517                        let codebook = Arc::new(RaBitQCodebook::new(
518                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
519                        ));
520                        indexes.insert(
521                            field_id,
522                            VectorIndex::IVF {
523                                index: Arc::new(ivf_index),
524                                codebook,
525                            },
526                        );
527                    } else if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data) {
528                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
529                    }
530                }
531            }
532        }
533
534        Ok((indexes, coarse_centroids))
535    }
536}
537
538/// Alias for AsyncSegmentReader
539pub type SegmentReader = AsyncSegmentReader;