async_actor/
actor_ref.rs

1use crate::Actor;
2
3use async_channel::{Sender as MultiSender, WeakSender as WeakMultiSender};
4use async_oneshot_channel::{Sender as OneshotSender, WeakSender as WeakOneshotSender};
5
6#[derive(Debug)]
7/// A handle to an actor, that allows messages to be sent to the actor.
8///
9/// As long as one ActorRef exists, the actor will continue to run.
10pub struct ActorRef<A: Actor> {
11    pub(crate) sender: MultiSender<A::Message>,
12    pub(crate) stop: OneshotSender<A::Message>,
13}
14
15impl<A: Actor> ActorRef<A> {
16    /// Sends a message to the actor. If the mailbox is full, the message will be returned in [`Err`].
17    pub async fn send(&self, msg: A::Message) -> Result<(), A::Message> {
18        self.sender.send(msg).await.map_err(|e| e.0)
19    }
20
21    /// Stops the actor by sending a stop message to it. If a stop message has already been sent,
22    /// the stop message will be returned in [`Err`].
23    pub fn stop(&self, stop: A::Message) -> Result<(), A::Message> {
24        self.stop.send(stop)
25    }
26
27    /// Creates a [`WeakActorRef`] from this [`ActorRef`], which can be used as a handle to the actor that
28    /// doesn't keep the actor alive, if it is the last handle to the actor.
29    pub fn downgrade(&self) -> WeakActorRef<A> {
30        WeakActorRef {
31            sender: self.sender.downgrade(),
32            stop: self.stop.downgrade(),
33        }
34    }
35}
36
37impl<A: Actor> Clone for ActorRef<A> {
38    fn clone(&self) -> Self {
39        Self {
40            sender: self.sender.clone(),
41            stop: self.stop.clone(),
42        }
43    }
44}
45
46#[derive(Debug)]
47/// A reference to an actor that allows messages to be sent to the actor.
48///
49/// If the actor has been dropped, this [`WeakActorRef`] will not be able to send messages to the actor,
50/// and will not be able to be upgraded.
51pub struct WeakActorRef<A: Actor> {
52    sender: WeakMultiSender<A::Message>,
53    stop: WeakOneshotSender<A::Message>,
54}
55
56impl<A: Actor> WeakActorRef<A> {
57    /// Attempts to upgrade this [`WeakActorRef`] to an [`ActorRef`]. If the actor has been dropped,
58    /// this will return [`None`].
59    pub fn upgrade(&self) -> Option<ActorRef<A>> {
60        Some(ActorRef {
61            sender: self.sender.upgrade()?,
62            stop: self.stop.upgrade()?,
63        })
64    }
65
66    /// Sends a message to the actor. If the actor has been dropped, or the mailbox is full,
67    /// the message will be returned in [`Err`].
68    pub async fn send(&self, msg: A::Message) -> Result<(), A::Message> {
69        match self.upgrade() {
70            Some(actor_ref) => actor_ref.send(msg).await,
71            None => Err(msg),
72        }
73    }
74
75    /// Stops the actor by sending a stop message to it. If the actor has been dropped, or the mailbox is full,
76    /// the stop message will be returned in [`Err`].
77    pub fn stop(&self, stop: A::Message) -> Result<(), A::Message> {
78        self.stop.send(stop)
79    }
80}
81
82impl<A: Actor> Clone for WeakActorRef<A> {
83    fn clone(&self) -> Self {
84        Self {
85            sender: self.sender.clone(),
86            stop: self.stop.clone(),
87        }
88    }
89}
90
91impl<A: Actor> TryInto<ActorRef<A>> for WeakActorRef<A> {
92    type Error = ();
93
94    fn try_into(self) -> Result<ActorRef<A>, Self::Error> {
95        self.upgrade().ok_or(())
96    }
97}