ya_world_time/
world_time.rs

1use dns_lookup::lookup_host;
2use sntpc::{Error, NtpContext, NtpResult, NtpTimestampGenerator, NtpUdpSocket, Result};
3use std::env;
4use std::fmt::Display;
5use std::mem::MaybeUninit;
6use std::net::IpAddr::{V4, V6};
7use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
8use std::ops::Add;
9use std::sync::{Arc, Mutex, Once};
10use std::thread::JoinHandle;
11use std::time::Duration;
12
13#[derive(Copy, Clone, Default)]
14struct StdTimestampGen {
15    duration: Duration,
16}
17
18impl NtpTimestampGenerator for StdTimestampGen {
19    fn init(&mut self) {
20        self.duration = std::time::SystemTime::now()
21            .duration_since(std::time::SystemTime::UNIX_EPOCH)
22            .unwrap();
23    }
24    fn timestamp_sec(&self) -> u64 {
25        self.duration.as_secs()
26    }
27    fn timestamp_subsec_micros(&self) -> u32 {
28        self.duration.subsec_micros()
29    }
30}
31
32#[derive(Debug)]
33struct UdpSocketWrapper(UdpSocket);
34
35impl NtpUdpSocket for UdpSocketWrapper {
36    fn send_to<T: ToSocketAddrs>(&self, buf: &[u8], addr: T) -> Result<usize> {
37        match self.0.send_to(buf, addr) {
38            Ok(usize) => Ok(usize),
39            Err(_) => Err(Error::Network),
40        }
41    }
42    fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
43        match self.0.recv_from(buf) {
44            Ok((size, addr)) => Ok((size, addr)),
45            Err(_) => Err(Error::Network),
46        }
47    }
48}
49pub fn get_time_from_single_serv(serv: &str) -> Result<NtpResult> {
50    let socket = UdpSocket::bind("0.0.0.0:0").expect("Unable to crate UDP socket");
51    socket
52        .set_read_timeout(Some(Duration::from_secs(2)))
53        .expect("Unable to set UDP socket read timeout");
54    let sock_wrapper = UdpSocketWrapper(socket);
55    let ntp_context = NtpContext::new(StdTimestampGen::default());
56    sntpc::get_time(serv, sock_wrapper, ntp_context)
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
60pub struct WorldTimer {
61    pub offset: i64,
62    pub precision: Option<i64>,
63}
64
65pub struct WorldTimerWrapper {
66    pub world_timer: Arc<Mutex<WorldTimer>>,
67}
68
69impl WorldTimer {
70    pub fn local_time(&self) -> chrono::DateTime<chrono::Local> {
71        let local_now = chrono::Local::now();
72        local_now.add(chrono::Duration::microseconds(self.offset))
73    }
74
75    pub fn utc_time(&self) -> chrono::DateTime<chrono::Utc> {
76        let utc_now = chrono::Utc::now();
77        utc_now.add(chrono::Duration::microseconds(self.offset))
78    }
79}
80pub fn world_time() -> WorldTimer {
81    *world_time_wrapper().world_timer.lock().unwrap()
82}
83pub fn world_time_wrapper() -> &'static WorldTimerWrapper {
84    static mut WORLD_TIME_WRAPPER: MaybeUninit<WorldTimerWrapper> = MaybeUninit::uninit();
85    static ONCE: Once = Once::new();
86
87    let world_time = WorldTimerWrapper {
88        world_timer: Arc::new(Mutex::new(WorldTimer::default())),
89    };
90
91    // SAFETY: This is simple singleton pattern
92    // it shouldn't cause any problems
93    unsafe {
94        ONCE.call_once(|| {
95            // SAFETY: This is safe because we only write to the singleton once.
96            WORLD_TIME_WRAPPER.write(world_time);
97        });
98
99        // SAFETY: This is safe because singleton is initialized inside ONCE call
100        WORLD_TIME_WRAPPER.assume_init_ref()
101    }
102}
103
104pub fn init_world_time() {
105    let world_time = get_time();
106    *world_time_wrapper().world_timer.lock().unwrap() = world_time;
107}
108
109#[derive(Debug, Clone)]
110struct ServerInfo {
111    port: u16,
112    ip_addr: String,
113    host_name: String,
114}
115
116impl Display for ServerInfo {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        write!(f, "{}:{} [{}]", self.ip_addr, self.port, self.host_name)
119    }
120}
121
122struct Server {
123    server_info: ServerInfo,
124    join_handle: JoinHandle<Result<NtpResult>>,
125}
126struct Measurement {
127    server_info: ServerInfo,
128    result: NtpResult,
129}
130
131fn add_servers_from_host(time_servers: &mut Vec<ServerInfo>, host: &str) {
132    match lookup_host(host) {
133        Ok(ip_addrs) => {
134            for ip_addr in ip_addrs {
135                match ip_addr {
136                    V4(addr) => {
137                        log::debug!("Adding IPv4 address: {addr} resolved from {host}");
138                        time_servers.push(ServerInfo {
139                            port: 123,
140                            ip_addr: addr.to_string(),
141                            host_name: host.to_string(),
142                        });
143                    }
144                    V6(addr) => {
145                        log::debug!("Ignoring IPv6 address: {addr} resolved from {host}");
146                    }
147                }
148            }
149        }
150        Err(_err) => {
151            log::warn!("Unable to resolve host: {host}");
152        }
153    }
154}
155
156fn get_time() -> WorldTimer {
157    //const MAX_AT_ONCE: usize = 50;
158    //const MAX_SERVERS: usize = 100;
159
160    let max_at_once = env::var("YA_WORLD_TIME_MAX_AT_ONCE")
161        .unwrap_or("50".to_string())
162        .parse::<usize>()
163        .expect("YA_WORLD_TIME_MAX_AT_ONCE cannot parse to usize");
164    let max_total = env::var("YA_WORLD_TIME_MAX_TOTAL")
165        .unwrap_or("100".to_string())
166        .parse::<usize>()
167        .expect("YA_WORLD_TIME_MAX_TOTAL cannot parse to usize");
168    let max_timeout = std::time::Duration::from_millis(
169        env::var("YA_WORLD_TIME_MAX_TIMEOUT")
170            .unwrap_or("300".to_string())
171            .parse::<u64>()
172            .expect("YA_WORLD_TIME_MAX_TIMEOUT cannot parse to usize"),
173    );
174
175    let mut time_servers: Vec<ServerInfo> = vec![];
176
177    let default_hosts = vec![
178        "time.google.com",
179        "ntp.qix.ca",
180        "ntp.nict.jp",
181        "pool.ntp.org",
182        "time.cloudflare.com",
183        "ntp.fizyka.umk.pl",
184        "time.apple.com",
185        "time.fu-berlin.de",
186        "time.facebook.com",
187    ];
188
189    if let Ok(time_server_hosts) = env::var("YA_WORLD_TIME_SERVER_HOSTS") {
190        for serv in time_server_hosts.split(';') {
191            add_servers_from_host(&mut time_servers, serv.trim());
192        }
193    } else {
194        for serv in default_hosts {
195            add_servers_from_host(&mut time_servers, serv);
196        }
197    }
198
199    let mut avg_difference = 0;
200    let mut number_of_reads = 0;
201
202    let mut measurements = Vec::new();
203    if time_servers.len() > max_total {
204        log::warn!("Too many servers, truncating to {}", max_total);
205        time_servers.truncate(max_total);
206    }
207    let mut number_checked = 0;
208    let chunked: Vec<Vec<ServerInfo>> =
209        time_servers.chunks(max_at_once).map(|s| s.into()).collect();
210    for chunk in chunked {
211        log::info!(
212            "Checking [{}..{}] servers out of {}",
213            number_checked,
214            number_checked + chunk.len(),
215            time_servers.len()
216        );
217        number_checked += chunk.len();
218        let mut results: Vec<Server> = Vec::new();
219        for server_info in chunk {
220            results.push(Server {
221                server_info: server_info.clone(),
222                join_handle: std::thread::spawn(move || {
223                    get_time_from_single_serv(
224                        format!("{}:{}", server_info.ip_addr, server_info.port).as_str(),
225                    )
226                }),
227            });
228        }
229
230        let mut unjoined = results;
231
232        let current_time = std::time::Instant::now();
233        loop {
234            let mut idxs = Vec::new();
235            for (idx, item) in unjoined.iter().enumerate() {
236                if item.join_handle.is_finished() {
237                    idxs.push(idx);
238                }
239            }
240            for idx in idxs.iter().rev() {
241                let el = unjoined.remove(*idx);
242                match el.join_handle.join() {
243                    Ok(Ok(result)) => {
244                        avg_difference += result.offset;
245                        number_of_reads += 1;
246                        measurements.push(Measurement {
247                            server_info: el.server_info,
248                            result,
249                        });
250                    }
251                    Ok(Err(_)) => {
252                        log::warn!("Unable to get time from server {}", el.server_info);
253                    }
254                    Err(_) => {
255                        log::warn!("Unable to join thread");
256                    }
257                }
258            }
259            if unjoined.is_empty() {
260                log::info!(
261                    "All servers responded in time: {}ms",
262                    current_time.elapsed().as_millis()
263                );
264                break;
265            }
266
267            if current_time.elapsed() > max_timeout {
268                let str_vec: Vec<ServerInfo> =
269                    unjoined.into_iter().map(|x| x.server_info).collect();
270                log::debug!("Don't wait for other servers: {:?}", str_vec);
271                break;
272            }
273            std::thread::sleep(Duration::from_millis(5));
274        }
275    }
276
277    measurements.sort_by(|a, b| a.result.roundtrip.cmp(&b.result.roundtrip));
278
279    if number_of_reads > 0 {
280        log::info!("Total of servers responded: {}", number_of_reads);
281        avg_difference /= number_of_reads;
282
283        let mut harmonic_sum = 0.0;
284        let mut harmonic_norm = 0.0;
285        for measurement in measurements.iter() {
286            log::debug!(
287                "Server {}, Offset: {}ms, Roundtrip {}ms",
288                measurement.server_info,
289                measurement.result.offset as f64 / 1000.0,
290                measurement.result.roundtrip as f64 / 1000.0
291            );
292            harmonic_sum += measurement.result.offset as f64
293                / (measurement.result.roundtrip as f64).powf(2.0f64);
294            harmonic_norm += 1.0 / (measurement.result.roundtrip as f64).powf(2.0f64);
295        }
296        let harmonic_avg = harmonic_sum / harmonic_norm;
297        let harmonic_error = (1.0 / harmonic_norm).sqrt();
298
299        let additional_systematic_error = 200.0;
300        let roundtrip_to_error_multiplier = 5.0;
301
302        log::info!(
303            "Difference estimation: {:.02}ms ± {:.02}ms",
304            harmonic_avg / 1000.0,
305            (harmonic_error / roundtrip_to_error_multiplier + additional_systematic_error) / 1000.0
306        );
307
308        WorldTimer {
309            offset: avg_difference,
310            precision: Some(
311                (harmonic_error / roundtrip_to_error_multiplier + additional_systematic_error)
312                    as i64,
313            ),
314        }
315    } else {
316        log::warn!("No time servers available");
317        WorldTimer {
318            offset: 0,
319            precision: None,
320        }
321    }
322}