#![allow(dead_code)]
use std;
use std::collections::VecDeque;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use futures::sync::oneshot::Sender as SyncSender;
use tokio_core::reactor::Handle;
use tokio_io::AsyncRead;
use tokio_io::codec::{Framed, Encoder, Decoder};
use fut::ActorFuture;
use actor::{Actor, Supervised,
Handler, ResponseType, StreamHandler, SpawnHandle,
FramedActor, ActorState, ActorContext, AsyncContext};
use address::{Subscriber};
use context::{ActorAddressCell, ActorItemsCell, ActorWaitCell, AsyncContextApi};
use envelope::{Envelope, ToEnvelope, RemoteEnvelope};
use message::Response;
/// Execution context for an actor that is driven by a framed I/O object.
///
/// The context owns the actor together with its address cell, the framed
/// transport cell and the futures registered through `spawn` / `wait`.
pub struct FramedContext<A>
where A: FramedActor + Actor<Context=FramedContext<A>>,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
/// The actor instance run by this context.
act: A,
/// Current lifecycle state of the actor.
state: ActorState,
/// Set when `spawn`, `wait` or `cancel_future` is called, or when the framed
/// cell finishes; the poll loop runs another pass while it is set.
modified: bool,
/// Address cell of the actor.
address: ActorAddressCell<A>,
/// Framed transport cell; `None` once it has been closed or has completed.
framed: Option<ActorFramedCell<A>>,
/// Futures registered through `wait`.
wait: ActorWaitCell<A>,
/// Futures registered through `spawn`.
items: ActorItemsCell<A>,
}
impl<A> ToEnvelope<A> for FramedContext<A>
where A: FramedActor + Actor<Context=FramedContext<A>>,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
fn pack<M>(msg: M, tx: Option<SyncSender<Result<M::Item, M::Error>>>) -> Envelope<A>
where M: ResponseType + Send + 'static,
A: Handler<M>,
A: FramedActor + Actor<Context=FramedContext<A>>,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
M::Item: Send,
M::Error: Send,
{
Envelope::new(RemoteEnvelope::new(msg, tx))
}
}
impl<A> ActorContext for FramedContext<A>
where A: Actor<Context=Self> + FramedActor,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
fn stop(&mut self) {
self.close();
self.address.close();
if self.state == ActorState::Running {
self.state = ActorState::Stopping;
}
}
fn terminate(&mut self) {
self.close();
self.address.close();
self.items.close();
self.state = ActorState::Stopped;
}
fn state(&self) -> ActorState {
self.state
}
}
impl<A> AsyncContext<A> for FramedContext<A>
where A: Actor<Context=Self> + FramedActor,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{
self.modified = true;
self.items.spawn(fut)
}
fn wait<F>(&mut self, fut: F)
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{
self.modified = true;
self.wait.add(fut)
}
fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
self.modified = true;
self.items.cancel_future(handle)
}
fn cancel_future_on_stop(&mut self, handle: SpawnHandle) {
self.items.cancel_future_on_stop(handle)
}
}
impl<A> AsyncContextApi<A> for FramedContext<A>
where A: Actor<Context=Self> + FramedActor,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
fn address_cell(&mut self) -> &mut ActorAddressCell<A> {
&mut self.address
}
}
impl<A> FramedContext<A>
where A: Actor<Context=Self> + FramedActor,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
pub fn send(&mut self, msg: <<A as FramedActor>::Codec as Encoder>::Item)
-> Result<(), <<A as FramedActor>::Codec as Encoder>::Item>
{
if let Some(ref mut framed) = self.framed {
framed.send(msg);
Ok(())
} else {
Err(msg)
}
}
pub fn close(&mut self) {
self.items.stop();
if let Some(ref mut framed) = self.framed.take() {
framed.close();
}
}
}
impl<A> FramedContext<A>
where A: Actor<Context=Self> + FramedActor,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
#[doc(hidden)]
pub fn subscriber<M>(&mut self) -> Box<Subscriber<M>>
where A: Handler<M>,
M: ResponseType + 'static,
{
Box::new(self.address.unsync_address())
}
#[doc(hidden)]
pub fn sync_subscriber<M>(&mut self) -> Box<Subscriber<M> + Send>
where A: Handler<M>,
M: ResponseType + Send + 'static,
M::Item: Send,
M::Error: Send,
{
Box::new(self.address.sync_address())
}
}
impl<A> FramedContext<A>
where A: Actor<Context=Self> + FramedActor,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
pub(crate) fn new(act: A, io: <A as FramedActor>::Io,
codec: <A as FramedActor>::Codec) -> FramedContext<A>
{
FramedContext {
act: act,
state: ActorState::Started,
modified: false,
address: ActorAddressCell::default(),
framed: Some(ActorFramedCell::new(io.framed(codec))),
wait: ActorWaitCell::default(),
items: ActorItemsCell::default(),
}
}
pub(crate) fn run(self, handle: &Handle) {
handle.spawn(self.map(|_| ()).map_err(|_| ()));
}
pub(crate) fn restarting(&mut self) where A: Supervised {
let ctx: &mut FramedContext<A> = unsafe {
std::mem::transmute(self as &mut FramedContext<A>)
};
self.act.restarting(ctx);
}
pub(crate) fn replace_actor(&mut self, srv: A) -> A {
std::mem::replace(&mut self.act, srv)
}
pub(crate) fn address_cell(&mut self) -> &mut ActorAddressCell<A> {
&mut self.address
}
pub(crate) fn into_inner(self) -> A {
self.act
}
}
#[doc(hidden)]
// The context itself is the future that `run` spawns onto the event loop.
impl<A> Future for FramedContext<A>
where A: Actor<Context=Self> + FramedActor,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
type Item = ();
type Error = ();
/// Runs the lifecycle hooks and polls the wait cell, address cell, framed
/// cell and spawned items until no more progress is made.
///
/// Resolves with `Ready(())` after `Actor::stopped` has been called;
/// every other exit is `NotReady`. This method never returns `Err`.
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// The hooks and cells need `&mut self.act` and the context at the same time,
// so a second mutable reference to `self` is manufactured here.
// NOTE(review): `ctx` aliases `self` — confirm this is acceptable.
let ctx: &mut FramedContext<A> = unsafe {
std::mem::transmute(self as &mut FramedContext<A>)
};
// Lifecycle hooks that depend on the state at the start of this poll.
match self.state {
ActorState::Started => {
Actor::started(&mut self.act, ctx);
self.state = ActorState::Running;
},
ActorState::Stopping => {
Actor::stopping(&mut self.act, ctx);
}
_ => ()
}
// Becomes true once `Actor::stopping` has been called from inside the loop below.
let mut prep_stop = false;
loop {
// Reset the flag; anything that sets it during this pass forces another pass.
self.modified = false;
// A `true` result from the wait cell suspends the whole context.
// NOTE(review): presumably `true` means a waited future is still pending — confirm.
if self.wait.poll(&mut self.act, ctx) {
return Ok(Async::NotReady)
}
self.address.poll(&mut self.act, ctx);
// The framed cell is done when it resolves, successfully or with an error.
let closed = if let Some(ref mut framed) = self.framed {
match framed.poll(&mut self.act, ctx) {
Ok(Async::Ready(_)) | Err(_) => {
self.modified = true;
true
},
_ => false
}
} else { false };
// A finished framed cell is dropped and the spawned items are stopped.
if closed {
self.framed.take();
self.items.stop();
}
self.items.poll(&mut self.act, ctx);
// Something changed during this pass: poll everything again before deciding.
if self.modified {
continue
}
match self.state {
ActorState::Stopped => {
self.state = ActorState::Stopped;
Actor::stopped(&mut self.act, ctx);
return Ok(Async::Ready(()))
},
ActorState::Stopping => {
if prep_stop {
// `stopping` has already run in this poll. If the actor still has a
// framed cell, a connected address or pending items, it goes back to
// `Running`; otherwise it is stopped for good.
if self.framed.is_some() ||
self.address.connected() ||
!self.items.is_empty()
{
self.state = ActorState::Running;
continue
} else {
self.state = ActorState::Stopped;
Actor::stopped(&mut self.act, ctx);
return Ok(Async::Ready(()))
}
} else {
// First time the `Stopping` state is seen inside the loop: run the hook,
// then make another pass before deciding.
Actor::stopping(&mut self.act, ctx);
prep_stop = true;
continue
}
},
ActorState::Running => {
// No framed cell, no connected address and no items left: start stopping.
if !self.framed.is_some() && !self.address.connected() &&
self.items.is_empty()
{
self.state = ActorState::Stopping;
Actor::stopping(&mut self.act, ctx);
prep_stop = true;
continue
}
},
_ => (),
}
return Ok(Async::NotReady)
}
}
}
impl<A> std::fmt::Debug for FramedContext<A>
where A: Actor<Context=Self> + FramedActor,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f,
"FramedContext({:?}: actor:{:?}) {{ state: {:?}, connected: {}, items: {} }}",
self as *const _,
&self.act as *const _,
self.state, "-", self.items.is_empty())
}
}
/// Couples a `Framed` transport with an actor: decoded items are passed to the
/// actor's handler and queued outgoing items are written to the sink.
pub(crate)
struct ActorFramedCell<A>
where A: Actor + FramedActor,
A::Context: AsyncContext<A>,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
/// Whether the `StreamHandler::started` hook has already been invoked.
started: bool,
/// Set by `close()`; while set, no further items are read from the stream.
closing: bool,
/// Handler response that is still in flight; no new item is read while it is `Some`.
response: Option<Response<A, <<A as FramedActor>::Codec as Decoder>::Item>>,
/// The underlying framed transport (stream and sink).
framed: Framed<<A as FramedActor>::Io, <A as FramedActor>::Codec>,
/// The reading half is finished.
stream_closed: bool,
/// The writing half is finished.
sink_closed: bool,
/// `false` from the moment the sink accepts an item until `poll_complete` reports `Ready`.
sink_flushed: bool,
/// Outgoing items that the sink has not accepted yet.
sink_items: VecDeque<<<A as FramedActor>::Codec as Encoder>::Item>,
}
impl<A> ActorFramedCell<A>
where A: Actor + FramedActor,
A::Context: AsyncContext<A>,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
pub fn new(framed: Framed<<A as FramedActor>::Io, <A as FramedActor>::Codec>)
-> ActorFramedCell<A>
{
ActorFramedCell {
started: false,
closing: false,
response: None,
framed: framed,
stream_closed: false,
sink_closed: false,
sink_flushed: true,
sink_items: VecDeque::new(),
}
}
pub fn alive(&self) -> bool {
self.sink_closed || self.stream_closed
}
pub fn close(&mut self) {
self.closing = true;
}
pub fn send(&mut self, msg: <<A as FramedActor>::Codec as Encoder>::Item) {
self.sink_items.push_back(msg);
}
}
// Drives both halves of the framed transport on behalf of the actor.
impl<A> ActorFuture for ActorFramedCell<A>
where A: Actor + FramedActor,
A::Context: AsyncContext<A>,
A: StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>,
<<A as FramedActor>::Codec as Decoder>::Item: ResponseType,
{
type Item = ();
type Error = ();
type Actor = A;
/// Reads decoded items and hands them to the actor, writes queued items to
/// the sink, and flushes it.
///
/// Returns `Ready(())` once both the stream and the sink are closed and
/// `NotReady` otherwise. This method never returns `Err`; transport errors
/// are reported to the actor through its `error` hooks.
fn poll(&mut self, act: &mut A, ctx: &mut A::Context) -> Poll<Self::Item, Self::Error>
{
// The `started` hook is invoked exactly once, on the first poll.
if !self.started {
self.started = true;
<A as StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>>
::started(act, ctx);
}
loop {
// Cleared whenever this pass made progress that warrants another pass.
let mut not_ready = true;
// Drive the in-flight handler response, if any. It is put back only while
// it is still pending; an error closes the stream half.
if let Some(mut fut) = self.response.take() {
match fut.poll_response(act, ctx) {
Ok(Async::NotReady) => {
self.response = Some(fut);
},
Ok(Async::Ready(_)) => (),
Err(_) => {
self.stream_closed = true;
}
}
}
// Read the next item only when not closing, the stream is open and no
// response is outstanding, so at most one item is handled at a time.
if !self.closing && !self.stream_closed && self.response.is_none() {
match self.framed.poll() {
Ok(Async::Ready(Some(msg))) => {
let fut =
<Self::Actor as
Handler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>>
::handle(act, msg, ctx);
self.response = Some(fut);
not_ready = false;
}
// End of stream: notify the actor and close both halves.
Ok(Async::Ready(None)) => {
<A as StreamHandler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>>
::finished(act, ctx);
self.sink_closed = true;
self.stream_closed = true;
}
Ok(Async::NotReady) => (),
// Stream error: close both halves, then report the error to the actor.
Err(err) => {
self.sink_closed = true;
self.stream_closed = true;
<Self::Actor as Handler<<<A as FramedActor>::Codec as Decoder>::Item,
<<A as FramedActor>::Codec as Decoder>::Error>>
::error(act, err, ctx);
}
}
}
if !self.sink_closed {
// Hand queued items to the sink until the queue is empty, the sink
// refuses an item, or the sink fails.
loop {
if let Some(msg) = self.sink_items.pop_front() {
match self.framed.start_send(msg) {
// Sink is full: put the item back at the front and stop for now.
Ok(AsyncSink::NotReady(msg)) => {
self.sink_items.push_front(msg);
}
// Item accepted: a flush is now required; try the next item.
Ok(AsyncSink::Ready) => {
self.sink_flushed = false;
continue
}
// Sink error: close the sink and report the error to the actor.
Err(err) => {
self.sink_closed = true;
self.sink_flushed = true;
<A as FramedActor>::error(act, err, ctx);
break
}
}
}
break
}
// Flush whatever the sink has accepted so far.
if !self.sink_flushed {
match self.framed.poll_complete() {
Ok(Async::Ready(_)) => {
not_ready = false;
self.sink_flushed = true;
}
Ok(Async::NotReady) => (),
Err(err) => {
self.sink_closed = true;
self.sink_flushed = true;
<A as FramedActor>::error(act, err, ctx);
}
}
}
// When closing, the sink is closed once the queue is empty and flushed.
if self.closing && self.sink_flushed && self.sink_items.is_empty() {
self.sink_closed = true;
}
}
// Progress was made during this pass: go around again.
if !not_ready {
continue
}
// The cell completes only when both halves are closed.
if self.stream_closed && self.sink_closed {
return Ok(Async::Ready(()))
} else {
return Ok(Async::NotReady)
}
}
}
}