1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
use crate::ping::manager::PingManager;
use crate::plugin::SyncSystems;
use bevy_app::{App, Last, Plugin, PostUpdate};
use bevy_ecs::prelude::*;
use bevy_reflect::Reflect;
use bevy_time::{Fixed, Time, Virtual};
use bevy_utils::prelude::DebugName;
use core::time::Duration;
use lightyear_connection::client::{Connected, Disconnected};
use lightyear_connection::host::HostClient;
use lightyear_core::prelude::{LocalTimeline, NetworkTimelinePlugin};
use lightyear_core::tick::TickDuration;
use lightyear_core::time::{Overstep, TickInstant};
use lightyear_core::timeline::{NetworkTimeline, SyncEvent};
#[allow(unused_imports)]
use tracing::{debug, info, trace};
/// Marker component to indicate that the timeline has been synced
#[derive(Component, Debug)]
pub struct IsSynced<T> {
pub(crate) marker: core::marker::PhantomData<T>,
}
impl<T> Default for IsSynced<T> {
fn default() -> Self {
IsSynced {
marker: core::marker::PhantomData,
}
}
}
/// Timeline that is synced to another timeline
pub trait SyncedTimeline: NetworkTimeline {
/// Get the ideal [`TickInstant`] that this timeline should be at
fn sync_objective<Remote: SyncTargetTimeline>(
&self,
other: &Remote,
config: &Self::Config,
ping_manager: &PingManager,
tick_duration: Duration,
) -> TickInstant;
/// Resync the timeline if they are too out of sync. Returns the number of tick deltas
/// that should be applied
fn resync(&mut self, sync_objective: TickInstant) -> i32;
/// Sync the current timeline to the other timeline T.
/// Usually this is achieved by slightly speeding up or slowing down the current timeline.
/// If there is a big discrepancy we can do a `resync` instead.
///
/// Returns the number of delta ticks that should be applied
// TODO: should we use LinkStats instead of PingManager? and PingManager is a way to update the LinkStats?
fn sync<Remote: SyncTargetTimeline>(
&mut self,
main: &Remote,
config: &Self::Config,
ping_manager: &PingManager,
tick_duration: Duration,
) -> Option<i32>;
fn is_synced(&self) -> bool;
/// Returns the speed of your timeline relative to your system clock as an `f32`.
/// A value of `1.0` means the timeline is running at normal speed.
/// A value of `0.5` means the timeline is running at half speed,
fn relative_speed(&self) -> f32;
fn set_relative_speed(&mut self, ratio: f32);
/// Reset the timeline to its initial state (used when a client reconnects)
fn reset(&mut self);
}
pub trait SyncTargetTimeline: NetworkTimeline + Default {
fn current_estimate(&self) -> TickInstant;
/// Returns true if the SyncTimelines are allowed to use this timeline as a sync target this frame
fn received_packet(&self) -> bool;
}
/// Configuration for the sync manager, which is in charge of syncing the client's tick/time with the server's tick/time
///
/// The sync manager runs only on the client and maintains two different times:
/// - the prediction tick/time: this is the client time, which runs roughly RTT/2 ahead of the server time, so that input packets
/// for tick T sent from the client arrive on the server at tick T
/// - the interpolation tick/time: this is the interpolation timeline, which runs behind the server time so that interpolation
/// always has at least one packet to interpolate towards
#[derive(Clone, Copy, Debug, Reflect)]
pub struct SyncConfig {
/// How much multiple of jitter do we apply as margin when computing the time
/// a packet will get received by the server
/// (worst case will be RTT / 2 + jitter * multiple_margin + jitter_margin)
/// % of packets that will be received within k * jitter
/// 1: 65%, 2: 95%, 3: 99.7%
pub jitter_multiple: u8,
/// Fixed safety margin added on top of the jitter-derived one, expressed as a
/// fractional number of ticks.
///
/// The default is `1.0`, which guarantees the client timeline lands at least
/// one full tick ahead of the server timeline under zero-RTT/zero-jitter
/// conditions (e.g. loopback). Any less than `1.0` risks the server
/// simulating tick `T` before the client's input for `T` has arrived,
/// producing a desync in deterministic replication.
pub jitter_margin: f32,
/// Number of pings to exchange with the server before finalizing the handshake
pub handshake_pings: u8,
/// Error margin for upstream throttle (in multiple of ticks)
pub error_margin: f32,
/// If the error margin is too big, we snap the prediction/interpolation time to the objective value
pub max_error_margin: f32,
/// How many consecutive errors have we seen that are in the same direction
pub consecutive_errors: u8,
/// Sign of the previous error
pub previous_error_sign: bool,
/// How many consecutive errors are allowed before we start adjusting the speed
pub consecutive_errors_threshold: u8,
// TODO: instead of constant speedup_factor, the speedup should be linear w.r.t the offset
/// By how much should we speed up the simulation to make ticks stay in sync with server?
pub speedup_factor: f32,
}
impl SyncConfig {
/// Total jitter-based safety margin as a `Duration`, combining the measured
/// jitter scaled by [`Self::jitter_multiple`] with the fixed fractional-tick
/// margin in [`Self::jitter_margin`].
pub fn jitter_margin(&self, jitter: Duration, tick_duration: Duration) -> Duration {
jitter * self.jitter_multiple as u32 + tick_duration.mul_f32(self.jitter_margin)
}
}
impl Default for SyncConfig {
fn default() -> Self {
SyncConfig {
jitter_multiple: 4,
jitter_margin: 1.0,
handshake_pings: 3,
error_margin: 1.0,
max_error_margin: 10.0,
consecutive_errors: 0,
previous_error_sign: true,
consecutive_errors_threshold: 3,
speedup_factor: 1.05,
}
}
}
#[derive(Debug, Reflect)]
pub struct SyncContext {
/// How many consecutive errors have we seen that are in the same direction
pub consecutive_errors: u8,
/// Sign of the previous error
pub previous_error_sign: bool,
}
impl Default for SyncContext {
fn default() -> Self {
Self {
consecutive_errors: 0,
previous_error_sign: true,
}
}
}
#[derive(Debug)]
pub enum SyncAdjustment {
Resync,
SpeedAdjust(f32),
DoNothing,
}
impl SyncContext {
pub fn speed_adjustment(&mut self, config: &SyncConfig, offset: f32) -> SyncAdjustment {
let current_error_sign = offset.is_sign_positive();
let previous_error_sign = self.previous_error_sign;
self.previous_error_sign = current_error_sign;
if offset.abs() > config.max_error_margin {
self.consecutive_errors = 0;
SyncAdjustment::Resync
} else if offset.abs() > config.error_margin {
self.consecutive_errors = self.consecutive_errors.saturating_add(1);
// skip if we haven't seen enough consecutive errors in the same direction
if (current_error_sign ^ previous_error_sign)
|| self.consecutive_errors < config.consecutive_errors_threshold
{
self.previous_error_sign = current_error_sign;
return SyncAdjustment::DoNothing;
}
let base_factor = config.speedup_factor - 1.0;
let error_ratio = (offset.abs() / config.max_error_margin).clamp(0.0, 1.0);
// Apply progressively stronger adjustment as error increases
let adjustment = 1.0 + (base_factor * error_ratio * 2.0);
// Slow down if we are ahead
let ratio = if offset > 0.0 {
1.0 / adjustment
} else {
adjustment
};
SyncAdjustment::SpeedAdjust(ratio)
} else {
self.consecutive_errors = 0;
SyncAdjustment::DoNothing
}
}
}
/// Plugin to sync the Synced timeline to the Remote timeline
///
/// The const generic is used to indicate if the timeline is driving/updating the [`Time<Virtual>`] and [`LocalTimeline`].
pub struct SyncedTimelinePlugin<Synced, Remote, const DRIVING: bool = false> {
pub(crate) _marker: core::marker::PhantomData<(Synced, Remote)>,
}
impl<Synced: SyncedTimeline, Remote: SyncTargetTimeline, const DRIVING: bool>
SyncedTimelinePlugin<Synced, Remote, DRIVING>
{
/// On connection, reset the Synced timeline.
pub(crate) fn handle_connect(
trigger: On<Add, Connected>,
local_timeline: Res<LocalTimeline>,
mut query: Query<&mut Synced>,
) {
let local_tick = local_timeline.tick();
if let Ok(mut timeline) = query.get_mut(trigger.entity) {
timeline.reset();
if DRIVING {
trace!("Set Driving timeline tick to LocalTimeline");
let delta = local_tick - timeline.tick();
timeline.apply_delta(delta.into());
}
}
}
/// For HostClient, we directly set IsSynced on connection (since there is no messages exchanged) and the
/// Synced timeline is always the same as the Remote timeline
pub(crate) fn handle_host_client(trigger: On<Add, HostClient>, mut commands: Commands) {
commands
.entity(trigger.entity)
.insert(IsSynced::<Synced>::default());
}
/// On disconnection, remove IsSynced.
pub(crate) fn handle_disconnect(trigger: On<Add, Disconnected>, mut commands: Commands) {
commands.entity(trigger.entity).remove::<IsSynced<Synced>>();
}
/// Synchronize the driving timeline's phase with the local simulation phase
/// derived from [`LocalTimeline`] and [`Time<Fixed>`].
///
/// This runs once per frame before `sync_timelines` for driving timelines.
pub(crate) fn sync_from_local_timeline(
local_timeline: Res<LocalTimeline>,
fixed_time: Res<Time<Fixed>>,
mut query: Query<&mut Synced>,
) {
let local_tick = local_timeline.tick();
let overstep = fixed_time.overstep_fraction();
query.iter_mut().for_each(|mut synced| {
// Desired phase: LocalTimeline.tick + Time<Fixed> overstep.
synced.set_now(TickInstant::from_tick_and_overstep(
local_tick,
Overstep::from_f32(overstep),
));
trace!(
target: "lightyear_debug::timeline",
kind = "sync_from_local_timeline",
schedule = "PostUpdate",
sample_point = "PostUpdate",
timeline = ?DebugName::type_name::<Synced>(),
local_tick = local_tick.0,
timeline_tick = synced.tick().0,
overstep,
"driving timeline synced from LocalTimeline"
);
});
}
pub(crate) fn update_virtual_time(
mut virtual_time: ResMut<Time<Virtual>>,
query: Query<&Synced, (With<IsSynced<Synced>>, With<Connected>, Without<HostClient>)>,
) {
if let Ok(timeline) = query.single() {
trace!(
"Timeline {} sets the virtual time relative speed to {}",
DebugName::type_name::<Synced>(),
timeline.relative_speed()
);
trace!(
target: "lightyear_debug::sync",
kind = "relative_speed",
schedule = "Last",
sample_point = "Last",
timeline = ?DebugName::type_name::<Synced>(),
tick = timeline.tick().0,
relative_speed = timeline.relative_speed(),
"timeline relative speed applied to Virtual time"
);
// TODO: be able to apply the speed_ratio on top of any speed ratio already applied by the user.
virtual_time.set_relative_speed(timeline.relative_speed());
}
}
/// Sync timeline T to timeline [`RemoteTimeline`](super::remote::RemoteTimeline) by either
/// - speeding up/slowing down the timeline T to match timeline Remote
/// - emitting a [`SyncEvent<T>`]
pub(crate) fn sync_timelines(
tick_duration: Res<TickDuration>,
mut commands: Commands,
mut query: Query<
(
Entity,
&mut Synced,
&Synced::Config,
&Remote,
&PingManager,
Has<IsSynced<Synced>>,
),
(With<Connected>, Without<HostClient>),
>,
) {
// TODO: return early if we haven't received any remote packets? (nothing to sync to)
query.iter_mut().for_each(
|(entity, mut sync_timeline, config, main_timeline, ping_manager, has_is_synced)| {
trace!(
?entity,
?has_is_synced,
"In SyncTimelines from {:?} to {:?}",
DebugName::type_name::<Synced>(),
DebugName::type_name::<Remote>()
);
// return early if the remote timeline hasn't received any packets
if !main_timeline.received_packet() {
trace!(
target: "lightyear_debug::sync",
kind = "sync_skipped_no_packet",
schedule = "PostUpdate",
sample_point = "PostUpdate",
entity = ?entity,
timeline = ?DebugName::type_name::<Synced>(),
remote_timeline = ?DebugName::type_name::<Remote>(),
timeline_tick = sync_timeline.tick().0,
"sync skipped because remote timeline received no packet"
);
return;
}
if !has_is_synced && sync_timeline.is_synced() {
debug!(
"Timeline {:?} is synced to {:?}",
DebugName::type_name::<Synced>(),
DebugName::type_name::<Remote>()
);
commands
.entity(entity)
.insert(IsSynced::<Synced>::default());
trace!(
target: "lightyear_debug::sync",
kind = "timeline_synced",
schedule = "PostUpdate",
sample_point = "PostUpdate",
entity = ?entity,
timeline = ?DebugName::type_name::<Synced>(),
remote_timeline = ?DebugName::type_name::<Remote>(),
timeline_tick = sync_timeline.tick().0,
"timeline marked synced"
);
}
let before_now = sync_timeline.now();
let remote_estimate = main_timeline.current_estimate();
if let Some(tick_delta) =
sync_timeline.sync(main_timeline, config, ping_manager, tick_duration.0)
{
trace!(
target: "lightyear_debug::sync",
kind = "sync_adjustment",
schedule = "PostUpdate",
sample_point = "PostUpdate",
entity = ?entity,
timeline = ?DebugName::type_name::<Synced>(),
remote_timeline = ?DebugName::type_name::<Remote>(),
timeline_tick = sync_timeline.tick().0,
remote_tick = main_timeline.tick().0,
before = ?before_now,
after = ?sync_timeline.now(),
remote_estimate = ?remote_estimate,
tick_delta,
relative_speed = sync_timeline.relative_speed(),
rtt_ms = ping_manager.rtt().as_secs_f64() * 1000.0,
jitter_ms = ping_manager.jitter().as_secs_f64() * 1000.0,
"timeline sync emitted SyncEvent"
);
// if it's the driving pipeline, also update the LocalTimeline in `handle_sync_event`
commands.trigger(SyncEvent::<Synced::Config>::new(entity, tick_delta));
} else {
trace!(
target: "lightyear_debug::sync",
kind = "sync_sample",
schedule = "PostUpdate",
sample_point = "PostUpdate",
entity = ?entity,
timeline = ?DebugName::type_name::<Synced>(),
remote_timeline = ?DebugName::type_name::<Remote>(),
timeline_tick = sync_timeline.tick().0,
remote_tick = main_timeline.tick().0,
before = ?before_now,
after = ?sync_timeline.now(),
remote_estimate = ?remote_estimate,
relative_speed = sync_timeline.relative_speed(),
rtt_ms = ping_manager.rtt().as_secs_f64() * 1000.0,
jitter_ms = ping_manager.jitter().as_secs_f64() * 1000.0,
"timeline sync sampled"
);
}
},
)
}
pub(crate) fn handle_sync_event(
trigger: On<SyncEvent<Synced::Config>>,
mut local_timeline: ResMut<LocalTimeline>,
) {
local_timeline.apply_delta(trigger.tick_delta);
let new_tick = local_timeline.tick();
debug!(
tick_delta = ?trigger.tick_delta,
?new_tick,
"Apply delta to LocalTimeline from driving pipeline {:?}'s SyncEvent", DebugName::type_name::<Synced>()
);
trace!(
target: "lightyear_debug::sync",
kind = "sync_event_apply",
schedule = "PostUpdate",
sample_point = "PostUpdate",
entity = ?trigger.entity,
timeline = ?DebugName::type_name::<Synced>(),
tick_delta = trigger.tick_delta,
local_tick = new_tick.0,
"applied SyncEvent to LocalTimeline"
);
}
}
impl<Synced, Remote, const DRIVING: bool> Default
for SyncedTimelinePlugin<Synced, Remote, DRIVING>
{
fn default() -> Self {
Self {
_marker: core::marker::PhantomData,
}
}
}
impl<Synced: SyncedTimeline, Remote: SyncTargetTimeline, const DRIVING: bool> Plugin
for SyncedTimelinePlugin<Synced, Remote, DRIVING>
{
fn build(&self, app: &mut App) {
app.add_plugins(NetworkTimelinePlugin::<Synced>::default());
app.register_required_components::<Synced, PingManager>();
app.register_required_components::<Synced, Remote>();
app.add_observer(Self::handle_connect);
app.add_observer(Self::handle_host_client);
app.add_observer(Self::handle_disconnect);
// NOTE: we don't have to run this in PostUpdate, we could run this right after RunFixedMainLoop?
app.add_systems(PostUpdate, Self::sync_timelines.in_set(SyncSystems::Sync));
if DRIVING {
app.add_systems(
PostUpdate,
Self::sync_from_local_timeline
.in_set(SyncSystems::Sync)
.before(Self::sync_timelines),
);
app.add_systems(Last, Self::update_virtual_time);
app.add_observer(Self::handle_sync_event);
}
}
}