Skip to main content

hermes_core/index/
reader.rs

//! IndexReader - manages a Searcher with a reload policy (native only)
//!
//! The IndexReader lazily reloads its Searcher, once the reload interval has
//! elapsed, to pick up new segments.
//! Uses SegmentManager as the authoritative source for segment state.
5
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9use rustc_hash::FxHashMap;
10
11use crate::directories::DirectoryWriter;
12use crate::dsl::Schema;
13use crate::error::Result;
14use crate::structures::{CoarseCentroids, PQCodebook};
15
16use super::Searcher;
17
18/// IndexReader - manages Searcher with reload policy
19///
20/// The IndexReader periodically reloads its Searcher to pick up new segments.
21/// Uses SegmentManager as authoritative source for segment state (avoids race conditions).
22pub struct IndexReader<D: DirectoryWriter + 'static> {
23    /// Schema
24    schema: Arc<Schema>,
25    /// Segment manager - authoritative source for segments
26    segment_manager: Arc<crate::merge::SegmentManager<D>>,
27    /// Current searcher
28    searcher: RwLock<Arc<Searcher<D>>>,
29    /// Trained centroids
30    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
31    /// Trained codebooks
32    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
33    /// Term cache blocks
34    term_cache_blocks: usize,
35    /// Last reload time
36    last_reload: RwLock<std::time::Instant>,
37    /// Reload interval (default 1 second)
38    reload_interval: std::time::Duration,
39}
40
41impl<D: DirectoryWriter + 'static> IndexReader<D> {
42    /// Create a new searcher from a segment manager
43    pub async fn from_segment_manager(
44        schema: Arc<Schema>,
45        segment_manager: Arc<crate::merge::SegmentManager<D>>,
46        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
47        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
48        term_cache_blocks: usize,
49    ) -> Result<Self> {
50        let reader = Self::create_reader(
51            &schema,
52            &segment_manager,
53            &trained_centroids,
54            &trained_codebooks,
55            term_cache_blocks,
56        )
57        .await?;
58
59        Ok(Self {
60            schema,
61            segment_manager,
62            searcher: RwLock::new(Arc::new(reader)),
63            trained_centroids,
64            trained_codebooks,
65            term_cache_blocks,
66            last_reload: RwLock::new(std::time::Instant::now()),
67            reload_interval: std::time::Duration::from_secs(1),
68        })
69    }
70
71    /// Create a new reader with fresh snapshot from segment manager
72    /// This avoids race conditions by using SegmentManager's locked metadata
73    async fn create_reader(
74        schema: &Arc<Schema>,
75        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
76        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
77        trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
78        term_cache_blocks: usize,
79    ) -> Result<Searcher<D>> {
80        // Use SegmentManager's acquire_snapshot - this locks metadata and tracker together
81        let snapshot = segment_manager.acquire_snapshot().await;
82
83        Searcher::from_snapshot(
84            segment_manager.directory(),
85            Arc::clone(schema),
86            snapshot,
87            trained_centroids.clone(),
88            trained_codebooks.clone(),
89            term_cache_blocks,
90        )
91        .await
92    }
93
94    /// Set reload interval
95    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
96        self.reload_interval = interval;
97    }
98
99    /// Get current searcher (reloads if interval exceeded)
100    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
101        // Check if reload needed
102        let should_reload = {
103            let last = self.last_reload.read();
104            last.elapsed() >= self.reload_interval
105        };
106
107        if should_reload {
108            self.reload().await?;
109        }
110
111        Ok(Arc::clone(&*self.searcher.read()))
112    }
113
114    /// Force reload reader with fresh snapshot
115    pub async fn reload(&self) -> Result<()> {
116        let new_reader = Self::create_reader(
117            &self.schema,
118            &self.segment_manager,
119            &self.trained_centroids,
120            &self.trained_codebooks,
121            self.term_cache_blocks,
122        )
123        .await?;
124
125        // Swap in new searcher (old one will be dropped when last ref released)
126        *self.searcher.write() = Arc::new(new_reader);
127        *self.last_reload.write() = std::time::Instant::now();
128
129        Ok(())
130    }
131
132    /// Get schema
133    pub fn schema(&self) -> &Schema {
134        &self.schema
135    }
136}