1use std::fmt;
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10
11use thiserror::Error;
12use tokio::sync::{mpsc, oneshot};
13
14use super::actor_cell::SystemMsg;
15use super::actor_system::ActorSystemInner;
16use super::path::ActorPath;
17use super::remote::{RemoteRef, RemoteSystemMsg, SerializedMessage};
18use super::sender::Sender;
19use super::traits::MessageEnvelope;
20
21type RemoteSerializerFn<M> = Arc<dyn Fn(M, Option<ActorPath>) -> SerializedMessage + Send + Sync>;
23
24enum RefImpl<M: Send + 'static> {
25 Local {
26 path: ActorPath,
27 user: mpsc::UnboundedSender<MessageEnvelope<M>>,
28 system: mpsc::UnboundedSender<SystemMsg>,
29 system_ref: Weak<ActorSystemInner>,
30 },
31 Remote {
32 path: ActorPath,
33 handle: Arc<dyn RemoteRef>,
34 serialize: RemoteSerializerFn<M>,
35 },
36}
37
38pub struct ActorRef<M: Send + 'static> {
43 inner: Arc<RefImpl<M>>,
44}
45
46impl<M: Send + 'static> Clone for ActorRef<M> {
47 fn clone(&self) -> Self {
48 Self { inner: self.inner.clone() }
49 }
50}
51
52impl<M: Send + 'static> fmt::Debug for ActorRef<M> {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("ActorRef").field("path", &self.path().to_string()).finish()
55 }
56}
57
58impl<M: Send + 'static> ActorRef<M> {
59 pub(crate) fn new(
60 path: ActorPath,
61 user: mpsc::UnboundedSender<MessageEnvelope<M>>,
62 system: mpsc::UnboundedSender<SystemMsg>,
63 system_ref: Weak<ActorSystemInner>,
64 ) -> Self {
65 Self { inner: Arc::new(RefImpl::Local { path, user, system, system_ref }) }
66 }
67
68 pub fn from_remote(handle: Arc<dyn RemoteRef>, serialize: RemoteSerializerFn<M>) -> Self {
71 let path = handle.path().clone();
72 Self { inner: Arc::new(RefImpl::Remote { path, handle, serialize }) }
73 }
74
75 pub fn path(&self) -> &ActorPath {
76 match &*self.inner {
77 RefImpl::Local { path, .. } => path,
78 RefImpl::Remote { path, .. } => path,
79 }
80 }
81
82 pub fn is_remote(&self) -> bool {
84 matches!(&*self.inner, RefImpl::Remote { .. })
85 }
86
87 pub fn tell(&self, msg: M) {
89 match &*self.inner {
90 RefImpl::Local { user, path, system_ref, .. } => {
91 if user.send(MessageEnvelope::new(msg)).is_err() {
92 notify_dead_letter::<M>(path, system_ref);
93 }
94 }
95 RefImpl::Remote { handle, serialize, .. } => {
96 handle.tell_serialized(serialize(msg, None));
97 }
98 }
99 }
100
101 pub fn tell_from(&self, msg: M, sender: Sender) {
104 match &*self.inner {
105 RefImpl::Local { user, path, system_ref, .. } => {
106 let env = MessageEnvelope::with_typed_sender(msg, sender);
107 if user.send(env).is_err() {
108 notify_dead_letter::<M>(path, system_ref);
109 }
110 }
111 RefImpl::Remote { handle, serialize, .. } => {
112 let sender_path = sender.path().cloned();
113 handle.tell_serialized(serialize(msg, sender_path));
114 }
115 }
116 }
117
118 pub fn stop(&self) {
120 match &*self.inner {
121 RefImpl::Local { system, .. } => {
122 let _ = system.send(SystemMsg::Stop);
123 }
124 RefImpl::Remote { handle, .. } => {
125 handle.tell_system(RemoteSystemMsg::Stop);
126 }
127 }
128 }
129
130 pub async fn ask_with<R, F>(&self, build: F, timeout: Duration) -> Result<R, AskError>
138 where
139 R: Send + 'static,
140 F: FnOnce(oneshot::Sender<R>) -> M,
141 {
142 let (tx, rx) = oneshot::channel();
143 let msg = build(tx);
144 self.tell(msg);
145 tokio::time::timeout(timeout, rx)
146 .await
147 .map_err(|_| AskError::Timeout)?
148 .map_err(|_| AskError::TargetDropped)
149 }
150
151 pub fn as_untyped(&self) -> UntypedActorRef {
153 match &*self.inner {
154 RefImpl::Local { path, system, .. } => UntypedActorRef {
155 inner: Arc::new(UntypedImpl::Local { path: path.clone(), system: system.clone() }),
156 },
157 RefImpl::Remote { path, handle, .. } => UntypedActorRef {
158 inner: Arc::new(UntypedImpl::Remote { path: path.clone(), handle: handle.clone() }),
159 },
160 }
161 }
162
163 pub(crate) fn system_sender(&self) -> mpsc::UnboundedSender<SystemMsg> {
165 match &*self.inner {
166 RefImpl::Local { system, .. } => system.clone(),
167 RefImpl::Remote { .. } => {
168 let (tx, _rx) = mpsc::unbounded_channel();
169 tx
170 }
171 }
172 }
173}
174
175fn notify_dead_letter<M: 'static>(path: &ActorPath, system_ref: &Weak<ActorSystemInner>) {
176 if let Some(system) = system_ref.upgrade() {
177 if let Some(obs) = system.dead_letter_observer.read().as_ref() {
178 obs.on_dead_letter(path, None, std::any::type_name::<M>());
179 }
180 }
181}
182
183enum UntypedImpl {
184 Local { path: ActorPath, system: mpsc::UnboundedSender<SystemMsg> },
185 Remote { path: ActorPath, handle: Arc<dyn RemoteRef> },
186}
187
188#[derive(Clone)]
191pub struct UntypedActorRef {
192 inner: Arc<UntypedImpl>,
193}
194
195impl fmt::Debug for UntypedActorRef {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 f.debug_struct("UntypedActorRef").field("path", &self.path().to_string()).finish()
198 }
199}
200
201impl UntypedActorRef {
202 pub fn from_remote(handle: Arc<dyn RemoteRef>) -> Self {
203 let path = handle.path().clone();
204 Self { inner: Arc::new(UntypedImpl::Remote { path, handle }) }
205 }
206
207 pub fn path(&self) -> &ActorPath {
208 match &*self.inner {
209 UntypedImpl::Local { path, .. } => path,
210 UntypedImpl::Remote { path, .. } => path,
211 }
212 }
213
214 pub fn is_remote(&self) -> bool {
215 matches!(&*self.inner, UntypedImpl::Remote { .. })
216 }
217
218 pub fn stop(&self) {
219 match &*self.inner {
220 UntypedImpl::Local { system, .. } => {
221 let _ = system.send(SystemMsg::Stop);
222 }
223 UntypedImpl::Remote { handle, .. } => {
224 handle.tell_system(RemoteSystemMsg::Stop);
225 }
226 }
227 }
228
229 pub fn notify_watchers(&self, sender: ActorPath) {
234 match &*self.inner {
235 UntypedImpl::Local { system, .. } => {
236 let _ = system.send(SystemMsg::Terminated(sender));
237 }
238 UntypedImpl::Remote { handle, .. } => {
239 handle.tell_system(RemoteSystemMsg::Terminated { actor: sender });
240 }
241 }
242 }
243}
244
245impl PartialEq for UntypedActorRef {
246 fn eq(&self, other: &Self) -> bool {
247 self.path() == other.path()
248 }
249}
250
251impl Eq for UntypedActorRef {}
252
253impl std::hash::Hash for UntypedActorRef {
254 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
255 self.path().hash(state);
256 }
257}
258
259#[derive(Debug, Error)]
260#[non_exhaustive]
261pub enum AskError {
262 #[error("ask timed out")]
263 Timeout,
264 #[error("target actor was dropped before replying")]
265 TargetDropped,
266}