1use std::fmt;
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10
11use thiserror::Error;
12use tokio::sync::{mpsc, oneshot};
13
14use super::actor_cell::SystemMsg;
15use super::actor_system::ActorSystemInner;
16use super::path::ActorPath;
17use super::remote::{RemoteRef, RemoteSystemMsg, SerializedMessage};
18use super::sender::Sender;
19use super::traits::MessageEnvelope;
20
21type RemoteSerializerFn<M> = Arc<dyn Fn(M, Option<ActorPath>) -> SerializedMessage + Send + Sync>;
23
24enum RefImpl<M: Send + 'static> {
25 Local {
26 path: ActorPath,
27 user: mpsc::UnboundedSender<MessageEnvelope<M>>,
28 system: mpsc::UnboundedSender<SystemMsg>,
29 system_ref: Weak<ActorSystemInner>,
30 },
31 Remote {
32 path: ActorPath,
33 handle: Arc<dyn RemoteRef>,
34 serialize: RemoteSerializerFn<M>,
35 },
36}
37
38pub struct ActorRef<M: Send + 'static> {
43 inner: Arc<RefImpl<M>>,
44}
45
46impl<M: Send + 'static> Clone for ActorRef<M> {
47 fn clone(&self) -> Self {
48 Self { inner: self.inner.clone() }
49 }
50}
51
52impl<M: Send + 'static> fmt::Debug for ActorRef<M> {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("ActorRef").field("path", &self.path().to_string()).finish()
55 }
56}
57
58impl<M: Send + 'static> ActorRef<M> {
59 pub(crate) fn new(
60 path: ActorPath,
61 user: mpsc::UnboundedSender<MessageEnvelope<M>>,
62 system: mpsc::UnboundedSender<SystemMsg>,
63 system_ref: Weak<ActorSystemInner>,
64 ) -> Self {
65 Self { inner: Arc::new(RefImpl::Local { path, user, system, system_ref }) }
66 }
67
68 pub fn from_remote(handle: Arc<dyn RemoteRef>, serialize: RemoteSerializerFn<M>) -> Self {
71 let path = handle.path().clone();
72 Self { inner: Arc::new(RefImpl::Remote { path, handle, serialize }) }
73 }
74
75 pub fn path(&self) -> &ActorPath {
76 match &*self.inner {
77 RefImpl::Local { path, .. } => path,
78 RefImpl::Remote { path, .. } => path,
79 }
80 }
81
82 pub fn is_remote(&self) -> bool {
84 matches!(&*self.inner, RefImpl::Remote { .. })
85 }
86
87 pub fn tell(&self, msg: M) {
89 match &*self.inner {
90 RefImpl::Local { user, path, system_ref, .. } => {
91 if user.send(MessageEnvelope::new(msg)).is_err() {
92 notify_dead_letter::<M>(path, system_ref);
93 }
94 }
95 RefImpl::Remote { handle, serialize, .. } => {
96 handle.tell_serialized(serialize(msg, None));
97 }
98 }
99 }
100
101 pub fn tell_from(&self, msg: M, sender: Sender) {
104 match &*self.inner {
105 RefImpl::Local { user, path, system_ref, .. } => {
106 let env = MessageEnvelope::with_typed_sender(msg, sender);
107 if user.send(env).is_err() {
108 notify_dead_letter::<M>(path, system_ref);
109 }
110 }
111 RefImpl::Remote { handle, serialize, .. } => {
112 let sender_path = sender.path().cloned();
113 handle.tell_serialized(serialize(msg, sender_path));
114 }
115 }
116 }
117
118 pub fn tell_with_meta(&self, msg: M, metadata: super::metadata::Metadata) {
122 match &*self.inner {
123 RefImpl::Local { user, path, system_ref, .. } => {
124 let env = MessageEnvelope::with_meta(msg, Sender::None, metadata);
125 if user.send(env).is_err() {
126 notify_dead_letter::<M>(path, system_ref);
127 }
128 }
129 RefImpl::Remote { handle, serialize, .. } => {
130 handle.tell_serialized(serialize(msg, None));
133 }
134 }
135 }
136
137 pub fn tell_directive(&self, directive: crate::supervision::Directive) {
143 if let RefImpl::Local { system, .. } = &*self.inner {
144 let _ = system.send(SystemMsg::Directive(directive));
145 }
146 }
147
148 pub fn stop(&self) {
150 match &*self.inner {
151 RefImpl::Local { system, .. } => {
152 let _ = system.send(SystemMsg::Stop);
153 }
154 RefImpl::Remote { handle, .. } => {
155 handle.tell_system(RemoteSystemMsg::Stop);
156 }
157 }
158 }
159
160 pub fn is_terminated(&self) -> bool {
166 match &*self.inner {
167 RefImpl::Local { user, .. } => user.is_closed(),
168 RefImpl::Remote { .. } => false,
169 }
170 }
171
172 pub async fn ask_with<R, F>(&self, build: F, timeout: Duration) -> Result<R, AskError>
180 where
181 R: Send + 'static,
182 F: FnOnce(oneshot::Sender<R>) -> M,
183 {
184 let (tx, rx) = oneshot::channel();
185 let msg = build(tx);
186 self.tell(msg);
187 tokio::time::timeout(timeout, rx)
188 .await
189 .map_err(|_| AskError::Timeout)?
190 .map_err(|_| AskError::TargetDropped)
191 }
192
193 pub fn as_untyped(&self) -> UntypedActorRef {
195 match &*self.inner {
196 RefImpl::Local { path, system, .. } => UntypedActorRef {
197 inner: Arc::new(UntypedImpl::Local { path: path.clone(), system: system.clone() }),
198 },
199 RefImpl::Remote { path, handle, .. } => UntypedActorRef {
200 inner: Arc::new(UntypedImpl::Remote { path: path.clone(), handle: handle.clone() }),
201 },
202 }
203 }
204
205 pub(crate) fn system_sender(&self) -> mpsc::UnboundedSender<SystemMsg> {
207 match &*self.inner {
208 RefImpl::Local { system, .. } => system.clone(),
209 RefImpl::Remote { .. } => {
210 let (tx, _rx) = mpsc::unbounded_channel();
211 tx
212 }
213 }
214 }
215}
216
217fn notify_dead_letter<M: 'static>(path: &ActorPath, system_ref: &Weak<ActorSystemInner>) {
218 if let Some(system) = system_ref.upgrade() {
219 if let Some(obs) = system.dead_letter_observer.read().as_ref() {
220 obs.on_dead_letter(path, None, std::any::type_name::<M>());
221 }
222 }
223}
224
225enum UntypedImpl {
226 Local { path: ActorPath, system: mpsc::UnboundedSender<SystemMsg> },
227 Remote { path: ActorPath, handle: Arc<dyn RemoteRef> },
228}
229
230#[derive(Clone)]
233pub struct UntypedActorRef {
234 inner: Arc<UntypedImpl>,
235}
236
237impl fmt::Debug for UntypedActorRef {
238 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239 f.debug_struct("UntypedActorRef").field("path", &self.path().to_string()).finish()
240 }
241}
242
243impl UntypedActorRef {
244 pub fn from_remote(handle: Arc<dyn RemoteRef>) -> Self {
245 let path = handle.path().clone();
246 Self { inner: Arc::new(UntypedImpl::Remote { path, handle }) }
247 }
248
249 pub fn path(&self) -> &ActorPath {
250 match &*self.inner {
251 UntypedImpl::Local { path, .. } => path,
252 UntypedImpl::Remote { path, .. } => path,
253 }
254 }
255
256 pub fn is_remote(&self) -> bool {
257 matches!(&*self.inner, UntypedImpl::Remote { .. })
258 }
259
260 pub fn stop(&self) {
261 match &*self.inner {
262 UntypedImpl::Local { system, .. } => {
263 let _ = system.send(SystemMsg::Stop);
264 }
265 UntypedImpl::Remote { handle, .. } => {
266 handle.tell_system(RemoteSystemMsg::Stop);
267 }
268 }
269 }
270
271 pub fn notify_watchers(&self, sender: ActorPath) {
276 match &*self.inner {
277 UntypedImpl::Local { system, .. } => {
278 let _ = system.send(SystemMsg::Terminated(sender));
279 }
280 UntypedImpl::Remote { handle, .. } => {
281 handle.tell_system(RemoteSystemMsg::Terminated { actor: sender });
282 }
283 }
284 }
285}
286
287impl PartialEq for UntypedActorRef {
288 fn eq(&self, other: &Self) -> bool {
289 self.path() == other.path()
290 }
291}
292
293impl Eq for UntypedActorRef {}
294
295impl std::hash::Hash for UntypedActorRef {
296 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
297 self.path().hash(state);
298 }
299}
300
301#[derive(Debug, Error)]
302#[non_exhaustive]
303pub enum AskError {
304 #[error("ask timed out")]
305 Timeout,
306 #[error("target actor was dropped before replying")]
307 TargetDropped,
308}