use std::mem;
use futures::{Async, Poll};
use fut::ActorFuture;
use queue::{sync, unsync};
use actor::{Actor, AsyncContext, ActorState, SpawnHandle};
use address::{Address, SyncAddress, Subscriber};
use contextcells::{ContextCell, ContextCellResult, ContextProtocol,
ActorAddressCell, ActorItemsCell, ActorWaitCell};
use handler::{Handler, ResponseType};
use envelope::Envelope;
bitflags! {
    // Lifecycle and bookkeeping bits kept by `ContextImpl`.
    struct ContextFlags: u8 {
        // Set by `poll` around the first call to `Actor::started`.
        const STARTED = 0x01;
        // Initial state set by every constructor; cleared by `stop` or when
        // `poll` moves the actor towards shutdown.
        const RUNNING = 0x02;
        // A stop was requested through `stop`; `poll` then consults
        // `Actor::stopping` to confirm or veto it.
        const STOPPING = 0x04;
        // `Actor::stopping` agreed to stop; the cells get one more pass with
        // `prepstop == true` before the actor is finalized.
        const PREPSTOP = 0x08;
        // Terminal state, set by `terminate` or by `poll` after `PREPSTOP`.
        const STOPPED = 0x10;
        // Something changed during a `poll` pass, so the loop runs again.
        const MODIFIED = 0x20;
    }
}
// Evaluates a cell poll result inside a function returning `Poll<_, _>`:
//   * `NotReady` -> return `Ok(Async::NotReady)` from the enclosing function,
//   * `Stop`     -> call `stop()` on `$slf` and keep going,
//   * `Ready`    -> keep going.
macro_rules! cell_ready {
    ($slf:ident, $e:expr) => {
        match $e {
            ContextCellResult::NotReady => return Ok(Async::NotReady),
            ContextCellResult::Stop => $slf.stop(),
            ContextCellResult::Ready => {}
        }
    };
}
/// Shared state and polling logic behind an actor's execution context.
///
/// `A` is the actor type. `C` is an optional extra cell that is polled
/// together with the built-in ones; it defaults to `()`.
pub struct ContextImpl<A, C = ()>
where
    A: Actor,
    A::Context: AsyncContext<A>,
{
    /// The actor instance. While this is `None`, `poll` completes immediately.
    act: Option<A>,
    /// Lifecycle and bookkeeping bits (see `ContextFlags`).
    flags: ContextFlags,
    /// Futures registered through `wait`; polled first on every pass.
    wait: ActorWaitCell<A>,
    /// Futures registered through `spawn`; cancellable via their handle.
    items: ActorItemsCell<A>,
    /// Cell that hands out the actor's addresses and senders.
    address: ActorAddressCell<A>,
    /// Optional caller-supplied cell, polled after `wait`.
    cell: Option<C>,
}
impl<A, C> ContextImpl<A, C>
where A: Actor, A::Context: AsyncContext<A>, C: ContextCell<A>
{
#[inline]
pub fn new(act: Option<A>) -> ContextImpl<A, C> {
ContextImpl {
act: act,
flags: ContextFlags::RUNNING,
wait: ActorWaitCell::default(),
items: ActorItemsCell::default(),
address: ActorAddressCell::default(),
cell: None,
}
}
#[inline]
pub fn with_cell(act: Option<A>, cell: C) -> ContextImpl<A, C> {
ContextImpl {
act: act,
flags: ContextFlags::RUNNING,
wait: ActorWaitCell::default(),
items: ActorItemsCell::default(),
address: ActorAddressCell::default(),
cell: Some(cell),
}
}
#[inline]
pub fn with_receiver(act: Option<A>,
rx: sync::UnboundedReceiver<Envelope<A>>) -> ContextImpl<A, C> {
ContextImpl {
act: act,
flags: ContextFlags::RUNNING,
wait: ActorWaitCell::default(),
items: ActorItemsCell::default(),
address: ActorAddressCell::new(rx),
cell: None,
}
}
#[inline]
pub fn actor(&mut self) -> &mut A {
self.act.as_mut().unwrap()
}
#[inline]
pub fn cell(&mut self) -> &mut Option<C> {
&mut self.cell
}
#[inline]
pub fn modify(&mut self) {
self.flags.insert(ContextFlags::MODIFIED);
}
#[inline]
pub fn stop(&mut self) {
if self.flags.contains(ContextFlags::RUNNING) {
self.flags.remove(ContextFlags::RUNNING);
self.flags.insert(ContextFlags::STOPPING | ContextFlags::MODIFIED);
}
}
#[inline]
pub fn terminate(&mut self) {
self.flags = ContextFlags::STOPPED;
}
#[inline]
pub fn state(&self) -> ActorState {
if self.flags.contains(ContextFlags::RUNNING) {
ActorState::Running
} else if self.flags.contains(ContextFlags::STOPPING | ContextFlags::PREPSTOP) {
ActorState::Stopping
} else if self.flags.contains(ContextFlags::STOPPED) {
ActorState::Stopped
} else {
ActorState::Started
}
}
#[inline]
pub fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{
self.flags.insert(ContextFlags::MODIFIED);
self.items.spawn(fut)
}
#[inline]
pub fn wait<F>(&mut self, fut: F)
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{
self.flags.insert(ContextFlags::MODIFIED);
self.wait.add(fut)
}
#[inline]
pub fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
self.flags.insert(ContextFlags::MODIFIED);
self.items.cancel_future(handle)
}
#[inline]
pub fn unsync_sender(&mut self) -> unsync::UnboundedSender<ContextProtocol<A>> {
self.modify();
self.address.unsync_sender()
}
#[inline]
pub fn unsync_address(&mut self) -> Address<A> {
self.modify();
self.address.unsync_address()
}
#[inline]
pub fn sync_address(&mut self) -> SyncAddress<A> {
self.modify();
self.address.sync_address()
}
#[inline]
pub fn subscriber<M>(&mut self) -> Box<Subscriber<M>>
where A: Handler<M>,
M: ResponseType + 'static {
self.modify();
Box::new(self.address.unsync_address())
}
#[inline]
pub fn sync_subscriber<M>(&mut self) -> Box<Subscriber<M> + Send>
where A: Handler<M>,
M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send {
self.modify();
Box::new(self.address.sync_address())
}
#[inline]
pub fn alive(&mut self) -> bool {
if self.flags.contains(ContextFlags::STOPPED) {
false
} else {
self.address.connected() || !self.items.is_empty() || !self.wait.is_empty() ||
self.cell.as_ref().map(|c| c.alive()).unwrap_or(false)
}
}
#[inline]
pub fn set_actor(&mut self, act: A) {
self.act = Some(act);
self.modify();
}
#[inline]
pub fn into_inner(self) -> Option<A> {
self.act
}
#[inline]
pub fn started(&mut self) -> bool {
self.flags.contains(ContextFlags::STARTED)
}
pub fn poll(&mut self, ctx: &mut A::Context) -> Poll<(), ()> {
if self.act.is_none() {
return Ok(Async::Ready(()))
}
let act: &mut A = unsafe {
mem::transmute(self.act.as_mut().unwrap() as &mut A)
};
if !self.flags.contains(ContextFlags::STARTED) {
Actor::started(act, ctx);
self.flags.insert(ContextFlags::STARTED);
}
loop {
self.flags.remove(ContextFlags::MODIFIED);
let prepstop = self.flags.contains(ContextFlags::PREPSTOP);
cell_ready!{ self, self.wait.poll(act, ctx, prepstop) };
let stop = match self.cell {
Some(ref mut cell) => match cell.poll(act, ctx, prepstop) {
ContextCellResult::Ready => false,
ContextCellResult::NotReady => return Ok(Async::NotReady),
ContextCellResult::Stop => true,
},
None => false,
};
if stop {
self.stop();
}
cell_ready!{ self, self.address.poll(act, ctx, prepstop) };
cell_ready!{ self, self.items.poll(act, ctx, prepstop) };
if self.flags.contains(ContextFlags::MODIFIED) {
continue
}
if self.flags.contains(ContextFlags::RUNNING) {
if !self.alive() && Actor::stopping(act, ctx) {
self.flags.remove(ContextFlags::RUNNING);
self.flags.insert(ContextFlags::PREPSTOP);
continue
}
} else if self.flags.contains(ContextFlags::STOPPING) {
self.flags.remove(ContextFlags::STOPPING);
if Actor::stopping(act, ctx) {
self.flags.insert(ContextFlags::PREPSTOP);
} else {
self.flags.insert(ContextFlags::RUNNING);
}
continue
} else if self.flags.contains(ContextFlags::PREPSTOP) {
self.flags = ContextFlags::STOPPED;
Actor::stopped(act, ctx);
return Ok(Async::Ready(()))
} else if self.flags.contains(ContextFlags::STOPPED) {
Actor::stopped(act, ctx);
return Ok(Async::Ready(()))
}
return Ok(Async::NotReady)
}
}
}