use std::{
sync::atomic::AtomicI64,
time::{SystemTime, UNIX_EPOCH},
};
use std::sync::Mutex;
use std::sync::atomic::Ordering;
use tokio::time::{Duration, Instant};
use tracing::warn;
pub trait TimestampGenerator: Send + Sync {
fn next_timestamp(&self) -> i64;
}
#[derive(Default, Debug)]
pub struct SimpleTimestampGenerator {}
impl SimpleTimestampGenerator {
pub fn new() -> Self {
SimpleTimestampGenerator {}
}
}
impl TimestampGenerator for SimpleTimestampGenerator {
fn next_timestamp(&self) -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as i64
}
}
#[derive(Debug)]
struct MonotonicTimestampGeneratorWarningsCfg {
warning_threshold: Duration,
warning_interval: Duration,
}
#[derive(Debug)]
pub struct MonotonicTimestampGenerator {
last: AtomicI64,
last_warning: Mutex<Instant>,
config: Option<MonotonicTimestampGeneratorWarningsCfg>,
}
impl MonotonicTimestampGenerator {
pub fn new() -> Self {
MonotonicTimestampGenerator {
last: AtomicI64::new(0),
last_warning: Mutex::new(Instant::now()),
config: Some(MonotonicTimestampGeneratorWarningsCfg {
warning_threshold: Duration::from_secs(1),
warning_interval: Duration::from_secs(1),
}),
}
}
pub fn with_warning_times(
mut self,
warning_threshold: Duration,
warning_interval: Duration,
) -> Self {
self.config = Some(MonotonicTimestampGeneratorWarningsCfg {
warning_threshold,
warning_interval,
});
self
}
pub fn without_warnings(mut self) -> Self {
self.config = None;
self
}
fn compute_next(&self, last: i64) -> i64 {
let current = SystemTime::now().duration_since(UNIX_EPOCH);
if let Ok(cur_time) = current {
let u_cur = cur_time.as_micros() as i64;
if u_cur > last {
return u_cur;
} else if let Some(cfg) = self.config.as_ref() {
if last - u_cur > cfg.warning_threshold.as_micros() as i64 {
let mut last_warn = self.last_warning.lock().unwrap();
let now = Instant::now();
if now >= last_warn.checked_add(cfg.warning_interval).unwrap() {
*last_warn = now;
drop(last_warn);
warn!(
"Clock skew detected. The current time ({}) was {} \
microseconds behind the last generated timestamp ({}). \
The next generated timestamp will be artificially incremented \
to guarantee monotonicity.",
u_cur,
last - u_cur,
last
)
}
}
}
} else {
warn!("Clock skew detected. The current time was behind UNIX epoch.");
}
last + 1
}
}
impl Default for MonotonicTimestampGenerator {
fn default() -> Self {
Self::new()
}
}
impl TimestampGenerator for MonotonicTimestampGenerator {
fn next_timestamp(&self) -> i64 {
loop {
let last = self.last.load(Ordering::SeqCst);
let cur = self.compute_next(last);
if self
.last
.compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
return cur;
}
}
}
}
#[test]
fn monotonic_timestamp_generator_is_monotonic() {
const NUMBER_OF_ITERATIONS: u32 = 1000;
let mut prev = None;
let mut cur;
let generator = MonotonicTimestampGenerator::new();
for _ in 0..NUMBER_OF_ITERATIONS {
cur = generator.next_timestamp();
if let Some(prev_val) = prev {
assert!(cur > prev_val);
}
prev = Some(cur);
}
}
#[test]
fn monotonic_timestamp_generator_is_monotonic_with_concurrency() {
use std::collections::HashSet;
use std::sync::Arc;
const NUMBER_OF_ITERATIONS: usize = 1000;
const NUMBER_OF_THREADS: usize = 10;
let generator = Arc::new(MonotonicTimestampGenerator::new());
let timestamps_sets: Vec<_> = std::thread::scope(|s| {
(0..NUMBER_OF_THREADS)
.map(|_| {
s.spawn(|| {
let timestamps: Vec<i64> = (0..NUMBER_OF_ITERATIONS)
.map(|_| generator.next_timestamp())
.collect();
assert!(timestamps.windows(2).all(|w| w[0] < w[1]));
let timestamps_set: HashSet<i64> = HashSet::from_iter(timestamps);
assert_eq!(
timestamps_set.len(),
NUMBER_OF_ITERATIONS,
"Colliding values in a single thread"
);
timestamps_set
})
})
.map(|handle| handle.join().unwrap())
.collect()
});
let full_set: HashSet<i64> = timestamps_sets.iter().flatten().copied().collect();
assert_eq!(
full_set.len(),
NUMBER_OF_ITERATIONS * NUMBER_OF_THREADS,
"Colliding values between threads"
);
}