use crate::vval::*;
use crate::threads::*;
use crate::compiler::*;
// Message type tags stored as the first element of every request vector
// placed on the request queue.

/// Tag for a request whose sender waits for a reply in a response slot.
const RPC_MSG_CALL : i64 = 1;
/// Tag for a fire-and-forget request; no response slot is attached.
const RPC_MSG_SEND : i64 = 2;
/// Handle that bundles the channels used to talk to an RPC handler loop
/// (see `rpc_handler` / `rpc_handler_step`).
///
/// Cloning the handle clones the contained channel values, so a clone can be
/// moved into a closure or another thread, as `register_global_functions`
/// and the unit test do.
#[derive(Clone)]
pub struct RPCHandle {
// Response slots that `call` has finished with and handed back for reuse.
free_queue: AValChannel,
// Errors produced while handling `send` requests; drained by `fetch_error`.
error_channel: AValChannel,
// Request vectors (`[type, response slot, target name, args]`) waiting
// to be processed by the handler.
request_queue: AValChannel,
}
impl Default for RPCHandle {
fn default() -> Self { Self::new() }
}
impl RPCHandle {
pub fn new() -> Self {
Self {
free_queue: AValChannel::new_direct(),
error_channel: AValChannel::new_direct(),
request_queue: AValChannel::new_direct(),
}
}
pub fn register_global_functions(&self, prefix: &str, ctx: &mut EvalContext) {
let caller = self.clone();
ctx.set_global_var(&format!("{}_call", prefix),
&VValFun::new_fun(
move |env: &mut Env, argc: usize| {
let args =
if argc == 1 { VVal::None }
else {
let a = VVal::vec();
for i in 1..argc {
a.push(env.arg(i).clone());
}
a
};
Ok(caller.call(&env.arg(0).s_raw(), args))
}, Some(1), None, false));
let caller = self.clone();
ctx.set_global_var(&format!("{}_send", prefix),
&VValFun::new_fun(
move |env: &mut Env, argc: usize| {
let args =
if argc == 1 { VVal::None }
else {
let a = VVal::vec();
for i in 1..argc {
a.push(env.arg(i).clone());
}
a
};
caller.send(&env.arg(0).s_raw(), args);
Ok(VVal::None)
}, Some(1), None, false));
}
fn get_request(&self) -> Option<AtomicAValSlot> {
match self.free_queue.try_recv() {
VVal::Opt(Some(o)) => {
if let VVal::Usr(mut ud) = (*o).clone() {
if let Some(ud) = ud.as_any().downcast_ref::<AtomicAValSlot>() {
Some(ud.clone())
} else {
Some(AtomicAValSlot::new())
}
} else {
None
}
},
VVal::Opt(None)
| VVal::None => Some(AtomicAValSlot::new()),
_ => None,
}
}
pub fn call(&self, target: &str, args: VVal) -> VVal {
let resp = self.get_request().expect("AtomicAValSlot allocation");
let v = VVal::vec();
v.push(VVal::Int(RPC_MSG_CALL));
v.push(VVal::Usr(Box::new(resp.clone())));
v.push(VVal::new_sym(target));
v.push(args);
self.request_queue.send(&v);
let ret = resp.recv_timeout(None);
self.free_queue.send(&v.at(1).expect("the AtomicAValSlot"));
ret
}
pub fn send(&self, target: &str, args: VVal) {
let v = VVal::vec();
v.push(VVal::Int(RPC_MSG_SEND));
v.push(VVal::None);
v.push(VVal::new_sym(target));
v.push(args);
self.request_queue.send(&v);
}
pub fn fetch_error(&self) -> Option<VVal> {
let msg = self.error_channel.try_recv();
if msg.is_some() {
Some(msg)
} else {
None
}
}
}
/// Errors reported by a single step of the RPC handler.
///
/// The derives make the error printable and comparable, so callers can use
/// `unwrap()` / `expect()` on the handler result and compare errors in
/// assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RPCHandlerError {
    /// Receiving from the request queue reported an error instead of a
    /// request or a timeout.
    Disconnected,
}
/// Runs the RPC handler loop on the current thread.
///
/// Before entering the loop the global function `thread:quit` (taking no
/// arguments) is installed in `ctx`; calling it requests the loop to end.
/// Each iteration performs one [`rpc_handler_step`] with `interval_timeout`
/// as receive timeout. The loop ends when a step reports
/// [`RPCHandlerError::Disconnected`] or once `thread:quit` has been called.
pub fn rpc_handler(
    ctx: &mut EvalContext,
    handle: &RPCHandle,
    interval_timeout: std::time::Duration)
{
    // Shared between this loop and the `thread:quit` closure. Both live on
    // the same thread, so a plain reference counted cell is sufficient.
    let quit_flag = std::rc::Rc::new(std::cell::Cell::new(false));

    let setter = std::rc::Rc::clone(&quit_flag);
    let quit_fun =
        VValFun::new_fun(
            move |_env: &mut Env, _argc: usize| {
                setter.set(true);
                Ok(VVal::None)
            },
            Some(0), Some(0), false);
    ctx.set_global_var("thread:quit", &quit_fun);

    loop {
        match rpc_handler_step(ctx, handle, interval_timeout) {
            Err(RPCHandlerError::Disconnected) => break,
            Ok(()) => {
                // The flag may have been set by the request just handled.
                if quit_flag.get() {
                    break;
                }
            },
        }
    }
}
pub fn rpc_handler_step(
ctx: &mut EvalContext,
handle: &RPCHandle,
timeout: std::time::Duration) -> Result<(), RPCHandlerError>
{
let res = handle.request_queue.recv_timeout(Some(timeout));
if res.is_err() {
Err(RPCHandlerError::Disconnected)
} else if let VVal::Opt(Some(m)) = res {
let cmd = m.at(0).unwrap_or_else(|| VVal::None).i();
let resp = m.at(1).unwrap_or_else(|| VVal::None);
let name = m.at(2).unwrap_or_else(|| VVal::None);
let args = m.at(3).unwrap_or_else(|| VVal::None);
match cmd {
RPC_MSG_CALL => {
if let VVal::Usr(mut resp) = resp {
let resp =
resp.as_any()
.downcast_mut::<AtomicAValSlot>()
.expect("AtomicAValSlot in RPC_MSG_CALL");
name.with_s_ref(|name| {
if let Some(v) = ctx.get_global_var(name) {
let arg =
if args.is_none() { vec![] }
else { args.to_vec() };
let ret = match ctx.call(&v, &arg) {
Ok(v) => v,
Err(e) =>
VVal::err_msg(
&format!("Panic in call to '{}': {:?}",
name, e)),
};
resp.send(&ret);
} else {
resp.send(
&VVal::err_msg(
&format!("No such global on call: {}", name)));
}
});
} else {
panic!("Didn't get a AtomicAValSlot in RPC_MSG_CALL");
}
},
RPC_MSG_SEND => {
name.with_s_ref(|name| {
if let Some(v) = ctx.get_global_var(name) {
let arg =
if args.is_none() { vec![] }
else { args.to_vec() };
let ret = ctx.call(&v, &arg).unwrap_or_else(|_| VVal::None);
if ret.is_err() {
handle.error_channel.send(
&VVal::err_msg(&format!("Error on send: {}", ret.s())));
}
} else {
handle.error_channel.send(
&VVal::err_msg(&format!("No such global on send: {}", name)));
}
});
},
_ => (),
}
Ok(())
} else {
Ok(())
}
}
#[cfg(test)]
mod tests {
// Round trip between a main context and a worker thread that runs the
// RPC handler loop: covers `send` to an unknown global, `call` to an
// unknown global, a successful `call`, shutdown via `thread:quit` and
// retrieval of the error caused by the `send`.
#[test]
fn check_rpc() {
use crate::vval::*;
use crate::rpc_helper::*;
// Main side: expose `worker_call` / `worker_send` to the script code.
let mut ctx = crate::compiler::EvalContext::new_default();
let msg_handle = RPCHandle::new();
let sender = msg_handle.clone();
sender.register_global_functions("worker", &mut ctx);
// Worker side: own evaluation context with a global `X`, serving
// requests until `thread:quit` is called.
let t = std::thread::spawn(move || {
let mut ctx = crate::compiler::EvalContext::new_default();
ctx.eval("!:global X = 123").unwrap();
rpc_handler(
&mut ctx, &msg_handle, std::time::Duration::from_secs(1));
});
// Fire-and-forget to a global that does not exist; the failure is only
// visible through `fetch_error` further below.
ctx.eval("worker_send :mfeofe").unwrap();
// The assertion below shows that `displayln` is not a global in the
// worker context, so the call comes back as an error value.
let ret = ctx.eval("worker_call :displayln \"hello world from worker thread!\";").unwrap();
assert_eq!(ret.s(), "$e $[\"No such global on call: displayln\",\"\"]");
// Successful call: evaluates `X` inside the worker context.
ctx.eval("std:assert_eq (worker_call :std:eval \"X\") 123;").unwrap();
// Blocking call, so the worker has handled all earlier requests
// (including the failed send) once this returns.
sender.call("thread:quit", VVal::None);
let err = sender.fetch_error().unwrap();
assert_eq!(err.s(), "$o($e $[\"No such global on send: mfeofe\",\"\"])");
t.join().unwrap();
}
}