1#![forbid(unsafe_code)]
82#![deny(missing_docs)]
83
84mod error;
85
86use std::future::Future;
87use std::path::{Path, PathBuf};
88use std::pin::Pin;
89use std::task::{Context, Poll};
90
91use async_fs::{read_dir, ReadDir};
92use futures_lite::future::Boxed as BoxedFut;
93use futures_lite::future::FutureExt;
94use futures_lite::stream::{self, Stream, StreamExt};
95
96#[doc(no_inline)]
97pub use async_fs::DirEntry;
98
99pub use error::Error;
100use error::InnerError;
101
/// A specialized [`Result`](std::result::Result) type whose error variant is this crate's
/// [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
104
/// Boxed, type-erased stream of walk results; this is the stream stored in and polled by
/// [`WalkDir`].
type BoxStream = futures_lite::stream::Boxed<Result<DirEntry>>;
106
107pub struct WalkDir {
117 root: PathBuf,
118 entries: BoxStream,
119}
120
/// Decision returned by a [`WalkDir::filter`] callback for a single directory entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Filtering {
    /// Do not yield the entry; if it is a directory, its contents are still traversed.
    Ignore,
    /// Do not yield the entry and, if it is a directory, do not traverse its contents.
    IgnoreDir,
    /// Yield the entry and, if it is a directory, traverse its contents.
    Continue,
}
132
133impl WalkDir {
134 pub fn new(root: impl AsRef<Path>) -> Self {
136 Self {
137 root: root.as_ref().to_owned(),
138 entries: walk_dir(
139 root,
140 None::<Box<dyn FnMut(DirEntry) -> BoxedFut<Filtering> + Send>>,
141 ),
142 }
143 }
144
145 pub fn filter<F, Fut>(self, f: F) -> Self
147 where
148 F: FnMut(DirEntry) -> Fut + Send + 'static,
149 Fut: Future<Output = Filtering> + Send,
150 {
151 let root = self.root.clone();
152 Self {
153 root: self.root,
154 entries: walk_dir(root, Some(f)),
155 }
156 }
157}
158
159impl Stream for WalkDir {
160 type Item = Result<DirEntry>;
161
162 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
163 let entries = Pin::new(&mut self.entries);
164 entries.poll_next(cx)
165 }
166}
167
168fn walk_dir<F, Fut>(root: impl AsRef<Path>, filter: Option<F>) -> BoxStream
169where
170 F: FnMut(DirEntry) -> Fut + Send + 'static,
171 Fut: Future<Output = Filtering> + Send,
172{
173 stream::unfold(
174 State::Start((root.as_ref().to_owned(), filter)),
175 move |state| async move {
176 match state {
177 State::Start((root, filter)) => match read_dir(&root).await {
178 Err(source) => Some((
179 Err(InnerError::Io { path: root, source }.into()),
180 State::Done,
181 )),
182 Ok(rd) => walk(vec![(root, rd)], filter).await,
183 },
184 State::Walk((dirs, filter)) => walk(dirs, filter).await,
185 State::Done => None,
186 }
187 },
188 )
189 .boxed()
190}
191
/// State threaded through `stream::unfold` by `walk_dir`; `F` is the filter callback type.
enum State<F> {
    /// Nothing has been read yet: holds the root path to open and the optional filter.
    Start((PathBuf, Option<F>)),
    /// Walk in progress: holds the stack of open directories (the last element is the
    /// one currently being read) and the optional filter.
    Walk((Vec<(PathBuf, ReadDir)>, Option<F>)),
    /// The stream is finished; the next poll yields `None`.
    Done,
}
197
/// Pair produced by one unfold step: the item to yield and the state to resume from.
type UnfoldState<F> = (Result<DirEntry>, State<F>);
199
200fn walk<F, Fut>(
201 mut dirs: Vec<(PathBuf, ReadDir)>,
202 filter: Option<F>,
203) -> BoxedFut<Option<UnfoldState<F>>>
204where
205 F: FnMut(DirEntry) -> Fut + Send + 'static,
206 Fut: Future<Output = Filtering> + Send,
207{
208 async move {
209 if let Some((path, dir)) = dirs.last_mut() {
210 match dir.next().await {
211 Some(Ok(entry)) => walk_entry(entry, dirs, filter).await,
212 Some(Err(source)) => Some((
213 Err(InnerError::Io {
214 path: path.to_path_buf(),
215 source,
216 }
217 .into()),
218 State::Walk((dirs, filter)),
219 )),
220 None => {
221 dirs.pop();
222 walk(dirs, filter).await
223 }
224 }
225 } else {
226 None
227 }
228 }
229 .boxed()
230}
231
232fn walk_entry<F, Fut>(
233 entry: DirEntry,
234 mut dirs: Vec<(PathBuf, ReadDir)>,
235 mut filter: Option<F>,
236) -> BoxedFut<Option<UnfoldState<F>>>
237where
238 F: FnMut(DirEntry) -> Fut + Send + 'static,
239 Fut: Future<Output = Filtering> + Send,
240{
241 async move {
242 match entry.file_type().await {
243 Err(source) => Some((
244 Err(InnerError::Io {
245 path: entry.path(),
246 source,
247 }
248 .into()),
249 State::Walk((dirs, filter)),
250 )),
251 Ok(ft) => {
252 let filtering = match filter.as_mut() {
253 Some(filter) => filter(entry.clone()).await,
254 None => Filtering::Continue,
255 };
256 if ft.is_dir() {
257 let path = entry.path();
258 let rd = match read_dir(&path).await {
259 Err(source) => {
260 return Some((
261 Err(InnerError::Io { path, source }.into()),
262 State::Walk((dirs, filter)),
263 ))
264 }
265 Ok(rd) => rd,
266 };
267 if filtering != Filtering::IgnoreDir {
268 dirs.push((path, rd));
269 }
270 }
271 match filtering {
272 Filtering::Continue => Some((Ok(entry), State::Walk((dirs, filter)))),
273 Filtering::IgnoreDir | Filtering::Ignore => walk(dirs, filter).await,
274 }
275 }
276 }
277 }
278 .boxed()
279}
280
#[cfg(test)]
mod tests {
    use std::io::{ErrorKind, Result};
    use std::path::{Path, PathBuf};

    use futures_lite::future::block_on;
    use futures_lite::stream::StreamExt;

    use super::{Filtering, WalkDir};

    /// Paths of the fixture tree shared by the traversal tests:
    ///
    /// ```text
    /// root
    /// ├── f1.txt
    /// └── d1
    ///     ├── f2.txt
    ///     └── d2
    ///         └── f3.txt
    /// ```
    struct Tree {
        f1: PathBuf,
        d1: PathBuf,
        f2: PathBuf,
        d2: PathBuf,
        f3: PathBuf,
    }

    /// Creates the fixture tree below `root` and returns the paths of its members.
    async fn build_tree(root: &Path) -> Result<Tree> {
        let f1 = root.join("f1.txt");
        let d1 = root.join("d1");
        let f2 = d1.join("f2.txt");
        let d2 = d1.join("d2");
        let f3 = d2.join("f3.txt");

        async_fs::create_dir_all(&d2).await?;
        async_fs::write(&f1, []).await?;
        async_fs::write(&f2, []).await?;
        async_fs::write(&f3, []).await?;

        Ok(Tree { f1, d1, f2, d2, f3 })
    }

    /// Drains `wd`, panicking on the first error, and returns the visited paths sorted.
    async fn sorted_paths(mut wd: WalkDir) -> Vec<PathBuf> {
        let mut paths = Vec::new();
        while let Some(entry) = wd.next().await {
            paths.push(entry.unwrap().path());
        }
        paths.sort();
        paths
    }

    /// An empty root directory produces an empty stream.
    #[test]
    fn walk_dir_empty() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let mut wd = WalkDir::new(root.path());
            assert!(wd.next().await.is_none());
            Ok(())
        })
    }

    /// A missing root is reported as a `NotFound` error carrying the root path.
    #[test]
    fn walk_dir_not_exist() {
        block_on(async {
            let mut wd = WalkDir::new("foobar");
            let e = match wd.next().await.unwrap() {
                Err(e) => e,
                Ok(_) => panic!("want IO error"),
            };
            assert_eq!(wd.root, e.path().unwrap());
            assert_eq!(e.io().unwrap().kind(), ErrorKind::NotFound);
            assert_eq!(e.into_io().unwrap().kind(), ErrorKind::NotFound);
        })
    }

    /// The crate error converts into `std::io::Error` preserving the error kind.
    #[test]
    fn into_io_error() {
        block_on(async {
            let mut wd = WalkDir::new("foobar");
            let e = match wd.next().await.unwrap() {
                Err(e) => e,
                Ok(_) => panic!("want IO error"),
            };
            let e: std::io::Error = e.into();
            assert_eq!(e.kind(), ErrorKind::NotFound);
        })
    }

    /// Without a filter, every file and directory below the root is yielded.
    #[test]
    fn walk_dir_files() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let tree = build_tree(root.path()).await?;

            let got = sorted_paths(WalkDir::new(root.path())).await;

            // Listed in the order produced by sorting the paths.
            let want = vec![tree.d1, tree.d2, tree.f3, tree.f2, tree.f1];
            assert_eq!(got, want);

            Ok(())
        })
    }

    /// `Filtering::Ignore` hides directories but still descends into them.
    #[test]
    fn filter_dirs() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let tree = build_tree(root.path()).await?;

            let walker = WalkDir::new(root.path()).filter(|entry| async move {
                match entry.file_type().await {
                    Ok(ft) if ft.is_dir() => Filtering::Ignore,
                    _ => Filtering::Continue,
                }
            });
            let got = sorted_paths(walker).await;

            let want = vec![tree.f3, tree.f2, tree.f1];
            assert_eq!(got, want);

            Ok(())
        })
    }

    /// `Filtering::IgnoreDir` hides a directory together with everything below it.
    #[test]
    fn filter_dirs_no_traverse() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let Tree { f1, d1, f2, d2, .. } = build_tree(root.path()).await?;

            let walker = WalkDir::new(root.path()).filter(move |entry| {
                let d2 = d2.clone();
                async move {
                    if entry.path() == d2 {
                        Filtering::IgnoreDir
                    } else {
                        Filtering::Continue
                    }
                }
            });
            let got = sorted_paths(walker).await;

            let want = vec![d1, f2, f1];
            assert_eq!(got, want);

            Ok(())
        })
    }
}
441
#[cfg(all(unix, test))]
mod test_unix {
    use async_fs::unix::PermissionsExt;
    use std::io::Result;

    use futures_lite::future::block_on;
    use futures_lite::stream::StreamExt;

    use super::WalkDir;

    /// An unreadable sub-directory is reported as an error carrying that directory's path.
    ///
    /// NOTE(review): this presumably cannot pass when run as root, since permission
    /// checks are then bypassed and the directory stays readable — confirm for CI.
    #[test]
    fn walk_dir_error_path() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let d1 = root.path().join("d1");
            async_fs::create_dir_all(&d1).await?;

            // Make `d1` write-only so that listing it fails.
            let original = async_fs::metadata(&d1).await?.permissions();
            let mut write_only = original.clone();
            write_only.set_mode(0o222);
            async_fs::set_permissions(&d1, write_only).await?;

            let mut wd = WalkDir::new(&root);
            let first = wd.next().await.unwrap();

            // Restore the permissions before asserting: otherwise the temporary
            // directory cannot be listed during cleanup and is left behind on disk.
            async_fs::set_permissions(&d1, original).await?;

            match first {
                Err(e) => assert_eq!(e.path().unwrap(), d1.as_path()),
                _ => panic!("want IO error"),
            }
            Ok(())
        })
    }
}