near_async/
messaging.rs

1use crate::break_apart::BreakApart;
2use crate::functional::{SendAsyncFunction, SendFunction};
3use crate::futures::DelayedActionRunner;
4use futures::future::BoxFuture;
5use futures::FutureExt;
6use once_cell::sync::OnceCell;
7use std::fmt::{Debug, Display};
8use std::sync::Arc;
9use tokio::sync::oneshot;
10
11/// Trait for an actor. An actor is a struct that can handle messages and implements the Handler or
12/// HandlerWithContext trait. We can optionally implement the start_actor trait which is executed in
13/// the beginning of the actor's lifecycle.
14/// This corresponds to the actix::Actor trait `started` function.
15pub trait Actor {
16    fn start_actor(&mut self, _ctx: &mut dyn DelayedActionRunner<Self>) {}
17
18    fn wrap_handler<M: actix::Message>(
19        &mut self,
20        msg: M,
21        ctx: &mut dyn DelayedActionRunner<Self>,
22        f: impl FnOnce(&mut Self, M, &mut dyn DelayedActionRunner<Self>) -> M::Result,
23    ) -> M::Result {
24        f(self, msg, ctx)
25    }
26}
27
28/// Trait for handling a message.
29/// This works in unison with the [`CanSend`] trait. An actor implements the Handler trait for all
30/// messages it would like to handle, while the CanSend trait implements the logic to send the
31/// message to the actor. Handle and CanSend are typically not both implemented by the same struct.
32/// Note that the actor is any struct that implements the Handler trait, not just actix actors.
33pub trait Handler<M: actix::Message>
34where
35    M::Result: Send,
36{
37    fn handle(&mut self, msg: M) -> M::Result;
38}
39
40/// Trait for handling a message with context.
41/// This is similar to the [`Handler`] trait, but it allows the handler to access the delayed action
42/// runner that is used to schedule actions to be run in the future. For actix::Actor, the context
43/// defined as actix::Context<Self> implements DelayedActionRunner<T>.
44/// Note that the implementer for handler of a message only needs to implement either of Handler or
45/// HandlerWithContext, not both.
46pub trait HandlerWithContext<M: actix::Message>
47where
48    M::Result: Send,
49{
50    fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result;
51}
52
53impl<A, M> HandlerWithContext<M> for A
54where
55    M: actix::Message,
56    A: Actor + Handler<M>,
57    M::Result: Send,
58{
59    fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result {
60        self.wrap_handler(msg, ctx, |this, msg, _| Handler::handle(this, msg))
61    }
62}
63
/// Something that can send a typed message `M`.
///
/// The message that was sent is subsequently handled through the [`Handler`] trait; see that
/// trait for more details. `actix::Addr`, which is derived from an `actix::Actor`, is an
/// example of a struct that implements `CanSend`.
pub trait CanSend<M>: Send + Sync + 'static {
    /// Sends `message`; what happens to it is entirely up to the implementation.
    fn send(&self, message: M);
}
70
71/// Wraps a CanSend. This should be used to pass around an Arc<dyn CanSend<M>>, instead
72/// of spelling out that type. Using a wrapper struct allows us to define more flexible
73/// APIs.
74pub struct Sender<M: 'static> {
75    sender: Arc<dyn CanSend<M>>,
76}
77
78impl<M> Clone for Sender<M> {
79    fn clone(&self) -> Self {
80        Self { sender: self.sender.clone() }
81    }
82}
83
84/// Extension functions to wrap a CanSend as a Sender.
85pub trait IntoSender<M> {
86    /// This allows conversion of an owned CanSend into a Sender.
87    fn into_sender(self) -> Sender<M>;
88    /// This allows conversion of a reference-counted CanSend into a Sender.
89    fn as_sender(self: &Arc<Self>) -> Sender<M>;
90}
91
92impl<M, T: CanSend<M>> IntoSender<M> for T {
93    fn into_sender(self) -> Sender<M> {
94        Sender::from_impl(self)
95    }
96    fn as_sender(self: &Arc<Self>) -> Sender<M> {
97        Sender::from_arc(self.clone())
98    }
99}
100
101impl<M> Sender<M> {
102    /// Sends a message. It's the responsibility of the underlying CanSend
103    /// implementation to decide how to handle the message.
104    pub fn send(&self, message: M) {
105        self.sender.send(message)
106    }
107
108    fn from_impl(sender: impl CanSend<M> + 'static) -> Self {
109        Self { sender: Arc::new(sender) }
110    }
111
112    fn from_arc<T: CanSend<M> + 'static>(arc: Arc<T>) -> Self {
113        Self { sender: arc }
114    }
115
116    /// Creates a sender that handles messages using the given function.
117    pub fn from_fn(send: impl Fn(M) + Send + Sync + 'static) -> Self {
118        Self::from_impl(SendFunction::new(send))
119    }
120
121    /// Creates an object that implements `CanSend<Inner>` for any message `Inner`
122    /// that can be converted to `M`.
123    pub fn break_apart(self) -> BreakApart<M> {
124        BreakApart { sender: self }
125    }
126}
127
128/// Extension trait (not for anyone to implement), that allows a
129/// Sender<MessageWithCallback<M, R>> to be used to send a message and then
130/// getting a future of the response.
131///
132/// See `AsyncSendError` for reasons that the future may resolve to an error result.
133pub trait SendAsync<M, R: Send + 'static> {
134    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>>;
135}
136
137impl<M, R: Send + 'static, A: CanSend<MessageWithCallback<M, R>> + ?Sized> SendAsync<M, R> for A {
138    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
139        // To send a message and get a future of the response, we use a oneshot
140        // channel that is resolved when the responder function is called. It's
141        // possible that someone implementing the Sender would just drop the
142        // message without calling the responder, in which case we return a
143        // Dropped error.
144        let (sender, receiver) =
145            oneshot::channel::<BoxFuture<'static, Result<R, AsyncSendError>>>();
146        let future = async move {
147            match receiver.await {
148                Ok(result_future) => result_future.await,
149                Err(_) => Err(AsyncSendError::Dropped),
150            }
151        };
152        let responder = Box::new(move |r| {
153            // It's ok for the send to return an error, because that means the receiver is dropped
154            // therefore the sender does not care about the result.
155            sender.send(r).ok();
156        });
157        self.send(MessageWithCallback { message, callback: responder });
158        future.boxed()
159    }
160}
161
162impl<M, R: Send + 'static> Sender<MessageWithCallback<M, R>> {
163    /// Same as the above, but for a concrete Sender type.
164    pub fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
165        self.sender.send_async(message)
166    }
167
168    /// Creates a sender that would handle send_async messages by producing a
169    /// result synchronously. Note that the provided function is NOT async.
170    ///
171    /// (Implementing the similar functionality with async function is possible
172    /// but requires deciding who drives the async function; a FutureSpawner
173    /// can be a good idea.)
174    pub fn from_async_fn(send_async: impl Fn(M) -> R + Send + Sync + 'static) -> Self {
175        Self::from_impl(SendAsyncFunction::new(send_async))
176    }
177}
178
/// Generic failure of an async send.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AsyncSendError {
    /// The receiver side could not accept the message.
    Closed,
    /// The receiver side could not process the message in time.
    Timeout,
    /// The message was ignored entirely.
    Dropped,
}
189
190impl Display for AsyncSendError {
191    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192        Debug::fmt(self, f)
193    }
194}
195
196/// Used to implement an async sender. An async sender is just a Sender whose
197/// message is `MessageWithCallback<M, R>`, which is a message plus a
198/// callback to send the response future back.
199pub struct MessageWithCallback<T, R> {
200    pub message: T,
201    pub callback: Box<dyn FnOnce(BoxFuture<'static, Result<R, AsyncSendError>>) + Send>,
202}
203
204impl<T: Debug, R> Debug for MessageWithCallback<T, R> {
205    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206        f.debug_tuple("MessageWithCallback").field(&self.message).finish()
207    }
208}
209
210pub type AsyncSender<M, R> = Sender<MessageWithCallback<M, R>>;
211
212/// Sometimes we want to be able to pass in a sender that has not yet been fully constructed.
213/// LateBoundSender can act as a placeholder to pass CanSend and CanSendAsync capabilities
214/// through to the inner object. bind() should be called when the inner object is ready.
215/// All calls to send() and send_async() through this wrapper will block until bind() is called.
216///
217/// Example:
218///   let late_bound = LateBoundSender::new();
219///   let something_else = SomethingElse::new(late_bound.as_sender());
220///   let implementation = Implementation::new(something_else);
221///   late_bound.bind(implementation);
222pub struct LateBoundSender<S> {
223    sender: OnceCell<S>,
224}
225
226impl<S> LateBoundSender<S> {
227    pub fn new() -> Arc<Self> {
228        Arc::new(Self { sender: OnceCell::new() })
229    }
230
231    pub fn bind(&self, sender: S) {
232        self.sender.set(sender).map_err(|_| ()).expect("cannot set sender twice");
233    }
234}
235
236/// Allows LateBoundSender to be convertible to a Sender as long as the inner object could be.
237impl<M, S: CanSend<M>> CanSend<M> for LateBoundSender<S> {
238    fn send(&self, message: M) {
239        self.sender.wait().send(message);
240    }
241}
242
/// A sender that does nothing with the messages it is given.
///
/// It implements [`CanSend`] for every message type; obtain one with [`noop`].
#[derive(Debug, Clone, Copy, Default)]
pub struct Noop;
244
245impl<M> CanSend<M> for Noop {
246    fn send(&self, _message: M) {}
247}
248
249/// Creates a no-op sender that does nothing with the message.
250///
251/// Returns a type that can be converted to any type of sender,
252/// sync or async, including multi-senders.
253pub fn noop() -> Noop {
254    Noop
255}
256
/// Converts something that implements the individual senders into the multi-sender `A`.
pub trait IntoMultiSender<A> {
    /// Builds the multi-sender from a reference-counted value.
    fn as_multi_sender(self: &Arc<Self>) -> A;
    /// Builds the multi-sender from an owned value, consuming it.
    fn into_multi_sender(self) -> A;
}
263
/// Implemented by a multi-sender that can be constructed from a shared `A`.
///
/// The blanket [`IntoMultiSender`] implementation is built on top of this trait.
pub trait MultiSenderFrom<A> {
    /// Builds the multi-sender from the reference-counted source `a`.
    fn multi_sender_from(a: Arc<A>) -> Self;
}
267
268impl<A, B: MultiSenderFrom<A>> IntoMultiSender<B> for A {
269    fn as_multi_sender(self: &Arc<Self>) -> B {
270        B::multi_sender_from(self.clone())
271    }
272    fn into_multi_sender(self) -> B {
273        B::multi_sender_from(Arc::new(self))
274    }
275}
276
#[cfg(test)]
mod tests {
    use crate::messaging::{AsyncSendError, MessageWithCallback, Sender};
    use futures::FutureExt;
    use tokio_util::sync::CancellationToken;

    /// If the handler drops the message (and with it the callback) without responding, the
    /// response future must resolve to `AsyncSendError::Dropped`.
    #[tokio::test]
    async fn test_async_send_sender_dropped() {
        let sender = Sender::<MessageWithCallback<u32, u32>>::from_fn(|_| {});
        let outcome = sender.send_async(4).await;
        assert_eq!(outcome, Err(AsyncSendError::Dropped));
    }

    /// If the response future is dropped before the handler responds, invoking the callback
    /// afterwards must not panic.
    #[tokio::test]
    async fn test_async_send_receiver_dropped() {
        // Cancelled by the test to let the handler respond.
        let release_result = CancellationToken::new();
        // Cancelled by the handler once the callback has returned.
        let callback_finished = CancellationToken::new();

        // Abort the whole process on any panic, after running the previous hook.
        // NOTE(review): presumably needed because a panic inside the spawned task would not
        // fail this test by itself - confirm. The hook is process-wide and is not restored.
        let previous_hook = std::panic::take_hook();
        std::panic::set_hook(Box::new(move |panic_info| {
            previous_hook(panic_info);
            std::process::abort();
        }));

        let release_for_handler = release_result.clone();
        let finished_for_handler = callback_finished.clone();
        let sender = Sender::<MessageWithCallback<u32, u32>>::from_fn(
            move |MessageWithCallback { message, callback }| {
                let release = release_for_handler.clone();
                let finished = finished_for_handler.clone();
                tokio::spawn(async move {
                    // Respond only after the test has dropped the response future.
                    release.cancelled().await;
                    callback(async move { Ok(message) }.boxed());
                    finished.cancel();
                });
            },
        );

        // Drop the response future right away, so that nobody is listening for the result.
        let response = sender.send_async(4);
        drop(response);
        release_result.cancel();
        callback_finished.cancelled().await;
    }
}