Skip to main content

go_lib/sync/
cond.rs

1// SPDX-License-Identifier: Apache-2.0
2//! `Cond` — goroutine-aware condition variable.
3//!
4//! Mirrors Go's `sync.Cond`: goroutines waiting on a `Cond` are parked via
5//! the scheduler (`gopark`) rather than blocking an OS thread, so other
6//! goroutines sharing that M can continue to run.
7//!
8//! ## Usage
9//!
10//! ```no_run
11//! use std::sync::{Arc, Mutex};
12//! use go_lib::sync::Cond;
13//!
14//! let mu  = Arc::new(Mutex::new(false));
15//! let cnd = Arc::new(Cond::new());
16//!
17//! // Producer goroutine
18//! let mu2  = Arc::clone(&mu);
19//! let cnd2 = Arc::clone(&cnd);
20//! go_lib::run(move || {
21//!     let mut ready = mu2.lock().unwrap();
22//!     *ready = true;
23//!     drop(ready);
24//!     cnd2.notify_one();
25//! });
26//!
27//! // Consumer
28//! go_lib::run(move || {
29//!     let mut ready = mu.lock().unwrap();
30//!     while !*ready {
31//!         ready = cnd.wait(&mu, ready);
32//!     }
33//! });
34//! ```
35//!
36//! ## Implementation
37//!
38//! An internal wait queue (`waitq`) stores raw `*mut G` pointers of parked
39//! goroutines.  `notify_one` / `notify_all` call `goready` on them.
40//!
41//! The GRUNNING → GWAITING race is handled by `goready`'s built-in spin loop:
42//! even if a notifier calls `goready` between the moment a goroutine pushes
43//! itself onto `waitq` and the moment `gopark` actually sets `GWAITING`, the
44//! spin safely serialises the two.
45
46use std::collections::VecDeque;
47use std::sync::Mutex;
48
49use crate::runtime::g::{current_g, WaitReason};
50use crate::runtime::park::{gopark, goready};
51use crate::runtime::g::G;
52
53// ---------------------------------------------------------------------------
54// Cond
55// ---------------------------------------------------------------------------
56
57/// A goroutine-aware condition variable.
58///
59/// Like `std::sync::Condvar` but parks goroutines instead of OS threads,
60/// so other goroutines sharing the same M continue to be scheduled while
61/// waiters sleep.
62///
63/// Must be used from within a [`go_lib::run`] context.
64pub struct Cond {
65    /// Queue of goroutines waiting on this condition.
66    waitq: Mutex<VecDeque<*mut G>>,
67}
68
69// SAFETY: *mut G pointers are only read under the waitq Mutex and are valid
70// for the lifetime of the process (goroutines are never freed, only recycled).
71unsafe impl Send for Cond {}
72unsafe impl Sync for Cond {}
73
74impl Cond {
75    /// Create a new `Cond`.
76    pub fn new() -> Self {
77        Self { waitq: Mutex::new(VecDeque::new()) }
78    }
79
80    /// Release `guard`, park the current goroutine until notified, then
81    /// re-acquire the mutex and return the new guard.
82    ///
83    /// Spurious wakeups are possible; always re-check the predicate in a loop:
84    ///
85    /// ```no_run
86    /// # use std::sync::{Arc, Mutex};
87    /// # use go_lib::sync::Cond;
88    /// # let mu = Arc::new(Mutex::new(false));
89    /// # let cnd = Arc::new(Cond::new());
90    /// let mut guard = mu.lock().unwrap();
91    /// while !*guard {
92    ///     guard = cnd.wait(&mu, guard);
93    /// }
94    /// ```
95    ///
96    /// # Panics
97    ///
98    /// Panics if called from outside a goroutine context (i.e. before `run()`).
99    pub fn wait<'a, T>(
100        &self,
101        mu:   &'a Mutex<T>,
102        guard: std::sync::MutexGuard<'a, T>,
103    ) -> std::sync::MutexGuard<'a, T> {
104        let gp = current_g();
105        assert!(!gp.is_null(), "Cond::wait called outside a goroutine context");
106
107        // Enqueue ourselves before releasing the mutex so that any concurrent
108        // notify_one / notify_all sees us in the queue.
109        self.waitq.lock().unwrap().push_back(gp);
110
111        // Release the user's mutex.  After this point a notifier may call
112        // goready(gp); goready's spin loop handles the GRUNNING→GWAITING race.
113        drop(guard);
114
115        // Park until goready transitions us back to GRUNNABLE.
116        gopark(WaitReason::CondVar);
117
118        // Woken — re-acquire the user's mutex.
119        mu.lock().unwrap()
120    }
121
122    /// Wake one waiting goroutine.  No-op if there are no waiters.
123    pub fn notify_one(&self) {
124        let gp = self.waitq.lock().unwrap().pop_front();
125        if let Some(gp) = gp {
126            // SAFETY: gp is a valid goroutine pointer (see module safety comment).
127            unsafe { goready(gp) };
128        }
129    }
130
131    /// Wake all waiting goroutines.
132    pub fn notify_all(&self) {
133        let waiters: Vec<*mut G> = self.waitq.lock().unwrap().drain(..).collect();
134        for gp in waiters {
135            // SAFETY: same as notify_one.
136            unsafe { goready(gp) };
137        }
138    }
139}
140
141impl Default for Cond {
142    fn default() -> Self { Self::new() }
143}
144
145// ---------------------------------------------------------------------------
146// Tests
147// ---------------------------------------------------------------------------
148
149#[cfg(all(test, not(loom)))]
150mod tests {
151    use super::*;
152    use crate::runtime::sched::run_impl;
153    use std::sync::{Arc, Mutex};
154    use std::sync::atomic::{AtomicI32, Ordering};
155
156    /// A single waiter is woken by notify_one.
157    #[test]
158    fn single_waiter_notify_one() {
159        let mu   = Arc::new(Mutex::new(false));
160        let cnd  = Arc::new(Cond::new());
161        let woke = Arc::new(AtomicI32::new(0));
162
163        let mu2   = Arc::clone(&mu);
164        let cnd2  = Arc::clone(&cnd);
165        let woke2 = Arc::clone(&woke);
166        let woke3 = Arc::clone(&woke);
167
168        run_impl(move || {
169            // Spawn a waiter goroutine.
170            crate::runtime::sched::spawn_goroutine(move || {
171                let mut guard = mu2.lock().unwrap();
172                while !*guard {
173                    guard = cnd2.wait(&mu2, guard);
174                }
175                woke2.fetch_add(1, Ordering::Relaxed);
176            });
177
178            // Yield so the waiter parks, then signal.
179            for _ in 0..20 { crate::gosched(); }
180            {
181                let mut g = mu.lock().unwrap();
182                *g = true;
183            }
184            cnd.notify_one();
185
186            // Spin-wait for the waiter to record the wakeup.
187            let deadline = std::time::Instant::now()
188                + std::time::Duration::from_millis(500);
189            loop {
190                if woke3.load(Ordering::Acquire) == 1 { break; }
191                assert!(std::time::Instant::now() < deadline, "waiter did not wake");
192                crate::gosched();
193            }
194        });
195
196        assert_eq!(woke.load(Ordering::Acquire), 1);
197    }
198
199    /// All waiters are woken by notify_all.
200    #[test]
201    fn multiple_waiters_notify_all() {
202        const N: i32 = 4;
203        let mu   = Arc::new(Mutex::new(false));
204        let cnd  = Arc::new(Cond::new());
205        let woke = Arc::new(AtomicI32::new(0));
206
207        run_impl({
208            let mu   = Arc::clone(&mu);
209            let cnd  = Arc::clone(&cnd);
210            let woke = Arc::clone(&woke);
211            move || {
212                for _ in 0..N {
213                    let mu2   = Arc::clone(&mu);
214                    let cnd2  = Arc::clone(&cnd);
215                    let woke2 = Arc::clone(&woke);
216                    crate::runtime::sched::spawn_goroutine(move || {
217                        let mut guard = mu2.lock().unwrap();
218                        while !*guard {
219                            guard = cnd2.wait(&mu2, guard);
220                        }
221                        woke2.fetch_add(1, Ordering::Relaxed);
222                    });
223                }
224
225                for _ in 0..40 { crate::gosched(); }
226                *mu.lock().unwrap() = true;
227                cnd.notify_all();
228
229                let deadline = std::time::Instant::now()
230                    + std::time::Duration::from_millis(500);
231                loop {
232                    if woke.load(Ordering::Acquire) == N { break; }
233                    assert!(std::time::Instant::now() < deadline,
234                        "not all waiters woke: {}/{N}", woke.load(Ordering::Relaxed));
235                    crate::gosched();
236                }
237            }
238        });
239
240        assert_eq!(woke.load(Ordering::Acquire), N);
241    }
242
243    /// notify_one with no waiters is a no-op (must not panic or block).
244    #[test]
245    fn notify_one_no_waiters() {
246        let cnd = Cond::new();
247        cnd.notify_one(); // must return immediately
248    }
249
250    /// notify_all with no waiters is a no-op.
251    #[test]
252    fn notify_all_no_waiters() {
253        let cnd = Cond::new();
254        cnd.notify_all();
255    }
256}