mod loader;
mod types;

#[cfg(feature = "diagnostics")]
pub use types::DimRawData;
pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

/// Approximate heap usage of one open segment, broken down by component.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    pub segment_id: u128,
    pub num_docs: u32,
    pub term_dict_cache_bytes: usize,
    pub store_cache_bytes: usize,
    pub sparse_index_bytes: usize,
    pub dense_index_bytes: usize,
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    pub fn total_bytes(&self) -> usize {
        self.term_dict_cache_bytes
            + self.store_cache_bytes
            + self.sparse_index_bytes
            + self.dense_index_bytes
            + self.bloom_filter_bytes
    }
}

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::vector_data::LazyFlatVectorData;
use crate::directories::{Directory, FileHandle};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

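/// Read-only view over one immutable segment: term dictionary, postings,
/// document store, positions, fast fields, and dense/sparse vector indexes.
/// Files are opened through lazy `FileHandle`s, so heavy data is read on
/// demand rather than at open time.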
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    postings_handle: FileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    vector_indexes: FxHashMap<u32, VectorIndex>,
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    positions_handle: Option<FileHandle>,
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}

impl AsyncSegmentReader {
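    /// Opens a segment's files from `dir` and deserializes its metadata.
    /// The term-dictionary and store readers each keep a cache of
    /// `cache_blocks` blocks; postings, positions, and vector data are
    /// opened lazily and read on demand.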
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;

        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            for (field_id, idx) in &sparse_indexes {
                // Rough size estimate for logging: ~24 bytes of bookkeeping
                // per indexed dimension.
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            if !fast_fields.is_empty() {
                parts.push(format!("fast: {} fields", fast_fields.len()));
            }
            log::debug!("{}", parts.join(", "));
        }

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            vector_indexes,
            flat_vectors,
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            positions_handle,
            fast_fields,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    pub fn fast_field(
        &self,
        field_id: u32,
    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
        self.fast_fields.get(&field_id)
    }

    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
        &self.fast_fields
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

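    /// Point-in-time estimate of this segment's heap usage. Cache figures
    /// assume the 4 KiB block size used by the term-dictionary and store
    /// caches; index figures come from each index's own estimate.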
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        let store_cache_bytes = self.store.cached_blocks() * 4096;

        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.estimated_memory_bytes())
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

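    /// Looks up the posting list for `term` in `field`. Term-dictionary keys
    /// are the little-endian field id followed by the raw term bytes. Short
    /// posting lists are inlined in the `TermInfo` itself; longer ones are
    /// read from the postings file at the recorded offset.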
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }

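    /// Fetches the full stored document with segment-local id `local_doc_id`.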
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }

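    /// Fetches a stored document, optionally restricted to the given field
    /// ids. Dense vectors live in the flat vector files rather than the
    /// store, so they are hydrated back into the document afterwards.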
    pub async fn doc_with_fields(
        &self,
        local_doc_id: DocId,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<Document>> {
        let mut doc = match fields {
            Some(set) => {
                let field_ids: Vec<u32> = set.iter().copied().collect();
                match self
                    .store
                    .get_fields(local_doc_id, &self.schema, &field_ids)
                    .await
                {
                    Ok(Some(d)) => d,
                    Ok(None) => return Ok(None),
                    Err(e) => return Err(Error::from(e)),
                }
            }
            None => match self.store.get(local_doc_id, &self.schema).await {
                Ok(Some(d)) => d,
                Ok(None) => return Ok(None),
                Err(e) => return Err(Error::from(e)),
            },
        };

        for (&field_id, lazy_flat) in &self.flat_vectors {
            if let Some(set) = fields
                && !set.contains(&field_id)
            {
                continue;
            }

            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }

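    /// Warms the term-dictionary cache for the key range spanning
    /// `start_term..=end_term` within `field`, ahead of a range scan.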
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &FileHandle {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

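    /// Scores a contiguous batch of quantized vectors against `query`,
    /// writing one score per vector into `scores`. For unit-normalized
    /// vectors the dot product equals cosine similarity, so the cheaper
    /// dot-product kernel is used when `unit_norm` is set.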
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
        unit_norm: bool,
    ) {
        use crate::dsl::DenseVectorQuantization;
        use crate::structures::simd;
        match (quant, unit_norm) {
            (DenseVectorQuantization::F32, false) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned; vectors file may use legacy format"
                );
                // SAFETY: `raw` holds `num_floats` f32 values; alignment is
                // checked above in debug builds.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F32, true) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned"
                );
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_dot_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F16, false) => {
                simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::F16, true) => {
                simd::batch_dot_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, false) => {
                simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, true) => {
                simd::batch_dot_scores_u8(query, raw, dim, scores);
            }
        }
    }

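    /// Two-phase dense vector search. Phase one gathers `k * rerank_factor`
    /// candidates from the ANN index (RaBitQ, IVF, or ScaNN), falling back to
    /// a batched brute-force scan of the flat vectors when no ANN index
    /// exists. When both are present, candidates are re-scored against the
    /// exact flat vectors before per-document ordinal scores are combined and
    /// the top `k` documents returned. ANN distances are mapped to
    /// similarities via `1 / (1 + distance)`.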
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;

        let t0 = std::time::Instant::now();
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    let fetch_k = k * rerank_factor.max(1);
                    rabitq
                        .search(query, fetch_k, rerank_factor)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    let fetch_k = k * rerank_factor.max(1);
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let fetch_k = k * rerank_factor.max(1);
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each ANN candidate (doc_id, ordinal) back to its index in
            // the flat vector file so the exact vector can be re-read.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                // Sort by flat index so the raw reads are sequential.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
                let score_elapsed = t_score.elapsed();

                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(k * rerank_factor.max(1));
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

        // Group per-ordinal scores by document, then combine them into one
        // score per document.
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, score) in results {
            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }

    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

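    /// Sparse vector search over the field's inverted dimension index.
    /// Query dimensions absent from this segment are dropped up front.
    /// Queries with more than 12 surviving terms run through the BMP
    /// executor; shorter queries use the MaxScore executor. Results are
    /// over-fetched at `2 * limit`, combined per document across ordinals,
    /// and truncated to `limit`.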
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::BmpExecutor;

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
        let mut missing_count = 0usize;

        for &(dim_id, query_weight) in vector {
            if sparse_index.has_dimension(dim_id) {
                matched_terms.push((dim_id, query_weight));
            } else {
                missing_count += 1;
            }
        }

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_terms.len(),
            missing_count,
            index_dimensions
        );

        if matched_terms.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        let num_terms = matched_terms.len();
        // Over-fetch so per-document combining still has enough candidates
        // after truncation to `limit`.
        let over_fetch = limit * 2;
        let raw_results = if num_terms > 12 {
            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute()
                .await?
        } else {
            crate::query::SparseMaxScoreExecutor::new(
                sparse_index,
                matched_terms,
                over_fetch,
                heap_factor,
            )
            .execute()
            .await?
        };

        log::trace!(
            "Sparse search returned {} raw results for segment {:016x}",
            raw_results.len(),
            self.meta.id
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    "  Raw result: doc_id={}, score={:.4}, ordinal={}",
                    r.doc_id,
                    r.score,
                    r.ordinal
                );
            }
        }

        // Group per-ordinal scores by document, then combine them.
        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

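    /// Loads the position posting list for `term` in `field`, if the segment
    /// has a positions file and the term records position data.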
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes().await?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;

        Ok(Some(pos_list))
    }

    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

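/// All segment reads are async; `SegmentReader` is simply the shorter name
/// for the one implementation in this module.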
pub type SegmentReader = AsyncSegmentReader;