Skip to main content

go_lib/sync/
cond.rs

1// SPDX-License-Identifier: Apache-2.0
2//! `Cond` — goroutine-aware condition variable.
3//!
4//! Mirrors Go's `sync.Cond`: goroutines waiting on a `Cond` are parked via
5//! the scheduler (`gopark`) rather than blocking an OS thread, so other
6//! goroutines sharing that M can continue to run.
7//!
8//! ## Usage
9//!
10//! ```no_run
11//! use std::sync::{Arc, Mutex};
12//! use go_lib::sync::Cond;
13//!
14//! let mu  = Arc::new(Mutex::new(false));
15//! let cnd = Arc::new(Cond::new());
16//!
17//! // Producer goroutine
18//! let mu2  = Arc::clone(&mu);
19//! let cnd2 = Arc::clone(&cnd);
20//! go_lib::run(move || {
21//!     let mut ready = mu2.lock().unwrap();
22//!     *ready = true;
23//!     drop(ready);
24//!     cnd2.notify_one();
25//! });
26//!
27//! // Consumer
28//! go_lib::run(move || {
29//!     let mut ready = mu.lock().unwrap();
30//!     while !*ready {
31//!         ready = cnd.wait(&mu, ready);
32//!     }
33//! });
34//! ```
35//!
36//! ## Implementation
37//!
38//! An internal wait queue (`waitq`) stores raw `*mut G` pointers of parked
39//! goroutines.  `notify_one` / `notify_all` call `goready` on them.
40//!
41//! ### Commit-park protocol
42//!
43//! `waitq` is protected by a [`RawMutex`] (not `std::sync::Mutex`) so that
44//! `wait` can hold the queue lock ACROSS the park via
45//! [`gopark_commit`][crate::runtime::park::gopark_commit] — the lock is
46//! released on g0 only after the waiter has reached `GWAITING`.  Releasing
47//! the queue lock *before* `gopark` (the original design) left a lost-wakeup
48//! window: an async preemption (SIGURG) landing between the unlock and the
49//! park made the registered waiter `GRUNNABLE`; a concurrent
50//! `notify_one`/`notify_all` then popped the waiter and called `goready`,
51//! which saw a non-`GWAITING` status and dropped the wake on the floor — the
52//! waiter parked forever.  Holding `waitq`'s lock until `park_fn` has
53//! committed the goroutine to `GWAITING` closes the window: no notifier can
54//! pop the waiter (the pop needs the queue lock) until the park is committed.
55
56use std::collections::VecDeque;
57use std::cell::UnsafeCell;
58use std::sync::Mutex; // the user's external lock passed to `wait`
59
60use crate::runtime::g::{current_g, WaitReason};
61use crate::runtime::park::{gopark_commit, goready};
62use crate::runtime::g::G;
63use crate::runtime::rawmutex::RawMutex;
64
65// ---------------------------------------------------------------------------
66// Cond
67// ---------------------------------------------------------------------------
68
69/// A goroutine-aware condition variable.
70///
71/// Like `std::sync::Condvar` but parks goroutines instead of OS threads,
72/// so other goroutines sharing the same M continue to be scheduled while
73/// waiters sleep.
74///
75/// Must be used from within a [`go_lib::run`] context.
76pub struct Cond {
77    /// Spinlock protecting `waitq`.  A `RawMutex` (not `std::sync::Mutex`) so
78    /// the goroutine `wait()` path can hold the queue lock ACROSS the park via
79    /// `gopark_commit` — released on g0 only after the waiter is `GWAITING`
80    /// (see the module-level "Commit-park protocol" note).
81    mu:    RawMutex,
82    /// Queue of goroutines waiting on this condition — always accessed under
83    /// `mu`.
84    waitq: UnsafeCell<VecDeque<*mut G>>,
85}
86
// SAFETY: `Cond` is not automatically `Send`/`Sync` because it stores raw
// `*mut G` pointers inside an `UnsafeCell`.  Sharing it is sound because the
// queue is only read or modified while `mu` is held, and the `*mut G`
// pointers are valid for the lifetime of the process (goroutines are never
// freed, only recycled).
unsafe impl Send for Cond {}
unsafe impl Sync for Cond {}
91
92impl Cond {
93    /// Create a new `Cond`.
94    pub fn new() -> Self {
95        Self { mu: RawMutex::new(), waitq: UnsafeCell::new(VecDeque::new()) }
96    }
97
98    /// Release `guard`, park the current goroutine until notified, then
99    /// re-acquire the mutex and return the new guard.
100    ///
101    /// Spurious wakeups are possible; always re-check the predicate in a loop:
102    ///
103    /// ```no_run
104    /// # use std::sync::{Arc, Mutex};
105    /// # use go_lib::sync::Cond;
106    /// # let mu = Arc::new(Mutex::new(false));
107    /// # let cnd = Arc::new(Cond::new());
108    /// let mut guard = mu.lock().unwrap();
109    /// while !*guard {
110    ///     guard = cnd.wait(&mu, guard);
111    /// }
112    /// ```
113    ///
114    /// # Panics
115    ///
116    /// Panics if called from outside a goroutine context (i.e. before `run()`).
117    pub fn wait<'a, T>(
118        &self,
119        mu:   &'a Mutex<T>,
120        guard: std::sync::MutexGuard<'a, T>,
121    ) -> std::sync::MutexGuard<'a, T> {
122        let gp = current_g();
123        assert!(!gp.is_null(), "Cond::wait called outside a goroutine context");
124
125        // m_lock suppresses async preemption while we hold `mu` AND across the
126        // `gopark_commit` below — the increment is transferred to `park_fn`,
127        // which balances it on the same M (see gopark_commit's contract).
128        let _lk = crate::runtime::m::m_lock();
129        self.mu.lock();
130
131        // Enqueue ourselves under `mu` so that any concurrent
132        // notify_one / notify_all sees us in the queue.  The queue lock stays
133        // held until park_fn has committed this goroutine to GWAITING
134        // (commit-park protocol), so a notifier cannot pop us and call
135        // goready until the park is guaranteed to find us parked.
136        // SAFETY: `mu` is held.
137        unsafe { (*self.waitq.get()).push_back(gp) };
138
139        // Release the user's mutex *before* parking (Go semantics).  The
140        // rendezvous with notifiers is protected by `mu`, which is held across
141        // the park, so releasing the user lock here cannot lose a wakeup.
142        drop(guard);
143
144        // Transfer the m.locks increment to park_fn, then park.  The queue
145        // lock (`mu`) is released on g0 by `unlock_cond_mutex` only after this
146        // goroutine is GWAITING.
147        std::mem::forget(_lk);
148        unsafe {
149            gopark_commit(
150                WaitReason::CondVar,
151                unlock_cond_mutex,
152                &self.mu as *const RawMutex as *mut u8,
153            );
154        }
155
156        // Woken — re-acquire the user's mutex.
157        mu.lock().unwrap()
158    }
159
160    /// Wake one waiting goroutine.  No-op if there are no waiters.
161    pub fn notify_one(&self) {
162        // m_lock suppresses async preemption across the `mu` critical section
163        // (same rationale as WaitGroup::add); without it a SIGURG between
164        // acquiring and releasing the spinlock could deschedule us while the
165        // lock is held, deadlocking the next goroutine on this M.
166        let _lk = crate::runtime::m::m_lock();
167        // A Cond waiter is woken purely via `goready`, which touches only the
168        // `G` descriptor — never the waiter's stack.
169        self.mu.lock();
170        // SAFETY: `mu` is held.
171        let gp = unsafe { (*self.waitq.get()).pop_front() };
172        unsafe { self.mu.unlock() };
173        // Wake outside the lock so we don't hold the spinlock across the
174        // goready spin (which waits for GRUNNING → GWAITING).
175        if let Some(gp) = gp {
176            // SAFETY: gp is a valid goroutine pointer (see module safety comment).
177            unsafe { goready(gp) };
178        }
179    }
180
181    /// Wake all waiting goroutines.
182    pub fn notify_all(&self) {
183        let _lk = crate::runtime::m::m_lock();
184        // See `notify_one`: waiters are woken only via `goready`, which never
185        // touches a waiter's stack.
186        self.mu.lock();
187        // SAFETY: `mu` is held.
188        let waiters: Vec<*mut G> = unsafe { (*self.waitq.get()).drain(..).collect() };
189        unsafe { self.mu.unlock() };
190        // Wake outside the lock (see notify_one).
191        for gp in waiters {
192            // SAFETY: same as notify_one.
193            unsafe { goready(gp) };
194        }
195    }
196
197}
198
199impl Default for Cond {
200    fn default() -> Self { Self::new() }
201}
202
203/// `gopark_commit` unlock shim: release the `Cond`'s `RawMutex` from g0 after
204/// the parking goroutine has reached `GWAITING`.
205///
206/// # Safety
207/// `arg` must be the `&RawMutex` of a `Cond` whose queue lock is held by the
208/// parking goroutine.
209unsafe fn unlock_cond_mutex(arg: *mut u8) {
210    unsafe { (*(arg as *const RawMutex)).unlock() }
211}
212
213// ---------------------------------------------------------------------------
214// Tests
215// ---------------------------------------------------------------------------
216
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::{run_impl, spawn_goroutine};
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    /// How long the driver goroutine polls for waiters to record a wakeup
    /// before the test is declared failed.
    const WAKE_TIMEOUT: Duration = Duration::from_millis(500);

    /// Spawn a goroutine that waits on `cnd` until the flag behind `mu` is
    /// set, then increments `woke`.
    fn spawn_waiter(mu: Arc<Mutex<bool>>, cnd: Arc<Cond>, woke: Arc<AtomicI32>) {
        spawn_goroutine(move || {
            let mut ready = mu.lock().unwrap();
            while !*ready {
                ready = cnd.wait(&mu, ready);
            }
            woke.fetch_add(1, Ordering::Relaxed);
        });
    }

    /// A single waiter is woken by notify_one.
    #[test]
    fn single_waiter_notify_one() {
        let flag = Arc::new(Mutex::new(false));
        let cnd = Arc::new(Cond::new());
        let woke = Arc::new(AtomicI32::new(0));
        let woke_in = Arc::clone(&woke);

        run_impl(move || {
            spawn_waiter(Arc::clone(&flag), Arc::clone(&cnd), Arc::clone(&woke_in));

            // Give the waiter a chance to park, then set the flag and signal.
            for _ in 0..20 {
                crate::gosched();
            }
            *flag.lock().unwrap() = true;
            cnd.notify_one();

            // Poll (yielding each round) until the waiter records its wakeup.
            let deadline = Instant::now() + WAKE_TIMEOUT;
            while woke_in.load(Ordering::Acquire) != 1 {
                assert!(Instant::now() < deadline, "waiter did not wake");
                crate::gosched();
            }
        });

        assert_eq!(woke.load(Ordering::Acquire), 1);
    }

    /// All waiters are woken by notify_all.
    #[test]
    fn multiple_waiters_notify_all() {
        const N: i32 = 4;
        let flag = Arc::new(Mutex::new(false));
        let cnd = Arc::new(Cond::new());
        let woke = Arc::new(AtomicI32::new(0));
        let woke_in = Arc::clone(&woke);

        run_impl(move || {
            (0..N).for_each(|_| {
                spawn_waiter(Arc::clone(&flag), Arc::clone(&cnd), Arc::clone(&woke_in));
            });

            // Let the waiters park, then set the flag and broadcast.
            for _ in 0..40 {
                crate::gosched();
            }
            *flag.lock().unwrap() = true;
            cnd.notify_all();

            // Poll (yielding each round) until every waiter has checked in.
            let deadline = Instant::now() + WAKE_TIMEOUT;
            while woke_in.load(Ordering::Acquire) != N {
                assert!(
                    Instant::now() < deadline,
                    "not all waiters woke: {}/{N}",
                    woke_in.load(Ordering::Relaxed)
                );
                crate::gosched();
            }
        });

        assert_eq!(woke.load(Ordering::Acquire), N);
    }

    /// notify_one with no waiters is a no-op (must not panic or block).
    #[test]
    fn notify_one_no_waiters() {
        // Must return immediately.
        Cond::new().notify_one();
    }

    /// notify_all with no waiters is a no-op.
    #[test]
    fn notify_all_no_waiters() {
        Cond::new().notify_all();
    }
}