1use bevy::app::App;
3use bevy::prelude::*;
4use bevy_matchbox::matchbox_socket::{ChannelConfig, WebRtcSocket};
5use bevy_matchbox::prelude::PeerId;
6use bevy_matchbox::MatchboxSocket;
7use futures::channel::mpsc::SendError;
8use serde::{Deserialize, Serialize};
/// Delivery guarantee requested for a networked message.
///
/// The discriminant doubles as the socket channel index: the send and receive
/// paths in this file call `channel_mut(reliability as usize)`, so the values
/// must stay in step with the order in which channels are added to the socket.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Reliability {
    /// Channel 0.
    Reliable = 0,
    /// Channel 1.
    Unreliable = 1,
    /// Channel 2.
    UnreliableOrdered = 2,
}
18impl Reliability {
19 pub const RELIABILITY: [Reliability; 3] = [
20 Reliability::Reliable,
21 Reliability::Unreliable,
22 Reliability::UnreliableOrdered,
23 ];
24}
25
26pub trait NetworkedEvent {
27 const RELIABILITY: Reliability;
28 fn id(&self) -> PeerId;
29}
30
31#[derive(Serialize, Deserialize)]
32pub struct Message {
33 pub type_name: String,
34 pub content: Vec<u8>,
35}
36impl Message {
37 pub fn new<T: Serialize + TypePath>(content: &T) -> Self {
38 Message {
39 type_name: T::type_path().to_string(),
40 content: bincode::serialize(content).unwrap(),
41 }
42 }
43}
44
45pub trait SocketSendMessage {
49 fn receive_msg(&mut self, reliability: Reliability) -> impl Iterator<Item = (PeerId, Message)>;
50 fn send_msg_all<T: Serialize + TypePath + Event + NetworkedEvent>(
51 &mut self,
52 message: &T,
53 reliability: Reliability,
54 ) -> Result<(), SendError>;
55 fn send_msg<T: Serialize + TypePath + Event + NetworkedEvent>(
56 &mut self,
57 peer: PeerId,
58 message: &T,
59 reliability: Reliability,
60 ) -> Result<(), SendError>;
61}
62
63impl SocketSendMessage for WebRtcSocket {
64 fn receive_msg(&mut self, reliability: Reliability) -> impl Iterator<Item = (PeerId, Message)> {
65 self.channel_mut(reliability as usize)
66 .receive()
67 .into_iter()
68 .map(|(id, packet)| (id, bincode::deserialize(&packet).unwrap()))
69 }
70 fn send_msg_all<T: Serialize + TypePath + Event + NetworkedEvent>(
71 &mut self,
72 message: &T,
73 reliability: Reliability,
74 ) -> Result<(), SendError> {
75 let peers = self.connected_peers().collect::<Vec<_>>();
76 for peer in peers {
77 self.send_msg(peer, message, reliability)?;
78 }
79 Ok(())
80 }
81
82 fn send_msg<T: Serialize + TypePath + Event + NetworkedEvent>(
83 &mut self,
84 peer: PeerId,
85 message: &T,
86 reliability: Reliability,
87 ) -> Result<(), SendError> {
88 let msg = Message::new(message);
89 let msg = bincode::serialize(&msg).unwrap();
90 self.channel_mut(reliability as usize)
91 .try_send(msg.into(), peer)?;
92 Ok(())
93 }
94}
95
96#[derive(Default, Resource)]
100pub struct NetworkedMessages(
101 std::collections::HashMap<String, (fn(&mut World, &[u8]), fn(&mut World))>,
102);
103
104fn route_outgoing_messages<
105 T: NetworkedEvent + Event + Serialize + for<'a> Deserialize<'a> + TypePath,
106>(
107 world: &mut World,
108) {
109 world.resource_scope(|world, mut socket: Mut<MatchboxSocket>| {
110 let events = world.resource_mut::<Events<T>>();
111 let mut cursor = events.get_cursor();
112 for e in cursor.read(&events) {
113 if e.id() == socket.id().expect("Not connected") {
114 if let Err(err) = socket.send_msg_all(e, T::RELIABILITY) {
115 error!("Failed to send message: {:?}", err);
116 }
117 }
118 }
119 });
120}
121
122fn route_incoming_messages<T: NetworkedEvent + Event + for<'a> Deserialize<'a>>(
123 world: &mut World,
124 message: &[u8],
125) {
126 let e: T = bincode::deserialize(message).unwrap();
127 world.send_event(e);
128}
129
130fn route_messages(world: &mut World) {
131 if !world.contains_resource::<MatchboxSocket>() {
132 return;
133 }
134
135 world.resource_scope(|world, networked_messages: Mut<NetworkedMessages>| {
136 world.resource_scope(|world, mut socket: Mut<MatchboxSocket>| {
137 for reliability in Reliability::RELIABILITY {
138 for (_peer_id, msg) in socket.receive_msg(reliability) {
139 let func = networked_messages.0.get(&msg.type_name).unwrap();
140 func.0(world, &msg.content);
141 }
142 }
143 });
144 for (_, route_outgoing_messages) in networked_messages.0.values() {
145 route_outgoing_messages(world);
146 }
147 });
148 let _peers = world
149 .get_resource_mut::<MatchboxSocket>()
150 .unwrap()
151 .update_peers();
152}
153
154#[derive(Clone, Copy, Resource, Default)]
155pub struct LocalId(Option<PeerId>);
156impl LocalId {
157 pub fn get(&self) -> Option<PeerId> {
158 self.0
159 }
160}
161impl std::ops::Deref for LocalId {
162 type Target = Option<PeerId>;
163
164 fn deref(&self) -> &Self::Target {
165 &self.0
166 }
167}
168
169fn local_id_set(mut local_id: ResMut<LocalId>, matchbox_socket: Option<ResMut<MatchboxSocket>>) {
170 if let Some(mut matchbox_socket) = matchbox_socket {
171 local_id.0 = matchbox_socket.id();
172 } else {
173 local_id.0.take();
174 }
175}
176
/// Plugin that installs the networked-event routing systems and the resources
/// they use.
pub struct EvnetPlugin;
181
182impl Plugin for EvnetPlugin {
183 fn build(&self, app: &mut App) {
184 app.add_systems(Update, (local_id_set, route_messages).chain());
185 app.init_resource::<NetworkedMessages>();
186 app.init_resource::<LocalId>();
187 }
188}
189
190pub trait NetworkedAppExt {
191 fn register_networked_event<
192 T: NetworkedEvent + Event + for<'a> Deserialize<'a> + Serialize + TypePath,
193 >(
194 &mut self,
195 ) -> &mut Self;
196}
197
198impl NetworkedAppExt for App {
199 fn register_networked_event<
200 T: NetworkedEvent + Event + for<'a> Deserialize<'a> + Serialize + TypePath,
201 >(
202 &mut self,
203 ) -> &mut Self {
204 self.init_resource::<NetworkedMessages>();
205 let mut networked_messages = self.world_mut().resource_mut::<NetworkedMessages>();
206 networked_messages.0.insert(
207 T::type_path().to_string(),
208 (route_incoming_messages::<T>, route_outgoing_messages::<T>),
209 );
210 self
211 }
212}
213
/// Extension methods for opening a network connection through `Commands`.
pub trait NetworkedCommandExt {
    /// Queues creation of the socket resource for `room`.
    ///
    /// The implementation for `Commands` passes `room` to the socket builder
    /// as the room URL.
    fn connect(&mut self, room: &str);
}
217
218impl NetworkedCommandExt for Commands<'_, '_> {
219 fn connect(&mut self, room_url: &str) {
220 let matchbox = MatchboxSocket::from(
221 bevy_matchbox::matchbox_socket::WebRtcSocketBuilder::new(room_url)
223 .add_reliable_channel()
224 .add_unreliable_channel()
225 .add_channel(ChannelConfig {
226 ordered: true,
228 max_retransmits: Some(0),
229 })
230 .build(),
231 );
232 self.insert_resource(matchbox);
233 }
234}