use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

use super::builder::FlatVectorData;
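/// Per-field dense vector index loaded from the segment's vectors file.
///
/// The variant mirrors the on-disk index type: brute-force flat storage,
/// a single RaBitQ index, an IVF index paired with its RaBitQ codebook, or a
/// ScaNN-style IVF-PQ index paired with its PQ codebook.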
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    Flat(Arc<FlatVectorData>),
    RaBitQ(Arc<RaBitQIndex>),
    IVF {
        index: Arc<IVFRaBitQIndex>,
        codebook: Arc<RaBitQCodebook>,
    },
    ScaNN {
        index: Arc<IVFPQIndex>,
        codebook: Arc<PQCodebook>,
    },
}
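/// Per-field sparse vector index: one optional posting list per dimension id,
/// plus the segment's document count for IDF weighting.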
#[derive(Clone)]
pub struct SparseIndex {
    pub postings: Vec<Option<Arc<BlockSparsePostingList>>>,
    pub total_docs: u32,
}

impl SparseIndex {
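    /// Inverse document frequency for a single dimension: `ln(total_docs / df)`,
    /// or 0.0 when the dimension has no postings in this segment.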
    #[inline]
    pub fn idf(&self, dim_id: u32) -> f32 {
        if let Some(Some(pl)) = self.postings.get(dim_id as usize) {
            let df = pl.doc_count() as f32;
            if df > 0.0 {
                (self.total_docs as f32 / df).ln()
            } else {
                0.0
            }
        } else {
            0.0
        }
    }
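    /// IDF weights for a batch of dimension ids, in the same order as the input.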
    pub fn idf_weights(&self, dim_ids: &[u32]) -> Vec<f32> {
        dim_ids.iter().map(|&d| self.idf(d)).collect()
    }
}
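/// Read-side view of a single immutable segment.
///
/// Wraps lazily-opened handles for the term dictionary, postings, document
/// store, optional positions file, and any per-field dense or sparse vector
/// indexes. All I/O is asynchronous and block-cached where supported.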
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    doc_id_offset: DocId,
    vector_indexes: FxHashMap<u32, VectorIndex>,
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
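    /// Opens a segment for reading: loads the metadata, term dictionary,
    /// postings and store handles, then any vector, sparse, and position
    /// files that exist. `cache_blocks` bounds the per-reader block caches.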
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        let sparse_indexes = Self::load_sparse_file(dir, &files, meta.num_docs).await?;

        let positions_handle = Self::open_positions_file(dir, &files).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }
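    /// Returns the segment's metadata.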
    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
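    /// Looks up the posting list for `(field, term)`.
    ///
    /// The term dictionary key is the little-endian field id followed by the
    /// raw term bytes. Small posting lists are decoded from the inline
    /// `TermInfo` payload; larger ones are read from the postings file at the
    /// recorded offset. Returns `Ok(None)` if the term is absent.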
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }
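    /// Fetches a stored document by segment-local doc id, decoding it against
    /// the segment's schema. Returns `None` when the store has no document
    /// for that id.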
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }
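    /// Prefetches term dictionary blocks covering `[start_term, end_term]` for
    /// `field`, so subsequent lookups in that range avoid extra reads.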
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }
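    /// Lists every term in the segment as `(field, term, doc_freq)`, skipping
    /// keys whose term bytes are not valid UTF-8.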
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }
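    /// Reads `len` raw bytes from the postings file starting at `offset`,
    /// without decoding them.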
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }
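    /// Brute-force or ANN search over the dense vector index for `field`.
    ///
    /// The query is truncated to the field's configured MRL dimension when one
    /// is set. Flat indexes are scanned exhaustively; RaBitQ, IVF, and ScaNN
    /// indexes delegate to their own search routines (`rerank_factor` also
    /// sets the IVF/ScaNN probe count, with a floor of 32). Distances are
    /// mapped to scores with `1 / (1 + distance)`, duplicate doc ids from
    /// multi-valued fields are merged with `combiner`, and the top `k`
    /// results are returned with segment-global doc ids.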
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<(DocId, f32)>> {
        use crate::query::MultiValueCombiner;
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::Flat(flat_data) => {
                use crate::structures::simd::squared_euclidean_distance;

                let mut candidates: Vec<(u32, f32)> = flat_data
                    .vectors
                    .iter()
                    .zip(flat_data.doc_ids.iter())
                    .map(|(vec, &doc_id)| {
                        let dist = squared_euclidean_distance(effective_query, vec);
                        (doc_id, dist)
                    })
                    .collect();
                candidates
                    .sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
                candidates.truncate(k);
                candidates
            }
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(effective_query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
        };

        let raw_results: Vec<(DocId, f32)> = results
            .into_iter()
            .map(|(idx, dist)| {
                let doc_id = idx as DocId + self.doc_id_offset;
                let score = 1.0 / (1.0 + dist);
                (doc_id, score)
            })
            .collect();

        let mut combined: rustc_hash::FxHashMap<DocId, (f32, u32)> =
            rustc_hash::FxHashMap::default();
        for (doc_id, score) in raw_results {
            combined
                .entry(doc_id)
                .and_modify(|(acc_score, count)| match combiner {
                    MultiValueCombiner::Sum => *acc_score += score,
                    MultiValueCombiner::Max => *acc_score = acc_score.max(score),
                    MultiValueCombiner::Avg => {
                        *acc_score += score;
                        *count += 1;
                    }
                })
                .or_insert((score, 1));
        }

        let mut final_results: Vec<(DocId, f32)> = combined
            .into_iter()
            .map(|(doc_id, (score, count))| {
                let final_score = if combiner == MultiValueCombiner::Avg {
                    score / count as f32
                } else {
                    score
                };
                (doc_id, final_score)
            })
            .collect();

        final_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        final_results.truncate(k);

        Ok(final_results)
    }
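    /// Returns whether a dense vector index was loaded for `field`.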
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }
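    /// WAND-style search over the sparse vector index for `field`.
    ///
    /// Each query `(dim_id, weight)` pair that has a posting list in this
    /// segment becomes a term scorer; the executor retrieves `limit * 2`
    /// candidates, duplicate doc ids from multi-valued fields are merged with
    /// `combiner`, and the top `limit` results are returned as local doc ids.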
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<(u32, f32)>> {
        use crate::query::{MultiValueCombiner, SparseTermScorer, WandExecutor};

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => return Ok(Vec::new()),
        };

        let scorers: Vec<SparseTermScorer> = vector
            .iter()
            .filter_map(|&(dim_id, query_weight)| {
                sparse_index
                    .postings
                    .get(dim_id as usize)
                    .and_then(|opt| opt.as_ref())
                    .map(|pl| SparseTermScorer::from_arc(pl, query_weight))
            })
            .collect();

        if scorers.is_empty() {
            return Ok(Vec::new());
        }

        let raw_results = WandExecutor::new(scorers, limit * 2).execute();

        let mut combined: rustc_hash::FxHashMap<u32, (f32, u32)> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            combined
                .entry(r.doc_id)
                .and_modify(|(score, count)| match combiner {
                    MultiValueCombiner::Sum => *score += r.score,
                    MultiValueCombiner::Max => *score = score.max(r.score),
                    MultiValueCombiner::Avg => {
                        *score += r.score;
                        *count += 1;
                    }
                })
                .or_insert((r.score, 1));
        }

        let mut results: Vec<(u32, f32)> = combined
            .into_iter()
            .map(|(doc_id, (score, count))| {
                let final_score = if combiner == MultiValueCombiner::Avg {
                    score / count as f32
                } else {
                    score
                };
                (doc_id, final_score)
            })
            .collect();

        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        results.truncate(limit);

        Ok(results)
    }
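    /// Loads the per-field dense vector indexes from the segment's vectors
    /// file, if any dense vector fields exist in the schema.
    ///
    /// The file starts with a `u32` field count, followed by one 21-byte
    /// entry per field (`u32` field id, `u8` index type, `u64` offset,
    /// `u64` length) and then the serialized index payloads. Index types map
    /// to `VectorIndex` variants: 0 = RaBitQ, 1 = IVF, 2 = ScaNN, 3 = Flat;
    /// unknown types fall back to trying Flat and then RaBitQ. Missing or
    /// unreadable files simply yield no indexes.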
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        let has_dense_vectors = schema
            .fields()
            .any(|(_, entry)| entry.dense_vector_config.is_some());
        if !has_dense_vectors {
            return Ok((indexes, None));
        }

        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        let header_bytes = match handle.read_bytes_range(0..4).await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if header_bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(header_bytes.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok((indexes, None));
        }

        let entries_size = num_fields as u64 * 21;
        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
        let mut cursor = Cursor::new(entries_bytes.as_slice());

        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            let index_type = cursor.read_u8().unwrap_or(255);
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, index_type, offset, length));
        }

        for (field_id, index_type, offset, length) in entries {
            let data = handle.read_bytes_range(offset..offset + length).await?;
            let _field = crate::dsl::Field(field_id);

            match index_type {
                3 => {
                    if let Ok(flat_data) = serde_json::from_slice::<FlatVectorData>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::Flat(Arc::new(flat_data)));
                    }
                }
                2 => {
                    use super::builder::ScaNNIndexData;
                    if let Ok(scann_data) = ScaNNIndexData::from_bytes(data.as_slice()) {
                        coarse_centroids = Some(Arc::new(scann_data.centroids));
                        indexes.insert(
                            field_id,
                            VectorIndex::ScaNN {
                                index: Arc::new(scann_data.index),
                                codebook: Arc::new(scann_data.codebook),
                            },
                        );
                    }
                }
                1 => {
                    use super::builder::IVFRaBitQIndexData;
                    if let Ok(ivf_data) = IVFRaBitQIndexData::from_bytes(data.as_slice()) {
                        coarse_centroids = Some(Arc::new(ivf_data.centroids));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_data.index),
                                codebook: Arc::new(ivf_data.codebook),
                            },
                        );
                    }
                }
                0 => {
                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
                _ => {
                    if let Ok(flat_data) = serde_json::from_slice::<FlatVectorData>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::Flat(Arc::new(flat_data)));
                    } else if let Ok(rabitq_index) =
                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
            }
        }

        Ok((indexes, coarse_centroids))
    }
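    /// Loads the per-field sparse vector indexes from the segment's sparse
    /// file, if present.
    ///
    /// The file starts with a `u32` field count; each field records its id,
    /// a quantization byte, the maximum dimension id, and then one
    /// `(u64 offset, u32 length)` pair per dimension pointing at a serialized
    /// `BlockSparsePostingList` (length 0 means the dimension is empty).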
    async fn load_sparse_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        total_docs: u32,
    ) -> Result<FxHashMap<u32, SparseIndex>> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();

        let handle = match dir.open_lazy(&files.sparse).await {
            Ok(h) => h,
            Err(_) => return Ok(indexes),
        };

        let data = match handle.read_bytes().await {
            Ok(d) => d,
            Err(_) => return Ok(indexes),
        };

        if data.len() < 4 {
            return Ok(indexes);
        }

        let mut cursor = Cursor::new(data.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok(indexes);
        }

        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            let _quantization = cursor.read_u8()?;
            let max_dim_id = cursor.read_u32::<LittleEndian>()?;

            let mut postings: Vec<Option<Arc<BlockSparsePostingList>>> =
                vec![None; max_dim_id as usize];

            for dim_id in 0..max_dim_id {
                let offset = cursor.read_u64::<LittleEndian>()?;
                let length = cursor.read_u32::<LittleEndian>()?;

                if length > 0 {
                    let start = offset as usize;
                    let end = start + length as usize;
                    if end <= data.len() {
                        let posting_data = &data.as_slice()[start..end];
                        if let Ok(posting_list) =
                            BlockSparsePostingList::deserialize(&mut Cursor::new(posting_data))
                        {
                            postings[dim_id as usize] = Some(Arc::new(posting_list));
                        }
                    }
                }
            }

            indexes.insert(
                field_id,
                SparseIndex {
                    postings,
                    total_docs,
                },
            );
        }

        Ok(indexes)
    }
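    /// Opens the segment's positions file if it exists; a missing file is not
    /// an error and simply disables position lookups.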
    async fn open_positions_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
    ) -> Result<Option<LazyFileHandle>> {
        match dir.open_lazy(&files.positions).await {
            Ok(h) => Ok(Some(h)),
            Err(_) => Ok(None),
        }
    }
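    /// Loads the position posting list for `(field, term)` from the positions
    /// file.
    ///
    /// Returns `Ok(None)` when the segment has no positions file, the term is
    /// absent, or the term carries no position data.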
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }
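    /// Returns whether the schema records positions for `field`.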
    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}
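/// `SegmentReader` is an alias for the async implementation.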
pub type SegmentReader = AsyncSegmentReader;