//! Lets you stop and join groups of Tokio tasks.
//!
//! This is a small library intended to help with graceful shutdown in programs or services
//! with a number of independent tasks and/or tasks that require shutdown steps.
//!
//! ## Usage
//!
//! The library's entry point is the [`Tasker`] type, which represents a group of tasks.
//!
//! #### Adding tasks
//!
//! Tasks are added to the group with [`spawn()`][Tasker::spawn()], [`spawn_local()`][Tasker::spawn_local()],
//! or [`add_handle()`][Tasker::add_handle()].
//!
//! The `Tasker` may be cloned freely. The clones may be sent to other tasks/threads, and tasks can be
//! spawned on the clones as well. However, **each clone** except the 'main' one needs to be dropped to let
//! the 'main' one join all the tasks. This avoids race conditions where tasks could be spawned while
//! the `Tasker` is attempting to join.
//! A clone can be dropped using [`finish()`][Tasker::finish()]; this is recommended for explicitness.
//!
//! **Warning:** If you continuously add tasks to `Tasker` (such as network connection handlers),
//! over time its internal storage might grow unreasonably, as it needs to keep a handle to each task.
//! To solve this, call [`poll_join()`][Tasker::poll_join()] or [`try_poll_join()`][Tasker::try_poll_join()]
//! regularly (such as every couple thousand connections or so) to clean up finished tasks
//! from `Tasker`'s storage.
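//!
//! As a rough sketch of that periodic cleanup: the loop bound, the cleanup interval of 1,000 and the
//! placeholder task body are chosen for illustration only, and `poll_join()` is assumed here to be
//! callable through a shared reference.
//!
//! ```rust
//! use tokio_tasker::Tasker;
//!
//! #[tokio::main]
//! async fn main() {
//!     let tasker = Tasker::new();
//!
//!     // Stand-in for an accept loop that spawns one handler task per connection.
//!     for connection_id in 1..=10_000u32 {
//!         tasker.spawn(async move {
//!             // Placeholder for real connection handling.
//!             let _ = connection_id;
//!         });
//!
//!         // Every 1,000 connections, join the tasks that have already finished
//!         // so that their handles are released.
//!         if connection_id % 1_000 == 0 {
//!             tasker.poll_join();
//!         }
//!     }
//!
//!     // Collect whatever is still running.
//!     tasker.join().await;
//! }
//! ```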
//!
//! #### Stopping tasks
//!
//! The `Tasker` dispenses [`Stopper`]s, small futures that resolve once the task group is stopped.
//! These can be used to wrap individual futures or streams, or used in `select!()` etc.;
//! see [`stopper()`][Tasker::stopper()].
//!
//! To signal to the group that all tasks should be stopped, call [`stop()`][Tasker::stop()]
//! on any `Tasker` clone. This will resolve all the `Stopper` instances. Note that this is not racy:
//! you can still obtain additional `Stopper`s (e.g. in other threads) and new tasks can still be
//! set up; they will just be stopped right away.
//!
//! Alternatively, you can obtain a [`Signaller`] using [`signaller()`][Tasker::signaller()].
//! This is a special `Tasker` clone that provides the `.stop()` method as well, but doesn't need to be finished/dropped
//! before tasks are joined.
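//!
//! For instance, a task with its own shutdown steps might wait on a `Stopper` in `select!()`.
//! This is a minimal sketch: the 100 ms interval and the comments marking the work are placeholders,
//! and the `Stopper` is pinned first on the assumption that it may not be `Unpin`.
//!
//! ```rust
//! use std::time::Duration;
//!
//! use tokio::time;
//! use tokio_tasker::Tasker;
//!
//! #[tokio::main]
//! async fn main() {
//!     let tasker = Tasker::new();
//!
//!     let stopper = tasker.stopper();
//!     tasker.spawn(async move {
//!         // Pin the stopper so it can be polled by reference across loop iterations.
//!         tokio::pin!(stopper);
//!         let mut ticks = time::interval(Duration::from_millis(100));
//!
//!         loop {
//!             tokio::select! {
//!                 // Resolves once the group is stopped.
//!                 _ = &mut stopper => break,
//!                 _ = ticks.tick() => {
//!                     // Periodic work would go here.
//!                 }
//!             }
//!         }
//!
//!         // Shutdown steps (flushing, closing connections, ...) would go here.
//!     });
//!
//!     // A Signaller can stop the group without having to be finished before joining.
//!     let signaller = tasker.signaller();
//!     signaller.stop();
//!
//!     tasker.join().await;
//! }
//! ```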
//!
//! #### Joining the task group
//!
//! There will usually be a 'main' instance of `Tasker` that will be used to join the tasks.
//! (This doesn't have to be the original instance; any clone may be used.)
//!
//! Call [`join().await`][Tasker::join()] at the point where you would like to collect the tasks, such as at the end of `main()`.
//! This will first wait for all the other `Tasker` clones to be finished/dropped and then await the join handles
//! of all the tasks, one by one.
//!
//! If any of the tasks panicked, `join()` will propagate the panic. Use [`try_join()`][Tasker::try_join()]
//! to handle the join results yourself.
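//!
//! A hedged sketch of handling the results yourself. It assumes that `try_join()` resolves to a
//! collection of `Result`s, one per task, whose error type implements `Display` (as Tokio's `JoinError` does);
//! the panicking task is contrived for illustration.
//!
//! ```rust
//! use tokio_tasker::Tasker;
//!
//! #[tokio::main]
//! async fn main() {
//!     let tasker = Tasker::new();
//!
//!     tasker.spawn(async {
//!         // A task that completes normally.
//!     });
//!     tasker.spawn(async {
//!         // A contrived failure, to have something to report.
//!         panic!("task failed");
//!     });
//!
//!     // Unlike join(), this doesn't propagate the panic to the caller.
//!     for result in tasker.try_join().await {
//!         if let Err(err) = result {
//!             eprintln!("task did not finish cleanly: {err}");
//!         }
//!     }
//! }
//! ```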
//!
//! There are also the non-`async` variants [`poll_join()`][Tasker::poll_join()] and [`try_poll_join()`][Tasker::try_poll_join()],
//! which join already finished tasks without waiting and release the memory used by their handles.
//!
//! Finally, there is [`join_stream()`][Tasker::join_stream()], which lets you asynchronously receive task results
//! as they become available, i.e. as tasks terminate.
//!
//! ## Example
//!
//! A simple case of using `Tasker` in `main()`:
//!
//! ```rust
//! # use std::time::Duration;
//! #
//! # use eyre::Result;
//! # use futures_util::future;
//! # use futures_util::StreamExt;
//! # use tokio::time;
//! # use tokio_stream::wrappers::IntervalStream;
//! # use tokio_tasker::{FutureExt as _, Tasker};
//! #
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let tasker = Tasker::new();
//!
//!     let tasker2 = tasker.clone();
//!     // Spawn a task that will spawn some subtasks.
//!     // It uses the tasker2 clone to do this.
//!     tasker.spawn(async move {
//!         let pending = future::pending::<()>().unless(tasker2.stopper());
//!         tasker2.spawn(pending);
//!
//!         let interval = time::interval(Duration::from_millis(10_000));
//!         let mut interval = IntervalStream::new(interval).take_until(tasker2.stopper());
//!         tasker2.spawn(async move { while interval.next().await.is_some() {} });
//!
//!         // We're done spawning tasks on this clone.
//!         tasker2.finish();
//!     });
//!
//!     // Get a Signaller clone for stopping the group in another task.
//!     let signaller = tasker.signaller();
//!     tokio::spawn(async move {
//!         // .stop() the task group after 1s.
//!         time::sleep(Duration::from_millis(1_000)).await;
//!         signaller.stop();
//!     });
//!
//!     // Join all the tasks.
//!     tasker.join().await;
//!
//!     Ok(())
//! }
//! ```
//!
//! There is also an example echo server in `examples/echo.rs`.
//!

mod future_ext;
mod join_stream;
mod signaller;
mod stopper;
mod tasker;

pub use future_ext::{FutureExt, Unless};
pub use join_stream::JoinStream;
pub use signaller::Signaller;
pub use stopper::{Stopped, Stopper};
pub use tasker::Tasker;