1mod loader;
4mod types;
5
6#[cfg(feature = "diagnostics")]
7pub use types::DimRawData;
8pub use types::{SparseIndex, VectorIndex, VectorSearchResult};
9
/// Per-segment breakdown of resident memory usage, for diagnostics/telemetry.
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    pub segment_id: u128,
    pub num_docs: u32,
    pub term_dict_cache_bytes: usize,
    pub store_cache_bytes: usize,
    pub sparse_index_bytes: usize,
    pub dense_index_bytes: usize,
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    /// Sum of every tracked byte count for this segment.
    pub fn total_bytes(&self) -> usize {
        [
            self.term_dict_cache_bytes,
            self.store_cache_bytes,
            self.sparse_index_bytes,
            self.dense_index_bytes,
            self.bloom_filter_bytes,
        ]
        .into_iter()
        .sum()
    }
}
39
40use std::sync::Arc;
41
42use rustc_hash::FxHashMap;
43
44use super::vector_data::LazyFlatVectorData;
45use crate::directories::{Directory, FileHandle};
46use crate::dsl::{Document, Field, Schema};
47use crate::structures::{
48 AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
49 RaBitQIndex, SSTableStats, TermInfo,
50};
51use crate::{DocId, Error, Result};
52
53use super::store::{AsyncStoreReader, RawStoreBlock};
54use super::types::{SegmentFiles, SegmentId, SegmentMeta};
55
56fn combine_ordinal_results(
59 raw: impl IntoIterator<Item = (u32, u16, f32)>,
60 combiner: crate::query::MultiValueCombiner,
61 limit: usize,
62) -> Vec<VectorSearchResult> {
63 let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
64 rustc_hash::FxHashMap::default();
65 for (doc_id, ordinal, score) in raw {
66 doc_ordinals
67 .entry(doc_id as DocId)
68 .or_default()
69 .push((ordinal as u32, score));
70 }
71 let mut results: Vec<VectorSearchResult> = doc_ordinals
72 .into_iter()
73 .map(|(doc_id, ordinals)| {
74 let combined_score = combiner.combine(&ordinals);
75 VectorSearchResult::new(doc_id, combined_score, ordinals)
76 })
77 .collect();
78 results.sort_by(|a, b| {
79 b.score
80 .partial_cmp(&a.score)
81 .unwrap_or(std::cmp::Ordering::Equal)
82 });
83 results.truncate(limit);
84 results
85}
86
/// Read-side view of one immutable segment: term dictionary, postings,
/// stored documents, plus optional dense/sparse vector and fast-field data.
pub struct SegmentReader {
    /// Segment metadata (id, doc count, per-field statistics).
    meta: SegmentMeta,
    /// Term dictionary (SSTable of key -> TermInfo).
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazy handle to the raw postings file.
    postings_handle: FileHandle,
    /// Stored-document reader.
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// ANN indexes keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Exact (flat) vector data keyed by field id; used for brute force and rerank.
    flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
    /// Coarse centroids for IVF/ScaNN search; injected via `set_coarse_centroids`.
    coarse_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Sparse (inverted) vector indexes keyed by field id.
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Positions file handle, present only when some field stores positions.
    positions_handle: Option<FileHandle>,
    /// Fast-field readers keyed by field id.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldReader>,
}
114
115impl SegmentReader {
    /// Open a segment for reading from `dir`.
    ///
    /// Reads the metadata file eagerly, opens lazy handles for the term
    /// dictionary, postings and store, and loads per-field vector, sparse,
    /// positions and fast-field structures via the `loader` module.
    /// `cache_blocks` bounds the block cache of both the term dictionary and
    /// the document store.
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Metadata is small; read it fully up front.
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let vectors_data = loader::load_vectors_file(dir, &files, &schema).await?;
        let vector_indexes = vectors_data.indexes;
        let flat_vectors = vectors_data.flat_vectors;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        let fast_fields = loader::load_fast_fields_file(dir, &files, &schema).await?;

        // Emit a single summary log line describing what was loaded.
        {
            let mut parts = vec![format!(
                "[segment] loaded {:016x}: docs={}",
                segment_id.0, meta.num_docs
            )];
            if !vector_indexes.is_empty() || !flat_vectors.is_empty() {
                parts.push(format!(
                    "dense: {} ann + {} flat fields",
                    vector_indexes.len(),
                    flat_vectors.len()
                ));
            }
            for (field_id, idx) in &sparse_indexes {
                // NOTE(review): ~24 bytes/dimension looks like a rough estimate
                // of per-dimension overhead — confirm against SparseIndex layout.
                parts.push(format!(
                    "sparse field {}: {} dims, ~{:.1} KB",
                    field_id,
                    idx.num_dimensions(),
                    idx.num_dimensions() as f64 * 24.0 / 1024.0
                ));
            }
            if !fast_fields.is_empty() {
                parts.push(format!("fast: {} fields", fast_fields.len()));
            }
            log::debug!("{}", parts.join(", "));
        }

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            vector_indexes,
            flat_vectors,
            // Coarse centroids are injected later via `set_coarse_centroids`.
            coarse_centroids: FxHashMap::default(),
            sparse_indexes,
            positions_handle,
            fast_fields,
        })
    }
197
    /// Segment metadata (id, doc count, per-field statistics).
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    /// Number of documents stored in this segment.
    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Average token length of `field`, as recorded in segment metadata.
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    /// Schema this segment was built against.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Sparse vector indexes, keyed by field id.
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    /// ANN vector indexes, keyed by field id.
    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    /// Flat (exact) vector data, keyed by field id.
    pub fn flat_vectors(&self) -> &FxHashMap<u32, LazyFlatVectorData> {
        &self.flat_vectors
    }

    /// Fast-field reader for `field_id`, if that field has one.
    pub fn fast_field(
        &self,
        field_id: u32,
    ) -> Option<&crate::structures::fast_field::FastFieldReader> {
        self.fast_fields.get(&field_id)
    }

    /// All fast-field readers, keyed by field id.
    pub fn fast_fields(&self) -> &FxHashMap<u32, crate::structures::fast_field::FastFieldReader> {
        &self.fast_fields
    }

    /// Statistics of the underlying term-dictionary SSTable.
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
247
    /// Approximate resident memory attributable to this segment.
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

        // NOTE(review): 4096 is assumed to be the cache block size of both
        // readers — confirm against AsyncSSTableReader/AsyncStoreReader.
        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        let store_cache_bytes = self.store.cached_blocks() * 4096;

        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.estimated_memory_bytes())
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }
283
    /// Look up the posting list for `term` in `field`.
    ///
    /// Returns `Ok(None)` when the term is absent. Small posting lists may be
    /// inlined in the `TermInfo`; otherwise the external byte range it
    /// references is read from the postings file.
    ///
    /// # Errors
    /// `Error::Corruption` when the term info carries neither inline nor
    /// external data, or when the external range exceeds the postings file.
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Dictionary keys are the 4-byte LE field id followed by the raw term.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Fast path: postings small enough to be stored inline in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        // Validate the range before touching the file.
        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }
346
    /// Retrieve a stored document by its segment-local doc id.
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.doc_with_fields(local_doc_id, None).await
    }

    /// Retrieve a stored document, optionally restricted to `fields`.
    ///
    /// When `fields` is `Some`, only those field ids are decoded from the
    /// store. Flat dense vectors are kept outside the store; they are
    /// re-hydrated from the vectors file afterwards, honoring the same
    /// `fields` filter. A vector that fails to load is logged and skipped
    /// rather than failing the whole document.
    pub async fn doc_with_fields(
        &self,
        local_doc_id: DocId,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<Document>> {
        let mut doc = match fields {
            Some(set) => {
                let field_ids: Vec<u32> = set.iter().copied().collect();
                match self
                    .store
                    .get_fields(local_doc_id, &self.schema, &field_ids)
                    .await
                {
                    Ok(Some(d)) => d,
                    Ok(None) => return Ok(None),
                    Err(e) => return Err(Error::from(e)),
                }
            }
            None => match self.store.get(local_doc_id, &self.schema).await {
                Ok(Some(d)) => d,
                Ok(None) => return Ok(None),
                Err(e) => return Err(Error::from(e)),
            },
        };

        // Hydrate flat dense vectors for the requested fields.
        for (&field_id, lazy_flat) in &self.flat_vectors {
            if let Some(set) = fields
                && !set.contains(&field_id)
            {
                continue;
            }

            let (start, entries) = lazy_flat.flat_indexes_for_doc(local_doc_id);
            for (j, &(_doc_id, _ordinal)) in entries.iter().enumerate() {
                let flat_idx = start + j;
                match lazy_flat.get_vector(flat_idx).await {
                    Ok(vec) => {
                        doc.add_dense_vector(Field(field_id), vec);
                    }
                    Err(e) => {
                        // Best-effort: a missing vector should not lose the doc.
                        log::warn!("Failed to hydrate vector field {}: {}", field_id, e);
                    }
                }
            }
        }

        Ok(Some(doc))
    }
410
411 pub async fn prefetch_terms(
413 &self,
414 field: Field,
415 start_term: &[u8],
416 end_term: &[u8],
417 ) -> Result<()> {
418 let mut start_key = Vec::with_capacity(4 + start_term.len());
419 start_key.extend_from_slice(&field.0.to_le_bytes());
420 start_key.extend_from_slice(start_term);
421
422 let mut end_key = Vec::with_capacity(4 + end_term.len());
423 end_key.extend_from_slice(&field.0.to_le_bytes());
424 end_key.extend_from_slice(end_term);
425
426 self.term_dict.prefetch_range(&start_key, &end_key).await?;
427 Ok(())
428 }
429
    /// True if the document store uses a shared compression dictionary.
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Direct access to the underlying document-store reader.
    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    /// Raw store blocks as exposed by the store reader.
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// Handle over the store's raw data region.
    pub fn store_data_slice(&self) -> &FileHandle {
        self.store.data_slice()
    }

    /// All raw `(key, TermInfo)` entries of the term dictionary.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }
454
    /// All terms in the dictionary as `(field, term, doc_freq)` triples.
    ///
    /// Keys are `field_id (4 bytes LE) || term`. Entries whose term bytes are
    /// not valid UTF-8 are skipped.
    /// NOTE(review): `key.len() > 4` also silently drops a 4-byte key, i.e. an
    /// empty term — confirm this is intended.
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }
476
    /// Streaming iterator over the raw term-dictionary entries.
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    /// Bulk-load the entire term-dictionary data region into its cache.
    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }
491
492 pub async fn read_postings(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
494 let start = offset;
495 let end = start + len;
496 let bytes = self.postings_handle.read_bytes_range(start..end).await?;
497 Ok(bytes.to_vec())
498 }
499
    /// Read `len` bytes of raw position data starting at `offset`.
    ///
    /// Returns `Ok(None)` when this segment has no positions file.
    pub async fn read_position_bytes(&self, offset: u64, len: u64) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    /// True if this segment has a positions file on disk.
    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }
516
    /// Score a contiguous batch of stored vectors against `query`.
    ///
    /// `raw` holds `scores.len()` vectors of `dim` components packed
    /// back-to-back in quantization `quant`; one score per vector is written
    /// into `scores`. When `unit_norm` is true the vectors are treated as
    /// pre-normalized and a dot product is used instead of full cosine.
    fn score_quantized_batch(
        query: &[f32],
        raw: &[u8],
        quant: crate::dsl::DenseVectorQuantization,
        dim: usize,
        scores: &mut [f32],
        unit_norm: bool,
    ) {
        use crate::dsl::DenseVectorQuantization;
        use crate::structures::simd;
        match (quant, unit_norm) {
            (DenseVectorQuantization::F32, false) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned — vectors file may use legacy format"
                );
                // SAFETY: caller must supply at least `num_floats * 4` bytes of
                // 4-byte-aligned f32 data in `raw`; alignment is only checked
                // by the debug_assert above in debug builds.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_cosine_scores(query, vectors, dim, scores);
            }
            (DenseVectorQuantization::F32, true) => {
                let num_floats = scores.len() * dim;
                debug_assert!(
                    (raw.as_ptr() as usize).is_multiple_of(std::mem::align_of::<f32>()),
                    "f32 vector data not 4-byte aligned"
                );
                // SAFETY: same contract as the cosine branch above — `raw`
                // must contain `num_floats` aligned f32 values.
                let vectors: &[f32] =
                    unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const f32, num_floats) };
                simd::batch_dot_scores(query, vectors, dim, scores);
            }
            // f16 / u8 variants decode inside the simd helpers, no unsafe here.
            (DenseVectorQuantization::F16, false) => {
                simd::batch_cosine_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::F16, true) => {
                simd::batch_dot_scores_f16(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, false) => {
                simd::batch_cosine_scores_u8(query, raw, dim, scores);
            }
            (DenseVectorQuantization::UInt8, true) => {
                simd::batch_dot_scores_u8(query, raw, dim, scores);
            }
        }
    }
567
    /// Two-phase dense-vector search over this segment.
    ///
    /// L1: candidates come from the ANN index when one exists for `field`,
    /// otherwise from a brute-force scan over the flat vectors. `fetch_k`
    /// (= `k * max(rerank_factor, 1)`) candidates are collected.
    /// L2 (rerank): when both an ANN index and flat vectors exist, candidate
    /// scores are recomputed exactly from the flat vectors.
    /// Finally per-ordinal scores are combined into per-document results and
    /// the top `k` returned. Returns an empty vec when the field has neither
    /// an ANN index nor flat vectors.
    pub async fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // Unit-norm fields allow a plain dot product instead of full cosine.
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;

        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        let t0 = std::time::Instant::now();
        // Candidates as (doc_id, ordinal, score); ANN distances are mapped to
        // similarity via 1 / (1 + dist).
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // No ANN index: brute-force scan in fixed-size batches.
            log::debug!(
                "[search_dense] field {}: brute-force on {} vectors (dim={}, quant={:?})",
                field.0,
                lazy_flat.num_vectors,
                lazy_flat.dim,
                lazy_flat.quantization
            );
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch(batch_start, batch_count)
                    .await
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };
        let l1_elapsed = t0.elapsed();
        log::debug!(
            "[search_dense] field {}: L1 returned {} candidates in {:.1}ms",
            field.0,
            results.len(),
            l1_elapsed.as_secs_f64() * 1000.0
        );

        // L2 rerank: recompute exact scores for ANN candidates from the flat file.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let t_rerank = std::time::Instant::now();
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each candidate (doc_id, ordinal) to its flat-vector index.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            let t_resolve = t_rerank.elapsed();
            if !resolved.is_empty() {
                // Read in ascending file order.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);

                let t_read = std::time::Instant::now();
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // Best-effort: a failed read leaves zeroed bytes in place.
                    let _ = lazy_flat
                        .read_vector_raw_into(
                            flat_idx,
                            &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                        )
                        .await;
                }

                let read_elapsed = t_read.elapsed();

                let t_score = std::time::Instant::now();
                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);
                let score_elapsed = t_score.elapsed();

                // Write exact scores back onto the matching candidates.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }

                log::debug!(
                    "[search_dense] field {}: rerank {} vectors (dim={}, quant={:?}, {}B/vec): resolve={:.1}ms read={:.1}ms score={:.1}ms",
                    field.0,
                    resolved.len(),
                    dim,
                    quant,
                    vbs,
                    t_resolve.as_secs_f64() * 1000.0,
                    read_elapsed.as_secs_f64() * 1000.0,
                    score_elapsed.as_secs_f64() * 1000.0,
                );
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(fetch_k);
            log::debug!(
                "[search_dense] field {}: rerank total={:.1}ms",
                field.0,
                t_rerank.elapsed().as_secs_f64() * 1000.0
            );
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
784
    /// True if `field` has either an ANN index or flat vector data.
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0) || self.flat_vectors.contains_key(&field.0)
    }

    /// The deserialized RaBitQ index for `field`, if it has one.
    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(lazy)) => lazy.get().cloned(),
            _ => None,
        }
    }

    /// The deserialized IVF index and codebook for `field`, if it has one.
    pub fn get_ivf_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFRaBitQIndex>, Arc<crate::structures::RaBitQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    /// Coarse centroids for `field_id`, if they have been injected.
    pub fn coarse_centroids(&self, field_id: u32) -> Option<&Arc<CoarseCentroids>> {
        self.coarse_centroids.get(&field_id)
    }

    /// Inject the coarse centroids required by IVF/ScaNN search.
    pub fn set_coarse_centroids(&mut self, centroids: FxHashMap<u32, Arc<CoarseCentroids>>) {
        self.coarse_centroids = centroids;
    }

    /// The deserialized ScaNN (IVF-PQ) index and codebook for `field`, if any.
    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN(lazy)) => lazy.get().map(|(i, c)| (i.clone(), c.clone())),
            _ => None,
        }
    }

    /// The raw `VectorIndex` entry for `field`, if any.
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }
834
    /// Sparse-vector (e.g. learned-sparse) search over this segment.
    ///
    /// Query dimensions absent from the index are dropped up front. The
    /// executor is chosen by query width: BMP for more than 12 matched terms,
    /// MaxScore otherwise. `2 * limit` candidates are over-fetched before
    /// per-document combination, then the top `limit` are returned.
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::BmpExecutor;

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        // Keep only the query dimensions this segment actually indexes.
        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
        let mut missing_count = 0usize;

        for &(dim_id, query_weight) in vector {
            if sparse_index.has_dimension(dim_id) {
                matched_terms.push((dim_id, query_weight));
            } else {
                missing_count += 1;
            }
        }

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_terms.len(),
            missing_count,
            index_dimensions
        );

        if matched_terms.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        let num_terms = matched_terms.len();
        let over_fetch = limit * 2;
        // Wide queries go through BMP; narrow ones through MaxScore.
        let raw_results = if num_terms > 12 {
            BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute()
                .await?
        } else {
            crate::query::MaxScoreExecutor::sparse(
                sparse_index,
                matched_terms,
                over_fetch,
                heap_factor,
            )
            .execute()
            .await?
        };

        log::trace!(
            "Sparse search returned {} raw results for segment {:016x}",
            raw_results.len(),
            self.meta.id
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    "  Raw result: doc_id={}, score={:.4}, ordinal={}",
                    r.doc_id,
                    r.score,
                    r.ordinal
                );
            }
        }

        Ok(combine_ordinal_results(
            raw_results
                .into_iter()
                .map(|r| (r.doc_id, r.ordinal, r.score)),
            combiner,
            limit,
        ))
    }
941
    /// Load the position posting list for `term` in `field`.
    ///
    /// Returns `Ok(None)` when the segment has no positions file, the term is
    /// absent, or the term info carries no position range.
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        // Dictionary keys are the 4-byte LE field id followed by the raw term.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes().await?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;

        Ok(Some(pos_list))
    }
983
984 pub fn has_positions(&self, field: Field) -> bool {
986 if let Some(entry) = self.schema.get_field_entry(field) {
988 entry.positions.is_some()
989 } else {
990 false
991 }
992 }
993}
994
995#[cfg(feature = "sync")]
997impl SegmentReader {
    /// Synchronous twin of [`Self::get_postings`]: inline fast path, otherwise
    /// a validated external read from the postings file.
    pub fn get_postings_sync(&self, field: Field, term: &[u8]) -> Result<Option<BlockPostingList>> {
        // Dictionary keys are the 4-byte LE field id followed by the raw term.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        // Fast path: postings stored inline in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len;

        // Validate the range before touching the file.
        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range_sync(start..end)?;
        let block_list = BlockPostingList::deserialize_zero_copy(posting_bytes)?;

        Ok(Some(block_list))
    }
1040
    /// Synchronous twin of [`Self::get_positions`].
    ///
    /// Returns `Ok(None)` when the segment has no positions file, the term is
    /// absent, or the term info carries no position range.
    pub fn get_positions_sync(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        // Dictionary keys are the 4-byte LE field id followed by the raw term.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get_sync(&key)? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length);
        let data = slice.read_bytes_sync()?;

        let pos_list = crate::structures::PositionPostingList::deserialize(data.as_slice())?;
        Ok(Some(pos_list))
    }
1074
    /// Synchronous twin of [`Self::search_dense_vector`]: ANN (or brute-force)
    /// candidate generation, optional exact rerank from the flat vectors,
    /// then per-document combination of ordinal scores.
    pub fn search_dense_vector_sync(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: f32,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let ann_index = self.vector_indexes.get(&field.0);
        let lazy_flat = self.flat_vectors.get(&field.0);

        if ann_index.is_none() && lazy_flat.is_none() {
            return Ok(Vec::new());
        }

        // Unit-norm fields allow a plain dot product instead of full cosine.
        let unit_norm = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .is_some_and(|c| c.unit_norm);

        const BRUTE_FORCE_BATCH: usize = 4096;
        let fetch_k = (k as f32 * rerank_factor.max(1.0)).ceil() as usize;

        // Candidates as (doc_id, ordinal, score); ANN distances are mapped to
        // similarity via 1 / (1 + dist).
        let mut results: Vec<(u32, u16, f32)> = if let Some(index) = ann_index {
            match index {
                VectorIndex::RaBitQ(lazy) => {
                    let rabitq = lazy.get().ok_or_else(|| {
                        Error::Schema("RaBitQ index deserialization failed".to_string())
                    })?;
                    rabitq
                        .search(query, fetch_k)
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::IVF(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("IVF index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "IVF index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
                VectorIndex::ScaNN(lazy) => {
                    let (index, codebook) = lazy.get().ok_or_else(|| {
                        Error::Schema("ScaNN index deserialization failed".to_string())
                    })?;
                    let centroids = self.coarse_centroids.get(&field.0).ok_or_else(|| {
                        Error::Schema(format!(
                            "ScaNN index requires coarse centroids for field {}",
                            field.0
                        ))
                    })?;
                    let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                    index
                        .search(centroids, codebook, query, fetch_k, Some(effective_nprobe))
                        .into_iter()
                        .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                        .collect()
                }
            }
        } else if let Some(lazy_flat) = lazy_flat {
            // No ANN index: brute-force scan in fixed-size batches.
            let dim = lazy_flat.dim;
            let n = lazy_flat.num_vectors;
            let quant = lazy_flat.quantization;
            let mut collector = crate::query::ScoreCollector::new(fetch_k);
            let mut scores = vec![0f32; BRUTE_FORCE_BATCH];

            for batch_start in (0..n).step_by(BRUTE_FORCE_BATCH) {
                let batch_count = BRUTE_FORCE_BATCH.min(n - batch_start);
                let batch_bytes = lazy_flat
                    .read_vectors_batch_sync(batch_start, batch_count)
                    .map_err(crate::Error::Io)?;
                let raw = batch_bytes.as_slice();

                Self::score_quantized_batch(
                    query,
                    raw,
                    quant,
                    dim,
                    &mut scores[..batch_count],
                    unit_norm,
                );

                for (i, &score) in scores.iter().enumerate().take(batch_count) {
                    let (doc_id, ordinal) = lazy_flat.get_doc_id(batch_start + i);
                    collector.insert_with_ordinal(doc_id, score, ordinal);
                }
            }

            collector
                .into_sorted_results()
                .into_iter()
                .map(|(doc_id, score, ordinal)| (doc_id, ordinal, score))
                .collect()
        } else {
            return Ok(Vec::new());
        };

        // Rerank: recompute exact scores for ANN candidates from the flat file.
        if ann_index.is_some()
            && !results.is_empty()
            && let Some(lazy_flat) = lazy_flat
        {
            let dim = lazy_flat.dim;
            let quant = lazy_flat.quantization;
            let vbs = lazy_flat.vector_byte_size();

            // Map each candidate (doc_id, ordinal) to its flat-vector index.
            let mut resolved: Vec<(usize, usize)> = Vec::new();
            for (ri, c) in results.iter().enumerate() {
                let (start, entries) = lazy_flat.flat_indexes_for_doc(c.0);
                for (j, &(_, ord)) in entries.iter().enumerate() {
                    if ord == c.1 {
                        resolved.push((ri, start + j));
                        break;
                    }
                }
            }

            if !resolved.is_empty() {
                // Read in ascending file order.
                resolved.sort_unstable_by_key(|&(_, flat_idx)| flat_idx);
                let mut raw_buf = vec![0u8; resolved.len() * vbs];
                for (buf_idx, &(_, flat_idx)) in resolved.iter().enumerate() {
                    // Best-effort: a failed read leaves zeroed bytes in place.
                    let _ = lazy_flat.read_vector_raw_into_sync(
                        flat_idx,
                        &mut raw_buf[buf_idx * vbs..(buf_idx + 1) * vbs],
                    );
                }

                let mut scores = vec![0f32; resolved.len()];
                Self::score_quantized_batch(query, &raw_buf, quant, dim, &mut scores, unit_norm);

                // Write exact scores back onto the matching candidates.
                for (buf_idx, &(ri, _)) in resolved.iter().enumerate() {
                    results[ri].2 = scores[buf_idx];
                }
            }

            results.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
            results.truncate(fetch_k);
        }

        Ok(combine_ordinal_results(results, combiner, k))
    }
1233
    /// Synchronous twin of [`Self::search_sparse_vector`]: drop unindexed
    /// dimensions, pick BMP (>12 matched terms) or MaxScore, over-fetch
    /// `2 * limit`, then combine per-document and return the top `limit`.
    pub fn search_sparse_vector_sync(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::MaxScoreExecutor;

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => return Ok(Vec::new()),
        };

        // Keep only the query dimensions this segment actually indexes.
        let mut matched_terms: Vec<(u32, f32)> = Vec::with_capacity(vector.len());
        for &(dim_id, query_weight) in vector {
            if sparse_index.has_dimension(dim_id) {
                matched_terms.push((dim_id, query_weight));
            }
        }

        if matched_terms.is_empty() {
            return Ok(Vec::new());
        }

        let num_terms = matched_terms.len();
        let over_fetch = limit * 2;
        let raw_results = if num_terms > 12 {
            crate::query::BmpExecutor::new(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute_sync()?
        } else {
            MaxScoreExecutor::sparse(sparse_index, matched_terms, over_fetch, heap_factor)
                .execute_sync()?
        };

        Ok(combine_ordinal_results(
            raw_results
                .into_iter()
                .map(|r| (r.doc_id, r.ordinal, r.score)),
            combiner,
            limit,
        ))
    }
1279}