parallel_stream/par_stream/
map.rs

1// use async_std::prelude::*;
2use async_std::channel::{self, Receiver};
3use async_std::future::Future;
4use async_std::task;
5
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use crate::ParallelStream;
10
pin_project_lite::pin_project! {
    /// A parallel stream that maps the values of another stream with a function.
    ///
    /// Mapped values are read from an internal channel; the tasks that
    /// produce them are spawned by `Map::new`.
    #[derive(Debug)]
    pub struct Map<T> {
        // Receiving half of the channel that the spawned tasks send mapped
        // values into. Pinned so it can be polled through the projection.
        #[pin]
        receiver: Receiver<T>,
        // Value reported by `get_limit`; copied from the source stream in
        // `new` and replaceable through `limit`.
        limit: Option<usize>,
    }
}
20
21impl<T: Send + 'static> Map<T> {
22    /// Create a new instance of `Map`.
23    pub fn new<S, F, Fut>(mut stream: S, mut f: F) -> Self
24    where
25        S: ParallelStream,
26        F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static,
27        Fut: Future<Output = T> + Send,
28    {
29        let (sender, receiver) = channel::bounded(1);
30        let limit = stream.get_limit();
31        task::spawn(async move {
32            while let Some(item) = stream.next().await {
33                let sender = sender.clone();
34                task::spawn(async move {
35                    let res = f(item).await;
36                    sender.send(res).await.expect("message failed to send");
37                });
38            }
39        });
40        Map { receiver, limit }
41    }
42}
43
44impl<T: Send + 'static> ParallelStream for Map<T> {
45    type Item = T;
46    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47        use async_std::prelude::*;
48        let this = self.project();
49        this.receiver.poll_next(cx)
50    }
51
52    fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
53        self.limit = limit.into();
54        self
55    }
56
57    fn get_limit(&self) -> Option<usize> {
58        self.limit
59    }
60}
61
62#[async_std::test]
63async fn smoke() {
64    use async_std::prelude::*;
65    let s = async_std::stream::repeat(5usize).take(3);
66    let mut output = vec![];
67    let mut stream = crate::from_stream(s).map(|n| async move { n * 2 });
68    while let Some(n) = stream.next().await {
69        output.push(n);
70    }
71    assert_eq!(output, vec![10usize; 3]);
72}