mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    pub segment_id: u128,
    pub num_docs: u32,
    pub term_dict_cache_bytes: usize,
    pub store_cache_bytes: usize,
    pub sparse_index_bytes: usize,
    pub dense_index_bytes: usize,
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
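    /// Sum of all tracked components. A minimal usage sketch (the `reader`
    /// value is assumed to be an open `AsyncSegmentReader`):
    ///
    /// ```ignore
    /// let stats = reader.memory_stats();
    /// log::info!("segment {:x} uses {} bytes", stats.segment_id, stats.total_bytes());
    /// ```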
    pub fn total_bytes(&self) -> usize {
        self.term_dict_cache_bytes
            + self.store_cache_bytes
            + self.sparse_index_bytes
            + self.dense_index_bytes
            + self.bloom_filter_bytes
    }
}

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};
use super::vector_data::LazyFlatVectorData;
use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};
/// Read-only view over a single on-disk segment: term dictionary, postings,
/// document store, and optional dense/sparse vector indexes. All file access
/// is lazy and asynchronous.
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    doc_id_offset: DocId,
    vector_indexes: FxHashMap<u32, VectorIndex>,
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
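    /// Opens all files of an on-disk segment and wires up lazy readers for the
    /// term dictionary, postings, store, vectors, and positions. A minimal
    /// usage sketch, assuming a `Directory` implementation, a segment id, and
    /// a schema are already at hand (variable names are hypothetical):
    ///
    /// ```ignore
    /// let reader = AsyncSegmentReader::open(&dir, segment_id, schema.clone(), 0, 256).await?;
    /// println!("opened segment with {} docs", reader.num_docs());
    /// ```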
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;
        let coarse_centroids = vectors_data.coarse_centroids;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
        // Rough estimate: ~24 bytes of in-memory bookkeeping per sparse dimension.
        let sparse_mem = sparse_dims * 24;
        log::debug!(
            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, dense_flat={}, dense_ann={}",
            segment_id.0,
            meta.num_docs,
            sparse_dims,
            sparse_mem as f64 / 1024.0,
            flat_vectors.len(),
            vector_indexes.len()
        );

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            flat_vectors,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Average length (in tokens) of `field` across the segment's documents.
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    /// Offset added to this segment's local doc ids to obtain global doc ids.
    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Sparse vector indexes keyed by field id.
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    /// Dense ANN indexes keyed by field id.
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    /// Lazily loaded flat (exact) vector data keyed by field id.
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

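    /// Best-effort estimate of this segment's resident memory. Cache usage is
    /// counted as 4 KiB per cached block and sparse bookkeeping at ~24 bytes
    /// per dimension, so treat the numbers as indicative rather than exact.
    ///
    /// A minimal usage sketch:
    ///
    /// ```ignore
    /// let stats = reader.memory_stats();
    /// println!("segment {:x}: {} bytes resident", stats.segment_id, stats.total_bytes());
    /// ```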
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

        // Block caches hold fixed-size 4 KiB blocks.
        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        let store_cache_bytes = self.store.cached_blocks() * 4096;

        // ~24 bytes of in-memory bookkeeping per sparse dimension.
        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.num_dimensions() * 24)
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

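    /// Looks up the posting list for `term` in `field`. The dictionary key is
    /// the little-endian field id followed by the raw term bytes; small
    /// posting lists decode inline from the `TermInfo`, larger ones are read
    /// from the postings file at the recorded offset.
    ///
    /// A minimal usage sketch (field id and term are illustrative):
    ///
    /// ```ignore
    /// if let Some(postings) = reader.get_postings(Field(0), b"rust").await? {
    ///     // iterate or score `postings` here
    /// }
    /// ```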
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

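    /// Fetches a stored document by segment-local doc id, then re-hydrates any
    /// dense vector fields from the flat vector data (vectors are stored
    /// outside the document store). Returns `Ok(None)` if the doc id is not
    /// present.
    ///
    /// ```ignore
    /// if let Some(doc) = reader.doc(local_doc_id).await? {
    ///     // `doc` now includes its dense vector fields
    /// }
    /// ```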
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        let mut doc = match self.store.get(local_doc_id, &self.schema).await {
            Ok(Some(d)) => d,
            Ok(None) => return Ok(None),
            Err(e) => return Err(Error::from(e)),
        };

        for (&field_id, lazy_flat) in &self.flat_vectors {
            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }

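    /// Warms the term-dictionary block cache for all keys of `field` between
    /// `start_term` and `end_term`, which can help before range or prefix
    /// scans. A sketch with illustrative bounds:
    ///
    /// ```ignore
    /// reader.prefetch_terms(Field(0), b"aa", b"az").await?;
    /// ```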
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

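    /// Decodes every dictionary entry into `(field, term, doc_freq)`. Keys of
    /// four bytes or fewer (a field id with an empty term) and terms that are
    /// not valid UTF-8 are skipped.
    ///
    /// ```ignore
    /// for (field, term, doc_freq) in reader.all_terms_with_stats().await? {
    ///     println!("{}/{}: {}", field.0, term, doc_freq);
    /// }
    /// ```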
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len as u64;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

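    /// Scores a contiguous batch of stored vectors against `query` with the
    /// SIMD cosine kernels, dispatching on the field's quantization. `raw`
    /// must hold exactly `scores.len()` vectors of dimension `dim`; for `F32`
    /// the bytes are reinterpreted in place, which requires the 4-byte
    /// alignment asserted below.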
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
    ) {
        match quant {
            crate::dsl::DenseVectorQuantization::F32 => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                // SAFETY: alignment is checked above and `raw` is sized for
                // `num_floats` f32 values.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                crate::structures::simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            crate::dsl::DenseVectorQuantization::F16 => {
                crate::structures::simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            crate::dsl::DenseVectorQuantization::UInt8 => {
                crate::structures::simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
        }
    }

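    /// Dense vector search over this segment. If an ANN index (RaBitQ, IVF,
    /// or ScaNN) exists for the field it supplies candidates, which are then
    /// re-scored against the exact flat vectors when those are present;
    /// without an ANN index the flat vectors are scanned brute-force in
    /// batches. Per-ordinal scores of multi-valued fields are collapsed per
    /// document via `combiner`.
    ///
    /// A minimal usage sketch (parameter values are illustrative):
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_dense_vector(field, &query_vec, 10, 32, 4, combiner)
    ///     .await?;
    /// for hit in hits {
    ///     println!("doc={} score={:.4}", hit.doc_id, hit.score);
    /// }
    /// ```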
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        const BRUTE_FORCE_BATCH: usize = 4096;

        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(rabitq) => {
                    let fetch_k = k * rerank_factor.max(1);
                    rabitq
                        .search(query, fetch_k, rerank_factor)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF { index, codebook } => {
                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                        Error::Schema("IVF index requires coarse centroids".to_string())
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN { index, codebook } => {
                    let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                        Error::Schema("ScaNN index requires coarse centroids".to_string())
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // No ANN index: brute-force scan the flat vectors in batches.
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let fetch_k = k * rerank_factor.max(1);
            let mut collector = crate::query::ScoreCollector::new(fetch_k);

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                let mut scores = vec![0f32; batch_count];
                Self::score_quantized_batch(query, raw, quant, dim, &mut scores);

                for (i, &score) in scores.iter().enumerate() {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };

        // Rerank ANN candidates against the exact flat vectors when available.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Resolve each (doc_id, ordinal) candidate to its flat vector index.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            if !resolved.is_empty() {
                // Read in flat-index order to keep file access sequential.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // Best-effort read; a failed read leaves zeroed bytes for this slot.
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores);

                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(k * rerank_factor.max(1));
        }

        // Group per-ordinal scores by document and combine multi-value scores.
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, score) in results {
            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn coarse_centroids(&self) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.as_ref()
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

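    /// Sparse vector search over this segment. Query dimensions absent from
    /// the index are dropped up front; queries with more than 12 surviving
    /// terms run through the `BmpExecutor`, shorter ones through the
    /// block-max `BlockMaxScoreExecutor`. Both over-fetch `2 * limit`
    /// candidates before per-document combining, since several ordinals of a
    /// multi-valued field can collapse into one document.
    ///
    /// A minimal usage sketch (dimension ids and weights are illustrative):
    ///
    /// ```ignore
    /// let query = vec![(17u32, 0.8f32), (93, 0.4)];
    /// let hits = reader
    ///     .search_sparse_vector(Field(5), &query, 10, combiner, 1.0)
    ///     .await?;
    /// ```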
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        // Partition the query into dimensions present in the index and misses.
        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
        let mut missing_tokens = Vec::new();

        for &(dim_id, query_weight) in vector {
            if sparse_index.has_dimension(dim_id) {
                matched_terms.push((dim_id, query_weight));
            } else {
                missing_tokens.push(dim_id);
            }
        }

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_terms.len(),
            missing_tokens.len(),
            index_dimensions
        );

        if log::log_enabled!(log::Level::Debug) {
            let query_details: Vec<_> = vector
                .iter()
                .take(30)
                .map(|(id, w)| format!("{}:{:.3}", id, w))
                .collect();
            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
        }

        if !missing_tokens.is_empty() {
            log::debug!(
                "Missing token IDs (not in index): {:?}",
                missing_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if matched_terms.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        let num_terms = matched_terms.len();
        // Over-fetch so per-document combining still leaves `limit` results.
        let over_fetch = limit * 2;
        let raw_results = if num_terms > 12 {
            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute()
                .await?
        } else {
            let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
                Vec::with_capacity(num_terms);
            for &(dim_id, query_weight) in &matched_terms {
                if let Some(pl) = sparse_index.get_posting(dim_id).await? {
                    posting_lists.push((dim_id, query_weight, pl));
                }
            }
            let scorers: Vec<SparseTermScorer> = posting_lists
                .iter()
                .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
                .collect();
            if scorers.is_empty() {
                return Ok(Vec::new());
            }
            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
        };

        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    " Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        // Group per-ordinal scores by document and combine multi-value scores.
        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

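    /// Loads the position posting list for `term` in `field`, if the segment
    /// has a positions file and the term records position data. Used for
    /// phrase and proximity matching.
    ///
    /// ```ignore
    /// if let Some(positions) = reader.get_positions(Field(0), b"rust").await? {
    ///     // feed `positions` into a phrase matcher
    /// }
    /// ```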
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }

    /// Whether `field` is configured to index positions in the schema.
    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

pub type SegmentReader = AsyncSegmentReader;