#![forbid(unsafe_code)]
use std::ffi::OsStr;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_deque::{Injector, Stealer, Worker};
/// Tuning knobs for a parallel directory walk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalkConfig {
    /// Number of worker threads to spawn; the walker uses at least one even
    /// if this is zero.
    pub threads: usize,
    /// Depth limit for recursion. Entries directly inside the root have
    /// depth 1; a directory whose depth has reached this limit is not
    /// descended into. `None` means unlimited.
    pub max_depth: Option<usize>,
    /// When `true`, a symbolic link that resolves to a directory is treated
    /// as a directory (and may be descended into) instead of as a symlink.
    pub follow_links: bool,
}
impl Default for WalkConfig {
    /// Builds a configuration with one worker per available CPU (falling back
    /// to four when the parallelism cannot be queried), no depth limit, and
    /// symlink following disabled.
    fn default() -> Self {
        // Used only when the platform cannot report its parallelism.
        const FALLBACK_THREADS: usize = 4;

        let threads = match std::thread::available_parallelism() {
            Ok(count) => count.get(),
            Err(_) => FALLBACK_THREADS,
        };

        Self {
            threads,
            max_depth: None,
            follow_links: false,
        }
    }
}
pub struct Entry {
pub path: PathBuf,
pub kind: EntryKind,
pub depth: usize,
}
/// Coarse classification of a directory entry.
///
/// The type is a fieldless enum, so it is `Copy` and can be passed by value
/// without explicit clones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntryKind {
    /// A regular file.
    File,
    /// A directory, or a symlink resolving to a directory when link following
    /// is enabled.
    Dir,
    /// A symbolic link that was not classified as a directory.
    Symlink,
    /// Anything else reported by the filesystem (neither file, directory,
    /// nor symlink).
    Other,
}
pub struct EntryRef<'a> {
pub name: &'a OsStr,
pub depth: usize,
pub kind: EntryKind,
}
// Unit of work for the walker: one directory that still has to be read.
struct DirJob {
    // Path of the directory to read.
    path: PathBuf,
    // Depth of the directory itself; the walk root is queued with depth 0 and
    // the entries found inside a job are reported at `depth + 1`.
    depth: usize,
}
// Per-worker state handed to `process_dir`; one instance is built inside each
// spawned worker thread.
struct WalkCtx<P> {
    // This worker's local queue; newly discovered subdirectories are pushed here.
    worker: Worker<DirJob>,
    // Queue shared by all workers; it is seeded with the root job.
    injector: Arc<Injector<DirJob>>,
    // Count of jobs that are queued or currently being processed; workers exit
    // once they find no job and this counter reads zero.
    pending: Arc<AtomicUsize>,
    // Optional predicate deciding whether an entry is reported to the visitor.
    pre_filter: Option<Arc<P>>,
    // Depth limit copied from the walk configuration (`None` = unlimited).
    max_depth: Option<usize>,
    // Whether symlinks resolving to directories are treated as directories.
    follow_links: bool,
}
/// Walks the directory tree below `root` in parallel and reports every entry
/// found to a visitor.
///
/// The root itself is not reported; only entries discovered while reading it
/// and its descendants are. Directories are distributed over
/// `config.threads` worker threads (at least one), and the call returns once
/// every queued directory has been processed.
///
/// # Parameters
/// - `root`: directory to start from.
/// - `config`: thread count, depth limit and symlink policy.
/// - `pre_filter`: optional predicate; entries for which it returns `false`
///   are not passed to the visitor.
/// - `visitor_factory`: called once per worker, on the calling thread, to
///   build that worker's visitor. Each visitor only sees the entries handled
///   by its own worker, in no particular order.
///
/// # Panics
/// If a visitor or the pre-filter panics, the remaining workers finish the
/// outstanding directories and the panic is then propagated to the caller by
/// `std::thread::scope`.
pub fn walk<F, V, P>(root: PathBuf, config: WalkConfig, pre_filter: Option<P>, visitor_factory: F)
where
    F: Fn() -> V + Send + Sync + 'static,
    V: FnMut(Entry) + Send + 'static,
    P: Fn(&EntryRef<'_>) -> bool + Send + Sync + 'static,
{
    // Marks one job as finished when dropped. Using a guard (rather than a
    // plain statement after `process_dir`) guarantees the decrement also
    // happens while unwinding from a panicking visitor or filter; otherwise
    // the counter would never reach zero and the other workers would spin
    // forever, so the walk would never return.
    struct PendingGuard<'a>(&'a AtomicUsize);

    impl Drop for PendingGuard<'_> {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::Release);
        }
    }

    let injector = Arc::new(Injector::<DirJob>::new());
    let pre_filter: Option<Arc<P>> = pre_filter.map(Arc::new);
    injector.push(DirJob {
        path: root,
        depth: 0,
    });

    let thread_count = config.threads.max(1);
    let max_depth = config.max_depth;
    let follow_links = config.follow_links;

    let workers: Vec<Worker<DirJob>> = (0..thread_count).map(|_| Worker::new_lifo()).collect();
    let stealers: Arc<Vec<Stealer<DirJob>>> =
        Arc::new(workers.iter().map(|w| w.stealer()).collect());

    // Starts at one to account for the root job queued above.
    let pending = Arc::new(AtomicUsize::new(1));

    std::thread::scope(|scope| {
        for worker in workers {
            let injector = Arc::clone(&injector);
            let stealers = Arc::clone(&stealers);
            let pre_filter = pre_filter.clone();
            let pending = Arc::clone(&pending);
            // Built here, on the calling thread, then moved into the worker.
            let mut visitor = visitor_factory();

            scope.spawn(move || {
                let ctx = WalkCtx {
                    worker,
                    injector,
                    pending,
                    pre_filter,
                    max_depth,
                    follow_links,
                };
                loop {
                    // Local queue first, then the other workers, then the
                    // shared injector. A contended steal counts as "nothing
                    // found" and is simply retried on the next iteration.
                    let job = ctx.worker.pop().or_else(|| {
                        stealers
                            .iter()
                            .find_map(|stealer| stealer.steal().success())
                            .or_else(|| ctx.injector.steal().success())
                    });
                    match job {
                        Some(job) => {
                            let _finished = PendingGuard(&ctx.pending);
                            process_dir(job, &mut visitor, &ctx);
                        }
                        None => {
                            // No job found: stop only once nothing is queued
                            // or in flight any more.
                            if ctx.pending.load(Ordering::Acquire) == 0 {
                                break;
                            }
                            std::thread::yield_now();
                        }
                    }
                }
            });
        }
    });
}
/// Reads one directory, reports its entries to `visitor`, and queues its
/// subdirectories on the worker's local queue.
///
/// - `job`: the directory to read and its depth; entries are reported at
///   `job.depth + 1`.
/// - `visitor`: called once for every entry accepted by the pre-filter (or for
///   every entry when no filter is set).
/// - `ctx`: per-worker state (queue, pending counter, filter and limits).
///
/// The pre-filter only decides whether an entry is reported: a directory it
/// rejects is still descended into. Nothing is reported or queued when the
/// entries of this directory would lie deeper than `ctx.max_depth`.
///
/// I/O failures are best-effort: an unreadable directory or entry is skipped
/// silently because the visitor interface has no error channel.
///
/// NOTE(review): with `follow_links` enabled, symlink cycles are not detected;
/// recursion is then bounded only by `max_depth` or by the point at which
/// resolving the path fails.
fn process_dir<V, P>(job: DirJob, visitor: &mut V, ctx: &WalkCtx<P>)
where
    V: FnMut(Entry),
    P: Fn(&EntryRef<'_>) -> bool + Send + Sync,
{
    // Every entry of this directory sits one level below the directory itself.
    let depth = job.depth + 1;

    // The descend check below only stops recursion *after* a level has been
    // reported, so a limit of 0 would still leak the depth-1 entries.
    if matches!(ctx.max_depth, Some(limit) if depth > limit) {
        return;
    }

    let listing = match fs::read_dir(&job.path) {
        Ok(listing) => listing,
        Err(_) => return,
    };

    for item in listing {
        let item = match item {
            Ok(item) => item,
            Err(_) => continue,
        };
        let file_type = match item.file_type() {
            Ok(file_type) => file_type,
            Err(_) => continue,
        };

        let is_symlink = file_type.is_symlink();
        // `file_type` describes the link itself; resolving the target is only
        // needed (and only paid for) when links are being followed.
        let is_dir = if is_symlink && ctx.follow_links {
            item.path().is_dir()
        } else {
            file_type.is_dir()
        };
        let kind = if is_dir {
            EntryKind::Dir
        } else if is_symlink {
            EntryKind::Symlink
        } else if file_type.is_file() {
            EntryKind::File
        } else {
            EntryKind::Other
        };

        let name = item.file_name();
        let pass = match ctx.pre_filter.as_deref() {
            Some(accept) => accept(&EntryRef {
                name: &name,
                depth,
                kind: kind.clone(),
            }),
            None => true,
        };
        let descend = is_dir
            && match ctx.max_depth {
                Some(limit) => depth < limit,
                None => true,
            };

        // Skip building the path when the entry is neither reported nor queued.
        if !pass && !descend {
            continue;
        }

        // Build the child path once and share it between visitor and queue.
        let path = job.path.join(&name);
        if descend {
            if pass {
                visitor(Entry {
                    path: path.clone(),
                    kind,
                    depth,
                });
            }
            // Count the job before it becomes visible to other workers so the
            // pending counter can never read zero while work is still queued.
            ctx.pending.fetch_add(1, Ordering::Relaxed);
            ctx.worker.push(DirJob { path, depth });
        } else {
            // `pass` is necessarily true here.
            visitor(Entry { path, kind, depth });
        }
    }
}