Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3pub(crate) mod bmp;
4pub(crate) mod loader;
5mod types;
6
7pub use bmp::BmpIndex;
8#[cfg(feature = "diagnostics")]
9pub use types::DimRawData;
10pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
11
/// Memory statistics for a single segment
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Segment ID
    pub segment_id: u128,
    /// Number of documents in segment
    pub num_docs: u32,
    /// Term dictionary block cache bytes
    pub term_dict_cache_bytes: usize,
    /// Document store block cache bytes
    pub store_cache_bytes: usize,
    /// Sparse vector index bytes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Dense vector index bytes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bloom filter bytes
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Total estimated memory for this segment: the sum of every tracked
    /// per-component byte count.
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .iter()
        .sum()
    }
}
41
42use std::sync::Arc;
43
44use rustc_hash::FxHashMap;
45
46use super::vector_data::LazyFlatVectorData;
47use crate::directories::{Directory, FileHandle};
48use crate::dsl::{DenseVectorQuantization, Document, Field, Schema};
49use crate::structures::{
50    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
51    RaBitQIndex, SSTableStats, TermInfo,
52};
53use crate::{DocId, Error, Result};
54
55use super::store::{AsyncStoreReader, RawStoreBlock};
56use super::types::{SegmentFiles, SegmentId, SegmentMeta};
57
58/// Combine per-ordinal (doc_id, ordinal, score) triples into VectorSearchResults,
59/// applying the multi-value combiner, sorting by score desc, and truncating to `limit`.
60///
61/// Fast path: when all ordinals are 0 (single-valued field), skips the HashMap
62/// grouping entirely and just sorts + truncates the raw results.
63pub(crate) fn combine_ordinal_results(
64    raw: impl IntoIterator<Item = (u32, u16, f32)>,
65    combiner: crate::query::MultiValueCombiner,
66    limit: usize,
67) -> Vec<VectorSearchResult> {
68    let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();
69
70    let num_raw = collected.len();
71    if log::log_enabled!(log::Level::Debug) {
72        let mut ids: Vec<u32> = collected.iter().map(|(d, _, _)| *d).collect();
73        ids.sort_unstable();
74        ids.dedup();
75        log::debug!(
76            "combine_ordinal_results: {} raw entries, {} unique docs, combiner={:?}, limit={}",
77            num_raw,
78            ids.len(),
79            combiner,
80            limit
81        );
82    }
83
84    // Fast path: all ordinals are 0 → no grouping needed, skip HashMap
85    let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
86    if all_single {
87        let mut results: Vec<VectorSearchResult> = collected
88            .into_iter()
89            .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
90            .collect();
91        results.sort_unstable_by(|a, b| {
92            b.score
93                .partial_cmp(&a.score)
94                .unwrap_or(std::cmp::Ordering::Equal)
95        });
96        results.truncate(limit);
97        return results;
98    }
99
100    // Slow path: multi-valued field — group by doc_id, apply combiner
101    let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
102        rustc_hash::FxHashMap::default();
103    for (doc_id, ordinal, score) in collected {
104        doc_ordinals
105            .entry(doc_id as DocId)
106            .or_default()
107            .push((ordinal as u32, score));
108    }
109    let mut results: Vec<VectorSearchResult> = doc_ordinals
110        .into_iter()
111        .map(|(doc_id, ordinals)| {
112            let combined_score = combiner.combine(&ordinals);
113            VectorSearchResult::new(doc_id, combined_score, ordinals)
114        })
115        .collect();
116    results.sort_unstable_by(|a, b| {
117        b.score
118            .partial_cmp(&a.score)
119            .unwrap_or(std::cmp::Ordering::Equal)
120    });
121    results.truncate(limit);
122    results
123}
124
/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
///
/// Constructed via [`SegmentReader::open`].
pub struct SegmentReader {
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: FileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Per-field coarse centroids for IVF/ScaNN search
    /// (empty after `open`; presumably populated elsewhere — TODO confirm)
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Sparse vector indexes per field (MaxScore format)
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// BMP sparse vector indexes per field (BMP format)
    bmp_indexes: FxHashMap<u32, BmpIndex>,
    /// Position file handle for phrase queries (lazy loading)
    positions_handle: Option<FileHandle>,
    /// Fast-field columnar readers per field_id
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
    /// Per-segment MaxScore threshold (f32 stored as AtomicU32 bits).
    /// Allows per-field MaxScore groups within a single query to share thresholds:
    /// field A's result seeds field B's pruning on the same segment.
    shared_threshold: std::sync::atomic::AtomicU32,
}
158
159impl SegmentReader {
    /// Open a segment with lazy loading.
    ///
    /// Eagerly reads only small structures: segment metadata, the term
    /// dictionary index, the store index, plus dense/sparse/fast-field
    /// headers via the loader module. Posting lists, store blocks and flat
    /// vectors are fetched on demand through lazy `FileHandle`s.
    ///
    /// * `dir` - directory to read segment files from
    /// * `segment_id` - id of the segment to open
    /// * `schema` - schema used to interpret fields
    /// * `cache_blocks` - block-cache capacity passed to both the term
    ///   dictionary reader and the document store reader
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Read metadata (small, always loaded)
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Open term dictionary with lazy loading (fetches ranges on demand)
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        // Get postings file handle (lazy - fetches ranges on demand)
        let postings_handle = dir.open_lazy(&files.postings).await?;

        // Open store with lazy loading
        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Load dense vector indexes from unified .vectors file
        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        // Load sparse vector indexes from .sparse file (MaxScore + BMP)
        let sparse_data = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
        let sparse_indexes = sparse_data.maxscore_indexes;
        let bmp_indexes = sparse_data.bmp_indexes;

        // Open positions file handle (if exists) - offsets are now in TermInfo
        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        // Load fast-field columns from .fast file
        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;

        // Log segment loading stats (debug level only; built unconditionally
        // but cheap relative to the I/O above)
        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            for (field_id, idx) in &sparse_indexes {
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            for (field_id, idx) in &bmp_indexes {
                parts.push(format!(
                    "bmp field {}: {} dims, {} blocks",
                    field_id,
                    idx.dims(),
                    idx.num_blocks
                ));
            }
            if !fast_fields.is_empty() {
                parts.push(format!("fast: {} fields", fast_fields.len()));
            }
            log::debug!("{}", parts.join(", "));
        }

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            vector_indexes,
            flat_vectors,
            // Starts empty; IVF/ScaNN search requires these to be present
            // (see search_dense_vector), so they are filled in elsewhere.
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            bmp_indexes,
            positions_handle,
            fast_fields,
            shared_threshold: std::sync::atomic::AtomicU32::new(0),
        })
    }
253
254    /// Reset the per-segment threshold (call before each new search).
255    #[inline]
256    pub fn reset_shared_threshold(&self) {
257        self.shared_threshold
258            .store(0, std::sync::atomic::Ordering::Relaxed);
259    }
260
261    /// Read the per-segment MaxScore threshold.
262    #[inline]
263    pub fn shared_threshold_f32(&self) -> f32 {
264        f32::from_bits(
265            self.shared_threshold
266                .load(std::sync::atomic::Ordering::Relaxed),
267        )
268    }
269
270    /// Update the threshold if new value is higher (monotonic max).
271    #[inline]
272    pub fn update_shared_threshold(&self, new_threshold: f32) {
273        use std::sync::atomic::Ordering::Relaxed;
274        let new_bits = new_threshold.to_bits();
275        let mut current_bits = self.shared_threshold.load(Relaxed);
276        while new_threshold > f32::from_bits(current_bits) {
277            match self.shared_threshold.compare_exchange_weak(
278                current_bits,
279                new_bits,
280                Relaxed,
281                Relaxed,
282            ) {
283                Ok(_) => return,
284                Err(actual) => current_bits = actual,
285            }
286        }
287    }
288
    /// Segment metadata (id, document count, per-field length stats).
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }
292
    /// Number of documents in this segment (from segment metadata).
    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }
296
    /// Get average field length for BM25F scoring (delegates to segment metadata).
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }
301
    /// Schema this segment was opened with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
305
    /// Get sparse indexes for all fields, keyed by field id (MaxScore format).
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }
310
    /// Get sparse index for a specific field (MaxScore format); `None` if the
    /// field has no sparse index in this segment.
    pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
        self.sparse_indexes.get(&field.0)
    }
315
    /// Get BMP index for a specific field; `None` if the field has no BMP
    /// index in this segment.
    pub fn bmp_index(&self, field: Field) -> Option<&BmpIndex> {
        self.bmp_indexes.get(&field.0)
    }
320
    /// Get all BMP indexes, keyed by field id.
    pub fn bmp_indexes(&self) -> &FxHashMap<u32, BmpIndex> {
        &self.bmp_indexes
    }
325
    /// Get dense-vector ANN indexes for all fields, keyed by field id.
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }
330
    /// Get lazy flat vectors for all fields (for reranking and merge), keyed by field id.
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }
335
    /// Get a fast-field reader for a specific field; `None` if the field has
    /// no fast-field column in this segment.
    pub fn fast_field(
        &self,
        field_id: u32,
    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
        self.fast_fields.get(&field_id)
    }
343
    /// Get all fast-field readers, keyed by field id.
    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
        &self.fast_fields
    }
348
    /// Get term dictionary stats for debugging (block counts, bloom filter size, …).
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
353
354    /// Estimate memory usage of this segment reader
355    pub fn memory_stats(&self) -> SegmentMemoryStats {
356        let term_dict_stats = self.term_dict.stats();
357
358        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
359        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
360
361        // Store cache: similar estimate
362        let store_cache_bytes = self.store.cached_blocks() * 4096;
363
364        // Sparse index: SoA dim table + OwnedBytes skip section + BMP grids
365        let sparse_index_bytes: usize = self
366            .sparse_indexes
367            .values()
368            .map(|s| s.estimated_memory_bytes())
369            .sum::<usize>()
370            + self
371                .bmp_indexes
372                .values()
373                .map(|b| b.estimated_memory_bytes())
374                .sum::<usize>();
375
376        // Dense index: vectors are memory-mapped, but we track index structures
377        // RaBitQ/IVF indexes have cluster assignments in memory
378        let dense_index_bytes: usize = self
379            .vector_indexes
380            .values()
381            .map(|v| v.estimated_memory_bytes())
382            .sum();
383
384        SegmentMemoryStats {
385            segment_id: self.meta.id,
386            num_docs: self.meta.num_docs,
387            term_dict_cache_bytes,
388            store_cache_bytes,
389            sparse_index_bytes,
390            dense_index_bytes,
391            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
392        }
393    }
394
    /// Get posting list for a term (async - loads on demand)
    ///
    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
    /// and no additional I/O is needed. For larger lists, reads from .post file.
    ///
    /// Returns `Ok(None)` when the term is not present in this segment's
    /// dictionary; `Err(Corruption)` when the term entry is malformed or
    /// points past the end of the postings file.
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Build key: field_id + term
        // (field_id as 4 little-endian bytes — same layout as prefetch_terms
        // and get_prefix_postings)
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        // Look up in term dictionary
        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Check if posting list is inlined
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            // Build BlockPostingList from inline data (no I/O needed!)
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // External posting list - read from postings file handle (lazy - HTTP range request)
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        // Guard against a corrupt offset/length pointing past the file end.
        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }
457
    /// Get all posting lists for terms that start with `prefix` in the given field.
    ///
    /// Inline posting lists are decoded without extra I/O; external lists are
    /// fetched from the postings file. Note: unlike `get_postings`, an entry
    /// whose external range is out of bounds is silently skipped rather than
    /// reported as corruption.
    pub async fn get_prefix_postings(
        &self,
        field: Field,
        prefix: &[u8],
    ) -> Result<Vec<BlockPostingList>> {
        // Build composite key prefix: field_id ++ prefix
        let mut key_prefix = Vec::with_capacity(4 + prefix.len());
        key_prefix.extend_from_slice(&field.0.to_le_bytes());
        key_prefix.extend_from_slice(prefix);

        let entries = self.term_dict.prefix_scan(&key_prefix).await?;
        let mut results = Vec::with_capacity(entries.len());

        for (_key, term_info) in entries {
            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                // Inline postings: rebuild the list directly from the entry.
                let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    posting_list.push(doc_id, tf);
                }
                results.push(BlockPostingList::from_posting_list(&posting_list)?);
            } else if let Some((posting_offset, posting_len)) = term_info.external_info() {
                let start = posting_offset;
                let end = start + posting_len;
                if end > self.postings_handle.len() {
                    // Best-effort: skip entries pointing past the file end.
                    continue;
                }
                let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
                results.push(BlockPostingList::deserialize_zero_copy(posting_bytes)?);
            }
        }

        Ok(results)
    }
492
    /// Get document by local doc_id (async - loads on demand).
    ///
    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
    /// Uses binary search on sorted doc_ids for O(log N) lookup.
    ///
    /// Equivalent to `doc_with_fields(local_doc_id, None)`: all fields hydrated.
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }
500
    /// Get document by local doc_id, hydrating only the specified fields.
    ///
    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
    ///
    /// Returns `Ok(None)` when the store has no document for `local_doc_id`.
    /// Vector hydration failures are logged as warnings and skipped, never
    /// turned into errors.
    pub async fn doc_with_fields(
        &self,
        local_doc_id: DocId,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<Document>> {
        // Fetch the stored (non-vector) part of the document first.
        let mut doc = match fields {
            Some(set) => {
                let field_ids: Vec<u32> = set.iter().copied().collect();
                match self
                    .store
                    .get_fields(local_doc_id, &self.schema, &field_ids)
                    .await
                {
                    Ok(Some(d)) => d,
                    Ok(None) => return Ok(None),
                    Err(e) => return Err(Error::from(e)),
                }
            }
            None => match self.store.get(local_doc_id, &self.schema).await {
                Ok(Some(d)) => d,
                Ok(None) => return Ok(None),
                Err(e) => return Err(Error::from(e)),
            },
        };

        // Hydrate dense vector fields from flat vector data
        for (&field_id, lazy_flat) in &self.flat_vectors {
            // Skip vector fields not in the requested set
            if let Some(set) = fields
                && !set.contains(&field_id)
            {
                continue;
            }

            let is_binary = lazy_flat.quantization == DenseVectorQuantization::Binary;
            // `entries` lists (doc_id, ordinal) pairs for this doc; `start` is
            // the flat index of the first one, so flat_idx = start + j below.
            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                if is_binary {
                    // Binary vectors are returned as raw packed bytes, not f32s.
                    let vbs = lazy_flat.vector_byte_size();
                    let mut raw = vec![0u8; vbs];
                    match lazy_flat.read_vector_raw_into(flat_idx, &mut raw).await {
                        Ok(()) => {
                            doc.add_binary_dense_vector(Field(field_id), raw);
                        }
                        Err(e) => {
                            log::warn!("Failed to hydrate binary vector field {}: {}", field_id, e);
                        }
                    }
                } else {
                    match lazy_flat.get_vector(flat_idx).await {
                        Ok(vec) => {
                            doc.add_dense_vector(Field(field_id), vec);
                        }
                        Err(e) => {
                            log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                        }
                    }
                }
            }
        }

        Ok(Some(doc))
    }
570
571    /// Prefetch term dictionary blocks for a key range
572    pub async fn prefetch_terms(
573        &self,
574        field: Field,
575        start_term: &[u8],
576        end_term: &[u8],
577    ) -> Result<()> {
578        let mut start_key = Vec::with_capacity(4 + start_term.len());
579        start_key.extend_from_slice(&field.0.to_le_bytes());
580        start_key.extend_from_slice(start_term);
581
582        let mut end_key = Vec::with_capacity(4 + end_term.len());
583        end_key.extend_from_slice(&field.0.to_le_bytes());
584        end_key.extend_from_slice(end_term);
585
586        self.term_dict.prefetch_range(&start_key, &end_key).await?;
587        Ok(())
588    }
589
    /// Check if store uses dictionary compression (incompatible with raw merging).
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }
594
    /// Get store reference for merge operations.
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }
599
    /// Get raw store blocks for optimized merging (copied without re-encoding).
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }
604
    /// Get store data slice (file handle) for raw block access.
    pub fn store_data_slice(&self) -> &FileHandle {
        self.store.data_slice()
    }
609
    /// Get all terms from this segment (for merge).
    ///
    /// Loads the full dictionary into memory; for large segments prefer the
    /// streaming `term_dict_iter`.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }
614
615    /// Get all terms with parsed field and term string (for statistics aggregation)
616    ///
617    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
618    /// Skips terms that aren't valid UTF-8.
619    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
620        let entries = self.term_dict.all_entries().await?;
621        let mut result = Vec::with_capacity(entries.len());
622
623        for (key, term_info) in entries {
624            // Key format: field_id (4 bytes little-endian) + term bytes
625            if key.len() > 4 {
626                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
627                let term_bytes = &key[4..];
628                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
629                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
630                }
631            }
632        }
633
634        Ok(result)
635    }
636
    /// Get streaming iterator over term dictionary (for memory-efficient merge).
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }
641
    /// Prefetch all term dictionary blocks in a single bulk I/O call.
    ///
    /// Call before merge iteration to eliminate per-block cache misses.
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }
651
652    /// Read raw posting bytes at offset
653    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
654        let start = offset;
655        let end = start + len;
656        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
657        Ok(bytes.to_vec())
658    }
659
660    /// Read raw position bytes at offset (for merge)
661    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
662        let handle = match &self.positions_handle {
663            Some(h) => h,
664            None => return Ok(None),
665        };
666        let start = offset;
667        let end = start + len;
668        let bytes = handle.read_bytes_range(start..end).await?;
669        Ok(Some(bytes.to_vec()))
670    }
671
    /// Check if this segment has a positions file (needed for phrase queries).
    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }
676
    /// Batch cosine scoring on raw quantized bytes.
    ///
    /// Dispatches to the appropriate SIMD scorer based on quantization type.
    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
    ///
    /// `unit_norm` selects dot-product scorers (vectors pre-normalized) vs
    /// full cosine scorers. `scores.len()` determines how many vectors of
    /// `dim` floats are read from `raw`.
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
        unit_norm: bool,
    ) {
        use crate::dsl::DenseVectorQuantization;
        use crate::structures::simd;
        match (quant, unit_norm) {
            (DenseVectorQuantization::F32, false) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                // SAFETY: alignment is established by the vectors-file layout
                // (see doc comment) and checked by the debug_assert above.
                // Assumes callers size `raw` to at least num_floats * 4 bytes —
                // TODO(review): confirm callers guarantee the length.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F32, true) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned"
                );
                // SAFETY: same argument as the F32/false arm above.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_dot_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F16, false) => {
                simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::F16, true) => {
                simd::batch_dot_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, false) => {
                simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, true) => {
                simd::batch_dot_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::Binary, _) => {
                // Binary vectors use search_binary_dense_vector(), not search_dense_vector()
                unreachable!("Binary quantization should not reach score_quantized_batch");
            }
        }
    }
731
732    /// Search dense vectors using RaBitQ
733    ///
734    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
735    /// Doc IDs are segment-local.
736    /// For multi-valued documents, scores are combined using the specified combiner.
    /// Search dense vectors for `field`, returning at most `k` combined results.
    ///
    /// Two-phase flow:
    /// 1. L1 candidate generation — uses the field's ANN index (RaBitQ / IVF /
    ///    ScaNN) when one exists, otherwise batched brute-force over the lazy
    ///    flat vector file. `fetch_k = ceil(k * max(rerank_factor, 1.0))`
    ///    candidates are produced.
    /// 2. L2 rerank — when both an ANN index and flat vectors are present, ANN
    ///    candidates are re-scored against the raw quantized vectors with
    ///    batch SIMD scoring, then cut back down and sorted.
    ///
    /// ANN distances are mapped to similarities as `1 / (1 + dist)` so that
    /// higher is always better across all branches. `nprobe == 0` falls back
    /// to a default of 32 probes for IVF/ScaNN. Per-ordinal `(doc_id, ordinal,
    /// score)` triples are folded into per-document results by `combiner`.
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        // No vectors at all for this field
        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // Check if vectors are pre-normalized (skip per-vector norm in scoring)
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
        const BRUTE_FORCE_BATCH: usize = 4096;

        // Over-fetch so the rerank phase has extra candidates to reorder.
        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
        let t0 = std::time::Instant::now();
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            // ANN search (RaBitQ, IVF, ScaNN)
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    // Distance → similarity so ordering agrees with brute-force scores.
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    // IVF cannot probe without the coarse centroid table.
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
            // Uses a top-k heap to avoid collecting and sorting all N candidates.
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            // Score buffer reused across batches; only the first `batch_count`
            // entries are meaningful on the final (short) batch.
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Resolve flat indexes for each candidate via binary search
            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                // Candidates whose ordinal is not found keep their L1 score.
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                // Sort by flat_idx for sequential mmap access (better page locality)
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                // Batch-read raw quantized bytes into contiguous buffer
                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // NOTE(review): read errors are discarded here — a failed read
                    // leaves that vector's slot zeroed, yielding a degenerate score
                    // instead of an Err. Confirm this best-effort behavior is intended.
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                // Native-precision batch SIMD cosine scoring
                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
                let score_elapsed = t_score.elapsed();

                // Write scores back to results
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            // Partial-select the top fetch_k (O(n)) before the final full sort.
            if results.len() > fetch_k {
                results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
                results.truncate(fetch_k);
            }
            results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
951
    /// Search binary dense vectors using brute-force Hamming distance.
    ///
    /// Always flat brute-force (no ANN). Returns VectorSearchResult with ordinal tracking.
    ///
    /// The query must be exactly `vector_byte_size()` bytes (`Error::Schema`
    /// otherwise). Vectors are streamed in batches, scored via SIMD batch
    /// Hamming, and folded into a top-`k` collector; per-ordinal results are
    /// combined by `combiner` before returning.
    pub async fn search_binary_dense_vector(
        &self,
        field: Field,
        query: &[u8],
        k: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        // No flat vectors for this field → empty result, not an error.
        let lazy_flat = match self.flat_vectors.get(&field.0) {
            Some(f) => f,
            None => return Ok(Vec::new()),
        };

        const BRUTE_FORCE_BATCH: usize = 8192; // Binary vectors are tiny, use larger batches

        let dim_bits = lazy_flat.dim;
        let byte_len = lazy_flat.vector_byte_size();
        let n = lazy_flat.num_vectors;

        if byte_len != query.len() {
            return Err(Error::Schema(format!(
                "Binary query vector byte length {} != field byte length {}",
                query.len(),
                byte_len
            )));
        }

        let mut collector = crate::query::ScoreCollector::new(k);
        let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

        for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
            let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
            let batch_bytes = lazy_flat
                .read_vectors_batch(batch_start, batch_count)
                .await
                .map_err(crate::Error::Io)?;
            let raw = batch_bytes.as_slice();

            crate::structures::simd::batch_hamming_scores(
                query,
                raw,
                byte_len,
                dim_bits,
                &mut scores[..batch_count],
            );

            // Threshold is snapshotted once per batch as a cheap prefilter;
            // presumably the collector still enforces exact top-k on insert —
            // TODO confirm against ScoreCollector's contract.
            let threshold = collector.threshold();
            for (i, &score) in scores.iter().enumerate().take(batch_count) {
                if score > threshold {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }
        }

        // Collector yields (doc_id, score, ordinal); reorder to the
        // (doc_id, ordinal, score) triple shape used by the combiner.
        let results: Vec<(u32, u16, f32)> = collector
            .into_sorted_results()
            .into_iter()
            .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
            .collect();

        Ok(combine_ordinal_results(results, combiner, k))
    }
1017
1018    /// Check if this segment has dense vectors for the given field
1019    pub fn has_dense_vector_index(&self, field: Field) -> bool {
1020        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
1021    }
1022
1023    /// Get the dense vector index for a field (if available)
1024    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
1025        match self.vector_indexes.get(&field.0) {
1026            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
1027            _ => None,
1028        }
1029    }
1030
1031    /// Get the IVF vector index for a field (if available)
1032    pub fn get_ivf_vector_index(
1033        &self,
1034        field: Field,
1035    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
1036        match self.vector_indexes.get(&field.0) {
1037            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
1038            _ => None,
1039        }
1040    }
1041
1042    /// Get coarse centroids for a field
1043    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
1044        self.coarse_centroids.get(&field_id)
1045    }
1046
1047    /// Set per-field coarse centroids from index-level trained structures
1048    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
1049        self.coarse_centroids = centroids;
1050    }
1051
1052    /// Get the ScaNN vector index for a field (if available)
1053    pub fn get_scann_vector_index(
1054        &self,
1055        field: Field,
1056    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
1057        match self.vector_indexes.get(&field.0) {
1058            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
1059            _ => None,
1060        }
1061    }
1062
1063    /// Get the vector index type for a field
1064    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
1065        self.vector_indexes.get(&field.0)
1066    }
1067
1068    /// Get positions for a term (for phrase queries)
1069    ///
1070    /// Position offsets are now embedded in TermInfo, so we first look up
1071    /// the term to get its TermInfo, then use position_info() to get the offset.
1072    pub async fn get_positions(
1073        &self,
1074        field: Field,
1075        term: &[u8],
1076    ) -> Result<Option<crate::structures::PositionPostingList>> {
1077        // Get positions handle
1078        let handle = match &self.positions_handle {
1079            Some(h) => h,
1080            None => return Ok(None),
1081        };
1082
1083        // Build key: field_id + term
1084        let mut key = Vec::with_capacity(4 + term.len());
1085        key.extend_from_slice(&field.0.to_le_bytes());
1086        key.extend_from_slice(term);
1087
1088        // Look up term in dictionary to get TermInfo with position offset
1089        let term_info = match self.term_dict.get(&key).await? {
1090            Some(info) => info,
1091            None => return Ok(None),
1092        };
1093
1094        // Get position offset from TermInfo
1095        let (offset, length) = match term_info.position_info() {
1096            Some((o, l)) => (o, l),
1097            None => return Ok(None),
1098        };
1099
1100        // Read the position data
1101        let slice = handle.slice(offset..offset + length);
1102        let data = slice.read_bytes().await?;
1103
1104        // Deserialize
1105        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
1106
1107        Ok(Some(pos_list))
1108    }
1109
1110    /// Check if positions are available for a field
1111    pub fn has_positions(&self, field: Field) -> bool {
1112        // Check schema for position mode on this field
1113        if let Some(entry) = self.schema.get_field_entry(field) {
1114            entry.positions.is_some()
1115        } else {
1116            false
1117        }
1118    }
1119}
1120
1121// ── Synchronous search methods (mmap/RAM only) ─────────────────────────────
#[cfg(feature = "sync")]
impl SegmentReader {
    /// Synchronous posting list lookup — requires Inline (mmap/RAM) file handles.
    ///
    /// Mirrors the async lookup path: resolve the term through the dictionary,
    /// then either decode posting data inlined in the `TermInfo` or range-read
    /// the external posting block. Returns `Ok(None)` for an absent term and
    /// `Error::Corruption` for an out-of-bounds or malformed entry.
    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
        // Build key: field_id + term
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        // Look up in term dictionary (sync)
        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        // Check if posting list is inlined
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            // Rebuild a block posting list from the inlined (doc_id, tf) pairs.
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // External posting list — sync range read
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        // Bounds-check against the postings file before reading.
        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }

    /// Synchronous prefix posting list lookup — requires Inline (mmap/RAM) file handles.
    ///
    /// Scans the term dictionary for every key starting with `field_id +
    /// prefix` and materializes one posting list per matching term.
    ///
    /// NOTE(review): out-of-bounds external postings are silently skipped here
    /// (`continue`) rather than surfaced as corruption as in
    /// `get_postings_sync` — confirm this asymmetry is intentional.
    pub fn get_prefix_postings_sync(
        &self,
        field: Field,
        prefix: &[u8],
    ) -> Result<Vec<BlockPostingList>> {
        // Keys are field_id (LE) followed by the raw term bytes.
        let mut key_prefix = Vec::with_capacity(4 + prefix.len());
        key_prefix.extend_from_slice(&field.0.to_le_bytes());
        key_prefix.extend_from_slice(prefix);

        let entries = self.term_dict.prefix_scan_sync(&key_prefix)?;
        let mut results = Vec::with_capacity(entries.len());

        for (_key, term_info) in entries {
            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                // Inlined posting data — rebuild in memory.
                let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    posting_list.push(doc_id, tf);
                }
                results.push(BlockPostingList::from_posting_list(&posting_list)?);
            } else if let Some((posting_offset, posting_len)) = term_info.external_info() {
                // External posting block — skip entries that fall outside the file.
                let start = posting_offset;
                let end = start + posting_len;
                if end > self.postings_handle.len() {
                    continue;
                }
                let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
                results.push(BlockPostingList::deserialize_zero_copy(posting_bytes)?);
            }
        }

        Ok(results)
    }

    /// Synchronous position list lookup — requires Inline (mmap/RAM) file handles.
    ///
    /// Sync twin of the async `get_positions`: returns `Ok(None)` when the
    /// segment has no positions file, the term is absent, or the term carries
    /// no position data.
    pub fn get_positions_sync(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        // Build key: field_id + term
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        // Look up term in dictionary (sync)
        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        // Position offset/length are embedded in the TermInfo.
        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes_sync()?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
        Ok(Some(pos_list))
    }

    /// Synchronous dense vector search — ANN indexes are already sync,
    /// brute-force uses sync mmap reads.
    ///
    /// Same two-phase shape as the async `search_dense_vector`: L1 candidate
    /// generation (ANN when available, otherwise batched brute-force over the
    /// flat vectors), then an optional L2 rerank of ANN candidates against the
    /// raw quantized vectors. Distances are mapped to similarities via
    /// `1 / (1 + dist)`; `nprobe == 0` defaults to 32 probes. Per-ordinal
    /// results are folded by `combiner` before returning at most `k` results.
    pub fn search_dense_vector_sync(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        // No vectors at all for this field → empty result, not an error.
        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // Pre-normalized vectors skip the per-vector norm during scoring.
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;
        // Over-fetch so the rerank phase has extra candidates to reorder.
        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        // (doc_id, ordinal, score) triples; score = similarity, higher is better.
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            // ANN search (already sync)
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    // IVF cannot probe without the coarse centroid table.
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // Batched brute-force (sync mmap reads)
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch_sync(batch_start, batch_count)
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };

        // Rerank ANN candidates using raw vectors (sync)
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Resolve each candidate's (result_idx, flat_idx); unresolved
            // ordinals keep their L1 score.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            if !resolved.is_empty() {
                // Sort by flat index for sequential mmap access.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // NOTE(review): read errors are discarded — a failed read
                    // leaves zeroed bytes and a degenerate score; confirm this
                    // best-effort behavior is intended (same as the async path).
                    let _ = lazy_flat.read_vector_raw_into_sync(
                        flat_idx,
                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                    );
                }

                // Native-precision batch SIMD scoring over the gathered bytes.
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);

                // Write reranked scores back into the candidate list.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }
            }

            // Partial-select the top fetch_k (O(n)) before the final full sort.
            if results.len() > fetch_k {
                results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
                results.truncate(fetch_k);
            }
            results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
}