1use std::sync::{
2 atomic::{AtomicU64, Ordering},
3 Arc,
4};
5
6use transformable::{utils::*, Transformable};
7
/// A logical timestamp, as handed out by a [`LamportClock`].
///
/// This is a transparent wrapper around a `u64`: equality, ordering and
/// hashing are those of the wrapped integer, and the default value is zero.
/// With the `serde` feature enabled it serializes as the bare integer.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(transparent)
)]
pub struct LamportTime(pub(crate) u64);
14
15impl core::fmt::Display for LamportTime {
16 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
17 write!(f, "{}", self.0)
18 }
19}
20
21impl From<u64> for LamportTime {
22 fn from(time: u64) -> Self {
23 Self(time)
24 }
25}
26
27impl From<LamportTime> for u64 {
28 fn from(time: LamportTime) -> Self {
29 time.0
30 }
31}
32
33impl LamportTime {
34 pub const ZERO: Self = Self(0);
36
37 #[inline]
39 pub const fn new(time: u64) -> Self {
40 Self(time)
41 }
42
43 #[inline]
45 pub const fn to_be_bytes(self) -> [u8; 8] {
46 self.0.to_be_bytes()
47 }
48
49 #[inline]
51 pub const fn to_le_bytes(self) -> [u8; 8] {
52 self.0.to_le_bytes()
53 }
54
55 #[inline]
57 pub const fn from_be_bytes(bytes: [u8; 8]) -> Self {
58 Self(u64::from_be_bytes(bytes))
59 }
60
61 #[inline]
63 pub const fn from_le_bytes(bytes: [u8; 8]) -> Self {
64 Self(u64::from_le_bytes(bytes))
65 }
66}
67
68impl core::ops::Add<Self> for LamportTime {
69 type Output = Self;
70
71 #[inline]
72 fn add(self, rhs: Self) -> Self::Output {
73 Self(self.0 + rhs.0)
74 }
75}
76
77impl core::ops::Sub<Self> for LamportTime {
78 type Output = Self;
79
80 #[inline]
81 fn sub(self, rhs: Self) -> Self::Output {
82 Self(self.0 - rhs.0)
83 }
84}
85
86impl core::ops::Rem<Self> for LamportTime {
87 type Output = Self;
88
89 #[inline]
90 fn rem(self, rhs: Self) -> Self::Output {
91 Self(self.0 % rhs.0)
92 }
93}
94
95#[derive(thiserror::Error, Debug)]
97pub enum LamportTimeTransformError {
98 #[error(transparent)]
100 Encode(#[from] InsufficientBuffer),
101 #[error(transparent)]
103 Decode(#[from] DecodeVarintError),
104}
105
106impl Transformable for LamportTime {
107 type Error = LamportTimeTransformError;
108
109 fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
110 encode_u64_varint(self.0, dst).map_err(Into::into)
111 }
112
113 fn encoded_len(&self) -> usize {
114 encoded_u64_varint_len(self.0)
115 }
116
117 fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
118 where
119 Self: Sized,
120 {
121 decode_u64_varint(src)
122 .map(|(n, time)| (n, Self(time)))
123 .map_err(Into::into)
124 }
125}
126
/// A Lamport logical clock backed by an atomic `u64` counter.
///
/// The counter is held behind an [`Arc`], so cloning the clock yields another
/// handle to the *same* counter rather than an independent copy.
#[derive(Clone, Debug)]
pub struct LamportClock(Arc<AtomicU64>);
132
133impl Default for LamportClock {
134 fn default() -> Self {
135 Self::new()
136 }
137}
138
139impl LamportClock {
140 #[inline]
142 pub fn new() -> Self {
143 Self(Arc::new(AtomicU64::new(0)))
144 }
145
146 #[inline]
148 pub fn time(&self) -> LamportTime {
149 LamportTime(self.0.load(Ordering::SeqCst))
150 }
151
152 #[inline]
154 pub fn increment(&self) -> LamportTime {
155 LamportTime(self.0.fetch_add(1, Ordering::SeqCst) + 1)
156 }
157
158 #[inline]
161 pub fn witness(&self, time: LamportTime) {
162 loop {
163 let current = self.0.load(Ordering::SeqCst);
165 if time.0 < current {
166 return;
167 }
168
169 match self
171 .0
172 .compare_exchange_weak(current, time.0 + 1, Ordering::SeqCst, Ordering::SeqCst)
173 {
174 Ok(_) => return,
175 Err(_) => continue,
176 }
177 }
178 }
179}
180
#[cfg(test)]
impl LamportTime {
    /// Test-only helper: returns a timestamp drawn from the half-open range
    /// `0..u64::MAX` using the thread-local random number generator.
    pub(crate) fn random() -> Self {
        use rand::Rng;

        let mut rng = rand::thread_rng();
        let raw: u64 = rng.gen_range(0..u64::MAX);
        Self(raw)
    }
}
188
/// Exercises `time`, `increment` and `witness` on a fresh clock.
#[test]
fn test_lamport_clock() {
    let clock = LamportClock::new();

    // A fresh clock starts at zero and a single increment moves it to one.
    assert_eq!(clock.time(), LamportTime::from(0));
    assert_eq!(clock.increment(), LamportTime::from(1));
    assert_eq!(clock.time(), LamportTime::from(1));

    // Witnessing a newer timestamp moves the clock one past it.
    clock.witness(LamportTime::from(41));
    assert_eq!(clock.time(), LamportTime::from(42));

    // Witnessing timestamps the clock is already past changes nothing.
    for stale in [41u64, 30u64] {
        clock.witness(LamportTime::from(stale));
        assert_eq!(clock.time(), LamportTime::from(42));
    }
}