Skip to main content

atomic_time/
option_duration.rs

1use crate::AtomicU128;
2use core::{sync::atomic::Ordering, time::Duration};
3
/// Atomic version of [`Option<Duration>`].
///
/// The optional duration is packed into a single `u128` by
/// [`encode_option_duration`] and unpacked again by [`decode_option_duration`];
/// that integer is the only field and is held in an `AtomicU128`, through which
/// every access to the value goes.
#[repr(transparent)]
pub struct AtomicOptionDuration(AtomicU128);
7impl core::fmt::Debug for AtomicOptionDuration {
8  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
9    f.debug_tuple("AtomicOptionDuration")
10      .field(&self.load(Ordering::SeqCst))
11      .finish()
12  }
13}
14impl Default for AtomicOptionDuration {
15  /// Creates an `AtomicOptionDuration` initialized to `None`.
16  #[inline]
17  fn default() -> Self {
18    Self::none()
19  }
20}
21impl From<Option<Duration>> for AtomicOptionDuration {
22  #[inline]
23  fn from(duration: Option<Duration>) -> Self {
24    Self::new(duration)
25  }
26}
27impl AtomicOptionDuration {
28  /// Creates a new `AtomicOptionDuration` with `None`.
29  pub const fn none() -> Self {
30    Self(AtomicU128::new(encode_option_duration(None)))
31  }
32
33  /// Creates a new `AtomicOptionDuration` with the given value.
34  pub const fn new(duration: Option<Duration>) -> Self {
35    Self(AtomicU128::new(encode_option_duration(duration)))
36  }
37  /// Loads `Option<Duration>` from `AtomicOptionDuration`.
38  ///
39  /// load takes an [`Ordering`] argument which describes the memory ordering of this operation.
40  ///
41  /// # Panics
42  /// Panics if order is [`Release`](Ordering::Release) or [`AcqRel`](Ordering::AcqRel).
43  pub fn load(&self, ordering: Ordering) -> Option<Duration> {
44    decode_option_duration(self.0.load(ordering))
45  }
46  /// Stores a value into the `AtomicOptionDuration`.
47  ///
48  /// `store` takes an [`Ordering`] argument which describes the memory ordering
49  /// of this operation.
50  ///
51  /// # Panics
52  ///
53  /// Panics if `order` is [`Acquire`](Ordering::Acquire) or [`AcqRel`](Ordering::AcqRel).
54  pub fn store(&self, val: Option<Duration>, ordering: Ordering) {
55    self.0.store(encode_option_duration(val), ordering)
56  }
57  /// Stores a value into the `AtomicOptionDuration`, returning the old value.
58  ///
59  /// `swap` takes an [`Ordering`] argument which describes the memory ordering
60  /// of this operation.
61  pub fn swap(&self, val: Option<Duration>, ordering: Ordering) -> Option<Duration> {
62    decode_option_duration(self.0.swap(encode_option_duration(val), ordering))
63  }
64  /// Stores a value into the `AtomicOptionDuration` if the current value is the same as the
65  /// `current` value.
66  ///
67  /// Unlike [`compare_exchange`], this function is allowed to spuriously fail
68  /// even when the comparison succeeds, which can result in more efficient
69  /// code on some platforms. The return value is a result indicating whether
70  /// the new value was written and containing the previous value.
71  ///
72  /// `compare_exchange` takes two [`Ordering`] arguments to describe the memory
73  /// ordering of this operation. The first describes the required ordering if
74  /// the operation succeeds while the second describes the required ordering
75  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
76  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
77  /// success ordering.
78  ///
79  /// [`compare_exchange`]: #method.compare_exchange
80  pub fn compare_exchange_weak(
81    &self,
82    current: Option<Duration>,
83    new: Option<Duration>,
84    success: Ordering,
85    failure: Ordering,
86  ) -> Result<Option<Duration>, Option<Duration>> {
87    self
88      .0
89      .compare_exchange_weak(
90        encode_option_duration(current),
91        encode_option_duration(new),
92        success,
93        failure,
94      )
95      .map(decode_option_duration)
96      .map_err(decode_option_duration)
97  }
98  /// Stores a value into the `AtomicOptionDuration` if the current value is the same as the
99  /// `current` value.
100  ///
101  /// The return value is a result indicating whether the new value was
102  /// written and containing the previous value. On success this value is
103  /// guaranteed to be equal to `new`.
104  ///
105  /// [`compare_exchange`] takes two [`Ordering`] arguments to describe the memory
106  /// ordering of this operation. The first describes the required ordering if
107  /// the operation succeeds while the second describes the required ordering
108  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
109  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
110  ///
111  /// [`compare_exchange`]: #method.compare_exchange
112  pub fn compare_exchange(
113    &self,
114    current: Option<Duration>,
115    new: Option<Duration>,
116    success: Ordering,
117    failure: Ordering,
118  ) -> Result<Option<Duration>, Option<Duration>> {
119    self
120      .0
121      .compare_exchange(
122        encode_option_duration(current),
123        encode_option_duration(new),
124        success,
125        failure,
126      )
127      .map(decode_option_duration)
128      .map_err(decode_option_duration)
129  }
130  /// Fetches the value, and applies a function to it that returns an optional
131  /// new value. Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else
132  /// `Err(previous_value)`.
133  ///
134  /// Note: This may call the function multiple times if the value has been changed from other threads in
135  /// the meantime, as long as the function returns `Some(_)`, but the function will have been applied
136  /// only once to the stored value.
137  ///
138  /// `fetch_update` takes two [`Ordering`] arguments to describe the memory ordering of this operation.
139  /// The first describes the required ordering for when the operation finally succeeds while the second
140  /// describes the required ordering for loads. These correspond to the success and failure orderings of
141  /// [`compare_exchange`] respectively.
142  ///
143  /// Using [`Acquire`](Ordering::Acquire) as success ordering makes the store part
144  /// of this operation [`Relaxed`](Ordering::Relaxed), and using [`Release`](Ordering::Release) makes the final successful load
145  /// [`Relaxed`](Ordering::Relaxed). The (failed) load ordering can only be [`SeqCst`](Ordering::SeqCst), [`Acquire`](Ordering::Acquire) or [`Relaxed`](Ordering::Release)
146  /// and must be equivalent to or weaker than the success ordering.
147  ///
148  /// [`compare_exchange`]: #method.compare_exchange
149  ///
150  /// # Examples
151  ///
152  /// ```rust
153  /// use atomic_time::AtomicOptionDuration;
154  /// use std::{time::Duration, sync::atomic::Ordering};
155  ///
156  /// let x = AtomicOptionDuration::new(Some(Duration::from_secs(7)));
157  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None), Err(Some(Duration::from_secs(7))));
158  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x.map(|val| val + Duration::from_secs(1)))), Ok(Some(Duration::from_secs(7))));
159  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x.map(|val| val + Duration::from_secs(1)))), Ok(Some(Duration::from_secs(8))));
160  /// assert_eq!(x.load(Ordering::SeqCst), Some(Duration::from_secs(9)));
161  /// ```
162  pub fn fetch_update<F>(
163    &self,
164    set_order: Ordering,
165    fetch_order: Ordering,
166    mut f: F,
167  ) -> Result<Option<Duration>, Option<Duration>>
168  where
169    F: FnMut(Option<Duration>) -> Option<Option<Duration>>,
170  {
171    self
172      .0
173      .fetch_update(set_order, fetch_order, |d| {
174        f(decode_option_duration(d)).map(encode_option_duration)
175      })
176      .map(decode_option_duration)
177      .map_err(decode_option_duration)
178  }
179  /// Consumes the atomic and returns the contained value.
180  ///
181  /// This is safe because passing `self` by value guarantees that no other threads are
182  /// concurrently accessing the atomic data.
183  #[inline]
184  pub fn into_inner(self) -> Option<Duration> {
185    decode_option_duration(self.0.into_inner())
186  }
187
188  /// Returns `true` if operations on values of this type are lock-free.
189  /// If the compiler or the platform doesn't support the necessary
190  /// atomic instructions, global locks for every potentially
191  /// concurrent atomic operation will be used.
192  ///
193  /// # Examples
194  /// ```
195  /// use atomic_time::AtomicOptionDuration;
196  ///
197  /// let is_lock_free = AtomicOptionDuration::is_lock_free();
198  /// ```
199  #[inline]
200  pub fn is_lock_free() -> bool {
201    AtomicU128::is_lock_free()
202  }
203}
204
/// Packs an [`Option<Duration>`] into a single [`u128`].
///
/// Layout of the result:
///
/// - `None` becomes `0`.
/// - `Some(duration)` sets bit 127 as a "value present" flag, stores the whole
///   seconds (a `u64`) shifted left by 32 bits, and stores the sub-second
///   nanoseconds in the low 32 bits.
pub const fn encode_option_duration(option_duration: Option<Duration>) -> u128 {
  // The flag bit keeps `Some` of a zero-length duration distinct from `None`.
  const SOME_FLAG: u128 = 1 << 127;
  // Number of low bits reserved for the nanosecond field.
  const NANOS_WIDTH: u32 = 32;

  if let Some(d) = option_duration {
    let secs_field = (d.as_secs() as u128) << NANOS_WIDTH;
    let nanos_field = d.subsec_nanos() as u128;
    SOME_FLAG | secs_field | nanos_field
  } else {
    0
  }
}
216
/// Unpacks an [`Option<Duration>`] from a [`u128`] produced by
/// [`encode_option_duration`].
///
/// Bit 127 is the "value present" flag: when it is clear the result is `None`.
/// Otherwise the seconds are read from the bits above the low 32 (truncated to
/// a `u64`) and the nanoseconds from the low 32 bits.
///
/// # Panics
///
/// The value is rebuilt with [`Duration::new`], which panics if the carry from
/// the nanoseconds overflows the seconds counter. Values produced by
/// [`encode_option_duration`] always carry a nanosecond field below one
/// second, so they never trigger this.
pub const fn decode_option_duration(encoded: u128) -> Option<Duration> {
  const SOME_FLAG: u128 = 1 << 127;

  // Flag bit clear: the slot holds `None`, whatever the remaining bits are.
  if encoded & SOME_FLAG == 0 {
    return None;
  }

  // Drop the flag, discard the 32-bit nanosecond field, keep the low 64 bits.
  let secs = ((encoded & !SOME_FLAG) >> 32) as u64;
  // Truncating to `u32` keeps exactly the low 32 bits.
  let subsec_nanos = encoded as u32;
  Some(Duration::new(secs, subsec_nanos))
}
227
// Serde support: the atomic is (de)serialized exactly like the plain
// `Option<Duration>` it holds.
#[cfg(feature = "serde")]
const _: () = {
  use serde::{Deserialize, Deserializer, Serialize, Serializer};

  impl Serialize for AtomicOptionDuration {
    /// Serializes the value currently stored, read with `Ordering::SeqCst`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
      S: Serializer,
    {
      let snapshot: Option<Duration> = self.load(Ordering::SeqCst);
      snapshot.serialize(serializer)
    }
  }

  impl<'de> Deserialize<'de> for AtomicOptionDuration {
    /// Deserializes an `Option<Duration>` and wraps it in a new atomic.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
      D: Deserializer<'de>,
    {
      <Option<Duration> as Deserialize<'de>>::deserialize(deserializer).map(Self::new)
    }
  }
};
244
#[cfg(test)]
mod tests {
  use super::*;

  /// Shorthand for `Some(Duration::from_secs(n))`.
  fn secs(n: u64) -> Option<Duration> {
    Some(Duration::from_secs(n))
  }

  /// Construction with a value is observable through `load`.
  #[test]
  fn test_new_atomic_option_duration() {
    let atomic = AtomicOptionDuration::new(secs(5));
    assert_eq!(atomic.load(Ordering::SeqCst), secs(5));
  }

  /// `load` returns the value the atomic was created with.
  #[test]
  fn test_atomic_option_duration_load() {
    let atomic = AtomicOptionDuration::new(secs(10));
    assert_eq!(atomic.load(Ordering::SeqCst), secs(10));
  }

  /// `store` replaces a `Some` value with another `Some` value.
  #[test]
  fn test_atomic_option_duration_store() {
    let atomic = AtomicOptionDuration::new(secs(3));
    atomic.store(secs(7), Ordering::SeqCst);
    assert_eq!(atomic.load(Ordering::SeqCst), secs(7));
  }

  /// `store` can clear the value back to `None`.
  #[test]
  fn test_atomic_option_duration_store_none() {
    let atomic = AtomicOptionDuration::new(secs(3));
    atomic.store(None, Ordering::SeqCst);
    assert_eq!(atomic.load(Ordering::SeqCst), None);
  }

  /// `swap` hands back the previous value and installs the new one.
  #[test]
  fn test_atomic_option_duration_swap() {
    let atomic = AtomicOptionDuration::new(secs(2));
    let previous = atomic.swap(secs(8), Ordering::SeqCst);
    assert_eq!(previous, secs(2));
    assert_eq!(atomic.load(Ordering::SeqCst), secs(8));
  }

  /// `compare_exchange_weak` eventually succeeds on a match and fails on a mismatch.
  #[test]
  fn test_atomic_option_duration_compare_exchange_weak() {
    let atomic = AtomicOptionDuration::new(secs(4));

    // Successful exchange: the weak variant may fail spuriously, so retry until it lands.
    let previous = loop {
      let attempt =
        atomic.compare_exchange_weak(secs(4), secs(6), Ordering::SeqCst, Ordering::SeqCst);
      if let Ok(old) = attempt {
        break old;
      }
    };
    assert_eq!(previous, secs(4));
    assert_eq!(atomic.load(Ordering::SeqCst), secs(6));

    // Failed exchange: the expected value is stale, so the current value comes back as `Err`.
    let stale =
      atomic.compare_exchange_weak(secs(4), secs(7), Ordering::SeqCst, Ordering::SeqCst);
    assert!(stale.is_err());
    assert_eq!(stale.unwrap_err(), secs(6));
  }

  /// `compare_exchange` succeeds on a match and fails on a mismatch.
  #[test]
  fn test_atomic_option_duration_compare_exchange() {
    let atomic = AtomicOptionDuration::new(secs(1));

    // Successful exchange
    let swapped = atomic.compare_exchange(secs(1), secs(5), Ordering::SeqCst, Ordering::SeqCst);
    assert!(swapped.is_ok());
    assert_eq!(swapped.unwrap(), secs(1));
    assert_eq!(atomic.load(Ordering::SeqCst), secs(5));

    // Failed exchange
    let stale = atomic.compare_exchange(secs(1), secs(6), Ordering::SeqCst, Ordering::SeqCst);
    assert!(stale.is_err());
    assert_eq!(stale.unwrap_err(), secs(5));
  }

  /// An atomic created with `None` loads as `None`.
  #[test]
  fn test_atomic_option_duration_with_none_initially() {
    let atomic = AtomicOptionDuration::new(None);
    assert_eq!(atomic.load(Ordering::SeqCst), None);
  }

  /// A value can be stored into an atomic that started out as `None`.
  #[test]
  fn test_atomic_option_duration_store_none_and_then_value() {
    let atomic = AtomicOptionDuration::new(None);
    atomic.store(secs(5), Ordering::SeqCst);
    assert_eq!(atomic.load(Ordering::SeqCst), secs(5));
  }

  /// `swap` with `None` returns the old value and clears the atomic.
  #[test]
  fn test_atomic_option_duration_swap_with_none() {
    let atomic = AtomicOptionDuration::new(secs(2));
    let previous = atomic.swap(None, Ordering::SeqCst);
    assert_eq!(previous, secs(2));
    assert_eq!(atomic.load(Ordering::SeqCst), None);
  }

  /// `compare_exchange_weak` can move the value to `None` and back to `Some`.
  #[test]
  fn test_atomic_option_duration_compare_exchange_weak_with_none() {
    let atomic = AtomicOptionDuration::new(secs(4));

    // Change to None (retrying on spurious failure).
    while atomic
      .compare_exchange_weak(secs(4), None, Ordering::SeqCst, Ordering::SeqCst)
      .is_err()
    {}
    assert_eq!(atomic.load(Ordering::SeqCst), None);

    // Change back to Some(Duration) (retrying on spurious failure).
    while atomic
      .compare_exchange_weak(None, secs(6), Ordering::SeqCst, Ordering::SeqCst)
      .is_err()
    {}
    assert_eq!(atomic.load(Ordering::SeqCst), secs(6));
  }

  /// `compare_exchange` can move the value to `None` and back to `Some`.
  #[test]
  fn test_atomic_option_duration_compare_exchange_with_none() {
    let atomic = AtomicOptionDuration::new(secs(1));

    // Change to None
    let cleared = atomic.compare_exchange(secs(1), None, Ordering::SeqCst, Ordering::SeqCst);
    assert!(cleared.is_ok());
    assert_eq!(atomic.load(Ordering::SeqCst), None);

    // Change back to Some(Duration)
    let restored = atomic.compare_exchange(None, secs(5), Ordering::SeqCst, Ordering::SeqCst);
    assert!(restored.is_ok());
    assert_eq!(atomic.load(Ordering::SeqCst), secs(5));
  }

  /// Ten threads each add one millisecond a hundred times through a
  /// compare-and-swap loop; no increment may be lost.
  #[test]
  #[cfg(feature = "std")]
  fn test_atomic_option_duration_thread_safety() {
    use std::sync::Arc;
    use std::thread;

    let shared = Arc::new(AtomicOptionDuration::new(Some(Duration::from_secs(0))));

    // Spawn multiple threads to increment the duration
    let workers: Vec<_> = (0..10)
      .map(|_| {
        let cell = Arc::clone(&shared);
        thread::spawn(move || {
          let step = Duration::from_millis(1);
          for _ in 0..100 {
            loop {
              let seen = cell.load(Ordering::SeqCst);
              let bumped = match seen {
                Some(d) => Some(d + step),
                None => Some(step),
              };
              // Retry when the exchange fails, whether spuriously or because
              // the stored value no longer matches `seen`.
              if cell
                .compare_exchange_weak(seen, bumped, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
              {
                break;
              }
            }
          }
        })
      })
      .collect();

    // Wait for all threads to complete
    for worker in workers {
      worker.join().unwrap();
    }

    // Verify the final value
    assert_eq!(
      shared.load(Ordering::SeqCst),
      Some(Duration::from_millis(10 * 100))
    );
  }

  /// A struct holding the atomic round-trips through JSON for both `Some` and `None`.
  #[cfg(feature = "serde")]
  #[test]
  fn test_atomic_option_duration_serde() {
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct Test {
      duration: AtomicOptionDuration,
    }

    // Serialize a `Test` holding `value`, parse it back, and report what was recovered.
    let round_trip = |value: Option<Duration>| {
      let original = Test {
        duration: AtomicOptionDuration::new(value),
      };
      let json = serde_json::to_string(&original).unwrap();
      let recovered: Test = serde_json::from_str(&json).unwrap();
      recovered.duration.load(Ordering::SeqCst)
    };

    assert_eq!(round_trip(secs(5)), secs(5));
    assert_eq!(round_trip(None), None);
  }
}