Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
10/// Memory statistics for a single segment
11#[derive(Debug, Clone, Default)]
12pub struct SegmentMemoryStats {
13    /// Segment ID
14    pub segment_id: u128,
15    /// Number of documents in segment
16    pub num_docs: u32,
17    /// Term dictionary block cache bytes
18    pub term_dict_cache_bytes: usize,
19    /// Document store block cache bytes
20    pub store_cache_bytes: usize,
21    /// Sparse vector index bytes (in-memory posting lists)
22    pub sparse_index_bytes: usize,
23    /// Dense vector index bytes (cluster assignments, quantized codes)
24    pub dense_index_bytes: usize,
25    /// Bloom filter bytes
26    pub bloom_filter_bytes: usize,
27}
28
29impl SegmentMemoryStats {
30    /// Total estimated memory for this segment
31    pub fn total_bytes(&self) -> usize {
32        self.term_dict_cache_bytes
33            + self.store_cache_bytes
34            + self.sparse_index_bytes
35            + self.dense_index_bytes
36            + self.bloom_filter_bytes
37    }
38}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56/// Combine per-ordinal (doc_id, ordinal, score) triples into VectorSearchResults,
57/// applying the multi-value combiner, sorting by score desc, and truncating to `limit`.
58///
59/// Fast path: when all ordinals are 0 (single-valued field), skips the HashMap
60/// grouping entirely and just sorts + truncates the raw results.
61pub(crate) fn combine_ordinal_results(
62    raw: impl IntoIterator<Item = (u32, u16, f32)>,
63    combiner: crate::query::MultiValueCombiner,
64    limit: usize,
65) -> Vec<VectorSearchResult> {
66    let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();
67
68    // Fast path: all ordinals are 0 → no grouping needed, skip HashMap
69    let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
70    if all_single {
71        let mut results: Vec<VectorSearchResult> = collected
72            .into_iter()
73            .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
74            .collect();
75        results.sort_unstable_by(|a, b| {
76            b.score
77                .partial_cmp(&a.score)
78                .unwrap_or(std::cmp::Ordering::Equal)
79        });
80        results.truncate(limit);
81        return results;
82    }
83
84    // Slow path: multi-valued field — group by doc_id, apply combiner
85    let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
86        rustc_hash::FxHashMap::default();
87    for (doc_id, ordinal, score) in collected {
88        doc_ordinals
89            .entry(doc_id as DocId)
90            .or_default()
91            .push((ordinal as u32, score));
92    }
93    let mut results: Vec<VectorSearchResult> = doc_ordinals
94        .into_iter()
95        .map(|(doc_id, ordinals)| {
96            let combined_score = combiner.combine(&ordinals);
97            VectorSearchResult::new(doc_id, combined_score, ordinals)
98        })
99        .collect();
100    results.sort_unstable_by(|a, b| {
101        b.score
102            .partial_cmp(&a.score)
103            .unwrap_or(std::cmp::Ordering::Equal)
104    });
105    results.truncate(limit);
106    results
107}
108
109/// Async segment reader with lazy loading
110///
111/// - Term dictionary: only index loaded, blocks loaded on-demand
112/// - Postings: loaded on-demand per term via HTTP range requests
113/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
114pub struct SegmentReader {
115    meta: SegmentMeta,
116    /// Term dictionary with lazy block loading
117    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
118    /// Postings file handle - fetches ranges on demand
119    postings_handle: FileHandle,
120    /// Document store with lazy block loading
121    store: Arc<AsyncStoreReader>,
122    schema: Arc<Schema>,
123    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
124    vector_indexes: FxHashMap<u32, VectorIndex>,
125    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
126    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
127    /// Per-field coarse centroids for IVF/ScaNN search
128    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
129    /// Sparse vector indexes per field
130    sparse_indexes: FxHashMap<u32, SparseIndex>,
131    /// Position file handle for phrase queries (lazy loading)
132    positions_handle: Option<FileHandle>,
133    /// Fast-field columnar readers per field_id
134    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
135}
136
137impl SegmentReader {
138    /// Open a segment with lazy loading
139    pub async fn open<D: Directory>(
140        dir: &D,
141        segment_id: SegmentId,
142        schema: Arc<Schema>,
143        cache_blocks: usize,
144    ) -> Result<Self> {
145        let files = SegmentFiles::new(segment_id.0);
146
147        // Read metadata (small, always loaded)
148        let meta_slice = dir.open_read(&files.meta).await?;
149        let meta_bytes = meta_slice.read_bytes().await?;
150        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
151        debug_assert_eq!(meta.id, segment_id.0);
152
153        // Open term dictionary with lazy loading (fetches ranges on demand)
154        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
155        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
156
157        // Get postings file handle (lazy - fetches ranges on demand)
158        let postings_handle = dir.open_lazy(&files.postings).await?;
159
160        // Open store with lazy loading
161        let store_handle = dir.open_lazy(&files.store).await?;
162        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
163
164        // Load dense vector indexes from unified .vectors file
165        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
166        let vector_indexes = vectors_data.indexes;
167        let flat_vectors = vectors_data.flat_vectors;
168
169        // Load sparse vector indexes from .sparse file
170        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
171
172        // Open positions file handle (if exists) - offsets are now in TermInfo
173        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
174
175        // Load fast-field columns from .fast file
176        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;
177
178        // Log segment loading stats
179        {
180            let mut parts = vec![format!(
181                "[segment] loaded {:016x}: docs={}",
182                segment_id.0, meta.num_docs
183            )];
184            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
185                parts.push(format!(
186                    "dense: {} ann + {} flat fields",
187                    vector_indexes.len(),
188                    flat_vectors.len()
189                ));
190            }
191            for (field_id, idx) in &sparse_indexes {
192                parts.push(format!(
193                    "sparse field {}: {} dims, ~{:.1} KB",
194                    field_id,
195                    idx.num_dimensions(),
196                    idx.num_dimensions() as f64 * 24.0 / 1024.0
197                ));
198            }
199            if !fast_fields.is_empty() {
200                parts.push(format!("fast: {} fields", fast_fields.len()));
201            }
202            log::debug!("{}", parts.join(", "));
203        }
204
205        Ok(Self {
206            meta,
207            term_dict: Arc::new(term_dict),
208            postings_handle,
209            store: Arc::new(store),
210            schema,
211            vector_indexes,
212            flat_vectors,
213            coarse_centroids: FxHashMap::default(),
214            sparse_indexes,
215            positions_handle,
216            fast_fields,
217        })
218    }
219
220    pub fn meta(&self) -> &SegmentMeta {
221        &self.meta
222    }
223
224    pub fn num_docs(&self) -> u32 {
225        self.meta.num_docs
226    }
227
228    /// Get average field length for BM25F scoring
229    pub fn avg_field_len(&self, field: Field) -> f32 {
230        self.meta.avg_field_len(field)
231    }
232
233    pub fn schema(&self) -> &Schema {
234        &self.schema
235    }
236
237    /// Get sparse indexes for all fields
238    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
239        &self.sparse_indexes
240    }
241
242    /// Get sparse index for a specific field
243    pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
244        self.sparse_indexes.get(&field.0)
245    }
246
247    /// Get vector indexes for all fields
248    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
249        &self.vector_indexes
250    }
251
252    /// Get lazy flat vectors for all fields (for reranking and merge)
253    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
254        &self.flat_vectors
255    }
256
257    /// Get a fast-field reader for a specific field.
258    pub fn fast_field(
259        &self,
260        field_id: u32,
261    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
262        self.fast_fields.get(&field_id)
263    }
264
265    /// Get all fast-field readers.
266    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
267        &self.fast_fields
268    }
269
270    /// Get term dictionary stats for debugging
271    pub fn term_dict_stats(&self) -> SSTableStats {
272        self.term_dict.stats()
273    }
274
275    /// Estimate memory usage of this segment reader
276    pub fn memory_stats(&self) -> SegmentMemoryStats {
277        let term_dict_stats = self.term_dict.stats();
278
279        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
280        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
281
282        // Store cache: similar estimate
283        let store_cache_bytes = self.store.cached_blocks() * 4096;
284
285        // Sparse index: SoA dim table + OwnedBytes skip section
286        let sparse_index_bytes: usize = self
287            .sparse_indexes
288            .values()
289            .map(|s| s.estimated_memory_bytes())
290            .sum();
291
292        // Dense index: vectors are memory-mapped, but we track index structures
293        // RaBitQ/IVF indexes have cluster assignments in memory
294        let dense_index_bytes: usize = self
295            .vector_indexes
296            .values()
297            .map(|v| v.estimated_memory_bytes())
298            .sum();
299
300        SegmentMemoryStats {
301            segment_id: self.meta.id,
302            num_docs: self.meta.num_docs,
303            term_dict_cache_bytes,
304            store_cache_bytes,
305            sparse_index_bytes,
306            dense_index_bytes,
307            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
308        }
309    }
310
311    /// Get posting list for a term (async - loads on demand)
312    ///
313    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
314    /// and no additional I/O is needed. For larger lists, reads from .post file.
315    pub async fn get_postings(
316        &self,
317        field: Field,
318        term: &[u8],
319    ) -> Result<Option<BlockPostingList>> {
320        log::debug!(
321            "SegmentReader::get_postings field={} term_len={}",
322            field.0,
323            term.len()
324        );
325
326        // Build key: field_id + term
327        let mut key = Vec::with_capacity(4 + term.len());
328        key.extend_from_slice(&field.0.to_le_bytes());
329        key.extend_from_slice(term);
330
331        // Look up in term dictionary
332        let term_info = match self.term_dict.get(&key).await? {
333            Some(info) => {
334                log::debug!("SegmentReader::get_postings found term_info");
335                info
336            }
337            None => {
338                log::debug!("SegmentReader::get_postings term not found");
339                return Ok(None);
340            }
341        };
342
343        // Check if posting list is inlined
344        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
345            // Build BlockPostingList from inline data (no I/O needed!)
346            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
347            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
348                posting_list.push(doc_id, tf);
349            }
350            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
351            return Ok(Some(block_list));
352        }
353
354        // External posting list - read from postings file handle (lazy - HTTP range request)
355        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
356            Error::Corruption("TermInfo has neither inline nor external data".to_string())
357        })?;
358
359        let start = posting_offset;
360        let end = start + posting_len;
361
362        if end > self.postings_handle.len() {
363            return Err(Error::Corruption(
364                "Posting offset out of bounds".to_string(),
365            ));
366        }
367
368        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
369        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
370
371        Ok(Some(block_list))
372    }
373
374    /// Get document by local doc_id (async - loads on demand).
375    ///
376    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
377    /// Uses binary search on sorted doc_ids for O(log N) lookup.
378    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
379        self.doc_with_fields(local_doc_id, None).await
380    }
381
382    /// Get document by local doc_id, hydrating only the specified fields.
383    ///
384    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
385    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
386    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
387    pub async fn doc_with_fields(
388        &self,
389        local_doc_id: DocId,
390        fields: Option<&rustc_hash::FxHashSet<u32>>,
391    ) -> Result<Option<Document>> {
392        let mut doc = match fields {
393            Some(set) => {
394                let field_ids: Vec<u32> = set.iter().copied().collect();
395                match self
396                    .store
397                    .get_fields(local_doc_id, &self.schema, &field_ids)
398                    .await
399                {
400                    Ok(Some(d)) => d,
401                    Ok(None) => return Ok(None),
402                    Err(e) => return Err(Error::from(e)),
403                }
404            }
405            None => match self.store.get(local_doc_id, &self.schema).await {
406                Ok(Some(d)) => d,
407                Ok(None) => return Ok(None),
408                Err(e) => return Err(Error::from(e)),
409            },
410        };
411
412        // Hydrate dense vector fields from flat vector data
413        for (&field_id, lazy_flat) in &self.flat_vectors {
414            // Skip vector fields not in the requested set
415            if let Some(set) = fields
416                && !set.contains(&field_id)
417            {
418                continue;
419            }
420
421            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
422            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
423                let flat_idx = start + j;
424                match lazy_flat.get_vector(flat_idx).await {
425                    Ok(vec) => {
426                        doc.add_dense_vector(Field(field_id), vec);
427                    }
428                    Err(e) => {
429                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
430                    }
431                }
432            }
433        }
434
435        Ok(Some(doc))
436    }
437
438    /// Prefetch term dictionary blocks for a key range
439    pub async fn prefetch_terms(
440        &self,
441        field: Field,
442        start_term: &[u8],
443        end_term: &[u8],
444    ) -> Result<()> {
445        let mut start_key = Vec::with_capacity(4 + start_term.len());
446        start_key.extend_from_slice(&field.0.to_le_bytes());
447        start_key.extend_from_slice(start_term);
448
449        let mut end_key = Vec::with_capacity(4 + end_term.len());
450        end_key.extend_from_slice(&field.0.to_le_bytes());
451        end_key.extend_from_slice(end_term);
452
453        self.term_dict.prefetch_range(&start_key, &end_key).await?;
454        Ok(())
455    }
456
457    /// Check if store uses dictionary compression (incompatible with raw merging)
458    pub fn store_has_dict(&self) -> bool {
459        self.store.has_dict()
460    }
461
462    /// Get store reference for merge operations
463    pub fn store(&self) -> &super::store::AsyncStoreReader {
464        &self.store
465    }
466
467    /// Get raw store blocks for optimized merging
468    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
469        self.store.raw_blocks()
470    }
471
472    /// Get store data slice for raw block access
473    pub fn store_data_slice(&self) -> &FileHandle {
474        self.store.data_slice()
475    }
476
477    /// Get all terms from this segment (for merge)
478    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
479        self.term_dict.all_entries().await.map_err(Error::from)
480    }
481
482    /// Get all terms with parsed field and term string (for statistics aggregation)
483    ///
484    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
485    /// Skips terms that aren't valid UTF-8.
486    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
487        let entries = self.term_dict.all_entries().await?;
488        let mut result = Vec::with_capacity(entries.len());
489
490        for (key, term_info) in entries {
491            // Key format: field_id (4 bytes little-endian) + term bytes
492            if key.len() > 4 {
493                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
494                let term_bytes = &key[4..];
495                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
496                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
497                }
498            }
499        }
500
501        Ok(result)
502    }
503
504    /// Get streaming iterator over term dictionary (for memory-efficient merge)
505    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
506        self.term_dict.iter()
507    }
508
509    /// Prefetch all term dictionary blocks in a single bulk I/O call.
510    ///
511    /// Call before merge iteration to eliminate per-block cache misses.
512    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
513        self.term_dict
514            .prefetch_all_data_bulk()
515            .await
516            .map_err(crate::Error::from)
517    }
518
519    /// Read raw posting bytes at offset
520    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
521        let start = offset;
522        let end = start + len;
523        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
524        Ok(bytes.to_vec())
525    }
526
527    /// Read raw position bytes at offset (for merge)
528    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
529        let handle = match &self.positions_handle {
530            Some(h) => h,
531            None => return Ok(None),
532        };
533        let start = offset;
534        let end = start + len;
535        let bytes = handle.read_bytes_range(start..end).await?;
536        Ok(Some(bytes.to_vec()))
537    }
538
539    /// Check if this segment has a positions file
540    pub fn has_positions_file(&self) -> bool {
541        self.positions_handle.is_some()
542    }
543
544    /// Batch cosine scoring on raw quantized bytes.
545    ///
546    /// Dispatches to the appropriate SIMD scorer based on quantization type.
547    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
548    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
549    fn score_quantized_batch(
550        query: &[f32],
551        raw: &[u8],
552        quant: crate::dsl::DenseVectorQuantization,
553        dim: usize,
554        scores: &mut [f32],
555        unit_norm: bool,
556    ) {
557        use crate::dsl::DenseVectorQuantization;
558        use crate::structures::simd;
559        match (quant, unit_norm) {
560            (DenseVectorQuantization::F32, false) => {
561                let num_floats = scores.len() * dim;
562                debug_assert!(
563                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
564                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
565                );
566                let vectors: &[f32] =
567                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
568                simd::batch_cosine_scores(query, vectors, dim, scores);
569            }
570            (DenseVectorQuantization::F32, true) => {
571                let num_floats = scores.len() * dim;
572                debug_assert!(
573                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
574                    "f32 vector data not 4-byte aligned"
575                );
576                let vectors: &[f32] =
577                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
578                simd::batch_dot_scores(query, vectors, dim, scores);
579            }
580            (DenseVectorQuantization::F16, false) => {
581                simd::batch_cosine_scores_f16(query, raw, dim, scores);
582            }
583            (DenseVectorQuantization::F16, true) => {
584                simd::batch_dot_scores_f16(query, raw, dim, scores);
585            }
586            (DenseVectorQuantization::UInt8, false) => {
587                simd::batch_cosine_scores_u8(query, raw, dim, scores);
588            }
589            (DenseVectorQuantization::UInt8, true) => {
590                simd::batch_dot_scores_u8(query, raw, dim, scores);
591            }
592        }
593    }
594
595    /// Search dense vectors using RaBitQ
596    ///
597    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
598    /// Doc IDs are segment-local.
599    /// For multi-valued documents, scores are combined using the specified combiner.
600    pub async fn search_dense_vector(
601        &self,
602        field: Field,
603        query: &[f32],
604        k: usize,
605        nprobe: usize,
606        rerank_factor: f32,
607        combiner: crate::query::MultiValueCombiner,
608    ) -> Result<Vec<VectorSearchResult>> {
609        let ann_index = self.vector_indexes.get(&field.0);
610        let lazy_flat = self.flat_vectors.get(&field.0);
611
612        // No vectors at all for this field
613        if ann_index.is_none() && lazy_flat.is_none() {
614            return Ok(Vec::new());
615        }
616
617        // Check if vectors are pre-normalized (skip per-vector norm in scoring)
618        let unit_norm = self
619            .schema
620            .get_field_entry(field)
621            .and_then(|e| e.dense_vector_config.as_ref())
622            .is_some_and(|c| c.unit_norm);
623
624        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
625        const BRUTE_FORCE_BATCH: usize = 4096;
626
627        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
628
629        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
630        let t0 = std::time::Instant::now();
631        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
632            // ANN search (RaBitQ, IVF, ScaNN)
633            match index {
634                VectorIndex::RaBitQ(lazy) => {
635                    let rabitq = lazy.get().ok_or_else(|| {
636                        Error::Schema("RaBitQ index deserialization failed".to_string())
637                    })?;
638                    rabitq
639                        .search(query, fetch_k)
640                        .into_iter()
641                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
642                        .collect()
643                }
644                VectorIndex::IVF(lazy) => {
645                    let (index, codebook) = lazy.get().ok_or_else(|| {
646                        Error::Schema("IVF index deserialization failed".to_string())
647                    })?;
648                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
649                        Error::Schema(format!(
650                            "IVF index requires coarse centroids for field {}",
651                            field.0
652                        ))
653                    })?;
654                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
655                    index
656                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
657                        .into_iter()
658                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
659                        .collect()
660                }
661                VectorIndex::ScaNN(lazy) => {
662                    let (index, codebook) = lazy.get().ok_or_else(|| {
663                        Error::Schema("ScaNN index deserialization failed".to_string())
664                    })?;
665                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
666                        Error::Schema(format!(
667                            "ScaNN index requires coarse centroids for field {}",
668                            field.0
669                        ))
670                    })?;
671                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
672                    index
673                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
674                        .into_iter()
675                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
676                        .collect()
677                }
678            }
679        } else if let Some(lazy_flat) = lazy_flat {
680            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
681            // Uses a top-k heap to avoid collecting and sorting all N candidates.
682            log::debug!(
683                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
684                field.0,
685                lazy_flat.num_vectors,
686                lazy_flat.dim,
687                lazy_flat.quantization
688            );
689            let dim = lazy_flat.dim;
690            let n = lazy_flat.num_vectors;
691            let quant = lazy_flat.quantization;
692            let mut collector = crate::query::ScoreCollector::new(fetch_k);
693            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
694
695            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
696                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
697                let batch_bytes = lazy_flat
698                    .read_vectors_batch(batch_start, batch_count)
699                    .await
700                    .map_err(crate::Error::Io)?;
701                let raw = batch_bytes.as_slice();
702
703                Self::score_quantized_batch(
704                    query,
705                    raw,
706                    quant,
707                    dim,
708                    &mut scores[..batch_count],
709                    unit_norm,
710                );
711
712                for (i, &score) in scores.iter().enumerate().take(batch_count) {
713                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
714                    collector.insert_with_ordinal(doc_id, score, ordinal);
715                }
716            }
717
718            collector
719                .into_sorted_results()
720                .into_iter()
721                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
722                .collect()
723        } else {
724            return Ok(Vec::new());
725        };
726        let l1_elapsed = t0.elapsed();
727        log::debug!(
728            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
729            field.0,
730            results.len(),
731            l1_elapsed.as_secs_f64() * 1000.0
732        );
733
734        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
735        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
736        if ann_index.is_some()
737            && !results.is_empty()
738            && let Some(lazy_flat) = lazy_flat
739        {
740            let t_rerank = std::time::Instant::now();
741            let dim = lazy_flat.dim;
742            let quant = lazy_flat.quantization;
743            let vbs = lazy_flat.vector_byte_size();
744
745            // Resolve flat indexes for each candidate via binary search
746            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
747            for (ri, c) in results.iter().enumerate() {
748                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
749                for (j, &(_, ord)) in entries.iter().enumerate() {
750                    if ord == c.1 {
751                        resolved.push((ri, start + j));
752                        break;
753                    }
754                }
755            }
756
757            let t_resolve = t_rerank.elapsed();
758            if !resolved.is_empty() {
759                // Sort by flat_idx for sequential mmap access (better page locality)
760                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
761
762                // Batch-read raw quantized bytes into contiguous buffer
763                let t_read = std::time::Instant::now();
764                let mut raw_buf = vec![0u8; resolved.len() * vbs];
765                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
766                    let _ = lazy_flat
767                        .read_vector_raw_into(
768                            flat_idx,
769                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
770                        )
771                        .await;
772                }
773
774                let read_elapsed = t_read.elapsed();
775
776                // Native-precision batch SIMD cosine scoring
777                let t_score = std::time::Instant::now();
778                let mut scores = vec![0f32; resolved.len()];
779                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
780                let score_elapsed = t_score.elapsed();
781
782                // Write scores back to results
783                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
784                    results[ri].2 = scores[buf_idx];
785                }
786
787                log::debug!(
788                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
789                    field.0,
790                    resolved.len(),
791                    dim,
792                    quant,
793                    vbs,
794                    t_resolve.as_secs_f64() * 1000.0,
795                    read_elapsed.as_secs_f64() * 1000.0,
796                    score_elapsed.as_secs_f64() * 1000.0,
797                );
798            }
799
800            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
801            results.truncate(fetch_k);
802            log::debug!(
803                "[search_dense] field {}: rerank total={:.1}ms",
804                field.0,
805                t_rerank.elapsed().as_secs_f64() * 1000.0
806            );
807        }
808
809        Ok(combine_ordinal_results(results, combiner, k))
810    }
811
812    /// Check if this segment has dense vectors for the given field
813    pub fn has_dense_vector_index(&self, field: Field) -> bool {
814        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
815    }
816
817    /// Get the dense vector index for a field (if available)
818    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
819        match self.vector_indexes.get(&field.0) {
820            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
821            _ => None,
822        }
823    }
824
825    /// Get the IVF vector index for a field (if available)
826    pub fn get_ivf_vector_index(
827        &self,
828        field: Field,
829    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
830        match self.vector_indexes.get(&field.0) {
831            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
832            _ => None,
833        }
834    }
835
836    /// Get coarse centroids for a field
837    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
838        self.coarse_centroids.get(&field_id)
839    }
840
841    /// Set per-field coarse centroids from index-level trained structures
842    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
843        self.coarse_centroids = centroids;
844    }
845
846    /// Get the ScaNN vector index for a field (if available)
847    pub fn get_scann_vector_index(
848        &self,
849        field: Field,
850    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
851        match self.vector_indexes.get(&field.0) {
852            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
853            _ => None,
854        }
855    }
856
857    /// Get the vector index type for a field
858    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
859        self.vector_indexes.get(&field.0)
860    }
861
862    /// Get positions for a term (for phrase queries)
863    ///
864    /// Position offsets are now embedded in TermInfo, so we first look up
865    /// the term to get its TermInfo, then use position_info() to get the offset.
866    pub async fn get_positions(
867        &self,
868        field: Field,
869        term: &[u8],
870    ) -> Result<Option<crate::structures::PositionPostingList>> {
871        // Get positions handle
872        let handle = match &self.positions_handle {
873            Some(h) => h,
874            None => return Ok(None),
875        };
876
877        // Build key: field_id + term
878        let mut key = Vec::with_capacity(4 + term.len());
879        key.extend_from_slice(&field.0.to_le_bytes());
880        key.extend_from_slice(term);
881
882        // Look up term in dictionary to get TermInfo with position offset
883        let term_info = match self.term_dict.get(&key).await? {
884            Some(info) => info,
885            None => return Ok(None),
886        };
887
888        // Get position offset from TermInfo
889        let (offset, length) = match term_info.position_info() {
890            Some((o, l)) => (o, l),
891            None => return Ok(None),
892        };
893
894        // Read the position data
895        let slice = handle.slice(offset..offset + length);
896        let data = slice.read_bytes().await?;
897
898        // Deserialize
899        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
900
901        Ok(Some(pos_list))
902    }
903
904    /// Check if positions are available for a field
905    pub fn has_positions(&self, field: Field) -> bool {
906        // Check schema for position mode on this field
907        if let Some(entry) = self.schema.get_field_entry(field) {
908            entry.positions.is_some()
909        } else {
910            false
911        }
912    }
913}
914
915// ── Synchronous search methods (mmap/RAM only) ─────────────────────────────
916#[cfg(feature = "sync")]
917impl SegmentReader {
918    /// Synchronous posting list lookup — requires Inline (mmap/RAM) file handles.
919    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
920        // Build key: field_id + term
921        let mut key = Vec::with_capacity(4 + term.len());
922        key.extend_from_slice(&field.0.to_le_bytes());
923        key.extend_from_slice(term);
924
925        // Look up in term dictionary (sync)
926        let term_info = match self.term_dict.get_sync(&key)? {
927            Some(info) => info,
928            None => return Ok(None),
929        };
930
931        // Check if posting list is inlined
932        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
933            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
934            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
935                posting_list.push(doc_id, tf);
936            }
937            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
938            return Ok(Some(block_list));
939        }
940
941        // External posting list — sync range read
942        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
943            Error::Corruption("TermInfo has neither inline nor external data".to_string())
944        })?;
945
946        let start = posting_offset;
947        let end = start + posting_len;
948
949        if end > self.postings_handle.len() {
950            return Err(Error::Corruption(
951                "Posting offset out of bounds".to_string(),
952            ));
953        }
954
955        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
956        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
957
958        Ok(Some(block_list))
959    }
960
961    /// Synchronous position list lookup — requires Inline (mmap/RAM) file handles.
962    pub fn get_positions_sync(
963        &self,
964        field: Field,
965        term: &[u8],
966    ) -> Result<Option<crate::structures::PositionPostingList>> {
967        let handle = match &self.positions_handle {
968            Some(h) => h,
969            None => return Ok(None),
970        };
971
972        // Build key: field_id + term
973        let mut key = Vec::with_capacity(4 + term.len());
974        key.extend_from_slice(&field.0.to_le_bytes());
975        key.extend_from_slice(term);
976
977        // Look up term in dictionary (sync)
978        let term_info = match self.term_dict.get_sync(&key)? {
979            Some(info) => info,
980            None => return Ok(None),
981        };
982
983        let (offset, length) = match term_info.position_info() {
984            Some((o, l)) => (o, l),
985            None => return Ok(None),
986        };
987
988        let slice = handle.slice(offset..offset + length);
989        let data = slice.read_bytes_sync()?;
990
991        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
992        Ok(Some(pos_list))
993    }
994
995    /// Synchronous dense vector search — ANN indexes are already sync,
996    /// brute-force uses sync mmap reads.
997    pub fn search_dense_vector_sync(
998        &self,
999        field: Field,
1000        query: &[f32],
1001        k: usize,
1002        nprobe: usize,
1003        rerank_factor: f32,
1004        combiner: crate::query::MultiValueCombiner,
1005    ) -> Result<Vec<VectorSearchResult>> {
1006        let ann_index = self.vector_indexes.get(&field.0);
1007        let lazy_flat = self.flat_vectors.get(&field.0);
1008
1009        if ann_index.is_none() && lazy_flat.is_none() {
1010            return Ok(Vec::new());
1011        }
1012
1013        let unit_norm = self
1014            .schema
1015            .get_field_entry(field)
1016            .and_then(|e| e.dense_vector_config.as_ref())
1017            .is_some_and(|c| c.unit_norm);
1018
1019        const BRUTE_FORCE_BATCH: usize = 4096;
1020        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
1021
1022        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
1023            // ANN search (already sync)
1024            match index {
1025                VectorIndex::RaBitQ(lazy) => {
1026                    let rabitq = lazy.get().ok_or_else(|| {
1027                        Error::Schema("RaBitQ index deserialization failed".to_string())
1028                    })?;
1029                    rabitq
1030                        .search(query, fetch_k)
1031                        .into_iter()
1032                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1033                        .collect()
1034                }
1035                VectorIndex::IVF(lazy) => {
1036                    let (index, codebook) = lazy.get().ok_or_else(|| {
1037                        Error::Schema("IVF index deserialization failed".to_string())
1038                    })?;
1039                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1040                        Error::Schema(format!(
1041                            "IVF index requires coarse centroids for field {}",
1042                            field.0
1043                        ))
1044                    })?;
1045                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1046                    index
1047                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1048                        .into_iter()
1049                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1050                        .collect()
1051                }
1052                VectorIndex::ScaNN(lazy) => {
1053                    let (index, codebook) = lazy.get().ok_or_else(|| {
1054                        Error::Schema("ScaNN index deserialization failed".to_string())
1055                    })?;
1056                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1057                        Error::Schema(format!(
1058                            "ScaNN index requires coarse centroids for field {}",
1059                            field.0
1060                        ))
1061                    })?;
1062                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1063                    index
1064                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1065                        .into_iter()
1066                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1067                        .collect()
1068                }
1069            }
1070        } else if let Some(lazy_flat) = lazy_flat {
1071            // Batched brute-force (sync mmap reads)
1072            let dim = lazy_flat.dim;
1073            let n = lazy_flat.num_vectors;
1074            let quant = lazy_flat.quantization;
1075            let mut collector = crate::query::ScoreCollector::new(fetch_k);
1076            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
1077
1078            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
1079                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
1080                let batch_bytes = lazy_flat
1081                    .read_vectors_batch_sync(batch_start, batch_count)
1082                    .map_err(crate::Error::Io)?;
1083                let raw = batch_bytes.as_slice();
1084
1085                Self::score_quantized_batch(
1086                    query,
1087                    raw,
1088                    quant,
1089                    dim,
1090                    &mut scores[..batch_count],
1091                    unit_norm,
1092                );
1093
1094                for (i, &score) in scores.iter().enumerate().take(batch_count) {
1095                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
1096                    collector.insert_with_ordinal(doc_id, score, ordinal);
1097                }
1098            }
1099
1100            collector
1101                .into_sorted_results()
1102                .into_iter()
1103                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
1104                .collect()
1105        } else {
1106            return Ok(Vec::new());
1107        };
1108
1109        // Rerank ANN candidates using raw vectors (sync)
1110        if ann_index.is_some()
1111            && !results.is_empty()
1112            && let Some(lazy_flat) = lazy_flat
1113        {
1114            let dim = lazy_flat.dim;
1115            let quant = lazy_flat.quantization;
1116            let vbs = lazy_flat.vector_byte_size();
1117
1118            let mut resolved: Vec<(usize, usize)> = Vec::new();
1119            for (ri, c) in results.iter().enumerate() {
1120                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
1121                for (j, &(_, ord)) in entries.iter().enumerate() {
1122                    if ord == c.1 {
1123                        resolved.push((ri, start + j));
1124                        break;
1125                    }
1126                }
1127            }
1128
1129            if !resolved.is_empty() {
1130                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
1131                let mut raw_buf = vec![0u8; resolved.len() * vbs];
1132                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
1133                    let _ = lazy_flat.read_vector_raw_into_sync(
1134                        flat_idx,
1135                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
1136                    );
1137                }
1138
1139                let mut scores = vec![0f32; resolved.len()];
1140                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
1141
1142                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
1143                    results[ri].2 = scores[buf_idx];
1144                }
1145            }
1146
1147            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
1148            results.truncate(fetch_k);
1149        }
1150
1151        Ok(combine_ordinal_results(results, combiner, k))
1152    }
1153}