// rust_code_analysis_code_split/concurrent_files.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::thread;
5
6use crossbeam::channel::{Receiver, Sender, unbounded};
7use globset::GlobSet;
8use walkdir::{DirEntry, WalkDir};
9
/// Callback invoked by a consumer thread for every file handed to it.
///
/// Receives the owned path of the file and a shared reference to the
/// configuration; an `Err` is reported by the consumer and does not stop
/// the processing of the remaining files.
type ProcFilesFunction<Config> =
    dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync;

/// Callback invoked by the producer for every file discovered while walking
/// a directory.
///
/// It may record the path in the map that is eventually returned to the
/// caller of the runner.
type ProcDirPathsFunction<Config> =
    dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;

/// Callback invoked by the producer for every input path that is a plain
/// file rather than a directory.
type ProcPathFunction<Config> = dyn Fn(&Path, &Config) + Send + Sync;
16
/// No-op default for the directory-paths callback.
///
/// The body is empty, so the compiler is expected to optimize calls to it
/// away entirely.
fn null_proc_dir_paths<Config>(
    _files: &mut HashMap<String, Vec<PathBuf>>,
    _path: &Path,
    _cfg: &Config,
) {
}
/// No-op default for the single-path callback; it ignores both arguments.
fn null_proc_path<Config>(_path: &Path, _cfg: &Config) {}
20
/// A unit of work travelling from the producer to a consumer thread.
#[derive(Debug)]
struct JobItem<Config> {
    /// File the consumer has to process.
    path: PathBuf,
    /// Configuration shared among all the jobs.
    cfg: Arc<Config>,
}
26
27type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
28type JobSender<Config> = Sender<Option<JobItem<Config>>>;
29
30fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
31where
32    ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
33{
34    while let Ok(job) = receiver.recv() {
35        if job.is_none() {
36            break;
37        }
38        // Cannot panic because of the check immediately above.
39        let job = job.unwrap();
40        let path = job.path.clone();
41
42        if let Err(err) = func(job.path, &job.cfg) {
43            eprintln!("{err:?} for file {path:?}");
44        }
45    }
46}
47
48fn send_file<T>(
49    path: PathBuf,
50    cfg: &Arc<T>,
51    sender: &JobSender<T>,
52) -> Result<(), ConcurrentErrors> {
53    sender
54        .send(Some(JobItem {
55            path,
56            cfg: Arc::clone(cfg),
57        }))
58        .map_err(|e| ConcurrentErrors::Sender(e.to_string()))
59}
60
61fn is_hidden(entry: &DirEntry) -> bool {
62    entry
63        .file_name()
64        .to_str()
65        .map(|s| s.starts_with('.'))
66        .unwrap_or(false)
67}
68
69fn explore<Config, ProcDirPaths, ProcPath>(
70    files_data: FilesData,
71    cfg: &Arc<Config>,
72    proc_dir_paths: ProcDirPaths,
73    proc_path: ProcPath,
74    sender: &JobSender<Config>,
75) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
76where
77    ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
78    ProcPath: Fn(&Path, &Config) + Send + Sync,
79{
80    let FilesData {
81        mut paths,
82        ref include,
83        ref exclude,
84    } = files_data;
85
86    let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
87
88    for path in paths.drain(..) {
89        if !path.exists() {
90            eprintln!("Warning: File doesn't exist: {path:?}");
91            continue;
92        }
93        if path.is_dir() {
94            for entry in WalkDir::new(path)
95                .into_iter()
96                .filter_entry(|e| !is_hidden(e))
97            {
98                let entry = match entry {
99                    Ok(entry) => entry,
100                    Err(e) => return Err(ConcurrentErrors::Sender(e.to_string())),
101                };
102                let path = entry.path().to_path_buf();
103                if (include.is_empty() || include.is_match(&path))
104                    && (exclude.is_empty() || !exclude.is_match(&path))
105                    && path.is_file()
106                {
107                    proc_dir_paths(&mut all_files, &path, cfg);
108                    send_file(path, cfg, sender)?;
109                }
110            }
111        } else if (include.is_empty() || include.is_match(&path))
112            && (exclude.is_empty() || !exclude.is_match(&path))
113            && path.is_file()
114        {
115            proc_path(&path, cfg);
116            send_file(path, cfg, sender)?;
117        }
118    }
119
120    Ok(all_files)
121}
122
/// Series of errors that might happen when processing files concurrently.
///
/// Each variant carries a human-readable description of the failure.
/// The type implements [`std::fmt::Display`] and [`std::error::Error`], so
/// it can be shown to users and boxed as a `dyn Error`.
#[derive(Debug)]
pub enum ConcurrentErrors {
    /// Producer side error.
    ///
    /// An error occurred inside the producer thread.
    Producer(String),
    /// Sender side error.
    ///
    /// An error occurred when sending an item.
    Sender(String),
    /// Receiver side error.
    ///
    /// An error occurred inside one of the receiver threads.
    Receiver(String),
    /// Thread side error.
    ///
    /// A general error occurred when a thread is being spawned or run.
    Thread(String),
}

impl std::fmt::Display for ConcurrentErrors {
    /// Writes the side where the error occurred followed by its description.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (side, msg) = match self {
            Self::Producer(msg) => ("producer", msg),
            Self::Sender(msg) => ("sender", msg),
            Self::Receiver(msg) => ("receiver", msg),
            Self::Thread(msg) => ("thread", msg),
        };
        write!(f, "{side} error: {msg}")
    }
}

impl std::error::Error for ConcurrentErrors {}
143
144/// Data related to files.
145#[derive(Debug)]
146pub struct FilesData {
147    /// Kind of files included in a search.
148    pub include: GlobSet,
149    /// Kind of files excluded from a search.
150    pub exclude: GlobSet,
151    /// List of file paths.
152    pub paths: Vec<PathBuf>,
153}
154
155/// A runner to process files concurrently.
156pub struct ConcurrentRunner<Config> {
157    proc_files: Box<ProcFilesFunction<Config>>,
158    proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
159    proc_path: Box<ProcPathFunction<Config>>,
160    num_jobs: usize,
161}
162
163impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
164    /// Creates a new `ConcurrentRunner`.
165    ///
166    /// * `num_jobs` - Number of jobs utilized to process files concurrently.
167    /// * `proc_files` - Function that processes each file found during
168    ///   the search.
169    pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
170    where
171        ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
172    {
173        let num_jobs = std::cmp::max(2, num_jobs) - 1;
174        Self {
175            proc_files: Box::new(proc_files),
176            proc_dir_paths: Box::new(null_proc_dir_paths),
177            proc_path: Box::new(null_proc_path),
178            num_jobs,
179        }
180    }
181
182    /// Sets the function to process the paths and subpaths contained in a
183    /// directory.
184    pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
185    where
186        ProcDirPaths:
187            'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
188    {
189        self.proc_dir_paths = Box::new(proc_dir_paths);
190        self
191    }
192
193    /// Sets the function to process a single path.
194    pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
195    where
196        ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
197    {
198        self.proc_path = Box::new(proc_path);
199        self
200    }
201
202    /// Runs the producer-consumer approach to process the files
203    /// contained in a directory and in its own subdirectories.
204    ///
205    /// * `config` - Information used to process a file.
206    /// * `files_data` - Information about the files to be included or excluded
207    ///   from a search more the number of paths considered in the search.
208    pub fn run(
209        self,
210        config: Config,
211        files_data: FilesData,
212    ) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
213        let cfg = Arc::new(config);
214
215        let (sender, receiver) = unbounded();
216
217        let producer = {
218            let sender = sender.clone();
219
220            match thread::Builder::new()
221                .name(String::from("Producer"))
222                .spawn(move || {
223                    explore(
224                        files_data,
225                        &cfg,
226                        self.proc_dir_paths,
227                        self.proc_path,
228                        &sender,
229                    )
230                }) {
231                Ok(producer) => producer,
232                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
233            }
234        };
235
236        let mut receivers = Vec::with_capacity(self.num_jobs);
237        let proc_files = Arc::new(self.proc_files);
238        for i in 0..self.num_jobs {
239            let receiver = receiver.clone();
240            let proc_files = proc_files.clone();
241
242            let t = match thread::Builder::new()
243                .name(format!("Consumer {i}"))
244                .spawn(move || {
245                    consumer(receiver, proc_files);
246                }) {
247                Ok(receiver) => receiver,
248                Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
249            };
250
251            receivers.push(t);
252        }
253
254        let all_files = match producer.join() {
255            Ok(res) => res,
256            Err(_) => {
257                return Err(ConcurrentErrors::Producer(
258                    "Child thread panicked".to_owned(),
259                ));
260            }
261        };
262
263        // Poison the receiver, now that the producer is finished.
264        for _ in 0..self.num_jobs {
265            if let Err(e) = sender.send(None) {
266                return Err(ConcurrentErrors::Sender(e.to_string()));
267            }
268        }
269
270        for receiver in receivers {
271            if receiver.join().is_err() {
272                return Err(ConcurrentErrors::Receiver(
273                    "A thread used to process a file panicked".to_owned(),
274                ));
275            }
276        }
277
278        all_files
279    }
280}