// hermes_core/index/reader.rs
use std::sync::Arc;
use std::time::{Duration, Instant};

use parking_lot::RwLock;
use rustc_hash::FxHashMap;

use crate::directories::DirectoryWriter;
use crate::dsl::Schema;
use crate::error::Result;
use crate::structures::CoarseCentroids;

use super::Searcher;

18pub struct IndexReader<D: DirectoryWriter + 'static> {
23 schema: Arc<Schema>,
25 segment_manager: Arc<crate::merge::SegmentManager<D>>,
27 searcher: RwLock<Arc<Searcher<D>>>,
29 term_cache_blocks: usize,
31 last_reload_check: RwLock<std::time::Instant>,
33 reload_check_interval: std::time::Duration,
35 current_segment_ids: RwLock<Vec<String>>,
37}
38
39impl<D: DirectoryWriter + 'static> IndexReader<D> {
40 pub async fn from_segment_manager(
42 schema: Arc<Schema>,
43 segment_manager: Arc<crate::merge::SegmentManager<D>>,
44 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
45 term_cache_blocks: usize,
46 ) -> Result<Self> {
47 let _ = trained_centroids; Self::from_segment_manager_with_reload_interval(
49 schema,
50 segment_manager,
51 term_cache_blocks,
52 1000, )
54 .await
55 }
56
57 pub async fn from_segment_manager_with_reload_interval(
59 schema: Arc<Schema>,
60 segment_manager: Arc<crate::merge::SegmentManager<D>>,
61 term_cache_blocks: usize,
62 reload_interval_ms: u64,
63 ) -> Result<Self> {
64 let initial_segment_ids = segment_manager.get_segment_ids().await;
66
67 let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
68
69 Ok(Self {
70 schema,
71 segment_manager,
72 searcher: RwLock::new(Arc::new(reader)),
73 term_cache_blocks,
74 last_reload_check: RwLock::new(std::time::Instant::now()),
75 reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
76 current_segment_ids: RwLock::new(initial_segment_ids),
77 })
78 }
79
80 async fn create_reader(
85 schema: &Arc<Schema>,
86 segment_manager: &Arc<crate::merge::SegmentManager<D>>,
87 term_cache_blocks: usize,
88 ) -> Result<Searcher<D>> {
89 let trained = {
91 let metadata_arc = segment_manager.metadata();
92 let meta = metadata_arc.read().await;
93 meta.load_trained_structures(segment_manager.directory().as_ref())
94 .await
95 };
96 let trained_centroids = trained
97 .as_ref()
98 .map(|t| t.centroids.clone())
99 .unwrap_or_default();
100
101 let snapshot = segment_manager.acquire_snapshot().await;
103
104 Searcher::from_snapshot(
105 segment_manager.directory(),
106 Arc::clone(schema),
107 snapshot,
108 trained_centroids,
109 term_cache_blocks,
110 )
111 .await
112 }
113
114 pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
116 self.reload_check_interval = interval;
117 }
118
119 pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
121 let should_check = {
123 let last = self.last_reload_check.read();
124 last.elapsed() >= self.reload_check_interval
125 };
126
127 if should_check {
128 *self.last_reload_check.write() = std::time::Instant::now();
130
131 let new_segment_ids = self.segment_manager.get_segment_ids().await;
133
134 let segments_changed = {
136 let current = self.current_segment_ids.read();
137 *current != new_segment_ids
138 };
139
140 if segments_changed {
141 let old_count = self.current_segment_ids.read().len();
142 let new_count = new_segment_ids.len();
143 log::info!(
144 "[index_reload] old_count={} new_count={}",
145 old_count,
146 new_count
147 );
148 self.reload_with_segments(new_segment_ids).await?;
149 }
150 }
151
152 Ok(Arc::clone(&*self.searcher.read()))
153 }
154
155 pub async fn reload(&self) -> Result<()> {
157 let new_segment_ids = self.segment_manager.get_segment_ids().await;
158 self.reload_with_segments(new_segment_ids).await
159 }
160
161 async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
163 let new_reader =
164 Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
165 .await?;
166
167 *self.searcher.write() = Arc::new(new_reader);
169 *self.current_segment_ids.write() = new_segment_ids;
170
171 Ok(())
172 }
173
174 pub fn schema(&self) -> &Schema {
176 &self.schema
177 }
178}