1use crate::ping::manager::PingManager;
2use crate::timeline::sync::{
3 SyncAdjustment, SyncConfig, SyncContext, SyncTargetTimeline, SyncedTimeline,
4};
5
6use bevy_derive::{Deref, DerefMut};
7use bevy_ecs::prelude::*;
8use bevy_reflect::Reflect;
9use core::time::Duration;
10use lightyear_core::tick::{Tick, TickDuration};
11use lightyear_core::time::{TickDelta, TickInstant};
12use lightyear_core::timeline::{NetworkTimeline, SyncEvent, Timeline, TimelineConfig};
13use lightyear_link::{Link, LinkStats};
14use tracing::trace;
15
16#[derive(Debug, Component, Reflect)]
19#[require(InputTimeline)]
20pub struct InputTimelineConfig {
21 pub(crate) sync: SyncConfig,
22 pub(crate) input_delay_config: InputDelayConfig,
23}
24
25impl InputTimelineConfig {
26 pub fn new(sync_config: SyncConfig, input_delay: InputDelayConfig) -> Self {
27 Self {
28 sync: sync_config,
29 input_delay_config: input_delay,
30 }
31 }
32
33 pub fn with_input_delay(mut self, input_delay: InputDelayConfig) -> Self {
34 self.input_delay_config = input_delay;
35 self
36 }
37
38 pub fn with_sync_config(mut self, sync_config: SyncConfig) -> Self {
39 self.sync = sync_config;
40 self
41 }
42
43 #[inline]
46 pub fn is_lockstep(&self) -> bool {
47 self.input_delay_config.is_lockstep()
48 }
49
50 pub(crate) fn recompute_input_delay_on_sync(
53 trigger: On<SyncEvent<InputTimelineConfig>>,
54 tick_duration: Res<TickDuration>,
55 mut query: Query<(&Link, &mut InputTimeline, &InputTimelineConfig)>,
56 ) {
57 if let Ok((link, mut timeline, config)) = query.get_mut(trigger.entity) {
58 let before = timeline.input_delay_ticks;
59 timeline.input_delay_ticks = config.input_delay_config.input_delay_ticks(
60 link.stats,
61 &config.sync,
62 tick_duration.0,
63 );
64 trace!(
65 "Recomputing input delay on sync event! Input delay ticks: {}",
66 timeline.input_delay_ticks
67 );
68 trace!(
69 target: "lightyear_debug::sync",
70 kind = "input_delay_recomputed_on_sync",
71 schedule = "PreUpdate",
72 sample_point = "PreUpdate",
73 entity = ?trigger.entity,
74 tick_delta = trigger.tick_delta,
75 input_delay_ticks_before = before,
76 input_delay_ticks_after = timeline.input_delay_ticks,
77 rtt_ms = link.stats.rtt.as_secs_f64() * 1000.0,
78 "sync event: recomputed input delay"
79 );
80 }
81 }
82
83 pub(crate) fn recompute_input_delay_on_config_update(
88 trigger: On<Insert, InputTimelineConfig>,
89 tick_duration: Res<TickDuration>,
90 mut query: Query<(&Link, &mut InputTimeline, &InputTimelineConfig)>,
91 ) {
92 if let Ok((link, mut timeline, config)) = query.get_mut(trigger.entity) {
93 timeline.input_delay_ticks = config.input_delay_config.input_delay_ticks(
94 link.stats,
95 &config.sync,
96 tick_duration.0,
97 );
98 trace!(
99 "Recomputing input delay on config update! Input delay ticks: {}. Config: {:?}",
100 timeline.input_delay_ticks, config.input_delay_config
101 );
102 }
103 }
104}
105
106impl Default for InputTimelineConfig {
107 fn default() -> Self {
108 Self {
109 sync: SyncConfig::default(),
110 input_delay_config: InputDelayConfig::no_input_delay(),
111 }
112 }
113}
114
115#[derive(Debug, Reflect)]
116pub struct InputContext {
117 sync: SyncContext,
118 input_delay_ticks: u16,
120 relative_speed: f32,
121 is_synced: bool,
122}
123
124impl InputContext {
125 pub fn input_delay(&self) -> u16 {
127 self.input_delay_ticks
128 }
129}
130
131impl Default for InputContext {
132 fn default() -> Self {
133 Self {
134 sync: SyncContext::default(),
135 input_delay_ticks: 0,
136 relative_speed: 1.0,
137 is_synced: false,
138 }
139 }
140}
141
142#[derive(Debug, Clone, Copy, Reflect)]
143pub struct InputDelayConfig {
144 pub minimum_input_delay_ticks: u16,
150 pub maximum_input_delay_before_prediction: u16,
163 pub maximum_predicted_ticks: u16,
173}
174
175impl InputDelayConfig {
176 pub fn balanced() -> Self {
182 Self {
183 minimum_input_delay_ticks: 0,
184 maximum_input_delay_before_prediction: 3,
185 maximum_predicted_ticks: 7,
186 }
187 }
188
189 pub fn no_input_delay() -> Self {
191 Self {
192 minimum_input_delay_ticks: 0,
193 maximum_input_delay_before_prediction: 0,
194 maximum_predicted_ticks: 100,
195 }
196 }
197
198 #[inline]
201 pub fn is_lockstep(&self) -> bool {
202 self.maximum_predicted_ticks == 0
203 }
204
205 pub fn no_prediction() -> Self {
207 Self {
208 minimum_input_delay_ticks: 0,
209 maximum_input_delay_before_prediction: 0,
210 maximum_predicted_ticks: 0,
211 }
212 }
213
214 pub fn fixed_input_delay(delay_ticks: u16) -> Self {
215 Self {
216 minimum_input_delay_ticks: delay_ticks,
217 maximum_input_delay_before_prediction: delay_ticks,
218 maximum_predicted_ticks: 100,
219 }
220 }
221
222 fn input_delay_ticks(
224 &self,
225 link_stats: LinkStats,
226 sync_config: &SyncConfig,
227 tick_interval: Duration,
228 ) -> u16 {
229 let jitter_margin = sync_config.jitter_margin(link_stats.jitter, tick_interval);
230 let effective_rtt = link_stats.rtt + jitter_margin;
231 assert!(
232 self.minimum_input_delay_ticks <= self.maximum_input_delay_before_prediction,
233 "The minimum amount of input_delay should be less than or equal to the maximum_input_delay_before_prediction"
234 );
235 let mut rtt_ticks =
236 (effective_rtt.as_nanos() as f32 / tick_interval.as_nanos() as f32).ceil() as u16;
237
238 if self.is_lockstep() {
240 rtt_ticks += 2;
242 }
243 if rtt_ticks <= self.minimum_input_delay_ticks {
245 return self.minimum_input_delay_ticks;
246 }
247 if rtt_ticks <= self.maximum_input_delay_before_prediction {
249 return rtt_ticks;
250 }
251 if rtt_ticks <= (self.maximum_predicted_ticks + self.maximum_input_delay_before_prediction)
254 {
255 self.maximum_input_delay_before_prediction
256 } else {
257 rtt_ticks - self.maximum_predicted_ticks
258 }
259 }
260}
261
262#[derive(Component, Deref, DerefMut, Default, Debug, Reflect)]
271pub struct InputTimeline(pub Timeline<InputTimelineConfig>);
272
273impl TimelineConfig for InputTimelineConfig {
274 type Context = InputContext;
275 type Timeline = InputTimeline;
276}
277
278impl SyncedTimeline for InputTimeline {
279 fn sync_objective<T: SyncTargetTimeline>(
286 &self,
287 remote: &T,
288 config: &Self::Config,
289 ping_manager: &PingManager,
290 tick_duration: Duration,
291 ) -> TickInstant {
292 let remote = remote.current_estimate();
293 let network_delay = TickDelta::from_duration(ping_manager.rtt() / 2, tick_duration);
294 let jitter_margin = TickDelta::from_duration(
295 config
296 .sync
297 .jitter_margin(ping_manager.jitter(), tick_duration),
298 tick_duration,
299 );
300 let input_delay: TickDelta = Tick(self.context.input_delay_ticks as u32).into();
301 let sync_error_margin = TickDelta::from_duration(
302 tick_duration.mul_f32(config.sync.error_margin),
303 tick_duration,
304 );
305 let obj =
314 remote + network_delay + jitter_margin + TickDelta::from_i32(1) + sync_error_margin
315 - input_delay;
316 trace!(
317 ?remote,
318 ?network_delay,
319 ?jitter_margin,
320 ?sync_error_margin,
321 ?input_delay,
322 "InputTimeline objective: {:?}",
323 obj
324 );
325 obj
326 }
327
328 fn resync(&mut self, sync_objective: TickInstant) -> i32 {
329 let now = self.now();
330 self.now = sync_objective;
331 (sync_objective - now).to_i32()
332 }
333
334 fn sync<T: SyncTargetTimeline>(
341 &mut self,
342 main: &T,
343 config: &Self::Config,
344 ping_manager: &PingManager,
345 tick_duration: Duration,
346 ) -> Option<i32> {
347 if ping_manager.latency_samples_recv() < config.sync.handshake_pings as u32 {
349 return None;
350 }
351 let now = self.now();
352 let objective = self.sync_objective(main, config, ping_manager, tick_duration);
353 let error = now - objective;
354 let error_ticks = error.to_f32();
355 let adjustment = if !self.is_synced {
356 SyncAdjustment::Resync
357 } else {
358 self.sync.speed_adjustment(&config.sync, error_ticks)
359 };
360 trace!(
361 ?now,
362 ?objective,
363 ?adjustment,
364 ?error_ticks,
365 error_margin = ?config.sync.error_margin,
366 max_error_margin = ?config.sync.max_error_margin,
367 "InputTimeline sync"
368 );
369 self.is_synced = true;
370 match adjustment {
371 SyncAdjustment::Resync => {
372 return Some(self.resync(objective));
373 }
374 SyncAdjustment::SpeedAdjust(ratio) => {
375 self.set_relative_speed(ratio);
376 }
377 SyncAdjustment::DoNothing => {
378 let current = self.relative_speed();
380 if (current - 1.0).abs() > 0.001 {
381 let new_speed = current + (1.0 - current) * 0.1;
382 self.set_relative_speed(new_speed);
383 }
384 }
385 }
386 None
387 }
388
389 fn is_synced(&self) -> bool {
390 self.is_synced
391 }
392
393 fn relative_speed(&self) -> f32 {
394 self.relative_speed
395 }
396
397 fn set_relative_speed(&mut self, ratio: f32) {
398 self.relative_speed = ratio;
399 }
400
401 fn reset(&mut self) {
402 trace!("Resetting InputTimeline");
403 self.is_synced = false;
404 self.relative_speed = 1.0;
405 self.now = Default::default();
406 }
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use crate::timeline::remote::RemoteTimeline;
    use bevy_utils::default;
    use lightyear_core::timeline::NetworkTimeline;

    /// Tick length shared by the sync-objective tests.
    const TICK: Duration = Duration::from_millis(10);

    /// Remote timeline whose current time is tick 100.
    fn remote_at_tick_100() -> RemoteTimeline {
        let mut remote = RemoteTimeline::default();
        remote.set_now(TickInstant::from(Tick(100)));
        remote
    }

    /// Ping manager whose final RTT/jitter statistics are forced to the given values.
    fn ping_manager_with(rtt: Duration, jitter: Duration) -> PingManager {
        let mut ping_manager = PingManager::default();
        let stats = &mut ping_manager.rtt_estimator_ewma.final_stats;
        stats.rtt = rtt;
        stats.jitter = jitter;
        ping_manager
    }

    /// Applies the margins shared by the objective tests: a jitter margin worth two ticks
    /// for 5ms of jitter, and an error margin of three quarters of a tick.
    fn with_objective_margins(mut config: InputTimelineConfig) -> InputTimelineConfig {
        config.sync.jitter_multiple = 2;
        config.sync.jitter_margin = 1.0;
        config.sync.error_margin = 0.75;
        config
    }

    /// Default timeline whose input delay is set to `ticks`.
    fn timeline_with_input_delay(ticks: u16) -> InputTimeline {
        let mut timeline = InputTimeline::default();
        timeline.context.input_delay_ticks = ticks;
        timeline
    }

    fn assert_tick_instant_close(actual: TickInstant, expected: TickInstant) {
        let error = (actual - expected).to_f32().abs();
        assert!(
            error < 0.001,
            "expected {expected:?}, got {actual:?}, error {error}"
        );
    }

    #[test]
    fn input_timeline_objective_preserves_margin_after_sync_deadband() {
        let remote = remote_at_tick_100();
        let ping_manager = ping_manager_with(Duration::from_millis(40), Duration::from_millis(5));
        let config = with_objective_margins(InputTimelineConfig::default());

        let objective =
            InputTimeline::default().sync_objective(&remote, &config, &ping_manager, TICK);

        // 100 (remote) + 2 (half RTT) + 2 (jitter margin) + 1 + 0.75 (error margin).
        assert_tick_instant_close(objective, TickInstant::lit("105.75"));

        // Even a timeline lagging by the full error margin stays at tick 105.
        let earliest_uncorrected_timeline = objective - TickDelta::lit("0.75");
        assert_tick_instant_close(earliest_uncorrected_timeline, TickInstant::lit("105"));
    }

    #[test]
    fn input_delay_still_offsets_input_timeline_objective() {
        let remote = remote_at_tick_100();
        let ping_manager = ping_manager_with(Duration::from_millis(40), Duration::from_millis(5));
        let config = with_objective_margins(
            InputTimelineConfig::default().with_input_delay(InputDelayConfig::fixed_input_delay(2)),
        );
        let timeline = timeline_with_input_delay(2);

        let objective = timeline.sync_objective(&remote, &config, &ping_manager, TICK);

        // Same as the no-delay objective (105.75), moved back by the two ticks of input delay.
        assert_tick_instant_close(objective, TickInstant::lit("103.75"));
    }

    #[test]
    fn sync_objective_keeps_sent_input_tick_ahead_under_worst_case_drift() {
        let remote = remote_at_tick_100();
        // No latency and no jitter: the margins are the only thing keeping inputs ahead.
        let ping_manager = ping_manager_with(Duration::ZERO, Duration::ZERO);

        let mut config =
            InputTimelineConfig::default().with_input_delay(InputDelayConfig::fixed_input_delay(2));
        config.sync.jitter_margin = 0.5;
        assert!(
            config.sync.error_margin >= 1.0,
            "test premise: error_margin is at least 1 tick"
        );

        let timeline = timeline_with_input_delay(2);
        let objective = timeline.sync_objective(&remote, &config, &ping_manager, TICK);

        // The local timeline may lag the objective by up to the error margin.
        let worst_case_drift =
            TickDelta::from_duration(TICK.mul_f32(config.sync.error_margin), TICK);
        let worst_case_local = objective - worst_case_drift;
        // Inputs are sent for the local tick plus the input delay.
        let sent_input_tick = worst_case_local + TickDelta::from_i32(2);

        let required_input_tick = TickInstant::from(Tick(101));
        assert!(
            sent_input_tick >= required_input_tick,
            "worst-case sent input tick is {sent_input_tick:?}, but the \
             server reads inputs in FixedPreUpdate after receiving packets in \
             PreUpdate and advancing in FixedFirst, so the packet must contain \
             input for at least {required_input_tick:?} (= remote + 1).",
        );
    }

    #[test]
    fn test_input_delay_config() {
        let sync_config = SyncConfig::default();
        let tick_interval = Duration::from_millis(16);
        let config_1 = InputDelayConfig {
            minimum_input_delay_ticks: 2,
            maximum_input_delay_before_prediction: 3,
            maximum_predicted_ticks: 7,
        };

        // (RTT in milliseconds, expected input delay in ticks): below the minimum, clamped
        // to the maximum delay, and twice beyond what prediction is allowed to cover.
        let cases: [(u64, u16); 4] = [(10, 2), (60, 3), (200, 7), (300, 13)];
        for (rtt_ms, expected_delay) in cases {
            let link_stats = LinkStats {
                rtt: Duration::from_millis(rtt_ms),
                ..default()
            };
            assert_eq!(
                config_1.input_delay_ticks(link_stats, &sync_config, tick_interval),
                expected_delay
            );
        }
    }
}