#[macro_use]
extern crate riker_testkit;
use riker::actors::*;
use riker_testkit::probe::{Probe, ProbeReceive};
use riker_testkit::probe::channel::{probe, ChannelProbe};
/// Message wrapper that carries a `ChannelProbe<(), ()>` to an actor.
///
/// The actors in this file store the wrapped probe and call `event(())` on it,
/// which lets the test thread observe that the actor handled something.
#[derive(Clone, Debug)]
pub struct TestProbe(ChannelProbe<(), ()>);
/// Payload-free message type: published on the test channels and accepted by
/// both `Subscriber` and `DumbActor`.
#[derive(Clone, Debug)]
pub struct SomeMessage;
/// Test actor that subscribes to one topic on a `ChannelRef<SomeMessage>` and
/// signals a `TestProbe` for every `SomeMessage` it receives.
///
/// `SubscriberMsg`, used as this actor's message type, is not declared in the
/// visible source; presumably the `#[actor(..)]` attribute generates it from
/// the listed message types.
#[actor(TestProbe, SomeMessage)]
struct Subscriber {
/// Probe to notify; `None` until a `TestProbe` message has been received.
probe: Option<TestProbe>,
/// Channel that receives this actor's `Subscribe` request in `pre_start`.
chan: ChannelRef<SomeMessage>,
/// Topic named in that `Subscribe` request.
topic: Topic,
}
impl Subscriber {
    /// Builds a `Subscriber` from a `(channel, topic)` tuple with no probe
    /// attached yet.
    ///
    /// The arguments arrive as a single tuple because this function is handed
    /// to `Props::new_args` as the factory in [`Subscriber::props`].
    fn actor(args: (ChannelRef<SomeMessage>, Topic)) -> Self {
        let (chan, topic) = args;
        Self {
            chan,
            topic,
            probe: None,
        }
    }

    /// Returns the actor producer for a `Subscriber` bound to `chan` and
    /// `topic`.
    fn props(chan: ChannelRef<SomeMessage>, topic: Topic) -> BoxActorProd<Subscriber> {
        let args = (chan, topic);
        Props::new_args(Self::actor, args)
    }
}
impl Actor for Subscriber {
    type Msg = SubscriberMsg;

    /// Start-up hook: sends a `Subscribe` request for `self.topic` to
    /// `self.chan`, naming this actor as the subscriber.
    fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
        let myself = Box::new(ctx.myself());
        let subscription = Subscribe {
            actor: myself,
            topic: self.topic.clone(),
        };
        self.chan.tell(subscription, None);
    }

    /// Hands every incoming message to `self.receive`, which routes it to the
    /// handler for the concrete message type.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}
impl Receive<TestProbe> for Subscriber {
type Msg = SubscriberMsg;
fn receive(&mut self,
_ctx: &Context<Self::Msg>,
msg: TestProbe,
_sender: Sender) {
msg.0.event(());
self.probe = Some(msg);
}
}
impl Receive<SomeMessage> for Subscriber {
    type Msg = SubscriberMsg;

    /// Emits one `()` event on the stored probe for each `SomeMessage`.
    ///
    /// # Panics
    ///
    /// Panics if no `TestProbe` has been received yet (`self.probe` is `None`).
    fn receive(&mut self, _ctx: &Context<Self::Msg>, _msg: SomeMessage, _sender: Sender) {
        let probe = self.probe.as_ref().unwrap();
        probe.0.event(());
    }
}
/// A message published on a topic reaches an actor subscribed to that topic.
#[test]
fn channel_publish() {
    let sys = ActorSystem::new().unwrap();
    let chan: ChannelRef<SomeMessage> = channel("my-chan", &sys).unwrap();
    let topic = Topic::from("my-topic");

    let props = Subscriber::props(chan.clone(), topic.clone());
    let sub = sys.actor_of(props, "sub-actor").unwrap();

    let (probe, listen) = probe();
    sub.tell(TestProbe(probe), None);
    // Wait for the subscriber's acknowledgement so it is known to hold the
    // probe before anything is published.
    listen.recv();

    chan.tell(
        Publish {
            msg: SomeMessage,
            topic,
        },
        None,
    );
    p_assert_eq!(listen, ());
}
/// An actor subscribed to the `"*"` topic is notified for messages published
/// on three different topics.
#[test]
fn channel_publish_subscribe_all() {
    let sys = ActorSystem::new().unwrap();
    let chan: ChannelRef<SomeMessage> = channel("my-chan", &sys).unwrap();
    let all_topics = Topic::from("*");

    let props = Subscriber::props(chan.clone(), all_topics);
    let sub = sys.actor_of(props, "sub-actor").unwrap();

    let (probe, listen) = probe();
    sub.tell(TestProbe(probe), None);
    // Wait for the subscriber's acknowledgement so it is known to hold the
    // probe before anything is published.
    listen.recv();

    let topics = ["topic-1", "topic-2", "topic-3"];
    for &name in topics.iter() {
        chan.tell(
            Publish {
                msg: SomeMessage,
                topic: name.into(),
            },
            None,
        );
    }

    // One probe event is expected per published message.
    for _ in 0..topics.len() {
        p_assert_eq!(listen, ());
    }
}
/// Message that makes `DumbActor` panic inside its handler.
#[derive(Clone, Debug)]
pub struct Panic;
/// Stateless test actor: panics on `Panic` and ignores `SomeMessage`.
///
/// `DumbActorMsg`, used as this actor's message type, is not declared in the
/// visible source; presumably the `#[actor(..)]` attribute generates it.
#[actor(Panic, SomeMessage)]
struct DumbActor;
impl DumbActor {
fn new() -> Self {
DumbActor
}
}
impl Actor for DumbActor {
    type Msg = DumbActorMsg;

    /// Hands every incoming message to `self.receive`, which routes it to the
    /// handler for the concrete message type.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}
impl Receive<Panic> for DumbActor {
    type Msg = DumbActorMsg;

    /// Deliberately fails so the tests can observe what happens when an actor
    /// panics while handling a message.
    ///
    /// # Panics
    ///
    /// Always.
    fn receive(&mut self, _ctx: &Context<Self::Msg>, _msg: Panic, _sender: Sender) {
        panic!("// TEST PANIC // TEST PANIC // TEST PANIC //");
    }
}
impl Receive<SomeMessage> for DumbActor {
    type Msg = DumbActorMsg;

    /// Accepts `SomeMessage` and does nothing with it.
    fn receive(&mut self, _ctx: &Context<Self::Msg>, _msg: SomeMessage, _sender: Sender) {
        // Intentionally empty.
    }
}
/// Newtype around a `SystemEvent`.
///
/// NOTE(review): nothing in the visible source refers to this type — confirm
/// whether it is still needed.
#[derive(Clone, Debug)]
struct SysEvent(SystemEvent);
/// Test actor that subscribes to the system event channel and signals a
/// `TestProbe` for lifecycle events concerning `/user/dumb-actor`.
///
/// `EventSubscriberMsg`, used as this actor's message type, is not declared in
/// the visible source; presumably the `#[actor(..)]` attribute generates it.
#[actor(TestProbe, SystemEvent)]
struct EventSubscriber {
/// Probe to notify; `None` until a `TestProbe` message has been received.
probe: Option<TestProbe>,
}
impl EventSubscriber {
    /// Creates an `EventSubscriber` with no probe attached yet.
    fn new() -> Self {
        Self { probe: None }
    }

    /// Returns the actor producer for an `EventSubscriber`.
    fn props() -> BoxActorProd<EventSubscriber> {
        Props::new(Self::new)
    }
}
impl Actor for EventSubscriber {
    type Msg = EventSubscriberMsg;

    /// Start-up hook: sends a `Subscribe` request for the `"*"` topic to the
    /// system's `sys_events()` channel, naming this actor as the subscriber.
    fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
        let myself = Box::new(ctx.myself());
        let subscription = Subscribe {
            actor: myself,
            topic: "*".into(),
        };
        ctx.system.sys_events().tell(subscription, None);
    }

    /// Hands every incoming user message to `self.receive`, which routes it
    /// to the handler for the concrete message type.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }

    /// Forwards the event carried by `SystemMsg::Event` to the `SystemEvent`
    /// handler; every other system message is dropped.
    fn sys_recv(&mut self, ctx: &Context<Self::Msg>, msg: SystemMsg, sender: Sender) {
        match msg {
            SystemMsg::Event(evt) => self.receive(ctx, evt, sender),
            _ => {}
        }
    }
}
impl Receive<TestProbe> for EventSubscriber {
type Msg = EventSubscriberMsg;
fn receive(&mut self,
_ctx: &Context<Self::Msg>,
msg: TestProbe,
_sender: Sender) {
msg.0.event(());
self.probe = Some(msg);
}
}
impl Receive<SystemEvent> for EventSubscriber {
    type Msg = EventSubscriberMsg;

    /// Emits one `()` event on the stored probe when a created, restarted or
    /// terminated event names the actor at `/user/dumb-actor`. Events about
    /// any other actor are ignored.
    ///
    /// # Panics
    ///
    /// Panics if a matching event arrives while `self.probe` is still `None`.
    fn receive(&mut self, _ctx: &Context<Self::Msg>, msg: SystemEvent, _sender: Sender) {
        const WATCHED_PATH: &str = "/user/dumb-actor";

        // Decide first whether the event is about the watched actor, so the
        // probe is notified in exactly one place.
        let about_watched_actor = match msg {
            SystemEvent::ActorCreated(created) => created.actor.path() == WATCHED_PATH,
            SystemEvent::ActorRestarted(restarted) => restarted.actor.path() == WATCHED_PATH,
            SystemEvent::ActorTerminated(terminated) => terminated.actor.path() == WATCHED_PATH,
        };

        if about_watched_actor {
            self.probe.as_ref().unwrap().0.event(());
        }
    }
}
/// Creating `dumb-actor`, making it panic, and stopping it each produce one
/// notification on the system event subscriber's probe.
#[test]
fn channel_system_events() {
    let sys = ActorSystem::new().unwrap();

    let subscriber = sys.actor_of(EventSubscriber::props(), "event-sub").unwrap();
    let (probe, listen) = probe();
    subscriber.tell(TestProbe(probe), None);
    // Wait for the subscriber's acknowledgement so it is known to hold the
    // probe before any lifecycle events are triggered.
    listen.recv();

    // Step 1: create the actor and expect one notification.
    let dumb = sys
        .actor_of(Props::new(DumbActor::new), "dumb-actor")
        .unwrap();
    p_assert_eq!(listen, ());

    // Step 2: make its handler panic and expect another notification
    // (presumably the `ActorRestarted` event).
    dumb.tell(Panic, None);
    p_assert_eq!(listen, ());

    // Step 3: stop the actor and expect a final notification.
    sys.stop(&dumb);
    p_assert_eq!(listen, ());
}
/// Test actor that subscribes to the dead letters channel and signals a
/// `TestProbe` for every `DeadLetter` it receives.
///
/// `DeadLetterSubMsg`, used as this actor's message type, is not declared in
/// the visible source; presumably the `#[actor(..)]` attribute generates it.
#[actor(TestProbe, DeadLetter)]
struct DeadLetterSub {
/// Probe to notify; `None` until a `TestProbe` message has been received.
probe: Option<TestProbe>,
}
impl DeadLetterSub {
    /// Creates a `DeadLetterSub` with no probe attached yet.
    fn new() -> Self {
        Self { probe: None }
    }

    /// Returns the actor producer for a `DeadLetterSub`.
    fn props() -> BoxActorProd<DeadLetterSub> {
        Props::new(Self::new)
    }
}
impl Actor for DeadLetterSub {
    type Msg = DeadLetterSubMsg;

    /// Start-up hook: sends a `Subscribe` request for the `"*"` topic to the
    /// system's `dead_letters()` channel, naming this actor as the subscriber.
    fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
        let myself = Box::new(ctx.myself());
        let subscription = Subscribe {
            actor: myself,
            topic: "*".into(),
        };
        ctx.system.dead_letters().tell(subscription, None);
    }

    /// Hands every incoming message to `self.receive`, which routes it to the
    /// handler for the concrete message type.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}
impl Receive<TestProbe> for DeadLetterSub {
type Msg = DeadLetterSubMsg;
fn receive(&mut self,
_ctx: &Context<Self::Msg>,
msg: TestProbe,
_sender: Sender) {
msg.0.event(());
self.probe = Some(msg);
}
}
impl Receive<DeadLetter> for DeadLetterSub {
    type Msg = DeadLetterSubMsg;

    /// Emits one `()` event on the stored probe for each `DeadLetter`.
    ///
    /// # Panics
    ///
    /// Panics if no `TestProbe` has been received yet (`self.probe` is `None`).
    fn receive(&mut self, _ctx: &Context<Self::Msg>, _msg: DeadLetter, _sender: Sender) {
        let probe = self.probe.as_ref().unwrap();
        probe.0.event(());
    }
}
/// A message told to an actor after it has been stopped results in a
/// notification on the dead letter subscriber's probe.
#[test]
fn channel_dead_letters() {
    let sys = ActorSystem::new().unwrap();

    let subscriber = sys.actor_of(DeadLetterSub::props(), "dl-subscriber").unwrap();
    let (probe, listen) = probe();
    subscriber.tell(TestProbe(probe), None);
    // Wait for the subscriber's acknowledgement so it is known to hold the
    // probe before a dead letter is provoked.
    listen.recv();

    let dumb = sys
        .actor_of(Props::new(DumbActor::new), "dumb-actor")
        .unwrap();
    sys.stop(&dumb);

    // Fixed pause before messaging the stopped actor; presumably this gives
    // the system time to finish the stop so the message cannot be delivered.
    let settle_time = std::time::Duration::from_secs(1);
    std::thread::sleep(settle_time);

    dumb.tell(SomeMessage, None);
    p_assert_eq!(listen, ());
}