use dyn_clone::DynClone;
use std::{future::Future, pin::Pin};
use crate::{Actor, Handler, channel, context::Core, error::ActorError};
use super::{Addr, Message, Payload, Result, weak_sender::WeakSender};
pub struct Sender<M: Message<Response = ()>> {
core: Core,
send_fn: Box<dyn SenderFn<M>>,
try_send_fn: Box<dyn TrySenderFn<M>>,
force_send_fn: Box<dyn ForceSenderFn<M>>,
downgrade_fn: Box<dyn DowngradeFn<M>>,
}
impl<M: Message<Response = ()>> Sender<M> {
pub fn send(&self, msg: M) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
self.send_fn.send(msg)
}
pub(crate) fn force_send(&self, msg: M) -> Result<()> {
self.force_send_fn.send(msg)
}
pub fn try_send(&self, msg: M) -> Result<()> {
self.try_send_fn.try_send(msg)
}
pub fn downgrade(&self) -> WeakSender<M> {
self.downgrade_fn.downgrade()
}
pub(crate) fn new<A>(tx: channel::Tx<A>, core: Core) -> Self
where
A: Actor + Handler<M>,
{
let weak_tx = tx.downgrade();
let send_fn: Box<dyn SenderFn<M>> = {
let tx = tx.clone();
let core = core.clone();
Box::new(
move |msg: M| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
let tx = tx.clone();
let core = core.clone();
Box::pin(async move {
if core.stopped() {
return Err(ActorError::ChannelClosed);
}
tx.send(Payload::task(move |actor, ctx| {
Box::pin(Handler::handle(&mut *actor, ctx, msg))
}))
.await
.map_err(|_err| ActorError::ChannelClosed)?;
Ok(())
})
},
)
};
let try_send_fn: Box<dyn TrySenderFn<M>> = {
let tx = tx.clone();
let core = core.clone();
Box::new(move |msg: M| {
if core.stopped() {
return Err(ActorError::ChannelClosed);
}
tx.try_send(Payload::task(move |actor, ctx| {
Box::pin(Handler::handle(&mut *actor, ctx, msg))
}))
.map_err(|_err| ActorError::ChannelClosed)?;
Ok(())
})
};
let force_send_fn: Box<dyn ForceSenderFn<M>> = {
let core = core.clone();
Box::new(move |msg| {
if core.stopped() {
return Err(ActorError::ChannelClosed);
}
tx.force_send(Payload::task(move |actor, ctx| {
Box::pin(Handler::handle(&mut *actor, ctx, msg))
}))
.map_err(|_err| ActorError::ChannelClosed)?;
Ok(())
})
};
let downgrade_fn: Box<dyn DowngradeFn<M>> = {
let core = core.clone();
Box::new(move || {
let weak_tx = weak_tx.clone();
let core = core.clone();
WeakSender {
core: core.clone(),
upgrade: Box::new(move || {
weak_tx.upgrade().map(|tx| Sender::new(tx, core.clone()))
}),
}
})
};
Sender {
core,
send_fn,
try_send_fn,
force_send_fn,
downgrade_fn,
}
}
}
impl<M: Message<Response = ()>> Sender<M> {
pub fn running(&self) -> bool {
self.core.running()
}
pub fn stopped(&self) -> bool {
self.core.stopped()
}
}
trait ForceSenderFn<M: Message<Response = ()>>: 'static + Send + Sync + DynClone {
fn send(&self, msg: M) -> Result<()>;
}
impl<F, M: Message<Response = ()>> ForceSenderFn<M> for F
where
F: Fn(M) -> Result<()>,
F: 'static + Send + Sync + Clone,
{
fn send(&self, msg: M) -> Result<()> {
self(msg)
}
}
trait SenderFn<M: Message<Response = ()>>: 'static + Send + Sync + DynClone {
fn send(&self, msg: M) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
}
impl<F, M: Message<Response = ()>> SenderFn<M> for F
where
F: Fn(M) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>,
F: 'static + Send + Sync + Clone,
{
fn send(&self, msg: M) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
self(msg)
}
}
trait TrySenderFn<M: Message<Response = ()>>: 'static + Send + Sync + DynClone {
fn try_send(&self, msg: M) -> Result<()>;
}
impl<F, M: Message<Response = ()>> TrySenderFn<M> for F
where
F: Fn(M) -> Result<()>,
F: 'static + Send + Sync + Clone,
{
fn try_send(&self, msg: M) -> Result<()> {
self(msg)
}
}
impl<M: Message<Response = ()>, A> From<Addr<A>> for Sender<M>
where
A: Actor + Handler<M>,
{
fn from(addr: Addr<A>) -> Self {
Sender::new(addr.tx.clone(), addr.core)
}
}
impl<M: Message<Response = ()>, A> From<&Addr<A>> for Sender<M>
where
A: Actor + Handler<M>,
{
fn from(addr: &Addr<A>) -> Self {
Sender::new(addr.tx.clone(), addr.core.clone())
}
}
impl<M: Message<Response = ()>> Clone for Sender<M> {
fn clone(&self) -> Self {
Sender {
core: self.core.clone(),
send_fn: dyn_clone::clone_box(&*self.send_fn),
try_send_fn: dyn_clone::clone_box(&*self.try_send_fn),
force_send_fn: dyn_clone::clone_box(&*self.force_send_fn),
downgrade_fn: dyn_clone::clone_box(&*self.downgrade_fn),
}
}
}
trait DowngradeFn<M: Message<Response = ()>>: Send + Sync + 'static + DynClone {
fn downgrade(&self) -> WeakSender<M>;
}
impl<F, M> DowngradeFn<M> for F
where
F: Fn() -> WeakSender<M>,
F: 'static + Send + Sync + Clone,
M: Message<Response = ()>,
{
fn downgrade(&self) -> WeakSender<M> {
self()
}
}