use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::Directory;
use crate::dsl::Schema;
use crate::error::Result;
use crate::query::LazyGlobalStats;
use crate::segment::{SegmentId, SegmentReader};
#[cfg(feature = "native")]
use crate::segment::{SegmentSnapshot, SegmentTracker};
use crate::structures::CoarseCentroids;

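/// Read-only search handle over a fixed set of segments.
///
/// Holds an `Arc<SegmentReader>` per segment plus the lookup tables needed
/// to route a document address back to its owning segment.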
pub struct Searcher<D: Directory + 'static> {
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    _phantom: std::marker::PhantomData<D>,
    segments: Vec<Arc<SegmentReader>>,
    schema: Arc<Schema>,
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    global_stats: Arc<LazyGlobalStats>,
    segment_map: FxHashMap<u128, usize>,
    total_docs: u32,
}

impl<D: Directory + 'static> Searcher<D> {
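    /// Opens a searcher over the given segments, identified by hex-encoded
    /// segment IDs.
    ///
    /// A minimal usage sketch; `MyDirectory` stands in for whatever
    /// `Directory` implementation the caller has, and an async runtime such
    /// as tokio is assumed:
    ///
    /// ```ignore
    /// let directory = Arc::new(MyDirectory::new("/path/to/index"));
    /// let searcher = Searcher::open(
    ///     directory,
    ///     Arc::clone(&schema),
    ///     &segment_ids,      // hex-encoded segment IDs
    ///     1024,              // term cache blocks
    /// )
    /// .await?;
    /// println!("{} docs in {} segments", searcher.num_docs(), searcher.num_segments());
    /// ```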
    pub async fn open(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        term_cache_blocks: usize,
    ) -> Result<Self> {
        Self::create(
            directory,
            schema,
            segment_ids,
            FxHashMap::default(),
            term_cache_blocks,
        )
        .await
    }

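    /// Builds a searcher from an existing `SegmentSnapshot`, holding the
    /// snapshot alive for the searcher's lifetime.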
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
        )
        .await;

        Ok(Self {
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

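    /// Shared constructor behind [`open`](Self::open); on native builds it
    /// also wraps the segment IDs in a standalone snapshot.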
    async fn create(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            segment_ids,
            &trained_centroids,
            term_cache_blocks,
        )
        .await;

        #[cfg(feature = "native")]
        let _snapshot = {
            let tracker = Arc::new(SegmentTracker::new());
            SegmentSnapshot::new(tracker, segment_ids.to_vec())
        };

        Ok(Self {
            #[cfg(feature = "native")]
            _snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

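    /// Loads all segments, then derives the state every constructor needs:
    /// default fields, lazy global stats, the segment-ID lookup map, and the
    /// total document count.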
    async fn load_common(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> (
        Vec<Arc<SegmentReader>>,
        Vec<crate::Field>,
        Arc<LazyGlobalStats>,
        FxHashMap<u128, usize>,
        u32,
    ) {
        let segments = Self::load_segments(
            directory,
            schema,
            segment_ids,
            trained_centroids,
            term_cache_blocks,
        )
        .await;
        let default_fields = Self::build_default_fields(schema);
        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
        let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
        (
            segments,
            default_fields,
            global_stats,
            segment_map,
            total_docs,
        )
    }

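    /// Opens all segments concurrently, preserving the order of
    /// `segment_ids`. A segment that fails to open triggers a panic rather
    /// than serving incomplete data; malformed ID strings are skipped.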
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Vec<Arc<SegmentReader>> {
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        let futures: Vec<_> = valid_segments
            .iter()
            .map(|(_, segment_id)| {
                let dir = Arc::clone(directory);
                let sch = Arc::clone(schema);
                let sid = *segment_id;
                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;

        let mut loaded: Vec<(usize, SegmentReader)> = Vec::with_capacity(valid_segments.len());
        for ((idx, sid), result) in valid_segments.into_iter().zip(results) {
            match result {
                Ok(reader) => loaded.push((idx, reader)),
                Err(e) => {
                    panic!(
                        "Failed to open segment {:016x}: {:?}. \
                         Refusing to serve with incomplete data.",
                        sid.0, e
                    );
                }
            }
        }

        loaded.sort_by_key(|(idx, _)| *idx);

        let mut segments = Vec::with_capacity(loaded.len());
        for (_, mut reader) in loaded {
            if !trained_centroids.is_empty() {
                reader.set_coarse_centroids(trained_centroids.clone());
            }
            segments.push(Arc::new(reader));
        }

        let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
        let mut total_mem = 0usize;
        for seg in &segments {
            let stats = seg.memory_stats();
            let seg_total = stats.total_bytes();
            total_mem += seg_total;
            log::info!(
                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
                 (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
                stats.segment_id,
                stats.num_docs,
                seg_total as f64 / (1024.0 * 1024.0),
                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
            );
        }
        let rss_mb = process_rss_mb();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
            segments.len(),
            total_docs,
            total_mem as f64 / (1024.0 * 1024.0),
            rss_mb,
        );

        segments
    }

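    /// Returns the schema's explicit default fields when present; otherwise
    /// falls back to every indexed text field.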
    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
        if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }

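    /// Builds the segment-ID-to-index map and the saturating sum of
    /// per-segment document counts.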
    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
        let mut segment_map = FxHashMap::default();
        let mut total = 0u32;
        for (i, seg) in segments.iter().enumerate() {
            segment_map.insert(seg.meta().id, i);
            total = total.saturating_add(seg.meta().num_docs);
        }
        (segment_map, total)
    }

    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }

    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }

    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

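    /// Fetches a stored document by raw segment ID and local doc ID.
    /// Returns `Ok(None)` if the segment is not part of this searcher.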
    pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx].doc(doc_id).await;
        }
        Ok(None)
    }

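    /// Runs `query` against every segment and returns the top `limit`
    /// results, merged by descending score.
    ///
    /// A minimal sketch, assuming the query comes from this searcher's own
    /// parser:
    ///
    /// ```ignore
    /// let query = searcher.query_parser().parse("title:rust")?;
    /// let hits = searcher.search(query.as_ref(), 10).await?;
    /// for hit in &hits {
    ///     println!("segment={:016x} doc={} score={}", hit.segment_id, hit.doc_id, hit.score);
    /// }
    /// ```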
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self.search_with_count(query, limit).await?;
        Ok(results)
    }

    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }

    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self
            .search_with_offset_and_count(query, limit, offset)
            .await?;
        Ok(results)
    }

    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }

    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }

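    /// Fans the query out to every segment concurrently. Each segment
    /// fetches `offset + limit` results so the global page is correct after
    /// merging; [`merge_segment_results`] then applies the offset.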
    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let fetch_limit = offset + limit;

        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut total_seen: u32 = 0;

        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for (batch, segment_seen) in batches {
            total_seen += segment_seen;
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

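    /// Synchronous counterpart of
    /// [`search_with_offset_and_count`](Self::search_with_offset_and_count):
    /// segments are searched in parallel on the rayon thread pool.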
    #[cfg(feature = "sync")]
    pub fn search_with_offset_and_count_sync(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let fetch_limit = offset + limit;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> = self
            .segments
            .par_iter()
            .map(|segment| {
                let sid = segment.meta().id;
                let (mut results, segment_seen) = crate::query::search_segment_with_count_sync(
                    segment.as_ref(),
                    query,
                    fetch_limit,
                )?;
                for r in &mut results {
                    r.segment_id = sid;
                }
                Ok((results, segment_seen))
            })
            .collect();

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen += segment_seen;
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

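    /// Two-stage retrieval: fetches `l1_limit` first-pass candidates, then
    /// reranks them down to `final_limit` with the supplied config.
    ///
    /// A sketch of the intended call shape (the `RerankerConfig` value here
    /// is assumed to be built elsewhere):
    ///
    /// ```ignore
    /// let (hits, seen) = searcher
    ///     .search_and_rerank(query.as_ref(), 200, 10, &config)
    ///     .await?;
    /// ```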
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }

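    /// Parses `query_str` with [`query_parser`](Self::query_parser) and runs
    /// it, returning hits addressed by `(segment, doc)`.
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let response = searcher.query("hello world", 10).await?;
    /// for hit in &response.hits {
    ///     if let Some(doc) = searcher.get_document(&hit.address).await? {
    ///         // render `doc`
    ///     }
    /// }
    /// ```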
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;

        let (results, _total_seen) = self
            .search_internal(query.as_ref(), limit, offset, false)
            .await?;

        let total_hits = results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = results
            .into_iter()
            .map(|result| crate::query::SearchHit {
                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

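    /// Builds a query parser over this searcher's schema, default fields,
    /// and tokenizers, attaching a field router when the schema defines
    /// routing rules and those rules compile.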
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                self.default_fields.clone(),
                Arc::clone(&self.tokenizers),
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

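    /// Fetches the stored document behind `address`, or `Ok(None)` if the
    /// segment is unknown.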
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }

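    /// Like [`get_document`](Self::get_document), but restricts decoding to
    /// the given field IDs when `fields` is `Some`.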
    pub async fn get_document_with_fields(
        &self,
        address: &crate::query::DocAddress,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<crate::dsl::Document>> {
        let segment_id = address.segment_id_u128().ok_or_else(|| {
            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
        })?;

        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx]
                .doc_with_fields(address.doc_id, fields)
                .await;
        }

        Ok(None)
    }
}

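/// K-way merges per-segment batches (each already sorted by descending
/// score) with a max-heap keyed on score, skipping the first `offset`
/// results and emitting at most `fetch_limit - offset` of them.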
fn merge_segment_results(
    sorted_batches: Vec<Vec<crate::query::SearchResult>>,
    fetch_limit: usize,
    offset: usize,
) -> Vec<crate::query::SearchResult> {
    use std::cmp::Ordering;

    struct MergeEntry {
        score: f32,
        batch_idx: usize,
        pos: usize,
    }
    impl PartialEq for MergeEntry {
        fn eq(&self, other: &Self) -> bool {
            self.score == other.score
        }
    }
    impl Eq for MergeEntry {}
    impl PartialOrd for MergeEntry {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Ord for MergeEntry {
        fn cmp(&self, other: &Self) -> Ordering {
            self.score
                .partial_cmp(&other.score)
                .unwrap_or(Ordering::Equal)
        }
    }

    let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
    for (i, batch) in sorted_batches.iter().enumerate() {
        if !batch.is_empty() {
            heap.push(MergeEntry {
                score: batch[0].score,
                batch_idx: i,
                pos: 0,
            });
        }
    }

    let mut results = Vec::with_capacity(fetch_limit.min(64));
    let mut emitted = 0usize;
    while let Some(entry) = heap.pop() {
        if emitted >= fetch_limit {
            break;
        }
        let batch = &sorted_batches[entry.batch_idx];
        if emitted >= offset {
            results.push(batch[entry.pos].clone());
        }
        emitted += 1;
        let next_pos = entry.pos + 1;
        if next_pos < batch.len() {
            heap.push(MergeEntry {
                score: batch[next_pos].score,
                batch_idx: entry.batch_idx,
                pos: next_pos,
            });
        }
    }

    results
}

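/// Best-effort resident-set size of the current process in MB: parsed from
/// `/proc/self/status` on Linux, queried via `task_info` on macOS, `0.0`
/// elsewhere or on failure.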
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            for line in status.lines() {
                if let Some(rest) = line.strip_prefix("VmRSS:") {
                    let kb: f64 = rest
                        .trim()
                        .trim_end_matches("kB")
                        .trim()
                        .parse()
                        .unwrap_or(0.0);
                    return kb / 1024.0;
                }
            }
        }
        0.0
    }
    #[cfg(target_os = "macos")]
    {
        use std::mem;

        #[repr(C)]
        struct TaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(task: u32, flavor: u32, info: *mut TaskBasicInfo, count: *mut u32) -> i32;
        }
        const MACH_TASK_BASIC_INFO: u32 = 20;
        let mut info: TaskBasicInfo = unsafe { mem::zeroed() };
        let mut count = (mem::size_of::<TaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        let ret = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut info,
                &mut count,
            )
        };
        if ret == 0 {
            info.resident_size as f64 / (1024.0 * 1024.0)
        } else {
            0.0
        }
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}