#![forbid(unsafe_code)]
use std::ffi::OsStr;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_deque::{Injector, Stealer, Worker};
/// Options controlling a [`walk`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalkConfig {
    /// Number of worker threads; `0` is treated as `1`.
    pub threads: usize,
    /// Limits how deep the walk descends (the root's direct children have
    /// depth 1); `None` means no limit.
    pub max_depth: Option<usize>,
    /// Whether symbolic links pointing at directories are descended into.
    pub follow_links: bool,
}

impl Default for WalkConfig {
    /// One thread per unit of parallelism reported by
    /// [`std::thread::available_parallelism`] (4 if it cannot be determined),
    /// no depth limit, and symbolic links are not followed.
    fn default() -> Self {
        let threads = std::thread::available_parallelism().map_or(4, |n| n.get());
        Self {
            threads,
            max_depth: None,
            follow_links: false,
        }
    }
}
pub struct Entry {
pub path: PathBuf,
pub kind: EntryKind,
pub depth: usize,
}
/// Kind of filesystem object an entry refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntryKind {
    /// A regular file.
    File,
    /// A directory.
    Dir,
    /// A symbolic link reported as the link itself rather than as its target.
    Symlink,
    /// Anything that is not a file, directory, or symbolic link.
    Other,
}
pub struct EntryRef<'a> {
pub name: &'a OsStr,
pub depth: usize,
pub kind: EntryKind,
}
struct DirJob {
path: PathBuf,
depth: usize,
}
pub fn walk<F, P>(root: PathBuf, config: WalkConfig, pre_filter: Option<P>, visitor: F)
where
F: Fn(Entry) + Send + Sync + 'static,
P: Fn(&EntryRef<'_>) -> bool + Send + Sync + 'static,
{
let injector = Arc::new(Injector::<DirJob>::new());
let visitor = Arc::new(visitor);
let pre_filter: Option<Arc<P>> = pre_filter.map(Arc::new);
injector.push(DirJob { path: root, depth: 0 });
let n = config.threads.max(1);
let max_depth = config.max_depth;
let follow_links = config.follow_links;
let workers: Vec<Worker<DirJob>> = (0..n).map(|_| Worker::new_lifo()).collect();
let stealers: Arc<Vec<Stealer<DirJob>>> =
Arc::new(workers.iter().map(|w| w.stealer()).collect());
let active = Arc::new(AtomicUsize::new(0));
std::thread::scope(|s| {
for worker in workers {
let injector = Arc::clone(&injector);
let stealers = Arc::clone(&stealers);
let visitor = Arc::clone(&visitor);
let pre_filter = pre_filter.clone();
let active = Arc::clone(&active);
s.spawn(move || {
loop {
let job = worker.pop().or_else(|| {
stealers
.iter()
.find_map(|s| s.steal().success())
.or_else(|| injector.steal().success())
});
match job {
Some(job) => {
active.fetch_add(1, Ordering::Relaxed);
process_dir(
job,
&worker,
&injector,
&visitor,
pre_filter.as_ref(),
max_depth,
follow_links,
);
active.fetch_sub(1, Ordering::Relaxed);
}
None => {
if active.load(Ordering::Relaxed) == 0
&& injector.is_empty()
{
std::thread::yield_now();
if active.load(Ordering::Relaxed) == 0
&& injector.is_empty()
{
break;
}
} else {
std::thread::yield_now();
}
}
}
}
});
}
});
}
fn process_dir<F, P>(
job: DirJob,
worker: &Worker<DirJob>,
_injector: &Injector<DirJob>,
visitor: &Arc<F>,
pre_filter: Option<&Arc<P>>,
max_depth: Option<usize>,
follow_links: bool,
) where
F: Fn(Entry) + Send + Sync,
P: Fn(&EntryRef<'_>) -> bool + Send + Sync,
{
let read = match fs::read_dir(&job.path) {
Ok(r) => r,
Err(_) => return,
};
for raw in read {
let raw = match raw {
Ok(e) => e,
Err(_) => continue,
};
let file_type = match raw.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
let is_symlink = file_type.is_symlink();
let is_dir = if is_symlink && follow_links {
raw.path().is_dir()
} else {
file_type.is_dir()
};
let kind = if is_dir {
EntryKind::Dir
} else if is_symlink {
EntryKind::Symlink
} else if file_type.is_file() {
EntryKind::File
} else {
EntryKind::Other
};
let depth = job.depth + 1;
let name = raw.file_name();
let pass = pre_filter
.map(|f| f(&EntryRef { name: &name, depth, kind: kind.clone() }))
.unwrap_or(true);
if pass {
let path = job.path.join(&name);
visitor(Entry { path, kind: kind.clone(), depth });
}
if is_dir && max_depth.map(|d| depth < d).unwrap_or(true) {
let sub = DirJob { path: job.path.join(&name), depth };
worker.push(sub);
}
}
}