use crate::bevy_tokio_tasks::{TokioTasksPlugin, TokioTasksRuntime};
use bevy::prelude::*;
use bevy::utils::HashMap;
use bevygap_shared::nats::*;
use lightyear::connection::netcode::ClientId;
use lightyear::connection::server::{ConnectionRequestHandler, DeniedReason};
use lightyear::prelude::server::*;
use lightyear::server::events::{ConnectEvent, DisconnectEvent};
use std::sync::Arc;
use crate::arbitrium_env::ArbitriumEnv;
use crate::edgegap_context::{self, ArbitriumContext};
pub struct BevygapServerPlugin;
#[derive(Resource)]
struct CertDigest(String);
#[derive(Event)]
pub struct NatsConnected;
#[derive(Event)]
pub struct BevygapReady;
impl Plugin for BevygapServerPlugin {
fn build(&self, app: &mut App) {
if !app.is_plugin_added::<TokioTasksPlugin>() {
app.add_plugins(TokioTasksPlugin::default());
}
info!("Reading Arbitrium ENVs");
let arb_env = ArbitriumEnv::from_env().expect("Failed to read Arbitrium ENVs");
app.insert_resource(arb_env);
inject_ca_root_env_var_from_cmdline_arg();
app.add_systems(Startup, (extract_cert_digest, setup_nats).chain());
app.add_observer(edgegap_context::fetch_context_on_nats_connected);
app.add_observer(send_context_to_nats);
app.add_observer(setup_connection_request_handler);
app.add_observer(handle_lightyear_client_connect);
app.add_observer(handle_lightyear_client_disconnect);
}
}
#[allow(unreachable_patterns)]
fn extract_cert_digest(
server_config: Res<lightyear::server::config::ServerConfig>,
mut commands: Commands,
) {
let net_config = &server_config.net[0];
let digest = match &net_config {
NetConfig::Netcode { io, .. } => match &io.transport {
ServerTransport::WebTransportServer { certificate, .. } => Some(
certificate.certificate_chain().as_slice()[0]
.hash()
.to_string(),
),
_ => None,
},
_ => None,
};
let Some(digest) = digest else {
panic!(
"Unable to extract cert digest. Is there a webtransport server transport configured?"
);
};
info!("Extracted cert digest: {}", digest);
commands.insert_resource(CertDigest(digest));
}
fn inject_ca_root_env_var_from_cmdline_arg() {
use std::env;
let args: Vec<_> = env::args().collect();
if args.len() < 2 {
return;
}
let mut found_flag = false;
for arg in args {
if found_flag {
let ca_root = arg.clone();
info!(
"Found --ca_contents, setting NATS_CA_CONTENTS to [{} bytes]",
ca_root.len()
);
env::set_var("NATS_CA_CONTENTS", ca_root);
return;
}
if arg == "--ca_contents" {
found_flag = true;
continue;
}
}
}
fn handle_lightyear_client_disconnect(
trigger: Trigger<DisconnectEvent>,
nats_sender: ResMut<NatsSender>,
) {
let client_id = trigger.event().client_id;
info!("Lightyear disconnect event for client_id {}", client_id);
nats_sender.client_disconnected(client_id.to_bits());
}
fn handle_lightyear_client_connect(
trigger: Trigger<ConnectEvent>,
nats_sender: ResMut<NatsSender>,
) {
let client_id = trigger.event().client_id;
info!("Lightyear connect event for client_id {}", client_id);
nats_sender.client_connected(client_id.to_bits());
}
fn setup_connection_request_handler(
_trigger: Trigger<NatsConnected>,
bgnats: Res<BevygapNats>,
mut commands: Commands,
mut server_config: ResMut<lightyear::server::config::ServerConfig>,
) {
let crh = BevygapConnectionRequestHandler::new(bgnats.clone());
let arc_crh = Arc::new(crh);
commands.insert_resource(CRH(arc_crh.clone()));
for net in server_config.net.iter_mut() {
net.set_connection_request_handler(arc_crh.clone());
}
}
fn send_context_to_nats(
_trigger: Trigger<edgegap_context::ContextLoaded>,
context: Res<ArbitriumContext>,
nats_sender: ResMut<NatsSender>,
mut commands: Commands,
digest: Res<CertDigest>,
) {
info!("CONTEXT added: {context:?}");
info!("CONTEXT fqdn: {}", context.fqdn());
nats_sender.cert_digest(context.public_ip(), digest.0.clone());
nats_sender.arbitrium_context(context.clone());
commands.trigger(BevygapReady);
}
#[derive(Debug, Event)]
enum NatsEvent {
ClientConnected(ClientId),
ClientDisconnected(ClientId),
ArbitriumContext(ArbitriumContext),
CertDigest(String, String),
}
#[derive(Resource)]
struct NatsSender(tokio::sync::mpsc::UnboundedSender<NatsEvent>);
impl NatsSender {
fn client_connected(&self, client_id: u64) {
self.0
.send(NatsEvent::ClientConnected(client_id))
.expect("Unable to send NatsEvent for client_connected")
}
fn client_disconnected(&self, client_id: u64) {
self.0
.send(NatsEvent::ClientDisconnected(client_id))
.expect("Unable to send NatsEvent for client_disconnected")
}
fn arbitrium_context(&self, context: ArbitriumContext) {
self.0
.send(NatsEvent::ArbitriumContext(context))
.expect("Unable to send NatsEvent for arbitrium_context")
}
fn cert_digest(&self, ip: String, digest: String) {
self.0
.send(NatsEvent::CertDigest(ip, digest))
.expect("Unable to send NatsEvent for cert_digest")
}
}
struct DeferredTriggerCommand<T>(T);
impl<T: Event> bevy::ecs::world::Command for DeferredTriggerCommand<T> {
fn apply(self, world: &mut World) {
world.trigger(self.0);
}
}
fn setup_nats(runtime: ResMut<TokioTasksRuntime>, mut commands: Commands) {
info!("Setting up NATS");
let (nats_event_sender, mut nats_event_receiver) =
tokio::sync::mpsc::unbounded_channel::<NatsEvent>();
commands.insert_resource(NatsSender(nats_event_sender));
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
std::process::exit(1);
}));
runtime.spawn_background_task(|mut ctx| async move {
let bgnats = match BevygapNats::new_and_connect("bevygap_server_plugin").await {
Ok(nats) => nats,
Err(e) => {
error!("Failed to setup NATS: {}", e);
panic!("Failed to setup NATS");
}
};
info!("NATS connected");
let kv_c2s = bgnats.kv_c2s().clone();
let kv_sessions = bgnats.kv_active_connections().clone();
let kv_cert_digests = bgnats.kv_cert_digests().clone();
let client = bgnats.client().clone();
ctx.run_on_main_thread(move |ctx| {
ctx.world.insert_resource(bgnats);
ctx.world.commands().queue(DeferredTriggerCommand(NatsConnected));
})
.await;
let mut client_id_to_session_id = HashMap::new();
info!("Starting NatsEvent loop");
loop {
let Some(ev) = nats_event_receiver.recv().await else {
panic!("NatsEvent channel closed, aborting.");
};
match ev {
NatsEvent::ClientConnected(client_id) => {
info!("Client connected: {}, writing to nats kv", client_id);
let session_id = kv_c2s
.get(client_id.to_string())
.await
.expect("Failed to get session_id from KV");
match session_id {
None => {
panic!("Client ID is not mapped to a session id! wtf.");
}
Some(session_id) => {
let session_id_key = String::from_utf8(session_id.into())
.expect("Failed to convert session_id to string");
info!("Client ID {client_id} associated with session id: {session_id_key}",);
client_id_to_session_id.insert(client_id, session_id_key.clone());
kv_sessions
.put(session_id_key, client_id.to_string().into())
.await
.expect("Failed to put client_id in KV");
}
}
}
NatsEvent::ClientDisconnected(client_id) => {
info!("Client disconnected: {}, writing to nats kv", client_id);
if let Some(session_id) = client_id_to_session_id.get(&client_id) {
kv_sessions
.delete(session_id)
.await
.expect("Failed to del client_id in KV");
} else {
error!("Client disconnected but not found in client_id_to_session_id");
}
}
NatsEvent::ArbitriumContext(context) => {
info!("ArbitriumContext added: {context:?}");
let arb_context_bytes = context.to_bytes();
client
.publish("gameserver.contexts", arb_context_bytes.into())
.await
.expect("Failed to write context to NATS");
}
NatsEvent::CertDigest(ip, digest) => {
info!("CertDigest added: {ip} -> {digest}");
let key = ip;
kv_cert_digests
.put(key, digest.into())
.await
.expect("Failed to put digest in KV");
}
}
client.flush().await.expect("Failed to flush NATS");
}
});
}
#[derive(Resource)]
pub struct CRH(Arc<BevygapConnectionRequestHandler>);
#[derive(Clone, Debug)]
pub struct BevygapConnectionRequestHandler {
bgnats: BevygapNats,
}
impl BevygapConnectionRequestHandler {
pub fn new(bgnats: BevygapNats) -> Self {
Self { bgnats }
}
}
impl ConnectionRequestHandler for BevygapConnectionRequestHandler {
fn handle_request(
&self,
client_id: lightyear::connection::id::ClientId,
) -> Option<DeniedReason> {
info!("BevygapConnectionRequestHandler({client_id})");
None
}
}