Skip to main content

bevy_networker_multiplayer/
sync.rs

1// SPDX-License-Identifier: MIT
2//! Sync registration, snapshots, and packet application.
3//!
4//! This module is the bridge between Bevy ECS state and the wire format used by
5//! `NetResource`. Attribute macros submit metadata into `inventory`; this module
6//! collects that metadata, registers systems, and translates packets in both
7//! directions.
8use bevy::prelude::*;
9use bincode::config;
10use serde::{de::DeserializeOwned, Serialize};
11use std::collections::HashMap;
12
13use crate::{
14    netres::{NetResource, ReplicationPacket},
15    replicated::{EntityIndex, NetworkId, NextNetworkId, Replicated},
16};
17
18/// Metadata for a syncable component type.
19#[derive(Debug, Clone, Copy)]
20pub struct ComponentRegistration {
21    /// Stable type path for diagnostics and registry lookup.
22    pub type_path: &'static str,
23    /// Stable wire identifier for the component type.
24    pub wire_id: u64,
25    /// Registration callback that installs Bevy systems.
26    pub register: fn(&mut App),
27    /// Applies a decoded component update to an entity.
28    pub apply: fn(&mut World, Entity, &[u8]),
29    /// Produces full-state snapshots for late joiners.
30    pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
31}
32
33// `inventory` collection of all component registrations.
34inventory::collect!(ComponentRegistration);
35
36/// Metadata for a syncable resource type.
37#[derive(Debug, Clone, Copy)]
38pub struct ResourceRegistration {
39    /// Stable type path for diagnostics and registry lookup.
40    pub type_path: &'static str,
41    /// Stable wire identifier for the resource type.
42    pub wire_id: u64,
43    /// Registration callback that installs Bevy systems.
44    pub register: fn(&mut App),
45    /// Applies a decoded resource update to the world.
46    pub apply: fn(&mut World, &[u8]),
47    /// Produces a snapshot packet for the resource.
48    pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
49}
50
51// `inventory` collection of all resource registrations.
52inventory::collect!(ResourceRegistration);
53
54/// Metadata for a prefab definition used to spawn visuals remotely.
55#[derive(Debug, Clone, Copy)]
56pub struct PrefabRegistration {
57    /// Stable type path for diagnostics and registry lookup.
58    pub type_path: &'static str,
59    /// Stable wire identifier for the prefab.
60    pub wire_id: u64,
61    /// Registration callback that can install any companion systems.
62    pub register: fn(&mut App),
63    /// Returns true when an entity should be tagged with this prefab.
64    pub matches: fn(&World, Entity) -> bool,
65    /// Applies the prefab's visual or structural components.
66    pub apply: fn(&mut World, Entity),
67}
68
69// `inventory` collection of all prefab registrations.
70inventory::collect!(PrefabRegistration);
71
72/// Runtime registry for component sync handlers.
73#[derive(Resource, Default)]
74pub struct SyncRegistry {
75    by_wire_id: HashMap<u64, ComponentRegistration>,
76    by_type_path: HashMap<&'static str, ComponentRegistration>,
77}
78
79impl SyncRegistry {
80    /// Registers one component handler.
81    pub fn register(&mut self, registration: ComponentRegistration) {
82        self.by_wire_id.insert(registration.wire_id, registration);
83        self.by_type_path.insert(registration.type_path, registration);
84    }
85
86    /// Looks up a component handler by wire ID.
87    pub fn by_wire_id(&self, wire_id: u64) -> Option<&ComponentRegistration> {
88        self.by_wire_id.get(&wire_id)
89    }
90}
91
92/// Runtime registry for resource sync handlers.
93#[derive(Resource, Default)]
94pub struct SyncResourceRegistry {
95    by_wire_id: HashMap<u64, ResourceRegistration>,
96    by_type_path: HashMap<&'static str, ResourceRegistration>,
97}
98
99impl SyncResourceRegistry {
100    /// Registers one resource handler.
101    pub fn register(&mut self, registration: ResourceRegistration) {
102        self.by_wire_id.insert(registration.wire_id, registration);
103        self.by_type_path.insert(registration.type_path, registration);
104    }
105
106    /// Looks up a resource handler by wire ID.
107    pub fn by_wire_id(&self, wire_id: u64) -> Option<&ResourceRegistration> {
108        self.by_wire_id.get(&wire_id)
109    }
110}
111
112/// Trait implemented by syncable components.
113pub trait SyncComponent:
114    Component + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
115{
116    /// Fully qualified type path.
117    const TYPE_PATH: &'static str;
118    /// Stable wire identifier.
119    const WIRE_ID: u64;
120}
121
122/// Trait implemented by syncable resources.
123pub trait SyncResource:
124    Resource + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
125{
126    /// Fully qualified type path.
127    const TYPE_PATH: &'static str;
128    /// Stable wire identifier.
129    const WIRE_ID: u64;
130}
131
132/// Internal component used to remember which prefab a replicated entity uses.
133#[derive(Component, Clone, Copy, Debug, Eq, Hash, PartialEq)]
134#[doc(hidden)]
135pub struct PrefabId(pub u64);
136
137/// Runtime registry for prefab handlers.
138#[derive(Resource, Default)]
139pub struct PrefabRegistry {
140    by_wire_id: HashMap<u64, PrefabRegistration>,
141    by_type_path: HashMap<&'static str, PrefabRegistration>,
142}
143
144impl PrefabRegistry {
145    /// Registers one prefab handler.
146    pub fn register(&mut self, registration: PrefabRegistration) {
147        self.by_wire_id.insert(registration.wire_id, registration);
148        self.by_type_path.insert(registration.type_path, registration);
149    }
150
151    /// Looks up a prefab handler by wire ID.
152    pub fn by_wire_id(&self, wire_id: u64) -> Option<&PrefabRegistration> {
153        self.by_wire_id.get(&wire_id)
154    }
155
156    /// Iterates over all prefab registrations.
157    pub fn all(&self) -> impl Iterator<Item = &PrefabRegistration> {
158        self.by_wire_id.values()
159    }
160}
161
162/// Component packets that arrived before their entity spawn packet.
163#[derive(Resource, Default)]
164struct PendingComponentUpdates(Vec<ReplicationPacket>);
165
/// Derives a wire ID from a type path using the 64-bit FNV-1a hash.
///
/// The function is `const`, so it can be evaluated at compile time to fill
/// in `WIRE_ID` constants. It hashes the UTF-8 bytes of `type_path`; the empty
/// string hashes to the FNV offset basis.
pub const fn hash_type_path(type_path: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    let input = type_path.as_bytes();
    let mut state = OFFSET_BASIS;
    let mut cursor = 0;
    // Iterators are unavailable in `const fn`, hence the manual cursor.
    loop {
        if cursor >= input.len() {
            break;
        }
        // FNV-1a: fold the byte in first, then multiply by the prime.
        state = (state ^ input[cursor] as u64).wrapping_mul(PRIME);
        cursor += 1;
    }
    state
}
178
179/// Collects all inventory registrations and stores them in runtime registries.
180pub fn register_sync_components(app: &mut App) {
181    app.init_resource::<SyncRegistry>();
182    app.init_resource::<SyncResourceRegistry>();
183    app.init_resource::<PrefabRegistry>();
184    app.init_resource::<PendingComponentUpdates>();
185
186    let mut registry = SyncRegistry::default();
187    for registration in inventory::iter::<ComponentRegistration> {
188        registry.register(*registration);
189        (registration.register)(app);
190    }
191
192    let mut resource_registry = SyncResourceRegistry::default();
193    for registration in inventory::iter::<ResourceRegistration> {
194        resource_registry.register(*registration);
195        (registration.register)(app);
196    }
197
198    let mut prefab_registry = PrefabRegistry::default();
199    for registration in inventory::iter::<PrefabRegistration> {
200        prefab_registry.register(*registration);
201        (registration.register)(app);
202    }
203
204    app.insert_resource(registry);
205    app.insert_resource(resource_registry);
206    app.insert_resource(prefab_registry);
207}
208
209/// Poll hook run before the main replication systems.
210pub fn poll_network_incoming(mut net: ResMut<NetResource>) {
211    net.poll_incoming();
212}
213
214/// Flushes queued packets after the frame has finished mutating state.
215pub fn flush_network_outbox(mut net: ResMut<NetResource>) {
216    net.flush_outbox();
217}
218
219/// Sends updated sync components for entities that are added or changed.
220pub fn sync_component<T: SyncComponent>(
221    mut net: ResMut<NetResource>,
222    query: Query<(&NetworkId, &T), (With<Replicated>, Or<(Added<T>, Changed<T>)>)>,
223) {
224    if !net.is_server() {
225        return;
226    }
227
228    for (network_id, component) in &query {
229        let bytes = bincode::serde::encode_to_vec(component, config::standard())
230            .expect("failed to serialize sync component");
231        net.queue_packet(ReplicationPacket::UpdateComponent {
232            network_id: network_id.0,
233            component_wire_id: T::WIRE_ID,
234            bytes,
235        });
236    }
237}
238
239/// Sends updated resources when they are added or changed.
240pub fn sync_resource<T: SyncResource>(
241    mut net: ResMut<NetResource>,
242    resource: Option<Res<T>>,
243) {
244    let Some(resource) = resource else {
245        return;
246    };
247
248    if !net.is_server() || !(resource.is_added() || resource.is_changed()) {
249        return;
250    }
251
252    let bytes = bincode::serde::encode_to_vec(&*resource, config::standard())
253        .expect("failed to serialize sync resource");
254    net.queue_packet(ReplicationPacket::UpdateResource {
255        resource_wire_id: T::WIRE_ID,
256        bytes,
257    });
258}
259
260/// Sends the initial world state to newly connected clients.
261pub fn sync_new_connections(world: &mut World) {
262    let is_server = world.resource::<NetResource>().is_server();
263    if !is_server {
264        return;
265    }
266
267    let connections = {
268        let mut net = world.resource_mut::<NetResource>();
269        net.drain_new_connections()
270    };
271
272    if connections.is_empty() {
273        return;
274    }
275
276    let component_registrations: Vec<ComponentRegistration> =
277        inventory::iter::<ComponentRegistration>().copied().collect();
278    let resource_registrations: Vec<ResourceRegistration> =
279        inventory::iter::<ResourceRegistration>().copied().collect();
280
281    for socket in &connections {
282        let replicated_entities = {
283            let mut query = world.query_filtered::<
284                (Entity, &NetworkId, Option<&PrefabId>),
285                With<Replicated>,
286            >();
287            query
288                .iter(world)
289                .map(|(entity, network_id, prefab_id)| {
290                    (entity, *network_id, prefab_id.map(|prefab_id| prefab_id.0).unwrap_or(0))
291                })
292                .collect::<Vec<_>>()
293        };
294
295        let component_snapshots: Vec<Vec<ReplicationPacket>> = component_registrations
296            .iter()
297            .map(|registration| (registration.snapshot)(world))
298            .collect();
299        let resource_snapshots: Vec<Vec<ReplicationPacket>> = resource_registrations
300            .iter()
301            .map(|registration| (registration.snapshot)(world))
302            .collect();
303
304        {
305            let net = world.resource::<NetResource>();
306            for (_, network_id, prefab_wire_id) in &replicated_entities {
307                net.send_packet_to(
308                    socket,
309                    ReplicationPacket::SpawnEntity {
310                        network_id: network_id.0,
311                        prefab_wire_id: *prefab_wire_id,
312                    },
313                );
314            }
315
316            for packets in component_snapshots.into_iter().chain(resource_snapshots.into_iter()) {
317                for packet in packets {
318                    net.send_packet_to(socket, packet);
319                }
320            }
321        };
322    }
323}
324
325/// Applies queued incoming replication packets to the local world.
326pub fn apply_incoming_packets(world: &mut World) {
327    let mut packets = {
328        let mut pending = world.resource_mut::<PendingComponentUpdates>();
329        pending.0.drain(..).collect::<Vec<_>>()
330    };
331
332    packets.extend({
333        let mut net = world.resource_mut::<NetResource>();
334        net.drain_inbox()
335    });
336
337    if packets.is_empty() {
338        return;
339    }
340
341    packets.sort_by_key(|packet| match packet {
342        ReplicationPacket::SpawnEntity { .. } => 0,
343        ReplicationPacket::UpdateComponent { .. } | ReplicationPacket::UpdateResource { .. } => 1,
344        ReplicationPacket::DespawnEntity { .. } => 2,
345    });
346
347    let mut deferred = Vec::new();
348
349    for packet in packets {
350        match packet {
351            ReplicationPacket::SpawnEntity {
352                network_id,
353                prefab_wire_id,
354            } => {
355                let entity = world
356                    .spawn_empty()
357                    .insert(Replicated)
358                    .insert(NetworkId(network_id))
359                    .id();
360                world.resource_mut::<EntityIndex>().insert(NetworkId(network_id), entity);
361                if prefab_wire_id != 0 {
362                    if let Some(registration) = world
363                        .resource::<PrefabRegistry>()
364                        .by_wire_id(prefab_wire_id)
365                        .copied()
366                    {
367                        (registration.apply)(world, entity);
368                        world.entity_mut(entity).insert(PrefabId(prefab_wire_id));
369                    }
370                }
371            }
372            ReplicationPacket::DespawnEntity { network_id } => {
373                let entity = world.resource::<EntityIndex>().entity(NetworkId(network_id));
374                if let Some(entity) = entity {
375                    world.despawn(entity);
376                    world
377                        .resource_mut::<EntityIndex>()
378                        .remove_entity(entity);
379                }
380            }
381            ReplicationPacket::UpdateComponent {
382                network_id,
383                component_wire_id,
384                bytes,
385            } => {
386                let entity = world.resource::<EntityIndex>().entity(NetworkId(network_id));
387                let registration = {
388                    world
389                        .resource::<SyncRegistry>()
390                        .by_wire_id(component_wire_id)
391                        .copied()
392                };
393
394                match (entity, registration) {
395                    (Some(entity), Some(registration)) => {
396                        (registration.apply)(world, entity, &bytes);
397                    }
398                    (None, Some(_)) => {
399                        deferred.push(ReplicationPacket::UpdateComponent {
400                            network_id,
401                            component_wire_id,
402                            bytes,
403                        });
404                    }
405                    _ => {}
406                }
407            }
408            ReplicationPacket::UpdateResource {
409                resource_wire_id,
410                bytes,
411            } => {
412                let registration = {
413                    world
414                        .resource::<SyncResourceRegistry>()
415                        .by_wire_id(resource_wire_id)
416                        .copied()
417                };
418
419                if let Some(registration) = registration {
420                    (registration.apply)(world, &bytes);
421                }
422            }
423        }
424    }
425
426    if !deferred.is_empty() {
427        world
428            .resource_mut::<PendingComponentUpdates>()
429            .0
430            .extend(deferred);
431    }
432}
433
434/// Assigns network IDs to newly replicated entities on the server.
435pub fn assign_network_ids(world: &mut World) {
436    let is_server = world.resource::<NetResource>().is_server();
437    if !is_server {
438        return;
439    }
440
441    let entities = {
442        let mut query = world.query_filtered::<Entity, Added<Replicated>>();
443        query.iter(world).collect::<Vec<_>>()
444    };
445
446    for entity in entities {
447        let network_id = {
448            let mut next_id = world.resource_mut::<NextNetworkId>();
449            let network_id = NetworkId(next_id.0);
450            next_id.0 = next_id.0.saturating_add(1);
451            network_id
452        };
453
454        world.entity_mut(entity).insert(network_id);
455        world
456            .resource_mut::<EntityIndex>()
457            .insert(network_id, entity);
458        let prefab_wire_id = world
459            .entity(entity)
460            .get::<PrefabId>()
461            .map(|prefab_id| prefab_id.0)
462            .unwrap_or(0);
463        world.resource_mut::<NetResource>().queue_packet(
464            ReplicationPacket::SpawnEntity {
465                network_id: network_id.0,
466                prefab_wire_id,
467            },
468        );
469    }
470}
471
472/// Detects prefab matches on newly replicated entities.
473pub fn assign_prefab_ids(world: &mut World) {
474    let entities = {
475        let mut query = world.query_filtered::<Entity, Added<Replicated>>();
476        query.iter(world).collect::<Vec<_>>()
477    };
478
479    let registrations: Vec<PrefabRegistration> = inventory::iter::<PrefabRegistration>().copied().collect();
480
481    for entity in entities {
482        if world.entity(entity).contains::<PrefabId>() {
483            continue;
484        }
485
486        for registration in &registrations {
487            if (registration.matches)(world, entity) {
488                world.entity_mut(entity).insert(PrefabId(registration.wire_id));
489                break;
490            }
491        }
492    }
493}
494
495/// Converts despawned replicated entities into network despawn packets.
496pub fn replicate_removals(
497    mut removed: RemovedComponents<Replicated>,
498    mut net: ResMut<NetResource>,
499    mut index: ResMut<EntityIndex>,
500) {
501    if !net.is_server() {
502        return;
503    }
504
505    for entity in removed.read() {
506        if let Some(network_id) = index.remove_entity(entity) {
507            net.queue_packet(ReplicationPacket::DespawnEntity {
508                network_id: network_id.0,
509            });
510        }
511    }
512}