1use crate::MutexGuard;
15use crate::futex::{futex_wait, futex_wake};
16use lock_api;
17use lock_api::RawMutex as RawMutexTrait;
18use std::sync::atomic::{AtomicU32, Ordering};
19use std::time::{Duration, Instant};
20
/// Outcome of a timed wait on a [`Condvar`].
///
/// Wraps a single flag that [`Condvar::wait_for`] sets to `true` when the wait
/// ended because the timeout elapsed rather than because of a notification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WaitTimeoutResult(pub(crate) bool);

impl WaitTimeoutResult {
    /// Returns `true` if the wait this value describes ended by timing out.
    #[inline]
    pub fn timed_out(self) -> bool {
        let WaitTimeoutResult(elapsed) = self;
        elapsed
    }
}
34
/// A condition variable whose entire state is one 32-bit atomic counter.
///
/// Waiters snapshot the counter, release their mutex and block through
/// `futex_wait` on it; notifiers increment it and call `futex_wake`.
pub struct Condvar {
    /// Notification sequence number: incremented by every `notify_one` /
    /// `notify_all` call and used as the word passed to the futex helpers.
    seq: AtomicU32,
}
45
46impl Condvar {
47 pub const fn new() -> Self {
49 Condvar { seq: AtomicU32::new(0) }
50 }
51
52 pub fn wait<T>(&self, guard: &mut MutexGuard<'_, T>) {
61 let seq = self.seq.load(Ordering::SeqCst);
62
63 let mutex = lock_api::MutexGuard::mutex(guard);
65 unsafe { mutex.force_unlock() };
66
67 futex_wait(&self.seq, seq, None);
69
70 unsafe { mutex.raw().lock() };
74 }
75
76 pub fn wait_for<T>(
80 &self,
81 guard: &mut MutexGuard<'_, T>,
82 timeout: Duration,
83 ) -> WaitTimeoutResult {
84 let seq = self.seq.load(Ordering::SeqCst);
85 let deadline = Instant::now() + timeout;
86
87 let mutex = lock_api::MutexGuard::mutex(guard);
88 unsafe { mutex.force_unlock() };
89
90 let timed_out = loop {
91 let now = Instant::now();
92 if now >= deadline {
93 break true;
94 }
95 let remaining = deadline - now;
96 let woke = futex_wait(&self.seq, seq, Some(remaining));
97 if !woke {
98 break true;
100 }
101 if self.seq.load(Ordering::Relaxed) != seq {
103 break false;
104 }
105 if Instant::now() >= deadline {
107 break true;
108 }
109 };
110
111 unsafe { mutex.raw().lock() };
113 WaitTimeoutResult(timed_out)
114 }
115
116 #[inline]
118 pub fn notify_one(&self) {
119 self.seq.fetch_add(1, Ordering::SeqCst);
120 futex_wake(&self.seq, 1);
121 }
122
123 #[inline]
125 pub fn notify_all(&self) {
126 self.seq.fetch_add(1, Ordering::SeqCst);
127 futex_wake(&self.seq, i32::MAX as u32);
130 }
131}
132
133impl std::fmt::Debug for Condvar {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 f.debug_struct("Condvar").finish_non_exhaustive()
136 }
137}
138
139impl Default for Condvar {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Mutex;
    use std::sync::Arc;
    use std::time::Duration;

    /// A thread waiting for a flag returns once another thread sets the flag
    /// and calls `notify_one`.
    #[test]
    fn test_notify_one_wakes_waiter() {
        let flag = Arc::new(Mutex::new(false));
        let cv = Arc::new(Condvar::new());

        let waiter = {
            let flag = Arc::clone(&flag);
            let cv = Arc::clone(&cv);
            std::thread::spawn(move || {
                let mut ready = flag.lock();
                while !*ready {
                    cv.wait(&mut ready);
                }
                true
            })
        };

        // Give the waiter time to block before notifying.
        std::thread::sleep(Duration::from_millis(20));
        {
            let mut ready = flag.lock();
            *ready = true;
            cv.notify_one();
        }
        assert!(waiter.join().unwrap());
    }

    /// Four threads waiting on the same counter all finish after a single
    /// `notify_all`.
    #[test]
    fn test_notify_all_wakes_all_waiters() {
        let counter = Arc::new(Mutex::new(0usize));
        let cv = Arc::new(Condvar::new());

        let waiters: Vec<_> = (0..4)
            .map(|_| {
                let counter = Arc::clone(&counter);
                let cv = Arc::clone(&cv);
                std::thread::spawn(move || {
                    let mut value = counter.lock();
                    while *value == 0 {
                        cv.wait(&mut value);
                    }
                })
            })
            .collect();

        // Give the waiters time to block before notifying.
        std::thread::sleep(Duration::from_millis(30));
        {
            let mut value = counter.lock();
            *value = 1;
            cv.notify_all();
        }
        waiters
            .into_iter()
            .for_each(|waiter| waiter.join().unwrap());
    }

    /// With nobody notifying, `wait_for` reports a timeout.
    #[test]
    fn test_wait_for_times_out() {
        let lock = Arc::new(Mutex::new(()));
        let cv = Arc::new(Condvar::new());

        let mut held = lock.lock();
        let outcome = cv.wait_for(&mut held, Duration::from_millis(30));
        assert!(outcome.timed_out(), "should have timed out");
    }

    /// A notification sent well before the deadline makes `wait_for` report
    /// that it did not time out.
    #[test]
    fn test_wait_for_notified_before_timeout() {
        let lock = Arc::new(Mutex::new(()));
        let cv = Arc::new(Condvar::new());

        let notifier = {
            let cv = Arc::clone(&cv);
            std::thread::spawn(move || {
                std::thread::sleep(Duration::from_millis(10));
                cv.notify_one();
            })
        };

        let mut held = lock.lock();
        let outcome = cv.wait_for(&mut held, Duration::from_millis(500));
        assert!(!outcome.timed_out(), "should have been notified");
        notifier.join().unwrap();
    }
}