wlambda/
rpc_helper.rs

1// Copyright (c) 2020-2022 Weird Constructor <weirdconstructor@gmail.com>
2// This is a part of WLambda. See README.md and COPYING for details.
3
/*!
Provides a helper to create a WLambda evaluation worker thread in your program.

This is helpful if you want to provide a main thread that defines and evaluates
a WLambda program in a multi-threaded Rust program.

For instance, it's used by an HTTP server I wrote that used worker threads to
handle requests. In that environment the central WLambda main thread was then
queried for executing the requests. This of course makes WLambda the
bottleneck for request responses. But if WLambda decided to just serve a file,
the file I/O could be done outside the WLambda control flow.
*/
16
17use crate::vval::*;
18use crate::threads::*;
19use crate::compiler::*;
20
/// Tag of a request message whose sender waits for a reply
/// (built by `RPCHandle::call`).
const RPC_MSG_CALL: i64 = 1;
/// Tag of a request message that carries no reply slot
/// (built by `RPCHandle::send`).
const RPC_MSG_SEND: i64 = 2;
23
24#[derive(Clone)]
25pub struct RPCHandle {
26    free_queue:     AValChannel,
27    error_channel:  AValChannel,
28    request_queue:  AValChannel,
29}
30
31impl Default for RPCHandle {
32    fn default() -> Self { Self::new() }
33}
34
35
36impl RPCHandle {
37    pub fn new() -> Self {
38        Self {
39            free_queue:     AValChannel::new_direct(),
40            error_channel:  AValChannel::new_direct(),
41            request_queue:  AValChannel::new_direct(),
42        }
43    }
44
45    pub fn register_global_functions(&self, prefix: &str, ctx: &mut EvalContext) {
46        let caller = self.clone();
47        ctx.set_global_var(&format!("{}_call", prefix),
48            &VValFun::new_fun(
49                move |env: &mut Env, argc: usize| {
50                    let args =
51                        if argc == 1 { VVal::None }
52                        else {
53                            let a = VVal::vec();
54                            for i in 1..argc {
55                                a.push(env.arg(i).clone());
56                            }
57                            a
58                        };
59                    Ok(caller.call(&env.arg(0).s_raw(), args))
60                }, Some(1), None, false));
61
62        let caller = self.clone();
63        ctx.set_global_var(&format!("{}_send", prefix),
64            &VValFun::new_fun(
65                move |env: &mut Env, argc: usize| {
66                    let args =
67                        if argc == 1 { VVal::None }
68                        else {
69                            let a = VVal::vec();
70                            for i in 1..argc {
71                                a.push(env.arg(i).clone());
72                            }
73                            a
74                        };
75                    caller.send(&env.arg(0).s_raw(), args);
76                    Ok(VVal::None)
77                }, Some(1), None, false));
78    }
79
80    fn get_request(&self) -> Option<AtomicAValSlot> {
81        match self.free_queue.try_recv() {
82            VVal::Opt(Some(o)) => {
83                if let VVal::Usr(mut ud) = (*o).clone() {
84                    if let Some(ud) = ud.as_any().downcast_ref::<AtomicAValSlot>() {
85                        Some(ud.clone())
86                    } else {
87                        Some(AtomicAValSlot::new())
88                    }
89                } else {
90                    None
91                }
92            },
93            VVal::Opt(None)
94            | VVal::None => Some(AtomicAValSlot::new()),
95            _ => None,
96        }
97    }
98
99    pub fn call(&self, target: &str, args: VVal) -> VVal {
100        let resp = self.get_request().expect("AtomicAValSlot allocation");
101
102        let v = VVal::vec();
103        v.push(VVal::Int(RPC_MSG_CALL));
104        v.push(VVal::Usr(Box::new(resp.clone())));
105        v.push(VVal::new_sym(target));
106        v.push(args);
107
108        self.request_queue.send(&v);
109        let ret = resp.recv_timeout(None);
110        self.free_queue.send(&v.at(1).expect("the AtomicAValSlot"));
111        ret
112    }
113
114    pub fn send(&self, target: &str, args: VVal) {
115        let v = VVal::vec();
116        v.push(VVal::Int(RPC_MSG_SEND));
117        v.push(VVal::None);
118        v.push(VVal::new_sym(target));
119        v.push(args);
120
121        self.request_queue.send(&v);
122    }
123
124    pub fn fetch_error(&self) -> Option<VVal> {
125        let msg = self.error_channel.try_recv();
126        if msg.is_some() {
127            Some(msg)
128        } else {
129            None
130        }
131    }
132}
133
/// Reasons why [`rpc_handler_step`] could not continue serving requests.
///
/// Implements `Debug`, `Display` and `std::error::Error`, so results
/// carrying it can be used with `unwrap`/`expect`, `?` and boxed errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RPCHandlerError {
    /// Receiving from the request queue reported an error.
    Disconnected,
}

impl std::fmt::Display for RPCHandlerError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RPCHandlerError::Disconnected => f.write_str("RPC request queue disconnected"),
        }
    }
}

impl std::error::Error for RPCHandlerError {}
137
138pub fn rpc_handler(
139    ctx: &mut EvalContext,
140    handle: &RPCHandle,
141    interval_timeout: std::time::Duration)
142{
143    let quit = std::rc::Rc::new(std::cell::RefCell::new(false));
144
145    let qr = quit.clone();
146    ctx.set_global_var(
147        "thread:quit",
148        &VValFun::new_fun(move |_env: &mut Env, _argc: usize| {
149            *qr.borrow_mut() = true;
150            Ok(VVal::None)
151        }, Some(0), Some(0), false));
152
153    loop {
154        if let Err(RPCHandlerError::Disconnected) =
155            rpc_handler_step(ctx, handle, interval_timeout)
156        {
157            break;
158        }
159
160        if *quit.borrow() { break; }
161    }
162}
163
164pub fn rpc_handler_step(
165    ctx: &mut EvalContext,
166    handle: &RPCHandle,
167    timeout: std::time::Duration) -> Result<(), RPCHandlerError>
168{
169    let res = handle.request_queue.recv_timeout(Some(timeout));
170    if res.is_err() {
171        Err(RPCHandlerError::Disconnected)
172
173    } else if let VVal::Opt(Some(m)) = res {
174        let cmd  = m.at(0).unwrap_or(VVal::None).i();
175        let resp = m.at(1).unwrap_or(VVal::None);
176        let name = m.at(2).unwrap_or(VVal::None);
177        let args = m.at(3).unwrap_or(VVal::None);
178
179        match cmd {
180            RPC_MSG_CALL => {
181                if let VVal::Usr(mut resp) = resp {
182                    let resp =
183                        resp.as_any()
184                            .downcast_mut::<AtomicAValSlot>()
185                            .expect("AtomicAValSlot in RPC_MSG_CALL");
186
187                    name.with_s_ref(|name| {
188                        if let Some(v) = ctx.get_global_var(name) {
189                            let arg =
190                                if args.is_none() { vec![] }
191                                else { args.to_vec() };
192                            let ret = match ctx.call(&v, &arg) {
193                                Ok(v)  => v,
194                                Err(e) =>
195                                    VVal::err_msg(
196                                        &format!("Panic in call to '{}': {:?}",
197                                                 name, e)),
198                            };
199                            resp.send(&ret);
200                        } else {
201                            resp.send(
202                                &VVal::err_msg(
203                                    &format!("No such global on call: {}", name)));
204                        }
205                    });
206                } else {
207                    panic!("Didn't get a AtomicAValSlot in RPC_MSG_CALL");
208                }
209            },
210            RPC_MSG_SEND => {
211                name.with_s_ref(|name| {
212                    if let Some(v) = ctx.get_global_var(name) {
213                        let arg =
214                            if args.is_none() { vec![] }
215                            else { args.to_vec() };
216                        let ret = ctx.call(&v, &arg).unwrap_or(VVal::None);
217                        if ret.is_err() {
218                            handle.error_channel.send(
219                                &VVal::err_msg(&format!("Error on send: {}", ret.s())));
220                        }
221                    } else {
222                        handle.error_channel.send(
223                            &VVal::err_msg(&format!("No such global on send: {}", name)));
224                    }
225                });
226            },
227            _ => (),
228        }
229
230        Ok(())
231    } else {
232        Ok(())
233    }
234}
235
#[cfg(test)]
mod tests {
    /// Round trip between a caller context and a worker thread running
    /// `rpc_handler`: a send to a missing global, a call to a missing global,
    /// a call that evaluates a worker-side global, and the quit request.
    #[test]
    fn check_rpc() {
        use crate::vval::*;
        use crate::rpc_helper::*;

        // The context of the calling side:
        let mut caller_ctx = crate::compiler::EvalContext::new_default();

        let worker_handle = RPCHandle::new();
        let sender = worker_handle.clone();
        sender.register_global_functions("worker", &mut caller_ctx);

        let worker = std::thread::spawn(move || {
            let mut worker_ctx = crate::compiler::EvalContext::new_default();

            worker_ctx.eval("!:global X = 123").unwrap();

            let poll_interval = std::time::Duration::from_secs(1);
            rpc_handler(&mut worker_ctx, &worker_handle, poll_interval);
        });

        // Fire-and-forget to a global that does not exist in the worker:
        caller_ctx.eval("worker_send :mfeofe").unwrap();

        // A call to a missing global comes back as an error value:
        let missing =
            caller_ctx
                .eval("worker_call :displayln \"hello world from worker thread!\";")
                .unwrap();
        assert_eq!(missing.s(), "$e $[\"No such global on call: displayln\",\"\"]");

        // A call that reads the global defined in the worker context:
        caller_ctx.eval("std:assert_eq (worker_call :std:eval \"X\") 123;").unwrap();

        sender.call("thread:quit", VVal::None);

        // The failed send from above must have been reported by now:
        let reported = sender.fetch_error().unwrap();
        assert_eq!(reported.s(), "$o($e $[\"No such global on send: mfeofe\",\"\"])");

        worker.join().unwrap();
    }
}
271}