1use std::future::{self, Future};
10use std::sync::Arc;
11
12use tracing::debug;
13
14use crate::actor::Actor;
15use crate::channel::oneshot;
16use crate::error::{BoxError, ErrorReport, SendError};
17
18mod result;
19pub use result::MessageResult;
20
21mod future_result;
22pub use future_result::FutureMessageResult;
23
24#[cfg(feature = "identifier")]
25mod index;
26#[cfg(feature = "identifier")]
27#[cfg_attr(docsrs, doc(cfg(feature = "identifier")))]
28pub use index::MessageId;
29
30#[cfg(feature = "ipc")]
31mod remote;
32#[cfg(feature = "ipc")]
33#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
34pub use remote::BinaryMessage;
35
36pub trait Message: Send + 'static {
38 type Result: Send + 'static;
40}
41
42pub trait MessageResponse<A, M>: Send
44where
45 A: Actor,
46 M: Message,
47{
48 fn handle(
50 self,
51 ctx: &mut A::Context,
52 tx: Option<oneshot::Sender<M::Result>>,
53 ) -> impl Future<Output = ()> + Send;
54}
55
56pub trait Handler<M>: Actor
58where
59 M: Message,
60{
61 type Result: MessageResponse<Self, M>;
63
64 fn handle(
66 &mut self,
67 msg: M,
68 ctx: &mut Self::Context,
69 ) -> impl Future<Output = Self::Result> + Send;
70}
71
72impl Message for () {
73 type Result = ();
74}
75
76impl<M> Message for Box<M>
79where
80 M: Message,
81{
82 type Result = M::Result;
83}
84
85impl<M> Message for Arc<M>
86where
87 M: Message + Sync,
88{
89 type Result = M::Result;
90}
91
92impl<A, M, T, E> MessageResponse<A, M> for Result<T, E>
95where
96 A: Actor,
97 M: Message<Result = Self>,
98 T: Send,
99 E: Into<BoxError> + Send,
100{
101 fn handle(
102 self,
103 _ctx: &mut A::Context,
104 tx: Option<oneshot::Sender<M::Result>>,
105 ) -> impl Future<Output = ()> + Send {
106 if let Some(tx) = tx {
107 if let Err(SendError::Closed(Err(e))) = tx.send(self) {
108 debug!(
109 "Could not send the result back to the sender since the channel is closed, \
110 log the dropped error: {}",
111 e.into().report()
112 );
113 }
114 }
115 future::ready(())
117 }
118}
119
120impl<A, M, T> MessageResponse<A, M> for Option<T>
121where
122 A: Actor,
123 M: Message<Result = Self>,
124 T: Send,
125{
126 fn handle(
127 self,
128 _ctx: &mut A::Context,
129 tx: Option<oneshot::Sender<M::Result>>,
130 ) -> impl Future<Output = ()> + Send {
131 if let Some(tx) = tx {
132 let _ = tx.send(self);
133 }
134 future::ready(())
135 }
136}
137
138impl<A, M, T> MessageResponse<A, M> for Box<T>
139where
140 A: Actor,
141 M: Message<Result = Self>,
142 T: Send,
143{
144 fn handle(
145 self,
146 _ctx: &mut A::Context,
147 tx: Option<oneshot::Sender<M::Result>>,
148 ) -> impl Future<Output = ()> + Send {
149 if let Some(tx) = tx {
150 let _ = tx.send(self);
151 }
152 future::ready(())
153 }
154}
155
156impl<A, M, T> MessageResponse<A, M> for Arc<T>
157where
158 A: Actor,
159 M: Message<Result = Self>,
160 T: Send + Sync,
161{
162 fn handle(
163 self,
164 _ctx: &mut A::Context,
165 tx: Option<oneshot::Sender<M::Result>>,
166 ) -> impl Future<Output = ()> + Send {
167 if let Some(tx) = tx {
168 let _ = tx.send(self);
169 }
170 future::ready(())
171 }
172}
173
174impl<A, M, T> MessageResponse<A, M> for Vec<T>
175where
176 A: Actor,
177 M: Message<Result = Self>,
178 T: Send,
179{
180 fn handle(
181 self,
182 _ctx: &mut A::Context,
183 tx: Option<oneshot::Sender<M::Result>>,
184 ) -> impl Future<Output = ()> + Send {
185 if let Some(tx) = tx {
186 let _ = tx.send(self);
187 }
188 future::ready(())
189 }
190}
191
192macro_rules! impl_message_response_for {
193 ($type:ty) => {
194 impl<A, M> MessageResponse<A, M> for $type
195 where
196 A: Actor,
197 M: Message<Result = Self>,
198 {
199 fn handle(
200 self,
201 _ctx: &mut A::Context,
202 tx: Option<oneshot::Sender<M::Result>>,
203 ) -> impl Future<Output = ()> + Send {
204 if let Some(tx) = tx {
205 let _ = tx.send(self);
206 }
207 future::ready(())
208 }
209 }
210 };
211}
212
213impl_message_response_for!(());
214impl_message_response_for!(bool);
215impl_message_response_for!(i8);
216impl_message_response_for!(i16);
217impl_message_response_for!(i32);
218impl_message_response_for!(i64);
219impl_message_response_for!(isize);
220impl_message_response_for!(u8);
221impl_message_response_for!(u16);
222impl_message_response_for!(u32);
223impl_message_response_for!(u64);
224impl_message_response_for!(usize);
225impl_message_response_for!(f32);
226impl_message_response_for!(f64);
227impl_message_response_for!(char);
228impl_message_response_for!(String);
229
230#[cfg(test)]
231mod tests {
232 use std::marker::PhantomData;
233
234 use anyhow::Result;
235 use pretty_assertions::assert_eq;
236
237 use super::*;
238 use crate::context::Context;
239
240 #[derive(Debug)]
241 struct A;
242
243 impl Actor for A {
244 type Context = Context<Self>;
245 type Error = anyhow::Error;
246 }
247
248 struct M<T>(PhantomData<T>);
251
252 impl<T> Message for M<T>
253 where
254 T: Send + 'static,
255 {
256 type Result = T;
257 }
258
259 async fn roundtrip<R>(value: R) -> Result<R>
260 where
261 R: MessageResponse<A, M<R>> + Send + 'static,
262 {
263 let mut ctx = Context::<A>::with_capacity("test".into(), 1);
264 let (tx, rx) = oneshot::channel::<R>();
265 value.handle(&mut ctx, Some(tx)).await;
266 Ok(rx.await?)
267 }
268
269 #[tokio::test]
270 async fn test_message_response() -> Result<()> {
271 assert_eq!(roundtrip(()).await?, ());
272 assert_eq!(roundtrip(true).await?, true);
273 assert_eq!(roundtrip(-1_i32).await?, -1);
274 assert_eq!(roundtrip(42_u64).await?, 42);
275 assert_eq!(roundtrip('x').await?, 'x');
276 assert_eq!(roundtrip(String::from("hello")).await?, "hello");
277
278 assert_eq!(
280 roundtrip::<std::result::Result<i32, String>>(Ok(10)).await?,
281 Ok(10)
282 );
283 assert_eq!(
284 roundtrip::<std::result::Result<i32, String>>(Err(String::from("err"))).await?,
285 Err(String::from("err"))
286 );
287
288 assert_eq!(roundtrip(Some(42_u32)).await?, Some(42));
290 assert_eq!(roundtrip(None::<u32>).await?, None);
291
292 assert_eq!(*roundtrip(Box::new(7_u64)).await?, 7);
294 assert_eq!(*roundtrip(Arc::new(String::from("hi"))).await?, "hi");
295
296 assert_eq!(roundtrip(vec![1_u8, 2, 3]).await?, vec![1, 2, 3]);
298
299 Ok(())
300 }
301
302 #[test]
303 fn test_debug_fmt() {
304 let result = MessageResult::<M<i32>>(42);
305 assert_eq!(format!("{:?}", result), "MessageResult<M<i32>>");
306
307 let future_result = FutureMessageResult::<M<i32>>::new(async { 42 });
308 assert_eq!(
309 format!("{:?}", future_result),
310 "FutureMessageResult<M<i32>>"
311 );
312 }
313}