Skip to main content

hermes_core/segment/
reader.rs

1//! Async segment reader with lazy loading
2
3use std::sync::Arc;
4
5use rustc_hash::FxHashMap;
6
7use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
8use crate::dsl::{Document, Field, Schema};
9use crate::structures::{
10    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
11    IVFRaBitQIndex, PQCodebook, RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
12};
13use crate::{DocId, Error, Result};
14
15use super::store::{AsyncStoreReader, RawStoreBlock};
16use super::types::{SegmentFiles, SegmentId, SegmentMeta};
17use super::vector_data::FlatVectorData;
18
19/// Vector index type - Flat, RaBitQ, IVF-RaBitQ, or ScaNN (IVF-PQ)
20#[derive(Clone)]
21#[allow(clippy::upper_case_acronyms)]
22pub enum VectorIndex {
23    /// Flat - brute-force search over raw vectors (accumulating state)
24    Flat(Arc<FlatVectorData>),
25    /// RaBitQ - binary quantization, good for small datasets
26    RaBitQ(Arc<RaBitQIndex>),
27    /// IVF-RaBitQ - inverted file with RaBitQ, good for medium datasets
28    IVF {
29        index: Arc<IVFRaBitQIndex>,
30        codebook: Arc<RaBitQCodebook>,
31    },
32    /// ScaNN (IVF-PQ) - product quantization with OPQ, best for large datasets
33    ScaNN {
34        index: Arc<IVFPQIndex>,
35        codebook: Arc<PQCodebook>,
36    },
37}
38
39/// Sparse vector index for a field: direct-indexed by dimension ID
40#[derive(Clone)]
41pub struct SparseIndex {
42    /// Posting lists indexed directly by dimension ID (O(1) lookup)
43    /// None means dimension not present in index
44    pub postings: Vec<Option<Arc<BlockSparsePostingList>>>,
45    /// Total document count in this segment (for IDF computation)
46    pub total_docs: u32,
47}
48
49impl SparseIndex {
50    /// Compute IDF (inverse document frequency) for a dimension
51    ///
52    /// IDF = log(N / df) where N = total docs, df = docs containing dimension
53    /// Returns 0.0 if dimension not present
54    #[inline]
55    pub fn idf(&self, dim_id: u32) -> f32 {
56        if let Some(Some(pl)) = self.postings.get(dim_id as usize) {
57            let df = pl.doc_count() as f32;
58            if df > 0.0 {
59                (self.total_docs as f32 / df).ln()
60            } else {
61                0.0
62            }
63        } else {
64            0.0
65        }
66    }
67
68    /// Get IDF weights for multiple dimensions
69    pub fn idf_weights(&self, dim_ids: &[u32]) -> Vec<f32> {
70        dim_ids.iter().map(|&d| self.idf(d)).collect()
71    }
72}
73
74/// Async segment reader with lazy loading
75///
76/// - Term dictionary: only index loaded, blocks loaded on-demand
77/// - Postings: loaded on-demand per term via HTTP range requests
78/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
79pub struct AsyncSegmentReader {
80    meta: SegmentMeta,
81    /// Term dictionary with lazy block loading
82    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
83    /// Postings file handle - fetches ranges on demand
84    postings_handle: LazyFileHandle,
85    /// Document store with lazy block loading
86    store: Arc<AsyncStoreReader>,
87    schema: Arc<Schema>,
88    /// Base doc_id offset for this segment
89    doc_id_offset: DocId,
90    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ)
91    vector_indexes: FxHashMap<u32, VectorIndex>,
92    /// Shared coarse centroids for IVF search (loaded once)
93    coarse_centroids: Option<Arc<CoarseCentroids>>,
94    /// Sparse vector indexes per field
95    sparse_indexes: FxHashMap<u32, SparseIndex>,
96    /// Position file handle for phrase queries (lazy loading)
97    positions_handle: Option<LazyFileHandle>,
98}
99
100impl AsyncSegmentReader {
101    /// Open a segment with lazy loading
102    pub async fn open<D: Directory>(
103        dir: &D,
104        segment_id: SegmentId,
105        schema: Arc<Schema>,
106        doc_id_offset: DocId,
107        cache_blocks: usize,
108    ) -> Result<Self> {
109        let files = SegmentFiles::new(segment_id.0);
110
111        // Read metadata (small, always loaded)
112        let meta_slice = dir.open_read(&files.meta).await?;
113        let meta_bytes = meta_slice.read_bytes().await?;
114        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
115        debug_assert_eq!(meta.id, segment_id.0);
116
117        // Open term dictionary with lazy loading (fetches ranges on demand)
118        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
119        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
120
121        // Get postings file handle (lazy - fetches ranges on demand)
122        let postings_handle = dir.open_lazy(&files.postings).await?;
123
124        // Open store with lazy loading
125        let store_handle = dir.open_lazy(&files.store).await?;
126        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
127
128        // Load dense vector indexes from unified .vectors file
129        let (vector_indexes, coarse_centroids) =
130            Self::load_vectors_file(dir, &files, &schema).await?;
131
132        // Load sparse vector indexes from .sparse file
133        let sparse_indexes = Self::load_sparse_file(dir, &files, meta.num_docs).await?;
134
135        // Open positions file handle (if exists) - offsets are now in TermInfo
136        let positions_handle = Self::open_positions_file(dir, &files).await?;
137
138        Ok(Self {
139            meta,
140            term_dict: Arc::new(term_dict),
141            postings_handle,
142            store: Arc::new(store),
143            schema,
144            doc_id_offset,
145            vector_indexes,
146            coarse_centroids,
147            sparse_indexes,
148            positions_handle,
149        })
150    }
151
152    pub fn meta(&self) -> &SegmentMeta {
153        &self.meta
154    }
155
156    pub fn num_docs(&self) -> u32 {
157        self.meta.num_docs
158    }
159
160    /// Get average field length for BM25F scoring
161    pub fn avg_field_len(&self, field: Field) -> f32 {
162        self.meta.avg_field_len(field)
163    }
164
165    pub fn doc_id_offset(&self) -> DocId {
166        self.doc_id_offset
167    }
168
169    pub fn schema(&self) -> &Schema {
170        &self.schema
171    }
172
173    /// Get sparse indexes for all fields
174    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
175        &self.sparse_indexes
176    }
177
178    /// Get vector indexes for all fields
179    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
180        &self.vector_indexes
181    }
182
183    /// Get term dictionary stats for debugging
184    pub fn term_dict_stats(&self) -> SSTableStats {
185        self.term_dict.stats()
186    }
187
188    /// Get posting list for a term (async - loads on demand)
189    ///
190    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
191    /// and no additional I/O is needed. For larger lists, reads from .post file.
192    pub async fn get_postings(
193        &self,
194        field: Field,
195        term: &[u8],
196    ) -> Result<Option<BlockPostingList>> {
197        log::debug!(
198            "SegmentReader::get_postings field={} term_len={}",
199            field.0,
200            term.len()
201        );
202
203        // Build key: field_id + term
204        let mut key = Vec::with_capacity(4 + term.len());
205        key.extend_from_slice(&field.0.to_le_bytes());
206        key.extend_from_slice(term);
207
208        // Look up in term dictionary
209        let term_info = match self.term_dict.get(&key).await? {
210            Some(info) => {
211                log::debug!("SegmentReader::get_postings found term_info");
212                info
213            }
214            None => {
215                log::debug!("SegmentReader::get_postings term not found");
216                return Ok(None);
217            }
218        };
219
220        // Check if posting list is inlined
221        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
222            // Build BlockPostingList from inline data (no I/O needed!)
223            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
224            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
225                posting_list.push(doc_id, tf);
226            }
227            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
228            return Ok(Some(block_list));
229        }
230
231        // External posting list - read from postings file handle (lazy - HTTP range request)
232        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
233            Error::Corruption("TermInfo has neither inline nor external data".to_string())
234        })?;
235
236        let start = posting_offset;
237        let end = start + posting_len as u64;
238
239        if end > self.postings_handle.len() {
240            return Err(Error::Corruption(
241                "Posting offset out of bounds".to_string(),
242            ));
243        }
244
245        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
246        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
247
248        Ok(Some(block_list))
249    }
250
251    /// Get document by local doc_id (async - loads on demand)
252    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
253        self.store
254            .get(local_doc_id, &self.schema)
255            .await
256            .map_err(Error::from)
257    }
258
259    /// Prefetch term dictionary blocks for a key range
260    pub async fn prefetch_terms(
261        &self,
262        field: Field,
263        start_term: &[u8],
264        end_term: &[u8],
265    ) -> Result<()> {
266        let mut start_key = Vec::with_capacity(4 + start_term.len());
267        start_key.extend_from_slice(&field.0.to_le_bytes());
268        start_key.extend_from_slice(start_term);
269
270        let mut end_key = Vec::with_capacity(4 + end_term.len());
271        end_key.extend_from_slice(&field.0.to_le_bytes());
272        end_key.extend_from_slice(end_term);
273
274        self.term_dict.prefetch_range(&start_key, &end_key).await?;
275        Ok(())
276    }
277
278    /// Check if store uses dictionary compression (incompatible with raw merging)
279    pub fn store_has_dict(&self) -> bool {
280        self.store.has_dict()
281    }
282
283    /// Get raw store blocks for optimized merging
284    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
285        self.store.raw_blocks()
286    }
287
288    /// Get store data slice for raw block access
289    pub fn store_data_slice(&self) -> &LazyFileSlice {
290        self.store.data_slice()
291    }
292
293    /// Get all terms from this segment (for merge)
294    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
295        self.term_dict.all_entries().await.map_err(Error::from)
296    }
297
298    /// Get all terms with parsed field and term string (for statistics aggregation)
299    ///
300    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
301    /// Skips terms that aren't valid UTF-8.
302    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
303        let entries = self.term_dict.all_entries().await?;
304        let mut result = Vec::with_capacity(entries.len());
305
306        for (key, term_info) in entries {
307            // Key format: field_id (4 bytes little-endian) + term bytes
308            if key.len() > 4 {
309                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
310                let term_bytes = &key[4..];
311                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
312                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
313                }
314            }
315        }
316
317        Ok(result)
318    }
319
320    /// Get streaming iterator over term dictionary (for memory-efficient merge)
321    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
322        self.term_dict.iter()
323    }
324
325    /// Read raw posting bytes at offset
326    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
327        let start = offset;
328        let end = start + len as u64;
329        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
330        Ok(bytes.to_vec())
331    }
332
333    /// Search dense vectors using RaBitQ
334    ///
335    /// Returns (doc_id, score) pairs sorted by score (descending).
336    /// The doc_ids are adjusted by doc_id_offset for this segment.
337    /// If mrl_dim is configured, the query vector is automatically trimmed.
338    /// For multi-valued documents, scores are combined using the specified combiner.
339    pub fn search_dense_vector(
340        &self,
341        field: Field,
342        query: &[f32],
343        k: usize,
344        rerank_factor: usize,
345        combiner: crate::query::MultiValueCombiner,
346    ) -> Result<Vec<(DocId, f32)>> {
347        use crate::query::MultiValueCombiner;
348        let index = self
349            .vector_indexes
350            .get(&field.0)
351            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;
352
353        // Get mrl_dim from config to trim query vector if needed
354        let mrl_dim = self
355            .schema
356            .get_field_entry(field)
357            .and_then(|e| e.dense_vector_config.as_ref())
358            .and_then(|c| c.mrl_dim);
359
360        // Trim query vector if mrl_dim is set
361        let query_vec: Vec<f32>;
362        let effective_query = if let Some(trim_dim) = mrl_dim {
363            if trim_dim < query.len() {
364                query_vec = query[..trim_dim].to_vec();
365                query_vec.as_slice()
366            } else {
367                query
368            }
369        } else {
370            query
371        };
372
373        let results: Vec<(u32, f32)> = match index {
374            VectorIndex::Flat(flat_data) => {
375                // Brute-force search over raw vectors using SIMD-accelerated distance
376                use crate::structures::simd::squared_euclidean_distance;
377
378                let mut candidates: Vec<(u32, f32)> = flat_data
379                    .vectors
380                    .iter()
381                    .zip(flat_data.doc_ids.iter())
382                    .map(|(vec, &doc_id)| {
383                        let dist = squared_euclidean_distance(effective_query, vec);
384                        (doc_id, dist)
385                    })
386                    .collect();
387                candidates
388                    .sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
389                candidates.truncate(k);
390                candidates
391            }
392            VectorIndex::RaBitQ(rabitq) => rabitq
393                .search(effective_query, k, rerank_factor)
394                .into_iter()
395                .map(|(idx, dist)| (idx as u32, dist))
396                .collect(),
397            VectorIndex::IVF { index, codebook } => {
398                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
399                    Error::Schema("IVF index requires coarse centroids".to_string())
400                })?;
401                let nprobe = rerank_factor.max(32); // Use rerank_factor as nprobe hint
402                index.search(centroids, codebook, effective_query, k, Some(nprobe))
403            }
404            VectorIndex::ScaNN { index, codebook } => {
405                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
406                    Error::Schema("ScaNN index requires coarse centroids".to_string())
407                })?;
408                let nprobe = rerank_factor.max(32);
409                index.search(centroids, codebook, effective_query, k, Some(nprobe))
410            }
411        };
412
413        // Convert distance to score (smaller distance = higher score)
414        // and adjust doc_ids by segment offset
415        let raw_results: Vec<(DocId, f32)> = results
416            .into_iter()
417            .map(|(idx, dist)| {
418                let doc_id = idx as DocId + self.doc_id_offset;
419                let score = 1.0 / (1.0 + dist); // Convert distance to similarity score
420                (doc_id, score)
421            })
422            .collect();
423
424        // Combine scores for duplicate doc_ids (multi-valued documents)
425        let mut combined: rustc_hash::FxHashMap<DocId, (f32, u32)> =
426            rustc_hash::FxHashMap::default();
427        for (doc_id, score) in raw_results {
428            combined
429                .entry(doc_id)
430                .and_modify(|(acc_score, count)| match combiner {
431                    MultiValueCombiner::Sum => *acc_score += score,
432                    MultiValueCombiner::Max => *acc_score = acc_score.max(score),
433                    MultiValueCombiner::Avg => {
434                        *acc_score += score;
435                        *count += 1;
436                    }
437                })
438                .or_insert((score, 1));
439        }
440
441        // Finalize averages and collect results
442        let mut final_results: Vec<(DocId, f32)> = combined
443            .into_iter()
444            .map(|(doc_id, (score, count))| {
445                let final_score = if combiner == MultiValueCombiner::Avg {
446                    score / count as f32
447                } else {
448                    score
449                };
450                (doc_id, final_score)
451            })
452            .collect();
453
454        // Sort by score descending and take top k
455        final_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
456        final_results.truncate(k);
457
458        Ok(final_results)
459    }
460
461    /// Check if this segment has a dense vector index for the given field
462    pub fn has_dense_vector_index(&self, field: Field) -> bool {
463        self.vector_indexes.contains_key(&field.0)
464    }
465
466    /// Get the dense vector index for a field (if available)
467    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
468        match self.vector_indexes.get(&field.0) {
469            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
470            _ => None,
471        }
472    }
473
474    /// Get the IVF vector index for a field (if available)
475    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
476        match self.vector_indexes.get(&field.0) {
477            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
478            _ => None,
479        }
480    }
481
482    /// Get the ScaNN vector index for a field (if available)
483    pub fn get_scann_vector_index(
484        &self,
485        field: Field,
486    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
487        match self.vector_indexes.get(&field.0) {
488            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
489            _ => None,
490        }
491    }
492
493    /// Get the vector index type for a field
494    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
495        self.vector_indexes.get(&field.0)
496    }
497
498    /// Search for similar sparse vectors using dedicated sparse posting lists
499    ///
500    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
501    /// Optimizations (via WandExecutor):
502    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
503    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
504    /// 3. **Top-K heap**: Efficient score collection
505    ///
506    /// Returns (doc_id, score) pairs sorted by score descending.
507    pub async fn search_sparse_vector(
508        &self,
509        field: Field,
510        vector: &[(u32, f32)],
511        limit: usize,
512        combiner: crate::query::MultiValueCombiner,
513    ) -> Result<Vec<(u32, f32)>> {
514        use crate::query::{MultiValueCombiner, SparseTermScorer, WandExecutor};
515
516        let query_tokens = vector.len();
517
518        // Get sparse index for this field
519        let sparse_index = match self.sparse_indexes.get(&field.0) {
520            Some(idx) => idx,
521            None => {
522                log::debug!(
523                    "Sparse vector search: no index for field {}, returning empty",
524                    field.0
525                );
526                return Ok(Vec::new());
527            }
528        };
529
530        let index_dimensions = sparse_index.postings.len();
531
532        // Build scorers for each dimension that exists in the index
533        let mut matched_tokens = Vec::new();
534        let mut missing_tokens = Vec::new();
535
536        let scorers: Vec<SparseTermScorer> = vector
537            .iter()
538            .filter_map(|&(dim_id, query_weight)| {
539                // Direct indexing: O(1) lookup
540                match sparse_index
541                    .postings
542                    .get(dim_id as usize)
543                    .and_then(|opt| opt.as_ref())
544                {
545                    Some(pl) => {
546                        matched_tokens.push(dim_id);
547                        Some(SparseTermScorer::from_arc(pl, query_weight))
548                    }
549                    None => {
550                        missing_tokens.push(dim_id);
551                        None
552                    }
553                }
554            })
555            .collect();
556
557        log::debug!(
558            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
559            query_tokens,
560            matched_tokens.len(),
561            missing_tokens.len(),
562            index_dimensions
563        );
564
565        if !matched_tokens.is_empty() {
566            log::debug!(
567                "Matched token IDs: {:?}",
568                matched_tokens.iter().take(20).collect::<Vec<_>>()
569            );
570        }
571
572        if !missing_tokens.is_empty() {
573            log::debug!(
574                "Missing token IDs (not in index): {:?}",
575                missing_tokens.iter().take(20).collect::<Vec<_>>()
576            );
577        }
578
579        if scorers.is_empty() {
580            log::debug!("Sparse vector search: no matching tokens, returning empty");
581            return Ok(Vec::new());
582        }
583
584        // Use shared WandExecutor for top-k retrieval
585        // Note: For multi-valued fields, same doc_id may appear multiple times
586        // with different scores that need to be combined
587        let raw_results = WandExecutor::new(scorers, limit * 2).execute(); // Over-fetch for combining
588
589        // Combine scores for duplicate doc_ids based on combiner strategy
590        let mut combined: rustc_hash::FxHashMap<u32, (f32, u32)> = rustc_hash::FxHashMap::default();
591        for r in raw_results {
592            combined
593                .entry(r.doc_id)
594                .and_modify(|(score, count)| match combiner {
595                    MultiValueCombiner::Sum => *score += r.score,
596                    MultiValueCombiner::Max => *score = score.max(r.score),
597                    MultiValueCombiner::Avg => {
598                        *score += r.score;
599                        *count += 1;
600                    }
601                })
602                .or_insert((r.score, 1));
603        }
604
605        // Finalize averages and collect results
606        let mut results: Vec<(u32, f32)> = combined
607            .into_iter()
608            .map(|(doc_id, (score, count))| {
609                let final_score = if combiner == MultiValueCombiner::Avg {
610                    score / count as f32
611                } else {
612                    score
613                };
614                (doc_id, final_score)
615            })
616            .collect();
617
618        // Sort by score descending and take top limit
619        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
620        results.truncate(limit);
621
622        Ok(results)
623    }
624
625    /// Load dense vector indexes from unified .vectors file
626    ///
627    /// Supports RaBitQ (type 0), IVF-RaBitQ (type 1), and ScaNN (type 2).
628    /// Also loads coarse centroids and PQ codebook as needed.
629    ///
630    /// Memory optimization: Uses lazy range reads to load each index separately,
631    /// avoiding loading the entire vectors file into memory at once.
632    async fn load_vectors_file<D: Directory>(
633        dir: &D,
634        files: &SegmentFiles,
635        schema: &Schema,
636    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
637        use byteorder::{LittleEndian, ReadBytesExt};
638        use std::io::Cursor;
639
640        let mut indexes = FxHashMap::default();
641        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;
642
643        // Skip loading vectors file if schema has no dense vector fields
644        let has_dense_vectors = schema
645            .fields()
646            .any(|(_, entry)| entry.dense_vector_config.is_some());
647        if !has_dense_vectors {
648            return Ok((indexes, None));
649        }
650
651        // Try to open vectors file (may not exist if no vectors were indexed)
652        let handle = match dir.open_lazy(&files.vectors).await {
653            Ok(h) => h,
654            Err(_) => return Ok((indexes, None)),
655        };
656
657        // Read only the header first (4 bytes for num_fields)
658        let header_bytes = match handle.read_bytes_range(0..4).await {
659            Ok(b) => b,
660            Err(_) => return Ok((indexes, None)),
661        };
662
663        if header_bytes.is_empty() {
664            return Ok((indexes, None));
665        }
666
667        let mut cursor = Cursor::new(header_bytes.as_slice());
668        let num_fields = cursor.read_u32::<LittleEndian>()?;
669
670        if num_fields == 0 {
671            return Ok((indexes, None));
672        }
673
674        // Read field entries header: (field_id: 4, index_type: 1, offset: 8, length: 8) = 21 bytes per field
675        let entries_size = num_fields as u64 * 21;
676        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
677        let mut cursor = Cursor::new(entries_bytes.as_slice());
678
679        // Read field entries (field_id, index_type, offset, length)
680        let mut entries = Vec::with_capacity(num_fields as usize);
681        for _ in 0..num_fields {
682            let field_id = cursor.read_u32::<LittleEndian>()?;
683            // Try to read index_type - if this fails, assume old format without type
684            let index_type = cursor.read_u8().unwrap_or(255); // 255 = unknown/legacy
685            let offset = cursor.read_u64::<LittleEndian>()?;
686            let length = cursor.read_u64::<LittleEndian>()?;
687            entries.push((field_id, index_type, offset, length));
688        }
689
690        // Load each index on-demand using range reads (memory efficient)
691        for (field_id, index_type, offset, length) in entries {
692            // Read only this index's data
693            let data = handle.read_bytes_range(offset..offset + length).await?;
694            let _field = crate::dsl::Field(field_id);
695
696            match index_type {
697                3 => {
698                    // Flat (brute-force) - raw vectors for accumulating state
699                    if let Ok(flat_data) = serde_json::from_slice::<FlatVectorData>(data.as_slice())
700                    {
701                        indexes.insert(field_id, VectorIndex::Flat(Arc::new(flat_data)));
702                    }
703                }
704                2 => {
705                    // ScaNN (IVF-PQ) with embedded centroids and codebook
706                    use super::vector_data::ScaNNIndexData;
707                    if let Ok(scann_data) = ScaNNIndexData::from_bytes(data.as_slice()) {
708                        coarse_centroids = Some(Arc::new(scann_data.centroids));
709                        indexes.insert(
710                            field_id,
711                            VectorIndex::ScaNN {
712                                index: Arc::new(scann_data.index),
713                                codebook: Arc::new(scann_data.codebook),
714                            },
715                        );
716                    }
717                }
718                1 => {
719                    // IVF-RaBitQ with embedded centroids and codebook
720                    use super::vector_data::IVFRaBitQIndexData;
721                    if let Ok(ivf_data) = IVFRaBitQIndexData::from_bytes(data.as_slice()) {
722                        coarse_centroids = Some(Arc::new(ivf_data.centroids));
723                        indexes.insert(
724                            field_id,
725                            VectorIndex::IVF {
726                                index: Arc::new(ivf_data.index),
727                                codebook: Arc::new(ivf_data.codebook),
728                            },
729                        );
730                    }
731                }
732                0 => {
733                    // RaBitQ (standalone)
734                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
735                    {
736                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
737                    }
738                }
739                _ => {
740                    // Unknown type - try Flat first (most common in new indexes)
741                    if let Ok(flat_data) = serde_json::from_slice::<FlatVectorData>(data.as_slice())
742                    {
743                        indexes.insert(field_id, VectorIndex::Flat(Arc::new(flat_data)));
744                    } else if let Ok(rabitq_index) =
745                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
746                    {
747                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
748                    }
749                }
750            }
751        }
752
753        Ok((indexes, coarse_centroids))
754    }
755
756    /// Load sparse vector indexes from .sparse file
757    ///
758    /// File format (direct-indexed table for O(1) dimension lookup):
759    /// - Header: num_fields (u32)
760    /// - For each field:
761    ///   - field_id (u32)
762    ///   - quantization (u8)
763    ///   - max_dim_id (u32)          ← table size
764    ///   - table: [(offset: u64, length: u32)] × max_dim_id  ← direct indexed
765    ///     (offset=0, length=0 means dimension not present)
766    /// - Data: concatenated serialized BlockSparsePostingList
767    async fn load_sparse_file<D: Directory>(
768        dir: &D,
769        files: &SegmentFiles,
770        total_docs: u32,
771    ) -> Result<FxHashMap<u32, SparseIndex>> {
772        use byteorder::{LittleEndian, ReadBytesExt};
773        use std::io::Cursor;
774
775        let mut indexes = FxHashMap::default();
776
777        // Try to open sparse file (may not exist if no sparse vectors were indexed)
778        let handle = match dir.open_lazy(&files.sparse).await {
779            Ok(h) => h,
780            Err(e) => {
781                log::debug!("No sparse file found ({}): {:?}", files.sparse.display(), e);
782                return Ok(indexes);
783            }
784        };
785
786        // Read the entire file (sparse files are typically small enough)
787        let data = match handle.read_bytes().await {
788            Ok(d) => d,
789            Err(_) => return Ok(indexes),
790        };
791
792        if data.len() < 4 {
793            return Ok(indexes);
794        }
795
796        let mut cursor = Cursor::new(data.as_slice());
797        let num_fields = cursor.read_u32::<LittleEndian>()?;
798
799        log::debug!(
800            "Loading sparse file: size={} bytes, num_fields={}",
801            data.len(),
802            num_fields
803        );
804
805        if num_fields == 0 {
806            return Ok(indexes);
807        }
808
809        // Read field entries and build indexes
810        for _ in 0..num_fields {
811            let field_id = cursor.read_u32::<LittleEndian>()?;
812            let _quantization = cursor.read_u8()?; // Already stored in each BlockSparsePostingList
813            let max_dim_id = cursor.read_u32::<LittleEndian>()?;
814
815            // Read direct-indexed table
816            let mut postings: Vec<Option<Arc<BlockSparsePostingList>>> =
817                vec![None; max_dim_id as usize];
818
819            for dim_id in 0..max_dim_id {
820                let offset = cursor.read_u64::<LittleEndian>()?;
821                let length = cursor.read_u32::<LittleEndian>()?;
822
823                // offset=0, length=0 means dimension not present
824                if length > 0 {
825                    let start = offset as usize;
826                    let end = start + length as usize;
827                    if end <= data.len() {
828                        let posting_data = &data.as_slice()[start..end];
829                        if let Ok(posting_list) =
830                            BlockSparsePostingList::deserialize(&mut Cursor::new(posting_data))
831                        {
832                            postings[dim_id as usize] = Some(Arc::new(posting_list));
833                        }
834                    }
835                }
836            }
837
838            let num_postings = postings.iter().filter(|p| p.is_some()).count();
839            log::debug!(
840                "Loaded sparse index for field {}: max_dim={}, active_postings={}",
841                field_id,
842                max_dim_id,
843                num_postings
844            );
845
846            indexes.insert(
847                field_id,
848                SparseIndex {
849                    postings,
850                    total_docs,
851                },
852            );
853        }
854
855        log::debug!(
856            "Sparse file loaded: fields={:?}",
857            indexes.keys().collect::<Vec<_>>()
858        );
859
860        Ok(indexes)
861    }
862
863    /// Load position index header from .pos file
864    ///
865    /// File format:
866    /// Open positions file handle (no header parsing needed - offsets are in TermInfo)
867    async fn open_positions_file<D: Directory>(
868        dir: &D,
869        files: &SegmentFiles,
870    ) -> Result<Option<LazyFileHandle>> {
871        // Try to open positions file (may not exist if no positions were indexed)
872        match dir.open_lazy(&files.positions).await {
873            Ok(h) => Ok(Some(h)),
874            Err(_) => Ok(None),
875        }
876    }
877
878    /// Get positions for a term (for phrase queries)
879    ///
880    /// Position offsets are now embedded in TermInfo, so we first look up
881    /// the term to get its TermInfo, then use position_info() to get the offset.
882    pub async fn get_positions(
883        &self,
884        field: Field,
885        term: &[u8],
886    ) -> Result<Option<crate::structures::PositionPostingList>> {
887        use std::io::Cursor;
888
889        // Get positions handle
890        let handle = match &self.positions_handle {
891            Some(h) => h,
892            None => return Ok(None),
893        };
894
895        // Build key: field_id + term
896        let mut key = Vec::with_capacity(4 + term.len());
897        key.extend_from_slice(&field.0.to_le_bytes());
898        key.extend_from_slice(term);
899
900        // Look up term in dictionary to get TermInfo with position offset
901        let term_info = match self.term_dict.get(&key).await? {
902            Some(info) => info,
903            None => return Ok(None),
904        };
905
906        // Get position offset from TermInfo
907        let (offset, length) = match term_info.position_info() {
908            Some((o, l)) => (o, l),
909            None => return Ok(None),
910        };
911
912        // Read the position data
913        let slice = handle.slice(offset..offset + length as u64);
914        let data = slice.read_bytes().await?;
915
916        // Deserialize
917        let mut cursor = Cursor::new(data.as_slice());
918        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;
919
920        Ok(Some(pos_list))
921    }
922
923    /// Check if positions are available for a field
924    pub fn has_positions(&self, field: Field) -> bool {
925        // Check schema for position mode on this field
926        if let Some(entry) = self.schema.get_field_entry(field) {
927            entry.positions.is_some()
928        } else {
929            false
930        }
931    }
932}
933
/// Alias for [`AsyncSegmentReader`].
///
/// Both names refer to the same type, so the segment reader can be used under
/// either name.
pub type SegmentReader = AsyncSegmentReader;