Skip to main content

acktor/
message.rs

1//! Message passing between actors.
2//!
3//! A [`Message`] is a type that can be sent between actors. It is a trait that can be derived
4//! for custom types. A specific message type can be sent to an actor only if the actor implements
5//! the corresponding [`Handler`] trait for the message type, which describes the action of the
6//! actor when it receives the message.
7//!
8
9use std::future::{self, Future};
10use std::sync::Arc;
11
12use tracing::debug;
13
14use crate::actor::Actor;
15use crate::channel::oneshot;
16use crate::error::{BoxError, ErrorReport, SendError};
17
18mod result;
19pub use result::MessageResult;
20
21mod future_result;
22pub use future_result::FutureMessageResult;
23
24#[cfg(feature = "identifier")]
25mod index;
26#[cfg(feature = "identifier")]
27#[cfg_attr(docsrs, doc(cfg(feature = "identifier")))]
28pub use index::MessageId;
29
30#[cfg(feature = "ipc")]
31mod remote;
32#[cfg(feature = "ipc")]
33#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
34pub use remote::BinaryMessage;
35
36/// Types that can be sent between actors.
37pub trait Message: Send + 'static {
38    /// The type of the response produced when this message is handled.
39    type Result: Send + 'static;
40}
41
42/// Types that can be sent as a response to a message.
43pub trait MessageResponse<A, M>: Send
44where
45    A: Actor,
46    M: Message,
47{
48    /// Handles the response.
49    fn handle(
50        self,
51        ctx: &mut A::Context,
52        tx: Option<oneshot::Sender<M::Result>>,
53    ) -> impl Future<Output = ()> + Send;
54}
55
56/// Describes how an actor handles a specific message type.
57pub trait Handler<M>: Actor
58where
59    M: Message,
60{
61    /// The return type of the handler, which must implement [`MessageResponse`].
62    type Result: MessageResponse<Self, M>;
63
64    /// Handles a message.
65    fn handle(
66        &mut self,
67        msg: M,
68        ctx: &mut Self::Context,
69    ) -> impl Future<Output = Self::Result> + Send;
70}
71
72impl Message for () {
73    type Result = ();
74}
75
76// implement Message trait for a few common wrapper types
77
78impl<M> Message for Box<M>
79where
80    M: Message,
81{
82    type Result = M::Result;
83}
84
85impl<M> Message for Arc<M>
86where
87    M: Message + Sync,
88{
89    type Result = M::Result;
90}
91
92// implement MessageResponse trait for a few common wrapper types
93
94impl<A, M, T, E> MessageResponse<A, M> for Result<T, E>
95where
96    A: Actor,
97    M: Message<Result = Self>,
98    T: Send,
99    E: Into<BoxError> + Send,
100{
101    fn handle(
102        self,
103        _ctx: &mut A::Context,
104        tx: Option<oneshot::Sender<M::Result>>,
105    ) -> impl Future<Output = ()> + Send {
106        if let Some(tx) = tx {
107            if let Err(SendError::Closed(Err(e))) = tx.send(self) {
108                debug!(
109                    "Could not send the result back to the sender since the channel is closed, \
110                    log the dropped error: {}",
111                    e.into().report()
112                );
113            }
114        }
115        // tx is None means the sender does not care about the result, so we simply drop it
116        future::ready(())
117    }
118}
119
120impl<A, M, T> MessageResponse<A, M> for Option<T>
121where
122    A: Actor,
123    M: Message<Result = Self>,
124    T: Send,
125{
126    fn handle(
127        self,
128        _ctx: &mut A::Context,
129        tx: Option<oneshot::Sender<M::Result>>,
130    ) -> impl Future<Output = ()> + Send {
131        if let Some(tx) = tx {
132            let _ = tx.send(self);
133        }
134        future::ready(())
135    }
136}
137
138impl<A, M, T> MessageResponse<A, M> for Box<T>
139where
140    A: Actor,
141    M: Message<Result = Self>,
142    T: Send,
143{
144    fn handle(
145        self,
146        _ctx: &mut A::Context,
147        tx: Option<oneshot::Sender<M::Result>>,
148    ) -> impl Future<Output = ()> + Send {
149        if let Some(tx) = tx {
150            let _ = tx.send(self);
151        }
152        future::ready(())
153    }
154}
155
156impl<A, M, T> MessageResponse<A, M> for Arc<T>
157where
158    A: Actor,
159    M: Message<Result = Self>,
160    T: Send + Sync,
161{
162    fn handle(
163        self,
164        _ctx: &mut A::Context,
165        tx: Option<oneshot::Sender<M::Result>>,
166    ) -> impl Future<Output = ()> + Send {
167        if let Some(tx) = tx {
168            let _ = tx.send(self);
169        }
170        future::ready(())
171    }
172}
173
174impl<A, M, T> MessageResponse<A, M> for Vec<T>
175where
176    A: Actor,
177    M: Message<Result = Self>,
178    T: Send,
179{
180    fn handle(
181        self,
182        _ctx: &mut A::Context,
183        tx: Option<oneshot::Sender<M::Result>>,
184    ) -> impl Future<Output = ()> + Send {
185        if let Some(tx) = tx {
186            let _ = tx.send(self);
187        }
188        future::ready(())
189    }
190}
191
192macro_rules! impl_message_response_for {
193    ($type:ty) => {
194        impl<A, M> MessageResponse<A, M> for $type
195        where
196            A: Actor,
197            M: Message<Result = Self>,
198        {
199            fn handle(
200                self,
201                _ctx: &mut A::Context,
202                tx: Option<oneshot::Sender<M::Result>>,
203            ) -> impl Future<Output = ()> + Send {
204                if let Some(tx) = tx {
205                    let _ = tx.send(self);
206                }
207                future::ready(())
208            }
209        }
210    };
211}
212
213impl_message_response_for!(());
214impl_message_response_for!(bool);
215impl_message_response_for!(i8);
216impl_message_response_for!(i16);
217impl_message_response_for!(i32);
218impl_message_response_for!(i64);
219impl_message_response_for!(isize);
220impl_message_response_for!(u8);
221impl_message_response_for!(u16);
222impl_message_response_for!(u32);
223impl_message_response_for!(u64);
224impl_message_response_for!(usize);
225impl_message_response_for!(f32);
226impl_message_response_for!(f64);
227impl_message_response_for!(char);
228impl_message_response_for!(String);
229
230#[cfg(test)]
231mod tests {
232    use std::marker::PhantomData;
233
234    use anyhow::Result;
235    use pretty_assertions::assert_eq;
236
237    use super::*;
238    use crate::context::Context;
239
240    #[derive(Debug)]
241    struct A;
242
243    impl Actor for A {
244        type Context = Context<Self>;
245        type Error = anyhow::Error;
246    }
247
248    /// Synthetic message whose `Result` is `T`. Used to drive `MessageResponse` tests
249    /// without coupling to a concrete message type per scalar.
250    struct M<T>(PhantomData<T>);
251
252    impl<T> Message for M<T>
253    where
254        T: Send + 'static,
255    {
256        type Result = T;
257    }
258
259    async fn roundtrip<R>(value: R) -> Result<R>
260    where
261        R: MessageResponse<A, M<R>> + Send + 'static,
262    {
263        let mut ctx = Context::<A>::with_capacity("test".into(), 1);
264        let (tx, rx) = oneshot::channel::<R>();
265        value.handle(&mut ctx, Some(tx)).await;
266        Ok(rx.await?)
267    }
268
269    #[tokio::test]
270    async fn test_message_response() -> Result<()> {
271        assert_eq!(roundtrip(()).await?, ());
272        assert_eq!(roundtrip(true).await?, true);
273        assert_eq!(roundtrip(-1_i32).await?, -1);
274        assert_eq!(roundtrip(42_u64).await?, 42);
275        assert_eq!(roundtrip('x').await?, 'x');
276        assert_eq!(roundtrip(String::from("hello")).await?, "hello");
277
278        // result
279        assert_eq!(
280            roundtrip::<std::result::Result<i32, String>>(Ok(10)).await?,
281            Ok(10)
282        );
283        assert_eq!(
284            roundtrip::<std::result::Result<i32, String>>(Err(String::from("err"))).await?,
285            Err(String::from("err"))
286        );
287
288        // option
289        assert_eq!(roundtrip(Some(42_u32)).await?, Some(42));
290        assert_eq!(roundtrip(None::<u32>).await?, None);
291
292        // box and arc
293        assert_eq!(*roundtrip(Box::new(7_u64)).await?, 7);
294        assert_eq!(*roundtrip(Arc::new(String::from("hi"))).await?, "hi");
295
296        // vec
297        assert_eq!(roundtrip(vec![1_u8, 2, 3]).await?, vec![1, 2, 3]);
298
299        Ok(())
300    }
301
302    #[test]
303    fn test_debug_fmt() {
304        let result = MessageResult::<M<i32>>(42);
305        assert_eq!(format!("{:?}", result), "MessageResult<M<i32>>");
306
307        let future_result = FutureMessageResult::<M<i32>>::new(async { 42 });
308        assert_eq!(
309            format!("{:?}", future_result),
310            "FutureMessageResult<M<i32>>"
311        );
312    }
313}