actix_lua/
actor.rs

1use ::actix::prelude::*;
2use ::actix::ActorContext;
3use rlua::Error as LuaError;
4use rlua::{FromLua, Function, Lua, MultiValue, ToLua, Value};
5
6use crate::message::LuaMessage;
7use std::cell::RefCell;
8use std::collections::HashMap;
9use std::str;
10use std::time::Duration;
11
12/// Top level struct which holds a lua state for itself.
13///
14/// It provides most of the actix context API to the lua enviroment.
15///
16/// You can create new `LuaActor` with [`LuaActorBuilder`].
17///
18///
19/// ### `ctx.msg`
20/// The message sent to Lua actor.
21///
22/// ### `ctx.notify(msg)`
23/// Send message `msg` to self.
24///
25/// ### `ctx.notify_later(msg, seconds)`
26/// Send message `msg` to self after specified period of time.
27///
28/// ### `local result = ctx.send(recipient, msg)`
29/// Send message `msg` to `recipient asynchronously and wait for response.
30///
31/// Calling `ctx.send` yield the current coroutine and returns a `ThreadYield(thread_id)` message.
32/// LuaActor will wait for the response and resume the yielded coroutine once the response is returned.
33///
34/// Equivalent to `actix::Recipient.send`.
35///
36/// ### `ctx.do_send(recipient, msg)`
37/// Send message `msg` to `recipient`.
38///
39/// Equivalent to `actix::Recipient.do_send`.
40///
41/// ### `ctx.terminate()`
42/// Terminate actor execution.
43///
44/// [`LuaActorBuilder`]: struct.LuaActorBuilder.html
45pub struct LuaActor {
46    vm: Lua,
47    pub recipients: HashMap<String, Recipient<LuaMessage>>,
48}
49
50impl LuaActor {
51    pub fn new_with_vm(
52        vm: Lua,
53        started: Option<String>,
54        handle: Option<String>,
55        stopped: Option<String>,
56    ) -> Result<LuaActor, LuaError> {
57        let prelude = include_str!("lua/prelude.lua");
58        vm.context(|ctx| {
59            ctx.load(prelude).set_name("Prelude")?.exec()?;
60            {
61                let load: Function = ctx.globals().get("__load")?;
62                if let Some(script) = started {
63                    let res = load.call::<(String, String), ()>((script, "started".to_string()));
64
65                    if let Err(e) = res {
66                        return Result::Err(e);
67                    }
68                }
69                if let Some(script) = handle {
70                    let res = load.call::<(String, String), ()>((script, "handle".to_string()));
71
72                    if let Err(e) = res {
73                        return Result::Err(e);
74                    }
75                }
76                if let Some(script) = stopped {
77                    let res = load.call::<(String, String), ()>((script, "stopped".to_string()));
78
79                    if let Err(e) = res {
80                        return Result::Err(e);
81                    }
82                }
83            }
84            Ok(())
85        })?;
86
87        Result::Ok(LuaActor {
88            vm,
89            recipients: HashMap::new(),
90        })
91    }
92
93    pub fn new(
94        started: Option<String>,
95        handle: Option<String>,
96        stopped: Option<String>,
97    ) -> Result<LuaActor, LuaError> {
98        let vm = Lua::new();
99        Self::new_with_vm(vm, started, handle, stopped)
100    }
101
102    /// Add a recipient to the actor's recipient list.
103    /// You can send message to the recipient via `name` with the context API `ctx.send(name, message)`
104    pub fn add_recipients(
105        &mut self,
106        name: &str,
107        rec: Recipient<LuaMessage>,
108    ) -> Option<Recipient<LuaMessage>> {
109        self.recipients.insert(name.to_string(), rec)
110    }
111}
112
113// Remove all `self` usage with a independent function `invoke`.
114fn invoke(
115    self_addr: &Recipient<SendAttempt>,
116    ctx: &mut Context<LuaActor>,
117    vm: &mut Lua,
118    recs: &mut HashMap<String, Recipient<LuaMessage>>,
119    func_name: &str,
120    args: Vec<LuaMessage>,
121) -> Result<LuaMessage, LuaError> {
122    // `ctx` is used in multiple closure in the lua scope.
123    // to create multiple borrow in closures, we use RefCell to move the borrow-checking to runtime.
124    // Voliating the check will result in panic. Which shouldn't happend(I think) since lua is single-threaded.
125    let ctx = RefCell::new(ctx);
126    let recs = RefCell::new(recs);
127
128    vm.context(|lua_ctx| {
129        let iter = args
130            .into_iter()
131            .map(|msg| msg.to_lua(lua_ctx).unwrap())
132            .collect();
133        let args = MultiValue::from_vec(iter);
134        // We can't create a function with references to `self` and is 'static since `self` already owns Lua.
135        // A function within Lua owning `self` creates self-borrowing cycle.
136        //
137        // Also, Lua requires all values passed to it is 'static because we can't know when will Lua GC our value.
138        // Therefore, we use scope to make sure these APIs are temporary and don't have to deal with 'static lifetime.
139        //
140        // (Quote from: https://github.com/kyren/rlua/issues/56#issuecomment-363928738
141        // When the scope ends, the Lua function is 100% guaranteed (afaict!) to be "invalidated".
142        // This means that calling the function will cause an immediate Lua error with a message like "error, call of invalidated function".)
143        //
144        // for reference, check https://github.com/kyren/rlua/issues/73#issuecomment-370222198
145        lua_ctx.scope(|scope| {
146            let globals = lua_ctx.globals();
147
148            let notify = scope.create_function_mut(|_, msg: LuaMessage| {
149                let mut ctx = ctx.borrow_mut();
150                ctx.notify(msg);
151                Ok(())
152            })?;
153            globals.set("notify", notify)?;
154
155            let notify_later = scope.create_function_mut(|_, (msg, secs): (LuaMessage, u64)| {
156                let mut ctx = ctx.borrow_mut();
157                ctx.notify_later(msg, Duration::new(secs, 0));
158                Ok(())
159            })?;
160            globals.set("notify_later", notify_later)?;
161
162            let do_send =
163                scope.create_function_mut(|_, (recipient_name, msg): (String, LuaMessage)| {
164                    let recs = recs.borrow_mut();
165                    let rec = recs.get(&recipient_name);
166
167                    // TODO: error handling?
168                    if let Some(r) = rec {
169                        r.do_send(msg).unwrap();
170                    }
171                    Ok(())
172                })?;
173            globals.set("do_send", do_send)?;
174
175            let send = scope.create_function_mut(
176                |_, (recipient_name, msg, cb_thread_id): (String, LuaMessage, i64)| {
177                    // we can't create a lua function which owns `self`
178                    // but `self` is needed for resolving `send` future.
179                    //
180                    // The workaround is we notify ourself with a `SendAttempt` Message
181                    // and resolving `send` future in the `handle` function.
182                    self_addr
183                        .do_send(SendAttempt {
184                            recipient_name,
185                            msg,
186                            cb_thread_id,
187                        })
188                        .unwrap();
189
190                    Ok(())
191                },
192            )?;
193            globals.set("send", send)?;
194
195            let terminate = scope.create_function_mut(|_, _: LuaMessage| {
196                let mut ctx = ctx.borrow_mut();
197                ctx.terminate();
198                Ok(())
199            })?;
200            globals.set("terminate", terminate)?;
201
202            let lua_handle: Result<Function, LuaError> = globals.get(func_name);
203            if let Ok(f) = lua_handle {
204                match f.call::<MultiValue, Value>(args) {
205                    Err(e) => panic!("{:?}", e),
206                    Ok(ret) => Ok(LuaMessage::from_lua(ret, lua_ctx).unwrap()),
207                }
208            } else {
209                // return nil if handle is not defined
210                Ok(LuaMessage::Nil)
211            }
212        })
213    })
214}
215
216impl Actor for LuaActor {
217    type Context = Context<Self>;
218
219    fn started(&mut self, ctx: &mut Context<Self>) {
220        if let Err(e) = invoke(
221            &ctx.address().recipient(),
222            ctx,
223            &mut self.vm,
224            &mut self.recipients,
225            "__run",
226            vec![LuaMessage::from("started")],
227        ) {
228            panic!("lua actor started failed {:?}", e);
229        }
230    }
231
232    fn stopped(&mut self, ctx: &mut Context<Self>) {
233        if let Err(e) = invoke(
234            &ctx.address().recipient(),
235            ctx,
236            &mut self.vm,
237            &mut self.recipients,
238            "__run",
239            vec![LuaMessage::from("stopped")],
240        ) {
241            panic!("lua actor stopped failed {:?}", e);
242        }
243    }
244}
245
246struct SendAttempt {
247    recipient_name: String,
248    msg: LuaMessage,
249    cb_thread_id: i64,
250}
251
252impl Message for SendAttempt {
253    type Result = LuaMessage;
254}
255
256struct SendAttemptResult {
257    msg: LuaMessage,
258    cb_thread_id: i64,
259}
260
261impl Message for SendAttemptResult {
262    type Result = LuaMessage;
263}
264
265impl Handler<LuaMessage> for LuaActor {
266    type Result = LuaMessage;
267
268    fn handle(&mut self, msg: LuaMessage, ctx: &mut Context<Self>) -> Self::Result {
269        if let Ok(res) = invoke(
270            &ctx.address().recipient(),
271            ctx,
272            &mut self.vm,
273            &mut self.recipients,
274            "__run",
275            vec![LuaMessage::from("handle"), msg],
276        ) {
277            res
278        } else {
279            LuaMessage::Nil
280        }
281    }
282}
283
284impl Handler<SendAttemptResult> for LuaActor {
285    type Result = LuaMessage;
286
287    fn handle(&mut self, result: SendAttemptResult, ctx: &mut Context<Self>) -> Self::Result {
288        if let Ok(res) = invoke(
289            &ctx.address().recipient(),
290            ctx,
291            &mut self.vm,
292            &mut self.recipients,
293            "__resume",
294            vec![LuaMessage::from(result.cb_thread_id), result.msg],
295        ) {
296            res
297        } else {
298            LuaMessage::Nil
299        }
300    }
301}
302
303impl Handler<SendAttempt> for LuaActor {
304    type Result = LuaMessage;
305
306    fn handle(&mut self, attempt: SendAttempt, ctx: &mut Context<Self>) -> Self::Result {
307        let rec = &self.recipients[&attempt.recipient_name];
308        let self_addr = ctx.address().clone();
309        rec.send(attempt.msg.clone())
310            .into_actor(self)
311            .then(move |res, _, _| {
312                match res {
313                    Ok(msg) => self_addr.do_send(SendAttemptResult {
314                        msg,
315                        cb_thread_id: attempt.cb_thread_id,
316                    }),
317                    _ => {
318                        panic!("send attempt failed: {:?}", res);
319                    }
320                };
321                actix::fut::ok(())
322            })
323            .wait(ctx);
324
325        LuaMessage::Nil
326    }
327}
328
329#[cfg(test)]
330mod tests {
331    use super::*;
332    use futures_timer::Delay;
333    use std::collections::HashMap;
334    use std::time::Duration;
335    use tokio::prelude::Future;
336
337    use crate::builder::LuaActorBuilder;
338
339    fn lua_actor_with_handle(script: &str) -> LuaActor {
340        LuaActorBuilder::new()
341            .on_handle_with_lua(script)
342            .build()
343            .unwrap()
344    }
345
346    #[test]
347    fn lua_actor_basic() {
348        let system = System::new("test");
349
350        let lua_addr = lua_actor_with_handle(r#"return ctx.msg + 1"#).start();
351
352        let l = lua_addr.send(LuaMessage::from(1));
353        Arbiter::spawn(
354            l.map(|res| {
355                assert_eq!(res, LuaMessage::from(2));
356                System::current().stop();
357            })
358            .map_err(|e| println!("actor dead {}", e)),
359        );
360
361        system.run();
362    }
363
364    #[test]
365    fn lua_actor_syntax_error() {
366        let res = LuaActorBuilder::new()
367            .on_handle_with_lua(r"return 1+")
368            .build();
369
370        if let Ok(_) = res {
371            panic!("should return Err(syntax_error)");
372        }
373    }
374
375    #[should_panic]
376    #[test]
377    fn lua_actor_user_error() {
378        let system = System::new("test");
379
380        let lua_addr = lua_actor_with_handle(
381            r#"
382        print("before")
383        error("foo")
384        print("after")
385        "#,
386        )
387        .start();
388
389        let l = lua_addr.send(LuaMessage::from(0));
390        Arbiter::spawn(
391            l.map(|_| {
392                // it should panic
393                System::current().stop();
394            })
395            .map_err(|e| println!("actor dead {}", e)),
396        );
397
398        system.run();
399    }
400
401    #[test]
402    fn lua_actor_return_table() {
403        let system = System::new("test");
404
405        let lua_addr = lua_actor_with_handle(
406            r#"
407        return {x = 1}
408        "#,
409        )
410        .start();
411
412        let l = lua_addr.send(LuaMessage::from(3));
413        Arbiter::spawn(
414            l.map(|res| {
415                let mut t = HashMap::new();
416                t.insert("x".to_string(), LuaMessage::from(1));
417
418                assert_eq!(res, LuaMessage::from(t));
419                System::current().stop();
420            })
421            .map_err(|e| println!("actor dead {}", e)),
422        );
423
424        system.run();
425    }
426
427    #[test]
428    fn lua_actor_state() {
429        let system = System::new("test");
430
431        let lua_addr = lua_actor_with_handle(
432            r#"
433        if not ctx.state.x then ctx.state.x = 0 end
434
435        ctx.state.x = ctx.state.x + 1
436        return ctx.state.x
437        "#,
438        )
439        .start();
440
441        let l = lua_addr.send(LuaMessage::Nil);
442        Arbiter::spawn(
443            l.map(move |res| {
444                assert_eq!(res, LuaMessage::from(1));
445                let l2 = lua_addr.send(LuaMessage::Nil);
446                Arbiter::spawn(
447                    l2.map(|res| {
448                        assert_eq!(res, LuaMessage::from(2));
449                        System::current().stop();
450                    })
451                    .map_err(|e| println!("actor dead {}", e)),
452                );
453            })
454            .map_err(|e| println!("actor dead {}", e)),
455        );
456
457        system.run();
458    }
459
460    #[test]
461    fn lua_actor_notify() {
462        let system = System::new("test");
463
464        let addr = LuaActorBuilder::new()
465            .on_started_with_lua(
466                r#"
467            ctx.notify(100)
468            "#,
469            )
470            .on_handle_with_lua(
471                r#"
472            if ctx.msg == 100 then
473                ctx.state.notified = ctx.msg
474            end
475
476            return ctx.msg + ctx.state.notified
477            "#,
478            )
479            .build()
480            .unwrap()
481            .start();
482
483        let delay = Delay::new(Duration::from_secs(1)).map(move |()| {
484            let l = addr.send(LuaMessage::from(1));
485            Arbiter::spawn(
486                l.map(|res| {
487                    assert_eq!(res, LuaMessage::from(101));
488                    System::current().stop();
489                })
490                .map_err(|e| println!("actor dead {}", e)),
491            )
492        });
493        Arbiter::spawn(delay.map_err(|e| println!("actor dead {}", e)));
494
495        system.run();
496    }
497
498    #[test]
499    fn lua_actor_notify_later() {
500        let system = System::new("test");
501
502        let addr = LuaActorBuilder::new()
503            .on_started_with_lua(
504                r#"
505            ctx.notify_later(100, 1)
506            "#,
507            )
508            .on_handle_with_lua(
509                r#"
510            if ctx.msg == 100 then
511                ctx.state.notified = ctx.msg
512            end
513
514            return ctx.msg + ctx.state.notified
515            "#,
516            )
517            .build()
518            .unwrap()
519            .start();
520
521        let delay = Delay::new(Duration::from_secs(2)).map(move |()| {
522            let l2 = addr.send(LuaMessage::from(1));
523            Arbiter::spawn(
524                l2.map(|res| {
525                    assert_eq!(res, LuaMessage::from(101));
526                    System::current().stop();
527                })
528                .map_err(|e| println!("actor dead {}", e)),
529            )
530        });
531        Arbiter::spawn(delay.map_err(|e| println!("actor dead {}", e)));
532
533        system.run();
534    }
535
536    #[test]
537    fn lua_actor_send() {
538        use std::mem::discriminant;
539        let system = System::new("test");
540
541        struct Callback;
542        impl Actor for Callback {
543            type Context = Context<Self>;
544        }
545
546        impl Handler<LuaMessage> for Callback {
547            type Result = LuaMessage;
548
549            fn handle(&mut self, msg: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
550                // check msg type
551                assert_eq!(
552                    discriminant(&msg),
553                    discriminant(&LuaMessage::String("foo".to_string()))
554                );
555                if let LuaMessage::String(s) = msg {
556                    assert_eq!(s, "Hello");
557                    System::current().stop();
558                    LuaMessage::Boolean(true)
559                } else {
560                    unimplemented!()
561                }
562            }
563        }
564        let callback_addr = Callback.start();
565
566        let mut actor = LuaActorBuilder::new()
567            .on_started_with_lua(
568                r#"
569            local result = ctx.send("callback", "Hello")
570            print("result", "=", result)
571            "#,
572            )
573            .build()
574            .unwrap();
575
576        actor.add_recipients("callback", callback_addr.recipient());
577        actor.start();
578        system.run();
579    }
580
581    #[test]
582    fn lua_actor_thread_yield() {
583        use std::mem::discriminant;
584        struct Callback;
585        impl Actor for Callback {
586            type Context = Context<Self>;
587        }
588
589        impl Handler<LuaMessage> for Callback {
590            type Result = LuaMessage;
591
592            fn handle(&mut self, _: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
593                LuaMessage::Nil
594            }
595        }
596
597        let system = System::new("test");
598
599        let mut actor = LuaActorBuilder::new()
600            .on_handle_with_lua(
601                r#"
602            local result = ctx.send("callback", "Hello")
603            print(result)
604            return result
605            "#,
606            )
607            .build()
608            .unwrap();
609
610        actor.add_recipients("callback", Callback.start().recipient());
611
612        let addr = actor.start();
613
614        let l = addr.send(LuaMessage::Nil);
615        Arbiter::spawn(
616            l.map(move |res| {
617                assert_eq!(
618                    discriminant(&res),
619                    discriminant(&LuaMessage::ThreadYield("foo".to_string()))
620                );
621                System::current().stop();
622            })
623            .map_err(|e| println!("actor dead {}", e)),
624        );
625
626        system.run();
627    }
628
629    #[test]
630    fn lua_actor_thread_yield_and_callback_message() {
631        use std::mem::discriminant;
632
633        struct Callback;
634        impl Actor for Callback {
635            type Context = Context<Self>;
636        }
637
638        impl Handler<LuaMessage> for Callback {
639            type Result = LuaMessage;
640
641            fn handle(&mut self, msg: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
642                // check msg type
643                assert_eq!(
644                    discriminant(&msg),
645                    discriminant(&LuaMessage::String("foo".to_string()))
646                );
647                if let LuaMessage::String(s) = msg {
648                    assert_eq!(s, "Hello");
649                    LuaMessage::String(format!("{} from callback", s))
650                } else {
651                    unimplemented!()
652                }
653            }
654        }
655
656        struct Check;
657        impl Actor for Check {
658            type Context = Context<Self>;
659        }
660
661        impl Handler<LuaMessage> for Check {
662            type Result = LuaMessage;
663
664            fn handle(&mut self, msg: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
665                // check msg type
666                assert_eq!(
667                    discriminant(&msg),
668                    discriminant(&LuaMessage::String("foo".to_string()))
669                );
670                if let LuaMessage::String(s) = msg {
671                    assert_eq!(s, "Hello from callback");
672                    System::current().stop();
673                    LuaMessage::Nil
674                } else {
675                    unimplemented!()
676                }
677            }
678        }
679
680        let system = System::new("test");
681        let mut actor = LuaActorBuilder::new()
682            .on_handle_with_lua(
683                r#"
684            local result = ctx.send("callback", ctx.msg)
685            print("send result", "=", result)
686            ctx.send("check", result)
687            "#,
688            )
689            .build()
690            .unwrap();
691
692        actor.add_recipients("callback", Callback.start().recipient());
693        actor.add_recipients("check", Check.start().recipient());
694
695        let addr = actor.start();
696
697        let l = addr.send(LuaMessage::String("Hello".to_string()));
698        Arbiter::spawn(
699            l.map(move |res| {
700                assert_eq!(
701                    discriminant(&res),
702                    discriminant(&LuaMessage::ThreadYield("foo".to_string()))
703                );
704            })
705            .map_err(|e| println!("actor dead {}", e)),
706        );
707
708        system.run();
709    }
710
711    #[test]
712    fn lua_actor_do_send() {
713        use std::mem::discriminant;
714
715        struct Check;
716        impl Actor for Check {
717            type Context = Context<Self>;
718        }
719
720        impl Handler<LuaMessage> for Check {
721            type Result = LuaMessage;
722
723            fn handle(&mut self, msg: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
724                // check msg type
725                assert_eq!(
726                    discriminant(&msg),
727                    discriminant(&LuaMessage::String("foo".to_string()))
728                );
729                if let LuaMessage::String(s) = msg {
730                    assert_eq!(s, "Hello");
731                    System::current().stop();
732                    LuaMessage::Nil
733                } else {
734                    unimplemented!()
735                }
736            }
737        }
738        let system = System::new("test");
739
740        let mut actor = LuaActorBuilder::new()
741            .on_handle_with_lua(
742                r#"
743            local result = ctx.do_send("check", "Hello")
744            print("new actor addr name", rec, result)
745            return ctx.msg
746            "#,
747            )
748            .build()
749            .unwrap();
750        actor.add_recipients("check", Check.start().recipient());
751        let addr = actor.start();
752
753        let l = addr.send(LuaMessage::Nil);
754        Arbiter::spawn(
755            l.map(|res| {
756                assert_eq!(res, LuaMessage::Nil);
757            })
758            .map_err(|e| println!("actor dead {}", e)),
759        );
760
761        system.run();
762    }
763
764    #[test]
765    fn lua_actor_terminate() {
766        // TODO: validate on_stopped is called
767        let system = System::new("test");
768
769        let _ = LuaActorBuilder::new()
770            .on_started_with_lua(
771                r#"
772            ctx.terminate()
773            "#,
774            )
775            .on_stopped_with_lua(r#"print("stopped")"#)
776            .build()
777            .unwrap()
778            .start();
779        let delay = Delay::new(Duration::from_secs(1)).map(move |()| {
780            System::current().stop();
781        });
782        Arbiter::spawn(delay.map_err(|e| println!("actor dead {}", e)));
783
784        system.run();
785    }
786
787    use std::env;
788
789    #[test]
790    fn lua_actor_require() {
791        let system = System::new("test");
792        env::set_var("LUA_PATH", "./src/?.lua;;");
793
794        let addr = LuaActorBuilder::new()
795            .on_handle_with_lua(
796                r#"
797                local m = require('lua/test/module')
798                return m.incr(ctx.msg)
799            "#,
800            )
801            .build()
802            .unwrap()
803            .start();
804        let l = addr.send(LuaMessage::from(1));
805        Arbiter::spawn(
806            l.map(|res| {
807                assert_eq!(res, LuaMessage::from(2));
808                System::current().stop();
809            })
810            .map_err(|e| println!("actor dead {}", e)),
811        );
812
813        system.run();
814    }
815
816    #[test]
817    fn lua_actor_with_vm() {
818        let system = System::new("test");
819
820        let vm = Lua::new();
821        vm.context(|ctx| {
822            ctx.globals()
823                .set(
824                    "greet",
825                    ctx.create_function(|_, name: String| Ok(format!("Hello, {}!", name)))
826                        .unwrap(),
827                )
828                .unwrap();
829        });
830
831        let addr = LuaActorBuilder::new()
832            .on_handle_with_lua(
833                r#"
834            return greet(ctx.msg)
835            "#,
836            )
837            .build_with_vm(vm)
838            .unwrap()
839            .start();
840
841        let l = addr.send(LuaMessage::from("World"));
842        Arbiter::spawn(
843            l.map(|res| {
844                assert_eq!(res, LuaMessage::from("Hello, World!"));
845                System::current().stop();
846            })
847            .map_err(|e| println!("actor dead {}", e)),
848        );
849
850        system.run();
851    }
852}