1#![allow(clippy::needless_pass_by_value)]
2
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::thread;
7
8use crossbeam::channel::{Receiver, Sender, unbounded};
9use globset::GlobSet;
10use walkdir::{DirEntry, WalkDir};
11
/// Callback run by a consumer thread for every file it receives.
///
/// It takes ownership of the file path, borrows the shared configuration and
/// reports failures through `std::io::Result`.
type ProcFilesFunction<Config> =
    dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync;
13
/// Callback run by the producer for every selected file found while walking
/// a directory.
///
/// It receives the map that the exploration eventually returns, the path of
/// the file and the shared configuration.
type ProcDirPathsFunction<Config> =
    dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;
16
/// Callback run by the producer for every selected path that was passed
/// directly (not discovered by walking a directory).
type ProcPathFunction<Config> =
    dyn Fn(&Path, &Config) + Send + Sync;
18
/// Default directory-path callback: ignores every argument and does nothing.
fn null_proc_dir_paths<Config>(
    _all_files: &mut HashMap<String, Vec<PathBuf>>,
    _path: &Path,
    _cfg: &Config,
) {
}
/// Default single-path callback: ignores every argument and does nothing.
fn null_proc_path<Config>(_path: &Path, _cfg: &Config) {
}
22
/// A unit of work sent from the producer to a consumer thread.
#[derive(Debug)]
struct JobItem<Config> {
    /// File to be processed.
    path: PathBuf,
    /// Configuration shared between all jobs.
    cfg: Arc<Config>,
}
28
29type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
30type JobSender<Config> = Sender<Option<JobItem<Config>>>;
31
32fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
33where
34 ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
35{
36 while let Ok(job) = receiver.recv() {
37 if job.is_none() {
38 break;
39 }
40 let job = job.unwrap();
42 let path = job.path.clone();
43
44 if let Err(err) = func(job.path, &job.cfg) {
45 eprintln!("{err:?} for file {}", path.display());
46 }
47 }
48}
49
50fn send_file<T>(
51 path: PathBuf,
52 cfg: &Arc<T>,
53 sender: &JobSender<T>,
54) -> Result<(), ConcurrentErrors> {
55 sender
56 .send(Some(JobItem {
57 path,
58 cfg: Arc::clone(cfg),
59 }))
60 .map_err(|e| ConcurrentErrors::Sender(e.to_string()))
61}
62
63fn is_hidden(entry: &DirEntry) -> bool {
64 entry
65 .file_name()
66 .to_str()
67 .is_some_and(|s| s.starts_with('.'))
68}
69
70fn explore<Config, ProcDirPaths, ProcPath>(
71 files_data: FilesData,
72 cfg: &Arc<Config>,
73 proc_dir_paths: ProcDirPaths,
74 proc_path: ProcPath,
75 sender: &JobSender<Config>,
76) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
77where
78 ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
79 ProcPath: Fn(&Path, &Config) + Send + Sync,
80{
81 let FilesData {
82 mut paths,
83 ref include,
84 ref exclude,
85 } = files_data;
86
87 let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
88
89 for path in paths.drain(..) {
90 if !path.exists() {
91 eprintln!("Warning: File doesn't exist: {}", path.display());
92 continue;
93 }
94 if path.is_dir() {
95 for entry in WalkDir::new(path)
96 .into_iter()
97 .filter_entry(|e| !is_hidden(e))
98 {
99 let entry = match entry {
100 Ok(entry) => entry,
101 Err(e) => return Err(ConcurrentErrors::Sender(e.to_string())),
102 };
103 let path = entry.path().to_path_buf();
104 if (include.is_empty() || include.is_match(&path))
105 && (exclude.is_empty() || !exclude.is_match(&path))
106 && path.is_file()
107 {
108 proc_dir_paths(&mut all_files, &path, cfg);
109 send_file(path, cfg, sender)?;
110 }
111 }
112 } else if (include.is_empty() || include.is_match(&path))
113 && (exclude.is_empty() || !exclude.is_match(&path))
114 && path.is_file()
115 {
116 proc_path(&path, cfg);
117 send_file(path, cfg, sender)?;
118 }
119 }
120
121 Ok(all_files)
122}
123
/// Errors reported by the concurrent runner.
///
/// Every variant carries a human-readable description of the failure. The
/// type implements [`std::fmt::Display`] and [`std::error::Error`], so it can
/// be printed directly or converted into `Box<dyn std::error::Error>`.
#[derive(Debug)]
pub enum ConcurrentErrors {
    /// The producer thread failed.
    Producer(String),
    /// An item could not be sent to the consumers, or exploring a directory failed.
    Sender(String),
    /// A consumer thread failed.
    Receiver(String),
    /// A thread could not be created.
    Thread(String),
}

impl std::fmt::Display for ConcurrentErrors {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (side, message) = match self {
            Self::Producer(message) => ("Producer", message),
            Self::Sender(message) => ("Sender", message),
            Self::Receiver(message) => ("Receiver", message),
            Self::Thread(message) => ("Thread", message),
        };
        write!(f, "{side} error: {message}")
    }
}

impl std::error::Error for ConcurrentErrors {}
144
145#[derive(Debug)]
147pub struct FilesData {
148 pub include: GlobSet,
150 pub exclude: GlobSet,
152 pub paths: Vec<PathBuf>,
154}
155
156pub struct ConcurrentRunner<Config> {
158 proc_files: Box<ProcFilesFunction<Config>>,
159 proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
160 proc_path: Box<ProcPathFunction<Config>>,
161 num_jobs: usize,
162}
163
164impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
165 pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
171 where
172 ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
173 {
174 let num_jobs = std::cmp::max(2, num_jobs) - 1;
175 Self {
176 proc_files: Box::new(proc_files),
177 proc_dir_paths: Box::new(null_proc_dir_paths),
178 proc_path: Box::new(null_proc_path),
179 num_jobs,
180 }
181 }
182
183 #[must_use]
186 pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
187 where
188 ProcDirPaths:
189 'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
190 {
191 self.proc_dir_paths = Box::new(proc_dir_paths);
192 self
193 }
194
195 #[must_use]
197 pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
198 where
199 ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
200 {
201 self.proc_path = Box::new(proc_path);
202 self
203 }
204
205 pub fn run(
226 self,
227 config: Config,
228 files_data: FilesData,
229 ) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
230 let cfg = Arc::new(config);
231
232 let (sender, receiver) = unbounded();
233
234 let producer = {
235 let sender = sender.clone();
236
237 match thread::Builder::new()
238 .name(String::from("Producer"))
239 .spawn(move || {
240 explore(
241 files_data,
242 &cfg,
243 self.proc_dir_paths,
244 self.proc_path,
245 &sender,
246 )
247 }) {
248 Ok(producer) => producer,
249 Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
250 }
251 };
252
253 let mut receivers = Vec::with_capacity(self.num_jobs);
254 let proc_files = Arc::new(self.proc_files);
255 for i in 0..self.num_jobs {
256 let receiver = receiver.clone();
257 let proc_files = proc_files.clone();
258
259 let t = match thread::Builder::new()
260 .name(format!("Consumer {i}"))
261 .spawn(move || {
262 consumer(receiver, proc_files);
263 }) {
264 Ok(receiver) => receiver,
265 Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
266 };
267
268 receivers.push(t);
269 }
270
271 let Ok(all_files) = producer.join() else {
272 return Err(ConcurrentErrors::Producer(
273 "Child thread panicked".to_owned(),
274 ));
275 };
276
277 for _ in 0..self.num_jobs {
279 if let Err(e) = sender.send(None) {
280 return Err(ConcurrentErrors::Sender(e.to_string()));
281 }
282 }
283
284 for receiver in receivers {
285 if receiver.join().is_err() {
286 return Err(ConcurrentErrors::Receiver(
287 "A thread used to process a file panicked".to_owned(),
288 ));
289 }
290 }
291
292 all_files
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::Builder;
    use walkdir::WalkDir;

    /// Creates a temporary directory whose name does not start with a dot.
    ///
    /// The walk root is filtered too, so a dot-prefixed root (tempfile's
    /// default prefix is `.tmp`) would itself be classified as hidden.
    fn make_visible_tempdir() -> tempfile::TempDir {
        let mut builder = Builder::new();
        builder.prefix("visible-");
        builder.tempdir().unwrap()
    }

    /// Walks `dir`, pruning hidden entries, and returns the UTF-8 file names visited.
    fn walk_skipping_hidden(dir: &Path) -> Vec<String> {
        let mut names = Vec::new();
        let walker = WalkDir::new(dir)
            .into_iter()
            .filter_entry(|e| !is_hidden(e));

        for entry in walker {
            // Unreadable entries and non-UTF-8 names are ignored.
            let Ok(entry) = entry else { continue };
            if let Some(name) = entry.file_name().to_str() {
                names.push(name.to_owned());
            }
        }
        names
    }

    /// Tells whether `name` is among the visited file names.
    fn was_visited(visited: &[String], name: &str) -> bool {
        visited.iter().any(|n| n == name)
    }

    #[test]
    fn is_hidden_skips_dotfiles_and_keeps_regular_files() {
        let dir = make_visible_tempdir();
        let root = dir.path();
        std::fs::write(root.join("keep.rs"), "// kept\n").unwrap();
        std::fs::write(root.join(".env"), "secret=1\n").unwrap();
        std::fs::write(root.join(".gitignore"), "target/\n").unwrap();

        let visited = walk_skipping_hidden(root);
        assert!(was_visited(&visited, "keep.rs"));
        assert!(!was_visited(&visited, ".env"));
        assert!(!was_visited(&visited, ".gitignore"));
    }

    #[test]
    fn is_hidden_prunes_hidden_directories_recursively() {
        let dir = make_visible_tempdir();
        let root = dir.path();
        let hidden_dir = root.join(".hidden");
        std::fs::create_dir(&hidden_dir).unwrap();
        std::fs::write(hidden_dir.join("inside.rs"), "// inside hidden\n").unwrap();
        std::fs::write(root.join("visible.rs"), "// visible\n").unwrap();

        let visited = walk_skipping_hidden(root);
        assert!(was_visited(&visited, "visible.rs"));
        // The hidden directory is pruned, so nothing below it is visited either.
        assert!(!was_visited(&visited, ".hidden"));
        assert!(!was_visited(&visited, "inside.rs"));
    }
}