Skip to main content

exocore_apps_sdk/
time.rs

1use std::{collections::BinaryHeap, sync::Mutex, time::Duration};
2
3use chrono::{DateTime, TimeZone, Utc};
4use futures::{channel::oneshot, Future, FutureExt};
5
6use crate::binding::__exocore_host_now;
7
/// A Unix timestamp, stored as a count of nanoseconds.
///
/// The wrapped `u64` is public, so the raw nanosecond value can be read or
/// constructed directly. Ordering and equality compare that raw value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Timestamp(pub u64);
11
12impl Timestamp {
13    pub fn to_chrono_datetime(self) -> DateTime<Utc> {
14        self.into()
15    }
16}
17
18impl std::ops::Add<Duration> for Timestamp {
19    type Output = Timestamp;
20
21    fn add(self, rhs: Duration) -> Self::Output {
22        Timestamp(self.0 + rhs.as_nanos() as u64)
23    }
24}
25
26impl std::ops::Sub<Timestamp> for Timestamp {
27    type Output = Option<Duration>;
28
29    fn sub(self, rhs: Timestamp) -> Self::Output {
30        self.0.checked_sub(rhs.0).map(Duration::from_nanos)
31    }
32}
33
34impl From<u64> for Timestamp {
35    fn from(v: u64) -> Self {
36        Timestamp(v)
37    }
38}
39
40impl From<Timestamp> for u64 {
41    fn from(ts: Timestamp) -> Self {
42        ts.0
43    }
44}
45
46impl From<Timestamp> for DateTime<Utc> {
47    fn from(ts: Timestamp) -> Self {
48        Utc.timestamp_nanos(ts.0 as i64)
49    }
50}
51
52impl From<DateTime<Utc>> for Timestamp {
53    fn from(dt: DateTime<Utc>) -> Self {
54        Timestamp(dt.timestamp_nanos_opt().unwrap_or_default() as u64)
55    }
56}
57
58/// Returns the current unix timestamp.
59pub fn now() -> Timestamp {
60    unsafe { Timestamp(__exocore_host_now()) }
61}
62
63/// Returns a future that will sleep for the given duration.
64pub async fn sleep(duration: Duration) {
65    let time = now() + duration;
66    TIMERS.push(time).await;
67}
68
// Global timer set shared by `sleep`, `poll_timers` and `next_timer_time`.
// Declared through `lazy_static!` and initialized with an empty `Timers`.
lazy_static! {
    static ref TIMERS: Timers = Timers::new();
}
72
/// Container of pending timers, polled at intervals by the application
/// runtime. The `poll_timers` function is called when the application is being
/// polled. Once polled, `next_timer_time` returns an optional timestamp at
/// which the next poll of the timers is expected.
///
/// Uses a binary heap to sort timers in trigger order so that the head always
/// represents the next timer to be triggered.
struct Timers {
    // `Reverse` inverts the heap's ordering so the earliest trigger time sits
    // at the head; the mutex allows mutation through `&self`.
    timers: Mutex<BinaryHeap<std::cmp::Reverse<Timer>>>,
}
83
84/// Polls timers and trigger those that have expired.
85pub(crate) fn poll_timers() {
86    TIMERS.poll();
87}
88
89/// Returns the timestamp of the next timer that needs to be triggered. Returns
90/// none if no timers are scheduled.
91pub(crate) fn next_timer_time() -> Option<Timestamp> {
92    TIMERS.next_timer()
93}
94
95impl Timers {
96    fn new() -> Timers {
97        Timers {
98            timers: Mutex::new(BinaryHeap::new()),
99        }
100    }
101
102    fn poll(&self) {
103        let mut timers = self.timers.lock().unwrap();
104        let now = now();
105
106        loop {
107            if let Some(peek) = timers.peek() {
108                if peek.0.time > now {
109                    return;
110                }
111            } else {
112                return;
113            }
114
115            let timer = timers.pop().unwrap();
116            let _ = timer.0.sender.send(());
117        }
118    }
119
120    fn next_timer(&self) -> Option<Timestamp> {
121        let timers = self.timers.lock().unwrap();
122        timers.peek().map(|t| t.0.time)
123    }
124
125    fn push(&self, time: Timestamp) -> impl Future<Output = ()> {
126        let (sender, receiver) = oneshot::channel();
127
128        let mut timers = self.timers.lock().unwrap();
129        timers.push(std::cmp::Reverse(Timer { time, sender }));
130
131        receiver.map(|_| ())
132    }
133}
134
/// A single pending timer held in the `Timers` heap.
struct Timer {
    // Time at or after which the timer is triggered by `Timers::poll`.
    time: Timestamp,
    // Sending half of the oneshot channel; `Timers::poll` sends `()` on it
    // when the timer triggers.
    sender: oneshot::Sender<()>,
}
139
140/// Not really Eq since 2 timers could have same trigger time. We only require
141/// this for ordering.
142impl PartialEq for Timer {
143    fn eq(&self, other: &Self) -> bool {
144        self.time.eq(&other.time)
145    }
146}
147
148impl Eq for Timer {}
149
150impl PartialOrd for Timer {
151    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
152        Some(self.time.cmp(&other.time))
153    }
154}
155
156impl Ord for Timer {
157    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
158        self.time.cmp(&other.time)
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `now`, timestamp subtraction and timestamp + duration addition.
    #[test]
    fn test_timestamp() {
        let first = now();
        std::thread::sleep(Duration::from_millis(10));
        let second = now();
        let elapsed = (second - first).unwrap();
        assert!(elapsed.as_millis() >= 10);

        let shifted = second + Duration::from_secs(1);
        let elapsed = (shifted - second).unwrap();
        assert_eq!(elapsed.as_secs(), 1);
    }

    /// Checks that `sleep` registers a timer and only completes after
    /// `poll_timers` is called past the deadline.
    #[tokio::test]
    async fn test_timer() {
        let (sender_before, receiver_before) = oneshot::channel();
        let (sender_after, mut receiver_after) = oneshot::channel();
        tokio::spawn(async move {
            sender_before.send(now()).unwrap();
            sleep(Duration::from_millis(10)).await;
            sender_after.send(now()).unwrap();
        });

        let before_time = receiver_before.await.unwrap();

        // nothing must have arrived yet: the timer has not been triggered
        assert!(receiver_after.try_recv().unwrap().is_none());

        // wait until the timer is registered, then check that the next poll
        // time reflects its deadline
        let next = loop {
            if let Some(time) = next_timer_time() {
                break time;
            }

            tokio::time::sleep(Duration::from_micros(100)).await;
        };
        let next_dur = (next - before_time).unwrap();
        assert!(next_dur.as_millis() > 5);

        // let the timer expire, then trigger it
        tokio::time::sleep(Duration::from_millis(10)).await;
        poll_timers();

        // the timer has fired, so at least 10ms must have elapsed
        let after_time = receiver_after.await.unwrap();
        let time_diff = (after_time - before_time).unwrap();
        assert!(time_diff.as_millis() >= 10);
    }
}