bevy_networker_multiplayer/
sync.rs1use bevy::prelude::*;
9use bincode::config;
10use serde::{Serialize, de::DeserializeOwned};
11use std::collections::HashMap;
12
13use crate::{
14 netres::{NetResource, ReplicationPacket},
15 replicated::{EntityIndex, NetworkId, NextNetworkId, Replicated},
16};
17
18#[derive(Debug, Clone, Copy)]
20pub struct ComponentRegistration {
21 pub type_path: &'static str,
23 pub wire_id: u64,
25 pub register: fn(&mut App),
27 pub apply: fn(&mut World, Entity, &[u8]),
29 pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
31}
32
33inventory::collect!(ComponentRegistration);
35
36#[derive(Debug, Clone, Copy)]
38pub struct ResourceRegistration {
39 pub type_path: &'static str,
41 pub wire_id: u64,
43 pub register: fn(&mut App),
45 pub apply: fn(&mut World, &[u8]),
47 pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
49}
50
51inventory::collect!(ResourceRegistration);
53
54#[derive(Debug, Clone, Copy)]
56pub struct PrefabRegistration {
57 pub type_path: &'static str,
59 pub wire_id: u64,
61 pub register: fn(&mut App),
63 pub matches: fn(&World, Entity) -> bool,
65 pub apply: fn(&mut World, Entity),
67}
68
69inventory::collect!(PrefabRegistration);
71
72#[derive(Resource, Default)]
74pub struct SyncRegistry {
75 by_wire_id: HashMap<u64, ComponentRegistration>,
76 by_type_path: HashMap<&'static str, ComponentRegistration>,
77}
78
79impl SyncRegistry {
80 pub fn register(&mut self, registration: ComponentRegistration) {
82 self.by_wire_id.insert(registration.wire_id, registration);
83 self.by_type_path
84 .insert(registration.type_path, registration);
85 }
86
87 pub fn by_wire_id(&self, wire_id: u64) -> Option<&ComponentRegistration> {
89 self.by_wire_id.get(&wire_id)
90 }
91}
92
93#[derive(Resource, Default)]
95pub struct SyncResourceRegistry {
96 by_wire_id: HashMap<u64, ResourceRegistration>,
97 by_type_path: HashMap<&'static str, ResourceRegistration>,
98}
99
100impl SyncResourceRegistry {
101 pub fn register(&mut self, registration: ResourceRegistration) {
103 self.by_wire_id.insert(registration.wire_id, registration);
104 self.by_type_path
105 .insert(registration.type_path, registration);
106 }
107
108 pub fn by_wire_id(&self, wire_id: u64) -> Option<&ResourceRegistration> {
110 self.by_wire_id.get(&wire_id)
111 }
112}
113
114pub trait SyncComponent:
116 Component + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
117{
118 const TYPE_PATH: &'static str;
120 const WIRE_ID: u64;
122}
123
124pub trait SyncResource:
126 Resource + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
127{
128 const TYPE_PATH: &'static str;
130 const WIRE_ID: u64;
132}
133
134#[derive(Component, Clone, Copy, Debug, Eq, Hash, PartialEq)]
136#[doc(hidden)]
137pub struct PrefabId(pub u64);
138
139#[derive(Resource, Default)]
141pub struct PrefabRegistry {
142 by_wire_id: HashMap<u64, PrefabRegistration>,
143 by_type_path: HashMap<&'static str, PrefabRegistration>,
144}
145
146impl PrefabRegistry {
147 pub fn register(&mut self, registration: PrefabRegistration) {
149 self.by_wire_id.insert(registration.wire_id, registration);
150 self.by_type_path
151 .insert(registration.type_path, registration);
152 }
153
154 pub fn by_wire_id(&self, wire_id: u64) -> Option<&PrefabRegistration> {
156 self.by_wire_id.get(&wire_id)
157 }
158
159 pub fn all(&self) -> impl Iterator<Item = &PrefabRegistration> {
161 self.by_wire_id.values()
162 }
163}
164
165#[derive(Resource, Default)]
167struct PendingComponentUpdates(Vec<ReplicationPacket>);
168
169#[derive(Clone, Copy, Debug)]
171pub struct SyncResourceSettings {
172 pub min_interval_seconds: f32,
177 pub heartbeat_seconds: Option<f32>,
182}
183
184impl Default for SyncResourceSettings {
185 fn default() -> Self {
186 Self {
187 min_interval_seconds: 0.0,
188 heartbeat_seconds: None,
189 }
190 }
191}
192
193#[derive(Debug)]
195pub struct SyncResourceSendState {
196 last_sent_bytes: Option<Vec<u8>>,
197 pending_bytes: Option<Vec<u8>>,
198 seconds_since_send: f32,
199}
200
201impl Default for SyncResourceSendState {
202 fn default() -> Self {
203 Self {
204 last_sent_bytes: None,
205 pending_bytes: None,
206 seconds_since_send: f32::INFINITY,
207 }
208 }
209}
210
211pub const fn hash_type_path(type_path: &str) -> u64 {
213 let bytes = type_path.as_bytes();
214 let mut hash: u64 = 0xcbf29ce484222325;
215 let mut index = 0;
216 while index < bytes.len() {
217 hash ^= bytes[index] as u64;
218 hash = hash.wrapping_mul(0x100000001b3);
219 index += 1;
220 }
221 hash
222}
223
224pub fn register_sync_components(app: &mut App) {
226 app.init_resource::<SyncRegistry>();
227 app.init_resource::<SyncResourceRegistry>();
228 app.init_resource::<PrefabRegistry>();
229 app.init_resource::<PendingComponentUpdates>();
230
231 let mut registry = SyncRegistry::default();
232 for registration in inventory::iter::<ComponentRegistration> {
233 registry.register(*registration);
234 (registration.register)(app);
235 }
236
237 let mut resource_registry = SyncResourceRegistry::default();
238 for registration in inventory::iter::<ResourceRegistration> {
239 resource_registry.register(*registration);
240 (registration.register)(app);
241 }
242
243 let mut prefab_registry = PrefabRegistry::default();
244 for registration in inventory::iter::<PrefabRegistration> {
245 prefab_registry.register(*registration);
246 (registration.register)(app);
247 }
248
249 app.insert_resource(registry);
250 app.insert_resource(resource_registry);
251 app.insert_resource(prefab_registry);
252}
253
254pub fn poll_network_incoming(mut net: ResMut<NetResource>) {
256 net.poll_incoming();
257}
258
259pub fn flush_network_outbox(mut net: ResMut<NetResource>) {
261 net.flush_outbox();
262}
263
264pub fn sync_component<T: SyncComponent>(
266 mut net: ResMut<NetResource>,
267 query: Query<(&NetworkId, &T), (With<Replicated>, Or<(Added<T>, Changed<T>)>)>,
268) {
269 if !net.is_server() {
270 return;
271 }
272
273 for (network_id, component) in &query {
274 let bytes = bincode::serde::encode_to_vec(component, config::standard())
275 .expect("failed to serialize sync component");
276 net.queue_packet(ReplicationPacket::UpdateComponent {
277 network_id: network_id.0,
278 component_wire_id: T::WIRE_ID,
279 bytes,
280 });
281 }
282}
283
284pub fn sync_resource<T: SyncResource>(
286 time: Res<Time>,
287 mut net: ResMut<NetResource>,
288 resource: Option<Res<T>>,
289 mut state: Local<SyncResourceSendState>,
290) {
291 sync_resource_with_settings::<T>(
292 &time,
293 &mut net,
294 resource,
295 &mut state,
296 SyncResourceSettings::default(),
297 );
298}
299
300pub fn sync_resource_with_settings<T: SyncResource>(
302 time: &Time,
303 net: &mut NetResource,
304 resource: Option<Res<T>>,
305 state: &mut SyncResourceSendState,
306 settings: SyncResourceSettings,
307) {
308 let Some(resource) = resource else {
309 return;
310 };
311
312 if !net.is_server() {
313 return;
314 }
315
316 state.seconds_since_send += time.delta_secs();
317
318 if resource.is_added() || resource.is_changed() {
319 let bytes = bincode::serde::encode_to_vec(&*resource, config::standard())
320 .expect("failed to serialize sync resource");
321
322 if state.last_sent_bytes.as_ref() != Some(&bytes) {
323 state.pending_bytes = Some(bytes);
324 }
325 }
326
327 let heartbeat_due = settings
328 .heartbeat_seconds
329 .map(|seconds| state.seconds_since_send >= seconds.max(0.0))
330 .unwrap_or(false);
331
332 if state.pending_bytes.is_none() && heartbeat_due {
333 state.pending_bytes = state.last_sent_bytes.clone().or_else(|| {
334 Some(
335 bincode::serde::encode_to_vec(&*resource, config::standard())
336 .expect("failed to serialize sync resource"),
337 )
338 });
339 }
340
341 let interval_ready = state.seconds_since_send >= settings.min_interval_seconds.max(0.0);
342 if state.pending_bytes.is_none() || (!interval_ready && !heartbeat_due) {
343 return;
344 }
345
346 let bytes = state
347 .pending_bytes
348 .take()
349 .expect("pending resource bytes should exist");
350 net.queue_packet(ReplicationPacket::UpdateResource {
351 resource_wire_id: T::WIRE_ID,
352 bytes: bytes.clone(),
353 });
354 state.last_sent_bytes = Some(bytes);
355 state.seconds_since_send = 0.0;
356}
357
358pub fn apply_resource_update<T: SyncResource>(world: &mut World, bytes: &[u8]) {
360 if let Some(existing) = world.get_resource::<T>() {
361 let existing_bytes = bincode::serde::encode_to_vec(existing, config::standard())
362 .expect("failed to serialize existing sync resource");
363 if existing_bytes == bytes {
364 return;
365 }
366 }
367
368 let (resource, _): (T, usize) = bincode::serde::decode_from_slice(bytes, config::standard())
369 .expect("failed to deserialize sync resource");
370 world.insert_resource(resource);
371}
372
373pub fn sync_new_connections(world: &mut World) {
375 let is_server = world.resource::<NetResource>().is_server();
376 if !is_server {
377 return;
378 }
379
380 let connections = {
381 let mut net = world.resource_mut::<NetResource>();
382 net.drain_new_connections()
383 };
384
385 if connections.is_empty() {
386 return;
387 }
388
389 let component_registrations: Vec<ComponentRegistration> =
390 inventory::iter::<ComponentRegistration>()
391 .copied()
392 .collect();
393 let resource_registrations: Vec<ResourceRegistration> =
394 inventory::iter::<ResourceRegistration>().copied().collect();
395
396 for socket in &connections {
397 let replicated_entities = {
398 let mut query =
399 world.query_filtered::<(Entity, &NetworkId, Option<&PrefabId>), With<Replicated>>();
400 query
401 .iter(world)
402 .map(|(entity, network_id, prefab_id)| {
403 (
404 entity,
405 *network_id,
406 prefab_id.map(|prefab_id| prefab_id.0).unwrap_or(0),
407 )
408 })
409 .collect::<Vec<_>>()
410 };
411
412 let component_snapshots: Vec<Vec<ReplicationPacket>> = component_registrations
413 .iter()
414 .map(|registration| (registration.snapshot)(world))
415 .collect();
416 let resource_snapshots: Vec<Vec<ReplicationPacket>> = resource_registrations
417 .iter()
418 .map(|registration| (registration.snapshot)(world))
419 .collect();
420
421 {
422 let net = world.resource::<NetResource>();
423 for (_, network_id, prefab_wire_id) in &replicated_entities {
424 net.send_packet_to(
425 socket,
426 ReplicationPacket::SpawnEntity {
427 network_id: network_id.0,
428 prefab_wire_id: *prefab_wire_id,
429 },
430 );
431 }
432
433 for packets in component_snapshots
434 .into_iter()
435 .chain(resource_snapshots.into_iter())
436 {
437 for packet in packets {
438 net.send_packet_to(socket, packet);
439 }
440 }
441 };
442 }
443}
444
445pub fn apply_incoming_packets(world: &mut World) {
447 let mut packets = {
448 let mut pending = world.resource_mut::<PendingComponentUpdates>();
449 pending.0.drain(..).collect::<Vec<_>>()
450 };
451
452 packets.extend({
453 let mut net = world.resource_mut::<NetResource>();
454 net.drain_inbox()
455 });
456
457 if packets.is_empty() {
458 return;
459 }
460
461 packets.sort_by_key(|packet| match packet {
462 ReplicationPacket::SpawnEntity { .. } => 0,
463 ReplicationPacket::UpdateComponent { .. } | ReplicationPacket::UpdateResource { .. } => 1,
464 ReplicationPacket::DespawnEntity { .. } => 2,
465 });
466
467 let mut deferred = Vec::new();
468
469 for packet in packets {
470 match packet {
471 ReplicationPacket::SpawnEntity {
472 network_id,
473 prefab_wire_id,
474 } => {
475 let entity = world
476 .spawn_empty()
477 .insert(Replicated)
478 .insert(NetworkId(network_id))
479 .id();
480 world
481 .resource_mut::<EntityIndex>()
482 .insert(NetworkId(network_id), entity);
483 if prefab_wire_id != 0 {
484 if let Some(registration) = world
485 .resource::<PrefabRegistry>()
486 .by_wire_id(prefab_wire_id)
487 .copied()
488 {
489 (registration.apply)(world, entity);
490 world.entity_mut(entity).insert(PrefabId(prefab_wire_id));
491 }
492 }
493 }
494 ReplicationPacket::DespawnEntity { network_id } => {
495 let entity = world
496 .resource::<EntityIndex>()
497 .entity(NetworkId(network_id));
498 if let Some(entity) = entity {
499 world.despawn(entity);
500 world.resource_mut::<EntityIndex>().remove_entity(entity);
501 }
502 }
503 ReplicationPacket::UpdateComponent {
504 network_id,
505 component_wire_id,
506 bytes,
507 } => {
508 let entity = world
509 .resource::<EntityIndex>()
510 .entity(NetworkId(network_id));
511 let registration = {
512 world
513 .resource::<SyncRegistry>()
514 .by_wire_id(component_wire_id)
515 .copied()
516 };
517
518 match (entity, registration) {
519 (Some(entity), Some(registration)) => {
520 (registration.apply)(world, entity, &bytes);
521 }
522 (None, Some(_)) => {
523 deferred.push(ReplicationPacket::UpdateComponent {
524 network_id,
525 component_wire_id,
526 bytes,
527 });
528 }
529 _ => {}
530 }
531 }
532 ReplicationPacket::UpdateResource {
533 resource_wire_id,
534 bytes,
535 } => {
536 let registration = {
537 world
538 .resource::<SyncResourceRegistry>()
539 .by_wire_id(resource_wire_id)
540 .copied()
541 };
542
543 if let Some(registration) = registration {
544 (registration.apply)(world, &bytes);
545 }
546 }
547 }
548 }
549
550 if !deferred.is_empty() {
551 world
552 .resource_mut::<PendingComponentUpdates>()
553 .0
554 .extend(deferred);
555 }
556}
557
558pub fn assign_network_ids(world: &mut World) {
560 let is_server = world.resource::<NetResource>().is_server();
561 if !is_server {
562 return;
563 }
564
565 let entities = {
566 let mut query = world.query_filtered::<Entity, Added<Replicated>>();
567 query.iter(world).collect::<Vec<_>>()
568 };
569
570 for entity in entities {
571 let network_id = {
572 let mut next_id = world.resource_mut::<NextNetworkId>();
573 let network_id = NetworkId(next_id.0);
574 next_id.0 = next_id.0.saturating_add(1);
575 network_id
576 };
577
578 world.entity_mut(entity).insert(network_id);
579 world
580 .resource_mut::<EntityIndex>()
581 .insert(network_id, entity);
582 let prefab_wire_id = world
583 .entity(entity)
584 .get::<PrefabId>()
585 .map(|prefab_id| prefab_id.0)
586 .unwrap_or(0);
587 world
588 .resource_mut::<NetResource>()
589 .queue_packet(ReplicationPacket::SpawnEntity {
590 network_id: network_id.0,
591 prefab_wire_id,
592 });
593 }
594}
595
596pub fn assign_prefab_ids(world: &mut World) {
598 let entities = {
599 let mut query = world.query_filtered::<Entity, Added<Replicated>>();
600 query.iter(world).collect::<Vec<_>>()
601 };
602
603 let registrations: Vec<PrefabRegistration> =
604 inventory::iter::<PrefabRegistration>().copied().collect();
605
606 for entity in entities {
607 if world.entity(entity).contains::<PrefabId>() {
608 continue;
609 }
610
611 for registration in ®istrations {
612 if (registration.matches)(world, entity) {
613 world
614 .entity_mut(entity)
615 .insert(PrefabId(registration.wire_id));
616 break;
617 }
618 }
619 }
620}
621
622pub fn replicate_removals(
624 mut removed: RemovedComponents<Replicated>,
625 mut net: ResMut<NetResource>,
626 mut index: ResMut<EntityIndex>,
627) {
628 if !net.is_server() {
629 return;
630 }
631
632 for entity in removed.read() {
633 if let Some(network_id) = index.remove_entity(entity) {
634 net.queue_packet(ReplicationPacket::DespawnEntity {
635 network_id: network_id.0,
636 });
637 }
638 }
639}