smolscale/
reaper.rs

1use async_channel::{Receiver, Sender};
2use async_executor::Task;
3use futures_lite::{FutureExt, StreamExt};
4use futures_util::stream::FuturesUnordered;
5
6/// Experimental: a "reaper" for `async_task` tasks that kills all that's inside when dropped, yet does not leak handles to tasks that have already died.
7pub struct TaskReaper<T> {
8    send_task: Sender<Task<T>>,
9    _reaper: Task<()>,
10}
11
12impl<T: Send + 'static> Default for TaskReaper<T> {
13    fn default() -> Self {
14        Self::new()
15    }
16}
17
18impl<T: Send + 'static> TaskReaper<T> {
19    /// Create a new reaper.
20    pub fn new() -> Self {
21        let (send_task, recv_task) = async_channel::unbounded();
22        let _reaper = crate::spawn(reaper_loop(recv_task));
23        Self { send_task, _reaper }
24    }
25
26    /// Attach a task to this reaper.
27    pub fn attach(&self, task: Task<T>) {
28        let _ = self.send_task.try_send(task);
29    }
30}
31
32async fn reaper_loop<T>(recv_task: Receiver<Task<T>>) {
33    let mut inner = FuturesUnordered::new();
34    loop {
35        let next = async { recv_task.recv().await }
36            .or(async {
37                inner.next().await;
38                futures_lite::future::pending().await
39            })
40            .await;
41        if let Ok(next) = next {
42            inner.push(next)
43        } else {
44            return;
45        }
46    }
47}