actor_helper/lib.rs
1//! Lightweight, runtime-agnostic actor pattern with dynamic error types.
2//!
3//! Works with `tokio`, `async-std`, or blocking threads. Uses [`flume`] channels.
4//!
5//! # Quick Start
6//!
//! ```rust,ignore
//! use std::io;
//! use actor_helper::{Actor, Handle, Receiver, act_ok, spawn_actor};
//!
//! // Public API
//! pub struct Counter {
//!     handle: Handle<CounterActor, io::Error>,
//! }
//!
//! impl Counter {
//!     pub fn new() -> Self {
//!         let (handle, rx) = Handle::channel();
//!         spawn_actor(CounterActor { value: 0, rx });
//!         Self { handle }
//!     }
//!
//!     pub async fn increment(&self, by: i32) -> io::Result<()> {
//!         self.handle.call(act_ok!(actor => async move {
//!             actor.value += by;
//!         })).await
//!     }
//!
//!     pub async fn get(&self) -> io::Result<i32> {
//!         self.handle.call(act_ok!(actor => async move { actor.value })).await
//!     }
//! }
//!
//! // Private actor
//! struct CounterActor {
//!     value: i32,
//!     rx: Receiver<actor_helper::Action<CounterActor>>,
//! }
//!
//! impl Actor<io::Error> for CounterActor {
//!     async fn run(&mut self) -> io::Result<()> {
//!         loop {
//!             tokio::select! {
//!                 Ok(action) = self.rx.recv_async() => action(self).await,
//!                 // Every handle was dropped: stop instead of letting `select!`
//!                 // panic because no branch is left enabled.
//!                 else => break,
//!             }
//!         }
//!         Err(io::Error::new(io::ErrorKind::Other, "Actor stopped"))
//!     }
//! }
//! ```
50//!
51//! # Error Types
52//!
53//! Use any error type implementing [`ActorError`]:
54//! - `io::Error` (default)
55//! - `anyhow::Error` (with `anyhow` feature)
56//! - `String`
//! - `Box<dyn Error + Send + Sync>`
58//!
59//! # Blocking/Sync
60//!
//! ```rust,ignore
//! use actor_helper::{ActorSync, block_on};
//!
//! impl ActorSync<io::Error> for CounterActor {
//!     fn run_blocking(&mut self) -> io::Result<()> {
//!         // `recv()` returns `Err` once every handle is dropped; leave the loop
//!         // then instead of spinning on a disconnected channel.
//!         while let Ok(action) = self.rx.recv() {
//!             block_on(action(self));
//!         }
//!         Err(io::Error::new(io::ErrorKind::Other, "Actor stopped"))
//!     }
//! }
//!
//! // Use call_blocking instead of call
//! handle.call_blocking(act_ok!(actor => async move { actor.value }))?;
//! ```
77//!
78//! # Notes
79//!
//! - Actions run sequentially; a long-running action blocks the mailbox until it finishes
//! - Panics inside an action are caught and converted to errors with location info
//! - `call` requires the `tokio` or `async-std` feature
//! - `call_blocking` has no feature requirements
84use std::{boxed, future::Future, io, pin::Pin};
85
86use futures_util::FutureExt;
87
88/// Flume unbounded sender.
89pub type Sender<T> = flume::Sender<T>;
90
91/// Flume unbounded receiver. Actors receive actions via `Receiver<Action<Self>>`.
92///
93/// Use `recv()` for blocking or `recv_async()` for async.
94pub type Receiver<T> = flume::Receiver<T>;
95
96/// Execute async futures in blocking context. Required for `ActorSync`.
97pub use futures_executor::block_on;
98
/// Error types that can be built from a plain actor-generated message.
///
/// The crate produces such messages itself when an action panics or when the
/// actor's channel is closed, and needs a way to turn them into the caller's
/// error type.
///
/// Implemented for `io::Error`, `anyhow::Error` (with the `anyhow` feature),
/// `String`, and `Box<dyn Error + Send + Sync>`.
///
/// # Example
/// ```rust,ignore
/// impl ActorError for MyError {
///     fn from_actor_message(msg: String) -> Self {
///         MyError::ActorPanic(msg)
///     }
/// }
/// ```
pub trait ActorError: Sized + Send + 'static {
    /// Build an error value that carries `msg`.
    fn from_actor_message(msg: String) -> Self;
}
114
115// Implementations for common types
116impl ActorError for io::Error {
117 fn from_actor_message(msg: String) -> Self {
118 io::Error::new(io::ErrorKind::Other, msg)
119 }
120}
121
#[cfg(feature = "anyhow")]
impl ActorError for anyhow::Error {
    /// Turn the actor message into an ad-hoc `anyhow::Error`.
    fn from_actor_message(message: String) -> Self {
        anyhow::anyhow!(message)
    }
}
128
129impl ActorError for String {
130 fn from_actor_message(msg: String) -> Self {
131 msg
132 }
133}
134
135impl ActorError for Box<dyn std::error::Error + Send + Sync> {
136 fn from_actor_message(msg: String) -> Self {
137 Box::new(io::Error::new(io::ErrorKind::Other, msg))
138 }
139}
140
/// The unsized trait-object type behind [`ActorFut`]: a `Send` future that
/// yields `T` and may borrow data for `'a`.
pub type PreBoxActorFut<'a, T> = dyn Future<Output = T> + Send + 'a;
143
144/// Pinned, boxed future used by action helpers and macros.
145pub type ActorFut<'a, T> = Pin<boxed::Box<PreBoxActorFut<'a, T>>>;
146
147/// Action sent to an actor: `FnOnce(&mut A) -> Future<()>`.
148///
149/// Created via `act!` or `act_ok!` macros. Return values flow through oneshot channels.
150pub type Action<A> =
151 Box<dyn for<'a> FnOnce(&'a mut A) -> ActorFut<'a, ()> + Send + 'static>;
152
153/// Box a future yielding `Result<T, E>`. Used by `act!` macro.
154#[doc(hidden)]
155pub fn into_actor_fut_res<'a, Fut, T, E>(fut: Fut) -> ActorFut<'a, Result<T,E>>
156where
157 Fut: Future<Output = Result<T,E>> + Send + 'a,
158 T: Send + 'a,
159{
160 Box::pin(fut)
161}
162
163/// Box a future yielding `T`, wrap as `Ok(T)`. Used by `act_ok!` macro.
164#[doc(hidden)]
165pub fn into_actor_fut_ok<'a, Fut, T, E>(fut: Fut) -> ActorFut<'a, Result<T, E>>
166where
167 Fut: Future<Output = T> + Send + 'a,
168 T: Send + 'a,
169 E: ActorError,
170{
171 return Box::pin(async move { Ok(fut.await) });
172}
173
/// Create an action whose future resolves to `Result<T, E>`.
///
/// `act!(actor => expr)` expands to a `move` closure that takes `actor` and passes
/// `expr` (normally an `async move { ... }` block) to `into_actor_fut_res`.
/// A block is an expression too, so `act!(actor => { ... })` is accepted by the
/// same rule.
///
/// # Example
/// ```rust,ignore
/// handle.call(act!(actor => async move {
///     if actor.value < 0 {
///         Err(io::Error::new(io::ErrorKind::Other, "negative"))
///     } else {
///         Ok(actor.value)
///     }
/// })).await?
/// ```
#[macro_export]
macro_rules! act {
    ($actor:ident => $expr:expr) => {{ move |$actor| $crate::into_actor_fut_res($expr) }};
}
191
/// Create an action whose future resolves to a plain `T`; the value is wrapped
/// as `Ok(T)` automatically.
///
/// `act_ok!(actor => expr)` expands to a `move` closure that takes `actor` and
/// passes `expr` (normally an `async move { ... }` block) to `into_actor_fut_ok`.
/// A block is an expression too, so `act_ok!(actor => { ... })` is accepted by
/// the same rule.
///
/// # Example
/// ```rust,ignore
/// handle.call(act_ok!(actor => async move {
///     actor.value += 1;
///     actor.value
/// })).await?
/// ```
#[macro_export]
macro_rules! act_ok {
    ($actor:ident => $expr:expr) => {{ move |$actor| $crate::into_actor_fut_ok($expr) }};
}
206
/// Async actor: `run` is the actor's main loop, typically receiving actions from
/// its `Receiver` and awaiting each one in turn.
///
/// Only available with the `tokio` or `async-std` feature.
///
/// # Example
/// ```rust,ignore
/// impl Actor<io::Error> for MyActor {
///     async fn run(&mut self) -> io::Result<()> {
///         loop {
///             tokio::select! {
///                 Ok(action) = self.rx.recv_async() => action(self).await,
///                 _ = tokio::signal::ctrl_c() => {
///                     break;
///                 }
///             }
///         }
///         Err(io::Error::new(io::ErrorKind::Other, "Actor stopped"))
///     }
/// }
/// ```
#[cfg(any(feature = "tokio", feature = "async-std"))]
pub trait Actor<E>: Send + 'static {
    /// Drive the actor until it stops; the returned future must be `Send`.
    fn run(&mut self) -> impl Future<Output = Result<(), E>> + Send;
}
229
/// Blocking actor: `run_blocking` is the actor's main loop, typically pulling
/// actions with `recv()` and running each one to completion with `block_on()`.
///
/// # Example
/// ```rust,ignore
/// impl ActorSync<io::Error> for MyActor {
///     fn run_blocking(&mut self) -> io::Result<()> {
///         while let Ok(action) = self.rx.recv() {
///             block_on(action(self));
///         }
///         Err(io::Error::new(io::ErrorKind::Other, "Actor stopped"))
///     }
/// }
/// ```
pub trait ActorSync<E>: Send + 'static {
    /// Run the actor on the current thread until it stops.
    fn run_blocking(&mut self) -> Result<(), E>;
}
246
247/// Spawn blocking actor on new thread.
248pub fn spawn_actor_blocking<A, E>(actor: A) -> std::thread::JoinHandle<Result<(), E>>
249where
250 A: ActorSync<E>,
251 E: ActorError,
252{
253 std::thread::spawn(move || {
254 let mut actor = actor;
255 actor.run_blocking()
256 })
257}
258
/// Spawn `actor` as a task on the tokio runtime and drive its `run` loop.
///
/// Compiled only when the `tokio` feature is enabled and `async-std` is not.
/// The returned `JoinHandle` resolves to whatever `run` returned.
#[cfg(all(feature = "tokio", not(feature = "async-std")))]
pub fn spawn_actor<A, E>(actor: A) -> tokio::task::JoinHandle<Result<(), E>>
where
    A: Actor<E>,
    E: ActorError,
{
    let mut actor = actor;
    tokio::task::spawn(async move { actor.run().await })
}
271
/// Spawn `actor` as a task on the async-std runtime and drive its `run` loop.
///
/// Compiled only when the `async-std` feature is enabled and `tokio` is not.
/// The returned `JoinHandle` resolves to whatever `run` returned.
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
pub fn spawn_actor<A, E>(actor: A) -> async_std::task::JoinHandle<Result<(), E>>
where
    A: Actor<E>,
    E: ActorError,
{
    let mut actor = actor;
    async_std::task::spawn(async move { actor.run().await })
}
284
285/// Cloneable handle to send actions to actor `A` with error type `E`.
286///
287/// Thread-safe. Actions run sequentially on the actor.
288#[derive(Debug)]
289pub struct Handle<A, E>
290where
291 A: Send + 'static,
292 E: ActorError,{
293 tx: Sender<Action<A>>,
294 _phantom: std::marker::PhantomData<E>,
295}
296
297impl<A, E> Clone for Handle<A, E>
298where
299 A: Send + 'static,
300 E: ActorError,{
301 fn clone(&self) -> Self {
302 Self {
303 tx: self.tx.clone(),
304 _phantom: std::marker::PhantomData,
305 }
306 }
307}
308
309impl<A, E> Handle<A, E>
310where
311 A: Send + 'static,
312 E: ActorError,
313{
314 /// Create handle and receiver.
315 ///
316 /// # Example
317 /// ```rust,ignore
318 /// let (handle, rx) = Handle::<MyActor, io::Error>::channel();
319 /// spawn_actor(MyActor { state: 0, rx });
320 /// ```
321 pub fn channel() -> (Self, Receiver<Action<A>>) {
322 let (tx, rx) = flume::unbounded::<Action<A>>();
323 (Self { tx, _phantom: std::marker::PhantomData }, rx)
324 }
325
326 /// Internal: wraps action with panic catching and result forwarding.
327 fn base_call<R, F>(
328 &self,
329 f: F,
330 ) -> Result<
331 (
332 Receiver<Result<R, E>>,
333 &'static std::panic::Location<'static>,
334 ),
335 E,
336 >
337 where
338 F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
339 R: Send + 'static,
340 {
341 let (rtx, rrx) = flume::unbounded();
342 let loc = std::panic::Location::caller();
343
344 self.tx
345 .send(Box::new(move |actor: &mut A| {
346 Box::pin(async move {
347 // Execute the action and catch any panics
348 let panic_result = std::panic::AssertUnwindSafe(async move { f(actor).await })
349 .catch_unwind()
350 .await;
351
352 let res = match panic_result {
353 Ok(action_result) => action_result,
354 Err(panic_payload) => {
355 // Convert panic payload to error message
356 let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
357 (*s).to_string()
358 } else if let Some(s) = panic_payload.downcast_ref::<String>() {
359 s.clone()
360 } else {
361 "unknown panic".to_string()
362 };
363 Err(E::from_actor_message(format!(
364 "panic in actor call at {}:{}: {}",
365 loc.file(),
366 loc.line(),
367 msg
368 )))
369 }
370 };
371
372 // Send result back to caller (ignore send errors - caller may have dropped)
373 let _ = rtx.send(res);
374 })
375 }))
376 .map_err(|_| {
377 E::from_actor_message(format!("actor stopped (call send at {}:{})", loc.file(), loc.line()))
378 })?;
379 Ok((rrx, loc))
380 }
381
382 /// Send action, block until complete. Works without async runtime.
383 ///
384 /// # Example
385 /// ```rust,ignore
386 /// handle.call_blocking(act_ok!(actor => async move {
387 /// actor.value += 1;
388 /// actor.value
389 /// }))?
390 /// ```
391 pub fn call_blocking<R, F>(&self, f: F) -> Result<R, E>
392 where
393 F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
394 R: Send + 'static,
395 {
396 let (rrx, loc) = self.base_call(f)?;
397 rrx.recv().map_err(|_| {
398 E::from_actor_message(format!("actor stopped (call recv at {}:{})", loc.file(), loc.line()))
399 })?
400 }
401
402 /// Send action, await result. Requires `tokio` or `async-std` feature.
403 ///
404 /// # Example
405 /// ```rust,ignore
406 /// handle.call(act_ok!(actor => async move {
407 /// actor.value += 1;
408 /// actor.value
409 /// })).await?
410 /// ```
411 #[cfg(any(feature = "tokio", feature = "async-std"))]
412 pub async fn call<R, F>(&self, f: F) -> Result<R, E>
413 where
414 F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
415 R: Send + 'static,
416 {
417 let (rrx, loc) = self.base_call(f)?;
418 rrx.recv_async().await.map_err(|_| {
419 E::from_actor_message(format!("actor stopped (call recv at {}:{})", loc.file(), loc.line()))
420 })?
421 }
422}