mod loader;
mod types;

#[cfg(feature = "diagnostics")]
pub use types::DimRawData;
pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

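/// Approximate heap usage of one open segment, broken down by component.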
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    /// Segment identifier.
    pub segment_id: u128,
    /// Number of documents in the segment.
    pub num_docs: u32,
    /// Bytes held by the term-dictionary block cache.
    pub term_dict_cache_bytes: usize,
    /// Bytes held by the document-store block cache.
    pub store_cache_bytes: usize,
    /// Resident bytes of all sparse vector indexes.
    pub sparse_index_bytes: usize,
    /// Resident bytes of all dense (ANN) vector indexes.
    pub dense_index_bytes: usize,
    /// Bytes used by the term-dictionary bloom filter.
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
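    /// Total tracked bytes across all components.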
    pub fn total_bytes(&self) -> usize {
        self.term_dict_cache_bytes
            + self.store_cache_bytes
            + self.sparse_index_bytes
            + self.dense_index_bytes
            + self.bloom_filter_bytes
    }
}

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::vector_data::LazyFlatVectorData;
use crate::directories::{Directory, FileHandle};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

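/// Merge per-(doc, ordinal) vector hits into one result per document.
///
/// Fast path: if every hit has ordinal 0 (single-valued fields), results are
/// simply sorted by score and truncated. Otherwise hits are grouped by
/// document and their ordinal scores are merged with the supplied `combiner`.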
pub(crate) fn combine_ordinal_results(
    raw: impl IntoIterator<Item = (u32, u16, f32)>,
    combiner: crate::query::MultiValueCombiner,
    limit: usize,
) -> Vec<VectorSearchResult> {
    let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();

    let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
    if all_single {
        let mut results: Vec<VectorSearchResult> = collected
            .into_iter()
            .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
            .collect();
        results.sort_unstable_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);
        return results;
    }

    let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
        rustc_hash::FxHashMap::default();
    for (doc_id, ordinal, score) in collected {
        doc_ordinals
            .entry(doc_id as DocId)
            .or_default()
            .push((ordinal as u32, score));
    }
    let mut results: Vec<VectorSearchResult> = doc_ordinals
        .into_iter()
        .map(|(doc_id, ordinals)| {
            let combined_score = combiner.combine(&ordinals);
            VectorSearchResult::new(doc_id, combined_score, ordinals)
        })
        .collect();
    results.sort_unstable_by(|a, b| {
        b.score
            .partial_cmp(&a.score)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    results.truncate(limit);
    results
}

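/// Read-only accessor for a single on-disk segment.
///
/// Wraps lazily-loaded handles to the term dictionary, postings, document
/// store, positions, fast fields, and per-field dense/sparse vector indexes.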
pub struct SegmentReader {
    /// Segment metadata (id, doc counts, field statistics).
    meta: SegmentMeta,
    /// Term dictionary mapping (field id, term bytes) to `TermInfo`.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazy handle to the postings file.
    postings_handle: FileHandle,
    /// Document store reader.
    store: Arc<AsyncStoreReader>,
    /// Index schema.
    schema: Arc<Schema>,
    /// Per-field ANN indexes, keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Per-field flat (exact) vector data, keyed by field id.
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Per-field coarse centroids for IVF-style indexes, injected after open.
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Per-field sparse vector indexes, keyed by field id.
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Lazy handle to the positions file, if present.
    positions_handle: Option<FileHandle>,
    /// Per-field fast-field (columnar) readers, keyed by field id.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}

impl SegmentReader {
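    /// Open every component file of `segment_id` and assemble a reader.
    ///
    /// `cache_blocks` bounds the block caches of the term dictionary and the
    /// document store. Vector, sparse, position, and fast-field files are
    /// loaded through the `loader` helpers and are optional per schema.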
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;

        // Summarize the loaded components at debug level.
        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            for (field_id, idx) in &sparse_indexes {
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            if !fast_fields.is_empty() {
                parts.push(format!("fast: {} fields", fast_fields.len()));
            }
            log::debug!("{}", parts.join(", "));
        }

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            vector_indexes,
            flat_vectors,
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            positions_handle,
            fast_fields,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
        self.sparse_indexes.get(&field.0)
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    pub fn fast_field(
        &self,
        field_id: u32,
    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
        self.fast_fields.get(&field_id)
    }

    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
        &self.fast_fields
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

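    /// Snapshot the segment's approximate memory usage.
    ///
    /// Cache sizes are estimated as `cached_blocks * 4096`, assuming 4 KiB
    /// blocks; index sizes come from each index's own estimate. Illustrative
    /// use (hypothetical `reader` binding):
    ///
    /// ```ignore
    /// let stats = reader.memory_stats();
    /// log::info!("segment uses ~{} bytes", stats.total_bytes());
    /// ```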
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        let store_cache_bytes = self.store.cached_blocks() * 4096;

        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.estimated_memory_bytes())
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

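    /// Look up the posting list for `term` in `field`.
    ///
    /// The term-dictionary key is the little-endian field id followed by the
    /// raw term bytes. Small posting lists are decoded from inline `TermInfo`
    /// data; larger ones are read from the postings file at the recorded
    /// offset. Returns `Ok(None)` if the term is absent.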
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }

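    /// Fetch the stored document with all fields, hydrating dense vectors.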
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }

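    /// Fetch a stored document, optionally restricted to `fields`.
    ///
    /// Dense vectors are not kept in the store; they are hydrated from the
    /// flat vector data for each requested vector field. Hydration failures
    /// are logged and skipped rather than failing the whole fetch.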
    pub async fn doc_with_fields(
        &self,
        local_doc_id: DocId,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<Document>> {
        let mut doc = match fields {
            Some(set) => {
                let field_ids: Vec<u32> = set.iter().copied().collect();
                match self
                    .store
                    .get_fields(local_doc_id, &self.schema, &field_ids)
                    .await
                {
                    Ok(Some(d)) => d,
                    Ok(None) => return Ok(None),
                    Err(e) => return Err(Error::from(e)),
                }
            }
            None => match self.store.get(local_doc_id, &self.schema).await {
                Ok(Some(d)) => d,
                Ok(None) => return Ok(None),
                Err(e) => return Err(Error::from(e)),
            },
        };

        for (&field_id, lazy_flat) in &self.flat_vectors {
            if let Some(set) = fields
                && !set.contains(&field_id)
            {
                continue;
            }

            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }

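    /// Warm the term-dictionary cache for a contiguous term range in `field`.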
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &FileHandle {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

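    /// Enumerate every term as `(field, term, doc_freq)`.
    ///
    /// Keys with no term bytes after the field id, and terms that are not
    /// valid UTF-8, are skipped.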
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

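    /// Load the entire term-dictionary data section into cache in one bulk
    /// read, trading memory for lookup latency.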
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

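    /// Read a raw byte range from the postings file.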
    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

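    /// Read a raw byte range from the positions file, or `Ok(None)` if the
    /// segment has no positions.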
    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

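    /// Score a contiguous batch of stored vectors against `query`.
    ///
    /// `raw` holds `scores.len()` vectors of `dim` dimensions in the field's
    /// quantization format. Unit-normalized fields use a dot product;
    /// otherwise cosine similarity is used. The f32 path reinterprets the
    /// byte buffer in place and asserts 4-byte alignment in debug builds.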
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
        unit_norm: bool,
    ) {
        use crate::dsl::DenseVectorQuantization;
        use crate::structures::simd;
        match (quant, unit_norm) {
            (DenseVectorQuantization::F32, false) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F32, true) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned"
                );
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_dot_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F16, false) => {
                simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::F16, true) => {
                simd::batch_dot_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, false) => {
                simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, true) => {
                simd::batch_dot_scores_u8(query, raw, dim, scores);
            }
        }
    }

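    /// Two-phase dense vector search over this segment.
    ///
    /// Phase one produces up to `k * rerank_factor` candidates, either from
    /// the field's ANN index (RaBitQ, IVF, or ScaNN) or, when only flat
    /// vectors exist, by brute-force scoring in batches. Phase two re-scores
    /// ANN candidates exactly against the flat vectors, then merges
    /// per-ordinal hits into per-document results with `combiner`.
    /// `nprobe == 0` falls back to probing 32 IVF lists. Illustrative call
    /// (hypothetical `reader`, `field`, `query`, and `combiner` bindings):
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_dense_vector(field, &query, 10, 0, 3.0, combiner)
    ///     .await?;
    /// ```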
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;

        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        let t0 = std::time::Instant::now();
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each (doc_id, ordinal) candidate back to its flat-vector index.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                // Read in ascending flat order for sequential I/O.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
                let score_elapsed = t_score.elapsed();

                // Overwrite approximate ANN scores with exact scores.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(fetch_k);
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }

    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

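    /// Load the position posting list for `term` in `field`.
    ///
    /// Returns `Ok(None)` if the segment has no positions file, the term is
    /// absent, or the term carries no position info.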
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes().await?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;

        Ok(Some(pos_list))
    }

    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

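// Synchronous counterparts of the async read paths, compiled only when the
// "sync" feature is enabled.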
#[cfg(feature = "sync")]
impl SegmentReader {
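    /// Synchronous equivalent of [`Self::get_postings`].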
    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }

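    /// Synchronous equivalent of [`Self::get_positions`].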
    pub fn get_positions_sync(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes_sync()?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
        Ok(Some(pos_list))
    }

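    /// Synchronous equivalent of [`Self::search_dense_vector`]: same two-phase
    /// candidate generation and exact rerank, using blocking reads.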
    pub fn search_dense_vector_sync(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;
        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch_sync(batch_start, batch_count)
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };

        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            if !resolved.is_empty() {
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    let _ = lazy_flat.read_vector_raw_into_sync(
                        flat_idx,
                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                    );
                }

                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);

                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(fetch_k);
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
}