Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
7
/// Estimated in-memory footprint of one segment, broken down by component.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Identifier of the segment these numbers describe
    pub segment_id: u128,
    /// Document count of the segment
    pub num_docs: u32,
    /// Bytes held by the term dictionary block cache
    pub term_dict_cache_bytes: usize,
    /// Bytes held by the document store block cache
    pub store_cache_bytes: usize,
    /// Bytes held by sparse vector indexes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Bytes held by dense vector indexes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bytes held by bloom filters
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Sum of all `*_bytes` counters; `segment_id` and `num_docs` do not contribute.
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .iter()
        .sum()
    }
}
37
38use crate::structures::BlockSparsePostingList;
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
/// - Dense vectors, sparse vectors and the optional positions handle are
///   obtained from the `loader` module when the segment is opened
pub struct AsyncSegmentReader {
    /// Segment metadata, deserialized from the meta file when the segment is opened
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: LazyFileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    /// Schema supplied by the caller of `open`
    schema: Arc<Schema>,
    /// Base doc_id offset for this segment
    doc_id_offset: DocId,
    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Shared coarse centroids for IVF search (loaded once)
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    /// Sparse vector indexes per field
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Position file handle for phrase queries (lazy loading); `None` when the
    /// loader found no positions file to open
    positions_handle: Option<LazyFileHandle>,
}
83
84impl AsyncSegmentReader {
85    /// Open a segment with lazy loading
86    pub async fn open<D: Directory>(
87        dir: &D,
88        segment_id: SegmentId,
89        schema: Arc<Schema>,
90        doc_id_offset: DocId,
91        cache_blocks: usize,
92    ) -> Result<Self> {
93        let files = SegmentFiles::new(segment_id.0);
94
95        // Read metadata (small, always loaded)
96        let meta_slice = dir.open_read(&files.meta).await?;
97        let meta_bytes = meta_slice.read_bytes().await?;
98        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
99        debug_assert_eq!(meta.id, segment_id.0);
100
101        // Open term dictionary with lazy loading (fetches ranges on demand)
102        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
103        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
104
105        // Get postings file handle (lazy - fetches ranges on demand)
106        let postings_handle = dir.open_lazy(&files.postings).await?;
107
108        // Open store with lazy loading
109        let store_handle = dir.open_lazy(&files.store).await?;
110        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
111
112        // Load dense vector indexes from unified .vectors file
113        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
114        let vector_indexes = vectors_data.indexes;
115        let flat_vectors = vectors_data.flat_vectors;
116        let coarse_centroids = vectors_data.coarse_centroids;
117
118        // Load sparse vector indexes from .sparse file
119        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
120
121        // Open positions file handle (if exists) - offsets are now in TermInfo
122        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
123
124        // Log segment loading stats (compact format: ~24 bytes per active dim in hashmap)
125        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
126        let sparse_mem = sparse_dims * 24; // HashMap entry overhead
127        log::debug!(
128            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, dense_flat={}, dense_ann={}",
129            segment_id.0,
130            meta.num_docs,
131            sparse_dims,
132            sparse_mem as f64 / 1024.0,
133            flat_vectors.len(),
134            vector_indexes.len()
135        );
136
137        Ok(Self {
138            meta,
139            term_dict: Arc::new(term_dict),
140            postings_handle,
141            store: Arc::new(store),
142            schema,
143            doc_id_offset,
144            vector_indexes,
145            flat_vectors,
146            coarse_centroids,
147            sparse_indexes,
148            positions_handle,
149        })
150    }
151
    /// Returns the segment metadata held by this reader.
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }
155
    /// Returns the document count recorded in the segment metadata.
    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }
159
    /// Get average field length for BM25F scoring.
    ///
    /// Delegates to the segment metadata's `avg_field_len` for `field`.
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }
164
    /// Returns the base doc_id offset currently assigned to this segment.
    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }
168
    /// Set the doc_id_offset (used for parallel segment loading).
    ///
    /// Overwrites the offset that was passed to `open`.
    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }
173
    /// Returns the schema this reader was opened with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
177
    /// Get sparse indexes for all fields.
    ///
    /// The map is keyed by `u32` field id (looked up elsewhere via `field.0`).
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }
182
    /// Get vector indexes for all fields.
    ///
    /// The map is keyed by `u32` field id (looked up elsewhere via `field.0`).
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }
187
    /// Get lazy flat vectors for all fields (for reranking and merge).
    ///
    /// The map is keyed by `u32` field id (looked up elsewhere via `field.0`).
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }
192
    /// Get term dictionary stats for debugging.
    ///
    /// Returns whatever the term dictionary reader reports from `stats()`.
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
197
198    /// Estimate memory usage of this segment reader
199    pub fn memory_stats(&self) -> SegmentMemoryStats {
200        let term_dict_stats = self.term_dict.stats();
201
202        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
203        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
204
205        // Store cache: similar estimate
206        let store_cache_bytes = self.store.cached_blocks() * 4096;
207
208        // Sparse index: each dimension has a posting list in memory
209        // Estimate: ~24 bytes per active dimension (HashMap entry overhead)
210        let sparse_index_bytes: usize = self
211            .sparse_indexes
212            .values()
213            .map(|s| s.num_dimensions() * 24)
214            .sum();
215
216        // Dense index: vectors are memory-mapped, but we track index structures
217        // RaBitQ/IVF indexes have cluster assignments in memory
218        let dense_index_bytes: usize = self
219            .vector_indexes
220            .values()
221            .map(|v| v.estimated_memory_bytes())
222            .sum();
223
224        SegmentMemoryStats {
225            segment_id: self.meta.id,
226            num_docs: self.meta.num_docs,
227            term_dict_cache_bytes,
228            store_cache_bytes,
229            sparse_index_bytes,
230            dense_index_bytes,
231            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
232        }
233    }
234
235    /// Get posting list for a term (async - loads on demand)
236    ///
237    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
238    /// and no additional I/O is needed. For larger lists, reads from .post file.
239    pub async fn get_postings(
240        &self,
241        field: Field,
242        term: &[u8],
243    ) -> Result<Option<BlockPostingList>> {
244        log::debug!(
245            "SegmentReader::get_postings field={} term_len={}",
246            field.0,
247            term.len()
248        );
249
250        // Build key: field_id + term
251        let mut key = Vec::with_capacity(4 + term.len());
252        key.extend_from_slice(&field.0.to_le_bytes());
253        key.extend_from_slice(term);
254
255        // Look up in term dictionary
256        let term_info = match self.term_dict.get(&key).await? {
257            Some(info) => {
258                log::debug!("SegmentReader::get_postings found term_info");
259                info
260            }
261            None => {
262                log::debug!("SegmentReader::get_postings term not found");
263                return Ok(None);
264            }
265        };
266
267        // Check if posting list is inlined
268        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
269            // Build BlockPostingList from inline data (no I/O needed!)
270            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
271            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
272                posting_list.push(doc_id, tf);
273            }
274            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
275            return Ok(Some(block_list));
276        }
277
278        // External posting list - read from postings file handle (lazy - HTTP range request)
279        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
280            Error::Corruption("TermInfo has neither inline nor external data".to_string())
281        })?;
282
283        let start = posting_offset;
284        let end = start + posting_len as u64;
285
286        if end > self.postings_handle.len() {
287            return Err(Error::Corruption(
288                "Posting offset out of bounds".to_string(),
289            ));
290        }
291
292        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
293        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
294
295        Ok(Some(block_list))
296    }
297
298    /// Get document by local doc_id (async - loads on demand).
299    ///
300    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
301    /// Uses binary search on sorted doc_ids for O(log N) lookup.
302    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
303        let mut doc = match self.store.get(local_doc_id, &self.schema).await {
304            Ok(Some(d)) => d,
305            Ok(None) => return Ok(None),
306            Err(e) => return Err(Error::from(e)),
307        };
308
309        // Hydrate dense vector fields from flat vector data
310        for (&field_id, lazy_flat) in &self.flat_vectors {
311            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
312            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
313                let flat_idx = start + j;
314                match lazy_flat.get_vector(flat_idx).await {
315                    Ok(vec) => {
316                        doc.add_dense_vector(Field(field_id), vec);
317                    }
318                    Err(e) => {
319                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
320                    }
321                }
322            }
323        }
324
325        Ok(Some(doc))
326    }
327
328    /// Prefetch term dictionary blocks for a key range
329    pub async fn prefetch_terms(
330        &self,
331        field: Field,
332        start_term: &[u8],
333        end_term: &[u8],
334    ) -> Result<()> {
335        let mut start_key = Vec::with_capacity(4 + start_term.len());
336        start_key.extend_from_slice(&field.0.to_le_bytes());
337        start_key.extend_from_slice(start_term);
338
339        let mut end_key = Vec::with_capacity(4 + end_term.len());
340        end_key.extend_from_slice(&field.0.to_le_bytes());
341        end_key.extend_from_slice(end_term);
342
343        self.term_dict.prefetch_range(&start_key, &end_key).await?;
344        Ok(())
345    }
346
    /// Check if store uses dictionary compression (incompatible with raw merging).
    ///
    /// Forwards to the store reader's `has_dict`.
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }
351
    /// Get store reference for merge operations.
    ///
    /// Borrows the reader held by this segment; no data is loaded by this call.
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }
356
    /// Get raw store blocks for optimized merging.
    ///
    /// Forwards to the store reader's `raw_blocks`.
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }
361
    /// Get store data slice for raw block access.
    ///
    /// Forwards to the store reader's `data_slice`.
    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }
366
367    /// Get all terms from this segment (for merge)
368    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
369        self.term_dict.all_entries().await.map_err(Error::from)
370    }
371
372    /// Get all terms with parsed field and term string (for statistics aggregation)
373    ///
374    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
375    /// Skips terms that aren't valid UTF-8.
376    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
377        let entries = self.term_dict.all_entries().await?;
378        let mut result = Vec::with_capacity(entries.len());
379
380        for (key, term_info) in entries {
381            // Key format: field_id (4 bytes little-endian) + term bytes
382            if key.len() > 4 {
383                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
384                let term_bytes = &key[4..];
385                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
386                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
387                }
388            }
389        }
390
391        Ok(result)
392    }
393
    /// Get streaming iterator over term dictionary (for memory-efficient merge).
    ///
    /// The iterator borrows this reader's term dictionary.
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }
398
399    /// Prefetch all term dictionary blocks in a single bulk I/O call.
400    ///
401    /// Call before merge iteration to eliminate per-block cache misses.
402    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
403        self.term_dict
404            .prefetch_all_data_bulk()
405            .await
406            .map_err(crate::Error::from)
407    }
408
409    /// Read raw posting bytes at offset
410    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
411        let start = offset;
412        let end = start + len as u64;
413        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
414        Ok(bytes.to_vec())
415    }
416
417    /// Read raw position bytes at offset (for merge)
418    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
419        let handle = match &self.positions_handle {
420            Some(h) => h,
421            None => return Ok(None),
422        };
423        let start = offset;
424        let end = start + len as u64;
425        let bytes = handle.read_bytes_range(start..end).await?;
426        Ok(Some(bytes.to_vec()))
427    }
428
    /// Check if this segment has a positions file.
    ///
    /// True when a positions handle was obtained while opening the segment.
    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }
433
434    /// Batch cosine scoring on raw quantized bytes.
435    ///
436    /// Dispatches to the appropriate SIMD scorer based on quantization type.
437    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
438    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
439    fn score_quantized_batch(
440        query: &[f32],
441        raw: &[u8],
442        quant: crate::dsl::DenseVectorQuantization,
443        dim: usize,
444        scores: &mut [f32],
445    ) {
446        match quant {
447            crate::dsl::DenseVectorQuantization::F32 => {
448                let num_floats = scores.len() * dim;
449                debug_assert!(
450                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
451                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
452                );
453                let vectors: &[f32] =
454                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
455                crate::structures::simd::batch_cosine_scores(query, vectors, dim, scores);
456            }
457            crate::dsl::DenseVectorQuantization::F16 => {
458                crate::structures::simd::batch_cosine_scores_f16(query, raw, dim, scores);
459            }
460            crate::dsl::DenseVectorQuantization::UInt8 => {
461                crate::structures::simd::batch_cosine_scores_u8(query, raw, dim, scores);
462            }
463        }
464    }
465
466    /// Search dense vectors using RaBitQ
467    ///
468    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
469    /// The doc_ids are adjusted by doc_id_offset for this segment.
470    /// For multi-valued documents, scores are combined using the specified combiner.
471    pub async fn search_dense_vector(
472        &self,
473        field: Field,
474        query: &[f32],
475        k: usize,
476        nprobe: usize,
477        rerank_factor: usize,
478        combiner: crate::query::MultiValueCombiner,
479    ) -> Result<Vec<VectorSearchResult>> {
480        let ann_index = self.vector_indexes.get(&field.0);
481        let lazy_flat = self.flat_vectors.get(&field.0);
482
483        // No vectors at all for this field
484        if ann_index.is_none() && lazy_flat.is_none() {
485            return Ok(Vec::new());
486        }
487
488        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
489        const BRUTE_FORCE_BATCH: usize = 4096;
490
491        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
492        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
493            // ANN search (RaBitQ, IVF, ScaNN)
494            match index {
495                VectorIndex::RaBitQ(rabitq) => {
496                    let fetch_k = k * rerank_factor.max(1);
497                    rabitq
498                        .search(query, fetch_k, rerank_factor)
499                        .into_iter()
500                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
501                        .collect()
502                }
503                VectorIndex::IVF { index, codebook } => {
504                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
505                        Error::Schema("IVF index requires coarse centroids".to_string())
506                    })?;
507                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
508                    let fetch_k = k * rerank_factor.max(1);
509                    index
510                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
511                        .into_iter()
512                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
513                        .collect()
514                }
515                VectorIndex::ScaNN { index, codebook } => {
516                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
517                        Error::Schema("ScaNN index requires coarse centroids".to_string())
518                    })?;
519                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
520                    let fetch_k = k * rerank_factor.max(1);
521                    index
522                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
523                        .into_iter()
524                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
525                        .collect()
526                }
527            }
528        } else if let Some(lazy_flat) = lazy_flat {
529            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
530            // Uses a top-k heap to avoid collecting and sorting all N candidates.
531            let dim = lazy_flat.dim;
532            let n = lazy_flat.num_vectors;
533            let quant = lazy_flat.quantization;
534            let fetch_k = k * rerank_factor.max(1);
535            let mut collector = crate::query::ScoreCollector::new(fetch_k);
536
537            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
538                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
539                let batch_bytes = lazy_flat
540                    .read_vectors_batch(batch_start, batch_count)
541                    .await
542                    .map_err(crate::Error::Io)?;
543                let raw = batch_bytes.as_slice();
544
545                let mut scores = vec![0f32; batch_count];
546                Self::score_quantized_batch(query, raw, quant, dim, &mut scores);
547
548                for (i, &score) in scores.iter().enumerate().take(batch_count) {
549                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
550                    collector.insert_with_ordinal(doc_id, score, ordinal);
551                }
552            }
553
554            collector
555                .into_sorted_results()
556                .into_iter()
557                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
558                .collect()
559        } else {
560            return Ok(Vec::new());
561        };
562
563        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
564        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
565        if ann_index.is_some()
566            && !results.is_empty()
567            && let Some(lazy_flat) = lazy_flat
568        {
569            let dim = lazy_flat.dim;
570            let quant = lazy_flat.quantization;
571            let vbs = lazy_flat.vector_byte_size();
572
573            // Resolve flat indexes for each candidate via binary search
574            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
575            for (ri, c) in results.iter().enumerate() {
576                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
577                for (j, &(_, ord)) in entries.iter().enumerate() {
578                    if ord == c.1 {
579                        resolved.push((ri, start + j));
580                        break;
581                    }
582                }
583            }
584
585            if !resolved.is_empty() {
586                // Sort by flat_idx for sequential mmap access (better page locality)
587                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
588
589                // Batch-read raw quantized bytes into contiguous buffer
590                let mut raw_buf = vec![0u8; resolved.len() * vbs];
591                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
592                    let _ = lazy_flat
593                        .read_vector_raw_into(
594                            flat_idx,
595                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
596                        )
597                        .await;
598                }
599
600                // Native-precision batch SIMD cosine scoring
601                let mut scores = vec![0f32; resolved.len()];
602                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores);
603
604                // Write scores back to results
605                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
606                    results[ri].2 = scores[buf_idx];
607                }
608            }
609
610            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
611            results.truncate(k * rerank_factor.max(1));
612        }
613
614        // Track ordinals with individual scores for each doc_id
615        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
616        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
617            rustc_hash::FxHashMap::default();
618        for (doc_id, ordinal, score) in results {
619            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
620            ordinals.push((ordinal as u32, score));
621        }
622
623        // Combine scores and build results with ordinal tracking
624        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
625            .into_iter()
626            .map(|(doc_id, ordinals)| {
627                let combined_score = combiner.combine(&ordinals);
628                VectorSearchResult::new(doc_id, combined_score, ordinals)
629            })
630            .collect();
631
632        // Sort by score descending and take top k
633        final_results.sort_by(|a, b| {
634            b.score
635                .partial_cmp(&a.score)
636                .unwrap_or(std::cmp::Ordering::Equal)
637        });
638        final_results.truncate(k);
639
640        Ok(final_results)
641    }
642
643    /// Check if this segment has dense vectors for the given field
644    pub fn has_dense_vector_index(&self, field: Field) -> bool {
645        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
646    }
647
648    /// Get the dense vector index for a field (if available)
649    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
650        match self.vector_indexes.get(&field.0) {
651            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
652            _ => None,
653        }
654    }
655
656    /// Get the IVF vector index for a field (if available)
657    pub fn get_ivf_vector_index(
658        &self,
659        field: Field,
660    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
661        match self.vector_indexes.get(&field.0) {
662            Some(VectorIndex::IVF { index, codebook }) => Some((index.clone(), codebook.clone())),
663            _ => None,
664        }
665    }
666
    /// Get coarse centroids (shared across IVF/ScaNN indexes).
    ///
    /// `None` when no centroids were loaded for this segment.
    pub fn coarse_centroids(&self) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.as_ref()
    }
671
672    /// Get the ScaNN vector index for a field (if available)
673    pub fn get_scann_vector_index(
674        &self,
675        field: Field,
676    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
677        match self.vector_indexes.get(&field.0) {
678            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
679            _ => None,
680        }
681    }
682
    /// Get the vector index for a field, whatever its kind.
    ///
    /// Returns `None` when no ANN index is stored for `field`.
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }
687
688    /// Search for similar sparse vectors using dedicated sparse posting lists
689    ///
690    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
691    /// Optimizations (via WandExecutor):
692    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
693    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
694    /// 3. **Top-K heap**: Efficient score collection
695    ///
696    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
697    pub async fn search_sparse_vector(
698        &self,
699        field: Field,
700        vector: &[(u32, f32)],
701        limit: usize,
702        combiner: crate::query::MultiValueCombiner,
703        heap_factor: f32,
704    ) -> Result<Vec<VectorSearchResult>> {
705        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};
706
707        let query_tokens = vector.len();
708
709        // Get sparse index for this field
710        let sparse_index = match self.sparse_indexes.get(&field.0) {
711            Some(idx) => idx,
712            None => {
713                log::debug!(
714                    "Sparse vector search: no index for field {}, returning empty",
715                    field.0
716                );
717                return Ok(Vec::new());
718            }
719        };
720
721        let index_dimensions = sparse_index.num_dimensions();
722
723        // Filter query terms to only those present in the index
724        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
725        let mut missing_tokens = Vec::new();
726
727        for &(dim_id, query_weight) in vector {
728            if sparse_index.has_dimension(dim_id) {
729                matched_terms.push((dim_id, query_weight));
730            } else {
731                missing_tokens.push(dim_id);
732            }
733        }
734
735        log::debug!(
736            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
737            query_tokens,
738            matched_terms.len(),
739            missing_tokens.len(),
740            index_dimensions
741        );
742
743        if log::log_enabled!(log::Level::Debug) {
744            let query_details: Vec<_> = vector
745                .iter()
746                .take(30)
747                .map(|(id, w)| format!("{}:{:.3}", id, w))
748                .collect();
749            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
750        }
751
752        if !missing_tokens.is_empty() {
753            log::debug!(
754                "Missing token IDs (not in index): {:?}",
755                missing_tokens.iter().take(20).collect::<Vec<_>>()
756            );
757        }
758
759        if matched_terms.is_empty() {
760            log::debug!("Sparse vector search: no matching tokens, returning empty");
761            return Ok(Vec::new());
762        }
763
764        // Select executor based on number of query terms:
765        // - 12+ terms: BMP (block-at-a-time, lazy block loading, best for SPLADE)
766        // - 1-11 terms: BlockMaxScoreExecutor (unified MaxScore + block-max + conjunction)
767        let num_terms = matched_terms.len();
768        let over_fetch = limit * 2; // Over-fetch for multi-value combining
769        let raw_results = if num_terms > 12 {
770            // BMP: lazy block loading — only skip entries in memory, blocks loaded on-demand
771            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
772                .execute()
773                .await?
774        } else {
775            // Load posting lists only for the few terms (1-12) used by BlockMaxScore
776            let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
777                Vec::with_capacity(num_terms);
778            for &(dim_id, query_weight) in &matched_terms {
779                if let Some(pl) = sparse_index.get_posting(dim_id).await? {
780                    posting_lists.push((dim_id, query_weight, pl));
781                }
782            }
783            let scorers: Vec<SparseTermScorer> = posting_lists
784                .iter()
785                .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
786                .collect();
787            if scorers.is_empty() {
788                return Ok(Vec::new());
789            }
790            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
791        };
792
793        log::trace!(
794            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
795            raw_results.len(),
796            self.doc_id_offset
797        );
798        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
799            for r in raw_results.iter().take(5) {
800                log::trace!(
801                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
802                    r.doc_id,
803                    r.doc_id + self.doc_id_offset,
804                    r.score,
805                    r.ordinal
806                );
807            }
808        }
809
810        // Track ordinals with individual scores for each doc_id
811        // Now using real ordinals from the posting lists
812        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
813            rustc_hash::FxHashMap::default();
814        for r in raw_results {
815            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
816            ordinals.push((r.ordinal as u32, r.score));
817        }
818
819        // Combine scores and build results with ordinal tracking
820        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
821        let mut results: Vec<VectorSearchResult> = doc_ordinals
822            .into_iter()
823            .map(|(doc_id, ordinals)| {
824                let combined_score = combiner.combine(&ordinals);
825                VectorSearchResult::new(doc_id, combined_score, ordinals)
826            })
827            .collect();
828
829        // Sort by score descending and take top limit
830        results.sort_by(|a, b| {
831            b.score
832                .partial_cmp(&a.score)
833                .unwrap_or(std::cmp::Ordering::Equal)
834        });
835        results.truncate(limit);
836
837        Ok(results)
838    }
839
840    /// Get positions for a term (for phrase queries)
841    ///
842    /// Position offsets are now embedded in TermInfo, so we first look up
843    /// the term to get its TermInfo, then use position_info() to get the offset.
844    pub async fn get_positions(
845        &self,
846        field: Field,
847        term: &[u8],
848    ) -> Result<Option<crate::structures::PositionPostingList>> {
849        use std::io::Cursor;
850
851        // Get positions handle
852        let handle = match &self.positions_handle {
853            Some(h) => h,
854            None => return Ok(None),
855        };
856
857        // Build key: field_id + term
858        let mut key = Vec::with_capacity(4 + term.len());
859        key.extend_from_slice(&field.0.to_le_bytes());
860        key.extend_from_slice(term);
861
862        // Look up term in dictionary to get TermInfo with position offset
863        let term_info = match self.term_dict.get(&key).await? {
864            Some(info) => info,
865            None => return Ok(None),
866        };
867
868        // Get position offset from TermInfo
869        let (offset, length) = match term_info.position_info() {
870            Some((o, l)) => (o, l),
871            None => return Ok(None),
872        };
873
874        // Read the position data
875        let slice = handle.slice(offset..offset + length as u64);
876        let data = slice.read_bytes().await?;
877
878        // Deserialize
879        let mut cursor = Cursor::new(data.as_slice());
880        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;
881
882        Ok(Some(pos_list))
883    }
884
885    /// Check if positions are available for a field
886    pub fn has_positions(&self, field: Field) -> bool {
887        // Check schema for position mode on this field
888        if let Some(entry) = self.schema.get_field_entry(field) {
889            entry.positions.is_some()
890        } else {
891            false
892        }
893    }
894}
895
/// Shorthand alias: `SegmentReader` names the same type as [`AsyncSegmentReader`].
pub type SegmentReader = AsyncSegmentReader;