Skip to main content

go_lib/sync/
cond.rs

1// SPDX-License-Identifier: Apache-2.0
2//! `Cond` — goroutine-aware condition variable.
3//!
4//! Mirrors Go's `sync.Cond`: goroutines waiting on a `Cond` are parked via
5//! the scheduler (`gopark`) rather than blocking an OS thread, so other
6//! goroutines sharing that M can continue to run.
7//!
8//! ## Usage
9//!
10//! ```no_run
11//! use std::sync::{Arc, Mutex};
12//! use go_lib::sync::Cond;
13//!
14//! #[go_lib::main]
15//! fn main() {
16//!     let mu  = Arc::new(Mutex::new(false));
17//!     let cnd = Arc::new(Cond::new());
18//!
19//!     // Producer goroutine
20//!     let mu2  = Arc::clone(&mu);
21//!     let cnd2 = Arc::clone(&cnd);
22//!     go_lib::go!(move || {
23//!         let mut ready = mu2.lock().unwrap();
24//!         *ready = true;
25//!         drop(ready);
26//!         cnd2.notify_one();
27//!     });
28//!
29//!     // Consumer (runs on the main goroutine)
30//!     let mut ready = mu.lock().unwrap();
31//!     while !*ready {
32//!         ready = cnd.wait(&mu, ready);
33//!     }
34//! }
35//! ```
36//!
37//! ## Implementation
38//!
39//! An internal wait queue (`waitq`) stores raw `*mut G` pointers of parked
40//! goroutines.  `notify_one` / `notify_all` call `goready` on them.
41//!
42//! ### Commit-park protocol
43//!
44//! `waitq` is protected by a [`RawMutex`] (not `std::sync::Mutex`) so that
45//! `wait` can hold the queue lock ACROSS the park via
46//! [`gopark_commit`][crate::runtime::park::gopark_commit] — the lock is
47//! released on g0 only after the waiter has reached `GWAITING`.  Releasing
48//! the queue lock *before* `gopark` (the original design) left a lost-wakeup
49//! window: an async preemption (SIGURG) landing between the unlock and the
50//! park made the registered waiter `GRUNNABLE`; a concurrent
51//! `notify_one`/`notify_all` then popped the waiter and called `goready`,
52//! which saw a non-`GWAITING` status and dropped the wake on the floor — the
53//! waiter parked forever.  Holding `waitq`'s lock until `park_fn` has
54//! committed the goroutine to `GWAITING` closes the window: no notifier can
55//! pop the waiter (the pop needs the queue lock) until the park is committed.
56
57use std::collections::VecDeque;
58use std::cell::UnsafeCell;
59use std::sync::Mutex; // the user's external lock passed to `wait`
60
61use crate::runtime::g::{current_g, WaitReason};
62use crate::runtime::park::{gopark_commit, goready};
63use crate::runtime::g::G;
64use crate::runtime::rawmutex::RawMutex;
65
66// ---------------------------------------------------------------------------
67// Cond
68// ---------------------------------------------------------------------------
69
70/// A goroutine-aware condition variable.
71///
72/// Like `std::sync::Condvar` but parks goroutines instead of OS threads,
73/// so other goroutines sharing the same M continue to be scheduled while
74/// waiters sleep.
75///
76/// Must be used from within a `#[go_lib::main]` context.
77pub struct Cond {
78    /// Spinlock protecting `waitq`.  A `RawMutex` (not `std::sync::Mutex`) so
79    /// the goroutine `wait()` path can hold the queue lock ACROSS the park via
80    /// `gopark_commit` — released on g0 only after the waiter is `GWAITING`
81    /// (see the module-level "Commit-park protocol" note).
82    mu:    RawMutex,
83    /// Queue of goroutines waiting on this condition — always accessed under
84    /// `mu`.
85    waitq: UnsafeCell<VecDeque<*mut G>>,
86}
87
88// SAFETY: *mut G pointers are only read under `mu` and are valid for the
89// lifetime of the process (goroutines are never freed, only recycled).
90unsafe impl Send for Cond {}
91unsafe impl Sync for Cond {}
92
93impl Cond {
94    /// Create a new `Cond`.
95    pub fn new() -> Self {
96        Self { mu: RawMutex::new(), waitq: UnsafeCell::new(VecDeque::new()) }
97    }
98
99    /// Release `guard`, park the current goroutine until notified, then
100    /// re-acquire the mutex and return the new guard.
101    ///
102    /// Spurious wakeups are possible; always re-check the predicate in a loop:
103    ///
104    /// ```no_run
105    /// # use std::sync::{Arc, Mutex};
106    /// # use go_lib::sync::Cond;
107    /// # let mu = Arc::new(Mutex::new(false));
108    /// # let cnd = Arc::new(Cond::new());
109    /// let mut guard = mu.lock().unwrap();
110    /// while !*guard {
111    ///     guard = cnd.wait(&mu, guard);
112    /// }
113    /// ```
114    ///
115    /// # Panics
116    ///
117    /// Panics if called from outside a goroutine context (i.e. before `run()`).
118    pub fn wait<'a, T>(
119        &self,
120        mu:   &'a Mutex<T>,
121        guard: std::sync::MutexGuard<'a, T>,
122    ) -> std::sync::MutexGuard<'a, T> {
123        let gp = current_g();
124        assert!(!gp.is_null(), "Cond::wait called outside a goroutine context");
125
126        // m_lock suppresses async preemption while we hold `mu` AND across the
127        // `gopark_commit` below — the increment is transferred to `park_fn`,
128        // which balances it on the same M (see gopark_commit's contract).
129        let _lk = crate::runtime::m::m_lock();
130        self.mu.lock();
131
132        // Enqueue ourselves under `mu` so that any concurrent
133        // notify_one / notify_all sees us in the queue.  The queue lock stays
134        // held until park_fn has committed this goroutine to GWAITING
135        // (commit-park protocol), so a notifier cannot pop us and call
136        // goready until the park is guaranteed to find us parked.
137        // SAFETY: `mu` is held.
138        unsafe { (*self.waitq.get()).push_back(gp) };
139
140        // Release the user's mutex *before* parking (Go semantics).  The
141        // rendezvous with notifiers is protected by `mu`, which is held across
142        // the park, so releasing the user lock here cannot lose a wakeup.
143        drop(guard);
144
145        // Transfer the m.locks increment to park_fn, then park.  The queue
146        // lock (`mu`) is released on g0 by `unlock_cond_mutex` only after this
147        // goroutine is GWAITING.
148        std::mem::forget(_lk);
149        unsafe {
150            gopark_commit(
151                WaitReason::CondVar,
152                unlock_cond_mutex,
153                &self.mu as *const RawMutex as *mut u8,
154            );
155        }
156
157        // Woken — re-acquire the user's mutex.
158        mu.lock().unwrap()
159    }
160
161    /// Wake one waiting goroutine.  No-op if there are no waiters.
162    pub fn notify_one(&self) {
163        // m_lock suppresses async preemption across the `mu` critical section
164        // (same rationale as WaitGroup::add); without it a SIGURG between
165        // acquiring and releasing the spinlock could deschedule us while the
166        // lock is held, deadlocking the next goroutine on this M.
167        let _lk = crate::runtime::m::m_lock();
168        // A Cond waiter is woken purely via `goready`, which touches only the
169        // `G` descriptor — never the waiter's stack.
170        self.mu.lock();
171        // SAFETY: `mu` is held.
172        let gp = unsafe { (*self.waitq.get()).pop_front() };
173        unsafe { self.mu.unlock() };
174        // Wake outside the lock so we don't hold the spinlock across the
175        // goready spin (which waits for GRUNNING → GWAITING).
176        if let Some(gp) = gp {
177            // SAFETY: gp is a valid goroutine pointer (see module safety comment).
178            unsafe { goready(gp) };
179        }
180    }
181
182    /// Wake all waiting goroutines.
183    pub fn notify_all(&self) {
184        let _lk = crate::runtime::m::m_lock();
185        // See `notify_one`: waiters are woken only via `goready`, which never
186        // touches a waiter's stack.
187        self.mu.lock();
188        // SAFETY: `mu` is held.
189        let waiters: Vec<*mut G> = unsafe { (*self.waitq.get()).drain(..).collect() };
190        unsafe { self.mu.unlock() };
191        // Wake outside the lock (see notify_one).
192        for gp in waiters {
193            // SAFETY: same as notify_one.
194            unsafe { goready(gp) };
195        }
196    }
197
198}
199
200impl Default for Cond {
201    fn default() -> Self { Self::new() }
202}
203
204/// `gopark_commit` unlock shim: release the `Cond`'s `RawMutex` from g0 after
205/// the parking goroutine has reached `GWAITING`.
206///
207/// # Safety
208/// `arg` must be the `&RawMutex` of a `Cond` whose queue lock is held by the
209/// parking goroutine.
210unsafe fn unlock_cond_mutex(arg: *mut u8) {
211    unsafe { (*(arg as *const RawMutex)).unlock() }
212}
213
214// ---------------------------------------------------------------------------
215// Tests
216// ---------------------------------------------------------------------------
217
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    /// One parked waiter resumes after a single `notify_one`.
    #[test]
    #[go_lib::main]
    fn single_waiter_notify_one() {
        let flag = Arc::new(Mutex::new(false));
        let cond = Arc::new(Cond::new());
        let wakeups = Arc::new(AtomicI32::new(0));

        // Start the waiter goroutine with its own handles to the shared state.
        {
            let flag = Arc::clone(&flag);
            let cond = Arc::clone(&cond);
            let wakeups = Arc::clone(&wakeups);
            crate::runtime::sched::spawn_goroutine(move || {
                let mut ready = flag.lock().unwrap();
                while !*ready {
                    ready = cond.wait(&flag, ready);
                }
                wakeups.fetch_add(1, Ordering::Relaxed);
            });
        }

        // Give the waiter time to park, then set the predicate and signal.
        (0..20).for_each(|_| crate::gosched());
        *flag.lock().unwrap() = true;
        cond.notify_one();

        // Poll, yielding between checks, until the waiter reports in or the
        // deadline passes.
        let deadline = Instant::now() + Duration::from_millis(500);
        while wakeups.load(Ordering::Acquire) != 1 {
            assert!(Instant::now() < deadline, "waiter did not wake");
            crate::gosched();
        }

        assert_eq!(wakeups.load(Ordering::Acquire), 1);
    }

    /// A single `notify_all` resumes every parked waiter.
    #[test]
    #[go_lib::main]
    fn multiple_waiters_notify_all() {
        const N: i32 = 4;
        let flag = Arc::new(Mutex::new(false));
        let cond = Arc::new(Cond::new());
        let wakeups = Arc::new(AtomicI32::new(0));

        (0..N).for_each(|_| {
            let flag = Arc::clone(&flag);
            let cond = Arc::clone(&cond);
            let wakeups = Arc::clone(&wakeups);
            crate::runtime::sched::spawn_goroutine(move || {
                let mut ready = flag.lock().unwrap();
                while !*ready {
                    ready = cond.wait(&flag, ready);
                }
                wakeups.fetch_add(1, Ordering::Relaxed);
            });
        });

        // Let the waiters park, then set the predicate and broadcast.
        (0..40).for_each(|_| crate::gosched());
        {
            let mut ready = flag.lock().unwrap();
            *ready = true;
        }
        cond.notify_all();

        let deadline = Instant::now() + Duration::from_millis(500);
        while wakeups.load(Ordering::Acquire) != N {
            assert!(Instant::now() < deadline,
                "not all waiters woke: {}/{N}", wakeups.load(Ordering::Relaxed));
            crate::gosched();
        }

        assert_eq!(wakeups.load(Ordering::Acquire), N);
    }

    /// With an empty queue `notify_one` does nothing (it must neither panic
    /// nor block).
    #[test]
    fn notify_one_no_waiters() {
        let cond = Cond::new();
        cond.notify_one(); // must return immediately
    }

    /// With an empty queue `notify_all` does nothing.
    #[test]
    fn notify_all_no_waiters() {
        let cond = Cond::new();
        cond.notify_all();
    }
}