tonari_actor/async.rs
1//! # Async Actors
2//!
3//! `tonari-actor` lets you freely combine sync (blocking) and async actors within one system.
4//! Available under the `async` crate feature (disabled by default).
5//!
6//! While sync actors implement the [`Actor`](crate::Actor) trait and are spawned using the
7//! [`System::spawn()`], [`System::prepare()`] and [`System::prepare_fn()`] family of methods,
8//! async actors implement [`AsyncActor`] and are spawned using [`System::spawn_async()`],
9//! [`System::prepare_async()`] and [`System::prepare_async_factory()`].
10//!
11//! Sync and async actors share the same [`Addr`] and [`Recipient`](crate::Recipient) types.
12//!
13//! Async actors share the same paradigm as sync actors: each one gets its own OS-level thread.
14//! More specifically a single-threaded async runtime is spawned for every async actor.
15//!
16//! `tonari-actor` currently uses the [`tokio`][^tokio] ecosystem, more specifically its
17//! [`LocalRuntime`]. It allows spawning futures that are _not_ [`Send`], which means you
18//! can use [`Rc`](https://doc.rust-lang.org/std/rc/struct.Rc.html) and
19//! [`RefCell`](https://doc.rust-lang.org/std/cell/struct.RefCell.html) instead of
20//! [`Arc`](https://doc.rust-lang.org/std/sync/struct.Arc.html) and mutexes in your futures.
21//! It also allows the [`AsyncActor`] trait (and the implementors) to use the `async fn` syntax.
22//!
23//! Tokio's [`LocalRuntime`] is currently [gated behind the `tokio_unstable` _Rust
24//! flag_](https://docs.rs/tokio/latest/tokio/index.html#unstable-features). Note that this isn't
25//! a cargo feature flag (that would go to `Cargo.toml`); it goes to `.cargo/config.toml` or
26//! `RUSTFLAGS`, which override the former. It needs to be specified in your leaf project/workspace
27//! (it doesn't propagate from `tonari-actor`). Stabilization of [`LocalRuntime`] is tracked in
28//! [tokio-rs/tokio#7558](https://github.com/tokio-rs/tokio/issues/7558).
29//!
30//! With [`AsyncActor::handle()`] being an `async fn`, you gain an access to the wide async library
31//! ecosystem (currently those compatible with [`tokio`]), and you can employ concurrency (still
32//! within the single thread) when processing each message by using the various future combinators.
33//!
34//! But the incoming messages are still processed sequentially. The actor event loop doesn't resume
35//! until the async fn handle() future resolves: it cannot start multiple concurrent
36//! [`AsyncActor::handle()`] futures also because `handle()` holds the `&mut self` reference.
37//!
38//! It is usually desirable that `handle()` resolves relatively quickly so that control messages can
39//! be processed in a timely fashion and non-control messages do not accumulate and overflow their
40//! inboxes.
41//!
42//! If you want to process the messages concurrently, spawn an async task to handle the message and
43//! return from the `handle()` method immediately so the new messages (including control messages to
44//! stop the actor) can arrive for processing.
45//!
46//! Async tasks can be spawned using [`tokio::task::spawn_local()`], or [`tokio::spawn()`] if the
47//! [`Send`] bound of the latter doesn't limit you.
48//!
49//! Note that an async equivalent of [`crate::SpawnBuilderWithAddress::run_and_block()`] is not
50//! currently implemented (contributions welcome).
51//!
52//! [^tokio]: on a logical level, `tonari-actor` isn't tied to any specific async runtime (it
53//! doesn't do any runtime-specific operations like I/O or timers), it just needs to spawn
54//! _some_ async runtime in the actor loop. Tokio was just a pragmatic choice that many crates
55//! in the ecosystem use. We could add support for alternative ones, even runtime-configurable.
56
57use crate::{
58 ActorError, Addr, BareContext, Capacity, Control, Priority, RegistryEntry, System, SystemHandle,
59};
60use futures_util::{StreamExt, select_biased};
61use log::{debug, trace};
62use std::{any::type_name, fmt, future, thread};
63use tokio::runtime::LocalRuntime;
64
65/// The actor trait - async variant.
66// Ad. the #[allow]: using `async fn` in a trait doesn't allow us to specify `Send` (or other)
67// bounds, but we don't really need any bounds, because we use [`LocalRuntime`].
68#[allow(async_fn_in_trait)]
69pub trait AsyncActor {
70 /// The expected type of a message to be received.
71 type Message: Send + 'static;
72 /// The type to return on error in the handle method.
73 type Error: fmt::Display;
74
75 /// Default capacity of actor's normal-priority inbox unless overridden by `.with_capacity()`.
76 const DEFAULT_CAPACITY_NORMAL: usize = 5;
77 /// Default capacity of actor's high-priority inbox unless overridden by `.with_capacity()`.
78 const DEFAULT_CAPACITY_HIGH: usize = 5;
79
80 /// The name of the Actor. Used only for logging/debugging.
81 /// Default implementation uses [`type_name()`].
82 fn name() -> &'static str {
83 type_name::<Self>()
84 }
85
86 /// Determine priority of a `message` before it is sent to this actor.
87 /// Default implementation returns [`Priority::Normal`].
88 fn priority(_message: &Self::Message) -> Priority {
89 Priority::Normal
90 }
91
92 /// An optional callback when the Actor has been started.
93 async fn started(&mut self, _context: &BareContext<Self::Message>) -> Result<(), Self::Error> {
94 Ok(())
95 }
96
97 /// The primary function of this trait, allowing an actor to handle incoming messages of
98 /// a certain type. Note that the actor system still calls this serially for each message even
99 /// for async actors; not even control messages (currently the request to stop the actor) are
100 /// processed while the future returned by `handle()` is still pending.
101 ///
102 /// Delegate work to an async task if you want to process messages and control events
103 /// concurrently. This is recommended especially if the `handle()` can take extended/arbitrary
104 /// time to fully resolve and you want the actor to stay responsive.
105 async fn handle(
106 &mut self,
107 context: &BareContext<Self::Message>,
108 message: Self::Message,
109 ) -> Result<(), Self::Error>;
110
111 /// An optional callback when the Actor has been stopped.
112 async fn stopped(&mut self, _context: &BareContext<Self::Message>) -> Result<(), Self::Error> {
113 Ok(())
114 }
115
116 /// Create address for this actor with default capacities.
117 fn addr() -> Addr<Self::Message> {
118 let capacity =
119 Capacity { normal: Self::DEFAULT_CAPACITY_NORMAL, high: Self::DEFAULT_CAPACITY_HIGH };
120 Self::addr_with_capacity(capacity)
121 }
122
123 /// Create address for this actor, specifying its inbox size. Accepts [`Capacity`] or [`usize`].
124 fn addr_with_capacity(capacity: impl Into<Capacity>) -> Addr<Self::Message> {
125 Addr::new(capacity, Self::name(), Self::priority)
126 }
127}
128
129/// A builder for configuring [`AsyncActor`] spawning.
130/// You can specify your own [`Addr`] for the Actor, or let the system create
131/// a new address with either provided or default capacity.
132#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to \
133 configure this builder"]
134pub struct AsyncSpawnBuilderWithoutAddress<'a, A: AsyncActor, F: IntoFuture<Output = A>> {
135 system: &'a mut System,
136 factory: F,
137}
138
139impl<'a, A: AsyncActor, F: IntoFuture<Output = A>> AsyncSpawnBuilderWithoutAddress<'a, A, F> {
140 /// Specify an existing [`Addr`] to use with this Actor.
141 pub fn with_addr(self, addr: Addr<A::Message>) -> AsyncSpawnBuilderWithAddress<'a, A, F> {
142 AsyncSpawnBuilderWithAddress { spawn_builder: self, addr }
143 }
144
145 /// Specify a capacity for the actor's receiving channel. Accepts [`Capacity`] or [`usize`].
146 pub fn with_capacity(
147 self,
148 capacity: impl Into<Capacity>,
149 ) -> AsyncSpawnBuilderWithAddress<'a, A, F> {
150 let addr = A::addr_with_capacity(capacity);
151 AsyncSpawnBuilderWithAddress { spawn_builder: self, addr }
152 }
153
154 /// Use the default capacity for the actor's receiving channel.
155 pub fn with_default_capacity(self) -> AsyncSpawnBuilderWithAddress<'a, A, F> {
156 let addr = A::addr();
157 AsyncSpawnBuilderWithAddress { spawn_builder: self, addr }
158 }
159}
160
161/// After having configured the builder with an address
162/// it is possible to create and run the actor on a new thread with [`Self::spawn()`].
163///
164/// Not yet implemented for async actors: `run_and_block()` on the current thread.
165/// File an issue if you need it.
166#[must_use = "You must call .spawn() to run the actor"]
167pub struct AsyncSpawnBuilderWithAddress<'a, A: AsyncActor, F: IntoFuture<Output = A>> {
168 spawn_builder: AsyncSpawnBuilderWithoutAddress<'a, A, F>,
169 addr: Addr<A::Message>,
170}
171
172impl<A: AsyncActor, F: IntoFuture<Output = A> + Send + 'static>
173 AsyncSpawnBuilderWithAddress<'_, A, F>
174{
175 /// Spawn this async actor into a new thread managed by the [`System`].
176 pub fn spawn(self) -> Result<Addr<A::Message>, ActorError> {
177 let builder = self.spawn_builder;
178 builder.system.spawn_async_fn_with_addr(builder.factory, self.addr.clone())?;
179 Ok(self.addr)
180 }
181}
182
183impl System {
184 /// Prepare an async actor to be spawned. Returns an [`AsyncSpawnBuilderWithoutAddress`]
185 /// which has to be further configured before spawning the actor.
186 pub fn prepare_async<A>(
187 &mut self,
188 actor: A,
189 ) -> AsyncSpawnBuilderWithoutAddress<'_, A, future::Ready<A>>
190 where
191 A: AsyncActor,
192 {
193 AsyncSpawnBuilderWithoutAddress { system: self, factory: future::ready(actor) }
194 }
195
196 /// Similar to [`Self::prepare_async()`], but an async actor factory is passed instead
197 /// of an [`AsyncActor`] itself. This is used when an actor needs to be
198 /// created on its own thread instead of the calling thread.
199 /// Returns an [`AsyncSpawnBuilderWithoutAddress`] which has to be further
200 /// configured before spawning the actor.
201 pub fn prepare_async_factory<A, F>(
202 &mut self,
203 factory: F,
204 ) -> AsyncSpawnBuilderWithoutAddress<'_, A, F>
205 where
206 A: AsyncActor,
207 F: IntoFuture<Output = A>,
208 {
209 AsyncSpawnBuilderWithoutAddress { system: self, factory }
210 }
211
212 /// Spawn an [`AsyncActor`] in the system, returning its address when successful.
213 /// This address is created by the system and uses a default capacity.
214 /// If you need to customize the address see [`Self::prepare_async()`] or
215 /// [`Self::prepare_async_factory()`].
216 pub fn spawn_async<A>(&mut self, actor: A) -> Result<Addr<A::Message>, ActorError>
217 where
218 A: AsyncActor + Send + 'static,
219 {
220 self.prepare_async(actor).with_default_capacity().spawn()
221 }
222
223 fn spawn_async_fn_with_addr<F, A>(
224 &mut self,
225 factory: F,
226 addr: Addr<A::Message>,
227 ) -> Result<(), ActorError>
228 where
229 F: IntoFuture<Output = A> + Send + 'static,
230 A: AsyncActor,
231 {
232 // Hold the lock until the end of the function to prevent the race
233 // condition between spawn and shutdown.
234 let system_state_lock = self.handle.system_state.read();
235 if !system_state_lock.is_running() {
236 return Err(ActorError::SystemStopped { actor_name: A::name() });
237 }
238
239 let system_handle = self.handle.clone();
240 let context =
241 BareContext { system_handle: system_handle.clone(), myself: addr.recipient.clone() };
242 let control_addr = addr.control_tx.clone();
243
244 let thread_handle = thread::Builder::new()
245 .name(A::name().into())
246 .spawn(move || {
247 let runtime = match LocalRuntime::new() {
248 Ok(runtime) => runtime,
249 Err(e) => {
250 Self::report_error_shutdown(
251 &system_handle,
252 A::name(),
253 "creating async runtime",
254 e,
255 );
256 return;
257 },
258 };
259
260 let main_task = async {
261 let mut actor = factory.await;
262
263 if let Err(error) = actor.started(&context).await {
264 Self::report_error_shutdown(&system_handle, A::name(), "started()", error);
265 return;
266 }
267 debug!("[{}] started async actor: {}", system_handle.name, A::name());
268
269 Self::run_async_actor_select_loop(actor, addr, &context, &system_handle).await
270 };
271
272 runtime.block_on(main_task)
273 })
274 .map_err(|_| ActorError::SpawnFailed { actor_name: A::name() })?;
275
276 self.handle
277 .registry
278 .lock()
279 .push(RegistryEntry::BackgroundThread(control_addr, thread_handle));
280
281 Ok(())
282 }
283
284 /// Keep logically in sync with [`Self::run_actor_select_loop()`].
285 async fn run_async_actor_select_loop<A>(
286 mut actor: A,
287 addr: Addr<A::Message>,
288 context: &BareContext<A::Message>,
289 system_handle: &SystemHandle,
290 ) where
291 A: AsyncActor,
292 {
293 /// What can be received during one actor event loop.
294 enum Received<M> {
295 Control(Control),
296 Message(M),
297 }
298
299 let mut control_stream = addr.control_rx.into_stream();
300 let mut high_prio_stream = addr.priority_rx.into_stream();
301 let mut normal_prio_stream = addr.message_rx.into_stream();
302
303 loop {
304 // We have a nuanced requirements on combinator for the futures:
305 // 1. If multiple futures in the combinator are ready, it should return the one with
306 // higher priority (control > high > normal);
307 // 2. Otherwise it would wait for the first message to be ready and return that.
308 //
309 // Tokio's `select` macro documentation contains nice survey of ecosystem alternatives:
310 // https://docs.rs/tokio/latest/tokio/macro.select.html#racing-futures
311 let received = select_biased!(
312 control = control_stream.next() => {
313 Received::Control(control.expect("We keep control_tx alive through addr."))
314 },
315 high_prio = high_prio_stream.next() => {
316 Received::Message(high_prio.expect("We keep priority_tx alive through addr."))
317 },
318 normal_prio = normal_prio_stream.next() => {
319 Received::Message(normal_prio.expect("We keep message_tx alive through addr."))
320 },
321 );
322
323 // Process the event. Returning ends actor loop, the normal operation is to fall through.
324 match received {
325 Received::Control(Control::Stop) => {
326 if let Err(error) = actor.stopped(context).await {
327 // FWIW this should always hit the "while shutting down" variant.
328 Self::report_error_shutdown(system_handle, A::name(), "stopped()", error);
329 }
330 debug!("[{}] stopped actor: {}", system_handle.name, A::name());
331 return;
332 },
333 Received::Message(msg) => {
334 trace!("[{}] message received by {}", system_handle.name, A::name());
335 if let Err(error) = actor.handle(context, msg).await {
336 Self::report_error_shutdown(system_handle, A::name(), "handle()", error);
337 return;
338 }
339 },
340 }
341 }
342 }
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348 use crate::{Actor, Context, Recipient};
349 use anyhow::Error;
350 use std::{
351 sync::{Arc, Mutex},
352 time::Duration,
353 };
354
355 struct AsyncTestActor {
356 recorder: Recipient<TestMessage>,
357 }
358
359 impl AsyncActor for AsyncTestActor {
360 type Error = Error;
361 type Message = TestMessage;
362
363 fn priority(message: &TestMessage) -> Priority {
364 match message {
365 TestMessage::HighPrio(_) => Priority::High,
366 _ => Priority::Normal,
367 }
368 }
369
370 async fn started(&mut self, _: &BareContext<TestMessage>) -> Result<(), Error> {
371 debug!("AsyncActor started hook");
372 self.recorder.send(TestMessage::Event("started"))?;
373 Ok(())
374 }
375
376 async fn handle(
377 &mut self,
378 context: &BareContext<TestMessage>,
379 message: TestMessage,
380 ) -> Result<(), Error> {
381 self.recorder.send(message.clone())?;
382
383 if message == TestMessage::DelayedTask {
384 let recorder = self.recorder.clone();
385 tokio::spawn(async move {
386 debug!("delayed task started");
387 tokio::time::sleep(Duration::from_millis(10)).await;
388
389 recorder.send(TestMessage::Event("delayed task finished"))?;
390 debug!("delayed task finished");
391 Ok::<(), Error>(())
392 });
393 }
394
395 if message == TestMessage::DelayedShutdown {
396 let system_handle = context.system_handle.clone();
397 tokio::spawn(async move {
398 debug!("delayed shutdown started");
399 tokio::time::sleep(Duration::from_millis(20)).await;
400
401 debug!("delayed shutdown shutting down now");
402 system_handle.shutdown()
403 });
404 }
405
406 Ok(())
407 }
408
409 async fn stopped(&mut self, _: &BareContext<TestMessage>) -> Result<(), Error> {
410 trace!("AsyncActor stopped hook");
411 // Not sending a message to recorder, it is also being stopped right now.
412 Ok(())
413 }
414 }
415
416 #[derive(Debug, Clone, PartialEq, Eq)]
417 enum TestMessage {
418 Event(&'static str),
419 HighPrio(usize),
420 NormalPrio(usize),
421 DelayedTask,
422 DelayedShutdown,
423 }
424
425 struct SyncRecorder {
426 received: Arc<Mutex<Vec<TestMessage>>>,
427 }
428
429 impl Actor for SyncRecorder {
430 type Context = Context<Self::Message>;
431 type Error = Error;
432 type Message = TestMessage;
433
434 fn handle(
435 &mut self,
436 _context: &mut Self::Context,
437 message: Self::Message,
438 ) -> Result<(), Self::Error> {
439 self.received.lock().expect("lock should not be poisoned").push(message);
440 Ok(())
441 }
442 }
443
444 #[test]
445 fn async_priorities() {
446 // Logger might have been initialized by another test, so just try on best-effort basis.
447 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace"))
448 .try_init()
449 .ok();
450
451 let mut system = System::new("async priorities");
452
453 let received = Arc::new(Mutex::new(Vec::new()));
454 let recorder_actor = SyncRecorder { received: Arc::clone(&received) };
455 let recorder_addr = system.prepare(recorder_actor).with_capacity(10).spawn().unwrap();
456
457 let async_actor = AsyncTestActor { recorder: recorder_addr.recipient() };
458 let async_addr = system.prepare_async(async_actor).with_capacity(10).spawn().unwrap();
459
460 async_addr.send(TestMessage::DelayedTask).unwrap();
461 async_addr.send(TestMessage::DelayedShutdown).unwrap();
462 async_addr.send(TestMessage::NormalPrio(1)).unwrap();
463 async_addr.send(TestMessage::NormalPrio(2)).unwrap();
464 async_addr.send(TestMessage::HighPrio(3)).unwrap();
465 async_addr.send(TestMessage::HighPrio(4)).unwrap();
466
467 system.run().unwrap();
468
469 let received = Arc::into_inner(received)
470 .expect("arc has a single reference at this point")
471 .into_inner()
472 .expect("Mutex should not be poisoned");
473 assert_eq!(
474 received,
475 [
476 TestMessage::Event("started"),
477 TestMessage::HighPrio(3),
478 TestMessage::HighPrio(4),
479 TestMessage::DelayedTask,
480 TestMessage::DelayedShutdown,
481 TestMessage::NormalPrio(1),
482 TestMessage::NormalPrio(2),
483 TestMessage::Event("delayed task finished")
484 ]
485 );
486 }
487
488 #[test]
489 fn async_error() {
490 // Logger might have been initialized by another test, so just try on best-effort basis.
491 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace"))
492 .try_init()
493 .ok();
494
495 struct ErroringActor;
496
497 impl AsyncActor for ErroringActor {
498 type Error = String;
499 type Message = ();
500
501 async fn handle(&mut self, _c: &BareContext<()>, _m: ()) -> Result<(), String> {
502 Err(String::from("Raising an error"))
503 }
504 }
505
506 let mut system = System::new("async error");
507 let addr = system.spawn_async(ErroringActor).unwrap();
508 addr.send(()).unwrap();
509
510 // The Error isn't really propagated here, but at least we can test that the system doesn't
511 // continue running (i.e. this test finishes quickly, doesn't hang here indefinitely).
512 system.run().unwrap();
513 }
514}