#![warn(missing_docs)]
use futures::stream::{self, Stream};
use std::{num::NonZeroU32, path::Path};
use tokio::{
fs,
sync::{Semaphore, mpsc},
};
mod errors;
#[cfg(feature = "glob")]
mod glob;
pub use errors::WalkError;
/// Configuration for a [`Walker`].
///
/// Start from [`WalkOptions::default`] and override individual fields with
/// struct-update syntax, e.g. `WalkOptions { recursive: false, ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct WalkOptions {
    /// Whether the walker descends into directories it encounters.
    ///
    /// Defaults to `true`.
    pub recursive: bool,
    /// Whether symbolic links are followed.
    ///
    /// When `true`, an entry's metadata describes the link target (so linked
    /// directories are descended into); when `false`, it describes the link
    /// itself. Defaults to `false`.
    ///
    /// NOTE: the walker keeps no record of visited directories, so a link that
    /// points back at one of its ancestors is not detected as a cycle.
    pub follow_symlinks: bool,
    /// Glob patterns for paths to include; their matching semantics are defined
    /// by the crate's `glob` module. Defaults to empty.
    #[cfg(feature = "glob")]
    pub include_patterns: Vec<String>,
    /// Glob patterns for paths to exclude; their matching semantics are defined
    /// by the crate's `glob` module. Defaults to empty.
    #[cfg(feature = "glob")]
    pub exclude_patterns: Vec<String>,
    /// Capacity of the channel between the background traversal task and the
    /// stream returned by `Walker::walk`; bounds how far the traversal may run
    /// ahead of the consumer. Defaults to 64.
    pub channel_size: NonZeroU32,
    /// Number of permits of the semaphore the walker uses to limit concurrent
    /// filesystem operations. Defaults to 10.
    pub concurrency_limit: NonZeroU32,
}

impl Default for WalkOptions {
    /// Recursive, symlinks not followed, no glob patterns, a channel capacity
    /// of 64 and a concurrency limit of 10.
    fn default() -> Self {
        Self {
            recursive: true,
            follow_symlinks: false,
            #[cfg(feature = "glob")]
            include_patterns: Vec::new(),
            #[cfg(feature = "glob")]
            exclude_patterns: Vec::new(),
            channel_size: NonZeroU32::new(64).expect("64 is non-zero"),
            concurrency_limit: NonZeroU32::new(10).expect("10 is non-zero"),
        }
    }
}
/// A single filesystem entry produced by a walk.
#[derive(Debug, Clone)]
pub struct WalkEntry {
    /// Path of the entry: the walk root as given, or a descendant path built by
    /// joining a directory's path with the names found while listing it.
    pub path: std::path::PathBuf,
    /// Metadata of the entry. It describes the link target when symlinks are
    /// followed and the link itself otherwise.
    pub metadata: std::fs::Metadata,
}

impl WalkEntry {
    /// Returns `true` if the stored metadata describes a directory.
    pub fn is_dir(&self) -> bool {
        self.metadata.is_dir()
    }

    /// Returns `true` if the stored metadata describes a regular file.
    pub fn is_file(&self) -> bool {
        self.metadata.is_file()
    }

    /// Returns `true` if [`path`](Self::path) is itself a symbolic link.
    ///
    /// If the stored metadata already describes a symlink, that answer is used
    /// directly. Otherwise (e.g. the metadata was taken from the link target) the
    /// path is re-checked with a blocking `symlink_metadata` call; any error from
    /// that call yields `false`.
    pub fn is_symlink(&self) -> bool {
        self.metadata.file_type().is_symlink()
            || self.path.symlink_metadata().is_ok_and(|m| m.file_type().is_symlink())
    }
}
/// Asynchronous filesystem walker configured by [`WalkOptions`].
///
/// Create one with `Walker::new` or `Walker::with_options`, then traverse with
/// `walk` (stream-based) or `walk_with` (callback-based).
#[derive(Debug, Clone)]
pub struct Walker {
    // Options this walker was built with.
    options: WalkOptions,
    // Compiled include/exclude patterns; filled in when the walker is constructed.
    #[cfg(feature = "glob")]
    glob_set: Option<glob::GlobSet>,
}
impl Walker {
    /// Creates a walker configured with [`WalkOptions::default`].
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Walker::with_options`].
    pub fn new() -> Result<Self, WalkError> {
        Self::with_options(WalkOptions::default())
    }

    /// Creates a walker with the given options.
    ///
    /// # Errors
    ///
    /// With the `glob` feature enabled, returns the error reported by
    /// `glob::build_glob_set` for the configured include/exclude patterns.
    pub fn with_options(options: WalkOptions) -> Result<Self, WalkError> {
        #[cfg(feature = "glob")]
        let mut walker = Self { options, glob_set: None };
        #[cfg(not(feature = "glob"))]
        let walker = Self { options };
        #[cfg(feature = "glob")]
        walker.build_glob_set()?;
        Ok(walker)
    }

    /// Compiles the include/exclude patterns from `self.options` into `self.glob_set`.
    #[cfg(feature = "glob")]
    fn build_glob_set(&mut self) -> Result<(), WalkError> {
        self.glob_set = Some(glob::build_glob_set(&self.options.include_patterns, &self.options.exclude_patterns)?);
        Ok(())
    }

    /// Returns whether `path` passes the configured glob filters.
    #[cfg(feature = "glob")]
    fn should_include(&self, path: &Path) -> bool {
        glob::should_include(path, &self.glob_set)
    }

    /// Without the `glob` feature every path is included.
    #[cfg(not(feature = "glob"))]
    fn should_include(&self, _path: &Path) -> bool {
        true
    }

    /// Reads the metadata of `path`, following symlinks only when
    /// `follow_symlinks` is set.
    ///
    /// The semaphore permit is held only for this single call. It must never be
    /// held across a recursive descent: that would tie up one permit per nesting
    /// level and deadlock on trees deeper than `concurrency_limit`.
    async fn entry_metadata(&self, path: &Path, semaphore: &Semaphore) -> Result<std::fs::Metadata, WalkError> {
        let _permit = semaphore.acquire().await?;
        let metadata = if self.options.follow_symlinks {
            fs::metadata(path).await?
        } else {
            fs::symlink_metadata(path).await?
        };
        Ok(metadata)
    }

    /// Opens the directory at `path` for listing, holding a semaphore permit only
    /// while it is being opened (see `entry_metadata` for why).
    async fn open_dir(path: &Path, semaphore: &Semaphore) -> Result<fs::ReadDir, WalkError> {
        let _permit = semaphore.acquire().await?;
        Ok(fs::read_dir(path).await?)
    }

    /// Walks `path` on a spawned Tokio task and returns a stream of the entries found.
    ///
    /// Entries are produced depth-first, pre-order: the root first, then each child
    /// (in the order the directory listing returns them), each followed by its own
    /// descendants. When `recursive` is `false`, only the root entry itself is
    /// yielded; its children are not listed. Paths rejected by the glob filters are
    /// skipped together with everything beneath them.
    ///
    /// The traversal is back-pressured by a channel of `channel_size` items.
    /// Dropping the stream makes the traversal stop shortly afterwards.
    ///
    /// The first filesystem error (e.g. a missing root or an unreadable directory)
    /// ends the traversal. It is yielded as the last item of the stream.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok`; traversal errors are delivered through the stream.
    pub async fn walk<P: AsRef<Path>>(
        &self,
        path: P,
    ) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>> + Unpin + use<P>, WalkError> {
        let root = path.as_ref().to_path_buf();
        let (tx, rx) = mpsc::channel(self.options.channel_size.get() as usize);
        let semaphore = Semaphore::new(self.options.concurrency_limit.get() as usize);
        // The spawned task must be 'static, so it gets its own copy of the walker.
        let walker = self.clone();
        tokio::spawn(async move {
            if let Err(err) = walker.walk_recursive(&root, &tx, &semaphore).await {
                // Report the failure instead of silently truncating the stream. If the
                // receiver is already gone there is nobody left to tell.
                let _ = tx.send(Err(err)).await;
            }
        });
        let stream = stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|item| (item, rx)) });
        Ok(Box::pin(stream))
    }

    /// Worker behind [`Walker::walk`]. It sends `path` (if it passes the filters) and,
    /// for directories when `recursive` is set, all of its descendants.
    ///
    /// Returns `Ok(())` early once the receiving side of `tx` has been dropped, and
    /// returns the first filesystem error encountered.
    async fn walk_recursive(
        &self,
        path: &Path,
        tx: &mpsc::Sender<Result<WalkEntry, WalkError>>,
        semaphore: &Semaphore,
    ) -> Result<(), WalkError> {
        if !self.should_include(path) {
            return Ok(());
        }
        let metadata = self.entry_metadata(path, semaphore).await?;
        let descend = metadata.is_dir() && self.options.recursive;
        if tx.send(Ok(WalkEntry { path: path.to_path_buf(), metadata })).await.is_err() {
            // The stream was dropped; nobody is listening any more.
            return Ok(());
        }
        if !descend {
            return Ok(());
        }
        let mut dir_entries = Self::open_dir(path, semaphore).await?;
        while let Some(child) = dir_entries.next_entry().await? {
            if tx.is_closed() {
                break;
            }
            // Recursive async calls must be boxed; `self` and `tx` are borrowed, not cloned.
            Box::pin(self.walk_recursive(&child.path(), tx, semaphore)).await?;
        }
        Ok(())
    }

    /// Walks `path` on the current task, calling `callback` for every entry that
    /// passes the glob filters.
    ///
    /// Entries are visited in the same depth-first, pre-order sequence as
    /// [`Walker::walk`], and `recursive == false` likewise visits only the root.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first error, whether it comes from the filesystem
    /// or from `callback`. Returning `Err` from `callback` therefore aborts the walk.
    pub async fn walk_with<P: AsRef<Path>, F: FnMut(&WalkEntry) -> Result<(), WalkError>>(
        &self,
        path: P,
        mut callback: F,
    ) -> Result<(), WalkError> {
        let semaphore = Semaphore::new(self.options.concurrency_limit.get() as usize);
        self.walk_with_recursive(path.as_ref(), &mut callback, &semaphore).await
    }

    /// Worker behind [`Walker::walk_with`]. It visits `path` (if it passes the
    /// filters) and, for directories when `recursive` is set, all of its descendants.
    async fn walk_with_recursive<F: FnMut(&WalkEntry) -> Result<(), WalkError>>(
        &self,
        path: &Path,
        callback: &mut F,
        semaphore: &Semaphore,
    ) -> Result<(), WalkError> {
        if !self.should_include(path) {
            return Ok(());
        }
        let metadata = self.entry_metadata(path, semaphore).await?;
        let descend = metadata.is_dir() && self.options.recursive;
        callback(&WalkEntry { path: path.to_path_buf(), metadata })?;
        if !descend {
            return Ok(());
        }
        let mut dir_entries = Self::open_dir(path, semaphore).await?;
        while let Some(child) = dir_entries.next_entry().await? {
            // Recursive async calls must be boxed; `self` is borrowed, not cloned.
            Box::pin(self.walk_with_recursive(&child.path(), callback, semaphore)).await?;
        }
        Ok(())
    }
}
/// Walks `path` with [`WalkOptions::default`] and returns a stream of entries.
///
/// Shorthand for `Walker::new()?.walk(path).await`; see `Walker::walk` for the
/// ordering and error-reporting behavior of the stream.
///
/// # Errors
///
/// Returns an error if the default walker cannot be constructed or if
/// `Walker::walk` fails.
pub async fn walk<P: AsRef<Path>>(path: P) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
    Walker::new()?.walk(path).await
}
/// Walks `path` using the given `options` and returns a stream of entries.
///
/// Shorthand for `Walker::with_options(options)?.walk(path).await`; see
/// `Walker::walk` for the ordering and error-reporting behavior of the stream.
///
/// # Errors
///
/// Returns an error if a walker cannot be built from `options` (e.g. invalid
/// glob patterns when the `glob` feature is enabled) or if `Walker::walk` fails.
pub async fn walk_with_options<P: AsRef<Path>>(
    path: P,
    options: WalkOptions,
) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
    Walker::with_options(options)?.walk(path).await
}