cozal/core/schedule_stream/
target_stream.rs1use super::{StatefulSchedulePoll, StatefulScheduleStream};
2use futures::Stream;
3use pin_project::pin_project;
4use std::{
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9#[pin_project]
11pub struct TargetStream<St: StatefulScheduleStream> {
12 #[pin]
13 stream: St,
14 target: St::Time,
15 next_target: Option<St::Time>,
16 state: Option<St::State>,
17}
18
19impl<St: StatefulScheduleStream> TargetStream<St> {
20 pub(super) fn new(stream: St, target: St::Time) -> Self {
21 Self {
22 stream,
23 target,
24 next_target: None,
25 state: None,
26 }
27 }
28
29 pub fn set_target(self: Pin<&mut Self>, target: St::Time) {
30 *self.project().target = target;
31 }
32
33 pub fn get_next_target(self: Pin<&mut Self>) -> Option<St::Time> {
34 self.into_ref().get_ref().next_target
35 }
36
37 pub fn into_inner(self) -> St {
38 self.stream
39 }
40}
41
42impl<St: StatefulScheduleStream> Stream for TargetStream<St> {
43 type Item = (St::Time, St::Item, St::State);
44
45 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46 let this = self.project();
47 *this.next_target = None;
48
49 match this.stream.poll(*this.target, cx) {
50 StatefulSchedulePoll::Ready(t, p, s) => Poll::Ready(Some((t, p, s))),
51 StatefulSchedulePoll::Scheduled(t, _) => {
52 *this.next_target = Some(t);
53
54 Poll::Pending
55 }
56 StatefulSchedulePoll::Waiting(_) => Poll::Pending,
57 StatefulSchedulePoll::Pending => Poll::Pending,
58 StatefulSchedulePoll::Done(_) => Poll::Ready(None),
59 }
60 }
61
62 fn size_hint(&self) -> (usize, Option<usize>) {
63 self.stream.size_hint()
64 }
65}