timesimp/lib.rs
1//! Simple sans-io timesync client and server.
2//!
3//! Timesimp is based on the averaging method described in [Simpson (2002), A Stream-based Time
4//! Synchronization Technique For Networked Computer Games][paper], but with a corrected delta
5//! calculation. Compared to NTP, it's a simpler and less accurate time synchronisation algorithm
6//! that is usable over network streams, rather than datagrams. Simpson asserts they were able to
7//! achieve accuracies of 100ms or better, which is sufficient in many cases; my testing gets
8//! accuracies well below 5ms. The main limitation of the algorithm is that round-trip-time is
9//! assumed to be symmetric: if the forward trip time is different from the return trip time, then
10//! an error is induced equal to the value of the difference in trip times.
11//!
12//! This library provides a sans-io implementation: you bring in your async runtime, your transport,
13//! and your storage; timesimp gives you time offsets.
14//!
15//! If the local clock goes backward during a synchronisation, the invalid delta is discarded; this
16//! may cause the sync attempt to fail, especially if the `samples` count is lowered to its minimum
17//! of 3. This is a deliberate design decision: you should handle failure and retry, and the sync
18//! will proceed correctly when the clock is stable.
19//!
20//! [paper]: https://web.archive.org/web/20160310125700/http://mine-control.com/zack/timesync/timesync.html
21//!
22//! # Example
23//!
24//! ```no_run
25//! use std::{convert::Infallible, time::Duration};
26//! use reqwest::{Client, Url};
27//! use timesimp::{SignedDuration, Timesimp};
28//!
29//! struct ServerSimp;
30//! impl Timesimp for ServerSimp {
31//! type Err = Infallible;
32//!
33//! async fn load_offset(&self) -> Result<Option<SignedDuration>, Self::Err> {
34//! // server time is correct
35//! Ok(Some(SignedDuration::ZERO))
36//! }
37//!
38//! async fn store_offset(&mut self, _offset: SignedDuration) -> Result<(), Self::Err> {
39//! // as time is correct, no need to store offset
40//! unimplemented!()
41//! }
42//!
43//! async fn query_server(
44//! &self,
45//! _request: timesimp::Request,
46//! ) -> Result<timesimp::Response, Self::Err> {
47//! // server has no upstream timesimp
48//! unimplemented!()
49//! }
50//!
51//! async fn sleep(duration: std::time::Duration) {
52//! tokio::time::sleep(duration).await;
53//! }
54//! }
55//!
56//! // Not shown: serving ServerSimp from a URL
57//!
58//! struct ClientSimp {
59//! offset: Option<SignedDuration>,
60//! url: Url,
61//! }
62//!
63//! impl Timesimp for ClientSimp {
64//! type Err = reqwest::Error;
65//!
66//! async fn load_offset(&self) -> Result<Option<SignedDuration>, Self::Err> {
67//! Ok(self.offset)
68//! }
69//!
70//! async fn store_offset(&mut self, offset: SignedDuration) -> Result<(), Self::Err> {
71//! self.offset = Some(offset);
72//! Ok(())
73//! }
74//!
75//! async fn query_server(
76//! &self,
77//! request: timesimp::Request,
78//! ) -> Result<timesimp::Response, Self::Err> {
79//! let resp = Client::new()
80//! .post(self.url.clone())
81//! .body(request.to_bytes().to_vec())
82//! .send()
83//! .await?
84//! .error_for_status()?
85//! .bytes()
86//! .await?;
87//! Ok(timesimp::Response::try_from(&resp[..]).unwrap())
88//! }
89//!
90//! async fn sleep(duration: std::time::Duration) {
91//! tokio::time::sleep(duration).await;
92//! }
93//! }
94//!
95//! #[tokio::main]
96//! async fn main() {
97//! let mut client = ClientSimp {
98//! offset: None,
99//! url: "https://timesimp.server".try_into().unwrap(),
100//! };
101//!
102//! loop {
103//! if let Ok(Some(offset)) = client.attempt_sync(Default::default()).await {
104//! println!(
105//! "Received offset: {offset:?}; current time is {}",
106//! client.adjusted_timestamp().await.unwrap(),
107//! );
108//! }
109//! tokio::time::sleep(Duration::from_secs(300)).await;
110//! }
111//! }
112//! ```
113
114use std::time::Duration;
115
116pub use jiff::{SignedDuration, Timestamp};
117
118mod delta;
119use delta::*;
120
121mod messages;
122pub use messages::*;
123
124mod settings;
125pub use settings::*;
126
127/// A time sync client and/or server.
128///
129/// You must implement the four required functions and not override the others.
130///
131/// Then, use `answer_client()` to implement a time sync server, and/or use `attempt_sync()` to
132/// implement a time sync client.
133#[allow(async_fn_in_trait)]
134pub trait Timesimp {
135 /// Error for your required methods.
136 type Err: std::error::Error;
137
138 /// Load the current time offset.
139 ///
140 /// This must return the current stored time offset, or `None` if no time offset is currently
141 /// stored.
142 ///
143 /// As this expects a `SignedDuration`, you are free to load whatever precision you wish, so
144 /// long as `store_offset()` agrees. Microseconds should be enough for most purposes.
145 async fn load_offset(&self) -> Result<Option<SignedDuration>, Self::Err>;
146
147 /// Store the current time offset.
148 ///
149 /// This must store the given time offset, typically in some kind of database.
150 ///
151 /// As this is given a `SignedDuration`, you are free to store whatever precision you wish, so
152 /// long as `load_offset()` agrees. Microseconds should be enough for most purposes.
153 ///
154 /// Additionally, once `store_offset` has been called once, `load_offset` should return `Some`.
155 async fn store_offset(&mut self, offset: SignedDuration) -> Result<(), Self::Err>;
156
157 /// Query a timesimp server endpoint.
158 ///
159 /// This must query in some manner a timesimp server, by sending the given [`Request`] and
160 /// obtaining a [`Response`]. Both [`Request`] and [`Response`] can be parsed from and
161 /// serialized to bytes. The query implementation should do as little else as possible to
162 /// avoid adding unnecessary latency.
163 ///
164 /// If using a connecting protocol, such as TCP or QUIC, it's recommended to keep the
165 /// connection alive if practicable, with a timeout longer than the
166 /// [`Settings.jitter`](Settings) value. That should result in all but the first sample being
167 /// approximately a single round trip, eliminating the handshake delay.
168 async fn query_server(&self, request: Request) -> Result<Response, Self::Err>;
169
170 /// Sleep for a [`Duration`].
171 ///
172 /// This is usually something like `tokio::time::sleep` or equivalent.
173 async fn sleep(duration: Duration);
174
175 /// Obtain an adjusted timestamp.
176 ///
177 /// Do not override.
178 ///
179 /// This simply loads the offset and applies it to the current local timestamp.
180 ///
181 /// It is provided as convenience for simple use; you may want to implement your own.
182 async fn adjusted_timestamp(&self) -> Result<Timestamp, Self::Err> {
183 let offset = self.load_offset().await?.unwrap_or_default();
184 Ok(Timestamp::now() + offset)
185 }
186
187 /// The implementation of the server endpoint.
188 ///
189 /// Do not override.
190 ///
191 /// Use this in your server endpoint implementation. Both [`Request`] and [`Response`] can be
192 /// parsed from and serialized to bytes. The endpoint should do as little else as possible to
193 /// avoid adding unnecessary latency.
194 async fn answer_client(&self, request: Request) -> Result<Response, Self::Err> {
195 Ok(Response {
196 client: request.client,
197 server: self.adjusted_timestamp().await?,
198 })
199 }
200
201 /// The main client state driver. Call this in a loop.
202 ///
203 /// You're expected to sleep for a while after calling this, or to run it on a schedule. Take
204 /// care to compute your schedule on your raw system monotonic clock or equivalent, so it does
205 /// not get influenced by the offset, which could make it jump around or even spin.
206 ///
207 /// If `load_offset()` returns `Ok(None)`, this method will attempt to `store_offset()` the
208 /// first delta it gets from the server. This lets you get an "accurate enough" timestamp
209 /// pretty quickly, instead of waiting for a full round of samples. Errors from that store are
210 /// ignored silently.
211 ///
212 /// If this returns `Ok(None)`, not enough samples were obtained to have enough confidence in
213 /// the result, likely because the `server_query()` method encountered an error for most tries.
214 /// Errors from `server_query()` are not returned, but instead are logged using tracing.
215 ///
216 /// Do not override.
217 ///
218 /// # Example
219 ///
220 /// ```ignore
221 /// loop {
222 /// match simp.attempt_sync(Settings::default()).await {
223 /// Err(err) => eprintln!("{err}"),
224 /// Ok(None) => eprintln!("did not get enough samples to have confidence"),
225 /// Ok(Some(offset)) => {
226 /// println!("Obtained offset: {offset:?}");
227 /// println!("The adjusted time is {}", simp.adjusted_timestamp().unwrap());
228 /// }
229 /// }
230 /// sleep(Duration::from_secs(60));
231 /// }
232 /// ```
233 async fn attempt_sync(
234 &mut self,
235 settings: Settings,
236 ) -> Result<Option<SignedDuration>, Self::Err> {
237 let Settings { samples, jitter } = settings.clamp();
238 let current_offset = self.load_offset().await?.unwrap_or_default();
239 tracing::trace!(?samples, ?current_offset, "starting delta collection");
240
241 let mut gap = Duration::ZERO;
242 let mut responses: Vec<Delta> = Vec::with_capacity(samples.into());
243 for _ in 0..settings.samples {
244 tracing::trace!(delay=?gap, max_jitter=?jitter, "sleeping to spread out requests");
245 Self::sleep(gap).await;
246
247 // compute the next gap before we query, so if query_server errors we don't immediately reloop
248 gap = Duration::from_nanos(rand::random_range(
249 0..=u64::try_from(jitter.as_nanos()).unwrap(),
250 ));
251 // UNWRAP: jitter has been clamped to 0..=10 seconds, so nanos will never reach u64::MAX
252
253 let response = match self
254 .query_server(Request {
255 client: Timestamp::now(),
256 })
257 .await
258 {
259 Ok(response) => response,
260 Err(err) => {
261 tracing::error!(?err, "query_server failed");
262 continue;
263 }
264 };
265
266 let Some(packet) = Delta::new(response, Timestamp::now()) else {
267 tracing::error!("local clock went backwards! skipping this sampling");
268 continue;
269 };
270
271 tracing::trace!(latency=?packet.latency, delta=?packet.delta, "obtained raw offset from server");
272 responses.push(packet);
273
274 if self.load_offset().await?.is_none() {
275 tracing::debug!(offset=?packet.delta, "no offset stored, storing initial delta");
276 let _ = self.store_offset(packet.delta).await;
277 }
278 }
279
280 if !responses.is_empty() && responses.len() % 2 == 0 {
281 // if we have an even number of responses, we need to discard one
282 // the first response is most likely to be an outlier due to connection establishment
283 responses.remove(0);
284 }
285
286 if responses.len() < 3 {
287 tracing::debug!(
288 count = responses.len(),
289 "not enough responses for confidence"
290 );
291 return Ok(None);
292 }
293
294 responses.sort_by_key(|r| r.latency);
295 let deltas = responses
296 .iter()
297 .map(|r| r.delta.as_millis_f64())
298 .collect::<Vec<_>>();
299 tracing::trace!(?deltas, "response deltas sorted by latency");
300
301 let median_idx = deltas.len() / 2;
302 let median = deltas[median_idx];
303
304 let mean: f64 = deltas.iter().copied().sum::<f64>() / deltas.len() as f64;
305 let variance: f64 = deltas
306 .iter()
307 .copied()
308 .map(|d| (d - mean).powi(2))
309 .sum::<f64>()
310 / ((deltas.len() - 1) as f64);
311 let stddev: f64 = variance.sqrt();
312 tracing::trace!(
313 ?median,
314 ?mean,
315 ?variance,
316 ?stddev,
317 "statistics about response deltas"
318 );
319
320 let inliers = deltas
321 .iter()
322 .copied()
323 .filter(|d| *d >= median - stddev && *d <= median + stddev)
324 .collect::<Vec<_>>();
325 tracing::trace!(?inliers, "eliminated outliers");
326
327 let offset = SignedDuration::from_micros(
328 ((inliers.iter().sum::<f64>() / (inliers.len() as f64)) * 1000.0) as i64,
329 );
330
331 tracing::debug!(?offset, "storing calculated offset");
332 self.store_offset(offset).await?;
333 Ok(Some(offset))
334 }
335}