1use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use crate::directories::Directory;
11use crate::dsl::Schema;
12use crate::error::Result;
13use crate::query::LazyGlobalStats;
14use crate::segment::{SegmentId, SegmentReader};
15#[cfg(feature = "native")]
16use crate::segment::{SegmentSnapshot, SegmentTracker};
17use crate::structures::CoarseCentroids;
18
/// A point-in-time, read-only view over a set of index segments.
///
/// All search entry points borrow `&self`, so a `Searcher` can be shared
/// across tasks once constructed.
pub struct Searcher<D: Directory + 'static> {
    // Holds the snapshot alive for the searcher's lifetime (native builds),
    // keeping its segments pinned while in use.
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    // The directory type parameter only appears in constructor signatures.
    _phantom: std::marker::PhantomData<D>,
    // Open readers, one per segment, in caller-specified order.
    segments: Vec<Arc<SegmentReader>>,
    schema: Arc<Schema>,
    // Fields searched when a query names none (see `build_default_fields`).
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    // Pre-trained coarse centroids keyed by a u32 id (presumably a field id —
    // confirm); shared with newly opened readers in `load_segments`.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    // Corpus-wide statistics, computed lazily from the segment set.
    global_stats: Arc<LazyGlobalStats>,
    // segment id (u128) -> index into `segments`, for doc-address lookups.
    segment_map: FxHashMap<u128, usize>,
    // Saturating sum of per-segment doc counts (see `build_lookup_tables`).
    total_docs: u32,
}
46
47impl<D: Directory + 'static> Searcher<D> {
48 pub async fn open(
53 directory: Arc<D>,
54 schema: Arc<Schema>,
55 segment_ids: &[String],
56 term_cache_blocks: usize,
57 ) -> Result<Self> {
58 Self::create(
59 directory,
60 schema,
61 segment_ids,
62 FxHashMap::default(),
63 term_cache_blocks,
64 )
65 .await
66 }
67
68 #[cfg(feature = "native")]
70 pub(crate) async fn from_snapshot(
71 directory: Arc<D>,
72 schema: Arc<Schema>,
73 snapshot: SegmentSnapshot,
74 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
75 term_cache_blocks: usize,
76 ) -> Result<Self> {
77 let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
78 &directory,
79 &schema,
80 snapshot.segment_ids(),
81 &trained_centroids,
82 term_cache_blocks,
83 &[],
84 )
85 .await?;
86
87 Ok(Self {
88 _snapshot: snapshot,
89 _phantom: std::marker::PhantomData,
90 segments,
91 schema,
92 default_fields,
93 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
94 trained_centroids,
95 global_stats,
96 segment_map,
97 total_docs,
98 })
99 }
100
    /// Builds a searcher from a snapshot while reusing already-open readers
    /// from a previous searcher generation, so unchanged segments are not
    /// re-opened from the directory (see `load_segments`).
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot_reuse(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
        existing_segments: &[Arc<SegmentReader>],
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
            existing_segments,
        )
        .await?;

        Ok(Self {
            // Store the snapshot so its segments stay pinned for our lifetime.
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }
136
137 async fn create(
139 directory: Arc<D>,
140 schema: Arc<Schema>,
141 segment_ids: &[String],
142 trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
143 term_cache_blocks: usize,
144 ) -> Result<Self> {
145 let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
146 &directory,
147 &schema,
148 segment_ids,
149 &trained_centroids,
150 term_cache_blocks,
151 &[],
152 )
153 .await?;
154
155 #[cfg(feature = "native")]
156 let _snapshot = {
157 let tracker = Arc::new(SegmentTracker::new());
158 SegmentSnapshot::new(tracker, segment_ids.to_vec())
159 };
160
161 let _ = directory; Ok(Self {
163 #[cfg(feature = "native")]
164 _snapshot,
165 _phantom: std::marker::PhantomData,
166 segments,
167 schema,
168 default_fields,
169 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
170 trained_centroids,
171 global_stats,
172 segment_map,
173 total_docs,
174 })
175 }
176
177 async fn load_common(
179 directory: &Arc<D>,
180 schema: &Arc<Schema>,
181 segment_ids: &[String],
182 trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
183 term_cache_blocks: usize,
184 existing_segments: &[Arc<SegmentReader>],
185 ) -> Result<(
186 Vec<Arc<SegmentReader>>,
187 Vec<crate::Field>,
188 Arc<LazyGlobalStats>,
189 FxHashMap<u128, usize>,
190 u32,
191 )> {
192 let segments = Self::load_segments(
193 directory,
194 schema,
195 segment_ids,
196 trained_centroids,
197 term_cache_blocks,
198 existing_segments,
199 )
200 .await?;
201 let default_fields = Self::build_default_fields(schema);
202 let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
203 let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
204 Ok((
205 segments,
206 default_fields,
207 global_stats,
208 segment_map,
209 total_docs,
210 ))
211 }
212
    /// Opens readers for `segment_ids`, reusing matching readers from
    /// `existing_segments` where possible and opening the rest concurrently.
    ///
    /// The returned vector preserves the order of `segment_ids`.
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
        existing_segments: &[Arc<SegmentReader>],
    ) -> Result<Vec<Arc<SegmentReader>>> {
        // Index reusable readers by segment id for O(1) lookup.
        let existing_map: FxHashMap<u128, Arc<SegmentReader>> = existing_segments
            .iter()
            .map(|seg| (seg.meta().id, Arc::clone(seg)))
            .collect();

        // NOTE(review): ids that fail to parse are silently dropped here —
        // confirm that is intended rather than an error condition.
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        // Partition into readers we can reuse and ids we must open fresh,
        // remembering each segment's original position for re-ordering later.
        let mut reused: Vec<(usize, Arc<SegmentReader>)> = Vec::new();
        let mut to_load: Vec<(usize, SegmentId)> = Vec::new();
        for (idx, sid) in &valid_segments {
            if let Some(existing) = existing_map.get(&sid.0) {
                reused.push((*idx, Arc::clone(existing)));
            } else {
                to_load.push((*idx, *sid));
            }
        }

        if !existing_segments.is_empty() {
            log::info!(
                "[searcher] reusing {} segment readers, loading {} new",
                reused.len(),
                to_load.len(),
            );
        }

        // Open all missing segments concurrently.
        let futures: Vec<_> = to_load
            .iter()
            .map(|(_, segment_id)| {
                let dir = Arc::clone(directory);
                let sch = Arc::clone(schema);
                let sid = *segment_id;
                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;

        let mut loaded: Vec<(usize, Arc<SegmentReader>)> = Vec::with_capacity(valid_segments.len());

        loaded.extend(reused);

        for ((idx, sid), result) in to_load.into_iter().zip(results) {
            match result {
                Ok(mut reader) => {
                    // Share pre-trained centroids with freshly opened readers.
                    if !trained_centroids.is_empty() {
                        reader.set_coarse_centroids(trained_centroids.clone());
                    }
                    loaded.push((idx, Arc::new(reader)));
                }
                Err(e) => {
                    // NOTE(review): `{:016x}` prints 16 hex digits but the id
                    // is a u128 (32 digits); confirm the intended width.
                    return Err(crate::error::Error::Internal(format!(
                        "Failed to open segment {:016x}: {:?}",
                        sid.0, e
                    )));
                }
            }
        }

        // Restore the caller-specified segment order.
        loaded.sort_by_key(|(idx, _)| *idx);

        let segments: Vec<Arc<SegmentReader>> = loaded.into_iter().map(|(_, seg)| seg).collect();

        // Diagnostics: per-segment and aggregate memory estimates.
        let total_docs: u64 = segments.iter().map(|s| s.meta().num_docs as u64).sum();
        let mut total_mem = 0usize;
        for seg in &segments {
            let stats = seg.memory_stats();
            let seg_total = stats.total_bytes();
            total_mem += seg_total;
            log::info!(
                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
                (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
                stats.segment_id,
                stats.num_docs,
                seg_total as f64 / (1024.0 * 1024.0),
                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
            );
        }
        let rss_mb = process_rss_mb();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
            segments.len(),
            total_docs,
            total_mem as f64 / (1024.0 * 1024.0),
            rss_mb,
        );

        Ok(segments)
    }
331
332 fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
334 if !schema.default_fields().is_empty() {
335 schema.default_fields().to_vec()
336 } else {
337 schema
338 .fields()
339 .filter(|(_, entry)| {
340 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
341 })
342 .map(|(field, _)| field)
343 .collect()
344 }
345 }
346
    /// The schema this searcher was opened with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
351
    /// All open segment readers, in load order.
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }
356
    /// Fields searched when a query does not name any explicitly.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }
361
    /// The tokenizer registry used by the query parser.
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }
366
    /// Pre-trained coarse centroids shared with the segment readers.
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }
371
    /// Lazily computed corpus-wide statistics over all segments.
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }
376
377 fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
379 let mut segment_map = FxHashMap::default();
380 let mut total = 0u32;
381 for (i, seg) in segments.iter().enumerate() {
382 segment_map.insert(seg.meta().id, i);
383 total = total.saturating_add(seg.meta().num_docs);
384 }
385 (segment_map, total)
386 }
387
    /// Total number of documents across all segments (saturating sum).
    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }
392
    /// Mapping from segment id to index into `segment_readers()`.
    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }
397
    /// Number of open segments.
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }
402
403 pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
405 if let Some(&idx) = self.segment_map.get(&segment_id) {
406 return self.segments[idx].doc(doc_id).await;
407 }
408 Ok(None)
409 }
410
411 pub async fn search(
413 &self,
414 query: &dyn crate::query::Query,
415 limit: usize,
416 ) -> Result<Vec<crate::query::SearchResult>> {
417 let (results, _) = self.search_with_count(query, limit).await?;
418 Ok(results)
419 }
420
    /// Like [`Self::search`], but also returns the match count aggregated
    /// across segments alongside the top `limit` results.
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }
430
431 pub async fn search_with_offset(
433 &self,
434 query: &dyn crate::query::Query,
435 limit: usize,
436 offset: usize,
437 ) -> Result<Vec<crate::query::SearchResult>> {
438 let (results, _) = self
439 .search_with_offset_and_count(query, limit, offset)
440 .await?;
441 Ok(results)
442 }
443
    /// Paginated search returning both the result page and the match count
    /// aggregated across segments.
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }
453
    /// Like [`Self::search_with_count`], but asks each segment to collect
    /// term positions for the matching documents as well.
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }
464
    /// Shared search driver: fans `query` out to every segment (fetching
    /// `offset + limit` hits from each), stamps hits with their segment id,
    /// and merges the per-segment batches into a single ranked page.
    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        // Any single segment could own all of the global top hits, so every
        // segment must be asked for a full page worth of results.
        let fetch_limit = offset + limit;

        #[cfg(feature = "sync")]
        // Prefer the rayon path when available. block_in_place is only legal
        // on the multi-thread tokio runtime flavor, hence the flavor check.
        if !self.segments.is_empty()
            && tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread
        {
            return self.search_internal_parallel(query, fetch_limit, offset, collect_positions);
        }

        // Async fallback: query all segments concurrently on the runtime.
        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    // Stamp each hit with its segment of origin.
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut total_seen: u32 = 0;

        // Keep only non-empty batches for the merge; accumulate match counts.
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for (batch, segment_seen) in batches {
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }
534
    /// Rayon-backed variant of [`Self::search_internal`] used on
    /// multi-threaded tokio runtimes: searches all segments in parallel via
    /// the synchronous segment-search entry points.
    #[cfg(feature = "sync")]
    fn search_internal_parallel(
        &self,
        query: &dyn crate::query::Query,
        fetch_limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        // block_in_place moves this thread off the async scheduler while the
        // rayon pool runs the per-segment searches.
        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> =
            tokio::task::block_in_place(|| {
                self.segments
                    .par_iter()
                    .map(|segment| {
                        let sid = segment.meta().id;
                        let (mut results, segment_seen) = if collect_positions {
                            crate::query::search_segment_with_positions_and_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        } else {
                            crate::query::search_segment_with_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        };
                        // Stamp each hit with its segment of origin.
                        for r in &mut results {
                            r.segment_id = sid;
                        }
                        Ok((results, segment_seen))
                    })
                    .collect()
            });

        // Fold the per-segment batches: accumulate counts, drop empties, and
        // surface the first error, if any.
        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }
590
    /// Fully synchronous paginated search: runs every segment on the rayon
    /// pool with the sync entry points (no positions collected) and merges
    /// the batches into one ranked page plus the aggregated match count.
    #[cfg(feature = "sync")]
    pub fn search_with_offset_and_count_sync(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        // Each segment must return enough hits to cover the requested page.
        let fetch_limit = offset + limit;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> = self
            .segments
            .par_iter()
            .map(|segment| {
                let sid = segment.meta().id;
                let (mut results, segment_seen) = crate::query::search_segment_with_count_sync(
                    segment.as_ref(),
                    query,
                    fetch_limit,
                )?;
                // Stamp each hit with its segment of origin.
                for r in &mut results {
                    r.segment_id = sid;
                }
                Ok((results, segment_seen))
            })
            .collect();

        // Fold the per-segment batches: accumulate counts, drop empties, and
        // surface the first error, if any.
        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }
636
    /// Two-stage retrieval: fetches `l1_limit` candidates with the base
    /// query, then reranks them down to `final_limit` using `config`.
    ///
    /// The returned count is the first-stage aggregated match count.
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }
652
    /// Parses `query_str` with this searcher's query parser and runs it,
    /// returning the top `limit` hits.
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }
661
662 pub async fn query_offset(
664 &self,
665 query_str: &str,
666 limit: usize,
667 offset: usize,
668 ) -> Result<crate::query::SearchResponse> {
669 let parser = self.query_parser();
670 let query = parser
671 .parse(query_str)
672 .map_err(crate::error::Error::Query)?;
673
674 let (results, _total_seen) = self
675 .search_internal(query.as_ref(), limit, offset, false)
676 .await?;
677
678 let total_hits = results.len() as u32;
679 let hits: Vec<crate::query::SearchHit> = results
680 .into_iter()
681 .map(|result| crate::query::SearchHit {
682 address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
683 score: result.score,
684 matched_fields: result.extract_ordinals(),
685 })
686 .collect();
687
688 Ok(crate::query::SearchResponse { hits, total_hits })
689 }
690
691 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
693 let query_routers = self.schema.query_routers();
694 if !query_routers.is_empty()
695 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
696 {
697 return crate::dsl::QueryLanguageParser::with_router(
698 Arc::clone(&self.schema),
699 self.default_fields.clone(),
700 Arc::clone(&self.tokenizers),
701 router,
702 );
703 }
704
705 crate::dsl::QueryLanguageParser::new(
706 Arc::clone(&self.schema),
707 self.default_fields.clone(),
708 Arc::clone(&self.tokenizers),
709 )
710 }
711
    /// Fetches the full stored document at `address`, or `None` when the
    /// address references an unknown segment.
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }
719
720 pub async fn get_document_with_fields(
726 &self,
727 address: &crate::query::DocAddress,
728 fields: Option<&rustc_hash::FxHashSet<u32>>,
729 ) -> Result<Option<crate::dsl::Document>> {
730 let segment_id = address.segment_id_u128().ok_or_else(|| {
731 crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id()))
732 })?;
733
734 if let Some(&idx) = self.segment_map.get(&segment_id) {
735 return self.segments[idx]
736 .doc_with_fields(address.doc_id, fields)
737 .await;
738 }
739
740 Ok(None)
741 }
742}
743
/// K-way merges per-segment result batches (each assumed sorted by descending
/// score) into one ranked list, skipping the first `offset` merged entries
/// and returning at most `fetch_limit - offset` results.
fn merge_segment_results(
    sorted_batches: Vec<Vec<crate::query::SearchResult>>,
    fetch_limit: usize,
    offset: usize,
) -> Vec<crate::query::SearchResult> {
    use std::cmp::Ordering;

    // Heap entry pointing at the next unconsumed result of one batch.
    struct MergeEntry {
        score: f32,
        batch_idx: usize,
        pos: usize,
    }
    impl PartialEq for MergeEntry {
        fn eq(&self, other: &Self) -> bool {
            self.score == other.score
        }
    }
    impl Eq for MergeEntry {}
    impl PartialOrd for MergeEntry {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Ord for MergeEntry {
        // Order by score only; NaN falls back to Equal so comparisons never
        // panic (ties are broken arbitrarily by heap order).
        fn cmp(&self, other: &Self) -> Ordering {
            self.score
                .partial_cmp(&other.score)
                .unwrap_or(Ordering::Equal)
        }
    }

    // Seed the max-heap with the head of every non-empty batch.
    let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
    for (i, batch) in sorted_batches.iter().enumerate() {
        if !batch.is_empty() {
            heap.push(MergeEntry {
                score: batch[0].score,
                batch_idx: i,
                pos: 0,
            });
        }
    }

    // Pop globally-best entries one at a time. `emitted` counts every popped
    // entry, including the first `offset` that are skipped rather than kept.
    let mut results = Vec::with_capacity(fetch_limit.min(64));
    let mut emitted = 0usize;
    while let Some(entry) = heap.pop() {
        if emitted >= fetch_limit {
            break;
        }
        let batch = &sorted_batches[entry.batch_idx];
        if emitted >= offset {
            results.push(batch[entry.pos].clone());
        }
        emitted += 1;
        // Advance the source batch and re-seed the heap with its next entry.
        let next_pos = entry.pos + 1;
        if next_pos < batch.len() {
            heap.push(MergeEntry {
                score: batch[next_pos].score,
                batch_idx: entry.batch_idx,
                pos: next_pos,
            });
        }
    }

    results
}
813
/// Best-effort resident-set size of the current process in MiB, used only for
/// the segment-load log line. Returns 0.0 when the value cannot be read.
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        // Parse the "VmRSS: <n> kB" line from /proc/self/status.
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            for line in status.lines() {
                if let Some(rest) = line.strip_prefix("VmRSS:") {
                    let kb: f64 = rest
                        .trim()
                        .trim_end_matches("kB")
                        .trim()
                        .parse()
                        .unwrap_or(0.0);
                    return kb / 1024.0;
                }
            }
        }
        0.0
    }
    #[cfg(target_os = "macos")]
    {
        use std::mem;

        // Minimal mirror of mach's task info struct; only `resident_size` is
        // read. NOTE(review): layout should match `mach_task_basic_info` for
        // the MACH_TASK_BASIC_INFO flavor — confirm against <mach/task_info.h>.
        #[repr(C)]
        struct TaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(task: u32, flavor: u32, info: *mut TaskBasicInfo, count: *mut u32) -> i32;
        }
        // Flavor constant from <mach/task_info.h>.
        const MACH_TASK_BASIC_INFO: u32 = 20;
        // SAFETY: TaskBasicInfo is plain-old-data; an all-zero value is valid.
        let mut info: TaskBasicInfo = unsafe { mem::zeroed() };
        let mut count = (mem::size_of::<TaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        // SAFETY: `info` and `count` are valid, exclusive out-pointers, and
        // `count` holds the buffer size in u32 words as task_info expects.
        let ret = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut info,
                &mut count,
            )
        };
        // 0 == KERN_SUCCESS.
        if ret == 0 {
            info.resident_size as f64 / (1024.0 * 1024.0)
        } else {
            0.0
        }
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}