use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::{task, thread};
use actix_rt::System;
use crossbeam_channel as cb_channel;
use futures::channel::oneshot::Sender as SyncSender;
use futures::{Future, StreamExt};
use log::warn;
use crate::actor::{Actor, ActorContext, ActorState, Running};
use crate::address::channel;
use crate::address::{Addr, AddressReceiver, Envelope, EnvelopeProxy, ToEnvelope};
use crate::context::Context;
use crate::handler::{Handler, Message, MessageResponse};
/// Arbiter that hands messages addressed to actor `A` over to worker threads.
///
/// `start` spawns the workers and returns the `Addr<A>`; the arbiter itself is
/// a future (see its `Future` impl) that moves every envelope received on
/// `msgs` into `queue`.
pub struct SyncArbiter<A>
where
A: Actor<Context = SyncContext<A>>,
{
/// Sending side of the crossbeam work queue whose receivers are cloned into
/// the worker threads. Reset to `None` when the arbiter future completes,
/// which drops this sender.
queue: Option<cb_channel::Sender<Envelope<A>>>,
/// Receiving side of the address channel created in `start`.
msgs: AddressReceiver<A>,
}
impl<A> SyncArbiter<A>
where
    A: Actor<Context = SyncContext<A>>,
{
    /// Starts a sync arbiter backed by `threads` worker threads.
    ///
    /// Every worker gets its own `SyncContext`, and therefore its own actor
    /// instance produced by `factory`. All workers pull envelopes from one
    /// shared unbounded crossbeam channel. The arbiter future that feeds that
    /// channel is spawned with `actix_rt::spawn`.
    ///
    /// Returns the address through which messages reach the workers.
    pub fn start<F>(threads: usize, factory: F) -> Addr<A>
    where
        F: Fn() -> A + Send + Sync + 'static,
    {
        let shared_factory = Arc::new(factory);
        let (work_tx, work_rx) = cb_channel::unbounded();

        for _ in 0..threads {
            let worker_factory = Arc::clone(&shared_factory);
            let system = System::current();
            let worker_queue = work_rx.clone();

            thread::spawn(move || {
                // Make the spawning thread's `System` current on this worker.
                System::set_current(system);
                let mut worker_ctx = SyncContext::new(worker_factory, worker_queue);
                worker_ctx.run();
            });
        }

        let (addr_tx, addr_rx) = channel::channel(0);
        let arbiter = Self {
            queue: Some(work_tx),
            msgs: addr_rx,
        };
        actix_rt::spawn(arbiter);

        Addr::new(addr_tx)
    }
}
/// Makes the arbiter itself an actor; its context type is the regular
/// `Context`, not `SyncContext`.
impl<A> Actor for SyncArbiter<A>
where
A: Actor<Context = SyncContext<A>>,
{
type Context = Context<Self>;
}
// Future that pumps envelopes from the address channel into the worker queue.
#[doc(hidden)]
impl<A> Future for SyncArbiter<A>
where
    A: Actor<Context = SyncContext<A>>,
{
    type Output = ();

    /// Forwards every envelope that is currently ready on `msgs` to the
    /// worker queue, then reports whether the arbiter should keep running.
    ///
    /// Returns `Poll::Pending` while `msgs.connected()` is true. Otherwise the
    /// queue sender is dropped and the future completes.
    ///
    /// # Panics
    ///
    /// Panics if sending to the worker queue fails, or if the message stream
    /// ever yields `None`.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // `poll_next_unpin` below already relies on `msgs` being `Unpin`, and
        // nothing here depends on the arbiter staying pinned, so the safe
        // `get_mut` is sufficient; no `unsafe` is needed.
        let this = self.get_mut();

        loop {
            match this.msgs.poll_next_unpin(cx) {
                Poll::Ready(Some(msg)) => {
                    if let Some(queue) = &this.queue {
                        assert!(queue.send(msg).is_ok());
                    }
                }
                Poll::Pending => break,
                Poll::Ready(None) => unreachable!(),
            }
        }

        if this.msgs.connected() {
            Poll::Pending
        } else {
            // Dropping the sender releases the arbiter's handle on the work queue.
            this.queue = None;
            Poll::Ready(())
        }
    }
}
impl<A, M> ToEnvelope<A, M> for SyncContext<A>
where
    A: Actor<Context = Self> + Handler<M>,
    M: Message + Send + 'static,
    M::Result: Send,
{
    /// Wraps `msg` and the optional reply sender `tx` in a
    /// `SyncContextEnvelope` and boxes it into an `Envelope`.
    fn pack(msg: M, tx: Option<SyncSender<M::Result>>) -> Envelope<A> {
        let proxy = SyncContextEnvelope::<A, M>::new(msg, tx);
        Envelope::with_proxy(Box::new(proxy))
    }
}
/// Execution context for a sync actor running on a worker thread.
///
/// Owns the receiving end of the work queue and the factory used to create
/// the actor (and to re-create it after a stop request, see `run`).
pub struct SyncContext<A>
where
A: Actor<Context = SyncContext<A>>,
{
/// Actor instance created in `new`; `run` takes it out of this slot.
act: Option<A>,
/// Receiving side of the work queue this context reads envelopes from.
queue: cb_channel::Receiver<Envelope<A>>,
/// Set by `stop`/`terminate`; checked by `run` after each handled envelope.
stopping: bool,
/// Current lifecycle state, as returned by `state()`.
state: ActorState,
/// Factory producing actor instances.
factory: Arc<dyn Fn() -> A>,
}
impl<A> SyncContext<A>
where
    A: Actor<Context = Self>,
{
    /// Builds a context reading from `queue`, creating the first actor
    /// instance from `factory` right away.
    fn new(
        factory: Arc<dyn Fn() -> A>,
        queue: cb_channel::Receiver<Envelope<A>>,
    ) -> Self {
        let first_actor = (*factory)();
        Self {
            act: Some(first_actor),
            queue,
            stopping: false,
            state: ActorState::Started,
            factory,
        }
    }

    /// Runs the actor on the calling thread until the work queue reports an
    /// error on `recv`.
    ///
    /// After each handled envelope, a pending stop request causes the current
    /// actor to go through `stopping`/`stopped` and a fresh one to be created
    /// from the factory and started.
    ///
    /// # Panics
    ///
    /// Panics if the actor slot is empty, i.e. `run` has already taken it.
    fn run(&mut self) {
        let mut actor = self.act.take().unwrap();
        A::started(&mut actor, self);
        self.state = ActorState::Running;

        while let Ok(mut envelope) = self.queue.recv() {
            envelope.handle(&mut actor, self);

            if !self.stopping {
                continue;
            }

            // Stop request: retire the current actor ...
            self.stopping = false;
            A::stopping(&mut actor, self);
            self.state = ActorState::Stopped;
            A::stopped(&mut actor, self);

            // ... and bring up its replacement.
            self.state = ActorState::Started;
            actor = (*self.factory)();
            A::started(&mut actor, self);
            self.state = ActorState::Running;
        }

        // `recv` failed: run the final shutdown sequence for the actor.
        self.state = ActorState::Stopping;
        if A::stopping(&mut actor, self) != Running::Stop {
            warn!("stopping method is not supported for sync actors");
        }
        self.state = ActorState::Stopped;
        A::stopped(&mut actor, self);
    }
}
impl<A> ActorContext for SyncContext<A>
where
A: Actor<Context = Self>,
{
fn stop(&mut self) {
self.stopping = true;
self.state = ActorState::Stopping;
}
fn terminate(&mut self) {
self.stopping = true;
self.state = ActorState::Stopping;
}
fn state(&self) -> ActorState {
self.state
}
}
/// Envelope payload for a sync actor: one message plus the optional channel
/// on which its result is sent back.
pub(crate) struct SyncContextEnvelope<A, M>
where
A: Actor<Context = SyncContext<A>> + Handler<M>,
M: Message + Send,
{
/// The message; taken out when the envelope is handled.
msg: Option<M>,
/// Reply sender for the result, if the caller supplied one.
tx: Option<SyncSender<M::Result>>,
/// Ties the envelope to actor type `A` without storing an actor value.
actor: PhantomData<A>,
}
// SAFETY: `A` occurs only inside `PhantomData`, so no actor value is stored in
// (or moved with) the envelope. `msg` is `Send` through the `M: Send` bound.
// `tx` is a oneshot sender carrying `M::Result`, so the impl is restricted to
// `M::Result: Send`; without that bound the envelope would claim `Send` even
// for reply types that are not safe to move to another thread.
unsafe impl<A, M> Send for SyncContextEnvelope<A, M>
where
    A: Actor<Context = SyncContext<A>> + Handler<M>,
    M: Message + Send,
    M::Result: Send,
{
}
impl<A, M> SyncContextEnvelope<A, M>
where
    A: Actor<Context = SyncContext<A>> + Handler<M>,
    M: Message + Send,
    M::Result: Send,
{
    /// Creates an envelope holding `msg` and the optional reply sender `tx`.
    pub fn new(msg: M, tx: Option<SyncSender<M::Result>>) -> Self {
        let msg = Some(msg);
        Self {
            msg,
            tx,
            actor: PhantomData,
        }
    }
}
impl<A, M> EnvelopeProxy for SyncContextEnvelope<A, M>
where
    M: Message + Send + 'static,
    M::Result: Send,
    A: Actor<Context = SyncContext<A>> + Handler<M>,
{
    type Actor = A;

    /// Delivers the stored message to `act` and passes the handler's response
    /// on together with the reply sender.
    ///
    /// The reply sender is always taken out of the envelope. If it reports
    /// `is_canceled()`, the message is not handled and stays in the envelope.
    /// If the message has already been taken, nothing happens.
    fn handle(&mut self, act: &mut A, ctx: &mut A::Context) {
        let tx = self.tx.take();

        // Skip the work when the reply sender reports the request as canceled.
        if let Some(sender) = &tx {
            if sender.is_canceled() {
                return;
            }
        }

        if let Some(msg) = self.msg.take() {
            <A as Handler<M>>::handle(act, msg, ctx).handle(ctx, tx)
        }
    }
}