1use std::{collections::BinaryHeap, sync::Mutex, time::Duration};
2
3use chrono::{DateTime, TimeZone, Utc};
4use futures::{channel::oneshot, Future, FutureExt};
5
6use crate::binding::__exocore_host_now;
7
8#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone, Debug)]
10pub struct Timestamp(pub u64);
11
12impl Timestamp {
13 pub fn to_chrono_datetime(self) -> DateTime<Utc> {
14 self.into()
15 }
16}
17
18impl std::ops::Add<Duration> for Timestamp {
19 type Output = Timestamp;
20
21 fn add(self, rhs: Duration) -> Self::Output {
22 Timestamp(self.0 + rhs.as_nanos() as u64)
23 }
24}
25
26impl std::ops::Sub<Timestamp> for Timestamp {
27 type Output = Option<Duration>;
28
29 fn sub(self, rhs: Timestamp) -> Self::Output {
30 self.0.checked_sub(rhs.0).map(Duration::from_nanos)
31 }
32}
33
34impl From<u64> for Timestamp {
35 fn from(v: u64) -> Self {
36 Timestamp(v)
37 }
38}
39
40impl From<Timestamp> for u64 {
41 fn from(ts: Timestamp) -> Self {
42 ts.0
43 }
44}
45
46impl From<Timestamp> for DateTime<Utc> {
47 fn from(ts: Timestamp) -> Self {
48 Utc.timestamp_nanos(ts.0 as i64)
49 }
50}
51
52impl From<DateTime<Utc>> for Timestamp {
53 fn from(dt: DateTime<Utc>) -> Self {
54 Timestamp(dt.timestamp_nanos_opt().unwrap_or_default() as u64)
55 }
56}
57
58pub fn now() -> Timestamp {
60 unsafe { Timestamp(__exocore_host_now()) }
61}
62
63pub async fn sleep(duration: Duration) {
65 let time = now() + duration;
66 TIMERS.push(time).await;
67}
68
69lazy_static! {
70 static ref TIMERS: Timers = Timers::new();
71}
72
73struct Timers {
81 timers: Mutex<BinaryHeap<std::cmp::Reverse<Timer>>>,
82}
83
84pub(crate) fn poll_timers() {
86 TIMERS.poll();
87}
88
89pub(crate) fn next_timer_time() -> Option<Timestamp> {
92 TIMERS.next_timer()
93}
94
95impl Timers {
96 fn new() -> Timers {
97 Timers {
98 timers: Mutex::new(BinaryHeap::new()),
99 }
100 }
101
102 fn poll(&self) {
103 let mut timers = self.timers.lock().unwrap();
104 let now = now();
105
106 loop {
107 if let Some(peek) = timers.peek() {
108 if peek.0.time > now {
109 return;
110 }
111 } else {
112 return;
113 }
114
115 let timer = timers.pop().unwrap();
116 let _ = timer.0.sender.send(());
117 }
118 }
119
120 fn next_timer(&self) -> Option<Timestamp> {
121 let timers = self.timers.lock().unwrap();
122 timers.peek().map(|t| t.0.time)
123 }
124
125 fn push(&self, time: Timestamp) -> impl Future<Output = ()> {
126 let (sender, receiver) = oneshot::channel();
127
128 let mut timers = self.timers.lock().unwrap();
129 timers.push(std::cmp::Reverse(Timer { time, sender }));
130
131 receiver.map(|_| ())
132 }
133}
134
135struct Timer {
136 time: Timestamp,
137 sender: oneshot::Sender<()>,
138}
139
140impl PartialEq for Timer {
143 fn eq(&self, other: &Self) -> bool {
144 self.time.eq(&other.time)
145 }
146}
147
148impl Eq for Timer {}
149
150impl PartialOrd for Timer {
151 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
152 Some(self.time.cmp(&other.time))
153 }
154}
155
156impl Ord for Timer {
157 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
158 self.time.cmp(&other.time)
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165
166 #[test]
167 fn test_timestamp() {
168 let now1 = now();
169 std::thread::sleep(Duration::from_millis(10));
170 let now2 = now();
171 let diff = now2 - now1;
172 assert!(diff.unwrap().as_millis() >= 10);
173
174 let now3 = now2 + Duration::from_secs(1);
175 let diff = now3 - now2;
176 assert_eq!(diff.unwrap().as_secs(), 1);
177 }
178
179 #[tokio::test]
180 async fn test_timer() {
181 let (sender_before, receiver_before) = oneshot::channel();
182 let (sender_after, mut receiver_after) = oneshot::channel();
183 tokio::spawn(async move {
184 sender_before.send(now()).unwrap();
185 sleep(Duration::from_millis(10)).await;
186 sender_after.send(now()).unwrap();
187 });
188
189 let before_time = receiver_before.await.unwrap();
190
191 assert!(receiver_after.try_recv().unwrap().is_none());
193
194 let mut next = next_timer_time();
196 loop {
197 if next.is_some() {
198 break;
199 }
200
201 tokio::time::sleep(Duration::from_micros(100)).await;
202 next = next_timer_time();
203 }
204 let next_dur = (next.unwrap() - before_time).unwrap();
205 assert!(next_dur.as_millis() > 5);
206
207 tokio::time::sleep(Duration::from_millis(10)).await;
209 poll_timers();
210
211 let after_time = receiver_after.await.unwrap();
213 let time_diff = (after_time - before_time).unwrap();
214 assert!(time_diff.as_millis() >= 10);
215 }
216}