use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::Directory;
use crate::dsl::Schema;
use crate::error::Result;
use crate::query::LazyGlobalStats;
use crate::segment::{SegmentId, SegmentReader};
#[cfg(feature = "native")]
use crate::segment::{SegmentSnapshot, SegmentTracker};
use crate::structures::CoarseCentroids;
/// A point-in-time view over a set of immutable segments.
pub struct Searcher<D: Directory + 'static> {
    /// Keeps the snapshotted segments registered with their tracker for as
    /// long as this searcher is alive.
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    /// Ties the searcher to its directory type without storing it.
    _phantom: std::marker::PhantomData<D>,
    /// Open readers, one per segment, in the order they were supplied.
    segments: Vec<Arc<SegmentReader>>,
    schema: Arc<Schema>,
    /// Fields searched when a query does not name a field explicitly.
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Pre-trained coarse centroids, keyed by field id, shared across segments.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Corpus-wide statistics, computed lazily on first use.
    global_stats: Arc<LazyGlobalStats>,
    /// Maps a segment id to its index in `segments`.
    segment_map: FxHashMap<u128, usize>,
    total_docs: u32,
}

impl<D: Directory + 'static> Searcher<D> {
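    /// Opens a searcher over the named segments in `directory`.
    ///
    /// A minimal sketch of typical usage; the directory, schema, and segment
    /// id string below are placeholders, not values this crate provides:
    ///
    /// ```ignore
    /// let searcher = Searcher::open(
    ///     Arc::clone(&directory),                            // any Directory impl
    ///     Arc::clone(&schema),
    ///     &["00000000000000000000000000000001".to_string()], // hex segment ids
    ///     1024,                                              // term cache blocks
    /// )
    /// .await?;
    /// ```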
    pub async fn open(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        term_cache_blocks: usize,
    ) -> Result<Self> {
        Self::create(
            directory,
            schema,
            segment_ids,
            FxHashMap::default(),
            term_cache_blocks,
        )
        .await
    }

    /// Builds a searcher from an existing [`SegmentSnapshot`], injecting
    /// already-trained centroids instead of loading them again.
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
        )
        .await?;

        Ok(Self {
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    async fn create(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            segment_ids,
            &trained_centroids,
            term_cache_blocks,
        )
        .await?;

        // Stand-alone opens get a fresh tracker; snapshot-based opens reuse
        // the caller's snapshot via `from_snapshot`.
        #[cfg(feature = "native")]
        let _snapshot = {
            let tracker = Arc::new(SegmentTracker::new());
            SegmentSnapshot::new(tracker, segment_ids.to_vec())
        };

        Ok(Self {
            #[cfg(feature = "native")]
            _snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

    async fn load_common(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<(
        Vec<Arc<SegmentReader>>,
        Vec<crate::Field>,
        Arc<LazyGlobalStats>,
        FxHashMap<u128, usize>,
        u32,
    )> {
        let segments = Self::load_segments(
            directory,
            schema,
            segment_ids,
            trained_centroids,
            term_cache_blocks,
        )
        .await?;
        let default_fields = Self::build_default_fields(schema);
        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
        let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
        Ok((
            segments,
            default_fields,
            global_stats,
            segment_map,
            total_docs,
        ))
    }

    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Vec<Arc<SegmentReader>>> {
        // Keep the original position of every id that parses, so ordering
        // can be restored after the concurrent loads complete.
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        // Open all segments concurrently.
        let futures: Vec<_> = valid_segments
            .iter()
            .map(|(_, segment_id)| {
                let dir = Arc::clone(directory);
                let sch = Arc::clone(schema);
                let sid = *segment_id;
                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;

        // Fail fast on the first segment that could not be opened.
        let mut loaded: Vec<(usize, SegmentReader)> = Vec::with_capacity(valid_segments.len());
        for ((idx, sid), result) in valid_segments.into_iter().zip(results) {
            match result {
                Ok(reader) => loaded.push((idx, reader)),
                Err(e) => {
                    return Err(crate::error::Error::Internal(format!(
                        "Failed to open segment {:016x}: {:?}",
                        sid.0, e
                    )));
                }
            }
        }

        // Restore the caller-supplied ordering.
        loaded.sort_by_key(|(idx, _)| *idx);

        let mut segments = Vec::with_capacity(loaded.len());
        for (_, mut reader) in loaded {
            if !trained_centroids.is_empty() {
                reader.set_coarse_centroids(trained_centroids.clone());
            }
            segments.push(Arc::new(reader));
        }

        // Log per-segment and aggregate memory usage.
        let total_docs: u64 = segments.iter().map(|s| s.meta().num_docs as u64).sum();
        let mut total_mem = 0usize;
        for seg in &segments {
            let stats = seg.memory_stats();
            let seg_total = stats.total_bytes();
            total_mem += seg_total;
            log::info!(
                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
                 (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
                stats.segment_id,
                stats.num_docs,
                seg_total as f64 / (1024.0 * 1024.0),
                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
            );
        }
        let rss_mb = process_rss_mb();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
            segments.len(),
            total_docs,
            total_mem as f64 / (1024.0 * 1024.0),
            rss_mb,
        );

        Ok(segments)
    }

    /// Returns the schema's configured default fields, falling back to every
    /// indexed text field when none are configured.
    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
        if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }

    /// Builds the segment-id lookup table and the document total.
    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
        let mut segment_map = FxHashMap::default();
        let mut total = 0u32;
        for (i, seg) in segments.iter().enumerate() {
            segment_map.insert(seg.meta().id, i);
            total = total.saturating_add(seg.meta().num_docs);
        }
        (segment_map, total)
    }

    /// Total number of documents across all segments (saturating at `u32::MAX`).
    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }

    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }

    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

    /// Fetches a stored document by segment id and local doc id, returning
    /// `Ok(None)` when the segment is not part of this searcher.
    pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx].doc(doc_id).await;
        }
        Ok(None)
    }

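    /// Runs `query` against every segment and returns the top `limit` hits
    /// merged by descending score.
    ///
    /// A minimal sketch; `TermQuery` and `field` stand in for whatever query
    /// type and field handle the caller already has:
    ///
    /// ```ignore
    /// let query = TermQuery::new(field, "rust"); // illustrative query type
    /// let hits = searcher.search(&query, 10).await?;
    /// for hit in &hits {
    ///     println!("doc {} in segment {:032x}: {}", hit.doc_id, hit.segment_id, hit.score);
    /// }
    /// ```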
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self.search_with_count(query, limit).await?;
        Ok(results)
    }

    /// Like [`search`](Self::search), but also returns the total number of
    /// matching documents observed across segments.
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }

    /// Like [`search`](Self::search), with an `offset` for pagination.
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self
            .search_with_offset_and_count(query, limit, offset)
            .await?;
        Ok(results)
    }

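    /// Returns one page of results plus the total number of matches seen.
    ///
    /// `offset` hits are skipped after the global merge, so page N costs as
    /// much as fetching `offset + limit` hits. A pagination sketch (the
    /// query value is assumed to exist):
    ///
    /// ```ignore
    /// let page_size = 10;
    /// // Second page: skip the first `page_size` hits.
    /// let (page, total) = searcher
    ///     .search_with_offset_and_count(&query, page_size, page_size)
    ///     .await?;
    /// ```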
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Like [`search_with_count`](Self::search_with_count), but each result
    /// also carries term positions (useful for highlighting, for example).
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }

    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        // Every segment must return offset + limit hits so that the global
        // merge can skip `offset` of them and still fill a full page.
        let fetch_limit = offset + limit;

        // On a multi-threaded runtime, fan segments out across rayon workers
        // instead of polling them sequentially on one task.
        #[cfg(feature = "sync")]
        if self.segments.len() > 1
            && tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread
        {
            return self.search_internal_parallel(query, fetch_limit, offset, collect_positions);
        }

        // Search each segment, tagging every hit with its segment id.
        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut total_seen: u32 = 0;

        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for (batch, segment_seen) in batches {
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

    /// Rayon-parallel variant of [`search_internal`](Self::search_internal):
    /// segments are searched on the rayon pool inside `block_in_place` so the
    /// blocking work does not stall the current tokio worker.
    #[cfg(feature = "sync")]
    fn search_internal_parallel(
        &self,
        query: &dyn crate::query::Query,
        fetch_limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> =
            tokio::task::block_in_place(|| {
                self.segments
                    .par_iter()
                    .map(|segment| {
                        let sid = segment.meta().id;
                        let (mut results, segment_seen) = if collect_positions {
                            crate::query::search_segment_with_positions_and_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        } else {
                            crate::query::search_segment_with_count_sync(
                                segment.as_ref(),
                                query,
                                fetch_limit,
                            )?
                        };
                        for r in &mut results {
                            r.segment_id = sid;
                        }
                        Ok((results, segment_seen))
                    })
                    .collect()
            });

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

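    /// Blocking counterpart of
    /// [`search_with_offset_and_count`](Self::search_with_offset_and_count)
    /// for callers without an async runtime; segments are searched in
    /// parallel via rayon.
    ///
    /// A minimal sketch (the query value is assumed to exist):
    ///
    /// ```ignore
    /// let (hits, total) = searcher.search_with_offset_and_count_sync(&query, 10, 0)?;
    /// ```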
    #[cfg(feature = "sync")]
    pub fn search_with_offset_and_count_sync(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        use rayon::prelude::*;

        let fetch_limit = offset + limit;

        let batches: Vec<Result<(Vec<crate::query::SearchResult>, u32)>> = self
            .segments
            .par_iter()
            .map(|segment| {
                let sid = segment.meta().id;
                let (mut results, segment_seen) = crate::query::search_segment_with_count_sync(
                    segment.as_ref(),
                    query,
                    fetch_limit,
                )?;
                for r in &mut results {
                    r.segment_id = sid;
                }
                Ok((results, segment_seen))
            })
            .collect();

        let mut total_seen: u32 = 0;
        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for result in batches {
            let (batch, segment_seen) = result?;
            total_seen = total_seen.saturating_add(segment_seen);
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        let results = merge_segment_results(sorted_batches, fetch_limit, offset);
        Ok((results, total_seen))
    }

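    /// Two-stage retrieval: a first pass collects up to `l1_limit`
    /// candidates, then [`crate::query::rerank`] re-scores them down to
    /// `final_limit`.
    ///
    /// A sketch of the intended call pattern; the `RerankerConfig::default()`
    /// constructor is an assumption for illustration:
    ///
    /// ```ignore
    /// let config = RerankerConfig::default(); // assumed to exist
    /// let (hits, total) = searcher
    ///     .search_and_rerank(&query, 200, 10, &config)
    ///     .await?;
    /// assert!(hits.len() <= 10);
    /// ```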
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }

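    /// Parses `query_str` with this searcher's query parser and executes it,
    /// returning hits addressed by [`DocAddress`](crate::query::DocAddress).
    ///
    /// A minimal sketch; the query syntax shown is only illustrative:
    ///
    /// ```ignore
    /// let response = searcher.query("title:rust AND body:search", 10).await?;
    /// for hit in &response.hits {
    ///     println!("{:?} scored {}", hit.address, hit.score);
    /// }
    /// ```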
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;

        let (results, _total_seen) = self
            .search_internal(query.as_ref(), limit, offset, false)
            .await?;

        // `total_hits` counts the hits returned for this page, not the total
        // number of matches observed across segments.
        let total_hits = results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = results
            .into_iter()
            .map(|result| crate::query::SearchHit {
                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

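    /// Builds a query parser bound to this searcher's schema, default
    /// fields, and tokenizers. When the schema defines query-routing rules,
    /// the parser is constructed with the matching field router.
    ///
    /// A sketch of standalone use; the query syntax is illustrative:
    ///
    /// ```ignore
    /// let parser = searcher.query_parser();
    /// let query = parser.parse("title:rust")?;
    /// let hits = searcher.search(query.as_ref(), 5).await?;
    /// ```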
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                self.default_fields.clone(),
                Arc::clone(&self.tokenizers),
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

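    /// Resolves a [`DocAddress`](crate::query::DocAddress) from an earlier
    /// search into the stored document, returning `Ok(None)` if the segment
    /// is not part of this searcher.
    ///
    /// A sketch of a query-then-fetch round trip (query syntax illustrative):
    ///
    /// ```ignore
    /// let response = searcher.query("hello", 1).await?;
    /// if let Some(hit) = response.hits.first() {
    ///     let doc = searcher.get_document(&hit.address).await?;
    /// }
    /// ```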
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }

    /// Like [`get_document`](Self::get_document), but optionally restricts
    /// decoding to the given set of field ids.
    pub async fn get_document_with_fields(
        &self,
        address: &crate::query::DocAddress,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<crate::dsl::Document>> {
        let segment_id = address.segment_id_u128().ok_or_else(|| {
            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id()))
        })?;

        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx]
                .doc_with_fields(address.doc_id, fields)
                .await;
        }

        Ok(None)
    }
}

/// K-way merge of per-segment result batches, each already sorted by
/// descending score. A max-heap holds the current head of every batch;
/// popping the best entry and pushing its successor yields globally sorted
/// results in O(n log k). The first `offset` hits are skipped and at most
/// `fetch_limit - offset` are returned.
fn merge_segment_results(
    sorted_batches: Vec<Vec<crate::query::SearchResult>>,
    fetch_limit: usize,
    offset: usize,
) -> Vec<crate::query::SearchResult> {
    use std::cmp::Ordering;

    struct MergeEntry {
        score: f32,
        batch_idx: usize,
        pos: usize,
    }
    impl PartialEq for MergeEntry {
        fn eq(&self, other: &Self) -> bool {
            self.score == other.score
        }
    }
    impl Eq for MergeEntry {}
    impl PartialOrd for MergeEntry {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Ord for MergeEntry {
        fn cmp(&self, other: &Self) -> Ordering {
            // NaN scores compare as equal so the heap ordering stays total.
            self.score
                .partial_cmp(&other.score)
                .unwrap_or(Ordering::Equal)
        }
    }

    // Seed the heap with the top entry of every non-empty batch.
    let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
    for (i, batch) in sorted_batches.iter().enumerate() {
        if !batch.is_empty() {
            heap.push(MergeEntry {
                score: batch[0].score,
                batch_idx: i,
                pos: 0,
            });
        }
    }

    let mut results = Vec::with_capacity(fetch_limit.min(64));
    let mut emitted = 0usize;
    while let Some(entry) = heap.pop() {
        if emitted >= fetch_limit {
            break;
        }
        let batch = &sorted_batches[entry.batch_idx];
        // Entries before `offset` are counted but not collected.
        if emitted >= offset {
            results.push(batch[entry.pos].clone());
        }
        emitted += 1;
        let next_pos = entry.pos + 1;
        if next_pos < batch.len() {
            heap.push(MergeEntry {
                score: batch[next_pos].score,
                batch_idx: entry.batch_idx,
                pos: next_pos,
            });
        }
    }

    results
}

/// Best-effort resident set size of the current process in MiB, used only
/// for the load-time log line. Returns 0.0 on unsupported platforms or on
/// any read/parse failure.
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        // Parse the `VmRSS:` line (reported in kB) from /proc/self/status.
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            for line in status.lines() {
                if let Some(rest) = line.strip_prefix("VmRSS:") {
                    let kb: f64 = rest
                        .trim()
                        .trim_end_matches("kB")
                        .trim()
                        .parse()
                        .unwrap_or(0.0);
                    return kb / 1024.0;
                }
            }
        }
        0.0
    }
    #[cfg(target_os = "macos")]
    {
        use std::mem;

        // Mirrors the layout of mach's `mach_task_basic_info`.
        #[repr(C)]
        struct TaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(task: u32, flavor: u32, info: *mut TaskBasicInfo, count: *mut u32) -> i32;
        }
        const MACH_TASK_BASIC_INFO: u32 = 20;
        let mut info: TaskBasicInfo = unsafe { mem::zeroed() };
        let mut count = (mem::size_of::<TaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        let ret = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut info,
                &mut count,
            )
        };
        if ret == 0 {
            info.resident_size as f64 / (1024.0 * 1024.0)
        } else {
            0.0
        }
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}