1use crate::break_apart::BreakApart;
2use crate::functional::{SendAsyncFunction, SendFunction};
3use crate::futures::DelayedActionRunner;
4use futures::FutureExt;
5use futures::future::BoxFuture;
6use std::fmt::{Debug, Display};
7use std::sync::{Arc, OnceLock};
8use tokio::sync::oneshot;
9
10pub trait Actor {
15 fn start_actor(&mut self, _ctx: &mut dyn DelayedActionRunner<Self>) {}
16
17 fn wrap_handler<M: actix::Message>(
18 &mut self,
19 msg: M,
20 ctx: &mut dyn DelayedActionRunner<Self>,
21 f: impl FnOnce(&mut Self, M, &mut dyn DelayedActionRunner<Self>) -> M::Result,
22 ) -> M::Result {
23 f(self, msg, ctx)
24 }
25}
26
27pub trait Handler<M: actix::Message>
33where
34 M::Result: Send,
35{
36 fn handle(&mut self, msg: M) -> M::Result;
37}
38
39pub trait HandlerWithContext<M: actix::Message>
46where
47 M::Result: Send,
48{
49 fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result;
50}
51
52impl<A, M> HandlerWithContext<M> for A
53where
54 M: actix::Message,
55 A: Actor + Handler<M>,
56 M::Result: Send,
57{
58 fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result {
59 self.wrap_handler(msg, ctx, |this, msg, _| Handler::handle(this, msg))
60 }
61}
62
/// Something that can accept messages of type `M` through a shared reference.
///
/// Implementors must be `Send + Sync + 'static`; [`Sender`] stores them as
/// `Arc<dyn CanSend<M>>`.
pub trait CanSend<M>: 'static + Send + Sync {
    /// Hands `message` over to the implementor. Nothing is returned, so the caller gets no
    /// indication of what happened to the message.
    fn send(&self, message: M);
}
69
70pub struct Sender<M: 'static> {
74 sender: Arc<dyn CanSend<M>>,
75}
76
77impl<M> Clone for Sender<M> {
78 fn clone(&self) -> Self {
79 Self { sender: self.sender.clone() }
80 }
81}
82
83pub trait IntoSender<M> {
85 fn into_sender(self) -> Sender<M>;
87 fn as_sender(self: &Arc<Self>) -> Sender<M>;
89}
90
91impl<M, T: CanSend<M>> IntoSender<M> for T {
92 fn into_sender(self) -> Sender<M> {
93 Sender::from_impl(self)
94 }
95 fn as_sender(self: &Arc<Self>) -> Sender<M> {
96 Sender::from_arc(self.clone())
97 }
98}
99
100impl<M> Sender<M> {
101 pub fn send(&self, message: M) {
104 self.sender.send(message)
105 }
106
107 fn from_impl(sender: impl CanSend<M> + 'static) -> Self {
108 Self { sender: Arc::new(sender) }
109 }
110
111 fn from_arc<T: CanSend<M> + 'static>(arc: Arc<T>) -> Self {
112 Self { sender: arc }
113 }
114
115 pub fn from_fn(send: impl Fn(M) + Send + Sync + 'static) -> Self {
117 Self::from_impl(SendFunction::new(send))
118 }
119
120 pub fn break_apart(self) -> BreakApart<M> {
123 BreakApart { sender: self }
124 }
125}
126
127pub trait SendAsync<M, R: Send + 'static> {
133 fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>>;
134}
135
136impl<M, R: Send + 'static, A: CanSend<MessageWithCallback<M, R>> + ?Sized> SendAsync<M, R> for A {
137 fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
138 let (sender, receiver) =
144 oneshot::channel::<BoxFuture<'static, Result<R, AsyncSendError>>>();
145 let future = async move {
146 match receiver.await {
147 Ok(result_future) => result_future.await,
148 Err(_) => Err(AsyncSendError::Dropped),
149 }
150 };
151 let responder = Box::new(move |r| {
152 sender.send(r).ok();
155 });
156 self.send(MessageWithCallback { message, callback: responder });
157 future.boxed()
158 }
159}
160
161impl<M, R: Send + 'static> Sender<MessageWithCallback<M, R>> {
162 pub fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
164 self.sender.send_async(message)
165 }
166
167 pub fn from_async_fn(send_async: impl Fn(M) -> R + Send + Sync + 'static) -> Self {
174 Self::from_impl(SendAsyncFunction::new(send_async))
175 }
176}
177
/// Reasons a request/response send can fail to produce a response.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AsyncSendError {
    /// Not produced anywhere in this file.
    /// NOTE(review): presumably means the receiving side is closed; confirm with producers.
    Closed,
    /// Not produced anywhere in this file.
    /// NOTE(review): presumably means no response arrived in time; confirm with producers.
    Timeout,
    /// The response callback was dropped without ever being invoked.
    Dropped,
}

/// The user-facing text is the variant name, exactly as `Debug` prints it.
impl Display for AsyncSendError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}
194
195pub struct MessageWithCallback<T, R> {
199 pub message: T,
200 pub callback: Box<dyn FnOnce(BoxFuture<'static, Result<R, AsyncSendError>>) + Send>,
201}
202
203impl<T: Debug, R> Debug for MessageWithCallback<T, R> {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 f.debug_tuple("MessageWithCallback").field(&self.message).finish()
206 }
207}
208
209pub type AsyncSender<M, R> = Sender<MessageWithCallback<M, R>>;
210
/// Placeholder for a sender of type `S` that is supplied after construction.
///
/// Create it with [`LateBoundSender::new`], share the returned `Arc`, and call
/// [`LateBoundSender::bind`] exactly once when the real sender is available.
pub struct LateBoundSender<S> {
    /// Empty until `bind` is called; holds the real sender afterwards.
    sender: OnceLock<S>,
}

impl<S> LateBoundSender<S> {
    /// Creates an unbound placeholder, already wrapped in an `Arc` so it can be shared.
    pub fn new() -> Arc<Self> {
        let unbound = Self { sender: OnceLock::new() };
        Arc::new(unbound)
    }

    /// Installs the real sender.
    ///
    /// # Panics
    ///
    /// Panics with "cannot set sender twice" if a sender has already been bound.
    pub fn bind(&self, sender: S) {
        // `set` returns the rejected value on failure; discard it so that `S` does not need
        // to implement `Debug` for `expect`.
        let outcome = self.sender.set(sender).map_err(drop);
        outcome.expect("cannot set sender twice");
    }
}
234
235impl<M, S: CanSend<M>> CanSend<M> for LateBoundSender<S> {
237 fn send(&self, message: M) {
238 self.sender.wait().send(message);
239 }
240}
241
242pub struct Noop;
243
244impl<M> CanSend<M> for Noop {
245 fn send(&self, _message: M) {}
246}
247
248pub fn noop() -> Noop {
253 Noop
254}
255
/// Conversions from a value (or a shared `Arc` of it) into the multi-sender type `A`.
///
/// Implemented automatically for every type that `A` can be built from through
/// [`MultiSenderFrom`].
pub trait IntoMultiSender<A> {
    /// Builds an `A` from an `Arc` that is already shared, leaving the caller's `Arc` usable.
    fn as_multi_sender(self: &Arc<Self>) -> A;

    /// Consumes `self`, moves it into a new `Arc`, and builds an `A` from it.
    fn into_multi_sender(self) -> A;
}

/// Constructs `Self` from a shared reference to an underlying value of type `A`.
pub trait MultiSenderFrom<A> {
    /// Builds the multi-sender from `a`.
    fn multi_sender_from(a: Arc<A>) -> Self;
}

/// Bridges the two traits: whenever `B: MultiSenderFrom<A>`, `A` gets `IntoMultiSender<B>`.
impl<A, B> IntoMultiSender<B> for A
where
    B: MultiSenderFrom<A>,
{
    fn as_multi_sender(self: &Arc<Self>) -> B {
        // Only the reference count is bumped; the result shares the caller's instance.
        B::multi_sender_from(Arc::clone(self))
    }

    fn into_multi_sender(self) -> B {
        let shared = Arc::new(self);
        B::multi_sender_from(shared)
    }
}
275
#[cfg(test)]
mod tests {
    use crate::messaging::{AsyncSendError, MessageWithCallback, Sender};
    use futures::FutureExt;
    use tokio_util::sync::CancellationToken;

    // The handler drops the whole message, callback included, without ever invoking the
    // callback; the awaiting side must then observe `AsyncSendError::Dropped`.
    #[tokio::test]
    async fn test_async_send_sender_dropped() {
        let sender = Sender::<MessageWithCallback<u32, u32>>::from_fn(|_| {});
        let outcome = sender.send_async(4).await;
        assert_eq!(outcome, Err(AsyncSendError::Dropped));
    }

    // The caller drops the response future before the handler responds; invoking the
    // callback afterwards must not panic.
    #[tokio::test]
    async fn test_async_send_receiver_dropped() {
        // Cancelled by the test to let the spawned task go ahead and respond.
        let release_reply = CancellationToken::new();
        // Cancelled by the spawned task once the callback has returned.
        let reply_attempted = CancellationToken::new();

        // Chain the existing panic hook with a process abort, so that a panic on any thread
        // (including inside the spawned task) takes the test process down instead of going
        // unnoticed. The hook is not restored afterwards.
        let previous_hook = std::panic::take_hook();
        std::panic::set_hook(Box::new(move |panic_info| {
            previous_hook(panic_info);
            std::process::abort();
        }));

        let release_for_handler = release_reply.clone();
        let attempted_for_handler = reply_attempted.clone();
        let sender = Sender::<MessageWithCallback<u32, u32>>::from_fn(move |msg| {
            let MessageWithCallback { message, callback } = msg;
            let release = release_for_handler.clone();
            let attempted = attempted_for_handler.clone();
            tokio::spawn(async move {
                release.cancelled().await;
                callback(async move { Ok(message) }.boxed());
                attempted.cancel();
            });
        });

        // Drop the response future first, and only then allow the handler to respond.
        let pending_response = sender.send_async(4);
        drop(pending_response);
        release_reply.cancel();
        reply_attempted.cancelled().await;
    }
}