use std::time::Duration;
use actix_rt::Arbiter;
use futures::Stream;
use log::error;
use crate::address::{channel, Addr};
use crate::context::Context;
use crate::contextitems::{
ActorDelayedMessageItem, ActorMessageItem, ActorMessageStreamItem,
};
use crate::fut::{ActorFuture, ActorStream};
use crate::handler::{Handler, Message};
use crate::mailbox::DEFAULT_CAPACITY;
use crate::stream::StreamHandler;
use crate::utils::{IntervalFunc, TimerFunc};
/// Core trait for actor types.
///
/// An implementor picks the execution context it runs in via
/// [`Actor::Context`], may override the lifecycle hooks, and gets a set of
/// provided constructors that launch the actor and hand back its [`Addr`].
// The lifecycle hooks have default bodies that never read `ctx`; the lint is
// silenced so the parameter can keep its descriptive name in the signature.
#[allow(unused_variables)]
pub trait Actor: Sized + Unpin + 'static {
    /// Execution context type used by this actor.
    type Context: ActorContext;

    /// Lifecycle hook taking the actor and its context.
    ///
    /// The default implementation does nothing.
    fn started(&mut self, ctx: &mut Self::Context) {}

    /// Lifecycle hook that answers with a [`Running`] value.
    ///
    /// The default implementation returns [`Running::Stop`].
    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
        Running::Stop
    }

    /// Lifecycle hook taking the actor and its context.
    ///
    /// The default implementation does nothing.
    fn stopped(&mut self, ctx: &mut Self::Context) {}

    /// Runs this actor in a freshly created [`Context`] and returns its
    /// address.
    fn start(self) -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>>,
    {
        let ctx: Context<Self> = Context::new();
        ctx.run(self)
    }

    /// Builds the actor with [`Default::default`] and starts it via
    /// [`Actor::start`].
    fn start_default() -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>> + Default,
    {
        let actor = Self::default();
        actor.start()
    }

    /// Constructs and launches the actor from a closure executed on `arb`.
    ///
    /// A mailbox channel of `DEFAULT_CAPACITY` is created up front. The
    /// closure handed to `arb.exec_fn` builds a context around the receiving
    /// half, calls `f` to construct the actor, converts the context into a
    /// future and spawns it with `actix_rt::spawn`. The returned address
    /// wraps the sending half of the channel.
    fn start_in_arbiter<F>(arb: &Arbiter, f: F) -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>>,
        F: FnOnce(&mut Context<Self>) -> Self + Send + 'static,
    {
        let (sender, receiver) = channel::channel(DEFAULT_CAPACITY);

        arb.exec_fn(move || {
            let mut ctx = Context::with_receiver(receiver);
            let actor = f(&mut ctx);
            actix_rt::spawn(ctx.into_future(actor));
        });

        Addr::new(sender)
    }

    /// Creates a new [`Context`], lets `f` construct the actor with mutable
    /// access to that context, then runs the actor in it and returns its
    /// address.
    fn create<F>(f: F) -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>>,
        F: FnOnce(&mut Context<Self>) -> Self,
    {
        let mut ctx = Context::new();
        let actor = f(&mut ctx);
        ctx.run(actor)
    }
}
#[allow(unused_variables)]
pub trait Supervised: Actor {
fn restarting(&mut self, ctx: &mut <Self as Actor>::Context) {}
}
/// Lifecycle state reported by an actor context (see `ActorContext::state`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ActorState {
    /// Counted as alive by `ActorState::alive`.
    Started,
    /// Counted as alive by `ActorState::alive`.
    Running,
    /// Counted as stopping by `ActorState::stopping`.
    Stopping,
    /// Counted as stopping by `ActorState::stopping`; several context
    /// helpers refuse to schedule new work in this state.
    Stopped,
}
/// Value returned by the `Actor::stopping` hook.
// NOTE(review): the variant names suggest "proceed with stopping" versus
// "keep running", but the code that interprets this value is not in this
// file — confirm against the context implementation.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Running {
/// Returned by the default `Actor::stopping` implementation.
Stop,
/// The alternative answer to `Stop`.
Continue,
}
impl ActorState {
    /// Returns `true` for the `Started` and `Running` states, `false`
    /// otherwise.
    pub fn alive(self) -> bool {
        match self {
            ActorState::Started | ActorState::Running => true,
            ActorState::Stopping | ActorState::Stopped => false,
        }
    }

    /// Returns `true` for the `Stopping` and `Stopped` states, `false`
    /// otherwise.
    pub fn stopping(self) -> bool {
        match self {
            ActorState::Stopping | ActorState::Stopped => true,
            ActorState::Started | ActorState::Running => false,
        }
    }
}
/// Minimal interface required of an actor execution context.
///
/// `Actor::Context` must implement this trait.
pub trait ActorContext: Sized {
/// Asks the context to stop the actor.
// NOTE(review): the exact shutdown sequence is defined by implementors,
// which are not visible in this file.
fn stop(&mut self);
/// Asks the context to terminate the actor.
// NOTE(review): how this differs from `stop` is defined by implementors —
// confirm against the concrete context types.
fn terminate(&mut self);
/// Returns the current [`ActorState`] of the actor.
fn state(&self) -> ActorState;
}
/// Asynchronous facilities offered by the execution context of actor `A`.
///
/// Implementors supply the primitive operations (`address`, `spawn`, `wait`,
/// `waiting`, `cancel_future`); the remaining methods are provided and are
/// all built on top of `spawn` and `state`.
pub trait AsyncContext<A>: ActorContext
where
    A: Actor<Context = Self>,
{
    /// Returns the address of the actor owning this context.
    fn address(&self) -> Addr<A>;

    /// Spawns an actor future into this context and returns the handle
    /// assigned to it.
    // NOTE(review): the handle is presumably what `cancel_future` accepts;
    // confirm against the implementors.
    fn spawn<F>(&mut self, fut: F) -> SpawnHandle
    where
        F: ActorFuture<Output = (), Actor = A> + 'static;

    /// Hands an actor future to the context to be waited on.
    // NOTE(review): the precise blocking semantics are implementor-defined
    // and not visible in this file.
    fn wait<F>(&mut self, fut: F)
    where
        F: ActorFuture<Output = (), Actor = A> + 'static;

    /// Reports whether the context is currently in the waiting state.
    fn waiting(&self) -> bool;

    /// Cancels the future identified by `handle`, returning a success flag.
    // NOTE(review): the meaning of `false` is defined by implementors.
    fn cancel_future(&mut self, handle: SpawnHandle) -> bool;

    /// Registers a stream with this context by delegating to
    /// `StreamHandler::add_stream` for the stream's item type.
    fn add_stream<S>(&mut self, fut: S) -> SpawnHandle
    where
        S: Stream + 'static,
        A: StreamHandler<S::Item>,
    {
        <A as StreamHandler<S::Item>>::add_stream(fut, self)
    }

    /// Spawns a stream of messages to be handled by the actor.
    ///
    /// If the actor state is `Stopped`, an error is logged and the stream is
    /// dropped without being spawned.
    fn add_message_stream<S>(&mut self, fut: S)
    where
        S: Stream + 'static,
        S::Item: Message,
        A: Handler<S::Item>,
    {
        if self.state() == ActorState::Stopped {
            error!("Context::add_message_stream called for stopped actor.");
            return;
        }

        let item = ActorMessageStreamItem::new(fut);
        // The spawn handle is intentionally discarded.
        self.spawn(item);
    }

    /// Spawns a single message item for the actor to handle.
    ///
    /// If the actor state is `Stopped`, an error is logged and the message
    /// is dropped.
    fn notify<M>(&mut self, msg: M)
    where
        A: Handler<M>,
        M: Message + 'static,
    {
        if self.state() == ActorState::Stopped {
            error!("Context::notify called for stopped actor.");
            return;
        }

        let item = ActorMessageItem::new(msg);
        // The spawn handle is intentionally discarded.
        self.spawn(item);
    }

    /// Spawns a delayed message item built from `msg` and `after`, returning
    /// its spawn handle.
    ///
    /// If the actor state is `Stopped`, an error is logged and
    /// `SpawnHandle::default()` is returned instead.
    fn notify_later<M>(&mut self, msg: M, after: Duration) -> SpawnHandle
    where
        A: Handler<M>,
        M: Message + 'static,
    {
        if self.state() == ActorState::Stopped {
            error!("Context::notify_later called for stopped actor.");
            return SpawnHandle::default();
        }

        let delayed = ActorDelayedMessageItem::new(msg, after);
        self.spawn(delayed)
    }

    /// Spawns a `TimerFunc` built from `dur` and the one-shot closure `f`,
    /// returning its spawn handle.
    fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
    where
        F: FnOnce(&mut A, &mut A::Context) + 'static,
    {
        let timer = TimerFunc::new(dur, f);
        self.spawn(timer)
    }

    /// Spawns an `IntervalFunc` built from `dur` and the repeatable closure
    /// `f`, returning its spawn handle.
    fn run_interval<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
    where
        F: FnMut(&mut A, &mut A::Context) + 'static,
    {
        // `finish()` adapts the interval stream into a future that `spawn`
        // accepts.
        let ticker = IntervalFunc::new(dur, f).finish();
        self.spawn(ticker)
    }
}
/// Opaque handle identifying an item spawned into an actor context.
///
/// The default handle wraps `0`.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct SpawnHandle(usize);

impl SpawnHandle {
    /// Returns the handle whose inner value is one greater than this one.
    pub fn next(self) -> SpawnHandle {
        let SpawnHandle(id) = self;
        SpawnHandle(id + 1)
    }

    /// Unwraps the handle into its raw `usize` value.
    #[doc(hidden)]
    pub fn into_usize(self) -> usize {
        let SpawnHandle(id) = self;
        id
    }
}