hermes_core/segment/
reader.rs1use std::sync::Arc;
4
5use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
6use crate::dsl::{Document, Field, Schema};
7use crate::structures::{AsyncSSTableReader, BlockPostingList, SSTableStats, TermInfo};
8use crate::{DocId, Error, Result};
9
10use super::store::{AsyncStoreReader, RawStoreBlock};
11use super::types::{SegmentFiles, SegmentId, SegmentMeta};
12
13pub struct AsyncSegmentReader {
19 meta: SegmentMeta,
20 term_dict: Arc<AsyncSSTableReader<TermInfo>>,
22 postings_handle: LazyFileHandle,
24 store: Arc<AsyncStoreReader>,
26 schema: Arc<Schema>,
27 doc_id_offset: DocId,
29}
30
31impl AsyncSegmentReader {
32 pub async fn open<D: Directory>(
34 dir: &D,
35 segment_id: SegmentId,
36 schema: Arc<Schema>,
37 doc_id_offset: DocId,
38 cache_blocks: usize,
39 ) -> Result<Self> {
40 let files = SegmentFiles::new(segment_id.0);
41
42 let meta_slice = dir.open_read(&files.meta).await?;
44 let meta_bytes = meta_slice.read_bytes().await?;
45 let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
46 debug_assert_eq!(meta.id, segment_id.0);
47
48 let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
50 let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
51
52 let postings_handle = dir.open_lazy(&files.postings).await?;
54
55 let store_handle = dir.open_lazy(&files.store).await?;
57 let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
58
59 Ok(Self {
60 meta,
61 term_dict: Arc::new(term_dict),
62 postings_handle,
63 store: Arc::new(store),
64 schema,
65 doc_id_offset,
66 })
67 }
68
69 pub fn meta(&self) -> &SegmentMeta {
70 &self.meta
71 }
72
73 pub fn num_docs(&self) -> u32 {
74 self.meta.num_docs
75 }
76
77 pub fn avg_field_len(&self, field: Field) -> f32 {
79 self.meta.avg_field_len(field)
80 }
81
82 pub fn doc_id_offset(&self) -> DocId {
83 self.doc_id_offset
84 }
85
86 pub fn schema(&self) -> &Schema {
87 &self.schema
88 }
89
90 pub fn term_dict_stats(&self) -> SSTableStats {
92 self.term_dict.stats()
93 }
94
95 pub async fn get_postings(
100 &self,
101 field: Field,
102 term: &[u8],
103 ) -> Result<Option<BlockPostingList>> {
104 log::debug!(
105 "SegmentReader::get_postings field={} term_len={}",
106 field.0,
107 term.len()
108 );
109
110 let mut key = Vec::with_capacity(4 + term.len());
112 key.extend_from_slice(&field.0.to_le_bytes());
113 key.extend_from_slice(term);
114
115 let term_info = match self.term_dict.get(&key).await? {
117 Some(info) => {
118 log::debug!("SegmentReader::get_postings found term_info");
119 info
120 }
121 None => {
122 log::debug!("SegmentReader::get_postings term not found");
123 return Ok(None);
124 }
125 };
126
127 if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
129 let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
131 for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
132 posting_list.push(doc_id, tf);
133 }
134 let block_list = BlockPostingList::from_posting_list(&posting_list)?;
135 return Ok(Some(block_list));
136 }
137
138 let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
140 Error::Corruption("TermInfo has neither inline nor external data".to_string())
141 })?;
142
143 let start = posting_offset as usize;
144 let end = start + posting_len as usize;
145
146 if end > self.postings_handle.len() {
147 return Err(Error::Corruption(
148 "Posting offset out of bounds".to_string(),
149 ));
150 }
151
152 let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
153 let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
154
155 Ok(Some(block_list))
156 }
157
158 pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
160 self.store
161 .get(local_doc_id, &self.schema)
162 .await
163 .map_err(Error::from)
164 }
165
166 pub async fn prefetch_terms(
168 &self,
169 field: Field,
170 start_term: &[u8],
171 end_term: &[u8],
172 ) -> Result<()> {
173 let mut start_key = Vec::with_capacity(4 + start_term.len());
174 start_key.extend_from_slice(&field.0.to_le_bytes());
175 start_key.extend_from_slice(start_term);
176
177 let mut end_key = Vec::with_capacity(4 + end_term.len());
178 end_key.extend_from_slice(&field.0.to_le_bytes());
179 end_key.extend_from_slice(end_term);
180
181 self.term_dict.prefetch_range(&start_key, &end_key).await?;
182 Ok(())
183 }
184
185 pub fn store_has_dict(&self) -> bool {
187 self.store.has_dict()
188 }
189
190 pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
192 self.store.raw_blocks()
193 }
194
195 pub fn store_data_slice(&self) -> &LazyFileSlice {
197 self.store.data_slice()
198 }
199
200 pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
202 self.term_dict.all_entries().await.map_err(Error::from)
203 }
204
205 pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
207 let start = offset as usize;
208 let end = start + len as usize;
209 let bytes = self.postings_handle.read_bytes_range(start..end).await?;
210 Ok(bytes.to_vec())
211 }
212}
213
214pub type SegmentReader = AsyncSegmentReader;