use std;
use std::thread;
use std::sync::Arc;
use std::marker::PhantomData;
use crossbeam::sync::MsQueue;
use futures::{Async, Future, Poll, Stream};
use futures::sync::oneshot::Sender as SyncSender;
use tokio_core::reactor::Core;
use actor::{Actor, ActorContext, ActorState, Handler, ResponseType};
use arbiter::Arbiter;
use address::SyncAddress;
use context::Context;
use envelope::{Envelope, EnvelopeProxy, ToEnvelope};
use message::Response;
use queue::sync;
pub struct SyncArbiter<A> where A: Actor<Context=SyncContext<A>> {
queue: Arc<MsQueue<SyncContextProtocol<A>>>,
msgs: sync::UnboundedReceiver<Envelope<A>>,
threads: usize,
}
impl<A> SyncArbiter<A> where A: Actor<Context=SyncContext<A>> + Send {
pub fn start<F>(threads: usize, f: F) -> SyncAddress<A>
where F: Fn() -> A + 'static
{
let queue = Arc::new(MsQueue::new());
for _ in 0..threads {
let actor = f();
let actor_queue = Arc::clone(&queue);
thread::spawn(move || {
SyncContext::new(actor, actor_queue)
.run()
});
}
let (tx, rx) = sync::unbounded();
Arbiter::handle().spawn(
SyncArbiter{queue: queue, msgs: rx, threads: threads});
SyncAddress::new(tx)
}
}
impl<A> Actor for SyncArbiter<A> where A: Actor<Context=SyncContext<A>>{
type Context = Context<Self>;
}
#[doc(hidden)]
impl<A> Future for SyncArbiter<A> where A: Actor<Context=SyncContext<A>>
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.msgs.poll() {
Ok(Async::Ready(Some(msg))) => {
self.queue.push(SyncContextProtocol::Envelope(msg));
}
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) | Err(_) => {
for _ in 0..self.threads {
self.queue.push(SyncContextProtocol::Stop);
}
return Ok(Async::Ready(()))
},
}
}
Ok(Async::NotReady)
}
}
impl<A> ToEnvelope<A> for SyncContext<A>
where A: Actor<Context=SyncContext<A>>,
{
fn pack<M>(msg: M, tx: Option<SyncSender<Result<M::Item, M::Error>>>) -> Envelope<A>
where A: Handler<M>,
M: ResponseType + Send + 'static,
<M as ResponseType>::Item: Send,
<M as ResponseType>::Error: Send
{
Envelope::new(SyncEnvelope::new(msg, tx))
}
}
enum SyncContextProtocol<A> where A: Actor<Context=SyncContext<A>> {
Stop,
Envelope(Envelope<A>),
}
pub struct SyncContext<A> where A: Actor<Context=SyncContext<A>> {
act: A,
core: Option<Core>,
queue: Arc<MsQueue<SyncContextProtocol<A>>>,
stopping: bool,
state: ActorState,
}
impl<A> SyncContext<A> where A: Actor<Context=Self> {
fn new(act: A, queue: Arc<MsQueue<SyncContextProtocol<A>>>) -> Self {
SyncContext {
act: act,
core: None,
queue: queue,
stopping: false,
state: ActorState::Started,
}
}
fn run(&mut self) {
let ctx: &mut SyncContext<A> = unsafe {
std::mem::transmute(self as &mut SyncContext<A>)
};
A::started(&mut self.act, ctx);
self.state = ActorState::Running;
loop {
match self.queue.pop() {
SyncContextProtocol::Stop => {
self.state = ActorState::Stopping;
A::stopping(&mut self.act, ctx);
self.state = ActorState::Stopped;
A::stopped(&mut self.act, ctx);
return
},
SyncContextProtocol::Envelope(mut env) => {
env.handle(&mut self.act, ctx)
},
}
if self.stopping {
A::stopping(&mut self.act, ctx);
self.state = ActorState::Stopped;
A::stopped(&mut self.act, ctx);
return
}
}
}
}
impl<A> ActorContext for SyncContext<A> where A: Actor<Context=Self>
{
fn stop(&mut self) {
self.stopping = true;
self.state = ActorState::Stopping;
}
fn terminate(&mut self) {
self.stopping = true;
self.state = ActorState::Stopping;
}
fn state(&self) -> ActorState {
self.state
}
}
pub(crate) struct SyncEnvelope<A, M>
where A: Actor<Context=SyncContext<A>> + Handler<M>, M: ResponseType,
{
msg: Option<M>,
tx: Option<SyncSender<Result<M::Item, M::Error>>>,
actor: PhantomData<A>,
}
impl<A, M> SyncEnvelope<A, M>
where A: Actor<Context=SyncContext<A>> + Handler<M>,
M: ResponseType,
{
pub fn new(msg: M, tx: Option<SyncSender<Result<M::Item, M::Error>>>) -> Self {
SyncEnvelope{msg: Some(msg), tx: tx, actor: PhantomData}
}
}
impl<A, M> EnvelopeProxy for SyncEnvelope<A, M>
where M: ResponseType + 'static,
A: Actor<Context=SyncContext<A>> + Handler<M>,
{
type Actor = A;
fn handle(&mut self, act: &mut Self::Actor, ctx: &mut <Self::Actor as Actor>::Context)
{
if let Some(msg) = self.msg.take() {
let mut response = <Self::Actor as Handler<M>>::handle(act, msg, ctx);
let result = if response.is_async() {
response.result().unwrap()
} else {
if ctx.core.is_none() {
ctx.core = Some(Core::new().unwrap());
}
let ctx_ptr = ctx as *mut _;
let core = ctx.core.as_mut().unwrap();
core.run(ResponseFuture{
fut: response,
act: act as *mut _,
ctx: ctx_ptr})
};
if let Some(tx) = self.tx.take() {
let _ = tx.send(result);
}
}
}
}
struct ResponseFuture<A, M> where A: Actor<Context=SyncContext<A>>, M: ResponseType {
fut: Response<A, M>,
act: *mut A,
ctx: *mut SyncContext<A>
}
impl<A, M> Future for ResponseFuture<A, M>
where A: Actor<Context=SyncContext<A>>,
M: ResponseType
{
type Item = M::Item;
type Error = M::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>
{
let act = unsafe{ &mut *self.act };
let ctx = unsafe{ &mut *self.ctx };
self.fut.poll_response(act, ctx)
}
}