Skip to main content

atomic_time/
instant.rs

1use std::time::Instant;
2
3use super::*;
4
/// Atomic version of [`Instant`].
///
/// This is a thin wrapper around an `AtomicDuration`: every [`Instant`] is
/// converted with `encode_instant_to_duration` before it is stored, and
/// converted back with `decode_instant_from_duration` whenever it is read.
#[repr(transparent)]
pub struct AtomicInstant(AtomicDuration);
8
9impl core::fmt::Debug for AtomicInstant {
10  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
11    f.debug_tuple("AtomicInstant")
12      .field(&self.load(Ordering::SeqCst))
13      .finish()
14  }
15}
16impl From<Instant> for AtomicInstant {
17  #[cfg_attr(not(tarpaulin), inline(always))]
18  fn from(instant: Instant) -> Self {
19    Self::new(instant)
20  }
21}
22impl AtomicInstant {
23  /// Returns the instant corresponding to "now".
24  ///
25  /// # Examples
26  /// ```
27  /// use atomic_time::AtomicInstant;
28  ///
29  /// let now = AtomicInstant::now();
30  /// ```
31  #[cfg_attr(not(tarpaulin), inline(always))]
32  pub fn now() -> Self {
33    Self::new(Instant::now())
34  }
35
36  /// Creates a new `AtomicInstant` with the given `Instant` value.
37  #[cfg_attr(not(tarpaulin), inline(always))]
38  pub fn new(instant: Instant) -> Self {
39    Self(AtomicDuration::new(encode_instant_to_duration(instant)))
40  }
41
42  /// Loads a value from the atomic instant.
43  #[cfg_attr(not(tarpaulin), inline(always))]
44  pub fn load(&self, order: Ordering) -> Instant {
45    decode_instant_from_duration(self.0.load(order))
46  }
47
48  /// Stores a value into the atomic instant.
49  #[cfg_attr(not(tarpaulin), inline(always))]
50  pub fn store(&self, instant: Instant, order: Ordering) {
51    self.0.store(encode_instant_to_duration(instant), order)
52  }
53
54  /// Stores a value into the atomic instant, returning the previous value.
55  #[cfg_attr(not(tarpaulin), inline(always))]
56  pub fn swap(&self, instant: Instant, order: Ordering) -> Instant {
57    decode_instant_from_duration(self.0.swap(encode_instant_to_duration(instant), order))
58  }
59
60  /// Stores a value into the atomic instant if the current value is the same as the `current`
61  /// value.
62  #[cfg_attr(not(tarpaulin), inline(always))]
63  pub fn compare_exchange(
64    &self,
65    current: Instant,
66    new: Instant,
67    success: Ordering,
68    failure: Ordering,
69  ) -> Result<Instant, Instant> {
70    match self.0.compare_exchange(
71      encode_instant_to_duration(current),
72      encode_instant_to_duration(new),
73      success,
74      failure,
75    ) {
76      Ok(duration) => Ok(decode_instant_from_duration(duration)),
77      Err(duration) => Err(decode_instant_from_duration(duration)),
78    }
79  }
80
81  /// Stores a value into the atomic instant if the current value is the same as the `current`
82  /// value.
83  #[cfg_attr(not(tarpaulin), inline(always))]
84  pub fn compare_exchange_weak(
85    &self,
86    current: Instant,
87    new: Instant,
88    success: Ordering,
89    failure: Ordering,
90  ) -> Result<Instant, Instant> {
91    match self.0.compare_exchange_weak(
92      encode_instant_to_duration(current),
93      encode_instant_to_duration(new),
94      success,
95      failure,
96    ) {
97      Ok(duration) => Ok(decode_instant_from_duration(duration)),
98      Err(duration) => Err(decode_instant_from_duration(duration)),
99    }
100  }
101
102  /// Fetches the value, and applies a function to it that returns an optional
103  /// new value. Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else
104  /// `Err(previous_value)`.
105  ///
106  /// Note: This may call the function multiple times if the value has been changed from other threads in
107  /// the meantime, as long as the function returns `Some(_)`, but the function will have been applied
108  /// only once to the stored value.
109  ///
110  /// `fetch_update` takes two [`Ordering`] arguments to describe the memory ordering of this operation.
111  /// The first describes the required ordering for when the operation finally succeeds while the second
112  /// describes the required ordering for loads. These correspond to the success and failure orderings of
113  /// [`compare_exchange`] respectively.
114  ///
115  /// Using [`Acquire`](Ordering::Acquire) as success ordering makes the store part
116  /// of this operation [`Relaxed`](Ordering::Relaxed), and using [`Release`](Ordering::Release) makes the final successful load
117  /// [`Relaxed`](Ordering::Relaxed). The (failed) load ordering can only be [`SeqCst`](Ordering::SeqCst), [`Acquire`](Ordering::Acquire) or [`Relaxed`](Ordering::Release)
118  /// and must be equivalent to or weaker than the success ordering.
119  ///
120  /// [`compare_exchange`]: #method.compare_exchange
121  ///
122  /// # Examples
123  ///
124  /// ```rust
125  /// use atomic_time::AtomicInstant;
126  /// use std::{time::{Duration, Instant}, sync::atomic::Ordering};
127  ///
128  /// let now = Instant::now();
129  /// let x = AtomicInstant::new(now);
130  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None), Err(now));
131  ///
132  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(now));
133  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(now + Duration::from_secs(1)));
134  /// assert_eq!(x.load(Ordering::SeqCst), now + Duration::from_secs(2));
135  /// ```
136  #[cfg_attr(not(tarpaulin), inline(always))]
137  pub fn fetch_update<F>(
138    &self,
139    set_order: Ordering,
140    fetch_order: Ordering,
141    mut f: F,
142  ) -> Result<Instant, Instant>
143  where
144    F: FnMut(Instant) -> Option<Instant>,
145  {
146    self
147      .0
148      .fetch_update(set_order, fetch_order, |duration| {
149        f(decode_instant_from_duration(duration)).map(encode_instant_to_duration)
150      })
151      .map(decode_instant_from_duration)
152      .map_err(decode_instant_from_duration)
153  }
154
155  /// Returns `true` if operations on values of this type are lock-free.
156  /// If the compiler or the platform doesn't support the necessary
157  /// atomic instructions, global locks for every potentially
158  /// concurrent atomic operation will be used.
159  ///
160  /// # Examples
161  /// ```
162  /// use atomic_time::AtomicInstant;
163  ///
164  /// let is_lock_free = AtomicInstant::is_lock_free();
165  /// ```
166  #[cfg_attr(not(tarpaulin), inline(always))]
167  pub fn is_lock_free() -> bool {
168    AtomicU128::is_lock_free()
169  }
170
171  /// Consumes the atomic and returns the contained value.
172  ///
173  /// This is safe because passing `self` by value guarantees that no other threads are
174  /// concurrently accessing the atomic data.
175  #[cfg_attr(not(tarpaulin), inline(always))]
176  pub fn into_inner(self) -> Instant {
177    decode_instant_from_duration(self.0.into_inner())
178  }
179}
180
// Serde support, compiled only with the `serde` feature. The impls live in an
// anonymous constant so the helper imports below stay private to this block.
#[cfg(feature = "serde")]
const _: () = {
  use core::time::Duration;
  use serde::{Deserialize, Serialize};

  impl Serialize for AtomicInstant {
    /// Serializes the encoded value currently held by the inner atomic
    /// (read with `SeqCst`), not the `Instant` itself.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
      S: serde::Serializer,
    {
      let encoded = self.0.load(Ordering::SeqCst);
      encoded.serialize(serializer)
    }
  }

  impl<'de> Deserialize<'de> for AtomicInstant {
    /// Reads a `Duration`, decodes it with `decode_instant_from_duration`
    /// and wraps the resulting `Instant`.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
      D: serde::Deserializer<'de>,
    {
      let encoded = Duration::deserialize(deserializer)?;
      let instant = decode_instant_from_duration(encoded);
      Ok(Self::new(instant))
    }
  }
};
200
#[cfg(test)]
mod tests {
  use super::*;
  use std::time::Duration;

  /// `now()` must capture an instant no later than, and within one second
  /// of, a fresh `Instant::now()` taken right afterwards.
  #[test]
  fn test_atomic_instant_now() {
    let atomic_instant = AtomicInstant::now();
    // Check that the time is reasonable (not too far from now).
    let now = Instant::now();
    let loaded_instant = atomic_instant.load(Ordering::SeqCst);
    assert!(loaded_instant <= now);
    // Measure the gap instead of computing `now - 1s`: subtracting a
    // `Duration` from an `Instant` panics on underflow, which would fail
    // the test for the wrong reason on a clock that started < 1 s ago.
    assert!(now.duration_since(loaded_instant) <= Duration::from_secs(1));
  }

  /// A value passed to `new` is returned unchanged by `load`.
  #[test]
  fn test_atomic_instant_new_and_load() {
    let now = Instant::now();
    let atomic_instant = AtomicInstant::new(now);
    assert_eq!(atomic_instant.load(Ordering::SeqCst), now);
  }

  /// `store` replaces the value observed by a subsequent `load`.
  #[test]
  fn test_atomic_instant_store_and_load() {
    let now = Instant::now();
    let after_one_sec = now + Duration::from_secs(1);
    let atomic_instant = AtomicInstant::new(now);
    atomic_instant.store(after_one_sec, Ordering::SeqCst);
    assert_eq!(atomic_instant.load(Ordering::SeqCst), after_one_sec);
  }

  /// `swap` returns the old value and installs the new one.
  #[test]
  fn test_atomic_instant_swap() {
    let now = Instant::now();
    let after_one_sec = now + Duration::from_secs(1);
    let atomic_instant = AtomicInstant::new(now);
    let prev_instant = atomic_instant.swap(after_one_sec, Ordering::SeqCst);
    assert_eq!(prev_instant, now);
    assert_eq!(atomic_instant.load(Ordering::SeqCst), after_one_sec);
  }

  /// A matching `compare_exchange` succeeds, reports the previous value and
  /// installs the new one.
  #[test]
  fn test_atomic_instant_compare_exchange() {
    let now = Instant::now();
    let after_one_sec = now + Duration::from_secs(1);
    let atomic_instant = AtomicInstant::new(now);
    let result =
      atomic_instant.compare_exchange(now, after_one_sec, Ordering::SeqCst, Ordering::SeqCst);
    // Pin the payload too, not just the variant.
    assert_eq!(result, Ok(now));
    assert_eq!(atomic_instant.load(Ordering::SeqCst), after_one_sec);
  }

  /// `compare_exchange_weak` eventually succeeds when retried in a loop.
  #[test]
  fn test_atomic_instant_compare_exchange_weak() {
    let now = Instant::now();
    let after_one_sec = now + Duration::from_secs(1);
    let atomic_instant = AtomicInstant::new(now);

    // Retry on failure, but only while the reported value is still the one
    // we expected. Any other value is a genuine mismatch and must fail the
    // test rather than spin forever.
    let previous = loop {
      match atomic_instant.compare_exchange_weak(
        now,
        after_one_sec,
        Ordering::SeqCst,
        Ordering::SeqCst,
      ) {
        Ok(previous) => break previous,
        Err(actual) => assert_eq!(actual, now, "weak CAS failed with an unexpected value"),
      }
    };
    assert_eq!(previous, now);
    assert_eq!(atomic_instant.load(Ordering::SeqCst), after_one_sec);
  }

  /// `fetch_update` returns the previous value and stores the closure's result.
  #[test]
  fn test_atomic_instant_fetch_update() {
    let now = Instant::now();
    let atomic_instant = AtomicInstant::new(now);

    let result = atomic_instant.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
      Some(prev + Duration::from_secs(1))
    });
    assert_eq!(result, Ok(now));
    assert_eq!(
      atomic_instant.load(Ordering::SeqCst),
      now + Duration::from_secs(1)
    );
  }

  /// Concurrent `fetch_update` calls must not lose any increment.
  #[test]
  fn test_atomic_instant_thread_safety() {
    use std::sync::Arc;
    use std::thread;

    // Start from a fixed, known value so we can assert an *exact* final
    // result. A plain `load + add + store` would lose updates under
    // contention (two threads load the same value, each writes
    // load+50ms, only one write survives), and a loose "within 200 ms of
    // now" assertion would not notice.
    let start = Instant::now();
    let atomic_instant = Arc::new(AtomicInstant::new(start));

    // Collecting spawns every worker before any of them is joined.
    let handles: Vec<_> = (0..4)
      .map(|_| {
        let atomic_clone = Arc::clone(&atomic_instant);
        thread::spawn(move || {
          // `fetch_update` retries on conflict, so every thread's
          // increment is guaranteed to stick.
          atomic_clone
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
              Some(current + Duration::from_millis(50))
            })
            .expect("closure never returns None");
        })
      })
      .collect();

    for handle in handles {
      handle.join().unwrap();
    }

    // 4 threads × 50 ms = 200 ms, no lost updates.
    assert_eq!(
      atomic_instant.load(Ordering::SeqCst),
      start + Duration::from_millis(200)
    );
  }

  /// The `Debug` output names the type.
  #[test]
  fn test_atomic_instant_debug() {
    let atomic_instant = AtomicInstant::now();
    let debug_str = format!("{:?}", atomic_instant);
    assert!(debug_str.contains("AtomicInstant"));
  }

  /// `From<Instant>` preserves the value.
  #[test]
  fn test_atomic_instant_from() {
    let now = Instant::now();
    let atomic_instant = AtomicInstant::from(now);
    assert_eq!(atomic_instant.load(Ordering::SeqCst), now);
  }

  /// `into_inner` yields the stored value.
  #[test]
  fn test_atomic_instant_into_inner() {
    let now = Instant::now();
    let atomic_instant = AtomicInstant::new(now);
    assert_eq!(atomic_instant.into_inner(), now);
  }

  /// A mismatching `compare_exchange` reports the actual value and leaves
  /// the stored value untouched.
  #[test]
  fn test_atomic_instant_compare_exchange_failure() {
    let now = Instant::now();
    let other = now + Duration::from_secs(5);
    let atomic_instant = AtomicInstant::new(now);
    let result = atomic_instant.compare_exchange(other, other, Ordering::SeqCst, Ordering::SeqCst);
    assert_eq!(result, Err(now));
    assert_eq!(atomic_instant.load(Ordering::SeqCst), now);
  }

  /// A `fetch_update` whose closure returns `None` reports the current value
  /// and leaves the stored value untouched.
  #[test]
  fn test_atomic_instant_fetch_update_failure() {
    let now = Instant::now();
    let atomic_instant = AtomicInstant::new(now);
    let result = atomic_instant.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None);
    assert_eq!(result, Err(now));
    assert_eq!(atomic_instant.load(Ordering::SeqCst), now);
  }

  /// A value survives a JSON round trip within the same process.
  #[cfg(feature = "serde")]
  #[test]
  fn test_atomic_instant_serde() {
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct Test {
      time: AtomicInstant,
    }

    let now = Instant::now();
    let test = Test {
      time: AtomicInstant::new(now),
    };
    let serialized = serde_json::to_string(&test).unwrap();
    let deserialized: Test = serde_json::from_str(&serialized).unwrap();
    assert_eq!(deserialized.time.load(Ordering::SeqCst), now);
  }

  /// An instant earlier than the one used at construction round-trips exactly.
  #[test]
  fn test_atomic_instant_past_value() {
    use std::thread;

    let past = Instant::now();
    thread::sleep(Duration::from_millis(10));
    let now = Instant::now();

    // Store a past instant and verify roundtrip
    let atomic = AtomicInstant::new(now);
    atomic.store(past, Ordering::SeqCst);
    let loaded = atomic.load(Ordering::SeqCst);
    assert!(loaded < now);
    assert_eq!(loaded, past);
  }

  #[test]
  fn decode_instant_from_extreme_duration_does_not_panic() {
    // Regression: `decode_instant_from_duration(Duration::MAX)` used to
    // panic via `instant_now + huge_delta`. After the fix it saturates
    // at `instant_now` (the closest representable Instant). The exact
    // value is less important than the fact that no panic occurs — this
    // path is hit by serde Deserialize, so a panic would crash the
    // process on malformed/adversarial input.
    let max_dur = Duration::new(u64::MAX, 999_999_999);
    // Should be some valid Instant — we can't predict the exact value
    // since it depends on when the process started, but it must not
    // have panicked.
    let _decoded = crate::utils::decode_instant_from_duration(max_dur);
  }

  #[cfg(feature = "serde")]
  #[test]
  fn deserialize_extreme_instant_does_not_panic() {
    // Simulates adversarial input via serde. The Duration inside the
    // JSON is so large that decoding it into an Instant would overflow
    // — this must return an Ok (with a saturated value), not crash.
    let json = r#"{"secs":18446744073709551615,"nanos":999999999}"#;
    let result: Result<AtomicInstant, _> = serde_json::from_str(json);
    assert!(
      result.is_ok(),
      "deserialization of extreme Instant should not panic"
    );
  }
}
431      result.is_ok(),
432      "deserialization of extreme Instant should not panic"
433    );
434  }
435}