datacake_node/
clock.rs

1use std::time::Duration;
2
3use datacake_crdt::HLCTimestamp;
4use tokio::sync::oneshot;
5
6use crate::NodeId;
7
/// Counter value at or above which the clock task pauses briefly after
/// handling an event.
///
/// Sits 10 below `u16::MAX`, leaving a small amount of headroom before the
/// counter would reach its maximum.
const CLOCK_BACKPRESSURE_LIMIT: u16 = u16::MAX - 10;
9
10#[derive(Clone)]
11pub struct Clock {
12    node_id: NodeId,
13    tx: flume::Sender<Event>,
14}
15
16impl Clock {
17    pub fn new(node_id: NodeId) -> Self {
18        let ts = HLCTimestamp::now(0, node_id);
19        let (tx, rx) = flume::bounded(1000);
20
21        tokio::spawn(run_clock(ts, rx));
22
23        Self { node_id, tx }
24    }
25
26    pub async fn register_ts(&self, ts: HLCTimestamp) {
27        if ts.node() == self.node_id {
28            return;
29        }
30
31        self.tx
32            .send_async(Event::Register(ts))
33            .await
34            .expect("Clock actor should never die");
35    }
36
37    pub async fn get_time(&self) -> HLCTimestamp {
38        let (tx, rx) = oneshot::channel();
39
40        self.tx
41            .send_async(Event::Get(tx))
42            .await
43            .expect("Clock actor should never die");
44
45        rx.await.expect("Responder should not be dropped")
46    }
47}
48
49pub enum Event {
50    Get(oneshot::Sender<HLCTimestamp>),
51    Register(HLCTimestamp),
52}
53
54async fn run_clock(mut clock: HLCTimestamp, reqs: flume::Receiver<Event>) {
55    while let Ok(event) = reqs.recv_async().await {
56        match event {
57            Event::Get(tx) => {
58                let ts = clock.send().expect("Clock counter should not overflow");
59
60                if clock.counter() >= CLOCK_BACKPRESSURE_LIMIT {
61                    tokio::time::sleep(Duration::from_millis(1)).await;
62                }
63
64                let _ = tx.send(ts);
65            },
66            Event::Register(remote_ts) => {
67                let _ = clock.recv(&remote_ts);
68
69                if clock.counter() >= CLOCK_BACKPRESSURE_LIMIT {
70                    tokio::time::sleep(Duration::from_millis(1)).await;
71                }
72            },
73        }
74    }
75}
76
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the clock handle end to end: timestamps must be strictly
    /// increasing, must jump past a registered timestamp that lies ahead of
    /// the local clock, and must not move backwards when an older remote
    /// timestamp is registered afterwards.
    #[tokio::test]
    async fn test_clock() {
        let clock = Clock::new(0);

        // Registering a timestamp from our own node id is a no-op, but the
        // next timestamp must still be newer.
        let ts1 = clock.get_time().await;
        clock.register_ts(ts1).await;
        let ts2 = clock.get_time().await;
        assert!(ts1 < ts2);

        // Back-to-back reads are strictly ordered.
        let ts1 = clock.get_time().await;
        let ts2 = clock.get_time().await;
        let ts3 = clock.get_time().await;
        assert!(ts1 < ts2);
        assert!(ts2 < ts3);

        // A timestamp from node 1 that is 50 seconds ahead of the last one
        // issued locally.
        let drift_ts =
            HLCTimestamp::new(ts3.datacake_timestamp() + Duration::from_secs(50), 0, 1);
        clock.register_ts(drift_ts).await;
        let ts = clock.get_time().await;
        assert!(
            drift_ts < ts,
            "New timestamp should be monotonic relative to drifted ts."
        );

        // A timestamp from node 1 that is older than `drift_ts`; registering
        // it must not pull the clock back.
        let old_ts =
            HLCTimestamp::new(ts3.datacake_timestamp() + Duration::from_secs(5), 0, 1);
        clock.register_ts(old_ts).await;
        let after_old = clock.get_time().await;
        assert!(
            ts < after_old,
            "Registering an older timestamp should not move the clock backwards."
        );
    }
}