atomic_time/
option_duration.rs

1use crate::AtomicU128;
2use core::{sync::atomic::Ordering, time::Duration};
3
/// Atomic version of [`Option<Duration>`].
///
/// The value is kept in a single [`AtomicU128`] using the bit layout produced by
/// [`encode_option_duration`]: bit 127 is set for `Some(_)`, bits 32..=95 hold the
/// whole seconds and bits 0..=31 hold the sub-second nanoseconds. `None` is stored
/// as `0`.
#[repr(transparent)]
pub struct AtomicOptionDuration(AtomicU128);
7impl core::fmt::Debug for AtomicOptionDuration {
8  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
9    f.debug_tuple("AtomicOptionDuration")
10      .field(&self.load(Ordering::SeqCst))
11      .finish()
12  }
13}
14impl AtomicOptionDuration {
15  /// Creates a new `AtomicOptionDuration` with `None`.
16  pub const fn none() -> Self {
17    Self(AtomicU128::new(encode_option_duration(None)))
18  }
19
20  /// Creates a new `AtomicOptionDuration` with the given value.
21  pub const fn new(duration: Option<Duration>) -> Self {
22    Self(AtomicU128::new(encode_option_duration(duration)))
23  }
24  /// Loads `Option<Duration>` from `AtomicOptionDuration`.
25  ///
26  /// load takes an [`Ordering`] argument which describes the memory ordering of this operation.
27  ///
28  /// # Panics
29  /// Panics if order is [`Release`](Ordering::Release) or [`AcqRel`](Ordering::AcqRel).
30  pub fn load(&self, ordering: Ordering) -> Option<Duration> {
31    decode_option_duration(self.0.load(ordering))
32  }
33  /// Stores a value into the `AtomicOptionDuration`.
34  ///
35  /// `store` takes an [`Ordering`] argument which describes the memory ordering
36  /// of this operation.
37  ///
38  /// # Panics
39  ///
40  /// Panics if `order` is [`Acquire`](Ordering::Acquire) or [`AcqRel`](Ordering::AcqRel).
41  pub fn store(&self, val: Option<Duration>, ordering: Ordering) {
42    self.0.store(encode_option_duration(val), ordering)
43  }
44  /// Stores a value into the `AtomicOptionDuration`, returning the old value.
45  ///
46  /// `swap` takes an [`Ordering`] argument which describes the memory ordering
47  /// of this operation.
48  pub fn swap(&self, val: Option<Duration>, ordering: Ordering) -> Option<Duration> {
49    decode_option_duration(self.0.swap(encode_option_duration(val), ordering))
50  }
51  /// Stores a value into the `AtomicOptionDuration` if the current value is the same as the
52  /// `current` value.
53  ///
54  /// Unlike [`compare_exchange`], this function is allowed to spuriously fail
55  /// even when the comparison succeeds, which can result in more efficient
56  /// code on some platforms. The return value is a result indicating whether
57  /// the new value was written and containing the previous value.
58  ///
59  /// `compare_exchange` takes two [`Ordering`] arguments to describe the memory
60  /// ordering of this operation. The first describes the required ordering if
61  /// the operation succeeds while the second describes the required ordering
62  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
63  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
64  /// success ordering.
65  ///
66  /// [`compare_exchange`]: #method.compare_exchange
67  pub fn compare_exchange_weak(
68    &self,
69    current: Option<Duration>,
70    new: Option<Duration>,
71    success: Ordering,
72    failure: Ordering,
73  ) -> Result<Option<Duration>, Option<Duration>> {
74    self
75      .0
76      .compare_exchange_weak(
77        encode_option_duration(current),
78        encode_option_duration(new),
79        success,
80        failure,
81      )
82      .map(decode_option_duration)
83      .map_err(decode_option_duration)
84  }
85  /// Stores a value into the `AtomicOptionDuration` if the current value is the same as the
86  /// `current` value.
87  ///
88  /// The return value is a result indicating whether the new value was
89  /// written and containing the previous value. On success this value is
90  /// guaranteed to be equal to `new`.
91  ///
92  /// [`compare_exchange`] takes two [`Ordering`] arguments to describe the memory
93  /// ordering of this operation. The first describes the required ordering if
94  /// the operation succeeds while the second describes the required ordering
95  /// when the operation fails. The failure ordering can't be [`Release`](Ordering::Release) or
96  /// [`AcqRel`](Ordering::AcqRel) and must be equivalent or weaker than the success ordering.
97  ///
98  /// [`compare_exchange`]: #method.compare_exchange
99  pub fn compare_exchange(
100    &self,
101    current: Option<Duration>,
102    new: Option<Duration>,
103    success: Ordering,
104    failure: Ordering,
105  ) -> Result<Option<Duration>, Option<Duration>> {
106    self
107      .0
108      .compare_exchange(
109        encode_option_duration(current),
110        encode_option_duration(new),
111        success,
112        failure,
113      )
114      .map(decode_option_duration)
115      .map_err(decode_option_duration)
116  }
117  /// Fetches the value, and applies a function to it that returns an optional
118  /// new value. Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else
119  /// `Err(previous_value)`.
120  ///
121  /// Note: This may call the function multiple times if the value has been changed from other threads in
122  /// the meantime, as long as the function returns `Some(_)`, but the function will have been applied
123  /// only once to the stored value.
124  ///
125  /// `fetch_update` takes two [`Ordering`] arguments to describe the memory ordering of this operation.
126  /// The first describes the required ordering for when the operation finally succeeds while the second
127  /// describes the required ordering for loads. These correspond to the success and failure orderings of
128  /// [`compare_exchange`] respectively.
129  ///
130  /// Using [`Acquire`](Ordering::Acquire) as success ordering makes the store part
131  /// of this operation [`Relaxed`](Ordering::Relaxed), and using [`Release`](Ordering::Release) makes the final successful load
132  /// [`Relaxed`](Ordering::Relaxed). The (failed) load ordering can only be [`SeqCst`](Ordering::SeqCst), [`Acquire`](Ordering::Acquire) or [`Relaxed`](Ordering::Release)
133  /// and must be equivalent to or weaker than the success ordering.
134  ///
135  /// [`compare_exchange`]: #method.compare_exchange
136  ///
137  /// # Examples
138  ///
139  /// ```rust
140  /// use atomic_time::AtomicOptionDuration;
141  /// use std::{time::Duration, sync::atomic::Ordering};
142  ///
143  /// let x = AtomicOptionDuration::new(Some(Duration::from_secs(7)));
144  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None), Err(Some(Duration::from_secs(7))));
145  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x.map(|val| val + Duration::from_secs(1)))), Ok(Some(Duration::from_secs(7))));
146  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x.map(|val| val + Duration::from_secs(1)))), Ok(Some(Duration::from_secs(8))));
147  /// assert_eq!(x.load(Ordering::SeqCst), Some(Duration::from_secs(9)));
148  /// ```
149  pub fn fetch_update<F>(
150    &self,
151    set_order: Ordering,
152    fetch_order: Ordering,
153    mut f: F,
154  ) -> Result<Option<Duration>, Option<Duration>>
155  where
156    F: FnMut(Option<Duration>) -> Option<Option<Duration>>,
157  {
158    self
159      .0
160      .fetch_update(set_order, fetch_order, |d| {
161        f(decode_option_duration(d)).map(encode_option_duration)
162      })
163      .map(decode_option_duration)
164      .map_err(decode_option_duration)
165  }
166  /// Consumes the atomic and returns the contained value.
167  ///
168  /// This is safe because passing `self` by value guarantees that no other threads are
169  /// concurrently accessing the atomic data.
170  #[inline]
171  pub fn into_inner(self) -> Option<Duration> {
172    decode_option_duration(self.0.into_inner())
173  }
174
175  /// Returns `true` if operations on values of this type are lock-free.
176  /// If the compiler or the platform doesn't support the necessary
177  /// atomic instructions, global locks for every potentially
178  /// concurrent atomic operation will be used.
179  ///
180  /// # Examples
181  /// ```
182  /// use atomic_time::AtomicOptionDuration;
183  ///
184  /// let is_lock_free = AtomicOptionDuration::is_lock_free();
185  /// ```
186  #[inline]
187  pub fn is_lock_free() -> bool {
188    AtomicU128::is_lock_free()
189  }
190}
191
/// Encode an [`Option<Duration>`] into an [`u128`].
///
/// `None` becomes `0`. `Some(duration)` sets bit 127 as a presence flag, places
/// the whole seconds in bits 32..=95 and the sub-second nanoseconds in
/// bits 0..=31.
pub const fn encode_option_duration(option_duration: Option<Duration>) -> u128 {
  // Presence flag: distinguishes `Some(Duration::ZERO)` from `None`.
  const SOME_FLAG: u128 = 1 << 127;
  // The nanosecond field occupies the low 32 bits, seconds sit right above it.
  const SECONDS_SHIFT: u32 = 32;

  if let Some(duration) = option_duration {
    let secs_bits = (duration.as_secs() as u128) << SECONDS_SHIFT;
    let nanos_bits = duration.subsec_nanos() as u128;
    SOME_FLAG | secs_bits | nanos_bits
  } else {
    0
  }
}
203
/// Decode an [`Option<Duration>`] from an encoded [`u128`].
///
/// A word whose bit 127 is clear decodes to `None`, whatever the other bits
/// contain. Otherwise bits 32..=95 are read as whole seconds and bits 0..=31 as
/// nanoseconds; bits 96..=126 are ignored.
///
/// # Panics
///
/// The nanosecond field is handed to [`Duration::new`] unchanged, so a
/// hand-crafted word whose nanosecond field is one second or more is carried
/// into the seconds and panics if that carry overflows.
pub const fn decode_option_duration(encoded: u128) -> Option<Duration> {
  const SOME_FLAG: u128 = 1 << 127;
  const SECONDS_SHIFT: u32 = 32;

  if (encoded & SOME_FLAG) == 0 {
    return None;
  }

  // The truncating casts are intentional: they drop the flag bit and the
  // unused bits above each field.
  let seconds = (encoded >> SECONDS_SHIFT) as u64;
  let nanos = encoded as u32;
  Some(Duration::new(seconds, nanos))
}
214
#[cfg(feature = "serde")]
const _: () = {
  use serde::{Deserialize, Deserializer, Serialize, Serializer};

  /// Serializes the current value exactly like an `Option<Duration>`.
  impl Serialize for AtomicOptionDuration {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
      S: Serializer,
    {
      // Read the value once so the serialized form reflects a single snapshot.
      let snapshot = self.load(Ordering::SeqCst);
      snapshot.serialize(serializer)
    }
  }

  /// Deserializes an `Option<Duration>` and wraps it in a fresh atomic.
  impl<'de> Deserialize<'de> for AtomicOptionDuration {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
      D: Deserializer<'de>,
    {
      Option::<Duration>::deserialize(deserializer).map(Self::new)
    }
  }
};
231
#[cfg(test)]
mod tests {
  use super::*;

  /// Retries `compare_exchange_weak` until it succeeds and returns the previous value.
  ///
  /// A weak compare-exchange may fail spuriously even when the comparison
  /// succeeds, so a single call must never be asserted to be `Ok`. Only use this
  /// helper when the atomic is known to hold `current`; otherwise it never returns.
  fn compare_exchange_weak_until_ok(
    atomic: &AtomicOptionDuration,
    current: Option<Duration>,
    new: Option<Duration>,
  ) -> Option<Duration> {
    loop {
      if let Ok(previous) =
        atomic.compare_exchange_weak(current, new, Ordering::SeqCst, Ordering::SeqCst)
      {
        return previous;
      }
    }
  }

  #[test]
  fn test_new_atomic_option_duration() {
    let duration = Duration::from_secs(5);
    let atomic_duration = AtomicOptionDuration::new(Some(duration));
    assert_eq!(atomic_duration.load(Ordering::SeqCst), Some(duration));
  }

  #[test]
  fn test_atomic_option_duration_load() {
    let duration = Duration::from_secs(10);
    let atomic_duration = AtomicOptionDuration::new(Some(duration));
    assert_eq!(atomic_duration.load(Ordering::SeqCst), Some(duration));
  }

  #[test]
  fn test_atomic_option_duration_store() {
    let initial_duration = Duration::from_secs(3);
    let new_duration = Duration::from_secs(7);
    let atomic_duration = AtomicOptionDuration::new(Some(initial_duration));
    atomic_duration.store(Some(new_duration), Ordering::SeqCst);
    assert_eq!(atomic_duration.load(Ordering::SeqCst), Some(new_duration));
  }

  #[test]
  fn test_atomic_option_duration_store_none() {
    let initial_duration = Duration::from_secs(3);
    let atomic_duration = AtomicOptionDuration::new(Some(initial_duration));
    atomic_duration.store(None, Ordering::SeqCst);
    assert_eq!(atomic_duration.load(Ordering::SeqCst), None);
  }

  #[test]
  fn test_atomic_option_duration_swap() {
    let initial_duration = Duration::from_secs(2);
    let new_duration = Duration::from_secs(8);
    let atomic_duration = AtomicOptionDuration::new(Some(initial_duration));
    let prev_duration = atomic_duration.swap(Some(new_duration), Ordering::SeqCst);
    assert_eq!(prev_duration, Some(initial_duration));
    assert_eq!(atomic_duration.load(Ordering::SeqCst), Some(new_duration));
  }

  #[test]
  fn test_atomic_option_duration_compare_exchange_weak() {
    let initial_duration = Duration::from_secs(4);
    let atomic_duration = AtomicOptionDuration::new(Some(initial_duration));

    // Successful exchange (retried, because the weak variant may fail spuriously)
    let previous = compare_exchange_weak_until_ok(
      &atomic_duration,
      Some(initial_duration),
      Some(Duration::from_secs(6)),
    );
    assert_eq!(previous, Some(initial_duration));
    assert_eq!(
      atomic_duration.load(Ordering::SeqCst),
      Some(Duration::from_secs(6))
    );

    // Failed exchange: the stored value no longer matches `current`, so a
    // single call is guaranteed to report the actual value as an error.
    let result = atomic_duration.compare_exchange_weak(
      Some(initial_duration),
      Some(Duration::from_secs(7)),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert!(result.is_err());
    assert_eq!(result.unwrap_err(), Some(Duration::from_secs(6)));
  }

  #[test]
  fn test_atomic_option_duration_compare_exchange() {
    let initial_duration = Duration::from_secs(1);
    let atomic_duration = AtomicOptionDuration::new(Some(initial_duration));

    // Successful exchange
    let result = atomic_duration.compare_exchange(
      Some(initial_duration),
      Some(Duration::from_secs(5)),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert!(result.is_ok());
    assert_eq!(result.unwrap(), Some(initial_duration));
    assert_eq!(
      atomic_duration.load(Ordering::SeqCst),
      Some(Duration::from_secs(5))
    );

    // Failed exchange
    let result = atomic_duration.compare_exchange(
      Some(initial_duration),
      Some(Duration::from_secs(6)),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert!(result.is_err());
    assert_eq!(result.unwrap_err(), Some(Duration::from_secs(5)));
  }

  #[test]
  fn test_atomic_option_duration_with_none_initially() {
    let atomic_duration = AtomicOptionDuration::new(None);
    assert_eq!(atomic_duration.load(Ordering::SeqCst), None);
  }

  #[test]
  fn test_atomic_option_duration_store_none_and_then_value() {
    let atomic_duration = AtomicOptionDuration::new(None);
    atomic_duration.store(Some(Duration::from_secs(5)), Ordering::SeqCst);
    assert_eq!(
      atomic_duration.load(Ordering::SeqCst),
      Some(Duration::from_secs(5))
    );
  }

  #[test]
  fn test_atomic_option_duration_swap_with_none() {
    let initial_duration = Duration::from_secs(2);
    let atomic_duration = AtomicOptionDuration::new(Some(initial_duration));
    let prev_duration = atomic_duration.swap(None, Ordering::SeqCst);
    assert_eq!(prev_duration, Some(initial_duration));
    assert_eq!(atomic_duration.load(Ordering::SeqCst), None);
  }

  #[test]
  fn test_atomic_option_duration_compare_exchange_weak_with_none() {
    let initial_duration = Duration::from_secs(4);
    let atomic_duration = AtomicOptionDuration::new(Some(initial_duration));

    // Change to None
    let previous = compare_exchange_weak_until_ok(&atomic_duration, Some(initial_duration), None);
    assert_eq!(previous, Some(initial_duration));
    assert_eq!(atomic_duration.load(Ordering::SeqCst), None);

    // Change back to Some(Duration)
    let new_duration = Duration::from_secs(6);
    let previous = compare_exchange_weak_until_ok(&atomic_duration, None, Some(new_duration));
    assert_eq!(previous, None);
    assert_eq!(atomic_duration.load(Ordering::SeqCst), Some(new_duration));
  }

  #[test]
  fn test_atomic_option_duration_compare_exchange_with_none() {
    let initial_duration = Duration::from_secs(1);
    let atomic_duration = AtomicOptionDuration::new(Some(initial_duration));

    // Change to None
    let result = atomic_duration.compare_exchange(
      Some(initial_duration),
      None,
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert!(result.is_ok());
    assert_eq!(atomic_duration.load(Ordering::SeqCst), None);

    // Change back to Some(Duration)
    let new_duration = Duration::from_secs(5);
    let result = atomic_duration.compare_exchange(
      None,
      Some(new_duration),
      Ordering::SeqCst,
      Ordering::SeqCst,
    );
    assert!(result.is_ok());
    assert_eq!(atomic_duration.load(Ordering::SeqCst), Some(new_duration));
  }

  #[test]
  fn test_encode_decode_round_trip_boundaries() {
    let samples = [
      None,
      Some(Duration::new(0, 0)),
      Some(Duration::new(0, 999_999_999)),
      Some(Duration::new(u64::MAX, 0)),
      Some(Duration::new(u64::MAX, 999_999_999)),
    ];
    for sample in samples.iter().copied() {
      assert_eq!(decode_option_duration(encode_option_duration(sample)), sample);
    }
    // `Some(zero)` must stay distinguishable from `None`.
    assert_ne!(
      encode_option_duration(Some(Duration::new(0, 0))),
      encode_option_duration(None)
    );
  }

  #[test]
  #[cfg(feature = "std")]
  fn test_atomic_option_duration_thread_safety() {
    use std::sync::Arc;
    use std::thread;

    let atomic_duration = Arc::new(AtomicOptionDuration::new(Some(Duration::from_secs(0))));
    let mut handles = vec![];

    // Spawn multiple threads to increment the duration
    for _ in 0..10 {
      let atomic_clone = Arc::clone(&atomic_duration);
      let handle = thread::spawn(move || {
        for _ in 0..100 {
          loop {
            let current = atomic_clone.load(Ordering::SeqCst);
            let new_duration = current
              .map(|d| d + Duration::from_millis(1))
              .or(Some(Duration::from_millis(1)));
            match atomic_clone.compare_exchange_weak(
              current,
              new_duration,
              Ordering::SeqCst,
              Ordering::SeqCst,
            ) {
              Ok(_) => break, // Successfully updated
              // Another thread won the race, or the weak CAS failed spuriously: retry
              Err(_) => continue,
            }
          }
        }
      });
      handles.push(handle);
    }

    // Wait for all threads to complete
    for handle in handles {
      handle.join().unwrap();
    }

    // Verify the final value
    let expected_duration = Some(Duration::from_millis(10 * 100));
    assert_eq!(atomic_duration.load(Ordering::SeqCst), expected_duration);
  }

  #[cfg(feature = "serde")]
  #[test]
  fn test_atomic_option_duration_serde() {
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct Test {
      duration: AtomicOptionDuration,
    }

    let test = Test {
      duration: AtomicOptionDuration::new(Some(Duration::from_secs(5))),
    };
    let serialized = serde_json::to_string(&test).unwrap();
    let deserialized: Test = serde_json::from_str(&serialized).unwrap();
    assert_eq!(
      deserialized.duration.load(Ordering::SeqCst),
      Some(Duration::from_secs(5))
    );

    let test = Test {
      duration: AtomicOptionDuration::new(None),
    };
    let serialized = serde_json::to_string(&test).unwrap();
    let deserialized: Test = serde_json::from_str(&serialized).unwrap();
    assert_eq!(deserialized.duration.load(Ordering::SeqCst), None);
  }
}