mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

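/// Estimated memory usage of a single loaded segment.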
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    pub segment_id: u128,
    pub num_docs: u32,
    pub term_dict_cache_bytes: usize,
    pub store_cache_bytes: usize,
    pub sparse_index_bytes: usize,
    pub dense_index_bytes: usize,
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
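    /// Total of all tracked byte counts.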
    pub fn total_bytes(&self) -> usize {
        self.term_dict_cache_bytes
            + self.store_cache_bytes
            + self.sparse_index_bytes
            + self.dense_index_bytes
            + self.bloom_filter_bytes
    }
}

use crate::structures::BlockSparsePostingList;

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::vector_data::LazyFlatVectorData;
use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

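/// Read-only view over one on-disk segment: term dictionary, postings,
/// document store, plus optional dense/sparse vector indexes and positions.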
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    doc_id_offset: DocId,
    vector_indexes: FxHashMap<u32, VectorIndex>,
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
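    /// Opens every file of `segment_id` under `dir` and wires up the readers.
    /// `cache_blocks` bounds the block caches of the term dictionary and the
    /// document store.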
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            for (field_id, idx) in &sparse_indexes {
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            log::debug!("{}", parts.join(", "));
        }

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            flat_vectors,
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

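    /// Estimates this segment's resident memory from cache occupancy and
    /// per-index heuristics; see the inline comments below.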
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

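        // Cache residency estimated as cached block count * assumed 4 KiB block size.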
        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        let store_cache_bytes = self.store.cached_blocks() * 4096;

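        // Rough estimate: ~24 bytes of in-memory bookkeeping per sparse
        // dimension (the same figure logged in `open`).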
        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.num_dimensions() * 24)
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

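    /// Looks up `term` in `field`'s term dictionary and returns its posting
    /// list. Small lists are decoded from the inline `TermInfo` encoding;
    /// larger ones are read from the postings file at the recorded offset.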
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }

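    /// Fetches the stored document with the given segment-local id,
    /// hydrating all flat dense-vector fields.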
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }

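    /// Like [`Self::doc`], but if `fields` is `Some`, only dense-vector
    /// fields in the set are hydrated from flat vector data.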
    pub async fn doc_with_fields(
        &self,
        local_doc_id: DocId,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<Document>> {
        let mut doc = match self.store.get(local_doc_id, &self.schema).await {
            Ok(Some(d)) => d,
            Ok(None) => return Ok(None),
            Err(e) => return Err(Error::from(e)),
        };

        for (&field_id, lazy_flat) in &self.flat_vectors {
            if let Some(set) = fields
                && !set.contains(&field_id)
            {
                continue;
            }

            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }

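    /// Prefetches term-dictionary blocks for the key range
    /// `[start_term, end_term]` of `field`, warming the cache before range
    /// scans.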
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

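    /// Returns `(field, term, doc_freq)` for every entry in the term
    /// dictionary, skipping terms that are not valid UTF-8.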
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

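    /// Prefetches the entire term dictionary in bulk, e.g. before a full
    /// term scan.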
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len as u64;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

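    /// Scores a contiguous batch of stored vectors against `query` with the
    /// SIMD cosine kernels, dispatching on the field's quantization.
    /// `scores.len()` determines the batch size; for `F32` the raw bytes are
    /// reinterpreted as `&[f32]`, so they must be 4-byte aligned.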
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
    ) {
        match quant {
            crate::dsl::DenseVectorQuantization::F32 => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                crate::structures::simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            crate::dsl::DenseVectorQuantization::F16 => {
                crate::structures::simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            crate::dsl::DenseVectorQuantization::UInt8 => {
                crate::structures::simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
        }
    }

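    /// Two-phase dense vector search over this segment.
    ///
    /// Phase 1 gathers `k * rerank_factor` candidates, either from the
    /// field's ANN index (RaBitQ, IVF, or ScaNN) or, when only flat vectors
    /// exist, by brute-force scoring them in batches. Phase 2 rescores ANN
    /// candidates exactly against the flat vectors, then per-ordinal scores
    /// are merged into one score per document via `combiner`.
    ///
    /// ```ignore
    /// // Illustrative only; assumes an opened reader, a query vector, and a combiner.
    /// let hits = reader
    ///     .search_dense_vector(Field(3), &query, 10, 32, 4, combiner)
    ///     .await?;
    /// ```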
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        const BRUTE_FORCE_BATCH: usize = 4096;

        let t0 = std::time::Instant::now();
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(rabitq) => {
                    let fetch_k = k * rerank_factor.max(1);
                    rabitq
                        .search(query, fetch_k, rerank_factor)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let fetch_k = k * rerank_factor.max(1);
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(query, raw, quant, dim, &mut scores[..batch_count]);

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each candidate's (doc_id, ordinal) pair back to its index in
            // the flat vector file.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores);
                let score_elapsed = t_score.elapsed();

                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(k * rerank_factor.max(1));
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

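        // Group per-(doc, ordinal) scores by document and combine multi-value
        // hits into a single score per document.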
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, score) in results {
            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }

    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

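    /// Sparse vector search over this segment.
    ///
    /// Query dimensions missing from the index are dropped up front. Queries
    /// with more than 12 surviving terms run on `BmpExecutor`; smaller ones
    /// build per-dimension scorers and run on `BlockMaxScoreExecutor`. Both
    /// over-fetch `2 * limit` candidates, which are then grouped per document
    /// and combined via `combiner`.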
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
        let mut missing_count = 0usize;

        for &(dim_id, query_weight) in vector {
            if sparse_index.has_dimension(dim_id) {
                matched_terms.push((dim_id, query_weight));
            } else {
                missing_count += 1;
            }
        }

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_terms.len(),
            missing_count,
            index_dimensions
        );

        if matched_terms.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        let num_terms = matched_terms.len();
        // Over-fetch beyond `limit` so enough candidates survive per-document
        // combining and final truncation.
        let over_fetch = limit * 2;
        let raw_results = if num_terms > 12 {
            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute()
                .await?
        } else {
            let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
                Vec::with_capacity(num_terms);
            for &(dim_id, query_weight) in &matched_terms {
                if let Some(pl) = sparse_index.get_posting(dim_id).await? {
                    posting_lists.push((dim_id, query_weight, pl));
                }
            }
            let scorers: Vec<SparseTermScorer> = posting_lists
                .iter()
                .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
                .collect();
            if scorers.is_empty() {
                return Ok(Vec::new());
            }
            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
        };

        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    " Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

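    /// Reads the position posting list for `term` in `field`, if this
    /// segment has a positions file and the term records position data.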
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;

        Ok(Some(pos_list))
    }

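    /// Whether the schema records token positions for `field`.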
    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

pub type SegmentReader = AsyncSegmentReader;