1use std::{convert, ops::ControlFlow, panic::AssertUnwindSafe, sync::Arc, thread};
2
3use futures::{
4 future::BoxFuture,
5 stream::{AbortHandle, AbortRegistration, Abortable, FuturesUnordered},
6 FutureExt, StreamExt,
7};
8use tokio::{
9 runtime::{Handle, RuntimeFlavor},
10 sync::SetOnce,
11 task::JoinHandle,
12};
13#[cfg(feature = "tracing")]
14use tracing::{error, trace};
15
16#[cfg(feature = "remote")]
17use crate::remote;
18
19use crate::{
20 actor::{kind::ActorBehaviour, Actor, ActorRef, Link, Links, CURRENT_ACTOR_ID},
21 error::{invoke_actor_error_hook, ActorStopReason, PanicError, SendError},
22 mailbox::{MailboxReceiver, MailboxSender, Signal},
23};
24
25use super::ActorId;
26
/// An actor whose [`ActorRef`] and mailbox have been created, but which has not started yet.
///
/// The reference is available through [`PreparedActor::actor_ref`] before the actor runs. The
/// value is consumed by [`PreparedActor::run`], [`PreparedActor::spawn`] or
/// [`PreparedActor::spawn_in_thread`], which drive the actor's lifecycle.
#[allow(missing_debug_implementations)]
#[must_use = "the prepared actor needs to be ran/spawned"]
pub struct PreparedActor<A: Actor> {
    /// Reference to the actor, built from the sending half of the mailbox.
    actor_ref: ActorRef<A>,
    /// Receiving half of the mailbox, handed to the lifecycle loop when the actor is run.
    mailbox_rx: MailboxReceiver<A>,
    /// Registration paired with the abort handle given to `actor_ref`; it makes the actor's
    /// message loop abortable.
    abort_registration: AbortRegistration,
}
40
41impl<A: Actor> PreparedActor<A> {
42 pub fn new((mailbox_tx, mailbox_rx): (MailboxSender<A>, MailboxReceiver<A>)) -> Self {
49 let (abort_handle, abort_registration) = AbortHandle::new_pair();
50 let links = Links::default();
51 let startup_result = Arc::new(SetOnce::new());
52 let shutdown_result = Arc::new(SetOnce::new());
53 let actor_ref = ActorRef::new(
54 mailbox_tx,
55 abort_handle,
56 links,
57 startup_result,
58 shutdown_result,
59 );
60
61 PreparedActor {
62 actor_ref,
63 mailbox_rx,
64 abort_registration,
65 }
66 }
67
68 pub fn actor_ref(&self) -> &ActorRef<A> {
72 &self.actor_ref
73 }
74
75 pub async fn run(self, args: A::Args) -> Result<(A, ActorStopReason), PanicError> {
107 run_actor_lifecycle::<A>(
108 args,
109 self.actor_ref,
110 self.mailbox_rx,
111 self.abort_registration,
112 )
113 .await
114 }
115
116 pub fn spawn(self, args: A::Args) -> JoinHandle<Result<(A, ActorStopReason), PanicError>> {
120 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
121 {
122 tokio::spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run(args)))
123 }
124
125 #[cfg(all(tokio_unstable, feature = "tracing"))]
126 {
127 tokio::task::Builder::new()
128 .name(A::name())
129 .spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run(args)))
130 .unwrap()
131 }
132 }
133
134 pub fn spawn_in_thread(
138 self,
139 args: A::Args,
140 ) -> thread::JoinHandle<Result<(A, ActorStopReason), PanicError>> {
141 let handle = Handle::current();
142 if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) {
143 panic!("threaded actors are not supported in a single threaded tokio runtime");
144 }
145
146 std::thread::Builder::new()
147 .name(A::name().to_string())
148 .spawn({
149 let actor_ref = self.actor_ref.clone();
150 move || handle.block_on(CURRENT_ACTOR_ID.scope(actor_ref.id(), self.run(args)))
151 })
152 .unwrap()
153 }
154}
155
156#[inline]
157async fn run_actor_lifecycle<A>(
158 args: A::Args,
159 actor_ref: ActorRef<A>,
160 mailbox_rx: MailboxReceiver<A>,
161 abort_registration: AbortRegistration,
162) -> Result<(A, ActorStopReason), PanicError>
163where
164 A: Actor,
165{
166 #[allow(unused_mut)]
167 let mut id = actor_ref.id();
168 let name = A::name();
169 #[cfg(feature = "tracing")]
170 trace!(%id, %name, "actor started");
171
172 let start_res = AssertUnwindSafe(A::on_start(args, actor_ref.clone()))
173 .catch_unwind()
174 .await
175 .map(|res| res.map_err(|err| PanicError::new(Box::new(err))))
176 .map_err(PanicError::new_from_panic_any)
177 .and_then(convert::identity);
178 let startup_finished = matches!(
179 actor_ref.weak_signal_mailbox().signal_startup_finished(),
180 Err(SendError::MailboxFull(()))
181 );
182
183 let actor_ref = actor_ref.into_downgrade();
184
185 match start_res {
186 Ok(actor) => {
187 let mut state = ActorBehaviour::new_from_actor(actor, actor_ref.clone());
188
189 let reason = Abortable::new(
190 abortable_actor_loop(
191 &mut state,
192 mailbox_rx,
193 &actor_ref.startup_result,
194 startup_finished,
195 ),
196 abort_registration,
197 )
198 .await
199 .unwrap_or(ActorStopReason::Killed);
200
201 let mut actor = state.shutdown().await;
202
203 let mut notify_futs = notify_links(id, &actor_ref.links, &reason).await;
204
205 log_actor_stop_reason(id, name, &reason);
206 let on_stop_res = actor.on_stop(actor_ref.clone(), reason.clone()).await;
207
208 while let Some(()) = notify_futs.next().await {}
209
210 #[cfg(not(feature = "remote"))]
211 crate::registry::ACTOR_REGISTRY.lock().unwrap().remove(name);
212 #[cfg(feature = "remote")]
213 remote::REMOTE_REGISTRY.lock().await.remove(&id);
214
215 match on_stop_res {
216 Ok(()) => {
217 actor_ref
218 .shutdown_result
219 .set(Ok(()))
220 .expect("nothing else should set the shutdown result");
221 }
222 Err(err) => {
223 let err = PanicError::new(Box::new(err));
224 invoke_actor_error_hook(&err);
225
226 actor_ref
227 .shutdown_result
228 .set(Err(err))
229 .expect("nothing else should set the shutdown result");
230 }
231 }
232
233 Ok((actor, reason))
234 }
235 Err(err) => {
236 actor_ref
237 .startup_result
238 .set(Err(err.clone()))
239 .expect("nothing should set the startup result");
240
241 let reason = ActorStopReason::Panicked(err);
242 log_actor_stop_reason(id, name, &reason);
243
244 let mut notify_futs = notify_links(id, &actor_ref.links, &reason).await;
245 while let Some(()) = notify_futs.next().await {}
246
247 let ActorStopReason::Panicked(err) = reason else {
248 unreachable!()
249 };
250
251 actor_ref
252 .shutdown_result
253 .set(Err(err.clone()))
254 .expect("nothing should set the startup result");
255
256 Err(err)
257 }
258 }
259}
260
261async fn abortable_actor_loop<A>(
262 state: &mut ActorBehaviour<A>,
263 mut mailbox_rx: MailboxReceiver<A>,
264 startup_result: &SetOnce<Result<(), PanicError>>,
265 startup_finished: bool,
266) -> ActorStopReason
267where
268 A: Actor,
269{
270 if startup_finished {
271 if let ControlFlow::Break(reason) = state.handle_startup_finished().await {
272 return reason;
273 }
274 }
275 loop {
276 let reason = recv_mailbox_loop(state, &mut mailbox_rx, startup_result).await;
277 if let ControlFlow::Break(reason) = state.on_shutdown(reason).await {
278 return reason;
279 }
280 }
281}
282
283async fn recv_mailbox_loop<A>(
284 state: &mut ActorBehaviour<A>,
285 mailbox_rx: &mut MailboxReceiver<A>,
286 startup_result: &SetOnce<Result<(), PanicError>>,
287) -> ActorStopReason
288where
289 A: Actor,
290{
291 loop {
292 match state.next(mailbox_rx).await {
293 Some(Signal::StartupFinished) => {
294 if startup_result.set(Ok(())).is_err() {
295 #[cfg(feature = "tracing")]
296 error!("received startup finished signal after already being started up");
297 }
298 if let ControlFlow::Break(reason) = state.handle_startup_finished().await {
299 return reason;
300 }
301 }
302 Some(Signal::Message {
303 message,
304 actor_ref,
305 reply,
306 sent_within_actor,
307 }) => {
308 if let ControlFlow::Break(reason) = state
309 .handle_message(message, actor_ref, reply, sent_within_actor)
310 .await
311 {
312 return reason;
313 }
314 }
315 Some(Signal::LinkDied { id, reason }) => {
316 if let ControlFlow::Break(reason) = state.handle_link_died(id, reason).await {
317 return reason;
318 }
319 }
320 Some(Signal::Stop) | None => {
321 if let ControlFlow::Break(reason) = state.handle_stop().await {
322 return reason;
323 }
324 }
325 }
326 }
327}
328
329async fn notify_links(
330 id: ActorId,
331 links: &Links,
332 reason: &ActorStopReason,
333) -> FuturesUnordered<BoxFuture<'static, ()>> {
334 let futs = FuturesUnordered::new();
335 {
336 let mut links = links.lock().await;
337 #[allow(unused_variables)]
338 for (link_actor_id, link) in links.drain() {
339 match link {
340 Link::Local(mailbox) => {
341 let reason = reason.clone();
342 futs.push(
343 async move {
344 if let Err(err) = mailbox.signal_link_died(id, reason).await {
345 #[cfg(feature = "tracing")]
346 error!("failed to notify actor a link died: {err}");
347 }
348 }
349 .boxed(),
350 );
351 }
352 #[cfg(feature = "remote")]
353 Link::Remote(notified_actor_remote_id) => {
354 if let Some(swarm) = remote::ActorSwarm::get() {
355 let reason = reason.clone();
356 futs.push(
357 async move {
358 let res = swarm
359 .signal_link_died(
360 id,
361 link_actor_id,
362 notified_actor_remote_id,
363 reason,
364 )
365 .await;
366 if let Err(err) = res {
367 #[cfg(feature = "tracing")]
368 error!("failed to notify actor a link died: {err}");
369 }
370 }
371 .boxed(),
372 );
373 }
374 }
375 }
376 }
377 }
378
379 futs
380}
381
/// Emits the "actor stopped" tracing event for an actor.
///
/// A `Panicked` reason is reported at `error` level; every other reason at `trace` level.
#[inline]
#[cfg(feature = "tracing")]
fn log_actor_stop_reason(id: ActorId, name: &str, reason: &ActorStopReason) {
    match reason {
        ActorStopReason::Panicked(_) => {
            error!(%id, %name, ?reason, "actor stopped");
        }
        ActorStopReason::Normal | ActorStopReason::Killed | ActorStopReason::LinkDied { .. } => {
            trace!(%id, %name, ?reason, "actor stopped");
        }
        #[cfg(feature = "remote")]
        ActorStopReason::PeerDisconnected => {
            trace!(%id, %name, ?reason, "actor stopped");
        }
    }
}
400
/// No-op stand-in used when the `tracing` feature is disabled, so callers can log the stop
/// reason unconditionally.
#[cfg(not(feature = "tracing"))]
fn log_actor_stop_reason(_id: ActorId, _name: &str, _reason: &ActorStopReason) {}