1mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
/// Per-segment accounting of heap memory attributable to cached blocks and
/// resident index structures.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    pub segment_id: u128,
    pub num_docs: u32,
    pub term_dict_cache_bytes: usize,
    pub store_cache_bytes: usize,
    pub sparse_index_bytes: usize,
    pub dense_index_bytes: usize,
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Sum of every byte-counted component (the identifier fields
    /// `segment_id` and `num_docs` are not byte counts and are excluded).
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .iter()
        .sum()
    }
}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48 AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49 RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56pub(crate) fn combine_ordinal_results(
62 raw: impl IntoIterator<Item = (u32, u16, f32)>,
63 combiner: crate::query::MultiValueCombiner,
64 limit: usize,
65) -> Vec<VectorSearchResult> {
66 let collected: Vec<(u32, u16, f32)> = raw.into_iter().collect();
67
68 let num_raw = collected.len();
69 let unique_docs = {
70 let mut ids: Vec<u32> = collected.iter().map(|(d, _, _)| *d).collect();
71 ids.sort_unstable();
72 ids.dedup();
73 ids.len()
74 };
75 log::debug!(
76 "combine_ordinal_results: {} raw entries, {} unique docs, combiner={:?}, limit={}",
77 num_raw,
78 unique_docs,
79 combiner,
80 limit
81 );
82
83 let all_single = collected.iter().all(|&(_, ord, _)| ord == 0);
85 if all_single {
86 let mut results: Vec<VectorSearchResult> = collected
87 .into_iter()
88 .map(|(doc_id, _, score)| VectorSearchResult::new(doc_id, score, vec![(0, score)]))
89 .collect();
90 results.sort_unstable_by(|a, b| {
91 b.score
92 .partial_cmp(&a.score)
93 .unwrap_or(std::cmp::Ordering::Equal)
94 });
95 results.truncate(limit);
96 return results;
97 }
98
99 let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
101 rustc_hash::FxHashMap::default();
102 for (doc_id, ordinal, score) in collected {
103 doc_ordinals
104 .entry(doc_id as DocId)
105 .or_default()
106 .push((ordinal as u32, score));
107 }
108 let mut results: Vec<VectorSearchResult> = doc_ordinals
109 .into_iter()
110 .map(|(doc_id, ordinals)| {
111 let combined_score = combiner.combine(&ordinals);
112 VectorSearchResult::new(doc_id, combined_score, ordinals)
113 })
114 .collect();
115 results.sort_unstable_by(|a, b| {
116 b.score
117 .partial_cmp(&a.score)
118 .unwrap_or(std::cmp::Ordering::Equal)
119 });
120 results.truncate(limit);
121 results
122}
123
/// Read-only view over a single on-disk segment: term dictionary, postings,
/// document store, and optional vector / sparse / positions / fast-field
/// structures.
pub struct SegmentReader {
    // Immutable segment metadata (id, doc count, per-field stats).
    meta: SegmentMeta,
    // Async SSTable mapping (field_id LE bytes ++ term bytes) -> TermInfo.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    // Lazy handle into the postings file; byte ranges come from TermInfo.
    postings_handle: FileHandle,
    // Block-cached document store reader.
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    // ANN (dense) indexes keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    // Lazily-read flat vector data keyed by field id; used for brute-force
    // search, reranking, and document hydration.
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    // Coarse centroids for IVF/ScaNN; empty until set_coarse_centroids is
    // called (open() does not load them).
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    // Present only when the segment wrote a positions file.
    positions_handle: Option<FileHandle>,
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}
151
impl SegmentReader {
    /// Open a segment's files from `dir` and assemble a reader.
    ///
    /// Reads the metadata file eagerly, opens lazy handles for the term
    /// dictionary, postings and store, and delegates vectors / sparse /
    /// positions / fast-field loading to the `loader` module. `cache_blocks`
    /// bounds the block caches of both the term dictionary and the store.
    ///
    /// Coarse centroids are NOT loaded here; callers inject them later via
    /// `set_coarse_centroids`.
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Metadata is read in full up front.
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;

        // Emit a single debug line summarizing what was loaded.
        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            for (field_id, idx) in &sparse_indexes {
                // ~24 bytes per dimension is a rough in-memory estimate —
                // NOTE(review): confirm against SparseIndex's actual layout.
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            if !fast_fields.is_empty() {
                parts.push(format!("fast: {} fields", fast_fields.len()));
            }
            log::debug!("{}", parts.join(", "));
        }

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            vector_indexes,
            flat_vectors,
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            positions_handle,
            fast_fields,
        })
    }
234
    /// Segment metadata (id, doc count, per-field stats).
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    /// Number of documents stored in this segment.
    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Average length of `field` as recorded in the segment metadata.
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    /// Schema this segment was written with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// All sparse-vector indexes, keyed by field id.
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    /// Sparse-vector index for `field`, if present.
    pub fn sparse_index(&self, field: Field) -> Option<&SparseIndex> {
        self.sparse_indexes.get(&field.0)
    }

    /// All ANN (dense) vector indexes, keyed by field id.
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    /// All lazily-loaded flat vector stores, keyed by field id.
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    /// Fast-field reader for `field_id`, if present.
    pub fn fast_field(
        &self,
        field_id: u32,
    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
        self.fast_fields.get(&field_id)
    }

    /// All fast-field readers, keyed by field id.
    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
        &self.fast_fields
    }

    /// Term-dictionary SSTable statistics (includes bloom filter size).
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
289
290 pub fn memory_stats(&self) -> SegmentMemoryStats {
292 let term_dict_stats = self.term_dict.stats();
293
294 let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;
296
297 let store_cache_bytes = self.store.cached_blocks() * 4096;
299
300 let sparse_index_bytes: usize = self
302 .sparse_indexes
303 .values()
304 .map(|s| s.estimated_memory_bytes())
305 .sum();
306
307 let dense_index_bytes: usize = self
310 .vector_indexes
311 .values()
312 .map(|v| v.estimated_memory_bytes())
313 .sum();
314
315 SegmentMemoryStats {
316 segment_id: self.meta.id,
317 num_docs: self.meta.num_docs,
318 term_dict_cache_bytes,
319 store_cache_bytes,
320 sparse_index_bytes,
321 dense_index_bytes,
322 bloom_filter_bytes: term_dict_stats.bloom_filter_size,
323 }
324 }
325
326 pub async fn get_postings(
331 &self,
332 field: Field,
333 term: &[u8],
334 ) -> Result<Option<BlockPostingList>> {
335 log::debug!(
336 "SegmentReader::get_postings field={} term_len={}",
337 field.0,
338 term.len()
339 );
340
341 let mut key = Vec::with_capacity(4 + term.len());
343 key.extend_from_slice(&field.0.to_le_bytes());
344 key.extend_from_slice(term);
345
346 let term_info = match self.term_dict.get(&key).await? {
348 Some(info) => {
349 log::debug!("SegmentReader::get_postings found term_info");
350 info
351 }
352 None => {
353 log::debug!("SegmentReader::get_postings term not found");
354 return Ok(None);
355 }
356 };
357
358 if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
360 let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
362 for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
363 posting_list.push(doc_id, tf);
364 }
365 let block_list = BlockPostingList::from_posting_list(&posting_list)?;
366 return Ok(Some(block_list));
367 }
368
369 let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
371 Error::Corruption("TermInfo has neither inline nor external data".to_string())
372 })?;
373
374 let start = posting_offset;
375 let end = start + posting_len;
376
377 if end > self.postings_handle.len() {
378 return Err(Error::Corruption(
379 "Posting offset out of bounds".to_string(),
380 ));
381 }
382
383 let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
384 let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;
385
386 Ok(Some(block_list))
387 }
388
    /// Fetch the full stored document with local id `local_doc_id`.
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }

    /// Fetch a stored document, optionally restricted to a set of field ids.
    ///
    /// After loading from the store, dense vectors that live in flat vector
    /// files (outside the document store) are hydrated back onto the document,
    /// honoring the `fields` filter when one is given.
    pub async fn doc_with_fields(
        &self,
        local_doc_id: DocId,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<Document>> {
        let mut doc = match fields {
            Some(set) => {
                let field_ids: Vec<u32> = set.iter().copied().collect();
                match self
                    .store
                    .get_fields(local_doc_id, &self.schema, &field_ids)
                    .await
                {
                    Ok(Some(d)) => d,
                    Ok(None) => return Ok(None),
                    Err(e) => return Err(Error::from(e)),
                }
            }
            None => match self.store.get(local_doc_id, &self.schema).await {
                Ok(Some(d)) => d,
                Ok(None) => return Ok(None),
                Err(e) => return Err(Error::from(e)),
            },
        };

        // Re-attach dense vectors stored outside the document store.
        for (&field_id, lazy_flat) in &self.flat_vectors {
            // Skip vector fields the caller did not ask for.
            if let Some(set) = fields
                && !set.contains(&field_id)
            {
                continue;
            }

            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        // Best-effort hydration: a failed vector read degrades
                        // the document rather than failing the whole fetch.
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }
452
453 pub async fn prefetch_terms(
455 &self,
456 field: Field,
457 start_term: &[u8],
458 end_term: &[u8],
459 ) -> Result<()> {
460 let mut start_key = Vec::with_capacity(4 + start_term.len());
461 start_key.extend_from_slice(&field.0.to_le_bytes());
462 start_key.extend_from_slice(start_term);
463
464 let mut end_key = Vec::with_capacity(4 + end_term.len());
465 end_key.extend_from_slice(&field.0.to_le_bytes());
466 end_key.extend_from_slice(end_term);
467
468 self.term_dict.prefetch_range(&start_key, &end_key).await?;
469 Ok(())
470 }
471
    /// Whether the document store was written with a compression dictionary.
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Direct access to the underlying document-store reader.
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    /// Raw store blocks as written on disk.
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// Handle to the store's raw data region.
    pub fn store_data_slice(&self) -> &FileHandle {
        self.store.data_slice()
    }

    /// Every `(key, TermInfo)` entry in the term dictionary.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }
496
497 pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
502 let entries = self.term_dict.all_entries().await?;
503 let mut result = Vec::with_capacity(entries.len());
504
505 for (key, term_info) in entries {
506 if key.len() > 4 {
508 let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
509 let term_bytes = &key[4..];
510 if let Ok(term_str) = std::str::from_utf8(term_bytes) {
511 result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
512 }
513 }
514 }
515
516 Ok(result)
517 }
518
    /// Streaming iterator over the term dictionary.
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    /// Pull the whole term-dictionary data section into cache with one bulk
    /// read.
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

    /// Read `len` raw bytes from the postings file starting at `offset`.
    ///
    /// NOTE(review): unlike `get_postings`, this performs no explicit bounds
    /// check against the handle length — presumably the handle itself errors
    /// on an out-of-range read; confirm.
    pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    /// Read `len` raw bytes from the positions file starting at `offset`;
    /// `Ok(None)` when this segment has no positions file.
    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    /// Whether this segment wrote a positions file at all.
    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }
558
    /// Score a contiguous batch of stored vectors against `query`.
    ///
    /// `raw` holds `scores.len()` vectors of `dim` components each, encoded
    /// according to `quant`; one similarity per vector is written into
    /// `scores`. When `unit_norm` is true a plain dot product is used,
    /// otherwise full cosine similarity.
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
        unit_norm: bool,
    ) {
        use crate::dsl::DenseVectorQuantization;
        use crate::structures::simd;
        match (quant, unit_norm) {
            (DenseVectorQuantization::F32, false) => {
                let num_floats = scores.len() * dim;
                // Alignment is only verified in debug builds.
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                // SAFETY: reinterprets `raw` as `num_floats` f32 values. The
                // caller must guarantee `raw.len() >= num_floats * 4` and
                // 4-byte alignment — NOTE(review): in release builds neither
                // is checked; confirm all vectors-file formats satisfy both.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F32, true) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned"
                );
                // SAFETY: same invariants as the F32/cosine arm above.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_dot_scores(query, vectors, dim, scores);
            }
            // F16 and UInt8 kernels take the raw byte slice directly.
            (DenseVectorQuantization::F16, false) => {
                simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::F16, true) => {
                simd::batch_dot_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, false) => {
                simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, true) => {
                simd::batch_dot_scores_u8(query, raw, dim, scores);
            }
        }
    }
609
    /// Two-stage dense-vector search over `field` in this segment.
    ///
    /// Stage 1 produces `ceil(k * max(rerank_factor, 1))` candidates either
    /// from an ANN index (RaBitQ / IVF / ScaNN) or, when only flat vectors
    /// exist, by brute-force scoring them in batches. Stage 2 re-scores ANN
    /// candidates against the exact flat vectors when those are available.
    /// Per-ordinal hits are finally merged into per-document results via
    /// `combiner` and truncated to `k`.
    ///
    /// ANN distances are mapped to similarities as `1 / (1 + dist)`.
    /// IVF/ScaNN require coarse centroids to have been injected via
    /// `set_coarse_centroids`, and fall back to `nprobe = 32` when the caller
    /// passes 0.
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        // Nothing indexed for this field in this segment.
        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // Unit-norm vectors are scored with a dot product instead of cosine.
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;

        // Over-fetch so the rerank stage has candidates to discard.
        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        let t0 = std::time::Instant::now();
        // Stage 1: candidate generation as (doc_id, ordinal, score) triples.
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // No ANN index: brute-force over the flat vectors in batches.
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        // Stage 2: exact rerank of ANN candidates against the flat vectors.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each candidate (result index) to its flat-vector index by
            // matching ordinals within the doc's entry range.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                // Read in ascending flat-index order for sequential I/O.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // Best-effort read: failures leave zeroed bytes in place.
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
                let score_elapsed = t_score.elapsed();

                // Overwrite approximate scores with exact ones.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(fetch_k);
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
826
    /// Whether `field` has any dense-vector data (ANN index or flat vectors).
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    /// RaBitQ index for `field`, if that variant is present.
    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
            _ => None,
        }
    }

    /// IVF index and its codebook for `field`, if that variant is present.
    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    /// Coarse centroids for `field_id`, if previously injected.
    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }

    /// Inject the coarse-centroid tables required by IVF/ScaNN search.
    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }

    /// ScaNN index and its PQ codebook for `field`, if that variant is
    /// present.
    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    /// The `VectorIndex` for `field`, whatever its variant.
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }
876
877 pub async fn get_positions(
882 &self,
883 field: Field,
884 term: &[u8],
885 ) -> Result<Option<crate::structures::PositionPostingList>> {
886 let handle = match &self.positions_handle {
888 Some(h) => h,
889 None => return Ok(None),
890 };
891
892 let mut key = Vec::with_capacity(4 + term.len());
894 key.extend_from_slice(&field.0.to_le_bytes());
895 key.extend_from_slice(term);
896
897 let term_info = match self.term_dict.get(&key).await? {
899 Some(info) => info,
900 None => return Ok(None),
901 };
902
903 let (offset, length) = match term_info.position_info() {
905 Some((o, l)) => (o, l),
906 None => return Ok(None),
907 };
908
909 let slice = handle.slice(offset..offset + length);
911 let data = slice.read_bytes().await?;
912
913 let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
915
916 Ok(Some(pos_list))
917 }
918
919 pub fn has_positions(&self, field: Field) -> bool {
921 if let Some(entry) = self.schema.get_field_entry(field) {
923 entry.positions.is_some()
924 } else {
925 false
926 }
927 }
928}
929
#[cfg(feature = "sync")]
impl SegmentReader {
    /// Synchronous twin of `get_postings`: resolve the posting list for
    /// `term` in `field`, or `Ok(None)` when the term is absent.
    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
        // Dictionary key: 4-byte LE field id followed by the raw term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let Some(term_info) = self.term_dict.get_sync(&key)? else {
            return Ok(None);
        };

        // Inline fast path: small lists are packed directly in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut inline_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs) {
                inline_list.push(doc_id, tf);
            }
            return Ok(Some(BlockPostingList::from_posting_list(&inline_list)?));
        }

        // External path: read the recorded byte range from the postings file.
        let (start, len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;
        let end = start + len;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let raw = self.postings_handle.read_bytes_range_sync(start..end)?;
        Ok(Some(BlockPostingList::deserialize_zero_copy(raw)?))
    }
975
976 pub fn get_positions_sync(
978 &self,
979 field: Field,
980 term: &[u8],
981 ) -> Result<Option<crate::structures::PositionPostingList>> {
982 let handle = match &self.positions_handle {
983 Some(h) => h,
984 None => return Ok(None),
985 };
986
987 let mut key = Vec::with_capacity(4 + term.len());
989 key.extend_from_slice(&field.0.to_le_bytes());
990 key.extend_from_slice(term);
991
992 let term_info = match self.term_dict.get_sync(&key)? {
994 Some(info) => info,
995 None => return Ok(None),
996 };
997
998 let (offset, length) = match term_info.position_info() {
999 Some((o, l)) => (o, l),
1000 None => return Ok(None),
1001 };
1002
1003 let slice = handle.slice(offset..offset + length);
1004 let data = slice.read_bytes_sync()?;
1005
1006 let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
1007 Ok(Some(pos_list))
1008 }
1009
    /// Synchronous twin of `search_dense_vector`: two-stage dense search
    /// (ANN or brute-force candidates, then exact rerank against flat
    /// vectors), with per-ordinal hits merged per document via `combiner`.
    ///
    /// Unlike the async version, this path emits no debug logging or timing.
    pub fn search_dense_vector_sync(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        // Nothing indexed for this field in this segment.
        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // Unit-norm vectors are scored with a dot product instead of cosine.
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;
        // Over-fetch so the rerank stage has candidates to discard.
        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        // Stage 1: candidate generation as (doc_id, ordinal, score) triples;
        // ANN distances are mapped to similarities as 1 / (1 + dist).
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    // nprobe == 0 means "use the default" (32 probes).
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // No ANN index: brute-force over the flat vectors in batches.
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch_sync(batch_start, batch_count)
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };

        // Stage 2: exact rerank of ANN candidates against the flat vectors.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each candidate (result index) to its flat-vector index by
            // matching ordinals within the doc's entry range.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            if !resolved.is_empty() {
                // Read in ascending flat-index order for sequential I/O.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // Best-effort read: failures leave zeroed bytes in place.
                    let _ = lazy_flat.read_vector_raw_into_sync(
                        flat_idx,
                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                    );
                }

                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);

                // Overwrite approximate scores with exact ones.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(fetch_k);
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
}