Skip to main content

ix/
executor.rs

1//! Query executor — search through the index and verify results.
2//!
3//! Handles literal searches, indexed regex, and full scans.
4
5use crate::decompress::maybe_decompress;
6use crate::error::Result;
7use crate::format::is_binary;
8use crate::planner::QueryPlan;
9use crate::reader::{FileInfo, Reader};
10use crate::trigram::Trigram;
11use rayon::prelude::*;
12use regex::Regex;
13use std::collections::HashSet;
14use std::fs::File;
15use std::io::{BufRead, BufReader, Cursor, Read};
16use std::path::PathBuf;
17use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
18
/// A single line that matched the query, plus optional surrounding context.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Match {
    /// Path of the file the match was found in.
    pub file_path: PathBuf,
    /// 1-based line number of the matching line.
    pub line_number: u32,
    /// 1-based byte column at which the match starts within the line.
    pub col: u32,
    /// The matching line with trailing whitespace removed (left empty in count-only mode).
    pub line_content: String,
    /// Byte offset of the match start, measured from the start of the searched stream.
    pub byte_offset: u64,
    /// Up to `context_lines` lines preceding the match, oldest first.
    pub context_before: Vec<String>,
    /// Up to `context_lines` lines following the match.
    pub context_after: Vec<String>,
    /// Whether this match is reported as coming from binary content.
    pub is_binary: bool,
}
30
/// Counters describing how much work a query did.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueryStats {
    /// Trigram lookups performed against the index.
    pub trigrams_queried: u32,
    /// Posting lists decoded from the index.
    pub posting_lists_decoded: u32,
    /// Files remaining after index-based filtering (every file for a full scan).
    pub candidate_files: u32,
    /// Candidate files that passed the type filter and were selected for verification.
    pub files_verified: u32,
    /// Sum of the indexed sizes of the files selected for verification.
    pub bytes_verified: u64,
    /// Matches returned, after truncation to `max_results`.
    pub total_matches: u32,
}
40
/// Per-query settings consumed by [`Executor`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct QueryOptions {
    /// Collect matches without their line text (`line_content` is left empty).
    pub count_only: bool,
    /// Not read in this module; presumably "list matching files only" for the caller
    /// (TODO confirm).
    pub files_only: bool,
    /// Maximum number of matches to return; `0` means unlimited.
    pub max_results: usize,
    /// File extensions (without the leading dot, compared case-sensitively) to restrict the
    /// search to; empty means every file.
    pub type_filter: Vec<String>,
    /// Lines of context to collect before and after each match.
    pub context_lines: usize,
    /// Decompress files recognized by `maybe_decompress` before searching them.
    pub decompress: bool,
    /// Not read in this module; presumably a worker-thread count applied by the caller
    /// (TODO confirm).
    pub threads: usize,
    /// Not read in this module (TODO confirm intended multi-line semantics).
    pub multiline: bool,
    /// Not read in this module (TODO confirm intended archive handling).
    pub archive: bool,
    /// Search files detected as binary instead of skipping them.
    pub binary: bool,
}
54
55pub struct Executor<'a> {
56    index: &'a Reader,
57}
58
59impl<'a> Executor<'a> {
60    pub fn new(index: &'a Reader) -> Self {
61        Self { index }
62    }
63
64    pub fn execute(
65        &self,
66        plan: &QueryPlan,
67        options: &QueryOptions,
68    ) -> Result<(Vec<Match>, QueryStats)> {
69        match plan {
70            QueryPlan::Literal { pattern, trigrams } => {
71                self.execute_literal(pattern, trigrams, options)
72            }
73            QueryPlan::RegexWithLiterals {
74                regex,
75                required_trigram_sets,
76            } => self.execute_regex_indexed(regex, required_trigram_sets, options),
77            QueryPlan::CaseInsensitive {
78                regex,
79                trigram_groups,
80            } => self.execute_case_insensitive(regex, trigram_groups, options),
81            QueryPlan::FullScan { regex } => self.execute_full_scan(regex, options),
82        }
83    }
84
85    fn execute_literal(
86        &self,
87        pattern: &[u8],
88        trigrams: &[Trigram],
89        options: &QueryOptions,
90    ) -> Result<(Vec<Match>, QueryStats)> {
91        let mut stats = QueryStats::default();
92
93        let mut infos = Vec::new();
94        for &tri in trigrams {
95            stats.trigrams_queried += 1;
96            if let Some(info) = self.index.get_trigram(tri) {
97                infos.push((tri, info));
98            } else {
99                return Ok((vec![], stats));
100            }
101        }
102
103        // Sort by doc_frequency (rarest first)
104        infos.sort_by_key(|(_, info)| info.doc_frequency);
105
106        // ── Step 1: Decode rarest posting list ──
107        let (_, rarest_info) = &infos[0];
108        let postings = self.index.decode_postings(rarest_info)?;
109        stats.posting_lists_decoded += 1;
110
111        let mut candidates: HashSet<u32> = postings.entries.iter().map(|e| e.file_id).collect();
112
113        // ── Step 2: Intersect with next rarest lists if candidate set is large ──
114        // Only decode up to 3 lists to avoid excessive I/O
115        for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
116            if candidates.len() < 100 {
117                break;
118            }
119
120            let next_postings = self.index.decode_postings(info)?;
121            stats.posting_lists_decoded += 1;
122
123            let next_set: HashSet<u32> = next_postings.entries.iter().map(|e| e.file_id).collect();
124            candidates.retain(|fid| next_set.contains(fid));
125        }
126
127        // ── Step 3: Filter remaining using Bloom filters ──
128        for &(tri, _) in &infos[1..] {
129            if candidates.is_empty() {
130                break;
131            }
132            candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
133        }
134
135        stats.candidate_files = candidates.len() as u32;
136
137        let regex = Regex::new(&regex::escape(&String::from_utf8_lossy(pattern)))?;
138
139        // Parallel verification
140        let files_verified = AtomicU32::new(0);
141        let bytes_verified = std::sync::atomic::AtomicU64::new(0);
142        let matches_found = AtomicU32::new(0);
143
144        let candidate_list: Vec<u32> = candidates.into_iter().collect();
145
146        let mut all_matches: Vec<Match> = candidate_list
147            .into_par_iter()
148            .filter_map(|fid| {
149                if options.max_results > 0
150                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
151                {
152                    return None;
153                }
154
155                let file_info = self.index.get_file(fid).ok()?;
156
157                // Filter by extension
158                if !options.type_filter.is_empty() {
159                    let ext = file_info
160                        .path
161                        .extension()
162                        .and_then(|e: &std::ffi::OsStr| e.to_str())
163                        .unwrap_or("");
164                    if !options.type_filter.iter().any(|e: &String| e == ext) {
165                        return None;
166                    }
167                }
168
169                files_verified.fetch_add(1, Ordering::Relaxed);
170                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
171
172                let matches = self.verify_file(&file_info, &regex, options).ok()?;
173                matches_found.fetch_add(matches.len() as u32, Ordering::Relaxed);
174                Some(matches)
175            })
176            .flatten()
177            .collect();
178
179        stats.files_verified = files_verified.into_inner();
180        stats.bytes_verified = bytes_verified.into_inner();
181
182        if options.max_results > 0 && all_matches.len() > options.max_results {
183            all_matches.truncate(options.max_results);
184        }
185
186        stats.total_matches = all_matches.len() as u32;
187
188        Ok((all_matches, stats))
189    }
190
191    fn execute_regex_indexed(
192        &self,
193        regex: &Regex,
194        required_trigram_sets: &[Vec<Trigram>],
195        options: &QueryOptions,
196    ) -> Result<(Vec<Match>, QueryStats)> {
197        let mut stats = QueryStats::default();
198
199        // For each required literal fragment, find candidate files
200        let mut fragment_candidates = Vec::new();
201        for trigram_set in required_trigram_sets {
202            let mut infos = Vec::new();
203            for &tri in trigram_set {
204                stats.trigrams_queried += 1;
205                if let Some(info) = self.index.get_trigram(tri) {
206                    infos.push((tri, info));
207                } else {
208                    return Ok((vec![], stats));
209                }
210            }
211
212            infos.sort_by_key(|(_, info)| info.doc_frequency);
213
214            // Intersection within fragment
215            let (_, rarest_info) = &infos[0];
216            let postings = self.index.decode_postings(rarest_info)?;
217            stats.posting_lists_decoded += 1;
218            let mut set_candidates: HashSet<u32> =
219                postings.entries.iter().map(|e| e.file_id).collect();
220
221            // Intersect with up to 2 more lists if large
222            for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
223                if set_candidates.len() < 100 {
224                    break;
225                }
226                let next_postings = self.index.decode_postings(info)?;
227                stats.posting_lists_decoded += 1;
228                let next_set: HashSet<u32> =
229                    next_postings.entries.iter().map(|e| e.file_id).collect();
230                set_candidates.retain(|fid| next_set.contains(fid));
231            }
232
233            for &(tri, _) in &infos[1..] {
234                set_candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
235            }
236            fragment_candidates.push(set_candidates);
237        }
238
239        // Intersect candidates from all fragments
240        let mut final_candidates: HashSet<u32> = match fragment_candidates.pop() {
241            Some(c) => c,
242            None => return Ok((vec![], stats)),
243        };
244        for set in fragment_candidates {
245            final_candidates.retain(|fid: &u32| set.contains(fid));
246        }
247
248        stats.candidate_files = final_candidates.len() as u32;
249
250        let files_verified = AtomicU32::new(0);
251        let bytes_verified = AtomicU64::new(0);
252        let matches_found = AtomicU32::new(0);
253
254        let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
255
256        let mut all_matches: Vec<Match> = candidate_list
257            .into_par_iter()
258            .filter_map(|fid| {
259                if options.max_results > 0
260                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
261                {
262                    return None;
263                }
264
265                let file_info = self.index.get_file(fid).ok()?;
266
267                // Filter by extension
268                if !options.type_filter.is_empty() {
269                    let ext = file_info
270                        .path
271                        .extension()
272                        .and_then(|e: &std::ffi::OsStr| e.to_str())
273                        .unwrap_or("");
274                    if !options.type_filter.iter().any(|e: &String| e == ext) {
275                        return None;
276                    }
277                }
278
279                files_verified.fetch_add(1, Ordering::Relaxed);
280                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
281
282                let file_matches = self.verify_file(&file_info, regex, options).ok()?;
283                matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
284                Some(file_matches)
285            })
286            .flatten()
287            .collect();
288
289        stats.files_verified = files_verified.into_inner();
290        stats.bytes_verified = bytes_verified.into_inner();
291
292        if options.max_results > 0 && all_matches.len() > options.max_results {
293            all_matches.truncate(options.max_results);
294        }
295
296        stats.total_matches = all_matches.len() as u32;
297        Ok((all_matches, stats))
298    }
299
300    fn execute_case_insensitive(
301        &self,
302        regex: &Regex,
303        trigram_groups: &[Vec<Trigram>],
304        options: &QueryOptions,
305    ) -> Result<(Vec<Match>, QueryStats)> {
306        let mut stats = QueryStats::default();
307
308        // For each position group: UNION posting lists of all variants found
309        let mut group_candidates = Vec::new();
310        for group in trigram_groups {
311            let mut union_set: HashSet<u32> = HashSet::new();
312            for &tri in group {
313                stats.trigrams_queried += 1;
314                if let Some(info) = self.index.get_trigram(tri)
315                    && let Ok(postings) = self.index.decode_postings(&info)
316                {
317                    stats.posting_lists_decoded += 1;
318                    for entry in &postings.entries {
319                        union_set.insert(entry.file_id);
320                    }
321                }
322                // Missing variant = skip, not abort
323            }
324            if !union_set.is_empty() {
325                group_candidates.push(union_set);
326            }
327        }
328
329        // Intersect across position groups
330        let final_candidates = if let Some(mut base) = group_candidates.pop() {
331            for set in group_candidates {
332                base.retain(|fid| set.contains(fid));
333            }
334            base
335        } else {
336            // No trigrams found at all — fall back to all files
337            let all: HashSet<u32> = (0..self.index.header.file_count).collect();
338            all
339        };
340
341        stats.candidate_files = final_candidates.len() as u32;
342
343        let files_verified = AtomicU32::new(0);
344        let bytes_verified = AtomicU64::new(0);
345        let matches_found = AtomicU32::new(0);
346
347        let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
348
349        let mut all_matches: Vec<Match> = candidate_list
350            .into_par_iter()
351            .filter_map(|fid| {
352                if options.max_results > 0
353                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
354                {
355                    return None;
356                }
357
358                let file_info = self.index.get_file(fid).ok()?;
359
360                if !options.type_filter.is_empty() {
361                    let ext = file_info
362                        .path
363                        .extension()
364                        .and_then(|e| e.to_str())
365                        .unwrap_or("");
366                    if !options.type_filter.iter().any(|e| e == ext) {
367                        return None;
368                    }
369                }
370
371                files_verified.fetch_add(1, Ordering::Relaxed);
372                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
373
374                let file_matches = self.verify_file(&file_info, regex, options).ok()?;
375                matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
376                Some(file_matches)
377            })
378            .flatten()
379            .collect();
380
381        stats.files_verified = files_verified.into_inner();
382        stats.bytes_verified = bytes_verified.into_inner();
383
384        if options.max_results > 0 && all_matches.len() > options.max_results {
385            all_matches.truncate(options.max_results);
386        }
387
388        stats.total_matches = all_matches.len() as u32;
389        Ok((all_matches, stats))
390    }
391
392    fn execute_full_scan(
393        &self,
394        regex: &Regex,
395        options: &QueryOptions,
396    ) -> Result<(Vec<Match>, QueryStats)> {
397        let stats_candidate_files = self.index.header.file_count;
398
399        let files_verified = AtomicU32::new(0);
400        let bytes_verified = AtomicU64::new(0);
401        let matches_found = AtomicU32::new(0);
402
403        let mut all_matches: Vec<Match> = (0..self.index.header.file_count)
404            .into_par_iter()
405            .filter_map(|fid| {
406                if options.max_results > 0
407                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
408                {
409                    return None;
410                }
411
412                let file_info = self.index.get_file(fid).ok()?;
413
414                // Filter by extension
415                if !options.type_filter.is_empty() {
416                    let ext = file_info
417                        .path
418                        .extension()
419                        .and_then(|e: &std::ffi::OsStr| e.to_str())
420                        .unwrap_or("");
421                    if !options.type_filter.iter().any(|e: &String| e == ext) {
422                        return None;
423                    }
424                }
425
426                files_verified.fetch_add(1, Ordering::Relaxed);
427                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
428
429                let file_matches = self.verify_file(&file_info, regex, options).ok()?;
430                matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
431                Some(file_matches)
432            })
433            .flatten()
434            .collect();
435
436        if options.max_results > 0 && all_matches.len() > options.max_results {
437            all_matches.truncate(options.max_results);
438        }
439
440        let stats = QueryStats {
441            candidate_files: stats_candidate_files,
442            files_verified: files_verified.into_inner(),
443            bytes_verified: bytes_verified.into_inner(),
444            total_matches: all_matches.len() as u32,
445            ..Default::default()
446        };
447        Ok((all_matches, stats))
448    }
449
450    /// Exposed for integration testing of the streaming logic.
451    pub fn verify_stream_for_test<R: Read>(
452        &self,
453        reader: R,
454        path: PathBuf,
455        regex: &Regex,
456        options: &QueryOptions,
457    ) -> Result<Vec<Match>> {
458        self.verify_stream(reader, path, regex, options)
459    }
460
461    fn verify_stream<R: Read>(
462        &self,
463        reader: R,
464        path: PathBuf,
465        regex: &Regex,
466        options: &QueryOptions,
467    ) -> Result<Vec<Match>> {
468        let mut buf_reader = BufReader::new(reader);
469        let mut matches = Vec::new();
470        let mut line_number = 0u32;
471        let mut byte_offset = 0u64;
472
473        // Binary check on first 8KB
474        {
475            let buffer = buf_reader.fill_buf()?;
476            let is_bin = is_binary(buffer);
477            if is_bin && !options.binary {
478                return Ok(vec![]);
479            }
480        }
481
482        let mut line = String::new();
483        let mut context_before = std::collections::VecDeque::new();
484        let mut pending_matches: Vec<Match> = Vec::new();
485
486        while buf_reader.read_line(&mut line)? > 0 {
487            line_number += 1;
488            let line_len = line.len() as u64;
489            let trimmed_line = line.trim_end().to_string();
490
491            // Fill context_after for pending matches
492            for m in &mut pending_matches {
493                if m.context_after.len() < options.context_lines {
494                    m.context_after.push(trimmed_line.clone());
495                }
496            }
497
498            // Move completed matches to final list
499            let (completed, still_pending): (Vec<_>, Vec<_>) = pending_matches
500                .into_iter()
501                .partition(|m| m.context_after.len() >= options.context_lines);
502            matches.extend(completed);
503            pending_matches = still_pending;
504
505            if let Some(m) = regex.find(&line) {
506                let context_before_vec: Vec<String> = context_before
507                    .iter()
508                    .map(|s: &String| s.trim_end().to_string())
509                    .collect();
510
511                let new_match = Match {
512                    file_path: path.clone(),
513                    line_number,
514                    col: (m.start() + 1) as u32,
515                    line_content: if options.count_only {
516                        String::new()
517                    } else {
518                        trimmed_line.clone()
519                    },
520                    byte_offset: byte_offset + m.start() as u64,
521                    context_before: context_before_vec,
522                    context_after: vec![],
523                    is_binary: false,
524                };
525
526                if options.context_lines > 0 {
527                    pending_matches.push(new_match);
528                } else {
529                    matches.push(new_match);
530                }
531
532                if options.max_results > 0
533                    && (matches.len() + pending_matches.len()) >= options.max_results
534                    && (pending_matches.is_empty() || matches.len() >= options.max_results)
535                {
536                    break;
537                }
538            }
539
540            if options.context_lines > 0 {
541                context_before.push_back(line.clone());
542                if context_before.len() > options.context_lines {
543                    context_before.pop_front();
544                }
545            }
546
547            byte_offset += line_len;
548            line.clear();
549        }
550
551        matches.extend(pending_matches);
552        Ok(matches)
553    }
554
555    fn verify_file(
556        &self,
557        info: &FileInfo,
558        regex: &Regex,
559        options: &QueryOptions,
560    ) -> Result<Vec<Match>> {
561        let file = File::open(&info.path)?;
562        let mmap = unsafe { memmap2::Mmap::map(&file)? };
563
564        if options.decompress
565            && let Some(reader) = maybe_decompress(&info.path, &mmap)?
566        {
567            return self.verify_stream(reader, info.path.clone(), regex, options);
568        }
569
570        // Default to streaming via Cursor for uncompressed files to ensure constant memory (R-02)
571        self.verify_stream(Cursor::new(&mmap[..]), info.path.clone(), regex, options)
572    }
573}