futures_concurrency/concurrent_stream/
from_concurrent_stream.rs1use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream};
2#[cfg(all(feature = "alloc", not(feature = "std")))]
3use alloc::vec::Vec;
4use core::future::Future;
5use core::pin::Pin;
6use futures_buffered::FuturesUnordered;
7use futures_lite::StreamExt;
8use pin_project::pin_project;
9
10#[allow(async_fn_in_trait)]
12pub trait FromConcurrentStream<A>: Sized {
13 async fn from_concurrent_stream<T>(iter: T) -> Self
15 where
16 T: IntoConcurrentStream<Item = A>;
17}
18
19impl<T> FromConcurrentStream<T> for Vec<T> {
20 async fn from_concurrent_stream<S>(iter: S) -> Self
21 where
22 S: IntoConcurrentStream<Item = T>,
23 {
24 let stream = iter.into_co_stream();
25 let mut output = Vec::with_capacity(stream.size_hint().1.unwrap_or_default());
26 stream.drive(VecConsumer::new(&mut output)).await;
27 output
28 }
29}
30
31impl<T, E> FromConcurrentStream<Result<T, E>> for Result<Vec<T>, E> {
32 async fn from_concurrent_stream<S>(iter: S) -> Self
33 where
34 S: IntoConcurrentStream<Item = Result<T, E>>,
35 {
36 let stream = iter.into_co_stream();
37 let mut output = Ok(Vec::with_capacity(stream.size_hint().1.unwrap_or_default()));
38 stream.drive(ResultVecConsumer::new(&mut output)).await;
39 output
40 }
41}
42
43#[pin_project]
45pub(crate) struct VecConsumer<'a, Fut: Future> {
46 #[pin]
47 group: FuturesUnordered<Fut>,
48 output: &'a mut Vec<Fut::Output>,
49}
50
51impl<'a, Fut: Future> VecConsumer<'a, Fut> {
52 pub(crate) fn new(output: &'a mut Vec<Fut::Output>) -> Self {
53 Self {
54 group: FuturesUnordered::new(),
55 output,
56 }
57 }
58}
59
60impl<Item, Fut> Consumer<Item, Fut> for VecConsumer<'_, Fut>
61where
62 Fut: Future<Output = Item>,
63{
64 type Output = ();
65
66 async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
67 let mut this = self.project();
68 this.group.as_mut().push(future);
70 ConsumerState::Continue
71 }
72
73 async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
74 let mut this = self.project();
75 while let Some(item) = this.group.next().await {
76 this.output.push(item);
77 }
78 ConsumerState::Empty
79 }
80 async fn flush(self: Pin<&mut Self>) -> Self::Output {
81 let mut this = self.project();
82 while let Some(item) = this.group.next().await {
83 this.output.push(item);
84 }
85 }
86}
87
88#[pin_project]
89pub(crate) struct ResultVecConsumer<'a, Fut: Future, T, E> {
90 #[pin]
91 group: FuturesUnordered<Fut>,
92 output: &'a mut Result<Vec<T>, E>,
93}
94
95impl<'a, Fut: Future, T, E> ResultVecConsumer<'a, Fut, T, E> {
96 pub(crate) fn new(output: &'a mut Result<Vec<T>, E>) -> Self {
97 Self {
98 group: FuturesUnordered::new(),
99 output,
100 }
101 }
102}
103
104impl<Fut, T, E> Consumer<Result<T, E>, Fut> for ResultVecConsumer<'_, Fut, T, E>
105where
106 Fut: Future<Output = Result<T, E>>,
107{
108 type Output = ();
109
110 async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
111 let mut this = self.project();
112 this.group.as_mut().push(future);
114 ConsumerState::Continue
115 }
116
117 async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
118 let mut this = self.project();
119 let Ok(items) = this.output else {
120 return ConsumerState::Break;
121 };
122
123 while let Some(item) = this.group.next().await {
124 match item {
125 Ok(item) => {
126 items.push(item);
127 }
128 Err(e) => {
129 **this.output = Err(e);
130 return ConsumerState::Break;
131 }
132 }
133 }
134 ConsumerState::Empty
135 }
136
137 async fn flush(self: Pin<&mut Self>) -> Self::Output {
138 self.progress().await;
139 }
140}
141
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use futures_lite::future::block_on;
    use futures_lite::stream;

    /// Plain items are gathered into a `Vec`.
    #[test]
    fn collect() {
        block_on(async {
            let collected: Vec<_> = stream::repeat(1).co().take(5).collect().await;
            assert_eq!(collected, vec![1; 5]);
        });
    }

    /// A stream of `Ok` items collects into `Ok(Vec)`.
    #[test]
    fn collect_to_result_ok() {
        block_on(async {
            let outcome: Result<Vec<_>, ()> =
                stream::repeat(Ok(1)).co().take(5).collect().await;
            assert_eq!(outcome, Ok(vec![1; 5]));
        });
    }

    /// A stream of `Err` items collects into `Err`.
    #[test]
    fn collect_to_result_err() {
        block_on(async {
            let source = stream::repeat(Err::<u8, _>(())).co().take(5);
            let outcome: Result<Vec<_>, _> = source.collect().await;
            assert_eq!(outcome, Err(()));
        });
    }
}