1use futures::{Stream, StreamExt};
8use std::time::SystemTime;
9use std::{
10 pin::Pin,
11 rc::Rc,
12 task::{Context, Poll},
13 time::Duration,
14};
15
16use crate::{
17 extract::{context::ConfigureContext, AlreadyExtracted, FromContext},
18 host::Host,
19 reactor::root::{RootReactor, TickerId},
20 types::Cid,
21};
22
23use crate::host::clock::Clock as HostClock;
24
25pub struct Clock {
27 host: Rc<dyn Host>,
28 host_clock: Rc<dyn HostClock>,
29 root_reactor: Rc<RootReactor>,
30}
31
32impl Clock {
34 pub fn period(self, period: Duration) -> Timer {
35 self.host.set_tick_period(period);
36 Timer { clock: Some(self) }
37 }
38
39 pub fn now(&self) -> SystemTime {
41 self.host_clock.get_current_time()
42 }
43}
44
45impl FromContext<ConfigureContext> for Clock {
46 type Error = AlreadyExtracted<Clock>;
47
48 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
49 context.extract_unique(|| Self {
51 host: context.host.clone(),
52 host_clock: context.clock.clone(),
53 root_reactor: context.root_reactor.clone(),
54 })
55 }
56}
57
58pub struct Timer {
60 clock: Option<Clock>,
61}
62
63impl Timer {
64 fn ticks(&self) -> Ticks<'_> {
65 Ticks {
66 id_and_cid: None,
67 count: 0,
68 ticker: self,
69 }
70 }
71
72 fn clock(&self) -> &Clock {
74 self.clock.as_ref().unwrap()
75 }
76
77 pub fn now(&self) -> SystemTime {
79 self.clock().host_clock.get_current_time()
80 }
81
82 pub async fn next_tick(&self) -> bool {
84 self.ticks().next().await.is_some()
85 }
86
87 pub async fn sleep(&self, interval: Duration) -> bool {
89 let mut now = self.now();
90 let release_time = now + interval;
91 let mut slept = true;
92 while now < release_time && slept {
93 slept = self.next_tick().await;
94 now = self.now();
95 }
96
97 slept
98 }
99
100 fn reset(&self) {
102 if let Some(clock) = self.clock.as_ref() {
103 clock.host.set_tick_period(Duration::from_nanos(0));
104 }
105 }
106
107 pub fn release(self) -> Clock {
109 self.reset();
110 let mut timer = self;
111 timer.clock.take().unwrap()
112 }
113}
114
115impl Drop for Timer {
116 fn drop(&mut self) {
117 self.reset();
118 }
119}
120
121#[doc(hidden)]
122pub struct Ticks<'a> {
123 id_and_cid: Option<(TickerId, Cid)>,
124 count: usize,
125 ticker: &'a Timer,
126}
127
128impl Ticks<'_> {
129 fn detach(&mut self) {
130 if let Some((id, _)) = &self.id_and_cid {
131 self.ticker.clock().root_reactor.remove_ticker(*id);
132 self.id_and_cid = None;
133 }
134 }
135}
136
137impl Drop for Ticks<'_> {
138 fn drop(&mut self) {
139 self.detach();
140 }
141}
142
143impl Stream for Ticks<'_> {
144 type Item = usize;
145
146 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147 let this = &mut *self.as_mut();
148 let reactor = this.ticker.clock().root_reactor.as_ref();
149
150 if reactor.done() {
151 self.detach();
152 return Poll::Ready(None);
153 }
154
155 match &this.id_and_cid {
156 None => {
157 let id = reactor.insert_ticker(cx.waker().clone());
159
160 let cid = reactor.active_cid();
161
162 self.id_and_cid = Some((id, cid));
163
164 reactor.set_paused(cid, true);
165
166 Poll::Pending
167 }
168 Some((id, cid)) => {
169 if reactor.consume_tick(*id, cx.waker()) {
170 let count = this.count;
171 this.count += 1;
172
173 reactor.set_active_cid(*cid);
174 this.ticker.clock().host.set_effective_context(cid.into());
175
176 Poll::Ready(Some(count))
177 } else {
178 Poll::Pending
179 }
180 }
181 }
182 }
183}