Skip to main content

bevy_networker_multiplayer/
netres.rs

1// SPDX-License-Identifier: MIT
2//! Bridge between Bevy ECS state and the underlying socket transport.
3//!
4//! `NetResource` owns the client/server socket state, packet queues, and the
5//! list of newly connected peers that need a full-world snapshot.
6use bevy::prelude::*;
7use bincode::config;
8use networker_rs::net::{EasySocketServer, Socket};
9use std::{
10    net::{SocketAddr, UdpSocket},
11    sync::{Arc, Mutex},
12    thread,
13    time::Duration,
14};
15
16use crate::netmsg::NetMessage;
17
18/// Internal mutable network state protected by a mutex.
19#[derive(Default)]
20struct NetState {
21    is_server: Option<bool>,
22    server_address: Option<SocketAddr>,
23    connections: Vec<Socket>,
24    new_connections: Vec<Socket>,
25    outbox: Vec<ReplicationPacket>,
26    inbox: Vec<ReplicationPacket>,
27    message_outbox: Vec<RawNetMessage>,
28    message_inbox: Vec<RawNetMessage>,
29}
30
31/// Bevy resource that owns the networking transport and packet queues.
32#[derive(Resource, Default)]
33pub struct NetResource {
34    state: Arc<Mutex<NetState>>,
35    server: Option<Arc<EasySocketServer>>,
36}
37
38impl NetResource {
39    /// Creates an empty networking resource.
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// Returns `true` when the resource is currently running as server.
45    pub fn is_server(&self) -> bool {
46        self.state.lock().unwrap().is_server == Some(true)
47    }
48
49    /// Connects to a server and binds packet handlers to the new socket.
50    pub fn join_server(&mut self, server_address: String) {
51        let server_address: SocketAddr = server_address
52            .parse()
53            .expect("server_address must be a valid socket address");
54
55        let udp_socket = UdpSocket::bind("0.0.0.0:0").expect("failed to open UDP socket");
56        let socket = Socket::new_udp_with_peer(Arc::new(udp_socket), server_address);
57        self.bind_client_socket(socket);
58
59        if let Some(socket) = self.state.lock().unwrap().connections.last().cloned() {
60            thread::spawn(move || {
61                for _ in 0..20 {
62                    socket.send("__networker_join__", []);
63                    thread::sleep(Duration::from_millis(100));
64                }
65            });
66        }
67
68        let mut state = self.state.lock().unwrap();
69        state.server_address = Some(server_address);
70        state.is_server = Some(false);
71    }
72
73    /// Starts the server and installs the connection callback.
74    pub fn start_server(&mut self, port: u16) {
75        let server = Arc::new(EasySocketServer::new());
76        let state = Arc::clone(&self.state);
77
78        server.on("connection", move |socket| {
79            let state_for_replication = Arc::clone(&state);
80            socket.on_bytes(
81                "replication",
82                move |payload| match bincode::serde::decode_from_slice::<ReplicationPacket, _>(
83                    payload,
84                    config::standard(),
85                ) {
86                    Ok((raw, _)) => state_for_replication.lock().unwrap().inbox.push(raw),
87                    Err(error) => {
88                        warn!(
89                            "server failed to decode replication packet bytes={} error={error}",
90                            payload.len()
91                        );
92                    }
93                },
94            );
95
96            let state_for_message = Arc::clone(&state);
97            socket.on_bytes("netmsg", move |payload| {
98                if let Ok((raw, _)) = bincode::serde::decode_from_slice::<RawNetMessage, _>(
99                    payload,
100                    config::standard(),
101                ) {
102                    state_for_message.lock().unwrap().message_inbox.push(raw);
103                }
104            });
105
106            let mut state = state.lock().unwrap();
107            state.new_connections.push(socket.clone());
108            state.connections.push(socket.clone());
109        });
110
111        let address = format!("0.0.0.0:{port}");
112        let server_for_thread = Arc::clone(&server);
113        thread::spawn(move || {
114            if let Err(error) = server_for_thread.listen_udp(&address) {
115                eprintln!("server failed to listen on {address}: {error}");
116            }
117        });
118
119        {
120            let mut state = self.state.lock().unwrap();
121            state.is_server = Some(true);
122        }
123
124        self.server = Some(server);
125    }
126
127    /// Adds a replication packet to the outgoing queue.
128    pub fn queue_packet(&mut self, packet: ReplicationPacket) {
129        self.state.lock().unwrap().outbox.push(packet);
130    }
131
132    /// Sends a replication packet immediately to one socket.
133    pub fn send_packet_to(&self, socket: &Socket, packet: ReplicationPacket) {
134        let reliable = packet.requires_reliable_delivery();
135        let bytes = bincode::serde::encode_to_vec(packet, config::standard())
136            .expect("failed to serialize replication packet");
137        socket.send_with_reliability("replication", bytes, reliable);
138    }
139
140    /// Injects a packet directly into the incoming queue.
141    pub fn inject_packet(&mut self, packet: ReplicationPacket) {
142        self.state.lock().unwrap().inbox.push(packet);
143    }
144
145    /// Queues a typed network message for broadcast.
146    pub fn queue_message<T: NetMessage>(&mut self, message: T) {
147        let bytes = bincode::serde::encode_to_vec(&message, config::standard())
148            .expect("failed to serialize network message");
149        self.state
150            .lock()
151            .unwrap()
152            .message_outbox
153            .push(RawNetMessage {
154                wire_id: T::WIRE_ID,
155                bytes,
156            });
157    }
158
159    /// Drains all queued outgoing replication packets.
160    pub fn drain_outbox(&mut self) -> Vec<ReplicationPacket> {
161        self.state.lock().unwrap().outbox.drain(..).collect()
162    }
163
164    /// Drains all received replication packets.
165    pub fn drain_inbox(&mut self) -> Vec<ReplicationPacket> {
166        self.state.lock().unwrap().inbox.drain(..).collect()
167    }
168
169    /// Drains sockets that connected since the last snapshot broadcast.
170    pub fn drain_new_connections(&mut self) -> Vec<Socket> {
171        self.state
172            .lock()
173            .unwrap()
174            .new_connections
175            .drain(..)
176            .collect()
177    }
178
179    /// Drains raw messages regardless of wire type.
180    pub fn drain_message_inbox(&mut self) -> Vec<RawNetMessage> {
181        self.state.lock().unwrap().message_inbox.drain(..).collect()
182    }
183
184    /// Extracts only messages of type `T`, leaving the rest queued.
185    pub fn drain_messages<T: NetMessage>(&mut self) -> Vec<T> {
186        let messages = self.drain_message_inbox();
187        let mut matched = Vec::new();
188        let mut unmatched = Vec::new();
189
190        for message in messages {
191            if message.wire_id == T::WIRE_ID {
192                if let Ok((message, _)) =
193                    bincode::serde::decode_from_slice::<T, _>(&message.bytes, config::standard())
194                {
195                    matched.push(message);
196                    continue;
197                }
198            }
199
200            unmatched.push(message);
201        }
202
203        if !unmatched.is_empty() {
204            self.state.lock().unwrap().message_inbox.extend(unmatched);
205        }
206
207        matched
208    }
209
210    /// Serializes and flushes all queued packets to connected clients.
211    pub fn flush_outbox(&mut self) {
212        let (packets, message_packets, connections) = {
213            let state = self.state.lock().unwrap();
214            if (state.outbox.is_empty() && state.message_outbox.is_empty())
215                || state.connections.is_empty()
216            {
217                return;
218            }
219
220            (
221                state.outbox.clone(),
222                state.message_outbox.clone(),
223                state.connections.clone(),
224            )
225        };
226
227        {
228            let mut state = self.state.lock().unwrap();
229            state.outbox.clear();
230            state.message_outbox.clear();
231        }
232
233        for packet in packets {
234            let reliable = packet.requires_reliable_delivery();
235            let bytes = bincode::serde::encode_to_vec(packet, config::standard())
236                .expect("failed to serialize replication packet");
237            for socket in &connections {
238                socket.send_with_reliability("replication", bytes.clone(), reliable);
239            }
240        }
241
242        for packet in message_packets {
243            for socket in &connections {
244                let bytes = bincode::serde::encode_to_vec(&packet, config::standard())
245                    .expect("failed to serialize network message");
246                socket.send("netmsg", bytes);
247            }
248        }
249    }
250
251    /// Placeholder hook for future socket polling implementations.
252    pub fn poll_incoming(&mut self) {}
253
254    /// Binds replication and message handlers to a socket.
255    fn bind_client_socket(&self, socket: Socket) {
256        let state_for_replication = Arc::clone(&self.state);
257        let socket_for_listener = socket.clone();
258        socket.on_bytes(
259            "replication",
260            move |payload| match bincode::serde::decode_from_slice::<ReplicationPacket, _>(
261                payload,
262                config::standard(),
263            ) {
264                Ok((raw, _)) => state_for_replication.lock().unwrap().inbox.push(raw),
265                Err(error) => {
266                    warn!(
267                        "client failed to decode replication packet bytes={} error={error}",
268                        payload.len()
269                    );
270                }
271            },
272        );
273
274        let state_for_message = Arc::clone(&self.state);
275        socket.on_bytes("netmsg", move |payload| {
276            if let Ok((raw, _)) =
277                bincode::serde::decode_from_slice::<RawNetMessage, _>(payload, config::standard())
278            {
279                state_for_message.lock().unwrap().message_inbox.push(raw);
280            }
281        });
282
283        self.state.lock().unwrap().connections.push(socket.clone());
284        thread::spawn(move || socket_for_listener.listen_udp());
285    }
286}
287
288/// Raw wire payload for a typed network message.
289#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
290pub struct RawNetMessage {
291    /// Stable wire identifier for the message type.
292    pub wire_id: u64,
293    /// Serialized message bytes.
294    pub bytes: Vec<u8>,
295}
296
297/// Packet types used by entity and resource replication.
298#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
299pub enum ReplicationPacket {
300    /// Spawn a replicated entity, optionally with a prefab.
301    SpawnEntity {
302        network_id: u64,
303        prefab_wire_id: u64,
304    },
305    /// Despawn a replicated entity.
306    DespawnEntity { network_id: u64 },
307    /// Update a replicated component on one entity.
308    UpdateComponent {
309        network_id: u64,
310        component_wire_id: u64,
311        bytes: Vec<u8>,
312    },
313    /// Replace a replicated resource snapshot.
314    UpdateResource {
315        resource_wire_id: u64,
316        bytes: Vec<u8>,
317    },
318}
319
320impl ReplicationPacket {
321    pub fn requires_reliable_delivery(&self) -> bool {
322        matches!(
323            self,
324            Self::SpawnEntity { .. } | Self::DespawnEntity { .. } | Self::UpdateResource { .. }
325        )
326    }
327
328    pub fn kind(&self) -> &'static str {
329        match self {
330            Self::SpawnEntity { .. } => "SpawnEntity",
331            Self::DespawnEntity { .. } => "DespawnEntity",
332            Self::UpdateComponent { .. } => "UpdateComponent",
333            Self::UpdateResource { .. } => "UpdateResource",
334        }
335    }
336}