1use crate::vval::*;
18use crate::threads::*;
19use crate::compiler::*;
20
/// Message kind tag placed at index 0 of a request that expects a reply
/// (built by `RPCHandle::call`).
const RPC_MSG_CALL: i64 = 1;
/// Message kind tag placed at index 0 of a fire-and-forget request
/// (built by `RPCHandle::send`).
const RPC_MSG_SEND: i64 = 2;
23
24#[derive(Clone)]
25pub struct RPCHandle {
26 free_queue: AValChannel,
27 error_channel: AValChannel,
28 request_queue: AValChannel,
29}
30
31impl Default for RPCHandle {
32 fn default() -> Self { Self::new() }
33}
34
35
36impl RPCHandle {
37 pub fn new() -> Self {
38 Self {
39 free_queue: AValChannel::new_direct(),
40 error_channel: AValChannel::new_direct(),
41 request_queue: AValChannel::new_direct(),
42 }
43 }
44
45 pub fn register_global_functions(&self, prefix: &str, ctx: &mut EvalContext) {
46 let caller = self.clone();
47 ctx.set_global_var(&format!("{}_call", prefix),
48 &VValFun::new_fun(
49 move |env: &mut Env, argc: usize| {
50 let args =
51 if argc == 1 { VVal::None }
52 else {
53 let a = VVal::vec();
54 for i in 1..argc {
55 a.push(env.arg(i).clone());
56 }
57 a
58 };
59 Ok(caller.call(&env.arg(0).s_raw(), args))
60 }, Some(1), None, false));
61
62 let caller = self.clone();
63 ctx.set_global_var(&format!("{}_send", prefix),
64 &VValFun::new_fun(
65 move |env: &mut Env, argc: usize| {
66 let args =
67 if argc == 1 { VVal::None }
68 else {
69 let a = VVal::vec();
70 for i in 1..argc {
71 a.push(env.arg(i).clone());
72 }
73 a
74 };
75 caller.send(&env.arg(0).s_raw(), args);
76 Ok(VVal::None)
77 }, Some(1), None, false));
78 }
79
80 fn get_request(&self) -> Option<AtomicAValSlot> {
81 match self.free_queue.try_recv() {
82 VVal::Opt(Some(o)) => {
83 if let VVal::Usr(mut ud) = (*o).clone() {
84 if let Some(ud) = ud.as_any().downcast_ref::<AtomicAValSlot>() {
85 Some(ud.clone())
86 } else {
87 Some(AtomicAValSlot::new())
88 }
89 } else {
90 None
91 }
92 },
93 VVal::Opt(None)
94 | VVal::None => Some(AtomicAValSlot::new()),
95 _ => None,
96 }
97 }
98
99 pub fn call(&self, target: &str, args: VVal) -> VVal {
100 let resp = self.get_request().expect("AtomicAValSlot allocation");
101
102 let v = VVal::vec();
103 v.push(VVal::Int(RPC_MSG_CALL));
104 v.push(VVal::Usr(Box::new(resp.clone())));
105 v.push(VVal::new_sym(target));
106 v.push(args);
107
108 self.request_queue.send(&v);
109 let ret = resp.recv_timeout(None);
110 self.free_queue.send(&v.at(1).expect("the AtomicAValSlot"));
111 ret
112 }
113
114 pub fn send(&self, target: &str, args: VVal) {
115 let v = VVal::vec();
116 v.push(VVal::Int(RPC_MSG_SEND));
117 v.push(VVal::None);
118 v.push(VVal::new_sym(target));
119 v.push(args);
120
121 self.request_queue.send(&v);
122 }
123
124 pub fn fetch_error(&self) -> Option<VVal> {
125 let msg = self.error_channel.try_recv();
126 if msg.is_some() {
127 Some(msg)
128 } else {
129 None
130 }
131 }
132}
133
/// Errors reported by `rpc_handler_step`.
///
/// Derives `Debug` so that a `Result<_, RPCHandlerError>` can be used with
/// `unwrap`/`expect`, and implements `Display` and `std::error::Error` so it
/// composes with generic error handling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RPCHandlerError {
    /// Receiving from the request queue yielded an error value.
    Disconnected,
}

impl std::fmt::Display for RPCHandlerError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RPCHandlerError::Disconnected => write!(f, "RPC request queue disconnected"),
        }
    }
}

impl std::error::Error for RPCHandlerError {}
137
138pub fn rpc_handler(
139 ctx: &mut EvalContext,
140 handle: &RPCHandle,
141 interval_timeout: std::time::Duration)
142{
143 let quit = std::rc::Rc::new(std::cell::RefCell::new(false));
144
145 let qr = quit.clone();
146 ctx.set_global_var(
147 "thread:quit",
148 &VValFun::new_fun(move |_env: &mut Env, _argc: usize| {
149 *qr.borrow_mut() = true;
150 Ok(VVal::None)
151 }, Some(0), Some(0), false));
152
153 loop {
154 if let Err(RPCHandlerError::Disconnected) =
155 rpc_handler_step(ctx, handle, interval_timeout)
156 {
157 break;
158 }
159
160 if *quit.borrow() { break; }
161 }
162}
163
164pub fn rpc_handler_step(
165 ctx: &mut EvalContext,
166 handle: &RPCHandle,
167 timeout: std::time::Duration) -> Result<(), RPCHandlerError>
168{
169 let res = handle.request_queue.recv_timeout(Some(timeout));
170 if res.is_err() {
171 Err(RPCHandlerError::Disconnected)
172
173 } else if let VVal::Opt(Some(m)) = res {
174 let cmd = m.at(0).unwrap_or(VVal::None).i();
175 let resp = m.at(1).unwrap_or(VVal::None);
176 let name = m.at(2).unwrap_or(VVal::None);
177 let args = m.at(3).unwrap_or(VVal::None);
178
179 match cmd {
180 RPC_MSG_CALL => {
181 if let VVal::Usr(mut resp) = resp {
182 let resp =
183 resp.as_any()
184 .downcast_mut::<AtomicAValSlot>()
185 .expect("AtomicAValSlot in RPC_MSG_CALL");
186
187 name.with_s_ref(|name| {
188 if let Some(v) = ctx.get_global_var(name) {
189 let arg =
190 if args.is_none() { vec![] }
191 else { args.to_vec() };
192 let ret = match ctx.call(&v, &arg) {
193 Ok(v) => v,
194 Err(e) =>
195 VVal::err_msg(
196 &format!("Panic in call to '{}': {:?}",
197 name, e)),
198 };
199 resp.send(&ret);
200 } else {
201 resp.send(
202 &VVal::err_msg(
203 &format!("No such global on call: {}", name)));
204 }
205 });
206 } else {
207 panic!("Didn't get a AtomicAValSlot in RPC_MSG_CALL");
208 }
209 },
210 RPC_MSG_SEND => {
211 name.with_s_ref(|name| {
212 if let Some(v) = ctx.get_global_var(name) {
213 let arg =
214 if args.is_none() { vec![] }
215 else { args.to_vec() };
216 let ret = ctx.call(&v, &arg).unwrap_or(VVal::None);
217 if ret.is_err() {
218 handle.error_channel.send(
219 &VVal::err_msg(&format!("Error on send: {}", ret.s())));
220 }
221 } else {
222 handle.error_channel.send(
223 &VVal::err_msg(&format!("No such global on send: {}", name)));
224 }
225 });
226 },
227 _ => (),
228 }
229
230 Ok(())
231 } else {
232 Ok(())
233 }
234}
235
#[cfg(test)]
mod tests {
    /// End-to-end check of the RPC helper: a worker thread serves requests
    /// while the test thread issues `worker_send` / `worker_call` from script
    /// code and directly through the handle.
    #[test]
    fn check_rpc() {
        use crate::vval::*;
        use crate::rpc_helper::*;

        let mut client_ctx = crate::compiler::EvalContext::new_default();

        // One clone stays on this thread for issuing requests, the other one
        // moves into the worker thread that serves them.
        let worker_handle = RPCHandle::new();
        let client = worker_handle.clone();
        client.register_global_functions("worker", &mut client_ctx);

        let worker = std::thread::spawn(move || {
            let mut worker_ctx = crate::compiler::EvalContext::new_default();

            worker_ctx.eval("!:global X = 123").unwrap();

            rpc_handler(
                &mut worker_ctx,
                &worker_handle,
                std::time::Duration::from_secs(1));
        });

        // A send to an unknown global only shows up on the error channel.
        client_ctx.eval("worker_send :mfeofe").unwrap();

        // A call to an unknown global returns the error value directly.
        let ret =
            client_ctx
                .eval("worker_call :displayln \"hello world from worker thread!\";")
                .unwrap();
        assert_eq!(ret.s(), "$e $[\"No such global on call: displayln\",\"\"]");

        // A call to an existing global returns its result.
        client_ctx
            .eval("std:assert_eq (worker_call :std:eval \"X\") 123;")
            .unwrap();

        // Ask the worker loop to terminate; the call returns once the worker
        // has handled it, so the earlier send has been processed by now.
        client.call("thread:quit", VVal::None);

        let err = client.fetch_error().unwrap();
        assert_eq!(err.s(), "$o($e $[\"No such global on send: mfeofe\",\"\"])");

        worker.join().unwrap();
    }
}