1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
use crate::ping::PingChannel;
use crate::ping::manager::PingManager;
use crate::ping::message::{Ping, Pong};
use bevy_app::{App, Plugin, PostUpdate, PreUpdate};
use bevy_ecs::prelude::*;
use bevy_time::{Real, Time};
use core::time::Duration;
use lightyear_connection::client::Connected;
use lightyear_connection::direction::NetworkDirection;
use lightyear_connection::host::HostClient;
use lightyear_core::tick::TickDuration;
use lightyear_core::time::Instant;
use lightyear_core::time::TickDelta;
use lightyear_link::Link;
use lightyear_messages::plugin::MessageSystems;
use lightyear_messages::prelude::AppMessageExt;
use lightyear_messages::receive::MessageReceiver;
use lightyear_messages::send::MessageSender;
use lightyear_transport::plugin::PacketAcked;
use lightyear_transport::prelude::{AppChannelExt, ChannelMode, ChannelSettings};
#[allow(unused_imports)]
use tracing::{info, trace};
/// Backwards-compatible alias for [`PingSystems`], kept so that code using the old
/// `PingSet` name still compiles (with a deprecation warning).
#[deprecated(note = "Use PingSystems instead")]
pub type PingSet = PingSystems;
/// System sets used to schedule the ping systems.
///
/// `PingPlugin` orders [`PingSystems::Receive`] after `MessageSystems::Receive` in `PreUpdate`,
/// and [`PingSystems::Send`] before `MessageSystems::Send` in `PostUpdate`.
#[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum PingSystems {
/// Read the received `Ping`/`Pong` messages, update the `PingManager` and copy its
/// RTT/jitter estimates into the `Link` stats (runs in `PreUpdate`)
Receive,
/// Queue the outgoing `Ping` messages and the buffered `Pong` replies on their
/// `MessageSender`s (runs in `PostUpdate`, before `MessageSystems::Send`)
Send,
}
/// Plugin that registers the `PingChannel`, the `Ping`/`Pong` messages, the
/// [`PingSystems`] sets, and the systems/observers that drive each entity's `PingManager`.
pub struct PingPlugin;
impl PingPlugin {
fn receive(
real_time: Res<Time<Real>>,
tick_duration: Res<TickDuration>,
mut query: Query<
(
&mut Link,
&mut PingManager,
&mut MessageReceiver<Ping>,
&mut MessageReceiver<Pong>,
),
(With<Connected>, Without<HostClient>),
>,
) {
query
.par_iter_mut()
.for_each(|(mut link, mut m, mut ping_receiver, mut pong_receiver)| {
// update
m.update(&real_time);
// receive pings
ping_receiver.receive().for_each(|ping| {
m.buffer_pending_pong(&ping, Instant::now());
});
// receive pongs
pong_receiver.receive().for_each(|pong| {
// process the pong
m.process_pong(&pong, Instant::now(), tick_duration.0);
});
link.stats.rtt = m.rtt();
link.stats.jitter = m.jitter();
})
}
/// Send pings/pongs to the remote
/// We modify the pongs that were buffered so that we can write the correct
/// time spent between PostUpdate and PreUpdate
fn send(
tick_duration: Res<TickDuration>,
mut query: Query<
(
Entity,
&mut PingManager,
&mut MessageSender<Ping>,
&mut MessageSender<Pong>,
),
(With<Connected>, Without<HostClient>),
>,
) {
let now = Instant::now();
// NOTE: the real_time.last_update() is the time from the Render World! It seems like it cannot be compared directly
// with the time from Instant::now(), so we stick to only using Instant::now() for now.
// let Some(frame_start) = real_time.last_update() else {
// return
// };
// let frame_time = now - frame_start;
query
.par_iter_mut()
.for_each(|(entity, mut m, mut ping_sender, mut pong_sender)| {
// send the pings
if let Some(ping) = m.maybe_prepare_ping(now) {
trace!(
target: "lightyear_debug::sync",
kind = "ping_message_send",
schedule = "PostUpdate",
sample_point = "PostUpdate",
entity = ?entity,
ping_id = ping.id.0,
"queued ping message"
);
ping_sender.send::<PingChannel>(ping);
}
// prepare the pong messages with the correct send time
m.take_pending_pongs()
.into_iter()
.for_each(|(mut pong, ping_receive_time)| {
pong.frame_time =
TickDelta::from_duration(now - ping_receive_time, tick_duration.0)
.into();
trace!(?now, ?ping_receive_time, ?pong, "Sending pong");
trace!(
target: "lightyear_debug::sync",
kind = "pong_send",
schedule = "PostUpdate",
sample_point = "PostUpdate",
entity = ?entity,
ping_id = pong.ping_id.0,
frame_time_ticks = ?pong.frame_time,
"queued pong message"
);
// TODO: maybe include the tick + overstep in every packet?
// TODO: how to use the overstep?
// pong.overstep = fixed_time.overstep_fraction();
pong_sender.send::<PingChannel>(pong);
});
})
}
/// On connection, reset the PingManager.
pub(crate) fn handle_connect(trigger: On<Add, Connected>, mut query: Query<&mut PingManager>) {
if let Ok(mut manager) = query.get_mut(trigger.entity) {
manager.reset();
}
}
pub(crate) fn process_packet_ack(trigger: On<PacketAcked>, mut query: Query<&mut PingManager>) {
if let Ok(mut manager) = query.get_mut(trigger.entity) {
manager.process_packet_rtt_sample(trigger.rtt_sample);
}
}
}
impl Plugin for PingPlugin {
    /// Registers everything the ping machinery needs on `app`:
    /// the bidirectional `PingChannel`, the bidirectional `Ping`/`Pong` messages,
    /// the ordering of the `PingSystems` sets relative to `MessageSystems`,
    /// the `receive`/`send` systems and the connection / packet-ack observers.
    fn build(&self, app: &mut App) {
        let ping_channel = ChannelSettings {
            // NOTE: using Sequenced is invalid if we are sharing a channel between Ping and Pong!
            mode: ChannelMode::UnorderedUnreliable,
            send_frequency: Duration::ZERO,
            // we always want to include the ping in the packet
            priority: f32::INFINITY,
        };
        app.add_channel::<PingChannel>(ping_channel)
            .add_direction(NetworkDirection::Bidirectional);
        app.register_message_to_bytes::<Ping>()
            .add_direction(NetworkDirection::Bidirectional);
        app.register_message_to_bytes::<Pong>()
            .add_direction(NetworkDirection::Bidirectional);

        // NOTE: the Transport's PacketBuilder needs accurate LinkStats to function correctly.
        //  Theoretically anything can modify the LinkStats, but in practice it is done by the
        //  PingManager, so the Transport used to require a PingManager.
        //  Maybe we should error if TransportPlugin is added without PingPlugin?
        //
        //  The dependency chain used to be
        //    Client -> InputTimeline -> PingManager -> MessageSender<Ping> -> MessageManager
        //      -> Transport -> [Link, LocalTimeline]
        //  but combined with Transport -> PingManager this forms a cycle, and cyclic required
        //  components are not allowed anymore.
        //
        //  So the Transport -> PingManager requirement was removed, hoping that a PingManager
        //  is always added to entities that have a Transport...
        //    app.register_required_components::<Transport, PingManager>();
        #[cfg(feature = "server")]
        app.register_required_components::<lightyear_connection::prelude::server::ClientOf, PingManager>();

        // Pings are read after the messages are received, and queued before they are sent.
        app.configure_sets(
            PreUpdate,
            (MessageSystems::Receive, PingSystems::Receive).chain(),
        )
        .configure_sets(
            PostUpdate,
            (PingSystems::Send, MessageSystems::Send).chain(),
        )
        .add_systems(PreUpdate, Self::receive.in_set(PingSystems::Receive))
        .add_systems(PostUpdate, Self::send.in_set(PingSystems::Send))
        .add_observer(Self::handle_connect)
        .add_observer(Self::process_packet_ack);
    }
}