extern crate actix;
extern crate futures;
extern crate tokio_core;
use actix::msgs::SystemExit;
use actix::prelude::*;
use futures::stream::futures_ordered;
use futures::{Future, Stream};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio_core::reactor::Timeout;
/// Actor used by the future-timeout test.
///
/// It carries a flag shared with the test body so the actor can report,
/// from inside its future chain, that the timeout error was observed.
struct MyActor {
    /// Set to `true` by the actor when it sees `Error::Timeout`.
    timeout: Arc<AtomicBool>,
}
/// Error type carried through the future and stream chains in this file.
///
/// `Debug` is derived so values can appear in `assert_eq!` / `unwrap`
/// diagnostics, and `Eq` because equality on these unit variants is total.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum Error {
    /// Passed to `.timeout(..)` as the error value to report on expiry.
    Timeout,
    /// Any other failure; used to erase the underlying stream error type.
    Generic,
}
impl Actor for MyActor {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
Timeout::new(Duration::new(0, 5_000_000), Arbiter::handle())
.unwrap()
.then(|_| {
Arbiter::system().do_send(SystemExit(0));
Ok::<_, Error>(())
})
.into_actor(self)
.timeout(Duration::new(0, 100), Error::Timeout)
.map_err(|e, act, _| {
if e == Error::Timeout {
act.timeout.store(true, Ordering::Relaxed);
Arbiter::system().do_send(SystemExit(0));
()
}
})
.wait(ctx)
}
}
/// Runs a system with a single `MyActor` and checks that the actor reported
/// a timeout through the shared flag before the system stopped.
#[test]
fn test_fut_timeout() {
    let system = System::new("test");

    // Flag shared between this test body and the actor.
    let timed_out = Arc::new(AtomicBool::new(false));
    let actor = MyActor {
        timeout: Arc::clone(&timed_out),
    };

    // Keep the address bound so it lives until the end of the test.
    let _addr: Addr<Unsync, _> = actor.start();

    // Returns once something sends `SystemExit` to the system.
    system.run();

    let fired = timed_out.load(Ordering::Relaxed);
    assert!(fired, "Not timeout");
}
/// Actor used by the stream-timeout test.
///
/// It carries a flag shared with the test body so the actor can report,
/// from inside its stream chain, that the timeout error was observed.
struct MyStreamActor {
    /// Set to `true` by the actor when it sees `Error::Timeout`.
    timeout: Arc<AtomicBool>,
}
impl Actor for MyStreamActor {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let s = futures_ordered(vec![
Timeout::new(Duration::new(0, 5_000_000), Arbiter::handle()),
Timeout::new(Duration::new(0, 5_000_000), Arbiter::handle()),
]);
s.and_then(|f| f)
.map_err(|_| Error::Generic)
.into_actor(self)
.timeout(Duration::new(0, 1000), Error::Timeout)
.map_err(|e, act, _| {
if e == Error::Timeout {
act.timeout.store(true, Ordering::Relaxed);
Arbiter::system().do_send(SystemExit(0));
()
}
})
.finish()
.wait(ctx)
}
}
/// Runs a system with a single `MyStreamActor` and checks that the actor
/// reported a timeout through the shared flag before the system stopped.
#[test]
fn test_stream_timeout() {
    let system = System::new("test");

    // Flag shared between this test body and the actor.
    let timed_out = Arc::new(AtomicBool::new(false));
    let actor = MyStreamActor {
        timeout: Arc::clone(&timed_out),
    };

    // Keep the address bound so it lives until the end of the test.
    let _addr: Addr<Unsync, _> = actor.start();

    // Returns once something sends `SystemExit` to the system.
    system.run();

    let fired = timed_out.load(Ordering::Relaxed);
    assert!(fired, "Not timeout");
}