xactor/
addr.rs

1use crate::{Actor, ActorId, Caller, Context, Error, Handler, Message, Result, Sender};
2use futures::channel::{mpsc, oneshot};
3use futures::future::Shared;
4use futures::Future;
5use std::hash::{Hash, Hasher};
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, Weak};
8
// Boxed, pinned, `Send` future with output `()` that may borrow data for the
// lifetime `'a`. It is the return type of the closures described by `ExecFn`.
type ExecFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
10
// Boxed one-shot closure carried by `ActorEvent::Exec`.
//
// It is handed mutable references to the actor and to its context, both with
// the same lifetime `'a`, and returns an `ExecFuture<'a>` that is allowed to
// keep borrowing them. The closure itself must be `Send + 'static`.
pub(crate) type ExecFn<A> =
    Box<dyn for<'a> FnOnce(&'a mut A, &'a mut Context<A>) -> ExecFuture<'a> + Send + 'static>;
13
/// Events sent through an actor's channel by the handles defined in this file.
pub(crate) enum ActorEvent<A> {
    /// Run the boxed closure against the actor and its context. `call`, `send`,
    /// `caller` and `sender` all wrap a message handler invocation in this variant.
    Exec(ExecFn<A>),
    /// Stop request carrying an optional error; queued by `Addr::stop`.
    Stop(Option<Error>),
    /// NOTE(review): presumably identifies a stream to detach from the actor;
    /// it is neither produced nor consumed in this file — confirm at the use site.
    RemoveStream(usize),
}
19
/// The address of an actor.
///
/// When all references to `Addr<A>` are dropped, the actor ends.
/// You can use the `Clone` trait to create multiple copies of `Addr<A>`.
pub struct Addr<A> {
    /// Id of the actor; equality and hashing of addresses use only this field.
    pub(crate) actor_id: ActorId,
    /// Sending half of the actor's event channel. It is kept behind an `Arc`
    /// so that `WeakAddr`, `Caller` and `Sender` can hold a `Weak` to it.
    pub(crate) tx: Arc<mpsc::UnboundedSender<ActorEvent<A>>>,
    /// Shared receiver awaited by `wait_for_stop`; when it is `None`,
    /// `wait_for_stop` never completes.
    /// NOTE(review): presumably resolved when the actor exits — confirm where
    /// the channel is created.
    pub(crate) rx_exit: Option<Shared<oneshot::Receiver<()>>>,
}
29
30impl<A> Clone for Addr<A> {
31    fn clone(&self) -> Self {
32        Self {
33            actor_id: self.actor_id,
34            tx: self.tx.clone(),
35            rx_exit: self.rx_exit.clone(),
36        }
37    }
38}
39
40impl<A> Addr<A> {
41    pub fn downgrade(&self) -> WeakAddr<A> {
42        WeakAddr {
43            actor_id: self.actor_id,
44            tx: Arc::downgrade(&self.tx),
45            rx_exit: self.rx_exit.clone(),
46        }
47    }
48}
49
50impl<A> PartialEq for Addr<A> {
51    fn eq(&self, other: &Self) -> bool {
52        self.actor_id == other.actor_id
53    }
54}
55
56impl<A> Hash for Addr<A> {
57    fn hash<H: Hasher>(&self, state: &mut H) {
58        self.actor_id.hash(state)
59    }
60}
61
62impl<A: Actor> Addr<A> {
63    /// Returns the id of the actor.
64    pub fn actor_id(&self) -> ActorId {
65        self.actor_id
66    }
67
68    /// Stop the actor.
69    pub fn stop(&mut self, err: Option<Error>) -> Result<()> {
70        mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::Stop(err))?;
71        Ok(())
72    }
73
74    /// Send a message `msg` to the actor and wait for the return value.
75    pub async fn call<T: Message>(&self, msg: T) -> Result<T::Result>
76    where
77        A: Handler<T>,
78    {
79        let (tx, rx) = oneshot::channel();
80        mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::Exec(Box::new(
81            move |actor, ctx| {
82                Box::pin(async move {
83                    let res = Handler::handle(actor, ctx, msg).await;
84                    let _ = tx.send(res);
85                })
86            },
87        )))?;
88
89        Ok(rx.await?)
90    }
91
92    /// Send a message `msg` to the actor without waiting for the return value.
93    pub fn send<T: Message<Result = ()>>(&self, msg: T) -> Result<()>
94    where
95        A: Handler<T>,
96    {
97        mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::Exec(Box::new(
98            move |actor, ctx| {
99                Box::pin(async move {
100                    Handler::handle(actor, ctx, msg).await;
101                })
102            },
103        )))?;
104        Ok(())
105    }
106
107    /// Create a `Caller<T>` for a specific message type
108    pub fn caller<T: Message>(&self) -> Caller<T>
109    where
110        A: Handler<T>,
111    {
112        let weak_tx = Arc::downgrade(&self.tx);
113
114        Caller {
115            actor_id: self.actor_id.clone(),
116            caller_fn: Mutex::new(Box::new(move |msg| {
117                let weak_tx_option = weak_tx.upgrade();
118                Box::pin(async move {
119                    match weak_tx_option {
120                        Some(tx) => {
121                            let (oneshot_tx, oneshot_rx) = oneshot::channel();
122
123                            mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(
124                                Box::new(move |actor, ctx| {
125                                    Box::pin(async move {
126                                        let res = Handler::handle(&mut *actor, ctx, msg).await;
127                                        let _ = oneshot_tx.send(res);
128                                    })
129                                }),
130                            ))?;
131                            Ok(oneshot_rx.await?)
132                        }
133                        None => Err(anyhow::anyhow!("Actor Dropped")),
134                    }
135                })
136            })),
137        }
138    }
139
140    /// Create a `Sender<T>` for a specific message type
141    pub fn sender<T: Message<Result = ()>>(&self) -> Sender<T>
142    where
143        A: Handler<T>,
144    {
145        let weak_tx = Arc::downgrade(&self.tx);
146        Sender {
147            actor_id: self.actor_id.clone(),
148            sender_fn: Box::new(move |msg| match weak_tx.upgrade() {
149                Some(tx) => {
150                    mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(Box::new(
151                        move |actor, ctx| {
152                            Box::pin(async move {
153                                Handler::handle(&mut *actor, ctx, msg).await;
154                            })
155                        },
156                    )))?;
157                    Ok(())
158                }
159                None => Ok(()),
160            }),
161        }
162    }
163
164    /// Wait for an actor to finish, and if the actor has finished, the function returns immediately.
165    pub async fn wait_for_stop(self) {
166        if let Some(rx_exit) = self.rx_exit {
167            rx_exit.await.ok();
168        } else {
169            futures::future::pending::<()>().await;
170        }
171    }
172}
173
/// A weak handle to an actor, created with `Addr::downgrade`.
///
/// It stores a `Weak` reference to the actor's event sender and can be turned
/// back into an `Addr` with `upgrade`, which returns `None` once the sender
/// is no longer alive.
pub struct WeakAddr<A> {
    /// Id of the actor; equality and hashing use only this field.
    pub(crate) actor_id: ActorId,
    /// Weak reference to the sending half of the actor's event channel.
    pub(crate) tx: Weak<mpsc::UnboundedSender<ActorEvent<A>>>,
    /// Exit receiver copied from the originating `Addr` and handed on by `upgrade`.
    pub(crate) rx_exit: Option<Shared<oneshot::Receiver<()>>>,
}
179
180impl<A> PartialEq for WeakAddr<A> {
181    fn eq(&self, other: &Self) -> bool {
182        self.actor_id == other.actor_id
183    }
184}
185
186impl<A> Hash for WeakAddr<A> {
187    fn hash<H: Hasher>(&self, state: &mut H) {
188        self.actor_id.hash(state)
189    }
190}
191
192impl<A> WeakAddr<A> {
193    pub fn upgrade(&self) -> Option<Addr<A>> {
194        match self.tx.upgrade() {
195            Some(tx) => Some(Addr {
196                actor_id: self.actor_id,
197                tx,
198                rx_exit: self.rx_exit.clone(),
199            }),
200            None => None,
201        }
202    }
203}
204
205impl<A> Clone for WeakAddr<A> {
206    fn clone(&self) -> Self {
207        Self {
208            actor_id: self.actor_id,
209            tx: self.tx.clone(),
210            rx_exit: self.rx_exit.clone(),
211        }
212    }
213}
214
impl<A: Actor> WeakAddr<A> {
    /// Returns the id of the actor.
    ///
    /// The id is read from this handle's own field, so no upgrade is attempted.
    pub fn actor_id(&self) -> ActorId {
        self.actor_id
    }
}