Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
7
/// Estimated in-memory footprint of one segment, broken down by component.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Identifier of the segment these numbers describe
    pub segment_id: u128,
    /// Document count of the segment
    pub num_docs: u32,
    /// Bytes attributed to the term dictionary block cache
    pub term_dict_cache_bytes: usize,
    /// Bytes attributed to the document store block cache
    pub store_cache_bytes: usize,
    /// Bytes attributed to the sparse vector index (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Bytes attributed to the dense vector index (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bytes attributed to the bloom filter
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Sum of every per-component byte estimate of this segment.
    ///
    /// `segment_id` and `num_docs` only identify/describe the segment and are
    /// not part of the sum.
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .into_iter()
        .sum()
    }
}
37
38use crate::structures::BlockSparsePostingList;
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56/// Async segment reader with lazy loading
57///
58/// - Term dictionary: only index loaded, blocks loaded on-demand
59/// - Postings: loaded on-demand per term via HTTP range requests
60/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
61pub struct AsyncSegmentReader {
62    meta: SegmentMeta,
63    /// Term dictionary with lazy block loading
64    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
65    /// Postings file handle - fetches ranges on demand
66    postings_handle: LazyFileHandle,
67    /// Document store with lazy block loading
68    store: Arc<AsyncStoreReader>,
69    schema: Arc<Schema>,
70    /// Base doc_id offset for this segment
71    doc_id_offset: DocId,
72    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
73    vector_indexes: FxHashMap<u32, VectorIndex>,
74    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
75    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
76    /// Per-field coarse centroids for IVF/ScaNN search
77    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
78    /// Sparse vector indexes per field
79    sparse_indexes: FxHashMap<u32, SparseIndex>,
80    /// Position file handle for phrase queries (lazy loading)
81    positions_handle: Option<LazyFileHandle>,
82}
83
84impl AsyncSegmentReader {
85    /// Open a segment with lazy loading
86    pub async fn open<D: Directory>(
87        dir: &D,
88        segment_id: SegmentId,
89        schema: Arc<Schema>,
90        doc_id_offset: DocId,
91        cache_blocks: usize,
92    ) -> Result<Self> {
93        let files = SegmentFiles::new(segment_id.0);
94
95        // Read metadata (small, always loaded)
96        let meta_slice = dir.open_read(&files.meta).await?;
97        let meta_bytes = meta_slice.read_bytes().await?;
98        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
99        debug_assert_eq!(meta.id, segment_id.0);
100
101        // Open term dictionary with lazy loading (fetches ranges on demand)
102        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
103        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
104
105        // Get postings file handle (lazy - fetches ranges on demand)
106        let postings_handle = dir.open_lazy(&files.postings).await?;
107
108        // Open store with lazy loading
109        let store_handle = dir.open_lazy(&files.store).await?;
110        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
111
112        // Load dense vector indexes from unified .vectors file
113        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
114        let vector_indexes = vectors_data.indexes;
115        let flat_vectors = vectors_data.flat_vectors;
116
117        // Load sparse vector indexes from .sparse file
118        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
119
120        // Open positions file handle (if exists) - offsets are now in TermInfo
121        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
122
123        // Log segment loading stats (compact format: ~24 bytes per active dim in hashmap)
124        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
125        let sparse_mem = sparse_dims * 24; // HashMap entry overhead
126        log::debug!(
127            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, dense_flat={}, dense_ann={}",
128            segment_id.0,
129            meta.num_docs,
130            sparse_dims,
131            sparse_mem as f64 / 1024.0,
132            flat_vectors.len(),
133            vector_indexes.len()
134        );
135
136        Ok(Self {
137            meta,
138            term_dict: Arc::new(term_dict),
139            postings_handle,
140            store: Arc::new(store),
141            schema,
142            doc_id_offset,
143            vector_indexes,
144            flat_vectors,
145            coarse_centroids: FxHashMap::default(),
146            sparse_indexes,
147            positions_handle,
148        })
149    }
150
151    pub fn meta(&self) -> &SegmentMeta {
152        &self.meta
153    }
154
155    pub fn num_docs(&self) -> u32 {
156        self.meta.num_docs
157    }
158
159    /// Get average field length for BM25F scoring
160    pub fn avg_field_len(&self, field: Field) -> f32 {
161        self.meta.avg_field_len(field)
162    }
163
164    pub fn doc_id_offset(&self) -> DocId {
165        self.doc_id_offset
166    }
167
168    /// Set the doc_id_offset (used for parallel segment loading)
169    pub fn set_doc_id_offset(&mut self, offset: DocId) {
170        self.doc_id_offset = offset;
171    }
172
173    pub fn schema(&self) -> &Schema {
174        &self.schema
175    }
176
177    /// Get sparse indexes for all fields
178    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
179        &self.sparse_indexes
180    }
181
182    /// Get vector indexes for all fields
183    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
184        &self.vector_indexes
185    }
186
187    /// Get lazy flat vectors for all fields (for reranking and merge)
188    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
189        &self.flat_vectors
190    }
191
192    /// Get term dictionary stats for debugging
193    pub fn term_dict_stats(&self) -> SSTableStats {
194        self.term_dict.stats()
195    }
196
197    /// Estimate memory usage of this segment reader
198    pub fn memory_stats(&self) -> SegmentMemoryStats {
199        let term_dict_stats = self.term_dict.stats();
200
201        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
202        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
203
204        // Store cache: similar estimate
205        let store_cache_bytes = self.store.cached_blocks() * 4096;
206
207        // Sparse index: each dimension has a posting list in memory
208        // Estimate: ~24 bytes per active dimension (HashMap entry overhead)
209        let sparse_index_bytes: usize = self
210            .sparse_indexes
211            .values()
212            .map(|s| s.num_dimensions() * 24)
213            .sum();
214
215        // Dense index: vectors are memory-mapped, but we track index structures
216        // RaBitQ/IVF indexes have cluster assignments in memory
217        let dense_index_bytes: usize = self
218            .vector_indexes
219            .values()
220            .map(|v| v.estimated_memory_bytes())
221            .sum();
222
223        SegmentMemoryStats {
224            segment_id: self.meta.id,
225            num_docs: self.meta.num_docs,
226            term_dict_cache_bytes,
227            store_cache_bytes,
228            sparse_index_bytes,
229            dense_index_bytes,
230            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
231        }
232    }
233
234    /// Get posting list for a term (async - loads on demand)
235    ///
236    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
237    /// and no additional I/O is needed. For larger lists, reads from .post file.
238    pub async fn get_postings(
239        &self,
240        field: Field,
241        term: &[u8],
242    ) -> Result<Option<BlockPostingList>> {
243        log::debug!(
244            "SegmentReader::get_postings field={} term_len={}",
245            field.0,
246            term.len()
247        );
248
249        // Build key: field_id + term
250        let mut key = Vec::with_capacity(4 + term.len());
251        key.extend_from_slice(&field.0.to_le_bytes());
252        key.extend_from_slice(term);
253
254        // Look up in term dictionary
255        let term_info = match self.term_dict.get(&key).await? {
256            Some(info) => {
257                log::debug!("SegmentReader::get_postings found term_info");
258                info
259            }
260            None => {
261                log::debug!("SegmentReader::get_postings term not found");
262                return Ok(None);
263            }
264        };
265
266        // Check if posting list is inlined
267        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
268            // Build BlockPostingList from inline data (no I/O needed!)
269            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
270            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
271                posting_list.push(doc_id, tf);
272            }
273            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
274            return Ok(Some(block_list));
275        }
276
277        // External posting list - read from postings file handle (lazy - HTTP range request)
278        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
279            Error::Corruption("TermInfo has neither inline nor external data".to_string())
280        })?;
281
282        let start = posting_offset;
283        let end = start + posting_len as u64;
284
285        if end > self.postings_handle.len() {
286            return Err(Error::Corruption(
287                "Posting offset out of bounds".to_string(),
288            ));
289        }
290
291        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
292        let block_list = BlockPostingList::deserialize(posting_bytes.as_slice())?;
293
294        Ok(Some(block_list))
295    }
296
297    /// Get document by local doc_id (async - loads on demand).
298    ///
299    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
300    /// Uses binary search on sorted doc_ids for O(log N) lookup.
301    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
302        self.doc_with_fields(local_doc_id, None).await
303    }
304
305    /// Get document by local doc_id, hydrating only the specified fields.
306    ///
307    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
308    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
309    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
310    pub async fn doc_with_fields(
311        &self,
312        local_doc_id: DocId,
313        fields: Option<&rustc_hash::FxHashSet<u32>>,
314    ) -> Result<Option<Document>> {
315        let mut doc = match self.store.get(local_doc_id, &self.schema).await {
316            Ok(Some(d)) => d,
317            Ok(None) => return Ok(None),
318            Err(e) => return Err(Error::from(e)),
319        };
320
321        // Hydrate dense vector fields from flat vector data
322        for (&field_id, lazy_flat) in &self.flat_vectors {
323            // Skip vector fields not in the requested set
324            if let Some(set) = fields
325                && !set.contains(&field_id)
326            {
327                continue;
328            }
329
330            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
331            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
332                let flat_idx = start + j;
333                match lazy_flat.get_vector(flat_idx).await {
334                    Ok(vec) => {
335                        doc.add_dense_vector(Field(field_id), vec);
336                    }
337                    Err(e) => {
338                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
339                    }
340                }
341            }
342        }
343
344        Ok(Some(doc))
345    }
346
347    /// Prefetch term dictionary blocks for a key range
348    pub async fn prefetch_terms(
349        &self,
350        field: Field,
351        start_term: &[u8],
352        end_term: &[u8],
353    ) -> Result<()> {
354        let mut start_key = Vec::with_capacity(4 + start_term.len());
355        start_key.extend_from_slice(&field.0.to_le_bytes());
356        start_key.extend_from_slice(start_term);
357
358        let mut end_key = Vec::with_capacity(4 + end_term.len());
359        end_key.extend_from_slice(&field.0.to_le_bytes());
360        end_key.extend_from_slice(end_term);
361
362        self.term_dict.prefetch_range(&start_key, &end_key).await?;
363        Ok(())
364    }
365
366    /// Check if store uses dictionary compression (incompatible with raw merging)
367    pub fn store_has_dict(&self) -> bool {
368        self.store.has_dict()
369    }
370
371    /// Get store reference for merge operations
372    pub fn store(&self) -> &super::store::AsyncStoreReader {
373        &self.store
374    }
375
376    /// Get raw store blocks for optimized merging
377    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
378        self.store.raw_blocks()
379    }
380
381    /// Get store data slice for raw block access
382    pub fn store_data_slice(&self) -> &LazyFileSlice {
383        self.store.data_slice()
384    }
385
386    /// Get all terms from this segment (for merge)
387    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
388        self.term_dict.all_entries().await.map_err(Error::from)
389    }
390
391    /// Get all terms with parsed field and term string (for statistics aggregation)
392    ///
393    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
394    /// Skips terms that aren't valid UTF-8.
395    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
396        let entries = self.term_dict.all_entries().await?;
397        let mut result = Vec::with_capacity(entries.len());
398
399        for (key, term_info) in entries {
400            // Key format: field_id (4 bytes little-endian) + term bytes
401            if key.len() > 4 {
402                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
403                let term_bytes = &key[4..];
404                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
405                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
406                }
407            }
408        }
409
410        Ok(result)
411    }
412
413    /// Get streaming iterator over term dictionary (for memory-efficient merge)
414    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
415        self.term_dict.iter()
416    }
417
418    /// Prefetch all term dictionary blocks in a single bulk I/O call.
419    ///
420    /// Call before merge iteration to eliminate per-block cache misses.
421    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
422        self.term_dict
423            .prefetch_all_data_bulk()
424            .await
425            .map_err(crate::Error::from)
426    }
427
428    /// Read raw posting bytes at offset
429    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
430        let start = offset;
431        let end = start + len as u64;
432        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
433        Ok(bytes.to_vec())
434    }
435
436    /// Read raw position bytes at offset (for merge)
437    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
438        let handle = match &self.positions_handle {
439            Some(h) => h,
440            None => return Ok(None),
441        };
442        let start = offset;
443        let end = start + len as u64;
444        let bytes = handle.read_bytes_range(start..end).await?;
445        Ok(Some(bytes.to_vec()))
446    }
447
448    /// Check if this segment has a positions file
449    pub fn has_positions_file(&self) -> bool {
450        self.positions_handle.is_some()
451    }
452
453    /// Batch cosine scoring on raw quantized bytes.
454    ///
455    /// Dispatches to the appropriate SIMD scorer based on quantization type.
456    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
457    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
458    fn score_quantized_batch(
459        query: &[f32],
460        raw: &[u8],
461        quant: crate::dsl::DenseVectorQuantization,
462        dim: usize,
463        scores: &mut [f32],
464    ) {
465        match quant {
466            crate::dsl::DenseVectorQuantization::F32 => {
467                let num_floats = scores.len() * dim;
468                debug_assert!(
469                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
470                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
471                );
472                let vectors: &[f32] =
473                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
474                crate::structures::simd::batch_cosine_scores(query, vectors, dim, scores);
475            }
476            crate::dsl::DenseVectorQuantization::F16 => {
477                crate::structures::simd::batch_cosine_scores_f16(query, raw, dim, scores);
478            }
479            crate::dsl::DenseVectorQuantization::UInt8 => {
480                crate::structures::simd::batch_cosine_scores_u8(query, raw, dim, scores);
481            }
482        }
483    }
484
485    /// Search dense vectors using RaBitQ
486    ///
487    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
488    /// The doc_ids are adjusted by doc_id_offset for this segment.
489    /// For multi-valued documents, scores are combined using the specified combiner.
490    pub async fn search_dense_vector(
491        &self,
492        field: Field,
493        query: &[f32],
494        k: usize,
495        nprobe: usize,
496        rerank_factor: usize,
497        combiner: crate::query::MultiValueCombiner,
498    ) -> Result<Vec<VectorSearchResult>> {
499        let ann_index = self.vector_indexes.get(&field.0);
500        let lazy_flat = self.flat_vectors.get(&field.0);
501
502        // No vectors at all for this field
503        if ann_index.is_none() && lazy_flat.is_none() {
504            return Ok(Vec::new());
505        }
506
507        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
508        const BRUTE_FORCE_BATCH: usize = 4096;
509
510        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
511        let t0 = std::time::Instant::now();
512        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
513            // ANN search (RaBitQ, IVF, ScaNN)
514            match index {
515                VectorIndex::RaBitQ(rabitq) => {
516                    let fetch_k = k * rerank_factor.max(1);
517                    rabitq
518                        .search(query, fetch_k, rerank_factor)
519                        .into_iter()
520                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
521                        .collect()
522                }
523                VectorIndex::IVF(lazy) => {
524                    let (index, codebook) = lazy.get().ok_or_else(|| {
525                        Error::Schema("IVF index deserialization failed".to_string())
526                    })?;
527                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
528                        Error::Schema(format!(
529                            "IVF index requires coarse centroids for field {}",
530                            field.0
531                        ))
532                    })?;
533                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
534                    let fetch_k = k * rerank_factor.max(1);
535                    index
536                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
537                        .into_iter()
538                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
539                        .collect()
540                }
541                VectorIndex::ScaNN(lazy) => {
542                    let (index, codebook) = lazy.get().ok_or_else(|| {
543                        Error::Schema("ScaNN index deserialization failed".to_string())
544                    })?;
545                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
546                        Error::Schema(format!(
547                            "ScaNN index requires coarse centroids for field {}",
548                            field.0
549                        ))
550                    })?;
551                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
552                    let fetch_k = k * rerank_factor.max(1);
553                    index
554                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
555                        .into_iter()
556                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
557                        .collect()
558                }
559            }
560        } else if let Some(lazy_flat) = lazy_flat {
561            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
562            // Uses a top-k heap to avoid collecting and sorting all N candidates.
563            log::debug!(
564                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
565                field.0,
566                lazy_flat.num_vectors,
567                lazy_flat.dim,
568                lazy_flat.quantization
569            );
570            let dim = lazy_flat.dim;
571            let n = lazy_flat.num_vectors;
572            let quant = lazy_flat.quantization;
573            let fetch_k = k * rerank_factor.max(1);
574            let mut collector = crate::query::ScoreCollector::new(fetch_k);
575            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
576
577            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
578                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
579                let batch_bytes = lazy_flat
580                    .read_vectors_batch(batch_start, batch_count)
581                    .await
582                    .map_err(crate::Error::Io)?;
583                let raw = batch_bytes.as_slice();
584
585                Self::score_quantized_batch(query, raw, quant, dim, &mut scores[..batch_count]);
586
587                for (i, &score) in scores.iter().enumerate().take(batch_count) {
588                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
589                    collector.insert_with_ordinal(doc_id, score, ordinal);
590                }
591            }
592
593            collector
594                .into_sorted_results()
595                .into_iter()
596                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
597                .collect()
598        } else {
599            return Ok(Vec::new());
600        };
601        let l1_elapsed = t0.elapsed();
602        log::debug!(
603            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
604            field.0,
605            results.len(),
606            l1_elapsed.as_secs_f64() * 1000.0
607        );
608
609        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
610        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
611        if ann_index.is_some()
612            && !results.is_empty()
613            && let Some(lazy_flat) = lazy_flat
614        {
615            let t_rerank = std::time::Instant::now();
616            let dim = lazy_flat.dim;
617            let quant = lazy_flat.quantization;
618            let vbs = lazy_flat.vector_byte_size();
619
620            // Resolve flat indexes for each candidate via binary search
621            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
622            for (ri, c) in results.iter().enumerate() {
623                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
624                for (j, &(_, ord)) in entries.iter().enumerate() {
625                    if ord == c.1 {
626                        resolved.push((ri, start + j));
627                        break;
628                    }
629                }
630            }
631
632            let t_resolve = t_rerank.elapsed();
633            if !resolved.is_empty() {
634                // Sort by flat_idx for sequential mmap access (better page locality)
635                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
636
637                // Batch-read raw quantized bytes into contiguous buffer
638                let t_read = std::time::Instant::now();
639                let mut raw_buf = vec![0u8; resolved.len() * vbs];
640                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
641                    let _ = lazy_flat
642                        .read_vector_raw_into(
643                            flat_idx,
644                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
645                        )
646                        .await;
647                }
648
649                let read_elapsed = t_read.elapsed();
650
651                // Native-precision batch SIMD cosine scoring
652                let t_score = std::time::Instant::now();
653                let mut scores = vec![0f32; resolved.len()];
654                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores);
655                let score_elapsed = t_score.elapsed();
656
657                // Write scores back to results
658                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
659                    results[ri].2 = scores[buf_idx];
660                }
661
662                log::debug!(
663                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
664                    field.0,
665                    resolved.len(),
666                    dim,
667                    quant,
668                    vbs,
669                    t_resolve.as_secs_f64() * 1000.0,
670                    read_elapsed.as_secs_f64() * 1000.0,
671                    score_elapsed.as_secs_f64() * 1000.0,
672                );
673            }
674
675            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
676            results.truncate(k * rerank_factor.max(1));
677            log::debug!(
678                "[search_dense] field {}: rerank total={:.1}ms",
679                field.0,
680                t_rerank.elapsed().as_secs_f64() * 1000.0
681            );
682        }
683
684        // Track ordinals with individual scores for each doc_id
685        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
686        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
687            rustc_hash::FxHashMap::default();
688        for (doc_id, ordinal, score) in results {
689            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
690            ordinals.push((ordinal as u32, score));
691        }
692
693        // Combine scores and build results with ordinal tracking
694        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
695            .into_iter()
696            .map(|(doc_id, ordinals)| {
697                let combined_score = combiner.combine(&ordinals);
698                VectorSearchResult::new(doc_id, combined_score, ordinals)
699            })
700            .collect();
701
702        // Sort by score descending and take top k
703        final_results.sort_by(|a, b| {
704            b.score
705                .partial_cmp(&a.score)
706                .unwrap_or(std::cmp::Ordering::Equal)
707        });
708        final_results.truncate(k);
709
710        Ok(final_results)
711    }
712
713    /// Check if this segment has dense vectors for the given field
714    pub fn has_dense_vector_index(&self, field: Field) -> bool {
715        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
716    }
717
718    /// Get the dense vector index for a field (if available)
719    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
720        match self.vector_indexes.get(&field.0) {
721            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
722            _ => None,
723        }
724    }
725
726    /// Get the IVF vector index for a field (if available)
727    pub fn get_ivf_vector_index(
728        &self,
729        field: Field,
730    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
731        match self.vector_indexes.get(&field.0) {
732            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
733            _ => None,
734        }
735    }
736
737    /// Get coarse centroids for a field
738    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
739        self.coarse_centroids.get(&field_id)
740    }
741
742    /// Set per-field coarse centroids from index-level trained structures
743    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
744        self.coarse_centroids = centroids;
745    }
746
747    /// Get the ScaNN vector index for a field (if available)
748    pub fn get_scann_vector_index(
749        &self,
750        field: Field,
751    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
752        match self.vector_indexes.get(&field.0) {
753            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
754            _ => None,
755        }
756    }
757
758    /// Get the vector index type for a field
759    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
760        self.vector_indexes.get(&field.0)
761    }
762
763    /// Search for similar sparse vectors using dedicated sparse posting lists
764    ///
765    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
766    /// Optimizations (via WandExecutor):
767    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
768    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
769    /// 3. **Top-K heap**: Efficient score collection
770    ///
771    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
772    pub async fn search_sparse_vector(
773        &self,
774        field: Field,
775        vector: &[(u32, f32)],
776        limit: usize,
777        combiner: crate::query::MultiValueCombiner,
778        heap_factor: f32,
779    ) -> Result<Vec<VectorSearchResult>> {
780        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};
781
782        let query_tokens = vector.len();
783
784        // Get sparse index for this field
785        let sparse_index = match self.sparse_indexes.get(&field.0) {
786            Some(idx) => idx,
787            None => {
788                log::debug!(
789                    "Sparse vector search: no index for field {}, returning empty",
790                    field.0
791                );
792                return Ok(Vec::new());
793            }
794        };
795
796        let index_dimensions = sparse_index.num_dimensions();
797
798        // Filter query terms to only those present in the index
799        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
800        let mut missing_count = 0usize;
801
802        for &(dim_id, query_weight) in vector {
803            if sparse_index.has_dimension(dim_id) {
804                matched_terms.push((dim_id, query_weight));
805            } else {
806                missing_count += 1;
807            }
808        }
809
810        log::debug!(
811            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
812            query_tokens,
813            matched_terms.len(),
814            missing_count,
815            index_dimensions
816        );
817
818        if matched_terms.is_empty() {
819            log::debug!("Sparse vector search: no matching tokens, returning empty");
820            return Ok(Vec::new());
821        }
822
823        // Select executor based on number of query terms:
824        // - 12+ terms: BMP (block-at-a-time, lazy block loading, best for SPLADE)
825        // - 1-11 terms: BlockMaxScoreExecutor (unified MaxScore + block-max + conjunction)
826        let num_terms = matched_terms.len();
827        let over_fetch = limit * 2; // Over-fetch for multi-value combining
828        let raw_results = if num_terms > 12 {
829            // BMP: lazy block loading — only skip entries in memory, blocks loaded on-demand
830            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
831                .execute()
832                .await?
833        } else {
834            // Load posting lists only for the few terms (1-11) used by BlockMaxScore
835            let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
836                Vec::with_capacity(num_terms);
837            for &(dim_id, query_weight) in &matched_terms {
838                if let Some(pl) = sparse_index.get_posting(dim_id).await? {
839                    posting_lists.push((dim_id, query_weight, pl));
840                }
841            }
842            let scorers: Vec<SparseTermScorer> = posting_lists
843                .iter()
844                .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
845                .collect();
846            if scorers.is_empty() {
847                return Ok(Vec::new());
848            }
849            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
850        };
851
852        log::trace!(
853            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
854            raw_results.len(),
855            self.doc_id_offset
856        );
857        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
858            for r in raw_results.iter().take(5) {
859                log::trace!(
860                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
861                    r.doc_id,
862                    r.doc_id + self.doc_id_offset,
863                    r.score,
864                    r.ordinal
865                );
866            }
867        }
868
869        // Track ordinals with individual scores for each doc_id
870        // Now using real ordinals from the posting lists
871        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
872            rustc_hash::FxHashMap::default();
873        for r in raw_results {
874            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
875            ordinals.push((r.ordinal as u32, r.score));
876        }
877
878        // Combine scores and build results with ordinal tracking
879        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
880        let mut results: Vec<VectorSearchResult> = doc_ordinals
881            .into_iter()
882            .map(|(doc_id, ordinals)| {
883                let combined_score = combiner.combine(&ordinals);
884                VectorSearchResult::new(doc_id, combined_score, ordinals)
885            })
886            .collect();
887
888        // Sort by score descending and take top limit
889        results.sort_by(|a, b| {
890            b.score
891                .partial_cmp(&a.score)
892                .unwrap_or(std::cmp::Ordering::Equal)
893        });
894        results.truncate(limit);
895
896        Ok(results)
897    }
898
899    /// Get positions for a term (for phrase queries)
900    ///
901    /// Position offsets are now embedded in TermInfo, so we first look up
902    /// the term to get its TermInfo, then use position_info() to get the offset.
903    pub async fn get_positions(
904        &self,
905        field: Field,
906        term: &[u8],
907    ) -> Result<Option<crate::structures::PositionPostingList>> {
908        // Get positions handle
909        let handle = match &self.positions_handle {
910            Some(h) => h,
911            None => return Ok(None),
912        };
913
914        // Build key: field_id + term
915        let mut key = Vec::with_capacity(4 + term.len());
916        key.extend_from_slice(&field.0.to_le_bytes());
917        key.extend_from_slice(term);
918
919        // Look up term in dictionary to get TermInfo with position offset
920        let term_info = match self.term_dict.get(&key).await? {
921            Some(info) => info,
922            None => return Ok(None),
923        };
924
925        // Get position offset from TermInfo
926        let (offset, length) = match term_info.position_info() {
927            Some((o, l)) => (o, l),
928            None => return Ok(None),
929        };
930
931        // Read the position data
932        let slice = handle.slice(offset..offset + length as u64);
933        let data = slice.read_bytes().await?;
934
935        // Deserialize
936        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
937
938        Ok(Some(pos_list))
939    }
940
941    /// Check if positions are available for a field
942    pub fn has_positions(&self, field: Field) -> bool {
943        // Check schema for position mode on this field
944        if let Some(entry) = self.schema.get_field_entry(field) {
945            entry.positions.is_some()
946        } else {
947            false
948        }
949    }
950}
951
/// Alias that lets callers refer to [`AsyncSegmentReader`] as `SegmentReader`.
953pub type SegmentReader = AsyncSegmentReader;