1use std::{path::Path, sync::Arc};
2use tokio::{sync::Mutex, fs::{DirEntry, ReadDir}, task::JoinSet};
3
4mod error;
5
6pub use error::*;
7
8pub async fn walk_dir_linear<Func, Fut, E>(
12 path: impl AsRef<Path>,
13 mut entry_function: Func,
14) -> Result<std::result::Result<(), E>>
15where
16 Func: FnMut(tokio::fs::DirEntry) -> Fut,
17 Fut: std::future::Future<Output = std::result::Result<(), E>>,
18{
19 let mut cached_dirs = vec![tokio::fs::read_dir(path).await?];
20
21 while let Some(mut dir) = cached_dirs.pop() {
22 if let Some(entry) = dir.next_entry().await? {
23 cached_dirs.push(dir);
24
25 if entry.file_type().await?.is_dir() {
26 cached_dirs.push(tokio::fs::read_dir(entry.path()).await?);
28 } else if let Err(e) = entry_function(entry).await {
29 return Ok(Err(e));
30 }
31 }
32 }
33
34 Ok(Ok(()))
35}
36
37pub async fn walk_dir_concurrently<Func, Fut, E>(
41 path: impl AsRef<Path>,
42 count: usize,
43 entry_function: Func,
44) -> Result<std::result::Result<(), E>>
45where
46 Func: Fn(tokio::fs::DirEntry) -> Fut + Send + Sync + 'static,
47 Fut: std::future::Future<Output = std::result::Result<(), E>> + Send + Sync,
48 E: Send + Sync + 'static,
49{
50 let cached_dirs = Arc::new(Mutex::new(vec![tokio::fs::read_dir(path).await?]));
51 let entry_function = Arc::new(entry_function);
52
53 let mut join_set = JoinSet::new();
54
55 for _ in 0..count {
56 let entry_function = entry_function.clone();
57
58 let mut cached_dirs = cached_dirs.lock().await;
59
60 if let Some(entry) = get_next_file(&mut cached_dirs).await? {
61 join_set.spawn(async move {
62 entry_function(entry).await
63 });
64 }
65 }
66
67 while let Some(v) = join_set.join_next().await {
68 if let Err(e) = v? {
69 return Ok(Err(e));
70 }
71
72 let entry_function = entry_function.clone();
73
74 let mut cached_dirs = cached_dirs.lock().await;
75
76 if let Some(entry) = get_next_file(&mut cached_dirs).await? {
77 join_set.spawn(async move {
78 entry_function(entry).await
79 });
80 }
81 }
82
83 Ok(Ok(()))
84}
85
86pub async fn walk_dir_concurrently_with_error_handler<EntryFunc, ErrorFunc, EntryFut, E>(
92 path: impl AsRef<Path>,
93 count: usize,
94 entry_function: EntryFunc,
95 error_function: ErrorFunc,
96) -> Result<std::result::Result<(), E>>
97where
98 EntryFunc: Fn(tokio::fs::DirEntry) -> EntryFut + Send + Sync + 'static,
99 ErrorFunc: Fn(E) -> std::result::Result<(), E>,
100 EntryFut: std::future::Future<Output = std::result::Result<(), E>> + Send + Sync,
101 E: std::error::Error + From<std::io::Error> + Send + Sync + 'static,
102{
103 let cached_dirs = Arc::new(Mutex::new(vec![tokio::fs::read_dir(path).await?]));
104 let entry_function = Arc::new(entry_function);
105
106 let mut join_set = JoinSet::new();
107
108 for _ in 0..count {
109 let entry_function = entry_function.clone();
110
111 let mut cached_dirs = cached_dirs.lock().await;
112
113 if let Some(entry) = get_next_file(&mut cached_dirs).await? {
114 join_set.spawn(async move {
115 entry_function(entry).await
116 });
117 }
118 }
119
120 while let Some(v) = join_set.join_next().await {
121 if let Err(e) = v? {
122 if let Err(e) = error_function(e) {
123 return Ok(Err(e));
124 }
125 }
126
127 let entry_function = entry_function.clone();
128
129 let mut cached_dirs = cached_dirs.lock().await;
130
131 if let Some(entry) = get_next_file(&mut cached_dirs).await? {
132 join_set.spawn(async move {
133 entry_function(entry).await
134 });
135 }
136 }
137
138 Ok(Ok(()))
139}
140
141
142
143async fn get_next_file(cached_dirs: &mut Vec<ReadDir>) -> Result<Option<DirEntry>> {
144 while let Some(mut dir) = cached_dirs.pop() {
145 if let Some(entry) = dir.next_entry().await? {
146 cached_dirs.push(dir);
147
148 if entry.file_type().await?.is_dir() {
149 cached_dirs.push(tokio::fs::read_dir(entry.path()).await?);
151 } else {
152 return Ok(Some(entry));
153 }
154 }
155 }
156
157 Ok(None)
158}