1use crate::decompress::maybe_decompress;
6use crate::error::Result;
7use crate::format::is_binary;
8use crate::planner::QueryPlan;
9use crate::reader::{FileInfo, Reader};
10use crate::trigram::Trigram;
11use rayon::prelude::*;
12use regex::Regex;
13use std::collections::HashSet;
14use std::fs::File;
15use std::io::{BufRead, BufReader, Cursor, Read};
16use std::path::{PathBuf};
17use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
18
19#[derive(Debug)]
20pub struct Match {
21 pub file_path: PathBuf,
22 pub line_number: u32,
23 pub col: u32,
24 pub line_content: String,
25 pub byte_offset: u64,
26 pub context_before: Vec<String>,
27 pub context_after: Vec<String>,
28 pub is_binary: bool,
29}
30
/// Counters describing how much work a query performed.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of trigram lookups performed against the index.
    pub trigrams_queried: u32,
    /// Number of posting lists decoded from the index.
    pub posting_lists_decoded: u32,
    /// Files left after index-based filtering (before the type filter).
    pub candidate_files: u32,
    /// Candidates that passed the type filter and were opened for
    /// verification (counted even if reading the file then fails).
    pub files_verified: u32,
    /// Sum of the indexed sizes of the verified files.
    pub bytes_verified: u64,
    /// Number of matches returned, after `max_results` truncation.
    pub total_matches: u32,
}
40
/// Caller-supplied knobs controlling how a query is verified and reported.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct QueryOptions {
    /// When set, matches carry an empty `line_content`.
    pub count_only: bool,
    /// Report only matching files. NOTE(review): not read by the executor in
    /// this module; presumably handled by the caller.
    pub files_only: bool,
    /// Maximum number of matches to return; `0` means unlimited.
    pub max_results: usize,
    /// File extensions (as returned by `Path::extension`, without the dot) to
    /// search; empty means every file.
    pub type_filter: Vec<String>,
    /// Number of lines of context to capture before and after each match.
    pub context_lines: usize,
    /// Try transparent decompression of candidate files before searching.
    pub decompress: bool,
    /// Worker thread count. NOTE(review): not read by the executor in this
    /// module; presumably used to configure the thread pool elsewhere.
    pub threads: usize,
    /// Multiline matching. NOTE(review): not read by the executor in this module.
    pub multiline: bool,
    /// Archive searching. NOTE(review): not read by the executor in this module.
    pub archive: bool,
    /// Search files detected as binary instead of skipping them.
    pub binary: bool,
}
54
/// Runs query plans against a trigram index that it borrows for its lifetime.
pub struct Executor<'a> {
    /// Index used for trigram lookups, posting-list decoding, per-file bloom
    /// checks and file metadata.
    index: &'a Reader,
}
58
59impl<'a> Executor<'a> {
60 pub fn new(index: &'a Reader) -> Self {
61 Self { index }
62 }
63
64 pub fn execute(
65 &self,
66 plan: &QueryPlan,
67 options: &QueryOptions,
68 ) -> Result<(Vec<Match>, QueryStats)> {
69 match plan {
70 QueryPlan::Literal { pattern, trigrams } => {
71 self.execute_literal(pattern, trigrams, options)
72 }
73 QueryPlan::RegexWithLiterals {
74 regex,
75 required_trigram_sets,
76 } => self.execute_regex_indexed(regex, required_trigram_sets, options),
77 QueryPlan::CaseInsensitive {
78 regex,
79 trigram_groups,
80 } => self.execute_case_insensitive(regex, trigram_groups, options),
81 QueryPlan::FullScan { regex } => self.execute_full_scan(regex, options),
82 }
83 }
84
85 fn execute_literal(
86 &self,
87 pattern: &[u8],
88 trigrams: &[Trigram],
89 options: &QueryOptions,
90 ) -> Result<(Vec<Match>, QueryStats)> {
91 let mut stats = QueryStats::default();
92
93 let mut infos = Vec::new();
94 for &tri in trigrams {
95 stats.trigrams_queried += 1;
96 if let Some(info) = self.index.get_trigram(tri) {
97 infos.push((tri, info));
98 } else {
99 return Ok((vec![], stats));
100 }
101 }
102
103 infos.sort_by_key(|(_, info)| info.doc_frequency);
105
106 let (_, rarest_info) = &infos[0];
108 let postings = self.index.decode_postings(rarest_info)?;
109 stats.posting_lists_decoded += 1;
110
111 let mut candidates: HashSet<u32> = postings.entries.iter().map(|e| e.file_id).collect();
112
113 for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
116 if candidates.len() < 100 {
117 break;
118 }
119
120 let next_postings = self.index.decode_postings(info)?;
121 stats.posting_lists_decoded += 1;
122
123 let next_set: HashSet<u32> = next_postings.entries.iter().map(|e| e.file_id).collect();
124 candidates.retain(|fid| next_set.contains(fid));
125 }
126
127 for &(tri, _) in &infos[1..] {
129 if candidates.is_empty() {
130 break;
131 }
132 candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
133 }
134
135 stats.candidate_files = candidates.len() as u32;
136
137 let regex = Regex::new(®ex::escape(&String::from_utf8_lossy(pattern)))?;
138
139 let files_verified = AtomicU32::new(0);
141 let bytes_verified = std::sync::atomic::AtomicU64::new(0);
142 let matches_found = AtomicU32::new(0);
143
144 let candidate_list: Vec<u32> = candidates.into_iter().collect();
145
146 let mut all_matches: Vec<Match> = candidate_list
147 .into_par_iter()
148 .filter_map(|fid| {
149 if options.max_results > 0 && matches_found.load(Ordering::Relaxed) >= options.max_results as u32 {
150 return None;
151 }
152
153 let file_info = self.index.get_file(fid).ok()?;
154
155 if !options.type_filter.is_empty() {
157 let ext = file_info
158 .path
159 .extension()
160 .and_then(|e: &std::ffi::OsStr| e.to_str())
161 .unwrap_or("");
162 if !options.type_filter.iter().any(|e: &String| e == ext) {
163 return None;
164 }
165 }
166
167 files_verified.fetch_add(1, Ordering::Relaxed);
168 bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
169
170 let matches = self.verify_file(&file_info, ®ex, options).ok()?;
171 matches_found.fetch_add(matches.len() as u32, Ordering::Relaxed);
172 Some(matches)
173 })
174 .flatten()
175 .collect();
176
177 stats.files_verified = files_verified.into_inner();
178 stats.bytes_verified = bytes_verified.into_inner();
179
180 if options.max_results > 0 && all_matches.len() > options.max_results {
181 all_matches.truncate(options.max_results);
182 }
183
184 stats.total_matches = all_matches.len() as u32;
185
186 Ok((all_matches, stats))
187 }
188
189
190 fn execute_regex_indexed(
191 &self,
192 regex: &Regex,
193 required_trigram_sets: &[Vec<Trigram>],
194 options: &QueryOptions,
195 ) -> Result<(Vec<Match>, QueryStats)> {
196 let mut stats = QueryStats::default();
197
198 let mut fragment_candidates = Vec::new();
200 for trigram_set in required_trigram_sets {
201 let mut infos = Vec::new();
202 for &tri in trigram_set {
203 stats.trigrams_queried += 1;
204 if let Some(info) = self.index.get_trigram(tri) {
205 infos.push((tri, info));
206 } else {
207 return Ok((vec![], stats));
208 }
209 }
210
211 infos.sort_by_key(|(_, info)| info.doc_frequency);
212
213 let (_, rarest_info) = &infos[0];
215 let postings = self.index.decode_postings(rarest_info)?;
216 stats.posting_lists_decoded += 1;
217 let mut set_candidates: HashSet<u32> =
218 postings.entries.iter().map(|e| e.file_id).collect();
219
220 for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
222 if set_candidates.len() < 100 {
223 break;
224 }
225 let next_postings = self.index.decode_postings(info)?;
226 stats.posting_lists_decoded += 1;
227 let next_set: HashSet<u32> =
228 next_postings.entries.iter().map(|e| e.file_id).collect();
229 set_candidates.retain(|fid| next_set.contains(fid));
230 }
231
232 for &(tri, _) in &infos[1..] {
233 set_candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
234 }
235 fragment_candidates.push(set_candidates);
236 }
237
238 let mut final_candidates: HashSet<u32> = match fragment_candidates.pop() {
240 Some(c) => c,
241 None => return Ok((vec![], stats)),
242 };
243 for set in fragment_candidates {
244 final_candidates.retain(|fid: &u32| set.contains(fid));
245 }
246
247 stats.candidate_files = final_candidates.len() as u32;
248
249 let files_verified = AtomicU32::new(0);
250 let bytes_verified = AtomicU64::new(0);
251 let matches_found = AtomicU32::new(0);
252
253 let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
254
255 let mut all_matches: Vec<Match> = candidate_list
256 .into_par_iter()
257 .filter_map(|fid| {
258 if options.max_results > 0
259 && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
260 {
261 return None;
262 }
263
264 let file_info = self.index.get_file(fid).ok()?;
265
266 if !options.type_filter.is_empty() {
268 let ext = file_info
269 .path
270 .extension()
271 .and_then(|e: &std::ffi::OsStr| e.to_str())
272 .unwrap_or("");
273 if !options.type_filter.iter().any(|e: &String| e == ext) {
274 return None;
275 }
276 }
277
278 files_verified.fetch_add(1, Ordering::Relaxed);
279 bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
280
281 let file_matches = self.verify_file(&file_info, regex, options).ok()?;
282 matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
283 Some(file_matches)
284 })
285 .flatten()
286 .collect();
287
288 stats.files_verified = files_verified.into_inner();
289 stats.bytes_verified = bytes_verified.into_inner();
290
291 if options.max_results > 0 && all_matches.len() > options.max_results {
292 all_matches.truncate(options.max_results);
293 }
294
295 stats.total_matches = all_matches.len() as u32;
296 Ok((all_matches, stats))
297 }
298
299 fn execute_case_insensitive(
300 &self,
301 regex: &Regex,
302 trigram_groups: &[Vec<Trigram>],
303 options: &QueryOptions,
304 ) -> Result<(Vec<Match>, QueryStats)> {
305 let mut stats = QueryStats::default();
306
307 let mut group_candidates = Vec::new();
309 for group in trigram_groups {
310 let mut union_set: HashSet<u32> = HashSet::new();
311 for &tri in group {
312 stats.trigrams_queried += 1;
313 if let Some(info) = self.index.get_trigram(tri)
314 && let Ok(postings) = self.index.decode_postings(&info)
315 {
316 stats.posting_lists_decoded += 1;
317 for entry in &postings.entries {
318 union_set.insert(entry.file_id);
319 }
320 }
321 }
323 if !union_set.is_empty() {
324 group_candidates.push(union_set);
325 }
326 }
327
328 let final_candidates = if let Some(mut base) = group_candidates.pop() {
330 for set in group_candidates {
331 base.retain(|fid| set.contains(fid));
332 }
333 base
334 } else {
335 let all: HashSet<u32> = (0..self.index.header.file_count).collect();
337 all
338 };
339
340 stats.candidate_files = final_candidates.len() as u32;
341
342 let files_verified = AtomicU32::new(0);
343 let bytes_verified = AtomicU64::new(0);
344 let matches_found = AtomicU32::new(0);
345
346 let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
347
348 let mut all_matches: Vec<Match> = candidate_list
349 .into_par_iter()
350 .filter_map(|fid| {
351 if options.max_results > 0
352 && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
353 {
354 return None;
355 }
356
357 let file_info = self.index.get_file(fid).ok()?;
358
359 if !options.type_filter.is_empty() {
360 let ext = file_info
361 .path
362 .extension()
363 .and_then(|e| e.to_str())
364 .unwrap_or("");
365 if !options.type_filter.iter().any(|e| e == ext) {
366 return None;
367 }
368 }
369
370 files_verified.fetch_add(1, Ordering::Relaxed);
371 bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
372
373 let file_matches = self.verify_file(&file_info, regex, options).ok()?;
374 matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
375 Some(file_matches)
376 })
377 .flatten()
378 .collect();
379
380 stats.files_verified = files_verified.into_inner();
381 stats.bytes_verified = bytes_verified.into_inner();
382
383 if options.max_results > 0 && all_matches.len() > options.max_results {
384 all_matches.truncate(options.max_results);
385 }
386
387 stats.total_matches = all_matches.len() as u32;
388 Ok((all_matches, stats))
389 }
390
391 fn execute_full_scan(
392 &self,
393 regex: &Regex,
394 options: &QueryOptions,
395 ) -> Result<(Vec<Match>, QueryStats)> {
396 let stats_candidate_files = self.index.header.file_count;
397
398 let files_verified = AtomicU32::new(0);
399 let bytes_verified = AtomicU64::new(0);
400 let matches_found = AtomicU32::new(0);
401
402 let mut all_matches: Vec<Match> = (0..self.index.header.file_count)
403 .into_par_iter()
404 .filter_map(|fid| {
405 if options.max_results > 0
406 && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
407 {
408 return None;
409 }
410
411 let file_info = self.index.get_file(fid).ok()?;
412
413 if !options.type_filter.is_empty() {
415 let ext = file_info
416 .path
417 .extension()
418 .and_then(|e: &std::ffi::OsStr| e.to_str())
419 .unwrap_or("");
420 if !options.type_filter.iter().any(|e: &String| e == ext) {
421 return None;
422 }
423 }
424
425 files_verified.fetch_add(1, Ordering::Relaxed);
426 bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
427
428 let file_matches = self.verify_file(&file_info, regex, options).ok()?;
429 matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
430 Some(file_matches)
431 })
432 .flatten()
433 .collect();
434
435 if options.max_results > 0 && all_matches.len() > options.max_results {
436 all_matches.truncate(options.max_results);
437 }
438
439 let stats = QueryStats {
440 candidate_files: stats_candidate_files,
441 files_verified: files_verified.into_inner(),
442 bytes_verified: bytes_verified.into_inner(),
443 total_matches: all_matches.len() as u32,
444 ..Default::default()
445 };
446 Ok((all_matches, stats))
447 }
448
449 pub fn verify_stream_for_test<R: Read>(
451 &self,
452 reader: R,
453 path: PathBuf,
454 regex: &Regex,
455 options: &QueryOptions,
456 ) -> Result<Vec<Match>> {
457 self.verify_stream(reader, path, regex, options)
458 }
459
460 fn verify_stream<R: Read>(
461 &self,
462 reader: R,
463 path: PathBuf,
464 regex: &Regex,
465 options: &QueryOptions,
466 ) -> Result<Vec<Match>> {
467 let mut buf_reader = BufReader::new(reader);
468 let mut matches = Vec::new();
469 let mut line_number = 0u32;
470 let mut byte_offset = 0u64;
471
472 {
474 let buffer = buf_reader.fill_buf()?;
475 let is_bin = is_binary(buffer);
476 if is_bin && !options.binary {
477 return Ok(vec![]);
478 }
479 }
480
481 let mut line = String::new();
482 let mut context_before = std::collections::VecDeque::new();
483 let mut pending_matches: Vec<Match> = Vec::new();
484
485 while buf_reader.read_line(&mut line)? > 0 {
486 line_number += 1;
487 let line_len = line.len() as u64;
488 let trimmed_line = line.trim_end().to_string();
489
490 for m in &mut pending_matches {
492 if m.context_after.len() < options.context_lines {
493 m.context_after.push(trimmed_line.clone());
494 }
495 }
496
497 let (completed, still_pending): (Vec<_>, Vec<_>) = pending_matches
499 .into_iter()
500 .partition(|m| m.context_after.len() >= options.context_lines);
501 matches.extend(completed);
502 pending_matches = still_pending;
503
504 if let Some(m) = regex.find(&line) {
505 let context_before_vec: Vec<String> =
506 context_before.iter().map(|s: &String| s.trim_end().to_string()).collect();
507
508 let new_match = Match {
509 file_path: path.clone(),
510 line_number,
511 col: (m.start() + 1) as u32,
512 line_content: if options.count_only {
513 String::new()
514 } else {
515 trimmed_line.clone()
516 },
517 byte_offset: byte_offset + m.start() as u64,
518 context_before: context_before_vec,
519 context_after: vec![],
520 is_binary: false,
521 };
522
523 if options.context_lines > 0 {
524 pending_matches.push(new_match);
525 } else {
526 matches.push(new_match);
527 }
528
529 if options.max_results > 0
530 && (matches.len() + pending_matches.len()) >= options.max_results
531 && (pending_matches.is_empty() || matches.len() >= options.max_results)
532 {
533 break;
534 }
535 }
536
537 if options.context_lines > 0 {
538 context_before.push_back(line.clone());
539 if context_before.len() > options.context_lines {
540 context_before.pop_front();
541 }
542 }
543
544 byte_offset += line_len;
545 line.clear();
546 }
547
548 matches.extend(pending_matches);
549 Ok(matches)
550 }
551
552 fn verify_file(
553 &self,
554 info: &FileInfo,
555 regex: &Regex,
556 options: &QueryOptions,
557 ) -> Result<Vec<Match>> {
558 let file = File::open(&info.path)?;
559 let mmap = unsafe { memmap2::Mmap::map(&file)? };
560
561 if options.decompress
562 && let Some(reader) = maybe_decompress(&info.path, &mmap)? {
563 return self.verify_stream(reader, info.path.clone(), regex, options);
564 }
565
566 self.verify_stream(Cursor::new(&mmap[..]), info.path.clone(), regex, options)
568 }
569}