use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};
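/// Per-field dense vector index loaded for a segment.
///
/// Variants correspond to the index types the segment writer can emit: a flat
/// RaBitQ index, an IVF index with RaBitQ quantization, or a ScaNN-style IVF
/// index backed by a product-quantization codebook.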
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    RaBitQ(Arc<RaBitQIndex>),
    IVF {
        index: Arc<IVFRaBitQIndex>,
        codebook: Arc<RaBitQCodebook>,
    },
    ScaNN {
        index: Arc<IVFPQIndex>,
        codebook: Arc<PQCodebook>,
    },
}
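/// In-memory sparse vector index for one field: one optional posting list per
/// dimension id, plus the segment's document count for IDF weighting.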
#[derive(Clone)]
pub struct SparseIndex {
    pub postings: Vec<Option<Arc<BlockSparsePostingList>>>,
    pub total_docs: u32,
}

impl SparseIndex {
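    /// Inverse document frequency for a sparse dimension: `ln(total_docs / df)`,
    /// where `df` is the number of documents containing the dimension.
    /// Returns 0.0 for unknown or empty dimensions.
    ///
    /// For example, with `total_docs = 1000` and a dimension present in 10
    /// documents, the weight is `ln(100.0) ≈ 4.605`.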
    #[inline]
    pub fn idf(&self, dim_id: u32) -> f32 {
        if let Some(Some(pl)) = self.postings.get(dim_id as usize) {
            let df = pl.doc_count() as f32;
            if df > 0.0 {
                (self.total_docs as f32 / df).ln()
            } else {
                0.0
            }
        } else {
            0.0
        }
    }

    pub fn idf_weights(&self, dim_ids: &[u32]) -> Vec<f32> {
        dim_ids.iter().map(|&d| self.idf(d)).collect()
    }
}
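/// Read-side view of a single immutable segment.
///
/// Wraps lazily-opened handles for the term dictionary, postings, document
/// store, and optional vector/sparse/position files, and exposes async lookup
/// and search methods over them. Local doc ids are translated to global ids by
/// adding `doc_id_offset`.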
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    doc_id_offset: DocId,
    vector_indexes: FxHashMap<u32, VectorIndex>,
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
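    /// Opens all files of a segment from `dir` and builds a reader.
    ///
    /// `cache_blocks` bounds the number of cached blocks for the term
    /// dictionary and the document store. Vector, sparse, and position files
    /// are optional; a missing file simply disables the corresponding feature.
    ///
    /// Illustrative usage (not a doctest; the directory, segment id, and
    /// schema values are placeholders for whatever the caller already has):
    ///
    /// ```ignore
    /// let reader = AsyncSegmentReader::open(&dir, segment_id, schema.clone(), 0, 256).await?;
    /// println!("segment has {} docs", reader.num_docs());
    /// ```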
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        let sparse_indexes = Self::load_sparse_file(dir, &files, meta.num_docs).await?;

        let positions_handle = Self::open_positions_file(dir, &files).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }
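    /// Looks up the posting list for `term` in `field`.
    ///
    /// Term dictionary keys are the 4-byte little-endian field id followed by
    /// the raw term bytes. Small posting lists are decoded from data inlined
    /// in the `TermInfo`; larger ones are read from the postings file at the
    /// offset and length recorded in the `TermInfo`. Returns `Ok(None)` when
    /// the term is absent from this segment.
    ///
    /// Illustrative usage (not a doctest; `title_field` is a placeholder for a
    /// `Field` obtained from the schema):
    ///
    /// ```ignore
    /// let maybe_postings = reader.get_postings(title_field, b"rust").await?;
    /// ```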
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }
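    /// Prefetches the term-dictionary blocks covering `[start_term, end_term]`
    /// for `field`, so subsequent lookups in that range avoid extra reads.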
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }
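    /// Returns every `(field, term, doc_freq)` triple in the term dictionary,
    /// decoding each key as a 4-byte little-endian field id followed by UTF-8
    /// term bytes. Entries whose term bytes are not valid UTF-8 are skipped.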
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }
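    /// Approximate nearest-neighbor search over the dense vector index of `field`.
    ///
    /// The query is truncated to the field's MRL dimension when one is
    /// configured, distances are converted to scores via `1 / (1 + distance)`,
    /// per-document scores from multi-valued fields are merged with `combiner`,
    /// and the top `k` results are returned as `(global_doc_id, score)` pairs.
    /// For IVF and ScaNN indexes, `rerank_factor` (with a floor of 32) is used
    /// as the number of probed coarse clusters.
    ///
    /// Illustrative usage (not a doctest; `embedding_field` and `query_vec`
    /// are placeholders):
    ///
    /// ```ignore
    /// let hits = reader.search_dense_vector(
    ///     embedding_field,
    ///     &query_vec,
    ///     10,
    ///     4,
    ///     crate::query::MultiValueCombiner::Max,
    /// )?;
    /// ```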
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<(DocId, f32)>> {
        use crate::query::MultiValueCombiner;
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(effective_query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
        };

        let raw_results: Vec<(DocId, f32)> = results
            .into_iter()
            .map(|(idx, dist)| {
                let doc_id = idx as DocId + self.doc_id_offset;
                let score = 1.0 / (1.0 + dist);
                (doc_id, score)
            })
            .collect();

        let mut combined: rustc_hash::FxHashMap<DocId, (f32, u32)> =
            rustc_hash::FxHashMap::default();
        for (doc_id, score) in raw_results {
            combined
                .entry(doc_id)
                .and_modify(|(acc_score, count)| match combiner {
                    MultiValueCombiner::Sum => *acc_score += score,
                    MultiValueCombiner::Max => *acc_score = acc_score.max(score),
                    MultiValueCombiner::Avg => {
                        *acc_score += score;
                        *count += 1;
                    }
                })
                .or_insert((score, 1));
        }

        let mut final_results: Vec<(DocId, f32)> = combined
            .into_iter()
            .map(|(doc_id, (score, count))| {
                let final_score = if combiner == MultiValueCombiner::Avg {
                    score / count as f32
                } else {
                    score
                };
                (doc_id, final_score)
            })
            .collect();

        final_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        final_results.truncate(k);

        Ok(final_results)
    }
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }
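    /// Sparse vector search over `field` using a WAND executor.
    ///
    /// Each `(dimension_id, query_weight)` pair that has a posting list in
    /// this segment becomes a term scorer; the executor retrieves `limit * 2`
    /// candidates, multi-valued matches are merged with `combiner`, and the
    /// top `limit` `(local_doc_id, score)` pairs are returned. Returns an
    /// empty vector when the field has no sparse index or no query dimension
    /// matches.
    ///
    /// Illustrative usage (not a doctest; `sparse_field` and the query pairs
    /// are placeholders):
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_sparse_vector(
    ///         sparse_field,
    ///         &[(42, 0.8), (7, 0.3)],
    ///         10,
    ///         crate::query::MultiValueCombiner::Sum,
    ///     )
    ///     .await?;
    /// ```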
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<(u32, f32)>> {
        use crate::query::{MultiValueCombiner, SparseTermScorer, WandExecutor};

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => return Ok(Vec::new()),
        };

        let scorers: Vec<SparseTermScorer> = vector
            .iter()
            .filter_map(|&(dim_id, query_weight)| {
                sparse_index
                    .postings
                    .get(dim_id as usize)
                    .and_then(|opt| opt.as_ref())
                    .map(|pl| SparseTermScorer::from_arc(pl, query_weight))
            })
            .collect();

        if scorers.is_empty() {
            return Ok(Vec::new());
        }

        let raw_results = WandExecutor::new(scorers, limit * 2).execute();

        let mut combined: rustc_hash::FxHashMap<u32, (f32, u32)> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            combined
                .entry(r.doc_id)
                .and_modify(|(score, count)| match combiner {
                    MultiValueCombiner::Sum => *score += r.score,
                    MultiValueCombiner::Max => *score = score.max(r.score),
                    MultiValueCombiner::Avg => {
                        *score += r.score;
                        *count += 1;
                    }
                })
                .or_insert((r.score, 1));
        }

        let mut results: Vec<(u32, f32)> = combined
            .into_iter()
            .map(|(doc_id, (score, count))| {
                let final_score = if combiner == MultiValueCombiner::Avg {
                    score / count as f32
                } else {
                    score
                };
                (doc_id, final_score)
            })
            .collect();

        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        results.truncate(limit);

        Ok(results)
    }
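    /// Loads per-field dense vector indexes from the segment's vectors file.
    ///
    /// The file starts with a little-endian `u32` field count, followed by one
    /// 21-byte entry per field (`u32` field id, `u8` index type, `u64` offset,
    /// `u64` length) and then the serialized indexes themselves. Index type 0
    /// is flat RaBitQ, 1 is IVF + RaBitQ, 2 is ScaNN (IVF + PQ); unknown types
    /// fall back to trying IVF and then flat RaBitQ. Coarse centroids and PQ
    /// codebooks are loaded from the paths named in the schema's dense vector
    /// config. A missing or empty file yields no vector indexes.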
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        let has_dense_vectors = schema
            .fields()
            .any(|(_, entry)| entry.dense_vector_config.is_some());
        if !has_dense_vectors {
            return Ok((indexes, None));
        }

        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        let header_bytes = match handle.read_bytes_range(0..4).await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if header_bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(header_bytes.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok((indexes, None));
        }

        let entries_size = num_fields as u64 * 21;
        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
        let mut cursor = Cursor::new(entries_bytes.as_slice());

        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            let index_type = cursor.read_u8().unwrap_or(255);
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, index_type, offset, length));
        }
        for (field_id, index_type, offset, length) in entries {
            let data = handle.read_bytes_range(offset..offset + length).await?;
            let field = crate::dsl::Field(field_id);

            match index_type {
                2 => {
                    if let Ok(ivfpq_index) = IVFPQIndex::from_bytes(data.as_slice()) {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }

                        if let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.pq_codebook_path
                            && let Ok(codebook) = PQCodebook::load(std::path::Path::new(path))
                        {
                            indexes.insert(
                                field_id,
                                VectorIndex::ScaNN {
                                    index: Arc::new(ivfpq_index),
                                    codebook: Arc::new(codebook),
                                },
                            );
                        }
                    }
                }
                1 => {
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    }
                }
                0 => {
                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
                _ => {
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    } else if let Ok(rabitq_index) =
                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
            }
        }

        Ok((indexes, coarse_centroids))
    }
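    /// Loads per-field sparse indexes from the segment's sparse file.
    ///
    /// The file begins with a little-endian `u32` field count; each field
    /// record carries its field id, a quantization byte (ignored here), the
    /// dimension count, and one `(u64 offset, u32 length)` pair per dimension
    /// pointing at a serialized `BlockSparsePostingList`. Dimensions with zero
    /// length have no postings. Missing or truncated files yield an empty map.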
    async fn load_sparse_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        total_docs: u32,
    ) -> Result<FxHashMap<u32, SparseIndex>> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();

        let handle = match dir.open_lazy(&files.sparse).await {
            Ok(h) => h,
            Err(_) => return Ok(indexes),
        };

        let data = match handle.read_bytes().await {
            Ok(d) => d,
            Err(_) => return Ok(indexes),
        };

        if data.len() < 4 {
            return Ok(indexes);
        }

        let mut cursor = Cursor::new(data.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok(indexes);
        }

        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            let _quantization = cursor.read_u8()?;
            let max_dim_id = cursor.read_u32::<LittleEndian>()?;

            let mut postings: Vec<Option<Arc<BlockSparsePostingList>>> =
                vec![None; max_dim_id as usize];

            for dim_id in 0..max_dim_id {
                let offset = cursor.read_u64::<LittleEndian>()?;
                let length = cursor.read_u32::<LittleEndian>()?;

                if length > 0 {
                    let start = offset as usize;
                    let end = start + length as usize;
                    if end <= data.len() {
                        let posting_data = &data.as_slice()[start..end];
                        if let Ok(posting_list) =
                            BlockSparsePostingList::deserialize(&mut Cursor::new(posting_data))
                        {
                            postings[dim_id as usize] = Some(Arc::new(posting_list));
                        }
                    }
                }
            }

            indexes.insert(
                field_id,
                SparseIndex {
                    postings,
                    total_docs,
                },
            );
        }

        Ok(indexes)
    }
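    /// Lazily opens the positions file, returning `None` when it cannot be
    /// opened (e.g. the segment was written without positional data).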
    async fn open_positions_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
    ) -> Result<Option<LazyFileHandle>> {
        match dir.open_lazy(&files.positions).await {
            Ok(h) => Ok(Some(h)),
            Err(_) => Ok(None),
        }
    }
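    /// Reads the position posting list for `term` in `field`, if this segment
    /// has a positions file and the term's `TermInfo` carries position data.
    /// Returns `Ok(None)` when positions are unavailable for the term.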
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }

    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

pub type SegmentReader = AsyncSegmentReader;