hermes_core/index/
reader.rs1use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8
9use arc_swap::ArcSwap;
10use parking_lot::RwLock;
11
12use crate::directories::DirectoryWriter;
13use crate::dsl::Schema;
14use crate::error::Result;
15
16use super::Searcher;
17
18struct SearcherState<D: DirectoryWriter + 'static> {
24 searcher: Arc<Searcher<D>>,
25 segment_ids: Vec<String>,
26}
27
28pub struct IndexReader<D: DirectoryWriter + 'static> {
29 schema: Arc<Schema>,
31 segment_manager: Arc<crate::merge::SegmentManager<D>>,
33 state: ArcSwap<SearcherState<D>>,
35 term_cache_blocks: usize,
37 last_reload_check: RwLock<std::time::Instant>,
39 reload_check_interval: std::time::Duration,
41 reloading: AtomicBool,
43}
44
45impl<D: DirectoryWriter + 'static> IndexReader<D> {
46 pub async fn from_segment_manager(
51 schema: Arc<Schema>,
52 segment_manager: Arc<crate::merge::SegmentManager<D>>,
53 term_cache_blocks: usize,
54 reload_interval_ms: u64,
55 ) -> Result<Self> {
56 let initial_segment_ids = segment_manager.get_segment_ids().await;
58
59 let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
60
61 Ok(Self {
62 schema,
63 segment_manager,
64 state: ArcSwap::from_pointee(SearcherState {
65 searcher: Arc::new(reader),
66 segment_ids: initial_segment_ids,
67 }),
68 term_cache_blocks,
69 last_reload_check: RwLock::new(std::time::Instant::now()),
70 reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
71 reloading: AtomicBool::new(false),
72 })
73 }
74
75 async fn create_reader(
79 schema: &Arc<Schema>,
80 segment_manager: &Arc<crate::merge::SegmentManager<D>>,
81 term_cache_blocks: usize,
82 ) -> Result<Searcher<D>> {
83 let trained = segment_manager.trained();
85 let trained_centroids = trained
86 .as_ref()
87 .map(|t| t.centroids.clone())
88 .unwrap_or_default();
89
90 let snapshot = segment_manager.acquire_snapshot().await;
92
93 Searcher::from_snapshot(
94 segment_manager.directory(),
95 Arc::clone(schema),
96 snapshot,
97 trained_centroids,
98 term_cache_blocks,
99 )
100 .await
101 }
102
103 pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
105 self.reload_check_interval = interval;
106 }
107
108 pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
113 let should_check = {
115 let last = self.last_reload_check.read();
116 last.elapsed() >= self.reload_check_interval
117 };
118
119 if should_check {
120 if self
122 .reloading
123 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
124 .is_ok()
125 {
126 let result = self.do_reload_check().await;
128 self.reloading.store(false, Ordering::Release);
129 result?;
130 }
131 }
133
134 Ok(Arc::clone(&self.state.load().searcher))
136 }
137
138 async fn do_reload_check(&self) -> Result<()> {
140 *self.last_reload_check.write() = std::time::Instant::now();
141
142 let new_segment_ids = self.segment_manager.get_segment_ids().await;
144
145 let segments_changed = {
147 let state = self.state.load();
148 state.segment_ids != new_segment_ids
149 };
150
151 if segments_changed {
152 let old_count = self.state.load().segment_ids.len();
153 let new_count = new_segment_ids.len();
154 log::info!(
155 "[index_reload] old_count={} new_count={}",
156 old_count,
157 new_count
158 );
159 self.reload_with_segments(new_segment_ids).await?;
160 }
161 Ok(())
162 }
163
164 pub async fn reload(&self) -> Result<()> {
171 loop {
175 if self
176 .reloading
177 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
178 .is_ok()
179 {
180 break;
181 }
182 tokio::task::yield_now().await;
183 }
184 let new_segment_ids = self.segment_manager.get_segment_ids().await;
185 let result = self.reload_with_segments(new_segment_ids).await;
186 self.reloading.store(false, Ordering::Release);
187 result
188 }
189
190 async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
193 let new_reader =
194 Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
195 .await?;
196
197 self.state.store(Arc::new(SearcherState {
199 searcher: Arc::new(new_reader),
200 segment_ids: new_segment_ids,
201 }));
202
203 Ok(())
204 }
205
206 pub fn schema(&self) -> &Schema {
208 &self.schema
209 }
210}