use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};
use super::vector_data::FlatVectorData;

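/// Per-field dense vector index loaded from a segment's vectors file.
///
/// Which variant is built depends on the index type byte recorded in the
/// vectors file header (see `load_vectors_file`).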
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    /// Uncompressed vectors searched by brute-force scan.
    Flat(Arc<FlatVectorData>),
    /// RaBitQ binary-quantized index.
    RaBitQ(Arc<RaBitQIndex>),
    /// IVF-partitioned index with RaBitQ-quantized vectors plus its codebook.
    IVF {
        index: Arc<IVFRaBitQIndex>,
        codebook: Arc<RaBitQCodebook>,
    },
    /// ScaNN-style IVF index with product-quantized vectors plus its codebook.
    ScaNN {
        index: Arc<IVFPQIndex>,
        codebook: Arc<PQCodebook>,
    },
}

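/// Per-field sparse vector index: one optional posting list per dimension.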
#[derive(Clone)]
pub struct SparseIndex {
    /// Posting lists indexed by dimension id; `None` for dimensions with no postings.
    pub postings: Vec<Option<Arc<BlockSparsePostingList>>>,
    /// Total number of documents in the segment (used for IDF).
    pub total_docs: u32,
}

impl SparseIndex {
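    /// Inverse document frequency for a dimension: `ln(total_docs / doc_count)`,
    /// or 0.0 if the dimension has no postings.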
    #[inline]
    pub fn idf(&self, dim_id: u32) -> f32 {
        if let Some(Some(pl)) = self.postings.get(dim_id as usize) {
            let df = pl.doc_count() as f32;
            if df > 0.0 {
                (self.total_docs as f32 / df).ln()
            } else {
                0.0
            }
        } else {
            0.0
        }
    }

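    /// IDF weights for a batch of dimension ids, in the same order as `dim_ids`.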
    pub fn idf_weights(&self, dim_ids: &[u32]) -> Vec<f32> {
        dim_ids.iter().map(|&d| self.idf(d)).collect()
    }
}

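/// Read-side view of a single immutable segment.
///
/// Wraps lazily-opened handles for the term dictionary, postings, document
/// store, positions, and any per-field dense/sparse vector indexes.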
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    /// Term dictionary mapping `field_id + term` keys to `TermInfo`.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazy handle to the postings file.
    postings_handle: LazyFileHandle,
    /// Document store reader.
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Global doc id assigned to this segment's local doc id 0.
    doc_id_offset: DocId,
    /// Dense vector indexes keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Shared coarse centroids for IVF/ScaNN indexes, if any.
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    /// Sparse vector indexes keyed by field id.
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Lazy handle to the positions file, if the segment has one.
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
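    /// Opens all component files of segment `segment_id` from `dir`.
    ///
    /// `cache_blocks` is passed through to the term-dictionary and document-store
    /// readers as their block cache size. The vector, sparse, and position files
    /// are optional; if they are missing the reader is still usable.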
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Segment metadata.
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Term dictionary and postings.
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        // Document store.
        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Optional per-field vector, sparse, and position data.
        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        let sparse_indexes = Self::load_sparse_file(dir, &files, meta.num_docs).await?;

        let positions_handle = Self::open_positions_file(dir, &files).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

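    /// Looks up the posting list for `term` in `field`.
    ///
    /// Returns `Ok(None)` if the term is absent. Small posting lists are
    /// decoded from inline data in the `TermInfo`; larger ones are read from
    /// the postings file at the offset recorded in the term dictionary.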
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Term dictionary keys are the little-endian field id followed by the term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Fast path: postings stored inline in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // Otherwise read the serialized block posting list from the postings file.
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

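    /// Prefetches term dictionary blocks covering `[start_term, end_term]`
    /// for `field`, so subsequent lookups in that range avoid extra reads.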
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

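    /// Enumerates every term in the dictionary as `(field, term, doc_freq)`,
    /// skipping keys whose term bytes are not valid UTF-8.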
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

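    /// Brute-force or ANN search over the dense vector index for `field`.
    ///
    /// Distances are converted to scores with `1 / (1 + distance)`, per-document
    /// duplicates (multi-valued fields) are merged with `combiner`, and the top
    /// `k` results are returned with segment-global doc ids.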
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<(DocId, f32)>> {
        use crate::query::MultiValueCombiner;
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        // If the field is configured with an `mrl_dim`, trim the query to that many leading dimensions.
        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::Flat(flat_data) => {
                use crate::structures::simd::squared_euclidean_distance;

                // Exact search: score every stored vector.
                let mut candidates: Vec<(u32, f32)> = flat_data
                    .vectors
                    .iter()
                    .zip(flat_data.doc_ids.iter())
                    .map(|(vec, &doc_id)| {
                        let dist = squared_euclidean_distance(effective_query, vec);
                        (doc_id, dist)
                    })
                    .collect();
                candidates
                    .sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
                candidates.truncate(k);
                candidates
            }
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(effective_query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
        };

        // Convert distances to scores and shift local doc ids to global ones.
        let raw_results: Vec<(DocId, f32)> = results
            .into_iter()
            .map(|(idx, dist)| {
                let doc_id = idx as DocId + self.doc_id_offset;
                let score = 1.0 / (1.0 + dist);
                (doc_id, score)
            })
            .collect();

        // Merge duplicate doc ids (multi-valued vector fields) with the requested combiner.
        let mut combined: rustc_hash::FxHashMap<DocId, (f32, u32)> =
            rustc_hash::FxHashMap::default();
        for (doc_id, score) in raw_results {
            combined
                .entry(doc_id)
                .and_modify(|(acc_score, count)| match combiner {
                    MultiValueCombiner::Sum => *acc_score += score,
                    MultiValueCombiner::Max => *acc_score = acc_score.max(score),
                    MultiValueCombiner::Avg => {
                        *acc_score += score;
                        *count += 1;
                    }
                })
                .or_insert((score, 1));
        }

        let mut final_results: Vec<(DocId, f32)> = combined
            .into_iter()
            .map(|(doc_id, (score, count))| {
                let final_score = if combiner == MultiValueCombiner::Avg {
                    score / count as f32
                } else {
                    score
                };
                (doc_id, final_score)
            })
            .collect();

        final_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

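    /// Sparse vector search over `field` using a WAND executor over the
    /// per-dimension posting lists.
    ///
    /// Query dimensions without a posting list are skipped. Duplicate doc ids
    /// are merged with `combiner`, and the top `limit` results are returned
    /// as `(local_doc_id, score)`.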
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<(u32, f32)>> {
        use crate::query::{MultiValueCombiner, SparseTermScorer, WandExecutor};

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => return Ok(Vec::new()),
        };

        // One scorer per query dimension that actually has a posting list.
        let scorers: Vec<SparseTermScorer> = vector
            .iter()
            .filter_map(|&(dim_id, query_weight)| {
                sparse_index
                    .postings
                    .get(dim_id as usize)
                    .and_then(|opt| opt.as_ref())
                    .map(|pl| SparseTermScorer::from_arc(pl, query_weight))
            })
            .collect();

        if scorers.is_empty() {
            return Ok(Vec::new());
        }

        // Over-fetch before combining, since multi-valued docs may collapse into one entry.
        let raw_results = WandExecutor::new(scorers, limit * 2).execute();

        let mut combined: rustc_hash::FxHashMap<u32, (f32, u32)> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            combined
                .entry(r.doc_id)
                .and_modify(|(score, count)| match combiner {
                    MultiValueCombiner::Sum => *score += r.score,
                    MultiValueCombiner::Max => *score = score.max(r.score),
                    MultiValueCombiner::Avg => {
                        *score += r.score;
                        *count += 1;
                    }
                })
                .or_insert((r.score, 1));
        }

        let mut results: Vec<(u32, f32)> = combined
            .into_iter()
            .map(|(doc_id, (score, count))| {
                let final_score = if combiner == MultiValueCombiner::Avg {
                    score / count as f32
                } else {
                    score
                };
                (doc_id, final_score)
            })
            .collect();

        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        results.truncate(limit);

        Ok(results)
    }

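    /// Loads per-field dense vector indexes from the segment's vectors file.
    ///
    /// Layout as parsed here: a little-endian `u32` field count, then one
    /// 21-byte entry per field (`u32` field id, `u8` index type, `u64` offset,
    /// `u64` length), followed by the per-field index payloads. Index types:
    /// 0 = RaBitQ, 1 = IVF, 2 = ScaNN, 3 = Flat; unknown types fall back to
    /// trying Flat then RaBitQ. Missing or empty files yield no indexes.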
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        let has_dense_vectors = schema
            .fields()
            .any(|(_, entry)| entry.dense_vector_config.is_some());
        if !has_dense_vectors {
            return Ok((indexes, None));
        }

        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        let header_bytes = match handle.read_bytes_range(0..4).await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if header_bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(header_bytes.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok((indexes, None));
        }

        // Each field entry is 21 bytes: field_id (4) + index_type (1) + offset (8) + length (8).
        let entries_size = num_fields as u64 * 21;
        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
        let mut cursor = Cursor::new(entries_bytes.as_slice());

        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            let index_type = cursor.read_u8().unwrap_or(255);
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, index_type, offset, length));
        }

        for (field_id, index_type, offset, length) in entries {
            let data = handle.read_bytes_range(offset..offset + length).await?;
            let _field = crate::dsl::Field(field_id);

            match index_type {
                3 => {
                    if let Ok(flat_data) = serde_json::from_slice::<FlatVectorData>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::Flat(Arc::new(flat_data)));
                    }
                }
                2 => {
                    use super::vector_data::ScaNNIndexData;
                    if let Ok(scann_data) = ScaNNIndexData::from_bytes(data.as_slice()) {
                        coarse_centroids = Some(Arc::new(scann_data.centroids));
                        indexes.insert(
                            field_id,
                            VectorIndex::ScaNN {
                                index: Arc::new(scann_data.index),
                                codebook: Arc::new(scann_data.codebook),
                            },
                        );
                    }
                }
                1 => {
                    use super::vector_data::IVFRaBitQIndexData;
                    if let Ok(ivf_data) = IVFRaBitQIndexData::from_bytes(data.as_slice()) {
                        coarse_centroids = Some(Arc::new(ivf_data.centroids));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_data.index),
                                codebook: Arc::new(ivf_data.codebook),
                            },
                        );
                    }
                }
                0 => {
                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
                _ => {
                    // Unknown type byte: try the supported serde formats in turn.
                    if let Ok(flat_data) = serde_json::from_slice::<FlatVectorData>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::Flat(Arc::new(flat_data)));
                    } else if let Ok(rabitq_index) =
                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
            }
        }

        Ok((indexes, coarse_centroids))
    }

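    /// Loads per-field sparse indexes from the segment's sparse file.
    ///
    /// Layout as parsed here: a `u32` field count, then for each field a
    /// `u32` field id, a `u8` quantization marker, a `u32` dimension count,
    /// and one `(u64 offset, u32 length)` pair per dimension pointing at a
    /// serialized `BlockSparsePostingList` elsewhere in the same file.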
    async fn load_sparse_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        total_docs: u32,
    ) -> Result<FxHashMap<u32, SparseIndex>> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();

        let handle = match dir.open_lazy(&files.sparse).await {
            Ok(h) => h,
            Err(_) => return Ok(indexes),
        };

        let data = match handle.read_bytes().await {
            Ok(d) => d,
            Err(_) => return Ok(indexes),
        };

        if data.len() < 4 {
            return Ok(indexes);
        }

        let mut cursor = Cursor::new(data.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok(indexes);
        }

        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            let _quantization = cursor.read_u8()?;
            let max_dim_id = cursor.read_u32::<LittleEndian>()?;

            let mut postings: Vec<Option<Arc<BlockSparsePostingList>>> =
                vec![None; max_dim_id as usize];

            for dim_id in 0..max_dim_id {
                let offset = cursor.read_u64::<LittleEndian>()?;
                let length = cursor.read_u32::<LittleEndian>()?;

                // A zero length means the dimension has no postings in this segment.
                if length > 0 {
                    let start = offset as usize;
                    let end = start + length as usize;
                    if end <= data.len() {
                        let posting_data = &data.as_slice()[start..end];
                        if let Ok(posting_list) =
                            BlockSparsePostingList::deserialize(&mut Cursor::new(posting_data))
                        {
                            postings[dim_id as usize] = Some(Arc::new(posting_list));
                        }
                    }
                }
            }

            indexes.insert(
                field_id,
                SparseIndex {
                    postings,
                    total_docs,
                },
            );
        }

        Ok(indexes)
    }

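    /// Opens the optional positions file; returns `None` if the segment has none.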
    async fn open_positions_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
    ) -> Result<Option<LazyFileHandle>> {
        match dir.open_lazy(&files.positions).await {
            Ok(h) => Ok(Some(h)),
            Err(_) => Ok(None),
        }
    }

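    /// Reads the position posting list for `term` in `field`, if the segment
    /// stores positions and the term records a position offset.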
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }

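    /// Whether the schema enables position recording for `field`.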
    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

pub type SegmentReader = AsyncSegmentReader;