go_lib/sync/cond.rs
1// SPDX-License-Identifier: Apache-2.0
2//! `Cond` — goroutine-aware condition variable.
3//!
4//! Mirrors Go's `sync.Cond`: goroutines waiting on a `Cond` are parked via
5//! the scheduler (`gopark`) rather than blocking an OS thread, so other
6//! goroutines sharing that M can continue to run.
7//!
8//! ## Usage
9//!
10//! ```no_run
11//! use std::sync::{Arc, Mutex};
12//! use go_lib::sync::Cond;
13//!
14//! #[go_lib::main]
15//! fn main() {
16//! let mu = Arc::new(Mutex::new(false));
17//! let cnd = Arc::new(Cond::new());
18//!
19//! // Producer goroutine
20//! let mu2 = Arc::clone(&mu);
21//! let cnd2 = Arc::clone(&cnd);
22//! go_lib::go!(move || {
23//! let mut ready = mu2.lock().unwrap();
24//! *ready = true;
25//! drop(ready);
26//! cnd2.notify_one();
27//! });
28//!
29//! // Consumer (runs on the main goroutine)
30//! let mut ready = mu.lock().unwrap();
31//! while !*ready {
32//! ready = cnd.wait(&mu, ready);
33//! }
34//! }
35//! ```
36//!
37//! ## Implementation
38//!
39//! An internal wait queue (`waitq`) stores raw `*mut G` pointers of parked
40//! goroutines. `notify_one` / `notify_all` call `goready` on them.
41//!
42//! ### Commit-park protocol
43//!
44//! `waitq` is protected by a [`RawMutex`] (not `std::sync::Mutex`) so that
45//! `wait` can hold the queue lock ACROSS the park via
46//! [`gopark_commit`][crate::runtime::park::gopark_commit] — the lock is
47//! released on g0 only after the waiter has reached `GWAITING`. Releasing
48//! the queue lock *before* `gopark` (the original design) left a lost-wakeup
49//! window: an async preemption (SIGURG) landing between the unlock and the
50//! park made the registered waiter `GRUNNABLE`; a concurrent
51//! `notify_one`/`notify_all` then popped the waiter and called `goready`,
52//! which saw a non-`GWAITING` status and dropped the wake on the floor — the
53//! waiter parked forever. Holding `waitq`'s lock until `park_fn` has
54//! committed the goroutine to `GWAITING` closes the window: no notifier can
55//! pop the waiter (the pop needs the queue lock) until the park is committed.
56
57use std::collections::VecDeque;
58use std::cell::UnsafeCell;
59use std::sync::Mutex; // the user's external lock passed to `wait`
60
61use crate::runtime::g::{current_g, WaitReason};
62use crate::runtime::park::{gopark_commit, goready};
63use crate::runtime::g::G;
64use crate::runtime::rawmutex::RawMutex;
65
66// ---------------------------------------------------------------------------
67// Cond
68// ---------------------------------------------------------------------------
69
70/// A goroutine-aware condition variable.
71///
72/// Like `std::sync::Condvar` but parks goroutines instead of OS threads,
73/// so other goroutines sharing the same M continue to be scheduled while
74/// waiters sleep.
75///
76/// Must be used from within a `#[go_lib::main]` context.
77pub struct Cond {
78 /// Spinlock protecting `waitq`. A `RawMutex` (not `std::sync::Mutex`) so
79 /// the goroutine `wait()` path can hold the queue lock ACROSS the park via
80 /// `gopark_commit` — released on g0 only after the waiter is `GWAITING`
81 /// (see the module-level "Commit-park protocol" note).
82 mu: RawMutex,
83 /// Queue of goroutines waiting on this condition — always accessed under
84 /// `mu`.
85 waitq: UnsafeCell<VecDeque<*mut G>>,
86}
87
88// SAFETY: *mut G pointers are only read under `mu` and are valid for the
89// lifetime of the process (goroutines are never freed, only recycled).
90unsafe impl Send for Cond {}
91unsafe impl Sync for Cond {}
92
93impl Cond {
94 /// Create a new `Cond`.
95 pub fn new() -> Self {
96 Self { mu: RawMutex::new(), waitq: UnsafeCell::new(VecDeque::new()) }
97 }
98
99 /// Release `guard`, park the current goroutine until notified, then
100 /// re-acquire the mutex and return the new guard.
101 ///
102 /// Spurious wakeups are possible; always re-check the predicate in a loop:
103 ///
104 /// ```no_run
105 /// # use std::sync::{Arc, Mutex};
106 /// # use go_lib::sync::Cond;
107 /// # let mu = Arc::new(Mutex::new(false));
108 /// # let cnd = Arc::new(Cond::new());
109 /// let mut guard = mu.lock().unwrap();
110 /// while !*guard {
111 /// guard = cnd.wait(&mu, guard);
112 /// }
113 /// ```
114 ///
115 /// # Panics
116 ///
117 /// Panics if called from outside a goroutine context (i.e. before `run()`).
118 pub fn wait<'a, T>(
119 &self,
120 mu: &'a Mutex<T>,
121 guard: std::sync::MutexGuard<'a, T>,
122 ) -> std::sync::MutexGuard<'a, T> {
123 let gp = current_g();
124 assert!(!gp.is_null(), "Cond::wait called outside a goroutine context");
125
126 // m_lock suppresses async preemption while we hold `mu` AND across the
127 // `gopark_commit` below — the increment is transferred to `park_fn`,
128 // which balances it on the same M (see gopark_commit's contract).
129 let _lk = crate::runtime::m::m_lock();
130 self.mu.lock();
131
132 // Enqueue ourselves under `mu` so that any concurrent
133 // notify_one / notify_all sees us in the queue. The queue lock stays
134 // held until park_fn has committed this goroutine to GWAITING
135 // (commit-park protocol), so a notifier cannot pop us and call
136 // goready until the park is guaranteed to find us parked.
137 // SAFETY: `mu` is held.
138 unsafe { (*self.waitq.get()).push_back(gp) };
139
140 // Release the user's mutex *before* parking (Go semantics). The
141 // rendezvous with notifiers is protected by `mu`, which is held across
142 // the park, so releasing the user lock here cannot lose a wakeup.
143 drop(guard);
144
145 // Transfer the m.locks increment to park_fn, then park. The queue
146 // lock (`mu`) is released on g0 by `unlock_cond_mutex` only after this
147 // goroutine is GWAITING.
148 std::mem::forget(_lk);
149 unsafe {
150 gopark_commit(
151 WaitReason::CondVar,
152 unlock_cond_mutex,
153 &self.mu as *const RawMutex as *mut u8,
154 );
155 }
156
157 // Woken — re-acquire the user's mutex.
158 mu.lock().unwrap()
159 }
160
161 /// Wake one waiting goroutine. No-op if there are no waiters.
162 pub fn notify_one(&self) {
163 // m_lock suppresses async preemption across the `mu` critical section
164 // (same rationale as WaitGroup::add); without it a SIGURG between
165 // acquiring and releasing the spinlock could deschedule us while the
166 // lock is held, deadlocking the next goroutine on this M.
167 let _lk = crate::runtime::m::m_lock();
168 // A Cond waiter is woken purely via `goready`, which touches only the
169 // `G` descriptor — never the waiter's stack.
170 self.mu.lock();
171 // SAFETY: `mu` is held.
172 let gp = unsafe { (*self.waitq.get()).pop_front() };
173 unsafe { self.mu.unlock() };
174 // Wake outside the lock so we don't hold the spinlock across the
175 // goready spin (which waits for GRUNNING → GWAITING).
176 if let Some(gp) = gp {
177 // SAFETY: gp is a valid goroutine pointer (see module safety comment).
178 unsafe { goready(gp) };
179 }
180 }
181
182 /// Wake all waiting goroutines.
183 pub fn notify_all(&self) {
184 let _lk = crate::runtime::m::m_lock();
185 // See `notify_one`: waiters are woken only via `goready`, which never
186 // touches a waiter's stack.
187 self.mu.lock();
188 // SAFETY: `mu` is held.
189 let waiters: Vec<*mut G> = unsafe { (*self.waitq.get()).drain(..).collect() };
190 unsafe { self.mu.unlock() };
191 // Wake outside the lock (see notify_one).
192 for gp in waiters {
193 // SAFETY: same as notify_one.
194 unsafe { goready(gp) };
195 }
196 }
197
198}
199
200impl Default for Cond {
201 fn default() -> Self { Self::new() }
202}
203
204/// `gopark_commit` unlock shim: release the `Cond`'s `RawMutex` from g0 after
205/// the parking goroutine has reached `GWAITING`.
206///
207/// # Safety
208/// `arg` must be the `&RawMutex` of a `Cond` whose queue lock is held by the
209/// parking goroutine.
210unsafe fn unlock_cond_mutex(arg: *mut u8) {
211 unsafe { (*(arg as *const RawMutex)).unlock() }
212}
213
214// ---------------------------------------------------------------------------
215// Tests
216// ---------------------------------------------------------------------------
217
218#[cfg(all(test, not(loom)))]
219mod tests {
220 use super::*;
221 use std::sync::{Arc, Mutex};
222 use std::sync::atomic::{AtomicI32, Ordering};
223
224 /// A single waiter is woken by notify_one.
225 #[test]
226 #[go_lib::main]
227 fn single_waiter_notify_one() {
228 let mu = Arc::new(Mutex::new(false));
229 let cnd = Arc::new(Cond::new());
230 let woke = Arc::new(AtomicI32::new(0));
231
232 let mu2 = Arc::clone(&mu);
233 let cnd2 = Arc::clone(&cnd);
234 let woke2 = Arc::clone(&woke);
235 let woke3 = Arc::clone(&woke);
236
237 // Spawn a waiter goroutine.
238 crate::runtime::sched::spawn_goroutine(move || {
239 let mut guard = mu2.lock().unwrap();
240 while !*guard {
241 guard = cnd2.wait(&mu2, guard);
242 }
243 woke2.fetch_add(1, Ordering::Relaxed);
244 });
245
246 // Yield so the waiter parks, then signal.
247 for _ in 0..20 { crate::gosched(); }
248 {
249 let mut g = mu.lock().unwrap();
250 *g = true;
251 }
252 cnd.notify_one();
253
254 // Spin-wait for the waiter to record the wakeup.
255 let deadline = std::time::Instant::now()
256 + std::time::Duration::from_millis(500);
257 loop {
258 if woke3.load(Ordering::Acquire) == 1 { break; }
259 assert!(std::time::Instant::now() < deadline, "waiter did not wake");
260 crate::gosched();
261 }
262
263 assert_eq!(woke.load(Ordering::Acquire), 1);
264 }
265
266 /// All waiters are woken by notify_all.
267 #[test]
268 #[go_lib::main]
269 fn multiple_waiters_notify_all() {
270 const N: i32 = 4;
271 let mu = Arc::new(Mutex::new(false));
272 let cnd = Arc::new(Cond::new());
273 let woke = Arc::new(AtomicI32::new(0));
274
275 for _ in 0..N {
276 let mu2 = Arc::clone(&mu);
277 let cnd2 = Arc::clone(&cnd);
278 let woke2 = Arc::clone(&woke);
279 crate::runtime::sched::spawn_goroutine(move || {
280 let mut guard = mu2.lock().unwrap();
281 while !*guard {
282 guard = cnd2.wait(&mu2, guard);
283 }
284 woke2.fetch_add(1, Ordering::Relaxed);
285 });
286 }
287
288 for _ in 0..40 { crate::gosched(); }
289 *mu.lock().unwrap() = true;
290 cnd.notify_all();
291
292 let deadline = std::time::Instant::now()
293 + std::time::Duration::from_millis(500);
294 loop {
295 if woke.load(Ordering::Acquire) == N { break; }
296 assert!(std::time::Instant::now() < deadline,
297 "not all waiters woke: {}/{N}", woke.load(Ordering::Relaxed));
298 crate::gosched();
299 }
300
301 assert_eq!(woke.load(Ordering::Acquire), N);
302 }
303
304 /// notify_one with no waiters is a no-op (must not panic or block).
305 #[test]
306 fn notify_one_no_waiters() {
307 let cnd = Cond::new();
308 cnd.notify_one(); // must return immediately
309 }
310
311 /// notify_all with no waiters is a no-op.
312 #[test]
313 fn notify_all_no_waiters() {
314 let cnd = Cond::new();
315 cnd.notify_all();
316 }
317}