Skip to main content

hermes_core/index/
reader.rs

//! IndexReader - manages Searcher with reload policy (native only)
//!
//! The IndexReader lazily reloads its Searcher to pick up new segments: at most once
//! per reload interval, `searcher()` checks whether the segment set has changed.
//! Uses SegmentManager as the authoritative source for segment state.
5
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use crate::directories::DirectoryWriter;
11use crate::dsl::Schema;
12use crate::error::Result;
13
14use super::Searcher;
15
16/// IndexReader - manages Searcher with reload policy
17///
18/// The IndexReader periodically reloads its Searcher to pick up new segments.
19/// Uses SegmentManager as authoritative source for segment state (avoids race conditions).
20pub struct IndexReader<D: DirectoryWriter + 'static> {
21    /// Schema
22    schema: Arc<Schema>,
23    /// Segment manager - authoritative source for segments
24    segment_manager: Arc<crate::merge::SegmentManager<D>>,
25    /// Current searcher
26    searcher: RwLock<Arc<Searcher<D>>>,
27    /// Term cache blocks
28    term_cache_blocks: usize,
29    /// Last reload check time
30    last_reload_check: RwLock<std::time::Instant>,
31    /// Reload check interval (default 1 second)
32    reload_check_interval: std::time::Duration,
33    /// Current segment IDs (to detect changes)
34    current_segment_ids: RwLock<Vec<String>>,
35}
36
37impl<D: DirectoryWriter + 'static> IndexReader<D> {
38    /// Create a new IndexReader from a segment manager
39    ///
40    /// Centroids are loaded dynamically from metadata on each reload,
41    /// so the reader always picks up centroids trained after Index::create().
42    pub async fn from_segment_manager(
43        schema: Arc<Schema>,
44        segment_manager: Arc<crate::merge::SegmentManager<D>>,
45        term_cache_blocks: usize,
46        reload_interval_ms: u64,
47    ) -> Result<Self> {
48        // Get initial segment IDs
49        let initial_segment_ids = segment_manager.get_segment_ids().await;
50
51        let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
52
53        Ok(Self {
54            schema,
55            segment_manager,
56            searcher: RwLock::new(Arc::new(reader)),
57            term_cache_blocks,
58            last_reload_check: RwLock::new(std::time::Instant::now()),
59            reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
60            current_segment_ids: RwLock::new(initial_segment_ids),
61        })
62    }
63
64    /// Create a new reader with fresh snapshot from segment manager
65    ///
66    /// Reads trained centroids from SegmentManager's ArcSwap (lock-free).
67    async fn create_reader(
68        schema: &Arc<Schema>,
69        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
70        term_cache_blocks: usize,
71    ) -> Result<Searcher<D>> {
72        // Read trained centroids from ArcSwap (lock-free)
73        let trained = segment_manager.trained();
74        let trained_centroids = trained
75            .as_ref()
76            .map(|t| t.centroids.clone())
77            .unwrap_or_default();
78
79        // Use SegmentManager's acquire_snapshot - non-blocking RwLock read
80        let snapshot = segment_manager.acquire_snapshot().await;
81
82        Searcher::from_snapshot(
83            segment_manager.directory(),
84            Arc::clone(schema),
85            snapshot,
86            trained_centroids,
87            term_cache_blocks,
88        )
89        .await
90    }
91
92    /// Set reload check interval
93    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
94        self.reload_check_interval = interval;
95    }
96
97    /// Get current searcher (reloads only if segments changed)
98    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
99        // Check if we should check for segment changes
100        let should_check = {
101            let last = self.last_reload_check.read();
102            last.elapsed() >= self.reload_check_interval
103        };
104
105        if should_check {
106            // Update check time first to avoid concurrent checks
107            *self.last_reload_check.write() = std::time::Instant::now();
108
109            // Get current segment IDs from segment manager (non-blocking)
110            let new_segment_ids = self.segment_manager.get_segment_ids().await;
111
112            // Check if segments actually changed
113            let segments_changed = {
114                let current = self.current_segment_ids.read();
115                *current != new_segment_ids
116            };
117
118            if segments_changed {
119                let old_count = self.current_segment_ids.read().len();
120                let new_count = new_segment_ids.len();
121                log::info!(
122                    "[index_reload] old_count={} new_count={}",
123                    old_count,
124                    new_count
125                );
126                self.reload_with_segments(new_segment_ids).await?;
127            }
128        }
129
130        Ok(Arc::clone(&*self.searcher.read()))
131    }
132
133    /// Force reload reader with fresh snapshot
134    pub async fn reload(&self) -> Result<()> {
135        let new_segment_ids = self.segment_manager.get_segment_ids().await;
136        self.reload_with_segments(new_segment_ids).await
137    }
138
139    /// Internal reload with specific segment IDs
140    async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
141        let new_reader =
142            Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
143                .await?;
144
145        // Swap in new searcher and update segment IDs
146        *self.searcher.write() = Arc::new(new_reader);
147        *self.current_segment_ids.write() = new_segment_ids;
148
149        Ok(())
150    }
151
152    /// Get schema
153    pub fn schema(&self) -> &Schema {
154        &self.schema
155    }
156}