codelens_core/walker/
parallel.rs1use std::path::Path;
4use std::sync::Arc;
5
6use crossbeam_channel::bounded;
7use ignore::{DirEntry, WalkBuilder, WalkState};
8
9use crate::analyzer::stats::FileStats;
10use crate::analyzer::FileAnalyzer;
11use crate::error::Result;
12use crate::filter::Filter;
13
14#[derive(Debug, Clone)]
16pub struct WalkerConfig {
17 pub threads: usize,
19 pub follow_symlinks: bool,
21 pub use_gitignore: bool,
23 pub max_depth: Option<usize>,
25 pub custom_ignores: Vec<String>,
27}
28
29impl Default for WalkerConfig {
30 fn default() -> Self {
31 Self {
32 threads: num_cpus::get(),
33 follow_symlinks: false,
34 use_gitignore: true,
35 max_depth: None,
36 custom_ignores: Vec::new(),
37 }
38 }
39}
40
41pub struct ParallelWalker {
43 config: WalkerConfig,
44}
45
46impl ParallelWalker {
47 pub fn new(config: WalkerConfig) -> Self {
49 Self { config }
50 }
51
52 pub fn walk_and_analyze<F, S>(
57 &self,
58 root: &Path,
59 analyzer: Arc<FileAnalyzer>,
60 filter: Arc<dyn Filter>,
61 mut on_file: F,
62 mut on_skip: S,
63 ) -> Result<()>
64 where
65 F: FnMut(FileStats) + Send,
66 S: FnMut(&Path) + Send,
67 {
68 let (tx, rx) = bounded::<WalkResult>(1000);
69
70 let mut builder = WalkBuilder::new(root);
72 builder
73 .hidden(false) .git_ignore(self.config.use_gitignore)
75 .git_global(self.config.use_gitignore)
76 .git_exclude(self.config.use_gitignore)
77 .follow_links(self.config.follow_symlinks)
78 .threads(self.config.threads);
79
80 if let Some(depth) = self.config.max_depth {
81 builder.max_depth(Some(depth));
82 }
83
84 for pattern in &self.config.custom_ignores {
86 builder.add_custom_ignore_filename(pattern);
87 }
88
89 let filter_clone = Arc::clone(&filter);
91 let analyzer_clone = Arc::clone(&analyzer);
92
93 builder.build_parallel().run(|| {
94 let tx = tx.clone();
95 let filter = Arc::clone(&filter_clone);
96 let analyzer = Arc::clone(&analyzer_clone);
97 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
99
100 Box::new(move |entry: std::result::Result<DirEntry, ignore::Error>| {
101 let entry = match entry {
102 Ok(e) => e,
103 Err(_) => return WalkState::Continue,
104 };
105
106 let path = entry.path();
107 let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
108
109 if !filter.should_include(path, is_dir) {
111 if is_dir {
112 return WalkState::Skip;
113 }
114 let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
115 return WalkState::Continue;
116 }
117
118 if is_dir {
120 return WalkState::Continue;
121 }
122
123 if !entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
125 return WalkState::Continue;
126 }
127
128 buf.clear();
130 match std::fs::File::open(path).and_then(|mut f| {
131 if let Ok(meta) = f.metadata() {
132 buf.reserve(meta.len() as usize);
133 }
134 std::io::Read::read_to_end(&mut f, &mut buf)
135 }) {
136 Ok(_) => match analyzer.analyze_from_bytes(path, &buf) {
137 Ok(Some(stats)) => {
138 let _ = tx.send(WalkResult::File(stats));
139 }
140 Ok(None) => {
141 let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
142 }
143 Err(_) => {
144 let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
145 }
146 },
147 Err(_) => {
148 let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
149 }
150 }
151
152 if buf.capacity() > 1_048_576 {
154 buf = Vec::with_capacity(64 * 1024);
155 }
156
157 WalkState::Continue
158 })
159 });
160
161 drop(tx);
163
164 for result in rx {
166 match result {
167 WalkResult::File(stats) => on_file(stats),
168 WalkResult::Skipped(path) => on_skip(&path),
169 }
170 }
171
172 Ok(())
173 }
174}
175
176impl Default for ParallelWalker {
177 fn default() -> Self {
178 Self::new(WalkerConfig::default())
179 }
180}
181
182enum WalkResult {
184 File(FileStats),
186 Skipped(std::path::PathBuf),
188}
189
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::TempDir;

    /// Filter that accepts every path, so only the walker's own logic is exercised.
    struct AllowAll;

    impl Filter for AllowAll {
        fn should_include(&self, _path: &Path, _is_dir: bool) -> bool {
            true
        }
    }

    #[test]
    fn test_walker_config_default() {
        let WalkerConfig {
            threads,
            follow_symlinks,
            use_gitignore,
            ..
        } = WalkerConfig::default();

        assert!(threads > 0);
        assert!(use_gitignore);
        assert!(!follow_symlinks);
    }

    #[test]
    fn test_walk_empty_dir() {
        let root = TempDir::new().unwrap();
        let analyzer = Arc::new(FileAnalyzer::new(
            Arc::new(crate::language::LanguageRegistry::empty()),
            &crate::config::Config::default(),
        ));
        let files_seen = AtomicUsize::new(0);

        ParallelWalker::default()
            .walk_and_analyze(
                root.path(),
                analyzer,
                Arc::new(AllowAll),
                |_| {
                    files_seen.fetch_add(1, Ordering::SeqCst);
                },
                |_| {},
            )
            .unwrap();

        assert_eq!(files_seen.load(Ordering::SeqCst), 0);
    }
}