Skip to main content

hermes_core/index/
reader.rs

1//! IndexReader - manages Searcher with reload policy (native only)
2//!
3//! The IndexReader periodically reloads its Searcher to pick up new segments.
4//! Uses SegmentManager as authoritative source for segment state.
5
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9use rustc_hash::FxHashMap;
10
11use crate::directories::DirectoryWriter;
12use crate::dsl::Schema;
13use crate::error::Result;
14use crate::structures::{CoarseCentroids, PQCodebook};
15
16use super::Searcher;
17
18/// IndexReader - manages Searcher with reload policy
19///
20/// The IndexReader periodically reloads its Searcher to pick up new segments.
21/// Uses SegmentManager as authoritative source for segment state (avoids race conditions).
22pub struct IndexReader<D: DirectoryWriter + 'static> {
23    /// Schema
24    schema: Arc<Schema>,
25    /// Segment manager - authoritative source for segments
26    segment_manager: Arc<crate::merge::SegmentManager<D>>,
27    /// Current searcher
28    searcher: RwLock<Arc<Searcher<D>>>,
29    /// Trained centroids
30    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
31    /// Trained codebooks
32    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
33    /// Term cache blocks
34    term_cache_blocks: usize,
35    /// Last reload check time
36    last_reload_check: RwLock<std::time::Instant>,
37    /// Reload check interval (default 1 second)
38    reload_check_interval: std::time::Duration,
39    /// Current segment IDs (to detect changes)
40    current_segment_ids: RwLock<Vec<String>>,
41}
42
43impl<D: DirectoryWriter + 'static> IndexReader<D> {
44    /// Create a new searcher from a segment manager
45    pub async fn from_segment_manager(
46        schema: Arc<Schema>,
47        segment_manager: Arc<crate::merge::SegmentManager<D>>,
48        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
49        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
50        term_cache_blocks: usize,
51    ) -> Result<Self> {
52        Self::from_segment_manager_with_reload_interval(
53            schema,
54            segment_manager,
55            trained_centroids,
56            trained_codebooks,
57            term_cache_blocks,
58            1000, // default 1 second
59        )
60        .await
61    }
62
63    /// Create a new searcher from a segment manager with custom reload interval
64    pub async fn from_segment_manager_with_reload_interval(
65        schema: Arc<Schema>,
66        segment_manager: Arc<crate::merge::SegmentManager<D>>,
67        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
68        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
69        term_cache_blocks: usize,
70        reload_interval_ms: u64,
71    ) -> Result<Self> {
72        // Get initial segment IDs
73        let initial_segment_ids = segment_manager.get_segment_ids().await;
74
75        let reader = Self::create_reader(
76            &schema,
77            &segment_manager,
78            &trained_centroids,
79            &trained_codebooks,
80            term_cache_blocks,
81        )
82        .await?;
83
84        Ok(Self {
85            schema,
86            segment_manager,
87            searcher: RwLock::new(Arc::new(reader)),
88            trained_centroids,
89            trained_codebooks,
90            term_cache_blocks,
91            last_reload_check: RwLock::new(std::time::Instant::now()),
92            reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
93            current_segment_ids: RwLock::new(initial_segment_ids),
94        })
95    }
96
97    /// Create a new reader with fresh snapshot from segment manager
98    /// This avoids race conditions by using SegmentManager's locked metadata
99    async fn create_reader(
100        schema: &Arc<Schema>,
101        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
102        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
103        trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
104        term_cache_blocks: usize,
105    ) -> Result<Searcher<D>> {
106        // Use SegmentManager's acquire_snapshot - non-blocking RwLock read
107        let snapshot = segment_manager.acquire_snapshot().await;
108
109        Searcher::from_snapshot(
110            segment_manager.directory(),
111            Arc::clone(schema),
112            snapshot,
113            trained_centroids.clone(),
114            trained_codebooks.clone(),
115            term_cache_blocks,
116        )
117        .await
118    }
119
120    /// Set reload check interval
121    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
122        self.reload_check_interval = interval;
123    }
124
125    /// Get current searcher (reloads only if segments changed)
126    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
127        // Check if we should check for segment changes
128        let should_check = {
129            let last = self.last_reload_check.read();
130            last.elapsed() >= self.reload_check_interval
131        };
132
133        if should_check {
134            // Update check time first to avoid concurrent checks
135            *self.last_reload_check.write() = std::time::Instant::now();
136
137            // Get current segment IDs from segment manager (non-blocking)
138            let new_segment_ids = self.segment_manager.get_segment_ids().await;
139
140            // Check if segments actually changed
141            let segments_changed = {
142                let current = self.current_segment_ids.read();
143                *current != new_segment_ids
144            };
145
146            if segments_changed {
147                let old_count = self.current_segment_ids.read().len();
148                let new_count = new_segment_ids.len();
149                log::info!(
150                    "[index_reload] old_count={} new_count={}",
151                    old_count,
152                    new_count
153                );
154                self.reload_with_segments(new_segment_ids).await?;
155            }
156        }
157
158        Ok(Arc::clone(&*self.searcher.read()))
159    }
160
161    /// Force reload reader with fresh snapshot
162    pub async fn reload(&self) -> Result<()> {
163        let new_segment_ids = self.segment_manager.get_segment_ids().await;
164        self.reload_with_segments(new_segment_ids).await
165    }
166
167    /// Internal reload with specific segment IDs
168    async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
169        let new_reader = Self::create_reader(
170            &self.schema,
171            &self.segment_manager,
172            &self.trained_centroids,
173            &self.trained_codebooks,
174            self.term_cache_blocks,
175        )
176        .await?;
177
178        // Swap in new searcher and update segment IDs
179        *self.searcher.write() = Arc::new(new_reader);
180        *self.current_segment_ids.write() = new_segment_ids;
181
182        Ok(())
183    }
184
185    /// Get schema
186    pub fn schema(&self) -> &Schema {
187        &self.schema
188    }
189}