Skip to main content

noxu_sync/
condvar.rs

1//! Futex-based condition variable.
2//!
3//! Uses a sequence counter approach: `notify_one`/`notify_all` increment the
4//! counter before calling `futex_wake`. A waiting thread snapshots the counter
5//! before releasing the mutex; `futex_wait` returns immediately if the counter
6//! has already changed, preventing missed wakeups.
7//!
8//! This matches the API exported by `parking_lot::Condvar`:
9//!   - `wait(&mut MutexGuard<'_, T>)`
10//!   - `wait_for(&mut MutexGuard<'_, T>, Duration) -> WaitTimeoutResult`
11//!   - `notify_one()`
12//!   - `notify_all()`
13
14use crate::MutexGuard;
15use crate::futex::{futex_wait, futex_wake};
16use lock_api;
17use lock_api::RawMutex as RawMutexTrait;
18use std::sync::atomic::{AtomicU32, Ordering};
19use std::time::{Duration, Instant};
20
21/// Result of a timed condvar wait.
22///
23/// Drop-in replacement for `parking_lot::WaitTimeoutResult`.
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub struct WaitTimeoutResult(pub(crate) bool);
26
27impl WaitTimeoutResult {
28    /// Returns `true` if the wait timed out, `false` if it was notified.
29    #[inline]
30    pub fn timed_out(self) -> bool {
31        self.0
32    }
33}
34
35/// Futex-based condition variable.
36///
37/// Drop-in replacement for `parking_lot::Condvar`. Works exclusively with
38/// `noxu_sync::Mutex<T>` guards.
39pub struct Condvar {
40    /// Sequence counter: incremented by each `notify_*` call.
41    /// Waiters snapshot this before releasing the mutex; `futex_wait` checks
42    /// that the value still matches, preventing lost-wakeup races.
43    seq: AtomicU32,
44}
45
46impl Condvar {
47    /// Creates a new `Condvar`.
48    pub const fn new() -> Self {
49        Condvar { seq: AtomicU32::new(0) }
50    }
51
52    /// Atomically releases the mutex guard and waits for a notification.
53    ///
54    /// Re-acquires the mutex before returning. Spurious wakeups are possible;
55    /// callers must re-check their condition in a loop.
56    ///
57    /// # Panics
58    ///
59    /// Does not panic. Safe to call from any thread holding the guard.
60    pub fn wait<T>(&self, guard: &mut MutexGuard<'_, T>) {
61        let seq = self.seq.load(Ordering::SeqCst);
62
63        // Release the mutex before parking.
64        let mutex = lock_api::MutexGuard::mutex(guard);
65        unsafe { mutex.force_unlock() };
66
67        // Park until seq changes (woken by notify) or spurious wakeup.
68        futex_wait(&self.seq, seq, None);
69
70        // Re-acquire before returning to caller.
71        // SAFETY: We released the lock above; re-acquiring it here restores
72        // the invariant that the guard is valid (lock held) on return.
73        unsafe { mutex.raw().lock() };
74    }
75
76    /// Atomically releases the mutex guard, waits for a notification or timeout.
77    ///
78    /// Returns `WaitTimeoutResult(true)` if the timeout elapsed, `false` if notified.
79    pub fn wait_for<T>(
80        &self,
81        guard: &mut MutexGuard<'_, T>,
82        timeout: Duration,
83    ) -> WaitTimeoutResult {
84        let seq = self.seq.load(Ordering::SeqCst);
85        let deadline = Instant::now() + timeout;
86
87        let mutex = lock_api::MutexGuard::mutex(guard);
88        unsafe { mutex.force_unlock() };
89
90        let timed_out = loop {
91            let now = Instant::now();
92            if now >= deadline {
93                break true;
94            }
95            let remaining = deadline - now;
96            let woke = futex_wait(&self.seq, seq, Some(remaining));
97            if !woke {
98                // futex_wait returned false → timed out.
99                break true;
100            }
101            // Check if seq changed (notification received).
102            if self.seq.load(Ordering::Relaxed) != seq {
103                break false;
104            }
105            // Spurious wakeup: re-check deadline.
106            if Instant::now() >= deadline {
107                break true;
108            }
109        };
110
111        // SAFETY: We released the lock above; re-acquiring restores guard validity.
112        unsafe { mutex.raw().lock() };
113        WaitTimeoutResult(timed_out)
114    }
115
116    /// Wakes one thread waiting on this condvar.
117    #[inline]
118    pub fn notify_one(&self) {
119        self.seq.fetch_add(1, Ordering::SeqCst);
120        futex_wake(&self.seq, 1);
121    }
122
123    /// Wakes all threads waiting on this condvar.
124    #[inline]
125    pub fn notify_all(&self) {
126        self.seq.fetch_add(1, Ordering::SeqCst);
127        // Use i32::MAX as u32 — the kernel nr_wake field is signed;
128        // u32::MAX would truncate to -1 and wake at most one thread.
129        futex_wake(&self.seq, i32::MAX as u32);
130    }
131}
132
133impl std::fmt::Debug for Condvar {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        f.debug_struct("Condvar").finish_non_exhaustive()
136    }
137}
138
139impl Default for Condvar {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148    use crate::Mutex;
149    use std::sync::Arc;
150    use std::time::Duration;
151
152    #[test]
153    fn test_notify_one_wakes_waiter() {
154        let mutex = Arc::new(Mutex::new(false));
155        let condvar = Arc::new(Condvar::new());
156
157        let m2 = mutex.clone();
158        let cv2 = condvar.clone();
159        let handle = std::thread::spawn(move || {
160            let mut guard = m2.lock();
161            while !*guard {
162                cv2.wait(&mut guard);
163            }
164            true
165        });
166
167        std::thread::sleep(Duration::from_millis(20));
168        {
169            let mut guard = mutex.lock();
170            *guard = true;
171            condvar.notify_one();
172        }
173        assert!(handle.join().unwrap());
174    }
175
176    #[test]
177    fn test_notify_all_wakes_all_waiters() {
178        let mutex = Arc::new(Mutex::new(0usize));
179        let condvar = Arc::new(Condvar::new());
180        let mut handles = Vec::new();
181
182        for _ in 0..4 {
183            let m = mutex.clone();
184            let cv = condvar.clone();
185            handles.push(std::thread::spawn(move || {
186                let mut guard = m.lock();
187                while *guard == 0 {
188                    cv.wait(&mut guard);
189                }
190            }));
191        }
192
193        std::thread::sleep(Duration::from_millis(30));
194        {
195            let mut guard = mutex.lock();
196            *guard = 1;
197            condvar.notify_all();
198        }
199        for h in handles {
200            h.join().unwrap();
201        }
202    }
203
204    #[test]
205    fn test_wait_for_times_out() {
206        let mutex = Arc::new(Mutex::new(()));
207        let condvar = Arc::new(Condvar::new());
208
209        let mut guard = mutex.lock();
210        let result = condvar.wait_for(&mut guard, Duration::from_millis(30));
211        assert!(result.timed_out(), "should have timed out");
212    }
213
214    #[test]
215    fn test_wait_for_notified_before_timeout() {
216        let mutex = Arc::new(Mutex::new(()));
217        let condvar = Arc::new(Condvar::new());
218
219        let cv2 = condvar.clone();
220        let handle = std::thread::spawn(move || {
221            std::thread::sleep(Duration::from_millis(10));
222            cv2.notify_one();
223        });
224
225        let mut guard = mutex.lock();
226        let result = condvar.wait_for(&mut guard, Duration::from_millis(500));
227        assert!(!result.timed_out(), "should have been notified");
228        handle.join().unwrap();
229    }
230}