1use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9
10use crate::{from_stream, FromParallelStream, FromStream, IntoParallelStream, ParallelStream};
11
12use async_std::stream::{from_iter, FromIter};
13use std::vec;
14
15pin_project_lite::pin_project! {
16 #[derive(Debug)]
18 pub struct IntoParStream<T> {
19 #[pin]
20 stream: FromStream<FromIter<vec::IntoIter<T>>>,
21 limit: Option<usize>,
22 }
23}
24
25impl<T: Send + Sync + 'static> ParallelStream for IntoParStream<T> {
26 type Item = T;
27 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
28 let this = self.project();
29 this.stream.poll_next(cx)
30 }
31
32 fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
33 self.limit = limit.into();
34 self
35 }
36
37 fn get_limit(&self) -> Option<usize> {
38 self.limit
39 }
40}
41
42impl<T: Send + Sync + 'static> IntoParallelStream for Vec<T> {
43 type Item = T;
44 type IntoParStream = IntoParStream<T>;
45
46 #[inline]
47 fn into_par_stream(self) -> Self::IntoParStream {
48 IntoParStream {
49 stream: from_stream(from_iter(self)),
50 limit: None,
51 }
52 }
53}
54
55impl<T: Send> FromParallelStream<T> for Vec<T> {
71 fn from_par_stream<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
72 where
73 S: IntoParallelStream<Item = T> + Send + 'a,
74 {
75 Box::pin(async move {
76 let mut stream = stream.into_par_stream();
77 let mut res = Vec::with_capacity(0);
78 while let Some(item) = stream.next().await {
79 res.push(item);
80 }
81 res
82 })
83 }
84}
85
86#[async_std::test]
87async fn smoke() {
88 use crate::IntoParallelStream;
89
90 let v = vec![1, 2, 3, 4];
91 let mut stream = v.into_par_stream().map(|n| async move { n * n });
92
93 let mut out = vec![];
94 while let Some(n) = stream.next().await {
95 out.push(n);
96 }
97 out.sort_unstable();
98
99 assert_eq!(out, vec![1usize, 4, 9, 16]);
100}