
hermes_core/index/reader.rs

//! IndexReader - manages Searcher with reload policy (native only)
//!
//! The IndexReader periodically reloads its Searcher to pick up new segments.
//! Uses SegmentManager as authoritative source for segment state.
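//!
//! Wait-free reads rely on `arc_swap`; a self-contained sketch of the
//! load/store pattern this file builds on (independent of the crate's types):
//!
//! ```
//! use std::sync::Arc;
//! use arc_swap::ArcSwap;
//!
//! let state = ArcSwap::from_pointee(vec!["seg_a".to_string()]);
//! let snapshot = state.load(); // wait-free read
//! state.store(Arc::new(vec!["seg_a".to_string(), "seg_b".to_string()]));
//! assert_eq!(snapshot.len(), 1); // old snapshot stays valid after the swap
//! ```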

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use arc_swap::ArcSwap;
use parking_lot::RwLock;

use crate::directories::DirectoryWriter;
use crate::dsl::Schema;
use crate::error::Result;

use super::Searcher;

/// Combined searcher + segment IDs, swapped atomically via ArcSwap (wait-free reads).
struct SearcherState<D: DirectoryWriter + 'static> {
    searcher: Arc<Searcher<D>>,
    segment_ids: Vec<String>,
}

/// IndexReader - manages Searcher with reload policy
///
/// The IndexReader periodically reloads its Searcher to pick up new segments.
/// Uses SegmentManager as the authoritative source for segment state (avoids race conditions).
pub struct IndexReader<D: DirectoryWriter + 'static> {
    /// Schema
    schema: Arc<Schema>,
    /// Segment manager - authoritative source for segments
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Current searcher + segment IDs (ArcSwap for wait-free reads)
    state: ArcSwap<SearcherState<D>>,
    /// Term cache blocks
    term_cache_blocks: usize,
    /// Last reload check time
    last_reload_check: RwLock<std::time::Instant>,
    /// Reload check interval (default 1 second)
    reload_check_interval: std::time::Duration,
    /// Guard against concurrent reloads
    reloading: AtomicBool,
}

impl<D: DirectoryWriter + 'static> IndexReader<D> {
    /// Create a new IndexReader from a segment manager
    ///
    /// Centroids are loaded dynamically from metadata on each reload,
    /// so the reader always picks up centroids trained after Index::create().
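    ///
    /// A construction sketch (the `SegmentManager` is assumed to come from
    /// the index's commit path; the values shown are illustrative only):
    ///
    /// ```ignore
    /// let reader = IndexReader::from_segment_manager(
    ///     schema,
    ///     segment_manager,
    ///     1024, // term cache blocks
    ///     1000, // reload check interval in ms
    /// )
    /// .await?;
    /// ```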
    pub async fn from_segment_manager(
        schema: Arc<Schema>,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
        term_cache_blocks: usize,
        reload_interval_ms: u64,
    ) -> Result<Self> {
        // Get initial segment IDs
        let initial_segment_ids = segment_manager.get_segment_ids().await;

        let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;

        Ok(Self {
            schema,
            segment_manager,
            state: ArcSwap::from_pointee(SearcherState {
                searcher: Arc::new(reader),
                segment_ids: initial_segment_ids,
            }),
            term_cache_blocks,
            last_reload_check: RwLock::new(std::time::Instant::now()),
            reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
            reloading: AtomicBool::new(false),
        })
    }

    /// Create a new reader with fresh snapshot from segment manager
    ///
    /// Reads trained centroids from SegmentManager's ArcSwap (lock-free).
    async fn create_reader(
        schema: &Arc<Schema>,
        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
        term_cache_blocks: usize,
    ) -> Result<Searcher<D>> {
        // Read trained centroids from ArcSwap (lock-free)
        let trained = segment_manager.trained();
        let trained_centroids = trained
            .as_ref()
            .map(|t| t.centroids.clone())
            .unwrap_or_default();

        // Use SegmentManager's acquire_snapshot - non-blocking RwLock read
        let snapshot = segment_manager.acquire_snapshot().await;

        Searcher::from_snapshot(
            segment_manager.directory(),
            Arc::clone(schema),
            snapshot,
            trained_centroids,
            term_cache_blocks,
        )
        .await
    }

    /// Set reload check interval
    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
        self.reload_check_interval = interval;
    }

    /// Get current searcher (reloads only if segments changed)
    ///
    /// Wait-free read path via ArcSwap::load(). Reload checks are guarded
    /// by an AtomicBool to prevent concurrent reloads.
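    ///
    /// A minimal usage sketch (query construction elided; illustrative only):
    ///
    /// ```ignore
    /// let searcher = reader.searcher().await?;
    /// // Run queries against this snapshot; a later call may return a newer
    /// // searcher once the reload interval has elapsed.
    /// ```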
    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
        // Rate-limit reload checks to the configured interval
        let should_check = {
            let last = self.last_reload_check.read();
            last.elapsed() >= self.reload_check_interval
        };

        if should_check {
            // Try to acquire the reload guard (non-blocking)
            if self
                .reloading
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                // We won the race — do the reload check
                let result = self.do_reload_check().await;
                self.reloading.store(false, Ordering::Release);
                result?;
            }
            // Otherwise another reload is in progress — just return current searcher
        }

        // Wait-free load (no lock contention with reloads)
        Ok(Arc::clone(&self.state.load().searcher))
    }

    /// Actual reload check (called under the `reloading` guard)
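    ///
    /// The guard is the standard `AtomicBool` + `compare_exchange` pattern;
    /// a self-contained sketch:
    ///
    /// ```
    /// use std::sync::atomic::{AtomicBool, Ordering};
    ///
    /// let busy = AtomicBool::new(false);
    /// // First caller wins the guard...
    /// assert!(busy
    ///     .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
    ///     .is_ok());
    /// // ...a concurrent caller loses and skips the work.
    /// assert!(busy
    ///     .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
    ///     .is_err());
    /// busy.store(false, Ordering::Release);
    /// ```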
    async fn do_reload_check(&self) -> Result<()> {
        *self.last_reload_check.write() = std::time::Instant::now();

        // Get current segment IDs from segment manager
        let new_segment_ids = self.segment_manager.get_segment_ids().await;

        // Check if segments actually changed (single wait-free read)
        let (segments_changed, old_count) = {
            let state = self.state.load();
            (state.segment_ids != new_segment_ids, state.segment_ids.len())
        };

        if segments_changed {
            log::info!(
                "[index_reload] old_count={} new_count={}",
                old_count,
                new_segment_ids.len()
            );
            self.reload_with_segments(new_segment_ids).await?;
        }
        Ok(())
    }

    /// Force reload reader with fresh snapshot.
    ///
    /// Waits for any in-progress reload (from `searcher()`) to finish, then
    /// performs its own reload with the latest segment IDs. This guarantees
    /// the reload actually happens — unlike `searcher()` which silently skips
    /// if another reload is in progress.
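    ///
    /// A typical call site is right after a commit (the commit API is elided;
    /// sketch only):
    ///
    /// ```ignore
    /// // writer.commit().await?;  // make the new segments durable first
    /// reader.reload().await?;     // then force the reader to pick them up
    /// let searcher = reader.searcher().await?;
    /// ```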
    pub async fn reload(&self) -> Result<()> {
        // Wait for any in-progress reload to finish, then acquire the guard.
        // This is critical: a concurrent do_reload_check() may have started
        // before a commit, so its reload won't see the new segments.
        loop {
            if self
                .reloading
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                break;
            }
            tokio::task::yield_now().await;
        }
        let new_segment_ids = self.segment_manager.get_segment_ids().await;
        let result = self.reload_with_segments(new_segment_ids).await;
        self.reloading.store(false, Ordering::Release);
        result
    }

    /// Internal reload with specific segment IDs.
    /// Atomic swap via ArcSwap::store (wait-free for readers).
    async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
        let new_reader =
            Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
                .await?;

        // Atomic swap — readers see old or new state, never a torn read
        self.state.store(Arc::new(SearcherState {
            searcher: Arc::new(new_reader),
            segment_ids: new_segment_ids,
        }));

        Ok(())
    }

    /// Get schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
}