Skip to main content

hermes_core/index/
reader.rs

//! IndexReader and Searcher - two-tier search abstraction
//!
//! - `IndexReader`: manages the reload policy and holds the current `Searcher`
//! - `Searcher`: holds a segment snapshot and provides search/doc access
//!
//! Usage pattern:
//! 1. Get an `IndexReader` from the `Index` via `index.reader()`
//! 2. Get a `Searcher` from the `IndexReader` via `reader.searcher()`
//! 3. Use the `Searcher` for search/doc operations
//! 4. Drop the `Searcher` → its segment refs are released → deferred deletions can proceed
11
12use std::sync::Arc;
13
14use parking_lot::RwLock;
15use rustc_hash::FxHashMap;
16
17use crate::dsl::Schema;
18use crate::error::Result;
19use crate::query::LazyGlobalStats;
20use crate::segment::{SegmentId, SegmentReader, SegmentSnapshot};
21use crate::structures::{CoarseCentroids, PQCodebook};
22
23#[cfg(feature = "native")]
24use crate::directories::DirectoryWriter;
25
/// Searcher - holds a segment snapshot for safe searching
///
/// Segments referenced by this searcher won't be deleted until the searcher is dropped.
#[cfg(feature = "native")]
pub struct Searcher<D: DirectoryWriter + 'static> {
    /// Loaded segment readers
    segments: Vec<Arc<SegmentReader>>,
    /// Schema
    schema: Arc<Schema>,
    /// Default fields for search
    default_fields: Vec<crate::Field>,
    /// Tokenizers
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained centroids per field
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained codebooks per field
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Lazy global statistics for cross-segment IDF computation.
    /// Bound to this Searcher's segment set - stats are computed lazily and cached.
    global_stats: Arc<LazyGlobalStats>,
    /// Segment snapshot holding refs (prevents deletion).
    ///
    /// Declared LAST on purpose: struct fields are dropped in declaration order, so
    /// this guard is released only after this searcher's own segment readers
    /// (`segments`, and the clone held by `global_stats`) have been dropped.
    /// NOTE(review): `Arc<SegmentReader>`s handed out via `segment_readers()` can
    /// still outlive the searcher; this ordering only covers the searcher's own refs.
    _snapshot: SegmentSnapshot<D>,
}
49
#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> Searcher<D> {
    /// Create a new searcher from a snapshot.
    ///
    /// Opens a `SegmentReader` for every segment id in `snapshot`, assigning
    /// consecutive global doc-id offsets in snapshot order. Segment ids that cannot
    /// be parsed, and segments that fail to open, are logged and skipped, so the
    /// searcher is built from whatever subset could be loaded.
    ///
    /// # Errors
    /// Currently always returns `Ok`; per-segment failures are logged rather than
    /// propagated.
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot<D>,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        // Load segment readers for the snapshot
        let mut segments = Vec::new();
        // Global doc id of the first document of the next segment to be opened.
        let mut doc_id_offset = 0u32;

        for id_str in snapshot.segment_ids() {
            let Some(segment_id) = SegmentId::from_hex(id_str) else {
                // Don't skip silently: a malformed id means a segment is missing
                // from search results.
                log::warn!("Skipping segment with invalid id {}", id_str);
                continue;
            };

            match SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(&schema),
                doc_id_offset,
                term_cache_blocks,
            )
            .await
            {
                Ok(reader) => {
                    doc_id_offset += reader.meta().num_docs;
                    segments.push(Arc::new(reader));
                }
                Err(e) => {
                    log::warn!("Failed to open segment {}: {:?}", id_str, e);
                }
            }
        }

        // Build default fields: the schema's explicit defaults, or else every
        // indexed text field.
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        // Create lazy global statistics bound to this segment set.
        // IDF values will be computed on demand and cached.
        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));

        Ok(Self {
            _snapshot: snapshot,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            trained_codebooks,
            global_stats,
        })
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get segment readers
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    /// Get default fields for search
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Get tokenizer registry
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    /// Get trained centroids
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Get trained codebooks
    pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
        &self.trained_codebooks
    }

    /// Get lazy global statistics for cross-segment IDF computation
    ///
    /// Statistics are computed lazily on first access and cached per term/dimension.
    /// The stats are bound to this Searcher's segment set.
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }

    /// Get total document count across all segments
    pub fn num_docs(&self) -> u32 {
        self.segments.iter().map(|s| s.meta().num_docs).sum()
    }

    /// Get number of segments
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

    /// Get a document by global doc_id.
    ///
    /// Global ids are assigned by concatenating segments in order. Returns
    /// `Ok(None)` when `doc_id` is past the last document.
    pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        // Invariant: `offset <= doc_id` at the top of every iteration, so the
        // subtraction cannot underflow. Comparing `doc_id - offset` against the
        // segment size (instead of computing `offset + segment_docs`) means the
        // check cannot overflow u32, and after a miss `offset + segment_docs <= doc_id`,
        // so the increment cannot overflow either.
        let mut offset = 0u32;
        for segment in &self.segments {
            let segment_docs = segment.meta().num_docs;
            let local_doc_id = doc_id - offset;
            if local_doc_id < segment_docs {
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }
        Ok(None)
    }

    /// Search across all segments and return aggregated results
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        self.search_with_offset(query, limit, 0).await
    }

    /// Search with offset for pagination.
    ///
    /// Each segment is asked for its top `offset + limit` hits (the page boundary
    /// is only known after merging). Hits are then merged by descending score,
    /// and the `[offset, offset + limit)` window is returned. Ties keep segment
    /// order because the sort is stable.
    ///
    /// # Errors
    /// Propagates the first error returned by a per-segment search.
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        // Saturate so that "unbounded" requests (e.g. `limit == usize::MAX`)
        // don't overflow.
        let fetch_limit = offset.saturating_add(limit);
        let mut all_results: Vec<crate::query::SearchResult> = Vec::new();

        for segment in &self.segments {
            let results =
                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
            all_results.extend(results);
        }

        // Sort by score descending (stable: equal scores keep segment order).
        all_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        // Apply offset and limit
        Ok(all_results.into_iter().skip(offset).take(limit).collect())
    }
}
224
/// IndexReader - manages a [`Searcher`] with a reload policy
///
/// The IndexReader refreshes its Searcher lazily, and this type spawns no background
/// task. `searcher()` rebuilds the Searcher from a fresh snapshot once
/// `reload_interval` has elapsed since the last reload, and `reload()` forces a rebuild.
/// Uses SegmentManager as the authoritative source for segment state (avoids race
/// conditions).
#[cfg(feature = "native")]
pub struct IndexReader<D: DirectoryWriter + 'static> {
    /// Schema
    schema: Arc<Schema>,
    /// Segment manager - authoritative source for segments
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Current searcher (swapped atomically on reload; outstanding `Arc`s keep old ones alive)
    searcher: RwLock<Arc<Searcher<D>>>,
    /// Trained centroids, copied into every new Searcher
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained codebooks, copied into every new Searcher
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Term cache blocks passed to each `SegmentReader`
    term_cache_blocks: usize,
    /// Time of the last reload (or of the last claimed reload attempt)
    last_reload: RwLock<std::time::Instant>,
    /// Reload interval (default 1 second)
    reload_interval: std::time::Duration,
}
248
#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> IndexReader<D> {
    /// Create a new index reader from a segment manager.
    ///
    /// Eagerly builds the initial [`Searcher`] from a fresh snapshot and sets the
    /// reload interval to 1 second.
    ///
    /// # Errors
    /// Propagates any error from building the initial searcher.
    pub async fn from_segment_manager(
        schema: Arc<Schema>,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let reader = Self::create_reader(
            &schema,
            &segment_manager,
            &trained_centroids,
            &trained_codebooks,
            term_cache_blocks,
        )
        .await?;

        Ok(Self {
            schema,
            segment_manager,
            searcher: RwLock::new(Arc::new(reader)),
            trained_centroids,
            trained_codebooks,
            term_cache_blocks,
            last_reload: RwLock::new(std::time::Instant::now()),
            reload_interval: std::time::Duration::from_secs(1),
        })
    }

    /// Build a new [`Searcher`] from a fresh snapshot taken from the segment manager.
    ///
    /// Takes the snapshot through the SegmentManager (which, per its usage here,
    /// locks metadata and tracker together) to avoid racing concurrent segment changes.
    async fn create_reader(
        schema: &Arc<Schema>,
        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Searcher<D>> {
        // Use SegmentManager's acquire_snapshot - this locks metadata and tracker together
        let snapshot = segment_manager.acquire_snapshot().await;

        Searcher::from_snapshot(
            segment_manager.directory(),
            Arc::clone(schema),
            snapshot,
            trained_centroids.clone(),
            trained_codebooks.clone(),
            term_cache_blocks,
        )
        .await
    }

    /// Set reload interval
    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
        self.reload_interval = interval;
    }

    /// Get the current searcher, reloading first if the reload interval has elapsed.
    ///
    /// The reload is *claimed* by bumping `last_reload` under the write lock before
    /// reloading. When many callers observe an expired interval at the same moment,
    /// only one of them rebuilds the searcher, and the others return the current one
    /// immediately. Without the claim, every concurrent caller would open all segments
    /// itself, and a slower reload could overwrite a newer searcher.
    ///
    /// # Errors
    /// A reload failure is returned to the caller that performed the reload. Since the
    /// slot was already claimed, later callers keep getting the previous searcher
    /// until the interval elapses again.
    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
        // Fast path: a shared read lock is enough to see that no reload is due.
        let expired = self.last_reload.read().elapsed() >= self.reload_interval;

        let should_reload = expired && {
            let mut last = self.last_reload.write();
            // Re-check: another caller may have claimed the reload between the locks.
            if last.elapsed() >= self.reload_interval {
                *last = std::time::Instant::now();
                true
            } else {
                false
            }
        };

        if should_reload {
            self.reload().await?;
        }

        Ok(Arc::clone(&*self.searcher.read()))
    }

    /// Force a reload with a fresh snapshot.
    ///
    /// Builds a new searcher and swaps it in. Searchers already handed out keep their
    /// own snapshot alive until their last `Arc` is dropped.
    ///
    /// # Errors
    /// Propagates any error from building the new searcher. The current searcher is
    /// left in place in that case.
    pub async fn reload(&self) -> Result<()> {
        let new_reader = Self::create_reader(
            &self.schema,
            &self.segment_manager,
            &self.trained_centroids,
            &self.trained_codebooks,
            self.term_cache_blocks,
        )
        .await?;

        // Swap in the new searcher. The guard is a temporary released at the end of
        // this statement, so if we held the last reference to the old searcher, its
        // destructor (which releases its snapshot) runs *after* the lock is free.
        let old = std::mem::replace(&mut *self.searcher.write(), Arc::new(new_reader));
        *self.last_reload.write() = std::time::Instant::now();
        drop(old);

        Ok(())
    }

    /// Get schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
}