futures_concurrency/concurrent_stream/
map.rs1use pin_project::pin_project;
2
3use super::{ConcurrentStream, Consumer};
4use core::num::NonZeroUsize;
5use core::{
6 future::Future,
7 marker::PhantomData,
8 pin::Pin,
9 task::{ready, Context, Poll},
10};
11
12#[derive(Debug)]
14pub struct Map<CS, F, FutT, T, FutB, B>
15where
16 CS: ConcurrentStream<Item = T, Future = FutT>,
17 F: Fn(T) -> FutB,
18 F: Clone,
19 FutT: Future<Output = T>,
20 FutB: Future<Output = B>,
21{
22 inner: CS,
23 f: F,
24 _phantom: PhantomData<(FutT, T, FutB, B)>,
25}
26
27impl<CS, F, FutT, T, FutB, B> Map<CS, F, FutT, T, FutB, B>
28where
29 CS: ConcurrentStream<Item = T, Future = FutT>,
30 F: Fn(T) -> FutB,
31 F: Clone,
32 FutT: Future<Output = T>,
33 FutB: Future<Output = B>,
34{
35 pub(crate) fn new(inner: CS, f: F) -> Self {
36 Self {
37 inner,
38 f,
39 _phantom: PhantomData,
40 }
41 }
42}
43
44impl<CS, F, FutT, T, FutB, B> ConcurrentStream for Map<CS, F, FutT, T, FutB, B>
45where
46 CS: ConcurrentStream<Item = T, Future = FutT>,
47 F: Fn(T) -> FutB,
48 F: Clone,
49 FutT: Future<Output = T>,
50 FutB: Future<Output = B>,
51{
52 type Future = MapFuture<F, FutT, T, FutB, B>;
53 type Item = B;
54
55 async fn drive<C>(self, consumer: C) -> C::Output
56 where
57 C: Consumer<Self::Item, Self::Future>,
58 {
59 let consumer = MapConsumer {
60 inner: consumer,
61 f: self.f,
62 _phantom: PhantomData,
63 };
64 self.inner.drive(consumer).await
65 }
66
67 fn concurrency_limit(&self) -> Option<NonZeroUsize> {
68 self.inner.concurrency_limit()
69 }
70
71 fn size_hint(&self) -> (usize, Option<usize>) {
72 self.inner.size_hint()
73 }
74}
75
76#[pin_project]
77pub struct MapConsumer<C, F, FutT, T, FutB, B>
78where
79 FutT: Future<Output = T>,
80 C: Consumer<B, MapFuture<F, FutT, T, FutB, B>>,
81 F: Fn(T) -> FutB,
82 F: Clone,
83 FutB: Future<Output = B>,
84{
85 #[pin]
86 inner: C,
87 f: F,
88 _phantom: PhantomData<(FutT, T, FutB, B)>,
89}
90
91impl<C, F, FutT, T, FutB, B> Consumer<T, FutT> for MapConsumer<C, F, FutT, T, FutB, B>
92where
93 FutT: Future<Output = T>,
94 C: Consumer<B, MapFuture<F, FutT, T, FutB, B>>,
95 F: Fn(T) -> FutB,
96 F: Clone,
97 FutB: Future<Output = B>,
98{
99 type Output = C::Output;
100
101 async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
102 let this = self.project();
103 this.inner.progress().await
104 }
105
106 async fn send(self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
107 let this = self.project();
108 let fut = MapFuture::new(this.f.clone(), future);
109 this.inner.send(fut).await
110 }
111
112 async fn flush(self: Pin<&mut Self>) -> Self::Output {
113 let this = self.project();
114 this.inner.flush().await
115 }
116}
117
/// Future that awaits `fut_t`, passes its output to `f`, and then awaits the
/// future that `f` returned.
///
/// # Panics
///
/// Polling again after the future has returned `Poll::Ready` panics.
#[derive(Debug)]
pub struct MapFuture<F, FutT, T, FutB, B>
where
    F: Fn(T) -> FutB,
    FutT: Future<Output = T>,
    FutB: Future<Output = B>,
{
    /// Set once the final output has been returned.
    done: bool,
    /// The mapping closure.
    f: F,
    /// First stage: `Some` until it resolves, then cleared.
    fut_t: Option<FutT>,
    /// Second stage: `None` until the first stage resolves.
    fut_b: Option<FutB>,
}

impl<F, FutT, T, FutB, B> MapFuture<F, FutT, T, FutB, B>
where
    F: Fn(T) -> FutB,
    FutT: Future<Output = T>,
    FutB: Future<Output = B>,
{
    /// Creates a future in its first stage: `fut_t` is pending and `f` has
    /// not been called yet.
    fn new(f: F, fut_t: FutT) -> Self {
        Self {
            fut_t: Some(fut_t),
            fut_b: None,
            f,
            done: false,
        }
    }
}

impl<F, FutT, T, FutB, B> Future for MapFuture<F, FutT, T, FutB, B>
where
    F: Fn(T) -> FutB,
    FutT: Future<Output = T>,
    FutB: Future<Output = B>,
{
    type Output = B;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the mutable reference is used only to reach the fields.
        // Neither stored future is moved out of its slot below: each is
        // polled in place, and `fut_t` is dropped in place when it is
        // overwritten with `None`.
        let state = unsafe { self.get_unchecked_mut() };
        if state.done {
            panic!("future has already been polled to completion once");
        }

        // Stage one: drive the input future, then build the second stage
        // from its output.
        if let Some(first) = state.fut_t.as_mut() {
            // SAFETY: `first` points into `state`, which is itself pinned,
            // and the future stays in this slot until it is dropped.
            let input = ready!(unsafe { Pin::new_unchecked(first) }.poll(cx));
            let second = (state.f)(input);
            state.fut_t = None;
            state.fut_b = Some(second);
        }

        // Stage two: drive the future produced by the closure. This runs in
        // the same call as stage one when the input future resolves.
        match state.fut_b.as_mut() {
            Some(second) => {
                // SAFETY: same reasoning as for `fut_t`; once stored,
                // `fut_b` is never moved or replaced.
                let output = ready!(unsafe { Pin::new_unchecked(second) }.poll(cx));
                state.done = true;
                Poll::Ready(output)
            }
            None => unreachable!("neither future `a` nor future `b` were ready"),
        }
    }
}