lazy_queue/
inner.rs

1use futures_core::{ready, Stream};
2use pin_project::{pin_project, project};
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9/// A `ForEach`-like future.
10#[must_use = "futures do nothing unless polled"]
11#[pin_project]
12pub struct StreamProcessor<S, P, F>
13where
14    F: Future,
15{
16    #[pin]
17    receiver: S,
18    processor: P,
19    #[pin]
20    state: State<F>,
21}
22
23impl<S, P, F> StreamProcessor<S, P, F>
24where
25    F: Future,
26{
27    pub fn new(receiver: S, processor: P) -> Self {
28        StreamProcessor {
29            receiver,
30            processor,
31            state: State::WaitingForItem,
32        }
33    }
34}
35
36#[pin_project]
37enum State<F> {
38    WaitingForItem,
39    RunningProcessor(#[pin] F),
40}
41
42impl<S, P, F> Future for StreamProcessor<S, P, F>
43where
44    S: Stream,
45    P: FnMut(S::Item) -> F,
46    F: Future,
47{
48    type Output = ();
49
50    #[project]
51    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
52        loop {
53            let this = self.as_mut().project();
54
55            #[project]
56            let next = match this.state.project() {
57                State::WaitingForItem => {
58                    if let Some(item) = ready!(this.receiver.poll_next(ctx)) {
59                        let fut = (this.processor)(item);
60                        State::RunningProcessor(fut)
61                    } else {
62                        return Poll::Ready(());
63                    }
64                }
65                State::RunningProcessor(fut) => {
66                    ready!(fut.poll(ctx));
67                    State::WaitingForItem
68                }
69            };
70
71            self.as_mut().project().state.set(next);
72        }
73    }
74}