use std::sync::Arc;

use parking_lot::RwLock;
use rustc_hash::FxHashMap;

use crate::dsl::Schema;
use crate::error::Result;
use crate::segment::{SegmentId, SegmentReader, SegmentSnapshot};
use crate::structures::{CoarseCentroids, PQCodebook};

#[cfg(feature = "native")]
use crate::directories::DirectoryWriter;

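/// Read-only search view over the segments captured in a [`SegmentSnapshot`].
///
/// The snapshot is held for the searcher's entire lifetime, so the set of
/// segments it reads from stays fixed while the index continues to write and
/// merge. Queries fan out to every open segment and the per-segment hits are
/// merged by score.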
#[cfg(feature = "native")]
pub struct Searcher<D: DirectoryWriter + 'static> {
    /// Snapshot this searcher was built from, held for its lifetime.
    _snapshot: SegmentSnapshot<D>,
    /// One open reader per segment in the snapshot.
    segments: Vec<Arc<SegmentReader>>,
    /// Schema shared with the rest of the index.
    schema: Arc<Schema>,
    /// Fields searched when a query does not name a field.
    default_fields: Vec<crate::Field>,
    /// Tokenizer registry used at query time.
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained coarse centroids passed in at construction.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained product-quantization codebooks passed in at construction.
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
}

#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> Searcher<D> {
    /// Builds a searcher by opening a reader for every segment in `snapshot`.
    ///
    /// Segments with an unparsable id are skipped; segments that fail to open
    /// are skipped with a warning instead of failing the whole searcher.
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot<D>,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in snapshot.segment_ids() {
            let Some(segment_id) = SegmentId::from_hex(id_str) else {
                continue;
            };

            match SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(&schema),
                doc_id_offset,
                term_cache_blocks,
            )
            .await
            {
                Ok(reader) => {
                    // Segments are stacked in the global doc id space: each
                    // reader starts where the previous one ended.
                    doc_id_offset += reader.meta().num_docs;
                    segments.push(Arc::new(reader));
                }
                Err(e) => {
                    log::warn!("Failed to open segment {}: {:?}", id_str, e);
                }
            }
        }

        // Use the schema's configured default fields; if none are configured,
        // fall back to every indexed text field.
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            _snapshot: snapshot,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            trained_codebooks,
        })
    }

    /// Returns the index schema.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the open segment readers.
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    /// Returns the fields searched when a query does not name a field.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Returns the tokenizer registry.
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    /// Returns the trained coarse centroids.
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Returns the trained product-quantization codebooks.
    pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
        &self.trained_codebooks
    }

    /// Total number of documents across all segments.
    pub fn num_docs(&self) -> u32 {
        self.segments.iter().map(|s| s.meta().num_docs).sum()
    }

    /// Number of open segments.
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

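    /// Fetches a stored document by its global doc id.
    ///
    /// Global ids are assigned by stacking segments in order, so the id is
    /// translated to a segment-local id before the lookup. Returns `Ok(None)`
    /// when the id is past the end of the last segment.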
    pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        let mut offset = 0u32;
        for segment in &self.segments {
            let segment_docs = segment.meta().num_docs;
            if doc_id < offset + segment_docs {
                let local_doc_id = doc_id - offset;
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }
        Ok(None)
    }

    /// Runs `query` and returns the top `limit` results across all segments.
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        self.search_with_offset(query, limit, 0).await
    }

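    /// Like [`search`](Self::search), but skips the first `offset` results of
    /// the merged, score-sorted list, which is the building block for
    /// pagination. A minimal sketch (the concrete query value is assumed to
    /// implement [`crate::query::Query`]):
    ///
    /// ```ignore
    /// // Second page of 10 hits: skip the first 10, keep the next 10.
    /// let page2 = searcher.search_with_offset(&query, 10, 10).await?;
    /// ```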
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        // Every segment must return enough hits to cover the requested page
        // after merging, so ask each one for `offset + limit` results.
        let fetch_limit = offset + limit;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();

        for segment in &self.segments {
            let segment_id = segment.meta().id;
            let results =
                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
            for result in results {
                all_results.push((segment_id, result));
            }
        }

        // Merge across segments: highest score first.
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        Ok(all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(_, result)| result)
            .collect())
    }
}

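/// Handle for obtaining up-to-date [`Searcher`]s over a live index.
///
/// The reader caches one `Searcher` and rebuilds it from a fresh segment
/// snapshot at most once per `reload_interval`, or on an explicit
/// [`reload`](Self::reload). A minimal usage sketch, assuming a `reader` has
/// already been constructed via
/// [`from_segment_manager`](Self::from_segment_manager) and `query`
/// implements [`crate::query::Query`]:
///
/// ```ignore
/// let searcher = reader.searcher().await?; // reloads if the interval elapsed
/// let hits = searcher.search(&query, 10).await?;
/// ```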
#[cfg(feature = "native")]
pub struct IndexReader<D: DirectoryWriter + 'static> {
    /// Schema shared with the index.
    schema: Arc<Schema>,
    /// Segment manager used to acquire fresh snapshots.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Currently cached searcher, swapped atomically on reload.
    searcher: RwLock<Arc<Searcher<D>>>,
    /// Trained coarse centroids handed to every new searcher.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained product-quantization codebooks handed to every new searcher.
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Term-cache size (in blocks) used when opening segment readers.
    term_cache_blocks: usize,
    /// When the cached searcher was last rebuilt.
    last_reload: RwLock<std::time::Instant>,
    /// Minimum time between automatic reloads in [`searcher`](Self::searcher).
    reload_interval: std::time::Duration,
}

#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> IndexReader<D> {
    /// Creates a reader and builds its initial searcher from the segment
    /// manager's current snapshot. The automatic reload interval defaults to
    /// one second.
    pub async fn from_segment_manager(
        schema: Arc<Schema>,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let reader = Self::create_reader(
            &schema,
            &segment_manager,
            &trained_centroids,
            &trained_codebooks,
            term_cache_blocks,
        )
        .await?;

        Ok(Self {
            schema,
            segment_manager,
            searcher: RwLock::new(Arc::new(reader)),
            trained_centroids,
            trained_codebooks,
            term_cache_blocks,
            last_reload: RwLock::new(std::time::Instant::now()),
            reload_interval: std::time::Duration::from_secs(1),
        })
    }

    /// Acquires a fresh snapshot from the segment manager and builds a new
    /// [`Searcher`] over it.
    async fn create_reader(
        schema: &Arc<Schema>,
        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        trained_codebooks: &FxHashMap<u32, Arc<PQCodebook>>,
        term_cache_blocks: usize,
    ) -> Result<Searcher<D>> {
        let snapshot = segment_manager.acquire_snapshot().await;

        Searcher::from_snapshot(
            segment_manager.directory(),
            Arc::clone(schema),
            snapshot,
            trained_centroids.clone(),
            trained_codebooks.clone(),
            term_cache_blocks,
        )
        .await
    }

    /// Sets how often [`searcher`](Self::searcher) automatically refreshes
    /// its cached searcher.
    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
        self.reload_interval = interval;
    }

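    /// Returns the cached searcher, reloading it first when more than
    /// `reload_interval` has elapsed since the last reload. The elapsed-time
    /// check takes a short read lock; the reload itself builds the new
    /// searcher from a fresh snapshot before swapping it in.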
    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
        let should_reload = {
            let last = self.last_reload.read();
            last.elapsed() >= self.reload_interval
        };

        if should_reload {
            self.reload().await?;
        }

        Ok(Arc::clone(&*self.searcher.read()))
    }

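    /// Rebuilds the cached searcher from the segment manager's current
    /// snapshot and resets the reload timer. Called automatically by
    /// [`searcher`](Self::searcher); call it directly to pick up newly
    /// committed segments immediately.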
    pub async fn reload(&self) -> Result<()> {
        let new_reader = Self::create_reader(
            &self.schema,
            &self.segment_manager,
            &self.trained_centroids,
            &self.trained_codebooks,
            self.term_cache_blocks,
        )
        .await?;

        *self.searcher.write() = Arc::new(new_reader);
        *self.last_reload.write() = std::time::Instant::now();

        Ok(())
    }

    /// Returns the index schema.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
}