Skip to main content

go_lib/sync/
cond.rs

1// SPDX-License-Identifier: Apache-2.0
2//! `Cond` — goroutine-aware condition variable.
3//!
4//! Mirrors Go's `sync.Cond`: goroutines waiting on a `Cond` are parked via
5//! the scheduler (`gopark`) rather than blocking an OS thread, so other
6//! goroutines sharing that M can continue to run.
7//!
8//! ## Usage
9//!
10//! ```no_run
11//! use std::sync::{Arc, Mutex};
12//! use go_lib::sync::Cond;
13//!
14//! let mu  = Arc::new(Mutex::new(false));
15//! let cnd = Arc::new(Cond::new());
16//!
17//! // Producer goroutine
18//! let mu2  = Arc::clone(&mu);
19//! let cnd2 = Arc::clone(&cnd);
20//! go_lib::run(move || {
21//!     let mut ready = mu2.lock().unwrap();
22//!     *ready = true;
23//!     drop(ready);
24//!     cnd2.notify_one();
25//! });
26//!
27//! // Consumer
28//! go_lib::run(move || {
29//!     let mut ready = mu.lock().unwrap();
30//!     while !*ready {
31//!         ready = cnd.wait(&mu, ready);
32//!     }
33//! });
34//! ```
35//!
36//! ## Implementation
37//!
38//! An internal wait queue (`waitq`) stores raw `*mut G` pointers of parked
39//! goroutines.  `notify_one` / `notify_all` call `goready` on them.
40//!
41//! The GRUNNING → GWAITING race is handled by `goready`'s built-in spin loop:
42//! even if a notifier calls `goready` between the moment a goroutine pushes
43//! itself onto `waitq` and the moment `gopark` actually sets `GWAITING`, the
44//! spin safely serialises the two.
45
46use std::collections::VecDeque;
47use std::sync::Mutex;
48
49use crate::runtime::g::{current_g, WaitReason};
50use crate::runtime::park::{gopark, goready};
51use crate::runtime::g::G;
52
53// ---------------------------------------------------------------------------
54// Cond
55// ---------------------------------------------------------------------------
56
57/// A goroutine-aware condition variable.
58///
59/// Like `std::sync::Condvar` but parks goroutines instead of OS threads,
60/// so other goroutines sharing the same M continue to be scheduled while
61/// waiters sleep.
62///
63/// Must be used from within a [`go_lib::run`] context.
64pub struct Cond {
65    /// Queue of goroutines waiting on this condition.
66    waitq: Mutex<VecDeque<*mut G>>,
67}
68
69// SAFETY: *mut G pointers are only read under the waitq Mutex and are valid
70// for the lifetime of the process (goroutines are never freed, only recycled).
71unsafe impl Send for Cond {}
72unsafe impl Sync for Cond {}
73
74impl Cond {
75    /// Create a new `Cond`.
76    pub fn new() -> Self {
77        Self { waitq: Mutex::new(VecDeque::new()) }
78    }
79
80    /// Release `guard`, park the current goroutine until notified, then
81    /// re-acquire the mutex and return the new guard.
82    ///
83    /// Spurious wakeups are possible; always re-check the predicate in a loop:
84    ///
85    /// ```no_run
86    /// # use std::sync::{Arc, Mutex};
87    /// # use go_lib::sync::Cond;
88    /// # let mu = Arc::new(Mutex::new(false));
89    /// # let cnd = Arc::new(Cond::new());
90    /// let mut guard = mu.lock().unwrap();
91    /// while !*guard {
92    ///     guard = cnd.wait(&mu, guard);
93    /// }
94    /// ```
95    ///
96    /// # Panics
97    ///
98    /// Panics if called from outside a goroutine context (i.e. before `run()`).
99    pub fn wait<'a, T>(
100        &self,
101        mu:   &'a Mutex<T>,
102        guard: std::sync::MutexGuard<'a, T>,
103    ) -> std::sync::MutexGuard<'a, T> {
104        let gp = current_g();
105        assert!(!gp.is_null(), "Cond::wait called outside a goroutine context");
106
107        // Enqueue ourselves before releasing the mutex so that any concurrent
108        // notify_one / notify_all sees us in the queue.
109        self.waitq.lock().unwrap().push_back(gp);
110
111        // Release the user's mutex.  After this point a notifier may call
112        // goready(gp); goready's spin loop handles the GRUNNING→GWAITING race.
113        drop(guard);
114
115        // Park until goready transitions us back to GRUNNABLE.
116        // SAFETY: we are on a goroutine stack (asserted above).
117        unsafe { gopark(WaitReason::CondVar) };
118
119        // Woken — re-acquire the user's mutex.
120        mu.lock().unwrap()
121    }
122
123    /// Wake one waiting goroutine.  No-op if there are no waiters.
124    pub fn notify_one(&self) {
125        let gp = self.waitq.lock().unwrap().pop_front();
126        if let Some(gp) = gp {
127            // SAFETY: gp is a valid goroutine pointer (see module safety comment).
128            unsafe { goready(gp) };
129        }
130    }
131
132    /// Wake all waiting goroutines.
133    pub fn notify_all(&self) {
134        let waiters: Vec<*mut G> = self.waitq.lock().unwrap().drain(..).collect();
135        for gp in waiters {
136            // SAFETY: same as notify_one.
137            unsafe { goready(gp) };
138        }
139    }
140}
141
142impl Default for Cond {
143    fn default() -> Self { Self::new() }
144}
145
146// ---------------------------------------------------------------------------
147// Tests
148// ---------------------------------------------------------------------------
149
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::run_impl;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::{Arc, Mutex};

    /// Spawn a goroutine that waits on `cnd` until the flag behind `mu`
    /// becomes `true`, then increments `woke`.
    ///
    /// The increment uses `Release` so that it pairs with the `Acquire`
    /// loads the tests use to observe the counter.
    fn spawn_waiter(mu: Arc<Mutex<bool>>, cnd: Arc<Cond>, woke: Arc<AtomicI32>) {
        // SAFETY: NOTE(review): `spawn_goroutine`'s contract is declared
        // outside this file.  Every call site of this helper is inside a
        // `run_impl` closure — confirm that is what the contract requires.
        unsafe {
            crate::runtime::sched::spawn_goroutine(move || {
                let mut guard = mu.lock().unwrap();
                while !*guard {
                    guard = cnd.wait(&mu, guard);
                }
                woke.fetch_add(1, Ordering::Release);
            });
        }
    }

    /// A single waiter is woken by notify_one.
    #[test]
    fn single_waiter_notify_one() {
        let mu   = Arc::new(Mutex::new(false));
        let cnd  = Arc::new(Cond::new());
        let woke = Arc::new(AtomicI32::new(0));

        run_impl({
            let mu   = Arc::clone(&mu);
            let cnd  = Arc::clone(&cnd);
            let woke = Arc::clone(&woke);
            move || {
                spawn_waiter(Arc::clone(&mu), Arc::clone(&cnd), Arc::clone(&woke));

                // Yield so the waiter gets a chance to park, then signal.
                // Correctness does not depend on the waiter having parked:
                // it re-checks the flag under the mutex before waiting.
                for _ in 0..20 { crate::gosched(); }
                *mu.lock().unwrap() = true;
                cnd.notify_one();

                // Spin-wait (yielding) for the waiter to record the wakeup.
                let deadline = std::time::Instant::now()
                    + std::time::Duration::from_millis(500);
                loop {
                    if woke.load(Ordering::Acquire) == 1 { break; }
                    assert!(std::time::Instant::now() < deadline, "waiter did not wake");
                    crate::gosched();
                }
            }
        });

        assert_eq!(woke.load(Ordering::Acquire), 1);
    }

    /// All waiters are woken by notify_all.
    #[test]
    fn multiple_waiters_notify_all() {
        const N: i32 = 4;
        let mu   = Arc::new(Mutex::new(false));
        let cnd  = Arc::new(Cond::new());
        let woke = Arc::new(AtomicI32::new(0));

        run_impl({
            let mu   = Arc::clone(&mu);
            let cnd  = Arc::clone(&cnd);
            let woke = Arc::clone(&woke);
            move || {
                for _ in 0..N {
                    spawn_waiter(Arc::clone(&mu), Arc::clone(&cnd), Arc::clone(&woke));
                }

                // Give the waiters a chance to park, then flip the flag and
                // wake everyone.
                for _ in 0..40 { crate::gosched(); }
                *mu.lock().unwrap() = true;
                cnd.notify_all();

                // Spin-wait (yielding) until every waiter has checked in.
                let deadline = std::time::Instant::now()
                    + std::time::Duration::from_millis(500);
                loop {
                    if woke.load(Ordering::Acquire) == N { break; }
                    assert!(std::time::Instant::now() < deadline,
                        "not all waiters woke: {}/{N}", woke.load(Ordering::Relaxed));
                    crate::gosched();
                }
            }
        });

        assert_eq!(woke.load(Ordering::Acquire), N);
    }

    /// notify_one with no waiters is a no-op (must not panic or block).
    #[test]
    fn notify_one_no_waiters() {
        let cnd = Cond::new();
        cnd.notify_one(); // must return immediately
    }

    /// notify_all with no waiters is a no-op.
    #[test]
    fn notify_all_no_waiters() {
        let cnd = Cond::new();
        cnd.notify_all();
    }
}