1use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
/// Read-side handle over a fixed set of opened segments.
///
/// Bundles the segment readers with the schema, the tokenizer registry, any
/// trained centroids, lazily computed global statistics, and the lookup
/// tables used to route a document ID or segment ID to the right reader.
pub struct Searcher<D: Directory + 'static> {
    /// Snapshot of the segment set this searcher was built from. It is never
    /// read; it is only held for as long as the searcher lives.
    /// NOTE(review): presumably this keeps the segments alive while they are
    /// being searched — confirm against `SegmentSnapshot`.
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    /// Marks the directory type `D`; no directory handle is stored in the struct.
    _phantom: std::marker::PhantomData<D>,
    /// Opened segment readers, in load order.
    segments: Vec<Arc<SegmentReader>>,
    /// Schema shared with the query parser.
    schema: Arc<Schema>,
    /// Fields handed to the query parser as its default field list.
    default_fields: Vec<crate::Field>,
    /// Tokenizer registry shared with the query parser.
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Trained coarse centroids that were installed on the segment readers.
    /// NOTE(review): the `u32` key is presumably a field ID — confirm.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Global statistics object constructed over all loaded segments.
    global_stats: Arc<LazyGlobalStats>,
    /// Maps a segment's numeric ID to its index in `segments`.
    segment_map: FxHashMap<u128, usize>,
    /// Running total of `num_docs` per segment: entry `i` is the number of
    /// documents in segments `0..=i`.
    doc_id_cumulative: Vec<u32>,
}
46
47impl<D: Directory + 'static> Searcher<D> {
48 pub async fn open(
53 directory: Arc<D>,
54 schema: Arc<Schema>,
55 segment_ids: &[String],
56 term_cache_blocks: usize,
57 ) -> Result<Self> {
58 Self::create(
59 directory,
60 schema,
61 segment_ids,
62 FxHashMap::default(),
63 term_cache_blocks,
64 )
65 .await
66 }
67
/// Builds a searcher over the segments listed in `snapshot`.
///
/// The snapshot is moved into the returned searcher, so it lives exactly as
/// long as the searcher does. The tokenizer registry is the default one.
#[cfg(feature = "native")]
pub(crate) async fn from_snapshot(
    directory: Arc<D>,
    schema: Arc<Schema>,
    snapshot: SegmentSnapshot,
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    term_cache_blocks: usize,
) -> Result<Self> {
    let loaded = Self::load_common(
        &directory,
        &schema,
        snapshot.segment_ids(),
        &trained_centroids,
        term_cache_blocks,
    )
    .await;
    let (segments, default_fields, global_stats, segment_map, doc_id_cumulative) = loaded;
    let tokenizers = Arc::new(crate::tokenizer::TokenizerRegistry::default());

    Ok(Self {
        _snapshot: snapshot,
        _phantom: std::marker::PhantomData,
        segments,
        schema,
        default_fields,
        tokenizers,
        trained_centroids,
        global_stats,
        segment_map,
        doc_id_cumulative,
    })
}
100
101 async fn create(
103 directory: Arc<D>,
104 schema: Arc<Schema>,
105 segment_ids: &[String],
106 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
107 term_cache_blocks: usize,
108 ) -> Result<Self> {
109 let (segments, default_fields, global_stats, segment_map, doc_id_cumulative) =
110 Self::load_common(
111 &directory,
112 &schema,
113 segment_ids,
114 &trained_centroids,
115 term_cache_blocks,
116 )
117 .await;
118
119 #[cfg(feature = "native")]
120 let _snapshot = {
121 let tracker = Arc::new(SegmentTracker::new());
122 SegmentSnapshot::new(tracker, segment_ids.to_vec())
123 };
124
125 let _ = directory; Ok(Self {
127 #[cfg(feature = "native")]
128 _snapshot,
129 _phantom: std::marker::PhantomData,
130 segments,
131 schema,
132 default_fields,
133 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
134 trained_centroids,
135 global_stats,
136 segment_map,
137 doc_id_cumulative,
138 })
139 }
140
141 async fn load_common(
143 directory: &Arc<D>,
144 schema: &Arc<Schema>,
145 segment_ids: &[String],
146 trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
147 term_cache_blocks: usize,
148 ) -> (
149 Vec<Arc<SegmentReader>>,
150 Vec<crate::Field>,
151 Arc<LazyGlobalStats>,
152 FxHashMap<u128, usize>,
153 Vec<u32>,
154 ) {
155 let segments = Self::load_segments(
156 directory,
157 schema,
158 segment_ids,
159 trained_centroids,
160 term_cache_blocks,
161 )
162 .await;
163 let default_fields = Self::build_default_fields(schema);
164 let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
165 let (segment_map, doc_id_cumulative) = Self::build_lookup_tables(&segments);
166 (
167 segments,
168 default_fields,
169 global_stats,
170 segment_map,
171 doc_id_cumulative,
172 )
173 }
174
175 async fn load_segments(
177 directory: &Arc<D>,
178 schema: &Arc<Schema>,
179 segment_ids: &[String],
180 trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
181 term_cache_blocks: usize,
182 ) -> Vec<Arc<SegmentReader>> {
183 let valid_segments: Vec<(usize, SegmentId)> = segment_ids
185 .iter()
186 .enumerate()
187 .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
188 .collect();
189
190 let futures: Vec<_> =
192 valid_segments
193 .iter()
194 .map(|(_, segment_id)| {
195 let dir = Arc::clone(directory);
196 let sch = Arc::clone(schema);
197 let sid = *segment_id;
198 async move {
199 SegmentReader::open(dir.as_ref(), sid, sch, 0, term_cache_blocks).await
200 }
201 })
202 .collect();
203
204 let results = futures::future::join_all(futures).await;
205
206 let mut loaded: Vec<(usize, SegmentReader)> = valid_segments
208 .into_iter()
209 .zip(results)
210 .filter_map(|((idx, _), result)| match result {
211 Ok(reader) => Some((idx, reader)),
212 Err(e) => {
213 log::warn!("Failed to open segment: {:?}", e);
214 None
215 }
216 })
217 .collect();
218
219 loaded.sort_by_key(|(idx, _)| *idx);
221
222 let mut doc_id_offset = 0u32;
224 let mut segments = Vec::with_capacity(loaded.len());
225 for (_, mut reader) in loaded {
226 reader.set_doc_id_offset(doc_id_offset);
227 doc_id_offset += reader.meta().num_docs;
228 if !trained_centroids.is_empty() {
230 reader.set_coarse_centroids(trained_centroids.clone());
231 }
232 segments.push(Arc::new(reader));
233 }
234
235 let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
237 let total_sparse_mem: usize = segments
238 .iter()
239 .flat_map(|s| s.sparse_indexes().values())
240 .map(|idx| idx.num_dimensions() * 12)
241 .sum();
242 log::info!(
243 "[searcher] loaded {} segments: total_docs={}, sparse_index_mem={:.2} MB",
244 segments.len(),
245 total_docs,
246 total_sparse_mem as f64 / (1024.0 * 1024.0)
247 );
248
249 segments
250 }
251
252 fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
254 if !schema.default_fields().is_empty() {
255 schema.default_fields().to_vec()
256 } else {
257 schema
258 .fields()
259 .filter(|(_, entry)| {
260 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
261 })
262 .map(|(field, _)| field)
263 .collect()
264 }
265 }
266
267 pub fn schema(&self) -> &Schema {
269 &self.schema
270 }
271
272 pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
274 &self.segments
275 }
276
277 pub fn default_fields(&self) -> &[crate::Field] {
279 &self.default_fields
280 }
281
282 pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
284 &self.tokenizers
285 }
286
287 pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
289 &self.trained_centroids
290 }
291
292 pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
294 &self.global_stats
295 }
296
297 fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, Vec<u32>) {
299 let mut segment_map = FxHashMap::default();
300 let mut cumulative = Vec::with_capacity(segments.len());
301 let mut acc = 0u32;
302 for (i, seg) in segments.iter().enumerate() {
303 segment_map.insert(seg.meta().id, i);
304 acc += seg.meta().num_docs;
305 cumulative.push(acc);
306 }
307 (segment_map, cumulative)
308 }
309
310 pub fn num_docs(&self) -> u32 {
312 self.doc_id_cumulative.last().copied().unwrap_or(0)
313 }
314
315 pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
317 &self.segment_map
318 }
319
320 pub fn num_segments(&self) -> usize {
322 self.segments.len()
323 }
324
325 pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
327 let idx = self.doc_id_cumulative.partition_point(|&cum| cum <= doc_id);
328 if idx >= self.segments.len() {
329 return Ok(None);
330 }
331 let base = if idx == 0 {
332 0
333 } else {
334 self.doc_id_cumulative[idx - 1]
335 };
336 self.segments[idx].doc(doc_id - base).await
337 }
338
339 pub async fn search(
341 &self,
342 query: &dyn crate::query::Query,
343 limit: usize,
344 ) -> Result<Vec<crate::query::SearchResult>> {
345 let (results, _) = self.search_with_count(query, limit).await?;
346 Ok(results)
347 }
348
349 pub async fn search_with_count(
352 &self,
353 query: &dyn crate::query::Query,
354 limit: usize,
355 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
356 self.search_with_offset_and_count(query, limit, 0).await
357 }
358
359 pub async fn search_with_offset(
361 &self,
362 query: &dyn crate::query::Query,
363 limit: usize,
364 offset: usize,
365 ) -> Result<Vec<crate::query::SearchResult>> {
366 let (results, _) = self
367 .search_with_offset_and_count(query, limit, offset)
368 .await?;
369 Ok(results)
370 }
371
372 pub async fn search_with_offset_and_count(
374 &self,
375 query: &dyn crate::query::Query,
376 limit: usize,
377 offset: usize,
378 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
379 self.search_internal(query, limit, offset, false).await
380 }
381
382 pub async fn search_with_positions(
386 &self,
387 query: &dyn crate::query::Query,
388 limit: usize,
389 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
390 self.search_internal(query, limit, 0, true).await
391 }
392
393 async fn search_internal(
395 &self,
396 query: &dyn crate::query::Query,
397 limit: usize,
398 offset: usize,
399 collect_positions: bool,
400 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
401 let fetch_limit = offset + limit;
402
403 let futures: Vec<_> = self
404 .segments
405 .iter()
406 .map(|segment| {
407 let sid = segment.meta().id;
408 async move {
409 let (mut results, segment_seen) = if collect_positions {
410 crate::query::search_segment_with_positions_and_count(
411 segment.as_ref(),
412 query,
413 fetch_limit,
414 )
415 .await?
416 } else {
417 crate::query::search_segment_with_count(
418 segment.as_ref(),
419 query,
420 fetch_limit,
421 )
422 .await?
423 };
424 for r in &mut results {
426 r.segment_id = sid;
427 }
428 Ok::<_, crate::error::Error>((results, segment_seen))
429 }
430 })
431 .collect();
432
433 let batches = futures::future::try_join_all(futures).await?;
434 let mut all_results: Vec<crate::query::SearchResult> = Vec::new();
435 let mut total_seen: u32 = 0;
436 for (batch, segment_seen) in batches {
437 total_seen += segment_seen;
438 all_results.extend(batch);
439 }
440
441 all_results.sort_by(|a, b| {
443 b.score
444 .partial_cmp(&a.score)
445 .unwrap_or(std::cmp::Ordering::Equal)
446 });
447
448 let results = all_results.into_iter().skip(offset).take(limit).collect();
450
451 Ok((results, total_seen))
452 }
453
454 pub async fn search_and_rerank(
459 &self,
460 query: &dyn crate::query::Query,
461 l1_limit: usize,
462 final_limit: usize,
463 config: &crate::query::RerankerConfig,
464 ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
465 let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
466 let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
467 Ok((reranked, total_seen))
468 }
469
470 pub async fn query(
472 &self,
473 query_str: &str,
474 limit: usize,
475 ) -> Result<crate::query::SearchResponse> {
476 self.query_offset(query_str, limit, 0).await
477 }
478
479 pub async fn query_offset(
481 &self,
482 query_str: &str,
483 limit: usize,
484 offset: usize,
485 ) -> Result<crate::query::SearchResponse> {
486 let parser = self.query_parser();
487 let query = parser
488 .parse(query_str)
489 .map_err(crate::error::Error::Query)?;
490
491 let (results, _total_seen) = self
492 .search_internal(query.as_ref(), limit, offset, false)
493 .await?;
494
495 let total_hits = results.len() as u32;
496 let hits: Vec<crate::query::SearchHit> = results
497 .into_iter()
498 .map(|result| crate::query::SearchHit {
499 address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
500 score: result.score,
501 matched_fields: result.extract_ordinals(),
502 })
503 .collect();
504
505 Ok(crate::query::SearchResponse { hits, total_hits })
506 }
507
508 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
510 let query_routers = self.schema.query_routers();
511 if !query_routers.is_empty()
512 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
513 {
514 return crate::dsl::QueryLanguageParser::with_router(
515 Arc::clone(&self.schema),
516 self.default_fields.clone(),
517 Arc::clone(&self.tokenizers),
518 router,
519 );
520 }
521
522 crate::dsl::QueryLanguageParser::new(
523 Arc::clone(&self.schema),
524 self.default_fields.clone(),
525 Arc::clone(&self.tokenizers),
526 )
527 }
528
529 pub async fn get_document(
534 &self,
535 address: &crate::query::DocAddress,
536 ) -> Result<Option<crate::dsl::Document>> {
537 self.get_document_with_fields(address, None).await
538 }
539
540 pub async fn get_document_with_fields(
546 &self,
547 address: &crate::query::DocAddress,
548 fields: Option<&rustc_hash::FxHashSet<u32>>,
549 ) -> Result<Option<crate::dsl::Document>> {
550 let segment_id = address.segment_id_u128().ok_or_else(|| {
551 crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
552 })?;
553
554 if let Some(&idx) = self.segment_map.get(&segment_id) {
555 let segment = &self.segments[idx];
556 let local_doc_id = address.doc_id.wrapping_sub(segment.doc_id_offset());
557 return segment.doc_with_fields(local_doc_id, fields).await;
558 }
559
560 Ok(None)
561 }
562}