1use std::sync::Arc;
2use std::thread;
3use std::time::Instant;
4
5use crossbeam_channel::{unbounded, Receiver, Sender};
6
7use super::job::{InspectCategory, InspectJob, InspectResult};
8
9pub type InspectWorker = Arc<dyn Fn(InspectJob) -> InspectResult + Send + Sync + 'static>;
10
11#[derive(Clone)]
12pub struct DispatchHandles {
13 pub request_tx: Sender<InspectJob>,
14 pub result_rx: Receiver<InspectResult>,
15 pub pool: Arc<rayon::ThreadPool>,
16}
17
18pub fn start_dispatch_loop(worker: InspectWorker) -> DispatchHandles {
19 let (request_tx, request_rx) = unbounded::<InspectJob>();
20 let (result_tx, result_rx) = unbounded::<InspectResult>();
21 let pool = Arc::new(
22 rayon::ThreadPoolBuilder::new()
23 .num_threads(default_pool_size())
24 .thread_name(|index| format!("aft-inspect-{index}"))
25 .build()
26 .expect("inspect worker pool must build"),
27 );
28
29 let loop_pool = Arc::clone(&pool);
30 thread::spawn(move || dispatch_loop(request_rx, result_tx, loop_pool, worker));
31
32 DispatchHandles {
33 request_tx,
34 result_rx,
35 pool,
36 }
37}
38
39pub fn default_worker() -> InspectWorker {
40 Arc::new(dispatch_category)
41}
42
43fn dispatch_loop(
44 request_rx: Receiver<InspectJob>,
45 result_tx: Sender<InspectResult>,
46 pool: Arc<rayon::ThreadPool>,
47 worker: InspectWorker,
48) {
49 while let Ok(job) = request_rx.recv() {
50 let tx = result_tx.clone();
51 let worker = Arc::clone(&worker);
52 pool.spawn(move || {
53 let result = worker(job);
54 let _ = tx.send(result);
55 });
56 }
57}
58
59fn dispatch_category(job: InspectJob) -> InspectResult {
60 use crate::inspect::scanners;
61
62 match job.category {
63 InspectCategory::Todos => scanners::todos::run_todos_scan(&job),
64 InspectCategory::Metrics => scanners::metrics::run_metrics_scan(&job),
65 InspectCategory::DeadCode => scanners::dead_code::run_dead_code_scan(&job),
66 InspectCategory::UnusedExports => scanners::unused_exports::run_unused_exports_scan(&job),
67 InspectCategory::Duplicates => scanners::duplicates::run_duplicates_scan(&job),
68 InspectCategory::Diagnostics => {
69 let started = Instant::now();
76 InspectResult::failed(
77 &job,
78 "diagnostics must run on the main thread (run_diagnostics_category), \
79 not the rayon inspect worker pool",
80 started.elapsed(),
81 )
82 }
83 other => {
84 let started = Instant::now();
85 InspectResult::failed(
86 &job,
87 format!("inspect category '{other}' is not active in v0.33"),
88 started.elapsed(),
89 )
90 }
91 }
92}
93
/// Chooses the number of threads for the inspect worker pool.
///
/// The size is half of the parallelism reported by the standard library,
/// rounded up, and kept within `1..=8`. If the parallelism cannot be queried,
/// a single core is assumed.
fn default_pool_size() -> usize {
    const MIN_WORKERS: usize = 1;
    const MAX_WORKERS: usize = 8;

    let cores = match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };

    // Halve, rounding up, without risking overflow on the addition.
    let half_rounded_up = cores / 2 + cores % 2;
    half_rounded_up.max(MIN_WORKERS).min(MAX_WORKERS)
}