lightyear_sync 0.27.0

IO primitives for the lightyear networking library
Documentation
//! Manages sending/receiving pings and computing network statistics
use crate::ping::estimator::RttEstimatorEwma;
use crate::ping::message::{Ping, Pong};
use crate::ping::store::{PingId, PingStore};
use alloc::{vec, vec::Vec};
use bevy_ecs::component::Component;
use bevy_reflect::Reflect;
use bevy_time::{Real, Stopwatch, Time};
use core::time::Duration;
use lightyear_core::time::Instant;
use lightyear_core::time::TickDelta;
use lightyear_messages::prelude::{MessageReceiver, MessageSender};
use tracing::{error, trace};

/// Config for the ping manager, which sends regular pings to the remote machine in order
/// to compute network statistics (RTT, jitter).
///
/// The [`Default`] implementation uses a `ping_interval` of 100 milliseconds.
#[derive(Clone, Copy, Debug, Reflect)]
pub struct PingConfig {
    /// Minimum amount of time that must have elapsed on the ping timer before a new ping
    /// message is prepared for the remote host. Pings are what allow us to estimate the RTT.
    pub ping_interval: Duration,
}

impl Default for PingConfig {
    fn default() -> Self {
        PingConfig {
            ping_interval: Duration::from_millis(100),
        }
    }
}

/// The [`PingManager`] is responsible for sending regular pings to the remote machine,
/// and for monitoring pongs in order to estimate statistics (rtt, jitter) about the connection.
///
/// RTT samples can also be fed from packet acknowledgements
/// (see `process_packet_rtt_sample`), independently of pongs.
#[derive(Debug, Component)]
#[require(MessageSender<Ping>, MessageReceiver<Ping>, MessageSender<Pong>, MessageReceiver<Pong>)]
pub struct PingManager {
    /// Configuration (ping interval) used by this manager
    config: PingConfig,
    /// Timer to send regular pings to the remote
    ping_timer: Stopwatch,
    /// ping store to track which pings we sent (and when), so that a pong can be matched
    /// with the time at which its ping was sent
    ping_store: PingStore,
    /// Most recent ping id for which a pong has been used to update the statistics.
    /// Pongs answering an older ping do not produce a new RTT sample.
    most_recent_received_ping: Option<PingId>,
    /// We received pings; we keep track of the pongs that we will have to send back when we can
    /// (when the connection's send_timer is ready).
    /// Each pong is stored with the `Instant` that was provided when it was buffered.
    pongs_to_send: Vec<(Pong, Instant)>,

    /// Estimator used to compute RTT/Jitter from pongs and packet acknowledgements.
    pub rtt_estimator_ewma: RttEstimatorEwma,
    /// The number of pings we have sent
    pub(crate) pings_sent: u32,
    /// The number of pongs we have received. Every processed pong is counted, including
    /// pongs whose ping is not present in the ping store anymore.
    pub pongs_recv: u32,
    /// The number of latency samples received from pongs or packet acknowledgements.
    latency_samples_recv: u32,
}

impl Default for PingManager {
    /// Build a [`PingManager`] driven by the default [`PingConfig`].
    fn default() -> Self {
        // Delegate to `new` so that the initial state is defined in a single place.
        Self::new(PingConfig::default())
    }
}

impl PingManager {
    /// Bring the manager back to its initial state while keeping its configuration.
    ///
    /// The ping timer, the ping store and the RTT estimator are reset, the pending pongs are
    /// dropped, the most recent ping id is forgotten and every counter goes back to zero.
    pub(crate) fn reset(&mut self) {
        // Exhaustive destructuring: adding a field to `PingManager` without deciding how it
        // should be reset becomes a compile error.
        let Self {
            config: _,
            ping_timer,
            ping_store,
            most_recent_received_ping,
            pongs_to_send,
            rtt_estimator_ewma,
            pings_sent,
            pongs_recv,
            latency_samples_recv,
        } = self;

        // ping/pong bookkeeping
        ping_timer.reset();
        ping_store.reset();
        *most_recent_received_ping = None;
        pongs_to_send.clear();

        // statistics
        rtt_estimator_ewma.reset();
        *pings_sent = 0;
        *pongs_recv = 0;
        *latency_samples_recv = 0;
    }
}

impl PingManager {
    pub fn new(config: PingConfig) -> Self {
        Self {
            config,
            // pings
            ping_timer: Stopwatch::new(),
            ping_store: PingStore::new(),
            most_recent_received_ping: None,
            pongs_to_send: vec![],
            // sync
            rtt_estimator_ewma: RttEstimatorEwma::default(),
            pings_sent: 0,
            pongs_recv: 0,
            latency_samples_recv: 0,
        }
    }

    /// Return the latest estimate of rtt
    pub fn rtt(&self) -> Duration {
        self.rtt_estimator_ewma.final_stats.rtt
    }

    /// Return the latest estimate of jitter
    pub fn jitter(&self) -> Duration {
        self.rtt_estimator_ewma.final_stats.jitter
    }

    /// Return the number of RTT samples used to estimate latency.
    pub fn latency_samples_recv(&self) -> u32 {
        self.latency_samples_recv
    }

    /// Update the ping manager after a delta update
    pub(crate) fn update(&mut self, time: &Time<Real>) {
        self.ping_timer.tick(time.delta());
    }

    fn record_rtt_sample(&mut self, rtt_sample: Duration) {
        self.latency_samples_recv += 1;
        self.rtt_estimator_ewma.update_with_new_sample(rtt_sample);
    }

    /// Process an RTT sample measured from an ordinary packet acknowledgement.
    pub fn process_packet_rtt_sample(&mut self, rtt_sample: Duration) {
        self.record_rtt_sample(rtt_sample);
        trace!(
            target: "lightyear_debug::sync",
            kind = "packet_ack_rtt_sample",
            schedule = "PreUpdate",
            sample_point = "PreUpdate",
            rtt_sample_ms = rtt_sample.as_secs_f64() * 1000.0,
            latency_samples_recv = self.latency_samples_recv,
            estimated_rtt_ms = self.rtt().as_secs_f64() * 1000.0,
            jitter_ms = self.jitter().as_secs_f64() * 1000.0,
            "processed packet ack RTT sample"
        );
    }

    /// Check if we are ready to send a ping to the remote
    pub(crate) fn maybe_prepare_ping(&mut self, now: Instant) -> Option<Ping> {
        // TODO: should we have something to start sending a sync ping right away? (so we don't wait for initial timer)
        if self.ping_timer.elapsed() >= self.config.ping_interval {
            self.ping_timer.reset();

            let ping_id = self.ping_store.push_new(now);
            self.pings_sent += 1;
            trace!(
                target: "lightyear_debug::sync",
                kind = "ping_send",
                schedule = "PostUpdate",
                sample_point = "PostUpdate",
                ping_id = ping_id.0,
                pings_sent = self.pings_sent,
                "prepared ping"
            );
            return Some(Ping { id: ping_id });
        }
        None
    }

    /// Received a pong: update
    /// Returns true if we have enough pongs to finalize the handshake
    pub(crate) fn process_pong(&mut self, pong: &Pong, now: Instant, tick_duration: Duration) {
        self.pongs_recv += 1;
        let received_time = now;

        let Some(ping_sent_time) = self.ping_store.remove(pong.ping_id) else {
            error!("Received a ping that is not present in the ping-store anymore");
            return;
        };

        // only update values for the most recent pongs received
        if self
            .most_recent_received_ping
            .is_none_or(|latest| pong.ping_id > latest)
        {
            // compute round-trip delay via NTP algorithm: https://en.wikipedia.org/wiki/Network_Time_Protocol
            self.most_recent_received_ping = Some(pong.ping_id);

            // round-trip-delay
            let rtt = received_time.saturating_duration_since(ping_sent_time);
            let server_process_time = TickDelta::from(pong.frame_time).to_duration(tick_duration);
            trace!(?rtt, ?received_time, ?ping_sent_time, ?pong.frame_time,  "process received pong");
            let round_trip_delay = rtt.saturating_sub(server_process_time);

            // recompute stats whenever we get a new pong
            self.record_rtt_sample(round_trip_delay);
            trace!(
                target: "lightyear_debug::sync",
                kind = "pong_recv",
                schedule = "PreUpdate",
                sample_point = "PreUpdate",
                ping_id = pong.ping_id.0,
                pongs_recv = self.pongs_recv,
                latency_samples_recv = self.latency_samples_recv,
                rtt_ms = rtt.as_secs_f64() * 1000.0,
                server_process_ms = server_process_time.as_secs_f64() * 1000.0,
                round_trip_delay_ms = round_trip_delay.as_secs_f64() * 1000.0,
                estimated_rtt_ms = self.rtt().as_secs_f64() * 1000.0,
                jitter_ms = self.jitter().as_secs_f64() * 1000.0,
                "processed pong"
            );
        }
    }

    /// When we receive a Ping, we prepare a Pong in response.
    /// However we cannot send it immediately because we send packets at a regular interval
    /// Keep track of the pongs we need to send
    pub(crate) fn buffer_pending_pong(&mut self, ping: &Ping, now: Instant) {
        trace!(
            target: "lightyear_debug::sync",
            kind = "ping_recv",
            schedule = "PreUpdate",
            sample_point = "PreUpdate",
            ping_id = ping.id.0,
            pending_pongs = self.pongs_to_send.len() + 1,
            "buffered pong response"
        );
        self.pongs_to_send.push((
            Pong {
                ping_id: ping.id,
                frame_time: Default::default(),
            },
            now,
        ))
    }

    pub(crate) fn take_pending_pongs(&mut self) -> Vec<(Pong, Instant)> {
        core::mem::take(&mut self.pongs_to_send)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A packet-ack RTT sample must update the latency statistics even though no pong
    /// has been received.
    #[test]
    fn packet_ack_rtt_samples_update_latency_stats_without_pongs() {
        let sample = Duration::from_millis(50);
        let mut manager = PingManager::default();

        manager.process_packet_rtt_sample(sample);

        // no pong involved, but exactly one latency sample was recorded
        assert_eq!(manager.pongs_recv, 0);
        assert_eq!(manager.latency_samples_recv(), 1);
        assert_eq!(manager.rtt(), sample);
        // 12.5 ms: the jitter reported by the estimator after this single 50 ms sample
        assert_eq!(manager.jitter(), Duration::from_micros(12_500));
    }

    // NOTE: the two tests below are disabled. They were written against a different
    // `PingConfig` / `PingManager` API than the one defined in this file (e.g. a
    // `stats_buffer_duration` field, a two-argument `PingManager::new`, `should_send_ping`)
    // and must be ported before they can be re-enabled.

    // #[test]
    // fn test_send_pings() {
    //     let config = PingConfig {
    //         ping_interval: Duration::from_millis(100),
    //         stats_buffer_duration: Duration::from_secs(4),
    //     };
    //     let mut ping_manager = PingManager::new(config, Duration::default());
    //     let mut real = Time::<Real>::default();
    //     real.update();
    //
    //     assert_eq!(ping_manager.maybe_prepare_ping(real.last_update().unwrap()), None);
    //
    //     let delta = Duration::from_millis(100);
    //     real.update_with_duration(delta);
    //     ping_manager.update(&real);
    //
    //     // send pings
    //     assert_eq!(
    //         ping_manager.maybe_prepare_ping(real.last_update().unwrap()),
    //         Some(Ping { id: PingId(0) })
    //     );
    //     let delta = Duration::from_millis(60);
    //     real.update_with_duration(delta);
    //     ping_manager.update(&real);
    //
    //     // ping timer hasn't gone off yet, send nothing
    //     assert_eq!(ping_manager.maybe_prepare_ping(real.last_update().unwrap()), None);
    //     real.update_with_duration(delta);
    //     ping_manager.update(&real);
    //     assert_eq!(
    //         ping_manager.maybe_prepare_ping(real.last_update().unwrap()),
    //         Some(Ping { id: PingId(1) })
    //     );
    //
    //     let delta = Duration::from_millis(100);
    //     real.update_with_duration(delta);
    //     ping_manager.update(&real);
    //     assert_eq!(
    //         ping_manager.maybe_prepare_ping(real.last_update().unwrap()),
    //         Some(Ping { id: PingId(2) })
    //     );
    //
    //     // we sent all the pings we need
    //     assert_eq!(ping_manager.maybe_prepare_ping(real.last_update().unwrap()), None);
    //
    //     // check ping store
    //     assert_eq!(
    //         ping_manager.ping_store.remove(PingId(0)),
    //         Some(Duration::from_millis(100))
    //     );
    //     assert_eq!(
    //         ping_manager.ping_store.remove(PingId(1)),
    //         Some(Duration::from_millis(220))
    //     );
    //     assert_eq!(
    //         ping_manager.ping_store.remove(PingId(2)),
    //         Some(Duration::from_millis(320))
    //     );
    //
    //     // receive pongs
    //     // TODO
    // }

    // #[test]
    // fn test_ping_manager() {
    //     let ping_config = PingConfig {
    //         ping_interval_ms: Duration::from_millis(100),
    //         rtt_ms_initial_estimate: Duration::from_millis(10),
    //         jitter_ms_initial_estimate: Default::default(),
    //         rtt_smoothing_factor: 0.0,
    //     };
    //     let mut ping_manager = PingManager::new(&ping_config);
    //     // let tick_config = TickConfig::new(Duration::from_millis(16));
    //     let mut time_manager = TimeManager::new(Duration::default());
    //
    //     assert!(!ping_manager.should_send_ping());
    //     let delta = Duration::from_millis(100);
    //     ping_manager.update(delta);
    //     time_manager.update(delta, Duration::default());
    //     assert!(ping_manager.should_send_ping());
    //
    //     let ping_message = ping_manager.prepare_ping(&time_manager);
    //     assert!(!ping_manager.should_send_ping());
    //     assert_eq!(ping_message.id, PingId(0));
    //
    //     let delta = Duration::from_millis(20);
    //     ping_manager.update(delta);
    //     time_manager.update(delta, Duration::default());
    //     let pong_message = Pong {
    //         ping_id: PingId(0),
    //         tick: Default::default(),
    //         offset_sec: 0.0,
    //     };
    //     ping_manager.process_pong(pong_message, &time_manager);
    //
    //     assert_eq!(ping_manager.rtt_ms_average, 0.9 * 10.0 + 0.1 * 20.0);
    //     assert_eq!(ping_manager.jitter_ms_average, 0.9 * 0.0 + 0.1 * 5.0);
    // }
}