rust_code_analysis/
concurrent_files.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::thread;
5
6use crossbeam::channel::{unbounded, Receiver, Sender};
7use globset::GlobSet;
8use walkdir::{DirEntry, WalkDir};
9
/// Signature of the callback invoked on every file handed to a consumer.
///
/// Receives ownership of the file path plus a shared reference to the
/// configuration and reports I/O failures through its return value.
type ProcFilesFunction<C> = dyn Fn(PathBuf, &C) -> std::io::Result<()> + Send + Sync;

/// Signature of the callback invoked on every accepted file found while
/// walking a directory.
///
/// Receives the map that is eventually returned to the caller, the path
/// of the file and a shared reference to the configuration.
type ProcDirPathsFunction<C> = dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &C) + Send + Sync;

/// Signature of the callback invoked on every accepted path that was passed
/// explicitly as a single file rather than discovered in a directory.
type ProcPathFunction<C> = dyn Fn(&Path, &C) + Send + Sync;
16
/// No-op directory-path callback.
///
/// Ignores all of its arguments and leaves the map untouched.
fn null_proc_dir_paths<Config>(
    _all_files: &mut HashMap<String, Vec<PathBuf>>,
    _path: &Path,
    _cfg: &Config,
) {
}

/// No-op single-path callback.
///
/// Ignores all of its arguments.
fn null_proc_path<Config>(_path: &Path, _cfg: &Config) {}
20
/// A unit of work travelling from the producer to a consumer.
struct JobItem<Config> {
    /// Path of the file to be processed.
    path: PathBuf,
    /// Shared handle to the configuration passed along with the file.
    cfg: Arc<Config>,
}
25
26type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
27type JobSender<Config> = Sender<Option<JobItem<Config>>>;
28
29fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
30where
31    ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
32{
33    while let Ok(job) = receiver.recv() {
34        if job.is_none() {
35            break;
36        }
37        // Cannot panic because of the check immediately above.
38        let job = job.unwrap();
39        let path = job.path.clone();
40
41        if let Err(err) = func(job.path, &job.cfg) {
42            eprintln!("{:?} for file {:?}", err, path);
43        }
44    }
45}
46
47fn send_file<T>(
48    path: PathBuf,
49    cfg: &Arc<T>,
50    sender: &JobSender<T>,
51) -> Result<(), ConcurrentErrors> {
52    sender
53        .send(Some(JobItem {
54            path,
55            cfg: Arc::clone(cfg),
56        }))
57        .map_err(|e| ConcurrentErrors::Sender(e.to_string()))
58}
59
60fn is_hidden(entry: &DirEntry) -> bool {
61    entry
62        .file_name()
63        .to_str()
64        .map(|s| s.starts_with('.'))
65        .unwrap_or(false)
66}
67
68fn explore<Config, ProcDirPaths, ProcPath>(
69    files_data: FilesData,
70    cfg: &Arc<Config>,
71    proc_dir_paths: ProcDirPaths,
72    proc_path: ProcPath,
73    sender: &JobSender<Config>,
74) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
75where
76    ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
77    ProcPath: Fn(&Path, &Config) + Send + Sync,
78{
79    let FilesData {
80        mut paths,
81        ref include,
82        ref exclude,
83    } = files_data;
84
85    let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
86
87    for path in paths.drain(..) {
88        if !path.exists() {
89            eprintln!("Warning: File doesn't exist: {:?}", path);
90            continue;
91        }
92        if path.is_dir() {
93            for entry in WalkDir::new(path)
94                .into_iter()
95                .filter_entry(|e| !is_hidden(e))
96            {
97                let entry = match entry {
98                    Ok(entry) => entry,
99                    Err(e) => return Err(ConcurrentErrors::Sender(e.to_string())),
100                };
101                let path = entry.path().to_path_buf();
102                if (include.is_empty() || include.is_match(&path))
103                    && (exclude.is_empty() || !exclude.is_match(&path))
104                    && path.is_file()
105                {
106                    proc_dir_paths(&mut all_files, &path, cfg);
107                    send_file(path, cfg, sender)?;
108                }
109            }
110        } else if (include.is_empty() || include.is_match(&path))
111            && (exclude.is_empty() || !exclude.is_match(&path))
112            && path.is_file()
113        {
114            proc_path(&path, cfg);
115            send_file(path, cfg, sender)?;
116        }
117    }
118
119    Ok(all_files)
120}
121
/// Series of errors that might happen when processing files concurrently.
///
/// Every variant carries a human-readable description of the failure.
/// The type implements [`std::fmt::Display`] and [`std::error::Error`], so
/// it can be boxed as `Box<dyn std::error::Error>` or printed with `{}`.
#[derive(Debug)]
pub enum ConcurrentErrors {
    /// Producer side error.
    ///
    /// An error occurred inside the producer thread.
    Producer(String),
    /// Sender side error.
    ///
    /// An error occurred when sending an item.
    Sender(String),
    /// Receiver side error.
    ///
    /// An error occurred inside one of the receiver threads.
    Receiver(String),
    /// Thread side error.
    ///
    /// A general error occurred when a thread is being spawned or run.
    Thread(String),
}

impl std::fmt::Display for ConcurrentErrors {
    /// Formats the error as `<side> error: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (side, message) = match self {
            Self::Producer(message) => ("producer", message),
            Self::Sender(message) => ("sender", message),
            Self::Receiver(message) => ("receiver", message),
            Self::Thread(message) => ("thread", message),
        };
        write!(f, "{} error: {}", side, message)
    }
}

impl std::error::Error for ConcurrentErrors {}
142
143/// Data related to files.
144pub struct FilesData {
145    /// Kind of files included in a search.
146    pub include: GlobSet,
147    /// Kind of files excluded from a search.
148    pub exclude: GlobSet,
149    /// List of file paths.
150    pub paths: Vec<PathBuf>,
151}
152
153/// A runner to process files concurrently.
154pub struct ConcurrentRunner<Config> {
155    proc_files: Box<ProcFilesFunction<Config>>,
156    proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
157    proc_path: Box<ProcPathFunction<Config>>,
158    num_jobs: usize,
159}
160
161impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
162    /// Creates a new `ConcurrentRunner`.
163    ///
164    /// * `num_jobs` - Number of jobs utilized to process files concurrently.
165    /// * `proc_files` - Function that processes each file found during
166    ///    the search.
167    pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
168    where
169        ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
170    {
171        let num_jobs = std::cmp::max(2, num_jobs) - 1;
172        Self {
173            proc_files: Box::new(proc_files),
174            proc_dir_paths: Box::new(null_proc_dir_paths),
175            proc_path: Box::new(null_proc_path),
176            num_jobs,
177        }
178    }
179
180    /// Sets the function to process the paths and subpaths contained in a
181    /// directory.
182    pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
183    where
184        ProcDirPaths:
185            'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
186    {
187        self.proc_dir_paths = Box::new(proc_dir_paths);
188        self
189    }
190
191    /// Sets the function to process a single path.
192    pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
193    where
194        ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
195    {
196        self.proc_path = Box::new(proc_path);
197        self
198    }
199
200    /// Runs the producer-consumer approach to process the files
201    /// contained in a directory and in its own subdirectories.
202    ///
203    /// * `config` - Information used to process a file.
204    /// * `files_data` - Information about the files to be included or excluded
205    ///    from a search more the number of paths considered in the search.
206    pub fn run(
207        self,
208        config: Config,
209        files_data: FilesData,
210    ) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
211        let cfg = Arc::new(config);
212
213        let (sender, receiver) = unbounded();
214
215        let producer = {
216            let sender = sender.clone();
217
218            match thread::Builder::new()
219                .name(String::from("Producer"))
220                .spawn(move || {
221                    explore(
222                        files_data,
223                        &cfg,
224                        self.proc_dir_paths,
225                        self.proc_path,
226                        &sender,
227                    )
228                }) {
229                Ok(producer) => producer,
230                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
231            }
232        };
233
234        let mut receivers = Vec::with_capacity(self.num_jobs);
235        let proc_files = Arc::new(self.proc_files);
236        for i in 0..self.num_jobs {
237            let receiver = receiver.clone();
238            let proc_files = proc_files.clone();
239
240            let t = match thread::Builder::new()
241                .name(format!("Consumer {}", i))
242                .spawn(move || {
243                    consumer(receiver, proc_files);
244                }) {
245                Ok(receiver) => receiver,
246                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
247            };
248
249            receivers.push(t);
250        }
251
252        let all_files = match producer.join() {
253            Ok(res) => res,
254            Err(_) => {
255                return Err(ConcurrentErrors::Producer(
256                    "Child thread panicked".to_owned(),
257                ))
258            }
259        };
260
261        // Poison the receiver, now that the producer is finished.
262        for _ in 0..self.num_jobs {
263            if let Err(e) = sender.send(None) {
264                return Err(ConcurrentErrors::Sender(e.to_string()));
265            }
266        }
267
268        for receiver in receivers {
269            if receiver.join().is_err() {
270                return Err(ConcurrentErrors::Receiver(
271                    "A thread used to process a file panicked".to_owned(),
272                ));
273            }
274        }
275
276        all_files
277    }
278}