1use crate::async_util::{ready, OptionPinned};
2use core::future::Future;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use futures_core::Stream;
6use pin_project::pin_project;
7
8pub trait AsyncMapExt<T>: Sized {
9 fn async_map<TFn, TFuture, U>(self, f: TFn) -> AsyncMap<Self, TFn, TFuture>
36 where
37 TFn: FnMut(T) -> TFuture,
38 TFuture: Future<Output = U>;
39}
40
41impl<TIter, T> AsyncMapExt<T> for TIter
42where
43 TIter: Iterator<Item = T>,
44{
45 fn async_map<TFn, TFuture, U>(self, f: TFn) -> AsyncMap<Self, TFn, TFuture>
46 where
47 TFn: FnMut(T) -> TFuture,
48 TFuture: Future<Output = U>,
49 {
50 AsyncMap::new(self, f)
51 }
52}
53
54#[doc(hidden)]
55#[pin_project]
56pub struct AsyncMap<TIter, TFn, TFuture> {
57 #[pin]
58 mapper_future: OptionPinned<TFuture>,
59 mapper: TFn,
60 iter: TIter,
61}
62
63impl<TIter, TFn, TFuture> AsyncMap<TIter, TFn, TFuture> {
64 fn new(iter: TIter, f: TFn) -> Self {
65 Self {
66 mapper_future: OptionPinned::None,
67 mapper: f,
68 iter,
69 }
70 }
71}
72
73impl<TIter, TFn, T, U, TFuture> Stream for AsyncMap<TIter, TFn, TFuture>
74where
75 TFn: FnMut(T) -> TFuture,
76 TIter: Iterator<Item = T>,
77 TFuture: Future<Output = U>,
78{
79 type Item = U;
80
81 fn poll_next(
82 self: Pin<&mut Self>,
83 cx: &mut Context<'_>,
84 ) -> Poll<Option<<Self as Stream>::Item>> {
85 let mut me = self.project();
86
87 if me.mapper_future.is_none() {
88 let item = match me.iter.next() {
89 Some(x) => x,
90 None => return Poll::Ready(None),
91 };
92
93 let future = (me.mapper)(item);
94 me.mapper_future.set(OptionPinned::Some(future));
95 }
96
97 let future = me.mapper_future.as_mut().project().unwrap();
98 let output = ready!(future.poll(cx));
99
100 me.mapper_future.set(OptionPinned::None);
101
102 Poll::Ready(Some(output))
103 }
104}
105
/// Checks that `async_map` yields the mapped values in iterator order.
#[cfg(test)]
#[tokio::test]
async fn test() {
    use tokio_stream::StreamExt;

    let source = vec![1, 2];

    let mapped = source
        .into_iter()
        .async_map(|x| async move { x + 1 })
        .collect::<Vec<_>>()
        .await;

    assert_eq!(mapped, vec![2, 3]);
}