1use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::{CoarseCentroids, PQCodebook};
18
/// Query-time view over a fixed list of opened segments.
///
/// Built by `open` / `from_snapshot`; offers query execution, document lookup
/// and read-only accessors for the state below.
///
/// NOTE(review): fields are dropped in declaration order, so `_snapshot` is
/// dropped before `segments`. Do not reorder without checking whether
/// `SegmentSnapshot`'s drop logic must run before or after the readers close.
pub struct Searcher<D: Directory + 'static> {
    /// Snapshot of the segment ids this searcher was built from. Never read in
    /// this file; it is held so that it lives exactly as long as the searcher.
    /// NOTE(review): presumably this keeps the segments from being reclaimed
    /// while searches are running — confirm against `SegmentSnapshot`.
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot<D>,
    /// Uses the `D` type parameter on builds without a snapshot.
    #[cfg(not(feature = "native"))]
    _phantom: std::marker::PhantomData<D>,
    /// Successfully opened segment readers, in the order their ids were given.
    /// Ids that failed to parse or open are absent.
    segments: Vec<Arc<SegmentReader>>,
    /// Schema shared with every segment reader and the query parser.
    schema: Arc<Schema>,
    /// Default fields handed to the query parser. These are the schema's declared
    /// defaults or, when none are declared, every indexed text field.
    default_fields: Vec<crate::Field>,
    /// Tokenizer registry handed to the query parser (always the default registry here).
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained coarse centroids, keyed by a `u32` (presumably a field id — confirm).
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained PQ codebooks, keyed by a `u32` (presumably a field id — confirm).
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Lazily computed statistics built over `segments`.
    global_stats: Arc<LazyGlobalStats>,
}
45
46impl<D: Directory + 'static> Searcher<D> {
47 pub async fn open(
52 directory: Arc<D>,
53 schema: Arc<Schema>,
54 segment_ids: &[String],
55 term_cache_blocks: usize,
56 ) -> Result<Self> {
57 Self::create(
58 directory,
59 schema,
60 segment_ids,
61 FxHashMap::default(),
62 FxHashMap::default(),
63 term_cache_blocks,
64 )
65 .await
66 }
67
68 #[cfg(feature = "native")]
70 pub(crate) async fn from_snapshot(
71 directory: Arc<D>,
72 schema: Arc<Schema>,
73 snapshot: SegmentSnapshot<D>,
74 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
75 trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
76 term_cache_blocks: usize,
77 ) -> Result<Self> {
78 let (segments, default_fields, global_stats) = Self::load_common(
79 &directory,
80 &schema,
81 snapshot.segment_ids(),
82 term_cache_blocks,
83 )
84 .await;
85
86 Ok(Self {
87 _snapshot: snapshot,
88 segments,
89 schema,
90 default_fields,
91 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
92 trained_centroids,
93 trained_codebooks,
94 global_stats,
95 })
96 }
97
98 async fn create(
100 directory: Arc<D>,
101 schema: Arc<Schema>,
102 segment_ids: &[String],
103 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
104 trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
105 term_cache_blocks: usize,
106 ) -> Result<Self> {
107 let (segments, default_fields, global_stats) =
108 Self::load_common(&directory, &schema, segment_ids, term_cache_blocks).await;
109
110 #[cfg(feature = "native")]
111 {
112 let tracker = Arc::new(SegmentTracker::new());
113 let snapshot = SegmentSnapshot::new(tracker, directory, segment_ids.to_vec());
114 Ok(Self {
115 _snapshot: snapshot,
116 segments,
117 schema,
118 default_fields,
119 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
120 trained_centroids,
121 trained_codebooks,
122 global_stats,
123 })
124 }
125
126 #[cfg(not(feature = "native"))]
127 {
128 let _ = directory; Ok(Self {
130 _phantom: std::marker::PhantomData,
131 segments,
132 schema,
133 default_fields,
134 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
135 trained_centroids,
136 trained_codebooks,
137 global_stats,
138 })
139 }
140 }
141
142 async fn load_common(
144 directory: &Arc<D>,
145 schema: &Arc<Schema>,
146 segment_ids: &[String],
147 term_cache_blocks: usize,
148 ) -> (
149 Vec<Arc<SegmentReader>>,
150 Vec<crate::Field>,
151 Arc<LazyGlobalStats>,
152 ) {
153 let segments = Self::load_segments(directory, schema, segment_ids, term_cache_blocks).await;
154 let default_fields = Self::build_default_fields(schema);
155 let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
156 (segments, default_fields, global_stats)
157 }
158
159 async fn load_segments(
161 directory: &Arc<D>,
162 schema: &Arc<Schema>,
163 segment_ids: &[String],
164 term_cache_blocks: usize,
165 ) -> Vec<Arc<SegmentReader>> {
166 let mut segments = Vec::new();
167 let mut doc_id_offset = 0u32;
168
169 for id_str in segment_ids {
170 let Some(segment_id) = SegmentId::from_hex(id_str) else {
171 continue;
172 };
173
174 match SegmentReader::open(
175 directory.as_ref(),
176 segment_id,
177 Arc::clone(schema),
178 doc_id_offset,
179 term_cache_blocks,
180 )
181 .await
182 {
183 Ok(reader) => {
184 doc_id_offset += reader.meta().num_docs;
185 segments.push(Arc::new(reader));
186 }
187 Err(e) => {
188 log::warn!("Failed to open segment {}: {:?}", id_str, e);
189 }
190 }
191 }
192
193 segments
194 }
195
196 fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
198 if !schema.default_fields().is_empty() {
199 schema.default_fields().to_vec()
200 } else {
201 schema
202 .fields()
203 .filter(|(_, entry)| {
204 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
205 })
206 .map(|(field, _)| field)
207 .collect()
208 }
209 }
210
211 pub fn schema(&self) -> &Schema {
213 &self.schema
214 }
215
216 pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
218 &self.segments
219 }
220
221 pub fn default_fields(&self) -> &[crate::Field] {
223 &self.default_fields
224 }
225
226 pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
228 &self.tokenizers
229 }
230
231 pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
233 &self.trained_centroids
234 }
235
236 pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
238 &self.trained_codebooks
239 }
240
241 pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
243 &self.global_stats
244 }
245
246 pub fn num_docs(&self) -> u32 {
248 self.segments.iter().map(|s| s.meta().num_docs).sum()
249 }
250
251 pub fn num_segments(&self) -> usize {
253 self.segments.len()
254 }
255
256 pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
258 let mut offset = 0u32;
259 for segment in &self.segments {
260 let segment_docs = segment.meta().num_docs;
261 if doc_id < offset + segment_docs {
262 let local_doc_id = doc_id - offset;
263 return segment.doc(local_doc_id).await;
264 }
265 offset += segment_docs;
266 }
267 Ok(None)
268 }
269
270 pub async fn search(
272 &self,
273 query: &dyn crate::query::Query,
274 limit: usize,
275 ) -> Result<Vec<crate::query::SearchResult>> {
276 self.search_with_offset(query, limit, 0).await
277 }
278
279 pub async fn search_with_offset(
281 &self,
282 query: &dyn crate::query::Query,
283 limit: usize,
284 offset: usize,
285 ) -> Result<Vec<crate::query::SearchResult>> {
286 let fetch_limit = offset + limit;
287 let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
288
289 for segment in &self.segments {
290 let segment_id = segment.meta().id;
291 let results =
292 crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
293 for result in results {
294 all_results.push((segment_id, result));
295 }
296 }
297
298 all_results.sort_by(|a, b| {
300 b.1.score
301 .partial_cmp(&a.1.score)
302 .unwrap_or(std::cmp::Ordering::Equal)
303 });
304
305 Ok(all_results
307 .into_iter()
308 .skip(offset)
309 .take(limit)
310 .map(|(_, result)| result)
311 .collect())
312 }
313
314 pub async fn query(
316 &self,
317 query_str: &str,
318 limit: usize,
319 ) -> Result<crate::query::SearchResponse> {
320 self.query_offset(query_str, limit, 0).await
321 }
322
323 pub async fn query_offset(
325 &self,
326 query_str: &str,
327 limit: usize,
328 offset: usize,
329 ) -> Result<crate::query::SearchResponse> {
330 let parser = self.query_parser();
331 let query = parser
332 .parse(query_str)
333 .map_err(crate::error::Error::Query)?;
334
335 let fetch_limit = offset + limit;
336 let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
337
338 for segment in &self.segments {
339 let segment_id = segment.meta().id;
340 let results =
341 crate::query::search_segment(segment.as_ref(), query.as_ref(), fetch_limit).await?;
342 for result in results {
343 all_results.push((segment_id, result));
344 }
345 }
346
347 all_results.sort_by(|a, b| {
348 b.1.score
349 .partial_cmp(&a.1.score)
350 .unwrap_or(std::cmp::Ordering::Equal)
351 });
352
353 let total_hits = all_results.len() as u32;
354
355 let hits: Vec<crate::query::SearchHit> = all_results
356 .into_iter()
357 .skip(offset)
358 .take(limit)
359 .map(|(segment_id, result)| crate::query::SearchHit {
360 address: crate::query::DocAddress::new(segment_id, result.doc_id),
361 score: result.score,
362 matched_fields: result.extract_ordinals(),
363 })
364 .collect();
365
366 Ok(crate::query::SearchResponse { hits, total_hits })
367 }
368
369 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
371 let query_routers = self.schema.query_routers();
372 if !query_routers.is_empty()
373 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
374 {
375 return crate::dsl::QueryLanguageParser::with_router(
376 Arc::clone(&self.schema),
377 self.default_fields.clone(),
378 Arc::clone(&self.tokenizers),
379 router,
380 );
381 }
382
383 crate::dsl::QueryLanguageParser::new(
384 Arc::clone(&self.schema),
385 self.default_fields.clone(),
386 Arc::clone(&self.tokenizers),
387 )
388 }
389
390 pub async fn get_document(
392 &self,
393 address: &crate::query::DocAddress,
394 ) -> Result<Option<crate::dsl::Document>> {
395 let segment_id = address.segment_id_u128().ok_or_else(|| {
396 crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
397 })?;
398
399 for segment in &self.segments {
400 if segment.meta().id == segment_id {
401 return segment.doc(address.doc_id).await;
402 }
403 }
404
405 Ok(None)
406 }
407}