use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFRaBitQIndex, RaBitQIndex,
    SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

/// A per-field dense vector index: either a flat RaBitQ index or an
/// IVF-partitioned RaBitQ index (which additionally needs coarse centroids).
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    RaBitQ(Arc<RaBitQIndex>),
    IVF(Arc<IVFRaBitQIndex>),
}

/// Async reader for one immutable segment: term dictionary, postings,
/// document store, and optional per-field vector indexes.
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    /// Term dictionary mapping (field id ++ term bytes) to `TermInfo`.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazily-read postings file; offsets and lengths come from `TermInfo`.
    postings_handle: LazyFileHandle,
    /// Stored-document reader.
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Added to segment-local doc ids to produce index-wide doc ids.
    doc_id_offset: DocId,
    /// Dense vector indexes keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Coarse centroids shared by any IVF indexes in this segment.
    coarse_centroids: Option<Arc<CoarseCentroids>>,
}

impl AsyncSegmentReader {
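    /// Opens a segment's component files (meta, term dictionary, postings,
    /// store, and optional vectors) and assembles a reader.
    ///
    /// A minimal usage sketch; `FsDirectory` is a hypothetical `Directory`
    /// implementation:
    ///
    /// ```ignore
    /// let dir = FsDirectory::new("/path/to/index");
    /// let reader =
    ///     AsyncSegmentReader::open(&dir, segment_id, schema.clone(), 0, 256).await?;
    /// println!("segment {} has {} docs", reader.meta().id, reader.num_docs());
    /// ```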
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Segment metadata is small; read it eagerly.
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Term dictionary and store open lazily, each with a block cache.
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Vector indexes are optional; a missing or empty file yields none.
        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

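    /// Looks up the posting list for `term` in `field`. Dictionary keys are
    /// the little-endian field id followed by the raw term bytes; short
    /// posting lists come back from inline data in the `TermInfo`, longer
    /// ones are read from the postings file.
    ///
    /// A usage sketch (hypothetical `reader` and `field`):
    ///
    /// ```ignore
    /// if let Some(postings) = reader.get_postings(field, b"rust").await? {
    ///     let mut it = postings.iterator();
    ///     while it.doc() != TERMINATED {
    ///         println!("doc {} tf {}", it.doc(), it.term_freq());
    ///         it.advance();
    ///     }
    /// }
    /// ```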
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Dictionary keys are: field id (u32, little-endian) ++ term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Short posting lists are inlined directly in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // Otherwise the TermInfo points at a range in the postings file.
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset as usize;
        let end = start + posting_len as usize;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

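    /// Fetches a stored document by its segment-local doc id.
    ///
    /// Sketch (hypothetical names):
    ///
    /// ```ignore
    /// if let Some(doc) = reader.doc(local_id).await? {
    ///     // Inspect the document's stored fields here.
    /// }
    /// ```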
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

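    /// Warms the term-dictionary block cache for the key range spanned by
    /// `start_term..end_term` on `field`, so subsequent lookups in that
    /// range avoid extra round trips.
    ///
    /// Sketch (hypothetical names):
    ///
    /// ```ignore
    /// // About to look up many terms between "a" and "b" on this field:
    /// reader.prefetch_terms(field, b"a", b"b").await?;
    /// ```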
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        // Keys use the same encoding as `get_postings`: field id ++ term bytes.
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    /// Returns every (key, TermInfo) entry in the term dictionary.
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    /// Reads a raw byte range from the postings file, e.g. at the offset
    /// and length taken from a `TermInfo`'s external info.
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset as usize;
        let end = start + len as usize;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

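    /// Runs k-nearest-neighbor search over the field's dense vector index.
    /// `rerank_factor` controls how many quantized candidates are re-scored,
    /// and for IVF indexes it also raises the number of probed lists.
    ///
    /// Sketch (hypothetical `embed` helper):
    ///
    /// ```ignore
    /// let query: Vec<f32> = embed("storage engines");
    /// for (doc_id, dist) in reader.search_dense_vector(field, &query, 10, 4)? {
    ///     println!("doc {doc_id} at distance {dist}");
    /// }
    /// ```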
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
    ) -> Result<Vec<(DocId, f32)>> {
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF(ivf) => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                // Probe at least 32 IVF lists, more if the caller asks for it.
                let nprobe = rerank_factor.max(32);
                ivf.search(centroids, query, k, nprobe)
            }
        };

        // Translate segment-local indexes into index-wide doc ids.
        Ok(results
            .into_iter()
            .map(|(idx, dist)| (idx as DocId + self.doc_id_offset, dist))
            .collect())
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

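    /// Scores documents against a sparse query vector by dot product. Each
    /// active dimension is stored as the synthetic term `dim_{index}` whose
    /// term frequency holds the weight scaled by 1000 (decoded below).
    ///
    /// Sketch (hypothetical names; two active dimensions):
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_sparse_vector(field, &[7, 42], &[0.5, 1.2], 10)
    ///     .await?;
    /// ```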
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        indices: &[u32],
        weights: &[f32],
        k: usize,
    ) -> Result<Vec<(u32, f32)>> {
        let mut doc_scores: FxHashMap<DocId, f32> = FxHashMap::default();

        for (&idx, &weight) in indices.iter().zip(weights.iter()) {
            // Each sparse dimension is indexed as the synthetic term "dim_{idx}".
            let term = format!("dim_{}", idx);

            if let Some(postings) = self.get_postings(field, term.as_bytes()).await? {
                let mut iter = postings.iterator();
                while iter.doc() != crate::TERMINATED {
                    let doc_id = iter.doc();
                    let tf = iter.term_freq();
                    // Stored weights are quantized into the term frequency
                    // at a scale of 1000; undo that here.
                    let stored_weight = tf as f32 / 1000.0;
                    *doc_scores.entry(doc_id).or_insert(0.0) += weight * stored_weight;
                    iter.advance();
                }
            }
        }

        // Rank by accumulated dot-product score and keep the top k.
        let mut results: Vec<(u32, f32)> = doc_scores.into_iter().collect();
        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        results.truncate(k);

        Ok(results)
    }

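    /// Loads the optional vectors file. The layout read below is: a u32
    /// field count, then one (u32 field_id, u64 offset, u64 length) entry
    /// per field, all little-endian, followed by the serialized index
    /// payloads those entries point at. A missing or empty file yields no
    /// indexes.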
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        // The vectors file is optional: treat a missing or unreadable file
        // as "no vector indexes".
        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        let bytes = match handle.read_bytes().await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(bytes.as_slice());

        // Header: field count, then one (field_id, offset, length) entry per field.
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, offset as usize, length as usize));
        }

        for (field_id, offset, length) in entries {
            // Bounds-check each entry so a corrupt file surfaces as an
            // error instead of a panic.
            let data = bytes
                .as_slice()
                .get(offset..offset + length)
                .ok_or_else(|| {
                    Error::Corruption("Vector index entry out of bounds".to_string())
                })?;

            // Try the IVF format first, then fall back to a flat RaBitQ index.
            if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data) {
                // IVF indexes share one set of coarse centroids, loaded from
                // the path configured on the first IVF field's schema entry.
                if coarse_centroids.is_none() {
                    let field = crate::dsl::Field(field_id);
                    if let Some(entry) = schema.get_field_entry(field)
                        && let Some(ref config) = entry.dense_vector_config
                        && let Some(ref path) = config.coarse_centroids_path
                        && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                    {
                        coarse_centroids = Some(Arc::new(c));
                    }
                }
                indexes.insert(field_id, VectorIndex::IVF(Arc::new(ivf_index)));
            } else if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data) {
                indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
            }
        }

        Ok((indexes, coarse_centroids))
    }
}

pub type SegmentReader = AsyncSegmentReader;