actor_helper/
lib.rs

1
2//! Minimal, self-contained actor runtime used in this crate.
3//!
4//! This module provides a tiny, opinionated actor pattern built on top of
5//! `tokio`, with:
6//!
7//! - A per-actor mailbox (`mpsc::Receiver<Action<A>>`) of boxed async actions.
8//! - A `Handle<A>` you can clone and send across threads to schedule work on the
9//!   actor's single-threaded mutable state.
10//! - Ergonomic macros (`act!`, `act_ok!`) to write actor actions inline.
11//! - Panic capturing and error propagation to callers via `anyhow::Result`.
12//!
13//! The pattern encourages a public API type ("Object") that holds a
14//! `Handle<ObjectActor>` and a private type ("ObjectActor") that owns the
15//! mutable state and the mailbox `Receiver`. The actor implements `Actor` for a
16//! simple `run` loop and is spawned with `tokio::spawn`.
17//!
18//! Example: a simple counter
19//!
20//! ```rust,ignore
21//! use anyhow::{anyhow, Result};
22//! use iroh_lan::actor::{Actor, Handle, Action};
23//! use tokio::sync::mpsc;
24//!
25//! // Public API type (exposed from your module)
26//! pub struct Counter {
27//!     api: Handle<CounterActor>,
28//! }
29//!
30//! impl Counter {
31//!     pub fn new() -> Self {
32//!         // 1) Create channel and api handle
33//!         let (api, rx) = Handle::channel(128);
34//!
35//!         // 2) Create the actor with private state and the mailbox
36//!         let actor = CounterActor { value: 0, rx };
37//!
38//!         // 3) Spawn the run loop
39//!         tokio::spawn(async move {
40//!             let mut actor = actor;
41//!             let _ = actor.run().await;
42//!         });
43//!
44//!         Self { api }
45//!     }
46//!
47//!     // Mutating API method
48//!     pub async fn inc(&self, by: i32) -> Result<()> {
49//!         self.api
50//!             // act_ok! wraps the returned value into Ok(..)
51//!             .call(act_ok!(actor => async move {
52//!                 actor.value += by;
53//!             }))
54//!             .await
55//!     }
56//!
57//!     // Query method returning a value
58//!     pub async fn get(&self) -> Result<i32> {
59//!         self.api
60//!             .call(act_ok!(actor => actor.value))
61//!             .await
62//!     }
63//!
64//!     // An example that can fail
65//!     pub async fn set_non_negative(&self, v: i32) -> Result<()> {
66//!         self.api
67//!             .call(act!(actor => async move {
68//!                 if v < 0 {
69//!                     Err(anyhow!("negative value"))
70//!                 } else {
71//!                     actor.value = v;
72//!                     Ok(())
73//!                 }
74//!             }))
75//!             .await
76//!     }
77//! }
78//!
79//! // Private actor with its state and mailbox
80//! struct CounterActor {
81//!     value: i32,
82//!     rx: mpsc::Receiver<Action<CounterActor>>,
83//! }
84//!
85//! impl Actor for CounterActor {
86//!     async fn run(&mut self) -> Result<()> {
87//!         async move {
88//!             loop {
89//!                 tokio::select! {
90//!                     Some(action) = self.rx.recv() => {
91//!                         action(self).await;
92//!                     }
93//!                     Some(your_bytes) = your_reader.recv() => {
94//!                         // do other background work if needed
95//!                     }
96//!                     // ... other background work ...
97//!                 }
98//!             }
99//!             Ok(())
100//!         }
101//!     }
102//! }
103//!
104//! # async fn _example_usage() -> Result<()> {
105//! #   let c = Counter::new();
106//! #   c.inc(5).await?;
107//! #   assert_eq!(c.get().await?, 5);
108//! #   Ok(())
109//! # }
110//! ```
111//!
112//! Notes
113//! - `Handle::call` captures the call site location and wraps any panic from the
114//!   actor action into an `anyhow::Error` delivered to the caller.
115//! - Actions must complete reasonably promptly; the actor processes actions
116//!   sequentially and a long-running action blocks the mailbox.
117//! - Do not hold references across `.await` inside actions; prefer moving values
118//!   or cloning as needed.
119mod tests;
120
121use std::{boxed, future::Future, pin::Pin};
122
123use anyhow::anyhow;
124use futures::FutureExt;
125use tokio::sync::{mpsc, oneshot};
126
127/// The unboxed future type used for actor actions.
128///
129/// This is a convenience type alias for a `Send` future that returns `T`.
130/// Prefer using [`ActorFut`] which wraps this in a pinned `Box`.
131pub type PreBoxActorFut<'a, T> = dyn Future<Output = T> + Send + 'a;
132
133/// The boxed future type returned by actor actions.
134///
135/// Most APIs take or return `ActorFut<'a, T>` as the standard boxed future
136/// used to execute actor code on the actor thread.
137pub type ActorFut<'a, T> = Pin<boxed::Box<PreBoxActorFut<'a, T>>>;
138
139/// An action scheduled onto an actor.
140///
141/// This is a boxed closure that receives a mutable reference to the actor
142/// instance and returns a boxed future to execute. The future typically returns
143/// `anyhow::Result<T>`, but `T` is generic here to support helper adapters.
144pub type Action<A> = Box<dyn for<'a> FnOnce(&'a mut A) -> ActorFut<'a, ()> + Send + 'static>;
145
146/// Convert a future yielding `anyhow::Result<T>` into the standard boxed
147/// [`ActorFut`].
148///
149/// Use this when your actor code already returns `Result<T, anyhow::Error>`.
150///
151/// See also: [`into_actor_fut_ok`], [`into_actor_fut_unit_ok`].
152pub fn into_actor_fut_res<'a, Fut, T>(
153    fut: Fut,
154) -> ActorFut<'a, anyhow::Result<T>>
155where
156    Fut: Future<Output = anyhow::Result<T>> + Send + 'a,
157    T: Send + 'a,
158{
159    Box::pin(fut)
160}
161
162/// Convert a future yielding `T` into a boxed future yielding `anyhow::Result<T>`.
163///
164/// This wraps the output in `Ok(T)` for convenience.
165pub fn into_actor_fut_ok<'a, Fut, T>(
166    fut: Fut,
167) -> ActorFut<'a, anyhow::Result<T>>
168where
169    Fut: Future<Output = T> + Send + 'a,
170    T: Send + 'a,
171{
172    Box::pin(async move { Ok::<T, anyhow::Error>(fut.await) })
173}
174
175/// Convert a unit future (`Future<Output = ()>`) into a boxed future yielding
176/// `anyhow::Result<()>`.
177///
178/// Convenient when an action does not return any value.
179pub fn into_actor_fut_unit_ok<'a, Fut>(
180    fut: Fut,
181) -> ActorFut<'a, anyhow::Result<()>>
182where
183    Fut: Future<Output = ()> + Send + 'a,
184{
185    Box::pin(async move {
186        fut.await;
187        Ok::<(), anyhow::Error>(())
188    })
189}
190
191/// Write an actor action that returns `anyhow::Result<T>`.
192///
193/// This macro helps create the closure expected by [`Handle::call`] when your
194/// action returns `Result<T, anyhow::Error>`. Use `act_ok!` if your action
195/// returns a plain `T`.
196///
197/// Examples
198/// ```rust,ignore
199/// Self.api.call(act!(actor => actor.do_something())).await
200/// // or with a block
201/// Self.api.call(act!(actor => async move { actor.do_something().await })).await
202/// ```
203#[macro_export]
204macro_rules! act {
205    // takes single expression that yields Result<T, anyhow::Error>
206    ($actor:ident => $expr:expr) => {{
207        move |$actor| $crate::into_actor_fut_res(($expr))
208    }};
209
210    // takes a block that yields Result<T, anyhow::Error> and can use ?
211    ($actor:ident => $body:block) => {{
212        move |$actor| $crate::into_actor_fut_res($body)
213    }};
214}
215/// Write an actor action that returns a plain `T` (wrapped as `Ok(T)`).
216///
217/// This macro is like [`act!`] but for actions that do not naturally return
218/// an `anyhow::Result`. The output is automatically wrapped into `Ok(..)`.
219///
220/// Examples
221/// ```rust,ignore
222/// Self.api.call(act_ok!(actor => actor.get_value())).await
223/// // or with a block
224/// Self.api.call(act_ok!(actor => { 
225///   let v = actor.get_value(); 
226///   v 
227/// })).await
228/// ```
229#[macro_export]
230macro_rules! act_ok {
231    // takes single expression that yields T
232    ($actor:ident => $expr:expr) => {{
233        move |$actor| $crate::into_actor_fut_ok(($expr))
234    }};
235
236    // takes a block that yields T (no = u) map to Ok(T)
237    ($actor:ident => $body:block) => {{
238        move |$actor| $crate::into_actor_fut_ok($body)
239    }};
240}
241
242/// A minimal trait implemented by concrete actor types to run their mailbox.
243///
244/// `async fn run()` is intendet to be alive as long as the actor is alive,
245/// processing actions from the mailbox sequentially.
246/// 
247/// *Note:* This is also the place to do any continuous background work *your* actor
248/// needs to perform.
249///
250/// The returned future should poll the mailbox and execute enqueued actions
251/// until the channel closes.
252pub trait Actor: Send + 'static {
253    fn run(&mut self) -> impl Future<Output = anyhow::Result<()>> + Send;
254}
255
256#[derive(Debug)]
257/// A clonable handle to schedule actions onto an actor of type `A`.
258///
259/// Use [`Handle::channel`] to create a `(Handle<A>, mpsc::Receiver<Action<A>>)`
260/// pair, then build your actor with the receiver and spawn its run loop. The
261/// handle may be cloned and used concurrently from many tasks/threads.
262pub struct Handle<A> {
263    tx: mpsc::Sender<Action<A>>,
264}
265
266impl<A> Clone for Handle<A> {
267    fn clone(&self) -> Self {
268        Self {
269            tx: self.tx.clone(),
270        }
271    }
272}
273
274impl<A> Handle<A>
275where
276    A: Send + 'static,
277{
278    /// Create a new actor handle and mailbox receiver with the given capacity.
279    ///
280    /// Returns the `(Handle<A>, mpsc::Receiver<Action<A>>)` pair. Pass the
281    /// receiver to your actor and spawn its `run` loop like this:
282    /// ```rust,ignore
283    /// let (handle, rx) = Handle::<MyActor>::channel(128);
284    /// let actor = MyActor { /* ... */ rx };
285    /// tokio::spawn(async move { let _ = actor.run().await; });
286    /// ```
287    pub fn channel(capacity: usize) -> (Self, mpsc::Receiver<Action<A>>) {
288        let (tx, rx) = mpsc::channel(capacity);
289        (Self { tx }, rx)
290    }
291
292    /// Schedule an action to run on the actor and await its result.
293    ///
294    /// - `f` is a closure that receives `&mut A` and returns a boxed future
295    ///   yielding `anyhow::Result<R>`. Use the [`act!`] and [`act_ok!`] macros
296    ///   to write these concisely.
297    /// - If the actor panics while processing the action, the panic is caught
298    ///   and returned as an `anyhow::Error` with the call site location.
299    /// - If the actor task has stopped, an error is returned.
300    pub async fn call<R, F>(&self, f: F) -> anyhow::Result<R>
301    where
302        F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, anyhow::Result<R>> + Send + 'static,
303        R: Send + 'static,
304    {
305        let (rtx, rrx) = oneshot::channel::<anyhow::Result<R>>();
306        let loc = std::panic::Location::caller();
307
308        self.tx
309            .send(Box::new(move |actor: &mut A| {
310                Box::pin(async move {
311                    let res = std::panic::AssertUnwindSafe(async move { f(actor).await })
312                        .catch_unwind()
313                        .await
314                        .map_err(|p| {
315                            let msg = if let Some(s) = p.downcast_ref::<&str>() {
316                                (*s).to_string()
317                            } else if let Some(s) = p.downcast_ref::<String>() {
318                                s.clone()
319                            } else {
320                                "unknown panic".to_string()
321                            };
322                            anyhow!(
323                                "panic in actor call at {}:{}: {}",
324                                loc.file(),
325                                loc.line(),
326                                msg
327                            )
328                        })
329                        .and_then(|r| r);
330
331                    let _ = rtx.send(res);
332                })
333            }))
334            .await
335            .map_err(|_| anyhow!("actor stopped (call send at {}:{})", loc.file(), loc.line()))?;
336
337        rrx.await
338            .map_err(|_| anyhow!("actor stopped (call recv at {}:{})", loc.file(), loc.line()))?
339    }
340}