#![forbid(unsafe_code)]
#![deny(missing_docs)]
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use async_fs::{read_dir, ReadDir};
use futures_lite::future::Boxed as BoxedFut;
use futures_lite::future::FutureExt;
use futures_lite::stream::{self, Stream, StreamExt};
#[doc(no_inline)]
pub use async_fs::DirEntry;
#[doc(no_inline)]
pub use std::io::Result;
/// Boxed, type-erased stream of directory entries (or I/O errors) stored inside `WalkDir`.
type BoxStream = futures_lite::stream::Boxed<Result<DirEntry>>;
/// A [`Stream`] that recursively yields the entries found beneath a root directory.
///
/// Build one with [`WalkDir::new`] and optionally restrict it with
/// [`WalkDir::filter`]. Each item is a [`Result`]: either a [`DirEntry`] or the
/// I/O error encountered while reading it. A directory entry is yielded before
/// the entries it contains.
pub struct WalkDir {
    /// Directory the walk starts from; kept so `filter` can rebuild the stream.
    root: PathBuf,
    /// Lazily evaluated stream that produces the walk's items.
    entries: BoxStream,
}

impl std::fmt::Debug for WalkDir {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `entries` is a type-erased stream with nothing meaningful to print,
        // so only the root path is reported.
        f.debug_struct("WalkDir").field("root", &self.root).finish()
    }
}
/// Decision returned by a filter closure for a single directory entry.
///
/// The type is a plain fieldless enum, so it is `Copy` and can be compared
/// and reused freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Filtering {
    /// Do not yield the entry. If it is a directory, its contents are still walked.
    Ignore,
    /// Do not yield the entry and, if it is a directory, do not walk its contents.
    IgnoreDir,
    /// Yield the entry and, if it is a directory, walk its contents.
    Continue,
}
impl WalkDir {
    /// Creates a walker over everything beneath `root`, with no filter installed.
    ///
    /// No file-system access happens here; the root directory is only opened
    /// once the stream is first polled, so a missing root is reported as the
    /// first item of the stream rather than by this constructor.
    pub fn new(root: impl AsRef<Path>) -> Self {
        // `None` still needs a concrete closure type to satisfy `walk_dir`'s
        // generic parameters, even though no filter will ever be called.
        type NoFilter = Box<dyn FnMut(DirEntry) -> BoxedFut<Filtering> + Send>;

        let root = root.as_ref().to_path_buf();
        let entries = walk_dir(&root, None::<NoFilter>);
        Self { root, entries }
    }

    /// Returns a walker over the same root that consults `f` for every entry.
    ///
    /// `f` receives each entry and resolves to a [`Filtering`] that decides
    /// whether the entry is yielded and whether a directory is descended into.
    ///
    /// The underlying stream is rebuilt from the root, so any previously
    /// installed filter is replaced and any progress already made by `self`
    /// is discarded.
    pub fn filter<F, Fut>(self, f: F) -> Self
    where
        F: FnMut(DirEntry) -> Fut + Send + 'static,
        Fut: Future<Output = Filtering> + Send,
    {
        let entries = walk_dir(&self.root, Some(f));
        Self {
            root: self.root,
            entries,
        }
    }
}
impl Stream for WalkDir {
type Item = Result<DirEntry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let entries = Pin::new(&mut self.entries);
entries.poll_next(cx)
}
}
/// Builds the boxed stream that walks everything beneath `root`.
///
/// The stream is driven by `stream::unfold` over a `State` value:
/// the first step opens the root directory, later steps continue the walk
/// from the stack of open directories. If the root cannot be opened, the
/// error is yielded once and the stream then ends.
fn walk_dir<F, Fut>(root: impl AsRef<Path>, filter: Option<F>) -> BoxStream
where
    F: FnMut(DirEntry) -> Fut + Send + 'static,
    Fut: Future<Output = Filtering> + Send,
{
    let initial = State::Start((root.as_ref().to_owned(), filter));

    stream::unfold(initial, |state| async move {
        match state {
            State::Start((root, filter)) => match read_dir(root).await {
                Ok(rd) => walk(vec![rd], filter).await,
                Err(e) => Some((Err(e), State::Done)),
            },
            State::Walk((dirs, filter)) => walk(dirs, filter).await,
            State::Done => None,
        }
    })
    .boxed()
}
/// State threaded through `stream::unfold` by `walk_dir`.
enum State<F> {
/// Nothing has been read yet; holds the root path and the optional filter.
Start((PathBuf, Option<F>)),
/// Walk in progress; holds the stack of open directories (the one currently
/// being read is last) and the optional filter.
Walk((Vec<ReadDir>, Option<F>)),
/// Terminal state: the stream yields nothing further.
Done,
}
/// Output of one `stream::unfold` step: the item to yield plus the state to resume from.
type UnfoldState<F> = (Result<DirEntry>, State<F>);
fn walk<F, Fut>(mut dirs: Vec<ReadDir>, filter: Option<F>) -> BoxedFut<Option<UnfoldState<F>>>
where
F: FnMut(DirEntry) -> Fut + Send + 'static,
Fut: Future<Output = Filtering> + Send,
{
async move {
if let Some(dir) = dirs.last_mut() {
match dir.next().await {
Some(Ok(entry)) => walk_entry(entry, dirs, filter).await,
Some(Err(e)) => Some((Err(e), State::Walk((dirs, filter)))),
None => {
dirs.pop();
walk(dirs, filter).await
}
}
} else {
None
}
}
.boxed()
}
/// Handles a single entry read from the innermost open directory.
///
/// The entry's file type is looked up, then the filter (if any) is consulted:
///
/// * [`Filtering::Continue`] yields the entry; a directory is also opened and
///   pushed onto `dirs` so its contents are walked next.
/// * [`Filtering::Ignore`] skips the entry but still descends into a directory.
/// * [`Filtering::IgnoreDir`] skips the entry and does not open a directory at
///   all, so no handle is wasted on it and no error from opening it can be
///   reported for something the caller asked to ignore.
///
/// Errors from the file-type lookup or from opening a directory are yielded
/// in place of the entry; the walk continues afterwards.
fn walk_entry<F, Fut>(
    entry: DirEntry,
    mut dirs: Vec<ReadDir>,
    mut filter: Option<F>,
) -> BoxedFut<Option<UnfoldState<F>>>
where
    F: FnMut(DirEntry) -> Fut + Send + 'static,
    Fut: Future<Output = Filtering> + Send,
{
    async move {
        let file_type = match entry.file_type().await {
            Ok(ft) => ft,
            Err(e) => return Some((Err(e), State::Walk((dirs, filter)))),
        };

        let filtering = match filter.as_mut() {
            Some(filter) => filter(entry.clone()).await,
            None => Filtering::Continue,
        };

        // Only open the directory when it will actually be descended into.
        if file_type.is_dir() && filtering != Filtering::IgnoreDir {
            match read_dir(entry.path()).await {
                Ok(rd) => dirs.push(rd),
                Err(e) => return Some((Err(e), State::Walk((dirs, filter)))),
            }
        }

        match filtering {
            Filtering::Continue => Some((Ok(entry), State::Walk((dirs, filter)))),
            // Nothing to yield for this entry: keep walking until there is.
            Filtering::IgnoreDir | Filtering::Ignore => walk(dirs, filter).await,
        }
    }
    .boxed()
}
#[cfg(test)]
mod tests {
    use std::io::{ErrorKind, Result};
    use std::path::{Path, PathBuf};

    use futures_lite::future::block_on;
    use futures_lite::stream::StreamExt;

    use super::{Filtering, WalkDir};

    /// Paths of the fixture tree created by `make_tree`:
    ///
    /// ```text
    /// root/
    ///   f1.txt
    ///   d1/
    ///     f2.txt
    ///     d2/
    ///       f3.txt
    /// ```
    struct Tree {
        f1: PathBuf,
        d1: PathBuf,
        f2: PathBuf,
        d2: PathBuf,
        f3: PathBuf,
    }

    /// Creates the fixture tree beneath `root` and returns the paths it contains.
    async fn make_tree(root: &Path) -> Result<Tree> {
        let f1 = root.join("f1.txt");
        let d1 = root.join("d1");
        let f2 = d1.join("f2.txt");
        let d2 = d1.join("d2");
        let f3 = d2.join("f3.txt");

        async_fs::create_dir_all(&d2).await?;
        async_fs::write(&f1, []).await?;
        async_fs::write(&f2, []).await?;
        async_fs::write(&f3, []).await?;

        Ok(Tree { f1, d1, f2, d2, f3 })
    }

    /// Drains `wd`, panicking on any error, and returns the yielded paths sorted.
    ///
    /// Sorting makes the assertions independent of the order in which the
    /// file system lists directory entries.
    async fn collect_sorted(mut wd: WalkDir) -> Vec<PathBuf> {
        let mut got = Vec::new();
        while let Some(entry) = wd.next().await {
            got.push(entry.unwrap().path());
        }
        got.sort();
        got
    }

    #[test]
    fn walk_dir_empty() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let mut wd = WalkDir::new(root.path());
            assert!(wd.next().await.is_none());
            Ok(())
        })
    }

    #[test]
    fn walk_dir_not_exist() -> Result<()> {
        block_on(async {
            // A path inside a fresh temp dir is guaranteed not to exist,
            // regardless of the directory the tests are run from.
            let root = tempfile::tempdir()?;
            let mut wd = WalkDir::new(root.path().join("foobar"));
            match wd.next().await.unwrap() {
                Ok(_) => panic!("want error"),
                Err(e) => assert_eq!(e.kind(), ErrorKind::NotFound),
            }
            // The stream ends after reporting that the root could not be opened.
            assert!(wd.next().await.is_none());
            Ok(())
        })
    }

    #[test]
    fn walk_dir_files() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let Tree { f1, d1, f2, d2, f3 } = make_tree(root.path()).await?;
            // Listed in sorted order to match `collect_sorted`.
            let want = vec![d1, d2, f3, f2, f1];

            let got = collect_sorted(WalkDir::new(root.path())).await;

            assert_eq!(got, want);
            Ok(())
        })
    }

    #[test]
    fn filter_dirs() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let Tree { f1, f2, f3, .. } = make_tree(root.path()).await?;
            // Directories are hidden but still traversed, so every file shows up.
            let want = vec![f3, f2, f1];

            let wd = WalkDir::new(root.path()).filter(|entry| async move {
                match entry.file_type().await {
                    Ok(ft) if ft.is_dir() => Filtering::Ignore,
                    _ => Filtering::Continue,
                }
            });
            let got = collect_sorted(wd).await;

            assert_eq!(got, want);
            Ok(())
        })
    }

    #[test]
    fn filter_dirs_no_traverse() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let Tree { f1, d1, f2, d2, .. } = make_tree(root.path()).await?;
            // `d2` is neither yielded nor entered, so `f3` never appears.
            let want = vec![d1, f2, f1];

            let wd = WalkDir::new(root.path()).filter(move |entry| {
                let d2 = d2.clone();
                async move {
                    if entry.path() == d2 {
                        Filtering::IgnoreDir
                    } else {
                        Filtering::Continue
                    }
                }
            });
            let got = collect_sorted(wd).await;

            assert_eq!(got, want);
            Ok(())
        })
    }
}