parallel_processor/execution_manager/
notifier.rs1use std::{
2 hint::unreachable_unchecked,
3 sync::atomic::{AtomicUsize, Ordering},
4};
5
6use parking_lot_core::{DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
7
/// A small wait/notify primitive: threads block until a caller-supplied
/// condition becomes true, and other threads wake them after changing the
/// state that condition observes.
///
/// The address of the `status` field is used as the parking key, so every
/// `Notifier` instance gets its own wait queue.
#[derive(Debug)]
pub struct Notifier {
    /// Hint telling notifiers whether any thread may be parked: holds either
    /// `Notifier::EMPTY` or `Notifier::WAITING`.
    status: AtomicUsize,
}
11
12impl Notifier {
13 const EMPTY: usize = 0;
14 const WAITING: usize = 1;
15
16 pub const fn new() -> Self {
17 Self {
18 status: AtomicUsize::new(Self::EMPTY),
19 }
20 }
21
22 pub fn notify_one(&self) {
23 if self.status.load(Ordering::SeqCst) == Self::WAITING {
24 let key = &self.status as *const AtomicUsize as usize;
25 unsafe {
26 parking_lot_core::unpark_one(key, |status| {
27 if status.have_more_threads {
28 self.status.store(Self::WAITING, Ordering::SeqCst);
29 } else {
30 self.status.store(Self::EMPTY, Ordering::SeqCst);
31 }
32 DEFAULT_UNPARK_TOKEN
33 });
34 }
35 }
36 }
37
38 pub fn notify_all(&self) {
39 let key = &self.status as *const AtomicUsize as usize;
40 unsafe {
41 parking_lot_core::unpark_all(key, DEFAULT_UNPARK_TOKEN);
42 }
43 }
44
45 #[inline(always)]
46 pub fn wait_for_condition(&self, mut checker: impl FnMut() -> bool) {
47 while !checker() {
48 if self.wait_for_condition_slow(&mut checker) {
49 return;
50 }
51 }
52 }
53
54 #[cold]
55 #[inline(never)]
56 fn wait_for_condition_slow(&self, checker: &mut impl FnMut() -> bool) -> bool {
57 let key = &self.status as *const AtomicUsize as usize;
58
59 self.status.store(Self::WAITING, Ordering::SeqCst);
60
61 let mut condition_ok = false;
62 match unsafe {
63 parking_lot_core::park(
64 key,
65 || {
66 condition_ok = checker();
67 !condition_ok && self.status.load(Ordering::SeqCst) == Self::WAITING
68 },
69 || {},
70 |_, _| {},
71 DEFAULT_PARK_TOKEN,
72 None,
73 )
74 } {
75 parking_lot_core::ParkResult::Unparked(_) => {
76 false
78 }
79 parking_lot_core::ParkResult::Invalid => {
80 condition_ok
82 }
83 parking_lot_core::ParkResult::TimedOut => unsafe { unreachable_unchecked() },
84 }
85 }
86}