1#![warn(missing_docs)]
2
3use futures::stream::{self, Stream};
8use std::path::Path;
9use tokio::{
10 fs,
11 sync::{Semaphore, mpsc},
12};
13
14mod errors;
15#[cfg(feature = "glob")]
16mod glob;
17
18pub use errors::WalkError;
19
/// Settings that control how a [`Walker`] traverses a directory tree.
///
/// Use [`WalkOptions::default`] plus struct-update syntax to override only the
/// fields you care about.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalkOptions {
    /// When `true`, directories are descended into; when `false`, only the
    /// starting path itself is visited.
    pub recursive: bool,
    /// When `true`, entry metadata is resolved through symbolic links;
    /// when `false`, the metadata of the link itself is reported.
    pub follow_symlinks: bool,
    /// Patterns handed to the glob matcher as the "include" set.
    ///
    /// NOTE(review): the exact pattern syntax and matching rules live in the
    /// `glob` module and are not restated here.
    #[cfg(feature = "glob")]
    pub include_patterns: Vec<String>,
    /// Patterns handed to the glob matcher as the "exclude" set.
    ///
    /// NOTE(review): the exact pattern syntax and matching rules live in the
    /// `glob` module and are not restated here.
    #[cfg(feature = "glob")]
    pub exclude_patterns: Vec<String>,
    /// Capacity of the bounded channel that buffers entries for the stream
    /// returned by [`Walker::walk`].
    pub channel_size: usize,
    /// Number of permits in the semaphore created for each walk; `None`
    /// requests no practical limit.
    pub concurrency_limit: Option<usize>,
}
38
39impl Default for WalkOptions {
40 fn default() -> Self {
41 Self {
42 recursive: true,
43 follow_symlinks: false,
44 #[cfg(feature = "glob")]
45 include_patterns: Vec::new(),
46 #[cfg(feature = "glob")]
47 exclude_patterns: Vec::new(),
48 concurrency_limit: Some(10),
49 channel_size: 64,
50 }
51 }
52}
53
/// A single filesystem entry produced by a walk.
#[derive(Debug, Clone)]
pub struct WalkEntry {
    /// Path of the entry as it was visited.
    pub path: std::path::PathBuf,
    /// Metadata for `path`. It is resolved through symlinks when
    /// `WalkOptions::follow_symlinks` is set, otherwise it describes the
    /// link itself.
    pub metadata: std::fs::Metadata,
    /// `metadata.is_dir()` captured at visit time.
    pub is_dir: bool,
    /// `metadata.is_file()` captured at visit time.
    pub is_file: bool,
    /// Whether `path` itself is a symbolic link, regardless of whether
    /// `metadata` was resolved through it.
    pub is_symlink: bool,
}
68
69#[derive(Debug, Clone)]
71pub struct Walker {
72 options: WalkOptions,
73 #[cfg(feature = "glob")]
74 glob_set: Option<glob::GlobSet>,
75}
76
77impl Walker {
78 pub fn new() -> Result<Self, WalkError> {
80 Self::with_options(WalkOptions::default())
81 }
82
83 pub fn with_options(options: WalkOptions) -> Result<Self, WalkError> {
85 #[cfg(feature = "glob")]
86 let mut walker = Self { options, glob_set: None };
87 #[cfg(not(feature = "glob"))]
88 let walker = Self { options };
89
90 #[cfg(feature = "glob")]
91 walker.build_glob_set()?;
92 Ok(walker)
93 }
94
95 #[cfg(feature = "glob")]
97 fn build_glob_set(&mut self) -> Result<(), WalkError> {
98 self.glob_set = Some(glob::build_glob_set(&self.options.include_patterns, &self.options.exclude_patterns)?);
99 Ok(())
100 }
101
102 #[cfg(feature = "glob")]
104 fn should_include(&self, path: &Path) -> bool {
105 glob::should_include(path, &self.glob_set)
106 }
107
108 #[cfg(not(feature = "glob"))]
110 fn should_include(&self, _path: &Path) -> bool {
111 true
112 }
113
114 pub async fn walk<P: AsRef<Path>>(
137 &self,
138 path: P,
139 ) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>> + use<P>, WalkError> {
140 let path = path.as_ref().to_path_buf();
141 let (tx, rx) = mpsc::channel(self.options.channel_size);
142
143 let semaphore =
144 self.options.concurrency_limit.map(|limit| Semaphore::new(limit)).unwrap_or_else(|| Semaphore::new(usize::MAX));
145
146 let self_clone = self.clone();
147
148 tokio::spawn(async move {
149 let _ = self_clone.walk_recursive(&path, tx, &semaphore).await;
150 });
151
152 let stream = stream::unfold(rx, |mut rx| async move {
153 match rx.recv().await {
154 Some(item) => Some((item, rx)),
155 None => None,
156 }
157 });
158
159 Ok(stream)
160 }
161
162 async fn walk_recursive(
164 &self,
165 path: &Path,
166 tx: mpsc::Sender<Result<WalkEntry, WalkError>>,
167 semaphore: &Semaphore,
168 ) -> Result<(), WalkError> {
169 if !self.should_include(path) {
170 return Ok(());
171 }
172
173 let metadata =
174 if self.options.follow_symlinks { fs::metadata(path).await? } else { fs::symlink_metadata(path).await? };
175
176 let is_dir = metadata.is_dir();
177 let is_file = metadata.is_file();
178 let is_symlink = path.symlink_metadata()?.file_type().is_symlink();
179 let entry = WalkEntry { path: path.to_path_buf(), metadata, is_dir, is_file, is_symlink };
180
181 if tx.send(Ok(entry)).await.is_err() {
182 return Ok(());
183 }
184
185 if is_dir && self.options.recursive {
186 let mut dir_entries = fs::read_dir(path).await?;
187
188 while let Some(entry) = dir_entries.next_entry().await? {
189 let entry_path = entry.path();
190
191 if self.should_include(&entry_path) {
192 let permit = semaphore.acquire().await?;
193 let tx_clone = tx.clone();
194 let semaphore = semaphore;
195 let self_clone = self.clone();
196
197 Box::pin(self_clone.walk_recursive(&entry_path, tx_clone, &semaphore)).await?;
198 drop(permit);
199 }
200 }
201 }
202
203 Ok(())
204 }
205
206 pub async fn walk_with<P: AsRef<Path>, F: Fn(&WalkEntry) -> Result<(), WalkError>>(
208 &self,
209 path: P,
210 mut callback: F,
211 ) -> Result<(), WalkError> {
212 let path = path.as_ref();
213 let semaphore =
214 self.options.concurrency_limit.map(|limit| Semaphore::new(limit)).unwrap_or_else(|| Semaphore::new(usize::MAX));
215
216 self.walk_with_recursive(path, &mut callback, &semaphore).await
217 }
218
219 async fn walk_with_recursive<F: Fn(&WalkEntry) -> Result<(), WalkError>>(
221 &self,
222 path: &Path,
223 callback: &mut F,
224 semaphore: &Semaphore,
225 ) -> Result<(), WalkError> {
226 if !self.should_include(path) {
227 return Ok(());
228 }
229
230 let metadata =
231 if self.options.follow_symlinks { fs::metadata(path).await? } else { fs::symlink_metadata(path).await? };
232
233 let is_dir = metadata.is_dir();
234 let is_file = metadata.is_file();
235 let is_symlink = path.symlink_metadata()?.file_type().is_symlink();
236 let entry = WalkEntry { path: path.to_path_buf(), metadata, is_dir, is_file, is_symlink };
237
238 callback(&entry)?;
239
240 if is_dir && self.options.recursive {
241 let mut dir_entries = fs::read_dir(path).await?;
242
243 while let Some(entry) = dir_entries.next_entry().await? {
244 let entry_path = entry.path();
245
246 if self.should_include(&entry_path) {
247 let permit = semaphore.acquire().await?;
248 let semaphore = semaphore;
249 let self_clone = self.clone();
250
251 Box::pin(self_clone.walk_with_recursive(&entry_path, callback, &semaphore)).await?;
252 drop(permit);
253 }
254 }
255 }
256
257 Ok(())
258 }
259}
260
261pub async fn walk<P: AsRef<Path>>(path: P) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
263 Walker::new()?.walk(path).await
264}
265
266pub async fn walk_with_options<P: AsRef<Path>>(
268 path: P,
269 options: WalkOptions,
270) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
271 Walker::with_options(options)?.walk(path).await
272}