// hermes_core/index/reader.rs
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

use parking_lot::RwLock;

use crate::directories::DirectoryWriter;
use crate::dsl::Schema;
use crate::error::Result;

use super::Searcher;

17pub struct IndexReader<D: DirectoryWriter + 'static> {
22 schema: Arc<Schema>,
24 segment_manager: Arc<crate::merge::SegmentManager<D>>,
26 searcher: RwLock<Arc<Searcher<D>>>,
28 term_cache_blocks: usize,
30 last_reload_check: RwLock<std::time::Instant>,
32 reload_check_interval: std::time::Duration,
34 current_segment_ids: RwLock<Vec<String>>,
36 reloading: AtomicBool,
38}
39
40impl<D: DirectoryWriter + 'static> IndexReader<D> {
41 pub async fn from_segment_manager(
46 schema: Arc<Schema>,
47 segment_manager: Arc<crate::merge::SegmentManager<D>>,
48 term_cache_blocks: usize,
49 reload_interval_ms: u64,
50 ) -> Result<Self> {
51 let initial_segment_ids = segment_manager.get_segment_ids().await;
53
54 let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
55
56 Ok(Self {
57 schema,
58 segment_manager,
59 searcher: RwLock::new(Arc::new(reader)),
60 term_cache_blocks,
61 last_reload_check: RwLock::new(std::time::Instant::now()),
62 reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
63 current_segment_ids: RwLock::new(initial_segment_ids),
64 reloading: AtomicBool::new(false),
65 })
66 }
67
68 async fn create_reader(
72 schema: &Arc<Schema>,
73 segment_manager: &Arc<crate::merge::SegmentManager<D>>,
74 term_cache_blocks: usize,
75 ) -> Result<Searcher<D>> {
76 let trained = segment_manager.trained();
78 let trained_centroids = trained
79 .as_ref()
80 .map(|t| t.centroids.clone())
81 .unwrap_or_default();
82
83 let snapshot = segment_manager.acquire_snapshot().await;
85
86 Searcher::from_snapshot(
87 segment_manager.directory(),
88 Arc::clone(schema),
89 snapshot,
90 trained_centroids,
91 term_cache_blocks,
92 )
93 .await
94 }
95
96 pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
98 self.reload_check_interval = interval;
99 }
100
101 pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
107 let should_check = {
109 let last = self.last_reload_check.read();
110 last.elapsed() >= self.reload_check_interval
111 };
112
113 if should_check {
114 if self
116 .reloading
117 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
118 .is_ok()
119 {
120 let result = self.do_reload_check().await;
122 self.reloading.store(false, Ordering::Release);
123 result?;
124 }
125 }
127
128 Ok(Arc::clone(&*self.searcher.read()))
129 }
130
131 async fn do_reload_check(&self) -> Result<()> {
133 *self.last_reload_check.write() = std::time::Instant::now();
134
135 let new_segment_ids = self.segment_manager.get_segment_ids().await;
137
138 let segments_changed = {
140 let current = self.current_segment_ids.read();
141 *current != new_segment_ids
142 };
143
144 if segments_changed {
145 let old_count = self.current_segment_ids.read().len();
146 let new_count = new_segment_ids.len();
147 log::info!(
148 "[index_reload] old_count={} new_count={}",
149 old_count,
150 new_count
151 );
152 self.reload_with_segments(new_segment_ids).await?;
153 }
154 Ok(())
155 }
156
157 pub async fn reload(&self) -> Result<()> {
159 let new_segment_ids = self.segment_manager.get_segment_ids().await;
160 self.reload_with_segments(new_segment_ids).await
161 }
162
163 async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
165 let new_reader =
166 Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
167 .await?;
168
169 *self.searcher.write() = Arc::new(new_reader);
171 *self.current_segment_ids.write() = new_segment_ids;
172
173 Ok(())
174 }
175
176 pub fn schema(&self) -> &Schema {
178 &self.schema
179 }
180}