atomic_time/
duration.rs

1use crate::AtomicU128;
2use core::{sync::atomic::Ordering, time::Duration};
3
/// Atomic version of [`Duration`].
///
/// The value is held in a single `AtomicU128`: [`encode_duration`] produces the
/// `u128` that is stored, and [`decode_duration`] turns a loaded `u128` back
/// into a [`Duration`].
// `repr(transparent)`: this wrapper has the same layout as its single `AtomicU128` field.
#[repr(transparent)]
pub struct AtomicDuration(AtomicU128);
7impl core::fmt::Debug for AtomicDuration {
8  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
9    f.debug_tuple("AtomicDuration")
10      .field(&self.load(Ordering::SeqCst))
11      .finish()
12  }
13}
14impl AtomicDuration {
15  /// Creates a new `AtomicDuration` with the given value.
16  pub const fn new(duration: Duration) -> Self {
17    Self(AtomicU128::new(encode_duration(duration)))
18  }
19  /// Loads [`Duration`](Duration) from `AtomicDuration`.
20  ///
21  /// load takes an [`Ordering`] argument which describes the memory ordering of this operation.
22  ///
23  /// # Panics
24  /// Panics if order is [`Release`](Ordering::Release) or [`AcqRel`](Ordering::AcqRel).
25  pub fn load(&self, ordering: Ordering) -> Duration {
26    decode_duration(self.0.load(ordering))
27  }
28  /// Stores a value into the `AtomicDuration`.
29  ///
30  /// `store` takes an [`Ordering`] argument which describes the memory ordering
31  /// of this operation.
32  ///
33  /// # Panics
34  ///
35  /// Panics if `order` is [`Acquire`](Ordering::Acquire) or [`AcqRel`](Ordering::AcqRel).
36  pub fn store(&self, val: Duration, ordering: Ordering) {
37    self.0.store(encode_duration(val), ordering)
38  }
39  /// Stores a value into the `AtomicDuration`, returning the old value.
40  ///
41  /// `swap` takes an [`Ordering`] argument which describes the memory ordering
42  /// of this operation.
43  pub fn swap(&self, val: Duration, ordering: Ordering) -> Duration {
44    decode_duration(self.0.swap(encode_duration(val), ordering))
45  }
46  /// Stores a value into the `AtomicDuration` if the current value is the same as the
47  /// `current` value.
48  ///
49  /// Unlike [`compare_exchange`], this function is allowed to spuriously fail
50  /// even when the comparison succeeds, which can result in more efficient
51  /// code on some platforms. The return value is a result indicating whether
52  /// the new value was written and containing the previous value.
53  ///
54  /// `compare_exchange` takes two [`Ordering`] arguments to describe the memory
55  /// ordering of this operation. The first describes the required ordering if
56  /// the operation succeeds while the second describes the required ordering
57  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
58  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
59  /// success ordering.
60  ///
61  /// [`compare_exchange`]: #method.compare_exchange
62  pub fn compare_exchange_weak(
63    &self,
64    current: Duration,
65    new: Duration,
66    success: Ordering,
67    failure: Ordering,
68  ) -> Result<Duration, Duration> {
69    self
70      .0
71      .compare_exchange_weak(
72        encode_duration(current),
73        encode_duration(new),
74        success,
75        failure,
76      )
77      .map(decode_duration)
78      .map_err(decode_duration)
79  }
80  /// Stores a value into the `AtomicDuration` if the current value is the same as the
81  /// `current` value.
82  ///
83  /// The return value is a result indicating whether the new value was
84  /// written and containing the previous value. On success this value is
85  /// guaranteed to be equal to `new`.
86  ///
87  /// [`compare_exchange`] takes two [`Ordering`] arguments to describe the memory
88  /// ordering of this operation. The first describes the required ordering if
89  /// the operation succeeds while the second describes the required ordering
90  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
91  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
92  ///
93  /// [`compare_exchange`]: #method.compare_exchange
94  pub fn compare_exchange(
95    &self,
96    current: Duration,
97    new: Duration,
98    success: Ordering,
99    failure: Ordering,
100  ) -> Result<Duration, Duration> {
101    self
102      .0
103      .compare_exchange(
104        encode_duration(current),
105        encode_duration(new),
106        success,
107        failure,
108      )
109      .map(decode_duration)
110      .map_err(decode_duration)
111  }
112  /// Fetches the value, and applies a function to it that returns an optional
113  /// new value. Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else
114  /// `Err(previous_value)`.
115  ///
116  /// Note: This may call the function multiple times if the value has been changed from other threads in
117  /// the meantime, as long as the function returns `Some(_)`, but the function will have been applied
118  /// only once to the stored value.
119  ///
120  /// `fetch_update` takes two [`Ordering`] arguments to describe the memory ordering of this operation.
121  /// The first describes the required ordering for when the operation finally succeeds while the second
122  /// describes the required ordering for loads. These correspond to the success and failure orderings of
123  /// [`compare_exchange`] respectively.
124  ///
125  /// Using [`Acquire`](Ordering::Acquire) as success ordering makes the store part
126  /// of this operation [`Relaxed`](Ordering::Relaxed), and using [`Release`](Ordering::Release) makes the final successful load
127  /// [`Relaxed`](Ordering::Relaxed). The (failed) load ordering can only be [`SeqCst`](Ordering::SeqCst), [`Acquire`](Ordering::Acquire) or [`Relaxed`](Ordering::Release)
128  /// and must be equivalent to or weaker than the success ordering.
129  ///
130  /// [`compare_exchange`]: #method.compare_exchange
131  ///
132  /// # Examples
133  ///
134  /// ```rust
135  /// use atomic_time::AtomicDuration;
136  /// use std::{time::Duration, sync::atomic::Ordering};
137  ///
138  /// let x = AtomicDuration::new(Duration::from_secs(7));
139  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None), Err(Duration::from_secs(7)));
140  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(Duration::from_secs(7)));
141  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(Duration::from_secs(8)));
142  /// assert_eq!(x.load(Ordering::SeqCst), Duration::from_secs(9));
143  /// ```
144  pub fn fetch_update<F>(
145    &self,
146    set_order: Ordering,
147    fetch_order: Ordering,
148    mut f: F,
149  ) -> Result<Duration, Duration>
150  where
151    F: FnMut(Duration) -> Option<Duration>,
152  {
153    self
154      .0
155      .fetch_update(set_order, fetch_order, |d| {
156        f(decode_duration(d)).map(encode_duration)
157      })
158      .map(decode_duration)
159      .map_err(decode_duration)
160  }
161  /// Consumes the atomic and returns the contained value.
162  ///
163  /// This is safe because passing `self` by value guarantees that no other threads are
164  /// concurrently accessing the atomic data.
165  #[inline]
166  pub fn into_inner(self) -> Duration {
167    decode_duration(self.0.into_inner())
168  }
169
170  /// Returns `true` if operations on values of this type are lock-free.
171  /// If the compiler or the platform doesn't support the necessary
172  /// atomic instructions, global locks for every potentially
173  /// concurrent atomic operation will be used.
174  ///
175  /// # Examples
176  /// ```
177  /// use atomic_time::AtomicDuration;
178  ///
179  /// let is_lock_free = AtomicDuration::is_lock_free();
180  /// ```
181  #[inline]
182  pub fn is_lock_free() -> bool {
183    AtomicU128::is_lock_free()
184  }
185}
186
/// Encodes a [`Duration`] into a [`u128`].
///
/// The whole seconds are placed in the bits above the low 32, and the
/// sub-second nanoseconds occupy the low 32 bits.
pub const fn encode_duration(duration: Duration) -> u128 {
  // `subsec_nanos` is a `u32`, so it can never spill into the seconds bits.
  ((duration.as_secs() as u128) << 32) | (duration.subsec_nanos() as u128)
}
193
/// Decodes a [`Duration`] from a [`u128`] produced by [`encode_duration`].
///
/// The low 32 bits are read as the nanoseconds and the next 64 bits as the
/// whole seconds; any bits above those 96 are discarded.
///
/// # Panics
///
/// Values produced by [`encode_duration`] always decode successfully. For an
/// arbitrary input whose low 32 bits are `1_000_000_000` or more,
/// [`Duration::new`] carries the excess into the seconds and panics if that
/// carry overflows the seconds counter.
pub const fn decode_duration(encoded: u128) -> Duration {
  // Selects the 32-bit nanoseconds field.
  const NANOS_MASK: u128 = 0xFFFF_FFFF;

  let seconds = (encoded >> 32) as u64;
  let nanos = (encoded & NANOS_MASK) as u32;
  Duration::new(seconds, nanos)
}
200
#[cfg(feature = "serde")]
const _: () = {
  use serde::{Deserialize, Deserializer, Serialize, Serializer};

  /// Serializes the value loaded with `SeqCst`, using `Duration`'s own serde format.
  impl Serialize for AtomicDuration {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
      S: Serializer,
    {
      let snapshot = self.load(Ordering::SeqCst);
      snapshot.serialize(serializer)
    }
  }

  /// Deserializes a `Duration` and wraps it in a new `AtomicDuration`.
  impl<'de> Deserialize<'de> for AtomicDuration {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
      D: Deserializer<'de>,
    {
      Duration::deserialize(deserializer).map(Self::new)
    }
  }
};
217
#[cfg(test)]
mod tests {
  use super::*;

  /// A freshly constructed atomic reports the value it was built from.
  #[test]
  fn test_atomic_duration_new() {
    let duration = Duration::from_secs(5);
    let atomic_duration = AtomicDuration::new(duration);
    assert_eq!(atomic_duration.load(Ordering::SeqCst), duration);
  }

  /// `load` returns both the whole seconds and the sub-second nanoseconds.
  #[test]
  fn test_atomic_duration_load() {
    let duration = Duration::new(10, 10);
    let atomic_duration = AtomicDuration::new(duration);
    assert_eq!(atomic_duration.load(Ordering::SeqCst), duration);
  }

  /// `store` replaces the value seen by later loads.
  #[test]
  fn test_atomic_duration_store() {
    let initial_duration = Duration::from_secs(3);
    let new_duration = Duration::from_secs(7);
    let atomic_duration = AtomicDuration::new(initial_duration);
    atomic_duration.store(new_duration, Ordering::SeqCst);
    assert_eq!(atomic_duration.load(Ordering::SeqCst), new_duration);
  }

  /// `swap` installs the new value and hands back the old one.
  #[test]
  fn test_atomic_duration_swap() {
    let initial_duration = Duration::from_secs(2);
    let new_duration = Duration::from_secs(8);
    let atomic_duration = AtomicDuration::new(initial_duration);
    let prev_duration = atomic_duration.swap(new_duration, Ordering::SeqCst);
    assert_eq!(prev_duration, initial_duration);
    assert_eq!(atomic_duration.load(Ordering::SeqCst), new_duration);
  }

  /// `compare_exchange_weak` succeeds (possibly after spurious failures) when the
  /// expected value matches and reports the actual value when it does not.
  #[test]
  fn test_atomic_duration_compare_exchange_weak() {
    let initial_duration = Duration::from_secs(4);
    let atomic_duration = AtomicDuration::new(initial_duration);

    // Successful exchange. The weak variant may fail spuriously, so retry for as
    // long as the reported value is still `initial_duration`.
    let result = loop {
      let attempt = atomic_duration.compare_exchange_weak(
        initial_duration,
        Duration::from_secs(6),
        Ordering::SeqCst,
        Ordering::SeqCst,
      );
      match attempt {
        // Spurious failure: the stored value still matches, so try again.
        Err(seen) if seen == initial_duration => continue,
        // Success, or a genuine mismatch that the assertion below reports.
        _ => break attempt,
      }
    };

    assert_eq!(result, Ok(initial_duration));
    assert_eq!(
      atomic_duration.load(Ordering::SeqCst),
      Duration::from_secs(6)
    );

    // Failed exchange
    // Here, we expect this to fail as the current value is no longer `initial_duration`.
    let result = atomic_duration.compare_exchange_weak(
      initial_duration,
      Duration::from_secs(7),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );

    // The error should be the current value, which is now Duration::from_secs(6).
    assert_eq!(result, Err(Duration::from_secs(6)));
  }

  /// `compare_exchange` returns the previous value in `Ok` on a match and the
  /// actual value in `Err` on a mismatch.
  #[test]
  fn test_atomic_duration_compare_exchange() {
    let initial_duration = Duration::from_secs(1);
    let atomic_duration = AtomicDuration::new(initial_duration);

    // Successful exchange
    let result = atomic_duration.compare_exchange(
      initial_duration,
      Duration::from_secs(5),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert_eq!(result, Ok(initial_duration));
    assert_eq!(
      atomic_duration.load(Ordering::SeqCst),
      Duration::from_secs(5)
    );

    // Failed exchange
    let result = atomic_duration.compare_exchange(
      initial_duration,
      Duration::from_secs(6),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert_eq!(result, Err(Duration::from_secs(5)));
  }

  /// `fetch_update` applies the closure on `Some` and leaves the value alone on `None`.
  #[test]
  fn test_atomic_duration_fetch_update() {
    let initial_duration = Duration::from_secs(4);
    let atomic_duration = AtomicDuration::new(initial_duration);

    let result = atomic_duration.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |d| {
      Some(d + Duration::from_secs(2))
    });
    assert_eq!(result, Ok(initial_duration));
    assert_eq!(
      atomic_duration.load(Ordering::SeqCst),
      Duration::from_secs(6)
    );

    // Returning `None` aborts the update and reports the current value as `Err`.
    let result = atomic_duration.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None);
    assert_eq!(result, Err(Duration::from_secs(6)));
    assert_eq!(
      atomic_duration.load(Ordering::SeqCst),
      Duration::from_secs(6)
    );
  }

  /// `into_inner` yields the stored value.
  #[test]
  fn test_atomic_duration_into_inner() {
    let duration = Duration::from_secs(3);
    let atomic_duration = AtomicDuration::new(duration);
    assert_eq!(atomic_duration.into_inner(), duration);
  }

  /// Encoding followed by decoding returns the original duration, including at
  /// the extremes of both fields.
  #[test]
  fn test_encode_decode_round_trip() {
    let samples = [
      Duration::new(0, 0),
      Duration::new(0, 999_999_999),
      Duration::new(1, 500_000_000),
      Duration::new(u64::MAX, 999_999_999),
    ];
    for &duration in samples.iter() {
      assert_eq!(decode_duration(encode_duration(duration)), duration);
    }
  }

  /// Concurrent compare-and-swap increments from several threads are all applied.
  #[test]
  #[cfg(feature = "std")]
  fn test_atomic_duration_thread_safety() {
    use std::sync::Arc;
    use std::thread;

    const THREADS: u64 = 10;
    const INCREMENTS_PER_THREAD: u64 = 100;
    let step = Duration::from_millis(1);

    let atomic_duration = Arc::new(AtomicDuration::new(Duration::from_secs(0)));
    let mut handles = vec![];

    // Spawn multiple threads to increment the duration
    for _ in 0..THREADS {
      let atomic_clone = Arc::clone(&atomic_duration);
      let handle = thread::spawn(move || {
        for _ in 0..INCREMENTS_PER_THREAD {
          let mut current = atomic_clone.load(Ordering::SeqCst);
          loop {
            match atomic_clone.compare_exchange_weak(
              current,
              current + step,
              Ordering::SeqCst,
              Ordering::SeqCst,
            ) {
              Ok(_) => break, // Successfully updated
              // Either another thread won the race or the weak exchange failed
              // spuriously; retry from the value that was actually observed.
              Err(actual) => current = actual,
            }
          }
        }
      });
      handles.push(handle);
    }

    // Wait for all threads to complete
    for handle in handles {
      handle.join().unwrap();
    }

    // Verify the final value
    let expected_duration = Duration::from_millis(THREADS * INCREMENTS_PER_THREAD);
    assert_eq!(atomic_duration.load(Ordering::SeqCst), expected_duration);
  }

  /// A struct holding an `AtomicDuration` survives a JSON round trip.
  #[test]
  #[cfg(feature = "serde")]
  fn test_atomic_duration_serde() {
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct Test {
      duration: AtomicDuration,
    }

    let test = Test {
      duration: AtomicDuration::new(Duration::from_secs(5)),
    };
    let serialized = serde_json::to_string(&test).unwrap();
    let deserialized: Test = serde_json::from_str(&serialized).unwrap();
    assert_eq!(
      deserialized.duration.load(Ordering::SeqCst),
      Duration::from_secs(5)
    );
  }
}