bevy_connect 0.18.6

Connectivity via TCP sessions
Documentation
//! Channel communication over network.
//! This crate uses TCP for communication.
//!
//! A new session can be created per every message type. Each of these message
//! types is a connection, and muliple message types will create parallel
//! connections.
//!
//! Connecions are established in direct mode via host+port. For the host it
//! means binding to that host & port, for the client means to connect to it.
//!
//! The client has an initial blocking time until it receives an assignment
//! message from the host, which is sent straight away after accepting the tcp
//! connection.
//!
//! If using compression and/or encryption, the clients and server must use the
//! same values.
//!
//! A resourece of type [`channel::Channel<T>`] will be present for each of the
//! connection, for each of the different message type `T`.
//!
//! Connecting or disconnecting is done by queuing a Command to bevy.
//! [`commands::SessionConnectCommand<T>`] will create a new session, like this
//! will create two different sessions each identified by the type of message
//! they are handling, and send and receive messages:
//!
//! ```
//! use bevy_app::*;
//! use bevy_ecs::prelude::*;
//! use bevy_connect::prelude::*;
//! use serde::{Serialize, Deserialize};
//!
//! App::new()
//!     .add_plugins(SessionPlugin::<Msg>::default())
//!     .add_systems(Startup, bevy_startup)
//!     .add_systems(Update, (receive_message, send_message)
//!         // The presence of the channel resource indicates an established
//!         // session (connected)
//!         .run_if(resource_exists::<Channel<Msg>>));
//!
//! #[derive(Serialize, Deserialize, Debug)]
//! struct Msg {
//!     data: String
//! }
//!
//! #[derive(Serialize, Deserialize, Debug)]
//! struct AnotherDifferentMessageType {
//!     audio_buffer: Vec<u8>
//! }
//!
//! fn bevy_startup(mut cmd: Commands) {
//!     cmd.queue(SessionConnectCommand::<Msg>::from_config(
//!         SessionConfig::Direct {
//!             addr: Some("127.0.0.1".parse().unwrap()),
//!             port: 6000,
//!             host: true, // use false for the client side
//!             compress: false, // use true to compress network data
//!             key: None, // Pass a key to use encryption
//!             options: Default::default(),
//!         },
//!     ));
//!     cmd.queue(SessionConnectCommand::<AnotherDifferentMessageType>::from_config(
//!         SessionConfig::Direct {
//!             addr: Some("127.0.0.1".parse().unwrap()),
//!             port: 7000,
//!             host: true, // use false for the client side
//!             compress: false, // use true to compress network data
//!             key: None, // Pass a key to use encryption
//!             options: Default::default(),
//!         },
//!     ));
//! }
//!
//! fn receive_message(mut events: EventReader<MessageReceivedEvent<Msg>>) {
//!     for e in events.read() {
//!         println!("{}", e.message.data);
//!     }
//! }
//!
//! fn send_message(mut channel: ResMut<Channel<Msg>>) {
//!     channel.broadcast(Msg{data: "hello".to_string()}.into());
//! }
//! ```
//!
//! Using the command [`commands::SessionDisconnectCommand<T>`] will disconnect
//! the session for the message type `T`. The resource [`channel::Channel<T>`]
//! will be gone once disconnection is complete.
//!
pub mod channel;
pub mod commands;
pub mod events;

use bevy_app::{App, Plugin, PostUpdate, PreUpdate};
use bevy_ecs::message::{Message as BevyMessage, MessageReader, MessageWriter};
use bevy_ecs::schedule::IntoScheduleConfigs;
use bevy_ecs::{
    schedule::common_conditions::resource_exists,
    system::{Commands, ResMut},
};

use serde::{Deserialize, Serialize, de::DeserializeOwned};
use std::{io, marker::PhantomData};
use thiserror::Error;
use tracing::error;

use crate::{
    channel::Channel,
    events::{
        ClientJoined, ClientLeft, MessageReceivedEvent, SessionConnectedEvent,
        SessionDisconnectedEvent,
    },
};

pub use tubes::ClientId;

pub mod prelude {
    pub use crate::SessionPlugin;
    pub use crate::channel::*;
    pub use crate::commands::*;
    pub use crate::events::*;
    pub use tubes::ClientId;
}

#[cfg(not(feature = "debug"))]
pub trait Message: Serialize + DeserializeOwned + Send + Sync + 'static {}
#[cfg(not(feature = "debug"))]
impl<T> Message for T where for<'a> T: Serialize + Deserialize<'a> + Send + Sync + 'static {}

#[cfg(feature = "debug")]
pub trait Message: std::fmt::Debug + Serialize + DeserializeOwned + Send + Sync + 'static {}
#[cfg(feature = "debug")]
impl<T> Message for T where
    for<'a> T: std::fmt::Debug + Serialize + Deserialize<'a> + Send + Sync + 'static
{
}

/// Add this plugin (unique by type T) to the app to first setup the channel
/// for the message type T.
///
/// Then use the [`commands::SessionConnectCommand<T>`] to connect.
///
/// A resource [`channel::Channel<T>`] will be present once the connection is
/// established. If the resource is gone, the connection was closed.
pub struct SessionPlugin<T: Message> {
    _t: PhantomData<T>,
}

impl<T: Message> Default for SessionPlugin<T> {
    fn default() -> Self {
        Self { _t: PhantomData }
    }
}

impl<T: Message> Plugin for SessionPlugin<T> {
    fn build(&self, app: &mut App) {
        app.add_message::<SessionConnectedEvent<T>>();
        app.add_message::<SessionDisconnectedEvent<T>>();
        app.add_message::<MessageReceivedEvent<T>>();
        app.add_message::<ClientJoined<T>>();
        app.add_message::<ClientLeft<T>>();
        app.add_message::<TerminateEvent<T>>();
        app.add_systems(
            PreUpdate,
            process_channel::<T>.run_if(resource_exists::<Channel<T>>),
        );
        app.add_systems(
            PostUpdate,
            terminate::<T>.run_if(resource_exists::<Channel<T>>),
        );
    }
}

fn process_channel<T: Message>(
    mut channel: ResMut<Channel<T>>,
    mut events: MessageWriter<MessageReceivedEvent<T>>,
    mut joinevent: MessageWriter<ClientJoined<T>>,
    mut leaveevent: MessageWriter<ClientLeft<T>>,
    mut termevent: MessageWriter<TerminateEvent<T>>,
) {
    if let Err(e) = channel.poll(&mut events, &mut joinevent, &mut leaveevent, &mut termevent) {
        error!("Error polling for messages: {e}");
    }
}

// Tell bevy to yank the channel out with this
// This happens as a last resort in case of broken pipes and the similar.
#[derive(BevyMessage)]
pub(crate) struct TerminateEvent<T: Message> {
    pub(crate) _t: PhantomData<T>,
}

#[allow(clippy::needless_pass_by_value)]
fn terminate<T: Message>(
    mut cmd: Commands,
    channel: ResMut<Channel<T>>,
    mut event: MessageReader<TerminateEvent<T>>,
) {
    // This should happen only for clients, as the host will keep hosting until
    // disconnected.
    if !channel.is_host() && event.read().next().is_some() {
        cmd.remove_resource::<Channel<T>>();
    }
}

#[derive(Error, Debug)]
pub(crate) enum Fault {
    // Instructs the upper layer to kill this connection
    // This is an unrecoverable state and the session must be created again
    // from the other end.
    #[error("Terminate connection")]
    Terminate(#[from] io::Error),
}