// liboskar/walk_parallel/mod.rs

1extern crate crossbeam;
2extern crate walkdir;
3
4pub mod single_threaded;
5
6use self::crossbeam::deque::fifo;
7use self::crossbeam::deque::Pop;
8use self::crossbeam::deque::Steal;
9use self::crossbeam::deque::Worker;
10use self::walkdir::WalkDir;
11use colored::*;
12use error::*;
13use regex::{Regex, RegexSet};
14use std::ffi::OsStr;
15use std::fs;
16use std::path::Path;
17use std::path::PathBuf;
18use std::process::exit;
19use std::sync::atomic::AtomicUsize;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::thread;
23use types::FileSize;
24use utils::size;
25
26pub use walk_parallel::single_threaded::*;
27
/// Message passed between the producer and the work-stealing consumers.
///
/// `Data` carries a unit of work (a `Walk` in practice); `Done` tells a
/// consumer to stop looping. One `Done` is pushed per worker thread.
#[derive(Debug)]
pub enum Status<T> {
    /// Sentinel: no more work will arrive; the receiving thread should exit.
    Done,
    /// A unit of work to be processed.
    Data(T),
}
33
34/// The 'Walk' struct contains all the information we need to traverse a directory.
35#[derive(Debug)]
36pub struct Walk {
37    pub path: PathBuf,
38    gitignore: Option<RegexSet>,
39    hgignore: Option<RegexSet>,
40    darcs_boring: Option<RegexSet>,
41    excludes: Option<Regex>,
42    max_depth: Option<u8>,
43    threshold: Option<u64>,
44    start_depth: usize,
45    nproc: usize,
46    show_files: bool,
47    get_blocks: bool,
48    follow_symlinks: bool,
49    artifacts_only: bool,
50}
51
52impl Walk {
53    /// function to make output from a 'Walk', using one thread. It also takes an 'Arc<AtomicU64>'
54    /// and will add the relevant directory sizes to it.
55    pub fn print_dir(w: &Walk, total: &Arc<AtomicUsize>) {
56        let excludes = match w.excludes {
57            Some(ref x) => Some(x),
58            _ => None,
59        };
60
61        let v = if excludes.is_some() || w.artifacts_only {
62            read_all(
63                &w.path,
64                w.start_depth as u8,
65                w.max_depth,
66                excludes,
67                &w.gitignore,
68                false,
69                w.artifacts_only,
70            )
71        } else {
72            read_all_fast(&w.path, w.start_depth as u8, w.max_depth)
73        };
74
75        let subdir_size = v.file_size.get();
76
77        total.fetch_add(subdir_size as usize, Ordering::Relaxed);
78
79        let mut to_print = if let Some(m) = w.threshold {
80            subdir_size > m
81        } else {
82            true
83        };
84
85        if let Some(0) = w.max_depth {
86            to_print = false;
87        }
88
89        if to_print {
90            // filter by depth
91            let mut v_filtered = v.filtered(w.threshold, !w.show_files, w.max_depth);
92
93            v_filtered.display_tree(&w.path);
94        }
95    }
96
97    /// set the maximum depth to display
98    pub fn set_depth(&mut self, d: u8) {
99        self.max_depth = Some(d);
100    }
101
102    /// set the regex for excludes
103    pub fn set_regex(&mut self, r: Regex) {
104        self.excludes = Some(r);
105    }
106
107    /// set the minumum file size
108    pub fn set_threshold(&mut self, n: u64) {
109        self.threshold = Some(n);
110    }
111
112    /// include files when printing
113    pub fn with_files(&mut self) {
114        self.show_files = true;
115    }
116
117    /// include files when printing
118    pub fn blocks(&mut self) {
119        self.get_blocks = true;
120    }
121
122    /// include files when printing
123    pub fn artifacts_only(&mut self) {
124        self.artifacts_only = true;
125    }
126
127    fn get_proc(&self) -> usize {
128        self.nproc
129    }
130
131    /// Create a new 'Walk' from a 'PathBuf' and the number
132    /// of processor cores to be used.
133    pub fn new(p: PathBuf, n: usize) -> Walk {
134        Walk {
135            path: p,
136            gitignore: None,
137            hgignore: None,
138            darcs_boring: None,
139            excludes: None,
140            max_depth: None,
141            threshold: None,
142            start_depth: 0,
143            nproc: n,
144            show_files: false,
145            get_blocks: false,
146            follow_symlinks: false,
147            artifacts_only: false,
148        }
149    }
150
151    fn bump_depth(&mut self) {
152        self.start_depth += 1;
153    }
154
155    /// This takes a 'Walk' and a 'Worker<Status<Walk>>' and executes the walk *in parallel*,
156    /// creating new work for each subdirectory. It's not the most efficient concurrency
157    /// imaginable, but it's fast and easy-ish to use. It *also* takes in an 'Arc<AtomicU64>',
158    /// which it updates with any file sizes in the directory.
159    pub fn push_subdir(w: &Walk, worker: &mut Worker<Status<Walk>>, total: &Arc<AtomicUsize>) {
160        let in_paths = &w.path;
161
162        // fill up queue + print out files
163        if let Ok(paths) = fs::read_dir(in_paths) {
164            // iterate over all the entries in the directory
165            for p in paths {
166                let val = match p {
167                    Ok(x) => x,
168                    _ => {
169                        eprintln!("{}: path error at {:?}.", "Error".red(), p);
170                        exit(0x0001)
171                    }
172                };
173
174                let exclude_check = if let Some(ref x) = w.excludes {
175                    if let Some(r) = val.path().into_os_string().to_str() {
176                        !x.is_match(r)
177                    } else {
178                        eprintln!(
179                            "{}: ignoring invalid unicode at: {:?}",
180                            "Warning".yellow(),
181                            val.path().display()
182                        );
183                        true
184                    }
185                } else {
186                    true
187                };
188
189                if exclude_check {
190                    match val.file_type() {
191                        Ok(t) => {
192                            if t.is_dir() {
193                                let mut new_path = w.path.to_owned();
194                                new_path.push(val.file_name());
195                                let mut new_walk = Walk::new(new_path, w.get_proc());
196                                if w.excludes.is_some() {
197                                    new_walk.set_regex(w.excludes.clone().unwrap());
198                                }
199                                if w.show_files {
200                                    new_walk.with_files();
201                                }
202                                new_walk.bump_depth();
203                                if let Some(d) = w.max_depth {
204                                    new_walk.set_depth(d);
205                                }
206                                if let Some(b) = w.threshold {
207                                    new_walk.set_threshold(b);
208                                }
209                                worker.push(Status::Data(new_walk)); // pass a vector of Arc's to do 2-level traversals?
210                            } else if t.is_file() {
211                                if let Ok(l) = val.metadata() {
212                                    let size = size(&l, w.get_blocks);
213                                    total.fetch_add(size as usize, Ordering::Relaxed);
214                                    if w.show_files && size != 0 {
215                                        let to_formatted = format!("{}", FileSize::new(size));
216                                        println!(
217                                            "{}\t {}",
218                                            &to_formatted.green(),
219                                            val.path().display()
220                                        );
221                                    }
222                                } else {
223                                    eprintln!(
224                                        "{}: could not find filesize for file at {}.",
225                                        "Warning".yellow(),
226                                        val.path().display()
227                                    );
228                                }
229                            }
230                        }
231                        _ => eprintln!(
232                            "{}: could not determine file type for: {}",
233                            "Warning".yellow(),
234                            val.path().display()
235                        ),
236                    }
237                }
238            }
239
240            // send "done" messages to all the workers
241            let iter = 0..(w.get_proc());
242            iter.map(|_| worker.push(Status::Done)).count();
243        }
244        // if we can't read the directory contents, figure out why
245        // 1: check the path exists
246        else if !in_paths.exists() {
247            eprintln!(
248                "{}: path '{}' does not exist, or you do not have permission to enter.",
249                "Error".red(),
250                &in_paths.display()
251            );
252        }
253        // 2: check the path is actually a directory
254        else if !in_paths.is_dir() {
255            if w.artifacts_only {
256                eprintln!(
257                    "{}: {} is not a directory; not searching for artifacts",
258                    "Warning".yellow(),
259                    &in_paths.display()
260                );
261            }
262
263            if let Ok(l) = in_paths.metadata() {
264                let size = size(&l, w.get_blocks); // l.len();
265                let to_formatted = format!("{}", FileSize::new(size));
266                println!("{}\t {}", &to_formatted.green(), in_paths.display());
267            } else {
268                panic!("{}", Internal::IoError);
269            }
270        }
271        // 3: otherwise, give a warning about permissions
272        else {
273            eprintln!(
274                "{}: permission denied for directory: {}",
275                "Warning".yellow(),
276                &in_paths.display()
277            );
278        }
279    }
280}
281
282fn ats_cgen(p: Option<&OsStr>) -> bool {
283    lazy_static! {
284        static ref DATS_C: Regex =
285            Regex::new(r"(_(d|h|s)ats\.c|_lats\.dats|_sats\.c|_stub\.h)$").unwrap();
286    }
287    match p {
288        Some(p) => DATS_C.is_match(&p.to_string_lossy().to_string()),
289        None => false,
290    }
291}
292
293fn latex_log<P: AsRef<Path>>(p: P) -> bool {
294    lazy_static! {
295        static ref LOG: Regex = Regex::new(r"\.log$").unwrap();
296    }
297
298    if LOG.is_match(&p.as_ref().to_string_lossy().to_string()) {
299        let mut parent = (&p.as_ref())
300            .parent()
301            .unwrap()
302            .to_string_lossy()
303            .to_string();
304        parent.push_str("/*.tex");
305        glob_exists(&parent)
306    } else {
307        false
308    }
309}
310
311// TODO figure out why the unwrap_or is failing?
312// FIXME take optional reference to a regex
313pub fn clean_project_dirs<P: AsRef<Path>>(p: P, exclude: &Option<Regex>, _: bool) {
314    lazy_static! {
315        static ref REGEX: Regex =
316            Regex::new(r"\.(a|i|ii|la|lo|o|keter|bc|dyn_o|d|rlib|crate|hi|hc|chi|dyn_hi|jsexe|webapp|js\.externs|ibc|toc|aux|fdb_latexmk|spl|bbl|blg|fls|egg-info|whl|js_a|js_hi|jld|ji|js_o|so.*|dump-.*|vmb|crx|orig|elmo|elmi|hspec-failures|pyc|mod|vo|beam|agdai|go\.(v|teak|xmldef|rewrittenast|rewrittengo|simplego|tree-(bind|eval|finish|parse))|p_hi|p_o|prof|hide-cache|ghc\.environment\..*\d.\d.\d|(t|p|m)ix|synctex\.gz|hl|sandbox\.config|hp|eventlog|ipa|ttc|chs\.h|chi|\d+\.actual|\d+\.expected)$")
317            .unwrap();
318    }
319    lazy_static! {
320        static ref SRC_CONTROL: Regex = Regex::new(r"(_darcs|\.(git|hg|pijul|gnupg))").unwrap();
321    }
322
323    for dir in WalkDir::new(p)
324        .into_iter()
325        .filter_map(|e| e.ok())
326        .filter(|p| {
327            exclude
328                .clone()
329                .map(|e| e.is_match(&p.path().to_string_lossy().to_string()))
330                != Some(false)
331        })
332        .filter(|p| !SRC_CONTROL.is_match(&p.path().to_string_lossy().to_string()))
333        .filter(|p| {
334            REGEX.is_match(&p.path().to_string_lossy().to_string())
335                || is_project_dir(
336                    &p.path().to_string_lossy().to_string(),
337                    &p.path()
338                        .file_name()
339                        .map(|x| x.to_string_lossy().to_string())
340                        .unwrap_or_else(|| "".to_string()),
341                )
342                || latex_log(&p.path())
343                || ats_cgen(p.path().file_name())
344                || ({
345                    let x = &p.path().to_string_lossy().to_string();
346                    x.ends_with("/flxg_stats.txt")
347                })
348        })
349    {
350        if dir.file_type().is_file() {
351            fs::remove_file(dir.path()).unwrap_or(());
352        } else if dir.file_type().is_dir() {
353            fs::remove_dir_all(dir.path()).unwrap_or(());
354        }
355    }
356}
357
358/// Given a 'Walk' struct, traverse it concurrently and print out any relevant outputs.
359/// Currently, this only works for a depth of two, which is probably bad.
360pub fn print_parallel(w: Walk) {
361    // initialize the total at 0 and create a reference to it
362    let val = AtomicUsize::new(0);
363    let arc = Arc::new(val);
364    let arc_producer = arc.clone();
365    let arc_child = arc.clone();
366    let path_display = w.path.clone();
367
368    // set up worker & stealer
369    let (mut worker, stealer) = fifo();
370
371    // set up our iterator for the workers
372    let iter = 0..(&w.get_proc() - 1);
373
374    let _ = (&w.path).is_file();
375
376    // create the producer in another thread
377    let child_producer = thread::spawn(move || {
378        let arc_local = arc_producer.clone();
379
380        // assign work to everyone
381        Walk::push_subdir(&w, &mut worker, &arc_local);
382
383        // start popping off values in the worker's thread
384        loop {
385            if let Pop::Data(p) = worker.pop() {
386                match p {
387                    Status::Data(d) => Walk::print_dir(&d, &arc_local),
388                    _ => break,
389                }
390            }
391        }
392    });
393
394    // create a vector of thread handles so that it doesn't execute
395    // everything sequentially.
396    let mut threads = Vec::new();
397
398    // set up as many workers as we have threads
399    for _ in iter {
400        // create a new stealer
401        let stealer_clone = stealer.clone();
402
403        let arc_local = arc_child.clone();
404
405        // run the stealer in a new thread
406        let child_consumer = thread::spawn(move || loop {
407            if let Steal::Data(p) = stealer_clone.steal() {
408                match p {
409                    Status::Data(d) => Walk::print_dir(&d, &arc_local),
410                    _ => break,
411                }
412            }
413        });
414
415        threads.push(child_consumer);
416    }
417
418    // join the child producer to the main thread
419    let _ = child_producer.join();
420
421    // join the workers to the main thread
422    let _ = threads
423        .into_iter()
424        .map(|v| {
425            let result = v.join();
426            if let Ok(exit) = result {
427                exit
428            } else if let Err(e) = result {
429                panic!("{:?}", e)
430            }
431        })
432        .count();
433
434    // get the total size
435    let m = arc.load(Ordering::SeqCst); // TODO - check if this works with Relaxed?
436    let size = FileSize::new(m as u64);
437
438    // print directory total.
439    if size != FileSize::new(0) {
440        let to_formatted = format!("{}", size);
441        println!("{}\t {}", &to_formatted.green(), path_display.display());
442    }
443}