1use crate::decompress::maybe_decompress;
8use crate::error::Result;
9use crate::format::is_binary;
10use crate::neg_cache::NegCache;
11use crate::planner::QueryPlan;
12use crate::posting_cache::PostingCache;
13use crate::reader::{FileInfo, Reader};
14use crate::regex_pool::RegexPool;
15use crate::trigram::Trigram;
16use rayon::prelude::*;
17use regex::Regex;
18use std::collections::HashSet;
19use std::fs::File;
20use std::io::{BufRead, BufReader, Cursor, Read};
21use std::path::{Path, PathBuf};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
24
/// A single search hit within one file.
#[derive(Debug)]
pub struct Match {
    /// Path of the file containing the match.
    pub file_path: PathBuf,
    /// 1-based line number of the matching line.
    pub line_number: u32,
    /// 1-based column (byte-based) of the match start within the line.
    pub col: u32,
    /// The matching line with trailing whitespace trimmed
    /// (left empty when the query runs in count-only mode).
    pub line_content: String,
    /// Byte offset of the match start from the beginning of the stream.
    pub byte_offset: u64,
    /// Up to `context_lines` trimmed lines preceding the match.
    pub context_before: Vec<String>,
    /// Up to `context_lines` trimmed lines following the match.
    pub context_after: Vec<String>,
    /// Whether the match came from a binary file.
    /// NOTE(review): `verify_stream` currently always sets this to `false`,
    /// even when binary files are scanned via `options.binary`.
    pub is_binary: bool,
}
45
/// Aggregate statistics collected while executing one query.
#[derive(Default, Debug)]
pub struct QueryStats {
    /// Number of trigram lookups performed against the index.
    pub trigrams_queried: u32,
    /// Posting lists decoded from the index (cache misses only).
    pub posting_lists_decoded: u32,
    /// Candidate files remaining after index-side filtering.
    pub candidate_files: u32,
    /// Files actually opened and scanned with the regex.
    pub files_verified: u32,
    /// Total size in bytes of the files submitted for verification.
    pub bytes_verified: u64,
    /// Number of matches returned (after `max_results` truncation).
    pub total_matches: u32,
    /// Posting-list cache hits.
    pub posting_cache_hits: u64,
    /// Posting-list cache misses.
    pub posting_cache_misses: u64,
    /// Files skipped because the negative cache knew they cannot match.
    pub neg_cache_hits: u64,
    /// Files not found in the negative cache (and therefore scanned).
    pub neg_cache_misses: u64,
}
70
/// Thread-safe accumulator for the per-file counters gathered during
/// parallel verification; folded into a `QueryStats` via `into_stats`.
struct QueryStatsAccum {
    files_verified: AtomicU32,   // files opened and scanned
    bytes_verified: AtomicU64,   // bytes submitted for scanning
    matches_found: AtomicU32,    // running match count; drives early termination
    neg_cache_hits: AtomicU64,   // files skipped via the negative cache
    neg_cache_misses: AtomicU64, // files absent from the negative cache
}
79
80impl QueryStatsAccum {
81 fn new() -> Self {
82 Self {
83 files_verified: AtomicU32::new(0),
84 bytes_verified: AtomicU64::new(0),
85 matches_found: AtomicU32::new(0),
86 neg_cache_hits: AtomicU64::new(0),
87 neg_cache_misses: AtomicU64::new(0),
88 }
89 }
90
91 fn into_stats(self, candidate_files: u32, total_matches: u32, stats: &mut QueryStats) {
92 stats.files_verified = self.files_verified.into_inner();
93 stats.bytes_verified = self.bytes_verified.into_inner();
94 stats.neg_cache_hits += self.neg_cache_hits.into_inner();
95 stats.neg_cache_misses += self.neg_cache_misses.into_inner();
96 stats.candidate_files = candidate_files;
97 stats.total_matches = total_matches;
98 }
99}
100
/// Options controlling how a query is executed and reported.
#[derive(Debug, Default, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct QueryOptions {
    /// Suppress line content in results; callers only want counts.
    pub count_only: bool,
    /// Report matching files only; verification stops after the first
    /// match per file when no explicit limit is set.
    pub files_only: bool,
    /// Maximum number of matches to return; `0` means unlimited.
    pub max_results: usize,
    /// If non-empty, only files whose extension appears here are searched.
    pub type_filter: Vec<String>,
    /// Number of context lines captured before and after each match.
    pub context_lines: usize,
    /// Transparently decompress compressed files before searching.
    pub decompress: bool,
    /// Worker-thread count hint (not read in this module — TODO confirm consumer).
    pub threads: usize,
    /// Multiline matching mode (not read in this module — TODO confirm consumer).
    pub multiline: bool,
    /// Search inside archives (not read in this module — TODO confirm consumer).
    pub archive: bool,
    /// Also scan files detected as binary.
    pub binary: bool,
    /// Require word-boundary matches (not read in this module — TODO confirm consumer).
    pub word_boundary: bool,
}
128
/// Executes query plans against a trigram index, borrowing the index
/// reader for the executor's lifetime.
pub struct Executor<'a> {
    /// The open index being queried.
    index: &'a Reader,
    /// Shared cache of decoded posting lists, keyed by trigram.
    posting_cache: Arc<PostingCache>,
    /// Shared cache of (query fingerprint, content hash) pairs known to
    /// produce no matches.
    neg_cache: Arc<NegCache>,
    /// Shared pool of compiled regexes.
    regex_pool: Arc<RegexPool>,
    /// Fingerprint of the current query's pattern; set at the start of
    /// `execute` and used to key the negative cache.
    neg_query_fingerprint: u64,
}
143
144impl<'a> Executor<'a> {
145 #[must_use]
147 pub fn new(index: &'a Reader) -> Self {
148 Self {
149 index,
150 posting_cache: Arc::new(PostingCache::default()),
151 neg_cache: Arc::new(NegCache::new(65_536)),
152 regex_pool: Arc::new(RegexPool::new(256)),
153 neg_query_fingerprint: 0,
154 }
155 }
156
157 #[must_use]
162 pub fn new_with_caches(
163 index: &'a Reader,
164 posting_cache: Arc<PostingCache>,
165 neg_cache: Arc<NegCache>,
166 regex_pool: Arc<RegexPool>,
167 ) -> Self {
168 Self {
169 index,
170 posting_cache,
171 neg_cache,
172 regex_pool,
173 neg_query_fingerprint: 0,
174 }
175 }
176
177 #[must_use]
179 pub fn posting_cache(&self) -> &PostingCache {
180 &self.posting_cache
181 }
182
183 #[must_use]
185 pub fn neg_cache(&self) -> &NegCache {
186 &self.neg_cache
187 }
188
189 #[must_use]
191 pub fn regex_pool(&self) -> &RegexPool {
192 &self.regex_pool
193 }
194
195 pub fn execute(
202 &mut self,
203 plan: &QueryPlan,
204 options: &QueryOptions,
205 ) -> Result<(Vec<Match>, QueryStats)> {
206 self.neg_query_fingerprint = crate::neg_cache::query_fingerprint(plan.pattern_str());
207 match plan {
208 QueryPlan::Literal {
209 pattern,
210 trigrams,
211 regex,
212 } => self.execute_literal(pattern, trigrams, regex, options),
213 QueryPlan::RegexWithLiterals {
214 regex,
215 required_trigram_sets,
216 } => self.execute_regex_indexed(regex, required_trigram_sets, options),
217 QueryPlan::CaseInsensitive {
218 regex,
219 trigram_groups,
220 } => Ok(self.execute_case_insensitive(regex, trigram_groups, options)),
221 QueryPlan::FullScan { regex } => Ok(self.execute_full_scan(regex, options)),
222 }
223 }
224
225 fn decode_postings_cached(
228 &self,
229 tri: Trigram,
230 info: &crate::reader::TrigramInfo,
231 stats: &mut QueryStats,
232 ) -> Result<crate::posting::PostingList> {
233 if let Some(cached) = self.posting_cache.get(tri) {
234 stats.posting_cache_hits += 1;
235 return Ok(cached);
236 }
237 stats.posting_cache_misses += 1;
238 let list = self.index.decode_postings(info)?;
239 self.posting_cache.insert(tri, list.clone());
240 stats.posting_lists_decoded += 1;
241 Ok(list)
242 }
243
    /// Executes a literal-pattern plan: narrows candidate files by
    /// intersecting trigram posting lists (rarest first), prunes the rest
    /// with per-file bloom filters, then verifies the survivors against
    /// `regex` in parallel.
    ///
    /// # Errors
    /// Returns an error if a posting list cannot be decoded from the index.
    #[allow(clippy::as_conversions)] #[allow(clippy::indexing_slicing)] fn execute_literal(
        &self,
        _pattern: &[u8],
        trigrams: &[Trigram],
        regex: &Regex,
        options: &QueryOptions,
    ) -> Result<(Vec<Match>, QueryStats)> {
        let mut stats = QueryStats::default();

        // Look up every trigram; if any is absent from the index, no file
        // can contain the literal, so the result is empty.
        let mut infos = Vec::new();
        for &tri in trigrams {
            stats.trigrams_queried += 1;
            if let Some(info) = self.index.get_trigram(tri) {
                infos.push((tri, info));
            } else {
                return Ok((vec![], stats));
            }
        }

        // Rarest trigram first so the initial candidate set is smallest.
        infos.sort_by_key(|(_, info)| info.doc_frequency);

        tracing::debug!(
            "literal search: {} trigrams, rarities: {:?}",
            infos.len(),
            infos
                .iter()
                .map(|(t, i)| (format!("0x{t:06x}"), i.doc_frequency))
                .collect::<Vec<_>>()
        );

        // Seed the candidate set from the rarest trigram's posting list.
        let (rarest_tri, rarest_info) = &infos[0];
        let postings = self.decode_postings_cached(*rarest_tri, rarest_info, &mut stats)?;

        let mut candidates: HashSet<u32> = postings.entries.iter().map(|e| e.file_id).collect();
        tracing::debug!("step 1 (rarest): {} candidates", candidates.len());

        // Intersect with up to two more posting lists, stopping early once
        // the candidate set is already small (< 100 files).
        for (tri, info) in infos.iter().take(infos.len().min(3)).skip(1) {
            if candidates.len() < 100 {
                tracing::debug!(
                    "step 2: breaking early, {} candidates < 100",
                    candidates.len()
                );
                break;
            }

            let next_postings = self.decode_postings_cached(*tri, info, &mut stats)?;

            let next_set: HashSet<u32> = next_postings.entries.iter().map(|e| e.file_id).collect();
            candidates.retain(|fid| next_set.contains(fid));
        }

        // Cheap per-file bloom checks prune candidates for the remaining
        // (non-rarest) trigrams without decoding their posting lists.
        for &(tri, _) in &infos[1..] {
            if candidates.is_empty() {
                break;
            }
            candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
        }

        stats.candidate_files = candidates.len() as u32;

        let accum = QueryStatsAccum::new();
        let neg_fp = self.neg_query_fingerprint;

        let candidate_list: Vec<u32> = candidates.into_iter().collect();

        // Verify candidates in parallel; per-file counters go through the
        // atomic accumulator.
        let mut all_matches: Vec<Match> = candidate_list
            .into_par_iter()
            .filter_map(|fid| {
                // Best-effort early termination once enough matches exist
                // (skipped in files-only mode, where every file needs a hit).
                let should_early_terminate = options.max_results > 0
                    && !options.files_only
                    && accum.matches_found.load(Ordering::Relaxed) >= options.max_results as u32;
                if should_early_terminate {
                    return None;
                }

                let file_info = self.index.get_file(fid).ok()?;

                // Extension-based type filter.
                if !options.type_filter.is_empty() {
                    let ext = file_info
                        .path
                        .extension()
                        .and_then(|e: &std::ffi::OsStr| e.to_str())
                        .unwrap_or("");
                    if !options.type_filter.iter().any(|e: &String| e == ext) {
                        return None;
                    }
                }

                accum.files_verified.fetch_add(1, Ordering::Relaxed);
                accum
                    .bytes_verified
                    .fetch_add(file_info.size_bytes, Ordering::Relaxed);

                let file_matches =
                    self.verify_candidate(&file_info, regex, options, neg_fp, &accum)?;
                accum
                    .matches_found
                    .fetch_add(file_matches.len() as u32, Ordering::Relaxed);
                Some(file_matches)
            })
            .flatten()
            .collect();

        accum.into_stats(stats.candidate_files, all_matches.len() as u32, &mut stats);

        // Enforce max_results exactly; the parallel early termination above
        // is only approximate.
        if options.max_results > 0 && !options.files_only && all_matches.len() > options.max_results
        {
            all_matches.truncate(options.max_results);
        }

        stats.total_matches = all_matches.len() as u32;

        Ok((all_matches, stats))
    }
365
    /// Executes a regex plan whose required literal fragments each map to
    /// a trigram set: candidates for every fragment are narrowed the same
    /// way as in the literal strategy, intersected across fragments, then
    /// verified against `regex` in parallel.
    ///
    /// # Errors
    /// Returns an error if a posting list cannot be decoded from the index.
    #[allow(clippy::as_conversions)] #[allow(clippy::indexing_slicing)] fn execute_regex_indexed(
        &self,
        regex: &Regex,
        required_trigram_sets: &[Vec<Trigram>],
        options: &QueryOptions,
    ) -> Result<(Vec<Match>, QueryStats)> {
        let mut stats = QueryStats::default();

        // One candidate set per required fragment.
        let mut fragment_candidates = Vec::new();
        for trigram_set in required_trigram_sets {
            let mut infos = Vec::new();
            for &tri in trigram_set {
                stats.trigrams_queried += 1;
                if let Some(info) = self.index.get_trigram(tri) {
                    infos.push((tri, info));
                } else {
                    // A required trigram missing from the index means no
                    // file can match at all.
                    return Ok((vec![], stats));
                }
            }

            // Rarest trigram first for the smallest starting set.
            infos.sort_by_key(|(_, info)| info.doc_frequency);

            let (rarest_tri, rarest_info) = &infos[0];
            let postings = self.decode_postings_cached(*rarest_tri, rarest_info, &mut stats)?;
            let mut set_candidates: HashSet<u32> =
                postings.entries.iter().map(|e| e.file_id).collect();

            // Intersect with up to two more posting lists, stopping early
            // once the set is already small (< 100 files).
            for (tri, info) in infos.iter().take(infos.len().min(3)).skip(1) {
                if set_candidates.len() < 100 {
                    break;
                }
                let next_postings = self.decode_postings_cached(*tri, info, &mut stats)?;
                let next_set: HashSet<u32> =
                    next_postings.entries.iter().map(|e| e.file_id).collect();
                set_candidates.retain(|fid| next_set.contains(fid));
            }

            // Bloom-filter pruning for the remaining trigrams.
            for &(tri, _) in &infos[1..] {
                set_candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
            }
            fragment_candidates.push(set_candidates);
        }

        // A file must satisfy every fragment: intersect per-fragment sets.
        let mut final_candidates: HashSet<u32> = match fragment_candidates.pop() {
            Some(c) => c,
            None => return Ok((vec![], stats)),
        };
        for set in fragment_candidates {
            final_candidates.retain(|fid: &u32| set.contains(fid));
        }

        stats.candidate_files = final_candidates.len() as u32;

        let accum = QueryStatsAccum::new();
        let neg_fp = self.neg_query_fingerprint;

        let candidate_list: Vec<u32> = final_candidates.into_iter().collect();

        // Verify candidates in parallel; per-file counters go through the
        // atomic accumulator.
        let mut all_matches: Vec<Match> = candidate_list
            .into_par_iter()
            .filter_map(|fid| {
                // Best-effort early termination once enough matches exist
                // (skipped in files-only mode).
                let should_early_terminate = options.max_results > 0
                    && !options.files_only
                    && accum.matches_found.load(Ordering::Relaxed) >= options.max_results as u32;
                if should_early_terminate {
                    return None;
                }

                let file_info = self.index.get_file(fid).ok()?;

                // Extension-based type filter.
                if !options.type_filter.is_empty() {
                    let ext = file_info
                        .path
                        .extension()
                        .and_then(|e: &std::ffi::OsStr| e.to_str())
                        .unwrap_or("");
                    if !options.type_filter.iter().any(|e: &String| e == ext) {
                        return None;
                    }
                }

                accum.files_verified.fetch_add(1, Ordering::Relaxed);
                accum
                    .bytes_verified
                    .fetch_add(file_info.size_bytes, Ordering::Relaxed);

                let file_matches =
                    self.verify_candidate(&file_info, regex, options, neg_fp, &accum)?;
                accum
                    .matches_found
                    .fetch_add(file_matches.len() as u32, Ordering::Relaxed);
                Some(file_matches)
            })
            .flatten()
            .collect();

        accum.into_stats(stats.candidate_files, all_matches.len() as u32, &mut stats);

        // Enforce max_results exactly; the parallel early termination above
        // is only approximate.
        if options.max_results > 0 && !options.files_only && all_matches.len() > options.max_results
        {
            all_matches.truncate(options.max_results);
        }

        stats.total_matches = all_matches.len() as u32;
        Ok((all_matches, stats))
    }
478
    /// Executes a case-insensitive plan: each group holds the trigram case
    /// variants of one pattern position, so candidates are the union within
    /// a group and the intersection across groups. Groups with no indexed
    /// trigrams are dropped; if no group produced candidates, every file in
    /// the index is scanned.
    #[allow(clippy::as_conversions)] fn execute_case_insensitive(
        &self,
        regex: &Regex,
        trigram_groups: &[Vec<Trigram>],
        options: &QueryOptions,
    ) -> (Vec<Match>, QueryStats) {
        let mut stats = QueryStats::default();

        // Union the posting lists of each group's case variants; decode
        // failures for a single trigram are tolerated (treated as absent).
        let mut group_candidates = Vec::new();
        for group in trigram_groups {
            let mut union_set: HashSet<u32> = HashSet::new();
            for &tri in group {
                stats.trigrams_queried += 1;
                if let Some(info) = self.index.get_trigram(tri)
                    && let Ok(postings) = self.decode_postings_cached(tri, &info, &mut stats)
                {
                    for entry in &postings.entries {
                        union_set.insert(entry.file_id);
                    }
                }
            }
            // Empty groups are dropped rather than zeroing the intersection.
            if !union_set.is_empty() {
                group_candidates.push(union_set);
            }
        }

        // Intersect across groups; with no usable groups, fall back to
        // scanning every file in the index.
        let final_candidates = if let Some(mut base) = group_candidates.pop() {
            for set in group_candidates {
                base.retain(|fid| set.contains(fid));
            }
            base
        } else {
            let all: HashSet<u32> = (0..self.index.header.file_count).collect();
            all
        };

        stats.candidate_files = final_candidates.len() as u32;

        let accum = QueryStatsAccum::new();
        let neg_fp = self.neg_query_fingerprint;

        let candidate_list: Vec<u32> = final_candidates.into_iter().collect();

        // Verify candidates in parallel; per-file counters go through the
        // atomic accumulator.
        let mut all_matches: Vec<Match> = candidate_list
            .into_par_iter()
            .filter_map(|fid| {
                // Best-effort early termination once enough matches exist
                // (skipped in files-only mode).
                let should_early_terminate = options.max_results > 0
                    && !options.files_only
                    && accum.matches_found.load(Ordering::Relaxed) >= options.max_results as u32;
                if should_early_terminate {
                    return None;
                }

                let file_info = self.index.get_file(fid).ok()?;

                // Extension-based type filter.
                if !options.type_filter.is_empty() {
                    let ext = file_info
                        .path
                        .extension()
                        .and_then(|e| e.to_str())
                        .unwrap_or("");
                    if !options.type_filter.iter().any(|e| e == ext) {
                        return None;
                    }
                }

                accum.files_verified.fetch_add(1, Ordering::Relaxed);
                accum
                    .bytes_verified
                    .fetch_add(file_info.size_bytes, Ordering::Relaxed);

                let file_matches =
                    self.verify_candidate(&file_info, regex, options, neg_fp, &accum)?;
                accum
                    .matches_found
                    .fetch_add(file_matches.len() as u32, Ordering::Relaxed);
                Some(file_matches)
            })
            .flatten()
            .collect();

        accum.into_stats(stats.candidate_files, all_matches.len() as u32, &mut stats);

        // Enforce max_results exactly; the parallel early termination above
        // is only approximate.
        if options.max_results > 0 && !options.files_only && all_matches.len() > options.max_results
        {
            all_matches.truncate(options.max_results);
        }

        stats.total_matches = all_matches.len() as u32;
        (all_matches, stats)
    }
575
576 #[allow(clippy::as_conversions)] fn execute_full_scan(&self, regex: &Regex, options: &QueryOptions) -> (Vec<Match>, QueryStats) {
578 let stats_candidate_files = self.index.header.file_count;
579
580 let accum = QueryStatsAccum::new();
581 let neg_fp = self.neg_query_fingerprint;
582
583 let mut all_matches: Vec<Match> = (0..self.index.header.file_count)
584 .into_par_iter()
585 .filter_map(|fid| {
586 let should_early_terminate = options.max_results > 0
587 && !options.files_only
588 && accum.matches_found.load(Ordering::Relaxed) >= options.max_results as u32;
589 if should_early_terminate {
590 return None;
591 }
592
593 let file_info = self.index.get_file(fid).ok()?;
594
595 if !options.type_filter.is_empty() {
596 let ext = file_info
597 .path
598 .extension()
599 .and_then(|e: &std::ffi::OsStr| e.to_str())
600 .unwrap_or("");
601 if !options.type_filter.iter().any(|e: &String| e == ext) {
602 return None;
603 }
604 }
605
606 accum.files_verified.fetch_add(1, Ordering::Relaxed);
607 accum
608 .bytes_verified
609 .fetch_add(file_info.size_bytes, Ordering::Relaxed);
610
611 let file_matches =
612 self.verify_candidate(&file_info, regex, options, neg_fp, &accum)?;
613 accum
614 .matches_found
615 .fetch_add(file_matches.len() as u32, Ordering::Relaxed);
616 Some(file_matches)
617 })
618 .flatten()
619 .collect();
620
621 if options.max_results > 0 && !options.files_only && all_matches.len() > options.max_results
622 {
623 all_matches.truncate(options.max_results);
624 }
625
626 let mut stats = QueryStats {
627 candidate_files: stats_candidate_files,
628 total_matches: all_matches.len() as u32,
629 ..Default::default()
630 };
631 accum.into_stats(stats_candidate_files, all_matches.len() as u32, &mut stats);
632 (all_matches, stats)
633 }
634
    /// Test-only public shim exposing the private `verify_stream` so tests
    /// can drive it with an arbitrary reader.
    ///
    /// # Errors
    /// Propagates I/O errors from reading `reader`.
    pub fn verify_stream_for_test<R: Read>(
        &self,
        reader: R,
        path: &Path,
        regex: &Regex,
        options: &QueryOptions,
    ) -> Result<Vec<Match>> {
        Self::verify_stream(reader, path, regex, options)
    }
650
    /// Scans `reader` line by line with `regex`, producing at most one
    /// `Match` per matching line (the first occurrence), with before/after
    /// context when `options.context_lines > 0`.
    ///
    /// Binary content yields an empty result unless `options.binary` is
    /// set; detection samples only the reader's first buffered chunk.
    ///
    /// # Errors
    /// Returns an error if reading from `reader` fails.
    #[allow(clippy::as_conversions)] fn verify_stream<R: Read>(
        reader: R,
        path: &Path,
        regex: &Regex,
        options: &QueryOptions,
    ) -> Result<Vec<Match>> {
        let mut buf_reader = BufReader::new(reader);
        let mut matches = Vec::new();
        let mut line_number = 0u32;
        let mut byte_offset = 0u64;

        // Peek at the first buffered chunk (without consuming it) to decide
        // whether this is a binary file we should skip.
        {
            let buffer = buf_reader.fill_buf()?;
            let is_bin = is_binary(buffer);
            if is_bin && !options.binary {
                return Ok(vec![]);
            }
        }

        // `line` is reused across iterations to avoid per-line allocation.
        // `context_before` holds the last `context_lines` trimmed lines;
        // `pending_matches` are matches still collecting after-context.
        let mut line = String::new();
        let mut context_before = std::collections::VecDeque::new();
        let mut pending_matches: Vec<Match> = Vec::new();

        while buf_reader.read_line(&mut line)? > 0 {
            line_number += 1;
            let line_len = line.len() as u64;
            let trimmed_line = line.trim_end().to_string();

            // Feed the current line as after-context to earlier matches.
            for m in &mut pending_matches {
                if m.context_after.len() < options.context_lines {
                    m.context_after.push(trimmed_line.clone());
                }
            }

            // Matches whose after-context is complete graduate to `matches`.
            let (completed, still_pending): (Vec<_>, Vec<_>) = pending_matches
                .into_iter()
                .partition(|m| m.context_after.len() >= options.context_lines);
            matches.extend(completed);
            pending_matches = still_pending;

            // NOTE: the regex runs on the raw line (including the trailing
            // newline), while the stored content is the trimmed copy.
            if let Some(m) = regex.find(&line) {
                let context_before_vec: Vec<String> = context_before.iter().cloned().collect();

                let new_match = Match {
                    file_path: path.to_path_buf(),
                    line_number,
                    col: (m.start() + 1) as u32, // columns are 1-based
                    line_content: if options.count_only {
                        String::new()
                    } else {
                        trimmed_line.clone()
                    },
                    byte_offset: byte_offset + m.start() as u64,
                    context_before: context_before_vec,
                    context_after: vec![],
                    is_binary: false,
                };

                // With context requested, hold the match until its
                // after-context is gathered; otherwise emit immediately.
                if options.context_lines > 0 {
                    pending_matches.push(new_match);
                } else {
                    matches.push(new_match);
                }

                // Stop once max_results is reached, but keep reading while
                // pending matches still need after-context lines.
                if options.max_results > 0
                    && (matches.len() + pending_matches.len()) >= options.max_results
                    && (pending_matches.is_empty() || matches.len() >= options.max_results)
                {
                    break;
                }
            }

            // Record this line as before-context for future matches
            // (after the match check, so a match never includes itself).
            if options.context_lines > 0 {
                context_before.push_back(trimmed_line.clone());
                if context_before.len() > options.context_lines {
                    context_before.pop_front();
                }
            }

            byte_offset += line_len;
            line.clear();
        }

        // EOF: flush matches whose after-context was cut short.
        matches.extend(pending_matches);
        Ok(matches)
    }
741
742 fn verify_candidate(
748 &self,
749 file_info: &FileInfo,
750 regex: &Regex,
751 options: &QueryOptions,
752 neg_fp: u64,
753 stats: &QueryStatsAccum,
754 ) -> Option<Vec<Match>> {
755 if self
756 .neg_cache
757 .is_known_negative(neg_fp, file_info.content_hash)
758 {
759 stats.neg_cache_hits.fetch_add(1, Ordering::Relaxed);
760 return None;
761 }
762 stats.neg_cache_misses.fetch_add(1, Ordering::Relaxed);
763
764 let matches = Self::verify_file(file_info, regex, options).ok()?;
765 if matches.is_empty() {
766 self.neg_cache
767 .record_negative(neg_fp, file_info.content_hash);
768 }
769 Some(matches)
770 }
771
    /// Opens `info.path`, memory-maps it, and scans it with `regex`
    /// (decompressing first when `options.decompress` is set and the file
    /// looks compressed).
    ///
    /// In files-only mode with no explicit limit, the scan is capped at
    /// one match per file, since a single hit suffices to report the file.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened, mapped, or read.
    fn verify_file(info: &FileInfo, regex: &Regex, options: &QueryOptions) -> Result<Vec<Match>> {
        let file = File::open(&info.path)?;
        // SAFETY: mapping is unsound if the file is truncated or modified
        // while mapped; this assumes indexed files are not mutated during a
        // search — TODO confirm that guarantee upstream.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let effective_options = if options.files_only && options.max_results == 0 {
            QueryOptions {
                max_results: 1,
                ..options.clone()
            }
        } else {
            options.clone()
        };

        if options.decompress
            && let Some(reader) = maybe_decompress(&info.path, &mmap)?
        {
            return Self::verify_stream(reader, info.path.as_ref(), regex, &effective_options);
        }

        Self::verify_stream(
            Cursor::new(&mmap[..]),
            info.path.as_ref(),
            regex,
            &effective_options,
        )
    }
798}