use crate::ping::estimator::RttEstimatorEwma;
use crate::ping::message::{Ping, Pong};
use crate::ping::store::{PingId, PingStore};
use alloc::{vec, vec::Vec};
use bevy_ecs::component::Component;
use bevy_reflect::Reflect;
use bevy_time::{Real, Stopwatch, Time};
use core::time::Duration;
use lightyear_core::time::Instant;
use lightyear_core::time::TickDelta;
use lightyear_messages::prelude::{MessageReceiver, MessageSender};
use tracing::{error, trace};
/// Configuration for the ping manager.
#[derive(Clone, Copy, Debug, Reflect)]
pub struct PingConfig {
/// Minimum time that must have elapsed on the ping timer before
/// `PingManager::maybe_prepare_ping` produces a new ping.
pub ping_interval: Duration,
}
impl Default for PingConfig {
fn default() -> Self {
PingConfig {
ping_interval: Duration::from_millis(100),
}
}
}
/// Component that prepares pings, buffers pongs to send back, and feeds
/// round-trip measurements from received pongs into an RTT estimator.
///
/// The `#[require]` attribute lists the sender/receiver components for
/// [`Ping`] and [`Pong`] messages that accompany this component.
#[derive(Debug, Component)]
#[require(MessageSender<Ping>, MessageReceiver<Ping>, MessageSender<Pong>, MessageReceiver<Pong>)]
pub struct PingManager {
/// Ping settings; `ping_interval` is read when deciding whether to send a ping.
config: PingConfig,
/// Stopwatch advanced in `update` and reset every time a ping is prepared.
ping_timer: Stopwatch,
/// Stores the send time of each prepared ping, keyed by its id; an entry is
/// removed when the matching pong is processed.
ping_store: PingStore,
/// Id carried by the latest pong that was used to update the RTT estimate.
/// Starts at `PingId(u16::MAX - 1)`.
most_recent_received_ping: PingId,
/// Pongs buffered by `buffer_pending_pong`, each paired with the instant
/// supplied when the corresponding ping was buffered.
pongs_to_send: Vec<(Pong, Instant)>,
/// Estimator that receives every accepted round-trip sample; `rtt()` and
/// `jitter()` read its `final_stats`.
pub rtt_estimator_ewma: RttEstimatorEwma,
/// Number of pings prepared since creation or the last `reset`.
pub(crate) pings_sent: u32,
/// Number of pongs passed to `process_pong` since creation or the last
/// `reset`, including pongs that did not match a stored ping.
pub pongs_recv: u32,
}
impl Default for PingManager {
    /// Returns a manager built from the default [`PingConfig`].
    ///
    /// Delegates to [`PingManager::new`] so that the initial field values are
    /// defined in a single place instead of being duplicated here.
    fn default() -> Self {
        Self::new(PingConfig::default())
    }
}
impl PingManager {
    /// Restores every piece of runtime state to its initial value.
    ///
    /// The configuration is deliberately left untouched.
    pub(crate) fn reset(&mut self) {
        // Exhaustive destructuring: adding a field to `PingManager` makes this
        // function fail to compile until the new field is handled here.
        let Self {
            config: _,
            ping_timer,
            ping_store,
            most_recent_received_ping,
            pongs_to_send,
            rtt_estimator_ewma,
            pings_sent,
            pongs_recv,
        } = self;

        ping_timer.reset();
        ping_store.reset();
        *most_recent_received_ping = PingId(u16::MAX - 1);
        pongs_to_send.clear();
        rtt_estimator_ewma.reset();
        *pings_sent = 0;
        *pongs_recv = 0;
    }
}
impl PingManager {
pub fn new(config: PingConfig) -> Self {
Self {
config,
ping_timer: Stopwatch::new(),
ping_store: PingStore::new(),
most_recent_received_ping: PingId(u16::MAX - 1),
pongs_to_send: vec![],
rtt_estimator_ewma: RttEstimatorEwma::default(),
pings_sent: 0,
pongs_recv: 0,
}
}
pub fn rtt(&self) -> Duration {
self.rtt_estimator_ewma.final_stats.rtt
}
pub fn jitter(&self) -> Duration {
self.rtt_estimator_ewma.final_stats.jitter
}
pub(crate) fn update(&mut self, time: &Time<Real>) {
self.ping_timer.tick(time.delta());
}
pub(crate) fn maybe_prepare_ping(&mut self, now: Instant) -> Option<Ping> {
if self.ping_timer.elapsed() >= self.config.ping_interval {
self.ping_timer.reset();
let ping_id = self.ping_store.push_new(now);
self.pings_sent += 1;
return Some(Ping { id: ping_id });
}
None
}
pub(crate) fn process_pong(&mut self, pong: &Pong, now: Instant, tick_duration: Duration) {
self.pongs_recv += 1;
let received_time = now;
let Some(ping_sent_time) = self.ping_store.remove(pong.ping_id) else {
error!("Received a ping that is not present in the ping-store anymore");
return;
};
if pong.ping_id > self.most_recent_received_ping {
self.most_recent_received_ping = pong.ping_id;
let rtt = received_time.saturating_duration_since(ping_sent_time);
let server_process_time = TickDelta::from(pong.frame_time).to_duration(tick_duration);
trace!(?rtt, ?received_time, ?ping_sent_time, ?pong.frame_time, "process received pong");
let round_trip_delay = rtt.saturating_sub(server_process_time);
self.rtt_estimator_ewma
.update_with_new_sample(round_trip_delay);
}
}
pub(crate) fn buffer_pending_pong(&mut self, ping: &Ping, now: Instant) {
self.pongs_to_send.push((
Pong {
ping_id: ping.id,
frame_time: Default::default(),
},
now,
))
}
pub(crate) fn take_pending_pongs(&mut self) -> Vec<(Pong, Instant)> {
core::mem::take(&mut self.pongs_to_send)
}
}
// Unit-test module, compiled only for test builds; it currently contains no tests.
#[cfg(test)]
mod tests {
}