hermes_core/index/
reader.rs1use std::sync::Arc;
7
8use parking_lot::RwLock;
9use rustc_hash::FxHashMap;
10
11use crate::directories::DirectoryWriter;
12use crate::dsl::Schema;
13use crate::error::Result;
14use crate::structures::{CoarseCentroids, PQCodebook};
15
16use super::Searcher;
17
/// Hands out [`Searcher`]s over an index's segments, rebuilding the searcher
/// when the segment manager reports a different set of segment IDs.
///
/// Change detection in [`IndexReader::searcher`] is throttled to at most one
/// check per `reload_check_interval`; [`IndexReader::reload`] forces a rebuild.
pub struct IndexReader<D: DirectoryWriter + 'static> {
    /// Schema handed (via `Arc::clone`) to every searcher this reader builds.
    schema: Arc<Schema>,
    /// Source of segment IDs (`get_segment_ids`) and snapshots
    /// (`acquire_snapshot`) used to build searchers.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// The currently installed searcher; replaced wholesale on each reload.
    searcher: RwLock<Arc<Searcher<D>>>,
    /// Trained coarse centroids, cloned into each newly built searcher.
    /// NOTE(review): the `u32` keys presumably identify vector fields — confirm.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained PQ codebooks, cloned into each newly built searcher.
    /// NOTE(review): presumably keyed the same way as `trained_centroids` — confirm.
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Passed through unchanged to `Searcher::from_snapshot`.
    /// NOTE(review): presumably a term-cache size in blocks — confirm units.
    term_cache_blocks: usize,
    /// Time of construction, or of the last segment-change check started by
    /// `searcher()`.
    last_reload_check: RwLock<std::time::Instant>,
    /// Minimum time between segment-change checks in `searcher()`; initialised
    /// to one second and changeable via `set_reload_interval`.
    reload_check_interval: std::time::Duration,
    /// Segment IDs recorded when the installed searcher was built; compared
    /// with a fresh `get_segment_ids()` result to decide whether to reload.
    current_segment_ids: RwLock<Vec<String>>,
}
42
43impl<D: DirectoryWriter + 'static> IndexReader<D> {
44 pub async fn from_segment_manager(
46 schema: Arc<Schema>,
47 segment_manager: Arc<crate::merge::SegmentManager<D>>,
48 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
49 trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
50 term_cache_blocks: usize,
51 ) -> Result<Self> {
52 let initial_segment_ids = segment_manager.get_segment_ids().await;
54
55 let reader = Self::create_reader(
56 &schema,
57 &segment_manager,
58 &trained_centroids,
59 &trained_codebooks,
60 term_cache_blocks,
61 )
62 .await?;
63
64 Ok(Self {
65 schema,
66 segment_manager,
67 searcher: RwLock::new(Arc::new(reader)),
68 trained_centroids,
69 trained_codebooks,
70 term_cache_blocks,
71 last_reload_check: RwLock::new(std::time::Instant::now()),
72 reload_check_interval: std::time::Duration::from_secs(1),
73 current_segment_ids: RwLock::new(initial_segment_ids),
74 })
75 }
76
77 async fn create_reader(
80 schema: &Arc<Schema>,
81 segment_manager: &Arc<crate::merge::SegmentManager<D>>,
82 trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
83 trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
84 term_cache_blocks: usize,
85 ) -> Result<Searcher<D>> {
86 let snapshot = segment_manager.acquire_snapshot().await;
88
89 Searcher::from_snapshot(
90 segment_manager.directory(),
91 Arc::clone(schema),
92 snapshot,
93 trained_centroids.clone(),
94 trained_codebooks.clone(),
95 term_cache_blocks,
96 )
97 .await
98 }
99
100 pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
102 self.reload_check_interval = interval;
103 }
104
105 pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
107 let should_check = {
109 let last = self.last_reload_check.read();
110 last.elapsed() >= self.reload_check_interval
111 };
112
113 if should_check {
114 *self.last_reload_check.write() = std::time::Instant::now();
116
117 let new_segment_ids = self.segment_manager.get_segment_ids().await;
119
120 let segments_changed = {
122 let current = self.current_segment_ids.read();
123 *current != new_segment_ids
124 };
125
126 if segments_changed {
127 log::debug!(
128 "Segments changed, reloading searcher ({} -> {} segments)",
129 self.current_segment_ids.read().len(),
130 new_segment_ids.len()
131 );
132 self.reload_with_segments(new_segment_ids).await?;
133 }
134 }
135
136 Ok(Arc::clone(&*self.searcher.read()))
137 }
138
139 pub async fn reload(&self) -> Result<()> {
141 let new_segment_ids = self.segment_manager.get_segment_ids().await;
142 self.reload_with_segments(new_segment_ids).await
143 }
144
145 async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
147 let new_reader = Self::create_reader(
148 &self.schema,
149 &self.segment_manager,
150 &self.trained_centroids,
151 &self.trained_codebooks,
152 self.term_cache_blocks,
153 )
154 .await?;
155
156 *self.searcher.write() = Arc::new(new_reader);
158 *self.current_segment_ids.write() = new_segment_ids;
159
160 Ok(())
161 }
162
163 pub fn schema(&self) -> &Schema {
165 &self.schema
166 }
167}