x_bow/path_ext/
bind_for_each.rs1use std::{future::Future, task::Poll};
2
3use futures_core::Stream;
4
5use crate::{until_change::UntilChange, Path, PathExt};
6
7pin_project_lite::pin_project! {
8 pub struct BindForEach<'a, P: ?Sized, C, I>
10 {
11 path: &'a P,
12 #[pin]
13 until_change: UntilChange<'a>,
14 closure: C,
15 #[pin]
16 incoming: I
17 }
18}
19
20impl<'a, P: ?Sized, C, I> BindForEach<'a, P, C, I> {
21 pub(super) fn new(path: &'a P, until_change: UntilChange<'a>, closure: C, incoming: I) -> Self {
22 Self {
23 path,
24 until_change,
25 closure,
26 incoming,
27 }
28 }
29}
30
31impl<'a, P: Path + ?Sized, C: FnMut(&P::Out) + Unpin, I: Stream<Item = P::Out>> Future
32 for BindForEach<'a, P, C, I>
33where
34 P::Out: Sized,
35{
36 type Output = ();
37
38 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
39 let mut this = self.project();
40
41 let mut val = None;
42 loop {
43 match this.incoming.as_mut().poll_next(cx) {
44 Poll::Ready(Some(v)) => {
45 val = Some(v);
46 }
47 Poll::Ready(None) => {
48 return Poll::Ready(());
49 }
50 Poll::Pending => break,
51 }
52 }
53 if let Some(val) = val {
54 if let Some(bm) = this.path.borrow_opt_mut().as_deref_mut() {
55 *bm = val;
56 } else {
57 return Poll::Ready(());
58 }
59 let _ = this.until_change.as_mut().poll_next(cx);
60 }
61
62 let first = !this.until_change.has_been_polled();
63 if first | this.until_change.as_mut().poll_next(cx).is_ready() {
64 if let Some(data) = this.path.borrow_opt().as_deref() {
65 (this.closure)(data);
66 }
67 }
68 Poll::Pending
69 }
70}