Skip to main content

hermes_core/segment/
reader.rs

1//! Async segment reader with lazy loading
2
3use std::sync::Arc;
4
5use rustc_hash::FxHashMap;
6
7use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
8use crate::dsl::{Document, Field, Schema};
9use crate::structures::{
10    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
11    IVFRaBitQIndex, PQCodebook, RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
12};
13use crate::{DocId, Error, Result};
14
15use super::store::{AsyncStoreReader, RawStoreBlock};
16use super::types::{SegmentFiles, SegmentId, SegmentMeta};
17
18use super::builder::FlatVectorData;
19
20/// Vector index type - Flat, RaBitQ, IVF-RaBitQ, or ScaNN (IVF-PQ)
21#[derive(Clone)]
22#[allow(clippy::upper_case_acronyms)]
23pub enum VectorIndex {
24    /// Flat - brute-force search over raw vectors (accumulating state)
25    Flat(Arc<FlatVectorData>),
26    /// RaBitQ - binary quantization, good for small datasets
27    RaBitQ(Arc<RaBitQIndex>),
28    /// IVF-RaBitQ - inverted file with RaBitQ, good for medium datasets
29    IVF {
30        index: Arc<IVFRaBitQIndex>,
31        codebook: Arc<RaBitQCodebook>,
32    },
33    /// ScaNN (IVF-PQ) - product quantization with OPQ, best for large datasets
34    ScaNN {
35        index: Arc<IVFPQIndex>,
36        codebook: Arc<PQCodebook>,
37    },
38}
39
40/// Sparse vector index for a field: direct-indexed by dimension ID
41#[derive(Clone)]
42pub struct SparseIndex {
43    /// Posting lists indexed directly by dimension ID (O(1) lookup)
44    /// None means dimension not present in index
45    pub postings: Vec<Option<Arc<BlockSparsePostingList>>>,
46    /// Total document count in this segment (for IDF computation)
47    pub total_docs: u32,
48}
49
50impl SparseIndex {
51    /// Compute IDF (inverse document frequency) for a dimension
52    ///
53    /// IDF = log(N / df) where N = total docs, df = docs containing dimension
54    /// Returns 0.0 if dimension not present
55    #[inline]
56    pub fn idf(&self, dim_id: u32) -> f32 {
57        if let Some(Some(pl)) = self.postings.get(dim_id as usize) {
58            let df = pl.doc_count() as f32;
59            if df > 0.0 {
60                (self.total_docs as f32 / df).ln()
61            } else {
62                0.0
63            }
64        } else {
65            0.0
66        }
67    }
68
69    /// Get IDF weights for multiple dimensions
70    pub fn idf_weights(&self, dim_ids: &[u32]) -> Vec<f32> {
71        dim_ids.iter().map(|&d| self.idf(d)).collect()
72    }
73}
74
75/// Async segment reader with lazy loading
76///
77/// - Term dictionary: only index loaded, blocks loaded on-demand
78/// - Postings: loaded on-demand per term via HTTP range requests
79/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
80pub struct AsyncSegmentReader {
81    meta: SegmentMeta,
82    /// Term dictionary with lazy block loading
83    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
84    /// Postings file handle - fetches ranges on demand
85    postings_handle: LazyFileHandle,
86    /// Document store with lazy block loading
87    store: Arc<AsyncStoreReader>,
88    schema: Arc<Schema>,
89    /// Base doc_id offset for this segment
90    doc_id_offset: DocId,
91    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ)
92    vector_indexes: FxHashMap<u32, VectorIndex>,
93    /// Shared coarse centroids for IVF search (loaded once)
94    coarse_centroids: Option<Arc<CoarseCentroids>>,
95    /// Sparse vector indexes per field
96    sparse_indexes: FxHashMap<u32, SparseIndex>,
97    /// Position file handle for phrase queries (lazy loading)
98    positions_handle: Option<LazyFileHandle>,
99}
100
101impl AsyncSegmentReader {
102    /// Open a segment with lazy loading
103    pub async fn open<D: Directory>(
104        dir: &D,
105        segment_id: SegmentId,
106        schema: Arc<Schema>,
107        doc_id_offset: DocId,
108        cache_blocks: usize,
109    ) -> Result<Self> {
110        let files = SegmentFiles::new(segment_id.0);
111
112        // Read metadata (small, always loaded)
113        let meta_slice = dir.open_read(&files.meta).await?;
114        let meta_bytes = meta_slice.read_bytes().await?;
115        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
116        debug_assert_eq!(meta.id, segment_id.0);
117
118        // Open term dictionary with lazy loading (fetches ranges on demand)
119        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
120        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
121
122        // Get postings file handle (lazy - fetches ranges on demand)
123        let postings_handle = dir.open_lazy(&files.postings).await?;
124
125        // Open store with lazy loading
126        let store_handle = dir.open_lazy(&files.store).await?;
127        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
128
129        // Load dense vector indexes from unified .vectors file
130        let (vector_indexes, coarse_centroids) =
131            Self::load_vectors_file(dir, &files, &schema).await?;
132
133        // Load sparse vector indexes from .sparse file
134        let sparse_indexes = Self::load_sparse_file(dir, &files, meta.num_docs).await?;
135
136        // Open positions file handle (if exists) - offsets are now in TermInfo
137        let positions_handle = Self::open_positions_file(dir, &files).await?;
138
139        Ok(Self {
140            meta,
141            term_dict: Arc::new(term_dict),
142            postings_handle,
143            store: Arc::new(store),
144            schema,
145            doc_id_offset,
146            vector_indexes,
147            coarse_centroids,
148            sparse_indexes,
149            positions_handle,
150        })
151    }
152
153    pub fn meta(&self) -> &SegmentMeta {
154        &self.meta
155    }
156
157    pub fn num_docs(&self) -> u32 {
158        self.meta.num_docs
159    }
160
161    /// Get average field length for BM25F scoring
162    pub fn avg_field_len(&self, field: Field) -> f32 {
163        self.meta.avg_field_len(field)
164    }
165
166    pub fn doc_id_offset(&self) -> DocId {
167        self.doc_id_offset
168    }
169
170    pub fn schema(&self) -> &Schema {
171        &self.schema
172    }
173
174    /// Get sparse indexes for all fields
175    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
176        &self.sparse_indexes
177    }
178
179    /// Get vector indexes for all fields
180    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
181        &self.vector_indexes
182    }
183
184    /// Get term dictionary stats for debugging
185    pub fn term_dict_stats(&self) -> SSTableStats {
186        self.term_dict.stats()
187    }
188
189    /// Get posting list for a term (async - loads on demand)
190    ///
191    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
192    /// and no additional I/O is needed. For larger lists, reads from .post file.
193    pub async fn get_postings(
194        &self,
195        field: Field,
196        term: &[u8],
197    ) -> Result<Option<BlockPostingList>> {
198        log::debug!(
199            "SegmentReader::get_postings field={} term_len={}",
200            field.0,
201            term.len()
202        );
203
204        // Build key: field_id + term
205        let mut key = Vec::with_capacity(4 + term.len());
206        key.extend_from_slice(&field.0.to_le_bytes());
207        key.extend_from_slice(term);
208
209        // Look up in term dictionary
210        let term_info = match self.term_dict.get(&key).await? {
211            Some(info) => {
212                log::debug!("SegmentReader::get_postings found term_info");
213                info
214            }
215            None => {
216                log::debug!("SegmentReader::get_postings term not found");
217                return Ok(None);
218            }
219        };
220
221        // Check if posting list is inlined
222        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
223            // Build BlockPostingList from inline data (no I/O needed!)
224            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
225            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
226                posting_list.push(doc_id, tf);
227            }
228            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
229            return Ok(Some(block_list));
230        }
231
232        // External posting list - read from postings file handle (lazy - HTTP range request)
233        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
234            Error::Corruption("TermInfo has neither inline nor external data".to_string())
235        })?;
236
237        let start = posting_offset;
238        let end = start + posting_len as u64;
239
240        if end > self.postings_handle.len() {
241            return Err(Error::Corruption(
242                "Posting offset out of bounds".to_string(),
243            ));
244        }
245
246        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
247        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
248
249        Ok(Some(block_list))
250    }
251
252    /// Get document by local doc_id (async - loads on demand)
253    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
254        self.store
255            .get(local_doc_id, &self.schema)
256            .await
257            .map_err(Error::from)
258    }
259
260    /// Prefetch term dictionary blocks for a key range
261    pub async fn prefetch_terms(
262        &self,
263        field: Field,
264        start_term: &[u8],
265        end_term: &[u8],
266    ) -> Result<()> {
267        let mut start_key = Vec::with_capacity(4 + start_term.len());
268        start_key.extend_from_slice(&field.0.to_le_bytes());
269        start_key.extend_from_slice(start_term);
270
271        let mut end_key = Vec::with_capacity(4 + end_term.len());
272        end_key.extend_from_slice(&field.0.to_le_bytes());
273        end_key.extend_from_slice(end_term);
274
275        self.term_dict.prefetch_range(&start_key, &end_key).await?;
276        Ok(())
277    }
278
279    /// Check if store uses dictionary compression (incompatible with raw merging)
280    pub fn store_has_dict(&self) -> bool {
281        self.store.has_dict()
282    }
283
284    /// Get raw store blocks for optimized merging
285    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
286        self.store.raw_blocks()
287    }
288
289    /// Get store data slice for raw block access
290    pub fn store_data_slice(&self) -> &LazyFileSlice {
291        self.store.data_slice()
292    }
293
294    /// Get all terms from this segment (for merge)
295    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
296        self.term_dict.all_entries().await.map_err(Error::from)
297    }
298
299    /// Get all terms with parsed field and term string (for statistics aggregation)
300    ///
301    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
302    /// Skips terms that aren't valid UTF-8.
303    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
304        let entries = self.term_dict.all_entries().await?;
305        let mut result = Vec::with_capacity(entries.len());
306
307        for (key, term_info) in entries {
308            // Key format: field_id (4 bytes little-endian) + term bytes
309            if key.len() > 4 {
310                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
311                let term_bytes = &key[4..];
312                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
313                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
314                }
315            }
316        }
317
318        Ok(result)
319    }
320
321    /// Get streaming iterator over term dictionary (for memory-efficient merge)
322    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
323        self.term_dict.iter()
324    }
325
326    /// Read raw posting bytes at offset
327    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
328        let start = offset;
329        let end = start + len as u64;
330        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
331        Ok(bytes.to_vec())
332    }
333
334    /// Search dense vectors using RaBitQ
335    ///
336    /// Returns (doc_id, score) pairs sorted by score (descending).
337    /// The doc_ids are adjusted by doc_id_offset for this segment.
338    /// If mrl_dim is configured, the query vector is automatically trimmed.
339    /// For multi-valued documents, scores are combined using the specified combiner.
340    pub fn search_dense_vector(
341        &self,
342        field: Field,
343        query: &[f32],
344        k: usize,
345        rerank_factor: usize,
346        combiner: crate::query::MultiValueCombiner,
347    ) -> Result<Vec<(DocId, f32)>> {
348        use crate::query::MultiValueCombiner;
349        let index = self
350            .vector_indexes
351            .get(&field.0)
352            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;
353
354        // Get mrl_dim from config to trim query vector if needed
355        let mrl_dim = self
356            .schema
357            .get_field_entry(field)
358            .and_then(|e| e.dense_vector_config.as_ref())
359            .and_then(|c| c.mrl_dim);
360
361        // Trim query vector if mrl_dim is set
362        let query_vec: Vec<f32>;
363        let effective_query = if let Some(trim_dim) = mrl_dim {
364            if trim_dim < query.len() {
365                query_vec = query[..trim_dim].to_vec();
366                query_vec.as_slice()
367            } else {
368                query
369            }
370        } else {
371            query
372        };
373
374        let results: Vec<(u32, f32)> = match index {
375            VectorIndex::Flat(flat_data) => {
376                // Brute-force search over raw vectors using SIMD-accelerated distance
377                use crate::structures::simd::squared_euclidean_distance;
378
379                let mut candidates: Vec<(u32, f32)> = flat_data
380                    .vectors
381                    .iter()
382                    .zip(flat_data.doc_ids.iter())
383                    .map(|(vec, &doc_id)| {
384                        let dist = squared_euclidean_distance(effective_query, vec);
385                        (doc_id, dist)
386                    })
387                    .collect();
388                candidates
389                    .sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
390                candidates.truncate(k);
391                candidates
392            }
393            VectorIndex::RaBitQ(rabitq) => rabitq
394                .search(effective_query, k, rerank_factor)
395                .into_iter()
396                .map(|(idx, dist)| (idx as u32, dist))
397                .collect(),
398            VectorIndex::IVF { index, codebook } => {
399                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
400                    Error::Schema("IVF index requires coarse centroids".to_string())
401                })?;
402                let nprobe = rerank_factor.max(32); // Use rerank_factor as nprobe hint
403                index.search(centroids, codebook, effective_query, k, Some(nprobe))
404            }
405            VectorIndex::ScaNN { index, codebook } => {
406                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
407                    Error::Schema("ScaNN index requires coarse centroids".to_string())
408                })?;
409                let nprobe = rerank_factor.max(32);
410                index.search(centroids, codebook, effective_query, k, Some(nprobe))
411            }
412        };
413
414        // Convert distance to score (smaller distance = higher score)
415        // and adjust doc_ids by segment offset
416        let raw_results: Vec<(DocId, f32)> = results
417            .into_iter()
418            .map(|(idx, dist)| {
419                let doc_id = idx as DocId + self.doc_id_offset;
420                let score = 1.0 / (1.0 + dist); // Convert distance to similarity score
421                (doc_id, score)
422            })
423            .collect();
424
425        // Combine scores for duplicate doc_ids (multi-valued documents)
426        let mut combined: rustc_hash::FxHashMap<DocId, (f32, u32)> =
427            rustc_hash::FxHashMap::default();
428        for (doc_id, score) in raw_results {
429            combined
430                .entry(doc_id)
431                .and_modify(|(acc_score, count)| match combiner {
432                    MultiValueCombiner::Sum => *acc_score += score,
433                    MultiValueCombiner::Max => *acc_score = acc_score.max(score),
434                    MultiValueCombiner::Avg => {
435                        *acc_score += score;
436                        *count += 1;
437                    }
438                })
439                .or_insert((score, 1));
440        }
441
442        // Finalize averages and collect results
443        let mut final_results: Vec<(DocId, f32)> = combined
444            .into_iter()
445            .map(|(doc_id, (score, count))| {
446                let final_score = if combiner == MultiValueCombiner::Avg {
447                    score / count as f32
448                } else {
449                    score
450                };
451                (doc_id, final_score)
452            })
453            .collect();
454
455        // Sort by score descending and take top k
456        final_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
457        final_results.truncate(k);
458
459        Ok(final_results)
460    }
461
462    /// Check if this segment has a dense vector index for the given field
463    pub fn has_dense_vector_index(&self, field: Field) -> bool {
464        self.vector_indexes.contains_key(&field.0)
465    }
466
467    /// Get the dense vector index for a field (if available)
468    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
469        match self.vector_indexes.get(&field.0) {
470            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
471            _ => None,
472        }
473    }
474
475    /// Get the IVF vector index for a field (if available)
476    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
477        match self.vector_indexes.get(&field.0) {
478            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
479            _ => None,
480        }
481    }
482
483    /// Get the ScaNN vector index for a field (if available)
484    pub fn get_scann_vector_index(
485        &self,
486        field: Field,
487    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
488        match self.vector_indexes.get(&field.0) {
489            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
490            _ => None,
491        }
492    }
493
494    /// Get the vector index type for a field
495    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
496        self.vector_indexes.get(&field.0)
497    }
498
499    /// Search for similar sparse vectors using dedicated sparse posting lists
500    ///
501    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
502    /// Optimizations (via WandExecutor):
503    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
504    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
505    /// 3. **Top-K heap**: Efficient score collection
506    ///
507    /// Returns (doc_id, score) pairs sorted by score descending.
508    pub async fn search_sparse_vector(
509        &self,
510        field: Field,
511        vector: &[(u32, f32)],
512        limit: usize,
513        combiner: crate::query::MultiValueCombiner,
514    ) -> Result<Vec<(u32, f32)>> {
515        use crate::query::{MultiValueCombiner, SparseTermScorer, WandExecutor};
516
517        // Get sparse index for this field
518        let sparse_index = match self.sparse_indexes.get(&field.0) {
519            Some(idx) => idx,
520            None => return Ok(Vec::new()),
521        };
522
523        // Build scorers for each dimension that exists in the index
524        let scorers: Vec<SparseTermScorer> = vector
525            .iter()
526            .filter_map(|&(dim_id, query_weight)| {
527                // Direct indexing: O(1) lookup
528                sparse_index
529                    .postings
530                    .get(dim_id as usize)
531                    .and_then(|opt| opt.as_ref())
532                    .map(|pl| SparseTermScorer::from_arc(pl, query_weight))
533            })
534            .collect();
535
536        if scorers.is_empty() {
537            return Ok(Vec::new());
538        }
539
540        // Use shared WandExecutor for top-k retrieval
541        // Note: For multi-valued fields, same doc_id may appear multiple times
542        // with different scores that need to be combined
543        let raw_results = WandExecutor::new(scorers, limit * 2).execute(); // Over-fetch for combining
544
545        // Combine scores for duplicate doc_ids based on combiner strategy
546        let mut combined: rustc_hash::FxHashMap<u32, (f32, u32)> = rustc_hash::FxHashMap::default();
547        for r in raw_results {
548            combined
549                .entry(r.doc_id)
550                .and_modify(|(score, count)| match combiner {
551                    MultiValueCombiner::Sum => *score += r.score,
552                    MultiValueCombiner::Max => *score = score.max(r.score),
553                    MultiValueCombiner::Avg => {
554                        *score += r.score;
555                        *count += 1;
556                    }
557                })
558                .or_insert((r.score, 1));
559        }
560
561        // Finalize averages and collect results
562        let mut results: Vec<(u32, f32)> = combined
563            .into_iter()
564            .map(|(doc_id, (score, count))| {
565                let final_score = if combiner == MultiValueCombiner::Avg {
566                    score / count as f32
567                } else {
568                    score
569                };
570                (doc_id, final_score)
571            })
572            .collect();
573
574        // Sort by score descending and take top limit
575        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
576        results.truncate(limit);
577
578        Ok(results)
579    }
580
581    /// Load dense vector indexes from unified .vectors file
582    ///
583    /// Supports RaBitQ (type 0), IVF-RaBitQ (type 1), and ScaNN (type 2).
584    /// Also loads coarse centroids and PQ codebook as needed.
585    ///
586    /// Memory optimization: Uses lazy range reads to load each index separately,
587    /// avoiding loading the entire vectors file into memory at once.
588    async fn load_vectors_file<D: Directory>(
589        dir: &D,
590        files: &SegmentFiles,
591        schema: &Schema,
592    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
593        use byteorder::{LittleEndian, ReadBytesExt};
594        use std::io::Cursor;
595
596        let mut indexes = FxHashMap::default();
597        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;
598
599        // Skip loading vectors file if schema has no dense vector fields
600        let has_dense_vectors = schema
601            .fields()
602            .any(|(_, entry)| entry.dense_vector_config.is_some());
603        if !has_dense_vectors {
604            return Ok((indexes, None));
605        }
606
607        // Try to open vectors file (may not exist if no vectors were indexed)
608        let handle = match dir.open_lazy(&files.vectors).await {
609            Ok(h) => h,
610            Err(_) => return Ok((indexes, None)),
611        };
612
613        // Read only the header first (4 bytes for num_fields)
614        let header_bytes = match handle.read_bytes_range(0..4).await {
615            Ok(b) => b,
616            Err(_) => return Ok((indexes, None)),
617        };
618
619        if header_bytes.is_empty() {
620            return Ok((indexes, None));
621        }
622
623        let mut cursor = Cursor::new(header_bytes.as_slice());
624        let num_fields = cursor.read_u32::<LittleEndian>()?;
625
626        if num_fields == 0 {
627            return Ok((indexes, None));
628        }
629
630        // Read field entries header: (field_id: 4, index_type: 1, offset: 8, length: 8) = 21 bytes per field
631        let entries_size = num_fields as u64 * 21;
632        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
633        let mut cursor = Cursor::new(entries_bytes.as_slice());
634
635        // Read field entries (field_id, index_type, offset, length)
636        let mut entries = Vec::with_capacity(num_fields as usize);
637        for _ in 0..num_fields {
638            let field_id = cursor.read_u32::<LittleEndian>()?;
639            // Try to read index_type - if this fails, assume old format without type
640            let index_type = cursor.read_u8().unwrap_or(255); // 255 = unknown/legacy
641            let offset = cursor.read_u64::<LittleEndian>()?;
642            let length = cursor.read_u64::<LittleEndian>()?;
643            entries.push((field_id, index_type, offset, length));
644        }
645
646        // Load each index on-demand using range reads (memory efficient)
647        for (field_id, index_type, offset, length) in entries {
648            // Read only this index's data
649            let data = handle.read_bytes_range(offset..offset + length).await?;
650            let _field = crate::dsl::Field(field_id);
651
652            match index_type {
653                3 => {
654                    // Flat (brute-force) - raw vectors for accumulating state
655                    if let Ok(flat_data) = serde_json::from_slice::<FlatVectorData>(data.as_slice())
656                    {
657                        indexes.insert(field_id, VectorIndex::Flat(Arc::new(flat_data)));
658                    }
659                }
660                2 => {
661                    // ScaNN (IVF-PQ) with embedded centroids and codebook
662                    use super::builder::ScaNNIndexData;
663                    if let Ok(scann_data) = ScaNNIndexData::from_bytes(data.as_slice()) {
664                        coarse_centroids = Some(Arc::new(scann_data.centroids));
665                        indexes.insert(
666                            field_id,
667                            VectorIndex::ScaNN {
668                                index: Arc::new(scann_data.index),
669                                codebook: Arc::new(scann_data.codebook),
670                            },
671                        );
672                    }
673                }
674                1 => {
675                    // IVF-RaBitQ with embedded centroids and codebook
676                    use super::builder::IVFRaBitQIndexData;
677                    if let Ok(ivf_data) = IVFRaBitQIndexData::from_bytes(data.as_slice()) {
678                        coarse_centroids = Some(Arc::new(ivf_data.centroids));
679                        indexes.insert(
680                            field_id,
681                            VectorIndex::IVF {
682                                index: Arc::new(ivf_data.index),
683                                codebook: Arc::new(ivf_data.codebook),
684                            },
685                        );
686                    }
687                }
688                0 => {
689                    // RaBitQ (standalone)
690                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
691                    {
692                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
693                    }
694                }
695                _ => {
696                    // Unknown type - try Flat first (most common in new indexes)
697                    if let Ok(flat_data) = serde_json::from_slice::<FlatVectorData>(data.as_slice())
698                    {
699                        indexes.insert(field_id, VectorIndex::Flat(Arc::new(flat_data)));
700                    } else if let Ok(rabitq_index) =
701                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
702                    {
703                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
704                    }
705                }
706            }
707        }
708
709        Ok((indexes, coarse_centroids))
710    }
711
712    /// Load sparse vector indexes from .sparse file
713    ///
714    /// File format (direct-indexed table for O(1) dimension lookup):
715    /// - Header: num_fields (u32)
716    /// - For each field:
717    ///   - field_id (u32)
718    ///   - quantization (u8)
719    ///   - max_dim_id (u32)          ← table size
720    ///   - table: [(offset: u64, length: u32)] × max_dim_id  ← direct indexed
721    ///     (offset=0, length=0 means dimension not present)
722    /// - Data: concatenated serialized BlockSparsePostingList
723    async fn load_sparse_file<D: Directory>(
724        dir: &D,
725        files: &SegmentFiles,
726        total_docs: u32,
727    ) -> Result<FxHashMap<u32, SparseIndex>> {
728        use byteorder::{LittleEndian, ReadBytesExt};
729        use std::io::Cursor;
730
731        let mut indexes = FxHashMap::default();
732
733        // Try to open sparse file (may not exist if no sparse vectors were indexed)
734        let handle = match dir.open_lazy(&files.sparse).await {
735            Ok(h) => h,
736            Err(_) => return Ok(indexes),
737        };
738
739        // Read the entire file (sparse files are typically small enough)
740        let data = match handle.read_bytes().await {
741            Ok(d) => d,
742            Err(_) => return Ok(indexes),
743        };
744
745        if data.len() < 4 {
746            return Ok(indexes);
747        }
748
749        let mut cursor = Cursor::new(data.as_slice());
750        let num_fields = cursor.read_u32::<LittleEndian>()?;
751
752        if num_fields == 0 {
753            return Ok(indexes);
754        }
755
756        // Read field entries and build indexes
757        for _ in 0..num_fields {
758            let field_id = cursor.read_u32::<LittleEndian>()?;
759            let _quantization = cursor.read_u8()?; // Already stored in each BlockSparsePostingList
760            let max_dim_id = cursor.read_u32::<LittleEndian>()?;
761
762            // Read direct-indexed table
763            let mut postings: Vec<Option<Arc<BlockSparsePostingList>>> =
764                vec![None; max_dim_id as usize];
765
766            for dim_id in 0..max_dim_id {
767                let offset = cursor.read_u64::<LittleEndian>()?;
768                let length = cursor.read_u32::<LittleEndian>()?;
769
770                // offset=0, length=0 means dimension not present
771                if length > 0 {
772                    let start = offset as usize;
773                    let end = start + length as usize;
774                    if end <= data.len() {
775                        let posting_data = &data.as_slice()[start..end];
776                        if let Ok(posting_list) =
777                            BlockSparsePostingList::deserialize(&mut Cursor::new(posting_data))
778                        {
779                            postings[dim_id as usize] = Some(Arc::new(posting_list));
780                        }
781                    }
782                }
783            }
784
785            indexes.insert(
786                field_id,
787                SparseIndex {
788                    postings,
789                    total_docs,
790                },
791            );
792        }
793
794        Ok(indexes)
795    }
796
797    /// Load position index header from .pos file
798    ///
799    /// File format:
800    /// Open positions file handle (no header parsing needed - offsets are in TermInfo)
801    async fn open_positions_file<D: Directory>(
802        dir: &D,
803        files: &SegmentFiles,
804    ) -> Result<Option<LazyFileHandle>> {
805        // Try to open positions file (may not exist if no positions were indexed)
806        match dir.open_lazy(&files.positions).await {
807            Ok(h) => Ok(Some(h)),
808            Err(_) => Ok(None),
809        }
810    }
811
812    /// Get positions for a term (for phrase queries)
813    ///
814    /// Position offsets are now embedded in TermInfo, so we first look up
815    /// the term to get its TermInfo, then use position_info() to get the offset.
816    pub async fn get_positions(
817        &self,
818        field: Field,
819        term: &[u8],
820    ) -> Result<Option<crate::structures::PositionPostingList>> {
821        use std::io::Cursor;
822
823        // Get positions handle
824        let handle = match &self.positions_handle {
825            Some(h) => h,
826            None => return Ok(None),
827        };
828
829        // Build key: field_id + term
830        let mut key = Vec::with_capacity(4 + term.len());
831        key.extend_from_slice(&field.0.to_le_bytes());
832        key.extend_from_slice(term);
833
834        // Look up term in dictionary to get TermInfo with position offset
835        let term_info = match self.term_dict.get(&key).await? {
836            Some(info) => info,
837            None => return Ok(None),
838        };
839
840        // Get position offset from TermInfo
841        let (offset, length) = match term_info.position_info() {
842            Some((o, l)) => (o, l),
843            None => return Ok(None),
844        };
845
846        // Read the position data
847        let slice = handle.slice(offset..offset + length as u64);
848        let data = slice.read_bytes().await?;
849
850        // Deserialize
851        let mut cursor = Cursor::new(data.as_slice());
852        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;
853
854        Ok(Some(pos_list))
855    }
856
857    /// Check if positions are available for a field
858    pub fn has_positions(&self, field: Field) -> bool {
859        // Check schema for position mode on this field
860        if let Some(entry) = self.schema.get_field_entry(field) {
861            entry.positions.is_some()
862        } else {
863            false
864        }
865    }
866}
867
868/// Alias for AsyncSegmentReader
869pub type SegmentReader = AsyncSegmentReader;