mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

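/// Memory accounting for one open segment: block-cache residency plus the
/// estimated in-memory footprint of its vector indexes.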
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Segment identifier these stats describe.
    pub segment_id: u128,
    /// Number of documents in the segment.
    pub num_docs: u32,
    /// Bytes held by the term-dictionary block cache.
    pub term_dict_cache_bytes: usize,
    /// Bytes held by the document-store block cache.
    pub store_cache_bytes: usize,
    /// Estimated in-memory footprint of sparse vector indexes.
    pub sparse_index_bytes: usize,
    /// Estimated in-memory footprint of dense vector indexes.
    pub dense_index_bytes: usize,
    /// Size of the term-dictionary bloom filter.
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Total bytes across all tracked components.
    pub fn total_bytes(&self) -> usize {
        self.term_dict_cache_bytes
            + self.store_cache_bytes
            + self.sparse_index_bytes
            + self.dense_index_bytes
            + self.bloom_filter_bytes
    }
}

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};
use super::vector_data::LazyFlatVectorData;
use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

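/// Async reader over one immutable segment. Component files are opened as
/// lazy handles, so payload bytes (postings, documents, vectors, positions)
/// are only fetched on demand.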
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazy handle to the postings file; byte ranges are fetched per term.
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Offset added to segment-local doc ids to form index-wide doc ids.
    doc_id_offset: DocId,
    /// Dense ANN indexes (RaBitQ / IVF / ScaNN), keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Full-precision flat vectors used for brute force and reranking.
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Shared coarse centroids for IVF-style indexes, set after open.
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Present only when at least one field stores positions.
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
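    /// Opens a segment from `dir`. Only the metadata file is read eagerly;
    /// the term dictionary and store each get a block cache of
    /// `cache_blocks` blocks, and everything else stays on disk until used.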
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Metadata is small and always needed, so read it eagerly.
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        // Rough estimate: ~24 bytes of in-memory bookkeeping per sparse dimension.
        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
        let sparse_mem = sparse_dims * 24;
        log::debug!(
            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, dense_flat={}, dense_ann={}",
            segment_id.0,
            meta.num_docs,
            sparse_dims,
            sparse_mem as f64 / 1024.0,
            flat_vectors.len(),
            vector_indexes.len()
        );

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            flat_vectors,
            // Populated later via `set_coarse_centroids`.
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

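    /// Snapshot of this segment's current memory use. Cache figures assume
    /// 4 KiB cache blocks; index figures are estimates, not exact counts.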
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

        // Block caches: resident block count times the 4 KiB block size.
        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
        let store_cache_bytes = self.store.cached_blocks() * 4096;

        // Same per-dimension heuristic as the estimate logged in `open`.
        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.num_dimensions() * 24)
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

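    /// Looks up the posting list for `(field, term)`. Short lists are decoded
    /// from bytes inlined in the `TermInfo`; longer ones are read from the
    /// postings file at the recorded offset.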
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Dictionary keys are the little-endian field id followed by the term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Fast path: postings small enough to be inlined in the TermInfo itself.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

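    /// Fetches a stored document by segment-local id and re-hydrates its
    /// dense vector fields from the flat vector data, which lives outside
    /// the document store.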
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        let mut doc = match self.store.get(local_doc_id, &self.schema).await {
            Ok(Some(d)) => d,
            Ok(None) => return Ok(None),
            Err(e) => return Err(Error::from(e)),
        };

        for (&field_id, lazy_flat) in &self.flat_vectors {
            // One document may hold several vectors (ordinals) for a field.
            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }

    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

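    /// Enumerates the dictionary as `(field, term, doc_freq)` tuples,
    /// skipping terms whose bytes are not valid UTF-8.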
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            // Keys of 4 bytes or fewer carry a field id but no term bytes.
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

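    /// Warms the term-dictionary block cache with a single bulk read of the
    /// whole data section.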
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len as u64;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

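    /// Scores a packed batch of stored vectors against `query`, dispatching
    /// to the SIMD kernel for the field's quantization. `raw` must contain
    /// exactly `scores.len()` vectors of dimension `dim`.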
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
    ) {
        match quant {
            crate::dsl::DenseVectorQuantization::F32 => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                // SAFETY: alignment is asserted above, and callers pass `raw`
                // holding exactly `scores.len()` whole vectors of `dim` f32s.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                crate::structures::simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            crate::dsl::DenseVectorQuantization::F16 => {
                crate::structures::simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            crate::dsl::DenseVectorQuantization::UInt8 => {
                crate::structures::simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
        }
    }

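    /// Dense vector search over one field. With an ANN index (RaBitQ, IVF,
    /// or ScaNN) it fetches `k * rerank_factor` approximate candidates and,
    /// when flat vectors are also present, reranks them with exact scores.
    /// With only flat vectors it brute-forces in batches. Per-ordinal scores
    /// of multi-valued fields are merged by `combiner`.
    ///
    /// Sketch of a call site (field id and tuning values are illustrative):
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_dense_vector(Field(3), &query_vec, 10, 32, 4, combiner)
    ///     .await?;
    /// ```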
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // Batch size for the brute-force scan over flat vectors.
        const BRUTE_FORCE_BATCH: usize = 4096;

        let t0 = std::time::Instant::now();
        // L1: gather approximate candidates as (doc_id, ordinal, score).
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(rabitq) => {
                    let fetch_k = k * rerank_factor.max(1);
                    rabitq
                        .search(query, fetch_k, rerank_factor)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let fetch_k = k * rerank_factor.max(1);
            let mut collector = crate::query::ScoreCollector::new(fetch_k);

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                let mut scores = vec![0f32; batch_count];
                Self::score_quantized_batch(query, raw, quant, dim, &mut scores);

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        // L2: rerank ANN candidates with exact scores from the flat vectors.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each candidate (doc_id, ordinal) back to its flat-vector index.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                // Sort by flat index so file reads are sequential.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores);
                let score_elapsed = t_score.elapsed();

                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(k * rerank_factor.max(1));
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

        // Group per-ordinal scores by document and combine into one score each.
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, score) in results {
            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }

    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

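    /// Sparse vector search (e.g. learned-sparse embeddings) over the
    /// field's per-dimension posting lists. Query dimensions absent from the
    /// index are dropped up front; queries with more than 12 matched
    /// dimensions run on the BMP executor, narrower ones on block-max
    /// MaxScore. Candidates are over-fetched at twice `limit` so that
    /// per-document combining can still fill the final top-k.
    ///
    /// Sketch of a call site (ids, weights, and `heap_factor` are
    /// illustrative):
    ///
    /// ```ignore
    /// let query: Vec<(u32, f32)> = vec![(17, 0.82), (942, 0.31)];
    /// let hits = reader
    ///     .search_sparse_vector(Field(5), &query, 10, combiner, 1.0)
    ///     .await?;
    /// ```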
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        // Split query dimensions into those present in the index and those missing.
        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
        let mut missing_tokens = Vec::new();

        for &(dim_id, query_weight) in vector {
            if sparse_index.has_dimension(dim_id) {
                matched_terms.push((dim_id, query_weight));
            } else {
                missing_tokens.push(dim_id);
            }
        }

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_terms.len(),
            missing_tokens.len(),
            index_dimensions
        );

        if log::log_enabled!(log::Level::Debug) {
            let query_details: Vec<_> = vector
                .iter()
                .take(30)
                .map(|(id, w)| format!("{}:{:.3}", id, w))
                .collect();
            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
        }

        if !missing_tokens.is_empty() {
            log::debug!(
                "Missing token IDs (not in index): {:?}",
                missing_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if matched_terms.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        let num_terms = matched_terms.len();
        // Over-fetch so per-document combining can still fill the final top-k.
        let over_fetch = limit * 2;
        let raw_results = if num_terms > 12 {
            // Wide queries go through the BMP executor.
            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute()
                .await?
        } else {
            // Narrow queries use block-max MaxScore over per-dimension scorers.
            let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
                Vec::with_capacity(num_terms);
            for &(dim_id, query_weight) in &matched_terms {
                if let Some(pl) = sparse_index.get_posting(dim_id).await? {
                    posting_lists.push((dim_id, query_weight, pl));
                }
            }
            let scorers: Vec<SparseTermScorer> = posting_lists
                .iter()
                .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
                .collect();
            if scorers.is_empty() {
                return Ok(Vec::new());
            }
            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
        };

        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    " Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        // Group per-ordinal scores by document and combine, as in the dense path.
        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

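    /// Loads the position posting list for `(field, term)`. Returns `None`
    /// when the segment has no positions file, the term is missing, or the
    /// term has no recorded positions.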
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;

        Ok(Some(pos_list))
    }

    /// Whether the schema records positions for `field`.
    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

pub type SegmentReader = AsyncSegmentReader;