1use async_channel::{Receiver, Sender};
2use async_executor::Task;
3use futures_lite::{FutureExt, StreamExt};
4use futures_util::stream::FuturesUnordered;
5
/// Owner for background tasks whose handles the caller does not want to keep.
///
/// Tasks passed to `attach` are sent over a channel to a reaper loop that is
/// spawned in `new`; that loop holds the task handles from then on.
pub struct TaskReaper<T> {
    /// Sending half of the unbounded channel used to hand tasks to the reaper loop.
    send_task: Sender<Task<T>>,
    /// Handle of the spawned reaper loop. It is never read; it is stored so the
    /// handle lives as long as this struct (dropping an `async_executor::Task`
    /// cancels it).
    _reaper: Task<()>,
}
11
12impl<T: Send + 'static> Default for TaskReaper<T> {
13 fn default() -> Self {
14 Self::new()
15 }
16}
17
18impl<T: Send + 'static> TaskReaper<T> {
19 pub fn new() -> Self {
21 let (send_task, recv_task) = async_channel::unbounded();
22 let _reaper = crate::spawn(reaper_loop(recv_task));
23 Self { send_task, _reaper }
24 }
25
26 pub fn attach(&self, task: Task<T>) {
28 let _ = self.send_task.try_send(task);
29 }
30}
31
32async fn reaper_loop<T>(recv_task: Receiver<Task<T>>) {
33 let mut inner = FuturesUnordered::new();
34 loop {
35 let next = async { recv_task.recv().await }
36 .or(async {
37 inner.next().await;
38 futures_lite::future::pending().await
39 })
40 .await;
41 if let Ok(next) = next {
42 inner.push(next)
43 } else {
44 return;
45 }
46 }
47}