Skip to main content

hermes_core/index/
reader.rs

//! IndexReader - manages a Searcher with a reload policy (native only)
//!
//! The IndexReader reloads its Searcher lazily to pick up new segments: a call
//! to `searcher()` checks for segment changes at most once per reload interval.
//! Uses SegmentManager as the authoritative source for segment state.
5
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9use rustc_hash::FxHashMap;
10
11use crate::directories::DirectoryWriter;
12use crate::dsl::Schema;
13use crate::error::Result;
14use crate::structures::CoarseCentroids;
15
16use super::Searcher;
17
18/// IndexReader - manages Searcher with reload policy
19///
20/// The IndexReader periodically reloads its Searcher to pick up new segments.
21/// Uses SegmentManager as authoritative source for segment state (avoids race conditions).
22pub struct IndexReader<D: DirectoryWriter + 'static> {
23    /// Schema
24    schema: Arc<Schema>,
25    /// Segment manager - authoritative source for segments
26    segment_manager: Arc<crate::merge::SegmentManager<D>>,
27    /// Current searcher
28    searcher: RwLock<Arc<Searcher<D>>>,
29    /// Term cache blocks
30    term_cache_blocks: usize,
31    /// Last reload check time
32    last_reload_check: RwLock<std::time::Instant>,
33    /// Reload check interval (default 1 second)
34    reload_check_interval: std::time::Duration,
35    /// Current segment IDs (to detect changes)
36    current_segment_ids: RwLock<Vec<String>>,
37}
38
39impl<D: DirectoryWriter + 'static> IndexReader<D> {
40    /// Create a new searcher from a segment manager
41    pub async fn from_segment_manager(
42        schema: Arc<Schema>,
43        segment_manager: Arc<crate::merge::SegmentManager<D>>,
44        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
45        term_cache_blocks: usize,
46    ) -> Result<Self> {
47        let _ = trained_centroids; // centroids now loaded dynamically from metadata
48        Self::from_segment_manager_with_reload_interval(
49            schema,
50            segment_manager,
51            term_cache_blocks,
52            1000, // default 1 second
53        )
54        .await
55    }
56
57    /// Create a new searcher from a segment manager with custom reload interval
58    pub async fn from_segment_manager_with_reload_interval(
59        schema: Arc<Schema>,
60        segment_manager: Arc<crate::merge::SegmentManager<D>>,
61        term_cache_blocks: usize,
62        reload_interval_ms: u64,
63    ) -> Result<Self> {
64        // Get initial segment IDs
65        let initial_segment_ids = segment_manager.get_segment_ids().await;
66
67        let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
68
69        Ok(Self {
70            schema,
71            segment_manager,
72            searcher: RwLock::new(Arc::new(reader)),
73            term_cache_blocks,
74            last_reload_check: RwLock::new(std::time::Instant::now()),
75            reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
76            current_segment_ids: RwLock::new(initial_segment_ids),
77        })
78    }
79
80    /// Create a new reader with fresh snapshot from segment manager
81    ///
82    /// Loads trained centroids dynamically from metadata on every call,
83    /// so the reader always picks up centroids trained after Index::create().
84    async fn create_reader(
85        schema: &Arc<Schema>,
86        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
87        term_cache_blocks: usize,
88    ) -> Result<Searcher<D>> {
89        // Load trained centroids from metadata (always fresh)
90        let trained = {
91            let metadata_arc = segment_manager.metadata();
92            let meta = metadata_arc.read().await;
93            meta.load_trained_structures(segment_manager.directory().as_ref())
94                .await
95        };
96        let trained_centroids = trained
97            .as_ref()
98            .map(|t| t.centroids.clone())
99            .unwrap_or_default();
100
101        // Use SegmentManager's acquire_snapshot - non-blocking RwLock read
102        let snapshot = segment_manager.acquire_snapshot().await;
103
104        Searcher::from_snapshot(
105            segment_manager.directory(),
106            Arc::clone(schema),
107            snapshot,
108            trained_centroids,
109            term_cache_blocks,
110        )
111        .await
112    }
113
114    /// Set reload check interval
115    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
116        self.reload_check_interval = interval;
117    }
118
119    /// Get current searcher (reloads only if segments changed)
120    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
121        // Check if we should check for segment changes
122        let should_check = {
123            let last = self.last_reload_check.read();
124            last.elapsed() >= self.reload_check_interval
125        };
126
127        if should_check {
128            // Update check time first to avoid concurrent checks
129            *self.last_reload_check.write() = std::time::Instant::now();
130
131            // Get current segment IDs from segment manager (non-blocking)
132            let new_segment_ids = self.segment_manager.get_segment_ids().await;
133
134            // Check if segments actually changed
135            let segments_changed = {
136                let current = self.current_segment_ids.read();
137                *current != new_segment_ids
138            };
139
140            if segments_changed {
141                let old_count = self.current_segment_ids.read().len();
142                let new_count = new_segment_ids.len();
143                log::info!(
144                    "[index_reload] old_count={} new_count={}",
145                    old_count,
146                    new_count
147                );
148                self.reload_with_segments(new_segment_ids).await?;
149            }
150        }
151
152        Ok(Arc::clone(&*self.searcher.read()))
153    }
154
155    /// Force reload reader with fresh snapshot
156    pub async fn reload(&self) -> Result<()> {
157        let new_segment_ids = self.segment_manager.get_segment_ids().await;
158        self.reload_with_segments(new_segment_ids).await
159    }
160
161    /// Internal reload with specific segment IDs
162    async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
163        let new_reader =
164            Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
165                .await?;
166
167        // Swap in new searcher and update segment IDs
168        *self.searcher.write() = Arc::new(new_reader);
169        *self.current_segment_ids.write() = new_segment_ids;
170
171        Ok(())
172    }
173
174    /// Get schema
175    pub fn schema(&self) -> &Schema {
176        &self.schema
177    }
178}