riker 0.3.2

Easily build fast, highly concurrent and resilient applications. An Actor Framework for Rust.
Documentation
#[macro_use]
extern crate riker_testkit;

use riker::actors::*;

use riker_testkit::probe::channel::{probe, ChannelProbe};
use riker_testkit::probe::{Probe, ProbeReceive};

use chrono::{Duration as CDuration, Utc};
use std::time::Duration;
use uuid::Uuid;

#[derive(Clone, Debug)]
pub struct TestProbe(ChannelProbe<(), ()>);

#[derive(Clone, Debug)]
pub struct SomeMessage;

#[actor(TestProbe, SomeMessage)]
struct ScheduleOnce {
    probe: Option<TestProbe>,
}

impl ScheduleOnce {
    fn new() -> Self {
        ScheduleOnce { probe: None }
    }
}

impl Actor for ScheduleOnce {
    type Msg = ScheduleOnceMsg;

    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl Receive<TestProbe> for ScheduleOnce {
    type Msg = ScheduleOnceMsg;

    fn receive(&mut self, ctx: &Context<ScheduleOnceMsg>, msg: TestProbe, _sender: Sender) {
        self.probe = Some(msg);
        // reschedule an Empty to be sent to myself()
        ctx.schedule_once(Duration::from_millis(200), ctx.myself(), None, SomeMessage);
    }
}

impl Receive<SomeMessage> for ScheduleOnce {
    type Msg = ScheduleOnceMsg;

    fn receive(&mut self, _ctx: &Context<ScheduleOnceMsg>, _msg: SomeMessage, _sender: Sender) {
        self.probe.as_ref().unwrap().0.event(());
    }
}

#[test]
fn schedule_once() {
    let sys = ActorSystem::new().unwrap();

    let props = Props::new(ScheduleOnce::new);
    let actor = sys.actor_of(props, "schedule-once").unwrap();

    let (probe, listen) = probe();

    // use scheduler to set up probe
    sys.schedule_once(Duration::from_millis(200), actor, None, TestProbe(probe));
    p_assert_eq!(listen, ());
}

#[test]
fn schedule_at_time() {
    let sys = ActorSystem::new().unwrap();

    let props = Props::new(ScheduleOnce::new);
    let actor = sys.actor_of(props, "schedule-once").unwrap();

    let (probe, listen) = probe();

    // use scheduler to set up probe at a specific time
    let schedule_at = Utc::now() + CDuration::milliseconds(200);
    sys.schedule_at_time(schedule_at, actor, None, TestProbe(probe));
    p_assert_eq!(listen, ());
}

// *** Schedule repeat test ***
#[actor(TestProbe, SomeMessage)]
struct ScheduleRepeat {
    probe: Option<TestProbe>,
    counter: u32,
    schedule_id: Option<Uuid>,
}

impl ScheduleRepeat {
    fn new() -> Self {
        ScheduleRepeat {
            probe: None,
            counter: 0,
            schedule_id: None,
        }
    }
}

impl Actor for ScheduleRepeat {
    type Msg = ScheduleRepeatMsg;

    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl Receive<TestProbe> for ScheduleRepeat {
    type Msg = ScheduleRepeatMsg;

    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: TestProbe, _sender: Sender) {
        self.probe = Some(msg);
        // schedule Message to be repeatedly sent to myself
        // and store the job id to cancel it later
        let id = ctx.schedule(
            Duration::from_millis(200),
            Duration::from_millis(200),
            ctx.myself(),
            None,
            SomeMessage,
        );
        self.schedule_id = Some(id);
    }
}

impl Receive<SomeMessage> for ScheduleRepeat {
    type Msg = ScheduleRepeatMsg;

    fn receive(&mut self, ctx: &Context<Self::Msg>, _msg: SomeMessage, _sender: Sender) {
        if self.counter == 5 {
            ctx.cancel_schedule(self.schedule_id.unwrap());
            self.probe.as_ref().unwrap().0.event(());
        } else {
            self.counter += 1;
        }
    }
}

#[test]
fn schedule_repeat() {
    let sys = ActorSystem::new().unwrap();

    let props = Props::new(ScheduleRepeat::new);
    let actor = sys.actor_of(props, "schedule-repeat").unwrap();

    let (probe, listen) = probe();

    actor.tell(TestProbe(probe), None);

    p_assert_eq!(listen, ());
}