codelens_core/walker/
parallel.rs1use std::path::Path;
4use std::sync::Arc;
5
6use crossbeam_channel::bounded;
7use ignore::{DirEntry, WalkBuilder, WalkState};
8
9use crate::analyzer::stats::FileStats;
10use crate::analyzer::FileAnalyzer;
11use crate::error::Result;
12use crate::filter::Filter;
13
/// Tunable settings for [`ParallelWalker`].
#[derive(Clone, Debug)]
pub struct WalkerConfig {
    /// Thread count handed to the underlying `ignore` walk builder.
    pub threads: usize,
    /// Whether symbolic links are followed during traversal.
    pub follow_symlinks: bool,
    /// Switches `.gitignore`, global gitignore and git exclude handling on or
    /// off together.
    pub use_gitignore: bool,
    /// Maximum traversal depth; `None` means no depth limit is configured.
    pub max_depth: Option<usize>,
    /// Entries registered with the walk builder as custom ignore *file names*.
    pub custom_ignores: Vec<String>,
}
28
29impl Default for WalkerConfig {
30 fn default() -> Self {
31 Self {
32 threads: num_cpus::get(),
33 follow_symlinks: false,
34 use_gitignore: true,
35 max_depth: None,
36 custom_ignores: Vec::new(),
37 }
38 }
39}
40
41pub struct ParallelWalker {
43 config: WalkerConfig,
44}
45
46impl ParallelWalker {
47 pub fn new(config: WalkerConfig) -> Self {
49 Self { config }
50 }
51
52 pub fn walk_and_analyze<F, S>(
57 &self,
58 root: &Path,
59 analyzer: Arc<FileAnalyzer>,
60 filter: Arc<dyn Filter>,
61 mut on_file: F,
62 mut on_skip: S,
63 ) -> Result<()>
64 where
65 F: FnMut(FileStats) + Send,
66 S: FnMut(&Path) + Send,
67 {
68 let (tx, rx) = bounded::<WalkResult>(1000);
69
70 let mut builder = WalkBuilder::new(root);
72 builder
73 .hidden(false) .git_ignore(self.config.use_gitignore)
75 .git_global(self.config.use_gitignore)
76 .git_exclude(self.config.use_gitignore)
77 .follow_links(self.config.follow_symlinks)
78 .threads(self.config.threads);
79
80 if let Some(depth) = self.config.max_depth {
81 builder.max_depth(Some(depth));
82 }
83
84 for pattern in &self.config.custom_ignores {
86 builder.add_custom_ignore_filename(pattern);
87 }
88
89 let filter_clone = Arc::clone(&filter);
96 let analyzer_clone = Arc::clone(&analyzer);
97
98 std::thread::scope(|s| {
99 let consumer = s.spawn(|| {
101 for result in &rx {
102 match result {
103 WalkResult::File(stats) => on_file(stats),
104 WalkResult::Skipped(path) => on_skip(&path),
105 }
106 }
107 });
108
109 builder.build_parallel().run(|| {
111 let tx = tx.clone();
112 let filter = Arc::clone(&filter_clone);
113 let analyzer = Arc::clone(&analyzer_clone);
114 let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
116
117 Box::new(move |entry: std::result::Result<DirEntry, ignore::Error>| {
118 let entry = match entry {
119 Ok(e) => e,
120 Err(_) => return WalkState::Continue,
121 };
122
123 let path = entry.path();
124 let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
125
126 if !filter.should_include(path, is_dir) {
128 if is_dir {
129 return WalkState::Skip;
130 }
131 let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
132 return WalkState::Continue;
133 }
134
135 if is_dir {
137 return WalkState::Continue;
138 }
139
140 if !entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
142 return WalkState::Continue;
143 }
144
145 buf.clear();
147 match std::fs::File::open(path).and_then(|mut f| {
148 if let Ok(meta) = f.metadata() {
149 buf.reserve(meta.len() as usize);
150 }
151 std::io::Read::read_to_end(&mut f, &mut buf)
152 }) {
153 Ok(_) => match analyzer.analyze_from_bytes(path, &buf) {
154 Ok(Some(stats)) => {
155 let _ = tx.send(WalkResult::File(stats));
156 }
157 Ok(None) => {
158 let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
159 }
160 Err(_) => {
161 let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
162 }
163 },
164 Err(_) => {
165 let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
166 }
167 }
168
169 if buf.capacity() > 1_048_576 {
171 buf = Vec::with_capacity(64 * 1024);
172 }
173
174 WalkState::Continue
175 })
176 });
177
178 drop(tx);
180
181 consumer.join().expect("consumer thread panicked");
183 });
184
185 Ok(())
186 }
187}
188
189impl Default for ParallelWalker {
190 fn default() -> Self {
191 Self::new(WalkerConfig::default())
192 }
193}
194
195enum WalkResult {
197 File(FileStats),
199 Skipped(std::path::PathBuf),
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::TempDir;

    /// Filter that accepts every path.
    struct AllowAll;

    impl Filter for AllowAll {
        fn should_include(&self, _path: &Path, _is_dir: bool) -> bool {
            true
        }
    }

    /// Walks `root` with a default walker and the `AllowAll` filter, and
    /// returns how many times the `on_file` callback was invoked.
    fn count_analyzed(root: &Path, analyzer: Arc<FileAnalyzer>) -> usize {
        let walker = ParallelWalker::default();
        let filter: Arc<dyn Filter> = Arc::new(AllowAll);
        let hits = AtomicUsize::new(0);

        let outcome = walker.walk_and_analyze(
            root,
            analyzer,
            filter,
            |_| {
                hits.fetch_add(1, Ordering::SeqCst);
            },
            |_| {},
        );
        outcome.unwrap();

        hits.load(Ordering::SeqCst)
    }

    #[test]
    fn test_walker_config_default() {
        let WalkerConfig {
            threads,
            use_gitignore,
            follow_symlinks,
            ..
        } = WalkerConfig::default();

        assert!(threads > 0);
        assert!(use_gitignore);
        assert!(!follow_symlinks);
    }

    #[test]
    fn test_walk_empty_dir() {
        let tmp = TempDir::new().unwrap();
        let registry = Arc::new(crate::language::LanguageRegistry::empty());
        let analyzer = Arc::new(FileAnalyzer::new(
            registry,
            &crate::config::Config::default(),
        ));

        assert_eq!(count_analyzed(tmp.path(), analyzer), 0);
    }

    #[test]
    fn test_walk_many_files_no_deadlock() {
        // More files than the walker's channel can buffer at once (1000).
        const FILE_COUNT: usize = 1500;

        let tmp = TempDir::new().unwrap();
        for index in 0..FILE_COUNT {
            let target = tmp.path().join(format!("file_{index}.rs"));
            std::fs::write(target, "fn main() {}\n").unwrap();
        }

        let registry = Arc::new(crate::language::LanguageRegistry::with_builtin().unwrap());
        let analyzer = Arc::new(FileAnalyzer::new(
            registry,
            &crate::config::Config::default(),
        ));

        assert_eq!(count_analyzed(tmp.path(), analyzer), FILE_COUNT);
    }
}