reactor/
timeouts.rs

1// Library for concurrent I/O resource management using reactor pattern.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Written in 2021-2023 by
6//     Dr. Maxim Orlovsky <orlovsky@ubideco.org>
7//     Alexis Sellier <alexis@cloudhead.io>
8//
9// Copyright 2022-2023 UBIDECO Institute, Switzerland
10// Copyright 2021 Alexis Sellier <alexis@cloudhead.io>
11//
12// Licensed under the Apache License, Version 2.0 (the "License");
13// you may not use this file except in compliance with the License.
14// You may obtain a copy of the License at
15//
16//     http://www.apache.org/licenses/LICENSE-2.0
17//
18// Unless required by applicable law or agreed to in writing, software
19// distributed under the License is distributed on an "AS IS" BASIS,
20// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21// See the License for the specific language governing permissions and
22// limitations under the License.
23
24use std::collections::BTreeSet;
25use std::ops::{Add, AddAssign, Sub, SubAssign};
26use std::time::{Duration, SystemTime};
27
28/// UNIX timestamp which helps working with absolute time.
29#[derive(Wrapper, WrapperMut, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug, From)]
30#[wrapper(Display, LowerHex, UpperHex, Octal, Add, Sub)]
31#[wrapper_mut(AddAssign, SubAssign)]
32pub struct Timestamp(u128);
33
34impl Timestamp {
35    /// Creates timestamp matching the current moment.
36    pub fn now() -> Self {
37        let duration =
38            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).expect("system time");
39        Self(duration.as_millis())
40    }
41
42    /// Constructs timestamp from a given number of seconds since [`SystemTime::UNIX_EPOCH`].
43    pub fn from_secs(secs: u64) -> Timestamp { Timestamp(secs as u128 * 1000) }
44
45    /// Constructs timestamp from a given number of milliseconds since [`SystemTime::UNIX_EPOCH`].
46    pub fn from_millis(millis: u128) -> Timestamp { Timestamp(millis) }
47
48    #[deprecated(note = "use Timestamp::as_secs")]
49    /// Returns number of seconds since UNIX epoch.
50    pub fn into_secs(self) -> u64 { self.as_secs() }
51
52    /// Returns number of seconds since UNIX epoch.
53    pub fn as_secs(&self) -> u64 { (self.0 / 1000) as u64 }
54
55    /// Returns number of milliseconds since UNIX epoch.
56    pub fn as_millis(&self) -> u64 {
57        // Nb. We have enough space in a `u64` to store a unix timestamp in millisecond
58        // precision for millions of years.
59        self.0 as u64
60    }
61}
62
63impl Add<Duration> for Timestamp {
64    type Output = Timestamp;
65
66    fn add(self, rhs: Duration) -> Self::Output { Timestamp(self.0 + rhs.as_millis()) }
67}
68
69impl Sub<Duration> for Timestamp {
70    type Output = Timestamp;
71
72    fn sub(self, rhs: Duration) -> Self::Output { Timestamp(self.0 - rhs.as_millis()) }
73}
74
75impl AddAssign<Duration> for Timestamp {
76    fn add_assign(&mut self, rhs: Duration) { self.0 += rhs.as_millis() }
77}
78
79impl SubAssign<Duration> for Timestamp {
80    fn sub_assign(&mut self, rhs: Duration) { self.0 -= rhs.as_millis() }
81}
82
83/// Manages timers and triggers timeouts.
84#[derive(Debug, Default)]
85pub struct Timer {
86    /// Timeouts are durations since the UNIX epoch.
87    timeouts: BTreeSet<Timestamp>,
88}
89
90impl Timer {
91    /// Create a new timer containing no timeouts.
92    pub fn new() -> Self { Self { timeouts: bset! {} } }
93
94    /// Return the number of timeouts being tracked.
95    pub fn count(&self) -> usize { self.timeouts.len() }
96
97    /// Check whether there are timeouts being tracked.
98    pub fn has_timeouts(&self) -> bool { !self.timeouts.is_empty() }
99
100    /// Register a new timeout relative to a certain point in time.
101    pub fn set_timeout(&mut self, timeout: Duration, after: Timestamp) {
102        let time = after + Timestamp(timeout.as_millis());
103        self.timeouts.insert(time);
104    }
105
106    /// Get the first timeout expiring right at or after certain moment of time.
107    /// Returns `None` if there are no timeouts.
108    ///
109    /// ```
110    /// # use std::time::{Duration};
111    /// use reactor::{Timer, Timestamp};
112    ///
113    /// let mut tm = Timer::new();
114    ///
115    /// let now = Timestamp::now();
116    /// tm.set_timeout(Duration::from_secs(16), now);
117    /// tm.set_timeout(Duration::from_secs(8), now);
118    /// tm.set_timeout(Duration::from_secs(64), now);
119    ///
120    /// let mut now = Timestamp::now();
121    /// // We need to wait 8 secs to trigger the next timeout (1).
122    /// assert!(tm.next_expiring_from(now) <= Some(Duration::from_secs(8)));
123    ///
124    /// // ... sleep for a sec ...
125    /// now += Duration::from_secs(1);
126    ///
127    /// // Now we don't need to wait as long!
128    /// assert!(tm.next_expiring_from(now).unwrap() <= Duration::from_secs(7));
129    /// ```
130    pub fn next_expiring_from(&self, time: impl Into<Timestamp>) -> Option<Duration> {
131        let time = time.into();
132        let last = *self.timeouts.first()?;
133        Some(if last >= time {
134            Duration::from_millis(last.as_millis() - time.as_millis())
135        } else {
136            Duration::from_secs(0)
137        })
138    }
139
140    /// Removes timeouts which expire by a certain moment of time (inclusive),
141    /// returning total number of timeouts which were removed.
142    pub fn remove_expired_by(&mut self, time: Timestamp) -> usize {
143        // Since `split_off` returns everything *after* the given key, including the key,
144        // if a timer is set for exactly the given time, it would remain in the "after"
145        // set of unexpired keys. This isn't what we want, therefore we add `1` to the
146        // given time value so that it is put in the "before" set that gets expired
147        // and overwritten.
148        let at = time + Timestamp::from_millis(1);
149        let unexpired = self.timeouts.split_off(&at);
150        let fired = self.timeouts.len();
151        self.timeouts = unexpired;
152        fired
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    /// A timeout set for exactly the queried moment is removed together with the
    /// earlier ones.
    #[test]
    fn test_wake_exact() {
        let start = Timestamp::now();
        let mut timer = Timer::new();
        for secs in [8, 9, 10] {
            timer.set_timeout(Duration::from_secs(secs), start);
        }

        let fired = timer.remove_expired_by(start + Duration::from_secs(9));
        assert_eq!(fired, 2);
        assert_eq!(timer.count(), 1);
    }

    /// Timeouts are removed in batches as the queried moment moves forward.
    #[test]
    fn test_wake() {
        let start = Timestamp::now();
        let mut timer = Timer::new();
        for secs in [8, 16, 64, 72] {
            timer.set_timeout(Duration::from_secs(secs), start);
        }
        // Moment lying the given number of seconds after `start`.
        let at = |secs: u64| start + Duration::from_secs(secs);

        assert_eq!(timer.remove_expired_by(start), 0);
        assert_eq!(timer.count(), 4);

        assert_eq!(timer.remove_expired_by(at(9)), 1);
        assert_eq!(timer.count(), 3, "one timeout has expired");

        assert_eq!(timer.remove_expired_by(at(66)), 2);
        assert_eq!(timer.count(), 1, "another two timeouts have expired");

        assert_eq!(timer.remove_expired_by(at(96)), 1);
        assert!(!timer.has_timeouts(), "all timeouts have expired");
    }

    /// The reported wait shrinks as time passes and bottoms out at zero.
    #[test]
    fn test_next() {
        let mut timer = Timer::new();
        let mut clock = Timestamp::now();

        timer.set_timeout(Duration::from_secs(3), clock);
        assert_eq!(timer.next_expiring_from(clock), Some(Duration::from_secs(3)));

        // Each pair is (seconds to advance the clock by, expected remaining seconds).
        for (step, remaining) in [(2, 1), (1, 0), (1, 0)] {
            clock += Duration::from_secs(step);
            assert_eq!(timer.next_expiring_from(clock), Some(Duration::from_secs(remaining)));
        }

        assert_eq!(timer.remove_expired_by(clock), 1);
        assert_eq!(timer.count(), 0);
        assert_eq!(timer.next_expiring_from(clock), None);
    }
}
217}