hermes_core/segment/reader/mod.rs

//! Async segment reader with lazy loading
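//!
//! A minimal usage sketch (assuming a `Directory` impl `dir`, a `SegmentId`,
//! and an `Arc<Schema>` are already in hand; the cache size is illustrative):
//!
//! ```ignore
//! let reader = AsyncSegmentReader::open(&dir, segment_id, schema, 0, 256).await?;
//! // Only the term-dictionary and store indexes are resident at this point;
//! // blocks are fetched on demand.
//! let doc = reader.doc(0).await?;
//! ```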

mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

/// Memory statistics for a single segment
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Segment ID
    pub segment_id: u128,
    /// Number of documents in segment
    pub num_docs: u32,
    /// Term dictionary block cache bytes
    pub term_dict_cache_bytes: usize,
    /// Document store block cache bytes
    pub store_cache_bytes: usize,
    /// Sparse vector index bytes (in-memory posting lists)
    pub sparse_index_bytes: usize,
    /// Dense vector index bytes (cluster assignments, quantized codes)
    pub dense_index_bytes: usize,
    /// Bloom filter bytes
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Total estimated memory for this segment
    pub fn total_bytes(&self) -> usize {
        self.term_dict_cache_bytes
            + self.store_cache_bytes
            + self.sparse_index_bytes
            + self.dense_index_bytes
            + self.bloom_filter_bytes
    }
}

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: LazyFileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Base doc_id offset for this segment
    doc_id_offset: DocId,
    /// Dense vector indexes per field (Flat, RaBitQ, IVF-RaBitQ, or ScaNN)
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Shared coarse centroids for IVF search (loaded once)
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    /// Sparse vector indexes per field
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Position file handle for phrase queries (lazy loading)
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
    /// Open a segment with lazy loading
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Read metadata (small, always loaded)
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Open term dictionary with lazy loading (fetches ranges on demand)
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        // Get postings file handle (lazy - fetches ranges on demand)
        let postings_handle = dir.open_lazy(&files.postings).await?;

        // Open store with lazy loading
        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Load dense vector indexes from unified .vectors file
        let (vector_indexes, coarse_centroids) =
            loader::load_vectors_file(dir, &files, &schema).await?;

        // Load sparse vector indexes from .sparse file
        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        // Open positions file handle (if exists) - offsets are now in TermInfo
        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        // Log segment loading stats (compact format: ~24 bytes per active dim in hashmap)
        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
        let sparse_mem = sparse_dims * 24; // HashMap entry overhead
        log::debug!(
            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, vectors={}",
            segment_id.0,
            meta.num_docs,
            sparse_dims,
            sparse_mem as f64 / 1024.0,
            vector_indexes.len()
        );

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Get average field length for BM25F scoring
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    /// Set the doc_id_offset (used for parallel segment loading)
    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get sparse indexes for all fields
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    /// Get vector indexes for all fields
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    /// Get term dictionary stats for debugging
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

    /// Estimate memory usage of this segment reader
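    ///
    /// A minimal sketch of reading the estimates (the figures are
    /// approximations, not exact allocations):
    ///
    /// ```ignore
    /// let stats = reader.memory_stats();
    /// let total_kib = stats.total_bytes() / 1024;
    /// log::debug!("segment {:016x}: ~{} KiB resident", stats.segment_id, total_kib);
    /// ```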
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

        // Term dict cache: num_blocks * avg_block_size (estimate 4KB per cached block)
        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        // Store cache: similar estimate
        let store_cache_bytes = self.store.cached_blocks() * 4096;

        // Sparse index: each dimension has a posting list in memory
        // Estimate: ~24 bytes per active dimension (HashMap entry overhead)
        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.num_dimensions() * 24)
            .sum();

        // Dense index: vectors are memory-mapped, but we track index structures
        // RaBitQ/IVF indexes have cluster assignments in memory
        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

    /// Get posting list for a term (async - loads on demand)
    ///
    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
    /// and no additional I/O is needed. For larger lists, reads from .post file.
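    ///
    /// A minimal usage sketch (`reader` and `field` assumed in scope):
    ///
    /// ```ignore
    /// match reader.get_postings(field, b"hello").await? {
    ///     Some(postings) => { /* iterate the posting list for scoring */ }
    ///     None => { /* term absent from this segment */ }
    /// }
    /// ```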
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Build key: field_id + term
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        // Look up in term dictionary
        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Check if posting list is inlined
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            // Build BlockPostingList from inline data (no I/O needed!)
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // External posting list - read from postings file handle (lazy - HTTP range request)
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    /// Get document by local doc_id (async - loads on demand)
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

    /// Prefetch term dictionary blocks for a key range
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    /// Check if store uses dictionary compression (incompatible with raw merging)
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Get store reference for merge operations
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    /// Get raw store blocks for optimized merging
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// Get store data slice for raw block access
    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    /// Get all terms from this segment (for merge)
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    /// Get all terms with parsed field and term string (for statistics aggregation)
    ///
    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
    /// Skips terms that aren't valid UTF-8.
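    ///
    /// Keys use the same layout as `get_postings` builds: a 4-byte little-endian
    /// field id followed by the raw term bytes. For example, field 1 and term
    /// `"rust"`:
    ///
    /// ```ignore
    /// // key = [0x01, 0x00, 0x00, 0x00, b'r', b'u', b's', b't']
    /// let mut key = Vec::new();
    /// key.extend_from_slice(&1u32.to_le_bytes());
    /// key.extend_from_slice(b"rust");
    /// ```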
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            // Key format: field_id (4 bytes little-endian) + term bytes
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    /// Get streaming iterator over term dictionary (for memory-efficient merge)
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    /// Prefetch all term dictionary blocks in a single bulk I/O call.
    ///
    /// Call before merge iteration to eliminate per-block cache misses.
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

    /// Read raw posting bytes at offset
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    /// Read raw position bytes at offset (for merge)
    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len as u64;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    /// Check if this segment has a positions file
    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

    /// Search dense vectors using the field's index (Flat, RaBitQ, IVF, or ScaNN)
    ///
    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
    /// The doc_ids are segment-local; the collector applies doc_id_offset uniformly.
    /// If mrl_dim is configured, the query vector is automatically trimmed.
    /// For multi-valued documents, scores are combined using the specified combiner.
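    ///
    /// A minimal usage sketch (`reader`, `field`, `query_vec`, and `combiner`
    /// assumed in scope; the numeric arguments are illustrative):
    ///
    /// ```ignore
    /// // k=10 results, nprobe=32 clusters (IVF/ScaNN; 0 falls back to 32),
    /// // rerank_factor=4 over-fetch. Results are sorted by combined score.
    /// let hits = reader.search_dense_vector(field, &query_vec, 10, 32, 4, combiner)?;
    /// ```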
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let index = match self.vector_indexes.get(&field.0) {
            Some(idx) => idx,
            None => return Ok(Vec::new()), // segment has no vectors for this field
        };

        // Get mrl_dim from config to trim query vector if needed
        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        // Trim query vector if mrl_dim is set
        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        // Results are (doc_id, ordinal, score) where score = similarity (higher = better)
        let results: Vec<(u32, u16, f32)> = match index {
            VectorIndex::Flat(flat_data) => {
                // Brute-force search using cosine similarity
                use crate::structures::simd::cosine_similarity;

                let mut candidates: Vec<(u32, u16, f32)> = (0..flat_data.num_vectors())
                    .map(|i| {
                        let vec = flat_data.get_vector(i);
                        let (doc_id, ordinal) = flat_data.get_doc_id(i);
                        let score = cosine_similarity(effective_query, vec);
                        (doc_id, ordinal, score)
                    })
                    .collect();
                // Sort by score descending (higher similarity = better)
                candidates
                    .sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
                // Use rerank_factor to fetch more candidates for multi-valued fields,
                // so grouping by doc_id still yields enough unique documents
                candidates.truncate(k * rerank_factor.max(1));
                candidates
            }
            VectorIndex::RaBitQ(rabitq) => {
                // RaBitQ returns (doc_id, ordinal, euclidean_dist) - convert to score
                rabitq
                    .search(effective_query, k, rerank_factor)
                    .into_iter()
                    .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                    .collect()
            }
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                // IVF returns euclidean distances - convert to scores
                index
                    .search(
                        centroids,
                        codebook,
                        effective_query,
                        k,
                        Some(effective_nprobe),
                    )
                    .into_iter()
                    .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                    .collect()
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                // ScaNN returns euclidean distances - convert to scores
                index
                    .search(
                        centroids,
                        codebook,
                        effective_query,
                        k,
                        Some(effective_nprobe),
                    )
                    .into_iter()
                    .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                    .collect()
            }
        };

        // Track ordinals with individual scores for each doc_id
        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, score) in results {
            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
            ordinals.push((ordinal as u32, score));
        }

        // Combine scores and build results with ordinal tracking
        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        // Sort by score descending and take top k
        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    /// Check if this segment has a dense vector index for the given field
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    /// Get the dense vector index for a field (if available)
    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    /// Get the IVF vector index for a field (if available)
    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    /// Get the ScaNN vector index for a field (if available)
    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    /// Get the vector index type for a field
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

    /// Search for similar sparse vectors using dedicated sparse posting lists
    ///
    /// Uses `BlockMaxScoreExecutor` (or `BmpExecutor` for large queries) with
    /// `SparseTermScorer` for efficient top-k retrieval. Optimizations:
    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
    /// 3. **Top-K heap**: Efficient score collection
    ///
    /// Returns VectorSearchResult with ordinal tracking for multi-value fields.
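    ///
    /// A minimal usage sketch (`reader`, `field`, and `combiner` assumed in
    /// scope; the query is a SPLADE-style list of `(dimension_id, weight)`
    /// pairs with illustrative values):
    ///
    /// ```ignore
    /// let query: Vec<(u32, f32)> = vec![(17, 0.82), (1032, 0.41)];
    /// let hits = reader
    ///     .search_sparse_vector(field, &query, 10, combiner, 1.0)
    ///     .await?;
    /// ```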
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};

        let query_tokens = vector.len();

        // Get sparse index for this field
        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        // Build scorers for each dimension that exists in the index
        // Load posting lists on-demand (lazy loading via mmap)
        // Keep Arc references alive for the duration of scoring
        let mut matched_tokens = Vec::new();
        let mut missing_tokens = Vec::new();
        let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
            Vec::with_capacity(vector.len());

        for &(dim_id, query_weight) in vector {
            // Check if dimension exists before loading
            if !sparse_index.has_dimension(dim_id) {
                missing_tokens.push(dim_id);
                continue;
            }

            // Load posting list on-demand (async, uses mmap)
            match sparse_index.get_posting(dim_id).await? {
                Some(pl) => {
                    matched_tokens.push(dim_id);
                    posting_lists.push((dim_id, query_weight, pl));
                }
                None => {
                    missing_tokens.push(dim_id);
                }
            }
        }

        // Create scorers from the loaded posting lists (borrows from posting_lists)
        let scorers: Vec<SparseTermScorer> = posting_lists
            .iter()
            .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
            .collect();

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_tokens.len(),
            missing_tokens.len(),
            index_dimensions
        );

        // Log query tokens with their IDs and weights
        if log::log_enabled!(log::Level::Debug) {
            let query_details: Vec<_> = vector
                .iter()
                .take(30)
                .map(|(id, w)| format!("{}:{:.3}", id, w))
                .collect();
            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
        }

        if !matched_tokens.is_empty() {
            log::debug!(
                "Matched token IDs: {:?}",
                matched_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if !missing_tokens.is_empty() {
            log::debug!(
                "Missing token IDs (not in index): {:?}",
                missing_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if scorers.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        // Select executor based on number of query terms:
        // - more than 12 terms: BMP (block-at-a-time, best for SPLADE expansions)
        // - 12 or fewer terms: BlockMaxScoreExecutor (unified MaxScore + block-max + conjunction)
        let num_terms = scorers.len();
        let over_fetch = limit * 2; // Over-fetch for multi-value combining
        let raw_results = if num_terms > 12 {
            // BMP: use posting lists directly (not iterators)
            let pl_refs: Vec<_> = posting_lists
                .iter()
                .map(|(_, _, pl)| Arc::clone(pl))
                .collect();
            let weights: Vec<_> = posting_lists.iter().map(|(_, qw, _)| *qw).collect();
            drop(scorers); // Release borrowing iterators before using posting_lists
            BmpExecutor::new(pl_refs, weights, over_fetch, heap_factor).execute()
        } else {
            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
        };

        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        // Track ordinals with individual scores for each doc_id
        // Now using real ordinals from the posting lists
        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        // Combine scores and build results with ordinal tracking
        // Note: doc_id_offset is NOT applied here - the collector applies it uniformly
        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        // Sort by score descending and take top limit
        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

    /// Get positions for a term (for phrase queries)
    ///
    /// Position offsets are now embedded in TermInfo, so we first look up
    /// the term to get its TermInfo, then use position_info() to get the offset.
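    ///
    /// A minimal usage sketch (`reader` and `field` assumed in scope); returns
    /// `None` if the segment has no positions file, the term is absent, or the
    /// term carries no position data:
    ///
    /// ```ignore
    /// if let Some(positions) = reader.get_positions(field, b"hello").await? {
    ///     // feed `positions` to the phrase-matching logic
    /// }
    /// ```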
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        // Get positions handle
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        // Build key: field_id + term
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        // Look up term in dictionary to get TermInfo with position offset
        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        // Get position offset from TermInfo
        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        // Read the position data
        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        // Deserialize
        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }

    /// Check if positions are available for a field
    pub fn has_positions(&self, field: Field) -> bool {
        // Check schema for position mode on this field
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

/// Alias for AsyncSegmentReader
pub type SegmentReader = AsyncSegmentReader;