parallel_stream/
vec.rs

1//! Parallel types for `Vec`.
2//!
3//! You will rarely need to interact with this module directly unless you need to
4//! name one of the stream types.
5
6use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9
10use crate::{from_stream, FromParallelStream, FromStream, IntoParallelStream, ParallelStream};
11
12use async_std::stream::{from_iter, FromIter};
13use std::vec;
14
15pin_project_lite::pin_project! {
16    /// Parallel stream that moves out of a vector.
17    #[derive(Debug)]
18    pub struct IntoParStream<T> {
19        #[pin]
20        stream: FromStream<FromIter<vec::IntoIter<T>>>,
21        limit: Option<usize>,
22    }
23}
24
25impl<T: Send + Sync + 'static> ParallelStream for IntoParStream<T> {
26    type Item = T;
27    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
28        let this = self.project();
29        this.stream.poll_next(cx)
30    }
31
32    fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
33        self.limit = limit.into();
34        self
35    }
36
37    fn get_limit(&self) -> Option<usize> {
38        self.limit
39    }
40}
41
42impl<T: Send + Sync + 'static> IntoParallelStream for Vec<T> {
43    type Item = T;
44    type IntoParStream = IntoParStream<T>;
45
46    #[inline]
47    fn into_par_stream(self) -> Self::IntoParStream {
48        IntoParStream {
49            stream: from_stream(from_iter(self)),
50            limit: None,
51        }
52    }
53}
54
55/// Collect items from a parallel stream into a vector.
56///
57/// # Examples
58/// ```
59/// use parallel_stream::prelude::*;
60///
61/// #[async_std::main]
62/// async fn main() {
63///     let v = vec![1, 2, 3, 4];
64///     let mut stream = v.into_par_stream().map(|n| async move { n * n });
65///     let mut res = Vec::from_par_stream(stream).await;
66///     res.sort();
67///     assert_eq!(res, vec![1, 4, 9, 16]);
68/// }
69/// ```
70impl<T: Send> FromParallelStream<T> for Vec<T> {
71    fn from_par_stream<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
72    where
73        S: IntoParallelStream<Item = T> + Send + 'a,
74    {
75        Box::pin(async move {
76            let mut stream = stream.into_par_stream();
77            let mut res = Vec::with_capacity(0);
78            while let Some(item) = stream.next().await {
79                res.push(item);
80            }
81            res
82        })
83    }
84}
85
86#[async_std::test]
87async fn smoke() {
88    use crate::IntoParallelStream;
89
90    let v = vec![1, 2, 3, 4];
91    let mut stream = v.into_par_stream().map(|n| async move { n * n });
92
93    let mut out = vec![];
94    while let Some(n) = stream.next().await {
95        out.push(n);
96    }
97    out.sort_unstable();
98
99    assert_eq!(out, vec![1usize, 4, 9, 16]);
100}