#![cfg_attr(
feature = "cargo-clippy", allow(redundant_field_names, suspicious_arithmetic_impl)
)]
use futures::{Async, Poll};
use smallvec::SmallVec;
use actor::{Actor, ActorState, AsyncContext, Running, SpawnHandle, Supervised};
use address::{Addr, AddressReceiver};
use contextitems::ActorWaitItem;
use fut::ActorFuture;
use mailbox::Mailbox;
bitflags! {
    /// Internal lifecycle and bookkeeping bits of a `ContextImpl`.
    struct ContextFlags: u8 {
        // Inserted by `poll` right before it calls `Actor::started`.
        const STARTED = 0x01;
        // Initial state set by the constructors and by `restart`.
        const RUNNING = 0x02;
        // Set by `stop`; `poll` then asks `Actor::stopping` how to proceed.
        const STOPPING = 0x04;
        // Bit 0x08 is not assigned to any flag.
        // Set by `terminate`, and by `poll` once the actor agreed to stop.
        const STOPPED = 0x10;
        // Set by `modify`; makes `poll` run another loop iteration.
        const MODIFIED = 0x20;
    }
}
/// A spawned actor future together with the handle that identifies it.
///
/// The handle is what `spawn` returns to the caller and what
/// `cancel_future` later uses to refer to the future.
type Item<A> = (
// Handle issued by `spawn` for this future.
SpawnHandle,
// The boxed future itself; it yields `()` and its error type is `()`.
Box<ActorFuture<Item = (), Error = (), Actor = A>>,
);
/// Shared implementation of an actor execution context.
///
/// Holds the actor, its mailbox, the futures spawned on its behalf and the
/// lifecycle flags; `poll` drives all of them.
pub struct ContextImpl<A>
where
A: Actor,
A::Context: AsyncContext<A>,
{
/// The actor being driven. When this is `None`, `poll` completes
/// immediately and `actor()` panics.
act: Option<A>,
/// Lifecycle and bookkeeping bits (see `ContextFlags`).
flags: ContextFlags,
/// The actor's mailbox; polled on every iteration of the `poll` loop.
mailbox: Mailbox<A>,
/// Futures registered through `wait()`. `poll` always drives the most
/// recently added one and returns `NotReady` while it is pending.
wait: SmallVec<[ActorWaitItem<A>; 2]>,
/// Futures registered through `spawn()`, each paired with its handle.
items: SmallVec<[Item<A>; 3]>,
/// Index 0: the last handle issued by `spawn`. Index 1: the handle of the
/// item currently being polled (see `curr_handle`). Any further entries
/// are cancellation requests queued by `cancel_future`.
handles: SmallVec<[SpawnHandle; 2]>,
}
impl<A> ContextImpl<A>
where
A: Actor,
A::Context: AsyncContext<A>,
{
/// Creates a context in the `RUNNING` state with a default mailbox.
///
/// `act` may be `None`; an actor can be supplied later with `set_actor`.
#[inline]
pub fn new(act: Option<A>) -> ContextImpl<A> {
    // Two reserved slots: [0] last issued handle, [1] handle being polled.
    let handles = SmallVec::from_slice(&[SpawnHandle::default(); 2]);
    let mailbox = Mailbox::default();
    ContextImpl {
        act,
        flags: ContextFlags::RUNNING,
        mailbox,
        wait: SmallVec::new(),
        items: SmallVec::new(),
        handles,
    }
}
/// Creates a context in the `RUNNING` state whose mailbox is built from
/// the given address receiver.
///
/// `act` may be `None`; an actor can be supplied later with `set_actor`.
#[inline]
pub fn with_receiver(act: Option<A>, rx: AddressReceiver<A>) -> Self {
    // Two reserved slots: [0] last issued handle, [1] handle being polled.
    let handles = SmallVec::from_slice(&[SpawnHandle::default(); 2]);
    let mailbox = Mailbox::new(rx);
    ContextImpl {
        act,
        flags: ContextFlags::RUNNING,
        mailbox,
        wait: SmallVec::new(),
        items: SmallVec::new(),
        handles,
    }
}
/// Returns a mutable reference to the actor.
///
/// # Panics
///
/// Panics if the context currently holds no actor.
#[inline]
pub fn actor(&mut self) -> &mut A {
    let slot: Option<&mut A> = self.act.as_mut();
    slot.unwrap()
}
/// Marks the context as modified so that `poll` runs another iteration
/// of its loop before returning.
#[inline]
pub fn modify(&mut self) {
    self.flags |= ContextFlags::MODIFIED;
}
/// Returns `true` when at least one wait future is registered, or when
/// the context is stopping or already stopped.
#[inline]
pub fn waiting(&self) -> bool {
    if !self.wait.is_empty() {
        return true;
    }
    let halting = ContextFlags::STOPPING | ContextFlags::STOPPED;
    self.flags.intersects(halting)
}
/// Requests that the actor stop.
///
/// Only has an effect while the context is `RUNNING`: it then clears
/// `RUNNING` and `MODIFIED` and sets `STOPPING`. The final decision is
/// made in `poll`, which consults `Actor::stopping`.
#[inline]
pub fn stop(&mut self) {
    if !self.flags.contains(ContextFlags::RUNNING) {
        return;
    }
    self.flags.remove(ContextFlags::RUNNING);
    self.flags.remove(ContextFlags::MODIFIED);
    self.flags.insert(ContextFlags::STOPPING);
}
/// Forces the context into the stopped state.
///
/// The flags are overwritten, not merged: every other bit, `STARTED`
/// included, is cleared.
#[inline]
pub fn terminate(&mut self) {
    let stopped_only = ContextFlags::STOPPED;
    self.flags = stopped_only;
}
/// Maps the internal flags to an `ActorState`.
///
/// Flags are checked in the order `RUNNING`, `STOPPED`, `STOPPING`; if
/// none of them is set the state is `Started`.
#[inline]
pub fn state(&self) -> ActorState {
    let flags = &self.flags;
    if flags.contains(ContextFlags::RUNNING) {
        return ActorState::Running;
    }
    if flags.contains(ContextFlags::STOPPED) {
        return ActorState::Stopped;
    }
    if flags.contains(ContextFlags::STOPPING) {
        return ActorState::Stopping;
    }
    ActorState::Started
}
/// Returns the handle of the spawned future that `poll` is currently
/// driving.
///
/// `poll` stores the handle in slot 1 before polling each item and resets
/// the slot to `SpawnHandle::default()` once the item loop finishes.
#[inline]
pub fn curr_handle(&self) -> SpawnHandle {
    let current = self.handles[1];
    current
}
#[inline]
pub fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
{
self.modify();
let handle = self.handles[0].next();
self.handles[0] = handle;
let fut: Box<ActorFuture<Item = (), Error = (), Actor = A>> = Box::new(fut);
self.items.push((handle, fut));
handle
}
/// Registers a future that must complete before `poll` processes the
/// mailbox or any spawned item again.
///
/// The future is appended to the wait list and the context is marked as
/// modified.
#[inline]
pub fn wait<F>(&mut self, f: F)
where
    F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
{
    let item = ActorWaitItem::new(f);
    self.modify();
    self.wait.push(item);
}
/// Queues a cancellation request for the spawned future identified by
/// `handle`.
///
/// The request is only recorded here; `poll` removes the matching items
/// later. Always returns `true`, whether or not such a future exists.
#[inline]
pub fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
    let pending = &mut self.handles;
    pending.push(handle);
    true
}
/// Returns the capacity reported by the mailbox.
#[inline]
pub fn capacity(&mut self) -> usize {
    let cap = self.mailbox.capacity();
    cap
}
#[inline]
pub fn set_mailbox_capacity(&mut self, cap: usize) {
self.modify();
self.mailbox.set_capacity(cap);
}
/// Returns an address for this actor, obtained from the mailbox.
#[inline]
pub fn address(&self) -> Addr<A> {
    let addr = self.mailbox.address();
    addr
}
/// Reports whether the context still has a reason to keep running.
///
/// A stopped context is never alive. Otherwise it is alive if it has not
/// been started yet, if the mailbox is still connected, or if any spawned
/// or wait future remains.
#[inline]
pub fn alive(&self) -> bool {
    if self.flags.contains(ContextFlags::STOPPED) {
        return false;
    }
    if !self.flags.contains(ContextFlags::STARTED) {
        return true;
    }
    self.mailbox.connected() || !self.items.is_empty() || !self.wait.is_empty()
}
/// Returns `true` if either `STOPPING` or `STOPPED` is set.
#[inline]
fn stopping(&self) -> bool {
    let halting = ContextFlags::STOPPING | ContextFlags::STOPPED;
    self.flags.intersects(halting)
}
/// Resets the context so a supervised actor can be started again.
///
/// Returns `false`, leaving everything untouched, when there is no actor
/// or the mailbox is no longer connected. Otherwise the flags are reset to
/// `RUNNING` only, all wait and spawned futures are dropped, the handle
/// counter in slot 0 is reset, `Supervised::restarting` is called on the
/// actor, and `true` is returned.
#[inline]
pub fn restart(&mut self, ctx: &mut A::Context) -> bool
where
    A: Supervised,
{
    if self.act.is_none() {
        return false;
    }
    if !self.mailbox.connected() {
        return false;
    }

    self.flags = ContextFlags::RUNNING;
    self.wait = SmallVec::new();
    self.items = SmallVec::new();
    self.handles[0] = SpawnHandle::default();
    self.actor().restarting(ctx);
    true
}
#[inline]
pub fn set_actor(&mut self, act: A) {
self.act = Some(act);
self.modify();
}
#[inline]
pub fn into_inner(self) -> Option<A> {
self.act
}
/// Returns `true` once the `STARTED` flag is set, which `poll` does just
/// before it calls `Actor::started`.
#[inline]
pub fn started(&mut self) -> bool {
    // For a single-bit flag `intersects` is the same test as `contains`.
    self.flags.intersects(ContextFlags::STARTED)
}
pub fn poll(&mut self, ctx: &mut A::Context) -> Poll<(), ()> {
let act: &mut A = if let Some(ref mut act) = self.act {
unsafe { &mut *(act as *mut A) }
} else {
return Ok(Async::Ready(()));
};
if !self.flags.contains(ContextFlags::STARTED) {
self.flags.insert(ContextFlags::STARTED);
Actor::started(act, ctx);
while self.handles.len() > 2 {
let handle = self.handles.pop().unwrap();
let mut idx = 0;
while idx < self.items.len() {
if self.items[idx].0 == handle {
self.items.swap_remove(idx);
} else {
idx += 1;
}
}
}
}
'outer: loop {
self.flags.remove(ContextFlags::MODIFIED);
while !self.wait.is_empty() && !self.stopping() {
let idx = self.wait.len() - 1;
if let Some(item) = self.wait.last_mut() {
match item.poll(act, ctx) {
Async::Ready(_) => (),
Async::NotReady => return Ok(Async::NotReady),
}
}
self.wait.remove(idx);
}
self.mailbox.poll(act, ctx);
if !self.wait.is_empty() && !self.stopping() {
continue;
}
let mut idx = 0;
while idx < self.items.len() && !self.stopping() {
self.handles[1] = self.items[idx].0;
match self.items[idx].1.poll(act, ctx) {
Ok(Async::NotReady) => {
if self.handles.len() > 2 {
while self.handles.len() > 2 {
let handle = self.handles.pop().unwrap();
let mut idx = 0;
while idx < self.items.len() {
if self.items[idx].0 == handle {
self.items.swap_remove(idx);
} else {
idx += 1;
}
}
}
continue 'outer;
}
if !self.wait.is_empty() && !self.stopping() {
let next = self.items.len() - 1;
if idx != next {
self.items.swap(idx, next);
}
continue 'outer;
} else {
idx += 1;
}
}
Ok(Async::Ready(())) | Err(_) => {
self.items.swap_remove(idx);
if !self.wait.is_empty() && !self.stopping() {
continue 'outer;
}
}
}
}
self.handles[1] = SpawnHandle::default();
if self.flags.contains(ContextFlags::MODIFIED)
&& !self.flags.contains(ContextFlags::STOPPING)
{
continue;
}
if self.flags.contains(ContextFlags::RUNNING) {
if !self.alive() && Actor::stopping(act, ctx) == Running::Stop {
self.flags = ContextFlags::STOPPED | ContextFlags::STARTED;
Actor::stopped(act, ctx);
return Ok(Async::Ready(()));
}
} else if self.flags.contains(ContextFlags::STOPPING) {
if Actor::stopping(act, ctx) == Running::Stop {
self.flags = ContextFlags::STOPPED | ContextFlags::STARTED;
Actor::stopped(act, ctx);
return Ok(Async::Ready(()));
} else {
self.flags.remove(ContextFlags::STOPPING);
self.flags.insert(ContextFlags::RUNNING);
continue;
}
} else if self.flags.contains(ContextFlags::STOPPED) {
Actor::stopped(act, ctx);
return Ok(Async::Ready(()));
}
return Ok(Async::NotReady);
}
}
}