use crate::ping::manager::PingManager;
use crate::timeline::sync::SyncTargetTimeline;
use bevy_derive::{Deref, DerefMut};
use bevy_ecs::prelude::*;
use bevy_reflect::Reflect;
use bevy_time::{Real, Time};
use core::time::Duration;
use lightyear_connection::client::Connected;
use lightyear_core::prelude::Rollback;
use lightyear_core::tick::{Tick, TickDuration};
use lightyear_core::time::{TickDelta, TickInstant};
use lightyear_core::timeline::{NetworkTimeline, Timeline, TimelineConfig};
use lightyear_link::Linked;
use lightyear_transport::plugin::PacketReceived;
use tracing::trace;
/// Marker configuration type for the timeline that estimates the remote peer's time.
///
/// Through its `TimelineConfig` impl it selects [`RemoteEstimate`] as the timeline
/// context and [`RemoteTimeline`] as the timeline component. The `#[require]`
/// attribute declares [`RemoteTimeline`] as a required component of this one.
#[derive(Component, Default, Debug, Reflect)]
#[require(RemoteTimeline)]
pub struct RemoteTimelineConfig;
/// Estimation state used as the `TimelineConfig::Context` of [`RemoteTimeline`].
#[derive(Debug, Reflect)]
pub struct RemoteEstimate {
/// Set to `true` when `RemoteTimeline::update` accepts a remote tick; cleared by
/// `reset_received_packet_remote_timeline` and by `RemoteTimeline::handle_connect`.
received_packet: bool,
/// Most recent remote tick accepted by `RemoteTimeline::update`, if any.
last_received_tick: Option<Tick>,
/// Offset added to the timeline's `now` to obtain the current remote estimate.
offset: TickDelta,
/// Lower bound of the EMA smoothing factor returned by `RemoteTimeline::ema_alpha`
/// (reached when jitter is at or above its high threshold).
min_ema_alpha: f32,
/// Upper bound of the EMA smoothing factor returned by `RemoteTimeline::ema_alpha`
/// (reached when jitter is at or below its low threshold).
max_ema_alpha: f32,
/// Number of latency samples that must have been received before
/// `RemoteTimeline::update` starts producing estimates.
handshake_pings: u32,
/// `true` until the first estimate is made; the first raw offset is applied
/// directly instead of being blended into the previous offset.
first_estimate: bool,
}
impl Default for RemoteEstimate {
    /// Builds the initial estimation state: no packet seen yet, a zero offset,
    /// and the default smoothing / handshake parameters.
    fn default() -> Self {
        // Bounds of the smoothing factor used when blending offsets.
        const DEFAULT_MIN_EMA_ALPHA: f32 = 0.02;
        const DEFAULT_MAX_EMA_ALPHA: f32 = 0.10;
        // Latency samples required before any estimate is produced.
        const DEFAULT_HANDSHAKE_PINGS: u32 = 3;

        Self {
            // Nothing has been received or estimated yet.
            first_estimate: true,
            received_packet: false,
            last_received_tick: None,
            offset: TickDelta::from(0),
            // Tunables.
            handshake_pings: DEFAULT_HANDSHAKE_PINGS,
            min_ema_alpha: DEFAULT_MIN_EMA_ALPHA,
            max_ema_alpha: DEFAULT_MAX_EMA_ALPHA,
        }
    }
}
/// Component holding the local estimate of the remote peer's timeline.
///
/// Newtype over `Timeline<RemoteTimelineConfig>`; `Deref`/`DerefMut` are derived so
/// the wrapped timeline can be accessed directly through this component.
#[derive(Component, Default, Debug, Deref, DerefMut, Reflect)]
pub struct RemoteTimeline(Timeline<RemoteTimelineConfig>);
/// Binds [`RemoteTimelineConfig`] to its context and timeline types.
impl TimelineConfig for RemoteTimelineConfig {
/// Estimation state carried by the timeline.
type Context = RemoteEstimate;
/// Component type that wraps the timeline.
type Timeline = RemoteTimeline;
}
impl RemoteTimeline {
    /// Returns the most recent remote tick accepted by [`RemoteTimeline::update`].
    ///
    /// `None` until the first estimate has been made, and again after
    /// [`RemoteTimeline::handle_connect`] resets the state.
    pub fn last_received_tick(&self) -> Option<Tick> {
        self.context.last_received_tick
    }

    /// Refines the remote-time estimate from a packet stamped with `remote_tick`.
    ///
    /// The remote is assumed to have advanced by half the round-trip time since it
    /// stamped the packet, so the new estimate is `remote_tick + rtt / 2`. The
    /// difference between that estimate and this timeline's `now()` is the raw
    /// offset. The first raw offset is stored as-is; later ones are blended into
    /// the stored offset with an exponential moving average whose factor comes
    /// from [`RemoteTimeline::ema_alpha`].
    ///
    /// Nothing is updated when:
    /// - fewer than `handshake_pings` latency samples have been received, or
    /// - `remote_tick` is older than the last accepted tick.
    ///
    /// # Parameters
    /// - `remote_tick`: tick carried by the received packet.
    /// - `ping_manager`: source of the RTT, jitter and latency-sample count.
    /// - `tick_duration`: duration of one tick, used to convert the network delay
    ///   into a [`TickDelta`].
    pub(crate) fn update(
        &mut self,
        remote_tick: Tick,
        ping_manager: &PingManager,
        tick_duration: Duration,
    ) {
        // Wait for enough latency samples before estimating anything.
        if ping_manager.latency_samples_recv() < self.handshake_pings {
            trace!(
                target: "lightyear_debug::sync",
                kind = "remote_estimate_waiting_for_latency_samples",
                schedule = "PreUpdate",
                sample_point = "PreUpdate",
                remote_tick = remote_tick.0,
                latency_samples_recv = ping_manager.latency_samples_recv(),
                required_latency_samples = self.handshake_pings,
                "remote timeline estimate skipped until latency samples are available"
            );
            return;
        }
        // Only accept ticks that are at least as recent as the last accepted one.
        if self
            .context
            .last_received_tick
            .is_none_or(|previous_tick| remote_tick >= previous_tick)
        {
            self.context.received_packet = true;
            self.context.last_received_tick = Some(remote_tick);

            // One-way delay is approximated as half the round-trip time.
            let network_delay = TickDelta::from_duration(ping_manager.rtt() / 2, tick_duration);
            let new_estimate = TickInstant::from(remote_tick) + network_delay;
            let ideal_estimate = self.now();
            let raw_offset = new_estimate - ideal_estimate;
            let old_offset = self.offset;

            if self.context.first_estimate {
                // No history to smooth against: take the first measurement directly.
                self.offset = raw_offset;
                self.context.first_estimate = false;
            } else {
                // Keep sub-millisecond precision: `as_millis()` would truncate to
                // whole milliseconds and quantise alpha over the 1-10 ms range.
                let jitter_ms = ping_manager.jitter().as_secs_f32() * 1000.0;
                let alpha = self.ema_alpha(jitter_ms);
                let smoothed_offset = self.offset * (1.0 - alpha) + raw_offset * alpha;
                trace!(?new_estimate, ?ideal_estimate, old_offset = ?self.offset, new_offset = ?smoothed_offset, ?jitter_ms, ?alpha, "Update RemoteTimeline offset");
                self.offset = smoothed_offset;
            }
            trace!(
                target: "lightyear_debug::sync",
                kind = "remote_estimate_update",
                schedule = "PreUpdate",
                sample_point = "PreUpdate",
                remote_tick = remote_tick.0,
                estimated_tick = self.current_estimate().tick().0,
                timeline_tick = self.tick().0,
                rtt_ms = ping_manager.rtt().as_secs_f64() * 1000.0,
                jitter_ms = ping_manager.jitter().as_secs_f64() * 1000.0,
                network_delay = ?network_delay,
                new_estimate = ?new_estimate,
                ideal_estimate = ?ideal_estimate,
                raw_offset = ?raw_offset,
                old_offset = ?old_offset,
                new_offset = ?self.offset,
                "remote timeline estimate updated"
            );
        }
    }

    /// Observer for `Connected` being added to an entity.
    ///
    /// Resets the estimation state of the entity's [`RemoteTimeline`] (if it has
    /// one) so that the next estimate starts from scratch: no packet received,
    /// zero offset, no last tick, and the next offset applied without smoothing.
    pub(crate) fn handle_connect(
        trigger: On<Add, Connected>,
        mut query: Query<&mut RemoteTimeline>,
    ) {
        if let Ok(mut timeline) = query.get_mut(trigger.entity) {
            timeline.received_packet = false;
            timeline.offset = TickDelta::from(0);
            timeline.first_estimate = true;
            timeline.last_received_tick = None;
        }
    }

    /// Picks the EMA smoothing factor for the given jitter (in milliseconds).
    ///
    /// The factor decreases linearly from `max_ema_alpha` at 1 ms of jitter (or
    /// less) down to `min_ema_alpha` at 10 ms of jitter (or more), so noisier
    /// links weigh each new measurement less.
    fn ema_alpha(&self, current_jitter_ms: f32) -> f32 {
        const JITTER_THRESHOLD_LOW_MS: f32 = 1.0;
        const JITTER_THRESHOLD_HIGH_MS: f32 = 10.0;
        let jitter_range = JITTER_THRESHOLD_HIGH_MS - JITTER_THRESHOLD_LOW_MS;
        let alpha_range = self.max_ema_alpha - self.min_ema_alpha;
        // Position of the jitter within [low, high], clamped to [0, 1].
        let normalized_jitter = (current_jitter_ms - JITTER_THRESHOLD_LOW_MS) / jitter_range;
        let clamped_normalized_jitter = normalized_jitter.clamp(0.0, 1.0);
        // Inverse relationship: more jitter means a smaller alpha.
        self.max_ema_alpha - clamped_normalized_jitter * alpha_range
    }
}
/// Observer for [`PacketReceived`]: feeds the packet's remote tick into the
/// [`RemoteTimeline`] of the entity that received it.
///
/// Entities that do not have both a [`RemoteTimeline`] and a [`PingManager`] are
/// ignored.
pub(crate) fn update_remote_timeline(
    trigger: On<PacketReceived>,
    tick_duration: Res<TickDuration>,
    mut query: Query<(&mut RemoteTimeline, &PingManager)>,
) {
    let Ok((mut timeline, pings)) = query.get_mut(trigger.entity) else {
        return;
    };
    let remote_tick = trigger.remote_tick;

    trace!(
        "Received packet received with remote tick {:?}",
        remote_tick
    );
    trace!(
        target: "lightyear_debug::timeline",
        kind = "packet_remote_tick",
        schedule = "PreUpdate",
        sample_point = "PreUpdate",
        entity = ?trigger.entity,
        remote_tick = remote_tick.0,
        "packet carried remote timeline tick"
    );

    timeline.update(remote_tick, pings, tick_duration.0);
}
/// Advances every [`RemoteTimeline`] on entities that have `Linked` and lack
/// `Rollback` by the delta of the `Time<Real>` clock.
///
/// Note that, despite its name, `fixed_time` is the real-time clock resource.
pub(crate) fn advance_remote_timeline(
    fixed_time: Res<Time<Real>>,
    tick_duration: Res<TickDuration>,
    mut query: Query<&mut RemoteTimeline, (With<Linked>, Without<Rollback>)>,
) {
    let elapsed = fixed_time.delta();
    for mut timeline in query.iter_mut() {
        timeline.apply_duration(elapsed, tick_duration.0);
    }
}
/// Clears the `received_packet` flag on the [`RemoteTimeline`] of every entity
/// that has `Linked`, so the flag only reports packets accepted since the last
/// run of this system.
pub(crate) fn reset_received_packet_remote_timeline(
    mut query: Query<&mut RemoteTimeline, With<Linked>>,
) {
    for mut timeline in query.iter_mut() {
        timeline.context.received_packet = false;
    }
}
impl SyncTargetTimeline for RemoteTimeline {
    /// Current estimate of the remote time: the timeline's `now` shifted by the
    /// stored offset.
    fn current_estimate(&self) -> TickInstant {
        let local_now = self.now;
        let remote_offset = self.offset;
        local_now + remote_offset
    }

    /// Whether a remote tick has been accepted since the flag was last cleared.
    fn received_packet(&self) -> bool {
        let flag = self.received_packet;
        flag
    }
}