Skip to main content

atomr_core/actor/
actor_ref.rs

//! `ActorRef` — typed handle to an actor.
//!
//! Refs are polymorphic: an [`ActorRef<M>`] is either backed by a local
//! mailbox (cheap, in-process, the common case) or by a remote handle that
//! serializes `M` and ships it to another `ActorSystem`.
6
7use std::fmt;
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10
11use thiserror::Error;
12use tokio::sync::{mpsc, oneshot};
13
14use super::actor_cell::SystemMsg;
15use super::actor_system::ActorSystemInner;
16use super::path::ActorPath;
17use super::remote::{RemoteRef, RemoteSystemMsg, SerializedMessage};
18use super::sender::Sender;
19use super::traits::MessageEnvelope;
20
21/// Type-erased serializer used by the Remote variant of `ActorRef<M>`.
22type RemoteSerializerFn<M> = Arc<dyn Fn(M, Option<ActorPath>) -> SerializedMessage + Send + Sync>;
23
24enum RefImpl<M: Send + 'static> {
25    Local {
26        path: ActorPath,
27        user: mpsc::UnboundedSender<MessageEnvelope<M>>,
28        system: mpsc::UnboundedSender<SystemMsg>,
29        system_ref: Weak<ActorSystemInner>,
30    },
31    Remote {
32        path: ActorPath,
33        handle: Arc<dyn RemoteRef>,
34        serialize: RemoteSerializerFn<M>,
35    },
36}
37
38/// Typed handle to an actor.
39///
40/// Cheap to clone (internally `Arc`). `tell` sends without waiting; `ask`
41/// uses a helper pattern (`ask_with`) to avoid reflection.
42pub struct ActorRef<M: Send + 'static> {
43    inner: Arc<RefImpl<M>>,
44}
45
46impl<M: Send + 'static> Clone for ActorRef<M> {
47    fn clone(&self) -> Self {
48        Self { inner: self.inner.clone() }
49    }
50}
51
52impl<M: Send + 'static> fmt::Debug for ActorRef<M> {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("ActorRef").field("path", &self.path().to_string()).finish()
55    }
56}
57
58impl<M: Send + 'static> ActorRef<M> {
59    pub(crate) fn new(
60        path: ActorPath,
61        user: mpsc::UnboundedSender<MessageEnvelope<M>>,
62        system: mpsc::UnboundedSender<SystemMsg>,
63        system_ref: Weak<ActorSystemInner>,
64    ) -> Self {
65        Self { inner: Arc::new(RefImpl::Local { path, user, system, system_ref }) }
66    }
67
68    /// Construct a typed remote ref given a (type-erased) `RemoteRef` handle
69    /// and a serializer for `M`. Used by `atomr-remote::RemoteActorRefProvider`.
70    pub fn from_remote(handle: Arc<dyn RemoteRef>, serialize: RemoteSerializerFn<M>) -> Self {
71        let path = handle.path().clone();
72        Self { inner: Arc::new(RefImpl::Remote { path, handle, serialize }) }
73    }
74
75    pub fn path(&self) -> &ActorPath {
76        match &*self.inner {
77            RefImpl::Local { path, .. } => path,
78            RefImpl::Remote { path, .. } => path,
79        }
80    }
81
82    /// True if this ref points at an actor in a different `ActorSystem`.
83    pub fn is_remote(&self) -> bool {
84        matches!(&*self.inner, RefImpl::Remote { .. })
85    }
86
87    /// Fire-and-forget send.
88    pub fn tell(&self, msg: M) {
89        match &*self.inner {
90            RefImpl::Local { user, path, system_ref, .. } => {
91                if user.send(MessageEnvelope::new(msg)).is_err() {
92                    notify_dead_letter::<M>(path, system_ref);
93                }
94            }
95            RefImpl::Remote { handle, serialize, .. } => {
96                handle.tell_serialized(serialize(msg, None));
97            }
98        }
99    }
100
101    /// Send `msg` with a typed [`Sender`]. The sender's identity stays
102    /// type-checked end-to-end (no `Any::downcast` on the reply path).
103    pub fn tell_from(&self, msg: M, sender: Sender) {
104        match &*self.inner {
105            RefImpl::Local { user, path, system_ref, .. } => {
106                let env = MessageEnvelope::with_typed_sender(msg, sender);
107                if user.send(env).is_err() {
108                    notify_dead_letter::<M>(path, system_ref);
109                }
110            }
111            RefImpl::Remote { handle, serialize, .. } => {
112                let sender_path = sender.path().cloned();
113                handle.tell_serialized(serialize(msg, sender_path));
114            }
115        }
116    }
117
118    /// Send `msg` with attached [`Metadata`](super::Metadata) (trace context +
119    /// baggage, FR-10). The metadata rides the envelope and is exposed to the
120    /// receiving actor via [`Context::metadata`](super::Context::metadata).
121    pub fn tell_with_meta(&self, msg: M, metadata: super::metadata::Metadata) {
122        match &*self.inner {
123            RefImpl::Local { user, path, system_ref, .. } => {
124                let env = MessageEnvelope::with_meta(msg, Sender::None, metadata);
125                if user.send(env).is_err() {
126                    notify_dead_letter::<M>(path, system_ref);
127                }
128            }
129            RefImpl::Remote { handle, serialize, .. } => {
130                // Remote metadata propagation is carried alongside the sender
131                // path on the serialized hop; see `atomr-remote`.
132                handle.tell_serialized(serialize(msg, None));
133            }
134        }
135    }
136
137    /// Push a graded supervisor [`Directive`](crate::supervision::Directive)
138    /// (Throttle / Suspend / ResumeFrom) into a running actor without
139    /// restarting it (FR-6). Delivered on the system channel; the actor
140    /// observes it via [`Actor::on_directive`](super::Actor::on_directive).
141    /// No-op for remote refs in this release.
142    pub fn tell_directive(&self, directive: crate::supervision::Directive) {
143        if let RefImpl::Local { system, .. } = &*self.inner {
144            let _ = system.send(SystemMsg::Directive(directive));
145        }
146    }
147
148    /// Stop the actor.
149    pub fn stop(&self) {
150        match &*self.inner {
151            RefImpl::Local { system, .. } => {
152                let _ = system.send(SystemMsg::Stop);
153            }
154            RefImpl::Remote { handle, .. } => {
155                handle.tell_system(RemoteSystemMsg::Stop);
156            }
157        }
158    }
159
160    /// Best-effort liveness check. Returns `true` if the user
161    /// mailbox's receiver half has been dropped, which means the
162    /// actor cell has finished and will no longer process messages.
163    /// For `Remote` refs we cannot inspect the far-end mailbox, so we
164    /// always return `false`.
165    pub fn is_terminated(&self) -> bool {
166        match &*self.inner {
167            RefImpl::Local { user, .. } => user.is_closed(),
168            RefImpl::Remote { .. } => false,
169        }
170    }
171
172    /// Ask pattern: callers supply a closure that embeds a `oneshot::Sender<R>`
173    /// in the message. The future resolves when the actor replies, or errors
174    /// out on timeout/actor-stop.
175    ///
176    /// Note: `ask_with` only works on local refs. For remote ask, use the
177    /// dedicated `atomr-remote::ask_remote` helper which routes the reply
178    /// through a temporary local responder actor.
179    pub async fn ask_with<R, F>(&self, build: F, timeout: Duration) -> Result<R, AskError>
180    where
181        R: Send + 'static,
182        F: FnOnce(oneshot::Sender<R>) -> M,
183    {
184        let (tx, rx) = oneshot::channel();
185        let msg = build(tx);
186        self.tell(msg);
187        tokio::time::timeout(timeout, rx)
188            .await
189            .map_err(|_| AskError::Timeout)?
190            .map_err(|_| AskError::TargetDropped)
191    }
192
193    /// Downgrade into an untyped ref for use with DeadLetters / EventStream.
194    pub fn as_untyped(&self) -> UntypedActorRef {
195        match &*self.inner {
196            RefImpl::Local { path, system, .. } => UntypedActorRef {
197                inner: Arc::new(UntypedImpl::Local { path: path.clone(), system: system.clone() }),
198            },
199            RefImpl::Remote { path, handle, .. } => UntypedActorRef {
200                inner: Arc::new(UntypedImpl::Remote { path: path.clone(), handle: handle.clone() }),
201            },
202        }
203    }
204
205    /// System-message channel exposed internally for DeathWatch (local only).
206    pub(crate) fn system_sender(&self) -> mpsc::UnboundedSender<SystemMsg> {
207        match &*self.inner {
208            RefImpl::Local { system, .. } => system.clone(),
209            RefImpl::Remote { .. } => {
210                let (tx, _rx) = mpsc::unbounded_channel();
211                tx
212            }
213        }
214    }
215}
216
217fn notify_dead_letter<M: 'static>(path: &ActorPath, system_ref: &Weak<ActorSystemInner>) {
218    if let Some(system) = system_ref.upgrade() {
219        if let Some(obs) = system.dead_letter_observer.read().as_ref() {
220            obs.on_dead_letter(path, None, std::any::type_name::<M>());
221        }
222    }
223}
224
225enum UntypedImpl {
226    Local { path: ActorPath, system: mpsc::UnboundedSender<SystemMsg> },
227    Remote { path: ActorPath, handle: Arc<dyn RemoteRef> },
228}
229
230/// Untyped ref used where the message type is not statically known
231/// (e.g. death-watch notifications across actor types, event stream).
232#[derive(Clone)]
233pub struct UntypedActorRef {
234    inner: Arc<UntypedImpl>,
235}
236
237impl fmt::Debug for UntypedActorRef {
238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239        f.debug_struct("UntypedActorRef").field("path", &self.path().to_string()).finish()
240    }
241}
242
243impl UntypedActorRef {
244    pub fn from_remote(handle: Arc<dyn RemoteRef>) -> Self {
245        let path = handle.path().clone();
246        Self { inner: Arc::new(UntypedImpl::Remote { path, handle }) }
247    }
248
249    pub fn path(&self) -> &ActorPath {
250        match &*self.inner {
251            UntypedImpl::Local { path, .. } => path,
252            UntypedImpl::Remote { path, .. } => path,
253        }
254    }
255
256    pub fn is_remote(&self) -> bool {
257        matches!(&*self.inner, UntypedImpl::Remote { .. })
258    }
259
260    pub fn stop(&self) {
261        match &*self.inner {
262            UntypedImpl::Local { system, .. } => {
263                let _ = system.send(SystemMsg::Stop);
264            }
265            UntypedImpl::Remote { handle, .. } => {
266                handle.tell_system(RemoteSystemMsg::Stop);
267            }
268        }
269    }
270
271    /// Surface termination to this ref. For local refs this delivers
272    /// `SystemMsg::Terminated(sender)` to the actor's system mailbox;
273    /// for remote refs it ships a `RemoteSystemMsg::Terminated` PDU.
274    /// Used by `actor_cell::finalize` and by `atomr-remote::RemoteWatcher`.
275    pub fn notify_watchers(&self, sender: ActorPath) {
276        match &*self.inner {
277            UntypedImpl::Local { system, .. } => {
278                let _ = system.send(SystemMsg::Terminated(sender));
279            }
280            UntypedImpl::Remote { handle, .. } => {
281                handle.tell_system(RemoteSystemMsg::Terminated { actor: sender });
282            }
283        }
284    }
285}
286
287impl PartialEq for UntypedActorRef {
288    fn eq(&self, other: &Self) -> bool {
289        self.path() == other.path()
290    }
291}
292
293impl Eq for UntypedActorRef {}
294
295impl std::hash::Hash for UntypedActorRef {
296    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
297        self.path().hash(state);
298    }
299}
300
301#[derive(Debug, Error)]
302#[non_exhaustive]
303pub enum AskError {
304    #[error("ask timed out")]
305    Timeout,
306    #[error("target actor was dropped before replying")]
307    TargetDropped,
308}