use std::sync::{Arc, Mutex};
use bock_interp::{BuiltinRegistry, ChannelHandle, RuntimeError, TypeTag, Value};
/// Installs this module's channel and task builtins into `registry`.
///
/// Registers one global constructor (`Channel.new`), three methods under
/// `TypeTag::Channel` (`send`, `recv`, `close`), and the global `spawn`.
pub fn register(registry: &mut BuiltinRegistry) {
// Global constructor, looked up by the name `Channel.new`.
registry.register_global("Channel.new", channel_new);
// Methods registered under the `Channel` type tag.
registry.register(TypeTag::Channel, "send", channel_send);
registry.register(TypeTag::Channel, "recv", channel_recv);
registry.register(TypeTag::Channel, "close", channel_close);
// Global `spawn`, which only accepts a `Value::Future`.
registry.register_global("spawn", builtin_spawn);
}
/// Builtin `Channel.new`: creates a fresh channel handle.
///
/// Arguments are ignored. Returns a two-element `Value::Tuple` whose items
/// are both `Value::Channel` wrappers around clones of the *same*
/// `ChannelHandle`, so either element can be used to send or receive.
fn channel_new(_args: &[Value]) -> Result<Value, RuntimeError> {
    let original = ChannelHandle::new();
    // The first tuple slot holds the clone, the second the original handle.
    let duplicate = original.clone();
    let pair = vec![Value::Channel(duplicate), Value::Channel(original)];
    Ok(Value::Tuple(pair))
}
/// Builtin `send` for `Channel` values: pushes a value into the channel.
///
/// Expected arguments:
/// * `args[0]` — the `Value::Channel` receiver of the method call.
/// * `args[1]` — the value to send (cloned before being handed to the sender).
///
/// Any arguments beyond the second are ignored.
///
/// Returns `Value::Void` on success.
///
/// # Errors
/// * `RuntimeError::TypeError` if `args[0]` is not a `Value::Channel`.
/// * `RuntimeError::ArityMismatch` if the receiver or the value is missing.
///   A missing value is reported rather than silently sending `Value::Void`,
///   which would hide a caller bug from whoever is receiving.
/// * `RuntimeError::TypeError` if the underlying `sender.send` call fails.
fn channel_send(args: &[Value]) -> Result<Value, RuntimeError> {
    // Borrow the handle instead of cloning it: it is only needed for the
    // duration of this call.
    let (handle, val) = match args {
        [Value::Channel(h), v, ..] => (h, v.clone()),
        [Value::Channel(_)] => {
            return Err(RuntimeError::ArityMismatch {
                expected: 2,
                got: 1,
            });
        }
        [other, ..] => {
            return Err(RuntimeError::TypeError(format!(
                "Channel.send called on non-Channel: {other}"
            )));
        }
        [] => {
            return Err(RuntimeError::ArityMismatch {
                expected: 2,
                got: 0,
            });
        }
    };
    handle.sender.send(val).map_err(|_| {
        RuntimeError::TypeError("Channel.send: receiver has been dropped".to_string())
    })?;
    Ok(Value::Void)
}
/// Builtin `recv` for `Channel` values: asynchronously receives one message.
///
/// `args[0]` must be the `Value::Channel` receiver; further arguments are
/// ignored. The call does not wait for a message itself: it spawns a Tokio
/// task that locks the handle's `receiver`, awaits the next message, and
/// returns immediately with a `Value::Future` wrapping that task's join
/// handle.
///
/// Because this uses `tokio::spawn`, it must be invoked from within a Tokio
/// runtime context.
///
/// # Errors
/// * `RuntimeError::TypeError` if `args[0]` is not a `Value::Channel`.
/// * `RuntimeError::ArityMismatch` if no arguments are supplied.
///
/// The spawned task itself resolves to `RuntimeError::TypeError` when
/// `recv` yields `None`.
fn channel_recv(args: &[Value]) -> Result<Value, RuntimeError> {
    // The handle is cloned because the spawned task must own it.
    let channel = match args {
        [Value::Channel(h), ..] => h.clone(),
        [other, ..] => {
            return Err(RuntimeError::TypeError(format!(
                "Channel.recv called on non-Channel: {other}"
            )));
        }
        [] => {
            return Err(RuntimeError::ArityMismatch {
                expected: 1,
                got: 0,
            });
        }
    };

    let task = tokio::spawn(async move {
        let mut receiver = channel.receiver.lock().await;
        let next = receiver.recv().await;
        next.ok_or_else(|| {
            RuntimeError::TypeError(
                "Channel.recv: channel closed with no more messages".to_string(),
            )
        })
    });

    // `Option` lets the consumer `take()` the join handle out of the slot.
    let slot = Mutex::new(Some(task));
    Ok(Value::Future(Arc::new(slot)))
}
/// Builtin `close` for `Channel` values.
///
/// Currently a no-op: every argument is ignored (the receiver is not even
/// type-checked) and `Value::Void` is always returned.
///
/// NOTE(review): nothing is actually closed here; presumably a channel only
/// ends once its senders are dropped — confirm whether an explicit close is
/// intended.
fn channel_close(_args: &[Value]) -> Result<Value, RuntimeError> {
    let nothing = Value::Void;
    Ok(nothing)
}
/// Builtin `spawn`: accepts a `Value::Future` and hands it back unchanged.
///
/// Only `args[0]` is inspected; extra arguments are ignored. The returned
/// value is a clone of the first argument.
///
/// # Errors
/// * `RuntimeError::TypeError` if `args[0]` is not a `Value::Future`.
/// * `RuntimeError::ArityMismatch` if called with no arguments.
fn builtin_spawn(args: &[Value]) -> Result<Value, RuntimeError> {
    match args {
        [] => Err(RuntimeError::ArityMismatch {
            expected: 1,
            got: 0,
        }),
        [future @ Value::Future(_), ..] => Ok(future.clone()),
        [other, ..] => Err(RuntimeError::TypeError(format!(
            "spawn expects the result of an async fn call (Future), got {other}"
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bock_interp::BockString;

    /// Builds a registry with this module's builtins installed.
    fn reg() -> BuiltinRegistry {
        let mut registry = BuiltinRegistry::new();
        register(&mut registry);
        registry
    }

    /// `Channel.new` yields a two-element tuple of channel values.
    #[tokio::test]
    async fn channel_new_returns_tuple_of_two_channels() {
        let registry = reg();
        let created = registry.call_global("Channel.new", &[]).unwrap().unwrap();
        match created {
            Value::Tuple(items) => {
                assert_eq!(items.len(), 2);
                assert!(items.iter().all(|item| matches!(item, Value::Channel(_))));
            }
            other => panic!("expected Tuple, got {other}"),
        }
    }

    /// A value sent through one tuple element is received from the other.
    #[tokio::test]
    async fn channel_send_recv_roundtrip() {
        let registry = reg();
        let created = registry.call_global("Channel.new", &[]).unwrap().unwrap();
        let (tx, rx) = match created {
            Value::Tuple(parts) => {
                let mut parts = parts.into_iter();
                let first = parts.next().unwrap();
                let second = parts.next().unwrap();
                (first, second)
            }
            _ => unreachable!(),
        };

        let msg = Value::String(BockString::new("hello"));
        let send_args = [tx.clone(), msg.clone()];
        registry
            .call(TypeTag::Channel, "send", &send_args)
            .unwrap()
            .unwrap();

        let recv_args = [rx.clone()];
        let pending = registry
            .call(TypeTag::Channel, "recv", &recv_args)
            .unwrap()
            .unwrap();
        let slot = match pending {
            Value::Future(inner) => inner,
            _ => panic!("expected Future"),
        };

        // Take the join handle in its own statement so the mutex guard is
        // released before awaiting.
        let join_handle = slot.lock().unwrap().take().unwrap();
        let received = join_handle.await.unwrap().unwrap();
        assert_eq!(received, msg);
    }

    /// `spawn` passes a future argument straight through.
    #[tokio::test]
    async fn spawn_returns_future_unchanged() {
        let registry = reg();
        let task = tokio::spawn(async { Ok(Value::Int(42)) });
        let argument = Value::Future(Arc::new(Mutex::new(Some(task))));
        let returned = registry.call_global("spawn", &[argument]).unwrap().unwrap();
        assert!(matches!(returned, Value::Future(_)));
    }

    /// `spawn` rejects anything that is not a future with a type error.
    #[test]
    fn spawn_rejects_non_future() {
        let registry = reg();
        let outcome = registry.call_global("spawn", &[Value::Int(42)]).unwrap();
        assert!(matches!(outcome, Err(RuntimeError::TypeError(_))));
    }
}