Skip to main content

hermes_core/index/
reader.rs

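//! IndexReader - manages a Searcher with a reload policy (native only)
//!
//! When a searcher is requested, the IndexReader checks (at most once per reload
//! interval) whether the segment set changed and, if so, rebuilds its Searcher to
//! pick up new segments.
//! Uses SegmentManager as the authoritative source for segment state.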
5
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9use rustc_hash::FxHashMap;
10
11use crate::directories::DirectoryWriter;
12use crate::dsl::Schema;
13use crate::error::Result;
14use crate::structures::CoarseCentroids;
15
16use super::Searcher;
17
18/// IndexReader - manages Searcher with reload policy
19///
20/// The IndexReader periodically reloads its Searcher to pick up new segments.
21/// Uses SegmentManager as authoritative source for segment state (avoids race conditions).
22pub struct IndexReader<D: DirectoryWriter + 'static> {
23    /// Schema
24    schema: Arc<Schema>,
25    /// Segment manager - authoritative source for segments
26    segment_manager: Arc<crate::merge::SegmentManager<D>>,
27    /// Current searcher
28    searcher: RwLock<Arc<Searcher<D>>>,
29    /// Trained centroids (injected into segment readers for IVF/ScaNN search)
30    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
31    /// Term cache blocks
32    term_cache_blocks: usize,
33    /// Last reload check time
34    last_reload_check: RwLock<std::time::Instant>,
35    /// Reload check interval (default 1 second)
36    reload_check_interval: std::time::Duration,
37    /// Current segment IDs (to detect changes)
38    current_segment_ids: RwLock<Vec<String>>,
39}
40
41impl<D: DirectoryWriter + 'static> IndexReader<D> {
42    /// Create a new searcher from a segment manager
43    pub async fn from_segment_manager(
44        schema: Arc<Schema>,
45        segment_manager: Arc<crate::merge::SegmentManager<D>>,
46        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
47        term_cache_blocks: usize,
48    ) -> Result<Self> {
49        Self::from_segment_manager_with_reload_interval(
50            schema,
51            segment_manager,
52            trained_centroids,
53            term_cache_blocks,
54            1000, // default 1 second
55        )
56        .await
57    }
58
59    /// Create a new searcher from a segment manager with custom reload interval
60    pub async fn from_segment_manager_with_reload_interval(
61        schema: Arc<Schema>,
62        segment_manager: Arc<crate::merge::SegmentManager<D>>,
63        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
64        term_cache_blocks: usize,
65        reload_interval_ms: u64,
66    ) -> Result<Self> {
67        // Get initial segment IDs
68        let initial_segment_ids = segment_manager.get_segment_ids().await;
69
70        let reader = Self::create_reader(
71            &schema,
72            &segment_manager,
73            &trained_centroids,
74            term_cache_blocks,
75        )
76        .await?;
77
78        Ok(Self {
79            schema,
80            segment_manager,
81            searcher: RwLock::new(Arc::new(reader)),
82            trained_centroids,
83            term_cache_blocks,
84            last_reload_check: RwLock::new(std::time::Instant::now()),
85            reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
86            current_segment_ids: RwLock::new(initial_segment_ids),
87        })
88    }
89
90    /// Create a new reader with fresh snapshot from segment manager
91    /// This avoids race conditions by using SegmentManager's locked metadata
92    async fn create_reader(
93        schema: &Arc<Schema>,
94        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
95        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
96        term_cache_blocks: usize,
97    ) -> Result<Searcher<D>> {
98        // Use SegmentManager's acquire_snapshot - non-blocking RwLock read
99        let snapshot = segment_manager.acquire_snapshot().await;
100
101        Searcher::from_snapshot(
102            segment_manager.directory(),
103            Arc::clone(schema),
104            snapshot,
105            trained_centroids.clone(),
106            term_cache_blocks,
107        )
108        .await
109    }
110
111    /// Set reload check interval
112    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
113        self.reload_check_interval = interval;
114    }
115
116    /// Get current searcher (reloads only if segments changed)
117    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
118        // Check if we should check for segment changes
119        let should_check = {
120            let last = self.last_reload_check.read();
121            last.elapsed() >= self.reload_check_interval
122        };
123
124        if should_check {
125            // Update check time first to avoid concurrent checks
126            *self.last_reload_check.write() = std::time::Instant::now();
127
128            // Get current segment IDs from segment manager (non-blocking)
129            let new_segment_ids = self.segment_manager.get_segment_ids().await;
130
131            // Check if segments actually changed
132            let segments_changed = {
133                let current = self.current_segment_ids.read();
134                *current != new_segment_ids
135            };
136
137            if segments_changed {
138                let old_count = self.current_segment_ids.read().len();
139                let new_count = new_segment_ids.len();
140                log::info!(
141                    "[index_reload] old_count={} new_count={}",
142                    old_count,
143                    new_count
144                );
145                self.reload_with_segments(new_segment_ids).await?;
146            }
147        }
148
149        Ok(Arc::clone(&*self.searcher.read()))
150    }
151
152    /// Force reload reader with fresh snapshot
153    pub async fn reload(&self) -> Result<()> {
154        let new_segment_ids = self.segment_manager.get_segment_ids().await;
155        self.reload_with_segments(new_segment_ids).await
156    }
157
158    /// Internal reload with specific segment IDs
159    async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
160        let new_reader = Self::create_reader(
161            &self.schema,
162            &self.segment_manager,
163            &self.trained_centroids,
164            self.term_cache_blocks,
165        )
166        .await?;
167
168        // Swap in new searcher and update segment IDs
169        *self.searcher.write() = Arc::new(new_reader);
170        *self.current_segment_ids.write() = new_segment_ids;
171
172        Ok(())
173    }
174
175    /// Get schema
176    pub fn schema(&self) -> &Schema {
177        &self.schema
178    }
179}