Skip to main content

codelens_core/walker/
parallel.rs

1//! Parallel directory walker using the `ignore` crate.
2
3use std::path::Path;
4use std::sync::Arc;
5
6use crossbeam_channel::bounded;
7use ignore::{DirEntry, WalkBuilder, WalkState};
8
9use crate::analyzer::stats::FileStats;
10use crate::analyzer::FileAnalyzer;
11use crate::error::Result;
12use crate::filter::Filter;
13
/// Tunable settings consumed by [`ParallelWalker`].
#[derive(Clone, Debug)]
pub struct WalkerConfig {
    /// How many worker threads the directory walk is asked to use.
    pub threads: usize,
    /// When `true`, symbolic links are followed during traversal.
    pub follow_symlinks: bool,
    /// When `true`, `.gitignore` rules are honoured during traversal.
    pub use_gitignore: bool,
    /// Deepest directory level to descend into; `None` means no limit.
    pub max_depth: Option<usize>,
    /// Extra ignore entries supplied by the caller.
    pub custom_ignores: Vec<String>,
}
28
29impl Default for WalkerConfig {
30    fn default() -> Self {
31        Self {
32            threads: num_cpus::get(),
33            follow_symlinks: false,
34            use_gitignore: true,
35            max_depth: None,
36            custom_ignores: Vec::new(),
37        }
38    }
39}
40
41/// Parallel directory walker.
42pub struct ParallelWalker {
43    config: WalkerConfig,
44}
45
46impl ParallelWalker {
47    /// Create a new parallel walker.
48    pub fn new(config: WalkerConfig) -> Self {
49        Self { config }
50    }
51
52    /// Walk directories and analyze files in parallel.
53    ///
54    /// Calls `on_file` for each successfully analyzed file,
55    /// and `on_skip` for each skipped file.
56    pub fn walk_and_analyze<F, S>(
57        &self,
58        root: &Path,
59        analyzer: Arc<FileAnalyzer>,
60        filter: Arc<dyn Filter>,
61        mut on_file: F,
62        mut on_skip: S,
63    ) -> Result<()>
64    where
65        F: FnMut(FileStats) + Send,
66        S: FnMut(&Path) + Send,
67    {
68        let (tx, rx) = bounded::<WalkResult>(1000);
69
70        // Build the walker
71        let mut builder = WalkBuilder::new(root);
72        builder
73            .hidden(false) // Don't skip hidden files by default
74            .git_ignore(self.config.use_gitignore)
75            .git_global(self.config.use_gitignore)
76            .git_exclude(self.config.use_gitignore)
77            .follow_links(self.config.follow_symlinks)
78            .threads(self.config.threads);
79
80        if let Some(depth) = self.config.max_depth {
81            builder.max_depth(Some(depth));
82        }
83
84        // Add custom ignore patterns
85        for pattern in &self.config.custom_ignores {
86            builder.add_custom_ignore_filename(pattern);
87        }
88
89        // Start parallel walk with concurrent consumer.
90        // The consumer MUST run in parallel with `run()` because `run()` blocks
91        // until all walker threads finish. With a bounded channel, walker threads
92        // block on send when the channel is full. If the consumer only starts
93        // after `run()` returns, this creates a deadlock when results exceed the
94        // channel capacity.
95        let filter_clone = Arc::clone(&filter);
96        let analyzer_clone = Arc::clone(&analyzer);
97
98        std::thread::scope(|s| {
99            // Spawn consumer thread that drains results while producers are running
100            let consumer = s.spawn(|| {
101                for result in &rx {
102                    match result {
103                        WalkResult::File(stats) => on_file(stats),
104                        WalkResult::Skipped(path) => on_skip(&path),
105                    }
106                }
107            });
108
109            // Run producers (blocks until all walker threads finish)
110            builder.build_parallel().run(|| {
111                let tx = tx.clone();
112                let filter = Arc::clone(&filter_clone);
113                let analyzer = Arc::clone(&analyzer_clone);
114                // Per-thread reusable buffer (64KB initial capacity)
115                let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
116
117                Box::new(move |entry: std::result::Result<DirEntry, ignore::Error>| {
118                    let entry = match entry {
119                        Ok(e) => e,
120                        Err(_) => return WalkState::Continue,
121                    };
122
123                    let path = entry.path();
124                    let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
125
126                    // Apply custom filter
127                    if !filter.should_include(path, is_dir) {
128                        if is_dir {
129                            return WalkState::Skip;
130                        }
131                        let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
132                        return WalkState::Continue;
133                    }
134
135                    // Skip directories (they're handled by the walker)
136                    if is_dir {
137                        return WalkState::Continue;
138                    }
139
140                    // Skip non-files
141                    if !entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
142                        return WalkState::Continue;
143                    }
144
145                    // Read file into reusable buffer
146                    buf.clear();
147                    match std::fs::File::open(path).and_then(|mut f| {
148                        if let Ok(meta) = f.metadata() {
149                            buf.reserve(meta.len() as usize);
150                        }
151                        std::io::Read::read_to_end(&mut f, &mut buf)
152                    }) {
153                        Ok(_) => match analyzer.analyze_from_bytes(path, &buf) {
154                            Ok(Some(stats)) => {
155                                let _ = tx.send(WalkResult::File(stats));
156                            }
157                            Ok(None) => {
158                                let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
159                            }
160                            Err(_) => {
161                                let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
162                            }
163                        },
164                        Err(_) => {
165                            let _ = tx.send(WalkResult::Skipped(path.to_path_buf()));
166                        }
167                    }
168
169                    // Shrink buffer if it grew too large from a single file
170                    if buf.capacity() > 1_048_576 {
171                        buf = Vec::with_capacity(64 * 1024);
172                    }
173
174                    WalkState::Continue
175                })
176            });
177
178            // Close the sender so consumer finishes when all results are drained
179            drop(tx);
180
181            // Wait for consumer to finish
182            consumer.join().expect("consumer thread panicked");
183        });
184
185        Ok(())
186    }
187}
188
189impl Default for ParallelWalker {
190    fn default() -> Self {
191        Self::new(WalkerConfig::default())
192    }
193}
194
195/// Result from walking a single entry.
196enum WalkResult {
197    /// Successfully analyzed file.
198    File(FileStats),
199    /// Skipped file.
200    Skipped(std::path::PathBuf),
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::TempDir;

    /// Filter that accepts every path.
    struct AllowAll;

    impl Filter for AllowAll {
        fn should_include(&self, _path: &Path, _is_dir: bool) -> bool {
            true
        }
    }

    /// The default configuration uses at least one thread, honours gitignore
    /// rules and does not follow symlinks.
    #[test]
    fn test_walker_config_default() {
        let WalkerConfig {
            threads,
            use_gitignore,
            follow_symlinks,
            ..
        } = WalkerConfig::default();

        assert!(threads > 0);
        assert!(use_gitignore);
        assert!(!follow_symlinks);
    }

    /// Walking an empty directory reports no analyzed files.
    #[test]
    fn test_walk_empty_dir() {
        let tmp = TempDir::new().unwrap();
        let analyzer = Arc::new(FileAnalyzer::new(
            Arc::new(crate::language::LanguageRegistry::empty()),
            &crate::config::Config::default(),
        ));
        let filter = Arc::new(AllowAll);
        let seen = AtomicUsize::new(0);

        let outcome = ParallelWalker::default().walk_and_analyze(
            tmp.path(),
            analyzer,
            filter,
            |_| {
                seen.fetch_add(1, Ordering::SeqCst);
            },
            |_| {},
        );

        outcome.unwrap();
        assert_eq!(seen.load(Ordering::SeqCst), 0);
    }

    /// Regression test: >1000 files previously caused deadlock because
    /// the bounded channel (capacity 1000) filled up while run() blocked
    /// waiting for threads, and the consumer only started after run().
    #[test]
    fn test_walk_many_files_no_deadlock() {
        let tmp = TempDir::new().unwrap();
        let file_count = 1500;
        for i in 0..file_count {
            let target = tmp.path().join(format!("file_{i}.rs"));
            std::fs::write(target, "fn main() {}\n").unwrap();
        }

        let analyzer = Arc::new(FileAnalyzer::new(
            Arc::new(crate::language::LanguageRegistry::with_builtin().unwrap()),
            &crate::config::Config::default(),
        ));
        let filter = Arc::new(AllowAll);
        let seen = AtomicUsize::new(0);

        let outcome = ParallelWalker::default().walk_and_analyze(
            tmp.path(),
            analyzer,
            filter,
            |_| {
                seen.fetch_add(1, Ordering::SeqCst);
            },
            |_| {},
        );

        outcome.unwrap();
        assert_eq!(seen.load(Ordering::SeqCst), file_count);
    }
}