atomic_timer/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3use std::{
4    sync::atomic::{AtomicBool, AtomicI64, Ordering},
5    time::Duration,
6};
7
8use bma_ts::Monotonic;
9
/// Atomic timer
///
/// All state is kept in atomics, so every method takes `&self` and one instance can be shared
/// between threads (e.g. behind an `Arc`).
#[derive(Debug)]
pub struct AtomicTimer {
    /// Timer period, in nanoseconds.
    duration: AtomicI64,
    /// Clock reading (nanoseconds, as returned by `monotonic_fn`) the elapsed time is measured
    /// from.
    start: AtomicI64,
    /// `true` while the expiration may still be claimed via `permit_handle_expiration`.
    permit_handle_expiration: AtomicBool,
    /// Clock source returning the current time in nanoseconds.
    monotonic_fn: fn() -> i64,
}
17
18fn monotonic_ns() -> i64 {
19    i64::try_from(Monotonic::now().as_nanos()).expect("Monotonic time is too large")
20}
21
22impl AtomicTimer {
23    #[allow(dead_code)]
24    fn construct(duration: i64, elapsed: i64, phe: bool, monotonic_fn: fn() -> i64) -> Self {
25        AtomicTimer {
26            duration: AtomicI64::new(duration),
27            start: AtomicI64::new(monotonic_fn() - elapsed),
28            monotonic_fn,
29            permit_handle_expiration: AtomicBool::new(phe),
30        }
31    }
32    /// Create a new atomic timer
33    ///
34    /// # Panics
35    ///
36    /// Panics if the duration is too large (in nanos greater than `i64::MAX`)
37    pub fn new(duration: Duration) -> Self {
38        Self::construct(
39            duration
40                .as_nanos()
41                .try_into()
42                .expect("Duration is too large"),
43            0,
44            true,
45            monotonic_ns,
46        )
47    }
48    /// Get the duration of the timer
49    ///
50    /// # Panics
51    ///
52    /// Panics if the duration is negative
53    #[inline]
54    pub fn duration(&self) -> Duration {
55        Duration::from_nanos(self.duration.load(Ordering::SeqCst).try_into().unwrap())
56    }
57    /// Similar to reset if expired but does not reset the timer. As the timer is checked for
58    /// expiration, a tiny datarace may occur despite it passes the tests well. As soon as the
59    /// timer is reset with any method, the flag is reset as well. If used in multi-threaded
60    /// environment, "true" is returned to a single worker only. After, the flag is reset.
61    #[inline]
62    pub fn permit_handle_expiration(&self) -> bool {
63        self.permit_handle_expiration
64            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
65                (v && self.expired()).then_some(false)
66            })
67            .is_ok()
68    }
69    /// Change the duration of the timer
70    ///
71    /// # Panics
72    ///
73    /// Panics if the duration in nanos is larger than `i64::MAX`
74    pub fn set_duration(&self, duration: Duration) {
75        self.duration
76            .store(duration.as_nanos().try_into().unwrap(), Ordering::SeqCst);
77    }
78    /// Reset the timer
79    #[inline]
80    pub fn reset(&self) {
81        self.permit_handle_expiration.store(true, Ordering::SeqCst);
82        self.start.store((self.monotonic_fn)(), Ordering::SeqCst);
83    }
84    /// Focibly expire the timer
85    #[inline]
86    pub fn expire_now(&self) {
87        self.start.store(
88            (self.monotonic_fn)() - self.duration.load(Ordering::SeqCst),
89            Ordering::SeqCst,
90        );
91    }
92    /// Reset the timer if it has expired, returns true if reset. If used in multi-threaded
93    /// environment, "true" is returned to a single worker only.
94    #[inline]
95    pub fn reset_if_expired(&self) -> bool {
96        let now = (self.monotonic_fn)();
97        self.start
98            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |start| {
99                self.permit_handle_expiration.store(true, Ordering::SeqCst);
100                (now.saturating_sub(start) >= self.duration.load(Ordering::SeqCst)).then_some(now)
101            })
102            .is_ok()
103    }
104    /// Get the elapsed time
105    ///
106    /// In case if negative elapsed, returns `Duration::ZERO`
107    #[inline]
108    pub fn elapsed(&self) -> Duration {
109        Duration::from_nanos(
110            (self.monotonic_fn)()
111                .saturating_sub(self.start.load(Ordering::SeqCst))
112                .try_into()
113                .unwrap_or_default(),
114        )
115    }
116    /// Get the remaining time
117    ///
118    /// In case if negative remaining, returns `Duration::ZERO`
119    #[inline]
120    pub fn remaining(&self) -> Duration {
121        let elapsed = self.elapsed_ns();
122        if elapsed >= self.duration.load(Ordering::SeqCst) {
123            Duration::ZERO
124        } else {
125            Duration::from_nanos(
126                (self.duration.load(Ordering::SeqCst) - elapsed)
127                    .try_into()
128                    .unwrap_or_default(),
129            )
130        }
131    }
132    #[inline]
133    fn elapsed_ns(&self) -> i64 {
134        (self.monotonic_fn)().saturating_sub(self.start.load(Ordering::SeqCst))
135    }
136    /// Check if the timer has expired
137    #[inline]
138    pub fn expired(&self) -> bool {
139        self.elapsed_ns() >= self.duration.load(Ordering::SeqCst)
140    }
141}
142
#[cfg(feature = "serde")]
mod ser {
    use super::{monotonic_ns, AtomicTimer};
    use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::sync::atomic::Ordering;

    /// Serialized form of [`AtomicTimer`].
    ///
    /// The elapsed time is stored instead of the start timestamp, so a restored timer is
    /// back-dated against the clock reading taken at deserialization time.
    #[derive(Serialize, Deserialize)]
    struct SerializedTimer {
        /// Timer period in nanoseconds.
        duration: i64,
        /// Signed elapsed time in nanoseconds at the moment of serialization.
        elapsed: i64,
        /// State of the handle-expiration permit.
        phe: bool,
    }

    impl Serialize for AtomicTimer {
        /// Serializes a snapshot of the duration, the elapsed time and the permit flag.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            let s = SerializedTimer {
                duration: self.duration.load(Ordering::SeqCst),
                elapsed: self.elapsed_ns(),
                phe: self.permit_handle_expiration.load(Ordering::SeqCst),
            };
            s.serialize(serializer)
        }
    }

    impl<'de> Deserialize<'de> for AtomicTimer {
        /// Restores a timer that continues from the serialized elapsed time.
        ///
        /// # Errors
        ///
        /// Returns an error if the data is malformed or the stored duration is negative.
        fn deserialize<D>(deserializer: D) -> Result<AtomicTimer, D::Error>
        where
            D: Deserializer<'de>,
        {
            let s = SerializedTimer::deserialize(deserializer)?;
            // A negative duration can never be produced by `new`/`set_duration`; accepting it
            // would only defer the failure to a panic in `AtomicTimer::duration`.
            if s.duration < 0 {
                return Err(<D::Error as serde::de::Error>::custom(
                    "timer duration must not be negative",
                ));
            }
            Ok(AtomicTimer::construct(
                s.duration,
                s.elapsed,
                s.phe,
                monotonic_ns,
            ))
        }
    }
}
185
#[cfg(test)]
mod test {
    use super::AtomicTimer;
    use std::{
        sync::{mpsc, Arc, Barrier},
        thread,
        time::Duration,
    };

    /// Returns `true` when `a` lies within `window / 2` of `b` (bounds inclusive).
    ///
    /// The bounds saturate, so a `b` smaller than half of the window does not panic.
    pub(crate) fn in_time_window(a: Duration, b: Duration, window: Duration) -> bool {
        let half = window / 2;
        let min = b.saturating_sub(half);
        let max = b.saturating_add(half);
        (min..=max).contains(&a)
    }

    /// Releases 1000 threads at once against an already expired timer, lets each of them call
    /// `claim`, and returns how many calls returned `true`.
    fn count_winners(claim: fn(&AtomicTimer) -> bool) -> usize {
        let n = 1000;
        let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
        thread::sleep(Duration::from_millis(200));
        assert!(timer.expired());
        let barrier = Arc::new(Barrier::new(n));
        let (tx, rx) = mpsc::channel::<bool>();
        for _ in 0..n {
            let timer = Arc::clone(&timer);
            let barrier = Arc::clone(&barrier);
            let tx = tx.clone();
            thread::spawn(move || {
                // Line all workers up so that the calls race as closely as possible.
                barrier.wait();
                tx.send(claim(&timer)).unwrap();
            });
        }
        // Drop the original sender so the receiver finishes once every worker has reported.
        drop(tx);
        let results: Vec<bool> = rx.iter().collect();
        assert_eq!(results.len(), n);
        results.into_iter().filter(|&won| won).count()
    }

    #[test]
    fn test_reset() {
        let timer = AtomicTimer::new(Duration::from_secs(5));
        thread::sleep(Duration::from_secs(1));
        timer.reset();
        assert!(timer.elapsed() < Duration::from_millis(100));
    }

    #[test]
    fn test_expire_now() {
        let timer = AtomicTimer::new(Duration::from_secs(5));
        assert!(!timer.expired());
        assert!(in_time_window(
            timer.remaining(),
            Duration::from_secs(5),
            Duration::from_millis(100)
        ));
        timer.expire_now();
        assert!(timer.expired());
    }

    #[test]
    fn test_reset_if_expired() {
        let timer = AtomicTimer::new(Duration::from_secs(1));
        assert!(!timer.reset_if_expired());
        thread::sleep(Duration::from_millis(1100));
        assert!(timer.expired());
        assert!(timer.reset_if_expired());
    }

    #[test]
    fn test_reset_if_expired_no_datarace() {
        // Exactly one of the racing workers may observe the reset.
        assert_eq!(count_winners(AtomicTimer::reset_if_expired), 1);
    }

    #[test]
    fn test_permit_handle_expiration() {
        let timer = AtomicTimer::new(Duration::from_secs(1));
        assert!(!timer.permit_handle_expiration());
        thread::sleep(Duration::from_millis(1100));
        assert!(timer.expired());
        // The permit is handed out once per expiration.
        assert!(timer.permit_handle_expiration());
        assert!(!timer.permit_handle_expiration());
        timer.reset();
        thread::sleep(Duration::from_millis(1100));
        // A fresh reset means the timer is not expired, so no permit is granted.
        timer.reset();
        assert!(!timer.permit_handle_expiration());
    }

    #[test]
    fn test_permit_handle_expiration_no_datarace() {
        // Exactly one of the racing workers may obtain the permit.
        assert_eq!(count_winners(AtomicTimer::permit_handle_expiration), 1);
    }
}
298
#[cfg(feature = "serde")]
#[cfg(test)]
mod test_serialization {
    use super::test::in_time_window;
    use super::AtomicTimer;
    use std::{sync::atomic::Ordering, thread, time::Duration};

    /// Lets a 5 s timer run for one second and returns it after a JSON round trip.
    fn round_trip_after_one_second() -> AtomicTimer {
        let timer = AtomicTimer::new(Duration::from_secs(5));
        thread::sleep(Duration::from_secs(1));
        let json = serde_json::to_string(&timer).unwrap();
        serde_json::from_str(&json).unwrap()
    }

    /// Rebuilds `restored` from its raw parts on top of a different clock source.
    fn rebuild_on_clock(restored: &AtomicTimer, clock: fn() -> i64) -> AtomicTimer {
        let duration_ns = restored.duration().as_nanos().try_into().unwrap();
        let elapsed_ns = restored.elapsed_ns();
        let permit = restored.permit_handle_expiration.load(Ordering::SeqCst);
        AtomicTimer::construct(duration_ns, elapsed_ns, permit, clock)
    }

    /// Asserts that `timer` reports one second of elapsed time, give or take 50 ms.
    #[track_caller]
    fn assert_about_one_second(timer: &AtomicTimer) {
        assert!(in_time_window(
            timer.elapsed(),
            Duration::from_secs(1),
            Duration::from_millis(100)
        ));
    }

    #[test]
    fn test_serialize_deserialize() {
        let restored = round_trip_after_one_second();
        assert_about_one_second(&restored);
    }

    #[test]
    fn test_serialize_deserialize_monotonic_goes_forward() {
        // Clock that runs 10 000 s ahead of the real one.
        fn clock_ahead() -> i64 {
            super::monotonic_ns() + 10_000 * 1_000_000_000
        }
        let restored = round_trip_after_one_second();
        let rebuilt = rebuild_on_clock(&restored, clock_ahead);
        assert_about_one_second(&rebuilt);
    }

    #[test]
    fn test_serialize_deserialize_monotonic_goes_backward() {
        // Clock that runs 10 000 s behind the real one.
        fn clock_behind() -> i64 {
            super::monotonic_ns() - 10_000 * 1_000_000_000
        }
        let restored = round_trip_after_one_second();
        let rebuilt = rebuild_on_clock(&restored, clock_behind);
        assert_about_one_second(&rebuilt);
    }

    #[test]
    fn test_serialize_deserialize_monotonic_goes_zero() {
        // Clock that always reads zero.
        fn clock_at_zero() -> i64 {
            0
        }
        let restored = round_trip_after_one_second();
        let rebuilt = rebuild_on_clock(&restored, clock_at_zero);
        assert_about_one_second(&rebuilt);
    }
}