messages/
envelope.rs

//! Envelope is an entity capable of encapsulating the sent message
//! together with a way to report the result back to the sender (if needed).
//! It consists of two parts:
//!
//! - `EnvelopeProxy` trait that is being used by the `Context` to
//!   pass the message to the actor (which is only accessible by
//!   the `Context` itself).
//! - `MessageEnvelope` and `NotificationEnvelope` structures that
//!   actually have the message inside of them and implement `EnvelopeProxy`.
//!
//! The way it works is as follows:
//!
//! - User calls `Address::send` / `Address::notify` with a message that
//!   can be handled by the corresponding `Actor` type.
//! - `Address` creates an `*Envelope` object and converts it to a
//!   `Box<dyn EnvelopeProxy>`. Information about the message type is now
//!   erased and we can consider different messages to be of the same type.
//! - This "envelope" is sent to the `Context` through a channel.
//! - Once `Context` processes the envelope, it creates `Pin`s to both itself
//!   and `Actor` and calls `EnvelopeProxy::handle` to process the message.
21
22use std::pin::Pin;
23
24use async_trait::async_trait;
25
26use crate::{
27    cfg_runtime,
28    prelude::{Actor, Context, Handler, Notifiable},
29};
30
31#[async_trait]
32pub(crate) trait EnvelopeProxy<A: Actor + Unpin>: Send + 'static {
33    async fn handle(&mut self, actor: Pin<&mut A>, context: Pin<&Context<A>>);
34}
35
36pub(crate) struct MessageEnvelope<A: Handler<IN>, IN> {
37    data: Option<(IN, async_oneshot::Sender<A::Result>)>,
38}
39
40impl<A, IN> MessageEnvelope<A, IN>
41where
42    A: Handler<IN>,
43{
44    pub(crate) fn new(message: IN, response: async_oneshot::Sender<A::Result>) -> Self {
45        Self {
46            data: Some((message, response)),
47        }
48    }
49}
50
51#[async_trait]
52impl<A, IN> EnvelopeProxy<A> for MessageEnvelope<A, IN>
53where
54    A: Handler<IN> + Actor + Send + Unpin,
55    IN: Send + 'static,
56    A::Result: Send + Sync + 'static,
57{
58    async fn handle(&mut self, actor: Pin<&mut A>, context: Pin<&Context<A>>) {
59        let (message, mut response) = self.data.take().expect("`Envelope::handle` called twice");
60
61        let result = actor
62            .get_mut()
63            .handle(message, Pin::into_inner(context))
64            .await;
65        let _ = response.send(result);
66    }
67}
68
69pub(crate) struct NotificationEnvelope<A: Notifiable<IN>, IN> {
70    message: Option<IN>,
71    _marker: std::marker::PhantomData<A>,
72}
73
74impl<A, IN> NotificationEnvelope<A, IN>
75where
76    A: Notifiable<IN>,
77{
78    pub(crate) fn new(message: IN) -> Self {
79        Self {
80            message: Some(message),
81            _marker: std::marker::PhantomData,
82        }
83    }
84}
85
86#[async_trait]
87impl<A, IN> EnvelopeProxy<A> for NotificationEnvelope<A, IN>
88where
89    A: Notifiable<IN> + Actor + Send + Unpin,
90    IN: Send + 'static,
91{
92    async fn handle(&mut self, actor: Pin<&mut A>, context: Pin<&Context<A>>) {
93        let message = self
94            .message
95            .take()
96            .expect("`Envelope::handle` called twice");
97
98        actor
99            .get_mut()
100            .notify(message, Pin::into_inner(context))
101            .await;
102    }
103}
104
105cfg_runtime! {
106    use crate::handler::Coroutine;
107
108    pub(crate) struct CoroutineEnvelope<A: Coroutine<IN>, IN> {
109        data: Option<(IN, async_oneshot::Sender<A::Result>)>,
110    }
111
112    impl<A, IN> CoroutineEnvelope<A, IN>
113    where
114        A: Coroutine<IN>,
115    {
116        pub(crate) fn new(message: IN, response: async_oneshot::Sender<A::Result>) -> Self {
117            Self {
118                data: Some((message, response)),
119            }
120        }
121    }
122
123    #[async_trait]
124    impl<A, IN> EnvelopeProxy<A> for CoroutineEnvelope<A, IN>
125    where
126        A: Coroutine<IN> + Actor + Send + Unpin,
127        IN: Send + 'static,
128        A::Result: Send + Sync + 'static,
129    {
130        async fn handle(&mut self, actor: Pin<&mut A>, _context: Pin<&Context<A>>) {
131            let actor = Pin::into_inner(actor).clone();
132            let (message, mut response) = self
133                .data
134                .take()
135                .expect("`Envelope::handle` called twice");
136
137            crate::runtime::spawn(async move {
138                let result = actor.calculate(message).await;
139                let _ = response.send(result);
140            });
141        }
142    }
143}