use std::marker::PhantomData;
use std::sync::Arc;
use std::thread;
use crossbeam_channel as cb_channel;
use futures::sync::oneshot::Sender as SyncSender;
use futures::{Async, Future, Poll, Stream};
use actor::{Actor, ActorContext, ActorState, Running};
use address::channel;
use address::{Addr, AddressReceiver, Envelope, EnvelopeProxy, ToEnvelope};
use arbiter::Arbiter;
use context::Context;
use handler::{Handler, Message, MessageResponse};
use system::System;
/// Bridge between an `Addr<A>` and a set of worker threads that each run an
/// actor of type `A` inside a `SyncContext`.
///
/// Values of this type are built and spawned by `SyncArbiter::start`; when
/// polled as a future they move envelopes from `msgs` onto `queue`.
pub struct SyncArbiter<A>
where
A: Actor<Context = SyncContext<A>>,
{
// Sending half of the crossbeam queue whose receiving half is cloned into
// every worker thread.
queue: cb_channel::Sender<SyncContextProtocol<A>>,
// Receiving side of the address channel; yields the envelopes to forward.
msgs: AddressReceiver<A>,
// Number of worker threads that were started. The same number of `Stop`
// signals is sent on shutdown.
threads: usize,
}
impl<A> SyncArbiter<A>
where
    A: Actor<Context = SyncContext<A>>,
{
    /// Starts `threads` worker threads, each running its own actor produced by
    /// `factory`, and returns the address used to send them messages.
    ///
    /// The `System` obtained from `System::current()` on the calling thread is
    /// handed to `System::set_current` inside every worker before its
    /// `SyncContext` starts running.
    ///
    /// With `threads == 0` no worker is spawned, so forwarded messages are
    /// queued but never handled.
    pub fn start<F>(threads: usize, factory: F) -> Addr<A>
    where
        F: Fn() -> A + Send + Sync + 'static,
    {
        let shared_factory = Arc::new(factory);
        // One unbounded queue shared by all workers: the arbiter keeps the
        // sender, every worker gets a clone of the receiver.
        let (queue, worker_queue) = cb_channel::unbounded();

        for _ in 0..threads {
            let make_actor = Arc::clone(&shared_factory);
            let system = System::current();
            let inbox = worker_queue.clone();
            thread::spawn(move || {
                System::set_current(system);
                SyncContext::new(make_actor, inbox).run();
            });
        }

        let (addr_tx, addr_rx) = channel::channel(0);
        Arbiter::spawn(SyncArbiter {
            queue,
            msgs: addr_rx,
            threads,
        });
        Addr::new(addr_tx)
    }
}
// `SyncArbiter` is itself an actor and runs in a regular `Context`; only the
// actor type `A` it forwards to is required to use `SyncContext`.
impl<A> Actor for SyncArbiter<A>
where
A: Actor<Context = SyncContext<A>>,
{
type Context = Context<Self>;
}
// Polling the arbiter drains the address channel into the worker queue and,
// once the address channel reports that it is no longer connected, tells every
// worker to stop.
#[doc(hidden)]
impl<A> Future for SyncArbiter<A>
where
    A: Actor<Context = SyncContext<A>>,
{
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Forward every envelope that is currently available.
        loop {
            let envelope = match self.msgs.poll() {
                Ok(Async::Ready(Some(envelope))) => envelope,
                Ok(Async::NotReady) => break,
                // End-of-stream and errors are treated as impossible for this
                // receiver; hitting either one panics.
                Ok(Async::Ready(None)) | Err(_) => unreachable!(),
            };
            self.queue.send(SyncContextProtocol::Envelope(envelope));
        }

        // NOTE(review): `connected()` presumably reports whether any address
        // is still alive - confirm against `AddressReceiver`.
        if !self.msgs.connected() {
            // One `Stop` per worker thread, then this future completes.
            for _ in 0..self.threads {
                self.queue.send(SyncContextProtocol::Stop);
            }
            return Ok(Async::Ready(()));
        }

        Ok(Async::NotReady)
    }
}
impl<A, M> ToEnvelope<A, M> for SyncContext<A>
where
A: Actor<Context = SyncContext<A>> + Handler<M>,
M: Message + Send + 'static,
M::Result: Send,
{
fn pack(msg: M, tx: Option<SyncSender<M::Result>>) -> Envelope<A> {
Envelope::with_proxy(Box::new(SyncContextEnvelope::new(msg, tx)))
}
}
/// Items carried on the queue between `SyncArbiter` and the worker threads.
enum SyncContextProtocol<A>
where
A: Actor<Context = SyncContext<A>>,
{
// Tells the worker that receives it to run the actor's stop sequence and
// leave its loop.
Stop,
// A message envelope to be handled by the worker's actor.
Envelope(Envelope<A>),
}
/// Execution context of an actor that runs on a dedicated worker thread.
///
/// A context owns one actor instance, receives `SyncContextProtocol` items from
/// the shared queue and applies them to that actor. When the actor asks to stop
/// (see `ActorContext::stop`), it is shut down and replaced by a fresh instance
/// obtained from `factory`.
pub struct SyncContext<A>
where
    A: Actor<Context = SyncContext<A>>,
{
    // The actor created by `new`. It is moved out of the context while `run`
    // executes so that the actor and the context can be borrowed mutably at
    // the same time without aliasing each other.
    act: Option<A>,
    // Receiving half of the queue fed by `SyncArbiter`.
    queue: cb_channel::Receiver<SyncContextProtocol<A>>,
    // Set by `stop`/`terminate`; checked after every handled item.
    stopping: bool,
    // Lifecycle state reported through `ActorContext::state`.
    state: ActorState,
    // Produces the initial actor and every replacement actor.
    factory: Arc<Fn() -> A>,
}

impl<A> SyncContext<A>
where
    A: Actor<Context = Self>,
{
    /// Creates a context in the `Started` state holding a freshly built actor.
    fn new(
        factory: Arc<Fn() -> A>,
        queue: cb_channel::Receiver<SyncContextProtocol<A>>,
    ) -> Self {
        let act = factory();
        SyncContext {
            act: Some(act),
            queue,
            factory,
            stopping: false,
            state: ActorState::Started,
        }
    }

    /// Runs the actor until a `Stop` item arrives or `recv` returns `None`.
    ///
    /// After each handled item, a pending stop request causes the current
    /// actor to be stopped and a new one from `factory` to be started in its
    /// place.
    ///
    /// # Panics
    ///
    /// Panics if called more than once on the same context, because the actor
    /// built by `new` has already been consumed.
    fn run(&mut self) {
        // Moving the actor into a local lets `&mut act` and `self` (the
        // context) be passed side by side as two disjoint mutable borrows.
        let mut act = self
            .act
            .take()
            .expect("SyncContext::run called without an actor");

        A::started(&mut act, self);
        self.state = ActorState::Running;

        loop {
            match self.queue.recv() {
                Some(SyncContextProtocol::Envelope(mut env)) => {
                    env.handle(&mut act, self);
                }
                // An explicit stop request and a `None` from `recv` end the
                // worker the same way.
                Some(SyncContextProtocol::Stop) | None => {
                    self.shutdown(&mut act);
                    return;
                }
            }

            if self.stopping {
                self.stopping = false;

                // Retire the current actor. The result of `stopping` is
                // deliberately ignored on this path.
                A::stopping(&mut act, self);
                self.state = ActorState::Stopped;
                A::stopped(&mut act, self);

                // Bring up its replacement.
                self.state = ActorState::Started;
                act = (*self.factory)();
                A::started(&mut act, self);
                self.state = ActorState::Running;
            }
        }
    }

    /// Drives `act` through `Stopping` and `Stopped`, logging a warning when
    /// the actor's `stopping` hook returns anything other than `Running::Stop`.
    fn shutdown(&mut self, act: &mut A) {
        self.state = ActorState::Stopping;
        if A::stopping(act, self) != Running::Stop {
            warn!("stopping method is not supported for sync actors");
        }
        self.state = ActorState::Stopped;
        A::stopped(act, self);
    }
}
impl<A> ActorContext for SyncContext<A>
where
    A: Actor<Context = Self>,
{
    /// Records a stop request and moves the context to `Stopping`.
    fn stop(&mut self) {
        self.state = ActorState::Stopping;
        self.stopping = true;
    }

    /// Same effect as `stop` for this context.
    fn terminate(&mut self) {
        self.stop()
    }

    /// Returns the current lifecycle state.
    fn state(&self) -> ActorState {
        self.state
    }
}
/// Envelope payload for sync actors: a message of type `M` together with an
/// optional one-shot sender for the handler's result.
pub(crate) struct SyncContextEnvelope<A, M>
where
A: Actor<Context = SyncContext<A>> + Handler<M>,
M: Message + Send,
{
// The message; taken (left as `None`) when it is handled.
msg: Option<M>,
// Where to deliver the result, if the sender of the message asked for one.
tx: Option<SyncSender<M::Result>>,
// Ties the envelope to the actor type without storing an actor.
actor: PhantomData<A>,
}
// SAFETY: `PhantomData<A>` is a zero-sized marker and stores no `A`, so the
// actor type does not need to be `Send` for the envelope to cross threads. The
// other two fields hold real values - `Option<M>` and
// `Option<SyncSender<M::Result>>` - so both `M` and `M::Result` must be `Send`.
unsafe impl<A, M> Send for SyncContextEnvelope<A, M>
where
    A: Actor<Context = SyncContext<A>> + Handler<M>,
    M: Message + Send,
    M::Result: Send,
{
}
impl<A, M> SyncContextEnvelope<A, M>
where
    A: Actor<Context = SyncContext<A>> + Handler<M>,
    M: Message + Send,
    M::Result: Send,
{
    /// Builds an envelope carrying `msg` and, optionally, the sender `tx` on
    /// which the handler's result should be delivered.
    pub fn new(msg: M, tx: Option<SyncSender<M::Result>>) -> Self {
        let msg = Some(msg);
        Self {
            msg,
            tx,
            actor: PhantomData,
        }
    }
}
impl<A, M> EnvelopeProxy for SyncContextEnvelope<A, M>
where
    M: Message + Send + 'static,
    M::Result: Send,
    A: Actor<Context = SyncContext<A>> + Handler<M>,
{
    type Actor = A;

    /// Delivers the stored message to `act` and routes the handler's response
    /// through the reply sender, if any.
    ///
    /// When a reply sender exists but reports itself as canceled, the message
    /// is skipped entirely. Calling this a second time does nothing because
    /// the message has already been taken.
    fn handle(&mut self, act: &mut A, ctx: &mut A::Context) {
        let reply_to = self.tx.take();

        // Nobody is waiting for the answer any more: do not run the handler.
        let abandoned = match reply_to {
            Some(ref sender) => sender.is_canceled(),
            None => false,
        };
        if abandoned {
            return;
        }

        if let Some(msg) = self.msg.take() {
            <A as Handler<M>>::handle(act, msg, ctx).handle(ctx, reply_to)
        }
    }
}