null_e/scanner/
parallel.rs1use crate::core::{
6 ArtifactStats, Project, ProjectId, ScanConfig, ScanError, ScanProgress, ScanResult, Scanner,
7};
8use crate::error::{DevSweepError, Result};
9use crate::plugins::PluginRegistry;
10use dashmap::DashMap;
11use rayon::prelude::*;
12use std::path::{Path, PathBuf};
13use std::sync::atomic::Ordering;
14use std::sync::Arc;
15use std::time::Instant;
16use walkdir::WalkDir;
17
18pub struct ParallelScanner {
20 registry: Arc<PluginRegistry>,
21 progress: Arc<ScanProgress>,
22}
23
24impl ParallelScanner {
25 pub fn new(registry: Arc<PluginRegistry>) -> Self {
27 Self {
28 registry,
29 progress: ScanProgress::new(),
30 }
31 }
32
33 fn scan_root(&self, root: &Path, projects: &DashMap<ProjectId, Project>, config: &ScanConfig) -> Result<()> {
35 let walker = WalkDir::new(root)
36 .max_depth(config.max_depth.unwrap_or(usize::MAX))
37 .follow_links(false);
38
39 let skip_hidden = config.skip_hidden;
40
41 let skip_dirs: std::collections::HashSet<&str> = [
43 "node_modules",
44 "target",
45 ".venv",
46 "venv",
47 "__pycache__",
48 "vendor",
49 "build",
50 ".gradle",
51 "bin",
52 "obj",
53 ".build",
54 "Pods",
55 "DerivedData",
56 ".next",
57 ".nuxt",
58 "dist",
59 ".cache",
60 ".turbo",
61 "coverage",
62 ].into_iter().collect();
63
64 let entries = walker.into_iter().filter_entry(move |e| {
65 let name = e.file_name().to_str().unwrap_or("");
66
67 if e.depth() > 0 && skip_dirs.contains(name) {
69 return false;
70 }
71
72 if skip_hidden && e.depth() > 0 && name.starts_with('.') {
74 let allowed_hidden = [".git", ".github", ".vscode", ".idea"];
76 if !allowed_hidden.contains(&name) {
77 return false;
78 }
79 }
80
81 true
82 });
83
84 for entry in entries {
85 if self.progress.is_cancelled() {
87 return Err(DevSweepError::ScanInterrupted);
88 }
89
90 let entry = match entry {
91 Ok(e) => e,
92 Err(e) => {
93 self.progress.add_error(ScanError::new(
94 PathBuf::new(),
95 format!("Walk error: {}", e),
96 ));
97 continue;
98 }
99 };
100
101 if !entry.file_type().is_dir() {
103 continue;
104 }
105
106 let path = entry.path();
107 self.progress.inc_directories();
108 self.progress.set_current_path(path.to_path_buf());
109
110 let project_id = ProjectId::from_path(path);
112 if projects.contains_key(&project_id) {
113 continue;
114 }
115
116 if let Some((kind, plugin)) = self.registry.detect_project(path) {
118 let mut project = Project::new(kind, path.to_path_buf());
120
121 match plugin.find_artifacts(path) {
123 Ok(mut artifacts) => {
124 artifacts.par_iter_mut().for_each(|artifact| {
126 if let Ok(size) = plugin.calculate_size(artifact) {
127 artifact.size = size;
128 }
129 if let Ok(count) = crate::plugins::count_files(&artifact.path) {
130 artifact.file_count = count;
131 }
132 });
133
134 if let Some(min_size) = config.min_size {
136 artifacts.retain(|a| a.size >= min_size);
137 }
138
139 if artifacts.is_empty() {
141 continue;
142 }
143
144 project.artifacts = artifacts;
145 project.calculate_totals();
146
147 if let Ok(meta) = std::fs::metadata(path) {
149 project.last_modified = meta.modified().ok();
150 }
151
152 self.progress.inc_projects();
153 self.progress.add_size(project.cleanable_size);
154
155 projects.insert(project_id, project);
156 }
157 Err(e) => {
158 self.progress.add_error(ScanError::new(
159 path.to_path_buf(),
160 format!("Failed to find artifacts: {}", e),
161 ));
162 }
163 }
164 }
165 }
166
167 Ok(())
168 }
169}
170
171impl Scanner for ParallelScanner {
172 fn scan(&self, config: &ScanConfig) -> Result<ScanResult> {
173 let start = Instant::now();
174
175 if config.roots.is_empty() {
177 return Err(DevSweepError::Config("No scan roots specified".into()));
178 }
179
180 for root in &config.roots {
181 if !root.exists() {
182 return Err(DevSweepError::PathNotFound(root.clone()));
183 }
184 if !root.is_dir() {
185 return Err(DevSweepError::NotADirectory(root.clone()));
186 }
187 }
188
189 let pool = rayon::ThreadPoolBuilder::new()
191 .num_threads(config.parallelism.unwrap_or(num_cpus::get()))
192 .build()
193 .map_err(|e| DevSweepError::Scanner(format!("Thread pool error: {}", e)))?;
194
195 let projects: DashMap<ProjectId, Project> = DashMap::new();
197
198 pool.install(|| {
200 config.roots.par_iter().for_each(|root| {
201 if let Err(e) = self.scan_root(root, &projects, config) {
202 if !matches!(e, DevSweepError::ScanInterrupted) {
203 self.progress.add_error(ScanError::new(
204 root.clone(),
205 e.to_string(),
206 ));
207 }
208 }
209 });
210 });
211
212 self.progress.mark_complete();
213
214 if self.progress.is_cancelled() {
216 return Err(DevSweepError::ScanInterrupted);
217 }
218
219 let mut results: Vec<Project> = projects.into_iter().map(|(_, p)| p).collect();
221 results.sort_by(|a, b| b.cleanable_size.cmp(&a.cleanable_size));
222
223 if let Some(limit) = config.limit {
225 results.truncate(limit);
226 }
227
228 let mut stats = ArtifactStats::default();
230 for project in &results {
231 for artifact in &project.artifacts {
232 stats.add(artifact);
233 }
234 }
235
236 let total_size: u64 = results.iter().map(|p| p.total_size).sum();
237 let total_cleanable: u64 = results.iter().map(|p| p.cleanable_size).sum();
238
239 Ok(ScanResult {
240 projects: results,
241 total_size,
242 total_cleanable,
243 duration: start.elapsed(),
244 directories_scanned: self.progress.directories_scanned.load(Ordering::Relaxed),
245 errors: std::mem::take(&mut *self.progress.errors.lock()),
246 stats,
247 })
248 }
249
250 fn progress(&self) -> Arc<ScanProgress> {
251 Arc::clone(&self.progress)
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a scanner backed by the built-in plugin registry.
    fn builtin_scanner() -> ParallelScanner {
        ParallelScanner::new(Arc::new(PluginRegistry::with_builtins()))
    }

    /// Runs a scan with `config` on a fresh built-in scanner and unwraps it.
    fn scan_ok(config: &ScanConfig) -> ScanResult {
        builtin_scanner().scan(config).unwrap()
    }

    /// Lays out a minimal Node project in `path`: a manifest plus a
    /// non-empty `node_modules` directory.
    fn setup_node_project(path: &Path) {
        let modules = path.join("node_modules");
        std::fs::write(path.join("package.json"), r#"{"name": "test"}"#).unwrap();
        std::fs::create_dir(&modules).unwrap();
        std::fs::write(modules.join(".package-lock.json"), "{}").unwrap();
    }

    /// Lays out a minimal Cargo project in `path`: a manifest plus a
    /// `target/debug` directory holding one file.
    fn setup_rust_project(path: &Path) {
        let debug_dir = path.join("target/debug");
        std::fs::write(
            path.join("Cargo.toml"),
            "[package]\nname = \"test\"\nversion = \"0.1.0\"\n",
        )
        .unwrap();
        std::fs::create_dir_all(&debug_dir).unwrap();
        std::fs::write(debug_dir.join("test"), "binary").unwrap();
    }

    #[test]
    fn test_scan_empty_directory() {
        let temp = TempDir::new().unwrap();

        let result = scan_ok(&ScanConfig::new(temp.path()));

        assert!(result.projects.is_empty());
        assert_eq!(result.total_cleanable, 0);
    }

    #[test]
    fn test_scan_node_project() {
        let temp = TempDir::new().unwrap();
        setup_node_project(temp.path());

        let result = scan_ok(&ScanConfig::new(temp.path()));

        assert_eq!(result.projects.len(), 1);
        let has_node_modules = result.projects[0]
            .artifacts
            .iter()
            .any(|a| a.name() == "node_modules");
        assert!(has_node_modules);
    }

    #[test]
    fn test_scan_rust_project() {
        let temp = TempDir::new().unwrap();
        setup_rust_project(temp.path());

        let result = scan_ok(&ScanConfig::new(temp.path()));

        assert_eq!(result.projects.len(), 1);
        let has_target = result.projects[0]
            .artifacts
            .iter()
            .any(|a| a.name() == "target");
        assert!(has_target);
    }

    #[test]
    fn test_scan_multiple_projects() {
        let temp = TempDir::new().unwrap();

        let node_proj = temp.path().join("node-app");
        std::fs::create_dir(&node_proj).unwrap();
        setup_node_project(&node_proj);

        let rust_proj = temp.path().join("rust-app");
        std::fs::create_dir(&rust_proj).unwrap();
        setup_rust_project(&rust_proj);

        let result = scan_ok(&ScanConfig::new(temp.path()));

        assert_eq!(result.projects.len(), 2);
    }

    #[test]
    fn test_scan_with_min_size_filter() {
        let temp = TempDir::new().unwrap();
        setup_node_project(temp.path());

        // The fixture is far smaller than the threshold, so it is filtered out.
        let config = ScanConfig::new(temp.path()).with_min_size(1_000_000_000);
        let result = scan_ok(&config);

        assert!(result.projects.is_empty());
    }

    #[test]
    fn test_scan_with_max_depth() {
        let temp = TempDir::new().unwrap();

        // The project sits five levels below the root, deeper than the limit.
        let deep = temp.path().join("a/b/c/d/e");
        std::fs::create_dir_all(&deep).unwrap();
        setup_node_project(&deep);

        let config = ScanConfig::new(temp.path()).with_max_depth(3);
        let result = scan_ok(&config);

        assert!(result.projects.is_empty());
    }

    #[test]
    fn test_scan_cancellation() {
        let temp = TempDir::new().unwrap();
        setup_node_project(temp.path());

        let scanner = builtin_scanner();
        // Cancel before scanning: the scan must report the interruption.
        scanner.cancel();

        let outcome = scanner.scan(&ScanConfig::new(temp.path()));

        assert!(matches!(outcome, Err(DevSweepError::ScanInterrupted)));
    }

    #[test]
    fn test_progress_tracking() {
        let temp = TempDir::new().unwrap();
        setup_node_project(temp.path());

        let scanner = builtin_scanner();
        let progress = scanner.progress();

        // Only the progress state is inspected; the scan result is ignored.
        let _ = scanner.scan(&ScanConfig::new(temp.path()));

        let snapshot = progress.snapshot();
        assert!(snapshot.is_complete);
        assert!(snapshot.directories_scanned > 0);
    }
}