1use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
/// A point-in-time, read-only searcher over a set of immutable index segments.
///
/// Construction (`open`, `from_snapshot*`) loads one `SegmentReader` per
/// segment; the `search*` / `query*` methods fan a query out across all
/// segments and merge the per-segment results.
pub struct Searcher<D: Directory + 'static> {
    // Held for the searcher's lifetime and never read (hence the underscore);
    // presumably pins the snapshot's segments against deletion while this
    // searcher is alive — see `SegmentSnapshot` (TODO confirm).
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    // Ties the searcher to its `Directory` type parameter without storing a
    // directory handle.
    _phantom: std::marker::PhantomData<D>,
    // One opened reader per segment.
    segments: Vec<Arc<SegmentReader>>,
    // Index schema shared with readers and the query parser.
    schema: Arc<Schema>,
    // Fields searched when a query does not name one (see
    // `build_default_fields`).
    default_fields: Vec<crate::Field>,
    // Tokenizer registry handed to the query parser; always the default set.
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    // Coarse centroids keyed by a u32 id (presumably a field id — verify
    // against callers); injected into freshly loaded segment readers.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    // Lazily computed corpus-wide statistics over `segments`.
    global_stats: Arc<LazyGlobalStats>,
    // Segment id (u128) -> index into `segments`, for doc lookups by address.
    segment_map: FxHashMap<u128, usize>,
    // Saturating sum of per-segment document counts.
    total_docs: u32,
}
46
impl<D: Directory + 'static> Searcher<D> {
    /// Opens a searcher over the given segment ids with no pre-trained
    /// centroids. Thin wrapper around [`Self::create`].
    pub async fn open(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        term_cache_blocks: usize,
    ) -> Result<Self> {
        Self::create(
            directory,
            schema,
            segment_ids,
            FxHashMap::default(),
            term_cache_blocks,
        )
        .await
    }

    /// Builds a searcher from an existing snapshot, loading every segment
    /// reader from scratch (no reader reuse).
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        // `&[]` for existing segments: nothing can be reused.
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
            &[],
        )
        .await?;

        Ok(Self {
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Like [`Self::from_snapshot`], but reuses already-open readers from
    /// `existing_segments` for segments whose id matches, loading only the
    /// new ones.
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot_reuse(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
        existing_segments: &[Arc<SegmentReader>],
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
            existing_segments,
        )
        .await?;

        Ok(Self {
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Shared constructor behind [`Self::open`]: loads segments, then (on the
    /// `native` feature) fabricates a fresh snapshot over a new tracker.
    async fn create(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            segment_ids,
            &trained_centroids,
            term_cache_blocks,
            &[],
        )
        .await?;

        // Standalone snapshot over a brand-new tracker (unlike the
        // `from_snapshot*` paths, which receive a caller-owned snapshot).
        #[cfg(feature = "native")]
        let _snapshot = {
            let tracker = Arc::new(SegmentTracker::new());
            SegmentSnapshot::new(tracker, segment_ids.to_vec())
        };

        // Explicitly drop `directory` (it was only borrowed by load_common);
        // also silences the unused-variable warning on non-`native` builds.
        let _ = directory;
        Ok(Self {
            #[cfg(feature = "native")]
            _snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    /// Loads segment readers and derives everything the constructors share:
    /// default fields, lazy global stats, the id->index map and the total
    /// document count.
    async fn load_common(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
        existing_segments: &[Arc<SegmentReader>],
    ) -> Result<(
        Vec<Arc<SegmentReader>>,
        Vec<crate::Field>,
        Arc<LazyGlobalStats>,
        FxHashMap<u128, usize>,
        u32,
    )> {
        let segments = Self::load_segments(
            directory,
            schema,
            segment_ids,
            trained_centroids,
            term_cache_blocks,
            existing_segments,
        )
        .await?;
        let default_fields = Self::build_default_fields(schema);
        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
        let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
        Ok((
            segments,
            default_fields,
            global_stats,
            segment_map,
            total_docs,
        ))
    }

    /// Opens readers for `segment_ids`, reusing any reader in
    /// `existing_segments` whose segment id matches. New segments are opened
    /// concurrently; the final list preserves the order of `segment_ids`.
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
        existing_segments: &[Arc<SegmentReader>],
    ) -> Result<Vec<Arc<SegmentReader>>> {
        // Fast lookup of reusable readers by segment id.
        let existing_map: FxHashMap<u128, Arc<SegmentReader>> = existing_segments
            .iter()
            .map(|seg| (seg.meta().id, Arc::clone(seg)))
            .collect();

        // NOTE(review): ids that fail to parse are silently dropped here —
        // confirm this is intended rather than an error condition.
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        // Partition into (original index, reader/id) so order can be restored
        // after the concurrent loads complete.
        let mut reused: Vec<(usize, Arc<SegmentReader>)> = Vec::new();
        let mut to_load: Vec<(usize, SegmentId)> = Vec::new();
        for (idx, sid) in &valid_segments {
            if let Some(existing) = existing_map.get(&sid.0) {
                reused.push((*idx, Arc::clone(existing)));
            } else {
                to_load.push((*idx, *sid));
            }
        }

        if !existing_segments.is_empty() {
            log::info!(
                "[searcher] reusing {} segment readers, loading {} new",
                reused.len(),
                to_load.len(),
            );
        }

        // Open all missing segments concurrently.
        let futures: Vec<_> = to_load
            .iter()
            .map(|(_, segment_id)| {
                let dir = Arc::clone(directory);
                let sch = Arc::clone(schema);
                let sid = *segment_id;
                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;

        let mut loaded: Vec<(usize, Arc<SegmentReader>)> = Vec::with_capacity(valid_segments.len());

        loaded.extend(reused);

        // `join_all` preserves input order, so zipping against `to_load` pairs
        // each result with its segment id and original position.
        for ((idx, sid), result) in to_load.into_iter().zip(results) {
            match result {
                Ok(mut reader) => {
                    // Inject trained centroids only into freshly opened
                    // readers; reused readers keep whatever they had.
                    if !trained_centroids.is_empty() {
                        reader.set_coarse_centroids(trained_centroids.clone());
                    }
                    loaded.push((idx, Arc::new(reader)));
                }
                Err(e) => {
                    // Any single failed segment aborts the whole load.
                    return Err(crate::error::Error::Internal(format!(
                        "Failed to open segment {:016x}: {:?}",
                        sid.0, e
                    )));
                }
            }
        }

        // Restore the caller-specified segment order.
        loaded.sort_by_key(|(idx, _)| *idx);

        let segments: Vec<Arc<SegmentReader>> = loaded.into_iter().map(|(_, seg)| seg).collect();

        // Diagnostics: per-segment and aggregate memory estimates.
        let total_docs: u64 = segments.iter().map(|s| s.meta().num_docs as u64).sum();
        let mut total_mem = 0usize;
        for seg in &segments {
            let stats = seg.memory_stats();
            let seg_total = stats.total_bytes();
            total_mem += seg_total;
            log::info!(
                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
                 (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
                stats.segment_id,
                stats.num_docs,
                seg_total as f64 / (1024.0 * 1024.0),
                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
            );
        }
        let rss_mb = process_rss_mb();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
            segments.len(),
            total_docs,
            total_mem as f64 / (1024.0 * 1024.0),
            rss_mb,
        );

        Ok(segments)
    }

    /// Fields used when a query does not specify one: the schema's explicit
    /// default fields if any, otherwise every indexed text field.
    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
        if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    /// The index schema this searcher was opened with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// The per-segment readers, in load order.
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    /// Fields searched when a query names none.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Tokenizer registry used by the query parser.
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    /// Coarse centroids injected into the segment readers at load time.
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Lazily computed corpus-wide statistics.
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }

    /// Builds the segment-id -> index map and the saturating total doc count.
    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
        let mut segment_map = FxHashMap::default();
        let mut total = 0u32;
        for (i, seg) in segments.iter().enumerate() {
            segment_map.insert(seg.meta().id, i);
            total = total.saturating_add(seg.meta().num_docs);
        }
        (segment_map, total)
    }

    /// Total number of documents across all segments (saturating at u32::MAX).
    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }

    /// Segment id -> index into [`Self::segment_readers`].
    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }

    /// Number of open segments.
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

    /// Fetches a stored document by segment id + local doc id.
    /// Returns `Ok(None)` when the segment id is unknown.
    pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx].doc(doc_id).await;
        }
        Ok(None)
    }

    /// Top-`limit` search; discards the match count.
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self.search_with_count(query, limit).await?;
        Ok(results)
    }

    /// Top-`limit` search returning (results, documents seen).
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }

    /// Paginated search; discards the match count.
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self
            .search_with_offset_and_count(query, limit, offset)
            .await?;
        Ok(results)
    }

    /// Paginated search returning (results, documents seen).
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Like [`Self::search_with_count`] but also collects term positions.
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }

    /// Core search: runs the query on every segment concurrently (each
    /// fetching the top `offset + limit`), tags results with their segment id,
    /// then merges and applies the offset globally.
    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        // Each segment must over-fetch by `offset` so the global skip is safe.
        let fetch_limit = offset + limit;

        // On a multi-threaded tokio runtime, prefer the rayon-parallel path.
        // NOTE(review): `Handle::current()` panics outside a tokio runtime —
        // this assumes the searcher always runs under tokio when `sync` is on.
        #[cfg(feature = "sync")]
        if self.segments.len() > 1
            && tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread
        {
            return self.search_internal_parallel(query, fetch_limit, offset, collect_positions);
        }

        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    // Stamp results with the owning segment for later lookup.
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut total_seen: u32 = 0;

        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for (batch, segment_seen) in batches {
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Rayon-parallel variant of [`Self::search_internal`]: blocking,
    /// one segment per rayon task, wrapped in `block_in_place` so the tokio
    /// worker thread may be repurposed.
    #[cfg(feature = "sync")]
    fn search_internal_parallel(
        &self,
        query: &dyn crate::query::Query,
        fetch_limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> =
            tokio::task::block_in_place(|| {
                self.segments
                    .par_iter()
                    .map(|segment| {
                        let sid = segment.meta().id;
                        let (mut results, segment_seen) = if collect_positions {
                            crate::query::search_segment_with_positions_and_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        } else {
                            crate::query::search_segment_with_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        };
                        for r in &mut results {
                            r.segment_id = sid;
                        }
                        Ok((results, segment_seen))
                    })
                    .collect()
            });

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Fully synchronous paginated search (no tokio involvement); used when
    /// the caller is already off the async runtime.
    #[cfg(feature = "sync")]
    pub fn search_with_offset_and_count_sync(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let fetch_limit = offset + limit;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> = self
            .segments
            .par_iter()
            .map(|segment| {
                let sid = segment.meta().id;
                let (mut results, segment_seen) = crate::query::search_segment_with_count_sync(
                    segment.as_ref(),
                    query,
                    fetch_limit,
                )?;
                for r in &mut results {
                    r.segment_id = sid;
                }
                Ok((results, segment_seen))
            })
            .collect();

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Two-stage search: first-pass retrieval of `l1_limit` candidates,
    /// then reranking down to `final_limit` per `config`.
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }

    /// Parses `query_str` with the default parser and searches top-`limit`.
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Parses `query_str` and runs a paginated search, returning hits as
    /// addresses + scores.
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;

        let (results, _total_seen) = self
            .search_internal(query.as_ref(), limit, offset, false)
            .await?;

        // NOTE(review): `total_hits` is the number of hits *returned* (capped
        // by `limit`), while the computed `_total_seen` is discarded — confirm
        // this is the intended meaning of `SearchResponse::total_hits`.
        let total_hits = results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = results
            .into_iter()
            .map(|result| crate::query::SearchHit {
                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Builds a query parser for this searcher's schema; attaches a field
    /// router when the schema defines routing rules that parse successfully
    /// (falling back silently to the plain parser otherwise).
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                self.default_fields.clone(),
                Arc::clone(&self.tokenizers),
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

    /// Fetches a full stored document by address; `Ok(None)` for unknown
    /// segments.
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }

    /// Fetches a stored document, optionally restricted to the given field
    /// ids. Errors if the address carries an unparseable segment id; returns
    /// `Ok(None)` when the segment is not part of this searcher.
    pub async fn get_document_with_fields(
        &self,
        address: &crate::query::DocAddress,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<crate::dsl::Document>> {
        let segment_id = address.segment_id_u128().ok_or_else(|| {
            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id()))
        })?;

        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx]
                .doc_with_fields(address.doc_id, fields)
                .await;
        }

        Ok(None)
    }
}
740
741fn merge_segment_results(
746 sorted_batches: Vec<Vec<crate::query::SearchResult>>,
747 fetch_limit: usize,
748 offset: usize,
749) -> Vec<crate::query::SearchResult> {
750 use std::cmp::Ordering;
751
752 struct MergeEntry {
753 score: f32,
754 batch_idx: usize,
755 pos: usize,
756 }
757 impl PartialEq for MergeEntry {
758 fn eq(&self, other: &Self) -> bool {
759 self.score == other.score
760 }
761 }
762 impl Eq for MergeEntry {}
763 impl PartialOrd for MergeEntry {
764 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
765 Some(self.cmp(other))
766 }
767 }
768 impl Ord for MergeEntry {
769 fn cmp(&self, other: &Self) -> Ordering {
770 self.score
771 .partial_cmp(&other.score)
772 .unwrap_or(Ordering::Equal)
773 }
774 }
775
776 let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
777 for (i, batch) in sorted_batches.iter().enumerate() {
778 if !batch.is_empty() {
779 heap.push(MergeEntry {
780 score: batch[0].score,
781 batch_idx: i,
782 pos: 0,
783 });
784 }
785 }
786
787 let mut results = Vec::with_capacity(fetch_limit.min(64));
788 let mut emitted = 0usize;
789 while let Some(entry) = heap.pop() {
790 if emitted >= fetch_limit {
791 break;
792 }
793 let batch = &sorted_batches[entry.batch_idx];
794 if emitted >= offset {
795 results.push(batch[entry.pos].clone());
796 }
797 emitted += 1;
798 let next_pos = entry.pos + 1;
799 if next_pos < batch.len() {
800 heap.push(MergeEntry {
801 score: batch[next_pos].score,
802 batch_idx: entry.batch_idx,
803 pos: next_pos,
804 });
805 }
806 }
807
808 results
809}
810
/// Best-effort resident-set size (RSS) of the current process, in MiB.
///
/// Returns `0.0` on unsupported platforms or on any read/parse failure —
/// callers use this purely for log diagnostics.
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        // /proc/self/status reports a line of the form "VmRSS:   1234 kB".
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            for line in status.lines() {
                if let Some(rest) = line.strip_prefix("VmRSS:") {
                    let kb: f64 = rest
                        .trim()
                        .trim_end_matches("kB")
                        .trim()
                        .parse()
                        .unwrap_or(0.0);
                    return kb / 1024.0;
                }
            }
        }
        0.0
    }
    #[cfg(target_os = "macos")]
    {
        use std::mem;
        // Mirrors the layout of Mach's `mach_task_basic_info` struct
        // (u64 sizes, two time_value_t pairs, policy, suspend count) —
        // TODO confirm against <mach/task_info.h> if this ever misbehaves.
        #[repr(C)]
        struct TaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(task: u32, flavor: u32, info: *mut TaskBasicInfo, count: *mut u32) -> i32;
        }
        // Flavor constant MACH_TASK_BASIC_INFO from <mach/task_info.h>.
        const MACH_TASK_BASIC_INFO: u32 = 20;
        // SAFETY: all-zero bytes are a valid value for this plain-data struct.
        let mut info: TaskBasicInfo = unsafe { mem::zeroed() };
        // Count is expressed in 32-bit words, per the task_info convention.
        let mut count = (mem::size_of::<TaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        // SAFETY: `info` and `count` are valid, writable pointers, and `count`
        // correctly describes the buffer size handed to the kernel.
        let ret = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut info,
                &mut count,
            )
        };
        // task_info returns KERN_SUCCESS (0) on success.
        if ret == 0 {
            info.resident_size as f64 / (1024.0 * 1024.0)
        } else {
            0.0
        }
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}