Skip to main content

hermes_core/segment/reader/
mod.rs

1//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
/// Snapshot of the estimated memory held by one segment reader.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Identifier of the segment these numbers belong to
    pub segment_id: u128,
    /// Document count of the segment
    pub num_docs: u32,
    /// Bytes attributed to the term dictionary block cache
    pub term_dict_cache_bytes: usize,
    /// Bytes attributed to the document store block cache
    pub store_cache_bytes: usize,
    /// Bytes used by sparse vector indexes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Bytes used by dense vector indexes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bytes used by bloom filters
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Sum of every byte counter in this snapshot.
    ///
    /// `segment_id` and `num_docs` are descriptive only and do not contribute.
    pub fn total_bytes(&self) -> usize {
        let counters = [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ];
        counters.iter().sum()
    }
}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
///
/// Vector, sparse, position and fast-field data are opened once in `open`
/// and exposed through the accessor methods of this type.
pub struct AsyncSegmentReader {
    /// Segment metadata deserialized from the segment's meta file
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: FileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    /// Schema supplied by the caller when the segment was opened
    schema: Arc<Schema>,
    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Per-field coarse centroids for IVF/ScaNN search.
    /// Starts empty in `open`; IVF/ScaNN search returns an error for a field with no entry.
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Sparse vector indexes per field
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Position file handle for phrase queries (lazy loading); `None` when the segment has no positions file
    positions_handle: Option<FileHandle>,
    /// Fast-field columnar readers per field_id
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}
83
84impl AsyncSegmentReader {
85    /// Open a segment with lazy loading
86    pub async fn open<D: Directory>(
87        dir: &D,
88        segment_id: SegmentId,
89        schema: Arc<Schema>,
90        cache_blocks: usize,
91    ) -> Result<Self> {
92        let files = SegmentFiles::new(segment_id.0);
93
94        // Read metadata (small, always loaded)
95        let meta_slice = dir.open_read(&files.meta).await?;
96        let meta_bytes = meta_slice.read_bytes().await?;
97        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
98        debug_assert_eq!(meta.id, segment_id.0);
99
100        // Open term dictionary with lazy loading (fetches ranges on demand)
101        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
102        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
103
104        // Get postings file handle (lazy - fetches ranges on demand)
105        let postings_handle = dir.open_lazy(&files.postings).await?;
106
107        // Open store with lazy loading
108        let store_handle = dir.open_lazy(&files.store).await?;
109        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
110
111        // Load dense vector indexes from unified .vectors file
112        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
113        let vector_indexes = vectors_data.indexes;
114        let flat_vectors = vectors_data.flat_vectors;
115
116        // Load sparse vector indexes from .sparse file
117        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
118
119        // Open positions file handle (if exists) - offsets are now in TermInfo
120        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
121
122        // Load fast-field columns from .fast file
123        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;
124
125        // Log segment loading stats
126        {
127            let mut parts = vec![format!(
128                "[segment] loaded {:016x}: docs={}",
129                segment_id.0, meta.num_docs
130            )];
131            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
132                parts.push(format!(
133                    "dense: {} ann + {} flat fields",
134                    vector_indexes.len(),
135                    flat_vectors.len()
136                ));
137            }
138            for (field_id, idx) in &sparse_indexes {
139                parts.push(format!(
140                    "sparse field {}: {} dims, ~{:.1} KB",
141                    field_id,
142                    idx.num_dimensions(),
143                    idx.num_dimensions() as f64 * 24.0 / 1024.0
144                ));
145            }
146            if !fast_fields.is_empty() {
147                parts.push(format!("fast: {} fields", fast_fields.len()));
148            }
149            log::debug!("{}", parts.join(", "));
150        }
151
152        Ok(Self {
153            meta,
154            term_dict: Arc::new(term_dict),
155            postings_handle,
156            store: Arc::new(store),
157            schema,
158            vector_indexes,
159            flat_vectors,
160            coarse_centroids: FxHashMap::default(),
161            sparse_indexes,
162            positions_handle,
163            fast_fields,
164        })
165    }
166
    /// Segment metadata that was deserialized when the segment was opened.
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }
170
    /// Number of documents in this segment, as recorded in the segment metadata.
    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }
174
    /// Get average field length for BM25F scoring.
    ///
    /// Delegates to the segment metadata for the given `field`.
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }
179
    /// Schema this segment reader was opened with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
183
    /// Get sparse indexes for all fields, keyed by field id.
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }
188
    /// Get dense vector (ANN) indexes for all fields, keyed by field id.
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }
193
    /// Get lazy flat vectors for all fields (for reranking and merge), keyed by field id.
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }
198
    /// Get a fast-field reader for a specific field.
    ///
    /// Returns `None` when no fast-field column was loaded for `field_id`.
    pub fn fast_field(
        &self,
        field_id: u32,
    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
        self.fast_fields.get(&field_id)
    }
206
    /// Get all fast-field readers, keyed by field id.
    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
        &self.fast_fields
    }
211
    /// Get term dictionary stats for debugging.
    ///
    /// Returns whatever the underlying SSTable reader reports via `stats()`.
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
216
217    /// Estimate memory usage of this segment reader
218    pub fn memory_stats(&self) -> SegmentMemoryStats {
219        let term_dict_stats = self.term_dict.stats();
220
221        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
222        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
223
224        // Store cache: similar estimate
225        let store_cache_bytes = self.store.cached_blocks() * 4096;
226
227        // Sparse index: SoA dim table + OwnedBytes skip section
228        let sparse_index_bytes: usize = self
229            .sparse_indexes
230            .values()
231            .map(|s| s.estimated_memory_bytes())
232            .sum();
233
234        // Dense index: vectors are memory-mapped, but we track index structures
235        // RaBitQ/IVF indexes have cluster assignments in memory
236        let dense_index_bytes: usize = self
237            .vector_indexes
238            .values()
239            .map(|v| v.estimated_memory_bytes())
240            .sum();
241
242        SegmentMemoryStats {
243            segment_id: self.meta.id,
244            num_docs: self.meta.num_docs,
245            term_dict_cache_bytes,
246            store_cache_bytes,
247            sparse_index_bytes,
248            dense_index_bytes,
249            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
250        }
251    }
252
253    /// Get posting list for a term (async - loads on demand)
254    ///
255    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
256    /// and no additional I/O is needed. For larger lists, reads from .post file.
257    pub async fn get_postings(
258        &self,
259        field: Field,
260        term: &[u8],
261    ) -> Result<Option<BlockPostingList>> {
262        log::debug!(
263            "SegmentReader::get_postings field={} term_len={}",
264            field.0,
265            term.len()
266        );
267
268        // Build key: field_id + term
269        let mut key = Vec::with_capacity(4 + term.len());
270        key.extend_from_slice(&field.0.to_le_bytes());
271        key.extend_from_slice(term);
272
273        // Look up in term dictionary
274        let term_info = match self.term_dict.get(&key).await? {
275            Some(info) => {
276                log::debug!("SegmentReader::get_postings found term_info");
277                info
278            }
279            None => {
280                log::debug!("SegmentReader::get_postings term not found");
281                return Ok(None);
282            }
283        };
284
285        // Check if posting list is inlined
286        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
287            // Build BlockPostingList from inline data (no I/O needed!)
288            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
289            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
290                posting_list.push(doc_id, tf);
291            }
292            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
293            return Ok(Some(block_list));
294        }
295
296        // External posting list - read from postings file handle (lazy - HTTP range request)
297        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
298            Error::Corruption("TermInfo has neither inline nor external data".to_string())
299        })?;
300
301        let start = posting_offset;
302        let end = start + posting_len;
303
304        if end > self.postings_handle.len() {
305            return Err(Error::Corruption(
306                "Posting offset out of bounds".to_string(),
307            ));
308        }
309
310        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
311        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
312
313        Ok(Some(block_list))
314    }
315
    /// Get document by local doc_id (async - loads on demand).
    ///
    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
    /// Uses binary search on sorted doc_ids for O(log N) lookup.
    ///
    /// Equivalent to `doc_with_fields(local_doc_id, None)`, i.e. every field is hydrated.
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }
323
324    /// Get document by local doc_id, hydrating only the specified fields.
325    ///
326    /// If `fields` is `None`, all fields (including dense vectors) are hydrated.
327    /// If `fields` is `Some(set)`, only dense vector fields in the set are hydrated,
328    /// skipping expensive mmap reads + dequantization for unrequested vector fields.
329    pub async fn doc_with_fields(
330        &self,
331        local_doc_id: DocId,
332        fields: Option<&rustc_hash::FxHashSet<u32>>,
333    ) -> Result<Option<Document>> {
334        let mut doc = match fields {
335            Some(set) => {
336                let field_ids: Vec<u32> = set.iter().copied().collect();
337                match self
338                    .store
339                    .get_fields(local_doc_id, &self.schema, &field_ids)
340                    .await
341                {
342                    Ok(Some(d)) => d,
343                    Ok(None) => return Ok(None),
344                    Err(e) => return Err(Error::from(e)),
345                }
346            }
347            None => match self.store.get(local_doc_id, &self.schema).await {
348                Ok(Some(d)) => d,
349                Ok(None) => return Ok(None),
350                Err(e) => return Err(Error::from(e)),
351            },
352        };
353
354        // Hydrate dense vector fields from flat vector data
355        for (&field_id, lazy_flat) in &self.flat_vectors {
356            // Skip vector fields not in the requested set
357            if let Some(set) = fields
358                && !set.contains(&field_id)
359            {
360                continue;
361            }
362
363            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
364            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
365                let flat_idx = start + j;
366                match lazy_flat.get_vector(flat_idx).await {
367                    Ok(vec) => {
368                        doc.add_dense_vector(Field(field_id), vec);
369                    }
370                    Err(e) => {
371                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
372                    }
373                }
374            }
375        }
376
377        Ok(Some(doc))
378    }
379
380    /// Prefetch term dictionary blocks for a key range
381    pub async fn prefetch_terms(
382        &self,
383        field: Field,
384        start_term: &[u8],
385        end_term: &[u8],
386    ) -> Result<()> {
387        let mut start_key = Vec::with_capacity(4 + start_term.len());
388        start_key.extend_from_slice(&field.0.to_le_bytes());
389        start_key.extend_from_slice(start_term);
390
391        let mut end_key = Vec::with_capacity(4 + end_term.len());
392        end_key.extend_from_slice(&field.0.to_le_bytes());
393        end_key.extend_from_slice(end_term);
394
395        self.term_dict.prefetch_range(&start_key, &end_key).await?;
396        Ok(())
397    }
398
    /// Check if store uses dictionary compression (incompatible with raw merging).
    ///
    /// Delegates to the store reader's `has_dict()`.
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }
403
    /// Get store reference for merge operations.
    ///
    /// Borrows the reader held by this segment; nothing is cloned.
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }
408
    /// Get raw store blocks for optimized merging.
    ///
    /// Delegates to the store reader's `raw_blocks()`.
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }
413
    /// Get store data slice for raw block access.
    ///
    /// Delegates to the store reader's `data_slice()`.
    pub fn store_data_slice(&self) -> &FileHandle {
        self.store.data_slice()
    }
418
    /// Get all terms from this segment (for merge).
    ///
    /// Returns every `(key, TermInfo)` entry of the term dictionary; keys are
    /// the raw dictionary keys (field id prefix followed by term bytes).
    /// Errors from the dictionary are converted into the crate error type.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }
423
424    /// Get all terms with parsed field and term string (for statistics aggregation)
425    ///
426    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
427    /// Skips terms that aren't valid UTF-8.
428    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
429        let entries = self.term_dict.all_entries().await?;
430        let mut result = Vec::with_capacity(entries.len());
431
432        for (key, term_info) in entries {
433            // Key format: field_id (4 bytes little-endian) + term bytes
434            if key.len() > 4 {
435                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
436                let term_bytes = &key[4..];
437                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
438                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
439                }
440            }
441        }
442
443        Ok(result)
444    }
445
    /// Get streaming iterator over term dictionary (for memory-efficient merge).
    ///
    /// The iterator borrows from this reader, so it cannot outlive it.
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }
450
    /// Prefetch all term dictionary blocks in a single bulk I/O call.
    ///
    /// Call before merge iteration to eliminate per-block cache misses.
    ///
    /// Errors from the term dictionary are converted into the crate error type.
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }
460
461    /// Read raw posting bytes at offset
462    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
463        let start = offset;
464        let end = start + len;
465        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
466        Ok(bytes.to_vec())
467    }
468
469    /// Read raw position bytes at offset (for merge)
470    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
471        let handle = match &self.positions_handle {
472            Some(h) => h,
473            None => return Ok(None),
474        };
475        let start = offset;
476        let end = start + len;
477        let bytes = handle.read_bytes_range(start..end).await?;
478        Ok(Some(bytes.to_vec()))
479    }
480
    /// Check if this segment has a positions file.
    ///
    /// True exactly when a positions file handle was obtained while opening the segment.
    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }
485
486    /// Batch cosine scoring on raw quantized bytes.
487    ///
488    /// Dispatches to the appropriate SIMD scorer based on quantization type.
489    /// Vectors file uses data-first layout (offset 0) with 8-byte padding between
490    /// fields, so mmap slices are always properly aligned for f32/f16/u8 access.
491    fn score_quantized_batch(
492        query: &[f32],
493        raw: &[u8],
494        quant: crate::dsl::DenseVectorQuantization,
495        dim: usize,
496        scores: &mut [f32],
497        unit_norm: bool,
498    ) {
499        use crate::dsl::DenseVectorQuantization;
500        use crate::structures::simd;
501        match (quant, unit_norm) {
502            (DenseVectorQuantization::F32, false) => {
503                let num_floats = scores.len() * dim;
504                debug_assert!(
505                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
506                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
507                );
508                let vectors: &[f32] =
509                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
510                simd::batch_cosine_scores(query, vectors, dim, scores);
511            }
512            (DenseVectorQuantization::F32, true) => {
513                let num_floats = scores.len() * dim;
514                debug_assert!(
515                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
516                    "f32 vector data not 4-byte aligned"
517                );
518                let vectors: &[f32] =
519                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
520                simd::batch_dot_scores(query, vectors, dim, scores);
521            }
522            (DenseVectorQuantization::F16, false) => {
523                simd::batch_cosine_scores_f16(query, raw, dim, scores);
524            }
525            (DenseVectorQuantization::F16, true) => {
526                simd::batch_dot_scores_f16(query, raw, dim, scores);
527            }
528            (DenseVectorQuantization::UInt8, false) => {
529                simd::batch_cosine_scores_u8(query, raw, dim, scores);
530            }
531            (DenseVectorQuantization::UInt8, true) => {
532                simd::batch_dot_scores_u8(query, raw, dim, scores);
533            }
534        }
535    }
536
537    /// Search dense vectors using RaBitQ
538    ///
539    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
540    /// Doc IDs are segment-local.
541    /// For multi-valued documents, scores are combined using the specified combiner.
542    pub async fn search_dense_vector(
543        &self,
544        field: Field,
545        query: &[f32],
546        k: usize,
547        nprobe: usize,
548        rerank_factor: usize,
549        combiner: crate::query::MultiValueCombiner,
550    ) -> Result<Vec<VectorSearchResult>> {
551        let ann_index = self.vector_indexes.get(&field.0);
552        let lazy_flat = self.flat_vectors.get(&field.0);
553
554        // No vectors at all for this field
555        if ann_index.is_none() && lazy_flat.is_none() {
556            return Ok(Vec::new());
557        }
558
559        // Check if vectors are pre-normalized (skip per-vector norm in scoring)
560        let unit_norm = self
561            .schema
562            .get_field_entry(field)
563            .and_then(|e| e.dense_vector_config.as_ref())
564            .is_some_and(|c| c.unit_norm);
565
566        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
567        const BRUTE_FORCE_BATCH: usize = 4096;
568
569        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
570        let t0 = std::time::Instant::now();
571        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
572            // ANN search (RaBitQ, IVF, ScaNN)
573            match index {
574                VectorIndex::RaBitQ(lazy) => {
575                    let rabitq = lazy.get().ok_or_else(|| {
576                        Error::Schema("RaBitQ index deserialization failed".to_string())
577                    })?;
578                    let fetch_k = k * rerank_factor.max(1);
579                    rabitq
580                        .search(query, fetch_k, rerank_factor)
581                        .into_iter()
582                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
583                        .collect()
584                }
585                VectorIndex::IVF(lazy) => {
586                    let (index, codebook) = lazy.get().ok_or_else(|| {
587                        Error::Schema("IVF index deserialization failed".to_string())
588                    })?;
589                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
590                        Error::Schema(format!(
591                            "IVF index requires coarse centroids for field {}",
592                            field.0
593                        ))
594                    })?;
595                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
596                    let fetch_k = k * rerank_factor.max(1);
597                    index
598                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
599                        .into_iter()
600                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
601                        .collect()
602                }
603                VectorIndex::ScaNN(lazy) => {
604                    let (index, codebook) = lazy.get().ok_or_else(|| {
605                        Error::Schema("ScaNN index deserialization failed".to_string())
606                    })?;
607                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
608                        Error::Schema(format!(
609                            "ScaNN index requires coarse centroids for field {}",
610                            field.0
611                        ))
612                    })?;
613                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
614                    let fetch_k = k * rerank_factor.max(1);
615                    index
616                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
617                        .into_iter()
618                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
619                        .collect()
620                }
621            }
622        } else if let Some(lazy_flat) = lazy_flat {
623            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
624            // Uses a top-k heap to avoid collecting and sorting all N candidates.
625            log::debug!(
626                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
627                field.0,
628                lazy_flat.num_vectors,
629                lazy_flat.dim,
630                lazy_flat.quantization
631            );
632            let dim = lazy_flat.dim;
633            let n = lazy_flat.num_vectors;
634            let quant = lazy_flat.quantization;
635            let fetch_k = k * rerank_factor.max(1);
636            let mut collector = crate::query::ScoreCollector::new(fetch_k);
637            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
638
639            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
640                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
641                let batch_bytes = lazy_flat
642                    .read_vectors_batch(batch_start, batch_count)
643                    .await
644                    .map_err(crate::Error::Io)?;
645                let raw = batch_bytes.as_slice();
646
647                Self::score_quantized_batch(
648                    query,
649                    raw,
650                    quant,
651                    dim,
652                    &mut scores[..batch_count],
653                    unit_norm,
654                );
655
656                for (i, &score) in scores.iter().enumerate().take(batch_count) {
657                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
658                    collector.insert_with_ordinal(doc_id, score, ordinal);
659                }
660            }
661
662            collector
663                .into_sorted_results()
664                .into_iter()
665                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
666                .collect()
667        } else {
668            return Ok(Vec::new());
669        };
670        let l1_elapsed = t0.elapsed();
671        log::debug!(
672            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
673            field.0,
674            results.len(),
675            l1_elapsed.as_secs_f64() * 1000.0
676        );
677
678        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
679        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
680        if ann_index.is_some()
681            && !results.is_empty()
682            && let Some(lazy_flat) = lazy_flat
683        {
684            let t_rerank = std::time::Instant::now();
685            let dim = lazy_flat.dim;
686            let quant = lazy_flat.quantization;
687            let vbs = lazy_flat.vector_byte_size();
688
689            // Resolve flat indexes for each candidate via binary search
690            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
691            for (ri, c) in results.iter().enumerate() {
692                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
693                for (j, &(_, ord)) in entries.iter().enumerate() {
694                    if ord == c.1 {
695                        resolved.push((ri, start + j));
696                        break;
697                    }
698                }
699            }
700
701            let t_resolve = t_rerank.elapsed();
702            if !resolved.is_empty() {
703                // Sort by flat_idx for sequential mmap access (better page locality)
704                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
705
706                // Batch-read raw quantized bytes into contiguous buffer
707                let t_read = std::time::Instant::now();
708                let mut raw_buf = vec![0u8; resolved.len() * vbs];
709                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
710                    let _ = lazy_flat
711                        .read_vector_raw_into(
712                            flat_idx,
713                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
714                        )
715                        .await;
716                }
717
718                let read_elapsed = t_read.elapsed();
719
720                // Native-precision batch SIMD cosine scoring
721                let t_score = std::time::Instant::now();
722                let mut scores = vec![0f32; resolved.len()];
723                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
724                let score_elapsed = t_score.elapsed();
725
726                // Write scores back to results
727                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
728                    results[ri].2 = scores[buf_idx];
729                }
730
731                log::debug!(
732                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
733                    field.0,
734                    resolved.len(),
735                    dim,
736                    quant,
737                    vbs,
738                    t_resolve.as_secs_f64() * 1000.0,
739                    read_elapsed.as_secs_f64() * 1000.0,
740                    score_elapsed.as_secs_f64() * 1000.0,
741                );
742            }
743
744            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
745            results.truncate(k * rerank_factor.max(1));
746            log::debug!(
747                "[search_dense] field {}: rerank total={:.1}ms",
748                field.0,
749                t_rerank.elapsed().as_secs_f64() * 1000.0
750            );
751        }
752
753        // Track ordinals with individual scores for each doc_id
754        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
755            rustc_hash::FxHashMap::default();
756        for (doc_id, ordinal, score) in results {
757            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
758            ordinals.push((ordinal as u32, score));
759        }
760
761        // Combine scores and build results with ordinal tracking
762        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
763            .into_iter()
764            .map(|(doc_id, ordinals)| {
765                let combined_score = combiner.combine(&ordinals);
766                VectorSearchResult::new(doc_id, combined_score, ordinals)
767            })
768            .collect();
769
770        // Sort by score descending and take top k
771        final_results.sort_by(|a, b| {
772            b.score
773                .partial_cmp(&a.score)
774                .unwrap_or(std::cmp::Ordering::Equal)
775        });
776        final_results.truncate(k);
777
778        Ok(final_results)
779    }
780
781    /// Check if this segment has dense vectors for the given field
782    pub fn has_dense_vector_index(&self, field: Field) -> bool {
783        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
784    }
785
786    /// Get the dense vector index for a field (if available)
787    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
788        match self.vector_indexes.get(&field.0) {
789            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
790            _ => None,
791        }
792    }
793
794    /// Get the IVF vector index for a field (if available)
795    pub fn get_ivf_vector_index(
796        &self,
797        field: Field,
798    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
799        match self.vector_indexes.get(&field.0) {
800            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
801            _ => None,
802        }
803    }
804
805    /// Get coarse centroids for a field
806    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
807        self.coarse_centroids.get(&field_id)
808    }
809
810    /// Set per-field coarse centroids from index-level trained structures
811    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
812        self.coarse_centroids = centroids;
813    }
814
815    /// Get the ScaNN vector index for a field (if available)
816    pub fn get_scann_vector_index(
817        &self,
818        field: Field,
819    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
820        match self.vector_indexes.get(&field.0) {
821            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
822            _ => None,
823        }
824    }
825
826    /// Get the vector index type for a field
827    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
828        self.vector_indexes.get(&field.0)
829    }
830
831    /// Search for similar sparse vectors using dedicated sparse posting lists
832    ///
833    /// Uses `BmpExecutor` or `SparseMaxScoreExecutor` for efficient top-k retrieval.
834    /// Optimizations:
835    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
836    /// 2. **Block-max pruning**: Skips blocks where max contribution < threshold
837    /// 3. **Top-K heap**: Efficient score collection
838    ///
839    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
840    pub async fn search_sparse_vector(
841        &self,
842        field: Field,
843        vector: &[(u32, f32)],
844        limit: usize,
845        combiner: crate::query::MultiValueCombiner,
846        heap_factor: f32,
847    ) -> Result<Vec<VectorSearchResult>> {
848        use crate::query::BmpExecutor;
849
850        let query_tokens = vector.len();
851
852        // Get sparse index for this field
853        let sparse_index = match self.sparse_indexes.get(&field.0) {
854            Some(idx) => idx,
855            None => {
856                log::debug!(
857                    "Sparse vector search: no index for field {}, returning empty",
858                    field.0
859                );
860                return Ok(Vec::new());
861            }
862        };
863
864        let index_dimensions = sparse_index.num_dimensions();
865
866        // Filter query terms to only those present in the index
867        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
868        let mut missing_count = 0usize;
869
870        for &(dim_id, query_weight) in vector {
871            if sparse_index.has_dimension(dim_id) {
872                matched_terms.push((dim_id, query_weight));
873            } else {
874                missing_count += 1;
875            }
876        }
877
878        log::debug!(
879            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
880            query_tokens,
881            matched_terms.len(),
882            missing_count,
883            index_dimensions
884        );
885
886        if matched_terms.is_empty() {
887            log::debug!("Sparse vector search: no matching tokens, returning empty");
888            return Ok(Vec::new());
889        }
890
891        // Select executor based on number of query terms:
892        // - 12+ terms: BMP (block-at-a-time, lazy block loading, best for SPLADE)
893        // - 1-11 terms: SparseMaxScoreExecutor (cursor-based traversal + lazy block loading)
894        let num_terms = matched_terms.len();
895        let over_fetch = limit * 2; // Over-fetch for multi-value combining
896        let raw_results = if num_terms > 12 {
897            // BMP: lazy block loading — only skip entries in memory, blocks loaded on-demand
898            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
899                .execute()
900                .await?
901        } else {
902            // Lazy BlockMaxScore: skip entries drive navigation, blocks loaded on-demand.
903            // No upfront get_posting() — blocks only loaded when cursor actually visits them.
904            crate::query::SparseMaxScoreExecutor::new(
905                sparse_index,
906                matched_terms,
907                over_fetch,
908                heap_factor,
909            )
910            .execute()
911            .await?
912        };
913
914        log::trace!(
915            "Sparse search returned {} raw results for segment {:016x}",
916            raw_results.len(),
917            self.meta.id
918        );
919        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
920            for r in raw_results.iter().take(5) {
921                log::trace!(
922                    "  Raw result: doc_id={}, score={:.4}, ordinal={}",
923                    r.doc_id,
924                    r.score,
925                    r.ordinal
926                );
927            }
928        }
929
930        // Track ordinals with individual scores for each doc_id
931        // Now using real ordinals from the posting lists
932        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
933            rustc_hash::FxHashMap::default();
934        for r in raw_results {
935            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
936            ordinals.push((r.ordinal as u32, r.score));
937        }
938
939        // Combine scores and build results with ordinal tracking
940        let mut results: Vec<VectorSearchResult> = doc_ordinals
941            .into_iter()
942            .map(|(doc_id, ordinals)| {
943                let combined_score = combiner.combine(&ordinals);
944                VectorSearchResult::new(doc_id, combined_score, ordinals)
945            })
946            .collect();
947
948        // Sort by score descending and take top limit
949        results.sort_by(|a, b| {
950            b.score
951                .partial_cmp(&a.score)
952                .unwrap_or(std::cmp::Ordering::Equal)
953        });
954        results.truncate(limit);
955
956        Ok(results)
957    }
958
959    /// Get positions for a term (for phrase queries)
960    ///
961    /// Position offsets are now embedded in TermInfo, so we first look up
962    /// the term to get its TermInfo, then use position_info() to get the offset.
963    pub async fn get_positions(
964        &self,
965        field: Field,
966        term: &[u8],
967    ) -> Result<Option<crate::structures::PositionPostingList>> {
968        // Get positions handle
969        let handle = match &self.positions_handle {
970            Some(h) => h,
971            None => return Ok(None),
972        };
973
974        // Build key: field_id + term
975        let mut key = Vec::with_capacity(4 + term.len());
976        key.extend_from_slice(&field.0.to_le_bytes());
977        key.extend_from_slice(term);
978
979        // Look up term in dictionary to get TermInfo with position offset
980        let term_info = match self.term_dict.get(&key).await? {
981            Some(info) => info,
982            None => return Ok(None),
983        };
984
985        // Get position offset from TermInfo
986        let (offset, length) = match term_info.position_info() {
987            Some((o, l)) => (o, l),
988            None => return Ok(None),
989        };
990
991        // Read the position data
992        let slice = handle.slice(offset..offset + length);
993        let data = slice.read_bytes().await?;
994
995        // Deserialize
996        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
997
998        Ok(Some(pos_list))
999    }
1000
1001    /// Check if positions are available for a field
1002    pub fn has_positions(&self, field: Field) -> bool {
1003        // Check schema for position mode on this field
1004        if let Some(entry) = self.schema.get_field_entry(field) {
1005            entry.positions.is_some()
1006        } else {
1007            false
1008        }
1009    }
1010}
1011
1012/// Alias for AsyncSegmentReader
1013pub type SegmentReader = AsyncSegmentReader;