use std::time::Duration;
pub use jiff::{SignedDuration, Timestamp};
mod delta;
use delta::*;
mod messages;
pub use messages::*;
mod settings;
pub use settings::*;
#[allow(async_fn_in_trait)]
pub trait Timesimp {
type Err: std::error::Error;
async fn load_offset(&self) -> Result<Option<SignedDuration>, Self::Err>;
async fn store_offset(&mut self, offset: SignedDuration) -> Result<(), Self::Err>;
async fn query_server(&self, request: Request) -> Result<Response, Self::Err>;
async fn sleep(duration: Duration);
async fn adjusted_timestamp(&self) -> Result<Timestamp, Self::Err> {
let offset = self.load_offset().await?.unwrap_or_default();
Ok(Timestamp::now() + offset)
}
async fn answer_client(&self, request: Request) -> Result<Response, Self::Err> {
Ok(Response {
client: request.client,
server: self.adjusted_timestamp().await?,
})
}
async fn attempt_sync(
&mut self,
settings: Settings,
) -> Result<Option<SignedDuration>, Self::Err> {
let Settings { samples, jitter } = settings.clamp();
let current_offset = self.load_offset().await?.unwrap_or_default();
tracing::trace!(?samples, ?current_offset, "starting delta collection");
let mut gap = Duration::ZERO;
let mut responses: Vec<Delta> = Vec::with_capacity(samples.into());
for _ in 0..settings.samples {
tracing::trace!(delay=?gap, max_jitter=?jitter, "sleeping to spread out requests");
Self::sleep(gap).await;
gap = Duration::from_nanos(rand::random_range(
0..=u64::try_from(jitter.as_nanos()).unwrap(),
));
let response = match self
.query_server(Request {
client: Timestamp::now(),
})
.await
{
Ok(response) => response,
Err(err) => {
tracing::error!(?err, "query_server failed");
continue;
}
};
let Some(packet) = Delta::new(response, Timestamp::now()) else {
tracing::error!("local clock went backwards! skipping this sampling");
continue;
};
tracing::trace!(latency=?packet.latency, delta=?packet.delta, "obtained raw offset from server");
responses.push(packet);
if self.load_offset().await?.is_none() {
tracing::debug!(offset=?packet.delta, "no offset stored, storing initial delta");
let _ = self.store_offset(packet.delta).await;
}
}
if !responses.is_empty() && responses.len() % 2 == 0 {
responses.remove(0);
}
if responses.len() < 3 {
tracing::debug!(
count = responses.len(),
"not enough responses for confidence"
);
return Ok(None);
}
responses.sort_by_key(|r| r.latency);
let deltas = responses
.iter()
.map(|r| r.delta.as_millis_f64())
.collect::<Vec<_>>();
tracing::trace!(?deltas, "response deltas sorted by latency");
let median_idx = deltas.len() / 2;
let median = deltas[median_idx];
let mean: f64 = deltas.iter().copied().sum::<f64>() / deltas.len() as f64;
let variance: f64 = deltas
.iter()
.copied()
.map(|d| (d - mean).powi(2))
.sum::<f64>()
/ ((deltas.len() - 1) as f64);
let stddev: f64 = variance.sqrt();
tracing::trace!(
?median,
?mean,
?variance,
?stddev,
"statistics about response deltas"
);
let inliers = deltas
.iter()
.copied()
.filter(|d| *d >= median - stddev && *d <= median + stddev)
.collect::<Vec<_>>();
tracing::trace!(?inliers, "eliminated outliers");
let offset = SignedDuration::from_micros(
((inliers.iter().sum::<f64>() / (inliers.len() as f64)) * 1000.0) as i64,
);
tracing::debug!(?offset, "storing calculated offset");
self.store_offset(offset).await?;
Ok(Some(offset))
}
}