Skip to main content

actor_helper/
lib.rs

1//! Lightweight, runtime-agnostic actor pattern with dynamic error types.
2//!
3//! Works with `tokio`, `async-std`, or blocking threads. Uses [`flume`] channels.
4//!
5//! # Quick Start
6//!
7//! ```rust,ignore
8//! use std::io;
9//! use actor_helper::{Actor, Handle, Receiver, act_ok, spawn_actor};
10//!
11//! // Public API
12//! pub struct Counter {
13//!     handle: Handle<CounterActor, io::Error>,
14//! }
15//!
16//! impl Counter {
17//!     pub fn new() -> Self {
18//!         let (handle, rx) = Handle::channel();
19//!         spawn_actor(CounterActor { value: 0, rx });
20//!         Self { handle }
21//!     }
22//!
23//!     pub async fn increment(&self, by: i32) -> io::Result<()> {
24//!         self.handle.call(act_ok!(actor => async move {
25//!             actor.value += by;
26//!         })).await
27//!     }
28//!
29//!     pub async fn get(&self) -> io::Result<i32> {
30//!         self.handle.call(act_ok!(actor => async move { actor.value })).await
31//!     }
32//! }
33//!
34//! // Private actor
35//! struct CounterActor {
36//!     value: i32,
37//!     rx: Receiver<actor_helper::Action<CounterActor>>,
38//! }
39//!
40//! impl Actor<io::Error> for CounterActor {
41//!     async fn run(&mut self) -> io::Result<()> {
42//!         loop {
43//!             tokio::select! {
44//!                 Ok(action) = self.rx.recv_async() => action(self).await,
45//!                 else => break Ok(()),
46//!             }
47//!         }
48//!     }
49//! }
50//! ```
51//!
52//! # Error Types
53//!
54//! Use any error type implementing [`ActorError`]:
55//! - `io::Error` (default)
56//! - `anyhow::Error` (with `anyhow` feature)
57//! - `String`
58//! - `Box<dyn Error>`
59//!
60//! # Blocking/Sync
61//!
62//! ```rust,ignore
63//! use actor_helper::{ActorSync, block_on};
64//!
65//! impl ActorSync<io::Error> for CounterActor {
66//!     fn run_blocking(&mut self) -> io::Result<()> {
67//!         loop {
68//!             if let Ok(action) = self.rx.recv() {
69//!                 block_on(action(self));
70//!             }
71//!         }
72//!     }
73//! }
74//!
75//! // Use call_blocking instead of call
76//! handle.call_blocking(act_ok!(actor => async move { actor.value }))?;
77//! ```
78//!
79//! # Notes
80//!
81//! - Actions run sequentially, long tasks block the mailbox
82//! - Panics are caught and converted to errors with location info
83//! - `call` requires `tokio` or `async-std` feature
84//! - `call_blocking` has no feature requirements
85use std::{any::Any, boxed, future::Future, io, pin::Pin};
86
87use futures_util::FutureExt;
88
89/// Flume unbounded sender.
90pub type Sender<T> = flume::Sender<T>;
91
92/// Flume unbounded receiver. Actors receive actions via `Receiver<Action<Self>>`.
93///
94/// Use `recv()` for blocking or `recv_async()` for async.
95pub type Receiver<T> = flume::Receiver<T>;
96
97/// Execute async futures in blocking context. Required for `ActorSync`.
98pub use futures_executor::block_on;
99
100/// Convert panic/actor-stop messages into your error type.
101///
102/// Implemented for `io::Error`, `anyhow::Error`, `String`, and `Box<dyn Error>`.
103///
104/// # Example
105/// ```rust,ignore
106/// impl ActorError for MyError {
107///     fn from_actor_message(msg: String) -> Self {
108///         MyError::ActorPanic(msg)
109///     }
110/// }
111/// ```
112pub trait ActorError: Sized + Send + 'static {
113    fn from_actor_message(msg: String) -> Self;
114}
115
116// Implementations for common types
117impl ActorError for io::Error {
118    fn from_actor_message(msg: String) -> Self {
119        io::Error::other(msg)
120    }
121}
122
123#[cfg(feature = "anyhow")]
124impl ActorError for anyhow::Error {
125    fn from_actor_message(msg: String) -> Self {
126        anyhow::anyhow!(msg)
127    }
128}
129
130impl ActorError for String {
131    fn from_actor_message(msg: String) -> Self {
132        msg
133    }
134}
135
136impl ActorError for Box<dyn std::error::Error + Send + Sync> {
137    fn from_actor_message(msg: String) -> Self {
138        Box::new(io::Error::other(msg))
139    }
140}
141
142/// Unboxed future type for actor actions.
143pub type PreBoxActorFut<'a, T> = dyn Future<Output = T> + Send + 'a;
144
145/// Pinned, boxed future used by action helpers and macros.
146pub type ActorFut<'a, T> = Pin<boxed::Box<PreBoxActorFut<'a, T>>>;
147
148/// Action sent to an actor: `FnOnce(&mut A) -> Future<()>`.
149///
150/// Created via `act!` or `act_ok!` macros. Return values flow through oneshot channels.
151pub type Action<A> = Box<dyn for<'a> FnOnce(&'a mut A) -> ActorFut<'a, ()> + Send + 'static>;
152
153/// Internal result type used by `Handle::base_call`.
154type BaseCallResult<R, E> = Result<
155    (
156        Receiver<Result<R, E>>,
157        &'static std::panic::Location<'static>,
158    ),
159    E,
160>;
161
162/// Box a future yielding `Result<T, E>`. Used by `act!` macro.
163#[doc(hidden)]
164pub fn into_actor_fut_res<'a, Fut, T, E>(fut: Fut) -> ActorFut<'a, Result<T, E>>
165where
166    Fut: Future<Output = Result<T, E>> + Send + 'a,
167    T: Send + 'a,
168{
169    Box::pin(fut)
170}
171
172/// Box a future yielding `T`, wrap as `Ok(T)`. Used by `act_ok!` macro.
173#[doc(hidden)]
174pub fn into_actor_fut_ok<'a, Fut, T, E>(fut: Fut) -> ActorFut<'a, Result<T, E>>
175where
176    Fut: Future<Output = T> + Send + 'a,
177    T: Send + 'a,
178    E: ActorError,
179{
180    Box::pin(async move { Ok(fut.await) })
181}
182
183/// Create action returning `Result<T, E>`.
184///
185/// # Example
186/// ```rust,ignore
187/// handle.call(act!(actor => async move {
188///     if actor.value < 0 {
189///         Err(io::Error::new(io::ErrorKind::Other, "negative"))
190///     } else {
191///         Ok(actor.value)
192///     }
193/// })).await?
194/// ```
195#[macro_export]
196macro_rules! act {
197    ($actor:ident => $expr:expr) => {{ move |$actor| $crate::into_actor_fut_res(($expr)) }};
198    ($actor:ident => $body:block) => {{ move |$actor| $crate::into_actor_fut_res($body) }};
199}
200
201/// Create action returning `T`, auto-wrapped as `Ok(T)`.
202///
203/// # Example
204/// ```rust,ignore
205/// handle.call(act_ok!(actor => async move {
206///     actor.value += 1;
207///     actor.value
208/// })).await?
209/// ```
210#[macro_export]
211macro_rules! act_ok {
212    ($actor:ident => $expr:expr) => {{ move |$actor| $crate::into_actor_fut_ok(($expr)) }};
213    ($actor:ident => $body:block) => {{ move |$actor| $crate::into_actor_fut_ok($body) }};
214}
215
216/// Async actor trait. Loop forever receiving and executing actions.
217///
218/// # Example
219/// ```rust,ignore
220/// impl Actor<io::Error> for MyActor {
221///     async fn run(&mut self) -> io::Result<()> {
222///         loop {
223///             tokio::select! {
224///                 Ok(action) = self.rx.recv_async() => action(self).await,
225///                 else => break Ok(()),
226///             }
227///         }
228///         Err(io::Error::new(io::ErrorKind::Other, "Actor stopped"))
229///     }
230/// }
231/// ```
232#[cfg(any(feature = "tokio", feature = "async-std"))]
233pub trait Actor<E>: Send + 'static {
234    fn run(&mut self) -> impl Future<Output = Result<(), E>> + Send;
235}
236
237/// Blocking actor trait. Loop receiving actions with `recv()` and executing them with `block_on()`.
238///
239/// # Example
240/// ```rust,ignore
241/// impl ActorSync<io::Error> for MyActor {
242///     fn run_blocking(&mut self) -> io::Result<()> {
243///         while let Ok(action) = self.rx.recv() {
244///             block_on(action(self));
245///         }
246///         Err(io::Error::new(io::ErrorKind::Other, "Actor stopped"))
247///     }
248/// }
249/// ```
250pub trait ActorSync<E>: Send + 'static {
251    fn run_blocking(&mut self) -> Result<(), E>;
252}
253
254fn panic_payload_message(panic_payload: Box<dyn Any + Send>) -> String {
255    if let Some(s) = panic_payload.downcast_ref::<&str>() {
256        (*s).to_string()
257    } else if let Some(s) = panic_payload.downcast_ref::<String>() {
258        s.clone()
259    } else {
260        "unknown panic".to_string()
261    }
262}
263
264fn actor_loop_panic<E: ActorError>(panic_payload: Box<dyn Any + Send>) -> E {
265    E::from_actor_message(format!(
266        "panic in actor loop: {}",
267        panic_payload_message(panic_payload)
268    ))
269}
270
271/// Spawn blocking actor on new thread.
272pub fn spawn_actor_blocking<A, E>(actor: A) -> std::thread::JoinHandle<Result<(), E>>
273where
274    A: ActorSync<E>,
275    E: ActorError,
276{
277    std::thread::spawn(move || {
278        let mut actor = actor;
279        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| actor.run_blocking())) {
280            Ok(result) => result,
281            Err(panic_payload) => Err(actor_loop_panic(panic_payload)),
282        }
283    })
284}
285
286/// Spawn async actor on tokio runtime.
287#[cfg(all(feature = "tokio", not(feature = "async-std")))]
288pub fn spawn_actor<A, E>(actor: A) -> tokio::task::JoinHandle<Result<(), E>>
289where
290    A: Actor<E>,
291    E: ActorError,
292{
293    tokio::task::spawn(async move {
294        let mut actor = actor;
295        match std::panic::AssertUnwindSafe(async move { actor.run().await })
296            .catch_unwind()
297            .await
298        {
299            Ok(result) => result,
300            Err(panic_payload) => Err(actor_loop_panic(panic_payload)),
301        }
302    })
303}
304
305/// Spawn async actor on async-std runtime.
306#[cfg(all(feature = "async-std", not(feature = "tokio")))]
307pub fn spawn_actor<A, E>(actor: A) -> async_std::task::JoinHandle<Result<(), E>>
308where
309    A: Actor<E>,
310    E: ActorError,
311{
312    async_std::task::spawn(async move {
313        let mut actor = actor;
314        match std::panic::AssertUnwindSafe(async move { actor.run().await })
315            .catch_unwind()
316            .await
317        {
318            Ok(result) => result,
319            Err(panic_payload) => Err(actor_loop_panic(panic_payload)),
320        }
321    })
322}
323
324/// Cloneable handle to send actions to actor `A` with error type `E`.
325///
326/// Thread-safe. Actions run sequentially on the actor.
327#[derive(Debug)]
328pub struct Handle<A, E>
329where
330    A: Send + 'static,
331    E: ActorError,
332{
333    tx: Sender<Action<A>>,
334    _phantom: std::marker::PhantomData<E>,
335}
336
337impl<A, E> Clone for Handle<A, E>
338where
339    A: Send + 'static,
340    E: ActorError,
341{
342    fn clone(&self) -> Self {
343        Self {
344            tx: self.tx.clone(),
345            _phantom: std::marker::PhantomData,
346        }
347    }
348}
349
350impl<A, E> PartialEq for Handle<A, E>
351where
352    A: Send + 'static,
353    E: ActorError,
354{
355    fn eq(&self, other: &Self) -> bool {
356        self.tx.same_channel(&other.tx)
357    }
358}
359
360impl<A, E> Eq for Handle<A, E>
361where
362    A: Send + 'static,
363    E: ActorError,
364{
365}
366
367impl<A, E> Handle<A, E>
368where
369    A: Send + 'static,
370    E: ActorError,
371{
372    /// Create handle and receiver.
373    ///
374    /// # Example
375    /// ```rust,ignore
376    /// let (handle, rx) = Handle::<MyActor, io::Error>::channel();
377    /// spawn_actor(MyActor { state: 0, rx });
378    /// ```
379    pub fn channel() -> (Self, Receiver<Action<A>>) {
380        let (tx, rx) = flume::unbounded::<Action<A>>();
381        (
382            Self {
383                tx,
384                _phantom: std::marker::PhantomData,
385            },
386            rx,
387        )
388    }
389
390    /// Internal: wraps action with panic catching and result forwarding.
391    fn base_call<R, F>(&self, f: F) -> BaseCallResult<R, E>
392    where
393        F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
394        R: Send + 'static,
395    {
396        let (rtx, rrx) = flume::unbounded();
397        let loc = std::panic::Location::caller();
398
399        self.tx
400            .send(Box::new(move |actor: &mut A| {
401                Box::pin(async move {
402                    // Execute the action and catch any panics
403                    let panic_result = std::panic::AssertUnwindSafe(async move { f(actor).await })
404                        .catch_unwind()
405                        .await;
406
407                    let res = match panic_result {
408                        Ok(action_result) => action_result,
409                        Err(panic_payload) => {
410                            // Convert panic payload to error message
411                            let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
412                                (*s).to_string()
413                            } else if let Some(s) = panic_payload.downcast_ref::<String>() {
414                                s.clone()
415                            } else {
416                                "unknown panic".to_string()
417                            };
418                            Err(E::from_actor_message(format!(
419                                "panic in actor call at {}:{}: {}",
420                                loc.file(),
421                                loc.line(),
422                                msg
423                            )))
424                        }
425                    };
426
427                    // Send result back to caller (ignore send errors - caller may have dropped)
428                    let _ = rtx.send(res);
429                })
430            }))
431            .map_err(|_| {
432                E::from_actor_message(format!(
433                    "actor stopped (call send at {}:{})",
434                    loc.file(),
435                    loc.line()
436                ))
437            })?;
438        Ok((rrx, loc))
439    }
440
441    /// Send action, block until complete. Works without async runtime.
442    ///
443    /// # Example
444    /// ```rust,ignore
445    /// handle.call_blocking(act_ok!(actor => async move {
446    ///     actor.value += 1;
447    ///     actor.value
448    /// }))?
449    /// ```
450    pub fn call_blocking<R, F>(&self, f: F) -> Result<R, E>
451    where
452        F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
453        R: Send + 'static,
454    {
455        let (rrx, loc) = self.base_call(f)?;
456        rrx.recv().map_err(|_| {
457            E::from_actor_message(format!(
458                "actor stopped (call recv at {}:{})",
459                loc.file(),
460                loc.line()
461            ))
462        })?
463    }
464
465    /// Send action, await result. Requires `tokio` or `async-std` feature.
466    ///
467    /// # Example
468    /// ```rust,ignore
469    /// handle.call(act_ok!(actor => async move {
470    ///     actor.value += 1;
471    ///     actor.value
472    /// })).await?
473    /// ```
474    #[cfg(any(feature = "tokio", feature = "async-std"))]
475    pub async fn call<R, F>(&self, f: F) -> Result<R, E>
476    where
477        F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
478        R: Send + 'static,
479    {
480        let (rrx, loc) = self.base_call(f)?;
481        rrx.recv_async().await.map_err(|_| {
482            E::from_actor_message(format!(
483                "actor stopped (call recv at {}:{})",
484                loc.file(),
485                loc.line()
486            ))
487        })?
488    }
489}
490
491#[cfg(test)]
492mod tests {
493    use super::Handle;
494
495    #[derive(Debug)]
496    struct TestActor;
497
498    #[test]
499    fn handle_equality_uses_channel_identity() {
500        let (h1, _rx1) = Handle::<TestActor, std::io::Error>::channel();
501        let h2 = h1.clone();
502        let (h3, _rx3) = Handle::<TestActor, std::io::Error>::channel();
503
504        assert_eq!(h1, h2);
505        assert_ne!(h1, h3);
506    }
507}