use super::{events::RtcServerEvent, RtcServer, RtcServerState, RtcServerStatus};
use crate::{
latency::{LatencyTracer, LatencyTracerPayload},
socket::RtcSocket,
};
use bevy::prelude::*;
use bevy_matchbox::{
matchbox_signaling::{
topologies::client_server::{ClientServer, ClientServerState},
SignalingServerBuilder,
},
matchbox_socket::{PeerState, WebRtcSocket},
prelude::ChannelConfig,
OpenSocketExt, StartServerExt,
};
use instant::Duration;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
pub fn init_signaling_server(mut commands: Commands, rtc_state: Res<RtcServerState>) {
let host_ready: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let builder =
SignalingServerBuilder::new(rtc_state.addr, ClientServer, ClientServerState::default())
.on_id_assignment(|(socket, id)| info!("{socket} assigned {id}"))
.on_host_connected({
let addr = rtc_state.addr;
let host_ready = host_ready.clone();
move |id| {
host_ready.store(true, Ordering::Relaxed);
info!("Host ready: {id}");
info!("Ready for clients (broadcasting on {addr})");
}
})
.on_host_disconnected(|id| panic!("Host left: {id}"))
.on_client_connected(|id| info!("Client joined: {id}"))
.on_client_disconnected(|id| info!("Client left: {id}"))
.on_connection_request({
let ready = host_ready.clone();
move |request| {
if ready.load(Ordering::Relaxed) {
Ok(true)
} else {
let origin = request.origin.ip();
match origin {
std::net::IpAddr::V4(ip) => {
if ip.is_loopback() {
Ok(true)
} else {
Ok(false)
}
}
std::net::IpAddr::V6(ip) => {
if ip.is_loopback() {
Ok(true)
} else {
Ok(false)
}
}
}
}
}
})
.cors()
.trace();
commands.start_server(builder);
}
pub fn init_server_socket(mut commands: Commands, state: Res<RtcServerState>) {
let room_url = format!("ws://{}", state.addr);
let socker_builder = WebRtcSocket::builder(room_url)
.add_channel(ChannelConfig {
ordered: true,
max_retransmits: Some(0),
})
.add_channel(ChannelConfig::reliable());
commands.open_socket(socker_builder);
}
pub fn server_event_writer(
mut commands: Commands,
tracer_query: Query<(Entity, &LatencyTracer)>,
mut state: ResMut<RtcServerState>,
mut socket: ResMut<RtcSocket>,
mut event_wtr: EventWriter<RtcServerEvent>,
mut next_server_status: ResMut<NextState<RtcServerStatus>>,
) {
if let Some(id) = socket.id() {
if state.peer_id.is_none() {
state.peer_id.replace(id);
event_wtr.send(RtcServerEvent::IdAssigned(id));
next_server_status.set(RtcServerStatus::Ready);
}
}
for (peer, peer_state) in socket.update_peers() {
match peer_state {
PeerState::Connected => {
state.peers.insert(peer);
commands.spawn(LatencyTracer::new(peer));
event_wtr.send(RtcServerEvent::ClientJoined(peer));
}
PeerState::Disconnected => {
state.peers.remove(&peer);
state.latencies.remove(&peer);
state.smoothed_latencies.remove(&peer);
if let Some(entity) = tracer_query
.iter()
.find(|(_, tracer)| tracer.peer_id == peer)
.map(|(e, _)| e)
{
commands.entity(entity).despawn();
} else {
error!("No latency tracer found for {peer}");
}
event_wtr.send(RtcServerEvent::ClientLeft(peer));
}
}
}
}
pub fn send_latency_tracers(
state: Res<RtcServerState>,
mut server: RtcServer<LatencyTracerPayload>,
) {
let peer_id = state.peer_id.expect("expected peer id");
server.unreliable_to_all(LatencyTracerPayload::new(peer_id));
}
pub fn read_latency_tracers(
state: Res<RtcServerState>,
mut tracers: Query<&mut LatencyTracer>,
mut server: RtcServer<LatencyTracerPayload>,
) {
let host_id = state.peer_id.expect("expected host id");
for (from, payload) in server.read() {
if payload.from == host_id {
if let Some(mut tracer) = tracers.iter_mut().find(|tracer| tracer.peer_id == from) {
tracer.process(payload);
}
} else if payload.from == from {
server.unreliable_to_peer(from, payload);
} else {
warn!("Invalid latency tracer from {from}: {payload:?}, ignoring");
}
}
}
pub fn calculate_latency(
time: Res<Time>,
mut state: ResMut<RtcServerState>,
mut tracers: Query<&mut LatencyTracer>,
) {
for mut tracer in tracers.iter_mut() {
if !state.peers.contains(&tracer.peer_id) {
state.latencies.remove(&tracer.peer_id);
state.smoothed_latencies.remove(&tracer.peer_id);
continue;
}
tracer.update_latency();
let last_latency = tracer.last_latency.map(Duration::from_secs_f32);
match last_latency {
Some(last_latency) => {
state.latencies.insert(tracer.peer_id, Some(last_latency));
let current_smoothed = state
.smoothed_latencies
.entry(tracer.peer_id)
.or_insert(Some(last_latency))
.get_or_insert(last_latency);
const AVG_SECS: f32 = 1.0; let alpha = 1.0 - f32::exp(-time.delta_seconds() / AVG_SECS);
let current_f32 = current_smoothed.as_secs_f32() * (1.0 - alpha);
let delta = last_latency.as_secs_f32() * alpha;
*current_smoothed = Duration::from_secs_f32(current_f32 + delta);
}
None => {
state.latencies.insert(tracer.peer_id, None);
state.smoothed_latencies.insert(tracer.peer_id, None);
}
}
}
}