1mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
/// Approximate resident-memory accounting for a single open segment.
///
/// All byte counts are estimates reported by the owning readers/indexes;
/// see `SegmentReader::memory_stats` for how they are populated.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Segment identifier (mirrors `SegmentMeta::id`).
    pub segment_id: u128,
    /// Number of documents in the segment.
    pub num_docs: u32,
    /// Bytes held by the term-dictionary block cache.
    pub term_dict_cache_bytes: usize,
    /// Bytes held by the document-store block cache.
    pub store_cache_bytes: usize,
    /// Estimated footprint of loaded sparse vector indexes.
    pub sparse_index_bytes: usize,
    /// Estimated footprint of loaded dense (ANN) vector indexes.
    pub dense_index_bytes: usize,
    /// Size of the term-dictionary bloom filter.
    pub bloom_filter_bytes: usize,
}
28
29impl SegmentMemoryStats {
30 pub fn total_bytes(&self) -> usize {
32 self.term_dict_cache_bytes
33 + self.store_cache_bytes
34 + self.sparse_index_bytes
35 + self.dense_index_bytes
36 + self.bloom_filter_bytes
37 }
38}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48 AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49 RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56pub(crate) fn combine_ordinal_results(
62 raw: impl IntoIterator<Item = (u32, u16, f32)>,
63 combiner: crate::query::MultiValueCombiner,
64 limit: usize,
65) -> Vec<VectorSearchResult> {
66 let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();
67
68 let num_raw = collected.len();
69 if log::log_enabled!(log::Level::Debug) {
70 let mut ids: Vec<u32> = collected.iter().map(|(d, _, _)| *d).collect();
71 ids.sort_unstable();
72 ids.dedup();
73 log::debug!(
74 "combine_ordinal_results: {} raw entries, {} unique docs, combiner={:?}, limit={}",
75 num_raw,
76 ids.len(),
77 combiner,
78 limit
79 );
80 }
81
82 let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
84 if all_single {
85 let mut results: Vec<VectorSearchResult> = collected
86 .into_iter()
87 .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
88 .collect();
89 results.sort_unstable_by(|a, b| {
90 b.score
91 .partial_cmp(&a.score)
92 .unwrap_or(std::cmp::Ordering::Equal)
93 });
94 results.truncate(limit);
95 return results;
96 }
97
98 let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
100 rustc_hash::FxHashMap::default();
101 for (doc_id, ordinal, score) in collected {
102 doc_ordinals
103 .entry(doc_id as DocId)
104 .or_default()
105 .push((ordinal as u32, score));
106 }
107 let mut results: Vec<VectorSearchResult> = doc_ordinals
108 .into_iter()
109 .map(|(doc_id, ordinals)| {
110 let combined_score = combiner.combine(&ordinals);
111 VectorSearchResult::new(doc_id, combined_score, ordinals)
112 })
113 .collect();
114 results.sort_unstable_by(|a, b| {
115 b.score
116 .partial_cmp(&a.score)
117 .unwrap_or(std::cmp::Ordering::Equal)
118 });
119 results.truncate(limit);
120 results
121}
122
/// Read-side view over a single immutable index segment.
///
/// Holds lazily-opened file handles (postings, positions) plus the decoded
/// per-field structures (term dictionary, store, vector and fast-field
/// readers) needed to serve queries against this segment.
pub struct SegmentReader {
    /// Decoded segment metadata (id, doc counts, field statistics).
    meta: SegmentMeta,
    /// Term dictionary mapping `field_id || term` keys to `TermInfo`.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazy handle to the postings file; ranges are read on demand.
    postings_handle: FileHandle,
    /// Document store reader (stored fields).
    store: Arc<AsyncStoreReader>,
    /// Schema shared with the owning index.
    schema: Arc<Schema>,
    /// ANN indexes keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Flat (exact) vector data keyed by field id; used for brute-force
    /// search, reranking and document hydration.
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Coarse centroids for IVF/ScaNN indexes, injected after open via
    /// `set_coarse_centroids`.
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Sparse vector indexes keyed by field id.
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Lazy handle to the positions file, if the schema enables positions.
    positions_handle: Option<FileHandle>,
    /// Fast-field (columnar) readers keyed by field id.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}
150
impl SegmentReader {
    /// Opens every component of segment `segment_id` from `dir`.
    ///
    /// Eagerly reads segment metadata and vector/sparse/fast-field data via
    /// the `loader` module; postings and positions stay as lazy handles.
    /// `cache_blocks` bounds the block caches of both the term dictionary
    /// and the document store.
    ///
    /// # Errors
    /// Fails if any required segment file is missing or cannot be decoded.
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Metadata is small: read the whole file and decode it eagerly.
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        // Postings are fetched per-term later; keep only a lazy handle.
        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;

        // Emit a single debug line summarizing what this segment loaded.
        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            for (field_id, idx) in &sparse_indexes {
                // 24.0 is a rough per-dimension byte estimate for the log
                // line only — TODO confirm against SparseIndex internals.
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            if !fast_fields.is_empty() {
                parts.push(format!("fast: {} fields", fast_fields.len()));
            }
            log::debug!("{}", parts.join(", "));
        }

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            vector_indexes,
            flat_vectors,
            // Centroids are injected later via `set_coarse_centroids`.
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            positions_handle,
            fast_fields,
        })
    }

    /// Returns the decoded segment metadata.
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    /// Number of documents in this segment.
    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Average token length of `field`, as recorded in the segment metadata.
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    /// Schema this segment was written with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// All sparse vector indexes, keyed by field id.
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    /// Sparse index for `field`, if one was loaded.
    pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
        self.sparse_indexes.get(&field.0)
    }

    /// All dense ANN indexes, keyed by field id.
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    /// All flat (exact) vector stores, keyed by field id.
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    /// Fast-field reader for `field_id`, if one was loaded.
    pub fn fast_field(
        &self,
        field_id: u32,
    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
        self.fast_fields.get(&field_id)
    }

    /// All fast-field readers, keyed by field id.
    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
        &self.fast_fields
    }

    /// Raw statistics from the term-dictionary SSTable.
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

    /// Builds an approximate memory-usage snapshot for this segment.
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

        // NOTE(review): assumes a 4096-byte cache block for both caches —
        // TODO confirm against the reader block size.
        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        let store_cache_bytes = self.store.cached_blocks() * 4096;

        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.estimated_memory_bytes())
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

    /// Looks up the posting list for `term` in `field`.
    ///
    /// Small posting lists may be stored inline in the `TermInfo`; otherwise
    /// the (offset, len) range is read from the postings file.
    ///
    /// Returns `Ok(None)` when the term is absent from the dictionary.
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Dictionary keys are `field_id (LE u32) || term bytes`.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Fast path: postings encoded inline in the TermInfo itself.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        // Guard against corrupted offsets before issuing the read.
        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }

    /// Fetches the full document with segment-local id `local_doc_id`.
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }

    /// Fetches a document, optionally restricted to a set of field ids.
    ///
    /// Dense vectors are not part of the stored document: they are hydrated
    /// from the flat vector data after the store lookup. Hydration failures
    /// are logged and skipped rather than failing the whole fetch.
    pub async fn doc_with_fields(
        &self,
        local_doc_id: DocId,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<Document>> {
        let mut doc = match fields {
            Some(set) => {
                let field_ids: Vec<u32> = set.iter().copied().collect();
                match self
                    .store
                    .get_fields(local_doc_id, &self.schema, &field_ids)
                    .await
                {
                    Ok(Some(d)) => d,
                    Ok(None) => return Ok(None),
                    Err(e) => return Err(Error::from(e)),
                }
            }
            None => match self.store.get(local_doc_id, &self.schema).await {
                Ok(Some(d)) => d,
                Ok(None) => return Ok(None),
                Err(e) => return Err(Error::from(e)),
            },
        };

        // Hydrate dense vector fields from the flat vector data.
        for (&field_id, lazy_flat) in &self.flat_vectors {
            // Honor the field filter when one was supplied.
            if let Some(set) = fields
                && !set.contains(&field_id)
            {
                continue;
            }

            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        // Best-effort: keep the document even if a vector
                        // cannot be read.
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }

    /// Warms the term-dictionary cache for the key range
    /// `[start_term, end_term]` within `field`.
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    /// Whether the document store uses a compression dictionary.
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Direct access to the document-store reader.
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    /// Raw (undecoded) store blocks, e.g. for segment merging.
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// Handle to the raw store data region.
    pub fn store_data_slice(&self) -> &FileHandle {
        self.store.data_slice()
    }

    /// All term-dictionary entries as raw `(key, TermInfo)` pairs.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    /// All terms decoded as `(field, term, doc_freq)` triples.
    ///
    /// Entries whose term bytes are not valid UTF-8, or whose key is too
    /// short to contain a field prefix, are silently skipped.
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            // Keys shorter than 5 bytes carry no term after the field prefix.
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    /// Streaming iterator over the term dictionary.
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    /// Loads the entire term-dictionary data region into cache.
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

    /// Reads `len` raw bytes from the postings file at `offset`.
    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    /// Reads raw bytes from the positions file, or `Ok(None)` when this
    /// segment has no positions file.
    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    /// Whether this segment has a positions file on disk.
    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

    /// Scores a packed batch of vectors in `raw` against `query`.
    ///
    /// `raw` holds `scores.len()` contiguous vectors of `dim` components in
    /// the given quantization. `unit_norm == true` selects dot-product
    /// scoring (vectors presumed pre-normalized per the schema config);
    /// otherwise full cosine similarity is computed.
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
        unit_norm: bool,
    ) {
        use crate::dsl::DenseVectorQuantization;
        use crate::structures::simd;
        match (quant, unit_norm) {
            (DenseVectorQuantization::F32, false) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                // SAFETY: callers size `raw` to scores.len() * dim f32
                // values; 4-byte alignment is checked by the debug_assert
                // above (release builds rely on the vectors-file layout).
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F32, true) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned"
                );
                // SAFETY: same invariants as the cosine branch above.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_dot_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F16, false) => {
                simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::F16, true) => {
                simd::batch_dot_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, false) => {
                simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, true) => {
                simd::batch_dot_scores_u8(query, raw, dim, scores);
            }
        }
    }

    /// Dense vector search over `field`, in up to three stages:
    ///
    /// 1. L1 candidate generation — ANN index if present, otherwise a
    ///    batched brute-force scan of the flat vectors. Fetches
    ///    `k * max(rerank_factor, 1)` candidates.
    /// 2. Optional L2 rerank — when both an ANN index and flat vectors
    ///    exist, candidate scores are recomputed exactly from the flat data.
    /// 3. Ordinal combination — multi-valued hits are merged per document
    ///    via `combiner` and the top `k` returned.
    ///
    /// `nprobe == 0` falls back to a default of 32 probes for IVF/ScaNN.
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // unit_norm vectors allow dot-product scoring instead of cosine.
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;

        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        let t0 = std::time::Instant::now();
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    // Convert distances to similarity scores in (0, 1].
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // No ANN index: brute-force scan the flat vectors in batches.
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        // L2 rerank: recompute exact scores for ANN candidates from the
        // flat vector data, when it is available.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each candidate (result index) to its flat-vector index,
            // matching on (doc_id, ordinal).
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                // Sort by flat index so reads are sequential on disk.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // Best-effort read: a failed vector leaves zeroed bytes.
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
                let score_elapsed = t_score.elapsed();

                // Overwrite approximate ANN scores with exact ones.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            // Partial-select the top fetch_k, then fully sort descending.
            if results.len() > fetch_k {
                results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
                results.truncate(fetch_k);
            }
            results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }

    /// Whether any dense vector data (ANN or flat) exists for `field`.
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    /// RaBitQ index for `field`, if present and deserializable.
    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
            _ => None,
        }
    }

    /// IVF index and codebook for `field`, if present and deserializable.
    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    /// Coarse centroids for `field_id`, if they have been injected.
    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }

    /// Installs the coarse centroids required by IVF/ScaNN search.
    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }

    /// ScaNN index and PQ codebook for `field`, if present and
    /// deserializable.
    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    /// Untyped ANN index for `field`, if any.
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

    /// Loads the position posting list for `term` in `field`.
    ///
    /// Returns `Ok(None)` if the segment has no positions file, the term is
    /// unknown, or the term carries no position data.
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        // Same key layout as get_postings: field_id (LE) || term.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes().await?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;

        Ok(Some(pos_list))
    }

    /// Whether the schema enables positions for `field`.
    ///
    /// Note: this consults the schema only, not whether this particular
    /// segment actually has a positions file (see `has_positions_file`).
    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}
931
/// Synchronous mirrors of the async read paths, compiled only with the
/// `sync` feature. Each method keeps the same semantics as its async
/// counterpart but performs blocking reads and emits no timing logs.
#[cfg(feature = "sync")]
impl SegmentReader {
    /// Blocking variant of the async posting-list lookup for `term` in
    /// `field`. Returns `Ok(None)` when the term is absent.
    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
        // Dictionary keys are `field_id (LE u32) || term bytes`.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        // Fast path: postings encoded inline in the TermInfo itself.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        // Guard against corrupted offsets before issuing the read.
        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }

    /// Blocking variant of the async position-list lookup. Returns
    /// `Ok(None)` if there is no positions file, the term is unknown, or
    /// the term has no position data.
    pub fn get_positions_sync(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        // Same key layout as get_postings_sync: field_id (LE) || term.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes_sync()?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
        Ok(Some(pos_list))
    }

    /// Blocking variant of the async dense vector search: L1 candidates
    /// (ANN or brute force), optional exact rerank from flat vectors, then
    /// per-document ordinal combination. `nprobe == 0` defaults to 32.
    pub fn search_dense_vector_sync(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // unit_norm vectors allow dot-product scoring instead of cosine.
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;
        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    // Convert distances to similarity scores in (0, 1].
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // No ANN index: brute-force scan the flat vectors in batches.
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch_sync(batch_start, batch_count)
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };

        // L2 rerank: recompute exact scores for ANN candidates from the
        // flat vector data, when it is available.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each candidate (result index) to its flat-vector index,
            // matching on (doc_id, ordinal).
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            if !resolved.is_empty() {
                // Sort by flat index so reads are sequential on disk.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // Best-effort read: a failed vector leaves zeroed bytes.
                    let _ = lazy_flat.read_vector_raw_into_sync(
                        flat_idx,
                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                    );
                }

                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);

                // Overwrite approximate ANN scores with exact ones.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }
            }

            // Partial-select the top fetch_k, then fully sort descending.
            if results.len() > fetch_k {
                results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
                results.truncate(fetch_k);
            }
            results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
}