// hermes_core/segment/reader/mod.rs
1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
/// Memory statistics for a single segment
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Segment ID
    pub segment_id: u128,
    /// Number of documents in segment
    pub num_docs: u32,
    /// Term dictionary block cache bytes
    pub term_dict_cache_bytes: usize,
    /// Document store block cache bytes
    pub store_cache_bytes: usize,
    /// Sparse vector index bytes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Dense vector index bytes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bloom filter bytes
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Total estimated memory for this segment — the sum of all tracked
    /// byte counters (caches, sparse/dense indexes, bloom filter).
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .iter()
        .sum()
    }
}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56/// Combine per-ordinal (doc_id, ordinal, score) triples into VectorSearchResults,
57/// applying the multi-value combiner, sorting by score desc, and truncating to `limit`.
58///
59/// Fast path: when all ordinals are 0 (single-valued field), skips the HashMap
60/// grouping entirely and just sorts + truncates the raw results.
61pub(crate) fn combine_ordinal_results(
62    raw: impl IntoIterator<Item = (u32, u16, f32)>,
63    combiner: crate::query::MultiValueCombiner,
64    limit: usize,
65) -> Vec<VectorSearchResult> {
66    let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();
67
68    let num_raw = collected.len();
69    if log::log_enabled!(log::Level::Debug) {
70        let mut ids: Vec<u32> = collected.iter().map(|(d, _, _)| *d).collect();
71        ids.sort_unstable();
72        ids.dedup();
73        log::debug!(
74            "combine_ordinal_results: {} raw entries, {} unique docs, combiner={:?}, limit={}",
75            num_raw,
76            ids.len(),
77            combiner,
78            limit
79        );
80    }
81
82    // Fast path: all ordinals are 0 → no grouping needed, skip HashMap
83    let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
84    if all_single {
85        let mut results: Vec<VectorSearchResult> = collected
86            .into_iter()
87            .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
88            .collect();
89        results.sort_unstable_by(|a, b| {
90            b.score
91                .partial_cmp(&a.score)
92                .unwrap_or(std::cmp::Ordering::Equal)
93        });
94        results.truncate(limit);
95        return results;
96    }
97
98    // Slow path: multi-valued field — group by doc_id, apply combiner
99    let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
100        rustc_hash::FxHashMap::default();
101    for (doc_id, ordinal, score) in collected {
102        doc_ordinals
103            .entry(doc_id as DocId)
104            .or_default()
105            .push((ordinal as u32, score));
106    }
107    let mut results: Vec<VectorSearchResult> = doc_ordinals
108        .into_iter()
109        .map(|(doc_id, ordinals)| {
110            let combined_score = combiner.combine(&ordinals);
111            VectorSearchResult::new(doc_id, combined_score, ordinals)
112        })
113        .collect();
114    results.sort_unstable_by(|a, b| {
115        b.score
116            .partial_cmp(&a.score)
117            .unwrap_or(std::cmp::Ordering::Equal)
118    });
119    results.truncate(limit);
120    results
121}
122
/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
pub struct SegmentReader {
    /// Segment metadata (deserialized eagerly at open)
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: FileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    /// Schema this segment was written with (used to decode stored documents)
    schema: Arc<Schema>,
    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Per-field coarse centroids for IVF/ScaNN search
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Sparse vector indexes per field
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Position file handle for phrase queries (lazy loading)
    positions_handle: Option<FileHandle>,
    /// Fast-field columnar readers per field_id
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}
150
151impl SegmentReader {
    /// Open a segment with lazy loading
    ///
    /// Only the small segment metadata is read eagerly; the term dictionary,
    /// postings, document store and positions are opened as lazy handles that
    /// fetch byte ranges on demand. `cache_blocks` bounds the block caches of
    /// the term dictionary and document store readers.
    ///
    /// # Errors
    /// Fails if any segment file cannot be opened or its metadata/header
    /// fails to deserialize.
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Read metadata (small, always loaded)
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Open term dictionary with lazy loading (fetches ranges on demand)
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        // Get postings file handle (lazy - fetches ranges on demand)
        let postings_handle = dir.open_lazy(&files.postings).await?;

        // Open store with lazy loading
        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Load dense vector indexes from unified .vectors file
        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        // Load sparse vector indexes from .sparse file
        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        // Open positions file handle (if exists) - offsets are now in TermInfo
        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        // Load fast-field columns from .fast file
        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;

        // Log segment loading stats (debug level; string building is cheap relative to I/O above)
        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            // ~24 bytes/dimension is a rough size estimate for the log line only
            for (field_id, idx) in &sparse_indexes {
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            if !fast_fields.is_empty() {
                parts.push(format!("fast: {} fields", fast_fields.len()));
            }
            log::debug!("{}", parts.join(", "));
        }

        // NOTE(review): coarse_centroids starts empty here — presumably populated
        // elsewhere before IVF/ScaNN search; confirm against callers.
        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            vector_indexes,
            flat_vectors,
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            positions_handle,
            fast_fields,
        })
    }
233
    /// Segment metadata (id, document count, per-field stats).
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    /// Number of documents in this segment.
    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Get average field length for BM25F scoring
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    /// Schema this segment was written with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get sparse indexes for all fields
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    /// Get sparse index for a specific field
    pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
        self.sparse_indexes.get(&field.0)
    }

    /// Get vector indexes for all fields
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    /// Get lazy flat vectors for all fields (for reranking and merge)
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    /// Get a fast-field reader for a specific field.
    pub fn fast_field(
        &self,
        field_id: u32,
    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
        self.fast_fields.get(&field_id)
    }

    /// Get all fast-field readers.
    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
        &self.fast_fields
    }

    /// Get term dictionary stats for debugging
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
288
289    /// Estimate memory usage of this segment reader
290    pub fn memory_stats(&self) -> SegmentMemoryStats {
291        let term_dict_stats = self.term_dict.stats();
292
293        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
294        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
295
296        // Store cache: similar estimate
297        let store_cache_bytes = self.store.cached_blocks() * 4096;
298
299        // Sparse index: SoA dim table + OwnedBytes skip section
300        let sparse_index_bytes: usize = self
301            .sparse_indexes
302            .values()
303            .map(|s| s.estimated_memory_bytes())
304            .sum();
305
306        // Dense index: vectors are memory-mapped, but we track index structures
307        // RaBitQ/IVF indexes have cluster assignments in memory
308        let dense_index_bytes: usize = self
309            .vector_indexes
310            .values()
311            .map(|v| v.estimated_memory_bytes())
312            .sum();
313
314        SegmentMemoryStats {
315            segment_id: self.meta.id,
316            num_docs: self.meta.num_docs,
317            term_dict_cache_bytes,
318            store_cache_bytes,
319            sparse_index_bytes,
320            dense_index_bytes,
321            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
322        }
323    }
324
325    /// Get posting list for a term (async - loads on demand)
326    ///
327    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
328    /// and no additional I/O is needed. For larger lists, reads from .post file.
329    pub async fn get_postings(
330        &self,
331        field: Field,
332        term: &[u8],
333    ) -> Result<Option<BlockPostingList>> {
334        log::debug!(
335            "SegmentReader::get_postings field={} term_len={}",
336            field.0,
337            term.len()
338        );
339
340        // Build key: field_id + term
341        let mut key = Vec::with_capacity(4 + term.len());
342        key.extend_from_slice(&field.0.to_le_bytes());
343        key.extend_from_slice(term);
344
345        // Look up in term dictionary
346        let term_info = match self.term_dict.get(&key).await? {
347            Some(info) => {
348                log::debug!("SegmentReader::get_postings found term_info");
349                info
350            }
351            None => {
352                log::debug!("SegmentReader::get_postings term not found");
353                return Ok(None);
354            }
355        };
356
357        // Check if posting list is inlined
358        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
359            // Build BlockPostingList from inline data (no I/O needed!)
360            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
361            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
362                posting_list.push(doc_id, tf);
363            }
364            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
365            return Ok(Some(block_list));
366        }
367
368        // External posting list - read from postings file handle (lazy - HTTP range request)
369        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
370            Error::Corruption("TermInfo has neither inline nor external data".to_string())
371        })?;
372
373        let start = posting_offset;
374        let end = start + posting_len;
375
376        if end > self.postings_handle.len() {
377            return Err(Error::Corruption(
378                "Posting offset out of bounds".to_string(),
379            ));
380        }
381
382        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
383        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
384
385        Ok(Some(block_list))
386    }
387
    /// Get document by local doc_id (async - loads on demand).
    ///
    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
    /// Uses binary search on sorted doc_ids for O(log N) lookup.
    ///
    /// Convenience wrapper around [`Self::doc_with_fields`] with no field filter
    /// (all fields, including dense vectors, are hydrated).
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }
395
396    /// Get document by local doc_id, hydrating only the specified fields.
397    ///
398    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
399    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
400    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
401    pub async fn doc_with_fields(
402        &self,
403        local_doc_id: DocId,
404        fields: Option<&rustc_hash::FxHashSet<u32>>,
405    ) -> Result<Option<Document>> {
406        let mut doc = match fields {
407            Some(set) => {
408                let field_ids: Vec<u32> = set.iter().copied().collect();
409                match self
410                    .store
411                    .get_fields(local_doc_id, &self.schema, &field_ids)
412                    .await
413                {
414                    Ok(Some(d)) => d,
415                    Ok(None) => return Ok(None),
416                    Err(e) => return Err(Error::from(e)),
417                }
418            }
419            None => match self.store.get(local_doc_id, &self.schema).await {
420                Ok(Some(d)) => d,
421                Ok(None) => return Ok(None),
422                Err(e) => return Err(Error::from(e)),
423            },
424        };
425
426        // Hydrate dense vector fields from flat vector data
427        for (&field_id, lazy_flat) in &self.flat_vectors {
428            // Skip vector fields not in the requested set
429            if let Some(set) = fields
430                && !set.contains(&field_id)
431            {
432                continue;
433            }
434
435            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
436            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
437                let flat_idx = start + j;
438                match lazy_flat.get_vector(flat_idx).await {
439                    Ok(vec) => {
440                        doc.add_dense_vector(Field(field_id), vec);
441                    }
442                    Err(e) => {
443                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
444                    }
445                }
446            }
447        }
448
449        Ok(Some(doc))
450    }
451
452    /// Prefetch term dictionary blocks for a key range
453    pub async fn prefetch_terms(
454        &self,
455        field: Field,
456        start_term: &[u8],
457        end_term: &[u8],
458    ) -> Result<()> {
459        let mut start_key = Vec::with_capacity(4 + start_term.len());
460        start_key.extend_from_slice(&field.0.to_le_bytes());
461        start_key.extend_from_slice(start_term);
462
463        let mut end_key = Vec::with_capacity(4 + end_term.len());
464        end_key.extend_from_slice(&field.0.to_le_bytes());
465        end_key.extend_from_slice(end_term);
466
467        self.term_dict.prefetch_range(&start_key, &end_key).await?;
468        Ok(())
469    }
470
    /// Check if store uses dictionary compression (incompatible with raw merging)
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Get store reference for merge operations
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    /// Get raw store blocks for optimized merging
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// Get store data slice for raw block access
    pub fn store_data_slice(&self) -> &FileHandle {
        self.store.data_slice()
    }

    /// Get all terms from this segment (for merge)
    ///
    /// Loads every dictionary entry into memory at once; prefer
    /// [`Self::term_dict_iter`] for memory-efficient streaming.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }
495
496    /// Get all terms with parsed field and term string (for statistics aggregation)
497    ///
498    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
499    /// Skips terms that aren't valid UTF-8.
500    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
501        let entries = self.term_dict.all_entries().await?;
502        let mut result = Vec::with_capacity(entries.len());
503
504        for (key, term_info) in entries {
505            // Key format: field_id (4 bytes little-endian) + term bytes
506            if key.len() > 4 {
507                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
508                let term_bytes = &key[4..];
509                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
510                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
511                }
512            }
513        }
514
515        Ok(result)
516    }
517
    /// Get streaming iterator over term dictionary (for memory-efficient merge)
    ///
    /// Unlike [`Self::all_terms`], this does not materialize all entries at once.
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    /// Prefetch all term dictionary blocks in a single bulk I/O call.
    ///
    /// Call before merge iteration to eliminate per-block cache misses.
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }
532
533    /// Read raw posting bytes at offset
534    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
535        let start = offset;
536        let end = start + len;
537        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
538        Ok(bytes.to_vec())
539    }
540
541    /// Read raw position bytes at offset (for merge)
542    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
543        let handle = match &self.positions_handle {
544            Some(h) => h,
545            None => return Ok(None),
546        };
547        let start = offset;
548        let end = start + len;
549        let bytes = handle.read_bytes_range(start..end).await?;
550        Ok(Some(bytes.to_vec()))
551    }
552
553    /// Check if this segment has a positions file
554    pub fn has_positions_file(&self) -> bool {
555        self.positions_handle.is_some()
556    }
557
    /// Batch cosine scoring on raw quantized bytes.
    ///
    /// Dispatches to the appropriate SIMD scorer based on quantization type.
    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
    ///
    /// When `unit_norm` is true the vectors are pre-normalized, so the cheaper
    /// dot-product kernels are used in place of full cosine.
    ///
    /// NOTE(review): `raw` is assumed to hold at least `scores.len() * dim`
    /// encoded elements; only alignment is checked (debug builds) — confirm
    /// callers size `raw` correctly.
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
        unit_norm: bool,
    ) {
        use crate::dsl::DenseVectorQuantization;
        use crate::structures::simd;
        match (quant, unit_norm) {
            (DenseVectorQuantization::F32, false) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                // SAFETY: alignment is asserted above (vectors-file layout guarantees
                // 4-byte alignment); caller provides `raw` spanning `num_floats` f32s.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F32, true) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned"
                );
                // SAFETY: same invariants as the non-unit-norm F32 arm above.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_dot_scores(query, vectors, dim, scores);
            }
            // f16/u8 kernels consume raw bytes directly — no reinterpret needed.
            (DenseVectorQuantization::F16, false) => {
                simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::F16, true) => {
                simd::batch_dot_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, false) => {
                simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, true) => {
                simd::batch_dot_scores_u8(query, raw, dim, scores);
            }
        }
    }
608
609    /// Search dense vectors using RaBitQ
610    ///
611    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
612    /// Doc IDs are segment-local.
613    /// For multi-valued documents, scores are combined using the specified combiner.
614    pub async fn search_dense_vector(
615        &self,
616        field: Field,
617        query: &[f32],
618        k: usize,
619        nprobe: usize,
620        rerank_factor: f32,
621        combiner: crate::query::MultiValueCombiner,
622    ) -> Result<Vec<VectorSearchResult>> {
623        let ann_index = self.vector_indexes.get(&field.0);
624        let lazy_flat = self.flat_vectors.get(&field.0);
625
626        // No vectors at all for this field
627        if ann_index.is_none() && lazy_flat.is_none() {
628            return Ok(Vec::new());
629        }
630
631        // Check if vectors are pre-normalized (skip per-vector norm in scoring)
632        let unit_norm = self
633            .schema
634            .get_field_entry(field)
635            .and_then(|e| e.dense_vector_config.as_ref())
636            .is_some_and(|c| c.unit_norm);
637
638        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
639        const BRUTE_FORCE_BATCH: usize = 4096;
640
641        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
642
643        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
644        let t0 = std::time::Instant::now();
645        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
646            // ANN search (RaBitQ, IVF, ScaNN)
647            match index {
648                VectorIndex::RaBitQ(lazy) => {
649                    let rabitq = lazy.get().ok_or_else(|| {
650                        Error::Schema("RaBitQ index deserialization failed".to_string())
651                    })?;
652                    rabitq
653                        .search(query, fetch_k)
654                        .into_iter()
655                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
656                        .collect()
657                }
658                VectorIndex::IVF(lazy) => {
659                    let (index, codebook) = lazy.get().ok_or_else(|| {
660                        Error::Schema("IVF index deserialization failed".to_string())
661                    })?;
662                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
663                        Error::Schema(format!(
664                            "IVF index requires coarse centroids for field {}",
665                            field.0
666                        ))
667                    })?;
668                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
669                    index
670                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
671                        .into_iter()
672                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
673                        .collect()
674                }
675                VectorIndex::ScaNN(lazy) => {
676                    let (index, codebook) = lazy.get().ok_or_else(|| {
677                        Error::Schema("ScaNN index deserialization failed".to_string())
678                    })?;
679                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
680                        Error::Schema(format!(
681                            "ScaNN index requires coarse centroids for field {}",
682                            field.0
683                        ))
684                    })?;
685                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
686                    index
687                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
688                        .into_iter()
689                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
690                        .collect()
691                }
692            }
693        } else if let Some(lazy_flat) = lazy_flat {
694            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
695            // Uses a top-k heap to avoid collecting and sorting all N candidates.
696            log::debug!(
697                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
698                field.0,
699                lazy_flat.num_vectors,
700                lazy_flat.dim,
701                lazy_flat.quantization
702            );
703            let dim = lazy_flat.dim;
704            let n = lazy_flat.num_vectors;
705            let quant = lazy_flat.quantization;
706            let mut collector = crate::query::ScoreCollector::new(fetch_k);
707            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
708
709            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
710                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
711                let batch_bytes = lazy_flat
712                    .read_vectors_batch(batch_start, batch_count)
713                    .await
714                    .map_err(crate::Error::Io)?;
715                let raw = batch_bytes.as_slice();
716
717                Self::score_quantized_batch(
718                    query,
719                    raw,
720                    quant,
721                    dim,
722                    &mut scores[..batch_count],
723                    unit_norm,
724                );
725
726                for (i, &score) in scores.iter().enumerate().take(batch_count) {
727                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
728                    collector.insert_with_ordinal(doc_id, score, ordinal);
729                }
730            }
731
732            collector
733                .into_sorted_results()
734                .into_iter()
735                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
736                .collect()
737        } else {
738            return Ok(Vec::new());
739        };
740        let l1_elapsed = t0.elapsed();
741        log::debug!(
742            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
743            field.0,
744            results.len(),
745            l1_elapsed.as_secs_f64() * 1000.0
746        );
747
748        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
749        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
750        if ann_index.is_some()
751            && !results.is_empty()
752            && let Some(lazy_flat) = lazy_flat
753        {
754            let t_rerank = std::time::Instant::now();
755            let dim = lazy_flat.dim;
756            let quant = lazy_flat.quantization;
757            let vbs = lazy_flat.vector_byte_size();
758
759            // Resolve flat indexes for each candidate via binary search
760            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
761            for (ri, c) in results.iter().enumerate() {
762                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
763                for (j, &(_, ord)) in entries.iter().enumerate() {
764                    if ord == c.1 {
765                        resolved.push((ri, start + j));
766                        break;
767                    }
768                }
769            }
770
771            let t_resolve = t_rerank.elapsed();
772            if !resolved.is_empty() {
773                // Sort by flat_idx for sequential mmap access (better page locality)
774                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
775
776                // Batch-read raw quantized bytes into contiguous buffer
777                let t_read = std::time::Instant::now();
778                let mut raw_buf = vec![0u8; resolved.len() * vbs];
779                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
780                    let _ = lazy_flat
781                        .read_vector_raw_into(
782                            flat_idx,
783                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
784                        )
785                        .await;
786                }
787
788                let read_elapsed = t_read.elapsed();
789
790                // Native-precision batch SIMD cosine scoring
791                let t_score = std::time::Instant::now();
792                let mut scores = vec![0f32; resolved.len()];
793                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
794                let score_elapsed = t_score.elapsed();
795
796                // Write scores back to results
797                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
798                    results[ri].2 = scores[buf_idx];
799                }
800
801                log::debug!(
802                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
803                    field.0,
804                    resolved.len(),
805                    dim,
806                    quant,
807                    vbs,
808                    t_resolve.as_secs_f64() * 1000.0,
809                    read_elapsed.as_secs_f64() * 1000.0,
810                    score_elapsed.as_secs_f64() * 1000.0,
811                );
812            }
813
814            if results.len() > fetch_k {
815                results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
816                results.truncate(fetch_k);
817            }
818            results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
819            log::debug!(
820                "[search_dense] field {}: rerank total={:.1}ms",
821                field.0,
822                t_rerank.elapsed().as_secs_f64() * 1000.0
823            );
824        }
825
826        Ok(combine_ordinal_results(results, combiner, k))
827    }
828
829    /// Check if this segment has dense vectors for the given field
830    pub fn has_dense_vector_index(&self, field: Field) -> bool {
831        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
832    }
833
834    /// Get the dense vector index for a field (if available)
835    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
836        match self.vector_indexes.get(&field.0) {
837            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
838            _ => None,
839        }
840    }
841
842    /// Get the IVF vector index for a field (if available)
843    pub fn get_ivf_vector_index(
844        &self,
845        field: Field,
846    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
847        match self.vector_indexes.get(&field.0) {
848            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
849            _ => None,
850        }
851    }
852
    /// Get coarse centroids for a field.
    ///
    /// Returns the shared centroids previously installed via
    /// [`Self::set_coarse_centroids`], or `None` if the field has none.
    /// IVF/ScaNN searches in this reader require an entry here.
    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }
857
    /// Set per-field coarse centroids from index-level trained structures.
    ///
    /// Replaces the previous map wholesale; IVF/ScaNN search for a field with
    /// no entry returns `Error::Schema` (see `search_dense_vector_sync`).
    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }
862
863    /// Get the ScaNN vector index for a field (if available)
864    pub fn get_scann_vector_index(
865        &self,
866        field: Field,
867    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
868        match self.vector_indexes.get(&field.0) {
869            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
870            _ => None,
871        }
872    }
873
    /// Get the vector index type for a field.
    ///
    /// Borrows the raw [`VectorIndex`] enum (RaBitQ / IVF / ScaNN) without
    /// forcing lazy deserialization; `None` if the field has no ANN index.
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }
878
879    /// Get positions for a term (for phrase queries)
880    ///
881    /// Position offsets are now embedded in TermInfo, so we first look up
882    /// the term to get its TermInfo, then use position_info() to get the offset.
883    pub async fn get_positions(
884        &self,
885        field: Field,
886        term: &[u8],
887    ) -> Result<Option<crate::structures::PositionPostingList>> {
888        // Get positions handle
889        let handle = match &self.positions_handle {
890            Some(h) => h,
891            None => return Ok(None),
892        };
893
894        // Build key: field_id + term
895        let mut key = Vec::with_capacity(4 + term.len());
896        key.extend_from_slice(&field.0.to_le_bytes());
897        key.extend_from_slice(term);
898
899        // Look up term in dictionary to get TermInfo with position offset
900        let term_info = match self.term_dict.get(&key).await? {
901            Some(info) => info,
902            None => return Ok(None),
903        };
904
905        // Get position offset from TermInfo
906        let (offset, length) = match term_info.position_info() {
907            Some((o, l)) => (o, l),
908            None => return Ok(None),
909        };
910
911        // Read the position data
912        let slice = handle.slice(offset..offset + length);
913        let data = slice.read_bytes().await?;
914
915        // Deserialize
916        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
917
918        Ok(Some(pos_list))
919    }
920
921    /// Check if positions are available for a field
922    pub fn has_positions(&self, field: Field) -> bool {
923        // Check schema for position mode on this field
924        if let Some(entry) = self.schema.get_field_entry(field) {
925            entry.positions.is_some()
926        } else {
927            false
928        }
929    }
930}
931
932// ── Synchronous search methods (mmap/RAM only) ─────────────────────────────
933#[cfg(feature = "sync")]
934impl SegmentReader {
935    /// Synchronous posting list lookup — requires Inline (mmap/RAM) file handles.
936    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
937        // Build key: field_id + term
938        let mut key = Vec::with_capacity(4 + term.len());
939        key.extend_from_slice(&field.0.to_le_bytes());
940        key.extend_from_slice(term);
941
942        // Look up in term dictionary (sync)
943        let term_info = match self.term_dict.get_sync(&key)? {
944            Some(info) => info,
945            None => return Ok(None),
946        };
947
948        // Check if posting list is inlined
949        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
950            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
951            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
952                posting_list.push(doc_id, tf);
953            }
954            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
955            return Ok(Some(block_list));
956        }
957
958        // External posting list — sync range read
959        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
960            Error::Corruption("TermInfo has neither inline nor external data".to_string())
961        })?;
962
963        let start = posting_offset;
964        let end = start + posting_len;
965
966        if end > self.postings_handle.len() {
967            return Err(Error::Corruption(
968                "Posting offset out of bounds".to_string(),
969            ));
970        }
971
972        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
973        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
974
975        Ok(Some(block_list))
976    }
977
978    /// Synchronous position list lookup — requires Inline (mmap/RAM) file handles.
979    pub fn get_positions_sync(
980        &self,
981        field: Field,
982        term: &[u8],
983    ) -> Result<Option<crate::structures::PositionPostingList>> {
984        let handle = match &self.positions_handle {
985            Some(h) => h,
986            None => return Ok(None),
987        };
988
989        // Build key: field_id + term
990        let mut key = Vec::with_capacity(4 + term.len());
991        key.extend_from_slice(&field.0.to_le_bytes());
992        key.extend_from_slice(term);
993
994        // Look up term in dictionary (sync)
995        let term_info = match self.term_dict.get_sync(&key)? {
996            Some(info) => info,
997            None => return Ok(None),
998        };
999
1000        let (offset, length) = match term_info.position_info() {
1001            Some((o, l)) => (o, l),
1002            None => return Ok(None),
1003        };
1004
1005        let slice = handle.slice(offset..offset + length);
1006        let data = slice.read_bytes_sync()?;
1007
1008        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
1009        Ok(Some(pos_list))
1010    }
1011
    /// Synchronous dense vector search — ANN indexes are already sync,
    /// brute-force uses sync mmap reads.
    ///
    /// Pipeline:
    /// 1. L1 candidates: ANN search (RaBitQ / IVF / ScaNN) when an index
    ///    exists, otherwise a batched brute-force scan over the flat vectors.
    /// 2. L2 rerank (ANN path only): candidates are rescored against raw
    ///    quantized vectors from the flat file, then truncated to `fetch_k`.
    /// 3. Per-ordinal hits are folded into per-document results via `combiner`
    ///    and truncated to `k`.
    ///
    /// ANN distances are converted to similarity scores as `1/(1+dist)`.
    /// `rerank_factor` (clamped to >= 1.0) scales `k` into the candidate pool
    /// size; `nprobe == 0` falls back to a default of 32 probed clusters.
    pub fn search_dense_vector_sync(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        // No dense data of any kind for this field in this segment.
        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // Unit-norm fields let the scorer skip per-vector norm handling.
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;
        // Oversized candidate pool for the rerank stage.
        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        // L1 candidates as (doc_id, ordinal, score) triples.
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            // ANN search (already sync)
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    // IVF needs the index-level coarse centroids installed via
                    // set_coarse_centroids; missing centroids is a setup error.
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    // ScaNN likewise requires the shared coarse centroids.
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // Batched brute-force (sync mmap reads)
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            // Score buffer is reused across batches; each pass overwrites
            // scores[..batch_count], so no reset is needed between batches.
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch_sync(batch_start, batch_count)
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                // SIMD scoring directly on the quantized bytes.
                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            // Collector yields (doc_id, score, ordinal); reorder to match the
            // (doc_id, ordinal, score) triple used everywhere below.
            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };

        // Rerank ANN candidates using raw vectors (sync)
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each candidate to its flat-file slot: (result_idx, flat_idx).
            // A (doc_id, ordinal) pair with no flat entry is simply skipped and
            // keeps its L1 score.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            if !resolved.is_empty() {
                // Sort by flat_idx for sequential mmap access (page locality).
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
                // Batch-read raw quantized bytes into one contiguous buffer.
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    let _ = lazy_flat.read_vector_raw_into_sync(
                        flat_idx,
                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                    );
                }

                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);

                // Overwrite L1 approximate scores with reranked scores in place.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }
            }

            // Partial-select the top fetch_k by score desc, then fully sort.
            if results.len() > fetch_k {
                results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
                results.truncate(fetch_k);
            }
            results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
1173}