Skip to main content

itools_walker/
lib.rs

1#![warn(missing_docs)]
2
3//! iTools 异步目录遍历模块
4//!
5//! 提供异步的目录遍历功能,支持递归遍历目录结构、并发处理和 glob 模式匹配。
6
7use futures::stream::{self, Stream};
8use std::path::Path;
9use tokio::{
10    fs,
11    sync::{Semaphore, mpsc},
12};
13
14mod errors;
15#[cfg(feature = "glob")]
16mod glob;
17
18pub use errors::WalkError;
19
/// Settings that control how a [`Walker`] traverses the filesystem.
#[derive(Debug, Clone)]
pub struct WalkOptions {
    /// Descend into subdirectories. When `false`, only the starting path's own entry is
    /// produced.
    pub recursive: bool,
    /// Follow symbolic links. When `false`, entries are described by their
    /// `symlink_metadata`, so a link pointing at a directory is reported but never
    /// descended into.
    pub follow_symlinks: bool,
    /// Glob patterns selecting which paths to include (interpreted by the `glob` module).
    #[cfg(feature = "glob")]
    pub include_patterns: Vec<String>,
    /// Glob patterns selecting which paths to exclude (for example gitignore-style rules).
    #[cfg(feature = "glob")]
    pub exclude_patterns: Vec<String>,
    /// Capacity of the bounded channel that backs the stream returned by [`Walker::walk`].
    pub channel_size: usize,
    /// Number of semaphore permits available during a walk; `None` requests no explicit
    /// limit.
    pub concurrency_limit: Option<usize>,
}
38
39impl Default for WalkOptions {
40    fn default() -> Self {
41        Self {
42            recursive: true,
43            follow_symlinks: false,
44            #[cfg(feature = "glob")]
45            include_patterns: Vec::new(),
46            #[cfg(feature = "glob")]
47            exclude_patterns: Vec::new(),
48            concurrency_limit: Some(10),
49            channel_size: 64,
50        }
51    }
52}
53
/// One filesystem entry produced while walking.
#[derive(Debug, Clone)]
pub struct WalkEntry {
    /// Full path of the entry.
    pub path: std::path::PathBuf,
    /// Metadata of the entry. It describes the link target only when symlinks are
    /// followed; otherwise it describes the link itself.
    pub metadata: std::fs::Metadata,
    /// `true` when `metadata` reports a directory.
    pub is_dir: bool,
    /// `true` when `metadata` reports a regular file.
    pub is_file: bool,
    /// `true` when the path itself is a symbolic link, regardless of whether it was followed.
    pub is_symlink: bool,
}
68
69/// 异步目录遍历器
70#[derive(Debug, Clone)]
71pub struct Walker {
72    options: WalkOptions,
73    #[cfg(feature = "glob")]
74    glob_set: Option<glob::GlobSet>,
75}
76
77impl Walker {
78    /// 创建新的遍历器
79    pub fn new() -> Result<Self, WalkError> {
80        Self::with_options(WalkOptions::default())
81    }
82
83    /// 使用指定选项创建新的遍历器
84    pub fn with_options(options: WalkOptions) -> Result<Self, WalkError> {
85        #[cfg(feature = "glob")]
86        let mut walker = Self { options, glob_set: None };
87        #[cfg(not(feature = "glob"))]
88        let walker = Self { options };
89
90        #[cfg(feature = "glob")]
91        walker.build_glob_set()?;
92        Ok(walker)
93    }
94
95    /// 构建 glob 模式集合
96    #[cfg(feature = "glob")]
97    fn build_glob_set(&mut self) -> Result<(), WalkError> {
98        self.glob_set = Some(glob::build_glob_set(&self.options.include_patterns, &self.options.exclude_patterns)?);
99        Ok(())
100    }
101
102    /// 检查路径是否应该被包含
103    #[cfg(feature = "glob")]
104    fn should_include(&self, path: &Path) -> bool {
105        glob::should_include(path, &self.glob_set)
106    }
107
108    /// 检查路径是否应该被包含(无 glob 功能)
109    #[cfg(not(feature = "glob"))]
110    fn should_include(&self, _path: &Path) -> bool {
111        true
112    }
113
114    /// 遍历目录
115    ///
116    /// 返回一个 Stream,用于流式获取遍历结果,避免一次性收集所有条目导致的内存使用过高问题。
117    ///
118    /// # Examples
119    /// ```rust
120    /// use itools_walker::Walker;
121    /// use tokio_stream::StreamExt;
122    ///
123    /// #[tokio::main]
124    /// async fn main() {
125    ///     let walker = Walker::new().unwrap();
126    ///     let mut stream = walker.walk(".").await.unwrap();
127    ///
128    ///     while let Some(entry) = stream.next().await {
129    ///         match entry {
130    ///             Ok(walk_entry) => println!("Found: {:?}", walk_entry.path),
131    ///             Err(e) => println!("Error: {:?}", e),
132    ///         }
133    ///     }
134    /// }
135    /// ```
136    pub async fn walk<P: AsRef<Path>>(
137        &self,
138        path: P,
139    ) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>> + use<P>, WalkError> {
140        let path = path.as_ref().to_path_buf();
141        let (tx, rx) = mpsc::channel(self.options.channel_size);
142
143        let semaphore =
144            self.options.concurrency_limit.map(|limit| Semaphore::new(limit)).unwrap_or_else(|| Semaphore::new(usize::MAX));
145
146        let self_clone = self.clone();
147
148        tokio::spawn(async move {
149            let _ = self_clone.walk_recursive(&path, tx, &semaphore).await;
150        });
151
152        let stream = stream::unfold(rx, |mut rx| async move {
153            match rx.recv().await {
154                Some(item) => Some((item, rx)),
155                None => None,
156            }
157        });
158
159        Ok(stream)
160    }
161
162    /// 递归遍历目录
163    async fn walk_recursive(
164        &self,
165        path: &Path,
166        tx: mpsc::Sender<Result<WalkEntry, WalkError>>,
167        semaphore: &Semaphore,
168    ) -> Result<(), WalkError> {
169        if !self.should_include(path) {
170            return Ok(());
171        }
172
173        let metadata =
174            if self.options.follow_symlinks { fs::metadata(path).await? } else { fs::symlink_metadata(path).await? };
175
176        let is_dir = metadata.is_dir();
177        let is_file = metadata.is_file();
178        let is_symlink = path.symlink_metadata()?.file_type().is_symlink();
179        let entry = WalkEntry { path: path.to_path_buf(), metadata, is_dir, is_file, is_symlink };
180
181        if tx.send(Ok(entry)).await.is_err() {
182            return Ok(());
183        }
184
185        if is_dir && self.options.recursive {
186            let mut dir_entries = fs::read_dir(path).await?;
187
188            while let Some(entry) = dir_entries.next_entry().await? {
189                let entry_path = entry.path();
190
191                if self.should_include(&entry_path) {
192                    let permit = semaphore.acquire().await?;
193                    let tx_clone = tx.clone();
194                    let semaphore = semaphore;
195                    let self_clone = self.clone();
196
197                    Box::pin(self_clone.walk_recursive(&entry_path, tx_clone, &semaphore)).await?;
198                    drop(permit);
199                }
200            }
201        }
202
203        Ok(())
204    }
205
206    /// 遍历目录并对每个条目执行回调
207    pub async fn walk_with<P: AsRef<Path>, F: Fn(&WalkEntry) -> Result<(), WalkError>>(
208        &self,
209        path: P,
210        mut callback: F,
211    ) -> Result<(), WalkError> {
212        let path = path.as_ref();
213        let semaphore =
214            self.options.concurrency_limit.map(|limit| Semaphore::new(limit)).unwrap_or_else(|| Semaphore::new(usize::MAX));
215
216        self.walk_with_recursive(path, &mut callback, &semaphore).await
217    }
218
219    /// 递归遍历目录并执行回调
220    async fn walk_with_recursive<F: Fn(&WalkEntry) -> Result<(), WalkError>>(
221        &self,
222        path: &Path,
223        callback: &mut F,
224        semaphore: &Semaphore,
225    ) -> Result<(), WalkError> {
226        if !self.should_include(path) {
227            return Ok(());
228        }
229
230        let metadata =
231            if self.options.follow_symlinks { fs::metadata(path).await? } else { fs::symlink_metadata(path).await? };
232
233        let is_dir = metadata.is_dir();
234        let is_file = metadata.is_file();
235        let is_symlink = path.symlink_metadata()?.file_type().is_symlink();
236        let entry = WalkEntry { path: path.to_path_buf(), metadata, is_dir, is_file, is_symlink };
237
238        callback(&entry)?;
239
240        if is_dir && self.options.recursive {
241            let mut dir_entries = fs::read_dir(path).await?;
242
243            while let Some(entry) = dir_entries.next_entry().await? {
244                let entry_path = entry.path();
245
246                if self.should_include(&entry_path) {
247                    let permit = semaphore.acquire().await?;
248                    let semaphore = semaphore;
249                    let self_clone = self.clone();
250
251                    Box::pin(self_clone.walk_with_recursive(&entry_path, callback, &semaphore)).await?;
252                    drop(permit);
253                }
254            }
255        }
256
257        Ok(())
258    }
259}
260
261/// 便捷函数:遍历目录
262pub async fn walk<P: AsRef<Path>>(path: P) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
263    Walker::new()?.walk(path).await
264}
265
266/// 便捷函数:带选项遍历目录
267pub async fn walk_with_options<P: AsRef<Path>>(
268    path: P,
269    options: WalkOptions,
270) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
271    Walker::with_options(options)?.walk(path).await
272}