go_lib/sync/cond.rs
1// SPDX-License-Identifier: Apache-2.0
2//! `Cond` — goroutine-aware condition variable.
3//!
4//! Mirrors Go's `sync.Cond`: goroutines waiting on a `Cond` are parked via
5//! the scheduler (`gopark`) rather than blocking an OS thread, so other
6//! goroutines sharing that M can continue to run.
7//!
8//! ## Usage
9//!
10//! ```no_run
11//! use std::sync::{Arc, Mutex};
12//! use go_lib::sync::Cond;
13//!
14//! let mu = Arc::new(Mutex::new(false));
15//! let cnd = Arc::new(Cond::new());
16//!
17//! // Producer goroutine
18//! let mu2 = Arc::clone(&mu);
19//! let cnd2 = Arc::clone(&cnd);
20//! go_lib::run(move || {
21//! let mut ready = mu2.lock().unwrap();
22//! *ready = true;
23//! drop(ready);
24//! cnd2.notify_one();
25//! });
26//!
27//! // Consumer
28//! go_lib::run(move || {
29//! let mut ready = mu.lock().unwrap();
30//! while !*ready {
31//! ready = cnd.wait(&mu, ready);
32//! }
33//! });
34//! ```
35//!
36//! ## Implementation
37//!
38//! An internal wait queue (`waitq`) stores raw `*mut G` pointers of parked
39//! goroutines. `notify_one` / `notify_all` call `goready` on them.
40//!
41//! The GRUNNING → GWAITING race is handled by `goready`'s built-in spin loop:
42//! even if a notifier calls `goready` between the moment a goroutine pushes
43//! itself onto `waitq` and the moment `gopark` actually sets `GWAITING`, the
44//! spin safely serialises the two.
45
46use std::collections::VecDeque;
47use std::sync::Mutex;
48
49use crate::runtime::g::{current_g, WaitReason};
50use crate::runtime::park::{gopark, goready};
51use crate::runtime::g::G;
52
53// ---------------------------------------------------------------------------
54// Cond
55// ---------------------------------------------------------------------------
56
57/// A goroutine-aware condition variable.
58///
59/// Like `std::sync::Condvar` but parks goroutines instead of OS threads,
60/// so other goroutines sharing the same M continue to be scheduled while
61/// waiters sleep.
62///
63/// Must be used from within a [`go_lib::run`] context.
64pub struct Cond {
65 /// Queue of goroutines waiting on this condition.
66 waitq: Mutex<VecDeque<*mut G>>,
67}
68
69// SAFETY: *mut G pointers are only read under the waitq Mutex and are valid
70// for the lifetime of the process (goroutines are never freed, only recycled).
71unsafe impl Send for Cond {}
72unsafe impl Sync for Cond {}
73
74impl Cond {
75 /// Create a new `Cond`.
76 pub fn new() -> Self {
77 Self { waitq: Mutex::new(VecDeque::new()) }
78 }
79
80 /// Release `guard`, park the current goroutine until notified, then
81 /// re-acquire the mutex and return the new guard.
82 ///
83 /// Spurious wakeups are possible; always re-check the predicate in a loop:
84 ///
85 /// ```no_run
86 /// # use std::sync::{Arc, Mutex};
87 /// # use go_lib::sync::Cond;
88 /// # let mu = Arc::new(Mutex::new(false));
89 /// # let cnd = Arc::new(Cond::new());
90 /// let mut guard = mu.lock().unwrap();
91 /// while !*guard {
92 /// guard = cnd.wait(&mu, guard);
93 /// }
94 /// ```
95 ///
96 /// # Panics
97 ///
98 /// Panics if called from outside a goroutine context (i.e. before `run()`).
99 pub fn wait<'a, T>(
100 &self,
101 mu: &'a Mutex<T>,
102 guard: std::sync::MutexGuard<'a, T>,
103 ) -> std::sync::MutexGuard<'a, T> {
104 let gp = current_g();
105 assert!(!gp.is_null(), "Cond::wait called outside a goroutine context");
106
107 // Enqueue ourselves before releasing the mutex so that any concurrent
108 // notify_one / notify_all sees us in the queue.
109 self.waitq.lock().unwrap().push_back(gp);
110
111 // Release the user's mutex. After this point a notifier may call
112 // goready(gp); goready's spin loop handles the GRUNNING→GWAITING race.
113 drop(guard);
114
115 // Park until goready transitions us back to GRUNNABLE.
116 // SAFETY: we are on a goroutine stack (asserted above).
117 unsafe { gopark(WaitReason::CondVar) };
118
119 // Woken — re-acquire the user's mutex.
120 mu.lock().unwrap()
121 }
122
123 /// Wake one waiting goroutine. No-op if there are no waiters.
124 pub fn notify_one(&self) {
125 let gp = self.waitq.lock().unwrap().pop_front();
126 if let Some(gp) = gp {
127 // SAFETY: gp is a valid goroutine pointer (see module safety comment).
128 unsafe { goready(gp) };
129 }
130 }
131
132 /// Wake all waiting goroutines.
133 pub fn notify_all(&self) {
134 let waiters: Vec<*mut G> = self.waitq.lock().unwrap().drain(..).collect();
135 for gp in waiters {
136 // SAFETY: same as notify_one.
137 unsafe { goready(gp) };
138 }
139 }
140}
141
142impl Default for Cond {
143 fn default() -> Self { Self::new() }
144}
145
146// ---------------------------------------------------------------------------
147// Tests
148// ---------------------------------------------------------------------------
149
150#[cfg(all(test, not(loom)))]
151mod tests {
152 use super::*;
153 use crate::runtime::sched::run_impl;
154 use std::sync::{Arc, Mutex};
155 use std::sync::atomic::{AtomicI32, Ordering};
156
157 /// A single waiter is woken by notify_one.
158 #[test]
159 fn single_waiter_notify_one() {
160 let mu = Arc::new(Mutex::new(false));
161 let cnd = Arc::new(Cond::new());
162 let woke = Arc::new(AtomicI32::new(0));
163
164 let mu2 = Arc::clone(&mu);
165 let cnd2 = Arc::clone(&cnd);
166 let woke2 = Arc::clone(&woke);
167 let woke3 = Arc::clone(&woke);
168
169 run_impl(move || {
170 // Spawn a waiter goroutine.
171 unsafe {
172 crate::runtime::sched::spawn_goroutine(move || {
173 let mut guard = mu2.lock().unwrap();
174 while !*guard {
175 guard = cnd2.wait(&mu2, guard);
176 }
177 woke2.fetch_add(1, Ordering::Relaxed);
178 });
179 }
180
181 // Yield so the waiter parks, then signal.
182 for _ in 0..20 { crate::gosched(); }
183 {
184 let mut g = mu.lock().unwrap();
185 *g = true;
186 }
187 cnd.notify_one();
188
189 // Spin-wait for the waiter to record the wakeup.
190 let deadline = std::time::Instant::now()
191 + std::time::Duration::from_millis(500);
192 loop {
193 if woke3.load(Ordering::Acquire) == 1 { break; }
194 assert!(std::time::Instant::now() < deadline, "waiter did not wake");
195 crate::gosched();
196 }
197 });
198
199 assert_eq!(woke.load(Ordering::Acquire), 1);
200 }
201
202 /// All waiters are woken by notify_all.
203 #[test]
204 fn multiple_waiters_notify_all() {
205 const N: i32 = 4;
206 let mu = Arc::new(Mutex::new(false));
207 let cnd = Arc::new(Cond::new());
208 let woke = Arc::new(AtomicI32::new(0));
209
210 run_impl({
211 let mu = Arc::clone(&mu);
212 let cnd = Arc::clone(&cnd);
213 let woke = Arc::clone(&woke);
214 move || {
215 for _ in 0..N {
216 let mu2 = Arc::clone(&mu);
217 let cnd2 = Arc::clone(&cnd);
218 let woke2 = Arc::clone(&woke);
219 unsafe {
220 crate::runtime::sched::spawn_goroutine(move || {
221 let mut guard = mu2.lock().unwrap();
222 while !*guard {
223 guard = cnd2.wait(&mu2, guard);
224 }
225 woke2.fetch_add(1, Ordering::Relaxed);
226 });
227 }
228 }
229
230 for _ in 0..40 { crate::gosched(); }
231 *mu.lock().unwrap() = true;
232 cnd.notify_all();
233
234 let deadline = std::time::Instant::now()
235 + std::time::Duration::from_millis(500);
236 loop {
237 if woke.load(Ordering::Acquire) == N { break; }
238 assert!(std::time::Instant::now() < deadline,
239 "not all waiters woke: {}/{N}", woke.load(Ordering::Relaxed));
240 crate::gosched();
241 }
242 }
243 });
244
245 assert_eq!(woke.load(Ordering::Acquire), N);
246 }
247
248 /// notify_one with no waiters is a no-op (must not panic or block).
249 #[test]
250 fn notify_one_no_waiters() {
251 let cnd = Cond::new();
252 cnd.notify_one(); // must return immediately
253 }
254
255 /// notify_all with no waiters is a no-op.
256 #[test]
257 fn notify_all_no_waiters() {
258 let cnd = Cond::new();
259 cnd.notify_all();
260 }
261}