Skip to main content

big_code_analysis/
concurrent_files.rs

1#![allow(clippy::needless_pass_by_value)]
2
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::thread;
7
8use crossbeam::channel::{Receiver, Sender, unbounded};
9use globset::GlobSet;
10use walkdir::{DirEntry, WalkDir};
11
/// Unsized type of the per-file callback: it receives an owned path plus the
/// shared configuration and reports failure through `std::io::Result`.
/// Stored boxed inside `ConcurrentRunner`.
type ProcFilesFunction<Config> = dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync;
13
/// Unsized type of the callback that `explore` invokes for every file yielded
/// by a directory walk. It may record the path in the map that `explore`
/// eventually returns.
type ProcDirPathsFunction<Config> =
    dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;
16
/// Unsized type of the callback that `explore` invokes for every input path
/// that is a file rather than a directory.
type ProcPathFunction<Config> = dyn Fn(&Path, &Config) + Send + Sync;
18
// No-op defaults for the optional callbacks, used until the `set_proc_*` builders replace them.
/// Default directory-path callback: ignores every argument and leaves the
/// map untouched.
fn null_proc_dir_paths<Config>(
    _all_files: &mut HashMap<String, Vec<PathBuf>>,
    _path: &Path,
    _cfg: &Config,
) {
}
/// Default single-path callback: ignores both arguments and does nothing.
fn null_proc_path<Config>(_path: &Path, _cfg: &Config) {
}
22
/// One unit of work travelling through the job channel from the producer
/// to a consumer thread.
#[derive(Debug)]
struct JobItem<Config> {
    /// Path of the file to hand to the per-file callback.
    path: PathBuf,
    /// Shared configuration; each job holds its own `Arc` handle to it.
    cfg: Arc<Config>,
}
28
/// Receiving half of the job channel. A `None` message is the poison pill
/// that tells a consumer to stop.
type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
/// Sending half of the job channel. `Some(job)` queues work, `None` is the
/// poison pill that stops one consumer.
type JobSender<Config> = Sender<Option<JobItem<Config>>>;
31
32fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
33where
34    ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
35{
36    // `Ok(None)` is the poison-pill terminating the consumer loop;
37    // `Err(_)` means the channel was closed (sender dropped).
38    while let Ok(Some(job)) = receiver.recv() {
39        let path = job.path.clone();
40
41        if let Err(err) = func(job.path, &job.cfg) {
42            eprintln!("{err:?} for file {}", path.display());
43        }
44    }
45}
46
47fn send_file<T>(
48    path: PathBuf,
49    cfg: &Arc<T>,
50    sender: &JobSender<T>,
51) -> Result<(), ConcurrentErrors> {
52    sender
53        .send(Some(JobItem {
54            path,
55            cfg: Arc::clone(cfg),
56        }))
57        .map_err(|e| ConcurrentErrors::Sender(e.to_string()))
58}
59
60fn is_hidden(entry: &DirEntry) -> bool {
61    entry
62        .file_name()
63        .to_str()
64        .is_some_and(|s| s.starts_with('.'))
65}
66
67fn explore<Config, ProcDirPaths, ProcPath>(
68    files_data: FilesData,
69    cfg: &Arc<Config>,
70    proc_dir_paths: ProcDirPaths,
71    proc_path: ProcPath,
72    sender: &JobSender<Config>,
73) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
74where
75    ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
76    ProcPath: Fn(&Path, &Config) + Send + Sync,
77{
78    let FilesData {
79        mut paths,
80        include,
81        exclude,
82    } = files_data;
83    let filters = Filters {
84        include: &include,
85        exclude: &exclude,
86    };
87
88    let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
89
90    for path in paths.drain(..) {
91        if !path.exists() {
92            eprintln!("Warning: File doesn't exist: {}", path.display());
93            continue;
94        }
95        if path.is_dir() {
96            for entry_path in walk_dir_files(&path, &filters) {
97                let entry_path = entry_path?;
98                proc_dir_paths(&mut all_files, &entry_path, cfg);
99                send_file(entry_path, cfg, sender)?;
100            }
101        } else if filters.matches(&path) && path.is_file() {
102            proc_path(&path, cfg);
103            send_file(path, cfg, sender)?;
104        }
105    }
106
107    Ok(all_files)
108}
109
/// Borrowed include/exclude pair, factored out so `explore` and the
/// directory walker can share one filter predicate instead of
/// re-evaluating two near-identical `&&`-chains side-by-side.
struct Filters<'a> {
    /// Globs a path must match to be accepted; an empty set accepts
    /// every path.
    include: &'a GlobSet,
    /// Globs that reject a path when matched; an empty set rejects nothing.
    exclude: &'a GlobSet,
}
117
118impl Filters<'_> {
119    fn matches(&self, path: &Path) -> bool {
120        (self.include.is_empty() || self.include.is_match(path))
121            && (self.exclude.is_empty() || !self.exclude.is_match(path))
122    }
123}
124
125/// Walk `root` recursively, yielding only regular files that pass
126/// `filters` and aren't hidden. `WalkDir` errors are surfaced as
127/// `ConcurrentErrors::Sender` so the caller can `?`-propagate them
128/// through this iterator.
129fn walk_dir_files<'a>(
130    root: &Path,
131    filters: &'a Filters<'_>,
132) -> impl Iterator<Item = Result<PathBuf, ConcurrentErrors>> + 'a {
133    WalkDir::new(root)
134        .into_iter()
135        .filter_entry(|e| !is_hidden(e))
136        .filter_map(move |entry| match entry {
137            Ok(entry) => {
138                let path = entry.path();
139                (filters.matches(path) && path.is_file()).then(|| Ok(path.to_path_buf()))
140            }
141            Err(e) => Some(Err(ConcurrentErrors::Sender(e.to_string()))),
142        })
143}
144
/// Series of errors that might happen when processing files concurrently.
///
/// Every variant carries a human-readable description of the failure.
/// The type implements [`std::fmt::Display`] and [`std::error::Error`], so
/// it can be printed for users and converted into `Box<dyn Error>`.
#[derive(Debug)]
pub enum ConcurrentErrors {
    /// Producer side error.
    ///
    /// An error occurred inside the producer thread.
    Producer(String),
    /// Sender side error.
    ///
    /// An error occurred when sending an item. Errors raised while walking
    /// a directory are reported through this variant as well.
    Sender(String),
    /// Receiver side error.
    ///
    /// An error occurred inside one of the receiver threads.
    Receiver(String),
    /// Thread side error.
    ///
    /// A general error occurred when a thread is being spawned or run.
    Thread(String),
}

impl std::fmt::Display for ConcurrentErrors {
    /// Formats the error as `"<side> error: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (side, message) = match self {
            Self::Producer(message) => ("producer", message),
            Self::Sender(message) => ("sender", message),
            Self::Receiver(message) => ("receiver", message),
            Self::Thread(message) => ("thread", message),
        };
        write!(f, "{side} error: {message}")
    }
}

impl std::error::Error for ConcurrentErrors {}
165
/// Data related to files: which paths to search and which of the files
/// found there should be processed.
#[derive(Debug)]
pub struct FilesData {
    /// Kind of files included in a search. An empty set includes every file.
    pub include: GlobSet,
    /// Kind of files excluded from a search. An empty set excludes nothing.
    pub exclude: GlobSet,
    /// List of file paths. Directories in this list are walked recursively;
    /// paths that do not exist are skipped with a warning.
    pub paths: Vec<PathBuf>,
}
176
/// A runner to process files concurrently.
///
/// `run` starts one producer thread that discovers files and `num_jobs`
/// consumer threads that apply `proc_files` to each of them.
pub struct ConcurrentRunner<Config> {
    /// Per-file callback executed on the consumer threads.
    proc_files: Box<ProcFilesFunction<Config>>,
    /// Callback invoked by the producer for each file found while walking
    /// a directory; a no-op unless replaced.
    proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
    /// Callback invoked by the producer for each input path that is a file;
    /// a no-op unless replaced.
    proc_path: Box<ProcPathFunction<Config>>,
    /// Number of consumer threads to spawn.
    num_jobs: usize,
}
184
185impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
186    /// Creates a new `ConcurrentRunner`.
187    ///
188    /// * `num_jobs` - Number of jobs utilized to process files concurrently.
189    /// * `proc_files` - Function that processes each file found during
190    ///   the search.
191    pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
192    where
193        ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
194    {
195        let num_jobs = std::cmp::max(2, num_jobs) - 1;
196        Self {
197            proc_files: Box::new(proc_files),
198            proc_dir_paths: Box::new(null_proc_dir_paths),
199            proc_path: Box::new(null_proc_path),
200            num_jobs,
201        }
202    }
203
204    /// Sets the function to process the paths and subpaths contained in a
205    /// directory.
206    #[must_use]
207    pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
208    where
209        ProcDirPaths:
210            'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
211    {
212        self.proc_dir_paths = Box::new(proc_dir_paths);
213        self
214    }
215
216    /// Sets the function to process a single path.
217    #[must_use]
218    pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
219    where
220        ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
221    {
222        self.proc_path = Box::new(proc_path);
223        self
224    }
225
226    /// Runs the producer-consumer approach to process the files
227    /// contained in a directory and in its own subdirectories.
228    ///
229    /// * `config` - Information used to process a file.
230    /// * `files_data` - Information about the files to be included or excluded
231    ///   from a search more the number of paths considered in the search.
232    ///
233    /// # Errors
234    ///
235    /// Returns [`ConcurrentErrors::Thread`] when any worker thread
236    /// (the single producer OR one of the `num_jobs` consumers)
237    /// cannot be spawned via [`std::thread::Builder::spawn`];
238    /// [`ConcurrentErrors::Producer`] when the producer thread
239    /// panics during its directory walk and join fails;
240    /// [`ConcurrentErrors::Sender`] when a worker cannot place an
241    /// item (or the post-walk `None` poison-pill) on the channel;
242    /// [`ConcurrentErrors::Receiver`] when a consumer thread panics
243    /// and its join fails. Per-file processing errors raised by the
244    /// user-supplied callbacks are surfaced through the callbacks
245    /// themselves, not through this `Result`.
246    pub fn run(
247        self,
248        config: Config,
249        files_data: FilesData,
250    ) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
251        let cfg = Arc::new(config);
252
253        let (sender, receiver) = unbounded();
254
255        let producer = {
256            let sender = sender.clone();
257
258            match thread::Builder::new()
259                .name(String::from("Producer"))
260                .spawn(move || {
261                    explore(
262                        files_data,
263                        &cfg,
264                        self.proc_dir_paths,
265                        self.proc_path,
266                        &sender,
267                    )
268                }) {
269                Ok(producer) => producer,
270                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
271            }
272        };
273
274        let mut receivers = Vec::with_capacity(self.num_jobs);
275        let proc_files = Arc::new(self.proc_files);
276        for i in 0..self.num_jobs {
277            let receiver = receiver.clone();
278            let proc_files = proc_files.clone();
279
280            let t = match thread::Builder::new()
281                .name(format!("Consumer {i}"))
282                .spawn(move || {
283                    consumer(receiver, proc_files);
284                }) {
285                Ok(receiver) => receiver,
286                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
287            };
288
289            receivers.push(t);
290        }
291
292        let Ok(all_files) = producer.join() else {
293            return Err(ConcurrentErrors::Producer(
294                "Child thread panicked".to_owned(),
295            ));
296        };
297
298        // Poison the receiver, now that the producer is finished.
299        for _ in 0..self.num_jobs {
300            if let Err(e) = sender.send(None) {
301                return Err(ConcurrentErrors::Sender(e.to_string()));
302            }
303        }
304
305        for receiver in receivers {
306            if receiver.join().is_err() {
307                return Err(ConcurrentErrors::Receiver(
308                    "A thread used to process a file panicked".to_owned(),
309                ));
310            }
311        }
312
313        all_files
314    }
315}
316
317#[cfg(test)]
318#[path = "concurrent_files_tests.rs"]
319mod tests;