1use futures::{Stream, StreamExt};
8use std::time::SystemTime;
9use std::{
10 pin::Pin,
11 rc::Rc,
12 task::{Context, Poll},
13 time::Duration,
14};
15
16use crate::{
17 extract::{context::ConfigureContext, AlreadyExtracted, FromContext},
18 host::Host,
19 reactor::root::{RootReactor, TickerId},
20 types::Cid,
21};
22
/// Handle pairing the host interface with the root reactor.
///
/// Obtained through the [`FromContext`] implementation for
/// [`ConfigureContext`]; call [`Clock::period`] to turn it into a [`Timer`].
pub struct Clock {
    /// Host interface; this file uses it to set the tick period and the
    /// effective context.
    host: Rc<dyn Host>,
    /// Root reactor on which tickers are inserted, consumed and removed.
    root_reactor: Rc<RootReactor>,
}
28
29impl Clock {
31 pub fn period(self, period: Duration) -> Timer {
32 self.host.set_tick_period(period);
33 Timer { clock: Some(self) }
34 }
35}
36
37impl FromContext<ConfigureContext> for Clock {
38 type Error = AlreadyExtracted<Clock>;
39
40 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
41 context.extract_unique(|| Self {
43 host: context.host.clone(),
44 root_reactor: context.root_reactor.clone(),
45 })
46 }
47}
48
/// A [`Clock`] with a tick period applied, created by [`Clock::period`].
///
/// Dropping the timer, or calling `release`, sets the host tick period to
/// zero.
pub struct Timer {
    /// The wrapped clock; `release` is the only place that moves it out.
    clock: Option<Clock>,
}
53
54impl Timer {
55 fn ticks(&self) -> Ticks<'_> {
56 Ticks {
57 id_and_cid: None,
58 count: 0,
59 ticker: self,
60 }
61 }
62
63 fn clock(&self) -> &Clock {
65 self.clock.as_ref().unwrap()
66 }
67
68 pub async fn next_tick(&self) -> bool {
70 self.ticks().next().await.is_some()
71 }
72
73 pub async fn sleep(&self, interval: Duration) -> bool {
75 let mut now = SystemTime::now();
76 let release_time = now + interval;
77 let mut slept = true;
78 while now < release_time && slept {
79 slept = self.next_tick().await;
80 now = SystemTime::now();
81 }
82
83 slept
84 }
85
86 fn reset(&self) {
88 if let Some(clock) = self.clock.as_ref() {
89 clock.host.set_tick_period(Duration::from_nanos(0));
90 }
91 }
92
93 pub fn release(self) -> Clock {
95 self.reset();
96 let mut timer = self;
97 timer.clock.take().unwrap()
98 }
99}
100
101impl Drop for Timer {
102 fn drop(&mut self) {
103 self.reset();
104 }
105}
106
/// Stream of tick counts driven by the root reactor of a [`Timer`]'s clock.
#[doc(hidden)]
pub struct Ticks<'a> {
    /// Ticker id and context id recorded on the first poll; `None` before
    /// that and after the stream has been detached.
    id_and_cid: Option<(TickerId, Cid)>,
    /// Number of ticks yielded so far, which is also the next item's value.
    count: usize,
    /// Timer whose clock supplies the host and the root reactor.
    ticker: &'a Timer,
}
113
114impl Ticks<'_> {
115 fn detach(&mut self) {
116 if let Some((id, _)) = &self.id_and_cid {
117 self.ticker.clock().root_reactor.remove_ticker(*id);
118 self.id_and_cid = None;
119 }
120 }
121}
122
123impl Drop for Ticks<'_> {
124 fn drop(&mut self) {
125 self.detach();
126 }
127}
128
129impl Stream for Ticks<'_> {
130 type Item = usize;
131
132 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133 let this = &mut *self.as_mut();
134 let reactor = this.ticker.clock().root_reactor.as_ref();
135
136 if reactor.done() {
137 self.detach();
138 return Poll::Ready(None);
139 }
140
141 match &this.id_and_cid {
142 None => {
143 let id = reactor.insert_ticker(cx.waker().clone());
145
146 let cid = reactor.active_cid();
147
148 self.id_and_cid = Some((id, cid));
149
150 reactor.set_paused(cid, true);
151
152 Poll::Pending
153 }
154 Some((id, cid)) => {
155 if reactor.consume_tick(*id, cx.waker()) {
156 let count = this.count;
157 this.count += 1;
158
159 reactor.set_active_cid(*cid);
160 this.ticker.clock().host.set_effective_context(cid.into());
161
162 Poll::Ready(Some(count))
163 } else {
164 Poll::Pending
165 }
166 }
167 }
168 }
169}