bevy_networker_multiplayer/
netres.rs1use bevy::prelude::*;
7use bincode::config;
8use networker_rs::net::{EasySocketServer, Socket};
9use std::{
10 net::{SocketAddr, UdpSocket},
11 sync::{Arc, Mutex},
12 thread,
13 time::Duration,
14};
15
16use crate::netmsg::NetMessage;
17
18#[derive(Default)]
20struct NetState {
21 is_server: Option<bool>,
22 server_address: Option<SocketAddr>,
23 connections: Vec<Socket>,
24 new_connections: Vec<Socket>,
25 outbox: Vec<ReplicationPacket>,
26 inbox: Vec<ReplicationPacket>,
27 message_outbox: Vec<RawNetMessage>,
28 message_inbox: Vec<RawNetMessage>,
29}
30
31#[derive(Resource, Default)]
33pub struct NetResource {
34 state: Arc<Mutex<NetState>>,
35 server: Option<Arc<EasySocketServer>>,
36}
37
38impl NetResource {
39 pub fn new() -> Self {
41 Self::default()
42 }
43
44 pub fn is_server(&self) -> bool {
46 self.state.lock().unwrap().is_server == Some(true)
47 }
48
49 pub fn join_server(&mut self, server_address: String) {
51 let server_address: SocketAddr = server_address
52 .parse()
53 .expect("server_address must be a valid socket address");
54
55 let udp_socket = UdpSocket::bind("0.0.0.0:0").expect("failed to open UDP socket");
56 let socket = Socket::new_udp_with_peer(Arc::new(udp_socket), server_address);
57 self.bind_client_socket(socket);
58
59 if let Some(socket) = self.state.lock().unwrap().connections.last().cloned() {
60 thread::spawn(move || {
61 for _ in 0..20 {
62 socket.send("__networker_join__", []);
63 thread::sleep(Duration::from_millis(100));
64 }
65 });
66 }
67
68 let mut state = self.state.lock().unwrap();
69 state.server_address = Some(server_address);
70 state.is_server = Some(false);
71 }
72
73 pub fn start_server(&mut self, port: u16) {
75 let server = Arc::new(EasySocketServer::new());
76 let state = Arc::clone(&self.state);
77
78 server.on("connection", move |socket| {
79 let state_for_replication = Arc::clone(&state);
80 socket.on_bytes(
81 "replication",
82 move |payload| match bincode::serde::decode_from_slice::<ReplicationPacket, _>(
83 payload,
84 config::standard(),
85 ) {
86 Ok((raw, _)) => state_for_replication.lock().unwrap().inbox.push(raw),
87 Err(error) => {
88 warn!(
89 "server failed to decode replication packet bytes={} error={error}",
90 payload.len()
91 );
92 }
93 },
94 );
95
96 let state_for_message = Arc::clone(&state);
97 socket.on_bytes("netmsg", move |payload| {
98 if let Ok((raw, _)) = bincode::serde::decode_from_slice::<RawNetMessage, _>(
99 payload,
100 config::standard(),
101 ) {
102 state_for_message.lock().unwrap().message_inbox.push(raw);
103 }
104 });
105
106 let mut state = state.lock().unwrap();
107 state.new_connections.push(socket.clone());
108 state.connections.push(socket.clone());
109 });
110
111 let address = format!("0.0.0.0:{port}");
112 let server_for_thread = Arc::clone(&server);
113 thread::spawn(move || {
114 if let Err(error) = server_for_thread.listen_udp(&address) {
115 eprintln!("server failed to listen on {address}: {error}");
116 }
117 });
118
119 {
120 let mut state = self.state.lock().unwrap();
121 state.is_server = Some(true);
122 }
123
124 self.server = Some(server);
125 }
126
127 pub fn queue_packet(&mut self, packet: ReplicationPacket) {
129 self.state.lock().unwrap().outbox.push(packet);
130 }
131
132 pub fn send_packet_to(&self, socket: &Socket, packet: ReplicationPacket) {
134 let reliable = packet.requires_reliable_delivery();
135 let bytes = bincode::serde::encode_to_vec(packet, config::standard())
136 .expect("failed to serialize replication packet");
137 socket.send_with_reliability("replication", bytes, reliable);
138 }
139
140 pub fn inject_packet(&mut self, packet: ReplicationPacket) {
142 self.state.lock().unwrap().inbox.push(packet);
143 }
144
145 pub fn queue_message<T: NetMessage>(&mut self, message: T) {
147 let bytes = bincode::serde::encode_to_vec(&message, config::standard())
148 .expect("failed to serialize network message");
149 self.state
150 .lock()
151 .unwrap()
152 .message_outbox
153 .push(RawNetMessage {
154 wire_id: T::WIRE_ID,
155 bytes,
156 });
157 }
158
159 pub fn drain_outbox(&mut self) -> Vec<ReplicationPacket> {
161 self.state.lock().unwrap().outbox.drain(..).collect()
162 }
163
164 pub fn drain_inbox(&mut self) -> Vec<ReplicationPacket> {
166 self.state.lock().unwrap().inbox.drain(..).collect()
167 }
168
169 pub fn drain_new_connections(&mut self) -> Vec<Socket> {
171 self.state
172 .lock()
173 .unwrap()
174 .new_connections
175 .drain(..)
176 .collect()
177 }
178
179 pub fn drain_message_inbox(&mut self) -> Vec<RawNetMessage> {
181 self.state.lock().unwrap().message_inbox.drain(..).collect()
182 }
183
184 pub fn drain_messages<T: NetMessage>(&mut self) -> Vec<T> {
186 let messages = self.drain_message_inbox();
187 let mut matched = Vec::new();
188 let mut unmatched = Vec::new();
189
190 for message in messages {
191 if message.wire_id == T::WIRE_ID {
192 if let Ok((message, _)) =
193 bincode::serde::decode_from_slice::<T, _>(&message.bytes, config::standard())
194 {
195 matched.push(message);
196 continue;
197 }
198 }
199
200 unmatched.push(message);
201 }
202
203 if !unmatched.is_empty() {
204 self.state.lock().unwrap().message_inbox.extend(unmatched);
205 }
206
207 matched
208 }
209
210 pub fn flush_outbox(&mut self) {
212 let (packets, message_packets, connections) = {
213 let state = self.state.lock().unwrap();
214 if (state.outbox.is_empty() && state.message_outbox.is_empty())
215 || state.connections.is_empty()
216 {
217 return;
218 }
219
220 (
221 state.outbox.clone(),
222 state.message_outbox.clone(),
223 state.connections.clone(),
224 )
225 };
226
227 {
228 let mut state = self.state.lock().unwrap();
229 state.outbox.clear();
230 state.message_outbox.clear();
231 }
232
233 for packet in packets {
234 let reliable = packet.requires_reliable_delivery();
235 let bytes = bincode::serde::encode_to_vec(packet, config::standard())
236 .expect("failed to serialize replication packet");
237 for socket in &connections {
238 socket.send_with_reliability("replication", bytes.clone(), reliable);
239 }
240 }
241
242 for packet in message_packets {
243 for socket in &connections {
244 let bytes = bincode::serde::encode_to_vec(&packet, config::standard())
245 .expect("failed to serialize network message");
246 socket.send("netmsg", bytes);
247 }
248 }
249 }
250
251 pub fn poll_incoming(&mut self) {}
253
254 fn bind_client_socket(&self, socket: Socket) {
256 let state_for_replication = Arc::clone(&self.state);
257 let socket_for_listener = socket.clone();
258 socket.on_bytes(
259 "replication",
260 move |payload| match bincode::serde::decode_from_slice::<ReplicationPacket, _>(
261 payload,
262 config::standard(),
263 ) {
264 Ok((raw, _)) => state_for_replication.lock().unwrap().inbox.push(raw),
265 Err(error) => {
266 warn!(
267 "client failed to decode replication packet bytes={} error={error}",
268 payload.len()
269 );
270 }
271 },
272 );
273
274 let state_for_message = Arc::clone(&self.state);
275 socket.on_bytes("netmsg", move |payload| {
276 if let Ok((raw, _)) =
277 bincode::serde::decode_from_slice::<RawNetMessage, _>(payload, config::standard())
278 {
279 state_for_message.lock().unwrap().message_inbox.push(raw);
280 }
281 });
282
283 self.state.lock().unwrap().connections.push(socket.clone());
284 thread::spawn(move || socket_for_listener.listen_udp());
285 }
286}
287
288#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
290pub struct RawNetMessage {
291 pub wire_id: u64,
293 pub bytes: Vec<u8>,
295}
296
297#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
299pub enum ReplicationPacket {
300 SpawnEntity {
302 network_id: u64,
303 prefab_wire_id: u64,
304 },
305 DespawnEntity { network_id: u64 },
307 UpdateComponent {
309 network_id: u64,
310 component_wire_id: u64,
311 bytes: Vec<u8>,
312 },
313 UpdateResource {
315 resource_wire_id: u64,
316 bytes: Vec<u8>,
317 },
318}
319
320impl ReplicationPacket {
321 pub fn requires_reliable_delivery(&self) -> bool {
322 matches!(
323 self,
324 Self::SpawnEntity { .. } | Self::DespawnEntity { .. } | Self::UpdateResource { .. }
325 )
326 }
327
328 pub fn kind(&self) -> &'static str {
329 match self {
330 Self::SpawnEntity { .. } => "SpawnEntity",
331 Self::DespawnEntity { .. } => "DespawnEntity",
332 Self::UpdateComponent { .. } => "UpdateComponent",
333 Self::UpdateResource { .. } => "UpdateResource",
334 }
335 }
336}