hermes_core/segment/reader/mod.rs

1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
10/// Memory statistics for a single segment
11#[derive(Debug, Clone, Default)]
12pub struct SegmentMemoryStats {
13    /// Segment ID
14    pub segment_id: u128,
15    /// Number of documents in segment
16    pub num_docs: u32,
17    /// Term dictionary block cache bytes
18    pub term_dict_cache_bytes: usize,
19    /// Document store block cache bytes
20    pub store_cache_bytes: usize,
21    /// Sparse vector index bytes (in-memory posting lists)
22    pub sparse_index_bytes: usize,
23    /// Dense vector index bytes (cluster assignments, quantized codes)
24    pub dense_index_bytes: usize,
25    /// Bloom filter bytes
26    pub bloom_filter_bytes: usize,
27}
28
29impl SegmentMemoryStats {
30    /// Total estimated memory for this segment
31    pub fn total_bytes(&self) -> usize {
32        self.term_dict_cache_bytes
33            + self.store_cache_bytes
34            + self.sparse_index_bytes
35            + self.dense_index_bytes
36            + self.bloom_filter_bytes
37    }
38}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56/// Combine per-ordinal (doc_id, ordinal, score) triples into VectorSearchResults,
57/// applying the multi-value combiner, sorting by score desc, and truncating to `limit`.
58///
59/// Fast path: when all ordinals are 0 (single-valued field), skips the HashMap
60/// grouping entirely and just sorts + truncates the raw results.
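///
/// A minimal illustrative sketch (not compiled as a doctest); the `Max`
/// combiner variant shown here is an assumption about `MultiValueCombiner`:
///
/// ```ignore
/// // doc 7 appears with two ordinals, doc 3 with one.
/// let raw = vec![(7u32, 0u16, 0.9f32), (7, 1, 0.4), (3, 0, 0.7)];
/// let top = combine_ordinal_results(raw, MultiValueCombiner::Max, 2);
/// // With a max-style combiner, doc 7 keeps its best ordinal score (0.9), doc 3 keeps 0.7.
/// assert_eq!(top.len(), 2);
/// ```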
61pub(crate) fn combine_ordinal_results(
62    raw: impl IntoIterator<Item = (u32, u16, f32)>,
63    combiner: crate::query::MultiValueCombiner,
64    limit: usize,
65) -> Vec<VectorSearchResult> {
66    let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();
67
68    let num_raw = collected.len();
69    let unique_docs = {
70        let mut ids: Vec<u32> = collected.iter().map(|(d, _, _)| *d).collect();
71        ids.sort_unstable();
72        ids.dedup();
73        ids.len()
74    };
75    log::debug!(
76        "combine_ordinal_results: {} raw entries, {} unique docs, combiner={:?}, limit={}",
77        num_raw,
78        unique_docs,
79        combiner,
80        limit
81    );
82
83    // Fast path: all ordinals are 0 → no grouping needed, skip HashMap
84    let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
85    if all_single {
86        let mut results: Vec<VectorSearchResult> = collected
87            .into_iter()
88            .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
89            .collect();
90        results.sort_unstable_by(|a, b| {
91            b.score
92                .partial_cmp(&a.score)
93                .unwrap_or(std::cmp::Ordering::Equal)
94        });
95        results.truncate(limit);
96        return results;
97    }
98
99    // Slow path: multi-valued field — group by doc_id, apply combiner
100    let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
101        rustc_hash::FxHashMap::default();
102    for (doc_id, ordinal, score) in collected {
103        doc_ordinals
104            .entry(doc_id as DocId)
105            .or_default()
106            .push((ordinal as u32, score));
107    }
108    let mut results: Vec<VectorSearchResult> = doc_ordinals
109        .into_iter()
110        .map(|(doc_id, ordinals)| {
111            let combined_score = combiner.combine(&ordinals);
112            VectorSearchResult::new(doc_id, combined_score, ordinals)
113        })
114        .collect();
115    results.sort_unstable_by(|a, b| {
116        b.score
117            .partial_cmp(&a.score)
118            .unwrap_or(std::cmp::Ordering::Equal)
119    });
120    results.truncate(limit);
121    results
122}
123
124/// Async segment reader with lazy loading
125///
126/// - Term dictionary: only index loaded, blocks loaded on-demand
127/// - Postings: loaded on-demand per term via HTTP range requests
128/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
129pub struct SegmentReader {
130    meta: SegmentMeta,
131    /// Term dictionary with lazy block loading
132    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
133    /// Postings file handle - fetches ranges on demand
134    postings_handle: FileHandle,
135    /// Document store with lazy block loading
136    store: Arc<AsyncStoreReader>,
137    schema: Arc<Schema>,
    /// Dense vector indexes per field (RaBitQ, IVF-RaBitQ, or ScaNN) — for search
139    vector_indexes: FxHashMap<u32, VectorIndex>,
140    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
141    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
142    /// Per-field coarse centroids for IVF/ScaNN search
143    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
144    /// Sparse vector indexes per field
145    sparse_indexes: FxHashMap<u32, SparseIndex>,
146    /// Position file handle for phrase queries (lazy loading)
147    positions_handle: Option<FileHandle>,
148    /// Fast-field columnar readers per field_id
149    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
150}
151
152impl SegmentReader {
153    /// Open a segment with lazy loading
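    ///
    /// A hedged usage sketch (the directory type, segment id, and cache size
    /// below are illustrative assumptions, not fixed API values):
    ///
    /// ```ignore
    /// // `FsDirectory` stands in for any `Directory` implementation.
    /// let dir = FsDirectory::open("/path/to/index")?;
    /// let reader = SegmentReader::open(&dir, SegmentId(42), schema.clone(), 256).await?;
    /// println!("docs = {}", reader.num_docs());
    /// ```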
154    pub async fn open<D: Directory>(
155        dir: &D,
156        segment_id: SegmentId,
157        schema: Arc<Schema>,
158        cache_blocks: usize,
159    ) -> Result<Self> {
160        let files = SegmentFiles::new(segment_id.0);
161
162        // Read metadata (small, always loaded)
163        let meta_slice = dir.open_read(&files.meta).await?;
164        let meta_bytes = meta_slice.read_bytes().await?;
165        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
166        debug_assert_eq!(meta.id, segment_id.0);
167
168        // Open term dictionary with lazy loading (fetches ranges on demand)
169        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
170        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
171
172        // Get postings file handle (lazy - fetches ranges on demand)
173        let postings_handle = dir.open_lazy(&files.postings).await?;
174
175        // Open store with lazy loading
176        let store_handle = dir.open_lazy(&files.store).await?;
177        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
178
179        // Load dense vector indexes from unified .vectors file
180        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
181        let vector_indexes = vectors_data.indexes;
182        let flat_vectors = vectors_data.flat_vectors;
183
184        // Load sparse vector indexes from .sparse file
185        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
186
        // Open positions file handle (if it exists) - offsets are now in TermInfo
188        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
189
190        // Load fast-field columns from .fast file
191        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;
192
193        // Log segment loading stats
194        {
195            let mut parts = vec![format!(
196                "[segment] loaded {:016x}: docs={}",
197                segment_id.0, meta.num_docs
198            )];
199            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
200                parts.push(format!(
201                    "dense: {} ann + {} flat fields",
202                    vector_indexes.len(),
203                    flat_vectors.len()
204                ));
205            }
206            for (field_id, idx) in &sparse_indexes {
207                parts.push(format!(
208                    "sparse field {}: {} dims, ~{:.1} KB",
209                    field_id,
210                    idx.num_dimensions(),
211                    idx.num_dimensions() as f64 * 24.0 / 1024.0
212                ));
213            }
214            if !fast_fields.is_empty() {
215                parts.push(format!("fast: {} fields", fast_fields.len()));
216            }
217            log::debug!("{}", parts.join(", "));
218        }
219
220        Ok(Self {
221            meta,
222            term_dict: Arc::new(term_dict),
223            postings_handle,
224            store: Arc::new(store),
225            schema,
226            vector_indexes,
227            flat_vectors,
228            coarse_centroids: FxHashMap::default(),
229            sparse_indexes,
230            positions_handle,
231            fast_fields,
232        })
233    }
234
235    pub fn meta(&self) -> &SegmentMeta {
236        &self.meta
237    }
238
239    pub fn num_docs(&self) -> u32 {
240        self.meta.num_docs
241    }
242
243    /// Get average field length for BM25F scoring
244    pub fn avg_field_len(&self, field: Field) -> f32 {
245        self.meta.avg_field_len(field)
246    }
247
248    pub fn schema(&self) -> &Schema {
249        &self.schema
250    }
251
252    /// Get sparse indexes for all fields
253    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
254        &self.sparse_indexes
255    }
256
257    /// Get sparse index for a specific field
258    pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
259        self.sparse_indexes.get(&field.0)
260    }
261
262    /// Get vector indexes for all fields
263    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
264        &self.vector_indexes
265    }
266
267    /// Get lazy flat vectors for all fields (for reranking and merge)
268    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
269        &self.flat_vectors
270    }
271
272    /// Get a fast-field reader for a specific field.
273    pub fn fast_field(
274        &self,
275        field_id: u32,
276    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
277        self.fast_fields.get(&field_id)
278    }
279
280    /// Get all fast-field readers.
281    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
282        &self.fast_fields
283    }
284
285    /// Get term dictionary stats for debugging
286    pub fn term_dict_stats(&self) -> SSTableStats {
287        self.term_dict.stats()
288    }
289
290    /// Estimate memory usage of this segment reader
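    ///
    /// Illustrative sketch:
    ///
    /// ```ignore
    /// let stats = reader.memory_stats();
    /// log::debug!("segment {:032x}: ~{} bytes resident", stats.segment_id, stats.total_bytes());
    /// ```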
291    pub fn memory_stats(&self) -> SegmentMemoryStats {
292        let term_dict_stats = self.term_dict.stats();
293
294        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
295        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
296
297        // Store cache: similar estimate
298        let store_cache_bytes = self.store.cached_blocks() * 4096;
299
300        // Sparse index: SoA dim table + OwnedBytes skip section
301        let sparse_index_bytes: usize = self
302            .sparse_indexes
303            .values()
304            .map(|s| s.estimated_memory_bytes())
305            .sum();
306
307        // Dense index: vectors are memory-mapped, but we track index structures
308        // RaBitQ/IVF indexes have cluster assignments in memory
309        let dense_index_bytes: usize = self
310            .vector_indexes
311            .values()
312            .map(|v| v.estimated_memory_bytes())
313            .sum();
314
315        SegmentMemoryStats {
316            segment_id: self.meta.id,
317            num_docs: self.meta.num_docs,
318            term_dict_cache_bytes,
319            store_cache_bytes,
320            sparse_index_bytes,
321            dense_index_bytes,
322            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
323        }
324    }
325
326    /// Get posting list for a term (async - loads on demand)
327    ///
328    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
329    /// and no additional I/O is needed. For larger lists, reads from .post file.
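    ///
    /// Illustrative sketch (the field id and term are assumptions):
    ///
    /// ```ignore
    /// if let Some(postings) = reader.get_postings(Field(0), b"hello").await? {
    ///     // Inline postings need no extra I/O; external ones are range-read lazily.
    /// }
    /// ```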
330    pub async fn get_postings(
331        &self,
332        field: Field,
333        term: &[u8],
334    ) -> Result<Option<BlockPostingList>> {
335        log::debug!(
336            "SegmentReader::get_postings field={} term_len={}",
337            field.0,
338            term.len()
339        );
340
341        // Build key: field_id + term
342        let mut key = Vec::with_capacity(4 + term.len());
343        key.extend_from_slice(&field.0.to_le_bytes());
344        key.extend_from_slice(term);
345
346        // Look up in term dictionary
347        let term_info = match self.term_dict.get(&key).await? {
348            Some(info) => {
349                log::debug!("SegmentReader::get_postings found term_info");
350                info
351            }
352            None => {
353                log::debug!("SegmentReader::get_postings term not found");
354                return Ok(None);
355            }
356        };
357
358        // Check if posting list is inlined
359        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
360            // Build BlockPostingList from inline data (no I/O needed!)
361            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
362            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
363                posting_list.push(doc_id, tf);
364            }
365            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
366            return Ok(Some(block_list));
367        }
368
369        // External posting list - read from postings file handle (lazy - HTTP range request)
370        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
371            Error::Corruption("TermInfo has neither inline nor external data".to_string())
372        })?;
373
374        let start = posting_offset;
375        let end = start + posting_len;
376
377        if end > self.postings_handle.len() {
378            return Err(Error::Corruption(
379                "Posting offset out of bounds".to_string(),
380            ));
381        }
382
383        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
384        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
385
386        Ok(Some(block_list))
387    }
388
389    /// Get document by local doc_id (async - loads on demand).
390    ///
391    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
392    /// Uses binary search on sorted doc_ids for O(log N) lookup.
393    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
394        self.doc_with_fields(local_doc_id, None).await
395    }
396
397    /// Get document by local doc_id, hydrating only the specified fields.
398    ///
399    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
    /// If `fields` is `Some(set)`, only the stored fields and dense vector fields in the
    /// set are hydrated, skipping store decoding and mmap reads + dequantization for
    /// unrequested fields.
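    ///
    /// Illustrative sketch (the requested field id is an assumption):
    ///
    /// ```ignore
    /// use rustc_hash::FxHashSet;
    /// let wanted: FxHashSet<u32> = [3u32].into_iter().collect();
    /// let doc = reader.doc_with_fields(10, Some(&wanted)).await?;
    /// ```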
402    pub async fn doc_with_fields(
403        &self,
404        local_doc_id: DocId,
405        fields: Option<&rustc_hash::FxHashSet<u32>>,
406    ) -> Result<Option<Document>> {
407        let mut doc = match fields {
408            Some(set) => {
409                let field_ids: Vec<u32> = set.iter().copied().collect();
410                match self
411                    .store
412                    .get_fields(local_doc_id, &self.schema, &field_ids)
413                    .await
414                {
415                    Ok(Some(d)) => d,
416                    Ok(None) => return Ok(None),
417                    Err(e) => return Err(Error::from(e)),
418                }
419            }
420            None => match self.store.get(local_doc_id, &self.schema).await {
421                Ok(Some(d)) => d,
422                Ok(None) => return Ok(None),
423                Err(e) => return Err(Error::from(e)),
424            },
425        };
426
427        // Hydrate dense vector fields from flat vector data
428        for (&field_id, lazy_flat) in &self.flat_vectors {
429            // Skip vector fields not in the requested set
430            if let Some(set) = fields
431                && !set.contains(&field_id)
432            {
433                continue;
434            }
435
436            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
437            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
438                let flat_idx = start + j;
439                match lazy_flat.get_vector(flat_idx).await {
440                    Ok(vec) => {
441                        doc.add_dense_vector(Field(field_id), vec);
442                    }
443                    Err(e) => {
444                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
445                    }
446                }
447            }
448        }
449
450        Ok(Some(doc))
451    }
452
453    /// Prefetch term dictionary blocks for a key range
454    pub async fn prefetch_terms(
455        &self,
456        field: Field,
457        start_term: &[u8],
458        end_term: &[u8],
459    ) -> Result<()> {
460        let mut start_key = Vec::with_capacity(4 + start_term.len());
461        start_key.extend_from_slice(&field.0.to_le_bytes());
462        start_key.extend_from_slice(start_term);
463
464        let mut end_key = Vec::with_capacity(4 + end_term.len());
465        end_key.extend_from_slice(&field.0.to_le_bytes());
466        end_key.extend_from_slice(end_term);
467
468        self.term_dict.prefetch_range(&start_key, &end_key).await?;
469        Ok(())
470    }
471
472    /// Check if store uses dictionary compression (incompatible with raw merging)
473    pub fn store_has_dict(&self) -> bool {
474        self.store.has_dict()
475    }
476
477    /// Get store reference for merge operations
478    pub fn store(&self) -> &super::store::AsyncStoreReader {
479        &self.store
480    }
481
482    /// Get raw store blocks for optimized merging
483    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
484        self.store.raw_blocks()
485    }
486
487    /// Get store data slice for raw block access
488    pub fn store_data_slice(&self) -> &FileHandle {
489        self.store.data_slice()
490    }
491
492    /// Get all terms from this segment (for merge)
493    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
494        self.term_dict.all_entries().await.map_err(Error::from)
495    }
496
497    /// Get all terms with parsed field and term string (for statistics aggregation)
498    ///
499    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
500    /// Skips terms that aren't valid UTF-8.
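    ///
    /// Illustrative sketch:
    ///
    /// ```ignore
    /// for (field, term, doc_freq) in reader.all_terms_with_stats().await? {
    ///     log::trace!("field {:?} term {} df {}", field, term, doc_freq);
    /// }
    /// ```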
501    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
502        let entries = self.term_dict.all_entries().await?;
503        let mut result = Vec::with_capacity(entries.len());
504
505        for (key, term_info) in entries {
506            // Key format: field_id (4 bytes little-endian) + term bytes
507            if key.len() > 4 {
508                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
509                let term_bytes = &key[4..];
510                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
511                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
512                }
513            }
514        }
515
516        Ok(result)
517    }
518
519    /// Get streaming iterator over term dictionary (for memory-efficient merge)
520    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
521        self.term_dict.iter()
522    }
523
524    /// Prefetch all term dictionary blocks in a single bulk I/O call.
525    ///
526    /// Call before merge iteration to eliminate per-block cache misses.
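    ///
    /// Illustrative sketch of a merge-time prefetch:
    ///
    /// ```ignore
    /// reader.prefetch_term_dict().await?;
    /// let mut term_iter = reader.term_dict_iter();
    /// // Stream (key, TermInfo) entries without per-block cache misses.
    /// ```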
527    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
528        self.term_dict
529            .prefetch_all_data_bulk()
530            .await
531            .map_err(crate::Error::from)
532    }
533
534    /// Read raw posting bytes at offset
535    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
536        let start = offset;
537        let end = start + len;
538        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
539        Ok(bytes.to_vec())
540    }
541
542    /// Read raw position bytes at offset (for merge)
543    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
544        let handle = match &self.positions_handle {
545            Some(h) => h,
546            None => return Ok(None),
547        };
548        let start = offset;
549        let end = start + len;
550        let bytes = handle.read_bytes_range(start..end).await?;
551        Ok(Some(bytes.to_vec()))
552    }
553
554    /// Check if this segment has a positions file
555    pub fn has_positions_file(&self) -> bool {
556        self.positions_handle.is_some()
557    }
558
    /// Batch cosine (or, for unit-normalized vectors, dot-product) scoring on raw quantized bytes.
560    ///
561    /// Dispatches to the appropriate SIMD scorer based on quantization type.
562    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
563    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
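    ///
    /// For example, with `dim = 4` and `F32` quantization, `raw` must hold
    /// `scores.len() * 4 * size_of::<f32>()` bytes of back-to-back vectors.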
564    fn score_quantized_batch(
565        query: &[f32],
566        raw: &[u8],
567        quant: crate::dsl::DenseVectorQuantization,
568        dim: usize,
569        scores: &mut [f32],
570        unit_norm: bool,
571    ) {
572        use crate::dsl::DenseVectorQuantization;
573        use crate::structures::simd;
574        match (quant, unit_norm) {
575            (DenseVectorQuantization::F32, false) => {
576                let num_floats = scores.len() * dim;
577                debug_assert!(
578                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
579                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
580                );
581                let vectors: &[f32] =
582                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
583                simd::batch_cosine_scores(query, vectors, dim, scores);
584            }
585            (DenseVectorQuantization::F32, true) => {
586                let num_floats = scores.len() * dim;
587                debug_assert!(
588                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
589                    "f32 vector data not 4-byte aligned"
590                );
591                let vectors: &[f32] =
592                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
593                simd::batch_dot_scores(query, vectors, dim, scores);
594            }
595            (DenseVectorQuantization::F16, false) => {
596                simd::batch_cosine_scores_f16(query, raw, dim, scores);
597            }
598            (DenseVectorQuantization::F16, true) => {
599                simd::batch_dot_scores_f16(query, raw, dim, scores);
600            }
601            (DenseVectorQuantization::UInt8, false) => {
602                simd::batch_cosine_scores_u8(query, raw, dim, scores);
603            }
604            (DenseVectorQuantization::UInt8, true) => {
605                simd::batch_dot_scores_u8(query, raw, dim, scores);
606            }
607        }
608    }
609
    /// Search dense vectors using the field's ANN index (RaBitQ, IVF, or ScaNN), falling back to brute force over flat vectors
611    ///
612    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
613    /// Doc IDs are segment-local.
614    /// For multi-valued documents, scores are combined using the specified combiner.
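    ///
    /// Illustrative sketch (parameter values are assumptions):
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_dense_vector(Field(2), &query_embedding, 10, 32, 2.0, combiner)
    ///     .await?;
    /// for hit in &hits {
    ///     println!("local doc score = {}", hit.score);
    /// }
    /// ```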
615    pub async fn search_dense_vector(
616        &self,
617        field: Field,
618        query: &[f32],
619        k: usize,
620        nprobe: usize,
621        rerank_factor: f32,
622        combiner: crate::query::MultiValueCombiner,
623    ) -> Result<Vec<VectorSearchResult>> {
624        let ann_index = self.vector_indexes.get(&field.0);
625        let lazy_flat = self.flat_vectors.get(&field.0);
626
627        // No vectors at all for this field
628        if ann_index.is_none() && lazy_flat.is_none() {
629            return Ok(Vec::new());
630        }
631
632        // Check if vectors are pre-normalized (skip per-vector norm in scoring)
633        let unit_norm = self
634            .schema
635            .get_field_entry(field)
636            .and_then(|e| e.dense_vector_config.as_ref())
637            .is_some_and(|c| c.unit_norm);
638
639        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
640        const BRUTE_FORCE_BATCH: usize = 4096;
641
642        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
643
644        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
645        let t0 = std::time::Instant::now();
646        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
647            // ANN search (RaBitQ, IVF, ScaNN)
648            match index {
649                VectorIndex::RaBitQ(lazy) => {
650                    let rabitq = lazy.get().ok_or_else(|| {
651                        Error::Schema("RaBitQ index deserialization failed".to_string())
652                    })?;
653                    rabitq
654                        .search(query, fetch_k)
655                        .into_iter()
656                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
657                        .collect()
658                }
659                VectorIndex::IVF(lazy) => {
660                    let (index, codebook) = lazy.get().ok_or_else(|| {
661                        Error::Schema("IVF index deserialization failed".to_string())
662                    })?;
663                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
664                        Error::Schema(format!(
665                            "IVF index requires coarse centroids for field {}",
666                            field.0
667                        ))
668                    })?;
669                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
670                    index
671                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
672                        .into_iter()
673                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
674                        .collect()
675                }
676                VectorIndex::ScaNN(lazy) => {
677                    let (index, codebook) = lazy.get().ok_or_else(|| {
678                        Error::Schema("ScaNN index deserialization failed".to_string())
679                    })?;
680                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
681                        Error::Schema(format!(
682                            "ScaNN index requires coarse centroids for field {}",
683                            field.0
684                        ))
685                    })?;
686                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
687                    index
688                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
689                        .into_iter()
690                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
691                        .collect()
692                }
693            }
694        } else if let Some(lazy_flat) = lazy_flat {
695            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
696            // Uses a top-k heap to avoid collecting and sorting all N candidates.
697            log::debug!(
698                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
699                field.0,
700                lazy_flat.num_vectors,
701                lazy_flat.dim,
702                lazy_flat.quantization
703            );
704            let dim = lazy_flat.dim;
705            let n = lazy_flat.num_vectors;
706            let quant = lazy_flat.quantization;
707            let mut collector = crate::query::ScoreCollector::new(fetch_k);
708            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
709
710            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
711                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
712                let batch_bytes = lazy_flat
713                    .read_vectors_batch(batch_start, batch_count)
714                    .await
715                    .map_err(crate::Error::Io)?;
716                let raw = batch_bytes.as_slice();
717
718                Self::score_quantized_batch(
719                    query,
720                    raw,
721                    quant,
722                    dim,
723                    &mut scores[..batch_count],
724                    unit_norm,
725                );
726
727                for (i, &score) in scores.iter().enumerate().take(batch_count) {
728                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
729                    collector.insert_with_ordinal(doc_id, score, ordinal);
730                }
731            }
732
733            collector
734                .into_sorted_results()
735                .into_iter()
736                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
737                .collect()
738        } else {
739            return Ok(Vec::new());
740        };
741        let l1_elapsed = t0.elapsed();
742        log::debug!(
743            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
744            field.0,
745            results.len(),
746            l1_elapsed.as_secs_f64() * 1000.0
747        );
748
749        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
750        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
751        if ann_index.is_some()
752            && !results.is_empty()
753            && let Some(lazy_flat) = lazy_flat
754        {
755            let t_rerank = std::time::Instant::now();
756            let dim = lazy_flat.dim;
757            let quant = lazy_flat.quantization;
758            let vbs = lazy_flat.vector_byte_size();
759
760            // Resolve flat indexes for each candidate via binary search
761            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
762            for (ri, c) in results.iter().enumerate() {
763                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
764                for (j, &(_, ord)) in entries.iter().enumerate() {
765                    if ord == c.1 {
766                        resolved.push((ri, start + j));
767                        break;
768                    }
769                }
770            }
771
772            let t_resolve = t_rerank.elapsed();
773            if !resolved.is_empty() {
774                // Sort by flat_idx for sequential mmap access (better page locality)
775                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
776
777                // Batch-read raw quantized bytes into contiguous buffer
778                let t_read = std::time::Instant::now();
779                let mut raw_buf = vec![0u8; resolved.len() * vbs];
780                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
781                    let _ = lazy_flat
782                        .read_vector_raw_into(
783                            flat_idx,
784                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
785                        )
786                        .await;
787                }
788
789                let read_elapsed = t_read.elapsed();
790
791                // Native-precision batch SIMD cosine scoring
792                let t_score = std::time::Instant::now();
793                let mut scores = vec![0f32; resolved.len()];
794                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
795                let score_elapsed = t_score.elapsed();
796
797                // Write scores back to results
798                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
799                    results[ri].2 = scores[buf_idx];
800                }
801
802                log::debug!(
803                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
804                    field.0,
805                    resolved.len(),
806                    dim,
807                    quant,
808                    vbs,
809                    t_resolve.as_secs_f64() * 1000.0,
810                    read_elapsed.as_secs_f64() * 1000.0,
811                    score_elapsed.as_secs_f64() * 1000.0,
812                );
813            }
814
815            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
816            results.truncate(fetch_k);
817            log::debug!(
818                "[search_dense] field {}: rerank total={:.1}ms",
819                field.0,
820                t_rerank.elapsed().as_secs_f64() * 1000.0
821            );
822        }
823
824        Ok(combine_ordinal_results(results, combiner, k))
825    }
826
827    /// Check if this segment has dense vectors for the given field
828    pub fn has_dense_vector_index(&self, field: Field) -> bool {
829        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
830    }
831
832    /// Get the dense vector index for a field (if available)
833    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
834        match self.vector_indexes.get(&field.0) {
835            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
836            _ => None,
837        }
838    }
839
840    /// Get the IVF vector index for a field (if available)
841    pub fn get_ivf_vector_index(
842        &self,
843        field: Field,
844    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
845        match self.vector_indexes.get(&field.0) {
846            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
847            _ => None,
848        }
849    }
850
851    /// Get coarse centroids for a field
852    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
853        self.coarse_centroids.get(&field_id)
854    }
855
856    /// Set per-field coarse centroids from index-level trained structures
857    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
858        self.coarse_centroids = centroids;
859    }
860
861    /// Get the ScaNN vector index for a field (if available)
862    pub fn get_scann_vector_index(
863        &self,
864        field: Field,
865    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
866        match self.vector_indexes.get(&field.0) {
867            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
868            _ => None,
869        }
870    }
871
872    /// Get the vector index type for a field
873    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
874        self.vector_indexes.get(&field.0)
875    }
876
877    /// Get positions for a term (for phrase queries)
878    ///
879    /// Position offsets are now embedded in TermInfo, so we first look up
880    /// the term to get its TermInfo, then use position_info() to get the offset.
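    ///
    /// Illustrative sketch (field id and term are assumptions):
    ///
    /// ```ignore
    /// if let Some(positions) = reader.get_positions(Field(0), b"hello").await? {
    ///     // Feed the position posting list into phrase matching.
    /// }
    /// ```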
881    pub async fn get_positions(
882        &self,
883        field: Field,
884        term: &[u8],
885    ) -> Result<Option<crate::structures::PositionPostingList>> {
886        // Get positions handle
887        let handle = match &self.positions_handle {
888            Some(h) => h,
889            None => return Ok(None),
890        };
891
892        // Build key: field_id + term
893        let mut key = Vec::with_capacity(4 + term.len());
894        key.extend_from_slice(&field.0.to_le_bytes());
895        key.extend_from_slice(term);
896
897        // Look up term in dictionary to get TermInfo with position offset
898        let term_info = match self.term_dict.get(&key).await? {
899            Some(info) => info,
900            None => return Ok(None),
901        };
902
903        // Get position offset from TermInfo
904        let (offset, length) = match term_info.position_info() {
905            Some((o, l)) => (o, l),
906            None => return Ok(None),
907        };
908
909        // Read the position data
910        let slice = handle.slice(offset..offset + length);
911        let data = slice.read_bytes().await?;
912
913        // Deserialize
914        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
915
916        Ok(Some(pos_list))
917    }
918
919    /// Check if positions are available for a field
920    pub fn has_positions(&self, field: Field) -> bool {
921        // Check schema for position mode on this field
922        if let Some(entry) = self.schema.get_field_entry(field) {
923            entry.positions.is_some()
924        } else {
925            false
926        }
927    }
928}
929
930// ── Synchronous search methods (mmap/RAM only) ─────────────────────────────
931#[cfg(feature = "sync")]
932impl SegmentReader {
933    /// Synchronous posting list lookup — requires Inline (mmap/RAM) file handles.
934    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
935        // Build key: field_id + term
936        let mut key = Vec::with_capacity(4 + term.len());
937        key.extend_from_slice(&field.0.to_le_bytes());
938        key.extend_from_slice(term);
939
940        // Look up in term dictionary (sync)
941        let term_info = match self.term_dict.get_sync(&key)? {
942            Some(info) => info,
943            None => return Ok(None),
944        };
945
946        // Check if posting list is inlined
947        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
948            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
949            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
950                posting_list.push(doc_id, tf);
951            }
952            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
953            return Ok(Some(block_list));
954        }
955
956        // External posting list — sync range read
957        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
958            Error::Corruption("TermInfo has neither inline nor external data".to_string())
959        })?;
960
961        let start = posting_offset;
962        let end = start + posting_len;
963
964        if end > self.postings_handle.len() {
965            return Err(Error::Corruption(
966                "Posting offset out of bounds".to_string(),
967            ));
968        }
969
970        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
971        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
972
973        Ok(Some(block_list))
974    }
975
976    /// Synchronous position list lookup — requires Inline (mmap/RAM) file handles.
977    pub fn get_positions_sync(
978        &self,
979        field: Field,
980        term: &[u8],
981    ) -> Result<Option<crate::structures::PositionPostingList>> {
982        let handle = match &self.positions_handle {
983            Some(h) => h,
984            None => return Ok(None),
985        };
986
987        // Build key: field_id + term
988        let mut key = Vec::with_capacity(4 + term.len());
989        key.extend_from_slice(&field.0.to_le_bytes());
990        key.extend_from_slice(term);
991
992        // Look up term in dictionary (sync)
993        let term_info = match self.term_dict.get_sync(&key)? {
994            Some(info) => info,
995            None => return Ok(None),
996        };
997
998        let (offset, length) = match term_info.position_info() {
999            Some((o, l)) => (o, l),
1000            None => return Ok(None),
1001        };
1002
1003        let slice = handle.slice(offset..offset + length);
1004        let data = slice.read_bytes_sync()?;
1005
1006        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
1007        Ok(Some(pos_list))
1008    }
1009
1010    /// Synchronous dense vector search — ANN indexes are already sync,
1011    /// brute-force uses sync mmap reads.
1012    pub fn search_dense_vector_sync(
1013        &self,
1014        field: Field,
1015        query: &[f32],
1016        k: usize,
1017        nprobe: usize,
1018        rerank_factor: f32,
1019        combiner: crate::query::MultiValueCombiner,
1020    ) -> Result<Vec<VectorSearchResult>> {
1021        let ann_index = self.vector_indexes.get(&field.0);
1022        let lazy_flat = self.flat_vectors.get(&field.0);
1023
1024        if ann_index.is_none() && lazy_flat.is_none() {
1025            return Ok(Vec::new());
1026        }
1027
1028        let unit_norm = self
1029            .schema
1030            .get_field_entry(field)
1031            .and_then(|e| e.dense_vector_config.as_ref())
1032            .is_some_and(|c| c.unit_norm);
1033
1034        const BRUTE_FORCE_BATCH: usize = 4096;
1035        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
1036
1037        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
1038            // ANN search (already sync)
1039            match index {
1040                VectorIndex::RaBitQ(lazy) => {
1041                    let rabitq = lazy.get().ok_or_else(|| {
1042                        Error::Schema("RaBitQ index deserialization failed".to_string())
1043                    })?;
1044                    rabitq
1045                        .search(query, fetch_k)
1046                        .into_iter()
1047                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1048                        .collect()
1049                }
1050                VectorIndex::IVF(lazy) => {
1051                    let (index, codebook) = lazy.get().ok_or_else(|| {
1052                        Error::Schema("IVF index deserialization failed".to_string())
1053                    })?;
1054                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1055                        Error::Schema(format!(
1056                            "IVF index requires coarse centroids for field {}",
1057                            field.0
1058                        ))
1059                    })?;
1060                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1061                    index
1062                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1063                        .into_iter()
1064                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1065                        .collect()
1066                }
1067                VectorIndex::ScaNN(lazy) => {
1068                    let (index, codebook) = lazy.get().ok_or_else(|| {
1069                        Error::Schema("ScaNN index deserialization failed".to_string())
1070                    })?;
1071                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1072                        Error::Schema(format!(
1073                            "ScaNN index requires coarse centroids for field {}",
1074                            field.0
1075                        ))
1076                    })?;
1077                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1078                    index
1079                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1080                        .into_iter()
1081                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1082                        .collect()
1083                }
1084            }
1085        } else if let Some(lazy_flat) = lazy_flat {
1086            // Batched brute-force (sync mmap reads)
1087            let dim = lazy_flat.dim;
1088            let n = lazy_flat.num_vectors;
1089            let quant = lazy_flat.quantization;
1090            let mut collector = crate::query::ScoreCollector::new(fetch_k);
1091            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
1092
1093            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
1094                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
1095                let batch_bytes = lazy_flat
1096                    .read_vectors_batch_sync(batch_start, batch_count)
1097                    .map_err(crate::Error::Io)?;
1098                let raw = batch_bytes.as_slice();
1099
1100                Self::score_quantized_batch(
1101                    query,
1102                    raw,
1103                    quant,
1104                    dim,
1105                    &mut scores[..batch_count],
1106                    unit_norm,
1107                );
1108
1109                for (i, &score) in scores.iter().enumerate().take(batch_count) {
1110                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
1111                    collector.insert_with_ordinal(doc_id, score, ordinal);
1112                }
1113            }
1114
1115            collector
1116                .into_sorted_results()
1117                .into_iter()
1118                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
1119                .collect()
1120        } else {
1121            return Ok(Vec::new());
1122        };
1123
1124        // Rerank ANN candidates using raw vectors (sync)
1125        if ann_index.is_some()
1126            && !results.is_empty()
1127            && let Some(lazy_flat) = lazy_flat
1128        {
1129            let dim = lazy_flat.dim;
1130            let quant = lazy_flat.quantization;
1131            let vbs = lazy_flat.vector_byte_size();
1132
1133            let mut resolved: Vec<(usize, usize)> = Vec::new();
1134            for (ri, c) in results.iter().enumerate() {
1135                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
1136                for (j, &(_, ord)) in entries.iter().enumerate() {
1137                    if ord == c.1 {
1138                        resolved.push((ri, start + j));
1139                        break;
1140                    }
1141                }
1142            }
1143
1144            if !resolved.is_empty() {
1145                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
1146                let mut raw_buf = vec![0u8; resolved.len() * vbs];
1147                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
1148                    let _ = lazy_flat.read_vector_raw_into_sync(
1149                        flat_idx,
1150                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
1151                    );
1152                }
1153
1154                let mut scores = vec![0f32; resolved.len()];
1155                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
1156
1157                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
1158                    results[ri].2 = scores[buf_idx];
1159                }
1160            }
1161
1162            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
1163            results.truncate(fetch_k);
1164        }
1165
1166        Ok(combine_ordinal_results(results, combiner, k))
1167    }
1168}