1use ::actix::prelude::*;
2use ::actix::ActorContext;
3use rlua::Error as LuaError;
4use rlua::{FromLua, Function, Lua, MultiValue, ToLua, Value};
5
6use crate::message::LuaMessage;
7use std::cell::RefCell;
8use std::collections::HashMap;
9use std::str;
10use std::time::Duration;
11
12pub struct LuaActor {
46 vm: Lua,
47 pub recipients: HashMap<String, Recipient<LuaMessage>>,
48}
49
50impl LuaActor {
51 pub fn new_with_vm(
52 vm: Lua,
53 started: Option<String>,
54 handle: Option<String>,
55 stopped: Option<String>,
56 ) -> Result<LuaActor, LuaError> {
57 let prelude = include_str!("lua/prelude.lua");
58 vm.context(|ctx| {
59 ctx.load(prelude).set_name("Prelude")?.exec()?;
60 {
61 let load: Function = ctx.globals().get("__load")?;
62 if let Some(script) = started {
63 let res = load.call::<(String, String), ()>((script, "started".to_string()));
64
65 if let Err(e) = res {
66 return Result::Err(e);
67 }
68 }
69 if let Some(script) = handle {
70 let res = load.call::<(String, String), ()>((script, "handle".to_string()));
71
72 if let Err(e) = res {
73 return Result::Err(e);
74 }
75 }
76 if let Some(script) = stopped {
77 let res = load.call::<(String, String), ()>((script, "stopped".to_string()));
78
79 if let Err(e) = res {
80 return Result::Err(e);
81 }
82 }
83 }
84 Ok(())
85 })?;
86
87 Result::Ok(LuaActor {
88 vm,
89 recipients: HashMap::new(),
90 })
91 }
92
93 pub fn new(
94 started: Option<String>,
95 handle: Option<String>,
96 stopped: Option<String>,
97 ) -> Result<LuaActor, LuaError> {
98 let vm = Lua::new();
99 Self::new_with_vm(vm, started, handle, stopped)
100 }
101
102 pub fn add_recipients(
105 &mut self,
106 name: &str,
107 rec: Recipient<LuaMessage>,
108 ) -> Option<Recipient<LuaMessage>> {
109 self.recipients.insert(name.to_string(), rec)
110 }
111}
112
113fn invoke(
115 self_addr: &Recipient<SendAttempt>,
116 ctx: &mut Context<LuaActor>,
117 vm: &mut Lua,
118 recs: &mut HashMap<String, Recipient<LuaMessage>>,
119 func_name: &str,
120 args: Vec<LuaMessage>,
121) -> Result<LuaMessage, LuaError> {
122 let ctx = RefCell::new(ctx);
126 let recs = RefCell::new(recs);
127
128 vm.context(|lua_ctx| {
129 let iter = args
130 .into_iter()
131 .map(|msg| msg.to_lua(lua_ctx).unwrap())
132 .collect();
133 let args = MultiValue::from_vec(iter);
134 lua_ctx.scope(|scope| {
146 let globals = lua_ctx.globals();
147
148 let notify = scope.create_function_mut(|_, msg: LuaMessage| {
149 let mut ctx = ctx.borrow_mut();
150 ctx.notify(msg);
151 Ok(())
152 })?;
153 globals.set("notify", notify)?;
154
155 let notify_later = scope.create_function_mut(|_, (msg, secs): (LuaMessage, u64)| {
156 let mut ctx = ctx.borrow_mut();
157 ctx.notify_later(msg, Duration::new(secs, 0));
158 Ok(())
159 })?;
160 globals.set("notify_later", notify_later)?;
161
162 let do_send =
163 scope.create_function_mut(|_, (recipient_name, msg): (String, LuaMessage)| {
164 let recs = recs.borrow_mut();
165 let rec = recs.get(&recipient_name);
166
167 if let Some(r) = rec {
169 r.do_send(msg).unwrap();
170 }
171 Ok(())
172 })?;
173 globals.set("do_send", do_send)?;
174
175 let send = scope.create_function_mut(
176 |_, (recipient_name, msg, cb_thread_id): (String, LuaMessage, i64)| {
177 self_addr
183 .do_send(SendAttempt {
184 recipient_name,
185 msg,
186 cb_thread_id,
187 })
188 .unwrap();
189
190 Ok(())
191 },
192 )?;
193 globals.set("send", send)?;
194
195 let terminate = scope.create_function_mut(|_, _: LuaMessage| {
196 let mut ctx = ctx.borrow_mut();
197 ctx.terminate();
198 Ok(())
199 })?;
200 globals.set("terminate", terminate)?;
201
202 let lua_handle: Result<Function, LuaError> = globals.get(func_name);
203 if let Ok(f) = lua_handle {
204 match f.call::<MultiValue, Value>(args) {
205 Err(e) => panic!("{:?}", e),
206 Ok(ret) => Ok(LuaMessage::from_lua(ret, lua_ctx).unwrap()),
207 }
208 } else {
209 Ok(LuaMessage::Nil)
211 }
212 })
213 })
214}
215
216impl Actor for LuaActor {
217 type Context = Context<Self>;
218
219 fn started(&mut self, ctx: &mut Context<Self>) {
220 if let Err(e) = invoke(
221 &ctx.address().recipient(),
222 ctx,
223 &mut self.vm,
224 &mut self.recipients,
225 "__run",
226 vec![LuaMessage::from("started")],
227 ) {
228 panic!("lua actor started failed {:?}", e);
229 }
230 }
231
232 fn stopped(&mut self, ctx: &mut Context<Self>) {
233 if let Err(e) = invoke(
234 &ctx.address().recipient(),
235 ctx,
236 &mut self.vm,
237 &mut self.recipients,
238 "__run",
239 vec![LuaMessage::from("stopped")],
240 ) {
241 panic!("lua actor stopped failed {:?}", e);
242 }
243 }
244}
245
246struct SendAttempt {
247 recipient_name: String,
248 msg: LuaMessage,
249 cb_thread_id: i64,
250}
251
252impl Message for SendAttempt {
253 type Result = LuaMessage;
254}
255
256struct SendAttemptResult {
257 msg: LuaMessage,
258 cb_thread_id: i64,
259}
260
261impl Message for SendAttemptResult {
262 type Result = LuaMessage;
263}
264
265impl Handler<LuaMessage> for LuaActor {
266 type Result = LuaMessage;
267
268 fn handle(&mut self, msg: LuaMessage, ctx: &mut Context<Self>) -> Self::Result {
269 if let Ok(res) = invoke(
270 &ctx.address().recipient(),
271 ctx,
272 &mut self.vm,
273 &mut self.recipients,
274 "__run",
275 vec![LuaMessage::from("handle"), msg],
276 ) {
277 res
278 } else {
279 LuaMessage::Nil
280 }
281 }
282}
283
284impl Handler<SendAttemptResult> for LuaActor {
285 type Result = LuaMessage;
286
287 fn handle(&mut self, result: SendAttemptResult, ctx: &mut Context<Self>) -> Self::Result {
288 if let Ok(res) = invoke(
289 &ctx.address().recipient(),
290 ctx,
291 &mut self.vm,
292 &mut self.recipients,
293 "__resume",
294 vec![LuaMessage::from(result.cb_thread_id), result.msg],
295 ) {
296 res
297 } else {
298 LuaMessage::Nil
299 }
300 }
301}
302
303impl Handler<SendAttempt> for LuaActor {
304 type Result = LuaMessage;
305
306 fn handle(&mut self, attempt: SendAttempt, ctx: &mut Context<Self>) -> Self::Result {
307 let rec = &self.recipients[&attempt.recipient_name];
308 let self_addr = ctx.address().clone();
309 rec.send(attempt.msg.clone())
310 .into_actor(self)
311 .then(move |res, _, _| {
312 match res {
313 Ok(msg) => self_addr.do_send(SendAttemptResult {
314 msg,
315 cb_thread_id: attempt.cb_thread_id,
316 }),
317 _ => {
318 panic!("send attempt failed: {:?}", res);
319 }
320 };
321 actix::fut::ok(())
322 })
323 .wait(ctx);
324
325 LuaMessage::Nil
326 }
327}
328
329#[cfg(test)]
330mod tests {
331 use super::*;
332 use futures_timer::Delay;
333 use std::collections::HashMap;
334 use std::time::Duration;
335 use tokio::prelude::Future;
336
337 use crate::builder::LuaActorBuilder;
338
339 fn lua_actor_with_handle(script: &str) -> LuaActor {
340 LuaActorBuilder::new()
341 .on_handle_with_lua(script)
342 .build()
343 .unwrap()
344 }
345
346 #[test]
347 fn lua_actor_basic() {
348 let system = System::new("test");
349
350 let lua_addr = lua_actor_with_handle(r#"return ctx.msg + 1"#).start();
351
352 let l = lua_addr.send(LuaMessage::from(1));
353 Arbiter::spawn(
354 l.map(|res| {
355 assert_eq!(res, LuaMessage::from(2));
356 System::current().stop();
357 })
358 .map_err(|e| println!("actor dead {}", e)),
359 );
360
361 system.run();
362 }
363
364 #[test]
365 fn lua_actor_syntax_error() {
366 let res = LuaActorBuilder::new()
367 .on_handle_with_lua(r"return 1+")
368 .build();
369
370 if let Ok(_) = res {
371 panic!("should return Err(syntax_error)");
372 }
373 }
374
375 #[should_panic]
376 #[test]
377 fn lua_actor_user_error() {
378 let system = System::new("test");
379
380 let lua_addr = lua_actor_with_handle(
381 r#"
382 print("before")
383 error("foo")
384 print("after")
385 "#,
386 )
387 .start();
388
389 let l = lua_addr.send(LuaMessage::from(0));
390 Arbiter::spawn(
391 l.map(|_| {
392 System::current().stop();
394 })
395 .map_err(|e| println!("actor dead {}", e)),
396 );
397
398 system.run();
399 }
400
401 #[test]
402 fn lua_actor_return_table() {
403 let system = System::new("test");
404
405 let lua_addr = lua_actor_with_handle(
406 r#"
407 return {x = 1}
408 "#,
409 )
410 .start();
411
412 let l = lua_addr.send(LuaMessage::from(3));
413 Arbiter::spawn(
414 l.map(|res| {
415 let mut t = HashMap::new();
416 t.insert("x".to_string(), LuaMessage::from(1));
417
418 assert_eq!(res, LuaMessage::from(t));
419 System::current().stop();
420 })
421 .map_err(|e| println!("actor dead {}", e)),
422 );
423
424 system.run();
425 }
426
427 #[test]
428 fn lua_actor_state() {
429 let system = System::new("test");
430
431 let lua_addr = lua_actor_with_handle(
432 r#"
433 if not ctx.state.x then ctx.state.x = 0 end
434
435 ctx.state.x = ctx.state.x + 1
436 return ctx.state.x
437 "#,
438 )
439 .start();
440
441 let l = lua_addr.send(LuaMessage::Nil);
442 Arbiter::spawn(
443 l.map(move |res| {
444 assert_eq!(res, LuaMessage::from(1));
445 let l2 = lua_addr.send(LuaMessage::Nil);
446 Arbiter::spawn(
447 l2.map(|res| {
448 assert_eq!(res, LuaMessage::from(2));
449 System::current().stop();
450 })
451 .map_err(|e| println!("actor dead {}", e)),
452 );
453 })
454 .map_err(|e| println!("actor dead {}", e)),
455 );
456
457 system.run();
458 }
459
460 #[test]
461 fn lua_actor_notify() {
462 let system = System::new("test");
463
464 let addr = LuaActorBuilder::new()
465 .on_started_with_lua(
466 r#"
467 ctx.notify(100)
468 "#,
469 )
470 .on_handle_with_lua(
471 r#"
472 if ctx.msg == 100 then
473 ctx.state.notified = ctx.msg
474 end
475
476 return ctx.msg + ctx.state.notified
477 "#,
478 )
479 .build()
480 .unwrap()
481 .start();
482
483 let delay = Delay::new(Duration::from_secs(1)).map(move |()| {
484 let l = addr.send(LuaMessage::from(1));
485 Arbiter::spawn(
486 l.map(|res| {
487 assert_eq!(res, LuaMessage::from(101));
488 System::current().stop();
489 })
490 .map_err(|e| println!("actor dead {}", e)),
491 )
492 });
493 Arbiter::spawn(delay.map_err(|e| println!("actor dead {}", e)));
494
495 system.run();
496 }
497
498 #[test]
499 fn lua_actor_notify_later() {
500 let system = System::new("test");
501
502 let addr = LuaActorBuilder::new()
503 .on_started_with_lua(
504 r#"
505 ctx.notify_later(100, 1)
506 "#,
507 )
508 .on_handle_with_lua(
509 r#"
510 if ctx.msg == 100 then
511 ctx.state.notified = ctx.msg
512 end
513
514 return ctx.msg + ctx.state.notified
515 "#,
516 )
517 .build()
518 .unwrap()
519 .start();
520
521 let delay = Delay::new(Duration::from_secs(2)).map(move |()| {
522 let l2 = addr.send(LuaMessage::from(1));
523 Arbiter::spawn(
524 l2.map(|res| {
525 assert_eq!(res, LuaMessage::from(101));
526 System::current().stop();
527 })
528 .map_err(|e| println!("actor dead {}", e)),
529 )
530 });
531 Arbiter::spawn(delay.map_err(|e| println!("actor dead {}", e)));
532
533 system.run();
534 }
535
536 #[test]
537 fn lua_actor_send() {
538 use std::mem::discriminant;
539 let system = System::new("test");
540
541 struct Callback;
542 impl Actor for Callback {
543 type Context = Context<Self>;
544 }
545
546 impl Handler<LuaMessage> for Callback {
547 type Result = LuaMessage;
548
549 fn handle(&mut self, msg: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
550 assert_eq!(
552 discriminant(&msg),
553 discriminant(&LuaMessage::String("foo".to_string()))
554 );
555 if let LuaMessage::String(s) = msg {
556 assert_eq!(s, "Hello");
557 System::current().stop();
558 LuaMessage::Boolean(true)
559 } else {
560 unimplemented!()
561 }
562 }
563 }
564 let callback_addr = Callback.start();
565
566 let mut actor = LuaActorBuilder::new()
567 .on_started_with_lua(
568 r#"
569 local result = ctx.send("callback", "Hello")
570 print("result", "=", result)
571 "#,
572 )
573 .build()
574 .unwrap();
575
576 actor.add_recipients("callback", callback_addr.recipient());
577 actor.start();
578 system.run();
579 }
580
581 #[test]
582 fn lua_actor_thread_yield() {
583 use std::mem::discriminant;
584 struct Callback;
585 impl Actor for Callback {
586 type Context = Context<Self>;
587 }
588
589 impl Handler<LuaMessage> for Callback {
590 type Result = LuaMessage;
591
592 fn handle(&mut self, _: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
593 LuaMessage::Nil
594 }
595 }
596
597 let system = System::new("test");
598
599 let mut actor = LuaActorBuilder::new()
600 .on_handle_with_lua(
601 r#"
602 local result = ctx.send("callback", "Hello")
603 print(result)
604 return result
605 "#,
606 )
607 .build()
608 .unwrap();
609
610 actor.add_recipients("callback", Callback.start().recipient());
611
612 let addr = actor.start();
613
614 let l = addr.send(LuaMessage::Nil);
615 Arbiter::spawn(
616 l.map(move |res| {
617 assert_eq!(
618 discriminant(&res),
619 discriminant(&LuaMessage::ThreadYield("foo".to_string()))
620 );
621 System::current().stop();
622 })
623 .map_err(|e| println!("actor dead {}", e)),
624 );
625
626 system.run();
627 }
628
629 #[test]
630 fn lua_actor_thread_yield_and_callback_message() {
631 use std::mem::discriminant;
632
633 struct Callback;
634 impl Actor for Callback {
635 type Context = Context<Self>;
636 }
637
638 impl Handler<LuaMessage> for Callback {
639 type Result = LuaMessage;
640
641 fn handle(&mut self, msg: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
642 assert_eq!(
644 discriminant(&msg),
645 discriminant(&LuaMessage::String("foo".to_string()))
646 );
647 if let LuaMessage::String(s) = msg {
648 assert_eq!(s, "Hello");
649 LuaMessage::String(format!("{} from callback", s))
650 } else {
651 unimplemented!()
652 }
653 }
654 }
655
656 struct Check;
657 impl Actor for Check {
658 type Context = Context<Self>;
659 }
660
661 impl Handler<LuaMessage> for Check {
662 type Result = LuaMessage;
663
664 fn handle(&mut self, msg: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
665 assert_eq!(
667 discriminant(&msg),
668 discriminant(&LuaMessage::String("foo".to_string()))
669 );
670 if let LuaMessage::String(s) = msg {
671 assert_eq!(s, "Hello from callback");
672 System::current().stop();
673 LuaMessage::Nil
674 } else {
675 unimplemented!()
676 }
677 }
678 }
679
680 let system = System::new("test");
681 let mut actor = LuaActorBuilder::new()
682 .on_handle_with_lua(
683 r#"
684 local result = ctx.send("callback", ctx.msg)
685 print("send result", "=", result)
686 ctx.send("check", result)
687 "#,
688 )
689 .build()
690 .unwrap();
691
692 actor.add_recipients("callback", Callback.start().recipient());
693 actor.add_recipients("check", Check.start().recipient());
694
695 let addr = actor.start();
696
697 let l = addr.send(LuaMessage::String("Hello".to_string()));
698 Arbiter::spawn(
699 l.map(move |res| {
700 assert_eq!(
701 discriminant(&res),
702 discriminant(&LuaMessage::ThreadYield("foo".to_string()))
703 );
704 })
705 .map_err(|e| println!("actor dead {}", e)),
706 );
707
708 system.run();
709 }
710
711 #[test]
712 fn lua_actor_do_send() {
713 use std::mem::discriminant;
714
715 struct Check;
716 impl Actor for Check {
717 type Context = Context<Self>;
718 }
719
720 impl Handler<LuaMessage> for Check {
721 type Result = LuaMessage;
722
723 fn handle(&mut self, msg: LuaMessage, _ctx: &mut Context<Self>) -> Self::Result {
724 assert_eq!(
726 discriminant(&msg),
727 discriminant(&LuaMessage::String("foo".to_string()))
728 );
729 if let LuaMessage::String(s) = msg {
730 assert_eq!(s, "Hello");
731 System::current().stop();
732 LuaMessage::Nil
733 } else {
734 unimplemented!()
735 }
736 }
737 }
738 let system = System::new("test");
739
740 let mut actor = LuaActorBuilder::new()
741 .on_handle_with_lua(
742 r#"
743 local result = ctx.do_send("check", "Hello")
744 print("new actor addr name", rec, result)
745 return ctx.msg
746 "#,
747 )
748 .build()
749 .unwrap();
750 actor.add_recipients("check", Check.start().recipient());
751 let addr = actor.start();
752
753 let l = addr.send(LuaMessage::Nil);
754 Arbiter::spawn(
755 l.map(|res| {
756 assert_eq!(res, LuaMessage::Nil);
757 })
758 .map_err(|e| println!("actor dead {}", e)),
759 );
760
761 system.run();
762 }
763
764 #[test]
765 fn lua_actor_terminate() {
766 let system = System::new("test");
768
769 let _ = LuaActorBuilder::new()
770 .on_started_with_lua(
771 r#"
772 ctx.terminate()
773 "#,
774 )
775 .on_stopped_with_lua(r#"print("stopped")"#)
776 .build()
777 .unwrap()
778 .start();
779 let delay = Delay::new(Duration::from_secs(1)).map(move |()| {
780 System::current().stop();
781 });
782 Arbiter::spawn(delay.map_err(|e| println!("actor dead {}", e)));
783
784 system.run();
785 }
786
787 use std::env;
788
789 #[test]
790 fn lua_actor_require() {
791 let system = System::new("test");
792 env::set_var("LUA_PATH", "./src/?.lua;;");
793
794 let addr = LuaActorBuilder::new()
795 .on_handle_with_lua(
796 r#"
797 local m = require('lua/test/module')
798 return m.incr(ctx.msg)
799 "#,
800 )
801 .build()
802 .unwrap()
803 .start();
804 let l = addr.send(LuaMessage::from(1));
805 Arbiter::spawn(
806 l.map(|res| {
807 assert_eq!(res, LuaMessage::from(2));
808 System::current().stop();
809 })
810 .map_err(|e| println!("actor dead {}", e)),
811 );
812
813 system.run();
814 }
815
816 #[test]
817 fn lua_actor_with_vm() {
818 let system = System::new("test");
819
820 let vm = Lua::new();
821 vm.context(|ctx| {
822 ctx.globals()
823 .set(
824 "greet",
825 ctx.create_function(|_, name: String| Ok(format!("Hello, {}!", name)))
826 .unwrap(),
827 )
828 .unwrap();
829 });
830
831 let addr = LuaActorBuilder::new()
832 .on_handle_with_lua(
833 r#"
834 return greet(ctx.msg)
835 "#,
836 )
837 .build_with_vm(vm)
838 .unwrap()
839 .start();
840
841 let l = addr.send(LuaMessage::from("World"));
842 Arbiter::spawn(
843 l.map(|res| {
844 assert_eq!(res, LuaMessage::from("Hello, World!"));
845 System::current().stop();
846 })
847 .map_err(|e| println!("actor dead {}", e)),
848 );
849
850 system.run();
851 }
852}