Skip to main content

hermes_core/segment/
reader.rs

1//! Async segment reader with lazy loading
2
3use std::sync::Arc;
4
5use rustc_hash::FxHashMap;
6
7use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
8use crate::dsl::{Document, Field, Schema};
9use crate::structures::{
10    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
11    IVFRaBitQIndex, PQCodebook, RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
12};
13use crate::{DocId, Error, Result};
14
15use super::store::{AsyncStoreReader, RawStoreBlock};
16use super::types::{SegmentFiles, SegmentId, SegmentMeta};
17
18/// Vector index type - RaBitQ, IVF-RaBitQ, or ScaNN (IVF-PQ)
19#[derive(Clone)]
20#[allow(clippy::upper_case_acronyms)]
21pub enum VectorIndex {
22    /// RaBitQ - binary quantization, good for small datasets
23    RaBitQ(Arc<RaBitQIndex>),
24    /// IVF-RaBitQ - inverted file with RaBitQ, good for medium datasets
25    IVF {
26        index: Arc<IVFRaBitQIndex>,
27        codebook: Arc<RaBitQCodebook>,
28    },
29    /// ScaNN (IVF-PQ) - product quantization with OPQ, best for large datasets
30    ScaNN {
31        index: Arc<IVFPQIndex>,
32        codebook: Arc<PQCodebook>,
33    },
34}
35
36/// Sparse vector index for a field: direct-indexed by dimension ID
37#[derive(Clone)]
38pub struct SparseIndex {
39    /// Posting lists indexed directly by dimension ID (O(1) lookup)
40    /// None means dimension not present in index
41    pub postings: Vec<Option<Arc<BlockSparsePostingList>>>,
42    /// Total document count in this segment (for IDF computation)
43    pub total_docs: u32,
44}
45
46impl SparseIndex {
47    /// Compute IDF (inverse document frequency) for a dimension
48    ///
49    /// IDF = log(N / df) where N = total docs, df = docs containing dimension
50    /// Returns 0.0 if dimension not present
51    #[inline]
52    pub fn idf(&self, dim_id: u32) -> f32 {
53        if let Some(Some(pl)) = self.postings.get(dim_id as usize) {
54            let df = pl.doc_count() as f32;
55            if df > 0.0 {
56                (self.total_docs as f32 / df).ln()
57            } else {
58                0.0
59            }
60        } else {
61            0.0
62        }
63    }
64
65    /// Get IDF weights for multiple dimensions
66    pub fn idf_weights(&self, dim_ids: &[u32]) -> Vec<f32> {
67        dim_ids.iter().map(|&d| self.idf(d)).collect()
68    }
69}
70
71/// Async segment reader with lazy loading
72///
73/// - Term dictionary: only index loaded, blocks loaded on-demand
74/// - Postings: loaded on-demand per term via HTTP range requests
75/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
76pub struct AsyncSegmentReader {
77    meta: SegmentMeta,
78    /// Term dictionary with lazy block loading
79    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
80    /// Postings file handle - fetches ranges on demand
81    postings_handle: LazyFileHandle,
82    /// Document store with lazy block loading
83    store: Arc<AsyncStoreReader>,
84    schema: Arc<Schema>,
85    /// Base doc_id offset for this segment
86    doc_id_offset: DocId,
87    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ)
88    vector_indexes: FxHashMap<u32, VectorIndex>,
89    /// Shared coarse centroids for IVF search (loaded once)
90    coarse_centroids: Option<Arc<CoarseCentroids>>,
91    /// Sparse vector indexes per field
92    sparse_indexes: FxHashMap<u32, SparseIndex>,
93    /// Position file handle for phrase queries (lazy loading)
94    positions_handle: Option<LazyFileHandle>,
95}
96
97impl AsyncSegmentReader {
98    /// Open a segment with lazy loading
99    pub async fn open<D: Directory>(
100        dir: &D,
101        segment_id: SegmentId,
102        schema: Arc<Schema>,
103        doc_id_offset: DocId,
104        cache_blocks: usize,
105    ) -> Result<Self> {
106        let files = SegmentFiles::new(segment_id.0);
107
108        // Read metadata (small, always loaded)
109        let meta_slice = dir.open_read(&files.meta).await?;
110        let meta_bytes = meta_slice.read_bytes().await?;
111        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
112        debug_assert_eq!(meta.id, segment_id.0);
113
114        // Open term dictionary with lazy loading (fetches ranges on demand)
115        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
116        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
117
118        // Get postings file handle (lazy - fetches ranges on demand)
119        let postings_handle = dir.open_lazy(&files.postings).await?;
120
121        // Open store with lazy loading
122        let store_handle = dir.open_lazy(&files.store).await?;
123        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
124
125        // Load dense vector indexes from unified .vectors file
126        let (vector_indexes, coarse_centroids) =
127            Self::load_vectors_file(dir, &files, &schema).await?;
128
129        // Load sparse vector indexes from .sparse file
130        let sparse_indexes = Self::load_sparse_file(dir, &files, meta.num_docs).await?;
131
132        // Open positions file handle (if exists) - offsets are now in TermInfo
133        let positions_handle = Self::open_positions_file(dir, &files).await?;
134
135        Ok(Self {
136            meta,
137            term_dict: Arc::new(term_dict),
138            postings_handle,
139            store: Arc::new(store),
140            schema,
141            doc_id_offset,
142            vector_indexes,
143            coarse_centroids,
144            sparse_indexes,
145            positions_handle,
146        })
147    }
148
149    pub fn meta(&self) -> &SegmentMeta {
150        &self.meta
151    }
152
153    pub fn num_docs(&self) -> u32 {
154        self.meta.num_docs
155    }
156
157    /// Get average field length for BM25F scoring
158    pub fn avg_field_len(&self, field: Field) -> f32 {
159        self.meta.avg_field_len(field)
160    }
161
162    pub fn doc_id_offset(&self) -> DocId {
163        self.doc_id_offset
164    }
165
166    pub fn schema(&self) -> &Schema {
167        &self.schema
168    }
169
170    /// Get sparse indexes for all fields
171    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
172        &self.sparse_indexes
173    }
174
175    /// Get term dictionary stats for debugging
176    pub fn term_dict_stats(&self) -> SSTableStats {
177        self.term_dict.stats()
178    }
179
180    /// Get posting list for a term (async - loads on demand)
181    ///
182    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
183    /// and no additional I/O is needed. For larger lists, reads from .post file.
184    pub async fn get_postings(
185        &self,
186        field: Field,
187        term: &[u8],
188    ) -> Result<Option<BlockPostingList>> {
189        log::debug!(
190            "SegmentReader::get_postings field={} term_len={}",
191            field.0,
192            term.len()
193        );
194
195        // Build key: field_id + term
196        let mut key = Vec::with_capacity(4 + term.len());
197        key.extend_from_slice(&field.0.to_le_bytes());
198        key.extend_from_slice(term);
199
200        // Look up in term dictionary
201        let term_info = match self.term_dict.get(&key).await? {
202            Some(info) => {
203                log::debug!("SegmentReader::get_postings found term_info");
204                info
205            }
206            None => {
207                log::debug!("SegmentReader::get_postings term not found");
208                return Ok(None);
209            }
210        };
211
212        // Check if posting list is inlined
213        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
214            // Build BlockPostingList from inline data (no I/O needed!)
215            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
216            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
217                posting_list.push(doc_id, tf);
218            }
219            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
220            return Ok(Some(block_list));
221        }
222
223        // External posting list - read from postings file handle (lazy - HTTP range request)
224        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
225            Error::Corruption("TermInfo has neither inline nor external data".to_string())
226        })?;
227
228        let start = posting_offset;
229        let end = start + posting_len as u64;
230
231        if end > self.postings_handle.len() {
232            return Err(Error::Corruption(
233                "Posting offset out of bounds".to_string(),
234            ));
235        }
236
237        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
238        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
239
240        Ok(Some(block_list))
241    }
242
243    /// Get document by local doc_id (async - loads on demand)
244    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
245        self.store
246            .get(local_doc_id, &self.schema)
247            .await
248            .map_err(Error::from)
249    }
250
251    /// Prefetch term dictionary blocks for a key range
252    pub async fn prefetch_terms(
253        &self,
254        field: Field,
255        start_term: &[u8],
256        end_term: &[u8],
257    ) -> Result<()> {
258        let mut start_key = Vec::with_capacity(4 + start_term.len());
259        start_key.extend_from_slice(&field.0.to_le_bytes());
260        start_key.extend_from_slice(start_term);
261
262        let mut end_key = Vec::with_capacity(4 + end_term.len());
263        end_key.extend_from_slice(&field.0.to_le_bytes());
264        end_key.extend_from_slice(end_term);
265
266        self.term_dict.prefetch_range(&start_key, &end_key).await?;
267        Ok(())
268    }
269
270    /// Check if store uses dictionary compression (incompatible with raw merging)
271    pub fn store_has_dict(&self) -> bool {
272        self.store.has_dict()
273    }
274
275    /// Get raw store blocks for optimized merging
276    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
277        self.store.raw_blocks()
278    }
279
280    /// Get store data slice for raw block access
281    pub fn store_data_slice(&self) -> &LazyFileSlice {
282        self.store.data_slice()
283    }
284
285    /// Get all terms from this segment (for merge)
286    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
287        self.term_dict.all_entries().await.map_err(Error::from)
288    }
289
290    /// Get all terms with parsed field and term string (for statistics aggregation)
291    ///
292    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
293    /// Skips terms that aren't valid UTF-8.
294    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
295        let entries = self.term_dict.all_entries().await?;
296        let mut result = Vec::with_capacity(entries.len());
297
298        for (key, term_info) in entries {
299            // Key format: field_id (4 bytes little-endian) + term bytes
300            if key.len() > 4 {
301                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
302                let term_bytes = &key[4..];
303                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
304                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
305                }
306            }
307        }
308
309        Ok(result)
310    }
311
312    /// Get streaming iterator over term dictionary (for memory-efficient merge)
313    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
314        self.term_dict.iter()
315    }
316
317    /// Read raw posting bytes at offset
318    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
319        let start = offset;
320        let end = start + len as u64;
321        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
322        Ok(bytes.to_vec())
323    }
324
325    /// Search dense vectors using RaBitQ
326    ///
327    /// Returns (doc_id, score) pairs sorted by score (descending).
328    /// The doc_ids are adjusted by doc_id_offset for this segment.
329    /// If mrl_dim is configured, the query vector is automatically trimmed.
330    /// For multi-valued documents, scores are combined using the specified combiner.
331    pub fn search_dense_vector(
332        &self,
333        field: Field,
334        query: &[f32],
335        k: usize,
336        rerank_factor: usize,
337        combiner: crate::query::MultiValueCombiner,
338    ) -> Result<Vec<(DocId, f32)>> {
339        use crate::query::MultiValueCombiner;
340        let index = self
341            .vector_indexes
342            .get(&field.0)
343            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;
344
345        // Get mrl_dim from config to trim query vector if needed
346        let mrl_dim = self
347            .schema
348            .get_field_entry(field)
349            .and_then(|e| e.dense_vector_config.as_ref())
350            .and_then(|c| c.mrl_dim);
351
352        // Trim query vector if mrl_dim is set
353        let query_vec: Vec<f32>;
354        let effective_query = if let Some(trim_dim) = mrl_dim {
355            if trim_dim < query.len() {
356                query_vec = query[..trim_dim].to_vec();
357                query_vec.as_slice()
358            } else {
359                query
360            }
361        } else {
362            query
363        };
364
365        let results: Vec<(u32, f32)> = match index {
366            VectorIndex::RaBitQ(rabitq) => rabitq
367                .search(effective_query, k, rerank_factor)
368                .into_iter()
369                .map(|(idx, dist)| (idx as u32, dist))
370                .collect(),
371            VectorIndex::IVF { index, codebook } => {
372                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
373                    Error::Schema("IVF index requires coarse centroids".to_string())
374                })?;
375                let nprobe = rerank_factor.max(32); // Use rerank_factor as nprobe hint
376                index.search(centroids, codebook, effective_query, k, Some(nprobe))
377            }
378            VectorIndex::ScaNN { index, codebook } => {
379                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
380                    Error::Schema("ScaNN index requires coarse centroids".to_string())
381                })?;
382                let nprobe = rerank_factor.max(32);
383                index.search(centroids, codebook, effective_query, k, Some(nprobe))
384            }
385        };
386
387        // Convert distance to score (smaller distance = higher score)
388        // and adjust doc_ids by segment offset
389        let raw_results: Vec<(DocId, f32)> = results
390            .into_iter()
391            .map(|(idx, dist)| {
392                let doc_id = idx as DocId + self.doc_id_offset;
393                let score = 1.0 / (1.0 + dist); // Convert distance to similarity score
394                (doc_id, score)
395            })
396            .collect();
397
398        // Combine scores for duplicate doc_ids (multi-valued documents)
399        let mut combined: rustc_hash::FxHashMap<DocId, (f32, u32)> =
400            rustc_hash::FxHashMap::default();
401        for (doc_id, score) in raw_results {
402            combined
403                .entry(doc_id)
404                .and_modify(|(acc_score, count)| match combiner {
405                    MultiValueCombiner::Sum => *acc_score += score,
406                    MultiValueCombiner::Max => *acc_score = acc_score.max(score),
407                    MultiValueCombiner::Avg => {
408                        *acc_score += score;
409                        *count += 1;
410                    }
411                })
412                .or_insert((score, 1));
413        }
414
415        // Finalize averages and collect results
416        let mut final_results: Vec<(DocId, f32)> = combined
417            .into_iter()
418            .map(|(doc_id, (score, count))| {
419                let final_score = if combiner == MultiValueCombiner::Avg {
420                    score / count as f32
421                } else {
422                    score
423                };
424                (doc_id, final_score)
425            })
426            .collect();
427
428        // Sort by score descending and take top k
429        final_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
430        final_results.truncate(k);
431
432        Ok(final_results)
433    }
434
435    /// Check if this segment has a dense vector index for the given field
436    pub fn has_dense_vector_index(&self, field: Field) -> bool {
437        self.vector_indexes.contains_key(&field.0)
438    }
439
440    /// Get the dense vector index for a field (if available)
441    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
442        match self.vector_indexes.get(&field.0) {
443            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
444            _ => None,
445        }
446    }
447
448    /// Get the IVF vector index for a field (if available)
449    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
450        match self.vector_indexes.get(&field.0) {
451            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
452            _ => None,
453        }
454    }
455
456    /// Get the ScaNN vector index for a field (if available)
457    pub fn get_scann_vector_index(
458        &self,
459        field: Field,
460    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
461        match self.vector_indexes.get(&field.0) {
462            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
463            _ => None,
464        }
465    }
466
467    /// Get the vector index type for a field
468    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
469        self.vector_indexes.get(&field.0)
470    }
471
472    /// Search for similar sparse vectors using dedicated sparse posting lists
473    ///
474    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
475    /// Optimizations (via WandExecutor):
476    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
477    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
478    /// 3. **Top-K heap**: Efficient score collection
479    ///
480    /// Returns (doc_id, score) pairs sorted by score descending.
481    pub async fn search_sparse_vector(
482        &self,
483        field: Field,
484        vector: &[(u32, f32)],
485        limit: usize,
486        combiner: crate::query::MultiValueCombiner,
487    ) -> Result<Vec<(u32, f32)>> {
488        use crate::query::{MultiValueCombiner, SparseTermScorer, WandExecutor};
489
490        // Get sparse index for this field
491        let sparse_index = match self.sparse_indexes.get(&field.0) {
492            Some(idx) => idx,
493            None => return Ok(Vec::new()),
494        };
495
496        // Build scorers for each dimension that exists in the index
497        let scorers: Vec<SparseTermScorer> = vector
498            .iter()
499            .filter_map(|&(dim_id, query_weight)| {
500                // Direct indexing: O(1) lookup
501                sparse_index
502                    .postings
503                    .get(dim_id as usize)
504                    .and_then(|opt| opt.as_ref())
505                    .map(|pl| SparseTermScorer::from_arc(pl, query_weight))
506            })
507            .collect();
508
509        if scorers.is_empty() {
510            return Ok(Vec::new());
511        }
512
513        // Use shared WandExecutor for top-k retrieval
514        // Note: For multi-valued fields, same doc_id may appear multiple times
515        // with different scores that need to be combined
516        let raw_results = WandExecutor::new(scorers, limit * 2).execute(); // Over-fetch for combining
517
518        // Combine scores for duplicate doc_ids based on combiner strategy
519        let mut combined: rustc_hash::FxHashMap<u32, (f32, u32)> = rustc_hash::FxHashMap::default();
520        for r in raw_results {
521            combined
522                .entry(r.doc_id)
523                .and_modify(|(score, count)| match combiner {
524                    MultiValueCombiner::Sum => *score += r.score,
525                    MultiValueCombiner::Max => *score = score.max(r.score),
526                    MultiValueCombiner::Avg => {
527                        *score += r.score;
528                        *count += 1;
529                    }
530                })
531                .or_insert((r.score, 1));
532        }
533
534        // Finalize averages and collect results
535        let mut results: Vec<(u32, f32)> = combined
536            .into_iter()
537            .map(|(doc_id, (score, count))| {
538                let final_score = if combiner == MultiValueCombiner::Avg {
539                    score / count as f32
540                } else {
541                    score
542                };
543                (doc_id, final_score)
544            })
545            .collect();
546
547        // Sort by score descending and take top limit
548        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
549        results.truncate(limit);
550
551        Ok(results)
552    }
553
554    /// Load dense vector indexes from unified .vectors file
555    ///
556    /// Supports RaBitQ (type 0), IVF-RaBitQ (type 1), and ScaNN (type 2).
557    /// Also loads coarse centroids and PQ codebook as needed.
558    ///
559    /// Memory optimization: Uses lazy range reads to load each index separately,
560    /// avoiding loading the entire vectors file into memory at once.
561    async fn load_vectors_file<D: Directory>(
562        dir: &D,
563        files: &SegmentFiles,
564        schema: &Schema,
565    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
566        use byteorder::{LittleEndian, ReadBytesExt};
567        use std::io::Cursor;
568
569        let mut indexes = FxHashMap::default();
570        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;
571
572        // Skip loading vectors file if schema has no dense vector fields
573        let has_dense_vectors = schema
574            .fields()
575            .any(|(_, entry)| entry.dense_vector_config.is_some());
576        if !has_dense_vectors {
577            return Ok((indexes, None));
578        }
579
580        // Try to open vectors file (may not exist if no vectors were indexed)
581        let handle = match dir.open_lazy(&files.vectors).await {
582            Ok(h) => h,
583            Err(_) => return Ok((indexes, None)),
584        };
585
586        // Read only the header first (4 bytes for num_fields)
587        let header_bytes = match handle.read_bytes_range(0..4).await {
588            Ok(b) => b,
589            Err(_) => return Ok((indexes, None)),
590        };
591
592        if header_bytes.is_empty() {
593            return Ok((indexes, None));
594        }
595
596        let mut cursor = Cursor::new(header_bytes.as_slice());
597        let num_fields = cursor.read_u32::<LittleEndian>()?;
598
599        if num_fields == 0 {
600            return Ok((indexes, None));
601        }
602
603        // Read field entries header: (field_id: 4, index_type: 1, offset: 8, length: 8) = 21 bytes per field
604        let entries_size = num_fields as u64 * 21;
605        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
606        let mut cursor = Cursor::new(entries_bytes.as_slice());
607
608        // Read field entries (field_id, index_type, offset, length)
609        let mut entries = Vec::with_capacity(num_fields as usize);
610        for _ in 0..num_fields {
611            let field_id = cursor.read_u32::<LittleEndian>()?;
612            // Try to read index_type - if this fails, assume old format without type
613            let index_type = cursor.read_u8().unwrap_or(255); // 255 = unknown/legacy
614            let offset = cursor.read_u64::<LittleEndian>()?;
615            let length = cursor.read_u64::<LittleEndian>()?;
616            entries.push((field_id, index_type, offset, length));
617        }
618
619        // Load each index on-demand using range reads (memory efficient)
620        for (field_id, index_type, offset, length) in entries {
621            // Read only this index's data
622            let data = handle.read_bytes_range(offset..offset + length).await?;
623            let field = crate::dsl::Field(field_id);
624
625            match index_type {
626                2 => {
627                    // ScaNN (IVF-PQ)
628                    if let Ok(ivfpq_index) = IVFPQIndex::from_bytes(data.as_slice()) {
629                        // Load coarse centroids if not already loaded
630                        if coarse_centroids.is_none()
631                            && let Some(entry) = schema.get_field_entry(field)
632                            && let Some(ref config) = entry.dense_vector_config
633                            && let Some(ref path) = config.coarse_centroids_path
634                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
635                        {
636                            coarse_centroids = Some(Arc::new(c));
637                        }
638
639                        // Load PQ codebook
640                        if let Some(entry) = schema.get_field_entry(field)
641                            && let Some(ref config) = entry.dense_vector_config
642                            && let Some(ref path) = config.pq_codebook_path
643                            && let Ok(codebook) = PQCodebook::load(std::path::Path::new(path))
644                        {
645                            indexes.insert(
646                                field_id,
647                                VectorIndex::ScaNN {
648                                    index: Arc::new(ivfpq_index),
649                                    codebook: Arc::new(codebook),
650                                },
651                            );
652                        }
653                    }
654                }
655                1 => {
656                    // IVF-RaBitQ
657                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
658                    {
659                        if coarse_centroids.is_none()
660                            && let Some(entry) = schema.get_field_entry(field)
661                            && let Some(ref config) = entry.dense_vector_config
662                            && let Some(ref path) = config.coarse_centroids_path
663                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
664                        {
665                            coarse_centroids = Some(Arc::new(c));
666                        }
667                        // Create a codebook for the IVF index
668                        let codebook = Arc::new(RaBitQCodebook::new(
669                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
670                        ));
671                        indexes.insert(
672                            field_id,
673                            VectorIndex::IVF {
674                                index: Arc::new(ivf_index),
675                                codebook,
676                            },
677                        );
678                    }
679                }
680                0 => {
681                    // RaBitQ
682                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
683                    {
684                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
685                    }
686                }
687                _ => {
688                    // Legacy format without type - try to auto-detect
689                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
690                    {
691                        if coarse_centroids.is_none()
692                            && let Some(entry) = schema.get_field_entry(field)
693                            && let Some(ref config) = entry.dense_vector_config
694                            && let Some(ref path) = config.coarse_centroids_path
695                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
696                        {
697                            coarse_centroids = Some(Arc::new(c));
698                        }
699                        // Create a codebook for the IVF index
700                        let codebook = Arc::new(RaBitQCodebook::new(
701                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
702                        ));
703                        indexes.insert(
704                            field_id,
705                            VectorIndex::IVF {
706                                index: Arc::new(ivf_index),
707                                codebook,
708                            },
709                        );
710                    } else if let Ok(rabitq_index) =
711                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
712                    {
713                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
714                    }
715                }
716            }
717        }
718
719        Ok((indexes, coarse_centroids))
720    }
721
722    /// Load sparse vector indexes from .sparse file
723    ///
724    /// File format (direct-indexed table for O(1) dimension lookup):
725    /// - Header: num_fields (u32)
726    /// - For each field:
727    ///   - field_id (u32)
728    ///   - quantization (u8)
729    ///   - max_dim_id (u32)          ← table size
730    ///   - table: [(offset: u64, length: u32)] × max_dim_id  ← direct indexed
731    ///     (offset=0, length=0 means dimension not present)
732    /// - Data: concatenated serialized BlockSparsePostingList
733    async fn load_sparse_file<D: Directory>(
734        dir: &D,
735        files: &SegmentFiles,
736        total_docs: u32,
737    ) -> Result<FxHashMap<u32, SparseIndex>> {
738        use byteorder::{LittleEndian, ReadBytesExt};
739        use std::io::Cursor;
740
741        let mut indexes = FxHashMap::default();
742
743        // Try to open sparse file (may not exist if no sparse vectors were indexed)
744        let handle = match dir.open_lazy(&files.sparse).await {
745            Ok(h) => h,
746            Err(_) => return Ok(indexes),
747        };
748
749        // Read the entire file (sparse files are typically small enough)
750        let data = match handle.read_bytes().await {
751            Ok(d) => d,
752            Err(_) => return Ok(indexes),
753        };
754
755        if data.len() < 4 {
756            return Ok(indexes);
757        }
758
759        let mut cursor = Cursor::new(data.as_slice());
760        let num_fields = cursor.read_u32::<LittleEndian>()?;
761
762        if num_fields == 0 {
763            return Ok(indexes);
764        }
765
766        // Read field entries and build indexes
767        for _ in 0..num_fields {
768            let field_id = cursor.read_u32::<LittleEndian>()?;
769            let _quantization = cursor.read_u8()?; // Already stored in each BlockSparsePostingList
770            let max_dim_id = cursor.read_u32::<LittleEndian>()?;
771
772            // Read direct-indexed table
773            let mut postings: Vec<Option<Arc<BlockSparsePostingList>>> =
774                vec![None; max_dim_id as usize];
775
776            for dim_id in 0..max_dim_id {
777                let offset = cursor.read_u64::<LittleEndian>()?;
778                let length = cursor.read_u32::<LittleEndian>()?;
779
780                // offset=0, length=0 means dimension not present
781                if length > 0 {
782                    let start = offset as usize;
783                    let end = start + length as usize;
784                    if end <= data.len() {
785                        let posting_data = &data.as_slice()[start..end];
786                        if let Ok(posting_list) =
787                            BlockSparsePostingList::deserialize(&mut Cursor::new(posting_data))
788                        {
789                            postings[dim_id as usize] = Some(Arc::new(posting_list));
790                        }
791                    }
792                }
793            }
794
795            indexes.insert(
796                field_id,
797                SparseIndex {
798                    postings,
799                    total_docs,
800                },
801            );
802        }
803
804        Ok(indexes)
805    }
806
807    /// Load position index header from .pos file
808    ///
809    /// File format:
810    /// Open positions file handle (no header parsing needed - offsets are in TermInfo)
811    async fn open_positions_file<D: Directory>(
812        dir: &D,
813        files: &SegmentFiles,
814    ) -> Result<Option<LazyFileHandle>> {
815        // Try to open positions file (may not exist if no positions were indexed)
816        match dir.open_lazy(&files.positions).await {
817            Ok(h) => Ok(Some(h)),
818            Err(_) => Ok(None),
819        }
820    }
821
822    /// Get positions for a term (for phrase queries)
823    ///
824    /// Position offsets are now embedded in TermInfo, so we first look up
825    /// the term to get its TermInfo, then use position_info() to get the offset.
826    pub async fn get_positions(
827        &self,
828        field: Field,
829        term: &[u8],
830    ) -> Result<Option<crate::structures::PositionPostingList>> {
831        use std::io::Cursor;
832
833        // Get positions handle
834        let handle = match &self.positions_handle {
835            Some(h) => h,
836            None => return Ok(None),
837        };
838
839        // Build key: field_id + term
840        let mut key = Vec::with_capacity(4 + term.len());
841        key.extend_from_slice(&field.0.to_le_bytes());
842        key.extend_from_slice(term);
843
844        // Look up term in dictionary to get TermInfo with position offset
845        let term_info = match self.term_dict.get(&key).await? {
846            Some(info) => info,
847            None => return Ok(None),
848        };
849
850        // Get position offset from TermInfo
851        let (offset, length) = match term_info.position_info() {
852            Some((o, l)) => (o, l),
853            None => return Ok(None),
854        };
855
856        // Read the position data
857        let slice = handle.slice(offset..offset + length as u64);
858        let data = slice.read_bytes().await?;
859
860        // Deserialize
861        let mut cursor = Cursor::new(data.as_slice());
862        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;
863
864        Ok(Some(pos_list))
865    }
866
867    /// Check if positions are available for a field
868    pub fn has_positions(&self, field: Field) -> bool {
869        // Check schema for position mode on this field
870        if let Some(entry) = self.schema.get_field_entry(field) {
871            entry.positions.is_some()
872        } else {
873            false
874        }
875    }
876}
877
878/// Alias for [`AsyncSegmentReader`]: `SegmentReader` names the very same type, not a separate wrapper.
879pub type SegmentReader = AsyncSegmentReader;