use std::collections::VecDeque;
use std::sync::Mutex;
use crate::runtime::g::{current_g, WaitReason};
use crate::runtime::park::{gopark, goready};
use crate::runtime::g::G;
/// Condition variable for runtime goroutines.
///
/// Waiting goroutines are recorded as raw `*mut G` pointers. `wait` appends to
/// the back of the queue and `notify_one` removes from the front, so waiters
/// are woken in the order they started waiting.
pub struct Cond {
// Goroutines currently blocked in `wait`, oldest at the front. The queue has
// its own mutex, separate from the mutex callers pass to `wait`.
waitq: Mutex<VecDeque<*mut G>>,
}
// `Cond` stores `*mut G`, and raw pointers are neither `Send` nor `Sync`, so
// the auto traits are not derived and are asserted by hand here.
// SAFETY / NOTE(review): in the code visible in this file, the queued pointers
// are only inserted and removed while `waitq` is locked, and are only ever
// passed to `goready`. Soundness additionally assumes a `G` may be readied
// from any thread -- confirm against the runtime's contract for `goready`.
unsafe impl Send for Cond {}
unsafe impl Sync for Cond {}
impl Cond {
pub fn new() -> Self {
Self { waitq: Mutex::new(VecDeque::new()) }
}
pub fn wait<'a, T>(
&self,
mu: &'a Mutex<T>,
guard: std::sync::MutexGuard<'a, T>,
) -> std::sync::MutexGuard<'a, T> {
let gp = current_g();
assert!(!gp.is_null(), "Cond::wait called outside a goroutine context");
self.waitq.lock().unwrap().push_back(gp);
drop(guard);
unsafe { gopark(WaitReason::CondVar) };
mu.lock().unwrap()
}
pub fn notify_one(&self) {
let gp = self.waitq.lock().unwrap().pop_front();
if let Some(gp) = gp {
unsafe { goready(gp) };
}
}
pub fn notify_all(&self) {
let waiters: Vec<*mut G> = self.waitq.lock().unwrap().drain(..).collect();
for gp in waiters {
unsafe { goready(gp) };
}
}
}
impl Default for Cond {
fn default() -> Self { Self::new() }
}
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::run_impl;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    /// Spawns a goroutine that waits on `cnd` until the flag behind `mu` is
    /// `true`, then increments `woke` by one. The guard on `mu` is held until
    /// the goroutine's closure returns.
    fn spawn_waiter(mu: Arc<Mutex<bool>>, cnd: Arc<Cond>, woke: Arc<AtomicI32>) {
        unsafe {
            crate::runtime::sched::spawn_goroutine(move || {
                let mut ready = mu.lock().unwrap();
                while !*ready {
                    ready = cnd.wait(&mu, ready);
                }
                woke.fetch_add(1, Ordering::Relaxed);
            });
        }
    }

    /// One waiter, one `notify_one`: the waiter must observe the flag and
    /// bump the counter within the 500 ms deadline.
    #[test]
    fn single_waiter_notify_one() {
        let mu = Arc::new(Mutex::new(false));
        let cnd = Arc::new(Cond::new());
        let woke = Arc::new(AtomicI32::new(0));

        let woke_in = Arc::clone(&woke);
        run_impl(move || {
            spawn_waiter(Arc::clone(&mu), Arc::clone(&cnd), Arc::clone(&woke_in));

            // Yield repeatedly before flipping the flag.
            for _ in 0..20 {
                crate::gosched();
            }

            *mu.lock().unwrap() = true;
            cnd.notify_one();

            let deadline = Instant::now() + Duration::from_millis(500);
            while woke_in.load(Ordering::Acquire) != 1 {
                assert!(Instant::now() < deadline, "waiter did not wake");
                crate::gosched();
            }
        });

        assert_eq!(woke.load(Ordering::Acquire), 1);
    }

    /// `N` waiters, one `notify_all`: every waiter must bump the counter
    /// within the 500 ms deadline.
    #[test]
    fn multiple_waiters_notify_all() {
        const N: i32 = 4;
        let mu = Arc::new(Mutex::new(false));
        let cnd = Arc::new(Cond::new());
        let woke = Arc::new(AtomicI32::new(0));

        let mu_in = Arc::clone(&mu);
        let cnd_in = Arc::clone(&cnd);
        let woke_in = Arc::clone(&woke);
        run_impl(move || {
            for _ in 0..N {
                spawn_waiter(
                    Arc::clone(&mu_in),
                    Arc::clone(&cnd_in),
                    Arc::clone(&woke_in),
                );
            }

            // Yield repeatedly before flipping the flag.
            for _ in 0..40 {
                crate::gosched();
            }

            {
                let mut flag = mu_in.lock().unwrap();
                *flag = true;
            }
            cnd_in.notify_all();

            let deadline = Instant::now() + Duration::from_millis(500);
            while woke_in.load(Ordering::Acquire) != N {
                assert!(
                    Instant::now() < deadline,
                    "not all waiters woke: {}/{N}",
                    woke_in.load(Ordering::Relaxed)
                );
                crate::gosched();
            }
        });

        assert_eq!(woke.load(Ordering::Acquire), N);
    }

    /// `notify_one` on an empty queue must return without panicking.
    #[test]
    fn notify_one_no_waiters() {
        let cnd = Cond::new();
        cnd.notify_one();
    }

    /// `notify_all` on an empty queue must return without panicking.
    #[test]
    fn notify_all_no_waiters() {
        let cnd = Cond::new();
        cnd.notify_all();
    }
}