bevy_connect/
lib.rs

1//! Channel communication over network.
2//! This crate uses TCP for communication.
3//!
//! A new session can be created for every message type. Each message type
//! gets its own connection, so multiple message types create parallel
//! connections.
7//!
//! Connections are established in direct mode via host+port. For the host
//! this means binding to that host & port; for the client it means connecting
//! to it.
10//!
11//! The client has an initial blocking time until it receives an assignment
12//! message from the host, which is sent straight away after accepting the tcp
13//! connection.
14//!
15//! If using compression and/or encryption, the clients and server must use the
16//! same values.
17//!
//! A resource of type [`channel::Channel<T>`] will be present for each
//! connection, one per message type `T`.
20//!
//! Connecting or disconnecting is done by queuing a Command to bevy.
//! [`commands::SessionConnectCommand<T>`] will create a new session. The
//! example below creates two different sessions, each identified by the type
//! of message it handles, and sends and receives messages:
25//!
26//! ```
27//! use bevy_app::*;
28//! use bevy_ecs::prelude::*;
29//! use bevy_connect::prelude::*;
30//! use serde::{Serialize, Deserialize};
31//!
32//! App::new()
//!     .add_plugins(SessionPlugin::<Msg>::default())
//!     .add_plugins(SessionPlugin::<AnotherDifferentMessageType>::default())
34//!     .add_systems(Startup, bevy_startup)
35//!     .add_systems(Update, (receive_message, send_message)
36//!         // The presence of the channel resource indicates an established
37//!         // session (connected)
38//!         .run_if(resource_exists::<Channel<Msg>>));
39//!
40//! #[derive(Serialize, Deserialize, Debug)]
41//! struct Msg {
42//!     data: String
43//! }
44//!
45//! #[derive(Serialize, Deserialize, Debug)]
46//! struct AnotherDifferentMessageType {
47//!     audio_buffer: Vec<u8>
48//! }
49//!
50//! fn bevy_startup(mut cmd: Commands) {
51//!     cmd.queue(SessionConnectCommand::<Msg>::from_config(
52//!         SessionConfig::Direct {
53//!             addr: Some("127.0.0.1".parse().unwrap()),
54//!             port: 6000,
55//!             host: true, // use false for the client side
56//!             compress: false, // use true to compress network data
57//!             key: None, // Pass a key to use encryption
58//!             options: Default::default(),
59//!         },
60//!     ));
61//!     cmd.queue(SessionConnectCommand::<AnotherDifferentMessageType>::from_config(
62//!         SessionConfig::Direct {
63//!             addr: Some("127.0.0.1".parse().unwrap()),
64//!             port: 7000,
65//!             host: true, // use false for the client side
66//!             compress: false, // use true to compress network data
67//!             key: None, // Pass a key to use encryption
68//!             options: Default::default(),
69//!         },
70//!     ));
71//! }
72//!
//! fn receive_message(mut events: MessageReader<MessageReceivedEvent<Msg>>) {
74//!     for e in events.read() {
75//!         println!("{}", e.message.data);
76//!     }
77//! }
78//!
79//! fn send_message(mut channel: ResMut<Channel<Msg>>) {
80//!     channel.broadcast(Msg{data: "hello".to_string()}.into());
81//! }
82//! ```
83//!
84//! Using the command [`commands::SessionDisconnectCommand<T>`] will disconnect
85//! the session for the message type `T`. The resource [`channel::Channel<T>`]
86//! will be gone once disconnection is complete.
87//!
88pub mod channel;
89pub mod commands;
90pub mod events;
91
92use bevy_app::{App, Plugin, PostUpdate, PreUpdate};
93use bevy_ecs::message::{Message as BevyMessage, MessageReader, MessageWriter};
94use bevy_ecs::schedule::IntoScheduleConfigs;
95use bevy_ecs::{
96    schedule::common_conditions::resource_exists,
97    system::{Commands, ResMut},
98};
99
100use serde::{Deserialize, Serialize, de::DeserializeOwned};
101use std::{io, marker::PhantomData};
102use thiserror::Error;
103use tracing::error;
104
105use crate::{
106    channel::Channel,
107    events::{
108        ClientJoined, ClientLeft, MessageReceivedEvent, SessionConnectedEvent,
109        SessionDisconnectedEvent,
110    },
111};
112
113pub use tubes::Uuid;
114
115pub mod prelude {
116    pub use crate::SessionPlugin;
117    pub use crate::channel::*;
118    pub use crate::commands::*;
119    pub use crate::events::*;
120    pub use tubes::Uuid;
121}
122
123#[cfg(not(feature = "debug"))]
124pub trait Message: Serialize + DeserializeOwned + Send + Sync + 'static {}
125#[cfg(not(feature = "debug"))]
126impl<T> Message for T where for<'a> T: Serialize + Deserialize<'a> + Send + Sync + 'static {}
127
/// Marker trait for the message types handled by [`SessionPlugin`].
///
/// With the `debug` feature enabled the bundle of required bounds also
/// includes [`std::fmt::Debug`], next to serde serialization and owned
/// deserialization, `Send`, `Sync` and `'static`. The blanket impl below
/// covers every type meeting those bounds.
#[cfg(feature = "debug")]
pub trait Message: std::fmt::Debug + Serialize + DeserializeOwned + Send + Sync + 'static {}
#[cfg(feature = "debug")]
impl<T> Message for T where
    T: std::fmt::Debug + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
{
}
135
136/// Add this plugin (unique by type T) to the app to first setup the channel
137/// for the message type T.
138///
139/// Then use the [`commands::SessionConnectCommand<T>`] to connect.
140///
141/// A resource [`channel::Channel<T>`] will be present once the connection is
142/// established. If the resource is gone, the connection was closed.
143pub struct SessionPlugin<T: Message> {
144    _t: PhantomData<T>,
145}
146
147impl<T: Message> Default for SessionPlugin<T> {
148    fn default() -> Self {
149        Self {
150            _t: Default::default(),
151        }
152    }
153}
154
155impl<T: Message> Plugin for SessionPlugin<T> {
156    fn build(&self, app: &mut App) {
157        app.add_message::<SessionConnectedEvent<T>>();
158        app.add_message::<SessionDisconnectedEvent<T>>();
159        app.add_message::<MessageReceivedEvent<T>>();
160        app.add_message::<ClientJoined<T>>();
161        app.add_message::<ClientLeft<T>>();
162        app.add_message::<TerminateEvent<T>>();
163        app.add_systems(
164            PreUpdate,
165            process_channel::<T>.run_if(resource_exists::<Channel<T>>),
166        );
167        app.add_systems(
168            PostUpdate,
169            terminate::<T>.run_if(resource_exists::<Channel<T>>),
170        );
171    }
172}
173
174fn process_channel<T: Message>(
175    mut channel: ResMut<Channel<T>>,
176    mut events: MessageWriter<MessageReceivedEvent<T>>,
177    mut joinevent: MessageWriter<ClientJoined<T>>,
178    mut leaveevent: MessageWriter<ClientLeft<T>>,
179    mut termevent: MessageWriter<TerminateEvent<T>>,
180) {
181    if let Err(e) = channel.poll(&mut events, &mut joinevent, &mut leaveevent, &mut termevent) {
182        error!("Error polling for messages: {e}");
183    }
184}
185
186// Tell bevy to yank the channel out with this
187// This happens as a last resort in case of broken pipes and the similar.
188#[derive(BevyMessage)]
189pub(crate) struct TerminateEvent<T: Message> {
190    pub(crate) _t: PhantomData<T>,
191}
192
193fn terminate<T: Message>(
194    mut cmd: Commands,
195    channel: ResMut<Channel<T>>,
196    mut event: MessageReader<TerminateEvent<T>>,
197) {
198    // This should happen only for clients, as the host will keep hosting until
199    // disconnected.
200    if !channel.is_host() && event.read().next().is_some() {
201        cmd.remove_resource::<Channel<T>>();
202    }
203}
204
205#[derive(Error, Debug)]
206pub(crate) enum Fault {
207    // Instructs the upper layer to kill this connection
208    // This is an unrecoverable state and the session must be created again
209    // from the other end.
210    #[error("Terminate connection")]
211    Terminate(#[from] io::Error),
212}