use std::marker::PhantomData;
use std::collections::VecDeque;
use futures::{Async, Stream};
use futures::unsync::oneshot::Sender;
use smallvec::SmallVec;
use fut::ActorFuture;
use queue::{sync, unsync};
use actor::{Actor, AsyncContext, SpawnHandle};
use address::{Address, SyncAddress};
use envelope::Envelope;
use constants::MAX_SYNC_POLLS;
#[derive(Debug, PartialEq)]
pub enum ContextCellResult {
NotReady,
Ready,
Stop,
}
pub enum ContextProtocol<A: Actor> {
Envelope(Envelope<A>),
Upgrade(Sender<SyncAddress<A>>),
}
pub trait ContextCell<A> where Self: 'static, A: Actor {
fn alive(&self) -> bool;
fn poll(&mut self, act: &mut A, ctx: &mut A::Context, stop: bool) -> ContextCellResult;
}
impl<A: Actor> ContextCell<A> for () {
fn alive(&self) -> bool {
false
}
fn poll(&mut self, _: &mut A, _: &mut A::Context, _: bool) -> ContextCellResult {
ContextCellResult::Ready
}
}
pub struct ActorAddressCell<A> where A: Actor, A::Context: AsyncContext<A> {
sync_alive: bool,
sync_msgs: Option<sync::UnboundedReceiver<Envelope<A>>>,
unsync_msgs: unsync::UnboundedReceiver<ContextProtocol<A>>,
}
impl<A> Default for ActorAddressCell<A> where A: Actor, A::Context: AsyncContext<A> {
#[inline]
fn default() -> Self {
ActorAddressCell {
sync_alive: false,
sync_msgs: None,
unsync_msgs: unsync::unbounded(),
}
}
}
impl<A> ActorAddressCell<A> where A: Actor, A::Context: AsyncContext<A>
{
#[inline]
pub fn new(rx: sync::UnboundedReceiver<Envelope<A>>) -> Self {
ActorAddressCell {
sync_alive: true,
sync_msgs: Some(rx),
unsync_msgs: unsync::unbounded(),
}
}
#[inline]
pub fn close(&mut self) {
self.unsync_msgs.close();
if let Some(ref mut msgs) = self.sync_msgs {
msgs.close()
}
}
#[inline]
pub fn connected(&mut self) -> bool {
self.unsync_msgs.connected() || self.sync_alive
}
#[inline]
pub fn unsync_sender(&mut self) -> unsync::UnboundedSender<ContextProtocol<A>> {
self.unsync_msgs.sender()
}
#[inline]
pub fn unsync_address(&mut self) -> Address<A> {
Address::new(self.unsync_msgs.sender())
}
pub fn sync_address(&mut self) -> SyncAddress<A> {
if self.sync_msgs.is_none() {
let (tx, rx) = sync::unbounded();
self.sync_msgs = Some(rx);
self.sync_alive = true;
SyncAddress::new(tx)
} else {
if let Some(ref mut addr) = self.sync_msgs {
return SyncAddress::new(addr.sender())
}
unreachable!();
}
}
pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context, stop: bool) -> ContextCellResult {
let mut n_polls: u32 = 0;
loop {
let mut not_ready = true;
n_polls += 1;
match self.unsync_msgs.poll() {
Ok(Async::Ready(Some(msg))) => {
not_ready = false;
match msg {
ContextProtocol::Envelope(mut env) => {
env.handle(act, ctx)
}
ContextProtocol::Upgrade(tx) => {
let _ = tx.send(self.sync_address());
}
}
}
Ok(Async::Ready(None)) | Ok(Async::NotReady) | Err(_) => (),
}
if self.sync_alive {
if let Some(ref mut msgs) = self.sync_msgs {
match msgs.poll() {
Ok(Async::Ready(Some(mut msg))) => {
not_ready = false;
msg.handle(act, ctx);
}
Ok(Async::Ready(None)) | Err(_) => {
self.sync_alive = false;
},
Ok(Async::NotReady) => (),
}
}
}
if not_ready || n_polls == MAX_SYNC_POLLS {
if stop {
self.close()
}
return ContextCellResult::Ready;
}
}
}
}
type Item<A> = (SpawnHandle, Option<Box<ActorFuture<Item=(), Error=(), Actor=A>>>);
pub struct ActorItemsCell<A> where A: Actor, A::Context: AsyncContext<A> {
index: SpawnHandle,
items: SmallVec<[Item<A>; 2]>,
}
impl<A> Default for ActorItemsCell<A> where A: Actor, A::Context: AsyncContext<A> {
#[inline]
fn default() -> Self {
ActorItemsCell {
index: SpawnHandle::default(),
items: SmallVec::new(),
}
}
}
impl<A> ActorItemsCell<A> where A: Actor, A::Context: AsyncContext<A>
{
#[inline]
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
#[inline]
pub fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{
let fut: Box<ActorFuture<Item=(), Error=(), Actor=A>> = Box::new(fut);
self.index = self.index.next();
self.items.push((self.index, Some(fut)));
self.index
}
#[inline]
pub fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
for item in &mut self.items {
if item.0 == handle {
item.1.take();
return true
}
}
false
}
pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context, stop: bool) -> ContextCellResult {
loop {
let mut idx = 0;
let mut len = self.items.len();
let mut not_ready = true;
while idx < len {
let drop = if let Some(ref mut item) = self.items[idx].1 {
match item.poll(act, ctx) {
Ok(val) => match val {
Async::Ready(_) => {
not_ready = false;
true
}
Async::NotReady => false,
},
Err(_) => true,
}
} else { true };
len = self.items.len();
if drop {
len -= 1;
if idx >= len {
self.items.pop();
not_ready = true;
break
} else {
self.items[idx] = self.items.pop().unwrap();
}
} else {
idx += 1;
}
}
if not_ready {
if stop {
self.items.clear();
}
return ContextCellResult::Ready
}
}
}
}
pub struct ActorWaitCell<A>
where A: Actor, A::Context: AsyncContext<A>,
{
act: PhantomData<A>,
fut: VecDeque<Box<ActorFuture<Item=(), Error=(), Actor=A>>>,
}
impl<A> Default for ActorWaitCell<A>
where A: Actor, A::Context: AsyncContext<A>
{
#[inline]
fn default() -> ActorWaitCell<A> {
ActorWaitCell {
act: PhantomData,
fut: VecDeque::new() }
}
}
impl<A> ActorWaitCell<A>
where A: Actor, A::Context: AsyncContext<A>,
{
#[inline]
pub fn is_empty(&self) -> bool {
self.fut.is_empty()
}
#[inline]
pub fn add<F>(&mut self, fut: F)
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{
self.fut.push_back(Box::new(fut));
}
pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context, stop: bool) -> ContextCellResult {
loop {
if let Some(fut) = self.fut.front_mut() {
match fut.poll(act, ctx) {
Ok(Async::NotReady) => {
if !stop {
return ContextCellResult::NotReady
}
},
Ok(Async::Ready(_)) | Err(_) => (),
}
}
if stop {
self.fut.clear();
}
if self.fut.is_empty() {
return ContextCellResult::Ready
} else {
self.fut.pop_front();
}
}
}
}