1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3use std::{
4 sync::atomic::{AtomicBool, AtomicI64, Ordering},
5 time::Duration,
6};
7
8use bma_ts::Monotonic;
9
/// A timer whose whole state is kept in atomics, so it can be used through a shared reference.
///
/// All time values are stored as signed nanosecond counts read from the clock function
/// held in `monotonic_fn`.
pub struct AtomicTimer {
    /// Length of the timer, in nanoseconds.
    duration: AtomicI64,
    /// Clock reading (nanoseconds) at which the current cycle started.
    start: AtomicI64,
    /// `true` while the expiration of the current cycle has not been claimed yet.
    permit_handle_expiration: AtomicBool,
    /// Clock source; returns the current time in nanoseconds.
    monotonic_fn: fn() -> i64,
}
17
18fn monotonic_ns() -> i64 {
19 i64::try_from(Monotonic::now().as_nanos()).expect("Monotonic time is too large")
20}
21
22impl AtomicTimer {
23 #[allow(dead_code)]
24 fn construct(duration: i64, elapsed: i64, phe: bool, monotonic_fn: fn() -> i64) -> Self {
25 AtomicTimer {
26 duration: AtomicI64::new(duration),
27 start: AtomicI64::new(monotonic_fn() - elapsed),
28 monotonic_fn,
29 permit_handle_expiration: AtomicBool::new(phe),
30 }
31 }
32 pub fn new(duration: Duration) -> Self {
38 Self::construct(
39 duration
40 .as_nanos()
41 .try_into()
42 .expect("Duration is too large"),
43 0,
44 true,
45 monotonic_ns,
46 )
47 }
48 pub fn new_expired(duration: Duration) -> Self {
54 Self::construct(
55 duration
56 .as_nanos()
57 .try_into()
58 .expect("Duration is too large"),
59 duration
60 .as_nanos()
61 .try_into()
62 .expect("Duration is too large"),
63 true,
64 monotonic_ns,
65 )
66 }
67 #[inline]
73 pub fn duration(&self) -> Duration {
74 Duration::from_nanos(self.duration.load(Ordering::SeqCst).try_into().unwrap())
75 }
76 #[inline]
81 pub fn permit_handle_expiration(&self) -> bool {
82 self.permit_handle_expiration
83 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
84 (v && self.expired()).then_some(false)
85 })
86 .is_ok()
87 }
88 pub fn set_duration(&self, duration: Duration) {
94 self.duration
95 .store(duration.as_nanos().try_into().unwrap(), Ordering::SeqCst);
96 }
97 #[inline]
99 pub fn reset(&self) {
100 self.permit_handle_expiration.store(true, Ordering::SeqCst);
101 self.start.store((self.monotonic_fn)(), Ordering::SeqCst);
102 }
103 #[inline]
105 pub fn expire_now(&self) {
106 self.start.store(
107 (self.monotonic_fn)() - self.duration.load(Ordering::SeqCst),
108 Ordering::SeqCst,
109 );
110 }
111 #[inline]
114 pub fn reset_if_expired(&self) -> bool {
115 let now = (self.monotonic_fn)();
116 self.start
117 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |start| {
118 self.permit_handle_expiration.store(true, Ordering::SeqCst);
119 (now.saturating_sub(start) >= self.duration.load(Ordering::SeqCst)).then_some(now)
120 })
121 .is_ok()
122 }
123 #[inline]
127 pub fn elapsed(&self) -> Duration {
128 Duration::from_nanos(
129 (self.monotonic_fn)()
130 .saturating_sub(self.start.load(Ordering::SeqCst))
131 .try_into()
132 .unwrap_or_default(),
133 )
134 }
135 #[inline]
139 pub fn remaining(&self) -> Duration {
140 let elapsed = self.elapsed_ns();
141 if elapsed >= self.duration.load(Ordering::SeqCst) {
142 Duration::ZERO
143 } else {
144 Duration::from_nanos(
145 (self.duration.load(Ordering::SeqCst) - elapsed)
146 .try_into()
147 .unwrap_or_default(),
148 )
149 }
150 }
151 #[inline]
152 fn elapsed_ns(&self) -> i64 {
153 (self.monotonic_fn)().saturating_sub(self.start.load(Ordering::SeqCst))
154 }
155 #[inline]
157 pub fn expired(&self) -> bool {
158 self.elapsed_ns() >= self.duration.load(Ordering::SeqCst)
159 }
160}
161
#[cfg(feature = "serde")]
mod ser {
    use super::{monotonic_ns, AtomicTimer};
    use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::sync::atomic::Ordering;

    /// Plain-data form of an [`AtomicTimer`] used for (de)serialization.
    ///
    /// The absolute start point is not stored; the elapsed time is stored instead and the
    /// start point is recomputed against the current clock on deserialization.
    #[derive(Serialize, Deserialize)]
    struct SerializedTimer {
        /// Timer length in nanoseconds.
        duration: i64,
        /// Nanoseconds elapsed at the moment of serialization.
        elapsed: i64,
        /// Value of the expiration-permit flag.
        phe: bool,
    }

    impl Serialize for AtomicTimer {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            let snapshot = SerializedTimer {
                duration: self.duration.load(Ordering::SeqCst),
                elapsed: self.elapsed_ns(),
                phe: self.permit_handle_expiration.load(Ordering::SeqCst),
            };
            snapshot.serialize(serializer)
        }
    }

    impl<'de> Deserialize<'de> for AtomicTimer {
        fn deserialize<D>(deserializer: D) -> Result<AtomicTimer, D::Error>
        where
            D: Deserializer<'de>,
        {
            let s = SerializedTimer::deserialize(deserializer)?;
            // Timers created through the public constructors never hold a negative
            // duration; reject such input here instead of building a timer that is
            // permanently expired and cannot report its duration.
            if s.duration < 0 {
                return Err(serde::de::Error::custom(
                    "timer duration must not be negative",
                ));
            }
            Ok(AtomicTimer::construct(
                s.duration,
                s.elapsed,
                s.phe,
                monotonic_ns,
            ))
        }
    }
}
204
#[cfg(test)]
mod test {
    use super::AtomicTimer;
    use std::{
        sync::{mpsc, Arc, Barrier},
        thread,
        time::Duration,
    };

    /// Returns `true` if `a` lies within `window / 2` of `b` (bounds inclusive).
    ///
    /// The bounds saturate, so a `b` smaller than half the window does not panic.
    pub(crate) fn in_time_window(a: Duration, b: Duration, window: Duration) -> bool {
        let half = window / 2;
        let lower = b.saturating_sub(half);
        let upper = b.saturating_add(half);
        (lower..=upper).contains(&a)
    }

    /// Spawns `n` threads that are released together by a barrier, each calling `op` on
    /// the shared timer once, and collects every returned value.
    fn race(timer: &Arc<AtomicTimer>, n: usize, op: fn(&AtomicTimer) -> bool) -> Vec<bool> {
        let barrier = Arc::new(Barrier::new(n));
        let (tx, rx) = mpsc::channel::<bool>();
        for _ in 0..n {
            let timer = Arc::clone(timer);
            let barrier = Arc::clone(&barrier);
            let tx = tx.clone();
            thread::spawn(move || {
                barrier.wait();
                tx.send(op(&timer)).unwrap();
            });
        }
        // Drop the original sender so the receiver ends once every worker has finished.
        drop(tx);
        rx.iter().collect()
    }

    #[test]
    fn test_reset() {
        let timer = AtomicTimer::new(Duration::from_secs(5));
        thread::sleep(Duration::from_secs(1));
        timer.reset();
        assert!(timer.elapsed() < Duration::from_millis(100));
    }

    #[test]
    fn test_expire_now() {
        let timer = AtomicTimer::new(Duration::from_secs(5));
        assert!(!timer.expired());
        assert!(in_time_window(
            timer.remaining(),
            Duration::from_secs(5),
            Duration::from_millis(100)
        ));
        timer.expire_now();
        assert!(timer.expired());
    }

    #[test]
    fn test_reset_if_expired() {
        let timer = AtomicTimer::new(Duration::from_secs(1));
        assert!(!timer.reset_if_expired());
        thread::sleep(Duration::from_millis(1100));
        assert!(timer.expired());
        assert!(timer.reset_if_expired());
    }

    #[test]
    fn test_reset_if_expired_no_datarace() {
        let n = 1000;
        let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
        thread::sleep(Duration::from_millis(200));
        assert!(timer.expired());
        let result = race(&timer, n, AtomicTimer::reset_if_expired);
        assert_eq!(result.len(), n);
        // Exactly one of the racing threads may perform the reset.
        assert_eq!(result.into_iter().filter(|&v| v).count(), 1);
    }

    #[test]
    fn test_permit_handle_expiration() {
        let timer = AtomicTimer::new(Duration::from_secs(1));
        assert!(!timer.permit_handle_expiration());
        thread::sleep(Duration::from_millis(1100));
        assert!(timer.expired());
        assert!(timer.permit_handle_expiration());
        assert!(!timer.permit_handle_expiration());
        timer.reset();
        thread::sleep(Duration::from_millis(1100));
        timer.reset();
        assert!(!timer.permit_handle_expiration());
    }

    #[test]
    fn test_permit_handle_expiration_no_datarace() {
        let n = 1000;
        let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
        thread::sleep(Duration::from_millis(200));
        assert!(timer.expired());
        let result = race(&timer, n, AtomicTimer::permit_handle_expiration);
        assert_eq!(result.len(), n);
        // Exactly one of the racing threads may claim the expiration.
        assert_eq!(result.into_iter().filter(|&v| v).count(), 1);
    }
}
317
#[cfg(feature = "serde")]
#[cfg(test)]
mod test_serialization {
    use super::test::in_time_window;
    use super::AtomicTimer;
    use std::{sync::atomic::Ordering, thread, time::Duration};

    /// Clock offset applied by the shifted-clock tests, in nanoseconds.
    const CLOCK_SHIFT_NS: i64 = 10_000 * 1_000_000_000;

    /// Creates a 5 s timer, lets it run for one second and passes it through JSON.
    fn aged_round_trip() -> AtomicTimer {
        let original = AtomicTimer::new(Duration::from_secs(5));
        thread::sleep(Duration::from_secs(1));
        let json = serde_json::to_string(&original).unwrap();
        serde_json::from_str(&json).unwrap()
    }

    /// Rebuilds `timer` with the same duration, elapsed time and permit flag on top of
    /// a different clock function.
    fn rebuild_with_clock(timer: &AtomicTimer, clock: fn() -> i64) -> AtomicTimer {
        AtomicTimer::construct(
            timer.duration().as_nanos().try_into().unwrap(),
            timer.elapsed_ns(),
            timer.permit_handle_expiration.load(Ordering::SeqCst),
            clock,
        )
    }

    /// Asserts that the timer reports about one second elapsed (within a 100 ms window).
    fn assert_one_second_elapsed(timer: &AtomicTimer) {
        assert!(in_time_window(
            timer.elapsed(),
            Duration::from_secs(1),
            Duration::from_millis(100)
        ));
    }

    #[test]
    fn test_serialize_deserialize() {
        let restored = aged_round_trip();
        assert_one_second_elapsed(&restored);
    }

    #[test]
    fn test_serialize_deserialize_monotonic_goes_forward() {
        fn clock_shifted_forward() -> i64 {
            super::monotonic_ns() + CLOCK_SHIFT_NS
        }
        let restored = aged_round_trip();
        let rebased = rebuild_with_clock(&restored, clock_shifted_forward);
        assert_one_second_elapsed(&rebased);
    }

    #[test]
    fn test_serialize_deserialize_monotonic_goes_backward() {
        fn clock_shifted_backward() -> i64 {
            super::monotonic_ns() - CLOCK_SHIFT_NS
        }
        let restored = aged_round_trip();
        let rebased = rebuild_with_clock(&restored, clock_shifted_backward);
        assert_one_second_elapsed(&rebased);
    }

    #[test]
    fn test_serialize_deserialize_monotonic_goes_zero() {
        fn clock_at_zero() -> i64 {
            0
        }
        let restored = aged_round_trip();
        let rebased = rebuild_with_clock(&restored, clock_at_zero);
        assert_one_second_elapsed(&rebased);
    }
}