// hermes_core/segment/reader/mod.rs
1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
7
/// Memory statistics for a single segment
///
/// All byte counts are estimates; see `AsyncSegmentReader::memory_stats`
/// for how each component is derived (block caches use a fixed per-block
/// estimate, sparse indexes a per-dimension estimate).
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Segment ID
    pub segment_id: u128,
    /// Number of documents in segment
    pub num_docs: u32,
    /// Term dictionary block cache bytes
    pub term_dict_cache_bytes: usize,
    /// Document store block cache bytes
    pub store_cache_bytes: usize,
    /// Sparse vector index bytes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Dense vector index bytes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bloom filter bytes
    pub bloom_filter_bytes: usize,
}
26
27impl SegmentMemoryStats {
28    /// Total estimated memory for this segment
29    pub fn total_bytes(&self) -> usize {
30        self.term_dict_cache_bytes
31            + self.store_cache_bytes
32            + self.sparse_index_bytes
33            + self.dense_index_bytes
34            + self.bloom_filter_bytes
35    }
36}
37
38use crate::structures::BlockSparsePostingList;
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
///
/// Constructed via [`AsyncSegmentReader::open`]. Hash maps are keyed by
/// field id (`Field.0`).
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: LazyFileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Base doc_id offset for this segment
    doc_id_offset: DocId,
    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Per-field coarse centroids for IVF/ScaNN search
    /// (empty after `open`; populated via `set_coarse_centroids`)
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Sparse vector indexes per field
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Position file handle for phrase queries (lazy loading)
    positions_handle: Option<LazyFileHandle>,
}
83
84impl AsyncSegmentReader {
85    /// Open a segment with lazy loading
86    pub async fn open<D: Directory>(
87        dir: &D,
88        segment_id: SegmentId,
89        schema: Arc<Schema>,
90        doc_id_offset: DocId,
91        cache_blocks: usize,
92    ) -> Result<Self> {
93        let files = SegmentFiles::new(segment_id.0);
94
95        // Read metadata (small, always loaded)
96        let meta_slice = dir.open_read(&files.meta).await?;
97        let meta_bytes = meta_slice.read_bytes().await?;
98        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
99        debug_assert_eq!(meta.id, segment_id.0);
100
101        // Open term dictionary with lazy loading (fetches ranges on demand)
102        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
103        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
104
105        // Get postings file handle (lazy - fetches ranges on demand)
106        let postings_handle = dir.open_lazy(&files.postings).await?;
107
108        // Open store with lazy loading
109        let store_handle = dir.open_lazy(&files.store).await?;
110        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
111
112        // Load dense vector indexes from unified .vectors file
113        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
114        let vector_indexes = vectors_data.indexes;
115        let flat_vectors = vectors_data.flat_vectors;
116
117        // Load sparse vector indexes from .sparse file
118        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
119
120        // Open positions file handle (if exists) - offsets are now in TermInfo
121        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
122
123        // Log segment loading stats
124        {
125            let mut parts = vec![format!(
126                "[segment] loaded {:016x}: docs={}",
127                segment_id.0, meta.num_docs
128            )];
129            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
130                parts.push(format!(
131                    "dense: {} ann + {} flat fields",
132                    vector_indexes.len(),
133                    flat_vectors.len()
134                ));
135            }
136            for (field_id, idx) in &sparse_indexes {
137                parts.push(format!(
138                    "sparse field {}: {} dims, ~{:.1} KB",
139                    field_id,
140                    idx.num_dimensions(),
141                    idx.num_dimensions() as f64 * 24.0 / 1024.0
142                ));
143            }
144            log::debug!("{}", parts.join(", "));
145        }
146
147        Ok(Self {
148            meta,
149            term_dict: Arc::new(term_dict),
150            postings_handle,
151            store: Arc::new(store),
152            schema,
153            doc_id_offset,
154            vector_indexes,
155            flat_vectors,
156            coarse_centroids: FxHashMap::default(),
157            sparse_indexes,
158            positions_handle,
159        })
160    }
161
    /// Segment metadata (id, doc counts, per-field statistics)
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }
165
    /// Number of documents in this segment
    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }
169
    /// Get average field length for BM25F scoring
    ///
    /// Delegates to the per-field statistics stored in segment metadata.
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }
174
    /// Base doc_id offset of this segment within the overall index
    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }
178
    /// Set the doc_id_offset (used for parallel segment loading)
    ///
    /// Segments opened concurrently don't know their final offset until
    /// all are loaded; the caller assigns it afterwards.
    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }
183
    /// Schema this segment was written with
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
187
    /// Get sparse indexes for all fields, keyed by field id
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }
192
    /// Get vector indexes for all fields, keyed by field id
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }
197
    /// Get lazy flat vectors for all fields (for reranking and merge), keyed by field id
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }
202
    /// Get term dictionary stats for debugging
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
207
208    /// Estimate memory usage of this segment reader
209    pub fn memory_stats(&self) -> SegmentMemoryStats {
210        let term_dict_stats = self.term_dict.stats();
211
212        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
213        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
214
215        // Store cache: similar estimate
216        let store_cache_bytes = self.store.cached_blocks() * 4096;
217
218        // Sparse index: each dimension has a posting list in memory
219        // Estimate: ~24 bytes per active dimension (HashMap entry overhead)
220        let sparse_index_bytes: usize = self
221            .sparse_indexes
222            .values()
223            .map(|s| s.num_dimensions() * 24)
224            .sum();
225
226        // Dense index: vectors are memory-mapped, but we track index structures
227        // RaBitQ/IVF indexes have cluster assignments in memory
228        let dense_index_bytes: usize = self
229            .vector_indexes
230            .values()
231            .map(|v| v.estimated_memory_bytes())
232            .sum();
233
234        SegmentMemoryStats {
235            segment_id: self.meta.id,
236            num_docs: self.meta.num_docs,
237            term_dict_cache_bytes,
238            store_cache_bytes,
239            sparse_index_bytes,
240            dense_index_bytes,
241            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
242        }
243    }
244
245    /// Get posting list for a term (async - loads on demand)
246    ///
247    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
248    /// and no additional I/O is needed. For larger lists, reads from .post file.
249    pub async fn get_postings(
250        &self,
251        field: Field,
252        term: &[u8],
253    ) -> Result<Option<BlockPostingList>> {
254        log::debug!(
255            "SegmentReader::get_postings field={} term_len={}",
256            field.0,
257            term.len()
258        );
259
260        // Build key: field_id + term
261        let mut key = Vec::with_capacity(4 + term.len());
262        key.extend_from_slice(&field.0.to_le_bytes());
263        key.extend_from_slice(term);
264
265        // Look up in term dictionary
266        let term_info = match self.term_dict.get(&key).await? {
267            Some(info) => {
268                log::debug!("SegmentReader::get_postings found term_info");
269                info
270            }
271            None => {
272                log::debug!("SegmentReader::get_postings term not found");
273                return Ok(None);
274            }
275        };
276
277        // Check if posting list is inlined
278        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
279            // Build BlockPostingList from inline data (no I/O needed!)
280            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
281            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
282                posting_list.push(doc_id, tf);
283            }
284            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
285            return Ok(Some(block_list));
286        }
287
288        // External posting list - read from postings file handle (lazy - HTTP range request)
289        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
290            Error::Corruption("TermInfo has neither inline nor external data".to_string())
291        })?;
292
293        let start = posting_offset;
294        let end = start + posting_len as u64;
295
296        if end > self.postings_handle.len() {
297            return Err(Error::Corruption(
298                "Posting offset out of bounds".to_string(),
299            ));
300        }
301
302        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
303        let block_list = BlockPostingList::deserialize(posting_bytes.as_slice())?;
304
305        Ok(Some(block_list))
306    }
307
    /// Get document by local doc_id (async - loads on demand).
    ///
    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
    /// Uses binary search on sorted doc_ids for O(log N) lookup.
    ///
    /// Equivalent to `doc_with_fields(local_doc_id, None)` — all fields hydrated.
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }
315
316    /// Get document by local doc_id, hydrating only the specified fields.
317    ///
318    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
319    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
320    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
321    pub async fn doc_with_fields(
322        &self,
323        local_doc_id: DocId,
324        fields: Option<&rustc_hash::FxHashSet<u32>>,
325    ) -> Result<Option<Document>> {
326        let mut doc = match self.store.get(local_doc_id, &self.schema).await {
327            Ok(Some(d)) => d,
328            Ok(None) => return Ok(None),
329            Err(e) => return Err(Error::from(e)),
330        };
331
332        // Hydrate dense vector fields from flat vector data
333        for (&field_id, lazy_flat) in &self.flat_vectors {
334            // Skip vector fields not in the requested set
335            if let Some(set) = fields
336                && !set.contains(&field_id)
337            {
338                continue;
339            }
340
341            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
342            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
343                let flat_idx = start + j;
344                match lazy_flat.get_vector(flat_idx).await {
345                    Ok(vec) => {
346                        doc.add_dense_vector(Field(field_id), vec);
347                    }
348                    Err(e) => {
349                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
350                    }
351                }
352            }
353        }
354
355        Ok(Some(doc))
356    }
357
358    /// Prefetch term dictionary blocks for a key range
359    pub async fn prefetch_terms(
360        &self,
361        field: Field,
362        start_term: &[u8],
363        end_term: &[u8],
364    ) -> Result<()> {
365        let mut start_key = Vec::with_capacity(4 + start_term.len());
366        start_key.extend_from_slice(&field.0.to_le_bytes());
367        start_key.extend_from_slice(start_term);
368
369        let mut end_key = Vec::with_capacity(4 + end_term.len());
370        end_key.extend_from_slice(&field.0.to_le_bytes());
371        end_key.extend_from_slice(end_term);
372
373        self.term_dict.prefetch_range(&start_key, &end_key).await?;
374        Ok(())
375    }
376
    /// Check if store uses dictionary compression (incompatible with raw merging)
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }
381
    /// Get store reference for merge operations
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }
386
    /// Get raw store blocks for optimized merging
    /// (allows copying compressed blocks without decode/re-encode)
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }
391
    /// Get store data slice for raw block access
    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }
396
    /// Get all terms from this segment (for merge)
    ///
    /// Loads the entire term dictionary into memory; prefer
    /// `term_dict_iter` for memory-efficient streaming.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }
401
402    /// Get all terms with parsed field and term string (for statistics aggregation)
403    ///
404    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
405    /// Skips terms that aren't valid UTF-8.
406    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
407        let entries = self.term_dict.all_entries().await?;
408        let mut result = Vec::with_capacity(entries.len());
409
410        for (key, term_info) in entries {
411            // Key format: field_id (4 bytes little-endian) + term bytes
412            if key.len() > 4 {
413                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
414                let term_bytes = &key[4..];
415                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
416                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
417                }
418            }
419        }
420
421        Ok(result)
422    }
423
    /// Get streaming iterator over term dictionary (for memory-efficient merge)
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }
428
429    /// Prefetch all term dictionary blocks in a single bulk I/O call.
430    ///
431    /// Call before merge iteration to eliminate per-block cache misses.
432    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
433        self.term_dict
434            .prefetch_all_data_bulk()
435            .await
436            .map_err(crate::Error::from)
437    }
438
439    /// Read raw posting bytes at offset
440    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
441        let start = offset;
442        let end = start + len as u64;
443        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
444        Ok(bytes.to_vec())
445    }
446
447    /// Read raw position bytes at offset (for merge)
448    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
449        let handle = match &self.positions_handle {
450            Some(h) => h,
451            None => return Ok(None),
452        };
453        let start = offset;
454        let end = start + len as u64;
455        let bytes = handle.read_bytes_range(start..end).await?;
456        Ok(Some(bytes.to_vec()))
457    }
458
    /// Check if this segment has a positions file
    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }
463
    /// Batch cosine scoring on raw quantized bytes.
    ///
    /// Dispatches to the appropriate SIMD scorer based on quantization type.
    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
    ///
    /// `scores.len()` determines how many vectors are scored; callers must
    /// supply `raw` with at least `scores.len() * dim` elements of the
    /// quantized type.
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
    ) {
        match quant {
            crate::dsl::DenseVectorQuantization::F32 => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                // SAFETY: 4-byte alignment is guaranteed by the vectors-file
                // layout (checked by the debug_assert above), and per the
                // caller contract `raw` covers `num_floats` f32 values.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                crate::structures::simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            crate::dsl::DenseVectorQuantization::F16 => {
                // Half-precision scorer reads the packed f16 bytes directly.
                crate::structures::simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            crate::dsl::DenseVectorQuantization::UInt8 => {
                // u8 scorer operates directly on the quantized bytes.
                crate::structures::simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
        }
    }
495
    /// Search dense vectors using RaBitQ / IVF / ScaNN, or brute force when
    /// only flat vectors exist for the field.
    ///
    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
    /// NOTE: returned doc_ids are segment-local — doc_id_offset is NOT applied
    /// here; the collector applies it uniformly (see comment below).
    /// For multi-valued documents, scores are combined using the specified combiner.
    ///
    /// `nprobe` only affects IVF/ScaNN (0 means the default of 32);
    /// `rerank_factor` widens the L1 candidate pool to `k * rerank_factor`.
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        // No vectors at all for this field
        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
        const BRUTE_FORCE_BATCH: usize = 4096;

        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
        let t0 = std::time::Instant::now();
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            // ANN search (RaBitQ, IVF, ScaNN)
            match index {
                VectorIndex::RaBitQ(rabitq) => {
                    let fetch_k = k * rerank_factor.max(1);
                    // Distances are converted to similarities via 1/(1+d)
                    // so that higher is better, matching the other branches.
                    rabitq
                        .search(query, fetch_k, rerank_factor)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    // Coarse centroids are trained at index level and injected
                    // via `set_coarse_centroids`; missing centroids is a setup error.
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
            // Uses a top-k heap to avoid collecting and sorting all N candidates.
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let fetch_k = k * rerank_factor.max(1);
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            // Score buffer is reused across batches; only the first
            // `batch_count` entries are valid per iteration.
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(query, raw, quant, dim, &mut scores[..batch_count]);

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
        // (Brute-force results were already scored on raw vectors, so no rerank needed.)
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Resolve flat indexes for each candidate via binary search
            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                // Match the candidate's ordinal among the doc's multi-value entries.
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                // Sort by flat_idx for sequential mmap access (better page locality)
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                // Batch-read raw quantized bytes into contiguous buffer
                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // Read failures are ignored (`let _`), leaving that slot
                    // zero-filled — a failed read degrades the score rather
                    // than aborting the whole search.
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                // Native-precision batch SIMD cosine scoring
                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores);
                let score_elapsed = t_score.elapsed();

                // Write scores back to results
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(k * rerank_factor.max(1));
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

        // Track ordinals with individual scores for each doc_id
        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, score) in results {
            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
            ordinals.push((ordinal as u32, score));
        }

        // Combine scores and build results with ordinal tracking
        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        // Sort by score descending and take top k
        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }
723
    /// Check if this segment has dense vectors for the given field
    /// (either an ANN index or flat vector data)
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }
728
729    /// Get the dense vector index for a field (if available)
730    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
731        match self.vector_indexes.get(&field.0) {
732            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
733            _ => None,
734        }
735    }
736
737    /// Get the IVF vector index for a field (if available)
738    pub fn get_ivf_vector_index(
739        &self,
740        field: Field,
741    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
742        match self.vector_indexes.get(&field.0) {
743            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
744            _ => None,
745        }
746    }
747
    /// Get coarse centroids for a field (populated via `set_coarse_centroids`)
    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }
752
    /// Set per-field coarse centroids from index-level trained structures
    ///
    /// Required before IVF/ScaNN searches — `search_dense_vector` errors
    /// if an IVF/ScaNN index is present but centroids are missing.
    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }
757
758    /// Get the ScaNN vector index for a field (if available)
759    pub fn get_scann_vector_index(
760        &self,
761        field: Field,
762    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
763        match self.vector_indexes.get(&field.0) {
764            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
765            _ => None,
766        }
767    }
768
769    /// Get the vector index type for a field
770    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
771        self.vector_indexes.get(&field.0)
772    }
773
    /// Search for similar sparse vectors using dedicated sparse posting lists
    ///
    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
    /// Optimizations (via WandExecutor):
    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
    /// 3. **Top-K heap**: Efficient score collection
    ///
    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
    /// Doc IDs in the results are segment-local; `doc_id_offset` is applied by
    /// the collector, not here.
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};

        let query_tokens = vector.len();

        // Get sparse index for this field. Absence is not an error: the
        // segment simply has no sparse vectors for this field, so no hits.
        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        // Filter query terms to only those present in the index; dimensions
        // the index never saw cannot contribute to any score.
        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
        let mut missing_count = 0usize;

        for &(dim_id, query_weight) in vector {
            if sparse_index.has_dimension(dim_id) {
                matched_terms.push((dim_id, query_weight));
            } else {
                missing_count += 1;
            }
        }

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_terms.len(),
            missing_count,
            index_dimensions
        );

        if matched_terms.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        // Select executor based on number of query terms:
        // - more than 12 terms: BMP (block-at-a-time, lazy block loading, best for SPLADE)
        // - 1-12 terms: BlockMaxScoreExecutor (unified MaxScore + block-max + conjunction)
        let num_terms = matched_terms.len();
        let over_fetch = limit * 2; // Over-fetch for multi-value combining
        let raw_results = if num_terms > 12 {
            // BMP: lazy block loading — only skip entries in memory, blocks loaded on-demand
            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute()
                .await?
        } else {
            // Load posting lists only for the few terms (1-12) used by BlockMaxScore
            let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
                Vec::with_capacity(num_terms);
            for &(dim_id, query_weight) in &matched_terms {
                if let Some(pl) = sparse_index.get_posting(dim_id).await? {
                    posting_lists.push((dim_id, query_weight, pl));
                }
            }
            let scorers: Vec<SparseTermScorer> = posting_lists
                .iter()
                .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
                .collect();
            if scorers.is_empty() {
                return Ok(Vec::new());
            }
            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
        };

        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        // Track ordinals with individual scores for each doc_id
        // Now using real ordinals from the posting lists
        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        // Combine scores and build results with ordinal tracking
        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        // Sort by score descending and take top limit.
        // NaN-safe: incomparable scores are treated as equal rather than panicking.
        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }
909
910    /// Get positions for a term (for phrase queries)
911    ///
912    /// Position offsets are now embedded in TermInfo, so we first look up
913    /// the term to get its TermInfo, then use position_info() to get the offset.
914    pub async fn get_positions(
915        &self,
916        field: Field,
917        term: &[u8],
918    ) -> Result<Option<crate::structures::PositionPostingList>> {
919        // Get positions handle
920        let handle = match &self.positions_handle {
921            Some(h) => h,
922            None => return Ok(None),
923        };
924
925        // Build key: field_id + term
926        let mut key = Vec::with_capacity(4 + term.len());
927        key.extend_from_slice(&field.0.to_le_bytes());
928        key.extend_from_slice(term);
929
930        // Look up term in dictionary to get TermInfo with position offset
931        let term_info = match self.term_dict.get(&key).await? {
932            Some(info) => info,
933            None => return Ok(None),
934        };
935
936        // Get position offset from TermInfo
937        let (offset, length) = match term_info.position_info() {
938            Some((o, l)) => (o, l),
939            None => return Ok(None),
940        };
941
942        // Read the position data
943        let slice = handle.slice(offset..offset + length as u64);
944        let data = slice.read_bytes().await?;
945
946        // Deserialize
947        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
948
949        Ok(Some(pos_list))
950    }
951
952    /// Check if positions are available for a field
953    pub fn has_positions(&self, field: Field) -> bool {
954        // Check schema for position mode on this field
955        if let Some(entry) = self.schema.get_field_entry(field) {
956            entry.positions.is_some()
957        } else {
958            false
959        }
960    }
961}
962
/// Alias for AsyncSegmentReader
///
/// Lets call sites refer to the reader by the shorter name; both names denote
/// the same type.
pub type SegmentReader = AsyncSegmentReader;