// parallel_processor/execution_manager/notifier.rs

1use std::{
2    hint::unreachable_unchecked,
3    sync::atomic::{AtomicUsize, Ordering},
4};
5
6use parking_lot_core::{DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
7
/// Wake-up primitive layered on the `parking_lot_core` parking queue.
///
/// Threads block in `wait_for_condition` until a caller-supplied predicate
/// holds; other threads wake them with `notify_one` or `notify_all`.
#[derive(Debug)]
pub struct Notifier {
    /// Holds either `Notifier::EMPTY` or `Notifier::WAITING`.
    ///
    /// The address of this field doubles as the key under which waiting
    /// threads are parked, so it must stay at a stable address while any
    /// thread is waiting on the notifier.
    status: AtomicUsize,
}
11
12impl Notifier {
13    const EMPTY: usize = 0;
14    const WAITING: usize = 1;
15
16    pub const fn new() -> Self {
17        Self {
18            status: AtomicUsize::new(Self::EMPTY),
19        }
20    }
21
22    pub fn notify_one(&self) {
23        if self.status.load(Ordering::SeqCst) == Self::WAITING {
24            let key = &self.status as *const AtomicUsize as usize;
25            unsafe {
26                parking_lot_core::unpark_one(key, |status| {
27                    if status.have_more_threads {
28                        self.status.store(Self::WAITING, Ordering::SeqCst);
29                    } else {
30                        self.status.store(Self::EMPTY, Ordering::SeqCst);
31                    }
32                    DEFAULT_UNPARK_TOKEN
33                });
34            }
35        }
36    }
37
38    pub fn notify_all(&self) {
39        let key = &self.status as *const AtomicUsize as usize;
40        unsafe {
41            parking_lot_core::unpark_all(key, DEFAULT_UNPARK_TOKEN);
42        }
43    }
44
45    #[inline(always)]
46    pub fn wait_for_condition(&self, mut checker: impl FnMut() -> bool) {
47        while !checker() {
48            if self.wait_for_condition_slow(&mut checker) {
49                return;
50            }
51        }
52    }
53
54    #[cold]
55    #[inline(never)]
56    fn wait_for_condition_slow(&self, checker: &mut impl FnMut() -> bool) -> bool {
57        let key = &self.status as *const AtomicUsize as usize;
58
59        self.status.store(Self::WAITING, Ordering::SeqCst);
60
61        let mut condition_ok = false;
62        match unsafe {
63            parking_lot_core::park(
64                key,
65                || {
66                    condition_ok = checker();
67                    !condition_ok && self.status.load(Ordering::SeqCst) == Self::WAITING
68                },
69                || {},
70                |_, _| {},
71                DEFAULT_PARK_TOKEN,
72                None,
73            )
74        } {
75            parking_lot_core::ParkResult::Unparked(_) => {
76                // Try to read again
77                false
78            }
79            parking_lot_core::ParkResult::Invalid => {
80                // Condition can be satisfied
81                condition_ok
82            }
83            parking_lot_core::ParkResult::TimedOut => unsafe { unreachable_unchecked() },
84        }
85    }
86}