1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
use std::collections::{vec_deque::Drain, VecDeque};
use crate::{
frame_info::PlayerInput,
network::{
messages::ConnectionStatus,
protocol::{Event, UdpProtocol},
},
sessions::builder::{MAX_EVENT_QUEUE_SIZE, SPECTATOR_BUFFER_SIZE},
Config, Frame, GgrsError, GgrsEvent, GgrsRequest, InputStatus, NetworkStats, NonBlockingSocket,
SessionState, NULL_FRAME,
};
// Number of frames the spectator advances per `advance_frame` call while it is
// not lagging more than `max_frames_behind` frames behind the host.
const NORMAL_SPEED: usize = 1;
/// A session that watches a game run by a remote host without contributing input.
///
/// The host broadcasts all confirmed inputs to this session, which lets the
/// spectator replay the game frame by frame.
pub struct SpectatorSession<T: Config> {
    /// Current lifecycle state; starts as `Synchronizing` and becomes `Running`
    /// once the host endpoint reports synchronization.
    state: SessionState,
    /// Number of players whose inputs are tracked.
    num_players: usize,
    /// Ring buffer of received inputs: `SPECTATOR_BUFFER_SIZE` frame slots, each
    /// holding one input per player, addressed by `frame % SPECTATOR_BUFFER_SIZE`.
    inputs: Vec<Vec<PlayerInput<T::Input>>>,
    /// Per-player connection status as last reported by the host endpoint.
    host_connect_status: Vec<ConnectionStatus>,
    /// Socket used to receive messages from and send messages to the host.
    socket: Box<dyn NonBlockingSocket<T::Address>>,
    /// Protocol endpoint representing the host.
    host: UdpProtocol<T>,
    /// Events waiting to be fetched by the user, capped at `MAX_EVENT_QUEUE_SIZE`.
    event_queue: VecDeque<GgrsEvent<T>>,
    /// Last frame for which an advance request was handed out (`NULL_FRAME` initially).
    current_frame: Frame,
    /// Frame of the most recently received input (`NULL_FRAME` initially).
    last_recv_frame: Frame,
    /// Lag (in frames) above which the spectator switches to catch-up speed.
    max_frames_behind: usize,
    /// Number of frames advanced per step while catching up.
    catchup_speed: usize,
}
impl<T: Config> SpectatorSession<T> {
/// Creates a new [`SpectatorSession`] for a spectator.
/// The session will receive inputs from all players from the given host directly.
/// The session will use the provided socket.
pub(crate) fn new(
num_players: usize,
socket: Box<dyn NonBlockingSocket<T::Address>>,
host: UdpProtocol<T>,
max_frames_behind: usize,
catchup_speed: usize,
) -> Self {
// host connection status
let mut host_connect_status = Vec::new();
for _ in 0..num_players {
host_connect_status.push(ConnectionStatus::default());
}
Self {
state: SessionState::Synchronizing,
num_players,
inputs: vec![
vec![PlayerInput::blank_input(NULL_FRAME); num_players];
SPECTATOR_BUFFER_SIZE
],
host_connect_status,
socket,
host,
event_queue: VecDeque::new(),
current_frame: NULL_FRAME,
last_recv_frame: NULL_FRAME,
max_frames_behind,
catchup_speed,
}
}
/// Returns the current [`SessionState`] of a session.
///
/// A newly created session reports [`SessionState::Synchronizing`]; the state switches to
/// [`SessionState::Running`] once the host endpoint signals that synchronization completed.
pub fn current_state(&self) -> SessionState {
self.state
}
/// Returns how many confirmed frames the spectator has yet to process.
///
/// Both `last_recv_frame` and `current_frame` are initialised to `NULL_FRAME` (-1),
/// so this returns `0` before any input has arrived from the host: neither side has
/// advanced yet. While the session is running, `current_frame` only moves forward
/// onto frames whose inputs were already received, so the difference stays
/// non-negative.
///
/// # Panics
/// Panics if `current_frame` is ahead of `last_recv_frame`. That would mean the
/// session's internal bookkeeping is broken; the panic message reports both counters.
pub fn frames_behind_host(&self) -> usize {
    let diff = self.last_recv_frame - self.current_frame;
    // Invariant check with context, so a violation can be diagnosed from the message alone.
    assert!(
        diff >= 0,
        "spectator is ahead of the host: current frame {} > last received frame {}",
        self.current_frame,
        self.last_recv_frame
    );
    diff as usize
}
/// Used to fetch some statistics about the quality of the network connection.
///
/// The call is forwarded to the host endpoint; its result (including any error) is
/// returned unchanged.
/// # Errors
/// - Returns [`NotSynchronized`] if the endpoint has not yet started connecting.
/// - Returns [`NotEnoughData`] if less than one second has elapsed since the connection was
/// established. The session may already be [`Running`]; retry after a short delay.
///
/// [`NotSynchronized`]: GgrsError::NotSynchronized
/// [`NotEnoughData`]: GgrsError::NotEnoughData
/// [`Running`]: crate::SessionState::Running
pub fn network_stats(&self) -> Result<NetworkStats, GgrsError> {
self.host.network_stats()
}
/// Returns all events that happened since last queried for events.
///
/// The returned iterator drains the whole internal queue, so each event is handed out
/// only once. If the number of stored events exceeds `MAX_EVENT_QUEUE_SIZE`, the oldest
/// events will be discarded.
pub fn events(&mut self) -> Drain<'_, GgrsEvent<T>> {
self.event_queue.drain(..)
}
/// You should call this to notify GGRS that you are ready to advance your gamestate.
///
/// The session first polls the host, then hands out one [`GgrsRequest::AdvanceFrame`] per
/// frame to simulate. Normally that is a single frame; when the spectator is more than
/// `max_frames_behind` frames behind the host, up to `catchup_speed` frames are handed out.
///
/// Returns an order-sensitive [`Vec<GgrsRequest>`]. You should fulfill all requests in the
/// exact order they are provided. Failure to do so will cause panics later.
///
/// If the inputs run out part-way through a catch-up step, the requests gathered so far
/// are returned instead of an error. `current_frame` has already moved past those frames,
/// so dropping their requests would make the caller silently skip them.
/// # Errors
/// - Returns [`NotSynchronized`] if the session is not yet ready to accept input.
/// In this case, you either need to start the session or wait for synchronization between clients.
/// - Returns [`PredictionThreshold`] if the inputs for the very next frame have not been
/// received from the host yet.
/// - Returns [`SpectatorTooFarBehind`] if the inputs for the very next frame have already
/// been overwritten in the input buffer.
///
/// [`Vec<GgrsRequest>`]: GgrsRequest
/// [`NotSynchronized`]: GgrsError::NotSynchronized
/// [`PredictionThreshold`]: GgrsError::PredictionThreshold
/// [`SpectatorTooFarBehind`]: GgrsError::SpectatorTooFarBehind
pub fn advance_frame(&mut self) -> Result<Vec<GgrsRequest<T>>, GgrsError> {
    // receive info from host, trigger events and send messages
    self.poll_remote_clients();

    if self.state != SessionState::Running {
        return Err(GgrsError::NotSynchronized);
    }

    // advance faster than normal when lagging too far behind the host
    let frames_to_advance = if self.frames_behind_host() > self.max_frames_behind {
        self.catchup_speed
    } else {
        NORMAL_SPEED
    };

    let mut requests = Vec::new();
    for _ in 0..frames_to_advance {
        let frame_to_grab = self.current_frame + 1;
        match self.inputs_at_frame(frame_to_grab) {
            Ok(inputs) => {
                requests.push(GgrsRequest::AdvanceFrame { inputs });
                // advance the frame, but only because grabbing the inputs succeeded
                self.current_frame = frame_to_grab;
            }
            // Nothing was handed out yet: the caller cannot advance at all, report why.
            Err(err) if requests.is_empty() => return Err(err),
            // Some frames were already committed: return them rather than losing them.
            // If the condition persists, the next call reports the error.
            Err(_) => break,
        }
    }

    Ok(requests)
}
/// Receive UDP packages, distribute them to corresponding UDP endpoints, handle all occurring events and send all outgoing UDP packages.
/// Should be called periodically by your application to give GGRS a chance to do internal work like packet transmissions.
pub fn poll_remote_clients(&mut self) {
// Get all udp packets and distribute them to associated endpoints.
// The endpoints will handle their packets, which will trigger both events and UPD replies.
for (from, msg) in &self.socket.receive_all_messages() {
if self.host.is_handling_message(from) {
self.host.handle_message(msg);
}
}
// run host poll and get events. This will trigger additional UDP packets to be sent.
let mut events = VecDeque::new();
let addr = self.host.peer_addr();
for event in self.host.poll(&self.host_connect_status) {
events.push_back((event, addr.clone()));
}
// handle all events locally
for (event, addr) in events {
self.handle_event(event, addr);
}
// send out all pending UDP messages
self.host.send_all_messages(&mut self.socket);
}
/// Returns the current frame of a session.
///
/// This is `NULL_FRAME` until the first frame has been advanced and grows by one for
/// every advance request handed out by `advance_frame`.
pub fn current_frame(&self) -> Frame {
self.current_frame
}
/// Returns the number of players this session was constructed with.
///
/// The value is fixed at construction and is not changed by any method shown here.
pub fn num_players(&self) -> usize {
self.num_players
}
/// Looks up the inputs of all players for `frame_to_grab` in the input ring buffer.
///
/// Each input is paired with [`InputStatus::Disconnected`] when the host reports the
/// player as disconnected before `frame_to_grab`, and [`InputStatus::Confirmed`] otherwise.
///
/// # Errors
/// - [`GgrsError::PredictionThreshold`] if the slot still holds an older frame, i.e. the
/// host has not delivered this frame yet.
/// - [`GgrsError::SpectatorTooFarBehind`] if the slot already holds a newer frame, i.e. the
/// requested inputs were overwritten.
fn inputs_at_frame(
    &self,
    frame_to_grab: Frame,
) -> Result<Vec<(T::Input, InputStatus)>, GgrsError> {
    let slot = &self.inputs[frame_to_grab as usize % SPECTATOR_BUFFER_SIZE];

    // The first player's entry tells which frame currently occupies this slot.
    let stored_frame = slot[0].frame;
    if stored_frame != frame_to_grab {
        return Err(if stored_frame < frame_to_grab {
            // We haven't received the input from the host yet. Wait.
            GgrsError::PredictionThreshold
        } else {
            // The host is more than `SPECTATOR_BUFFER_SIZE` frames ahead of the spectator.
            // The input we need is gone forever.
            GgrsError::SpectatorTooFarBehind
        });
    }

    let mut synced_inputs = Vec::with_capacity(slot.len());
    for (handle, player_input) in slot.iter().enumerate() {
        let status = &self.host_connect_status[handle];
        let gone = status.disconnected && status.last_frame < frame_to_grab;
        let input_status = if gone {
            InputStatus::Disconnected
        } else {
            InputStatus::Confirmed
        };
        synced_inputs.push((player_input.input, input_status));
    }
    Ok(synced_inputs)
}
/// Reacts to a single event produced by the host endpoint.
///
/// Connection-related events are translated into the matching [`GgrsEvent`] (tagged with
/// `addr`) and queued for the user; `Synchronized` additionally switches the session to
/// [`SessionState::Running`]. `Input` events are stored in the input ring buffer and
/// refresh the per-player connection status; they are not forwarded to the user.
/// Afterwards the event queue is trimmed to at most `MAX_EVENT_QUEUE_SIZE` entries by
/// dropping the oldest ones.
fn handle_event(&mut self, event: Event<T>, addr: T::Address) {
    // Work out which user-facing event (if any) this protocol event maps to.
    let forwarded = match event {
        Event::Synchronizing { total, count } => {
            Some(GgrsEvent::Synchronizing { addr, total, count })
        }
        Event::NetworkInterrupted { disconnect_timeout } => {
            Some(GgrsEvent::NetworkInterrupted {
                addr,
                disconnect_timeout,
            })
        }
        Event::NetworkResumed => Some(GgrsEvent::NetworkResumed { addr }),
        Event::Synchronized => {
            // synced with the host: the session can start handing out frames
            self.state = SessionState::Running;
            Some(GgrsEvent::Synchronized { addr })
        }
        // only forwarded; no session state is changed here
        Event::Disconnected => Some(GgrsEvent::Disconnected { addr }),
        Event::Input { input, player } => {
            // store the input in the ring-buffer slot belonging to its frame
            let slot = input.frame as usize % SPECTATOR_BUFFER_SIZE;
            self.inputs[slot][player] = input;
            assert!(input.frame >= self.last_recv_frame);
            self.last_recv_frame = input.frame;

            // update the frame advantage
            self.host.update_local_frame_advantage(input.frame);

            // mirror the connection status the host reports for every player
            for handle in 0..self.num_players {
                self.host_connect_status[handle] = self.host.peer_connect_status(handle);
            }
            None
        }
    };

    if let Some(user_event) = forwarded {
        self.event_queue.push_back(user_event);
    }

    // keep the queue bounded by discarding the oldest events
    while self.event_queue.len() > MAX_EVENT_QUEUE_SIZE {
        self.event_queue.pop_front();
    }
}
}