1#![warn(missing_docs)]
2
3use futures::stream::{self, Stream};
8use std::{num::NonZeroU32, path::Path};
9use tokio::{
10 fs,
11 sync::{Semaphore, mpsc},
12};
13
14mod errors;
15#[cfg(feature = "glob")]
16mod glob;
17
18pub use errors::WalkError;
19
/// Configuration for a directory walk.
///
/// Start from `WalkOptions::default()` and override the fields that matter.
#[derive(Debug, Clone)]
pub struct WalkOptions {
    /// Descend into directories found during the walk. When `false`, only the starting path
    /// itself is visited.
    pub recursive: bool,
    /// Read metadata through symbolic links (`fs::metadata`) instead of describing the link
    /// itself (`fs::symlink_metadata`).
    pub follow_symlinks: bool,
    /// Include patterns handed to the `glob` module when a walker is built.
    // NOTE(review): exact matching rules live in the `glob` module — confirm there.
    #[cfg(feature = "glob")]
    pub include_patterns: Vec<String>,
    /// Exclude patterns handed to the `glob` module when a walker is built.
    // NOTE(review): exact matching rules live in the `glob` module — confirm there.
    #[cfg(feature = "glob")]
    pub exclude_patterns: Vec<String>,
    /// Capacity of the bounded channel that feeds the stream returned by a walk.
    pub channel_size: NonZeroU32,
    /// Number of permits in the semaphore created for each walk.
    pub concurrency_limit: NonZeroU32,
}

impl Default for WalkOptions {
    /// Recursive walk that does not follow symlinks, with no glob patterns, a channel of 64
    /// entries and 10 semaphore permits.
    fn default() -> Self {
        // Both literals are non-zero, so neither `unwrap` can fail.
        let channel_size = NonZeroU32::new(64).unwrap();
        let concurrency_limit = NonZeroU32::new(10).unwrap();

        Self {
            recursive: true,
            follow_symlinks: false,
            #[cfg(feature = "glob")]
            include_patterns: vec![],
            #[cfg(feature = "glob")]
            exclude_patterns: vec![],
            channel_size,
            concurrency_limit,
        }
    }
}
53
/// A single filesystem entry produced by a walk.
#[derive(Debug, Clone)]
pub struct WalkEntry {
    /// Location of the entry.
    pub path: std::path::PathBuf,
    /// Metadata captured for `path` when the entry was created.
    pub metadata: std::fs::Metadata,
}

impl WalkEntry {
    /// Returns `true` when the captured metadata describes a directory.
    pub fn is_dir(&self) -> bool {
        self.metadata.file_type().is_dir()
    }

    /// Returns `true` when the captured metadata describes a regular file.
    pub fn is_file(&self) -> bool {
        self.metadata.file_type().is_file()
    }

    /// Returns `true` when `path` currently refers to a symbolic link.
    ///
    /// This does not consult the captured `metadata`: it performs a fresh, synchronous
    /// `symlink_metadata` lookup on every call and reports `false` if that lookup fails.
    pub fn is_symlink(&self) -> bool {
        match self.path.symlink_metadata() {
            Ok(link_meta) => link_meta.file_type().is_symlink(),
            Err(_) => false,
        }
    }
}
79
80#[derive(Debug, Clone)]
82pub struct Walker {
83 options: WalkOptions,
84 #[cfg(feature = "glob")]
85 glob_set: Option<glob::GlobSet>,
86}
87
88impl Walker {
89 pub fn new() -> Result<Self, WalkError> {
91 Self::with_options(WalkOptions::default())
92 }
93
94 pub fn with_options(options: WalkOptions) -> Result<Self, WalkError> {
96 #[cfg(feature = "glob")]
97 let mut walker = Self { options, glob_set: None };
98 #[cfg(not(feature = "glob"))]
99 let walker = Self { options };
100
101 #[cfg(feature = "glob")]
102 walker.build_glob_set()?;
103 Ok(walker)
104 }
105
106 #[cfg(feature = "glob")]
108 fn build_glob_set(&mut self) -> Result<(), WalkError> {
109 self.glob_set = Some(glob::build_glob_set(&self.options.include_patterns, &self.options.exclude_patterns)?);
110 Ok(())
111 }
112
113 #[cfg(feature = "glob")]
115 fn should_include(&self, path: &Path) -> bool {
116 glob::should_include(path, &self.glob_set)
117 }
118
119 #[cfg(not(feature = "glob"))]
121 fn should_include(&self, _path: &Path) -> bool {
122 true
123 }
124
125 pub async fn walk<P: AsRef<Path>>(
148 &self,
149 path: P,
150 ) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>> + Unpin + use<P>, WalkError> {
151 let path = path.as_ref().to_path_buf();
152 let (tx, rx) = mpsc::channel(self.options.channel_size.get() as usize);
153
154 let semaphore = Semaphore::new(self.options.concurrency_limit.get() as usize);
155
156 let self_clone = self.clone();
157
158 tokio::spawn(async move {
159 let _ = self_clone.walk_recursive(&path, tx, &semaphore).await;
160 });
161
162 let stream = stream::unfold(rx, |mut rx| async move {
163 rx.recv().await.map(|item| (item, rx))
164 });
165
166 Ok(Box::pin(stream))
167 }
168
169 async fn walk_recursive(
171 &self,
172 path: &Path,
173 tx: mpsc::Sender<Result<WalkEntry, WalkError>>,
174 semaphore: &Semaphore,
175 ) -> Result<(), WalkError> {
176 if !self.should_include(path) {
177 return Ok(());
178 }
179
180 let metadata =
181 if self.options.follow_symlinks { fs::metadata(path).await? } else { fs::symlink_metadata(path).await? };
182
183 let is_dir = metadata.is_dir();
184 let entry = WalkEntry { path: path.to_path_buf(), metadata };
185
186 if tx.send(Ok(entry)).await.is_err() {
187 return Ok(());
188 }
189
190 if is_dir && self.options.recursive {
191 let mut dir_entries = fs::read_dir(path).await?;
192
193 while let Some(entry) = dir_entries.next_entry().await? {
194 let entry_path = entry.path();
195
196 if self.should_include(&entry_path) {
197 let permit = semaphore.acquire().await?;
198 let tx_clone = tx.clone();
199 let self_clone = self.clone();
200
201 Box::pin(self_clone.walk_recursive(&entry_path, tx_clone, semaphore)).await?;
202 drop(permit);
203 }
204 }
205 }
206
207 Ok(())
208 }
209
210 pub async fn walk_with<P: AsRef<Path>, F: Fn(&WalkEntry) -> Result<(), WalkError>>(
212 &self,
213 path: P,
214 mut callback: F,
215 ) -> Result<(), WalkError> {
216 let path = path.as_ref();
217 let semaphore = Semaphore::new(self.options.concurrency_limit.get() as usize);
218
219 self.walk_with_recursive(path, &mut callback, &semaphore).await
220 }
221
222 async fn walk_with_recursive<F: Fn(&WalkEntry) -> Result<(), WalkError>>(
224 &self,
225 path: &Path,
226 callback: &mut F,
227 semaphore: &Semaphore,
228 ) -> Result<(), WalkError> {
229 if !self.should_include(path) {
230 return Ok(());
231 }
232
233 let metadata =
234 if self.options.follow_symlinks { fs::metadata(path).await? } else { fs::symlink_metadata(path).await? };
235
236 let is_dir = metadata.is_dir();
237 let entry = WalkEntry { path: path.to_path_buf(), metadata };
238
239 callback(&entry)?;
240
241 if is_dir && self.options.recursive {
242 let mut dir_entries = fs::read_dir(path).await?;
243
244 while let Some(entry) = dir_entries.next_entry().await? {
245 let entry_path = entry.path();
246
247 if self.should_include(&entry_path) {
248 let permit = semaphore.acquire().await?;
249 let self_clone = self.clone();
250
251 Box::pin(self_clone.walk_with_recursive(&entry_path, callback, semaphore)).await?;
252 drop(permit);
253 }
254 }
255 }
256
257 Ok(())
258 }
259}
260
261pub async fn walk<P: AsRef<Path>>(path: P) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
263 Walker::new()?.walk(path).await
264}
265
266pub async fn walk_with_options<P: AsRef<Path>>(
268 path: P,
269 options: WalkOptions,
270) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
271 Walker::with_options(options)?.walk(path).await
272}