tokio_par_stream/
futures_unordered.rs1use futures::stream::{FusedStream, FuturesUnordered};
2use futures::{Stream, StreamExt};
3use std::fmt::{Debug, Formatter};
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::runtime::Handle as TokioHandle;
8use tokio::task::JoinHandle;
9
10#[must_use = "streams do nothing unless polled"]
12pub struct FuturesParallelUnordered<F: Future> {
13 futures: FuturesUnordered<JoinHandle<F::Output>>,
14 handle: TokioHandle,
15}
16
17impl<F: Future> Unpin for FuturesParallelUnordered<F> {}
18
19impl<F: Future> Debug for FuturesParallelUnordered<F> {
20 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
21 f.debug_struct("FuturesParallelUnordered")
22 .finish_non_exhaustive()
23 }
24}
25
26impl<Fut: Future> FuturesParallelUnordered<Fut> {
27 pub fn new() -> Self {
35 Self {
36 futures: FuturesUnordered::new(),
37 handle: TokioHandle::current(),
38 }
39 }
40 pub fn len(&self) -> usize {
44 self.futures.len()
45 }
46
47 pub fn is_empty(&self) -> bool {
49 self.futures.is_empty()
50 }
51}
52
53impl<Fut: Future + Send + 'static> FuturesParallelUnordered<Fut>
54where
55 Fut::Output: Send,
56{
57 pub fn push(&self, future: Fut) {
66 self.add_join_handle(self.handle().spawn(future));
67 }
68
69 pub const fn handle(&self) -> &TokioHandle {
71 &self.handle
72 }
73
74 pub fn add_join_handle(&self, jh: JoinHandle<Fut::Output>) {
76 self.futures.push(jh)
77 }
78}
79
80impl<Fut: Future> Default for FuturesParallelUnordered<Fut> {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl<Fut: Future> Stream for FuturesParallelUnordered<Fut>
87where
88 Fut::Output: 'static,
89{
90 type Item = Fut::Output;
91
92 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93 match self.futures.poll_next_unpin(cx) {
94 Poll::Ready(x) => Poll::Ready(x.map(|res| {
95 res.unwrap_or_else(|join_err| {
96 match join_err.try_into_panic() {
97 Ok(panic) => std::panic::resume_unwind(panic),
99 Err(other) => panic!("could not get the next future due to {other}"),
101 }
102 })
103 })),
104 Poll::Pending => Poll::Pending,
105 }
106 }
107
108 fn size_hint(&self) -> (usize, Option<usize>) {
109 self.futures.size_hint()
110 }
111}
112
113impl<Fut: Future> FusedStream for FuturesParallelUnordered<Fut>
114where
115 Fut::Output: 'static,
116{
117 fn is_terminated(&self) -> bool {
118 self.futures.is_terminated()
119 }
120}
121
122impl<F: Future> Drop for FuturesParallelUnordered<F> {
123 fn drop(&mut self) {
124 for orphan in self.futures.iter_mut() {
125 orphan.abort()
126 }
127 }
128}
129
130impl<Fut: Future + Send + 'static> Extend<Fut> for FuturesParallelUnordered<Fut>
131where
132 Fut::Output: Send,
133{
134 fn extend<I>(&mut self, iter: I)
135 where
136 I: IntoIterator<Item = Fut>,
137 {
138 for item in iter {
139 self.push(item);
140 }
141 }
142}
143
144impl<Fut: Future + Send + 'static> FromIterator<Fut> for FuturesParallelUnordered<Fut>
145where
146 Fut::Output: Send,
147{
148 fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
149 let mut ret = FuturesParallelUnordered::new();
150 ret.extend(iter);
151 ret
152 }
153}