atomic_timer/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3use std::{
4    sync::atomic::{AtomicBool, AtomicI64, Ordering},
5    time::Duration,
6};
7
8use bma_ts::Monotonic;
9
/// Atomic timer
///
/// All mutable state is kept in atomics, so every operation takes `&self`.
#[derive(Debug)]
pub struct AtomicTimer {
    /// Timer duration, in nanoseconds.
    duration: AtomicI64,
    /// Reading of `monotonic_fn` (nanoseconds) at which the current period started.
    start: AtomicI64,
    /// `true` while the current expiration has not yet been handed out by
    /// `permit_handle_expiration`.
    permit_handle_expiration: AtomicBool,
    /// Clock source, returns the current monotonic time in nanoseconds.
    monotonic_fn: fn() -> i64,
}
17
18fn monotonic_ns() -> i64 {
19    i64::try_from(Monotonic::now().as_nanos()).expect("Monotonic time is too large")
20}
21
22impl AtomicTimer {
23    #[allow(dead_code)]
24    fn construct(duration: i64, elapsed: i64, phe: bool, monotonic_fn: fn() -> i64) -> Self {
25        AtomicTimer {
26            duration: AtomicI64::new(duration),
27            start: AtomicI64::new(monotonic_fn() - elapsed),
28            monotonic_fn,
29            permit_handle_expiration: AtomicBool::new(phe),
30        }
31    }
32    /// Create a new atomic timer
33    ///
34    /// # Panics
35    ///
36    /// Panics if the duration is too large (in nanos greater than `i64::MAX`)
37    pub fn new(duration: Duration) -> Self {
38        Self::construct(
39            duration
40                .as_nanos()
41                .try_into()
42                .expect("Duration is too large"),
43            0,
44            true,
45            monotonic_ns,
46        )
47    }
48    /// Create a new atomic timer expired
49    ///
50    /// # Panics
51    ///
52    /// Panics if the duration is too large (in nanos greater than `i64::MAX`)
53    pub fn new_expired(duration: Duration) -> Self {
54        Self::construct(
55            duration
56                .as_nanos()
57                .try_into()
58                .expect("Duration is too large"),
59            duration
60                .as_nanos()
61                .try_into()
62                .expect("Duration is too large"),
63            true,
64            monotonic_ns,
65        )
66    }
67    /// Get the duration of the timer
68    ///
69    /// # Panics
70    ///
71    /// Panics if the duration is negative
72    #[inline]
73    pub fn duration(&self) -> Duration {
74        Duration::from_nanos(self.duration.load(Ordering::SeqCst).try_into().unwrap())
75    }
76    /// Similar to reset if expired but does not reset the timer. As the timer is checked for
77    /// expiration, a tiny datarace may occur despite it passes the tests well. As soon as the
78    /// timer is reset with any method, the flag is reset as well. If used in multi-threaded
79    /// environment, "true" is returned to a single worker only. After, the flag is reset.
80    #[inline]
81    pub fn permit_handle_expiration(&self) -> bool {
82        self.permit_handle_expiration
83            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
84                (v && self.expired()).then_some(false)
85            })
86            .is_ok()
87    }
88    /// Change the duration of the timer
89    ///
90    /// # Panics
91    ///
92    /// Panics if the duration in nanos is larger than `i64::MAX`
93    pub fn set_duration(&self, duration: Duration) {
94        self.duration
95            .store(duration.as_nanos().try_into().unwrap(), Ordering::SeqCst);
96    }
97    /// Reset the timer
98    #[inline]
99    pub fn reset(&self) {
100        self.permit_handle_expiration.store(true, Ordering::SeqCst);
101        self.start.store((self.monotonic_fn)(), Ordering::SeqCst);
102    }
103    /// Focibly expire the timer
104    #[inline]
105    pub fn expire_now(&self) {
106        self.start.store(
107            (self.monotonic_fn)() - self.duration.load(Ordering::SeqCst),
108            Ordering::SeqCst,
109        );
110    }
111    /// Reset the timer if it has expired, returns true if reset. If used in multi-threaded
112    /// environment, "true" is returned to a single worker only.
113    #[inline]
114    pub fn reset_if_expired(&self) -> bool {
115        let now = (self.monotonic_fn)();
116        self.start
117            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |start| {
118                self.permit_handle_expiration.store(true, Ordering::SeqCst);
119                (now.saturating_sub(start) >= self.duration.load(Ordering::SeqCst)).then_some(now)
120            })
121            .is_ok()
122    }
123    /// Get the elapsed time
124    ///
125    /// In case if negative elapsed, returns `Duration::ZERO`
126    #[inline]
127    pub fn elapsed(&self) -> Duration {
128        Duration::from_nanos(
129            (self.monotonic_fn)()
130                .saturating_sub(self.start.load(Ordering::SeqCst))
131                .try_into()
132                .unwrap_or_default(),
133        )
134    }
135    /// Get the remaining time
136    ///
137    /// In case if negative remaining, returns `Duration::ZERO`
138    #[inline]
139    pub fn remaining(&self) -> Duration {
140        let elapsed = self.elapsed_ns();
141        if elapsed >= self.duration.load(Ordering::SeqCst) {
142            Duration::ZERO
143        } else {
144            Duration::from_nanos(
145                (self.duration.load(Ordering::SeqCst) - elapsed)
146                    .try_into()
147                    .unwrap_or_default(),
148            )
149        }
150    }
151    #[inline]
152    fn elapsed_ns(&self) -> i64 {
153        (self.monotonic_fn)().saturating_sub(self.start.load(Ordering::SeqCst))
154    }
155    /// Check if the timer has expired
156    #[inline]
157    pub fn expired(&self) -> bool {
158        self.elapsed_ns() >= self.duration.load(Ordering::SeqCst)
159    }
160}
161
#[cfg(feature = "serde")]
mod ser {
    use super::{monotonic_ns, AtomicTimer};
    use serde::{de::Error as _, Deserialize, Deserializer, Serialize, Serializer};
    use std::sync::atomic::Ordering;

    /// Serialized form of an [`AtomicTimer`].
    ///
    /// Stores the elapsed time rather than the start point, so the timer can be rebuilt
    /// relative to whatever the clock reads at deserialization time.
    #[derive(Serialize, Deserialize)]
    struct SerializedTimer {
        /// Timer duration, in nanoseconds.
        duration: i64,
        /// Elapsed time at the moment of serialization, in nanoseconds.
        elapsed: i64,
        /// Value of the "permit handle expiration" flag.
        phe: bool,
    }

    impl Serialize for AtomicTimer {
        /// Serializes the duration, the currently elapsed time and the permit flag.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            let s = SerializedTimer {
                duration: self.duration.load(Ordering::SeqCst),
                elapsed: self.elapsed_ns(),
                phe: self.permit_handle_expiration.load(Ordering::SeqCst),
            };
            s.serialize(serializer)
        }
    }

    impl<'de> Deserialize<'de> for AtomicTimer {
        /// Rebuilds a timer on the default monotonic clock with the stored elapsed time.
        ///
        /// # Errors
        ///
        /// Returns a deserialization error if the stored duration is negative.
        fn deserialize<D>(deserializer: D) -> Result<AtomicTimer, D::Error>
        where
            D: Deserializer<'de>,
        {
            let s = SerializedTimer::deserialize(deserializer)?;
            // A negative duration cannot come from `new`/`set_duration` and would make
            // `AtomicTimer::duration` panic later, so reject it here.
            if s.duration < 0 {
                return Err(D::Error::custom("timer duration must not be negative"));
            }
            Ok(AtomicTimer::construct(
                s.duration,
                s.elapsed,
                s.phe,
                monotonic_ns,
            ))
        }
    }
}
204
#[cfg(test)]
mod test {
    use super::AtomicTimer;
    use std::{
        sync::{mpsc, Arc, Barrier},
        thread,
        time::Duration,
    };

    /// Returns `true` if `a` lies within `window / 2` of `b` (bounds inclusive).
    pub(crate) fn in_time_window(a: Duration, b: Duration, window: Duration) -> bool {
        let half = window / 2;
        // Saturate so a window wider than `2 * b` clamps the lower bound at zero instead
        // of panicking on `Duration` underflow.
        let lower = b.saturating_sub(half);
        let upper = b.saturating_add(half);
        (lower..=upper).contains(&a)
    }

    /// Lets 1000 threads call `op` on one already-expired timer at the same moment and
    /// returns how many of the calls reported `true`.
    fn racing_winners(op: fn(&AtomicTimer) -> bool) -> usize {
        const WORKERS: usize = 1000;
        let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
        thread::sleep(Duration::from_millis(200));
        assert!(timer.expired());
        // The barrier releases all workers together to maximize contention.
        let barrier = Arc::new(Barrier::new(WORKERS));
        let (tx, rx) = mpsc::channel::<bool>();
        for _ in 0..WORKERS {
            let timer = Arc::clone(&timer);
            let barrier = Arc::clone(&barrier);
            let tx = tx.clone();
            thread::spawn(move || {
                barrier.wait();
                tx.send(op(&timer)).unwrap();
            });
        }
        // Drop the original sender so the receiver ends once every worker has reported.
        drop(tx);
        let outcomes: Vec<bool> = rx.iter().collect();
        assert_eq!(outcomes.len(), WORKERS);
        outcomes.into_iter().filter(|&won| won).count()
    }

    #[test]
    fn test_reset() {
        let timer = AtomicTimer::new(Duration::from_secs(5));
        thread::sleep(Duration::from_secs(1));
        timer.reset();
        assert!(timer.elapsed() < Duration::from_millis(100));
    }

    #[test]
    fn test_expire_now() {
        let timer = AtomicTimer::new(Duration::from_secs(5));
        assert!(!timer.expired());
        assert!(in_time_window(
            timer.remaining(),
            Duration::from_secs(5),
            Duration::from_millis(100)
        ));
        timer.expire_now();
        assert!(timer.expired());
    }

    #[test]
    fn test_reset_if_expired() {
        let timer = AtomicTimer::new(Duration::from_secs(1));
        assert!(!timer.reset_if_expired());
        thread::sleep(Duration::from_millis(1100));
        assert!(timer.expired());
        assert!(timer.reset_if_expired());
    }

    #[test]
    fn test_reset_if_expired_no_datarace() {
        assert_eq!(racing_winners(|timer| timer.reset_if_expired()), 1);
    }

    #[test]
    fn test_permit_handle_expiration() {
        let timer = AtomicTimer::new(Duration::from_secs(1));
        assert!(!timer.permit_handle_expiration());
        thread::sleep(Duration::from_millis(1100));
        assert!(timer.expired());
        // Only the first call after expiration is granted.
        assert!(timer.permit_handle_expiration());
        assert!(!timer.permit_handle_expiration());
        timer.reset();
        thread::sleep(Duration::from_millis(1100));
        // A fresh reset means the timer is not expired, so nothing is granted.
        timer.reset();
        assert!(!timer.permit_handle_expiration());
    }

    #[test]
    fn test_permit_handle_expiration_no_datarace() {
        assert_eq!(racing_winners(|timer| timer.permit_handle_expiration()), 1);
    }
}
317
#[cfg(feature = "serde")]
#[cfg(test)]
mod test_serialization {
    use super::test::in_time_window;
    use super::AtomicTimer;
    use std::{sync::atomic::Ordering, thread, time::Duration};

    /// Runs a 5 s timer for one second, then passes it through a JSON round trip.
    fn round_trip_after_one_second() -> AtomicTimer {
        let original = AtomicTimer::new(Duration::from_secs(5));
        thread::sleep(Duration::from_secs(1));
        let json = serde_json::to_string(&original).unwrap();
        serde_json::from_str(&json).unwrap()
    }

    /// Rebuilds `source` from its current state on top of a different clock.
    fn rebuild_on_clock(source: &AtomicTimer, clock: fn() -> i64) -> AtomicTimer {
        AtomicTimer::construct(
            source.duration().as_nanos().try_into().unwrap(),
            source.elapsed_ns(),
            source.permit_handle_expiration.load(Ordering::SeqCst),
            clock,
        )
    }

    /// Asserts that `timer` reports about one second elapsed (within a 100 ms window).
    fn assert_about_one_second(timer: &AtomicTimer) {
        assert!(in_time_window(
            timer.elapsed(),
            Duration::from_secs(1),
            Duration::from_millis(100)
        ));
    }

    #[test]
    fn test_serialize_deserialize() {
        let restored = round_trip_after_one_second();
        assert_about_one_second(&restored);
    }

    #[test]
    fn test_serialize_deserialize_monotonic_goes_forward() {
        fn clock_shifted_forward() -> i64 {
            super::monotonic_ns() + 10_000 * 1_000_000_000
        }
        let restored = round_trip_after_one_second();
        let rebuilt = rebuild_on_clock(&restored, clock_shifted_forward);
        assert_about_one_second(&rebuilt);
    }

    #[test]
    fn test_serialize_deserialize_monotonic_goes_backward() {
        fn clock_shifted_backward() -> i64 {
            super::monotonic_ns() - 10_000 * 1_000_000_000
        }
        let restored = round_trip_after_one_second();
        let rebuilt = rebuild_on_clock(&restored, clock_shifted_backward);
        assert_about_one_second(&rebuilt);
    }

    #[test]
    fn test_serialize_deserialize_monotonic_goes_zero() {
        fn clock_stuck_at_zero() -> i64 {
            0
        }
        let restored = round_trip_after_one_second();
        let rebuilt = rebuild_on_clock(&restored, clock_stuck_at_zero);
        assert_about_one_second(&rebuilt);
    }
}