mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

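/// Async reader over a single immutable segment.
///
/// Bundles the segment's term dictionary (an SSTable of `TermInfo`), postings
/// file, document store, and any per-field dense-vector, sparse-vector, and
/// position indexes that were written for it.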
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    doc_id_offset: DocId,
    vector_indexes: FxHashMap<u32, VectorIndex>,
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
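    /// Opens all component files for segment `segment_id` in `dir` and wires
    /// them into one reader. `cache_blocks` bounds the block caches of the
    /// term dictionary and the document store.
    ///
    /// A minimal usage sketch; the `dir`, `segment_id`, and `schema` bindings
    /// are illustrative, not part of this API:
    ///
    /// ```ignore
    /// let reader = AsyncSegmentReader::open(&dir, segment_id, schema, 0, 256).await?;
    /// println!("docs: {}", reader.num_docs());
    /// ```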
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let (vector_indexes, coarse_centroids) =
            loader::load_vectors_file(dir, &files, &schema).await?;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
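        // The `* 24` below is a rough per-dimension memory estimate used only
        // for the debug log, not a measured value.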
        let sparse_mem = sparse_dims * 24;
        log::debug!(
            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, vectors={}",
            segment_id.0,
            meta.num_docs,
            sparse_dims,
            sparse_mem as f64 / 1024.0,
            vector_indexes.len()
        );

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

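    /// Fetches the posting list for `term` in `field`, if present.
    ///
    /// Term-dictionary keys are the 4-byte little-endian field id followed by
    /// the raw term bytes. Small posting lists are decoded inline from the
    /// `TermInfo`; larger ones are read from the postings file at the recorded
    /// offset and length.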
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

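        // Fast path: small posting lists are stored inline in the TermInfo and
        // can be decoded without touching the postings file.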
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

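    /// Warms the term dictionary's block cache for the key range
    /// `[start_term, end_term]` of `field`, using the same key encoding as
    /// `get_postings`.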
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

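    /// Decodes every term-dictionary entry into `(field, term, doc_freq)`,
    /// skipping terms whose bytes are not valid UTF-8.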
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

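    /// Searches the dense vector index of `field`, dispatching on the index
    /// type (flat scan, RaBitQ, IVF, or ScaNN) and returning up to `k` hits
    /// with global doc ids. Distances become scores via `1 / (1 + dist)`, and
    /// per-ordinal scores of multi-valued documents are merged by `combiner`.
    ///
    /// Illustrative call; `reader`, `query`, and `combiner` are assumed
    /// bindings, not part of this API:
    ///
    /// ```ignore
    /// let hits = reader.search_dense_vector(field, &query, 10, 4, combiner)?;
    /// ```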
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

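        // If the schema configures a reduced MRL dimensionality, search with
        // only the query's leading `trim_dim` components.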
        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, u16, f32)> = match index {
            VectorIndex::Flat(flat_data) => {
                use crate::structures::simd::squared_euclidean_distance;

                let mut candidates: Vec<(u32, u16, f32)> = flat_data
                    .vectors
                    .iter()
                    .zip(flat_data.doc_ids.iter())
                    .map(|(vec, &(doc_id, ordinal))| {
                        let dist = squared_euclidean_distance(effective_query, vec);
                        (doc_id, ordinal, dist)
                    })
                    .collect();
                candidates
                    .sort_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal));
                candidates.truncate(k);
                candidates
            }
            VectorIndex::RaBitQ(rabitq) => rabitq.search(effective_query, k, rerank_factor),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index
                    .search(centroids, codebook, effective_query, k, Some(nprobe))
                    .into_iter()
                    .map(|(doc_id, dist)| (doc_id, 0u16, dist))
                    .collect()
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index
                    .search(centroids, codebook, effective_query, k, Some(nprobe))
                    .into_iter()
                    .map(|(doc_id, dist)| (doc_id, 0u16, dist))
                    .collect()
            }
        };

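        // Convert distances to scores (1 / (1 + dist)) and group hits by
        // global doc id so ordinals of multi-valued documents can be combined.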
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, dist) in results {
            let doc_id = doc_id as DocId + self.doc_id_offset;
            let score = 1.0 / (1.0 + dist);
            let ordinals = doc_ordinals.entry(doc_id).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

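    /// WAND-driven top-`limit` search over the sparse vector index of `field`.
    /// `vector` holds `(dimension_id, weight)` pairs; dimensions missing from
    /// this segment's index are skipped.
    ///
    /// Illustrative call; `reader` and `combiner` are assumed bindings:
    ///
    /// ```ignore
    /// let hits = reader
    ///     .search_sparse_vector(field, &[(42, 0.8), (7, 0.3)], 10, combiner)
    ///     .await?;
    /// ```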
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{SparseTermScorer, WandExecutor};

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        let mut matched_tokens = Vec::new();
        let mut missing_tokens = Vec::new();
        let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
            Vec::with_capacity(vector.len());

        for &(dim_id, query_weight) in vector {
            if !sparse_index.has_dimension(dim_id) {
                missing_tokens.push(dim_id);
                continue;
            }

            match sparse_index.get_posting(dim_id).await? {
                Some(pl) => {
                    matched_tokens.push(dim_id);
                    posting_lists.push((dim_id, query_weight, pl));
                }
                None => {
                    missing_tokens.push(dim_id);
                }
            }
        }

        let scorers: Vec<SparseTermScorer> = posting_lists
            .iter()
            .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
            .collect();

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_tokens.len(),
            missing_tokens.len(),
            index_dimensions
        );

        if log::log_enabled!(log::Level::Debug) {
            let query_details: Vec<_> = vector
                .iter()
                .take(30)
                .map(|(id, w)| format!("{}:{:.3}", id, w))
                .collect();
            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
        }

        if !matched_tokens.is_empty() {
            log::debug!(
                "Matched token IDs: {:?}",
                matched_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if !missing_tokens.is_empty() {
            log::debug!(
                "Missing token IDs (not in index): {:?}",
                missing_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if scorers.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

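        // Over-fetch (2x limit) so per-document combining below still has
        // enough candidates once ordinals of the same document are merged.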
        let raw_results = WandExecutor::new(scorers, limit * 2).execute();
        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    " Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

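        // Mirror the dense path: group WAND hits per local doc id, combine
        // ordinal scores, then translate to global doc ids.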
        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let global_doc_id = doc_id + self.doc_id_offset;
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(global_doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

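    /// Reads the position posting list for `term` in `field`, returning
    /// `None` if the segment has no positions file, the term is unknown, or
    /// the term carries no position data.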
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }

    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

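/// Alias for the async implementation, currently the only segment reader.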
pub type SegmentReader = AsyncSegmentReader;