mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

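/// Per-component estimate of a segment's resident memory, in bytes.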
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    pub segment_id: u128,
    pub num_docs: u32,
    pub term_dict_cache_bytes: usize,
    pub store_cache_bytes: usize,
    pub sparse_index_bytes: usize,
    pub dense_index_bytes: usize,
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
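    /// Sum of all tracked per-component byte counts.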
    pub fn total_bytes(&self) -> usize {
        self.term_dict_cache_bytes
            + self.store_cache_bytes
            + self.sparse_index_bytes
            + self.dense_index_bytes
            + self.bloom_filter_bytes
    }
}

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::vector_data::LazyFlatVectorData;
use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

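/// Read-only accessor for a single on-disk segment.
///
/// Holds lazy handles to the segment's term dictionary, postings, document
/// store, and optional vector / sparse / position files, so bytes are only
/// fetched when a query actually needs them.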
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    doc_id_offset: DocId,
    vector_indexes: FxHashMap<u32, VectorIndex>,
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
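    /// Open a segment for reading.
    ///
    /// Segment metadata is read eagerly; everything else (term dictionary,
    /// postings, store, vectors, sparse indexes, positions) is opened as a
    /// lazy handle. `cache_blocks` sizes the term-dictionary and store block
    /// caches, and `doc_id_offset` is where this segment's local doc ids
    /// start in the index-wide doc id space.
    ///
    /// Illustrative sketch, not a compiled doctest; `dir`, `segment_id`, and
    /// `schema` are assumed to be in scope:
    ///
    /// ```ignore
    /// let reader = AsyncSegmentReader::open(&dir, segment_id, schema, 0, 1024).await?;
    /// println!("segment has {} docs", reader.num_docs());
    /// ```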
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            for (field_id, idx) in &sparse_indexes {
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            log::debug!("{}", parts.join(", "));
        }

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            flat_vectors,
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

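    /// Estimate the segment's resident memory usage. Cache figures assume
    /// 4096-byte blocks; index figures come from each index's own estimator.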
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        let store_cache_bytes = self.store.cached_blocks() * 4096;

        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.estimated_memory_bytes())
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

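    /// Look up the posting list for `term` in `field`.
    ///
    /// Dictionary keys are the little-endian field id followed by the raw
    /// term bytes. Small posting lists are decoded from data inlined in the
    /// `TermInfo`; larger ones are read from the postings file at the
    /// recorded offset. Returns `Ok(None)` if the term is absent.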
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }

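    /// Fetch a stored document by segment-local doc id with all fields.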
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }

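    /// Fetch a stored document, optionally restricted to the given field ids.
    ///
    /// Dense vectors live outside the document store, so requested
    /// flat-vector fields are hydrated from the vectors file afterwards; a
    /// vector that fails to load is logged and skipped rather than failing
    /// the whole document.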
    pub async fn doc_with_fields(
        &self,
        local_doc_id: DocId,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<Document>> {
        let mut doc = match fields {
            Some(set) => {
                let field_ids: Vec<u32> = set.iter().copied().collect();
                match self
                    .store
                    .get_fields(local_doc_id, &self.schema, &field_ids)
                    .await
                {
                    Ok(Some(d)) => d,
                    Ok(None) => return Ok(None),
                    Err(e) => return Err(Error::from(e)),
                }
            }
            None => match self.store.get(local_doc_id, &self.schema).await {
                Ok(Some(d)) => d,
                Ok(None) => return Ok(None),
                Err(e) => return Err(Error::from(e)),
            },
        };

        for (&field_id, lazy_flat) in &self.flat_vectors {
            if let Some(set) = fields
                && !set.contains(&field_id)
            {
                continue;
            }

            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }

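    /// Warm the term-dictionary cache for `field` terms in
    /// `[start_term, end_term]` so subsequent lookups hit cached blocks.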
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

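    /// List every term as `(field, term, doc_freq)`, skipping terms whose
    /// bytes are not valid UTF-8.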
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

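    /// Read a raw byte range from the postings file.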
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

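    /// Read a raw byte range from the positions file, or `Ok(None)` if this
    /// segment has no positions file.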
    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len as u64;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

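    /// Score a contiguous batch of stored vectors against `query`, writing
    /// one score per vector into `scores`.
    ///
    /// Dispatches on the field's quantization (f32 / f16 / u8) and on
    /// `unit_norm`: unit-norm vectors use a plain dot product, others full
    /// cosine similarity. `raw` must hold exactly `scores.len()` vectors of
    /// `dim` components each.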
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
        unit_norm: bool,
    ) {
        use crate::dsl::DenseVectorQuantization;
        use crate::structures::simd;
        match (quant, unit_norm) {
            (DenseVectorQuantization::F32, false) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F32, true) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned"
                );
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_dot_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F16, false) => {
                simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::F16, true) => {
                simd::batch_dot_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, false) => {
                simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, true) => {
                simd::batch_dot_scores_u8(query, raw, dim, scores);
            }
        }
    }

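    /// Search a dense-vector field for the `k` best documents for `query`.
    ///
    /// Phase one gathers `k * rerank_factor` candidates, either from the
    /// field's ANN index (RaBitQ, IVF, or ScaNN, with distances mapped to
    /// scores as `1.0 / (1.0 + dist)`) or, when no ANN index exists, via a
    /// batched brute-force scan over the flat vectors. Phase two, when both
    /// an ANN index and flat vectors are present, rescores the candidates
    /// against their stored vectors. Per-ordinal hits on the same document
    /// are merged with `combiner`.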
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;

        let t0 = std::time::Instant::now();
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    let fetch_k = k * rerank_factor.max(1);
                    rabitq
                        .search(query, fetch_k, rerank_factor)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let fetch_k = k * rerank_factor.max(1);
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

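        // Rerank: when the candidates came from an ANN index and the original
        // vectors are available, rescore them against the stored vectors.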
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
                let score_elapsed = t_score.elapsed();

                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(k * rerank_factor.max(1));
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

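        // Merge per-vector (ordinal) hits into a single result per document.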
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, score) in results {
            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

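    /// True if `field` has an ANN index or flat vectors in this segment.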
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }

    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

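    /// Search a sparse-vector field with a query of `(dimension, weight)`
    /// pairs.
    ///
    /// Query dimensions absent from this segment's index are dropped first.
    /// Queries with more than 12 surviving terms run on the BMP executor;
    /// smaller ones use the lazy block-max MaxScore executor. Both over-fetch
    /// `2 * limit` candidates, and `heap_factor` tunes executor pruning.
    /// Per-ordinal hits are merged with `combiner`, as in dense search.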
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::BmpExecutor;

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
        let mut missing_count = 0usize;

        for &(dim_id, query_weight) in vector {
            if sparse_index.has_dimension(dim_id) {
                matched_terms.push((dim_id, query_weight));
            } else {
                missing_count += 1;
            }
        }

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_terms.len(),
            missing_count,
            index_dimensions
        );

        if matched_terms.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        let num_terms = matched_terms.len();
        let over_fetch = limit * 2;
        let raw_results = if num_terms > 12 {
            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute()
                .await?
        } else {
            crate::query::LazyBlockMaxScoreExecutor::new(
                sparse_index,
                matched_terms,
                over_fetch,
                heap_factor,
            )
            .execute()
            .await?
        };

        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    " Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

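    /// Load the position posting list for `term` in `field`.
    ///
    /// Returns `Ok(None)` when the segment has no positions file, the term
    /// is unknown, or the term carries no position data.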
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;

        Ok(Some(pos_list))
    }

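    /// Whether the schema enables positions for `field`.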
    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

pub type SegmentReader = AsyncSegmentReader;