1use dns_lookup::lookup_host;
2use sntpc::{Error, NtpContext, NtpResult, NtpTimestampGenerator, NtpUdpSocket, Result};
3use std::env;
4use std::fmt::Display;
5use std::mem::MaybeUninit;
6use std::net::IpAddr::{V4, V6};
7use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
8use std::ops::Add;
9use std::sync::{Arc, Mutex, Once};
10use std::thread::JoinHandle;
11use std::time::Duration;
12
13#[derive(Copy, Clone, Default)]
14struct StdTimestampGen {
15 duration: Duration,
16}
17
18impl NtpTimestampGenerator for StdTimestampGen {
19 fn init(&mut self) {
20 self.duration = std::time::SystemTime::now()
21 .duration_since(std::time::SystemTime::UNIX_EPOCH)
22 .unwrap();
23 }
24 fn timestamp_sec(&self) -> u64 {
25 self.duration.as_secs()
26 }
27 fn timestamp_subsec_micros(&self) -> u32 {
28 self.duration.subsec_micros()
29 }
30}
31
32#[derive(Debug)]
33struct UdpSocketWrapper(UdpSocket);
34
35impl NtpUdpSocket for UdpSocketWrapper {
36 fn send_to<T: ToSocketAddrs>(&self, buf: &[u8], addr: T) -> Result<usize> {
37 match self.0.send_to(buf, addr) {
38 Ok(usize) => Ok(usize),
39 Err(_) => Err(Error::Network),
40 }
41 }
42 fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
43 match self.0.recv_from(buf) {
44 Ok((size, addr)) => Ok((size, addr)),
45 Err(_) => Err(Error::Network),
46 }
47 }
48}
49pub fn get_time_from_single_serv(serv: &str) -> Result<NtpResult> {
50 let socket = UdpSocket::bind("0.0.0.0:0").expect("Unable to crate UDP socket");
51 socket
52 .set_read_timeout(Some(Duration::from_secs(2)))
53 .expect("Unable to set UDP socket read timeout");
54 let sock_wrapper = UdpSocketWrapper(socket);
55 let ntp_context = NtpContext::new(StdTimestampGen::default());
56 sntpc::get_time(serv, sock_wrapper, ntp_context)
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
60pub struct WorldTimer {
61 pub offset: i64,
62 pub precision: Option<i64>,
63}
64
65pub struct WorldTimerWrapper {
66 pub world_timer: Arc<Mutex<WorldTimer>>,
67}
68
69impl WorldTimer {
70 pub fn local_time(&self) -> chrono::DateTime<chrono::Local> {
71 let local_now = chrono::Local::now();
72 local_now.add(chrono::Duration::microseconds(self.offset))
73 }
74
75 pub fn utc_time(&self) -> chrono::DateTime<chrono::Utc> {
76 let utc_now = chrono::Utc::now();
77 utc_now.add(chrono::Duration::microseconds(self.offset))
78 }
79}
80pub fn world_time() -> WorldTimer {
81 *world_time_wrapper().world_timer.lock().unwrap()
82}
83pub fn world_time_wrapper() -> &'static WorldTimerWrapper {
84 static mut WORLD_TIME_WRAPPER: MaybeUninit<WorldTimerWrapper> = MaybeUninit::uninit();
85 static ONCE: Once = Once::new();
86
87 let world_time = WorldTimerWrapper {
88 world_timer: Arc::new(Mutex::new(WorldTimer::default())),
89 };
90
91 unsafe {
94 ONCE.call_once(|| {
95 WORLD_TIME_WRAPPER.write(world_time);
97 });
98
99 WORLD_TIME_WRAPPER.assume_init_ref()
101 }
102}
103
104pub fn init_world_time() {
105 let world_time = get_time();
106 *world_time_wrapper().world_timer.lock().unwrap() = world_time;
107}
108
109#[derive(Debug, Clone)]
110struct ServerInfo {
111 port: u16,
112 ip_addr: String,
113 host_name: String,
114}
115
116impl Display for ServerInfo {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 write!(f, "{}:{} [{}]", self.ip_addr, self.port, self.host_name)
119 }
120}
121
122struct Server {
123 server_info: ServerInfo,
124 join_handle: JoinHandle<Result<NtpResult>>,
125}
126struct Measurement {
127 server_info: ServerInfo,
128 result: NtpResult,
129}
130
131fn add_servers_from_host(time_servers: &mut Vec<ServerInfo>, host: &str) {
132 match lookup_host(host) {
133 Ok(ip_addrs) => {
134 for ip_addr in ip_addrs {
135 match ip_addr {
136 V4(addr) => {
137 log::debug!("Adding IPv4 address: {addr} resolved from {host}");
138 time_servers.push(ServerInfo {
139 port: 123,
140 ip_addr: addr.to_string(),
141 host_name: host.to_string(),
142 });
143 }
144 V6(addr) => {
145 log::debug!("Ignoring IPv6 address: {addr} resolved from {host}");
146 }
147 }
148 }
149 }
150 Err(_err) => {
151 log::warn!("Unable to resolve host: {host}");
152 }
153 }
154}
155
156fn get_time() -> WorldTimer {
157 let max_at_once = env::var("YA_WORLD_TIME_MAX_AT_ONCE")
161 .unwrap_or("50".to_string())
162 .parse::<usize>()
163 .expect("YA_WORLD_TIME_MAX_AT_ONCE cannot parse to usize");
164 let max_total = env::var("YA_WORLD_TIME_MAX_TOTAL")
165 .unwrap_or("100".to_string())
166 .parse::<usize>()
167 .expect("YA_WORLD_TIME_MAX_TOTAL cannot parse to usize");
168 let max_timeout = std::time::Duration::from_millis(
169 env::var("YA_WORLD_TIME_MAX_TIMEOUT")
170 .unwrap_or("300".to_string())
171 .parse::<u64>()
172 .expect("YA_WORLD_TIME_MAX_TIMEOUT cannot parse to usize"),
173 );
174
175 let mut time_servers: Vec<ServerInfo> = vec![];
176
177 let default_hosts = vec![
178 "time.google.com",
179 "ntp.qix.ca",
180 "ntp.nict.jp",
181 "pool.ntp.org",
182 "time.cloudflare.com",
183 "ntp.fizyka.umk.pl",
184 "time.apple.com",
185 "time.fu-berlin.de",
186 "time.facebook.com",
187 ];
188
189 if let Ok(time_server_hosts) = env::var("YA_WORLD_TIME_SERVER_HOSTS") {
190 for serv in time_server_hosts.split(';') {
191 add_servers_from_host(&mut time_servers, serv.trim());
192 }
193 } else {
194 for serv in default_hosts {
195 add_servers_from_host(&mut time_servers, serv);
196 }
197 }
198
199 let mut avg_difference = 0;
200 let mut number_of_reads = 0;
201
202 let mut measurements = Vec::new();
203 if time_servers.len() > max_total {
204 log::warn!("Too many servers, truncating to {}", max_total);
205 time_servers.truncate(max_total);
206 }
207 let mut number_checked = 0;
208 let chunked: Vec<Vec<ServerInfo>> =
209 time_servers.chunks(max_at_once).map(|s| s.into()).collect();
210 for chunk in chunked {
211 log::info!(
212 "Checking [{}..{}] servers out of {}",
213 number_checked,
214 number_checked + chunk.len(),
215 time_servers.len()
216 );
217 number_checked += chunk.len();
218 let mut results: Vec<Server> = Vec::new();
219 for server_info in chunk {
220 results.push(Server {
221 server_info: server_info.clone(),
222 join_handle: std::thread::spawn(move || {
223 get_time_from_single_serv(
224 format!("{}:{}", server_info.ip_addr, server_info.port).as_str(),
225 )
226 }),
227 });
228 }
229
230 let mut unjoined = results;
231
232 let current_time = std::time::Instant::now();
233 loop {
234 let mut idxs = Vec::new();
235 for (idx, item) in unjoined.iter().enumerate() {
236 if item.join_handle.is_finished() {
237 idxs.push(idx);
238 }
239 }
240 for idx in idxs.iter().rev() {
241 let el = unjoined.remove(*idx);
242 match el.join_handle.join() {
243 Ok(Ok(result)) => {
244 avg_difference += result.offset;
245 number_of_reads += 1;
246 measurements.push(Measurement {
247 server_info: el.server_info,
248 result,
249 });
250 }
251 Ok(Err(_)) => {
252 log::warn!("Unable to get time from server {}", el.server_info);
253 }
254 Err(_) => {
255 log::warn!("Unable to join thread");
256 }
257 }
258 }
259 if unjoined.is_empty() {
260 log::info!(
261 "All servers responded in time: {}ms",
262 current_time.elapsed().as_millis()
263 );
264 break;
265 }
266
267 if current_time.elapsed() > max_timeout {
268 let str_vec: Vec<ServerInfo> =
269 unjoined.into_iter().map(|x| x.server_info).collect();
270 log::debug!("Don't wait for other servers: {:?}", str_vec);
271 break;
272 }
273 std::thread::sleep(Duration::from_millis(5));
274 }
275 }
276
277 measurements.sort_by(|a, b| a.result.roundtrip.cmp(&b.result.roundtrip));
278
279 if number_of_reads > 0 {
280 log::info!("Total of servers responded: {}", number_of_reads);
281 avg_difference /= number_of_reads;
282
283 let mut harmonic_sum = 0.0;
284 let mut harmonic_norm = 0.0;
285 for measurement in measurements.iter() {
286 log::debug!(
287 "Server {}, Offset: {}ms, Roundtrip {}ms",
288 measurement.server_info,
289 measurement.result.offset as f64 / 1000.0,
290 measurement.result.roundtrip as f64 / 1000.0
291 );
292 harmonic_sum += measurement.result.offset as f64
293 / (measurement.result.roundtrip as f64).powf(2.0f64);
294 harmonic_norm += 1.0 / (measurement.result.roundtrip as f64).powf(2.0f64);
295 }
296 let harmonic_avg = harmonic_sum / harmonic_norm;
297 let harmonic_error = (1.0 / harmonic_norm).sqrt();
298
299 let additional_systematic_error = 200.0;
300 let roundtrip_to_error_multiplier = 5.0;
301
302 log::info!(
303 "Difference estimation: {:.02}ms ± {:.02}ms",
304 harmonic_avg / 1000.0,
305 (harmonic_error / roundtrip_to_error_multiplier + additional_systematic_error) / 1000.0
306 );
307
308 WorldTimer {
309 offset: avg_difference,
310 precision: Some(
311 (harmonic_error / roundtrip_to_error_multiplier + additional_systematic_error)
312 as i64,
313 ),
314 }
315 } else {
316 log::warn!("No time servers available");
317 WorldTimer {
318 offset: 0,
319 precision: None,
320 }
321 }
322}