1use crate::async_util::{ready, OptionPinned};
2use core::future::Future;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use futures_core::Stream;
6use pin_project::pin_project;
7
/// Extension trait that adds [`async_map`](AsyncMapExt::async_map) to streams.
///
/// A blanket implementation in this module provides it for every
/// [`Stream`] whose item type is `T`.
pub trait AsyncMapExt<T>: Sized {
    /// Maps every item of the stream through the asynchronous function `f`.
    ///
    /// `f` is called once per item, and the future it returns is driven to
    /// completion before the next item is requested from the underlying
    /// stream. The resulting stream therefore yields the futures' outputs in
    /// the same order as the source items, with at most one mapper future
    /// alive at any time.
    fn async_map<TFn, TFuture, U>(self, f: TFn) -> AsyncMap<Self, TFn, TFuture>
    where
        TFn: FnMut(T) -> TFuture,
        TFuture: Future<Output = U>;
}
37
38impl<TStream, T> AsyncMapExt<T> for TStream
39where
40 TStream: Stream<Item = T>,
41{
42 fn async_map<TFn, TFuture, U>(self, f: TFn) -> AsyncMap<Self, TFn, TFuture>
43 where
44 TFn: FnMut(T) -> TFuture,
45 TFuture: Future<Output = U>,
46 {
47 AsyncMap::new(self, f)
48 }
49}
50
51#[doc(hidden)]
52#[pin_project]
53pub struct AsyncMap<TStream, TFn, TFuture> {
54 #[pin]
55 stream: TStream,
56 #[pin]
57 mapper_future: OptionPinned<TFuture>,
58
59 mapper: TFn,
60}
61
62impl<TStream, TFn, TFuture> AsyncMap<TStream, TFn, TFuture> {
63 fn new(stream: TStream, f: TFn) -> Self {
64 Self {
65 stream,
66 mapper_future: OptionPinned::None,
67 mapper: f,
68 }
69 }
70}
71
72impl<TStream, TFn, T, U, TFuture> Stream for AsyncMap<TStream, TFn, TFuture>
73where
74 TFn: FnMut(T) -> TFuture,
75 TStream: Stream<Item = T>,
76 TFuture: Future<Output = U>,
77{
78 type Item = U;
79
80 fn poll_next(
81 self: Pin<&mut Self>,
82 cx: &mut Context<'_>,
83 ) -> Poll<Option<<Self as Stream>::Item>> {
84 let mut me = self.project();
85
86 if me.mapper_future.is_none() {
87 let item = match ready!(me.stream.poll_next(cx)) {
88 Some(item) => item,
89 None => return Poll::Ready(None),
90 };
91
92 let future = (me.mapper)(item);
93 me.mapper_future.set(OptionPinned::Some(future));
94 }
95
96 let future = me.mapper_future.as_mut().project().unwrap();
97 let output = ready!(future.poll(cx));
98
99 me.mapper_future.set(OptionPinned::None);
100
101 Poll::Ready(Some(output))
102 }
103}
104
/// Checks that `async_map` applies the async mapper to every item, in order.
#[cfg(test)]
#[tokio::test]
async fn test() {
    use tokio_stream::StreamExt;

    let source = tokio_stream::iter(vec![1, 2]);
    let mapped = source.async_map(|x| async move { x + 1 });
    let collected = mapped.collect::<Vec<_>>().await;

    assert_eq!(collected, vec![2, 3]);
}