bevy_networker_multiplayer/
sync.rs1use bevy::prelude::*;
9use bincode::config;
10use serde::{de::DeserializeOwned, Serialize};
11use std::collections::HashMap;
12
13use crate::{
14 netres::{NetResource, ReplicationPacket},
15 replicated::{EntityIndex, NetworkId, NextNetworkId, Replicated},
16};
17
18#[derive(Debug, Clone, Copy)]
20pub struct ComponentRegistration {
21 pub type_path: &'static str,
23 pub wire_id: u64,
25 pub register: fn(&mut App),
27 pub apply: fn(&mut World, Entity, &[u8]),
29 pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
31}
32
33inventory::collect!(ComponentRegistration);
35
36#[derive(Debug, Clone, Copy)]
38pub struct ResourceRegistration {
39 pub type_path: &'static str,
41 pub wire_id: u64,
43 pub register: fn(&mut App),
45 pub apply: fn(&mut World, &[u8]),
47 pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
49}
50
51inventory::collect!(ResourceRegistration);
53
54#[derive(Debug, Clone, Copy)]
56pub struct PrefabRegistration {
57 pub type_path: &'static str,
59 pub wire_id: u64,
61 pub register: fn(&mut App),
63 pub matches: fn(&World, Entity) -> bool,
65 pub apply: fn(&mut World, Entity),
67}
68
69inventory::collect!(PrefabRegistration);
71
72#[derive(Resource, Default)]
74pub struct SyncRegistry {
75 by_wire_id: HashMap<u64, ComponentRegistration>,
76 by_type_path: HashMap<&'static str, ComponentRegistration>,
77}
78
79impl SyncRegistry {
80 pub fn register(&mut self, registration: ComponentRegistration) {
82 self.by_wire_id.insert(registration.wire_id, registration);
83 self.by_type_path.insert(registration.type_path, registration);
84 }
85
86 pub fn by_wire_id(&self, wire_id: u64) -> Option<&ComponentRegistration> {
88 self.by_wire_id.get(&wire_id)
89 }
90}
91
92#[derive(Resource, Default)]
94pub struct SyncResourceRegistry {
95 by_wire_id: HashMap<u64, ResourceRegistration>,
96 by_type_path: HashMap<&'static str, ResourceRegistration>,
97}
98
99impl SyncResourceRegistry {
100 pub fn register(&mut self, registration: ResourceRegistration) {
102 self.by_wire_id.insert(registration.wire_id, registration);
103 self.by_type_path.insert(registration.type_path, registration);
104 }
105
106 pub fn by_wire_id(&self, wire_id: u64) -> Option<&ResourceRegistration> {
108 self.by_wire_id.get(&wire_id)
109 }
110}
111
112pub trait SyncComponent:
114 Component + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
115{
116 const TYPE_PATH: &'static str;
118 const WIRE_ID: u64;
120}
121
122pub trait SyncResource:
124 Resource + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
125{
126 const TYPE_PATH: &'static str;
128 const WIRE_ID: u64;
130}
131
132#[derive(Component, Clone, Copy, Debug, Eq, Hash, PartialEq)]
134#[doc(hidden)]
135pub struct PrefabId(pub u64);
136
137#[derive(Resource, Default)]
139pub struct PrefabRegistry {
140 by_wire_id: HashMap<u64, PrefabRegistration>,
141 by_type_path: HashMap<&'static str, PrefabRegistration>,
142}
143
144impl PrefabRegistry {
145 pub fn register(&mut self, registration: PrefabRegistration) {
147 self.by_wire_id.insert(registration.wire_id, registration);
148 self.by_type_path.insert(registration.type_path, registration);
149 }
150
151 pub fn by_wire_id(&self, wire_id: u64) -> Option<&PrefabRegistration> {
153 self.by_wire_id.get(&wire_id)
154 }
155
156 pub fn all(&self) -> impl Iterator<Item = &PrefabRegistration> {
158 self.by_wire_id.values()
159 }
160}
161
162#[derive(Resource, Default)]
164struct PendingComponentUpdates(Vec<ReplicationPacket>);
165
/// Computes the 64-bit FNV-1a hash of `type_path`'s UTF-8 bytes.
///
/// The function is `const`, so it can be evaluated at compile time to build
/// constant identifiers. The empty string hashes to the FNV offset basis.
#[must_use]
pub const fn hash_type_path(type_path: &str) -> u64 {
    /// FNV-1a 64-bit offset basis.
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    /// FNV-1a 64-bit prime.
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let bytes = type_path.as_bytes();
    let mut hash = FNV_OFFSET_BASIS;
    let mut index = 0;
    // `for` loops are not usable inside `const fn`, hence the manual index loop.
    while index < bytes.len() {
        // FNV-1a: xor the byte in first, then multiply by the prime.
        hash ^= bytes[index] as u64;
        hash = hash.wrapping_mul(FNV_PRIME);
        index += 1;
    }
    hash
}
178
179pub fn register_sync_components(app: &mut App) {
181 app.init_resource::<SyncRegistry>();
182 app.init_resource::<SyncResourceRegistry>();
183 app.init_resource::<PrefabRegistry>();
184 app.init_resource::<PendingComponentUpdates>();
185
186 let mut registry = SyncRegistry::default();
187 for registration in inventory::iter::<ComponentRegistration> {
188 registry.register(*registration);
189 (registration.register)(app);
190 }
191
192 let mut resource_registry = SyncResourceRegistry::default();
193 for registration in inventory::iter::<ResourceRegistration> {
194 resource_registry.register(*registration);
195 (registration.register)(app);
196 }
197
198 let mut prefab_registry = PrefabRegistry::default();
199 for registration in inventory::iter::<PrefabRegistration> {
200 prefab_registry.register(*registration);
201 (registration.register)(app);
202 }
203
204 app.insert_resource(registry);
205 app.insert_resource(resource_registry);
206 app.insert_resource(prefab_registry);
207}
208
209pub fn poll_network_incoming(mut net: ResMut<NetResource>) {
211 net.poll_incoming();
212}
213
214pub fn flush_network_outbox(mut net: ResMut<NetResource>) {
216 net.flush_outbox();
217}
218
219pub fn sync_component<T: SyncComponent>(
221 mut net: ResMut<NetResource>,
222 query: Query<(&NetworkId, &T), (With<Replicated>, Or<(Added<T>, Changed<T>)>)>,
223) {
224 if !net.is_server() {
225 return;
226 }
227
228 for (network_id, component) in &query {
229 let bytes = bincode::serde::encode_to_vec(component, config::standard())
230 .expect("failed to serialize sync component");
231 net.queue_packet(ReplicationPacket::UpdateComponent {
232 network_id: network_id.0,
233 component_wire_id: T::WIRE_ID,
234 bytes,
235 });
236 }
237}
238
239pub fn sync_resource<T: SyncResource>(
241 mut net: ResMut<NetResource>,
242 resource: Option<Res<T>>,
243) {
244 let Some(resource) = resource else {
245 return;
246 };
247
248 if !net.is_server() || !(resource.is_added() || resource.is_changed()) {
249 return;
250 }
251
252 let bytes = bincode::serde::encode_to_vec(&*resource, config::standard())
253 .expect("failed to serialize sync resource");
254 net.queue_packet(ReplicationPacket::UpdateResource {
255 resource_wire_id: T::WIRE_ID,
256 bytes,
257 });
258}
259
260pub fn sync_new_connections(world: &mut World) {
262 let is_server = world.resource::<NetResource>().is_server();
263 if !is_server {
264 return;
265 }
266
267 let connections = {
268 let mut net = world.resource_mut::<NetResource>();
269 net.drain_new_connections()
270 };
271
272 if connections.is_empty() {
273 return;
274 }
275
276 let component_registrations: Vec<ComponentRegistration> =
277 inventory::iter::<ComponentRegistration>().copied().collect();
278 let resource_registrations: Vec<ResourceRegistration> =
279 inventory::iter::<ResourceRegistration>().copied().collect();
280
281 for socket in &connections {
282 let replicated_entities = {
283 let mut query = world.query_filtered::<
284 (Entity, &NetworkId, Option<&PrefabId>),
285 With<Replicated>,
286 >();
287 query
288 .iter(world)
289 .map(|(entity, network_id, prefab_id)| {
290 (entity, *network_id, prefab_id.map(|prefab_id| prefab_id.0).unwrap_or(0))
291 })
292 .collect::<Vec<_>>()
293 };
294
295 let component_snapshots: Vec<Vec<ReplicationPacket>> = component_registrations
296 .iter()
297 .map(|registration| (registration.snapshot)(world))
298 .collect();
299 let resource_snapshots: Vec<Vec<ReplicationPacket>> = resource_registrations
300 .iter()
301 .map(|registration| (registration.snapshot)(world))
302 .collect();
303
304 {
305 let net = world.resource::<NetResource>();
306 for (_, network_id, prefab_wire_id) in &replicated_entities {
307 net.send_packet_to(
308 socket,
309 ReplicationPacket::SpawnEntity {
310 network_id: network_id.0,
311 prefab_wire_id: *prefab_wire_id,
312 },
313 );
314 }
315
316 for packets in component_snapshots.into_iter().chain(resource_snapshots.into_iter()) {
317 for packet in packets {
318 net.send_packet_to(socket, packet);
319 }
320 }
321 };
322 }
323}
324
325pub fn apply_incoming_packets(world: &mut World) {
327 let mut packets = {
328 let mut pending = world.resource_mut::<PendingComponentUpdates>();
329 pending.0.drain(..).collect::<Vec<_>>()
330 };
331
332 packets.extend({
333 let mut net = world.resource_mut::<NetResource>();
334 net.drain_inbox()
335 });
336
337 if packets.is_empty() {
338 return;
339 }
340
341 packets.sort_by_key(|packet| match packet {
342 ReplicationPacket::SpawnEntity { .. } => 0,
343 ReplicationPacket::UpdateComponent { .. } | ReplicationPacket::UpdateResource { .. } => 1,
344 ReplicationPacket::DespawnEntity { .. } => 2,
345 });
346
347 let mut deferred = Vec::new();
348
349 for packet in packets {
350 match packet {
351 ReplicationPacket::SpawnEntity {
352 network_id,
353 prefab_wire_id,
354 } => {
355 let entity = world
356 .spawn_empty()
357 .insert(Replicated)
358 .insert(NetworkId(network_id))
359 .id();
360 world.resource_mut::<EntityIndex>().insert(NetworkId(network_id), entity);
361 if prefab_wire_id != 0 {
362 if let Some(registration) = world
363 .resource::<PrefabRegistry>()
364 .by_wire_id(prefab_wire_id)
365 .copied()
366 {
367 (registration.apply)(world, entity);
368 world.entity_mut(entity).insert(PrefabId(prefab_wire_id));
369 }
370 }
371 }
372 ReplicationPacket::DespawnEntity { network_id } => {
373 let entity = world.resource::<EntityIndex>().entity(NetworkId(network_id));
374 if let Some(entity) = entity {
375 world.despawn(entity);
376 world
377 .resource_mut::<EntityIndex>()
378 .remove_entity(entity);
379 }
380 }
381 ReplicationPacket::UpdateComponent {
382 network_id,
383 component_wire_id,
384 bytes,
385 } => {
386 let entity = world.resource::<EntityIndex>().entity(NetworkId(network_id));
387 let registration = {
388 world
389 .resource::<SyncRegistry>()
390 .by_wire_id(component_wire_id)
391 .copied()
392 };
393
394 match (entity, registration) {
395 (Some(entity), Some(registration)) => {
396 (registration.apply)(world, entity, &bytes);
397 }
398 (None, Some(_)) => {
399 deferred.push(ReplicationPacket::UpdateComponent {
400 network_id,
401 component_wire_id,
402 bytes,
403 });
404 }
405 _ => {}
406 }
407 }
408 ReplicationPacket::UpdateResource {
409 resource_wire_id,
410 bytes,
411 } => {
412 let registration = {
413 world
414 .resource::<SyncResourceRegistry>()
415 .by_wire_id(resource_wire_id)
416 .copied()
417 };
418
419 if let Some(registration) = registration {
420 (registration.apply)(world, &bytes);
421 }
422 }
423 }
424 }
425
426 if !deferred.is_empty() {
427 world
428 .resource_mut::<PendingComponentUpdates>()
429 .0
430 .extend(deferred);
431 }
432}
433
434pub fn assign_network_ids(world: &mut World) {
436 let is_server = world.resource::<NetResource>().is_server();
437 if !is_server {
438 return;
439 }
440
441 let entities = {
442 let mut query = world.query_filtered::<Entity, Added<Replicated>>();
443 query.iter(world).collect::<Vec<_>>()
444 };
445
446 for entity in entities {
447 let network_id = {
448 let mut next_id = world.resource_mut::<NextNetworkId>();
449 let network_id = NetworkId(next_id.0);
450 next_id.0 = next_id.0.saturating_add(1);
451 network_id
452 };
453
454 world.entity_mut(entity).insert(network_id);
455 world
456 .resource_mut::<EntityIndex>()
457 .insert(network_id, entity);
458 let prefab_wire_id = world
459 .entity(entity)
460 .get::<PrefabId>()
461 .map(|prefab_id| prefab_id.0)
462 .unwrap_or(0);
463 world.resource_mut::<NetResource>().queue_packet(
464 ReplicationPacket::SpawnEntity {
465 network_id: network_id.0,
466 prefab_wire_id,
467 },
468 );
469 }
470}
471
472pub fn assign_prefab_ids(world: &mut World) {
474 let entities = {
475 let mut query = world.query_filtered::<Entity, Added<Replicated>>();
476 query.iter(world).collect::<Vec<_>>()
477 };
478
479 let registrations: Vec<PrefabRegistration> = inventory::iter::<PrefabRegistration>().copied().collect();
480
481 for entity in entities {
482 if world.entity(entity).contains::<PrefabId>() {
483 continue;
484 }
485
486 for registration in ®istrations {
487 if (registration.matches)(world, entity) {
488 world.entity_mut(entity).insert(PrefabId(registration.wire_id));
489 break;
490 }
491 }
492 }
493}
494
495pub fn replicate_removals(
497 mut removed: RemovedComponents<Replicated>,
498 mut net: ResMut<NetResource>,
499 mut index: ResMut<EntityIndex>,
500) {
501 if !net.is_server() {
502 return;
503 }
504
505 for entity in removed.read() {
506 if let Some(network_id) = index.remove_entity(entity) {
507 net.queue_packet(ReplicationPacket::DespawnEntity {
508 network_id: network_id.0,
509 });
510 }
511 }
512}