#![doc = include_str!("./networking.md")]
use self::{
input::{DenseInput, NetworkInputConfig, NetworkPlayerControl, NetworkPlayerControls},
socket::Socket,
};
use crate::prelude::*;
use bones_matchmaker_proto::{MATCH_ALPN, PLAY_ALPN};
use ggrs::P2PSession;
use instant::Duration;
use once_cell::sync::Lazy;
use std::{fmt::Debug, marker::PhantomData, sync::Arc};
use tracing::{debug, error, info, trace, warn};
#[cfg(feature = "net-debug")]
use {
self::debug::{NetworkDebugMessage, PlayerSyncState, NETWORK_DEBUG_CHANNEL},
ggrs::{NetworkStats, PlayerHandle},
};
use crate::input::PlayerControls as PlayerControlsTrait;
pub mod input;
pub mod lan;
pub mod online;
pub mod proto;
pub mod socket;
#[cfg(feature = "net-debug")]
pub mod debug;
/// Global tokio runtime used by the networking code.
///
/// The runtime is created lazily on first access.
///
/// # Panics
///
/// The first access panics if the tokio runtime cannot be created.
pub static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
    tokio::runtime::Runtime::new().expect("unable to create tokio runtime")
});
/// Status of a player's input for a frame, as reported by the network session.
///
/// Each variant mirrors the [`ggrs::InputStatus`] variant of the same name.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NetworkInputStatus {
/// Converted from [`ggrs::InputStatus::Confirmed`].
Confirmed,
/// Converted from [`ggrs::InputStatus::Predicted`].
Predicted,
/// Converted from [`ggrs::InputStatus::Disconnected`].
Disconnected,
}
impl From<ggrs::InputStatus> for NetworkInputStatus {
fn from(value: ggrs::InputStatus) -> Self {
match value {
ggrs::InputStatus::Confirmed => NetworkInputStatus::Confirmed,
ggrs::InputStatus::Predicted => NetworkInputStatus::Predicted,
ggrs::InputStatus::Disconnected => NetworkInputStatus::Disconnected,
}
}
}
/// Convenience re-exports of this module's commonly used submodules and items.
pub mod prelude {
pub use super::{input, lan, online, proto, DisconnectedPlayers, SyncingInfo, RUNTIME};
// The debug prelude is only re-exported when the `net-debug` feature is enabled.
#[cfg(feature = "net-debug")]
pub use super::debug::prelude::*;
}
/// Factor the simulation frame rate is multiplied by to obtain the frame rate of the
/// network session (see [`GgrsSessionRunner::new`]).
pub const NETWORK_FRAME_RATE_FACTOR: f32 = 0.9;
/// Prediction window used by [`GgrsSessionRunner::new`] when
/// [`GgrsSessionRunnerInfo::max_prediction_window`] is `None`.
pub const NETWORK_MAX_PREDICTION_WINDOW_DEFAULT: usize = 7;
/// Local input delay used by [`GgrsSessionRunner::new`] when
/// [`GgrsSessionRunnerInfo::local_input_delay`] is `None`.
pub const NETWORK_LOCAL_INPUT_DELAY_DEFAULT: usize = 2;
/// Errors reported by the networking layer.
///
/// The type implements [`std::error::Error`], so it can be logged, unwrapped, and
/// converted into boxed errors with `?`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetworkError {
    /// The network session was disconnected.
    Disconnected,
}

impl std::fmt::Display for NetworkError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            NetworkError::Disconnected => f.write_str("network session disconnected"),
        }
    }
}

impl std::error::Error for NetworkError {}
/// Marker type that carries the dense input type `T` and implements [`ggrs::Config`]
/// for the network session.
#[derive(Debug)]
pub struct GgrsConfig<T: DenseInput + Debug> {
// Zero-sized marker: ties the type to `T` without storing a value of it.
phantom: PhantomData<T>,
}
impl<T: DenseInput + Debug> ggrs::Config for GgrsConfig<T> {
/// Player input type exchanged through GGRS.
type Input = T;
/// Game state saved and loaded by GGRS: the whole [`World`].
type State = World;
/// Peers are addressed by a `usize`; `GgrsSessionRunner::new` uses the player index as
/// the address of each remote player.
type Address = usize;
}
/// Cell holding the shared [`iroh_net::Endpoint`]; it is filled by the first call to
/// [`get_network_endpoint`].
static NETWORK_ENDPOINT: tokio::sync::OnceCell<iroh_net::Endpoint> =
tokio::sync::OnceCell::const_new();
/// Returns the shared network endpoint, creating it on the first call.
///
/// The endpoint is built with a freshly generated secret key, accepts the matchmaking and
/// gameplay ALPNs, and uses the n0 DNS discovery service together with the n0 pkarr
/// publisher.
///
/// # Panics
///
/// Panics if binding the endpoint fails during initialization.
pub async fn get_network_endpoint() -> &'static iroh_net::Endpoint {
    let create_endpoint = || async move {
        let secret_key = iroh_net::key::SecretKey::generate();

        let builder = iroh_net::Endpoint::builder()
            .alpns(vec![MATCH_ALPN.to_vec(), PLAY_ALPN.to_vec()]);

        // The pkarr publisher needs its own copy of the key; the original is handed to the
        // builder below.
        let discovery = iroh_net::discovery::ConcurrentDiscovery::from_services(vec![
            Box::new(iroh_net::discovery::dns::DnsDiscovery::n0_dns()),
            Box::new(iroh_net::discovery::pkarr::PkarrPublisher::n0_dns(
                secret_key.clone(),
            )),
        ]);

        builder
            .discovery(Box::new(discovery))
            .secret_key(secret_key)
            .bind(0)
            .await
            .unwrap()
    };

    NETWORK_ENDPOINT.get_or_init(create_endpoint).await
}
/// Clonable handle to a shared [`NetworkSocket`] trait object.
///
/// Cloning only clones the inner [`Arc`]. `Deref`/`DerefMut` are derived for the wrapper.
#[derive(Clone, HasSchema, Deref, DerefMut)]
#[schema(no_default)]
pub struct NetworkMatchSocket(Arc<dyn NetworkSocket>);
/// A GGRS protocol message tagged with a match id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GameMessage {
/// Id of the match the message belongs to.
// NOTE(review): presumably compared against the socket's current match id (see
// `NetworkSocket::increment_match_id`) to drop stale messages; confirm in the socket code.
pub match_id: u8,
/// The wrapped GGRS message.
pub message: ggrs::Message,
}
/// Shorthand for a type that is both a [`NetworkSocket`] and a GGRS non-blocking socket
/// addressed by `usize`.
pub trait GgrsSocket: NetworkSocket + ggrs::NonBlockingSocket<usize> {}
// Blanket implementation: every type satisfying both bounds is a `GgrsSocket`.
impl<T> GgrsSocket for T where T: NetworkSocket + ggrs::NonBlockingSocket<usize> {}
/// Abstraction over the socket used for a network match.
///
/// Implementations must be `Sync + Send`.
pub trait NetworkSocket: Sync + Send {
/// Returns the [`Socket`] to hand to the GGRS session.
fn ggrs_socket(&self) -> Socket;
/// Sends `message` reliably to the given `target`.
fn send_reliable(&self, target: SocketTarget, message: &[u8]);
/// Returns the reliable messages received so far.
// NOTE(review): each entry is presumably `(sender player index, payload)`; confirm
// against the implementations.
fn recv_reliable(&self) -> Vec<(u32, Vec<u8>)>;
/// Closes the connection.
fn close(&self);
/// Index of the local player; `GgrsSessionRunnerInfo::new` uses it as the local player index.
fn player_idx(&self) -> u32;
/// Number of players in the match; `GgrsSessionRunnerInfo::new` uses it as the player count.
fn player_count(&self) -> u32;
/// Advances the match id; called by the session runner when the session is restarted.
fn increment_match_id(&mut self);
}
/// Destination of a message sent with [`NetworkSocket::send_reliable`].
pub enum SocketTarget {
/// A single player.
// NOTE(review): the `u32` is presumably the player index; confirm with the socket code.
Player(u32),
/// Every player.
All,
}
/// Synchronization state of the running session, for either an online or an offline game.
#[derive(HasSchema, Clone)]
#[schema(no_default)]
pub enum SyncingInfo {
/// State of a networked session. `GgrsSessionRunner::step` inserts this into the world
/// before every simulated frame.
Online {
/// Frame the GGRS session is currently on.
current_frame: i32,
/// Last frame confirmed by the GGRS session.
last_confirmed_frame: i32,
/// Socket used by the session.
socket: Socket,
/// Per-player network stats, indexed by player handle. Entries for which no stats are
/// available (such as the local player's) hold default values.
players_network_stats: SVec<PlayerNetworkStats>,
/// Index of the local player.
local_player_idx: usize,
/// Input delay, in frames, applied to the local player's input.
local_frame_delay: usize,
/// Handles of the players reported as disconnected so far.
disconnected_players: SVec<usize>,
},
/// State of a game that is not networked.
Offline {
/// The current frame.
current_frame: i32,
},
}
impl SyncingInfo {
pub fn is_online(&self) -> bool {
matches!(self, SyncingInfo::Online { .. })
}
pub fn is_offline(&self) -> bool {
matches!(self, SyncingInfo::Offline { .. })
}
pub fn current_frame(&self) -> i32 {
match self {
SyncingInfo::Online { current_frame, .. } => *current_frame,
SyncingInfo::Offline { current_frame } => *current_frame,
}
}
pub fn last_confirmed_frame(&self) -> i32 {
match self {
SyncingInfo::Online {
last_confirmed_frame,
..
} => *last_confirmed_frame,
SyncingInfo::Offline { current_frame } => *current_frame,
}
}
pub fn socket(&self) -> Option<&Socket> {
match self {
SyncingInfo::Online { socket, .. } => Some(socket),
SyncingInfo::Offline { .. } => None,
}
}
pub fn socket_mut(&mut self) -> Option<&mut Socket> {
match self {
SyncingInfo::Online { socket, .. } => Some(socket),
SyncingInfo::Offline { .. } => None,
}
}
pub fn player_network_stats(&self, player_idx: usize) -> Option<PlayerNetworkStats> {
match self {
SyncingInfo::Online {
players_network_stats,
..
} => players_network_stats.get(player_idx).cloned(),
SyncingInfo::Offline { .. } => None,
}
}
pub fn players_network_stats(&self) -> SVec<PlayerNetworkStats> {
match self {
SyncingInfo::Online {
players_network_stats,
..
} => players_network_stats.clone(),
SyncingInfo::Offline { .. } => SVec::new(),
}
}
pub fn remote_players_network_stats(&self) -> SVec<PlayerNetworkStats> {
match self {
SyncingInfo::Online {
players_network_stats,
..
} => players_network_stats
.iter()
.filter(|&stats| stats.ping != 0 || stats.kbps_sent != 0)
.cloned()
.collect(),
SyncingInfo::Offline { .. } => SVec::new(),
}
}
pub fn total_kbps_sent(&self) -> usize {
self.remote_players_network_stats()
.iter()
.map(|stats| stats.kbps_sent)
.sum()
}
pub fn averaged_kbps_sent(&self) -> f32 {
let remote_stats = self.remote_players_network_stats();
if remote_stats.is_empty() {
0.0
} else {
let total_kbps: usize = remote_stats.iter().map(|stats| stats.kbps_sent).sum();
total_kbps as f32 / remote_stats.len() as f32
}
}
pub fn highest_local_frames_behind(&self) -> i32 {
self.remote_players_network_stats()
.iter()
.map(|stats| stats.local_frames_behind)
.max()
.unwrap_or(0)
}
pub fn highest_remote_frames_behind(&self) -> i32 {
self.remote_players_network_stats()
.iter()
.map(|stats| stats.remote_frames_behind)
.max()
.unwrap_or(0)
}
pub fn averaged_ping(&self) -> u128 {
let remote_stats = self.remote_players_network_stats();
if remote_stats.is_empty() {
0
} else {
let total_ping: u128 = remote_stats.iter().map(|stats| stats.ping).sum();
total_ping / remote_stats.len() as u128
}
}
pub fn lowest_ping(&self) -> u128 {
self.remote_players_network_stats()
.iter()
.map(|stats| stats.ping)
.min()
.unwrap_or(0)
}
pub fn highest_ping(&self) -> u128 {
self.remote_players_network_stats()
.iter()
.map(|stats| stats.ping)
.max()
.unwrap_or(0)
}
pub fn local_player_idx_checked(&self) -> Option<usize> {
match self {
SyncingInfo::Online {
local_player_idx, ..
} => Some(*local_player_idx),
SyncingInfo::Offline { .. } => None,
}
}
pub fn local_player_idx(&self) -> usize {
match self {
SyncingInfo::Online {
local_player_idx, ..
} => *local_player_idx,
SyncingInfo::Offline { .. } => 0,
}
}
pub fn local_frame_delay(&self) -> usize {
match self {
SyncingInfo::Online {
local_frame_delay, ..
} => *local_frame_delay,
SyncingInfo::Offline { .. } => 0,
}
}
pub fn players_count(&self) -> usize {
match self {
SyncingInfo::Online {
players_network_stats,
..
} => players_network_stats.len(),
SyncingInfo::Offline { .. } => 0,
}
}
pub fn players_count_checked(&self) -> Option<usize> {
match self {
SyncingInfo::Online {
players_network_stats,
..
} => Some(players_network_stats.len()),
SyncingInfo::Offline { .. } => None,
}
}
pub fn active_players(&self) -> SVec<usize> {
match self {
SyncingInfo::Online {
players_network_stats,
disconnected_players,
..
} => {
let total_players = players_network_stats.len();
(0..total_players)
.filter(|&id| !disconnected_players.contains(&id))
.collect()
}
SyncingInfo::Offline { .. } => SVec::new(),
}
}
pub fn active_players_checked(&self) -> Option<SVec<usize>> {
match self {
SyncingInfo::Online {
players_network_stats,
disconnected_players,
..
} => {
let total_players = players_network_stats.len();
let active = (0..total_players)
.filter(|&id| !disconnected_players.contains(&id))
.collect();
Some(active)
}
SyncingInfo::Offline { .. } => None,
}
}
pub fn disconnected_players(&self) -> SVec<usize> {
match self {
SyncingInfo::Online {
disconnected_players,
..
} => disconnected_players.clone(),
SyncingInfo::Offline { .. } => SVec::new(),
}
}
pub fn disconnected_players_checked(&self) -> Option<SVec<usize>> {
match self {
SyncingInfo::Online {
disconnected_players,
..
} => Some(disconnected_players.clone()),
SyncingInfo::Offline { .. } => None,
}
}
}
/// List of disconnected players; `GgrsSessionRunner::step` inserts it into the world
/// before every simulated frame.
#[derive(HasSchema, Clone, Default)]
pub struct DisconnectedPlayers {
/// Handles of the players for which a GGRS `Disconnected` event has been received.
pub disconnected_players: Vec<usize>,
}
/// Session runner that drives the game through a GGRS peer-to-peer session.
pub struct GgrsSessionRunner<'a, InputTypes: NetworkInputConfig<'a>> {
/// Most recently collected dense input of the local player.
pub last_player_input: InputTypes::Dense,
/// The underlying GGRS peer-to-peer session.
pub session: P2PSession<GgrsConfig<InputTypes::Dense>>,
/// Index of the local player, taken from `GgrsSessionRunnerInfo::player_idx`.
pub player_idx: u32,
/// Index of the local player; initialized from the same value as `player_idx`.
pub local_player_idx: u32,
/// Elapsed time, in seconds, not yet consumed by network frames.
pub accumulator: f64,
/// Start time of the previous `step` call, or `None` before the first call.
pub last_run: Option<Instant>,
/// Frame rate of the network session: the simulation rate scaled by
/// `NETWORK_FRAME_RATE_FACTOR` and rounded.
pub network_fps: f64,
/// Simulation frame rate the runner was created with.
pub original_fps: f64,
/// Collector that turns raw keyboard and gamepad input into player controls.
pub input_collector: InputTypes::InputCollector,
/// When `true`, default input is sent to GGRS instead of the local player's input.
pub local_input_disabled: bool,
// Handles of the players for which a `Disconnected` event has been received.
disconnected_players: Vec<usize>,
// Socket the session was started with; reused when the session is restarted.
socket: Socket,
// Input delay, in frames, that the GGRS session was configured with.
local_input_delay: usize,
}
/// Parameters needed to create a [`GgrsSessionRunner`].
#[derive(Clone)]
pub struct GgrsSessionRunnerInfo {
/// Socket the GGRS session communicates over.
pub socket: Socket,
/// Index of the local player.
pub player_idx: u32,
/// Total number of players in the session.
pub player_count: u32,
/// Maximum prediction window; `None` selects `NETWORK_MAX_PREDICTION_WINDOW_DEFAULT`.
pub max_prediction_window: Option<usize>,
/// Local input delay; `None` selects `NETWORK_LOCAL_INPUT_DELAY_DEFAULT`.
pub local_input_delay: Option<usize>,
}
impl GgrsSessionRunnerInfo {
    /// Builds the runner info from a socket.
    ///
    /// The local player index and the player count are read from `socket`; the two optional
    /// settings are stored as given.
    pub fn new(
        socket: Socket,
        max_prediction_window: Option<usize>,
        local_input_delay: Option<usize>,
    ) -> Self {
        Self {
            // Query the socket before it is moved into the struct.
            player_idx: socket.player_idx(),
            player_count: socket.player_count(),
            socket,
            max_prediction_window,
            local_input_delay,
        }
    }
}
impl<'a, InputTypes> GgrsSessionRunner<'a, InputTypes>
where
    InputTypes: NetworkInputConfig<'a>,
{
    /// Creates a runner and starts its GGRS peer-to-peer session.
    ///
    /// The network frame rate is `simulation_fps` scaled by [`NETWORK_FRAME_RATE_FACTOR`] and
    /// rounded. Settings left as `None` in `info` fall back to the module defaults. The player
    /// at `info.player_idx` is registered as the local player and every other index as a
    /// remote player addressed by its index.
    ///
    /// # Panics
    ///
    /// Panics if GGRS rejects the frame rate, the prediction window, or a player, or if the
    /// session cannot be started. With the `net-debug` feature it also panics if the debug
    /// message cannot be sent.
    pub fn new(simulation_fps: f32, info: GgrsSessionRunnerInfo) -> Self
    where
        Self: Sized,
    {
        let scaled_fps = (simulation_fps * NETWORK_FRAME_RATE_FACTOR) as f64;
        let network_fps = scaled_fps
            .max(usize::MIN as f64)
            .min(usize::MAX as f64)
            .round() as usize;

        let max_prediction = match info.max_prediction_window {
            Some(window) => window,
            None => NETWORK_MAX_PREDICTION_WINDOW_DEFAULT,
        };
        let local_input_delay = match info.local_input_delay {
            Some(delay) => delay,
            None => NETWORK_LOCAL_INPUT_DELAY_DEFAULT,
        };

        #[cfg(feature = "net-debug")]
        NETWORK_DEBUG_CHANNEL
            .sender
            .try_send(NetworkDebugMessage::SetMaxPrediction(max_prediction))
            .unwrap();

        let mut builder = ggrs::SessionBuilder::new()
            .with_num_players(info.player_count as usize)
            .with_input_delay(local_input_delay)
            .with_fps(network_fps)
            .unwrap()
            .with_max_prediction_window(max_prediction)
            .unwrap();

        let local_player_idx = info.player_idx;
        for idx in 0..info.player_count {
            let handle = idx as usize;
            // Remote players use their own index as their address.
            let player_type = if idx == info.player_idx {
                ggrs::PlayerType::Local
            } else {
                ggrs::PlayerType::Remote(handle)
            };
            builder = builder.add_player(player_type, handle).unwrap();
        }

        let session = builder.start_p2p_session(info.socket.clone()).unwrap();

        Self {
            last_player_input: InputTypes::Dense::default(),
            session,
            player_idx: info.player_idx,
            local_player_idx,
            accumulator: 0.0,
            last_run: None,
            network_fps: network_fps as f64,
            original_fps: f64::from(simulation_fps),
            disconnected_players: Vec::new(),
            input_collector: InputTypes::InputCollector::default(),
            socket: info.socket.clone(),
            local_input_delay,
            local_input_disabled: false,
        }
    }
}
/// Shorthand for the `ControlMapping` type associated with the player controls of the
/// input config `C`.
// The bound is needed to name `C::PlayerControls` and `C::Control`; rustc does not enforce
// bounds on type aliases, so the resulting lint is silenced.
#[allow(type_alias_bounds)]
type ControlMapping<'a, C: NetworkInputConfig<'a>> =
<C::PlayerControls as PlayerControls<'a, C::Control>>::ControlMapping;
// `SessionRunner` implementation that advances the game through the GGRS session.
impl<InputTypes> SessionRunner for GgrsSessionRunner<'static, InputTypes>
where
InputTypes: NetworkInputConfig<'static> + 'static,
{
/// Runs one host frame.
///
/// Samples the local player's input, handles the pending GGRS events, and then advances
/// the GGRS session once for every network time step that has accumulated since the
/// previous call. For each frame GGRS asks to advance, the world's `SyncingInfo`,
/// `DisconnectedPlayers`, `Time` and player controls are updated and `stages` is run.
///
/// # Panics
///
/// Panics if GGRS rejects the local input. With the `net-debug` feature it also panics
/// if a debug message cannot be sent.
fn step(&mut self, frame_start: Instant, world: &mut World, stages: &mut SystemStages) {
// Duration of one network frame, in seconds.
let step: f64 = 1.0 / self.network_fps;
// On the first call there is no previous run, so the measured delta is zero.
let last_run = self.last_run.unwrap_or(frame_start);
let delta = (frame_start - last_run).as_secs_f64();
self.accumulator += delta;
// Number of upcoming network frames to skip; set by a `WaitRecommendation` event below.
let mut skip_frames: u32 = 0;
// Collect the local player's input. The block scope drops the resource borrows before
// the world is modified further down.
{
let keyboard = world.resource::<KeyboardInputs>();
let gamepad = world.resource::<GamepadInputs>();
let player_inputs = world.resource::<InputTypes::PlayerControls>();
self.input_collector.apply_inputs(
&world.resource::<ControlMapping<InputTypes>>(),
&keyboard,
&gamepad,
);
self.input_collector.update_just_pressed();
// Without a control source, `last_player_input` keeps its previous value.
match player_inputs.get_control_source(self.local_player_idx as usize) {
Some(control_source) => {
let control = self
.input_collector
.get_control(self.local_player_idx as usize, control_source);
self.last_player_input = control.get_dense_input();
},
None => warn!("GgrsSessionRunner local_player_idx {} has no control source, no local input provided.",
self.local_player_idx)
};
}
// Frame at the start of this step, reported with skip-frame debug messages.
#[cfg(feature = "net-debug")]
let current_frame_original = self.session.current_frame();
// Handle the events GGRS has queued since the last step.
for event in self.session.events() {
match event {
ggrs::GgrsEvent::Synchronizing { addr, total, count } => {
info!(player=%addr, %total, progress=%count, "Syncing network player");
#[cfg(feature = "net-debug")]
NETWORK_DEBUG_CHANNEL
.sender
.try_send(NetworkDebugMessage::PlayerSync((
PlayerSyncState::SyncInProgress,
addr,
)))
.unwrap();
}
ggrs::GgrsEvent::Synchronized { addr } => {
info!(player=%addr, "Syncrhonized network client");
#[cfg(feature = "net-debug")]
NETWORK_DEBUG_CHANNEL
.sender
.try_send(NetworkDebugMessage::PlayerSync((
PlayerSyncState::Sychronized,
addr,
)))
.unwrap();
}
ggrs::GgrsEvent::Disconnected { addr } => {
warn!(player=%addr, "Player Disconnected");
// Remember the player; the list is published to the world on every advanced frame.
self.disconnected_players.push(addr);
#[cfg(feature = "net-debug")]
NETWORK_DEBUG_CHANNEL
.sender
.try_send(NetworkDebugMessage::DisconnectedPlayers(
self.disconnected_players.clone(),
))
.unwrap();
} ggrs::GgrsEvent::NetworkInterrupted { addr, .. } => {
info!(player=%addr, "Network player interrupted");
}
ggrs::GgrsEvent::NetworkResumed { addr } => {
info!(player=%addr, "Network player re-connected");
}
ggrs::GgrsEvent::WaitRecommendation {
skip_frames: skip_count,
} => {
info!(
"Skipping {skip_count} frames to give network players a chance to catch up"
);
skip_frames = skip_count;
#[cfg(feature = "net-debug")]
NETWORK_DEBUG_CHANNEL
.sender
.try_send(NetworkDebugMessage::SkipFrame {
frame: current_frame_original,
count: skip_count,
})
.unwrap();
}
ggrs::GgrsEvent::DesyncDetected {
frame,
local_checksum,
remote_checksum,
addr,
} => {
error!(%frame, %local_checksum, %remote_checksum, player=%addr, "Network de-sync detected");
}
}
}
// Consume the accumulated time one network frame at a time.
loop {
if self.accumulator >= step {
self.accumulator -= step;
// Submit the local input for this frame; default input is sent while local input
// is disabled.
if !self.local_input_disabled {
self.session
.add_local_input(self.local_player_idx as usize, self.last_player_input)
.unwrap();
} else {
self.session
.add_local_input(
self.local_player_idx as usize,
InputTypes::Dense::default(),
)
.unwrap();
}
#[cfg(feature = "net-debug")]
{
let current_frame = self.session.current_frame();
let confirmed_frame = self.session.confirmed_frame();
NETWORK_DEBUG_CHANNEL
.sender
.try_send(NetworkDebugMessage::FrameUpdate {
current: current_frame,
last_confirmed: confirmed_frame,
})
.unwrap();
}
// A skipped frame consumes its time step without advancing the session.
if skip_frames > 0 {
skip_frames = skip_frames.saturating_sub(1);
continue;
}
match self.session.advance_frame() {
Ok(requests) => {
// GGRS answers with a list of requests that are fulfilled in order.
for request in requests {
match request {
ggrs::GgrsRequest::SaveGameState { cell, frame } => {
// Snapshot the whole world. The final `None` is the checksum argument
// of ggrs' `GameStateCell::save`.
cell.save(frame, Some(world.clone()), None)
}
ggrs::GgrsRequest::LoadGameState { cell, .. } => {
// Carry the `Sessions` resource across the load: take it out of the
// current world, replace the world with the saved one (or a default
// world if the cell holds none), then put it back.
let mut sessions = Sessions::default();
std::mem::swap(
&mut sessions,
&mut world.resource_mut::<Sessions>(),
);
*world = cell.load().unwrap_or_default();
std::mem::swap(
&mut sessions,
&mut world.resource_mut::<Sessions>(),
);
}
ggrs::GgrsRequest::AdvanceFrame {
inputs: network_inputs,
} => {
self.input_collector.advance_frame();
// One slot per player (the remote handles plus one), indexed by handle.
// Slots without available stats keep their default value.
let mut players_network_stats: Vec<PlayerNetworkStats> = vec![
PlayerNetworkStats::default();
self.session.remote_player_handles().len() + 1 ];
for handle in self.session.remote_player_handles().iter() {
if let Ok(stats) = self.session.network_stats(*handle) {
players_network_stats[*handle] =
PlayerNetworkStats::from_ggrs_network_stats(
*handle, stats,
);
}
}
// Publish the session state to the world before the systems run.
world.insert_resource(SyncingInfo::Online {
current_frame: self.session.current_frame(),
last_confirmed_frame: self.session.confirmed_frame(),
socket: self.socket.clone(),
players_network_stats: players_network_stats.into(),
local_player_idx: self.local_player_idx as usize,
local_frame_delay: self.local_input_delay,
disconnected_players: self
.disconnected_players
.clone()
.into(),
});
world.insert_resource(DisconnectedPlayers {
disconnected_players: self.disconnected_players.clone(),
});
// Scope the resource borrows so they end before the stages run.
{
// Game time advances by exactly one network frame.
world
.resource_mut::<Time>()
.advance_exact(Duration::from_secs_f64(step));
let mut player_inputs =
world.resource_mut::<InputTypes::PlayerControls>();
// Apply the input GGRS provides for each player for this frame.
for (player_idx, (input, status)) in
network_inputs.into_iter().enumerate()
{
trace!(
"Net player({player_idx}) local: {}, status: {status:?}, input: {:?}",
self.local_player_idx as usize == player_idx,
input
);
player_inputs.network_update(
player_idx,
&input,
status.into(),
);
}
}
stages.run(world);
}
}
}
}
// Errors are logged and the frame is not advanced; the time step stays consumed.
Err(e) => match e {
ggrs::GgrsError::NotSynchronized => {
debug!("Waiting for network clients to sync")
}
ggrs::GgrsError::PredictionThreshold => {
warn!("Freezing game while waiting for network to catch-up.");
#[cfg(feature = "net-debug")]
NETWORK_DEBUG_CHANNEL
.sender
.try_send(NetworkDebugMessage::FrameFroze {
frame: self.session.current_frame(),
})
.unwrap();
}
e => error!("Network protocol error: {e}"),
},
}
} else {
break;
}
}
self.last_run = Some(frame_start);
// Report the current stats of the remote players to the debug channel.
#[cfg(feature = "net-debug")]
{
let mut network_stats: Vec<(PlayerHandle, NetworkStats)> = vec![];
for handle in self.session.remote_player_handles().iter() {
if let Ok(stats) = self.session.network_stats(*handle) {
network_stats.push((*handle, stats));
}
}
if !network_stats.is_empty() {
NETWORK_DEBUG_CHANNEL
.sender
.try_send(NetworkDebugMessage::NetworkStats { network_stats })
.unwrap();
}
}
}
/// Restarts the network session.
///
/// Increments the socket's match id, then replaces this runner with a new one built by
/// `GgrsSessionRunner::new` from the current socket, player index, player count,
/// prediction window, input delay and original frame rate. All other runner state,
/// including `local_input_disabled`, returns to its initial value.
///
/// # Panics
///
/// Panics if the player count does not fit in a `u32`, or if creating the new runner panics.
fn restart_session(&mut self) {
self.socket.increment_match_id();
let runner_info = GgrsSessionRunnerInfo {
socket: self.socket.clone(),
player_idx: self.player_idx,
player_count: self.session.num_players().try_into().unwrap(),
max_prediction_window: Some(self.session.max_prediction()),
local_input_delay: Some(self.local_input_delay),
};
*self = GgrsSessionRunner::new(self.original_fps as f32, runner_info);
}
/// Enables or disables the local player's input.
///
/// While `input_disabled` is `true`, `step` submits default input to GGRS instead of the
/// collected local input.
fn disable_local_input(&mut self, input_disabled: bool) {
self.local_input_disabled = input_disabled;
}
}
/// Network statistics for one player, copied from [`ggrs::NetworkStats`].
///
/// See the ggrs documentation of `NetworkStats` for the exact meaning and units of the
/// copied values.
#[derive(Debug, Default, Clone, Copy, HasSchema)]
pub struct PlayerNetworkStats {
/// Index (GGRS handle) of the player the stats belong to.
pub player_idx: usize,
/// Copied from `ggrs::NetworkStats::send_queue_len`.
pub send_queue_len: usize,
/// Copied from `ggrs::NetworkStats::ping`.
pub ping: u128,
/// Copied from `ggrs::NetworkStats::kbps_sent`.
pub kbps_sent: usize,
/// Copied from `ggrs::NetworkStats::local_frames_behind`.
pub local_frames_behind: i32,
/// Copied from `ggrs::NetworkStats::remote_frames_behind`.
pub remote_frames_behind: i32,
}
impl PlayerNetworkStats {
    /// Builds the stats of player `player_idx` by copying every value out of the GGRS
    /// `stats`.
    pub fn from_ggrs_network_stats(player_idx: usize, stats: ggrs::NetworkStats) -> Self {
        let ggrs::NetworkStats {
            send_queue_len,
            ping,
            kbps_sent,
            local_frames_behind,
            remote_frames_behind,
            ..
        } = stats;

        Self {
            player_idx,
            send_queue_len,
            ping,
            kbps_sent,
            local_frames_behind,
            remote_frames_behind,
        }
    }
}