rust_code_analysis_code_split/
concurrent_files.rs1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::thread;
5
6use crossbeam::channel::{Receiver, Sender, unbounded};
7use globset::GlobSet;
8use walkdir::{DirEntry, WalkDir};
9
/// Signature of the per-file callback: it receives ownership of the file path
/// and a reference to the shared configuration, and may fail with an I/O error.
type ProcFilesFunction<Config> = dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync;

/// Signature of the callback invoked for each selected file found while
/// walking a directory; it may record data in the map passed as first argument.
type ProcDirPathsFunction<Config> =
    dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;

/// Signature of the callback invoked for each selected path that was supplied
/// directly and is not a directory.
type ProcPathFunction<Config> = dyn Fn(&Path, &Config) + Send + Sync;
16
/// No-op directory callback: ignores its arguments and leaves the map untouched.
fn null_proc_dir_paths<Config>(
    _files: &mut HashMap<String, Vec<PathBuf>>,
    _path: &Path,
    _cfg: &Config,
) {
}
/// No-op path callback: ignores both the path and the configuration.
fn null_proc_path<Config>(_path: &Path, _cfg: &Config) {}
20
/// A unit of work sent over the job channel: one file plus the shared configuration.
#[derive(Debug)]
struct JobItem<Config> {
    /// Path of the file to process.
    path: PathBuf,
    /// Handle to the configuration, shared between jobs through an `Arc`.
    cfg: Arc<Config>,
}

/// Receiving half of the job channel; a `None` item tells a consumer to stop.
type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
/// Sending half of the job channel.
type JobSender<Config> = Sender<Option<JobItem<Config>>>;
29
30fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
31where
32 ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
33{
34 while let Ok(job) = receiver.recv() {
35 if job.is_none() {
36 break;
37 }
38 let job = job.unwrap();
40 let path = job.path.clone();
41
42 if let Err(err) = func(job.path, &job.cfg) {
43 eprintln!("{err:?} for file {path:?}");
44 }
45 }
46}
47
48fn send_file<T>(
49 path: PathBuf,
50 cfg: &Arc<T>,
51 sender: &JobSender<T>,
52) -> Result<(), ConcurrentErrors> {
53 sender
54 .send(Some(JobItem {
55 path,
56 cfg: Arc::clone(cfg),
57 }))
58 .map_err(|e| ConcurrentErrors::Sender(e.to_string()))
59}
60
61fn is_hidden(entry: &DirEntry) -> bool {
62 entry
63 .file_name()
64 .to_str()
65 .map(|s| s.starts_with('.'))
66 .unwrap_or(false)
67}
68
69fn explore<Config, ProcDirPaths, ProcPath>(
70 files_data: FilesData,
71 cfg: &Arc<Config>,
72 proc_dir_paths: ProcDirPaths,
73 proc_path: ProcPath,
74 sender: &JobSender<Config>,
75) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
76where
77 ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
78 ProcPath: Fn(&Path, &Config) + Send + Sync,
79{
80 let FilesData {
81 mut paths,
82 ref include,
83 ref exclude,
84 } = files_data;
85
86 let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
87
88 for path in paths.drain(..) {
89 if !path.exists() {
90 eprintln!("Warning: File doesn't exist: {path:?}");
91 continue;
92 }
93 if path.is_dir() {
94 for entry in WalkDir::new(path)
95 .into_iter()
96 .filter_entry(|e| !is_hidden(e))
97 {
98 let entry = match entry {
99 Ok(entry) => entry,
100 Err(e) => return Err(ConcurrentErrors::Sender(e.to_string())),
101 };
102 let path = entry.path().to_path_buf();
103 if (include.is_empty() || include.is_match(&path))
104 && (exclude.is_empty() || !exclude.is_match(&path))
105 && path.is_file()
106 {
107 proc_dir_paths(&mut all_files, &path, cfg);
108 send_file(path, cfg, sender)?;
109 }
110 }
111 } else if (include.is_empty() || include.is_match(&path))
112 && (exclude.is_empty() || !exclude.is_match(&path))
113 && path.is_file()
114 {
115 proc_path(&path, cfg);
116 send_file(path, cfg, sender)?;
117 }
118 }
119
120 Ok(all_files)
121}
122
/// Errors reported while running files through the producer/consumer pipeline.
///
/// Every variant carries a human-readable description of the failure.
#[derive(Debug)]
pub enum ConcurrentErrors {
    /// The producer thread failed (used when that thread panicked).
    Producer(String),
    /// A job could not be queued on the channel; also used when a directory
    /// entry cannot be read during the walk.
    Sender(String),
    /// A consumer thread panicked while processing a file.
    Receiver(String),
    /// A thread could not be spawned.
    Thread(String),
}

impl std::fmt::Display for ConcurrentErrors {
    /// Formats the error as `<side> error: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (side, msg) = match self {
            Self::Producer(msg) => ("producer", msg),
            Self::Sender(msg) => ("sender", msg),
            Self::Receiver(msg) => ("receiver", msg),
            Self::Thread(msg) => ("thread", msg),
        };
        write!(f, "{side} error: {msg}")
    }
}

// Lets callers propagate the error with `?` into `Box<dyn std::error::Error>`.
impl std::error::Error for ConcurrentErrors {}
143
/// Describes which files should be processed.
#[derive(Debug)]
pub struct FilesData {
    /// Globs a file must match to be processed; an empty set accepts every file.
    pub include: GlobSet,
    /// Globs that rule a file out; an empty set rejects nothing.
    pub exclude: GlobSet,
    /// Files and directories to examine; directories are traversed with `WalkDir`.
    pub paths: Vec<PathBuf>,
}
154
155pub struct ConcurrentRunner<Config> {
157 proc_files: Box<ProcFilesFunction<Config>>,
158 proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
159 proc_path: Box<ProcPathFunction<Config>>,
160 num_jobs: usize,
161}
162
163impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
164 pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
170 where
171 ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
172 {
173 let num_jobs = std::cmp::max(2, num_jobs) - 1;
174 Self {
175 proc_files: Box::new(proc_files),
176 proc_dir_paths: Box::new(null_proc_dir_paths),
177 proc_path: Box::new(null_proc_path),
178 num_jobs,
179 }
180 }
181
182 pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
185 where
186 ProcDirPaths:
187 'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
188 {
189 self.proc_dir_paths = Box::new(proc_dir_paths);
190 self
191 }
192
193 pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
195 where
196 ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
197 {
198 self.proc_path = Box::new(proc_path);
199 self
200 }
201
202 pub fn run(
209 self,
210 config: Config,
211 files_data: FilesData,
212 ) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
213 let cfg = Arc::new(config);
214
215 let (sender, receiver) = unbounded();
216
217 let producer = {
218 let sender = sender.clone();
219
220 match thread::Builder::new()
221 .name(String::from("Producer"))
222 .spawn(move || {
223 explore(
224 files_data,
225 &cfg,
226 self.proc_dir_paths,
227 self.proc_path,
228 &sender,
229 )
230 }) {
231 Ok(producer) => producer,
232 Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
233 }
234 };
235
236 let mut receivers = Vec::with_capacity(self.num_jobs);
237 let proc_files = Arc::new(self.proc_files);
238 for i in 0..self.num_jobs {
239 let receiver = receiver.clone();
240 let proc_files = proc_files.clone();
241
242 let t = match thread::Builder::new()
243 .name(format!("Consumer {i}"))
244 .spawn(move || {
245 consumer(receiver, proc_files);
246 }) {
247 Ok(receiver) => receiver,
248 Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
249 };
250
251 receivers.push(t);
252 }
253
254 let all_files = match producer.join() {
255 Ok(res) => res,
256 Err(_) => {
257 return Err(ConcurrentErrors::Producer(
258 "Child thread panicked".to_owned(),
259 ));
260 }
261 };
262
263 for _ in 0..self.num_jobs {
265 if let Err(e) = sender.send(None) {
266 return Err(ConcurrentErrors::Sender(e.to_string()));
267 }
268 }
269
270 for receiver in receivers {
271 if receiver.join().is_err() {
272 return Err(ConcurrentErrors::Receiver(
273 "A thread used to process a file panicked".to_owned(),
274 ));
275 }
276 }
277
278 all_files
279 }
280}