dir_walkin/
lib.rs

1use std::{path::Path, sync::Arc};
2use tokio::{sync::Mutex, fs::{DirEntry, ReadDir}, task::JoinSet};
3
4mod error;
5
6pub use error::*;
7
8/// Walk directories linearly.
9///
10/// If an error occurs it'll propagate the main function.
11pub async fn walk_dir_linear<Func, Fut, E>(
12    path: impl AsRef<Path>,
13    mut entry_function: Func,
14) -> Result<std::result::Result<(), E>>
15where
16    Func: FnMut(tokio::fs::DirEntry) -> Fut,
17    Fut: std::future::Future<Output = std::result::Result<(), E>>,
18{
19    let mut cached_dirs = vec![tokio::fs::read_dir(path).await?];
20
21    while let Some(mut dir) = cached_dirs.pop() {
22        if let Some(entry) = dir.next_entry().await? {
23            cached_dirs.push(dir);
24
25            if entry.file_type().await?.is_dir() {
26                // We'll read from it in a second.
27                cached_dirs.push(tokio::fs::read_dir(entry.path()).await?);
28            } else if let Err(e) = entry_function(entry).await {
29                return Ok(Err(e));
30            }
31        }
32    }
33
34    Ok(Ok(()))
35}
36
37/// Walk directories concurrently.
38///
39/// If an error occurs it'll propagate the main function.
40pub async fn walk_dir_concurrently<Func, Fut, E>(
41    path: impl AsRef<Path>,
42    count: usize,
43    entry_function: Func,
44) -> Result<std::result::Result<(), E>>
45where
46    Func: Fn(tokio::fs::DirEntry) -> Fut + Send + Sync + 'static,
47    Fut: std::future::Future<Output = std::result::Result<(), E>> + Send + Sync,
48    E: Send + Sync + 'static,
49{
50    let cached_dirs = Arc::new(Mutex::new(vec![tokio::fs::read_dir(path).await?]));
51    let entry_function = Arc::new(entry_function);
52
53    let mut join_set = JoinSet::new();
54
55    for _ in 0..count {
56        let entry_function = entry_function.clone();
57
58        let mut cached_dirs = cached_dirs.lock().await;
59
60        if let Some(entry) = get_next_file(&mut cached_dirs).await? {
61            join_set.spawn(async move {
62                entry_function(entry).await
63            });
64        }
65    }
66
67    while let Some(v) = join_set.join_next().await {
68        if let Err(e) = v? {
69            return Ok(Err(e));
70        }
71
72        let entry_function = entry_function.clone();
73
74        let mut cached_dirs = cached_dirs.lock().await;
75
76        if let Some(entry) = get_next_file(&mut cached_dirs).await? {
77            join_set.spawn(async move {
78                entry_function(entry).await
79            });
80        }
81    }
82
83    Ok(Ok(()))
84}
85
86/// Walk directories concurrently.
87///
88/// If an error occurs you can handle it through `error_function`.
89///
90/// If an error occurs in `error_function` it'll propagate the main function.
91pub async fn walk_dir_concurrently_with_error_handler<EntryFunc, ErrorFunc, EntryFut, E>(
92    path: impl AsRef<Path>,
93    count: usize,
94    entry_function: EntryFunc,
95    error_function: ErrorFunc,
96) -> Result<std::result::Result<(), E>>
97where
98    EntryFunc: Fn(tokio::fs::DirEntry) -> EntryFut + Send + Sync + 'static,
99    ErrorFunc: Fn(E) -> std::result::Result<(), E>,
100    EntryFut: std::future::Future<Output = std::result::Result<(), E>> + Send + Sync,
101    E: std::error::Error + From<std::io::Error> + Send + Sync + 'static,
102{
103    let cached_dirs = Arc::new(Mutex::new(vec![tokio::fs::read_dir(path).await?]));
104    let entry_function = Arc::new(entry_function);
105
106    let mut join_set = JoinSet::new();
107
108    for _ in 0..count {
109        let entry_function = entry_function.clone();
110
111        let mut cached_dirs = cached_dirs.lock().await;
112
113        if let Some(entry) = get_next_file(&mut cached_dirs).await? {
114            join_set.spawn(async move {
115                entry_function(entry).await
116            });
117        }
118    }
119
120    while let Some(v) = join_set.join_next().await {
121        if let Err(e) = v? {
122            if let Err(e) = error_function(e) {
123                return Ok(Err(e));
124            }
125        }
126
127        let entry_function = entry_function.clone();
128
129        let mut cached_dirs = cached_dirs.lock().await;
130
131        if let Some(entry) = get_next_file(&mut cached_dirs).await? {
132            join_set.spawn(async move {
133                entry_function(entry).await
134            });
135        }
136    }
137
138    Ok(Ok(()))
139}
140
141
142
143async fn get_next_file(cached_dirs: &mut Vec<ReadDir>) -> Result<Option<DirEntry>> {
144    while let Some(mut dir) = cached_dirs.pop() {
145        if let Some(entry) = dir.next_entry().await? {
146            cached_dirs.push(dir);
147
148            if entry.file_type().await?.is_dir() {
149                // We'll read from it in a second.
150                cached_dirs.push(tokio::fs::read_dir(entry.path()).await?);
151            } else {
152                return Ok(Some(entry));
153            }
154        }
155    }
156
157    Ok(None)
158}