Skip to main content

chainsaw/
walker.rs

1//! Concurrent dependency graph construction.
2//!
3//! Starting from an entry file, discovers all reachable modules by parsing
4//! imports and resolving them against the filesystem in parallel using a
5//! lock-free work queue and rayon thread pool.
6
7use std::collections::HashSet;
8use std::path::{Path, PathBuf};
9use std::sync::Mutex;
10use std::sync::atomic::{AtomicUsize, Ordering};
11
12use crossbeam_queue::SegQueue;
13use dashmap::DashSet;
14use rayon::slice::ParallelSliceMut;
15
16use crate::cache::ParseCache;
17use crate::graph::ModuleGraph;
18use crate::lang::{LanguageSupport, RawImport};
19use crate::vfs::Vfs;
20
/// Returns `true` when `path` has an extension that appears in `extensions`.
///
/// Paths with no extension, or whose extension is not valid UTF-8, are never
/// considered parseable. The comparison is exact (case-sensitive).
fn is_parseable(path: &Path, extensions: &[&str]) -> bool {
    match path.extension().and_then(std::ffi::OsStr::to_str) {
        Some(ext) => extensions.iter().any(|&candidate| candidate == ext),
        None => false,
    }
}
26
27/// Result of discovering a single file during concurrent traversal.
28struct FileResult {
29    path: PathBuf,
30    size: u64,
31    /// File modification time captured during read (avoids re-stat in cache insert).
32    mtime_nanos: Option<u128>,
33    package: Option<String>,
34    imports: Vec<(RawImport, Option<PathBuf>)>,
35    unresolvable_dynamic: usize,
36}
37
38struct DiscoverResult {
39    files: Vec<FileResult>,
40    warnings: Vec<String>,
41}
42
43/// Phase 1: Concurrent file discovery using a lock-free work queue.
44/// Returns all discovered files with their parsed imports and resolved paths.
45#[allow(clippy::too_many_lines)]
46fn concurrent_discover(
47    entry: &Path,
48    root: &Path,
49    lang: &dyn LanguageSupport,
50    vfs: &dyn Vfs,
51) -> DiscoverResult {
52    let queue: SegQueue<PathBuf> = SegQueue::new();
53    let seen: DashSet<PathBuf> = DashSet::new();
54    let results: Mutex<Vec<FileResult>> = Mutex::new(Vec::new());
55    let warnings: SegQueue<String> = SegQueue::new();
56    let active = AtomicUsize::new(1); // entry file is active
57    let extensions = lang.extensions();
58
59    queue.push(entry.to_path_buf());
60    seen.insert(entry.to_path_buf());
61
62    rayon::scope(|s| {
63        for _ in 0..rayon::current_num_threads() {
64            s.spawn(|_| {
65                let mut spin_count: u32 = 0;
66                loop {
67                    if let Some(path) = queue.pop() {
68                        spin_count = 0;
69                        let (source, meta) = match vfs.read_with_metadata(&path) {
70                            Ok(r) => r,
71                            Err(e) => {
72                                warnings.push(format!("{}: {e}", path.display()));
73                                active.fetch_sub(1, Ordering::AcqRel);
74                                continue;
75                            }
76                        };
77                        let mtime_nanos = meta.mtime_nanos;
78                        let size = meta.len;
79
80                        let result = match lang.parse(&path, &source) {
81                            Ok(r) => r,
82                            Err(e) => {
83                                warnings.push(e.to_string());
84                                active.fetch_sub(1, Ordering::AcqRel);
85                                continue;
86                            }
87                        };
88                        let package = if path == entry {
89                            None
90                        } else {
91                            lang.package_name(&path)
92                                .or_else(|| lang.workspace_package_name(&path, root))
93                        };
94
95                        // Resolve imports and discover new files
96                        #[allow(clippy::or_fun_call)]
97                        let dir = path.parent().unwrap_or(Path::new("."));
98                        let imports: Vec<(RawImport, Option<PathBuf>)> = result
99                            .imports
100                            .into_iter()
101                            .map(|imp| {
102                                let resolved = lang.resolve(dir, &imp.specifier);
103                                if let Some(ref p) = resolved
104                                    && is_parseable(p, extensions)
105                                    && seen.insert(p.clone())
106                                {
107                                    active.fetch_add(1, Ordering::AcqRel);
108                                    queue.push(p.clone());
109                                }
110                                (imp, resolved)
111                            })
112                            .collect();
113
114                        let file_result = FileResult {
115                            path,
116                            size,
117                            mtime_nanos,
118                            package,
119                            imports,
120                            unresolvable_dynamic: result.unresolvable_dynamic,
121                        };
122                        results
123                            .lock()
124                            .expect("results mutex not poisoned")
125                            .push(file_result);
126
127                        if active.fetch_sub(1, Ordering::AcqRel) == 1 {
128                            // This was the last active item; all work is done
129                            return;
130                        }
131                    } else if active.load(Ordering::Acquire) == 0 {
132                        return;
133                    } else if spin_count < 64 {
134                        spin_count += 1;
135                        std::hint::spin_loop();
136                    } else {
137                        spin_count = 0;
138                        std::thread::yield_now();
139                    }
140                }
141            });
142        }
143    });
144
145    let mut files = results.into_inner().expect("results mutex not poisoned");
146    files.par_sort_unstable_by(|a, b| a.path.cmp(&b.path));
147    let warnings = std::iter::from_fn(|| warnings.pop()).collect();
148    DiscoverResult { files, warnings }
149}
150
151/// Result of building a module graph.
152#[derive(Debug)]
153#[non_exhaustive]
154pub struct BuildResult {
155    pub graph: ModuleGraph,
156    /// Files containing dynamic imports with non-literal arguments, with counts.
157    pub unresolvable_dynamic: Vec<(PathBuf, usize)>,
158    /// Import specifiers that failed to resolve (for cache invalidation).
159    pub unresolved_specifiers: Vec<String>,
160    /// Warnings from files that could not be opened, read, or parsed.
161    pub file_warnings: Vec<String>,
162}
163
164/// Build a complete `ModuleGraph` from the given entry point.
165/// Phase 1 concurrently discovers files using a lock-free work queue.
166/// Phase 2 serially constructs the graph from sorted discovery results.
167pub fn build_graph(
168    entry: &Path,
169    root: &Path,
170    lang: &dyn LanguageSupport,
171    cache: &mut ParseCache,
172    vfs: &dyn Vfs,
173) -> BuildResult {
174    // Phase 1: Concurrent discovery (lock-free work queue)
175    let discovered = concurrent_discover(entry, root, lang, vfs);
176    let file_results = discovered.files;
177
178    // Phase 2: Serial graph construction from sorted results
179    let mut graph = ModuleGraph::new();
180    let mut unresolvable_files: Vec<(PathBuf, usize)> = Vec::new();
181    let mut unresolved: HashSet<String> = HashSet::new();
182
183    // First pass: add all modules (deterministic order from sorted results)
184    for fr in &file_results {
185        graph.add_module(fr.path.clone(), fr.size, fr.package.clone());
186    }
187
188    // Second pass: add edges, collect diagnostics, and populate parse cache.
189    // Consumes file_results by value to avoid redundant clones.
190    for fr in file_results {
191        let source_id = graph.path_to_id[&fr.path];
192
193        if fr.unresolvable_dynamic > 0 {
194            unresolvable_files.push((fr.path.clone(), fr.unresolvable_dynamic));
195        }
196
197        // Separate imports into raw imports and resolved paths.
198        // Edge processing borrows from these; cache takes ownership.
199        let (raw_imports, resolved_paths): (Vec<RawImport>, Vec<Option<PathBuf>>) =
200            fr.imports.into_iter().unzip();
201
202        for (raw_import, resolved_path) in raw_imports.iter().zip(resolved_paths.iter()) {
203            match resolved_path {
204                Some(p) => {
205                    if let Some(&target_id) = graph.path_to_id.get(p) {
206                        graph.add_edge(
207                            source_id,
208                            target_id,
209                            raw_import.kind,
210                            &raw_import.specifier,
211                        );
212                    }
213                    // Target not in graph = unparseable leaf (e.g. .json, .css)
214                    // Add it as a leaf module
215                    else {
216                        let size = vfs.metadata(p).map(|m| m.len).unwrap_or(0);
217                        let package = lang
218                            .package_name(p)
219                            .or_else(|| lang.workspace_package_name(p, root));
220                        let target_id = graph.add_module(p.clone(), size, package);
221                        graph.add_edge(
222                            source_id,
223                            target_id,
224                            raw_import.kind,
225                            &raw_import.specifier,
226                        );
227                    }
228                }
229                None => {
230                    unresolved.insert(raw_import.specifier.clone());
231                }
232            }
233        }
234
235        let result = crate::lang::ParseResult {
236            imports: raw_imports,
237            unresolvable_dynamic: fr.unresolvable_dynamic,
238        };
239        if let Some(mtime) = fr.mtime_nanos {
240            cache.insert(fr.path, fr.size, mtime, result, resolved_paths);
241        }
242    }
243
244    graph.compute_package_info();
245    BuildResult {
246        graph,
247        unresolvable_dynamic: unresolvable_files,
248        unresolved_specifiers: unresolved.into_iter().collect(),
249        file_warnings: discovered.warnings,
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lang::typescript::TypeScriptSupport;
    use crate::vfs::OsVfs;
    use std::fs;

    /// A file that fails to load must not be rediscovered and added again:
    /// it may appear in the graph at most once (as a leaf).
    #[test]
    fn parse_failure_not_retried() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().canonicalize().unwrap();
        let entry = root.join("entry.ts");

        fs::write(&entry, r#"import { x } from "./broken";"#).unwrap();
        // These bytes are not valid UTF-8 (0xFF can never start a code point).
        fs::write(root.join("broken.ts"), [0xFF, 0xFE, 0x00, 0x01]).unwrap();

        let lang = TypeScriptSupport::new(&root);
        let mut cache = ParseCache::new();
        let graph = build_graph(&entry, &root, &lang, &mut cache, &OsVfs).graph;

        let broken_name = std::ffi::OsStr::new("broken.ts");
        let mut entry_count = 0;
        for path in graph.path_to_id.keys() {
            if path.file_name() == Some(broken_name) {
                entry_count += 1;
            }
        }
        assert!(
            entry_count <= 1,
            "broken.ts should appear at most once, found {entry_count}"
        );
    }
}