atomic_timer/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3use std::{
4    sync::atomic::{AtomicBool, AtomicI64, Ordering},
5    time::Duration,
6};
7
8use bma_ts::Monotonic;
9
10/// Atomic timer
11pub struct AtomicTimer {
12    duration: AtomicI64,
13    start: AtomicI64,
14    permit_handle_expiration: AtomicBool,
15    monotonic_fn: fn() -> i64,
16}
17
18impl std::fmt::Debug for AtomicTimer {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        f.debug_struct("AtomicTimer")
21            .field("duration", &self.duration())
22            .field("elapsed", &self.elapsed())
23            .field("remaining", &self.remaining())
24            .field("expired", &self.expired())
25            .finish()
26    }
27}
28
29fn monotonic_ns() -> i64 {
30    i64::try_from(Monotonic::now().as_nanos()).expect("Monotonic time is too large")
31}
32
33impl AtomicTimer {
34    #[allow(dead_code)]
35    fn construct(duration: i64, elapsed: i64, phe: bool, monotonic_fn: fn() -> i64) -> Self {
36        AtomicTimer {
37            duration: AtomicI64::new(duration),
38            start: AtomicI64::new(monotonic_fn() - elapsed),
39            monotonic_fn,
40            permit_handle_expiration: AtomicBool::new(phe),
41        }
42    }
43    /// Create a new atomic timer
44    ///
45    /// # Panics
46    ///
47    /// Panics if the duration is too large (in nanos greater than `i64::MAX`)
48    pub fn new(duration: Duration) -> Self {
49        Self::construct(
50            duration
51                .as_nanos()
52                .try_into()
53                .expect("Duration is too large"),
54            0,
55            true,
56            monotonic_ns,
57        )
58    }
59    /// Create a new atomic timer expired
60    ///
61    /// # Panics
62    ///
63    /// Panics if the duration is too large (in nanos greater than `i64::MAX`)
64    pub fn new_expired(duration: Duration) -> Self {
65        Self::construct(
66            duration
67                .as_nanos()
68                .try_into()
69                .expect("Duration is too large"),
70            duration
71                .as_nanos()
72                .try_into()
73                .expect("Duration is too large"),
74            true,
75            monotonic_ns,
76        )
77    }
78    /// Get the duration of the timer
79    ///
80    /// # Panics
81    ///
82    /// Panics if the duration is negative
83    #[inline]
84    pub fn duration(&self) -> Duration {
85        Duration::from_nanos(self.duration.load(Ordering::SeqCst).try_into().unwrap())
86    }
87    /// Similar to reset if expired but does not reset the timer. As the timer is checked for
88    /// expiration, a tiny datarace may occur despite it passes the tests well. As soon as the
89    /// timer is reset with any method, the flag is reset as well. If used in multi-threaded
90    /// environment, "true" is returned to a single worker only. After, the flag is reset.
91    #[inline]
92    pub fn permit_handle_expiration(&self) -> bool {
93        self.permit_handle_expiration
94            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
95                (v && self.expired()).then_some(false)
96            })
97            .is_ok()
98    }
99    /// Change the duration of the timer
100    ///
101    /// # Panics
102    ///
103    /// Panics if the duration in nanos is larger than `i64::MAX`
104    pub fn set_duration(&self, duration: Duration) {
105        self.duration
106            .store(duration.as_nanos().try_into().unwrap(), Ordering::SeqCst);
107    }
108    /// Reset the timer
109    #[inline]
110    pub fn reset(&self) {
111        self.permit_handle_expiration.store(true, Ordering::SeqCst);
112        self.start.store((self.monotonic_fn)(), Ordering::SeqCst);
113    }
114    /// Focibly expire the timer
115    #[inline]
116    pub fn expire_now(&self) {
117        self.start.store(
118            (self.monotonic_fn)() - self.duration.load(Ordering::SeqCst),
119            Ordering::SeqCst,
120        );
121    }
122    /// Reset the timer if it has expired, returns true if reset. If used in multi-threaded
123    /// environment, "true" is returned to a single worker only.
124    #[inline]
125    pub fn reset_if_expired(&self) -> bool {
126        let now = (self.monotonic_fn)();
127        self.start
128            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |start| {
129                self.permit_handle_expiration.store(true, Ordering::SeqCst);
130                (now.saturating_sub(start) >= self.duration.load(Ordering::SeqCst)).then_some(now)
131            })
132            .is_ok()
133    }
134    /// Get the elapsed time
135    ///
136    /// In case if negative elapsed, returns `Duration::ZERO`
137    #[inline]
138    pub fn elapsed(&self) -> Duration {
139        Duration::from_nanos(
140            (self.monotonic_fn)()
141                .saturating_sub(self.start.load(Ordering::SeqCst))
142                .try_into()
143                .unwrap_or_default(),
144        )
145    }
146    /// Get the remaining time
147    ///
148    /// In case if negative remaining, returns `Duration::ZERO`
149    #[inline]
150    pub fn remaining(&self) -> Duration {
151        let elapsed = self.elapsed_ns();
152        if elapsed >= self.duration.load(Ordering::SeqCst) {
153            Duration::ZERO
154        } else {
155            Duration::from_nanos(
156                (self.duration.load(Ordering::SeqCst) - elapsed)
157                    .try_into()
158                    .unwrap_or_default(),
159            )
160        }
161    }
162    #[inline]
163    fn elapsed_ns(&self) -> i64 {
164        (self.monotonic_fn)().saturating_sub(self.start.load(Ordering::SeqCst))
165    }
166    /// Check if the timer has expired
167    #[inline]
168    pub fn expired(&self) -> bool {
169        self.elapsed_ns() >= self.duration.load(Ordering::SeqCst)
170    }
171}
172
173#[cfg(feature = "serde")]
174mod ser {
175    use super::{monotonic_ns, AtomicTimer};
176    use serde::{Deserialize, Deserializer, Serialize, Serializer};
177    use std::sync::atomic::Ordering;
178
179    #[derive(Serialize, Deserialize)]
180    struct SerializedTimer {
181        duration: i64,
182        elapsed: i64,
183        phe: bool,
184    }
185
186    impl Serialize for AtomicTimer {
187        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
188        where
189            S: Serializer,
190        {
191            let s = SerializedTimer {
192                duration: self.duration.load(Ordering::SeqCst),
193                elapsed: self.elapsed_ns(),
194                phe: self.permit_handle_expiration.load(Ordering::SeqCst),
195            };
196            s.serialize(serializer)
197        }
198    }
199
200    impl<'de> Deserialize<'de> for AtomicTimer {
201        fn deserialize<D>(deserializer: D) -> Result<AtomicTimer, D::Error>
202        where
203            D: Deserializer<'de>,
204        {
205            let s = SerializedTimer::deserialize(deserializer)?;
206            Ok(AtomicTimer::construct(
207                s.duration,
208                s.elapsed,
209                s.phe,
210                monotonic_ns,
211            ))
212        }
213    }
214}
215
216#[cfg(test)]
217mod test {
218    use super::AtomicTimer;
219    use std::{
220        sync::{Arc, Barrier},
221        thread,
222        time::Duration,
223    };
224
225    pub(crate) fn in_time_window(a: Duration, b: Duration, window: Duration) -> bool {
226        let diff = window / 2;
227        let min = b - diff;
228        let max = b + diff;
229        a >= min && a <= max
230    }
231
232    #[test]
233    fn test_reset() {
234        let timer = AtomicTimer::new(Duration::from_secs(5));
235        thread::sleep(Duration::from_secs(1));
236        timer.reset();
237        assert!(timer.elapsed() < Duration::from_millis(100));
238    }
239
240    #[test]
241    fn test_expire_now() {
242        let timer = AtomicTimer::new(Duration::from_secs(5));
243        assert!(!timer.expired());
244        assert!(in_time_window(
245            timer.remaining(),
246            Duration::from_secs(5),
247            Duration::from_millis(100)
248        ));
249        timer.expire_now();
250        assert!(timer.expired());
251    }
252
253    #[test]
254    fn test_reset_if_expired() {
255        let timer = AtomicTimer::new(Duration::from_secs(1));
256        assert!(!timer.reset_if_expired());
257        thread::sleep(Duration::from_millis(1100));
258        assert!(timer.expired());
259        assert!(timer.reset_if_expired());
260    }
261
262    #[test]
263    fn test_reset_if_expired_no_datarace() {
264        let n = 1000;
265        let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
266        thread::sleep(Duration::from_millis(200));
267        assert!(timer.expired());
268        let barrier = Arc::new(Barrier::new(n));
269        let (tx, rx) = std::sync::mpsc::channel::<bool>();
270        let mut result = Vec::with_capacity(n);
271        for _ in 0..n {
272            let timer = timer.clone();
273            let barrier = barrier.clone();
274            let tx = tx.clone();
275            thread::spawn(move || {
276                barrier.wait();
277                tx.send(timer.reset_if_expired()).unwrap();
278            });
279        }
280        drop(tx);
281        while let Ok(v) = rx.recv() {
282            result.push(v);
283        }
284        assert_eq!(result.len(), n);
285        assert_eq!(result.into_iter().filter(|&v| v).count(), 1);
286    }
287
288    #[test]
289    fn test_permit_handle_expiration() {
290        let timer = AtomicTimer::new(Duration::from_secs(1));
291        assert!(!timer.permit_handle_expiration());
292        thread::sleep(Duration::from_millis(1100));
293        assert!(timer.expired());
294        assert!(timer.permit_handle_expiration());
295        assert!(!timer.permit_handle_expiration());
296        timer.reset();
297        thread::sleep(Duration::from_millis(1100));
298        timer.reset();
299        assert!(!timer.permit_handle_expiration());
300    }
301
302    #[test]
303    fn test_permit_handle_expiration_no_datarace() {
304        let n = 1000;
305        let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
306        thread::sleep(Duration::from_millis(200));
307        assert!(timer.expired());
308        let barrier = Arc::new(Barrier::new(n));
309        let (tx, rx) = std::sync::mpsc::channel::<bool>();
310        let mut result = Vec::with_capacity(n);
311        for _ in 0..n {
312            let timer = timer.clone();
313            let barrier = barrier.clone();
314            let tx = tx.clone();
315            thread::spawn(move || {
316                barrier.wait();
317                tx.send(timer.permit_handle_expiration()).unwrap();
318            });
319        }
320        drop(tx);
321        while let Ok(v) = rx.recv() {
322            result.push(v);
323        }
324        assert_eq!(result.len(), n);
325        assert_eq!(result.into_iter().filter(|&v| v).count(), 1);
326    }
327}
328
329#[cfg(feature = "serde")]
330#[cfg(test)]
331mod test_serialization {
332    use super::test::in_time_window;
333    use super::AtomicTimer;
334    use std::{sync::atomic::Ordering, thread, time::Duration};
335
336    #[test]
337    fn test_serialize_deserialize() {
338        let timer = AtomicTimer::new(Duration::from_secs(5));
339        thread::sleep(Duration::from_secs(1));
340        let serialized = serde_json::to_string(&timer).unwrap();
341        let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
342        assert!(in_time_window(
343            deserialized.elapsed(),
344            Duration::from_secs(1),
345            Duration::from_millis(100)
346        ));
347    }
348
349    #[test]
350    fn test_serialize_deserialize_monotonic_goes_forward() {
351        fn monotonic_ns_forwarded() -> i64 {
352            super::monotonic_ns() + 10_000 * 1_000_000_000
353        }
354        let timer = AtomicTimer::new(Duration::from_secs(5));
355        thread::sleep(Duration::from_secs(1));
356        let serialized = serde_json::to_string(&timer).unwrap();
357        let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
358        let deserialized_rewinded = AtomicTimer::construct(
359            deserialized.duration().as_nanos().try_into().unwrap(),
360            deserialized.elapsed_ns(),
361            deserialized.permit_handle_expiration.load(Ordering::SeqCst),
362            monotonic_ns_forwarded,
363        );
364        assert!(in_time_window(
365            deserialized_rewinded.elapsed(),
366            Duration::from_secs(1),
367            Duration::from_millis(100)
368        ));
369    }
370
371    #[test]
372    fn test_serialize_deserialize_monotonic_goes_backward() {
373        fn monotonic_ns_forwarded() -> i64 {
374            super::monotonic_ns() - 10_000 * 1_000_000_000
375        }
376        let timer = AtomicTimer::new(Duration::from_secs(5));
377        thread::sleep(Duration::from_secs(1));
378        let serialized = serde_json::to_string(&timer).unwrap();
379        let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
380        let deserialized_rewinded = AtomicTimer::construct(
381            deserialized.duration().as_nanos().try_into().unwrap(),
382            deserialized.elapsed_ns(),
383            deserialized.permit_handle_expiration.load(Ordering::SeqCst),
384            monotonic_ns_forwarded,
385        );
386        assert!(in_time_window(
387            deserialized_rewinded.elapsed(),
388            Duration::from_secs(1),
389            Duration::from_millis(100)
390        ));
391    }
392
393    #[test]
394    fn test_serialize_deserialize_monotonic_goes_zero() {
395        fn monotonic_ns_forwarded() -> i64 {
396            0
397        }
398        let timer = AtomicTimer::new(Duration::from_secs(5));
399        thread::sleep(Duration::from_secs(1));
400        let serialized = serde_json::to_string(&timer).unwrap();
401        let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
402        let deserialized_rewinded = AtomicTimer::construct(
403            deserialized.duration().as_nanos().try_into().unwrap(),
404            deserialized.elapsed_ns(),
405            deserialized.permit_handle_expiration.load(Ordering::SeqCst),
406            monotonic_ns_forwarded,
407        );
408        assert!(in_time_window(
409            deserialized_rewinded.elapsed(),
410            Duration::from_secs(1),
411            Duration::from_millis(100)
412        ));
413    }
414}