1use std::sync::Arc;
2use std::thread;
3use std::time::Instant;
4
5use crossbeam_channel::{unbounded, Receiver, Sender};
6
7use super::job::{InspectCategory, InspectJob, InspectResult};
8
/// Shared, thread-safe callable that turns one [`InspectJob`] into an [`InspectResult`].
///
/// The `Arc` lets the same worker be cloned into every task spawned on the pool;
/// `Send + Sync + 'static` are required because it is invoked from pool threads.
pub type InspectWorker = Arc<dyn Fn(InspectJob) -> InspectResult + Send + Sync + 'static>;
10
/// Caller-side handles produced by `start_dispatch_loop`.
///
/// Cloning copies the two channel endpoints and bumps the pool's `Arc` count.
#[derive(Clone)]
pub struct DispatchHandles {
    /// Sending half of the job channel; jobs sent here are picked up by the dispatch loop.
    pub request_tx: Sender<InspectJob>,
    /// Receiving half of the result channel; one result is sent per job the worker completes.
    pub result_rx: Receiver<InspectResult>,
    /// The rayon pool the dispatch loop spawns worker tasks onto.
    pub pool: Arc<rayon::ThreadPool>,
}
17
18pub fn start_dispatch_loop(worker: InspectWorker) -> DispatchHandles {
19 let (request_tx, request_rx) = unbounded::<InspectJob>();
20 let (result_tx, result_rx) = unbounded::<InspectResult>();
21 let pool = Arc::new(
22 rayon::ThreadPoolBuilder::new()
23 .num_threads(default_pool_size())
24 .thread_name(|index| format!("aft-inspect-{index}"))
25 .stack_size(8 * 1024 * 1024)
32 .build()
33 .expect("inspect worker pool must build"),
34 );
35
36 let loop_pool = Arc::clone(&pool);
37 thread::spawn(move || dispatch_loop(request_rx, result_tx, loop_pool, worker));
38
39 DispatchHandles {
40 request_tx,
41 result_rx,
42 pool,
43 }
44}
45
46pub fn default_worker() -> InspectWorker {
47 Arc::new(dispatch_category)
48}
49
50fn dispatch_loop(
51 request_rx: Receiver<InspectJob>,
52 result_tx: Sender<InspectResult>,
53 pool: Arc<rayon::ThreadPool>,
54 worker: InspectWorker,
55) {
56 while let Ok(job) = request_rx.recv() {
57 let tx = result_tx.clone();
58 let worker = Arc::clone(&worker);
59 pool.spawn(move || {
60 let result = worker(job);
61 let _ = tx.send(result);
62 });
63 }
64}
65
66fn dispatch_category(job: InspectJob) -> InspectResult {
67 use crate::inspect::scanners;
68
69 match job.category {
70 InspectCategory::Todos => scanners::todos::run_todos_scan(&job),
71 InspectCategory::Metrics => scanners::metrics::run_metrics_scan(&job),
72 InspectCategory::DeadCode => scanners::dead_code::run_dead_code_scan(&job),
73 InspectCategory::UnusedExports => scanners::unused_exports::run_unused_exports_scan(&job),
74 InspectCategory::Duplicates => scanners::duplicates::run_duplicates_scan(&job),
75 InspectCategory::Diagnostics => {
76 let started = Instant::now();
83 InspectResult::failed(
84 &job,
85 "diagnostics must run on the main thread (run_diagnostics_category), \
86 not the rayon inspect worker pool",
87 started.elapsed(),
88 )
89 }
90 other => {
91 let started = Instant::now();
92 InspectResult::failed(
93 &job,
94 format!("inspect category '{other}' is not active in v0.33"),
95 started.elapsed(),
96 )
97 }
98 }
99}
100
/// Chooses the number of inspect pool threads: half of the available parallelism,
/// rounded up, kept within `1..=8`.
///
/// Falls back to a single unit of parallelism when the value cannot be queried.
fn default_pool_size() -> usize {
    let detected = std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get);
    let half_rounded_up = detected.div_ceil(2);
    half_rounded_up.clamp(1, 8)
}