use std::mem::{ManuallyDrop, MaybeUninit};
use std::ptr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::chan::{Hchan, Receiver, Sender};
use crate::runtime::g::{current_g, G, WaitReason};
use crate::runtime::park::{gopark, goready};
use crate::runtime::sudog::{acquire_sudog, release_sudog, Sudog};
/// Case index returned by `selectgo` when none of the channel cases was ready
/// and the caller asked for a `default` arm (`has_default == true`).
pub const CASE_DEFAULT: usize = usize::MAX;
/// Outcome of a single non-blocking attempt on one select case, as reported by
/// an `SCase::try_fn` callback while the channel lock is held.
#[derive(Debug)]
pub(crate) enum TryResult {
/// The operation cannot complete right now; the case stays pending.
NotReady,
/// The operation completed against the channel buffer (or observed a closed
/// channel) without pairing with a parked goroutine. In this file `ok` is
/// `false` only for a receive on a closed, empty channel.
Done { ok: bool },
/// The operation completed by pairing with the parked goroutine `gp`.
/// `selectgo` passes `gp` to `goready` after releasing the channel locks.
Handoff { gp: *mut G, ok: bool },
/// A send was attempted on a closed channel; `selectgo` turns this into a
/// "send on closed channel" panic.
ClosedSend,
}
// The `*mut G` in `Handoff` suppresses the automatic `Send` impl; this opts
// back in. NOTE(review): soundness relies on how the scheduler shares `G`
// between threads, which is not visible in this file — confirm.
unsafe impl Send for TryResult {}
/// One type-erased arm of a select statement.
///
/// The element type `T` of the channel is erased: the channel is carried as an
/// untyped pointer and every operation on it goes through a function pointer
/// that was monomorphised for `T` when the case was built (see
/// `recv_case_of` / `send_case_of`).
#[doc(hidden)]
pub struct SCase {
// Type-erased `*const Hchan<T>`; also used as the key for lock ordering.
pub(crate) chan_ptr: *const (),
// Sudog enqueued for this case while `selectgo` is parked; null otherwise.
pub(crate) sg: *mut Sudog,
// Receive cases: destination slot (`*mut MaybeUninit<T>`).
// Send cases: pointer to the value to send (`*mut ManuallyDrop<T>`).
pub(crate) elem: *mut u8,
// Locks the channel behind `chan_ptr`.
pub(crate) lock_fn: unsafe fn(*const ()),
// Unlocks the channel behind `chan_ptr`.
pub(crate) unlock_fn: unsafe fn(*const ()),
// Attempts the send/receive without blocking; called with the lock held.
pub(crate) try_fn: unsafe fn(*const (), *mut u8) -> TryResult,
// Appends a sudog to the channel's send or receive wait queue.
pub(crate) enqueue_fn: unsafe fn(*const (), *mut Sudog),
// Removes a specific sudog from the channel's send or receive wait queue.
pub(crate) dequeue_fn: unsafe fn(*const (), *mut Sudog),
}
// The raw pointers suppress the automatic `Send` impl; this opts back in.
// NOTE(review): soundness depends on callers keeping the channel and element
// storage alive and exclusively owned for the duration of the select — confirm.
unsafe impl Send for SCase {}
/// Tiny pseudo-random generator used to shuffle the poll order of a select.
///
/// Despite the name, each step is a linear congruential update
/// (`state * MULTIPLIER + 1`, wrapping); only the high bits are handed out.
struct Lehmer(u64);

impl Lehmer {
    /// Seeds the generator from the current goroutine's id.
    ///
    /// Falls back to a seed of `1` when there is no current goroutine. The
    /// seed always has its lowest bit set.
    fn from_goid() -> Self {
        let seed: u64 = unsafe {
            let me = current_g();
            if me.is_null() {
                1
            } else {
                (*me).goid
            }
        };
        // Forcing the low bit once is enough: `x | 1 | 1 == x | 1`.
        Lehmer(seed | 1)
    }

    /// Advances the state and returns a value in `0..n`.
    ///
    /// # Panics
    /// Panics (remainder by zero) if `n == 0`.
    fn next_usize(&mut self, n: usize) -> usize {
        const MULTIPLIER: u64 = 6_364_136_223_846_793_005;
        let advanced = self.0.wrapping_mul(MULTIPLIER).wrapping_add(1);
        self.0 = advanced;
        // Use the upper bits of the state for the result.
        let high_bits = (advanced >> 33) as usize;
        high_bits % n
    }
}
/// Locks every channel named by `lockorder`, in the order given.
///
/// # Safety
/// Every index in `lockorder` must be in bounds for `cases`, and each case's
/// `chan_ptr` must be valid for its `lock_fn`.
unsafe fn sel_lock(cases: &[SCase], lockorder: &[usize]) {
    for &i in lockorder {
        let case = &cases[i];
        unsafe { (case.lock_fn)(case.chan_ptr) };
    }
}

/// Unlocks every channel named by `lockorder`, in the order given.
///
/// # Safety
/// Same requirements as [`sel_lock`]; the locks must currently be held by the
/// caller.
unsafe fn sel_unlock(cases: &[SCase], lockorder: &[usize]) {
    for &i in lockorder {
        let case = &cases[i];
        unsafe { (case.unlock_fn)(case.chan_ptr) };
    }
}

/// Runs a select over `cases` and returns `(index, ok)` for the arm that fired.
///
/// The algorithm has three passes:
///
/// 1. With all involved channels locked, try every case once in a shuffled
///    order; the first one that can proceed wins.
/// 2. If nothing is ready and `has_default` is set, return
///    `(CASE_DEFAULT, false)`. Otherwise enqueue one sudog per case on its
///    channel, release the locks and park the goroutine.
/// 3. After being woken, identify the winning sudog from `g.param`, remove the
///    losing sudogs from their queues and return every sudog to the pool.
///
/// `ok` is whatever the winning operation reported (for example `false` for a
/// receive that observed a closed channel).
///
/// # Panics
/// * `"send on closed channel"` if a send case targets a closed channel.
/// * If the goroutine is woken without a winning sudog belonging to `cases`.
///
/// # Safety
/// Every `SCase` must carry a live channel pointer matching its callbacks and
/// an `elem` pointer valid for the element type of that channel. The call must
/// be made from a goroutine when it may block (`has_default == false`).
#[doc(hidden)]
pub unsafe fn selectgo(cases: &mut [SCase], has_default: bool) -> (usize, bool) {
    let n = cases.len();

    // Poll order: Fisher-Yates shuffle so that no case is systematically
    // favoured when several are ready at once.
    let mut pollorder: Vec<usize> = (0..n).collect();
    let mut rng = Lehmer::from_goid();
    for i in (1..n).rev() {
        let j = rng.next_usize(i + 1);
        pollorder.swap(i, j);
    }

    // Lock order: ascending channel address, one entry per distinct channel,
    // so concurrent selects over overlapping channel sets cannot deadlock and
    // no channel is locked twice.
    let mut lockorder: Vec<usize> = (0..n).collect();
    lockorder.sort_by_key(|&i| cases[i].chan_ptr as usize);
    lockorder.dedup_by_key(|&mut i| cases[i].chan_ptr as usize);

    unsafe { sel_lock(cases, &lockorder) };

    // Pass 1: look for a case that can proceed immediately.
    for &i in &pollorder {
        let result = unsafe { (cases[i].try_fn)(cases[i].chan_ptr, cases[i].elem) };
        match result {
            TryResult::NotReady => {}
            TryResult::Done { ok } => {
                unsafe { sel_unlock(cases, &lockorder) };
                return (i, ok);
            }
            TryResult::Handoff { gp, ok } => {
                // Release the locks before making the peer runnable.
                unsafe { sel_unlock(cases, &lockorder) };
                unsafe { goready(gp) };
                return (i, ok);
            }
            TryResult::ClosedSend => {
                // Never panic while holding channel locks.
                unsafe { sel_unlock(cases, &lockorder) };
                panic!("send on closed channel");
            }
        }
    }

    if has_default {
        unsafe { sel_unlock(cases, &lockorder) };
        return (CASE_DEFAULT, false);
    }

    // Pass 2: enqueue a sudog on every case's channel, then park.
    let gp = current_g();
    debug_assert!(!gp.is_null(), "selectgo: called from g0");
    for case in cases.iter_mut() {
        let sg = acquire_sudog();
        unsafe {
            (*sg).g = gp;
            (*sg).elem = case.elem;
            (*sg).is_select = true;
            (*sg).success = false;
            // `elem` points at caller-owned storage, never at a `Box`. A
            // recycled sudog may still carry the flag from a previous boxed
            // send, which would make the peer free the caller's slot.
            (*sg).boxed_elem = false;
            (*sg).c = case.chan_ptr as *mut u8;
        }
        case.sg = sg;
        unsafe { (case.enqueue_fn)(case.chan_ptr, sg) };
    }
    // Reset the "a case already fired" flag and the wake-up parameter while
    // the locks are still held, i.e. before any peer can see the sudogs.
    unsafe { (*gp).selectdone.store(0, Ordering::Release) };
    unsafe { (*gp).param = ptr::null_mut() };
    unsafe { sel_unlock(cases, &lockorder) };
    unsafe { gopark(WaitReason::Select) };

    // Pass 3: the waker stored the winning sudog in `param`.
    let sg_winner = unsafe { (*gp).param as *mut Sudog };
    unsafe { (*gp).param = ptr::null_mut() };
    // Validate the winner *before* dereferencing it: a wake-up with a null or
    // foreign `param` must hit this panic, not a wild pointer read.
    let winner = cases
        .iter()
        .position(|c| c.sg == sg_winner)
        .expect("selectgo: winning sudog not found in cases");
    let ok = unsafe { (*sg_winner).success };

    // The winner was already dequeued by whoever woke us; pull the losers off
    // their wait queues under the channel locks.
    unsafe { sel_lock(cases, &lockorder) };
    for (i, case) in cases.iter().enumerate() {
        if i != winner {
            unsafe { (case.dequeue_fn)(case.chan_ptr, case.sg) };
        }
    }
    unsafe { sel_unlock(cases, &lockorder) };

    // Scrub every sudog and hand it back to the pool.
    for case in cases.iter_mut() {
        let sg = case.sg;
        case.sg = ptr::null_mut();
        unsafe {
            (*sg).g = ptr::null_mut();
            (*sg).elem = ptr::null_mut();
            (*sg).c = ptr::null_mut();
            // Must not leak into the pool: the next user may be a plain
            // (non-select) channel operation.
            (*sg).is_select = false;
            release_sudog(sg);
        }
    }
    (winner, ok)
}
/// Locks the mutex of the `Hchan<T>` behind the type-erased pointer `p`.
///
/// # Safety
/// `p` must point to a live `Hchan<T>` of exactly this `T`.
pub(crate) unsafe fn lock_chan<T>(p: *const ()) {
    let hchan = p.cast::<Hchan<T>>();
    (*hchan).mutex.lock();
}
/// Unlocks the mutex of the `Hchan<T>` behind the type-erased pointer `p`.
///
/// # Safety
/// `p` must point to a live `Hchan<T>` of exactly this `T`, and the caller
/// must currently hold its lock.
pub(crate) unsafe fn unlock_chan<T>(p: *const ()) {
    let hchan = p.cast::<Hchan<T>>();
    (*hchan).mutex.unlock();
}
/// Non-blocking send attempt for a select case; call with the channel locked.
///
/// * Closed channel: returns `ClosedSend` without touching `elem`.
/// * A receiver is parked: the value is moved out of `elem` into the
///   receiver's slot and `Handoff` is returned so the caller can wake it.
/// * Room in the buffer: the value is moved out of `elem` into the buffer and
///   `Done { ok: true }` is returned.
/// * Otherwise: `NotReady`.
///
/// # Safety
/// `p` must point to a live `Hchan<T>` and `elem` to an initialised `T` that
/// the caller will treat as moved-from whenever the send succeeds.
pub(crate) unsafe fn try_send_chan<T: Send + 'static>(
    p: *const (),
    elem: *mut u8,
) -> TryResult {
    let chan = &*(p as *const Hchan<T>);
    let st = &mut *chan.state.get();
    if st.closed {
        return TryResult::ClosedSend;
    }

    let waiter = st.recvq.dequeue();
    if waiter.is_null() {
        // Nobody is waiting: fall back to the buffer, if it has room.
        if st.buf.len() >= st.cap {
            return TryResult::NotReady;
        }
        st.buf.push_back(ptr::read(elem as *const T));
        return TryResult::Done { ok: true };
    }

    // Direct hand-off to the parked receiver.
    let receiver = (*waiter).g;
    let dst = (*waiter).elem as *mut MaybeUninit<T>;
    // NOTE(review): when the receiver has no destination slot the value is
    // left untouched in `elem`; if the caller treats a successful send as a
    // move, a `T` with a destructor would be leaked here — confirm.
    if !dst.is_null() {
        (*dst).write(ptr::read(elem as *const T));
    }
    (*waiter).success = true;
    // Tell the receiver which sudog completed.
    (*receiver).param = waiter as *mut u8;
    TryResult::Handoff { gp: receiver, ok: true }
}
/// Non-blocking receive attempt for a select case; call with the channel
/// locked.
///
/// * A sender is parked: on an unbuffered channel its value is taken
///   directly; on a buffered channel the buffer head is taken and the
///   sender's value is appended to the tail. Returns `Handoff` so the caller
///   can wake the sender.
/// * Buffer non-empty: pops the head, returns `Done { ok: true }`.
/// * Channel closed: returns `Done { ok: false }` and leaves `elem` unwritten.
/// * Otherwise: `NotReady`.
///
/// # Safety
/// `p` must point to a live `Hchan<T>` and `elem` to writable storage for a
/// `MaybeUninit<T>`.
pub(crate) unsafe fn try_recv_chan<T: Send + 'static>(
    p: *const (),
    elem: *mut u8,
) -> TryResult {
    let chan = &*(p as *const Hchan<T>);
    let st = &mut *chan.state.get();
    let out = elem as *mut MaybeUninit<T>;

    let sender = st.sendq.dequeue();
    if sender.is_null() {
        // No parked sender: serve from the buffer, or report closed/empty.
        return match st.buf.pop_front() {
            Some(front) => {
                (*out).write(front);
                TryResult::Done { ok: true }
            }
            None if st.closed => TryResult::Done { ok: false },
            None => TryResult::NotReady,
        };
    }

    let gp = (*sender).g;
    let src = (*sender).elem as *mut MaybeUninit<T>;
    let boxed = (*sender).boxed_elem;

    // Buffered channel: take the head first so FIFO order is preserved.
    let head = if st.cap == 0 {
        None
    } else {
        Some(st.buf.pop_front().unwrap())
    };

    // Move the sender's value out of its storage and release that storage if
    // it was heap-allocated. `Box<MaybeUninit<T>>` frees without dropping `T`.
    let sent = (*src).assume_init_read();
    if boxed {
        drop(Box::from_raw(src));
    }
    (*sender).elem = ptr::null_mut();

    let val = match head {
        None => sent,
        Some(front) => {
            st.buf.push_back(sent);
            front
        }
    };
    (*out).write(val);

    (*sender).success = true;
    // Tell the sender which sudog completed.
    (*gp).param = sender as *mut u8;
    TryResult::Handoff { gp, ok: true }
}
/// Appends `sg` to the send wait queue of the `Hchan<T>` behind `p`.
///
/// # Safety
/// `p` must point to a live `Hchan<T>` whose lock is held by the caller.
pub(crate) unsafe fn enqueue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
    let state = (*(p as *const Hchan<T>)).state.get();
    (*state).sendq.enqueue(sg);
}
/// Appends `sg` to the receive wait queue of the `Hchan<T>` behind `p`.
///
/// # Safety
/// `p` must point to a live `Hchan<T>` whose lock is held by the caller.
pub(crate) unsafe fn enqueue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
    let state = (*(p as *const Hchan<T>)).state.get();
    (*state).recvq.enqueue(sg);
}
/// Removes the specific sudog `sg` from the send wait queue of the
/// `Hchan<T>` behind `p`.
///
/// # Safety
/// `p` must point to a live `Hchan<T>` whose lock is held by the caller.
pub(crate) unsafe fn dequeue_send_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
    let state = (*(p as *const Hchan<T>)).state.get();
    (*state).sendq.dequeue_sudog(sg);
}
/// Removes the specific sudog `sg` from the receive wait queue of the
/// `Hchan<T>` behind `p`.
///
/// # Safety
/// `p` must point to a live `Hchan<T>` whose lock is held by the caller.
pub(crate) unsafe fn dequeue_recv_chan<T: Send + 'static>(p: *const (), sg: *mut Sudog) {
    let state = (*(p as *const Hchan<T>)).state.get();
    (*state).recvq.dequeue_sudog(sg);
}
#[doc(hidden)]
pub fn recv_case_of<T: Send + 'static>(rx: &Receiver<T>, slot: *mut MaybeUninit<T>) -> SCase {
SCase {
chan_ptr: Arc::as_ptr(rx.hchan()) as *const (),
sg: ptr::null_mut(),
elem: slot as *mut u8,
lock_fn: lock_chan::<T>,
unlock_fn: unlock_chan::<T>,
try_fn: try_recv_chan::<T>,
enqueue_fn: enqueue_recv_chan::<T>,
dequeue_fn: dequeue_recv_chan::<T>,
}
}
#[doc(hidden)]
pub fn send_case_of<T: Send + 'static>(tx: &Sender<T>, val: *mut ManuallyDrop<T>) -> SCase {
SCase {
chan_ptr: Arc::as_ptr(tx.hchan()) as *const (),
sg: ptr::null_mut(),
elem: val as *mut u8,
lock_fn: lock_chan::<T>,
unlock_fn: unlock_chan::<T>,
try_fn: try_send_chan::<T>,
enqueue_fn: enqueue_send_chan::<T>,
dequeue_fn: dequeue_send_chan::<T>,
}
}
/// Unit tests for `selectgo`.
///
/// The cases are wired to the production, generic channel callbacks
/// (`lock_chan::<i32>`, `try_recv_chan::<i32>`, ...) so that these tests
/// exercise the code that real selects run, not test-local copies of it.
#[cfg(all(test, not(loom)))]
// Some runtime entry points called inside `unsafe { .. }` below may be safe
// functions depending on build configuration.
#[allow(unused_unsafe)]
mod tests {
    use super::*;
    use crate::chan::{chan, Hchan};
    use crate::runtime::sched::run_impl;
    use std::mem::MaybeUninit;
    use std::ptr;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::Arc;

    /// Builds a send case for an `i32` channel from a borrowed value.
    fn send_case(h: &Arc<Hchan<i32>>, val: &mut i32) -> SCase {
        SCase {
            chan_ptr: Arc::as_ptr(h) as *const (),
            sg: ptr::null_mut(),
            elem: val as *mut i32 as *mut u8,
            lock_fn: lock_chan::<i32>,
            unlock_fn: unlock_chan::<i32>,
            try_fn: try_send_chan::<i32>,
            enqueue_fn: enqueue_send_chan::<i32>,
            dequeue_fn: dequeue_send_chan::<i32>,
        }
    }

    /// Builds a receive case for an `i32` channel writing into `slot`.
    fn recv_case(h: &Arc<Hchan<i32>>, slot: &mut MaybeUninit<i32>) -> SCase {
        SCase {
            chan_ptr: Arc::as_ptr(h) as *const (),
            sg: ptr::null_mut(),
            elem: slot as *mut MaybeUninit<i32> as *mut u8,
            lock_fn: lock_chan::<i32>,
            unlock_fn: unlock_chan::<i32>,
            try_fn: try_recv_chan::<i32>,
            enqueue_fn: enqueue_recv_chan::<i32>,
            dequeue_fn: dequeue_recv_chan::<i32>,
        }
    }

    /// A buffered value is received on the fast path even with a default arm.
    #[test]
    fn fast_recv_buffered() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(4);
            tx.send(42);
            let mut slot = MaybeUninit::<i32>::uninit();
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = unsafe { selectgo(&mut cases, true) };
            assert_eq!(idx, 0, "should pick recv case");
            assert!(ok, "should be ok (not closed)");
            assert_eq!(unsafe { slot.assume_init() }, 42);
        });
    }

    /// A send into a buffer with free space completes on the fast path.
    #[test]
    fn fast_send_buffered() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(4);
            let mut val = 99_i32;
            let mut cases = [send_case(tx.hchan(), &mut val)];
            let (idx, ok) = unsafe { selectgo(&mut cases, true) };
            assert_eq!(idx, 0);
            assert!(ok, "buffered send completes with ok=true");
            assert_eq!(rx.recv(), Some(99));
        });
    }

    /// With nothing ready, the default arm is reported.
    #[test]
    fn default_taken_when_not_ready() {
        run_impl(|| {
            let (_tx, rx) = chan::<i32>(0);
            let mut slot = MaybeUninit::<i32>::uninit();
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = unsafe { selectgo(&mut cases, true) };
            assert_eq!(idx, CASE_DEFAULT);
            assert!(!ok);
        });
    }

    /// Receiving from a closed, empty channel fires immediately with ok=false.
    #[test]
    fn recv_closed_empty() {
        run_impl(|| {
            let (tx, rx) = chan::<i32>(0);
            tx.close();
            let mut slot = MaybeUninit::<i32>::uninit();
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = unsafe { selectgo(&mut cases, false) };
            assert_eq!(idx, 0);
            assert!(!ok, "recv from closed returns ok=false");
        });
    }

    /// Of two receive cases, only the one with data available can win.
    #[test]
    fn multi_case_first_ready_wins() {
        run_impl(|| {
            let (tx1, rx1) = chan::<i32>(1);
            let (_tx2, rx2) = chan::<i32>(1);
            tx1.send(7);
            let mut s1 = MaybeUninit::<i32>::uninit();
            let mut s2 = MaybeUninit::<i32>::uninit();
            let mut cases = [
                recv_case(rx1.hchan(), &mut s1),
                recv_case(rx2.hchan(), &mut s2),
            ];
            let (idx, ok) = unsafe { selectgo(&mut cases, false) };
            assert_eq!(idx, 0);
            assert!(ok);
            assert_eq!(unsafe { s1.assume_init() }, 7);
        });
    }

    /// A select parked on a receive is woken by a later send.
    #[test]
    fn blocking_recv_unblocked_by_send() {
        use crate::runtime::sched::spawn_goroutine;
        let result = Arc::new(AtomicI32::new(-1));
        let result2 = Arc::clone(&result);
        run_impl(move || {
            let (tx, rx) = chan::<i32>(0);
            unsafe {
                spawn_goroutine(move || {
                    // Yield first so the selecting goroutine gets to park.
                    crate::gosched();
                    tx.send(55);
                });
            }
            let mut slot = MaybeUninit::<i32>::uninit();
            let mut cases = [recv_case(rx.hchan(), &mut slot)];
            let (idx, ok) = unsafe { selectgo(&mut cases, false) };
            assert_eq!(idx, 0);
            assert!(ok);
            result2.store(unsafe { slot.assume_init() }, Ordering::Relaxed);
        });
        assert_eq!(result.load(Ordering::Acquire), 55);
    }

    /// A select parked on a send is woken by a later receive.
    #[test]
    fn blocking_send_unblocked_by_recv() {
        use crate::runtime::sched::spawn_goroutine;
        run_impl(|| {
            let (tx, rx) = chan::<i32>(0);
            unsafe {
                spawn_goroutine(move || {
                    // Yield first so the selecting goroutine gets to park.
                    crate::gosched();
                    let _ = rx.recv();
                });
            }
            let mut val = 77_i32;
            let mut cases = [send_case(tx.hchan(), &mut val)];
            let (idx, _ok) = unsafe { selectgo(&mut cases, false) };
            assert_eq!(idx, 0);
        });
    }

    /// Two goroutines race to receive a single buffered value; exactly one
    /// of them may succeed.
    #[test]
    fn select_race_one_winner() {
        use crate::runtime::sched::spawn_goroutine;
        let wins = Arc::new(AtomicI32::new(0));
        let wins2 = Arc::clone(&wins);
        let wins3 = Arc::clone(&wins);
        run_impl(move || {
            let (tx, rx) = chan::<i32>(1);
            tx.send(1);
            unsafe {
                spawn_goroutine({
                    let wins = Arc::clone(&wins2);
                    let rx = rx.clone();
                    move || {
                        let mut slot = MaybeUninit::<i32>::uninit();
                        let mut cases = [recv_case(rx.hchan(), &mut slot)];
                        let (idx, ok) = unsafe { selectgo(&mut cases, true) };
                        if idx == 0 && ok {
                            wins.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
            unsafe {
                spawn_goroutine({
                    let wins = Arc::clone(&wins3);
                    let rx = rx.clone();
                    move || {
                        let mut slot = MaybeUninit::<i32>::uninit();
                        let mut cases = [recv_case(rx.hchan(), &mut slot)];
                        let (idx, ok) = unsafe { selectgo(&mut cases, true) };
                        if idx == 0 && ok {
                            wins.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
            // Give both goroutines ample opportunity to run.
            for _ in 0..200 {
                crate::gosched();
            }
        });
        assert_eq!(wins.load(Ordering::Acquire), 1);
    }
}