Skip to main content

chainsaw/
walker.rs

1//! Concurrent dependency graph construction.
2//!
3//! Starting from an entry file, discovers all reachable modules by parsing
4//! imports and resolving them against the filesystem in parallel using a
5//! lock-free work queue and rayon thread pool.
6
7use std::collections::HashSet;
8use std::fs::{self, File};
9use std::io::Read;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::sync::Mutex;
13use std::time::SystemTime;
14
15use crossbeam_queue::SegQueue;
16use dashmap::DashSet;
17use rayon::slice::ParallelSliceMut;
18
19use crate::cache::ParseCache;
20use crate::graph::ModuleGraph;
21use crate::lang::{LanguageSupport, RawImport};
22
23fn is_parseable(path: &Path, extensions: &[&str]) -> bool {
24    path.extension()
25        .and_then(|e| e.to_str())
26        .is_some_and(|ext| extensions.contains(&ext))
27}
28
29/// Result of discovering a single file during concurrent traversal.
30struct FileResult {
31    path: PathBuf,
32    size: u64,
33    /// File modification time captured during read (avoids re-stat in cache insert).
34    mtime_nanos: Option<u128>,
35    package: Option<String>,
36    imports: Vec<(RawImport, Option<PathBuf>)>,
37    unresolvable_dynamic: usize,
38}
39
40struct DiscoverResult {
41    files: Vec<FileResult>,
42    warnings: Vec<String>,
43}
44
45/// Phase 1: Concurrent file discovery using a lock-free work queue.
46/// Returns all discovered files with their parsed imports and resolved paths.
47#[allow(clippy::too_many_lines)]
48fn concurrent_discover(
49    entry: &Path,
50    root: &Path,
51    lang: &dyn LanguageSupport,
52) -> DiscoverResult {
53    let queue: SegQueue<PathBuf> = SegQueue::new();
54    let seen: DashSet<PathBuf> = DashSet::new();
55    let results: Mutex<Vec<FileResult>> = Mutex::new(Vec::new());
56    let warnings: SegQueue<String> = SegQueue::new();
57    let active = AtomicUsize::new(1); // entry file is active
58    let extensions = lang.extensions();
59
60    queue.push(entry.to_path_buf());
61    seen.insert(entry.to_path_buf());
62
63    rayon::scope(|s| {
64        for _ in 0..rayon::current_num_threads() {
65            s.spawn(|_| {
66                let mut spin_count: u32 = 0;
67                loop {
68                    if let Some(path) = queue.pop() {
69                        spin_count = 0;
70                        // Open + fstat + read in one pass (3 syscalls, not 4)
71                        let mut file = match File::open(&path) {
72                            Ok(f) => f,
73                            Err(e) => {
74                                warnings.push(format!("{}: {e}", path.display()));
75                                active.fetch_sub(1, Ordering::AcqRel);
76                                continue;
77                            }
78                        };
79                        let meta = match file.metadata() {
80                            Ok(m) => m,
81                            Err(e) => {
82                                warnings.push(format!("{}: {e}", path.display()));
83                                active.fetch_sub(1, Ordering::AcqRel);
84                                continue;
85                            }
86                        };
87                        let mtime_nanos = meta.modified().ok()
88                            .and_then(|t| t.duration_since(SystemTime::UNIX_EPOCH).ok())
89                            .map(|d| d.as_nanos());
90                        let size = meta.len();
91                        #[allow(clippy::cast_possible_truncation)]
92                        let mut source = String::with_capacity(size as usize + 1);
93                        if let Err(e) = file.read_to_string(&mut source) {
94                            warnings.push(format!("{}: {e}", path.display()));
95                            active.fetch_sub(1, Ordering::AcqRel);
96                            continue;
97                        }
98
99                        let result = match lang.parse(&path, &source) {
100                            Ok(r) => r,
101                            Err(e) => {
102                                warnings.push(e.to_string());
103                                active.fetch_sub(1, Ordering::AcqRel);
104                                continue;
105                            }
106                        };
107                        let package = if path == entry {
108                            None
109                        } else {
110                            lang.package_name(&path)
111                                .or_else(|| lang.workspace_package_name(&path, root))
112                        };
113
114                        // Resolve imports and discover new files
115                        #[allow(clippy::or_fun_call)]
116                        let dir = path.parent().unwrap_or(Path::new("."));
117                        let imports: Vec<(RawImport, Option<PathBuf>)> = result
118                            .imports
119                            .into_iter()
120                            .map(|imp| {
121                                let resolved = lang.resolve(dir, &imp.specifier);
122                                if let Some(ref p) = resolved
123                                    && is_parseable(p, extensions)
124                                    && seen.insert(p.clone())
125                                {
126                                    active.fetch_add(1, Ordering::AcqRel);
127                                    queue.push(p.clone());
128                                }
129                                (imp, resolved)
130                            })
131                            .collect();
132
133                        let file_result = FileResult {
134                            path,
135                            size,
136                            mtime_nanos,
137                            package,
138                            imports,
139                            unresolvable_dynamic: result.unresolvable_dynamic,
140                        };
141                        results.lock().unwrap().push(file_result);
142
143                        if active.fetch_sub(1, Ordering::AcqRel) == 1 {
144                            // This was the last active item; all work is done
145                            return;
146                        }
147                    } else if active.load(Ordering::Acquire) == 0 {
148                        return;
149                    } else if spin_count < 64 {
150                        spin_count += 1;
151                        std::hint::spin_loop();
152                    } else {
153                        spin_count = 0;
154                        std::thread::yield_now();
155                    }
156                }
157            });
158        }
159    });
160
161    let mut files = results.into_inner().unwrap();
162    files.par_sort_unstable_by(|a, b| a.path.cmp(&b.path));
163    let warnings = std::iter::from_fn(|| warnings.pop()).collect();
164    DiscoverResult { files, warnings }
165}
166
167/// Result of building a module graph.
168#[derive(Debug)]
169#[non_exhaustive]
170pub struct BuildResult {
171    pub graph: ModuleGraph,
172    /// Files containing dynamic imports with non-literal arguments, with counts.
173    pub unresolvable_dynamic: Vec<(PathBuf, usize)>,
174    /// Import specifiers that failed to resolve (for cache invalidation).
175    pub unresolved_specifiers: Vec<String>,
176    /// Warnings from files that could not be opened, read, or parsed.
177    pub file_warnings: Vec<String>,
178}
179
180/// Build a complete `ModuleGraph` from the given entry point.
181/// Phase 1 concurrently discovers files using a lock-free work queue.
182/// Phase 2 serially constructs the graph from sorted discovery results.
183pub fn build_graph(
184    entry: &Path,
185    root: &Path,
186    lang: &dyn LanguageSupport,
187    cache: &mut ParseCache,
188) -> BuildResult {
189    // Phase 1: Concurrent discovery (lock-free work queue)
190    let discovered = concurrent_discover(entry, root, lang);
191    let file_results = discovered.files;
192
193    // Phase 2: Serial graph construction from sorted results
194    let mut graph = ModuleGraph::new();
195    let mut unresolvable_files: Vec<(PathBuf, usize)> = Vec::new();
196    let mut unresolved: HashSet<String> = HashSet::new();
197
198    // First pass: add all modules (deterministic order from sorted results)
199    for fr in &file_results {
200        graph.add_module(fr.path.clone(), fr.size, fr.package.clone());
201    }
202
203    // Second pass: add edges, collect diagnostics, and populate parse cache.
204    // Consumes file_results by value to avoid redundant clones.
205    for fr in file_results {
206        let source_id = graph.path_to_id[&fr.path];
207
208        if fr.unresolvable_dynamic > 0 {
209            unresolvable_files.push((fr.path.clone(), fr.unresolvable_dynamic));
210        }
211
212        // Separate imports into raw imports and resolved paths.
213        // Edge processing borrows from these; cache takes ownership.
214        let (raw_imports, resolved_paths): (Vec<RawImport>, Vec<Option<PathBuf>>) =
215            fr.imports.into_iter().unzip();
216
217        for (raw_import, resolved_path) in raw_imports.iter().zip(resolved_paths.iter()) {
218            match resolved_path {
219                Some(p) => {
220                    if let Some(&target_id) = graph.path_to_id.get(p) {
221                        graph.add_edge(
222                            source_id,
223                            target_id,
224                            raw_import.kind,
225                            &raw_import.specifier,
226                        );
227                    }
228                    // Target not in graph = unparseable leaf (e.g. .json, .css)
229                    // Add it as a leaf module
230                    else {
231                        let size = fs::metadata(p).map(|m| m.len()).unwrap_or(0);
232                        let package = lang
233                            .package_name(p)
234                            .or_else(|| lang.workspace_package_name(p, root));
235                        let target_id = graph.add_module(p.clone(), size, package);
236                        graph.add_edge(
237                            source_id,
238                            target_id,
239                            raw_import.kind,
240                            &raw_import.specifier,
241                        );
242                    }
243                }
244                None => {
245                    unresolved.insert(raw_import.specifier.clone());
246                }
247            }
248        }
249
250        let result = crate::lang::ParseResult {
251            imports: raw_imports,
252            unresolvable_dynamic: fr.unresolvable_dynamic,
253        };
254        if let Some(mtime) = fr.mtime_nanos {
255            cache.insert(fr.path, fr.size, mtime, result, resolved_paths);
256        }
257    }
258
259    graph.compute_package_info();
260    BuildResult {
261        graph,
262        unresolvable_dynamic: unresolvable_files,
263        unresolved_specifiers: unresolved.into_iter().collect(),
264        file_warnings: discovered.warnings,
265    }
266}
267
268#[cfg(test)]
269mod tests {
270    use super::*;
271    use crate::lang::typescript::TypeScriptSupport;
272    use std::fs;
273
274    #[test]
275    fn parse_failure_not_retried() {
276        let tmp = tempfile::tempdir().unwrap();
277        let root = tmp.path().canonicalize().unwrap();
278
279        fs::write(
280            root.join("entry.ts"),
281            r#"import { x } from "./broken";"#,
282        )
283        .unwrap();
284
285        fs::write(root.join("broken.ts"), [0xFF, 0xFE, 0x00, 0x01]).unwrap();
286
287        let lang = TypeScriptSupport::new(&root);
288        let mut cache = ParseCache::new();
289        let result = build_graph(&root.join("entry.ts"), &root, &lang, &mut cache);
290        let graph = result.graph;
291
292        let entry_count = graph
293            .path_to_id
294            .keys()
295            .filter(|p| p.file_name().is_some_and(|n| n == "broken.ts"))
296            .count();
297        assert!(
298            entry_count <= 1,
299            "broken.ts should appear at most once, found {entry_count}"
300        );
301    }
302}