Skip to main content

atomic_time/
instant.rs

1use std::time::Instant;
2
3use super::*;
4
5/// Atomic version of [`Instant`].
6#[repr(transparent)]
7pub struct AtomicInstant(AtomicDuration);
8
9impl core::fmt::Debug for AtomicInstant {
10  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
11    f.debug_tuple("AtomicInstant")
12      .field(&self.load(Ordering::SeqCst))
13      .finish()
14  }
15}
16impl From<Instant> for AtomicInstant {
17  #[inline]
18  fn from(instant: Instant) -> Self {
19    Self::new(instant)
20  }
21}
22impl AtomicInstant {
23  /// Returns the instant corresponding to "now".
24  ///
25  /// # Examples
26  /// ```
27  /// use atomic_time::AtomicInstant;
28  ///
29  /// let now = AtomicInstant::now();
30  /// ```
31  pub fn now() -> Self {
32    Self::new(Instant::now())
33  }
34
35  /// Creates a new `AtomicInstant` with the given `Instant` value.
36  pub fn new(instant: Instant) -> Self {
37    Self(AtomicDuration::new(encode_instant_to_duration(instant)))
38  }
39
40  /// Loads a value from the atomic instant.
41  pub fn load(&self, order: Ordering) -> Instant {
42    decode_instant_from_duration(self.0.load(order))
43  }
44
45  /// Stores a value into the atomic instant.
46  pub fn store(&self, instant: Instant, order: Ordering) {
47    self.0.store(encode_instant_to_duration(instant), order)
48  }
49
50  /// Stores a value into the atomic instant, returning the previous value.
51  pub fn swap(&self, instant: Instant, order: Ordering) -> Instant {
52    decode_instant_from_duration(self.0.swap(encode_instant_to_duration(instant), order))
53  }
54
55  /// Stores a value into the atomic instant if the current value is the same as the `current`
56  /// value.
57  pub fn compare_exchange(
58    &self,
59    current: Instant,
60    new: Instant,
61    success: Ordering,
62    failure: Ordering,
63  ) -> Result<Instant, Instant> {
64    match self.0.compare_exchange(
65      encode_instant_to_duration(current),
66      encode_instant_to_duration(new),
67      success,
68      failure,
69    ) {
70      Ok(duration) => Ok(decode_instant_from_duration(duration)),
71      Err(duration) => Err(decode_instant_from_duration(duration)),
72    }
73  }
74
75  /// Stores a value into the atomic instant if the current value is the same as the `current`
76  /// value.
77  pub fn compare_exchange_weak(
78    &self,
79    current: Instant,
80    new: Instant,
81    success: Ordering,
82    failure: Ordering,
83  ) -> Result<Instant, Instant> {
84    match self.0.compare_exchange_weak(
85      encode_instant_to_duration(current),
86      encode_instant_to_duration(new),
87      success,
88      failure,
89    ) {
90      Ok(duration) => Ok(decode_instant_from_duration(duration)),
91      Err(duration) => Err(decode_instant_from_duration(duration)),
92    }
93  }
94
95  /// Fetches the value, and applies a function to it that returns an optional
96  /// new value. Returns a `Result` of `Ok(previous_value)` if the function returned `Some(_)`, else
97  /// `Err(previous_value)`.
98  ///
99  /// Note: This may call the function multiple times if the value has been changed from other threads in
100  /// the meantime, as long as the function returns `Some(_)`, but the function will have been applied
101  /// only once to the stored value.
102  ///
103  /// `fetch_update` takes two [`Ordering`] arguments to describe the memory ordering of this operation.
104  /// The first describes the required ordering for when the operation finally succeeds while the second
105  /// describes the required ordering for loads. These correspond to the success and failure orderings of
106  /// [`compare_exchange`] respectively.
107  ///
108  /// Using [`Acquire`](Ordering::Acquire) as success ordering makes the store part
109  /// of this operation [`Relaxed`](Ordering::Relaxed), and using [`Release`](Ordering::Release) makes the final successful load
110  /// [`Relaxed`](Ordering::Relaxed). The (failed) load ordering can only be [`SeqCst`](Ordering::SeqCst), [`Acquire`](Ordering::Acquire) or [`Relaxed`](Ordering::Release)
111  /// and must be equivalent to or weaker than the success ordering.
112  ///
113  /// [`compare_exchange`]: #method.compare_exchange
114  ///
115  /// # Examples
116  ///
117  /// ```rust
118  /// use atomic_time::AtomicInstant;
119  /// use std::{time::{Duration, Instant}, sync::atomic::Ordering};
120  ///
121  /// let now = Instant::now();
122  /// let x = AtomicInstant::new(now);
123  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| None), Err(now));
124  ///
125  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(now));
126  /// assert_eq!(x.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| Some(x + Duration::from_secs(1))), Ok(now + Duration::from_secs(1)));
127  /// assert_eq!(x.load(Ordering::SeqCst), now + Duration::from_secs(2));
128  /// ```
129  pub fn fetch_update<F>(
130    &self,
131    set_order: Ordering,
132    fetch_order: Ordering,
133    mut f: F,
134  ) -> Result<Instant, Instant>
135  where
136    F: FnMut(Instant) -> Option<Instant>,
137  {
138    self
139      .0
140      .fetch_update(set_order, fetch_order, |duration| {
141        f(decode_instant_from_duration(duration)).map(encode_instant_to_duration)
142      })
143      .map(decode_instant_from_duration)
144      .map_err(decode_instant_from_duration)
145  }
146
147  /// Returns `true` if operations on values of this type are lock-free.
148  /// If the compiler or the platform doesn't support the necessary
149  /// atomic instructions, global locks for every potentially
150  /// concurrent atomic operation will be used.
151  ///
152  /// # Examples
153  /// ```
154  /// use atomic_time::AtomicInstant;
155  ///
156  /// let is_lock_free = AtomicInstant::is_lock_free();
157  /// ```
158  #[inline]
159  pub fn is_lock_free() -> bool {
160    AtomicU128::is_lock_free()
161  }
162
163  /// Consumes the atomic and returns the contained value.
164  ///
165  /// This is safe because passing `self` by value guarantees that no other threads are
166  /// concurrently accessing the atomic data.
167  #[inline]
168  pub fn into_inner(self) -> Instant {
169    decode_instant_from_duration(self.0.into_inner())
170  }
171}
172
173#[cfg(feature = "serde")]
174const _: () = {
175  use core::time::Duration;
176  use serde::{Deserialize, Serialize};
177
178  impl Serialize for AtomicInstant {
179    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
180      self.0.load(Ordering::SeqCst).serialize(serializer)
181    }
182  }
183
184  impl<'de> Deserialize<'de> for AtomicInstant {
185    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
186      Ok(Self::new(decode_instant_from_duration(
187        Duration::deserialize(deserializer)?,
188      )))
189    }
190  }
191};
192
193#[cfg(test)]
194mod tests {
195  use super::*;
196  use std::time::Duration;
197
198  #[test]
199  fn test_atomic_instant_now() {
200    let atomic_instant = AtomicInstant::now();
201    // Check that the time is reasonable (not too far from now).
202    let now = Instant::now();
203    let loaded_instant = atomic_instant.load(Ordering::SeqCst);
204    assert!(loaded_instant <= now);
205    assert!(loaded_instant >= now - Duration::from_secs(1));
206  }
207
208  #[test]
209  fn test_atomic_instant_new_and_load() {
210    let now = Instant::now();
211    let atomic_instant = AtomicInstant::new(now);
212    assert_eq!(atomic_instant.load(Ordering::SeqCst), now);
213  }
214
215  #[test]
216  fn test_atomic_instant_store_and_load() {
217    let now = Instant::now();
218    let after_one_sec = now + Duration::from_secs(1);
219    let atomic_instant = AtomicInstant::new(now);
220    atomic_instant.store(after_one_sec, Ordering::SeqCst);
221    assert_eq!(atomic_instant.load(Ordering::SeqCst), after_one_sec);
222  }
223
224  #[test]
225  fn test_atomic_instant_swap() {
226    let now = Instant::now();
227    let after_one_sec = now + Duration::from_secs(1);
228    let atomic_instant = AtomicInstant::new(now);
229    let prev_instant = atomic_instant.swap(after_one_sec, Ordering::SeqCst);
230    assert_eq!(prev_instant, now);
231    assert_eq!(atomic_instant.load(Ordering::SeqCst), after_one_sec);
232  }
233
234  #[test]
235  fn test_atomic_instant_compare_exchange() {
236    let now = Instant::now();
237    let after_one_sec = now + Duration::from_secs(1);
238    let atomic_instant = AtomicInstant::new(now);
239    let result =
240      atomic_instant.compare_exchange(now, after_one_sec, Ordering::SeqCst, Ordering::SeqCst);
241    assert!(result.is_ok());
242    assert_eq!(atomic_instant.load(Ordering::SeqCst), after_one_sec);
243  }
244
245  #[test]
246  fn test_atomic_instant_compare_exchange_weak() {
247    let now = Instant::now();
248    let after_one_sec = now + Duration::from_secs(1);
249    let atomic_instant = AtomicInstant::new(now);
250
251    let mut result;
252    loop {
253      result = atomic_instant.compare_exchange_weak(
254        now,
255        after_one_sec,
256        Ordering::SeqCst,
257        Ordering::SeqCst,
258      );
259      if result.is_ok() {
260        break;
261      }
262    }
263    assert!(result.is_ok());
264    assert_eq!(atomic_instant.load(Ordering::SeqCst), after_one_sec);
265  }
266
267  #[test]
268  fn test_atomic_instant_fetch_update() {
269    let now = Instant::now();
270    let atomic_instant = AtomicInstant::new(now);
271
272    let result = atomic_instant.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
273      Some(prev + Duration::from_secs(1))
274    });
275    assert!(result.is_ok());
276    assert_eq!(result.unwrap(), now);
277    assert_eq!(
278      atomic_instant.load(Ordering::SeqCst),
279      now + Duration::from_secs(1)
280    );
281  }
282
283  #[test]
284  fn test_atomic_instant_thread_safety() {
285    use std::sync::Arc;
286    use std::thread;
287
288    let atomic_instant = Arc::new(AtomicInstant::now());
289    let mut handles = vec![];
290
291    for _ in 0..4 {
292      let atomic_clone = atomic_instant.clone();
293      let handle = thread::spawn(move || {
294        let current = atomic_clone.load(Ordering::SeqCst);
295        let new = current + Duration::from_millis(50);
296        atomic_clone.store(new, Ordering::SeqCst);
297      });
298      handles.push(handle);
299    }
300
301    for handle in handles {
302      handle.join().unwrap();
303    }
304
305    // Check that the instant has advanced by at least 50 milliseconds * 4 threads
306    let loaded_instant = atomic_instant.load(Ordering::SeqCst);
307    assert!(loaded_instant >= Instant::now() - Duration::from_millis(200));
308  }
309
310  #[cfg(feature = "serde")]
311  #[test]
312  fn test_atomic_instant_serde() {
313    use serde::{Deserialize, Serialize};
314
315    #[derive(Serialize, Deserialize)]
316    struct Test {
317      time: AtomicInstant,
318    }
319
320    let now = Instant::now();
321    let test = Test {
322      time: AtomicInstant::new(now),
323    };
324    let serialized = serde_json::to_string(&test).unwrap();
325    let deserialized: Test = serde_json::from_str(&serialized).unwrap();
326    assert_eq!(deserialized.time.load(Ordering::SeqCst), now);
327  }
328
329  #[test]
330  fn test_atomic_instant_past_value() {
331    use std::thread;
332
333    let past = Instant::now();
334    thread::sleep(Duration::from_millis(10));
335    let now = Instant::now();
336
337    // Store a past instant and verify roundtrip
338    let atomic = AtomicInstant::new(now);
339    atomic.store(past, Ordering::SeqCst);
340    let loaded = atomic.load(Ordering::SeqCst);
341    assert!(loaded < now);
342    assert_eq!(loaded, past);
343  }
344}