1use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
/// Read-only view over a fixed set of index segments.
///
/// Owns the opened segment readers plus the derived lookup structures needed
/// to route global doc ids and segment ids back to individual readers.
pub struct Searcher<D: Directory + 'static> {
    // Held only for its lifetime: presumably pins the snapshot's segments
    // while this searcher is alive — NOTE(review): confirm tracker semantics.
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot<D>,
    // No snapshot tracking without the "native" feature; only ties the
    // searcher to the directory type parameter `D`.
    #[cfg(not(feature = "native"))]
    _phantom: std::marker::PhantomData<D>,
    // Open readers, in the order their ids were supplied.
    segments: Vec<Arc<SegmentReader>>,
    // Index schema shared with query parsing.
    schema: Arc<Schema>,
    // Fields searched when a query names no field explicitly.
    default_fields: Vec<crate::Field>,
    // Tokenizers handed to the query parser.
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    // Pre-trained coarse centroids keyed by field id, injected into readers.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    // Lazily computed corpus-wide statistics over `segments`.
    global_stats: Arc<LazyGlobalStats>,
    // Segment id (u128) -> index into `segments`.
    segment_map: FxHashMap<u128, usize>,
    // doc_id_cumulative[i] = total docs in segments 0..=i; maps a global doc
    // id to its owning segment.
    doc_id_cumulative: Vec<u32>,
}
47
48impl<D: Directory + 'static> Searcher<D> {
49 pub async fn open(
54 directory: Arc<D>,
55 schema: Arc<Schema>,
56 segment_ids: &[String],
57 term_cache_blocks: usize,
58 ) -> Result<Self> {
59 Self::create(
60 directory,
61 schema,
62 segment_ids,
63 FxHashMap::default(),
64 term_cache_blocks,
65 )
66 .await
67 }
68
69 #[cfg(feature = "native")]
71 pub(crate) async fn from_snapshot(
72 directory: Arc<D>,
73 schema: Arc<Schema>,
74 snapshot: SegmentSnapshot<D>,
75 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
76 term_cache_blocks: usize,
77 ) -> Result<Self> {
78 let (segments, default_fields, global_stats, segment_map, doc_id_cumulative) =
79 Self::load_common(
80 &directory,
81 &schema,
82 snapshot.segment_ids(),
83 &trained_centroids,
84 term_cache_blocks,
85 )
86 .await;
87
88 Ok(Self {
89 _snapshot: snapshot,
90 segments,
91 schema,
92 default_fields,
93 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
94 trained_centroids,
95 global_stats,
96 segment_map,
97 doc_id_cumulative,
98 })
99 }
100
101 async fn create(
103 directory: Arc<D>,
104 schema: Arc<Schema>,
105 segment_ids: &[String],
106 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
107 term_cache_blocks: usize,
108 ) -> Result<Self> {
109 let (segments, default_fields, global_stats, segment_map, doc_id_cumulative) =
110 Self::load_common(
111 &directory,
112 &schema,
113 segment_ids,
114 &trained_centroids,
115 term_cache_blocks,
116 )
117 .await;
118
119 #[cfg(feature = "native")]
120 {
121 let tracker = Arc::new(SegmentTracker::new());
122 let snapshot = SegmentSnapshot::new(tracker, segment_ids.to_vec());
123 Ok(Self {
124 _snapshot: snapshot,
125 segments,
126 schema,
127 default_fields,
128 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
129 trained_centroids,
130 global_stats,
131 segment_map,
132 doc_id_cumulative,
133 })
134 }
135
136 #[cfg(not(feature = "native"))]
137 {
138 let _ = directory; Ok(Self {
140 _phantom: std::marker::PhantomData,
141 segments,
142 schema,
143 default_fields,
144 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
145 trained_centroids,
146 global_stats,
147 segment_map,
148 doc_id_cumulative,
149 })
150 }
151 }
152
153 async fn load_common(
155 directory: &Arc<D>,
156 schema: &Arc<Schema>,
157 segment_ids: &[String],
158 trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
159 term_cache_blocks: usize,
160 ) -> (
161 Vec<Arc<SegmentReader>>,
162 Vec<crate::Field>,
163 Arc<LazyGlobalStats>,
164 FxHashMap<u128, usize>,
165 Vec<u32>,
166 ) {
167 let segments = Self::load_segments(
168 directory,
169 schema,
170 segment_ids,
171 trained_centroids,
172 term_cache_blocks,
173 )
174 .await;
175 let default_fields = Self::build_default_fields(schema);
176 let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
177 let (segment_map, doc_id_cumulative) = Self::build_lookup_tables(&segments);
178 (
179 segments,
180 default_fields,
181 global_stats,
182 segment_map,
183 doc_id_cumulative,
184 )
185 }
186
    /// Opens the segments named in `segment_ids` concurrently and returns
    /// them as ready-to-serve readers.
    ///
    /// Ids that fail hex parsing are silently skipped; segments that fail to
    /// open are skipped with a warning, so the result may be shorter than the
    /// input. Surviving readers keep the order of `segment_ids` and receive
    /// contiguous global doc-id offsets in that order.
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Vec<Arc<SegmentReader>> {
        // Remember each id's original position so input order can be
        // re-asserted after the concurrent open below.
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        // One open-future per segment; each clones the shared handles it
        // moves into the async block.
        // NOTE(review): the `0` passed to SegmentReader::open is presumably a
        // placeholder doc-id offset (the real offset is set further below via
        // set_doc_id_offset) — confirm against SegmentReader::open.
        let futures: Vec<_> =
            valid_segments
                .iter()
                .map(|(_, segment_id)| {
                    let dir = Arc::clone(directory);
                    let sch = Arc::clone(schema);
                    let sid = *segment_id;
                    async move {
                        SegmentReader::open(dir.as_ref(), sid, sch, 0, term_cache_blocks).await
                    }
                })
                .collect();

        let results = futures::future::join_all(futures).await;

        // join_all yields results in input order, so zipping pairs each
        // result with its original index; failed opens are logged and
        // dropped.
        let mut loaded: Vec<(usize, SegmentReader)> = valid_segments
            .into_iter()
            .zip(results)
            .filter_map(|((idx, _), result)| match result {
                Ok(reader) => Some((idx, reader)),
                Err(e) => {
                    log::warn!("Failed to open segment: {:?}", e);
                    None
                }
            })
            .collect();

        // Defensive re-sort to the caller-supplied order (already in order
        // given join_all's ordering guarantee).
        loaded.sort_by_key(|(idx, _)| *idx);

        // Assign each reader a global doc-id offset equal to the doc count of
        // all preceding segments, and inject pre-trained centroids if any.
        let mut doc_id_offset = 0u32;
        let mut segments = Vec::with_capacity(loaded.len());
        for (_, mut reader) in loaded {
            reader.set_doc_id_offset(doc_id_offset);
            doc_id_offset += reader.meta().num_docs;
            if !trained_centroids.is_empty() {
                reader.set_coarse_centroids(trained_centroids.clone());
            }
            segments.push(Arc::new(reader));
        }

        let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
        // Rough memory estimate — assumes ~12 bytes of index state per
        // sparse dimension. NOTE(review): confirm the per-dimension size.
        let total_sparse_mem: usize = segments
            .iter()
            .flat_map(|s| s.sparse_indexes().values())
            .map(|idx| idx.num_dimensions() * 12)
            .sum();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, sparse_index_mem={:.2} MB",
            segments.len(),
            total_docs,
            total_sparse_mem as f64 / (1024.0 * 1024.0)
        );

        segments
    }
263
264 fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
266 if !schema.default_fields().is_empty() {
267 schema.default_fields().to_vec()
268 } else {
269 schema
270 .fields()
271 .filter(|(_, entry)| {
272 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
273 })
274 .map(|(field, _)| field)
275 .collect()
276 }
277 }
278
    /// The schema this searcher was opened with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
283
    /// The open segment readers, in load order.
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }
288
    /// Fields searched when a query names no field explicitly.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }
293
    /// The tokenizer registry used by this searcher's query parser.
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }
298
    /// Pre-trained coarse centroids keyed by field id, if any were supplied.
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }
303
    /// Lazily computed corpus-wide statistics across all segments.
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }
308
309 fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, Vec<u32>) {
311 let mut segment_map = FxHashMap::default();
312 let mut cumulative = Vec::with_capacity(segments.len());
313 let mut acc = 0u32;
314 for (i, seg) in segments.iter().enumerate() {
315 segment_map.insert(seg.meta().id, i);
316 acc += seg.meta().num_docs;
317 cumulative.push(acc);
318 }
319 (segment_map, cumulative)
320 }
321
322 pub fn num_docs(&self) -> u32 {
324 self.doc_id_cumulative.last().copied().unwrap_or(0)
325 }
326
    /// Mapping from segment id (as `u128`) to its index in
    /// `segment_readers()`.
    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }
331
    /// Number of open segments.
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }
336
337 pub async fn doc(&self, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
339 let idx = self.doc_id_cumulative.partition_point(|&cum| cum <= doc_id);
340 if idx >= self.segments.len() {
341 return Ok(None);
342 }
343 let base = if idx == 0 {
344 0
345 } else {
346 self.doc_id_cumulative[idx - 1]
347 };
348 self.segments[idx].doc(doc_id - base).await
349 }
350
351 pub async fn search(
353 &self,
354 query: &dyn crate::query::Query,
355 limit: usize,
356 ) -> Result<Vec<crate::query::SearchResult>> {
357 let (results, _) = self.search_with_count(query, limit).await?;
358 Ok(results)
359 }
360
    /// Like [`Self::search`], but also returns the total number of candidate
    /// documents seen across all segments.
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }
370
371 pub async fn search_with_offset(
373 &self,
374 query: &dyn crate::query::Query,
375 limit: usize,
376 offset: usize,
377 ) -> Result<Vec<crate::query::SearchResult>> {
378 let (results, _) = self
379 .search_with_offset_and_count(query, limit, offset)
380 .await?;
381 Ok(results)
382 }
383
    /// Like [`Self::search_with_offset`], but also returns the total number
    /// of candidate documents seen across all segments.
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }
393
    /// Like [`Self::search_with_count`], but the per-segment search also
    /// collects position information for the hits.
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }
404
    /// Core search path shared by all public search/query methods.
    ///
    /// Each segment is searched concurrently for its top `offset + limit`
    /// hits; the per-segment results are merged, sorted by descending score,
    /// and the requested page is cut out. Returns the page plus the total
    /// number of candidate documents seen across all segments.
    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        // Every segment must over-fetch enough hits to fill the page even if
        // all winners come from a single segment.
        let fetch_limit = offset + limit;

        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    // Stamp each hit with its segment so a DocAddress can be
                    // built from it later.
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        // Fail fast: the first segment error aborts the whole search.
        let batches = futures::future::try_join_all(futures).await?;
        let mut all_results: Vec<crate::query::SearchResult> = Vec::new();
        let mut total_seen: u32 = 0;
        for (batch, segment_seen) in batches {
            total_seen += segment_seen;
            all_results.extend(batch);
        }

        // Descending by score; NaN scores compare as equal rather than
        // panicking.
        all_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        // Pagination happens only after the global merge.
        let results = all_results.into_iter().skip(offset).take(limit).collect();

        Ok((results, total_seen))
    }
465
    /// Two-stage retrieval: a first-pass search fetches `l1_limit`
    /// candidates, which are then reranked down to `final_limit` results
    /// according to `config`. Returns the reranked hits together with the
    /// candidate count seen during the first pass.
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }
481
    /// Parses `query_str` with this searcher's query parser and returns the
    /// top `limit` hits.
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }
490
    /// Parses `query_str` and returns hits `offset..offset + limit` as a
    /// [`crate::query::SearchResponse`].
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;

        let (results, _total_seen) = self
            .search_internal(query.as_ref(), limit, offset, false)
            .await?;

        // NOTE(review): total_hits is the size of the returned page, not the
        // corpus-wide match count (`_total_seen` is discarded) — confirm this
        // is the intended SearchResponse semantics.
        let total_hits = results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = results
            .into_iter()
            .map(|result| crate::query::SearchHit {
                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }
519
520 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
522 let query_routers = self.schema.query_routers();
523 if !query_routers.is_empty()
524 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
525 {
526 return crate::dsl::QueryLanguageParser::with_router(
527 Arc::clone(&self.schema),
528 self.default_fields.clone(),
529 Arc::clone(&self.tokenizers),
530 router,
531 );
532 }
533
534 crate::dsl::QueryLanguageParser::new(
535 Arc::clone(&self.schema),
536 self.default_fields.clone(),
537 Arc::clone(&self.tokenizers),
538 )
539 }
540
    /// Fetches the full document at `address` (all fields), or `None` when
    /// the addressed segment is not part of this searcher.
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }
551
552 pub async fn get_document_with_fields(
558 &self,
559 address: &crate::query::DocAddress,
560 fields: Option<&rustc_hash::FxHashSet<u32>>,
561 ) -> Result<Option<crate::dsl::Document>> {
562 let segment_id = address.segment_id_u128().ok_or_else(|| {
563 crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
564 })?;
565
566 if let Some(&idx) = self.segment_map.get(&segment_id) {
567 let segment = &self.segments[idx];
568 let local_doc_id = address.doc_id.wrapping_sub(segment.doc_id_offset());
569 return segment.doc_with_fields(local_doc_id, fields).await;
570 }
571
572 Ok(None)
573 }
574}