Skip to main content

hermes_core/index/
reader.rs

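//! IndexReader - manages a Searcher with a reload policy (native only)
//!
//! The IndexReader refreshes its Searcher lazily to pick up new segments: when
//! `searcher()` is called, it compares segment IDs with the SegmentManager at
//! most once per reload interval and rebuilds the Searcher only if they differ.
//! The SegmentManager is the authoritative source for segment state.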
5
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9use rustc_hash::FxHashMap;
10
11use crate::directories::DirectoryWriter;
12use crate::dsl::Schema;
13use crate::error::Result;
14use crate::structures::{CoarseCentroids, PQCodebook};
15
16use super::Searcher;
17
18/// IndexReader - manages Searcher with reload policy
19///
20/// The IndexReader periodically reloads its Searcher to pick up new segments.
21/// Uses SegmentManager as authoritative source for segment state (avoids race conditions).
22pub struct IndexReader<D: DirectoryWriter + 'static> {
23    /// Schema
24    schema: Arc<Schema>,
25    /// Segment manager - authoritative source for segments
26    segment_manager: Arc<crate::merge::SegmentManager<D>>,
27    /// Current searcher
28    searcher: RwLock<Arc<Searcher<D>>>,
29    /// Trained centroids
30    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
31    /// Trained codebooks
32    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
33    /// Term cache blocks
34    term_cache_blocks: usize,
35    /// Last reload check time
36    last_reload_check: RwLock<std::time::Instant>,
37    /// Reload check interval (default 1 second)
38    reload_check_interval: std::time::Duration,
39    /// Current segment IDs (to detect changes)
40    current_segment_ids: RwLock<Vec<String>>,
41}
42
43impl<D: DirectoryWriter + 'static> IndexReader<D> {
44    /// Create a new searcher from a segment manager
45    pub async fn from_segment_manager(
46        schema: Arc<Schema>,
47        segment_manager: Arc<crate::merge::SegmentManager<D>>,
48        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
49        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
50        term_cache_blocks: usize,
51    ) -> Result<Self> {
52        // Get initial segment IDs
53        let initial_segment_ids = segment_manager.get_segment_ids().await;
54
55        let reader = Self::create_reader(
56            &schema,
57            &segment_manager,
58            &trained_centroids,
59            &trained_codebooks,
60            term_cache_blocks,
61        )
62        .await?;
63
64        Ok(Self {
65            schema,
66            segment_manager,
67            searcher: RwLock::new(Arc::new(reader)),
68            trained_centroids,
69            trained_codebooks,
70            term_cache_blocks,
71            last_reload_check: RwLock::new(std::time::Instant::now()),
72            reload_check_interval: std::time::Duration::from_secs(1),
73            current_segment_ids: RwLock::new(initial_segment_ids),
74        })
75    }
76
77    /// Create a new reader with fresh snapshot from segment manager
78    /// This avoids race conditions by using SegmentManager's locked metadata
79    async fn create_reader(
80        schema: &Arc<Schema>,
81        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
82        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
83        trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
84        term_cache_blocks: usize,
85    ) -> Result<Searcher<D>> {
86        // Use SegmentManager's acquire_snapshot - non-blocking RwLock read
87        let snapshot = segment_manager.acquire_snapshot().await;
88
89        Searcher::from_snapshot(
90            segment_manager.directory(),
91            Arc::clone(schema),
92            snapshot,
93            trained_centroids.clone(),
94            trained_codebooks.clone(),
95            term_cache_blocks,
96        )
97        .await
98    }
99
100    /// Set reload check interval
101    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
102        self.reload_check_interval = interval;
103    }
104
105    /// Get current searcher (reloads only if segments changed)
106    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
107        // Check if we should check for segment changes
108        let should_check = {
109            let last = self.last_reload_check.read();
110            last.elapsed() >= self.reload_check_interval
111        };
112
113        if should_check {
114            // Update check time first to avoid concurrent checks
115            *self.last_reload_check.write() = std::time::Instant::now();
116
117            // Get current segment IDs from segment manager (non-blocking)
118            let new_segment_ids = self.segment_manager.get_segment_ids().await;
119
120            // Check if segments actually changed
121            let segments_changed = {
122                let current = self.current_segment_ids.read();
123                *current != new_segment_ids
124            };
125
126            if segments_changed {
127                log::debug!(
128                    "Segments changed, reloading searcher ({} -> {} segments)",
129                    self.current_segment_ids.read().len(),
130                    new_segment_ids.len()
131                );
132                self.reload_with_segments(new_segment_ids).await?;
133            }
134        }
135
136        Ok(Arc::clone(&*self.searcher.read()))
137    }
138
139    /// Force reload reader with fresh snapshot
140    pub async fn reload(&self) -> Result<()> {
141        let new_segment_ids = self.segment_manager.get_segment_ids().await;
142        self.reload_with_segments(new_segment_ids).await
143    }
144
145    /// Internal reload with specific segment IDs
146    async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
147        let new_reader = Self::create_reader(
148            &self.schema,
149            &self.segment_manager,
150            &self.trained_centroids,
151            &self.trained_codebooks,
152            self.term_cache_blocks,
153        )
154        .await?;
155
156        // Swap in new searcher and update segment IDs
157        *self.searcher.write() = Arc::new(new_reader);
158        *self.current_segment_ids.write() = new_segment_ids;
159
160        Ok(())
161    }
162
163    /// Get schema
164    pub fn schema(&self) -> &Schema {
165        &self.schema
166    }
167}