Skip to main content

ix/
executor.rs

//! Query executor — search through the index and verify results.
//!
//! Handles literal searches, indexed regex, case-insensitive searches, and full scans.
4
5use crate::decompress::maybe_decompress;
6use crate::error::Result;
7use crate::format::is_binary;
8use crate::planner::QueryPlan;
9use crate::reader::{FileInfo, Reader};
10use crate::trigram::Trigram;
11use rayon::prelude::*;
12use regex::Regex;
13use std::collections::HashSet;
14use std::fs::File;
15use std::io::{BufRead, BufReader, Cursor, Read};
16use std::path::{PathBuf};
17use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
18
/// A single matching line produced by a query, with optional surrounding context.
#[derive(Debug)]
pub struct Match {
    /// Path of the file in which the hit was found.
    pub file_path: PathBuf,
    /// 1-based number of the matching line.
    pub line_number: u32,
    /// 1-based byte column at which the first match on the line starts.
    pub col: u32,
    /// The matching line with trailing whitespace removed (empty in count-only mode).
    pub line_content: String,
    /// Byte offset of the match start from the beginning of the searched stream
    /// (the decompressed stream when decompression was applied).
    pub byte_offset: u64,
    /// Up to `context_lines` lines immediately preceding the hit.
    pub context_before: Vec<String>,
    /// Up to `context_lines` lines immediately following the hit.
    pub context_after: Vec<String>,
    /// Whether the hit came from a file classified as binary.
    /// NOTE(review): confirm the executor populates this flag.
    pub is_binary: bool,
}
30
/// Counters describing how much work a query performed.
#[derive(Default, Debug)]
pub struct QueryStats {
    /// Number of trigram lookups made against the index.
    pub trigrams_queried: u32,
    /// Number of posting lists that were fully decoded.
    pub posting_lists_decoded: u32,
    /// Files remaining after index-based filtering, before verification.
    pub candidate_files: u32,
    /// Files actually opened and scanned (after the extension filter).
    pub files_verified: u32,
    /// Sum of the indexed sizes of the verified files.
    pub bytes_verified: u64,
    /// Number of matches returned (after `max_results` truncation).
    pub total_matches: u32,
}
40
/// Caller-controlled knobs for a single query execution.
#[derive(Debug, Default)]
pub struct QueryOptions {
    /// Report matches with an empty `line_content`.
    pub count_only: bool,
    /// NOTE(review): not read by the executor in this file; presumably handled
    /// by the output layer — confirm.
    pub files_only: bool,
    /// Maximum number of matches to return; `0` means unlimited.
    pub max_results: usize,
    /// File extensions (without the leading dot) to search; empty means all files.
    pub type_filter: Vec<String>,
    /// Number of context lines to capture before and after each hit.
    pub context_lines: usize,
    /// Attempt transparent decompression of files before searching them.
    pub decompress: bool,
    /// NOTE(review): not read by the executor in this file; presumably sizes
    /// the worker pool elsewhere — confirm.
    pub threads: usize,
    /// NOTE(review): not read by the executor in this file — confirm where it
    /// takes effect (planner?).
    pub multiline: bool,
    /// NOTE(review): not read by the executor in this file — confirm usage.
    pub archive: bool,
    /// Search files classified as binary instead of skipping them.
    pub binary: bool,
}
54
55pub struct Executor<'a> {
56    index: &'a Reader,
57}
58
59impl<'a> Executor<'a> {
60    pub fn new(index: &'a Reader) -> Self {
61        Self { index }
62    }
63
64    pub fn execute(
65        &self,
66        plan: &QueryPlan,
67        options: &QueryOptions,
68    ) -> Result<(Vec<Match>, QueryStats)> {
69        match plan {
70            QueryPlan::Literal { pattern, trigrams } => {
71                self.execute_literal(pattern, trigrams, options)
72            }
73            QueryPlan::RegexWithLiterals {
74                regex,
75                required_trigram_sets,
76            } => self.execute_regex_indexed(regex, required_trigram_sets, options),
77            QueryPlan::CaseInsensitive {
78                regex,
79                trigram_groups,
80            } => self.execute_case_insensitive(regex, trigram_groups, options),
81            QueryPlan::FullScan { regex } => self.execute_full_scan(regex, options),
82        }
83    }
84
85    fn execute_literal(
86        &self,
87        pattern: &[u8],
88        trigrams: &[Trigram],
89        options: &QueryOptions,
90    ) -> Result<(Vec<Match>, QueryStats)> {
91        let mut stats = QueryStats::default();
92
93        let mut infos = Vec::new();
94        for &tri in trigrams {
95            stats.trigrams_queried += 1;
96            if let Some(info) = self.index.get_trigram(tri) {
97                infos.push((tri, info));
98            } else {
99                return Ok((vec![], stats));
100            }
101        }
102
103        // Sort by doc_frequency (rarest first)
104        infos.sort_by_key(|(_, info)| info.doc_frequency);
105
106        // ── Step 1: Decode rarest posting list ──
107        let (_, rarest_info) = &infos[0];
108        let postings = self.index.decode_postings(rarest_info)?;
109        stats.posting_lists_decoded += 1;
110
111        let mut candidates: HashSet<u32> = postings.entries.iter().map(|e| e.file_id).collect();
112
113        // ── Step 2: Intersect with next rarest lists if candidate set is large ──
114        // Only decode up to 3 lists to avoid excessive I/O
115        for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
116            if candidates.len() < 100 {
117                break;
118            }
119
120            let next_postings = self.index.decode_postings(info)?;
121            stats.posting_lists_decoded += 1;
122
123            let next_set: HashSet<u32> = next_postings.entries.iter().map(|e| e.file_id).collect();
124            candidates.retain(|fid| next_set.contains(fid));
125        }
126
127        // ── Step 3: Filter remaining using Bloom filters ──
128        for &(tri, _) in &infos[1..] {
129            if candidates.is_empty() {
130                break;
131            }
132            candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
133        }
134
135        stats.candidate_files = candidates.len() as u32;
136
137        let regex = Regex::new(&regex::escape(&String::from_utf8_lossy(pattern)))?;
138
139        // Parallel verification
140        let files_verified = AtomicU32::new(0);
141        let bytes_verified = std::sync::atomic::AtomicU64::new(0);
142        let matches_found = AtomicU32::new(0);
143
144        let candidate_list: Vec<u32> = candidates.into_iter().collect();
145
146        let mut all_matches: Vec<Match> = candidate_list
147            .into_par_iter()
148            .filter_map(|fid| {
149                if options.max_results > 0 && matches_found.load(Ordering::Relaxed) >= options.max_results as u32 {
150                    return None;
151                }
152
153                let file_info = self.index.get_file(fid).ok()?;
154
155                // Filter by extension
156                if !options.type_filter.is_empty() {
157                    let ext = file_info
158                        .path
159                        .extension()
160                        .and_then(|e: &std::ffi::OsStr| e.to_str())
161                        .unwrap_or("");
162                    if !options.type_filter.iter().any(|e: &String| e == ext) {
163                        return None;
164                    }
165                }
166
167                files_verified.fetch_add(1, Ordering::Relaxed);
168                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
169
170                let matches = self.verify_file(&file_info, &regex, options).ok()?;
171                matches_found.fetch_add(matches.len() as u32, Ordering::Relaxed);
172                Some(matches)
173            })
174            .flatten()
175            .collect();
176
177        stats.files_verified = files_verified.into_inner();
178        stats.bytes_verified = bytes_verified.into_inner();
179
180        if options.max_results > 0 && all_matches.len() > options.max_results {
181            all_matches.truncate(options.max_results);
182        }
183
184        stats.total_matches = all_matches.len() as u32;
185
186        Ok((all_matches, stats))
187        }
188
189
190    fn execute_regex_indexed(
191        &self,
192        regex: &Regex,
193        required_trigram_sets: &[Vec<Trigram>],
194        options: &QueryOptions,
195    ) -> Result<(Vec<Match>, QueryStats)> {
196        let mut stats = QueryStats::default();
197
198        // For each required literal fragment, find candidate files
199        let mut fragment_candidates = Vec::new();
200        for trigram_set in required_trigram_sets {
201            let mut infos = Vec::new();
202            for &tri in trigram_set {
203                stats.trigrams_queried += 1;
204                if let Some(info) = self.index.get_trigram(tri) {
205                    infos.push((tri, info));
206                } else {
207                    return Ok((vec![], stats));
208                }
209            }
210
211            infos.sort_by_key(|(_, info)| info.doc_frequency);
212
213            // Intersection within fragment
214            let (_, rarest_info) = &infos[0];
215            let postings = self.index.decode_postings(rarest_info)?;
216            stats.posting_lists_decoded += 1;
217            let mut set_candidates: HashSet<u32> =
218                postings.entries.iter().map(|e| e.file_id).collect();
219
220            // Intersect with up to 2 more lists if large
221            for (_, info) in infos.iter().take(infos.len().min(3)).skip(1) {
222                if set_candidates.len() < 100 {
223                    break;
224                }
225                let next_postings = self.index.decode_postings(info)?;
226                stats.posting_lists_decoded += 1;
227                let next_set: HashSet<u32> =
228                    next_postings.entries.iter().map(|e| e.file_id).collect();
229                set_candidates.retain(|fid| next_set.contains(fid));
230            }
231
232            for &(tri, _) in &infos[1..] {
233                set_candidates.retain(|&fid| self.index.bloom_may_contain(fid, tri));
234            }
235            fragment_candidates.push(set_candidates);
236        }
237
238        // Intersect candidates from all fragments
239        let mut final_candidates: HashSet<u32> = match fragment_candidates.pop() {
240            Some(c) => c,
241            None => return Ok((vec![], stats)),
242        };
243        for set in fragment_candidates {
244            final_candidates.retain(|fid: &u32| set.contains(fid));
245        }
246
247        stats.candidate_files = final_candidates.len() as u32;
248
249        let files_verified = AtomicU32::new(0);
250        let bytes_verified = AtomicU64::new(0);
251        let matches_found = AtomicU32::new(0);
252
253        let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
254
255        let mut all_matches: Vec<Match> = candidate_list
256            .into_par_iter()
257            .filter_map(|fid| {
258                if options.max_results > 0
259                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
260                {
261                    return None;
262                }
263
264                let file_info = self.index.get_file(fid).ok()?;
265
266                // Filter by extension
267                if !options.type_filter.is_empty() {
268                    let ext = file_info
269                        .path
270                        .extension()
271                        .and_then(|e: &std::ffi::OsStr| e.to_str())
272                        .unwrap_or("");
273                    if !options.type_filter.iter().any(|e: &String| e == ext) {
274                        return None;
275                    }
276                }
277
278                files_verified.fetch_add(1, Ordering::Relaxed);
279                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
280
281                let file_matches = self.verify_file(&file_info, regex, options).ok()?;
282                matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
283                Some(file_matches)
284            })
285            .flatten()
286            .collect();
287
288        stats.files_verified = files_verified.into_inner();
289        stats.bytes_verified = bytes_verified.into_inner();
290
291        if options.max_results > 0 && all_matches.len() > options.max_results {
292            all_matches.truncate(options.max_results);
293        }
294
295        stats.total_matches = all_matches.len() as u32;
296        Ok((all_matches, stats))
297    }
298
299    fn execute_case_insensitive(
300        &self,
301        regex: &Regex,
302        trigram_groups: &[Vec<Trigram>],
303        options: &QueryOptions,
304    ) -> Result<(Vec<Match>, QueryStats)> {
305        let mut stats = QueryStats::default();
306
307        // For each position group: UNION posting lists of all variants found
308        let mut group_candidates = Vec::new();
309        for group in trigram_groups {
310            let mut union_set: HashSet<u32> = HashSet::new();
311            for &tri in group {
312                stats.trigrams_queried += 1;
313                if let Some(info) = self.index.get_trigram(tri)
314                    && let Ok(postings) = self.index.decode_postings(&info)
315                {
316                    stats.posting_lists_decoded += 1;
317                    for entry in &postings.entries {
318                        union_set.insert(entry.file_id);
319                    }
320                }
321                // Missing variant = skip, not abort
322            }
323            if !union_set.is_empty() {
324                group_candidates.push(union_set);
325            }
326        }
327
328        // Intersect across position groups
329        let final_candidates = if let Some(mut base) = group_candidates.pop() {
330            for set in group_candidates {
331                base.retain(|fid| set.contains(fid));
332            }
333            base
334        } else {
335            // No trigrams found at all — fall back to all files
336            let all: HashSet<u32> = (0..self.index.header.file_count).collect();
337            all
338        };
339
340        stats.candidate_files = final_candidates.len() as u32;
341
342        let files_verified = AtomicU32::new(0);
343        let bytes_verified = AtomicU64::new(0);
344        let matches_found = AtomicU32::new(0);
345
346        let candidate_list: Vec<u32> = final_candidates.into_iter().collect();
347
348        let mut all_matches: Vec<Match> = candidate_list
349            .into_par_iter()
350            .filter_map(|fid| {
351                if options.max_results > 0
352                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
353                {
354                    return None;
355                }
356
357                let file_info = self.index.get_file(fid).ok()?;
358
359                if !options.type_filter.is_empty() {
360                    let ext = file_info
361                        .path
362                        .extension()
363                        .and_then(|e| e.to_str())
364                        .unwrap_or("");
365                    if !options.type_filter.iter().any(|e| e == ext) {
366                        return None;
367                    }
368                }
369
370                files_verified.fetch_add(1, Ordering::Relaxed);
371                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
372
373                let file_matches = self.verify_file(&file_info, regex, options).ok()?;
374                matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
375                Some(file_matches)
376            })
377            .flatten()
378            .collect();
379
380        stats.files_verified = files_verified.into_inner();
381        stats.bytes_verified = bytes_verified.into_inner();
382
383        if options.max_results > 0 && all_matches.len() > options.max_results {
384            all_matches.truncate(options.max_results);
385        }
386
387        stats.total_matches = all_matches.len() as u32;
388        Ok((all_matches, stats))
389    }
390
391    fn execute_full_scan(
392        &self,
393        regex: &Regex,
394        options: &QueryOptions,
395    ) -> Result<(Vec<Match>, QueryStats)> {
396        let stats_candidate_files = self.index.header.file_count;
397
398        let files_verified = AtomicU32::new(0);
399        let bytes_verified = AtomicU64::new(0);
400        let matches_found = AtomicU32::new(0);
401
402        let mut all_matches: Vec<Match> = (0..self.index.header.file_count)
403            .into_par_iter()
404            .filter_map(|fid| {
405                if options.max_results > 0
406                    && matches_found.load(Ordering::Relaxed) >= options.max_results as u32
407                {
408                    return None;
409                }
410
411                let file_info = self.index.get_file(fid).ok()?;
412
413                // Filter by extension
414                if !options.type_filter.is_empty() {
415                    let ext = file_info
416                        .path
417                        .extension()
418                        .and_then(|e: &std::ffi::OsStr| e.to_str())
419                        .unwrap_or("");
420                    if !options.type_filter.iter().any(|e: &String| e == ext) {
421                        return None;
422                    }
423                }
424
425                files_verified.fetch_add(1, Ordering::Relaxed);
426                bytes_verified.fetch_add(file_info.size_bytes, Ordering::Relaxed);
427
428                let file_matches = self.verify_file(&file_info, regex, options).ok()?;
429                matches_found.fetch_add(file_matches.len() as u32, Ordering::Relaxed);
430                Some(file_matches)
431            })
432            .flatten()
433            .collect();
434
435        if options.max_results > 0 && all_matches.len() > options.max_results {
436            all_matches.truncate(options.max_results);
437        }
438
439        let stats = QueryStats {
440            candidate_files: stats_candidate_files,
441            files_verified: files_verified.into_inner(),
442            bytes_verified: bytes_verified.into_inner(),
443            total_matches: all_matches.len() as u32,
444            ..Default::default()
445        };
446        Ok((all_matches, stats))
447    }
448
449    /// Exposed for integration testing of the streaming logic.
450    pub fn verify_stream_for_test<R: Read>(
451        &self,
452        reader: R,
453        path: PathBuf,
454        regex: &Regex,
455        options: &QueryOptions,
456    ) -> Result<Vec<Match>> {
457        self.verify_stream(reader, path, regex, options)
458    }
459
460    fn verify_stream<R: Read>(
461        &self,
462        reader: R,
463        path: PathBuf,
464        regex: &Regex,
465        options: &QueryOptions,
466    ) -> Result<Vec<Match>> {
467        let mut buf_reader = BufReader::new(reader);
468        let mut matches = Vec::new();
469        let mut line_number = 0u32;
470        let mut byte_offset = 0u64;
471
472        // Binary check on first 8KB
473        {
474            let buffer = buf_reader.fill_buf()?;
475            let is_bin = is_binary(buffer);
476            if is_bin && !options.binary {
477                return Ok(vec![]);
478            }
479        }
480
481        let mut line = String::new();
482        let mut context_before = std::collections::VecDeque::new();
483        let mut pending_matches: Vec<Match> = Vec::new();
484
485        while buf_reader.read_line(&mut line)? > 0 {
486            line_number += 1;
487            let line_len = line.len() as u64;
488            let trimmed_line = line.trim_end().to_string();
489
490            // Fill context_after for pending matches
491            for m in &mut pending_matches {
492                if m.context_after.len() < options.context_lines {
493                    m.context_after.push(trimmed_line.clone());
494                }
495            }
496
497            // Move completed matches to final list
498            let (completed, still_pending): (Vec<_>, Vec<_>) = pending_matches
499                .into_iter()
500                .partition(|m| m.context_after.len() >= options.context_lines);
501            matches.extend(completed);
502            pending_matches = still_pending;
503
504            if let Some(m) = regex.find(&line) {
505                let context_before_vec: Vec<String> =
506                    context_before.iter().map(|s: &String| s.trim_end().to_string()).collect();
507
508                let new_match = Match {
509                    file_path: path.clone(),
510                    line_number,
511                    col: (m.start() + 1) as u32,
512                    line_content: if options.count_only {
513                        String::new()
514                    } else {
515                        trimmed_line.clone()
516                    },
517                    byte_offset: byte_offset + m.start() as u64,
518                    context_before: context_before_vec,
519                    context_after: vec![],
520                    is_binary: false,
521                };
522
523                if options.context_lines > 0 {
524                    pending_matches.push(new_match);
525                } else {
526                    matches.push(new_match);
527                }
528
529                if options.max_results > 0
530                    && (matches.len() + pending_matches.len()) >= options.max_results
531                    && (pending_matches.is_empty() || matches.len() >= options.max_results)
532                {
533                    break;
534                }
535            }
536
537            if options.context_lines > 0 {
538                context_before.push_back(line.clone());
539                if context_before.len() > options.context_lines {
540                    context_before.pop_front();
541                }
542            }
543
544            byte_offset += line_len;
545            line.clear();
546        }
547        
548        matches.extend(pending_matches);
549        Ok(matches)
550    }
551
552    fn verify_file(
553        &self,
554        info: &FileInfo,
555        regex: &Regex,
556        options: &QueryOptions,
557    ) -> Result<Vec<Match>> {
558        let file = File::open(&info.path)?;
559        let mmap = unsafe { memmap2::Mmap::map(&file)? };
560
561        if options.decompress
562            && let Some(reader) = maybe_decompress(&info.path, &mmap)? {
563            return self.verify_stream(reader, info.path.clone(), regex, options);
564        }
565
566        // Default to streaming via Cursor for uncompressed files to ensure constant memory (R-02)
567        self.verify_stream(Cursor::new(&mmap[..]), info.path.clone(), regex, options)
568    }
569}