kameo/actor/
spawn.rs

1use std::{convert, ops::ControlFlow, panic::AssertUnwindSafe, sync::Arc, thread};
2
3use futures::{
4    future::BoxFuture,
5    stream::{AbortHandle, AbortRegistration, Abortable, FuturesUnordered},
6    FutureExt, StreamExt,
7};
8use tokio::{
9    runtime::{Handle, RuntimeFlavor},
10    sync::SetOnce,
11    task::JoinHandle,
12};
13#[cfg(feature = "tracing")]
14use tracing::{error, trace};
15
16#[cfg(feature = "remote")]
17use crate::remote;
18
19use crate::{
20    actor::{kind::ActorBehaviour, Actor, ActorRef, Link, Links, CURRENT_ACTOR_ID},
21    error::{invoke_actor_error_hook, ActorStopReason, PanicError, SendError},
22    mailbox::{MailboxReceiver, MailboxSender, Signal},
23};
24
25use super::ActorId;
26
/// A `PreparedActor` represents an actor that has been initialized and is ready to be either run
/// in the current task or spawned into a new task.
///
/// The `PreparedActor` provides access to the actor's [`ActorRef`] for interacting with the actor
/// before it starts running. It allows for flexible execution, either by running the actor
/// synchronously in the current task or spawning it in a separate task or thread.
#[allow(missing_debug_implementations)]
#[must_use = "the prepared actor needs to be ran/spawned"]
pub struct PreparedActor<A: Actor> {
    /// Reference to the not-yet-running actor, exposed through `actor_ref()` so messages can
    /// be queued before the actor is run or spawned.
    actor_ref: ActorRef<A>,
    /// Receiving half of the mailbox passed to `new`; handed to the actor's lifecycle when run.
    mailbox_rx: MailboxReceiver<A>,
    /// Counterpart of the `AbortHandle` given to `actor_ref`; the lifecycle wraps the actor's
    /// message loop with it so the loop can be aborted.
    abort_registration: AbortRegistration,
}
40
41impl<A: Actor> PreparedActor<A> {
42    /// Creates a new prepared actor with a specific mailbox configuration, allowing access to its [`ActorRef`] before spawning.
43    ///
44    /// This function allows you to explicitly specify a mailbox when preparing an actor.
45    /// Use this when you need custom mailbox behavior or capacity.
46    ///
47    /// This is typically created though [`Actor::prepare`] and [`Actor::prepare_with_mailbox`].
48    pub fn new((mailbox_tx, mailbox_rx): (MailboxSender<A>, MailboxReceiver<A>)) -> Self {
49        let (abort_handle, abort_registration) = AbortHandle::new_pair();
50        let links = Links::default();
51        let startup_result = Arc::new(SetOnce::new());
52        let shutdown_result = Arc::new(SetOnce::new());
53        let actor_ref = ActorRef::new(
54            mailbox_tx,
55            abort_handle,
56            links,
57            startup_result,
58            shutdown_result,
59        );
60
61        PreparedActor {
62            actor_ref,
63            mailbox_rx,
64            abort_registration,
65        }
66    }
67
68    /// Returns a reference to the [`ActorRef`], which can be used to send messages to the actor.
69    ///
70    /// The `ActorRef` can be used for interaction before the actor starts processing its event loop.
71    pub fn actor_ref(&self) -> &ActorRef<A> {
72        &self.actor_ref
73    }
74
75    /// Runs the actor in the current context **without** spawning a separate task, until the actor is stopped.
76    ///
77    /// This is useful when you need to run an actor synchronously in the current context,
78    /// without background execution, and when the actor is expected to be short-lived.
79    ///
80    /// Note that the actor's mailbox may already contain messages before `run` is called.
81    /// In this case, the actor will process all pending messages in the mailbox before completing.
82    ///
83    /// # Example
84    ///
85    /// ```no_run
86    /// # use kameo::Actor;
87    /// # use kameo::actor::PreparedActor;
88    /// # use kameo::message::{Context, Message};
89    ///
90    /// # #[derive(Actor)]
91    /// # struct MyActor;
92    /// #
93    /// # impl Message<&'static str> for MyActor {
94    /// #     type Reply = ();
95    /// #     async fn handle(&mut self, msg: &'static str, ctx: &mut Context<Self, Self::Reply>) -> Self::Reply { }
96    /// # }
97    /// #
98    /// # tokio_test::block_on(async {
99    /// let prepared_actor = MyActor::prepare();
100    /// // Send it a message before it runs
101    /// prepared_actor.actor_ref().tell("hello!").await?;
102    /// prepared_actor.run(MyActor).await;
103    /// # Ok::<(), Box<dyn std::error::Error>>(())
104    /// # });
105    /// ```
106    pub async fn run(self, args: A::Args) -> Result<(A, ActorStopReason), PanicError> {
107        run_actor_lifecycle::<A>(
108            args,
109            self.actor_ref,
110            self.mailbox_rx,
111            self.abort_registration,
112        )
113        .await
114    }
115
116    /// Spawns the actor in a new background tokio task, returning the `JoinHandle`.
117    ///
118    /// See [`Actor::spawn`] for more information.
119    pub fn spawn(self, args: A::Args) -> JoinHandle<Result<(A, ActorStopReason), PanicError>> {
120        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
121        {
122            tokio::spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run(args)))
123        }
124
125        #[cfg(all(tokio_unstable, feature = "tracing"))]
126        {
127            tokio::task::Builder::new()
128                .name(A::name())
129                .spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run(args)))
130                .unwrap()
131        }
132    }
133
134    /// Spawns the actor in a new background thread, returning the `JoinHandle`.
135    ///
136    /// See [`Actor::spawn_in_thread`] for more information.
137    pub fn spawn_in_thread(
138        self,
139        args: A::Args,
140    ) -> thread::JoinHandle<Result<(A, ActorStopReason), PanicError>> {
141        let handle = Handle::current();
142        if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) {
143            panic!("threaded actors are not supported in a single threaded tokio runtime");
144        }
145
146        std::thread::Builder::new()
147            .name(A::name().to_string())
148            .spawn({
149                let actor_ref = self.actor_ref.clone();
150                move || handle.block_on(CURRENT_ACTOR_ID.scope(actor_ref.id(), self.run(args)))
151            })
152            .unwrap()
153    }
154}
155
156#[inline]
157async fn run_actor_lifecycle<A>(
158    args: A::Args,
159    actor_ref: ActorRef<A>,
160    mailbox_rx: MailboxReceiver<A>,
161    abort_registration: AbortRegistration,
162) -> Result<(A, ActorStopReason), PanicError>
163where
164    A: Actor,
165{
166    #[allow(unused_mut)]
167    let mut id = actor_ref.id();
168    let name = A::name();
169    #[cfg(feature = "tracing")]
170    trace!(%id, %name, "actor started");
171
172    let start_res = AssertUnwindSafe(A::on_start(args, actor_ref.clone()))
173        .catch_unwind()
174        .await
175        .map(|res| res.map_err(|err| PanicError::new(Box::new(err))))
176        .map_err(PanicError::new_from_panic_any)
177        .and_then(convert::identity);
178    let startup_finished = matches!(
179        actor_ref.weak_signal_mailbox().signal_startup_finished(),
180        Err(SendError::MailboxFull(()))
181    );
182
183    let actor_ref = actor_ref.into_downgrade();
184
185    match start_res {
186        Ok(actor) => {
187            let mut state = ActorBehaviour::new_from_actor(actor, actor_ref.clone());
188
189            let reason = Abortable::new(
190                abortable_actor_loop(
191                    &mut state,
192                    mailbox_rx,
193                    &actor_ref.startup_result,
194                    startup_finished,
195                ),
196                abort_registration,
197            )
198            .await
199            .unwrap_or(ActorStopReason::Killed);
200
201            let mut actor = state.shutdown().await;
202
203            let mut notify_futs = notify_links(id, &actor_ref.links, &reason).await;
204
205            log_actor_stop_reason(id, name, &reason);
206            let on_stop_res = actor.on_stop(actor_ref.clone(), reason.clone()).await;
207
208            while let Some(()) = notify_futs.next().await {}
209
210            #[cfg(not(feature = "remote"))]
211            crate::registry::ACTOR_REGISTRY.lock().unwrap().remove(name);
212            #[cfg(feature = "remote")]
213            remote::REMOTE_REGISTRY.lock().await.remove(&id);
214
215            match on_stop_res {
216                Ok(()) => {
217                    actor_ref
218                        .shutdown_result
219                        .set(Ok(()))
220                        .expect("nothing else should set the shutdown result");
221                }
222                Err(err) => {
223                    let err = PanicError::new(Box::new(err));
224                    invoke_actor_error_hook(&err);
225
226                    actor_ref
227                        .shutdown_result
228                        .set(Err(err))
229                        .expect("nothing else should set the shutdown result");
230                }
231            }
232
233            Ok((actor, reason))
234        }
235        Err(err) => {
236            actor_ref
237                .startup_result
238                .set(Err(err.clone()))
239                .expect("nothing should set the startup result");
240
241            let reason = ActorStopReason::Panicked(err);
242            log_actor_stop_reason(id, name, &reason);
243
244            let mut notify_futs = notify_links(id, &actor_ref.links, &reason).await;
245            while let Some(()) = notify_futs.next().await {}
246
247            let ActorStopReason::Panicked(err) = reason else {
248                unreachable!()
249            };
250
251            actor_ref
252                .shutdown_result
253                .set(Err(err.clone()))
254                .expect("nothing should set the startup result");
255
256            Err(err)
257        }
258    }
259}
260
261async fn abortable_actor_loop<A>(
262    state: &mut ActorBehaviour<A>,
263    mut mailbox_rx: MailboxReceiver<A>,
264    startup_result: &SetOnce<Result<(), PanicError>>,
265    startup_finished: bool,
266) -> ActorStopReason
267where
268    A: Actor,
269{
270    if startup_finished {
271        if let ControlFlow::Break(reason) = state.handle_startup_finished().await {
272            return reason;
273        }
274    }
275    loop {
276        let reason = recv_mailbox_loop(state, &mut mailbox_rx, startup_result).await;
277        if let ControlFlow::Break(reason) = state.on_shutdown(reason).await {
278            return reason;
279        }
280    }
281}
282
283async fn recv_mailbox_loop<A>(
284    state: &mut ActorBehaviour<A>,
285    mailbox_rx: &mut MailboxReceiver<A>,
286    startup_result: &SetOnce<Result<(), PanicError>>,
287) -> ActorStopReason
288where
289    A: Actor,
290{
291    loop {
292        match state.next(mailbox_rx).await {
293            Some(Signal::StartupFinished) => {
294                if startup_result.set(Ok(())).is_err() {
295                    #[cfg(feature = "tracing")]
296                    error!("received startup finished signal after already being started up");
297                }
298                if let ControlFlow::Break(reason) = state.handle_startup_finished().await {
299                    return reason;
300                }
301            }
302            Some(Signal::Message {
303                message,
304                actor_ref,
305                reply,
306                sent_within_actor,
307            }) => {
308                if let ControlFlow::Break(reason) = state
309                    .handle_message(message, actor_ref, reply, sent_within_actor)
310                    .await
311                {
312                    return reason;
313                }
314            }
315            Some(Signal::LinkDied { id, reason }) => {
316                if let ControlFlow::Break(reason) = state.handle_link_died(id, reason).await {
317                    return reason;
318                }
319            }
320            Some(Signal::Stop) | None => {
321                if let ControlFlow::Break(reason) = state.handle_stop().await {
322                    return reason;
323                }
324            }
325        }
326    }
327}
328
329async fn notify_links(
330    id: ActorId,
331    links: &Links,
332    reason: &ActorStopReason,
333) -> FuturesUnordered<BoxFuture<'static, ()>> {
334    let futs = FuturesUnordered::new();
335    {
336        let mut links = links.lock().await;
337        #[allow(unused_variables)]
338        for (link_actor_id, link) in links.drain() {
339            match link {
340                Link::Local(mailbox) => {
341                    let reason = reason.clone();
342                    futs.push(
343                        async move {
344                            if let Err(err) = mailbox.signal_link_died(id, reason).await {
345                                #[cfg(feature = "tracing")]
346                                error!("failed to notify actor a link died: {err}");
347                            }
348                        }
349                        .boxed(),
350                    );
351                }
352                #[cfg(feature = "remote")]
353                Link::Remote(notified_actor_remote_id) => {
354                    if let Some(swarm) = remote::ActorSwarm::get() {
355                        let reason = reason.clone();
356                        futs.push(
357                            async move {
358                                let res = swarm
359                                    .signal_link_died(
360                                        id,
361                                        link_actor_id,
362                                        notified_actor_remote_id,
363                                        reason,
364                                    )
365                                    .await;
366                                if let Err(err) = res {
367                                    #[cfg(feature = "tracing")]
368                                    error!("failed to notify actor a link died: {err}");
369                                }
370                            }
371                            .boxed(),
372                        );
373                    }
374                }
375            }
376        }
377    }
378
379    futs
380}
381
/// Emits the "actor stopped" tracing event for an actor.
///
/// A `Panicked` reason is logged at error level; every other reason is logged at trace level.
#[inline]
#[cfg(feature = "tracing")]
fn log_actor_stop_reason(id: ActorId, name: &str, reason: &ActorStopReason) {
    match reason {
        ActorStopReason::Panicked(_) => {
            error!(%id, %name, ?reason, "actor stopped")
        }
        ActorStopReason::Normal | ActorStopReason::Killed | ActorStopReason::LinkDied { .. } => {
            trace!(%id, %name, ?reason, "actor stopped");
        }
        #[cfg(feature = "remote")]
        ActorStopReason::PeerDisconnected => {
            trace!(%id, %name, ?reason, "actor stopped");
        }
    }
}
400
/// No-op stand-in for the stop-reason logger, compiled when the `tracing` feature is disabled
/// so call sites do not need their own `cfg` guards.
#[cfg(not(feature = "tracing"))]
fn log_actor_stop_reason(_id: ActorId, _name: &str, _reason: &ActorStopReason) {}