//! Spectator session: follows a remote host, receives the confirmed inputs it broadcasts and replays them without contributing any input.
use std::collections::{vec_deque::Drain, VecDeque};
use crate::{
frame_info::PlayerInput,
network::{
messages::ConnectionStatus,
protocol::{Event, UdpProtocol},
},
sessions::builder::MAX_EVENT_QUEUE_SIZE,
Config, Frame, GgrsError, GgrsEvent, GgrsRequest, InputStatus, NetworkStats, NonBlockingSocket,
SessionState, NULL_FRAME,
};
/// Number of frames the spectator advances per `advance_frame` call while it is not too far
/// behind the host (i.e. while no catch-up is needed).
const NORMAL_SPEED: usize = 1;
/// Number of frames of input the spectator can buffer. Inputs are stored in a ring buffer
/// indexed by `frame % SPECTATOR_BUFFER_SIZE`, so older frames are overwritten once the host is
/// this many frames ahead. (Described upstream as "a second worth of inputs" — presumably
/// assuming 60 frames per second.)
pub(crate) const SPECTATOR_BUFFER_SIZE: usize = 60;
/// [`SpectatorSession`] provides all functionality to connect to a remote host in a peer-to-peer fashion.
/// The host will broadcast all confirmed inputs to this session.
/// This session can be used to spectate a session without contributing to the game input.
pub struct SpectatorSession<T>
where
T: Config,
{
/// Current session state; starts as `Synchronizing` and becomes `Running` once the host reports `Synchronized`.
state: SessionState,
/// Number of players in the spectated game.
num_players: usize,
/// Ring buffer of received inputs: `SPECTATOR_BUFFER_SIZE` slots (indexed by
/// `frame % SPECTATOR_BUFFER_SIZE`), each holding one input per player.
inputs: Vec<Vec<PlayerInput<T::Input>>>,
/// Per-player connection status, refreshed from the host endpoint every time an input arrives.
host_connect_status: Vec<ConnectionStatus>,
/// Socket used to receive messages from and send messages to the host.
socket: Box<dyn NonBlockingSocket<T::Address>>,
/// Protocol endpoint representing the host.
host: UdpProtocol<T>,
/// Events waiting to be picked up by the user; trimmed to `MAX_EVENT_QUEUE_SIZE` entries.
event_queue: VecDeque<GgrsEvent<T>>,
/// Last frame for which an advance request has been handed out (`NULL_FRAME` before the first one).
current_frame: Frame,
/// Frame of the most recently received input (`NULL_FRAME` before the first one).
last_recv_frame: Frame,
/// Lag (in frames) above which the session advances `catchup_speed` frames per step instead of `NORMAL_SPEED`.
max_frames_behind: usize,
/// Number of frames advanced per step while catching up.
catchup_speed: usize,
}
impl<T: Config> SpectatorSession<T> {
/// Builds a [`SpectatorSession`] that follows the given `host`.
///
/// The session starts out in [`SessionState::Synchronizing`] with both frame counters set to
/// [`NULL_FRAME`]. Its input buffer holds [`SPECTATOR_BUFFER_SIZE`] slots, each pre-filled with
/// one blank input per player. All traffic with the host goes through the provided `socket`.
pub(crate) fn new(
    num_players: usize,
    socket: Box<dyn NonBlockingSocket<T::Address>>,
    host: UdpProtocol<T>,
    max_frames_behind: usize,
    catchup_speed: usize,
) -> Self {
    // one default connection status per player
    let host_connect_status = std::iter::repeat_with(ConnectionStatus::default)
        .take(num_players)
        .collect();

    // every slot of the ring buffer starts out as a row of blank inputs
    let blank_row = vec![PlayerInput::blank_input(NULL_FRAME); num_players];
    let inputs = vec![blank_row; SPECTATOR_BUFFER_SIZE];

    Self {
        state: SessionState::Synchronizing,
        num_players,
        inputs,
        host_connect_status,
        socket,
        host,
        event_queue: VecDeque::new(),
        current_frame: NULL_FRAME,
        last_recv_frame: NULL_FRAME,
        max_frames_behind,
        catchup_speed,
    }
}
/// Returns the current [`SessionState`] of a session.
pub fn current_state(&self) -> SessionState {
self.state
}
/// Returns how many frames this session lags behind the newest input received from the host.
///
/// # Panics
/// Panics if the current frame is ahead of the last received frame.
pub fn frames_behind_host(&self) -> usize {
    let (newest, current) = (self.last_recv_frame, self.current_frame);
    let diff = newest - current;
    // the spectator can never be ahead of the inputs it has received
    assert!(diff >= 0);
    diff as usize
}
/// Fetches statistics about the quality of the network connection to the host.
///
/// The call is forwarded to the host endpoint and its result is returned unchanged.
/// # Errors
/// - Returns [`NotSynchronized`] if the session is not connected to other clients yet.
///
/// [`NotSynchronized`]: GgrsError::NotSynchronized
pub fn network_stats(&self) -> Result<NetworkStats, GgrsError> {
    let host = &self.host;
    host.network_stats()
}
/// Drains and returns every event queued since the last call.
/// The queue is capped at `MAX_EVENT_QUEUE_SIZE` entries; when it overflows, the oldest events are discarded.
pub fn events(&mut self) -> Drain<GgrsEvent<T>> {
    let pending = &mut self.event_queue;
    pending.drain(..)
}
/// You should call this to notify GGRS that you are ready to advance your gamestate.
/// Returns an order-sensitive [`Vec<GgrsRequest>`]. You should fulfill all requests in the exact order they are provided.
/// Failure to do so will cause panics later.
///
/// Normally a single frame is advanced. When the session is more than `max_frames_behind` frames
/// behind the host, up to `catchup_speed` frames are advanced in one call. If the inputs run out
/// part-way through such a catch-up, the requests gathered so far are returned; the session's
/// frame counter always matches the requests that were handed out.
/// # Errors
/// - Returns [`NotSynchronized`] if the session is not yet ready to accept input.
///   In this case, you either need to start the session or wait for synchronization between clients.
/// - Returns [`PredictionThreshold`] if the input for the next frame has not been received from the host yet.
/// - Returns [`SpectatorTooFarBehind`] if the input for the next frame has already been overwritten in the buffer.
///
/// The latter two are only returned when not a single frame could be advanced.
///
/// [`Vec<GgrsRequest>`]: GgrsRequest
/// [`NotSynchronized`]: GgrsError::NotSynchronized
/// [`PredictionThreshold`]: GgrsError::PredictionThreshold
/// [`SpectatorTooFarBehind`]: GgrsError::SpectatorTooFarBehind
pub fn advance_frame(&mut self) -> Result<Vec<GgrsRequest<T>>, GgrsError> {
    // receive info from host, trigger events and send messages
    self.poll_remote_clients();

    if self.state != SessionState::Running {
        return Err(GgrsError::NotSynchronized);
    }

    // speed up if we fell too far behind the host
    let frames_to_advance = if self.frames_behind_host() > self.max_frames_behind {
        self.catchup_speed
    } else {
        NORMAL_SPEED
    };

    let mut requests = Vec::new();
    for _ in 0..frames_to_advance {
        // get inputs for the next frame
        let frame_to_grab = self.current_frame + 1;
        match self.inputs_at_frame(frame_to_grab) {
            Ok(inputs) => {
                requests.push(GgrsRequest::AdvanceFrame { inputs });
                // advance the frame, but only if grabbing the inputs succeeded
                self.current_frame = frame_to_grab;
            }
            // nothing was advanced in this call, so there is nothing to lose: report the error
            Err(err) if requests.is_empty() => return Err(err),
            // Earlier frames were already advanced. Returning the error here would drop their
            // requests and make the caller skip frames, so hand out what we have instead.
            // The next call will report the error if it persists.
            Err(_) => break,
        }
    }

    Ok(requests)
}
/// Receive UDP packages, distribute them to corresponding UDP endpoints, handle all occurring events and send all outgoing UDP packages.
/// Should be called periodically by your application to give GGRS a chance to do internal work like packet transmissions.
pub fn poll_remote_clients(&mut self) {
// Get all udp packets and distribute them to associated endpoints.
// The endpoints will handle their packets, which will trigger both events and UPD replies.
for (from, msg) in &self.socket.receive_all_messages() {
if self.host.is_handling_message(from) {
self.host.handle_message(msg);
}
}
// run host poll and get events. This will trigger additional UDP packets to be sent.
let mut events = VecDeque::new();
let addr = self.host.peer_addr();
for event in self.host.poll(&self.host_connect_status) {
events.push_back((event, addr.clone()));
}
// handle all events locally
for (event, addr) in events.drain(..) {
self.handle_event(event, addr);
}
// send out all pending UDP messages
self.host.send_all_messages(&mut self.socket);
}
/// Returns the number of players this session was constructed with.
pub fn num_players(&self) -> usize {
self.num_players
}
/// Collects the inputs of all players for `frame_to_grab` from the input ring buffer.
///
/// Every input is tagged [`InputStatus::Disconnected`] if the host reports that player as
/// disconnected with a last frame before `frame_to_grab`, and [`InputStatus::Confirmed`] otherwise.
/// # Errors
/// - [`GgrsError::PredictionThreshold`] if the buffered slot still holds an older frame.
/// - [`GgrsError::SpectatorTooFarBehind`] if the buffered slot already holds a newer frame.
fn inputs_at_frame(
    &self,
    frame_to_grab: Frame,
) -> Result<Vec<(T::Input, InputStatus)>, GgrsError> {
    let slot = (frame_to_grab as usize) % SPECTATOR_BUFFER_SIZE;
    let player_inputs = &self.inputs[slot];

    // the first player's entry tells us which frame this slot currently holds
    match player_inputs[0].frame.cmp(&frame_to_grab) {
        // We haven't received the input from the host yet. Wait.
        std::cmp::Ordering::Less => return Err(GgrsError::PredictionThreshold),
        // The host is more than [`SPECTATOR_BUFFER_SIZE`] frames ahead of the spectator.
        // The input we need is gone forever.
        std::cmp::Ordering::Greater => return Err(GgrsError::SpectatorTooFarBehind),
        std::cmp::Ordering::Equal => {}
    }

    let mut synced = Vec::with_capacity(player_inputs.len());
    for (handle, player_input) in player_inputs.iter().enumerate() {
        let connection = &self.host_connect_status[handle];
        let status = if connection.disconnected && connection.last_frame < frame_to_grab {
            InputStatus::Disconnected
        } else {
            InputStatus::Confirmed
        };
        synced.push((player_input.input, status));
    }
    Ok(synced)
}
fn handle_event(&mut self, event: Event<T>, addr: T::Address) {
match event {
// forward to user
Event::Synchronizing { total, count } => {
self.event_queue
.push_back(GgrsEvent::Synchronizing { addr, total, count });
}
// forward to user
Event::NetworkInterrupted { disconnect_timeout } => {
self.event_queue.push_back(GgrsEvent::NetworkInterrupted {
addr,
disconnect_timeout,
});
}
// forward to user
Event::NetworkResumed => {
self.event_queue
.push_back(GgrsEvent::NetworkResumed { addr });
}
// synced with the host, then forward to user
Event::Synchronized => {
self.state = SessionState::Running;
self.event_queue.push_back(GgrsEvent::Synchronized { addr });
}
// disconnect the player, then forward to user
Event::Disconnected => {
self.event_queue.push_back(GgrsEvent::Disconnected { addr });
}
// add the input and all associated information
Event::Input { input, player } => {
// save the input
self.inputs[input.frame as usize % SPECTATOR_BUFFER_SIZE][player] = input;
assert!(input.frame >= self.last_recv_frame);
self.last_recv_frame = input.frame;
// update the frame advantage
self.host.update_local_frame_advantage(input.frame);
// update the host connection status
for i in 0..self.num_players {
self.host_connect_status[i] = self.host.peer_connect_status(i);
}
}
}
// check event queue size and discard oldest events if too big
while self.event_queue.len() > MAX_EVENT_QUEUE_SIZE {
self.event_queue.pop_front();
}
}
}