//! Concurrent file processing: a producer thread discovers input files and
//! queues them for a pool of consumer threads (see `ConcurrentRunner`).

// `consumer` and `explore` take their channel endpoint and callbacks by value
// but only use them through references, which this lint would flag; presumably
// ownership is taken on purpose because those values are moved into spawned
// threads — confirm before removing.
#![allow(clippy::needless_pass_by_value)]
2
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::thread;
7
8use crossbeam::channel::{Receiver, Sender, unbounded};
9use globset::GlobSet;
10use walkdir::{DirEntry, WalkDir};
11
/// Callback run by a consumer thread on every queued file, together with the
/// run's configuration. An `Err` it returns is printed to stderr by the
/// consumer and does not stop the run.
type ProcFilesFunction<Config> = dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync;

/// Callback run by the producer on every file found while walking a directory.
/// It receives the map that `ConcurrentRunner::run` returns and may insert
/// entries into it.
type ProcDirPathsFunction<Config> =
    dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;

/// Callback run by the producer on every selected path that was passed in
/// directly rather than found inside a directory.
type ProcPathFunction<Config> = dyn Fn(&Path, &Config) + Send + Sync;
18
/// Default directory-entry callback: leaves the map untouched.
fn null_proc_dir_paths<Config>(
    _all_files: &mut HashMap<String, Vec<PathBuf>>,
    _entry: &Path,
    _cfg: &Config,
) {
}

/// Default single-path callback: does nothing.
fn null_proc_path<Config>(_entry: &Path, _cfg: &Config) {}
22
/// One unit of work handed from the producer to a consumer thread.
#[derive(Debug)]
struct JobItem<Config> {
    /// File to hand to the file-processing callback.
    path: PathBuf,
    /// Configuration shared by every job of a run (each job holds its own `Arc` handle).
    cfg: Arc<Config>,
}

/// Receiving end of the job queue; a `None` item tells a consumer to stop.
type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
/// Sending end of the job queue; sending `None` tells one consumer to stop.
type JobSender<Config> = Sender<Option<JobItem<Config>>>;
31
32fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
33where
34 ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
35{
36 while let Ok(Some(job)) = receiver.recv() {
39 let path = job.path.clone();
40
41 if let Err(err) = func(job.path, &job.cfg) {
42 eprintln!("{err:?} for file {}", path.display());
43 }
44 }
45}
46
47fn send_file<T>(
48 path: PathBuf,
49 cfg: &Arc<T>,
50 sender: &JobSender<T>,
51) -> Result<(), ConcurrentErrors> {
52 sender
53 .send(Some(JobItem {
54 path,
55 cfg: Arc::clone(cfg),
56 }))
57 .map_err(|e| ConcurrentErrors::Sender(e.to_string()))
58}
59
60fn is_hidden(entry: &DirEntry) -> bool {
61 entry
62 .file_name()
63 .to_str()
64 .is_some_and(|s| s.starts_with('.'))
65}
66
67fn explore<Config, ProcDirPaths, ProcPath>(
68 files_data: FilesData,
69 cfg: &Arc<Config>,
70 proc_dir_paths: ProcDirPaths,
71 proc_path: ProcPath,
72 sender: &JobSender<Config>,
73) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
74where
75 ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
76 ProcPath: Fn(&Path, &Config) + Send + Sync,
77{
78 let FilesData {
79 mut paths,
80 include,
81 exclude,
82 } = files_data;
83 let filters = Filters {
84 include: &include,
85 exclude: &exclude,
86 };
87
88 let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
89
90 for path in paths.drain(..) {
91 if !path.exists() {
92 eprintln!("Warning: File doesn't exist: {}", path.display());
93 continue;
94 }
95 if path.is_dir() {
96 for entry_path in walk_dir_files(&path, &filters) {
97 let entry_path = entry_path?;
98 proc_dir_paths(&mut all_files, &entry_path, cfg);
99 send_file(entry_path, cfg, sender)?;
100 }
101 } else if filters.matches(&path) && path.is_file() {
102 proc_path(&path, cfg);
103 send_file(path, cfg, sender)?;
104 }
105 }
106
107 Ok(all_files)
108}
109
110struct Filters<'a> {
114 include: &'a GlobSet,
115 exclude: &'a GlobSet,
116}
117
118impl Filters<'_> {
119 fn matches(&self, path: &Path) -> bool {
120 (self.include.is_empty() || self.include.is_match(path))
121 && (self.exclude.is_empty() || !self.exclude.is_match(path))
122 }
123}
124
125fn walk_dir_files<'a>(
130 root: &Path,
131 filters: &'a Filters<'_>,
132) -> impl Iterator<Item = Result<PathBuf, ConcurrentErrors>> + 'a {
133 WalkDir::new(root)
134 .into_iter()
135 .filter_entry(|e| !is_hidden(e))
136 .filter_map(move |entry| match entry {
137 Ok(entry) => {
138 let path = entry.path();
139 (filters.matches(path) && path.is_file()).then(|| Ok(path.to_path_buf()))
140 }
141 Err(e) => Some(Err(ConcurrentErrors::Sender(e.to_string()))),
142 })
143}
144
/// Errors that can occur while processing files concurrently.
#[derive(Debug)]
pub enum ConcurrentErrors {
    /// The producer thread, which explores the input paths, panicked.
    Producer(String),
    /// A job could not be queued for the consumers. Also used when a directory
    /// entry cannot be read while walking.
    Sender(String),
    /// A consumer thread, which processes files, panicked.
    Receiver(String),
    /// A thread could not be spawned.
    Thread(String),
}

impl std::fmt::Display for ConcurrentErrors {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Producer(msg) => write!(f, "producer error: {msg}"),
            Self::Sender(msg) => write!(f, "sender error: {msg}"),
            Self::Receiver(msg) => write!(f, "receiver error: {msg}"),
            Self::Thread(msg) => write!(f, "thread error: {msg}"),
        }
    }
}

// Lets callers use `?` into `Box<dyn Error>` / error-chaining libraries.
impl std::error::Error for ConcurrentErrors {}
165
/// Describes which files a `ConcurrentRunner` run should process.
#[derive(Debug)]
pub struct FilesData {
    /// Globs a file path must match to be processed; an empty set matches every path.
    pub include: GlobSet,
    /// Globs that exclude a file path from processing; an empty set excludes nothing.
    pub exclude: GlobSet,
    /// Files and directories to process. Directories are walked, and entries whose
    /// names start with `.` are skipped.
    pub paths: Vec<PathBuf>,
}
176
/// Runs a file-processing callback over many files, using one producer thread
/// that discovers files and a pool of consumer threads that process them.
pub struct ConcurrentRunner<Config> {
    /// Callback invoked by consumer threads for each queued file.
    proc_files: Box<ProcFilesFunction<Config>>,
    /// Callback invoked by the producer for each file found inside a directory.
    proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
    /// Callback invoked by the producer for each selected path given directly.
    proc_path: Box<ProcPathFunction<Config>>,
    /// Number of consumer threads to spawn (the producer is not counted).
    num_jobs: usize,
}
184
185impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
186 pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
192 where
193 ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
194 {
195 let num_jobs = std::cmp::max(2, num_jobs) - 1;
196 Self {
197 proc_files: Box::new(proc_files),
198 proc_dir_paths: Box::new(null_proc_dir_paths),
199 proc_path: Box::new(null_proc_path),
200 num_jobs,
201 }
202 }
203
204 #[must_use]
207 pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
208 where
209 ProcDirPaths:
210 'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
211 {
212 self.proc_dir_paths = Box::new(proc_dir_paths);
213 self
214 }
215
216 #[must_use]
218 pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
219 where
220 ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
221 {
222 self.proc_path = Box::new(proc_path);
223 self
224 }
225
226 pub fn run(
247 self,
248 config: Config,
249 files_data: FilesData,
250 ) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
251 let cfg = Arc::new(config);
252
253 let (sender, receiver) = unbounded();
254
255 let producer = {
256 let sender = sender.clone();
257
258 match thread::Builder::new()
259 .name(String::from("Producer"))
260 .spawn(move || {
261 explore(
262 files_data,
263 &cfg,
264 self.proc_dir_paths,
265 self.proc_path,
266 &sender,
267 )
268 }) {
269 Ok(producer) => producer,
270 Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
271 }
272 };
273
274 let mut receivers = Vec::with_capacity(self.num_jobs);
275 let proc_files = Arc::new(self.proc_files);
276 for i in 0..self.num_jobs {
277 let receiver = receiver.clone();
278 let proc_files = proc_files.clone();
279
280 let t = match thread::Builder::new()
281 .name(format!("Consumer {i}"))
282 .spawn(move || {
283 consumer(receiver, proc_files);
284 }) {
285 Ok(receiver) => receiver,
286 Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
287 };
288
289 receivers.push(t);
290 }
291
292 let Ok(all_files) = producer.join() else {
293 return Err(ConcurrentErrors::Producer(
294 "Child thread panicked".to_owned(),
295 ));
296 };
297
298 for _ in 0..self.num_jobs {
300 if let Err(e) = sender.send(None) {
301 return Err(ConcurrentErrors::Sender(e.to_string()));
302 }
303 }
304
305 for receiver in receivers {
306 if receiver.join().is_err() {
307 return Err(ConcurrentErrors::Receiver(
308 "A thread used to process a file panicked".to_owned(),
309 ));
310 }
311 }
312
313 all_files
314 }
315}
316
// Unit tests live in a separate file, `concurrent_files_tests.rs`.
#[cfg(test)]
#[path = "concurrent_files_tests.rs"]
mod tests;