kameo/actor.rs
1//! Core functionality for defining and managing actors in Kameo.
2//!
3//! Actors are independent units of computation that run asynchronously, sending and receiving messages.
4//! Each actor operates within its own task, and its lifecycle is managed by hooks such as `on_start`, `on_panic`, and `on_stop`.
5//!
6//! The actor trait is designed to support fault tolerance, recovery from panics, and clean termination.
7//! Lifecycle hooks allow customization of actor behavior when starting, encountering errors, or shutting down.
8//!
9//! This module contains the primary `Actor` trait, which all actors must implement,
10//! as well as types for managing message queues (mailboxes) and actor references ([`ActorRef`]).
11//!
12//! # Features
13//! - **Asynchronous Message Handling**: Each actor processes messages asynchronously within its own task.
14//! - **Lifecycle Hooks**: Customizable hooks ([`on_start`], [`on_stop`], [`on_panic`]) for managing the actor's lifecycle.
15//! - **Backpressure**: Mailboxes can be bounded or unbounded, controlling the flow of messages.
16//! - **Supervision**: Actors can be linked, enabling robust supervision and error recovery systems.
17//!
18//! This module allows building resilient, fault-tolerant, distributed systems with flexible control over the actor lifecycle.
19//!
20//! [`on_start`]: Actor::on_start
21//! [`on_stop`]: Actor::on_stop
22//! [`on_panic`]: Actor::on_panic
23
24mod actor_ref;
25mod id;
26mod kind;
27mod spawn;
28
29use std::{any, ops::ControlFlow};
30
31use futures::Future;
32
33use crate::{
34 error::{ActorStopReason, PanicError},
35 mailbox::{self, MailboxReceiver, MailboxSender, Signal},
36 message::BoxMessage,
37 reply::{BoxReplySender, ReplyError},
38 supervision::{SupervisedActorBuilder, SupervisionStrategy},
39};
40
41pub use actor_ref::*;
42pub use id::*;
43pub use spawn::*;
44
45pub(crate) const DEFAULT_MAILBOX_CAPACITY: usize = 64;
46
47/// Core behavior of an actor, including its lifecycle events and how it processes messages.
48///
49/// Every actor must implement this trait, which provides hooks
50/// for the actor's initialization ([`on_start`]), handling errors ([`on_panic`]), and cleanup ([`on_stop`]).
51///
52/// The actor runs within its own task and processes messages asynchronously from a mailbox.
53/// Each actor can be linked to others, allowing for robust supervision and failure recovery mechanisms.
54///
55/// Methods in this trait that return [`Self::Error`] will cause the actor to stop with the reason
56/// [`ActorStopReason::Panicked`] if an error occurs. This enables graceful handling of actor panics
57/// or errors.
58///
59/// # Example with Derive
60///
61/// ```
62/// use kameo::Actor;
63///
64/// #[derive(Actor)]
65/// struct MyActor;
66/// ```
67///
68/// # Example Override Behaviour
69///
70/// ```
71/// use kameo::actor::{Actor, ActorRef, WeakActorRef};
72/// use kameo::error::{ActorStopReason, Infallible};
73///
74/// struct MyActor;
75///
76/// impl Actor for MyActor {
77/// type Args = Self;
78/// type Error = Infallible;
79///
80/// async fn on_start(
81/// state: Self::Args,
82/// actor_ref: ActorRef<Self>
83/// ) -> Result<Self, Self::Error> {
84/// println!("actor started");
85/// Ok(state)
86/// }
87///
88/// async fn on_stop(
89/// &mut self,
90/// actor_ref: WeakActorRef<Self>,
91/// reason: ActorStopReason,
92/// ) -> Result<(), Self::Error> {
93/// println!("actor stopped");
94/// Ok(())
95/// }
96/// }
97/// ```
98///
99/// # Lifecycle Hooks
100/// - [`on_start`]: Called when the actor starts. This is where initialization happens.
101/// - [`on_panic`]: Called when the actor encounters a panic or an error while processing a "tell" message.
102/// - [`on_stop`]: Called before the actor is stopped. This allows for cleanup tasks.
103/// - [`on_link_died`]: Hook that is invoked when a linked actor dies.
104///
105/// # Mailboxes
106/// Actors use a mailbox to queue incoming messages. You can choose between:
107/// - **Bounded Mailbox**: Limits the number of messages that can be queued, providing backpressure.
108/// - **Unbounded Mailbox**: Allows an infinite number of messages, but can lead to high memory usage.
109///
110/// Mailboxes enable efficient asynchronous message passing with support for both backpressure and
111/// unbounded queueing depending on system requirements.
112///
113/// [`on_start`]: Actor::on_start
114/// [`on_panic`]: Actor::on_panic
115/// [`on_stop`]: Actor::on_stop
116/// [`on_link_died`]: Actor::on_link_died
117pub trait Actor: Sized + Send + 'static {
118 /// Arguments to initialize the actor.
119 ///
120 /// Its common for `Args = Self`, allowing the actors state to be passed directly,
121 /// however for more complex use cases, the args can be any other type.
122 type Args: Send;
123
124 /// Actor error type.
125 ///
126 /// This error is used as the error returned by lifecycle hooks in this actor.
127 type Error: ReplyError;
128
129 /// The name of the actor, which can be useful for logging or debugging.
130 ///
131 /// # Default Implementation
132 /// By default, this returns the type name of the actor.
133 #[inline]
134 fn name() -> &'static str {
135 any::type_name::<Self>()
136 }
137
138 /// Defines the supervision strategy for this actor when acting as a supervisor.
139 ///
140 /// The supervision strategy determines which children should be restarted when a child
141 /// actor fails. Override this method to customize the behavior.
142 ///
143 /// # Default
144 ///
145 /// [`SupervisionStrategy::OneForOne`] - only restart the failed child
146 ///
147 /// # When to Override
148 ///
149 /// - Use [`SupervisionStrategy::OneForAll`] when children are tightly coupled and must be
150 /// kept in sync (e.g., services that share distributed state)
151 /// - Use [`SupervisionStrategy::RestForOne`] when children have sequential dependencies
152 /// (e.g., pipeline stages where later stages depend on earlier ones)
153 ///
154 /// # Examples
155 ///
156 /// ```
157 /// use kameo::actor::{Actor, ActorRef};
158 /// use kameo::error::Infallible;
159 /// use kameo::supervision::SupervisionStrategy;
160 ///
161 /// struct Supervisor;
162 ///
163 /// impl Actor for Supervisor {
164 /// type Args = ();
165 /// type Error = Infallible;
166 ///
167 /// // Override to use OneForAll strategy
168 /// fn supervision_strategy() -> SupervisionStrategy {
169 /// SupervisionStrategy::OneForAll
170 /// }
171 ///
172 /// async fn on_start(
173 /// _: Self::Args,
174 /// _: ActorRef<Self>
175 /// ) -> Result<Self, Self::Error> {
176 /// Ok(Supervisor)
177 /// }
178 /// }
179 /// ```
180 ///
181 /// [`SupervisionStrategy::OneForOne`]: crate::supervision::SupervisionStrategy::OneForOne
182 /// [`SupervisionStrategy::OneForAll`]: crate::supervision::SupervisionStrategy::OneForAll
183 /// [`SupervisionStrategy::RestForOne`]: crate::supervision::SupervisionStrategy::RestForOne
184 #[inline]
185 fn supervision_strategy() -> SupervisionStrategy {
186 SupervisionStrategy::default()
187 }
188
189 /// Called when the actor starts, before it processes any messages.
190 ///
191 /// Messages sent internally by the actor during `on_start` are prioritized and processed
192 /// before any externally sent messages, even if external messages are received first.
193 ///
194 /// This ensures that the actor can properly initialize before handling external messages.
195 ///
196 /// # Example
197 ///
198 /// ```rust
199 /// # use kameo::actor::{Actor, ActorRef};
200 /// # use kameo::error::Infallible;
201 /// #
202 /// struct MyActor;
203 ///
204 /// impl Actor for MyActor {
205 /// type Args = Self;
206 /// type Error = Infallible;
207 ///
208 /// async fn on_start(
209 /// state: Self::Args,
210 /// _actor_ref: ActorRef<Self>
211 /// ) -> Result<Self, Self::Error> {
212 /// Ok(state)
213 /// }
214 /// }
215 /// ```
216 #[allow(unused_variables)]
217 fn on_start(
218 args: Self::Args,
219 actor_ref: ActorRef<Self>,
220 ) -> impl Future<Output = Result<Self, Self::Error>> + Send;
221
222 /// Called when the actor receives a message to be processed.
223 ///
224 /// By default, this method handles the incoming message immediately using the
225 /// actor's standard message handling logic.
226 ///
227 /// Advanced use cases can override this method to customize how messages are processed,
228 /// such as buffering messages for later processing or implementing custom scheduling.
229 ///
230 /// # Parameters
231 /// - `msg`: The incoming message, wrapped in a `BoxMessage<Self>`.
232 /// - `actor_ref`: A reference to the actor itself.
233 /// - `tx`: An optional reply sender, used when the message expects a response.
234 /// - `stop`: A mutable bool which can be set to true, stopping the actor immediately after processing this message.
235 ///
236 /// # Returns
237 /// A future that resolves to `Result<(), Box<dyn ReplyError>>`. An `Ok(())` indicates successful processing,
238 /// while an `Err` indicates an error occurred during message handling.
239 ///
240 /// # Default Implementation
241 /// The default implementation handles the message immediately by calling
242 /// `msg.handle_dyn(self, actor_ref, tx).await`.
243 ///
244 /// # Notes
245 /// - Overriding this method allows you to intercept and manipulate messages before they are processed.
246 /// - Be cautious when buffering messages, as unbounded buffering can lead to increased memory usage.
247 /// - Custom implementations should ensure that messages are eventually handled or appropriately discarded to
248 /// prevent message loss.
249 /// - The `tx` (reply sender) is tied to the specific `BoxMessage` it corresponds to,
250 /// and passing an incorrect or mismatched `tx` can lead to a panic.
251 /// - The `stop` variable can be set to true in a message handler, by calling `Context::stop`.
252 fn on_message(
253 &mut self,
254 msg: BoxMessage<Self>,
255 actor_ref: ActorRef<Self>,
256 tx: Option<BoxReplySender>,
257 stop: &mut bool,
258 ) -> impl Future<Output = Result<(), Box<dyn ReplyError>>> + Send {
259 async move { msg.handle_dyn(self, actor_ref, tx, stop).await }
260 }
261
262 /// Called when the actor encounters a panic or an error during "tell" message handling.
263 ///
264 /// This method gives the actor an opportunity to clean up or reset its state and determine
265 /// whether it should be stopped or continue processing messages.
266 ///
267 /// The `PanicError` parameter holds the error that occurred during.
268 /// This error can typically be downcasted into one of a few types:
269 /// - [`Self::Error`] type, which is the error type when one of the actor's lifecycle functions returns an error.
270 /// - An error type returned when handling a message.
271 /// - A string type, which can be accessed with `PanicError::with_str`.
272 /// String types are the result of regular panics, typically raised using the [`std::panic!`] macro.
273 /// - Any other panic types. Typically uncommon, though possible with [`std::panic::panic_any`].
274 ///
275 /// # Returns
276 /// Whether the actor should stop or continue processing messages.
277 #[allow(unused_variables)]
278 #[inline]
279 fn on_panic(
280 &mut self,
281 actor_ref: WeakActorRef<Self>,
282 err: PanicError,
283 ) -> impl Future<Output = Result<ControlFlow<ActorStopReason>, Self::Error>> + Send {
284 async move { Ok(ControlFlow::Break(ActorStopReason::Panicked(err))) }
285 }
286
287 /// Called when a linked actor dies.
288 ///
289 /// By default, the actor will stop if the reason for the linked actor's death is anything other
290 /// than `Normal`. You can customize this behavior in the implementation.
291 ///
292 /// # Returns
293 /// Whether the actor should stop or continue processing messages.
294 #[allow(unused_variables)]
295 #[inline]
296 fn on_link_died(
297 &mut self,
298 actor_ref: WeakActorRef<Self>,
299 id: ActorId,
300 reason: ActorStopReason,
301 ) -> impl Future<Output = Result<ControlFlow<ActorStopReason>, Self::Error>> + Send {
302 async move {
303 match &reason {
304 ActorStopReason::Normal | ActorStopReason::SupervisorRestart => {
305 Ok(ControlFlow::Continue(()))
306 }
307 ActorStopReason::Killed
308 | ActorStopReason::Panicked(_)
309 | ActorStopReason::LinkDied { .. } => {
310 Ok(ControlFlow::Break(ActorStopReason::LinkDied {
311 id,
312 reason: Box::new(reason),
313 }))
314 }
315 #[cfg(feature = "remote")]
316 ActorStopReason::PeerDisconnected => {
317 Ok(ControlFlow::Break(ActorStopReason::PeerDisconnected))
318 }
319 }
320 }
321 }
322
323 /// Called before the actor stops.
324 ///
325 /// This allows the actor to perform any necessary cleanup or release resources before being fully stopped.
326 ///
327 /// The error returned by this method will be unwraped by kameo, causing a panic in the tokio task or
328 /// thread running the actor.
329 #[allow(unused_variables)]
330 #[inline]
331 fn on_stop(
332 &mut self,
333 actor_ref: WeakActorRef<Self>,
334 reason: ActorStopReason,
335 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
336 async { Ok(()) }
337 }
338
339 /// Awaits the next signal typically from the mailbox.
340 ///
341 /// This can be overwritten for more advanced actor behaviour, such as awaiting multiple channels, etc.
342 /// The return value is a signal which will be handled by the actor.
343 #[allow(unused_variables)]
344 #[inline]
345 fn next(
346 &mut self,
347 actor_ref: WeakActorRef<Self>,
348 mailbox_rx: &mut MailboxReceiver<Self>,
349 ) -> impl Future<Output = Result<Option<Signal<Self>>, Self::Error>> + Send {
350 async move { Ok(mailbox_rx.recv().await) }
351 }
352}
353
354/// Provides methods for spawning actors with various configurations.
355///
356/// This trait is automatically implemented for all types that implement [`Actor`], providing
357/// convenient methods to spawn actors in different execution contexts and with different
358/// mailbox configurations.
359///
360/// The `Spawn` trait separates actor instantiation from actor behavior, keeping the [`Actor`]
361/// trait focused on lifecycle hooks and message handling while providing ergonomic spawn
362/// methods through this extension trait.
363///
364/// # Choosing a Spawn Method
365///
366/// - **[`spawn`]** or **[`spawn_default`]**: Standard async actor with bounded mailbox (most common)
367/// - **[`spawn_with_mailbox`]**: Custom mailbox configuration (unbounded, custom capacity)
368/// - **[`spawn_in_thread`]**: Blocking operations requiring dedicated thread
369/// - **[`spawn_link`]**: Actor needs supervision link established before spawning
370///
371/// # Examples
372///
373/// ## Basic Spawning
374///
375/// ```
376/// use kameo::Actor;
377/// use kameo::actor::Spawn;
378///
379/// #[derive(Actor)]
380/// struct Counter {
381/// count: i32,
382/// }
383///
384/// # tokio_test::block_on(async {
385/// // Spawn with explicit initialization
386/// let actor_ref = Counter::spawn(Counter { count: 0 });
387/// # })
388/// ```
389///
390/// ## Default Spawning
391///
392/// ```
393/// use kameo::Actor;
394/// use kameo::actor::Spawn;
395///
396/// #[derive(Actor, Default)]
397/// struct Counter {
398/// count: i32,
399/// }
400///
401/// # tokio_test::block_on(async {
402/// // Spawn with default initialization
403/// let actor_ref = Counter::spawn_default();
404/// # })
405/// ```
406///
407/// ## Custom Mailbox
408///
409/// ```
410/// use kameo::Actor;
411/// use kameo::actor::Spawn;
412/// use kameo::mailbox;
413///
414/// #[derive(Actor)]
415/// struct HighThroughput;
416///
417/// # tokio_test::block_on(async {
418/// // Spawn with unbounded mailbox for high message rates
419/// let actor_ref = HighThroughput::spawn_with_mailbox(
420/// HighThroughput,
421/// mailbox::unbounded()
422/// );
423/// # })
424/// ```
425///
426/// ## Blocking Operations
427///
428/// ```no_run
429/// use std::fs::File;
430/// use kameo::Actor;
431/// use kameo::actor::Spawn;
432///
433/// #[derive(Actor)]
434/// struct FileWriter {
435/// file: File,
436/// }
437///
438/// // Spawn in dedicated thread for blocking I/O
439/// let actor_ref = FileWriter::spawn_in_thread(
440/// FileWriter { file: File::create("log.txt").unwrap() }
441/// );
442/// ```
443///
444/// ## Supervision
445///
446/// ```
447/// use kameo::Actor;
448/// use kameo::actor::Spawn;
449///
450/// #[derive(Actor)]
451/// struct Supervisor;
452///
453/// #[derive(Actor)]
454/// struct Worker;
455///
456/// # tokio_test::block_on(async {
457/// let supervisor = Supervisor::spawn(Supervisor);
458/// // Link worker to supervisor before spawning
459/// let worker = Worker::spawn_link(&supervisor, Worker).await;
460/// # })
461/// ```
462///
463/// # Note
464///
465/// This trait is sealed and cannot be implemented manually. It is automatically available
466/// for all [`Actor`] types through a blanket implementation.
467///
468/// [`spawn`]: Spawn::spawn
469/// [`spawn_default`]: Spawn::spawn_default
470/// [`spawn_with_mailbox`]: Spawn::spawn_with_mailbox
471/// [`spawn_in_thread`]: Spawn::spawn_in_thread
472/// [`spawn_link`]: Spawn::spawn_link
473pub trait Spawn: Actor + private::Sealed {
474 /// Spawns the actor in a Tokio task, running asynchronously with a default bounded mailbox.
475 ///
476 /// This function spawns the actor in a non-blocking Tokio task, making it suitable for actors that need to
477 /// perform asynchronous operations. The actor runs in the background and can be interacted with through
478 /// the returned [`ActorRef`].
479 ///
480 /// By default, a bounded mailbox with capacity 64 is used to provide backpressure.
481 /// For custom mailbox configuration, use [`Spawn::spawn_with_mailbox`].
482 ///
483 /// # Example
484 ///
485 /// ```
486 /// use kameo::Actor;
487 /// use kameo::actor::Spawn;
488 ///
489 /// #[derive(Actor)]
490 /// struct MyActor;
491 ///
492 /// # tokio_test::block_on(async {
493 /// // Spawns with a default bounded mailbox (capacity 64)
494 /// let actor_ref = MyActor::spawn(MyActor);
495 /// # })
496 /// ```
497 ///
498 /// The actor will continue running in the background, and messages can be sent to it via `actor_ref`.
499 fn spawn(args: Self::Args) -> ActorRef<Self> {
500 Spawn::spawn_with_mailbox(args, mailbox::bounded(DEFAULT_MAILBOX_CAPACITY))
501 }
502
503 /// Spawns the actor with default initialization in a Tokio task.
504 ///
505 /// This is a convenience method for actors that implement [`Default`], equivalent to calling
506 /// `Self::spawn(Self::default())`. The actor runs asynchronously in a non-blocking Tokio task
507 /// and can be interacted with through the returned [`ActorRef`].
508 ///
509 /// By default, a bounded mailbox with capacity 64 is used to provide backpressure.
510 /// For custom initialization or mailbox configuration, use [`Spawn::spawn`] or
511 /// [`Spawn::spawn_with_mailbox`] instead.
512 ///
513 /// # Example
514 ///
515 /// ```
516 /// use kameo::Actor;
517 /// use kameo::actor::Spawn;
518 ///
519 /// #[derive(Actor, Default)]
520 /// struct MyActor {
521 /// count: i32,
522 /// }
523 ///
524 /// # tokio_test::block_on(async {
525 /// // Spawns with default state and bounded mailbox (capacity 64)
526 /// let actor_ref = MyActor::spawn_default();
527 /// # })
528 /// ```
529 ///
530 /// # Requirements
531 ///
532 /// This method requires that `Self::Args` implements [`Default`]. For actors where
533 /// `Args = Self`, this means the actor struct itself must implement `Default`.
534 #[must_use]
535 fn spawn_default() -> ActorRef<Self>
536 where
537 Self::Args: Default,
538 {
539 Spawn::spawn(Self::Args::default())
540 }
541
542 /// Spawns the actor in a Tokio task with a specific mailbox configuration.
543 ///
544 /// This function allows you to explicitly specify a mailbox when spawning an actor.
545 /// Use this when you need custom mailbox behavior or capacity.
546 ///
547 /// # Example
548 ///
549 /// ```
550 /// use kameo::Actor;
551 /// use kameo::actor::Spawn;
552 /// use kameo::mailbox;
553 ///
554 /// #[derive(Actor)]
555 /// struct MyActor;
556 ///
557 /// # tokio_test::block_on(async {
558 /// // Using a bounded mailbox with custom capacity
559 /// let actor_ref = MyActor::spawn_with_mailbox(MyActor, mailbox::bounded(1000));
560 ///
561 /// // Using an unbounded mailbox
562 /// let actor_ref = MyActor::spawn_with_mailbox(MyActor, mailbox::unbounded());
563 /// # })
564 /// ```
565 fn spawn_with_mailbox(
566 args: Self::Args,
567 (mailbox_tx, mailbox_rx): (MailboxSender<Self>, MailboxReceiver<Self>),
568 ) -> ActorRef<Self> {
569 let prepared_actor = PreparedActor::new((mailbox_tx, mailbox_rx));
570 let actor_ref = prepared_actor.actor_ref().clone();
571 prepared_actor.spawn(args);
572 actor_ref
573 }
574
575 /// Spawns and links the actor in a Tokio task with a default bounded mailbox.
576 ///
577 /// This function is used to ensure an actor is linked with another actor before it's truly spawned,
578 /// which avoids possible edge cases where the actor could die before having the chance to be linked.
579 ///
580 /// By default, a bounded mailbox with capacity 64 is used to provide backpressure.
581 /// For custom mailbox configuration, use [`Spawn::spawn_link_with_mailbox`].
582 ///
583 /// # Example
584 ///
585 /// ```
586 /// use kameo::Actor;
587 /// use kameo::actor::Spawn;
588 ///
589 /// #[derive(Actor)]
590 /// struct FooActor;
591 ///
592 /// #[derive(Actor)]
593 /// struct BarActor;
594 ///
595 /// # tokio_test::block_on(async {
596 /// let link_ref = FooActor::spawn(FooActor);
597 /// // Spawns with default bounded mailbox (capacity 64)
598 /// let actor_ref = BarActor::spawn_link(&link_ref, BarActor).await;
599 /// # })
600 /// ```
601 fn spawn_link<L>(
602 link_ref: &ActorRef<L>,
603 args: Self::Args,
604 ) -> impl Future<Output = ActorRef<Self>> + Send
605 where
606 L: Actor,
607 {
608 <Self as Spawn>::spawn_link_with_mailbox::<L>(
609 link_ref,
610 args,
611 mailbox::bounded(DEFAULT_MAILBOX_CAPACITY),
612 )
613 }
614
615 /// Spawns and links the actor in a Tokio task with a specific mailbox configuration.
616 ///
617 /// This function is used to ensure an actor is linked with another actor before it's truly spawned,
618 /// which avoids possible edge cases where the actor could die before having the chance to be linked.
619 ///
620 /// # Example
621 ///
622 /// ```
623 /// use kameo::Actor;
624 /// use kameo::actor::Spawn;
625 /// use kameo::mailbox;
626 ///
627 /// #[derive(Actor)]
628 /// struct FooActor;
629 ///
630 /// #[derive(Actor)]
631 /// struct BarActor;
632 ///
633 /// # tokio_test::block_on(async {
634 /// let link_ref = FooActor::spawn(FooActor);
635 /// // Using a custom mailbox
636 /// let actor_ref = BarActor::spawn_link_with_mailbox(&link_ref, BarActor, mailbox::unbounded()).await;
637 /// # })
638 /// ```
639 fn spawn_link_with_mailbox<L>(
640 link_ref: &ActorRef<L>,
641 args: Self::Args,
642 (mailbox_tx, mailbox_rx): (MailboxSender<Self>, MailboxReceiver<Self>),
643 ) -> impl Future<Output = ActorRef<Self>> + Send
644 where
645 L: Actor,
646 {
647 async move {
648 let prepared_actor = PreparedActor::new((mailbox_tx, mailbox_rx));
649 let actor_ref = prepared_actor.actor_ref().clone();
650 actor_ref.link(link_ref).await;
651 prepared_actor.spawn(args);
652 actor_ref
653 }
654 }
655
656 /// Spawns the actor in its own dedicated thread with a default bounded mailbox.
657 ///
658 /// This function spawns the actor in a separate thread, making it suitable for actors that perform blocking
659 /// operations, such as file I/O or other tasks that cannot be efficiently executed in an asynchronous context.
660 /// Despite running in a blocking thread, the actor can still communicate asynchronously with other actors.
661 ///
662 /// By default, a bounded mailbox with capacity 64 is used to provide backpressure.
663 /// For custom mailbox configuration, use [`Spawn::spawn_in_thread_with_mailbox`].
664 ///
665 /// # Example
666 ///
667 /// ```no_run
668 /// use std::io::{self, Write};
669 /// use std::fs::File;
670 ///
671 /// use kameo::Actor;
672 /// use kameo::actor::Spawn;
673 /// use kameo::message::{Context, Message};
674 ///
675 /// #[derive(Actor)]
676 /// struct MyActor {
677 /// file: File,
678 /// }
679 ///
680 /// struct Flush;
681 /// impl Message<Flush> for MyActor {
682 /// type Reply = io::Result<()>;
683 ///
684 /// async fn handle(&mut self, _: Flush, _ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
685 /// self.file.flush() // This blocking operation is handled in its own thread
686 /// }
687 /// }
688 ///
689 /// let actor_ref = MyActor::spawn_in_thread(
690 /// MyActor { file: File::create("output.txt").unwrap() }
691 /// );
692 /// actor_ref.tell(Flush).blocking_send()?;
693 /// # Ok::<(), kameo::error::SendError<Flush>>(())
694 /// ```
695 ///
696 /// This function is useful for actors that require or benefit from running blocking operations while still
697 /// enabling asynchronous functionality.
698 fn spawn_in_thread(args: Self::Args) -> ActorRef<Self> {
699 Spawn::spawn_in_thread_with_mailbox(args, mailbox::bounded(DEFAULT_MAILBOX_CAPACITY))
700 }
701
702 /// Spawns the actor in its own dedicated thread with a specific mailbox configuration.
703 ///
704 /// This function allows you to explicitly specify a mailbox when spawning an actor in a dedicated thread.
705 /// Use this when you need custom mailbox behavior or capacity for actors that perform blocking operations.
706 ///
707 /// # Example
708 ///
709 /// ```no_run
710 /// use std::io::{self, Write};
711 /// use std::fs::File;
712 ///
713 /// use kameo::Actor;
714 /// use kameo::actor::Spawn;
715 /// use kameo::mailbox;
716 /// use kameo::message::{Context, Message};
717 ///
718 /// #[derive(Actor)]
719 /// struct MyActor {
720 /// file: File,
721 /// }
722 ///
723 /// struct Flush;
724 /// impl Message<Flush> for MyActor {
725 /// type Reply = io::Result<()>;
726 ///
727 /// async fn handle(&mut self, _: Flush, _ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
728 /// self.file.flush() // This blocking operation is handled in its own thread
729 /// }
730 /// }
731 ///
732 /// let actor_ref = MyActor::spawn_in_thread_with_mailbox(
733 /// MyActor { file: File::create("output.txt").unwrap() },
734 /// mailbox::bounded(100)
735 /// );
736 /// actor_ref.tell(Flush).blocking_send()?;
737 /// # Ok::<(), kameo::error::SendError<Flush>>(())
738 /// ```
739 fn spawn_in_thread_with_mailbox(
740 args: Self::Args,
741 (mailbox_tx, mailbox_rx): (MailboxSender<Self>, MailboxReceiver<Self>),
742 ) -> ActorRef<Self> {
743 let prepared_actor = PreparedActor::new((mailbox_tx, mailbox_rx));
744 let actor_ref = prepared_actor.actor_ref().clone();
745 prepared_actor.spawn_in_thread(args);
746 actor_ref
747 }
748
749 /// Creates a new prepared actor, allowing access to its [`ActorRef`] before spawning.
750 ///
751 /// # Example
752 ///
753 /// ```
754 /// use kameo::Actor;
755 /// use kameo::actor::Spawn;
756 ///
757 /// #[derive(Actor)]
758 /// struct MyActor;
759 ///
760 /// # tokio_test::block_on(async {
761 /// let other_actor = MyActor::spawn(MyActor);
762 /// let prepared_actor = MyActor::prepare();
763 /// prepared_actor.actor_ref().link(&other_actor).await;
764 /// let actor_ref = prepared_actor.spawn(MyActor);
765 /// # Ok::<(), Box<dyn std::error::Error>>(())
766 /// # });
767 /// ```
768 fn prepare() -> PreparedActor<Self> {
769 Spawn::prepare_with_mailbox(mailbox::bounded(DEFAULT_MAILBOX_CAPACITY))
770 }
771
772 /// Creates a new prepared actor with a specific mailbox configuration, allowing access to its [`ActorRef`] before spawning.
773 ///
774 /// This function allows you to explicitly specify a mailbox when preparing an actor.
775 /// Use this when you need custom mailbox behavior or capacity.
776 ///
777 /// # Example
778 ///
779 /// ```
780 /// use kameo::Actor;
781 /// use kameo::actor::Spawn;
782 /// use kameo::mailbox;
783 ///
784 /// #[derive(Actor)]
785 /// struct MyActor;
786 ///
787 /// # tokio_test::block_on(async {
788 /// let other_actor = MyActor::spawn(MyActor);
789 /// let prepared_actor = MyActor::prepare_with_mailbox(mailbox::unbounded());
790 /// prepared_actor.actor_ref().link(&other_actor).await;
791 /// let actor_ref = prepared_actor.spawn(MyActor);
792 /// # Ok::<(), Box<dyn std::error::Error>>(())
793 /// # });
794 /// ```
795 fn prepare_with_mailbox(
796 (mailbox_tx, mailbox_rx): (MailboxSender<Self>, MailboxReceiver<Self>),
797 ) -> PreparedActor<Self> {
798 PreparedActor::new((mailbox_tx, mailbox_rx))
799 }
800
801 /// Creates a supervised child actor under a supervisor.
802 ///
803 /// This method returns a [`SupervisedActorBuilder`] that allows you to configure
804 /// restart policies and limits before spawning the child. The child will be linked
805 /// to the supervisor, and the supervisor will manage its lifecycle according to the
806 /// configured policies.
807 ///
808 /// When the child actor needs to be restarted, the provided `args` will be cloned
809 /// and passed to the actor's [`on_start`](Actor::on_start) method. This requires
810 /// that `Args` implements [`Clone`] + [`Sync`].
811 ///
812 /// **Note**: If your args type is not [`Sync`], use [`supervise_with`](Self::supervise_with)
813 /// instead, which only requires [`Send`].
814 ///
815 /// # Parameters
816 ///
817 /// - `supervisor_ref`: Reference to the supervisor actor that will manage this child
818 /// - `args`: Initial arguments for the child actor (must be [`Clone`] + [`Sync`])
819 ///
820 /// # Returns
821 ///
822 /// A [`SupervisedActorBuilder`] for configuring supervision behavior
823 ///
824 /// # Examples
825 ///
826 /// ```no_run
827 /// use std::time::Duration;
828 /// use kameo::actor::{Actor, ActorRef, Spawn};
829 /// use kameo::error::Infallible;
830 /// use kameo::supervision::{RestartPolicy, SupervisionStrategy};
831 ///
832 /// struct Supervisor;
833 /// impl Actor for Supervisor {
834 /// type Args = ();
835 /// type Error = Infallible;
836 ///
837 /// fn supervision_strategy() -> SupervisionStrategy {
838 /// SupervisionStrategy::OneForOne
839 /// }
840 ///
841 /// async fn on_start(_: Self::Args, actor_ref: ActorRef<Self>) -> Result<Self, Self::Error> {
842 /// // Spawn a supervised child
843 /// let child = Worker::supervise(&actor_ref, Worker { count: 0 })
844 /// .restart_policy(RestartPolicy::Transient)
845 /// .restart_limit(5, Duration::from_secs(10))
846 /// .spawn()
847 /// .await;
848 ///
849 /// Ok(Supervisor)
850 /// }
851 /// }
852 ///
853 /// #[derive(Clone)]
854 /// struct Worker {
855 /// count: u32,
856 /// }
857 ///
858 /// impl Actor for Worker {
859 /// type Args = Self;
860 /// type Error = Infallible;
861 ///
862 /// async fn on_start(state: Self::Args, _: ActorRef<Self>) -> Result<Self, Self::Error> {
863 /// Ok(state)
864 /// }
865 /// }
866 /// ```
867 ///
868 /// [`SupervisedActorBuilder`]: crate::supervision::SupervisedActorBuilder
869 fn supervise<A: Actor>(
870 supervisor_ref: &ActorRef<A>,
871 args: Self::Args,
872 ) -> SupervisedActorBuilder<'_, A, Self>
873 where
874 Self::Args: Clone + Sync,
875 {
876 SupervisedActorBuilder::new(supervisor_ref, args)
877 }
878
879 /// Creates a supervised child actor with a factory function for generating args.
880 ///
881 /// This is similar to [`supervise`](Self::supervise), but instead of cloning the same
882 /// args on each restart, it calls a factory function to generate new args. This is
883 /// useful when:
884 ///
885 /// - **The actor's args don't implement [`Sync`]** (only requires [`Send`])
886 /// - The actor's args don't implement [`Clone`]
887 /// - You need to generate fresh state on each restart
888 /// - Args need to be dynamically computed at restart time
889 ///
890 /// Unlike [`supervise`](Self::supervise), this method only requires `Args: Send`, making
891 /// it suitable for types that cannot be safely shared across threads.
892 ///
893 /// # Parameters
894 ///
895 /// - `supervisor_ref`: Reference to the supervisor actor that will manage this child
896 /// - `f`: Factory function that returns fresh args on each restart (must be [`Send`] + [`Sync`])
897 ///
898 /// # Returns
899 ///
900 /// A [`SupervisedActorBuilder`] for configuring supervision behavior
901 ///
902 /// # Examples
903 ///
904 /// ```no_run
905 /// use std::time::Duration;
906 /// use kameo::actor::{Actor, ActorRef, Spawn};
907 /// use kameo::error::Infallible;
908 /// use kameo::supervision::RestartPolicy;
909 ///
910 /// struct Supervisor;
911 /// impl Actor for Supervisor {
912 /// type Args = ();
913 /// type Error = Infallible;
914 ///
915 /// async fn on_start(_: Self::Args, actor_ref: ActorRef<Self>) -> Result<Self, Self::Error> {
916 /// // Use a factory function to generate fresh state on each restart
917 /// let child = Worker::supervise_with(&actor_ref, || Worker {
918 /// // This function is called on each restart
919 /// start_time: std::time::Instant::now(),
920 /// })
921 /// .restart_policy(RestartPolicy::Transient)
922 /// .spawn()
923 /// .await;
924 ///
925 /// Ok(Supervisor)
926 /// }
927 /// }
928 ///
929 /// struct Worker {
930 /// start_time: std::time::Instant, // Not Clone
931 /// }
932 ///
933 /// impl Actor for Worker {
934 /// type Args = Self;
935 /// type Error = Infallible;
936 ///
937 /// async fn on_start(state: Self::Args, _: ActorRef<Self>) -> Result<Self, Self::Error> {
938 /// Ok(state)
939 /// }
940 /// }
941 /// ```
942 ///
943 /// [`SupervisedActorBuilder`]: crate::supervision::SupervisedActorBuilder
944 fn supervise_with<A: Actor>(
945 supervisor_ref: &ActorRef<A>,
946 f: impl Fn() -> Self::Args + Send + Sync + 'static,
947 ) -> SupervisedActorBuilder<'_, A, Self> {
948 SupervisedActorBuilder::new_with(supervisor_ref, f)
949 }
950}
951
952impl<A: Actor> Spawn for A {}
953
954mod private {
955 use super::Actor;
956
957 pub trait Sealed {}
958 impl<A: Actor> Sealed for A {}
959}