use futures::{future, Stream};
use std::time::Duration;
use address::{ActorAddress, Addr, Syn, Unsync};
use arbiter::Arbiter;
use context::Context;
use contextitems::{ActorDelayedMessageItem, ActorMessageItem, ActorMessageStreamItem};
use fut::ActorFuture;
use handler::{Handler, Message};
use stream::StreamHandler;
use utils::TimerFunc;
/// Core trait implemented by every actor type.
///
/// An actor names the context type it runs in and may override the three
/// lifecycle hooks (`started`, `stopping`, `stopped`). The provided
/// constructors (`start`, `start_default`, `create`) are only available when
/// the context type is `Context<Self>`.
// The hooks' default bodies deliberately ignore `ctx`; the allow keeps the
// parameter name readable for implementors without triggering warnings.
#[allow(unused_variables)]
pub trait Actor: Sized + 'static {
    /// Execution context type this actor runs in.
    type Context: ActorContext;

    /// Lifecycle hook; the default body is empty.
    ///
    /// NOTE(review): by its name this is presumably invoked once the actor
    /// has started — the call site is not in this file.
    fn started(&mut self, ctx: &mut Self::Context) {}

    /// Lifecycle hook returning whether the actor should stop or keep running.
    ///
    /// The default implementation returns `Running::Stop`.
    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
        Running::Stop
    }

    /// Lifecycle hook; the default body is empty.
    ///
    /// NOTE(review): presumably invoked after the actor has fully stopped —
    /// the call site is not in this file.
    fn stopped(&mut self, ctx: &mut Self::Context) {}

    /// Consumes the actor, runs it, and returns an address of kind `Addr`.
    ///
    /// A `Context` is created around `self`, the address is obtained from
    /// that context through `ActorAddress::get`, and only then is the context
    /// handed to `run` with the current `Arbiter::handle()`.
    fn start<Addr>(self) -> Addr
    where
        Self: Actor<Context = Context<Self>> + ActorAddress<Self, Addr>,
    {
        let mut context = Context::new(Some(self));
        // The address must be taken before `run` is called on the context.
        let address = <Self as ActorAddress<Self, Addr>>::get(&mut context);
        context.run(Arbiter::handle());
        address
    }

    /// Builds the actor with `Default::default()` and starts it via `start`.
    fn start_default<Addr>() -> Addr
    where
        Self: Default + Actor<Context = Context<Self>> + ActorAddress<Self, Addr>,
    {
        let actor = Self::default();
        actor.start()
    }

    /// Starts an actor whose construction needs access to its own context.
    ///
    /// An empty context (no actor yet) is created and the address is taken
    /// from it immediately. A closure is then submitted to the arbiter handle
    /// with `spawn_fn`; when it runs it calls `f` with the context to build
    /// the actor, installs the actor with `set_actor`, and runs the context.
    /// The address is returned without waiting for that closure.
    fn create<Addr, F>(f: F) -> Addr
    where
        Self: Actor<Context = Context<Self>> + ActorAddress<Self, Addr>,
        F: FnOnce(&mut Context<Self>) -> Self + 'static,
    {
        let mut context = Context::new(None);
        let address = <Self as ActorAddress<Self, Addr>>::get(&mut context);
        Arbiter::handle().spawn_fn(move || {
            let actor = f(&mut context);
            context.set_actor(actor);
            context.run(Arbiter::handle());
            future::ok(())
        });
        address
    }
}
#[allow(unused_variables)]
pub trait Supervised: Actor {
fn restarting(&mut self, ctx: &mut <Self as Actor>::Context) {}
}
/// Lifecycle state reported by `ActorContext::state`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ActorState {
    /// Counted as alive by `ActorState::alive`.
    Started,
    /// Counted as alive by `ActorState::alive`.
    Running,
    /// Not counted as alive.
    Stopping,
    /// Not counted as alive; several `AsyncContext` helpers refuse to
    /// schedule work in this state.
    Stopped,
}

/// Decision returned from `Actor::stopping`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Running {
    /// Value returned by the default `Actor::stopping` implementation.
    Stop,
    /// The alternative to `Stop`.
    ///
    /// NOTE(review): presumably keeps the actor running — the code that
    /// interprets this value is not in this file.
    Continue,
}

impl ActorState {
    /// Returns `true` for `Started` and `Running`, `false` for `Stopping`
    /// and `Stopped`.
    pub fn alive(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match *self {
            ActorState::Started | ActorState::Running => true,
            ActorState::Stopping | ActorState::Stopped => false,
        }
    }
}
/// Minimal control surface every actor execution context must provide.
///
/// `Actor::Context` is bounded by this trait, and `AsyncContext` builds on it.
pub trait ActorContext: Sized {
/// Asks the context to stop.
///
/// NOTE(review): the exact state transition is defined by implementors and
/// is not visible in this file.
fn stop(&mut self);
/// Asks the context to terminate.
///
/// NOTE(review): how this differs from `stop` is defined by implementors —
/// confirm against the concrete context types.
fn terminate(&mut self);
/// Returns the current lifecycle state of the actor in this context.
fn state(&self) -> ActorState;
}
pub trait AsyncContext<A>: ActorContext
where
A: Actor<Context = Self>,
{
fn address<Address>(&mut self) -> Address
where
A: ActorAddress<A, Address>,
{
<A as ActorAddress<A, Address>>::get(self)
}
#[doc(hidden)]
fn sync_address(&mut self) -> Addr<Syn, A>;
#[doc(hidden)]
fn unsync_address(&mut self) -> Addr<Unsync, A>;
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static;
fn wait<F>(&mut self, fut: F)
where
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static;
fn waiting(&self) -> bool;
fn cancel_future(&mut self, handle: SpawnHandle) -> bool;
fn add_stream<S>(&mut self, fut: S) -> SpawnHandle
where
S: Stream + 'static,
A: StreamHandler<S::Item, S::Error>,
{
<A as StreamHandler<S::Item, S::Error>>::add_stream(fut, self)
}
fn add_message_stream<S>(&mut self, fut: S)
where
S: Stream<Error = ()> + 'static,
S::Item: Message,
A: Handler<S::Item>,
{
if self.state() == ActorState::Stopped {
error!("Context::add_message_stream called for stopped actor.");
} else {
self.spawn(ActorMessageStreamItem::new(fut));
}
}
fn notify<M>(&mut self, msg: M)
where
A: Handler<M>,
M: Message + 'static,
{
if self.state() == ActorState::Stopped {
error!("Context::add_timeout called for stopped actor.");
} else {
self.spawn(ActorMessageItem::new(msg));
}
}
fn notify_later<M>(&mut self, msg: M, after: Duration) -> SpawnHandle
where
A: Handler<M>,
M: Message + 'static,
{
if self.state() == ActorState::Stopped {
error!("Context::add_timeout called for stopped actor.");
SpawnHandle::default()
} else {
self.spawn(ActorDelayedMessageItem::new(msg, after))
}
}
fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
where
F: FnOnce(&mut A, &mut A::Context) + 'static,
{
self.spawn(TimerFunc::new(dur, f))
}
}
/// Opaque identifier for work spawned into an actor context.
///
/// Returned by `AsyncContext::spawn` and accepted by
/// `AsyncContext::cancel_future`. The default handle wraps `0`; it is also
/// what `AsyncContext::notify_later` returns when nothing was scheduled.
#[derive(Eq, PartialEq, Debug, Copy, Clone, Hash, Default)]
pub struct SpawnHandle(usize);

impl SpawnHandle {
    /// Returns the handle that follows this one.
    ///
    /// The counter wraps around to `0` after `usize::MAX`, so the result is
    /// the same in debug and release builds instead of panicking on overflow
    /// in one and wrapping in the other.
    pub fn next(self) -> SpawnHandle {
        SpawnHandle(self.0.wrapping_add(1))
    }

    /// Returns the raw numeric value of the handle.
    #[doc(hidden)]
    pub fn into_usize(self) -> usize {
        self.0
    }
}