use crate::decompress::maybe_decompress;
use crate::error::Result;
use crate::format::is_binary;
use crate::neg_cache::NegCache;
use crate::planner::QueryPlan;
use crate::posting_cache::PostingCache;
use crate::reader::{DeltaReader, FileInfo, Reader};
use crate::regex_pool::RegexPool;
use crate::trigram::Trigram;
use rayon::prelude::*;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fs::File;
use std::io::{BufRead, BufReader, Cursor, Read};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
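/// A single search hit: file, 1-based line number and byte column, the
/// matched line, and any surrounding context lines.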
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Match {
pub file_path: PathBuf,
pub line_number: u32,
pub col: u32,
pub line_content: String,
pub byte_offset: u64,
pub context_before: Vec<String>,
pub context_after: Vec<String>,
pub is_binary: bool,
}
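/// Counters describing the work a query performed, from trigram lookups
/// through candidate pruning and final verification.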
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct QueryStats {
pub trigrams_queried: u32,
pub posting_lists_decoded: u32,
pub candidate_files: u32,
pub files_verified: u32,
pub bytes_verified: u64,
pub total_matches: u32,
pub posting_cache_hits: u64,
pub posting_cache_misses: u64,
pub neg_cache_hits: u64,
pub neg_cache_misses: u64,
}
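/// Lock-free counters shared by rayon workers during parallel verification.
/// `matches_found` only drives early termination and is never copied into
/// `QueryStats`.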
struct QueryStatsAccum {
files_verified: AtomicU32,
bytes_verified: AtomicU64,
matches_found: AtomicU32,
neg_cache_hits: AtomicU64,
neg_cache_misses: AtomicU64,
}
impl QueryStatsAccum {
fn new() -> Self {
Self {
files_verified: AtomicU32::new(0),
bytes_verified: AtomicU64::new(0),
matches_found: AtomicU32::new(0),
neg_cache_hits: AtomicU64::new(0),
neg_cache_misses: AtomicU64::new(0),
}
}
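/// Folds the counters into `stats`: cache hit/miss totals are added so values
/// already on `stats` survive, while the remaining fields are assigned outright.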
fn into_stats(self, candidate_files: u32, total_matches: u32, stats: &mut QueryStats) {
stats.files_verified = self.files_verified.into_inner();
stats.bytes_verified = self.bytes_verified.into_inner();
stats.neg_cache_hits += self.neg_cache_hits.into_inner();
stats.neg_cache_misses += self.neg_cache_misses.into_inner();
stats.candidate_files = candidate_files;
stats.total_matches = total_matches;
}
}
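/// Knobs controlling query execution and output shape.
///
/// A minimal construction sketch (values are illustrative; the `use` path is
/// omitted since it depends on the crate layout):
///
/// ```ignore
/// let opts = QueryOptions {
///     max_results: 100,
///     context_lines: 2,
///     ..Default::default()
/// };
/// ```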
#[derive(Debug, Default, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct QueryOptions {
pub count_only: bool,
pub files_only: bool,
pub max_results: usize,
pub type_filter: Vec<String>,
pub context_lines: usize,
pub decompress: bool,
pub threads: usize,
pub multiline: bool,
pub archive: bool,
pub binary: bool,
pub word_boundary: bool,
}
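/// Executes query plans against a base index plus an optional delta index,
/// sharing posting, negative-result, and regex caches across queries.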
pub struct Executor<'a> {
index: &'a Reader,
delta: Option<DeltaReader>,
delta_path: Option<PathBuf>,
posting_cache: Arc<PostingCache>,
neg_cache: Arc<NegCache>,
regex_pool: Arc<RegexPool>,
neg_query_fingerprint: u64,
}
impl<'a> Executor<'a> {
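/// Creates an executor with fresh default-sized caches and no delta index.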
#[must_use]
pub fn new(index: &'a Reader) -> Self {
Self {
index,
delta: None,
delta_path: None,
posting_cache: Arc::new(PostingCache::default()),
neg_cache: Arc::new(NegCache::new(65_536)),
regex_pool: Arc::new(RegexPool::new(256)),
neg_query_fingerprint: 0,
}
}
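/// Creates an executor sharing caches across queries; the delta index at
/// `delta_path` is opened eagerly if it exists and parses.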
#[must_use]
pub fn new_with_caches(
index: &'a Reader,
posting_cache: Arc<PostingCache>,
neg_cache: Arc<NegCache>,
regex_pool: Arc<RegexPool>,
delta_path: Option<PathBuf>,
) -> Self {
let delta = delta_path
.as_ref()
.and_then(|p| DeltaReader::open(p).ok());
Self {
index,
delta,
delta_path,
posting_cache,
neg_cache,
regex_pool,
neg_query_fingerprint: 0,
}
}
#[must_use]
pub fn posting_cache(&self) -> &PostingCache {
&self.posting_cache
}
#[must_use]
pub fn neg_cache(&self) -> &NegCache {
&self.neg_cache
}
#[must_use]
pub fn regex_pool(&self) -> &RegexPool {
&self.regex_pool
}
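/// Resolves a file id: ids at or above the base index's `file_count` belong
/// to the delta index.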
fn get_file_info(&self, fid: u32) -> Option<FileInfo> {
if fid >= self.index.header.file_count {
let delta = self.delta.as_ref()?;
let info = delta.id_to_fileinfo.get(&fid)?;
Some(FileInfo {
file_id: fid,
path: info.path.clone(),
status: crate::format::FileStatus::Fresh,
mtime_ns: info.mtime,
size_bytes: info.size,
content_hash: info.hash,
})
} else {
self.index.get_file(fid).ok()
}
}
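/// Sets the delta path; the delta index is opened lazily on the next `execute`.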
pub fn set_delta_path(&mut self, path: PathBuf) {
self.delta_path = Some(path);
}
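/// Opens the delta index on first use if a path is set and it is readable.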
fn ensure_delta(&mut self) {
if self.delta.is_none()
&& let Some(ref path) = self.delta_path
&& let Ok(dr) = DeltaReader::open(path)
{
self.delta = Some(dr);
}
}
fn is_tombstoned(&self, file_id: u32) -> bool {
self.delta
.as_ref()
.is_some_and(|d| d.tombstones.contains(&file_id))
}
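/// Adds delta-indexed files containing `trigram`, skipping tombstoned ids.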
fn merge_delta_candidates(&self, candidates: &mut HashSet<u32>, trigram: Trigram) {
if let Some(ref delta) = self.delta
&& let Some(entries) = delta.postings.get(&trigram)
{
for entry in entries {
if !self.is_tombstoned(entry.file_id) {
candidates.insert(entry.file_id);
}
}
}
}
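/// Runs a planned query and returns the matches plus per-query statistics.
/// Dispatches on the plan variant; errors surface from posting-list decoding.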
pub fn execute(
&mut self,
plan: &QueryPlan,
options: &QueryOptions,
) -> Result<(Vec<Match>, QueryStats)> {
self.neg_query_fingerprint = crate::neg_cache::query_fingerprint(plan.pattern_str());
self.ensure_delta();
match plan {
QueryPlan::Literal {
pattern,
trigrams,
regex,
} => self.execute_literal(pattern, trigrams, regex, options),
QueryPlan::RegexWithLiterals {
regex,
required_trigram_sets,
} => self.execute_regex_indexed(regex, required_trigram_sets, options),
QueryPlan::CaseInsensitive {
regex,
trigram_groups,
} => Ok(self.execute_case_insensitive(regex, trigram_groups, options)),
QueryPlan::FullScan { regex } => Ok(self.execute_full_scan(regex, options)),
}
}
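/// Returns a posting list from the shared cache, decoding it on a miss.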
fn decode_postings_cached(
&self,
tri: Trigram,
info: &crate::reader::TrigramInfo,
stats: &mut QueryStats,
) -> Result<crate::posting::PostingList> {
if let Some(cached) = self.posting_cache.get(tri) {
stats.posting_cache_hits += 1;
return Ok(cached);
}
stats.posting_cache_misses += 1;
let list = self.index.decode_postings(info)?;
self.posting_cache.insert(tri, list.clone());
stats.posting_lists_decoded += 1;
Ok(list)
}
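/// Literal path: start from the rarest trigram's posting list, intersect and
/// bloom-prune with the remaining trigrams, then regex-verify in parallel.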
#[allow(clippy::as_conversions)]
#[allow(clippy::indexing_slicing)]
fn execute_literal(
&self,
_pattern: &[u8],
trigrams: &[Trigram],
regex: &Regex,
options: &QueryOptions,
) -> Result<(Vec<Match>, QueryStats)> {
let mut stats = QueryStats::default();
let mut infos = Vec::new();
for &tri in trigrams {
stats.trigrams_queried += 1;
if let Some(info) = self.index.get_trigram(tri) {
infos.push((tri, info));
} else {
return Ok((vec![], stats));
}
}
infos.sort_by_key(|(_, info)| info.doc_frequency);
tracing::debug!(
"literal search: {} trigrams, rarities: {:?}",
infos.len(),
infos
.iter()
.map(|(t, i)| (format!("0x{t:06x}"), i.doc_frequency))
.collect::<Vec<_>>()
);
let (rarest_tri, rarest_info) = &infos[0];
let postings = self.decode_postings_cached(*rarest_tri, rarest_info, &mut stats)?;
let mut candidates: HashSet<u32> = postings.entries.iter().map(|e| e.file_id).collect();
candidates.retain(|&fid| !self.is_tombstoned(fid));
tracing::debug!("step 1 (rarest): {} candidates", candidates.len());
// Intersect with up to two more posting lists, rarest first; once the set is
// small, the bloom pass below is cheaper than decoding more postings.
for (tri, info) in infos.iter().take(3).skip(1) {
if candidates.len() < 100 {
tracing::debug!(
"step 2: breaking early, {} candidates < 100",
candidates.len()
);
break;
}
let next_postings = self.decode_postings_cached(*tri, info, &mut stats)?;
let next_set: HashSet<u32> = next_postings.entries.iter().map(|e| e.file_id).collect();
candidates.retain(|fid| next_set.contains(fid));
}
// Cheap probabilistic pruning over the remaining trigrams.
for &(tri, _) in &infos[1..] {
if candidates.is_empty() {
break;
}
candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
}
// Delta file ids are absent from the base index's postings and blooms, so
// merge them in only after the index-driven pruning above; merging earlier
// would let those passes discard every delta candidate.
self.merge_delta_candidates(&mut candidates, *rarest_tri);
stats.candidate_files = candidates.len() as u32;
let accum = QueryStatsAccum::new();
let neg_fp = self.neg_query_fingerprint;
let candidate_list: Vec<u32> = candidates.into_iter().collect();
let mut all_matches: Vec<Match> = candidate_list
.into_par_iter()
.filter_map(|fid| {
let should_early_terminate = options.max_results > 0
&& !options.files_only
&& accum.matches_found.load(Ordering::Relaxed) >= options.max_results as u32;
if should_early_terminate {
return None;
}
let file_info = self.get_file_info(fid)?;
if !options.type_filter.is_empty() {
let ext = file_info
.path
.extension()
.and_then(|e: &std::ffi::OsStr| e.to_str())
.unwrap_or("");
if !options.type_filter.iter().any(|e: &String| e == ext) {
return None;
}
}
accum.files_verified.fetch_add(1, Ordering::Relaxed);
accum
.bytes_verified
.fetch_add(file_info.size_bytes, Ordering::Relaxed);
let file_matches =
self.verify_candidate(&file_info, regex, options, neg_fp, &accum)?;
accum
.matches_found
.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
Some(file_matches)
})
.flatten()
.collect();
accum.into_stats(stats.candidate_files, all_matches.len() as u32, &mut stats);
if options.max_results > 0 && !options.files_only && all_matches.len() > options.max_results
{
all_matches.truncate(options.max_results);
}
stats.total_matches = all_matches.len() as u32;
Ok((all_matches, stats))
}
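/// Regex path with extracted literal fragments: each required trigram set is
/// narrowed independently, then the per-fragment candidate sets are intersected.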
#[allow(clippy::as_conversions)]
#[allow(clippy::indexing_slicing)]
fn execute_regex_indexed(
&self,
regex: &Regex,
required_trigram_sets: &[Vec<Trigram>],
options: &QueryOptions,
) -> Result<(Vec<Match>, QueryStats)> {
let mut stats = QueryStats::default();
let mut fragment_candidates = Vec::new();
for trigram_set in required_trigram_sets {
let mut infos = Vec::new();
for &tri in trigram_set {
stats.trigrams_queried += 1;
if let Some(info) = self.index.get_trigram(tri) {
infos.push((tri, info));
} else {
return Ok((vec![], stats));
}
}
infos.sort_by_key(|(_, info)| info.doc_frequency);
let (rarest_tri, rarest_info) = &infos[0];
let postings = self.decode_postings_cached(*rarest_tri, rarest_info, &mut stats)?;
let mut set_candidates: HashSet<u32> =
postings.entries.iter().map(|e| e.file_id).collect();
for (tri, info) in infos.iter().take(3).skip(1) {
if set_candidates.len() < 100 {
break;
}
let next_postings = self.decode_postings_cached(*tri, info, &mut stats)?;
let next_set: HashSet<u32> =
next_postings.entries.iter().map(|e| e.file_id).collect();
set_candidates.retain(|fid| next_set.contains(fid));
}
for &(tri, _) in &infos[1..] {
set_candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
}
fragment_candidates.push(set_candidates);
}
let Some(mut final_candidates) = fragment_candidates.pop() else {
return Ok((vec![], stats));
};
for set in fragment_candidates {
final_candidates.retain(|fid: &u32| set.contains(fid));
}
final_candidates.retain(|&fid| !self.is_tombstoned(fid));
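// Delta files bypass the trigram pruning above; verification filters them.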
if let Some(ref delta) = self.delta {
final_candidates.extend(delta.id_to_fileinfo.keys().copied());
}
stats.candidate_files = final_candidates.len() as u32;
let accum = QueryStatsAccum::new();
let neg_fp = self.neg_query_fingerprint;
let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
let mut all_matches: Vec<Match> = candidate_list
.into_par_iter()
.filter_map(|fid| {
let should_early_terminate = options.max_results > 0
&& !options.files_only
&& accum.matches_found.load(Ordering::Relaxed) >= options.max_results as u32;
if should_early_terminate {
return None;
}
let file_info = self.get_file_info(fid)?;
if !options.type_filter.is_empty() {
let ext = file_info
.path
.extension()
.and_then(|e: &std::ffi::OsStr| e.to_str())
.unwrap_or("");
if !options.type_filter.iter().any(|e: &String| e == ext) {
return None;
}
}
accum.files_verified.fetch_add(1, Ordering::Relaxed);
accum
.bytes_verified
.fetch_add(file_info.size_bytes, Ordering::Relaxed);
let file_matches =
self.verify_candidate(&file_info, regex, options, neg_fp, &accum)?;
accum
.matches_found
.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
Some(file_matches)
})
.flatten()
.collect();
accum.into_stats(stats.candidate_files, all_matches.len() as u32, &mut stats);
if options.max_results > 0 && !options.files_only && all_matches.len() > options.max_results
{
all_matches.truncate(options.max_results);
}
stats.total_matches = all_matches.len() as u32;
Ok((all_matches, stats))
}
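/// Case-insensitive path: each group is a union over case-variant trigrams;
/// groups are then intersected. With no usable groups, every file is a candidate.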
#[allow(clippy::as_conversions)]
fn execute_case_insensitive(
&self,
regex: &Regex,
trigram_groups: &[Vec<Trigram>],
options: &QueryOptions,
) -> (Vec<Match>, QueryStats) {
let mut stats = QueryStats::default();
let mut group_candidates = Vec::new();
for group in trigram_groups {
let mut union_set: HashSet<u32> = HashSet::new();
for &tri in group {
stats.trigrams_queried += 1;
if let Some(info) = self.index.get_trigram(tri)
&& let Ok(postings) = self.decode_postings_cached(tri, &info, &mut stats)
{
for entry in &postings.entries {
union_set.insert(entry.file_id);
}
}
}
if !union_set.is_empty() {
group_candidates.push(union_set);
}
}
let mut final_candidates = if let Some(mut base) = group_candidates.pop() {
for set in group_candidates {
base.retain(|fid| set.contains(fid));
}
base
} else {
// No usable trigram groups: fall back to considering every indexed file.
(0..self.index.header.file_count).collect()
};
final_candidates.retain(|&fid| !self.is_tombstoned(fid));
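// Delta files are appended wholesale; verification filters false positives.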
if let Some(ref delta) = self.delta {
final_candidates.extend(delta.id_to_fileinfo.keys().copied());
}
stats.candidate_files = final_candidates.len() as u32;
let accum = QueryStatsAccum::new();
let neg_fp = self.neg_query_fingerprint;
let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
let mut all_matches: Vec<Match> = candidate_list
.into_par_iter()
.filter_map(|fid| {
let should_early_terminate = options.max_results > 0
&& !options.files_only
&& accum.matches_found.load(Ordering::Relaxed) >= options.max_results as u32;
if should_early_terminate {
return None;
}
let file_info = self.get_file_info(fid)?;
if !options.type_filter.is_empty() {
let ext = file_info
.path
.extension()
.and_then(|e: &std::ffi::OsStr| e.to_str())
.unwrap_or("");
if !options.type_filter.iter().any(|e: &String| e == ext) {
return None;
}
}
accum.files_verified.fetch_add(1, Ordering::Relaxed);
accum
.bytes_verified
.fetch_add(file_info.size_bytes, Ordering::Relaxed);
let file_matches =
self.verify_candidate(&file_info, regex, options, neg_fp, &accum)?;
accum
.matches_found
.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
Some(file_matches)
})
.flatten()
.collect();
accum.into_stats(stats.candidate_files, all_matches.len() as u32, &mut stats);
if options.max_results > 0 && !options.files_only && all_matches.len() > options.max_results
{
all_matches.truncate(options.max_results);
}
stats.total_matches = all_matches.len() as u32;
(all_matches, stats)
}
#[allow(clippy::as_conversions)]
fn execute_full_scan(&self, regex: &Regex, options: &QueryOptions) -> (Vec<Match>, QueryStats) {
let mut candidates: Vec<u32> = (0..self.index.header.file_count)
.filter(|fid| !self.is_tombstoned(*fid))
.collect();
if let Some(ref delta) = self.delta {
candidates.extend(
delta
.id_to_fileinfo
.keys()
.copied()
.filter(|fid| !self.is_tombstoned(*fid)),
);
}
let stats_candidate_files = candidates.len() as u32;
let accum = QueryStatsAccum::new();
let neg_fp = self.neg_query_fingerprint;
let mut all_matches: Vec<Match> = candidates
.into_par_iter()
.filter_map(|fid| {
let should_early_terminate = options.max_results > 0
&& !options.files_only
&& accum.matches_found.load(Ordering::Relaxed) >= options.max_results as u32;
if should_early_terminate {
return None;
}
let file_info = self.get_file_info(fid)?;
if !options.type_filter.is_empty() {
let ext = file_info
.path
.extension()
.and_then(|e: &std::ffi::OsStr| e.to_str())
.unwrap_or("");
if !options.type_filter.iter().any(|e: &String| e == ext) {
return None;
}
}
accum.files_verified.fetch_add(1, Ordering::Relaxed);
accum
.bytes_verified
.fetch_add(file_info.size_bytes, Ordering::Relaxed);
let file_matches =
self.verify_candidate(&file_info, regex, options, neg_fp, &accum)?;
accum
.matches_found
.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
Some(file_matches)
})
.flatten()
.collect();
if options.max_results > 0 && !options.files_only && all_matches.len() > options.max_results
{
all_matches.truncate(options.max_results);
}
let mut stats = QueryStats::default();
accum.into_stats(stats_candidate_files, all_matches.len() as u32, &mut stats);
(all_matches, stats)
}
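/// Test-only wrapper exposing the private `verify_stream`.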
pub fn verify_stream_for_test<R: Read>(
&self,
reader: R,
path: &Path,
regex: &Regex,
options: &QueryOptions,
) -> Result<Vec<Match>> {
Self::verify_stream(reader, path, regex, options)
}
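/// Scans a stream line by line, recording the first regex match per line
/// together with the requested leading and trailing context lines.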
#[allow(clippy::as_conversions)]
fn verify_stream<R: Read>(
reader: R,
path: &Path,
regex: &Regex,
options: &QueryOptions,
) -> Result<Vec<Match>> {
let mut buf_reader = BufReader::new(reader);
let mut matches = Vec::new();
let mut line_number = 0u32;
let mut byte_offset = 0u64;
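// Sniff the first buffered chunk and skip binary files unless requested.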
{
let buffer = buf_reader.fill_buf()?;
let is_bin = is_binary(buffer);
if is_bin && !options.binary {
return Ok(vec![]);
}
}
let mut line = String::new();
let mut context_before = std::collections::VecDeque::new();
let mut pending_matches: Vec<Match> = Vec::new();
while buf_reader.read_line(&mut line)? > 0 {
line_number += 1;
let line_len = line.len() as u64;
let trimmed_line = line.trim_end().to_string();
for m in &mut pending_matches {
if m.context_after.len() < options.context_lines {
m.context_after.push(trimmed_line.clone());
}
}
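// Emit matches whose trailing context is complete; keep the rest pending.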
let (completed, still_pending): (Vec<_>, Vec<_>) = pending_matches
.into_iter()
.partition(|m| m.context_after.len() >= options.context_lines);
matches.extend(completed);
pending_matches = still_pending;
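// At most one reported match per line, anchored at its first occurrence.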
if let Some(m) = regex.find(&line) {
let context_before_vec: Vec<String> = context_before.iter().cloned().collect();
let new_match = Match {
file_path: path.to_path_buf(),
line_number,
col: (m.start() + 1) as u32,
line_content: if options.count_only {
String::new()
} else {
trimmed_line.clone()
},
byte_offset: byte_offset + m.start() as u64,
context_before: context_before_vec,
context_after: vec![],
is_binary: false,
};
if options.context_lines > 0 {
pending_matches.push(new_match);
} else {
matches.push(new_match);
}
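// Honor max_results, but keep reading while pending matches still need
// trailing context lines.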
if options.max_results > 0
&& (matches.len() + pending_matches.len()) >= options.max_results
&& (pending_matches.is_empty() || matches.len() >= options.max_results)
{
break;
}
}
if options.context_lines > 0 {
context_before.push_back(trimmed_line.clone());
if context_before.len() > options.context_lines {
context_before.pop_front();
}
}
byte_offset += line_len;
line.clear();
}
matches.extend(pending_matches);
Ok(matches)
}
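/// Verifies a single candidate, first consulting the negative cache keyed by
/// (query fingerprint, content hash) to skip files known not to match; empty
/// results are recorded back into the cache.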
fn verify_candidate(
&self,
file_info: &FileInfo,
regex: &Regex,
options: &QueryOptions,
neg_fp: u64,
stats: &QueryStatsAccum,
) -> Option<Vec<Match>> {
if self
.neg_cache
.is_known_negative(neg_fp, file_info.content_hash)
{
stats.neg_cache_hits.fetch_add(1, Ordering::Relaxed);
return None;
}
stats.neg_cache_misses.fetch_add(1, Ordering::Relaxed);
let matches = Self::verify_file(file_info, regex, options).ok()?;
if matches.is_empty() {
self.neg_cache
.record_negative(neg_fp, file_info.content_hash);
}
Some(matches)
}
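/// Memory-maps the file and verifies it, decompressing first when enabled.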
fn verify_file(info: &FileInfo, regex: &Regex, options: &QueryOptions) -> Result<Vec<Match>> {
let file = File::open(&info.path)?;
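// SAFETY: the map is read-only; as with other mmap-based search tools, the
// file is assumed not to be truncated while the query runs.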
let mmap = unsafe { memmap2::Mmap::map(&file)? };
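// files_only needs just one hit per file, so cap per-file results when no
// global limit was given.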
let effective_options = if options.files_only && options.max_results == 0 {
QueryOptions {
max_results: 1,
..options.clone()
}
} else {
options.clone()
};
if options.decompress
&& let Some(reader) = maybe_decompress(&info.path, &mmap)?
{
return Self::verify_stream(reader, info.path.as_ref(), regex, &effective_options);
}
Self::verify_stream(
Cursor::new(&mmap[..]),
info.path.as_ref(),
regex,
&effective_options,
)
}
}