actix 0.5.7

Actor framework for Rust
Documentation
#[macro_use]
extern crate actix;
extern crate futures;
extern crate tokio_core;

use actix::prelude::*;
use futures::{future, Future};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio_core::reactor::Timeout;

#[derive(Message, Debug)]
struct Ping(usize);

use std::marker::PhantomData;
use std::rc::Rc;

#[derive(Message, Debug)]
struct UnsyncPing(usize, PhantomData<Rc<bool>>);

struct MyActor(Arc<AtomicUsize>);

impl Actor for MyActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<Ping> for MyActor {
    type Result = ();

    fn handle(&mut self, _: Ping, _: &mut Self::Context) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
}

impl actix::Handler<UnsyncPing> for MyActor {
    type Result = ();

    fn handle(&mut self, _: UnsyncPing, _: &mut Self::Context) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
}

struct MyActor3;

impl Actor for MyActor3 {
    type Context = Context<Self>;
}

impl actix::Handler<Ping> for MyActor3 {
    type Result = ();

    fn handle(&mut self, _: Ping, _: &mut actix::Context<MyActor3>) -> Self::Result {
        Arbiter::system().do_send(actix::msgs::SystemExit(0));
    }
}

#[test]
fn test_address() {
    let sys = System::new("test");
    let count = Arc::new(AtomicUsize::new(0));

    let addr: Addr<Unsync, _> = MyActor(Arc::clone(&count)).start();
    let addr2 = addr.clone();
    addr.do_send(Ping(0));

    Arbiter::handle().spawn_fn(move || {
        addr2.do_send(Ping(1));

        Timeout::new(Duration::new(0, 100), Arbiter::handle())
            .unwrap()
            .then(move |_| {
                addr2.do_send(Ping(2));
                Arbiter::system().do_send(actix::msgs::SystemExit(0));
                future::result(Ok(()))
            })
    });

    sys.run();
    assert_eq!(count.load(Ordering::Relaxed), 3);
}

#[test]
fn test_recipient_call() {
    let sys = System::new("test");
    let count = Arc::new(AtomicUsize::new(0));

    let addr: Addr<Unsync, _> = MyActor(Arc::clone(&count)).start();
    let addr2 = addr.clone().recipient();
    addr.do_send(UnsyncPing(0, PhantomData));

    Arbiter::handle().spawn(addr2.send(Ping(1)).then(move |_| {
        addr2.send(Ping(2)).then(|_| {
            Arbiter::system().do_send(actix::msgs::SystemExit(0));
            Ok(())
        })
    }));

    sys.run();
    assert_eq!(count.load(Ordering::Relaxed), 3);
}

#[test]
fn test_sync_address() {
    let sys = System::new("test");
    let count = Arc::new(AtomicUsize::new(0));
    let arbiter = Arbiter::new("sync-test");

    let addr: Addr<Syn, _> = MyActor(Arc::clone(&count)).start();
    let addr2 = addr.clone();
    let addr3 = addr.clone();
    addr.do_send(Ping(1));

    arbiter.do_send(actix::msgs::Execute::new(
        move || -> Result<(), ()> {
            addr3.do_send(Ping(2));

            Arbiter::handle().spawn_fn(move || {
                Timeout::new(Duration::new(0, 1000), Arbiter::handle())
                    .unwrap()
                    .then(move |_| {
                        Arbiter::system().do_send(actix::msgs::SystemExit(0));
                        future::result(Ok(()))
                    })
            });
            Ok(())
        },
    ));

    Arbiter::handle().spawn_fn(move || {
        addr2.do_send(Ping(3));

        Timeout::new(Duration::new(0, 100), Arbiter::handle())
            .unwrap()
            .then(move |_| {
                addr2.do_send(Ping(4));
                future::result(Ok(()))
            })
    });

    sys.run();
    assert_eq!(count.load(Ordering::Relaxed), 4);
}

#[test]
fn test_sync_recipient_call() {
    let sys = System::new("test");
    let count = Arc::new(AtomicUsize::new(0));

    let addr: Addr<Syn, _> = MyActor(Arc::clone(&count)).start();
    let addr2 = addr.clone().recipient();
    addr.do_send(Ping(0));

    Arbiter::handle().spawn(addr2.send(Ping(1)).then(move |_| {
        addr2.send(Ping(2)).then(|_| {
            Arbiter::system().do_send(actix::msgs::SystemExit(0));
            Ok(())
        })
    }));

    sys.run();
    assert_eq!(count.load(Ordering::Relaxed), 3);
}

#[test]
fn test_error_result() {
    let sys = System::new("test");

    let addr: Addr<Unsync, _> = MyActor3.start();

    Arbiter::handle().spawn_fn(move || {
        addr.send(Ping(0)).then(|res| {
            match res {
                Ok(_) => (),
                _ => panic!("Should not happen"),
            }
            futures::future::result(Ok(()))
        })
    });

    sys.run();
}

struct TimeoutActor;

impl Actor for TimeoutActor {
    type Context = actix::Context<Self>;
}

impl Handler<Ping> for TimeoutActor {
    type Result = ();

    fn handle(&mut self, _: Ping, ctx: &mut Self::Context) {
        Timeout::new(Duration::new(0, 5_000_000), Arbiter::handle())
            .unwrap()
            .map_err(|_| ())
            .into_actor(self)
            .wait(ctx);
    }
}

#[test]
fn test_message_timeout() {
    let sys = System::new("test");

    let addr: Addr<Unsync, _> = TimeoutActor.start();
    let count = Arc::new(AtomicUsize::new(0));
    let count2 = Arc::clone(&count);

    Arbiter::handle().spawn_fn(move || {
        addr.do_send(Ping(0));
        addr.send(Ping(0))
            .timeout(Duration::new(0, 1_000))
            .then(move |res| {
                match res {
                    Ok(_) => panic!("Should not happen"),
                    Err(MailboxError::Timeout) => {
                        count2.fetch_add(1, Ordering::Relaxed);
                    }
                    _ => panic!("Should not happen"),
                }
                Arbiter::system().do_send(actix::msgs::SystemExit(0));
                futures::future::result(Ok(()))
            })
    });

    sys.run();
    assert_eq!(count.load(Ordering::Relaxed), 1);
}

#[test]
fn test_sync_message_timeout() {
    let sys = System::new("test");

    let addr: Addr<Syn, _> = TimeoutActor.start();
    let count = Arc::new(AtomicUsize::new(0));
    let count2 = Arc::clone(&count);

    Arbiter::handle().spawn_fn(move || {
        addr.do_send(Ping(0));
        addr.send(Ping(0))
            .timeout(Duration::new(0, 1_000))
            .then(move |res| {
                match res {
                    Ok(_) => panic!("Should not happen"),
                    Err(MailboxError::Timeout) => {
                        count2.fetch_add(1, Ordering::Relaxed);
                    }
                    _ => panic!("Should not happen"),
                }
                Arbiter::system().do_send(actix::msgs::SystemExit(0));
                futures::future::result(Ok(()))
            })
    });

    sys.run();
    assert_eq!(count.load(Ordering::Relaxed), 1);
}

struct TimeoutActor2(Addr<Unsync, TimeoutActor>, Arc<AtomicUsize>);

impl Actor for TimeoutActor2 {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.0.do_send(Ping(0));
        self.0
            .send(Ping(0))
            .timeout(Duration::new(0, 1_000))
            .into_actor(self)
            .then(move |res, act, _| {
                match res {
                    Ok(_) => panic!("Should not happen"),
                    Err(MailboxError::Timeout) => {
                        act.1.fetch_add(1, Ordering::Relaxed);
                    }
                    _ => panic!("Should not happen"),
                }
                Arbiter::system().do_send(actix::msgs::SystemExit(0));
                actix::fut::ok(())
            })
            .wait(ctx)
    }
}

#[test]
fn test_call_message_timeout() {
    let sys = System::new("test");
    let addr: Addr<Unsync, _> = TimeoutActor.start();

    let count = Arc::new(AtomicUsize::new(0));
    let count2 = Arc::clone(&count);
    let _addr2: Addr<Unsync, _> = TimeoutActor2(addr, count2).start();

    sys.run();
    assert_eq!(count.load(Ordering::Relaxed), 1);
}

struct TimeoutActor3(Addr<Syn, TimeoutActor>, Arc<AtomicUsize>);

impl Actor for TimeoutActor3 {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.0.do_send(Ping(0));
        self.0
            .send(Ping(0))
            .timeout(Duration::new(0, 1_000))
            .into_actor(self)
            .then(move |res, act, _| {
                match res {
                    Ok(_) => panic!("Should not happen"),
                    Err(MailboxError::Timeout) => {
                        act.1.fetch_add(1, Ordering::Relaxed);
                    }
                    _ => panic!("Should not happen"),
                }
                Arbiter::system().do_send(actix::msgs::SystemExit(0));
                actix::fut::ok(())
            })
            .wait(ctx)
    }
}

#[test]
fn test_sync_call_message_timeout() {
    let sys = System::new("test");
    let addr: Addr<Syn, _> = TimeoutActor.start();

    let count = Arc::new(AtomicUsize::new(0));
    let count2 = Arc::clone(&count);
    let _addr2: Addr<Unsync, _> = TimeoutActor3(addr, count2).start();

    sys.run();
    assert_eq!(count.load(Ordering::Relaxed), 1);
}