Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
/// Estimated memory footprint of one loaded segment, broken down by component.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Identifier of the segment these numbers describe
    pub segment_id: u128,
    /// How many documents the segment holds
    pub num_docs: u32,
    /// Bytes held by the term dictionary block cache
    pub term_dict_cache_bytes: usize,
    /// Bytes held by the document store block cache
    pub store_cache_bytes: usize,
    /// Bytes held by sparse vector indexes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Bytes held by dense vector indexes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bytes held by bloom filters
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Adds up every per-component byte counter into one estimate for the segment.
    ///
    /// `segment_id` and `num_docs` are identifying data, not sizes, and are not
    /// part of the sum.
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .iter()
        .sum()
    }
}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56/// Async segment reader with lazy loading
57///
58/// - Term dictionary: only index loaded, blocks loaded on-demand
59/// - Postings: loaded on-demand per term via HTTP range requests
60/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
61pub struct AsyncSegmentReader {
62    meta: SegmentMeta,
63    /// Term dictionary with lazy block loading
64    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
65    /// Postings file handle - fetches ranges on demand
66    postings_handle: FileHandle,
67    /// Document store with lazy block loading
68    store: Arc<AsyncStoreReader>,
69    schema: Arc<Schema>,
70    /// Base doc_id offset for this segment
71    doc_id_offset: DocId,
72    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
73    vector_indexes: FxHashMap<u32, VectorIndex>,
74    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
75    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
76    /// Per-field coarse centroids for IVF/ScaNN search
77    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
78    /// Sparse vector indexes per field
79    sparse_indexes: FxHashMap<u32, SparseIndex>,
80    /// Position file handle for phrase queries (lazy loading)
81    positions_handle: Option<FileHandle>,
82}
83
84impl AsyncSegmentReader {
85    /// Open a segment with lazy loading
86    pub async fn open<D: Directory>(
87        dir: &D,
88        segment_id: SegmentId,
89        schema: Arc<Schema>,
90        doc_id_offset: DocId,
91        cache_blocks: usize,
92    ) -> Result<Self> {
93        let files = SegmentFiles::new(segment_id.0);
94
95        // Read metadata (small, always loaded)
96        let meta_slice = dir.open_read(&files.meta).await?;
97        let meta_bytes = meta_slice.read_bytes().await?;
98        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
99        debug_assert_eq!(meta.id, segment_id.0);
100
101        // Open term dictionary with lazy loading (fetches ranges on demand)
102        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
103        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
104
105        // Get postings file handle (lazy - fetches ranges on demand)
106        let postings_handle = dir.open_lazy(&files.postings).await?;
107
108        // Open store with lazy loading
109        let store_handle = dir.open_lazy(&files.store).await?;
110        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
111
112        // Load dense vector indexes from unified .vectors file
113        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
114        let vector_indexes = vectors_data.indexes;
115        let flat_vectors = vectors_data.flat_vectors;
116
117        // Load sparse vector indexes from .sparse file
118        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
119
120        // Open positions file handle (if exists) - offsets are now in TermInfo
121        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
122
123        // Log segment loading stats
124        {
125            let mut parts = vec![format!(
126                "[segment] loaded {:016x}: docs={}",
127                segment_id.0, meta.num_docs
128            )];
129            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
130                parts.push(format!(
131                    "dense: {} ann + {} flat fields",
132                    vector_indexes.len(),
133                    flat_vectors.len()
134                ));
135            }
136            for (field_id, idx) in &sparse_indexes {
137                parts.push(format!(
138                    "sparse field {}: {} dims, ~{:.1} KB",
139                    field_id,
140                    idx.num_dimensions(),
141                    idx.num_dimensions() as f64 * 24.0 / 1024.0
142                ));
143            }
144            log::debug!("{}", parts.join(", "));
145        }
146
147        Ok(Self {
148            meta,
149            term_dict: Arc::new(term_dict),
150            postings_handle,
151            store: Arc::new(store),
152            schema,
153            doc_id_offset,
154            vector_indexes,
155            flat_vectors,
156            coarse_centroids: FxHashMap::default(),
157            sparse_indexes,
158            positions_handle,
159        })
160    }
161
162    pub fn meta(&self) -> &SegmentMeta {
163        &self.meta
164    }
165
166    pub fn num_docs(&self) -> u32 {
167        self.meta.num_docs
168    }
169
170    /// Get average field length for BM25F scoring
171    pub fn avg_field_len(&self, field: Field) -> f32 {
172        self.meta.avg_field_len(field)
173    }
174
175    pub fn doc_id_offset(&self) -> DocId {
176        self.doc_id_offset
177    }
178
179    /// Set the doc_id_offset (used for parallel segment loading)
180    pub fn set_doc_id_offset(&mut self, offset: DocId) {
181        self.doc_id_offset = offset;
182    }
183
184    pub fn schema(&self) -> &Schema {
185        &self.schema
186    }
187
188    /// Get sparse indexes for all fields
189    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
190        &self.sparse_indexes
191    }
192
193    /// Get vector indexes for all fields
194    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
195        &self.vector_indexes
196    }
197
198    /// Get lazy flat vectors for all fields (for reranking and merge)
199    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
200        &self.flat_vectors
201    }
202
203    /// Get term dictionary stats for debugging
204    pub fn term_dict_stats(&self) -> SSTableStats {
205        self.term_dict.stats()
206    }
207
208    /// Estimate memory usage of this segment reader
209    pub fn memory_stats(&self) -> SegmentMemoryStats {
210        let term_dict_stats = self.term_dict.stats();
211
212        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
213        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
214
215        // Store cache: similar estimate
216        let store_cache_bytes = self.store.cached_blocks() * 4096;
217
218        // Sparse index: SoA dim table + OwnedBytes skip section
219        let sparse_index_bytes: usize = self
220            .sparse_indexes
221            .values()
222            .map(|s| s.estimated_memory_bytes())
223            .sum();
224
225        // Dense index: vectors are memory-mapped, but we track index structures
226        // RaBitQ/IVF indexes have cluster assignments in memory
227        let dense_index_bytes: usize = self
228            .vector_indexes
229            .values()
230            .map(|v| v.estimated_memory_bytes())
231            .sum();
232
233        SegmentMemoryStats {
234            segment_id: self.meta.id,
235            num_docs: self.meta.num_docs,
236            term_dict_cache_bytes,
237            store_cache_bytes,
238            sparse_index_bytes,
239            dense_index_bytes,
240            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
241        }
242    }
243
244    /// Get posting list for a term (async - loads on demand)
245    ///
246    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
247    /// and no additional I/O is needed. For larger lists, reads from .post file.
248    pub async fn get_postings(
249        &self,
250        field: Field,
251        term: &[u8],
252    ) -> Result<Option<BlockPostingList>> {
253        log::debug!(
254            "SegmentReader::get_postings field={} term_len={}",
255            field.0,
256            term.len()
257        );
258
259        // Build key: field_id + term
260        let mut key = Vec::with_capacity(4 + term.len());
261        key.extend_from_slice(&field.0.to_le_bytes());
262        key.extend_from_slice(term);
263
264        // Look up in term dictionary
265        let term_info = match self.term_dict.get(&key).await? {
266            Some(info) => {
267                log::debug!("SegmentReader::get_postings found term_info");
268                info
269            }
270            None => {
271                log::debug!("SegmentReader::get_postings term not found");
272                return Ok(None);
273            }
274        };
275
276        // Check if posting list is inlined
277        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
278            // Build BlockPostingList from inline data (no I/O needed!)
279            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
280            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
281                posting_list.push(doc_id, tf);
282            }
283            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
284            return Ok(Some(block_list));
285        }
286
287        // External posting list - read from postings file handle (lazy - HTTP range request)
288        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
289            Error::Corruption("TermInfo has neither inline nor external data".to_string())
290        })?;
291
292        let start = posting_offset;
293        let end = start + posting_len;
294
295        if end > self.postings_handle.len() {
296            return Err(Error::Corruption(
297                "Posting offset out of bounds".to_string(),
298            ));
299        }
300
301        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
302        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
303
304        Ok(Some(block_list))
305    }
306
307    /// Get document by local doc_id (async - loads on demand).
308    ///
309    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
310    /// Uses binary search on sorted doc_ids for O(log N) lookup.
311    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
312        self.doc_with_fields(local_doc_id, None).await
313    }
314
315    /// Get document by local doc_id, hydrating only the specified fields.
316    ///
317    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
318    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
319    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
320    pub async fn doc_with_fields(
321        &self,
322        local_doc_id: DocId,
323        fields: Option<&rustc_hash::FxHashSet<u32>>,
324    ) -> Result<Option<Document>> {
325        let mut doc = match fields {
326            Some(set) => {
327                let field_ids: Vec<u32> = set.iter().copied().collect();
328                match self
329                    .store
330                    .get_fields(local_doc_id, &self.schema, &field_ids)
331                    .await
332                {
333                    Ok(Some(d)) => d,
334                    Ok(None) => return Ok(None),
335                    Err(e) => return Err(Error::from(e)),
336                }
337            }
338            None => match self.store.get(local_doc_id, &self.schema).await {
339                Ok(Some(d)) => d,
340                Ok(None) => return Ok(None),
341                Err(e) => return Err(Error::from(e)),
342            },
343        };
344
345        // Hydrate dense vector fields from flat vector data
346        for (&field_id, lazy_flat) in &self.flat_vectors {
347            // Skip vector fields not in the requested set
348            if let Some(set) = fields
349                && !set.contains(&field_id)
350            {
351                continue;
352            }
353
354            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
355            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
356                let flat_idx = start + j;
357                match lazy_flat.get_vector(flat_idx).await {
358                    Ok(vec) => {
359                        doc.add_dense_vector(Field(field_id), vec);
360                    }
361                    Err(e) => {
362                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
363                    }
364                }
365            }
366        }
367
368        Ok(Some(doc))
369    }
370
371    /// Prefetch term dictionary blocks for a key range
372    pub async fn prefetch_terms(
373        &self,
374        field: Field,
375        start_term: &[u8],
376        end_term: &[u8],
377    ) -> Result<()> {
378        let mut start_key = Vec::with_capacity(4 + start_term.len());
379        start_key.extend_from_slice(&field.0.to_le_bytes());
380        start_key.extend_from_slice(start_term);
381
382        let mut end_key = Vec::with_capacity(4 + end_term.len());
383        end_key.extend_from_slice(&field.0.to_le_bytes());
384        end_key.extend_from_slice(end_term);
385
386        self.term_dict.prefetch_range(&start_key, &end_key).await?;
387        Ok(())
388    }
389
390    /// Check if store uses dictionary compression (incompatible with raw merging)
391    pub fn store_has_dict(&self) -> bool {
392        self.store.has_dict()
393    }
394
395    /// Get store reference for merge operations
396    pub fn store(&self) -> &super::store::AsyncStoreReader {
397        &self.store
398    }
399
400    /// Get raw store blocks for optimized merging
401    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
402        self.store.raw_blocks()
403    }
404
405    /// Get store data slice for raw block access
406    pub fn store_data_slice(&self) -> &FileHandle {
407        self.store.data_slice()
408    }
409
410    /// Get all terms from this segment (for merge)
411    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
412        self.term_dict.all_entries().await.map_err(Error::from)
413    }
414
415    /// Get all terms with parsed field and term string (for statistics aggregation)
416    ///
417    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
418    /// Skips terms that aren't valid UTF-8.
419    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
420        let entries = self.term_dict.all_entries().await?;
421        let mut result = Vec::with_capacity(entries.len());
422
423        for (key, term_info) in entries {
424            // Key format: field_id (4 bytes little-endian) + term bytes
425            if key.len() > 4 {
426                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
427                let term_bytes = &key[4..];
428                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
429                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
430                }
431            }
432        }
433
434        Ok(result)
435    }
436
437    /// Get streaming iterator over term dictionary (for memory-efficient merge)
438    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
439        self.term_dict.iter()
440    }
441
442    /// Prefetch all term dictionary blocks in a single bulk I/O call.
443    ///
444    /// Call before merge iteration to eliminate per-block cache misses.
445    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
446        self.term_dict
447            .prefetch_all_data_bulk()
448            .await
449            .map_err(crate::Error::from)
450    }
451
452    /// Read raw posting bytes at offset
453    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
454        let start = offset;
455        let end = start + len;
456        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
457        Ok(bytes.to_vec())
458    }
459
460    /// Read raw position bytes at offset (for merge)
461    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
462        let handle = match &self.positions_handle {
463            Some(h) => h,
464            None => return Ok(None),
465        };
466        let start = offset;
467        let end = start + len;
468        let bytes = handle.read_bytes_range(start..end).await?;
469        Ok(Some(bytes.to_vec()))
470    }
471
472    /// Check if this segment has a positions file
473    pub fn has_positions_file(&self) -> bool {
474        self.positions_handle.is_some()
475    }
476
477    /// Batch cosine scoring on raw quantized bytes.
478    ///
479    /// Dispatches to the appropriate SIMD scorer based on quantization type.
480    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
481    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
482    fn score_quantized_batch(
483        query: &[f32],
484        raw: &[u8],
485        quant: crate::dsl::DenseVectorQuantization,
486        dim: usize,
487        scores: &mut [f32],
488        unit_norm: bool,
489    ) {
490        use crate::dsl::DenseVectorQuantization;
491        use crate::structures::simd;
492        match (quant, unit_norm) {
493            (DenseVectorQuantization::F32, false) => {
494                let num_floats = scores.len() * dim;
495                debug_assert!(
496                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
497                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
498                );
499                let vectors: &[f32] =
500                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
501                simd::batch_cosine_scores(query, vectors, dim, scores);
502            }
503            (DenseVectorQuantization::F32, true) => {
504                let num_floats = scores.len() * dim;
505                debug_assert!(
506                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
507                    "f32 vector data not 4-byte aligned"
508                );
509                let vectors: &[f32] =
510                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
511                simd::batch_dot_scores(query, vectors, dim, scores);
512            }
513            (DenseVectorQuantization::F16, false) => {
514                simd::batch_cosine_scores_f16(query, raw, dim, scores);
515            }
516            (DenseVectorQuantization::F16, true) => {
517                simd::batch_dot_scores_f16(query, raw, dim, scores);
518            }
519            (DenseVectorQuantization::UInt8, false) => {
520                simd::batch_cosine_scores_u8(query, raw, dim, scores);
521            }
522            (DenseVectorQuantization::UInt8, true) => {
523                simd::batch_dot_scores_u8(query, raw, dim, scores);
524            }
525        }
526    }
527
528    /// Search dense vectors using RaBitQ
529    ///
530    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
531    /// The doc_ids are adjusted by doc_id_offset for this segment.
532    /// For multi-valued documents, scores are combined using the specified combiner.
533    pub async fn search_dense_vector(
534        &self,
535        field: Field,
536        query: &[f32],
537        k: usize,
538        nprobe: usize,
539        rerank_factor: usize,
540        combiner: crate::query::MultiValueCombiner,
541    ) -> Result<Vec<VectorSearchResult>> {
542        let ann_index = self.vector_indexes.get(&field.0);
543        let lazy_flat = self.flat_vectors.get(&field.0);
544
545        // No vectors at all for this field
546        if ann_index.is_none() && lazy_flat.is_none() {
547            return Ok(Vec::new());
548        }
549
550        // Check if vectors are pre-normalized (skip per-vector norm in scoring)
551        let unit_norm = self
552            .schema
553            .get_field_entry(field)
554            .and_then(|e| e.dense_vector_config.as_ref())
555            .is_some_and(|c| c.unit_norm);
556
557        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
558        const BRUTE_FORCE_BATCH: usize = 4096;
559
560        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
561        let t0 = std::time::Instant::now();
562        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
563            // ANN search (RaBitQ, IVF, ScaNN)
564            match index {
565                VectorIndex::RaBitQ(lazy) => {
566                    let rabitq = lazy.get().ok_or_else(|| {
567                        Error::Schema("RaBitQ index deserialization failed".to_string())
568                    })?;
569                    let fetch_k = k * rerank_factor.max(1);
570                    rabitq
571                        .search(query, fetch_k, rerank_factor)
572                        .into_iter()
573                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
574                        .collect()
575                }
576                VectorIndex::IVF(lazy) => {
577                    let (index, codebook) = lazy.get().ok_or_else(|| {
578                        Error::Schema("IVF index deserialization failed".to_string())
579                    })?;
580                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
581                        Error::Schema(format!(
582                            "IVF index requires coarse centroids for field {}",
583                            field.0
584                        ))
585                    })?;
586                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
587                    let fetch_k = k * rerank_factor.max(1);
588                    index
589                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
590                        .into_iter()
591                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
592                        .collect()
593                }
594                VectorIndex::ScaNN(lazy) => {
595                    let (index, codebook) = lazy.get().ok_or_else(|| {
596                        Error::Schema("ScaNN index deserialization failed".to_string())
597                    })?;
598                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
599                        Error::Schema(format!(
600                            "ScaNN index requires coarse centroids for field {}",
601                            field.0
602                        ))
603                    })?;
604                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
605                    let fetch_k = k * rerank_factor.max(1);
606                    index
607                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
608                        .into_iter()
609                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
610                        .collect()
611                }
612            }
613        } else if let Some(lazy_flat) = lazy_flat {
614            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
615            // Uses a top-k heap to avoid collecting and sorting all N candidates.
616            log::debug!(
617                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
618                field.0,
619                lazy_flat.num_vectors,
620                lazy_flat.dim,
621                lazy_flat.quantization
622            );
623            let dim = lazy_flat.dim;
624            let n = lazy_flat.num_vectors;
625            let quant = lazy_flat.quantization;
626            let fetch_k = k * rerank_factor.max(1);
627            let mut collector = crate::query::ScoreCollector::new(fetch_k);
628            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
629
630            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
631                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
632                let batch_bytes = lazy_flat
633                    .read_vectors_batch(batch_start, batch_count)
634                    .await
635                    .map_err(crate::Error::Io)?;
636                let raw = batch_bytes.as_slice();
637
638                Self::score_quantized_batch(
639                    query,
640                    raw,
641                    quant,
642                    dim,
643                    &mut scores[..batch_count],
644                    unit_norm,
645                );
646
647                for (i, &score) in scores.iter().enumerate().take(batch_count) {
648                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
649                    collector.insert_with_ordinal(doc_id, score, ordinal);
650                }
651            }
652
653            collector
654                .into_sorted_results()
655                .into_iter()
656                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
657                .collect()
658        } else {
659            return Ok(Vec::new());
660        };
661        let l1_elapsed = t0.elapsed();
662        log::debug!(
663            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
664            field.0,
665            results.len(),
666            l1_elapsed.as_secs_f64() * 1000.0
667        );
668
669        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
670        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
671        if ann_index.is_some()
672            && !results.is_empty()
673            && let Some(lazy_flat) = lazy_flat
674        {
675            let t_rerank = std::time::Instant::now();
676            let dim = lazy_flat.dim;
677            let quant = lazy_flat.quantization;
678            let vbs = lazy_flat.vector_byte_size();
679
680            // Resolve flat indexes for each candidate via binary search
681            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
682            for (ri, c) in results.iter().enumerate() {
683                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
684                for (j, &(_, ord)) in entries.iter().enumerate() {
685                    if ord == c.1 {
686                        resolved.push((ri, start + j));
687                        break;
688                    }
689                }
690            }
691
692            let t_resolve = t_rerank.elapsed();
693            if !resolved.is_empty() {
694                // Sort by flat_idx for sequential mmap access (better page locality)
695                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
696
697                // Batch-read raw quantized bytes into contiguous buffer
698                let t_read = std::time::Instant::now();
699                let mut raw_buf = vec![0u8; resolved.len() * vbs];
700                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
701                    let _ = lazy_flat
702                        .read_vector_raw_into(
703                            flat_idx,
704                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
705                        )
706                        .await;
707                }
708
709                let read_elapsed = t_read.elapsed();
710
711                // Native-precision batch SIMD cosine scoring
712                let t_score = std::time::Instant::now();
713                let mut scores = vec![0f32; resolved.len()];
714                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
715                let score_elapsed = t_score.elapsed();
716
717                // Write scores back to results
718                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
719                    results[ri].2 = scores[buf_idx];
720                }
721
722                log::debug!(
723                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
724                    field.0,
725                    resolved.len(),
726                    dim,
727                    quant,
728                    vbs,
729                    t_resolve.as_secs_f64() * 1000.0,
730                    read_elapsed.as_secs_f64() * 1000.0,
731                    score_elapsed.as_secs_f64() * 1000.0,
732                );
733            }
734
735            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
736            results.truncate(k * rerank_factor.max(1));
737            log::debug!(
738                "[search_dense] field {}: rerank total={:.1}ms",
739                field.0,
740                t_rerank.elapsed().as_secs_f64() * 1000.0
741            );
742        }
743
744        // Track ordinals with individual scores for each doc_id
745        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
746        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
747            rustc_hash::FxHashMap::default();
748        for (doc_id, ordinal, score) in results {
749            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
750            ordinals.push((ordinal as u32, score));
751        }
752
753        // Combine scores and build results with ordinal tracking
754        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
755            .into_iter()
756            .map(|(doc_id, ordinals)| {
757                let combined_score = combiner.combine(&ordinals);
758                VectorSearchResult::new(doc_id, combined_score, ordinals)
759            })
760            .collect();
761
762        // Sort by score descending and take top k
763        final_results.sort_by(|a, b| {
764            b.score
765                .partial_cmp(&a.score)
766                .unwrap_or(std::cmp::Ordering::Equal)
767        });
768        final_results.truncate(k);
769
770        Ok(final_results)
771    }
772
773    /// Check if this segment has dense vectors for the given field
774    pub fn has_dense_vector_index(&self, field: Field) -> bool {
775        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
776    }
777
778    /// Get the dense vector index for a field (if available)
779    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
780        match self.vector_indexes.get(&field.0) {
781            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
782            _ => None,
783        }
784    }
785
786    /// Get the IVF vector index for a field (if available)
787    pub fn get_ivf_vector_index(
788        &self,
789        field: Field,
790    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
791        match self.vector_indexes.get(&field.0) {
792            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
793            _ => None,
794        }
795    }
796
797    /// Get coarse centroids for a field
798    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
799        self.coarse_centroids.get(&field_id)
800    }
801
802    /// Set per-field coarse centroids from index-level trained structures
803    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
804        self.coarse_centroids = centroids;
805    }
806
807    /// Get the ScaNN vector index for a field (if available)
808    pub fn get_scann_vector_index(
809        &self,
810        field: Field,
811    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
812        match self.vector_indexes.get(&field.0) {
813            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
814            _ => None,
815        }
816    }
817
818    /// Get the vector index type for a field
819    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
820        self.vector_indexes.get(&field.0)
821    }
822
823    /// Search for similar sparse vectors using dedicated sparse posting lists
824    ///
825    /// Uses `BmpExecutor` or `SparseMaxScoreExecutor` for efficient top-k retrieval.
826    /// Optimizations:
827    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
828    /// 2. **Block-max pruning**: Skips blocks where max contribution < threshold
829    /// 3. **Top-K heap**: Efficient score collection
830    ///
831    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
832    pub async fn search_sparse_vector(
833        &self,
834        field: Field,
835        vector: &[(u32, f32)],
836        limit: usize,
837        combiner: crate::query::MultiValueCombiner,
838        heap_factor: f32,
839    ) -> Result<Vec<VectorSearchResult>> {
840        use crate::query::BmpExecutor;
841
842        let query_tokens = vector.len();
843
844        // Get sparse index for this field
845        let sparse_index = match self.sparse_indexes.get(&field.0) {
846            Some(idx) => idx,
847            None => {
848                log::debug!(
849                    "Sparse vector search: no index for field {}, returning empty",
850                    field.0
851                );
852                return Ok(Vec::new());
853            }
854        };
855
856        let index_dimensions = sparse_index.num_dimensions();
857
858        // Filter query terms to only those present in the index
859        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
860        let mut missing_count = 0usize;
861
862        for &(dim_id, query_weight) in vector {
863            if sparse_index.has_dimension(dim_id) {
864                matched_terms.push((dim_id, query_weight));
865            } else {
866                missing_count += 1;
867            }
868        }
869
870        log::debug!(
871            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
872            query_tokens,
873            matched_terms.len(),
874            missing_count,
875            index_dimensions
876        );
877
878        if matched_terms.is_empty() {
879            log::debug!("Sparse vector search: no matching tokens, returning empty");
880            return Ok(Vec::new());
881        }
882
883        // Select executor based on number of query terms:
884        // - 12+ terms: BMP (block-at-a-time, lazy block loading, best for SPLADE)
885        // - 1-11 terms: SparseMaxScoreExecutor (cursor-based traversal + lazy block loading)
886        let num_terms = matched_terms.len();
887        let over_fetch = limit * 2; // Over-fetch for multi-value combining
888        let raw_results = if num_terms > 12 {
889            // BMP: lazy block loading — only skip entries in memory, blocks loaded on-demand
890            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
891                .execute()
892                .await?
893        } else {
894            // Lazy BlockMaxScore: skip entries drive navigation, blocks loaded on-demand.
895            // No upfront get_posting() — blocks only loaded when cursor actually visits them.
896            crate::query::SparseMaxScoreExecutor::new(
897                sparse_index,
898                matched_terms,
899                over_fetch,
900                heap_factor,
901            )
902            .execute()
903            .await?
904        };
905
906        log::trace!(
907            "Sparse search returned {} raw results for segment (doc_id_offset={})",
908            raw_results.len(),
909            self.doc_id_offset
910        );
911        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
912            for r in raw_results.iter().take(5) {
913                log::trace!(
914                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
915                    r.doc_id,
916                    r.doc_id + self.doc_id_offset,
917                    r.score,
918                    r.ordinal
919                );
920            }
921        }
922
923        // Track ordinals with individual scores for each doc_id
924        // Now using real ordinals from the posting lists
925        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
926            rustc_hash::FxHashMap::default();
927        for r in raw_results {
928            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
929            ordinals.push((r.ordinal as u32, r.score));
930        }
931
932        // Combine scores and build results with ordinal tracking
933        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
934        let mut results: Vec<VectorSearchResult> = doc_ordinals
935            .into_iter()
936            .map(|(doc_id, ordinals)| {
937                let combined_score = combiner.combine(&ordinals);
938                VectorSearchResult::new(doc_id, combined_score, ordinals)
939            })
940            .collect();
941
942        // Sort by score descending and take top limit
943        results.sort_by(|a, b| {
944            b.score
945                .partial_cmp(&a.score)
946                .unwrap_or(std::cmp::Ordering::Equal)
947        });
948        results.truncate(limit);
949
950        Ok(results)
951    }
952
953    /// Get positions for a term (for phrase queries)
954    ///
955    /// Position offsets are now embedded in TermInfo, so we first look up
956    /// the term to get its TermInfo, then use position_info() to get the offset.
957    pub async fn get_positions(
958        &self,
959        field: Field,
960        term: &[u8],
961    ) -> Result<Option<crate::structures::PositionPostingList>> {
962        // Get positions handle
963        let handle = match &self.positions_handle {
964            Some(h) => h,
965            None => return Ok(None),
966        };
967
968        // Build key: field_id + term
969        let mut key = Vec::with_capacity(4 + term.len());
970        key.extend_from_slice(&field.0.to_le_bytes());
971        key.extend_from_slice(term);
972
973        // Look up term in dictionary to get TermInfo with position offset
974        let term_info = match self.term_dict.get(&key).await? {
975            Some(info) => info,
976            None => return Ok(None),
977        };
978
979        // Get position offset from TermInfo
980        let (offset, length) = match term_info.position_info() {
981            Some((o, l)) => (o, l),
982            None => return Ok(None),
983        };
984
985        // Read the position data
986        let slice = handle.slice(offset..offset + length);
987        let data = slice.read_bytes().await?;
988
989        // Deserialize
990        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
991
992        Ok(Some(pos_list))
993    }
994
995    /// Check if positions are available for a field
996    pub fn has_positions(&self, field: Field) -> bool {
997        // Check schema for position mode on this field
998        if let Some(entry) = self.schema.get_field_entry(field) {
999            entry.positions.is_some()
1000        } else {
1001            false
1002        }
1003    }
1004}
1005
1006/// Convenience alias: `SegmentReader` names the same type as [`AsyncSegmentReader`].
1007pub type SegmentReader = AsyncSegmentReader;