use crate::args::Args;
use crate::filetype::EntryType;
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use crossbeam_utils::Backoff;
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
#[cfg(unix)]
#[path = "walk/unix.rs"]
mod platform;
#[cfg(not(unix))]
#[path = "walk/fallback.rs"]
mod platform;
/// Decision returned by a visitor after it has seen an [`Entry`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalkState {
    /// Keep walking; if the entry is a directory the walker may descend into it.
    Continue,
    /// Ask every worker to stop; the current entry is not descended into.
    Quit,
}
/// A filesystem entry handed to the visitor by [`walk_parallel`].
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    /// Path of the entry. Roots are reported exactly as they were passed to
    /// `walk_parallel`.
    pub path: PathBuf,
    /// Type of the entry as classified by the walker. Roots are always
    /// reported as `EntryType::Dir`; a symlink found while listing a
    /// directory is reported as `EntryType::Symlink`.
    pub file_type: EntryType,
}
impl Entry {
    /// Returns the last component of `self.path`.
    ///
    /// Paths without a final normal component (such as `/`, or a path ending
    /// in `..`) fall back to the full path so callers always get a name.
    #[inline]
    pub fn file_name(&self) -> &OsStr {
        match self.path.file_name() {
            Some(name) => name,
            None => self.path.as_os_str(),
        }
    }
}
/// One queued unit of work: a path to visit and, if eligible, descend into.
struct Task {
    /// Path handed to the visitor and opened when descending.
    path: PathBuf,
    /// Type recorded when the task was queued; `descend` uses it to decide
    /// whether the path can be entered.
    file_type: EntryType,
    /// Distance from the root this task was reached from (roots are 0).
    depth: usize,
    /// Device id of the root, compared against when `one_filesystem` is set.
    root_dev: u64,
    /// `(dev, ino)` of every directory entered on the way to this task, used
    /// to break symlink cycles; `None` when no ancestor chain is tracked.
    ancestors: Option<Arc<Vec<(u64, u64)>>>,
}
/// Walks every path in `roots` on a pool of scoped worker threads, calling a
/// visitor on each entry found (the roots themselves included).
///
/// `make_visitor` is invoked once per worker thread to build that thread's
/// visitor. A visitor returning [`WalkState::Quit`] raises a shared flag that
/// makes every worker stop picking up new tasks. Entries already being
/// processed by other workers may still be visited.
///
/// Roots whose identity cannot be read through `platform::path_id` are
/// skipped silently. Every root is queued as `EntryType::Dir`, whatever its
/// real type.
///
/// Spawns `args.threads - 1` workers, clamped to at least one, so a thread
/// count of 0 or 1 both yield a single worker. NOTE(review): the reason for
/// subtracting one is not visible here; presumably it leaves a thread for
/// the consumer. Confirm.
///
/// Returns once every worker has exited, i.e. when no queued work remains or
/// a visitor asked to quit.
pub fn walk_parallel<F, V>(args: &Args, roots: &[&Path], make_visitor: F)
where
    F: Fn() -> V + Sync,
    V: FnMut(Entry) -> WalkState + Send,
{
    // `saturating_sub` keeps `threads == 0` from underflowing, which would
    // panic in debug builds and request an absurd thread count in release.
    let n_workers = args.threads.saturating_sub(1).max(1);
    let injector = Injector::new();
    // Count of tasks queued but not yet finished; workers exit when it is 0.
    let pending = AtomicUsize::new(0);
    let quit = AtomicBool::new(false);

    for root in roots {
        let Ok((dev, _ino)) = platform::path_id(root) else {
            continue;
        };
        // Ancestor tracking (symlink-cycle detection) is only enabled when
        // symlinks are followed.
        let ancestors = args.follow_symlinks.then(|| Arc::new(Vec::new()));
        pending.fetch_add(1, Ordering::SeqCst);
        injector.push(Task {
            path: root.to_path_buf(),
            file_type: EntryType::Dir,
            depth: 0,
            root_dev: dev,
            ancestors,
        });
    }

    let workers: Vec<Worker<Task>> = (0..n_workers).map(|_| Worker::new_lifo()).collect();
    let stealers: Vec<Stealer<Task>> = workers.iter().map(Worker::stealer).collect();

    // Scoped threads may borrow the locals above; the scope joins every
    // worker before returning.
    thread::scope(|scope| {
        for worker in workers {
            let injector = &injector;
            let stealers = &stealers;
            let pending = &pending;
            let quit = &quit;
            let make_visitor = &make_visitor;
            scope.spawn(move || {
                let mut visitor = make_visitor();
                run_worker(
                    args,
                    &worker,
                    injector,
                    stealers,
                    pending,
                    quit,
                    &mut visitor,
                );
            });
        }
    });
}
fn run_worker<V: FnMut(Entry) -> WalkState>(
args: &Args,
local: &Worker<Task>,
injector: &Injector<Task>,
stealers: &[Stealer<Task>],
pending: &AtomicUsize,
quit: &AtomicBool,
visitor: &mut V,
) {
let backoff = Backoff::new();
loop {
if quit.load(Ordering::Relaxed) {
return;
}
match find_task(local, injector, stealers) {
Some(task) => {
backoff.reset();
process(args, task, local, pending, quit, visitor);
pending.fetch_sub(1, Ordering::SeqCst);
}
None => {
if pending.load(Ordering::SeqCst) == 0 {
return;
}
backoff.snooze();
}
}
}
}
fn find_task(
local: &Worker<Task>,
injector: &Injector<Task>,
stealers: &[Stealer<Task>],
) -> Option<Task> {
local.pop().or_else(|| {
std::iter::repeat_with(|| {
injector
.steal_batch_and_pop(local)
.or_else(|| stealers.iter().map(Stealer::steal).collect())
})
.find(|s| !s.is_retry())
.and_then(Steal::success)
})
}
/// Shows `task` to the visitor, then descends into it unless the visitor
/// asked to stop. On `Quit`, the shared `quit` flag is raised and nothing
/// below `task` is queued.
fn process<V: FnMut(Entry) -> WalkState>(
    args: &Args,
    task: Task,
    local: &Worker<Task>,
    pending: &AtomicUsize,
    quit: &AtomicBool,
    visitor: &mut V,
) {
    // The visitor consumes the entry, but `descend` still needs the path,
    // so the visitor receives its own copy.
    let entry = Entry {
        path: task.path.clone(),
        file_type: task.file_type,
    };
    match visitor(entry) {
        WalkState::Quit => quit.store(true, Ordering::Relaxed),
        WalkState::Continue => descend(args, &task, local, pending),
    }
}
/// Queues the children of `task` on `local` when `task` is a directory that
/// should be entered; otherwise does nothing.
///
/// A task is entered when both of these hold:
/// - it is a directory, or a symlink while `args.follow_symlinks` is set;
/// - its depth is below `args.max_depth`.
///
/// `platform::open_dir` is asked to follow a symlink only for the root
/// (depth 0) and for followed symlinks.
///
/// Nothing is queued when any of these holds:
/// - the directory cannot be opened or identified;
/// - it lives on a different device than the root while
///   `args.one_filesystem` is set;
/// - its `(dev, ino)` is already in the task's ancestor chain (cycle guard).
///
/// Each child is counted in `pending` before it is pushed. Children whose
/// type the listing does not report are classified with
/// `platform::lstat_type`; children that still cannot be classified are
/// dropped. Errors returned by `platform::for_each_entry` are discarded.
fn descend(
    args: &Args,
    task: &Task,
    local: &Worker<Task>,
    pending: &AtomicUsize,
) {
    // Directories found while listing are opened without following links;
    // only the root may need resolving through a symlink.
    let follow = match (task.file_type, args.follow_symlinks) {
        (EntryType::Dir, _) => task.depth == 0,
        (EntryType::Symlink, true) => true,
        _ => return,
    };
    if matches!(args.max_depth, Some(limit) if task.depth >= limit) {
        return;
    }

    let Ok(dir) = platform::open_dir(&task.path, follow) else {
        return;
    };
    let Ok(id) = platform::dir_id(&dir) else {
        return;
    };
    let (dev, _) = id;
    if args.one_filesystem && dev != task.root_dev {
        return;
    }
    if matches!(&task.ancestors, Some(trail) if trail.contains(&id)) {
        return;
    }

    // Children inherit this directory's chain extended by its own id.
    let child_ancestors = task.ancestors.as_deref().map(|trail| {
        let mut extended = Vec::with_capacity(trail.len() + 1);
        extended.extend_from_slice(trail);
        extended.push(id);
        Arc::new(extended)
    });

    let _ = platform::for_each_entry(&dir, &task.path, |path, raw_type| {
        let file_type = if let Some(known) = raw_type {
            known
        } else if let Ok(probed) = platform::lstat_type(&path) {
            probed
        } else {
            return;
        };
        pending.fetch_add(1, Ordering::SeqCst);
        local.push(Task {
            path,
            file_type,
            depth: task.depth + 1,
            root_dev: task.root_dev,
            ancestors: child_ancestors.clone(),
        });
    });
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use tempfile::TempDir;

    /// Builds an `Args` with no filters, symlinks not followed, and the walk
    /// confined to one filesystem.
    fn base_args(threads: usize) -> Args {
        Args {
            threads,
            path: vec![],
            follow_symlinks: false,
            one_filesystem: true,
            max_depth: None,
            name: None,
            regex: None,
            case_insensitive: false,
            file_type: vec![],
        }
    }

    /// Walks `roots` and returns every emitted entry with its type, in the
    /// order the visitors recorded them.
    fn collect_typed(
        args: &Args,
        roots: &[&Path],
    ) -> Vec<(PathBuf, EntryType)> {
        let sink = Mutex::new(Vec::new());
        walk_parallel(args, roots, || {
            |e: Entry| {
                sink.lock().unwrap().push((e.path, e.file_type));
                WalkState::Continue
            }
        });
        sink.into_inner().unwrap()
    }

    /// Walks `roots` and returns every emitted path.
    fn collect(args: &Args, roots: &[&Path]) -> Vec<PathBuf> {
        collect_typed(args, roots)
            .into_iter()
            .map(|(path, _)| path)
            .collect()
    }

    #[test]
    fn emits_root_and_all_descendants() {
        let tmp = TempDir::new().unwrap();
        std::fs::create_dir(tmp.path().join("a")).unwrap();
        std::fs::write(tmp.path().join("a/f.txt"), b"x").unwrap();
        std::fs::write(tmp.path().join("g.txt"), b"x").unwrap();
        let root = tmp.path();
        let got = collect(&base_args(4), &[root]);
        assert!(got.iter().any(|p| p == root));
        assert!(got.iter().any(|p| p.ends_with("a")));
        assert!(got.iter().any(|p| p.ends_with("a/f.txt")));
        assert!(got.iter().any(|p| p.ends_with("g.txt")));
        assert_eq!(got.len(), 4);
    }

    #[test]
    fn terminates_on_empty_directory() {
        let tmp = TempDir::new().unwrap();
        let got = collect(&base_args(4), &[tmp.path()]);
        assert_eq!(got, vec![tmp.path().to_path_buf()]);
    }

    #[test]
    fn terminates_on_deep_chain() {
        let tmp = TempDir::new().unwrap();
        let mut p = tmp.path().to_path_buf();
        for i in 0..50 {
            p = p.join(format!("d{i}"));
            std::fs::create_dir(&p).unwrap();
        }
        let got = collect(&base_args(4), &[tmp.path()]);
        assert_eq!(got.len(), 51);
    }

    #[test]
    fn single_thread_walk_completes() {
        let tmp = TempDir::new().unwrap();
        std::fs::create_dir(tmp.path().join("a")).unwrap();
        std::fs::write(tmp.path().join("a/f.txt"), b"x").unwrap();
        let got = collect(&base_args(1), &[tmp.path()]);
        assert_eq!(got.len(), 3);
    }

    #[test]
    fn quit_stops_traversal() {
        let tmp = TempDir::new().unwrap();
        std::fs::create_dir(tmp.path().join("a")).unwrap();
        std::fs::write(tmp.path().join("a/f.txt"), b"x").unwrap();
        let seen = AtomicUsize::new(0);
        walk_parallel(&base_args(4), &[tmp.path()], || {
            |_e: Entry| {
                seen.fetch_add(1, Ordering::SeqCst);
                WalkState::Quit
            }
        });
        // Quitting on the root means nothing below it is ever queued.
        assert_eq!(seen.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn max_depth_limits_descent() {
        let tmp = TempDir::new().unwrap();
        std::fs::create_dir_all(tmp.path().join("l1/l2")).unwrap();
        std::fs::write(tmp.path().join("l1/l2/deep.txt"), b"x").unwrap();
        let mut args = base_args(4);
        args.max_depth = Some(1);
        let got = collect(&args, &[tmp.path()]);
        assert!(got.iter().any(|p| p.ends_with("l1")));
        assert!(!got.iter().any(|p| p.ends_with("l2")));
        assert!(!got.iter().any(|p| p.ends_with("deep.txt")));
        assert_eq!(got.len(), 2, "only the root and depth-1 entries");
    }

    #[cfg(unix)]
    #[test]
    fn symlink_not_followed_by_default() {
        let tmp = TempDir::new().unwrap();
        let real = tmp.path().join("real");
        std::fs::create_dir(&real).unwrap();
        std::fs::write(real.join("inside.txt"), b"x").unwrap();
        let link = tmp.path().join("link");
        std::os::unix::fs::symlink(&real, &link).unwrap();
        let got = collect(&base_args(4), &[tmp.path()]);
        assert!(got.iter().any(|p| p.ends_with("link")));
        assert!(!got
            .iter()
            .any(|p| p.starts_with(&link) && p.ends_with("inside.txt")));
    }

    #[cfg(unix)]
    #[test]
    fn symlinked_root_is_traversed() {
        let tmp = TempDir::new().unwrap();
        let real = tmp.path().join("real");
        std::fs::create_dir(&real).unwrap();
        std::fs::write(real.join("inside.txt"), b"x").unwrap();
        let link = tmp.path().join("link");
        std::os::unix::fs::symlink(&real, &link).unwrap();
        let got = collect(&base_args(4), &[link.as_path()]);
        assert!(
            got.iter().any(|p| p.ends_with("inside.txt")),
            "symlinked root must be descended"
        );
    }

    #[cfg(unix)]
    #[test]
    fn symlink_cycle_does_not_hang() {
        let tmp = TempDir::new().unwrap();
        let a = tmp.path().join("a");
        std::fs::create_dir(&a).unwrap();
        std::os::unix::fs::symlink(&a, a.join("loop")).unwrap();
        let mut args = base_args(4);
        args.follow_symlinks = true;
        let got = collect(&args, &[tmp.path()]);
        assert!(got.iter().any(|p| p.ends_with("a")));
        assert!(got.iter().any(|p| p.ends_with("a/loop")));
        assert_eq!(
            got.len(),
            3,
            "root, `a` and `a/loop`; the loop must not be re-entered"
        );
    }

    #[test]
    fn multiple_roots_all_traversed() {
        let r1 = TempDir::new().unwrap();
        let r2 = TempDir::new().unwrap();
        std::fs::write(r1.path().join("one.txt"), b"x").unwrap();
        std::fs::write(r2.path().join("two.txt"), b"x").unwrap();
        let got = collect(&base_args(4), &[r1.path(), r2.path()]);
        assert!(got.iter().any(|p| p.ends_with("one.txt")));
        assert!(got.iter().any(|p| p.ends_with("two.txt")));
        assert_eq!(got.len(), 4);
    }

    #[test]
    fn hidden_files_are_emitted() {
        let tmp = TempDir::new().unwrap();
        std::fs::write(tmp.path().join(".hidden"), b"x").unwrap();
        std::fs::create_dir(tmp.path().join(".dir")).unwrap();
        let got = collect(&base_args(4), &[tmp.path()]);
        assert!(got.iter().any(|p| p.ends_with(".hidden")));
        assert!(got.iter().any(|p| p.ends_with(".dir")));
    }

    #[test]
    fn no_duplicate_or_lost_entries() {
        let tmp = TempDir::new().unwrap();
        std::fs::create_dir(tmp.path().join("a")).unwrap();
        std::fs::create_dir(tmp.path().join("b")).unwrap();
        std::fs::write(tmp.path().join("a/x.txt"), b"x").unwrap();
        std::fs::write(tmp.path().join("b/y.txt"), b"x").unwrap();
        std::fs::write(tmp.path().join("c.txt"), b"x").unwrap();
        let mut got = collect(&base_args(8), &[tmp.path()]);
        let mut expected = vec![
            tmp.path().to_path_buf(),
            tmp.path().join("a"),
            tmp.path().join("b"),
            tmp.path().join("a/x.txt"),
            tmp.path().join("b/y.txt"),
            tmp.path().join("c.txt"),
        ];
        assert_eq!(got.len(), 6, "no duplicates");
        got.sort();
        expected.sort();
        assert_eq!(got, expected, "walk must emit each entry exactly once");
    }

    #[test]
    fn max_depth_zero_emits_only_root() {
        let tmp = TempDir::new().unwrap();
        std::fs::write(tmp.path().join("child.txt"), b"x").unwrap();
        let mut args = base_args(4);
        args.max_depth = Some(0);
        let got = collect(&args, &[tmp.path()]);
        assert_eq!(got, vec![tmp.path().to_path_buf()]);
    }

    #[cfg(unix)]
    #[test]
    fn classifies_entry_types() {
        let tmp = TempDir::new().unwrap();
        std::fs::write(tmp.path().join("f.txt"), b"x").unwrap();
        std::fs::create_dir(tmp.path().join("d")).unwrap();
        std::os::unix::fs::symlink(
            tmp.path().join("f.txt"),
            tmp.path().join("l"),
        )
        .unwrap();
        let got = collect_typed(&base_args(4), &[tmp.path()]);
        let ty = |name: &str| {
            got.iter()
                .find(|(p, _)| p.ends_with(name))
                .unwrap_or_else(|| panic!("{name} missing"))
                .1
        };
        assert_eq!(ty("f.txt"), EntryType::File);
        assert_eq!(ty("d"), EntryType::Dir);
        assert_eq!(ty("l"), EntryType::Symlink);
    }

    #[cfg(unix)]
    #[test]
    fn follow_symlinks_descends_into_symlinked_dir() {
        let tmp = TempDir::new().unwrap();
        let real = tmp.path().join("real");
        std::fs::create_dir(&real).unwrap();
        std::fs::write(real.join("inside.txt"), b"x").unwrap();
        let link = tmp.path().join("link");
        std::os::unix::fs::symlink(&real, &link).unwrap();
        let mut args = base_args(4);
        args.follow_symlinks = true;
        let got = collect(&args, &[tmp.path()]);
        assert!(
            got.iter()
                .any(|p| p.starts_with(&link) && p.ends_with("inside.txt")),
            "follow_symlinks must descend into a symlinked directory"
        );
    }

    #[cfg(unix)]
    #[test]
    fn broken_symlink_emitted_not_descended() {
        let tmp = TempDir::new().unwrap();
        std::os::unix::fs::symlink(
            "/nonexistent/xyz/abc",
            tmp.path().join("broken"),
        )
        .unwrap();
        let mut args = base_args(4);
        args.follow_symlinks = true;
        let got = collect_typed(&args, &[tmp.path()]);
        let broken = got.iter().find(|(p, _)| p.ends_with("broken"));
        assert!(broken.is_some(), "broken symlink must still be emitted");
        assert_eq!(broken.unwrap().1, EntryType::Symlink);
        assert_eq!(got.len(), 2, "only the root and the broken link");
    }
}