Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
7
8/// Memory statistics for a single segment
9#[derive(Debug, Clone, Default)]
10pub struct SegmentMemoryStats {
11    /// Segment ID
12    pub segment_id: u128,
13    /// Number of documents in segment
14    pub num_docs: u32,
15    /// Term dictionary block cache bytes
16    pub term_dict_cache_bytes: usize,
17    /// Document store block cache bytes
18    pub store_cache_bytes: usize,
19    /// Sparse vector index bytes (in-memory posting lists)
20    pub sparse_index_bytes: usize,
21    /// Dense vector index bytes (cluster assignments, quantized codes)
22    pub dense_index_bytes: usize,
23    /// Bloom filter bytes
24    pub bloom_filter_bytes: usize,
25}
26
27impl SegmentMemoryStats {
28    /// Total estimated memory for this segment
29    pub fn total_bytes(&self) -> usize {
30        self.term_dict_cache_bytes
31            + self.store_cache_bytes
32            + self.sparse_index_bytes
33            + self.dense_index_bytes
34            + self.bloom_filter_bytes
35    }
36}
37
38use std::sync::Arc;
39
40use rustc_hash::FxHashMap;
41
42use super::vector_data::LazyFlatVectorData;
43use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
44use crate::dsl::{Document, Field, Schema};
45use crate::structures::{
46    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
47    RaBitQIndex, SSTableStats, TermInfo,
48};
49use crate::{DocId, Error, Result};
50
51use super::store::{AsyncStoreReader, RawStoreBlock};
52use super::types::{SegmentFiles, SegmentId, SegmentMeta};
53
54/// Async segment reader with lazy loading
55///
56/// - Term dictionary: only index loaded, blocks loaded on-demand
57/// - Postings: loaded on-demand per term via HTTP range requests
58/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
59pub struct AsyncSegmentReader {
60    meta: SegmentMeta,
61    /// Term dictionary with lazy block loading
62    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
63    /// Postings file handle - fetches ranges on demand
64    postings_handle: LazyFileHandle,
65    /// Document store with lazy block loading
66    store: Arc<AsyncStoreReader>,
67    schema: Arc<Schema>,
68    /// Base doc_id offset for this segment
69    doc_id_offset: DocId,
70    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
71    vector_indexes: FxHashMap<u32, VectorIndex>,
72    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
73    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
74    /// Per-field coarse centroids for IVF/ScaNN search
75    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
76    /// Sparse vector indexes per field
77    sparse_indexes: FxHashMap<u32, SparseIndex>,
78    /// Position file handle for phrase queries (lazy loading)
79    positions_handle: Option<LazyFileHandle>,
80}
81
82impl AsyncSegmentReader {
83    /// Open a segment with lazy loading
84    pub async fn open<D: Directory>(
85        dir: &D,
86        segment_id: SegmentId,
87        schema: Arc<Schema>,
88        doc_id_offset: DocId,
89        cache_blocks: usize,
90    ) -> Result<Self> {
91        let files = SegmentFiles::new(segment_id.0);
92
93        // Read metadata (small, always loaded)
94        let meta_slice = dir.open_read(&files.meta).await?;
95        let meta_bytes = meta_slice.read_bytes().await?;
96        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
97        debug_assert_eq!(meta.id, segment_id.0);
98
99        // Open term dictionary with lazy loading (fetches ranges on demand)
100        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
101        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
102
103        // Get postings file handle (lazy - fetches ranges on demand)
104        let postings_handle = dir.open_lazy(&files.postings).await?;
105
106        // Open store with lazy loading
107        let store_handle = dir.open_lazy(&files.store).await?;
108        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
109
110        // Load dense vector indexes from unified .vectors file
111        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
112        let vector_indexes = vectors_data.indexes;
113        let flat_vectors = vectors_data.flat_vectors;
114
115        // Load sparse vector indexes from .sparse file
116        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
117
118        // Open positions file handle (if exists) - offsets are now in TermInfo
119        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
120
121        // Log segment loading stats
122        {
123            let mut parts = vec![format!(
124                "[segment] loaded {:016x}: docs={}",
125                segment_id.0, meta.num_docs
126            )];
127            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
128                parts.push(format!(
129                    "dense: {} ann + {} flat fields",
130                    vector_indexes.len(),
131                    flat_vectors.len()
132                ));
133            }
134            for (field_id, idx) in &sparse_indexes {
135                parts.push(format!(
136                    "sparse field {}: {} dims, ~{:.1} KB",
137                    field_id,
138                    idx.num_dimensions(),
139                    idx.num_dimensions() as f64 * 24.0 / 1024.0
140                ));
141            }
142            log::debug!("{}", parts.join(", "));
143        }
144
145        Ok(Self {
146            meta,
147            term_dict: Arc::new(term_dict),
148            postings_handle,
149            store: Arc::new(store),
150            schema,
151            doc_id_offset,
152            vector_indexes,
153            flat_vectors,
154            coarse_centroids: FxHashMap::default(),
155            sparse_indexes,
156            positions_handle,
157        })
158    }
159
160    pub fn meta(&self) -> &SegmentMeta {
161        &self.meta
162    }
163
164    pub fn num_docs(&self) -> u32 {
165        self.meta.num_docs
166    }
167
168    /// Get average field length for BM25F scoring
169    pub fn avg_field_len(&self, field: Field) -> f32 {
170        self.meta.avg_field_len(field)
171    }
172
173    pub fn doc_id_offset(&self) -> DocId {
174        self.doc_id_offset
175    }
176
177    /// Set the doc_id_offset (used for parallel segment loading)
178    pub fn set_doc_id_offset(&mut self, offset: DocId) {
179        self.doc_id_offset = offset;
180    }
181
182    pub fn schema(&self) -> &Schema {
183        &self.schema
184    }
185
186    /// Get sparse indexes for all fields
187    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
188        &self.sparse_indexes
189    }
190
191    /// Get vector indexes for all fields
192    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
193        &self.vector_indexes
194    }
195
196    /// Get lazy flat vectors for all fields (for reranking and merge)
197    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
198        &self.flat_vectors
199    }
200
201    /// Get term dictionary stats for debugging
202    pub fn term_dict_stats(&self) -> SSTableStats {
203        self.term_dict.stats()
204    }
205
206    /// Estimate memory usage of this segment reader
207    pub fn memory_stats(&self) -> SegmentMemoryStats {
208        let term_dict_stats = self.term_dict.stats();
209
210        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
211        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
212
213        // Store cache: similar estimate
214        let store_cache_bytes = self.store.cached_blocks() * 4096;
215
216        // Sparse index: SoA dim table + OwnedBytes skip section
217        let sparse_index_bytes: usize = self
218            .sparse_indexes
219            .values()
220            .map(|s| s.estimated_memory_bytes())
221            .sum();
222
223        // Dense index: vectors are memory-mapped, but we track index structures
224        // RaBitQ/IVF indexes have cluster assignments in memory
225        let dense_index_bytes: usize = self
226            .vector_indexes
227            .values()
228            .map(|v| v.estimated_memory_bytes())
229            .sum();
230
231        SegmentMemoryStats {
232            segment_id: self.meta.id,
233            num_docs: self.meta.num_docs,
234            term_dict_cache_bytes,
235            store_cache_bytes,
236            sparse_index_bytes,
237            dense_index_bytes,
238            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
239        }
240    }
241
242    /// Get posting list for a term (async - loads on demand)
243    ///
244    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
245    /// and no additional I/O is needed. For larger lists, reads from .post file.
246    pub async fn get_postings(
247        &self,
248        field: Field,
249        term: &[u8],
250    ) -> Result<Option<BlockPostingList>> {
251        log::debug!(
252            "SegmentReader::get_postings field={} term_len={}",
253            field.0,
254            term.len()
255        );
256
257        // Build key: field_id + term
258        let mut key = Vec::with_capacity(4 + term.len());
259        key.extend_from_slice(&field.0.to_le_bytes());
260        key.extend_from_slice(term);
261
262        // Look up in term dictionary
263        let term_info = match self.term_dict.get(&key).await? {
264            Some(info) => {
265                log::debug!("SegmentReader::get_postings found term_info");
266                info
267            }
268            None => {
269                log::debug!("SegmentReader::get_postings term not found");
270                return Ok(None);
271            }
272        };
273
274        // Check if posting list is inlined
275        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
276            // Build BlockPostingList from inline data (no I/O needed!)
277            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
278            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
279                posting_list.push(doc_id, tf);
280            }
281            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
282            return Ok(Some(block_list));
283        }
284
285        // External posting list - read from postings file handle (lazy - HTTP range request)
286        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
287            Error::Corruption("TermInfo has neither inline nor external data".to_string())
288        })?;
289
290        let start = posting_offset;
291        let end = start + posting_len as u64;
292
293        if end > self.postings_handle.len() {
294            return Err(Error::Corruption(
295                "Posting offset out of bounds".to_string(),
296            ));
297        }
298
299        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
300        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
301
302        Ok(Some(block_list))
303    }
304
305    /// Get document by local doc_id (async - loads on demand).
306    ///
307    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
308    /// Uses binary search on sorted doc_ids for O(log N) lookup.
309    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
310        self.doc_with_fields(local_doc_id, None).await
311    }
312
313    /// Get document by local doc_id, hydrating only the specified fields.
314    ///
315    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
316    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
317    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
318    pub async fn doc_with_fields(
319        &self,
320        local_doc_id: DocId,
321        fields: Option<&rustc_hash::FxHashSet<u32>>,
322    ) -> Result<Option<Document>> {
323        let mut doc = match fields {
324            Some(set) => {
325                let field_ids: Vec<u32> = set.iter().copied().collect();
326                match self
327                    .store
328                    .get_fields(local_doc_id, &self.schema, &field_ids)
329                    .await
330                {
331                    Ok(Some(d)) => d,
332                    Ok(None) => return Ok(None),
333                    Err(e) => return Err(Error::from(e)),
334                }
335            }
336            None => match self.store.get(local_doc_id, &self.schema).await {
337                Ok(Some(d)) => d,
338                Ok(None) => return Ok(None),
339                Err(e) => return Err(Error::from(e)),
340            },
341        };
342
343        // Hydrate dense vector fields from flat vector data
344        for (&field_id, lazy_flat) in &self.flat_vectors {
345            // Skip vector fields not in the requested set
346            if let Some(set) = fields
347                && !set.contains(&field_id)
348            {
349                continue;
350            }
351
352            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
353            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
354                let flat_idx = start + j;
355                match lazy_flat.get_vector(flat_idx).await {
356                    Ok(vec) => {
357                        doc.add_dense_vector(Field(field_id), vec);
358                    }
359                    Err(e) => {
360                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
361                    }
362                }
363            }
364        }
365
366        Ok(Some(doc))
367    }
368
369    /// Prefetch term dictionary blocks for a key range
370    pub async fn prefetch_terms(
371        &self,
372        field: Field,
373        start_term: &[u8],
374        end_term: &[u8],
375    ) -> Result<()> {
376        let mut start_key = Vec::with_capacity(4 + start_term.len());
377        start_key.extend_from_slice(&field.0.to_le_bytes());
378        start_key.extend_from_slice(start_term);
379
380        let mut end_key = Vec::with_capacity(4 + end_term.len());
381        end_key.extend_from_slice(&field.0.to_le_bytes());
382        end_key.extend_from_slice(end_term);
383
384        self.term_dict.prefetch_range(&start_key, &end_key).await?;
385        Ok(())
386    }
387
388    /// Check if store uses dictionary compression (incompatible with raw merging)
389    pub fn store_has_dict(&self) -> bool {
390        self.store.has_dict()
391    }
392
393    /// Get store reference for merge operations
394    pub fn store(&self) -> &super::store::AsyncStoreReader {
395        &self.store
396    }
397
398    /// Get raw store blocks for optimized merging
399    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
400        self.store.raw_blocks()
401    }
402
403    /// Get store data slice for raw block access
404    pub fn store_data_slice(&self) -> &LazyFileSlice {
405        self.store.data_slice()
406    }
407
408    /// Get all terms from this segment (for merge)
409    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
410        self.term_dict.all_entries().await.map_err(Error::from)
411    }
412
413    /// Get all terms with parsed field and term string (for statistics aggregation)
414    ///
415    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
416    /// Skips terms that aren't valid UTF-8.
417    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
418        let entries = self.term_dict.all_entries().await?;
419        let mut result = Vec::with_capacity(entries.len());
420
421        for (key, term_info) in entries {
422            // Key format: field_id (4 bytes little-endian) + term bytes
423            if key.len() > 4 {
424                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
425                let term_bytes = &key[4..];
426                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
427                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
428                }
429            }
430        }
431
432        Ok(result)
433    }
434
435    /// Get streaming iterator over term dictionary (for memory-efficient merge)
436    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
437        self.term_dict.iter()
438    }
439
440    /// Prefetch all term dictionary blocks in a single bulk I/O call.
441    ///
442    /// Call before merge iteration to eliminate per-block cache misses.
443    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
444        self.term_dict
445            .prefetch_all_data_bulk()
446            .await
447            .map_err(crate::Error::from)
448    }
449
450    /// Read raw posting bytes at offset
451    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
452        let start = offset;
453        let end = start + len as u64;
454        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
455        Ok(bytes.to_vec())
456    }
457
458    /// Read raw position bytes at offset (for merge)
459    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
460        let handle = match &self.positions_handle {
461            Some(h) => h,
462            None => return Ok(None),
463        };
464        let start = offset;
465        let end = start + len as u64;
466        let bytes = handle.read_bytes_range(start..end).await?;
467        Ok(Some(bytes.to_vec()))
468    }
469
470    /// Check if this segment has a positions file
471    pub fn has_positions_file(&self) -> bool {
472        self.positions_handle.is_some()
473    }
474
475    /// Batch cosine scoring on raw quantized bytes.
476    ///
477    /// Dispatches to the appropriate SIMD scorer based on quantization type.
478    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
479    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
480    fn score_quantized_batch(
481        query: &[f32],
482        raw: &[u8],
483        quant: crate::dsl::DenseVectorQuantization,
484        dim: usize,
485        scores: &mut [f32],
486        unit_norm: bool,
487    ) {
488        use crate::dsl::DenseVectorQuantization;
489        use crate::structures::simd;
490        match (quant, unit_norm) {
491            (DenseVectorQuantization::F32, false) => {
492                let num_floats = scores.len() * dim;
493                debug_assert!(
494                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
495                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
496                );
497                let vectors: &[f32] =
498                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
499                simd::batch_cosine_scores(query, vectors, dim, scores);
500            }
501            (DenseVectorQuantization::F32, true) => {
502                let num_floats = scores.len() * dim;
503                debug_assert!(
504                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
505                    "f32 vector data not 4-byte aligned"
506                );
507                let vectors: &[f32] =
508                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
509                simd::batch_dot_scores(query, vectors, dim, scores);
510            }
511            (DenseVectorQuantization::F16, false) => {
512                simd::batch_cosine_scores_f16(query, raw, dim, scores);
513            }
514            (DenseVectorQuantization::F16, true) => {
515                simd::batch_dot_scores_f16(query, raw, dim, scores);
516            }
517            (DenseVectorQuantization::UInt8, false) => {
518                simd::batch_cosine_scores_u8(query, raw, dim, scores);
519            }
520            (DenseVectorQuantization::UInt8, true) => {
521                simd::batch_dot_scores_u8(query, raw, dim, scores);
522            }
523        }
524    }
525
526    /// Search dense vectors using RaBitQ
527    ///
528    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
529    /// The doc_ids are adjusted by doc_id_offset for this segment.
530    /// For multi-valued documents, scores are combined using the specified combiner.
531    pub async fn search_dense_vector(
532        &self,
533        field: Field,
534        query: &[f32],
535        k: usize,
536        nprobe: usize,
537        rerank_factor: usize,
538        combiner: crate::query::MultiValueCombiner,
539    ) -> Result<Vec<VectorSearchResult>> {
540        let ann_index = self.vector_indexes.get(&field.0);
541        let lazy_flat = self.flat_vectors.get(&field.0);
542
543        // No vectors at all for this field
544        if ann_index.is_none() && lazy_flat.is_none() {
545            return Ok(Vec::new());
546        }
547
548        // Check if vectors are pre-normalized (skip per-vector norm in scoring)
549        let unit_norm = self
550            .schema
551            .get_field_entry(field)
552            .and_then(|e| e.dense_vector_config.as_ref())
553            .is_some_and(|c| c.unit_norm);
554
555        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
556        const BRUTE_FORCE_BATCH: usize = 4096;
557
558        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
559        let t0 = std::time::Instant::now();
560        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
561            // ANN search (RaBitQ, IVF, ScaNN)
562            match index {
563                VectorIndex::RaBitQ(lazy) => {
564                    let rabitq = lazy.get().ok_or_else(|| {
565                        Error::Schema("RaBitQ index deserialization failed".to_string())
566                    })?;
567                    let fetch_k = k * rerank_factor.max(1);
568                    rabitq
569                        .search(query, fetch_k, rerank_factor)
570                        .into_iter()
571                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
572                        .collect()
573                }
574                VectorIndex::IVF(lazy) => {
575                    let (index, codebook) = lazy.get().ok_or_else(|| {
576                        Error::Schema("IVF index deserialization failed".to_string())
577                    })?;
578                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
579                        Error::Schema(format!(
580                            "IVF index requires coarse centroids for field {}",
581                            field.0
582                        ))
583                    })?;
584                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
585                    let fetch_k = k * rerank_factor.max(1);
586                    index
587                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
588                        .into_iter()
589                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
590                        .collect()
591                }
592                VectorIndex::ScaNN(lazy) => {
593                    let (index, codebook) = lazy.get().ok_or_else(|| {
594                        Error::Schema("ScaNN index deserialization failed".to_string())
595                    })?;
596                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
597                        Error::Schema(format!(
598                            "ScaNN index requires coarse centroids for field {}",
599                            field.0
600                        ))
601                    })?;
602                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
603                    let fetch_k = k * rerank_factor.max(1);
604                    index
605                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
606                        .into_iter()
607                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
608                        .collect()
609                }
610            }
611        } else if let Some(lazy_flat) = lazy_flat {
612            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
613            // Uses a top-k heap to avoid collecting and sorting all N candidates.
614            log::debug!(
615                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
616                field.0,
617                lazy_flat.num_vectors,
618                lazy_flat.dim,
619                lazy_flat.quantization
620            );
621            let dim = lazy_flat.dim;
622            let n = lazy_flat.num_vectors;
623            let quant = lazy_flat.quantization;
624            let fetch_k = k * rerank_factor.max(1);
625            let mut collector = crate::query::ScoreCollector::new(fetch_k);
626            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
627
628            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
629                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
630                let batch_bytes = lazy_flat
631                    .read_vectors_batch(batch_start, batch_count)
632                    .await
633                    .map_err(crate::Error::Io)?;
634                let raw = batch_bytes.as_slice();
635
636                Self::score_quantized_batch(
637                    query,
638                    raw,
639                    quant,
640                    dim,
641                    &mut scores[..batch_count],
642                    unit_norm,
643                );
644
645                for (i, &score) in scores.iter().enumerate().take(batch_count) {
646                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
647                    collector.insert_with_ordinal(doc_id, score, ordinal);
648                }
649            }
650
651            collector
652                .into_sorted_results()
653                .into_iter()
654                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
655                .collect()
656        } else {
657            return Ok(Vec::new());
658        };
659        let l1_elapsed = t0.elapsed();
660        log::debug!(
661            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
662            field.0,
663            results.len(),
664            l1_elapsed.as_secs_f64() * 1000.0
665        );
666
667        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
668        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
669        if ann_index.is_some()
670            && !results.is_empty()
671            && let Some(lazy_flat) = lazy_flat
672        {
673            let t_rerank = std::time::Instant::now();
674            let dim = lazy_flat.dim;
675            let quant = lazy_flat.quantization;
676            let vbs = lazy_flat.vector_byte_size();
677
678            // Resolve flat indexes for each candidate via binary search
679            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
680            for (ri, c) in results.iter().enumerate() {
681                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
682                for (j, &(_, ord)) in entries.iter().enumerate() {
683                    if ord == c.1 {
684                        resolved.push((ri, start + j));
685                        break;
686                    }
687                }
688            }
689
690            let t_resolve = t_rerank.elapsed();
691            if !resolved.is_empty() {
692                // Sort by flat_idx for sequential mmap access (better page locality)
693                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
694
695                // Batch-read raw quantized bytes into contiguous buffer
696                let t_read = std::time::Instant::now();
697                let mut raw_buf = vec![0u8; resolved.len() * vbs];
698                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
699                    let _ = lazy_flat
700                        .read_vector_raw_into(
701                            flat_idx,
702                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
703                        )
704                        .await;
705                }
706
707                let read_elapsed = t_read.elapsed();
708
709                // Native-precision batch SIMD cosine scoring
710                let t_score = std::time::Instant::now();
711                let mut scores = vec![0f32; resolved.len()];
712                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
713                let score_elapsed = t_score.elapsed();
714
715                // Write scores back to results
716                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
717                    results[ri].2 = scores[buf_idx];
718                }
719
720                log::debug!(
721                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
722                    field.0,
723                    resolved.len(),
724                    dim,
725                    quant,
726                    vbs,
727                    t_resolve.as_secs_f64() * 1000.0,
728                    read_elapsed.as_secs_f64() * 1000.0,
729                    score_elapsed.as_secs_f64() * 1000.0,
730                );
731            }
732
733            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
734            results.truncate(k * rerank_factor.max(1));
735            log::debug!(
736                "[search_dense] field {}: rerank total={:.1}ms",
737                field.0,
738                t_rerank.elapsed().as_secs_f64() * 1000.0
739            );
740        }
741
742        // Track ordinals with individual scores for each doc_id
743        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
744        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
745            rustc_hash::FxHashMap::default();
746        for (doc_id, ordinal, score) in results {
747            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
748            ordinals.push((ordinal as u32, score));
749        }
750
751        // Combine scores and build results with ordinal tracking
752        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
753            .into_iter()
754            .map(|(doc_id, ordinals)| {
755                let combined_score = combiner.combine(&ordinals);
756                VectorSearchResult::new(doc_id, combined_score, ordinals)
757            })
758            .collect();
759
760        // Sort by score descending and take top k
761        final_results.sort_by(|a, b| {
762            b.score
763                .partial_cmp(&a.score)
764                .unwrap_or(std::cmp::Ordering::Equal)
765        });
766        final_results.truncate(k);
767
768        Ok(final_results)
769    }
770
771    /// Check if this segment has dense vectors for the given field
772    pub fn has_dense_vector_index(&self, field: Field) -> bool {
773        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
774    }
775
776    /// Get the dense vector index for a field (if available)
777    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
778        match self.vector_indexes.get(&field.0) {
779            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
780            _ => None,
781        }
782    }
783
784    /// Get the IVF vector index for a field (if available)
785    pub fn get_ivf_vector_index(
786        &self,
787        field: Field,
788    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
789        match self.vector_indexes.get(&field.0) {
790            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
791            _ => None,
792        }
793    }
794
795    /// Get coarse centroids for a field
796    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
797        self.coarse_centroids.get(&field_id)
798    }
799
800    /// Set per-field coarse centroids from index-level trained structures
801    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
802        self.coarse_centroids = centroids;
803    }
804
805    /// Get the ScaNN vector index for a field (if available)
806    pub fn get_scann_vector_index(
807        &self,
808        field: Field,
809    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
810        match self.vector_indexes.get(&field.0) {
811            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
812            _ => None,
813        }
814    }
815
816    /// Get the vector index type for a field
817    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
818        self.vector_indexes.get(&field.0)
819    }
820
821    /// Search for similar sparse vectors using dedicated sparse posting lists
822    ///
823    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
824    /// Optimizations (via WandExecutor):
825    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
826    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
827    /// 3. **Top-K heap**: Efficient score collection
828    ///
829    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
830    pub async fn search_sparse_vector(
831        &self,
832        field: Field,
833        vector: &[(u32, f32)],
834        limit: usize,
835        combiner: crate::query::MultiValueCombiner,
836        heap_factor: f32,
837    ) -> Result<Vec<VectorSearchResult>> {
838        use crate::query::BmpExecutor;
839
840        let query_tokens = vector.len();
841
842        // Get sparse index for this field
843        let sparse_index = match self.sparse_indexes.get(&field.0) {
844            Some(idx) => idx,
845            None => {
846                log::debug!(
847                    "Sparse vector search: no index for field {}, returning empty",
848                    field.0
849                );
850                return Ok(Vec::new());
851            }
852        };
853
854        let index_dimensions = sparse_index.num_dimensions();
855
856        // Filter query terms to only those present in the index
857        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
858        let mut missing_count = 0usize;
859
860        for &(dim_id, query_weight) in vector {
861            if sparse_index.has_dimension(dim_id) {
862                matched_terms.push((dim_id, query_weight));
863            } else {
864                missing_count += 1;
865            }
866        }
867
868        log::debug!(
869            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
870            query_tokens,
871            matched_terms.len(),
872            missing_count,
873            index_dimensions
874        );
875
876        if matched_terms.is_empty() {
877            log::debug!("Sparse vector search: no matching tokens, returning empty");
878            return Ok(Vec::new());
879        }
880
881        // Select executor based on number of query terms:
882        // - 12+ terms: BMP (block-at-a-time, lazy block loading, best for SPLADE)
883        // - 1-11 terms: LazyBlockMaxScoreExecutor (cursor-based traversal + lazy block loading)
884        let num_terms = matched_terms.len();
885        let over_fetch = limit * 2; // Over-fetch for multi-value combining
886        let raw_results = if num_terms > 12 {
887            // BMP: lazy block loading — only skip entries in memory, blocks loaded on-demand
888            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
889                .execute()
890                .await?
891        } else {
892            // Lazy BlockMaxScore: skip entries drive navigation, blocks loaded on-demand.
893            // No upfront get_posting() — blocks only loaded when cursor actually visits them.
894            crate::query::LazyBlockMaxScoreExecutor::new(
895                sparse_index,
896                matched_terms,
897                over_fetch,
898                heap_factor,
899            )
900            .execute()
901            .await?
902        };
903
904        log::trace!(
905            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
906            raw_results.len(),
907            self.doc_id_offset
908        );
909        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
910            for r in raw_results.iter().take(5) {
911                log::trace!(
912                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
913                    r.doc_id,
914                    r.doc_id + self.doc_id_offset,
915                    r.score,
916                    r.ordinal
917                );
918            }
919        }
920
921        // Track ordinals with individual scores for each doc_id
922        // Now using real ordinals from the posting lists
923        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
924            rustc_hash::FxHashMap::default();
925        for r in raw_results {
926            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
927            ordinals.push((r.ordinal as u32, r.score));
928        }
929
930        // Combine scores and build results with ordinal tracking
931        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
932        let mut results: Vec<VectorSearchResult> = doc_ordinals
933            .into_iter()
934            .map(|(doc_id, ordinals)| {
935                let combined_score = combiner.combine(&ordinals);
936                VectorSearchResult::new(doc_id, combined_score, ordinals)
937            })
938            .collect();
939
940        // Sort by score descending and take top limit
941        results.sort_by(|a, b| {
942            b.score
943                .partial_cmp(&a.score)
944                .unwrap_or(std::cmp::Ordering::Equal)
945        });
946        results.truncate(limit);
947
948        Ok(results)
949    }
950
951    /// Get positions for a term (for phrase queries)
952    ///
953    /// Position offsets are now embedded in TermInfo, so we first look up
954    /// the term to get its TermInfo, then use position_info() to get the offset.
955    pub async fn get_positions(
956        &self,
957        field: Field,
958        term: &[u8],
959    ) -> Result<Option<crate::structures::PositionPostingList>> {
960        // Get positions handle
961        let handle = match &self.positions_handle {
962            Some(h) => h,
963            None => return Ok(None),
964        };
965
966        // Build key: field_id + term
967        let mut key = Vec::with_capacity(4 + term.len());
968        key.extend_from_slice(&field.0.to_le_bytes());
969        key.extend_from_slice(term);
970
971        // Look up term in dictionary to get TermInfo with position offset
972        let term_info = match self.term_dict.get(&key).await? {
973            Some(info) => info,
974            None => return Ok(None),
975        };
976
977        // Get position offset from TermInfo
978        let (offset, length) = match term_info.position_info() {
979            Some((o, l)) => (o, l),
980            None => return Ok(None),
981        };
982
983        // Read the position data
984        let slice = handle.slice(offset..offset + length as u64);
985        let data = slice.read_bytes().await?;
986
987        // Deserialize
988        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
989
990        Ok(Some(pos_list))
991    }
992
993    /// Check if positions are available for a field
994    pub fn has_positions(&self, field: Field) -> bool {
995        // Check schema for position mode on this field
996        if let Some(entry) = self.schema.get_field_entry(field) {
997            entry.positions.is_some()
998        } else {
999            false
1000        }
1001    }
1002}
1003
1004/// Alias for AsyncSegmentReader
1005pub type SegmentReader = AsyncSegmentReader;