1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3use std::{
4 sync::atomic::{AtomicBool, AtomicI64, Ordering},
5 time::Duration,
6};
7
8use bma_ts::Monotonic;
9
10pub struct AtomicTimer {
12 duration: AtomicI64,
13 start: AtomicI64,
14 permit_handle_expiration: AtomicBool,
15 monotonic_fn: fn() -> i64,
16}
17
18fn monotonic_ns() -> i64 {
19 i64::try_from(Monotonic::now().as_nanos()).expect("Monotonic time is too large")
20}
21
22impl AtomicTimer {
23 #[allow(dead_code)]
24 fn construct(duration: i64, elapsed: i64, phe: bool, monotonic_fn: fn() -> i64) -> Self {
25 AtomicTimer {
26 duration: AtomicI64::new(duration),
27 start: AtomicI64::new(monotonic_fn() - elapsed),
28 monotonic_fn,
29 permit_handle_expiration: AtomicBool::new(phe),
30 }
31 }
32 pub fn new(duration: Duration) -> Self {
38 Self::construct(
39 duration
40 .as_nanos()
41 .try_into()
42 .expect("Duration is too large"),
43 0,
44 true,
45 monotonic_ns,
46 )
47 }
48 #[inline]
54 pub fn duration(&self) -> Duration {
55 Duration::from_nanos(self.duration.load(Ordering::SeqCst).try_into().unwrap())
56 }
57 #[inline]
62 pub fn permit_handle_expiration(&self) -> bool {
63 self.permit_handle_expiration
64 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
65 (v && self.expired()).then_some(false)
66 })
67 .is_ok()
68 }
69 pub fn set_duration(&self, duration: Duration) {
75 self.duration
76 .store(duration.as_nanos().try_into().unwrap(), Ordering::SeqCst);
77 }
78 #[inline]
80 pub fn reset(&self) {
81 self.permit_handle_expiration.store(true, Ordering::SeqCst);
82 self.start.store((self.monotonic_fn)(), Ordering::SeqCst);
83 }
84 #[inline]
86 pub fn expire_now(&self) {
87 self.start.store(
88 (self.monotonic_fn)() - self.duration.load(Ordering::SeqCst),
89 Ordering::SeqCst,
90 );
91 }
92 #[inline]
95 pub fn reset_if_expired(&self) -> bool {
96 let now = (self.monotonic_fn)();
97 self.start
98 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |start| {
99 self.permit_handle_expiration.store(true, Ordering::SeqCst);
100 (now.saturating_sub(start) >= self.duration.load(Ordering::SeqCst)).then_some(now)
101 })
102 .is_ok()
103 }
104 #[inline]
108 pub fn elapsed(&self) -> Duration {
109 Duration::from_nanos(
110 (self.monotonic_fn)()
111 .saturating_sub(self.start.load(Ordering::SeqCst))
112 .try_into()
113 .unwrap_or_default(),
114 )
115 }
116 #[inline]
120 pub fn remaining(&self) -> Duration {
121 let elapsed = self.elapsed_ns();
122 if elapsed >= self.duration.load(Ordering::SeqCst) {
123 Duration::ZERO
124 } else {
125 Duration::from_nanos(
126 (self.duration.load(Ordering::SeqCst) - elapsed)
127 .try_into()
128 .unwrap_or_default(),
129 )
130 }
131 }
132 #[inline]
133 fn elapsed_ns(&self) -> i64 {
134 (self.monotonic_fn)().saturating_sub(self.start.load(Ordering::SeqCst))
135 }
136 #[inline]
138 pub fn expired(&self) -> bool {
139 self.elapsed_ns() >= self.duration.load(Ordering::SeqCst)
140 }
141}
142
143#[cfg(feature = "serde")]
144mod ser {
145 use super::{monotonic_ns, AtomicTimer};
146 use serde::{Deserialize, Deserializer, Serialize, Serializer};
147 use std::sync::atomic::Ordering;
148
149 #[derive(Serialize, Deserialize)]
150 struct SerializedTimer {
151 duration: i64,
152 elapsed: i64,
153 phe: bool,
154 }
155
156 impl Serialize for AtomicTimer {
157 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
158 where
159 S: Serializer,
160 {
161 let s = SerializedTimer {
162 duration: self.duration.load(Ordering::SeqCst),
163 elapsed: self.elapsed_ns(),
164 phe: self.permit_handle_expiration.load(Ordering::SeqCst),
165 };
166 s.serialize(serializer)
167 }
168 }
169
170 impl<'de> Deserialize<'de> for AtomicTimer {
171 fn deserialize<D>(deserializer: D) -> Result<AtomicTimer, D::Error>
172 where
173 D: Deserializer<'de>,
174 {
175 let s = SerializedTimer::deserialize(deserializer)?;
176 Ok(AtomicTimer::construct(
177 s.duration,
178 s.elapsed,
179 s.phe,
180 monotonic_ns,
181 ))
182 }
183 }
184}
185
186#[cfg(test)]
187mod test {
188 use super::AtomicTimer;
189 use std::{
190 sync::{Arc, Barrier},
191 thread,
192 time::Duration,
193 };
194
195 pub(crate) fn in_time_window(a: Duration, b: Duration, window: Duration) -> bool {
196 let diff = window / 2;
197 let min = b - diff;
198 let max = b + diff;
199 a >= min && a <= max
200 }
201
202 #[test]
203 fn test_reset() {
204 let timer = AtomicTimer::new(Duration::from_secs(5));
205 thread::sleep(Duration::from_secs(1));
206 timer.reset();
207 assert!(timer.elapsed() < Duration::from_millis(100));
208 }
209
210 #[test]
211 fn test_expire_now() {
212 let timer = AtomicTimer::new(Duration::from_secs(5));
213 assert!(!timer.expired());
214 assert!(in_time_window(
215 timer.remaining(),
216 Duration::from_secs(5),
217 Duration::from_millis(100)
218 ));
219 timer.expire_now();
220 assert!(timer.expired());
221 }
222
223 #[test]
224 fn test_reset_if_expired() {
225 let timer = AtomicTimer::new(Duration::from_secs(1));
226 assert!(!timer.reset_if_expired());
227 thread::sleep(Duration::from_millis(1100));
228 assert!(timer.expired());
229 assert!(timer.reset_if_expired());
230 }
231
232 #[test]
233 fn test_reset_if_expired_no_datarace() {
234 let n = 1000;
235 let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
236 thread::sleep(Duration::from_millis(200));
237 assert!(timer.expired());
238 let barrier = Arc::new(Barrier::new(n));
239 let (tx, rx) = std::sync::mpsc::channel::<bool>();
240 let mut result = Vec::with_capacity(n);
241 for _ in 0..n {
242 let timer = timer.clone();
243 let barrier = barrier.clone();
244 let tx = tx.clone();
245 thread::spawn(move || {
246 barrier.wait();
247 tx.send(timer.reset_if_expired()).unwrap();
248 });
249 }
250 drop(tx);
251 while let Ok(v) = rx.recv() {
252 result.push(v);
253 }
254 assert_eq!(result.len(), n);
255 assert_eq!(result.into_iter().filter(|&v| v).count(), 1);
256 }
257
258 #[test]
259 fn test_permit_handle_expiration() {
260 let timer = AtomicTimer::new(Duration::from_secs(1));
261 assert!(!timer.permit_handle_expiration());
262 thread::sleep(Duration::from_millis(1100));
263 assert!(timer.expired());
264 assert!(timer.permit_handle_expiration());
265 assert!(!timer.permit_handle_expiration());
266 timer.reset();
267 thread::sleep(Duration::from_millis(1100));
268 timer.reset();
269 assert!(!timer.permit_handle_expiration());
270 }
271
272 #[test]
273 fn test_permit_handle_expiration_no_datarace() {
274 let n = 1000;
275 let timer = Arc::new(AtomicTimer::new(Duration::from_millis(100)));
276 thread::sleep(Duration::from_millis(200));
277 assert!(timer.expired());
278 let barrier = Arc::new(Barrier::new(n));
279 let (tx, rx) = std::sync::mpsc::channel::<bool>();
280 let mut result = Vec::with_capacity(n);
281 for _ in 0..n {
282 let timer = timer.clone();
283 let barrier = barrier.clone();
284 let tx = tx.clone();
285 thread::spawn(move || {
286 barrier.wait();
287 tx.send(timer.permit_handle_expiration()).unwrap();
288 });
289 }
290 drop(tx);
291 while let Ok(v) = rx.recv() {
292 result.push(v);
293 }
294 assert_eq!(result.len(), n);
295 assert_eq!(result.into_iter().filter(|&v| v).count(), 1);
296 }
297}
298
299#[cfg(feature = "serde")]
300#[cfg(test)]
301mod test_serialization {
302 use super::test::in_time_window;
303 use super::AtomicTimer;
304 use std::{sync::atomic::Ordering, thread, time::Duration};
305
306 #[test]
307 fn test_serialize_deserialize() {
308 let timer = AtomicTimer::new(Duration::from_secs(5));
309 thread::sleep(Duration::from_secs(1));
310 let serialized = serde_json::to_string(&timer).unwrap();
311 let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
312 assert!(in_time_window(
313 deserialized.elapsed(),
314 Duration::from_secs(1),
315 Duration::from_millis(100)
316 ));
317 }
318
319 #[test]
320 fn test_serialize_deserialize_monotonic_goes_forward() {
321 fn monotonic_ns_forwarded() -> i64 {
322 super::monotonic_ns() + 10_000 * 1_000_000_000
323 }
324 let timer = AtomicTimer::new(Duration::from_secs(5));
325 thread::sleep(Duration::from_secs(1));
326 let serialized = serde_json::to_string(&timer).unwrap();
327 let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
328 let deserialized_rewinded = AtomicTimer::construct(
329 deserialized.duration().as_nanos().try_into().unwrap(),
330 deserialized.elapsed_ns(),
331 deserialized.permit_handle_expiration.load(Ordering::SeqCst),
332 monotonic_ns_forwarded,
333 );
334 assert!(in_time_window(
335 deserialized_rewinded.elapsed(),
336 Duration::from_secs(1),
337 Duration::from_millis(100)
338 ));
339 }
340
341 #[test]
342 fn test_serialize_deserialize_monotonic_goes_backward() {
343 fn monotonic_ns_forwarded() -> i64 {
344 super::monotonic_ns() - 10_000 * 1_000_000_000
345 }
346 let timer = AtomicTimer::new(Duration::from_secs(5));
347 thread::sleep(Duration::from_secs(1));
348 let serialized = serde_json::to_string(&timer).unwrap();
349 let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
350 let deserialized_rewinded = AtomicTimer::construct(
351 deserialized.duration().as_nanos().try_into().unwrap(),
352 deserialized.elapsed_ns(),
353 deserialized.permit_handle_expiration.load(Ordering::SeqCst),
354 monotonic_ns_forwarded,
355 );
356 assert!(in_time_window(
357 deserialized_rewinded.elapsed(),
358 Duration::from_secs(1),
359 Duration::from_millis(100)
360 ));
361 }
362
363 #[test]
364 fn test_serialize_deserialize_monotonic_goes_zero() {
365 fn monotonic_ns_forwarded() -> i64 {
366 0
367 }
368 let timer = AtomicTimer::new(Duration::from_secs(5));
369 thread::sleep(Duration::from_secs(1));
370 let serialized = serde_json::to_string(&timer).unwrap();
371 let deserialized: AtomicTimer = serde_json::from_str(&serialized).unwrap();
372 let deserialized_rewinded = AtomicTimer::construct(
373 deserialized.duration().as_nanos().try_into().unwrap(),
374 deserialized.elapsed_ns(),
375 deserialized.permit_handle_expiration.load(Ordering::SeqCst),
376 monotonic_ns_forwarded,
377 );
378 assert!(in_time_window(
379 deserialized_rewinded.elapsed(),
380 Duration::from_secs(1),
381 Duration::from_millis(100)
382 ));
383 }
384}