1use crate::decompress::maybe_decompress;
6use crate::error::Result;
7use crate::format::is_binary;
8use crate::planner::QueryPlan;
9use crate::reader::{FileInfo, Reader};
10use crate::trigram::Trigram;
11use rayon::prelude::*;
12use regex::Regex;
13use std::collections::HashSet;
14use std::fs::File;
15use std::io::{BufRead, BufReader, Cursor, Read};
16use std::path::PathBuf;
17use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
18
/// A single regex hit inside a file, plus optional surrounding context.
///
/// `Clone`, `PartialEq` and `Eq` are derived so results can be duplicated,
/// deduplicated and compared directly in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Match {
    /// Path of the file (or stream label) the match was found in.
    pub file_path: PathBuf,
    /// 1-based number of the line containing the match.
    pub line_number: u32,
    /// 1-based byte column of the match start within its line.
    pub col: u32,
    /// The matching line with trailing whitespace removed; left empty when
    /// the query runs in count-only mode.
    pub line_content: String,
    /// Byte offset of the match start, measured from the start of the stream.
    pub byte_offset: u64,
    /// Up to `context_lines` lines preceding the match, oldest first.
    pub context_before: Vec<String>,
    /// Up to `context_lines` lines following the match, in file order.
    pub context_after: Vec<String>,
    /// Flag intended to mark matches that come from binary content; it is
    /// filled in by the executor that produces the match.
    pub is_binary: bool,
}
30
/// Counters describing how much work a single query performed.
///
/// `Clone`, `PartialEq` and `Eq` are derived so stats can be snapshotted and
/// compared in tests.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of trigram lookups issued against the index.
    pub trigrams_queried: u32,
    /// Number of posting lists that were actually decoded.
    pub posting_lists_decoded: u32,
    /// Number of files that survived index-level filtering.
    pub candidate_files: u32,
    /// Number of candidate files handed to content verification
    /// (i.e. those that also passed the extension filter).
    pub files_verified: u32,
    /// Sum of the recorded sizes of all verified files, in bytes.
    pub bytes_verified: u64,
    /// Number of matches returned to the caller after any result cap.
    pub total_matches: u32,
}
40
/// Caller-supplied knobs that shape how a query is executed and reported.
///
/// `Clone`, `PartialEq` and `Eq` are derived so option sets can be copied and
/// compared; `Default` yields "no limits, no filters, everything off".
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct QueryOptions {
    /// When set, matches are produced with an empty `line_content`.
    pub count_only: bool,
    /// Not read by the executor in this file; presumably consumed by the
    /// output layer to print only file names — TODO confirm.
    pub files_only: bool,
    /// Maximum number of matches to return; `0` means unlimited.
    pub max_results: usize,
    /// File extensions (without the dot) to restrict the search to, compared
    /// exactly against each file's extension; empty means no restriction.
    pub type_filter: Vec<String>,
    /// Number of lines of context to capture before and after each match.
    pub context_lines: usize,
    /// When set, files are offered to the decompression layer before being
    /// searched.
    pub decompress: bool,
    /// Not read by the executor in this file; presumably sizes the worker
    /// pool elsewhere — TODO confirm.
    pub threads: usize,
    /// Not read by the executor in this file — NOTE(review): verification is
    /// line-by-line here regardless of this flag.
    pub multiline: bool,
    /// Not read by the executor in this file; presumably enables archive
    /// traversal elsewhere — TODO confirm.
    pub archive: bool,
    /// When set, content classified as binary is searched instead of skipped.
    pub binary: bool,
}
54
/// Executes query plans against a borrowed index and verifies candidate
/// files by scanning their contents.
pub struct Executor<'a> {
    /// Index reader used for trigram lookups, posting-list decoding,
    /// per-file bloom checks and file metadata.
    index: &'a Reader,
}
58
59impl<'a> Executor<'a> {
60 pub fn new(index: &'a Reader) -> Self {
61 Self { index }
62 }
63
64 pub fn execute(
65 &self,
66 plan: &QueryPlan,
67 options: &QueryOptions,
68 ) -> Result<(Vec<Match>, QueryStats)> {
69 match plan {
70 QueryPlan::Literal { pattern, trigrams } => {
71 self.execute_literal(pattern, trigrams, options)
72 }
73 QueryPlan::RegexWithLiterals {
74 regex,
75 required_trigram_sets,
76 } => self.execute_regex_indexed(regex, required_trigram_sets, options),
77 QueryPlan::CaseInsensitive {
78 regex,
79 trigram_groups,
80 } => self.execute_case_insensitive(regex, trigram_groups, options),
81 QueryPlan::FullScan { regex } => self.execute_full_scan(regex, options),
82 }
83 }
84
85 fn execute_literal(
86 &self,
87 pattern: &[u8],
88 trigrams: &[Trigram],
89 options: &QueryOptions,
90 ) -> Result<(Vec<Match>, QueryStats)> {
91 let mut stats = QueryStats::default();
92
93 let mut infos = Vec::new();
94 for &tri in trigrams {
95 stats.trigrams_queried += 1;
96 if let Some(info) = self.index.get_trigram(tri) {
97 infos.push((tri, info));
98 } else {
99 return Ok((vec![], stats));
100 }
101 }
102
103 infos.sort_by_key(|(_, info)| info.doc_frequency);
105
106 let (_, rarest_info) = &infos[0];
108 let postings = self.index.decode_postings(rarest_info)?;
109 stats.posting_lists_decoded += 1;
110
111 let mut candidates: HashSet<u32> = postings.entries.iter().map(|e| e.file_id).collect();
112
113 for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
116 if candidates.len() < 100 {
117 break;
118 }
119
120 let next_postings = self.index.decode_postings(info)?;
121 stats.posting_lists_decoded += 1;
122
123 let next_set: HashSet<u32> = next_postings.entries.iter().map(|e| e.file_id).collect();
124 candidates.retain(|fid| next_set.contains(fid));
125 }
126
127 for &(tri, _) in &infos[1..] {
129 if candidates.is_empty() {
130 break;
131 }
132 candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
133 }
134
135 stats.candidate_files = candidates.len() as u32;
136
137 let regex = Regex::new(®ex::escape(&String::from_utf8_lossy(pattern)))?;
138
139 let files_verified = AtomicU32::new(0);
141 let bytes_verified = std::sync::atomic::AtomicU64::new(0);
142 let matches_found = AtomicU32::new(0);
143
144 let candidate_list: Vec<u32> = candidates.into_iter().collect();
145
146 let mut all_matches: Vec<Match> = candidate_list
147 .into_par_iter()
148 .filter_map(|fid| {
149 if options.max_results > 0
150 && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
151 {
152 return None;
153 }
154
155 let file_info = self.index.get_file(fid).ok()?;
156
157 if !options.type_filter.is_empty() {
159 let ext = file_info
160 .path
161 .extension()
162 .and_then(|e: &std::ffi::OsStr| e.to_str())
163 .unwrap_or("");
164 if !options.type_filter.iter().any(|e: &String| e == ext) {
165 return None;
166 }
167 }
168
169 files_verified.fetch_add(1, Ordering::Relaxed);
170 bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
171
172 let matches = self.verify_file(&file_info, ®ex, options).ok()?;
173 matches_found.fetch_add(matches.len() as u32, Ordering::Relaxed);
174 Some(matches)
175 })
176 .flatten()
177 .collect();
178
179 stats.files_verified = files_verified.into_inner();
180 stats.bytes_verified = bytes_verified.into_inner();
181
182 if options.max_results > 0 && all_matches.len() > options.max_results {
183 all_matches.truncate(options.max_results);
184 }
185
186 stats.total_matches = all_matches.len() as u32;
187
188 Ok((all_matches, stats))
189 }
190
191 fn execute_regex_indexed(
192 &self,
193 regex: &Regex,
194 required_trigram_sets: &[Vec<Trigram>],
195 options: &QueryOptions,
196 ) -> Result<(Vec<Match>, QueryStats)> {
197 let mut stats = QueryStats::default();
198
199 let mut fragment_candidates = Vec::new();
201 for trigram_set in required_trigram_sets {
202 let mut infos = Vec::new();
203 for &tri in trigram_set {
204 stats.trigrams_queried += 1;
205 if let Some(info) = self.index.get_trigram(tri) {
206 infos.push((tri, info));
207 } else {
208 return Ok((vec![], stats));
209 }
210 }
211
212 infos.sort_by_key(|(_, info)| info.doc_frequency);
213
214 let (_, rarest_info) = &infos[0];
216 let postings = self.index.decode_postings(rarest_info)?;
217 stats.posting_lists_decoded += 1;
218 let mut set_candidates: HashSet<u32> =
219 postings.entries.iter().map(|e| e.file_id).collect();
220
221 for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
223 if set_candidates.len() < 100 {
224 break;
225 }
226 let next_postings = self.index.decode_postings(info)?;
227 stats.posting_lists_decoded += 1;
228 let next_set: HashSet<u32> =
229 next_postings.entries.iter().map(|e| e.file_id).collect();
230 set_candidates.retain(|fid| next_set.contains(fid));
231 }
232
233 for &(tri, _) in &infos[1..] {
234 set_candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
235 }
236 fragment_candidates.push(set_candidates);
237 }
238
239 let mut final_candidates: HashSet<u32> = match fragment_candidates.pop() {
241 Some(c) => c,
242 None => return Ok((vec![], stats)),
243 };
244 for set in fragment_candidates {
245 final_candidates.retain(|fid: &u32| set.contains(fid));
246 }
247
248 stats.candidate_files = final_candidates.len() as u32;
249
250 let files_verified = AtomicU32::new(0);
251 let bytes_verified = AtomicU64::new(0);
252 let matches_found = AtomicU32::new(0);
253
254 let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
255
256 let mut all_matches: Vec<Match> = candidate_list
257 .into_par_iter()
258 .filter_map(|fid| {
259 if options.max_results > 0
260 && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
261 {
262 return None;
263 }
264
265 let file_info = self.index.get_file(fid).ok()?;
266
267 if !options.type_filter.is_empty() {
269 let ext = file_info
270 .path
271 .extension()
272 .and_then(|e: &std::ffi::OsStr| e.to_str())
273 .unwrap_or("");
274 if !options.type_filter.iter().any(|e: &String| e == ext) {
275 return None;
276 }
277 }
278
279 files_verified.fetch_add(1, Ordering::Relaxed);
280 bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
281
282 let file_matches = self.verify_file(&file_info, regex, options).ok()?;
283 matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
284 Some(file_matches)
285 })
286 .flatten()
287 .collect();
288
289 stats.files_verified = files_verified.into_inner();
290 stats.bytes_verified = bytes_verified.into_inner();
291
292 if options.max_results > 0 && all_matches.len() > options.max_results {
293 all_matches.truncate(options.max_results);
294 }
295
296 stats.total_matches = all_matches.len() as u32;
297 Ok((all_matches, stats))
298 }
299
300 fn execute_case_insensitive(
301 &self,
302 regex: &Regex,
303 trigram_groups: &[Vec<Trigram>],
304 options: &QueryOptions,
305 ) -> Result<(Vec<Match>, QueryStats)> {
306 let mut stats = QueryStats::default();
307
308 let mut group_candidates = Vec::new();
310 for group in trigram_groups {
311 let mut union_set: HashSet<u32> = HashSet::new();
312 for &tri in group {
313 stats.trigrams_queried += 1;
314 if let Some(info) = self.index.get_trigram(tri)
315 && let Ok(postings) = self.index.decode_postings(&info)
316 {
317 stats.posting_lists_decoded += 1;
318 for entry in &postings.entries {
319 union_set.insert(entry.file_id);
320 }
321 }
322 }
324 if !union_set.is_empty() {
325 group_candidates.push(union_set);
326 }
327 }
328
329 let final_candidates = if let Some(mut base) = group_candidates.pop() {
331 for set in group_candidates {
332 base.retain(|fid| set.contains(fid));
333 }
334 base
335 } else {
336 let all: HashSet<u32> = (0..self.index.header.file_count).collect();
338 all
339 };
340
341 stats.candidate_files = final_candidates.len() as u32;
342
343 let files_verified = AtomicU32::new(0);
344 let bytes_verified = AtomicU64::new(0);
345 let matches_found = AtomicU32::new(0);
346
347 let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
348
349 let mut all_matches: Vec<Match> = candidate_list
350 .into_par_iter()
351 .filter_map(|fid| {
352 if options.max_results > 0
353 && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
354 {
355 return None;
356 }
357
358 let file_info = self.index.get_file(fid).ok()?;
359
360 if !options.type_filter.is_empty() {
361 let ext = file_info
362 .path
363 .extension()
364 .and_then(|e| e.to_str())
365 .unwrap_or("");
366 if !options.type_filter.iter().any(|e| e == ext) {
367 return None;
368 }
369 }
370
371 files_verified.fetch_add(1, Ordering::Relaxed);
372 bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
373
374 let file_matches = self.verify_file(&file_info, regex, options).ok()?;
375 matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
376 Some(file_matches)
377 })
378 .flatten()
379 .collect();
380
381 stats.files_verified = files_verified.into_inner();
382 stats.bytes_verified = bytes_verified.into_inner();
383
384 if options.max_results > 0 && all_matches.len() > options.max_results {
385 all_matches.truncate(options.max_results);
386 }
387
388 stats.total_matches = all_matches.len() as u32;
389 Ok((all_matches, stats))
390 }
391
392 fn execute_full_scan(
393 &self,
394 regex: &Regex,
395 options: &QueryOptions,
396 ) -> Result<(Vec<Match>, QueryStats)> {
397 let stats_candidate_files = self.index.header.file_count;
398
399 let files_verified = AtomicU32::new(0);
400 let bytes_verified = AtomicU64::new(0);
401 let matches_found = AtomicU32::new(0);
402
403 let mut all_matches: Vec<Match> = (0..self.index.header.file_count)
404 .into_par_iter()
405 .filter_map(|fid| {
406 if options.max_results > 0
407 && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
408 {
409 return None;
410 }
411
412 let file_info = self.index.get_file(fid).ok()?;
413
414 if !options.type_filter.is_empty() {
416 let ext = file_info
417 .path
418 .extension()
419 .and_then(|e: &std::ffi::OsStr| e.to_str())
420 .unwrap_or("");
421 if !options.type_filter.iter().any(|e: &String| e == ext) {
422 return None;
423 }
424 }
425
426 files_verified.fetch_add(1, Ordering::Relaxed);
427 bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
428
429 let file_matches = self.verify_file(&file_info, regex, options).ok()?;
430 matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
431 Some(file_matches)
432 })
433 .flatten()
434 .collect();
435
436 if options.max_results > 0 && all_matches.len() > options.max_results {
437 all_matches.truncate(options.max_results);
438 }
439
440 let stats = QueryStats {
441 candidate_files: stats_candidate_files,
442 files_verified: files_verified.into_inner(),
443 bytes_verified: bytes_verified.into_inner(),
444 total_matches: all_matches.len() as u32,
445 ..Default::default()
446 };
447 Ok((all_matches, stats))
448 }
449
450 pub fn verify_stream_for_test<R: Read>(
452 &self,
453 reader: R,
454 path: PathBuf,
455 regex: &Regex,
456 options: &QueryOptions,
457 ) -> Result<Vec<Match>> {
458 self.verify_stream(reader, path, regex, options)
459 }
460
461 fn verify_stream<R: Read>(
462 &self,
463 reader: R,
464 path: PathBuf,
465 regex: &Regex,
466 options: &QueryOptions,
467 ) -> Result<Vec<Match>> {
468 let mut buf_reader = BufReader::new(reader);
469 let mut matches = Vec::new();
470 let mut line_number = 0u32;
471 let mut byte_offset = 0u64;
472
473 {
475 let buffer = buf_reader.fill_buf()?;
476 let is_bin = is_binary(buffer);
477 if is_bin && !options.binary {
478 return Ok(vec![]);
479 }
480 }
481
482 let mut line = String::new();
483 let mut context_before = std::collections::VecDeque::new();
484 let mut pending_matches: Vec<Match> = Vec::new();
485
486 while buf_reader.read_line(&mut line)? > 0 {
487 line_number += 1;
488 let line_len = line.len() as u64;
489 let trimmed_line = line.trim_end().to_string();
490
491 for m in &mut pending_matches {
493 if m.context_after.len() < options.context_lines {
494 m.context_after.push(trimmed_line.clone());
495 }
496 }
497
498 let (completed, still_pending): (Vec<_>, Vec<_>) = pending_matches
500 .into_iter()
501 .partition(|m| m.context_after.len() >= options.context_lines);
502 matches.extend(completed);
503 pending_matches = still_pending;
504
505 if let Some(m) = regex.find(&line) {
506 let context_before_vec: Vec<String> = context_before
507 .iter()
508 .map(|s: &String| s.trim_end().to_string())
509 .collect();
510
511 let new_match = Match {
512 file_path: path.clone(),
513 line_number,
514 col: (m.start() + 1) as u32,
515 line_content: if options.count_only {
516 String::new()
517 } else {
518 trimmed_line.clone()
519 },
520 byte_offset: byte_offset + m.start() as u64,
521 context_before: context_before_vec,
522 context_after: vec![],
523 is_binary: false,
524 };
525
526 if options.context_lines > 0 {
527 pending_matches.push(new_match);
528 } else {
529 matches.push(new_match);
530 }
531
532 if options.max_results > 0
533 && (matches.len() + pending_matches.len()) >= options.max_results
534 && (pending_matches.is_empty() || matches.len() >= options.max_results)
535 {
536 break;
537 }
538 }
539
540 if options.context_lines > 0 {
541 context_before.push_back(line.clone());
542 if context_before.len() > options.context_lines {
543 context_before.pop_front();
544 }
545 }
546
547 byte_offset += line_len;
548 line.clear();
549 }
550
551 matches.extend(pending_matches);
552 Ok(matches)
553 }
554
555 fn verify_file(
556 &self,
557 info: &FileInfo,
558 regex: &Regex,
559 options: &QueryOptions,
560 ) -> Result<Vec<Match>> {
561 let file = File::open(&info.path)?;
562 let mmap = unsafe { memmap2::Mmap::map(&file)? };
563
564 if options.decompress
565 && let Some(reader) = maybe_decompress(&info.path, &mmap)?
566 {
567 return self.verify_stream(reader, info.path.clone(), regex, options);
568 }
569
570 self.verify_stream(Cursor::new(&mmap[..]), info.path.clone(), regex, options)
572 }
573}