Skip to main content

cozal/core/schedule_stream/
target_stream.rs

1use super::{StatefulSchedulePoll, StatefulScheduleStream};
2use futures::Stream;
3use pin_project::pin_project;
4use std::{
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9/// Stream for the [`to_realtime`](super::schedule_stream_ext::ScheduleStreamExt::to_realtime) method.
10#[pin_project]
11pub struct TargetStream<St: StatefulScheduleStream> {
12    #[pin]
13    stream: St,
14    target: St::Time,
15    next_target: Option<St::Time>,
16    state: Option<St::State>,
17}
18
19impl<St: StatefulScheduleStream> TargetStream<St> {
20    pub(super) fn new(stream: St, target: St::Time) -> Self {
21        Self {
22            stream,
23            target,
24            next_target: None,
25            state: None,
26        }
27    }
28
29    pub fn set_target(self: Pin<&mut Self>, target: St::Time) {
30        *self.project().target = target;
31    }
32
33    pub fn get_next_target(self: Pin<&mut Self>) -> Option<St::Time> {
34        self.into_ref().get_ref().next_target
35    }
36
37    pub fn into_inner(self) -> St {
38        self.stream
39    }
40}
41
42impl<St: StatefulScheduleStream> Stream for TargetStream<St> {
43    type Item = (St::Time, St::Item, St::State);
44
45    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        let this = self.project();
47        *this.next_target = None;
48
49        match this.stream.poll(*this.target, cx) {
50            StatefulSchedulePoll::Ready(t, p, s) => Poll::Ready(Some((t, p, s))),
51            StatefulSchedulePoll::Scheduled(t, _) => {
52                *this.next_target = Some(t);
53
54                Poll::Pending
55            }
56            StatefulSchedulePoll::Waiting(_) => Poll::Pending,
57            StatefulSchedulePoll::Pending => Poll::Pending,
58            StatefulSchedulePoll::Done(_) => Poll::Ready(None),
59        }
60    }
61
62    fn size_hint(&self) -> (usize, Option<usize>) {
63        self.stream.size_hint()
64    }
65}