#![warn(clippy::all)]
mod core;
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::cmp::Ordering;
use std::default::Default;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::core::{ReadDir, ReadDirSpec};
pub use crate::core::{DirEntry, DirEntryIter, Error};
pub use rayon;
/// Result type used throughout this crate; the error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Directory walker that carries no client state: both the per-directory and
/// the per-entry state are `()`.
pub type WalkDir = WalkDirGeneric<((), ())>;
/// Client-defined state that is threaded through a walk.
///
/// Implementors pick two associated types: one for state attached to each
/// directory read, and one for state attached to each entry.
pub trait ClientState: 'static + Send + Default + Debug {
    /// State handed (mutably) to `process_read_dir` for each directory read.
    /// It must be `Clone` because the walker's options are cloned together
    /// with the root's copy of this state.
    type ReadDirState: 'static + Send + Clone + Default + Debug;

    /// Per-entry state (presumably stored on each `DirEntry`; see `core`).
    type DirEntryState: 'static + Send + Default + Debug;
}
/// Builder for a directory walk starting at a single root path.
///
/// Configure it with the builder methods, then turn it into an iterator of
/// entry results via `IntoIterator` (or `try_into_iter`).
pub struct WalkDirGeneric<C: ClientState> {
    /// Settings accumulated by the builder methods.
    options: WalkDirOptions<C>,
    /// Path the walk starts from.
    root: PathBuf,
}
/// Signature of the callback installed with `WalkDirGeneric::process_read_dir`.
///
/// Arguments, in order:
/// - the depth of the directory whose entries are being processed, or `None`
///   when the callback is invoked once for the root entry itself;
/// - the path of that directory (for the root call, the root entry's parent
///   path, which is empty when the root entry could not be created);
/// - the mutable read-dir state for that directory;
/// - the entry results, which the callback may edit in place.
///
/// `<C as ClientState>` is spelled out because type aliases do not enforce
/// bounds on `C`, so `C::ReadDirState` would be ambiguous here.
type ProcessReadDirFunction<C> = dyn Fn(Option<usize>, &Path, &mut <C as ClientState>::ReadDirState, &mut Vec<Result<DirEntry<C>>>)
+ Send
+ Sync
+ 'static;
/// How the walker schedules its work (see `Parallelism::spawn` and
/// `Parallelism::timeout`).
#[derive(Clone)]
pub enum Parallelism {
/// `spawn` runs the work inline on the calling thread; no busy timeout.
Serial,
/// `spawn` uses rayon's global thread pool.
RayonDefaultPool {
/// Returned by `timeout()`. NOTE(review): presumably how long to wait for a
/// busy pool before giving up (`try_into_iter` then reports `Error::busy()`)
/// — confirm in `core`.
busy_timeout: std::time::Duration,
},
/// `spawn` uses a caller-provided rayon pool.
RayonExistingPool {
/// The pool the work is spawned onto.
pool: Arc<ThreadPool>,
/// Optional busy timeout, returned as-is by `timeout()`.
busy_timeout: Option<std::time::Duration>,
},
/// `spawn` builds a fresh rayon pool with this many threads (0 keeps rayon's
/// default thread count), falling back to the global pool if building fails.
/// No busy timeout.
RayonNewPool(usize),
}
/// Settings collected by the `WalkDirGeneric` builder methods.
struct WalkDirOptions<C: ClientState> {
    // Depth window: directories whose children would exceed `max_depth` are
    // not read; `min_depth` is forwarded to the entry iterator.
    min_depth: usize,
    max_depth: usize,

    // Per-directory entry handling.
    sort: bool,
    skip_hidden: bool,
    follow_links: bool,

    // Scheduling of directory reads.
    parallelism: Parallelism,

    // Client hooks: initial read-dir state for the root, and the optional
    // callback run over every batch of entries.
    root_read_dir_state: C::ReadDirState,
    process_read_dir: Option<Arc<ProcessReadDirFunction<C>>>,
}
impl<C: ClientState> WalkDirGeneric<C> {
    /// Creates a walker rooted at `root`.
    ///
    /// Defaults:
    /// - unsorted, `min_depth` 0, `max_depth` `usize::MAX`;
    /// - hidden entries skipped, symlinks not followed;
    /// - `Parallelism::RayonDefaultPool` with a one-second busy timeout;
    /// - default root read-dir state and no `process_read_dir` callback.
    pub fn new<P: AsRef<Path>>(root: P) -> Self {
        WalkDirGeneric {
            root: root.as_ref().to_path_buf(),
            options: WalkDirOptions {
                sort: false,
                min_depth: 0,
                max_depth: usize::MAX,
                skip_hidden: true,
                follow_links: false,
                parallelism: Parallelism::RayonDefaultPool {
                    busy_timeout: std::time::Duration::from_secs(1),
                },
                root_read_dir_state: C::ReadDirState::default(),
                process_read_dir: None,
            },
        }
    }

    /// Like `into_iter`, but fails with `Error::busy()` when the resulting
    /// iterator has no read-dir iterator.
    ///
    /// NOTE(review): presumably this happens when the thread pool could not
    /// start the walk within the configured busy timeout — confirm in `core`.
    ///
    /// # Errors
    ///
    /// Returns `Error::busy()` in the case described above.
    pub fn try_into_iter(self) -> Result<DirEntryIter<C>> {
        let iter = self.into_iter();
        if iter.read_dir_iter.is_none() {
            Err(Error::busy())
        } else {
            Ok(iter)
        }
    }

    /// Returns the path the walk starts from.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Sorts each directory's entries by file name before `process_read_dir`
    /// sees them.
    pub fn sort(mut self, sort: bool) -> Self {
        self.options.sort = sort;
        self
    }

    /// Skips entries whose file name starts with `.` (enabled by default).
    pub fn skip_hidden(mut self, skip_hidden: bool) -> Self {
        self.options.skip_hidden = skip_hidden;
        self
    }

    /// Follows symbolic links instead of reporting them as links.
    pub fn follow_links(mut self, follow_links: bool) -> Self {
        self.options.follow_links = follow_links;
        self
    }

    /// Sets the minimum depth, clamped so it never exceeds the current `max_depth`.
    ///
    /// The value is forwarded to the entry iterator. Presumably entries
    /// shallower than this are not yielded.
    pub fn min_depth(mut self, depth: usize) -> Self {
        self.options.min_depth = depth.min(self.options.max_depth);
        self
    }

    /// Sets the maximum depth, raised if needed so it is never below `min_depth`.
    ///
    /// Directories whose children would lie deeper than this are not read. A
    /// value below 2 also switches to `Parallelism::Serial`, overriding any
    /// earlier `parallelism(...)` call; a later call still takes effect.
    pub fn max_depth(mut self, depth: usize) -> Self {
        self.options.max_depth = depth.max(self.options.min_depth);
        // With so little depth there is too little work to be worth parallelizing.
        if self.options.max_depth < 2 {
            self.options.parallelism = Parallelism::Serial;
        }
        self
    }

    /// Chooses how directory reads are scheduled.
    pub fn parallelism(mut self, parallelism: Parallelism) -> Self {
        self.options.parallelism = parallelism;
        self
    }

    /// Sets the read-dir state used for the root.
    ///
    /// It is passed to `process_read_dir` for the root entry and then handed to
    /// the entry iterator.
    pub fn root_read_dir_state(mut self, read_dir_state: C::ReadDirState) -> Self {
        self.options.root_read_dir_state = read_dir_state;
        self
    }

    /// Installs a callback that runs over the entries of every directory read.
    ///
    /// The callback runs after hidden-entry filtering and sorting. It is also
    /// called once for the root entry, with a depth of `None`. It receives the
    /// read-dir state and the entry results mutably, so it can edit both.
    pub fn process_read_dir<F>(mut self, process_by: F) -> Self
    where
        F: Fn(Option<usize>, &Path, &mut C::ReadDirState, &mut Vec<Result<DirEntry<C>>>)
            + Send
            + Sync
            + 'static,
    {
        self.options.process_read_dir = Some(Arc::new(process_by));
        self
    }
}
fn process_dir_entry_result<C: ClientState>(
dir_entry_result: Result<DirEntry<C>>,
follow_links: bool,
) -> Result<DirEntry<C>> {
match dir_entry_result {
Ok(mut dir_entry) => {
if follow_links && dir_entry.file_type.is_symlink() {
dir_entry = dir_entry.follow_symlink()?;
}
if dir_entry.depth == 0 && dir_entry.file_type.is_symlink() {
let metadata = fs::metadata(dir_entry.path())
.map_err(|err| Error::from_path(0, dir_entry.path(), err))?;
if metadata.file_type().is_dir() {
dir_entry.read_children_path = Some(Arc::from(dir_entry.path()));
}
}
Ok(dir_entry)
}
Err(err) => Err(err),
}
}
impl<C: ClientState> IntoIterator for WalkDirGeneric<C> {
type Item = Result<DirEntry<C>>;
type IntoIter = DirEntryIter<C>;
fn into_iter(self) -> DirEntryIter<C> {
let sort = self.options.sort;
let max_depth = self.options.max_depth;
let min_depth = self.options.min_depth;
let parallelism = self.options.parallelism;
let skip_hidden = self.options.skip_hidden;
let follow_links = self.options.follow_links;
let process_read_dir = self.options.process_read_dir.clone();
let mut root_read_dir_state = self.options.root_read_dir_state;
let follow_link_ancestors = if follow_links {
Arc::new(vec![Arc::from(self.root.clone()) as Arc<Path>])
} else {
Arc::new(vec![])
};
let root_entry = DirEntry::from_path(0, &self.root, false, follow_link_ancestors);
let root_parent_path = root_entry
.as_ref()
.map(|root| root.parent_path().to_owned())
.unwrap_or_default();
let mut root_entry_results = vec![process_dir_entry_result(root_entry, follow_links)];
if let Some(process_read_dir) = process_read_dir.as_ref() {
process_read_dir(
None,
&root_parent_path,
&mut root_read_dir_state,
&mut root_entry_results,
);
}
DirEntryIter::new(
root_entry_results,
parallelism,
min_depth,
root_read_dir_state,
Arc::new(move |read_dir_spec| {
let ReadDirSpec {
path,
depth,
mut client_read_state,
mut follow_link_ancestors,
} = read_dir_spec;
let read_dir_depth = depth;
let read_dir_contents_depth = depth + 1;
if read_dir_contents_depth > max_depth {
return Ok(ReadDir::new(client_read_state, Vec::new()));
}
follow_link_ancestors = if follow_links {
let mut ancestors = Vec::with_capacity(follow_link_ancestors.len() + 1);
ancestors.extend(follow_link_ancestors.iter().cloned());
ancestors.push(path.clone());
Arc::new(ancestors)
} else {
follow_link_ancestors
};
let mut dir_entry_results: Vec<_> = fs::read_dir(path.as_ref())
.map_err(|err| Error::from_path(0, path.to_path_buf(), err))?
.filter_map(|dir_entry_result| {
let fs_dir_entry = match dir_entry_result {
Ok(fs_dir_entry) => fs_dir_entry,
Err(err) => {
return Some(Err(Error::from_io(read_dir_contents_depth, err)))
}
};
let dir_entry = match DirEntry::from_entry(
read_dir_contents_depth,
path.clone(),
&fs_dir_entry,
follow_link_ancestors.clone(),
) {
Ok(dir_entry) => dir_entry,
Err(err) => return Some(Err(err)),
};
if skip_hidden && is_hidden(&dir_entry.file_name) {
return None;
}
Some(process_dir_entry_result(Ok(dir_entry), follow_links))
})
.collect();
if sort {
dir_entry_results.sort_by(|a, b| match (a, b) {
(Ok(a), Ok(b)) => a.file_name.cmp(&b.file_name),
(Ok(_), Err(_)) => Ordering::Less,
(Err(_), Ok(_)) => Ordering::Greater,
(Err(_), Err(_)) => Ordering::Equal,
});
}
if let Some(process_read_dir) = process_read_dir.as_ref() {
process_read_dir(
Some(read_dir_depth),
path.as_ref(),
&mut client_read_state,
&mut dir_entry_results,
);
}
Ok(ReadDir::new(client_read_state, dir_entry_results))
}),
)
}
}
impl<C: ClientState> Clone for WalkDirOptions<C> {
    /// Field-by-field copy of every option, including `sort`.
    ///
    /// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
    /// `C: Clone` bound, which `ClientState` does not require.
    fn clone(&self) -> WalkDirOptions<C> {
        WalkDirOptions {
            sort: self.sort,
            min_depth: self.min_depth,
            max_depth: self.max_depth,
            skip_hidden: self.skip_hidden,
            follow_links: self.follow_links,
            parallelism: self.parallelism.clone(),
            root_read_dir_state: self.root_read_dir_state.clone(),
            // Cloning the `Arc` shares the same callback.
            process_read_dir: self.process_read_dir.clone(),
        }
    }
}
impl Parallelism {
    /// Runs `op` according to this scheduling mode.
    ///
    /// - `Serial`: `op` runs inline on the calling thread.
    /// - `RayonDefaultPool`: `op` runs on rayon's global pool.
    /// - `RayonExistingPool`: `op` runs on the supplied pool.
    /// - `RayonNewPool(n)`: a fresh pool is built on every call, with `n`
    ///   threads (0 keeps rayon's default count). If building fails, `op`
    ///   runs on rayon's global pool instead.
    pub(crate) fn spawn<OP>(&self, op: OP)
    where
        OP: FnOnce() + Send + 'static,
    {
        match self {
            Parallelism::Serial => op(),
            Parallelism::RayonDefaultPool { .. } => rayon::spawn(op),
            Parallelism::RayonExistingPool { pool, .. } => pool.spawn(op),
            Parallelism::RayonNewPool(num_threads) => {
                let builder = match *num_threads {
                    0 => ThreadPoolBuilder::new(),
                    count => ThreadPoolBuilder::new().num_threads(count),
                };
                match builder.build() {
                    Ok(fresh_pool) => fresh_pool.spawn(op),
                    Err(_) => rayon::spawn(op),
                }
            }
        }
    }

    /// Busy timeout configured for this mode.
    ///
    /// Returns `None` for `Serial` and `RayonNewPool`.
    pub(crate) fn timeout(&self) -> Option<std::time::Duration> {
        match *self {
            Parallelism::RayonDefaultPool { busy_timeout } => Some(busy_timeout),
            Parallelism::RayonExistingPool { busy_timeout, .. } => busy_timeout,
            Parallelism::Serial | Parallelism::RayonNewPool(_) => None,
        }
    }
}
/// Returns `true` when `file_name` starts with `.`, the Unix convention for
/// hidden entries.
///
/// The check runs on a lossy UTF-8 view of the name. Names containing invalid
/// UTF-8 (e.g. `b".\xff"` on Unix) are therefore still recognized as hidden
/// when their first character is `.`. Lossy decoding keeps a valid leading
/// `.` intact, and valid UTF-8 names are borrowed, not copied.
fn is_hidden(file_name: &OsStr) -> bool {
    file_name.to_string_lossy().starts_with('.')
}
/// Blanket `ClientState` for a pair of types.
///
/// `B` is the per-directory read state and `E` the per-entry state. The plain
/// `WalkDir` alias uses `((), ())`.
impl<B, E> ClientState for (B, E)
where
    B: 'static + Send + Clone + Default + Debug,
    E: 'static + Send + Default + Debug,
{
    type DirEntryState = E;
    type ReadDirState = B;
}