mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

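/// Estimated memory footprint of a single open segment.
///
/// The figures are heuristics (see [`AsyncSegmentReader::memory_stats`]),
/// not exact allocator measurements.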
#[derive(Debug, Clone, Default)]
pub struct SegmentMemoryStats {
    pub segment_id: u128,
    pub num_docs: u32,
    pub term_dict_cache_bytes: usize,
    pub store_cache_bytes: usize,
    pub sparse_index_bytes: usize,
    pub dense_index_bytes: usize,
    pub bloom_filter_bytes: usize,
}

impl SegmentMemoryStats {
    pub fn total_bytes(&self) -> usize {
        self.term_dict_cache_bytes
            + self.store_cache_bytes
            + self.sparse_index_bytes
            + self.dense_index_bytes
            + self.bloom_filter_bytes
    }
}

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

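/// Read-only access to one on-disk segment: term dictionary, postings,
/// document store, and optional dense-vector, sparse-vector, and position
/// data.
///
/// Doc ids handed out by a segment are segment-local; `doc_id_offset`
/// records where this segment's ids start in the index-wide id space.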
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    doc_id_offset: DocId,
    vector_indexes: FxHashMap<u32, VectorIndex>,
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
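    /// Opens a segment for reading.
    ///
    /// Loads the segment metadata, term dictionary, and document store, plus
    /// any vector, sparse, and position files the schema calls for.
    /// `cache_blocks` is handed to the term-dictionary and store readers as
    /// their block-cache capacity.
    ///
    /// A minimal usage sketch (`dir`, `segment_id`, `schema`, and the cache
    /// size of 256 blocks are placeholders, not recommendations):
    ///
    /// ```ignore
    /// let reader = AsyncSegmentReader::open(&dir, segment_id, schema.clone(), 0, 256).await?;
    /// println!("{} docs", reader.num_docs());
    /// ```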
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let (vector_indexes, coarse_centroids) =
            loader::load_vectors_file(dir, &files, &schema).await?;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        let sparse_dims: usize = sparse_indexes.values().map(|s| s.num_dimensions()).sum();
        // Rough estimate: assume ~24 bytes of in-memory overhead per sparse dimension.
        let sparse_mem = sparse_dims * 24;
        log::debug!(
            "[segment] loaded {:016x}: docs={}, sparse_dims={}, sparse_mem={:.2} KB, vectors={}",
            segment_id.0,
            meta.num_docs,
            sparse_dims,
            sparse_mem as f64 / 1024.0,
            vector_indexes.len()
        );

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

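    /// Estimates this segment's resident memory.
    ///
    /// Block caches are costed at a nominal 4 KiB per cached block and
    /// sparse indexes at ~24 bytes per dimension, so treat the totals as
    /// rough approximations.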
    pub fn memory_stats(&self) -> SegmentMemoryStats {
        let term_dict_stats = self.term_dict.stats();

        let term_dict_cache_bytes = self.term_dict.cached_blocks() * 4096;

        let store_cache_bytes = self.store.cached_blocks() * 4096;

        let sparse_index_bytes: usize = self
            .sparse_indexes
            .values()
            .map(|s| s.num_dimensions() * 24)
            .sum();

        let dense_index_bytes: usize = self
            .vector_indexes
            .values()
            .map(|v| v.estimated_memory_bytes())
            .sum();

        SegmentMemoryStats {
            segment_id: self.meta.id,
            num_docs: self.meta.num_docs,
            term_dict_cache_bytes,
            store_cache_bytes,
            sparse_index_bytes,
            dense_index_bytes,
            bloom_filter_bytes: term_dict_stats.bloom_filter_size,
        }
    }

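    /// Looks up the block posting list for `term` in `field`.
    ///
    /// Term-dictionary keys are the little-endian field id followed by the
    /// raw term bytes. Small posting lists can be stored inline in the
    /// [`TermInfo`]; otherwise the postings file is read at the recorded
    /// offset. Returns `Ok(None)` if the term is absent.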
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

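    /// Prefetches term-dictionary blocks covering the key range
    /// `[start_term, end_term]` of `field`, warming the cache ahead of a
    /// batch of lookups.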
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store(&self) -> &super::store::AsyncStoreReader {
        &self.store
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

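    /// Lists every `(field, term, doc_freq)` entry in the term dictionary,
    /// skipping terms whose bytes are not valid UTF-8.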
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    pub async fn prefetch_term_dict(&self) -> crate::Result<()> {
        self.term_dict
            .prefetch_all_data_bulk()
            .await
            .map_err(crate::Error::from)
    }

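    /// Reads `len` raw bytes from the postings file starting at `offset`,
    /// e.g. for an offset/length pair taken from a [`TermInfo`].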
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    pub async fn read_position_bytes(&self, offset: u64, len: u32) -> Result<Option<Vec<u8>>> {
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };
        let start = offset;
        let end = start + len as u64;
        let bytes = handle.read_bytes_range(start..end).await?;
        Ok(Some(bytes.to_vec()))
    }

    pub fn has_positions_file(&self) -> bool {
        self.positions_handle.is_some()
    }

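    /// Searches the dense vector index on `field` for the `k` best matches.
    ///
    /// Flat indexes are scanned exhaustively with cosine similarity;
    /// RaBitQ/IVF/ScaNN indexes return distances that are mapped to scores
    /// as `1 / (1 + distance)`. `nprobe` selects how many coarse cells the
    /// IVF/ScaNN search probes (`0` falls back to 32), and `rerank_factor`
    /// widens the candidate pool before truncation. Hits on multi-valued
    /// fields are grouped per document and merged with `combiner`.
    ///
    /// Sketch of a call (parameter values are illustrative only):
    ///
    /// ```ignore
    /// let hits = reader.search_dense_vector(field, &query, 10, 32, 4, combiner)?;
    /// ```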
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        nprobe: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let index = match self.vector_indexes.get(&field.0) {
            Some(idx) => idx,
            None => return Ok(Vec::new()),
        };

        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        // If the schema requests a smaller MRL dimension, search on the query prefix.
        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, u16, f32)> = match index {
            VectorIndex::Flat(flat_data) => {
                use crate::structures::simd::cosine_similarity;

                // Exhaustive scan: score every stored vector against the query.
                let mut candidates: Vec<(u32, u16, f32)> = (0..flat_data.num_vectors())
                    .map(|i| {
                        let vec = flat_data.get_vector(i);
                        let (doc_id, ordinal) = flat_data.get_doc_id(i);
                        let score = cosine_similarity(effective_query, vec);
                        (doc_id, ordinal, score)
                    })
                    .collect();
                candidates
                    .sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal));
                candidates.truncate(k * rerank_factor.max(1));
                candidates
            }
            VectorIndex::RaBitQ(rabitq) => {
                // Convert distances into similarity-style scores.
                rabitq
                    .search(effective_query, k, rerank_factor)
                    .into_iter()
                    .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                    .collect()
            }
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                index
                    .search(
                        centroids,
                        codebook,
                        effective_query,
                        k,
                        Some(effective_nprobe),
                    )
                    .into_iter()
                    .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                    .collect()
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let effective_nprobe = if nprobe > 0 { nprobe } else { 32 };
                index
                    .search(
                        centroids,
                        codebook,
                        effective_query,
                        k,
                        Some(effective_nprobe),
                    )
                    .into_iter()
                    .map(|(doc_id, ordinal, dist)| (doc_id, ordinal, 1.0 / (1.0 + dist)))
                    .collect()
            }
        };

        // Group per-ordinal hits by document and combine into a single score.
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, score) in results {
            let ordinals = doc_ordinals.entry(doc_id as DocId).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

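    /// Searches the sparse vector index on `field` with the query's
    /// `(dimension, weight)` pairs.
    ///
    /// Dimensions missing from the index are skipped. The search
    /// over-fetches `2 * limit` candidates, using the BMP executor when
    /// more than 12 query terms matched and block-max WAND otherwise;
    /// `heap_factor` relaxes the pruning threshold. Per-ordinal hits are
    /// merged per document with `combiner` before the final top-`limit`
    /// cut.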
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
        heap_factor: f32,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{BlockMaxScoreExecutor, BmpExecutor, SparseTermScorer};

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        // Resolve each query dimension to its posting list, tracking misses for logging.
        let mut matched_tokens = Vec::new();
        let mut missing_tokens = Vec::new();
        let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
            Vec::with_capacity(vector.len());

        for &(dim_id, query_weight) in vector {
            if !sparse_index.has_dimension(dim_id) {
                missing_tokens.push(dim_id);
                continue;
            }

            match sparse_index.get_posting(dim_id).await? {
                Some(pl) => {
                    matched_tokens.push(dim_id);
                    posting_lists.push((dim_id, query_weight, pl));
                }
                None => {
                    missing_tokens.push(dim_id);
                }
            }
        }

        let scorers: Vec<SparseTermScorer> = posting_lists
            .iter()
            .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
            .collect();

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_tokens.len(),
            missing_tokens.len(),
            index_dimensions
        );

        if log::log_enabled!(log::Level::Debug) {
            let query_details: Vec<_> = vector
                .iter()
                .take(30)
                .map(|(id, w)| format!("{}:{:.3}", id, w))
                .collect();
            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
        }

        if !matched_tokens.is_empty() {
            log::debug!(
                "Matched token IDs: {:?}",
                matched_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if !missing_tokens.is_empty() {
            log::debug!(
                "Missing token IDs (not in index): {:?}",
                missing_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if scorers.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        let num_terms = scorers.len();
        // Over-fetch candidates; results are regrouped per document below.
        let over_fetch = limit * 2;
        let raw_results = if num_terms > 12 {
            // Many query terms: the BMP executor works on the posting lists directly.
            let pl_refs: Vec<_> = posting_lists
                .iter()
                .map(|(_, _, pl)| Arc::clone(pl))
                .collect();
            let weights: Vec<_> = posting_lists.iter().map(|(_, qw, _)| *qw).collect();
            drop(scorers);
            BmpExecutor::new(pl_refs, weights, over_fetch, heap_factor).execute()
        } else {
            BlockMaxScoreExecutor::with_heap_factor(scorers, over_fetch, heap_factor).execute()
        };

        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        // Group per-ordinal hits by document and combine into a single score.
        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

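    /// Loads the position posting list for `term` in `field`.
    ///
    /// Returns `Ok(None)` when the segment has no positions file, the term
    /// is unknown, or the term stores no position data.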
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }

    pub fn has_positions(&self, field: Field) -> bool {
        if let Some(entry) = self.schema.get_field_entry(field) {
            entry.positions.is_some()
        } else {
            false
        }
    }
}

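/// Convenience alias for [`AsyncSegmentReader`].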
pub type SegmentReader = AsyncSegmentReader;