use std::pin::Pin;
use async_trait::async_trait;
use crate::{
cfg_runtime,
prelude::{Actor, Context, Handler, Notifiable},
};
/// A deliverable unit of work for an actor of type `A`.
///
/// The trait is generic only over the actor type, so implementors that carry
/// different message types can all be driven through the same `handle` entry
/// point.
#[async_trait]
pub(crate) trait EnvelopeProxy<A>: Send + 'static
where
    A: Actor + Unpin,
{
    /// Delivers the envelope's payload to `actor`, giving it access to `context`.
    async fn handle(&mut self, actor: Pin<&mut A>, context: Pin<&Context<A>>);
}
/// Envelope pairing a message of type `IN` with the one-shot sender used to
/// return the handler's result.
pub(crate) struct MessageEnvelope<A, IN>
where
    A: Handler<IN>,
{
    /// Message and reply sender; `None` once the envelope has been handled.
    data: Option<(IN, async_oneshot::Sender<A::Result>)>,
}
impl<A: Handler<IN>, IN> MessageEnvelope<A, IN> {
    /// Wraps `message` together with the `response` sender that will carry the
    /// handler's result.
    pub(crate) fn new(message: IN, response: async_oneshot::Sender<A::Result>) -> Self {
        let data = Some((message, response));
        Self { data }
    }
}
#[async_trait]
impl<A, IN> EnvelopeProxy<A> for MessageEnvelope<A, IN>
where
    IN: Send + 'static,
    A: Handler<IN> + Actor + Send + Unpin,
    A::Result: Send + Sync + 'static,
{
    /// Runs the actor's handler on the stored message and forwards the result
    /// through the stored sender.
    ///
    /// # Panics
    ///
    /// Panics if the payload has already been taken by an earlier call.
    async fn handle(&mut self, actor: Pin<&mut A>, context: Pin<&Context<A>>) {
        // The payload is moved out, so the envelope can be handled only once.
        let payload = self.data.take();
        let (message, mut reply_to) = payload.expect("`Envelope::handle` called twice");

        let actor: &mut A = Pin::into_inner(actor);
        let context: &Context<A> = Pin::into_inner(context);
        let outcome = actor.handle(message, context).await;

        // The outcome of `send` is deliberately discarded.
        let _ = reply_to.send(outcome);
    }
}
/// Envelope carrying a notification of type `IN`; it holds no reply sender.
pub(crate) struct NotificationEnvelope<A, IN>
where
    A: Notifiable<IN>,
{
    /// The notification; `None` once the envelope has been handled.
    message: Option<IN>,
    /// Ties the envelope to the actor type `A` without storing a value of it.
    _marker: std::marker::PhantomData<A>,
}
impl<A: Notifiable<IN>, IN> NotificationEnvelope<A, IN> {
    /// Wraps `message` in an envelope addressed to actors of type `A`.
    pub(crate) fn new(message: IN) -> Self {
        let message = Some(message);
        Self {
            message,
            _marker: std::marker::PhantomData,
        }
    }
}
#[async_trait]
impl<A, IN> EnvelopeProxy<A> for NotificationEnvelope<A, IN>
where
    IN: Send + 'static,
    A: Notifiable<IN> + Actor + Send + Unpin,
{
    /// Passes the stored notification to the actor's `notify` method.
    ///
    /// # Panics
    ///
    /// Panics if the notification has already been taken by an earlier call.
    async fn handle(&mut self, actor: Pin<&mut A>, context: Pin<&Context<A>>) {
        // The notification is moved out, so the envelope can be handled only once.
        let pending = self.message.take();
        let message = pending.expect("`Envelope::handle` called twice");

        let actor: &mut A = Pin::into_inner(actor);
        let context: &Context<A> = Pin::into_inner(context);
        actor.notify(message, context).await;
    }
}
cfg_runtime! {
use crate::handler::Coroutine;
/// Envelope pairing a message of type `IN` with the one-shot sender used to
/// return the coroutine's result.
pub(crate) struct CoroutineEnvelope<A, IN>
where
    A: Coroutine<IN>,
{
    /// Message and reply sender; `None` once the envelope has been handled.
    data: Option<(IN, async_oneshot::Sender<A::Result>)>,
}
impl<A: Coroutine<IN>, IN> CoroutineEnvelope<A, IN> {
    /// Wraps `message` together with the `response` sender that will carry the
    /// computed result.
    pub(crate) fn new(message: IN, response: async_oneshot::Sender<A::Result>) -> Self {
        let data = Some((message, response));
        Self { data }
    }
}
#[async_trait]
impl<A, IN> EnvelopeProxy<A> for CoroutineEnvelope<A, IN>
where
    IN: Send + 'static,
    A: Coroutine<IN> + Actor + Send + Unpin,
    A::Result: Send + Sync + 'static,
{
    /// Clones the actor and hands a future that runs `calculate` on the clone
    /// to `crate::runtime::spawn`; this method does not await the result.
    ///
    /// # Panics
    ///
    /// Panics if the payload has already been taken by an earlier call.
    async fn handle(&mut self, actor: Pin<&mut A>, _context: Pin<&Context<A>>) {
        // The spawned future owns its own copy of the actor.
        let worker = actor.get_mut().clone();

        // The payload is moved out, so the envelope can be handled only once.
        let payload = self.data.take();
        let (message, mut reply_to) = payload.expect("`Envelope::handle` called twice");

        let job = async move {
            let outcome = worker.calculate(message).await;
            // The outcome of `send` is deliberately discarded.
            let _ = reply_to.send(outcome);
        };
        crate::runtime::spawn(job);
    }
}
}