1use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::{CoarseCentroids, PQCodebook};
18
/// Read-side handle over a fixed set of opened segments.
///
/// Bundles the segment readers with the schema, the default query fields,
/// a tokenizer registry, any trained vector-search structures, and the
/// lazily computed global statistics used by the search methods.
pub struct Searcher<D: Directory + 'static> {
    /// Snapshot supplied to `from_snapshot` or built in `create`; this type
    /// never reads it. NOTE(review): presumably held so the referenced
    /// segments stay alive while the searcher exists — confirm against
    /// `SegmentSnapshot`.
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot<D>,
    /// Keeps the `D` type parameter in use when the `native` feature is off.
    #[cfg(not(feature = "native"))]
    _phantom: std::marker::PhantomData<D>,
    /// Successfully opened segment readers, in the order their IDs were given.
    segments: Vec<Arc<SegmentReader>>,
    /// Schema shared with the segment readers and the query parser.
    schema: Arc<Schema>,
    /// Fields searched when a query names no field: the schema's configured
    /// defaults, or every indexed text field when none are configured.
    default_fields: Vec<crate::Field>,
    /// Tokenizer registry handed to the query parser; both constructors
    /// install `TokenizerRegistry::default()`.
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained coarse centroids keyed by `u32`.
    /// NOTE(review): the key is presumably a field ID — confirm with callers.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained PQ codebooks keyed by `u32` (same key assumption as above).
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Global statistics object built from the same segment list.
    global_stats: Arc<LazyGlobalStats>,
}
45
46impl<D: Directory + 'static> Searcher<D> {
47 pub async fn open(
52 directory: Arc<D>,
53 schema: Arc<Schema>,
54 segment_ids: &[String],
55 term_cache_blocks: usize,
56 ) -> Result<Self> {
57 Self::create(
58 directory,
59 schema,
60 segment_ids,
61 FxHashMap::default(),
62 FxHashMap::default(),
63 term_cache_blocks,
64 )
65 .await
66 }
67
68 #[cfg(feature = "native")]
70 pub(crate) async fn from_snapshot(
71 directory: Arc<D>,
72 schema: Arc<Schema>,
73 snapshot: SegmentSnapshot<D>,
74 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
75 trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
76 term_cache_blocks: usize,
77 ) -> Result<Self> {
78 let (segments, default_fields, global_stats) = Self::load_common(
79 &directory,
80 &schema,
81 snapshot.segment_ids(),
82 term_cache_blocks,
83 )
84 .await;
85
86 Ok(Self {
87 _snapshot: snapshot,
88 segments,
89 schema,
90 default_fields,
91 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
92 trained_centroids,
93 trained_codebooks,
94 global_stats,
95 })
96 }
97
98 async fn create(
100 directory: Arc<D>,
101 schema: Arc<Schema>,
102 segment_ids: &[String],
103 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
104 trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
105 term_cache_blocks: usize,
106 ) -> Result<Self> {
107 let (segments, default_fields, global_stats) =
108 Self::load_common(&directory, &schema, segment_ids, term_cache_blocks).await;
109
110 #[cfg(feature = "native")]
111 {
112 let tracker = Arc::new(SegmentTracker::new());
113 let snapshot = SegmentSnapshot::new(tracker, directory, segment_ids.to_vec());
114 Ok(Self {
115 _snapshot: snapshot,
116 segments,
117 schema,
118 default_fields,
119 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
120 trained_centroids,
121 trained_codebooks,
122 global_stats,
123 })
124 }
125
126 #[cfg(not(feature = "native"))]
127 {
128 let _ = directory; Ok(Self {
130 _phantom: std::marker::PhantomData,
131 segments,
132 schema,
133 default_fields,
134 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
135 trained_centroids,
136 trained_codebooks,
137 global_stats,
138 })
139 }
140 }
141
142 async fn load_common(
144 directory: &Arc<D>,
145 schema: &Arc<Schema>,
146 segment_ids: &[String],
147 term_cache_blocks: usize,
148 ) -> (
149 Vec<Arc<SegmentReader>>,
150 Vec<crate::Field>,
151 Arc<LazyGlobalStats>,
152 ) {
153 let segments = Self::load_segments(directory, schema, segment_ids, term_cache_blocks).await;
154 let default_fields = Self::build_default_fields(schema);
155 let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
156 (segments, default_fields, global_stats)
157 }
158
159 async fn load_segments(
161 directory: &Arc<D>,
162 schema: &Arc<Schema>,
163 segment_ids: &[String],
164 term_cache_blocks: usize,
165 ) -> Vec<Arc<SegmentReader>> {
166 let valid_segments: Vec<(usize, SegmentId)> = segment_ids
168 .iter()
169 .enumerate()
170 .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
171 .collect();
172
173 let futures: Vec<_> =
175 valid_segments
176 .iter()
177 .map(|(_, segment_id)| {
178 let dir = Arc::clone(directory);
179 let sch = Arc::clone(schema);
180 let sid = *segment_id;
181 async move {
182 SegmentReader::open(dir.as_ref(), sid, sch, 0, term_cache_blocks).await
183 }
184 })
185 .collect();
186
187 let results = futures::future::join_all(futures).await;
188
189 let mut loaded: Vec<(usize, SegmentReader)> = valid_segments
191 .into_iter()
192 .zip(results)
193 .filter_map(|((idx, _), result)| match result {
194 Ok(reader) => Some((idx, reader)),
195 Err(e) => {
196 log::warn!("Failed to open segment: {:?}", e);
197 None
198 }
199 })
200 .collect();
201
202 loaded.sort_by_key(|(idx, _)| *idx);
204
205 let mut doc_id_offset = 0u32;
207 let mut segments = Vec::with_capacity(loaded.len());
208 for (_, mut reader) in loaded {
209 reader.set_doc_id_offset(doc_id_offset);
210 doc_id_offset += reader.meta().num_docs;
211 segments.push(Arc::new(reader));
212 }
213
214 let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
216 let total_sparse_mem: usize = segments
217 .iter()
218 .flat_map(|s| s.sparse_indexes().values())
219 .map(|idx| idx.num_dimensions() * 12)
220 .sum();
221 log::info!(
222 "[searcher] loaded {} segments: total_docs={}, sparse_index_mem={:.2} MB",
223 segments.len(),
224 total_docs,
225 total_sparse_mem as f64 / (1024.0 * 1024.0)
226 );
227
228 segments
229 }
230
231 fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
233 if !schema.default_fields().is_empty() {
234 schema.default_fields().to_vec()
235 } else {
236 schema
237 .fields()
238 .filter(|(_, entry)| {
239 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
240 })
241 .map(|(field, _)| field)
242 .collect()
243 }
244 }
245
246 pub fn schema(&self) -> &Schema {
248 &self.schema
249 }
250
251 pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
253 &self.segments
254 }
255
256 pub fn default_fields(&self) -> &[crate::Field] {
258 &self.default_fields
259 }
260
261 pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
263 &self.tokenizers
264 }
265
266 pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
268 &self.trained_centroids
269 }
270
271 pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
273 &self.trained_codebooks
274 }
275
276 pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
278 &self.global_stats
279 }
280
281 pub fn num_docs(&self) -> u32 {
283 self.segments.iter().map(|s| s.meta().num_docs).sum()
284 }
285
286 pub fn num_segments(&self) -> usize {
288 self.segments.len()
289 }
290
291 pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
293 let mut offset = 0u32;
294 for segment in &self.segments {
295 let segment_docs = segment.meta().num_docs;
296 if doc_id < offset + segment_docs {
297 let local_doc_id = doc_id - offset;
298 return segment.doc(local_doc_id).await;
299 }
300 offset += segment_docs;
301 }
302 Ok(None)
303 }
304
305 pub async fn search(
307 &self,
308 query: &dyn crate::query::Query,
309 limit: usize,
310 ) -> Result<Vec<crate::query::SearchResult>> {
311 let (results, _) = self.search_with_count(query, limit).await?;
312 Ok(results)
313 }
314
315 pub async fn search_with_count(
318 &self,
319 query: &dyn crate::query::Query,
320 limit: usize,
321 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
322 self.search_with_offset_and_count(query, limit, 0).await
323 }
324
325 pub async fn search_with_offset(
327 &self,
328 query: &dyn crate::query::Query,
329 limit: usize,
330 offset: usize,
331 ) -> Result<Vec<crate::query::SearchResult>> {
332 let (results, _) = self
333 .search_with_offset_and_count(query, limit, offset)
334 .await?;
335 Ok(results)
336 }
337
338 pub async fn search_with_offset_and_count(
340 &self,
341 query: &dyn crate::query::Query,
342 limit: usize,
343 offset: usize,
344 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
345 self.search_internal(query, limit, offset, false).await
346 }
347
348 pub async fn search_with_positions(
352 &self,
353 query: &dyn crate::query::Query,
354 limit: usize,
355 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
356 self.search_internal(query, limit, 0, true).await
357 }
358
359 async fn search_internal(
361 &self,
362 query: &dyn crate::query::Query,
363 limit: usize,
364 offset: usize,
365 collect_positions: bool,
366 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
367 let fetch_limit = offset + limit;
368
369 let futures: Vec<_> = self
370 .segments
371 .iter()
372 .map(|segment| {
373 let sid = segment.meta().id;
374 async move {
375 let (results, segment_seen) = if collect_positions {
376 crate::query::search_segment_with_positions_and_count(
377 segment.as_ref(),
378 query,
379 fetch_limit,
380 )
381 .await?
382 } else {
383 crate::query::search_segment_with_count(
384 segment.as_ref(),
385 query,
386 fetch_limit,
387 )
388 .await?
389 };
390 Ok::<_, crate::error::Error>((
391 results
392 .into_iter()
393 .map(move |r| (sid, r))
394 .collect::<Vec<_>>(),
395 segment_seen,
396 ))
397 }
398 })
399 .collect();
400
401 let batches = futures::future::try_join_all(futures).await?;
402 let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
403 let mut total_seen: u32 = 0;
404 for (batch, segment_seen) in batches {
405 total_seen += segment_seen;
406 all_results.extend(batch);
407 }
408
409 all_results.sort_by(|a, b| {
411 b.1.score
412 .partial_cmp(&a.1.score)
413 .unwrap_or(std::cmp::Ordering::Equal)
414 });
415
416 let results = all_results
418 .into_iter()
419 .skip(offset)
420 .take(limit)
421 .map(|(_, result)| result)
422 .collect();
423
424 Ok((results, total_seen))
425 }
426
427 pub async fn search_and_rerank(
432 &self,
433 query: &dyn crate::query::Query,
434 l1_limit: usize,
435 final_limit: usize,
436 config: &crate::query::RerankerConfig,
437 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
438 let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
439 let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
440 Ok((reranked, total_seen))
441 }
442
443 pub async fn query(
445 &self,
446 query_str: &str,
447 limit: usize,
448 ) -> Result<crate::query::SearchResponse> {
449 self.query_offset(query_str, limit, 0).await
450 }
451
452 pub async fn query_offset(
454 &self,
455 query_str: &str,
456 limit: usize,
457 offset: usize,
458 ) -> Result<crate::query::SearchResponse> {
459 let parser = self.query_parser();
460 let query = parser
461 .parse(query_str)
462 .map_err(crate::error::Error::Query)?;
463
464 let fetch_limit = offset + limit;
465 let query_ref = query.as_ref();
466
467 let futures: Vec<_> = self
468 .segments
469 .iter()
470 .map(|segment| {
471 let sid = segment.meta().id;
472 async move {
473 let results =
474 crate::query::search_segment(segment.as_ref(), query_ref, fetch_limit)
475 .await?;
476 Ok::<_, crate::error::Error>(
477 results
478 .into_iter()
479 .map(move |r| (sid, r))
480 .collect::<Vec<_>>(),
481 )
482 }
483 })
484 .collect();
485
486 let batches = futures::future::try_join_all(futures).await?;
487 let mut all_results: Vec<(u128, crate::query::SearchResult)> =
488 Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
489 for batch in batches {
490 all_results.extend(batch);
491 }
492
493 all_results.sort_by(|a, b| {
494 b.1.score
495 .partial_cmp(&a.1.score)
496 .unwrap_or(std::cmp::Ordering::Equal)
497 });
498
499 let total_hits = all_results.len() as u32;
500
501 let hits: Vec<crate::query::SearchHit> = all_results
502 .into_iter()
503 .skip(offset)
504 .take(limit)
505 .map(|(segment_id, result)| crate::query::SearchHit {
506 address: crate::query::DocAddress::new(segment_id, result.doc_id),
507 score: result.score,
508 matched_fields: result.extract_ordinals(),
509 })
510 .collect();
511
512 Ok(crate::query::SearchResponse { hits, total_hits })
513 }
514
515 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
517 let query_routers = self.schema.query_routers();
518 if !query_routers.is_empty()
519 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
520 {
521 return crate::dsl::QueryLanguageParser::with_router(
522 Arc::clone(&self.schema),
523 self.default_fields.clone(),
524 Arc::clone(&self.tokenizers),
525 router,
526 );
527 }
528
529 crate::dsl::QueryLanguageParser::new(
530 Arc::clone(&self.schema),
531 self.default_fields.clone(),
532 Arc::clone(&self.tokenizers),
533 )
534 }
535
536 pub async fn get_document(
541 &self,
542 address: &crate::query::DocAddress,
543 ) -> Result<Option<crate::dsl::Document>> {
544 let segment_id = address.segment_id_u128().ok_or_else(|| {
545 crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
546 })?;
547
548 for segment in &self.segments {
549 if segment.meta().id == segment_id {
550 let local_doc_id = address.doc_id.wrapping_sub(segment.doc_id_offset());
552 return segment.doc(local_doc_id).await;
553 }
554 }
555
556 Ok(None)
557 }
558}