1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicU32, AtomicUsize};

use atomic_wait::{wait, wake_all, wake_one};

use crate::concurrent::sync::mutex::MutexGuard;

/// A primitive that lets threads block until another thread signals a condition.
///
/// `Condvar::default()` yields the same state as [`Condvar::new`]: a zeroed
/// notification counter and no registered waiters.
#[derive(Debug, Default)]
pub struct Condvar {
    /// Notification counter. `notify_one` / `notify_all` increment it, and `wait`
    /// snapshots it before unlocking so that it only blocks while the value is unchanged.
    counter: AtomicU32,
    /// Number of threads currently between entering and leaving the blocking part of
    /// `wait`. Notifiers read it to skip the counter bump and wake call when it is zero.
    num_waiters: AtomicUsize,
}

impl Condvar {
    /// Builds a condition variable with a zeroed notification counter and no
    /// registered waiters.
    ///
    /// # Examples
    ///
    /// ```
    /// use lib_wc::sync::Condvar;
    ///
    /// let condvar = Condvar::new();
    /// ```
    pub const fn new() -> Self {
        let counter = AtomicU32::new(0);
        let num_waiters = AtomicUsize::new(0);
        Self {
            counter,
            num_waiters,
        }
    }

    /// Releases the mutex behind `mutex_guard`, blocks until notified, then locks the
    /// mutex again and hands back the new guard.
    ///
    /// The notification counter is sampled while the mutex is still held, and the
    /// thread only goes to sleep if the counter still has that value afterwards, so a
    /// notification sent right after the unlock is not missed. A wake-up may still be
    /// spurious; re-check the awaited condition in a loop, as the example does.
    ///
    /// # Examples
    ///
    /// ```
    ///  use std::thread;
    ///  use std::time::Duration;
    ///  use lib_wc::sync::{Condvar, Mutex};
    ///
    ///  let mutex = Mutex::new(0);
    ///  let condvar = Condvar::new();
    ///
    ///  let mut wakeups = 0;
    ///
    ///  thread::scope(|s| {
    ///    s.spawn(|| {
    ///       thread::sleep(Duration::from_nanos(10));
    ///       *mutex.lock() = 123;
    ///       condvar.notify_one();
    ///    });
    ///
    ///    let mut m = mutex.lock();
    ///    while *m < 100 {
    ///      m = condvar.wait(m);
    ///      wakeups += 1;
    ///    }
    ///
    ///    assert_eq!(*m, 123);
    ///  });
    ///
    ///  // Check that the main thread actually did wait (not busy-loop),
    ///  // while still allowing for a few spurious wake ups.
    ///  assert!(wakeups < 10);
    /// ```
    pub fn wait<'a, T>(&self, mutex_guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
        // Register as a waiter first, so notifiers stop taking their "nobody is
        // waiting" shortcut.
        self.num_waiters.fetch_add(1, Relaxed);

        // Snapshot the counter while the mutex is still held.
        let observed = self.counter.load(Relaxed);

        // Hold on to the mutex itself, then release it by dropping the guard.
        let owner = mutex_guard.mutex;
        drop(mutex_guard);

        // Block only if no notification has bumped the counter since the snapshot.
        wait(&self.counter, observed);

        self.num_waiters.fetch_sub(1, Relaxed);

        owner.lock()
    }

    /// Signals one thread blocked in [`Condvar::wait`].
    ///
    /// When no thread is registered as waiting this returns immediately without
    /// touching the counter. Otherwise it bumps the notification counter and asks
    /// `atomic_wait` to wake a single waiter.
    pub fn notify_one(&self) {
        if self.num_waiters.load(Relaxed) == 0 {
            return;
        }

        self.counter.fetch_add(1, Relaxed);
        wake_one(&self.counter);
    }

    /// Signals every thread blocked in [`Condvar::wait`].
    ///
    /// When no thread is registered as waiting this returns immediately without
    /// touching the counter. Otherwise it bumps the notification counter and asks
    /// `atomic_wait` to wake all waiters.
    pub fn notify_all(&self) {
        if self.num_waiters.load(Relaxed) == 0 {
            return;
        }

        self.counter.fetch_add(1, Relaxed);
        wake_all(&self.counter);
    }
}

#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use quickcheck_macros::quickcheck;

    use crate::concurrent::sync::Mutex;

    use super::*;

    // A producer thread publishes a value and notifies; the main thread waits on the
    // condvar until it observes that value, counting how often `wait` returned.
    #[quickcheck]
    fn test_condvar() {
        let condvar = Condvar::new();
        let mutex = Mutex::new(0);

        let mut wake_count = 0;

        thread::scope(|scope| {
            // Producer: store the value (the temporary guard is released at the end
            // of the statement), then signal the waiter.
            scope.spawn(|| {
                thread::sleep(Duration::from_nanos(10));
                *mutex.lock() = 123;
                condvar.notify_one();
            });

            // Consumer: keep waiting until the published value is visible.
            let mut guard = mutex.lock();
            loop {
                if *guard >= 100 {
                    break;
                }
                guard = condvar.wait(guard);
                wake_count += 1;
            }

            assert_eq!(*guard, 123);
        });

        // The consumer must really have blocked instead of spinning; a handful of
        // spurious wake-ups is tolerated.
        assert!(wake_count < 10);
    }
}