use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::Directory;
use crate::dsl::Schema;
use crate::error::Result;
use crate::query::LazyGlobalStats;
use crate::segment::{SegmentId, SegmentReader};
#[cfg(feature = "native")]
use crate::segment::{SegmentSnapshot, SegmentTracker};
use crate::structures::CoarseCentroids;

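/// A point-in-time view over a set of immutable segments.
///
/// Holds the opened [`SegmentReader`]s together with derived state: the
/// default query fields, lazily computed corpus-wide statistics for scoring,
/// and a segment-id -> index lookup table. On native builds it also keeps a
/// [`SegmentSnapshot`] alive so the underlying segments stay tracked for the
/// searcher's lifetime.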
pub struct Searcher<D: Directory + 'static> {
    #[cfg(feature = "native")]
    _snapshot: SegmentSnapshot,
    _phantom: std::marker::PhantomData<D>,
    segments: Vec<Arc<SegmentReader>>,
    schema: Arc<Schema>,
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    global_stats: Arc<LazyGlobalStats>,
    segment_map: FxHashMap<u128, usize>,
    total_docs: u32,
}

impl<D: Directory + 'static> Searcher<D> {
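    /// Opens a searcher over the given hex-encoded segment IDs.
    ///
    /// A minimal usage sketch; `dir`, `schema`, and `segment_ids` are assumed
    /// to come from your own setup, and `256` is an illustrative term-cache
    /// size:
    ///
    /// ```ignore
    /// let searcher = Searcher::open(dir, schema, &segment_ids, 256).await?;
    /// println!("{} docs in {} segments", searcher.num_docs(), searcher.num_segments());
    /// ```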
    pub async fn open(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        term_cache_blocks: usize,
    ) -> Result<Self> {
        Self::create(
            directory,
            schema,
            segment_ids,
            FxHashMap::default(),
            term_cache_blocks,
        )
        .await
    }

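    /// Builds a searcher from an existing [`SegmentSnapshot`], reusing any
    /// pre-trained [`CoarseCentroids`].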
    #[cfg(feature = "native")]
    pub(crate) async fn from_snapshot(
        directory: Arc<D>,
        schema: Arc<Schema>,
        snapshot: SegmentSnapshot,
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            snapshot.segment_ids(),
            &trained_centroids,
            term_cache_blocks,
        )
        .await;

        Ok(Self {
            _snapshot: snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

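    /// Shared constructor behind [`Searcher::open`]: loads segments, then
    /// assembles the searcher state (and, on native builds, a fresh snapshot).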
    async fn create(
        directory: Arc<D>,
        schema: Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Result<Self> {
        let (segments, default_fields, global_stats, segment_map, total_docs) = Self::load_common(
            &directory,
            &schema,
            segment_ids,
            &trained_centroids,
            term_cache_blocks,
        )
        .await;

        #[cfg(feature = "native")]
        let _snapshot = {
            let tracker = Arc::new(SegmentTracker::new());
            SegmentSnapshot::new(tracker, segment_ids.to_vec())
        };

        // `directory` is only needed while loading; consume it explicitly so
        // builds with no further use of it do not warn.
        let _ = directory;

        Ok(Self {
            #[cfg(feature = "native")]
            _snapshot,
            _phantom: std::marker::PhantomData,
            segments,
            schema,
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            trained_centroids,
            global_stats,
            segment_map,
            total_docs,
        })
    }

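    /// Loads segments and derives everything both constructors need: readers,
    /// default query fields, lazy global stats, and the segment lookup table
    /// with the total document count.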
    async fn load_common(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> (
        Vec<Arc<SegmentReader>>,
        Vec<crate::Field>,
        Arc<LazyGlobalStats>,
        FxHashMap<u128, usize>,
        u32,
    ) {
        let segments = Self::load_segments(
            directory,
            schema,
            segment_ids,
            trained_centroids,
            term_cache_blocks,
        )
        .await;
        let default_fields = Self::build_default_fields(schema);
        let global_stats = Arc::new(LazyGlobalStats::new(segments.clone()));
        let (segment_map, total_docs) = Self::build_lookup_tables(&segments);
        (
            segments,
            default_fields,
            global_stats,
            segment_map,
            total_docs,
        )
    }

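    /// Opens all segments concurrently, preserving the order of `segment_ids`.
    ///
    /// IDs that fail to parse as hex are skipped; a segment that parses but
    /// fails to open is fatal (see the `panic!` below), since serving with a
    /// partial index would silently drop documents.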
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        segment_ids: &[String],
        trained_centroids: &FxHashMap<u32, Arc<CoarseCentroids>>,
        term_cache_blocks: usize,
    ) -> Vec<Arc<SegmentReader>> {
        // Parse IDs up front; malformed hex entries are dropped here.
        let valid_segments: Vec<(usize, SegmentId)> = segment_ids
            .iter()
            .enumerate()
            .filter_map(|(idx, id_str)| SegmentId::from_hex(id_str).map(|sid| (idx, sid)))
            .collect();

        // Open every segment concurrently.
        let futures: Vec<_> = valid_segments
            .iter()
            .map(|(_, segment_id)| {
                let dir = Arc::clone(directory);
                let sch = Arc::clone(schema);
                let sid = *segment_id;
                async move { SegmentReader::open(dir.as_ref(), sid, sch, term_cache_blocks).await }
            })
            .collect();

        let results = futures::future::join_all(futures).await;

        let mut loaded: Vec<(usize, SegmentReader)> = Vec::with_capacity(valid_segments.len());
        for ((idx, sid), result) in valid_segments.into_iter().zip(results) {
            match result {
                Ok(reader) => loaded.push((idx, reader)),
                Err(e) => {
                    panic!(
                        "Failed to open segment {:016x}: {:?}. \
                         Refusing to serve with incomplete data.",
                        sid.0, e
                    );
                }
            }
        }

        // `loaded` is already in input order (join_all preserves order);
        // sort defensively to keep the result deterministic.
        loaded.sort_by_key(|(idx, _)| *idx);

        let mut segments = Vec::with_capacity(loaded.len());
        for (_, mut reader) in loaded {
            if !trained_centroids.is_empty() {
                reader.set_coarse_centroids(trained_centroids.clone());
            }
            segments.push(Arc::new(reader));
        }

        let total_docs: u32 = segments.iter().map(|s| s.meta().num_docs).sum();
        let mut total_mem = 0usize;
        for seg in &segments {
            let stats = seg.memory_stats();
            let seg_total = stats.total_bytes();
            total_mem += seg_total;
            log::info!(
                "[searcher] segment {:016x}: docs={}, mem={:.2} MB \
                 (term_dict={:.2} MB, store={:.2} MB, sparse={:.2} MB, dense={:.2} MB, bloom={:.2} MB)",
                stats.segment_id,
                stats.num_docs,
                seg_total as f64 / (1024.0 * 1024.0),
                stats.term_dict_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.store_cache_bytes as f64 / (1024.0 * 1024.0),
                stats.sparse_index_bytes as f64 / (1024.0 * 1024.0),
                stats.dense_index_bytes as f64 / (1024.0 * 1024.0),
                stats.bloom_filter_bytes as f64 / (1024.0 * 1024.0),
            );
        }
        let rss_mb = process_rss_mb();
        log::info!(
            "[searcher] loaded {} segments: total_docs={}, estimated_mem={:.2} MB, process_rss={:.1} MB",
            segments.len(),
            total_docs,
            total_mem as f64 / (1024.0 * 1024.0),
            rss_mb,
        );

        segments
    }

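    /// Returns the schema's configured default query fields, falling back to
    /// all indexed text fields when none are configured.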
    fn build_default_fields(schema: &Schema) -> Vec<crate::Field> {
        if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    /// The schema this searcher was opened with.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// The opened segment readers, in load order.
    pub fn segment_readers(&self) -> &[Arc<SegmentReader>] {
        &self.segments
    }

    /// Fields queried when a query string does not name a field.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// The tokenizer registry used for query parsing.
    pub fn tokenizers(&self) -> &crate::tokenizer::TokenizerRegistry {
        &self.tokenizers
    }

    /// Pre-trained coarse centroids, if any were supplied.
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Lazily computed corpus-wide statistics shared across queries.
    pub fn global_stats(&self) -> &Arc<LazyGlobalStats> {
        &self.global_stats
    }

    /// Maps each segment ID to its index in `segments` and sums doc counts.
    fn build_lookup_tables(segments: &[Arc<SegmentReader>]) -> (FxHashMap<u128, usize>, u32) {
        let mut segment_map = FxHashMap::default();
        let mut total = 0u32;
        for (i, seg) in segments.iter().enumerate() {
            segment_map.insert(seg.meta().id, i);
            total = total.saturating_add(seg.meta().num_docs);
        }
        (segment_map, total)
    }

    /// Total number of documents across all open segments.
    pub fn num_docs(&self) -> u32 {
        self.total_docs
    }

    /// Mapping from segment ID to index into [`Searcher::segment_readers`].
    pub fn segment_map(&self) -> &FxHashMap<u128, usize> {
        &self.segment_map
    }

    /// Number of open segments.
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }

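    /// Fetches a stored document by `(segment_id, doc_id)`, returning
    /// `Ok(None)` when the segment is unknown.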
    pub async fn doc(&self, segment_id: u128, doc_id: u32) -> Result<Option<crate::dsl::Document>> {
        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx].doc(doc_id).await;
        }
        Ok(None)
    }

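    /// Runs a query against all segments and returns the top `limit` results.
    ///
    /// A hedged sketch; `make_query()` is a hypothetical stand-in for however
    /// you construct a [`crate::query::Query`]:
    ///
    /// ```ignore
    /// let query = make_query();
    /// let top10 = searcher.search(query.as_ref(), 10).await?;
    /// ```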
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self.search_with_count(query, limit).await?;
        Ok(results)
    }

    /// Like [`Searcher::search`], but also returns the total number of
    /// matches seen across segments.
    pub async fn search_with_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_with_offset_and_count(query, limit, 0).await
    }

    /// Like [`Searcher::search`], but skips the first `offset` hits.
    pub async fn search_with_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let (results, _) = self
            .search_with_offset_and_count(query, limit, offset)
            .await?;
        Ok(results)
    }

    /// Paginated search that also reports the total number of matches seen.
    pub async fn search_with_offset_and_count(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Search that additionally collects match position information for
    /// each hit.
    pub async fn search_with_positions(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        self.search_internal(query, limit, 0, true).await
    }

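    /// Core search path: queries every segment concurrently (each returns up
    /// to `offset + limit` hits, assumed sorted by descending score), then
    /// k-way-merges the batches and applies `offset` while emitting.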
    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        // Each segment must return enough hits to cover offset + limit.
        let fetch_limit = offset + limit;

        let futures: Vec<_> = self
            .segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let (mut results, segment_seen) = if collect_positions {
                        crate::query::search_segment_with_positions_and_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    } else {
                        crate::query::search_segment_with_count(
                            segment.as_ref(),
                            query,
                            fetch_limit,
                        )
                        .await?
                    };
                    // Tag each hit with its originating segment.
                    for r in &mut results {
                        r.segment_id = sid;
                    }
                    Ok::<_, crate::error::Error>((results, segment_seen))
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut total_seen: u32 = 0;

        // K-way merge of the per-segment batches. Each batch is assumed to be
        // sorted by descending score, so a max-heap holding the current head
        // of every batch always pops the next-best hit overall.
        use std::cmp::Ordering;
        struct MergeEntry {
            score: f32,
            batch_idx: usize,
            pos: usize,
        }
        impl PartialEq for MergeEntry {
            fn eq(&self, other: &Self) -> bool {
                self.score == other.score
            }
        }
        impl Eq for MergeEntry {}
        impl PartialOrd for MergeEntry {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for MergeEntry {
            fn cmp(&self, other: &Self) -> Ordering {
                // NaN scores compare as equal rather than poisoning the heap.
                self.score
                    .partial_cmp(&other.score)
                    .unwrap_or(Ordering::Equal)
            }
        }

        let mut sorted_batches: Vec<Vec<crate::query::SearchResult>> =
            Vec::with_capacity(batches.len());
        for (batch, segment_seen) in batches {
            total_seen += segment_seen;
            if !batch.is_empty() {
                sorted_batches.push(batch);
            }
        }

        // Seed the heap with the head of every non-empty batch.
        let mut heap = std::collections::BinaryHeap::with_capacity(sorted_batches.len());
        for (i, batch) in sorted_batches.iter().enumerate() {
            heap.push(MergeEntry {
                score: batch[0].score,
                batch_idx: i,
                pos: 0,
            });
        }

        let mut results = Vec::with_capacity(fetch_limit.min(total_seen as usize));
        let mut emitted = 0usize;
        while let Some(entry) = heap.pop() {
            if emitted >= fetch_limit {
                break;
            }
            let batch = &sorted_batches[entry.batch_idx];
            // Hits before `offset` are counted but not returned.
            if emitted >= offset {
                results.push(batch[entry.pos].clone());
            }
            emitted += 1;
            // Advance this batch and re-insert its next hit, if any.
            let next_pos = entry.pos + 1;
            if next_pos < batch.len() {
                heap.push(MergeEntry {
                    score: batch[next_pos].score,
                    batch_idx: entry.batch_idx,
                    pos: next_pos,
                });
            }
        }

        Ok((results, total_seen))
    }

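    /// Two-stage retrieval: fetch `l1_limit` candidates with the base query,
    /// then rerank them down to `final_limit` using `config`.
    ///
    /// A hedged sketch (assumes `query` and `rerank_config` built elsewhere):
    ///
    /// ```ignore
    /// let (hits, seen) = searcher
    ///     .search_and_rerank(query.as_ref(), 200, 10, &rerank_config)
    ///     .await?;
    /// ```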
    pub async fn search_and_rerank(
        &self,
        query: &dyn crate::query::Query,
        l1_limit: usize,
        final_limit: usize,
        config: &crate::query::RerankerConfig,
    ) -> Result<(Vec<crate::query::SearchResult>, u32)> {
        let (candidates, total_seen) = self.search_with_count(query, l1_limit).await?;
        let reranked = crate::query::rerank(self, &candidates, config, final_limit).await?;
        Ok((reranked, total_seen))
    }

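    /// Parses `query_str` with [`Searcher::query_parser`] and runs it.
    ///
    /// The query syntax below is illustrative, not a verified example of this
    /// crate's query language:
    ///
    /// ```ignore
    /// let response = searcher.query("title:rust", 10).await?;
    /// for hit in &response.hits {
    ///     println!("score={}", hit.score);
    /// }
    /// ```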
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

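    /// Like [`Searcher::query`], but skips the first `offset` hits for
    /// pagination.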
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;

        let (results, _total_seen) = self
            .search_internal(query.as_ref(), limit, offset, false)
            .await?;

        // Note: `total_hits` counts the hits on this page, not `_total_seen`
        // (the total matches observed across segments).
        let total_hits = results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = results
            .into_iter()
            .map(|result| crate::query::SearchHit {
                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

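    /// Builds a query parser for this searcher's schema, attaching a
    /// [`crate::dsl::QueryFieldRouter`] when the schema defines routing rules.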
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                self.default_fields.clone(),
                Arc::clone(&self.tokenizers),
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

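    /// Fetches the full stored document at `address`.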
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        self.get_document_with_fields(address, None).await
    }

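    /// Fetches a stored document, optionally restricted to the given field
    /// set; returns `Ok(None)` when the segment is unknown.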
    pub async fn get_document_with_fields(
        &self,
        address: &crate::query::DocAddress,
        fields: Option<&rustc_hash::FxHashSet<u32>>,
    ) -> Result<Option<crate::dsl::Document>> {
        let segment_id = address.segment_id_u128().ok_or_else(|| {
            crate::error::Error::Query(format!("Invalid segment ID: {}", address.segment_id))
        })?;

        if let Some(&idx) = self.segment_map.get(&segment_id) {
            return self.segments[idx]
                .doc_with_fields(address.doc_id, fields)
                .await;
        }

        Ok(None)
    }
}

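/// Best-effort resident set size of the current process in MiB, used only for
/// the load-time log line. Reads `/proc/self/status` on Linux, queries
/// `task_info` on macOS, and returns `0.0` elsewhere or on failure.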
fn process_rss_mb() -> f64 {
    #[cfg(target_os = "linux")]
    {
        // Parse the `VmRSS:` line (reported in kB) from /proc/self/status.
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            for line in status.lines() {
                if let Some(rest) = line.strip_prefix("VmRSS:") {
                    let kb: f64 = rest
                        .trim()
                        .trim_end_matches("kB")
                        .trim()
                        .parse()
                        .unwrap_or(0.0);
                    return kb / 1024.0;
                }
            }
        }
        0.0
    }
    #[cfg(target_os = "macos")]
    {
        use std::mem;

        // Minimal hand-rolled binding for mach's MACH_TASK_BASIC_INFO query.
        #[repr(C)]
        struct TaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time: [u32; 2],
            system_time: [u32; 2],
            policy: i32,
            suspend_count: i32,
        }
        unsafe extern "C" {
            fn mach_task_self() -> u32;
            fn task_info(task: u32, flavor: u32, info: *mut TaskBasicInfo, count: *mut u32) -> i32;
        }
        const MACH_TASK_BASIC_INFO: u32 = 20;
        let mut info: TaskBasicInfo = unsafe { mem::zeroed() };
        let mut count = (mem::size_of::<TaskBasicInfo>() / mem::size_of::<u32>()) as u32;
        let ret = unsafe {
            task_info(
                mach_task_self(),
                MACH_TASK_BASIC_INFO,
                &mut info,
                &mut count,
            )
        };
        if ret == 0 {
            info.resident_size as f64 / (1024.0 * 1024.0)
        } else {
            0.0
        }
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        0.0
    }
}