#![allow(non_camel_case_types, non_snake_case)]
use goish::prelude::*;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
// A non-blocking receive on a freshly created channel has nothing to take, so
// it must report `ok == false` — for the unbuffered case (capacity 0) and for
// every buffered capacity alike.
test!{ fn TestChan_TryRecvOnEmpty(t) {
    let capacities = [0i64, 1, 2, 4, 8, 16];
    for capacity in capacities {
        let empty = chan!(i64, capacity as usize);
        // Only the `ok` flag matters here; the value slot is ignored.
        let received = empty.TryRecv().1;
        if !received {
            continue;
        }
        t.Errorf(Sprintf!("chan[%d]: TryRecv on empty returned ok", capacity));
    }
}}
// Fills a buffered channel up to its capacity and then checks that one more
// non-blocking send is refused.
test!{ fn TestChan_TrySendOnFull(t) {
    for capacity in [1i64, 2, 4, 8] {
        let full = chan!(i64, capacity as usize);

        // Occupy every buffer slot with the values 0..capacity.
        let mut next = 0i64;
        while next < capacity {
            let _ = full.Send(next);
            next += 1;
        }

        if full.TrySend(99) {
            let msg = Sprintf!("chan[%d]: TrySend on full returned ok", capacity);
            t.Errorf(msg);
        }
    }
}}
// After Close, values that were already buffered must still be delivered in
// order with `ok == true`; every receive after that must yield `(0, false)`.
test!{ fn TestChan_ReceiveZeroFromClosed(t) {
    for capacity in [1i64, 2, 4, 8, 16] {
        let ch = chan!(i64, capacity as usize);
        for value in 0..capacity {
            let _ = ch.Send(value);
        }
        ch.Close();

        // Phase 1: drain the buffered values.
        for expected in 0..capacity {
            let (got, ok) = ch.Recv();
            if !ok {
                t.Fatalf(Sprintf!("chan[%d]: drain recv #%d not ok", capacity, expected));
            }
            if got != expected {
                t.Errorf(Sprintf!("chan[%d]: received %v, expected %v", capacity, got, expected));
            }
        }

        // Phase 2: repeated receives on the drained, closed channel.
        let mut attempt = 0;
        while attempt < 3 {
            attempt += 1;
            match ch.Recv() {
                (0, false) => {}
                (got, ok) => {
                    t.Errorf(Sprintf!("chan[%d]: post-drain got (%v, %v); want (0, false)", capacity, got, ok));
                }
            }
        }
    }
}}
// A goroutine parked in Recv must be released when the channel is closed, and
// what it observes must be the zero value together with `ok == false`.
test!{ fn TestChan_CloseUnblocksReceive(t) {
    for capacity in [0i64, 1, 2, 4, 8] {
        let ch = chan!(i64, capacity as usize);
        // One-slot side channel through which the receiver reports its verdict.
        let verdict = chan!(bool, 1);

        let ch_rx = ch.clone();
        let verdict_tx = verdict.clone();
        let receiver = go!{
            let (value, ok) = ch_rx.Recv();
            let saw_closed_zero = value == 0 && !ok;
            let _ = verdict_tx.Send(saw_closed_zero);
        };

        // Short pause before closing so the receiver has a chance to start its Recv.
        let pause = std::time::Duration::from_millis(30);
        std::thread::sleep(pause);
        ch.Close();

        if !verdict.Recv().0 {
            t.Fatalf(Sprintf!("chan[%d]: received non-zero from closed chan", capacity));
        }
        let _ = receiver.Wait();
    }
}}
// One producer sends 0..100 and one consumer receives; the values must arrive
// in exactly the order they were sent, for each channel capacity tried.
test!{ fn TestChan_FIFO(t) {
    for capacity in [0i64, 1, 4, 16, 100] {
        let ch = chan!(i64, capacity as usize);
        let tx = ch.clone();
        let producer = go!{
            let mut next = 0;
            while next < 100 {
                let _ = tx.Send(next);
                next += 1;
            }
        };

        for expected in 0..100 {
            let (got, ok) = ch.Recv();
            if !ok {
                let msg = Sprintf!("chan[%d]: receive failed at %d", capacity, expected);
                t.Fatalf(msg);
            }
            if got != expected {
                let msg = Sprintf!("chan[%d]: received %v, expected %v", capacity, got, expected);
                t.Fatalf(msg);
            }
        }
        let _ = producer.Wait();
    }
}}
// Multi-producer / multi-consumer stress test: P producers each send 0..L and
// P consumers each receive L values. Merged across consumers, every value in
// 0..L must have been seen exactly P times.
test!{ fn TestChan_MPMCBigFanout(t) {
    const P: i32 = 4;
    const L: i32 = 1000;
    // value -> number of times it was received
    type Tally = std::collections::HashMap<i32, i32>;

    for chanCap in [0, 1, 4, 16, 100] {
        let c = chan!(i32, chanCap);

        let mut producers = Vec::with_capacity(P as usize);
        for _ in 0..P {
            let tx = c.clone();
            let handle = go!{
                for value in 0..L { let _ = tx.Send(value); }
            };
            producers.push(handle);
        }

        // Each consumer hands back its private tally through this channel.
        let results = chan!(Tally, P as usize);
        let mut consumers = Vec::with_capacity(P as usize);
        for _ in 0..P {
            let rx = c.clone();
            let results_tx = results.clone();
            let handle = go!{
                let mut seen = Tally::new();
                for _ in 0..L {
                    let value = rx.Recv().0;
                    *seen.entry(value).or_default() += 1;
                }
                let _ = results_tx.Send(seen);
            };
            consumers.push(handle);
        }

        // Merge the per-consumer tallies into one.
        let mut total = Tally::new();
        for _ in 0..P {
            let partial = results.Recv().0;
            for (value, count) in partial {
                *total.entry(value).or_default() += count;
            }
        }

        producers.into_iter().for_each(|handle| { let _ = handle.Wait(); });
        consumers.into_iter().for_each(|handle| { let _ = handle.Wait(); });

        let distinct = total.len() as i32;
        if distinct != L {
            t.Fatalf(Sprintf!("chan[cap=%d]: received %v distinct values, expected %v",
                chanCap, distinct, L));
        }
        // Report every value whose multiplicity is not exactly P.
        for (k, v) in total.iter().filter(|(_, count)| **count != P) {
            t.Fatalf(Sprintf!("chan[cap=%d]: key %v received %v times, expected %v",
                chanCap, k, v, P));
        }
    }
}}
// Len/Cap bookkeeping: a new buffered channel reports length 0 and the
// requested capacity; after filling it, length equals capacity.
test!{ fn TestChan_LenCap(t) {
    for capacity in [1i64, 2, 4, 8, 16] {
        let ch = chan!(i64, capacity as usize);

        let starts_empty = ch.Len() == 0 && ch.Cap() == capacity;
        if !starts_empty {
            t.Fatalf(Sprintf!("chan[%d]: bad initial len/cap %v/%v", capacity, ch.Len(), ch.Cap()));
        }

        let mut next = 0i64;
        while next < capacity {
            let _ = ch.Send(next);
            next += 1;
        }

        let ends_full = ch.Len() == capacity && ch.Cap() == capacity;
        if !ends_full {
            t.Fatalf(Sprintf!("chan[%d]: bad full len/cap %v/%v", capacity, ch.Len(), ch.Cap()));
        }
    }
}}
// Sending on an already-closed channel must come back with a non-nil error.
test!{ fn TestChan_SendOnClosed(t) {
    let closed = chan!(i64, 1);
    closed.Close();
    if closed.Send(42) == nil {
        t.Error("send on closed channel should return error");
    }
}}
// A single value handed across an unbuffered channel from a spawned goroutine
// must arrive intact with `ok == true`.
test!{ fn TestChan_UnbufferedRendezvous(t) {
    let ch = chan!(i32);
    let tx = ch.clone();
    let sender = go!{ let _ = tx.Send(42); };

    let (value, ok) = ch.Recv();
    let _ = sender.Wait();

    match (value, ok) {
        (42, true) => {}
        _ => {
            t.Errorf(Sprintf!("unbuffered recv: got (%d, %v); want (42, true)", value, ok));
        }
    }
}}
// Closing a buffered channel must not discard values already queued: they are
// still delivered in order with `ok == true`. Once the buffer is empty, a
// receive must report `ok == false` and carry the zero value — the same
// `(0, false)` contract that TestChan_ReceiveZeroFromClosed checks.
test!{ fn TestChan_CloseDrainsBuffered(t) {
    let c = chan!(i64, 4);
    let _ = c.Send(1);
    let _ = c.Send(2);
    let _ = c.Send(3);
    c.Close();

    // Buffered values survive the Close and come out FIFO.
    for expect in [1i64, 2, 3] {
        let (v, ok) = c.Recv();
        if !ok { t.Fatalf(Sprintf!("expected ok recv of %d", expect)); }
        if v != expect { t.Errorf(Sprintf!("recv got %d, want %d", v, expect)); }
    }

    // Drained and closed: the receive must fail and yield the zero value.
    let (v, ok) = c.Recv();
    if ok {
        t.Errorf(Sprintf!("expected !ok after drain; got (%d, true)", v));
    } else if v != 0 {
        // Previously unchecked: a stale non-zero value alongside ok == false
        // would have slipped through.
        t.Errorf(Sprintf!("expected zero value after drain; got (%d, false)", v));
    }
}}
// N single-shot producers each send their own index into a shared buffered
// channel while one consumer adds up N received values; the final sum must
// equal 0 + 1 + ... + (N - 1).
test!{ fn TestChan_ManyGoroutines(t) {
    const N: i32 = 200;
    let ch = chan!(i32, 16);
    let sum = Arc::new(AtomicI32::new(0));

    let rx = ch.clone();
    let sum_in_consumer = Arc::clone(&sum);
    let consumer = go!{
        for _ in 0..N {
            let value = rx.Recv().0;
            sum_in_consumer.fetch_add(value, Ordering::SeqCst);
        }
    };

    let mut producers = Vec::with_capacity(N as usize);
    for index in 0..N {
        let tx = ch.clone();
        let handle = go!{ let _ = tx.Send(index); };
        producers.push(handle);
    }
    for handle in producers {
        let _ = handle.Wait();
    }
    let _ = consumer.Wait();

    // Arithmetic series 0..N.
    let want = N * (N - 1) / 2;
    let have = sum.load(Ordering::SeqCst);
    if have != want {
        t.Errorf(Sprintf!("many goroutines sum: got %d, want %d", have, want));
    }
}}
// Races a non-blocking receive (select with a default arm) in a goroutine
// against Close + Recv on the main thread. The channel starts with one
// buffered value, so the select is expected to take the recv arm every time;
// each iteration that falls through to `default` is counted as an error.
test!{ fn TestChan_NonblockRecvRace(t) {
    let n = 1000;
    let misses = Arc::new(std::sync::atomic::AtomicU32::new(0));

    for _ in 0..n {
        let ch = chan!(i32, 1);
        let _ = ch.Send(1);

        let rx = ch.clone();
        let misses_in_task = Arc::clone(&misses);
        let racer = go!{
            goish::select!{
                recv(rx) => {},
                default => { misses_in_task.fetch_add(1, Ordering::SeqCst); },
            }
        };

        ch.Close();
        let _ = ch.Recv();
        let _ = racer.Wait();
    }

    let miss_count = misses.load(Ordering::SeqCst);
    if miss_count > 0 {
        t.Errorf(Sprintf!("non-blocking recv raced in %v/%v iterations", miss_count as i32, n as i32));
    }
}}
// Informational only: logs the value of `goish::chan::ENGINE` and asserts nothing.
test!{ fn TestChan_EngineReport(t) {
    let report = Sprintf!("chan engine = %s", goish::chan::ENGINE);
    t.Logf(report);
}}