Skip to main content

big_code_analysis/
concurrent_files.rs

1#![allow(clippy::needless_pass_by_value)]
2
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::thread;
7
8use crossbeam::channel::{Receiver, Sender, unbounded};
9use globset::GlobSet;
10use walkdir::{DirEntry, WalkDir};
11
/// Callback run by a consumer thread for every file selected by the search.
///
/// It receives ownership of the file path and a shared reference to the
/// configuration, and reports failures through its `std::io::Result`.
type ProcFilesFunction<Config> = dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync;

/// Callback run by the producer for every selected file discovered while
/// walking a directory.
///
/// The map it receives is the one eventually returned by
/// `ConcurrentRunner::run`.
type ProcDirPathsFunction<Config> =
    dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;

/// Callback run by the producer for every selected path that was supplied
/// directly as a file rather than discovered inside a directory.
type ProcPathFunction<Config> = dyn Fn(&Path, &Config) + Send + Sync;
18
// No-op callbacks installed by `ConcurrentRunner::new`; they stay in place
// until the caller replaces them through the matching setter.

/// Default directory-path callback: ignores its arguments and does nothing.
fn null_proc_dir_paths<Config>(_: &mut HashMap<String, Vec<PathBuf>>, _: &Path, _: &Config) {}
/// Default single-path callback: ignores its arguments and does nothing.
fn null_proc_path<Config>(_: &Path, _: &Config) {}
22
/// A unit of work handed from the producer to a consumer thread.
#[derive(Debug)]
struct JobItem<Config> {
    /// File to be processed.
    path: PathBuf,
    /// Configuration shared by every job of the same run.
    cfg: Arc<Config>,
}
28
/// Receiving end of the job channel; a `None` message is the poison pill that
/// tells a consumer to stop.
type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
/// Sending end of the job channel; `Some(job)` queues work, `None` stops one
/// consumer.
type JobSender<Config> = Sender<Option<JobItem<Config>>>;
31
32fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
33where
34    ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
35{
36    while let Ok(job) = receiver.recv() {
37        if job.is_none() {
38            break;
39        }
40        // Cannot panic because of the check immediately above.
41        let job = job.unwrap();
42        let path = job.path.clone();
43
44        if let Err(err) = func(job.path, &job.cfg) {
45            eprintln!("{err:?} for file {}", path.display());
46        }
47    }
48}
49
50fn send_file<T>(
51    path: PathBuf,
52    cfg: &Arc<T>,
53    sender: &JobSender<T>,
54) -> Result<(), ConcurrentErrors> {
55    sender
56        .send(Some(JobItem {
57            path,
58            cfg: Arc::clone(cfg),
59        }))
60        .map_err(|e| ConcurrentErrors::Sender(e.to_string()))
61}
62
63fn is_hidden(entry: &DirEntry) -> bool {
64    entry
65        .file_name()
66        .to_str()
67        .is_some_and(|s| s.starts_with('.'))
68}
69
70fn explore<Config, ProcDirPaths, ProcPath>(
71    files_data: FilesData,
72    cfg: &Arc<Config>,
73    proc_dir_paths: ProcDirPaths,
74    proc_path: ProcPath,
75    sender: &JobSender<Config>,
76) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
77where
78    ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
79    ProcPath: Fn(&Path, &Config) + Send + Sync,
80{
81    let FilesData {
82        mut paths,
83        ref include,
84        ref exclude,
85    } = files_data;
86
87    let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
88
89    for path in paths.drain(..) {
90        if !path.exists() {
91            eprintln!("Warning: File doesn't exist: {}", path.display());
92            continue;
93        }
94        if path.is_dir() {
95            for entry in WalkDir::new(path)
96                .into_iter()
97                .filter_entry(|e| !is_hidden(e))
98            {
99                let entry = match entry {
100                    Ok(entry) => entry,
101                    Err(e) => return Err(ConcurrentErrors::Sender(e.to_string())),
102                };
103                let path = entry.path().to_path_buf();
104                if (include.is_empty() || include.is_match(&path))
105                    && (exclude.is_empty() || !exclude.is_match(&path))
106                    && path.is_file()
107                {
108                    proc_dir_paths(&mut all_files, &path, cfg);
109                    send_file(path, cfg, sender)?;
110                }
111            }
112        } else if (include.is_empty() || include.is_match(&path))
113            && (exclude.is_empty() || !exclude.is_match(&path))
114            && path.is_file()
115        {
116            proc_path(&path, cfg);
117            send_file(path, cfg, sender)?;
118        }
119    }
120
121    Ok(all_files)
122}
123
/// Series of errors that might happen when processing files concurrently.
///
/// Every variant carries a human-readable description of the failure. The
/// type implements [`std::fmt::Display`] and [`std::error::Error`], so it can
/// be propagated with `?` into `Box<dyn std::error::Error>` and similar
/// error containers.
#[derive(Debug)]
pub enum ConcurrentErrors {
    /// Producer side error.
    ///
    /// An error occurred inside the producer thread.
    Producer(String),
    /// Sender side error.
    ///
    /// An error occurred when sending an item.
    Sender(String),
    /// Receiver side error.
    ///
    /// An error occurred inside one of the receiver threads.
    Receiver(String),
    /// Thread side error.
    ///
    /// A general error occurred when a thread is being spawned or run.
    Thread(String),
}

impl std::fmt::Display for ConcurrentErrors {
    /// Formats the error as `"<side> error: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: a new variant must choose its own label.
        let (side, message) = match self {
            Self::Producer(message) => ("producer", message),
            Self::Sender(message) => ("sender", message),
            Self::Receiver(message) => ("receiver", message),
            Self::Thread(message) => ("thread", message),
        };
        write!(f, "{side} error: {message}")
    }
}

impl std::error::Error for ConcurrentErrors {}
144
/// Data related to files: which paths to search and which of the files found
/// there are kept.
#[derive(Debug)]
pub struct FilesData {
    /// Kind of files included in a search.
    ///
    /// A path must match this set to be kept; an empty set keeps every path.
    pub include: GlobSet,
    /// Kind of files excluded from a search.
    ///
    /// A path matching this set is discarded; an empty set discards nothing.
    pub exclude: GlobSet,
    /// List of file paths.
    ///
    /// Each entry may be a file or a directory; directories are walked.
    pub paths: Vec<PathBuf>,
}
155
/// A runner to process files concurrently.
///
/// One producer thread discovers the files and a pool of consumer threads
/// processes them.
pub struct ConcurrentRunner<Config> {
    /// Callback applied by the consumers to every selected file.
    proc_files: Box<ProcFilesFunction<Config>>,
    /// Callback applied by the producer to selected files found in a directory.
    proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
    /// Callback applied by the producer to selected paths requested directly.
    proc_path: Box<ProcPathFunction<Config>>,
    /// Number of consumer threads to spawn (always at least one).
    num_jobs: usize,
}
163
164impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
165    /// Creates a new `ConcurrentRunner`.
166    ///
167    /// * `num_jobs` - Number of jobs utilized to process files concurrently.
168    /// * `proc_files` - Function that processes each file found during
169    ///   the search.
170    pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
171    where
172        ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
173    {
174        let num_jobs = std::cmp::max(2, num_jobs) - 1;
175        Self {
176            proc_files: Box::new(proc_files),
177            proc_dir_paths: Box::new(null_proc_dir_paths),
178            proc_path: Box::new(null_proc_path),
179            num_jobs,
180        }
181    }
182
183    /// Sets the function to process the paths and subpaths contained in a
184    /// directory.
185    #[must_use]
186    pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
187    where
188        ProcDirPaths:
189            'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
190    {
191        self.proc_dir_paths = Box::new(proc_dir_paths);
192        self
193    }
194
195    /// Sets the function to process a single path.
196    #[must_use]
197    pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
198    where
199        ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
200    {
201        self.proc_path = Box::new(proc_path);
202        self
203    }
204
205    /// Runs the producer-consumer approach to process the files
206    /// contained in a directory and in its own subdirectories.
207    ///
208    /// * `config` - Information used to process a file.
209    /// * `files_data` - Information about the files to be included or excluded
210    ///   from a search more the number of paths considered in the search.
211    ///
212    /// # Errors
213    ///
214    /// Returns [`ConcurrentErrors::Thread`] when any worker thread
215    /// (the single producer OR one of the `num_jobs` consumers)
216    /// cannot be spawned via [`std::thread::Builder::spawn`];
217    /// [`ConcurrentErrors::Producer`] when the producer thread
218    /// panics during its directory walk and join fails;
219    /// [`ConcurrentErrors::Sender`] when a worker cannot place an
220    /// item (or the post-walk `None` poison-pill) on the channel;
221    /// [`ConcurrentErrors::Receiver`] when a consumer thread panics
222    /// and its join fails. Per-file processing errors raised by the
223    /// user-supplied callbacks are surfaced through the callbacks
224    /// themselves, not through this `Result`.
225    pub fn run(
226        self,
227        config: Config,
228        files_data: FilesData,
229    ) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
230        let cfg = Arc::new(config);
231
232        let (sender, receiver) = unbounded();
233
234        let producer = {
235            let sender = sender.clone();
236
237            match thread::Builder::new()
238                .name(String::from("Producer"))
239                .spawn(move || {
240                    explore(
241                        files_data,
242                        &cfg,
243                        self.proc_dir_paths,
244                        self.proc_path,
245                        &sender,
246                    )
247                }) {
248                Ok(producer) => producer,
249                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
250            }
251        };
252
253        let mut receivers = Vec::with_capacity(self.num_jobs);
254        let proc_files = Arc::new(self.proc_files);
255        for i in 0..self.num_jobs {
256            let receiver = receiver.clone();
257            let proc_files = proc_files.clone();
258
259            let t = match thread::Builder::new()
260                .name(format!("Consumer {i}"))
261                .spawn(move || {
262                    consumer(receiver, proc_files);
263                }) {
264                Ok(receiver) => receiver,
265                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
266            };
267
268            receivers.push(t);
269        }
270
271        let Ok(all_files) = producer.join() else {
272            return Err(ConcurrentErrors::Producer(
273                "Child thread panicked".to_owned(),
274            ));
275        };
276
277        // Poison the receiver, now that the producer is finished.
278        for _ in 0..self.num_jobs {
279            if let Err(e) = sender.send(None) {
280                return Err(ConcurrentErrors::Sender(e.to_string()));
281            }
282        }
283
284        for receiver in receivers {
285            if receiver.join().is_err() {
286                return Err(ConcurrentErrors::Receiver(
287                    "A thread used to process a file panicked".to_owned(),
288                ));
289            }
290        }
291
292        all_files
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::Builder;
    use walkdir::WalkDir;

    // `tempfile::TempDir::new()` names its directory with a `.tmp` prefix.
    // That leading dot makes `is_hidden` match the root entry, so the
    // `filter_entry` walk used by these tests would prune the whole fixture.
    // A `visible-` prefix keeps the root out of the filter's way.
    fn make_visible_tempdir() -> tempfile::TempDir {
        Builder::new().prefix("visible-").tempdir().unwrap()
    }

    /// Collects the file names of every entry reached under `dir` when
    /// `is_hidden` is used as the `filter_entry` pruning predicate.
    /// Unreadable entries and non-UTF-8 names are left out.
    fn walk_skipping_hidden(dir: &Path) -> Vec<String> {
        let mut names = Vec::new();
        for entry in WalkDir::new(dir).into_iter().filter_entry(|e| !is_hidden(e)) {
            let Ok(entry) = entry else { continue };
            if let Some(name) = entry.file_name().to_str() {
                names.push(name.to_owned());
            }
        }
        names
    }

    #[test]
    fn is_hidden_skips_dotfiles_and_keeps_regular_files() {
        let fixture = make_visible_tempdir();
        let root = fixture.path();
        std::fs::write(root.join("keep.rs"), "// kept\n").unwrap();
        std::fs::write(root.join(".env"), "secret=1\n").unwrap();
        std::fs::write(root.join(".gitignore"), "target/\n").unwrap();

        let visited = walk_skipping_hidden(root);
        let saw = |name: &str| visited.iter().any(|n| n == name);

        assert!(saw("keep.rs"));
        assert!(!saw(".env"));
        assert!(!saw(".gitignore"));
    }

    #[test]
    fn is_hidden_prunes_hidden_directories_recursively() {
        let fixture = make_visible_tempdir();
        let root = fixture.path();
        let hidden_dir = root.join(".hidden");
        std::fs::create_dir(&hidden_dir).unwrap();
        std::fs::write(hidden_dir.join("inside.rs"), "// inside hidden\n").unwrap();
        std::fs::write(root.join("visible.rs"), "// visible\n").unwrap();

        let visited = walk_skipping_hidden(root);
        let saw = |name: &str| visited.iter().any(|n| n == name);

        // Neither the hidden directory nor anything below it may show up.
        assert!(saw("visible.rs"));
        assert!(!saw(".hidden"));
        assert!(!saw("inside.rs"));
    }
}
349}