Skip to main content

hermes_core/index/
reader.rs

1//! IndexReader - manages Searcher with reload policy (native only)
2//!
3//! The IndexReader periodically reloads its Searcher to pick up new segments.
4//! Uses SegmentManager as authoritative source for segment state.
5
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use crate::directories::DirectoryWriter;
11use crate::dsl::Schema;
12use crate::error::Result;
13
14use super::Searcher;
15
16/// IndexReader - manages Searcher with reload policy
17///
18/// The IndexReader periodically reloads its Searcher to pick up new segments.
19/// Uses SegmentManager as authoritative source for segment state (avoids race conditions).
20pub struct IndexReader<D: DirectoryWriter + 'static> {
21    /// Schema
22    schema: Arc<Schema>,
23    /// Segment manager - authoritative source for segments
24    segment_manager: Arc<crate::merge::SegmentManager<D>>,
25    /// Current searcher
26    searcher: RwLock<Arc<Searcher<D>>>,
27    /// Term cache blocks
28    term_cache_blocks: usize,
29    /// Last reload check time
30    last_reload_check: RwLock<std::time::Instant>,
31    /// Reload check interval (default 1 second)
32    reload_check_interval: std::time::Duration,
33    /// Current segment IDs (to detect changes)
34    current_segment_ids: RwLock<Vec<String>>,
35}
36
37impl<D: DirectoryWriter + 'static> IndexReader<D> {
38    /// Create a new IndexReader from a segment manager
39    ///
40    /// Centroids are loaded dynamically from metadata on each reload,
41    /// so the reader always picks up centroids trained after Index::create().
42    pub async fn from_segment_manager(
43        schema: Arc<Schema>,
44        segment_manager: Arc<crate::merge::SegmentManager<D>>,
45        term_cache_blocks: usize,
46        reload_interval_ms: u64,
47    ) -> Result<Self> {
48        // Get initial segment IDs
49        let initial_segment_ids = segment_manager.get_segment_ids().await;
50
51        let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
52
53        Ok(Self {
54            schema,
55            segment_manager,
56            searcher: RwLock::new(Arc::new(reader)),
57            term_cache_blocks,
58            last_reload_check: RwLock::new(std::time::Instant::now()),
59            reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
60            current_segment_ids: RwLock::new(initial_segment_ids),
61        })
62    }
63
64    /// Create a new reader with fresh snapshot from segment manager
65    ///
66    /// Loads trained centroids dynamically from metadata on every call,
67    /// so the reader always picks up centroids trained after Index::create().
68    async fn create_reader(
69        schema: &Arc<Schema>,
70        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
71        term_cache_blocks: usize,
72    ) -> Result<Searcher<D>> {
73        // Load trained centroids from metadata (always fresh)
74        let trained = {
75            let metadata_arc = segment_manager.metadata();
76            let meta = metadata_arc.read().await;
77            meta.load_trained_structures(segment_manager.directory().as_ref())
78                .await
79        };
80        let trained_centroids = trained
81            .as_ref()
82            .map(|t| t.centroids.clone())
83            .unwrap_or_default();
84
85        // Use SegmentManager's acquire_snapshot - non-blocking RwLock read
86        let snapshot = segment_manager.acquire_snapshot().await;
87
88        Searcher::from_snapshot(
89            segment_manager.directory(),
90            Arc::clone(schema),
91            snapshot,
92            trained_centroids,
93            term_cache_blocks,
94        )
95        .await
96    }
97
98    /// Set reload check interval
99    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
100        self.reload_check_interval = interval;
101    }
102
103    /// Get current searcher (reloads only if segments changed)
104    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
105        // Check if we should check for segment changes
106        let should_check = {
107            let last = self.last_reload_check.read();
108            last.elapsed() >= self.reload_check_interval
109        };
110
111        if should_check {
112            // Update check time first to avoid concurrent checks
113            *self.last_reload_check.write() = std::time::Instant::now();
114
115            // Get current segment IDs from segment manager (non-blocking)
116            let new_segment_ids = self.segment_manager.get_segment_ids().await;
117
118            // Check if segments actually changed
119            let segments_changed = {
120                let current = self.current_segment_ids.read();
121                *current != new_segment_ids
122            };
123
124            if segments_changed {
125                let old_count = self.current_segment_ids.read().len();
126                let new_count = new_segment_ids.len();
127                log::info!(
128                    "[index_reload] old_count={} new_count={}",
129                    old_count,
130                    new_count
131                );
132                self.reload_with_segments(new_segment_ids).await?;
133            }
134        }
135
136        Ok(Arc::clone(&*self.searcher.read()))
137    }
138
139    /// Force reload reader with fresh snapshot
140    pub async fn reload(&self) -> Result<()> {
141        let new_segment_ids = self.segment_manager.get_segment_ids().await;
142        self.reload_with_segments(new_segment_ids).await
143    }
144
145    /// Internal reload with specific segment IDs
146    async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
147        let new_reader =
148            Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
149                .await?;
150
151        // Swap in new searcher and update segment IDs
152        *self.searcher.write() = Arc::new(new_reader);
153        *self.current_segment_ids.write() = new_segment_ids;
154
155        Ok(())
156    }
157
158    /// Get schema
159    pub fn schema(&self) -> &Schema {
160        &self.schema
161    }
162}