Skip to main content

itools_walker/
lib.rs

1#![warn(missing_docs)]
2
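//! iTools asynchronous directory-walking module.
//!
//! Provides asynchronous directory traversal: optional recursion into subdirectories, results streamed
//! from a background task through a bounded channel, a configurable concurrency limit, and optional
//! glob-pattern filtering (behind the `glob` feature).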
6
7use futures::stream::{self, Stream};
8use std::{num::NonZeroU32, path::Path};
9use tokio::{
10    fs,
11    sync::{Semaphore, mpsc},
12};
13
14mod errors;
15#[cfg(feature = "glob")]
16mod glob;
17
18pub use errors::WalkError;
19
/// Options controlling how a [`Walker`] traverses a directory tree.
#[derive(Debug, Clone)]
pub struct WalkOptions {
    /// Whether to descend into subdirectories.
    ///
    /// When `false`, only the entry for the root path itself is produced.
    pub recursive: bool,
    /// Whether to follow symbolic links.
    ///
    /// When `true`, metadata is read with `fs::metadata` (describing the link target), so a link
    /// that points at a directory is descended into. When `false`, `fs::symlink_metadata` is used
    /// and links are reported as links without being followed.
    /// NOTE(review): no cycle detection is visible in this crate, so symlink cycles may lead to
    /// very deep or repeated traversal when this is enabled — confirm before relying on it.
    pub follow_symlinks: bool,
    /// Glob patterns used to decide which paths are included (requires the `glob` feature).
    ///
    /// Matching is delegated to the crate's `glob` module; presumably an empty list means
    /// "include everything" (it is the default) — confirm against that module.
    #[cfg(feature = "glob")]
    pub include_patterns: Vec<String>,
    /// Glob patterns for paths to skip, e.g. gitignore-style entries (requires the `glob` feature).
    ///
    /// Filtering happens before a directory is read, so a directory rejected by the filter is
    /// skipped together with its whole subtree.
    #[cfg(feature = "glob")]
    pub exclude_patterns: Vec<String>,
    /// Capacity of the bounded channel between the background walking task and the stream
    /// returned by [`Walker::walk`]. The walk waits whenever this many items are unconsumed.
    pub channel_size: NonZeroU32,
    /// Number of permits in the semaphore that [`Walker::walk`] / [`Walker::walk_with`] create
    /// for each walk.
    pub concurrency_limit: NonZeroU32,
}
38
39impl Default for WalkOptions {
40    fn default() -> Self {
41        Self {
42            recursive: true,
43            follow_symlinks: false,
44            #[cfg(feature = "glob")]
45            include_patterns: Vec::new(),
46            #[cfg(feature = "glob")]
47            exclude_patterns: Vec::new(),
48            concurrency_limit: NonZeroU32::new(10).unwrap(),
49            channel_size: NonZeroU32::new(64).unwrap(),
50        }
51    }
52}
53
/// A single item produced by a walk: the entry's path together with the metadata read for it.
#[derive(Debug, Clone)]
pub struct WalkEntry {
    /// Path of the entry.
    ///
    /// The root is reported exactly as it was passed to the walker; descendants are the paths
    /// returned by reading their parent directory.
    pub path: std::path::PathBuf,
    /// Metadata captured when the entry was visited.
    ///
    /// Whether it describes a symlink itself or the link's target depends on
    /// [`WalkOptions::follow_symlinks`] at walk time.
    pub metadata: std::fs::Metadata,
}
62
63impl WalkEntry {
64    /// 是否为目录
65    pub fn is_dir(&self) -> bool {
66        self.metadata.is_dir()
67    }
68
69    /// 是否为文件
70    pub fn is_file(&self) -> bool {
71        self.metadata.is_file()
72    }
73
74    /// 是否为符号链接
75    pub fn is_symlink(&self) -> bool {
76        self.path.symlink_metadata().map(|m| m.file_type().is_symlink()).unwrap_or(false)
77    }
78}
79
/// Asynchronous directory walker.
///
/// Build one with [`Walker::new`] or [`Walker::with_options`], then call [`Walker::walk`] for a
/// stream of entries or [`Walker::walk_with`] to run a callback on each entry.
#[derive(Debug, Clone)]
pub struct Walker {
    /// Traversal options supplied at construction.
    options: WalkOptions,
    /// Compiled include/exclude matcher. `with_options` either fills this in or returns an
    /// error, so it is `Some` for every successfully constructed walker.
    #[cfg(feature = "glob")]
    glob_set: Option<glob::GlobSet>,
}
87
88impl Walker {
89    /// 创建新的遍历器
90    pub fn new() -> Result<Self, WalkError> {
91        Self::with_options(WalkOptions::default())
92    }
93
94    /// 使用指定选项创建新的遍历器
95    pub fn with_options(options: WalkOptions) -> Result<Self, WalkError> {
96        #[cfg(feature = "glob")]
97        let mut walker = Self { options, glob_set: None };
98        #[cfg(not(feature = "glob"))]
99        let walker = Self { options };
100
101        #[cfg(feature = "glob")]
102        walker.build_glob_set()?;
103        Ok(walker)
104    }
105
106    /// 构建 glob 模式集合
107    #[cfg(feature = "glob")]
108    fn build_glob_set(&mut self) -> Result<(), WalkError> {
109        self.glob_set = Some(glob::build_glob_set(&self.options.include_patterns, &self.options.exclude_patterns)?);
110        Ok(())
111    }
112
113    /// 检查路径是否应该被包含
114    #[cfg(feature = "glob")]
115    fn should_include(&self, path: &Path) -> bool {
116        glob::should_include(path, &self.glob_set)
117    }
118
119    /// 检查路径是否应该被包含(无 glob 功能)
120    #[cfg(not(feature = "glob"))]
121    fn should_include(&self, _path: &Path) -> bool {
122        true
123    }
124
125    /// 遍历目录
126    ///
127    /// 返回一个 Stream,用于流式获取遍历结果,避免一次性收集所有条目导致的内存使用过高问题。
128    ///
129    /// # Examples
130    /// ```rust
131    /// use itools_walker::Walker;
132    /// use tokio_stream::StreamExt;
133    ///
134    /// #[tokio::main]
135    /// async fn main() {
136    ///     let walker = Walker::new().unwrap();
137    ///     let mut stream = walker.walk(".").await.unwrap();
138    ///
139    ///     while let Some(entry) = stream.next().await {
140    ///         match entry {
141    ///             Ok(walk_entry) => println!("Found: {:?}", walk_entry.path),
142    ///             Err(e) => println!("Error: {:?}", e),
143    ///         }
144    ///     }
145    /// }
146    /// ```
147    pub async fn walk<P: AsRef<Path>>(
148        &self,
149        path: P,
150    ) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>> + Unpin + use<P>, WalkError> {
151        let path = path.as_ref().to_path_buf();
152        let (tx, rx) = mpsc::channel(self.options.channel_size.get() as usize);
153
154        let semaphore = Semaphore::new(self.options.concurrency_limit.get() as usize);
155
156        let self_clone = self.clone();
157
158        tokio::spawn(async move {
159            let _ = self_clone.walk_recursive(&path, tx, &semaphore).await;
160        });
161
162        let stream = stream::unfold(rx, |mut rx| async move {
163            rx.recv().await.map(|item| (item, rx))
164        });
165
166        Ok(Box::pin(stream))
167    }
168
169    /// 递归遍历目录
170    async fn walk_recursive(
171        &self,
172        path: &Path,
173        tx: mpsc::Sender<Result<WalkEntry, WalkError>>,
174        semaphore: &Semaphore,
175    ) -> Result<(), WalkError> {
176        if !self.should_include(path) {
177            return Ok(());
178        }
179
180        let metadata =
181            if self.options.follow_symlinks { fs::metadata(path).await? } else { fs::symlink_metadata(path).await? };
182
183        let is_dir = metadata.is_dir();
184        let entry = WalkEntry { path: path.to_path_buf(), metadata };
185
186        if tx.send(Ok(entry)).await.is_err() {
187            return Ok(());
188        }
189
190        if is_dir && self.options.recursive {
191            let mut dir_entries = fs::read_dir(path).await?;
192
193            while let Some(entry) = dir_entries.next_entry().await? {
194                let entry_path = entry.path();
195
196                if self.should_include(&entry_path) {
197                    let permit = semaphore.acquire().await?;
198                    let tx_clone = tx.clone();
199                    let self_clone = self.clone();
200
201                    Box::pin(self_clone.walk_recursive(&entry_path, tx_clone, semaphore)).await?;
202                    drop(permit);
203                }
204            }
205        }
206
207        Ok(())
208    }
209
210    /// 遍历目录并对每个条目执行回调
211    pub async fn walk_with<P: AsRef<Path>, F: Fn(&WalkEntry) -> Result<(), WalkError>>(
212        &self,
213        path: P,
214        mut callback: F,
215    ) -> Result<(), WalkError> {
216        let path = path.as_ref();
217        let semaphore = Semaphore::new(self.options.concurrency_limit.get() as usize);
218
219        self.walk_with_recursive(path, &mut callback, &semaphore).await
220    }
221
222    /// 递归遍历目录并执行回调
223    async fn walk_with_recursive<F: Fn(&WalkEntry) -> Result<(), WalkError>>(
224        &self,
225        path: &Path,
226        callback: &mut F,
227        semaphore: &Semaphore,
228    ) -> Result<(), WalkError> {
229        if !self.should_include(path) {
230            return Ok(());
231        }
232
233        let metadata =
234            if self.options.follow_symlinks { fs::metadata(path).await? } else { fs::symlink_metadata(path).await? };
235
236        let is_dir = metadata.is_dir();
237        let entry = WalkEntry { path: path.to_path_buf(), metadata };
238
239        callback(&entry)?;
240
241        if is_dir && self.options.recursive {
242            let mut dir_entries = fs::read_dir(path).await?;
243
244            while let Some(entry) = dir_entries.next_entry().await? {
245                let entry_path = entry.path();
246
247                if self.should_include(&entry_path) {
248                    let permit = semaphore.acquire().await?;
249                    let self_clone = self.clone();
250
251                    Box::pin(self_clone.walk_with_recursive(&entry_path, callback, semaphore)).await?;
252                    drop(permit);
253                }
254            }
255        }
256
257        Ok(())
258    }
259}
260
261/// 便捷函数:遍历目录
262pub async fn walk<P: AsRef<Path>>(path: P) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
263    Walker::new()?.walk(path).await
264}
265
266/// 便捷函数:带选项遍历目录
267pub async fn walk_with_options<P: AsRef<Path>>(
268    path: P,
269    options: WalkOptions,
270) -> Result<impl Stream<Item = Result<WalkEntry, WalkError>>, WalkError> {
271    Walker::with_options(options)?.walk(path).await
272}