mofo/
lib.rs

1use std::{cell::RefCell, pin::Pin, rc::Rc, task::Waker};
2
3use futures::{select, stream::FuturesUnordered, Future, FutureExt, StreamExt};
4use tracing::trace;
5
6type DynFuture = Pin<Box<dyn Future<Output = ()>>>;
7
8struct MofoInner {
9    futures: Option<FuturesUnordered<DynFuture>>,
10    new_futures: Vec<DynFuture>,
11    waker: Option<Waker>,
12}
13
14#[derive(Clone)]
15pub struct Mofo(Rc<RefCell<MofoInner>>);
16
17impl Mofo {
18    pub fn new() -> Mofo {
19        Mofo(Rc::new(RefCell::new(MofoInner {
20            futures: Some(FuturesUnordered::new()),
21            new_futures: Vec::new(),
22            waker: None,
23        })))
24    }
25
26    pub fn add_background_task(&self, fut: DynFuture) {
27        trace!("Adding background task");
28        let mut self_ref = self.0.borrow_mut();
29        if let Some(futures) = &mut self_ref.futures {
30            futures.push(fut);
31        } else {
32            // We're currently polling existing futures
33            self_ref.new_futures.push(fut);
34        }
35        if let Some(waker) = &self_ref.waker {
36            waker.wake_by_ref();
37        } else {
38            trace!("No waker to use yet");
39        }
40    }
41
42    pub async fn run_until<O, F: Future<Output = O>>(self, foreground: F) -> O {
43        select! {
44            result = foreground.fuse() => result,
45            _ = self.boxed_local().fuse() => panic!("Background tasks finished first"),
46        }
47    }
48}
49
50impl Default for Mofo {
51    fn default() -> Self {
52        Self::new()
53    }
54}
55
56impl Future for Mofo {
57    type Output = ();
58
59    fn poll(
60        self: Pin<&mut Self>,
61        cx: &mut std::task::Context<'_>,
62    ) -> std::task::Poll<Self::Output> {
63        let mut current_futures = {
64            let mut self_ref = self.0.borrow_mut();
65            if self_ref.waker.is_none() {
66                self_ref.waker = Some(cx.waker().clone());
67            }
68
69            // pull futures out for polling, so we don't borrow self while polling, so futures can add more tasks
70            std::mem::take(&mut self_ref.futures).expect("Already polling")
71        };
72
73        trace!(n_tasks = current_futures.len(), "Polling background tasks");
74
75        let res = current_futures.poll_next_unpin(cx);
76        if let std::task::Poll::Ready(Some(_)) = res {
77            // we made progress, so try again
78            trace!("Polling background tasks finished, manually re-waking");
79            cx.waker().wake_by_ref();
80        }
81
82        let mut self_ref = self.0.borrow_mut();
83        trace!(
84            n_new_tasks = self_ref.new_futures.len(),
85            "Polling background tasks done"
86        );
87        current_futures.extend(self_ref.new_futures.drain(..));
88        self_ref.futures = Some(current_futures);
89        std::task::Poll::Pending
90    }
91}