use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::shared_state::ScopedKey;
use crate::value::{VmAtomicHandle, VmChannelHandle, VmError, VmValue};
use crate::vm::Vm;
/// Bookkeeping for a single named circuit breaker stored in `CIRCUITS`.
struct CircuitState {
// Failures recorded since the breaker was created, reset, or last saw a success.
failures: usize,
// Failure count at which `circuit_record_failure` marks the breaker as opened.
threshold: usize,
// Milliseconds after opening at which `circuit_check` reports "half_open" instead of "open".
reset_ms: u64,
// Instant at which the breaker opened; `None` while it is closed.
opened_at: Option<std::time::Instant>,
}
// Thread-local registry of circuit breakers, keyed by the name passed to the
// `circuit_*` builtins. Being thread-local, each thread sees its own set of breakers.
thread_local! {
static CIRCUITS: RefCell<HashMap<String, CircuitState>> = RefCell::new(HashMap::new());
}
/// Builds the dict returned by the select builtins when a channel yielded a value.
///
/// The dict carries three keys: `index` (position of the channel in the
/// argument list), `value` (the received value) and `channel` (the channel name).
fn select_result(index: usize, value: VmValue, channel_name: &str) -> VmValue {
    let fields = BTreeMap::from([
        ("index".to_string(), VmValue::Int(index as i64)),
        ("value".to_string(), value),
        (
            "channel".to_string(),
            VmValue::String(Rc::from(channel_name)),
        ),
    ]);
    VmValue::Dict(Rc::new(fields))
}
/// Builds the "nothing selected" dict: `index` is `-1`, and both `value`
/// and `channel` are nil.
fn select_none() -> VmValue {
    let fields = BTreeMap::from([
        ("index".to_string(), VmValue::Int(-1)),
        ("value".to_string(), VmValue::Nil),
        ("channel".to_string(), VmValue::Nil),
    ]);
    VmValue::Dict(Rc::new(fields))
}
fn try_poll_channels(channels: &[VmValue]) -> (Option<(usize, VmValue, String)>, bool) {
let mut all_closed = true;
for (i, ch_val) in channels.iter().enumerate() {
if let VmValue::Channel(ch) = ch_val {
if let Ok(mut rx) = ch.receiver.try_lock() {
match rx.try_recv() {
Ok(val) => return (Some((i, val, ch.name.to_string())), false),
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
all_closed = false;
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {}
}
} else {
all_closed = false;
}
}
}
(None, all_closed)
}
/// Creates the thrown error used when the host has requested VM cancellation.
fn cancelled_vm_error() -> VmError {
    let message: Rc<str> = Rc::from("kind:cancelled:VM cancelled by host");
    VmError::Thrown(VmValue::String(message))
}
/// Extracts an optional timeout in milliseconds from a builtin argument.
///
/// Accepted forms:
/// * an int (negative values are clamped to 0),
/// * a duration value (used as-is),
/// * a dict holding either of the above under `timeout_ms`, or under
///   `max_wait_ms` when `timeout_ms` is absent.
///
/// Anything else, including a missing argument, yields `None`.
fn optional_timeout_ms(value: Option<&VmValue>) -> Option<u64> {
    /// Converts a scalar int/duration value to milliseconds.
    fn scalar_ms(v: &VmValue) -> Option<u64> {
        match v {
            VmValue::Int(n) => Some((*n).max(0) as u64),
            VmValue::Duration(ms) => Some(*ms),
            _ => None,
        }
    }

    match value? {
        VmValue::Dict(dict) => {
            let entry = dict
                .get("timeout_ms")
                .or_else(|| dict.get("max_wait_ms"));
            entry.and_then(scalar_ms)
        }
        other => scalar_ms(other),
    }
}
/// Reads `args[idx]` as a strictly positive `u32`.
///
/// When the argument is missing or is not an int, `default` is used instead.
///
/// # Errors
/// Returns `VmError::Runtime` (prefixed with `name`) when the resulting value
/// is zero or negative, or when it does not fit in a `u32`.
fn positive_u32_arg(
    args: &[VmValue],
    idx: usize,
    default: u32,
    name: &str,
) -> Result<u32, VmError> {
    let value = args
        .get(idx)
        .and_then(|v| v.as_int())
        .unwrap_or(i64::from(default));
    if value <= 0 {
        return Err(VmError::Runtime(format!("{name}: value must be positive")));
    }
    // A plain `as u32` cast would silently truncate (e.g. 1 << 32 becomes 0),
    // defeating the positivity check above; reject out-of-range values instead.
    u32::try_from(value).map_err(|_| {
        VmError::Runtime(format!(
            "{name}: value must not exceed {max}",
            max = u32::MAX
        ))
    })
}
/// Looks up `key` in `dict` and renders the entry as a string.
///
/// Returns `None` when the key is absent or maps to nil. Non-empty strings are
/// returned verbatim; every other value (including an empty string) is
/// rendered through its `display()` form.
fn dict_string(dict: &BTreeMap<String, VmValue>, key: &str) -> Option<String> {
    match dict.get(key)? {
        VmValue::Nil => None,
        VmValue::String(text) if !text.is_empty() => Some(text.to_string()),
        other => Some(other.display()),
    }
}
/// Reads a non-empty string field named `key` from a dict-valued `context`.
///
/// Returns `None` when `context` is not a dict, the field is missing, the
/// field is not a string, or the string is empty.
fn context_string(context: &VmValue, key: &str) -> Option<String> {
    let fields = context.as_dict()?;
    if let Some(VmValue::String(text)) = fields.get(key) {
        if !text.is_empty() {
            return Some(text.to_string());
        }
    }
    None
}
/// Resolves a user-facing scope name into a concrete `"<scope>:<id>"` string.
///
/// `raw_scope` defaults to `"task_group"`. Known scope names are mapped to an
/// identifier taken from the runtime context (trying several context fields in
/// order and falling back to `"task_root"`). The tenant scope prefers
/// `tenant_id` from `options` over the runtime context. Unrecognised scope
/// names are used as their own identifier.
///
/// # Errors
/// Returns `VmError::Runtime` (prefixed with `builtin`) when the tenant scope
/// has no `tenant_id` available, or when a durable/external scope is requested.
fn resolve_shared_scope(
    vm: &Vm,
    raw_scope: Option<&str>,
    options: Option<&BTreeMap<String, VmValue>>,
    builtin: &str,
) -> Result<String, VmError> {
    /// First non-empty context string among `fields` (in order), else "task_root".
    fn first_context_id(context: &VmValue, fields: &[&str]) -> String {
        fields
            .iter()
            .find_map(|field| context_string(context, field))
            .unwrap_or_else(|| "task_root".to_string())
    }

    let context = crate::runtime_context::runtime_context_value(vm);
    let scope = raw_scope.unwrap_or("task_group");

    let resolved = match scope {
        "task" | "task_local" | "task-local" => first_context_id(&context, &["task_id"]),
        "root_task" | "root-task" => first_context_id(&context, &["root_task_id"]),
        "task_group" | "task-group" | "workflow" | "workflow_local" | "workflow-local" => {
            first_context_id(&context, &["task_group_id", "run_id", "root_task_id"])
        }
        "workflow_run" | "workflow-run" | "run" => {
            first_context_id(&context, &["run_id", "task_group_id", "root_task_id"])
        }
        "agent_session" | "agent-session" | "session" => first_context_id(
            &context,
            &["agent_session_id", "root_agent_session_id", "root_task_id"],
        ),
        "tenant" | "tenant_scoped" | "tenant-scoped" => {
            let from_options = options.and_then(|opts| dict_string(opts, "tenant_id"));
            match from_options.or_else(|| context_string(&context, "tenant_id")) {
                Some(tenant) => tenant,
                None => {
                    return Err(VmError::Runtime(format!(
                        "{builtin}: tenant scope requires tenant_id in options or runtime context"
                    )));
                }
            }
        }
        "process" | "global" => "process".to_string(),
        "durable" | "event_log" | "event-log" => {
            return Err(VmError::Runtime(format!(
                "{builtin}: durable shared state is explicit; use store_* or agent_state_* APIs"
            )));
        }
        "external" | "host" => {
            return Err(VmError::Runtime(format!(
                "{builtin}: external shared state must be provided by a host/connector builtin"
            )));
        }
        custom => custom.to_string(),
    };

    Ok(format!("{scope}:{resolved}"))
}
fn shared_options(args: &[VmValue]) -> Option<&BTreeMap<String, VmValue>> {
args.first().and_then(VmValue::as_dict)
}
/// Derives the scoped key for an "open" builtin (`shared_cell`, `shared_map`,
/// `mailbox_open`) from its arguments.
///
/// When the first argument is an options dict, the key is taken from
/// `key_field`, then `key`, then `name`, and the scope from its `scope` entry.
/// Otherwise the first argument's display form is the key and the default
/// scope applies. The options dict (if any) is returned as an owned clone.
///
/// # Errors
/// Returns `VmError::Runtime` when no key can be determined, or when scope
/// resolution fails.
fn scoped_from_open_args(
    vm: &Vm,
    args: &[VmValue],
    builtin: &str,
    key_field: &str,
) -> Result<(ScopedKey, Option<BTreeMap<String, VmValue>>), VmError> {
    let options = shared_options(args);

    let candidate = match options {
        Some(opts) => [key_field, "key", "name"]
            .iter()
            .find_map(|field| dict_string(opts, field)),
        None => args
            .first()
            .map(VmValue::display)
            .filter(|key| !key.is_empty()),
    };
    let Some(key) = candidate else {
        return Err(VmError::Runtime(format!("{builtin}: key/name is required")));
    };

    let raw_scope = options.and_then(|opts| dict_string(opts, "scope"));
    let scope = resolve_shared_scope(vm, raw_scope.as_deref(), options, builtin)?;
    let scoped = ScopedKey { scope, key };
    Ok((scoped, options.cloned()))
}
/// Resolves a builtin target, given either as a handle dict or a bare name,
/// into a `ScopedKey`.
///
/// A dict is treated as a handle: its `_type` must equal `expected_kind`, and
/// its `scope` and `key` entries are used directly. Any other value is
/// rendered with `display()` and combined with the default scope.
///
/// # Errors
/// Returns `VmError::Runtime` when the handle has the wrong kind, lacks
/// `scope`/`key`, when the bare name is empty, or when scope resolution fails.
fn scoped_from_handle_or_name(
    vm: &Vm,
    value: &VmValue,
    expected_kind: &str,
    builtin: &str,
) -> Result<ScopedKey, VmError> {
    match value.as_dict() {
        Some(handle) => {
            let kind = dict_string(handle, "_type").unwrap_or_default();
            if kind != expected_kind {
                return Err(VmError::Runtime(format!(
                    "{builtin}: expected {expected_kind} handle"
                )));
            }
            let Some(scope) = dict_string(handle, "scope") else {
                return Err(VmError::Runtime(format!(
                    "{builtin}: {expected_kind} handle missing scope"
                )));
            };
            let Some(key) = dict_string(handle, "key") else {
                return Err(VmError::Runtime(format!(
                    "{builtin}: {expected_kind} handle missing key"
                )));
            };
            Ok(ScopedKey { scope, key })
        }
        None => {
            let key = value.display();
            if key.is_empty() {
                return Err(VmError::Runtime(format!(
                    "{builtin}: name must not be empty"
                )));
            }
            let scope = resolve_shared_scope(vm, None, None, builtin)?;
            Ok(ScopedKey { scope, key })
        }
    }
}
/// Fetches the VM clone associated with the currently running async builtin.
///
/// # Errors
/// Returns `VmError::Runtime` (prefixed with `builtin`) when no VM execution
/// context is available.
fn current_async_vm(builtin: &str) -> Result<Vm, VmError> {
    let Some(vm) = crate::vm::clone_async_builtin_child_vm() else {
        return Err(VmError::Runtime(format!(
            "{builtin}: shared state builtin requires VM execution context"
        )));
    };
    Ok(vm)
}
/// Registers every concurrency-related builtin on `vm`.
///
/// The builtins fall into these groups, registered in this order:
/// * `sync_*` — mutex / semaphore / gate permits backed by `vm.sync_runtime`.
/// * `shared_*` — scoped shared cells and maps backed by `vm.shared_state_runtime`.
/// * `mailbox_*` — scoped mailboxes backed by `vm.shared_state_runtime`.
/// * `channel`, `close_channel`, `try_receive`, `send`, `receive` — plain channels.
/// * `atomic*` — 64-bit integer atomics.
/// * `sleep` — cancellable delay.
/// * `select`, `__select_timeout`, `__select_try`, `__select_list` — channel selection.
/// * `timer_start` / `timer_end` — wall-clock timing helpers.
/// * `circuit_*` — thread-local circuit breakers.
pub(crate) fn register_concurrency_builtins(vm: &mut Vm) {
    // ---- sync_* permits -------------------------------------------------

    // sync_mutex_acquire(key?, timeout?): a "mutex" is acquired as capacity 1 / 1 permit.
    // Yields a permit value, or nil when the runtime returns no permit.
    let sync_runtime = vm.sync_runtime.clone();
    vm.register_async_builtin("sync_mutex_acquire", move |args| {
        let sync_runtime = sync_runtime.clone();
        async move {
            let key = args
                .first()
                .map(|a| a.display())
                .unwrap_or_else(|| "__default__".to_string());
            let timeout_ms = optional_timeout_ms(args.get(1));
            let cancel_token =
                crate::vm::clone_async_builtin_child_vm().and_then(|vm| vm.cancel_token.clone());
            Ok(sync_runtime
                .acquire("mutex", &key, 1, 1, timeout_ms, cancel_token)
                .await?
                .map(VmValue::SyncPermit)
                .unwrap_or(VmValue::Nil))
        }
    });

    // sync_semaphore_acquire(key?, capacity = 1, permits = 1, timeout?).
    let sync_runtime = vm.sync_runtime.clone();
    vm.register_async_builtin("sync_semaphore_acquire", move |args| {
        let sync_runtime = sync_runtime.clone();
        async move {
            let key = args
                .first()
                .map(|a| a.display())
                .unwrap_or_else(|| "default".to_string());
            let capacity = positive_u32_arg(&args, 1, 1, "sync_semaphore_acquire")?;
            let permits = positive_u32_arg(&args, 2, 1, "sync_semaphore_acquire")?;
            let timeout_ms = optional_timeout_ms(args.get(3));
            let cancel_token =
                crate::vm::clone_async_builtin_child_vm().and_then(|vm| vm.cancel_token.clone());
            Ok(sync_runtime
                .acquire(
                    "semaphore",
                    &key,
                    capacity,
                    permits,
                    timeout_ms,
                    cancel_token,
                )
                .await?
                .map(VmValue::SyncPermit)
                .unwrap_or(VmValue::Nil))
        }
    });

    // sync_gate_acquire(key?, limit = 1, timeout?): always requests a single permit.
    let sync_runtime = vm.sync_runtime.clone();
    vm.register_async_builtin("sync_gate_acquire", move |args| {
        let sync_runtime = sync_runtime.clone();
        async move {
            let key = args
                .first()
                .map(|a| a.display())
                .unwrap_or_else(|| "default".to_string());
            let limit = positive_u32_arg(&args, 1, 1, "sync_gate_acquire")?;
            let timeout_ms = optional_timeout_ms(args.get(2));
            let cancel_token =
                crate::vm::clone_async_builtin_child_vm().and_then(|vm| vm.cancel_token.clone());
            Ok(sync_runtime
                .acquire("gate", &key, limit, 1, timeout_ms, cancel_token)
                .await?
                .map(VmValue::SyncPermit)
                .unwrap_or(VmValue::Nil))
        }
    });

    // sync_release(permit): returns the bool reported by `permit.release()`.
    vm.register_builtin("sync_release", |args, _out| {
        let Some(VmValue::SyncPermit(permit)) = args.first() else {
            return Err(VmError::Runtime(
                "sync_release: first argument must be a sync permit".to_string(),
            ));
        };
        Ok(VmValue::Bool(permit.release()))
    });

    // sync_metrics(kind?, key?): both filters are optional.
    let sync_runtime = vm.sync_runtime.clone();
    vm.register_builtin("sync_metrics", move |args, _out| {
        let kind = args.first().map(|v| v.display());
        let key = args.get(1).map(|v| v.display());
        Ok(sync_runtime.metrics(kind.as_deref(), key.as_deref()))
    });

    // ---- shared cells and maps -----------------------------------------

    // shared_scope_id(scope?, options?): exposes `resolve_shared_scope` to scripts.
    vm.register_async_builtin("shared_scope_id", |args| async move {
        let vm = current_async_vm("shared_scope_id")?;
        let options = args.get(1).and_then(VmValue::as_dict);
        let raw_scope = args.first().map(VmValue::display);
        Ok(VmValue::String(Rc::from(resolve_shared_scope(
            &vm,
            raw_scope.as_deref(),
            options,
            "shared_scope_id",
        )?)))
    });

    // shared_cell(name_or_options, initial?): `initial` in the options dict wins
    // over the second positional argument; defaults to nil.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_cell", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("shared_cell")?;
            let (scoped, options) = scoped_from_open_args(&vm, &args, "shared_cell", "key")?;
            let initial = options
                .as_ref()
                .and_then(|opts| opts.get("initial").cloned())
                .or_else(|| args.get(1).cloned())
                .unwrap_or(VmValue::Nil);
            Ok(shared_runtime.open_cell(scoped, initial))
        }
    });

    // shared_get(handle_or_name)
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_get", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("shared_get")?;
            let handle = args
                .first()
                .ok_or_else(|| VmError::Runtime("shared_get: handle is required".to_string()))?;
            let scoped = scoped_from_handle_or_name(&vm, handle, "shared_cell", "shared_get")?;
            Ok(shared_runtime.cell_get(&scoped))
        }
    });

    // shared_snapshot(handle_or_name)
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_snapshot", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("shared_snapshot")?;
            let handle = args.first().ok_or_else(|| {
                VmError::Runtime("shared_snapshot: handle is required".to_string())
            })?;
            let scoped =
                scoped_from_handle_or_name(&vm, handle, "shared_cell", "shared_snapshot")?;
            Ok(shared_runtime.cell_snapshot(&scoped))
        }
    });

    // shared_set(handle_or_name, value = nil)
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_set", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("shared_set")?;
            let handle = args
                .first()
                .ok_or_else(|| VmError::Runtime("shared_set: handle is required".to_string()))?;
            let scoped = scoped_from_handle_or_name(&vm, handle, "shared_cell", "shared_set")?;
            let value = args.get(1).cloned().unwrap_or(VmValue::Nil);
            Ok(shared_runtime.cell_set(&scoped, value))
        }
    });

    // shared_cas(handle_or_name, expected, new): returns the bool from `cell_cas`.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_cas", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            if args.len() < 3 {
                return Err(VmError::Runtime(
                    "shared_cas: requires handle, expected, and new value".to_string(),
                ));
            }
            let vm = current_async_vm("shared_cas")?;
            let scoped = scoped_from_handle_or_name(&vm, &args[0], "shared_cell", "shared_cas")?;
            Ok(VmValue::Bool(shared_runtime.cell_cas(
                &scoped,
                &args[1],
                args[2].clone(),
            )))
        }
    });

    // shared_map(name_or_options, initial?): `initial` is only used when it is a dict.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_map", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("shared_map")?;
            let (scoped, options) = scoped_from_open_args(&vm, &args, "shared_map", "key")?;
            let initial = options
                .as_ref()
                .and_then(|opts| opts.get("initial"))
                .or_else(|| args.get(1))
                .and_then(VmValue::as_dict)
                .cloned();
            Ok(shared_runtime.open_map(scoped, initial))
        }
    });

    // shared_map_get(handle_or_name, key, default = nil)
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_map_get", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            if args.len() < 2 {
                return Err(VmError::Runtime(
                    "shared_map_get: requires handle and key".to_string(),
                ));
            }
            let vm = current_async_vm("shared_map_get")?;
            let scoped =
                scoped_from_handle_or_name(&vm, &args[0], "shared_map", "shared_map_get")?;
            let default = args.get(2).cloned().unwrap_or(VmValue::Nil);
            Ok(shared_runtime.map_get(&scoped, &args[1].display(), default))
        }
    });

    // shared_map_snapshot(handle_or_name, key)
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_map_snapshot", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            if args.len() < 2 {
                return Err(VmError::Runtime(
                    "shared_map_snapshot: requires handle and key".to_string(),
                ));
            }
            let vm = current_async_vm("shared_map_snapshot")?;
            let scoped =
                scoped_from_handle_or_name(&vm, &args[0], "shared_map", "shared_map_snapshot")?;
            Ok(shared_runtime.map_snapshot(&scoped, &args[1].display()))
        }
    });

    // shared_map_entries(handle_or_name)
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_map_entries", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("shared_map_entries")?;
            let handle = args.first().ok_or_else(|| {
                VmError::Runtime("shared_map_entries: handle is required".to_string())
            })?;
            let scoped =
                scoped_from_handle_or_name(&vm, handle, "shared_map", "shared_map_entries")?;
            Ok(shared_runtime.map_entries(&scoped))
        }
    });

    // shared_map_set(handle_or_name, key, value)
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_map_set", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            if args.len() < 3 {
                return Err(VmError::Runtime(
                    "shared_map_set: requires handle, key, and value".to_string(),
                ));
            }
            let vm = current_async_vm("shared_map_set")?;
            let scoped =
                scoped_from_handle_or_name(&vm, &args[0], "shared_map", "shared_map_set")?;
            Ok(shared_runtime.map_set(&scoped, args[1].display(), args[2].clone()))
        }
    });

    // shared_map_delete(handle_or_name, key)
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_map_delete", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            if args.len() < 2 {
                return Err(VmError::Runtime(
                    "shared_map_delete: requires handle and key".to_string(),
                ));
            }
            let vm = current_async_vm("shared_map_delete")?;
            let scoped =
                scoped_from_handle_or_name(&vm, &args[0], "shared_map", "shared_map_delete")?;
            Ok(shared_runtime.map_delete(&scoped, &args[1].display()))
        }
    });

    // shared_map_cas(handle_or_name, key, expected, new): returns the bool from `map_cas`.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_map_cas", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            if args.len() < 4 {
                return Err(VmError::Runtime(
                    "shared_map_cas: requires handle, key, expected, and new value".to_string(),
                ));
            }
            let vm = current_async_vm("shared_map_cas")?;
            let scoped =
                scoped_from_handle_or_name(&vm, &args[0], "shared_map", "shared_map_cas")?;
            Ok(VmValue::Bool(shared_runtime.map_cas(
                &scoped,
                args[1].display(),
                &args[2],
                args[3].clone(),
            )))
        }
    });

    // shared_metrics(handle?): without a dict handle, unfiltered metrics are returned;
    // with one, metrics are filtered by the handle's `_type` and scoped key.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("shared_metrics", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("shared_metrics")?;
            let Some(handle) = args.first() else {
                return Ok(shared_runtime.metrics(None, None));
            };
            let Some(dict) = handle.as_dict() else {
                return Ok(shared_runtime.metrics(None, None));
            };
            let kind = dict_string(dict, "_type").ok_or_else(|| {
                VmError::Runtime("shared_metrics: handle missing _type".to_string())
            })?;
            let scoped = scoped_from_handle_or_name(&vm, handle, &kind, "shared_metrics")?;
            Ok(shared_runtime.metrics(Some(&kind), Some(&scoped)))
        }
    });

    // ---- mailboxes ------------------------------------------------------

    // mailbox_open(name_or_options, capacity = 256): capacity is clamped to at least 1.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("mailbox_open", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("mailbox_open")?;
            let (scoped, options) = scoped_from_open_args(&vm, &args, "mailbox_open", "name")?;
            let capacity = options
                .as_ref()
                .and_then(|opts| opts.get("capacity").and_then(VmValue::as_int))
                .or_else(|| args.get(1).and_then(VmValue::as_int))
                .unwrap_or(256)
                .max(1) as usize;
            Ok(shared_runtime.open_mailbox(scoped, capacity))
        }
    });

    // mailbox_lookup(name_or_handle): nil when the runtime has no such mailbox.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("mailbox_lookup", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("mailbox_lookup")?;
            let target = args.first().ok_or_else(|| {
                VmError::Runtime("mailbox_lookup: name or handle is required".to_string())
            })?;
            let scoped = scoped_from_handle_or_name(&vm, target, "mailbox", "mailbox_lookup")?;
            Ok(shared_runtime.mailbox(&scoped).unwrap_or(VmValue::Nil))
        }
    });

    // mailbox_send(target, value): false when the mailbox is unknown, closed,
    // or the send fails; the outcome is reported to the runtime's metrics.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("mailbox_send", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            if args.len() < 2 {
                return Err(VmError::Runtime(
                    "mailbox_send: requires target and value".to_string(),
                ));
            }
            let vm = current_async_vm("mailbox_send")?;
            let scoped = scoped_from_handle_or_name(&vm, &args[0], "mailbox", "mailbox_send")?;
            let Some(channel) = shared_runtime.mailbox_channel(&scoped) else {
                return Ok(VmValue::Bool(false));
            };
            if channel.closed.load(Ordering::SeqCst) {
                shared_runtime.note_mailbox_send(&scoped, false);
                return Ok(VmValue::Bool(false));
            }
            let ok = channel.sender.send(args[1].clone()).await.is_ok();
            shared_runtime.note_mailbox_send(&scoped, ok);
            Ok(VmValue::Bool(ok))
        }
    });

    // mailbox_try_receive(target): non-blocking; nil when the mailbox is unknown,
    // its receiver is locked elsewhere, or nothing is queued.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("mailbox_try_receive", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("mailbox_try_receive")?;
            let target = args.first().ok_or_else(|| {
                VmError::Runtime("mailbox_try_receive: target is required".to_string())
            })?;
            let scoped =
                scoped_from_handle_or_name(&vm, target, "mailbox", "mailbox_try_receive")?;
            let Some(channel) = shared_runtime.mailbox_channel(&scoped) else {
                return Ok(VmValue::Nil);
            };
            let Ok(mut rx) = channel.receiver.try_lock() else {
                return Ok(VmValue::Nil);
            };
            match rx.try_recv() {
                Ok(value) => {
                    shared_runtime.note_mailbox_receive(&scoped);
                    Ok(value)
                }
                Err(_) => Ok(VmValue::Nil),
            }
        }
    });

    // mailbox_receive(target): waits for a value, waking every 10ms to re-check
    // the cancel token and the closed flag.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("mailbox_receive", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("mailbox_receive")?;
            let cancel_token = vm.cancel_token.clone();
            let target = args.first().ok_or_else(|| {
                VmError::Runtime("mailbox_receive: target is required".to_string())
            })?;
            let scoped = scoped_from_handle_or_name(&vm, target, "mailbox", "mailbox_receive")?;
            let Some(channel) = shared_runtime.mailbox_channel(&scoped) else {
                return Ok(VmValue::Nil);
            };
            loop {
                if cancel_token
                    .as_ref()
                    .is_some_and(|token| token.load(Ordering::SeqCst))
                {
                    return Err(cancelled_vm_error());
                }
                // Once closed, drain at most one buffered value without waiting.
                if channel.closed.load(Ordering::SeqCst) {
                    let mut rx = channel.receiver.lock().await;
                    return match rx.try_recv() {
                        Ok(value) => {
                            shared_runtime.note_mailbox_receive(&scoped);
                            Ok(value)
                        }
                        Err(_) => Ok(VmValue::Nil),
                    };
                }
                let mut rx = channel.receiver.lock().await;
                let poll = tokio::time::sleep(tokio::time::Duration::from_millis(10));
                tokio::pin!(poll);
                tokio::select! {
                    value = rx.recv() => {
                        return match value {
                            Some(value) => {
                                shared_runtime.note_mailbox_receive(&scoped);
                                Ok(value)
                            }
                            None => Ok(VmValue::Nil),
                        };
                    }
                    // Poll interval elapsed: loop to re-check cancellation/closure.
                    _ = &mut poll => {}
                }
            }
        }
    });

    // mailbox_close(target): returns the bool from `close_mailbox`.
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("mailbox_close", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("mailbox_close")?;
            let target = args
                .first()
                .ok_or_else(|| VmError::Runtime("mailbox_close: target is required".to_string()))?;
            let scoped = scoped_from_handle_or_name(&vm, target, "mailbox", "mailbox_close")?;
            Ok(VmValue::Bool(shared_runtime.close_mailbox(&scoped)))
        }
    });

    // mailbox_metrics(target)
    let shared_runtime = vm.shared_state_runtime.clone();
    vm.register_async_builtin("mailbox_metrics", move |args| {
        let shared_runtime = shared_runtime.clone();
        async move {
            let vm = current_async_vm("mailbox_metrics")?;
            let target = args.first().ok_or_else(|| {
                VmError::Runtime("mailbox_metrics: target is required".to_string())
            })?;
            let scoped = scoped_from_handle_or_name(&vm, target, "mailbox", "mailbox_metrics")?;
            Ok(shared_runtime.metrics(Some("mailbox"), Some(&scoped)))
        }
    });

    // ---- plain channels (sync part) ------------------------------------

    // channel(name = "default", capacity = 256)
    vm.register_builtin("channel", |args, _out| {
        let name = args
            .first()
            .map(|a| a.display())
            .unwrap_or_else(|| "default".to_string());
        // Clamp BEFORE the cast: a negative int cast straight to usize becomes a
        // huge buffer size, which makes `tokio::sync::mpsc::channel` panic.
        let capacity = args
            .get(1)
            .and_then(|a| a.as_int())
            .unwrap_or(256)
            .max(1) as usize;
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        // NOTE(review): presumably needed because the channel payload (VmValue)
        // is not Send/Sync — confirm before removing the allow.
        #[allow(clippy::arc_with_non_send_sync)]
        Ok(VmValue::Channel(VmChannelHandle {
            name: Rc::from(name),
            sender: Arc::new(tx),
            receiver: Arc::new(tokio::sync::Mutex::new(rx)),
            closed: Arc::new(AtomicBool::new(false)),
        }))
    });

    // close_channel(channel): only sets the handle's `closed` flag.
    vm.register_builtin("close_channel", |args, _out| {
        if args.is_empty() {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "close_channel: requires a channel",
            ))));
        }
        if let VmValue::Channel(ch) = &args[0] {
            ch.closed.store(true, Ordering::SeqCst);
            Ok(VmValue::Nil)
        } else {
            Err(VmError::Thrown(VmValue::String(Rc::from(
                "close_channel: first argument must be a channel",
            ))))
        }
    });

    // try_receive(channel): non-blocking; nil when nothing is available or the
    // receiver is locked by another task.
    vm.register_builtin("try_receive", |args, _out| {
        if args.is_empty() {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "try_receive: requires a channel",
            ))));
        }
        if let VmValue::Channel(ch) = &args[0] {
            match ch.receiver.try_lock() {
                Ok(mut rx) => match rx.try_recv() {
                    Ok(val) => Ok(val),
                    Err(_) => Ok(VmValue::Nil),
                },
                Err(_) => Ok(VmValue::Nil),
            }
        } else {
            Err(VmError::Thrown(VmValue::String(Rc::from(
                "try_receive: first argument must be a channel",
            ))))
        }
    });

    // ---- atomics --------------------------------------------------------

    // atomic(initial = 0): floats are truncated, bools map to 0/1, anything else is 0.
    vm.register_builtin("atomic", |args, _out| {
        let initial = match args.first() {
            Some(VmValue::Int(n)) => *n,
            Some(VmValue::Float(f)) => *f as i64,
            Some(VmValue::Bool(b)) => i64::from(*b),
            _ => 0,
        };
        Ok(VmValue::Atomic(VmAtomicHandle {
            value: Arc::new(AtomicI64::new(initial)),
        }))
    });

    // atomic_get(atomic): nil when the argument is not an atomic.
    vm.register_builtin("atomic_get", |args, _out| {
        if let Some(VmValue::Atomic(a)) = args.first() {
            Ok(VmValue::Int(a.value.load(Ordering::SeqCst)))
        } else {
            Ok(VmValue::Nil)
        }
    });

    // atomic_set(atomic, value): returns the previous value, or nil on bad arguments.
    vm.register_builtin("atomic_set", |args, _out| {
        if args.len() >= 2 {
            if let (VmValue::Atomic(a), Some(val)) = (&args[0], args[1].as_int()) {
                let old = a.value.swap(val, Ordering::SeqCst);
                return Ok(VmValue::Int(old));
            }
        }
        Ok(VmValue::Nil)
    });

    // atomic_add(atomic, delta): returns the previous value, or nil on bad arguments.
    vm.register_builtin("atomic_add", |args, _out| {
        if args.len() >= 2 {
            if let (VmValue::Atomic(a), Some(delta)) = (&args[0], args[1].as_int()) {
                let prev = a.value.fetch_add(delta, Ordering::SeqCst);
                return Ok(VmValue::Int(prev));
            }
        }
        Ok(VmValue::Nil)
    });

    // atomic_cas(atomic, expected, new): true when the exchange succeeded;
    // false on mismatch or bad arguments.
    vm.register_builtin("atomic_cas", |args, _out| {
        if args.len() >= 3 {
            if let (VmValue::Atomic(a), Some(expected), Some(new_val)) =
                (&args[0], args[1].as_int(), args[2].as_int())
            {
                let result =
                    a.value
                        .compare_exchange(expected, new_val, Ordering::SeqCst, Ordering::SeqCst);
                return Ok(VmValue::Bool(result.is_ok()));
            }
        }
        Ok(VmValue::Bool(false))
    });

    // ---- sleep ----------------------------------------------------------

    // sleep(duration_or_ms): when a VM context is available, cancellation is
    // checked every 10ms while sleeping.
    vm.register_async_builtin("sleep", |args| async move {
        let ms = match args.first() {
            Some(VmValue::Duration(ms)) => *ms,
            // Clamp negatives to zero; a raw `as u64` cast would wrap a negative
            // int into an enormous delay.
            Some(VmValue::Int(n)) => (*n).max(0) as u64,
            _ => 0,
        };
        if ms > 0 {
            let sleep = tokio::time::sleep(tokio::time::Duration::from_millis(ms));
            tokio::pin!(sleep);
            if let Some(vm) = crate::vm::clone_async_builtin_child_vm() {
                let mut poll = tokio::time::interval(Duration::from_millis(10));
                loop {
                    tokio::select! {
                        _ = &mut sleep => break,
                        _ = poll.tick() => {
                            if vm.is_cancel_requested() {
                                return Err(cancelled_vm_error());
                            }
                        }
                    }
                }
            } else {
                sleep.await;
            }
        }
        Ok(VmValue::Nil)
    });

    // ---- plain channels (async part) -----------------------------------

    // send(channel, value): false when the channel is flagged closed or the send fails.
    vm.register_async_builtin("send", |args| async move {
        if args.len() < 2 {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "send: requires channel and value",
            ))));
        }
        if let VmValue::Channel(ch) = &args[0] {
            if ch.closed.load(Ordering::SeqCst) {
                return Ok(VmValue::Bool(false));
            }
            let val = args[1].clone();
            match ch.sender.send(val).await {
                Ok(()) => Ok(VmValue::Bool(true)),
                Err(_) => Ok(VmValue::Bool(false)),
            }
        } else {
            Err(VmError::Thrown(VmValue::String(Rc::from(
                "send: first argument must be a channel",
            ))))
        }
    });

    // receive(channel): on a closed channel, returns one buffered value or nil
    // without waiting; otherwise waits for the next value.
    vm.register_async_builtin("receive", |args| async move {
        if args.is_empty() {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "receive: requires a channel",
            ))));
        }
        if let VmValue::Channel(ch) = &args[0] {
            if ch.closed.load(Ordering::SeqCst) {
                let mut rx = ch.receiver.lock().await;
                return match rx.try_recv() {
                    Ok(val) => Ok(val),
                    Err(_) => Ok(VmValue::Nil),
                };
            }
            let mut rx = ch.receiver.lock().await;
            match rx.recv().await {
                Some(val) => Ok(val),
                None => Ok(VmValue::Nil),
            }
        } else {
            Err(VmError::Thrown(VmValue::String(Rc::from(
                "receive: first argument must be a channel",
            ))))
        }
    });

    // ---- select ---------------------------------------------------------

    // select(ch1, ch2, ...): polls all channels every 1ms until one yields a
    // value or `try_poll_channels` reports them all closed.
    vm.register_async_builtin("select", |args| async move {
        if args.is_empty() {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "select: requires at least one channel",
            ))));
        }
        for arg in &args {
            if !matches!(arg, VmValue::Channel(_)) {
                return Err(VmError::Thrown(VmValue::String(Rc::from(
                    "select: all arguments must be channels",
                ))));
            }
        }
        loop {
            let (found, all_closed) = try_poll_channels(&args);
            if let Some((i, val, name)) = found {
                return Ok(select_result(i, val, &name));
            }
            if all_closed {
                return Ok(select_none());
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        }
    });

    // __select_timeout(channels, timeout): like `select` over a list, but also
    // gives up once the deadline passes. Non-int/non-duration timeouts mean 5000ms.
    vm.register_async_builtin("__select_timeout", |args| async move {
        if args.len() < 2 {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "__select_timeout: requires channel list and timeout",
            ))));
        }
        let channels = match &args[0] {
            VmValue::List(items) => (**items).clone(),
            _ => {
                return Err(VmError::Thrown(VmValue::String(Rc::from(
                    "__select_timeout: first argument must be a list of channels",
                ))));
            }
        };
        let timeout_ms = match &args[1] {
            VmValue::Int(n) => (*n).max(0) as u64,
            VmValue::Duration(ms) => *ms,
            _ => 5000,
        };
        let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
        loop {
            let (found, all_closed) = try_poll_channels(&channels);
            if let Some((i, val, name)) = found {
                return Ok(select_result(i, val, &name));
            }
            if all_closed || tokio::time::Instant::now() >= deadline {
                return Ok(select_none());
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        }
    });

    // __select_try(channels): a single non-blocking poll.
    vm.register_async_builtin("__select_try", |args| async move {
        if args.is_empty() {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "__select_try: requires channel list",
            ))));
        }
        let channels = match &args[0] {
            VmValue::List(items) => (**items).clone(),
            _ => {
                return Err(VmError::Thrown(VmValue::String(Rc::from(
                    "__select_try: first argument must be a list of channels",
                ))));
            }
        };
        let (found, _) = try_poll_channels(&channels);
        if let Some((i, val, name)) = found {
            Ok(select_result(i, val, &name))
        } else {
            Ok(select_none())
        }
    });

    // __select_list(channels): `select` taking the channels as one list argument.
    vm.register_async_builtin("__select_list", |args| async move {
        if args.is_empty() {
            return Err(VmError::Thrown(VmValue::String(Rc::from(
                "__select_list: requires channel list",
            ))));
        }
        let channels = match &args[0] {
            VmValue::List(items) => (**items).clone(),
            _ => {
                return Err(VmError::Thrown(VmValue::String(Rc::from(
                    "__select_list: first argument must be a list of channels",
                ))));
            }
        };
        loop {
            let (found, all_closed) = try_poll_channels(&channels);
            if let Some((i, val, name)) = found {
                return Ok(select_result(i, val, &name));
            }
            if all_closed {
                return Ok(select_none());
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        }
    });

    // ---- timers and circuit breakers -----------------------------------

    // timer_start(name = "default"): returns a dict with `name` and `start_ms`
    // (milliseconds since the Unix epoch).
    vm.register_builtin("timer_start", |args, _out| {
        let name = args
            .first()
            .map(|a| a.display())
            .unwrap_or_else(|| "default".to_string());
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64;
        let mut timer = BTreeMap::new();
        timer.insert("name".to_string(), VmValue::String(Rc::from(name)));
        timer.insert("start_ms".to_string(), VmValue::Int(now_ms));
        Ok(VmValue::Dict(Rc::new(timer)))
    });

    // circuit_breaker(name = "default", threshold = 5, reset_ms = 30000):
    // (re)creates the breaker in the closed state and returns its name.
    // NOTE(review): negative threshold/reset_ms values wrap when cast to
    // unsigned; left unchanged because the intended semantics are unclear.
    vm.register_builtin("circuit_breaker", |args, _out| {
        let name = args
            .first()
            .map(|a| a.display())
            .unwrap_or_else(|| "default".to_string());
        let threshold = args.get(1).and_then(|a| a.as_int()).unwrap_or(5) as usize;
        let reset_ms = args.get(2).and_then(|a| a.as_int()).unwrap_or(30000) as u64;
        CIRCUITS.with(|circuits| {
            circuits.borrow_mut().insert(
                name.clone(),
                CircuitState {
                    failures: 0,
                    threshold,
                    reset_ms,
                    opened_at: None,
                },
            );
        });
        Ok(VmValue::String(Rc::from(name)))
    });

    // circuit_check(name = "default"): "closed", "open", or "half_open" once
    // `reset_ms` has elapsed since opening. Unknown breakers report "closed".
    vm.register_builtin("circuit_check", |args, _out| {
        let name = args
            .first()
            .map(|a| a.display())
            .unwrap_or_else(|| "default".to_string());
        let state = CIRCUITS.with(|circuits| {
            let circuits = circuits.borrow();
            let Some(cs) = circuits.get(&name) else {
                return "closed".to_string();
            };
            match cs.opened_at {
                None => "closed".to_string(),
                Some(opened) => {
                    let elapsed = opened.elapsed().as_millis() as u64;
                    if elapsed >= cs.reset_ms {
                        "half_open".to_string()
                    } else {
                        "open".to_string()
                    }
                }
            }
        });
        Ok(VmValue::String(Rc::from(state)))
    });

    // circuit_record_success(name = "default"): clears failures and closes the breaker.
    vm.register_builtin("circuit_record_success", |args, _out| {
        let name = args
            .first()
            .map(|a| a.display())
            .unwrap_or_else(|| "default".to_string());
        CIRCUITS.with(|circuits| {
            let mut circuits = circuits.borrow_mut();
            if let Some(cs) = circuits.get_mut(&name) {
                cs.failures = 0;
                cs.opened_at = None;
            }
        });
        Ok(VmValue::Nil)
    });

    // circuit_record_failure(name = "default"): returns true only on the call
    // that transitions the breaker from closed to open.
    vm.register_builtin("circuit_record_failure", |args, _out| {
        let name = args
            .first()
            .map(|a| a.display())
            .unwrap_or_else(|| "default".to_string());
        let is_open = CIRCUITS.with(|circuits| {
            let mut circuits = circuits.borrow_mut();
            if let Some(cs) = circuits.get_mut(&name) {
                cs.failures += 1;
                if cs.failures >= cs.threshold && cs.opened_at.is_none() {
                    cs.opened_at = Some(std::time::Instant::now());
                    return true;
                }
            }
            false
        });
        Ok(VmValue::Bool(is_open))
    });

    // circuit_reset(name = "default"): clears failures and closes the breaker.
    vm.register_builtin("circuit_reset", |args, _out| {
        let name = args
            .first()
            .map(|a| a.display())
            .unwrap_or_else(|| "default".to_string());
        CIRCUITS.with(|circuits| {
            let mut circuits = circuits.borrow_mut();
            if let Some(cs) = circuits.get_mut(&name) {
                cs.failures = 0;
                cs.opened_at = None;
            }
        });
        Ok(VmValue::Nil)
    });

    // timer_end(timer): writes "[timer] <name>: <elapsed>ms" to the output
    // buffer and returns the elapsed milliseconds.
    vm.register_builtin("timer_end", |args, out| {
        let timer = match args.first() {
            Some(VmValue::Dict(d)) => d,
            _ => {
                return Err(VmError::Thrown(VmValue::String(Rc::from(
                    "timer_end: argument must be a timer dict from timer_start",
                ))));
            }
        };
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64;
        let start_ms = timer
            .get("start_ms")
            .and_then(|v| v.as_int())
            .unwrap_or(now_ms);
        // `start_ms` comes from a caller-supplied dict, so guard against overflow.
        let elapsed = now_ms.saturating_sub(start_ms);
        let name = timer.get("name").map(|v| v.display()).unwrap_or_default();
        out.push_str(&format!("[timer] {name}: {elapsed}ms\n"));
        Ok(VmValue::Int(elapsed))
    });
}
#[cfg(test)]
mod tests {
    // Unit tests for the atomic, circuit-breaker and timer builtins. Each test
    // builds its own VM and invokes the registered builtin closures directly.
    use super::*;
    use crate::vm::Vm;
    use std::rc::Rc;

    /// Creates a VM with the concurrency builtins registered on it.
    fn vm() -> Vm {
        let mut machine = Vm::new();
        register_concurrency_builtins(&mut machine);
        machine
    }

    /// Looks up the builtin `name` and invokes it with `args`, discarding
    /// anything it writes to its output buffer. Panics if no builtin with
    /// that name is registered.
    fn call(vm: &mut Vm, name: &str, args: Vec<VmValue>) -> Result<VmValue, VmError> {
        let mut sink = String::new();
        let builtin = vm.builtins.get(name).unwrap().clone();
        builtin(&args, &mut sink)
    }

    /// Wraps a string slice in a `VmValue::String`.
    fn s(v: &str) -> VmValue {
        VmValue::String(Rc::from(v))
    }

    /// Invokes a builtin and unwraps its result, panicking on error.
    fn run(vm: &mut Vm, name: &str, args: Vec<VmValue>) -> VmValue {
        call(vm, name, args).unwrap()
    }

    /// Invokes a builtin and returns the `display()` text of its result.
    fn shown(vm: &mut Vm, name: &str, args: Vec<VmValue>) -> String {
        run(vm, name, args).display()
    }

    /// Registers a circuit breaker called `name` with the given threshold.
    fn configure_breaker(vm: &mut Vm, name: &str, threshold: i64) {
        run(vm, "circuit_breaker", vec![s(name), VmValue::Int(threshold)]);
    }

    /// Returns the `display()` text that `circuit_check` reports for `name`.
    fn breaker_state(vm: &mut Vm, name: &str) -> String {
        shown(vm, "circuit_check", vec![s(name)])
    }

    /// An atomic created without arguments reads back as 0.
    #[test]
    fn atomic_default_zero() {
        let mut machine = vm();
        let handle = run(&mut machine, "atomic", vec![]);
        assert_eq!(shown(&mut machine, "atomic_get", vec![handle]), "0");
    }

    /// An atomic created from an integer reads back that integer.
    #[test]
    fn atomic_initial_value() {
        let mut machine = vm();
        let handle = run(&mut machine, "atomic", vec![VmValue::Int(42)]);
        assert_eq!(shown(&mut machine, "atomic_get", vec![handle]), "42");
    }

    /// `atomic_set` yields the previous value and stores the new one.
    #[test]
    fn atomic_set_returns_old() {
        let mut machine = vm();
        let handle = run(&mut machine, "atomic", vec![VmValue::Int(10)]);
        let previous = shown(
            &mut machine,
            "atomic_set",
            vec![handle.clone(), VmValue::Int(20)],
        );
        assert_eq!(previous, "10");
        assert_eq!(shown(&mut machine, "atomic_get", vec![handle]), "20");
    }

    /// `atomic_add` yields the previous value and stores the sum.
    #[test]
    fn atomic_add() {
        let mut machine = vm();
        let handle = run(&mut machine, "atomic", vec![VmValue::Int(5)]);
        let previous = shown(
            &mut machine,
            "atomic_add",
            vec![handle.clone(), VmValue::Int(3)],
        );
        assert_eq!(previous, "5");
        assert_eq!(shown(&mut machine, "atomic_get", vec![handle]), "8");
    }

    /// A compare-and-swap with the matching expected value succeeds.
    #[test]
    fn atomic_cas_success() {
        let mut machine = vm();
        let handle = run(&mut machine, "atomic", vec![VmValue::Int(10)]);
        let swapped = shown(
            &mut machine,
            "atomic_cas",
            vec![handle.clone(), VmValue::Int(10), VmValue::Int(20)],
        );
        assert_eq!(swapped, "true");
        assert_eq!(shown(&mut machine, "atomic_get", vec![handle]), "20");
    }

    /// A compare-and-swap with a stale expected value leaves the atomic alone.
    #[test]
    fn atomic_cas_failure() {
        let mut machine = vm();
        let handle = run(&mut machine, "atomic", vec![VmValue::Int(10)]);
        let swapped = shown(
            &mut machine,
            "atomic_cas",
            vec![handle.clone(), VmValue::Int(99), VmValue::Int(20)],
        );
        assert_eq!(swapped, "false");
        assert_eq!(shown(&mut machine, "atomic_get", vec![handle]), "10");
    }

    /// An atomic created from `true` reads back as 1.
    #[test]
    fn atomic_bool_init() {
        let mut machine = vm();
        let handle = run(&mut machine, "atomic", vec![VmValue::Bool(true)]);
        assert_eq!(shown(&mut machine, "atomic_get", vec![handle]), "1");
    }

    // The circuit tests below each use a distinct breaker name.

    /// A freshly configured breaker reports "closed".
    #[test]
    fn circuit_breaker_starts_closed() {
        let mut machine = vm();
        configure_breaker(&mut machine, "test_cb", 3);
        assert_eq!(breaker_state(&mut machine, "test_cb"), "closed");
    }

    /// The breaker stays closed below the threshold and opens on reaching it.
    #[test]
    fn circuit_opens_at_threshold() {
        const NAME: &str = "test_cb2";
        let mut machine = vm();
        configure_breaker(&mut machine, NAME, 2);

        let first = shown(&mut machine, "circuit_record_failure", vec![s(NAME)]);
        assert_eq!(first, "false");
        assert_eq!(breaker_state(&mut machine, NAME), "closed");

        let second = shown(&mut machine, "circuit_record_failure", vec![s(NAME)]);
        assert_eq!(second, "true");
        assert_eq!(breaker_state(&mut machine, NAME), "open");
    }

    /// A recorded success means one later failure does not open a
    /// threshold-2 breaker.
    #[test]
    fn circuit_success_resets() {
        const NAME: &str = "test_cb3";
        let mut machine = vm();
        configure_breaker(&mut machine, NAME, 2);

        run(&mut machine, "circuit_record_failure", vec![s(NAME)]);
        run(&mut machine, "circuit_record_success", vec![s(NAME)]);
        assert_eq!(breaker_state(&mut machine, NAME), "closed");

        run(&mut machine, "circuit_record_failure", vec![s(NAME)]);
        assert_eq!(breaker_state(&mut machine, NAME), "closed");
    }

    /// `circuit_reset` closes a breaker that had been opened.
    #[test]
    fn circuit_reset_clears_state() {
        const NAME: &str = "test_cb4";
        let mut machine = vm();
        configure_breaker(&mut machine, NAME, 1);

        run(&mut machine, "circuit_record_failure", vec![s(NAME)]);
        assert_eq!(breaker_state(&mut machine, NAME), "open");

        run(&mut machine, "circuit_reset", vec![s(NAME)]);
        assert_eq!(breaker_state(&mut machine, NAME), "closed");
    }

    /// Checking a breaker that was never configured reports "closed".
    #[test]
    fn circuit_unknown_name_defaults_closed() {
        let mut machine = vm();
        assert_eq!(breaker_state(&mut machine, "nonexistent"), "closed");
    }

    /// `timer_start` yields a dict carrying the name and a positive start time.
    #[test]
    fn timer_start_returns_dict() {
        let mut machine = vm();
        let timer = run(&mut machine, "timer_start", vec![s("my_timer")]);
        let fields = timer.as_dict().unwrap();
        let label = fields.get("name").unwrap().display();
        assert_eq!(label, "my_timer");
        let started = fields.get("start_ms").unwrap().as_int().unwrap();
        assert!(started > 0);
    }

    /// `timer_end` right after `timer_start` reports a small, non-negative
    /// number of milliseconds.
    #[test]
    fn timer_end_returns_elapsed() {
        let mut machine = vm();
        let timer = run(&mut machine, "timer_start", vec![s("t")]);
        let elapsed = run(&mut machine, "timer_end", vec![timer]);
        let millis = elapsed.as_int().unwrap();
        assert!(millis >= 0);
        assert!(millis < 1000);
    }

    /// `timer_end` rejects an argument that is not a dict.
    #[test]
    fn timer_end_non_dict_errors() {
        let mut machine = vm();
        let outcome = call(&mut machine, "timer_end", vec![VmValue::Int(42)]);
        assert!(outcome.is_err());
    }
}