egglog_concurrency/
notification.rs

//! A simple concurrent notification object, based on `absl::Notification` from
//! the absl C++ library.
3
4use std::{
5    sync::{
6        Condvar, Mutex,
7        atomic::{AtomicBool, Ordering},
8    },
9    time::Duration,
10};
11
/// A simple concurrent notification object, based on `absl::Notification` from
/// the absl library. Notifications happen at most once (with future
/// notifications being no-ops). Waiting threads can block, optionally with a
/// timeout.
///
/// The `Default` value is an un-notified notification: the flag starts out
/// `false`, and the mutex and condition variable are freshly constructed.
#[derive(Default)]
pub struct Notification {
    /// Set to `true` once the notification has fired.
    has_been_notified: AtomicBool,
    /// Guards no data of its own; it is the lock paired with `cv`.
    mutex: Mutex<()>,
    /// Condition variable that blocked waiters sleep on.
    cv: Condvar,
}
31
32impl Drop for Notification {
33    fn drop(&mut self) {
34        // From absl: want to ensure that a thread running `notify` exits before
35        // the object is destroyed.
36        let _guard = self.mutex.lock();
37    }
38}
39
40impl Notification {
41    /// Create a fresh notification.
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    /// Block until `notify` is called.
47    pub fn wait(&self) {
48        if self.has_been_notified() {
49            return;
50        }
51        let mut lock = self.mutex.lock().unwrap();
52        while !self.has_been_notified() {
53            lock = self.cv.wait(lock).unwrap();
54        }
55    }
56
57    pub fn wait_with_timeout(&self, timeout: Duration) -> bool {
58        if self.has_been_notified() {
59            return true;
60        }
61        let mut lock = self.mutex.lock().unwrap();
62        while !self.has_been_notified() {
63            let (next, result) = self.cv.wait_timeout(lock, timeout).unwrap();
64            if result.timed_out() {
65                return false;
66            }
67            lock = next;
68        }
69        self.has_been_notified()
70    }
71
72    /// Notify all threads waiting on this notification, and unblock any future
73    /// threads who may wait.
74    pub fn notify(&self) {
75        let _guard = self.mutex.lock().unwrap();
76        self.has_been_notified.store(true, Ordering::Release);
77        self.cv.notify_all();
78    }
79
80    /// Query whether this notification has been notified, without blocking.
81    pub fn has_been_notified(&self) -> bool {
82        self.has_been_notified.load(Ordering::Acquire)
83    }
84}