1pub(crate) mod bmp;
4pub(crate) mod loader;
5mod types;
6
7pub use bmp::BmpIndex;
8#[cfg(feature = "diagnostics")]
9pub use types::DimRawData;
10pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
11
/// Per-segment breakdown of estimated resident memory, used for diagnostics.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Segment identifier (matches the segment metadata id).
    pub segment_id: u128,
    /// Number of documents in the segment.
    pub num_docs: u32,
    /// Bytes held by the term-dictionary block cache.
    pub term_dict_cache_bytes: usize,
    /// Bytes held by the document-store block cache.
    pub store_cache_bytes: usize,
    /// Bytes used by sparse index structures (MaxScore + BMP).
    pub sparse_index_bytes: usize,
    /// Bytes used by dense vector index structures.
    pub dense_index_bytes: usize,
    /// Bytes used by the term-dictionary bloom filter.
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Sum of every tracked byte counter (identifier/count fields excluded).
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .iter()
        .sum()
    }
}
41
42use std::sync::Arc;
43
44use rustc_hash::FxHashMap;
45
46use super::vector_data::LazyFlatVectorData;
47use crate::directories::{Directory, FileHandle};
48use crate::dsl::{DenseVectorQuantization, Document, Field, Schema};
49use crate::structures::{
50 AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
51 RaBitQIndex, SSTableStats, TermInfo,
52};
53use crate::{DocId, Error, Result};
54
55use super::store::{AsyncStoreReader, RawStoreBlock};
56use super::types::{SegmentFiles, SegmentId, SegmentMeta};
57
58pub(crate) fn combine_ordinal_results(
64 raw: impl IntoIterator<Item = (u32, u16, f32)>,
65 combiner: crate::query::MultiValueCombiner,
66 limit: usize,
67) -> Vec<VectorSearchResult> {
68 let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();
69
70 let num_raw = collected.len();
71 if log::log_enabled!(log::Level::Debug) {
72 let mut ids: Vec<u32> = collected.iter().map(|(d, _, _)| *d).collect();
73 ids.sort_unstable();
74 ids.dedup();
75 log::debug!(
76 "combine_ordinal_results: {} raw entries, {} unique docs, combiner={:?}, limit={}",
77 num_raw,
78 ids.len(),
79 combiner,
80 limit
81 );
82 }
83
84 let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
86 if all_single {
87 let mut results: Vec<VectorSearchResult> = collected
88 .into_iter()
89 .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
90 .collect();
91 results.sort_unstable_by(|a, b| {
92 b.score
93 .partial_cmp(&a.score)
94 .unwrap_or(std::cmp::Ordering::Equal)
95 });
96 results.truncate(limit);
97 return results;
98 }
99
100 let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
102 rustc_hash::FxHashMap::default();
103 for (doc_id, ordinal, score) in collected {
104 doc_ordinals
105 .entry(doc_id as DocId)
106 .or_default()
107 .push((ordinal as u32, score));
108 }
109 let mut results: Vec<VectorSearchResult> = doc_ordinals
110 .into_iter()
111 .map(|(doc_id, ordinals)| {
112 let combined_score = combiner.combine(&ordinals);
113 VectorSearchResult::new(doc_id, combined_score, ordinals)
114 })
115 .collect();
116 results.sort_unstable_by(|a, b| {
117 b.score
118 .partial_cmp(&a.score)
119 .unwrap_or(std::cmp::Ordering::Equal)
120 });
121 results.truncate(limit);
122 results
123}
124
/// Read-side view of a single on-disk segment.
///
/// Holds lazy file handles plus block caches for the term dictionary,
/// postings and document store, and optional per-field vector / sparse /
/// fast-field structures. Heavy components are behind `Arc`.
pub struct SegmentReader {
    // Segment metadata (id, doc count, per-field average lengths).
    meta: SegmentMeta,
    // Term -> TermInfo dictionary (SSTable) with a bounded block cache.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    // Lazy handle over the postings file; byte ranges are read on demand.
    postings_handle: FileHandle,
    // Document store reader with its own block cache.
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    // field id -> ANN index (RaBitQ / IVF / ScaNN), lazily deserialized.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    // field id -> flat (raw) vector storage; used for brute force and rerank.
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    // field id -> coarse centroids, injected later via `set_coarse_centroids`.
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    // field id -> MaxScore-style sparse index.
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    // field id -> block-max (BMP) sparse index.
    bmp_indexes: FxHashMap<u32, BmpIndex>,
    // Present only when the segment was written with a positions file.
    positions_handle: Option<FileHandle>,
    // field id -> columnar fast-field reader.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
    // Cross-query score threshold stored as f32 bits; see the threshold methods.
    shared_threshold: std::sync::atomic::AtomicU32,
}
158
159impl SegmentReader {
    /// Opens every on-disk component of segment `segment_id` from `dir`.
    ///
    /// Segment metadata is read eagerly; the term dictionary, postings and
    /// store are opened as lazy handles whose block caches are bounded by
    /// `cache_blocks`. Optional per-field files (vectors, sparse, positions,
    /// fast fields) are loaded through the `loader` helpers.
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Metadata is small: read and deserialize it fully up front.
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Optional per-field structures; absent files yield empty maps.
        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        let sparse_data = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;
        let sparse_indexes = sparse_data.maxscore_indexes;
        let bmp_indexes = sparse_data.bmp_indexes;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;

        // Emit one debug line summarizing everything loaded for this segment.
        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            for (field_id, idx) in &sparse_indexes {
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            for (field_id, idx) in &bmp_indexes {
                parts.push(format!(
                    "bmp field {}: {} dims, {} blocks",
                    field_id,
                    idx.dims(),
                    idx.num_blocks
                ));
            }
            if !fast_fields.is_empty() {
                parts.push(format!("fast: {} fields", fast_fields.len()));
            }
            log::debug!("{}", parts.join(", "));
        }

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            vector_indexes,
            flat_vectors,
            // Coarse centroids are injected later via `set_coarse_centroids`.
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            bmp_indexes,
            positions_handle,
            fast_fields,
            // 0u32 is the bit pattern of 0.0f32 (see `shared_threshold_f32`).
            shared_threshold: std::sync::atomic::AtomicU32::new(0),
        })
    }
253
    /// Resets the shared score threshold to 0.0 (stored as bit pattern 0).
    #[inline]
    pub fn reset_shared_threshold(&self) {
        self.shared_threshold
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    /// Current shared score threshold.
    ///
    /// The threshold is kept as an `f32` bit pattern inside an `AtomicU32`.
    #[inline]
    pub fn shared_threshold_f32(&self) -> f32 {
        f32::from_bits(
            self.shared_threshold
                .load(std::sync::atomic::Ordering::Relaxed),
        )
    }

    /// Lock-free monotonic maximum: raises the shared threshold to
    /// `new_threshold` only if it exceeds the stored value, so concurrent
    /// updates can never lower it.
    #[inline]
    pub fn update_shared_threshold(&self, new_threshold: f32) {
        use std::sync::atomic::Ordering::Relaxed;
        let new_bits = new_threshold.to_bits();
        let mut current_bits = self.shared_threshold.load(Relaxed);
        // CAS loop: retry while our value still beats the stored one.
        // `compare_exchange_weak` may fail spuriously; on failure we re-check
        // against the freshly observed value and stop once it already exceeds
        // ours.
        while new_threshold > f32::from_bits(current_bits) {
            match self.shared_threshold.compare_exchange_weak(
                current_bits,
                new_bits,
                Relaxed,
                Relaxed,
            ) {
                Ok(_) => return,
                Err(actual) => current_bits = actual,
            }
        }
    }
288
    /// Segment metadata (id, doc count, field statistics).
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    /// Number of documents in this segment.
    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Average length of `field`, as recorded in segment metadata.
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    /// Schema the segment was written with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// All MaxScore sparse indexes, keyed by field id.
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    /// MaxScore sparse index for `field`, if one exists.
    pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
        self.sparse_indexes.get(&field.0)
    }

    /// BMP sparse index for `field`, if one exists.
    pub fn bmp_index(&self, field: Field) -> Option<&BmpIndex> {
        self.bmp_indexes.get(&field.0)
    }

    /// All BMP sparse indexes, keyed by field id.
    pub fn bmp_indexes(&self) -> &FxHashMap<u32, BmpIndex> {
        &self.bmp_indexes
    }

    /// All dense ANN indexes, keyed by field id.
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    /// All flat (raw) vector stores, keyed by field id.
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    /// Fast-field reader for `field_id`, if present.
    pub fn fast_field(
        &self,
        field_id: u32,
    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
        self.fast_fields.get(&field_id)
    }

    /// All fast-field readers, keyed by field id.
    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
        &self.fast_fields
    }

    /// Term-dictionary statistics (cache and bloom-filter figures).
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
353
354 pub fn memory_stats(&self) -> SegmentMemoryStats {
356 let term_dict_stats = self.term_dict.stats();
357
358 let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
360
361 let store_cache_bytes = self.store.cached_blocks() * 4096;
363
364 let sparse_index_bytes: usize = self
366 .sparse_indexes
367 .values()
368 .map(|s| s.estimated_memory_bytes())
369 .sum::<usize>()
370 + self
371 .bmp_indexes
372 .values()
373 .map(|b| b.estimated_memory_bytes())
374 .sum::<usize>();
375
376 let dense_index_bytes: usize = self
379 .vector_indexes
380 .values()
381 .map(|v| v.estimated_memory_bytes())
382 .sum();
383
384 SegmentMemoryStats {
385 segment_id: self.meta.id,
386 num_docs: self.meta.num_docs,
387 term_dict_cache_bytes,
388 store_cache_bytes,
389 sparse_index_bytes,
390 dense_index_bytes,
391 bloom_filter_bytes: term_dict_stats.bloom_filter_size,
392 }
393 }
394
    /// Looks up the posting list for `term` in `field`.
    ///
    /// Returns `Ok(None)` when the term is absent. Small posting lists are
    /// decoded from data inlined in the `TermInfo`; larger ones are read from
    /// the postings file at the offset recorded in the dictionary.
    ///
    /// # Errors
    /// `Error::Corruption` when the `TermInfo` carries neither inline nor
    /// external data, or when the recorded range exceeds the postings file.
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Dictionary key = little-endian field id followed by raw term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Fast path: postings small enough to be inlined in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        // Guard against a corrupt offset pointing past the postings file.
        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }
457
    /// Collects posting lists for every term in `field` that starts with
    /// `prefix`.
    ///
    /// Entries whose external range falls outside the postings file are
    /// silently skipped (unlike `get_postings`, which reports corruption).
    pub async fn get_prefix_postings(
        &self,
        field: Field,
        prefix: &[u8],
    ) -> Result<Vec<BlockPostingList>> {
        // Key prefix = little-endian field id followed by the term prefix.
        let mut key_prefix = Vec::with_capacity(4 + prefix.len());
        key_prefix.extend_from_slice(&field.0.to_le_bytes());
        key_prefix.extend_from_slice(prefix);

        let entries = self.term_dict.prefix_scan(&key_prefix).await?;
        let mut results = Vec::with_capacity(entries.len());

        for (_key, term_info) in entries {
            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                // Inline postings: decode directly from the TermInfo.
                let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    posting_list.push(doc_id, tf);
                }
                results.push(BlockPostingList::from_posting_list(&posting_list)?);
            } else if let Some((posting_offset, posting_len)) = term_info.external_info() {
                let start = posting_offset;
                let end = start + posting_len;
                if end > self.postings_handle.len() {
                    continue;
                }
                let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
                results.push(BlockPostingList::deserialize_zero_copy(posting_bytes)?);
            }
        }

        Ok(results)
    }
492
    /// Fetches the full stored document `local_doc_id`, or `None` if absent.
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }

    /// Fetches document `local_doc_id`, optionally restricted to `fields`.
    ///
    /// Dense vector fields are not part of the store payload: they are
    /// re-hydrated from the flat vector files and appended to the returned
    /// document. Vector hydration failures are logged and skipped rather than
    /// failing the whole fetch.
    pub async fn doc_with_fields(
        &self,
        local_doc_id: DocId,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<Document>> {
        let mut doc = match fields {
            Some(set) => {
                let field_ids: Vec<u32> = set.iter().copied().collect();
                match self
                    .store
                    .get_fields(local_doc_id, &self.schema, &field_ids)
                    .await
                {
                    Ok(Some(d)) => d,
                    Ok(None) => return Ok(None),
                    Err(e) => return Err(Error::from(e)),
                }
            }
            None => match self.store.get(local_doc_id, &self.schema).await {
                Ok(Some(d)) => d,
                Ok(None) => return Ok(None),
                Err(e) => return Err(Error::from(e)),
            },
        };

        // Hydrate dense vectors for this document from the flat storage.
        for (&field_id, lazy_flat) in &self.flat_vectors {
            // Honor the field projection, if one was requested.
            if let Some(set) = fields
                && !set.contains(&field_id)
            {
                continue;
            }

            let is_binary = lazy_flat.quantization == DenseVectorQuantization::Binary;
            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                if is_binary {
                    // Binary vectors are returned as raw packed bytes.
                    let vbs = lazy_flat.vector_byte_size();
                    let mut raw = vec![0u8; vbs];
                    match lazy_flat.read_vector_raw_into(flat_idx, &mut raw).await {
                        Ok(()) => {
                            doc.add_binary_dense_vector(Field(field_id), raw);
                        }
                        Err(e) => {
                            log::warn!("Failed to hydrate binary vector field {}: {}", field_id, e);
                        }
                    }
                } else {
                    match lazy_flat.get_vector(flat_idx).await {
                        Ok(vec) => {
                            doc.add_dense_vector(Field(field_id), vec);
                        }
                        Err(e) => {
                            log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                        }
                    }
                }
            }
        }

        Ok(Some(doc))
    }
570
571 pub async fn prefetch_terms(
573 &self,
574 field: Field,
575 start_term: &[u8],
576 end_term: &[u8],
577 ) -> Result<()> {
578 let mut start_key = Vec::with_capacity(4 + start_term.len());
579 start_key.extend_from_slice(&field.0.to_le_bytes());
580 start_key.extend_from_slice(start_term);
581
582 let mut end_key = Vec::with_capacity(4 + end_term.len());
583 end_key.extend_from_slice(&field.0.to_le_bytes());
584 end_key.extend_from_slice(end_term);
585
586 self.term_dict.prefetch_range(&start_key, &end_key).await?;
587 Ok(())
588 }
589
    /// True when the document store was built with a compression dictionary.
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Direct access to the underlying document-store reader.
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    /// Raw store blocks as laid out on disk.
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// Handle over the store's raw data region.
    pub fn store_data_slice(&self) -> &FileHandle {
        self.store.data_slice()
    }

    /// Dumps every `(key, TermInfo)` entry from the term dictionary.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }
614
615 pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
620 let entries = self.term_dict.all_entries().await?;
621 let mut result = Vec::with_capacity(entries.len());
622
623 for (key, term_info) in entries {
624 if key.len() > 4 {
626 let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
627 let term_bytes = &key[4..];
628 if let Ok(term_str) = std::str::from_utf8(term_bytes) {
629 result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
630 }
631 }
632 }
633
634 Ok(result)
635 }
636
    /// Streaming iterator over the term dictionary.
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    /// Prefetches the term dictionary's entire data region in bulk.
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }
651
    /// Reads `len` bytes from the postings file starting at `offset`.
    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    /// Reads `len` bytes from the positions file starting at `offset`.
    ///
    /// Returns `Ok(None)` when the segment has no positions file.
    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    /// True when this segment was written with a positions file.
    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }
676
    /// Scores a packed batch of vectors against `query`, writing one score per
    /// vector into `scores` (`scores.len()` vectors of `dim` dimensions packed
    /// back to back in `raw`, encoded per `quant`).
    ///
    /// When `unit_norm` is true the stored vectors are treated as
    /// pre-normalized, so a plain dot product replaces the cosine computation.
    /// Binary quantization must be routed to the Hamming path instead and is
    /// unreachable here.
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
        unit_norm: bool,
    ) {
        use crate::dsl::DenseVectorQuantization;
        use crate::structures::simd;
        match (quant, unit_norm) {
            (DenseVectorQuantization::F32, false) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                // SAFETY: assumes `raw` holds at least `num_floats` f32 values;
                // 4-byte alignment is checked by the debug_assert above.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F32, true) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned"
                );
                // SAFETY: same length/alignment contract as the cosine branch.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_dot_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F16, false) => {
                simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::F16, true) => {
                simd::batch_dot_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, false) => {
                simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, true) => {
                simd::batch_dot_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::Binary, _) => {
                unreachable!("Binary quantization should not reach score_quantized_batch");
            }
        }
    }
731
    /// Two-stage dense-vector search over `field`, returning up to `k`
    /// documents.
    ///
    /// Stage 1 (candidates): uses the field's ANN index (RaBitQ / IVF / ScaNN)
    /// when present, otherwise brute-forces the flat vector file in batches;
    /// `fetch_k = ceil(k * max(rerank_factor, 1.0))` candidates are gathered.
    /// Stage 2 (rerank): when both an ANN index and flat vectors exist,
    /// candidates are rescored exactly from the flat file. Finally per-ordinal
    /// hits are merged per document with `combiner` and truncated to `k`.
    /// `nprobe` bounds how many coarse cells IVF/ScaNN probe (0 = default 32).
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        // Nothing indexed for this field at all.
        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // Unit-norm vectors allow a dot product instead of full cosine.
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;

        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        let t0 = std::time::Instant::now();
        // Stage 1: candidate generation (ANN if present, else brute force).
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    // Distances are mapped to similarity scores via 1/(1+d).
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            // Score the flat file in fixed-size batches to bound memory use.
            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        // Stage 2: exact rerank from the flat file, only when ANN produced
        // the candidates and flat vectors are available for this field.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Resolve each (doc_id, ordinal) candidate to its flat slot index;
            // `resolved` pairs (result index, flat index).
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                // Read in flat-file order to keep the I/O pattern sequential.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // Best-effort read; a failed slot keeps its zeroed bytes.
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
                let score_elapsed = t_score.elapsed();

                // Overwrite approximate L1 scores with exact ones.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            // Keep the best fetch_k, then order them by descending score.
            if results.len() > fetch_k {
                results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
                results.truncate(fetch_k);
            }
            results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
951
    /// Brute-force Hamming search over a binary-quantized vector field.
    ///
    /// `query` must be the packed binary form whose byte length matches the
    /// field's per-vector byte size. Per-ordinal hits are merged per document
    /// with `combiner`; returns empty when the field has no flat vectors.
    pub async fn search_binary_dense_vector(
        &self,
        field: Field,
        query: &[u8],
        k: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let lazy_flat = match self.flat_vectors.get(&field.0) {
            Some(f) => f,
            None => return Ok(Vec::new()),
        };

        const BRUTE_FORCE_BATCH: usize = 8192;
        let dim_bits = lazy_flat.dim;
        let byte_len = lazy_flat.vector_byte_size();
        let n = lazy_flat.num_vectors;

        if byte_len != query.len() {
            return Err(Error::Schema(format!(
                "Binary query vector byte length {} != field byte length {}",
                query.len(),
                byte_len
            )));
        }

        let mut collector = crate::query::ScoreCollector::new(k);
        let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

        // Scan the flat file in batches, scoring with SIMD Hamming similarity.
        for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
            let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
            let batch_bytes = lazy_flat
                .read_vectors_batch(batch_start, batch_count)
                .await
                .map_err(crate::Error::Io)?;
            let raw = batch_bytes.as_slice();

            crate::structures::simd::batch_hamming_scores(
                query,
                raw,
                byte_len,
                dim_bits,
                &mut scores[..batch_count],
            );

            // Only resolve doc ids for scores that can enter the current top-k.
            let threshold = collector.threshold();
            for (i, &score) in scores.iter().enumerate().take(batch_count) {
                if score > threshold {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }
        }

        let results: Vec<(u32, u16, f32)> = collector
            .into_sorted_results()
            .into_iter()
            .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
            .collect();

        Ok(combine_ordinal_results(results, combiner, k))
    }
1017
    /// True when `field` has either an ANN index or flat vectors.
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    /// RaBitQ index for `field`, if that is the field's index type and it
    /// deserializes successfully.
    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
            _ => None,
        }
    }

    /// IVF index and codebook for `field`, if that is the field's index type.
    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    /// Coarse centroids previously installed for `field_id`, if any.
    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }

    /// Installs the coarse-centroid map (required by IVF/ScaNN search).
    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }

    /// ScaNN (IVF-PQ) index and codebook for `field`, if that is the field's
    /// index type.
    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    /// The field's ANN index of whatever variant, if present.
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }
1067
    /// Loads the position posting list for `term` in `field`.
    ///
    /// Returns `Ok(None)` when the segment has no positions file, the term is
    /// absent, or the term carries no position info.
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        // Dictionary key = little-endian field id followed by raw term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes().await?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;

        Ok(Some(pos_list))
    }
1109
1110 pub fn has_positions(&self, field: Field) -> bool {
1112 if let Some(entry) = self.schema.get_field_entry(field) {
1114 entry.positions.is_some()
1115 } else {
1116 false
1117 }
1118 }
1119}
1120
1121#[cfg(feature = "sync")]
1123impl SegmentReader {
    /// Blocking variant of [`SegmentReader::get_postings`].
    ///
    /// Same lookup and decode logic, using the term dictionary's and file
    /// handle's synchronous read paths.
    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
        // Dictionary key = little-endian field id followed by raw term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        // Fast path: postings inlined in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }

    /// Blocking variant of [`SegmentReader::get_prefix_postings`].
    pub fn get_prefix_postings_sync(
        &self,
        field: Field,
        prefix: &[u8],
    ) -> Result<Vec<BlockPostingList>> {
        let mut key_prefix = Vec::with_capacity(4 + prefix.len());
        key_prefix.extend_from_slice(&field.0.to_le_bytes());
        key_prefix.extend_from_slice(prefix);

        let entries = self.term_dict.prefix_scan_sync(&key_prefix)?;
        let mut results = Vec::with_capacity(entries.len());

        for (_key, term_info) in entries {
            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    posting_list.push(doc_id, tf);
                }
                results.push(BlockPostingList::from_posting_list(&posting_list)?);
            } else if let Some((posting_offset, posting_len)) = term_info.external_info() {
                let start = posting_offset;
                let end = start + posting_len;
                // Out-of-bounds entries are skipped, matching the async variant.
                if end > self.postings_handle.len() {
                    continue;
                }
                let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
                results.push(BlockPostingList::deserialize_zero_copy(posting_bytes)?);
            }
        }

        Ok(results)
    }

    /// Blocking variant of [`SegmentReader::get_positions`].
    pub fn get_positions_sync(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes_sync()?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
        Ok(Some(pos_list))
    }
1234
pub fn search_dense_vector_sync(
    &self,
    field: Field,
    query: &[f32],
    k: usize,
    nprobe: usize,
    rerank_factor: f32,
    combiner: crate::query::MultiValueCombiner,
) -> Result<Vec<VectorSearchResult>> {
    // Synchronous k-NN search over a dense vector field.
    //
    // Strategy:
    //   1. If an ANN index (RaBitQ / IVF / ScaNN) exists, fetch `fetch_k`
    //      over-fetched candidates from it; when flat vectors are also
    //      present, re-score those candidates against the raw stored
    //      vectors for exact ranking.
    //   2. Otherwise, brute-force scan the flat vector storage in batches.
    //
    // `nprobe == 0` falls back to 32 probes for IVF/ScaNN. `rerank_factor`
    // is clamped to >= 1.0 so `fetch_k >= k`. At most `k` results are
    // returned after per-document ordinal combining.
    let ann_index = self.vector_indexes.get(&field.0);
    let lazy_flat = self.flat_vectors.get(&field.0);

    // Field has no vector data at all in this segment.
    if ann_index.is_none() && lazy_flat.is_none() {
        return Ok(Vec::new());
    }

    // Whether stored vectors are declared unit-normalized in the schema;
    // forwarded to `score_quantized_batch` (presumably selects the scoring
    // formula — confirm against its implementation).
    let unit_norm = self
        .schema
        .get_field_entry(field)
        .and_then(|e| e.dense_vector_config.as_ref())
        .is_some_and(|c| c.unit_norm);

    const BRUTE_FORCE_BATCH: usize = 4096;
    // Over-fetch so exact rerank / ordinal combining still yields k good hits.
    let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

    // Candidate tuples are (doc_id, ordinal, score); `ordinal` distinguishes
    // multiple vectors stored under the same document.
    let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
        match index {
            VectorIndex::RaBitQ(lazy) => {
                let rabitq = lazy.get().ok_or_else(|| {
                    Error::Schema("RaBitQ index deserialization failed".to_string())
                })?;
                // Index returns distances; map to similarity in (0, 1].
                rabitq
                    .search(query, fetch_k)
                    .into_iter()
                    .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                    .collect()
            }
            VectorIndex::IVF(lazy) => {
                let (index, codebook) = lazy.get().ok_or_else(|| {
                    Error::Schema("IVF index deserialization failed".to_string())
                })?;
                // IVF needs the coarse centroid table to pick cells to probe.
                let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                    Error::Schema(format!(
                        "IVF index requires coarse centroids for field {}",
                        field.0
                    ))
                })?;
                let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                index
                    .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                    .into_iter()
                    .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                    .collect()
            }
            VectorIndex::ScaNN(lazy) => {
                // NOTE(review): mirrors the IVF arm exactly apart from the
                // index type; a shared helper could remove the duplication.
                let (index, codebook) = lazy.get().ok_or_else(|| {
                    Error::Schema("ScaNN index deserialization failed".to_string())
                })?;
                let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                    Error::Schema(format!(
                        "ScaNN index requires coarse centroids for field {}",
                        field.0
                    ))
                })?;
                let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                index
                    .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                    .into_iter()
                    .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                    .collect()
            }
        }
    } else if let Some(lazy_flat) = lazy_flat {
        // No ANN index: exact brute-force scan of the flat storage,
        // processed in fixed-size batches to bound read size and reuse
        // the score buffer.
        let dim = lazy_flat.dim;
        let n = lazy_flat.num_vectors;
        let quant = lazy_flat.quantization;
        let mut collector = crate::query::ScoreCollector::new(fetch_k);
        let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

        for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
            let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
            let batch_bytes = lazy_flat
                .read_vectors_batch_sync(batch_start, batch_count)
                .map_err(crate::Error::Io)?;
            let raw = batch_bytes.as_slice();

            // Scores quantized vectors directly against the f32 query.
            Self::score_quantized_batch(
                query,
                raw,
                quant,
                dim,
                &mut scores[..batch_count],
                unit_norm,
            );

            for (i, &score) in scores.iter().enumerate().take(batch_count) {
                // Map flat slot index back to (doc_id, ordinal).
                let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                collector.insert_with_ordinal(doc_id, score, ordinal);
            }
        }

        collector
            .into_sorted_results()
            .into_iter()
            .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
            .collect()
    } else {
        return Ok(Vec::new());
    };

    // Exact rerank: ANN scores are approximate, so when raw flat vectors
    // are available, re-score every ANN candidate against its stored vector.
    if ann_index.is_some()
        && !results.is_empty()
        && let Some(lazy_flat) = lazy_flat
    {
        let dim = lazy_flat.dim;
        let quant = lazy_flat.quantization;
        let vbs = lazy_flat.vector_byte_size();

        // Resolve each (doc_id, ordinal) candidate to its flat slot index;
        // candidates whose ordinal cannot be found are left un-reranked.
        let mut resolved: Vec<(usize, usize)> = Vec::new();
        for (ri, c) in results.iter().enumerate() {
            let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
            for (j, &(_, ord)) in entries.iter().enumerate() {
                if ord == c.1 {
                    resolved.push((ri, start + j));
                    break;
                }
            }
        }

        if !resolved.is_empty() {
            // Sort by flat index so reads are in ascending storage order.
            resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
            let mut raw_buf = vec![0u8; resolved.len() * vbs];
            for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                // NOTE(review): a read error is ignored here, leaving the
                // slot zero-filled so the candidate is scored against zero
                // bytes — confirm this best-effort behavior is intended.
                let _ = lazy_flat.read_vector_raw_into_sync(
                    flat_idx,
                    &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                );
            }

            let mut scores = vec![0f32; resolved.len()];
            Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);

            // Overwrite approximate ANN scores with exact ones.
            for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                results[ri].2 = scores[buf_idx];
            }
        }

        // Keep the top fetch_k by descending score: partition with
        // select_nth, drop the tail, then fully sort the survivors.
        if results.len() > fetch_k {
            results.select_nth_unstable_by(fetch_k, |a, b| b.2.total_cmp(&a.2));
            results.truncate(fetch_k);
        }
        results.sort_unstable_by(|a, b| b.2.total_cmp(&a.2));
    }

    // Collapse multiple ordinals per document according to `combiner` and
    // cap the final result list at k.
    Ok(combine_ordinal_results(results, combiner, k))
}
1396}