hermes_core/segment/reader.rs

//! Async segment reader with lazy loading

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

/// Vector index type - RaBitQ, IVF-RaBitQ, or ScaNN (IVF-PQ)
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    /// RaBitQ - binary quantization, good for small datasets
    RaBitQ(Arc<RaBitQIndex>),
    /// IVF-RaBitQ - inverted file with RaBitQ, good for medium datasets
    IVF {
        index: Arc<IVFRaBitQIndex>,
        codebook: Arc<RaBitQCodebook>,
    },
    /// ScaNN (IVF-PQ) - product quantization with OPQ, best for large datasets
    ScaNN {
        index: Arc<IVFPQIndex>,
        codebook: Arc<PQCodebook>,
    },
}

/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
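///
/// # Example
///
/// An illustrative sketch, not a test from this crate; it assumes a `Directory`
/// implementation, an already-built segment, and a matching `Schema`:
///
/// ```ignore
/// let reader = AsyncSegmentReader::open(&dir, segment_id, schema, 0, 64).await?;
/// println!("docs in segment: {}", reader.num_docs());
/// let doc = reader.doc(0).await?;
/// ```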
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: LazyFileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Base doc_id offset for this segment
    doc_id_offset: DocId,
    /// Dense vector indexes per field (RaBitQ, IVF-RaBitQ, or ScaNN)
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Shared coarse centroids for IVF search (loaded once)
    coarse_centroids: Option<Arc<CoarseCentroids>>,
}

impl AsyncSegmentReader {
    /// Open a segment with lazy loading
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Read metadata (small, always loaded)
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Open term dictionary with lazy loading (fetches ranges on demand)
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        // Get postings file handle (lazy - fetches ranges on demand)
        let postings_handle = dir.open_lazy(&files.postings).await?;

        // Open store with lazy loading
        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Load dense vector indexes from unified .vectors file
        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Get average field length for BM25F scoring
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get term dictionary stats for debugging
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

    /// Get posting list for a term (async - loads on demand)
    ///
    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
    /// and no additional I/O is needed. For larger lists, reads from the .post file.
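    ///
    /// Illustrative usage (assumes `reader` and `field` are in scope; the term
    /// bytes are just an example). The iterator API matches the one used by
    /// `search_sparse_vector` below:
    ///
    /// ```ignore
    /// if let Some(postings) = reader.get_postings(field, b"rust").await? {
    ///     let mut it = postings.iterator();
    ///     while it.doc() != crate::TERMINATED {
    ///         let (doc_id, tf) = (it.doc(), it.term_freq());
    ///         it.advance();
    ///     }
    /// }
    /// ```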
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Build key: field_id + term
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        // Look up in term dictionary
        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Check if posting list is inlined
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            // Build BlockPostingList from inline data (no I/O needed!)
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // External posting list - read from postings file handle (lazy - HTTP range request)
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    /// Get document by local doc_id (async - loads on demand)
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

    /// Prefetch term dictionary blocks for a key range
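    ///
    /// Intended to warm the block cache before a range scan. Illustrative call
    /// (assumes `reader` and `field` are in scope; the bounds are arbitrary):
    ///
    /// ```ignore
    /// reader.prefetch_terms(field, b"app", b"apz").await?;
    /// ```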
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    /// Check if store uses dictionary compression (incompatible with raw merging)
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Get raw store blocks for optimized merging
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// Get store data slice for raw block access
    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    /// Get all terms from this segment (for merge)
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    /// Get streaming iterator over term dictionary (for memory-efficient merge)
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    /// Read raw posting bytes at offset
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    /// Search dense vectors using the field's index (RaBitQ, IVF-RaBitQ, or ScaNN)
    ///
    /// Returns (doc_id, distance) pairs sorted by distance (ascending).
    /// The doc_ids are adjusted by doc_id_offset for this segment.
    /// If mrl_dim is configured, the query vector is automatically trimmed.
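    ///
    /// Illustrative call (assumes `reader` and a query vector of the indexed
    /// dimensionality; `k = 10` and `rerank_factor = 32` are arbitrary values):
    ///
    /// ```ignore
    /// let hits = reader.search_dense_vector(field, &query, 10, 32)?;
    /// for (doc_id, dist) in hits {
    ///     // distances ascending; doc_ids already offset for this segment
    /// }
    /// ```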
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
    ) -> Result<Vec<(DocId, f32)>> {
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        // Get mrl_dim from config to trim query vector if needed
        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        // Trim query vector if mrl_dim is set
        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(effective_query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32); // Use rerank_factor as nprobe hint
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
        };

        // Adjust doc_ids by segment offset
        Ok(results
            .into_iter()
            .map(|(idx, dist)| (idx as DocId + self.doc_id_offset, dist))
            .collect())
    }

    /// Check if this segment has a dense vector index for the given field
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    /// Get the dense vector index for a field (if available)
    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    /// Get the IVF vector index for a field (if available)
    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    /// Get the ScaNN vector index for a field (if available)
    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    /// Get the vector index type for a field
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

    /// Search for similar sparse vectors using the inverted index
    ///
    /// Sparse vectors are indexed as terms "dim_{index}" with the weight
    /// quantized into the term frequency (stored as weight * 1000).
    /// This method accumulates dot-product scores across all non-zero dimensions.
    ///
    /// Returns (doc_id, score) pairs sorted by score descending. Unlike
    /// search_dense_vector, doc_ids are segment-local (not adjusted by doc_id_offset).
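    ///
    /// Illustrative call (the indices and weights here are arbitrary; the two
    /// slices are parallel and must be the same length):
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_sparse_vector(field, &[3, 17, 42], &[0.5, 1.2, 0.8], 10)
    ///     .await?;
    /// ```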
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        indices: &[u32],
        weights: &[f32],
        k: usize,
    ) -> Result<Vec<(u32, f32)>> {
        let mut doc_scores: FxHashMap<DocId, f32> = FxHashMap::default();

        // For each non-zero dimension, look up the posting list
        for (&idx, &weight) in indices.iter().zip(weights.iter()) {
            // Sparse vector dimensions are indexed as "dim_{index}"
            let term = format!("dim_{}", idx);

            if let Some(postings) = self.get_postings(field, term.as_bytes()).await? {
                // Iterate through posting list and accumulate scores
                // Term frequency represents the stored weight (quantized)
                let mut iter = postings.iterator();
                while iter.doc() != crate::TERMINATED {
                    let doc_id = iter.doc();
                    let tf = iter.term_freq();
                    // Dequantize: the weight was stored as tf = weight * 1000
                    let stored_weight = tf as f32 / 1000.0;
                    *doc_scores.entry(doc_id).or_insert(0.0) += weight * stored_weight;
                    iter.advance();
                }
            }
        }

        // Sort by score descending and take top k
        let mut results: Vec<(u32, f32)> = doc_scores.into_iter().collect();
        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        results.truncate(k);

        Ok(results)
    }

    /// Load dense vector indexes from unified .vectors file
    ///
    /// Supports RaBitQ (type 0), IVF-RaBitQ (type 1), and ScaNN (type 2).
    /// Also loads coarse centroids and PQ codebook as needed.
    ///
    /// Memory optimization: Uses lazy range reads to load each index separately,
    /// avoiding loading the entire vectors file into memory at once.
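    ///
    /// On-disk layout, as parsed below (all integers little-endian):
    ///
    /// ```text
    /// [num_fields: u32]
    /// [field_id: u32][index_type: u8][offset: u64][length: u64]  x num_fields (21 bytes each)
    /// [per-field index blobs at the recorded offsets]
    /// ```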
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        // Skip loading vectors file if schema has no dense vector fields
        let has_dense_vectors = schema
            .fields()
            .any(|(_, entry)| entry.dense_vector_config.is_some());
        if !has_dense_vectors {
            return Ok((indexes, None));
        }

        // Try to open vectors file (may not exist if no vectors were indexed)
        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        // Read only the header first (4 bytes for num_fields)
        let header_bytes = match handle.read_bytes_range(0..4).await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if header_bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(header_bytes.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok((indexes, None));
        }

        // Read field entries header: (field_id: 4, index_type: 1, offset: 8, length: 8) = 21 bytes per field
        let entries_size = num_fields as u64 * 21;
        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
        let mut cursor = Cursor::new(entries_bytes.as_slice());

        // Read field entries (field_id, index_type, offset, length)
        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            // Try to read index_type - if this fails, assume old format without type
            let index_type = cursor.read_u8().unwrap_or(255); // 255 = unknown/legacy
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, index_type, offset, length));
        }

        // Load each index on-demand using range reads (memory efficient)
        for (field_id, index_type, offset, length) in entries {
            // Read only this index's data
            let data = handle.read_bytes_range(offset..offset + length).await?;
            let field = crate::dsl::Field(field_id);

            match index_type {
                2 => {
                    // ScaNN (IVF-PQ)
                    if let Ok(ivfpq_index) = IVFPQIndex::from_bytes(data.as_slice()) {
                        // Load coarse centroids if not already loaded
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }

                        // Load PQ codebook
                        if let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.pq_codebook_path
                            && let Ok(codebook) = PQCodebook::load(std::path::Path::new(path))
                        {
                            indexes.insert(
                                field_id,
                                VectorIndex::ScaNN {
                                    index: Arc::new(ivfpq_index),
                                    codebook: Arc::new(codebook),
                                },
                            );
                        }
                    }
                }
                1 => {
                    // IVF-RaBitQ
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        // Create a codebook for the IVF index
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    }
                }
                0 => {
                    // RaBitQ
                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
                _ => {
                    // Legacy format without type - try to auto-detect
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        // Create a codebook for the IVF index
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    } else if let Ok(rabitq_index) =
                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
            }
        }

        Ok((indexes, coarse_centroids))
    }
}

/// Alias for AsyncSegmentReader
pub type SegmentReader = AsyncSegmentReader;