hermes_core/index/
reader.rs1use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use crate::directories::DirectoryWriter;
11use crate::dsl::Schema;
12use crate::error::Result;
13
14use super::Searcher;
15
16pub struct IndexReader<D: DirectoryWriter + 'static> {
21 schema: Arc<Schema>,
23 segment_manager: Arc<crate::merge::SegmentManager<D>>,
25 searcher: RwLock<Arc<Searcher<D>>>,
27 term_cache_blocks: usize,
29 last_reload_check: RwLock<std::time::Instant>,
31 reload_check_interval: std::time::Duration,
33 current_segment_ids: RwLock<Vec<String>>,
35}
36
37impl<D: DirectoryWriter + 'static> IndexReader<D> {
38 pub async fn from_segment_manager(
43 schema: Arc<Schema>,
44 segment_manager: Arc<crate::merge::SegmentManager<D>>,
45 term_cache_blocks: usize,
46 reload_interval_ms: u64,
47 ) -> Result<Self> {
48 let initial_segment_ids = segment_manager.get_segment_ids().await;
50
51 let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
52
53 Ok(Self {
54 schema,
55 segment_manager,
56 searcher: RwLock::new(Arc::new(reader)),
57 term_cache_blocks,
58 last_reload_check: RwLock::new(std::time::Instant::now()),
59 reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
60 current_segment_ids: RwLock::new(initial_segment_ids),
61 })
62 }
63
64 async fn create_reader(
68 schema: &Arc<Schema>,
69 segment_manager: &Arc<crate::merge::SegmentManager<D>>,
70 term_cache_blocks: usize,
71 ) -> Result<Searcher<D>> {
72 let trained = segment_manager.trained();
74 let trained_centroids = trained
75 .as_ref()
76 .map(|t| t.centroids.clone())
77 .unwrap_or_default();
78
79 let snapshot = segment_manager.acquire_snapshot().await;
81
82 Searcher::from_snapshot(
83 segment_manager.directory(),
84 Arc::clone(schema),
85 snapshot,
86 trained_centroids,
87 term_cache_blocks,
88 )
89 .await
90 }
91
92 pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
94 self.reload_check_interval = interval;
95 }
96
97 pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
99 let should_check = {
101 let last = self.last_reload_check.read();
102 last.elapsed() >= self.reload_check_interval
103 };
104
105 if should_check {
106 *self.last_reload_check.write() = std::time::Instant::now();
108
109 let new_segment_ids = self.segment_manager.get_segment_ids().await;
111
112 let segments_changed = {
114 let current = self.current_segment_ids.read();
115 *current != new_segment_ids
116 };
117
118 if segments_changed {
119 let old_count = self.current_segment_ids.read().len();
120 let new_count = new_segment_ids.len();
121 log::info!(
122 "[index_reload] old_count={} new_count={}",
123 old_count,
124 new_count
125 );
126 self.reload_with_segments(new_segment_ids).await?;
127 }
128 }
129
130 Ok(Arc::clone(&*self.searcher.read()))
131 }
132
133 pub async fn reload(&self) -> Result<()> {
135 let new_segment_ids = self.segment_manager.get_segment_ids().await;
136 self.reload_with_segments(new_segment_ids).await
137 }
138
139 async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
141 let new_reader =
142 Self::create_reader(&self.schema, &self.segment_manager, self.term_cache_blocks)
143 .await?;
144
145 *self.searcher.write() = Arc::new(new_reader);
147 *self.current_segment_ids.write() = new_segment_ids;
148
149 Ok(())
150 }
151
152 pub fn schema(&self) -> &Schema {
154 &self.schema
155 }
156}