Skip to main content

rsactor/
lib.rs

1// Copyright 2022 Jeff Kim <hiking90@gmail.com>
2// SPDX-License-Identifier: Apache-2.0
3
4//! # rsActor
5//! A Simple and Efficient In-Process Actor Model Implementation for Rust
6//!
7//! `rsActor` is a lightweight, Tokio-based actor framework in Rust focused on providing a simple
8//! and efficient actor model for local, in-process systems. It emphasizes clean message-passing
9//! semantics and straightforward actor lifecycle management while maintaining high performance for
10//! Rust applications.
11//!
12//! ## Features
13//!
14//! - **Asynchronous Actors**: Actors run in their own asynchronous tasks.
15//! - **Message Passing**: Actors communicate by sending and receiving messages.
16//!   - [`tell`](actor_ref::ActorRef::tell): Send a message without waiting for a reply (fire-and-forget).
17//!   - [`tell_with_timeout`](actor_ref::ActorRef::tell_with_timeout): Send a message without waiting for a reply, with a specified timeout.
18//!   - [`ask`](actor_ref::ActorRef::ask): Send a message and await a reply.
19//!   - [`ask_with_timeout`](actor_ref::ActorRef::ask_with_timeout): Send a message and await a reply, with a specified timeout.
20//!   - [`tell_blocking`](actor_ref::ActorRef::tell_blocking): Blocking version of `tell` for use in [`tokio::task::spawn_blocking`] tasks.
21//!   - [`ask_blocking`](actor_ref::ActorRef::ask_blocking): Blocking version of `ask` for use in [`tokio::task::spawn_blocking`] tasks.
22//! - **Straightforward Actor Lifecycle**: Actors have [`on_start`](Actor::on_start), [`on_run`](Actor::on_run),
23//!   and [`on_stop`](Actor::on_stop) lifecycle hooks that provide a clean and intuitive actor lifecycle management system.
24//!   The framework manages the execution flow while giving developers full control over actor behavior.
25//! - **Graceful Shutdown & Kill**: Actors can be stopped gracefully or killed immediately.
26//! - **Typed Messages**: Messages are strongly typed, and replies are also typed.
27//! - **Macro for Message Handling**:
28//!   - [`message_handlers`] attribute macro with `#[handler]` method attributes for automatic message handling (recommended)
29//! - **Type Safety Features**: [`ActorRef<T>`] provides compile-time type safety with zero runtime overhead
30//! - **Optional Tracing Support**: Built-in observability using the [`tracing`](https://crates.io/crates/tracing) crate (enable with `tracing` feature):
31//!   - Actor lifecycle event tracing (start, stop, different termination scenarios)
32//!   - Message handling with timing and performance metrics
33//!   - Reply processing and error handling tracing
34//!   - Structured, non-redundant logs for easier debugging and monitoring
35//! - **Dead Letter Tracking**: Automatic logging of undelivered messages via [`DeadLetterReason`]:
36//!   - All failed message deliveries are logged with actor and message type information
37//!   - Helps identify stopped actors, timeouts, and dropped replies
38//!   - Zero overhead on successful message delivery (hot path optimization)
39//! - **Enhanced Error Debugging**: Rich error information via [`Error::debugging_tips()`](Error::debugging_tips) and [`Error::is_retryable()`](Error::is_retryable):
40//!   - Actionable debugging tips for each error type
41//!   - Retry classification for timeout errors
42//!
43//! ## Core Concepts
44//!
45//! - **[`Actor`]**: Trait defining actor behavior and lifecycle hooks ([`on_start`](Actor::on_start) required, [`on_run`](Actor::on_run) optional).
46//! - **[`Message<M>`](actor::Message)**: Trait for handling a message type `M` and defining its reply type.
47//! - **[`ActorRef`]**: Handle for sending messages to an actor.
48//! - **[`spawn`]**: Function to create and start an actor, returning an [`ActorRef`] and a `JoinHandle`.
49//! - **[`ActorResult`]**: Enum representing the outcome of an actor's lifecycle (e.g., completed, failed).
50//!
51//! ## Getting Started
52//!
53//! ### Message Handling with `#[message_handlers]`
54//!
55//! rsActor uses the `#[message_handlers]` attribute macro combined with `#[handler]` method attributes
56//! for message handling. This is **required** for all actors and offers several advantages:
57//!
58//! - **Selective Processing**: Only methods marked with `#[handler]` are treated as message handlers.
59//! - **Clean Separation**: Regular methods can coexist with message handlers within the same `impl` block.
60//! - **Automatic Generation**: The macro automatically generates the necessary `Message` trait implementations and handler registrations.
61//! - **Type Safety**: Message handler signatures are verified at compile time.
62//! - **Reduced Boilerplate**: Eliminates the need to manually implement `Message` traits.
63//!
64//! ### Option A: Simple Actor with `#[derive(Actor)]`
65//!
66//! For simple actors that don't need complex initialization logic, use the `#[derive(Actor)]` macro:
67//!
68//! ```rust
69//! use rsactor::{Actor, ActorRef, message_handlers, spawn};
70//!
71//! // 1. Define your actor struct and derive Actor
72//! #[derive(Actor)]
73//! struct MyActor {
74//!     name: String,
75//!     count: u32,
76//! }
77//!
78//! // 2. Define message types
79//! struct GetName;
80//! struct Increment;
81//!
82//! // 3. Use message_handlers macro with handler attributes
83//! #[message_handlers]
84//! impl MyActor {
85//!     #[handler]
86//!     async fn handle_get_name(&mut self, _msg: GetName, _: &ActorRef<Self>) -> String {
87//!         self.name.clone()
88//!     }
89//!
90//!     #[handler]
91//!     async fn handle_increment(&mut self, _msg: Increment, _: &ActorRef<Self>) -> () {
92//!         self.count += 1;
93//!     }
94//!
95//!     // Regular methods can coexist without the #[handler] attribute
96//!     fn get_count(&self) -> u32 {
97//!         self.count
98//!     }
99//! }
100//!
101//! // 4. Usage
102//! # #[tokio::main]
103//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
104//! let actor_instance = MyActor { name: "Test".to_string(), count: 0 };
105//! let (actor_ref, _join_handle) = spawn::<MyActor>(actor_instance);
106//!
107//! let name = actor_ref.ask(GetName).await?;
108//! actor_ref.tell(Increment).await?;
109//! # Ok(())
110//! # }
111//! ```
112//!
113//! ### Option B: Custom Actor Implementation with Manual Initialization
114//!
115//! For actors that need custom initialization logic, implement the `Actor` trait manually:
116//!
117//! ```rust
118//! use rsactor::{Actor, ActorRef, message_handlers, spawn};
119//! use anyhow::Result;
120//!
121//! // 1. Define your actor struct
122//! #[derive(Debug)] // Added Debug for printing the actor in ActorResult
123//! struct MyActor {
124//!     data: String,
125//!     count: u32,
126//! }
127//!
128//! // 2. Implement the Actor trait manually
129//! impl Actor for MyActor {
130//!     type Args = String;
131//!     type Error = anyhow::Error;
132//!
133//!     // on_start is required and must be implemented.
134//!     // on_run and on_stop are optional and have default implementations.
135//!     async fn on_start(initial_data: Self::Args, actor_ref: &ActorRef<Self>) -> std::result::Result<Self, Self::Error> {
136//!         println!("MyActor (id: {}) started with data: '{}'", actor_ref.identity(), initial_data);
137//!         Ok(MyActor {
138//!             data: initial_data,
139//!             count: 0,
140//!         })
141//!     }
142//! }
143//!
144//! // 3. Define message types
145//! struct GetData;
146//! struct IncrementMsg(u32);
147//!
148//! // 4. Use message_handlers macro for message handling
149//! #[message_handlers]
150//! impl MyActor {
151//!     #[handler]
152//!     async fn handle_get_data(&mut self, _msg: GetData, _actor_ref: &ActorRef<Self>) -> String {
153//!         self.data.clone()
154//!     }
155//!
156//!     #[handler]
157//!     async fn handle_increment(&mut self, msg: IncrementMsg, _actor_ref: &ActorRef<Self>) -> u32 {
158//!         self.count += msg.0;
159//!         self.count
160//!     }
161//! }
162//!
163//! // 5. Usage
164//! # #[tokio::main]
165//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
166//! let (actor_ref, join_handle) = spawn::<MyActor>("initial data".to_string());
167//!
168//! let current_data: String = actor_ref.ask(GetData).await?;
169//! let new_count: u32 = actor_ref.ask(IncrementMsg(5)).await?;
170//!
171//! actor_ref.stop().await?;
172//! let actor_result = join_handle.await?;
173//! # Ok(())
174//! # }
175//! ```
176//!
177//! Both approaches also work with enums, making it easy to create state machine actors:
178//!
179//! ```rust
180//! use rsactor::{Actor, ActorRef, message_handlers, spawn};
181//!
182//! // Using message_handlers macro approach
183//! #[derive(Actor, Clone)]
184//! enum StateActor {
185//!     Idle,
186//!     Processing(String),
187//!     Completed(i32),
188//! }
189//!
190//! struct GetState;
191//! struct StartProcessing(String);
192//! struct Complete(i32);
193//!
194//! #[message_handlers]
195//! impl StateActor {
196//!     #[handler]
197//!     async fn handle_get_state(&mut self, _msg: GetState, _: &ActorRef<Self>) -> StateActor {
198//!         self.clone()
199//!     }
200//!
201//!     #[handler]
202//!     async fn handle_start_processing(&mut self, msg: StartProcessing, _: &ActorRef<Self>) -> () {
203//!         *self = StateActor::Processing(msg.0);
204//!     }
205//!
206//!     #[handler]
207//!     async fn handle_complete(&mut self, msg: Complete, _: &ActorRef<Self>) -> () {
208//!         *self = StateActor::Completed(msg.0);
209//!     }
210//! }
211//! ```
212//!
213//! ## Tracing Support
214//!
215//! rsActor provides optional tracing support for comprehensive observability. Enable it with the `tracing` feature:
216//!
217//! ```toml
218//! [dependencies]
219//! rsactor = { version = "0.13", features = ["tracing"] }
220//! tracing = "0.1"
221//! tracing-subscriber = "0.3"
222//! ```
223//!
224//! When enabled, rsActor emits structured trace events for:
225//! - Actor lifecycle events (start, stop, termination scenarios)
226//! - Message sending and handling with timing information
227//! - Reply processing and error handling
228//! - Performance metrics (message processing duration)
229//!
230//! All examples support tracing. Here's the integration pattern:
231//!
232//! ```rust,no_run
233//! #[tokio::main]
234//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
235//!     // Initialize tracing subscriber to see logs
236//!     // The `tracing` crate is always available for logging
237//!     tracing_subscriber::fmt()
238//!         .with_max_level(tracing::Level::DEBUG)
239//!         .with_target(false)
240//!         .init();
241//!
242//!     // Your existing actor code here...
243//!     // Logs are automatically emitted via tracing::warn!, tracing::error!, etc.
244//!     Ok(())
245//! }
246//! ```
247//!
248//! Run any example with debug logging:
249//! ```bash
250//! RUST_LOG=debug cargo run --example basic
251//! ```
252//!
253//! Enable instrumentation spans with the `tracing` feature:
254//! ```bash
255//! RUST_LOG=debug cargo run --example basic --features tracing
256//! ```
257//!
258//! This crate-level documentation provides an overview of [`rsActor`](crate).
259//! For more details on specific components, please refer to their individual
260//! documentation.
261
262mod error;
263pub use error::{Error, Result};
264
265mod dead_letter;
266pub use dead_letter::DeadLetterReason;
267
268// Re-export test utilities when test-utils feature is enabled
269#[cfg(any(test, feature = "test-utils"))]
270pub use dead_letter::{dead_letter_count, reset_dead_letter_count};
271
272#[cfg(feature = "metrics")]
273mod metrics;
274#[cfg(feature = "metrics")]
275pub use metrics::MetricsSnapshot;
276
277mod actor_ref;
278pub use actor_ref::{ActorRef, ActorWeak};
279
280mod actor_result;
281pub use actor_result::{ActorResult, FailurePhase};
282
283mod actor;
284pub use actor::{Actor, Message};
285
286mod handler;
287pub use handler::{AskHandler, TellHandler, WeakAskHandler, WeakTellHandler};
288
289mod actor_control;
290pub use actor_control::{ActorControl, WeakActorControl};
291
292use futures::FutureExt;
293// Re-export derive macros for convenient access
294pub use rsactor_derive::{message_handlers, Actor};
295
296use std::{fmt::Debug, future::Future, sync::atomic::AtomicU64, sync::OnceLock};
297
298use tokio::sync::{mpsc, oneshot};
299
300#[derive(Debug, Clone, Copy, PartialEq, Eq)]
301pub struct Identity {
302    /// Unique ID of the actor
303    pub id: u64,
304    /// Type name of the actor
305    pub type_name: &'static str,
306}
307
308impl Identity {
309    /// Creates a new `Identity` with the given ID and type name.
310    pub fn new(id: u64, type_name: &'static str) -> Self {
311        Identity { id, type_name }
312    }
313
314    /// Returns the type name of the actor.
315    pub fn name(&self) -> &'static str {
316        self.type_name
317    }
318}
319
320impl std::fmt::Display for Identity {
321    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322        write!(f, "{}(#{})", self.type_name, self.id)
323    }
324}
325
326/// Type-erased payload handler trait for dynamic message dispatch.
327///
328/// This trait allows different message types to be handled uniformly within the actor system,
329/// enabling storage of various message types in the same mailbox while preserving type safety
330/// through the `Message` trait implementation.
331trait PayloadHandler<A>: Send
332where
333    A: Actor,
334{
335    /// Handles the message by calling the appropriate handler and optionally sending a reply.
336    ///
337    /// # Parameters
338    /// - `actor`: Mutable reference to the actor instance
339    /// - `actor_ref`: Reference to the actor for potential self-messaging
340    /// - `reply_channel`: Optional channel to send the result back for `ask` operations
341    fn handle_message(
342        self: Box<Self>,
343        actor: &mut A,
344        actor_ref: ActorRef<A>,
345        reply_channel: Option<oneshot::Sender<Box<dyn std::any::Any + Send>>>,
346    ) -> BoxFuture<'_, ()>;
347}
348
349/// A boxed future that is Send and can be stored in collections.
350///
351/// This type alias is used throughout the handler traits for object-safe async methods.
352/// Identical to `futures::future::BoxFuture` but defined locally to avoid exposing
353/// the `futures` crate in the public API surface.
354pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn Future<Output = T> + Send + 'a>>;
355
356impl<A, T> PayloadHandler<A> for T
357where
358    A: Actor + Message<T> + 'static,
359    T: Send + 'static,
360{
361    fn handle_message(
362        self: Box<Self>,
363        actor: &mut A,
364        actor_ref: ActorRef<A>,
365        reply_channel: Option<oneshot::Sender<Box<dyn std::any::Any + Send>>>,
366    ) -> BoxFuture<'_, ()> {
367        async move {
368            let result = Message::handle(actor, *self, &actor_ref).await;
369            if let Some(channel) = reply_channel {
370                match channel.send(Box::new(result)) {
371                    Ok(_) => {
372                        #[cfg(feature = "tracing")]
373                        tracing::debug!(
374                            actor = %actor_ref.identity(),
375                            "Reply sent successfully"
376                        );
377                    }
378                    Err(_) => {
379                        tracing::error!(
380                            actor = %actor_ref.identity(),
381                            message_type = %std::any::type_name::<T>(),
382                            "Failed to send reply - receiver dropped"
383                        );
384                    }
385                }
386            } else {
387                <A as Message<T>>::on_tell_result(&result, &actor_ref);
388            }
389        }
390        .boxed()
391    }
392}
393
394/// Represents messages that can be sent to an actor's mailbox.
395///
396/// This enum includes both user-defined messages (wrapped in `Envelope`)
397/// and control messages like `StopGracefully`. The `Terminate` control signal
398/// is handled through a separate dedicated channel.
399pub(crate) enum MailboxMessage<T>
400where
401    T: Actor + 'static,
402{
403    /// A user-defined message to be processed by the actor.
404    Envelope {
405        /// The message payload containing the actual message data.
406        payload: Box<dyn PayloadHandler<T>>,
407        /// Optional channel to send the reply back to the caller (used for `ask` operations).
408        reply_channel: Option<oneshot::Sender<Box<dyn std::any::Any + Send>>>,
409        /// The actor reference for potential self-messaging or context.
410        actor_ref: ActorRef<T>,
411    },
412    /// A signal for the actor to stop gracefully after processing existing messages in its mailbox.
413    ///
414    /// The contained `ActorRef<T>` prevents the actor from being dropped until this message is processed.
415    #[allow(dead_code)]
416    StopGracefully(ActorRef<T>),
417}
418
419/// Control signals sent through a dedicated high-priority channel.
420///
421/// These signals are processed with higher priority than regular mailbox messages
422/// to ensure timely actor termination even when the mailbox is full.
423#[derive(Debug)]
424pub(crate) enum ControlSignal {
425    /// A signal for the actor to terminate immediately without processing remaining mailbox messages.
426    Terminate,
427}
428
429/// Type alias for the sender side of an actor's mailbox channel.
430///
431/// This is used by `ActorRef` to send messages to the actor's mailbox.
432pub(crate) type MailboxSender<T> = mpsc::Sender<MailboxMessage<T>>;
433
434/// Global configuration for the default mailbox capacity.
435///
436/// This value can be set once using `set_default_mailbox_capacity()` and will be used
437/// by the `spawn()` function when no specific capacity is provided.
438static CONFIGURED_DEFAULT_MAILBOX_CAPACITY: OnceLock<usize> = OnceLock::new();
439
440/// The default mailbox capacity for actors.
441pub const DEFAULT_MAILBOX_CAPACITY: usize = 32;
442
443/// Sets the global default buffer size for actor mailboxes.
444///
445/// This function can only be called successfully once. Subsequent calls
446/// will return an error. This configured value is used by the `spawn` function
447/// if no specific capacity is provided to `spawn_with_mailbox_capacity`.
448pub fn set_default_mailbox_capacity(size: usize) -> Result<()> {
449    if size == 0 {
450        return Err(Error::MailboxCapacity {
451            message: "Global default mailbox capacity must be greater than 0".to_string(),
452        });
453    }
454
455    CONFIGURED_DEFAULT_MAILBOX_CAPACITY
456        .set(size)
457        .map_err(|_| Error::MailboxCapacity {
458            message: "Global default mailbox capacity has already been set".to_string(),
459        })
460}
461
462/// Spawns a new actor and returns an `ActorRef<T>` to it, along with a `JoinHandle`.
463///
464/// Takes initialization arguments that will be passed to the actor's [`on_start`](crate::Actor::on_start) method.
465/// The `JoinHandle` can be used to await the actor's termination and retrieve
466/// the actor result as an [`ActorResult<T>`](crate::ActorResult).
467pub fn spawn<T: Actor + 'static>(
468    args: T::Args,
469) -> (ActorRef<T>, tokio::task::JoinHandle<ActorResult<T>>) {
470    let capacity = CONFIGURED_DEFAULT_MAILBOX_CAPACITY
471        .get()
472        .copied()
473        .unwrap_or(DEFAULT_MAILBOX_CAPACITY);
474    spawn_with_mailbox_capacity(args, capacity)
475}
476
477/// Spawns a new actor with a specified mailbox capacity and returns an `ActorRef<T>` to it, along with a `JoinHandle`.
478///
479/// Takes initialization arguments that will be passed to the actor's [`on_start`](crate::Actor::on_start) method.
480/// The `JoinHandle` can be used to await the actor's termination and retrieve
481/// the actor result as an [`ActorResult<T>`](crate::ActorResult). Use this version when you need
482/// to control the actor's mailbox capacity.
483pub fn spawn_with_mailbox_capacity<T: Actor + 'static>(
484    args: T::Args,
485    mailbox_capacity: usize,
486) -> (ActorRef<T>, tokio::task::JoinHandle<ActorResult<T>>) {
487    assert!(
488        mailbox_capacity > 0,
489        "Mailbox capacity must be greater than 0"
490    );
491
492    static ACTOR_IDS: AtomicU64 = AtomicU64::new(1);
493
494    let actor_id = Identity::new(
495        ACTOR_IDS.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
496        std::any::type_name::<T>(),
497    );
498
499    let (mailbox_tx, mailbox_rx) = mpsc::channel(mailbox_capacity);
500    let (terminate_tx, terminate_rx) = mpsc::channel::<ControlSignal>(1);
501
502    #[cfg(feature = "metrics")]
503    let metrics = std::sync::Arc::new(metrics::MetricsCollector::new());
504
505    let actor_ref = ActorRef::new(
506        actor_id,
507        mailbox_tx,
508        terminate_tx,
509        #[cfg(feature = "metrics")]
510        metrics,
511    );
512
513    let join_handle = tokio::spawn(crate::actor::run_actor_lifecycle(
514        args,
515        actor_ref.clone(),
516        mailbox_rx,
517        terminate_rx,
518    ));
519
520    (actor_ref, join_handle)
521}