Skip to main content

ix/
executor.rs

1//! Query executor — search through the index and verify results.
2//!
3//! Handles literal searches, indexed regex, and full scans.
4
5use crate::decompress::maybe_decompress;
6use crate::error::Result;
7use crate::format::is_binary;
8use crate::planner::QueryPlan;
9use crate::reader::{FileInfo, Reader};
10use crate::trigram::Trigram;
11use rayon::prelude::*;
12use regex::Regex;
13use std::collections::HashSet;
14use std::fs::File;
15use std::io::{BufRead, BufReader, Cursor, Read};
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
18
/// A single regex match found in a file.
///
/// Derives `Clone`/`PartialEq`/`Eq` so callers can buffer, deduplicate,
/// and compare results without bespoke plumbing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Match {
    /// Absolute path to the file containing the match.
    pub file_path: PathBuf,
    /// 1-based line number.
    pub line_number: u32,
    /// 1-based column within the line (byte offset).
    pub col: u32,
    /// The entire content of the matching line.
    pub line_content: String,
    /// Byte offset from the start of the file.
    pub byte_offset: u64,
    /// Lines preceding the match (context).
    pub context_before: Vec<String>,
    /// Lines following the match (context).
    pub context_after: Vec<String>,
    /// Whether the file was detected as binary.
    pub is_binary: bool,
}
39
/// Performance counters collected during query execution.
///
/// Plain integer counters: all start at zero via [`Default`], and the
/// struct is `Copy` so it can be returned and aggregated freely.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of trigrams looked up in the trigram table.
    pub trigrams_queried: u32,
    /// Number of posting lists that were fully decoded from disk.
    pub posting_lists_decoded: u32,
    /// Number of candidate files after intersection and bloom filtering.
    pub candidate_files: u32,
    /// Number of files whose content was verified against the regex.
    pub files_verified: u32,
    /// Total bytes of file content read during verification.
    pub bytes_verified: u64,
    /// Total number of matches produced.
    pub total_matches: u32,
}
56
/// Tunable knobs that control query execution behaviour.
///
/// `Default` yields the neutral configuration: no result limit, no type
/// filter, no context lines, and all feature flags off.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[allow(clippy::struct_excessive_bools)]
pub struct QueryOptions {
    /// Only report match counts (not line content).
    pub count_only: bool,
    /// Only list file paths containing matches.
    pub files_only: bool,
    /// Maximum number of results to return (0 = unlimited).
    pub max_results: usize,
    /// File extensions to restrict the search to.
    pub type_filter: Vec<String>,
    /// Number of context lines to show before and after each match.
    pub context_lines: usize,
    /// Transparently decompress archives (e.g. `.gz`) when scanning.
    pub decompress: bool,
    /// Number of Rayon threads to use for parallel work.
    pub threads: usize,
    /// Dot-matches-newline mode for regex matching.
    pub multiline: bool,
    /// Search inside archive files (zip, tar.gz).
    pub archive: bool,
    /// Search binary files as if they were text.
    pub binary: bool,
}
82
/// Query executor that searches through an open index and verifies
/// candidate files against the original regex.
///
/// Holds only a shared borrow of the index reader; all per-query state
/// lives inside the `execute_*` methods, so one executor can serve
/// multiple queries.
pub struct Executor<'a> {
    // The open index this executor searches; never mutated here.
    index: &'a Reader,
}
88
89impl<'a> Executor<'a> {
    /// Create an executor backed by the given index reader.
    ///
    /// Construction only stores the borrow — no I/O happens until
    /// [`Self::execute`] is called.
    #[must_use]
    pub const fn new(index: &'a Reader) -> Self {
        Self { index }
    }
95
96    /// Execute a query plan against the index.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if I/O fails when reading index sections, if posted
101    /// data is corrupted, or if file content cannot be read during verification.
102    pub fn execute(
103        &self,
104        plan: &QueryPlan,
105        options: &QueryOptions,
106    ) -> Result<(Vec<Match>, QueryStats)> {
107        match plan {
108            QueryPlan::Literal {
109                pattern,
110                trigrams,
111                regex,
112            } => self.execute_literal(pattern, trigrams, regex, options),
113            QueryPlan::RegexWithLiterals {
114                regex,
115                required_trigram_sets,
116            } => self.execute_regex_indexed(regex, required_trigram_sets, options),
117            QueryPlan::CaseInsensitive {
118                regex,
119                trigram_groups,
120            } => Ok(self.execute_case_insensitive(regex, trigram_groups, options)),
121            QueryPlan::FullScan { regex } => Ok(self.execute_full_scan(regex, options)),
122        }
123    }
124
125    #[allow(clippy::as_conversions)] // match counts: len()→u32 fits within range
126    #[allow(clippy::indexing_slicing)] // infos sorted+checked: .get(0) always valid
127    fn execute_literal(
128        &self,
129        _pattern: &[u8],
130        trigrams: &[Trigram],
131        regex: &Regex,
132        options: &QueryOptions,
133    ) -> Result<(Vec<Match>, QueryStats)> {
134        let mut stats = QueryStats::default();
135
136        let mut infos = Vec::new();
137        for &tri in trigrams {
138            stats.trigrams_queried += 1;
139            if let Some(info) = self.index.get_trigram(tri) {
140                infos.push((tri, info));
141            } else {
142                return Ok((vec![], stats));
143            }
144        }
145
146        // Sort by doc_frequency (rarest first)
147        infos.sort_by_key(|(_, info)| info.doc_frequency);
148
149        // ── Step 1: Decode rarest posting list ──
150        let (_, rarest_info) = &infos[0];
151        let postings = self.index.decode_postings(rarest_info)?;
152        stats.posting_lists_decoded += 1;
153
154        let mut candidates: HashSet<u32> = postings.entries.iter().map(|e| e.file_id).collect();
155
156        // ── Step 2: Intersect with next rarest lists if candidate set is large ──
157        // Only decode up to 3 lists to avoid excessive I/O
158        for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
159            if candidates.len() < 100 {
160                break;
161            }
162
163            let next_postings = self.index.decode_postings(info)?;
164            stats.posting_lists_decoded += 1;
165
166            let next_set: HashSet<u32> = next_postings.entries.iter().map(|e| e.file_id).collect();
167            candidates.retain(|fid| next_set.contains(fid));
168        }
169
170        // ── Step 3: Filter remaining using Bloom filters ──
171        for &(tri, _) in &infos[1..] {
172            if candidates.is_empty() {
173                break;
174            }
175            candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
176        }
177
178        stats.candidate_files = candidates.len() as u32;
179
180        // Parallel verification
181        let files_verified = AtomicU32::new(0);
182        let bytes_verified = std::sync::atomic::AtomicU64::new(0);
183        let matches_found = AtomicU32::new(0);
184
185        let candidate_list: Vec<u32> = candidates.into_iter().collect();
186
187        let mut all_matches: Vec<Match> = candidate_list
188            .into_par_iter()
189            .filter_map(|fid| {
190                if options.max_results > 0
191                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
192                {
193                    return None;
194                }
195
196                let file_info = self.index.get_file(fid).ok()?;
197
198                // Filter by extension
199                if !options.type_filter.is_empty() {
200                    let ext = file_info
201                        .path
202                        .extension()
203                        .and_then(|e: &std::ffi::OsStr| e.to_str())
204                        .unwrap_or("");
205                    if !options.type_filter.iter().any(|e: &String| e == ext) {
206                        return None;
207                    }
208                }
209
210                files_verified.fetch_add(1, Ordering::Relaxed);
211                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
212
213                let matches = Self::verify_file(&file_info, regex, options).ok()?;
214                matches_found.fetch_add(matches.len() as u32, Ordering::Relaxed);
215                Some(matches)
216            })
217            .flatten()
218            .collect();
219
220        stats.files_verified = files_verified.into_inner();
221        stats.bytes_verified = bytes_verified.into_inner();
222
223        if options.max_results > 0 && all_matches.len() > options.max_results {
224            all_matches.truncate(options.max_results);
225        }
226
227        stats.total_matches = all_matches.len() as u32;
228
229        Ok((all_matches, stats))
230    }
231
232    #[allow(clippy::as_conversions)] // match counts: len()→u32 fits within range
233    #[allow(clippy::indexing_slicing)] // infos sorted+checked: .get(0) always valid
234    fn execute_regex_indexed(
235        &self,
236        regex: &Regex,
237        required_trigram_sets: &[Vec<Trigram>],
238        options: &QueryOptions,
239    ) -> Result<(Vec<Match>, QueryStats)> {
240        let mut stats = QueryStats::default();
241
242        // For each required literal fragment, find candidate files
243        let mut fragment_candidates = Vec::new();
244        for trigram_set in required_trigram_sets {
245            let mut infos = Vec::new();
246            for &tri in trigram_set {
247                stats.trigrams_queried += 1;
248                if let Some(info) = self.index.get_trigram(tri) {
249                    infos.push((tri, info));
250                } else {
251                    return Ok((vec![], stats));
252                }
253            }
254
255            infos.sort_by_key(|(_, info)| info.doc_frequency);
256
257            // Intersection within fragment
258            let (_, rarest_info) = &infos[0];
259            let postings = self.index.decode_postings(rarest_info)?;
260            stats.posting_lists_decoded += 1;
261            let mut set_candidates: HashSet<u32> =
262                postings.entries.iter().map(|e| e.file_id).collect();
263
264            // Intersect with up to 2 more lists if large
265            for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
266                if set_candidates.len() < 100 {
267                    break;
268                }
269                let next_postings = self.index.decode_postings(info)?;
270                stats.posting_lists_decoded += 1;
271                let next_set: HashSet<u32> =
272                    next_postings.entries.iter().map(|e| e.file_id).collect();
273                set_candidates.retain(|fid| next_set.contains(fid));
274            }
275
276            for &(tri, _) in &infos[1..] {
277                set_candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
278            }
279            fragment_candidates.push(set_candidates);
280        }
281
282        // Intersect candidates from all fragments
283        let mut final_candidates: HashSet<u32> = match fragment_candidates.pop() {
284            Some(c) => c,
285            None => return Ok((vec![], stats)),
286        };
287        for set in fragment_candidates {
288            final_candidates.retain(|fid: &u32| set.contains(fid));
289        }
290
291        stats.candidate_files = final_candidates.len() as u32;
292
293        let files_verified = AtomicU32::new(0);
294        let bytes_verified = AtomicU64::new(0);
295        let matches_found = AtomicU32::new(0);
296
297        let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
298
299        let mut all_matches: Vec<Match> = candidate_list
300            .into_par_iter()
301            .filter_map(|fid| {
302                if options.max_results > 0
303                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
304                {
305                    return None;
306                }
307
308                let file_info = self.index.get_file(fid).ok()?;
309
310                // Filter by extension
311                if !options.type_filter.is_empty() {
312                    let ext = file_info
313                        .path
314                        .extension()
315                        .and_then(|e: &std::ffi::OsStr| e.to_str())
316                        .unwrap_or("");
317                    if !options.type_filter.iter().any(|e: &String| e == ext) {
318                        return None;
319                    }
320                }
321
322                files_verified.fetch_add(1, Ordering::Relaxed);
323                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
324
325                let file_matches = Self::verify_file(&file_info, regex, options).ok()?;
326                matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
327                Some(file_matches)
328            })
329            .flatten()
330            .collect();
331
332        stats.files_verified = files_verified.into_inner();
333        stats.bytes_verified = bytes_verified.into_inner();
334
335        if options.max_results > 0 && all_matches.len() > options.max_results {
336            all_matches.truncate(options.max_results);
337        }
338
339        stats.total_matches = all_matches.len() as u32;
340        Ok((all_matches, stats))
341    }
342
343    #[allow(clippy::as_conversions)] // match counts: len()→u32 fits within range
344    fn execute_case_insensitive(
345        &self,
346        regex: &Regex,
347        trigram_groups: &[Vec<Trigram>],
348        options: &QueryOptions,
349    ) -> (Vec<Match>, QueryStats) {
350        let mut stats = QueryStats::default();
351
352        // For each position group: UNION posting lists of all variants found
353        let mut group_candidates = Vec::new();
354        for group in trigram_groups {
355            let mut union_set: HashSet<u32> = HashSet::new();
356            for &tri in group {
357                stats.trigrams_queried += 1;
358                if let Some(info) = self.index.get_trigram(tri)
359                    && let Ok(postings) = self.index.decode_postings(&info)
360                {
361                    stats.posting_lists_decoded += 1;
362                    for entry in &postings.entries {
363                        union_set.insert(entry.file_id);
364                    }
365                }
366                // Missing variant = skip, not abort
367            }
368            if !union_set.is_empty() {
369                group_candidates.push(union_set);
370            }
371        }
372
373        // Intersect across position groups
374        let final_candidates = if let Some(mut base) = group_candidates.pop() {
375            for set in group_candidates {
376                base.retain(|fid| set.contains(fid));
377            }
378            base
379        } else {
380            // No trigrams found at all — fall back to all files
381            let all: HashSet<u32> = (0..self.index.header.file_count).collect();
382            all
383        };
384
385        stats.candidate_files = final_candidates.len() as u32;
386
387        let files_verified = AtomicU32::new(0);
388        let bytes_verified = AtomicU64::new(0);
389        let matches_found = AtomicU32::new(0);
390
391        let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
392
393        let mut all_matches: Vec<Match> = candidate_list
394            .into_par_iter()
395            .filter_map(|fid| {
396                if options.max_results > 0
397                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
398                {
399                    return None;
400                }
401
402                let file_info = self.index.get_file(fid).ok()?;
403
404                if !options.type_filter.is_empty() {
405                    let ext = file_info
406                        .path
407                        .extension()
408                        .and_then(|e| e.to_str())
409                        .unwrap_or("");
410                    if !options.type_filter.iter().any(|e| e == ext) {
411                        return None;
412                    }
413                }
414
415                files_verified.fetch_add(1, Ordering::Relaxed);
416                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
417
418                let file_matches = Self::verify_file(&file_info, regex, options).ok()?;
419                matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
420                Some(file_matches)
421            })
422            .flatten()
423            .collect();
424
425        stats.files_verified = files_verified.into_inner();
426        stats.bytes_verified = bytes_verified.into_inner();
427
428        if options.max_results > 0 && all_matches.len() > options.max_results {
429            all_matches.truncate(options.max_results);
430        }
431
432        stats.total_matches = all_matches.len() as u32;
433        (all_matches, stats)
434    }
435
436    #[allow(clippy::as_conversions)] // match counts: len()→u32 fits within range
437    fn execute_full_scan(
438        &self,
439        regex: &Regex,
440        options: &QueryOptions,
441    ) -> (Vec<Match>, QueryStats) {
442        let stats_candidate_files = self.index.header.file_count;
443
444        let files_verified = AtomicU32::new(0);
445        let bytes_verified = AtomicU64::new(0);
446        let matches_found = AtomicU32::new(0);
447
448        let mut all_matches: Vec<Match> = (0..self.index.header.file_count)
449            .into_par_iter()
450            .filter_map(|fid| {
451                if options.max_results > 0
452                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
453                {
454                    return None;
455                }
456
457                let file_info = self.index.get_file(fid).ok()?;
458
459                // Filter by extension
460                if !options.type_filter.is_empty() {
461                    let ext = file_info
462                        .path
463                        .extension()
464                        .and_then(|e: &std::ffi::OsStr| e.to_str())
465                        .unwrap_or("");
466                    if !options.type_filter.iter().any(|e: &String| e == ext) {
467                        return None;
468                    }
469                }
470
471                files_verified.fetch_add(1, Ordering::Relaxed);
472                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
473
474                let file_matches = Self::verify_file(&file_info, regex, options).ok()?;
475                matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
476                Some(file_matches)
477            })
478            .flatten()
479            .collect();
480
481        if options.max_results > 0 && all_matches.len() > options.max_results {
482            all_matches.truncate(options.max_results);
483        }
484
485        let stats = QueryStats {
486            candidate_files: stats_candidate_files,
487            files_verified: files_verified.into_inner(),
488            bytes_verified: bytes_verified.into_inner(),
489            total_matches: all_matches.len() as u32,
490            ..Default::default()
491        };
492        (all_matches, stats)
493    }
494
    /// Exposed for integration testing of the streaming logic.
    ///
    /// Thin wrapper around the private `verify_stream`; `&self` is not used
    /// and exists only so tests can call through an `Executor` instance.
    ///
    /// # Errors
    ///
    /// Returns an error if the file content cannot be read or if
    /// regex matching operations fail.
    pub fn verify_stream_for_test<R: Read>(
        &self,
        reader: R,
        path: &Path,
        regex: &Regex,
        options: &QueryOptions,
    ) -> Result<Vec<Match>> {
        Self::verify_stream(reader, path, regex, options)
    }
510
511    #[allow(clippy::as_conversions)] // line.len()→u64 fits within range
512    fn verify_stream<R: Read>(
513        reader: R,
514        path: &Path,
515        regex: &Regex,
516        options: &QueryOptions,
517    ) -> Result<Vec<Match>> {
518        let mut buf_reader = BufReader::new(reader);
519        let mut matches = Vec::new();
520        let mut line_number = 0u32;
521        let mut byte_offset = 0u64;
522
523        // Binary check on first 8KB
524        {
525            let buffer = buf_reader.fill_buf()?;
526            let is_bin = is_binary(buffer);
527            if is_bin && !options.binary {
528                return Ok(vec![]);
529            }
530        }
531
532        let mut line = String::new();
533        let mut context_before = std::collections::VecDeque::new();
534        let mut pending_matches: Vec<Match> = Vec::new();
535
536        while buf_reader.read_line(&mut line)? > 0 {
537            line_number += 1;
538            let line_len = line.len() as u64;
539            let trimmed_line = line.trim_end().to_string();
540
541            // Fill context_after for pending matches
542            for m in &mut pending_matches {
543                if m.context_after.len() < options.context_lines {
544                    m.context_after.push(trimmed_line.clone());
545                }
546            }
547
548            // Move completed matches to final list
549            let (completed, still_pending): (Vec<_>, Vec<_>) = pending_matches
550                .into_iter()
551                .partition(|m| m.context_after.len() >= options.context_lines);
552            matches.extend(completed);
553            pending_matches = still_pending;
554
555        if let Some(m) = regex.find(&line) {
556            let context_before_vec: Vec<String> = context_before
557                .iter()
558                .cloned()
559                .collect();
560
561                let new_match = Match {
562                    file_path: path.to_path_buf(),
563                    line_number,
564                    col: (m.start() + 1) as u32,
565                    line_content: if options.count_only {
566                        String::new()
567                    } else {
568                        trimmed_line.clone()
569                    },
570                    byte_offset: byte_offset + m.start() as u64,
571                    context_before: context_before_vec,
572                    context_after: vec![],
573                    is_binary: false,
574                };
575
576                if options.context_lines > 0 {
577                    pending_matches.push(new_match);
578                } else {
579                    matches.push(new_match);
580                }
581
582                if options.max_results > 0
583                    && (matches.len() + pending_matches.len()) >= options.max_results
584                    && (pending_matches.is_empty() || matches.len() >= options.max_results)
585                {
586                    break;
587                }
588            }
589
590            if options.context_lines > 0 {
591                context_before.push_back(trimmed_line.clone());
592                if context_before.len() > options.context_lines {
593                    context_before.pop_front();
594                }
595            }
596
597            byte_offset += line_len;
598            line.clear();
599        }
600
601        matches.extend(pending_matches);
602        Ok(matches)
603    }
604
    /// Open the file described by `info` and scan it for `regex` matches.
    ///
    /// The file is memory-mapped and streamed through `verify_stream`;
    /// when `options.decompress` is set and `maybe_decompress` recognises
    /// the file, the decompressed stream is scanned instead.
    fn verify_file(
        info: &FileInfo,
        regex: &Regex,
        options: &QueryOptions,
    ) -> Result<Vec<Match>> {
        let file = File::open(&info.path)?;
        // SAFETY: mapping a file is unsound if another process truncates or
        // rewrites it while mapped. NOTE(review): this assumes indexed files
        // are stable for the duration of a query — confirm that invariant.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        if options.decompress
            && let Some(reader) = maybe_decompress(&info.path, &mmap)?
        {
            return Self::verify_stream(reader, info.path.as_ref(), regex, options);
        }

        // Default to streaming via Cursor for uncompressed files to ensure constant memory (R-02)
        Self::verify_stream(Cursor::new(&mmap[..]), info.path.as_ref(), regex, options)
    }
622}