serf_types/
clock.rs

1use std::sync::{
2  atomic::{AtomicU64, Ordering},
3  Arc,
4};
5
6use transformable::{utils::*, Transformable};
7
8/// A lamport time is a simple u64 that represents a point in time.
9#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
10#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
11#[cfg_attr(feature = "serde", serde(transparent))]
12#[repr(transparent)]
13pub struct LamportTime(pub(crate) u64);
14
15impl core::fmt::Display for LamportTime {
16  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
17    write!(f, "{}", self.0)
18  }
19}
20
21impl From<u64> for LamportTime {
22  fn from(time: u64) -> Self {
23    Self(time)
24  }
25}
26
27impl From<LamportTime> for u64 {
28  fn from(time: LamportTime) -> Self {
29    time.0
30  }
31}
32
33impl LamportTime {
34  /// Zero lamport time
35  pub const ZERO: Self = Self(0);
36
37  /// Creates a new lamport time from the given u64
38  #[inline]
39  pub const fn new(time: u64) -> Self {
40    Self(time)
41  }
42
43  /// Returns the lamport time as a big endian byte array
44  #[inline]
45  pub const fn to_be_bytes(self) -> [u8; 8] {
46    self.0.to_be_bytes()
47  }
48
49  /// Returns the lamport time as a little endian byte array
50  #[inline]
51  pub const fn to_le_bytes(self) -> [u8; 8] {
52    self.0.to_le_bytes()
53  }
54
55  /// Creates a new lamport time from a big endian byte array
56  #[inline]
57  pub const fn from_be_bytes(bytes: [u8; 8]) -> Self {
58    Self(u64::from_be_bytes(bytes))
59  }
60
61  /// Creates a new lamport time from a little endian byte array
62  #[inline]
63  pub const fn from_le_bytes(bytes: [u8; 8]) -> Self {
64    Self(u64::from_le_bytes(bytes))
65  }
66}
67
68impl core::ops::Add<Self> for LamportTime {
69  type Output = Self;
70
71  #[inline]
72  fn add(self, rhs: Self) -> Self::Output {
73    Self(self.0 + rhs.0)
74  }
75}
76
77impl core::ops::Sub<Self> for LamportTime {
78  type Output = Self;
79
80  #[inline]
81  fn sub(self, rhs: Self) -> Self::Output {
82    Self(self.0 - rhs.0)
83  }
84}
85
86impl core::ops::Rem<Self> for LamportTime {
87  type Output = Self;
88
89  #[inline]
90  fn rem(self, rhs: Self) -> Self::Output {
91    Self(self.0 % rhs.0)
92  }
93}
94
95/// Error that can occur when transforming a lamport time
96#[derive(thiserror::Error, Debug)]
97pub enum LamportTimeTransformError {
98  /// Encode varint error
99  #[error(transparent)]
100  Encode(#[from] InsufficientBuffer),
101  /// Decode varint error
102  #[error(transparent)]
103  Decode(#[from] DecodeVarintError),
104}
105
106impl Transformable for LamportTime {
107  type Error = LamportTimeTransformError;
108
109  fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
110    encode_u64_varint(self.0, dst).map_err(Into::into)
111  }
112
113  fn encoded_len(&self) -> usize {
114    encoded_u64_varint_len(self.0)
115  }
116
117  fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
118  where
119    Self: Sized,
120  {
121    decode_u64_varint(src)
122      .map(|(n, time)| (n, Self(time)))
123      .map_err(Into::into)
124  }
125}
126
127/// A thread safe implementation of a lamport clock. It
128/// uses efficient atomic operations for all of its functions, falling back
129/// to a heavy lock only if there are enough CAS failures.
130#[derive(Debug, Clone)]
131pub struct LamportClock(Arc<AtomicU64>);
132
133impl Default for LamportClock {
134  fn default() -> Self {
135    Self::new()
136  }
137}
138
139impl LamportClock {
140  /// Creates a new lamport clock with the given initial value
141  #[inline]
142  pub fn new() -> Self {
143    Self(Arc::new(AtomicU64::new(0)))
144  }
145
146  /// Return the current value of the lamport clock
147  #[inline]
148  pub fn time(&self) -> LamportTime {
149    LamportTime(self.0.load(Ordering::SeqCst))
150  }
151
152  /// Increment and return the value of the lamport clock
153  #[inline]
154  pub fn increment(&self) -> LamportTime {
155    LamportTime(self.0.fetch_add(1, Ordering::SeqCst) + 1)
156  }
157
158  /// Witness is called to update our local clock if necessary after
159  /// witnessing a clock value received from another process
160  #[inline]
161  pub fn witness(&self, time: LamportTime) {
162    loop {
163      // If the other value is old, we do not need to do anything
164      let current = self.0.load(Ordering::SeqCst);
165      if time.0 < current {
166        return;
167      }
168
169      // Ensure that our local clock is at least one ahead.
170      match self
171        .0
172        .compare_exchange_weak(current, time.0 + 1, Ordering::SeqCst, Ordering::SeqCst)
173      {
174        Ok(_) => return,
175        Err(_) => continue,
176      }
177    }
178  }
179}
180
181#[cfg(test)]
182impl LamportTime {
183  pub(crate) fn random() -> Self {
184    use rand::Rng;
185    Self(rand::thread_rng().gen_range(0..u64::MAX))
186  }
187}
188
189#[test]
190fn test_lamport_clock() {
191  let l = LamportClock::new();
192
193  assert_eq!(l.time(), 0.into());
194  assert_eq!(l.increment(), 1.into());
195  assert_eq!(l.time(), 1.into());
196
197  l.witness(41.into());
198  assert_eq!(l.time(), 42.into());
199
200  l.witness(41.into());
201  assert_eq!(l.time(), 42.into());
202
203  l.witness(30.into());
204  assert_eq!(l.time(), 42.into());
205}