use std::future::{self, Future};
use std::sync::Arc;
use tracing::debug;
use crate::actor::Actor;
use crate::channel::oneshot;
use crate::errors::{BoxError, ErrorReport, SendError};
mod result;
pub use result::MessageResult;
mod future_result;
pub use future_result::FutureMessageResult;
pub trait Message: Send + 'static {
type Result: Send + 'static;
}
pub trait Handler<M>: Actor
where
M: Message,
{
type Result: MessageResponse<Self, M>;
fn handle(
&mut self,
msg: M,
ctx: &mut Self::Context,
) -> impl Future<Output = Self::Result> + Send;
}
pub trait MessageResponse<A, M>: Send
where
A: Actor,
M: Message,
{
fn handle(
self,
ctx: &mut A::Context,
tx: Option<oneshot::Sender<M::Result>>,
) -> impl Future<Output = ()> + Send;
}
impl Message for () {
type Result = ();
}
impl<M> Message for Box<M>
where
M: Message,
{
type Result = M::Result;
}
impl<M> Message for Arc<M>
where
M: Message + Sync,
{
type Result = M::Result;
}
impl<A, M, T, E> MessageResponse<A, M> for Result<T, E>
where
A: Actor,
M: Message<Result = Self>,
T: Send,
E: Into<BoxError> + Send,
{
fn handle(
self,
_ctx: &mut A::Context,
tx: Option<oneshot::Sender<M::Result>>,
) -> impl Future<Output = ()> + Send {
if let Some(tx) = tx {
if let Err(SendError::Closed(Err(e))) = tx.send(self) {
debug!(
"Could not send the result back to the sender since the channel is closed, \
log the dropped error: {}",
e.into().report()
);
}
}
future::ready(())
}
}
impl<A, M, T> MessageResponse<A, M> for Option<T>
where
A: Actor,
M: Message<Result = Self>,
T: Send,
{
fn handle(
self,
_ctx: &mut A::Context,
tx: Option<oneshot::Sender<M::Result>>,
) -> impl Future<Output = ()> + Send {
if let Some(tx) = tx {
let _ = tx.send(self);
}
future::ready(())
}
}
impl<A, M, T> MessageResponse<A, M> for Box<T>
where
A: Actor,
M: Message<Result = Self>,
T: Send,
{
fn handle(
self,
_ctx: &mut A::Context,
tx: Option<oneshot::Sender<M::Result>>,
) -> impl Future<Output = ()> + Send {
if let Some(tx) = tx {
let _ = tx.send(self);
}
future::ready(())
}
}
impl<A, M, T> MessageResponse<A, M> for Arc<T>
where
A: Actor,
M: Message<Result = Self>,
T: Send + Sync,
{
fn handle(
self,
_ctx: &mut A::Context,
tx: Option<oneshot::Sender<M::Result>>,
) -> impl Future<Output = ()> + Send {
if let Some(tx) = tx {
let _ = tx.send(self);
}
future::ready(())
}
}
impl<A, M, T> MessageResponse<A, M> for Vec<T>
where
A: Actor,
M: Message<Result = Self>,
T: Send,
{
fn handle(
self,
_ctx: &mut A::Context,
tx: Option<oneshot::Sender<M::Result>>,
) -> impl Future<Output = ()> + Send {
if let Some(tx) = tx {
let _ = tx.send(self);
}
future::ready(())
}
}
macro_rules! impl_message_response_for {
($type:ty) => {
impl<A, M> MessageResponse<A, M> for $type
where
A: Actor,
M: Message<Result = Self>,
{
fn handle(
self,
_ctx: &mut A::Context,
tx: Option<oneshot::Sender<M::Result>>,
) -> impl Future<Output = ()> + Send {
if let Some(tx) = tx {
let _ = tx.send(self);
}
future::ready(())
}
}
};
}
impl_message_response_for!(());
impl_message_response_for!(bool);
impl_message_response_for!(i8);
impl_message_response_for!(i16);
impl_message_response_for!(i32);
impl_message_response_for!(i64);
impl_message_response_for!(isize);
impl_message_response_for!(u8);
impl_message_response_for!(u16);
impl_message_response_for!(u32);
impl_message_response_for!(u64);
impl_message_response_for!(usize);
impl_message_response_for!(f32);
impl_message_response_for!(f64);
impl_message_response_for!(char);
impl_message_response_for!(String);
#[cfg(test)]
mod tests {
use std::marker::PhantomData;
use pretty_assertions::assert_eq;
use super::*;
use crate::context::Context;
#[derive(Debug)]
struct A;
impl Actor for A {
type Context = Context<Self>;
type Error = anyhow::Error;
}
struct M<T>(PhantomData<T>);
impl<T> Message for M<T>
where
T: Send + 'static,
{
type Result = T;
}
async fn roundtrip<R>(value: R) -> R
where
R: MessageResponse<A, M<R>> + Send + 'static,
{
let mut ctx = Context::<A>::with_capacity("test".into(), 1);
let (tx, rx) = oneshot::channel::<R>();
value.handle(&mut ctx, Some(tx)).await;
rx.await.unwrap()
}
#[tokio::test]
async fn test_message_response() {
assert_eq!(roundtrip(()).await, ());
assert_eq!(roundtrip(true).await, true);
assert_eq!(roundtrip(-1_i32).await, -1);
assert_eq!(roundtrip(42_u64).await, 42);
assert_eq!(roundtrip('x').await, 'x');
assert_eq!(roundtrip(String::from("hello")).await, "hello");
assert_eq!(roundtrip::<Result<i32, String>>(Ok(10)).await, Ok(10));
assert_eq!(
roundtrip::<Result<i32, String>>(Err(String::from("err"))).await,
Err(String::from("err"))
);
assert_eq!(roundtrip(Some(42_u32)).await, Some(42));
assert_eq!(roundtrip(None::<u32>).await, None);
assert_eq!(*roundtrip(Box::new(7_u64)).await, 7);
assert_eq!(*roundtrip(Arc::new(String::from("hi"))).await, "hi");
assert_eq!(roundtrip(vec![1_u8, 2, 3]).await, vec![1, 2, 3]);
}
}