// hermes_core/index/reader.rs
1//! IndexReader and Searcher - two-tier search abstraction
2//!
3//! - `IndexReader`: Manages reload policy, holds current Searcher
4//! - `Searcher`: Holds segment snapshot, provides search/doc access
5//!
6//! Usage pattern:
7//! 1. Get IndexReader from Index via `index.reader()`
8//! 2. Get Searcher from IndexReader via `reader.searcher()`
9//! 3. Use Searcher for search/doc operations
10//! 4. Searcher dropped → segment refs released → deferred deletions proceed
11
12use std::sync::Arc;
13
14use parking_lot::RwLock;
15use rustc_hash::FxHashMap;
16
17use crate::dsl::Schema;
18use crate::error::Result;
19use crate::segment::{SegmentId, SegmentReader, SegmentSnapshot};
20use crate::structures::{CoarseCentroids, PQCodebook};
21
22#[cfg(feature = "native")]
23use crate::directories::DirectoryWriter;
24
/// Searcher - holds segment snapshot for safe searching
///
/// Segments referenced by this reader won't be deleted until the reader is dropped.
/// A Searcher is a point-in-time view: segments committed after it was created
/// are not visible until a new Searcher is obtained from the IndexReader.
#[cfg(feature = "native")]
pub struct Searcher<D: DirectoryWriter + 'static> {
    /// Segment snapshot holding refs (prevents deletion). Never read directly;
    /// held only so its Drop releases the segment refs when the Searcher dies.
    _snapshot: SegmentSnapshot<D>,
    /// Loaded segment readers, in snapshot order; each was assigned a
    /// contiguous global doc-id range at open time
    segments: Vec<Arc<SegmentReader>>,
    /// Schema shared with the owning index
    schema: Arc<Schema>,
    /// Fields searched when a query does not name a field explicitly
    default_fields: Vec<crate::Field>,
    /// Tokenizers used for query-time analysis
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained centroids per field (keyed by field id)
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained codebooks per field (keyed by field id)
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
}
45
#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> Searcher<D> {
    /// Create a new searcher from a snapshot.
    ///
    /// Opens a `SegmentReader` for each segment id in `snapshot`, assigning
    /// every segment a contiguous global doc-id range via `doc_id_offset`.
    /// Segments whose id fails to parse or that fail to open are skipped with
    /// a warning so one bad segment does not prevent searching the rest.
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot<D>,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        // Load segment readers for the snapshot, stacking doc-id ranges.
        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in snapshot.segment_ids() {
            let Some(segment_id) = SegmentId::from_hex(id_str) else {
                // Previously skipped silently; warn for parity with open failures.
                log::warn!("Skipping segment with unparsable id: {}", id_str);
                continue;
            };

            match SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(&schema),
                doc_id_offset,
                term_cache_blocks,
            )
            .await
            {
                Ok(reader) => {
                    doc_id_offset += reader.meta().num_docs;
                    segments.push(Arc::new(reader));
                }
                Err(e) => {
                    log::warn!("Failed to open segment {}: {:?}", id_str, e);
                }
            }
        }

        // Default fields: schema-configured when present, otherwise every
        // indexed text field.
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            _snapshot: snapshot,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            trained_codebooks,
        })
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get segment readers (in snapshot order)
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    /// Get default fields for search
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Get tokenizer registry
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    /// Get trained centroids (keyed by field id)
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Get trained codebooks (keyed by field id)
    pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
        &self.trained_codebooks
    }

    /// Get total document count across all segments
    pub fn num_docs(&self) -> u32 {
        self.segments.iter().map(|s| s.meta().num_docs).sum()
    }

    /// Get number of segments
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

    /// Get a document by global doc_id.
    ///
    /// Walks segments in order, translating the global id into the owning
    /// segment's local doc-id space. Returns `Ok(None)` when `doc_id` lies
    /// past the end of the last segment.
    pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        let mut offset = 0u32;
        for segment in &self.segments {
            let segment_docs = segment.meta().num_docs;
            if doc_id < offset + segment_docs {
                let local_doc_id = doc_id - offset;
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }
        Ok(None)
    }

    /// Search across all segments and return aggregated results
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        self.search_with_offset(query, limit, 0).await
    }

    /// Search with offset for pagination.
    ///
    /// Each segment is asked for `offset + limit` hits so the merged, globally
    /// sorted list can cover the requested page; the page window is applied
    /// after the merge.
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        // Saturate so pathological offset/limit values cannot overflow.
        let fetch_limit = offset.saturating_add(limit);
        // Note: previously each hit was stored as (segment_id, result) but the
        // segment id was never used; store results directly.
        let mut all_results: Vec<crate::query::SearchResult> = Vec::new();

        for segment in &self.segments {
            let results =
                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
            all_results.extend(results);
        }

        // Sort by score descending; incomparable (NaN) scores compare equal.
        // sort_by is stable, so per-segment order is preserved among ties,
        // matching the previous behavior.
        all_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        // Apply offset and limit
        Ok(all_results.into_iter().skip(offset).take(limit).collect())
    }
}
207
/// IndexReader - manages Searcher with reload policy
///
/// The IndexReader periodically reloads its Searcher to pick up new segments.
/// Uses SegmentManager as authoritative source for segment state (avoids race conditions).
#[cfg(feature = "native")]
pub struct IndexReader<D: DirectoryWriter + 'static> {
    /// Schema shared with the owning index
    schema: Arc<Schema>,
    /// Segment manager - authoritative source for segments and snapshots
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Current searcher; the Arc is swapped wholesale on reload so in-flight
    /// searches keep their old snapshot until they drop it
    searcher: RwLock<Arc<Searcher<D>>>,
    /// Trained centroids, cloned into every newly created Searcher
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained codebooks, cloned into every newly created Searcher
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Term cache blocks passed through to segment opening
    term_cache_blocks: usize,
    /// Last reload time; compared against `reload_interval` in `searcher()`
    last_reload: RwLock<std::time::Instant>,
    /// Reload interval (default 1 second)
    reload_interval: std::time::Duration,
}
231
#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> IndexReader<D> {
    /// Build an `IndexReader` whose segment state comes from `segment_manager`.
    ///
    /// An initial `Searcher` is created eagerly so the reader is usable right
    /// away; later calls to [`Self::searcher`] refresh it according to the
    /// reload interval (1 second by default).
    pub async fn from_segment_manager(
        schema: Arc<Schema>,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let initial = Self::create_reader(
            &schema,
            &segment_manager,
            &trained_centroids,
            &trained_codebooks,
            term_cache_blocks,
        )
        .await?;

        Ok(Self {
            schema,
            segment_manager,
            searcher: RwLock::new(Arc::new(initial)),
            trained_centroids,
            trained_codebooks,
            term_cache_blocks,
            last_reload: RwLock::new(std::time::Instant::now()),
            reload_interval: std::time::Duration::from_secs(1),
        })
    }

    /// Construct a `Searcher` from a fresh snapshot taken via the segment
    /// manager. Going through `acquire_snapshot` keeps metadata and the
    /// deletion tracker consistent, avoiding races with concurrent merges.
    async fn create_reader(
        schema: &Arc<Schema>,
        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Searcher<D>> {
        let snapshot = segment_manager.acquire_snapshot().await;
        let directory = segment_manager.directory();

        Searcher::from_snapshot(
            directory,
            Arc::clone(schema),
            snapshot,
            trained_centroids.clone(),
            trained_codebooks.clone(),
            term_cache_blocks,
        )
        .await
    }

    /// Set reload interval
    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
        self.reload_interval = interval;
    }

    /// Current searcher, refreshing first when the reload interval elapsed.
    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
        // The read guard drops at the end of this statement, before any await.
        let stale = self.last_reload.read().elapsed() >= self.reload_interval;
        if stale {
            self.reload().await?;
        }

        let guard = self.searcher.read();
        Ok(Arc::clone(&*guard))
    }

    /// Swap in a searcher built from a fresh snapshot.
    ///
    /// The previous searcher stays alive until its last `Arc` is dropped, at
    /// which point its snapshot releases the segment refs.
    pub async fn reload(&self) -> Result<()> {
        let fresh = Self::create_reader(
            &self.schema,
            &self.segment_manager,
            &self.trained_centroids,
            &self.trained_codebooks,
            self.term_cache_blocks,
        )
        .await?;

        *self.searcher.write() = Arc::new(fresh);
        *self.last_reload.write() = std::time::Instant::now();

        Ok(())
    }

    /// Get schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
}
324    /// Get schema
325    pub fn schema(&self) -> &Schema {
326        &self.schema
327    }
328}