use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::Arc;
use crate::runtime::g::{current_g, WaitReason};
use crate::runtime::park::{gopark, goready};
use crate::runtime::rawmutex::{LockGuard, RawMutex};
use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog, WaitQ};
/// Mutable part of a channel. In this file it is only reached through
/// `Hchan::lock_state`, i.e. with the channel mutex held.
pub(crate) struct HchanState<T> {
/// Values that have been sent but not yet received; oldest at the front.
pub(crate) buf: VecDeque<T>,
/// Maximum number of values `buf` may hold. `0` is treated as an unbuffered
/// channel (values are handed directly from sender to receiver).
pub(crate) cap: usize,
/// Set to `true` by `closechan`.
pub(crate) closed: bool,
/// Sudogs of senders parked in `chansend`.
pub(crate) sendq: WaitQ,
/// Sudogs of receivers parked in `chanrecv`.
pub(crate) recvq: WaitQ,
}
impl<T> HchanState<T> {
fn new(cap: usize) -> Self {
Self {
buf: VecDeque::with_capacity(cap),
cap,
closed: false,
sendq: WaitQ::new(),
recvq: WaitQ::new(),
}
}
}
/// A channel: the shared state plus the mutex taken around accesses to it.
pub(crate) struct Hchan<T> {
/// Lock acquired by `lock_state` before the state is handed out.
pub(crate) mutex: RawMutex,
/// Channel state, kept in an `UnsafeCell` so it can be mutated through a
/// shared `&Hchan`.
pub(crate) state: UnsafeCell<HchanState<T>>,
}
// SAFETY: `state` sits in an `UnsafeCell`, so `Sync` has to be asserted by
// hand. The functions in this file only touch `state` after taking `mutex`
// via `lock_state`. `T: Send` is required because values move from the
// sending goroutine to the receiving one.
// NOTE(review): soundness also depends on `RawMutex` and `WaitQ` being safe
// to share across threads — confirm against their definitions.
unsafe impl<T: Send> Send for Hchan<T> {}
unsafe impl<T: Send> Sync for Hchan<T> {}
impl<T> Hchan<T> {
pub(crate) fn new(cap: usize) -> Self {
Self {
mutex: RawMutex::new(),
state: UnsafeCell::new(HchanState::new(cap)),
}
}
#[allow(clippy::mut_from_ref)] pub(crate) unsafe fn lock_state(&self) -> (LockGuard<'_>, &mut HchanState<T>) {
let g = LockGuard::new(&self.mutex);
let s = unsafe { &mut *self.state.get() };
(g, s)
}
}
/// Sending handle of a channel created by `chan`. Clones share the same channel.
pub struct Sender<T>(Arc<Hchan<T>>);
/// Receiving handle of a channel created by `chan`. Clones share the same channel.
pub struct Receiver<T>(Arc<Hchan<T>>);
impl<T> Clone for Sender<T> { fn clone(&self) -> Self { Sender(Arc::clone(&self.0)) } }
impl<T> Clone for Receiver<T> { fn clone(&self) -> Self { Receiver(Arc::clone(&self.0)) } }
// SAFETY: both handles are thin wrappers around `Arc<Hchan<T>>`, and
// `Hchan<T>` is declared `Send + Sync` for `T: Send`.
// NOTE(review): given that, `Arc<Hchan<T>>` should already be `Send + Sync`
// through the auto traits, which would make these manual impls redundant —
// confirm before removing them.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
/// Creates a channel and returns its two ends.
///
/// `cap` is the number of values the channel can buffer; `0` gives an
/// unbuffered channel where a send completes only by handing the value to a
/// receiver.
pub fn chan<T: Send + 'static>(cap: usize) -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Hchan::new(cap));
    let tx = Sender(Arc::clone(&shared));
    let rx = Receiver(shared);
    (tx, rx)
}
impl<T: Send + 'static> Sender<T> {
pub fn send(&self, val: T) {
unsafe { chansend(&self.0, val, true) };
}
pub fn try_send(&self, val: T) -> bool {
unsafe { chansend(&self.0, val, false) }
}
pub fn close(&self) {
unsafe { closechan(&self.0) };
}
pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
}
impl<T: Send + 'static> Receiver<T> {
pub fn recv(&self) -> Option<T> {
unsafe { chanrecv(&self.0, true) }
}
pub fn try_recv(&self) -> Option<Option<T>> {
unsafe { chanrecv_nb(&self.0) }
}
pub(crate) fn hchan(&self) -> &Arc<Hchan<T>> { &self.0 }
}
/// Sends `val` on channel `c`.
///
/// With the channel locked, the following are tried in order:
///
/// 1. hand `val` directly to a receiver parked on `recvq`;
/// 2. append `val` to the buffer if it has room;
/// 3. if `block` is `false`, give up and return `false` (`val` is dropped);
/// 4. otherwise box `val`, enqueue a sudog on `sendq` and park the calling
///    goroutine until a receiver takes the value or the channel is closed.
///
/// Returns `true` when the value was delivered or buffered, and `false` only
/// for a non-blocking send that would have had to wait.
///
/// # Panics
///
/// * "send on closed channel" — the channel was closed on entry, or was closed
///   while this goroutine was parked.
/// * "chansend: called from g0" — the send has to block but there is no
///   current goroutine. This is checked in release builds too, because the
///   code below dereferences `gp` and this function is reachable from the
///   safe `Sender::send`.
///
/// # Safety
///
/// NOTE(review): the contract is not written down in this file. Presumably the
/// runtime (scheduler, sudog pool) must be initialised and `c` must be a live
/// channel — confirm against the runtime's documentation.
pub(crate) unsafe fn chansend<T: Send + 'static>(
    c: &Arc<Hchan<T>>,
    val: T,
    block: bool,
) -> bool {
    // The guard is kept in a named binding so the channel stays locked until
    // it is dropped explicitly or falls out of scope on return.
    let (_g, state) = unsafe { c.lock_state() };
    if state.closed {
        // Release the lock before unwinding.
        drop(_g);
        panic!("send on closed channel");
    }

    // Path 1: a receiver is already parked; bypass the buffer.
    let recv_sg = unsafe { state.recvq.dequeue() };
    if !recv_sg.is_null() {
        let gp = unsafe { (*recv_sg).g };
        let elem_ptr = unsafe { (*recv_sg).elem as *mut MaybeUninit<T> };
        if !elem_ptr.is_null() {
            // Write into the slot the receiver allocated before parking.
            unsafe { (*elem_ptr).write(val) };
        }
        unsafe {
            (*recv_sg).success = true;
            // The woken receiver finds its sudog again through `param`.
            (*gp).param = recv_sg as *mut u8;
        }
        // Unlock first, then wake the receiver.
        drop(_g);
        unsafe { goready(gp) };
        return true;
    }

    // Path 2: room in the buffer.
    if state.buf.len() < state.cap {
        state.buf.push_back(val);
        return true;
    }

    // Path 3: full (or unbuffered with no receiver) and not allowed to wait.
    if !block {
        return false;
    }

    // Path 4: park until a receiver or `closechan` wakes us.
    let gp = current_g();
    // Must hold in release builds as well: `gp` is dereferenced below. If this
    // fires, unwinding drops the guard and `val`.
    assert!(!gp.is_null(), "chansend: called from g0");
    // The value is boxed so that its address stays valid while this goroutine
    // is parked; whoever consumes it frees the box.
    let elem_ptr = Box::into_raw(Box::new(MaybeUninit::new(val))) as *mut u8;
    let s = acquire_sudog();
    unsafe {
        (*s).g = gp;
        (*s).elem = elem_ptr;
        (*s).boxed_elem = true;
        (*s).success = false;
        (*s).c = Arc::as_ptr(c) as *mut u8;
        (*gp).param = ptr::null_mut();
        state.sendq.enqueue(s);
    }
    // NOTE(review): the lock is released before `gopark`, so another thread
    // could dequeue `s` and call `goready(gp)` before this goroutine has
    // actually parked. Confirm that `gopark`/`goready` tolerate that ordering.
    drop(_g);
    unsafe { gopark(WaitReason::ChanSend) };

    // Woken up: whoever woke us stored our sudog in `param` and set `success`.
    let ok = unsafe {
        let s2 = (*gp).param as *mut Sudog;
        (*gp).param = ptr::null_mut();
        let ok = (*s2).success;
        if !ok && !(*s2).elem.is_null() {
            // Woken by `closechan`: the value was never taken, so drop it and
            // free its box here.
            let ep = (*s2).elem as *mut MaybeUninit<T>;
            (*s2).elem = ptr::null_mut();
            (*ep).assume_init_drop();
            if (*s2).boxed_elem {
                let _ = Box::from_raw(ep);
            }
        }
        // On success the receiver has already consumed the value and cleared
        // `elem` (see `recv_from_sender`).
        (*s2).g = ptr::null_mut();
        (*s2).c = ptr::null_mut();
        release_sudog(s2);
        ok
    };
    if !ok {
        panic!("send on closed channel");
    }
    true
}
/// Receives a value from channel `c`.
///
/// With the channel locked, the following are tried in order:
///
/// 1. take a value from a sender parked on `sendq` (via `recv_from_sender`);
/// 2. pop the oldest buffered value;
/// 3. return `None` if the channel is closed, or if `block` is `false`;
/// 4. otherwise enqueue a sudog on `recvq` and park the calling goroutine
///    until a sender delivers a value or the channel is closed.
///
/// Returns `Some(v)` for a received value. `None` means "closed and drained"
/// or, for a non-blocking call, "nothing available"; this function does not
/// distinguish the two (`chanrecv_nb` does).
///
/// # Panics
///
/// "chanrecv: called from g0" — the receive has to block but there is no
/// current goroutine. This is checked in release builds too, because the code
/// below dereferences `gp` and this function is reachable from the safe
/// `Receiver::recv`.
///
/// # Safety
///
/// NOTE(review): the contract is not written down in this file. Presumably the
/// runtime (scheduler, sudog pool) must be initialised and `c` must be a live
/// channel — confirm against the runtime's documentation.
pub(crate) unsafe fn chanrecv<T: Send + 'static>(
    c: &Arc<Hchan<T>>,
    block: bool,
) -> Option<T> {
    let (_g, state) = unsafe { c.lock_state() };

    // Path 1: a sender is parked. This is checked before the buffer because,
    // on a buffered channel, a parked sender means the buffer is full and
    // `recv_from_sender` must rotate the sender's value in.
    let send_sg = unsafe { state.sendq.dequeue() };
    if !send_sg.is_null() {
        let val = recv_from_sender(state, send_sg);
        drop(_g);
        return Some(val);
    }

    // Path 2: buffered value available.
    if let Some(val) = state.buf.pop_front() {
        return Some(val);
    }

    // Path 3: nothing to receive and either closed or not allowed to wait.
    if state.closed || !block {
        return None;
    }

    // Path 4: park until a sender or `closechan` wakes us.
    let gp = current_g();
    // Must hold in release builds as well: `gp` is dereferenced below. If this
    // fires, unwinding drops the guard.
    assert!(!gp.is_null(), "chanrecv: called from g0");
    // Uninitialised boxed slot that the sender writes into (see `chansend`).
    let elem_ptr = Box::into_raw(Box::new(MaybeUninit::<T>::uninit())) as *mut u8;
    let s = acquire_sudog();
    unsafe {
        (*s).g = gp;
        (*s).elem = elem_ptr;
        (*s).boxed_elem = true;
        (*s).success = false;
        (*s).c = Arc::as_ptr(c) as *mut u8;
        (*gp).param = ptr::null_mut();
        state.recvq.enqueue(s);
    }
    // NOTE(review): the lock is released before `gopark`, so another thread
    // could dequeue `s` and call `goready(gp)` before this goroutine has
    // actually parked. Confirm that `gopark`/`goready` tolerate that ordering.
    drop(_g);
    unsafe { gopark(WaitReason::ChanReceive) };

    // Woken up: whoever woke us stored our sudog in `param` and set `success`.
    unsafe {
        let s2 = (*gp).param as *mut Sudog;
        (*gp).param = ptr::null_mut();
        let ok = (*s2).success;
        let boxed = (*s2).boxed_elem;
        let result = if ok {
            // A sender filled the slot: move the value out, then free the box
            // (dropping a `Box<MaybeUninit<T>>` does not drop the `T`).
            debug_assert!(!(*s2).elem.is_null(), "chanrecv: success but elem is null");
            let ep = (*s2).elem as *mut MaybeUninit<T>;
            (*s2).elem = ptr::null_mut();
            let val = (*ep).assume_init_read();
            if boxed {
                let _ = Box::from_raw(ep);
            }
            Some(val)
        } else {
            // Woken by `closechan`: the slot was never written, so only the
            // allocation is freed.
            if !(*s2).elem.is_null() {
                let ep = (*s2).elem as *mut MaybeUninit<T>;
                (*s2).elem = ptr::null_mut();
                if boxed {
                    let _ = Box::from_raw(ep);
                }
            }
            None
        };
        (*s2).g = ptr::null_mut();
        (*s2).c = ptr::null_mut();
        release_sudog(s2);
        result
    }
}
/// Non-blocking receive that keeps "closed" and "empty" apart.
///
/// * `Some(Some(v))` — a value was taken from a parked sender or the buffer.
/// * `Some(None)` — nothing is available and the channel is closed.
/// * `None` — nothing is available and the channel is still open.
///
/// # Safety
///
/// NOTE(review): the contract is not written down in this file; presumably the
/// same requirements as `chanrecv` apply — confirm.
pub(crate) unsafe fn chanrecv_nb<T: Send + 'static>(
    c: &Arc<Hchan<T>>,
) -> Option<Option<T>> {
    let (guard, st) = unsafe { c.lock_state() };

    // Parked senders are served first; see `recv_from_sender` for how the
    // buffered case keeps FIFO order.
    let parked_sender = unsafe { st.sendq.dequeue() };
    if !parked_sender.is_null() {
        let received = recv_from_sender(st, parked_sender);
        drop(guard);
        return Some(Some(received));
    }

    if let Some(front) = st.buf.pop_front() {
        return Some(Some(front));
    }

    if st.closed {
        Some(None)
    } else {
        None
    }
}
/// Completes a receive using the parked sender `send_sg`, which the caller has
/// just dequeued from `state.sendq`, and wakes that sender.
///
/// * Unbuffered channel (`cap == 0`): the sender's value is returned directly.
/// * Buffered channel: the oldest buffered value is returned and the sender's
///   value is appended to the buffer, so FIFO order is kept.
///
/// In both cases the sender's boxed slot is consumed (and freed when
/// `boxed_elem` is set), `elem` is cleared, `success` is set and the sender is
/// made runnable.
///
/// `send_sg` must be a valid sudog whose `elem` points to an initialised
/// `MaybeUninit<T>`; the function is not marked `unsafe`, so callers carry
/// that obligation implicitly.
///
/// NOTE(review): both callers still hold the channel lock when this runs, so
/// `goready` is called under the lock here, whereas `chansend` and `closechan`
/// unlock first. Confirm that this is intended.
fn recv_from_sender<T: Send + 'static>(
    state: &mut HchanState<T>,
    send_sg: *mut Sudog,
) -> T {
    let (sender_g, owns_box) = unsafe { ((*send_sg).g, (*send_sg).boxed_elem) };

    // Buffered channel: take the oldest value first. A parked sender implies
    // a full buffer, hence the `unwrap`.
    let oldest = if state.cap == 0 {
        None
    } else {
        Some(state.buf.pop_front().unwrap())
    };

    // Move the sender's value out of its slot and release the slot.
    let sent = unsafe {
        let slot = (*send_sg).elem as *mut MaybeUninit<T>;
        let moved = (*slot).assume_init_read();
        if owns_box {
            drop(Box::from_raw(slot));
        }
        (*send_sg).elem = ptr::null_mut();
        moved
    };

    let val = match oldest {
        None => sent,
        Some(head) => {
            state.buf.push_back(sent);
            head
        }
    };

    unsafe {
        (*send_sg).success = true;
        // The woken sender finds its sudog again through `param`.
        (*sender_g).param = send_sg as *mut u8;
    }
    unsafe { goready(sender_g) };
    val
}
/// Closes channel `c` and wakes every goroutine parked on it.
///
/// Each parked receiver and sender gets `success = false` on its sudog and the
/// sudog pointer in its goroutine's `param`. They are made runnable only after
/// the channel lock has been released, receivers first.
///
/// # Panics
///
/// Panics with "close of closed channel" if the channel is already closed.
///
/// # Safety
///
/// NOTE(review): the contract is not written down in this file; presumably the
/// runtime must be initialised and `c` must be a live channel — confirm.
pub(crate) unsafe fn closechan<T: Send + 'static>(c: &Arc<Hchan<T>>) {
    let (guard, st) = unsafe { c.lock_state() };
    if st.closed {
        drop(guard);
        panic!("close of closed channel");
    }
    st.closed = true;

    // Collect the goroutines to wake while locked; wake them after unlocking.
    let mut to_wake: Vec<*mut crate::runtime::g::G> = Vec::new();
    for queue in [&mut st.recvq, &mut st.sendq] {
        loop {
            let waiter = unsafe { queue.dequeue() };
            if waiter.is_null() {
                break;
            }
            let owner = unsafe { (*waiter).g };
            unsafe {
                (*waiter).success = false;
                (*owner).param = waiter as *mut u8;
            }
            to_wake.push(owner);
        }
    }

    drop(guard);
    for owner in to_wake {
        unsafe { goready(owner) };
    }
}
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::sched::{run_impl, spawn_goroutine};
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::Arc;

    /// Yields the current goroutine `rounds` times so that other goroutines
    /// get a chance to run.
    fn yield_rounds(rounds: usize) {
        for _ in 0..rounds {
            crate::gosched();
        }
    }

    /// A value sent into a one-slot buffer comes back out of `recv`.
    #[test]
    fn buffered_send_recv() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(1);
            sender.send(42);
            assert_eq!(receiver.recv(), Some(42));
        });
    }

    /// Buffered values are received in the order they were sent.
    #[test]
    fn buffered_fifo_order() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(4);
            for value in 0..4_i32 {
                sender.send(value);
            }
            for expected in 0..4_i32 {
                assert_eq!(receiver.recv(), Some(expected));
            }
        });
    }

    /// After `close`, buffered values are still delivered; then `recv` keeps
    /// returning `None`.
    #[test]
    fn buffered_close_drains_then_none() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(2);
            sender.send(1);
            sender.send(2);
            sender.close();
            assert_eq!(receiver.recv(), Some(1));
            assert_eq!(receiver.recv(), Some(2));
            assert_eq!(receiver.recv(), None);
            assert_eq!(receiver.recv(), None);
        });
    }

    /// `try_recv` on an open, empty channel reports "would block".
    #[test]
    fn try_recv_empty() {
        run_impl(|| {
            let (_sender, receiver) = chan::<i32>(4);
            assert_eq!(receiver.try_recv(), None);
        });
    }

    /// `try_recv` on a closed, empty channel reports "closed".
    #[test]
    fn try_recv_closed_empty() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(4);
            sender.close();
            assert_eq!(receiver.try_recv(), Some(None));
        });
    }

    /// `try_send` succeeds until the buffer is full, then returns `false`.
    #[test]
    fn try_send_full() {
        run_impl(|| {
            let (sender, _receiver) = chan::<i32>(2);
            assert!(sender.try_send(1));
            assert!(sender.try_send(2));
            assert!(!sender.try_send(3));
        });
    }

    /// Closing twice panics. This path never parks, so no scheduler is needed.
    #[test]
    #[should_panic(expected = "close of closed channel")]
    fn close_twice_panics() {
        let (sender, _receiver) = chan::<i32>(1);
        sender.close();
        sender.close();
    }

    /// Sending on a closed channel panics before any attempt to park.
    #[test]
    #[should_panic(expected = "send on closed channel")]
    fn send_on_closed_panics() {
        let (sender, _receiver) = chan::<i32>(1);
        sender.close();
        sender.send(1);
    }

    /// An unbuffered send from another goroutine meets a blocking `recv`.
    #[test]
    fn unbuffered_rendezvous() {
        run_impl(|| {
            let (sender, receiver) = chan::<i32>(0);
            unsafe {
                spawn_goroutine(move || {
                    sender.send(99);
                });
            }
            assert_eq!(receiver.recv(), Some(99));
        });
    }

    /// Two unbuffered channels bounce a counter back and forth ten times.
    #[test]
    fn unbuffered_ping_pong() {
        run_impl(|| {
            let (ping_out, ping_in) = chan::<i32>(0);
            let (pong_out, pong_in) = chan::<i32>(0);
            unsafe {
                spawn_goroutine(move || {
                    for _ in 0..10 {
                        let seen = ping_in.recv().unwrap();
                        pong_out.send(seen + 1);
                    }
                });
            }
            let mut counter = 0_i32;
            for _ in 0..10 {
                ping_out.send(counter);
                counter = pong_in.recv().unwrap();
            }
            assert_eq!(counter, 10);
        });
    }

    /// One producer and one consumer over a small buffer; the consumer's sum
    /// must equal 0 + 1 + ... + (N - 1).
    #[test]
    fn producer_consumer() {
        const N: i32 = 20;
        let total = Arc::new(AtomicI32::new(0));
        let total_for_run = Arc::clone(&total);
        run_impl(move || {
            let (sender, receiver) = chan::<i32>(4);
            let total_for_consumer = Arc::clone(&total_for_run);
            unsafe {
                spawn_goroutine(move || {
                    for value in 0..N {
                        sender.send(value);
                    }
                    sender.close();
                });
                spawn_goroutine(move || {
                    while let Some(value) = receiver.recv() {
                        total_for_consumer.fetch_add(value, Ordering::Relaxed);
                    }
                });
            }
            // Presumably enough yields for both goroutines to run to
            // completion before the main goroutine returns.
            yield_rounds(500);
        });
        assert_eq!(total.load(Ordering::Acquire), N * (N - 1) / 2);
    }

    /// A receiver parked on an empty channel is woken by `close` and sees `None`.
    #[test]
    fn close_wakes_blocked_receiver() {
        let none_seen = Arc::new(AtomicI32::new(0));
        let none_seen_in_goroutine = Arc::clone(&none_seen);
        run_impl(move || {
            let (sender, receiver) = chan::<i32>(0);
            unsafe {
                spawn_goroutine(move || {
                    if receiver.recv().is_none() {
                        none_seen_in_goroutine.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }
            // Let the receiver park, close, then let it observe the close.
            yield_rounds(20);
            sender.close();
            yield_rounds(20);
        });
        assert_eq!(none_seen.load(Ordering::Acquire), 1);
    }
}