use crate::{
AdvanceWorld, Checksum, ConfirmedFrameCount, FixedTimestepData, LoadWorld, LocalInputs,
LocalPlayers, MaxPredictionWindow, PlayerInputs, ReadInputs, RollbackFrameCount,
RollbackFrameRate, SaveWorld, Session, SyncTestMismatch,
};
use bevy::prelude::*;
use core::time::Duration;
use ggrs::{
Config, GgrsError, GgrsRequest, P2PSession, SessionState, SpectatorSession, SyncTestSession,
};
/// Drives the GGRS session from a fixed-timestep accumulator.
///
/// Each call:
/// 1. adds the elapsed [`Time`] delta to the accumulator stored in [`FixedTimestepData`],
/// 2. polls remote clients for P2P and spectator sessions, regardless of whether a
///    simulation step is due,
/// 3. runs one session step per accumulated step duration.
///
/// The step duration is `1 / RollbackFrameRate` seconds, stretched by 10% while
/// `run_slow` is set. `run_slow` is updated from `P2PSession::frames_ahead` inside the
/// loop, but the step duration is computed once per call, so a change takes effect on the
/// next call.
///
/// When no session resource is present, the accumulator and `run_slow` are reset and the
/// rollback bookkeeping resources are reinitialised to their starting values.
///
/// # Panics
///
/// Panics if [`RollbackFrameRate`] is zero, or if the [`FixedTimestepData`] or [`Time`]
/// resources are missing from the world.
pub(crate) fn run_ggrs_schedules<T: Config>(world: &mut World) {
    let framerate: usize = **world.get_resource_or_insert_with::<RollbackFrameRate>(default);
    // Reject a zero rate up front: it would otherwise surface as an opaque integer
    // division-by-zero panic below. Checking before any resource is removed also keeps
    // the world untouched when we bail out.
    assert!(framerate > 0, "RollbackFrameRate must be greater than zero");

    let mut time_data = world
        .remove_resource::<FixedTimestepData>()
        .expect("failed to extract GGRS FixedTimeStepData");

    let delta = world
        .get_resource::<Time>()
        .expect("Time resource not found, did you remove it?")
        .delta();

    let fps_delta = if time_data.run_slow {
        // Run 10% slower (step is 11/10 of the nominal duration).
        Duration::from_nanos(1_000_000_000u64 * 11 / (framerate as u64 * 10))
    } else {
        Duration::from_nanos(1_000_000_000u64 / framerate as u64)
    }
    // Integer division truncates to zero for rates above one step per nanosecond; a zero
    // step would never drain the accumulator and the loop below would spin forever.
    .max(Duration::from_nanos(1));

    time_data.accumulator = time_data.accumulator.saturating_add(delta);

    // Poll remotes on every call, even if no simulation step is due this time.
    if let Some(mut session) = world.get_resource_mut::<Session<T>>() {
        match &mut *session {
            Session::P2P(session) => {
                session.poll_remote_clients();
            }
            Session::Spectator(session) => {
                session.poll_remote_clients();
            }
            _ => {}
        }
    }

    // Consume the accumulated time one step at a time.
    while time_data.accumulator >= fps_delta {
        time_data.accumulator = time_data.accumulator.saturating_sub(fps_delta);

        // The session is taken out of the world for the duration of the step; each
        // runner is responsible for inserting it back.
        let session = world.remove_resource::<Session<T>>();
        match session {
            Some(Session::SyncTest(s)) => run_synctest::<T>(world, s),
            Some(Session::P2P(session)) => {
                // If we are ahead of the remote peers, slow down.
                time_data.run_slow = session.frames_ahead() > 0;
                run_p2p(world, session);
            }
            Some(Session::Spectator(s)) => run_spectator(world, s),
            _ => {
                // No session available: reset timing and rollback bookkeeping. Zeroing
                // the accumulator also terminates this loop.
                time_data.accumulator = Duration::ZERO;
                time_data.run_slow = false;
                world.insert_resource(LocalPlayers::default());
                world.insert_resource(RollbackFrameCount(0));
                world.insert_resource(ConfirmedFrameCount(-1));
                world.insert_resource(MaxPredictionWindow(8));
            }
        }
    }

    world.insert_resource(time_data);
}
pub(crate) fn run_synctest<C: Config>(world: &mut World, mut sess: SyncTestSession<C>) {
world.insert_resource(LocalPlayers((0..sess.num_players()).collect()));
world.run_schedule(ReadInputs);
let local_inputs = world.remove_resource::<LocalInputs<C>>().expect(
"No local player inputs found. Did you insert systems into the ReadInputs schedule?",
);
for (handle, input) in local_inputs.0 {
sess.add_local_input(handle, input)
.expect("All handles in local_handles should be valid");
}
let requests = sess.advance_frame();
world.insert_resource(Session::SyncTest(sess));
match requests {
Ok(requests) => handle_requests(requests, world),
Err(e) => {
warn!("{e}");
if let GgrsError::MismatchedChecksum {
current_frame,
mismatched_frames,
} = e
{
world.trigger(SyncTestMismatch {
current_frame,
mismatched_frames,
});
}
}
}
}
/// Runs a single step of a spectator session.
///
/// The session is only advanced while its state is `SessionState::Running`. It is put
/// back into the world before the resulting requests are handled. A
/// `GgrsError::PredictionThreshold` error is logged at info level; any other error is
/// logged as a warning.
pub(crate) fn run_spectator<T: Config>(world: &mut World, mut sess: SpectatorSession<T>) {
    let outcome = if sess.current_state() == SessionState::Running {
        Some(sess.advance_frame())
    } else {
        None
    };

    // Return the session to the world before any request is processed.
    world.insert_resource(Session::Spectator(sess));

    let Some(outcome) = outcome else {
        // Session is not running yet: nothing to do this step.
        return;
    };

    match outcome {
        Ok(requests) => handle_requests(requests, world),
        Err(GgrsError::PredictionThreshold) => {
            info!("P2PSpectatorSession: Waiting for input from host.")
        }
        Err(e) => warn!("{e}"),
    }
}
/// Runs a single step of a P2P session.
///
/// Always publishes the session's local player handles as [`LocalPlayers`]. While the
/// session state is `SessionState::Running`, the [`ReadInputs`] schedule is run, the
/// collected inputs are fed into the session and the session is advanced by one frame.
/// The session is put back into the world before the resulting requests are handled.
///
/// A `GgrsError::PredictionThreshold` error is logged at info level; any other error is
/// logged as a warning.
///
/// # Panics
///
/// Panics if the `ReadInputs` schedule did not produce a `LocalInputs` resource, or if
/// the session rejects one of the provided inputs.
pub(crate) fn run_p2p<C: Config>(world: &mut World, mut sess: P2PSession<C>) {
    world.insert_resource(LocalPlayers(sess.local_player_handles()));

    if sess.current_state() != SessionState::Running {
        // Not running yet: hand the session back untouched.
        world.insert_resource(Session::P2P(sess));
        return;
    }

    world.run_schedule(ReadInputs);
    let collected = world.remove_resource::<LocalInputs<C>>().expect(
        "No local player inputs found. Did you insert systems into the ReadInputs schedule?",
    );
    for (player, player_input) in collected.0 {
        sess.add_local_input(player, player_input)
            .expect("All handles in local_inputs should be valid");
    }

    let outcome = sess.advance_frame();
    // Return the session to the world before any request is processed.
    world.insert_resource(Session::P2P(sess));

    match outcome {
        Ok(requests) => handle_requests(requests, world),
        Err(GgrsError::PredictionThreshold) => {
            info!("Skipping a frame: PredictionThreshold.")
        }
        Err(e) => warn!("{e}"),
    }
}
/// Executes a list of GGRS requests against the world, in order.
///
/// The [`LoadWorld`], [`SaveWorld`] and [`AdvanceWorld`] schedules are removed from the
/// world's [`Schedules`] for the duration of the call, so they can be run with exclusive
/// world access, and are reinserted afterwards.
///
/// Before each request, [`MaxPredictionWindow`] and [`ConfirmedFrameCount`] are refreshed
/// from the current [`Session`] (if one is present in the world):
/// - P2P: the session's `max_prediction()` and `confirmed_frame()`.
/// - SyncTest: the session's `max_prediction()`, and the current frame minus
///   `check_distance()` (only when that is non-negative).
/// - Spectator: a prediction window of `0` and the current frame.
///
/// Request handling:
/// - `SaveGameState`: runs `SaveWorld`, then stores the [`Checksum`] resource (if any) in
///   the provided cell.
/// - `LoadGameState`: sets [`RollbackFrameCount`] to the requested frame, then runs
///   `LoadWorld`.
/// - `AdvanceFrame`: increments [`RollbackFrameCount`], exposes the inputs as a
///   [`PlayerInputs`] resource while `AdvanceWorld` runs, then removes it again.
///
/// # Panics
///
/// Panics if any of the three schedules is missing, if a schedule with the same label was
/// inserted while the requests were being handled, or if [`RollbackFrameCount`] is
/// missing when a load or advance request is processed.
pub(crate) fn handle_requests<T: Config>(requests: Vec<GgrsRequest<T>>, world: &mut World) {
    let _span = bevy::log::tracing::info_span!("ggrs", name = "HandleRequests").entered();

    // Take ownership of the schedules so they can be run while the world is mutably
    // borrowed.
    let mut schedules = world.resource_mut::<Schedules>();
    let Some((_, mut load_world_schedule)) = schedules.remove_entry(LoadWorld) else {
        panic!("Could not extract LoadWorld Schedule!");
    };
    let Some((_, mut save_world_schedule)) = schedules.remove_entry(SaveWorld) else {
        panic!("Could not extract SaveWorld Schedule!");
    };
    let Some((_, mut advance_world_schedule)) = schedules.remove_entry(AdvanceWorld) else {
        panic!("Could not extract AdvanceWorld Schedule!");
    };

    for request in requests {
        // Frame counter as it stands before this request is applied; defaults to 0 when
        // the resource is absent.
        let current_frame = world
            .get_resource::<RollbackFrameCount>()
            .map(|frame| frame.0)
            .unwrap_or_default();

        // Derive the prediction window and confirmed frame from the active session.
        let (max_prediction, confirmed_frame) = match world.get_resource::<Session<T>>() {
            Some(Session::P2P(s)) => (Some(s.max_prediction()), Some(s.confirmed_frame())),
            Some(Session::SyncTest(s)) => {
                let confirmed = current_frame - (s.check_distance() as i32);
                // Leave the confirmed frame untouched until it would be non-negative.
                (
                    Some(s.max_prediction()),
                    (confirmed >= 0).then_some(confirmed),
                )
            }
            Some(Session::Spectator(_)) => (Some(0), Some(current_frame)),
            None => (None, None),
        };

        if let Some(max_prediction) = max_prediction {
            world.insert_resource(MaxPredictionWindow(max_prediction));
        }
        if let Some(confirmed_frame) = confirmed_frame {
            world.insert_resource(ConfirmedFrameCount(confirmed_frame));
        }

        match request {
            GgrsRequest::SaveGameState { cell, frame } => {
                let _span =
                    bevy::log::tracing::info_span!("schedule", name = "SaveWorld").entered();
                debug!("saving snapshot for frame {frame}");
                save_world_schedule.run(world);

                // Only the checksum is handed to GGRS; no state payload is stored in the
                // cell (presumably the SaveWorld schedule keeps the snapshot itself —
                // verify against the snapshot systems).
                let checksum = world
                    .get_resource::<Checksum>()
                    .map(|&Checksum(checksum)| checksum);
                cell.save(frame, None, checksum);
            }
            GgrsRequest::LoadGameState { frame, .. } => {
                let _span =
                    bevy::log::tracing::info_span!("schedule", name = "LoadWorld").entered();
                debug!("restoring snapshot for frame {frame}");

                // The frame counter must point at the target frame before LoadWorld runs.
                world
                    .get_resource_mut::<RollbackFrameCount>()
                    .expect("Unable to find GGRS RollbackFrameCount. Did you remove it?")
                    .0 = frame;
                load_world_schedule.run(world);
            }
            GgrsRequest::AdvanceFrame { inputs } => {
                let _span =
                    bevy::log::tracing::info_span!("schedule", name = "AdvanceWorld").entered();
                let mut frame_count = world
                    .get_resource_mut::<RollbackFrameCount>()
                    .expect("Unable to find GGRS RollbackFrameCount. Did you remove it?");
                frame_count.0 += 1;
                let frame = frame_count.0;
                debug!("advancing to frame: {}", frame);

                // Inputs are only available as a resource while AdvanceWorld runs.
                world.insert_resource(PlayerInputs::<T>(inputs));
                advance_world_schedule.run(world);
                world.remove_resource::<PlayerInputs<T>>();
                debug!("frame {frame} completed");
            }
        }
    }

    // Put the schedules back. `insert` returns the previous schedule with the same label,
    // which should not exist since we removed ours above.
    let mut schedules = world.resource_mut::<Schedules>();
    let old = schedules.insert(load_world_schedule);
    if old.is_some() {
        panic!("LoadWorld Schedule was Duplicated!");
    }
    let old = schedules.insert(save_world_schedule);
    if old.is_some() {
        panic!("SaveWorld Schedule was Duplicated!");
    }
    let old = schedules.insert(advance_world_schedule);
    if old.is_some() {
        panic!("AdvanceWorld Schedule was Duplicated!");
    }
}