Skip to main content

dci_tool/
engine.rs

1//! The in-process corpus engine.
2//!
3//! Search uses ripgrep's own crates ([`grep`], [`ignore`], [`globset`]) linked
4//! directly into the process — the same matching/walking engine as the `rg`
5//! binary, but with no subprocess and therefore no shell-injection surface.
6//!
7//! All functions here are synchronous and blocking; callers on an async runtime
8//! should dispatch them via `spawn_blocking` (the tool layer does this).
9
10use std::io;
11use std::path::Path;
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
14use std::time::Instant;
15
16use globset::{Glob, GlobSet, GlobSetBuilder};
17use grep::regex::RegexMatcherBuilder;
18use grep::searcher::{BinaryDetection, Searcher, SearcherBuilder, Sink, SinkContext, SinkMatch};
19use ignore::{WalkBuilder, WalkState};
20use serde::{Deserialize, Serialize};
21
22use crate::error::{DciError, Result};
23use crate::sandbox::CorpusRoot;
24
/// Input to [`search`]: what to look for, where, and how much output to keep.
#[derive(Debug, Clone)]
pub struct SearchQuery {
    /// Pattern in Rust `regex` syntax (the dialect ripgrep uses).
    pub pattern: String,
    /// When set, only files whose corpus-relative path matches this glob
    /// (for example `**/*.log`) are searched.
    pub path_glob: Option<String>,
    /// Match without regard to letter case.
    pub case_insensitive: bool,
    /// Lines of surrounding context to report before and after each match.
    pub context_lines: usize,
    /// Caps the number of returned lines; `None` falls back to the corpus
    /// default.
    pub max_results: Option<usize>,
}
39
40/// A single line emitted by [`search`], either a match or surrounding context.
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
42pub struct SearchHit {
43    /// Corpus-relative path of the file.
44    pub path: String,
45    /// 1-based line number.
46    pub line: u64,
47    /// The (possibly truncated) line text, trailing newline removed.
48    pub text: String,
49    /// `true` for the matched line, `false` for surrounding context lines.
50    pub is_match: bool,
51}
52
53/// Result of a [`search`] call.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct SearchResult {
56    /// Collected hits in walk order.
57    pub hits: Vec<SearchHit>,
58    /// Number of files actually searched.
59    pub files_searched: usize,
60    /// `true` if the result was capped by a limit (more matches may exist).
61    pub truncated: bool,
62}
63
64/// Run a regular-expression search across the corpus.
65///
66/// The walk and per-file search run in parallel across ripgrep's worker
67/// threads (the same engine as the `rg` binary). To keep results reproducible
68/// regardless of thread scheduling, every matching line is collected and then
69/// deterministically ordered by `(path, line, match-before-context)` before the
70/// result cap is applied. The only non-deterministic bound is
71/// `max_files_walked`, which acts purely as a runaway-corpus safety valve and
72/// is not reached on the bounded corpora used for evaluation or testing.
73pub fn search(corpus: &CorpusRoot, query: &SearchQuery) -> Result<SearchResult> {
74    let limits = corpus.limits();
75    let cap = query.max_results.unwrap_or(limits.max_results).max(1);
76
77    let matcher = RegexMatcherBuilder::new()
78        .case_insensitive(query.case_insensitive)
79        .line_terminator(Some(b'\n'))
80        .build(&query.pattern)
81        .map_err(|e| DciError::InvalidPattern(e.to_string()))?;
82
83    let glob = query.path_glob.as_deref().map(build_globset).transpose()?;
84
85    let hits: Mutex<Vec<SearchHit>> = Mutex::new(Vec::new());
86    let files_searched = AtomicUsize::new(0);
87    let files_walked = AtomicUsize::new(0);
88    let timed_out = AtomicBool::new(false);
89    let deadline = Instant::now() + limits.timeout;
90
91    // Shared references captured by each per-thread worker closure.
92    let hits_ref = &hits;
93    let files_searched_ref = &files_searched;
94    let files_walked_ref = &files_walked;
95    let timed_out_ref = &timed_out;
96    let matcher_ref = &matcher;
97    let glob_ref = &glob;
98
99    walk(corpus).build_parallel().run(|| {
100        // Each worker thread gets its own matcher clone and searcher: the
101        // matcher is cheap to clone and `Searcher` holds non-shareable buffers.
102        let matcher = matcher_ref.clone();
103        let glob = glob_ref.clone();
104        let context_lines = query.context_lines;
105        let max_line_len = limits.max_line_len;
106        let max_file_bytes = limits.max_file_bytes;
107        let max_files_walked = limits.max_files_walked;
108        let mut searcher = SearcherBuilder::new()
109            .line_number(true)
110            .before_context(context_lines)
111            .after_context(context_lines)
112            .binary_detection(BinaryDetection::quit(0))
113            .build();
114
115        Box::new(move |result| {
116            let entry = match result {
117                Ok(e) => e,
118                Err(_) => return WalkState::Continue,
119            };
120            if !entry.file_type().is_some_and(|t| t.is_file()) {
121                return WalkState::Continue;
122            }
123            let walked = files_walked_ref.fetch_add(1, Ordering::Relaxed) + 1;
124            if walked > max_files_walked {
125                return WalkState::Quit;
126            }
127            // Cooperative wall-clock cancellation: stop the walk between files
128            // once the budget is spent and flag the result as truncated, rather
129            // than running to completion on a thread the caller has abandoned.
130            if Instant::now() >= deadline {
131                timed_out_ref.store(true, Ordering::Relaxed);
132                return WalkState::Quit;
133            }
134
135            let path = entry.path();
136            let rel = corpus.relativize(path).into_owned();
137
138            if let Some(set) = &glob {
139                if !set.is_match(rel.as_str()) {
140                    return WalkState::Continue;
141                }
142            }
143            if let Ok(meta) = entry.metadata() {
144                if meta.len() > max_file_bytes {
145                    return WalkState::Continue;
146                }
147            }
148
149            // Bound each file's contribution to `cap` so a single pathological
150            // file cannot exhaust memory; the global cap is applied after merge.
151            let mut local: Vec<SearchHit> = Vec::new();
152            let mut sink = CollectSink {
153                rel: &rel,
154                hits: &mut local,
155                remaining: cap,
156                max_line_len,
157            };
158            // Search errors on a single file (e.g. permissions) are non-fatal.
159            let _ = searcher.search_path(&matcher, path, &mut sink);
160            files_searched_ref.fetch_add(1, Ordering::Relaxed);
161
162            if !local.is_empty() {
163                let mut guard = hits_ref.lock().unwrap_or_else(|e| e.into_inner());
164                guard.extend(local);
165            }
166            WalkState::Continue
167        })
168    });
169
170    let mut hits = hits.into_inner().unwrap_or_else(|e| e.into_inner());
171    let walked_total = files_walked.load(Ordering::Relaxed);
172    let collected = hits.len();
173    let truncated = walked_total > limits.max_files_walked
174        || collected >= cap
175        || timed_out.load(Ordering::Relaxed);
176
177    // Deterministic order: by path, then line, with matched lines ahead of
178    // their surrounding context at the same line number.
179    hits.sort_by(|a, b| {
180        a.path
181            .cmp(&b.path)
182            .then(a.line.cmp(&b.line))
183            .then(b.is_match.cmp(&a.is_match))
184    });
185    hits.truncate(cap);
186
187    Ok(SearchResult {
188        hits,
189        files_searched: files_searched.load(Ordering::Relaxed),
190        truncated,
191    })
192}
193
/// Input to [`find`].
#[derive(Debug, Clone)]
pub struct FindQuery {
    /// Glob tested against corpus-relative paths (e.g. `**/*.rs` or `auth*`;
    /// a glob without `/` matches at any depth).
    pub glob: String,
    /// Caps the number of returned paths; `None` falls back to the corpus
    /// default.
    pub max_results: Option<usize>,
}
202
203/// Result of a [`find`] call.
204#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct FindResult {
206    /// Matching corpus-relative paths.
207    pub paths: Vec<String>,
208    /// `true` if the result was capped by a limit.
209    pub truncated: bool,
210}
211
212/// Find files whose corpus-relative path matches a glob.
213///
214/// The walk runs in parallel across ripgrep's worker threads. To keep results
215/// reproducible regardless of thread scheduling, every matching path is
216/// collected and then deterministically ordered before the result cap is
217/// applied. The only non-deterministic bound is `max_files_walked`, which acts
218/// purely as a runaway-corpus safety valve.
219pub fn find(corpus: &CorpusRoot, query: &FindQuery) -> Result<FindResult> {
220    let limits = corpus.limits();
221    let cap = query.max_results.unwrap_or(limits.max_results).max(1);
222    let set = build_globset(&query.glob)?;
223
224    let paths = Mutex::new(Vec::new());
225    let files_walked = AtomicUsize::new(0);
226    let timed_out = AtomicBool::new(false);
227    let deadline = Instant::now() + limits.timeout;
228
229    let paths_ref = &paths;
230    let files_walked_ref = &files_walked;
231    let timed_out_ref = &timed_out;
232    let set_ref = &set;
233
234    walk(corpus).build_parallel().run(|| {
235        Box::new(move |result| {
236            let entry = match result {
237                Ok(e) => e,
238                Err(_) => return WalkState::Continue,
239            };
240            if !entry.file_type().is_some_and(|t| t.is_file()) {
241                return WalkState::Continue;
242            }
243            let walked = files_walked_ref.fetch_add(1, Ordering::Relaxed) + 1;
244            if walked > limits.max_files_walked {
245                return WalkState::Quit;
246            }
247            if Instant::now() >= deadline {
248                timed_out_ref.store(true, Ordering::Relaxed);
249                return WalkState::Quit;
250            }
251
252            let rel = corpus.relativize(entry.path()).into_owned();
253            if set_ref.is_match(rel.as_str()) {
254                let mut guard = paths_ref.lock().unwrap_or_else(|e| e.into_inner());
255                guard.push(rel);
256            }
257            WalkState::Continue
258        })
259    });
260
261    let mut paths = paths.into_inner().unwrap_or_else(|e| e.into_inner());
262    let walked_total = files_walked.load(Ordering::Relaxed);
263    let collected = paths.len();
264    let truncated = walked_total > limits.max_files_walked
265        || collected > cap
266        || timed_out.load(Ordering::Relaxed);
267
268    // Deterministic selection: order all matches, then apply the cap, so the
269    // surviving subset never depends on thread scheduling.
270    paths.sort();
271    paths.truncate(cap);
272
273    Ok(FindResult { paths, truncated })
274}
275
276/// A numbered line returned by [`read_range`].
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
278pub struct NumberedLine {
279    /// 1-based line number within the file.
280    pub line: u64,
281    /// The (possibly truncated) line text.
282    pub text: String,
283}
284
285/// Result of a [`read_range`] call.
286#[derive(Debug, Clone, Serialize, Deserialize)]
287pub struct ReadResult {
288    /// Corpus-relative path that was read.
289    pub path: String,
290    /// The returned lines.
291    pub lines: Vec<NumberedLine>,
292    /// `true` if more lines exist below the returned window.
293    pub more_below: bool,
294}
295
296/// Read a bounded, line-numbered window from a single file.
297///
298/// `start_line` is 1-based (defaults to 1). `line_count` is clamped to the
299/// configured read limit.
300pub fn read_range(
301    corpus: &CorpusRoot,
302    path: &str,
303    start_line: Option<usize>,
304    line_count: Option<usize>,
305) -> Result<ReadResult> {
306    let limits = corpus.limits();
307    let resolved = corpus.resolve(path)?;
308
309    if !resolved.is_file() {
310        return Err(DciError::NotFound {
311            requested: path.to_string(),
312        });
313    }
314
315    let file = std::fs::File::open(&resolved).map_err(|e| DciError::Io {
316        path: resolved.clone(),
317        source: e,
318    })?;
319    use std::io::{BufRead, BufReader, Read};
320    let mut reader = BufReader::new(file.take(limits.max_file_bytes));
321
322    let start = start_line.unwrap_or(1).max(1);
323    let count = line_count
324        .unwrap_or(limits.max_read_lines)
325        .min(limits.max_read_lines);
326
327    let mut lines = Vec::new();
328    let mut more_below = false;
329    let mut current_idx = 0;
330    let mut line_buf = Vec::new();
331
332    while let Ok(bytes_read) = reader.read_until(b'\n', &mut line_buf) {
333        if bytes_read == 0 {
334            break;
335        }
336        current_idx += 1;
337
338        if current_idx < start {
339            line_buf.clear();
340            continue;
341        }
342        if lines.len() >= count {
343            more_below = true;
344            break;
345        }
346        
347        let raw = String::from_utf8_lossy(&line_buf);
348        lines.push(NumberedLine {
349            line: current_idx as u64,
350            text: truncate(&raw, limits.max_line_len),
351        });
352        line_buf.clear();
353    }
354
355    Ok(ReadResult {
356        path: corpus.relativize(&resolved).into_owned(),
357        lines,
358        more_below,
359    })
360}
361
362/// A directory entry returned by [`list_dir`].
363#[derive(Debug, Clone, Serialize, Deserialize)]
364pub struct DirEntryInfo {
365    /// Entry name (not a full path).
366    pub name: String,
367    /// `"file"`, `"dir"`, `"symlink"`, or `"other"`.
368    pub kind: String,
369    /// Size in bytes for files; `None` otherwise.
370    pub size_bytes: Option<u64>,
371}
372
373/// Result of a [`list_dir`] call.
374#[derive(Debug, Clone, Serialize, Deserialize)]
375pub struct ListResult {
376    /// Corpus-relative path of the listed directory.
377    pub path: String,
378    /// Entries sorted directories-first, then by name.
379    pub entries: Vec<DirEntryInfo>,
380    /// `true` if the listing was capped by a limit.
381    pub truncated: bool,
382}
383
384/// List the immediate entries of a directory inside the corpus.
385pub fn list_dir(corpus: &CorpusRoot, path: Option<&str>) -> Result<ListResult> {
386    let limits = corpus.limits();
387    let resolved = match path {
388        Some(p) if !p.is_empty() && p != "." => corpus.resolve(p)?,
389        _ => corpus.root().to_path_buf(),
390    };
391
392    if !resolved.is_dir() {
393        return Err(DciError::NotFound {
394            requested: path.unwrap_or(".").to_string(),
395        });
396    }
397
398    let read_dir = std::fs::read_dir(&resolved).map_err(|e| DciError::Io {
399        path: resolved.clone(),
400        source: e,
401    })?;
402
403    let mut entries = Vec::new();
404    let mut truncated = false;
405    for entry in read_dir {
406        let entry = match entry {
407            Ok(e) => e,
408            Err(_) => continue,
409        };
410        if entries.len() >= limits.max_results {
411            truncated = true;
412            break;
413        }
414        let file_type = entry.file_type().ok();
415        let (kind, size_bytes) = match file_type {
416            Some(t) if t.is_dir() => ("dir", None),
417            Some(t) if t.is_symlink() => ("symlink", None),
418            Some(t) if t.is_file() => ("file", entry.metadata().ok().map(|m| m.len())),
419            _ => ("other", None),
420        };
421        entries.push(DirEntryInfo {
422            name: entry.file_name().to_string_lossy().into_owned(),
423            kind: kind.to_string(),
424            size_bytes,
425        });
426    }
427
428    entries.sort_by(|a, b| {
429        let rank = |k: &str| if k == "dir" { 0 } else { 1 };
430        rank(&a.kind)
431            .cmp(&rank(&b.kind))
432            .then_with(|| a.name.cmp(&b.name))
433    });
434
435    Ok(ListResult {
436        path: corpus.relativize(&resolved).into_owned(),
437        entries,
438        truncated,
439    })
440}
441
442/// Enumerate every file in the corpus as a corpus-relative path, honoring the
443/// walk limits (gitignore, hidden, `max_files_walked`).
444///
445/// Used by corpus-wide consumers such as the evaluation vector baseline, which
446/// must materialize the whole corpus to embed it.
447pub fn list_files(corpus: &CorpusRoot) -> Result<Vec<String>> {
448    let limits = corpus.limits();
449    let mut paths = Vec::new();
450    for entry in walk(corpus).build() {
451        if paths.len() >= limits.max_files_walked {
452            break;
453        }
454        let entry = match entry {
455            Ok(e) => e,
456            Err(_) => continue,
457        };
458        if entry.file_type().is_some_and(|t| t.is_file()) {
459            paths.push(corpus.relativize(entry.path()).into_owned());
460        }
461    }
462    paths.sort();
463    Ok(paths)
464}
465
466/// Read a whole file (bounded by `max_file_bytes`) as UTF-8 (lossy), resolving
467/// the path through the corpus jail.
468pub fn read_document(corpus: &CorpusRoot, path: &str) -> Result<String> {
469    let resolved = corpus.resolve(path)?;
470    if !resolved.is_file() {
471        return Err(DciError::NotFound {
472            requested: path.to_string(),
473        });
474    }
475    read_file_bounded(&resolved, corpus.limits().max_file_bytes)
476}
477
478// --- internals ---------------------------------------------------------------
479
480/// Build a gitignore-aware walker honoring the corpus limits.
481fn walk(corpus: &CorpusRoot) -> WalkBuilder {
482    let limits = corpus.limits();
483    let respect = limits.respect_gitignore;
484    let mut builder = WalkBuilder::new(corpus.root());
485    builder
486        .standard_filters(respect)
487        .git_ignore(respect)
488        .git_global(respect)
489        .git_exclude(respect)
490        .ignore(respect)
491        .parents(respect)
492        // Honor `.gitignore` files even when the corpus is not itself a git
493        // repository; otherwise ignore rules would be silently skipped.
494        .require_git(false)
495        .hidden(!limits.include_hidden)
496        .follow_links(false);
497    builder
498}
499
500/// Compile a glob, auto-wrapping bare patterns (no `/`) to match anywhere in
501/// the tree (`auth*` becomes `**/auth*`).
502fn build_globset(pattern: &str) -> Result<GlobSet> {
503    let normalized = if pattern.contains('/') {
504        pattern.to_string()
505    } else {
506        format!("**/{pattern}")
507    };
508    let glob = Glob::new(&normalized).map_err(|e| DciError::InvalidGlob {
509        glob: pattern.to_string(),
510        reason: e.to_string(),
511    })?;
512    let mut builder = GlobSetBuilder::new();
513    builder.add(glob);
514    builder.build().map_err(|e| DciError::InvalidGlob {
515        glob: pattern.to_string(),
516        reason: e.to_string(),
517    })
518}
519
520/// Read a file, capping the number of bytes read to `max_bytes`.
521fn read_file_bounded(path: &Path, max_bytes: u64) -> Result<String> {
522    use std::io::Read;
523    let file = std::fs::File::open(path).map_err(|e| DciError::Io {
524        path: path.to_path_buf(),
525        source: e,
526    })?;
527    let mut handle = file.take(max_bytes);
528    let mut buf = Vec::new();
529    handle.read_to_end(&mut buf).map_err(|e| DciError::Io {
530        path: path.to_path_buf(),
531        source: e,
532    })?;
533    Ok(String::from_utf8_lossy(&buf).into_owned())
534}
535
/// Strip trailing `\n`/`\r` characters from `text` and limit the result to
/// `max_len` characters (not bytes), appending `…` when anything was cut.
fn truncate(text: &str, max_len: usize) -> String {
    let line = text.trim_end_matches(['\n', '\r']);
    if let Some((cut_at, _)) = line.char_indices().nth(max_len) {
        // `cut_at` is the byte offset of the first character that does not
        // fit, so slicing there always lands on a char boundary.
        format!("{}…", &line[..cut_at])
    } else {
        line.to_owned()
    }
}
549
550/// Sink that collects matched (and context) lines up to a per-call budget.
551struct CollectSink<'a> {
552    rel: &'a str,
553    hits: &'a mut Vec<SearchHit>,
554    remaining: usize,
555    max_line_len: usize,
556}
557
558impl Sink for CollectSink<'_> {
559    type Error = io::Error;
560
561    fn matched(&mut self, _searcher: &Searcher, m: &SinkMatch<'_>) -> io::Result<bool> {
562        if self.remaining == 0 {
563            return Ok(false);
564        }
565        let base = m.line_number().unwrap_or(0);
566        // A SinkMatch may span multiple lines; record each with an incrementing
567        // line number anchored at the match's first line.
568        for (offset, line) in m.lines().enumerate() {
569            if self.remaining == 0 {
570                break;
571            }
572            self.hits.push(SearchHit {
573                path: self.rel.to_string(),
574                line: base + offset as u64,
575                text: truncate(&String::from_utf8_lossy(line), self.max_line_len),
576                is_match: true,
577            });
578            self.remaining -= 1;
579        }
580        Ok(self.remaining > 0)
581    }
582
583    fn context(&mut self, _searcher: &Searcher, ctx: &SinkContext<'_>) -> io::Result<bool> {
584        if self.remaining == 0 {
585            return Ok(false);
586        }
587        self.hits.push(SearchHit {
588            path: self.rel.to_string(),
589            line: ctx.line_number().unwrap_or(0),
590            text: truncate(&String::from_utf8_lossy(ctx.bytes()), self.max_line_len),
591            is_match: false,
592        });
593        self.remaining -= 1;
594        Ok(self.remaining > 0)
595    }
596}