use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

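/// Dense vector index loaded for a single field of a segment.
///
/// The variant reflects how the field was indexed at build time: a flat RaBitQ
/// index, an IVF index backed by a RaBitQ codebook, or a ScaNN-style IVF-PQ
/// index with its product-quantization codebook.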
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    RaBitQ(Arc<RaBitQIndex>),
    IVF {
        index: Arc<IVFRaBitQIndex>,
        codebook: Arc<RaBitQCodebook>,
    },
    ScaNN {
        index: Arc<IVFPQIndex>,
        codebook: Arc<PQCodebook>,
    },
}

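/// Per-field sparse vector index: one optional posting list per dimension id,
/// plus the segment's document count for IDF weighting.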
#[derive(Clone)]
pub struct SparseIndex {
    pub postings: Vec<Option<Arc<BlockSparsePostingList>>>,
    pub total_docs: u32,
}

impl SparseIndex {
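    /// IDF weight for a dimension: `ln(total_docs / doc_count)`, or `0.0` when
    /// the dimension has no postings in this segment.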
    #[inline]
    pub fn idf(&self, dim_id: u32) -> f32 {
        if let Some(Some(pl)) = self.postings.get(dim_id as usize) {
            let df = pl.doc_count() as f32;
            if df > 0.0 {
                (self.total_docs as f32 / df).ln()
            } else {
                0.0
            }
        } else {
            0.0
        }
    }

    pub fn idf_weights(&self, dim_ids: &[u32]) -> Vec<f32> {
        dim_ids.iter().map(|&d| self.idf(d)).collect()
    }
}

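/// Read-side view of a single on-disk segment.
///
/// The term dictionary, postings, and document store are accessed through lazy
/// file handles with a block cache, while per-field dense and sparse vector
/// indexes are loaded eagerly from their side files when the segment is opened.
///
/// A sketch of typical usage (illustrative only; `dir`, `segment_id`, and
/// `schema` are assumed to come from the surrounding index setup):
///
/// ```ignore
/// let reader = AsyncSegmentReader::open(&dir, segment_id, schema, 0, 256).await?;
/// if let Some(postings) = reader.get_postings(Field(0), b"rust").await? {
///     // Iterate the posting list, score documents, etc.
/// }
/// let doc = reader.doc(0).await?;
/// ```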
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    /// Term dictionary (SSTable) mapping encoded field+term keys to `TermInfo`.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazy handle to the postings file; ranges are read on demand.
    postings_handle: LazyFileHandle,
    /// Document store reader.
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Global doc id offset for this segment.
    doc_id_offset: DocId,
    /// Dense vector indexes keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Shared coarse centroids for IVF-style indexes, if any were loaded.
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    /// Sparse vector indexes keyed by field id.
    sparse_indexes: FxHashMap<u32, SparseIndex>,
}

impl AsyncSegmentReader {
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Segment metadata is small, so it is read and decoded in full.
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Term dictionary and store readers each use `cache_blocks` as their cache capacity.
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Per-field dense and sparse vector indexes live in separate side files.
        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        let sparse_indexes = Self::load_sparse_file(dir, &files, meta.num_docs).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

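    /// Looks up the posting list for `term` in `field`.
    ///
    /// Term dictionary keys are the 4-byte little-endian field id followed by
    /// the raw term bytes. Small posting lists are decoded directly from the
    /// inline payload of the `TermInfo`; larger ones are read from the postings
    /// file at the offset/length recorded in the `TermInfo`.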
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Key layout: 4-byte little-endian field id, then the term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Fast path: the posting list is small enough to be inlined in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // Otherwise the TermInfo points at a range in the postings file.
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

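    /// Prefetches the term dictionary blocks covering `field` terms between
    /// `start_term` and `end_term`, so subsequent lookups in that range avoid
    /// extra reads.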
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    /// Enumerates all terms with their field, decoded text, and document frequency.
    /// Keys whose term bytes are not valid UTF-8 are skipped.
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

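    /// Approximate nearest-neighbor search over the dense vector index of `field`.
    ///
    /// `k` is the number of results to return; `rerank_factor` controls the
    /// candidate pool (and, for IVF/ScaNN indexes, the number of probed lists,
    /// with a floor of 32). If the field is configured with an MRL dimension
    /// smaller than the query length, the query is truncated before searching.
    /// Returned doc ids are already shifted by this segment's `doc_id_offset`.
    ///
    /// A sketch of a call (values are illustrative):
    ///
    /// ```ignore
    /// let hits = reader.search_dense_vector(Field(3), &query_vec, 10, 4)?;
    /// for (doc_id, distance) in hits {
    ///     // ...
    /// }
    /// ```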
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
    ) -> Result<Vec<(DocId, f32)>> {
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        // If the field declares an MRL truncation dimension, search with the truncated query.
        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(effective_query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                // Probe at least 32 inverted lists.
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
        };

        // Map segment-local indexes to global doc ids.
        Ok(results
            .into_iter()
            .map(|(idx, dist)| (idx as DocId + self.doc_id_offset, dist))
            .collect())
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

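    /// Top-`limit` search over the sparse vector index of `field` using WAND.
    ///
    /// The query is a list of `(dimension_id, weight)` pairs; dimensions with
    /// no postings in this segment are ignored. Returns `(local_doc_id, score)`
    /// pairs, or an empty vector if the field has no sparse index here.
    ///
    /// Illustrative call:
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_sparse_vector(Field(5), &[(17, 0.8), (42, 0.3)], 10)
    ///     .await?;
    /// ```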
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
    ) -> Result<Vec<(u32, f32)>> {
        use crate::query::{SparseTermScorer, WandExecutor};

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => return Ok(Vec::new()),
        };

        // One scorer per query dimension that actually has postings in this segment.
        let scorers: Vec<SparseTermScorer> = vector
            .iter()
            .filter_map(|&(dim_id, query_weight)| {
                sparse_index
                    .postings
                    .get(dim_id as usize)
                    .and_then(|opt| opt.as_ref())
                    .map(|pl| SparseTermScorer::from_arc(pl, query_weight))
            })
            .collect();

        if scorers.is_empty() {
            return Ok(Vec::new());
        }

        let results = WandExecutor::new(scorers, limit).execute();

        Ok(results.into_iter().map(|r| (r.doc_id, r.score)).collect())
    }

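    /// Loads per-field dense vector indexes from the segment's vectors file.
    ///
    /// Layout: a `u32` field count, then one 21-byte entry per field
    /// (`u32` field id, `u8` index type, `u64` offset, `u64` length), followed
    /// by the index payloads. Index type 0 is a flat RaBitQ index, 1 an
    /// IVF-RaBitQ index, and 2 an IVF-PQ (ScaNN-style) index; unknown types
    /// fall back to trying IVF first, then flat RaBitQ. Missing or empty files
    /// simply yield no indexes.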
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        // Skip the file entirely if the schema declares no dense vector fields.
        let has_dense_vectors = schema
            .fields()
            .any(|(_, entry)| entry.dense_vector_config.is_some());
        if !has_dense_vectors {
            return Ok((indexes, None));
        }

        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        let header_bytes = match handle.read_bytes_range(0..4).await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if header_bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(header_bytes.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok((indexes, None));
        }

        // Each entry is 21 bytes: u32 field id + u8 index type + u64 offset + u64 length.
        let entries_size = num_fields as u64 * 21;
        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
        let mut cursor = Cursor::new(entries_bytes.as_slice());

        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            // An unreadable type byte maps to 255 and is handled by the fallback arm below.
            let index_type = cursor.read_u8().unwrap_or(255);
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, index_type, offset, length));
        }

        for (field_id, index_type, offset, length) in entries {
            let data = handle.read_bytes_range(offset..offset + length).await?;
            let field = crate::dsl::Field(field_id);

            match index_type {
                // Type 2: IVF-PQ (ScaNN-style) index with an external PQ codebook.
                2 => {
                    if let Ok(ivfpq_index) = IVFPQIndex::from_bytes(data.as_slice()) {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }

                        if let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.pq_codebook_path
                            && let Ok(codebook) = PQCodebook::load(std::path::Path::new(path))
                        {
                            indexes.insert(
                                field_id,
                                VectorIndex::ScaNN {
                                    index: Arc::new(ivfpq_index),
                                    codebook: Arc::new(codebook),
                                },
                            );
                        }
                    }
                }
                // Type 1: IVF index backed by a RaBitQ codebook.
                1 => {
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    }
                }
                // Type 0: flat RaBitQ index.
                0 => {
                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
                // Unknown type byte: try IVF first, then fall back to flat RaBitQ.
                _ => {
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    } else if let Ok(rabitq_index) =
                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
            }
        }

        Ok((indexes, coarse_centroids))
    }

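    /// Loads per-field sparse vector indexes from the segment's sparse file.
    ///
    /// Layout: a `u32` field count; then, per field, a `u32` field id, a `u8`
    /// quantization marker, a `u32` dimension count, and one `(u64 offset,
    /// u32 length)` pair per dimension pointing at a serialized
    /// `BlockSparsePostingList` elsewhere in the file. A missing or truncated
    /// file yields no indexes.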
    async fn load_sparse_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        total_docs: u32,
    ) -> Result<FxHashMap<u32, SparseIndex>> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();

        // Missing sparse file: no sparse indexes for this segment.
        let handle = match dir.open_lazy(&files.sparse).await {
            Ok(h) => h,
            Err(_) => return Ok(indexes),
        };

        let data = match handle.read_bytes().await {
            Ok(d) => d,
            Err(_) => return Ok(indexes),
        };

        if data.len() < 4 {
            return Ok(indexes);
        }

        let mut cursor = Cursor::new(data.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok(indexes);
        }

        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            // The quantization marker is not needed at read time.
            let _quantization = cursor.read_u8()?;
            let max_dim_id = cursor.read_u32::<LittleEndian>()?;

            let mut postings: Vec<Option<Arc<BlockSparsePostingList>>> =
                vec![None; max_dim_id as usize];

            // One (offset, length) pair per dimension; a zero length means no postings.
            for dim_id in 0..max_dim_id {
                let offset = cursor.read_u64::<LittleEndian>()?;
                let length = cursor.read_u32::<LittleEndian>()?;

                if length > 0 {
                    let start = offset as usize;
                    let end = start + length as usize;
                    if end <= data.len() {
                        let posting_data = &data.as_slice()[start..end];
                        if let Ok(posting_list) =
                            BlockSparsePostingList::deserialize(&mut Cursor::new(posting_data))
                        {
                            postings[dim_id as usize] = Some(Arc::new(posting_list));
                        }
                    }
                }
            }

            indexes.insert(
                field_id,
                SparseIndex {
                    postings,
                    total_docs,
                },
            );
        }

        Ok(indexes)
    }
}

/// Alias for [`AsyncSegmentReader`].
pub type SegmentReader = AsyncSegmentReader;