1use std::fmt;
8use std::sync::{Arc, Weak};
9use std::time::Duration;
10
11use thiserror::Error;
12use tokio::sync::{mpsc, oneshot};
13
14use super::actor_cell::SystemMsg;
15use super::actor_system::ActorSystemInner;
16use super::path::ActorPath;
17use super::remote::{RemoteRef, RemoteSystemMsg, SerializedMessage};
18use super::sender::Sender;
19use super::traits::MessageEnvelope;
20
21type RemoteSerializerFn<M> = Arc<dyn Fn(M, Option<ActorPath>) -> SerializedMessage + Send + Sync>;
23
24enum RefImpl<M: Send + 'static> {
25 Local {
26 path: ActorPath,
27 user: mpsc::UnboundedSender<MessageEnvelope<M>>,
28 system: mpsc::UnboundedSender<SystemMsg>,
29 system_ref: Weak<ActorSystemInner>,
30 },
31 Remote {
32 path: ActorPath,
33 handle: Arc<dyn RemoteRef>,
34 serialize: RemoteSerializerFn<M>,
35 },
36}
37
38pub struct ActorRef<M: Send + 'static> {
43 inner: Arc<RefImpl<M>>,
44}
45
46impl<M: Send + 'static> Clone for ActorRef<M> {
47 fn clone(&self) -> Self {
48 Self { inner: self.inner.clone() }
49 }
50}
51
52impl<M: Send + 'static> fmt::Debug for ActorRef<M> {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("ActorRef").field("path", &self.path().to_string()).finish()
55 }
56}
57
58impl<M: Send + 'static> ActorRef<M> {
59 pub(crate) fn new(
60 path: ActorPath,
61 user: mpsc::UnboundedSender<MessageEnvelope<M>>,
62 system: mpsc::UnboundedSender<SystemMsg>,
63 system_ref: Weak<ActorSystemInner>,
64 ) -> Self {
65 Self { inner: Arc::new(RefImpl::Local { path, user, system, system_ref }) }
66 }
67
68 pub fn from_remote(handle: Arc<dyn RemoteRef>, serialize: RemoteSerializerFn<M>) -> Self {
71 let path = handle.path().clone();
72 Self { inner: Arc::new(RefImpl::Remote { path, handle, serialize }) }
73 }
74
75 pub fn path(&self) -> &ActorPath {
76 match &*self.inner {
77 RefImpl::Local { path, .. } => path,
78 RefImpl::Remote { path, .. } => path,
79 }
80 }
81
82 pub fn is_remote(&self) -> bool {
84 matches!(&*self.inner, RefImpl::Remote { .. })
85 }
86
87 pub fn tell(&self, msg: M) {
89 match &*self.inner {
90 RefImpl::Local { user, path, system_ref, .. } => {
91 if user.send(MessageEnvelope::new(msg)).is_err() {
92 notify_dead_letter::<M>(path, system_ref);
93 }
94 }
95 RefImpl::Remote { handle, serialize, .. } => {
96 handle.tell_serialized(serialize(msg, None));
97 }
98 }
99 }
100
101 pub fn tell_from(&self, msg: M, sender: Sender) {
104 match &*self.inner {
105 RefImpl::Local { user, path, system_ref, .. } => {
106 let env = MessageEnvelope::with_typed_sender(msg, sender);
107 if user.send(env).is_err() {
108 notify_dead_letter::<M>(path, system_ref);
109 }
110 }
111 RefImpl::Remote { handle, serialize, .. } => {
112 let sender_path = sender.path().cloned();
113 handle.tell_serialized(serialize(msg, sender_path));
114 }
115 }
116 }
117
118 pub fn stop(&self) {
120 match &*self.inner {
121 RefImpl::Local { system, .. } => {
122 let _ = system.send(SystemMsg::Stop);
123 }
124 RefImpl::Remote { handle, .. } => {
125 handle.tell_system(RemoteSystemMsg::Stop);
126 }
127 }
128 }
129
130 pub fn is_terminated(&self) -> bool {
136 match &*self.inner {
137 RefImpl::Local { user, .. } => user.is_closed(),
138 RefImpl::Remote { .. } => false,
139 }
140 }
141
142 pub async fn ask_with<R, F>(&self, build: F, timeout: Duration) -> Result<R, AskError>
150 where
151 R: Send + 'static,
152 F: FnOnce(oneshot::Sender<R>) -> M,
153 {
154 let (tx, rx) = oneshot::channel();
155 let msg = build(tx);
156 self.tell(msg);
157 tokio::time::timeout(timeout, rx)
158 .await
159 .map_err(|_| AskError::Timeout)?
160 .map_err(|_| AskError::TargetDropped)
161 }
162
163 pub fn as_untyped(&self) -> UntypedActorRef {
165 match &*self.inner {
166 RefImpl::Local { path, system, .. } => UntypedActorRef {
167 inner: Arc::new(UntypedImpl::Local { path: path.clone(), system: system.clone() }),
168 },
169 RefImpl::Remote { path, handle, .. } => UntypedActorRef {
170 inner: Arc::new(UntypedImpl::Remote { path: path.clone(), handle: handle.clone() }),
171 },
172 }
173 }
174
175 pub(crate) fn system_sender(&self) -> mpsc::UnboundedSender<SystemMsg> {
177 match &*self.inner {
178 RefImpl::Local { system, .. } => system.clone(),
179 RefImpl::Remote { .. } => {
180 let (tx, _rx) = mpsc::unbounded_channel();
181 tx
182 }
183 }
184 }
185}
186
187fn notify_dead_letter<M: 'static>(path: &ActorPath, system_ref: &Weak<ActorSystemInner>) {
188 if let Some(system) = system_ref.upgrade() {
189 if let Some(obs) = system.dead_letter_observer.read().as_ref() {
190 obs.on_dead_letter(path, None, std::any::type_name::<M>());
191 }
192 }
193}
194
195enum UntypedImpl {
196 Local { path: ActorPath, system: mpsc::UnboundedSender<SystemMsg> },
197 Remote { path: ActorPath, handle: Arc<dyn RemoteRef> },
198}
199
200#[derive(Clone)]
203pub struct UntypedActorRef {
204 inner: Arc<UntypedImpl>,
205}
206
207impl fmt::Debug for UntypedActorRef {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 f.debug_struct("UntypedActorRef").field("path", &self.path().to_string()).finish()
210 }
211}
212
213impl UntypedActorRef {
214 pub fn from_remote(handle: Arc<dyn RemoteRef>) -> Self {
215 let path = handle.path().clone();
216 Self { inner: Arc::new(UntypedImpl::Remote { path, handle }) }
217 }
218
219 pub fn path(&self) -> &ActorPath {
220 match &*self.inner {
221 UntypedImpl::Local { path, .. } => path,
222 UntypedImpl::Remote { path, .. } => path,
223 }
224 }
225
226 pub fn is_remote(&self) -> bool {
227 matches!(&*self.inner, UntypedImpl::Remote { .. })
228 }
229
230 pub fn stop(&self) {
231 match &*self.inner {
232 UntypedImpl::Local { system, .. } => {
233 let _ = system.send(SystemMsg::Stop);
234 }
235 UntypedImpl::Remote { handle, .. } => {
236 handle.tell_system(RemoteSystemMsg::Stop);
237 }
238 }
239 }
240
241 pub fn notify_watchers(&self, sender: ActorPath) {
246 match &*self.inner {
247 UntypedImpl::Local { system, .. } => {
248 let _ = system.send(SystemMsg::Terminated(sender));
249 }
250 UntypedImpl::Remote { handle, .. } => {
251 handle.tell_system(RemoteSystemMsg::Terminated { actor: sender });
252 }
253 }
254 }
255}
256
257impl PartialEq for UntypedActorRef {
258 fn eq(&self, other: &Self) -> bool {
259 self.path() == other.path()
260 }
261}
262
263impl Eq for UntypedActorRef {}
264
265impl std::hash::Hash for UntypedActorRef {
266 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
267 self.path().hash(state);
268 }
269}
270
271#[derive(Debug, Error)]
272#[non_exhaustive]
273pub enum AskError {
274 #[error("ask timed out")]
275 Timeout,
276 #[error("target actor was dropped before replying")]
277 TargetDropped,
278}