async_hofs/
stream.rs

1use crate::async_util::{ready, OptionPinned};
2use core::future::Future;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use futures_core::Stream;
6use pin_project::pin_project;
7
8pub trait AsyncMapExt<T>: Sized {
9    /// Basically same as [`StreamExt::map`], but it accepts closure that returns
10    /// [`Future`] and creates new [`Stream`]
11    ///
12    /// [`Future`]: core::future::Future
13    /// [`Stream`]: futures_core::Stream
14    /// [`StreamExt::map`]: https://docs.rs/tokio-stream/0.1.9/tokio_stream/trait.StreamExt.html#method.map
15    ///
16    /// # Examples
17    ///
18    /// ```
19    /// # #[tokio::main]
20    /// # async fn main() {
21    /// use async_hofs::prelude::*;
22    /// use tokio_stream::StreamExt; // for .collect
23    ///
24    /// assert_eq!(
25    ///     tokio_stream::iter(vec![1, 2])
26    ///         .async_map(|x| async move { x + 1 })
27    ///         .collect::<Vec<_>>()
28    ///         .await,
29    ///     vec![2, 3],
30    /// );
31    /// # }
32    fn async_map<TFn, TFuture, U>(self, f: TFn) -> AsyncMap<Self, TFn, TFuture>
33    where
34        TFn: FnMut(T) -> TFuture,
35        TFuture: Future<Output = U>;
36}
37
38impl<TStream, T> AsyncMapExt<T> for TStream
39where
40    TStream: Stream<Item = T>,
41{
42    fn async_map<TFn, TFuture, U>(self, f: TFn) -> AsyncMap<Self, TFn, TFuture>
43    where
44        TFn: FnMut(T) -> TFuture,
45        TFuture: Future<Output = U>,
46    {
47        AsyncMap::new(self, f)
48    }
49}
50
51#[doc(hidden)]
52#[pin_project]
53pub struct AsyncMap<TStream, TFn, TFuture> {
54    #[pin]
55    stream: TStream,
56    #[pin]
57    mapper_future: OptionPinned<TFuture>,
58
59    mapper: TFn,
60}
61
62impl<TStream, TFn, TFuture> AsyncMap<TStream, TFn, TFuture> {
63    fn new(stream: TStream, f: TFn) -> Self {
64        Self {
65            stream,
66            mapper_future: OptionPinned::None,
67            mapper: f,
68        }
69    }
70}
71
72impl<TStream, TFn, T, U, TFuture> Stream for AsyncMap<TStream, TFn, TFuture>
73where
74    TFn: FnMut(T) -> TFuture,
75    TStream: Stream<Item = T>,
76    TFuture: Future<Output = U>,
77{
78    type Item = U;
79
80    fn poll_next(
81        self: Pin<&mut Self>,
82        cx: &mut Context<'_>,
83    ) -> Poll<Option<<Self as Stream>::Item>> {
84        let mut me = self.project();
85
86        if me.mapper_future.is_none() {
87            let item = match ready!(me.stream.poll_next(cx)) {
88                Some(item) => item,
89                None => return Poll::Ready(None),
90            };
91
92            let future = (me.mapper)(item);
93            me.mapper_future.set(OptionPinned::Some(future));
94        }
95
96        let future = me.mapper_future.as_mut().project().unwrap();
97        let output = ready!(future.poll(cx));
98
99        me.mapper_future.set(OptionPinned::None);
100
101        Poll::Ready(Some(output))
102    }
103}
104
105#[cfg(test)]
106#[tokio::test]
107async fn test() {
108    use tokio_stream::StreamExt;
109
110    assert_eq!(
111        tokio_stream::iter(vec![1, 2])
112            .async_map(|x| async move { x + 1 })
113            .collect::<Vec<_>>()
114            .await,
115        vec![2, 3],
116    );
117}