hermes_core/segment/reader.rs

//! Async segment reader with lazy loading

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFRaBitQIndex, RaBitQIndex,
    SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

/// Vector index type - either RaBitQ or IVF-RaBitQ
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    RaBitQ(Arc<RaBitQIndex>),
    IVF(Arc<IVFRaBitQIndex>),
}

/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: LazyFileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Base doc_id offset for this segment
    doc_id_offset: DocId,
    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ)
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Shared coarse centroids for IVF search (loaded once)
    coarse_centroids: Option<Arc<CoarseCentroids>>,
}

impl AsyncSegmentReader {
    /// Open a segment with lazy loading
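    ///
    /// A minimal sketch, assuming a `Directory` implementation `dir`, an
    /// already-built segment, and a cache of 64 blocks (the segment id,
    /// offset, and cache size here are illustrative):
    ///
    /// ```ignore
    /// let reader = AsyncSegmentReader::open(&dir, SegmentId(0), schema.clone(), 0, 64).await?;
    /// println!("docs in segment: {}", reader.num_docs());
    /// ```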
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Read metadata (small, always loaded)
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Open term dictionary with lazy loading (fetches ranges on demand)
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        // Get postings file handle (lazy - fetches ranges on demand)
        let postings_handle = dir.open_lazy(&files.postings).await?;

        // Open store with lazy loading
        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Load dense vector indexes from unified .vectors file
        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Get average field length for BM25F scoring
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get term dictionary stats for debugging
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

    /// Get posting list for a term (async - loads on demand)
    ///
    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
    /// and no additional I/O is needed. For larger lists, it reads from the .post file.
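    ///
    /// A hypothetical lookup and scan, assuming `title` is a text `Field` from
    /// the schema (the iteration pattern mirrors `search_sparse_vector` below):
    ///
    /// ```ignore
    /// if let Some(postings) = reader.get_postings(title, b"rust").await? {
    ///     let mut iter = postings.iterator();
    ///     while iter.doc() != crate::TERMINATED {
    ///         println!("doc={} tf={}", iter.doc(), iter.term_freq());
    ///         iter.advance();
    ///     }
    /// }
    /// ```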
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Build key: field_id + term
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        // Look up in term dictionary
        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Check if posting list is inlined
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            // Build BlockPostingList from inline data (no I/O needed!)
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // External posting list - read from postings file handle (lazy - HTTP range request)
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset as usize;
        let end = start + posting_len as usize;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    /// Get document by local doc_id (async - loads on demand)
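    ///
    /// A minimal sketch; the id is segment-local, i.e. before `doc_id_offset`
    /// is applied:
    ///
    /// ```ignore
    /// if let Some(doc) = reader.doc(0).await? {
    ///     // inspect the stored fields of the first document
    /// }
    /// ```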
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

    /// Prefetch term dictionary blocks for a key range
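    ///
    /// Useful to warm the block cache before a range scan. A hypothetical
    /// warm-up of every term between "a" and "b" in a text field:
    ///
    /// ```ignore
    /// reader.prefetch_terms(title, b"a", b"b").await?;
    /// ```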
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    /// Check if store uses dictionary compression (incompatible with raw merging)
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Get raw store blocks for optimized merging
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// Get store data slice for raw block access
    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    /// Get all terms from this segment (for merge)
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    /// Read raw posting bytes at offset
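    ///
    /// Together with [`Self::all_terms`] this supports segment merging: a
    /// hypothetical loop that copies every external posting list verbatim
    /// (assuming `external_info` yields the same offset/length types used here):
    ///
    /// ```ignore
    /// for (_key, info) in reader.all_terms().await? {
    ///     if let Some((offset, len)) = info.external_info() {
    ///         let raw = reader.read_postings(offset, len).await?;
    ///         // append `raw` to the merged postings file
    ///     }
    /// }
    /// ```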
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset as usize;
        let end = start + len as usize;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    /// Search dense vectors using RaBitQ or IVF-RaBitQ
    ///
    /// Returns (doc_id, distance) pairs sorted by distance (ascending).
    /// The doc_ids are adjusted by doc_id_offset for this segment.
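    ///
    /// A hypothetical top-10 query with a 4x rerank budget (for IVF indexes
    /// the rerank factor also serves as the `nprobe` hint, see below):
    ///
    /// ```ignore
    /// let hits = reader.search_dense_vector(embedding, &query_vec, 10, 4)?;
    /// for (doc_id, distance) in hits {
    ///     println!("doc={} distance={}", doc_id, distance);
    /// }
    /// ```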
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
    ) -> Result<Vec<(DocId, f32)>> {
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF(ivf) => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32); // Use rerank_factor as nprobe hint
                ivf.search(centroids, query, k, nprobe)
            }
        };

        // Adjust doc_ids by segment offset
        Ok(results
            .into_iter()
            .map(|(idx, dist)| (idx as DocId + self.doc_id_offset, dist))
            .collect())
    }

    /// Check if this segment has a dense vector index for the given field
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    /// Get the dense vector index for a field (if available)
    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    /// Get the IVF vector index for a field (if available)
    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    /// Get the vector index type for a field
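    ///
    /// A hypothetical dispatch on the index variant:
    ///
    /// ```ignore
    /// match reader.get_vector_index(field) {
    ///     Some(VectorIndex::RaBitQ(_)) => { /* flat RaBitQ search */ }
    ///     Some(VectorIndex::IVF(_)) => { /* coarse probe, needs centroids */ }
    ///     None => { /* field has no dense vector index */ }
    /// }
    /// ```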
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

    /// Search for similar sparse vectors using the inverted index
    ///
    /// Sparse vectors are indexed as terms "dim_{index}", with the weight quantized
    /// into the term frequency (stored as `weight * 1000`). This method accumulates
    /// dot product scores across all non-zero dimensions.
    ///
    /// Returns (local doc_id, score) pairs sorted by score descending.
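    ///
    /// A minimal sketch with two non-zero query dimensions:
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_sparse_vector(sparse_field, &[3, 17], &[0.5, 1.2], 10)
    ///     .await?;
    /// ```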
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        indices: &[u32],
        weights: &[f32],
        k: usize,
    ) -> Result<Vec<(u32, f32)>> {
        let mut doc_scores: FxHashMap<DocId, f32> = FxHashMap::default();

        // For each non-zero dimension, look up the posting list
        for (&idx, &weight) in indices.iter().zip(weights.iter()) {
            // Sparse vector dimensions are indexed as "dim_{index}"
            let term = format!("dim_{}", idx);

            if let Some(postings) = self.get_postings(field, term.as_bytes()).await? {
                // Iterate through the posting list and accumulate scores;
                // the term frequency carries the quantized weight
                let mut iter = postings.iterator();
                while iter.doc() != crate::TERMINATED {
                    let doc_id = iter.doc();
                    let tf = iter.term_freq();
                    // Dequantize TF back to weight (stored as weight * 1000)
                    let stored_weight = tf as f32 / 1000.0;
                    *doc_scores.entry(doc_id).or_insert(0.0) += weight * stored_weight;
                    iter.advance();
                }
            }
        }

        // Sort by score descending and take top k
        let mut results: Vec<(u32, f32)> = doc_scores.into_iter().collect();
        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        results.truncate(k);

        Ok(results)
    }

    /// Load dense vector indexes from unified .vectors file
    ///
    /// Tries to deserialize as IVF-RaBitQ first, falls back to RaBitQ.
    /// Also loads coarse centroids if any IVF index is found.
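    ///
    /// Layout, as implied by the reads below (all integers little-endian):
    /// - `u32` field count
    /// - per field: `u32` field_id, `u64` offset, `u64` length
    /// - the serialized index blobs at those offsets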
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        // Try to open vectors file (may not exist if no vectors were indexed)
        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        let bytes = match handle.read_bytes().await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(bytes.as_slice());

        // Read header
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        // Read field entries
        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, offset as usize, length as usize));
        }

        // Load each index - try IVF first, fall back to RaBitQ
        for (field_id, offset, length) in entries {
            // Bounds-check the entry so a truncated file yields a corruption
            // error instead of a panic on slicing
            let end = offset
                .checked_add(length)
                .filter(|&end| end <= bytes.as_slice().len())
                .ok_or_else(|| Error::Corruption("Vector index entry out of bounds".to_string()))?;
            let data = &bytes.as_slice()[offset..end];

            // Try IVF-RaBitQ first
            if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data) {
                // Load coarse centroids if not already loaded
                if coarse_centroids.is_none() {
                    let field = crate::dsl::Field(field_id);
                    if let Some(entry) = schema.get_field_entry(field)
                        && let Some(ref config) = entry.dense_vector_config
                        && let Some(ref path) = config.coarse_centroids_path
                        && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                    {
                        coarse_centroids = Some(Arc::new(c));
                    }
                }
                indexes.insert(field_id, VectorIndex::IVF(Arc::new(ivf_index)));
            } else if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data) {
                indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
            }
        }

        Ok((indexes, coarse_centroids))
    }
}

/// Alias for AsyncSegmentReader
pub type SegmentReader = AsyncSegmentReader;