use std::{fmt, future::Future, pin::Pin, sync::Arc};
pub use tokio::sync::oneshot::Sender as OneshotSender;
use crate::actor::{Actor, AsyncContext};
use crate::address::Addr;
use crate::fut::{ActorFuture, ActorFutureExt, LocalBoxActorFuture};
#[allow(unused_variables)]
pub trait Handler<M>
where
Self: Actor,
M: Message,
{
type Result: MessageResponse<Self, M>;
fn handle(&mut self, msg: M, ctx: &mut Self::Context) -> Self::Result;
}
pub trait Message {
type Result: 'static;
}
impl<M> Message for Arc<M>
where
M: Message,
{
type Result = M::Result;
}
impl<M> Message for Box<M>
where
M: Message,
{
type Result = M::Result;
}
pub struct MessageResult<M: Message>(pub M::Result);
pub struct AtomicResponse<A, T>(ResponseActFuture<A, T>);
impl<A, T> AtomicResponse<A, T> {
pub fn new(fut: ResponseActFuture<A, T>) -> Self {
AtomicResponse(fut)
}
}
impl<A, M> MessageResponse<A, M> for AtomicResponse<A, M::Result>
where
A: Actor,
M: Message,
A::Context: AsyncContext<A>,
{
fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
ctx.wait(self.0.map(|res, _, _| tx.send(res)));
}
}
pub type ResponseActFuture<A, I> = LocalBoxActorFuture<A, I>;
pub type ResponseFuture<I> = Pin<Box<dyn Future<Output = I>>>;
pub trait MessageResponse<A: Actor, M: Message> {
fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>);
}
impl<A, M> MessageResponse<A, M> for MessageResult<M>
where
A: Actor,
M: Message,
{
fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
tx.send(self.0)
}
}
impl<A, M, I, E> MessageResponse<A, M> for Result<I, E>
where
A: Actor,
M: Message<Result = Self>,
I: 'static,
E: 'static,
{
fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
tx.send(self)
}
}
impl<A, M, I> MessageResponse<A, M> for Arc<I>
where
A: Actor,
M: Message<Result = Self>,
I: 'static,
{
fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
tx.send(self)
}
}
impl<A, M, I> MessageResponse<A, M> for Option<I>
where
A: Actor,
M: Message<Result = Self>,
I: 'static,
{
fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
tx.send(self)
}
}
impl<A, M, B> MessageResponse<A, M> for Addr<B>
where
A: Actor,
M: Message<Result = Self>,
B: Actor,
{
fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
tx.send(self)
}
}
impl<A, M> MessageResponse<A, M> for ResponseActFuture<A, M::Result>
where
A: Actor,
M: Message,
A::Context: AsyncContext<A>,
{
fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
ctx.spawn(self.map(|res, _, _| tx.send(res)));
}
}
impl<A, M> MessageResponse<A, M> for ResponseFuture<M::Result>
where
A: Actor,
M: Message,
{
fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
actix_rt::spawn(async { tx.send(self.await) });
}
}
enum ResponseTypeItem<I> {
Result(I),
Fut(Pin<Box<dyn Future<Output = I>>>),
}
pub struct Response<I> {
item: ResponseTypeItem<I>,
}
impl<I> fmt::Debug for Response<I> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut fmt = fmt.debug_struct("Response");
match self.item {
ResponseTypeItem::Result(_) => fmt.field("item", &"Result(_)".to_string()),
ResponseTypeItem::Fut(_) => fmt.field("item", &"Fut(_)".to_string()),
}
.finish()
}
}
impl<I> Response<I> {
pub fn fut<T>(fut: T) -> Self
where
T: Future<Output = I> + 'static,
{
Self {
item: ResponseTypeItem::Fut(Box::pin(fut)),
}
}
pub fn reply(val: I) -> Self {
Self {
item: ResponseTypeItem::Result(val),
}
}
}
impl<A, M> MessageResponse<A, M> for Response<M::Result>
where
A: Actor,
M: Message,
{
fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
match self.item {
ResponseTypeItem::Fut(fut) => {
actix_rt::spawn(async { tx.send(fut.await) });
}
ResponseTypeItem::Result(res) => tx.send(res),
}
}
}
enum ActorResponseTypeItem<A, I> {
Result(I),
Fut(Pin<Box<dyn ActorFuture<A, Output = I>>>),
}
pub struct ActorResponse<A, I> {
item: ActorResponseTypeItem<A, I>,
}
impl<A, I> fmt::Debug for ActorResponse<A, I> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut fmt = fmt.debug_struct("ActorResponse");
match self.item {
ActorResponseTypeItem::Result(_) => fmt.field("item", &"Result(_)".to_string()),
ActorResponseTypeItem::Fut(_) => fmt.field("item", &"Fut(_)".to_string()),
}
.finish()
}
}
impl<A: Actor, I> ActorResponse<A, I> {
pub fn reply(val: I) -> Self {
Self {
item: ActorResponseTypeItem::Result(val),
}
}
pub fn r#async<T>(fut: T) -> Self
where
T: ActorFuture<A, Output = I> + 'static,
{
Self {
item: ActorResponseTypeItem::Fut(Box::pin(fut)),
}
}
}
impl<A, M> MessageResponse<A, M> for ActorResponse<A, M::Result>
where
A: Actor,
M: Message,
A::Context: AsyncContext<A>,
{
fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
match self.item {
ActorResponseTypeItem::Fut(fut) => {
let fut = fut.map(|res, _, _| tx.send(res));
ctx.spawn(fut);
}
ActorResponseTypeItem::Result(res) => tx.send(res),
}
}
}
macro_rules! SIMPLE_RESULT {
($type:ty) => {
impl<A, M> MessageResponse<A, M> for $type
where
A: Actor,
M: Message<Result = $type>,
{
fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<$type>>) {
tx.send(self)
}
}
};
}
SIMPLE_RESULT!(());
SIMPLE_RESULT!(u8);
SIMPLE_RESULT!(u16);
SIMPLE_RESULT!(u32);
SIMPLE_RESULT!(u64);
SIMPLE_RESULT!(usize);
SIMPLE_RESULT!(i8);
SIMPLE_RESULT!(i16);
SIMPLE_RESULT!(i32);
SIMPLE_RESULT!(i64);
SIMPLE_RESULT!(isize);
SIMPLE_RESULT!(f32);
SIMPLE_RESULT!(f64);
SIMPLE_RESULT!(String);
SIMPLE_RESULT!(bool);
trait OneshotSend<M> {
fn send(self, msg: M);
}
impl<M> OneshotSend<M> for Option<OneshotSender<M>> {
fn send(self, msg: M) {
if let Some(tx) = self {
let _ = tx.send(msg);
}
}
}