bevy_connect/lib.rs
1//! Channel communication over network.
2//! This crate uses TCP for communication.
3//!
4//! A new session can be created per every message type. Each of these message
5//! types is a connection, and muliple message types will create parallel
6//! connections.
7//!
8//! Connecions are established in direct mode via host+port. For the host it
9//! means binding to that host & port, for the client means to connect to it.
10//!
11//! The client has an initial blocking time until it receives an assignment
12//! message from the host, which is sent straight away after accepting the tcp
13//! connection.
14//!
15//! If using compression and/or encryption, the clients and server must use the
16//! same values.
17//!
18//! A resourece of type [`channel::Channel<T>`] will be present for each of the
19//! connection, for each of the different message type `T`.
20//!
21//! Connecting or disconnecting is done by queuing a Command to bevy.
22//! [`commands::SessionConnectCommand<T>`] will create a new session, like this
23//! will create two different sessions each identified by the type of message
24//! they are handling, and send and receive messages:
25//!
26//! ```
27//! use bevy_app::*;
28//! use bevy_ecs::prelude::*;
29//! use bevy_connect::prelude::*;
30//! use serde::{Serialize, Deserialize};
31//!
32//! App::new()
33//! .add_plugins(SessionPlugin::<Msg>::default())
34//! .add_systems(Startup, bevy_startup)
35//! .add_systems(Update, (receive_message, send_message)
36//! // The presence of the channel resource indicates an established
37//! // session (connected)
38//! .run_if(resource_exists::<Channel<Msg>>));
39//!
40//! #[derive(Serialize, Deserialize, Debug)]
41//! struct Msg {
42//! data: String
43//! }
44//!
45//! #[derive(Serialize, Deserialize, Debug)]
46//! struct AnotherDifferentMessageType {
47//! audio_buffer: Vec<u8>
48//! }
49//!
50//! fn bevy_startup(mut cmd: Commands) {
51//! cmd.queue(SessionConnectCommand::<Msg>::from_config(
52//! SessionConfig::Direct {
53//! addr: Some("127.0.0.1".parse().unwrap()),
54//! port: 6000,
55//! host: true, // use false for the client side
56//! compress: false, // use true to compress network data
57//! key: None, // Pass a key to use encryption
58//! options: Default::default(),
59//! },
60//! ));
61//! cmd.queue(SessionConnectCommand::<AnotherDifferentMessageType>::from_config(
62//! SessionConfig::Direct {
63//! addr: Some("127.0.0.1".parse().unwrap()),
64//! port: 7000,
65//! host: true, // use false for the client side
66//! compress: false, // use true to compress network data
67//! key: None, // Pass a key to use encryption
68//! options: Default::default(),
69//! },
70//! ));
71//! }
72//!
73//! fn receive_message(mut events: EventReader<MessageReceivedEvent<Msg>>) {
74//! for e in events.read() {
75//! println!("{}", e.message.data);
76//! }
77//! }
78//!
79//! fn send_message(mut channel: ResMut<Channel<Msg>>) {
80//! channel.broadcast(Msg{data: "hello".to_string()}.into());
81//! }
82//! ```
83//!
84//! Using the command [`commands::SessionDisconnectCommand<T>`] will disconnect
85//! the session for the message type `T`. The resource [`channel::Channel<T>`]
86//! will be gone once disconnection is complete.
87//!
88pub mod channel;
89pub mod commands;
90pub mod events;
91
92use bevy_app::{App, Plugin, PostUpdate, PreUpdate};
93use bevy_ecs::message::{Message as BevyMessage, MessageReader, MessageWriter};
94use bevy_ecs::schedule::IntoScheduleConfigs;
95use bevy_ecs::{
96 schedule::common_conditions::resource_exists,
97 system::{Commands, ResMut},
98};
99
100use serde::{Deserialize, Serialize, de::DeserializeOwned};
101use std::{io, marker::PhantomData};
102use thiserror::Error;
103use tracing::error;
104
105use crate::{
106 channel::Channel,
107 events::{
108 ClientJoined, ClientLeft, MessageReceivedEvent, SessionConnectedEvent,
109 SessionDisconnectedEvent,
110 },
111};
112
113pub use tubes::Uuid;
114
115pub mod prelude {
116 pub use crate::SessionPlugin;
117 pub use crate::channel::*;
118 pub use crate::commands::*;
119 pub use crate::events::*;
120 pub use tubes::Uuid;
121}
122
123#[cfg(not(feature = "debug"))]
124pub trait Message: Serialize + DeserializeOwned + Send + Sync + 'static {}
125#[cfg(not(feature = "debug"))]
126impl<T> Message for T where for<'a> T: Serialize + Deserialize<'a> + Send + Sync + 'static {}
127
128#[cfg(feature = "debug")]
129pub trait Message: std::fmt::Debug + Serialize + DeserializeOwned + Send + Sync + 'static {}
130#[cfg(feature = "debug")]
131impl<T> Message for T where
132 for<'a> T: std::fmt::Debug + Serialize + Deserialize<'a> + Send + Sync + 'static
133{
134}
135
136/// Add this plugin (unique by type T) to the app to first setup the channel
137/// for the message type T.
138///
139/// Then use the [`commands::SessionConnectCommand<T>`] to connect.
140///
141/// A resource [`channel::Channel<T>`] will be present once the connection is
142/// established. If the resource is gone, the connection was closed.
143pub struct SessionPlugin<T: Message> {
144 _t: PhantomData<T>,
145}
146
147impl<T: Message> Default for SessionPlugin<T> {
148 fn default() -> Self {
149 Self {
150 _t: Default::default(),
151 }
152 }
153}
154
155impl<T: Message> Plugin for SessionPlugin<T> {
156 fn build(&self, app: &mut App) {
157 app.add_message::<SessionConnectedEvent<T>>();
158 app.add_message::<SessionDisconnectedEvent<T>>();
159 app.add_message::<MessageReceivedEvent<T>>();
160 app.add_message::<ClientJoined<T>>();
161 app.add_message::<ClientLeft<T>>();
162 app.add_message::<TerminateEvent<T>>();
163 app.add_systems(
164 PreUpdate,
165 process_channel::<T>.run_if(resource_exists::<Channel<T>>),
166 );
167 app.add_systems(
168 PostUpdate,
169 terminate::<T>.run_if(resource_exists::<Channel<T>>),
170 );
171 }
172}
173
174fn process_channel<T: Message>(
175 mut channel: ResMut<Channel<T>>,
176 mut events: MessageWriter<MessageReceivedEvent<T>>,
177 mut joinevent: MessageWriter<ClientJoined<T>>,
178 mut leaveevent: MessageWriter<ClientLeft<T>>,
179 mut termevent: MessageWriter<TerminateEvent<T>>,
180) {
181 if let Err(e) = channel.poll(&mut events, &mut joinevent, &mut leaveevent, &mut termevent) {
182 error!("Error polling for messages: {e}");
183 }
184}
185
186// Tell bevy to yank the channel out with this
187// This happens as a last resort in case of broken pipes and the similar.
188#[derive(BevyMessage)]
189pub(crate) struct TerminateEvent<T: Message> {
190 pub(crate) _t: PhantomData<T>,
191}
192
193fn terminate<T: Message>(
194 mut cmd: Commands,
195 channel: ResMut<Channel<T>>,
196 mut event: MessageReader<TerminateEvent<T>>,
197) {
198 // This should happen only for clients, as the host will keep hosting until
199 // disconnected.
200 if !channel.is_host() && event.read().next().is_some() {
201 cmd.remove_resource::<Channel<T>>();
202 }
203}
204
205#[derive(Error, Debug)]
206pub(crate) enum Fault {
207 // Instructs the upper layer to kill this connection
208 // This is an unrecoverable state and the session must be created again
209 // from the other end.
210 #[error("Terminate connection")]
211 Terminate(#[from] io::Error),
212}