1use std::{convert, ops::ControlFlow, panic::AssertUnwindSafe, sync::Arc, thread};
2
3use futures::{
4 FutureExt, StreamExt,
5 future::BoxFuture,
6 stream::{AbortHandle, AbortRegistration, Abortable, FuturesUnordered},
7};
8use tokio::{
9 runtime::{Handle, RuntimeFlavor},
10 sync::SetOnce,
11 task::JoinHandle,
12};
13#[cfg(feature = "tracing")]
14use tracing::{error, trace};
15
16#[cfg(feature = "remote")]
17use crate::remote;
18
19use crate::{
20 actor::{Actor, ActorRef, CURRENT_ACTOR_ID, Link, Links, kind::ActorBehaviour},
21 error::{ActorStopReason, PanicError, SendError, invoke_actor_error_hook},
22 mailbox::{MailboxReceiver, MailboxSender, Signal},
23};
24
25use super::ActorId;
26
27#[allow(missing_debug_implementations)]
/// An actor that has been wired to its mailbox but has not started running yet.
///
/// The value bundles everything [`PreparedActor::run`], [`PreparedActor::spawn`] and
/// [`PreparedActor::spawn_in_thread`] need to drive the actor. Its [`ActorRef`] can be borrowed
/// through [`PreparedActor::actor_ref`] before the actor is started.
#[must_use = "the prepared actor needs to be ran/spawned"]
pub struct PreparedActor<A: Actor> {
    /// Reference to the actor; exposed via `actor_ref()` and handed to the lifecycle on run.
    actor_ref: ActorRef<A>,
    /// Receiving half of the actor's mailbox, consumed by the lifecycle's message loop.
    mailbox_rx: MailboxReceiver<A>,
    /// Registration paired with the `AbortHandle` given to `actor_ref`; it is used to make the
    /// message loop abortable.
    abort_registration: AbortRegistration,
}
40
41impl<A: Actor> PreparedActor<A> {
42 pub fn new((mailbox_tx, mailbox_rx): (MailboxSender<A>, MailboxReceiver<A>)) -> Self {
49 let (abort_handle, abort_registration) = AbortHandle::new_pair();
50 let links = Links::default();
51 let startup_result = Arc::new(SetOnce::new());
52 let shutdown_result = Arc::new(SetOnce::new());
53 let actor_ref = ActorRef::new(
54 mailbox_tx,
55 abort_handle,
56 links,
57 startup_result,
58 shutdown_result,
59 );
60
61 PreparedActor {
62 actor_ref,
63 mailbox_rx,
64 abort_registration,
65 }
66 }
67
68 pub fn actor_ref(&self) -> &ActorRef<A> {
72 &self.actor_ref
73 }
74
75 pub async fn run(self, args: A::Args) -> Result<(A, ActorStopReason), PanicError> {
107 run_actor_lifecycle::<A>(
108 args,
109 self.actor_ref,
110 self.mailbox_rx,
111 self.abort_registration,
112 )
113 .await
114 }
115
116 pub fn spawn(self, args: A::Args) -> JoinHandle<Result<(A, ActorStopReason), PanicError>> {
120 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
121 {
122 tokio::spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run(args)))
123 }
124
125 #[cfg(all(tokio_unstable, feature = "tracing"))]
126 {
127 tokio::task::Builder::new()
128 .name(A::name())
129 .spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run(args)))
130 .unwrap()
131 }
132 }
133
134 pub fn spawn_in_thread(
138 self,
139 args: A::Args,
140 ) -> thread::JoinHandle<Result<(A, ActorStopReason), PanicError>> {
141 let handle = Handle::current();
142 if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) {
143 panic!("threaded actors are not supported in a single threaded tokio runtime");
144 }
145
146 std::thread::Builder::new()
147 .name(A::name().to_string())
148 .spawn({
149 let actor_ref = self.actor_ref.clone();
150 move || handle.block_on(CURRENT_ACTOR_ID.scope(actor_ref.id(), self.run(args)))
151 })
152 .unwrap()
153 }
154}
155
156#[inline]
157async fn run_actor_lifecycle<A>(
158 args: A::Args,
159 actor_ref: ActorRef<A>,
160 mailbox_rx: MailboxReceiver<A>,
161 abort_registration: AbortRegistration,
162) -> Result<(A, ActorStopReason), PanicError>
163where
164 A: Actor,
165{
166 #[allow(unused_mut)]
167 let mut id = actor_ref.id();
168 let name = A::name();
169 #[cfg(feature = "tracing")]
170 trace!(%id, %name, "actor started");
171
172 let start_res = AssertUnwindSafe(A::on_start(args, actor_ref.clone()))
173 .catch_unwind()
174 .await
175 .map(|res| res.map_err(|err| PanicError::new(Box::new(err))))
176 .map_err(PanicError::new_from_panic_any)
177 .and_then(convert::identity);
178 let startup_finished = matches!(
179 actor_ref.weak_signal_mailbox().signal_startup_finished(),
180 Err(SendError::MailboxFull(()))
181 );
182
183 let actor_ref = actor_ref.into_downgrade();
184
185 match start_res {
186 Ok(actor) => {
187 let mut state = ActorBehaviour::new_from_actor(actor, actor_ref.clone());
188
189 let reason = Abortable::new(
190 abortable_actor_loop(
191 &mut state,
192 mailbox_rx,
193 &actor_ref.startup_result,
194 startup_finished,
195 ),
196 abort_registration,
197 )
198 .await
199 .unwrap_or(ActorStopReason::Killed);
200
201 let mut actor = state.shutdown().await;
202
203 let mut notify_futs = notify_links(id, &actor_ref.links, &reason).await;
204
205 log_actor_stop_reason(id, name, &reason);
206 let on_stop_res = actor.on_stop(actor_ref.clone(), reason.clone()).await;
207 while let Some(()) = notify_futs.next().await {}
208
209 unregister_actor(&id).await;
210
211 match on_stop_res {
212 Ok(()) => {
213 actor_ref
214 .shutdown_result
215 .set(Ok(()))
216 .expect("nothing else should set the shutdown result");
217 }
218 Err(err) => {
219 let err = PanicError::new(Box::new(err));
220 invoke_actor_error_hook(&err);
221
222 actor_ref
223 .shutdown_result
224 .set(Err(err))
225 .expect("nothing else should set the shutdown result");
226 }
227 }
228
229 Ok((actor, reason))
230 }
231 Err(err) => {
232 actor_ref
233 .startup_result
234 .set(Err(err.clone()))
235 .expect("nothing should set the startup result");
236
237 let reason = ActorStopReason::Panicked(err);
238 log_actor_stop_reason(id, name, &reason);
239
240 let mut notify_futs = notify_links(id, &actor_ref.links, &reason).await;
241 while let Some(()) = notify_futs.next().await {}
242
243 unregister_actor(&id).await;
244
245 let ActorStopReason::Panicked(err) = reason else {
246 unreachable!()
247 };
248
249 actor_ref
250 .shutdown_result
251 .set(Err(err.clone()))
252 .expect("nothing should set the startup result");
253
254 Err(err)
255 }
256 }
257}
258
259async fn abortable_actor_loop<A>(
260 state: &mut ActorBehaviour<A>,
261 mut mailbox_rx: MailboxReceiver<A>,
262 startup_result: &SetOnce<Result<(), PanicError>>,
263 startup_finished: bool,
264) -> ActorStopReason
265where
266 A: Actor,
267{
268 if startup_finished && let ControlFlow::Break(reason) = state.handle_startup_finished().await {
269 return reason;
270 }
271 loop {
272 let reason = recv_mailbox_loop(state, &mut mailbox_rx, startup_result).await;
273 if let ControlFlow::Break(reason) = state.on_shutdown(reason).await {
274 return reason;
275 }
276 }
277}
278
279async fn recv_mailbox_loop<A>(
280 state: &mut ActorBehaviour<A>,
281 mailbox_rx: &mut MailboxReceiver<A>,
282 startup_result: &SetOnce<Result<(), PanicError>>,
283) -> ActorStopReason
284where
285 A: Actor,
286{
287 loop {
288 match state.next(mailbox_rx).await {
289 Some(Signal::StartupFinished) => {
290 if startup_result.set(Ok(())).is_err() {
291 #[cfg(feature = "tracing")]
292 error!("received startup finished signal after already being started up");
293 }
294 if let ControlFlow::Break(reason) = state.handle_startup_finished().await {
295 return reason;
296 }
297 }
298 Some(Signal::Message {
299 message,
300 actor_ref,
301 reply,
302 sent_within_actor,
303 }) => {
304 if let ControlFlow::Break(reason) = state
305 .handle_message(message, actor_ref, reply, sent_within_actor)
306 .await
307 {
308 return reason;
309 }
310 }
311 Some(Signal::LinkDied { id, reason }) => {
312 if let ControlFlow::Break(reason) = state.handle_link_died(id, reason).await {
313 return reason;
314 }
315 }
316 Some(Signal::Stop) | None => {
317 if let ControlFlow::Break(reason) = state.handle_stop().await {
318 return reason;
319 }
320 }
321 }
322 }
323}
324
325async fn notify_links(
326 id: ActorId,
327 links: &Links,
328 reason: &ActorStopReason,
329) -> FuturesUnordered<BoxFuture<'static, ()>> {
330 let futs = FuturesUnordered::new();
331 {
332 let mut links = links.lock().await;
333 #[allow(unused_variables)]
334 for (link_actor_id, link) in links.drain() {
335 match link {
336 Link::Local(mailbox) => {
337 let reason = reason.clone();
338 futs.push(
339 async move {
340 if let Err(err) = mailbox.signal_link_died(id, reason).await {
341 #[cfg(feature = "tracing")]
342 error!("failed to notify actor a link died: {err}");
343 }
344 }
345 .boxed(),
346 );
347 }
348 #[cfg(feature = "remote")]
349 Link::Remote(notified_actor_remote_id) => {
350 if let Some(swarm) = remote::ActorSwarm::get() {
351 let reason = reason.clone();
352 futs.push(
353 async move {
354 let res = swarm
355 .signal_link_died(
356 id,
357 link_actor_id,
358 notified_actor_remote_id,
359 reason,
360 )
361 .await;
362 if let Err(err) = res {
363 #[cfg(feature = "tracing")]
364 error!("failed to notify actor a link died: {err}");
365 }
366 }
367 .boxed(),
368 );
369 }
370 }
371 }
372 }
373 }
374
375 futs
376}
377
/// Removes the actor with the given `id` from the registry.
///
/// Called at the end of the actor lifecycle, after links have been notified.
///
/// - Without the `remote` feature, the entry is removed from the local `ACTOR_REGISTRY`.
/// - With the `remote` feature, the entry is removed from `REMOTE_REGISTRY`. If that entry had
///   a registered name and an `ActorSwarm` is available, the name is also unregistered from the
///   swarm; the value returned by `swarm.unregister` is discarded.
///
/// # Panics
///
/// Without the `remote` feature, panics if locking `ACTOR_REGISTRY` returns an error
/// (presumably a poisoned mutex — confirm against the registry's lock type).
#[allow(unused_variables)]
async fn unregister_actor(id: &ActorId) {
    #[cfg(not(feature = "remote"))]
    crate::registry::ACTOR_REGISTRY
        .lock()
        .unwrap()
        .remove_by_id(id);
    // NOTE(review): the lock guard is a temporary of the `if let` condition, so it appears to
    // stay held while the body runs — confirm before restructuring this chain.
    #[cfg(feature = "remote")]
    if let Some(entry) = remote::REMOTE_REGISTRY.lock().await.remove(id)
        && let Some(registered_name) = entry.name
        && let Some(swarm) = remote::ActorSwarm::get()
    {
        _ = swarm.unregister(registered_name);
    }
}
393
394#[inline]
395#[cfg(feature = "tracing")]
396fn log_actor_stop_reason(id: ActorId, name: &str, reason: &ActorStopReason) {
397 match reason {
398 reason @ ActorStopReason::Normal
399 | reason @ ActorStopReason::Killed
400 | reason @ ActorStopReason::LinkDied { .. } => {
401 trace!(%id, %name, ?reason, "actor stopped");
402 }
403 reason @ ActorStopReason::Panicked(_) => {
404 error!(%id, %name, ?reason, "actor stopped")
405 }
406 #[cfg(feature = "remote")]
407 reason @ ActorStopReason::PeerDisconnected => {
408 trace!(%id, %name, ?reason, "actor stopped");
409 }
410 }
411}
412
413#[cfg(not(feature = "tracing"))]
414fn log_actor_stop_reason(_id: ActorId, _name: &str, _reason: &ActorStopReason) {}