Skip to main content

atomic_time/
duration.rs

1use crate::AtomicU128;
2use core::{sync::atomic::Ordering, time::Duration};
3
/// Atomic version of [`Duration`].
///
/// The duration is packed into a single [`AtomicU128`] using
/// [`encode_duration`] and unpacked again with [`decode_duration`], so every
/// operation reads or writes the seconds and nanoseconds as one atomic value.
///
/// `#[repr(transparent)]` gives this wrapper the same layout as the inner
/// [`AtomicU128`].
#[repr(transparent)]
pub struct AtomicDuration(AtomicU128);
7impl core::fmt::Debug for AtomicDuration {
8  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
9    f.debug_tuple("AtomicDuration")
10      .field(&self.load(Ordering::SeqCst))
11      .finish()
12  }
13}
14impl Default for AtomicDuration {
15  /// Creates an `AtomicDuration` initialized to [`Duration::ZERO`].
16  #[cfg_attr(not(tarpaulin), inline(always))]
17  fn default() -> Self {
18    Self::new(Duration::ZERO)
19  }
20}
21impl From<Duration> for AtomicDuration {
22  #[cfg_attr(not(tarpaulin), inline(always))]
23  fn from(duration: Duration) -> Self {
24    Self::new(duration)
25  }
26}
27impl AtomicDuration {
28  /// Creates a new `AtomicDuration` with the given value.
29  #[cfg_attr(not(tarpaulin), inline(always))]
30  pub const fn new(duration: Duration) -> Self {
31    Self(AtomicU128::new(encode_duration(duration)))
32  }
33  /// Loads [`Duration`](Duration) from `AtomicDuration`.
34  ///
35  /// load takes an [`Ordering`] argument which describes the memory ordering of this operation.
36  ///
37  /// # Panics
38  /// Panics if order is [`Release`](Ordering::Release) or [`AcqRel`](Ordering::AcqRel).
39  #[cfg_attr(not(tarpaulin), inline(always))]
40  pub fn load(&self, ordering: Ordering) -> Duration {
41    decode_duration(self.0.load(ordering))
42  }
43  /// Stores a value into the `AtomicDuration`.
44  ///
45  /// `store` takes an [`Ordering`] argument which describes the memory ordering
46  /// of this operation.
47  ///
48  /// # Panics
49  ///
50  /// Panics if `order` is [`Acquire`](Ordering::Acquire) or [`AcqRel`](Ordering::AcqRel).
51  #[cfg_attr(not(tarpaulin), inline(always))]
52  pub fn store(&self, val: Duration, ordering: Ordering) {
53    self.0.store(encode_duration(val), ordering)
54  }
55  /// Stores a value into the `AtomicDuration`, returning the old value.
56  ///
57  /// `swap` takes an [`Ordering`] argument which describes the memory ordering
58  /// of this operation.
59  #[cfg_attr(not(tarpaulin), inline(always))]
60  pub fn swap(&self, val: Duration, ordering: Ordering) -> Duration {
61    decode_duration(self.0.swap(encode_duration(val), ordering))
62  }
63  /// Stores a value into the `AtomicDuration` if the current value is the same as the
64  /// `current` value.
65  ///
66  /// Unlike [`compare_exchange`], this function is allowed to spuriously fail
67  /// even when the comparison succeeds, which can result in more efficient
68  /// code on some platforms. The return value is a result indicating whether
69  /// the new value was written and containing the previous value.
70  ///
71  /// `compare_exchange` takes two [`Ordering`] arguments to describe the memory
72  /// ordering of this operation. The first describes the required ordering if
73  /// the operation succeeds while the second describes the required ordering
74  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
75  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
76  /// success ordering.
77  ///
78  /// [`compare_exchange`]: #method.compare_exchange
79  #[cfg_attr(not(tarpaulin), inline(always))]
80  pub fn compare_exchange_weak(
81    &self,
82    current: Duration,
83    new: Duration,
84    success: Ordering,
85    failure: Ordering,
86  ) -> Result<Duration, Duration> {
87    self
88      .0
89      .compare_exchange_weak(
90        encode_duration(current),
91        encode_duration(new),
92        success,
93        failure,
94      )
95      .map(decode_duration)
96      .map_err(decode_duration)
97  }
98  /// Stores a value into the `AtomicDuration` if the current value is the same as the
99  /// `current` value.
100  ///
101  /// The return value is a result indicating whether the new value was
102  /// written and containing the previous value. On success this value is
103  /// guaranteed to be equal to `new`.
104  ///
105  /// [`compare_exchange`] takes two [`Ordering`] arguments to describe the memory
106  /// ordering of this operation. The first describes the required ordering if
107  /// the operation succeeds while the second describes the required ordering
108  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
109  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
110  ///
111  /// [`compare_exchange`]: #method.compare_exchange
112  #[cfg_attr(not(tarpaulin), inline(always))]
113  pub fn compare_exchange(
114    &self,
115    current: Duration,
116    new: Duration,
117    success: Ordering,
118    failure: Ordering,
119  ) -> Result<Duration, Duration> {
120    self
121      .0
122      .compare_exchange(
123        encode_duration(current),
124        encode_duration(new),
125        success,
126        failure,
127      )
128      .map(decode_duration)
129      .map_err(decode_duration)
130  }
131  /// Fetches the value, and applies a function to it that returns an optional
132  /// new value. Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else
133  /// `Err(previous_value)`.
134  ///
135  /// Note: This may call the function multiple times if the value has been changed from other threads in
136  /// the meantime, as long as the function returns `Some(_)`, but the function will have been applied
137  /// only once to the stored value.
138  ///
139  /// `fetch_update` takes two [`Ordering`] arguments to describe the memory ordering of this operation.
140  /// The first describes the required ordering for when the operation finally succeeds while the second
141  /// describes the required ordering for loads. These correspond to the success and failure orderings of
142  /// [`compare_exchange`] respectively.
143  ///
144  /// Using [`Acquire`](Ordering::Acquire) as success ordering makes the store part
145  /// of this operation [`Relaxed`](Ordering::Relaxed), and using [`Release`](Ordering::Release) makes the final successful load
146  /// [`Relaxed`](Ordering::Relaxed). The (failed) load ordering can only be [`SeqCst`](Ordering::SeqCst), [`Acquire`](Ordering::Acquire) or [`Relaxed`](Ordering::Release)
147  /// and must be equivalent to or weaker than the success ordering.
148  ///
149  /// [`compare_exchange`]: #method.compare_exchange
150  ///
151  /// # Examples
152  ///
153  /// ```rust
154  /// use atomic_time::AtomicDuration;
155  /// use std::{time::Duration, sync::atomic::Ordering};
156  ///
157  /// let x = AtomicDuration::new(Duration::from_secs(7));
158  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None), Err(Duration::from_secs(7)));
159  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(Duration::from_secs(7)));
160  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(Duration::from_secs(8)));
161  /// assert_eq!(x.load(Ordering::SeqCst), Duration::from_secs(9));
162  /// ```
163  #[cfg_attr(not(tarpaulin), inline(always))]
164  pub fn fetch_update<F>(
165    &self,
166    set_order: Ordering,
167    fetch_order: Ordering,
168    mut f: F,
169  ) -> Result<Duration, Duration>
170  where
171    F: FnMut(Duration) -> Option<Duration>,
172  {
173    self
174      .0
175      .fetch_update(set_order, fetch_order, |d| {
176        f(decode_duration(d)).map(encode_duration)
177      })
178      .map(decode_duration)
179      .map_err(decode_duration)
180  }
181  /// Consumes the atomic and returns the contained value.
182  ///
183  /// This is safe because passing `self` by value guarantees that no other threads are
184  /// concurrently accessing the atomic data.
185  #[cfg_attr(not(tarpaulin), inline(always))]
186  pub fn into_inner(self) -> Duration {
187    decode_duration(self.0.into_inner())
188  }
189
190  /// Returns `true` if operations on values of this type are lock-free.
191  /// If the compiler or the platform doesn't support the necessary
192  /// atomic instructions, global locks for every potentially
193  /// concurrent atomic operation will be used.
194  ///
195  /// # Examples
196  /// ```
197  /// use atomic_time::AtomicDuration;
198  ///
199  /// let is_lock_free = AtomicDuration::is_lock_free();
200  /// ```
201  #[cfg_attr(not(tarpaulin), inline(always))]
202  pub fn is_lock_free() -> bool {
203    AtomicU128::is_lock_free()
204  }
205}
206
/// Packs a [`Duration`] into a [`u128`].
///
/// The whole seconds are shifted left by 32 bits and the sub-second
/// nanoseconds fill the low 32 bits.
#[cfg_attr(not(tarpaulin), inline(always))]
pub const fn encode_duration(duration: Duration) -> u128 {
  let high = (duration.as_secs() as u128) << 32;
  let low = duration.subsec_nanos() as u128;
  // `low` comes from a `u32` and `high` has its low 32 bits clear, so the two
  // fields never overlap and a bitwise OR combines them without any carry.
  high | low
}
214
/// Unpacks a [`u128`] into a [`Duration`].
///
/// The low 32 bits are read as nanoseconds and bits 32..=95 as whole
/// seconds; bits 96..=127 are discarded by the narrowing cast to `u64`.
///
/// Any `u128` is accepted without panicking. When the low 32 bits hold
/// 1_000_000_000 or more, the surplus whole seconds are carried into the
/// seconds count and only the remainder is kept as nanoseconds. If that
/// carry pushes the seconds past `u64::MAX`, the result saturates at
/// [`Duration::MAX`] — so `decode_duration(u128::MAX)` returns
/// [`Duration::MAX`].
#[cfg_attr(not(tarpaulin), inline(always))]
pub const fn decode_duration(encoded: u128) -> Duration {
  const NANOS_PER_SEC: u32 = 1_000_000_000;

  // Truncating to `u32` keeps exactly the low 32 bits.
  let low_word = encoded as u32;
  let whole_secs = (encoded >> 32) as u64;

  // Split the low word into carried seconds and in-range nanoseconds up
  // front, so `Duration::new` is only ever handed nanos below one second and
  // never has to perform (and possibly overflow on) the carry itself.
  let carried_secs = (low_word / NANOS_PER_SEC) as u64;
  let subsec_nanos = low_word % NANOS_PER_SEC;

  if let Some(total_secs) = whole_secs.checked_add(carried_secs) {
    Duration::new(total_secs, subsec_nanos)
  } else {
    Duration::MAX
  }
}
244
// Serde support, compiled only with the `serde` feature. The anonymous
// constant keeps the `use` below from leaking into the rest of the module.
#[cfg(feature = "serde")]
const _: () = {
  use serde::{Deserialize, Deserializer, Serialize, Serializer};

  /// Serializes a `SeqCst` snapshot of the value, in `Duration`'s own format.
  impl Serialize for AtomicDuration {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
      S: Serializer,
    {
      let snapshot = self.load(Ordering::SeqCst);
      snapshot.serialize(serializer)
    }
  }

  /// Deserializes a `Duration` and wraps it in a new `AtomicDuration`.
  impl<'de> Deserialize<'de> for AtomicDuration {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
      D: Deserializer<'de>,
    {
      Duration::deserialize(deserializer).map(Self::new)
    }
  }
};
261
#[cfg(test)]
mod tests {
  use super::*;

  /// A freshly built atomic reports the value it was constructed with.
  #[test]
  fn test_atomic_duration_new() {
    let five_secs = Duration::from_secs(5);
    let cell = AtomicDuration::new(five_secs);
    assert_eq!(cell.load(Ordering::SeqCst), five_secs);
  }

  /// `load` returns both the seconds and the sub-second nanoseconds.
  #[test]
  fn test_atomic_duration_load() {
    let with_nanos = Duration::new(10, 10);
    let cell = AtomicDuration::new(with_nanos);
    assert_eq!(cell.load(Ordering::SeqCst), with_nanos);
  }

  /// `store` overwrites the previous value.
  #[test]
  fn test_atomic_duration_store() {
    let cell = AtomicDuration::new(Duration::from_secs(3));
    let replacement = Duration::from_secs(7);
    cell.store(replacement, Ordering::SeqCst);
    assert_eq!(cell.load(Ordering::SeqCst), replacement);
  }

  /// `swap` hands back the old value and installs the new one.
  #[test]
  fn test_atomic_duration_swap() {
    let before = Duration::from_secs(2);
    let after = Duration::from_secs(8);
    let cell = AtomicDuration::new(before);

    let returned = cell.swap(after, Ordering::SeqCst);

    assert_eq!(returned, before);
    assert_eq!(cell.load(Ordering::SeqCst), after);
  }

  /// `compare_exchange_weak` succeeds (after retrying spurious failures) when
  /// `current` matches, and reports the stored value when it does not.
  #[test]
  fn test_atomic_duration_compare_exchange_weak() {
    let start = Duration::from_secs(4);
    let cell = AtomicDuration::new(start);

    // Successful exchange: keep retrying only while the failure is spurious,
    // i.e. while the value reported back is still `start`.
    let outcome = loop {
      let attempt = cell.compare_exchange_weak(
        start,
        Duration::from_secs(6),
        Ordering::SeqCst,
        Ordering::SeqCst,
      );
      match attempt {
        Err(seen) if seen == start => continue,
        settled => break settled,
      }
    };

    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), start);
    assert_eq!(cell.load(Ordering::SeqCst), Duration::from_secs(6));

    // Failed exchange: the stored value is no longer `start`, so this must
    // fail and report the value that is actually stored (6 seconds).
    let rejected = cell.compare_exchange_weak(
      start,
      Duration::from_secs(7),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );

    assert!(rejected.is_err());
    assert_eq!(rejected.unwrap_err(), Duration::from_secs(6));
  }

  /// `compare_exchange` swaps when `current` matches and otherwise returns
  /// the stored value as an error.
  #[test]
  fn test_atomic_duration_compare_exchange() {
    let start = Duration::from_secs(1);
    let cell = AtomicDuration::new(start);

    // Successful exchange
    let accepted = cell.compare_exchange(
      start,
      Duration::from_secs(5),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert!(accepted.is_ok());
    assert_eq!(accepted.unwrap(), start);
    assert_eq!(cell.load(Ordering::SeqCst), Duration::from_secs(5));

    // Failed exchange
    let rejected = cell.compare_exchange(
      start,
      Duration::from_secs(6),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert!(rejected.is_err());
    assert_eq!(rejected.unwrap_err(), Duration::from_secs(5));
  }

  /// `fetch_update` returns the previous value and stores the closure's result.
  #[test]
  fn test_atomic_duration_fetch_update() {
    let start = Duration::from_secs(4);
    let cell = AtomicDuration::new(start);

    let add_two_secs = |d: Duration| Some(d + Duration::from_secs(2));
    let previous = cell.fetch_update(Ordering::SeqCst, Ordering::SeqCst, add_two_secs);

    assert_eq!(previous, Ok(start));
    assert_eq!(cell.load(Ordering::SeqCst), Duration::from_secs(6));
  }

  /// `into_inner` yields the stored value.
  #[test]
  fn test_atomic_duration_into_inner() {
    let three_secs = Duration::from_secs(3);
    assert_eq!(AtomicDuration::new(three_secs).into_inner(), three_secs);
  }

  /// Ten threads each add one millisecond a hundred times through a CAS
  /// loop; no increment may be lost.
  #[test]
  #[cfg(feature = "std")]
  fn test_atomic_duration_thread_safety() {
    use std::sync::Arc;
    use std::thread;

    let shared = Arc::new(AtomicDuration::new(Duration::from_secs(0)));

    // Spawn the workers that increment the shared duration.
    let workers: std::vec::Vec<_> = (0..10)
      .map(|_| {
        let local = Arc::clone(&shared);
        thread::spawn(move || {
          for _ in 0..100 {
            loop {
              let seen = local.load(Ordering::SeqCst);
              let bumped = seen + Duration::from_millis(1);
              let attempt =
                local.compare_exchange_weak(seen, bumped, Ordering::SeqCst, Ordering::SeqCst);
              // On failure, reload and try again.
              if attempt.is_ok() {
                break;
              }
            }
          }
        })
      })
      .collect();

    // Wait for every worker to finish.
    for worker in workers {
      worker.join().unwrap();
    }

    // 10 threads * 100 increments of 1 ms each.
    let expected = Duration::from_millis(10 * 100);
    assert_eq!(shared.load(Ordering::SeqCst), expected);
  }

  /// The `Debug` output names the type.
  #[cfg(feature = "std")]
  #[test]
  fn test_atomic_duration_debug() {
    let cell = AtomicDuration::new(Duration::new(1, 500_000_000));
    let rendered = format!("{:?}", cell);
    assert!(rendered.contains("AtomicDuration"));
  }

  /// `Default` starts at zero.
  #[test]
  fn test_atomic_duration_default() {
    let cell = AtomicDuration::default();
    assert_eq!(cell.load(Ordering::SeqCst), Duration::ZERO);
  }

  /// `From<Duration>` wraps the given value.
  #[test]
  fn test_atomic_duration_from() {
    let forty_two = Duration::from_secs(42);
    let cell = AtomicDuration::from(forty_two);
    assert_eq!(cell.load(Ordering::SeqCst), forty_two);
  }

  /// A value survives a JSON serialize/deserialize round trip.
  #[cfg(feature = "serde")]
  #[test]
  fn test_atomic_duration_serde() {
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct Test {
      duration: AtomicDuration,
    }

    let five_secs = Duration::from_secs(5);
    let original = Test {
      duration: AtomicDuration::new(five_secs),
    };

    let json = serde_json::to_string(&original).unwrap();
    let restored: Test = serde_json::from_str(&json).unwrap();

    assert_eq!(restored.duration.load(Ordering::SeqCst), five_secs);
  }

  /// Encoding then decoding is the identity on canonical durations,
  /// including both extremes.
  #[test]
  fn decode_duration_roundtrip() {
    let cases = [
      Duration::ZERO,
      Duration::from_secs(1),
      Duration::new(123_456_789, 999_999_999),
      Duration::new(u64::MAX, 999_999_999),
    ];
    cases
      .iter()
      .copied()
      .for_each(|original| assert_eq!(decode_duration(encode_duration(original)), original));
  }

  /// Hand-crafted encodings that `encode_duration` never produces are
  /// normalised or saturated instead of panicking.
  #[test]
  fn decode_duration_saturates_on_non_canonical_input() {
    // u128::MAX carries nanos = u32::MAX (>= 1e9) on top of seconds =
    // u64::MAX, so folding the extra seconds in overflows u64 and the
    // result must saturate.
    let saturated = decode_duration(u128::MAX);
    assert_eq!(saturated, Duration::new(u64::MAX, 999_999_999));

    // Out-of-range nanos with room left in the seconds field: 0 secs and
    // 2_000_000_000 nanos normalises to exactly two seconds.
    let normalised = decode_duration(2_000_000_000u128);
    assert_eq!(normalised, Duration::new(2, 0));
  }
}