futures_concurrency/concurrent_stream/
map.rs

1use pin_project::pin_project;
2
3use super::{ConcurrentStream, Consumer};
4use core::num::NonZeroUsize;
5use core::{
6    future::Future,
7    marker::PhantomData,
8    pin::Pin,
9    task::{ready, Context, Poll},
10};
11
12/// Convert items from one type into another
13#[derive(Debug)]
14pub struct Map<CS, F, FutT, T, FutB, B>
15where
16    CS: ConcurrentStream<Item = T, Future = FutT>,
17    F: Fn(T) -> FutB,
18    F: Clone,
19    FutT: Future<Output = T>,
20    FutB: Future<Output = B>,
21{
22    inner: CS,
23    f: F,
24    _phantom: PhantomData<(FutT, T, FutB, B)>,
25}
26
27impl<CS, F, FutT, T, FutB, B> Map<CS, F, FutT, T, FutB, B>
28where
29    CS: ConcurrentStream<Item = T, Future = FutT>,
30    F: Fn(T) -> FutB,
31    F: Clone,
32    FutT: Future<Output = T>,
33    FutB: Future<Output = B>,
34{
35    pub(crate) fn new(inner: CS, f: F) -> Self {
36        Self {
37            inner,
38            f,
39            _phantom: PhantomData,
40        }
41    }
42}
43
44impl<CS, F, FutT, T, FutB, B> ConcurrentStream for Map<CS, F, FutT, T, FutB, B>
45where
46    CS: ConcurrentStream<Item = T, Future = FutT>,
47    F: Fn(T) -> FutB,
48    F: Clone,
49    FutT: Future<Output = T>,
50    FutB: Future<Output = B>,
51{
52    type Future = MapFuture<F, FutT, T, FutB, B>;
53    type Item = B;
54
55    async fn drive<C>(self, consumer: C) -> C::Output
56    where
57        C: Consumer<Self::Item, Self::Future>,
58    {
59        let consumer = MapConsumer {
60            inner: consumer,
61            f: self.f,
62            _phantom: PhantomData,
63        };
64        self.inner.drive(consumer).await
65    }
66
67    fn concurrency_limit(&self) -> Option<NonZeroUsize> {
68        self.inner.concurrency_limit()
69    }
70
71    fn size_hint(&self) -> (usize, Option<usize>) {
72        self.inner.size_hint()
73    }
74}
75
76#[pin_project]
77pub struct MapConsumer<C, F, FutT, T, FutB, B>
78where
79    FutT: Future<Output = T>,
80    C: Consumer<B, MapFuture<F, FutT, T, FutB, B>>,
81    F: Fn(T) -> FutB,
82    F: Clone,
83    FutB: Future<Output = B>,
84{
85    #[pin]
86    inner: C,
87    f: F,
88    _phantom: PhantomData<(FutT, T, FutB, B)>,
89}
90
91impl<C, F, FutT, T, FutB, B> Consumer<T, FutT> for MapConsumer<C, F, FutT, T, FutB, B>
92where
93    FutT: Future<Output = T>,
94    C: Consumer<B, MapFuture<F, FutT, T, FutB, B>>,
95    F: Fn(T) -> FutB,
96    F: Clone,
97    FutB: Future<Output = B>,
98{
99    type Output = C::Output;
100
101    async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
102        let this = self.project();
103        this.inner.progress().await
104    }
105
106    async fn send(self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
107        let this = self.project();
108        let fut = MapFuture::new(this.f.clone(), future);
109        this.inner.send(fut).await
110    }
111
112    async fn flush(self: Pin<&mut Self>) -> Self::Output {
113        let this = self.project();
114        this.inner.flush().await
115    }
116}
117
118/// Takes a future and maps it to another future via a closure
119#[derive(Debug)]
120pub struct MapFuture<F, FutT, T, FutB, B>
121where
122    FutT: Future<Output = T>,
123    F: Fn(T) -> FutB,
124    FutB: Future<Output = B>,
125{
126    done: bool,
127    f: F,
128    fut_t: Option<FutT>,
129    fut_b: Option<FutB>,
130}
131
132impl<F, FutT, T, FutB, B> MapFuture<F, FutT, T, FutB, B>
133where
134    FutT: Future<Output = T>,
135    F: Fn(T) -> FutB,
136    FutB: Future<Output = B>,
137{
138    fn new(f: F, fut_t: FutT) -> Self {
139        Self {
140            done: false,
141            f,
142            fut_t: Some(fut_t),
143            fut_b: None,
144        }
145    }
146}
147
148impl<F, FutT, T, FutB, B> Future for MapFuture<F, FutT, T, FutB, B>
149where
150    FutT: Future<Output = T>,
151    F: Fn(T) -> FutB,
152    FutB: Future<Output = B>,
153{
154    type Output = B;
155
156    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
157        // SAFETY: we need to access the inner future's fields to project them
158        let this = unsafe { self.get_unchecked_mut() };
159        if this.done {
160            panic!("future has already been polled to completion once");
161        }
162
163        // Poll forward the future containing the value of `T`
164        if let Some(fut) = this.fut_t.as_mut() {
165            // SAFETY: we're pin projecting here
166            let t = ready!(unsafe { Pin::new_unchecked(fut) }.poll(cx));
167            let fut_b = (this.f)(t);
168            this.fut_t = None;
169            this.fut_b = Some(fut_b);
170        }
171
172        // Poll forward the future returned by the closure
173        if let Some(fut) = this.fut_b.as_mut() {
174            // SAFETY: we're pin projecting here
175            let t = ready!(unsafe { Pin::new_unchecked(fut) }.poll(cx));
176            this.done = true;
177            return Poll::Ready(t);
178        }
179
180        unreachable!("neither future `a` nor future `b` were ready");
181    }
182}