// near_async/messaging.rs

1use crate::functional::{SendAsyncFunction, SendFunction};
2use crate::futures::DelayedActionRunner;
3use futures::FutureExt;
4use futures::future::BoxFuture;
5use std::fmt::{Debug, Display};
6use std::sync::{Arc, OnceLock};
7
8/// Trait for an actor. An actor is a struct that can handle messages and implements the Handler or
9/// HandlerWithContext trait.
10///
11/// IMPORTANT NOTE: For the Multithread actor runtime, NONE of the below methods are ever called.
12/// These are only applicable for the async tokio runtime.
13pub trait Actor {
14    /// This is automatically called by the actor runtime when the actor is started.
15    /// It is called before any messages are processed.
16    fn start_actor(&mut self, _ctx: &mut dyn DelayedActionRunner<Self>) {}
17
18    /// This is called for every message that the actor handles, so that the actor can do something
19    /// else before and after handling the message.
20    fn wrap_handler<M, R>(
21        &mut self,
22        msg: M,
23        ctx: &mut dyn DelayedActionRunner<Self>,
24        f: impl FnOnce(&mut Self, M, &mut dyn DelayedActionRunner<Self>) -> R,
25    ) -> R {
26        f(self, msg, ctx)
27    }
28
29    /// Called by the actor runtime right before the actor is dropped.
30    fn stop_actor(&mut self) {}
31}
32
/// Handles a message of type `M`, producing a response of type `R` (unit by default).
///
/// This is the receiving half of the messaging pair: an actor implements `Handler` once per
/// message type it wants to process, while [`CanSend`] holds the logic that delivers the
/// message to the actor. Handler and CanSend are typically not both implemented by the same
/// struct.
pub trait Handler<M: Send + 'static, R: Send = ()> {
    /// Processes `msg` with mutable access to the handler and returns the response.
    fn handle(&mut self, msg: M) -> R;
}
44
45/// Trait for handling a message with context.
46/// This is similar to the [`Handler`] trait, but it allows the handler to access the delayed action
47/// runner that is used to schedule actions to be run in the future. For tokio actors, the
48/// TokioRuntimeHandle<A> implements DelayedActionRunner<T>.
49/// Note that the implementer for handler of a message only needs to implement either of Handler or
50/// HandlerWithContext, not both.
51pub trait HandlerWithContext<M, R = ()>
52where
53    M: Send + 'static,
54    R: Send,
55{
56    fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> R;
57}
58
59impl<A, M, R> HandlerWithContext<M, R> for A
60where
61    A: Actor + Handler<M, R>,
62    M: Send + 'static,
63    R: Send,
64{
65    fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> R {
66        self.wrap_handler(msg, ctx, |this, msg, _| Handler::handle(this, msg))
67    }
68}
69
/// Sends a typed message of type `M`, fire-and-forget: no response is returned.
///
/// The message that is sent is then handled through the Handler trait; see [`Handler<M>`]
/// for the receiving side.
pub trait CanSend<M>: Send + Sync + 'static {
    /// Hands `message` over to the receiver.
    fn send(&self, message: M);
}
75
76/// Trait for sending a typed message async. The sent message is then handled by the Handler trait.
77/// See [`Handler<M, R>`] trait for more details.
78pub trait CanSendAsync<M, R>: Send + Sync + 'static {
79    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>>;
80}
81
82/// Wraps a CanSend. This should be used to pass around an Arc<dyn CanSend<M>>, instead
83/// of spelling out that type. Using a wrapper struct allows us to define more flexible
84/// APIs.
85pub struct Sender<M: 'static> {
86    sender: Arc<dyn CanSend<M>>,
87}
88
89impl<M> Clone for Sender<M> {
90    fn clone(&self) -> Self {
91        Self { sender: self.sender.clone() }
92    }
93}
94
95/// Extension functions to wrap a CanSend as a Sender.
96pub trait IntoSender<M> {
97    /// This allows conversion of an owned CanSend into a Sender.
98    fn into_sender(self) -> Sender<M>;
99    /// This allows conversion of a reference-counted CanSend into a Sender.
100    fn as_sender(self: &Arc<Self>) -> Sender<M>;
101}
102
103impl<M, T: CanSend<M>> IntoSender<M> for T {
104    fn into_sender(self) -> Sender<M> {
105        Sender::from_impl(self)
106    }
107    fn as_sender(self: &Arc<Self>) -> Sender<M> {
108        Sender::from_arc(self.clone())
109    }
110}
111
112impl<M> Sender<M> {
113    /// Sends a message. It's the responsibility of the underlying CanSend
114    /// implementation to decide how to handle the message.
115    pub fn send(&self, message: M) {
116        self.sender.send(message)
117    }
118
119    fn from_impl(sender: impl CanSend<M> + 'static) -> Self {
120        Self { sender: Arc::new(sender) }
121    }
122
123    fn from_arc<T: CanSend<M> + 'static>(arc: Arc<T>) -> Self {
124        Self { sender: arc }
125    }
126
127    /// Creates a sender that handles messages using the given function.
128    pub fn from_fn(send: impl Fn(M) + Send + Sync + 'static) -> Self {
129        Self::from_impl(SendFunction::new(send))
130    }
131}
132
133/// Wraps a `CanSendAsync`. Similar to [`Sender<M>`] but for asynchronous messaging.
134pub struct AsyncSender<M: 'static, R: Send + 'static> {
135    sender: Arc<dyn CanSendAsync<M, R>>,
136}
137
138impl<M, R: Send + 'static> Clone for AsyncSender<M, R> {
139    fn clone(&self) -> Self {
140        Self { sender: self.sender.clone() }
141    }
142}
143
144/// Extension functions to wrap a `CanSendAsync` as an [`AsyncSender`].
145pub trait IntoAsyncSender<M, R: Send> {
146    /// Convert an owned implementer into an [`AsyncSender`].
147    fn into_async_sender(self) -> AsyncSender<M, R>;
148    /// Convert a reference-counted implementer into an [`AsyncSender`].
149    fn as_async_sender(self: &Arc<Self>) -> AsyncSender<M, R>;
150}
151
152impl<M, R: Send + 'static, T: CanSendAsync<M, R>> IntoAsyncSender<M, R> for T {
153    fn into_async_sender(self) -> AsyncSender<M, R> {
154        AsyncSender::from_impl(self)
155    }
156
157    fn as_async_sender(self: &Arc<Self>) -> AsyncSender<M, R> {
158        AsyncSender::from_arc(self.clone())
159    }
160}
161
162impl<M, R: Send + 'static> AsyncSender<M, R> {
163    /// Sends a message asynchronously, forwarding to the underlying [`CanSendAsync`].
164    pub fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
165        self.sender.send_async(message)
166    }
167
168    fn from_impl(sender: impl CanSendAsync<M, R> + 'static) -> Self {
169        Self { sender: Arc::new(sender) }
170    }
171
172    fn from_arc<T: CanSendAsync<M, R> + 'static>(arc: Arc<T>) -> Self {
173        Self { sender: arc }
174    }
175
176    /// Creates an async sender backed by a synchronous function that returns the response.
177    pub fn from_fn(send: impl Fn(M) -> R + Send + Sync + 'static) -> Self {
178        Self::from_impl(SendAsyncFunction::new(send))
179    }
180}
181
/// Generic failure reported when an asynchronous send does not produce a response.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AsyncSendError {
    /// The receiver side could not accept the message.
    Closed,
    /// The receiver side could not process the message in time.
    Timeout,
    /// The message was ignored entirely.
    Dropped,
}

impl std::error::Error for AsyncSendError {}

/// The displayed text is the bare variant name, identical to the `Debug` output.
impl Display for AsyncSendError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant_name = match self {
            Self::Closed => "Closed",
            Self::Timeout => "Timeout",
            Self::Dropped => "Dropped",
        };
        f.write_str(variant_name)
    }
}
200
/// Placeholder for a sender that has not been fully constructed yet.
///
/// Sometimes a sender has to be handed out before the object that will really handle the
/// messages exists. A `LateBoundSender` passes the CanSend and CanSendAsync capabilities
/// through to an inner object that is supplied later via [`LateBoundSender::bind`].
/// All calls to send() and send_async() through this wrapper will block until bind() is
/// called.
///
/// Example:
///
/// ```ignore
/// let late_bound = LateBoundSender::new();
/// let something_else = SomethingElse::new(late_bound.as_sender());
/// let implementation = Implementation::new(something_else);
/// late_bound.bind(implementation);
/// ```
pub struct LateBoundSender<S> {
    /// The inner sender; empty until `bind` has been called.
    sender: OnceLock<S>,
}

impl<S> LateBoundSender<S> {
    /// Creates an unbound placeholder, already wrapped in an `Arc` so it can be shared
    /// before it is bound.
    pub fn new() -> Arc<Self> {
        let unbound = Self { sender: OnceLock::new() };
        Arc::new(unbound)
    }

    /// Supplies the inner sender that all calls are forwarded to.
    ///
    /// # Panics
    ///
    /// Panics if a sender has already been bound.
    pub fn bind(&self, sender: S) {
        // `S` is not required to be `Debug`, so discard the rejected value before `expect`.
        let outcome = self.sender.set(sender).map_err(drop);
        outcome.expect("cannot set sender twice");
    }
}
224
225/// Allows LateBoundSender to be convertible to a Sender as long as the inner object could be.
226impl<M, S: CanSend<M>> CanSend<M> for LateBoundSender<S> {
227    fn send(&self, message: M) {
228        self.sender.wait().send(message);
229    }
230}
231
232impl<M, R, S: CanSendAsync<M, R>> CanSendAsync<M, R> for LateBoundSender<S> {
233    fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
234        self.sender.wait().send_async(message)
235    }
236}
237
/// A sender that does nothing: plain sends discard the message, and async sends resolve to
/// [`AsyncSendError::Dropped`].
pub struct Noop;
239
240impl<M> CanSend<M> for Noop {
241    fn send(&self, _message: M) {}
242}
243
244impl<M> CanSend<M> for Arc<Noop> {
245    fn send(&self, _message: M) {}
246}
247
248impl<M, R: Send + 'static> CanSendAsync<M, R> for Noop {
249    fn send_async(&self, _message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
250        async { Err(AsyncSendError::Dropped) }.boxed()
251    }
252}
253
254impl<M, R: Send + 'static> CanSendAsync<M, R> for Arc<Noop> {
255    fn send_async(&self, _message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
256        async { Err(AsyncSendError::Dropped) }.boxed()
257    }
258}
259
260/// Creates a no-op sender that does nothing with the message.
261///
262/// Returns a type that can be converted to any type of sender,
263/// sync or async, including multi-senders.
264pub fn noop() -> Arc<Noop> {
265    Arc::new(Noop)
266}
267
/// Converts something that implements individual senders into a multi-sender of type `A`.
pub trait IntoMultiSender<A> {
    /// Consumes an owned value and builds the multi-sender from it.
    fn into_multi_sender(self) -> A;
    /// Builds the multi-sender from a reference-counted value, leaving the caller's `Arc`
    /// intact.
    fn as_multi_sender(self: &Arc<Self>) -> A;
}
274
/// Construction of a multi-sender from a shared value of type `A`.
///
/// This is the counterpart of [`IntoMultiSender`]: implementing it for a multi-sender type
/// makes the `IntoMultiSender` conversions available on `A`.
pub trait MultiSenderFrom<A> {
    /// Builds the multi-sender out of the reference-counted `a`.
    fn multi_sender_from(a: Arc<A>) -> Self;
}
278
279impl<A, B: MultiSenderFrom<A>> IntoMultiSender<B> for A {
280    fn as_multi_sender(self: &Arc<Self>) -> B {
281        B::multi_sender_from(self.clone())
282    }
283    fn into_multi_sender(self) -> B {
284        B::multi_sender_from(Arc::new(self))
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::{AsyncSendError, AsyncSender, CanSendAsync, IntoAsyncSender};
    use futures::FutureExt;
    use futures::future::BoxFuture;

    /// Test double whose every send resolves to `AsyncSendError::Dropped`.
    struct DroppingSender;

    impl CanSendAsync<u32, u32> for DroppingSender {
        fn send_async(&self, _message: u32) -> BoxFuture<'static, Result<u32, AsyncSendError>> {
            let outcome: Result<u32, AsyncSendError> = Err(AsyncSendError::Dropped);
            async move { outcome }.boxed()
        }
    }

    /// A function-backed async sender yields the function's return value as `Ok`.
    #[tokio::test]
    async fn test_async_sender_from_async_fn_success() {
        let increment = AsyncSender::from_fn(|value: u32| value + 1);
        assert_eq!(increment.send_async(4).await, Ok(5));
    }

    /// An error produced by the underlying sender is passed through unchanged.
    #[tokio::test]
    async fn test_async_sender_dropped_result() {
        let dropping = DroppingSender.into_async_sender();
        assert_eq!(dropping.send_async(7).await, Err(AsyncSendError::Dropped));
    }
}