1pub(crate) mod bmp;
4pub(crate) mod loader;
5mod types;
6
7pub use bmp::BmpIndex;
8#[cfg(feature = "diagnostics")]
9pub use types::DimRawData;
10pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
11
/// Snapshot of how much memory a single segment reader accounts for.
///
/// The `*_bytes` counters are per-component byte figures; several of them are
/// estimates rather than exact measurements. [`SegmentMemoryStats::total_bytes`]
/// adds the counters together.
#[derive(Clone, Debug, Default)]
pub struct SegmentMemoryStats {
    /// Identifier of the segment the figures belong to.
    pub segment_id: u128,
    /// Number of documents in the segment.
    pub num_docs: u32,
    /// Estimated bytes held by cached term-dictionary blocks.
    pub term_dict_cache_bytes: usize,
    /// Estimated bytes held by cached document-store blocks.
    pub store_cache_bytes: usize,
    /// Estimated bytes held by sparse-vector indexes.
    pub sparse_index_bytes: usize,
    /// Estimated bytes held by dense-vector indexes.
    pub dense_index_bytes: usize,
    /// Bytes reported for the term dictionary's bloom filter.
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Returns the sum of every byte counter.
    ///
    /// `segment_id` and `num_docs` are not byte counts and are excluded.
    pub fn total_bytes(&self) -> usize {
        let Self {
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes,
            ..
        } = *self;
        [
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes,
        ]
        .iter()
        .sum()
    }
}
41
42use std::sync::Arc;
43
44use rustc_hash::FxHashMap;
45
46use super::vector_data::LazyFlatVectorData;
47use crate::directories::{Directory, FileHandle};
48use crate::dsl::{DenseVectorQuantization, Document, Field, Schema};
49use crate::structures::{
50 AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
51 RaBitQIndex, SSTableStats, TermInfo,
52};
53use crate::{DocId, Error, Result};
54
55use super::store::{AsyncStoreReader, RawStoreBlock};
56use super::types::{SegmentFiles, SegmentId, SegmentMeta};
57
58pub(crate) fn combine_ordinal_results(
64 raw: impl IntoIterator<Item = (u32, u16, f32)>,
65 combiner: crate::query::MultiValueCombiner,
66 limit: usize,
67) -> Vec<VectorSearchResult> {
68 let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();
69
70 let num_raw = collected.len();
71 if log::log_enabled!(log::Level::Debug) {
72 let mut ids: Vec<u32> = collected.iter().map(|(d, _, _)| *d).collect();
73 ids.sort_unstable();
74 ids.dedup();
75 log::debug!(
76 "combine_ordinal_results: {} raw entries, {} unique docs, combiner={:?}, limit={}",
77 num_raw,
78 ids.len(),
79 combiner,
80 limit
81 );
82 }
83
84 let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
86 if all_single {
87 let mut results: Vec<VectorSearchResult> = collected
88 .into_iter()
89 .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
90 .collect();
91 results.sort_unstable_by(|a, b| {
92 b.score
93 .partial_cmp(&a.score)
94 .unwrap_or(std::cmp::Ordering::Equal)
95 });
96 results.truncate(limit);
97 return results;
98 }
99
100 let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
102 rustc_hash::FxHashMap::default();
103 for (doc_id, ordinal, score) in collected {
104 doc_ordinals
105 .entry(doc_id as DocId)
106 .or_default()
107 .push((ordinal as u32, score));
108 }
109 let mut results: Vec<VectorSearchResult> = doc_ordinals
110 .into_iter()
111 .map(|(doc_id, ordinals)| {
112 let combined_score = combiner.combine(&ordinals);
113 VectorSearchResult::new(doc_id, combined_score, ordinals)
114 })
115 .collect();
116 results.sort_unstable_by(|a, b| {
117 b.score
118 .partial_cmp(&a.score)
119 .unwrap_or(std::cmp::Ordering::Equal)
120 });
121 results.truncate(limit);
122 results
123}
124
/// Read-side view of a single on-disk segment.
///
/// Created by `SegmentReader::open`; holds the segment metadata together with
/// the handles, readers and loaded indexes for each per-segment file.
pub struct SegmentReader {
    /// Deserialized segment metadata (id, document count, per-field length stats).
    meta: SegmentMeta,
    /// Term dictionary. Keys are the field id as little-endian `u32` bytes
    /// followed by the raw term bytes.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Handle on the postings file; byte ranges come from `TermInfo` entries.
    postings_handle: FileHandle,
    /// Document store reader used for stored-field retrieval.
    store: Arc<AsyncStoreReader>,
    /// Schema the segment was opened with.
    schema: Arc<Schema>,
    /// ANN dense-vector indexes (RaBitQ / IVF / ScaNN), keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Flat dense vectors keyed by field id; used for brute-force search,
    /// ANN reranking and document hydration.
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Coarse centroids needed by IVF / ScaNN search, keyed by field id.
    /// Empty after `open`; replaced wholesale by `set_coarse_centroids`.
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Sparse-vector indexes (loaded from the sparse file's `maxscore_indexes`), keyed by field id.
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// BMP sparse-vector indexes, keyed by field id.
    bmp_indexes: FxHashMap<u32, BmpIndex>,
    /// Handle on the positions file, when the segment has one.
    positions_handle: Option<FileHandle>,
    /// Fast-field readers keyed by field id.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}
154
155impl SegmentReader {
156 pub async fn open<D: Directory>(
158 dir: &D,
159 segment_id: SegmentId,
160 schema: Arc<Schema>,
161 cache_blocks: usize,
162 ) -> Result<Self> {
163 let files = SegmentFiles::new(segment_id.0);
164
165 let meta_slice = dir.open_read(&files.meta).await?;
167 let meta_bytes = meta_slice.read_bytes().await?;
168 let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
169 debug_assert_eq!(meta.id, segment_id.0);
170
171 let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
173 let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
174
175 let postings_handle = dir.open_lazy(&files.postings).await?;
177
178 let store_handle = dir.open_lazy(&files.store).await?;
180 let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
181
182 let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
184 let vector_indexes = vectors_data.indexes;
185 let flat_vectors = vectors_data.flat_vectors;
186
187 let sparse_data = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
189 let sparse_indexes = sparse_data.maxscore_indexes;
190 let bmp_indexes = sparse_data.bmp_indexes;
191
192 let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;
194
195 let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;
197
198 {
200 let mut parts = vec![format!(
201 "[segment] loaded {:016x}: docs={}",
202 segment_id.0, meta.num_docs
203 )];
204 if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
205 parts.push(format!(
206 "dense: {} ann + {} flat fields",
207 vector_indexes.len(),
208 flat_vectors.len()
209 ));
210 }
211 for (field_id, idx) in &sparse_indexes {
212 parts.push(format!(
213 "sparse field {}: {} dims, ~{:.1} KB",
214 field_id,
215 idx.num_dimensions(),
216 idx.num_dimensions() as f64 * 24.0 / 1024.0
217 ));
218 }
219 for (field_id, idx) in &bmp_indexes {
220 parts.push(format!(
221 "bmp field {}: {} dims, {} blocks",
222 field_id,
223 idx.dims(),
224 idx.num_blocks
225 ));
226 }
227 if !fast_fields.is_empty() {
228 parts.push(format!("fast: {} fields", fast_fields.len()));
229 }
230 log::debug!("{}", parts.join(", "));
231 }
232
233 Ok(Self {
234 meta,
235 term_dict: Arc::new(term_dict),
236 postings_handle,
237 store: Arc::new(store),
238 schema,
239 vector_indexes,
240 flat_vectors,
241 coarse_centroids: FxHashMap::default(),
242 sparse_indexes,
243 bmp_indexes,
244 positions_handle,
245 fast_fields,
246 })
247 }
248
249 pub fn meta(&self) -> &SegmentMeta {
250 &self.meta
251 }
252
253 pub fn num_docs(&self) -> u32 {
254 self.meta.num_docs
255 }
256
257 pub fn avg_field_len(&self, field: Field) -> f32 {
259 self.meta.avg_field_len(field)
260 }
261
262 pub fn schema(&self) -> &Schema {
263 &self.schema
264 }
265
266 pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
268 &self.sparse_indexes
269 }
270
271 pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
273 self.sparse_indexes.get(&field.0)
274 }
275
276 pub fn bmp_index(&self, field: Field) -> Option<&BmpIndex> {
278 self.bmp_indexes.get(&field.0)
279 }
280
281 pub fn bmp_indexes(&self) -> &FxHashMap<u32, BmpIndex> {
283 &self.bmp_indexes
284 }
285
286 pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
288 &self.vector_indexes
289 }
290
291 pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
293 &self.flat_vectors
294 }
295
296 pub fn fast_field(
298 &self,
299 field_id: u32,
300 ) -> Option<&crate::structures::fast_field::FastFieldReader> {
301 self.fast_fields.get(&field_id)
302 }
303
304 pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
306 &self.fast_fields
307 }
308
309 pub fn term_dict_stats(&self) -> SSTableStats {
311 self.term_dict.stats()
312 }
313
314 pub fn memory_stats(&self) -> SegmentMemoryStats {
316 let term_dict_stats = self.term_dict.stats();
317
318 let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
320
321 let store_cache_bytes = self.store.cached_blocks() * 4096;
323
324 let sparse_index_bytes: usize = self
326 .sparse_indexes
327 .values()
328 .map(|s| s.estimated_memory_bytes())
329 .sum::<usize>()
330 + self
331 .bmp_indexes
332 .values()
333 .map(|b| b.estimated_memory_bytes())
334 .sum::<usize>();
335
336 let dense_index_bytes: usize = self
339 .vector_indexes
340 .values()
341 .map(|v| v.estimated_memory_bytes())
342 .sum();
343
344 SegmentMemoryStats {
345 segment_id: self.meta.id,
346 num_docs: self.meta.num_docs,
347 term_dict_cache_bytes,
348 store_cache_bytes,
349 sparse_index_bytes,
350 dense_index_bytes,
351 bloom_filter_bytes: term_dict_stats.bloom_filter_size,
352 }
353 }
354
355 pub async fn get_postings(
360 &self,
361 field: Field,
362 term: &[u8],
363 ) -> Result<Option<BlockPostingList>> {
364 log::debug!(
365 "SegmentReader::get_postings field={} term_len={}",
366 field.0,
367 term.len()
368 );
369
370 let mut key = Vec::with_capacity(4 + term.len());
372 key.extend_from_slice(&field.0.to_le_bytes());
373 key.extend_from_slice(term);
374
375 let term_info = match self.term_dict.get(&key).await? {
377 Some(info) => {
378 log::debug!("SegmentReader::get_postings found term_info");
379 info
380 }
381 None => {
382 log::debug!("SegmentReader::get_postings term not found");
383 return Ok(None);
384 }
385 };
386
387 if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
389 let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
391 for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
392 posting_list.push(doc_id, tf);
393 }
394 let block_list = BlockPostingList::from_posting_list(&posting_list)?;
395 return Ok(Some(block_list));
396 }
397
398 let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
400 Error::Corruption("TermInfo has neither inline nor external data".to_string())
401 })?;
402
403 let start = posting_offset;
404 let end = start + posting_len;
405
406 if end > self.postings_handle.len() {
407 return Err(Error::Corruption(
408 "Posting offset out of bounds".to_string(),
409 ));
410 }
411
412 let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
413 let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
414
415 Ok(Some(block_list))
416 }
417
418 pub async fn get_prefix_postings(
420 &self,
421 field: Field,
422 prefix: &[u8],
423 ) -> Result<Vec<BlockPostingList>> {
424 let mut key_prefix = Vec::with_capacity(4 + prefix.len());
426 key_prefix.extend_from_slice(&field.0.to_le_bytes());
427 key_prefix.extend_from_slice(prefix);
428
429 let entries = self.term_dict.prefix_scan(&key_prefix).await?;
430 let mut results = Vec::with_capacity(entries.len());
431
432 for (_key, term_info) in entries {
433 if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
434 let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
435 for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
436 posting_list.push(doc_id, tf);
437 }
438 results.push(BlockPostingList::from_posting_list(&posting_list)?);
439 } else if let Some((posting_offset, posting_len)) = term_info.external_info() {
440 let start = posting_offset;
441 let end = start + posting_len;
442 if end > self.postings_handle.len() {
443 continue;
444 }
445 let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
446 results.push(BlockPostingList::deserialize_zero_copy(posting_bytes)?);
447 }
448 }
449
450 Ok(results)
451 }
452
453 pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
458 self.doc_with_fields(local_doc_id, None).await
459 }
460
461 pub async fn doc_with_fields(
467 &self,
468 local_doc_id: DocId,
469 fields: Option<&rustc_hash::FxHashSet<u32>>,
470 ) -> Result<Option<Document>> {
471 let mut doc = match fields {
472 Some(set) => {
473 let field_ids: Vec<u32> = set.iter().copied().collect();
474 match self
475 .store
476 .get_fields(local_doc_id, &self.schema, &field_ids)
477 .await
478 {
479 Ok(Some(d)) => d,
480 Ok(None) => return Ok(None),
481 Err(e) => return Err(Error::from(e)),
482 }
483 }
484 None => match self.store.get(local_doc_id, &self.schema).await {
485 Ok(Some(d)) => d,
486 Ok(None) => return Ok(None),
487 Err(e) => return Err(Error::from(e)),
488 },
489 };
490
491 for (&field_id, lazy_flat) in &self.flat_vectors {
493 if let Some(set) = fields
495 && !set.contains(&field_id)
496 {
497 continue;
498 }
499
500 let is_binary = lazy_flat.quantization == DenseVectorQuantization::Binary;
501 let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
502 for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
503 let flat_idx = start + j;
504 if is_binary {
505 let vbs = lazy_flat.vector_byte_size();
506 let mut raw = vec![0u8; vbs];
507 match lazy_flat.read_vector_raw_into(flat_idx, &mut raw).await {
508 Ok(()) => {
509 doc.add_binary_dense_vector(Field(field_id), raw);
510 }
511 Err(e) => {
512 log::warn!("Failed to hydrate binary vector field {}: {}", field_id, e);
513 }
514 }
515 } else {
516 match lazy_flat.get_vector(flat_idx).await {
517 Ok(vec) => {
518 doc.add_dense_vector(Field(field_id), vec);
519 }
520 Err(e) => {
521 log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
522 }
523 }
524 }
525 }
526 }
527
528 Ok(Some(doc))
529 }
530
531 pub async fn prefetch_terms(
533 &self,
534 field: Field,
535 start_term: &[u8],
536 end_term: &[u8],
537 ) -> Result<()> {
538 let mut start_key = Vec::with_capacity(4 + start_term.len());
539 start_key.extend_from_slice(&field.0.to_le_bytes());
540 start_key.extend_from_slice(start_term);
541
542 let mut end_key = Vec::with_capacity(4 + end_term.len());
543 end_key.extend_from_slice(&field.0.to_le_bytes());
544 end_key.extend_from_slice(end_term);
545
546 self.term_dict.prefetch_range(&start_key, &end_key).await?;
547 Ok(())
548 }
549
550 pub fn store_has_dict(&self) -> bool {
552 self.store.has_dict()
553 }
554
555 pub fn store(&self) -> &super::store::AsyncStoreReader {
557 &self.store
558 }
559
560 pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
562 self.store.raw_blocks()
563 }
564
565 pub fn store_data_slice(&self) -> &FileHandle {
567 self.store.data_slice()
568 }
569
570 pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
572 self.term_dict.all_entries().await.map_err(Error::from)
573 }
574
575 pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
580 let entries = self.term_dict.all_entries().await?;
581 let mut result = Vec::with_capacity(entries.len());
582
583 for (key, term_info) in entries {
584 if key.len() > 4 {
586 let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
587 let term_bytes = &key[4..];
588 if let Ok(term_str) = std::str::from_utf8(term_bytes) {
589 result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
590 }
591 }
592 }
593
594 Ok(result)
595 }
596
597 pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
599 self.term_dict.iter()
600 }
601
602 pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
606 self.term_dict
607 .prefetch_all_data_bulk()
608 .await
609 .map_err(crate::Error::from)
610 }
611
612 pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
614 let start = offset;
615 let end = start + len;
616 let bytes = self.postings_handle.read_bytes_range(start..end).await?;
617 Ok(bytes.to_vec())
618 }
619
620 pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
622 let handle = match &self.positions_handle {
623 Some(h) => h,
624 None => return Ok(None),
625 };
626 let start = offset;
627 let end = start + len;
628 let bytes = handle.read_bytes_range(start..end).await?;
629 Ok(Some(bytes.to_vec()))
630 }
631
632 pub fn has_positions_file(&self) -> bool {
634 self.positions_handle.is_some()
635 }
636
637 fn score_quantized_batch(
643 query: &[f32],
644 raw: &[u8],
645 quant: crate::dsl::DenseVectorQuantization,
646 dim: usize,
647 scores: &mut [f32],
648 unit_norm: bool,
649 ) {
650 use crate::dsl::DenseVectorQuantization;
651 use crate::structures::simd;
652 match (quant, unit_norm) {
653 (DenseVectorQuantization::F32, false) => {
654 let num_floats = scores.len() * dim;
655 debug_assert!(
656 (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
657 "f32 vector data not 4-byte aligned — vectors file may use legacy format"
658 );
659 let vectors: &[f32] =
660 unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
661 simd::batch_cosine_scores(query, vectors, dim, scores);
662 }
663 (DenseVectorQuantization::F32, true) => {
664 let num_floats = scores.len() * dim;
665 debug_assert!(
666 (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
667 "f32 vector data not 4-byte aligned"
668 );
669 let vectors: &[f32] =
670 unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
671 simd::batch_dot_scores(query, vectors, dim, scores);
672 }
673 (DenseVectorQuantization::F16, false) => {
674 simd::batch_cosine_scores_f16(query, raw, dim, scores);
675 }
676 (DenseVectorQuantization::F16, true) => {
677 simd::batch_dot_scores_f16(query, raw, dim, scores);
678 }
679 (DenseVectorQuantization::UInt8, false) => {
680 simd::batch_cosine_scores_u8(query, raw, dim, scores);
681 }
682 (DenseVectorQuantization::UInt8, true) => {
683 simd::batch_dot_scores_u8(query, raw, dim, scores);
684 }
685 (DenseVectorQuantization::Binary, _) => {
686 unreachable!("Binary quantization should not reach score_quantized_batch");
688 }
689 }
690 }
691
692 pub async fn search_dense_vector(
698 &self,
699 field: Field,
700 query: &[f32],
701 k: usize,
702 nprobe: usize,
703 rerank_factor: f32,
704 combiner: crate::query::MultiValueCombiner,
705 ) -> Result<Vec<VectorSearchResult>> {
706 let ann_index = self.vector_indexes.get(&field.0);
707 let lazy_flat = self.flat_vectors.get(&field.0);
708
709 if ann_index.is_none() && lazy_flat.is_none() {
711 return Ok(Vec::new());
712 }
713
714 let unit_norm = self
716 .schema
717 .get_field_entry(field)
718 .and_then(|e| e.dense_vector_config.as_ref())
719 .is_some_and(|c| c.unit_norm);
720
721 const BRUTE_FORCE_BATCH: usize = 4096;
723
724 let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
725
726 let t0 = std::time::Instant::now();
728 let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
729 match index {
731 VectorIndex::RaBitQ(lazy) => {
732 let rabitq = lazy.get().ok_or_else(|| {
733 Error::Schema("RaBitQ index deserialization failed".to_string())
734 })?;
735 rabitq
736 .search(query, fetch_k)
737 .into_iter()
738 .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
739 .collect()
740 }
741 VectorIndex::IVF(lazy) => {
742 let (index, codebook) = lazy.get().ok_or_else(|| {
743 Error::Schema("IVF index deserialization failed".to_string())
744 })?;
745 let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
746 Error::Schema(format!(
747 "IVF index requires coarse centroids for field {}",
748 field.0
749 ))
750 })?;
751 let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
752 index
753 .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
754 .into_iter()
755 .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
756 .collect()
757 }
758 VectorIndex::ScaNN(lazy) => {
759 let (index, codebook) = lazy.get().ok_or_else(|| {
760 Error::Schema("ScaNN index deserialization failed".to_string())
761 })?;
762 let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
763 Error::Schema(format!(
764 "ScaNN index requires coarse centroids for field {}",
765 field.0
766 ))
767 })?;
768 let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
769 index
770 .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
771 .into_iter()
772 .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
773 .collect()
774 }
775 }
776 } else if let Some(lazy_flat) = lazy_flat {
777 log::debug!(
780 "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
781 field.0,
782 lazy_flat.num_vectors,
783 lazy_flat.dim,
784 lazy_flat.quantization
785 );
786 let dim = lazy_flat.dim;
787 let n = lazy_flat.num_vectors;
788 let quant = lazy_flat.quantization;
789 let mut collector = crate::query::ScoreCollector::new(fetch_k);
790 let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
791
792 for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
793 let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
794 let batch_bytes = lazy_flat
795 .read_vectors_batch(batch_start, batch_count)
796 .await
797 .map_err(crate::Error::Io)?;
798 let raw = batch_bytes.as_slice();
799
800 Self::score_quantized_batch(
801 query,
802 raw,
803 quant,
804 dim,
805 &mut scores[..batch_count],
806 unit_norm,
807 );
808
809 for (i, &score) in scores.iter().enumerate().take(batch_count) {
810 let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
811 collector.insert_with_ordinal(doc_id, score, ordinal);
812 }
813 }
814
815 collector
816 .into_sorted_results()
817 .into_iter()
818 .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
819 .collect()
820 } else {
821 return Ok(Vec::new());
822 };
823 let l1_elapsed = t0.elapsed();
824 log::debug!(
825 "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
826 field.0,
827 results.len(),
828 l1_elapsed.as_secs_f64() * 1000.0
829 );
830
831 if ann_index.is_some()
834 && !results.is_empty()
835 && let Some(lazy_flat) = lazy_flat
836 {
837 let t_rerank = std::time::Instant::now();
838 let dim = lazy_flat.dim;
839 let quant = lazy_flat.quantization;
840 let vbs = lazy_flat.vector_byte_size();
841
842 let mut resolved: Vec<(usize, usize)> = Vec::new(); for (ri, c) in results.iter().enumerate() {
845 let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
846 for (j, &(_, ord)) in entries.iter().enumerate() {
847 if ord == c.1 {
848 resolved.push((ri, start + j));
849 break;
850 }
851 }
852 }
853
854 let t_resolve = t_rerank.elapsed();
855 if !resolved.is_empty() {
856 resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
858
859 let t_read = std::time::Instant::now();
861 let mut raw_buf = vec![0u8; resolved.len() * vbs];
862 for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
863 let _ = lazy_flat
864 .read_vector_raw_into(
865 flat_idx,
866 &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
867 )
868 .await;
869 }
870
871 let read_elapsed = t_read.elapsed();
872
873 let t_score = std::time::Instant::now();
875 let mut scores = vec![0f32; resolved.len()];
876 Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
877 let score_elapsed = t_score.elapsed();
878
879 for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
881 results[ri].2 = scores[buf_idx];
882 }
883
884 log::debug!(
885 "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
886 field.0,
887 resolved.len(),
888 dim,
889 quant,
890 vbs,
891 t_resolve.as_secs_f64() * 1000.0,
892 read_elapsed.as_secs_f64() * 1000.0,
893 score_elapsed.as_secs_f64() * 1000.0,
894 );
895 }
896
897 if results.len() > fetch_k {
898 results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
899 results.truncate(fetch_k);
900 }
901 results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
902 log::debug!(
903 "[search_dense] field {}: rerank total={:.1}ms",
904 field.0,
905 t_rerank.elapsed().as_secs_f64() * 1000.0
906 );
907 }
908
909 Ok(combine_ordinal_results(results, combiner, k))
910 }
911
912 pub async fn search_binary_dense_vector(
916 &self,
917 field: Field,
918 query: &[u8],
919 k: usize,
920 combiner: crate::query::MultiValueCombiner,
921 ) -> Result<Vec<VectorSearchResult>> {
922 let lazy_flat = match self.flat_vectors.get(&field.0) {
923 Some(f) => f,
924 None => return Ok(Vec::new()),
925 };
926
927 const BRUTE_FORCE_BATCH: usize = 8192; let dim_bits = lazy_flat.dim;
930 let byte_len = lazy_flat.vector_byte_size();
931 let n = lazy_flat.num_vectors;
932
933 if byte_len != query.len() {
934 return Err(Error::Schema(format!(
935 "Binary query vector byte length {} != field byte length {}",
936 query.len(),
937 byte_len
938 )));
939 }
940
941 let mut collector = crate::query::ScoreCollector::new(k);
942 let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
943
944 for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
945 let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
946 let batch_bytes = lazy_flat
947 .read_vectors_batch(batch_start, batch_count)
948 .await
949 .map_err(crate::Error::Io)?;
950 let raw = batch_bytes.as_slice();
951
952 crate::structures::simd::batch_hamming_scores(
953 query,
954 raw,
955 byte_len,
956 dim_bits,
957 &mut scores[..batch_count],
958 );
959
960 let threshold = collector.threshold();
961 for (i, &score) in scores.iter().enumerate().take(batch_count) {
962 if score > threshold {
963 let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
964 collector.insert_with_ordinal(doc_id, score, ordinal);
965 }
966 }
967 }
968
969 let results: Vec<(u32, u16, f32)> = collector
970 .into_sorted_results()
971 .into_iter()
972 .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
973 .collect();
974
975 Ok(combine_ordinal_results(results, combiner, k))
976 }
977
978 pub fn has_dense_vector_index(&self, field: Field) -> bool {
980 self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
981 }
982
983 pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
985 match self.vector_indexes.get(&field.0) {
986 Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
987 _ => None,
988 }
989 }
990
991 pub fn get_ivf_vector_index(
993 &self,
994 field: Field,
995 ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
996 match self.vector_indexes.get(&field.0) {
997 Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
998 _ => None,
999 }
1000 }
1001
1002 pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
1004 self.coarse_centroids.get(&field_id)
1005 }
1006
1007 pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
1009 self.coarse_centroids = centroids;
1010 }
1011
1012 pub fn get_scann_vector_index(
1014 &self,
1015 field: Field,
1016 ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
1017 match self.vector_indexes.get(&field.0) {
1018 Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
1019 _ => None,
1020 }
1021 }
1022
1023 pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
1025 self.vector_indexes.get(&field.0)
1026 }
1027
1028 pub async fn get_positions(
1033 &self,
1034 field: Field,
1035 term: &[u8],
1036 ) -> Result<Option<crate::structures::PositionPostingList>> {
1037 let handle = match &self.positions_handle {
1039 Some(h) => h,
1040 None => return Ok(None),
1041 };
1042
1043 let mut key = Vec::with_capacity(4 + term.len());
1045 key.extend_from_slice(&field.0.to_le_bytes());
1046 key.extend_from_slice(term);
1047
1048 let term_info = match self.term_dict.get(&key).await? {
1050 Some(info) => info,
1051 None => return Ok(None),
1052 };
1053
1054 let (offset, length) = match term_info.position_info() {
1056 Some((o, l)) => (o, l),
1057 None => return Ok(None),
1058 };
1059
1060 let slice = handle.slice(offset..offset + length);
1062 let data = slice.read_bytes().await?;
1063
1064 let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
1066
1067 Ok(Some(pos_list))
1068 }
1069
1070 pub fn has_positions(&self, field: Field) -> bool {
1072 if let Some(entry) = self.schema.get_field_entry(field) {
1074 entry.positions.is_some()
1075 } else {
1076 false
1077 }
1078 }
1079}
1080
1081#[cfg(feature = "sync")]
1083impl SegmentReader {
1084 pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
1086 let mut key = Vec::with_capacity(4 + term.len());
1088 key.extend_from_slice(&field.0.to_le_bytes());
1089 key.extend_from_slice(term);
1090
1091 let term_info = match self.term_dict.get_sync(&key)? {
1093 Some(info) => info,
1094 None => return Ok(None),
1095 };
1096
1097 if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
1099 let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
1100 for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
1101 posting_list.push(doc_id, tf);
1102 }
1103 let block_list = BlockPostingList::from_posting_list(&posting_list)?;
1104 return Ok(Some(block_list));
1105 }
1106
1107 let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
1109 Error::Corruption("TermInfo has neither inline nor external data".to_string())
1110 })?;
1111
1112 let start = posting_offset;
1113 let end = start + posting_len;
1114
1115 if end > self.postings_handle.len() {
1116 return Err(Error::Corruption(
1117 "Posting offset out of bounds".to_string(),
1118 ));
1119 }
1120
1121 let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
1122 let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
1123
1124 Ok(Some(block_list))
1125 }
1126
1127 pub fn get_prefix_postings_sync(
1129 &self,
1130 field: Field,
1131 prefix: &[u8],
1132 ) -> Result<Vec<BlockPostingList>> {
1133 let mut key_prefix = Vec::with_capacity(4 + prefix.len());
1134 key_prefix.extend_from_slice(&field.0.to_le_bytes());
1135 key_prefix.extend_from_slice(prefix);
1136
1137 let entries = self.term_dict.prefix_scan_sync(&key_prefix)?;
1138 let mut results = Vec::with_capacity(entries.len());
1139
1140 for (_key, term_info) in entries {
1141 if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
1142 let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
1143 for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
1144 posting_list.push(doc_id, tf);
1145 }
1146 results.push(BlockPostingList::from_posting_list(&posting_list)?);
1147 } else if let Some((posting_offset, posting_len)) = term_info.external_info() {
1148 let start = posting_offset;
1149 let end = start + posting_len;
1150 if end > self.postings_handle.len() {
1151 continue;
1152 }
1153 let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
1154 results.push(BlockPostingList::deserialize_zero_copy(posting_bytes)?);
1155 }
1156 }
1157
1158 Ok(results)
1159 }
1160
1161 pub fn get_positions_sync(
1163 &self,
1164 field: Field,
1165 term: &[u8],
1166 ) -> Result<Option<crate::structures::PositionPostingList>> {
1167 let handle = match &self.positions_handle {
1168 Some(h) => h,
1169 None => return Ok(None),
1170 };
1171
1172 let mut key = Vec::with_capacity(4 + term.len());
1174 key.extend_from_slice(&field.0.to_le_bytes());
1175 key.extend_from_slice(term);
1176
1177 let term_info = match self.term_dict.get_sync(&key)? {
1179 Some(info) => info,
1180 None => return Ok(None),
1181 };
1182
1183 let (offset, length) = match term_info.position_info() {
1184 Some((o, l)) => (o, l),
1185 None => return Ok(None),
1186 };
1187
1188 let slice = handle.slice(offset..offset + length);
1189 let data = slice.read_bytes_sync()?;
1190
1191 let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
1192 Ok(Some(pos_list))
1193 }
1194
1195 pub fn search_dense_vector_sync(
1198 &self,
1199 field: Field,
1200 query: &[f32],
1201 k: usize,
1202 nprobe: usize,
1203 rerank_factor: f32,
1204 combiner: crate::query::MultiValueCombiner,
1205 ) -> Result<Vec<VectorSearchResult>> {
1206 let ann_index = self.vector_indexes.get(&field.0);
1207 let lazy_flat = self.flat_vectors.get(&field.0);
1208
1209 if ann_index.is_none() && lazy_flat.is_none() {
1210 return Ok(Vec::new());
1211 }
1212
1213 let unit_norm = self
1214 .schema
1215 .get_field_entry(field)
1216 .and_then(|e| e.dense_vector_config.as_ref())
1217 .is_some_and(|c| c.unit_norm);
1218
1219 const BRUTE_FORCE_BATCH: usize = 4096;
1220 let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;
1221
1222 let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
1223 match index {
1225 VectorIndex::RaBitQ(lazy) => {
1226 let rabitq = lazy.get().ok_or_else(|| {
1227 Error::Schema("RaBitQ index deserialization failed".to_string())
1228 })?;
1229 rabitq
1230 .search(query, fetch_k)
1231 .into_iter()
1232 .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1233 .collect()
1234 }
1235 VectorIndex::IVF(lazy) => {
1236 let (index, codebook) = lazy.get().ok_or_else(|| {
1237 Error::Schema("IVF index deserialization failed".to_string())
1238 })?;
1239 let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1240 Error::Schema(format!(
1241 "IVF index requires coarse centroids for field {}",
1242 field.0
1243 ))
1244 })?;
1245 let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1246 index
1247 .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1248 .into_iter()
1249 .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1250 .collect()
1251 }
1252 VectorIndex::ScaNN(lazy) => {
1253 let (index, codebook) = lazy.get().ok_or_else(|| {
1254 Error::Schema("ScaNN index deserialization failed".to_string())
1255 })?;
1256 let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
1257 Error::Schema(format!(
1258 "ScaNN index requires coarse centroids for field {}",
1259 field.0
1260 ))
1261 })?;
1262 let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
1263 index
1264 .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
1265 .into_iter()
1266 .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
1267 .collect()
1268 }
1269 }
1270 } else if let Some(lazy_flat) = lazy_flat {
1271 let dim = lazy_flat.dim;
1273 let n = lazy_flat.num_vectors;
1274 let quant = lazy_flat.quantization;
1275 let mut collector = crate::query::ScoreCollector::new(fetch_k);
1276 let mut scores = vec![0f32; BRUTE_FORCE_BATCH];
1277
1278 for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
1279 let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
1280 let batch_bytes = lazy_flat
1281 .read_vectors_batch_sync(batch_start, batch_count)
1282 .map_err(crate::Error::Io)?;
1283 let raw = batch_bytes.as_slice();
1284
1285 Self::score_quantized_batch(
1286 query,
1287 raw,
1288 quant,
1289 dim,
1290 &mut scores[..batch_count],
1291 unit_norm,
1292 );
1293
1294 for (i, &score) in scores.iter().enumerate().take(batch_count) {
1295 let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
1296 collector.insert_with_ordinal(doc_id, score, ordinal);
1297 }
1298 }
1299
1300 collector
1301 .into_sorted_results()
1302 .into_iter()
1303 .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
1304 .collect()
1305 } else {
1306 return Ok(Vec::new());
1307 };
1308
1309 if ann_index.is_some()
1311 && !results.is_empty()
1312 && let Some(lazy_flat) = lazy_flat
1313 {
1314 let dim = lazy_flat.dim;
1315 let quant = lazy_flat.quantization;
1316 let vbs = lazy_flat.vector_byte_size();
1317
1318 let mut resolved: Vec<(usize, usize)> = Vec::new();
1319 for (ri, c) in results.iter().enumerate() {
1320 let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
1321 for (j, &(_, ord)) in entries.iter().enumerate() {
1322 if ord == c.1 {
1323 resolved.push((ri, start + j));
1324 break;
1325 }
1326 }
1327 }
1328
1329 if !resolved.is_empty() {
1330 resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
1331 let mut raw_buf = vec![0u8; resolved.len() * vbs];
1332 for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
1333 let _ = lazy_flat.read_vector_raw_into_sync(
1334 flat_idx,
1335 &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
1336 );
1337 }
1338
1339 let mut scores = vec![0f32; resolved.len()];
1340 Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
1341
1342 for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
1343 results[ri].2 = scores[buf_idx];
1344 }
1345 }
1346
1347 if results.len() > fetch_k {
1348 results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
1349 results.truncate(fetch_k);
1350 }
1351 results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
1352 }
1353
1354 Ok(combine_ordinal_results(results, combiner, k))
1355 }
1356}