mod loader;
mod types;

pub use types::{SparseIndex, VectorIndex, VectorSearchResult};

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};
/// Async reader for a single immutable segment: term dictionary, postings,
/// document store, and optional vector, sparse, and position data.
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    postings_handle: LazyFileHandle,
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Offset added to segment-local doc ids to form index-global doc ids.
    doc_id_offset: DocId,
    /// Dense vector indexes, keyed by field id.
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Coarse centroids shared by IVF-style indexes, if any were loaded.
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    /// Sparse vector indexes, keyed by field id.
    sparse_indexes: FxHashMap<u32, SparseIndex>,
    /// Lazy handle to the positions file, if the schema indexes positions.
    positions_handle: Option<LazyFileHandle>,
}

impl AsyncSegmentReader {
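    /// Opens a segment: reads and validates the metadata file eagerly, then
    /// wires up lazy handles for the term dictionary, postings, store,
    /// vector, sparse, and position files.
    ///
    /// A minimal usage sketch (the directory, segment id, schema, and cache
    /// size are assumed to come from the surrounding index code):
    ///
    /// ```ignore
    /// let reader =
    ///     AsyncSegmentReader::open(&dir, segment_id, schema.clone(), 0, cache_blocks).await?;
    /// assert_eq!(reader.doc_id_offset(), 0);
    /// ```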
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        let postings_handle = dir.open_lazy(&files.postings).await?;

        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        let (vector_indexes, coarse_centroids) =
            loader::load_vectors_file(dir, &files, &schema).await?;

        let sparse_indexes = loader::load_sparse_file(dir, &files, meta.num_docs, &schema).await?;

        let positions_handle = loader::open_positions_file(dir, &files, &schema).await?;

        // Approximate resident size of the sparse dimension tables
        // (~12 bytes per dimension entry).
        let sparse_mem: usize = sparse_indexes
            .values()
            .map(|s| s.num_dimensions() * 12)
            .sum();
        log::debug!(
            "[segment] loaded {:016x}: docs={}, sparse_dims_mem={:.2} MB, vectors={}",
            segment_id.0,
            meta.num_docs,
            sparse_mem as f64 / (1024.0 * 1024.0),
            vector_indexes.len()
        );

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
            positions_handle,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn set_doc_id_offset(&mut self, offset: DocId) {
        self.doc_id_offset = offset;
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    pub fn vector_indexes(&self) -> &FxHashMap<u32, VectorIndex> {
        &self.vector_indexes
    }

    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

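    /// Looks up the posting list for `term` in `field`.
    ///
    /// Small posting lists are decoded from data inlined in the `TermInfo`;
    /// larger ones are read from the postings file at the recorded offset.
    ///
    /// ```ignore
    /// // Hypothetical lookup of the term "rust" in an indexed text field.
    /// let postings = reader.get_postings(field, b"rust").await?;
    /// ```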
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Term-dictionary keys are the 4-byte little-endian field id
        // followed by the raw term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Fast path: small posting lists are stored inline in the TermInfo.
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // Otherwise read the serialized posting list from the postings file.
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    /// Fetches a stored document by its segment-local doc id.
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

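    /// Prefetches the term-dictionary blocks covering `[start_term, end_term]`
    /// for `field`, warming the cache ahead of a range scan.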
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

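    /// Returns `(field, term, doc_freq)` for every term in the dictionary
    /// whose bytes are valid UTF-8; other terms are skipped.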
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            // Keys are a 4-byte little-endian field id followed by the term.
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

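    /// Reads `len` raw bytes from the postings file starting at `offset`,
    /// without deserializing them.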
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

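    /// Searches the dense vector index for `field`, returning up to `k`
    /// results. Distances are converted to scores as `1 / (1 + dist)`, and
    /// multi-valued hits are merged per document via `combiner`. For IVF and
    /// ScaNN indexes, `rerank_factor` also drives the number of probed lists.
    ///
    /// A usage sketch (the query embedding and combiner policy are assumed
    /// to come from the caller):
    ///
    /// ```ignore
    /// let hits = reader.search_dense_vector(field, &query_embedding, 10, 4, combiner)?;
    /// ```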
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        // If the schema configures an `mrl_dim` for this field, truncate the
        // query vector to that many dimensions.
        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, u16, f32)> = match index {
            // Flat index: brute-force scan over all stored vectors.
            VectorIndex::Flat(flat_data) => {
                use crate::structures::simd::squared_euclidean_distance;

                let mut candidates: Vec<(u32, u16, f32)> = flat_data
                    .vectors
                    .iter()
                    .zip(flat_data.doc_ids.iter())
                    .map(|(vec, &(doc_id, ordinal))| {
                        let dist = squared_euclidean_distance(effective_query, vec);
                        (doc_id, ordinal, dist)
                    })
                    .collect();
                candidates
                    .sort_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal));
                candidates.truncate(k);
                candidates
            }
            VectorIndex::RaBitQ(rabitq) => rabitq.search(effective_query, k, rerank_factor),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index
                    .search(centroids, codebook, effective_query, k, Some(nprobe))
                    .into_iter()
                    .map(|(doc_id, dist)| (doc_id, 0u16, dist))
                    .collect()
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index
                    .search(centroids, codebook, effective_query, k, Some(nprobe))
                    .into_iter()
                    .map(|(doc_id, dist)| (doc_id, 0u16, dist))
                    .collect()
            }
        };

        // Group hits by global doc id, converting each distance to a
        // similarity score, then combine per-ordinal scores for
        // multi-valued fields.
        let mut doc_ordinals: rustc_hash::FxHashMap<DocId, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for (doc_id, ordinal, dist) in results {
            let doc_id = doc_id as DocId + self.doc_id_offset;
            let score = 1.0 / (1.0 + dist);
            let ordinals = doc_ordinals.entry(doc_id).or_default();
            ordinals.push((ordinal as u32, score));
        }

        let mut final_results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(doc_id, combined_score, ordinals)
            })
            .collect();

        final_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        final_results.truncate(k);

        Ok(final_results)
    }

    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

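    /// Searches the sparse vector index for `field` using a WAND executor
    /// over the posting lists of the query's dimensions. Dimensions absent
    /// from this segment are skipped; results carry global doc ids.
    ///
    /// Usage sketch (the `(dimension id, weight)` pairs would come from the
    /// caller's sparse encoder):
    ///
    /// ```ignore
    /// let query: Vec<(u32, f32)> = vec![(1042, 0.83), (2311, 0.41)];
    /// let hits = reader.search_sparse_vector(field, &query, 10, combiner).await?;
    /// ```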
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
        combiner: crate::query::MultiValueCombiner,
    ) -> Result<Vec<VectorSearchResult>> {
        use crate::query::{SparseTermScorer, WandExecutor};

        let query_tokens = vector.len();

        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => {
                log::debug!(
                    "Sparse vector search: no index for field {}, returning empty",
                    field.0
                );
                return Ok(Vec::new());
            }
        };

        let index_dimensions = sparse_index.num_dimensions();

        // Fetch the posting list for each query dimension, tracking which
        // dimensions are present in this segment.
        let mut matched_tokens = Vec::new();
        let mut missing_tokens = Vec::new();
        let mut posting_lists: Vec<(u32, f32, Arc<BlockSparsePostingList>)> =
            Vec::with_capacity(vector.len());

        for &(dim_id, query_weight) in vector {
            if !sparse_index.has_dimension(dim_id) {
                missing_tokens.push(dim_id);
                continue;
            }

            match sparse_index.get_posting(dim_id).await? {
                Some(pl) => {
                    matched_tokens.push(dim_id);
                    posting_lists.push((dim_id, query_weight, pl));
                }
                None => {
                    missing_tokens.push(dim_id);
                }
            }
        }

        let scorers: Vec<SparseTermScorer> = posting_lists
            .iter()
            .map(|(_, query_weight, pl)| SparseTermScorer::from_arc(pl, *query_weight))
            .collect();

        log::debug!(
            "Sparse vector search: query_tokens={}, matched={}, missing={}, index_dimensions={}",
            query_tokens,
            matched_tokens.len(),
            missing_tokens.len(),
            index_dimensions
        );

        if log::log_enabled!(log::Level::Debug) {
            let query_details: Vec<_> = vector
                .iter()
                .take(30)
                .map(|(id, w)| format!("{}:{:.3}", id, w))
                .collect();
            log::debug!("Query tokens (id:weight): [{}]", query_details.join(", "));
        }

        if !matched_tokens.is_empty() {
            log::debug!(
                "Matched token IDs: {:?}",
                matched_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if !missing_tokens.is_empty() {
            log::debug!(
                "Missing token IDs (not in index): {:?}",
                missing_tokens.iter().take(20).collect::<Vec<_>>()
            );
        }

        if scorers.is_empty() {
            log::debug!("Sparse vector search: no matching tokens, returning empty");
            return Ok(Vec::new());
        }

        // Run WAND with a 2x over-fetch before grouping hits by document.
        let raw_results = WandExecutor::new(scorers, limit * 2).execute();
        log::trace!(
            "Sparse WAND returned {} raw results for segment (doc_id_offset={})",
            raw_results.len(),
            self.doc_id_offset
        );
        if log::log_enabled!(log::Level::Trace) && !raw_results.is_empty() {
            for r in raw_results.iter().take(5) {
                log::trace!(
                    "  Raw result: doc_id={} (global={}), score={:.4}, ordinal={}",
                    r.doc_id,
                    r.doc_id + self.doc_id_offset,
                    r.score,
                    r.ordinal
                );
            }
        }

        // Group raw hits by local doc id, then map to global ids and combine
        // per-ordinal scores.
        let mut doc_ordinals: rustc_hash::FxHashMap<u32, Vec<(u32, f32)>> =
            rustc_hash::FxHashMap::default();
        for r in raw_results {
            let ordinals = doc_ordinals.entry(r.doc_id).or_default();
            ordinals.push((r.ordinal as u32, r.score));
        }

        let mut results: Vec<VectorSearchResult> = doc_ordinals
            .into_iter()
            .map(|(doc_id, ordinals)| {
                let global_doc_id = doc_id + self.doc_id_offset;
                let combined_score = combiner.combine(&ordinals);
                VectorSearchResult::new(global_doc_id, combined_score, ordinals)
            })
            .collect();

        results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        results.truncate(limit);

        Ok(results)
    }

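    /// Loads the position posting list for `term` in `field`, or `None` if
    /// the segment has no positions file, the term is absent, or the term
    /// recorded no position data.
    ///
    /// ```ignore
    /// // Hypothetical lookup, e.g. while evaluating a phrase query.
    /// if reader.has_positions(field) {
    ///     let positions = reader.get_positions(field, b"rust").await?;
    /// }
    /// ```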
    pub async fn get_positions(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<crate::structures::PositionPostingList>> {
        use std::io::Cursor;

        // No positions file was written for this segment.
        let handle = match &self.positions_handle {
            Some(h) => h,
            None => return Ok(None),
        };

        // Same key layout as get_postings: field id prefix + term bytes.
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => info,
            None => return Ok(None),
        };

        let (offset, length) = match term_info.position_info() {
            Some((o, l)) => (o, l),
            None => return Ok(None),
        };

        let slice = handle.slice(offset..offset + length as u64);
        let data = slice.read_bytes().await?;

        let mut cursor = Cursor::new(data.as_slice());
        let pos_list = crate::structures::PositionPostingList::deserialize(&mut cursor)?;

        Ok(Some(pos_list))
    }

    /// Whether the schema declares position indexing for `field`.
    pub fn has_positions(&self, field: Field) -> bool {
        self.schema
            .get_field_entry(field)
            .is_some_and(|entry| entry.positions.is_some())
    }
}

pub type SegmentReader = AsyncSegmentReader;