Skip to main content

codelens_core/walker/
parallel.rs

1//! Parallel directory walker using the `ignore` crate.
2
3use std::path::Path;
4use std::sync::Arc;
5
6use crossbeam_channel::bounded;
7use ignore::{DirEntry, WalkBuilder, WalkState};
8
9use crate::analyzer::stats::FileStats;
10use crate::analyzer::FileAnalyzer;
11use crate::error::Result;
12use crate::filter::Filter;
13
/// Configuration for the parallel walker.
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so that
/// configurations can be compared directly (e.g. in tests or to detect
/// whether settings changed).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalkerConfig {
    /// Number of threads to use.
    ///
    /// The value is forwarded unchanged to `ignore::WalkBuilder::threads`.
    pub threads: usize,
    /// Whether to follow symbolic links.
    pub follow_symlinks: bool,
    /// Whether to respect .gitignore files.
    ///
    /// The walker applies this single flag to the repository `.gitignore`
    /// files, the global gitignore, and `.git/info/exclude` together.
    pub use_gitignore: bool,
    /// Maximum directory depth (None = unlimited).
    pub max_depth: Option<usize>,
    /// Additional ignore entries.
    ///
    /// NOTE: the walker registers each entry through
    /// `WalkBuilder::add_custom_ignore_filename`, i.e. as the *name of an
    /// ignore file* to look for in each directory (like `.ignore`), not as a
    /// glob pattern matched against paths.
    pub custom_ignores: Vec<String>,
}
28
29impl Default for WalkerConfig {
30    fn default() -> Self {
31        Self {
32            threads: num_cpus::get(),
33            follow_symlinks: false,
34            use_gitignore: true,
35            max_depth: None,
36            custom_ignores: Vec::new(),
37        }
38    }
39}
40
41/// Parallel directory walker.
42pub struct ParallelWalker {
43    config: WalkerConfig,
44}
45
46impl ParallelWalker {
47    /// Create a new parallel walker.
48    pub fn new(config: WalkerConfig) -> Self {
49        Self { config }
50    }
51
52    /// Walk directories and analyze files in parallel.
53    ///
54    /// Calls `on_file` for each successfully analyzed file,
55    /// and `on_skip` for each skipped file.
56    pub fn walk_and_analyze<F, S>(
57        &self,
58        root: &Path,
59        analyzer: Arc<FileAnalyzer>,
60        filter: Arc<dyn Filter>,
61        mut on_file: F,
62        mut on_skip: S,
63    ) -> Result<()>
64    where
65        F: FnMut(FileStats) + Send,
66        S: FnMut(&Path) + Send,
67    {
68        let (tx, rx) = bounded::<WalkResult>(1000);
69
70        // Build the walker
71        let mut builder = WalkBuilder::new(root);
72        builder
73            .hidden(false) // Don't skip hidden files by default
74            .git_ignore(self.config.use_gitignore)
75            .git_global(self.config.use_gitignore)
76            .git_exclude(self.config.use_gitignore)
77            .follow_links(self.config.follow_symlinks)
78            .threads(self.config.threads);
79
80        if let Some(depth) = self.config.max_depth {
81            builder.max_depth(Some(depth));
82        }
83
84        // Add custom ignore patterns
85        for pattern in &self.config.custom_ignores {
86            builder.add_custom_ignore_filename(pattern);
87        }
88
89        // Start parallel walk
90        let filter_clone = Arc::clone(&filter);
91        let analyzer_clone = Arc::clone(&analyzer);
92
93        builder.build_parallel().run(|| {
94            let tx = tx.clone();
95            let filter = Arc::clone(&filter_clone);
96            let analyzer = Arc::clone(&analyzer_clone);
97            // Per-thread reusable buffer (64KB initial capacity)
98            let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
99
100            Box::new(move |entry: std::result::Result<DirEntry, ignore::Error>| {
101                let entry = match entry {
102                    Ok(e) => e,
103                    Err(_) => return WalkState::Continue,
104                };
105
106                let path = entry.path();
107                let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
108
109                // Apply custom filter
110                if !filter.should_include(path, is_dir) {
111                    if is_dir {
112                        return WalkState::Skip;
113                    }
114                    let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
115                    return WalkState::Continue;
116                }
117
118                // Skip directories (they're handled by the walker)
119                if is_dir {
120                    return WalkState::Continue;
121                }
122
123                // Skip non-files
124                if !entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
125                    return WalkState::Continue;
126                }
127
128                // Read file into reusable buffer
129                buf.clear();
130                match std::fs::File::open(path).and_then(|mut f| {
131                    if let Ok(meta) = f.metadata() {
132                        buf.reserve(meta.len() as usize);
133                    }
134                    std::io::Read::read_to_end(&mut f, &mut buf)
135                }) {
136                    Ok(_) => match analyzer.analyze_from_bytes(path, &buf) {
137                        Ok(Some(stats)) => {
138                            let _ = tx.send(WalkResult::File(stats));
139                        }
140                        Ok(None) => {
141                            let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
142                        }
143                        Err(_) => {
144                            let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
145                        }
146                    },
147                    Err(_) => {
148                        let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
149                    }
150                }
151
152                // Shrink buffer if it grew too large from a single file
153                if buf.capacity() > 1_048_576 {
154                    buf = Vec::with_capacity(64 * 1024);
155                }
156
157                WalkState::Continue
158            })
159        });
160
161        // Close the sender
162        drop(tx);
163
164        // Collect results
165        for result in rx {
166            match result {
167                WalkResult::File(stats) => on_file(stats),
168                WalkResult::Skipped(path) => on_skip(&path),
169            }
170        }
171
172        Ok(())
173    }
174}
175
176impl Default for ParallelWalker {
177    fn default() -> Self {
178        Self::new(WalkerConfig::default())
179    }
180}
181
182/// Result from walking a single entry.
183enum WalkResult {
184    /// Successfully analyzed file.
185    File(FileStats),
186    /// Skipped file.
187    Skipped(std::path::PathBuf),
188}
189
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::TempDir;

    /// Filter that accepts every path, file or directory.
    struct AllowAll;

    impl Filter for AllowAll {
        fn should_include(&self, _path: &Path, _is_dir: bool) -> bool {
            true
        }
    }

    /// The default configuration uses at least one thread, honours
    /// gitignore rules and does not follow symlinks.
    #[test]
    fn test_walker_config_default() {
        let WalkerConfig {
            threads,
            follow_symlinks,
            use_gitignore,
            ..
        } = WalkerConfig::default();

        assert!(threads > 0);
        assert!(use_gitignore);
        assert!(!follow_symlinks);
    }

    /// Walking an empty temporary directory reports no analyzed files.
    #[test]
    fn test_walk_empty_dir() {
        let temp = TempDir::new().unwrap();

        let registry = Arc::new(crate::language::LanguageRegistry::empty());
        let analyzer = Arc::new(FileAnalyzer::new(
            registry,
            &crate::config::Config::default(),
        ));
        let filter = Arc::new(AllowAll);
        let walker = ParallelWalker::default();

        let analyzed = AtomicUsize::new(0);
        let outcome = walker.walk_and_analyze(
            temp.path(),
            analyzer,
            filter,
            |_| {
                analyzed.fetch_add(1, Ordering::SeqCst);
            },
            |_| {},
        );
        outcome.unwrap();

        assert_eq!(analyzed.load(Ordering::SeqCst), 0);
    }
}
237}