oxidd_manager_pointer/
workers.rs

1use std::sync::atomic::{AtomicU32, Ordering::Relaxed};
2
3/// Worker thread pool
4pub struct Workers {
5    pub(crate) pool: rayon::ThreadPool,
6    split_depth: AtomicU32,
7}
8
9impl Workers {
10    pub(crate) fn new(threads: u32) -> Self {
11        let stack_size = std::env::var("OXIDD_STACK_SIZE")
12            .ok()
13            .and_then(|s| s.parse().ok())
14            .unwrap_or(1024 * 1024 * 1024); // default: 1 GiB
15
16        let pool = rayon::ThreadPoolBuilder::new()
17            .num_threads(threads as usize)
18            .thread_name(|i| format!("oxidd mp {i}")) // "mp" for "manager pointer"
19            .stack_size(stack_size)
20            .build()
21            .expect("could not build thread pool");
22        let split_depth = AtomicU32::new(Workers::auto_split_depth(&pool));
23        Self { pool, split_depth }
24    }
25
26    fn auto_split_depth(pool: &rayon::ThreadPool) -> u32 {
27        let threads = pool.current_num_threads();
28        if threads > 1 {
29            (4096 * threads).ilog2()
30        } else {
31            0
32        }
33    }
34}
35
36impl oxidd_core::WorkerPool for Workers {
37    #[inline]
38    fn current_num_threads(&self) -> usize {
39        self.pool.current_num_threads()
40    }
41
42    #[inline(always)]
43    fn split_depth(&self) -> u32 {
44        self.split_depth.load(Relaxed)
45    }
46
47    fn set_split_depth(&self, depth: Option<u32>) {
48        let depth = match depth {
49            Some(d) => d,
50            None => Self::auto_split_depth(&self.pool),
51        };
52        self.split_depth.store(depth, Relaxed);
53    }
54
55    #[inline]
56    fn install<RA: Send>(&self, op: impl FnOnce() -> RA + Send) -> RA {
57        self.pool.install(op)
58    }
59
60    #[inline]
61    fn join<RA: Send, RB: Send>(
62        &self,
63        op_a: impl FnOnce() -> RA + Send,
64        op_b: impl FnOnce() -> RB + Send,
65    ) -> (RA, RB) {
66        self.pool.join(op_a, op_b)
67    }
68
69    #[inline]
70    fn broadcast<RA: Send>(
71        &self,
72        op: impl Fn(oxidd_core::BroadcastContext) -> RA + Sync,
73    ) -> Vec<RA> {
74        self.pool.broadcast(|ctx| {
75            op(oxidd_core::BroadcastContext {
76                index: ctx.index() as u32,
77                num_threads: ctx.num_threads() as u32,
78            })
79        })
80    }
81}