1use std::time::Duration;
2
3use datacake_crdt::HLCTimestamp;
4use tokio::sync::oneshot;
5
6use crate::NodeId;
7
8const CLOCK_BACKPRESSURE_LIMIT: u16 = u16::MAX - 10;
9
10#[derive(Clone)]
11pub struct Clock {
12 node_id: NodeId,
13 tx: flume::Sender<Event>,
14}
15
16impl Clock {
17 pub fn new(node_id: NodeId) -> Self {
18 let ts = HLCTimestamp::now(0, node_id);
19 let (tx, rx) = flume::bounded(1000);
20
21 tokio::spawn(run_clock(ts, rx));
22
23 Self { node_id, tx }
24 }
25
26 pub async fn register_ts(&self, ts: HLCTimestamp) {
27 if ts.node() == self.node_id {
28 return;
29 }
30
31 self.tx
32 .send_async(Event::Register(ts))
33 .await
34 .expect("Clock actor should never die");
35 }
36
37 pub async fn get_time(&self) -> HLCTimestamp {
38 let (tx, rx) = oneshot::channel();
39
40 self.tx
41 .send_async(Event::Get(tx))
42 .await
43 .expect("Clock actor should never die");
44
45 rx.await.expect("Responder should not be dropped")
46 }
47}
48
49pub enum Event {
50 Get(oneshot::Sender<HLCTimestamp>),
51 Register(HLCTimestamp),
52}
53
54async fn run_clock(mut clock: HLCTimestamp, reqs: flume::Receiver<Event>) {
55 while let Ok(event) = reqs.recv_async().await {
56 match event {
57 Event::Get(tx) => {
58 let ts = clock.send().expect("Clock counter should not overflow");
59
60 if clock.counter() >= CLOCK_BACKPRESSURE_LIMIT {
61 tokio::time::sleep(Duration::from_millis(1)).await;
62 }
63
64 let _ = tx.send(ts);
65 },
66 Event::Register(remote_ts) => {
67 let _ = clock.recv(&remote_ts);
68
69 if clock.counter() >= CLOCK_BACKPRESSURE_LIMIT {
70 tokio::time::sleep(Duration::from_millis(1)).await;
71 }
72 },
73 }
74 }
75}
76
77#[cfg(test)]
78mod tests {
79 use super::*;
80
81 #[tokio::test]
82 async fn test_clock() {
83 let clock = Clock::new(0);
84
85 let ts1 = clock.get_time().await;
86 clock.register_ts(ts1).await;
87 let ts2 = clock.get_time().await;
88 assert!(ts1 < ts2);
89
90 let ts1 = clock.get_time().await;
91 let ts2 = clock.get_time().await;
92 let ts3 = clock.get_time().await;
93 assert!(ts1 < ts2);
94 assert!(ts2 < ts3);
95
96 let drift_ts =
97 HLCTimestamp::new(ts3.datacake_timestamp() + Duration::from_secs(50), 0, 1);
98 clock.register_ts(drift_ts).await;
99 let ts = clock.get_time().await;
100 assert!(
101 drift_ts < ts,
102 "New timestamp should be monotonic relative to drifted ts."
103 );
104
105 let old_ts =
106 HLCTimestamp::new(ts3.datacake_timestamp() + Duration::from_secs(5), 0, 1);
107 clock.register_ts(old_ts).await;
108 }
109}