use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

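/// A per-field dense-vector index attached to a segment: a flat RaBitQ
/// index, an IVF-partitioned RaBitQ index plus its codebook, or a
/// ScaNN-style IVF-PQ index plus its PQ codebook.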
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    RaBitQ(Arc<RaBitQIndex>),
    IVF {
        index: Arc<IVFRaBitQIndex>,
        codebook: Arc<RaBitQCodebook>,
    },
    ScaNN {
        index: Arc<IVFPQIndex>,
        codebook: Arc<PQCodebook>,
    },
}

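/// Asynchronous reader for a single immutable segment: term dictionary,
/// postings, document store, and optional per-field dense-vector indexes.
///
/// A minimal usage sketch (hypothetical setup: assumes an existing segment,
/// a `Directory` implementation `dir`, and a built `schema`):
///
/// ```ignore
/// let reader = AsyncSegmentReader::open(&dir, segment_id, schema, 0, 64).await?;
/// if let Some(postings) = reader.get_postings(Field(0), b"rust").await? {
///     // walk the posting list...
/// }
/// ```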
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    /// Term dictionary mapping (field id ++ term bytes) to `TermInfo`.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazy handle over the postings file; ranges are read on demand.
    postings_handle: LazyFileHandle,
    /// Document store for stored-field retrieval.
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Added to segment-local doc ids to produce global doc ids.
    doc_id_offset: DocId,
    /// Dense-vector indexes keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Coarse centroids shared by IVF/ScaNN indexes, if loaded.
    coarse_centroids: Option<Arc<CoarseCentroids>>,
}

impl AsyncSegmentReader {
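    /// Opens a segment: metadata is read eagerly, while the term dictionary,
    /// postings, store, and vectors file are opened through lazy handles.
    /// `cache_blocks` sizes the block caches of the SSTable and store readers.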
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

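    /// Looks up the posting list for `term` in `field`.
    ///
    /// The dictionary key is the 4-byte little-endian field id followed by
    /// the raw term bytes. Small posting lists are decoded from data inlined
    /// in the `TermInfo`; larger ones are read from the postings file at the
    /// recorded (offset, length).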
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset as usize;
        let end = start + posting_len as usize;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

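    /// Fetches the stored document for a segment-local doc id, if present.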
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

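    /// Prefetches term-dictionary blocks for the key range
    /// `[start_term, end_term]` within `field`, warming the cache ahead of a
    /// batch of lookups.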
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

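    /// Returns every (key, `TermInfo`) pair in the term dictionary; keys
    /// carry the 4-byte field-id prefix described in [`Self::get_postings`].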
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

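    /// Reads `len` raw serialized posting-list bytes at `offset` in the
    /// postings file, without deserializing them.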
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset as usize;
        let end = start + len as usize;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

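    /// Approximate nearest-neighbor search over the dense-vector index for
    /// `field`, returning up to `k` global `(doc_id, distance)` pairs.
    ///
    /// If the field is configured with `mrl_dim`, the query is truncated to
    /// that prefix (Matryoshka-style embeddings). For IVF and ScaNN indexes,
    /// `rerank_factor` also drives the probe count: `nprobe = rerank_factor.max(32)`.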
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
    ) -> Result<Vec<(DocId, f32)>> {
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        // Truncate the query to the configured MRL prefix, borrowing the
        // original slice when no truncation is needed.
        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(effective_query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
        };

        // Map segment-local indices to global doc ids.
        Ok(results
            .into_iter()
            .map(|(idx, dist)| (idx as DocId + self.doc_id_offset, dist))
            .collect())
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

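    /// Scores documents against a sparse query vector using the inverted
    /// index: each dimension `i` is looked up as the term `dim_{i}`, the
    /// stored term frequency is decoded as a fixed-point weight (scale
    /// 1/1000), and documents are ranked by the resulting dot product.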
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        indices: &[u32],
        weights: &[f32],
        k: usize,
    ) -> Result<Vec<(u32, f32)>> {
        let mut doc_scores: FxHashMap<DocId, f32> = FxHashMap::default();

        for (&idx, &weight) in indices.iter().zip(weights.iter()) {
            let term = format!("dim_{}", idx);

            if let Some(postings) = self.get_postings(field, term.as_bytes()).await? {
                let mut iter = postings.iterator();
                while iter.doc() != crate::TERMINATED {
                    let doc_id = iter.doc();
                    let tf = iter.term_freq();
                    // Term frequencies store quantized weights (scale 1/1000).
                    let stored_weight = tf as f32 / 1000.0;
                    *doc_scores.entry(doc_id).or_insert(0.0) += weight * stored_weight;
                    iter.advance();
                }
            }
        }

        let mut results: Vec<(u32, f32)> = doc_scores.into_iter().collect();
        results.sort_by(|a, b| b.1.total_cmp(&a.1));
        results.truncate(k);

        Ok(results)
    }

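    /// Loads the optional vectors file for this segment.
    ///
    /// Layout (little-endian): a u32 field count, then one directory entry
    /// per field: field id (u32), index-type tag (u8: 0 = flat RaBitQ,
    /// 1 = IVF-RaBitQ, 2 = IVF-PQ/ScaNN), blob offset (u64), and blob length
    /// (u64), followed by the serialized index blobs. Unknown tags are
    /// handled by sniffing the blob as IVF first, then flat RaBitQ. A missing
    /// or empty file simply yields no vector indexes.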
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        // Vectors are optional: a missing or unreadable file is not an error.
        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        let bytes = match handle.read_bytes().await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(bytes.as_slice());

        let num_fields = cursor.read_u32::<LittleEndian>()?;

        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            // A short read leaves the tag at 255 (unknown); the fallback arm
            // below sniffs the actual format.
            let index_type = cursor.read_u8().unwrap_or(255);
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, index_type, offset as usize, length as usize));
        }

        for (field_id, index_type, offset, length) in entries {
            // Guard against corrupt directory entries before slicing.
            let end = offset
                .checked_add(length)
                .ok_or_else(|| Error::Corruption("Vector entry length overflow".to_string()))?;
            if end > bytes.as_slice().len() {
                return Err(Error::Corruption(
                    "Vector entry out of bounds".to_string(),
                ));
            }
            let data = &bytes.as_slice()[offset..end];
            let field = crate::dsl::Field(field_id);

            match index_type {
                2 => {
                    if let Ok(ivfpq_index) = IVFPQIndex::from_bytes(data) {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }

                        if let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.pq_codebook_path
                            && let Ok(codebook) = PQCodebook::load(std::path::Path::new(path))
                        {
                            indexes.insert(
                                field_id,
                                VectorIndex::ScaNN {
                                    index: Arc::new(ivfpq_index),
                                    codebook: Arc::new(codebook),
                                },
                            );
                        }
                    }
                }
                1 => {
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data) {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    }
                }
                0 => {
                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data) {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
                _ => {
                    // Unknown tag: try IVF first, then fall back to flat RaBitQ.
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data) {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    } else if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data) {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
            }
        }

        Ok((indexes, coarse_centroids))
    }
}

pub type SegmentReader = AsyncSegmentReader;