Skip to main content

bevy_networker_multiplayer/
sync.rs

// SPDX-License-Identifier: MIT
//! Sync registration, snapshots, and packet application.
//!
//! This module is the bridge between Bevy ECS state and the wire format used by
//! `NetResource`. Attribute macros submit metadata into `inventory`; this module
//! collects that metadata, registers systems, and translates packets in both
//! directions.
8use bevy::prelude::*;
9use bincode::config;
10use serde::{Serialize, de::DeserializeOwned};
11use std::collections::HashMap;
12
13use crate::{
14    netres::{NetResource, ReplicationPacket},
15    replicated::{EntityIndex, NetworkId, NextNetworkId, Replicated},
16};
17
18/// Metadata for a syncable component type.
19#[derive(Debug, Clone, Copy)]
20pub struct ComponentRegistration {
21    /// Stable type path for diagnostics and registry lookup.
22    pub type_path: &'static str,
23    /// Stable wire identifier for the component type.
24    pub wire_id: u64,
25    /// Registration callback that installs Bevy systems.
26    pub register: fn(&mut App),
27    /// Applies a decoded component update to an entity.
28    pub apply: fn(&mut World, Entity, &[u8]),
29    /// Produces full-state snapshots for late joiners.
30    pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
31}
32
33// `inventory` collection of all component registrations.
34inventory::collect!(ComponentRegistration);
35
36/// Metadata for a syncable resource type.
37#[derive(Debug, Clone, Copy)]
38pub struct ResourceRegistration {
39    /// Stable type path for diagnostics and registry lookup.
40    pub type_path: &'static str,
41    /// Stable wire identifier for the resource type.
42    pub wire_id: u64,
43    /// Registration callback that installs Bevy systems.
44    pub register: fn(&mut App),
45    /// Applies a decoded resource update to the world.
46    pub apply: fn(&mut World, &[u8]),
47    /// Produces a snapshot packet for the resource.
48    pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
49}
50
51// `inventory` collection of all resource registrations.
52inventory::collect!(ResourceRegistration);
53
54/// Metadata for a prefab definition used to spawn visuals remotely.
55#[derive(Debug, Clone, Copy)]
56pub struct PrefabRegistration {
57    /// Stable type path for diagnostics and registry lookup.
58    pub type_path: &'static str,
59    /// Stable wire identifier for the prefab.
60    pub wire_id: u64,
61    /// Registration callback that can install any companion systems.
62    pub register: fn(&mut App),
63    /// Returns true when an entity should be tagged with this prefab.
64    pub matches: fn(&World, Entity) -> bool,
65    /// Applies the prefab's visual or structural components.
66    pub apply: fn(&mut World, Entity),
67}
68
69// `inventory` collection of all prefab registrations.
70inventory::collect!(PrefabRegistration);
71
72/// Runtime registry for component sync handlers.
73#[derive(Resource, Default)]
74pub struct SyncRegistry {
75    by_wire_id: HashMap<u64, ComponentRegistration>,
76    by_type_path: HashMap<&'static str, ComponentRegistration>,
77}
78
79impl SyncRegistry {
80    /// Registers one component handler.
81    pub fn register(&mut self, registration: ComponentRegistration) {
82        self.by_wire_id.insert(registration.wire_id, registration);
83        self.by_type_path
84            .insert(registration.type_path, registration);
85    }
86
87    /// Looks up a component handler by wire ID.
88    pub fn by_wire_id(&self, wire_id: u64) -> Option<&ComponentRegistration> {
89        self.by_wire_id.get(&wire_id)
90    }
91}
92
93/// Runtime registry for resource sync handlers.
94#[derive(Resource, Default)]
95pub struct SyncResourceRegistry {
96    by_wire_id: HashMap<u64, ResourceRegistration>,
97    by_type_path: HashMap<&'static str, ResourceRegistration>,
98}
99
100impl SyncResourceRegistry {
101    /// Registers one resource handler.
102    pub fn register(&mut self, registration: ResourceRegistration) {
103        self.by_wire_id.insert(registration.wire_id, registration);
104        self.by_type_path
105            .insert(registration.type_path, registration);
106    }
107
108    /// Looks up a resource handler by wire ID.
109    pub fn by_wire_id(&self, wire_id: u64) -> Option<&ResourceRegistration> {
110        self.by_wire_id.get(&wire_id)
111    }
112}
113
114/// Trait implemented by syncable components.
115pub trait SyncComponent:
116    Component + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
117{
118    /// Fully qualified type path.
119    const TYPE_PATH: &'static str;
120    /// Stable wire identifier.
121    const WIRE_ID: u64;
122}
123
124/// Trait implemented by syncable resources.
125pub trait SyncResource:
126    Resource + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
127{
128    /// Fully qualified type path.
129    const TYPE_PATH: &'static str;
130    /// Stable wire identifier.
131    const WIRE_ID: u64;
132}
133
134/// Internal component used to remember which prefab a replicated entity uses.
135#[derive(Component, Clone, Copy, Debug, Eq, Hash, PartialEq)]
136#[doc(hidden)]
137pub struct PrefabId(pub u64);
138
139/// Runtime registry for prefab handlers.
140#[derive(Resource, Default)]
141pub struct PrefabRegistry {
142    by_wire_id: HashMap<u64, PrefabRegistration>,
143    by_type_path: HashMap<&'static str, PrefabRegistration>,
144}
145
146impl PrefabRegistry {
147    /// Registers one prefab handler.
148    pub fn register(&mut self, registration: PrefabRegistration) {
149        self.by_wire_id.insert(registration.wire_id, registration);
150        self.by_type_path
151            .insert(registration.type_path, registration);
152    }
153
154    /// Looks up a prefab handler by wire ID.
155    pub fn by_wire_id(&self, wire_id: u64) -> Option<&PrefabRegistration> {
156        self.by_wire_id.get(&wire_id)
157    }
158
159    /// Iterates over all prefab registrations.
160    pub fn all(&self) -> impl Iterator<Item = &PrefabRegistration> {
161        self.by_wire_id.values()
162    }
163}
164
165/// Component packets that arrived before their entity spawn packet.
166#[derive(Resource, Default)]
167struct PendingComponentUpdates(Vec<ReplicationPacket>);
168
/// Tuning knobs for how often a synced resource is sent.
///
/// The derived `Default` yields no rate limit (`min_interval_seconds == 0.0`)
/// and no heartbeat (`heartbeat_seconds == None`).
#[derive(Clone, Copy, Debug, Default)]
pub struct SyncResourceSettings {
    /// Minimum seconds between sends for this resource type.
    ///
    /// Changes arriving faster than this are coalesced: only the most recent
    /// serialized value is kept and it is sent once the interval has elapsed.
    pub min_interval_seconds: f32,
    /// Optional period after which the value is re-sent even if unchanged.
    ///
    /// Intended for lossy state streams; reliable resources generally do not
    /// need a heartbeat.
    pub heartbeat_seconds: Option<f32>,
}
192
/// Bookkeeping kept per sync system to dedupe and coalesce resource sends.
#[derive(Debug)]
pub struct SyncResourceSendState {
    /// Serialized form of the value most recently queued for sending.
    last_sent_bytes: Option<Vec<u8>>,
    /// Serialized value waiting for the send interval to elapse.
    pending_bytes: Option<Vec<u8>>,
    /// Seconds accumulated since the last send.
    seconds_since_send: f32,
}

impl Default for SyncResourceSendState {
    /// Starts with nothing sent or pending and an infinite elapsed time, so
    /// the very first send is never held back by a minimum interval.
    fn default() -> Self {
        SyncResourceSendState {
            seconds_since_send: f32::INFINITY,
            pending_bytes: None,
            last_sent_bytes: None,
        }
    }
}
210
/// Derives a 64-bit wire ID from a type path using the FNV-1a hash.
///
/// Being a `const fn`, it can be evaluated at compile time.
pub const fn hash_type_path(type_path: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    let mut state = OFFSET_BASIS;
    let mut remaining = type_path.as_bytes();
    // Slice patterns keep the loop valid in a `const fn`, where iterator
    // adaptors cannot be used.
    while let [byte, rest @ ..] = remaining {
        // FNV-1a: xor the byte in first, then multiply by the prime.
        state = (state ^ (*byte as u64)).wrapping_mul(PRIME);
        remaining = rest;
    }
    state
}
223
224/// Collects all inventory registrations and stores them in runtime registries.
225pub fn register_sync_components(app: &mut App) {
226    app.init_resource::<SyncRegistry>();
227    app.init_resource::<SyncResourceRegistry>();
228    app.init_resource::<PrefabRegistry>();
229    app.init_resource::<PendingComponentUpdates>();
230
231    let mut registry = SyncRegistry::default();
232    for registration in inventory::iter::<ComponentRegistration> {
233        registry.register(*registration);
234        (registration.register)(app);
235    }
236
237    let mut resource_registry = SyncResourceRegistry::default();
238    for registration in inventory::iter::<ResourceRegistration> {
239        resource_registry.register(*registration);
240        (registration.register)(app);
241    }
242
243    let mut prefab_registry = PrefabRegistry::default();
244    for registration in inventory::iter::<PrefabRegistration> {
245        prefab_registry.register(*registration);
246        (registration.register)(app);
247    }
248
249    app.insert_resource(registry);
250    app.insert_resource(resource_registry);
251    app.insert_resource(prefab_registry);
252}
253
254/// Poll hook run before the main replication systems.
255pub fn poll_network_incoming(mut net: ResMut<NetResource>) {
256    net.poll_incoming();
257}
258
259/// Flushes queued packets after the frame has finished mutating state.
260pub fn flush_network_outbox(mut net: ResMut<NetResource>) {
261    net.flush_outbox();
262}
263
264/// Sends updated sync components for entities that are added or changed.
265pub fn sync_component<T: SyncComponent>(
266    mut net: ResMut<NetResource>,
267    query: Query<(&NetworkId, &T), (With<Replicated>, Or<(Added<T>, Changed<T>)>)>,
268) {
269    if !net.is_server() {
270        return;
271    }
272
273    for (network_id, component) in &query {
274        let bytes = bincode::serde::encode_to_vec(component, config::standard())
275            .expect("failed to serialize sync component");
276        net.queue_packet(ReplicationPacket::UpdateComponent {
277            network_id: network_id.0,
278            component_wire_id: T::WIRE_ID,
279            bytes,
280        });
281    }
282}
283
284/// Sends updated resources when they are added or changed.
285pub fn sync_resource<T: SyncResource>(
286    time: Res<Time>,
287    mut net: ResMut<NetResource>,
288    resource: Option<Res<T>>,
289    mut state: Local<SyncResourceSendState>,
290) {
291    sync_resource_with_settings::<T>(
292        &time,
293        &mut net,
294        resource,
295        &mut state,
296        SyncResourceSettings::default(),
297    );
298}
299
300/// Sends updated resources with byte-level dedupe and optional send coalescing.
301pub fn sync_resource_with_settings<T: SyncResource>(
302    time: &Time,
303    net: &mut NetResource,
304    resource: Option<Res<T>>,
305    state: &mut SyncResourceSendState,
306    settings: SyncResourceSettings,
307) {
308    let Some(resource) = resource else {
309        return;
310    };
311
312    if !net.is_server() {
313        return;
314    }
315
316    state.seconds_since_send += time.delta_secs();
317
318    if resource.is_added() || resource.is_changed() {
319        let bytes = bincode::serde::encode_to_vec(&*resource, config::standard())
320            .expect("failed to serialize sync resource");
321
322        if state.last_sent_bytes.as_ref() != Some(&bytes) {
323            state.pending_bytes = Some(bytes);
324        }
325    }
326
327    let heartbeat_due = settings
328        .heartbeat_seconds
329        .map(|seconds| state.seconds_since_send >= seconds.max(0.0))
330        .unwrap_or(false);
331
332    if state.pending_bytes.is_none() && heartbeat_due {
333        state.pending_bytes = state.last_sent_bytes.clone().or_else(|| {
334            Some(
335                bincode::serde::encode_to_vec(&*resource, config::standard())
336                    .expect("failed to serialize sync resource"),
337            )
338        });
339    }
340
341    let interval_ready = state.seconds_since_send >= settings.min_interval_seconds.max(0.0);
342    if state.pending_bytes.is_none() || (!interval_ready && !heartbeat_due) {
343        return;
344    }
345
346    let bytes = state
347        .pending_bytes
348        .take()
349        .expect("pending resource bytes should exist");
350    net.queue_packet(ReplicationPacket::UpdateResource {
351        resource_wire_id: T::WIRE_ID,
352        bytes: bytes.clone(),
353    });
354    state.last_sent_bytes = Some(bytes);
355    state.seconds_since_send = 0.0;
356}
357
358/// Applies a resource snapshot only when the serialized value actually changed.
359pub fn apply_resource_update<T: SyncResource>(world: &mut World, bytes: &[u8]) {
360    if let Some(existing) = world.get_resource::<T>() {
361        let existing_bytes = bincode::serde::encode_to_vec(existing, config::standard())
362            .expect("failed to serialize existing sync resource");
363        if existing_bytes == bytes {
364            return;
365        }
366    }
367
368    let (resource, _): (T, usize) = bincode::serde::decode_from_slice(bytes, config::standard())
369        .expect("failed to deserialize sync resource");
370    world.insert_resource(resource);
371}
372
373/// Sends the initial world state to newly connected clients.
374pub fn sync_new_connections(world: &mut World) {
375    let is_server = world.resource::<NetResource>().is_server();
376    if !is_server {
377        return;
378    }
379
380    let connections = {
381        let mut net = world.resource_mut::<NetResource>();
382        net.drain_new_connections()
383    };
384
385    if connections.is_empty() {
386        return;
387    }
388
389    let component_registrations: Vec<ComponentRegistration> =
390        inventory::iter::<ComponentRegistration>()
391            .copied()
392            .collect();
393    let resource_registrations: Vec<ResourceRegistration> =
394        inventory::iter::<ResourceRegistration>().copied().collect();
395
396    for socket in &connections {
397        let replicated_entities = {
398            let mut query =
399                world.query_filtered::<(Entity, &NetworkId, Option<&PrefabId>), With<Replicated>>();
400            query
401                .iter(world)
402                .map(|(entity, network_id, prefab_id)| {
403                    (
404                        entity,
405                        *network_id,
406                        prefab_id.map(|prefab_id| prefab_id.0).unwrap_or(0),
407                    )
408                })
409                .collect::<Vec<_>>()
410        };
411
412        let component_snapshots: Vec<Vec<ReplicationPacket>> = component_registrations
413            .iter()
414            .map(|registration| (registration.snapshot)(world))
415            .collect();
416        let resource_snapshots: Vec<Vec<ReplicationPacket>> = resource_registrations
417            .iter()
418            .map(|registration| (registration.snapshot)(world))
419            .collect();
420
421        {
422            let net = world.resource::<NetResource>();
423            for (_, network_id, prefab_wire_id) in &replicated_entities {
424                net.send_packet_to(
425                    socket,
426                    ReplicationPacket::SpawnEntity {
427                        network_id: network_id.0,
428                        prefab_wire_id: *prefab_wire_id,
429                    },
430                );
431            }
432
433            for packets in component_snapshots
434                .into_iter()
435                .chain(resource_snapshots.into_iter())
436            {
437                for packet in packets {
438                    net.send_packet_to(socket, packet);
439                }
440            }
441        };
442    }
443}
444
445/// Applies queued incoming replication packets to the local world.
446pub fn apply_incoming_packets(world: &mut World) {
447    let mut packets = {
448        let mut pending = world.resource_mut::<PendingComponentUpdates>();
449        pending.0.drain(..).collect::<Vec<_>>()
450    };
451
452    packets.extend({
453        let mut net = world.resource_mut::<NetResource>();
454        net.drain_inbox()
455    });
456
457    if packets.is_empty() {
458        return;
459    }
460
461    packets.sort_by_key(|packet| match packet {
462        ReplicationPacket::SpawnEntity { .. } => 0,
463        ReplicationPacket::UpdateComponent { .. } | ReplicationPacket::UpdateResource { .. } => 1,
464        ReplicationPacket::DespawnEntity { .. } => 2,
465    });
466
467    let mut deferred = Vec::new();
468
469    for packet in packets {
470        match packet {
471            ReplicationPacket::SpawnEntity {
472                network_id,
473                prefab_wire_id,
474            } => {
475                let entity = world
476                    .spawn_empty()
477                    .insert(Replicated)
478                    .insert(NetworkId(network_id))
479                    .id();
480                world
481                    .resource_mut::<EntityIndex>()
482                    .insert(NetworkId(network_id), entity);
483                if prefab_wire_id != 0 {
484                    if let Some(registration) = world
485                        .resource::<PrefabRegistry>()
486                        .by_wire_id(prefab_wire_id)
487                        .copied()
488                    {
489                        (registration.apply)(world, entity);
490                        world.entity_mut(entity).insert(PrefabId(prefab_wire_id));
491                    }
492                }
493            }
494            ReplicationPacket::DespawnEntity { network_id } => {
495                let entity = world
496                    .resource::<EntityIndex>()
497                    .entity(NetworkId(network_id));
498                if let Some(entity) = entity {
499                    world.despawn(entity);
500                    world.resource_mut::<EntityIndex>().remove_entity(entity);
501                }
502            }
503            ReplicationPacket::UpdateComponent {
504                network_id,
505                component_wire_id,
506                bytes,
507            } => {
508                let entity = world
509                    .resource::<EntityIndex>()
510                    .entity(NetworkId(network_id));
511                let registration = {
512                    world
513                        .resource::<SyncRegistry>()
514                        .by_wire_id(component_wire_id)
515                        .copied()
516                };
517
518                match (entity, registration) {
519                    (Some(entity), Some(registration)) => {
520                        (registration.apply)(world, entity, &bytes);
521                    }
522                    (None, Some(_)) => {
523                        deferred.push(ReplicationPacket::UpdateComponent {
524                            network_id,
525                            component_wire_id,
526                            bytes,
527                        });
528                    }
529                    _ => {}
530                }
531            }
532            ReplicationPacket::UpdateResource {
533                resource_wire_id,
534                bytes,
535            } => {
536                let registration = {
537                    world
538                        .resource::<SyncResourceRegistry>()
539                        .by_wire_id(resource_wire_id)
540                        .copied()
541                };
542
543                if let Some(registration) = registration {
544                    (registration.apply)(world, &bytes);
545                }
546            }
547        }
548    }
549
550    if !deferred.is_empty() {
551        world
552            .resource_mut::<PendingComponentUpdates>()
553            .0
554            .extend(deferred);
555    }
556}
557
558/// Assigns network IDs to newly replicated entities on the server.
559pub fn assign_network_ids(world: &mut World) {
560    let is_server = world.resource::<NetResource>().is_server();
561    if !is_server {
562        return;
563    }
564
565    let entities = {
566        let mut query = world.query_filtered::<Entity, Added<Replicated>>();
567        query.iter(world).collect::<Vec<_>>()
568    };
569
570    for entity in entities {
571        let network_id = {
572            let mut next_id = world.resource_mut::<NextNetworkId>();
573            let network_id = NetworkId(next_id.0);
574            next_id.0 = next_id.0.saturating_add(1);
575            network_id
576        };
577
578        world.entity_mut(entity).insert(network_id);
579        world
580            .resource_mut::<EntityIndex>()
581            .insert(network_id, entity);
582        let prefab_wire_id = world
583            .entity(entity)
584            .get::<PrefabId>()
585            .map(|prefab_id| prefab_id.0)
586            .unwrap_or(0);
587        world
588            .resource_mut::<NetResource>()
589            .queue_packet(ReplicationPacket::SpawnEntity {
590                network_id: network_id.0,
591                prefab_wire_id,
592            });
593    }
594}
595
596/// Detects prefab matches on newly replicated entities.
597pub fn assign_prefab_ids(world: &mut World) {
598    let entities = {
599        let mut query = world.query_filtered::<Entity, Added<Replicated>>();
600        query.iter(world).collect::<Vec<_>>()
601    };
602
603    let registrations: Vec<PrefabRegistration> =
604        inventory::iter::<PrefabRegistration>().copied().collect();
605
606    for entity in entities {
607        if world.entity(entity).contains::<PrefabId>() {
608            continue;
609        }
610
611        for registration in &registrations {
612            if (registration.matches)(world, entity) {
613                world
614                    .entity_mut(entity)
615                    .insert(PrefabId(registration.wire_id));
616                break;
617            }
618        }
619    }
620}
621
622/// Converts despawned replicated entities into network despawn packets.
623pub fn replicate_removals(
624    mut removed: RemovedComponents<Replicated>,
625    mut net: ResMut<NetResource>,
626    mut index: ResMut<EntityIndex>,
627) {
628    if !net.is_server() {
629        return;
630    }
631
632    for entity in removed.read() {
633        if let Some(network_id) = index.remove_entity(entity) {
634            net.queue_packet(ReplicationPacket::DespawnEntity {
635                network_id: network_id.0,
636            });
637        }
638    }
639}