Skip to main content

chainsaw/
walker.rs

//! Concurrent dependency graph construction.
//!
//! Starting from an entry file, discovers all reachable modules by parsing
//! imports and resolving them against the filesystem in parallel using a
//! lock-free work queue and a rayon thread pool.
6
7use std::collections::HashSet;
8use std::fs::{self, File};
9use std::io::Read;
10use std::path::{Path, PathBuf};
11use std::sync::Mutex;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::time::SystemTime;
14
15use crossbeam_queue::SegQueue;
16use dashmap::DashSet;
17use rayon::slice::ParallelSliceMut;
18
19use crate::cache::ParseCache;
20use crate::graph::ModuleGraph;
21use crate::lang::{LanguageSupport, RawImport};
22
/// Returns `true` when `path` has an extension that is valid UTF-8 and is
/// equal to one of `extensions`.
///
/// The comparison is exact (case-sensitive) and uses the extension as
/// reported by [`Path::extension`], i.e. without the leading dot. Paths with
/// no extension, or with a non-UTF-8 extension, are never parseable.
fn is_parseable(path: &Path, extensions: &[&str]) -> bool {
    path.extension()
        .and_then(|e| e.to_str())
        .is_some_and(|ext| extensions.contains(&ext))
}
28
29/// Result of discovering a single file during concurrent traversal.
30struct FileResult {
31    path: PathBuf,
32    size: u64,
33    /// File modification time captured during read (avoids re-stat in cache insert).
34    mtime_nanos: Option<u128>,
35    package: Option<String>,
36    imports: Vec<(RawImport, Option<PathBuf>)>,
37    unresolvable_dynamic: usize,
38}
39
40struct DiscoverResult {
41    files: Vec<FileResult>,
42    warnings: Vec<String>,
43}
44
45/// Phase 1: Concurrent file discovery using a lock-free work queue.
46/// Returns all discovered files with their parsed imports and resolved paths.
47#[allow(clippy::too_many_lines)]
48fn concurrent_discover(entry: &Path, root: &Path, lang: &dyn LanguageSupport) -> DiscoverResult {
49    let queue: SegQueue<PathBuf> = SegQueue::new();
50    let seen: DashSet<PathBuf> = DashSet::new();
51    let results: Mutex<Vec<FileResult>> = Mutex::new(Vec::new());
52    let warnings: SegQueue<String> = SegQueue::new();
53    let active = AtomicUsize::new(1); // entry file is active
54    let extensions = lang.extensions();
55
56    queue.push(entry.to_path_buf());
57    seen.insert(entry.to_path_buf());
58
59    rayon::scope(|s| {
60        for _ in 0..rayon::current_num_threads() {
61            s.spawn(|_| {
62                let mut spin_count: u32 = 0;
63                loop {
64                    if let Some(path) = queue.pop() {
65                        spin_count = 0;
66                        // Open + fstat + read in one pass (3 syscalls, not 4)
67                        let mut file = match File::open(&path) {
68                            Ok(f) => f,
69                            Err(e) => {
70                                warnings.push(format!("{}: {e}", path.display()));
71                                active.fetch_sub(1, Ordering::AcqRel);
72                                continue;
73                            }
74                        };
75                        let meta = match file.metadata() {
76                            Ok(m) => m,
77                            Err(e) => {
78                                warnings.push(format!("{}: {e}", path.display()));
79                                active.fetch_sub(1, Ordering::AcqRel);
80                                continue;
81                            }
82                        };
83                        let mtime_nanos = meta
84                            .modified()
85                            .ok()
86                            .and_then(|t| t.duration_since(SystemTime::UNIX_EPOCH).ok())
87                            .map(|d| d.as_nanos());
88                        let size = meta.len();
89                        #[allow(clippy::cast_possible_truncation)]
90                        let mut source = String::with_capacity(size as usize + 1);
91                        if let Err(e) = file.read_to_string(&mut source) {
92                            warnings.push(format!("{}: {e}", path.display()));
93                            active.fetch_sub(1, Ordering::AcqRel);
94                            continue;
95                        }
96
97                        let result = match lang.parse(&path, &source) {
98                            Ok(r) => r,
99                            Err(e) => {
100                                warnings.push(e.to_string());
101                                active.fetch_sub(1, Ordering::AcqRel);
102                                continue;
103                            }
104                        };
105                        let package = if path == entry {
106                            None
107                        } else {
108                            lang.package_name(&path)
109                                .or_else(|| lang.workspace_package_name(&path, root))
110                        };
111
112                        // Resolve imports and discover new files
113                        #[allow(clippy::or_fun_call)]
114                        let dir = path.parent().unwrap_or(Path::new("."));
115                        let imports: Vec<(RawImport, Option<PathBuf>)> = result
116                            .imports
117                            .into_iter()
118                            .map(|imp| {
119                                let resolved = lang.resolve(dir, &imp.specifier);
120                                if let Some(ref p) = resolved
121                                    && is_parseable(p, extensions)
122                                    && seen.insert(p.clone())
123                                {
124                                    active.fetch_add(1, Ordering::AcqRel);
125                                    queue.push(p.clone());
126                                }
127                                (imp, resolved)
128                            })
129                            .collect();
130
131                        let file_result = FileResult {
132                            path,
133                            size,
134                            mtime_nanos,
135                            package,
136                            imports,
137                            unresolvable_dynamic: result.unresolvable_dynamic,
138                        };
139                        results.lock().unwrap().push(file_result);
140
141                        if active.fetch_sub(1, Ordering::AcqRel) == 1 {
142                            // This was the last active item; all work is done
143                            return;
144                        }
145                    } else if active.load(Ordering::Acquire) == 0 {
146                        return;
147                    } else if spin_count < 64 {
148                        spin_count += 1;
149                        std::hint::spin_loop();
150                    } else {
151                        spin_count = 0;
152                        std::thread::yield_now();
153                    }
154                }
155            });
156        }
157    });
158
159    let mut files = results.into_inner().unwrap();
160    files.par_sort_unstable_by(|a, b| a.path.cmp(&b.path));
161    let warnings = std::iter::from_fn(|| warnings.pop()).collect();
162    DiscoverResult { files, warnings }
163}
164
165/// Result of building a module graph.
166#[derive(Debug)]
167#[non_exhaustive]
168pub struct BuildResult {
169    pub graph: ModuleGraph,
170    /// Files containing dynamic imports with non-literal arguments, with counts.
171    pub unresolvable_dynamic: Vec<(PathBuf, usize)>,
172    /// Import specifiers that failed to resolve (for cache invalidation).
173    pub unresolved_specifiers: Vec<String>,
174    /// Warnings from files that could not be opened, read, or parsed.
175    pub file_warnings: Vec<String>,
176}
177
178/// Build a complete `ModuleGraph` from the given entry point.
179/// Phase 1 concurrently discovers files using a lock-free work queue.
180/// Phase 2 serially constructs the graph from sorted discovery results.
181pub fn build_graph(
182    entry: &Path,
183    root: &Path,
184    lang: &dyn LanguageSupport,
185    cache: &mut ParseCache,
186) -> BuildResult {
187    // Phase 1: Concurrent discovery (lock-free work queue)
188    let discovered = concurrent_discover(entry, root, lang);
189    let file_results = discovered.files;
190
191    // Phase 2: Serial graph construction from sorted results
192    let mut graph = ModuleGraph::new();
193    let mut unresolvable_files: Vec<(PathBuf, usize)> = Vec::new();
194    let mut unresolved: HashSet<String> = HashSet::new();
195
196    // First pass: add all modules (deterministic order from sorted results)
197    for fr in &file_results {
198        graph.add_module(fr.path.clone(), fr.size, fr.package.clone());
199    }
200
201    // Second pass: add edges, collect diagnostics, and populate parse cache.
202    // Consumes file_results by value to avoid redundant clones.
203    for fr in file_results {
204        let source_id = graph.path_to_id[&fr.path];
205
206        if fr.unresolvable_dynamic > 0 {
207            unresolvable_files.push((fr.path.clone(), fr.unresolvable_dynamic));
208        }
209
210        // Separate imports into raw imports and resolved paths.
211        // Edge processing borrows from these; cache takes ownership.
212        let (raw_imports, resolved_paths): (Vec<RawImport>, Vec<Option<PathBuf>>) =
213            fr.imports.into_iter().unzip();
214
215        for (raw_import, resolved_path) in raw_imports.iter().zip(resolved_paths.iter()) {
216            match resolved_path {
217                Some(p) => {
218                    if let Some(&target_id) = graph.path_to_id.get(p) {
219                        graph.add_edge(
220                            source_id,
221                            target_id,
222                            raw_import.kind,
223                            &raw_import.specifier,
224                        );
225                    }
226                    // Target not in graph = unparseable leaf (e.g. .json, .css)
227                    // Add it as a leaf module
228                    else {
229                        let size = fs::metadata(p).map(|m| m.len()).unwrap_or(0);
230                        let package = lang
231                            .package_name(p)
232                            .or_else(|| lang.workspace_package_name(p, root));
233                        let target_id = graph.add_module(p.clone(), size, package);
234                        graph.add_edge(
235                            source_id,
236                            target_id,
237                            raw_import.kind,
238                            &raw_import.specifier,
239                        );
240                    }
241                }
242                None => {
243                    unresolved.insert(raw_import.specifier.clone());
244                }
245            }
246        }
247
248        let result = crate::lang::ParseResult {
249            imports: raw_imports,
250            unresolvable_dynamic: fr.unresolvable_dynamic,
251        };
252        if let Some(mtime) = fr.mtime_nanos {
253            cache.insert(fr.path, fr.size, mtime, result, resolved_paths);
254        }
255    }
256
257    graph.compute_package_info();
258    BuildResult {
259        graph,
260        unresolvable_dynamic: unresolvable_files,
261        unresolved_specifiers: unresolved.into_iter().collect(),
262        file_warnings: discovered.warnings,
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lang::typescript::TypeScriptSupport;
    use std::fs;

    /// A file that cannot be read as UTF-8 must not end up in the graph more
    /// than once, even though the entry file imports it.
    #[test]
    fn parse_failure_not_retried() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().canonicalize().unwrap();

        fs::write(root.join("entry.ts"), r#"import { x } from "./broken";"#).unwrap();

        // 0xFF is never valid in UTF-8, so reading this file as a string fails.
        fs::write(root.join("broken.ts"), [0xFF, 0xFE, 0x00, 0x01]).unwrap();

        let lang = TypeScriptSupport::new(&root);
        let mut cache = ParseCache::new();
        let result = build_graph(&root.join("entry.ts"), &root, &lang, &mut cache);
        let graph = result.graph;

        let entry_count = graph
            .path_to_id
            .keys()
            .filter(|p| p.file_name().is_some_and(|n| n == "broken.ts"))
            .count();
        assert!(
            entry_count <= 1,
            "broken.ts should appear at most once, found {entry_count}"
        );
    }
}