use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

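/// Dense-vector index for a single field, in one of the supported layouts.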
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    /// Flat RaBitQ index over all vectors in the segment.
    RaBitQ(Arc<RaBitQIndex>),
    /// IVF-partitioned RaBitQ index plus its codebook.
    IVF {
        index: Arc<IVFRaBitQIndex>,
        codebook: Arc<RaBitQCodebook>,
    },
    /// ScaNN-style IVF index with a product-quantization codebook.
    ScaNN {
        index: Arc<IVFPQIndex>,
        codebook: Arc<PQCodebook>,
    },
}

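/// Read-only view over a single on-disk segment.
///
/// The term dictionary, postings, and document store are fetched lazily
/// through async handles and block caches; vector indexes are loaded at
/// open time.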
pub struct AsyncSegmentReader {
    /// Per-segment metadata (doc counts, per-field statistics).
    meta: SegmentMeta,
    /// Term dictionary mapping `field id + term` keys to `TermInfo`.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazy handle to the postings file.
    postings_handle: LazyFileHandle,
    /// Document store reader.
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Offset added to segment-local doc ids to produce global doc ids.
    doc_id_offset: DocId,
    /// Dense-vector indexes, keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Shared coarse centroids for IVF/ScaNN indexes, if any.
    coarse_centroids: Option<Arc<CoarseCentroids>>,
}

impl AsyncSegmentReader {
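    /// Opens a segment for reading.
    ///
    /// The metadata file and any serialized vector indexes are read eagerly;
    /// the term dictionary, postings, and document store are opened as lazy
    /// handles and paged in on demand. `cache_blocks` bounds the block caches
    /// of the term dictionary and the store, and `doc_id_offset` is the value
    /// added to segment-local doc ids to form global doc ids.
    ///
    /// A minimal usage sketch (`FsDirectory` is a hypothetical `Directory`
    /// implementation; the schema and segment id are assumed to exist):
    ///
    /// ```ignore
    /// let dir = FsDirectory::open("/path/to/index")?;
    /// let reader = AsyncSegmentReader::open(&dir, segment_id, schema, 0, 256).await?;
    /// println!("docs in segment: {}", reader.num_docs());
    /// ```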
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Segment metadata is small, so read it eagerly.
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Term dictionary, postings, and store are opened lazily and paged
        // in on demand.
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

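    /// Looks up the posting list for `term` in `field`.
    ///
    /// Dictionary keys are the 4-byte little-endian field id followed by the
    /// raw term bytes. Short lists are decoded from the `TermInfo`'s inline
    /// payload; longer ones are read from the postings file at the recorded
    /// offset and length.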
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Key layout: field id (LE u32) + term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Fast path: postings small enough to be inlined in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

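    /// Fetches a stored document by its segment-local doc id.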
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

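    /// Prefetches the term-dictionary blocks covering `[start_term, end_term]`
    /// for `field`, so subsequent lookups in that range avoid extra reads.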
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    /// Whether the document store was compressed with a shared dictionary.
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Raw (still-compressed) store blocks.
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// The underlying lazy slice of the store's data section.
    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    /// Decodes and returns every `(key, TermInfo)` entry in the term dictionary.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    /// Streaming iterator over the term dictionary.
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    /// Reads a raw postings blob at `offset`/`len` without decoding it.
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

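    /// Approximate nearest-neighbor search over a dense-vector field.
    ///
    /// If the field is configured with MRL (`mrl_dim`), the query is truncated
    /// to the indexed dimensionality first. For IVF-based indexes the number
    /// of probed clusters is `rerank_factor` with a floor of 32. Results are
    /// `(global doc id, distance)` pairs, where the global id is the
    /// segment-local id plus this reader's `doc_id_offset`.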
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
    ) -> Result<Vec<(DocId, f32)>> {
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        // MRL (matryoshka) fields are indexed at a reduced dimensionality.
        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(effective_query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
        };

        // Map segment-local ids to global doc ids.
        Ok(results
            .into_iter()
            .map(|(idx, dist)| (idx as DocId + self.doc_id_offset, dist))
            .collect())
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

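    /// Scores a sparse-vector query against this segment.
    ///
    /// Each active dimension is stored as a regular term (`dim_<index>`) whose
    /// term frequency is the weight quantized by a factor of 1000. A document's
    /// score is the dot product of the query weights with the dequantized
    /// stored weights; the top `k` documents are returned with segment-local
    /// doc ids.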
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        indices: &[u32],
        weights: &[f32],
        k: usize,
    ) -> Result<Vec<(u32, f32)>> {
        let mut doc_scores: FxHashMap<DocId, f32> = FxHashMap::default();

        for (&idx, &weight) in indices.iter().zip(weights.iter()) {
            let term = format!("dim_{}", idx);

            if let Some(postings) = self.get_postings(field, term.as_bytes()).await? {
                let mut iter = postings.iterator();
                while iter.doc() != crate::TERMINATED {
                    let doc_id = iter.doc();
                    let tf = iter.term_freq();
                    // Weights are quantized into term frequencies at index time.
                    let stored_weight = tf as f32 / 1000.0;
                    *doc_scores.entry(doc_id).or_insert(0.0) += weight * stored_weight;
                    iter.advance();
                }
            }
        }

        let mut results: Vec<(u32, f32)> = doc_scores.into_iter().collect();
        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        results.truncate(k);

        Ok(results)
    }

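    /// Loads the per-field vector indexes from the segment's vectors file.
    ///
    /// File layout: a `u32` count of field entries, then one 21-byte entry per
    /// field (`u32` field id, `u8` index type, `u64` offset, `u64` length),
    /// then the serialized index blobs. Index types: 0 = RaBitQ, 1 = IVF,
    /// 2 = ScaNN; any other value falls back to trying IVF, then flat RaBitQ.
    /// A missing or empty vectors file is not an error.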
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        let has_dense_vectors = schema
            .fields()
            .any(|(_, entry)| entry.dense_vector_config.is_some());
        if !has_dense_vectors {
            return Ok((indexes, None));
        }

        // A segment without vectors simply has no vectors file.
        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        let header_bytes = match handle.read_bytes_range(0..4).await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if header_bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(header_bytes.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok((indexes, None));
        }

        // Each entry is 21 bytes: u32 field id + u8 type + u64 offset + u64 length.
        let entries_size = num_fields as u64 * 21;
        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
        let mut cursor = Cursor::new(entries_bytes.as_slice());

        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            // 255 marks an unreadable type byte; the fallback arm below handles it.
            let index_type = cursor.read_u8().unwrap_or(255);
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, index_type, offset, length));
        }

        for (field_id, index_type, offset, length) in entries {
            let data = handle.read_bytes_range(offset..offset + length).await?;
            let field = crate::dsl::Field(field_id);

            match index_type {
                // ScaNN: IVF + PQ. Centroids and codebook come from external
                // files referenced by the field's schema config.
                2 => {
                    if let Ok(ivfpq_index) = IVFPQIndex::from_bytes(data.as_slice()) {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }

                        if let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.pq_codebook_path
                            && let Ok(codebook) = PQCodebook::load(std::path::Path::new(path))
                        {
                            indexes.insert(
                                field_id,
                                VectorIndex::ScaNN {
                                    index: Arc::new(ivfpq_index),
                                    codebook: Arc::new(codebook),
                                },
                            );
                        }
                    }
                }
                // IVF over RaBitQ codes; the codebook is derived from the index dimension.
                1 => {
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    }
                }
                // Flat RaBitQ.
                0 => {
                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
                // Unknown type byte: try each decode path in turn.
                _ => {
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    } else if let Ok(rabitq_index) =
                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
            }
        }

        Ok((indexes, coarse_centroids))
    }
}

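/// Segment readers are async-only; this alias keeps the shorter name usable.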
pub type SegmentReader = AsyncSegmentReader;