hybrid_clocks/
lib.rs

1//! Implementation of Hybrid Logical Clocks.
2//!
3//! This is based on the paper "Logical Physical Clocks and Consistent
4//! Snapshots in Globally Distributed Databases". Provides a
5//! strictly-monotonic clock that can be used to determine if one event
6//! `happens-before` another.
7
8#![deny(warnings)]
9
10#[cfg(feature = "serialization")]
11extern crate serde;
12#[cfg(feature = "serialization")]
13#[macro_use]
14extern crate serde_derive;
15#[cfg(all(feature = "serialization", test))]
16extern crate serde_json;
17
18use std::cmp::Ordering;
19use std::fmt;
20
21use thiserror::Error;
22
23mod source;
24pub use crate::source::*;
25
26#[derive(Debug, Error)]
27pub enum Error {
28    #[error("Offset greater than limit")]
29    OffsetTooGreat,
30    #[error("Outside of specified offset")]
31    SystemTime(#[from] std::time::SystemTimeError),
32    #[error("Integer conversion error")]
33    FromInt(#[from] std::num::TryFromIntError),
34    #[error("Outside supported time range: {0}ticks")]
35    SupportedTime(u128),
36}
37
38pub type Result<T> = std::result::Result<T, Error>;
39
40/// A value that represents a logical timestamp.
41///
42/// These allow us to describe at least a partial ordering over events, in the
43/// same style as Lamport Clocks. In summary, if `a < b` then we can say that `a` logically
44/// `happens-before` `b`. Because they are scalar values, they can't be used to tell between whether:
45///
46///  * `a` happenned concurrently with `b`, or
47///  * `a` is part of `b`'s causal history, or vica-versa.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
49pub struct Timestamp<T> {
50    /// An epoch counter.
51    pub epoch: u32,
52    /// The Wall-clock time as returned by the clock source.
53    pub time: T,
54    /// A Lamport clock used to disambiguate events that are given the same
55    /// Wall-clock time. This is reset whenever `time` is incremented.
56    pub count: u32,
57}
58
59/// The main clock type.
60#[derive(Debug, Clone)]
61pub struct Clock<S: ClockSource> {
62    src: S,
63    epoch: u32,
64    last_observed: Timestamp<S::Time>,
65}
66
67/// A wrapper around `Clock` that will refuse updates outside of our tolerance.
68#[derive(Debug, Clone)]
69pub struct OffsetLimiter<S: ClockSource> {
70    clock: Clock<S>,
71    max_offset: S::Delta,
72}
73
74impl Clock<WallNS> {
75    /// Returns a `Clock` that uses WallNS-clock time.
76    pub fn wall_ns() -> Result<Clock<WallNS>> {
77        Clock::new(WallNS)
78    }
79}
80
81impl Clock<WallMS> {
82    /// Returns a `Clock` that uses WallMS-clock time.
83    pub fn wall_ms() -> Result<Clock<WallMS>> {
84        Clock::new(WallMS)
85    }
86}
87
88impl Clock<ManualClock> {
89    /// Returns a `Clock` that uses ManualClock-clock time.
90    pub fn manual(t: u64) -> Result<Clock<ManualClock>> {
91        Clock::new(ManualClock::new(t))
92    }
93    pub fn set_time(&mut self, t: u64) {
94        self.src.set_time(t)
95    }
96}
97
98impl<S: ClockSource> Clock<S> {
99    /// Creates a clock with `src` as the time provider.
100    pub fn new(mut src: S) -> Result<Self> {
101        let init = src.now()?;
102        let clock = Clock {
103            src: src,
104            last_observed: Timestamp {
105                epoch: 0,
106                time: init,
107                count: 0,
108            },
109            epoch: 0,
110        };
111        Ok(clock)
112    }
113
114    /// Creates a clock with `src` as the time provider, and `diff` as how far
115    /// in the future we don't mind seeing updates from.
116    pub fn with_max_diff(self, max_offset: S::Delta) -> OffsetLimiter<S> {
117        OffsetLimiter::new(self, max_offset)
118    }
119
120    /// Used to create a new "epoch" of clock times, mostly useful as a manual
121    /// override when a cluster member has skewed the clock time far
122    /// into the future.
123    pub fn set_epoch(&mut self, epoch: u32) {
124        self.epoch = epoch;
125    }
126
127    /// Creates a unique monotonic timestamp suitable for annotating messages we send.
128    pub fn now(&mut self) -> Result<Timestamp<S::Time>> {
129        let pt = self.read_pt()?;
130        self.do_observe(&pt);
131        Ok(self.last_observed)
132    }
133
134    fn do_observe(&mut self, observation: &Timestamp<S::Time>) {
135        let lp = self.last_observed.clone();
136
137        self.last_observed = match (
138            lp.epoch.cmp(&observation.epoch),
139            lp.time.cmp(&observation.time),
140            lp.count.cmp(&observation.count),
141        ) {
142            (Ordering::Less, _, _) | (Ordering::Equal, Ordering::Less, _) => observation.clone(),
143            (Ordering::Equal, Ordering::Equal, Ordering::Less) => Timestamp {
144                count: observation.count + 1,
145                ..lp
146            },
147            _ => Timestamp {
148                count: lp.count + 1,
149                ..lp
150            },
151        };
152    }
153
154    /// Accepts a timestamp from an incoming message, and updates the clock
155    /// so that further calls to `now` will always return a timestamp that
156    /// `happens-after` either locally generated timestamps or that of the
157    /// input message.
158    pub fn observe(&mut self, msg: &Timestamp<S::Time>) {
159        self.do_observe(&msg);
160    }
161
162    fn read_pt(&mut self) -> Result<Timestamp<S::Time>> {
163        Ok(Timestamp {
164            epoch: self.epoch,
165            time: self.src.now()?,
166            count: 0,
167        })
168    }
169}
170impl<S: ClockSource> OffsetLimiter<S> {
171    pub fn new(clock: Clock<S>, max_offset: S::Delta) -> Self {
172        OffsetLimiter { clock, max_offset }
173    }
174    /// Accepts a timestamp from an incoming message, and updates the clock
175    /// so that further calls to `now` will always return a timestamp that
176    /// `happens-after` either locally generated timestamps or that of the
177    /// input message. Returns an Error iff the delta from our local lock to
178    /// the observed timestamp is greater than our configured limit.
179    pub fn observe(&mut self, msg: &Timestamp<S::Time>) -> Result<()> {
180        let pt = self.clock.read_pt()?;
181        self.verify_offset(&pt, msg)?;
182        self.clock.observe(&msg);
183        Ok(())
184    }
185
186    /// Creates a unique monotonic timestamp suitable for annotating messages we send.
187    pub fn now(&mut self) -> Result<Timestamp<S::Time>> {
188        self.clock.now()
189    }
190
191    fn verify_offset(&self, pt: &Timestamp<S::Time>, msg: &Timestamp<S::Time>) -> Result<()> {
192        // Guard from overflow when `S::Time.time` uses unsigned arithmetic.
193        if msg.time <= pt.time {
194            return Ok(())
195        }
196
197        let diff = msg.time - pt.time;
198        if diff > self.max_offset {
199            return Err(Error::OffsetTooGreat);
200        }
201
202        Ok(())
203    }
204
205    /// Extract the inner `Clock`
206    pub fn into_inner(self) -> Clock<S> {
207        self.clock
208    }
209
210    /// Get a reference to the inner `Clock`
211    pub fn inner(&self) -> &Clock<S> {
212        &self.clock
213    }
214
215    /// Get a mutable reference to the inner `Clock`
216    pub fn inner_mut(&mut self) -> &mut Clock<S> {
217        &mut self.clock
218    }
219}
220
221impl<T: fmt::Display> fmt::Display for Timestamp<T> {
222    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
223        write!(fmt, "{}:{}+{}", self.epoch, self.time, self.count)
224    }
225}
226
227impl<T> Timestamp<T> {
228    pub fn time_into<U: From<T>>(self) -> Timestamp<U> {
229        Timestamp {
230            epoch: self.epoch,
231            time: self.time.into(),
232            count: self.count,
233        }
234    }
235}
236
237#[cfg(feature = "serialization")]
238mod serde_impl;
239
240#[cfg(test)]
241mod tests {
242    use super::Timestamp;
243    use suppositions::generators::*;
244
245    pub fn timestamps<C: Generator + 'static>(
246        times: C,
247    ) -> Box<dyn GeneratorObject<Item = Timestamp<C::Item>>> {
248        let epochs = u32s();
249        let counts = u32s();
250        (epochs, times, counts)
251            .map(|(epoch, time, count)| Timestamp { epoch, time, count })
252            .boxed()
253    }
254}