Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
/// Estimated memory footprint of one segment, split by component.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Identifier of the segment these figures describe
    pub segment_id: u128,
    /// Document count of the segment
    pub num_docs: u32,
    /// Bytes attributed to the term dictionary block cache
    pub term_dict_cache_bytes: usize,
    /// Bytes attributed to the document store block cache
    pub store_cache_bytes: usize,
    /// Bytes attributed to sparse vector indexes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Bytes attributed to dense vector indexes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bytes attributed to bloom filters
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Sum of all per-component byte estimates.
    ///
    /// `segment_id` and `num_docs` are descriptive only and do not contribute.
    pub fn total_bytes(&self) -> usize {
        let components = [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ];
        components.iter().sum()
    }
}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56/// Combine per-ordinal (doc_id, ordinal, score) triples into VectorSearchResults,
57/// applying the multi-value combiner, sorting by score desc, and truncating to `limit`.
58fn combine_ordinal_results(
59    raw: impl IntoIterator<Item = (u32, u16, f32)>,
60    combiner: crate::query::MultiValueCombiner,
61    limit: usize,
62) -> Vec<VectorSearchResult> {
63    let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
64        rustc_hash::FxHashMap::default();
65    for (doc_id, ordinal, score) in raw {
66        doc_ordinals
67            .entry(doc_id as DocId)
68            .or_default()
69            .push((ordinal as u32, score));
70    }
71    let mut results: Vec<VectorSearchResult> = doc_ordinals
72        .into_iter()
73        .map(|(doc_id, ordinals)| {
74            let combined_score = combiner.combine(&ordinals);
75            VectorSearchResult::new(doc_id, combined_score, ordinals)
76        })
77        .collect();
78    results.sort_by(|a, b| {
79        b.score
80            .partial_cmp(&a.score)
81            .unwrap_or(std::cmp::Ordering::Equal)
82    });
83    results.truncate(limit);
84    results
85}
86
87/// Async segment reader with lazy loading
88///
89/// - Term dictionary: only index loaded, blocks loaded on-demand
90/// - Postings: loaded on-demand per term via HTTP range requests
91/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
92pub struct SegmentReader {
93    meta: SegmentMeta,
94    /// Term dictionary with lazy block loading
95    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
96    /// Postings file handle - fetches ranges on demand
97    postings_handle: FileHandle,
98    /// Document store with lazy block loading
99    store: Arc<AsyncStoreReader>,
100    schema: Arc<Schema>,
101    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
102    vector_indexes: FxHashMap<u32, VectorIndex>,
103    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
104    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
105    /// Per-field coarse centroids for IVF/ScaNN search
106    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
107    /// Sparse vector indexes per field
108    sparse_indexes: FxHashMap<u32, SparseIndex>,
109    /// Position file handle for phrase queries (lazy loading)
110    positions_handle: Option<FileHandle>,
111    /// Fast-field columnar readers per field_id
112    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
113}
114
115impl SegmentReader {
116    /// Open a segment with lazy loading
117    pub async fn open<D: Directory>(
118        dir: &D,
119        segment_id: SegmentId,
120        schema: Arc<Schema>,
121        cache_blocks: usize,
122    ) -> Result<Self> {
123        let files = SegmentFiles::new(segment_id.0);
124
125        // Read metadata (small, always loaded)
126        let meta_slice = dir.open_read(&files.meta).await?;
127        let meta_bytes = meta_slice.read_bytes().await?;
128        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
129        debug_assert_eq!(meta.id, segment_id.0);
130
131        // Open term dictionary with lazy loading (fetches ranges on demand)
132        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
133        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
134
135        // Get postings file handle (lazy - fetches ranges on demand)
136        let postings_handle = dir.open_lazy(&files.postings).await?;
137
138        // Open store with lazy loading
139        let store_handle = dir.open_lazy(&files.store).await?;
140        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
141
142        // Load dense vector indexes from unified .vectors file
143        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
144        let vector_indexes = vectors_data.indexes;
145        let flat_vectors = vectors_data.flat_vectors;
146
147        // Load sparse vector indexes from .sparse file
148        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
149
150        // Open positions file handle (if exists) - offsets are now in TermInfo
151        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
152
153        // Load fast-field columns from .fast file
154        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;
155
156        // Log segment loading stats
157        {
158            let mut parts = vec![format!(
159                "[segment] loaded {:016x}: docs={}",
160                segment_id.0, meta.num_docs
161            )];
162            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
163                parts.push(format!(
164                    "dense: {} ann + {} flat fields",
165                    vector_indexes.len(),
166                    flat_vectors.len()
167                ));
168            }
169            for (field_id, idx) in &sparse_indexes {
170                parts.push(format!(
171                    "sparse field {}: {} dims, ~{:.1} KB",
172                    field_id,
173                    idx.num_dimensions(),
174                    idx.num_dimensions() as f64 * 24.0 / 1024.0
175                ));
176            }
177            if !fast_fields.is_empty() {
178                parts.push(format!("fast: {} fields", fast_fields.len()));
179            }
180            log::debug!("{}", parts.join(", "));
181        }
182
183        Ok(Self {
184            meta,
185            term_dict: Arc::new(term_dict),
186            postings_handle,
187            store: Arc::new(store),
188            schema,
189            vector_indexes,
190            flat_vectors,
191            coarse_centroids: FxHashMap::default(),
192            sparse_indexes,
193            positions_handle,
194            fast_fields,
195        })
196    }
197
198    pub fn meta(&self) -> &SegmentMeta {
199        &self.meta
200    }
201
202    pub fn num_docs(&self) -> u32 {
203        self.meta.num_docs
204    }
205
206    /// Get average field length for BM25F scoring
207    pub fn avg_field_len(&self, field: Field) -> f32 {
208        self.meta.avg_field_len(field)
209    }
210
211    pub fn schema(&self) -> &Schema {
212        &self.schema
213    }
214
215    /// Get sparse indexes for all fields
216    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
217        &self.sparse_indexes
218    }
219
220    /// Get vector indexes for all fields
221    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
222        &self.vector_indexes
223    }
224
225    /// Get lazy flat vectors for all fields (for reranking and merge)
226    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
227        &self.flat_vectors
228    }
229
230    /// Get a fast-field reader for a specific field.
231    pub fn fast_field(
232        &self,
233        field_id: u32,
234    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
235        self.fast_fields.get(&field_id)
236    }
237
238    /// Get all fast-field readers.
239    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
240        &self.fast_fields
241    }
242
243    /// Get term dictionary stats for debugging
244    pub fn term_dict_stats(&self) -> SSTableStats {
245        self.term_dict.stats()
246    }
247
248    /// Estimate memory usage of this segment reader
249    pub fn memory_stats(&self) -> SegmentMemoryStats {
250        let term_dict_stats = self.term_dict.stats();
251
252        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
253        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
254
255        // Store cache: similar estimate
256        let store_cache_bytes = self.store.cached_blocks() * 4096;
257
258        // Sparse index: SoA dim table + OwnedBytes skip section
259        let sparse_index_bytes: usize = self
260            .sparse_indexes
261            .values()
262            .map(|s| s.estimated_memory_bytes())
263            .sum();
264
265        // Dense index: vectors are memory-mapped, but we track index structures
266        // RaBitQ/IVF indexes have cluster assignments in memory
267        let dense_index_bytes: usize = self
268            .vector_indexes
269            .values()
270            .map(|v| v.estimated_memory_bytes())
271            .sum();
272
273        SegmentMemoryStats {
274            segment_id: self.meta.id,
275            num_docs: self.meta.num_docs,
276            term_dict_cache_bytes,
277            store_cache_bytes,
278            sparse_index_bytes,
279            dense_index_bytes,
280            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
281        }
282    }
283
284    /// Get posting list for a term (async - loads on demand)
285    ///
286    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
287    /// and no additional I/O is needed. For larger lists, reads from .post file.
288    pub async fn get_postings(
289        &self,
290        field: Field,
291        term: &[u8],
292    ) -> Result<Option<BlockPostingList>> {
293        log::debug!(
294            "SegmentReader::get_postings field={} term_len={}",
295            field.0,
296            term.len()
297        );
298
299        // Build key: field_id + term
300        let mut key = Vec::with_capacity(4 + term.len());
301        key.extend_from_slice(&field.0.to_le_bytes());
302        key.extend_from_slice(term);
303
304        // Look up in term dictionary
305        let term_info = match self.term_dict.get(&key).await? {
306            Some(info) => {
307                log::debug!("SegmentReader::get_postings found term_info");
308                info
309            }
310            None => {
311                log::debug!("SegmentReader::get_postings term not found");
312                return Ok(None);
313            }
314        };
315
316        // Check if posting list is inlined
317        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
318            // Build BlockPostingList from inline data (no I/O needed!)
319            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
320            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
321                posting_list.push(doc_id, tf);
322            }
323            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
324            return Ok(Some(block_list));
325        }
326
327        // External posting list - read from postings file handle (lazy - HTTP range request)
328        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
329            Error::Corruption("TermInfo has neither inline nor external data".to_string())
330        })?;
331
332        let start = posting_offset;
333        let end = start + posting_len;
334
335        if end > self.postings_handle.len() {
336            return Err(Error::Corruption(
337                "Posting offset out of bounds".to_string(),
338            ));
339        }
340
341        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
342        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
343
344        Ok(Some(block_list))
345    }
346
347    /// Get document by local doc_id (async - loads on demand).
348    ///
349    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
350    /// Uses binary search on sorted doc_ids for O(log N) lookup.
351    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
352        self.doc_with_fields(local_doc_id, None).await
353    }
354
355    /// Get document by local doc_id, hydrating only the specified fields.
356    ///
357    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
358    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
359    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
360    pub async fn doc_with_fields(
361        &self,
362        local_doc_id: DocId,
363        fields: Option<&rustc_hash::FxHashSet<u32>>,
364    ) -> Result<Option<Document>> {
365        let mut doc = match fields {
366            Some(set) => {
367                let field_ids: Vec<u32> = set.iter().copied().collect();
368                match self
369                    .store
370                    .get_fields(local_doc_id, &self.schema, &field_ids)
371                    .await
372                {
373                    Ok(Some(d)) => d,
374                    Ok(None) => return Ok(None),
375                    Err(e) => return Err(Error::from(e)),
376                }
377            }
378            None => match self.store.get(local_doc_id, &self.schema).await {
379                Ok(Some(d)) => d,
380                Ok(None) => return Ok(None),
381                Err(e) => return Err(Error::from(e)),
382            },
383        };
384
385        // Hydrate dense vector fields from flat vector data
386        for (&field_id, lazy_flat) in &self.flat_vectors {
387            // Skip vector fields not in the requested set
388            if let Some(set) = fields
389                && !set.contains(&field_id)
390            {
391                continue;
392            }
393
394            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
395            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
396                let flat_idx = start + j;
397                match lazy_flat.get_vector(flat_idx).await {
398                    Ok(vec) => {
399                        doc.add_dense_vector(Field(field_id), vec);
400                    }
401                    Err(e) => {
402                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
403                    }
404                }
405            }
406        }
407
408        Ok(Some(doc))
409    }
410
411    /// Prefetch term dictionary blocks for a key range
412    pub async fn prefetch_terms(
413        &self,
414        field: Field,
415        start_term: &[u8],
416        end_term: &[u8],
417    ) -> Result<()> {
418        let mut start_key = Vec::with_capacity(4 + start_term.len());
419        start_key.extend_from_slice(&field.0.to_le_bytes());
420        start_key.extend_from_slice(start_term);
421
422        let mut end_key = Vec::with_capacity(4 + end_term.len());
423        end_key.extend_from_slice(&field.0.to_le_bytes());
424        end_key.extend_from_slice(end_term);
425
426        self.term_dict.prefetch_range(&start_key, &end_key).await?;
427        Ok(())
428    }
429
430    /// Check if store uses dictionary compression (incompatible with raw merging)
431    pub fn store_has_dict(&self) -> bool {
432        self.store.has_dict()
433    }
434
435    /// Get store reference for merge operations
436    pub fn store(&self) -> &super::store::AsyncStoreReader {
437        &self.store
438    }
439
440    /// Get raw store blocks for optimized merging
441    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
442        self.store.raw_blocks()
443    }
444
445    /// Get store data slice for raw block access
446    pub fn store_data_slice(&self) -> &FileHandle {
447        self.store.data_slice()
448    }
449
450    /// Get all terms from this segment (for merge)
451    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
452        self.term_dict.all_entries().await.map_err(Error::from)
453    }
454
455    /// Get all terms with parsed field and term string (for statistics aggregation)
456    ///
457    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
458    /// Skips terms that aren't valid UTF-8.
459    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
460        let entries = self.term_dict.all_entries().await?;
461        let mut result = Vec::with_capacity(entries.len());
462
463        for (key, term_info) in entries {
464            // Key format: field_id (4 bytes little-endian) + term bytes
465            if key.len() > 4 {
466                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
467                let term_bytes = &key[4..];
468                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
469                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
470                }
471            }
472        }
473
474        Ok(result)
475    }
476
477    /// Get streaming iterator over term dictionary (for memory-efficient merge)
478    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
479        self.term_dict.iter()
480    }
481
482    /// Prefetch all term dictionary blocks in a single bulk I/O call.
483    ///
484    /// Call before merge iteration to eliminate per-block cache misses.
485    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
486        self.term_dict
487            .prefetch_all_data_bulk()
488            .await
489            .map_err(crate::Error::from)
490    }
491
492    /// Read raw posting bytes at offset
493    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
494        let start = offset;
495        let end = start + len;
496        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
497        Ok(bytes.to_vec())
498    }
499
500    /// Read raw position bytes at offset (for merge)
501    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
502        let handle = match &self.positions_handle {
503            Some(h) => h,
504            None => return Ok(None),
505        };
506        let start = offset;
507        let end = start + len;
508        let bytes = handle.read_bytes_range(start..end).await?;
509        Ok(Some(bytes.to_vec()))
510    }
511
512    /// Check if this segment has a positions file
513    pub fn has_positions_file(&self) -> bool {
514        self.positions_handle.is_some()
515    }
516
517    /// Batch cosine scoring on raw quantized bytes.
518    ///
519    /// Dispatches to the appropriate SIMD scorer based on quantization type.
520    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
521    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
522    fn score_quantized_batch(
523        query: &[f32],
524        raw: &[u8],
525        quant: crate::dsl::DenseVectorQuantization,
526        dim: usize,
527        scores: &mut [f32],
528        unit_norm: bool,
529    ) {
530        use crate::dsl::DenseVectorQuantization;
531        use crate::structures::simd;
532        match (quant, unit_norm) {
533            (DenseVectorQuantization::F32, false) => {
534                let num_floats = scores.len() * dim;
535                debug_assert!(
536                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
537                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
538                );
539                let vectors: &[f32] =
540                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
541                simd::batch_cosine_scores(query, vectors, dim, scores);
542            }
543            (DenseVectorQuantization::F32, true) => {
544                let num_floats = scores.len() * dim;
545                debug_assert!(
546                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
547                    "f32 vector data not 4-byte aligned"
548                );
549                let vectors: &[f32] =
550                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
551                simd::batch_dot_scores(query, vectors, dim, scores);
552            }
553            (DenseVectorQuantization::F16, false) => {
554                simd::batch_cosine_scores_f16(query, raw, dim, scores);
555            }
556            (DenseVectorQuantization::F16, true) => {
557                simd::batch_dot_scores_f16(query, raw, dim, scores);
558            }
559            (DenseVectorQuantization::UInt8, false) => {
560                simd::batch_cosine_scores_u8(query, raw, dim, scores);
561            }
562            (DenseVectorQuantization::UInt8, true) => {
563                simd::batch_dot_scores_u8(query, raw, dim, scores);
564            }
565        }
566    }
567
568    /// Search dense vectors using RaBitQ
569    ///
570    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
571    /// Doc IDs are segment-local.
572    /// For multi-valued documents, scores are combined using the specified combiner.
573    pub async fn search_dense_vector(
574        &self,
575        field: Field,
576        query: &[f32],
577        k: usize,
578        nprobe: usize,
579        rerank_factor: f32,
580        combiner: crate::query::MultiValueCombiner,
581    ) -> Result<Vec<VectorSearchResult>> {
582        let ann_index = self.vector_indexes.get(&field.0);
583        let lazy_flat = self.flat_vectors.get(&field.0);
584
585        // No vectors at all for this field
586        if ann_index.is_none() && lazy_flat.is_none() {
587            return Ok(Vec::new());
588        }
589
590        // Check if vectors are pre-normalized (skip per-vector norm in scoring)
591        let unit_norm = self
592            .schema
593            .get_field_entry(field)
594            .and_then(|e| e.dense_vector_config.as_ref())
595            .is_some_and(|c| c.unit_norm);
596
597        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
598        const BRUTE_FORCE_BATCH: usize = 4096;
599
600        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
601
602        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
603        let t0 = std::time::Instant::now();
604        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
605            // ANN search (RaBitQ, IVF, ScaNN)
606            match index {
607                VectorIndex::RaBitQ(lazy) => {
608                    let rabitq = lazy.get().ok_or_else(|| {
609                        Error::Schema("RaBitQ index deserialization failed".to_string())
610                    })?;
611                    rabitq
612                        .search(query, fetch_k)
613                        .into_iter()
614                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
615                        .collect()
616                }
617                VectorIndex::IVF(lazy) => {
618                    let (index, codebook) = lazy.get().ok_or_else(|| {
619                        Error::Schema("IVF index deserialization failed".to_string())
620                    })?;
621                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
622                        Error::Schema(format!(
623                            "IVF index requires coarse centroids for field {}",
624                            field.0
625                        ))
626                    })?;
627                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
628                    index
629                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
630                        .into_iter()
631                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
632                        .collect()
633                }
634                VectorIndex::ScaNN(lazy) => {
635                    let (index, codebook) = lazy.get().ok_or_else(|| {
636                        Error::Schema("ScaNN index deserialization failed".to_string())
637                    })?;
638                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
639                        Error::Schema(format!(
640                            "ScaNN index requires coarse centroids for field {}",
641                            field.0
642                        ))
643                    })?;
644                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
645                    index
646                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
647                        .into_iter()
648                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
649                        .collect()
650                }
651            }
652        } else if let Some(lazy_flat) = lazy_flat {
653            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
654            // Uses a top-k heap to avoid collecting and sorting all N candidates.
655            log::debug!(
656                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
657                field.0,
658                lazy_flat.num_vectors,
659                lazy_flat.dim,
660                lazy_flat.quantization
661            );
662            let dim = lazy_flat.dim;
663            let n = lazy_flat.num_vectors;
664            let quant = lazy_flat.quantization;
665            let mut collector = crate::query::ScoreCollector::new(fetch_k);
666            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
667
668            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
669                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
670                let batch_bytes = lazy_flat
671                    .read_vectors_batch(batch_start, batch_count)
672                    .await
673                    .map_err(crate::Error::Io)?;
674                let raw = batch_bytes.as_slice();
675
676                Self::score_quantized_batch(
677                    query,
678                    raw,
679                    quant,
680                    dim,
681                    &mut scores[..batch_count],
682                    unit_norm,
683                );
684
685                for (i, &score) in scores.iter().enumerate().take(batch_count) {
686                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
687                    collector.insert_with_ordinal(doc_id, score, ordinal);
688                }
689            }
690
691            collector
692                .into_sorted_results()
693                .into_iter()
694                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
695                .collect()
696        } else {
697            return Ok(Vec::new());
698        };
699        let l1_elapsed = t0.elapsed();
700        log::debug!(
701            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
702            field.0,
703            results.len(),
704            l1_elapsed.as_secs_f64() * 1000.0
705        );
706
707        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
708        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
709        if ann_index.is_some()
710            && !results.is_empty()
711            && let Some(lazy_flat) = lazy_flat
712        {
713            let t_rerank = std::time::Instant::now();
714            let dim = lazy_flat.dim;
715            let quant = lazy_flat.quantization;
716            let vbs = lazy_flat.vector_byte_size();
717
718            // Resolve flat indexes for each candidate via binary search
719            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
720            for (ri, c) in results.iter().enumerate() {
721                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
722                for (j, &(_, ord)) in entries.iter().enumerate() {
723                    if ord == c.1 {
724                        resolved.push((ri, start + j));
725                        break;
726                    }
727                }
728            }
729
730            let t_resolve = t_rerank.elapsed();
731            if !resolved.is_empty() {
732                // Sort by flat_idx for sequential mmap access (better page locality)
733                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
734
735                // Batch-read raw quantized bytes into contiguous buffer
736                let t_read = std::time::Instant::now();
737                let mut raw_buf = vec![0u8; resolved.len() * vbs];
738                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
739                    let _ = lazy_flat
740                        .read_vector_raw_into(
741                            flat_idx,
742                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
743                        )
744                        .await;
745                }
746
747                let read_elapsed = t_read.elapsed();
748
749                // Native-precision batch SIMD cosine scoring
750                let t_score = std::time::Instant::now();
751                let mut scores = vec![0f32; resolved.len()];
752                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
753                let score_elapsed = t_score.elapsed();
754
755                // Write scores back to results
756                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
757                    results[ri].2 = scores[buf_idx];
758                }
759
760                log::debug!(
761                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
762                    field.0,
763                    resolved.len(),
764                    dim,
765                    quant,
766                    vbs,
767                    t_resolve.as_secs_f64() * 1000.0,
768                    read_elapsed.as_secs_f64() * 1000.0,
769                    score_elapsed.as_secs_f64() * 1000.0,
770                );
771            }
772
773            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
774            results.truncate(fetch_k);
775            log::debug!(
776                "[search_dense] field {}: rerank total={:.1}ms",
777                field.0,
778                t_rerank.elapsed().as_secs_f64() * 1000.0
779            );
780        }
781
782        Ok(combine_ordinal_results(results, combiner, k))
783    }
784
785    /// Check if this segment has dense vectors for the given field
786    pub fn has_dense_vector_index(&self, field: Field) -> bool {
787        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
788    }
789
790    /// Get the dense vector index for a field (if available)
791    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
792        match self.vector_indexes.get(&field.0) {
793            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
794            _ => None,
795        }
796    }
797
798    /// Get the IVF vector index for a field (if available)
799    pub fn get_ivf_vector_index(
800        &self,
801        field: Field,
802    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
803        match self.vector_indexes.get(&field.0) {
804            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
805            _ => None,
806        }
807    }
808
809    /// Get coarse centroids for a field
810    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
811        self.coarse_centroids.get(&field_id)
812    }
813
814    /// Set per-field coarse centroids from index-level trained structures
815    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
816        self.coarse_centroids = centroids;
817    }
818
819    /// Get the ScaNN vector index for a field (if available)
820    pub fn get_scann_vector_index(
821        &self,
822        field: Field,
823    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
824        match self.vector_indexes.get(&field.0) {
825            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
826            _ => None,
827        }
828    }
829
830    /// Get the vector index type for a field
831    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
832        self.vector_indexes.get(&field.0)
833    }
834
835    /// Search for similar sparse vectors using dedicated sparse posting lists
836    ///
837    /// Uses `BmpExecutor` or `MaxScoreExecutor` for efficient top-k retrieval.
838    /// Optimizations:
839    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
840    /// 2. **Block-max pruning**: Skips blocks where max contribution < threshold
841    /// 3. **Top-K heap**: Efficient score collection
842    ///
843    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
844    pub async fn search_sparse_vector(
845        &self,
846        field: Field,
847        vector: &[(u32, f32)],
848        limit: usize,
849        combiner: crate::query::MultiValueCombiner,
850        heap_factor: f32,
851    ) -> Result<Vec<VectorSearchResult>> {
852        use crate::query::BmpExecutor;
853
854        let query_tokens = vector.len();
855
856        // Get sparse index for this field
857        let sparse_index = match self.sparse_indexes.get(&field.0) {
858            Some(idx) => idx,
859            None => {
860                log::debug!(
861                    "Sparse vector search: no index for field {}, returning empty",
862                    field.0
863                );
864                return Ok(Vec::new());
865            }
866        };
867
868        let index_dimensions = sparse_index.num_dimensions();
869
870        // Filter query terms to only those present in the index
871        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
872        let mut missing_count = 0usize;
873
874        for &(dim_id, query_weight) in vector {
875            if sparse_index.has_dimension(dim_id) {
876                matched_terms.push((dim_id, query_weight));
877            } else {
878                missing_count += 1;
879            }
880        }
881
882        log::debug!(
883            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
884            query_tokens,
885            matched_terms.len(),
886            missing_count,
887            index_dimensions
888        );
889
890        if matched_terms.is_empty() {
891            log::debug!("Sparse vector search: no matching tokens, returning empty");
892            return Ok(Vec::new());
893        }
894
895        // Select executor based on number of query terms:
896        // - 12+ terms: BMP (block-at-a-time, lazy block loading, best for SPLADE)
897        // - 1-11 terms: MaxScoreExecutor (cursor-based DAAT + lazy block loading)
898        let num_terms = matched_terms.len();
899        let over_fetch = limit * 2; // Over-fetch for multi-value combining
900        let raw_results = if num_terms > 12 {
901            // BMP: lazy block loading — only skip entries in memory, blocks loaded on-demand
902            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
903                .execute()
904                .await?
905        } else {
906            // Block-Max MaxScore: skip entries drive navigation, blocks loaded on-demand.
907            crate::query::MaxScoreExecutor::sparse(
908                sparse_index,
909                matched_terms,
910                over_fetch,
911                heap_factor,
912            )
913            .execute()
914            .await?
915        };
916
917        log::trace!(
918            "Sparse search returned {} raw results for segment {:016x}",
919            raw_results.len(),
920            self.meta.id
921        );
922        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
923            for r in raw_results.iter().take(5) {
924                log::trace!(
925                    "  Raw result: doc_id={}, score={:.4}, ordinal={}",
926                    r.doc_id,
927                    r.score,
928                    r.ordinal
929                );
930            }
931        }
932
933        Ok(combine_ordinal_results(
934            raw_results
935                .into_iter()
936                .map(|r| (r.doc_id, r.ordinal, r.score)),
937            combiner,
938            limit,
939        ))
940    }
941
942    /// Get positions for a term (for phrase queries)
943    ///
944    /// Position offsets are now embedded in TermInfo, so we first look up
945    /// the term to get its TermInfo, then use position_info() to get the offset.
946    pub async fn get_positions(
947        &self,
948        field: Field,
949        term: &[u8],
950    ) -> Result<Option<crate::structures::PositionPostingList>> {
951        // Get positions handle
952        let handle = match &self.positions_handle {
953            Some(h) => h,
954            None => return Ok(None),
955        };
956
957        // Build key: field_id + term
958        let mut key = Vec::with_capacity(4 + term.len());
959        key.extend_from_slice(&field.0.to_le_bytes());
960        key.extend_from_slice(term);
961
962        // Look up term in dictionary to get TermInfo with position offset
963        let term_info = match self.term_dict.get(&key).await? {
964            Some(info) => info,
965            None => return Ok(None),
966        };
967
968        // Get position offset from TermInfo
969        let (offset, length) = match term_info.position_info() {
970            Some((o, l)) => (o, l),
971            None => return Ok(None),
972        };
973
974        // Read the position data
975        let slice = handle.slice(offset..offset + length);
976        let data = slice.read_bytes().await?;
977
978        // Deserialize
979        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
980
981        Ok(Some(pos_list))
982    }
983
984    /// Check if positions are available for a field
985    pub fn has_positions(&self, field: Field) -> bool {
986        // Check schema for position mode on this field
987        if let Some(entry) = self.schema.get_field_entry(field) {
988            entry.positions.is_some()
989        } else {
990            false
991        }
992    }
993}
994
995// ── Synchronous search methods (mmap/RAM only) ─────────────────────────────
996#[cfg(feature = "sync")]
997impl SegmentReader {
998    /// Synchronous posting list lookup — requires Inline (mmap/RAM) file handles.
999    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
1000        // Build key: field_id + term
1001        let mut key = Vec::with_capacity(4 + term.len());
1002        key.extend_from_slice(&field.0.to_le_bytes());
1003        key.extend_from_slice(term);
1004
1005        // Look up in term dictionary (sync)
1006        let term_info = match self.term_dict.get_sync(&key)? {
1007            Some(info) => info,
1008            None => return Ok(None),
1009        };
1010
1011        // Check if posting list is inlined
1012        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
1013            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
1014            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
1015                posting_list.push(doc_id, tf);
1016            }
1017            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
1018            return Ok(Some(block_list));
1019        }
1020
1021        // External posting list — sync range read
1022        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
1023            Error::Corruption("TermInfo has neither inline nor external data".to_string())
1024        })?;
1025
1026        let start = posting_offset;
1027        let end = start + posting_len;
1028
1029        if end > self.postings_handle.len() {
1030            return Err(Error::Corruption(
1031                "Posting offset out of bounds".to_string(),
1032            ));
1033        }
1034
1035        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
1036        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
1037
1038        Ok(Some(block_list))
1039    }
1040
1041    /// Synchronous position list lookup — requires Inline (mmap/RAM) file handles.
1042    pub fn get_positions_sync(
1043        &self,
1044        field: Field,
1045        term: &[u8],
1046    ) -> Result<Option<crate::structures::PositionPostingList>> {
1047        let handle = match &self.positions_handle {
1048            Some(h) => h,
1049            None => return Ok(None),
1050        };
1051
1052        // Build key: field_id + term
1053        let mut key = Vec::with_capacity(4 + term.len());
1054        key.extend_from_slice(&field.0.to_le_bytes());
1055        key.extend_from_slice(term);
1056
1057        // Look up term in dictionary (sync)
1058        let term_info = match self.term_dict.get_sync(&key)? {
1059            Some(info) => info,
1060            None => return Ok(None),
1061        };
1062
1063        let (offset, length) = match term_info.position_info() {
1064            Some((o, l)) => (o, l),
1065            None => return Ok(None),
1066        };
1067
1068        let slice = handle.slice(offset..offset + length);
1069        let data = slice.read_bytes_sync()?;
1070
1071        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
1072        Ok(Some(pos_list))
1073    }
1074
1075    /// Synchronous dense vector search — ANN indexes are already sync,
1076    /// brute-force uses sync mmap reads.
1077    pub fn search_dense_vector_sync(
1078        &self,
1079        field: Field,
1080        query: &[f32],
1081        k: usize,
1082        nprobe: usize,
1083        rerank_factor: f32,
1084        combiner: crate::query::MultiValueCombiner,
1085    ) -> Result<Vec<VectorSearchResult>> {
1086        let ann_index = self.vector_indexes.get(&field.0);
1087        let lazy_flat = self.flat_vectors.get(&field.0);
1088
1089        if ann_index.is_none() && lazy_flat.is_none() {
1090            return Ok(Vec::new());
1091        }
1092
1093        let unit_norm = self
1094            .schema
1095            .get_field_entry(field)
1096            .and_then(|e| e.dense_vector_config.as_ref())
1097            .is_some_and(|c| c.unit_norm);
1098
1099        const BRUTE_FORCE_BATCH: usize = 4096;
1100        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
1101
1102        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
1103            // ANN search (already sync)
1104            match index {
1105                VectorIndex::RaBitQ(lazy) => {
1106                    let rabitq = lazy.get().ok_or_else(|| {
1107                        Error::Schema("RaBitQ index deserialization failed".to_string())
1108                    })?;
1109                    rabitq
1110                        .search(query, fetch_k)
1111                        .into_iter()
1112                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1113                        .collect()
1114                }
1115                VectorIndex::IVF(lazy) => {
1116                    let (index, codebook) = lazy.get().ok_or_else(|| {
1117                        Error::Schema("IVF index deserialization failed".to_string())
1118                    })?;
1119                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1120                        Error::Schema(format!(
1121                            "IVF index requires coarse centroids for field {}",
1122                            field.0
1123                        ))
1124                    })?;
1125                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1126                    index
1127                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1128                        .into_iter()
1129                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1130                        .collect()
1131                }
1132                VectorIndex::ScaNN(lazy) => {
1133                    let (index, codebook) = lazy.get().ok_or_else(|| {
1134                        Error::Schema("ScaNN index deserialization failed".to_string())
1135                    })?;
1136                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1137                        Error::Schema(format!(
1138                            "ScaNN index requires coarse centroids for field {}",
1139                            field.0
1140                        ))
1141                    })?;
1142                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1143                    index
1144                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1145                        .into_iter()
1146                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1147                        .collect()
1148                }
1149            }
1150        } else if let Some(lazy_flat) = lazy_flat {
1151            // Batched brute-force (sync mmap reads)
1152            let dim = lazy_flat.dim;
1153            let n = lazy_flat.num_vectors;
1154            let quant = lazy_flat.quantization;
1155            let mut collector = crate::query::ScoreCollector::new(fetch_k);
1156            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
1157
1158            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
1159                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
1160                let batch_bytes = lazy_flat
1161                    .read_vectors_batch_sync(batch_start, batch_count)
1162                    .map_err(crate::Error::Io)?;
1163                let raw = batch_bytes.as_slice();
1164
1165                Self::score_quantized_batch(
1166                    query,
1167                    raw,
1168                    quant,
1169                    dim,
1170                    &mut scores[..batch_count],
1171                    unit_norm,
1172                );
1173
1174                for (i, &score) in scores.iter().enumerate().take(batch_count) {
1175                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
1176                    collector.insert_with_ordinal(doc_id, score, ordinal);
1177                }
1178            }
1179
1180            collector
1181                .into_sorted_results()
1182                .into_iter()
1183                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
1184                .collect()
1185        } else {
1186            return Ok(Vec::new());
1187        };
1188
1189        // Rerank ANN candidates using raw vectors (sync)
1190        if ann_index.is_some()
1191            && !results.is_empty()
1192            && let Some(lazy_flat) = lazy_flat
1193        {
1194            let dim = lazy_flat.dim;
1195            let quant = lazy_flat.quantization;
1196            let vbs = lazy_flat.vector_byte_size();
1197
1198            let mut resolved: Vec<(usize, usize)> = Vec::new();
1199            for (ri, c) in results.iter().enumerate() {
1200                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
1201                for (j, &(_, ord)) in entries.iter().enumerate() {
1202                    if ord == c.1 {
1203                        resolved.push((ri, start + j));
1204                        break;
1205                    }
1206                }
1207            }
1208
1209            if !resolved.is_empty() {
1210                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
1211                let mut raw_buf = vec![0u8; resolved.len() * vbs];
1212                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
1213                    let _ = lazy_flat.read_vector_raw_into_sync(
1214                        flat_idx,
1215                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
1216                    );
1217                }
1218
1219                let mut scores = vec![0f32; resolved.len()];
1220                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
1221
1222                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
1223                    results[ri].2 = scores[buf_idx];
1224                }
1225            }
1226
1227            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
1228            results.truncate(fetch_k);
1229        }
1230
1231        Ok(combine_ordinal_results(results, combiner, k))
1232    }
1233
1234    /// Synchronous sparse vector search.
1235    pub fn search_sparse_vector_sync(
1236        &self,
1237        field: Field,
1238        vector: &[(u32, f32)],
1239        limit: usize,
1240        combiner: crate::query::MultiValueCombiner,
1241        heap_factor: f32,
1242    ) -> Result<Vec<VectorSearchResult>> {
1243        use crate::query::MaxScoreExecutor;
1244
1245        let sparse_index = match self.sparse_indexes.get(&field.0) {
1246            Some(idx) => idx,
1247            None => return Ok(Vec::new()),
1248        };
1249
1250        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
1251        for &(dim_id, query_weight) in vector {
1252            if sparse_index.has_dimension(dim_id) {
1253                matched_terms.push((dim_id, query_weight));
1254            }
1255        }
1256
1257        if matched_terms.is_empty() {
1258            return Ok(Vec::new());
1259        }
1260
1261        let num_terms = matched_terms.len();
1262        let over_fetch = limit * 2;
1263        let raw_results = if num_terms > 12 {
1264            crate::query::BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
1265                .execute_sync()?
1266        } else {
1267            MaxScoreExecutor::sparse(sparse_index, matched_terms, over_fetch, heap_factor)
1268                .execute_sync()?
1269        };
1270
1271        Ok(combine_ordinal_results(
1272            raw_results
1273                .into_iter()
1274                .map(|r| (r.doc_id, r.ordinal, r.score)),
1275            combiner,
1276            limit,
1277        ))
1278    }
1279}