Skip to main content

null_e/scanner/
parallel.rs

//! Parallel filesystem scanner implementation
//!
//! Uses walkdir for directory traversal (one walk per scan root) and rayon to
//! scan multiple roots and compute artifact sizes in parallel.
4
5use crate::core::{
6    ArtifactStats, Project, ProjectId, ScanConfig, ScanError, ScanProgress, ScanResult, Scanner,
7};
8use crate::error::{DevSweepError, Result};
9use crate::plugins::PluginRegistry;
10use dashmap::DashMap;
11use rayon::prelude::*;
12use std::path::{Path, PathBuf};
13use std::sync::atomic::Ordering;
14use std::sync::Arc;
15use std::time::Instant;
16use walkdir::WalkDir;
17
18/// High-performance parallel scanner
19pub struct ParallelScanner {
20    registry: Arc<PluginRegistry>,
21    progress: Arc<ScanProgress>,
22}
23
24impl ParallelScanner {
25    /// Create a new parallel scanner
26    pub fn new(registry: Arc<PluginRegistry>) -> Self {
27        Self {
28            registry,
29            progress: ScanProgress::new(),
30        }
31    }
32
33    /// Scan a single root directory
34    fn scan_root(&self, root: &Path, projects: &DashMap<ProjectId, Project>, config: &ScanConfig) -> Result<()> {
35        let walker = WalkDir::new(root)
36            .max_depth(config.max_depth.unwrap_or(usize::MAX))
37            .follow_links(false);
38
39        let skip_hidden = config.skip_hidden;
40
41        // Directories to skip (artifact directories that contain nested packages)
42        let skip_dirs: std::collections::HashSet<&str> = [
43            "node_modules",
44            "target",
45            ".venv",
46            "venv",
47            "__pycache__",
48            "vendor",
49            "build",
50            ".gradle",
51            "bin",
52            "obj",
53            ".build",
54            "Pods",
55            "DerivedData",
56            ".next",
57            ".nuxt",
58            "dist",
59            ".cache",
60            ".turbo",
61            "coverage",
62        ].into_iter().collect();
63
64        let entries = walker.into_iter().filter_entry(move |e| {
65            let name = e.file_name().to_str().unwrap_or("");
66
67            // Skip artifact directories (they contain nested packages we don't want)
68            if e.depth() > 0 && skip_dirs.contains(name) {
69                return false;
70            }
71
72            // Skip hidden directories if configured
73            if skip_hidden && e.depth() > 0 && name.starts_with('.') {
74                // But allow some important hidden dirs
75                let allowed_hidden = [".git", ".github", ".vscode", ".idea"];
76                if !allowed_hidden.contains(&name) {
77                    return false;
78                }
79            }
80
81            true
82        });
83
84        for entry in entries {
85            // Check for cancellation
86            if self.progress.is_cancelled() {
87                return Err(DevSweepError::ScanInterrupted);
88            }
89
90            let entry = match entry {
91                Ok(e) => e,
92                Err(e) => {
93                    self.progress.add_error(ScanError::new(
94                        PathBuf::new(),
95                        format!("Walk error: {}", e),
96                    ));
97                    continue;
98                }
99            };
100
101            // Only process directories
102            if !entry.file_type().is_dir() {
103                continue;
104            }
105
106            let path = entry.path();
107            self.progress.inc_directories();
108            self.progress.set_current_path(path.to_path_buf());
109
110            // Skip if already found as a project or inside a project
111            let project_id = ProjectId::from_path(path);
112            if projects.contains_key(&project_id) {
113                continue;
114            }
115
116            // Try to detect project type
117            if let Some((kind, plugin)) = self.registry.detect_project(path) {
118                // Found a project!
119                let mut project = Project::new(kind, path.to_path_buf());
120
121                // Find artifacts
122                match plugin.find_artifacts(path) {
123                    Ok(mut artifacts) => {
124                        // Calculate sizes in parallel
125                        artifacts.par_iter_mut().for_each(|artifact| {
126                            if let Ok(size) = plugin.calculate_size(artifact) {
127                                artifact.size = size;
128                            }
129                            if let Ok(count) = crate::plugins::count_files(&artifact.path) {
130                                artifact.file_count = count;
131                            }
132                        });
133
134                        // Filter by minimum size if specified
135                        if let Some(min_size) = config.min_size {
136                            artifacts.retain(|a| a.size >= min_size);
137                        }
138
139                        // Skip if no meaningful artifacts
140                        if artifacts.is_empty() {
141                            continue;
142                        }
143
144                        project.artifacts = artifacts;
145                        project.calculate_totals();
146
147                        // Get last modified time
148                        if let Ok(meta) = std::fs::metadata(path) {
149                            project.last_modified = meta.modified().ok();
150                        }
151
152                        self.progress.inc_projects();
153                        self.progress.add_size(project.cleanable_size);
154
155                        projects.insert(project_id, project);
156                    }
157                    Err(e) => {
158                        self.progress.add_error(ScanError::new(
159                            path.to_path_buf(),
160                            format!("Failed to find artifacts: {}", e),
161                        ));
162                    }
163                }
164            }
165        }
166
167        Ok(())
168    }
169}
170
171impl Scanner for ParallelScanner {
172    fn scan(&self, config: &ScanConfig) -> Result<ScanResult> {
173        let start = Instant::now();
174
175        // Validate roots
176        if config.roots.is_empty() {
177            return Err(DevSweepError::Config("No scan roots specified".into()));
178        }
179
180        for root in &config.roots {
181            if !root.exists() {
182                return Err(DevSweepError::PathNotFound(root.clone()));
183            }
184            if !root.is_dir() {
185                return Err(DevSweepError::NotADirectory(root.clone()));
186            }
187        }
188
189        // Configure thread pool
190        let pool = rayon::ThreadPoolBuilder::new()
191            .num_threads(config.parallelism.unwrap_or(num_cpus::get()))
192            .build()
193            .map_err(|e| DevSweepError::Scanner(format!("Thread pool error: {}", e)))?;
194
195        // Concurrent project map
196        let projects: DashMap<ProjectId, Project> = DashMap::new();
197
198        // Scan each root
199        pool.install(|| {
200            config.roots.par_iter().for_each(|root| {
201                if let Err(e) = self.scan_root(root, &projects, config) {
202                    if !matches!(e, DevSweepError::ScanInterrupted) {
203                        self.progress.add_error(ScanError::new(
204                            root.clone(),
205                            e.to_string(),
206                        ));
207                    }
208                }
209            });
210        });
211
212        self.progress.mark_complete();
213
214        // Check if scan was cancelled
215        if self.progress.is_cancelled() {
216            return Err(DevSweepError::ScanInterrupted);
217        }
218
219        // Collect and sort results
220        let mut results: Vec<Project> = projects.into_iter().map(|(_, p)| p).collect();
221        results.sort_by(|a, b| b.cleanable_size.cmp(&a.cleanable_size));
222
223        // Apply limit if specified
224        if let Some(limit) = config.limit {
225            results.truncate(limit);
226        }
227
228        // Calculate statistics
229        let mut stats = ArtifactStats::default();
230        for project in &results {
231            for artifact in &project.artifacts {
232                stats.add(artifact);
233            }
234        }
235
236        let total_size: u64 = results.iter().map(|p| p.total_size).sum();
237        let total_cleanable: u64 = results.iter().map(|p| p.cleanable_size).sum();
238
239        Ok(ScanResult {
240            projects: results,
241            total_size,
242            total_cleanable,
243            duration: start.elapsed(),
244            directories_scanned: self.progress.directories_scanned.load(Ordering::Relaxed),
245            errors: std::mem::take(&mut *self.progress.errors.lock()),
246            stats,
247        })
248    }
249
250    fn progress(&self) -> Arc<ScanProgress> {
251        Arc::clone(&self.progress)
252    }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Create a minimal Node.js project in `path`: a `package.json` plus a
    /// non-empty `node_modules` directory.
    fn setup_node_project(path: &Path) {
        std::fs::write(path.join("package.json"), r#"{"name": "test"}"#).unwrap();
        std::fs::create_dir(path.join("node_modules")).unwrap();
        std::fs::write(path.join("node_modules/.package-lock.json"), "{}").unwrap();
    }

    /// Create a minimal Rust project in `path`: a `Cargo.toml` plus a
    /// `target/debug` directory containing one file.
    fn setup_rust_project(path: &Path) {
        std::fs::write(
            path.join("Cargo.toml"),
            "[package]\nname = \"test\"\nversion = \"0.1.0\"\n",
        )
        .unwrap();
        std::fs::create_dir_all(path.join("target/debug")).unwrap();
        std::fs::write(path.join("target/debug/test"), "binary").unwrap();
    }

    /// Build a scanner with all built-in plugins registered.
    fn new_scanner() -> ParallelScanner {
        ParallelScanner::new(Arc::new(PluginRegistry::with_builtins()))
    }

    #[test]
    fn test_scan_empty_directory() {
        let temp = TempDir::new().unwrap();
        let scanner = new_scanner();

        let config = ScanConfig::new(temp.path());
        let result = scanner.scan(&config).unwrap();

        assert_eq!(result.projects.len(), 0);
        assert_eq!(result.total_cleanable, 0);
    }

    #[test]
    fn test_scan_node_project() {
        let temp = TempDir::new().unwrap();
        setup_node_project(temp.path());

        let scanner = new_scanner();

        let config = ScanConfig::new(temp.path());
        let result = scanner.scan(&config).unwrap();

        assert_eq!(result.projects.len(), 1);
        assert!(result.projects[0].artifacts.iter().any(|a| a.name() == "node_modules"));
    }

    #[test]
    fn test_scan_rust_project() {
        let temp = TempDir::new().unwrap();
        setup_rust_project(temp.path());

        let scanner = new_scanner();

        let config = ScanConfig::new(temp.path());
        let result = scanner.scan(&config).unwrap();

        assert_eq!(result.projects.len(), 1);
        assert!(result.projects[0].artifacts.iter().any(|a| a.name() == "target"));
    }

    #[test]
    fn test_scan_multiple_projects() {
        let temp = TempDir::new().unwrap();

        // Create Node project
        let node_proj = temp.path().join("node-app");
        std::fs::create_dir(&node_proj).unwrap();
        setup_node_project(&node_proj);

        // Create Rust project
        let rust_proj = temp.path().join("rust-app");
        std::fs::create_dir(&rust_proj).unwrap();
        setup_rust_project(&rust_proj);

        let scanner = new_scanner();

        let config = ScanConfig::new(temp.path());
        let result = scanner.scan(&config).unwrap();

        assert_eq!(result.projects.len(), 2);
    }

    #[test]
    fn test_scan_overlapping_roots_reports_project_once() {
        let temp = TempDir::new().unwrap();
        setup_node_project(temp.path());

        let scanner = new_scanner();

        // The same root listed twice: both walks run concurrently and race to
        // record the same project directory.
        let mut config = ScanConfig::new(temp.path());
        config.roots.push(temp.path().to_path_buf());
        let result = scanner.scan(&config).unwrap();

        assert_eq!(result.projects.len(), 1);
        assert_eq!(result.total_cleanable, result.projects[0].cleanable_size);
    }

    #[test]
    fn test_scan_with_min_size_filter() {
        let temp = TempDir::new().unwrap();
        setup_node_project(temp.path());

        let scanner = new_scanner();

        // Set min size very high
        let config = ScanConfig::new(temp.path()).with_min_size(1_000_000_000);
        let result = scanner.scan(&config).unwrap();

        // Should find no projects since artifacts are small
        assert_eq!(result.projects.len(), 0);
    }

    #[test]
    fn test_scan_with_max_depth() {
        let temp = TempDir::new().unwrap();

        // Create nested project
        let deep = temp.path().join("a/b/c/d/e");
        std::fs::create_dir_all(&deep).unwrap();
        setup_node_project(&deep);

        let scanner = new_scanner();

        // Limit depth to 3
        let config = ScanConfig::new(temp.path()).with_max_depth(3);
        let result = scanner.scan(&config).unwrap();

        // Should not find the deeply nested project
        assert_eq!(result.projects.len(), 0);
    }

    #[test]
    fn test_scan_cancellation() {
        let temp = TempDir::new().unwrap();
        setup_node_project(temp.path());

        let scanner = new_scanner();

        // Cancel immediately
        scanner.cancel();

        let config = ScanConfig::new(temp.path());
        let result = scanner.scan(&config);

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            DevSweepError::ScanInterrupted
        ));
    }

    #[test]
    fn test_progress_tracking() {
        let temp = TempDir::new().unwrap();
        setup_node_project(temp.path());

        let scanner = new_scanner();
        let progress = scanner.progress();

        let config = ScanConfig::new(temp.path());
        // A failed scan must fail this test rather than be silently ignored.
        scanner.scan(&config).expect("scan should succeed");

        let snapshot = progress.snapshot();
        assert!(snapshot.is_complete);
        assert!(snapshot.directories_scanned > 0);
    }
}