use super::*;
use crate::scheduler::{spawn_strand, wait_all_strands};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
#[test]
fn test_make_channel() {
unsafe {
let stack = crate::stack::alloc_test_stack();
let stack = make_channel(stack);
let (_stack, value) = pop(stack);
assert!(matches!(value, Value::Channel(_)));
}
}
#[test]
fn test_send_receive() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_empty_stack, channel_value) = pop(stack);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
stack = push(stack, channel_value.clone());
stack = send(stack);
let (stack, send_success) = pop(stack);
assert_eq!(send_success, Value::Bool(true));
let mut stack = push(stack, channel_value);
stack = receive(stack);
let (stack, recv_success) = pop(stack);
let (_stack, received) = pop(stack);
assert_eq!(recv_success, Value::Bool(true));
assert_eq!(received, Value::Int(42));
}
}
#[test]
fn test_channel_dup_sharing() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, ch1) = pop(stack);
let ch2 = ch1.clone();
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
stack = push(stack, ch1);
stack = send(stack);
let (stack, _) = pop(stack);
let mut stack = push(stack, ch2);
stack = receive(stack);
let (stack, _) = pop(stack);
let (_, received) = pop(stack);
assert_eq!(received, Value::Int(99));
}
}
#[test]
fn test_multiple_sends_receives() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
for i in 1..=5 {
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
stack = push(stack, channel_value.clone());
stack = send(stack);
let (_, success) = pop(stack);
assert_eq!(success, Value::Bool(true));
}
for i in 1..=5 {
let mut stack = push(crate::stack::alloc_test_stack(), channel_value.clone());
stack = receive(stack);
let (stack, success) = pop(stack);
let (_, received) = pop(stack);
assert_eq!(success, Value::Bool(true));
assert_eq!(received, Value::Int(i));
}
}
}
#[test]
fn test_close_channel() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let _stack = close_channel(stack);
}
}
#[test]
fn test_arena_string_send_between_strands() {
unsafe {
static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
static VERIFIED: AtomicBool = AtomicBool::new(false);
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
let ch_ptr = match &channel_value {
Value::Channel(arc) => Arc::as_ptr(arc) as i64,
_ => panic!("Expected Channel"),
};
CHANNEL_PTR.store(ch_ptr, Ordering::Release);
std::mem::forget(channel_value.clone());
extern "C" fn sender(_stack: Stack) -> Stack {
use crate::seqstring::arena_string;
use crate::value::ChannelData;
unsafe {
let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
let msg = arena_string("Arena message!");
assert!(!msg.is_global(), "Should be arena-allocated initially");
let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
let stack = push(stack, Value::Channel(channel_clone));
let stack = send(stack);
let (stack, _) = pop(stack);
stack
}
}
extern "C" fn receiver(_stack: Stack) -> Stack {
use crate::value::ChannelData;
unsafe {
let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
let mut stack = push(
crate::stack::alloc_test_stack(),
Value::Channel(channel_clone),
);
stack = receive(stack);
let (stack, _) = pop(stack);
let (_, msg_val) = pop(stack);
match msg_val {
Value::String(s) => {
assert_eq!(s.as_str(), "Arena message!");
assert!(s.is_global(), "Received string should be global");
VERIFIED.store(true, Ordering::Release);
}
_ => panic!("Expected String"),
}
std::ptr::null_mut()
}
}
spawn_strand(sender);
spawn_strand(receiver);
wait_all_strands();
assert!(
VERIFIED.load(Ordering::Acquire),
"Receiver should have verified"
);
}
}
#[test]
fn test_send_success() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
stack = push(stack, channel_value.clone());
stack = send(stack);
let (stack, result) = pop(stack);
assert_eq!(result, Value::Bool(true));
let mut stack = push(stack, channel_value);
stack = receive(stack);
let (stack, success) = pop(stack);
let (_, received) = pop(stack);
assert_eq!(success, Value::Bool(true));
assert_eq!(received, Value::Int(42));
}
}
#[test]
fn test_send_wrong_type() {
unsafe {
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
stack = push(stack, Value::Int(999)); stack = send(stack);
let (_stack, result) = pop(stack);
assert_eq!(result, Value::Bool(false));
}
}
#[test]
fn test_receive_success() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
stack = push(stack, channel_value.clone());
stack = send(stack);
let (_, _) = pop(stack);
let mut stack = push(crate::stack::alloc_test_stack(), channel_value);
stack = receive(stack);
let (stack, success) = pop(stack);
let (_stack, value) = pop(stack);
assert_eq!(success, Value::Bool(true));
assert_eq!(value, Value::Int(42));
}
}
#[test]
fn test_receive_wrong_type() {
unsafe {
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(999));
stack = receive(stack);
let (stack, success) = pop(stack);
let (_stack, value) = pop(stack);
assert_eq!(success, Value::Bool(false));
assert_eq!(value, Value::Int(0));
}
}
#[test]
fn test_mpmc_concurrent_receivers() {
unsafe {
const NUM_MESSAGES: i64 = 100;
const NUM_RECEIVERS: usize = 4;
static RECEIVER_COUNTS: [AtomicI64; 4] = [
AtomicI64::new(0),
AtomicI64::new(0),
AtomicI64::new(0),
AtomicI64::new(0),
];
static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
for counter in &RECEIVER_COUNTS {
counter.store(0, Ordering::SeqCst);
}
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
let ch_ptr = match &channel_value {
Value::Channel(arc) => Arc::as_ptr(arc) as i64,
_ => panic!("Expected Channel"),
};
CHANNEL_PTR.store(ch_ptr, Ordering::SeqCst);
for _ in 0..(NUM_RECEIVERS + 1) {
std::mem::forget(channel_value.clone());
}
fn make_receiver(idx: usize) -> extern "C" fn(Stack) -> Stack {
match idx {
0 => receiver_0,
1 => receiver_1,
2 => receiver_2,
3 => receiver_3,
_ => panic!("Invalid receiver index"),
}
}
extern "C" fn receiver_0(stack: Stack) -> Stack {
receive_loop(0, stack)
}
extern "C" fn receiver_1(stack: Stack) -> Stack {
receive_loop(1, stack)
}
extern "C" fn receiver_2(stack: Stack) -> Stack {
receive_loop(2, stack)
}
extern "C" fn receiver_3(stack: Stack) -> Stack {
receive_loop(3, stack)
}
fn receive_loop(idx: usize, _stack: Stack) -> Stack {
use crate::value::ChannelData;
unsafe {
let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
loop {
let mut stack = push(
crate::stack::alloc_test_stack(),
Value::Channel(channel_clone.clone()),
);
stack = receive(stack);
let (stack, success) = pop(stack);
let (_, value) = pop(stack);
match (success, value) {
(Value::Bool(true), Value::Int(v)) => {
if v < 0 {
break; }
RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);
}
_ => break,
}
may::coroutine::yield_now();
}
std::ptr::null_mut()
}
}
for i in 0..NUM_RECEIVERS {
spawn_strand(make_receiver(i));
}
std::thread::sleep(std::time::Duration::from_millis(10));
for i in 0..NUM_MESSAGES {
let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
stack = push(stack, Value::Channel(channel_clone));
let _ = send(stack);
}
for _ in 0..NUM_RECEIVERS {
let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(-1));
stack = push(stack, Value::Channel(channel_clone));
let _ = send(stack);
}
wait_all_strands();
let total_received: i64 = RECEIVER_COUNTS
.iter()
.map(|c| c.load(Ordering::SeqCst))
.sum();
assert_eq!(total_received, NUM_MESSAGES);
let active_receivers = RECEIVER_COUNTS
.iter()
.filter(|c| c.load(Ordering::SeqCst) > 0)
.count();
assert!(active_receivers >= 2, "Messages should be distributed");
}
}