timesimp/
lib.rs

1//! Simple sans-io timesync client and server.
2//!
3//! Timesimp is based on the averaging method described in [Simpson (2002), A Stream-based Time
4//! Synchronization Technique For Networked Computer Games][paper], but with a corrected delta
5//! calculation. Compared to NTP, it's a simpler and less accurate time synchronisation algorithm
6//! that is usable over network streams, rather than datagrams. Simpson asserts they were able to
7//! achieve accuracies of 100ms or better, which is sufficient in many cases; my testing gets
//! accuracies well below 5ms. The main limitation of the algorithm is that the round trip is
//! assumed to be symmetric: if the forward trip time is different from the return trip time, then
//! an error is induced equal to half the difference between the two trip times.
11//!
12//! This library provides a sans-io implementation: you bring in your async runtime, your transport,
13//! and your storage; timesimp gives you time offsets.
14//!
15//! If the local clock goes backward during a synchronisation, the invalid delta is discarded; this
16//! may cause the sync attempt to fail, especially if the `samples` count is lowered to its minimum
17//! of 3. This is a deliberate design decision: you should handle failure and retry, and the sync
18//! will proceed correctly when the clock is stable.
19//!
20//! [paper]: https://web.archive.org/web/20160310125700/http://mine-control.com/zack/timesync/timesync.html
21//!
22//! # Example
23//!
24//! ```no_run
25//! use std::{convert::Infallible, time::Duration};
26//! use reqwest::{Client, Url};
27//! use timesimp::{SignedDuration, Timesimp};
28//!
29//! struct ServerSimp;
30//! impl Timesimp for ServerSimp {
31//!     type Err = Infallible;
32//!
33//!     async fn load_offset(&self) -> Result<Option<SignedDuration>, Self::Err> {
34//!         // server time is correct
35//!         Ok(Some(SignedDuration::ZERO))
36//!     }
37//!
38//!     async fn store_offset(&mut self, _offset: SignedDuration) -> Result<(), Self::Err> {
39//!         // as time is correct, no need to store offset
40//!         unimplemented!()
41//!     }
42//!
43//!     async fn query_server(
44//!         &self,
45//!         _request: timesimp::Request,
46//!     ) -> Result<timesimp::Response, Self::Err> {
47//!         // server has no upstream timesimp
48//!         unimplemented!()
49//!     }
50//!
51//!     async fn sleep(duration: std::time::Duration) {
52//!         tokio::time::sleep(duration).await;
53//!     }
54//! }
55//!
56//! // Not shown: serving ServerSimp from a URL
57//!
58//! struct ClientSimp {
59//!     offset: Option<SignedDuration>,
60//!     url: Url,
61//! }
62//!
63//! impl Timesimp for ClientSimp {
64//!     type Err = reqwest::Error;
65//!
66//!     async fn load_offset(&self) -> Result<Option<SignedDuration>, Self::Err> {
67//!         Ok(self.offset)
68//!     }
69//!
70//!     async fn store_offset(&mut self, offset: SignedDuration) -> Result<(), Self::Err> {
71//!         self.offset = Some(offset);
72//!         Ok(())
73//!     }
74//!
75//!     async fn query_server(
76//!         &self,
77//!         request: timesimp::Request,
78//!     ) -> Result<timesimp::Response, Self::Err> {
79//!         let resp = Client::new()
80//!             .post(self.url.clone())
81//!             .body(request.to_bytes().to_vec())
82//!             .send()
83//!             .await?
84//!             .error_for_status()?
85//!             .bytes()
86//!             .await?;
87//!         Ok(timesimp::Response::try_from(&resp[..]).unwrap())
88//!     }
89//!
90//!     async fn sleep(duration: std::time::Duration) {
91//!         tokio::time::sleep(duration).await;
92//!     }
93//! }
94//!
95//! #[tokio::main]
96//! async fn main() {
97//!     let mut client = ClientSimp {
98//!         offset: None,
99//!         url: "https://timesimp.server".try_into().unwrap(),
100//!     };
101//!
102//!     loop {
103//!         if let Ok(Some(offset)) = client.attempt_sync(Default::default()).await {
104//!             println!(
105//!                 "Received offset: {offset:?}; current time is {}",
106//!                 client.adjusted_timestamp().await.unwrap(),
107//!             );
108//!         }
109//!         tokio::time::sleep(Duration::from_secs(300)).await;
110//!     }
111//! }
112//! ```
113
114use std::time::Duration;
115
116pub use jiff::{SignedDuration, Timestamp};
117
118mod delta;
119use delta::*;
120
121mod messages;
122pub use messages::*;
123
124mod settings;
125pub use settings::*;
126
127/// A time sync client and/or server.
128///
129/// You must implement the four required functions and not override the others.
130///
131/// Then, use `answer_client()` to implement a time sync server, and/or use `attempt_sync()` to
132/// implement a time sync client.
133#[allow(async_fn_in_trait)]
134pub trait Timesimp {
135    /// Error for your required methods.
136    type Err: std::error::Error;
137
138    /// Load the current time offset.
139    ///
140    /// This must return the current stored time offset, or `None` if no time offset is currently
141    /// stored.
142    ///
143    /// As this expects a `SignedDuration`, you are free to load whatever precision you wish, so
144    /// long as `store_offset()` agrees. Microseconds should be enough for most purposes.
145    async fn load_offset(&self) -> Result<Option<SignedDuration>, Self::Err>;
146
147    /// Store the current time offset.
148    ///
149    /// This must store the given time offset, typically in some kind of database.
150    ///
151    /// As this is given a `SignedDuration`, you are free to store whatever precision you wish, so
152    /// long as `load_offset()` agrees. Microseconds should be enough for most purposes.
153    ///
154    /// Additionally, once `store_offset` has been called once, `load_offset` should return `Some`.
155    async fn store_offset(&mut self, offset: SignedDuration) -> Result<(), Self::Err>;
156
157    /// Query a timesimp server endpoint.
158    ///
159    /// This must query in some manner a timesimp server, by sending the given [`Request`] and
160    /// obtaining a [`Response`]. Both [`Request`] and [`Response`] can be parsed from and
161    /// serialized to bytes. The query implementation should do as little else as possible to
162    /// avoid adding unnecessary latency.
163    ///
164    /// If using a connecting protocol, such as TCP or QUIC, it's recommended to keep the
165    /// connection alive if practicable, with a timeout longer than the
166    /// [`Settings.jitter`](Settings) value. That should result in all but the first sample being
167    /// approximately a single round trip, eliminating the handshake delay.
168    async fn query_server(&self, request: Request) -> Result<Response, Self::Err>;
169
170    /// Sleep for a [`Duration`].
171    ///
172    /// This is usually something like `tokio::time::sleep` or equivalent.
173    async fn sleep(duration: Duration);
174
175    /// Obtain an adjusted timestamp.
176    ///
177    /// Do not override.
178    ///
179    /// This simply loads the offset and applies it to the current local timestamp.
180    ///
181    /// It is provided as convenience for simple use; you may want to implement your own.
182    async fn adjusted_timestamp(&self) -> Result<Timestamp, Self::Err> {
183        let offset = self.load_offset().await?.unwrap_or_default();
184        Ok(Timestamp::now() + offset)
185    }
186
187    /// The implementation of the server endpoint.
188    ///
189    /// Do not override.
190    ///
191    /// Use this in your server endpoint implementation. Both [`Request`] and [`Response`] can be
192    /// parsed from and serialized to bytes. The endpoint should do as little else as possible to
193    /// avoid adding unnecessary latency.
194    async fn answer_client(&self, request: Request) -> Result<Response, Self::Err> {
195        Ok(Response {
196            client: request.client,
197            server: self.adjusted_timestamp().await?,
198        })
199    }
200
201    /// The main client state driver. Call this in a loop.
202    ///
203    /// You're expected to sleep for a while after calling this, or to run it on a schedule. Take
204    /// care to compute your schedule on your raw system monotonic clock or equivalent, so it does
205    /// not get influenced by the offset, which could make it jump around or even spin.
206    ///
207    /// If `load_offset()` returns `Ok(None)`, this method will attempt to `store_offset()` the
208    /// first delta it gets from the server. This lets you get an "accurate enough" timestamp
209    /// pretty quickly, instead of waiting for a full round of samples. Errors from that store are
210    /// ignored silently.
211    ///
212    /// If this returns `Ok(None)`, not enough samples were obtained to have enough confidence in
213    /// the result, likely because the `server_query()` method encountered an error for most tries.
214    /// Errors from `server_query()` are not returned, but instead are logged using tracing.
215    ///
216    /// Do not override.
217    ///
218    /// # Example
219    ///
220    /// ```ignore
221    /// loop {
222    ///     match simp.attempt_sync(Settings::default()).await {
223    ///         Err(err) => eprintln!("{err}"),
224    ///         Ok(None) => eprintln!("did not get enough samples to have confidence"),
225    ///         Ok(Some(offset)) => {
226    ///             println!("Obtained offset: {offset:?}");
227    ///             println!("The adjusted time is {}", simp.adjusted_timestamp().unwrap());
228    ///         }
229    ///     }
230    ///     sleep(Duration::from_secs(60));
231    /// }
232    /// ```
233    async fn attempt_sync(
234        &mut self,
235        settings: Settings,
236    ) -> Result<Option<SignedDuration>, Self::Err> {
237        let Settings { samples, jitter } = settings.clamp();
238        let current_offset = self.load_offset().await?.unwrap_or_default();
239        tracing::trace!(?samples, ?current_offset, "starting delta collection");
240
241        let mut gap = Duration::ZERO;
242        let mut responses: Vec<Delta> = Vec::with_capacity(samples.into());
243        for _ in 0..settings.samples {
244            tracing::trace!(delay=?gap, max_jitter=?jitter, "sleeping to spread out requests");
245            Self::sleep(gap).await;
246
247            // compute the next gap before we query, so if query_server errors we don't immediately reloop
248            gap = Duration::from_nanos(rand::random_range(
249                0..=u64::try_from(jitter.as_nanos()).unwrap(),
250            ));
251            // UNWRAP: jitter has been clamped to 0..=10 seconds, so nanos will never reach u64::MAX
252
253            let response = match self
254                .query_server(Request {
255                    client: Timestamp::now(),
256                })
257                .await
258            {
259                Ok(response) => response,
260                Err(err) => {
261                    tracing::error!(?err, "query_server failed");
262                    continue;
263                }
264            };
265
266            let Some(packet) = Delta::new(response, Timestamp::now()) else {
267                tracing::error!("local clock went backwards! skipping this sampling");
268                continue;
269            };
270
271            tracing::trace!(latency=?packet.latency, delta=?packet.delta, "obtained raw offset from server");
272            responses.push(packet);
273
274            if self.load_offset().await?.is_none() {
275                tracing::debug!(offset=?packet.delta, "no offset stored, storing initial delta");
276                let _ = self.store_offset(packet.delta).await;
277            }
278        }
279
280        if !responses.is_empty() && responses.len() % 2 == 0 {
281            // if we have an even number of responses, we need to discard one
282            // the first response is most likely to be an outlier due to connection establishment
283            responses.remove(0);
284        }
285
286        if responses.len() < 3 {
287            tracing::debug!(
288                count = responses.len(),
289                "not enough responses for confidence"
290            );
291            return Ok(None);
292        }
293
294        responses.sort_by_key(|r| r.latency);
295        let deltas = responses
296            .iter()
297            .map(|r| r.delta.as_millis_f64())
298            .collect::<Vec<_>>();
299        tracing::trace!(?deltas, "response deltas sorted by latency");
300
301        let median_idx = deltas.len() / 2;
302        let median = deltas[median_idx];
303
304        let mean: f64 = deltas.iter().copied().sum::<f64>() / deltas.len() as f64;
305        let variance: f64 = deltas
306            .iter()
307            .copied()
308            .map(|d| (d - mean).powi(2))
309            .sum::<f64>()
310            / ((deltas.len() - 1) as f64);
311        let stddev: f64 = variance.sqrt();
312        tracing::trace!(
313            ?median,
314            ?mean,
315            ?variance,
316            ?stddev,
317            "statistics about response deltas"
318        );
319
320        let inliers = deltas
321            .iter()
322            .copied()
323            .filter(|d| *d >= median - stddev && *d <= median + stddev)
324            .collect::<Vec<_>>();
325        tracing::trace!(?inliers, "eliminated outliers");
326
327        let offset = SignedDuration::from_micros(
328            ((inliers.iter().sum::<f64>() / (inliers.len() as f64)) * 1000.0) as i64,
329        );
330
331        tracing::debug!(?offset, "storing calculated offset");
332        self.store_offset(offset).await?;
333        Ok(Some(offset))
334    }
335}