1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
//! This module contains utility methods and structs for handling time.
//!
//! SlateDB has two concepts of time:
//!
//! 1. The [SystemClock], which is used to measure wall-clock time for things
//! like garbage collection schedule ticks, compaction schedule ticks, and so
//! on.
//! 2. The [LogicalClock], which is a monotonically increasing number used to order
//! writes in the database. This could represent a logical sequence number
//! (LSN) from a database, a Kafka offset, a `created_at` timestamp
//! associated with the write, and so on.
//!
//! The [SystemClock] trait is provided in `slatedb-common`. The `slatedb` package
//! builds on top of that and provides the [LogicalClock] abstraction.
//! [DefaultLogicalClock] implements a logical clock that wraps the
//! [DefaultSystemClock] and returns the number of milliseconds since the Unix epoch.
#![allow(clippy::disallowed_methods)]
use crate::error::SlateDBError;
use log::debug;
use slatedb_common::clock::SystemClock;
use std::{
cmp,
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
time::Duration,
};
/// SlateDB uses MonotonicClock internally so that it can enforce that clock ticks
/// from the underlying implementation are monotonically increasing.
///
/// Ticks read from the delegate are interpreted as milliseconds
/// (`timestamp_millis()`); a tick lower than the highest one already recorded is
/// rejected with `SlateDBError::InvalidClockTick`.
#[allow(dead_code)] // unused during DST
pub(crate) struct MonotonicClock {
/// Highest tick accepted so far. Updated atomically (via `fetch_max`) by both
/// `now` and `set_last_tick`; initialised from `init_tick` in `new`.
pub(crate) last_tick: AtomicI64,
/// Highest tick passed to `fetch_max_last_durable_tick`; initialised from
/// `init_tick` in `new`.
/// NOTE(review): presumably the latest tick known to be durably persisted --
/// confirm against callers.
pub(crate) last_durable_tick: AtomicI64,
/// Underlying system clock that `now` reads the current time from.
delegate: Arc<dyn SystemClock>,
}
impl MonotonicClock {
    /// Creates a clock that reads time from `delegate`.
    ///
    /// Both `last_tick` and `last_durable_tick` start at `init_tick`, so any tick
    /// lower than `init_tick` is rejected from the outset.
    pub(crate) fn new(delegate: Arc<dyn SystemClock>, init_tick: i64) -> Self {
        Self {
            delegate,
            last_tick: AtomicI64::new(init_tick),
            last_durable_tick: AtomicI64::new(init_tick),
        }
    }

    /// Records an externally supplied `tick` as the latest known tick.
    ///
    /// # Errors
    ///
    /// Returns `SlateDBError::InvalidClockTick` if `tick` is lower than the
    /// highest tick already recorded. On success, returns `tick`.
    pub(crate) fn set_last_tick(&self, tick: i64) -> Result<i64, SlateDBError> {
        self.enforce_monotonic(tick)
    }

    /// Raises `last_durable_tick` to `tick` if `tick` is greater than the stored
    /// value, and returns the value that was stored *before* the call.
    pub(crate) fn fetch_max_last_durable_tick(&self, tick: i64) -> i64 {
        self.last_durable_tick.fetch_max(tick, Ordering::SeqCst)
    }

    /// Reads the delegate clock (in milliseconds) and returns the tick if it does
    /// not go backwards relative to the highest tick recorded so far.
    ///
    /// If the delegate is lagging, this sleeps for twice the observed lag (capped
    /// at 10 seconds) and then retries exactly once.
    ///
    /// # Errors
    ///
    /// Returns `SlateDBError::InvalidClockTick` if the delegate is still behind
    /// the last recorded tick after the retry.
    pub(crate) async fn now(&self) -> Result<i64, SlateDBError> {
        /// Upper bound on how long we are willing to wait for the clock to catch up.
        const MAX_SYNC_SLEEP_MILLIS: u64 = 10_000;

        let tick = self.delegate.now().timestamp_millis();
        match self.enforce_monotonic(tick) {
            Err(SlateDBError::InvalidClockTick {
                last_tick,
                next_tick: _,
            }) => {
                // `abs_diff` + `saturating_mul` cannot overflow, unlike a plain
                // signed subtraction followed by `2 * ...`, which would panic in
                // debug builds (and wrap in release) for extreme tick values.
                let sync_millis = cmp::min(
                    MAX_SYNC_SLEEP_MILLIS,
                    last_tick.abs_diff(tick).saturating_mul(2),
                );
                debug!(
                    "Clock tick {} is lagging behind the last known tick {}. \
                     Sleeping {}ms to potentially resolve skew before returning InvalidClockTick.",
                    tick, last_tick, sync_millis
                );
                tokio::time::sleep(Duration::from_millis(sync_millis)).await;
                // Single retry: whatever this yields (Ok or Err) is the final answer.
                self.enforce_monotonic(self.delegate.now().timestamp_millis())
            }
            result => result,
        }
    }

    /// Atomically folds `tick` into `last_tick` and checks that it did not regress.
    ///
    /// A `tick` equal to the current `last_tick` is accepted. When `tick` is
    /// rejected, `last_tick` is left unchanged, because `fetch_max` keeps the
    /// larger of the two values.
    ///
    /// # Errors
    ///
    /// Returns `SlateDBError::InvalidClockTick` carrying the previously recorded
    /// tick and the offending `tick` when `tick` is lower than the former.
    fn enforce_monotonic(&self, tick: i64) -> Result<i64, SlateDBError> {
        // `fetch_max` returns the value held *before* this call.
        let previous_last_tick = self.last_tick.fetch_max(tick, Ordering::SeqCst);
        if tick < previous_last_tick {
            return Err(SlateDBError::InvalidClockTick {
                last_tick: previous_last_tick,
                next_tick: tick,
            });
        }
        Ok(tick)
    }
}