1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3use std::{
4 sync::atomic::{AtomicBool, AtomicI64, Ordering},
5 time::Duration,
6};
7
8use bma_ts::Monotonic;
9
10pub struct AtomicTimer {
12 duration: AtomicI64,
13 start: AtomicI64,
14 permit_handle_expiration: AtomicBool,
15 monotonic_fn: fn() -> i64,
16}
17
18impl std::fmt::Debug for AtomicTimer {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 f.debug_struct("AtomicTimer")
21 .field("duration", &self.duration())
22 .field("elapsed", &self.elapsed())
23 .field("remaining", &self.remaining())
24 .field("expired", &self.expired())
25 .finish()
26 }
27}
28
29fn monotonic_ns() -> i64 {
30 i64::try_from(Monotonic::now().as_nanos()).expect("Monotonic time is too large")
31}
32
33impl AtomicTimer {
34 #[allow(dead_code)]
35 fn construct(duration: i64, elapsed: i64, phe: bool, monotonic_fn: fn() -> i64) -> Self {
36 AtomicTimer {
37 duration: AtomicI64::new(duration),
38 start: AtomicI64::new(monotonic_fn() - elapsed),
39 monotonic_fn,
40 permit_handle_expiration: AtomicBool::new(phe),
41 }
42 }
43 pub fn new(duration: Duration) -> Self {
49 Self::construct(
50 duration
51 .as_nanos()
52 .try_into()
53 .expect("Duration is too large"),
54 0,
55 true,
56 monotonic_ns,
57 )
58 }
59 pub fn new_expired(duration: Duration) -> Self {
65 Self::construct(
66 duration
67 .as_nanos()
68 .try_into()
69 .expect("Duration is too large"),
70 duration
71 .as_nanos()
72 .try_into()
73 .expect("Duration is too large"),
74 true,
75 monotonic_ns,
76 )
77 }
78 #[inline]
84 pub fn duration(&self) -> Duration {
85 Duration::from_nanos(self.duration.load(Ordering::SeqCst).try_into().unwrap())
86 }
87 #[inline]
92 pub fn permit_handle_expiration(&self) -> bool {
93 self.permit_handle_expiration
94 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
95 (v && self.expired()).then_some(false)
96 })
97 .is_ok()
98 }
99 pub fn set_duration(&self, duration: Duration) {
105 self.duration
106 .store(duration.as_nanos().try_into().unwrap(), Ordering::SeqCst);
107 }
108 #[inline]
110 pub fn reset(&self) {
111 self.permit_handle_expiration.store(true, Ordering::SeqCst);
112 self.start.store((self.monotonic_fn)(), Ordering::SeqCst);
113 }
114 #[inline]
116 pub fn expire_now(&self) {
117 self.start.store(
118 (self.monotonic_fn)() - self.duration.load(Ordering::SeqCst),
119 Ordering::SeqCst,
120 );
121 }
122 #[inline]
125 pub fn reset_if_expired(&self) -> bool {
126 let now = (self.monotonic_fn)();
127 self.start
128 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |start| {
129 self.permit_handle_expiration.store(true, Ordering::SeqCst);
130 (now.saturating_sub(start) >= self.duration.load(Ordering::SeqCst)).then_some(now)
131 })
132 .is_ok()
133 }
134 #[inline]
138 pub fn elapsed(&self) -> Duration {
139 Duration::from_nanos(
140 (self.monotonic_fn)()
141 .saturating_sub(self.start.load(Ordering::SeqCst))
142 .try_into()
143 .unwrap_or_default(),
144 )
145 }
146 #[inline]
150 pub fn remaining(&self) -> Duration {
151 let elapsed = self.elapsed_ns();
152 if elapsed >= self.duration.load(Ordering::SeqCst) {
153 Duration::ZERO
154 } else {
155 Duration::from_nanos(
156 (self.duration.load(Ordering::SeqCst) - elapsed)
157 .try_into()
158 .unwrap_or_default(),
159 )
160 }
161 }
162 #[inline]
163 fn elapsed_ns(&self) -> i64 {
164 (self.monotonic_fn)().saturating_sub(self.start.load(Ordering::SeqCst))
165 }
166 #[inline]
168 pub fn expired(&self) -> bool {
169 self.elapsed_ns() >= self.duration.load(Ordering::SeqCst)
170 }
171}
172
173#[cfg(feature = "serde")]
174mod ser {
175 use super::{monotonic_ns, AtomicTimer};
176 use serde::{Deserialize, Deserializer, Serialize, Serializer};
177 use std::sync::atomic::Ordering;
178
179 #[derive(Serialize, Deserialize)]
180 struct SerializedTimer {
181 duration: i64,
182 elapsed: i64,
183 phe: bool,
184 }
185
186 impl Serialize for AtomicTimer {
187 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
188 where
189 S: Serializer,
190 {
191 let s = SerializedTimer {
192 duration: self.duration.load(Ordering::SeqCst),
193 elapsed: self.elapsed_ns(),
194 phe: self.permit_handle_expiration.load(Ordering::SeqCst),
195 };
196 s.serialize(serializer)
197 }
198 }
199
200 impl<'de> Deserialize<'de> for AtomicTimer {
201 fn deserialize<D>(deserializer: D) -> Result<AtomicTimer, D::Error>
202 where
203 D: Deserializer<'de>,
204 {
205 let s = SerializedTimer::deserialize(deserializer)?;
206 Ok(AtomicTimer::construct(
207 s.duration,
208 s.elapsed,
209 s.phe,
210 monotonic_ns,
211 ))
212 }
213 }
214}
215
216#[cfg(test)]
217mod test {
218 use super::AtomicTimer;
219 use std::{
220 sync::{Arc, Barrier},
221 thread,
222 time::Duration,
223 };
224
225 pub(crate) fn in_time_window(a: Duration, b: Duration, window: Duration) -> bool {
226 let diff = window / 2;
227 let min = b - diff;
228 let max = b + diff;
229 a >= min && a <= max
230 }
231
232 #[test]
233 fn test_reset() {
234 let timer = AtomicTimer::new(Duration::from_secs(5));
235 thread::sleep(Duration::from_secs(1));
236 timer.reset();
237 assert!(timer.elapsed() < Duration::from_millis(100));
238 }
239
240 #[test]
241 fn test_expire_now() {
242 let timer = AtomicTimer::new(Duration::from_secs(5));
243 assert!(!timer.expired());
244 assert!(in_time_window(
245 timer.remaining(),
246 Duration::from_secs(5),
247 Duration::from_millis(100)
248 ));
249 timer.expire_now();
250 assert!(timer.expired());
251 }
252
253 #[test]
254 fn test_reset_if_expired() {
255 let timer = AtomicTimer::new(Duration::from_secs(1));
256 assert!(!timer.reset_if_expired());
257 thread::sleep(Duration::from_millis(1100));
258 assert!(timer.expired());
259 assert!(timer.reset_if_expired());
260 }
261
262 #[test]
263 fn test_reset_if_expired_no_datarace() {
264 let n = 1000;
265 let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
266 thread::sleep(Duration::from_millis(200));
267 assert!(timer.expired());
268 let barrier = Arc::new(Barrier::new(n));
269 let (tx, rx) = std::sync::mpsc::channel::<bool>();
270 let mut result = Vec::with_capacity(n);
271 for _ in 0..n {
272 let timer = timer.clone();
273 let barrier = barrier.clone();
274 let tx = tx.clone();
275 thread::spawn(move || {
276 barrier.wait();
277 tx.send(timer.reset_if_expired()).unwrap();
278 });
279 }
280 drop(tx);
281 while let Ok(v) = rx.recv() {
282 result.push(v);
283 }
284 assert_eq!(result.len(), n);
285 assert_eq!(result.into_iter().filter(|&v| v).count(), 1);
286 }
287
288 #[test]
289 fn test_permit_handle_expiration() {
290 let timer = AtomicTimer::new(Duration::from_secs(1));
291 assert!(!timer.permit_handle_expiration());
292 thread::sleep(Duration::from_millis(1100));
293 assert!(timer.expired());
294 assert!(timer.permit_handle_expiration());
295 assert!(!timer.permit_handle_expiration());
296 timer.reset();
297 thread::sleep(Duration::from_millis(1100));
298 timer.reset();
299 assert!(!timer.permit_handle_expiration());
300 }
301
302 #[test]
303 fn test_permit_handle_expiration_no_datarace() {
304 let n = 1000;
305 let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
306 thread::sleep(Duration::from_millis(200));
307 assert!(timer.expired());
308 let barrier = Arc::new(Barrier::new(n));
309 let (tx, rx) = std::sync::mpsc::channel::<bool>();
310 let mut result = Vec::with_capacity(n);
311 for _ in 0..n {
312 let timer = timer.clone();
313 let barrier = barrier.clone();
314 let tx = tx.clone();
315 thread::spawn(move || {
316 barrier.wait();
317 tx.send(timer.permit_handle_expiration()).unwrap();
318 });
319 }
320 drop(tx);
321 while let Ok(v) = rx.recv() {
322 result.push(v);
323 }
324 assert_eq!(result.len(), n);
325 assert_eq!(result.into_iter().filter(|&v| v).count(), 1);
326 }
327}
328
329#[cfg(feature = "serde")]
330#[cfg(test)]
331mod test_serialization {
332 use super::test::in_time_window;
333 use super::AtomicTimer;
334 use std::{sync::atomic::Ordering, thread, time::Duration};
335
336 #[test]
337 fn test_serialize_deserialize() {
338 let timer = AtomicTimer::new(Duration::from_secs(5));
339 thread::sleep(Duration::from_secs(1));
340 let serialized = serde_json::to_string(&timer).unwrap();
341 let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
342 assert!(in_time_window(
343 deserialized.elapsed(),
344 Duration::from_secs(1),
345 Duration::from_millis(100)
346 ));
347 }
348
349 #[test]
350 fn test_serialize_deserialize_monotonic_goes_forward() {
351 fn monotonic_ns_forwarded() -> i64 {
352 super::monotonic_ns() + 10_000 * 1_000_000_000
353 }
354 let timer = AtomicTimer::new(Duration::from_secs(5));
355 thread::sleep(Duration::from_secs(1));
356 let serialized = serde_json::to_string(&timer).unwrap();
357 let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
358 let deserialized_rewinded = AtomicTimer::construct(
359 deserialized.duration().as_nanos().try_into().unwrap(),
360 deserialized.elapsed_ns(),
361 deserialized.permit_handle_expiration.load(Ordering::SeqCst),
362 monotonic_ns_forwarded,
363 );
364 assert!(in_time_window(
365 deserialized_rewinded.elapsed(),
366 Duration::from_secs(1),
367 Duration::from_millis(100)
368 ));
369 }
370
371 #[test]
372 fn test_serialize_deserialize_monotonic_goes_backward() {
373 fn monotonic_ns_forwarded() -> i64 {
374 super::monotonic_ns() - 10_000 * 1_000_000_000
375 }
376 let timer = AtomicTimer::new(Duration::from_secs(5));
377 thread::sleep(Duration::from_secs(1));
378 let serialized = serde_json::to_string(&timer).unwrap();
379 let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
380 let deserialized_rewinded = AtomicTimer::construct(
381 deserialized.duration().as_nanos().try_into().unwrap(),
382 deserialized.elapsed_ns(),
383 deserialized.permit_handle_expiration.load(Ordering::SeqCst),
384 monotonic_ns_forwarded,
385 );
386 assert!(in_time_window(
387 deserialized_rewinded.elapsed(),
388 Duration::from_secs(1),
389 Duration::from_millis(100)
390 ));
391 }
392
393 #[test]
394 fn test_serialize_deserialize_monotonic_goes_zero() {
395 fn monotonic_ns_forwarded() -> i64 {
396 0
397 }
398 let timer = AtomicTimer::new(Duration::from_secs(5));
399 thread::sleep(Duration::from_secs(1));
400 let serialized = serde_json::to_string(&timer).unwrap();
401 let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
402 let deserialized_rewinded = AtomicTimer::construct(
403 deserialized.duration().as_nanos().try_into().unwrap(),
404 deserialized.elapsed_ns(),
405 deserialized.permit_handle_expiration.load(Ordering::SeqCst),
406 monotonic_ns_forwarded,
407 );
408 assert!(in_time_window(
409 deserialized_rewinded.elapsed(),
410 Duration::from_secs(1),
411 Duration::from_millis(100)
412 ));
413 }
414}