1use crate::functional::{SendAsyncFunction, SendFunction};
2use crate::futures::DelayedActionRunner;
3use futures::FutureExt;
4use futures::future::BoxFuture;
5use std::fmt::{Debug, Display};
6use std::sync::{Arc, OnceLock};
7
8pub trait Actor {
14 fn start_actor(&mut self, _ctx: &mut dyn DelayedActionRunner<Self>) {}
17
18 fn wrap_handler<M, R>(
21 &mut self,
22 msg: M,
23 ctx: &mut dyn DelayedActionRunner<Self>,
24 f: impl FnOnce(&mut Self, M, &mut dyn DelayedActionRunner<Self>) -> R,
25 ) -> R {
26 f(self, msg, ctx)
27 }
28
29 fn stop_actor(&mut self) {}
31}
32
33pub trait Handler<M, R = ()>
38where
39 M: Send + 'static,
40 R: Send,
41{
42 fn handle(&mut self, msg: M) -> R;
43}
44
45pub trait HandlerWithContext<M, R = ()>
52where
53 M: Send + 'static,
54 R: Send,
55{
56 fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> R;
57}
58
59impl<A, M, R> HandlerWithContext<M, R> for A
60where
61 A: Actor + Handler<M, R>,
62 M: Send + 'static,
63 R: Send,
64{
65 fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> R {
66 self.wrap_handler(msg, ctx, |this, msg, _| Handler::handle(this, msg))
67 }
68}
69
70pub trait CanSend<M>: Send + Sync + 'static {
73 fn send(&self, message: M);
74}
75
76pub trait CanSendAsync<M, R>: Send + Sync + 'static {
79 fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>>;
80}
81
82pub struct Sender<M: 'static> {
86 sender: Arc<dyn CanSend<M>>,
87}
88
89impl<M> Clone for Sender<M> {
90 fn clone(&self) -> Self {
91 Self { sender: self.sender.clone() }
92 }
93}
94
95pub trait IntoSender<M> {
97 fn into_sender(self) -> Sender<M>;
99 fn as_sender(self: &Arc<Self>) -> Sender<M>;
101}
102
103impl<M, T: CanSend<M>> IntoSender<M> for T {
104 fn into_sender(self) -> Sender<M> {
105 Sender::from_impl(self)
106 }
107 fn as_sender(self: &Arc<Self>) -> Sender<M> {
108 Sender::from_arc(self.clone())
109 }
110}
111
112impl<M> Sender<M> {
113 pub fn send(&self, message: M) {
116 self.sender.send(message)
117 }
118
119 fn from_impl(sender: impl CanSend<M> + 'static) -> Self {
120 Self { sender: Arc::new(sender) }
121 }
122
123 fn from_arc<T: CanSend<M> + 'static>(arc: Arc<T>) -> Self {
124 Self { sender: arc }
125 }
126
127 pub fn from_fn(send: impl Fn(M) + Send + Sync + 'static) -> Self {
129 Self::from_impl(SendFunction::new(send))
130 }
131}
132
133pub struct AsyncSender<M: 'static, R: Send + 'static> {
135 sender: Arc<dyn CanSendAsync<M, R>>,
136}
137
138impl<M, R: Send + 'static> Clone for AsyncSender<M, R> {
139 fn clone(&self) -> Self {
140 Self { sender: self.sender.clone() }
141 }
142}
143
144pub trait IntoAsyncSender<M, R: Send> {
146 fn into_async_sender(self) -> AsyncSender<M, R>;
148 fn as_async_sender(self: &Arc<Self>) -> AsyncSender<M, R>;
150}
151
152impl<M, R: Send + 'static, T: CanSendAsync<M, R>> IntoAsyncSender<M, R> for T {
153 fn into_async_sender(self) -> AsyncSender<M, R> {
154 AsyncSender::from_impl(self)
155 }
156
157 fn as_async_sender(self: &Arc<Self>) -> AsyncSender<M, R> {
158 AsyncSender::from_arc(self.clone())
159 }
160}
161
162impl<M, R: Send + 'static> AsyncSender<M, R> {
163 pub fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
165 self.sender.send_async(message)
166 }
167
168 fn from_impl(sender: impl CanSendAsync<M, R> + 'static) -> Self {
169 Self { sender: Arc::new(sender) }
170 }
171
172 fn from_arc<T: CanSendAsync<M, R> + 'static>(arc: Arc<T>) -> Self {
173 Self { sender: arc }
174 }
175
176 pub fn from_fn(send: impl Fn(M) -> R + Send + Sync + 'static) -> Self {
178 Self::from_impl(SendAsyncFunction::new(send))
179 }
180}
181
182#[derive(Debug, Copy, Clone, PartialEq, Eq)]
184pub enum AsyncSendError {
185 Closed,
187 Timeout,
189 Dropped,
191}
192
193impl std::error::Error for AsyncSendError {}
194
195impl Display for AsyncSendError {
196 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 Debug::fmt(self, f)
198 }
199}
200
201pub struct LateBoundSender<S> {
212 sender: OnceLock<S>,
213}
214
215impl<S> LateBoundSender<S> {
216 pub fn new() -> Arc<Self> {
217 Arc::new(Self { sender: OnceLock::new() })
218 }
219
220 pub fn bind(&self, sender: S) {
221 self.sender.set(sender).map_err(|_| ()).expect("cannot set sender twice");
222 }
223}
224
225impl<M, S: CanSend<M>> CanSend<M> for LateBoundSender<S> {
227 fn send(&self, message: M) {
228 self.sender.wait().send(message);
229 }
230}
231
232impl<M, R, S: CanSendAsync<M, R>> CanSendAsync<M, R> for LateBoundSender<S> {
233 fn send_async(&self, message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
234 self.sender.wait().send_async(message)
235 }
236}
237
238pub struct Noop;
239
240impl<M> CanSend<M> for Noop {
241 fn send(&self, _message: M) {}
242}
243
244impl<M> CanSend<M> for Arc<Noop> {
245 fn send(&self, _message: M) {}
246}
247
248impl<M, R: Send + 'static> CanSendAsync<M, R> for Noop {
249 fn send_async(&self, _message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
250 async { Err(AsyncSendError::Dropped) }.boxed()
251 }
252}
253
254impl<M, R: Send + 'static> CanSendAsync<M, R> for Arc<Noop> {
255 fn send_async(&self, _message: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
256 async { Err(AsyncSendError::Dropped) }.boxed()
257 }
258}
259
260pub fn noop() -> Arc<Noop> {
265 Arc::new(Noop)
266}
267
268pub trait IntoMultiSender<A> {
271 fn as_multi_sender(self: &Arc<Self>) -> A;
272 fn into_multi_sender(self) -> A;
273}
274
275pub trait MultiSenderFrom<A> {
276 fn multi_sender_from(a: Arc<A>) -> Self;
277}
278
279impl<A, B: MultiSenderFrom<A>> IntoMultiSender<B> for A {
280 fn as_multi_sender(self: &Arc<Self>) -> B {
281 B::multi_sender_from(self.clone())
282 }
283 fn into_multi_sender(self) -> B {
284 B::multi_sender_from(Arc::new(self))
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use crate::messaging::{AsyncSendError, AsyncSender, CanSendAsync, IntoAsyncSender};
291 use futures::FutureExt;
292 use futures::future::BoxFuture;
293
294 struct DroppingSender;
295
296 impl CanSendAsync<u32, u32> for DroppingSender {
297 fn send_async(&self, _message: u32) -> BoxFuture<'static, Result<u32, AsyncSendError>> {
298 async { Err(AsyncSendError::Dropped) }.boxed()
299 }
300 }
301
302 #[tokio::test]
303 async fn test_async_sender_from_async_fn_success() {
304 let sender = AsyncSender::from_fn(|x: u32| x + 1);
305 let result = sender.send_async(4).await;
306 assert_eq!(result, Ok(5));
307 }
308
309 #[tokio::test]
310 async fn test_async_sender_dropped_result() {
311 let sender = DroppingSender.into_async_sender();
312 let result = sender.send_async(7).await;
313 assert_eq!(result, Err(AsyncSendError::Dropped));
314 }
315}