milli_core/
thread_pool_no_abort.rs

1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::Arc;
3
4use rayon::{ThreadPool, ThreadPoolBuilder};
5use thiserror::Error;
6
7/// A rayon ThreadPool wrapper that can catch panics in the pool
8/// and modifies the install function accordingly.
9#[derive(Debug)]
10pub struct ThreadPoolNoAbort {
11    thread_pool: ThreadPool,
12    /// The number of active operations.
13    active_operations: AtomicUsize,
14    /// Set to true if the thread pool catched a panic.
15    pool_catched_panic: Arc<AtomicBool>,
16}
17
18impl ThreadPoolNoAbort {
19    pub fn install<OP, R>(&self, op: OP) -> Result<R, PanicCatched>
20    where
21        OP: FnOnce() -> R + Send,
22        R: Send,
23    {
24        self.active_operations.fetch_add(1, Ordering::Relaxed);
25        let output = self.thread_pool.install(op);
26        self.active_operations.fetch_sub(1, Ordering::Relaxed);
27        // While reseting the pool panic catcher we return an error if we catched one.
28        if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
29            Err(PanicCatched)
30        } else {
31            Ok(output)
32        }
33    }
34
35    pub fn current_num_threads(&self) -> usize {
36        self.thread_pool.current_num_threads()
37    }
38
39    /// The number of active operations.
40    pub fn active_operations(&self) -> usize {
41        self.active_operations.load(Ordering::Relaxed)
42    }
43}
44
45#[derive(Error, Debug)]
46#[error("A panic occured. Read the logs to find more information about it")]
47pub struct PanicCatched;
48
49#[derive(Default)]
50pub struct ThreadPoolNoAbortBuilder(ThreadPoolBuilder);
51
52impl ThreadPoolNoAbortBuilder {
53    pub fn new() -> ThreadPoolNoAbortBuilder {
54        ThreadPoolNoAbortBuilder::default()
55    }
56
57    pub fn thread_name<F>(mut self, closure: F) -> Self
58    where
59        F: FnMut(usize) -> String + 'static,
60    {
61        self.0 = self.0.thread_name(closure);
62        self
63    }
64
65    pub fn num_threads(mut self, num_threads: usize) -> ThreadPoolNoAbortBuilder {
66        self.0 = self.0.num_threads(num_threads);
67        self
68    }
69
70    pub fn build(mut self) -> Result<ThreadPoolNoAbort, rayon::ThreadPoolBuildError> {
71        let pool_catched_panic = Arc::new(AtomicBool::new(false));
72        self.0 = self.0.panic_handler({
73            let catched_panic = pool_catched_panic.clone();
74            move |_result| catched_panic.store(true, Ordering::SeqCst)
75        });
76        Ok(ThreadPoolNoAbort {
77            thread_pool: self.0.build()?,
78            active_operations: AtomicUsize::new(0),
79            pool_catched_panic,
80        })
81    }
82}