bastion_executor/
sleepers.rs

//!
//! Where workers park while there is no workload in their worker queue.
//!
//! When the pool receives a workload, it wakes them up.
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Condvar, Mutex};
7
/// Bookkeeping shared by waiters and notifiers.
///
/// Both counters are only read or written while the `Sleepers::sleep` mutex is held.
#[derive(Debug, Default)]
struct SleepState {
    /// Threads blocked in `Sleepers::wait` that have not been granted a wakeup yet.
    sleeping: usize,

    /// Wakeups granted by `Sleepers::notify_one` that no waiter has consumed yet.
    wakeups: usize,
}

/// The place where worker threads go to sleep.
///
/// Similar to how thread parking works, if a notification comes up while no threads are sleeping,
/// the next thread that attempts to go to sleep will pick up the notification immediately.
///
/// Notifications that arrive while one is already pending and nobody is asleep are coalesced
/// into that single pending notification.
#[derive(Debug, Default)]
pub struct Sleepers {
    /// Sleeper and wakeup counters, guarded by the mutex the condvar waits on.
    sleep: Mutex<SleepState>,

    /// A condvar for notifying sleeping threads.
    wake: Condvar,

    /// Set to `true` if a notification came up while nobody was sleeping.
    notified: AtomicBool,
}

impl Sleepers {
    /// Creates a new `Sleepers` with no sleeping threads and no pending notification.
    pub fn new() -> Self {
        Self::default()
    }

    /// Puts the current thread to sleep until it is notified.
    ///
    /// Returns immediately if a notification was stored while nobody was sleeping; that
    /// stored notification is consumed by this call.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn wait(&self) {
        let mut state = self.sleep.lock().unwrap();

        // A notification that found nobody asleep is handed to the first thread that
        // tries to sleep afterwards.
        if self.notified.swap(false, Ordering::SeqCst) {
            return;
        }

        state.sleeping += 1;

        // `Condvar::wait` may return without any notification (spurious wakeup). Only a
        // wakeup token granted by `notify_one` entitles this thread to leave; leaving
        // without one would keep this thread counted as a sleeper and make a later
        // `notify_one` signal nobody instead of storing its notification.
        while state.wakeups == 0 {
            state = self.wake.wait(state).unwrap();
        }
        state.wakeups -= 1;
    }

    /// Notifies one thread.
    ///
    /// If a thread is sleeping, it is granted a wakeup. Otherwise the notification is stored
    /// for the next call to `wait`. Does nothing if a stored notification is already pending.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn notify_one(&self) {
        if !self.notified.load(Ordering::SeqCst) {
            let mut state = self.sleep.lock().unwrap();

            if state.sleeping > 0 {
                // Move one sleeper from "asleep" to "granted a wakeup" before signalling, so
                // that the woken thread finds its token when it re-acquires the lock.
                state.sleeping -= 1;
                state.wakeups += 1;
                self.wake.notify_one();
            } else {
                self.notified.store(true, Ordering::SeqCst);
            }
        }
    }
}