mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, CoarseCentroids, IVFPQIndex, IVFRaBitQIndex, PQCodebook,
    RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};
/// Read-side view of a single immutable segment.
pub struct AsyncSegmentReader {
    /// Segment metadata (id, doc counts, per-field statistics).
    meta: SegmentMeta,
    /// Term dictionary mapping `field id + term bytes` to a `TermInfo`.
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Lazy handle to the postings file.
    postings_handle: LazyFileHandle,
    /// Document store reader.
    store: Arc<AsyncStoreReader>,
    /// Index schema.
    schema: Arc<Schema>,
    /// Offset added to local doc ids to produce global doc ids.
    doc_id_offset: DocId,
    /// Dense vector indexes, keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Shared coarse centroids for IVF-style vector indexes, if any.
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    /// Sparse vector indexes, keyed by field id.
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Lazy handle to the positions file, if the schema stores positions.
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
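    /// Opens all component files of a segment and assembles a reader.
    ///
    /// A minimal usage sketch, assuming an already-written segment; the
    /// directory value, segment id, and cache size are illustrative, not
    /// prescribed by this module:
    ///
    /// ```ignore
    /// let reader = AsyncSegmentReader::open(
    ///     &dir,              // any `Directory` implementation
    ///     SegmentId(seg_id), // id of the segment on disk
    ///     schema.clone(),    // Arc<Schema> shared across readers
    ///     0,                 // doc_id_offset for this segment
    ///     256,               // number of blocks to cache
    /// )
    /// .await?;
    /// assert_eq!(reader.num_docs(), reader.meta().num_docs);
    /// ```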
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let (vector_indexes, coarse_centroids) =
            loader::load_vectors_file(dir, &files, &schema).await?;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

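    /// Looks up the posting list for `term` in `field`.
    ///
    /// A hedged lookup sketch (the field and term are illustrative):
    ///
    /// ```ignore
    /// if let Some(list) = reader.get_postings(Field(0), b"rust").await? {
    ///     // `list` is a BlockPostingList over this segment's local doc ids.
    /// }
    /// ```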
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Term dictionary keys are the little-endian field id followed by the
        // raw term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Short posting lists are inlined directly in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // Otherwise the postings live in the postings file at an external offset.
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

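    /// Prefetches the term dictionary blocks covering `[start_term, end_term]`
    /// so that subsequent lookups in that range hit the cache.
    ///
    /// A hedged sketch (the term range is illustrative):
    ///
    /// ```ignore
    /// reader.prefetch_terms(Field(0), b"aardvark", b"antelope").await?;
    /// ```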
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

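    /// Searches a dense vector field, returning up to `k` results with
    /// distances converted to similarity scores.
    ///
    /// A hedged sketch (the field, query vector, and parameters are
    /// illustrative):
    ///
    /// ```ignore
    /// let hits = reader.search_dense_vector(
    ///     Field(2),
    ///     &query_embedding, // &[f32] matching the field's dimensionality
    ///     10,               // k
    ///     4,                // rerank_factor
    ///     crate::query::MultiValueCombiner::Max,
    /// )?;
    /// ```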
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::MultiValueCombiner;
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        // If the field was indexed with a Matryoshka (MRL) truncation, trim
        // the query to the indexed dimensionality before searching.
        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, u16, f32)> = match index {
            VectorIndex::Flat(flat_data) => {
                use crate::structures::simd::squared_euclidean_distance;

                // Brute-force scan: score every stored vector against the query.
                let mut candidates: Vec<(u32, u16, f32)> = flat_data
                    .vectors
                    .iter()
                    .zip(flat_data.doc_ids.iter())
                    .map(|(vec, &(doc_id, ordinal))| {
                        let dist = squared_euclidean_distance(effective_query, vec);
                        (doc_id, ordinal, dist)
                    })
                    .collect();
                candidates
                    .sort_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal));
                candidates.truncate(k);
                candidates
            }
            VectorIndex::RaBitQ(rabitq) => rabitq.search(effective_query, k, rerank_factor),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index
                    .search(centroids, codebook, effective_query, k, Some(nprobe))
                    .into_iter()
                    .map(|(doc_id, dist)| (doc_id, 0u16, dist))
                    .collect()
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index
                    .search(centroids, codebook, effective_query, k, Some(nprobe))
                    .into_iter()
                    .map(|(doc_id, dist)| (doc_id, 0u16, dist))
                    .collect()
            }
        };

        // Group per-ordinal hits by document, converting each distance to a
        // similarity score in (0, 1].
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, dist) in results {
            let doc_id = doc_id as DocId + self.doc_id_offset;
            let score = 1.0 / (1.0 + dist);
            let ordinals = doc_ordinals.entry(doc_id).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = match combiner {
                    MultiValueCombiner::Sum => ordinals.iter().map(|(_, s)| s).sum(),
                    MultiValueCombiner::Max => ordinals
                        .iter()
                        .map(|(_, s)| *s)
                        .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
                        .unwrap_or(0.0),
                    MultiValueCombiner::Avg => {
                        let sum: f32 = ordinals.iter().map(|(_, s)| s).sum();
                        sum / ordinals.len() as f32
                    }
                };
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

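    /// Searches a sparse vector field with WAND over the query's non-zero
    /// dimensions.
    ///
    /// A hedged sketch (the dimension ids and weights are illustrative):
    ///
    /// ```ignore
    /// let query: Vec<(u32, f32)> = vec![(17, 0.8), (4021, 0.3)];
    /// let hits = reader
    ///     .search_sparse_vector(Field(5), &query, 10, crate::query::MultiValueCombiner::Sum)
    ///     .await?;
    /// ```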
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{MultiValueCombiner, SparseTermScorer, WandExecutor};

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.postings.len();

        let mut matched_tokens = Vec::new();
        let mut missing_tokens = Vec::new();

        let scorers: Vec<SparseTermScorer> = vector
            .iter()
            .filter_map(|&(dim_id, query_weight)| {
                match sparse_index
                    .postings
                    .get(dim_id as usize)
                    .and_then(|opt| opt.as_ref())
                {
                    Some(pl) => {
                        matched_tokens.push(dim_id);
                        Some(SparseTermScorer::from_arc(pl, query_weight))
                    }
                    None => {
                        missing_tokens.push(dim_id);
                        None
                    }
                }
            })
            .collect();

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_tokens.len(),
            missing_tokens.len(),
            index_dimensions
        );

        if log::log_enabled!(log::Level::Debug) {
            let query_details: Vec<_> = vector
                .iter()
                .take(30)
                .map(|(id, w)| format!("{}:{:.3}", id, w))
                .collect();
            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
        }

        if !matched_tokens.is_empty() {
            log::debug!(
                "Matched token IDs: {:?}",
                matched_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if !missing_tokens.is_empty() {
            log::debug!(
                "Missing token IDs (not in index): {:?}",
                missing_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if scorers.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        // Over-fetch so that multi-value documents still yield `limit`
        // results after their per-ordinal hits are combined below.
        let raw_results = WandExecutor::new(scorers, limit * 2).execute();
        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    " Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let global_doc_id = doc_id + self.doc_id_offset;
                let combined_score = match combiner {
                    MultiValueCombiner::Sum => ordinals.iter().map(|(_, s)| s).sum(),
                    MultiValueCombiner::Max => ordinals
                        .iter()
                        .map(|(_, s)| *s)
                        .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
                        .unwrap_or(0.0),
                    MultiValueCombiner::Avg => {
                        let sum: f32 = ordinals.iter().map(|(_, s)| s).sum();
                        sum / ordinals.len() as f32
                    }
                };
                VectorSearchResult::new(global_doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

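    /// Loads the position posting list for `term` in `field`, if the schema
    /// records positions for that field.
    ///
    /// A hedged sketch (the field and term are illustrative):
    ///
    /// ```ignore
    /// if let Some(positions) = reader.get_positions(Field(0), b"rust").await? {
    ///     // Walk per-document position lists, e.g. for phrase matching.
    /// }
    /// ```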
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }

    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

pub type SegmentReader = AsyncSegmentReader;