futures_concurrency/concurrent_stream/
from_concurrent_stream.rs

1use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream};
2#[cfg(all(feature = "alloc", not(feature = "std")))]
3use alloc::vec::Vec;
4use core::future::Future;
5use core::pin::Pin;
6use futures_buffered::FuturesUnordered;
7use futures_lite::StreamExt;
8use pin_project::pin_project;
9
10/// Conversion from a [`ConcurrentStream`]
11#[allow(async_fn_in_trait)]
12pub trait FromConcurrentStream<A>: Sized {
13    /// Creates a value from a concurrent iterator.
14    async fn from_concurrent_stream<T>(iter: T) -> Self
15    where
16        T: IntoConcurrentStream<Item = A>;
17}
18
19impl<T> FromConcurrentStream<T> for Vec<T> {
20    async fn from_concurrent_stream<S>(iter: S) -> Self
21    where
22        S: IntoConcurrentStream<Item = T>,
23    {
24        let stream = iter.into_co_stream();
25        let mut output = Vec::with_capacity(stream.size_hint().1.unwrap_or_default());
26        stream.drive(VecConsumer::new(&mut output)).await;
27        output
28    }
29}
30
31impl<T, E> FromConcurrentStream<Result<T, E>> for Result<Vec<T>, E> {
32    async fn from_concurrent_stream<S>(iter: S) -> Self
33    where
34        S: IntoConcurrentStream<Item = Result<T, E>>,
35    {
36        let stream = iter.into_co_stream();
37        let mut output = Ok(Vec::with_capacity(stream.size_hint().1.unwrap_or_default()));
38        stream.drive(ResultVecConsumer::new(&mut output)).await;
39        output
40    }
41}
42
43// TODO: replace this with a generalized `fold` operation
44#[pin_project]
45pub(crate) struct VecConsumer<'a, Fut: Future> {
46    #[pin]
47    group: FuturesUnordered<Fut>,
48    output: &'a mut Vec<Fut::Output>,
49}
50
51impl<'a, Fut: Future> VecConsumer<'a, Fut> {
52    pub(crate) fn new(output: &'a mut Vec<Fut::Output>) -> Self {
53        Self {
54            group: FuturesUnordered::new(),
55            output,
56        }
57    }
58}
59
60impl<Item, Fut> Consumer<Item, Fut> for VecConsumer<'_, Fut>
61where
62    Fut: Future<Output = Item>,
63{
64    type Output = ();
65
66    async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
67        let mut this = self.project();
68        // unbounded concurrency, so we just goooo
69        this.group.as_mut().push(future);
70        ConsumerState::Continue
71    }
72
73    async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
74        let mut this = self.project();
75        while let Some(item) = this.group.next().await {
76            this.output.push(item);
77        }
78        ConsumerState::Empty
79    }
80    async fn flush(self: Pin<&mut Self>) -> Self::Output {
81        let mut this = self.project();
82        while let Some(item) = this.group.next().await {
83            this.output.push(item);
84        }
85    }
86}
87
88#[pin_project]
89pub(crate) struct ResultVecConsumer<'a, Fut: Future, T, E> {
90    #[pin]
91    group: FuturesUnordered<Fut>,
92    output: &'a mut Result<Vec<T>, E>,
93}
94
95impl<'a, Fut: Future, T, E> ResultVecConsumer<'a, Fut, T, E> {
96    pub(crate) fn new(output: &'a mut Result<Vec<T>, E>) -> Self {
97        Self {
98            group: FuturesUnordered::new(),
99            output,
100        }
101    }
102}
103
104impl<Fut, T, E> Consumer<Result<T, E>, Fut> for ResultVecConsumer<'_, Fut, T, E>
105where
106    Fut: Future<Output = Result<T, E>>,
107{
108    type Output = ();
109
110    async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
111        let mut this = self.project();
112        // unbounded concurrency, so we just goooo
113        this.group.as_mut().push(future);
114        ConsumerState::Continue
115    }
116
117    async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
118        let mut this = self.project();
119        let Ok(items) = this.output else {
120            return ConsumerState::Break;
121        };
122
123        while let Some(item) = this.group.next().await {
124            match item {
125                Ok(item) => {
126                    items.push(item);
127                }
128                Err(e) => {
129                    **this.output = Err(e);
130                    return ConsumerState::Break;
131                }
132            }
133        }
134        ConsumerState::Empty
135    }
136
137    async fn flush(self: Pin<&mut Self>) -> Self::Output {
138        self.progress().await;
139    }
140}
141
142#[cfg(test)]
143mod test {
144    use crate::prelude::*;
145    use futures_lite::stream;
146
147    #[test]
148    fn collect() {
149        futures_lite::future::block_on(async {
150            let v: Vec<_> = stream::repeat(1).co().take(5).collect().await;
151            assert_eq!(v, &[1, 1, 1, 1, 1]);
152        });
153    }
154
155    #[test]
156    fn collect_to_result_ok() {
157        futures_lite::future::block_on(async {
158            let v: Result<Vec<_>, ()> = stream::repeat(Ok(1)).co().take(5).collect().await;
159            assert_eq!(v, Ok(vec![1, 1, 1, 1, 1]));
160        });
161    }
162
163    #[test]
164    fn collect_to_result_err() {
165        futures_lite::future::block_on(async {
166            let v: Result<Vec<_>, _> = stream::repeat(Err::<u8, _>(()))
167                .co()
168                .take(5)
169                .collect()
170                .await;
171            assert_eq!(v, Err(()));
172        });
173    }
174}