use std::any::Any;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::sync::{Condvar, Mutex};
use crate::value::{AsyncPromise, Channel, Value};
use crate::{EvalContext, SemaError};
/// Outcome of a single poll of an [`IoHandle`].
pub enum IoPoll {
/// The operation has not produced a result yet.
Pending,
/// The operation finished: `Ok` carries the resulting value, `Err` an error message.
Ready(Result<Value, String>),
}
/// A pollable I/O operation with an optional one-shot abort hook.
///
/// Both closures sit behind `RefCell`s so they can be invoked through `&self`.
pub struct IoHandle {
// Polling closure; invoked once per call to `IoHandle::poll`.
poll: RefCell<Box<dyn FnMut() -> IoPoll>>,
// Abort hook; `None` when none was supplied or once `IoHandle::abort` has consumed it.
abort: RefCell<Option<Box<dyn FnOnce()>>>,
}
impl IoHandle {
    /// Builds a handle around the polling closure `f`, with no abort hook.
    pub fn new(f: impl FnMut() -> IoPoll + 'static) -> Self {
        let poll_fn: Box<dyn FnMut() -> IoPoll> = Box::new(f);
        Self {
            poll: RefCell::new(poll_fn),
            abort: RefCell::new(None),
        }
    }

    /// Builds a handle around the polling closure `f` and registers `abort`
    /// as the hook that [`IoHandle::abort`] will run (at most once).
    pub fn with_abort(
        f: impl FnMut() -> IoPoll + 'static,
        abort: impl FnOnce() + 'static,
    ) -> Self {
        let poll_fn: Box<dyn FnMut() -> IoPoll> = Box::new(f);
        let abort_hook: Box<dyn FnOnce()> = Box::new(abort);
        Self {
            poll: RefCell::new(poll_fn),
            abort: RefCell::new(Some(abort_hook)),
        }
    }

    /// Invokes the polling closure once and returns what it reports.
    ///
    /// The `RefCell` borrow is held while the closure runs, so polling this
    /// same handle from inside its own polling closure panics.
    pub fn poll(&self) -> IoPoll {
        let mut poll_fn = self.poll.borrow_mut();
        (*poll_fn)()
    }

    /// Runs the abort hook if one is still registered; later calls do nothing.
    ///
    /// The hook is moved out of the cell before it is invoked, so the cell is
    /// no longer borrowed while the hook runs.
    pub fn abort(&self) {
        if let Some(abort_hook) = self.abort.take() {
            abort_hook();
        }
    }
}
impl std::fmt::Debug for IoHandle {
    /// Writes the fixed text `IoHandle`; the boxed closures do not implement `Debug`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "IoHandle")
    }
}
/// Why a task is yielding; this is the payload of the thread-local yield signal
/// written by `set_yield_signal` and drained by `take_yield_signal`.
///
/// NOTE(review): the per-variant notes below are read off the variant names and
/// payload types only — how each reason is serviced is not visible in this file.
#[derive(Debug, Clone)]
pub enum YieldReason {
/// Carries the promise being awaited.
AwaitPromise(Rc<AsyncPromise>),
/// Carries the channel to receive from.
ChannelRecv(Rc<Channel>),
/// Carries the channel to send on and the value to send.
ChannelSend(Rc<Channel>, Value),
/// Carries a sleep duration; presumably milliseconds — TODO confirm against the scheduler.
Sleep(u64),
/// Carries the I/O handle to be polled.
AwaitIo(Rc<IoHandle>),
}
/// What a scheduler run is asked to wait for; passed to the registered
/// run-scheduler callback by the `call_run_scheduler*` functions.
///
/// NOTE(review): the exact completion rule for each variant is implemented by the
/// registered callback, which is not visible in this file.
#[derive(Clone)]
pub enum SchedulerTarget {
/// No specific promise; used by `call_run_scheduler` when it is given `None`.
All,
/// A single promise; used by `call_run_scheduler` when it is given `Some(promise)`.
One(Rc<AsyncPromise>),
/// A set of promises; used by `call_run_scheduler_all_of`.
AllOf(Vec<Rc<AsyncPromise>>),
/// A set of promises; used by `call_run_scheduler_any_of`.
AnyOf(Vec<Rc<AsyncPromise>>),
/// A promise plus a timeout in milliseconds; used by `call_run_scheduler_timeout`.
Timeout(Rc<AsyncPromise>, u64),
}
/// Result reported by the run-scheduler callback.
///
/// Only `call_run_scheduler_timeout` hands this back to its caller; the other
/// `call_run_scheduler*` functions discard it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerRunResult {
/// The run finished without reporting a timeout.
Complete,
/// The run reported a timeout.
TimedOut,
}
thread_local! {
// Pending yield reason for this thread; written by `set_yield_signal`, drained by `take_yield_signal`.
static YIELD_SIGNAL: RefCell<Option<YieldReason>> = const { RefCell::new(None) };
// Pending resume value for this thread; written by `set_resume_value`, drained by `take_resume_value`.
static RESUME_VALUE: RefCell<Option<Value>> = const { RefCell::new(None) };
// Per-thread flag read by `in_async_context` and written by `set_async_context`; starts `false`.
static IN_ASYNC_CONTEXT: Cell<bool> = const { Cell::new(false) };
}
/// Stores `reason` in this thread's yield-signal slot, replacing any reason
/// that was already pending.
pub fn set_yield_signal(reason: YieldReason) {
    YIELD_SIGNAL.with(|slot| {
        let mut pending = slot.borrow_mut();
        *pending = Some(reason);
    });
}
/// Removes and returns this thread's pending yield reason, leaving the slot
/// empty. Returns `None` when nothing is pending.
pub fn take_yield_signal() -> Option<YieldReason> {
    YIELD_SIGNAL.with(|slot| slot.take())
}
/// Stores `val` in this thread's resume-value slot, replacing any value that
/// was already pending.
pub fn set_resume_value(val: Value) {
    RESUME_VALUE.with(|slot| {
        let mut pending = slot.borrow_mut();
        *pending = Some(val);
    });
}
/// Removes and returns this thread's pending resume value, leaving the slot
/// empty. Returns `None` when nothing is pending.
pub fn take_resume_value() -> Option<Value> {
    RESUME_VALUE.with(|slot| slot.take())
}
pub fn in_async_context() -> bool {
IN_ASYNC_CONTEXT.with(|c| c.get())
}
/// Overwrites this thread's async-context flag with `val`.
pub fn set_async_context(val: bool) {
    IN_ASYNC_CONTEXT.with(|flag| {
        flag.set(val);
    });
}
/// Signature of the spawn callback: receives the evaluation context and the
/// thunk value handed to `call_spawn_callback`, and returns a value or an error.
pub type SpawnCallbackFn = fn(&EvalContext, Value) -> Result<Value, SemaError>;
thread_local! {
// Per-thread spawn callback; `None` until `set_spawn_callback` registers one.
static SPAWN_CALLBACK: Cell<Option<SpawnCallbackFn>> = const { Cell::new(None) };
}
/// Registers `f` as this thread's spawn callback, replacing any previous one.
pub fn set_spawn_callback(f: SpawnCallbackFn) {
    let registered = Some(f);
    SPAWN_CALLBACK.with(|slot| slot.set(registered));
}
/// Forwards `ctx` and `thunk` to this thread's registered spawn callback and
/// returns its result.
///
/// # Errors
///
/// Returns an eval error when no spawn callback has been registered on this
/// thread; otherwise propagates whatever the callback returns.
pub fn call_spawn_callback(ctx: &EvalContext, thunk: Value) -> Result<Value, SemaError> {
    match SPAWN_CALLBACK.with(|slot| slot.get()) {
        Some(spawn) => spawn(ctx, thunk),
        None => Err(SemaError::eval(
            "async/spawn: no async scheduler registered (async requires the VM backend)"
                .to_string(),
        )),
    }
}
/// Signature of the run-scheduler callback: receives the evaluation context and
/// the target to wait for, and reports how the run ended or an error.
pub type RunSchedulerCallbackFn =
fn(&EvalContext, SchedulerTarget) -> Result<SchedulerRunResult, SemaError>;
thread_local! {
// Per-thread run-scheduler callback; `None` until `set_run_scheduler_callback` registers one.
static RUN_SCHEDULER_CALLBACK: Cell<Option<RunSchedulerCallbackFn>> = const { Cell::new(None) };
}
/// Registers `f` as this thread's run-scheduler callback, replacing any
/// previous one.
pub fn set_run_scheduler_callback(f: RunSchedulerCallbackFn) {
    let registered = Some(f);
    RUN_SCHEDULER_CALLBACK.with(|slot| slot.set(registered));
}
/// Signature of the cancel callback: receives the task id passed to
/// `call_cancel_callback` and returns a boolean or an error.
///
/// NOTE(review): the meaning of the returned `bool` is defined by the registered
/// callback and is not visible in this file.
pub type CancelCallbackFn = fn(u64) -> Result<bool, SemaError>;
thread_local! {
// Per-thread cancel callback; `None` until `set_cancel_callback` registers one.
static CANCEL_CALLBACK: Cell<Option<CancelCallbackFn>> = const { Cell::new(None) };
}
/// Registers `f` as this thread's cancel callback, replacing any previous one.
pub fn set_cancel_callback(f: CancelCallbackFn) {
    let registered = Some(f);
    CANCEL_CALLBACK.with(|slot| slot.set(registered));
}
/// Forwards `task_id` to this thread's registered cancel callback and returns
/// its result.
///
/// # Errors
///
/// Returns an eval error when no cancel callback has been registered on this
/// thread; otherwise propagates whatever the callback returns.
pub fn call_cancel_callback(task_id: u64) -> Result<bool, SemaError> {
    match CANCEL_CALLBACK.with(|slot| slot.get()) {
        Some(cancel) => cancel(task_id),
        None => Err(SemaError::eval(
            "async/cancel: no async scheduler registered".to_string(),
        )),
    }
}
/// Signature of the blocking-sleep override: receives the millisecond count
/// that `blocking_sleep_ms` was called with.
pub type BlockingSleepFn = fn(u64);

thread_local! {
    // Per-thread sleep override; `None` means `blocking_sleep_ms` uses its built-in behaviour.
    static BLOCKING_SLEEP_CALLBACK: Cell<Option<BlockingSleepFn>> = const { Cell::new(None) };
}

/// Registers `f` as this thread's blocking-sleep override, replacing any
/// previous one.
pub fn set_blocking_sleep_callback(f: BlockingSleepFn) {
    let registered = Some(f);
    BLOCKING_SLEEP_CALLBACK.with(|slot| slot.set(registered));
}

/// Removes this thread's blocking-sleep override, if any.
pub fn clear_blocking_sleep_callback() {
    BLOCKING_SLEEP_CALLBACK.with(|slot| {
        slot.take();
    });
}
/// Callback invoked by [`take_task_otel`]; returns an opaque boxed value.
///
/// NOTE(review): presumably detaches the current task's telemetry context —
/// confirm with the crate that registers it.
pub type OtelTakeFn = fn() -> Box<dyn Any>;

/// Callback invoked by [`install_task_otel`]; receives an opaque boxed value
/// and returns another.
///
/// NOTE(review): presumably installs the given context and hands back the
/// previous one — confirm with the crate that registers it.
pub type OtelInstallFn = fn(Box<dyn Any>) -> Box<dyn Any>;

/// Callback invoked by [`current_conversation_scope_boxed`]; returns an opaque
/// boxed value.
pub type OtelScopeFn = fn() -> Box<dyn Any>;

thread_local! {
    // Per-thread telemetry hooks; each stays `None` until `set_otel_task_callbacks` runs.
    static OTEL_TAKE_CALLBACK: Cell<Option<OtelTakeFn>> = const { Cell::new(None) };
    static OTEL_INSTALL_CALLBACK: Cell<Option<OtelInstallFn>> = const { Cell::new(None) };
    static OTEL_SCOPE_CALLBACK: Cell<Option<OtelScopeFn>> = const { Cell::new(None) };
}

/// Registers all three telemetry hooks for this thread, replacing any that
/// were registered before.
pub fn set_otel_task_callbacks(take: OtelTakeFn, install: OtelInstallFn, scope: OtelScopeFn) {
    OTEL_TAKE_CALLBACK.with(|slot| {
        slot.set(Some(take));
    });
    OTEL_INSTALL_CALLBACK.with(|slot| {
        slot.set(Some(install));
    });
    OTEL_SCOPE_CALLBACK.with(|slot| {
        slot.set(Some(scope));
    });
}

/// Returns whatever the registered scope hook produces, or a boxed `()` when
/// no scope hook is registered on this thread.
pub fn current_conversation_scope_boxed() -> Box<dyn Any> {
    let Some(scope_hook) = OTEL_SCOPE_CALLBACK.with(Cell::get) else {
        return Box::new(());
    };
    scope_hook()
}

/// Returns whatever the registered take hook produces, or a boxed `()` when
/// no take hook is registered on this thread.
pub fn take_task_otel() -> Box<dyn Any> {
    let Some(take_hook) = OTEL_TAKE_CALLBACK.with(Cell::get) else {
        return Box::new(());
    };
    take_hook()
}

/// Passes `ctx` to the registered install hook and returns its result.
///
/// When no install hook is registered on this thread, `ctx` is dropped and a
/// boxed `()` is returned.
pub fn install_task_otel(ctx: Box<dyn Any>) -> Box<dyn Any> {
    let Some(install_hook) = OTEL_INSTALL_CALLBACK.with(Cell::get) else {
        return Box::new(());
    };
    install_hook(ctx)
}
/// Signature of the interrupt predicate consulted by [`check_interrupt`].
pub type InterruptCallbackFn = fn() -> bool;

thread_local! {
    // Per-thread interrupt predicate; `None` until `set_interrupt_callback` registers one.
    static INTERRUPT_CALLBACK: Cell<Option<InterruptCallbackFn>> = const { Cell::new(None) };
}

/// Registers `f` as this thread's interrupt predicate, replacing any previous
/// one.
pub fn set_interrupt_callback(f: InterruptCallbackFn) {
    let registered = Some(f);
    INTERRUPT_CALLBACK.with(|slot| slot.set(registered));
}

/// Removes this thread's interrupt predicate, if any.
pub fn clear_interrupt_callback() {
    INTERRUPT_CALLBACK.with(|slot| {
        slot.take();
    });
}

/// Returns the registered predicate's answer, or `false` when no predicate is
/// registered on this thread.
#[inline]
pub fn check_interrupt() -> bool {
    match INTERRUPT_CALLBACK.with(Cell::get) {
        Some(is_interrupted) => is_interrupted(),
        None => false,
    }
}
/// Sleeps for `ms` milliseconds using this thread's blocking-sleep override
/// when one is registered.
///
/// Without an override the calling thread is put to sleep via
/// `std::thread::sleep`, except on `wasm32`, where the call does nothing.
pub fn blocking_sleep_ms(ms: u64) {
    match BLOCKING_SLEEP_CALLBACK.with(|slot| slot.get()) {
        Some(sleep_hook) => sleep_hook(ms),
        None => {
            #[cfg(not(target_arch = "wasm32"))]
            std::thread::sleep(std::time::Duration::from_millis(ms));
        }
    }
}
/// Process-wide I/O wakeup signal: a generation counter guarded by a mutex,
/// paired with the condition variable that parked threads wait on.
static IO_SIGNAL: (Mutex<u64>, Condvar) = (Mutex::new(0), Condvar::new());

/// Announces that some I/O has completed: bumps the generation counter
/// (wrapping on overflow) and wakes every thread parked in [`io_park`].
///
/// A poisoned mutex is recovered rather than treated as a failure — the
/// guarded value is a bare counter with no invariant a panic could break, and
/// skipping the notification would leave parked threads waiting for their
/// full timeout.
pub fn notify_io_complete() {
    let (lock, cvar) = &IO_SIGNAL;
    // `gen` is a reserved keyword from the 2024 edition on, hence the long name.
    let mut generation = lock
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *generation = generation.wrapping_add(1);
    cvar.notify_all();
}

/// Parks the calling thread until [`notify_io_complete`] is called or
/// `timeout_ms` milliseconds have elapsed, whichever comes first.
///
/// The generation observed on entry is used as the wait predicate, so a
/// spurious condition-variable wakeup does not end the park early. A
/// notification that lands before this function acquires the lock is not
/// observed; the timeout bounds how long such a missed wakeup can cost.
///
/// A poisoned mutex is recovered so the park still blocks instead of
/// returning immediately (which would let a caller's wait loop spin).
pub fn io_park(timeout_ms: u64) {
    let (lock, cvar) = &IO_SIGNAL;
    let guard = lock
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let generation_at_park = *guard;
    // The result only reports timeout/poison status, which callers do not need.
    let _ = cvar.wait_timeout_while(
        guard,
        std::time::Duration::from_millis(timeout_ms),
        |generation| *generation == generation_at_park,
    );
}
/// Runs the registered scheduler callback, targeting `target` when it is
/// `Some` and [`SchedulerTarget::All`] otherwise. The callback's run result
/// is discarded.
///
/// # Errors
///
/// Returns an eval error when no run-scheduler callback has been registered
/// on this thread; otherwise propagates any error the callback returns.
pub fn call_run_scheduler(
    ctx: &EvalContext,
    target: Option<Rc<AsyncPromise>>,
) -> Result<(), SemaError> {
    let Some(run) = RUN_SCHEDULER_CALLBACK.with(|slot| slot.get()) else {
        return Err(SemaError::eval(
            "async: no async scheduler registered (async requires the VM backend)".to_string(),
        ));
    };
    let scheduler_target = target.map_or(SchedulerTarget::All, SchedulerTarget::One);
    run(ctx, scheduler_target)?;
    Ok(())
}
/// Runs the registered scheduler callback with a
/// [`SchedulerTarget::AllOf`] target built from `targets`. The callback's run
/// result is discarded.
///
/// # Errors
///
/// Returns an eval error when no run-scheduler callback has been registered
/// on this thread; otherwise propagates any error the callback returns.
pub fn call_run_scheduler_all_of(
    ctx: &EvalContext,
    targets: Vec<Rc<AsyncPromise>>,
) -> Result<(), SemaError> {
    let Some(run) = RUN_SCHEDULER_CALLBACK.with(|slot| slot.get()) else {
        return Err(SemaError::eval(
            "async: no async scheduler registered (async requires the VM backend)".to_string(),
        ));
    };
    run(ctx, SchedulerTarget::AllOf(targets))?;
    Ok(())
}
/// Runs the registered scheduler callback with a
/// [`SchedulerTarget::AnyOf`] target built from `targets`. The callback's run
/// result is discarded.
///
/// # Errors
///
/// Returns an eval error when no run-scheduler callback has been registered
/// on this thread; otherwise propagates any error the callback returns.
pub fn call_run_scheduler_any_of(
    ctx: &EvalContext,
    targets: Vec<Rc<AsyncPromise>>,
) -> Result<(), SemaError> {
    let Some(run) = RUN_SCHEDULER_CALLBACK.with(|slot| slot.get()) else {
        return Err(SemaError::eval(
            "async: no async scheduler registered (async requires the VM backend)".to_string(),
        ));
    };
    run(ctx, SchedulerTarget::AnyOf(targets))?;
    Ok(())
}
/// Runs the registered scheduler callback with a
/// [`SchedulerTarget::Timeout`] target for `target` and `timeout_ms`, and
/// returns the callback's run result unchanged.
///
/// # Errors
///
/// Returns an eval error when no run-scheduler callback has been registered
/// on this thread; otherwise propagates any error the callback returns.
pub fn call_run_scheduler_timeout(
    ctx: &EvalContext,
    target: Rc<AsyncPromise>,
    timeout_ms: u64,
) -> Result<SchedulerRunResult, SemaError> {
    match RUN_SCHEDULER_CALLBACK.with(|slot| slot.get()) {
        Some(run) => run(ctx, SchedulerTarget::Timeout(target, timeout_ms)),
        None => Err(SemaError::eval(
            "async: no async scheduler registered (async requires the VM backend)".to_string(),
        )),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::rc::Rc;

    /// The abort hook fires on the first `abort()` call and never again.
    #[test]
    fn io_handle_abort_runs_once() {
        let calls = Rc::new(Cell::new(0));
        let hook_calls = Rc::clone(&calls);
        let handle = IoHandle::with_abort(
            || IoPoll::Pending,
            move || hook_calls.set(hook_calls.get() + 1),
        );
        assert_eq!(calls.get(), 0, "abort not run until called");

        handle.abort();
        assert_eq!(calls.get(), 1, "abort runs on first call");

        for _ in 0..2 {
            handle.abort();
        }
        assert_eq!(calls.get(), 1, "abort is one-shot — later calls are no-ops");
    }

    /// A handle built without an abort hook tolerates `abort()` and still polls.
    #[test]
    fn io_handle_new_abort_is_noop() {
        let h = IoHandle::new(|| IoPoll::Ready(Ok(Value::nil())));
        h.abort();
        assert!(matches!(h.poll(), IoPoll::Ready(Ok(_))));
    }

    /// Aborting does not disable the polling closure.
    #[test]
    fn io_handle_poll_works_after_abort() {
        let handle = IoHandle::with_abort(|| IoPoll::Ready(Ok(Value::int(7))), || {});
        handle.abort();

        let IoPoll::Ready(Ok(polled)) = handle.poll() else {
            panic!("poll should still return Ready after abort");
        };
        assert_eq!(polled, Value::int(7));
    }
}