actor_helper/lib.rs
1
2//! Minimal, self-contained actor runtime used in this crate.
3//!
4//! This module provides a tiny, opinionated actor pattern built on top of
5//! `tokio`, with:
6//!
7//! - A per-actor mailbox (`mpsc::Receiver<Action<A>>`) of boxed async actions.
8//! - A `Handle<A>` you can clone and send across threads to schedule work on the
9//! actor's single-threaded mutable state.
10//! - Ergonomic macros (`act!`, `act_ok!`) to write actor actions inline.
11//! - Panic capturing and error propagation to callers via `anyhow::Result`.
12//!
13//! The pattern encourages a public API type ("Object") that holds a
14//! `Handle<ObjectActor>` and a private type ("ObjectActor") that owns the
15//! mutable state and the mailbox `Receiver`. The actor implements `Actor` for a
16//! simple `run` loop and is spawned with `tokio::spawn`.
17//!
18//! Example: a simple counter
19//!
20//! ```rust,ignore
21//! use anyhow::{anyhow, Result};
22//! use iroh_lan::actor::{Actor, Handle, Action};
23//! use tokio::sync::mpsc;
24//!
25//! // Public API type (exposed from your module)
26//! pub struct Counter {
27//! api: Handle<CounterActor>,
28//! }
29//!
30//! impl Counter {
31//! pub fn new() -> Self {
32//! // 1) Create channel and api handle
33//! let (api, rx) = Handle::channel(128);
34//!
35//! // 2) Create the actor with private state and the mailbox
36//! let actor = CounterActor { value: 0, rx };
37//!
38//! // 3) Spawn the run loop
39//! tokio::spawn(async move {
40//! let mut actor = actor;
41//! let _ = actor.run().await;
42//! });
43//!
44//! Self { api }
45//! }
46//!
47//! // Mutating API method
48//! pub async fn inc(&self, by: i32) -> Result<()> {
49//! self.api
50//! // act_ok! wraps the returned value into Ok(..)
51//! .call(act_ok!(actor => async move {
52//! actor.value += by;
53//! }))
54//! .await
55//! }
56//!
57//! // Query method returning a value
58//! pub async fn get(&self) -> Result<i32> {
59//! self.api
60//! .call(act_ok!(actor => actor.value))
61//! .await
62//! }
63//!
64//! // An example that can fail
65//! pub async fn set_non_negative(&self, v: i32) -> Result<()> {
66//! self.api
67//! .call(act!(actor => async move {
68//! if v < 0 {
69//! Err(anyhow!("negative value"))
70//! } else {
71//! actor.value = v;
72//! Ok(())
73//! }
74//! }))
75//! .await
76//! }
77//! }
78//!
79//! // Private actor with its state and mailbox
80//! struct CounterActor {
81//! value: i32,
82//! rx: mpsc::Receiver<Action<CounterActor>>,
83//! }
84//!
85//! impl Actor for CounterActor {
86//! async fn run(&mut self) -> Result<()> {
87//! async move {
88//! loop {
89//! tokio::select! {
90//! Some(action) = self.rx.recv() => {
91//! action(self).await;
92//! }
93//! Some(your_bytes) = your_reader.recv() => {
94//! // do other background work if needed
95//! }
96//! // ... other background work ...
97//! }
98//! }
99//! Ok(())
100//! }
101//! }
102//! }
103//!
104//! # async fn _example_usage() -> Result<()> {
105//! # let c = Counter::new();
106//! # c.inc(5).await?;
107//! # assert_eq!(c.get().await?, 5);
108//! # Ok(())
109//! # }
110//! ```
111//!
112//! Notes
113//! - `Handle::call` captures the call site location and wraps any panic from the
114//! actor action into an `anyhow::Error` delivered to the caller.
115//! - Actions must complete reasonably promptly; the actor processes actions
116//! sequentially and a long-running action blocks the mailbox.
117//! - Do not hold references across `.await` inside actions; prefer moving values
118//! or cloning as needed.
119mod tests;
120
121use std::{boxed, future::Future, pin::Pin};
122
123use anyhow::anyhow;
124use futures::FutureExt;
125use tokio::sync::{mpsc, oneshot};
126
127/// The unboxed future type used for actor actions.
128///
129/// This is a convenience type alias for a `Send` future that returns `T`.
130/// Prefer using [`ActorFut`] which wraps this in a pinned `Box`.
131pub type PreBoxActorFut<'a, T> = dyn Future<Output = T> + Send + 'a;
132
133/// The boxed future type returned by actor actions.
134///
135/// Most APIs take or return `ActorFut<'a, T>` as the standard boxed future
136/// used to execute actor code on the actor thread.
137pub type ActorFut<'a, T> = Pin<boxed::Box<PreBoxActorFut<'a, T>>>;
138
139/// An action scheduled onto an actor.
140///
141/// This is a boxed closure that receives a mutable reference to the actor
142/// instance and returns a boxed future to execute. The future typically returns
143/// `anyhow::Result<T>`, but `T` is generic here to support helper adapters.
144pub type Action<A> = Box<dyn for<'a> FnOnce(&'a mut A) -> ActorFut<'a, ()> + Send + 'static>;
145
146/// Convert a future yielding `anyhow::Result<T>` into the standard boxed
147/// [`ActorFut`].
148///
149/// Use this when your actor code already returns `Result<T, anyhow::Error>`.
150///
151/// See also: [`into_actor_fut_ok`], [`into_actor_fut_unit_ok`].
152pub fn into_actor_fut_res<'a, Fut, T>(
153 fut: Fut,
154) -> ActorFut<'a, anyhow::Result<T>>
155where
156 Fut: Future<Output = anyhow::Result<T>> + Send + 'a,
157 T: Send + 'a,
158{
159 Box::pin(fut)
160}
161
162/// Convert a future yielding `T` into a boxed future yielding `anyhow::Result<T>`.
163///
164/// This wraps the output in `Ok(T)` for convenience.
165pub fn into_actor_fut_ok<'a, Fut, T>(
166 fut: Fut,
167) -> ActorFut<'a, anyhow::Result<T>>
168where
169 Fut: Future<Output = T> + Send + 'a,
170 T: Send + 'a,
171{
172 Box::pin(async move { Ok::<T, anyhow::Error>(fut.await) })
173}
174
175/// Convert a unit future (`Future<Output = ()>`) into a boxed future yielding
176/// `anyhow::Result<()>`.
177///
178/// Convenient when an action does not return any value.
179pub fn into_actor_fut_unit_ok<'a, Fut>(
180 fut: Fut,
181) -> ActorFut<'a, anyhow::Result<()>>
182where
183 Fut: Future<Output = ()> + Send + 'a,
184{
185 Box::pin(async move {
186 fut.await;
187 Ok::<(), anyhow::Error>(())
188 })
189}
190
191/// Write an actor action that returns `anyhow::Result<T>`.
192///
193/// This macro helps create the closure expected by [`Handle::call`] when your
194/// action returns `Result<T, anyhow::Error>`. Use `act_ok!` if your action
195/// returns a plain `T`.
196///
197/// Examples
198/// ```rust,ignore
199/// Self.api.call(act!(actor => actor.do_something())).await
200/// // or with a block
201/// Self.api.call(act!(actor => async move { actor.do_something().await })).await
202/// ```
203#[macro_export]
204macro_rules! act {
205 // takes single expression that yields Result<T, anyhow::Error>
206 ($actor:ident => $expr:expr) => {{
207 move |$actor| $crate::into_actor_fut_res(($expr))
208 }};
209
210 // takes a block that yields Result<T, anyhow::Error> and can use ?
211 ($actor:ident => $body:block) => {{
212 move |$actor| $crate::into_actor_fut_res($body)
213 }};
214}
215/// Write an actor action that returns a plain `T` (wrapped as `Ok(T)`).
216///
217/// This macro is like [`act!`] but for actions that do not naturally return
218/// an `anyhow::Result`. The output is automatically wrapped into `Ok(..)`.
219///
220/// Examples
221/// ```rust,ignore
222/// Self.api.call(act_ok!(actor => actor.get_value())).await
223/// // or with a block
224/// Self.api.call(act_ok!(actor => {
225/// let v = actor.get_value();
226/// v
227/// })).await
228/// ```
229#[macro_export]
230macro_rules! act_ok {
231 // takes single expression that yields T
232 ($actor:ident => $expr:expr) => {{
233 move |$actor| $crate::into_actor_fut_ok(($expr))
234 }};
235
236 // takes a block that yields T (no = u) map to Ok(T)
237 ($actor:ident => $body:block) => {{
238 move |$actor| $crate::into_actor_fut_ok($body)
239 }};
240}
241
242/// A minimal trait implemented by concrete actor types to run their mailbox.
243///
244/// `async fn run()` is intendet to be alive as long as the actor is alive,
245/// processing actions from the mailbox sequentially.
246///
247/// *Note:* This is also the place to do any continuous background work *your* actor
248/// needs to perform.
249///
250/// The returned future should poll the mailbox and execute enqueued actions
251/// until the channel closes.
252pub trait Actor: Send + 'static {
253 fn run(&mut self) -> impl Future<Output = anyhow::Result<()>> + Send;
254}
255
256#[derive(Debug)]
257/// A clonable handle to schedule actions onto an actor of type `A`.
258///
259/// Use [`Handle::channel`] to create a `(Handle<A>, mpsc::Receiver<Action<A>>)`
260/// pair, then build your actor with the receiver and spawn its run loop. The
261/// handle may be cloned and used concurrently from many tasks/threads.
262pub struct Handle<A> {
263 tx: mpsc::Sender<Action<A>>,
264}
265
266impl<A> Clone for Handle<A> {
267 fn clone(&self) -> Self {
268 Self {
269 tx: self.tx.clone(),
270 }
271 }
272}
273
274impl<A> Handle<A>
275where
276 A: Send + 'static,
277{
278 /// Create a new actor handle and mailbox receiver with the given capacity.
279 ///
280 /// Returns the `(Handle<A>, mpsc::Receiver<Action<A>>)` pair. Pass the
281 /// receiver to your actor and spawn its `run` loop like this:
282 /// ```rust,ignore
283 /// let (handle, rx) = Handle::<MyActor>::channel(128);
284 /// let actor = MyActor { /* ... */ rx };
285 /// tokio::spawn(async move { let _ = actor.run().await; });
286 /// ```
287 pub fn channel(capacity: usize) -> (Self, mpsc::Receiver<Action<A>>) {
288 let (tx, rx) = mpsc::channel(capacity);
289 (Self { tx }, rx)
290 }
291
292 /// Schedule an action to run on the actor and await its result.
293 ///
294 /// - `f` is a closure that receives `&mut A` and returns a boxed future
295 /// yielding `anyhow::Result<R>`. Use the [`act!`] and [`act_ok!`] macros
296 /// to write these concisely.
297 /// - If the actor panics while processing the action, the panic is caught
298 /// and returned as an `anyhow::Error` with the call site location.
299 /// - If the actor task has stopped, an error is returned.
300 pub async fn call<R, F>(&self, f: F) -> anyhow::Result<R>
301 where
302 F: for<'a> FnOnce(&'a mut A) -> ActorFut<'a, anyhow::Result<R>> + Send + 'static,
303 R: Send + 'static,
304 {
305 let (rtx, rrx) = oneshot::channel::<anyhow::Result<R>>();
306 let loc = std::panic::Location::caller();
307
308 self.tx
309 .send(Box::new(move |actor: &mut A| {
310 Box::pin(async move {
311 let res = std::panic::AssertUnwindSafe(async move { f(actor).await })
312 .catch_unwind()
313 .await
314 .map_err(|p| {
315 let msg = if let Some(s) = p.downcast_ref::<&str>() {
316 (*s).to_string()
317 } else if let Some(s) = p.downcast_ref::<String>() {
318 s.clone()
319 } else {
320 "unknown panic".to_string()
321 };
322 anyhow!(
323 "panic in actor call at {}:{}: {}",
324 loc.file(),
325 loc.line(),
326 msg
327 )
328 })
329 .and_then(|r| r);
330
331 let _ = rtx.send(res);
332 })
333 }))
334 .await
335 .map_err(|_| anyhow!("actor stopped (call send at {}:{})", loc.file(), loc.line()))?;
336
337 rrx.await
338 .map_err(|_| anyhow!("actor stopped (call recv at {}:{})", loc.file(), loc.line()))?
339 }
340}