use std::time::Duration;
use futures::Stream;
use address::Addr;
use arbiter::Arbiter;
use context::Context;
use contextitems::{ActorDelayedMessageItem, ActorMessageItem, ActorMessageStreamItem};
use fut::{ActorFuture, ActorStream};
use handler::{Handler, Message};
use stream::StreamHandler;
use utils::{IntervalFunc, TimerFunc};
/// Behaviour shared by every actor type.
///
/// An implementor picks the execution context it runs in through the
/// associated `Context` type and may override the lifecycle hooks below.
/// All hooks have default bodies, so overriding them is optional.
// The default hook bodies deliberately ignore `ctx`; the lint is silenced
// for that reason.
#[allow(unused_variables)]
pub trait Actor: Sized + 'static {
    /// Execution context type used by this actor.
    type Context: ActorContext;

    /// Lifecycle hook; the default implementation does nothing.
    ///
    /// NOTE(review): by its name this is invoked once the actor has been
    /// started — the call site is not in this file, confirm against the
    /// context implementation.
    fn started(&mut self, ctx: &mut Self::Context) {}

    /// Lifecycle hook that returns a `Running` verdict.
    ///
    /// The default answers `Running::Stop`. Presumably returning
    /// `Running::Continue` keeps the actor alive — verify against the
    /// context implementation that consumes this value.
    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
        Running::Stop
    }

    /// Lifecycle hook; the default implementation does nothing.
    ///
    /// NOTE(review): by its name this is invoked after the actor has
    /// stopped — confirm against the context implementation.
    fn stopped(&mut self, ctx: &mut Self::Context) {}

    /// Consumes the actor, wraps it in a fresh `Context`, runs that context
    /// and returns the actor's address.
    ///
    /// Only available when the actor's context type is `Context<Self>`.
    fn start(self) -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>>,
    {
        let context = Context::new(Some(self));
        // The address has to be taken before the context is handed to `run`.
        let handle = context.address();
        context.run();
        handle
    }

    /// Builds the actor with `Default::default()` and starts it via `start`.
    fn start_default() -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>> + Default,
    {
        let actor = Self::default();
        actor.start()
    }

    /// Creates a context from the factory closure `f`, spawns it on the
    /// `Arbiter` and returns the actor's address.
    ///
    /// `f` receives the context mutably and returns the actor value, so the
    /// actor can be constructed with access to its own context.
    fn create<F>(f: F) -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>>,
        F: FnOnce(&mut Context<Self>) -> Self + 'static,
    {
        let context = Context::create(f);
        // The address has to be taken before the context is moved into `spawn`.
        let handle = context.address();
        Arbiter::spawn(context);
        handle
    }
}
#[allow(unused_variables)]
pub trait Supervised: Actor {
fn restarting(&mut self, ctx: &mut <Self as Actor>::Context) {}
}
/// Lifecycle state reported by an actor context (see `ActorContext::state`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ActorState {
    /// Counted as alive by `ActorState::alive`.
    Started,
    /// Counted as alive by `ActorState::alive`.
    Running,
    /// Counted as stopping by `ActorState::stopping`.
    Stopping,
    /// Counted as stopping by `ActorState::stopping`.
    Stopped,
}

/// Verdict returned from `Actor::stopping`.
///
/// NOTE(review): presumably `Stop` lets shutdown proceed and `Continue`
/// aborts it — the code that interprets this value is not in this file.
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum Running {
    /// The default verdict returned by `Actor::stopping`.
    Stop,
    /// The alternative verdict.
    Continue,
}

impl ActorState {
    /// Returns `true` for `Started` and `Running`, `false` otherwise.
    pub fn alive(&self) -> bool {
        match *self {
            ActorState::Started | ActorState::Running => true,
            ActorState::Stopping | ActorState::Stopped => false,
        }
    }

    /// Returns `true` for `Stopping` and `Stopped`, `false` otherwise.
    pub fn stopping(&self) -> bool {
        match *self {
            ActorState::Stopping | ActorState::Stopped => true,
            ActorState::Started | ActorState::Running => false,
        }
    }
}
/// Minimal control surface every actor execution context must provide.
///
/// This is the bound placed on `Actor::Context`.
pub trait ActorContext: Sized {
/// Asks the context to stop the actor.
///
/// NOTE(review): presumably a graceful shutdown that passes through
/// `Actor::stopping` — confirm against the implementors.
fn stop(&mut self);
/// Asks the context to terminate the actor.
///
/// NOTE(review): presumably an immediate shutdown, in contrast to `stop` —
/// confirm against the implementors.
fn terminate(&mut self);
/// Returns the actor's current lifecycle state.
fn state(&self) -> ActorState;
}
pub trait AsyncContext<A>: ActorContext
where
A: Actor<Context = Self>,
{
fn address(&self) -> Addr<A>;
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static;
fn wait<F>(&mut self, fut: F)
where
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static;
fn waiting(&self) -> bool;
fn cancel_future(&mut self, handle: SpawnHandle) -> bool;
fn add_stream<S>(&mut self, fut: S) -> SpawnHandle
where
S: Stream + 'static,
A: StreamHandler<S::Item, S::Error>,
{
<A as StreamHandler<S::Item, S::Error>>::add_stream(fut, self)
}
fn add_message_stream<S>(&mut self, fut: S)
where
S: Stream<Error = ()> + 'static,
S::Item: Message,
A: Handler<S::Item>,
{
if self.state() == ActorState::Stopped {
error!("Context::add_message_stream called for stopped actor.");
} else {
self.spawn(ActorMessageStreamItem::new(fut));
}
}
fn notify<M>(&mut self, msg: M)
where
A: Handler<M>,
M: Message + 'static,
{
if self.state() == ActorState::Stopped {
error!("Context::add_timeout called for stopped actor.");
} else {
self.spawn(ActorMessageItem::new(msg));
}
}
fn notify_later<M>(&mut self, msg: M, after: Duration) -> SpawnHandle
where
A: Handler<M>,
M: Message + 'static,
{
if self.state() == ActorState::Stopped {
error!("Context::add_timeout called for stopped actor.");
SpawnHandle::default()
} else {
self.spawn(ActorDelayedMessageItem::new(msg, after))
}
}
fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
where
F: FnOnce(&mut A, &mut A::Context) + 'static,
{
self.spawn(TimerFunc::new(dur, f))
}
fn run_interval<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
where
F: FnMut(&mut A, &mut A::Context) + 'static,
{
self.spawn(IntervalFunc::new(dur, f).finish())
}
}
/// Opaque identifier for something spawned on an actor context.
///
/// Wraps a `usize`. The default handle wraps `0`, which is also what
/// `AsyncContext::notify_later` returns when it spawns nothing.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct SpawnHandle(usize);

impl SpawnHandle {
    /// Returns the handle whose wrapped value is one greater than this one.
    pub fn next(self) -> SpawnHandle {
        let SpawnHandle(current) = self;
        SpawnHandle(current + 1)
    }

    /// Unwraps the handle into its raw `usize` value.
    #[doc(hidden)]
    pub fn into_usize(self) -> usize {
        let SpawnHandle(raw) = self;
        raw
    }
}