x_bow/path_ext/
signal_stream.rs

1use std::{cell::Ref, pin::Pin, task::Poll};
2
3use futures_core::Stream;
4
5use crate::{until_change::UntilChange, Path, PathExt};
6
7/// A [Stream]. See [signal_stream][crate::PathExt::signal_stream].
8pub struct SignalStream<'a, P: Path + ?Sized> {
9    path: &'a P,
10    until_change: UntilChange<'a>,
11    fire_immediately: bool,
12}
13
14impl<'a, P: Path + ?Sized> SignalStream<'a, P> {
15    pub(super) fn new(path: &'a P, until_change: UntilChange<'a>, fire_immediately: bool) -> Self {
16        Self {
17            path,
18            until_change,
19            fire_immediately,
20        }
21    }
22}
23
24impl<'a, P: Path + ?Sized> Stream for SignalStream<'a, P> {
25    type Item = Ref<'a, P::Out>;
26
27    fn poll_next(
28        self: std::pin::Pin<&mut Self>,
29        cx: &mut std::task::Context<'_>,
30    ) -> Poll<Option<Self::Item>> {
31        let this = self.get_mut();
32        let first = !this.until_change.has_been_polled();
33        if (first && this.fire_immediately)
34            | Pin::new(&mut this.until_change).poll_next(cx).is_ready()
35        {
36            Poll::Ready(this.path.borrow_opt())
37        } else {
38            Poll::Pending
39        }
40    }
41}