use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;

use crossbeam::channel::{unbounded, Receiver, Sender};
use globset::GlobSet;
use walkdir::{DirEntry, WalkDir};

type ProcFilesFunction<Config> = dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync;

type ProcDirPathsFunction<Config> =
    dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;

type ProcPathFunction<Config> = dyn Fn(&Path, &Config) + Send + Sync;

fn null_proc_dir_paths<Config>(_: &mut HashMap<String, Vec<PathBuf>>, _: &Path, _: &Config) {}

fn null_proc_path<Config>(_: &Path, _: &Config) {}

struct JobItem<Config> {
    path: PathBuf,
    cfg: Arc<Config>,
}

type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
type JobSender<Config> = Sender<Option<JobItem<Config>>>;

fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
where
    ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
{
    // A `None` job is the sentinel telling this consumer to stop.
    while let Ok(Some(job)) = receiver.recv() {
        let path = job.path.clone();

        if let Err(err) = func(job.path, &job.cfg) {
            eprintln!("{:?} for file {:?}", err, path);
        }
    }
}

fn send_file<T>(
    path: PathBuf,
    cfg: &Arc<T>,
    sender: &JobSender<T>,
) -> Result<(), ConcurrentErrors> {
    sender
        .send(Some(JobItem {
            path,
            cfg: Arc::clone(cfg),
        }))
        .map_err(|e| ConcurrentErrors::Sender(e.to_string()))
}

/// Returns `true` when a directory entry's file name starts with a dot.
fn is_hidden(entry: &DirEntry) -> bool {
    entry
        .file_name()
        .to_str()
        .map(|s| s.starts_with('.'))
        .unwrap_or(false)
}

/// Walks the given paths, applying the include/exclude glob sets, and sends
/// every matching file to the consumer threads through `sender`.
///
/// Returns the map populated by the `proc_dir_paths` callback.
fn explore<Config, ProcDirPaths, ProcPath>(
    files_data: FilesData,
    cfg: &Arc<Config>,
    proc_dir_paths: ProcDirPaths,
    proc_path: ProcPath,
    sender: &JobSender<Config>,
) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
where
    ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
    ProcPath: Fn(&Path, &Config) + Send + Sync,
{
    let FilesData {
        mut paths,
        ref include,
        ref exclude,
    } = files_data;

    let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();

    for path in paths.drain(..) {
        if !path.exists() {
            eprintln!("Warning: File doesn't exist: {:?}", path);
            continue;
        }
        if path.is_dir() {
            // Walk the directory recursively, skipping hidden entries.
            for entry in WalkDir::new(path)
                .into_iter()
                .filter_entry(|e| !is_hidden(e))
            {
                let entry = match entry {
                    Ok(entry) => entry,
                    Err(e) => return Err(ConcurrentErrors::Sender(e.to_string())),
                };
                let path = entry.path().to_path_buf();
                if (include.is_empty() || include.is_match(&path))
                    && (exclude.is_empty() || !exclude.is_match(&path))
                    && path.is_file()
                {
                    proc_dir_paths(&mut all_files, &path, cfg);
                    send_file(path, cfg, sender)?;
                }
            }
        } else if (include.is_empty() || include.is_match(&path))
            && (exclude.is_empty() || !exclude.is_match(&path))
            && path.is_file()
        {
            proc_path(&path, cfg);
            send_file(path, cfg, sender)?;
        }
    }

    Ok(all_files)
}

#[derive(Debug)]
/// The errors that can be returned by a concurrent run.
pub enum ConcurrentErrors {
    /// Something went wrong in the producer thread.
    Producer(String),
    /// Something went wrong while sending data to the consumer threads.
    Sender(String),
    /// Something went wrong in a consumer thread.
    Receiver(String),
    /// Something went wrong while spawning a thread.
    Thread(String),
}

/// The data describing which files have to be processed.
pub struct FilesData {
    /// Glob set of the paths to include; when empty, every path is accepted.
    pub include: GlobSet,
    /// Glob set of the paths to exclude; when empty, no path is excluded.
    pub exclude: GlobSet,
    /// The files and directories to process.
    pub paths: Vec<PathBuf>,
}

/// Runs a function concurrently on a set of files.
pub struct ConcurrentRunner<Config> {
    proc_files: Box<ProcFilesFunction<Config>>,
    proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
    proc_path: Box<ProcPathFunction<Config>>,
    num_jobs: usize,
}

impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
    /// Creates a new `ConcurrentRunner` that processes files with `proc_files`
    /// using `num_jobs` threads.
    pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
    where
        ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
    {
        // One thread is kept for the producer, so at least one consumer remains.
        let num_jobs = std::cmp::max(2, num_jobs) - 1;
        Self {
            proc_files: Box::new(proc_files),
            proc_dir_paths: Box::new(null_proc_dir_paths),
            proc_path: Box::new(null_proc_path),
            num_jobs,
        }
    }

    /// Sets the function called on every file found while walking a directory;
    /// it can record paths in the map returned by `run`.
    pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
    where
        ProcDirPaths:
            'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
    {
        self.proc_dir_paths = Box::new(proc_dir_paths);
        self
    }

    /// Sets the function called on every file passed directly as a path,
    /// rather than found inside a directory.
    pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
    where
        ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
    {
        self.proc_path = Box::new(proc_path);
        self
    }

    /// Runs the producer and consumer threads on the files described by
    /// `files_data`, sharing `config` among them.
    ///
    /// Returns the map built by the `proc_dir_paths` callback, or an error if
    /// any thread fails.
    pub fn run(
        self,
        config: Config,
        files_data: FilesData,
    ) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
        let cfg = Arc::new(config);

        let (sender, receiver) = unbounded();

        // The producer walks the paths and sends each matching file to the
        // consumer threads.
        let producer = {
            let sender = sender.clone();

            match thread::Builder::new()
                .name(String::from("Producer"))
                .spawn(move || {
                    explore(
                        files_data,
                        &cfg,
                        self.proc_dir_paths,
                        self.proc_path,
                        &sender,
                    )
                }) {
                Ok(producer) => producer,
                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
            }
        };

        // Spawn the consumer threads; each one processes jobs until it
        // receives a `None` sentinel.
        let mut consumers = Vec::with_capacity(self.num_jobs);
        let proc_files = Arc::new(self.proc_files);
        for i in 0..self.num_jobs {
            let receiver = receiver.clone();
            let proc_files = proc_files.clone();

            let handle = match thread::Builder::new()
                .name(format!("Consumer {}", i))
                .spawn(move || {
                    consumer(receiver, proc_files);
                }) {
                Ok(handle) => handle,
                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
            };

            consumers.push(handle);
        }

        let all_files = match producer.join() {
            Ok(res) => res,
            Err(_) => {
                return Err(ConcurrentErrors::Producer(
                    "Child thread panicked".to_owned(),
                ))
            }
        };

        // Send one `None` sentinel per consumer so that every thread stops.
        for _ in 0..self.num_jobs {
            if let Err(e) = sender.send(None) {
                return Err(ConcurrentErrors::Sender(e.to_string()));
            }
        }

        for handle in consumers {
            if handle.join().is_err() {
                return Err(ConcurrentErrors::Receiver(
                    "A thread used to process a file panicked".to_owned(),
                ));
            }
        }

        all_files
    }
}
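
// ---------------------------------------------------------------------------
// A minimal usage sketch, not part of the original API surface: the `Cfg`
// type, the `do_nothing` callback, and the test below are illustrative only.
// They show how a caller is expected to build a `FilesData` and drive
// `ConcurrentRunner::run`.
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;

    // A hypothetical, empty configuration type.
    struct Cfg;

    // A no-op file processor matching the `ProcFilesFunction` signature.
    fn do_nothing(_path: PathBuf, _cfg: &Cfg) -> std::io::Result<()> {
        Ok(())
    }

    #[test]
    fn run_with_no_paths_returns_empty_map() {
        let runner = ConcurrentRunner::new(4, do_nothing);
        let files_data = FilesData {
            include: GlobSet::empty(),
            exclude: GlobSet::empty(),
            paths: Vec::new(),
        };
        // With no input paths the producer sends nothing, the consumers shut
        // down on their `None` sentinels, and an empty map is returned.
        let all_files = runner.run(Cfg, files_data).unwrap();
        assert!(all_files.is_empty());
    }
}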