Skip to main content

aft/inspect/
dispatch.rs

1use std::sync::Arc;
2use std::thread;
3use std::time::Instant;
4
5use crossbeam_channel::{unbounded, Receiver, Sender};
6
7use super::job::{InspectCategory, InspectJob, InspectResult};
8
9pub type InspectWorker = Arc<dyn Fn(InspectJob) -> InspectResult + Send + Sync + 'static>;
10
11#[derive(Clone)]
12pub struct DispatchHandles {
13    pub request_tx: Sender<InspectJob>,
14    pub result_rx: Receiver<InspectResult>,
15    pub pool: Arc<rayon::ThreadPool>,
16}
17
18pub fn start_dispatch_loop(worker: InspectWorker) -> DispatchHandles {
19    let (request_tx, request_rx) = unbounded::<InspectJob>();
20    let (result_tx, result_rx) = unbounded::<InspectResult>();
21    let pool = Arc::new(
22        rayon::ThreadPoolBuilder::new()
23            .num_threads(default_pool_size())
24            .thread_name(|index| format!("aft-inspect-{index}"))
25            // Rayon defaults workers to ~2MB stacks (vs the main thread's 8MB).
26            // The duplicates scanner walks the AST recursively, and deep trees
27            // (minified bundles, generated code, long chains) previously
28            // overflowed a 2MB worker stack and SIGABRT'd the whole bridge.
29            // Match the main thread's 8MB so the bounded recursion in
30            // collect_fragments (MAX_FRAGMENT_DEPTH) has comfortable headroom.
31            .stack_size(8 * 1024 * 1024)
32            .build()
33            .expect("inspect worker pool must build"),
34    );
35
36    let loop_pool = Arc::clone(&pool);
37    thread::spawn(move || dispatch_loop(request_rx, result_tx, loop_pool, worker));
38
39    DispatchHandles {
40        request_tx,
41        result_rx,
42        pool,
43    }
44}
45
46pub fn default_worker() -> InspectWorker {
47    Arc::new(dispatch_category)
48}
49
50fn dispatch_loop(
51    request_rx: Receiver<InspectJob>,
52    result_tx: Sender<InspectResult>,
53    pool: Arc<rayon::ThreadPool>,
54    worker: InspectWorker,
55) {
56    while let Ok(job) = request_rx.recv() {
57        let tx = result_tx.clone();
58        let worker = Arc::clone(&worker);
59        pool.spawn(move || {
60            let result = worker(job);
61            let _ = tx.send(result);
62        });
63    }
64}
65
66fn dispatch_category(job: InspectJob) -> InspectResult {
67    use crate::inspect::scanners;
68
69    match job.category {
70        InspectCategory::Todos => scanners::todos::run_todos_scan(&job),
71        InspectCategory::Metrics => scanners::metrics::run_metrics_scan(&job),
72        InspectCategory::DeadCode => scanners::dead_code::run_dead_code_scan(&job),
73        InspectCategory::UnusedExports => scanners::unused_exports::run_unused_exports_scan(&job),
74        InspectCategory::Duplicates => scanners::duplicates::run_duplicates_scan(&job),
75        InspectCategory::Diagnostics => {
76            // Diagnostics are backed by the AppContext LSP manager and run via
77            // the serial LSP/status lane in `handle_inspect` — never through
78            // this rayon worker pool. Reaching this arm means a caller routed
79            // Diagnostics into the worker path incorrectly; surface that as a
80            // routing bug instead of a misleading "pending" status.
81            let started = Instant::now();
82            InspectResult::failed(
83                &job,
84                "diagnostics must run on the main thread (run_diagnostics_category), \
85                 not the rayon inspect worker pool",
86                started.elapsed(),
87            )
88        }
89        other => {
90            let started = Instant::now();
91            InspectResult::failed(
92                &job,
93                format!("inspect category '{other}' is not active in v0.33"),
94                started.elapsed(),
95            )
96        }
97    }
98}
99
/// Number of worker threads for the inspect pool.
///
/// Uses half of the available parallelism (rounded up), kept within
/// `1..=8`. If the parallelism cannot be queried, a single worker is used.
fn default_pool_size() -> usize {
    const MIN_WORKERS: usize = 1;
    const MAX_WORKERS: usize = 8;

    let cores = match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };

    cores.div_ceil(2).clamp(MIN_WORKERS, MAX_WORKERS)
}