async_hofs/
iter.rs

1use crate::async_util::{ready, OptionPinned};
2use core::future::Future;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use futures_core::Stream;
6use pin_project::pin_project;
7
8pub trait AsyncMapExt<T>: Sized {
9    /// Basically same as [`Iterator::map`], but it accepts closure that returns
10    /// [`Future`] and creates new [`Stream`] instead of [`Iterator`].
11    ///
12    /// [`Iterator`]: core::iter::Iterator
13    /// [`Iterator::map`]: core::iter::Iterator::map
14    /// [`Future`]: core::future::Future
15    /// [`Stream`]: futures_core::Stream
16    ///
17    /// # Examples
18    ///
19    /// ```
20    /// # #[tokio::main]
21    /// # async fn main() {
22    /// use async_hofs::prelude::*;
23    /// use tokio_stream::StreamExt; // for .collect
24    ///
25    /// assert_eq!(
26    ///     vec![1, 2]
27    ///         .into_iter()
28    ///         .async_map(|x| async move { x + 1 })
29    ///         .collect::<Vec<_>>()
30    ///         .await,
31    ///     vec![2, 3],
32    /// );
33    /// # }
34    /// ```
35    fn async_map<TFn, TFuture, U>(self, f: TFn) -> AsyncMap<Self, TFn, TFuture>
36    where
37        TFn: FnMut(T) -> TFuture,
38        TFuture: Future<Output = U>;
39}
40
41impl<TIter, T> AsyncMapExt<T> for TIter
42where
43    TIter: Iterator<Item = T>,
44{
45    fn async_map<TFn, TFuture, U>(self, f: TFn) -> AsyncMap<Self, TFn, TFuture>
46    where
47        TFn: FnMut(T) -> TFuture,
48        TFuture: Future<Output = U>,
49    {
50        AsyncMap::new(self, f)
51    }
52}
53
54#[doc(hidden)]
55#[pin_project]
56pub struct AsyncMap<TIter, TFn, TFuture> {
57    #[pin]
58    mapper_future: OptionPinned<TFuture>,
59    mapper: TFn,
60    iter: TIter,
61}
62
63impl<TIter, TFn, TFuture> AsyncMap<TIter, TFn, TFuture> {
64    fn new(iter: TIter, f: TFn) -> Self {
65        Self {
66            mapper_future: OptionPinned::None,
67            mapper: f,
68            iter,
69        }
70    }
71}
72
73impl<TIter, TFn, T, U, TFuture> Stream for AsyncMap<TIter, TFn, TFuture>
74where
75    TFn: FnMut(T) -> TFuture,
76    TIter: Iterator<Item = T>,
77    TFuture: Future<Output = U>,
78{
79    type Item = U;
80
81    fn poll_next(
82        self: Pin<&mut Self>,
83        cx: &mut Context<'_>,
84    ) -> Poll<Option<<Self as Stream>::Item>> {
85        let mut me = self.project();
86
87        if me.mapper_future.is_none() {
88            let item = match me.iter.next() {
89                Some(x) => x,
90                None => return Poll::Ready(None),
91            };
92
93            let future = (me.mapper)(item);
94            me.mapper_future.set(OptionPinned::Some(future));
95        }
96
97        let future = me.mapper_future.as_mut().project().unwrap();
98        let output = ready!(future.poll(cx));
99
100        me.mapper_future.set(OptionPinned::None);
101
102        Poll::Ready(Some(output))
103    }
104}
105
/// Checks that `async_map` applies the async closure to each item in order.
#[cfg(test)]
#[tokio::test]
async fn test() {
    use tokio_stream::StreamExt;

    let incremented = vec![1, 2]
        .into_iter()
        .async_map(|x| async move { x + 1 })
        .collect::<Vec<_>>()
        .await;

    assert_eq!(incremented, vec![2, 3]);
}