use std::time::Duration;
use futures::{future, Future, Stream};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::{Framed, Encoder, Decoder};
use fut::ActorFuture;
use message::Response;
use arbiter::Arbiter;
use address::ActorAddress;
use envelope::ToEnvelope;
use handler::{Handler, ResponseType};
use context::Context;
use contextitems::{ActorFutureItem, ActorMessageItem,
ActorDelayedMessageItem, ActorStreamItem, ActorMessageStreamItem};
use framed::FramedContext;
use utils::TimerFunc;
#[allow(unused_variables)]
pub trait Actor: Sized + 'static {
type Context: ActorContext + ToEnvelope<Self>;
fn started(&mut self, ctx: &mut Self::Context) {}
fn stopping(&mut self, ctx: &mut Self::Context) -> bool {
true
}
fn stopped(&mut self, ctx: &mut Self::Context) {}
fn start<Addr>(self) -> Addr
where Self: Actor<Context=Context<Self>> + ActorAddress<Self, Addr>
{
let mut ctx = Context::new(Some(self));
let addr = <Self as ActorAddress<Self, Addr>>::get(&mut ctx);
ctx.run(Arbiter::handle());
addr
}
fn start_default<Addr>() -> Addr
where Self: Default + Actor<Context=Context<Self>> + ActorAddress<Self, Addr>
{
Self::default().start()
}
fn create<Addr, F>(f: F) -> Addr
where Self: Actor<Context=Context<Self>> + ActorAddress<Self, Addr>,
F: FnOnce(&mut Context<Self>) -> Self + 'static
{
let mut ctx = Context::new(None);
let addr = <Self as ActorAddress<Self, Addr>>::get(&mut ctx);
Arbiter::handle().spawn_fn(move || {
let act = f(&mut ctx);
ctx.set_actor(act);
ctx.run(Arbiter::handle());
future::ok(())
});
addr
}
fn reply<M>(val: Result<M::Item, M::Error>) -> Response<Self, M> where M: ResponseType {
Response::reply(val)
}
fn async_reply<T, M>(fut: T) -> Response<Self, M>
where M: ResponseType,
T: ActorFuture<Item=M::Item, Error=M::Error, Actor=Self> + Sized + 'static {
Response::async_reply(fut)
}
}
/// An actor attached to a framed I/O object.
///
/// The transport is a `Framed<Self::Io, Self::Codec>`; the constructors below
/// wrap the actor and the transport in a [`FramedContext`], run it on the
/// current arbiter's handle and return an address of type `Addr`.
// The default `closed` body intentionally ignores its arguments.
#[allow(unused_variables)]
pub trait FramedActor: Actor {
    /// Underlying I/O object; must be both readable and writable.
    type Io: AsyncRead + AsyncWrite;

    /// Codec used for both encoding and decoding frames.
    type Codec: Encoder + Decoder;

    /// Handler for a decoder result: either a decoded item or a decoder error.
    ///
    /// NOTE(review): presumably called by the framed context for every frame
    /// read from the transport — the call site is outside this file.
    fn handle(
        &mut self,
        msg: Result<<Self::Codec as Decoder>::Item, <Self::Codec as Decoder>::Error>,
        ctx: &mut Self::Context,
    );

    /// Hook receiving an optional encoder error; the default implementation
    /// does nothing.
    ///
    /// NOTE(review): presumably called when the framed object is closed, with
    /// `Some(err)` if an encode/write error caused it — confirm against
    /// `FramedContext`.
    fn closed(
        &mut self,
        error: Option<<Self::Codec as Encoder>::Error>,
        ctx: &mut Self::Context,
    ) {
    }

    /// Frames `io` with `codec` and starts `self` on the resulting transport
    /// via [`FramedActor::from_framed`].
    fn framed<Addr>(self, io: Self::Io, codec: Self::Codec) -> Addr
    where
        Self: Actor<Context = FramedContext<Self>> + ActorAddress<Self, Addr>,
    {
        let transport = io.framed(codec);
        self.from_framed(transport)
    }

    /// Wraps `self` and an existing framed transport in a [`FramedContext`],
    /// obtains an address, runs the context on `Arbiter::handle()` and returns
    /// the address.
    fn from_framed<Addr>(self, framed: Framed<Self::Io, Self::Codec>) -> Addr
    where
        Self: Actor<Context = FramedContext<Self>> + ActorAddress<Self, Addr>,
    {
        let mut context = FramedContext::framed(Some(self), framed);
        let address = <Self as ActorAddress<Self, Addr>>::get(&mut context);
        context.run(Arbiter::handle());
        address
    }

    /// Frames `io` with `codec` and delegates to
    /// [`FramedActor::create_from_framed`].
    fn create_framed<Addr, F>(io: Self::Io, codec: Self::Codec, f: F) -> Addr
    where
        Self: Actor<Context = FramedContext<Self>> + ActorAddress<Self, Addr>,
        F: FnOnce(&mut FramedContext<Self>) -> Self + 'static,
    {
        let transport = io.framed(codec);
        Self::create_from_framed(transport, f)
    }

    /// Starts a framed actor whose construction needs access to its context.
    ///
    /// A context holding only the transport is created and the address is
    /// taken from it immediately. A closure spawned on `Arbiter::handle()`
    /// then calls `f` with the context, installs the returned actor and runs
    /// the context.
    fn create_from_framed<Addr, F>(framed: Framed<Self::Io, Self::Codec>, f: F) -> Addr
    where
        Self: Actor<Context = FramedContext<Self>> + ActorAddress<Self, Addr>,
        F: FnOnce(&mut FramedContext<Self>) -> Self + 'static,
    {
        let mut context = FramedContext::framed(None, framed);
        let address = <Self as ActorAddress<Self, Addr>>::get(&mut context);
        Arbiter::handle().spawn_fn(move || {
            let actor = f(&mut context);
            context.set_actor(actor);
            context.run(Arbiter::handle());
            future::ok(())
        });
        address
    }
}
#[allow(unused_variables)]
pub trait Supervised: Actor {
fn restarting(&mut self, ctx: &mut <Self as Actor>::Context) {}
}
/// Execution state of an actor, as reported by [`ActorContext::state`].
#[derive(PartialEq, Debug, Copy, Clone)]
pub enum ActorState {
    /// The actor has been started.
    Started,
    /// The actor is running.
    Running,
    /// The actor is in the process of stopping.
    Stopping,
    /// The actor has stopped; contexts in this state reject new work.
    Stopped,
}

/// Operations every actor execution context must provide.
pub trait ActorContext: Sized {
    /// Requests that the context stop.
    ///
    /// NOTE(review): presumably moves the actor into `ActorState::Stopping` —
    /// the exact transition is defined by the implementor.
    fn stop(&mut self);

    /// Terminates the context.
    ///
    /// NOTE(review): presumably ends execution without the graceful stopping
    /// phase — confirm against the implementor.
    fn terminate(&mut self);

    /// Returns the current execution state.
    fn state(&self) -> ActorState;

    /// Returns `true` while the context has not reached `ActorState::Stopped`.
    ///
    /// A stopping context still counts as alive; only `Stopped` is treated as
    /// dead, which is the same condition the scheduling helpers use to reject
    /// new work.
    fn alive(&self) -> bool {
        // The previous comparison (`== Stopped`) was inverted: it reported
        // only stopped contexts as alive.
        self.state() != ActorState::Stopped
    }
}
pub trait AsyncContext<A>: ActorContext + ToEnvelope<A> where A: Actor<Context=Self>
{
fn address<Address>(&mut self) -> Address where A: ActorAddress<A, Address> {
<A as ActorAddress<A, Address>>::get(self)
}
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static;
fn wait<F>(&mut self, fut: F)
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static;
fn cancel_future(&mut self, handle: SpawnHandle) -> bool;
fn add_future<F>(&mut self, fut: F)
where F: Future + 'static,
F::Item: ResponseType,
A: Handler<Result<F::Item, F::Error>>
{
if self.state() == ActorState::Stopped {
error!("Context::add_future called for stopped actor.");
} else {
self.spawn(ActorFutureItem::new(fut));
}
}
fn add_stream<S>(&mut self, fut: S)
where S: Stream + 'static,
S::Item: ResponseType,
A: Handler<Result<S::Item, S::Error>>
{
if self.state() == ActorState::Stopped {
error!("Context::add_stream called for stopped actor.");
} else {
self.spawn(ActorStreamItem::new(fut));
}
}
fn add_message_stream<S>(&mut self, fut: S)
where S: Stream<Error=()> + 'static,
S::Item: ResponseType,
A: Handler<S::Item>
{
if self.state() == ActorState::Stopped {
error!("Context::add_message_stream called for stopped actor.");
} else {
self.spawn(ActorMessageStreamItem::new(fut));
}
}
fn notify<M>(&mut self, msg: M)
where A: Handler<M>, M: ResponseType + 'static
{
if self.state() == ActorState::Stopped {
error!("Context::add_timeout called for stopped actor.");
} else {
self.spawn(ActorMessageItem::new(msg));
}
}
fn notify_later<M>(&mut self, msg: M, after: Duration) -> SpawnHandle
where A: Handler<M>, M: ResponseType + 'static
{
if self.state() == ActorState::Stopped {
error!("Context::add_timeout called for stopped actor.");
SpawnHandle::default()
} else {
self.spawn(ActorDelayedMessageItem::new(msg, after))
}
}
fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
where F: FnOnce(&mut A, &mut A::Context) + 'static
{
self.spawn(TimerFunc::new(dur, f))
}
}
/// Handle identifying a spawned future.
///
/// It is returned by `AsyncContext::spawn` and accepted by
/// `AsyncContext::cancel_future`. The default handle wraps `0`.
#[derive(Eq, PartialEq, Debug, Copy, Clone, Hash, Default)]
pub struct SpawnHandle(usize);

impl SpawnHandle {
    /// Returns the handle whose inner counter is one greater than this one.
    pub fn next(self) -> SpawnHandle {
        let SpawnHandle(id) = self;
        SpawnHandle(id + 1)
    }
}