1mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
/// Snapshot of the approximate memory footprint of one segment.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Identifier of the segment these numbers describe.
    pub segment_id: u128,
    /// Number of documents in the segment.
    pub num_docs: u32,
    /// Bytes attributed to cached term-dictionary blocks.
    pub term_dict_cache_bytes: usize,
    /// Bytes attributed to cached document-store blocks.
    pub store_cache_bytes: usize,
    /// Bytes attributed to sparse vector indexes.
    pub sparse_index_bytes: usize,
    /// Bytes attributed to dense vector indexes.
    pub dense_index_bytes: usize,
    /// Bytes attributed to the bloom filter.
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Returns the sum of every byte counter in this snapshot.
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .into_iter()
        .sum()
    }
}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48 AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49 RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56pub struct AsyncSegmentReader {
62 meta: SegmentMeta,
63 term_dict: Arc<AsyncSSTableReader<TermInfo>>,
65 postings_handle: FileHandle,
67 store: Arc<AsyncStoreReader>,
69 schema: Arc<Schema>,
70 doc_id_offset: DocId,
72 vector_indexes: FxHashMap<u32, VectorIndex>,
74 flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
76 coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
78 sparse_indexes: FxHashMap<u32, SparseIndex>,
80 positions_handle: Option<FileHandle>,
82}
83
84impl AsyncSegmentReader {
85 pub async fn open<D: Directory>(
87 dir: &D,
88 segment_id: SegmentId,
89 schema: Arc<Schema>,
90 doc_id_offset: DocId,
91 cache_blocks: usize,
92 ) -> Result<Self> {
93 let files = SegmentFiles::new(segment_id.0);
94
95 let meta_slice = dir.open_read(&files.meta).await?;
97 let meta_bytes = meta_slice.read_bytes().await?;
98 let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
99 debug_assert_eq!(meta.id, segment_id.0);
100
101 let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
103 let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
104
105 let postings_handle = dir.open_lazy(&files.postings).await?;
107
108 let store_handle = dir.open_lazy(&files.store).await?;
110 let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
111
112 let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
114 let vector_indexes = vectors_data.indexes;
115 let flat_vectors = vectors_data.flat_vectors;
116
117 let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
119
120 let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
122
123 {
125 let mut parts = vec![format!(
126 "[segment] loaded {:016x}: docs={}",
127 segment_id.0, meta.num_docs
128 )];
129 if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
130 parts.push(format!(
131 "dense: {} ann + {} flat fields",
132 vector_indexes.len(),
133 flat_vectors.len()
134 ));
135 }
136 for (field_id, idx) in &sparse_indexes {
137 parts.push(format!(
138 "sparse field {}: {} dims, ~{:.1} KB",
139 field_id,
140 idx.num_dimensions(),
141 idx.num_dimensions() as f64 * 24.0 / 1024.0
142 ));
143 }
144 log::debug!("{}", parts.join(", "));
145 }
146
147 Ok(Self {
148 meta,
149 term_dict: Arc::new(term_dict),
150 postings_handle,
151 store: Arc::new(store),
152 schema,
153 doc_id_offset,
154 vector_indexes,
155 flat_vectors,
156 coarse_centroids: FxHashMap::default(),
157 sparse_indexes,
158 positions_handle,
159 })
160 }
161
162 pub fn meta(&self) -> &SegmentMeta {
163 &self.meta
164 }
165
166 pub fn num_docs(&self) -> u32 {
167 self.meta.num_docs
168 }
169
170 pub fn avg_field_len(&self, field: Field) -> f32 {
172 self.meta.avg_field_len(field)
173 }
174
175 pub fn doc_id_offset(&self) -> DocId {
176 self.doc_id_offset
177 }
178
179 pub fn set_doc_id_offset(&mut self, offset: DocId) {
181 self.doc_id_offset = offset;
182 }
183
184 pub fn schema(&self) -> &Schema {
185 &self.schema
186 }
187
188 pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
190 &self.sparse_indexes
191 }
192
193 pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
195 &self.vector_indexes
196 }
197
198 pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
200 &self.flat_vectors
201 }
202
203 pub fn term_dict_stats(&self) -> SSTableStats {
205 self.term_dict.stats()
206 }
207
208 pub fn memory_stats(&self) -> SegmentMemoryStats {
210 let term_dict_stats = self.term_dict.stats();
211
212 let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
214
215 let store_cache_bytes = self.store.cached_blocks() * 4096;
217
218 let sparse_index_bytes: usize = self
220 .sparse_indexes
221 .values()
222 .map(|s| s.estimated_memory_bytes())
223 .sum();
224
225 let dense_index_bytes: usize = self
228 .vector_indexes
229 .values()
230 .map(|v| v.estimated_memory_bytes())
231 .sum();
232
233 SegmentMemoryStats {
234 segment_id: self.meta.id,
235 num_docs: self.meta.num_docs,
236 term_dict_cache_bytes,
237 store_cache_bytes,
238 sparse_index_bytes,
239 dense_index_bytes,
240 bloom_filter_bytes: term_dict_stats.bloom_filter_size,
241 }
242 }
243
244 pub async fn get_postings(
249 &self,
250 field: Field,
251 term: &[u8],
252 ) -> Result<Option<BlockPostingList>> {
253 log::debug!(
254 "SegmentReader::get_postings field={} term_len={}",
255 field.0,
256 term.len()
257 );
258
259 let mut key = Vec::with_capacity(4 + term.len());
261 key.extend_from_slice(&field.0.to_le_bytes());
262 key.extend_from_slice(term);
263
264 let term_info = match self.term_dict.get(&key).await? {
266 Some(info) => {
267 log::debug!("SegmentReader::get_postings found term_info");
268 info
269 }
270 None => {
271 log::debug!("SegmentReader::get_postings term not found");
272 return Ok(None);
273 }
274 };
275
276 if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
278 let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
280 for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
281 posting_list.push(doc_id, tf);
282 }
283 let block_list = BlockPostingList::from_posting_list(&posting_list)?;
284 return Ok(Some(block_list));
285 }
286
287 let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
289 Error::Corruption("TermInfo has neither inline nor external data".to_string())
290 })?;
291
292 let start = posting_offset;
293 let end = start + posting_len;
294
295 if end > self.postings_handle.len() {
296 return Err(Error::Corruption(
297 "Posting offset out of bounds".to_string(),
298 ));
299 }
300
301 let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
302 let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
303
304 Ok(Some(block_list))
305 }
306
307 pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
312 self.doc_with_fields(local_doc_id, None).await
313 }
314
315 pub async fn doc_with_fields(
321 &self,
322 local_doc_id: DocId,
323 fields: Option<&rustc_hash::FxHashSet<u32>>,
324 ) -> Result<Option<Document>> {
325 let mut doc = match fields {
326 Some(set) => {
327 let field_ids: Vec<u32> = set.iter().copied().collect();
328 match self
329 .store
330 .get_fields(local_doc_id, &self.schema, &field_ids)
331 .await
332 {
333 Ok(Some(d)) => d,
334 Ok(None) => return Ok(None),
335 Err(e) => return Err(Error::from(e)),
336 }
337 }
338 None => match self.store.get(local_doc_id, &self.schema).await {
339 Ok(Some(d)) => d,
340 Ok(None) => return Ok(None),
341 Err(e) => return Err(Error::from(e)),
342 },
343 };
344
345 for (&field_id, lazy_flat) in &self.flat_vectors {
347 if let Some(set) = fields
349 && !set.contains(&field_id)
350 {
351 continue;
352 }
353
354 let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
355 for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
356 let flat_idx = start + j;
357 match lazy_flat.get_vector(flat_idx).await {
358 Ok(vec) => {
359 doc.add_dense_vector(Field(field_id), vec);
360 }
361 Err(e) => {
362 log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
363 }
364 }
365 }
366 }
367
368 Ok(Some(doc))
369 }
370
371 pub async fn prefetch_terms(
373 &self,
374 field: Field,
375 start_term: &[u8],
376 end_term: &[u8],
377 ) -> Result<()> {
378 let mut start_key = Vec::with_capacity(4 + start_term.len());
379 start_key.extend_from_slice(&field.0.to_le_bytes());
380 start_key.extend_from_slice(start_term);
381
382 let mut end_key = Vec::with_capacity(4 + end_term.len());
383 end_key.extend_from_slice(&field.0.to_le_bytes());
384 end_key.extend_from_slice(end_term);
385
386 self.term_dict.prefetch_range(&start_key, &end_key).await?;
387 Ok(())
388 }
389
390 pub fn store_has_dict(&self) -> bool {
392 self.store.has_dict()
393 }
394
395 pub fn store(&self) -> &super::store::AsyncStoreReader {
397 &self.store
398 }
399
400 pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
402 self.store.raw_blocks()
403 }
404
405 pub fn store_data_slice(&self) -> &FileHandle {
407 self.store.data_slice()
408 }
409
410 pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
412 self.term_dict.all_entries().await.map_err(Error::from)
413 }
414
415 pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
420 let entries = self.term_dict.all_entries().await?;
421 let mut result = Vec::with_capacity(entries.len());
422
423 for (key, term_info) in entries {
424 if key.len() > 4 {
426 let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
427 let term_bytes = &key[4..];
428 if let Ok(term_str) = std::str::from_utf8(term_bytes) {
429 result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
430 }
431 }
432 }
433
434 Ok(result)
435 }
436
437 pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
439 self.term_dict.iter()
440 }
441
442 pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
446 self.term_dict
447 .prefetch_all_data_bulk()
448 .await
449 .map_err(crate::Error::from)
450 }
451
452 pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
454 let start = offset;
455 let end = start + len;
456 let bytes = self.postings_handle.read_bytes_range(start..end).await?;
457 Ok(bytes.to_vec())
458 }
459
460 pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
462 let handle = match &self.positions_handle {
463 Some(h) => h,
464 None => return Ok(None),
465 };
466 let start = offset;
467 let end = start + len;
468 let bytes = handle.read_bytes_range(start..end).await?;
469 Ok(Some(bytes.to_vec()))
470 }
471
472 pub fn has_positions_file(&self) -> bool {
474 self.positions_handle.is_some()
475 }
476
477 fn score_quantized_batch(
483 query: &[f32],
484 raw: &[u8],
485 quant: crate::dsl::DenseVectorQuantization,
486 dim: usize,
487 scores: &mut [f32],
488 unit_norm: bool,
489 ) {
490 use crate::dsl::DenseVectorQuantization;
491 use crate::structures::simd;
492 match (quant, unit_norm) {
493 (DenseVectorQuantization::F32, false) => {
494 let num_floats = scores.len() * dim;
495 debug_assert!(
496 (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
497 "f32 vector data not 4-byte aligned — vectors file may use legacy format"
498 );
499 let vectors: &[f32] =
500 unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
501 simd::batch_cosine_scores(query, vectors, dim, scores);
502 }
503 (DenseVectorQuantization::F32, true) => {
504 let num_floats = scores.len() * dim;
505 debug_assert!(
506 (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
507 "f32 vector data not 4-byte aligned"
508 );
509 let vectors: &[f32] =
510 unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
511 simd::batch_dot_scores(query, vectors, dim, scores);
512 }
513 (DenseVectorQuantization::F16, false) => {
514 simd::batch_cosine_scores_f16(query, raw, dim, scores);
515 }
516 (DenseVectorQuantization::F16, true) => {
517 simd::batch_dot_scores_f16(query, raw, dim, scores);
518 }
519 (DenseVectorQuantization::UInt8, false) => {
520 simd::batch_cosine_scores_u8(query, raw, dim, scores);
521 }
522 (DenseVectorQuantization::UInt8, true) => {
523 simd::batch_dot_scores_u8(query, raw, dim, scores);
524 }
525 }
526 }
527
528 pub async fn search_dense_vector(
534 &self,
535 field: Field,
536 query: &[f32],
537 k: usize,
538 nprobe: usize,
539 rerank_factor: usize,
540 combiner: crate::query::MultiValueCombiner,
541 ) -> Result<Vec<VectorSearchResult>> {
542 let ann_index = self.vector_indexes.get(&field.0);
543 let lazy_flat = self.flat_vectors.get(&field.0);
544
545 if ann_index.is_none() && lazy_flat.is_none() {
547 return Ok(Vec::new());
548 }
549
550 let unit_norm = self
552 .schema
553 .get_field_entry(field)
554 .and_then(|e| e.dense_vector_config.as_ref())
555 .is_some_and(|c| c.unit_norm);
556
557 const BRUTE_FORCE_BATCH: usize = 4096;
559
560 let t0 = std::time::Instant::now();
562 let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
563 match index {
565 VectorIndex::RaBitQ(lazy) => {
566 let rabitq = lazy.get().ok_or_else(|| {
567 Error::Schema("RaBitQ index deserialization failed".to_string())
568 })?;
569 let fetch_k = k * rerank_factor.max(1);
570 rabitq
571 .search(query, fetch_k, rerank_factor)
572 .into_iter()
573 .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
574 .collect()
575 }
576 VectorIndex::IVF(lazy) => {
577 let (index, codebook) = lazy.get().ok_or_else(|| {
578 Error::Schema("IVF index deserialization failed".to_string())
579 })?;
580 let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
581 Error::Schema(format!(
582 "IVF index requires coarse centroids for field {}",
583 field.0
584 ))
585 })?;
586 let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
587 let fetch_k = k * rerank_factor.max(1);
588 index
589 .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
590 .into_iter()
591 .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
592 .collect()
593 }
594 VectorIndex::ScaNN(lazy) => {
595 let (index, codebook) = lazy.get().ok_or_else(|| {
596 Error::Schema("ScaNN index deserialization failed".to_string())
597 })?;
598 let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
599 Error::Schema(format!(
600 "ScaNN index requires coarse centroids for field {}",
601 field.0
602 ))
603 })?;
604 let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
605 let fetch_k = k * rerank_factor.max(1);
606 index
607 .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
608 .into_iter()
609 .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
610 .collect()
611 }
612 }
613 } else if let Some(lazy_flat) = lazy_flat {
614 log::debug!(
617 "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
618 field.0,
619 lazy_flat.num_vectors,
620 lazy_flat.dim,
621 lazy_flat.quantization
622 );
623 let dim = lazy_flat.dim;
624 let n = lazy_flat.num_vectors;
625 let quant = lazy_flat.quantization;
626 let fetch_k = k * rerank_factor.max(1);
627 let mut collector = crate::query::ScoreCollector::new(fetch_k);
628 let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
629
630 for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
631 let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
632 let batch_bytes = lazy_flat
633 .read_vectors_batch(batch_start, batch_count)
634 .await
635 .map_err(crate::Error::Io)?;
636 let raw = batch_bytes.as_slice();
637
638 Self::score_quantized_batch(
639 query,
640 raw,
641 quant,
642 dim,
643 &mut scores[..batch_count],
644 unit_norm,
645 );
646
647 for (i, &score) in scores.iter().enumerate().take(batch_count) {
648 let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
649 collector.insert_with_ordinal(doc_id, score, ordinal);
650 }
651 }
652
653 collector
654 .into_sorted_results()
655 .into_iter()
656 .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
657 .collect()
658 } else {
659 return Ok(Vec::new());
660 };
661 let l1_elapsed = t0.elapsed();
662 log::debug!(
663 "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
664 field.0,
665 results.len(),
666 l1_elapsed.as_secs_f64() * 1000.0
667 );
668
669 if ann_index.is_some()
672 && !results.is_empty()
673 && let Some(lazy_flat) = lazy_flat
674 {
675 let t_rerank = std::time::Instant::now();
676 let dim = lazy_flat.dim;
677 let quant = lazy_flat.quantization;
678 let vbs = lazy_flat.vector_byte_size();
679
680 let mut resolved: Vec<(usize, usize)> = Vec::new(); for (ri, c) in results.iter().enumerate() {
683 let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
684 for (j, &(_, ord)) in entries.iter().enumerate() {
685 if ord == c.1 {
686 resolved.push((ri, start + j));
687 break;
688 }
689 }
690 }
691
692 let t_resolve = t_rerank.elapsed();
693 if !resolved.is_empty() {
694 resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
696
697 let t_read = std::time::Instant::now();
699 let mut raw_buf = vec![0u8; resolved.len() * vbs];
700 for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
701 let _ = lazy_flat
702 .read_vector_raw_into(
703 flat_idx,
704 &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
705 )
706 .await;
707 }
708
709 let read_elapsed = t_read.elapsed();
710
711 let t_score = std::time::Instant::now();
713 let mut scores = vec![0f32; resolved.len()];
714 Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
715 let score_elapsed = t_score.elapsed();
716
717 for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
719 results[ri].2 = scores[buf_idx];
720 }
721
722 log::debug!(
723 "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
724 field.0,
725 resolved.len(),
726 dim,
727 quant,
728 vbs,
729 t_resolve.as_secs_f64() * 1000.0,
730 read_elapsed.as_secs_f64() * 1000.0,
731 score_elapsed.as_secs_f64() * 1000.0,
732 );
733 }
734
735 results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
736 results.truncate(k * rerank_factor.max(1));
737 log::debug!(
738 "[search_dense] field {}: rerank total={:.1}ms",
739 field.0,
740 t_rerank.elapsed().as_secs_f64() * 1000.0
741 );
742 }
743
744 let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
747 rustc_hash::FxHashMap::default();
748 for (doc_id, ordinal, score) in results {
749 let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
750 ordinals.push((ordinal as u32, score));
751 }
752
753 let mut final_results: Vec<VectorSearchResult> = doc_ordinals
755 .into_iter()
756 .map(|(doc_id, ordinals)| {
757 let combined_score = combiner.combine(&ordinals);
758 VectorSearchResult::new(doc_id, combined_score, ordinals)
759 })
760 .collect();
761
762 final_results.sort_by(|a, b| {
764 b.score
765 .partial_cmp(&a.score)
766 .unwrap_or(std::cmp::Ordering::Equal)
767 });
768 final_results.truncate(k);
769
770 Ok(final_results)
771 }
772
773 pub fn has_dense_vector_index(&self, field: Field) -> bool {
775 self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
776 }
777
778 pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
780 match self.vector_indexes.get(&field.0) {
781 Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
782 _ => None,
783 }
784 }
785
786 pub fn get_ivf_vector_index(
788 &self,
789 field: Field,
790 ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
791 match self.vector_indexes.get(&field.0) {
792 Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
793 _ => None,
794 }
795 }
796
797 pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
799 self.coarse_centroids.get(&field_id)
800 }
801
802 pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
804 self.coarse_centroids = centroids;
805 }
806
807 pub fn get_scann_vector_index(
809 &self,
810 field: Field,
811 ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
812 match self.vector_indexes.get(&field.0) {
813 Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
814 _ => None,
815 }
816 }
817
818 pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
820 self.vector_indexes.get(&field.0)
821 }
822
823 pub async fn search_sparse_vector(
833 &self,
834 field: Field,
835 vector: &[(u32, f32)],
836 limit: usize,
837 combiner: crate::query::MultiValueCombiner,
838 heap_factor: f32,
839 ) -> Result<Vec<VectorSearchResult>> {
840 use crate::query::BmpExecutor;
841
842 let query_tokens = vector.len();
843
844 let sparse_index = match self.sparse_indexes.get(&field.0) {
846 Some(idx) => idx,
847 None => {
848 log::debug!(
849 "Sparse vector search: no index for field {}, returning empty",
850 field.0
851 );
852 return Ok(Vec::new());
853 }
854 };
855
856 let index_dimensions = sparse_index.num_dimensions();
857
858 let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
860 let mut missing_count = 0usize;
861
862 for &(dim_id, query_weight) in vector {
863 if sparse_index.has_dimension(dim_id) {
864 matched_terms.push((dim_id, query_weight));
865 } else {
866 missing_count += 1;
867 }
868 }
869
870 log::debug!(
871 "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
872 query_tokens,
873 matched_terms.len(),
874 missing_count,
875 index_dimensions
876 );
877
878 if matched_terms.is_empty() {
879 log::debug!("Sparse vector search: no matching tokens, returning empty");
880 return Ok(Vec::new());
881 }
882
883 let num_terms = matched_terms.len();
887 let over_fetch = limit * 2; let raw_results = if num_terms > 12 {
889 BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
891 .execute()
892 .await?
893 } else {
894 crate::query::SparseMaxScoreExecutor::new(
897 sparse_index,
898 matched_terms,
899 over_fetch,
900 heap_factor,
901 )
902 .execute()
903 .await?
904 };
905
906 log::trace!(
907 "Sparse search returned {} raw results for segment (doc_id_offset={})",
908 raw_results.len(),
909 self.doc_id_offset
910 );
911 if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
912 for r in raw_results.iter().take(5) {
913 log::trace!(
914 " Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
915 r.doc_id,
916 r.doc_id + self.doc_id_offset,
917 r.score,
918 r.ordinal
919 );
920 }
921 }
922
923 let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
926 rustc_hash::FxHashMap::default();
927 for r in raw_results {
928 let ordinals = doc_ordinals.entry(r.doc_id).or_default();
929 ordinals.push((r.ordinal as u32, r.score));
930 }
931
932 let mut results: Vec<VectorSearchResult> = doc_ordinals
935 .into_iter()
936 .map(|(doc_id, ordinals)| {
937 let combined_score = combiner.combine(&ordinals);
938 VectorSearchResult::new(doc_id, combined_score, ordinals)
939 })
940 .collect();
941
942 results.sort_by(|a, b| {
944 b.score
945 .partial_cmp(&a.score)
946 .unwrap_or(std::cmp::Ordering::Equal)
947 });
948 results.truncate(limit);
949
950 Ok(results)
951 }
952
953 pub async fn get_positions(
958 &self,
959 field: Field,
960 term: &[u8],
961 ) -> Result<Option<crate::structures::PositionPostingList>> {
962 let handle = match &self.positions_handle {
964 Some(h) => h,
965 None => return Ok(None),
966 };
967
968 let mut key = Vec::with_capacity(4 + term.len());
970 key.extend_from_slice(&field.0.to_le_bytes());
971 key.extend_from_slice(term);
972
973 let term_info = match self.term_dict.get(&key).await? {
975 Some(info) => info,
976 None => return Ok(None),
977 };
978
979 let (offset, length) = match term_info.position_info() {
981 Some((o, l)) => (o, l),
982 None => return Ok(None),
983 };
984
985 let slice = handle.slice(offset..offset + length);
987 let data = slice.read_bytes().await?;
988
989 let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
991
992 Ok(Some(pos_list))
993 }
994
995 pub fn has_positions(&self, field: Field) -> bool {
997 if let Some(entry) = self.schema.get_field_entry(field) {
999 entry.positions.is_some()
1000 } else {
1001 false
1002 }
1003 }
1004}
1005
1006pub type SegmentReader = AsyncSegmentReader;