use std;
use std::time::Duration;
use futures::{future, Future, Stream};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::{Encoder, Decoder};
use fut::ActorFuture;
use message::Response;
use arbiter::Arbiter;
use address::ActorAddress;
use envelope::ToEnvelope;
use context::{Context, ActorFutureCell, ActorStreamCell};
use framed::FramedContext;
use utils::{TimerFunc, TimeoutWrapper};
#[allow(unused_variables)]
pub trait Actor: Sized + 'static {
type Context: ActorContext + ToEnvelope<Self>;
fn started(&mut self, ctx: &mut Self::Context) {}
fn stopping(&mut self, ctx: &mut Self::Context) {}
fn stopped(&mut self, ctx: &mut Self::Context) {}
fn start<Addr>(self) -> Addr
where Self: Actor<Context=Context<Self>> + ActorAddress<Self, Addr>
{
let mut ctx = Context::new(self);
let addr = <Self as ActorAddress<Self, Addr>>::get(&mut ctx);
ctx.run(Arbiter::handle());
addr
}
fn start_default<Addr>() -> Addr
where Self: Default + Actor<Context=Context<Self>> + ActorAddress<Self, Addr>
{
Self::default().start()
}
fn create<Addr, F>(f: F) -> Addr
where Self: Actor<Context=Context<Self>> + ActorAddress<Self, Addr>,
F: FnOnce(&mut Context<Self>) -> Self + 'static
{
let mut ctx = Context::new(unsafe{std::mem::uninitialized()});
let addr = <Self as ActorAddress<Self, Addr>>::get(&mut ctx);
Arbiter::handle().spawn_fn(move || {
let act = f(&mut ctx);
let old = ctx.replace_actor(act);
std::mem::forget(old);
ctx.run(Arbiter::handle());
future::ok(())
});
addr
}
fn reply<M>(val: M::Item) -> Response<Self, M> where M: ResponseType {
Response::reply(val)
}
fn async_reply<T, M>(fut: T) -> Response<Self, M>
where M: ResponseType,
T: ActorFuture<Item=M::Item, Error=M::Error, Actor=Self> + Sized + 'static
{
Response::async_reply(fut)
}
fn empty<M>() -> Response<Self, M> where M: ResponseType<Item=()> {
Response::empty()
}
fn reply_error<M>(err: M::Error) -> Response<Self, M> where M: ResponseType {
Response::error(err)
}
}
#[allow(unused_variables)]
pub trait FramedActor: Actor {
type Io: AsyncRead + AsyncWrite;
type Codec: Encoder + Decoder;
fn error(&mut self, err: <Self::Codec as Encoder>::Error, ctx: &mut Self::Context) {}
fn framed<Addr>(self, io: Self::Io, codec: Self::Codec) -> Addr
where Self: Actor<Context=FramedContext<Self>> + ActorAddress<Self, Addr>,
Self: StreamHandler<<<Self as FramedActor>::Codec as Decoder>::Item,
<<Self as FramedActor>::Codec as Decoder>::Error>,
<<Self as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
let mut ctx = FramedContext::new(self, io, codec);
let addr = <Self as ActorAddress<Self, Addr>>::get(&mut ctx);
ctx.run(Arbiter::handle());
addr
}
fn create_framed<Addr, F>(io: Self::Io, codec: Self::Codec, f: F) -> Addr
where Self: Actor<Context=FramedContext<Self>> + ActorAddress<Self, Addr>,
Self: StreamHandler<<<Self as FramedActor>::Codec as Decoder>::Item,
<<Self as FramedActor>::Codec as Decoder>::Error>,
<<Self as FramedActor>::Codec as Decoder>::Item: ResponseType,
F: FnOnce(&mut FramedContext<Self>) -> Self + 'static
{
let mut ctx = FramedContext::new(unsafe{std::mem::uninitialized()}, io, codec);
let addr = <Self as ActorAddress<Self, Addr>>::get(&mut ctx);
Arbiter::handle().spawn_fn(move || {
let act = f(&mut ctx);
let old = ctx.replace_actor(act);
std::mem::forget(old);
ctx.run(Arbiter::handle());
future::ok(())
});
addr
}
}
#[allow(unused_variables)]
pub trait Supervised: Actor {
fn restarting(&mut self, ctx: &mut <Self as Actor>::Context) {}
}
/// Message handling for an actor.
///
/// `M` is the message type and `E` (defaulting to `()`) is the error type
/// delivered to [`error`](#method.error).
// The default `error` body deliberately ignores its arguments.
#[allow(unused_variables)]
pub trait Handler<M, E = ()>: Actor
where
    M: ResponseType,
{
    /// Processes `msg` and produces the response for it.
    fn handle(&mut self, msg: M, ctx: &mut Self::Context) -> Response<Self, M>;

    /// Receives an error of type `E`; the default implementation does
    /// nothing.
    fn error(&mut self, err: E, ctx: &mut Self::Context) {}
}
/// Associates a message type with the result types of handling it.
///
/// `Handler::handle` returns a `Response<Self, M>` for a message
/// `M: ResponseType`; the two associated types below describe the success
/// and failure payloads of that response.
pub trait ResponseType {
    /// Value produced when the message is handled successfully.
    type Item;

    /// Value produced when handling the message fails.
    type Error;
}
/// Extension of `Handler` for messages that arrive from a stream.
///
/// Adds two hooks with empty default bodies.
// The default hook bodies deliberately ignore their arguments.
#[allow(unused_variables)]
pub trait StreamHandler<M: ResponseType, E = ()>: Handler<M, E> + Actor {
    /// Hook with an empty default body.
    ///
    /// NOTE(review): presumably called when stream processing begins;
    /// the call site is not in this file.
    fn started(&mut self, ctx: &mut Self::Context) {}

    /// Hook with an empty default body.
    ///
    /// NOTE(review): presumably called once the stream is exhausted;
    /// the call site is not in this file.
    fn finished(&mut self, ctx: &mut Self::Context) {}
}
/// Lifecycle state reported by `ActorContext::state`.
///
/// The variants are listed in the order Started, Running, Stopping, Stopped.
/// The precise transitions are driven by the context implementations, which
/// are not part of this file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ActorState {
    /// First listed state.
    Started,
    /// Second listed state.
    Running,
    /// Third listed state.
    Stopping,
    /// Final state; `AsyncContext` refuses to add new work once it is reached.
    Stopped,
}
/// Minimal interface every actor execution context provides.
pub trait ActorContext: Sized {
    /// Requests that the actor stop; exact semantics are defined by the
    /// implementor.
    fn stop(&mut self);

    /// Requests that the actor terminate; exact semantics are defined by the
    /// implementor.
    fn terminate(&mut self);

    /// Returns the current lifecycle state of the actor.
    fn state(&self) -> ActorState;

    /// Returns `true` for every state except `ActorState::Stopped`.
    ///
    /// The check used to be inverted (`== Stopped`), which reported a stopped
    /// context as the only "alive" one. It now agrees with the guards in
    /// `AsyncContext`, which reject new work only in the `Stopped` state.
    fn alive(&self) -> bool {
        self.state() != ActorState::Stopped
    }
}
pub trait AsyncContext<A>: ActorContext + ToEnvelope<A> where A: Actor<Context=Self>
{
fn address<Address>(&mut self) -> Address where A: ActorAddress<A, Address> {
<A as ActorAddress<A, Address>>::get(self)
}
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static;
fn wait<F>(&mut self, fut: F)
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static;
fn cancel_future(&mut self, handle: SpawnHandle) -> bool;
#[doc(hidden)]
fn cancel_future_on_stop(&mut self, handle: SpawnHandle);
fn add_future<F>(&mut self, fut: F)
where F: Future + 'static,
F::Item: ResponseType,
A: Handler<F::Item, F::Error>
{
if self.state() == ActorState::Stopped {
error!("Context::add_future called for stopped actor.");
} else {
self.spawn(ActorFutureCell::new(fut));
}
}
fn add_stream<S>(&mut self, fut: S)
where S: Stream + 'static,
S::Item: ResponseType,
A: Handler<S::Item, S::Error> + StreamHandler<S::Item, S::Error>
{
if self.state() == ActorState::Stopped {
error!("Context::add_stream called for stopped actor.");
} else {
self.spawn(ActorStreamCell::new(fut));
}
}
fn notify<M, E>(&mut self, msg: M, after: Duration) -> SpawnHandle
where A: Handler<M, E>, M: ResponseType + 'static, E: 'static
{
if self.state() == ActorState::Stopped {
error!("Context::add_timeout called for stopped actor.");
SpawnHandle::default()
} else {
let h = self.spawn(ActorFutureCell::new(TimeoutWrapper::new(msg, after)));
self.cancel_future_on_stop(h);
h
}
}
fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
where F: FnOnce(&mut A, &mut A::Context) + 'static
{
let h = self.spawn(TimerFunc::new(dur, f));
self.cancel_future_on_stop(h);
h
}
}
/// Opaque identifier for a future spawned on an `AsyncContext`.
///
/// The default handle wraps `0`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SpawnHandle(usize);

impl SpawnHandle {
    /// Returns the handle whose inner counter is one greater than this one.
    pub fn next(self) -> SpawnHandle {
        let SpawnHandle(id) = self;
        SpawnHandle(id + 1)
    }
}