actor_helper/
lib.rs

1//! Lightweight, runtime-agnostic actor pattern with dynamic error types.
2//!
3//! Works with `tokio`, `async-std`, or blocking threads. Uses [`flume`] channels.
4//!
5//! # Quick Start
6//!
7//! ```rust,ignore
8//! use std::io;
9//! use actor_helper::{Actor, Handle, Receiver, act_ok, spawn_actor};
10//!
11//! // Public API
12//! pub struct Counter {
13//!     handle: Handle<CounterActor, io::Error>,
14//! }
15//!
16//! impl Counter {
17//!     pub fn new() -> Self {
18//!         let (handle, rx) = Handle::channel();
19//!         spawn_actor(CounterActor { value: 0, rx });
20//!         Self { handle }
21//!     }
22//!
23//!     pub async fn increment(&self, by: i32) -> io::Result<()> {
24//!         self.handle.call(act_ok!(actor => async move {
25//!             actor.value += by;
26//!         })).await
27//!     }
28//!
29//!     pub async fn get(&self) -> io::Result<i32> {
30//!         self.handle.call(act_ok!(actor => async move { actor.value })).await
31//!     }
32//! }
33//!
34//! // Private actor
35//! struct CounterActor {
36//!     value: i32,
37//!     rx: Receiver<actor_helper::Action<CounterActor>>,
38//! }
39//!
40//! impl Actor<io::Error> for CounterActor {
41//!     async fn run(&mut self) -> io::Result<()> {
42//!         loop {
43//!             tokio::select! {
44//!                 Ok(action) = self.rx.recv_async() => action(self).await,
45//!             }
46//!         }
47//!     }
48//! }
49//! ```
50//!
51//! # Error Types
52//!
53//! Use any error type implementing [`ActorError`]:
54//! - `io::Error` (default)
55//! - `anyhow::Error` (with `anyhow` feature)
56//! - `String`
57//! - `Box<dyn Error>`
58//!
59//! # Blocking/Sync
60//!
61//! ```rust,ignore
62//! use actor_helper::{ActorSync, block_on};
63//!
64//! impl ActorSync<io::Error> for CounterActor {
65//!     fn run_blocking(&mut self) -> io::Result<()> {
66//!         loop {
67//!             if let Ok(action) = self.rx.recv() {
68//!                 block_on(action(self));
69//!             }
70//!         }
71//!     }
72//! }
73//!
74//! // Use call_blocking instead of call
75//! handle.call_blocking(act_ok!(actor => async move { actor.value }))?;
76//! ```
77//!
78//! # Notes
79//!
80//! - Actions run sequentially, long tasks block the mailbox
81//! - Panics are caught and converted to errors with location info
82//! - `call` requires `tokio` or `async-std` feature
83//! - `call_blocking` has no feature requirements
84use std::{boxed, future::Future, io, pin::Pin};
85
86use futures_util::FutureExt;
87
88/// Flume unbounded sender.
89pub type Sender<T> = flume::Sender<T>;
90
91/// Flume unbounded receiver. Actors receive actions via `Receiver<Action<Self>>`.
92///
93/// Use `recv()` for blocking or `recv_async()` for async.
94pub type Receiver<T> = flume::Receiver<T>;
95
96/// Execute async futures in blocking context. Required for `ActorSync`.
97pub use futures_executor::block_on;
98
99/// Convert panic/actor-stop messages into your error type.
100///
101/// Implemented for `io::Error`, `anyhow::Error`, `String`, and `Box<dyn Error>`.
102///
103/// # Example
104/// ```rust,ignore
105/// impl ActorError for MyError {
106///     fn from_actor_message(msg: String) -> Self {
107///         MyError::ActorPanic(msg)
108///     }
109/// }
110/// ```
111pub trait ActorError: Sized + Send + 'static {
112    fn from_actor_message(msg: String) -> Self;
113}
114
115// Implementations for common types
116impl ActorError for io::Error {
117    fn from_actor_message(msg: String) -> Self {
118        io::Error::new(io::ErrorKind::Other, msg)
119    }
120}
121
122#[cfg(feature = "anyhow")]
123impl ActorError for anyhow::Error {
124    fn from_actor_message(msg: String) -> Self {
125        anyhow::anyhow!(msg)
126    }
127}
128
129impl ActorError for String {
130    fn from_actor_message(msg: String) -> Self {
131        msg
132    }
133}
134
135impl ActorError for Box<dyn std::error::Error + Send + Sync> {
136    fn from_actor_message(msg: String) -> Self {
137        Box::new(io::Error::new(io::ErrorKind::Other, msg))
138    }
139}
140
141/// Unboxed future type for actor actions.
142pub type PreBoxActorFut<'a, T> = dyn Future<Output = T> + Send + 'a;
143
144/// Pinned, boxed future used by action helpers and macros.
145pub type ActorFut<'a, T> = Pin<boxed::Box<PreBoxActorFut<'a, T>>>;
146
147/// Action sent to an actor: `FnOnce(&mut A) -> Future<()>`.
148///
149/// Created via `act!` or `act_ok!` macros. Return values flow through oneshot channels.
150pub type Action<A> =
151    Box<dyn for<'a> FnOnce(&'a mut A) -> ActorFut<'a, ()> + Send + 'static>;
152
153/// Box a future yielding `Result<T, E>`. Used by `act!` macro.
154#[doc(hidden)]
155pub fn into_actor_fut_res<'a, Fut, T, E>(fut: Fut) -> ActorFut<'a, Result<T,E>>
156where
157    Fut: Future<Output = Result<T,E>> + Send + 'a,
158    T: Send + 'a,
159{
160    Box::pin(fut)
161}
162
163/// Box a future yielding `T`, wrap as `Ok(T)`. Used by `act_ok!` macro.
164#[doc(hidden)]
165pub fn into_actor_fut_ok<'a, Fut, T, E>(fut: Fut) -> ActorFut<'a, Result<T, E>>
166where
167    Fut: Future<Output = T> + Send + 'a,
168    T: Send + 'a,
169    E: ActorError,
170{
171    return Box::pin(async move { Ok(fut.await) });
172}
173
174/// Create action returning `Result<T, E>`.
175///
176/// # Example
177/// ```rust,ignore
178/// handle.call(act!(actor => async move {
179///     if actor.value < 0 {
180///         Err(io::Error::new(io::ErrorKind::Other, "negative"))
181///     } else {
182///         Ok(actor.value)
183///     }
184/// })).await?
185/// ```
186#[macro_export]
187macro_rules! act {
188    ($actor:ident => $expr:expr) => {{ move |$actor| $crate::into_actor_fut_res(($expr)) }};
189    ($actor:ident => $body:block) => {{ move |$actor| $crate::into_actor_fut_res($body) }};
190}
191
192/// Create action returning `T`, auto-wrapped as `Ok(T)`.
193///
194/// # Example
195/// ```rust,ignore
196/// handle.call(act_ok!(actor => async move {
197///     actor.value += 1;
198///     actor.value
199/// })).await?
200/// ```
201#[macro_export]
202macro_rules! act_ok {
203    ($actor:ident => $expr:expr) => {{ move |$actor| $crate::into_actor_fut_ok(($expr)) }};
204    ($actor:ident => $body:block) => {{ move |$actor| $crate::into_actor_fut_ok($body) }};
205}
206
207/// Async actor trait. Loop forever receiving and executing actions.
208///
209/// # Example
210/// ```rust,ignore
211/// impl Actor<io::Error> for MyActor {
212///     async fn run(&mut self) -> io::Result<()> {
213///         loop {
214///             tokio::select! {
215///                 Ok(action) = self.rx.recv_async() => action(self).await,
216///                 _ = tokio::signal::ctrl_c() => {
217///                     break;
218///                 }
219///             }
220///         }
221///         Err(io::Error::new(io::ErrorKind::Other, "Actor stopped"))
222///     }
223/// }
224/// ```
225#[cfg(any(feature = "tokio", feature = "async-std"))]
226pub trait Actor<E>: Send + 'static {
227    fn run(&mut self) -> impl Future<Output = Result<(),E>> + Send;
228}
229
230/// Blocking actor trait. Loop receiving actions with `recv()` and executing them with `block_on()`.
231///
232/// # Example
233/// ```rust,ignore
234/// impl ActorSync<io::Error> for MyActor {
235///     fn run_blocking(&mut self) -> io::Result<()> {
236///         while let Ok(action) = self.rx.recv() {
237///             block_on(action(self));
238///         }
239///         Err(io::Error::new(io::ErrorKind::Other, "Actor stopped"))
240///     }
241/// }
242/// ```
243pub trait ActorSync<E>: Send + 'static {
244    fn run_blocking(&mut self) -> Result<(), E>;
245}
246
247/// Spawn blocking actor on new thread.
248pub fn spawn_actor_blocking<A, E>(actor: A) -> std::thread::JoinHandle<Result<(), E>>
249where
250    A: ActorSync<E>,
251    E: ActorError,
252{
253    std::thread::spawn(move || {
254        let mut actor = actor;
255        actor.run_blocking()
256    })
257}
258
259/// Spawn async actor on tokio runtime.
260#[cfg(all(feature = "tokio", not(feature = "async-std")))]
261pub fn spawn_actor<A, E>(actor: A) -> tokio::task::JoinHandle<Result<(), E>>
262where
263    A: Actor<E>,
264    E: ActorError,
265{
266    tokio::task::spawn(async move {
267        let mut actor = actor;
268        actor.run().await
269    })
270}
271
272/// Spawn async actor on async-std runtime.
273#[cfg(all(feature = "async-std", not(feature = "tokio")))]
274pub fn spawn_actor<A, E>(actor: A) -> async_std::task::JoinHandle<Result<(), E>>
275where
276    A: Actor<E>,
277    E: ActorError,
278{
279    async_std::task::spawn(async move {
280        let mut actor = actor;
281        actor.run().await
282    })
283}
284
285/// Cloneable handle to send actions to actor `A` with error type `E`.
286///
287/// Thread-safe. Actions run sequentially on the actor.
288#[derive(Debug)]
289pub struct Handle<A, E> 
290where
291    A: Send + 'static,
292    E: ActorError,{
293    tx: Sender<Action<A>>,
294    _phantom: std::marker::PhantomData<E>,
295}
296
297impl<A, E> Clone for Handle<A, E> 
298where
299    A: Send + 'static,
300    E: ActorError,{
301    fn clone(&self) -> Self {
302        Self {
303            tx: self.tx.clone(),
304            _phantom: std::marker::PhantomData,
305        }
306    }
307}
308
309impl<A, E> Handle<A, E>
310where
311    A: Send + 'static,
312    E: ActorError,
313{
314    /// Create handle and receiver.
315    ///
316    /// # Example
317    /// ```rust,ignore
318    /// let (handle, rx) = Handle::<MyActor, io::Error>::channel();
319    /// spawn_actor(MyActor { state: 0, rx });
320    /// ```
321    pub fn channel() -> (Self, Receiver<Action<A>>) {
322        let (tx, rx) = flume::unbounded::<Action<A>>();
323        (Self { tx, _phantom: std::marker::PhantomData }, rx)
324    }
325
326    /// Internal: wraps action with panic catching and result forwarding.
327    fn base_call<R, F>(
328        &self,
329        f: F,
330    ) -> Result<
331        (
332            Receiver<Result<R, E>>,
333            &'static std::panic::Location<'static>,
334        ),
335        E,
336    >
337    where
338        F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
339        R: Send + 'static,
340    {
341        let (rtx, rrx) = flume::unbounded();
342        let loc = std::panic::Location::caller();
343
344        self.tx
345            .send(Box::new(move |actor: &mut A| {
346                Box::pin(async move {
347                    // Execute the action and catch any panics
348                    let panic_result = std::panic::AssertUnwindSafe(async move { f(actor).await })
349                        .catch_unwind()
350                        .await;
351
352                    let res = match panic_result {
353                        Ok(action_result) => action_result,
354                        Err(panic_payload) => {
355                            // Convert panic payload to error message
356                            let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
357                                (*s).to_string()
358                            } else if let Some(s) = panic_payload.downcast_ref::<String>() {
359                                s.clone()
360                            } else {
361                                "unknown panic".to_string()
362                            };
363                            Err(E::from_actor_message(format!(
364                                "panic in actor call at {}:{}: {}",
365                                loc.file(),
366                                loc.line(),
367                                msg
368                            )))
369                        }
370                    };
371
372                    // Send result back to caller (ignore send errors - caller may have dropped)
373                    let _ = rtx.send(res);
374                })
375            }))
376            .map_err(|_| {
377                E::from_actor_message(format!("actor stopped (call send at {}:{})", loc.file(), loc.line()))
378            })?;
379        Ok((rrx, loc))
380    }
381
382    /// Send action, block until complete. Works without async runtime.
383    ///
384    /// # Example
385    /// ```rust,ignore
386    /// handle.call_blocking(act_ok!(actor => async move {
387    ///     actor.value += 1;
388    ///     actor.value
389    /// }))?
390    /// ```
391    pub fn call_blocking<R, F>(&self, f: F) -> Result<R, E>
392    where
393        F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
394        R: Send + 'static,
395    {
396        let (rrx, loc) = self.base_call(f)?;
397        rrx.recv().map_err(|_| {
398            E::from_actor_message(format!("actor stopped (call recv at {}:{})", loc.file(), loc.line()))
399        })?
400    }
401
402    /// Send action, await result. Requires `tokio` or `async-std` feature.
403    ///
404    /// # Example
405    /// ```rust,ignore
406    /// handle.call(act_ok!(actor => async move {
407    ///     actor.value += 1;
408    ///     actor.value
409    /// })).await?
410    /// ```
411    #[cfg(any(feature = "tokio", feature = "async-std"))]
412    pub async fn call<R, F>(&self, f: F) -> Result<R, E>
413    where
414        F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
415        R: Send + 'static,
416    {
417        let (rrx, loc) = self.base_call(f)?;
418        rrx.recv_async().await.map_err(|_| {
419            E::from_actor_message(format!("actor stopped (call recv at {}:{})", loc.file(), loc.line()))
420        })?
421    }
422}