Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3pub(crate) mod bmp;
4pub(crate) mod loader;
5mod types;
6
7pub use bmp::BmpIndex;
8#[cfg(feature = "diagnostics")]
9pub use types::DimRawData;
10pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
11
/// Per-segment breakdown of estimated memory consumption.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Identifier of the segment these figures describe
    pub segment_id: u128,
    /// Document count of the segment
    pub num_docs: u32,
    /// Bytes attributed to the term dictionary block cache
    pub term_dict_cache_bytes: usize,
    /// Bytes attributed to the document store block cache
    pub store_cache_bytes: usize,
    /// Bytes attributed to sparse vector indexes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Bytes attributed to dense vector indexes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bytes attributed to bloom filters
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Sum of all byte counters, i.e. the estimated total memory of the segment.
    ///
    /// `segment_id` and `num_docs` are descriptive only and do not contribute.
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .iter()
        .sum()
    }
}
41
42use std::sync::Arc;
43
44use rustc_hash::FxHashMap;
45
46use super::vector_data::LazyFlatVectorData;
47use crate::directories::{Directory, FileHandle};
48use crate::dsl::{DenseVectorQuantization, Document, Field, Schema};
49use crate::structures::{
50    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
51    RaBitQIndex, SSTableStats, TermInfo,
52};
53use crate::{DocId, Error, Result};
54
55use super::store::{AsyncStoreReader, RawStoreBlock};
56use super::types::{SegmentFiles, SegmentId, SegmentMeta};
57
58/// Combine per-ordinal (doc_id, ordinal, score) triples into VectorSearchResults,
59/// applying the multi-value combiner, sorting by score desc, and truncating to `limit`.
60///
61/// Fast path: when all ordinals are 0 (single-valued field), skips the HashMap
62/// grouping entirely and just sorts + truncates the raw results.
63pub(crate) fn combine_ordinal_results(
64    raw: impl IntoIterator<Item = (u32, u16, f32)>,
65    combiner: crate::query::MultiValueCombiner,
66    limit: usize,
67) -> Vec<VectorSearchResult> {
68    let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();
69
70    let num_raw = collected.len();
71    if log::log_enabled!(log::Level::Debug) {
72        let mut ids: Vec<u32> = collected.iter().map(|(d, _, _)| *d).collect();
73        ids.sort_unstable();
74        ids.dedup();
75        log::debug!(
76            "combine_ordinal_results: {} raw entries, {} unique docs, combiner={:?}, limit={}",
77            num_raw,
78            ids.len(),
79            combiner,
80            limit
81        );
82    }
83
84    // Fast path: all ordinals are 0 → no grouping needed, skip HashMap
85    let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
86    if all_single {
87        let mut results: Vec<VectorSearchResult> = collected
88            .into_iter()
89            .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
90            .collect();
91        results.sort_unstable_by(|a, b| {
92            b.score
93                .partial_cmp(&a.score)
94                .unwrap_or(std::cmp::Ordering::Equal)
95        });
96        results.truncate(limit);
97        return results;
98    }
99
100    // Slow path: multi-valued field — group by doc_id, apply combiner
101    let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
102        rustc_hash::FxHashMap::default();
103    for (doc_id, ordinal, score) in collected {
104        doc_ordinals
105            .entry(doc_id as DocId)
106            .or_default()
107            .push((ordinal as u32, score));
108    }
109    let mut results: Vec<VectorSearchResult> = doc_ordinals
110        .into_iter()
111        .map(|(doc_id, ordinals)| {
112            let combined_score = combiner.combine(&ordinals);
113            VectorSearchResult::new(doc_id, combined_score, ordinals)
114        })
115        .collect();
116    results.sort_unstable_by(|a, b| {
117        b.score
118            .partial_cmp(&a.score)
119            .unwrap_or(std::cmp::Ordering::Equal)
120    });
121    results.truncate(limit);
122    results
123}
124
125/// Async segment reader with lazy loading
126///
127/// - Term dictionary: only index loaded, blocks loaded on-demand
128/// - Postings: loaded on-demand per term via HTTP range requests
129/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
130pub struct SegmentReader {
131    meta: SegmentMeta,
132    /// Term dictionary with lazy block loading
133    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
134    /// Postings file handle - fetches ranges on demand
135    postings_handle: FileHandle,
136    /// Document store with lazy block loading
137    store: Arc<AsyncStoreReader>,
138    schema: Arc<Schema>,
139    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
140    vector_indexes: FxHashMap<u32, VectorIndex>,
141    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
142    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
143    /// Per-field coarse centroids for IVF/ScaNN search
144    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
145    /// Sparse vector indexes per field (MaxScore format)
146    sparse_indexes: FxHashMap<u32, SparseIndex>,
147    /// BMP sparse vector indexes per field (BMP format)
148    bmp_indexes: FxHashMap<u32, BmpIndex>,
149    /// Position file handle for phrase queries (lazy loading)
150    positions_handle: Option<FileHandle>,
151    /// Fast-field columnar readers per field_id
152    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
153}
154
155impl SegmentReader {
156    /// Open a segment with lazy loading
157    pub async fn open<D: Directory>(
158        dir: &D,
159        segment_id: SegmentId,
160        schema: Arc<Schema>,
161        cache_blocks: usize,
162    ) -> Result<Self> {
163        let files = SegmentFiles::new(segment_id.0);
164
165        // Read metadata (small, always loaded)
166        let meta_slice = dir.open_read(&files.meta).await?;
167        let meta_bytes = meta_slice.read_bytes().await?;
168        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
169        debug_assert_eq!(meta.id, segment_id.0);
170
171        // Open term dictionary with lazy loading (fetches ranges on demand)
172        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
173        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
174
175        // Get postings file handle (lazy - fetches ranges on demand)
176        let postings_handle = dir.open_lazy(&files.postings).await?;
177
178        // Open store with lazy loading
179        let store_handle = dir.open_lazy(&files.store).await?;
180        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
181
182        // Load dense vector indexes from unified .vectors file
183        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
184        let vector_indexes = vectors_data.indexes;
185        let flat_vectors = vectors_data.flat_vectors;
186
187        // Load sparse vector indexes from .sparse file (MaxScore + BMP)
188        let sparse_data = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
189        let sparse_indexes = sparse_data.maxscore_indexes;
190        let bmp_indexes = sparse_data.bmp_indexes;
191
192        // Open positions file handle (if exists) - offsets are now in TermInfo
193        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
194
195        // Load fast-field columns from .fast file
196        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;
197
198        // Log segment loading stats
199        {
200            let mut parts = vec![format!(
201                "[segment] loaded {:016x}: docs={}",
202                segment_id.0, meta.num_docs
203            )];
204            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
205                parts.push(format!(
206                    "dense: {} ann + {} flat fields",
207                    vector_indexes.len(),
208                    flat_vectors.len()
209                ));
210            }
211            for (field_id, idx) in &sparse_indexes {
212                parts.push(format!(
213                    "sparse field {}: {} dims, ~{:.1} KB",
214                    field_id,
215                    idx.num_dimensions(),
216                    idx.num_dimensions() as f64 * 24.0 / 1024.0
217                ));
218            }
219            for (field_id, idx) in &bmp_indexes {
220                parts.push(format!(
221                    "bmp field {}: {} dims, {} blocks",
222                    field_id,
223                    idx.dims(),
224                    idx.num_blocks
225                ));
226            }
227            if !fast_fields.is_empty() {
228                parts.push(format!("fast: {} fields", fast_fields.len()));
229            }
230            log::debug!("{}", parts.join(", "));
231        }
232
233        Ok(Self {
234            meta,
235            term_dict: Arc::new(term_dict),
236            postings_handle,
237            store: Arc::new(store),
238            schema,
239            vector_indexes,
240            flat_vectors,
241            coarse_centroids: FxHashMap::default(),
242            sparse_indexes,
243            bmp_indexes,
244            positions_handle,
245            fast_fields,
246        })
247    }
248
249    pub fn meta(&self) -> &SegmentMeta {
250        &self.meta
251    }
252
253    pub fn num_docs(&self) -> u32 {
254        self.meta.num_docs
255    }
256
257    /// Get average field length for BM25F scoring
258    pub fn avg_field_len(&self, field: Field) -> f32 {
259        self.meta.avg_field_len(field)
260    }
261
262    pub fn schema(&self) -> &Schema {
263        &self.schema
264    }
265
266    /// Get sparse indexes for all fields
267    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
268        &self.sparse_indexes
269    }
270
271    /// Get sparse index for a specific field (MaxScore format)
272    pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
273        self.sparse_indexes.get(&field.0)
274    }
275
276    /// Get BMP index for a specific field
277    pub fn bmp_index(&self, field: Field) -> Option<&BmpIndex> {
278        self.bmp_indexes.get(&field.0)
279    }
280
281    /// Get all BMP indexes
282    pub fn bmp_indexes(&self) -> &FxHashMap<u32, BmpIndex> {
283        &self.bmp_indexes
284    }
285
286    /// Get vector indexes for all fields
287    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
288        &self.vector_indexes
289    }
290
291    /// Get lazy flat vectors for all fields (for reranking and merge)
292    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
293        &self.flat_vectors
294    }
295
296    /// Get a fast-field reader for a specific field.
297    pub fn fast_field(
298        &self,
299        field_id: u32,
300    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
301        self.fast_fields.get(&field_id)
302    }
303
304    /// Get all fast-field readers.
305    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
306        &self.fast_fields
307    }
308
309    /// Get term dictionary stats for debugging
310    pub fn term_dict_stats(&self) -> SSTableStats {
311        self.term_dict.stats()
312    }
313
314    /// Estimate memory usage of this segment reader
315    pub fn memory_stats(&self) -> SegmentMemoryStats {
316        let term_dict_stats = self.term_dict.stats();
317
318        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
319        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
320
321        // Store cache: similar estimate
322        let store_cache_bytes = self.store.cached_blocks() * 4096;
323
324        // Sparse index: SoA dim table + OwnedBytes skip section + BMP grids
325        let sparse_index_bytes: usize = self
326            .sparse_indexes
327            .values()
328            .map(|s| s.estimated_memory_bytes())
329            .sum::<usize>()
330            + self
331                .bmp_indexes
332                .values()
333                .map(|b| b.estimated_memory_bytes())
334                .sum::<usize>();
335
336        // Dense index: vectors are memory-mapped, but we track index structures
337        // RaBitQ/IVF indexes have cluster assignments in memory
338        let dense_index_bytes: usize = self
339            .vector_indexes
340            .values()
341            .map(|v| v.estimated_memory_bytes())
342            .sum();
343
344        SegmentMemoryStats {
345            segment_id: self.meta.id,
346            num_docs: self.meta.num_docs,
347            term_dict_cache_bytes,
348            store_cache_bytes,
349            sparse_index_bytes,
350            dense_index_bytes,
351            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
352        }
353    }
354
355    /// Get posting list for a term (async - loads on demand)
356    ///
357    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
358    /// and no additional I/O is needed. For larger lists, reads from .post file.
359    pub async fn get_postings(
360        &self,
361        field: Field,
362        term: &[u8],
363    ) -> Result<Option<BlockPostingList>> {
364        log::debug!(
365            "SegmentReader::get_postings field={} term_len={}",
366            field.0,
367            term.len()
368        );
369
370        // Build key: field_id + term
371        let mut key = Vec::with_capacity(4 + term.len());
372        key.extend_from_slice(&field.0.to_le_bytes());
373        key.extend_from_slice(term);
374
375        // Look up in term dictionary
376        let term_info = match self.term_dict.get(&key).await? {
377            Some(info) => {
378                log::debug!("SegmentReader::get_postings found term_info");
379                info
380            }
381            None => {
382                log::debug!("SegmentReader::get_postings term not found");
383                return Ok(None);
384            }
385        };
386
387        // Check if posting list is inlined
388        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
389            // Build BlockPostingList from inline data (no I/O needed!)
390            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
391            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
392                posting_list.push(doc_id, tf);
393            }
394            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
395            return Ok(Some(block_list));
396        }
397
398        // External posting list - read from postings file handle (lazy - HTTP range request)
399        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
400            Error::Corruption("TermInfo has neither inline nor external data".to_string())
401        })?;
402
403        let start = posting_offset;
404        let end = start + posting_len;
405
406        if end > self.postings_handle.len() {
407            return Err(Error::Corruption(
408                "Posting offset out of bounds".to_string(),
409            ));
410        }
411
412        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
413        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
414
415        Ok(Some(block_list))
416    }
417
418    /// Get all posting lists for terms that start with `prefix` in the given field.
419    pub async fn get_prefix_postings(
420        &self,
421        field: Field,
422        prefix: &[u8],
423    ) -> Result<Vec<BlockPostingList>> {
424        // Build composite key prefix: field_id ++ prefix
425        let mut key_prefix = Vec::with_capacity(4 + prefix.len());
426        key_prefix.extend_from_slice(&field.0.to_le_bytes());
427        key_prefix.extend_from_slice(prefix);
428
429        let entries = self.term_dict.prefix_scan(&key_prefix).await?;
430        let mut results = Vec::with_capacity(entries.len());
431
432        for (_key, term_info) in entries {
433            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
434                let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
435                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
436                    posting_list.push(doc_id, tf);
437                }
438                results.push(BlockPostingList::from_posting_list(&posting_list)?);
439            } else if let Some((posting_offset, posting_len)) = term_info.external_info() {
440                let start = posting_offset;
441                let end = start + posting_len;
442                if end > self.postings_handle.len() {
443                    continue;
444                }
445                let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
446                results.push(BlockPostingList::deserialize_zero_copy(posting_bytes)?);
447            }
448        }
449
450        Ok(results)
451    }
452
453    /// Get document by local doc_id (async - loads on demand).
454    ///
455    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
456    /// Uses binary search on sorted doc_ids for O(log N) lookup.
457    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
458        self.doc_with_fields(local_doc_id, None).await
459    }
460
461    /// Get document by local doc_id, hydrating only the specified fields.
462    ///
463    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
464    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
465    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
466    pub async fn doc_with_fields(
467        &self,
468        local_doc_id: DocId,
469        fields: Option<&rustc_hash::FxHashSet<u32>>,
470    ) -> Result<Option<Document>> {
471        let mut doc = match fields {
472            Some(set) => {
473                let field_ids: Vec<u32> = set.iter().copied().collect();
474                match self
475                    .store
476                    .get_fields(local_doc_id, &self.schema, &field_ids)
477                    .await
478                {
479                    Ok(Some(d)) => d,
480                    Ok(None) => return Ok(None),
481                    Err(e) => return Err(Error::from(e)),
482                }
483            }
484            None => match self.store.get(local_doc_id, &self.schema).await {
485                Ok(Some(d)) => d,
486                Ok(None) => return Ok(None),
487                Err(e) => return Err(Error::from(e)),
488            },
489        };
490
491        // Hydrate dense vector fields from flat vector data
492        for (&field_id, lazy_flat) in &self.flat_vectors {
493            // Skip vector fields not in the requested set
494            if let Some(set) = fields
495                && !set.contains(&field_id)
496            {
497                continue;
498            }
499
500            let is_binary = lazy_flat.quantization == DenseVectorQuantization::Binary;
501            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
502            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
503                let flat_idx = start + j;
504                if is_binary {
505                    let vbs = lazy_flat.vector_byte_size();
506                    let mut raw = vec![0u8; vbs];
507                    match lazy_flat.read_vector_raw_into(flat_idx, &mut raw).await {
508                        Ok(()) => {
509                            doc.add_binary_dense_vector(Field(field_id), raw);
510                        }
511                        Err(e) => {
512                            log::warn!("Failed to hydrate binary vector field {}: {}", field_id, e);
513                        }
514                    }
515                } else {
516                    match lazy_flat.get_vector(flat_idx).await {
517                        Ok(vec) => {
518                            doc.add_dense_vector(Field(field_id), vec);
519                        }
520                        Err(e) => {
521                            log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
522                        }
523                    }
524                }
525            }
526        }
527
528        Ok(Some(doc))
529    }
530
531    /// Prefetch term dictionary blocks for a key range
532    pub async fn prefetch_terms(
533        &self,
534        field: Field,
535        start_term: &[u8],
536        end_term: &[u8],
537    ) -> Result<()> {
538        let mut start_key = Vec::with_capacity(4 + start_term.len());
539        start_key.extend_from_slice(&field.0.to_le_bytes());
540        start_key.extend_from_slice(start_term);
541
542        let mut end_key = Vec::with_capacity(4 + end_term.len());
543        end_key.extend_from_slice(&field.0.to_le_bytes());
544        end_key.extend_from_slice(end_term);
545
546        self.term_dict.prefetch_range(&start_key, &end_key).await?;
547        Ok(())
548    }
549
550    /// Check if store uses dictionary compression (incompatible with raw merging)
551    pub fn store_has_dict(&self) -> bool {
552        self.store.has_dict()
553    }
554
555    /// Get store reference for merge operations
556    pub fn store(&self) -> &super::store::AsyncStoreReader {
557        &self.store
558    }
559
560    /// Get raw store blocks for optimized merging
561    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
562        self.store.raw_blocks()
563    }
564
565    /// Get store data slice for raw block access
566    pub fn store_data_slice(&self) -> &FileHandle {
567        self.store.data_slice()
568    }
569
570    /// Get all terms from this segment (for merge)
571    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
572        self.term_dict.all_entries().await.map_err(Error::from)
573    }
574
575    /// Get all terms with parsed field and term string (for statistics aggregation)
576    ///
577    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
578    /// Skips terms that aren't valid UTF-8.
579    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
580        let entries = self.term_dict.all_entries().await?;
581        let mut result = Vec::with_capacity(entries.len());
582
583        for (key, term_info) in entries {
584            // Key format: field_id (4 bytes little-endian) + term bytes
585            if key.len() > 4 {
586                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
587                let term_bytes = &key[4..];
588                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
589                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
590                }
591            }
592        }
593
594        Ok(result)
595    }
596
597    /// Get streaming iterator over term dictionary (for memory-efficient merge)
598    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
599        self.term_dict.iter()
600    }
601
602    /// Prefetch all term dictionary blocks in a single bulk I/O call.
603    ///
604    /// Call before merge iteration to eliminate per-block cache misses.
605    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
606        self.term_dict
607            .prefetch_all_data_bulk()
608            .await
609            .map_err(crate::Error::from)
610    }
611
612    /// Read raw posting bytes at offset
613    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
614        let start = offset;
615        let end = start + len;
616        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
617        Ok(bytes.to_vec())
618    }
619
620    /// Read raw position bytes at offset (for merge)
621    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
622        let handle = match &self.positions_handle {
623            Some(h) => h,
624            None => return Ok(None),
625        };
626        let start = offset;
627        let end = start + len;
628        let bytes = handle.read_bytes_range(start..end).await?;
629        Ok(Some(bytes.to_vec()))
630    }
631
632    /// Check if this segment has a positions file
633    pub fn has_positions_file(&self) -> bool {
634        self.positions_handle.is_some()
635    }
636
637    /// Batch cosine scoring on raw quantized bytes.
638    ///
639    /// Dispatches to the appropriate SIMD scorer based on quantization type.
640    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
641    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
642    fn score_quantized_batch(
643        query: &[f32],
644        raw: &[u8],
645        quant: crate::dsl::DenseVectorQuantization,
646        dim: usize,
647        scores: &mut [f32],
648        unit_norm: bool,
649    ) {
650        use crate::dsl::DenseVectorQuantization;
651        use crate::structures::simd;
652        match (quant, unit_norm) {
653            (DenseVectorQuantization::F32, false) => {
654                let num_floats = scores.len() * dim;
655                debug_assert!(
656                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
657                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
658                );
659                let vectors: &[f32] =
660                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
661                simd::batch_cosine_scores(query, vectors, dim, scores);
662            }
663            (DenseVectorQuantization::F32, true) => {
664                let num_floats = scores.len() * dim;
665                debug_assert!(
666                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
667                    "f32 vector data not 4-byte aligned"
668                );
669                let vectors: &[f32] =
670                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
671                simd::batch_dot_scores(query, vectors, dim, scores);
672            }
673            (DenseVectorQuantization::F16, false) => {
674                simd::batch_cosine_scores_f16(query, raw, dim, scores);
675            }
676            (DenseVectorQuantization::F16, true) => {
677                simd::batch_dot_scores_f16(query, raw, dim, scores);
678            }
679            (DenseVectorQuantization::UInt8, false) => {
680                simd::batch_cosine_scores_u8(query, raw, dim, scores);
681            }
682            (DenseVectorQuantization::UInt8, true) => {
683                simd::batch_dot_scores_u8(query, raw, dim, scores);
684            }
685            (DenseVectorQuantization::Binary, _) => {
686                // Binary vectors use search_binary_dense_vector(), not search_dense_vector()
687                unreachable!("Binary quantization should not reach score_quantized_batch");
688            }
689        }
690    }
691
692    /// Search dense vectors using RaBitQ
693    ///
694    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
695    /// Doc IDs are segment-local.
696    /// For multi-valued documents, scores are combined using the specified combiner.
697    pub async fn search_dense_vector(
698        &self,
699        field: Field,
700        query: &[f32],
701        k: usize,
702        nprobe: usize,
703        rerank_factor: f32,
704        combiner: crate::query::MultiValueCombiner,
705    ) -> Result<Vec<VectorSearchResult>> {
706        let ann_index = self.vector_indexes.get(&field.0);
707        let lazy_flat = self.flat_vectors.get(&field.0);
708
709        // No vectors at all for this field
710        if ann_index.is_none() && lazy_flat.is_none() {
711            return Ok(Vec::new());
712        }
713
714        // Check if vectors are pre-normalized (skip per-vector norm in scoring)
715        let unit_norm = self
716            .schema
717            .get_field_entry(field)
718            .and_then(|e| e.dense_vector_config.as_ref())
719            .is_some_and(|c| c.unit_norm);
720
721        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
722        const BRUTE_FORCE_BATCH: usize = 4096;
723
724        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
725
726        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
727        let t0 = std::time::Instant::now();
728        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
729            // ANN search (RaBitQ, IVF, ScaNN)
730            match index {
731                VectorIndex::RaBitQ(lazy) => {
732                    let rabitq = lazy.get().ok_or_else(|| {
733                        Error::Schema("RaBitQ index deserialization failed".to_string())
734                    })?;
735                    rabitq
736                        .search(query, fetch_k)
737                        .into_iter()
738                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
739                        .collect()
740                }
741                VectorIndex::IVF(lazy) => {
742                    let (index, codebook) = lazy.get().ok_or_else(|| {
743                        Error::Schema("IVF index deserialization failed".to_string())
744                    })?;
745                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
746                        Error::Schema(format!(
747                            "IVF index requires coarse centroids for field {}",
748                            field.0
749                        ))
750                    })?;
751                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
752                    index
753                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
754                        .into_iter()
755                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
756                        .collect()
757                }
758                VectorIndex::ScaNN(lazy) => {
759                    let (index, codebook) = lazy.get().ok_or_else(|| {
760                        Error::Schema("ScaNN index deserialization failed".to_string())
761                    })?;
762                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
763                        Error::Schema(format!(
764                            "ScaNN index requires coarse centroids for field {}",
765                            field.0
766                        ))
767                    })?;
768                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
769                    index
770                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
771                        .into_iter()
772                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
773                        .collect()
774                }
775            }
776        } else if let Some(lazy_flat) = lazy_flat {
777            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
778            // Uses a top-k heap to avoid collecting and sorting all N candidates.
779            log::debug!(
780                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
781                field.0,
782                lazy_flat.num_vectors,
783                lazy_flat.dim,
784                lazy_flat.quantization
785            );
786            let dim = lazy_flat.dim;
787            let n = lazy_flat.num_vectors;
788            let quant = lazy_flat.quantization;
789            let mut collector = crate::query::ScoreCollector::new(fetch_k);
790            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
791
792            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
793                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
794                let batch_bytes = lazy_flat
795                    .read_vectors_batch(batch_start, batch_count)
796                    .await
797                    .map_err(crate::Error::Io)?;
798                let raw = batch_bytes.as_slice();
799
800                Self::score_quantized_batch(
801                    query,
802                    raw,
803                    quant,
804                    dim,
805                    &mut scores[..batch_count],
806                    unit_norm,
807                );
808
809                for (i, &score) in scores.iter().enumerate().take(batch_count) {
810                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
811                    collector.insert_with_ordinal(doc_id, score, ordinal);
812                }
813            }
814
815            collector
816                .into_sorted_results()
817                .into_iter()
818                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
819                .collect()
820        } else {
821            return Ok(Vec::new());
822        };
823        let l1_elapsed = t0.elapsed();
824        log::debug!(
825            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
826            field.0,
827            results.len(),
828            l1_elapsed.as_secs_f64() * 1000.0
829        );
830
831        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
832        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
833        if ann_index.is_some()
834            && !results.is_empty()
835            && let Some(lazy_flat) = lazy_flat
836        {
837            let t_rerank = std::time::Instant::now();
838            let dim = lazy_flat.dim;
839            let quant = lazy_flat.quantization;
840            let vbs = lazy_flat.vector_byte_size();
841
842            // Resolve flat indexes for each candidate via binary search
843            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
844            for (ri, c) in results.iter().enumerate() {
845                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
846                for (j, &(_, ord)) in entries.iter().enumerate() {
847                    if ord == c.1 {
848                        resolved.push((ri, start + j));
849                        break;
850                    }
851                }
852            }
853
854            let t_resolve = t_rerank.elapsed();
855            if !resolved.is_empty() {
856                // Sort by flat_idx for sequential mmap access (better page locality)
857                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
858
859                // Batch-read raw quantized bytes into contiguous buffer
860                let t_read = std::time::Instant::now();
861                let mut raw_buf = vec![0u8; resolved.len() * vbs];
862                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
863                    let _ = lazy_flat
864                        .read_vector_raw_into(
865                            flat_idx,
866                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
867                        )
868                        .await;
869                }
870
871                let read_elapsed = t_read.elapsed();
872
873                // Native-precision batch SIMD cosine scoring
874                let t_score = std::time::Instant::now();
875                let mut scores = vec![0f32; resolved.len()];
876                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
877                let score_elapsed = t_score.elapsed();
878
879                // Write scores back to results
880                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
881                    results[ri].2 = scores[buf_idx];
882                }
883
884                log::debug!(
885                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
886                    field.0,
887                    resolved.len(),
888                    dim,
889                    quant,
890                    vbs,
891                    t_resolve.as_secs_f64() * 1000.0,
892                    read_elapsed.as_secs_f64() * 1000.0,
893                    score_elapsed.as_secs_f64() * 1000.0,
894                );
895            }
896
897            if results.len() > fetch_k {
898                results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
899                results.truncate(fetch_k);
900            }
901            results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
902            log::debug!(
903                "[search_dense] field {}: rerank total={:.1}ms",
904                field.0,
905                t_rerank.elapsed().as_secs_f64() * 1000.0
906            );
907        }
908
909        Ok(combine_ordinal_results(results, combiner, k))
910    }
911
912    /// Search binary dense vectors using brute-force Hamming distance.
913    ///
914    /// Always flat brute-force (no ANN). Returns VectorSearchResult with ordinal tracking.
915    pub async fn search_binary_dense_vector(
916        &self,
917        field: Field,
918        query: &[u8],
919        k: usize,
920        combiner: crate::query::MultiValueCombiner,
921    ) -> Result<Vec<VectorSearchResult>> {
922        let lazy_flat = match self.flat_vectors.get(&field.0) {
923            Some(f) => f,
924            None => return Ok(Vec::new()),
925        };
926
927        const BRUTE_FORCE_BATCH: usize = 8192; // Binary vectors are tiny, use larger batches
928
929        let dim_bits = lazy_flat.dim;
930        let byte_len = lazy_flat.vector_byte_size();
931        let n = lazy_flat.num_vectors;
932
933        if byte_len != query.len() {
934            return Err(Error::Schema(format!(
935                "Binary query vector byte length {} != field byte length {}",
936                query.len(),
937                byte_len
938            )));
939        }
940
941        let mut collector = crate::query::ScoreCollector::new(k);
942        let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
943
944        for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
945            let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
946            let batch_bytes = lazy_flat
947                .read_vectors_batch(batch_start, batch_count)
948                .await
949                .map_err(crate::Error::Io)?;
950            let raw = batch_bytes.as_slice();
951
952            crate::structures::simd::batch_hamming_scores(
953                query,
954                raw,
955                byte_len,
956                dim_bits,
957                &mut scores[..batch_count],
958            );
959
960            for (i, &score) in scores.iter().enumerate().take(batch_count) {
961                let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
962                collector.insert_with_ordinal(doc_id, score, ordinal);
963            }
964        }
965
966        let results: Vec<(u32, u16, f32)> = collector
967            .into_sorted_results()
968            .into_iter()
969            .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
970            .collect();
971
972        Ok(combine_ordinal_results(results, combiner, k))
973    }
974
975    /// Check if this segment has dense vectors for the given field
976    pub fn has_dense_vector_index(&self, field: Field) -> bool {
977        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
978    }
979
980    /// Get the dense vector index for a field (if available)
981    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
982        match self.vector_indexes.get(&field.0) {
983            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
984            _ => None,
985        }
986    }
987
988    /// Get the IVF vector index for a field (if available)
989    pub fn get_ivf_vector_index(
990        &self,
991        field: Field,
992    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
993        match self.vector_indexes.get(&field.0) {
994            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
995            _ => None,
996        }
997    }
998
999    /// Get coarse centroids for a field
1000    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
1001        self.coarse_centroids.get(&field_id)
1002    }
1003
1004    /// Set per-field coarse centroids from index-level trained structures
1005    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
1006        self.coarse_centroids = centroids;
1007    }
1008
1009    /// Get the ScaNN vector index for a field (if available)
1010    pub fn get_scann_vector_index(
1011        &self,
1012        field: Field,
1013    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
1014        match self.vector_indexes.get(&field.0) {
1015            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
1016            _ => None,
1017        }
1018    }
1019
1020    /// Get the vector index type for a field
1021    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
1022        self.vector_indexes.get(&field.0)
1023    }
1024
1025    /// Get positions for a term (for phrase queries)
1026    ///
1027    /// Position offsets are now embedded in TermInfo, so we first look up
1028    /// the term to get its TermInfo, then use position_info() to get the offset.
1029    pub async fn get_positions(
1030        &self,
1031        field: Field,
1032        term: &[u8],
1033    ) -> Result<Option<crate::structures::PositionPostingList>> {
1034        // Get positions handle
1035        let handle = match &self.positions_handle {
1036            Some(h) => h,
1037            None => return Ok(None),
1038        };
1039
1040        // Build key: field_id + term
1041        let mut key = Vec::with_capacity(4 + term.len());
1042        key.extend_from_slice(&field.0.to_le_bytes());
1043        key.extend_from_slice(term);
1044
1045        // Look up term in dictionary to get TermInfo with position offset
1046        let term_info = match self.term_dict.get(&key).await? {
1047            Some(info) => info,
1048            None => return Ok(None),
1049        };
1050
1051        // Get position offset from TermInfo
1052        let (offset, length) = match term_info.position_info() {
1053            Some((o, l)) => (o, l),
1054            None => return Ok(None),
1055        };
1056
1057        // Read the position data
1058        let slice = handle.slice(offset..offset + length);
1059        let data = slice.read_bytes().await?;
1060
1061        // Deserialize
1062        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
1063
1064        Ok(Some(pos_list))
1065    }
1066
1067    /// Check if positions are available for a field
1068    pub fn has_positions(&self, field: Field) -> bool {
1069        // Check schema for position mode on this field
1070        if let Some(entry) = self.schema.get_field_entry(field) {
1071            entry.positions.is_some()
1072        } else {
1073            false
1074        }
1075    }
1076}
1077
1078// ── Synchronous search methods (mmap/RAM only) ─────────────────────────────
1079#[cfg(feature = "sync")]
1080impl SegmentReader {
1081    /// Synchronous posting list lookup — requires Inline (mmap/RAM) file handles.
1082    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
1083        // Build key: field_id + term
1084        let mut key = Vec::with_capacity(4 + term.len());
1085        key.extend_from_slice(&field.0.to_le_bytes());
1086        key.extend_from_slice(term);
1087
1088        // Look up in term dictionary (sync)
1089        let term_info = match self.term_dict.get_sync(&key)? {
1090            Some(info) => info,
1091            None => return Ok(None),
1092        };
1093
1094        // Check if posting list is inlined
1095        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
1096            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
1097            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
1098                posting_list.push(doc_id, tf);
1099            }
1100            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
1101            return Ok(Some(block_list));
1102        }
1103
1104        // External posting list — sync range read
1105        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
1106            Error::Corruption("TermInfo has neither inline nor external data".to_string())
1107        })?;
1108
1109        let start = posting_offset;
1110        let end = start + posting_len;
1111
1112        if end > self.postings_handle.len() {
1113            return Err(Error::Corruption(
1114                "Posting offset out of bounds".to_string(),
1115            ));
1116        }
1117
1118        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
1119        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
1120
1121        Ok(Some(block_list))
1122    }
1123
1124    /// Synchronous prefix posting list lookup — requires Inline (mmap/RAM) file handles.
1125    pub fn get_prefix_postings_sync(
1126        &self,
1127        field: Field,
1128        prefix: &[u8],
1129    ) -> Result<Vec<BlockPostingList>> {
1130        let mut key_prefix = Vec::with_capacity(4 + prefix.len());
1131        key_prefix.extend_from_slice(&field.0.to_le_bytes());
1132        key_prefix.extend_from_slice(prefix);
1133
1134        let entries = self.term_dict.prefix_scan_sync(&key_prefix)?;
1135        let mut results = Vec::with_capacity(entries.len());
1136
1137        for (_key, term_info) in entries {
1138            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
1139                let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
1140                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
1141                    posting_list.push(doc_id, tf);
1142                }
1143                results.push(BlockPostingList::from_posting_list(&posting_list)?);
1144            } else if let Some((posting_offset, posting_len)) = term_info.external_info() {
1145                let start = posting_offset;
1146                let end = start + posting_len;
1147                if end > self.postings_handle.len() {
1148                    continue;
1149                }
1150                let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
1151                results.push(BlockPostingList::deserialize_zero_copy(posting_bytes)?);
1152            }
1153        }
1154
1155        Ok(results)
1156    }
1157
1158    /// Synchronous position list lookup — requires Inline (mmap/RAM) file handles.
1159    pub fn get_positions_sync(
1160        &self,
1161        field: Field,
1162        term: &[u8],
1163    ) -> Result<Option<crate::structures::PositionPostingList>> {
1164        let handle = match &self.positions_handle {
1165            Some(h) => h,
1166            None => return Ok(None),
1167        };
1168
1169        // Build key: field_id + term
1170        let mut key = Vec::with_capacity(4 + term.len());
1171        key.extend_from_slice(&field.0.to_le_bytes());
1172        key.extend_from_slice(term);
1173
1174        // Look up term in dictionary (sync)
1175        let term_info = match self.term_dict.get_sync(&key)? {
1176            Some(info) => info,
1177            None => return Ok(None),
1178        };
1179
1180        let (offset, length) = match term_info.position_info() {
1181            Some((o, l)) => (o, l),
1182            None => return Ok(None),
1183        };
1184
1185        let slice = handle.slice(offset..offset + length);
1186        let data = slice.read_bytes_sync()?;
1187
1188        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
1189        Ok(Some(pos_list))
1190    }
1191
1192    /// Synchronous dense vector search — ANN indexes are already sync,
1193    /// brute-force uses sync mmap reads.
1194    pub fn search_dense_vector_sync(
1195        &self,
1196        field: Field,
1197        query: &[f32],
1198        k: usize,
1199        nprobe: usize,
1200        rerank_factor: f32,
1201        combiner: crate::query::MultiValueCombiner,
1202    ) -> Result<Vec<VectorSearchResult>> {
1203        let ann_index = self.vector_indexes.get(&field.0);
1204        let lazy_flat = self.flat_vectors.get(&field.0);
1205
1206        if ann_index.is_none() && lazy_flat.is_none() {
1207            return Ok(Vec::new());
1208        }
1209
1210        let unit_norm = self
1211            .schema
1212            .get_field_entry(field)
1213            .and_then(|e| e.dense_vector_config.as_ref())
1214            .is_some_and(|c| c.unit_norm);
1215
1216        const BRUTE_FORCE_BATCH: usize = 4096;
1217        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
1218
1219        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
1220            // ANN search (already sync)
1221            match index {
1222                VectorIndex::RaBitQ(lazy) => {
1223                    let rabitq = lazy.get().ok_or_else(|| {
1224                        Error::Schema("RaBitQ index deserialization failed".to_string())
1225                    })?;
1226                    rabitq
1227                        .search(query, fetch_k)
1228                        .into_iter()
1229                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1230                        .collect()
1231                }
1232                VectorIndex::IVF(lazy) => {
1233                    let (index, codebook) = lazy.get().ok_or_else(|| {
1234                        Error::Schema("IVF index deserialization failed".to_string())
1235                    })?;
1236                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1237                        Error::Schema(format!(
1238                            "IVF index requires coarse centroids for field {}",
1239                            field.0
1240                        ))
1241                    })?;
1242                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1243                    index
1244                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1245                        .into_iter()
1246                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1247                        .collect()
1248                }
1249                VectorIndex::ScaNN(lazy) => {
1250                    let (index, codebook) = lazy.get().ok_or_else(|| {
1251                        Error::Schema("ScaNN index deserialization failed".to_string())
1252                    })?;
1253                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1254                        Error::Schema(format!(
1255                            "ScaNN index requires coarse centroids for field {}",
1256                            field.0
1257                        ))
1258                    })?;
1259                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1260                    index
1261                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1262                        .into_iter()
1263                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1264                        .collect()
1265                }
1266            }
1267        } else if let Some(lazy_flat) = lazy_flat {
1268            // Batched brute-force (sync mmap reads)
1269            let dim = lazy_flat.dim;
1270            let n = lazy_flat.num_vectors;
1271            let quant = lazy_flat.quantization;
1272            let mut collector = crate::query::ScoreCollector::new(fetch_k);
1273            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
1274
1275            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
1276                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
1277                let batch_bytes = lazy_flat
1278                    .read_vectors_batch_sync(batch_start, batch_count)
1279                    .map_err(crate::Error::Io)?;
1280                let raw = batch_bytes.as_slice();
1281
1282                Self::score_quantized_batch(
1283                    query,
1284                    raw,
1285                    quant,
1286                    dim,
1287                    &mut scores[..batch_count],
1288                    unit_norm,
1289                );
1290
1291                for (i, &score) in scores.iter().enumerate().take(batch_count) {
1292                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
1293                    collector.insert_with_ordinal(doc_id, score, ordinal);
1294                }
1295            }
1296
1297            collector
1298                .into_sorted_results()
1299                .into_iter()
1300                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
1301                .collect()
1302        } else {
1303            return Ok(Vec::new());
1304        };
1305
1306        // Rerank ANN candidates using raw vectors (sync)
1307        if ann_index.is_some()
1308            && !results.is_empty()
1309            && let Some(lazy_flat) = lazy_flat
1310        {
1311            let dim = lazy_flat.dim;
1312            let quant = lazy_flat.quantization;
1313            let vbs = lazy_flat.vector_byte_size();
1314
1315            let mut resolved: Vec<(usize, usize)> = Vec::new();
1316            for (ri, c) in results.iter().enumerate() {
1317                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
1318                for (j, &(_, ord)) in entries.iter().enumerate() {
1319                    if ord == c.1 {
1320                        resolved.push((ri, start + j));
1321                        break;
1322                    }
1323                }
1324            }
1325
1326            if !resolved.is_empty() {
1327                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
1328                let mut raw_buf = vec![0u8; resolved.len() * vbs];
1329                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
1330                    let _ = lazy_flat.read_vector_raw_into_sync(
1331                        flat_idx,
1332                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
1333                    );
1334                }
1335
1336                let mut scores = vec![0f32; resolved.len()];
1337                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
1338
1339                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
1340                    results[ri].2 = scores[buf_idx];
1341                }
1342            }
1343
1344            if results.len() > fetch_k {
1345                results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
1346                results.truncate(fetch_k);
1347            }
1348            results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
1349        }
1350
1351        Ok(combine_ordinal_results(results, combiner, k))
1352    }
1353}