use crate::stack::{Stack, pop, push};
use crate::value::{ChannelData, Value};
use may::sync::mpmc;
use std::sync::Arc;
#[cfg(feature = "diagnostics")]
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(feature = "diagnostics")]
pub static TOTAL_MESSAGES_SENT: AtomicU64 = AtomicU64::new(0);
#[cfg(feature = "diagnostics")]
pub static TOTAL_MESSAGES_RECEIVED: AtomicU64 = AtomicU64::new(0);
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_make_channel(stack: Stack) -> Stack {
let (sender, receiver) = mpmc::channel();
let channel = Arc::new(ChannelData { sender, receiver });
unsafe { push(stack, Value::Channel(channel)) }
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_close_channel(stack: Stack) -> Stack {
assert!(!stack.is_null(), "close_channel: stack is empty");
let (rest, channel_value) = unsafe { pop(stack) };
match channel_value {
Value::Channel(_) => {} _ => panic!(
"close_channel: expected Channel on stack, got {:?}",
channel_value
),
}
rest
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_chan_send(stack: Stack) -> Stack {
assert!(!stack.is_null(), "chan.send: stack is empty");
let (stack, channel_value) = unsafe { pop(stack) };
let channel = match channel_value {
Value::Channel(ch) => ch,
_ => {
if !stack.is_null() {
let (rest, _value) = unsafe { pop(stack) };
return unsafe { push(rest, Value::Bool(false)) };
}
return unsafe { push(stack, Value::Bool(false)) };
}
};
if stack.is_null() {
return unsafe { push(stack, Value::Bool(false)) };
}
let (rest, value) = unsafe { pop(stack) };
let global_value = value.clone();
match channel.sender.send(global_value) {
Ok(()) => {
#[cfg(feature = "diagnostics")]
TOTAL_MESSAGES_SENT.fetch_add(1, Ordering::Relaxed);
unsafe { push(rest, Value::Bool(true)) }
}
Err(_) => unsafe { push(rest, Value::Bool(false)) },
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn patch_seq_chan_receive(stack: Stack) -> Stack {
assert!(!stack.is_null(), "chan.receive: stack is empty");
let (rest, channel_value) = unsafe { pop(stack) };
let channel = match channel_value {
Value::Channel(ch) => ch,
_ => {
let stack = unsafe { push(rest, Value::Int(0)) };
return unsafe { push(stack, Value::Bool(false)) };
}
};
match channel.receiver.recv() {
Ok(value) => {
#[cfg(feature = "diagnostics")]
TOTAL_MESSAGES_RECEIVED.fetch_add(1, Ordering::Relaxed);
let stack = unsafe { push(rest, value) };
unsafe { push(stack, Value::Bool(true)) }
}
Err(_) => {
let stack = unsafe { push(rest, Value::Int(0)) };
unsafe { push(stack, Value::Bool(false)) }
}
}
}
pub use patch_seq_chan_receive as receive;
pub use patch_seq_chan_send as send;
pub use patch_seq_close_channel as close_channel;
pub use patch_seq_make_channel as make_channel;
#[cfg(test)]
mod tests;