use std::sync::Arc;

use parking_lot::RwLock;
use rustc_hash::FxHashMap;

use crate::dsl::Schema;
use crate::error::Result;
use crate::query::LazyGlobalStats;
use crate::segment::{SegmentId, SegmentReader, SegmentSnapshot};
use crate::structures::{CoarseCentroids, PQCodebook};

#[cfg(feature = "native")]
use crate::directories::DirectoryWriter;

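/// A point-in-time, read-only view of the index.
///
/// A `Searcher` holds one `SegmentReader` per segment in the snapshot it was
/// built from, plus the schema, tokenizers, and any trained vector-search
/// structures (coarse centroids, PQ codebooks) needed to execute queries.
/// It is shared behind an `Arc` and replaced wholesale when
/// `IndexReader::reload` picks up a newer snapshot.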
#[cfg(feature = "native")]
pub struct Searcher<D: DirectoryWriter + 'static> {
    _snapshot: SegmentSnapshot<D>,
    segments: Vec<Arc<SegmentReader>>,
    schema: Arc<Schema>,
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    global_stats: Arc<LazyGlobalStats>,
}

#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> Searcher<D> {
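    /// Opens a `SegmentReader` for every segment id in the snapshot.
    ///
    /// Each segment is assigned a contiguous range of global doc ids via
    /// `doc_id_offset`; segments whose id fails to parse or that fail to open
    /// are skipped with a warning rather than failing the whole searcher.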
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot<D>,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in snapshot.segment_ids() {
            let Some(segment_id) = SegmentId::from_hex(id_str) else {
                continue;
            };

            match SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(&schema),
                doc_id_offset,
                term_cache_blocks,
            )
            .await
            {
                Ok(reader) => {
                    doc_id_offset += reader.meta().num_docs;
                    segments.push(Arc::new(reader));
                }
                Err(e) => {
                    log::warn!("Failed to open segment {}: {:?}", id_str, e);
                }
            }
        }

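        // Resolve the default query fields: honor the schema's configured
        // defaults if any, otherwise fall back to every indexed text field.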
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));

        Ok(Self {
            _snapshot: snapshot,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            trained_codebooks,
            global_stats,
        })
    }

    /// Returns the schema this searcher was opened with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the open segment readers, in snapshot order.
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    /// Returns the default query fields.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Returns the tokenizer registry.
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    /// Returns the trained coarse centroids.
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Returns the trained product-quantization codebooks.
    pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
        &self.trained_codebooks
    }

    /// Returns the lazily computed corpus-wide statistics.
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }

    /// Returns the total number of documents across all segments.
    pub fn num_docs(&self) -> u32 {
        self.segments.iter().map(|s| s.meta().num_docs).sum()
    }

    /// Returns the number of open segments.
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

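    /// Fetches a stored document by its global doc id.
    ///
    /// The global id is translated into a segment-local id by walking the
    /// segments in order and subtracting their document counts; returns
    /// `Ok(None)` if the id is past the end of the index.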
    pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        let mut offset = 0u32;
        for segment in &self.segments {
            let segment_docs = segment.meta().num_docs;
            if doc_id < offset + segment_docs {
                let local_doc_id = doc_id - offset;
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }
        Ok(None)
    }

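    /// Searches all segments and returns the top `limit` results by score.
    ///
    /// Equivalent to `search_with_offset(query, limit, 0)`.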
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        self.search_with_offset(query, limit, 0).await
    }

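    /// Searches all segments and returns `limit` results starting at `offset`.
    ///
    /// Each segment is asked for its top `offset + limit` hits so that the
    /// globally merged, score-sorted list is still correct after skipping the
    /// first `offset` entries.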
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let fetch_limit = offset + limit;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();

        for segment in &self.segments {
            let segment_id = segment.meta().id;
            let results =
                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
            for result in results {
                all_results.push((segment_id, result));
            }
        }

        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        Ok(all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(_, result)| result)
            .collect())
    }
}

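/// Hands out `Searcher`s over the index, reloading them when the underlying
/// segment set changes.
///
/// The current `Searcher` is cached behind an `RwLock` and rebuilt when
/// `searcher()` notices the reload interval (1 second by default) has elapsed,
/// or on demand via `reload()`.
///
/// A minimal usage sketch (assumes you already have an `IndexReader` named
/// `reader` and a value `my_query` implementing `crate::query::Query`; the
/// construction path will differ in real code):
///
/// ```ignore
/// let searcher = reader.searcher().await?;          // may reload if stale
/// let hits = searcher.search(&my_query, 10).await?; // top 10 across segments
/// for hit in hits {
///     println!("score = {}", hit.score);
/// }
/// ```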
#[cfg(feature = "native")]
pub struct IndexReader<D: DirectoryWriter + 'static> {
    schema: Arc<Schema>,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    searcher: RwLock<Arc<Searcher<D>>>,
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    term_cache_blocks: usize,
    last_reload: RwLock<std::time::Instant>,
    reload_interval: std::time::Duration,
}

#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> IndexReader<D> {
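    /// Creates a reader bound to `segment_manager`, building an initial
    /// `Searcher` from its current snapshot.
    ///
    /// The trained centroids/codebooks and the term-cache size are captured
    /// here and reused for every subsequent reload.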
    pub async fn from_segment_manager(
        schema: Arc<Schema>,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let reader = Self::create_reader(
            &schema,
            &segment_manager,
            &trained_centroids,
            &trained_codebooks,
            term_cache_blocks,
        )
        .await?;

        Ok(Self {
            schema,
            segment_manager,
            searcher: RwLock::new(Arc::new(reader)),
            trained_centroids,
            trained_codebooks,
            term_cache_blocks,
            last_reload: RwLock::new(std::time::Instant::now()),
            reload_interval: std::time::Duration::from_secs(1),
        })
    }

    /// Builds a fresh `Searcher` from the segment manager's current snapshot.
    async fn create_reader(
        schema: &Arc<Schema>,
        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Searcher<D>> {
        let snapshot = segment_manager.acquire_snapshot().await;

        Searcher::from_snapshot(
            segment_manager.directory(),
            Arc::clone(schema),
            snapshot,
            trained_centroids.clone(),
            trained_codebooks.clone(),
            term_cache_blocks,
        )
        .await
    }

    /// Sets how long a cached searcher is served before `searcher()` reloads it.
    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
        self.reload_interval = interval;
    }

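    /// Returns the current `Searcher`, reloading it first if the reload
    /// interval has elapsed since the last reload.
    ///
    /// The returned `Arc` stays valid even if a newer searcher is swapped in
    /// later, so long-running queries are not disturbed by reloads.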
    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
        let should_reload = {
            let last = self.last_reload.read();
            last.elapsed() >= self.reload_interval
        };

        if should_reload {
            self.reload().await?;
        }

        Ok(Arc::clone(&*self.searcher.read()))
    }

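    /// Rebuilds the searcher from the segment manager's latest snapshot and
    /// swaps it in, resetting the reload timer.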
    pub async fn reload(&self) -> Result<()> {
        let new_reader = Self::create_reader(
            &self.schema,
            &self.segment_manager,
            &self.trained_centroids,
            &self.trained_codebooks,
            self.term_cache_blocks,
        )
        .await?;

        *self.searcher.write() = Arc::new(new_reader);
        *self.last_reload.write() = std::time::Instant::now();

        Ok(())
    }

    /// Returns the schema this reader was opened with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
}