hermes_core/index/
reader.rs1use std::sync::Arc;
7
8use parking_lot::RwLock;
9use rustc_hash::FxHashMap;
10
11use crate::directories::DirectoryWriter;
12use crate::dsl::Schema;
13use crate::error::Result;
14use crate::structures::CoarseCentroids;
15
16use super::Searcher;
17
18pub struct IndexReader<D: DirectoryWriter + 'static> {
23 schema: Arc<Schema>,
25 segment_manager: Arc<crate::merge::SegmentManager<D>>,
27 searcher: RwLock<Arc<Searcher<D>>>,
29 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
31 term_cache_blocks: usize,
33 last_reload_check: RwLock<std::time::Instant>,
35 reload_check_interval: std::time::Duration,
37 current_segment_ids: RwLock<Vec<String>>,
39}
40
41impl<D: DirectoryWriter + 'static> IndexReader<D> {
42 pub async fn from_segment_manager(
44 schema: Arc<Schema>,
45 segment_manager: Arc<crate::merge::SegmentManager<D>>,
46 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
47 term_cache_blocks: usize,
48 ) -> Result<Self> {
49 Self::from_segment_manager_with_reload_interval(
50 schema,
51 segment_manager,
52 trained_centroids,
53 term_cache_blocks,
54 1000, )
56 .await
57 }
58
59 pub async fn from_segment_manager_with_reload_interval(
61 schema: Arc<Schema>,
62 segment_manager: Arc<crate::merge::SegmentManager<D>>,
63 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
64 term_cache_blocks: usize,
65 reload_interval_ms: u64,
66 ) -> Result<Self> {
67 let initial_segment_ids = segment_manager.get_segment_ids().await;
69
70 let reader = Self::create_reader(
71 &schema,
72 &segment_manager,
73 &trained_centroids,
74 term_cache_blocks,
75 )
76 .await?;
77
78 Ok(Self {
79 schema,
80 segment_manager,
81 searcher: RwLock::new(Arc::new(reader)),
82 trained_centroids,
83 term_cache_blocks,
84 last_reload_check: RwLock::new(std::time::Instant::now()),
85 reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
86 current_segment_ids: RwLock::new(initial_segment_ids),
87 })
88 }
89
90 async fn create_reader(
93 schema: &Arc<Schema>,
94 segment_manager: &Arc<crate::merge::SegmentManager<D>>,
95 trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
96 term_cache_blocks: usize,
97 ) -> Result<Searcher<D>> {
98 let snapshot = segment_manager.acquire_snapshot().await;
100
101 Searcher::from_snapshot(
102 segment_manager.directory(),
103 Arc::clone(schema),
104 snapshot,
105 trained_centroids.clone(),
106 term_cache_blocks,
107 )
108 .await
109 }
110
111 pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
113 self.reload_check_interval = interval;
114 }
115
116 pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
118 let should_check = {
120 let last = self.last_reload_check.read();
121 last.elapsed() >= self.reload_check_interval
122 };
123
124 if should_check {
125 *self.last_reload_check.write() = std::time::Instant::now();
127
128 let new_segment_ids = self.segment_manager.get_segment_ids().await;
130
131 let segments_changed = {
133 let current = self.current_segment_ids.read();
134 *current != new_segment_ids
135 };
136
137 if segments_changed {
138 let old_count = self.current_segment_ids.read().len();
139 let new_count = new_segment_ids.len();
140 log::info!(
141 "[index_reload] old_count={} new_count={}",
142 old_count,
143 new_count
144 );
145 self.reload_with_segments(new_segment_ids).await?;
146 }
147 }
148
149 Ok(Arc::clone(&*self.searcher.read()))
150 }
151
152 pub async fn reload(&self) -> Result<()> {
154 let new_segment_ids = self.segment_manager.get_segment_ids().await;
155 self.reload_with_segments(new_segment_ids).await
156 }
157
158 async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
160 let new_reader = Self::create_reader(
161 &self.schema,
162 &self.segment_manager,
163 &self.trained_centroids,
164 self.term_cache_blocks,
165 )
166 .await?;
167
168 *self.searcher.write() = Arc::new(new_reader);
170 *self.current_segment_ids.write() = new_segment_ids;
171
172 Ok(())
173 }
174
175 pub fn schema(&self) -> &Schema {
177 &self.schema
178 }
179}