Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
7
8/// Memory statistics for a single segment
9#[derive(Debug, Clone, Default)]
10pub struct SegmentMemoryStats {
11    /// Segment ID
12    pub segment_id: u128,
13    /// Number of documents in segment
14    pub num_docs: u32,
15    /// Term dictionary block cache bytes
16    pub term_dict_cache_bytes: usize,
17    /// Document store block cache bytes
18    pub store_cache_bytes: usize,
19    /// Sparse vector index bytes (in-memory posting lists)
20    pub sparse_index_bytes: usize,
21    /// Dense vector index bytes (cluster assignments, quantized codes)
22    pub dense_index_bytes: usize,
23    /// Bloom filter bytes
24    pub bloom_filter_bytes: usize,
25}
26
27impl SegmentMemoryStats {
28    /// Total estimated memory for this segment
29    pub fn total_bytes(&self) -> usize {
30        self.term_dict_cache_bytes
31            + self.store_cache_bytes
32            + self.sparse_index_bytes
33            + self.dense_index_bytes
34            + self.bloom_filter_bytes
35    }
36}
37
38use crate::structures::BlockSparsePostingList;
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56/// Async segment reader with lazy loading
57///
58/// - Term dictionary: only index loaded, blocks loaded on-demand
59/// - Postings: loaded on-demand per term via HTTP range requests
60/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
61pub struct AsyncSegmentReader {
62    meta: SegmentMeta,
63    /// Term dictionary with lazy block loading
64    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
65    /// Postings file handle - fetches ranges on demand
66    postings_handle: LazyFileHandle,
67    /// Document store with lazy block loading
68    store: Arc<AsyncStoreReader>,
69    schema: Arc<Schema>,
70    /// Base doc_id offset for this segment
71    doc_id_offset: DocId,
72    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
73    vector_indexes: FxHashMap<u32, VectorIndex>,
74    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
75    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
76    /// Shared coarse centroids for IVF search (loaded once)
77    coarse_centroids: Option<Arc<CoarseCentroids>>,
78    /// Sparse vector indexes per field
79    sparse_indexes: FxHashMap<u32, SparseIndex>,
80    /// Position file handle for phrase queries (lazy loading)
81    positions_handle: Option<LazyFileHandle>,
82}
83
84impl AsyncSegmentReader {
85    /// Open a segment with lazy loading
86    pub async fn open<D: Directory>(
87        dir: &D,
88        segment_id: SegmentId,
89        schema: Arc<Schema>,
90        doc_id_offset: DocId,
91        cache_blocks: usize,
92    ) -> Result<Self> {
93        let files = SegmentFiles::new(segment_id.0);
94
95        // Read metadata (small, always loaded)
96        let meta_slice = dir.open_read(&files.meta).await?;
97        let meta_bytes = meta_slice.read_bytes().await?;
98        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
99        debug_assert_eq!(meta.id, segment_id.0);
100
101        // Open term dictionary with lazy loading (fetches ranges on demand)
102        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
103        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
104
105        // Get postings file handle (lazy - fetches ranges on demand)
106        let postings_handle = dir.open_lazy(&files.postings).await?;
107
108        // Open store with lazy loading
109        let store_handle = dir.open_lazy(&files.store).await?;
110        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
111
112        // Load dense vector indexes from unified .vectors file
113        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
114        let vector_indexes = vectors_data.indexes;
115        let flat_vectors = vectors_data.flat_vectors;
116        let coarse_centroids = vectors_data.coarse_centroids;
117
118        // Load sparse vector indexes from .sparse file
119        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
120
121        // Open positions file handle (if exists) - offsets are now in TermInfo
122        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
123
124        // Log segment loading stats (compact format: ~24 bytes per active dim in hashmap)
125        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
126        let sparse_mem = sparse_dims * 24; // HashMap entry overhead
127        log::debug!(
128            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, vectors={}",
129            segment_id.0,
130            meta.num_docs,
131            sparse_dims,
132            sparse_mem as f64 / 1024.0,
133            vector_indexes.len()
134        );
135
136        Ok(Self {
137            meta,
138            term_dict: Arc::new(term_dict),
139            postings_handle,
140            store: Arc::new(store),
141            schema,
142            doc_id_offset,
143            vector_indexes,
144            flat_vectors,
145            coarse_centroids,
146            sparse_indexes,
147            positions_handle,
148        })
149    }
150
151    pub fn meta(&self) -> &SegmentMeta {
152        &self.meta
153    }
154
155    pub fn num_docs(&self) -> u32 {
156        self.meta.num_docs
157    }
158
159    /// Get average field length for BM25F scoring
160    pub fn avg_field_len(&self, field: Field) -> f32 {
161        self.meta.avg_field_len(field)
162    }
163
164    pub fn doc_id_offset(&self) -> DocId {
165        self.doc_id_offset
166    }
167
168    /// Set the doc_id_offset (used for parallel segment loading)
169    pub fn set_doc_id_offset(&mut self, offset: DocId) {
170        self.doc_id_offset = offset;
171    }
172
173    pub fn schema(&self) -> &Schema {
174        &self.schema
175    }
176
177    /// Get sparse indexes for all fields
178    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
179        &self.sparse_indexes
180    }
181
182    /// Get vector indexes for all fields
183    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
184        &self.vector_indexes
185    }
186
187    /// Get lazy flat vectors for all fields (for reranking and merge)
188    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
189        &self.flat_vectors
190    }
191
192    /// Get term dictionary stats for debugging
193    pub fn term_dict_stats(&self) -> SSTableStats {
194        self.term_dict.stats()
195    }
196
197    /// Estimate memory usage of this segment reader
198    pub fn memory_stats(&self) -> SegmentMemoryStats {
199        let term_dict_stats = self.term_dict.stats();
200
201        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
202        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
203
204        // Store cache: similar estimate
205        let store_cache_bytes = self.store.cached_blocks() * 4096;
206
207        // Sparse index: each dimension has a posting list in memory
208        // Estimate: ~24 bytes per active dimension (HashMap entry overhead)
209        let sparse_index_bytes: usize = self
210            .sparse_indexes
211            .values()
212            .map(|s| s.num_dimensions() * 24)
213            .sum();
214
215        // Dense index: vectors are memory-mapped, but we track index structures
216        // RaBitQ/IVF indexes have cluster assignments in memory
217        let dense_index_bytes: usize = self
218            .vector_indexes
219            .values()
220            .map(|v| v.estimated_memory_bytes())
221            .sum();
222
223        SegmentMemoryStats {
224            segment_id: self.meta.id,
225            num_docs: self.meta.num_docs,
226            term_dict_cache_bytes,
227            store_cache_bytes,
228            sparse_index_bytes,
229            dense_index_bytes,
230            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
231        }
232    }
233
234    /// Get posting list for a term (async - loads on demand)
235    ///
236    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
237    /// and no additional I/O is needed. For larger lists, reads from .post file.
238    pub async fn get_postings(
239        &self,
240        field: Field,
241        term: &[u8],
242    ) -> Result<Option<BlockPostingList>> {
243        log::debug!(
244            "SegmentReader::get_postings field={} term_len={}",
245            field.0,
246            term.len()
247        );
248
249        // Build key: field_id + term
250        let mut key = Vec::with_capacity(4 + term.len());
251        key.extend_from_slice(&field.0.to_le_bytes());
252        key.extend_from_slice(term);
253
254        // Look up in term dictionary
255        let term_info = match self.term_dict.get(&key).await? {
256            Some(info) => {
257                log::debug!("SegmentReader::get_postings found term_info");
258                info
259            }
260            None => {
261                log::debug!("SegmentReader::get_postings term not found");
262                return Ok(None);
263            }
264        };
265
266        // Check if posting list is inlined
267        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
268            // Build BlockPostingList from inline data (no I/O needed!)
269            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
270            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
271                posting_list.push(doc_id, tf);
272            }
273            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
274            return Ok(Some(block_list));
275        }
276
277        // External posting list - read from postings file handle (lazy - HTTP range request)
278        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
279            Error::Corruption("TermInfo has neither inline nor external data".to_string())
280        })?;
281
282        let start = posting_offset;
283        let end = start + posting_len as u64;
284
285        if end > self.postings_handle.len() {
286            return Err(Error::Corruption(
287                "Posting offset out of bounds".to_string(),
288            ));
289        }
290
291        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
292        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
293
294        Ok(Some(block_list))
295    }
296
297    /// Get document by local doc_id (async - loads on demand)
298    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
299        self.store
300            .get(local_doc_id, &self.schema)
301            .await
302            .map_err(Error::from)
303    }
304
305    /// Prefetch term dictionary blocks for a key range
306    pub async fn prefetch_terms(
307        &self,
308        field: Field,
309        start_term: &[u8],
310        end_term: &[u8],
311    ) -> Result<()> {
312        let mut start_key = Vec::with_capacity(4 + start_term.len());
313        start_key.extend_from_slice(&field.0.to_le_bytes());
314        start_key.extend_from_slice(start_term);
315
316        let mut end_key = Vec::with_capacity(4 + end_term.len());
317        end_key.extend_from_slice(&field.0.to_le_bytes());
318        end_key.extend_from_slice(end_term);
319
320        self.term_dict.prefetch_range(&start_key, &end_key).await?;
321        Ok(())
322    }
323
324    /// Check if store uses dictionary compression (incompatible with raw merging)
325    pub fn store_has_dict(&self) -> bool {
326        self.store.has_dict()
327    }
328
329    /// Get store reference for merge operations
330    pub fn store(&self) -> &super::store::AsyncStoreReader {
331        &self.store
332    }
333
334    /// Get raw store blocks for optimized merging
335    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
336        self.store.raw_blocks()
337    }
338
339    /// Get store data slice for raw block access
340    pub fn store_data_slice(&self) -> &LazyFileSlice {
341        self.store.data_slice()
342    }
343
344    /// Get all terms from this segment (for merge)
345    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
346        self.term_dict.all_entries().await.map_err(Error::from)
347    }
348
349    /// Get all terms with parsed field and term string (for statistics aggregation)
350    ///
351    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
352    /// Skips terms that aren't valid UTF-8.
353    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
354        let entries = self.term_dict.all_entries().await?;
355        let mut result = Vec::with_capacity(entries.len());
356
357        for (key, term_info) in entries {
358            // Key format: field_id (4 bytes little-endian) + term bytes
359            if key.len() > 4 {
360                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
361                let term_bytes = &key[4..];
362                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
363                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
364                }
365            }
366        }
367
368        Ok(result)
369    }
370
371    /// Get streaming iterator over term dictionary (for memory-efficient merge)
372    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
373        self.term_dict.iter()
374    }
375
376    /// Prefetch all term dictionary blocks in a single bulk I/O call.
377    ///
378    /// Call before merge iteration to eliminate per-block cache misses.
379    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
380        self.term_dict
381            .prefetch_all_data_bulk()
382            .await
383            .map_err(crate::Error::from)
384    }
385
386    /// Read raw posting bytes at offset
387    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
388        let start = offset;
389        let end = start + len as u64;
390        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
391        Ok(bytes.to_vec())
392    }
393
394    /// Read raw position bytes at offset (for merge)
395    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
396        let handle = match &self.positions_handle {
397            Some(h) => h,
398            None => return Ok(None),
399        };
400        let start = offset;
401        let end = start + len as u64;
402        let bytes = handle.read_bytes_range(start..end).await?;
403        Ok(Some(bytes.to_vec()))
404    }
405
406    /// Check if this segment has a positions file
407    pub fn has_positions_file(&self) -> bool {
408        self.positions_handle.is_some()
409    }
410
411    /// Search dense vectors using RaBitQ
412    ///
413    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
414    /// The doc_ids are adjusted by doc_id_offset for this segment.
415    /// If mrl_dim is configured, the query vector is automatically trimmed.
416    /// For multi-valued documents, scores are combined using the specified combiner.
417    pub async fn search_dense_vector(
418        &self,
419        field: Field,
420        query: &[f32],
421        k: usize,
422        nprobe: usize,
423        rerank_factor: usize,
424        combiner: crate::query::MultiValueCombiner,
425    ) -> Result<Vec<VectorSearchResult>> {
426        // Get mrl_dim from config to trim query vector if needed
427        let mrl_dim = self
428            .schema
429            .get_field_entry(field)
430            .and_then(|e| e.dense_vector_config.as_ref())
431            .and_then(|c| c.mrl_dim);
432
433        // Trim query vector if mrl_dim is set
434        let query_vec: Vec<f32>;
435        let effective_query = if let Some(trim_dim) = mrl_dim {
436            if trim_dim < query.len() {
437                query_vec = query[..trim_dim].to_vec();
438                query_vec.as_slice()
439            } else {
440                query
441            }
442        } else {
443            query
444        };
445
446        let ann_index = self.vector_indexes.get(&field.0);
447        let lazy_flat = self.flat_vectors.get(&field.0);
448
449        // No vectors at all for this field
450        if ann_index.is_none() && lazy_flat.is_none() {
451            return Ok(Vec::new());
452        }
453
454        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
455        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
456            // ANN search (RaBitQ, IVF, ScaNN)
457            match index {
458                VectorIndex::RaBitQ(rabitq) => {
459                    let fetch_k = k * rerank_factor.max(1);
460                    rabitq
461                        .search(effective_query, fetch_k, rerank_factor)
462                        .into_iter()
463                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
464                        .collect()
465                }
466                VectorIndex::IVF { index, codebook } => {
467                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
468                        Error::Schema("IVF index requires coarse centroids".to_string())
469                    })?;
470                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
471                    let fetch_k = k * rerank_factor.max(1);
472                    index
473                        .search(
474                            centroids,
475                            codebook,
476                            effective_query,
477                            fetch_k,
478                            Some(effective_nprobe),
479                        )
480                        .into_iter()
481                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
482                        .collect()
483                }
484                VectorIndex::ScaNN { index, codebook } => {
485                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
486                        Error::Schema("ScaNN index requires coarse centroids".to_string())
487                    })?;
488                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
489                    let fetch_k = k * rerank_factor.max(1);
490                    index
491                        .search(
492                            centroids,
493                            codebook,
494                            effective_query,
495                            fetch_k,
496                            Some(effective_nprobe),
497                        )
498                        .into_iter()
499                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
500                        .collect()
501                }
502            }
503        } else if let Some(lazy_flat) = lazy_flat {
504            // Brute-force from lazy flat vectors (mmap-backed range read)
505            let all_bytes = lazy_flat
506                .read_all_vector_bytes()
507                .await
508                .map_err(crate::Error::Io)?;
509            let raw = all_bytes.as_slice();
510            let full_dim = lazy_flat.dim;
511            let n = lazy_flat.num_vectors;
512            let total_floats = n * full_dim;
513
514            // Use mmap bytes directly if already f32-aligned, otherwise copy once
515            let mut aligned_buf: Vec<f32> = Vec::new();
516            let vectors: &[f32] =
517                if (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()) {
518                    // Zero-copy: reinterpret aligned mmap bytes as &[f32]
519                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, total_floats) }
520                } else {
521                    // Fallback: copy into aligned buffer (rare — mmap is page-aligned)
522                    aligned_buf.resize(total_floats, 0.0);
523                    unsafe {
524                        std::ptr::copy_nonoverlapping(
525                            raw.as_ptr(),
526                            aligned_buf.as_mut_ptr() as *mut u8,
527                            total_floats * std::mem::size_of::<f32>(),
528                        );
529                    }
530                    &aligned_buf
531                };
532
533            // Strided SIMD cosine: dim = effective_query.len(), stride = full_dim
534            // MRL: dim < stride → only prefix touched, rest skipped. No copy.
535            let score_dim = effective_query.len();
536            let mut scores = vec![0f32; n];
537            crate::structures::simd::batch_cosine_scores_strided(
538                effective_query,
539                vectors,
540                score_dim,
541                full_dim,
542                &mut scores,
543            );
544
545            let mut candidates: Vec<(u32, u16, f32)> = (0..n)
546                .map(|i| {
547                    let (doc_id, ordinal) = lazy_flat.get_doc_id(i);
548                    (doc_id, ordinal, scores[i])
549                })
550                .collect();
551            candidates.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
552            candidates.truncate(k * rerank_factor.max(1));
553            candidates
554        } else {
555            return Ok(Vec::new());
556        };
557
558        // Rerank ANN candidates using raw vectors from lazy flat (mmap, no store reads)
559        // Batched: build doc_id→index lookup, batch-read vectors, batch cosine scores
560        if ann_index.is_some()
561            && !results.is_empty()
562            && let Some(lazy_flat) = lazy_flat
563        {
564            let dim = lazy_flat.dim;
565
566            // Build lookup: (doc_id, ordinal) → flat index
567            let lookup: rustc_hash::FxHashMap<(u32, u16), usize> = lazy_flat
568                .doc_ids
569                .iter()
570                .enumerate()
571                .map(|(i, &(d, o))| ((d, o), i))
572                .collect();
573
574            // Resolve flat indexes for each candidate
575            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
576            for (ri, c) in results.iter().enumerate() {
577                if let Some(&flat_idx) = lookup.get(&(c.0, c.1)) {
578                    resolved.push((ri, flat_idx));
579                }
580            }
581
582            if !resolved.is_empty() {
583                // Batch-read all needed vectors directly into contiguous buffer (no intermediate alloc)
584                let mut vec_buf = vec![0f32; resolved.len() * dim];
585                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
586                    let _ = lazy_flat
587                        .read_vector_into(
588                            flat_idx,
589                            &mut vec_buf[buf_idx * dim..(buf_idx + 1) * dim],
590                        )
591                        .await;
592                }
593
594                // Batch SIMD cosine with full query (not MRL-trimmed)
595                let mut scores = vec![0f32; resolved.len()];
596                crate::structures::simd::batch_cosine_scores(query, &vec_buf, dim, &mut scores);
597
598                // Write scores back to results
599                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
600                    results[ri].2 = scores[buf_idx];
601                }
602            }
603
604            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
605            results.truncate(k * rerank_factor.max(1));
606        }
607
608        // Track ordinals with individual scores for each doc_id
609        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
610        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
611            rustc_hash::FxHashMap::default();
612        for (doc_id, ordinal, score) in results {
613            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
614            ordinals.push((ordinal as u32, score));
615        }
616
617        // Combine scores and build results with ordinal tracking
618        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
619            .into_iter()
620            .map(|(doc_id, ordinals)| {
621                let combined_score = combiner.combine(&ordinals);
622                VectorSearchResult::new(doc_id, combined_score, ordinals)
623            })
624            .collect();
625
626        // Sort by score descending and take top k
627        final_results.sort_by(|a, b| {
628            b.score
629                .partial_cmp(&a.score)
630                .unwrap_or(std::cmp::Ordering::Equal)
631        });
632        final_results.truncate(k);
633
634        Ok(final_results)
635    }
636
637    /// Check if this segment has dense vectors for the given field
638    pub fn has_dense_vector_index(&self, field: Field) -> bool {
639        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
640    }
641
642    /// Get the dense vector index for a field (if available)
643    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
644        match self.vector_indexes.get(&field.0) {
645            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
646            _ => None,
647        }
648    }
649
650    /// Get the IVF vector index for a field (if available)
651    pub fn get_ivf_vector_index(
652        &self,
653        field: Field,
654    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
655        match self.vector_indexes.get(&field.0) {
656            Some(VectorIndex::IVF { index, codebook }) => Some((index.clone(), codebook.clone())),
657            _ => None,
658        }
659    }
660
661    /// Get coarse centroids (shared across IVF/ScaNN indexes)
662    pub fn coarse_centroids(&self) -> Option<&Arc<CoarseCentroids>> {
663        self.coarse_centroids.as_ref()
664    }
665
666    /// Get the ScaNN vector index for a field (if available)
667    pub fn get_scann_vector_index(
668        &self,
669        field: Field,
670    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
671        match self.vector_indexes.get(&field.0) {
672            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
673            _ => None,
674        }
675    }
676
677    /// Get the vector index type for a field
678    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
679        self.vector_indexes.get(&field.0)
680    }
681
682    /// Search for similar sparse vectors using dedicated sparse posting lists
683    ///
684    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
685    /// Optimizations (via WandExecutor):
686    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
687    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
688    /// 3. **Top-K heap**: Efficient score collection
689    ///
690    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
691    pub async fn search_sparse_vector(
692        &self,
693        field: Field,
694        vector: &[(u32, f32)],
695        limit: usize,
696        combiner: crate::query::MultiValueCombiner,
697        heap_factor: f32,
698    ) -> Result<Vec<VectorSearchResult>> {
699        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};
700
701        let query_tokens = vector.len();
702
703        // Get sparse index for this field
704        let sparse_index = match self.sparse_indexes.get(&field.0) {
705            Some(idx) => idx,
706            None => {
707                log::debug!(
708                    "Sparse vector search: no index for field {}, returning empty",
709                    field.0
710                );
711                return Ok(Vec::new());
712            }
713        };
714
715        let index_dimensions = sparse_index.num_dimensions();
716
717        // Build scorers for each dimension that exists in the index
718        // Load posting lists on-demand (lazy loading via mmap)
719        // Keep Arc references alive for the duration of scoring
720        let mut matched_tokens = Vec::new();
721        let mut missing_tokens = Vec::new();
722        let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
723            Vec::with_capacity(vector.len());
724
725        for &(dim_id, query_weight) in vector {
726            // Check if dimension exists before loading
727            if !sparse_index.has_dimension(dim_id) {
728                missing_tokens.push(dim_id);
729                continue;
730            }
731
732            // Load posting list on-demand (async, uses mmap)
733            match sparse_index.get_posting(dim_id).await? {
734                Some(pl) => {
735                    matched_tokens.push(dim_id);
736                    posting_lists.push((dim_id, query_weight, pl));
737                }
738                None => {
739                    missing_tokens.push(dim_id);
740                }
741            }
742        }
743
744        // Create scorers from the loaded posting lists (borrows from posting_lists)
745        let scorers: Vec<SparseTermScorer> = posting_lists
746            .iter()
747            .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
748            .collect();
749
750        log::debug!(
751            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
752            query_tokens,
753            matched_tokens.len(),
754            missing_tokens.len(),
755            index_dimensions
756        );
757
758        // Log query tokens with their IDs and weights
759        if log::log_enabled!(log::Level::Debug) {
760            let query_details: Vec<_> = vector
761                .iter()
762                .take(30)
763                .map(|(id, w)| format!("{}:{:.3}", id, w))
764                .collect();
765            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
766        }
767
768        if !matched_tokens.is_empty() {
769            log::debug!(
770                "Matched token IDs: {:?}",
771                matched_tokens.iter().take(20).collect::<Vec<_>>()
772            );
773        }
774
775        if !missing_tokens.is_empty() {
776            log::debug!(
777                "Missing token IDs (not in index): {:?}",
778                missing_tokens.iter().take(20).collect::<Vec<_>>()
779            );
780        }
781
782        if scorers.is_empty() {
783            log::debug!("Sparse vector search: no matching tokens, returning empty");
784            return Ok(Vec::new());
785        }
786
787        // Select executor based on number of query terms:
788        // - 12+ terms: BMP (block-at-a-time, best for SPLADE expansions)
789        // - 1-11 terms: BlockMaxScoreExecutor (unified MaxScore + block-max + conjunction)
790        let num_terms = scorers.len();
791        let over_fetch = limit * 2; // Over-fetch for multi-value combining
792        let raw_results = if num_terms > 12 {
793            // BMP: use posting lists directly (not iterators)
794            let pl_refs: Vec<_> = posting_lists
795                .iter()
796                .map(|(_, _, pl)| Arc::clone(pl))
797                .collect();
798            let weights: Vec<_> = posting_lists.iter().map(|(_, qw, _)| *qw).collect();
799            drop(scorers); // Release borrowing iterators before using posting_lists
800            BmpExecutor::new(pl_refs, weights, over_fetch, heap_factor).execute()
801        } else {
802            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
803        };
804
805        log::trace!(
806            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
807            raw_results.len(),
808            self.doc_id_offset
809        );
810        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
811            for r in raw_results.iter().take(5) {
812                log::trace!(
813                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
814                    r.doc_id,
815                    r.doc_id + self.doc_id_offset,
816                    r.score,
817                    r.ordinal
818                );
819            }
820        }
821
822        // Track ordinals with individual scores for each doc_id
823        // Now using real ordinals from the posting lists
824        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
825            rustc_hash::FxHashMap::default();
826        for r in raw_results {
827            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
828            ordinals.push((r.ordinal as u32, r.score));
829        }
830
831        // Combine scores and build results with ordinal tracking
832        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
833        let mut results: Vec<VectorSearchResult> = doc_ordinals
834            .into_iter()
835            .map(|(doc_id, ordinals)| {
836                let combined_score = combiner.combine(&ordinals);
837                VectorSearchResult::new(doc_id, combined_score, ordinals)
838            })
839            .collect();
840
841        // Sort by score descending and take top limit
842        results.sort_by(|a, b| {
843            b.score
844                .partial_cmp(&a.score)
845                .unwrap_or(std::cmp::Ordering::Equal)
846        });
847        results.truncate(limit);
848
849        Ok(results)
850    }
851
852    /// Get positions for a term (for phrase queries)
853    ///
854    /// Position offsets are now embedded in TermInfo, so we first look up
855    /// the term to get its TermInfo, then use position_info() to get the offset.
856    pub async fn get_positions(
857        &self,
858        field: Field,
859        term: &[u8],
860    ) -> Result<Option<crate::structures::PositionPostingList>> {
861        use std::io::Cursor;
862
863        // Get positions handle
864        let handle = match &self.positions_handle {
865            Some(h) => h,
866            None => return Ok(None),
867        };
868
869        // Build key: field_id + term
870        let mut key = Vec::with_capacity(4 + term.len());
871        key.extend_from_slice(&field.0.to_le_bytes());
872        key.extend_from_slice(term);
873
874        // Look up term in dictionary to get TermInfo with position offset
875        let term_info = match self.term_dict.get(&key).await? {
876            Some(info) => info,
877            None => return Ok(None),
878        };
879
880        // Get position offset from TermInfo
881        let (offset, length) = match term_info.position_info() {
882            Some((o, l)) => (o, l),
883            None => return Ok(None),
884        };
885
886        // Read the position data
887        let slice = handle.slice(offset..offset + length as u64);
888        let data = slice.read_bytes().await?;
889
890        // Deserialize
891        let mut cursor = Cursor::new(data.as_slice());
892        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;
893
894        Ok(Some(pos_list))
895    }
896
897    /// Check if positions are available for a field
898    pub fn has_positions(&self, field: Field) -> bool {
899        // Check schema for position mode on this field
900        if let Some(entry) = self.schema.get_field_entry(field) {
901            entry.positions.is_some()
902        } else {
903            false
904        }
905    }
906}
907
908/// Alias for AsyncSegmentReader
909pub type SegmentReader = AsyncSegmentReader;