use crate::stack::{Stack, pop, push};
use crate::value::{ChannelData, Value};
use may::sync::mpmc;
use std::sync::Arc;
#[cfg(feature = "diagnostics")]
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(feature = "diagnostics")]
pub static TOTAL_MESSAGES_SENT: AtomicU64 = AtomicU64::new(0);
#[cfg(feature = "diagnostics")]
pub static TOTAL_MESSAGES_RECEIVED: AtomicU64 = AtomicU64::new(0);
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
let (sender, receiver) = mpmc::channel();
let channel = Arc::new(ChannelData { sender, receiver });
unsafe { push(stack, Value::Channel(channel)) }
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
assert!(!stack.is_null(), "close_channel: stack is empty");
let (rest, channel_value) = unsafe { pop(stack) };
match channel_value {
Value::Channel(_) => {} _ => panic!(
"close_channel: expected Channel on stack, got {:?}",
channel_value
),
}
rest
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
assert!(!stack.is_null(), "chan.send: stack is empty");
let (stack, channel_value) = unsafe { pop(stack) };
let channel = match channel_value {
Value::Channel(ch) => ch,
_ => {
if !stack.is_null() {
let (rest, _value) = unsafe { pop(stack) };
return unsafe { push(rest, Value::Bool(false)) };
}
return unsafe { push(stack, Value::Bool(false)) };
}
};
if stack.is_null() {
return unsafe { push(stack, Value::Bool(false)) };
}
let (rest, value) = unsafe { pop(stack) };
let global_value = value.clone();
match channel.sender.send(global_value) {
Ok(()) => {
#[cfg(feature = "diagnostics")]
TOTAL_MESSAGES_SENT.fetch_add(1, Ordering::Relaxed);
unsafe { push(rest, Value::Bool(true)) }
}
Err(_) => unsafe { push(rest, Value::Bool(false)) },
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
assert!(!stack.is_null(), "chan.receive: stack is empty");
let (rest, channel_value) = unsafe { pop(stack) };
let channel = match channel_value {
Value::Channel(ch) => ch,
_ => {
let stack = unsafe { push(rest, Value::Int(0)) };
return unsafe { push(stack, Value::Bool(false)) };
}
};
match channel.receiver.recv() {
Ok(value) => {
#[cfg(feature = "diagnostics")]
TOTAL_MESSAGES_RECEIVED.fetch_add(1, Ordering::Relaxed);
let stack = unsafe { push(rest, value) };
unsafe { push(stack, Value::Bool(true)) }
}
Err(_) => {
let stack = unsafe { push(rest, Value::Int(0)) };
unsafe { push(stack, Value::Bool(false)) }
}
}
}
pub use patch_seq_chan_receive as receive;
pub use patch_seq_chan_send as send;
pub use patch_seq_close_channel as close_channel;
pub use patch_seq_make_channel as make_channel;
#[cfg(test)]
mod tests {
use super::*;
use crate::scheduler::{spawn_strand, wait_all_strands};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
#[test]
fn test_make_channel() {
unsafe {
let stack = crate::stack::alloc_test_stack();
let stack = make_channel(stack);
let (_stack, value) = pop(stack);
assert!(matches!(value, Value::Channel(_)));
}
}
#[test]
fn test_send_receive() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_empty_stack, channel_value) = pop(stack);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
stack = push(stack, channel_value.clone());
stack = send(stack);
let (stack, send_success) = pop(stack);
assert_eq!(send_success, Value::Bool(true));
let mut stack = push(stack, channel_value);
stack = receive(stack);
let (stack, recv_success) = pop(stack);
let (_stack, received) = pop(stack);
assert_eq!(recv_success, Value::Bool(true));
assert_eq!(received, Value::Int(42));
}
}
#[test]
fn test_channel_dup_sharing() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, ch1) = pop(stack);
let ch2 = ch1.clone();
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(99));
stack = push(stack, ch1);
stack = send(stack);
let (stack, _) = pop(stack);
let mut stack = push(stack, ch2);
stack = receive(stack);
let (stack, _) = pop(stack);
let (_, received) = pop(stack);
assert_eq!(received, Value::Int(99));
}
}
#[test]
fn test_multiple_sends_receives() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
for i in 1..=5 {
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
stack = push(stack, channel_value.clone());
stack = send(stack);
let (_, success) = pop(stack);
assert_eq!(success, Value::Bool(true));
}
for i in 1..=5 {
let mut stack = push(crate::stack::alloc_test_stack(), channel_value.clone());
stack = receive(stack);
let (stack, success) = pop(stack);
let (_, received) = pop(stack);
assert_eq!(success, Value::Bool(true));
assert_eq!(received, Value::Int(i));
}
}
}
#[test]
fn test_close_channel() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let _stack = close_channel(stack);
}
}
#[test]
fn test_arena_string_send_between_strands() {
unsafe {
static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
static VERIFIED: AtomicBool = AtomicBool::new(false);
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
let ch_ptr = match &channel_value {
Value::Channel(arc) => Arc::as_ptr(arc) as i64,
_ => panic!("Expected Channel"),
};
CHANNEL_PTR.store(ch_ptr, Ordering::Release);
std::mem::forget(channel_value.clone());
extern "C" fn sender(_stack: Stack) -> Stack {
use crate::seqstring::arena_string;
use crate::value::ChannelData;
unsafe {
let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
let msg = arena_string("Arena message!");
assert!(!msg.is_global(), "Should be arena-allocated initially");
let stack = push(crate::stack::alloc_test_stack(), Value::String(msg));
let stack = push(stack, Value::Channel(channel_clone));
let stack = send(stack);
let (stack, _) = pop(stack);
stack
}
}
extern "C" fn receiver(_stack: Stack) -> Stack {
use crate::value::ChannelData;
unsafe {
let ch_ptr = CHANNEL_PTR.load(Ordering::Acquire) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
let mut stack = push(
crate::stack::alloc_test_stack(),
Value::Channel(channel_clone),
);
stack = receive(stack);
let (stack, _) = pop(stack);
let (_, msg_val) = pop(stack);
match msg_val {
Value::String(s) => {
assert_eq!(s.as_str(), "Arena message!");
assert!(s.is_global(), "Received string should be global");
VERIFIED.store(true, Ordering::Release);
}
_ => panic!("Expected String"),
}
std::ptr::null_mut()
}
}
spawn_strand(sender);
spawn_strand(receiver);
wait_all_strands();
assert!(
VERIFIED.load(Ordering::Acquire),
"Receiver should have verified"
);
}
}
#[test]
fn test_send_success() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
stack = push(stack, channel_value.clone());
stack = send(stack);
let (stack, result) = pop(stack);
assert_eq!(result, Value::Bool(true));
let mut stack = push(stack, channel_value);
stack = receive(stack);
let (stack, success) = pop(stack);
let (_, received) = pop(stack);
assert_eq!(success, Value::Bool(true));
assert_eq!(received, Value::Int(42));
}
}
#[test]
fn test_send_wrong_type() {
unsafe {
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
stack = push(stack, Value::Int(999)); stack = send(stack);
let (_stack, result) = pop(stack);
assert_eq!(result, Value::Bool(false));
}
}
#[test]
fn test_receive_success() {
unsafe {
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(42));
stack = push(stack, channel_value.clone());
stack = send(stack);
let (_, _) = pop(stack);
let mut stack = push(crate::stack::alloc_test_stack(), channel_value);
stack = receive(stack);
let (stack, success) = pop(stack);
let (_stack, value) = pop(stack);
assert_eq!(success, Value::Bool(true));
assert_eq!(value, Value::Int(42));
}
}
#[test]
fn test_receive_wrong_type() {
unsafe {
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(999));
stack = receive(stack);
let (stack, success) = pop(stack);
let (_stack, value) = pop(stack);
assert_eq!(success, Value::Bool(false));
assert_eq!(value, Value::Int(0));
}
}
#[test]
fn test_mpmc_concurrent_receivers() {
unsafe {
const NUM_MESSAGES: i64 = 100;
const NUM_RECEIVERS: usize = 4;
static RECEIVER_COUNTS: [AtomicI64; 4] = [
AtomicI64::new(0),
AtomicI64::new(0),
AtomicI64::new(0),
AtomicI64::new(0),
];
static CHANNEL_PTR: AtomicI64 = AtomicI64::new(0);
for counter in &RECEIVER_COUNTS {
counter.store(0, Ordering::SeqCst);
}
let mut stack = crate::stack::alloc_test_stack();
stack = make_channel(stack);
let (_, channel_value) = pop(stack);
let ch_ptr = match &channel_value {
Value::Channel(arc) => Arc::as_ptr(arc) as i64,
_ => panic!("Expected Channel"),
};
CHANNEL_PTR.store(ch_ptr, Ordering::SeqCst);
for _ in 0..(NUM_RECEIVERS + 1) {
std::mem::forget(channel_value.clone());
}
fn make_receiver(idx: usize) -> extern "C" fn(Stack) -> Stack {
match idx {
0 => receiver_0,
1 => receiver_1,
2 => receiver_2,
3 => receiver_3,
_ => panic!("Invalid receiver index"),
}
}
extern "C" fn receiver_0(stack: Stack) -> Stack {
receive_loop(0, stack)
}
extern "C" fn receiver_1(stack: Stack) -> Stack {
receive_loop(1, stack)
}
extern "C" fn receiver_2(stack: Stack) -> Stack {
receive_loop(2, stack)
}
extern "C" fn receiver_3(stack: Stack) -> Stack {
receive_loop(3, stack)
}
fn receive_loop(idx: usize, _stack: Stack) -> Stack {
use crate::value::ChannelData;
unsafe {
let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
loop {
let mut stack = push(
crate::stack::alloc_test_stack(),
Value::Channel(channel_clone.clone()),
);
stack = receive(stack);
let (stack, success) = pop(stack);
let (_, value) = pop(stack);
match (success, value) {
(Value::Bool(true), Value::Int(v)) => {
if v < 0 {
break; }
RECEIVER_COUNTS[idx].fetch_add(1, Ordering::SeqCst);
}
_ => break,
}
may::coroutine::yield_now();
}
std::ptr::null_mut()
}
}
for i in 0..NUM_RECEIVERS {
spawn_strand(make_receiver(i));
}
std::thread::sleep(std::time::Duration::from_millis(10));
for i in 0..NUM_MESSAGES {
let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(i));
stack = push(stack, Value::Channel(channel_clone));
let _ = send(stack);
}
for _ in 0..NUM_RECEIVERS {
let ch_ptr = CHANNEL_PTR.load(Ordering::SeqCst) as *const ChannelData;
let channel = Arc::from_raw(ch_ptr);
let channel_clone = Arc::clone(&channel);
std::mem::forget(channel);
let mut stack = push(crate::stack::alloc_test_stack(), Value::Int(-1));
stack = push(stack, Value::Channel(channel_clone));
let _ = send(stack);
}
wait_all_strands();
let total_received: i64 = RECEIVER_COUNTS
.iter()
.map(|c| c.load(Ordering::SeqCst))
.sum();
assert_eq!(total_received, NUM_MESSAGES);
let active_receivers = RECEIVER_COUNTS
.iter()
.filter(|c| c.load(Ordering::SeqCst) > 0)
.count();
assert!(active_receivers >= 2, "Messages should be distributed");
}
}
}