hermes_core/index/
reader.rs1use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use crate::directories::DirectoryWriter;
11use crate::dsl::Schema;
12use crate::error::Result;
13
14use super::Searcher;
15
16pub struct IndexReader<D: DirectoryWriter + 'static> {
21 schema: Arc<Schema>,
23 segment_manager: Arc<crate::merge::SegmentManager<D>>,
25 searcher: RwLock<Arc<Searcher<D>>>,
27 term_cache_blocks: usize,
29 last_reload_check: RwLock<std::time::Instant>,
31 reload_check_interval: std::time::Duration,
33 current_segment_ids: RwLock<Vec<String>>,
35}
36
37impl<D: DirectoryWriter + 'static> IndexReader<D> {
38 pub async fn from_segment_manager(
43 schema: Arc<Schema>,
44 segment_manager: Arc<crate::merge::SegmentManager<D>>,
45 term_cache_blocks: usize,
46 reload_interval_ms: u64,
47 ) -> Result<Self> {
48 let initial_segment_ids = segment_manager.get_segment_ids().await;
50
51 let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
52
53 Ok(Self {
54 schema,
55 segment_manager,
56 searcher: RwLock::new(Arc::new(reader)),
57 term_cache_blocks,
58 last_reload_check: RwLock::new(std::time::Instant::now()),
59 reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
60 current_segment_ids: RwLock::new(initial_segment_ids),
61 })
62 }
63
64 async fn create_reader(
69 schema: &Arc<Schema>,
70 segment_manager: &Arc<crate::merge::SegmentManager<D>>,
71 term_cache_blocks: usize,
72 ) -> Result<Searcher<D>> {
73 let trained = {
75 let metadata_arc = segment_manager.metadata();
76 let meta = metadata_arc.read().await;
77 meta.load_trained_structures(segment_manager.directory().as_ref())
78 .await
79 };
80 let trained_centroids = trained
81 .as_ref()
82 .map(|t| t.centroids.clone())
83 .unwrap_or_default();
84
85 let snapshot = segment_manager.acquire_snapshot().await;
87
88 Searcher::from_snapshot(
89 segment_manager.directory(),
90 Arc::clone(schema),
91 snapshot,
92 trained_centroids,
93 term_cache_blocks,
94 )
95 .await
96 }
97
98 pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
100 self.reload_check_interval = interval;
101 }
102
103 pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
105 let should_check = {
107 let last = self.last_reload_check.read();
108 last.elapsed() >= self.reload_check_interval
109 };
110
111 if should_check {
112 *self.last_reload_check.write() = std::time::Instant::now();
114
115 let new_segment_ids = self.segment_manager.get_segment_ids().await;
117
118 let segments_changed = {
120 let current = self.current_segment_ids.read();
121 *current != new_segment_ids
122 };
123
124 if segments_changed {
125 let old_count = self.current_segment_ids.read().len();
126 let new_count = new_segment_ids.len();
127 log::info!(
128 "[index_reload] old_count={} new_count={}",
129 old_count,
130 new_count
131 );
132 self.reload_with_segments(new_segment_ids).await?;
133 }
134 }
135
136 Ok(Arc::clone(&*self.searcher.read()))
137 }
138
139 pub async fn reload(&self) -> Result<()> {
141 let new_segment_ids = self.segment_manager.get_segment_ids().await;
142 self.reload_with_segments(new_segment_ids).await
143 }
144
145 async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
147 let new_reader =
148 Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
149 .await?;
150
151 *self.searcher.write() = Arc::new(new_reader);
153 *self.current_segment_ids.write() = new_segment_ids;
154
155 Ok(())
156 }
157
158 pub fn schema(&self) -> &Schema {
160 &self.schema
161 }
162}