1use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
19pub struct Searcher<D: Directory + 'static> {
24 #[cfg(feature = "native")]
26 _snapshot: SegmentSnapshot<D>,
27 #[cfg(not(feature = "native"))]
29 _phantom: std::marker::PhantomData<D>,
30 segments: Vec<Arc<SegmentReader>>,
32 schema: Arc<Schema>,
34 default_fields: Vec<crate::Field>,
36 tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
38 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
40 global_stats: Arc<LazyGlobalStats>,
42}
43
44impl<D: Directory + 'static> Searcher<D> {
45 pub async fn open(
50 directory: Arc<D>,
51 schema: Arc<Schema>,
52 segment_ids: &[String],
53 term_cache_blocks: usize,
54 ) -> Result<Self> {
55 Self::create(
56 directory,
57 schema,
58 segment_ids,
59 FxHashMap::default(),
60 term_cache_blocks,
61 )
62 .await
63 }
64
/// Builds a searcher over the segments referenced by an existing snapshot.
///
/// The snapshot's segment IDs drive loading, and the snapshot itself is
/// then stored inside the returned searcher. Only compiled with the
/// `native` feature.
///
/// # Parameters
/// - `directory`: storage the segments are read from.
/// - `schema`: schema shared by all segments.
/// - `snapshot`: source of the segment IDs; moved into the searcher.
/// - `trained_centroids`: centroids handed to every loaded segment reader
///   (when non-empty) and kept on the searcher.
/// - `term_cache_blocks`: forwarded unchanged to each segment reader.
#[cfg(feature = "native")]
pub(crate) async fn from_snapshot(
    directory: Arc<D>,
    schema: Arc<Schema>,
    snapshot: SegmentSnapshot<D>,
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    term_cache_blocks: usize,
) -> Result<Self> {
    let loaded = Self::load_common(
        &directory,
        &schema,
        snapshot.segment_ids(),
        &trained_centroids,
        term_cache_blocks,
    )
    .await;
    let (segments, default_fields, global_stats) = loaded;

    let tokenizers = Arc::new(crate::tokenizer::TokenizerRegistry::default());
    let searcher = Self {
        _snapshot: snapshot,
        segments,
        schema,
        default_fields,
        tokenizers,
        trained_centroids,
        global_stats,
    };
    Ok(searcher)
}
93
94 async fn create(
96 directory: Arc<D>,
97 schema: Arc<Schema>,
98 segment_ids: &[String],
99 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
100 term_cache_blocks: usize,
101 ) -> Result<Self> {
102 let (segments, default_fields, global_stats) = Self::load_common(
103 &directory,
104 &schema,
105 segment_ids,
106 &trained_centroids,
107 term_cache_blocks,
108 )
109 .await;
110
111 #[cfg(feature = "native")]
112 {
113 let tracker = Arc::new(SegmentTracker::new());
114 let snapshot = SegmentSnapshot::new(tracker, segment_ids.to_vec());
115 Ok(Self {
116 _snapshot: snapshot,
117 segments,
118 schema,
119 default_fields,
120 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
121 trained_centroids,
122 global_stats,
123 })
124 }
125
126 #[cfg(not(feature = "native"))]
127 {
128 let _ = directory; Ok(Self {
130 _phantom: std::marker::PhantomData,
131 segments,
132 schema,
133 default_fields,
134 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
135 trained_centroids,
136 global_stats,
137 })
138 }
139 }
140
141 async fn load_common(
143 directory: &Arc<D>,
144 schema: &Arc<Schema>,
145 segment_ids: &[String],
146 trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
147 term_cache_blocks: usize,
148 ) -> (
149 Vec<Arc<SegmentReader>>,
150 Vec<crate::Field>,
151 Arc<LazyGlobalStats>,
152 ) {
153 let segments = Self::load_segments(
154 directory,
155 schema,
156 segment_ids,
157 trained_centroids,
158 term_cache_blocks,
159 )
160 .await;
161 let default_fields = Self::build_default_fields(schema);
162 let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
163 (segments, default_fields, global_stats)
164 }
165
166 async fn load_segments(
168 directory: &Arc<D>,
169 schema: &Arc<Schema>,
170 segment_ids: &[String],
171 trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
172 term_cache_blocks: usize,
173 ) -> Vec<Arc<SegmentReader>> {
174 let valid_segments: Vec<(usize, SegmentId)> = segment_ids
176 .iter()
177 .enumerate()
178 .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
179 .collect();
180
181 let futures: Vec<_> =
183 valid_segments
184 .iter()
185 .map(|(_, segment_id)| {
186 let dir = Arc::clone(directory);
187 let sch = Arc::clone(schema);
188 let sid = *segment_id;
189 async move {
190 SegmentReader::open(dir.as_ref(), sid, sch, 0, term_cache_blocks).await
191 }
192 })
193 .collect();
194
195 let results = futures::future::join_all(futures).await;
196
197 let mut loaded: Vec<(usize, SegmentReader)> = valid_segments
199 .into_iter()
200 .zip(results)
201 .filter_map(|((idx, _), result)| match result {
202 Ok(reader) => Some((idx, reader)),
203 Err(e) => {
204 log::warn!("Failed to open segment: {:?}", e);
205 None
206 }
207 })
208 .collect();
209
210 loaded.sort_by_key(|(idx, _)| *idx);
212
213 let mut doc_id_offset = 0u32;
215 let mut segments = Vec::with_capacity(loaded.len());
216 for (_, mut reader) in loaded {
217 reader.set_doc_id_offset(doc_id_offset);
218 doc_id_offset += reader.meta().num_docs;
219 if !trained_centroids.is_empty() {
221 reader.set_coarse_centroids(trained_centroids.clone());
222 }
223 segments.push(Arc::new(reader));
224 }
225
226 let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
228 let total_sparse_mem: usize = segments
229 .iter()
230 .flat_map(|s| s.sparse_indexes().values())
231 .map(|idx| idx.num_dimensions() * 12)
232 .sum();
233 log::info!(
234 "[searcher] loaded {} segments: total_docs={}, sparse_index_mem={:.2} MB",
235 segments.len(),
236 total_docs,
237 total_sparse_mem as f64 / (1024.0 * 1024.0)
238 );
239
240 segments
241 }
242
243 fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
245 if !schema.default_fields().is_empty() {
246 schema.default_fields().to_vec()
247 } else {
248 schema
249 .fields()
250 .filter(|(_, entry)| {
251 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
252 })
253 .map(|(field, _)| field)
254 .collect()
255 }
256 }
257
258 pub fn schema(&self) -> &Schema {
260 &self.schema
261 }
262
263 pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
265 &self.segments
266 }
267
268 pub fn default_fields(&self) -> &[crate::Field] {
270 &self.default_fields
271 }
272
273 pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
275 &self.tokenizers
276 }
277
278 pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
280 &self.trained_centroids
281 }
282
283 pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
285 &self.global_stats
286 }
287
288 pub fn num_docs(&self) -> u32 {
290 self.segments.iter().map(|s| s.meta().num_docs).sum()
291 }
292
293 pub fn num_segments(&self) -> usize {
295 self.segments.len()
296 }
297
298 pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
300 let mut offset = 0u32;
301 for segment in &self.segments {
302 let segment_docs = segment.meta().num_docs;
303 if doc_id < offset + segment_docs {
304 let local_doc_id = doc_id - offset;
305 return segment.doc(local_doc_id).await;
306 }
307 offset += segment_docs;
308 }
309 Ok(None)
310 }
311
312 pub async fn search(
314 &self,
315 query: &dyn crate::query::Query,
316 limit: usize,
317 ) -> Result<Vec<crate::query::SearchResult>> {
318 let (results, _) = self.search_with_count(query, limit).await?;
319 Ok(results)
320 }
321
322 pub async fn search_with_count(
325 &self,
326 query: &dyn crate::query::Query,
327 limit: usize,
328 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
329 self.search_with_offset_and_count(query, limit, 0).await
330 }
331
332 pub async fn search_with_offset(
334 &self,
335 query: &dyn crate::query::Query,
336 limit: usize,
337 offset: usize,
338 ) -> Result<Vec<crate::query::SearchResult>> {
339 let (results, _) = self
340 .search_with_offset_and_count(query, limit, offset)
341 .await?;
342 Ok(results)
343 }
344
345 pub async fn search_with_offset_and_count(
347 &self,
348 query: &dyn crate::query::Query,
349 limit: usize,
350 offset: usize,
351 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
352 self.search_internal(query, limit, offset, false).await
353 }
354
355 pub async fn search_with_positions(
359 &self,
360 query: &dyn crate::query::Query,
361 limit: usize,
362 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
363 self.search_internal(query, limit, 0, true).await
364 }
365
366 async fn search_internal(
368 &self,
369 query: &dyn crate::query::Query,
370 limit: usize,
371 offset: usize,
372 collect_positions: bool,
373 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
374 let fetch_limit = offset + limit;
375
376 let futures: Vec<_> = self
377 .segments
378 .iter()
379 .map(|segment| {
380 let sid = segment.meta().id;
381 async move {
382 let (results, segment_seen) = if collect_positions {
383 crate::query::search_segment_with_positions_and_count(
384 segment.as_ref(),
385 query,
386 fetch_limit,
387 )
388 .await?
389 } else {
390 crate::query::search_segment_with_count(
391 segment.as_ref(),
392 query,
393 fetch_limit,
394 )
395 .await?
396 };
397 Ok::<_, crate::error::Error>((
398 results
399 .into_iter()
400 .map(move |r| (sid, r))
401 .collect::<Vec<_>>(),
402 segment_seen,
403 ))
404 }
405 })
406 .collect();
407
408 let batches = futures::future::try_join_all(futures).await?;
409 let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
410 let mut total_seen: u32 = 0;
411 for (batch, segment_seen) in batches {
412 total_seen += segment_seen;
413 all_results.extend(batch);
414 }
415
416 all_results.sort_by(|a, b| {
418 b.1.score
419 .partial_cmp(&a.1.score)
420 .unwrap_or(std::cmp::Ordering::Equal)
421 });
422
423 let results = all_results
425 .into_iter()
426 .skip(offset)
427 .take(limit)
428 .map(|(_, result)| result)
429 .collect();
430
431 Ok((results, total_seen))
432 }
433
434 pub async fn search_and_rerank(
439 &self,
440 query: &dyn crate::query::Query,
441 l1_limit: usize,
442 final_limit: usize,
443 config: &crate::query::RerankerConfig,
444 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
445 let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
446 let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
447 Ok((reranked, total_seen))
448 }
449
450 pub async fn query(
452 &self,
453 query_str: &str,
454 limit: usize,
455 ) -> Result<crate::query::SearchResponse> {
456 self.query_offset(query_str, limit, 0).await
457 }
458
459 pub async fn query_offset(
461 &self,
462 query_str: &str,
463 limit: usize,
464 offset: usize,
465 ) -> Result<crate::query::SearchResponse> {
466 let parser = self.query_parser();
467 let query = parser
468 .parse(query_str)
469 .map_err(crate::error::Error::Query)?;
470
471 let fetch_limit = offset + limit;
472 let query_ref = query.as_ref();
473
474 let futures: Vec<_> = self
475 .segments
476 .iter()
477 .map(|segment| {
478 let sid = segment.meta().id;
479 async move {
480 let results =
481 crate::query::search_segment(segment.as_ref(), query_ref, fetch_limit)
482 .await?;
483 Ok::<_, crate::error::Error>(
484 results
485 .into_iter()
486 .map(move |r| (sid, r))
487 .collect::<Vec<_>>(),
488 )
489 }
490 })
491 .collect();
492
493 let batches = futures::future::try_join_all(futures).await?;
494 let mut all_results: Vec<(u128, crate::query::SearchResult)> =
495 Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
496 for batch in batches {
497 all_results.extend(batch);
498 }
499
500 all_results.sort_by(|a, b| {
501 b.1.score
502 .partial_cmp(&a.1.score)
503 .unwrap_or(std::cmp::Ordering::Equal)
504 });
505
506 let total_hits = all_results.len() as u32;
507
508 let hits: Vec<crate::query::SearchHit> = all_results
509 .into_iter()
510 .skip(offset)
511 .take(limit)
512 .map(|(segment_id, result)| crate::query::SearchHit {
513 address: crate::query::DocAddress::new(segment_id, result.doc_id),
514 score: result.score,
515 matched_fields: result.extract_ordinals(),
516 })
517 .collect();
518
519 Ok(crate::query::SearchResponse { hits, total_hits })
520 }
521
522 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
524 let query_routers = self.schema.query_routers();
525 if !query_routers.is_empty()
526 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
527 {
528 return crate::dsl::QueryLanguageParser::with_router(
529 Arc::clone(&self.schema),
530 self.default_fields.clone(),
531 Arc::clone(&self.tokenizers),
532 router,
533 );
534 }
535
536 crate::dsl::QueryLanguageParser::new(
537 Arc::clone(&self.schema),
538 self.default_fields.clone(),
539 Arc::clone(&self.tokenizers),
540 )
541 }
542
543 pub async fn get_document(
548 &self,
549 address: &crate::query::DocAddress,
550 ) -> Result<Option<crate::dsl::Document>> {
551 let segment_id = address.segment_id_u128().ok_or_else(|| {
552 crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
553 })?;
554
555 for segment in &self.segments {
556 if segment.meta().id == segment_id {
557 let local_doc_id = address.doc_id.wrapping_sub(segment.doc_id_offset());
559 return segment.doc(local_doc_id).await;
560 }
561 }
562
563 Ok(None)
564 }
565}