//! A condition variable built on the wait/wake primitives of the `atomic_wait` crate.
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicU32, AtomicUsize};
use atomic_wait::{wait, wake_all, wake_one};
use crate::concurrent::sync::mutex::MutexGuard;
/// A primitive that lets threads wait for a condition and be signalled
/// when it may have changed.
///
/// A thread blocks by handing its mutex guard to [`Condvar::wait`]; other
/// threads wake it with [`Condvar::notify_one`] or [`Condvar::notify_all`].
///
/// `Default` yields the same state as [`Condvar::new`]: both fields zero.
#[derive(Debug, Default)]
pub struct Condvar {
    /// Notification counter. `wait` samples it before releasing the mutex and
    /// passes that sample to the blocking call; each notify that sees at
    /// least one waiter increments it before issuing the wake.
    counter: AtomicU32,
    /// Number of threads currently inside `wait`. The notify methods read it
    /// and do nothing at all when it is zero.
    num_waiters: AtomicUsize,
}
impl Condvar {
    /// Builds a condition variable with a zeroed notification counter and no
    /// registered waiters.
    ///
    /// # Examples
    ///
    /// ```
    /// use lib_wc::sync::Condvar;
    ///
    /// let condvar = Condvar::new();
    /// ```
    pub const fn new() -> Self {
        Self {
            num_waiters: AtomicUsize::new(0),
            counter: AtomicU32::new(0),
        }
    }

    /// Releases the mutex behind `mutex_guard`, blocks until notified, then
    /// re-locks the same mutex and returns the new guard.
    ///
    /// The notification counter is sampled *before* the guard is dropped and
    /// that sample is handed to the blocking call, so the thread only goes to
    /// sleep if the counter has not changed since the mutex was released.
    ///
    /// Callers should re-check their condition in a loop: the example below
    /// tolerates a few spurious wake-ups.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    /// use std::time::Duration;
    /// use lib_wc::sync::{Condvar, Mutex};
    ///
    /// let mutex = Mutex::new(0);
    /// let condvar = Condvar::new();
    ///
    /// let mut wakeups = 0;
    ///
    /// thread::scope(|s| {
    ///     s.spawn(|| {
    ///         thread::sleep(Duration::from_nanos(10));
    ///         *mutex.lock() = 123;
    ///         condvar.notify_one();
    ///     });
    ///
    ///     let mut m = mutex.lock();
    ///     while *m < 100 {
    ///         m = condvar.wait(m);
    ///         wakeups += 1;
    ///     }
    ///
    ///     assert_eq!(*m, 123);
    /// });
    ///
    /// // Check that the main thread actually did wait (not busy-loop),
    /// // while still allowing for a few spurious wake ups.
    /// assert!(wakeups < 10);
    /// ```
    pub fn wait<'a, T>(&self, mutex_guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
        // Register as a waiter while the mutex is still held, so the notify
        // methods do not take their "nobody is waiting" shortcut.
        self.num_waiters.fetch_add(1, Relaxed);

        // Snapshot the counter before the mutex is released.
        let observed = self.counter.load(Relaxed);

        // Keep a handle to the mutex so it can be re-acquired afterwards;
        // dropping the guard is what unlocks it.
        let owner = mutex_guard.mutex;
        drop(mutex_guard);

        // Block only if no notification bumped the counter in the meantime.
        wait(&self.counter, observed);

        // No longer waiting, whatever the reason for waking up.
        self.num_waiters.fetch_sub(1, Relaxed);

        owner.lock()
    }

    /// Signals a single waiter, if there is one.
    ///
    /// Does nothing when no thread is currently registered in
    /// [`Condvar::wait`]; otherwise increments the notification counter and
    /// issues a `wake_one` on it.
    pub fn notify_one(&self) {
        // Nobody is waiting: skip both the counter bump and the wake call.
        if self.num_waiters.load(Relaxed) == 0 {
            return;
        }
        self.counter.fetch_add(1, Relaxed);
        wake_one(&self.counter)
    }

    /// Signals every waiter, if there are any.
    ///
    /// Does nothing when no thread is currently registered in
    /// [`Condvar::wait`]; otherwise increments the notification counter and
    /// issues a `wake_all` on it.
    pub fn notify_all(&self) {
        // Nobody is waiting: skip both the counter bump and the wake call.
        if self.num_waiters.load(Relaxed) == 0 {
            return;
        }
        self.counter.fetch_add(1, Relaxed);
        wake_all(&self.counter)
    }
}
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use quickcheck_macros::quickcheck;

    use crate::concurrent::sync::Mutex;

    use super::*;

    /// One thread waits on the condition variable until a second thread
    /// stores a value of at least 100 and notifies it.
    #[quickcheck]
    fn test_condvar() {
        let shared = Mutex::new(0);
        let cv = Condvar::new();
        let mut wake_count = 0;

        thread::scope(|scope| {
            // Producer: after a short pause, publish the value and signal.
            scope.spawn(|| {
                thread::sleep(Duration::from_nanos(10));
                *shared.lock() = 123;
                cv.notify_one();
            });

            // Consumer: keep waiting until the published value is visible.
            let mut guard = shared.lock();
            while *guard < 100 {
                guard = cv.wait(guard);
                wake_count += 1;
            }
            assert_eq!(*guard, 123);
        });

        // The consumer must really have blocked rather than spun, though a
        // handful of spurious wake-ups is acceptable.
        assert!(wake_count < 10);
    }
}