mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

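/// Approximate heap usage for one open segment, broken down by component.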
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    pub segment_id: u128,
    pub num_docs: u32,
    pub term_dict_cache_bytes: usize,
    pub store_cache_bytes: usize,
    pub sparse_index_bytes: usize,
    pub dense_index_bytes: usize,
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
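    /// Sum of all tracked components.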
    pub fn total_bytes(&self) -> usize {
        self.term_dict_cache_bytes
            + self.store_cache_bytes
            + self.sparse_index_bytes
            + self.dense_index_bytes
            + self.bloom_filter_bytes
    }
}

use crate::structures::BlockSparsePostingList;

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::vector_data::LazyFlatVectorData;
use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

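/// Read-only view over a single on-disk segment: term dictionary, postings,
/// document store, positions, and dense/sparse vector structures.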
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
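    /// Lazy handle to the postings file; posting lists are read by byte range.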
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    doc_id_offset: DocId,
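    /// Per-field ANN indexes (RaBitQ, IVF, ScaNN), keyed by field id.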
    vector_indexes: FxHashMap<u32, VectorIndex>,
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
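    /// Coarse centroids shared by the IVF-style indexes, when present.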
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
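    /// Present only when at least one field indexes positions.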
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
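    /// Opens all of a segment's files and loads its metadata, term dictionary,
    /// store, and vector/sparse structures. `cache_blocks` bounds the block
    /// caches of the term dictionary and document store.
    ///
    /// A minimal usage sketch with hypothetical values (assumes an existing
    /// `Directory` impl and `Schema`):
    ///
    /// ```ignore
    /// let reader = AsyncSegmentReader::open(&dir, segment_id, schema, 0, 256).await?;
    /// println!("segment holds {} docs", reader.num_docs());
    /// ```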
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

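        // Read and decode the segment metadata, checking it matches the requested id.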
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

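        // Dense-vector structures: ANN indexes, lazily loaded flat vectors,
        // and optional coarse centroids shared by IVF-style indexes.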
        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;
        let coarse_centroids = vectors_data.coarse_centroids;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

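        // Startup log with a rough estimate of 24 bytes per sparse dimension.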
        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
        let sparse_mem = sparse_dims * 24;
        log::debug!(
            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, dense_flat={}, dense_ann={}",
            segment_id.0,
            meta.num_docs,
            sparse_dims,
            sparse_mem as f64 / 1024.0,
            flat_vectors.len(),
            vector_indexes.len()
        );

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            flat_vectors,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

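    /// Estimates the heap currently held by this segment's caches and
    /// in-memory index structures.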
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

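        // Cache usage is counted in 4 KiB blocks.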
        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        let store_cache_bytes = self.store.cached_blocks() * 4096;

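        // Same ~24-bytes-per-dimension estimate as the startup log in `open`.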
        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.num_dimensions() * 24)
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

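    /// Looks up the posting list for `term` in `field`. Short lists decode from
    /// inline TermInfo data; larger ones are read from the postings file.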
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

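        // Term-dictionary keys are the little-endian field id followed by the raw term bytes.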
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

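        // Fast path: short posting lists are stored inline in the TermInfo.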
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

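    /// Fetches a stored document by segment-local doc id and re-hydrates its
    /// dense-vector fields from the flat vector data.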
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        let mut doc = match self.store.get(local_doc_id, &self.schema).await {
            Ok(Some(d)) => d,
            Ok(None) => return Ok(None),
            Err(e) => return Err(Error::from(e)),
        };

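        // Dense vectors live outside the document store; copy each one back onto
        // the document, logging (but not failing on) vectors that cannot load.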
        for (&field_id, lazy_flat) in &self.flat_vectors {
            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }

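    /// Prefetches term-dictionary blocks covering `[start_term, end_term]` for `field`.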
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

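    /// Returns every `(field, term, doc_freq)` triple in the term dictionary.
    /// Entries with truncated keys or non-UTF-8 terms are skipped.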
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

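    /// Bulk-prefetches the entire term dictionary into its block cache.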
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

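    /// Reads raw posting-list bytes at `offset..offset + len` from the postings file.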
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

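    /// Reads raw position bytes, or returns `None` when the segment has no positions file.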
    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len as u64;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

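    /// Dense k-NN search over this segment.
    ///
    /// Uses the field's ANN index when present (RaBitQ, IVF, or ScaNN; distances
    /// are mapped to scores as `1.0 / (1.0 + dist)`), then reranks candidates
    /// with exact cosine scores from the flat vectors. Without an ANN index, the
    /// flat vectors are brute-forced in batches. Per-ordinal scores of
    /// multi-valued fields are merged with `combiner`; `nprobe == 0` falls back
    /// to 32 probes for IVF/ScaNN.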
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

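        // Number of vectors scored per batch during brute-force scans.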
        const BRUTE_FORCE_BATCH: usize = 4096;

        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(rabitq) => {
                    let fetch_k = k * rerank_factor.max(1);
                    rabitq
                        .search(query, fetch_k, rerank_factor)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF { index, codebook } => {
                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                        Error::Schema("IVF index requires coarse centroids".to_string())
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN { index, codebook } => {
                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                        Error::Schema("ScaNN index requires coarse centroids".to_string())
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let fetch_k = k * rerank_factor.max(1);
            let mut candidates: Vec<(u32, u16, f32)> = Vec::new();

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                let mut scores = vec![0f32; batch_count];

                match quant {
                    crate::dsl::DenseVectorQuantization::F32 => {
                        let batch_floats = batch_count * dim;
                        let mut aligned_buf: Vec<f32> = Vec::new();
                        let vectors: &[f32] = if (raw.as_ptr() as usize)
                            .is_multiple_of(std::mem::align_of::<f32>())
                        {
                            unsafe {
                                std::slice::from_raw_parts(raw.as_ptr() as *const f32, batch_floats)
                            }
                        } else {
                            aligned_buf.resize(batch_floats, 0.0);
                            unsafe {
                                std::ptr::copy_nonoverlapping(
                                    raw.as_ptr(),
                                    aligned_buf.as_mut_ptr() as *mut u8,
                                    batch_floats * std::mem::size_of::<f32>(),
                                );
                            }
                            &aligned_buf
                        };
                        crate::structures::simd::batch_cosine_scores(
                            query,
                            vectors,
                            dim,
                            &mut scores,
                        );
                    }
                    crate::dsl::DenseVectorQuantization::F16 => {
                        crate::structures::simd::batch_cosine_scores_f16(
                            query,
                            raw,
                            dim,
                            &mut scores,
                        );
                    }
                    crate::dsl::DenseVectorQuantization::UInt8 => {
                        crate::structures::simd::batch_cosine_scores_u8(
                            query,
                            raw,
                            dim,
                            &mut scores,
                        );
                    }
                }

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    candidates.push((doc_id, ordinal, score));
                }
            }

            candidates.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            candidates.truncate(fetch_k);
            candidates
        } else {
            return Ok(Vec::new());
        };

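        // ANN candidates carry approximate scores; rerank them with exact cosine
        // scores computed from the stored flat vectors when those are available.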
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

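            // Map each (doc_id, ordinal) candidate to its index in the flat file.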
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            if !resolved.is_empty() {
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let mut scores = vec![0f32; resolved.len()];
                match quant {
                    crate::dsl::DenseVectorQuantization::F32 => {
                        let floats = resolved.len() * dim;
                        let mut aligned_buf: Vec<f32> = Vec::new();
                        let vectors: &[f32] = if (raw_buf.as_ptr() as usize)
                            .is_multiple_of(std::mem::align_of::<f32>())
                        {
                            unsafe {
                                std::slice::from_raw_parts(raw_buf.as_ptr() as *const f32, floats)
                            }
                        } else {
                            aligned_buf.resize(floats, 0.0);
                            unsafe {
                                std::ptr::copy_nonoverlapping(
                                    raw_buf.as_ptr(),
                                    aligned_buf.as_mut_ptr() as *mut u8,
                                    floats * std::mem::size_of::<f32>(),
                                );
                            }
                            &aligned_buf
                        };
                        crate::structures::simd::batch_cosine_scores(
                            query,
                            vectors,
                            dim,
                            &mut scores,
                        );
                    }
                    crate::dsl::DenseVectorQuantization::F16 => {
                        crate::structures::simd::batch_cosine_scores_f16(
                            query,
                            &raw_buf,
                            dim,
                            &mut scores,
                        );
                    }
                    crate::dsl::DenseVectorQuantization::UInt8 => {
                        crate::structures::simd::batch_cosine_scores_u8(
                            query,
                            &raw_buf,
                            dim,
                            &mut scores,
                        );
                    }
                }

                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(k * rerank_factor.max(1));
        }

        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, score) in results {
            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn coarse_centroids(&self) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.as_ref()
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

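    /// Sparse-vector search over this segment.
    ///
    /// Query dimensions missing from the index are dropped up front. Queries with
    /// more than 12 surviving terms run on the BMP executor; smaller ones use
    /// block-max WAND over per-dimension posting lists. Both over-fetch
    /// `2 * limit` candidates so per-ordinal scores of multi-valued fields can be
    /// merged with `combiner` before the final top-`limit` cut.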
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
        let mut missing_tokens = Vec::new();

        for &(dim_id, query_weight) in vector {
            if sparse_index.has_dimension(dim_id) {
                matched_terms.push((dim_id, query_weight));
            } else {
                missing_tokens.push(dim_id);
            }
        }

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_terms.len(),
            missing_tokens.len(),
            index_dimensions
        );

        if log::log_enabled!(log::Level::Debug) {
            let query_details: Vec<_> = vector
                .iter()
                .take(30)
                .map(|(id, w)| format!("{}:{:.3}", id, w))
                .collect();
            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
        }

        if !missing_tokens.is_empty() {
            log::debug!(
                "Missing token IDs (not in index): {:?}",
                missing_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if matched_terms.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

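        // Over-fetch 2x the limit; per-ordinal scores are combined per document
        // below. Many-term queries go through BMP, the rest through block-max WAND.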
        let num_terms = matched_terms.len();
        let over_fetch = limit * 2;
        let raw_results = if num_terms > 12 {
            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute()
                .await?
        } else {
            let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
                Vec::with_capacity(num_terms);
            for &(dim_id, query_weight) in &matched_terms {
                if let Some(pl) = sparse_index.get_posting(dim_id).await? {
                    posting_lists.push((dim_id, query_weight, pl));
                }
            }
            let scorers: Vec<SparseTermScorer> = posting_lists
                .iter()
                .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
                .collect();
            if scorers.is_empty() {
                return Ok(Vec::new());
            }
            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
        };

        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    " Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

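    /// Loads the position posting list for `term`, or `None` when the segment has
    /// no positions file, the term is unknown, or the term stores no positions.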
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }

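    /// True when the schema enables positions for `field`.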
    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

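/// Convenience alias for [`AsyncSegmentReader`].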
pub type SegmentReader = AsyncSegmentReader;