1use futures_core::{ready, Stream};
2use pin_project::{pin_project, project};
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9#[must_use = "futures do nothing unless polled"]
11#[pin_project]
12pub struct StreamProcessor<S, P, F>
13where
14 F: Future,
15{
16 #[pin]
17 receiver: S,
18 processor: P,
19 #[pin]
20 state: State<F>,
21}
22
23impl<S, P, F> StreamProcessor<S, P, F>
24where
25 F: Future,
26{
27 pub fn new(receiver: S, processor: P) -> Self {
28 StreamProcessor {
29 receiver,
30 processor,
31 state: State::WaitingForItem,
32 }
33 }
34}
35
36#[pin_project]
37enum State<F> {
38 WaitingForItem,
39 RunningProcessor(#[pin] F),
40}
41
42impl<S, P, F> Future for StreamProcessor<S, P, F>
43where
44 S: Stream,
45 P: FnMut(S::Item) -> F,
46 F: Future,
47{
48 type Output = ();
49
50 #[project]
51 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
52 loop {
53 let this = self.as_mut().project();
54
55 #[project]
56 let next = match this.state.project() {
57 State::WaitingForItem => {
58 if let Some(item) = ready!(this.receiver.poll_next(ctx)) {
59 let fut = (this.processor)(item);
60 State::RunningProcessor(fut)
61 } else {
62 return Poll::Ready(());
63 }
64 }
65 State::RunningProcessor(fut) => {
66 ready!(fut.poll(ctx));
67 State::WaitingForItem
68 }
69 };
70
71 self.as_mut().project().state.set(next);
72 }
73 }
74}