hermes_core/segment/
reader.rs

1//! Async segment reader with lazy loading
2
3use std::sync::Arc;
4
5use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
6use crate::dsl::{Document, Field, Schema};
7use crate::structures::{AsyncSSTableReader, BlockPostingList, SSTableStats, TermInfo};
8use crate::{DocId, Error, Result};
9
10use super::store::{AsyncStoreReader, RawStoreBlock};
11use super::types::{SegmentFiles, SegmentId, SegmentMeta};
12
/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
pub struct AsyncSegmentReader {
    /// Segment metadata (small; fully deserialized when the segment is opened)
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: LazyFileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    /// Schema shared with the rest of the index (used to decode stored documents)
    schema: Arc<Schema>,
    /// Base doc_id offset for this segment
    doc_id_offset: DocId,
}
30
31impl AsyncSegmentReader {
32    /// Open a segment with lazy loading
33    pub async fn open<D: Directory>(
34        dir: &D,
35        segment_id: SegmentId,
36        schema: Arc<Schema>,
37        doc_id_offset: DocId,
38        cache_blocks: usize,
39    ) -> Result<Self> {
40        let files = SegmentFiles::new(segment_id.0);
41
42        // Read metadata (small, always loaded)
43        let meta_slice = dir.open_read(&files.meta).await?;
44        let meta_bytes = meta_slice.read_bytes().await?;
45        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
46        debug_assert_eq!(meta.id, segment_id.0);
47
48        // Open term dictionary with lazy loading (fetches ranges on demand)
49        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
50        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;
51
52        // Get postings file handle (lazy - fetches ranges on demand)
53        let postings_handle = dir.open_lazy(&files.postings).await?;
54
55        // Open store with lazy loading
56        let store_handle = dir.open_lazy(&files.store).await?;
57        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;
58
59        Ok(Self {
60            meta,
61            term_dict: Arc::new(term_dict),
62            postings_handle,
63            store: Arc::new(store),
64            schema,
65            doc_id_offset,
66        })
67    }
68
69    pub fn meta(&self) -> &SegmentMeta {
70        &self.meta
71    }
72
73    pub fn num_docs(&self) -> u32 {
74        self.meta.num_docs
75    }
76
77    /// Get average field length for BM25F scoring
78    pub fn avg_field_len(&self, field: Field) -> f32 {
79        self.meta.avg_field_len(field)
80    }
81
82    pub fn doc_id_offset(&self) -> DocId {
83        self.doc_id_offset
84    }
85
86    pub fn schema(&self) -> &Schema {
87        &self.schema
88    }
89
90    /// Get term dictionary stats for debugging
91    pub fn term_dict_stats(&self) -> SSTableStats {
92        self.term_dict.stats()
93    }
94
95    /// Get posting list for a term (async - loads on demand)
96    ///
97    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
98    /// and no additional I/O is needed. For larger lists, reads from .post file.
99    pub async fn get_postings(
100        &self,
101        field: Field,
102        term: &[u8],
103    ) -> Result<Option<BlockPostingList>> {
104        log::debug!(
105            "SegmentReader::get_postings field={} term_len={}",
106            field.0,
107            term.len()
108        );
109
110        // Build key: field_id + term
111        let mut key = Vec::with_capacity(4 + term.len());
112        key.extend_from_slice(&field.0.to_le_bytes());
113        key.extend_from_slice(term);
114
115        // Look up in term dictionary
116        let term_info = match self.term_dict.get(&key).await? {
117            Some(info) => {
118                log::debug!("SegmentReader::get_postings found term_info");
119                info
120            }
121            None => {
122                log::debug!("SegmentReader::get_postings term not found");
123                return Ok(None);
124            }
125        };
126
127        // Check if posting list is inlined
128        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
129            // Build BlockPostingList from inline data (no I/O needed!)
130            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
131            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
132                posting_list.push(doc_id, tf);
133            }
134            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
135            return Ok(Some(block_list));
136        }
137
138        // External posting list - read from postings file handle (lazy - HTTP range request)
139        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
140            Error::Corruption("TermInfo has neither inline nor external data".to_string())
141        })?;
142
143        let start = posting_offset as usize;
144        let end = start + posting_len as usize;
145
146        if end > self.postings_handle.len() {
147            return Err(Error::Corruption(
148                "Posting offset out of bounds".to_string(),
149            ));
150        }
151
152        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
153        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
154
155        Ok(Some(block_list))
156    }
157
158    /// Get document by local doc_id (async - loads on demand)
159    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
160        self.store
161            .get(local_doc_id, &self.schema)
162            .await
163            .map_err(Error::from)
164    }
165
166    /// Prefetch term dictionary blocks for a key range
167    pub async fn prefetch_terms(
168        &self,
169        field: Field,
170        start_term: &[u8],
171        end_term: &[u8],
172    ) -> Result<()> {
173        let mut start_key = Vec::with_capacity(4 + start_term.len());
174        start_key.extend_from_slice(&field.0.to_le_bytes());
175        start_key.extend_from_slice(start_term);
176
177        let mut end_key = Vec::with_capacity(4 + end_term.len());
178        end_key.extend_from_slice(&field.0.to_le_bytes());
179        end_key.extend_from_slice(end_term);
180
181        self.term_dict.prefetch_range(&start_key, &end_key).await?;
182        Ok(())
183    }
184
185    /// Check if store uses dictionary compression (incompatible with raw merging)
186    pub fn store_has_dict(&self) -> bool {
187        self.store.has_dict()
188    }
189
190    /// Get raw store blocks for optimized merging
191    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
192        self.store.raw_blocks()
193    }
194
195    /// Get store data slice for raw block access
196    pub fn store_data_slice(&self) -> &LazyFileSlice {
197        self.store.data_slice()
198    }
199
200    /// Get all terms from this segment (for merge)
201    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
202        self.term_dict.all_entries().await.map_err(Error::from)
203    }
204
205    /// Read raw posting bytes at offset
206    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
207        let start = offset as usize;
208        let end = start + len as usize;
209        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
210        Ok(bytes.to_vec())
211    }
212}
213
/// Alias for AsyncSegmentReader
///
/// Lets call sites use the shorter `SegmentReader` name for the same type.
pub type SegmentReader = AsyncSegmentReader;