near_async/
messaging.rs

1use crate::break_apart::BreakApart;
2use crate::functional::{SendAsyncFunction, SendFunction};
3use crate::futures::DelayedActionRunner;
4use futures::FutureExt;
5use futures::future::BoxFuture;
6use std::fmt::{Debug, Display};
7use std::sync::{Arc, OnceLock};
8use tokio::sync::oneshot;
9
/// Trait for an actor. An actor is a struct that can handle messages and implements the
/// [`Handler`] or [`HandlerWithContext`] trait. Implementors may optionally override the
/// `start_actor` method, which is meant to be executed at the beginning of the actor's lifecycle.
/// This corresponds to the actix::Actor trait `started` function.
pub trait Actor {
    /// Lifecycle hook; the default implementation does nothing.
    fn start_actor(&mut self, _ctx: &mut dyn DelayedActionRunner<Self>) {}

    /// Hook wrapped around message handling. The blanket [`HandlerWithContext`] implementation
    /// for `Actor + Handler<M>` routes every message through this method.
    ///
    /// The default implementation just calls `f` with the actor, the message and the context,
    /// and returns whatever `f` returns.
    fn wrap_handler<M: actix::Message>(
        &mut self,
        msg: M,
        ctx: &mut dyn DelayedActionRunner<Self>,
        f: impl FnOnce(&mut Self, M, &mut dyn DelayedActionRunner<Self>) -> M::Result,
    ) -> M::Result {
        f(self, msg, ctx)
    }
}
26
/// Trait for handling a message.
/// This works in unison with the [`CanSend`] trait. An actor implements the Handler trait for all
/// messages it would like to handle, while the CanSend trait implements the logic to send the
/// message to the actor. Handler and CanSend are typically not both implemented by the same
/// struct.
/// Note that the actor is any struct that implements the Handler trait, not just actix actors.
pub trait Handler<M: actix::Message>
where
    M::Result: Send,
{
    /// Processes `msg` and returns the result type declared by the message.
    fn handle(&mut self, msg: M) -> M::Result;
}
38
/// Trait for handling a message with context.
/// This is similar to the [`Handler`] trait, but it allows the handler to access the delayed action
/// runner that is used to schedule actions to be run in the future. For actix::Actor, the context,
/// defined as actix::Context<Self>, implements DelayedActionRunner<T>.
/// Note that a message handler only needs to implement one of Handler or HandlerWithContext,
/// not both.
pub trait HandlerWithContext<M: actix::Message>
where
    M::Result: Send,
{
    /// Processes `msg`, with `ctx` available for scheduling delayed actions on the actor.
    fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result;
}
51
52impl<A, M> HandlerWithContext<M> for A
53where
54    M: actix::Message,
55    A: Actor + Handler<M>,
56    M::Result: Send,
57{
58    fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result {
59        self.wrap_handler(msg, ctx, |this, msg, _| Handler::handle(this, msg))
60    }
61}
62
/// Trait for sending a typed message. The sent message is then handled by the Handler trait.
/// actix::Addr, which is derived from actix::Actor, is an example of a struct that implements
/// CanSend.
/// See [`Handler`] trait for more details.
pub trait CanSend<M>: Send + Sync + 'static {
    /// Sends `message`; what happens to it is up to the implementation.
    fn send(&self, message: M);
}
69
/// Wraps a CanSend. This should be used to pass around an Arc<dyn CanSend<M>>, instead
/// of spelling out that type. Using a wrapper struct allows us to define more flexible
/// APIs.
pub struct Sender<M: 'static> {
    // Shared, type-erased sender that every call is forwarded to.
    sender: Arc<dyn CanSend<M>>,
}
76
77impl<M> Clone for Sender<M> {
78    fn clone(&self) -> Self {
79        Self { sender: self.sender.clone() }
80    }
81}
82
/// Extension functions to wrap a CanSend as a Sender.
pub trait IntoSender<M> {
    /// This allows conversion of an owned CanSend into a Sender.
    fn into_sender(self) -> Sender<M>;
    /// This allows conversion of a reference-counted CanSend into a Sender.
    /// The `Arc` is cloned, so the caller keeps its own handle.
    fn as_sender(self: &Arc<Self>) -> Sender<M>;
}
90
91impl<M, T: CanSend<M>> IntoSender<M> for T {
92    fn into_sender(self) -> Sender<M> {
93        Sender::from_impl(self)
94    }
95    fn as_sender(self: &Arc<Self>) -> Sender<M> {
96        Sender::from_arc(self.clone())
97    }
98}
99
100impl<M> Sender<M> {
101    /// Sends a message. It's the responsibility of the underlying CanSend
102    /// implementation to decide how to handle the message.
103    pub fn send(&self, message: M) {
104        self.sender.send(message)
105    }
106
107    fn from_impl(sender: impl CanSend<M> + 'static) -> Self {
108        Self { sender: Arc::new(sender) }
109    }
110
111    fn from_arc<T: CanSend<M> + 'static>(arc: Arc<T>) -> Self {
112        Self { sender: arc }
113    }
114
115    /// Creates a sender that handles messages using the given function.
116    pub fn from_fn(send: impl Fn(M) + Send + Sync + 'static) -> Self {
117        Self::from_impl(SendFunction::new(send))
118    }
119
120    /// Creates an object that implements `CanSend<Inner>` for any message `Inner`
121    /// that can be converted to `M`.
122    pub fn break_apart(self) -> BreakApart<M> {
123        BreakApart { sender: self }
124    }
125}
126
/// Extension trait (not for anyone to implement) that allows a
/// Sender<MessageWithCallback<M, R>> to be used to send a message and then
/// get a future of the response.
///
/// See `AsyncSendError` for reasons that the future may resolve to an error result.
pub trait SendAsync<M, R: Send + 'static> {
    /// Sends `message` and returns a future that resolves to the response, or to an
    /// `AsyncSendError` if no response could be obtained.
    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>>;
}
135
136impl<M, R: Send + 'static, A: CanSend<MessageWithCallback<M, R>> + ?Sized> SendAsync<M, R> for A {
137    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
138        // To send a message and get a future of the response, we use a oneshot
139        // channel that is resolved when the responder function is called. It's
140        // possible that someone implementing the Sender would just drop the
141        // message without calling the responder, in which case we return a
142        // Dropped error.
143        let (sender, receiver) =
144            oneshot::channel::<BoxFuture<'static, Result<R, AsyncSendError>>>();
145        let future = async move {
146            match receiver.await {
147                Ok(result_future) => result_future.await,
148                Err(_) => Err(AsyncSendError::Dropped),
149            }
150        };
151        let responder = Box::new(move |r| {
152            // It's ok for the send to return an error, because that means the receiver is dropped
153            // therefore the sender does not care about the result.
154            sender.send(r).ok();
155        });
156        self.send(MessageWithCallback { message, callback: responder });
157        future.boxed()
158    }
159}
160
161impl<M, R: Send + 'static> Sender<MessageWithCallback<M, R>> {
162    /// Same as the above, but for a concrete Sender type.
163    pub fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
164        self.sender.send_async(message)
165    }
166
167    /// Creates a sender that would handle send_async messages by producing a
168    /// result synchronously. Note that the provided function is NOT async.
169    ///
170    /// (Implementing the similar functionality with async function is possible
171    /// but requires deciding who drives the async function; a FutureSpawner
172    /// can be a good idea.)
173    pub fn from_async_fn(send_async: impl Fn(M) -> R + Send + Sync + 'static) -> Self {
174        Self::from_impl(SendAsyncFunction::new(send_async))
175    }
176}
177
/// Generic failure for async send.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AsyncSendError {
    /// The receiver side could not accept the message.
    Closed,
    /// The receiver side could not process the message in time.
    Timeout,
    /// The message was ignored entirely.
    Dropped,
}
188
189impl Display for AsyncSendError {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        Debug::fmt(self, f)
192    }
193}
194
/// Used to implement an async sender. An async sender is just a Sender whose
/// message is `MessageWithCallback<M, R>`, which is a message plus a
/// callback to send the response future back.
pub struct MessageWithCallback<T, R> {
    /// The actual message to be handled.
    pub message: T,
    /// One-shot callback through which the handler hands back a future of the response.
    pub callback: Box<dyn FnOnce(BoxFuture<'static, Result<R, AsyncSendError>>) + Send>,
}
202
203impl<T: Debug, R> Debug for MessageWithCallback<T, R> {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        f.debug_tuple("MessageWithCallback").field(&self.message).finish()
206    }
207}
208
/// Shorthand for a [`Sender`] of [`MessageWithCallback<M, R>`], i.e. a sender for message `M`
/// whose response `R` can be awaited.
pub type AsyncSender<M, R> = Sender<MessageWithCallback<M, R>>;
210
/// Sometimes we want to be able to pass in a sender that has not yet been fully constructed.
/// LateBoundSender can act as a placeholder to pass CanSend (and, through it, SendAsync)
/// capabilities through to the inner object. bind() should be called when the inner object is
/// ready. All calls to send() and send_async() through this wrapper will block until bind() is
/// called.
///
/// Example:
/// ```ignore
/// let late_bound = LateBoundSender::new();
/// let something_else = SomethingElse::new(late_bound.as_sender());
/// let implementation = Implementation::new(something_else);
/// late_bound.bind(implementation);
/// ```
pub struct LateBoundSender<S> {
    // Empty until bind() is called; set at most once.
    sender: OnceLock<S>,
}
224
225impl<S> LateBoundSender<S> {
226    pub fn new() -> Arc<Self> {
227        Arc::new(Self { sender: OnceLock::new() })
228    }
229
230    pub fn bind(&self, sender: S) {
231        self.sender.set(sender).map_err(|_| ()).expect("cannot set sender twice");
232    }
233}
234
235/// Allows LateBoundSender to be convertible to a Sender as long as the inner object could be.
236impl<M, S: CanSend<M>> CanSend<M> for LateBoundSender<S> {
237    fn send(&self, message: M) {
238        self.sender.wait().send(message);
239    }
240}
241
/// A sender that accepts any message and does nothing with it. See [`noop`].
pub struct Noop;
243
244impl<M> CanSend<M> for Noop {
245    fn send(&self, _message: M) {}
246}
247
/// Creates a no-op sender that does nothing with the message.
///
/// Returns a type that can be converted to any type of sender,
/// sync or async, including multi-senders.
pub fn noop() -> Noop {
    Noop
}
255
/// A trait for converting something that implements individual senders into
/// a multi-sender.
pub trait IntoMultiSender<A> {
    /// Builds the multi-sender from a shared reference; the `Arc` is cloned.
    fn as_multi_sender(self: &Arc<Self>) -> A;
    /// Builds the multi-sender by consuming the value.
    fn into_multi_sender(self) -> A;
}
262
/// Implemented by multi-sender types that can be constructed from a shared `A`.
/// Implementing this for a type provides [`IntoMultiSender`] for `A` via a blanket impl.
pub trait MultiSenderFrom<A> {
    /// Constructs the multi-sender from the given reference-counted value.
    fn multi_sender_from(a: Arc<A>) -> Self;
}
266
267impl<A, B: MultiSenderFrom<A>> IntoMultiSender<B> for A {
268    fn as_multi_sender(self: &Arc<Self>) -> B {
269        B::multi_sender_from(self.clone())
270    }
271    fn into_multi_sender(self) -> B {
272        B::multi_sender_from(Arc::new(self))
273    }
274}
275
#[cfg(test)]
mod tests {
    use crate::messaging::{AsyncSendError, MessageWithCallback, Sender};
    use futures::FutureExt;
    use tokio_util::sync::CancellationToken;

    #[tokio::test]
    async fn test_async_send_sender_dropped() {
        // The handler drops the callback without calling it, so no response can ever be
        // produced; the future must resolve to `Dropped` rather than hang.
        let sender: Sender<MessageWithCallback<u32, u32>> = Sender::from_fn(|_| {});
        let result = sender.send_async(4).await;
        assert_eq!(result, Err(AsyncSendError::Dropped));
    }

    #[tokio::test]
    async fn test_async_send_receiver_dropped() {
        // Test that if the receiver is dropped, the sending side will not panic.
        //
        // A panic inside a spawned tokio task is captured by the task and does not fail the
        // test on its own. Instead of installing a process-wide panic hook that aborts (which
        // would never be restored and would affect every other test in the same binary), the
        // task's JoinHandle is passed back to the test and awaited: it resolves to an error
        // if the task panicked.
        let result_blocker = CancellationToken::new();
        let (task_tx, mut task_rx) = tokio::sync::mpsc::unbounded_channel();

        let sender = {
            let result_blocker = result_blocker.clone();
            Sender::<MessageWithCallback<u32, u32>>::from_fn(move |msg| {
                let MessageWithCallback { message, callback } = msg;
                let result_blocker = result_blocker.clone();
                let task = tokio::spawn(async move {
                    // Hold the response back until the test has dropped the result future.
                    result_blocker.cancelled().await;
                    callback(async move { Ok(message) }.boxed());
                });
                if task_tx.send(task).is_err() {
                    panic!("test dropped the task receiver before the message was handled");
                }
            })
        };

        drop(sender.send_async(4)); // drops the resulting future
        result_blocker.cancel();

        let task = task_rx.recv().await.expect("handler should have spawned a responder task");
        task.await.expect("responding after the receiver was dropped must not panic");
    }
}
321}