async_borrow/
scope.rs

1use std::{future::Future, ops::{Deref, DerefMut}, pin::Pin};
2
3use futures::{stream::FusedStream, Stream, StreamExt};
4
5// MARK: Spawner
6
/// Spawning handle for an [`Anchor`].
///
/// A thin newtype around the sending half of an unbounded channel whose items are
/// boxed, pinned futures with output `T` that may borrow from `'env`. The only
/// operation it offers in this file is [`Spawner::spawn`], which pushes one such
/// future onto the channel. `Clone` is derived, so the handle can be duplicated.
///
/// *can be safely broadened as it can only spawn tasks.*
#[derive(Debug, Clone)]
pub struct Spawner</* In */ 'env, /* In */ T = ()>(
    // Sending half of the task channel; the matching receiver is held by
    // `Anchor` and later by `Pool`.
    futures::channel::mpsc::UnboundedSender</* In */ Pin<Box<dyn Future<Output = T> + 'env>>>,
);
14
15impl<'env, T> Spawner<'env, T> {
16    pub fn spawn(
17        &mut self,
18        future: impl Future<Output = T> + 'env,
19    ) -> Result<(), futures::channel::mpsc::SendError> {
20        self.0.start_send(Box::pin(future))
21    }
22}
23
24// MARK: Anchor
25
/// An anchor to handle safe scoped task "spawning".
///
/// Holds both ends of the task channel: the private `receiver` and a public
/// [`Spawner`] wrapping the sender. The anchor dereferences to its spawner, and
/// [`Anchor::stream`] converts it into a [`Pool`] that takes over the receiver.
#[derive(Debug)]
pub struct Anchor</* Mix */ 'env, /* Mix */ T = ()> {
    // Receiving half of the task channel; moved into the `Pool` by `stream`.
    receiver: futures::channel::mpsc::UnboundedReceiver<
        /* Out */ Pin<Box<dyn Future<Output = T> + 'env>>,
    >,
    /// Handle used to queue tasks on this anchor.
    pub spawner: Spawner</* In */ 'env, /* In */ T>,
}
34
35impl<'env, T> Anchor<'env, T> {
36    pub fn new() -> Self {
37        let (sender, receiver) = futures::channel::mpsc::unbounded();
38        Anchor {
39            receiver,
40            spawner: Spawner(sender),
41        }
42    }
43
44    pub fn stream(self) -> Pool<'env, T> {
45        Pool {
46            receiver: self.receiver,
47            tasks: futures::stream::FuturesUnordered::new(),
48        }
49    }
50}
51
52impl<'env, T> Deref for Anchor<'env, T> {
53    type Target = Spawner<'env, T>;
54
55    fn deref(&self) -> &Self::Target {
56        &self.spawner
57    }
58}
59
60impl<'env, T> DerefMut for Anchor<'env, T> {
61    fn deref_mut(&mut self) -> &mut Self::Target {
62        &mut self.spawner
63    }
64}
65
66// MARK: Pool
67
/// A stream that drives a collection of spawned futures and yields their outputs.
///
/// Produced by [`Anchor::stream`]. It holds the receiving half of the task channel
/// together with the set of tasks already taken off that channel; its `Stream`
/// implementation moves newly received tasks into the set and yields each task's
/// output `T` as an item.
///
/// *can safely be narrowed as no more tasks can be spawned to it.*
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pool</* Out */ 'env, /* Out */ T = ()> {
    // Source of newly spawned tasks.
    receiver: futures::channel::mpsc::UnboundedReceiver<
        /* Out */ Pin<Box<dyn Future<Output = T> + 'env>>,
    >,
    // Tasks already received and currently being driven.
    tasks:
        futures::stream::FuturesUnordered</* Out */ Pin<Box<dyn Future<Output = T> + 'env>>>,
}
80
81impl<'env, T> Stream for Pool<'env, T> {
82    type Item = T;
83
84    fn poll_next(
85        mut self: Pin<&mut Self>,
86        cx: &mut std::task::Context<'_>,
87    ) -> std::task::Poll<Option<Self::Item>> {
88        use std::task::Poll;
89        if self.receiver.is_terminated() {
90            // only the tasks now
91            return self.tasks.poll_next_unpin(cx);
92        }
93        loop {
94            match self.receiver.poll_next_unpin(cx) {
95                Poll::Ready(None) => {
96                    if self.tasks.is_terminated() {
97                        // end stream
98                        return Poll::Ready(None);
99                    } else {
100                        break;
101                    }
102                }
103                Poll::Ready(Some(task)) => {
104                    self.tasks.push(task);
105                    continue;
106                }
107                Poll::Pending => break,
108            }
109            #[allow(unreachable_code)]
110            {
111                unreachable!()
112            }
113        }
114        // we have awaited all we can from the receiver
115        if !self.tasks.is_terminated() {
116            // more tasks to run
117            match self.tasks.poll_next_unpin(cx) {
118                Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
119                Poll::Ready(None) => {
120                    if self.receiver.is_terminated() {
121                        // end stream
122                        return Poll::Ready(None);
123                    }
124                }
125                Poll::Pending => (),
126            }
127        };
128        // we have awaited all we can for the tasks
129        Poll::Pending
130    }
131}
132
133impl<'env, T> FusedStream for Pool<'env, T> {
134    fn is_terminated(&self) -> bool {
135        self.tasks.is_terminated() && self.receiver.is_terminated()
136    }
137}