1use std::{cell::RefCell, pin::Pin, rc::Rc, task::Waker};
2
3use futures::{select, stream::FuturesUnordered, Future, FutureExt, StreamExt};
4use tracing::trace;
5
/// A pinned, heap-allocated, type-erased future that resolves to `()`.
///
/// The trait object carries no `Send` bound, so a `DynFuture` is not required
/// to be sendable across threads. The `'static` bound is what `Box<dyn _>`
/// implies by default; it is spelled out here for readers.
type DynFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
7
8struct MofoInner {
9 futures: Option<FuturesUnordered<DynFuture>>,
10 new_futures: Vec<DynFuture>,
11 waker: Option<Waker>,
12}
13
14#[derive(Clone)]
15pub struct Mofo(Rc<RefCell<MofoInner>>);
16
17impl Mofo {
18 pub fn new() -> Mofo {
19 Mofo(Rc::new(RefCell::new(MofoInner {
20 futures: Some(FuturesUnordered::new()),
21 new_futures: Vec::new(),
22 waker: None,
23 })))
24 }
25
26 pub fn add_background_task(&self, fut: DynFuture) {
27 trace!("Adding background task");
28 let mut self_ref = self.0.borrow_mut();
29 if let Some(futures) = &mut self_ref.futures {
30 futures.push(fut);
31 } else {
32 self_ref.new_futures.push(fut);
34 }
35 if let Some(waker) = &self_ref.waker {
36 waker.wake_by_ref();
37 } else {
38 trace!("No waker to use yet");
39 }
40 }
41
42 pub async fn run_until<O, F: Future<Output = O>>(self, foreground: F) -> O {
43 select! {
44 result = foreground.fuse() => result,
45 _ = self.boxed_local().fuse() => panic!("Background tasks finished first"),
46 }
47 }
48}
49
50impl Default for Mofo {
51 fn default() -> Self {
52 Self::new()
53 }
54}
55
56impl Future for Mofo {
57 type Output = ();
58
59 fn poll(
60 self: Pin<&mut Self>,
61 cx: &mut std::task::Context<'_>,
62 ) -> std::task::Poll<Self::Output> {
63 let mut current_futures = {
64 let mut self_ref = self.0.borrow_mut();
65 if self_ref.waker.is_none() {
66 self_ref.waker = Some(cx.waker().clone());
67 }
68
69 std::mem::take(&mut self_ref.futures).expect("Already polling")
71 };
72
73 trace!(n_tasks = current_futures.len(), "Polling background tasks");
74
75 let res = current_futures.poll_next_unpin(cx);
76 if let std::task::Poll::Ready(Some(_)) = res {
77 trace!("Polling background tasks finished, manually re-waking");
79 cx.waker().wake_by_ref();
80 }
81
82 let mut self_ref = self.0.borrow_mut();
83 trace!(
84 n_new_tasks = self_ref.new_futures.len(),
85 "Polling background tasks done"
86 );
87 current_futures.extend(self_ref.new_futures.drain(..));
88 self_ref.futures = Some(current_futures);
89 std::task::Poll::Pending
90 }
91}