use std::sync::{
atomic::{AtomicI32, AtomicI64, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use go_lib::{
chan::chan,
go,
select,
sync::WaitGroup,
};
/// Smoke test: a string sent from a spawned goroutine is received by the
/// closure running under `go_lib::run`.
#[test]
fn hello_goroutine() {
    // The channel is created with a capacity argument of 0, outside the
    // runtime, and both halves are moved into the `run` closure.
    let (sender, receiver) = chan::<&'static str>(0);
    go_lib::run(move || {
        go!(move || {
            sender.send("hello");
        });
        let greeting = receiver.recv();
        assert_eq!(greeting, Some("hello"));
    });
}
/// Spawns `N` goroutines that each send one integer from `1..=N` into a
/// shared channel, sums the `N` received values and checks the total
/// against the closed form `N * (N + 1) / 2`.
#[test]
fn fan_out() {
    const N: i32 = 8;
    let published = Arc::new(AtomicI32::new(0));
    let publisher = Arc::clone(&published);
    go_lib::run(move || {
        // Capacity N: one slot per producer.
        let (tx, rx) = chan::<i32>(N as usize);
        for value in 1..=N {
            let producer = tx.clone();
            go!(move || {
                producer.send(value);
            });
        }
        // Exactly N receives; each must yield a value or the test panics.
        let total: i32 = (0..N).map(|_| rx.recv().unwrap()).sum();
        publisher.store(total, Ordering::Relaxed);
    });
    // Checked outside the runtime, after `run` has returned.
    assert_eq!(published.load(Ordering::Acquire), N * (N + 1) / 2);
}
/// Three-stage pipeline: a generator emits `1..=5`, a second goroutine
/// squares each value, and the `run` closure sums the squares
/// (1 + 4 + 9 + 16 + 25 = 55). Each stage closes its output channel once
/// its input is exhausted.
#[test]
fn pipeline_three_stage() {
    let result = Arc::new(AtomicI64::new(0));
    let result_writer = Arc::clone(&result);
    go_lib::run(move || {
        // Stage 1: generate the numbers, then close to signal end of stream.
        let (numbers_tx, numbers_rx) = chan::<i64>(0);
        go!(move || {
            for n in 1_i64..=5 {
                numbers_tx.send(n);
            }
            numbers_tx.close();
        });

        // Stage 2: square every incoming number; propagate the close
        // downstream once `recv` yields `None`.
        let (squares_tx, squares_rx) = chan::<i64>(0);
        go!(move || {
            while let Some(n) = numbers_rx.recv() {
                squares_tx.send(n * n);
            }
            squares_tx.close();
        });

        // Stage 3: drain the squares until `recv` yields `None`.
        let total: i64 = std::iter::from_fn(|| squares_rx.recv()).sum();
        result_writer.store(total, Ordering::Relaxed);
    });
    assert_eq!(result.load(Ordering::Acquire), 55);
}
/// Launches `N` goroutines that each bump a shared counter and signal a
/// `WaitGroup`; after `wait()` returns the counter must equal `N`, both
/// inside the runtime and after `run` has returned.
#[test]
fn waitgroup_fan_out() {
    const N: i32 = 16;
    let counter = Arc::new(AtomicI32::new(0));
    let counter2 = Arc::clone(&counter);
    go_lib::run(move || {
        let group = Arc::new(WaitGroup::new());
        for _ in 0..N {
            // Register the worker before spawning it.
            group.add(1);
            let worker_group = Arc::clone(&group);
            let worker_counter = Arc::clone(&counter2);
            go!(move || {
                worker_counter.fetch_add(1, Ordering::Relaxed);
                worker_group.done();
            });
        }
        group.wait();
        let seen = counter2.load(Ordering::Acquire);
        assert_eq!(seen, N);
    });
    assert_eq!(counter.load(Ordering::Acquire), N);
}
/// Two goroutines bounce an incrementing integer between a pair of
/// channels. The second goroutine counts every value it receives; the
/// `run` closure polls that counter until at least 10 hops are recorded
/// or a 500 ms deadline passes.
#[test]
fn ping_pong() {
    let hops = Arc::new(AtomicI32::new(0));
    let hops2 = Arc::clone(&hops);
    go_lib::run(move || {
        let (ping_tx, ping_rx) = chan::<i32>(0);
        let (pong_tx, pong_rx) = chan::<i32>(0);
        // Second sender for the ping channel, owned by the counting side.
        let ping_reply_tx = ping_tx.clone();

        // Forwarder: everything arriving on `ping` goes to `pong`, plus one.
        go!(move || {
            while let Some(value) = ping_rx.recv() {
                pong_tx.send(value + 1);
            }
        });

        // Counter: records a hop per message, then either keeps the rally
        // going or closes the ping channel once the value reaches 20.
        let hop_counter = Arc::clone(&hops2);
        go!(move || {
            while let Some(value) = pong_rx.recv() {
                hop_counter.fetch_add(1, Ordering::Relaxed);
                if value >= 20 {
                    ping_reply_tx.close();
                } else {
                    ping_reply_tx.send(value + 1);
                }
            }
        });

        // Serve the first ball.
        ping_tx.send(0);

        // Poll, yielding between checks, until enough hops were seen.
        let deadline = Instant::now() + Duration::from_millis(500);
        while hops2.load(Ordering::Acquire) < 10 {
            if Instant::now() > deadline {
                panic!("ping-pong timed out");
            }
            go_lib::gosched();
        }
    });
    assert!(hops.load(Ordering::Acquire) >= 10);
}
/// Two goroutines each send one value (10 and 20) on their own channel;
/// the `run` closure performs two `select!` rounds over both receivers
/// and expects the received values to add up to 30.
#[test]
fn select_fan_in() {
    let sum = Arc::new(AtomicI32::new(0));
    let sum_writer = Arc::clone(&sum);
    go_lib::run(move || {
        let (left_tx, left_rx) = chan::<i32>(0);
        let (right_tx, right_rx) = chan::<i32>(0);
        go!(move || {
            left_tx.send(10);
        });
        go!(move || {
            right_tx.send(20);
        });

        let mut collected = 0_i32;
        // One round per expected message.
        for _round in 0..2 {
            select! {
                recv(left_rx) -> msg => { collected += msg.unwrap(); }
                recv(right_rx) -> msg => { collected += msg.unwrap(); }
            }
        }
        sum_writer.store(collected, Ordering::Relaxed);
    });
    assert_eq!(sum.load(Ordering::Acquire), 30);
}
/// A worker goroutine counts ticks until it receives on a "done" channel.
/// The test sends three ticks, waits (with a 500 ms deadline) for all of
/// them to be counted, signals done, and finally checks that exactly
/// three ticks were recorded.
#[test]
fn done_channel_cancels_goroutine() {
    let ticks = Arc::new(AtomicI32::new(0));
    // One handle for the worker goroutine, one for the polling loop.
    let worker_ticks = Arc::clone(&ticks);
    let poll_ticks = Arc::clone(&ticks);
    go_lib::run(move || {
        let (done_tx, done_rx) = chan::<()>(0);
        let (tick_tx, tick_rx) = chan::<()>(4);

        go!(move || {
            loop {
                select! {
                    recv(done_rx) -> _v => { break; }
                    recv(tick_rx) -> _v => { worker_ticks.fetch_add(1, Ordering::Relaxed); }
                    default => { go_lib::gosched(); }
                }
            }
        });

        (0..3).for_each(|_| {
            tick_tx.send(());
        });

        // Yield until the worker has counted every tick, or give up.
        let deadline = Instant::now() + Duration::from_millis(500);
        while poll_ticks.load(Ordering::Acquire) < 3 {
            if Instant::now() > deadline {
                panic!("ticks not all processed in time");
            }
            go_lib::gosched();
        }

        done_tx.send(());
        // Yield a fixed number of times after signalling done before
        // leaving the runtime.
        for _ in 0..50 {
            go_lib::gosched();
        }
    });
    assert_eq!(ticks.load(Ordering::Acquire), 3);
}
/// Fills a channel of capacity `N` with `N` sends before any receiver
/// exists, then has a goroutine drain it and report how many values it
/// actually received.
///
/// Only `recv()` calls that yield `Some(_)` are counted, so the final
/// assertion fails if any receive comes back empty instead of silently
/// counting loop iterations.
#[test]
fn buffered_never_blocks_sender() {
    const N: usize = 64;
    let received = Arc::new(AtomicI32::new(0));
    let received2 = Arc::clone(&received);
    go_lib::run(move || {
        let wg = Arc::new(WaitGroup::new());
        let (tx, rx) = chan::<i32>(N);
        // All N sends happen before the receiver goroutine is spawned.
        for i in 0..N as i32 {
            tx.send(i);
        }
        wg.add(1);
        let wg2 = Arc::clone(&wg);
        go!(move || {
            let mut count = 0_i32;
            for _ in 0..N {
                // Count real deliveries only; a `None` must not be tallied.
                if rx.recv().is_some() {
                    count += 1;
                }
            }
            received2.store(count, Ordering::Relaxed);
            wg2.done();
        });
        wg.wait();
    });
    assert_eq!(received.load(Ordering::Acquire), N as i32);
}
/// A goroutine measures how long `go_lib::sleep(10 ms)` takes and stores
/// the elapsed milliseconds; the test requires at least 8 ms.
///
/// The atomic starts at `-1`, so a goroutine that never stores a value
/// also fails the assertion.
#[test]
fn sleep_completes() {
    let elapsed_ms = Arc::new(AtomicI64::new(-1));
    let e2 = Arc::clone(&elapsed_ms);
    go_lib::run(move || {
        let wg = Arc::new(WaitGroup::new());
        let wg2 = Arc::clone(&wg);
        wg.add(1);
        go!(move || {
            let t0 = Instant::now();
            go_lib::sleep(Duration::from_millis(10));
            // `as_millis()` is u128; the value here is a few milliseconds,
            // far below the i64 range, so the narrowing cast is lossless.
            e2.store(t0.elapsed().as_millis() as i64, Ordering::Relaxed);
            wg2.done();
        });
        wg.wait();
    });
    // `elapsed_ms` itself is never moved into a closure, so it can be read
    // directly without an additional `Arc` clone.
    let ms = elapsed_ms.load(Ordering::Acquire);
    assert!(ms >= 8, "slept too short: {ms} ms");
}
/// With a capacity-1 channel already holding one value, a `select!` with a
/// `send` arm and a `default` arm must take `default`. The value already
/// in the channel must still be receivable afterwards.
#[test]
fn select_send_drops_on_full_buffer() {
    let dropped = Arc::new(AtomicI32::new(0));
    let drop_flag = Arc::clone(&dropped);
    go_lib::run(move || {
        let (tx, rx) = chan::<String>(1);
        // Occupy the single buffer slot.
        tx.send("first".to_string());

        let overflow = "second".to_string();
        select! {
            send(tx, overflow) => { panic!("buffer was full, should have taken default"); }
            default => { drop_flag.store(1, Ordering::Relaxed); }
        }

        let head = rx.recv().unwrap();
        assert_eq!(head, "first");
    });
    assert_eq!(dropped.load(Ordering::Acquire), 1);
}
/// Reuses one `WaitGroup` across several rounds: each round spawns
/// `WORKERS` goroutines and waits for all of them before starting the
/// next. The shared counter must end at `ROUNDS * WORKERS`.
#[test]
fn waitgroup_reuse() {
    const ROUNDS: i32 = 3;
    const WORKERS: i32 = 4;
    let total = Arc::new(AtomicI32::new(0));
    let total2 = Arc::clone(&total);
    go_lib::run(move || {
        let group = Arc::new(WaitGroup::new());
        for _round in 0..ROUNDS {
            for _worker in 0..WORKERS {
                group.add(1);
                let worker_group = Arc::clone(&group);
                let tally = Arc::clone(&total2);
                go!(move || {
                    tally.fetch_add(1, Ordering::Relaxed);
                    worker_group.done();
                });
            }
            // Finish this round before the same group is used again.
            group.wait();
        }
    });
    assert_eq!(total.load(Ordering::Acquire), ROUNDS * WORKERS);
}
/// Spawns a goroutine that sets a flag, then blocks the current goroutine
/// for 5 ms inside `go_lib::with_syscall`, yields 100 times, and asserts
/// that the flag has been set.
///
/// NOTE(review): the assertion runs after the `gosched` loop, so it shows
/// the goroutine ran by that point, not strictly that it ran while
/// `with_syscall` was blocking — confirm whether that is the intent.
#[test]
fn with_syscall_unblocks_scheduler() {
    let other_ran = Arc::new(AtomicI32::new(0));
    let flag_setter = Arc::clone(&other_ran);
    go_lib::run(move || {
        go!(move || {
            flag_setter.store(1, Ordering::Release);
        });

        // Blocking OS-level sleep, wrapped so the runtime is told about it.
        go_lib::with_syscall(|| {
            std::thread::sleep(Duration::from_millis(5));
        });

        for _ in 0..100 {
            go_lib::gosched();
        }

        let ran = other_ran.load(Ordering::Acquire);
        assert_eq!(
            ran,
            1,
            "spawned goroutine should have run during with_syscall"
        );
    });
}