//! # Async Actors
//!
//! `tonari-actor` lets you freely combine sync (blocking) and async actors within one system.
//! Async support is available under the `async` crate feature (disabled by default).
//!
//! While sync actors implement the [`Actor`](crate::Actor) trait and are spawned using the
//! [`System::spawn()`], [`System::prepare()`] and [`System::prepare_fn()`] family of methods,
//! async actors implement [`AsyncActor`] and are spawned using [`System::spawn_async()`],
//! [`System::prepare_async()`] and [`System::prepare_async_factory()`].
//!
//! Sync and async actors share the same [`Addr`] and [`Recipient`](crate::Recipient) types.
//!
//! Async actors share the same paradigm as sync actors: each one gets its own OS-level thread.
//! More specifically, a single-threaded async runtime is spawned for every async actor.
//!
//! `tonari-actor` currently uses the [`tokio`][^tokio] ecosystem, more specifically its
//! [`LocalRuntime`]. It allows spawning futures that are _not_ [`Send`], which means you
//! can use [`Rc`](https://doc.rust-lang.org/std/rc/struct.Rc.html) and
//! [`RefCell`](https://doc.rust-lang.org/std/cell/struct.RefCell.html) instead of
//! [`Arc`](https://doc.rust-lang.org/std/sync/struct.Arc.html) and mutexes in your futures.
//! It also allows the [`AsyncActor`] trait (and its implementors) to use the `async fn` syntax.
//!
//! Tokio's [`LocalRuntime`] is currently [gated behind the `tokio_unstable` _Rust
//! flag_](https://docs.rs/tokio/latest/tokio/index.html#unstable-features). Note that this isn't
//! a cargo feature flag (that would go to `Cargo.toml`); it goes to `.cargo/config.toml` or
//! the `RUSTFLAGS` environment variable, which overrides the former. It needs to be specified in
//! your leaf project/workspace (it doesn't propagate from `tonari-actor`). Stabilization of
//! [`LocalRuntime`] is tracked in
//! [tokio-rs/tokio#7558](https://github.com/tokio-rs/tokio/issues/7558).
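//!
//! As an illustration, a leaf project could enable the flag with a `.cargo/config.toml`
//! fragment like the one below. This follows the form shown in Tokio's documentation of
//! unstable features; merge it with any `rustflags` your project already sets.
//!
//! ```toml
//! [build]
//! rustflags = ["--cfg", "tokio_unstable"]
//! ```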
//!
//! With [`AsyncActor::handle()`] being an `async fn`, you gain access to the wide async library
//! ecosystem (currently those crates compatible with [`tokio`]), and you can employ concurrency
//! (still within the single thread) when processing each message by using the various future
//! combinators.
//!
//! But incoming messages are still processed sequentially. The actor event loop doesn't resume
//! until the future returned by [`AsyncActor::handle()`] resolves. It could not run multiple
//! `handle()` futures concurrently anyway, because each of them holds the `&mut self` reference.
//!
//! It is usually desirable that `handle()` resolves relatively quickly so that control messages can
//! be processed in a timely fashion and non-control messages do not accumulate and overflow their
//! inboxes.
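//!
//! The overflow risk can be modelled with the standard library alone. The sketch below is
//! _not_ `tonari-actor` API: it assumes an inbox behaves like a bounded channel that rejects
//! sends once it is full, and the `rejected_sends()` helper is purely hypothetical. The receiver
//! that is never drained stands in for an actor stuck in a long-running `handle()`.
//!
//! ```rust
//! use std::sync::mpsc::{sync_channel, TrySendError};
//!
//! // Try to enqueue `attempts` messages into a bounded inbox with `capacity` slots that
//! // nobody drains, and count the sends rejected because the inbox was full.
//! fn rejected_sends(capacity: usize, attempts: usize) -> usize {
//!     // `_rx` is kept alive but never read, like an actor that is busy in `handle()`.
//!     let (tx, _rx) = sync_channel::<usize>(capacity);
//!     (0..attempts)
//!         .filter(|&i| matches!(tx.try_send(i), Err(TrySendError::Full(_))))
//!         .count()
//! }
//!
//! fn main() {
//!     // 5 mirrors `DEFAULT_CAPACITY_NORMAL`; the 3 extra messages do not fit.
//!     assert_eq!(rejected_sends(5, 8), 3);
//! }
//! ```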
//!
//! If you want to process messages concurrently, spawn an async task to handle the message and
//! return from the `handle()` method immediately so that new messages (including control messages
//! to stop the actor) can be processed.
//!
//! Async tasks can be spawned using [`tokio::task::spawn_local()`], or [`tokio::spawn()`] if the
//! [`Send`] bound of the latter doesn't limit you.
//!
//! Note that an async equivalent of [`crate::SpawnBuilderWithAddress::run_and_block()`] is not
//! currently implemented (contributions welcome).
//!
//! [^tokio]: On a logical level, `tonari-actor` isn't tied to any specific async runtime (it
//!     doesn't do any runtime-specific operations like I/O or timers); it just needs to spawn
//!     _some_ async runtime in the actor loop. Tokio was a pragmatic choice because many crates
//!     in the ecosystem use it. Support for alternative runtimes could be added, even made
//!     runtime-configurable.

use crate::{
    ActorError, Addr, BareContext, Capacity, Control, Priority, RegistryEntry, System, SystemHandle,
};
use futures_util::{StreamExt, select_biased};
use log::{debug, trace};
use std::{any::type_name, fmt, future, thread};
use tokio::runtime::LocalRuntime;

/// The actor trait - async variant.
// Regarding the `#[allow]`: using `async fn` in a trait doesn't allow us to specify `Send` (or
// other) bounds, but we don't really need any bounds, because we use [`LocalRuntime`].
#[allow(async_fn_in_trait)]
pub trait AsyncActor {
    /// The expected type of a message to be received.
    type Message: Send + 'static;
    /// The type to return on error in the handle method.
    type Error: fmt::Display;

    /// Default capacity of actor's normal-priority inbox unless overridden by `.with_capacity()`.
    const DEFAULT_CAPACITY_NORMAL: usize = 5;
    /// Default capacity of actor's high-priority inbox unless overridden by `.with_capacity()`.
    const DEFAULT_CAPACITY_HIGH: usize = 5;

    /// The name of the Actor. Used only for logging/debugging.
    /// Default implementation uses [`type_name()`].
    fn name() -> &'static str {
        type_name::<Self>()
    }

    /// Determine priority of a `message` before it is sent to this actor.
    /// Default implementation returns [`Priority::Normal`].
    fn priority(_message: &Self::Message) -> Priority {
        Priority::Normal
    }

    /// An optional callback when the Actor has been started.
    async fn started(&mut self, _context: &BareContext<Self::Message>) -> Result<(), Self::Error> {
        Ok(())
    }

    /// The primary function of this trait, allowing an actor to handle incoming messages of
    /// a certain type. Note that the actor system still calls this serially for each message even
    /// for async actors; not even control messages (currently the request to stop the actor) are
    /// processed while the future returned by `handle()` is still pending.
    ///
    /// Delegate work to an async task if you want to process messages and control events
    /// concurrently. This is recommended especially if `handle()` can take an extended or
    /// arbitrary time to fully resolve and you want the actor to stay responsive.
    async fn handle(
        &mut self,
        context: &BareContext<Self::Message>,
        message: Self::Message,
    ) -> Result<(), Self::Error>;

    /// An optional callback when the Actor has been stopped.
    async fn stopped(&mut self, _context: &BareContext<Self::Message>) -> Result<(), Self::Error> {
        Ok(())
    }

    /// Create address for this actor with default capacities.
    fn addr() -> Addr<Self::Message> {
        let capacity =
            Capacity { normal: Self::DEFAULT_CAPACITY_NORMAL, high: Self::DEFAULT_CAPACITY_HIGH };
        Self::addr_with_capacity(capacity)
    }

    /// Create address for this actor, specifying its inbox size. Accepts [`Capacity`] or [`usize`].
    fn addr_with_capacity(capacity: impl Into<Capacity>) -> Addr<Self::Message> {
        Addr::new(capacity, Self::name(), Self::priority)
    }
}

/// A builder for configuring [`AsyncActor`] spawning.
/// You can specify your own [`Addr`] for the Actor, or let the system create
/// a new address with either provided or default capacity.
#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to \
              configure this builder"]
pub struct AsyncSpawnBuilderWithoutAddress<'a, A: AsyncActor, F: IntoFuture<Output = A>> {
    system: &'a mut System,
    factory: F,
}

impl<'a, A: AsyncActor, F: IntoFuture<Output = A>> AsyncSpawnBuilderWithoutAddress<'a, A, F> {
    /// Specify an existing [`Addr`] to use with this Actor.
    pub fn with_addr(self, addr: Addr<A::Message>) -> AsyncSpawnBuilderWithAddress<'a, A, F> {
        AsyncSpawnBuilderWithAddress { spawn_builder: self, addr }
    }

    /// Specify a capacity for the actor's receiving channel. Accepts [`Capacity`] or [`usize`].
    pub fn with_capacity(
        self,
        capacity: impl Into<Capacity>,
    ) -> AsyncSpawnBuilderWithAddress<'a, A, F> {
        let addr = A::addr_with_capacity(capacity);
        AsyncSpawnBuilderWithAddress { spawn_builder: self, addr }
    }

    /// Use the default capacity for the actor's receiving channel.
    pub fn with_default_capacity(self) -> AsyncSpawnBuilderWithAddress<'a, A, F> {
        let addr = A::addr();
        AsyncSpawnBuilderWithAddress { spawn_builder: self, addr }
    }
}

/// After having configured the builder with an address
/// it is possible to create and run the actor on a new thread with [`Self::spawn()`].
///
/// Not yet implemented for async actors: `run_and_block()` on the current thread.
/// File an issue if you need it.
#[must_use = "You must call .spawn() to run the actor"]
pub struct AsyncSpawnBuilderWithAddress<'a, A: AsyncActor, F: IntoFuture<Output = A>> {
    spawn_builder: AsyncSpawnBuilderWithoutAddress<'a, A, F>,
    addr: Addr<A::Message>,
}

impl<A: AsyncActor, F: IntoFuture<Output = A> + Send + 'static>
    AsyncSpawnBuilderWithAddress<'_, A, F>
{
    /// Spawn this async actor into a new thread managed by the [`System`].
    pub fn spawn(self) -> Result<Addr<A::Message>, ActorError> {
        let builder = self.spawn_builder;
        builder.system.spawn_async_fn_with_addr(builder.factory, self.addr.clone())?;
        Ok(self.addr)
    }
}

impl System {
    /// Prepare an async actor to be spawned. Returns an [`AsyncSpawnBuilderWithoutAddress`]
    /// which has to be further configured before spawning the actor.
    pub fn prepare_async<A>(
        &mut self,
        actor: A,
    ) -> AsyncSpawnBuilderWithoutAddress<'_, A, future::Ready<A>>
    where
        A: AsyncActor,
    {
        AsyncSpawnBuilderWithoutAddress { system: self, factory: future::ready(actor) }
    }

    /// Similar to [`Self::prepare_async()`], but an async actor factory is passed instead
    /// of an [`AsyncActor`] itself. This is used when an actor needs to be
    /// created on its own thread instead of the calling thread.
    /// Returns an [`AsyncSpawnBuilderWithoutAddress`] which has to be further
    /// configured before spawning the actor.
    pub fn prepare_async_factory<A, F>(
        &mut self,
        factory: F,
    ) -> AsyncSpawnBuilderWithoutAddress<'_, A, F>
    where
        A: AsyncActor,
        F: IntoFuture<Output = A>,
    {
        AsyncSpawnBuilderWithoutAddress { system: self, factory }
    }

    /// Spawn an [`AsyncActor`] in the system, returning its address when successful.
    /// This address is created by the system and uses a default capacity.
    /// If you need to customize the address see [`Self::prepare_async()`] or
    /// [`Self::prepare_async_factory()`].
    pub fn spawn_async<A>(&mut self, actor: A) -> Result<Addr<A::Message>, ActorError>
    where
        A: AsyncActor + Send + 'static,
    {
        self.prepare_async(actor).with_default_capacity().spawn()
    }

    fn spawn_async_fn_with_addr<F, A>(
        &mut self,
        factory: F,
        addr: Addr<A::Message>,
    ) -> Result<(), ActorError>
    where
        F: IntoFuture<Output = A> + Send + 'static,
        A: AsyncActor,
    {
        // Hold the lock until the end of the function to prevent the race
        // condition between spawn and shutdown.
        let system_state_lock = self.handle.system_state.read();
        if !system_state_lock.is_running() {
            return Err(ActorError::SystemStopped { actor_name: A::name() });
        }

        let system_handle = self.handle.clone();
        let context =
            BareContext { system_handle: system_handle.clone(), myself: addr.recipient.clone() };
        let control_addr = addr.control_tx.clone();

        let thread_handle = thread::Builder::new()
            .name(A::name().into())
            .spawn(move || {
                let runtime = match LocalRuntime::new() {
                    Ok(runtime) => runtime,
                    Err(e) => {
                        Self::report_error_shutdown(
                            &system_handle,
                            A::name(),
                            "creating async runtime",
                            e,
                        );
                        return;
                    },
                };

                let main_task = async {
                    let mut actor = factory.await;

                    if let Err(error) = actor.started(&context).await {
                        Self::report_error_shutdown(&system_handle, A::name(), "started()", error);
                        return;
                    }
                    debug!("[{}] started async actor: {}", system_handle.name, A::name());

                    Self::run_async_actor_select_loop(actor, addr, &context, &system_handle).await
                };

                runtime.block_on(main_task)
            })
            .map_err(|_| ActorError::SpawnFailed { actor_name: A::name() })?;

        self.handle
            .registry
            .lock()
            .push(RegistryEntry::BackgroundThread(control_addr, thread_handle));

        Ok(())
    }

    /// Keep logically in sync with [`Self::run_actor_select_loop()`].
    async fn run_async_actor_select_loop<A>(
        mut actor: A,
        addr: Addr<A::Message>,
        context: &BareContext<A::Message>,
        system_handle: &SystemHandle,
    ) where
        A: AsyncActor,
    {
        /// What can be received during one actor event loop.
        enum Received<M> {
            Control(Control),
            Message(M),
        }

        let mut control_stream = addr.control_rx.into_stream();
        let mut high_prio_stream = addr.priority_rx.into_stream();
        let mut normal_prio_stream = addr.message_rx.into_stream();

        loop {
            // We have nuanced requirements for the future combinator:
            // 1. If multiple futures in the combinator are ready, it should return the one with
            //    the highest priority (control > high > normal);
            // 2. Otherwise it should wait for the first message to become ready and return that.
            //
            // The docs of Tokio's `select` macro contain a nice survey of ecosystem alternatives:
            // https://docs.rs/tokio/latest/tokio/macro.select.html#racing-futures
            let received = select_biased!(
                control = control_stream.next() => {
                    Received::Control(control.expect("We keep control_tx alive through addr."))
                },
                high_prio = high_prio_stream.next() => {
                    Received::Message(high_prio.expect("We keep priority_tx alive through addr."))
                },
                normal_prio = normal_prio_stream.next() => {
                    Received::Message(normal_prio.expect("We keep message_tx alive through addr."))
                },
            );

            // Process the event. Returning ends the actor loop; normally we fall through.
            match received {
                Received::Control(Control::Stop) => {
                    if let Err(error) = actor.stopped(context).await {
                        // FWIW this should always hit the "while shutting down" variant.
                        Self::report_error_shutdown(system_handle, A::name(), "stopped()", error);
                    }
                    debug!("[{}] stopped actor: {}", system_handle.name, A::name());
                    return;
                },
                Received::Message(msg) => {
                    trace!("[{}] message received by {}", system_handle.name, A::name());
                    if let Err(error) = actor.handle(context, msg).await {
                        Self::report_error_shutdown(system_handle, A::name(), "handle()", error);
                        return;
                    }
                },
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Actor, Context, Recipient};
    use anyhow::Error;
    use std::{
        sync::{Arc, Mutex},
        time::Duration,
    };

    struct AsyncTestActor {
        recorder: Recipient<TestMessage>,
    }

    impl AsyncActor for AsyncTestActor {
        type Error = Error;
        type Message = TestMessage;

        fn priority(message: &TestMessage) -> Priority {
            match message {
                TestMessage::HighPrio(_) => Priority::High,
                _ => Priority::Normal,
            }
        }

        async fn started(&mut self, _: &BareContext<TestMessage>) -> Result<(), Error> {
            debug!("AsyncActor started hook");
            self.recorder.send(TestMessage::Event("started"))?;
            Ok(())
        }

        async fn handle(
            &mut self,
            context: &BareContext<TestMessage>,
            message: TestMessage,
        ) -> Result<(), Error> {
            self.recorder.send(message.clone())?;

            if message == TestMessage::DelayedTask {
                let recorder = self.recorder.clone();
                tokio::spawn(async move {
                    debug!("delayed task started");
                    tokio::time::sleep(Duration::from_millis(10)).await;

                    recorder.send(TestMessage::Event("delayed task finished"))?;
                    debug!("delayed task finished");
                    Ok::<(), Error>(())
                });
            }

            if message == TestMessage::DelayedShutdown {
                let system_handle = context.system_handle.clone();
                tokio::spawn(async move {
                    debug!("delayed shutdown started");
                    tokio::time::sleep(Duration::from_millis(20)).await;

                    debug!("delayed shutdown shutting down now");
                    system_handle.shutdown()
                });
            }

            Ok(())
        }

        async fn stopped(&mut self, _: &BareContext<TestMessage>) -> Result<(), Error> {
            trace!("AsyncActor stopped hook");
            // Not sending a message to recorder, it is also being stopped right now.
            Ok(())
        }
    }

    #[derive(Debug, Clone, PartialEq, Eq)]
    enum TestMessage {
        Event(&'static str),
        HighPrio(usize),
        NormalPrio(usize),
        DelayedTask,
        DelayedShutdown,
    }

    struct SyncRecorder {
        received: Arc<Mutex<Vec<TestMessage>>>,
    }

    impl Actor for SyncRecorder {
        type Context = Context<Self::Message>;
        type Error = Error;
        type Message = TestMessage;

        fn handle(
            &mut self,
            _context: &mut Self::Context,
            message: Self::Message,
        ) -> Result<(), Self::Error> {
            self.received.lock().expect("lock should not be poisoned").push(message);
            Ok(())
        }
    }

    #[test]
    fn async_priorities() {
        // Logger might have been initialized by another test, so just try on best-effort basis.
        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace"))
            .try_init()
            .ok();

        let mut system = System::new("async priorities");

        let received = Arc::new(Mutex::new(Vec::new()));
        let recorder_actor = SyncRecorder { received: Arc::clone(&received) };
        let recorder_addr = system.prepare(recorder_actor).with_capacity(10).spawn().unwrap();

        let async_actor = AsyncTestActor { recorder: recorder_addr.recipient() };
        let async_addr = system.prepare_async(async_actor).with_capacity(10).spawn().unwrap();

        async_addr.send(TestMessage::DelayedTask).unwrap();
        async_addr.send(TestMessage::DelayedShutdown).unwrap();
        async_addr.send(TestMessage::NormalPrio(1)).unwrap();
        async_addr.send(TestMessage::NormalPrio(2)).unwrap();
        async_addr.send(TestMessage::HighPrio(3)).unwrap();
        async_addr.send(TestMessage::HighPrio(4)).unwrap();

        system.run().unwrap();

        let received = Arc::into_inner(received)
            .expect("arc has a single reference at this point")
            .into_inner()
            .expect("Mutex should not be poisoned");
        assert_eq!(
            received,
            [
                TestMessage::Event("started"),
                TestMessage::HighPrio(3),
                TestMessage::HighPrio(4),
                TestMessage::DelayedTask,
                TestMessage::DelayedShutdown,
                TestMessage::NormalPrio(1),
                TestMessage::NormalPrio(2),
                TestMessage::Event("delayed task finished")
            ]
        );
    }

    #[test]
    fn async_error() {
        // Logger might have been initialized by another test, so just try on best-effort basis.
        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace"))
            .try_init()
            .ok();

        struct ErroringActor;

        impl AsyncActor for ErroringActor {
            type Error = String;
            type Message = ();

            async fn handle(&mut self, _c: &BareContext<()>, _m: ()) -> Result<(), String> {
                Err(String::from("Raising an error"))
            }
        }

        let mut system = System::new("async error");
        let addr = system.spawn_async(ErroringActor).unwrap();
        addr.send(()).unwrap();

        // The Error isn't really propagated here, but at least we can test that the system doesn't
        // continue running (i.e. this test finishes quickly, doesn't hang here indefinitely).
        system.run().unwrap();
    }
}