Skip to main content

// hermes_core/segment/reader/mod.rs

//! Async segment reader with lazy loading
2
3mod loader;
4mod types;
5
6pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
7
/// Memory statistics for a single segment.
///
/// Every `*_bytes` field is an estimate, not an exact measurement.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Segment ID
    pub segment_id: u128,
    /// Number of documents in segment
    pub num_docs: u32,
    /// Term dictionary block cache bytes
    pub term_dict_cache_bytes: usize,
    /// Document store block cache bytes
    pub store_cache_bytes: usize,
    /// Sparse vector index bytes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Dense vector index bytes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bloom filter bytes
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Total estimated memory for this segment, in bytes.
    ///
    /// The sum saturates at `usize::MAX` rather than overflowing, so a
    /// pathological estimate can never panic (debug) or wrap (release).
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .iter()
        .fold(0usize, |total, &part| total.saturating_add(part))
    }
}
37
38use crate::structures::BlockSparsePostingList;
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49    RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56/// Async segment reader with lazy loading
57///
58/// - Term dictionary: only index loaded, blocks loaded on-demand
59/// - Postings: loaded on-demand per term via HTTP range requests
60/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
61pub struct AsyncSegmentReader {
62    meta: SegmentMeta,
63    /// Term dictionary with lazy block loading
64    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
65    /// Postings file handle - fetches ranges on demand
66    postings_handle: LazyFileHandle,
67    /// Document store with lazy block loading
68    store: Arc<AsyncStoreReader>,
69    schema: Arc<Schema>,
70    /// Base doc_id offset for this segment
71    doc_id_offset: DocId,
72    /// Dense vector indexes per field (RaBitQ or IVF-RaBitQ) — for search
73    vector_indexes: FxHashMap<u32, VectorIndex>,
74    /// Lazy flat vectors per field — for reranking and merge (doc_ids in memory, vectors via mmap)
75    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
76    /// Shared coarse centroids for IVF search (loaded once)
77    coarse_centroids: Option<Arc<CoarseCentroids>>,
78    /// Sparse vector indexes per field
79    sparse_indexes: FxHashMap<u32, SparseIndex>,
80    /// Position file handle for phrase queries (lazy loading)
81    positions_handle: Option<LazyFileHandle>,
82}
83
84impl AsyncSegmentReader {
85    /// Open a segment with lazy loading
86    pub async fn open<D: Directory>(
87        dir: &D,
88        segment_id: SegmentId,
89        schema: Arc<Schema>,
90        doc_id_offset: DocId,
91        cache_blocks: usize,
92    ) -> Result<Self> {
93        let files = SegmentFiles::new(segment_id.0);
94
95        // Read metadata (small, always loaded)
96        let meta_slice = dir.open_read(&files.meta).await?;
97        let meta_bytes = meta_slice.read_bytes().await?;
98        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
99        debug_assert_eq!(meta.id, segment_id.0);
100
101        // Open term dictionary with lazy loading (fetches ranges on demand)
102        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
103        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
104
105        // Get postings file handle (lazy - fetches ranges on demand)
106        let postings_handle = dir.open_lazy(&files.postings).await?;
107
108        // Open store with lazy loading
109        let store_handle = dir.open_lazy(&files.store).await?;
110        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
111
112        // Load dense vector indexes from unified .vectors file
113        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
114        let vector_indexes = vectors_data.indexes;
115        let flat_vectors = vectors_data.flat_vectors;
116        let coarse_centroids = vectors_data.coarse_centroids;
117
118        // Load sparse vector indexes from .sparse file
119        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
120
121        // Open positions file handle (if exists) - offsets are now in TermInfo
122        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
123
124        // Log segment loading stats (compact format: ~24 bytes per active dim in hashmap)
125        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
126        let sparse_mem = sparse_dims * 24; // HashMap entry overhead
127        log::debug!(
128            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, dense_flat={}, dense_ann={}",
129            segment_id.0,
130            meta.num_docs,
131            sparse_dims,
132            sparse_mem as f64 / 1024.0,
133            flat_vectors.len(),
134            vector_indexes.len()
135        );
136
137        Ok(Self {
138            meta,
139            term_dict: Arc::new(term_dict),
140            postings_handle,
141            store: Arc::new(store),
142            schema,
143            doc_id_offset,
144            vector_indexes,
145            flat_vectors,
146            coarse_centroids,
147            sparse_indexes,
148            positions_handle,
149        })
150    }
151
152    pub fn meta(&self) -> &SegmentMeta {
153        &self.meta
154    }
155
156    pub fn num_docs(&self) -> u32 {
157        self.meta.num_docs
158    }
159
160    /// Get average field length for BM25F scoring
161    pub fn avg_field_len(&self, field: Field) -> f32 {
162        self.meta.avg_field_len(field)
163    }
164
165    pub fn doc_id_offset(&self) -> DocId {
166        self.doc_id_offset
167    }
168
169    /// Set the doc_id_offset (used for parallel segment loading)
170    pub fn set_doc_id_offset(&mut self, offset: DocId) {
171        self.doc_id_offset = offset;
172    }
173
174    pub fn schema(&self) -> &Schema {
175        &self.schema
176    }
177
178    /// Get sparse indexes for all fields
179    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
180        &self.sparse_indexes
181    }
182
183    /// Get vector indexes for all fields
184    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
185        &self.vector_indexes
186    }
187
188    /// Get lazy flat vectors for all fields (for reranking and merge)
189    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
190        &self.flat_vectors
191    }
192
193    /// Get term dictionary stats for debugging
194    pub fn term_dict_stats(&self) -> SSTableStats {
195        self.term_dict.stats()
196    }
197
198    /// Estimate memory usage of this segment reader
199    pub fn memory_stats(&self) -> SegmentMemoryStats {
200        let term_dict_stats = self.term_dict.stats();
201
202        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
203        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
204
205        // Store cache: similar estimate
206        let store_cache_bytes = self.store.cached_blocks() * 4096;
207
208        // Sparse index: each dimension has a posting list in memory
209        // Estimate: ~24 bytes per active dimension (HashMap entry overhead)
210        let sparse_index_bytes: usize = self
211            .sparse_indexes
212            .values()
213            .map(|s| s.num_dimensions() * 24)
214            .sum();
215
216        // Dense index: vectors are memory-mapped, but we track index structures
217        // RaBitQ/IVF indexes have cluster assignments in memory
218        let dense_index_bytes: usize = self
219            .vector_indexes
220            .values()
221            .map(|v| v.estimated_memory_bytes())
222            .sum();
223
224        SegmentMemoryStats {
225            segment_id: self.meta.id,
226            num_docs: self.meta.num_docs,
227            term_dict_cache_bytes,
228            store_cache_bytes,
229            sparse_index_bytes,
230            dense_index_bytes,
231            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
232        }
233    }
234
235    /// Get posting list for a term (async - loads on demand)
236    ///
237    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
238    /// and no additional I/O is needed. For larger lists, reads from .post file.
239    pub async fn get_postings(
240        &self,
241        field: Field,
242        term: &[u8],
243    ) -> Result<Option<BlockPostingList>> {
244        log::debug!(
245            "SegmentReader::get_postings field={} term_len={}",
246            field.0,
247            term.len()
248        );
249
250        // Build key: field_id + term
251        let mut key = Vec::with_capacity(4 + term.len());
252        key.extend_from_slice(&field.0.to_le_bytes());
253        key.extend_from_slice(term);
254
255        // Look up in term dictionary
256        let term_info = match self.term_dict.get(&key).await? {
257            Some(info) => {
258                log::debug!("SegmentReader::get_postings found term_info");
259                info
260            }
261            None => {
262                log::debug!("SegmentReader::get_postings term not found");
263                return Ok(None);
264            }
265        };
266
267        // Check if posting list is inlined
268        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
269            // Build BlockPostingList from inline data (no I/O needed!)
270            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
271            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
272                posting_list.push(doc_id, tf);
273            }
274            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
275            return Ok(Some(block_list));
276        }
277
278        // External posting list - read from postings file handle (lazy - HTTP range request)
279        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
280            Error::Corruption("TermInfo has neither inline nor external data".to_string())
281        })?;
282
283        let start = posting_offset;
284        let end = start + posting_len as u64;
285
286        if end > self.postings_handle.len() {
287            return Err(Error::Corruption(
288                "Posting offset out of bounds".to_string(),
289            ));
290        }
291
292        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
293        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
294
295        Ok(Some(block_list))
296    }
297
298    /// Get document by local doc_id (async - loads on demand).
299    ///
300    /// Dense vector fields are hydrated from LazyFlatVectorData (not stored in .store).
301    /// Uses binary search on sorted doc_ids for O(log N) lookup.
302    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
303        let mut doc = match self.store.get(local_doc_id, &self.schema).await {
304            Ok(Some(d)) => d,
305            Ok(None) => return Ok(None),
306            Err(e) => return Err(Error::from(e)),
307        };
308
309        // Hydrate dense vector fields from flat vector data
310        for (&field_id, lazy_flat) in &self.flat_vectors {
311            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
312            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
313                let flat_idx = start + j;
314                match lazy_flat.get_vector(flat_idx).await {
315                    Ok(vec) => {
316                        doc.add_dense_vector(Field(field_id), vec);
317                    }
318                    Err(e) => {
319                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
320                    }
321                }
322            }
323        }
324
325        Ok(Some(doc))
326    }
327
328    /// Prefetch term dictionary blocks for a key range
329    pub async fn prefetch_terms(
330        &self,
331        field: Field,
332        start_term: &[u8],
333        end_term: &[u8],
334    ) -> Result<()> {
335        let mut start_key = Vec::with_capacity(4 + start_term.len());
336        start_key.extend_from_slice(&field.0.to_le_bytes());
337        start_key.extend_from_slice(start_term);
338
339        let mut end_key = Vec::with_capacity(4 + end_term.len());
340        end_key.extend_from_slice(&field.0.to_le_bytes());
341        end_key.extend_from_slice(end_term);
342
343        self.term_dict.prefetch_range(&start_key, &end_key).await?;
344        Ok(())
345    }
346
347    /// Check if store uses dictionary compression (incompatible with raw merging)
348    pub fn store_has_dict(&self) -> bool {
349        self.store.has_dict()
350    }
351
352    /// Get store reference for merge operations
353    pub fn store(&self) -> &super::store::AsyncStoreReader {
354        &self.store
355    }
356
357    /// Get raw store blocks for optimized merging
358    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
359        self.store.raw_blocks()
360    }
361
362    /// Get store data slice for raw block access
363    pub fn store_data_slice(&self) -> &LazyFileSlice {
364        self.store.data_slice()
365    }
366
367    /// Get all terms from this segment (for merge)
368    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
369        self.term_dict.all_entries().await.map_err(Error::from)
370    }
371
372    /// Get all terms with parsed field and term string (for statistics aggregation)
373    ///
374    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
375    /// Skips terms that aren't valid UTF-8.
376    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
377        let entries = self.term_dict.all_entries().await?;
378        let mut result = Vec::with_capacity(entries.len());
379
380        for (key, term_info) in entries {
381            // Key format: field_id (4 bytes little-endian) + term bytes
382            if key.len() > 4 {
383                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
384                let term_bytes = &key[4..];
385                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
386                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
387                }
388            }
389        }
390
391        Ok(result)
392    }
393
394    /// Get streaming iterator over term dictionary (for memory-efficient merge)
395    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
396        self.term_dict.iter()
397    }
398
399    /// Prefetch all term dictionary blocks in a single bulk I/O call.
400    ///
401    /// Call before merge iteration to eliminate per-block cache misses.
402    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
403        self.term_dict
404            .prefetch_all_data_bulk()
405            .await
406            .map_err(crate::Error::from)
407    }
408
409    /// Read raw posting bytes at offset
410    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
411        let start = offset;
412        let end = start + len as u64;
413        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
414        Ok(bytes.to_vec())
415    }
416
417    /// Read raw position bytes at offset (for merge)
418    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
419        let handle = match &self.positions_handle {
420            Some(h) => h,
421            None => return Ok(None),
422        };
423        let start = offset;
424        let end = start + len as u64;
425        let bytes = handle.read_bytes_range(start..end).await?;
426        Ok(Some(bytes.to_vec()))
427    }
428
429    /// Check if this segment has a positions file
430    pub fn has_positions_file(&self) -> bool {
431        self.positions_handle.is_some()
432    }
433
434    /// Search dense vectors using RaBitQ
435    ///
436    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
437    /// The doc_ids are adjusted by doc_id_offset for this segment.
438    /// For multi-valued documents, scores are combined using the specified combiner.
439    pub async fn search_dense_vector(
440        &self,
441        field: Field,
442        query: &[f32],
443        k: usize,
444        nprobe: usize,
445        rerank_factor: usize,
446        combiner: crate::query::MultiValueCombiner,
447    ) -> Result<Vec<VectorSearchResult>> {
448        let ann_index = self.vector_indexes.get(&field.0);
449        let lazy_flat = self.flat_vectors.get(&field.0);
450
451        // No vectors at all for this field
452        if ann_index.is_none() && lazy_flat.is_none() {
453            return Ok(Vec::new());
454        }
455
456        /// Batch size for brute-force scoring (4096 vectors × 768 dims × 4 bytes ≈ 12MB)
457        const BRUTE_FORCE_BATCH: usize = 4096;
458
459        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
460        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
461            // ANN search (RaBitQ, IVF, ScaNN)
462            match index {
463                VectorIndex::RaBitQ(rabitq) => {
464                    let fetch_k = k * rerank_factor.max(1);
465                    rabitq
466                        .search(query, fetch_k, rerank_factor)
467                        .into_iter()
468                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
469                        .collect()
470                }
471                VectorIndex::IVF { index, codebook } => {
472                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
473                        Error::Schema("IVF index requires coarse centroids".to_string())
474                    })?;
475                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
476                    let fetch_k = k * rerank_factor.max(1);
477                    index
478                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
479                        .into_iter()
480                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
481                        .collect()
482                }
483                VectorIndex::ScaNN { index, codebook } => {
484                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
485                        Error::Schema("ScaNN index requires coarse centroids".to_string())
486                    })?;
487                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
488                    let fetch_k = k * rerank_factor.max(1);
489                    index
490                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
491                        .into_iter()
492                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
493                        .collect()
494                }
495            }
496        } else if let Some(lazy_flat) = lazy_flat {
497            // Batched brute-force from lazy flat vectors (native-precision SIMD scoring)
498            let dim = lazy_flat.dim;
499            let n = lazy_flat.num_vectors;
500            let quant = lazy_flat.quantization;
501            let fetch_k = k * rerank_factor.max(1);
502            let mut candidates: Vec<(u32, u16, f32)> = Vec::new();
503
504            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
505                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
506                let batch_bytes = lazy_flat
507                    .read_vectors_batch(batch_start, batch_count)
508                    .await
509                    .map_err(crate::Error::Io)?;
510                let raw = batch_bytes.as_slice();
511
512                let mut scores = vec![0f32; batch_count];
513
514                match quant {
515                    crate::dsl::DenseVectorQuantization::F32 => {
516                        let batch_floats = batch_count * dim;
517                        let mut aligned_buf: Vec<f32> = Vec::new();
518                        let vectors: &[f32] = if (raw.as_ptr() as usize)
519                            .is_multiple_of(std::mem::align_of::<f32>())
520                        {
521                            unsafe {
522                                std::slice::from_raw_parts(raw.as_ptr() as *const f32, batch_floats)
523                            }
524                        } else {
525                            aligned_buf.resize(batch_floats, 0.0);
526                            unsafe {
527                                std::ptr::copy_nonoverlapping(
528                                    raw.as_ptr(),
529                                    aligned_buf.as_mut_ptr() as *mut u8,
530                                    batch_floats * std::mem::size_of::<f32>(),
531                                );
532                            }
533                            &aligned_buf
534                        };
535                        crate::structures::simd::batch_cosine_scores(
536                            query,
537                            vectors,
538                            dim,
539                            &mut scores,
540                        );
541                    }
542                    crate::dsl::DenseVectorQuantization::F16 => {
543                        crate::structures::simd::batch_cosine_scores_f16(
544                            query,
545                            raw,
546                            dim,
547                            &mut scores,
548                        );
549                    }
550                    crate::dsl::DenseVectorQuantization::UInt8 => {
551                        crate::structures::simd::batch_cosine_scores_u8(
552                            query,
553                            raw,
554                            dim,
555                            &mut scores,
556                        );
557                    }
558                }
559
560                for (i, &score) in scores.iter().enumerate().take(batch_count) {
561                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
562                    candidates.push((doc_id, ordinal, score));
563                }
564            }
565
566            candidates.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
567            candidates.truncate(fetch_k);
568            candidates
569        } else {
570            return Ok(Vec::new());
571        };
572
573        // Rerank ANN candidates using raw vectors from lazy flat (binary search lookup)
574        // Uses native-precision SIMD scoring on quantized bytes — no dequantization overhead.
575        if ann_index.is_some()
576            && !results.is_empty()
577            && let Some(lazy_flat) = lazy_flat
578        {
579            let dim = lazy_flat.dim;
580            let quant = lazy_flat.quantization;
581            let vbs = lazy_flat.vector_byte_size();
582
583            // Resolve flat indexes for each candidate via binary search
584            let mut resolved: Vec<(usize, usize)> = Vec::new(); // (result_idx, flat_idx)
585            for (ri, c) in results.iter().enumerate() {
586                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
587                for (j, &(_, ord)) in entries.iter().enumerate() {
588                    if ord == c.1 {
589                        resolved.push((ri, start + j));
590                        break;
591                    }
592                }
593            }
594
595            if !resolved.is_empty() {
596                // Batch-read raw quantized bytes into contiguous buffer
597                let mut raw_buf = vec![0u8; resolved.len() * vbs];
598                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
599                    let _ = lazy_flat
600                        .read_vector_raw_into(
601                            flat_idx,
602                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
603                        )
604                        .await;
605                }
606
607                // Native-precision batch SIMD cosine scoring
608                let mut scores = vec![0f32; resolved.len()];
609                match quant {
610                    crate::dsl::DenseVectorQuantization::F32 => {
611                        let floats = resolved.len() * dim;
612                        let mut aligned_buf: Vec<f32> = Vec::new();
613                        let vectors: &[f32] = if (raw_buf.as_ptr() as usize)
614                            .is_multiple_of(std::mem::align_of::<f32>())
615                        {
616                            unsafe {
617                                std::slice::from_raw_parts(raw_buf.as_ptr() as *const f32, floats)
618                            }
619                        } else {
620                            aligned_buf.resize(floats, 0.0);
621                            unsafe {
622                                std::ptr::copy_nonoverlapping(
623                                    raw_buf.as_ptr(),
624                                    aligned_buf.as_mut_ptr() as *mut u8,
625                                    floats * std::mem::size_of::<f32>(),
626                                );
627                            }
628                            &aligned_buf
629                        };
630                        crate::structures::simd::batch_cosine_scores(
631                            query,
632                            vectors,
633                            dim,
634                            &mut scores,
635                        );
636                    }
637                    crate::dsl::DenseVectorQuantization::F16 => {
638                        crate::structures::simd::batch_cosine_scores_f16(
639                            query,
640                            &raw_buf,
641                            dim,
642                            &mut scores,
643                        );
644                    }
645                    crate::dsl::DenseVectorQuantization::UInt8 => {
646                        crate::structures::simd::batch_cosine_scores_u8(
647                            query,
648                            &raw_buf,
649                            dim,
650                            &mut scores,
651                        );
652                    }
653                }
654
655                // Write scores back to results
656                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
657                    results[ri].2 = scores[buf_idx];
658                }
659            }
660
661            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
662            results.truncate(k * rerank_factor.max(1));
663        }
664
665        // Track ordinals with individual scores for each doc_id
666        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
667        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
668            rustc_hash::FxHashMap::default();
669        for (doc_id, ordinal, score) in results {
670            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
671            ordinals.push((ordinal as u32, score));
672        }
673
674        // Combine scores and build results with ordinal tracking
675        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
676            .into_iter()
677            .map(|(doc_id, ordinals)| {
678                let combined_score = combiner.combine(&ordinals);
679                VectorSearchResult::new(doc_id, combined_score, ordinals)
680            })
681            .collect();
682
683        // Sort by score descending and take top k
684        final_results.sort_by(|a, b| {
685            b.score
686                .partial_cmp(&a.score)
687                .unwrap_or(std::cmp::Ordering::Equal)
688        });
689        final_results.truncate(k);
690
691        Ok(final_results)
692    }
693
694    /// Check if this segment has dense vectors for the given field
695    pub fn has_dense_vector_index(&self, field: Field) -> bool {
696        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
697    }
698
699    /// Get the dense vector index for a field (if available)
700    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
701        match self.vector_indexes.get(&field.0) {
702            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
703            _ => None,
704        }
705    }
706
707    /// Get the IVF vector index for a field (if available)
708    pub fn get_ivf_vector_index(
709        &self,
710        field: Field,
711    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
712        match self.vector_indexes.get(&field.0) {
713            Some(VectorIndex::IVF { index, codebook }) => Some((index.clone(), codebook.clone())),
714            _ => None,
715        }
716    }
717
718    /// Get coarse centroids (shared across IVF/ScaNN indexes)
719    pub fn coarse_centroids(&self) -> Option<&Arc<CoarseCentroids>> {
720        self.coarse_centroids.as_ref()
721    }
722
723    /// Get the ScaNN vector index for a field (if available)
724    pub fn get_scann_vector_index(
725        &self,
726        field: Field,
727    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
728        match self.vector_indexes.get(&field.0) {
729            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
730            _ => None,
731        }
732    }
733
734    /// Get the vector index type for a field
735    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
736        self.vector_indexes.get(&field.0)
737    }
738
739    /// Search for similar sparse vectors using dedicated sparse posting lists
740    ///
741    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
742    /// Optimizations (via WandExecutor):
743    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
744    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
745    /// 3. **Top-K heap**: Efficient score collection
746    ///
747    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
748    pub async fn search_sparse_vector(
749        &self,
750        field: Field,
751        vector: &[(u32, f32)],
752        limit: usize,
753        combiner: crate::query::MultiValueCombiner,
754        heap_factor: f32,
755    ) -> Result<Vec<VectorSearchResult>> {
756        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};
757
758        let query_tokens = vector.len();
759
760        // Get sparse index for this field
761        let sparse_index = match self.sparse_indexes.get(&field.0) {
762            Some(idx) => idx,
763            None => {
764                log::debug!(
765                    "Sparse vector search: no index for field {}, returning empty",
766                    field.0
767                );
768                return Ok(Vec::new());
769            }
770        };
771
772        let index_dimensions = sparse_index.num_dimensions();
773
774        // Filter query terms to only those present in the index
775        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
776        let mut missing_tokens = Vec::new();
777
778        for &(dim_id, query_weight) in vector {
779            if sparse_index.has_dimension(dim_id) {
780                matched_terms.push((dim_id, query_weight));
781            } else {
782                missing_tokens.push(dim_id);
783            }
784        }
785
786        log::debug!(
787            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
788            query_tokens,
789            matched_terms.len(),
790            missing_tokens.len(),
791            index_dimensions
792        );
793
794        if log::log_enabled!(log::Level::Debug) {
795            let query_details: Vec<_> = vector
796                .iter()
797                .take(30)
798                .map(|(id, w)| format!("{}:{:.3}", id, w))
799                .collect();
800            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
801        }
802
803        if !missing_tokens.is_empty() {
804            log::debug!(
805                "Missing token IDs (not in index): {:?}",
806                missing_tokens.iter().take(20).collect::<Vec<_>>()
807            );
808        }
809
810        if matched_terms.is_empty() {
811            log::debug!("Sparse vector search: no matching tokens, returning empty");
812            return Ok(Vec::new());
813        }
814
815        // Select executor based on number of query terms:
816        // - 12+ terms: BMP (block-at-a-time, lazy block loading, best for SPLADE)
817        // - 1-11 terms: BlockMaxScoreExecutor (unified MaxScore + block-max + conjunction)
818        let num_terms = matched_terms.len();
819        let over_fetch = limit * 2; // Over-fetch for multi-value combining
820        let raw_results = if num_terms > 12 {
821            // BMP: lazy block loading — only skip entries in memory, blocks loaded on-demand
822            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
823                .execute()
824                .await?
825        } else {
826            // Load posting lists only for the few terms (1-11) used by BlockMaxScore
827            let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
828                Vec::with_capacity(num_terms);
829            for &(dim_id, query_weight) in &matched_terms {
830                if let Some(pl) = sparse_index.get_posting(dim_id).await? {
831                    posting_lists.push((dim_id, query_weight, pl));
832                }
833            }
834            let scorers: Vec<SparseTermScorer> = posting_lists
835                .iter()
836                .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
837                .collect();
838            if scorers.is_empty() {
839                return Ok(Vec::new());
840            }
841            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
842        };
843
844        log::trace!(
845            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
846            raw_results.len(),
847            self.doc_id_offset
848        );
849        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
850            for r in raw_results.iter().take(5) {
851                log::trace!(
852                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
853                    r.doc_id,
854                    r.doc_id + self.doc_id_offset,
855                    r.score,
856                    r.ordinal
857                );
858            }
859        }
860
861        // Track ordinals with individual scores for each doc_id
862        // Now using real ordinals from the posting lists
863        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
864            rustc_hash::FxHashMap::default();
865        for r in raw_results {
866            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
867            ordinals.push((r.ordinal as u32, r.score));
868        }
869
870        // Combine scores and build results with ordinal tracking
871        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
872        let mut results: Vec<VectorSearchResult> = doc_ordinals
873            .into_iter()
874            .map(|(doc_id, ordinals)| {
875                let combined_score = combiner.combine(&ordinals);
876                VectorSearchResult::new(doc_id, combined_score, ordinals)
877            })
878            .collect();
879
880        // Sort by score descending and take top limit
881        results.sort_by(|a, b| {
882            b.score
883                .partial_cmp(&a.score)
884                .unwrap_or(std::cmp::Ordering::Equal)
885        });
886        results.truncate(limit);
887
888        Ok(results)
889    }
890
891    /// Get positions for a term (for phrase queries)
892    ///
893    /// Position offsets are now embedded in TermInfo, so we first look up
894    /// the term to get its TermInfo, then use position_info() to get the offset.
895    pub async fn get_positions(
896        &self,
897        field: Field,
898        term: &[u8],
899    ) -> Result<Option<crate::structures::PositionPostingList>> {
900        use std::io::Cursor;
901
902        // Get positions handle
903        let handle = match &self.positions_handle {
904            Some(h) => h,
905            None => return Ok(None),
906        };
907
908        // Build key: field_id + term
909        let mut key = Vec::with_capacity(4 + term.len());
910        key.extend_from_slice(&field.0.to_le_bytes());
911        key.extend_from_slice(term);
912
913        // Look up term in dictionary to get TermInfo with position offset
914        let term_info = match self.term_dict.get(&key).await? {
915            Some(info) => info,
916            None => return Ok(None),
917        };
918
919        // Get position offset from TermInfo
920        let (offset, length) = match term_info.position_info() {
921            Some((o, l)) => (o, l),
922            None => return Ok(None),
923        };
924
925        // Read the position data
926        let slice = handle.slice(offset..offset + length as u64);
927        let data = slice.read_bytes().await?;
928
929        // Deserialize
930        let mut cursor = Cursor::new(data.as_slice());
931        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;
932
933        Ok(Some(pos_list))
934    }
935
936    /// Check if positions are available for a field
937    pub fn has_positions(&self, field: Field) -> bool {
938        // Check schema for position mode on this field
939        if let Some(entry) = self.schema.get_field_entry(field) {
940            entry.positions.is_some()
941        } else {
942            false
943        }
944    }
945}
946
/// Alias for [`AsyncSegmentReader`]
///
/// Both names denote the same type, so they can be used interchangeably.
pub type SegmentReader = AsyncSegmentReader;