go_lib/sync/cond.rs
1// SPDX-License-Identifier: Apache-2.0
2//! `Cond` — goroutine-aware condition variable.
3//!
4//! Mirrors Go's `sync.Cond`: goroutines waiting on a `Cond` are parked via
5//! the scheduler (`gopark`) rather than blocking an OS thread, so other
6//! goroutines sharing that M can continue to run.
7//!
8//! ## Usage
9//!
10//! ```no_run
11//! use std::sync::{Arc, Mutex};
12//! use go_lib::sync::Cond;
13//!
14//! let mu = Arc::new(Mutex::new(false));
15//! let cnd = Arc::new(Cond::new());
16//!
17//! // Producer goroutine
18//! let mu2 = Arc::clone(&mu);
19//! let cnd2 = Arc::clone(&cnd);
20//! go_lib::run(move || {
21//! let mut ready = mu2.lock().unwrap();
22//! *ready = true;
23//! drop(ready);
24//! cnd2.notify_one();
25//! });
26//!
27//! // Consumer
28//! go_lib::run(move || {
29//! let mut ready = mu.lock().unwrap();
30//! while !*ready {
31//! ready = cnd.wait(&mu, ready);
32//! }
33//! });
34//! ```
35//!
36//! ## Implementation
37//!
38//! An internal wait queue (`waitq`) stores raw `*mut G` pointers of parked
39//! goroutines. `notify_one` / `notify_all` call `goready` on them.
40//!
41//! The GRUNNING → GWAITING race is handled by `goready`'s built-in spin loop:
42//! even if a notifier calls `goready` between the moment a goroutine pushes
43//! itself onto `waitq` and the moment `gopark` actually sets `GWAITING`, the
44//! spin safely serialises the two.
45
46use std::collections::VecDeque;
47use std::sync::Mutex;
48
49use crate::runtime::g::{current_g, WaitReason};
50use crate::runtime::park::{gopark, goready};
51use crate::runtime::g::G;
52
53// ---------------------------------------------------------------------------
54// Cond
55// ---------------------------------------------------------------------------
56
57/// A goroutine-aware condition variable.
58///
59/// Like `std::sync::Condvar` but parks goroutines instead of OS threads,
60/// so other goroutines sharing the same M continue to be scheduled while
61/// waiters sleep.
62///
63/// Must be used from within a [`go_lib::run`] context.
64pub struct Cond {
65 /// Queue of goroutines waiting on this condition.
66 waitq: Mutex<VecDeque<*mut G>>,
67}
68
69// SAFETY: *mut G pointers are only read under the waitq Mutex and are valid
70// for the lifetime of the process (goroutines are never freed, only recycled).
71unsafe impl Send for Cond {}
72unsafe impl Sync for Cond {}
73
74impl Cond {
75 /// Create a new `Cond`.
76 pub fn new() -> Self {
77 Self { waitq: Mutex::new(VecDeque::new()) }
78 }
79
80 /// Release `guard`, park the current goroutine until notified, then
81 /// re-acquire the mutex and return the new guard.
82 ///
83 /// Spurious wakeups are possible; always re-check the predicate in a loop:
84 ///
85 /// ```no_run
86 /// # use std::sync::{Arc, Mutex};
87 /// # use go_lib::sync::Cond;
88 /// # let mu = Arc::new(Mutex::new(false));
89 /// # let cnd = Arc::new(Cond::new());
90 /// let mut guard = mu.lock().unwrap();
91 /// while !*guard {
92 /// guard = cnd.wait(&mu, guard);
93 /// }
94 /// ```
95 ///
96 /// # Panics
97 ///
98 /// Panics if called from outside a goroutine context (i.e. before `run()`).
99 pub fn wait<'a, T>(
100 &self,
101 mu: &'a Mutex<T>,
102 guard: std::sync::MutexGuard<'a, T>,
103 ) -> std::sync::MutexGuard<'a, T> {
104 let gp = current_g();
105 assert!(!gp.is_null(), "Cond::wait called outside a goroutine context");
106
107 // Enqueue ourselves before releasing the mutex so that any concurrent
108 // notify_one / notify_all sees us in the queue.
109 self.waitq.lock().unwrap().push_back(gp);
110
111 // Release the user's mutex. After this point a notifier may call
112 // goready(gp); goready's spin loop handles the GRUNNING→GWAITING race.
113 drop(guard);
114
115 // Park until goready transitions us back to GRUNNABLE.
116 gopark(WaitReason::CondVar);
117
118 // Woken — re-acquire the user's mutex.
119 mu.lock().unwrap()
120 }
121
122 /// Wake one waiting goroutine. No-op if there are no waiters.
123 pub fn notify_one(&self) {
124 let gp = self.waitq.lock().unwrap().pop_front();
125 if let Some(gp) = gp {
126 // SAFETY: gp is a valid goroutine pointer (see module safety comment).
127 unsafe { goready(gp) };
128 }
129 }
130
131 /// Wake all waiting goroutines.
132 pub fn notify_all(&self) {
133 let waiters: Vec<*mut G> = self.waitq.lock().unwrap().drain(..).collect();
134 for gp in waiters {
135 // SAFETY: same as notify_one.
136 unsafe { goready(gp) };
137 }
138 }
139}
140
141impl Default for Cond {
142 fn default() -> Self { Self::new() }
143}
144
145// ---------------------------------------------------------------------------
146// Tests
147// ---------------------------------------------------------------------------
148
149#[cfg(all(test, not(loom)))]
150mod tests {
151 use super::*;
152 use crate::runtime::sched::run_impl;
153 use std::sync::{Arc, Mutex};
154 use std::sync::atomic::{AtomicI32, Ordering};
155
156 /// A single waiter is woken by notify_one.
157 #[test]
158 fn single_waiter_notify_one() {
159 let mu = Arc::new(Mutex::new(false));
160 let cnd = Arc::new(Cond::new());
161 let woke = Arc::new(AtomicI32::new(0));
162
163 let mu2 = Arc::clone(&mu);
164 let cnd2 = Arc::clone(&cnd);
165 let woke2 = Arc::clone(&woke);
166 let woke3 = Arc::clone(&woke);
167
168 run_impl(move || {
169 // Spawn a waiter goroutine.
170 crate::runtime::sched::spawn_goroutine(move || {
171 let mut guard = mu2.lock().unwrap();
172 while !*guard {
173 guard = cnd2.wait(&mu2, guard);
174 }
175 woke2.fetch_add(1, Ordering::Relaxed);
176 });
177
178 // Yield so the waiter parks, then signal.
179 for _ in 0..20 { crate::gosched(); }
180 {
181 let mut g = mu.lock().unwrap();
182 *g = true;
183 }
184 cnd.notify_one();
185
186 // Spin-wait for the waiter to record the wakeup.
187 let deadline = std::time::Instant::now()
188 + std::time::Duration::from_millis(500);
189 loop {
190 if woke3.load(Ordering::Acquire) == 1 { break; }
191 assert!(std::time::Instant::now() < deadline, "waiter did not wake");
192 crate::gosched();
193 }
194 });
195
196 assert_eq!(woke.load(Ordering::Acquire), 1);
197 }
198
199 /// All waiters are woken by notify_all.
200 #[test]
201 fn multiple_waiters_notify_all() {
202 const N: i32 = 4;
203 let mu = Arc::new(Mutex::new(false));
204 let cnd = Arc::new(Cond::new());
205 let woke = Arc::new(AtomicI32::new(0));
206
207 run_impl({
208 let mu = Arc::clone(&mu);
209 let cnd = Arc::clone(&cnd);
210 let woke = Arc::clone(&woke);
211 move || {
212 for _ in 0..N {
213 let mu2 = Arc::clone(&mu);
214 let cnd2 = Arc::clone(&cnd);
215 let woke2 = Arc::clone(&woke);
216 crate::runtime::sched::spawn_goroutine(move || {
217 let mut guard = mu2.lock().unwrap();
218 while !*guard {
219 guard = cnd2.wait(&mu2, guard);
220 }
221 woke2.fetch_add(1, Ordering::Relaxed);
222 });
223 }
224
225 for _ in 0..40 { crate::gosched(); }
226 *mu.lock().unwrap() = true;
227 cnd.notify_all();
228
229 let deadline = std::time::Instant::now()
230 + std::time::Duration::from_millis(500);
231 loop {
232 if woke.load(Ordering::Acquire) == N { break; }
233 assert!(std::time::Instant::now() < deadline,
234 "not all waiters woke: {}/{N}", woke.load(Ordering::Relaxed));
235 crate::gosched();
236 }
237 }
238 });
239
240 assert_eq!(woke.load(Ordering::Acquire), N);
241 }
242
243 /// notify_one with no waiters is a no-op (must not panic or block).
244 #[test]
245 fn notify_one_no_waiters() {
246 let cnd = Cond::new();
247 cnd.notify_one(); // must return immediately
248 }
249
250 /// notify_all with no waiters is a no-op.
251 #[test]
252 fn notify_all_no_waiters() {
253 let cnd = Cond::new();
254 cnd.notify_all();
255 }
256}