actor_helper/lib.rs
1//! Lightweight, runtime-agnostic actor pattern with dynamic error types.
2//!
3//! Works with `tokio`, `async-std`, or blocking threads. Uses [`flume`] channels.
4//!
5//! # Quick Start
6//!
7//! ```rust,ignore
8//! use std::io;
9//! use actor_helper::{Actor, Handle, Receiver, act_ok, spawn_actor};
10//!
11//! // Public API
12//! pub struct Counter {
13//! handle: Handle<CounterActor, io::Error>,
14//! }
15//!
16//! impl Counter {
17//! pub fn new() -> Self {
18//! let (handle, rx) = Handle::channel();
19//! spawn_actor(CounterActor { value: 0, rx });
20//! Self { handle }
21//! }
22//!
23//! pub async fn increment(&self, by: i32) -> io::Result<()> {
24//! self.handle.call(act_ok!(actor => async move {
25//! actor.value += by;
26//! })).await
27//! }
28//!
29//! pub async fn get(&self) -> io::Result<i32> {
30//! self.handle.call(act_ok!(actor => async move { actor.value })).await
31//! }
32//! }
33//!
34//! // Private actor
35//! struct CounterActor {
36//! value: i32,
37//! rx: Receiver<actor_helper::Action<CounterActor>>,
38//! }
39//!
40//! impl Actor<io::Error> for CounterActor {
41//! async fn run(&mut self) -> io::Result<()> {
42//! loop {
43//! tokio::select! {
44//! Ok(action) = self.rx.recv_async() => action(self).await,
45//! else => break Ok(()),
46//! }
47//! }
48//! }
49//! }
50//! ```
51//!
52//! # Error Types
53//!
54//! Use any error type implementing [`ActorError`]:
55//! - `io::Error` (default)
56//! - `anyhow::Error` (with `anyhow` feature)
57//! - `String`
//! - `Box<dyn Error + Send + Sync>`
59//!
60//! # Blocking/Sync
61//!
62//! ```rust,ignore
63//! use actor_helper::{ActorSync, block_on};
64//!
65//! impl ActorSync<io::Error> for CounterActor {
66//! fn run_blocking(&mut self) -> io::Result<()> {
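//!         // `recv()` returns `Err` once every `Handle` is dropped, which ends the loop
//!         // (a bare `loop { if let Ok(..) = .. }` would spin forever at that point).
//!         while let Ok(action) = self.rx.recv() {
//!             block_on(action(self));
//!         }
//!         Ok(())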
72//! }
73//! }
74//!
75//! // Use call_blocking instead of call
76//! handle.call_blocking(act_ok!(actor => async move { actor.value }))?;
77//! ```
78//!
79//! # Notes
80//!
//! - Actions run sequentially; a long-running action blocks the mailbox until it finishes
82//! - Panics are caught and converted to errors with location info
83//! - `call` requires `tokio` or `async-std` feature
84//! - `call_blocking` has no feature requirements
85use std::{any::Any, boxed, future::Future, io, pin::Pin};
86
87use futures_util::FutureExt;
88
89/// Flume unbounded sender.
90pub type Sender<T> = flume::Sender<T>;
91
92/// Flume unbounded receiver. Actors receive actions via `Receiver<Action<Self>>`.
93///
94/// Use `recv()` for blocking or `recv_async()` for async.
95pub type Receiver<T> = flume::Receiver<T>;
96
97/// Execute async futures in blocking context. Required for `ActorSync`.
98pub use futures_executor::block_on;
99
/// Bridge from a plain-text failure description to a caller-chosen error type.
///
/// The crate calls [`ActorError::from_actor_message`] when it has to report a failure
/// of its own, such as a caught panic or a closed actor channel.
///
/// Implemented here for `io::Error`, `anyhow::Error` (behind the `anyhow` feature),
/// `String`, and `Box<dyn Error + Send + Sync>`.
///
/// # Example
/// ```rust,ignore
/// impl ActorError for MyError {
///     fn from_actor_message(msg: String) -> Self {
///         MyError::ActorPanic(msg)
///     }
/// }
/// ```
pub trait ActorError: Sized + Send + 'static {
    /// Build the error value that carries `msg`.
    fn from_actor_message(msg: String) -> Self;
}

// `io::Error`: the message becomes an error of kind `Other`.
impl ActorError for io::Error {
    fn from_actor_message(msg: String) -> Self {
        Self::other(msg)
    }
}

// `anyhow::Error`: only compiled when the `anyhow` feature is enabled.
#[cfg(feature = "anyhow")]
impl ActorError for anyhow::Error {
    fn from_actor_message(msg: String) -> Self {
        anyhow::anyhow!(msg)
    }
}

// `String`: the message is the error.
impl ActorError for String {
    fn from_actor_message(msg: String) -> Self {
        msg
    }
}

// Boxed trait object: the message is wrapped in an `io::Error` before boxing.
impl ActorError for Box<dyn std::error::Error + Send + Sync> {
    fn from_actor_message(msg: String) -> Self {
        io::Error::other(msg).into()
    }
}
141
/// The unsized future behind [`ActorFut`]: any `Send` future with output `T` that
/// lives for `'a`.
pub type PreBoxActorFut<'a, T> = dyn Future<Output = T> + Send + 'a;

/// Heap-allocated, pinned [`PreBoxActorFut`]; the return type of the action helpers
/// and of the closures built by the macros.
pub type ActorFut<'a, T> = Pin<boxed::Box<PreBoxActorFut<'a, T>>>;

/// A unit of work for actor `A`: a one-shot `Send` closure that borrows the actor
/// mutably and returns a future resolving to `()`.
///
/// `Handle` wraps the closures produced by `act!` / `act_ok!` into this shape; their
/// return values travel back to the caller over a separate per-call channel.
pub type Action<A> = Box<dyn for<'a> FnOnce(&'a mut A) -> ActorFut<'a, ()> + Send + 'static>;
152
153/// Internal result type used by `Handle::base_call`.
154type BaseCallResult<R, E> = Result<
155 (
156 Receiver<Result<R, E>>,
157 &'static std::panic::Location<'static>,
158 ),
159 E,
160>;
161
162/// Box a future yielding `Result<T, E>`. Used by `act!` macro.
163#[doc(hidden)]
164pub fn into_actor_fut_res<'a, Fut, T, E>(fut: Fut) -> ActorFut<'a, Result<T, E>>
165where
166 Fut: Future<Output = Result<T, E>> + Send + 'a,
167 T: Send + 'a,
168{
169 Box::pin(fut)
170}
171
172/// Box a future yielding `T`, wrap as `Ok(T)`. Used by `act_ok!` macro.
173#[doc(hidden)]
174pub fn into_actor_fut_ok<'a, Fut, T, E>(fut: Fut) -> ActorFut<'a, Result<T, E>>
175where
176 Fut: Future<Output = T> + Send + 'a,
177 T: Send + 'a,
178 E: ActorError,
179{
180 Box::pin(async move { Ok(fut.await) })
181}
182
/// Build an action closure whose async body returns `Result<T, E>`.
///
/// `act!(actor => body)` expands to `move |actor| $crate::into_actor_fut_res(body)`,
/// so `body` must evaluate to a future whose output is a `Result`.
///
/// # Example
/// ```rust,ignore
/// handle.call(act!(actor => async move {
///     if actor.value < 0 {
///         Err(io::Error::other("negative"))
///     } else {
///         Ok(actor.value)
///     }
/// })).await?
/// ```
#[macro_export]
macro_rules! act {
    // A block is itself an expression, so this single arm accepts both
    // `act!(a => async move { .. })` and `act!(a => { .. })`. A separate
    // `$body:block` arm placed after it could never be reached.
    ($actor:ident => $expr:expr) => {{ move |$actor| $crate::into_actor_fut_res($expr) }};
}
200
/// Build an action closure whose async body returns a plain `T`, reported to the
/// caller as `Ok(T)`.
///
/// `act_ok!(actor => body)` expands to `move |actor| $crate::into_actor_fut_ok(body)`,
/// so `body` must evaluate to a future.
///
/// # Example
/// ```rust,ignore
/// handle.call(act_ok!(actor => async move {
///     actor.value += 1;
///     actor.value
/// })).await?
/// ```
#[macro_export]
macro_rules! act_ok {
    // A block is itself an expression, so this single arm accepts both
    // `act_ok!(a => async move { .. })` and `act_ok!(a => { .. })`. A separate
    // `$body:block` arm placed after it could never be reached.
    ($actor:ident => $expr:expr) => {{ move |$actor| $crate::into_actor_fut_ok($expr) }};
}
215
/// Async actor trait: the implementor owns its state and processes actions inside
/// [`Actor::run`].
///
/// A typical `run` loops on `recv_async()`, executes each received action against
/// `self`, and returns once the channel reports that it is closed.
///
/// Only compiled when the `tokio` or `async-std` feature is enabled.
///
/// # Example
/// ```rust,ignore
/// impl Actor<io::Error> for MyActor {
///     async fn run(&mut self) -> io::Result<()> {
///         loop {
///             tokio::select! {
///                 Ok(action) = self.rx.recv_async() => action(self).await,
///                 // Every handle is gone: leave the loop with a clean result.
///                 else => break Ok(()),
///             }
///         }
///     }
/// }
/// ```
#[cfg(any(feature = "tokio", feature = "async-std"))]
pub trait Actor<E>: Send + 'static {
    /// Drive the actor until it stops, resolving to `Ok(())` or to the error that
    /// ended it.
    fn run(&mut self) -> impl Future<Output = Result<(), E>> + Send;
}
236
/// Blocking counterpart of the async actor trait, for actors that run on a plain thread.
///
/// A typical `run_blocking` pulls actions with `recv()` and drives each returned
/// future to completion with `block_on()`.
///
/// # Example
/// ```rust,ignore
/// impl ActorSync<io::Error> for MyActor {
///     fn run_blocking(&mut self) -> io::Result<()> {
///         while let Ok(action) = self.rx.recv() {
///             block_on(action(self));
///         }
///         Err(io::Error::new(io::ErrorKind::Other, "Actor stopped"))
///     }
/// }
/// ```
pub trait ActorSync<E>: Send + 'static {
    /// Run the actor on the current thread until it stops, returning `Ok(())` or the
    /// error that ended it.
    fn run_blocking(&mut self) -> Result<(), E>;
}
253
/// Extract a human-readable message from a caught panic payload.
///
/// Payloads that are a `&str` or a `String` are returned as text; any other payload
/// type yields the fixed text `"unknown panic"`.
fn panic_payload_message(panic_payload: Box<dyn Any + Send>) -> String {
    panic_payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| panic_payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic".to_string())
}
263
264fn actor_loop_panic<E: ActorError>(panic_payload: Box<dyn Any + Send>) -> E {
265 E::from_actor_message(format!(
266 "panic in actor loop: {}",
267 panic_payload_message(panic_payload)
268 ))
269}
270
271/// Spawn blocking actor on new thread.
272pub fn spawn_actor_blocking<A, E>(actor: A) -> std::thread::JoinHandle<Result<(), E>>
273where
274 A: ActorSync<E>,
275 E: ActorError,
276{
277 std::thread::spawn(move || {
278 let mut actor = actor;
279 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| actor.run_blocking())) {
280 Ok(result) => result,
281 Err(panic_payload) => Err(actor_loop_panic(panic_payload)),
282 }
283 })
284}
285
/// Spawn an async actor as a tokio task.
///
/// The task resolves to whatever `run` returned; a panic inside `run` is caught and
/// reported as `Err(E)`. Compiled only when `tokio` is enabled and `async-std` is not.
#[cfg(all(feature = "tokio", not(feature = "async-std")))]
pub fn spawn_actor<A, E>(actor: A) -> tokio::task::JoinHandle<Result<(), E>>
where
    A: Actor<E>,
    E: ActorError,
{
    tokio::task::spawn(async move {
        let mut actor = actor;
        // The inner future takes ownership of the actor and is polled under `catch_unwind`.
        let guarded_run = std::panic::AssertUnwindSafe(async move { actor.run().await });
        guarded_run
            .catch_unwind()
            .await
            .unwrap_or_else(|panic_payload| Err(actor_loop_panic(panic_payload)))
    })
}
304
/// Spawn an async actor as an async-std task.
///
/// The task resolves to whatever `run` returned; a panic inside `run` is caught and
/// reported as `Err(E)`. Compiled only when `async-std` is enabled and `tokio` is not.
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
pub fn spawn_actor<A, E>(actor: A) -> async_std::task::JoinHandle<Result<(), E>>
where
    A: Actor<E>,
    E: ActorError,
{
    async_std::task::spawn(async move {
        let mut actor = actor;
        // The inner future takes ownership of the actor and is polled under `catch_unwind`.
        let guarded_run = std::panic::AssertUnwindSafe(async move { actor.run().await });
        guarded_run
            .catch_unwind()
            .await
            .unwrap_or_else(|panic_payload| Err(actor_loop_panic(panic_payload)))
    })
}
323
324/// Cloneable handle to send actions to actor `A` with error type `E`.
325///
326/// Thread-safe. Actions run sequentially on the actor.
327#[derive(Debug)]
328pub struct Handle<A, E>
329where
330 A: Send + 'static,
331 E: ActorError,
332{
333 tx: Sender<Action<A>>,
334 _phantom: std::marker::PhantomData<E>,
335}
336
337impl<A, E> Clone for Handle<A, E>
338where
339 A: Send + 'static,
340 E: ActorError,
341{
342 fn clone(&self) -> Self {
343 Self {
344 tx: self.tx.clone(),
345 _phantom: std::marker::PhantomData,
346 }
347 }
348}
349
350impl<A, E> PartialEq for Handle<A, E>
351where
352 A: Send + 'static,
353 E: ActorError,
354{
355 fn eq(&self, other: &Self) -> bool {
356 self.tx.same_channel(&other.tx)
357 }
358}
359
360impl<A, E> Eq for Handle<A, E>
361where
362 A: Send + 'static,
363 E: ActorError,
364{
365}
366
367impl<A, E> Handle<A, E>
368where
369 A: Send + 'static,
370 E: ActorError,
371{
372 /// Create handle and receiver.
373 ///
374 /// # Example
375 /// ```rust,ignore
376 /// let (handle, rx) = Handle::<MyActor, io::Error>::channel();
377 /// spawn_actor(MyActor { state: 0, rx });
378 /// ```
379 pub fn channel() -> (Self, Receiver<Action<A>>) {
380 let (tx, rx) = flume::unbounded::<Action<A>>();
381 (
382 Self {
383 tx,
384 _phantom: std::marker::PhantomData,
385 },
386 rx,
387 )
388 }
389
390 /// Internal: wraps action with panic catching and result forwarding.
391 fn base_call<R, F>(&self, f: F) -> BaseCallResult<R, E>
392 where
393 F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
394 R: Send + 'static,
395 {
396 let (rtx, rrx) = flume::unbounded();
397 let loc = std::panic::Location::caller();
398
399 self.tx
400 .send(Box::new(move |actor: &mut A| {
401 Box::pin(async move {
402 // Execute the action and catch any panics
403 let panic_result = std::panic::AssertUnwindSafe(async move { f(actor).await })
404 .catch_unwind()
405 .await;
406
407 let res = match panic_result {
408 Ok(action_result) => action_result,
409 Err(panic_payload) => {
410 // Convert panic payload to error message
411 let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
412 (*s).to_string()
413 } else if let Some(s) = panic_payload.downcast_ref::<String>() {
414 s.clone()
415 } else {
416 "unknown panic".to_string()
417 };
418 Err(E::from_actor_message(format!(
419 "panic in actor call at {}:{}: {}",
420 loc.file(),
421 loc.line(),
422 msg
423 )))
424 }
425 };
426
427 // Send result back to caller (ignore send errors - caller may have dropped)
428 let _ = rtx.send(res);
429 })
430 }))
431 .map_err(|_| {
432 E::from_actor_message(format!(
433 "actor stopped (call send at {}:{})",
434 loc.file(),
435 loc.line()
436 ))
437 })?;
438 Ok((rrx, loc))
439 }
440
441 /// Send action, block until complete. Works without async runtime.
442 ///
443 /// # Example
444 /// ```rust,ignore
445 /// handle.call_blocking(act_ok!(actor => async move {
446 /// actor.value += 1;
447 /// actor.value
448 /// }))?
449 /// ```
450 pub fn call_blocking<R, F>(&self, f: F) -> Result<R, E>
451 where
452 F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
453 R: Send + 'static,
454 {
455 let (rrx, loc) = self.base_call(f)?;
456 rrx.recv().map_err(|_| {
457 E::from_actor_message(format!(
458 "actor stopped (call recv at {}:{})",
459 loc.file(),
460 loc.line()
461 ))
462 })?
463 }
464
465 /// Send action, await result. Requires `tokio` or `async-std` feature.
466 ///
467 /// # Example
468 /// ```rust,ignore
469 /// handle.call(act_ok!(actor => async move {
470 /// actor.value += 1;
471 /// actor.value
472 /// })).await?
473 /// ```
474 #[cfg(any(feature = "tokio", feature = "async-std"))]
475 pub async fn call<R, F>(&self, f: F) -> Result<R, E>
476 where
477 F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, Result<R, E>> + Send + 'static,
478 R: Send + 'static,
479 {
480 let (rrx, loc) = self.base_call(f)?;
481 rrx.recv_async().await.map_err(|_| {
482 E::from_actor_message(format!(
483 "actor stopped (call recv at {}:{})",
484 loc.file(),
485 loc.line()
486 ))
487 })?
488 }
489}
490
#[cfg(test)]
mod tests {
    use super::Handle;

    /// Stand-in actor type; it is only used as a type parameter.
    #[derive(Debug)]
    struct TestActor;

    type TestHandle = Handle<TestActor, std::io::Error>;

    /// A clone compares equal to its source; handles from separate `channel()` calls
    /// compare unequal.
    #[test]
    fn handle_equality_uses_channel_identity() {
        let (original, _original_rx) = TestHandle::channel();
        let (unrelated, _unrelated_rx) = TestHandle::channel();
        let duplicate = original.clone();

        assert_eq!(original, duplicate);
        assert_ne!(original, unrelated);
    }
}