1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
//! This module contains utility methods and structs for handling time.
//!
//! SlateDB has two concepts of time:
//!
//! 1. The [SystemClock], which is used to measure wall-clock time for things
//! like garbage collection schedule ticks, compaction schedule ticks, and so
//! on.
//! 2. The [LogicalClock], which produces monotonically increasing numbers used to
//! order writes in the database. These could represent a logical sequence number
//! (LSN) from a database, a Kafka offset, a `created_at` timestamp
//! associated with the write, and so on.
//!
//! We've chosen to implement our own [SystemClock] so we can mock it for testing
//! purposes. Mocks are available when the `test-util` feature is enabled.
//!
//! [DefaultSystemClock] and [DefaultLogicalClock] are both provided as well.
//! [DefaultSystemClock] implements a system clock that uses Tokio's clock to measure
//! time duration. [DefaultLogicalClock] implements a logical clock that wraps
//! the [DefaultSystemClock] and returns the number of milliseconds since the
//! Unix epoch.
#![allow(clippy::disallowed_methods)]
use std::{
cmp,
fmt::Debug,
future::Future,
pin::Pin,
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
time::Duration,
};
use crate::error::SlateDBError;
use chrono::{DateTime, Utc};
use log::info;
/// Wall-clock abstraction that SlateDB uses to pace periodic background work,
/// such as garbage-collection and compaction schedule ticks.
///
/// Implementors must be `Send + Sync` so a single clock can be shared across
/// tasks, and `Debug` so it can appear in diagnostic output.
pub trait SystemClock: Debug + Send + Sync {
    /// Reads the clock, returning the current instant as a UTC timestamp.
    fn now(&self) -> DateTime<Utc>;

    /// Returns a future that resolves once `duration` has passed on this clock.
    fn sleep<'a>(&'a self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;

    /// Builds a [SystemClockTicker] driven by this clock that fires once per
    /// `duration` interval.
    fn ticker<'a>(&'a self, duration: Duration) -> SystemClockTicker<'a>;

    /// Moves this clock forward by `duration`. Only present when the
    /// `test-util` feature is enabled.
    #[cfg(feature = "test-util")]
    fn advance<'a>(&'a self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}
/// A ticker that emits a signal every `duration` interval. This allows us to use our
/// clock to control ticking.
///
/// The first tick will complete immediately. Subsequent ticks will complete after the
/// specified duration has elapsed. This is to mimic Tokio's ticker behavior.
pub struct SystemClockTicker<'a> {
    /// Clock used both to read the current time and to sleep between ticks.
    clock: &'a dyn SystemClock,
    /// Requested interval between consecutive ticks.
    duration: Duration,
    /// Clock time at which the most recent tick completed. Initialized to
    /// `DateTime::<Utc>::MIN_UTC` so that the very first tick fires immediately.
    last_tick: DateTime<Utc>,
}

impl<'a> SystemClockTicker<'a> {
    /// Creates a new ticker that emits a signal every `duration` interval.
    pub fn new(clock: &'a dyn SystemClock, duration: Duration) -> Self {
        Self {
            clock,
            duration,
            last_tick: DateTime::<Utc>::MIN_UTC,
        }
    }

    /// Returns a future that emits a signal every `duration` interval. The next tick is
    /// calculated as last_tick + duration. The first tick will complete immediately.
    /// This is to mimic Tokio's ticker behavior.
    ///
    /// If the clock advances more than the duration between `tick()` calls, the ticker
    /// will tick immediately.
    ///
    /// The wait is computed when the returned future is first polled, not when `tick()`
    /// is called. `last_tick` is only updated once the sleep completes, so a future that
    /// is dropped before completing does not move the schedule.
    pub fn tick(&mut self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async move {
            let sleep_duration = self.calc_duration();
            self.clock.sleep(sleep_duration).await;
            self.last_tick = self.clock.now();
        })
    }

    /// Calculates the duration until the next tick.
    ///
    /// The duration is `duration - (now - last_tick)`, saturating at zero when the tick
    /// is already overdue.
    ///
    /// If the clock reports a time *earlier* than the last tick, the elapsed time is
    /// treated as zero and the ticker waits one full `duration`. This can happen with a
    /// mock clock that was set backwards, or with a custom implementation backed by a
    /// wall clock that can step backwards. Previously this case panicked.
    fn calc_duration(&self) -> Duration {
        let elapsed = self
            .clock
            .now()
            .signed_duration_since(self.last_tick)
            .to_std()
            // `to_std` only fails for negative deltas, i.e. the clock moved backwards.
            .unwrap_or(Duration::ZERO);
        // Already past the next tick => sleep for zero and tick immediately.
        self.duration.saturating_sub(elapsed)
    }
}
/// Production [SystemClock] whose elapsed time is measured with `tokio::time::Instant`.
///
/// At construction the clock captures the wall-clock time (`Utc::now()`, i.e. ms since
/// the Unix epoch) together with a Tokio `Instant`. `now()` reports that wall-clock
/// anchor plus the Tokio-measured time elapsed since construction.
///
/// Because elapsed time comes from Tokio's clock, manipulating it with
/// `tokio::time::pause()`, `tokio::time::advance()`, and so on also moves this clock.
#[derive(Debug)]
pub struct DefaultSystemClock {
    /// Wall-clock time captured when the clock was created.
    initial_ts: DateTime<Utc>,
    /// Tokio instant captured right after `initial_ts`; elapsed time is measured from it.
    initial_instant: tokio::time::Instant,
}

impl DefaultSystemClock {
    /// Creates a clock anchored at the current wall-clock time.
    pub fn new() -> Self {
        let initial_ts = Utc::now();
        let initial_instant = tokio::time::Instant::now();
        Self {
            initial_ts,
            initial_instant,
        }
    }
}

impl Default for DefaultSystemClock {
    fn default() -> Self {
        DefaultSystemClock::new()
    }
}

impl SystemClock for DefaultSystemClock {
    fn now(&self) -> DateTime<Utc> {
        // `elapsed()` is `Instant::now() - initial_instant`, saturating at zero.
        let since_start = self.initial_instant.elapsed();
        self.initial_ts + since_start
    }

    #[cfg(feature = "test-util")]
    fn advance<'a>(&'a self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let advancing = tokio::time::advance(duration);
        Box::pin(advancing)
    }

    fn sleep<'a>(&'a self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let sleeping = tokio::time::sleep(duration);
        Box::pin(sleeping)
    }

    fn ticker<'a>(&'a self, duration: Duration) -> SystemClockTicker<'a> {
        SystemClockTicker::new(self, duration)
    }
}
/// A mock system clock implementation that uses an atomic i64 to track time.
/// The clock starts at 0 (the Unix epoch) unless created with `with_time`.
/// Time only moves when `advance` or `set` is called.
#[derive(Debug)]
#[cfg(feature = "test-util")]
pub struct MockSystemClock {
    /// The current timestamp in milliseconds since the Unix epoch.
    /// Can be negative to represent a time before the epoch.
    current_ts: AtomicI64,
}

#[cfg(feature = "test-util")]
impl Default for MockSystemClock {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "test-util")]
impl MockSystemClock {
    /// Creates a new mock system clock set to the Unix epoch (timestamp 0).
    pub fn new() -> Self {
        Self::with_time(0)
    }

    /// Creates a new mock system clock with the specified timestamp
    /// (milliseconds since the Unix epoch).
    pub fn with_time(ts_millis: i64) -> Self {
        Self {
            current_ts: AtomicI64::new(ts_millis),
        }
    }

    /// Sets the current timestamp of the mock system clock (milliseconds since the
    /// Unix epoch). The new value may be earlier than the current one.
    pub fn set(&self, ts_millis: i64) {
        self.current_ts.store(ts_millis, Ordering::SeqCst);
    }

    /// Converts `duration` to whole milliseconds, saturating at `i64::MAX`.
    ///
    /// A plain `as i64` cast truncates the `u128` returned by `as_millis()`. A huge
    /// duration such as `Duration::MAX` would then wrap to a negative number and make
    /// `sleep` return immediately.
    fn duration_to_millis(duration: Duration) -> i64 {
        i64::try_from(duration.as_millis()).unwrap_or(i64::MAX)
    }
}

#[cfg(feature = "test-util")]
impl SystemClock for MockSystemClock {
    #[allow(clippy::panic)]
    fn now(&self) -> DateTime<Utc> {
        let current_ts = self.current_ts.load(Ordering::SeqCst);
        DateTime::from_timestamp_millis(current_ts)
            .unwrap_or_else(|| panic!("invalid timestamp: {}", current_ts))
    }

    /// Advances the clock by `duration`, truncated to whole milliseconds. The clock is
    /// updated immediately, before the returned future is polled. Saturates at
    /// `i64::MAX` rather than wrapping.
    fn advance<'a>(&'a self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let delta = Self::duration_to_millis(duration);
        // Saturating update so an oversized advance cannot wrap time backwards.
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .current_ts
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |ts| {
                Some(ts.saturating_add(delta))
            });
        Box::pin(async move {
            // An empty async block always returns Poll::Ready(()) because nothing inside
            // the block can yield control to other tasks. Calling advance() in a tight loop
            // would prevent other tasks from running in this case. Yielding control to other
            // tasks explicitly so we avoid this issue.
            tokio::task::yield_now().await;
        })
    }

    /// Sleeps for the specified duration. Note that sleep() does not advance the clock.
    /// Another thread or task must call advance() (or set()) to advance the clock to
    /// unblock the sleep.
    ///
    /// The deadline is computed when `sleep()` is called and saturates at `i64::MAX`, so
    /// a very long sleep (e.g. `Duration::MAX`) waits instead of completing immediately.
    /// The returned future repeatedly yields to the scheduler until the deadline is
    /// reached.
    fn sleep<'a>(&'a self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let end_time = self
            .current_ts
            .load(Ordering::SeqCst)
            .saturating_add(Self::duration_to_millis(duration));
        Box::pin(async move {
            #[allow(clippy::while_immutable_condition)]
            while self.current_ts.load(Ordering::SeqCst) < end_time {
                tokio::task::yield_now().await;
            }
        })
    }

    fn ticker<'a>(&'a self, duration: Duration) -> SystemClockTicker<'a> {
        SystemClockTicker::new(self, duration)
    }
}
/// Defines the logical clock that SlateDB will use to measure time for things
/// like TTL expiration.
pub trait LogicalClock: Debug + Send + Sync {
    /// Returns a timestamp (typically measured in millis since the unix epoch).
    ///
    /// Successive values must never decrease; returning the same value as the
    /// previous call is accepted. SlateDB checks this at runtime. A tick lower than
    /// the last observed tick is rejected with a `SlateDBError::InvalidClockTick`
    /// error rather than being used.
    ///
    /// Note that this clock does not need to return a number that
    /// represents a unix timestamp; the only requirement is that
    /// it represents a sequence that can attribute a logical ordering
    /// to actions on the database.
    fn now(&self) -> i64;
}
/// A logical clock implementation that wraps the [DefaultSystemClock]
/// and returns the number of milliseconds since the Unix epoch.
///
/// Readings are clamped so `now()` never returns a value smaller than one returned by
/// an earlier call, even if the underlying system clock reports an earlier time.
#[derive(Debug)]
pub struct DefaultLogicalClock {
    /// Highest timestamp handed out so far; starts at `i64::MIN`.
    last_ts: AtomicI64,
    /// Source of wall-clock readings.
    inner: Arc<dyn SystemClock>,
}

impl Default for DefaultLogicalClock {
    fn default() -> Self {
        Self::new()
    }
}

impl DefaultLogicalClock {
    /// Creates a logical clock backed by a fresh [DefaultSystemClock].
    pub fn new() -> Self {
        Self {
            inner: Arc::new(DefaultSystemClock::new()),
            last_ts: AtomicI64::new(i64::MIN),
        }
    }
}

impl LogicalClock for DefaultLogicalClock {
    fn now(&self) -> i64 {
        let current_ts = self.inner.now().timestamp_millis();
        // `fetch_max` returns the previous maximum. Combining it with our own reading
        // yields exactly the value this call stored, in one atomic operation. A separate
        // `load` afterwards could instead observe a newer value published by a concurrent
        // caller.
        let previous = self.last_ts.fetch_max(current_ts, Ordering::SeqCst);
        previous.max(current_ts)
    }
}
/// Deterministic [LogicalClock] for tests, backed by an atomic counter.
///
/// The first call to `now()` returns `i64::MIN`. Each later call returns the previous
/// value plus one, so readings never depend on wall-clock time.
#[cfg(feature = "test-util")]
#[derive(Debug)]
pub struct MockLogicalClock {
    /// The value the next call to `now()` will return.
    current_tick: AtomicI64,
}

#[cfg(feature = "test-util")]
impl MockLogicalClock {
    /// Creates a clock whose first reading is `i64::MIN`.
    pub fn new() -> Self {
        let current_tick = AtomicI64::new(i64::MIN);
        MockLogicalClock { current_tick }
    }
}

#[cfg(feature = "test-util")]
impl Default for MockLogicalClock {
    fn default() -> Self {
        MockLogicalClock::new()
    }
}

#[cfg(feature = "test-util")]
impl LogicalClock for MockLogicalClock {
    /// Hands out the current tick and bumps the counter by one.
    fn now(&self) -> i64 {
        self.current_tick.fetch_add(1, Ordering::SeqCst)
    }
}
/// SlateDB uses MonotonicClock internally so that it can enforce that clock ticks
/// from the underlying implementation are monotonically increasing.
///
/// A tick equal to the last observed tick is accepted. A strictly smaller tick is
/// rejected with [SlateDBError::InvalidClockTick].
pub(crate) struct MonotonicClock {
    /// Highest tick observed so far, via `now()` or `set_last_tick()`.
    pub(crate) last_tick: AtomicI64,
    /// Largest value passed to `fetch_max_last_durable_tick()` (initially `init_tick`).
    pub(crate) last_durable_tick: AtomicI64,
    /// The wrapped logical clock that produces raw ticks.
    delegate: Arc<dyn LogicalClock>,
}

impl MonotonicClock {
    /// Wraps `delegate`, seeding both the last tick and the last durable tick with
    /// `init_tick`.
    pub(crate) fn new(delegate: Arc<dyn LogicalClock>, init_tick: i64) -> Self {
        Self {
            delegate,
            last_tick: AtomicI64::new(init_tick),
            last_durable_tick: AtomicI64::new(init_tick),
        }
    }

    /// Records `tick` as observed. Returns `Ok(tick)` unless it is below the last
    /// observed tick, in which case `Err(InvalidClockTick)` is returned.
    pub(crate) fn set_last_tick(&self, tick: i64) -> Result<i64, SlateDBError> {
        self.enforce_monotonic(tick)
    }

    /// Raises the last durable tick to `tick` if it is larger, returning the previous
    /// value.
    pub(crate) fn fetch_max_last_durable_tick(&self, tick: i64) -> i64 {
        self.last_durable_tick.fetch_max(tick, Ordering::SeqCst)
    }

    /// Returns the current last durable tick.
    pub(crate) fn get_last_durable_tick(&self) -> i64 {
        self.last_durable_tick.load(Ordering::SeqCst)
    }

    /// Reads the delegate clock and checks the tick for monotonicity.
    ///
    /// If the tick lags behind the last observed tick, this does not fail right away.
    /// It sleeps (on Tokio's timer) for twice the gap, interpreted as milliseconds and
    /// capped at 10 seconds, to give a skewed clock time to catch up. It then reads the
    /// delegate once more. The outcome of that second check is returned, so a clock that
    /// is still lagging yields `Err(InvalidClockTick)`.
    pub(crate) async fn now(&self) -> Result<i64, SlateDBError> {
        let tick = self.delegate.now();
        match self.enforce_monotonic(tick) {
            Err(SlateDBError::InvalidClockTick {
                last_tick,
                next_tick: _,
            }) => {
                // `abs_diff` yields a `u64` and cannot overflow, unlike `last_tick - tick`,
                // which overflows i64 when the ticks are far apart (e.g. a delegate that
                // starts at `i64::MIN`). The doubling saturates for the same reason; the
                // result is capped at 10s anyway.
                let sync_millis = cmp::min(10_000, last_tick.abs_diff(tick).saturating_mul(2));
                info!(
                    "Clock tick {} is lagging behind the last known tick {}. \
                     Sleeping {}ms to potentially resolve skew before returning InvalidClockTick.",
                    tick, last_tick, sync_millis
                );
                tokio::time::sleep(Duration::from_millis(sync_millis)).await;
                self.enforce_monotonic(self.delegate.now())
            }
            result => result,
        }
    }

    /// Atomically raises `last_tick` to `tick` if it is larger.
    ///
    /// Returns `Ok(tick)` when `tick` is at least the previous maximum. Otherwise returns
    /// `Err(InvalidClockTick)`, carrying the (unchanged) maximum and the rejected tick.
    fn enforce_monotonic(&self, tick: i64) -> Result<i64, SlateDBError> {
        // `fetch_max` returns the value *before* the update.
        let previous_max = self.last_tick.fetch_max(tick, Ordering::SeqCst);
        if tick < previous_max {
            return Err(SlateDBError::InvalidClockTick {
                last_tick: previous_max,
                next_tick: tick,
            });
        }
        Ok(tick)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::timeout;

    #[tokio::test]
    #[cfg(feature = "test-util")]
    async fn test_mock_system_clock_default() {
        let clock = MockSystemClock::default();
        assert_eq!(
            clock.now().timestamp_millis(),
            0,
            "Default MockSystemClock should start at timestamp 0"
        );
    }

    #[tokio::test]
    #[cfg(feature = "test-util")]
    async fn test_mock_system_clock_set_now() {
        let clock = Arc::new(MockSystemClock::new());

        // Test positive timestamp
        let positive_ts = 1625097600000i64; // 2021-07-01T00:00:00Z in milliseconds
        clock.set(positive_ts);
        assert_eq!(
            clock.now().timestamp_millis(),
            positive_ts,
            "MockSystemClock should return the timestamp set with set()"
        );

        // Test negative timestamp (before Unix epoch)
        let negative_ts = -1625097600000; // Before Unix epoch
        clock.set(negative_ts);
        assert_eq!(
            clock.now().timestamp_millis(),
            negative_ts,
            "MockSystemClock should handle negative timestamps correctly"
        );
    }

    #[tokio::test]
    #[cfg(feature = "test-util")]
    async fn test_mock_system_clock_advance() {
        let clock = Arc::new(MockSystemClock::new());
        let initial_ts = 1000;

        // Set initial time
        clock.set(initial_ts);

        // Advance by 500ms
        let duration = Duration::from_millis(500);
        clock.advance(duration).await;

        // Check that time advanced correctly
        assert_eq!(
            clock.now().timestamp_millis(),
            initial_ts + 500,
            "MockSystemClock should advance time by the specified duration"
        );
    }

    #[tokio::test]
    #[cfg(feature = "test-util")]
    async fn test_mock_system_clock_sleep() {
        let clock = Arc::new(MockSystemClock::new());
        let initial_ts = 2000;

        // Set initial time
        clock.set(initial_ts);

        // Create three sleeps of 1000ms. Their deadlines (initial_ts + 1000) are fixed
        // now, when sleep() is called.
        let sleep_duration = Duration::from_millis(1000);
        let sleep_handle1 = clock.sleep(sleep_duration);
        let sleep_handle2 = clock.sleep(sleep_duration);
        let sleep_handle3 = clock.sleep(sleep_duration);

        // Verify sleep doesn't complete immediately
        assert!(
            timeout(Duration::from_millis(10), sleep_handle1)
                .await
                .is_err(),
            "Sleep should not complete until time advances"
        );

        // Move the clock forward by 500ms (not enough to complete sleep)
        clock.set(initial_ts + 500);
        assert!(
            timeout(Duration::from_millis(10), sleep_handle2)
                .await
                .is_err(),
            "Sleep should not complete when time has advanced by less than sleep duration"
        );

        // Move the clock forward enough to complete sleep
        clock.set(initial_ts + 1000);
        assert!(
            timeout(Duration::from_millis(100), sleep_handle3)
                .await
                .is_ok(),
            "Sleep should complete when time has advanced by at least sleep duration"
        );
    }

    #[tokio::test]
    #[cfg(feature = "test-util")]
    async fn test_mock_system_clock_ticker() {
        let clock = Arc::new(MockSystemClock::new());
        let tick_duration = Duration::from_millis(100);

        // Create a ticker
        let mut ticker = clock.ticker(tick_duration);

        // First tick should complete immediately
        assert!(
            timeout(Duration::from_millis(10000), ticker.tick())
                .await
                .is_ok(),
            "First tick should complete immediately"
        );

        // Next tick should not complete because time hasn't advanced
        assert!(
            timeout(Duration::from_millis(100), ticker.tick())
                .await
                .is_err(),
            "Second tick should not complete until time advances"
        );

        // The timed-out tick above was dropped before it could record a new
        // `last_tick`, so the ticker still measures from the first tick at t=0.
        // Moving the clock to t=100 means a full tick duration has elapsed, so the
        // next tick should complete right away.
        clock.set(100);
        assert!(
            timeout(Duration::from_millis(10000), ticker.tick())
                .await
                .is_ok(),
            "Tick should complete when time has advanced by at least tick duration"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn test_default_system_clock_now() {
        let clock = Arc::new(DefaultSystemClock::new());

        // Record initial time
        let initial_now = clock.now();

        // Sleep a bit
        let sleep_duration = Duration::from_millis(100);
        clock.sleep(sleep_duration).await;

        // Check that time advances
        let new_now = clock.now();
        assert_eq!(
            new_now,
            initial_now + sleep_duration,
            "DefaultSystemClock now() should advance with time"
        );
    }

    #[tokio::test(start_paused = true)]
    #[cfg(feature = "test-util")]
    async fn test_default_system_clock_advance() {
        let clock = Arc::new(DefaultSystemClock::new());
        let start = clock.now();
        let duration = Duration::from_millis(500);
        clock.advance(duration).await;

        // Check that time advanced correctly
        assert_eq!(
            start + duration,
            clock.now(),
            "DefaultSystemClock should advance time by the specified duration"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn test_default_system_clock_ticker() {
        let clock = Arc::new(DefaultSystemClock::new());
        let tick_duration = Duration::from_millis(10);

        // Create a ticker
        let mut ticker = clock.ticker(tick_duration);

        // First tick should complete immediately
        assert!(
            timeout(Duration::from_millis(10000), ticker.tick())
                .await
                .is_ok(),
            "First tick should complete immediately"
        );

        // With the clock paused, Tokio auto-advances time to the next pending timer
        // whenever the runtime has nothing else to do. Awaiting tick() here therefore
        // moves the clock forward by exactly the tick duration.
        let before = clock.now();
        ticker.tick().await;
        let after = clock.now();
        assert_eq!(before + tick_duration, after);
    }
}