lightyear_netcode 0.27.0

Connection handling for the lightyear networking library
Documentation
use alloc::{format, string::ToString};

use crate::Error;
use crate::auth::Authentication;
use crate::client::{ClientConfig, ClientState};
use aeronet_io::connection::PeerAddr;
use bevy_app::{App, Plugin, PostUpdate, PreUpdate};
use bevy_ecs::lifecycle::HookContext;
use bevy_ecs::prelude::*;
use bevy_ecs::{system::ParallelCommands, world::DeferredWorld};
use bevy_reflect::Reflect;
use bevy_time::{Real, Time};
use lightyear_connection::ConnectionSystems;
use lightyear_connection::client::{
    Connect, Connected, Connecting, ConnectionPlugin, Disconnect, Disconnected,
};
use lightyear_connection::host::HostClient;
use lightyear_core::id::{LocalId, PeerId, RemoteId};
use lightyear_link::{Link, LinkSystems, Linked};
use lightyear_transport::plugin::TransportSystems;
use tracing::{debug, error, info};

pub struct NetcodeClientPlugin;

/// Add this component on an entity to make it a netcode client.
///
/// The [`Link`] component will be added.
#[derive(Component)]
#[require(Link, lightyear_connection::client::Client)]
#[require(Disconnected)]
#[component(on_insert = NetcodeClient::on_insert)]
pub struct NetcodeClient {
    pub inner: crate::client::Client<()>,
}

impl NetcodeClient {
    fn on_insert(mut world: DeferredWorld, context: HookContext) {
        if let Some(server_addr) = world
            .get::<NetcodeClient>(context.entity)
            .map(|client| client.inner.server_addr())
        {
            world
                .commands()
                .get_entity(context.entity)
                .unwrap()
                .insert(PeerAddr(server_addr));
        }
    }
}

#[derive(Clone, Reflect)]
/// Config related to the netcode protocol (abstraction of a connection over raw UDP-like transport)
pub struct NetcodeConfig {
    pub num_disconnect_packets: usize,
    pub keepalive_packet_send_rate: f64,
    /// Set the duration (in seconds) after which the server disconnects a client if they don't hear from them.
    /// This is valid for tokens generated by the server.
    /// The default is 3 seconds. A negative value means no timeout.
    /// This is used when the client generates a `ConnectToken` (with `Authentication::Manual`)
    pub client_timeout_secs: i32,
    /// Set the duration in seconds after which the `ConnectToken` generated by the Client
    /// will expire. Set a negative value for the token to never expire.
    pub token_expire_secs: i32,
}

impl Default for NetcodeConfig {
    fn default() -> Self {
        Self {
            num_disconnect_packets: 10,
            keepalive_packet_send_rate: 1.0 / 10.0,
            client_timeout_secs: 3,
            token_expire_secs: 30,
        }
    }
}

impl NetcodeConfig {
    pub(crate) fn build(&self) -> ClientConfig<()> {
        ClientConfig::default()
            .num_disconnect_packets(self.num_disconnect_packets)
            .packet_send_rate(self.keepalive_packet_send_rate)
    }
}

impl NetcodeClient {
    pub fn new(auth: Authentication, config: NetcodeConfig) -> Result<Self, Error> {
        let token = auth.get_token(config.client_timeout_secs, config.token_expire_secs)?;
        let token_bytes = token.try_into_bytes()?;
        Ok(Self {
            inner: crate::client::Client::with_config(&token_bytes, config.build())?,
        })
    }

    pub fn id(&self) -> PeerId {
        // TODO: returns PeerId::Entity if not connected.
        PeerId::Netcode(self.inner.id())
    }
}

// TODO: when Client is spawned, add an observer for connection/disconnection, etc.

// We process bytes from the link and re-add them to the link so that the Transport can still
// fetch directly from the link
impl NetcodeClientPlugin {
    /// Takes packets from the Link, process them through netcode
    /// and buffer them back into the link to be sent by the IO
    fn send(
        mut query: Query<(&mut Link, &mut NetcodeClient), (With<Linked>, Without<HostClient>)>,
    ) {
        query.par_iter_mut().for_each(|(mut link, mut client)| {
            // send user packets
            for _ in 0..link.send.len() {
                if let Some(payload) = link.send.pop() {
                    client
                        .inner
                        .send(payload, &mut link.send)
                        .inspect_err(|e| {
                            error!("Error sending packet: {:?}", e);
                        })
                        .ok();
                }
            }

            // send netcode internal packets
            client.inner.drain_send_netcode_packets(&mut link.send);

            // #[cfg(feature = "test_utils")]
            // trace!("CLIENT: length of each packet in send: {:?}", link.send.iter().map(|p| p.len()).collect::<Vec<_>>());
        })
    }

    /// Receive packets from the Link, and process them through the client,
    /// then buffer them back into the Link
    fn receive(
        real_time: Res<Time<Real>>,
        mut query: Query<
            (
                Entity,
                &mut Link,
                &mut NetcodeClient,
                Has<Connecting>,
                Has<Disconnected>,
            ),
            (With<Linked>, Without<HostClient>),
        >,
        parallel_commands: ParallelCommands,
    ) {
        let delta = real_time.delta();
        query
            .par_iter_mut()
            .for_each(|(entity, mut link, mut client, connecting, disconnected)| {
                // #[cfg(feature = "test_utils")]
                // trace!("CLIENT: length of each packet in receive: {:?}", link.recv.iter().map(|p| p.len()).collect::<Vec<_>>());

                // Buffer the packets received from the link into the Connection
                // don't short-circuit on error
                if let Ok(state) = client
                    .inner
                    .try_update(delta.as_secs_f64(), &mut link.recv)
                    .inspect_err(|e| {
                        error!("Error receiving packet: {:?}", e);
                    })
                {
                    if state == ClientState::Connected && connecting {
                        info!("Client {} connected", client.id());
                        parallel_commands.command_scope(|mut commands| {
                            commands.entity(entity).insert((
                                Connected,
                                LocalId(client.id()),
                                RemoteId(PeerId::Server),
                            ));
                        });
                    }
                    if !disconnected
                        && !matches!(
                            state,
                            ClientState::Connected
                                | ClientState::SendingConnectionRequest
                                | ClientState::SendingChallengeResponse
                        )
                    {
                        info!("Client {} disconnected. State: {state:?}", client.id());
                        parallel_commands.command_scope(|mut commands| {
                            commands.entity(entity).insert(Disconnected {
                                reason: Some(format!("Client disconnected: {state:?}")),
                            });
                        });
                    }
                }
            })
    }

    fn connect(
        trigger: On<Connect>,
        mut commands: Commands,
        mut query: Query<&mut NetcodeClient, Without<Connected>>,
    ) {
        if let Ok(mut client) = query.get_mut(trigger.entity) {
            debug!("Starting netcode connection process");
            client.inner.connect();
            commands.entity(trigger.entity).insert(Connecting);
        }
    }

    fn disconnect(
        trigger: On<Disconnect>,
        mut commands: Commands,
        mut query: Query<&mut NetcodeClient, Without<Disconnected>>,
    ) -> Result {
        if let Ok(mut client) = query.get_mut(trigger.entity) {
            client.inner.disconnect()?;
            commands.entity(trigger.entity).insert(Disconnected {
                reason: Some("Client trigger".to_string()),
            });
        }
        Ok(())
    }
}

impl Plugin for NetcodeClientPlugin {
    fn build(&self, app: &mut App) {
        if !app.is_plugin_added::<ConnectionPlugin>() {
            app.add_plugins(ConnectionPlugin);
        }
        app.configure_sets(
            PreUpdate,
            (
                LinkSystems::Receive,
                ConnectionSystems::Receive,
                TransportSystems::Receive,
            )
                .chain(),
        );
        app.configure_sets(
            PostUpdate,
            (
                TransportSystems::Send,
                ConnectionSystems::Send,
                LinkSystems::Send,
            )
                .chain(),
        );

        app.add_systems(PreUpdate, Self::receive.in_set(ConnectionSystems::Receive));
        app.add_systems(PostUpdate, Self::send.in_set(ConnectionSystems::Send));
        app.add_observer(Self::connect);
        app.add_observer(Self::disconnect);
    }
}