use crate::script::convert::err;
use crate::script::defaults::ScriptDefaults;
use rhai::{Array, Dynamic, Engine, EvalAltResult, FnPtr, Shared, AST};
use std::sync::{
mpsc::{self, Receiver, Sender, SyncSender},
Arc, Mutex,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;
type ThreadResult = Result<Dynamic, String>;
type ThreadInner = Arc<Mutex<Option<JoinHandle<ThreadResult>>>>;
#[derive(Clone)]
pub struct ThreadHandle {
inner: ThreadInner,
}
#[derive(Clone)]
pub struct RhaiSender {
kind: SenderKind,
}
#[derive(Clone)]
enum SenderKind {
Unbounded(Sender<Dynamic>),
Bounded(SyncSender<Dynamic>),
}
#[derive(Clone)]
pub struct RhaiReceiver {
inner: Arc<Mutex<Receiver<Dynamic>>>,
}
pub fn register(engine: &mut Engine, ast: Shared<AST>, defaults: ScriptDefaults) {
let defaults = Arc::new(defaults);
engine.register_type_with_name::<ThreadHandle>("ThreadHandle");
engine.register_type_with_name::<RhaiSender>("RhaiSender");
engine.register_type_with_name::<RhaiReceiver>("RhaiReceiver");
engine.register_fn("tid", || -> i64 {
use std::hash::{Hash, Hasher};
let mut h = std::collections::hash_map::DefaultHasher::new();
thread::current().id().hash(&mut h);
(h.finish() as i64).abs()
});
engine.register_fn("sleep", |ms: i64| {
if ms > 0 {
thread::sleep(Duration::from_millis(ms as u64));
}
});
let ast_for_spawn = ast.clone();
let d = defaults.clone();
engine.register_fn(
"thread_spawn",
move |fn_ptr: FnPtr| -> Result<ThreadHandle, Box<EvalAltResult>> {
Ok(spawn_closure(ast_for_spawn.clone(), d.clone(), fn_ptr, Vec::new()))
},
);
let ast_for_spawn = ast.clone();
let d = defaults.clone();
engine.register_fn(
"thread_spawn",
move |fn_ptr: FnPtr, arg: Dynamic| -> Result<ThreadHandle, Box<EvalAltResult>> {
Ok(spawn_closure(ast_for_spawn.clone(), d.clone(), fn_ptr, vec![arg]))
},
);
let ast_for_spawn = ast.clone();
let d = defaults.clone();
engine.register_fn(
"thread_spawn",
move |fn_ptr: FnPtr, args: Array| -> Result<ThreadHandle, Box<EvalAltResult>> {
Ok(spawn_closure(ast_for_spawn.clone(), d.clone(), fn_ptr, args))
},
);
engine.register_fn(
"join",
|h: &mut ThreadHandle| -> Result<Dynamic, Box<EvalAltResult>> {
let handle = {
let mut guard = h
.inner
.lock()
.map_err(|_| err("join: thread-handle mutex poisoned"))?;
guard.take()
};
match handle {
None => Err(err("join: handle already joined")),
Some(jh) => match jh.join() {
Err(_) => Err(err("join: spawned thread panicked")),
Ok(Err(msg)) => Err(err(format!("spawned task error: {msg}"))),
Ok(Ok(v)) => Ok(v),
},
}
},
);
engine.register_fn("channel", || -> Array {
let (tx, rx) = mpsc::channel::<Dynamic>();
vec![
Dynamic::from(RhaiSender {
kind: SenderKind::Unbounded(tx),
}),
Dynamic::from(RhaiReceiver {
inner: Arc::new(Mutex::new(rx)),
}),
]
});
engine.register_fn("channel_bounded", |capacity: i64| -> Array {
let cap = capacity.max(0) as usize;
let (tx, rx) = mpsc::sync_channel::<Dynamic>(cap);
vec![
Dynamic::from(RhaiSender {
kind: SenderKind::Bounded(tx),
}),
Dynamic::from(RhaiReceiver {
inner: Arc::new(Mutex::new(rx)),
}),
]
});
engine.register_fn(
"send",
|s: &mut RhaiSender, val: Dynamic| -> Result<(), Box<EvalAltResult>> {
match &s.kind {
SenderKind::Unbounded(tx) => tx
.send(val)
.map_err(|e| err(format!("send: channel closed ({e})"))),
SenderKind::Bounded(tx) => tx
.send(val)
.map_err(|e| err(format!("send: channel closed ({e})"))),
}
},
);
engine.register_fn(
"try_send",
|s: &mut RhaiSender, val: Dynamic| -> Result<bool, Box<EvalAltResult>> {
match &s.kind {
SenderKind::Unbounded(tx) => {
tx.send(val)
.map_err(|e| err(format!("try_send: channel closed ({e})")))?;
Ok(true)
}
SenderKind::Bounded(tx) => match tx.try_send(val) {
Ok(()) => Ok(true),
Err(mpsc::TrySendError::Full(_)) => Ok(false),
Err(mpsc::TrySendError::Disconnected(_)) => {
Err(err("try_send: channel closed"))
}
},
}
},
);
engine.register_fn(
"recv",
|r: &mut RhaiReceiver| -> Result<Dynamic, Box<EvalAltResult>> {
let guard = r
.inner
.lock()
.map_err(|_| err("recv: receiver mutex poisoned"))?;
guard
.recv()
.map_err(|_| err("recv: all senders dropped"))
},
);
engine.register_fn(
"recv",
|r: &mut RhaiReceiver, timeout_ms: i64| -> Result<Dynamic, Box<EvalAltResult>> {
let guard = r
.inner
.lock()
.map_err(|_| err("recv: receiver mutex poisoned"))?;
guard
.recv_timeout(Duration::from_millis(timeout_ms.max(0) as u64))
.map_err(|e| err(format!("recv: timeout or closed ({e})")))
},
);
engine.register_fn("try_recv", |r: &mut RhaiReceiver| -> Dynamic {
let Ok(guard) = r.inner.lock() else {
return Dynamic::UNIT;
};
guard.try_recv().unwrap_or(Dynamic::UNIT)
});
}
fn spawn_closure(
ast: Shared<AST>,
defaults: Arc<ScriptDefaults>,
fn_ptr: FnPtr,
args: Vec<Dynamic>,
) -> ThreadHandle {
let jh = thread::spawn(move || -> Result<Dynamic, String> {
let mut engine = crate::script::engine::build_engine(&defaults);
register(&mut engine, ast.clone(), (*defaults).clone());
fn_ptr
.call::<Dynamic>(&engine, &ast, args)
.map_err(|e| e.to_string())
});
ThreadHandle {
inner: Arc::new(Mutex::new(Some(jh))),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn build_engine_with_threads() -> Engine {
let mut e = Engine::new();
super::super::helpers::register(&mut e);
let empty = Shared::new(AST::empty());
register(&mut e, empty, ScriptDefaults::default());
e
}
#[test]
fn tid_is_an_integer() {
let e = build_engine_with_threads();
let t: i64 = e.eval("tid()").expect("eval");
assert!(t > 0);
}
#[test]
fn channel_send_recv() {
let e = build_engine_with_threads();
let v: i64 = e
.eval(
r#"
let c = channel();
let tx = c[0];
let rx = c[1];
send(tx, 42);
recv(rx)
"#,
)
.expect("eval");
assert_eq!(v, 42);
}
#[test]
fn bounded_channel_try_send_fills() {
let e = build_engine_with_threads();
let ok: bool = e
.eval(
r#"
let c = channel_bounded(1);
let tx = c[0];
if !try_send(tx, 1) { return false; }
// Second should return false (channel full).
!try_send(tx, 2)
"#,
)
.expect("eval");
assert!(ok);
}
#[test]
fn recv_with_timeout_errors_when_empty() {
let e = build_engine_with_threads();
let res: Result<Dynamic, _> = e.eval(
r#"
let c = channel();
let rx = c[1];
recv(rx, 10)
"#,
);
assert!(res.is_err());
}
}