parallel_stream/par_stream/
map.rs1use async_std::channel::{self, Receiver};
3use async_std::future::Future;
4use async_std::task;
5
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use crate::ParallelStream;
10
11pin_project_lite::pin_project! {
12 #[derive(Debug)]
14 pub struct Map<T> {
15 #[pin]
16 receiver: Receiver<T>,
17 limit: Option<usize>,
18 }
19}
20
21impl<T: Send + 'static> Map<T> {
22 pub fn new<S, F, Fut>(mut stream: S, mut f: F) -> Self
24 where
25 S: ParallelStream,
26 F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static,
27 Fut: Future<Output = T> + Send,
28 {
29 let (sender, receiver) = channel::bounded(1);
30 let limit = stream.get_limit();
31 task::spawn(async move {
32 while let Some(item) = stream.next().await {
33 let sender = sender.clone();
34 task::spawn(async move {
35 let res = f(item).await;
36 sender.send(res).await.expect("message failed to send");
37 });
38 }
39 });
40 Map { receiver, limit }
41 }
42}
43
44impl<T: Send + 'static> ParallelStream for Map<T> {
45 type Item = T;
46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47 use async_std::prelude::*;
48 let this = self.project();
49 this.receiver.poll_next(cx)
50 }
51
52 fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
53 self.limit = limit.into();
54 self
55 }
56
57 fn get_limit(&self) -> Option<usize> {
58 self.limit
59 }
60}
61
62#[async_std::test]
63async fn smoke() {
64 use async_std::prelude::*;
65 let s = async_std::stream::repeat(5usize).take(3);
66 let mut output = vec![];
67 let mut stream = crate::from_stream(s).map(|n| async move { n * 2 });
68 while let Some(n) = stream.next().await {
69 output.push(n);
70 }
71 assert_eq!(output, vec![10usize; 3]);
72}