x_bow/path_ext/
signal_stream.rs1use std::{cell::Ref, pin::Pin, task::Poll};
2
3use futures_core::Stream;
4
5use crate::{until_change::UntilChange, Path, PathExt};
6
7pub struct SignalStream<'a, P: Path + ?Sized> {
9 path: &'a P,
10 until_change: UntilChange<'a>,
11 fire_immediately: bool,
12}
13
14impl<'a, P: Path + ?Sized> SignalStream<'a, P> {
15 pub(super) fn new(path: &'a P, until_change: UntilChange<'a>, fire_immediately: bool) -> Self {
16 Self {
17 path,
18 until_change,
19 fire_immediately,
20 }
21 }
22}
23
24impl<'a, P: Path + ?Sized> Stream for SignalStream<'a, P> {
25 type Item = Ref<'a, P::Out>;
26
27 fn poll_next(
28 self: std::pin::Pin<&mut Self>,
29 cx: &mut std::task::Context<'_>,
30 ) -> Poll<Option<Self::Item>> {
31 let this = self.get_mut();
32 let first = !this.until_change.has_been_polled();
33 if (first && this.fire_immediately)
34 | Pin::new(&mut this.until_change).poll_next(cx).is_ready()
35 {
36 Poll::Ready(this.path.borrow_opt())
37 } else {
38 Poll::Pending
39 }
40 }
41}