Skip to main content

hermes_core/index/
reader.rs

//! IndexReader - manages Searcher with reload policy (native only)
//!
//! The IndexReader lazily reloads its Searcher to pick up new segments: a call
//! to `searcher()` re-checks the segment list once the reload interval has elapsed.
//! Uses SegmentManager as authoritative source for segment state.
5
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8
9use parking_lot::RwLock;
10
11use crate::directories::DirectoryWriter;
12use crate::dsl::Schema;
13use crate::error::Result;
14
15use super::Searcher;
16
17/// IndexReader - manages Searcher with reload policy
18///
19/// The IndexReader periodically reloads its Searcher to pick up new segments.
20/// Uses SegmentManager as authoritative source for segment state (avoids race conditions).
21pub struct IndexReader<D: DirectoryWriter + 'static> {
22    /// Schema
23    schema: Arc<Schema>,
24    /// Segment manager - authoritative source for segments
25    segment_manager: Arc<crate::merge::SegmentManager<D>>,
26    /// Current searcher
27    searcher: RwLock<Arc<Searcher<D>>>,
28    /// Term cache blocks
29    term_cache_blocks: usize,
30    /// Last reload check time
31    last_reload_check: RwLock<std::time::Instant>,
32    /// Reload check interval (default 1 second)
33    reload_check_interval: std::time::Duration,
34    /// Current segment IDs (to detect changes)
35    current_segment_ids: RwLock<Vec<String>>,
36    /// Guard against concurrent reloads
37    reloading: AtomicBool,
38}
39
40impl<D: DirectoryWriter + 'static> IndexReader<D> {
41    /// Create a new IndexReader from a segment manager
42    ///
43    /// Centroids are loaded dynamically from metadata on each reload,
44    /// so the reader always picks up centroids trained after Index::create().
45    pub async fn from_segment_manager(
46        schema: Arc<Schema>,
47        segment_manager: Arc<crate::merge::SegmentManager<D>>,
48        term_cache_blocks: usize,
49        reload_interval_ms: u64,
50    ) -> Result<Self> {
51        // Get initial segment IDs
52        let initial_segment_ids = segment_manager.get_segment_ids().await;
53
54        let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
55
56        Ok(Self {
57            schema,
58            segment_manager,
59            searcher: RwLock::new(Arc::new(reader)),
60            term_cache_blocks,
61            last_reload_check: RwLock::new(std::time::Instant::now()),
62            reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
63            current_segment_ids: RwLock::new(initial_segment_ids),
64            reloading: AtomicBool::new(false),
65        })
66    }
67
68    /// Create a new reader with fresh snapshot from segment manager
69    ///
70    /// Reads trained centroids from SegmentManager's ArcSwap (lock-free).
71    async fn create_reader(
72        schema: &Arc<Schema>,
73        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
74        term_cache_blocks: usize,
75    ) -> Result<Searcher<D>> {
76        // Read trained centroids from ArcSwap (lock-free)
77        let trained = segment_manager.trained();
78        let trained_centroids = trained
79            .as_ref()
80            .map(|t| t.centroids.clone())
81            .unwrap_or_default();
82
83        // Use SegmentManager's acquire_snapshot - non-blocking RwLock read
84        let snapshot = segment_manager.acquire_snapshot().await;
85
86        Searcher::from_snapshot(
87            segment_manager.directory(),
88            Arc::clone(schema),
89            snapshot,
90            trained_centroids,
91            term_cache_blocks,
92        )
93        .await
94    }
95
96    /// Set reload check interval
97    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
98        self.reload_check_interval = interval;
99    }
100
101    /// Get current searcher (reloads only if segments changed)
102    ///
103    /// Uses an `AtomicBool` guard to prevent concurrent reloads: if another
104    /// caller is already reloading, subsequent callers return the current
105    /// searcher immediately instead of triggering a duplicate reload.
106    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
107        // Check if we should check for segment changes
108        let should_check = {
109            let last = self.last_reload_check.read();
110            last.elapsed() >= self.reload_check_interval
111        };
112
113        if should_check {
114            // Try to acquire the reload guard (non-blocking)
115            if self
116                .reloading
117                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
118                .is_ok()
119            {
120                // We won the race — do the reload check
121                let result = self.do_reload_check().await;
122                self.reloading.store(false, Ordering::Release);
123                result?;
124            }
125            // Otherwise another reload is in progress — just return current searcher
126        }
127
128        Ok(Arc::clone(&*self.searcher.read()))
129    }
130
131    /// Actual reload check (called under the `reloading` guard)
132    async fn do_reload_check(&self) -> Result<()> {
133        *self.last_reload_check.write() = std::time::Instant::now();
134
135        // Get current segment IDs from segment manager
136        let new_segment_ids = self.segment_manager.get_segment_ids().await;
137
138        // Check if segments actually changed
139        let segments_changed = {
140            let current = self.current_segment_ids.read();
141            *current != new_segment_ids
142        };
143
144        if segments_changed {
145            let old_count = self.current_segment_ids.read().len();
146            let new_count = new_segment_ids.len();
147            log::info!(
148                "[index_reload] old_count={} new_count={}",
149                old_count,
150                new_count
151            );
152            self.reload_with_segments(new_segment_ids).await?;
153        }
154        Ok(())
155    }
156
157    /// Force reload reader with fresh snapshot
158    pub async fn reload(&self) -> Result<()> {
159        let new_segment_ids = self.segment_manager.get_segment_ids().await;
160        self.reload_with_segments(new_segment_ids).await
161    }
162
163    /// Internal reload with specific segment IDs
164    async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
165        let new_reader =
166            Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
167                .await?;
168
169        // Swap in new searcher and update segment IDs
170        *self.searcher.write() = Arc::new(new_reader);
171        *self.current_segment_ids.write() = new_segment_ids;
172
173        Ok(())
174    }
175
176    /// Get schema
177    pub fn schema(&self) -> &Schema {
178        &self.schema
179    }
180}