ckb_sync/
net_time_checker.rs

1use crate::utils::send_message_to;
2use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
3use ckb_logger::{debug, info, warn};
4use ckb_network::async_trait;
5use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, bytes::Bytes};
6use ckb_types::{packed, prelude::*};
7use ckb_util::RwLock;
8use std::collections::VecDeque;
9use std::sync::Arc;
10
11pub(crate) const TOLERANT_OFFSET: u64 = 7_200_000;
12const MIN_SAMPLES: usize = 5;
13const MAX_SAMPLES: usize = 11;
14
15/// Collect and check time offset samples
16#[derive(Clone)]
17pub struct NetTimeChecker {
18    /// Local clock should has less offset than this value.
19    tolerant_offset: u64,
20    max_samples: usize,
21    min_samples: usize,
22    samples: VecDeque<i64>,
23}
24
25impl NetTimeChecker {
26    pub fn new(min_samples: usize, max_samples: usize, tolerant_offset: u64) -> Self {
27        NetTimeChecker {
28            min_samples,
29            max_samples,
30            tolerant_offset,
31            samples: VecDeque::with_capacity(max_samples + 1),
32        }
33    }
34
35    pub fn add_sample(&mut self, offset: i64) {
36        self.samples.push_back(offset);
37        if self.samples.len() > self.max_samples {
38            self.samples.pop_front();
39        }
40    }
41
42    fn median_offset(&self) -> Option<i64> {
43        if self.samples.is_empty() || self.samples.len() < self.min_samples {
44            return None;
45        }
46        let mut samples = self.samples.iter().cloned().collect::<Vec<_>>();
47        samples.sort_unstable();
48        let mid = samples.len() >> 1;
49        if samples.len() & 0x1 == 0 {
50            // samples is even
51            Some((samples[mid - 1] + samples[mid]) >> 1)
52        } else {
53            // samples is odd
54            samples.get(mid).cloned()
55        }
56    }
57
58    pub fn check(&self) -> Result<(), i64> {
59        let network_offset = match self.median_offset() {
60            Some(offset) => offset,
61            None => return Ok(()),
62        };
63        if network_offset.unsigned_abs() > self.tolerant_offset {
64            return Err(network_offset);
65        }
66        Ok(())
67    }
68}
69
70impl Default for NetTimeChecker {
71    fn default() -> Self {
72        NetTimeChecker::new(MIN_SAMPLES, MAX_SAMPLES, TOLERANT_OFFSET)
73    }
74}
75
76/// Collect time offset samples from network peers and send notify to user if offset is too large
77pub struct NetTimeProtocol {
78    checker: RwLock<NetTimeChecker>,
79}
80
81impl Clone for NetTimeProtocol {
82    fn clone(&self) -> Self {
83        NetTimeProtocol {
84            checker: RwLock::new(self.checker.read().to_owned()),
85        }
86    }
87}
88
89impl NetTimeProtocol {
90    /// Init time protocol
91    pub fn new(min_samples: usize, max_samples: usize, tolerant_offset: u64) -> Self {
92        let checker = RwLock::new(NetTimeChecker::new(
93            min_samples,
94            max_samples,
95            tolerant_offset,
96        ));
97        NetTimeProtocol { checker }
98    }
99}
100
101impl Default for NetTimeProtocol {
102    fn default() -> Self {
103        let checker = RwLock::new(NetTimeChecker::default());
104        NetTimeProtocol { checker }
105    }
106}
107
108#[async_trait]
109impl CKBProtocolHandler for NetTimeProtocol {
110    async fn init(&mut self, _nc: Arc<dyn CKBProtocolContext + Sync>) {}
111
112    async fn connected(
113        &mut self,
114        nc: Arc<dyn CKBProtocolContext + Sync>,
115        peer_index: PeerIndex,
116        _version: &str,
117    ) {
118        // send local time to inbound peers
119        if let Some(true) = nc.get_peer(peer_index).map(|peer| peer.is_inbound()) {
120            let now = ckb_systemtime::unix_time_as_millis();
121            let time = packed::Time::new_builder().timestamp(now).build();
122            let _status = send_message_to(nc.as_ref(), peer_index, &time);
123        }
124    }
125
126    async fn received(
127        &mut self,
128        nc: Arc<dyn CKBProtocolContext + Sync>,
129        peer_index: PeerIndex,
130        data: Bytes,
131    ) {
132        if let Some(true) = nc.get_peer(peer_index).map(|peer| peer.is_inbound()) {
133            info!(
134                "Received a time message from a non-outbound peer {}",
135                peer_index
136            );
137        }
138
139        let timestamp: u64 = match packed::TimeReader::from_slice(&data)
140            .map(|time| time.timestamp().into())
141            .ok()
142        {
143            Some(timestamp) => timestamp,
144            None => {
145                info!("Received a malformed message from peer {}", peer_index);
146                nc.ban_peer(
147                    peer_index,
148                    BAD_MESSAGE_BAN_TIME,
149                    String::from("send us a malformed message"),
150                );
151                return;
152            }
153        };
154
155        let now: u64 = ckb_systemtime::unix_time_as_millis();
156        let offset: i64 = (i128::from(now) - i128::from(timestamp)) as i64;
157        let mut net_time_checker = self.checker.write();
158        debug!("New net time offset sample {}ms", offset);
159        net_time_checker.add_sample(offset);
160        if let Err(offset) = net_time_checker.check() {
161            warn!(
162                "Please check your computer's local clock ({}ms offset from network peers). Incorrect time setting may cause unexpected errors.",
163                offset
164            );
165        }
166    }
167}