rsactor/lib.rs
// Copyright 2022 Jeff Kim <hiking90@gmail.com>
// SPDX-License-Identifier: Apache-2.0

//! # rsActor
//! A Simple and Efficient In-Process Actor Model Implementation for Rust
//!
//! `rsActor` is a lightweight, Tokio-based actor framework in Rust focused on providing a simple
//! and efficient actor model for local, in-process systems. It emphasizes clean message-passing
//! semantics and straightforward actor lifecycle management while maintaining high performance for
//! Rust applications.
//!
//! ## Features
//!
//! - **Asynchronous Actors**: Actors run in their own asynchronous tasks.
//! - **Message Passing**: Actors communicate by sending and receiving messages.
//!   - [`tell`](actor_ref::ActorRef::tell): Send a message without waiting for a reply (fire-and-forget).
//!   - [`tell_with_timeout`](actor_ref::ActorRef::tell_with_timeout): Send a message without waiting for a reply, with a specified timeout.
//!   - [`ask`](actor_ref::ActorRef::ask): Send a message and await a reply.
//!   - [`ask_with_timeout`](actor_ref::ActorRef::ask_with_timeout): Send a message and await a reply, with a specified timeout.
//!   - [`tell_blocking`](actor_ref::ActorRef::tell_blocking): Blocking version of `tell` for use in [`tokio::task::spawn_blocking`] tasks.
//!   - [`ask_blocking`](actor_ref::ActorRef::ask_blocking): Blocking version of `ask` for use in [`tokio::task::spawn_blocking`] tasks.
//! - **Straightforward Actor Lifecycle**: Actors have [`on_start`](Actor::on_start), [`on_run`](Actor::on_run),
//!   and [`on_stop`](Actor::on_stop) lifecycle hooks that provide a clean and intuitive actor lifecycle management system.
//!   The framework manages the execution flow while giving developers full control over actor behavior.
//! - **Graceful Shutdown & Kill**: Actors can be stopped gracefully or killed immediately.
//! - **Typed Messages**: Messages are strongly typed, and replies are also typed.
//! - **Macro for Message Handling**:
//!   - [`message_handlers`] attribute macro with `#[handler]` method attributes for automatic message handling (recommended)
//! - **Type Safety Features**: [`ActorRef<T>`] provides compile-time type safety with zero runtime overhead
//! - **Optional Tracing Support**: Built-in observability using the [`tracing`](https://crates.io/crates/tracing) crate (enable with `tracing` feature):
//!   - Actor lifecycle event tracing (start, stop, different termination scenarios)
//!   - Message handling with timing and performance metrics
//!   - Reply processing and error handling tracing
//!   - Structured, non-redundant logs for easier debugging and monitoring
//! - **Dead Letter Tracking**: Automatic logging of undelivered messages via [`DeadLetterReason`]:
//!   - All failed message deliveries are logged with actor and message type information
//!   - Helps identify stopped actors, timeouts, and dropped replies
//!   - Zero overhead on successful message delivery (hot path optimization)
//! - **Enhanced Error Debugging**: Rich error information via [`Error::debugging_tips()`](Error::debugging_tips) and [`Error::is_retryable()`](Error::is_retryable):
//!   - Actionable debugging tips for each error type
//!   - Retry classification for timeout errors
//!
//! ## Core Concepts
//!
//! - **[`Actor`]**: Trait defining actor behavior and lifecycle hooks ([`on_start`](Actor::on_start) required, [`on_run`](Actor::on_run) optional).
//! - **[`Message<M>`](actor::Message)**: Trait for handling a message type `M` and defining its reply type.
//! - **[`ActorRef`]**: Handle for sending messages to an actor.
//! - **[`spawn`]**: Function to create and start an actor, returning an [`ActorRef`] and a `JoinHandle`.
//! - **[`ActorResult`]**: Enum representing the outcome of an actor's lifecycle (e.g., completed, failed).
//!
//! ## Getting Started
//!
//! ### Message Handling with `#[message_handlers]`
//!
//! rsActor uses the `#[message_handlers]` attribute macro combined with `#[handler]` method attributes
//! for message handling. This is the **recommended** approach for all actors and offers several advantages:
//!
//! - **Selective Processing**: Only methods marked with `#[handler]` are treated as message handlers.
//! - **Clean Separation**: Regular methods can coexist with message handlers within the same `impl` block.
//! - **Automatic Generation**: The macro automatically generates the necessary `Message` trait implementations and handler registrations.
//! - **Type Safety**: Message handler signatures are verified at compile time.
//! - **Reduced Boilerplate**: Eliminates the need to manually implement `Message` traits.
//!
//! ### Option A: Simple Actor with `#[derive(Actor)]`
//!
//! For simple actors that don't need complex initialization logic, use the `#[derive(Actor)]` macro:
//!
//! ```rust
//! use rsactor::{Actor, ActorRef, message_handlers, spawn};
//!
//! // 1. Define your actor struct and derive Actor
//! #[derive(Actor)]
//! struct MyActor {
//!     name: String,
//!     count: u32,
//! }
//!
//! // 2. Define message types
//! struct GetName;
//! struct Increment;
//!
//! // 3. Use message_handlers macro with handler attributes
//! #[message_handlers]
//! impl MyActor {
//!     #[handler]
//!     async fn handle_get_name(&mut self, _msg: GetName, _: &ActorRef<Self>) -> String {
//!         self.name.clone()
//!     }
//!
//!     #[handler]
//!     async fn handle_increment(&mut self, _msg: Increment, _: &ActorRef<Self>) -> () {
//!         self.count += 1;
//!     }
//!
//!     // Regular methods can coexist without the #[handler] attribute
//!     fn get_count(&self) -> u32 {
//!         self.count
//!     }
//! }
//!
//! // 4. Usage
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let actor_instance = MyActor { name: "Test".to_string(), count: 0 };
//! let (actor_ref, _join_handle) = spawn::<MyActor>(actor_instance);
//!
//! let name = actor_ref.ask(GetName).await?;
//! actor_ref.tell(Increment).await?;
//! # Ok(())
//! # }
//! ```
//!
//! ### Option B: Custom Actor Implementation with Manual Initialization
//!
//! For actors that need custom initialization logic, implement the `Actor` trait manually:
//!
//! ```rust
//! use rsactor::{Actor, ActorRef, message_handlers, spawn};
//! use anyhow::Result;
//!
//! // 1. Define your actor struct
//! #[derive(Debug)] // Added Debug for printing the actor in ActorResult
//! struct MyActor {
//!     data: String,
//!     count: u32,
//! }
//!
//! // 2. Implement the Actor trait manually
//! impl Actor for MyActor {
//!     type Args = String;
//!     type Error = anyhow::Error;
//!
//!     // on_start is required and must be implemented.
//!     // on_run and on_stop are optional and have default implementations.
//!     async fn on_start(initial_data: Self::Args, actor_ref: &ActorRef<Self>) -> std::result::Result<Self, Self::Error> {
//!         println!("MyActor (id: {}) started with data: '{}'", actor_ref.identity(), initial_data);
//!         Ok(MyActor {
//!             data: initial_data,
//!             count: 0,
//!         })
//!     }
//! }
//!
//! // 3. Define message types
//! struct GetData;
//! struct IncrementMsg(u32);
//!
//! // 4. Use message_handlers macro for message handling
//! #[message_handlers]
//! impl MyActor {
//!     #[handler]
//!     async fn handle_get_data(&mut self, _msg: GetData, _actor_ref: &ActorRef<Self>) -> String {
//!         self.data.clone()
//!     }
//!
//!     #[handler]
//!     async fn handle_increment(&mut self, msg: IncrementMsg, _actor_ref: &ActorRef<Self>) -> u32 {
//!         self.count += msg.0;
//!         self.count
//!     }
//! }
//!
//! // 5. Usage
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let (actor_ref, join_handle) = spawn::<MyActor>("initial data".to_string());
//!
//! let current_data: String = actor_ref.ask(GetData).await?;
//! let new_count: u32 = actor_ref.ask(IncrementMsg(5)).await?;
//!
//! actor_ref.stop().await?;
//! let actor_result = join_handle.await?;
//! # Ok(())
//! # }
//! ```
//!
//! Both approaches also work with enums, making it easy to create state machine actors:
//!
//! ```rust
//! use rsactor::{Actor, ActorRef, message_handlers, spawn};
//!
//! // Using message_handlers macro approach
//! #[derive(Actor, Clone)]
//! enum StateActor {
//!     Idle,
//!     Processing(String),
//!     Completed(i32),
//! }
//!
//! struct GetState;
//! struct StartProcessing(String);
//! struct Complete(i32);
//!
//! #[message_handlers]
//! impl StateActor {
//!     #[handler]
//!     async fn handle_get_state(&mut self, _msg: GetState, _: &ActorRef<Self>) -> StateActor {
//!         self.clone()
//!     }
//!
//!     #[handler]
//!     async fn handle_start_processing(&mut self, msg: StartProcessing, _: &ActorRef<Self>) -> () {
//!         *self = StateActor::Processing(msg.0);
//!     }
//!
//!     #[handler]
//!     async fn handle_complete(&mut self, msg: Complete, _: &ActorRef<Self>) -> () {
//!         *self = StateActor::Completed(msg.0);
//!     }
//! }
//! ```
//!
//! ## Tracing Support
//!
//! rsActor provides optional tracing support for comprehensive observability. Enable it with the `tracing` feature:
//!
//! ```toml
//! [dependencies]
//! rsactor = { version = "0.13", features = ["tracing"] }
//! tracing = "0.1"
//! tracing-subscriber = "0.3"
//! ```
//!
//! When enabled, rsActor emits structured trace events for:
//! - Actor lifecycle events (start, stop, termination scenarios)
//! - Message sending and handling with timing information
//! - Reply processing and error handling
//! - Performance metrics (message processing duration)
//!
//! All examples support tracing. Here's the integration pattern:
//!
//! ```rust,no_run
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Initialize tracing subscriber to see logs
//!     // The `tracing` crate is always available for logging
//!     tracing_subscriber::fmt()
//!         .with_max_level(tracing::Level::DEBUG)
//!         .with_target(false)
//!         .init();
//!
//!     // Your existing actor code here...
//!     // Logs are automatically emitted via tracing::warn!, tracing::error!, etc.
//!     Ok(())
//! }
//! ```
//!
//! Run any example with debug logging:
//! ```bash
//! RUST_LOG=debug cargo run --example basic
//! ```
//!
//! Enable instrumentation spans with the `tracing` feature:
//! ```bash
//! RUST_LOG=debug cargo run --example basic --features tracing
//! ```
//!
//! This crate-level documentation provides an overview of [`rsActor`](crate).
//! For more details on specific components, please refer to their individual
//! documentation.

mod error;
pub use error::{Error, Result};

mod dead_letter;
pub use dead_letter::DeadLetterReason;

// Re-export test utilities when test-utils feature is enabled
#[cfg(any(test, feature = "test-utils"))]
pub use dead_letter::{dead_letter_count, reset_dead_letter_count};

#[cfg(feature = "metrics")]
mod metrics;
#[cfg(feature = "metrics")]
pub use metrics::MetricsSnapshot;

mod actor_ref;
pub use actor_ref::{ActorRef, ActorWeak};

mod actor_result;
pub use actor_result::{ActorResult, FailurePhase};

mod actor;
pub use actor::{Actor, Message};

mod handler;
pub use handler::{AskHandler, TellHandler, WeakAskHandler, WeakTellHandler};

mod actor_control;
pub use actor_control::{ActorControl, WeakActorControl};

use futures::FutureExt;
// Re-export derive macros for convenient access
pub use rsactor_derive::{message_handlers, Actor};

use std::{fmt::Debug, future::Future, sync::atomic::AtomicU64, sync::OnceLock};

use tokio::sync::{mpsc, oneshot};

/// Identifies an actor instance by a unique numeric ID and the type name of its actor.
///
/// The `Display` implementation renders an identity as `TypeName(#id)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Identity {
    /// Unique ID of the actor
    pub id: u64,
    /// Type name of the actor
    pub type_name: &'static str,
}

impl Identity {
    /// Creates a new `Identity` with the given ID and type name.
    pub fn new(id: u64, type_name: &'static str) -> Self {
        Identity { id, type_name }
    }

    /// Returns the type name of the actor.
    pub fn name(&self) -> &'static str {
        self.type_name
    }
}

impl std::fmt::Display for Identity {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}(#{})", self.type_name, self.id)
    }
}

/// Type-erased payload handler trait for dynamic message dispatch.
///
/// This trait allows different message types to be handled uniformly within the actor system,
/// enabling storage of various message types in the same mailbox while preserving type safety
/// through the `Message` trait implementation.
trait PayloadHandler<A>: Send
where
    A: Actor,
{
    /// Handles the message by calling the appropriate handler and optionally sending a reply.
    ///
    /// # Parameters
    /// - `actor`: Mutable reference to the actor instance
    /// - `actor_ref`: Reference to the actor for potential self-messaging
    /// - `reply_channel`: Optional channel to send the result back for `ask` operations
    fn handle_message(
        self: Box<Self>,
        actor: &mut A,
        actor_ref: ActorRef<A>,
        reply_channel: Option<oneshot::Sender<Box<dyn std::any::Any + Send>>>,
    ) -> BoxFuture<'_, ()>;
}

/// A boxed future that is Send and can be stored in collections.
///
/// This type alias is used throughout the handler traits for object-safe async methods.
/// Identical to `futures::future::BoxFuture` but defined locally to avoid exposing
/// the `futures` crate in the public API surface.
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn Future<Output = T> + Send + 'a>>;

impl<A, T> PayloadHandler<A> for T
where
    A: Actor + Message<T> + 'static,
    T: Send + 'static,
{
    fn handle_message(
        self: Box<Self>,
        actor: &mut A,
        actor_ref: ActorRef<A>,
        reply_channel: Option<oneshot::Sender<Box<dyn std::any::Any + Send>>>,
    ) -> BoxFuture<'_, ()> {
        async move {
            let result = Message::handle(actor, *self, &actor_ref).await;
            if let Some(channel) = reply_channel {
                match channel.send(Box::new(result)) {
                    Ok(_) => {
                        #[cfg(feature = "tracing")]
                        tracing::debug!(
                            actor = %actor_ref.identity(),
                            "Reply sent successfully"
                        );
                    }
                    Err(_) => {
                        tracing::error!(
                            actor = %actor_ref.identity(),
                            message_type = %std::any::type_name::<T>(),
                            "Failed to send reply - receiver dropped"
                        );
                    }
                }
            } else {
                <A as Message<T>>::on_tell_result(&result, &actor_ref);
            }
        }
        .boxed()
    }
}

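/*
The reply path in the blanket impl above erases the handler's return type into
`Box<dyn Any + Send>` before sending it. The sketch below illustrates that idea with the
standard library only. `ErasedReply`, `send_reply`, and `recv_reply` are hypothetical names,
`std::sync::mpsc` stands in for the Tokio oneshot channel, and the caller-side downcast is an
assumption about how an `ask` could recover the typed reply, not the crate's actual code.

```rust
use std::any::Any;
use std::sync::mpsc;

/// Type-erased reply, mirroring the `Box<dyn Any + Send>` carried by the reply channel.
type ErasedReply = Box<dyn Any + Send>;

/// Handler side: boxes a concrete reply and sends it.
/// Returns `false` if the receiver was already dropped.
fn send_reply<R: Send + 'static>(tx: mpsc::Sender<ErasedReply>, reply: R) -> bool {
    tx.send(Box::new(reply)).is_ok()
}

/// Caller side: waits for the erased reply and downcasts it to the expected type.
fn recv_reply<R: 'static>(rx: mpsc::Receiver<ErasedReply>) -> Result<R, String> {
    let erased = rx.recv().map_err(|_| "reply channel closed".to_string())?;
    erased
        .downcast::<R>()
        .map(|boxed| *boxed)
        .map_err(|_| "reply had an unexpected type".to_string())
}
```

A caller that expects a `u32` would call `recv_reply::<u32>(rx)`. Asking for the wrong type
yields an `Err` rather than a panic, and a dropped receiver makes `send_reply` return `false`,
which corresponds to the "receiver dropped" branch logged above.
*/
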
/// Represents messages that can be sent to an actor's mailbox.
///
/// This enum includes both user-defined messages (wrapped in `Envelope`)
/// and control messages like `StopGracefully`. The `Terminate` control signal
/// is handled through a separate dedicated channel.
pub(crate) enum MailboxMessage<T>
where
    T: Actor + 'static,
{
    /// A user-defined message to be processed by the actor.
    Envelope {
        /// The message payload containing the actual message data.
        payload: Box<dyn PayloadHandler<T>>,
        /// Optional channel to send the reply back to the caller (used for `ask` operations).
        reply_channel: Option<oneshot::Sender<Box<dyn std::any::Any + Send>>>,
        /// The actor reference for potential self-messaging or context.
        actor_ref: ActorRef<T>,
    },
    /// A signal for the actor to stop gracefully after processing existing messages in its mailbox.
    ///
    /// The contained `ActorRef<T>` prevents the actor from being dropped until this message is processed.
    #[allow(dead_code)]
    StopGracefully(ActorRef<T>),
}

/// Control signals sent through a dedicated high-priority channel.
///
/// These signals are processed with higher priority than regular mailbox messages
/// to ensure timely actor termination even when the mailbox is full.
#[derive(Debug)]
pub(crate) enum ControlSignal {
    /// A signal for the actor to terminate immediately without processing remaining mailbox messages.
    Terminate,
}

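/*
The effect of a dedicated control channel can be sketched as follows. This is a std-only
illustration of "check control before mailbox", not the crate's run loop: the real loop lives in
`actor::run_actor_lifecycle`, uses Tokio channels, and is not shown in this file. `Control` and
`drain_with_priority` are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical stand-in for the crate's control signal.
#[derive(Debug)]
enum Control {
    Terminate,
}

/// Drains `mailbox` until it is empty or closed, checking `control` before every message.
/// Returns the processed messages and whether a terminate signal ended the loop.
fn drain_with_priority(control: &Receiver<Control>, mailbox: &Receiver<u32>) -> (Vec<u32>, bool) {
    let mut processed = Vec::new();
    loop {
        // The control channel is polled first, so a pending signal wins over queued messages.
        if let Ok(Control::Terminate) = control.try_recv() {
            return (processed, true);
        }
        match mailbox.try_recv() {
            Ok(msg) => processed.push(msg),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => {
                return (processed, false);
            }
        }
    }
}
```

Because the signal travels on its own channel, it can be delivered even when the bounded mailbox
has no free slot, and messages still queued at that point are left unprocessed.
*/
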
/// Type alias for the sender side of an actor's mailbox channel.
///
/// This is used by `ActorRef` to send messages to the actor's mailbox.
pub(crate) type MailboxSender<T> = mpsc::Sender<MailboxMessage<T>>;

/// Global configuration for the default mailbox capacity.
///
/// This value can be set once using `set_default_mailbox_capacity()` and is used
/// by the `spawn()` function, which falls back to `DEFAULT_MAILBOX_CAPACITY` when it is unset.
static CONFIGURED_DEFAULT_MAILBOX_CAPACITY: OnceLock<usize> = OnceLock::new();

/// The default mailbox capacity for actors.
pub const DEFAULT_MAILBOX_CAPACITY: usize = 32;

/// Sets the global default capacity for actor mailboxes.
///
/// This function can only be called successfully once. The configured value is used by
/// [`spawn`]; to choose a capacity for a single actor, call [`spawn_with_mailbox_capacity`] instead.
///
/// # Errors
///
/// Returns [`Error::MailboxCapacity`] if `size` is 0 or if the default has already been set.
pub fn set_default_mailbox_capacity(size: usize) -> Result<()> {
    if size == 0 {
        return Err(Error::MailboxCapacity {
            message: "Global default mailbox capacity must be greater than 0".to_string(),
        });
    }

    CONFIGURED_DEFAULT_MAILBOX_CAPACITY
        .set(size)
        .map_err(|_| Error::MailboxCapacity {
            message: "Global default mailbox capacity has already been set".to_string(),
        })
}

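/*
The set-once behavior above comes from `std::sync::OnceLock`. A minimal std-only sketch of the
same pattern follows. `CONFIGURED`, `FALLBACK`, `configure`, and `effective_capacity` are
hypothetical stand-ins for the crate's static, constant, and functions, and plain `String`
errors replace the crate's `Error` type.

```rust
use std::sync::OnceLock;

// Hypothetical stand-ins for the crate's private static and public constant.
static CONFIGURED: OnceLock<usize> = OnceLock::new();
const FALLBACK: usize = 32;

/// Stores `size` as the process-wide default.
/// Fails on zero and on any call made after the first successful one.
fn configure(size: usize) -> Result<(), String> {
    if size == 0 {
        return Err("capacity must be greater than 0".to_string());
    }
    // `OnceLock::set` returns `Err(value)` when a value is already stored.
    CONFIGURED
        .set(size)
        .map_err(|_| "capacity has already been set".to_string())
}

/// Returns the configured value, or the fallback if `configure` never succeeded.
fn effective_capacity() -> usize {
    CONFIGURED.get().copied().unwrap_or(FALLBACK)
}
```

Note that a rejected call with `0` does not consume the single allowed write, so a later call
with a valid size can still succeed.
*/
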
/// Spawns a new actor and returns an `ActorRef<T>` to it, along with a `JoinHandle`.
///
/// Takes initialization arguments that will be passed to the actor's [`on_start`](crate::Actor::on_start) method.
/// The `JoinHandle` can be used to await the actor's termination and retrieve
/// the actor result as an [`ActorResult<T>`](crate::ActorResult).
///
/// The mailbox capacity is the value configured with [`set_default_mailbox_capacity`],
/// or [`DEFAULT_MAILBOX_CAPACITY`] if none was configured.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime, because the actor task is started with `tokio::spawn`.
pub fn spawn<T: Actor + 'static>(
    args: T::Args,
) -> (ActorRef<T>, tokio::task::JoinHandle<ActorResult<T>>) {
    let capacity = CONFIGURED_DEFAULT_MAILBOX_CAPACITY
        .get()
        .copied()
        .unwrap_or(DEFAULT_MAILBOX_CAPACITY);
    spawn_with_mailbox_capacity(args, capacity)
}

/// Spawns a new actor with a specified mailbox capacity and returns an `ActorRef<T>` to it, along with a `JoinHandle`.
///
/// Takes initialization arguments that will be passed to the actor's [`on_start`](crate::Actor::on_start) method.
/// The `JoinHandle` can be used to await the actor's termination and retrieve
/// the actor result as an [`ActorResult<T>`](crate::ActorResult). Use this version when you need
/// to control the actor's mailbox capacity.
///
/// # Panics
///
/// Panics if `mailbox_capacity` is 0, or if called outside of a Tokio runtime.
pub fn spawn_with_mailbox_capacity<T: Actor + 'static>(
    args: T::Args,
    mailbox_capacity: usize,
) -> (ActorRef<T>, tokio::task::JoinHandle<ActorResult<T>>) {
    assert!(
        mailbox_capacity > 0,
        "Mailbox capacity must be greater than 0"
    );

    // Process-wide counter for actor IDs; the first actor gets ID 1.
    static ACTOR_IDS: AtomicU64 = AtomicU64::new(1);

    let actor_id = Identity::new(
        ACTOR_IDS.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
        std::any::type_name::<T>(),
    );

    let (mailbox_tx, mailbox_rx) = mpsc::channel(mailbox_capacity);
    let (terminate_tx, terminate_rx) = mpsc::channel::<ControlSignal>(1);

    #[cfg(feature = "metrics")]
    let metrics = std::sync::Arc::new(metrics::MetricsCollector::new());

    let actor_ref = ActorRef::new(
        actor_id,
        mailbox_tx,
        terminate_tx,
        #[cfg(feature = "metrics")]
        metrics,
    );

    let join_handle = tokio::spawn(crate::actor::run_actor_lifecycle(
        args,
        actor_ref.clone(),
        mailbox_rx,
        terminate_rx,
    ));

    (actor_ref, join_handle)
}
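/*
The ID allocation in `spawn_with_mailbox_capacity` relies on a function-local atomic counter
plus `std::any::type_name`. The std-only sketch below reproduces that scheme. `next_identity`
and `describe` are hypothetical helpers, and a plain tuple stands in for `Identity`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical helper: returns the next ID together with the type name of `T`.
fn next_identity<T>() -> (u64, &'static str) {
    // Items nested in a generic function are not duplicated per type parameter,
    // so this single counter is shared by every `T`.
    static NEXT_ID: AtomicU64 = AtomicU64::new(1);
    // `fetch_add` returns the value held before the increment, so the first ID is 1.
    let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    (id, std::any::type_name::<T>())
}

/// Formats an identity in the same `TypeName(#id)` shape as the `Display` impl for `Identity`.
fn describe((id, type_name): (u64, &'static str)) -> String {
    format!("{type_name}(#{id})")
}
```

`Ordering::Relaxed` is enough here because the counter only needs each `fetch_add` to return a
distinct value; it does not synchronize any other memory. The exact string returned by
`type_name` is not guaranteed to be stable, so it is best treated as a diagnostic label.
*/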