Skip to main content

bevy_networker_multiplayer/
sync.rs

1// SPDX-License-Identifier: MIT
2//! Sync registration, snapshots, and packet application.
3//!
4//! This module is the bridge between Bevy ECS state and the wire format used by
5//! `NetResource`. Attribute macros submit metadata into `inventory`; this module
6//! collects that metadata, registers systems, and translates packets in both
7//! directions.
8use bevy::prelude::*;
9use bincode::config;
10use serde::{Serialize, de::DeserializeOwned};
11use std::collections::HashMap;
12
13use crate::{
14    netres::{NetResource, ReplicationPacket},
15    replicated::{EntityIndex, NetworkId, NextNetworkId, Replicated},
16};
17
18/// Metadata for a syncable component type.
19#[derive(Debug, Clone, Copy)]
20pub struct ComponentRegistration {
21    /// Stable type path for diagnostics and registry lookup.
22    pub type_path: &'static str,
23    /// Stable wire identifier for the component type.
24    pub wire_id: u64,
25    /// Registration callback that installs Bevy systems.
26    pub register: fn(&mut App),
27    /// Applies a decoded component update to an entity.
28    pub apply: fn(&mut World, Entity, &[u8]),
29    /// Produces full-state snapshots for late joiners.
30    pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
31}
32
33// `inventory` collection of all component registrations.
34inventory::collect!(ComponentRegistration);
35
36/// Metadata for a syncable resource type.
37#[derive(Debug, Clone, Copy)]
38pub struct ResourceRegistration {
39    /// Stable type path for diagnostics and registry lookup.
40    pub type_path: &'static str,
41    /// Stable wire identifier for the resource type.
42    pub wire_id: u64,
43    /// Registration callback that installs Bevy systems.
44    pub register: fn(&mut App),
45    /// Applies a decoded resource update to the world.
46    pub apply: fn(&mut World, &[u8]),
47    /// Produces a snapshot packet for the resource.
48    pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
49}
50
51// `inventory` collection of all resource registrations.
52inventory::collect!(ResourceRegistration);
53
54/// Metadata for a prefab definition used to spawn visuals remotely.
55#[derive(Debug, Clone, Copy)]
56pub struct PrefabRegistration {
57    /// Stable type path for diagnostics and registry lookup.
58    pub type_path: &'static str,
59    /// Stable wire identifier for the prefab.
60    pub wire_id: u64,
61    /// Registration callback that can install any companion systems.
62    pub register: fn(&mut App),
63    /// Returns true when an entity should be tagged with this prefab.
64    pub matches: fn(&World, Entity) -> bool,
65    /// Applies the prefab's visual or structural components.
66    pub apply: fn(&mut World, Entity),
67}
68
69// `inventory` collection of all prefab registrations.
70inventory::collect!(PrefabRegistration);
71
72/// Runtime registry for component sync handlers.
73#[derive(Resource, Default)]
74pub struct SyncRegistry {
75    by_wire_id: HashMap<u64, ComponentRegistration>,
76    by_type_path: HashMap<&'static str, ComponentRegistration>,
77}
78
79impl SyncRegistry {
80    /// Registers one component handler.
81    pub fn register(&mut self, registration: ComponentRegistration) {
82        self.by_wire_id.insert(registration.wire_id, registration);
83        self.by_type_path
84            .insert(registration.type_path, registration);
85    }
86
87    /// Looks up a component handler by wire ID.
88    pub fn by_wire_id(&self, wire_id: u64) -> Option<&ComponentRegistration> {
89        self.by_wire_id.get(&wire_id)
90    }
91}
92
93/// Runtime registry for resource sync handlers.
94#[derive(Resource, Default)]
95pub struct SyncResourceRegistry {
96    by_wire_id: HashMap<u64, ResourceRegistration>,
97    by_type_path: HashMap<&'static str, ResourceRegistration>,
98}
99
100impl SyncResourceRegistry {
101    /// Registers one resource handler.
102    pub fn register(&mut self, registration: ResourceRegistration) {
103        self.by_wire_id.insert(registration.wire_id, registration);
104        self.by_type_path
105            .insert(registration.type_path, registration);
106    }
107
108    /// Looks up a resource handler by wire ID.
109    pub fn by_wire_id(&self, wire_id: u64) -> Option<&ResourceRegistration> {
110        self.by_wire_id.get(&wire_id)
111    }
112}
113
114/// Trait implemented by syncable components.
115pub trait SyncComponent:
116    Component + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
117{
118    /// Fully qualified type path.
119    const TYPE_PATH: &'static str;
120    /// Stable wire identifier.
121    const WIRE_ID: u64;
122}
123
124/// Trait implemented by syncable resources.
125pub trait SyncResource:
126    Resource + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
127{
128    /// Fully qualified type path.
129    const TYPE_PATH: &'static str;
130    /// Stable wire identifier.
131    const WIRE_ID: u64;
132}
133
134/// Internal component used to remember which prefab a replicated entity uses.
135#[derive(Component, Clone, Copy, Debug, Eq, Hash, PartialEq)]
136#[doc(hidden)]
137pub struct PrefabId(pub u64);
138
139/// Runtime registry for prefab handlers.
140#[derive(Resource, Default)]
141pub struct PrefabRegistry {
142    by_wire_id: HashMap<u64, PrefabRegistration>,
143    by_type_path: HashMap<&'static str, PrefabRegistration>,
144}
145
146impl PrefabRegistry {
147    /// Registers one prefab handler.
148    pub fn register(&mut self, registration: PrefabRegistration) {
149        self.by_wire_id.insert(registration.wire_id, registration);
150        self.by_type_path
151            .insert(registration.type_path, registration);
152    }
153
154    /// Looks up a prefab handler by wire ID.
155    pub fn by_wire_id(&self, wire_id: u64) -> Option<&PrefabRegistration> {
156        self.by_wire_id.get(&wire_id)
157    }
158
159    /// Iterates over all prefab registrations.
160    pub fn all(&self) -> impl Iterator<Item = &PrefabRegistration> {
161        self.by_wire_id.values()
162    }
163}
164
165/// Component packets that arrived before their entity spawn packet.
166#[derive(Resource, Default)]
167struct PendingComponentUpdates(Vec<ReplicationPacket>);
168
169/// Sequence tracking for lossy component update streams.
170#[derive(Debug, Resource)]
171#[doc(hidden)]
172pub struct ComponentUpdateSequenceState {
173    next_outgoing: u64,
174    latest_incoming: HashMap<(u64, u64), u64>,
175}
176
177impl Default for ComponentUpdateSequenceState {
178    fn default() -> Self {
179        Self {
180            next_outgoing: 1,
181            latest_incoming: HashMap::new(),
182        }
183    }
184}
185
186impl ComponentUpdateSequenceState {
187    fn next_outgoing(&mut self) -> u64 {
188        let sequence = self.next_outgoing;
189        self.next_outgoing = self.next_outgoing.saturating_add(1);
190        sequence
191    }
192
193    fn accept_incoming(&mut self, network_id: u64, component_wire_id: u64, sequence: u64) -> bool {
194        let key = (network_id, component_wire_id);
195        if self
196            .latest_incoming
197            .get(&key)
198            .is_some_and(|latest| *latest >= sequence)
199        {
200            return false;
201        }
202
203        self.latest_incoming.insert(key, sequence);
204        true
205    }
206
207    fn forget_network_id(&mut self, network_id: u64) {
208        self.latest_incoming
209            .retain(|(stored_network_id, _), _| *stored_network_id != network_id);
210    }
211}
212
213/// Runtime options for syncing a resource.
214#[derive(Clone, Copy, Debug)]
215pub struct SyncResourceSettings {
216    /// Minimum seconds between sends for this resource type.
217    ///
218    /// When a resource changes faster than this interval, the latest serialized
219    /// value is coalesced and sent once the interval has elapsed.
220    pub min_interval_seconds: f32,
221    /// Optional unchanged-state heartbeat.
222    ///
223    /// This is useful for lossy state streams. Reliable resources generally do
224    /// not need a heartbeat.
225    pub heartbeat_seconds: Option<f32>,
226}
227
228impl Default for SyncResourceSettings {
229    fn default() -> Self {
230        Self {
231            min_interval_seconds: 0.0,
232            heartbeat_seconds: None,
233        }
234    }
235}
236
237/// Per-system state used to dedupe and coalesce resource snapshots.
238#[derive(Debug)]
239pub struct SyncResourceSendState {
240    last_sent_bytes: Option<Vec<u8>>,
241    pending_bytes: Option<Vec<u8>>,
242    seconds_since_send: f32,
243}
244
245impl Default for SyncResourceSendState {
246    fn default() -> Self {
247        Self {
248            last_sent_bytes: None,
249            pending_bytes: None,
250            seconds_since_send: f32::INFINITY,
251        }
252    }
253}
254
255/// FNV-1a hash used to derive wire IDs from type paths.
256pub const fn hash_type_path(type_path: &str) -> u64 {
257    let bytes = type_path.as_bytes();
258    let mut hash: u64 = 0xcbf29ce484222325;
259    let mut index = 0;
260    while index < bytes.len() {
261        hash ^= bytes[index] as u64;
262        hash = hash.wrapping_mul(0x100000001b3);
263        index += 1;
264    }
265    hash
266}
267
268/// Collects all inventory registrations and stores them in runtime registries.
269pub fn register_sync_components(app: &mut App) {
270    app.init_resource::<SyncRegistry>();
271    app.init_resource::<SyncResourceRegistry>();
272    app.init_resource::<PrefabRegistry>();
273    app.init_resource::<PendingComponentUpdates>();
274    app.init_resource::<ComponentUpdateSequenceState>();
275
276    let mut registry = SyncRegistry::default();
277    for registration in inventory::iter::<ComponentRegistration> {
278        registry.register(*registration);
279        (registration.register)(app);
280    }
281
282    let mut resource_registry = SyncResourceRegistry::default();
283    for registration in inventory::iter::<ResourceRegistration> {
284        resource_registry.register(*registration);
285        (registration.register)(app);
286    }
287
288    let mut prefab_registry = PrefabRegistry::default();
289    for registration in inventory::iter::<PrefabRegistration> {
290        prefab_registry.register(*registration);
291        (registration.register)(app);
292    }
293
294    app.insert_resource(registry);
295    app.insert_resource(resource_registry);
296    app.insert_resource(prefab_registry);
297}
298
299/// Returns the next outgoing component update sequence.
300#[doc(hidden)]
301pub fn next_component_update_sequence(world: &mut World) -> u64 {
302    world
303        .resource_mut::<ComponentUpdateSequenceState>()
304        .next_outgoing()
305}
306
307/// Poll hook run before the main replication systems.
308pub fn poll_network_incoming(mut net: ResMut<NetResource>) {
309    net.poll_incoming();
310}
311
312/// Flushes queued packets after the frame has finished mutating state.
313pub fn flush_network_outbox(mut net: ResMut<NetResource>) {
314    net.flush_outbox();
315}
316
317/// Sends updated sync components for entities that are added or changed.
318pub fn sync_component<T: SyncComponent>(
319    mut net: ResMut<NetResource>,
320    mut sequence_state: ResMut<ComponentUpdateSequenceState>,
321    query: Query<(&NetworkId, &T), (With<Replicated>, Or<(Added<T>, Changed<T>)>)>,
322) {
323    if !net.is_server() {
324        return;
325    }
326
327    for (network_id, component) in &query {
328        let bytes = bincode::serde::encode_to_vec(component, config::standard())
329            .expect("failed to serialize sync component");
330        net.queue_packet(ReplicationPacket::UpdateComponent {
331            network_id: network_id.0,
332            component_wire_id: T::WIRE_ID,
333            sequence: sequence_state.next_outgoing(),
334            bytes,
335        });
336    }
337}
338
339/// Sends updated resources when they are added or changed.
340pub fn sync_resource<T: SyncResource>(
341    time: Res<Time>,
342    mut net: ResMut<NetResource>,
343    resource: Option<Res<T>>,
344    mut state: Local<SyncResourceSendState>,
345) {
346    sync_resource_with_settings::<T>(
347        &time,
348        &mut net,
349        resource,
350        &mut state,
351        SyncResourceSettings::default(),
352    );
353}
354
355/// Sends updated resources with byte-level dedupe and optional send coalescing.
356pub fn sync_resource_with_settings<T: SyncResource>(
357    time: &Time,
358    net: &mut NetResource,
359    resource: Option<Res<T>>,
360    state: &mut SyncResourceSendState,
361    settings: SyncResourceSettings,
362) {
363    let Some(resource) = resource else {
364        return;
365    };
366
367    if !net.is_server() {
368        return;
369    }
370
371    state.seconds_since_send += time.delta_secs();
372
373    if resource.is_added() || resource.is_changed() {
374        let bytes = bincode::serde::encode_to_vec(&*resource, config::standard())
375            .expect("failed to serialize sync resource");
376
377        if state.last_sent_bytes.as_ref() != Some(&bytes) {
378            state.pending_bytes = Some(bytes);
379        }
380    }
381
382    let heartbeat_due = settings
383        .heartbeat_seconds
384        .map(|seconds| state.seconds_since_send >= seconds.max(0.0))
385        .unwrap_or(false);
386
387    if state.pending_bytes.is_none() && heartbeat_due {
388        state.pending_bytes = state.last_sent_bytes.clone().or_else(|| {
389            Some(
390                bincode::serde::encode_to_vec(&*resource, config::standard())
391                    .expect("failed to serialize sync resource"),
392            )
393        });
394    }
395
396    let interval_ready = state.seconds_since_send >= settings.min_interval_seconds.max(0.0);
397    if state.pending_bytes.is_none() || (!interval_ready && !heartbeat_due) {
398        return;
399    }
400
401    let bytes = state
402        .pending_bytes
403        .take()
404        .expect("pending resource bytes should exist");
405    net.queue_packet(ReplicationPacket::UpdateResource {
406        resource_wire_id: T::WIRE_ID,
407        bytes: bytes.clone(),
408    });
409    state.last_sent_bytes = Some(bytes);
410    state.seconds_since_send = 0.0;
411}
412
413/// Applies a resource snapshot only when the serialized value actually changed.
414pub fn apply_resource_update<T: SyncResource>(world: &mut World, bytes: &[u8]) {
415    if let Some(existing) = world.get_resource::<T>() {
416        let existing_bytes = bincode::serde::encode_to_vec(existing, config::standard())
417            .expect("failed to serialize existing sync resource");
418        if existing_bytes == bytes {
419            return;
420        }
421    }
422
423    let (resource, _): (T, usize) = bincode::serde::decode_from_slice(bytes, config::standard())
424        .expect("failed to deserialize sync resource");
425    world.insert_resource(resource);
426}
427
428/// Sends the initial world state to newly connected clients.
429pub fn sync_new_connections(world: &mut World) {
430    let is_server = world.resource::<NetResource>().is_server();
431    if !is_server {
432        return;
433    }
434
435    let connections = {
436        let mut net = world.resource_mut::<NetResource>();
437        net.drain_new_connections()
438    };
439
440    if connections.is_empty() {
441        return;
442    }
443
444    let component_registrations: Vec<ComponentRegistration> =
445        inventory::iter::<ComponentRegistration>()
446            .copied()
447            .collect();
448    let resource_registrations: Vec<ResourceRegistration> =
449        inventory::iter::<ResourceRegistration>().copied().collect();
450
451    for socket in &connections {
452        let replicated_entities = {
453            let mut query =
454                world.query_filtered::<(Entity, &NetworkId, Option<&PrefabId>), With<Replicated>>();
455            query
456                .iter(world)
457                .map(|(entity, network_id, prefab_id)| {
458                    (
459                        entity,
460                        *network_id,
461                        prefab_id.map(|prefab_id| prefab_id.0).unwrap_or(0),
462                    )
463                })
464                .collect::<Vec<_>>()
465        };
466
467        let component_snapshots: Vec<Vec<ReplicationPacket>> = component_registrations
468            .iter()
469            .map(|registration| (registration.snapshot)(world))
470            .collect();
471        let resource_snapshots: Vec<Vec<ReplicationPacket>> = resource_registrations
472            .iter()
473            .map(|registration| (registration.snapshot)(world))
474            .collect();
475
476        {
477            let net = world.resource::<NetResource>();
478            for (_, network_id, prefab_wire_id) in &replicated_entities {
479                net.send_packet_to(
480                    socket,
481                    ReplicationPacket::SpawnEntity {
482                        network_id: network_id.0,
483                        prefab_wire_id: *prefab_wire_id,
484                    },
485                );
486            }
487
488            for packets in component_snapshots
489                .into_iter()
490                .chain(resource_snapshots.into_iter())
491            {
492                for packet in packets {
493                    net.send_packet_to(socket, packet);
494                }
495            }
496        };
497    }
498}
499
500/// Applies queued incoming replication packets to the local world.
501pub fn apply_incoming_packets(world: &mut World) {
502    let mut packets = {
503        let mut pending = world.resource_mut::<PendingComponentUpdates>();
504        pending.0.drain(..).collect::<Vec<_>>()
505    };
506
507    packets.extend({
508        let mut net = world.resource_mut::<NetResource>();
509        net.drain_inbox()
510    });
511
512    if packets.is_empty() {
513        return;
514    }
515
516    packets.sort_by_key(|packet| match packet {
517        ReplicationPacket::SpawnEntity { .. } => 0,
518        ReplicationPacket::UpdateComponent { .. } | ReplicationPacket::UpdateResource { .. } => 1,
519        ReplicationPacket::DespawnEntity { .. } => 2,
520    });
521
522    let mut deferred = Vec::new();
523
524    for packet in packets {
525        match packet {
526            ReplicationPacket::SpawnEntity {
527                network_id,
528                prefab_wire_id,
529            } => {
530                let entity = world
531                    .spawn_empty()
532                    .insert(Replicated)
533                    .insert(NetworkId(network_id))
534                    .id();
535                world
536                    .resource_mut::<EntityIndex>()
537                    .insert(NetworkId(network_id), entity);
538                if prefab_wire_id != 0 {
539                    if let Some(registration) = world
540                        .resource::<PrefabRegistry>()
541                        .by_wire_id(prefab_wire_id)
542                        .copied()
543                    {
544                        (registration.apply)(world, entity);
545                        world.entity_mut(entity).insert(PrefabId(prefab_wire_id));
546                    }
547                }
548            }
549            ReplicationPacket::DespawnEntity { network_id } => {
550                let entity = world
551                    .resource::<EntityIndex>()
552                    .entity(NetworkId(network_id));
553                if let Some(entity) = entity {
554                    world.despawn(entity);
555                    world.resource_mut::<EntityIndex>().remove_entity(entity);
556                    world
557                        .resource_mut::<ComponentUpdateSequenceState>()
558                        .forget_network_id(network_id);
559                }
560            }
561            ReplicationPacket::UpdateComponent {
562                network_id,
563                component_wire_id,
564                sequence,
565                bytes,
566            } => {
567                let entity = world
568                    .resource::<EntityIndex>()
569                    .entity(NetworkId(network_id));
570                let registration = {
571                    world
572                        .resource::<SyncRegistry>()
573                        .by_wire_id(component_wire_id)
574                        .copied()
575                };
576
577                match (entity, registration) {
578                    (Some(entity), Some(registration)) => {
579                        let is_fresh = world
580                            .resource_mut::<ComponentUpdateSequenceState>()
581                            .accept_incoming(network_id, component_wire_id, sequence);
582                        if is_fresh {
583                            (registration.apply)(world, entity, &bytes);
584                        }
585                    }
586                    (None, Some(_)) => {
587                        deferred.push(ReplicationPacket::UpdateComponent {
588                            network_id,
589                            component_wire_id,
590                            sequence,
591                            bytes,
592                        });
593                    }
594                    _ => {}
595                }
596            }
597            ReplicationPacket::UpdateResource {
598                resource_wire_id,
599                bytes,
600            } => {
601                let registration = {
602                    world
603                        .resource::<SyncResourceRegistry>()
604                        .by_wire_id(resource_wire_id)
605                        .copied()
606                };
607
608                if let Some(registration) = registration {
609                    (registration.apply)(world, &bytes);
610                }
611            }
612        }
613    }
614
615    if !deferred.is_empty() {
616        world
617            .resource_mut::<PendingComponentUpdates>()
618            .0
619            .extend(deferred);
620    }
621}
622
623/// Assigns network IDs to newly replicated entities on the server.
624pub fn assign_network_ids(world: &mut World) {
625    let is_server = world.resource::<NetResource>().is_server();
626    if !is_server {
627        return;
628    }
629
630    let entities = {
631        let mut query = world.query_filtered::<Entity, Added<Replicated>>();
632        query.iter(world).collect::<Vec<_>>()
633    };
634
635    for entity in entities {
636        let network_id = {
637            let mut next_id = world.resource_mut::<NextNetworkId>();
638            let network_id = NetworkId(next_id.0);
639            next_id.0 = next_id.0.saturating_add(1);
640            network_id
641        };
642
643        world.entity_mut(entity).insert(network_id);
644        world
645            .resource_mut::<EntityIndex>()
646            .insert(network_id, entity);
647        let prefab_wire_id = world
648            .entity(entity)
649            .get::<PrefabId>()
650            .map(|prefab_id| prefab_id.0)
651            .unwrap_or(0);
652        world
653            .resource_mut::<NetResource>()
654            .queue_packet(ReplicationPacket::SpawnEntity {
655                network_id: network_id.0,
656                prefab_wire_id,
657            });
658    }
659}
660
661/// Detects prefab matches on newly replicated entities.
662pub fn assign_prefab_ids(world: &mut World) {
663    let entities = {
664        let mut query = world.query_filtered::<Entity, Added<Replicated>>();
665        query.iter(world).collect::<Vec<_>>()
666    };
667
668    let registrations: Vec<PrefabRegistration> =
669        inventory::iter::<PrefabRegistration>().copied().collect();
670
671    for entity in entities {
672        if world.entity(entity).contains::<PrefabId>() {
673            continue;
674        }
675
676        for registration in &registrations {
677            if (registration.matches)(world, entity) {
678                world
679                    .entity_mut(entity)
680                    .insert(PrefabId(registration.wire_id));
681                break;
682            }
683        }
684    }
685}
686
687/// Converts despawned replicated entities into network despawn packets.
688pub fn replicate_removals(
689    mut removed: RemovedComponents<Replicated>,
690    mut net: ResMut<NetResource>,
691    mut index: ResMut<EntityIndex>,
692) {
693    if !net.is_server() {
694        return;
695    }
696
697    for entity in removed.read() {
698        if let Some(network_id) = index.remove_entity(entity) {
699            net.queue_packet(ReplicationPacket::DespawnEntity {
700                network_id: network_id.0,
701            });
702        }
703    }
704}