1use futures::{Stream, StreamExt};
8use std::time::SystemTime;
9use std::{
10 pin::Pin,
11 rc::Rc,
12 task::{Context, Poll},
13 time::Duration,
14};
15
16use crate::{
17 extract::{context::ConfigureContext, AlreadyExtracted, FromContext},
18 host::Host,
19 reactor::root::{RootReactor, TickerId},
20 types::Cid,
21};
22
23use crate::host::clock::Clock as HostClock;
24
25pub struct Clock {
27 host: Rc<dyn Host>,
28 host_clock: Rc<dyn HostClock>,
29 root_reactor: Rc<RootReactor>,
30}
31
32impl Clock {
34 pub fn period(self, period: Duration) -> Timer {
35 self.host.set_tick_period(period);
36 Timer { clock: Some(self) }
37 }
38}
39
40impl FromContext<ConfigureContext> for Clock {
41 type Error = AlreadyExtracted<Clock>;
42
43 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
44 context.extract_unique(|| Self {
46 host: context.host.clone(),
47 host_clock: context.clock.clone(),
48 root_reactor: context.root_reactor.clone(),
49 })
50 }
51}
52
53pub struct Timer {
55 clock: Option<Clock>,
56}
57
58impl Timer {
59 fn ticks(&self) -> Ticks<'_> {
60 Ticks {
61 id_and_cid: None,
62 count: 0,
63 ticker: self,
64 }
65 }
66
67 fn clock(&self) -> &Clock {
69 self.clock.as_ref().unwrap()
70 }
71
72 fn now(&self) -> SystemTime {
73 self.clock().host_clock.get_current_time()
74 }
75
76 pub async fn next_tick(&self) -> bool {
78 self.ticks().next().await.is_some()
79 }
80
81 pub async fn sleep(&self, interval: Duration) -> bool {
83 let mut now = self.now();
84 let release_time = now + interval;
85 let mut slept = true;
86 while now < release_time && slept {
87 slept = self.next_tick().await;
88 now = self.now();
89 }
90
91 slept
92 }
93
94 fn reset(&self) {
96 if let Some(clock) = self.clock.as_ref() {
97 clock.host.set_tick_period(Duration::from_nanos(0));
98 }
99 }
100
101 pub fn release(self) -> Clock {
103 self.reset();
104 let mut timer = self;
105 timer.clock.take().unwrap()
106 }
107}
108
109impl Drop for Timer {
110 fn drop(&mut self) {
111 self.reset();
112 }
113}
114
115#[doc(hidden)]
116pub struct Ticks<'a> {
117 id_and_cid: Option<(TickerId, Cid)>,
118 count: usize,
119 ticker: &'a Timer,
120}
121
122impl Ticks<'_> {
123 fn detach(&mut self) {
124 if let Some((id, _)) = &self.id_and_cid {
125 self.ticker.clock().root_reactor.remove_ticker(*id);
126 self.id_and_cid = None;
127 }
128 }
129}
130
131impl Drop for Ticks<'_> {
132 fn drop(&mut self) {
133 self.detach();
134 }
135}
136
137impl Stream for Ticks<'_> {
138 type Item = usize;
139
140 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141 let this = &mut *self.as_mut();
142 let reactor = this.ticker.clock().root_reactor.as_ref();
143
144 if reactor.done() {
145 self.detach();
146 return Poll::Ready(None);
147 }
148
149 match &this.id_and_cid {
150 None => {
151 let id = reactor.insert_ticker(cx.waker().clone());
153
154 let cid = reactor.active_cid();
155
156 self.id_and_cid = Some((id, cid));
157
158 reactor.set_paused(cid, true);
159
160 Poll::Pending
161 }
162 Some((id, cid)) => {
163 if reactor.consume_tick(*id, cx.waker()) {
164 let count = this.count;
165 this.count += 1;
166
167 reactor.set_active_cid(*cid);
168 this.ticker.clock().host.set_effective_context(cid.into());
169
170 Poll::Ready(Some(count))
171 } else {
172 Poll::Pending
173 }
174 }
175 }
176 }
177}