kameo/actor/
spawn.rs

1use std::{convert, ops::ControlFlow, panic::AssertUnwindSafe, sync::Arc, thread};
2
3use futures::{
4    FutureExt, StreamExt,
5    future::BoxFuture,
6    stream::{AbortHandle, AbortRegistration, Abortable, FuturesUnordered},
7};
8use tokio::{
9    runtime::{Handle, RuntimeFlavor},
10    sync::SetOnce,
11    task::JoinHandle,
12};
13#[cfg(feature = "tracing")]
14use tracing::{error, trace};
15
16#[cfg(feature = "remote")]
17use crate::remote;
18
19use crate::{
20    actor::{Actor, ActorRef, CURRENT_ACTOR_ID, Link, Links, kind::ActorBehaviour},
21    error::{ActorStopReason, PanicError, SendError, invoke_actor_error_hook},
22    mailbox::{MailboxReceiver, MailboxSender, Signal},
23};
24
25use super::ActorId;
26
27/// A `PreparedActor` represents an actor that has been initialized and is ready to be either run
28/// in the current task or spawned into a new task.
29///
30/// The `PreparedActor` provides access to the actor's [`ActorRef`] for interacting with the actor
31/// before it starts running. It allows for flexible execution, either by running the actor
32/// synchronously in the current task or spawning it in a separate task or thread.
33#[allow(missing_debug_implementations)]
34#[must_use = "the prepared actor needs to be ran/spawned"]
35pub struct PreparedActor<A: Actor> {
36    actor_ref: ActorRef<A>,
37    mailbox_rx: MailboxReceiver<A>,
38    abort_registration: AbortRegistration,
39}
40
41impl<A: Actor> PreparedActor<A> {
42    /// Creates a new prepared actor with a specific mailbox configuration, allowing access to its [`ActorRef`] before spawning.
43    ///
44    /// This function allows you to explicitly specify a mailbox when preparing an actor.
45    /// Use this when you need custom mailbox behavior or capacity.
46    ///
47    /// This is typically created though [`Actor::prepare`](crate::actor::Spawn::prepare) and [`Actor::prepare_with_mailbox`](crate::actor::Spawn::prepare_with_mailbox).
48    pub fn new((mailbox_tx, mailbox_rx): (MailboxSender<A>, MailboxReceiver<A>)) -> Self {
49        let (abort_handle, abort_registration) = AbortHandle::new_pair();
50        let links = Links::default();
51        let startup_result = Arc::new(SetOnce::new());
52        let shutdown_result = Arc::new(SetOnce::new());
53        let actor_ref = ActorRef::new(
54            mailbox_tx,
55            abort_handle,
56            links,
57            startup_result,
58            shutdown_result,
59        );
60
61        PreparedActor {
62            actor_ref,
63            mailbox_rx,
64            abort_registration,
65        }
66    }
67
68    /// Returns a reference to the [`ActorRef`], which can be used to send messages to the actor.
69    ///
70    /// The `ActorRef` can be used for interaction before the actor starts processing its event loop.
71    pub fn actor_ref(&self) -> &ActorRef<A> {
72        &self.actor_ref
73    }
74
75    /// Runs the actor in the current context **without** spawning a separate task, until the actor is stopped.
76    ///
77    /// This is useful when you need to run an actor synchronously in the current context,
78    /// without background execution, and when the actor is expected to be short-lived.
79    ///
80    /// Note that the actor's mailbox may already contain messages before `run` is called.
81    /// In this case, the actor will process all pending messages in the mailbox before completing.
82    ///
83    /// # Example
84    ///
85    /// ```no_run
86    /// # use kameo::Actor;
87    /// # use kameo::actor::{PreparedActor, Spawn};
88    /// # use kameo::message::{Context, Message};
89    ///
90    /// # #[derive(Actor)]
91    /// # struct MyActor;
92    /// #
93    /// # impl Message<&'static str> for MyActor {
94    /// #     type Reply = ();
95    /// #     async fn handle(&mut self, msg: &'static str, ctx: &mut Context<Self, Self::Reply>) -> Self::Reply { }
96    /// # }
97    /// #
98    /// # tokio_test::block_on(async {
99    /// let prepared_actor = MyActor::prepare();
100    /// // Send it a message before it runs
101    /// prepared_actor.actor_ref().tell("hello!").await?;
102    /// prepared_actor.run(MyActor).await;
103    /// # Ok::<(), Box<dyn std::error::Error>>(())
104    /// # });
105    /// ```
106    pub async fn run(self, args: A::Args) -> Result<(A, ActorStopReason), PanicError> {
107        run_actor_lifecycle::<A>(
108            args,
109            self.actor_ref,
110            self.mailbox_rx,
111            self.abort_registration,
112        )
113        .await
114    }
115
116    /// Spawns the actor in a new background tokio task, returning the `JoinHandle`.
117    ///
118    /// See [`Spawn::spawn`](crate::actor::Spawn::spawn) for more information.
119    pub fn spawn(self, args: A::Args) -> JoinHandle<Result<(A, ActorStopReason), PanicError>> {
120        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
121        {
122            tokio::spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run(args)))
123        }
124
125        #[cfg(all(tokio_unstable, feature = "tracing"))]
126        {
127            tokio::task::Builder::new()
128                .name(A::name())
129                .spawn(CURRENT_ACTOR_ID.scope(self.actor_ref.id(), self.run(args)))
130                .unwrap()
131        }
132    }
133
134    /// Spawns the actor in a new background thread, returning the `JoinHandle`.
135    ///
136    /// See [`Spawn::spawn_in_thread`](crate::actor::Spawn::spawn_in_thread) for more information.
137    pub fn spawn_in_thread(
138        self,
139        args: A::Args,
140    ) -> thread::JoinHandle<Result<(A, ActorStopReason), PanicError>> {
141        let handle = Handle::current();
142        if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) {
143            panic!("threaded actors are not supported in a single threaded tokio runtime");
144        }
145
146        std::thread::Builder::new()
147            .name(A::name().to_string())
148            .spawn({
149                let actor_ref = self.actor_ref.clone();
150                move || handle.block_on(CURRENT_ACTOR_ID.scope(actor_ref.id(), self.run(args)))
151            })
152            .unwrap()
153    }
154}
155
156#[inline]
157async fn run_actor_lifecycle<A>(
158    args: A::Args,
159    actor_ref: ActorRef<A>,
160    mailbox_rx: MailboxReceiver<A>,
161    abort_registration: AbortRegistration,
162) -> Result<(A, ActorStopReason), PanicError>
163where
164    A: Actor,
165{
166    #[allow(unused_mut)]
167    let mut id = actor_ref.id();
168    let name = A::name();
169    #[cfg(feature = "tracing")]
170    trace!(%id, %name, "actor started");
171
172    let start_res = AssertUnwindSafe(A::on_start(args, actor_ref.clone()))
173        .catch_unwind()
174        .await
175        .map(|res| res.map_err(|err| PanicError::new(Box::new(err))))
176        .map_err(PanicError::new_from_panic_any)
177        .and_then(convert::identity);
178    let startup_finished = matches!(
179        actor_ref.weak_signal_mailbox().signal_startup_finished(),
180        Err(SendError::MailboxFull(()))
181    );
182
183    let actor_ref = actor_ref.into_downgrade();
184
185    match start_res {
186        Ok(actor) => {
187            let mut state = ActorBehaviour::new_from_actor(actor, actor_ref.clone());
188
189            let reason = Abortable::new(
190                abortable_actor_loop(
191                    &mut state,
192                    mailbox_rx,
193                    &actor_ref.startup_result,
194                    startup_finished,
195                ),
196                abort_registration,
197            )
198            .await
199            .unwrap_or(ActorStopReason::Killed);
200
201            let mut actor = state.shutdown().await;
202
203            let mut notify_futs = notify_links(id, &actor_ref.links, &reason).await;
204
205            log_actor_stop_reason(id, name, &reason);
206            let on_stop_res = actor.on_stop(actor_ref.clone(), reason.clone()).await;
207            while let Some(()) = notify_futs.next().await {}
208
209            unregister_actor(&id).await;
210
211            match on_stop_res {
212                Ok(()) => {
213                    actor_ref
214                        .shutdown_result
215                        .set(Ok(()))
216                        .expect("nothing else should set the shutdown result");
217                }
218                Err(err) => {
219                    let err = PanicError::new(Box::new(err));
220                    invoke_actor_error_hook(&err);
221
222                    actor_ref
223                        .shutdown_result
224                        .set(Err(err))
225                        .expect("nothing else should set the shutdown result");
226                }
227            }
228
229            Ok((actor, reason))
230        }
231        Err(err) => {
232            actor_ref
233                .startup_result
234                .set(Err(err.clone()))
235                .expect("nothing should set the startup result");
236
237            let reason = ActorStopReason::Panicked(err);
238            log_actor_stop_reason(id, name, &reason);
239
240            let mut notify_futs = notify_links(id, &actor_ref.links, &reason).await;
241            while let Some(()) = notify_futs.next().await {}
242
243            unregister_actor(&id).await;
244
245            let ActorStopReason::Panicked(err) = reason else {
246                unreachable!()
247            };
248
249            actor_ref
250                .shutdown_result
251                .set(Err(err.clone()))
252                .expect("nothing should set the startup result");
253
254            Err(err)
255        }
256    }
257}
258
259async fn abortable_actor_loop<A>(
260    state: &mut ActorBehaviour<A>,
261    mut mailbox_rx: MailboxReceiver<A>,
262    startup_result: &SetOnce<Result<(), PanicError>>,
263    startup_finished: bool,
264) -> ActorStopReason
265where
266    A: Actor,
267{
268    if startup_finished && let ControlFlow::Break(reason) = state.handle_startup_finished().await {
269        return reason;
270    }
271    loop {
272        let reason = recv_mailbox_loop(state, &mut mailbox_rx, startup_result).await;
273        if let ControlFlow::Break(reason) = state.on_shutdown(reason).await {
274            return reason;
275        }
276    }
277}
278
279async fn recv_mailbox_loop<A>(
280    state: &mut ActorBehaviour<A>,
281    mailbox_rx: &mut MailboxReceiver<A>,
282    startup_result: &SetOnce<Result<(), PanicError>>,
283) -> ActorStopReason
284where
285    A: Actor,
286{
287    loop {
288        match state.next(mailbox_rx).await {
289            Some(Signal::StartupFinished) => {
290                if startup_result.set(Ok(())).is_err() {
291                    #[cfg(feature = "tracing")]
292                    error!("received startup finished signal after already being started up");
293                }
294                if let ControlFlow::Break(reason) = state.handle_startup_finished().await {
295                    return reason;
296                }
297            }
298            Some(Signal::Message {
299                message,
300                actor_ref,
301                reply,
302                sent_within_actor,
303            }) => {
304                if let ControlFlow::Break(reason) = state
305                    .handle_message(message, actor_ref, reply, sent_within_actor)
306                    .await
307                {
308                    return reason;
309                }
310            }
311            Some(Signal::LinkDied { id, reason }) => {
312                if let ControlFlow::Break(reason) = state.handle_link_died(id, reason).await {
313                    return reason;
314                }
315            }
316            Some(Signal::Stop) | None => {
317                if let ControlFlow::Break(reason) = state.handle_stop().await {
318                    return reason;
319                }
320            }
321        }
322    }
323}
324
325async fn notify_links(
326    id: ActorId,
327    links: &Links,
328    reason: &ActorStopReason,
329) -> FuturesUnordered<BoxFuture<'static, ()>> {
330    let futs = FuturesUnordered::new();
331    {
332        let mut links = links.lock().await;
333        #[allow(unused_variables)]
334        for (link_actor_id, link) in links.drain() {
335            match link {
336                Link::Local(mailbox) => {
337                    let reason = reason.clone();
338                    futs.push(
339                        async move {
340                            if let Err(err) = mailbox.signal_link_died(id, reason).await {
341                                #[cfg(feature = "tracing")]
342                                error!("failed to notify actor a link died: {err}");
343                            }
344                        }
345                        .boxed(),
346                    );
347                }
348                #[cfg(feature = "remote")]
349                Link::Remote(notified_actor_remote_id) => {
350                    if let Some(swarm) = remote::ActorSwarm::get() {
351                        let reason = reason.clone();
352                        futs.push(
353                            async move {
354                                let res = swarm
355                                    .signal_link_died(
356                                        id,
357                                        link_actor_id,
358                                        notified_actor_remote_id,
359                                        reason,
360                                    )
361                                    .await;
362                                if let Err(err) = res {
363                                    #[cfg(feature = "tracing")]
364                                    error!("failed to notify actor a link died: {err}");
365                                }
366                            }
367                            .boxed(),
368                        );
369                    }
370                }
371            }
372        }
373    }
374
375    futs
376}
377
378#[allow(unused_variables)]
379async fn unregister_actor(id: &ActorId) {
380    #[cfg(not(feature = "remote"))]
381    crate::registry::ACTOR_REGISTRY
382        .lock()
383        .unwrap()
384        .remove_by_id(id);
385    #[cfg(feature = "remote")]
386    if let Some(entry) = remote::REMOTE_REGISTRY.lock().await.remove(id)
387        && let Some(registered_name) = entry.name
388        && let Some(swarm) = remote::ActorSwarm::get()
389    {
390        _ = swarm.unregister(registered_name);
391    }
392}
393
394#[inline]
395#[cfg(feature = "tracing")]
396fn log_actor_stop_reason(id: ActorId, name: &str, reason: &ActorStopReason) {
397    match reason {
398        reason @ ActorStopReason::Normal
399        | reason @ ActorStopReason::Killed
400        | reason @ ActorStopReason::LinkDied { .. } => {
401            trace!(%id, %name, ?reason, "actor stopped");
402        }
403        reason @ ActorStopReason::Panicked(_) => {
404            error!(%id, %name, ?reason, "actor stopped")
405        }
406        #[cfg(feature = "remote")]
407        reason @ ActorStopReason::PeerDisconnected => {
408            trace!(%id, %name, ?reason, "actor stopped");
409        }
410    }
411}
412
413#[cfg(not(feature = "tracing"))]
414fn log_actor_stop_reason(_id: ActorId, _name: &str, _reason: &ActorStopReason) {}