1use crate::break_apart::BreakApart;
2use crate::functional::{SendAsyncFunction, SendFunction};
3use crate::futures::DelayedActionRunner;
4use futures::future::BoxFuture;
5use futures::FutureExt;
6use once_cell::sync::OnceCell;
7use std::fmt::{Debug, Display};
8use std::sync::Arc;
9use tokio::sync::oneshot;
10
11pub trait Actor {
16 fn start_actor(&mut self, _ctx: &mut dyn DelayedActionRunner<Self>) {}
17
18 fn wrap_handler<M: actix::Message>(
19 &mut self,
20 msg: M,
21 ctx: &mut dyn DelayedActionRunner<Self>,
22 f: impl FnOnce(&mut Self, M, &mut dyn DelayedActionRunner<Self>) -> M::Result,
23 ) -> M::Result {
24 f(self, msg, ctx)
25 }
26}
27
28pub trait Handler<M: actix::Message>
34where
35 M::Result: Send,
36{
37 fn handle(&mut self, msg: M) -> M::Result;
38}
39
40pub trait HandlerWithContext<M: actix::Message>
47where
48 M::Result: Send,
49{
50 fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result;
51}
52
53impl<A, M> HandlerWithContext<M> for A
54where
55 M: actix::Message,
56 A: Actor + Handler<M>,
57 M::Result: Send,
58{
59 fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result {
60 self.wrap_handler(msg, ctx, |this, msg, _| Handler::handle(this, msg))
61 }
62}
63
/// Anything that can accept a message of type `M`.
///
/// Implementors must be `Send + Sync + 'static`, which is what allows them to
/// be stored behind a shared trait object.
pub trait CanSend<M>: Send + Sync + 'static {
    /// Hands `message` over to the receiver.
    ///
    /// Nothing is returned, so the caller gets no indication of whether or
    /// when the message is processed.
    fn send(&self, message: M);
}
70
71pub struct Sender<M: 'static> {
75 sender: Arc<dyn CanSend<M>>,
76}
77
78impl<M> Clone for Sender<M> {
79 fn clone(&self) -> Self {
80 Self { sender: self.sender.clone() }
81 }
82}
83
84pub trait IntoSender<M> {
86 fn into_sender(self) -> Sender<M>;
88 fn as_sender(self: &Arc<Self>) -> Sender<M>;
90}
91
92impl<M, T: CanSend<M>> IntoSender<M> for T {
93 fn into_sender(self) -> Sender<M> {
94 Sender::from_impl(self)
95 }
96 fn as_sender(self: &Arc<Self>) -> Sender<M> {
97 Sender::from_arc(self.clone())
98 }
99}
100
101impl<M> Sender<M> {
102 pub fn send(&self, message: M) {
105 self.sender.send(message)
106 }
107
108 fn from_impl(sender: impl CanSend<M> + 'static) -> Self {
109 Self { sender: Arc::new(sender) }
110 }
111
112 fn from_arc<T: CanSend<M> + 'static>(arc: Arc<T>) -> Self {
113 Self { sender: arc }
114 }
115
116 pub fn from_fn(send: impl Fn(M) + Send + Sync + 'static) -> Self {
118 Self::from_impl(SendFunction::new(send))
119 }
120
121 pub fn break_apart(self) -> BreakApart<M> {
124 BreakApart { sender: self }
125 }
126}
127
128pub trait SendAsync<M, R: Send + 'static> {
134 fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>>;
135}
136
137impl<M, R: Send + 'static, A: CanSend<MessageWithCallback<M, R>> + ?Sized> SendAsync<M, R> for A {
138 fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
139 let (sender, receiver) =
145 oneshot::channel::<BoxFuture<'static, Result<R, AsyncSendError>>>();
146 let future = async move {
147 match receiver.await {
148 Ok(result_future) => result_future.await,
149 Err(_) => Err(AsyncSendError::Dropped),
150 }
151 };
152 let responder = Box::new(move |r| {
153 sender.send(r).ok();
156 });
157 self.send(MessageWithCallback { message, callback: responder });
158 future.boxed()
159 }
160}
161
162impl<M, R: Send + 'static> Sender<MessageWithCallback<M, R>> {
163 pub fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
165 self.sender.send_async(message)
166 }
167
168 pub fn from_async_fn(send_async: impl Fn(M) -> R + Send + Sync + 'static) -> Self {
175 Self::from_impl(SendAsyncFunction::new(send_async))
176 }
177}
178
/// Reasons why the future returned by `send_async` can resolve to an error.
///
/// The type implements [`std::error::Error`], so it can be boxed as
/// `Box<dyn Error>`, propagated with `?` into generic error types, or used as
/// the source of another error.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AsyncSendError {
    /// NOTE(review): presumably the receiving side could not accept the
    /// message; this variant is not produced in this file.
    Closed,
    /// NOTE(review): presumably the message was not processed in time; this
    /// variant is not produced in this file.
    Timeout,
    /// The reply callback was dropped without ever being invoked, so no
    /// result will arrive.
    Dropped,
}

/// The textual form is the variant name, identical to the `Debug` output.
impl Display for AsyncSendError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self, f)
    }
}

/// Marks the type as a standard error; there is no underlying source, so the
/// default method implementations are sufficient.
impl std::error::Error for AsyncSendError {}
195
196pub struct MessageWithCallback<T, R> {
200 pub message: T,
201 pub callback: Box<dyn FnOnce(BoxFuture<'static, Result<R, AsyncSendError>>) + Send>,
202}
203
204impl<T: Debug, R> Debug for MessageWithCallback<T, R> {
205 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206 f.debug_tuple("MessageWithCallback").field(&self.message).finish()
207 }
208}
209
210pub type AsyncSender<M, R> = Sender<MessageWithCallback<M, R>>;
211
212pub struct LateBoundSender<S> {
223 sender: OnceCell<S>,
224}
225
226impl<S> LateBoundSender<S> {
227 pub fn new() -> Arc<Self> {
228 Arc::new(Self { sender: OnceCell::new() })
229 }
230
231 pub fn bind(&self, sender: S) {
232 self.sender.set(sender).map_err(|_| ()).expect("cannot set sender twice");
233 }
234}
235
236impl<M, S: CanSend<M>> CanSend<M> for LateBoundSender<S> {
238 fn send(&self, message: M) {
239 self.sender.wait().send(message);
240 }
241}
242
/// Stateless marker type for a sender that does nothing with its messages.
pub struct Noop;
244
245impl<M> CanSend<M> for Noop {
246 fn send(&self, _message: M) {}
247}
248
249pub fn noop() -> Noop {
254 Noop
255}
256
/// Conversions from a value into a multi-sender of type `A`.
pub trait IntoMultiSender<A> {
    /// Builds an `A` from a value that is already held in an `Arc`, leaving
    /// the caller's `Arc` usable.
    fn as_multi_sender(self: &Arc<Self>) -> A;

    /// Consumes `self` and builds an `A` from it.
    fn into_multi_sender(self) -> A;
}
263
/// Construction of a multi-sender from a shared value of type `A`.
pub trait MultiSenderFrom<A> {
    /// Builds `Self` from the shared value `a`.
    fn multi_sender_from(a: Arc<A>) -> Self;
}
267
268impl<A, B: MultiSenderFrom<A>> IntoMultiSender<B> for A {
269 fn as_multi_sender(self: &Arc<Self>) -> B {
270 B::multi_sender_from(self.clone())
271 }
272 fn into_multi_sender(self) -> B {
273 B::multi_sender_from(Arc::new(self))
274 }
275}
276
#[cfg(test)]
mod tests {
    use crate::messaging::{AsyncSendError, MessageWithCallback, Sender};
    use futures::FutureExt;
    use tokio_util::sync::CancellationToken;

    /// A receiver that discards the message also discards its callback, so
    /// the reply future must resolve to `Dropped`.
    #[tokio::test]
    async fn test_async_send_sender_dropped() {
        let discarding: Sender<MessageWithCallback<u32, u32>> = Sender::from_fn(|_| {});
        let outcome = discarding.send_async(4).await;
        assert_eq!(outcome, Err(AsyncSendError::Dropped));
    }

    /// Dropping the reply future before the receiver answers must not make
    /// the receiver's callback panic.
    #[tokio::test]
    async fn test_async_send_receiver_dropped() {
        // Holds the receiver back until the reply future has been dropped.
        let reply_gate = CancellationToken::new();
        // Signalled once the callback has returned.
        let callback_finished = CancellationToken::new();

        // Abort the process on any panic. The callback runs inside a spawned
        // task, and a panic there would otherwise just leave this test
        // waiting on `callback_finished` forever.
        let previous_hook = std::panic::take_hook();
        std::panic::set_hook(Box::new(move |info| {
            previous_hook(info);
            std::process::abort();
        }));

        let sender = {
            let reply_gate = reply_gate.clone();
            let callback_finished = callback_finished.clone();
            Sender::<MessageWithCallback<u32, u32>>::from_fn(
                move |MessageWithCallback { message, callback }| {
                    let reply_gate = reply_gate.clone();
                    let callback_finished = callback_finished.clone();
                    tokio::spawn(async move {
                        reply_gate.cancelled().await;
                        callback(async move { Ok(message) }.boxed());
                        callback_finished.cancel();
                    });
                },
            )
        };

        // Drop the reply future first, then let the receiver answer.
        drop(sender.send_async(4));
        reply_gate.cancel();
        callback_finished.cancelled().await;
    }
}