go_lib/sync/cond.rs
1// SPDX-License-Identifier: Apache-2.0
2//! `Cond` — goroutine-aware condition variable.
3//!
4//! Mirrors Go's `sync.Cond`: goroutines waiting on a `Cond` are parked via
5//! the scheduler (`gopark`) rather than blocking an OS thread, so other
6//! goroutines sharing that M can continue to run.
7//!
8//! ## Usage
9//!
10//! ```no_run
11//! use std::sync::{Arc, Mutex};
12//! use go_lib::sync::Cond;
13//!
14//! let mu = Arc::new(Mutex::new(false));
15//! let cnd = Arc::new(Cond::new());
16//!
17//! // Producer goroutine
18//! let mu2 = Arc::clone(&mu);
19//! let cnd2 = Arc::clone(&cnd);
20//! go_lib::run(move || {
21//! let mut ready = mu2.lock().unwrap();
22//! *ready = true;
23//! drop(ready);
24//! cnd2.notify_one();
25//! });
26//!
27//! // Consumer
28//! go_lib::run(move || {
29//! let mut ready = mu.lock().unwrap();
30//! while !*ready {
31//! ready = cnd.wait(&mu, ready);
32//! }
33//! });
34//! ```
35//!
36//! ## Implementation
37//!
38//! An internal wait queue (`waitq`) stores raw `*mut G` pointers of parked
39//! goroutines. `notify_one` / `notify_all` call `goready` on them.
40//!
41//! ### Commit-park protocol
42//!
43//! `waitq` is protected by a [`RawMutex`] (not `std::sync::Mutex`) so that
44//! `wait` can hold the queue lock ACROSS the park via
45//! [`gopark_commit`][crate::runtime::park::gopark_commit] — the lock is
46//! released on g0 only after the waiter has reached `GWAITING`. Releasing
47//! the queue lock *before* `gopark` (the original design) left a lost-wakeup
48//! window: an async preemption (SIGURG) landing between the unlock and the
49//! park made the registered waiter `GRUNNABLE`; a concurrent
50//! `notify_one`/`notify_all` then popped the waiter and called `goready`,
51//! which saw a non-`GWAITING` status and dropped the wake on the floor — the
52//! waiter parked forever. Holding `waitq`'s lock until `park_fn` has
53//! committed the goroutine to `GWAITING` closes the window: no notifier can
54//! pop the waiter (the pop needs the queue lock) until the park is committed.
55
56use std::collections::VecDeque;
57use std::cell::UnsafeCell;
58use std::sync::Mutex; // the user's external lock passed to `wait`
59
60use crate::runtime::g::{current_g, WaitReason};
61use crate::runtime::park::{gopark_commit, goready};
62use crate::runtime::g::G;
63use crate::runtime::rawmutex::RawMutex;
64
65// ---------------------------------------------------------------------------
66// Cond
67// ---------------------------------------------------------------------------
68
69/// A goroutine-aware condition variable.
70///
71/// Like `std::sync::Condvar` but parks goroutines instead of OS threads,
72/// so other goroutines sharing the same M continue to be scheduled while
73/// waiters sleep.
74///
75/// Must be used from within a [`go_lib::run`] context.
76pub struct Cond {
77 /// Spinlock protecting `waitq`. A `RawMutex` (not `std::sync::Mutex`) so
78 /// the goroutine `wait()` path can hold the queue lock ACROSS the park via
79 /// `gopark_commit` — released on g0 only after the waiter is `GWAITING`
80 /// (see the module-level "Commit-park protocol" note).
81 mu: RawMutex,
82 /// Queue of goroutines waiting on this condition — always accessed under
83 /// `mu`.
84 waitq: UnsafeCell<VecDeque<*mut G>>,
85}
86
87// SAFETY: *mut G pointers are only read under `mu` and are valid for the
88// lifetime of the process (goroutines are never freed, only recycled).
89unsafe impl Send for Cond {}
90unsafe impl Sync for Cond {}
91
92impl Cond {
93 /// Create a new `Cond`.
94 pub fn new() -> Self {
95 Self { mu: RawMutex::new(), waitq: UnsafeCell::new(VecDeque::new()) }
96 }
97
98 /// Release `guard`, park the current goroutine until notified, then
99 /// re-acquire the mutex and return the new guard.
100 ///
101 /// Spurious wakeups are possible; always re-check the predicate in a loop:
102 ///
103 /// ```no_run
104 /// # use std::sync::{Arc, Mutex};
105 /// # use go_lib::sync::Cond;
106 /// # let mu = Arc::new(Mutex::new(false));
107 /// # let cnd = Arc::new(Cond::new());
108 /// let mut guard = mu.lock().unwrap();
109 /// while !*guard {
110 /// guard = cnd.wait(&mu, guard);
111 /// }
112 /// ```
113 ///
114 /// # Panics
115 ///
116 /// Panics if called from outside a goroutine context (i.e. before `run()`).
117 pub fn wait<'a, T>(
118 &self,
119 mu: &'a Mutex<T>,
120 guard: std::sync::MutexGuard<'a, T>,
121 ) -> std::sync::MutexGuard<'a, T> {
122 let gp = current_g();
123 assert!(!gp.is_null(), "Cond::wait called outside a goroutine context");
124
125 // m_lock suppresses async preemption while we hold `mu` AND across the
126 // `gopark_commit` below — the increment is transferred to `park_fn`,
127 // which balances it on the same M (see gopark_commit's contract).
128 let _lk = crate::runtime::m::m_lock();
129 self.mu.lock();
130
131 // Enqueue ourselves under `mu` so that any concurrent
132 // notify_one / notify_all sees us in the queue. The queue lock stays
133 // held until park_fn has committed this goroutine to GWAITING
134 // (commit-park protocol), so a notifier cannot pop us and call
135 // goready until the park is guaranteed to find us parked.
136 // SAFETY: `mu` is held.
137 unsafe { (*self.waitq.get()).push_back(gp) };
138
139 // Release the user's mutex *before* parking (Go semantics). The
140 // rendezvous with notifiers is protected by `mu`, which is held across
141 // the park, so releasing the user lock here cannot lose a wakeup.
142 drop(guard);
143
144 // Transfer the m.locks increment to park_fn, then park. The queue
145 // lock (`mu`) is released on g0 by `unlock_cond_mutex` only after this
146 // goroutine is GWAITING.
147 std::mem::forget(_lk);
148 unsafe {
149 gopark_commit(
150 WaitReason::CondVar,
151 unlock_cond_mutex,
152 &self.mu as *const RawMutex as *mut u8,
153 );
154 }
155
156 // Woken — re-acquire the user's mutex.
157 mu.lock().unwrap()
158 }
159
160 /// Wake one waiting goroutine. No-op if there are no waiters.
161 pub fn notify_one(&self) {
162 // m_lock suppresses async preemption across the `mu` critical section
163 // (same rationale as WaitGroup::add); without it a SIGURG between
164 // acquiring and releasing the spinlock could deschedule us while the
165 // lock is held, deadlocking the next goroutine on this M.
166 let _lk = crate::runtime::m::m_lock();
167 // A Cond waiter is woken purely via `goready`, which touches only the
168 // `G` descriptor — never the waiter's stack.
169 self.mu.lock();
170 // SAFETY: `mu` is held.
171 let gp = unsafe { (*self.waitq.get()).pop_front() };
172 unsafe { self.mu.unlock() };
173 // Wake outside the lock so we don't hold the spinlock across the
174 // goready spin (which waits for GRUNNING → GWAITING).
175 if let Some(gp) = gp {
176 // SAFETY: gp is a valid goroutine pointer (see module safety comment).
177 unsafe { goready(gp) };
178 }
179 }
180
181 /// Wake all waiting goroutines.
182 pub fn notify_all(&self) {
183 let _lk = crate::runtime::m::m_lock();
184 // See `notify_one`: waiters are woken only via `goready`, which never
185 // touches a waiter's stack.
186 self.mu.lock();
187 // SAFETY: `mu` is held.
188 let waiters: Vec<*mut G> = unsafe { (*self.waitq.get()).drain(..).collect() };
189 unsafe { self.mu.unlock() };
190 // Wake outside the lock (see notify_one).
191 for gp in waiters {
192 // SAFETY: same as notify_one.
193 unsafe { goready(gp) };
194 }
195 }
196
197}
198
199impl Default for Cond {
200 fn default() -> Self { Self::new() }
201}
202
203/// `gopark_commit` unlock shim: release the `Cond`'s `RawMutex` from g0 after
204/// the parking goroutine has reached `GWAITING`.
205///
206/// # Safety
207/// `arg` must be the `&RawMutex` of a `Cond` whose queue lock is held by the
208/// parking goroutine.
209unsafe fn unlock_cond_mutex(arg: *mut u8) {
210 unsafe { (*(arg as *const RawMutex)).unlock() }
211}
212
213// ---------------------------------------------------------------------------
214// Tests
215// ---------------------------------------------------------------------------
216
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::{run_impl, spawn_goroutine};
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    /// Upper bound on how long a test polls for wakeups before failing.
    const WAKE_TIMEOUT: Duration = Duration::from_millis(500);

    /// Spawn a goroutine that waits on `cond` until the boolean guarded by
    /// `flag` becomes true, then increments `woken` exactly once.
    fn spawn_waiter(flag: Arc<Mutex<bool>>, cond: Arc<Cond>, woken: Arc<AtomicI32>) {
        spawn_goroutine(move || {
            let mut ready = flag.lock().unwrap();
            while !*ready {
                ready = cond.wait(&flag, ready);
            }
            woken.fetch_add(1, Ordering::Relaxed);
        });
    }

    /// One parked waiter is released by a single `notify_one`.
    #[test]
    fn single_waiter_notify_one() {
        let flag = Arc::new(Mutex::new(false));
        let cond = Arc::new(Cond::new());
        let woken = Arc::new(AtomicI32::new(0));

        let observed = Arc::clone(&woken);
        run_impl(move || {
            spawn_waiter(Arc::clone(&flag), Arc::clone(&cond), Arc::clone(&observed));

            // Give the waiter a chance to run and park before signalling.
            for _ in 0..20 {
                crate::gosched();
            }
            *flag.lock().unwrap() = true;
            cond.notify_one();

            // Poll (yielding in between) until the waiter reports in.
            let deadline = Instant::now() + WAKE_TIMEOUT;
            while observed.load(Ordering::Acquire) != 1 {
                assert!(Instant::now() < deadline, "waiter did not wake");
                crate::gosched();
            }
        });

        assert_eq!(woken.load(Ordering::Acquire), 1);
    }

    /// Every parked waiter is released by a single `notify_all`.
    #[test]
    fn multiple_waiters_notify_all() {
        const N: i32 = 4;
        let flag = Arc::new(Mutex::new(false));
        let cond = Arc::new(Cond::new());
        let woken = Arc::new(AtomicI32::new(0));

        let observed = Arc::clone(&woken);
        run_impl(move || {
            for _ in 0..N {
                spawn_waiter(Arc::clone(&flag), Arc::clone(&cond), Arc::clone(&observed));
            }

            // Give all waiters a chance to run and park before signalling.
            for _ in 0..40 {
                crate::gosched();
            }
            *flag.lock().unwrap() = true;
            cond.notify_all();

            let deadline = Instant::now() + WAKE_TIMEOUT;
            while observed.load(Ordering::Acquire) != N {
                assert!(
                    Instant::now() < deadline,
                    "not all waiters woke: {}/{N}",
                    observed.load(Ordering::Relaxed)
                );
                crate::gosched();
            }
        });

        assert_eq!(woken.load(Ordering::Acquire), N);
    }

    /// `notify_one` on an empty queue returns immediately without panicking.
    #[test]
    fn notify_one_no_waiters() {
        Cond::new().notify_one();
    }

    /// `notify_all` on an empty queue returns immediately without panicking.
    #[test]
    fn notify_all_no_waiters() {
        Cond::new().notify_all();
    }
}