x_bow/path_ext/
bind_for_each.rs

1use std::{future::Future, task::Poll};
2
3use futures_core::Stream;
4
5use crate::{until_change::UntilChange, Path, PathExt};
6
7pin_project_lite::pin_project! {
8    /// A [Future]. See [bind_for_each][crate::PathExt::bind_for_each].
9    pub struct BindForEach<'a, P: ?Sized, C, I>
10    {
11        path: &'a P,
12        #[pin]
13        until_change: UntilChange<'a>,
14        closure: C,
15        #[pin]
16        incoming: I
17    }
18}
19
20impl<'a, P: ?Sized, C, I> BindForEach<'a, P, C, I> {
21    pub(super) fn new(path: &'a P, until_change: UntilChange<'a>, closure: C, incoming: I) -> Self {
22        Self {
23            path,
24            until_change,
25            closure,
26            incoming,
27        }
28    }
29}
30
31impl<'a, P: Path + ?Sized, C: FnMut(&P::Out) + Unpin, I: Stream<Item = P::Out>> Future
32    for BindForEach<'a, P, C, I>
33where
34    P::Out: Sized,
35{
36    type Output = ();
37
38    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
39        let mut this = self.project();
40
41        let mut val = None;
42        loop {
43            match this.incoming.as_mut().poll_next(cx) {
44                Poll::Ready(Some(v)) => {
45                    val = Some(v);
46                }
47                Poll::Ready(None) => {
48                    return Poll::Ready(());
49                }
50                Poll::Pending => break,
51            }
52        }
53        if let Some(val) = val {
54            if let Some(bm) = this.path.borrow_opt_mut().as_deref_mut() {
55                *bm = val;
56            } else {
57                return Poll::Ready(());
58            }
59            let _ = this.until_change.as_mut().poll_next(cx);
60        }
61
62        let first = !this.until_change.has_been_polled();
63        if first | this.until_change.as_mut().poll_next(cx).is_ready() {
64            if let Some(data) = this.path.borrow_opt().as_deref() {
65                (this.closure)(data);
66            }
67        }
68        Poll::Pending
69    }
70}