oxidd_manager_pointer/
workers.rs1use std::sync::atomic::{AtomicU32, Ordering::Relaxed};
2
3pub struct Workers {
5 pub(crate) pool: rayon::ThreadPool,
6 split_depth: AtomicU32,
7}
8
9impl Workers {
10 pub(crate) fn new(threads: u32) -> Self {
11 let stack_size = std::env::var("OXIDD_STACK_SIZE")
12 .ok()
13 .and_then(|s| s.parse().ok())
14 .unwrap_or(1024 * 1024 * 1024); let pool = rayon::ThreadPoolBuilder::new()
17 .num_threads(threads as usize)
18 .thread_name(|i| format!("oxidd mp {i}")) .stack_size(stack_size)
20 .build()
21 .expect("could not build thread pool");
22 let split_depth = AtomicU32::new(Workers::auto_split_depth(&pool));
23 Self { pool, split_depth }
24 }
25
26 fn auto_split_depth(pool: &rayon::ThreadPool) -> u32 {
27 let threads = pool.current_num_threads();
28 if threads > 1 {
29 (4096 * threads).ilog2()
30 } else {
31 0
32 }
33 }
34}
35
36impl oxidd_core::WorkerPool for Workers {
37 #[inline]
38 fn current_num_threads(&self) -> usize {
39 self.pool.current_num_threads()
40 }
41
42 #[inline(always)]
43 fn split_depth(&self) -> u32 {
44 self.split_depth.load(Relaxed)
45 }
46
47 fn set_split_depth(&self, depth: Option<u32>) {
48 let depth = match depth {
49 Some(d) => d,
50 None => Self::auto_split_depth(&self.pool),
51 };
52 self.split_depth.store(depth, Relaxed);
53 }
54
55 #[inline]
56 fn install<RA: Send>(&self, op: impl FnOnce() -> RA + Send) -> RA {
57 self.pool.install(op)
58 }
59
60 #[inline]
61 fn join<RA: Send, RB: Send>(
62 &self,
63 op_a: impl FnOnce() -> RA + Send,
64 op_b: impl FnOnce() -> RB + Send,
65 ) -> (RA, RB) {
66 self.pool.join(op_a, op_b)
67 }
68
69 #[inline]
70 fn broadcast<RA: Send>(
71 &self,
72 op: impl Fn(oxidd_core::BroadcastContext) -> RA + Sync,
73 ) -> Vec<RA> {
74 self.pool.broadcast(|ctx| {
75 op(oxidd_core::BroadcastContext {
76 index: ctx.index() as u32,
77 num_threads: ctx.num_threads() as u32,
78 })
79 })
80 }
81}