use crate::{world_snapshot::WorldSnapshot, SessionType};
use bevy::{prelude::*, reflect::TypeRegistry};
use ggrs::{
Config, GGRSError, GGRSRequest, GameStateCell, InputStatus, P2PSession, PlayerHandle,
SessionState, SpectatorSession, SyncTestSession,
};
use instant::{Duration, Instant};
/// Stage that steps a GGRS session at a fixed rate and executes the requests
/// the session hands back (save, load, advance).
pub(crate) struct GGRSStage<T: Config> {
    /// Schedule that is run once for every frame advanced.
    schedule: Schedule,
    /// Registry handed to `WorldSnapshot` when saving and restoring the world.
    pub(crate) type_registry: TypeRegistry,
    /// System invoked with a player handle to obtain that player's input.
    pub(crate) input_system: Box<dyn System<In = PlayerHandle, Out = T::Input>>,
    /// Snapshot slots, addressed by `frame % snapshots.len()`.
    snapshots: Vec<WorldSnapshot>,
    /// Target number of steps per second.
    update_frequency: usize,
    /// Frame the stage is currently at.
    frame: i32,
    /// Point in time at which `run` last sampled the clock.
    last_update: Instant,
    /// Elapsed time that has not yet been consumed by steps.
    accumulator: Duration,
    /// When set, the step interval is lengthened by 10%.
    run_slow: bool,
}
impl<T: Config + Send + Sync> Stage for GGRSStage<T> {
/// Accumulates the time elapsed since the previous call and performs one
/// session step for every full step interval that has built up.
///
/// The step interval is `1 / update_frequency` seconds, lengthened by 10%
/// while `run_slow` is set. P2P and spectator sessions, when present as
/// resources, are polled once per call regardless of how many steps run.
/// If no `SessionType` resource exists, the stage is reset instead.
fn run(&mut self, world: &mut World) {
    // Sample the clock exactly once: using two separate samples would silently
    // drop the time that passes between them from the accumulator.
    let now = Instant::now();
    let delta = now.duration_since(self.last_update);
    self.last_update = now;
    self.accumulator = self.accumulator.saturating_add(delta);

    // The interval is fixed for this call, even if `run_slow` changes while stepping.
    let mut fps_delta = 1. / self.update_frequency as f64;
    if self.run_slow {
        fps_delta *= 1.1;
    }

    // Networked sessions get a chance to process incoming messages every call.
    if let Some(mut sess) = world.get_resource_mut::<P2PSession<T>>() {
        sess.poll_remote_clients();
    }
    if let Some(mut sess) = world.get_resource_mut::<SpectatorSession<T>>() {
        sess.poll_remote_clients();
    }

    while self.accumulator.as_secs_f64() > fps_delta {
        self.accumulator = self
            .accumulator
            .saturating_sub(Duration::from_secs_f64(fps_delta));

        let session_type = world.get_resource::<SessionType>();
        match session_type {
            Some(SessionType::SyncTestSession) => self.run_synctest(world),
            Some(SessionType::P2PSession) => self.run_p2p(world),
            Some(SessionType::SpectatorSession) => self.run_spectator(world),
            // `reset` zeroes the accumulator, which also ends this loop.
            None => self.reset(),
        }
    }
}
}
impl<T: Config> GGRSStage<T> {
/// Builds a stage around `input_system` with a default schedule and type
/// registry, no snapshots, frame 0 and a target of 60 steps per second.
pub(crate) fn new(input_system: Box<dyn System<In = PlayerHandle, Out = T::Input>>) -> Self {
    // Timing starts at construction with nothing accumulated yet.
    let created_at = Instant::now();
    let nothing_accumulated = Duration::ZERO;

    Self {
        input_system,
        schedule: Schedule::default(),
        type_registry: TypeRegistry::default(),
        snapshots: Vec::new(),
        update_frequency: 60,
        frame: 0,
        last_update: created_at,
        accumulator: nothing_accumulated,
        run_slow: false,
    }
}
/// Puts the stage back into its freshly constructed timing and frame state.
///
/// All stored snapshots are discarded, the frame counter returns to 0, the
/// accumulator is emptied and the clock reference is set to the current time.
/// The schedule, type registry, input system and update frequency are kept.
pub(crate) fn reset(&mut self) {
    // Snapshot slots are re-created by the next session step that needs them.
    self.snapshots = Vec::new();
    self.frame = 0;
    self.run_slow = false;
    self.accumulator = Duration::ZERO;
    self.last_update = Instant::now();
}
pub(crate) fn run_synctest(&mut self, world: &mut World) {
let sess = world.get_resource::<SyncTestSession<T>>().expect(
"No GGRS SyncTestSession found. Please start a session and add it as a resource.",
);
if self.snapshots.is_empty() {
for _ in 0..sess.max_prediction() {
self.snapshots.push(WorldSnapshot::default());
}
}
let mut inputs = Vec::new();
for handle in 0..sess.num_players() as usize {
inputs.push(self.input_system.run(handle, world));
}
let mut sess = world.get_resource_mut::<SyncTestSession<T>>().expect(
"No GGRS SyncTestSession found. Please start a session and add it as a resource.",
);
for (player_handle, &input) in inputs.iter().enumerate() {
sess.add_local_input(player_handle, input)
.expect("All handles between 0 and num_players should be valid");
}
match sess.advance_frame() {
Ok(requests) => self.handle_requests(requests, world),
Err(e) => warn!("{}", e),
}
}
/// Performs one step of a `SpectatorSession`.
///
/// Does nothing unless the session is in the `Running` state. Otherwise the
/// session is advanced and the returned requests are executed. A
/// `PredictionThreshold` error is logged at info level; any other error is
/// logged as a warning.
///
/// # Panics
///
/// Panics if no `SpectatorSession<T>` resource exists.
pub(crate) fn run_spectator(&mut self, world: &mut World) {
    // The messages name the type that is actually looked up, so a user hitting
    // the panic knows which resource to insert.
    let mut sess = world.get_resource_mut::<SpectatorSession<T>>().expect(
        "No GGRS SpectatorSession found. Please start a session and add it as a resource.",
    );

    if sess.current_state() != SessionState::Running {
        return;
    }

    match sess.advance_frame() {
        Ok(requests) => self.handle_requests(requests, world),
        Err(GGRSError::PredictionThreshold) => {
            info!("SpectatorSession: Waiting for input from host.")
        }
        Err(e) => warn!("{}", e),
    }
}
/// Performs one step of a `P2PSession`.
///
/// Updates `run_slow` from the session's `frames_ahead()`, runs the input
/// system for every local player handle and, if the session is in the
/// `Running` state, submits those inputs, advances the session and executes
/// the returned requests. A `PredictionThreshold` error is logged at info
/// level; any other error is logged as a warning.
///
/// The input system is run even when the session is not yet running.
///
/// # Panics
///
/// Panics if no `P2PSession<T>` resource exists, or if the session rejects
/// one of the local inputs.
pub(crate) fn run_p2p(&mut self, world: &mut World) {
    let sess = world
        .get_resource::<P2PSession<T>>()
        .expect("No GGRS P2PSession found. Please start a session and add it as a resource.");

    // First step with this session: create one snapshot slot per predicted frame.
    if self.snapshots.is_empty() {
        self.snapshots
            .extend((0..sess.max_prediction()).map(|_| WorldSnapshot::default()));
    }

    // Being ahead of the remote side lengthens the step interval used by later `run` calls.
    self.run_slow = sess.frames_ahead() > 0;

    // Keep each input next to the handle it belongs to, so the two can never
    // drift apart and no index arithmetic is needed when submitting them.
    let local_handles = sess.local_player_handles();
    let mut local_inputs = Vec::with_capacity(local_handles.len());
    for &handle in &local_handles {
        local_inputs.push((handle, self.input_system.run(handle, world)));
    }

    let mut sess = world
        .get_resource_mut::<P2PSession<T>>()
        .expect("No GGRS P2PSession found. Please start a session and add it as a resource.");

    if sess.current_state() != SessionState::Running {
        return;
    }

    for (handle, input) in local_inputs {
        sess.add_local_input(handle, input)
            .expect("All handles in local_handles should be valid");
    }

    match sess.advance_frame() {
        Ok(requests) => self.handle_requests(requests, world),
        Err(GGRSError::PredictionThreshold) => {
            info!("Skipping a frame: PredictionThreshold.")
        }
        Err(e) => warn!("{}", e),
    }
}
/// Executes the given session requests in the order they were issued.
///
/// Save requests go to `save_world`, load requests to `load_world` and
/// advance requests to `advance_frame`.
pub(crate) fn handle_requests(&mut self, requests: Vec<GGRSRequest<T>>, world: &mut World) {
    requests.into_iter().for_each(|request| match request {
        GGRSRequest::AdvanceFrame { inputs } => self.advance_frame(inputs, world),
        GGRSRequest::LoadGameState { frame, .. } => self.load_world(frame, world),
        GGRSRequest::SaveGameState { cell, frame } => self.save_world(cell, frame, world),
    });
}
/// Snapshots the world for `frame` and reports the snapshot's checksum to the session.
///
/// The snapshot itself is kept in this stage's buffer at slot
/// `frame % snapshots.len()`; only the frame number and checksum are written
/// into `cell`.
///
/// # Panics
///
/// Panics if `frame` differs from the stage's current frame, or if the
/// snapshot buffer is empty (remainder by zero).
pub(crate) fn save_world(
    &mut self,
    cell: GameStateCell<T::State>,
    frame: i32,
    world: &mut World,
) {
    // The session and the stage must agree on which frame is being saved.
    assert_eq!(self.frame, frame);

    let snapshot = WorldSnapshot::from_world(world, &self.type_registry);
    let checksum = snapshot.checksum as u128;
    cell.save(self.frame, None, Some(checksum));

    let slot = frame as usize % self.snapshots.len();
    self.snapshots[slot] = snapshot;
}
/// Rewinds the stage to `frame` and writes the snapshot stored in slot
/// `frame % snapshots.len()` back into the world.
///
/// # Panics
///
/// Panics if the snapshot buffer is empty (remainder by zero).
pub(crate) fn load_world(&mut self, frame: i32, world: &mut World) {
    self.frame = frame;
    let slot = frame as usize % self.snapshots.len();
    self.snapshots[slot].write_to_world(world, &self.type_registry);
}
/// Advances the simulation by one frame using `inputs`.
///
/// The inputs are inserted as a world resource for the duration of a single
/// schedule run and removed again afterwards; the frame counter is then
/// incremented.
pub(crate) fn advance_frame(
    &mut self,
    inputs: Vec<(T::Input, InputStatus)>,
    world: &mut World,
) {
    // Make this frame's inputs visible to the systems in the schedule.
    world.insert_resource(inputs);
    self.schedule.run_once(world);
    // The inputs belong to this frame only; take them back out and discard them.
    drop(world.remove_resource::<Vec<(T::Input, InputStatus)>>());
    self.frame += 1;
}
/// Sets the target number of steps per second; `run` uses
/// `1 / update_frequency` seconds as its step interval.
pub(crate) fn set_update_frequency(&mut self, update_frequency: usize) {
    self.update_frequency = update_frequency;
}
/// Installs the schedule that is run once per advanced frame.
pub(crate) fn set_schedule(&mut self, schedule: Schedule) {
    // Swap in the new schedule; the one it replaces is dropped right away.
    drop(std::mem::replace(&mut self.schedule, schedule));
}
/// Installs the type registry used when saving and restoring world snapshots.
pub(crate) fn set_type_registry(&mut self, type_registry: TypeRegistry) {
    // Swap in the new registry; the one it replaces is dropped right away.
    drop(std::mem::replace(&mut self.type_registry, type_registry));
}
}