Skip to main content

aft/inspect/
dispatch.rs

1use std::sync::Arc;
2use std::thread;
3use std::time::Instant;
4
5use crossbeam_channel::{unbounded, Receiver, Sender};
6
7use super::job::{InspectCategory, InspectJob, InspectResult};
8
9pub type InspectWorker = Arc<dyn Fn(InspectJob) -> InspectResult + Send + Sync + 'static>;
10
11#[derive(Clone)]
12pub struct DispatchHandles {
13    pub request_tx: Sender<InspectJob>,
14    pub result_rx: Receiver<InspectResult>,
15    pub pool: Arc<rayon::ThreadPool>,
16}
17
18pub fn start_dispatch_loop(worker: InspectWorker) -> DispatchHandles {
19    let (request_tx, request_rx) = unbounded::<InspectJob>();
20    let (result_tx, result_rx) = unbounded::<InspectResult>();
21    let pool = Arc::new(
22        rayon::ThreadPoolBuilder::new()
23            .num_threads(default_pool_size())
24            .thread_name(|index| format!("aft-inspect-{index}"))
25            .build()
26            .expect("inspect worker pool must build"),
27    );
28
29    let loop_pool = Arc::clone(&pool);
30    thread::spawn(move || dispatch_loop(request_rx, result_tx, loop_pool, worker));
31
32    DispatchHandles {
33        request_tx,
34        result_rx,
35        pool,
36    }
37}
38
39pub fn default_worker() -> InspectWorker {
40    Arc::new(dispatch_category)
41}
42
43fn dispatch_loop(
44    request_rx: Receiver<InspectJob>,
45    result_tx: Sender<InspectResult>,
46    pool: Arc<rayon::ThreadPool>,
47    worker: InspectWorker,
48) {
49    while let Ok(job) = request_rx.recv() {
50        let tx = result_tx.clone();
51        let worker = Arc::clone(&worker);
52        pool.spawn(move || {
53            let result = worker(job);
54            let _ = tx.send(result);
55        });
56    }
57}
58
59fn dispatch_category(job: InspectJob) -> InspectResult {
60    use crate::inspect::scanners;
61
62    match job.category {
63        InspectCategory::Todos => scanners::todos::run_todos_scan(&job),
64        InspectCategory::Metrics => scanners::metrics::run_metrics_scan(&job),
65        InspectCategory::DeadCode => scanners::dead_code::run_dead_code_scan(&job),
66        InspectCategory::UnusedExports => scanners::unused_exports::run_unused_exports_scan(&job),
67        InspectCategory::Duplicates => scanners::duplicates::run_duplicates_scan(&job),
68        InspectCategory::Diagnostics => {
69            // Diagnostics are backed by the AppContext LSP manager (RefCell, not
70            // Send/Sync), so they run on the main thread via
71            // `run_diagnostics_category` in `handle_inspect` — never through this
72            // rayon worker pool. Reaching this arm means a caller routed
73            // Diagnostics into the worker path incorrectly; surface that as a
74            // routing bug instead of a misleading "pending" status.
75            let started = Instant::now();
76            InspectResult::failed(
77                &job,
78                "diagnostics must run on the main thread (run_diagnostics_category), \
79                 not the rayon inspect worker pool",
80                started.elapsed(),
81            )
82        }
83        other => {
84            let started = Instant::now();
85            InspectResult::failed(
86                &job,
87                format!("inspect category '{other}' is not active in v0.33"),
88                started.elapsed(),
89            )
90        }
91    }
92}
93
/// Picks the worker-pool size: half of the available parallelism, rounded up,
/// kept within `1..=8`.
///
/// Falls back to a parallelism of 1 when the platform cannot report one.
fn default_pool_size() -> usize {
    const MIN_WORKERS: usize = 1;
    const MAX_WORKERS: usize = 8;

    let cores = match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };

    cores.div_ceil(2).clamp(MIN_WORKERS, MAX_WORKERS)
}