hermes_core/index/
reader.rs1use std::sync::Arc;
7
8use parking_lot::RwLock;
9use rustc_hash::FxHashMap;
10
11use crate::directories::DirectoryWriter;
12use crate::dsl::Schema;
13use crate::error::Result;
14use crate::structures::{CoarseCentroids, PQCodebook};
15
16use super::Searcher;
17
18pub struct IndexReader<D: DirectoryWriter + 'static> {
23 schema: Arc<Schema>,
25 segment_manager: Arc<crate::merge::SegmentManager<D>>,
27 searcher: RwLock<Arc<Searcher<D>>>,
29 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
31 trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
33 term_cache_blocks: usize,
35 last_reload_check: RwLock<std::time::Instant>,
37 reload_check_interval: std::time::Duration,
39 current_segment_ids: RwLock<Vec<String>>,
41}
42
43impl<D: DirectoryWriter + 'static> IndexReader<D> {
44 pub async fn from_segment_manager(
46 schema: Arc<Schema>,
47 segment_manager: Arc<crate::merge::SegmentManager<D>>,
48 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
49 trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
50 term_cache_blocks: usize,
51 ) -> Result<Self> {
52 Self::from_segment_manager_with_reload_interval(
53 schema,
54 segment_manager,
55 trained_centroids,
56 trained_codebooks,
57 term_cache_blocks,
58 1000, )
60 .await
61 }
62
63 pub async fn from_segment_manager_with_reload_interval(
65 schema: Arc<Schema>,
66 segment_manager: Arc<crate::merge::SegmentManager<D>>,
67 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
68 trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
69 term_cache_blocks: usize,
70 reload_interval_ms: u64,
71 ) -> Result<Self> {
72 let initial_segment_ids = segment_manager.get_segment_ids().await;
74
75 let reader = Self::create_reader(
76 &schema,
77 &segment_manager,
78 &trained_centroids,
79 &trained_codebooks,
80 term_cache_blocks,
81 )
82 .await?;
83
84 Ok(Self {
85 schema,
86 segment_manager,
87 searcher: RwLock::new(Arc::new(reader)),
88 trained_centroids,
89 trained_codebooks,
90 term_cache_blocks,
91 last_reload_check: RwLock::new(std::time::Instant::now()),
92 reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
93 current_segment_ids: RwLock::new(initial_segment_ids),
94 })
95 }
96
97 async fn create_reader(
100 schema: &Arc<Schema>,
101 segment_manager: &Arc<crate::merge::SegmentManager<D>>,
102 trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
103 trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
104 term_cache_blocks: usize,
105 ) -> Result<Searcher<D>> {
106 let snapshot = segment_manager.acquire_snapshot().await;
108
109 Searcher::from_snapshot(
110 segment_manager.directory(),
111 Arc::clone(schema),
112 snapshot,
113 trained_centroids.clone(),
114 trained_codebooks.clone(),
115 term_cache_blocks,
116 )
117 .await
118 }
119
120 pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
122 self.reload_check_interval = interval;
123 }
124
125 pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
127 let should_check = {
129 let last = self.last_reload_check.read();
130 last.elapsed() >= self.reload_check_interval
131 };
132
133 if should_check {
134 *self.last_reload_check.write() = std::time::Instant::now();
136
137 let new_segment_ids = self.segment_manager.get_segment_ids().await;
139
140 let segments_changed = {
142 let current = self.current_segment_ids.read();
143 *current != new_segment_ids
144 };
145
146 if segments_changed {
147 let old_count = self.current_segment_ids.read().len();
148 let new_count = new_segment_ids.len();
149 log::info!(
150 "[index_reload] old_count={} new_count={}",
151 old_count,
152 new_count
153 );
154 self.reload_with_segments(new_segment_ids).await?;
155 }
156 }
157
158 Ok(Arc::clone(&*self.searcher.read()))
159 }
160
161 pub async fn reload(&self) -> Result<()> {
163 let new_segment_ids = self.segment_manager.get_segment_ids().await;
164 self.reload_with_segments(new_segment_ids).await
165 }
166
167 async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
169 let new_reader = Self::create_reader(
170 &self.schema,
171 &self.segment_manager,
172 &self.trained_centroids,
173 &self.trained_codebooks,
174 self.term_cache_blocks,
175 )
176 .await?;
177
178 *self.searcher.write() = Arc::new(new_reader);
180 *self.current_segment_ids.write() = new_segment_ids;
181
182 Ok(())
183 }
184
185 pub fn schema(&self) -> &Schema {
187 &self.schema
188 }
189}