1use std::{future::Future, ops::{Deref, DerefMut}, pin::Pin};
2
3use futures::{stream::FusedStream, Stream, StreamExt};
4
5#[derive(Debug, Clone)]
11pub struct Spawner<'env, T = ()>(
12 futures::channel::mpsc::UnboundedSender<Pin<Box<dyn Future<Output = T> + 'env>>>,
13);
14
15impl<'env, T> Spawner<'env, T> {
16 pub fn spawn(
17 &mut self,
18 future: impl Future<Output = T> + 'env,
19 ) -> Result<(), futures::channel::mpsc::SendError> {
20 self.0.start_send(Box::pin(future))
21 }
22}
23
24#[derive(Debug)]
28pub struct Anchor<'env, T = ()> {
29 receiver: futures::channel::mpsc::UnboundedReceiver<
30 Pin<Box<dyn Future<Output = T> + 'env>>,
31 >,
32 pub spawner: Spawner<'env, T>,
33}
34
35impl<'env, T> Anchor<'env, T> {
36 pub fn new() -> Self {
37 let (sender, receiver) = futures::channel::mpsc::unbounded();
38 Anchor {
39 receiver,
40 spawner: Spawner(sender),
41 }
42 }
43
44 pub fn stream(self) -> Pool<'env, T> {
45 Pool {
46 receiver: self.receiver,
47 tasks: futures::stream::FuturesUnordered::new(),
48 }
49 }
50}
51
52impl<'env, T> Deref for Anchor<'env, T> {
53 type Target = Spawner<'env, T>;
54
55 fn deref(&self) -> &Self::Target {
56 &self.spawner
57 }
58}
59
60impl<'env, T> DerefMut for Anchor<'env, T> {
61 fn deref_mut(&mut self) -> &mut Self::Target {
62 &mut self.spawner
63 }
64}
65
66#[derive(Debug)]
72#[must_use = "streams do nothing unless polled"]
73pub struct Pool<'env, T = ()> {
74 receiver: futures::channel::mpsc::UnboundedReceiver<
75 Pin<Box<dyn Future<Output = T> + 'env>>,
76 >,
77 tasks:
78 futures::stream::FuturesUnordered<Pin<Box<dyn Future<Output = T> + 'env>>>,
79}
80
81impl<'env, T> Stream for Pool<'env, T> {
82 type Item = T;
83
84 fn poll_next(
85 mut self: Pin<&mut Self>,
86 cx: &mut std::task::Context<'_>,
87 ) -> std::task::Poll<Option<Self::Item>> {
88 use std::task::Poll;
89 if self.receiver.is_terminated() {
90 return self.tasks.poll_next_unpin(cx);
92 }
93 loop {
94 match self.receiver.poll_next_unpin(cx) {
95 Poll::Ready(None) => {
96 if self.tasks.is_terminated() {
97 return Poll::Ready(None);
99 } else {
100 break;
101 }
102 }
103 Poll::Ready(Some(task)) => {
104 self.tasks.push(task);
105 continue;
106 }
107 Poll::Pending => break,
108 }
109 #[allow(unreachable_code)]
110 {
111 unreachable!()
112 }
113 }
114 if !self.tasks.is_terminated() {
116 match self.tasks.poll_next_unpin(cx) {
118 Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
119 Poll::Ready(None) => {
120 if self.receiver.is_terminated() {
121 return Poll::Ready(None);
123 }
124 }
125 Poll::Pending => (),
126 }
127 };
128 Poll::Pending
130 }
131}
132
133impl<'env, T> FusedStream for Pool<'env, T> {
134 fn is_terminated(&self) -> bool {
135 self.tasks.is_terminated() && self.receiver.is_terminated()
136 }
137}