Skip to main content

atomr_core/actor/
actor_ref.rs

1//! `ActorRef` — typed handle to an actor.
2//!
3//! Refs are polymorphic: a [`ActorRef<M>`] is either backed by a local
4//! mailbox (cheap, in-process, the common case) or by a remote handle that
5//! serializes `M` and ships it to another `ActorSystem`.
6
7use std::fmt;
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10
11use thiserror::Error;
12use tokio::sync::{mpsc, oneshot};
13
14use super::actor_cell::SystemMsg;
15use super::actor_system::ActorSystemInner;
16use super::path::ActorPath;
17use super::remote::{RemoteRef, RemoteSystemMsg, SerializedMessage};
18use super::sender::Sender;
19use super::traits::MessageEnvelope;
20
21/// Type-erased serializer used by the Remote variant of `ActorRef<M>`.
22type RemoteSerializerFn<M> = Arc<dyn Fn(M, Option<ActorPath>) -> SerializedMessage + Send + Sync>;
23
24enum RefImpl<M: Send + 'static> {
25    Local {
26        path: ActorPath,
27        user: mpsc::UnboundedSender<MessageEnvelope<M>>,
28        system: mpsc::UnboundedSender<SystemMsg>,
29        system_ref: Weak<ActorSystemInner>,
30    },
31    Remote {
32        path: ActorPath,
33        handle: Arc<dyn RemoteRef>,
34        serialize: RemoteSerializerFn<M>,
35    },
36}
37
38/// Typed handle to an actor.
39///
40/// Cheap to clone (internally `Arc`). `tell` sends without waiting; `ask`
41/// uses a helper pattern (`ask_with`) to avoid reflection.
42pub struct ActorRef<M: Send + 'static> {
43    inner: Arc<RefImpl<M>>,
44}
45
46impl<M: Send + 'static> Clone for ActorRef<M> {
47    fn clone(&self) -> Self {
48        Self { inner: self.inner.clone() }
49    }
50}
51
52impl<M: Send + 'static> fmt::Debug for ActorRef<M> {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("ActorRef").field("path", &self.path().to_string()).finish()
55    }
56}
57
58impl<M: Send + 'static> ActorRef<M> {
59    pub(crate) fn new(
60        path: ActorPath,
61        user: mpsc::UnboundedSender<MessageEnvelope<M>>,
62        system: mpsc::UnboundedSender<SystemMsg>,
63        system_ref: Weak<ActorSystemInner>,
64    ) -> Self {
65        Self { inner: Arc::new(RefImpl::Local { path, user, system, system_ref }) }
66    }
67
68    /// Construct a typed remote ref given a (type-erased) `RemoteRef` handle
69    /// and a serializer for `M`. Used by `atomr-remote::RemoteActorRefProvider`.
70    pub fn from_remote(handle: Arc<dyn RemoteRef>, serialize: RemoteSerializerFn<M>) -> Self {
71        let path = handle.path().clone();
72        Self { inner: Arc::new(RefImpl::Remote { path, handle, serialize }) }
73    }
74
75    pub fn path(&self) -> &ActorPath {
76        match &*self.inner {
77            RefImpl::Local { path, .. } => path,
78            RefImpl::Remote { path, .. } => path,
79        }
80    }
81
82    /// True if this ref points at an actor in a different `ActorSystem`.
83    pub fn is_remote(&self) -> bool {
84        matches!(&*self.inner, RefImpl::Remote { .. })
85    }
86
87    /// Fire-and-forget send.
88    pub fn tell(&self, msg: M) {
89        match &*self.inner {
90            RefImpl::Local { user, path, system_ref, .. } => {
91                if user.send(MessageEnvelope::new(msg)).is_err() {
92                    notify_dead_letter::<M>(path, system_ref);
93                }
94            }
95            RefImpl::Remote { handle, serialize, .. } => {
96                handle.tell_serialized(serialize(msg, None));
97            }
98        }
99    }
100
101    /// Send `msg` with a typed [`Sender`]. The sender's identity stays
102    /// type-checked end-to-end (no `Any::downcast` on the reply path).
103    pub fn tell_from(&self, msg: M, sender: Sender) {
104        match &*self.inner {
105            RefImpl::Local { user, path, system_ref, .. } => {
106                let env = MessageEnvelope::with_typed_sender(msg, sender);
107                if user.send(env).is_err() {
108                    notify_dead_letter::<M>(path, system_ref);
109                }
110            }
111            RefImpl::Remote { handle, serialize, .. } => {
112                let sender_path = sender.path().cloned();
113                handle.tell_serialized(serialize(msg, sender_path));
114            }
115        }
116    }
117
118    /// Stop the actor.
119    pub fn stop(&self) {
120        match &*self.inner {
121            RefImpl::Local { system, .. } => {
122                let _ = system.send(SystemMsg::Stop);
123            }
124            RefImpl::Remote { handle, .. } => {
125                handle.tell_system(RemoteSystemMsg::Stop);
126            }
127        }
128    }
129
130    /// Ask pattern: callers supply a closure that embeds a `oneshot::Sender<R>`
131    /// in the message. The future resolves when the actor replies, or errors
132    /// out on timeout/actor-stop.
133    ///
134    /// Note: `ask_with` only works on local refs. For remote ask, use the
135    /// dedicated `atomr-remote::ask_remote` helper which routes the reply
136    /// through a temporary local responder actor.
137    pub async fn ask_with<R, F>(&self, build: F, timeout: Duration) -> Result<R, AskError>
138    where
139        R: Send + 'static,
140        F: FnOnce(oneshot::Sender<R>) -> M,
141    {
142        let (tx, rx) = oneshot::channel();
143        let msg = build(tx);
144        self.tell(msg);
145        tokio::time::timeout(timeout, rx)
146            .await
147            .map_err(|_| AskError::Timeout)?
148            .map_err(|_| AskError::TargetDropped)
149    }
150
151    /// Downgrade into an untyped ref for use with DeadLetters / EventStream.
152    pub fn as_untyped(&self) -> UntypedActorRef {
153        match &*self.inner {
154            RefImpl::Local { path, system, .. } => UntypedActorRef {
155                inner: Arc::new(UntypedImpl::Local { path: path.clone(), system: system.clone() }),
156            },
157            RefImpl::Remote { path, handle, .. } => UntypedActorRef {
158                inner: Arc::new(UntypedImpl::Remote { path: path.clone(), handle: handle.clone() }),
159            },
160        }
161    }
162
163    /// System-message channel exposed internally for DeathWatch (local only).
164    pub(crate) fn system_sender(&self) -> mpsc::UnboundedSender<SystemMsg> {
165        match &*self.inner {
166            RefImpl::Local { system, .. } => system.clone(),
167            RefImpl::Remote { .. } => {
168                let (tx, _rx) = mpsc::unbounded_channel();
169                tx
170            }
171        }
172    }
173}
174
175fn notify_dead_letter<M: 'static>(path: &ActorPath, system_ref: &Weak<ActorSystemInner>) {
176    if let Some(system) = system_ref.upgrade() {
177        if let Some(obs) = system.dead_letter_observer.read().as_ref() {
178            obs.on_dead_letter(path, None, std::any::type_name::<M>());
179        }
180    }
181}
182
183enum UntypedImpl {
184    Local { path: ActorPath, system: mpsc::UnboundedSender<SystemMsg> },
185    Remote { path: ActorPath, handle: Arc<dyn RemoteRef> },
186}
187
188/// Untyped ref used where the message type is not statically known
189/// (e.g. death-watch notifications across actor types, event stream).
190#[derive(Clone)]
191pub struct UntypedActorRef {
192    inner: Arc<UntypedImpl>,
193}
194
195impl fmt::Debug for UntypedActorRef {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        f.debug_struct("UntypedActorRef").field("path", &self.path().to_string()).finish()
198    }
199}
200
201impl UntypedActorRef {
202    pub fn from_remote(handle: Arc<dyn RemoteRef>) -> Self {
203        let path = handle.path().clone();
204        Self { inner: Arc::new(UntypedImpl::Remote { path, handle }) }
205    }
206
207    pub fn path(&self) -> &ActorPath {
208        match &*self.inner {
209            UntypedImpl::Local { path, .. } => path,
210            UntypedImpl::Remote { path, .. } => path,
211        }
212    }
213
214    pub fn is_remote(&self) -> bool {
215        matches!(&*self.inner, UntypedImpl::Remote { .. })
216    }
217
218    pub fn stop(&self) {
219        match &*self.inner {
220            UntypedImpl::Local { system, .. } => {
221                let _ = system.send(SystemMsg::Stop);
222            }
223            UntypedImpl::Remote { handle, .. } => {
224                handle.tell_system(RemoteSystemMsg::Stop);
225            }
226        }
227    }
228
229    /// Surface termination to this ref. For local refs this delivers
230    /// `SystemMsg::Terminated(sender)` to the actor's system mailbox;
231    /// for remote refs it ships a `RemoteSystemMsg::Terminated` PDU.
232    /// Used by `actor_cell::finalize` and by `atomr-remote::RemoteWatcher`.
233    pub fn notify_watchers(&self, sender: ActorPath) {
234        match &*self.inner {
235            UntypedImpl::Local { system, .. } => {
236                let _ = system.send(SystemMsg::Terminated(sender));
237            }
238            UntypedImpl::Remote { handle, .. } => {
239                handle.tell_system(RemoteSystemMsg::Terminated { actor: sender });
240            }
241        }
242    }
243}
244
245impl PartialEq for UntypedActorRef {
246    fn eq(&self, other: &Self) -> bool {
247        self.path() == other.path()
248    }
249}
250
251impl Eq for UntypedActorRef {}
252
253impl std::hash::Hash for UntypedActorRef {
254    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
255        self.path().hash(state);
256    }
257}
258
259#[derive(Debug, Error)]
260#[non_exhaustive]
261pub enum AskError {
262    #[error("ask timed out")]
263    Timeout,
264    #[error("target actor was dropped before replying")]
265    TargetDropped,
266}