async_walkdir/
lib.rs

1// Copyright 2020 Ririsoft <riri@ririsoft.com>
2// Copyright 2024 Jordan Danford <jordandanford@gmail.com>
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! A utility for walking through a directory asynchronously and recursively.
17//!
18//! Based on [async-fs](https://docs.rs/async-fs) and [blocking](https://docs.rs/blocking),
//! it uses a thread pool to handle blocking IOs. Please refer to those crates for the rationale.
20//! This crate is compatible with any async runtime based on [futures 0.3](https://docs.rs/futures-core),
21//! which includes [tokio](https://docs.rs/tokio), [async-std](https://docs.rs/async-std) and [smol](https://docs.rs/smol).
22//!
23//! Symbolic links are walked through but they are not followed.
24//!
25//! # Example
26//!
27//! Recursively traverse a directory:
28//!
29//! ```
30//! use async_walkdir::WalkDir;
31//! use futures_lite::future::block_on;
32//! use futures_lite::stream::StreamExt;
33//!
34//! block_on(async {
35//!     let mut entries = WalkDir::new("my_directory");
36//!     loop {
37//!         match entries.next().await {
38//!             Some(Ok(entry)) => println!("file: {}", entry.path().display()),
39//!             Some(Err(e)) => {
40//!                 eprintln!("error: {}", e);
41//!                 break;
42//!             }
43//!             None => break,
44//!         }
45//!     }
46//! });
47//! ```
48//!
49//! Do not recurse through directories whose name starts with '.':
50//!
51//! ```
52//! use async_walkdir::{Filtering, WalkDir};
53//! use futures_lite::future::block_on;
54//! use futures_lite::stream::StreamExt;
55//!
56//! block_on(async {
57//!     let mut entries = WalkDir::new("my_directory").filter(|entry| async move {
58//!         if let Some(true) = entry
59//!             .path()
60//!             .file_name()
61//!             .map(|f| f.to_string_lossy().starts_with('.'))
62//!         {
63//!             return Filtering::IgnoreDir;
64//!         }
65//!         Filtering::Continue
66//!     });
67//!
68//!     loop {
69//!         match entries.next().await {
70//!             Some(Ok(entry)) => println!("file: {}", entry.path().display()),
71//!             Some(Err(e)) => {
72//!                 eprintln!("error: {}", e);
73//!                 break;
74//!             }
75//!             None => break,
76//!         }
77//!     }
78//! });
79//! ```
80
81#![forbid(unsafe_code)]
82#![deny(missing_docs)]
83
84mod error;
85
86use std::future::Future;
87use std::path::{Path, PathBuf};
88use std::pin::Pin;
89use std::task::{Context, Poll};
90
91use async_fs::{read_dir, ReadDir};
92use futures_lite::future::Boxed as BoxedFut;
93use futures_lite::future::FutureExt;
94use futures_lite::stream::{self, Stream, StreamExt};
95
96#[doc(no_inline)]
97pub use async_fs::DirEntry;
98
99pub use error::Error;
100use error::InnerError;
101
/// A specialized [`Result`][`std::result::Result`] type.
///
/// The error variant is always this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

// Boxed stream of walk results: the type stored in `WalkDir::entries` and
// returned by `walk_dir`.
type BoxStream = futures_lite::stream::Boxed<Result<DirEntry>>;
106
/// A `Stream` of `DirEntry` generated from recursively traversing
/// a directory.
///
/// Entries are returned without a specific ordering. The topmost root directory
/// is not returned but child directories are.
///
/// # Panics
///
/// Panics if the directories depth overflows `usize`.
pub struct WalkDir {
    // Directory the walk starts from; kept so that `filter` can rebuild the stream.
    root: PathBuf,
    // Boxed stream that `poll_next` delegates to.
    entries: BoxStream,
}
120
/// Sets the filtering behavior.
///
/// This is the value a filter closure returns for each entry to tell the
/// walker what to do with it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Filtering {
    /// Ignore the current entry.
    Ignore,
    /// Ignore the current entry and, if a directory,
    /// do not traverse its children.
    IgnoreDir,
    /// Continue the normal processing.
    Continue,
}
132
133impl WalkDir {
134    /// Returns a new `Walkdir` starting at `root`.
135    pub fn new(root: impl AsRef<Path>) -> Self {
136        Self {
137            root: root.as_ref().to_owned(),
138            entries: walk_dir(
139                root,
140                None::<Box<dyn FnMut(DirEntry) -> BoxedFut<Filtering> + Send>>,
141            ),
142        }
143    }
144
145    /// Filter entries.
146    pub fn filter<F, Fut>(self, f: F) -> Self
147    where
148        F: FnMut(DirEntry) -> Fut + Send + 'static,
149        Fut: Future<Output = Filtering> + Send,
150    {
151        let root = self.root.clone();
152        Self {
153            root: self.root,
154            entries: walk_dir(root, Some(f)),
155        }
156    }
157}
158
159impl Stream for WalkDir {
160    type Item = Result<DirEntry>;
161
162    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
163        let entries = Pin::new(&mut self.entries);
164        entries.poll_next(cx)
165    }
166}
167
168fn walk_dir<F, Fut>(root: impl AsRef<Path>, filter: Option<F>) -> BoxStream
169where
170    F: FnMut(DirEntry) -> Fut + Send + 'static,
171    Fut: Future<Output = Filtering> + Send,
172{
173    stream::unfold(
174        State::Start((root.as_ref().to_owned(), filter)),
175        move |state| async move {
176            match state {
177                State::Start((root, filter)) => match read_dir(&root).await {
178                    Err(source) => Some((
179                        Err(InnerError::Io { path: root, source }.into()),
180                        State::Done,
181                    )),
182                    Ok(rd) => walk(vec![(root, rd)], filter).await,
183                },
184                State::Walk((dirs, filter)) => walk(dirs, filter).await,
185                State::Done => None,
186            }
187        },
188    )
189    .boxed()
190}
191
/// State threaded through `stream::unfold` by `walk_dir`.
enum State<F> {
    /// Initial state: the root path, not opened yet, and the optional filter.
    Start((PathBuf, Option<F>)),
    /// Walk in progress: the stack of open directories (the one currently
    /// being read is last) and the optional filter.
    Walk((Vec<(PathBuf, ReadDir)>, Option<F>)),
    /// Nothing left to yield; the stream ends on the next step.
    Done,
}
197
/// What one `unfold` step produces: the item to yield and the state to resume from.
type UnfoldState<F> = (Result<DirEntry>, State<F>);
199
200fn walk<F, Fut>(
201    mut dirs: Vec<(PathBuf, ReadDir)>,
202    filter: Option<F>,
203) -> BoxedFut<Option<UnfoldState<F>>>
204where
205    F: FnMut(DirEntry) -> Fut + Send + 'static,
206    Fut: Future<Output = Filtering> + Send,
207{
208    async move {
209        if let Some((path, dir)) = dirs.last_mut() {
210            match dir.next().await {
211                Some(Ok(entry)) => walk_entry(entry, dirs, filter).await,
212                Some(Err(source)) => Some((
213                    Err(InnerError::Io {
214                        path: path.to_path_buf(),
215                        source,
216                    }
217                    .into()),
218                    State::Walk((dirs, filter)),
219                )),
220                None => {
221                    dirs.pop();
222                    walk(dirs, filter).await
223                }
224            }
225        } else {
226            None
227        }
228    }
229    .boxed()
230}
231
232fn walk_entry<F, Fut>(
233    entry: DirEntry,
234    mut dirs: Vec<(PathBuf, ReadDir)>,
235    mut filter: Option<F>,
236) -> BoxedFut<Option<UnfoldState<F>>>
237where
238    F: FnMut(DirEntry) -> Fut + Send + 'static,
239    Fut: Future<Output = Filtering> + Send,
240{
241    async move {
242        match entry.file_type().await {
243            Err(source) => Some((
244                Err(InnerError::Io {
245                    path: entry.path(),
246                    source,
247                }
248                .into()),
249                State::Walk((dirs, filter)),
250            )),
251            Ok(ft) => {
252                let filtering = match filter.as_mut() {
253                    Some(filter) => filter(entry.clone()).await,
254                    None => Filtering::Continue,
255                };
256                if ft.is_dir() {
257                    let path = entry.path();
258                    let rd = match read_dir(&path).await {
259                        Err(source) => {
260                            return Some((
261                                Err(InnerError::Io { path, source }.into()),
262                                State::Walk((dirs, filter)),
263                            ))
264                        }
265                        Ok(rd) => rd,
266                    };
267                    if filtering != Filtering::IgnoreDir {
268                        dirs.push((path, rd));
269                    }
270                }
271                match filtering {
272                    Filtering::Continue => Some((Ok(entry), State::Walk((dirs, filter)))),
273                    Filtering::IgnoreDir | Filtering::Ignore => walk(dirs, filter).await,
274                }
275            }
276        }
277    }
278    .boxed()
279}
280
#[cfg(test)]
mod tests {
    use std::io::{ErrorKind, Result};
    use std::path::{Path, PathBuf};

    use futures_lite::future::block_on;
    use futures_lite::stream::StreamExt;

    use super::{Filtering, WalkDir};

    /// Paths of the small tree shared by the traversal tests:
    ///
    /// ```text
    /// root/f1.txt
    /// root/d1/f2.txt
    /// root/d1/d2/f3.txt
    /// ```
    struct Fixture {
        f1: PathBuf,
        d1: PathBuf,
        f2: PathBuf,
        d2: PathBuf,
        f3: PathBuf,
    }

    impl Fixture {
        /// Creates the tree below `root` and returns the paths it is made of.
        async fn create(root: &Path) -> Result<Self> {
            let d1 = root.join("d1");
            let d2 = d1.join("d2");
            let fixture = Self {
                f1: root.join("f1.txt"),
                f2: d1.join("f2.txt"),
                f3: d2.join("f3.txt"),
                d1,
                d2,
            };

            async_fs::create_dir_all(&fixture.d2).await?;
            async_fs::write(&fixture.f1, []).await?;
            async_fs::write(&fixture.f2, []).await?;
            async_fs::write(&fixture.f3, []).await?;

            Ok(fixture)
        }
    }

    /// Drains `wd`, panicking on the first error, and returns the sorted paths.
    async fn sorted_paths(mut wd: WalkDir) -> Vec<PathBuf> {
        let mut got = Vec::new();
        while let Some(entry) = wd.next().await {
            got.push(entry.unwrap().path());
        }
        got.sort();
        got
    }

    #[test]
    fn walk_dir_empty() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let mut wd = WalkDir::new(root.path());
            assert!(wd.next().await.is_none());
            Ok(())
        })
    }

    #[test]
    fn walk_dir_not_exist() {
        block_on(async {
            let mut wd = WalkDir::new("foobar");
            let e = match wd.next().await.unwrap() {
                Err(e) => e,
                _ => panic!("want IO error"),
            };
            assert_eq!(wd.root, e.path().unwrap());
            assert_eq!(e.io().unwrap().kind(), ErrorKind::NotFound);
            assert_eq!(e.into_io().unwrap().kind(), ErrorKind::NotFound);
        })
    }

    #[test]
    fn into_io_error() {
        block_on(async {
            let mut wd = WalkDir::new("foobar");
            let e: std::io::Error = match wd.next().await.unwrap() {
                Err(e) => e.into(),
                _ => panic!("want IO error"),
            };
            assert_eq!(e.kind(), ErrorKind::NotFound);
        })
    }

    #[test]
    fn walk_dir_files() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let Fixture { f1, d1, f2, d2, f3 } = Fixture::create(root.path()).await?;

            // Listed in the order produced by sorting the paths.
            let want = vec![d1, d2, f3, f2, f1];

            let got = sorted_paths(WalkDir::new(root.path())).await;
            assert_eq!(got, want);

            Ok(())
        })
    }

    #[test]
    fn filter_dirs() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let Fixture { f1, f2, f3, .. } = Fixture::create(root.path()).await?;

            // Directories are hidden but still traversed.
            let want = vec![f3, f2, f1];

            let wd = WalkDir::new(root.path()).filter(|entry| async move {
                match entry.file_type().await {
                    Ok(ft) if ft.is_dir() => Filtering::Ignore,
                    _ => Filtering::Continue,
                }
            });

            let got = sorted_paths(wd).await;
            assert_eq!(got, want);

            Ok(())
        })
    }

    #[test]
    fn filter_dirs_no_traverse() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let Fixture { f1, d1, f2, d2, .. } = Fixture::create(root.path()).await?;

            // `d2` is neither yielded nor traversed, so `f3.txt` is never seen.
            let want = vec![d1, f2, f1];

            let wd = WalkDir::new(root.path()).filter(move |entry| {
                let d2 = d2.clone();
                async move {
                    if entry.path() == d2 {
                        Filtering::IgnoreDir
                    } else {
                        Filtering::Continue
                    }
                }
            });

            let got = sorted_paths(wd).await;
            assert_eq!(got, want);

            Ok(())
        })
    }
}
441
#[cfg(all(unix, test))]
mod test_unix {
    use async_fs::unix::PermissionsExt;
    use std::io::Result;

    use futures_lite::future::block_on;
    use futures_lite::stream::StreamExt;

    use super::WalkDir;

    /// An unreadable sub-directory must be reported with its own path.
    #[test]
    fn walk_dir_error_path() -> Result<()> {
        block_on(async {
            let root = tempfile::tempdir()?;
            let d1 = root.path().join("d1");
            async_fs::create_dir_all(&d1).await?;

            // Make `d1` write-only so that opening it for listing fails.
            let original = async_fs::metadata(&d1).await?.permissions();
            let mut perms = original.clone();
            perms.set_mode(0o222);
            async_fs::set_permissions(&d1, perms).await?;

            let mut wd = WalkDir::new(&root);
            let first = wd.next().await;

            // Restore the permissions before asserting: otherwise the
            // temporary directory cannot be cleaned up when `root` is dropped.
            async_fs::set_permissions(&d1, original).await?;

            match first.unwrap() {
                Err(e) => assert_eq!(e.path().unwrap(), d1.as_path()),
                _ => panic!("want IO error"),
            }
            Ok(())
        })
    }
}
468}