use crate::{GlobalTime, PeerID, ServerWatermarksLE, ServerWatermarksLEExt, Srw, Svw};
use async_std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
collections::BTreeSet,
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
pub trait Clock: Send + Sync {
fn current_time(&self) -> SystemTime;
}
pub struct RealClock;
impl Clock for RealClock {
fn current_time(&self) -> SystemTime {
std::time::SystemTime::now()
}
}
pub struct TestClock(Arc<AtomicU64>);
impl TestClock {
pub fn new() -> Self {
TestClock(Arc::new(AtomicU64::from(0)))
}
}
impl Clock for TestClock {
fn current_time(&self) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_millis(self.0.fetch_add(1, Ordering::SeqCst))
}
}
pub struct TidMgr {
clock: Box<dyn Clock>,
prev: GlobalTime,
pub(crate) wq_set: BTreeSet<GlobalTime>,
wa_set: BTreeSet<GlobalTime>,
}
impl TidMgr {
pub(crate) fn new(self_id: PeerID, clock: Box<dyn Clock>) -> Self {
TidMgr {
clock,
prev: GlobalTime::time_zero_for(self_id),
wq_set: Default::default(),
wa_set: Default::default(),
}
}
pub(crate) fn current_time(&self) -> SystemTime {
self.clock.current_time()
}
pub(crate) fn self_id(&self) -> PeerID {
self.prev.peer
}
pub(crate) fn create_timestamp(&mut self) -> GlobalTime {
let now = self.clock.current_time();
let next_millis = match now.duration_since(UNIX_EPOCH) {
Err(_) => None,
Ok(dur) => {
let secs: u64 = dur.as_secs();
let millis: u32 = dur.subsec_millis();
let mut ms = secs
.checked_mul(1000)
.expect("create_timestamp sec-to-ms overflow");
ms = ms
.checked_add(millis as u64)
.expect("create_timestamp ms-addition overflow");
Some(ms)
}
};
self.prev = match next_millis {
Some(millis) if millis > self.prev.milli_secs => {
self.prev.with_milli_sec(millis).with_event(1)
}
_ => self.prev.next_event(),
};
self.prev
}
pub(crate) fn create_watermark_and_start_s_phase(&mut self) -> GlobalTime {
let ts = self.create_timestamp();
self.wq_set.insert(ts);
self.wa_set.insert(ts);
ts
}
pub(crate) fn stored(&mut self, ts: GlobalTime) {
assert!(self.wq_set.remove(&ts));
}
pub(crate) fn fully_replicated(&mut self, ts: GlobalTime) {
assert!(!self.wq_set.contains(&ts));
assert!(self.wa_set.remove(&ts));
}
pub(crate) fn svw(&mut self) -> Svw {
match self.wq_set.iter().next() {
None => Svw(self.create_timestamp()),
Some(ts) => Svw(ts.clone()),
}
}
pub(crate) fn srw(&mut self) -> Srw {
match self.wa_set.iter().next() {
None => Srw(self.create_timestamp()),
Some(ts) => Srw(ts.clone()),
}
}
pub(crate) fn server_watermarks(&mut self) -> ServerWatermarksLE {
ServerWatermarksLE::new(self.svw(), self.srw())
}
}