tokio_par_stream/
futures_unordered.rs

1use futures::stream::{FusedStream, FuturesUnordered};
2use futures::{Stream, StreamExt};
3use std::fmt::{Debug, Formatter};
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::runtime::Handle as TokioHandle;
8use tokio::task::JoinHandle;
9
10// don't use JoinSet: https://github.com/tokio-rs/tokio/issues/5564
11#[must_use = "streams do nothing unless polled"]
12pub struct FuturesParallelUnordered<F: Future> {
13    futures: FuturesUnordered<JoinHandle<F::Output>>,
14    handle: TokioHandle,
15}
16
17impl<F: Future> Unpin for FuturesParallelUnordered<F> {}
18
19impl<F: Future> Debug for FuturesParallelUnordered<F> {
20    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
21        f.debug_struct("FuturesParallelUnordered")
22            .finish_non_exhaustive()
23    }
24}
25
26impl<Fut: Future> FuturesParallelUnordered<Fut> {
27    /// Constructs a new, empty [`FuturesParallelUnordered`].
28    ///
29    /// The returned [`FuturesParallelUnordered`] does not contain any futures.
30    /// In this state, [`FuturesParallelUnordered::poll_next`](Stream::poll_next) will
31    /// return [`Poll::Ready(None)`](Poll::Ready).
32    ///
33    /// must be called within a tokio runtime
34    pub fn new() -> Self {
35        Self {
36            futures: FuturesUnordered::new(),
37            handle: TokioHandle::current(),
38        }
39    }
40    /// Returns the number of futures contained in the set.
41    ///
42    /// This represents the total number of in-flight futures.
43    pub fn len(&self) -> usize {
44        self.futures.len()
45    }
46
47    /// Returns `true` if the set contains no futures.
48    pub fn is_empty(&self) -> bool {
49        self.futures.is_empty()
50    }
51}
52
53impl<Fut: Future + Send + 'static> FuturesParallelUnordered<Fut>
54where
55    Fut::Output: Send,
56{
57    /// Push a future into the set.
58    ///
59    /// This method adds the given future to the set. This method may
60    /// call [`poll`](Future::poll) on the submitted future.
61    /// and will spawn in it on the current executor
62    ///
63    /// tasks will be scheduled on the same runtime that the [`FuturesParallelUnordered`]
64    /// was originally made in
65    pub fn push(&self, future: Fut) {
66        self.add_join_handle(self.handle().spawn(future));
67    }
68
69    /// Returns a handle to this stream's spawner.
70    pub const fn handle(&self) -> &TokioHandle {
71        &self.handle
72    }
73
74    /// adds any JoinHandle to the executor
75    pub fn add_join_handle(&self, jh: JoinHandle<Fut::Output>) {
76        self.futures.push(jh)
77    }
78}
79
80impl<Fut: Future> Default for FuturesParallelUnordered<Fut> {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl<Fut: Future> Stream for FuturesParallelUnordered<Fut>
87where
88    Fut::Output: 'static,
89{
90    type Item = Fut::Output;
91
92    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93        match self.futures.poll_next_unpin(cx) {
94            Poll::Ready(x) => Poll::Ready(x.map(|res| {
95                res.unwrap_or_else(|join_err| {
96                    match join_err.try_into_panic() {
97                        // we forward the panic onto the other thread
98                        Ok(panic) => std::panic::resume_unwind(panic),
99                        // since we never give out the abort handle so its unexpected that a task is canceled
100                        Err(other) => panic!("could not get the next future due to {other}"),
101                    }
102                })
103            })),
104            Poll::Pending => Poll::Pending,
105        }
106    }
107
108    fn size_hint(&self) -> (usize, Option<usize>) {
109        self.futures.size_hint()
110    }
111}
112
113impl<Fut: Future> FusedStream for FuturesParallelUnordered<Fut>
114where
115    Fut::Output: 'static,
116{
117    fn is_terminated(&self) -> bool {
118        self.futures.is_terminated()
119    }
120}
121
122impl<F: Future> Drop for FuturesParallelUnordered<F> {
123    fn drop(&mut self) {
124        for orphan in self.futures.iter_mut() {
125            orphan.abort()
126        }
127    }
128}
129
130impl<Fut: Future + Send + 'static> Extend<Fut> for FuturesParallelUnordered<Fut>
131where
132    Fut::Output: Send,
133{
134    fn extend<I>(&mut self, iter: I)
135    where
136        I: IntoIterator<Item = Fut>,
137    {
138        for item in iter {
139            self.push(item);
140        }
141    }
142}
143
144impl<Fut: Future + Send + 'static> FromIterator<Fut> for FuturesParallelUnordered<Fut>
145where
146    Fut::Output: Send,
147{
148    fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
149        let mut ret = FuturesParallelUnordered::new();
150        ret.extend(iter);
151        ret
152    }
153}