1use std::{any::Any, pin::Pin, sync::Arc};
2
3use futures::{Future, FutureExt};
4use tokio::sync::{
5 mpsc::{error::SendError, UnboundedSender},
6 oneshot::channel,
7};
8
9use crate::{
10 task::{HandlerTask, Task},
11 Actor, Context, Handler,
12};
13
14pub struct Addr<A: Actor> {
18 sender: Arc<UnboundedSender<Box<dyn Task<A>>>>,
19}
20
21impl<A: Actor> Addr<A> {
22 pub(crate) fn new(sender: Arc<UnboundedSender<Box<dyn Task<A>>>>) -> Self {
23 Addr { sender }
24 }
25
26 pub fn send<M: 'static + Send>(
28 &self,
29 message: M,
30 ) -> Result<impl Future<Output = A::Result>, SendError<M>>
31 where
32 A: Handler<M>,
33 {
34 let (sender, receiver) = channel();
35 let task = Box::new(HandlerTask::new(message, handle, sender));
36 self.sender.send(task).map_err(|error| {
37 let message: M = *Box::<dyn Any + 'static>::downcast(error.0.into_message())
38 .expect("It must be the original message that was sent back");
39 SendError(message)
40 })?;
41 Ok(receiver
42 .map(|maybe_result| maybe_result.expect("Worker thread has stopped unexpectedly")))
43 }
44}
45
46fn handle<'a, A: Actor + Handler<M>, M: 'static + Send>(
47 actor: &'a mut A,
48 message: M,
49 context: &'a mut Context<A>,
50) -> Pin<Box<dyn Future<Output = A::Result> + Send + 'a>> {
51 A::handle(actor, message, context)
52}
53
54impl<A: Actor> Clone for Addr<A> {
55 fn clone(&self) -> Self {
56 Addr {
57 sender: self.sender.clone(),
58 }
59 }
60}
61
62impl<A: Actor> std::fmt::Debug for Addr<A> {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("Addr")
65 .field("sender", &self.sender)
66 .finish()
67 }
68}