use crate::ping::manager::PingManager;
use crate::plugin::SyncSystems;
use bevy_app::{App, Last, Plugin, PostUpdate};
use bevy_ecs::prelude::*;
use bevy_reflect::Reflect;
use bevy_time::{Fixed, Time, Virtual};
use bevy_utils::prelude::DebugName;
use core::time::Duration;
use lightyear_connection::client::{Connected, Disconnected};
use lightyear_connection::host::HostClient;
use lightyear_core::prelude::{LocalTimeline, NetworkTimelinePlugin};
use lightyear_core::tick::TickDuration;
use lightyear_core::time::{Overstep, TickInstant};
use lightyear_core::timeline::{NetworkTimeline, SyncEvent};
#[allow(unused_imports)]
use tracing::{debug, info, trace};
/// Marker component, generic over `T`, that stores nothing but a `PhantomData<T>`.
///
/// Within this file it is inserted on an entity when `HostClient` is added to it,
/// or by `sync_timelines` once the entity's timeline reports `is_synced()`, and it
/// is removed again when `Disconnected` is added.
#[derive(Component, Debug)]
pub struct IsSynced<T> {
/// Zero-sized field that only exists to carry the type parameter `T`.
pub(crate) marker: core::marker::PhantomData<T>,
}
impl<T> Default for IsSynced<T> {
fn default() -> Self {
IsSynced {
marker: core::marker::PhantomData,
}
}
}
/// A `NetworkTimeline` that can be synchronised against a remote target timeline.
///
/// The systems of `SyncedTimelinePlugin` in this file drive implementors through
/// `reset`, `sync`, `is_synced` and `relative_speed`.
pub trait SyncedTimeline: NetworkTimeline {
/// Computes the `TickInstant` this timeline should aim for, given the remote
/// timeline, this timeline's config, the ping statistics and the tick duration.
///
/// NOTE(review): the exact formula lives in the implementors and is not visible
/// in this file.
fn sync_objective<Remote: SyncTargetTimeline>(
&self,
other: &Remote,
config: &Self::Config,
ping_manager: &PingManager,
tick_duration: Duration,
) -> TickInstant;
/// Re-aligns the timeline to `sync_objective` and returns an `i16`.
///
/// NOTE(review): presumably the returned value is the tick delta that was
/// applied — confirm against the implementors.
fn resync(&mut self, sync_objective: TickInstant) -> i16;
/// Runs one synchronisation step against `main`.
///
/// `sync_timelines` forwards a returned `Some(tick_delta)` as a `SyncEvent`;
/// `None` results in no event.
fn sync<Remote: SyncTargetTimeline>(
&mut self,
main: &Remote,
config: &Self::Config,
ping_manager: &PingManager,
tick_duration: Duration,
) -> Option<i16>;
/// Whether the timeline considers itself synced; `sync_timelines` uses this to
/// decide when to insert the `IsSynced` marker.
fn is_synced(&self) -> bool;
/// Relative speed of this timeline; `update_virtual_time` copies it into
/// `Time<Virtual>`.
fn relative_speed(&self) -> f32;
/// Sets the relative speed (presumably the value later returned by
/// `relative_speed` — confirm against the implementors).
fn set_relative_speed(&mut self, ratio: f32);
/// Resets the timeline; called by `handle_connect` when `Connected` is added.
fn reset(&mut self);
}
/// A `NetworkTimeline` that other timelines synchronise against.
///
/// The `Default` bound is what allows `SyncedTimelinePlugin::build` to register it
/// as a required component of the synced timeline.
pub trait SyncTargetTimeline: NetworkTimeline + Default {
/// Current estimate of this timeline's position as a `TickInstant`.
///
/// NOTE(review): how the estimate is produced is defined by the implementors.
fn current_estimate(&self) -> TickInstant;
/// Whether a packet has been received for this timeline; `sync_timelines` skips
/// the entity while this returns `false`.
fn received_packet(&self) -> bool;
}
/// Tunable parameters for timeline synchronisation.
///
/// `jitter_margin()` reads the jitter fields; `SyncContext::speed_adjustment`
/// reads the error-margin, threshold and speed-up fields.
#[derive(Clone, Copy, Debug, Reflect)]
pub struct SyncConfig {
/// Factor the measured jitter is multiplied by in `SyncConfig::jitter_margin`.
pub jitter_multiple: u8,
/// Constant duration added on top of the scaled jitter in
/// `SyncConfig::jitter_margin`.
pub jitter_margin: Duration,
/// NOTE(review): not read in this file; presumably the number of pings
/// exchanged before the first sync — confirm against the callers.
pub handshake_pings: u8,
/// Absolute offset above which `speed_adjustment` treats the offset as an error
/// (same unit as its `offset` argument — presumably ticks, TODO confirm).
pub error_margin: f32,
/// Absolute offset above which `speed_adjustment` returns
/// `SyncAdjustment::Resync`; also the denominator used to scale the speed
/// adjustment.
pub max_error_margin: f32,
/// NOTE(review): not read by any code in this file; `SyncContext` tracks a
/// field of the same name. Possibly a leftover — confirm before relying on it.
pub consecutive_errors: u8,
/// NOTE(review): not read by any code in this file; `SyncContext` tracks a
/// field of the same name. Possibly a leftover — confirm before relying on it.
pub previous_error_sign: bool,
/// Minimum value of `SyncContext::consecutive_errors` required before
/// `speed_adjustment` returns a `SpeedAdjust`.
pub consecutive_errors_threshold: u8,
/// Base speed factor; `speed_adjustment` scales `speedup_factor - 1.0` by twice
/// the clamped error ratio.
pub speedup_factor: f32,
}
impl SyncConfig {
    /// Returns the margin to account for network jitter: the measured `jitter`
    /// scaled by `jitter_multiple`, plus the constant `jitter_margin`.
    pub(crate) fn jitter_margin(&self, jitter: Duration) -> Duration {
        let multiple = self.jitter_multiple as u32;
        let scaled_jitter = jitter * multiple;
        scaled_jitter + self.jitter_margin
    }
}
impl Default for SyncConfig {
fn default() -> Self {
SyncConfig {
jitter_multiple: 4,
jitter_margin: Duration::from_millis(5),
handshake_pings: 3,
error_margin: 1.0,
max_error_margin: 10.0,
consecutive_errors: 0,
previous_error_sign: true,
consecutive_errors_threshold: 3,
speedup_factor: 1.05,
}
}
}
/// Mutable state carried between successive calls to
/// `SyncContext::speed_adjustment`.
#[derive(Debug, Reflect)]
pub struct SyncContext {
/// Running count of out-of-margin offsets maintained by `speed_adjustment`;
/// cleared when the offset is within `error_margin` or a resync is requested.
pub consecutive_errors: u8,
/// `offset.is_sign_positive()` of the offset passed to the most recent
/// `speed_adjustment` call (`true` before the first call).
pub previous_error_sign: bool,
}
impl Default for SyncContext {
fn default() -> Self {
Self {
consecutive_errors: 0,
previous_error_sign: true,
}
}
}
/// Decision returned by `SyncContext::speed_adjustment`.
#[derive(Debug)]
pub enum SyncAdjustment {
/// The absolute offset exceeded `SyncConfig::max_error_margin`.
Resync,
/// The offset exceeded `SyncConfig::error_margin` often enough; carries the
/// speed ratio computed from the offset and `SyncConfig::speedup_factor`.
SpeedAdjust(f32),
/// No adjustment is requested for this call.
DoNothing,
}
impl SyncContext {
    /// Decides how to react to the current synchronisation `offset`.
    ///
    /// * `|offset| > config.max_error_margin`: the error streak is cleared and
    ///   [`SyncAdjustment::Resync`] is returned.
    /// * `|offset| > config.error_margin`: the error is added to the current streak
    ///   of errors *in the same direction*. A sign flip restarts the streak at 1.
    ///   A [`SyncAdjustment::SpeedAdjust`] is only returned once the streak reaches
    ///   `config.consecutive_errors_threshold` without a sign flip on this call;
    ///   otherwise [`SyncAdjustment::DoNothing`] is returned.
    /// * otherwise: the streak is cleared and [`SyncAdjustment::DoNothing`] is
    ///   returned.
    ///
    /// The speed ratio is `1 + (speedup_factor - 1) * 2 * clamp(|offset| /
    /// max_error_margin, 0, 1)`, inverted when `offset` is positive.
    ///
    /// `previous_error_sign` is updated on every call, whatever the outcome.
    pub fn speed_adjustment(&mut self, config: &SyncConfig, offset: f32) -> SyncAdjustment {
        let error = offset.abs();
        let current_error_sign = offset.is_sign_positive();
        let sign_flipped = current_error_sign != self.previous_error_sign;
        self.previous_error_sign = current_error_sign;

        if error > config.max_error_margin {
            self.consecutive_errors = 0;
            SyncAdjustment::Resync
        } else if error > config.error_margin {
            // The streak only counts errors in the same direction: without the
            // restart below, errors accumulated in one direction would let an
            // adjustment in the opposite direction fire before the threshold is met.
            self.consecutive_errors = if sign_flipped {
                1
            } else {
                self.consecutive_errors.saturating_add(1)
            };
            if sign_flipped || self.consecutive_errors < config.consecutive_errors_threshold {
                return SyncAdjustment::DoNothing;
            }
            let base_factor = config.speedup_factor - 1.0;
            let error_ratio = (error / config.max_error_margin).clamp(0.0, 1.0);
            let adjustment = 1.0 + (base_factor * error_ratio * 2.0);
            let ratio = if offset > 0.0 {
                1.0 / adjustment
            } else {
                adjustment
            };
            SyncAdjustment::SpeedAdjust(ratio)
        } else {
            self.consecutive_errors = 0;
            SyncAdjustment::DoNothing
        }
    }
}
/// Plugin that keeps a `Synced` timeline synchronised with a `Remote` timeline.
///
/// When `DRIVING` is `true`, the `Plugin::build` implementation additionally
/// registers `sync_from_local_timeline`, `update_virtual_time` and the
/// `handle_sync_event` observer. `DRIVING` defaults to `false`.
pub struct SyncedTimelinePlugin<Synced, Remote, const DRIVING: bool = false> {
/// Zero-sized field that only exists to carry the `Synced` and `Remote` type
/// parameters.
pub(crate) _marker: core::marker::PhantomData<(Synced, Remote)>,
}
impl<Synced: SyncedTimeline, Remote: SyncTargetTimeline, const DRIVING: bool>
SyncedTimelinePlugin<Synced, Remote, DRIVING>
{
pub(crate) fn handle_connect(
trigger: On<Add, Connected>,
local_timeline: Res<LocalTimeline>,
mut query: Query<&mut Synced>,
) {
if let Ok(mut timeline) = query.get_mut(trigger.entity) {
timeline.reset();
if DRIVING {
trace!("Set Driving timeline tick to LocalTimeline");
let delta = local_timeline.tick() - timeline.tick();
timeline.apply_delta(delta.into());
}
}
}
pub(crate) fn handle_host_client(trigger: On<Add, HostClient>, mut commands: Commands) {
commands
.entity(trigger.entity)
.insert(IsSynced::<Synced>::default());
}
pub(crate) fn handle_disconnect(trigger: On<Add, Disconnected>, mut commands: Commands) {
commands.entity(trigger.entity).remove::<IsSynced<Synced>>();
}
pub(crate) fn sync_from_local_timeline(
local_timeline: Res<LocalTimeline>,
fixed_time: Res<Time<Fixed>>,
mut query: Query<&mut Synced>,
) {
let tick = local_timeline.tick();
let overstep = fixed_time.overstep_fraction();
query.iter_mut().for_each(|mut synced| {
synced.set_now(TickInstant::from_tick_and_overstep(
tick,
Overstep::from_f32(overstep),
));
});
}
pub(crate) fn update_virtual_time(
mut virtual_time: ResMut<Time<Virtual>>,
query: Query<&Synced, (With<IsSynced<Synced>>, With<Connected>, Without<HostClient>)>,
) {
if let Ok(timeline) = query.single() {
trace!(
"Timeline {} sets the virtual time relative speed to {}",
DebugName::type_name::<Synced>(),
timeline.relative_speed()
);
virtual_time.set_relative_speed(timeline.relative_speed());
}
}
pub(crate) fn sync_timelines(
tick_duration: Res<TickDuration>,
mut commands: Commands,
mut query: Query<
(
Entity,
&mut Synced,
&Synced::Config,
&Remote,
&PingManager,
Has<IsSynced<Synced>>,
),
(With<Connected>, Without<HostClient>),
>,
) {
query.iter_mut().for_each(
|(entity, mut sync_timeline, config, main_timeline, ping_manager, has_is_synced)| {
trace!(
?entity,
?has_is_synced,
"In SyncTimelines from {:?} to {:?}",
DebugName::type_name::<Synced>(),
DebugName::type_name::<Remote>()
);
if !main_timeline.received_packet() {
return;
}
if !has_is_synced && sync_timeline.is_synced() {
debug!(
"Timeline {:?} is synced to {:?}",
DebugName::type_name::<Synced>(),
DebugName::type_name::<Remote>()
);
commands
.entity(entity)
.insert(IsSynced::<Synced>::default());
}
if let Some(tick_delta) =
sync_timeline.sync(main_timeline, config, ping_manager, tick_duration.0)
{
commands.trigger(SyncEvent::<Synced::Config>::new(entity, tick_delta));
}
},
)
}
pub(crate) fn handle_sync_event(
trigger: On<SyncEvent<Synced::Config>>,
mut local_timeline: ResMut<LocalTimeline>,
) {
local_timeline.apply_delta(trigger.tick_delta);
let new_tick = local_timeline.tick();
debug!(
tick_delta = ?trigger.tick_delta,
?new_tick,
"Apply delta to LocalTimeline from driving pipeline {:?}'s SyncEvent", DebugName::type_name::<Synced>()
);
}
}
impl<Synced, Remote, const DRIVING: bool> Default
    for SyncedTimelinePlugin<Synced, Remote, DRIVING>
{
    /// Builds the plugin without requiring `Default` on `Synced` or `Remote`; the
    /// only field is a zero-sized `PhantomData`.
    fn default() -> Self {
        let _marker = core::marker::PhantomData;
        Self { _marker }
    }
}
impl<Synced: SyncedTimeline, Remote: SyncTargetTimeline, const DRIVING: bool> Plugin
    for SyncedTimelinePlugin<Synced, Remote, DRIVING>
{
    /// Registers the timeline plugin, required components, observers and systems.
    ///
    /// Always registered: the `NetworkTimelinePlugin` for `Synced`, `PingManager`
    /// and `Remote` as required components of `Synced`, the connect / host-client /
    /// disconnect observers, and `sync_timelines` in `PostUpdate`.
    ///
    /// Only when `DRIVING` is true: `sync_from_local_timeline` (ordered before
    /// `sync_timelines`), `update_virtual_time` in `Last`, and the
    /// `handle_sync_event` observer.
    fn build(&self, app: &mut App) {
        app.add_plugins(NetworkTimelinePlugin::<Synced>::default())
            .register_required_components::<Synced, PingManager>()
            .register_required_components::<Synced, Remote>()
            .add_observer(Self::handle_connect)
            .add_observer(Self::handle_host_client)
            .add_observer(Self::handle_disconnect)
            .add_systems(PostUpdate, Self::sync_timelines.in_set(SyncSystems::Sync));

        if !DRIVING {
            return;
        }

        app.add_systems(
            PostUpdate,
            Self::sync_from_local_timeline
                .in_set(SyncSystems::Sync)
                .before(Self::sync_timelines),
        )
        .add_systems(Last, Self::update_virtual_time)
        .add_observer(Self::handle_sync_event);
    }
}