Skip to main content

atomic_time/
duration.rs

1use crate::AtomicU128;
2use core::{sync::atomic::Ordering, time::Duration};
3
/// Atomic version of [`Duration`].
///
/// The value is held in a single [`AtomicU128`] as the bit pattern produced by
/// [`encode_duration`]: whole seconds shifted left by 32 bits, with the
/// sub-second nanoseconds in the low 32 bits.
#[repr(transparent)]
pub struct AtomicDuration(AtomicU128);
7impl core::fmt::Debug for AtomicDuration {
8  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
9    f.debug_tuple("AtomicDuration")
10      .field(&self.load(Ordering::SeqCst))
11      .finish()
12  }
13}
14impl Default for AtomicDuration {
15  /// Creates an `AtomicDuration` initialized to [`Duration::ZERO`].
16  #[inline]
17  fn default() -> Self {
18    Self::new(Duration::ZERO)
19  }
20}
21impl From<Duration> for AtomicDuration {
22  #[inline]
23  fn from(duration: Duration) -> Self {
24    Self::new(duration)
25  }
26}
27impl AtomicDuration {
28  /// Creates a new `AtomicDuration` with the given value.
29  pub const fn new(duration: Duration) -> Self {
30    Self(AtomicU128::new(encode_duration(duration)))
31  }
32  /// Loads [`Duration`](Duration) from `AtomicDuration`.
33  ///
34  /// load takes an [`Ordering`] argument which describes the memory ordering of this operation.
35  ///
36  /// # Panics
37  /// Panics if order is [`Release`](Ordering::Release) or [`AcqRel`](Ordering::AcqRel).
38  pub fn load(&self, ordering: Ordering) -> Duration {
39    decode_duration(self.0.load(ordering))
40  }
41  /// Stores a value into the `AtomicDuration`.
42  ///
43  /// `store` takes an [`Ordering`] argument which describes the memory ordering
44  /// of this operation.
45  ///
46  /// # Panics
47  ///
48  /// Panics if `order` is [`Acquire`](Ordering::Acquire) or [`AcqRel`](Ordering::AcqRel).
49  pub fn store(&self, val: Duration, ordering: Ordering) {
50    self.0.store(encode_duration(val), ordering)
51  }
52  /// Stores a value into the `AtomicDuration`, returning the old value.
53  ///
54  /// `swap` takes an [`Ordering`] argument which describes the memory ordering
55  /// of this operation.
56  pub fn swap(&self, val: Duration, ordering: Ordering) -> Duration {
57    decode_duration(self.0.swap(encode_duration(val), ordering))
58  }
59  /// Stores a value into the `AtomicDuration` if the current value is the same as the
60  /// `current` value.
61  ///
62  /// Unlike [`compare_exchange`], this function is allowed to spuriously fail
63  /// even when the comparison succeeds, which can result in more efficient
64  /// code on some platforms. The return value is a result indicating whether
65  /// the new value was written and containing the previous value.
66  ///
67  /// `compare_exchange` takes two [`Ordering`] arguments to describe the memory
68  /// ordering of this operation. The first describes the required ordering if
69  /// the operation succeeds while the second describes the required ordering
70  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
71  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
72  /// success ordering.
73  ///
74  /// [`compare_exchange`]: #method.compare_exchange
75  pub fn compare_exchange_weak(
76    &self,
77    current: Duration,
78    new: Duration,
79    success: Ordering,
80    failure: Ordering,
81  ) -> Result<Duration, Duration> {
82    self
83      .0
84      .compare_exchange_weak(
85        encode_duration(current),
86        encode_duration(new),
87        success,
88        failure,
89      )
90      .map(decode_duration)
91      .map_err(decode_duration)
92  }
93  /// Stores a value into the `AtomicDuration` if the current value is the same as the
94  /// `current` value.
95  ///
96  /// The return value is a result indicating whether the new value was
97  /// written and containing the previous value. On success this value is
98  /// guaranteed to be equal to `new`.
99  ///
100  /// [`compare_exchange`] takes two [`Ordering`] arguments to describe the memory
101  /// ordering of this operation. The first describes the required ordering if
102  /// the operation succeeds while the second describes the required ordering
103  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
104  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
105  ///
106  /// [`compare_exchange`]: #method.compare_exchange
107  pub fn compare_exchange(
108    &self,
109    current: Duration,
110    new: Duration,
111    success: Ordering,
112    failure: Ordering,
113  ) -> Result<Duration, Duration> {
114    self
115      .0
116      .compare_exchange(
117        encode_duration(current),
118        encode_duration(new),
119        success,
120        failure,
121      )
122      .map(decode_duration)
123      .map_err(decode_duration)
124  }
125  /// Fetches the value, and applies a function to it that returns an optional
126  /// new value. Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else
127  /// `Err(previous_value)`.
128  ///
129  /// Note: This may call the function multiple times if the value has been changed from other threads in
130  /// the meantime, as long as the function returns `Some(_)`, but the function will have been applied
131  /// only once to the stored value.
132  ///
133  /// `fetch_update` takes two [`Ordering`] arguments to describe the memory ordering of this operation.
134  /// The first describes the required ordering for when the operation finally succeeds while the second
135  /// describes the required ordering for loads. These correspond to the success and failure orderings of
136  /// [`compare_exchange`] respectively.
137  ///
138  /// Using [`Acquire`](Ordering::Acquire) as success ordering makes the store part
139  /// of this operation [`Relaxed`](Ordering::Relaxed), and using [`Release`](Ordering::Release) makes the final successful load
140  /// [`Relaxed`](Ordering::Relaxed). The (failed) load ordering can only be [`SeqCst`](Ordering::SeqCst), [`Acquire`](Ordering::Acquire) or [`Relaxed`](Ordering::Release)
141  /// and must be equivalent to or weaker than the success ordering.
142  ///
143  /// [`compare_exchange`]: #method.compare_exchange
144  ///
145  /// # Examples
146  ///
147  /// ```rust
148  /// use atomic_time::AtomicDuration;
149  /// use std::{time::Duration, sync::atomic::Ordering};
150  ///
151  /// let x = AtomicDuration::new(Duration::from_secs(7));
152  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None), Err(Duration::from_secs(7)));
153  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(Duration::from_secs(7)));
154  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(Duration::from_secs(8)));
155  /// assert_eq!(x.load(Ordering::SeqCst), Duration::from_secs(9));
156  /// ```
157  pub fn fetch_update<F>(
158    &self,
159    set_order: Ordering,
160    fetch_order: Ordering,
161    mut f: F,
162  ) -> Result<Duration, Duration>
163  where
164    F: FnMut(Duration) -> Option<Duration>,
165  {
166    self
167      .0
168      .fetch_update(set_order, fetch_order, |d| {
169        f(decode_duration(d)).map(encode_duration)
170      })
171      .map(decode_duration)
172      .map_err(decode_duration)
173  }
174  /// Consumes the atomic and returns the contained value.
175  ///
176  /// This is safe because passing `self` by value guarantees that no other threads are
177  /// concurrently accessing the atomic data.
178  #[inline]
179  pub fn into_inner(self) -> Duration {
180    decode_duration(self.0.into_inner())
181  }
182
183  /// Returns `true` if operations on values of this type are lock-free.
184  /// If the compiler or the platform doesn't support the necessary
185  /// atomic instructions, global locks for every potentially
186  /// concurrent atomic operation will be used.
187  ///
188  /// # Examples
189  /// ```
190  /// use atomic_time::AtomicDuration;
191  ///
192  /// let is_lock_free = AtomicDuration::is_lock_free();
193  /// ```
194  #[inline]
195  pub fn is_lock_free() -> bool {
196    AtomicU128::is_lock_free()
197  }
198}
199
/// Encodes a [`Duration`] into a [`u128`].
///
/// The whole-second count is placed in bits 32 and above, and the sub-second
/// nanoseconds (always below 2^32) occupy the low 32 bits.
pub const fn encode_duration(duration: Duration) -> u128 {
  let secs_bits = (duration.as_secs() as u128) << 32;
  let nanos_bits = duration.subsec_nanos() as u128;
  // The two fields never overlap, so a bitwise OR combines them losslessly.
  secs_bits | nanos_bits
}
206
/// Decodes a [`Duration`] from a [`u128`] produced by [`encode_duration`].
///
/// The low 32 bits are read as the sub-second nanoseconds and the next 64 bits
/// as the whole seconds. Any bits above bit 95 are discarded by the cast to
/// `u64`.
///
/// Values that did not come from [`encode_duration`] may carry a nanosecond
/// field of one second or more; [`Duration::new`] then carries the excess into
/// the seconds.
///
/// # Panics
///
/// Panics if that carry overflows the seconds counter, as documented for
/// [`Duration::new`]. This cannot happen for values produced by
/// [`encode_duration`].
pub const fn decode_duration(encoded: u128) -> Duration {
  const NANOS_MASK: u128 = 0xFFFF_FFFF;
  let nanos = (encoded & NANOS_MASK) as u32;
  let seconds = (encoded >> 32) as u64;
  Duration::new(seconds, nanos)
}
213
#[cfg(feature = "serde")]
const _: () = {
  use serde::{Deserialize, Serialize};

  // Serializes a `SeqCst` snapshot of the value using `Duration`'s own
  // `Serialize` implementation.
  impl Serialize for AtomicDuration {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
      S: serde::Serializer,
    {
      let snapshot = self.load(Ordering::SeqCst);
      snapshot.serialize(serializer)
    }
  }

  // Deserializes a plain `Duration` and wraps it in a fresh atomic.
  impl<'de> Deserialize<'de> for AtomicDuration {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
      D: serde::Deserializer<'de>,
    {
      Duration::deserialize(deserializer).map(Self::new)
    }
  }
};
230
#[cfg(test)]
mod tests {
  use super::*;

  // `new` must expose exactly the value it was given.
  #[test]
  fn test_atomic_duration_new() {
    let five_secs = Duration::from_secs(5);
    let atomic = AtomicDuration::new(five_secs);
    assert_eq!(atomic.load(Ordering::SeqCst), five_secs);
  }

  // A value with both a seconds and a nanoseconds component reads back intact.
  #[test]
  fn test_atomic_duration_load() {
    let mixed = Duration::new(10, 10);
    let atomic = AtomicDuration::new(mixed);
    let observed = atomic.load(Ordering::SeqCst);
    assert_eq!(observed, mixed);
  }

  // `store` replaces the previously held value.
  #[test]
  fn test_atomic_duration_store() {
    let atomic = AtomicDuration::new(Duration::from_secs(3));
    let replacement = Duration::from_secs(7);
    atomic.store(replacement, Ordering::SeqCst);
    assert_eq!(atomic.load(Ordering::SeqCst), replacement);
  }

  // `swap` hands back the old value and installs the new one.
  #[test]
  fn test_atomic_duration_swap() {
    let before = Duration::from_secs(2);
    let after = Duration::from_secs(8);
    let atomic = AtomicDuration::new(before);

    let returned = atomic.swap(after, Ordering::SeqCst);

    assert_eq!(returned, before);
    assert_eq!(atomic.load(Ordering::SeqCst), after);
  }

  #[test]
  fn test_atomic_duration_compare_exchange_weak() {
    let start = Duration::from_secs(4);
    let atomic = AtomicDuration::new(start);

    // Successful exchange: the weak variant may fail spuriously, so retry for
    // as long as the reported value is still the one we expected.
    let outcome = loop {
      match atomic.compare_exchange_weak(
        start,
        Duration::from_secs(6),
        Ordering::SeqCst,
        Ordering::SeqCst,
      ) {
        Err(seen) if seen == start => continue,
        settled => break settled,
      }
    };

    assert_eq!(outcome, Ok(start));
    assert_eq!(atomic.load(Ordering::SeqCst), Duration::from_secs(6));

    // Failed exchange: the stored value is no longer `start`, so this must be
    // rejected and report the value that is actually stored.
    let rejected = atomic.compare_exchange_weak(
      start,
      Duration::from_secs(7),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );

    assert_eq!(rejected, Err(Duration::from_secs(6)));
  }

  #[test]
  fn test_atomic_duration_compare_exchange() {
    let start = Duration::from_secs(1);
    let atomic = AtomicDuration::new(start);

    // Successful exchange: the expected value matches the stored one.
    let accepted = atomic.compare_exchange(
      start,
      Duration::from_secs(5),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert_eq!(accepted, Ok(start));
    assert_eq!(atomic.load(Ordering::SeqCst), Duration::from_secs(5));

    // Failed exchange: the expected value is stale.
    let rejected = atomic.compare_exchange(
      start,
      Duration::from_secs(6),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert_eq!(rejected, Err(Duration::from_secs(5)));
  }

  // `fetch_update` returns the previous value and stores the closure's result.
  #[test]
  fn test_atomic_duration_fetch_update() {
    let start = Duration::from_secs(4);
    let atomic = AtomicDuration::new(start);

    let add_two_secs = |d: Duration| Some(d + Duration::from_secs(2));
    let previous = atomic.fetch_update(Ordering::SeqCst, Ordering::SeqCst, add_two_secs);

    assert_eq!(previous, Ok(start));
    assert_eq!(atomic.load(Ordering::SeqCst), Duration::from_secs(6));
  }

  // Consuming the atomic yields the value it was holding.
  #[test]
  fn test_atomic_duration_into_inner() {
    let three_secs = Duration::from_secs(3);
    assert_eq!(AtomicDuration::new(three_secs).into_inner(), three_secs);
  }

  #[test]
  #[cfg(feature = "std")]
  fn test_atomic_duration_thread_safety() {
    use std::sync::Arc;
    use std::thread;

    const THREADS: u64 = 10;
    const INCREMENTS_PER_THREAD: u64 = 100;

    let shared = Arc::new(AtomicDuration::new(Duration::from_secs(0)));

    // Every worker adds one millisecond at a time through a CAS retry loop.
    let workers = (0..THREADS)
      .map(|_| {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
          for _ in 0..INCREMENTS_PER_THREAD {
            loop {
              let seen = shared.load(Ordering::SeqCst);
              let bumped = seen + Duration::from_millis(1);
              let swapped = shared.compare_exchange_weak(
                seen,
                bumped,
                Ordering::SeqCst,
                Ordering::SeqCst,
              );
              // On failure (spurious or lost race) reload and try again.
              if swapped.is_ok() {
                break;
              }
            }
          }
        })
      })
      .collect::<std::vec::Vec<_>>();

    // Wait for all workers to finish.
    for worker in workers {
      worker.join().unwrap();
    }

    // No increment may be lost: the total is one millisecond per successful CAS.
    let expected = Duration::from_millis(THREADS * INCREMENTS_PER_THREAD);
    assert_eq!(shared.load(Ordering::SeqCst), expected);
  }

  // A struct containing an `AtomicDuration` survives a JSON round trip.
  #[cfg(feature = "serde")]
  #[test]
  fn test_atomic_duration_serde() {
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct Test {
      duration: AtomicDuration,
    }

    let five_secs = Duration::from_secs(5);
    let original = Test {
      duration: AtomicDuration::new(five_secs),
    };

    let json = serde_json::to_string(&original).unwrap();
    let restored: Test = serde_json::from_str(&json).unwrap();

    assert_eq!(restored.duration.load(Ordering::SeqCst), five_secs);
  }
}