1use bevy::prelude::*;
9use bincode::config;
10use serde::{Serialize, de::DeserializeOwned};
11use std::collections::HashMap;
12
13use crate::{
14 netres::{NetResource, ReplicationPacket},
15 replicated::{EntityIndex, NetworkId, NextNetworkId, Replicated},
16};
17
/// Registration record describing one replicated component type.
///
/// Records are gathered through `inventory` and copied into [`SyncRegistry`]
/// by [`register_sync_components`].
#[derive(Debug, Clone, Copy)]
pub struct ComponentRegistration {
    /// Key used for the registry's by-type-path map (presumably the component's
    /// Rust type path; confirm against the code that submits registrations).
    pub type_path: &'static str,
    /// Identifier matched against the `component_wire_id` of incoming
    /// `UpdateComponent` packets.
    pub wire_id: u64,
    /// Hook invoked once with the [`App`] while registrations are collected.
    pub register: fn(&mut App),
    /// Hook invoked with the target entity and the payload bytes of an accepted
    /// component update.
    pub apply: fn(&mut World, Entity, &[u8]),
    /// Hook invoked for every new connection; the packets it returns are sent
    /// to that connection.
    pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
}

// Makes submitted `ComponentRegistration` values enumerable via `inventory::iter`.
inventory::collect!(ComponentRegistration);
35
/// Registration record describing one replicated resource type.
///
/// Records are gathered through `inventory` and copied into
/// [`SyncResourceRegistry`] by [`register_sync_components`].
#[derive(Debug, Clone, Copy)]
pub struct ResourceRegistration {
    /// Key used for the registry's by-type-path map (presumably the resource's
    /// Rust type path; confirm against the code that submits registrations).
    pub type_path: &'static str,
    /// Identifier matched against the `resource_wire_id` of incoming
    /// `UpdateResource` packets.
    pub wire_id: u64,
    /// Hook invoked once with the [`App`] while registrations are collected.
    pub register: fn(&mut App),
    /// Hook invoked with the payload bytes of an incoming resource update.
    pub apply: fn(&mut World, &[u8]),
    /// Hook invoked for every new connection; the packets it returns are sent
    /// to that connection.
    pub snapshot: fn(&mut World) -> Vec<ReplicationPacket>,
}

// Makes submitted `ResourceRegistration` values enumerable via `inventory::iter`.
inventory::collect!(ResourceRegistration);
53
/// Registration record describing one prefab that replicated entities can be
/// tagged with.
///
/// Records are gathered through `inventory` and copied into [`PrefabRegistry`]
/// by [`register_sync_components`].
#[derive(Debug, Clone, Copy)]
pub struct PrefabRegistration {
    /// Key used for the registry's by-type-path map (presumably the prefab's
    /// Rust type path; confirm against the code that submits registrations).
    pub type_path: &'static str,
    /// Identifier stored in [`PrefabId`] and carried as `prefab_wire_id` in
    /// `SpawnEntity` packets. The value `0` is used elsewhere in this file to
    /// mean "no prefab".
    pub wire_id: u64,
    /// Hook invoked once with the [`App`] while registrations are collected.
    pub register: fn(&mut App),
    /// Predicate consulted by `assign_prefab_ids`; the first registration that
    /// returns `true` for an entity supplies that entity's [`PrefabId`].
    pub matches: fn(&World, Entity) -> bool,
    /// Hook invoked on a freshly spawned entity when a `SpawnEntity` packet
    /// names this prefab.
    pub apply: fn(&mut World, Entity),
}

// Makes submitted `PrefabRegistration` values enumerable via `inventory::iter`.
inventory::collect!(PrefabRegistration);
71
72#[derive(Resource, Default)]
74pub struct SyncRegistry {
75 by_wire_id: HashMap<u64, ComponentRegistration>,
76 by_type_path: HashMap<&'static str, ComponentRegistration>,
77}
78
79impl SyncRegistry {
80 pub fn register(&mut self, registration: ComponentRegistration) {
82 self.by_wire_id.insert(registration.wire_id, registration);
83 self.by_type_path
84 .insert(registration.type_path, registration);
85 }
86
87 pub fn by_wire_id(&self, wire_id: u64) -> Option<&ComponentRegistration> {
89 self.by_wire_id.get(&wire_id)
90 }
91}
92
93#[derive(Resource, Default)]
95pub struct SyncResourceRegistry {
96 by_wire_id: HashMap<u64, ResourceRegistration>,
97 by_type_path: HashMap<&'static str, ResourceRegistration>,
98}
99
100impl SyncResourceRegistry {
101 pub fn register(&mut self, registration: ResourceRegistration) {
103 self.by_wire_id.insert(registration.wire_id, registration);
104 self.by_type_path
105 .insert(registration.type_path, registration);
106 }
107
108 pub fn by_wire_id(&self, wire_id: u64) -> Option<&ResourceRegistration> {
110 self.by_wire_id.get(&wire_id)
111 }
112}
113
/// Contract for component types that can be replicated.
///
/// `sync_component` serializes implementors with bincode and tags each payload
/// with [`SyncComponent::WIRE_ID`].
pub trait SyncComponent:
    Component + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
{
    /// Type path of the implementor (presumably the input used to derive
    /// `WIRE_ID`; this file does not show where the constants are defined).
    const TYPE_PATH: &'static str;
    /// Identifier written into `UpdateComponent` packets for this component type.
    const WIRE_ID: u64;
}
123
/// Contract for resource types that can be replicated.
///
/// `sync_resource_with_settings` serializes implementors with bincode and tags
/// each payload with [`SyncResource::WIRE_ID`].
pub trait SyncResource:
    Resource + Serialize + DeserializeOwned + Clone + Send + Sync + 'static
{
    /// Type path of the implementor (presumably the input used to derive
    /// `WIRE_ID`; this file does not show where the constants are defined).
    const TYPE_PATH: &'static str;
    /// Identifier written into `UpdateResource` packets for this resource type.
    const WIRE_ID: u64;
}
133
/// Component holding the wire id of the prefab associated with an entity.
///
/// It is inserted by `assign_prefab_ids` when a registration matches, and by
/// `apply_incoming_packets` after a `SpawnEntity` packet names a known prefab.
#[derive(Component, Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[doc(hidden)]
pub struct PrefabId(pub u64);
138
139#[derive(Resource, Default)]
141pub struct PrefabRegistry {
142 by_wire_id: HashMap<u64, PrefabRegistration>,
143 by_type_path: HashMap<&'static str, PrefabRegistration>,
144}
145
146impl PrefabRegistry {
147 pub fn register(&mut self, registration: PrefabRegistration) {
149 self.by_wire_id.insert(registration.wire_id, registration);
150 self.by_type_path
151 .insert(registration.type_path, registration);
152 }
153
154 pub fn by_wire_id(&self, wire_id: u64) -> Option<&PrefabRegistration> {
156 self.by_wire_id.get(&wire_id)
157 }
158
159 pub fn all(&self) -> impl Iterator<Item = &PrefabRegistration> {
161 self.by_wire_id.values()
162 }
163}
164
/// Component update packets that could not be applied yet because no entity was
/// known for their network id; `apply_incoming_packets` retries them on its
/// next run.
#[derive(Resource, Default)]
struct PendingComponentUpdates(Vec<ReplicationPacket>);
168
169#[derive(Debug, Resource)]
171#[doc(hidden)]
172pub struct ComponentUpdateSequenceState {
173 next_outgoing: u64,
174 latest_incoming: HashMap<(u64, u64), u64>,
175}
176
177impl Default for ComponentUpdateSequenceState {
178 fn default() -> Self {
179 Self {
180 next_outgoing: 1,
181 latest_incoming: HashMap::new(),
182 }
183 }
184}
185
186impl ComponentUpdateSequenceState {
187 fn next_outgoing(&mut self) -> u64 {
188 let sequence = self.next_outgoing;
189 self.next_outgoing = self.next_outgoing.saturating_add(1);
190 sequence
191 }
192
193 fn accept_incoming(&mut self, network_id: u64, component_wire_id: u64, sequence: u64) -> bool {
194 let key = (network_id, component_wire_id);
195 if self
196 .latest_incoming
197 .get(&key)
198 .is_some_and(|latest| *latest >= sequence)
199 {
200 return false;
201 }
202
203 self.latest_incoming.insert(key, sequence);
204 true
205 }
206
207 fn forget_network_id(&mut self, network_id: u64) {
208 self.latest_incoming
209 .retain(|(stored_network_id, _), _| *stored_network_id != network_id);
210 }
211}
212
/// Send-rate settings for a replicated resource.
#[derive(Clone, Copy, Debug)]
pub struct SyncResourceSettings {
    /// Minimum number of seconds between two sends; negative values are
    /// treated as zero by `sync_resource_with_settings`.
    pub min_interval_seconds: f32,
    /// When `Some`, the resource is re-sent once this many seconds have passed
    /// since the last send, even without a change; `None` disables this.
    pub heartbeat_seconds: Option<f32>,
}

impl Default for SyncResourceSettings {
    /// No throttling and no heartbeat.
    fn default() -> Self {
        let min_interval_seconds = 0.0;
        let heartbeat_seconds = None;
        Self {
            min_interval_seconds,
            heartbeat_seconds,
        }
    }
}
236
/// Per-system send bookkeeping for one replicated resource.
#[derive(Debug)]
pub struct SyncResourceSendState {
    /// Payload of the most recent send, if anything was sent yet.
    last_sent_bytes: Option<Vec<u8>>,
    /// Payload waiting to be sent.
    pending_bytes: Option<Vec<u8>>,
    /// Seconds accumulated since the most recent send.
    seconds_since_send: f32,
}

impl Default for SyncResourceSendState {
    /// Nothing sent, nothing pending, and an infinite elapsed time so that the
    /// first send is never held back by an interval check.
    fn default() -> Self {
        let seconds_since_send = f32::INFINITY;
        Self {
            last_sent_bytes: None,
            pending_bytes: None,
            seconds_since_send,
        }
    }
}
254
/// Hashes `type_path` with 64-bit FNV-1a.
///
/// The function is `const`, so it can be evaluated at compile time.
pub const fn hash_type_path(type_path: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    let mut digest = OFFSET_BASIS;
    let mut remaining = type_path.as_bytes();
    // Slice patterns keep the loop usable in a `const fn`, where `for` is not.
    while let [byte, tail @ ..] = remaining {
        digest = (digest ^ (*byte as u64)).wrapping_mul(PRIME);
        remaining = tail;
    }
    digest
}
267
268pub fn register_sync_components(app: &mut App) {
270 app.init_resource::<SyncRegistry>();
271 app.init_resource::<SyncResourceRegistry>();
272 app.init_resource::<PrefabRegistry>();
273 app.init_resource::<PendingComponentUpdates>();
274 app.init_resource::<ComponentUpdateSequenceState>();
275
276 let mut registry = SyncRegistry::default();
277 for registration in inventory::iter::<ComponentRegistration> {
278 registry.register(*registration);
279 (registration.register)(app);
280 }
281
282 let mut resource_registry = SyncResourceRegistry::default();
283 for registration in inventory::iter::<ResourceRegistration> {
284 resource_registry.register(*registration);
285 (registration.register)(app);
286 }
287
288 let mut prefab_registry = PrefabRegistry::default();
289 for registration in inventory::iter::<PrefabRegistration> {
290 prefab_registry.register(*registration);
291 (registration.register)(app);
292 }
293
294 app.insert_resource(registry);
295 app.insert_resource(resource_registry);
296 app.insert_resource(prefab_registry);
297}
298
299#[doc(hidden)]
301pub fn next_component_update_sequence(world: &mut World) -> u64 {
302 world
303 .resource_mut::<ComponentUpdateSequenceState>()
304 .next_outgoing()
305}
306
/// System that calls `poll_incoming` on the [`NetResource`].
pub fn poll_network_incoming(mut net: ResMut<NetResource>) {
    net.poll_incoming();
}
311
/// System that calls `flush_outbox` on the [`NetResource`].
pub fn flush_network_outbox(mut net: ResMut<NetResource>) {
    net.flush_outbox();
}
316
317pub fn sync_component<T: SyncComponent>(
319 mut net: ResMut<NetResource>,
320 mut sequence_state: ResMut<ComponentUpdateSequenceState>,
321 query: Query<(&NetworkId, &T), (With<Replicated>, Or<(Added<T>, Changed<T>)>)>,
322) {
323 if !net.is_server() {
324 return;
325 }
326
327 for (network_id, component) in &query {
328 let bytes = bincode::serde::encode_to_vec(component, config::standard())
329 .expect("failed to serialize sync component");
330 net.queue_packet(ReplicationPacket::UpdateComponent {
331 network_id: network_id.0,
332 component_wire_id: T::WIRE_ID,
333 sequence: sequence_state.next_outgoing(),
334 bytes,
335 });
336 }
337}
338
339pub fn sync_resource<T: SyncResource>(
341 time: Res<Time>,
342 mut net: ResMut<NetResource>,
343 resource: Option<Res<T>>,
344 mut state: Local<SyncResourceSendState>,
345) {
346 sync_resource_with_settings::<T>(
347 &time,
348 &mut net,
349 resource,
350 &mut state,
351 SyncResourceSettings::default(),
352 );
353}
354
355pub fn sync_resource_with_settings<T: SyncResource>(
357 time: &Time,
358 net: &mut NetResource,
359 resource: Option<Res<T>>,
360 state: &mut SyncResourceSendState,
361 settings: SyncResourceSettings,
362) {
363 let Some(resource) = resource else {
364 return;
365 };
366
367 if !net.is_server() {
368 return;
369 }
370
371 state.seconds_since_send += time.delta_secs();
372
373 if resource.is_added() || resource.is_changed() {
374 let bytes = bincode::serde::encode_to_vec(&*resource, config::standard())
375 .expect("failed to serialize sync resource");
376
377 if state.last_sent_bytes.as_ref() != Some(&bytes) {
378 state.pending_bytes = Some(bytes);
379 }
380 }
381
382 let heartbeat_due = settings
383 .heartbeat_seconds
384 .map(|seconds| state.seconds_since_send >= seconds.max(0.0))
385 .unwrap_or(false);
386
387 if state.pending_bytes.is_none() && heartbeat_due {
388 state.pending_bytes = state.last_sent_bytes.clone().or_else(|| {
389 Some(
390 bincode::serde::encode_to_vec(&*resource, config::standard())
391 .expect("failed to serialize sync resource"),
392 )
393 });
394 }
395
396 let interval_ready = state.seconds_since_send >= settings.min_interval_seconds.max(0.0);
397 if state.pending_bytes.is_none() || (!interval_ready && !heartbeat_due) {
398 return;
399 }
400
401 let bytes = state
402 .pending_bytes
403 .take()
404 .expect("pending resource bytes should exist");
405 net.queue_packet(ReplicationPacket::UpdateResource {
406 resource_wire_id: T::WIRE_ID,
407 bytes: bytes.clone(),
408 });
409 state.last_sent_bytes = Some(bytes);
410 state.seconds_since_send = 0.0;
411}
412
413pub fn apply_resource_update<T: SyncResource>(world: &mut World, bytes: &[u8]) {
415 if let Some(existing) = world.get_resource::<T>() {
416 let existing_bytes = bincode::serde::encode_to_vec(existing, config::standard())
417 .expect("failed to serialize existing sync resource");
418 if existing_bytes == bytes {
419 return;
420 }
421 }
422
423 let (resource, _): (T, usize) = bincode::serde::decode_from_slice(bytes, config::standard())
424 .expect("failed to deserialize sync resource");
425 world.insert_resource(resource);
426}
427
428pub fn sync_new_connections(world: &mut World) {
430 let is_server = world.resource::<NetResource>().is_server();
431 if !is_server {
432 return;
433 }
434
435 let connections = {
436 let mut net = world.resource_mut::<NetResource>();
437 net.drain_new_connections()
438 };
439
440 if connections.is_empty() {
441 return;
442 }
443
444 let component_registrations: Vec<ComponentRegistration> =
445 inventory::iter::<ComponentRegistration>()
446 .copied()
447 .collect();
448 let resource_registrations: Vec<ResourceRegistration> =
449 inventory::iter::<ResourceRegistration>().copied().collect();
450
451 for socket in &connections {
452 let replicated_entities = {
453 let mut query =
454 world.query_filtered::<(Entity, &NetworkId, Option<&PrefabId>), With<Replicated>>();
455 query
456 .iter(world)
457 .map(|(entity, network_id, prefab_id)| {
458 (
459 entity,
460 *network_id,
461 prefab_id.map(|prefab_id| prefab_id.0).unwrap_or(0),
462 )
463 })
464 .collect::<Vec<_>>()
465 };
466
467 let component_snapshots: Vec<Vec<ReplicationPacket>> = component_registrations
468 .iter()
469 .map(|registration| (registration.snapshot)(world))
470 .collect();
471 let resource_snapshots: Vec<Vec<ReplicationPacket>> = resource_registrations
472 .iter()
473 .map(|registration| (registration.snapshot)(world))
474 .collect();
475
476 {
477 let net = world.resource::<NetResource>();
478 for (_, network_id, prefab_wire_id) in &replicated_entities {
479 net.send_packet_to(
480 socket,
481 ReplicationPacket::SpawnEntity {
482 network_id: network_id.0,
483 prefab_wire_id: *prefab_wire_id,
484 },
485 );
486 }
487
488 for packets in component_snapshots
489 .into_iter()
490 .chain(resource_snapshots.into_iter())
491 {
492 for packet in packets {
493 net.send_packet_to(socket, packet);
494 }
495 }
496 };
497 }
498}
499
500pub fn apply_incoming_packets(world: &mut World) {
502 let mut packets = {
503 let mut pending = world.resource_mut::<PendingComponentUpdates>();
504 pending.0.drain(..).collect::<Vec<_>>()
505 };
506
507 packets.extend({
508 let mut net = world.resource_mut::<NetResource>();
509 net.drain_inbox()
510 });
511
512 if packets.is_empty() {
513 return;
514 }
515
516 packets.sort_by_key(|packet| match packet {
517 ReplicationPacket::SpawnEntity { .. } => 0,
518 ReplicationPacket::UpdateComponent { .. } | ReplicationPacket::UpdateResource { .. } => 1,
519 ReplicationPacket::DespawnEntity { .. } => 2,
520 });
521
522 let mut deferred = Vec::new();
523
524 for packet in packets {
525 match packet {
526 ReplicationPacket::SpawnEntity {
527 network_id,
528 prefab_wire_id,
529 } => {
530 let entity = world
531 .spawn_empty()
532 .insert(Replicated)
533 .insert(NetworkId(network_id))
534 .id();
535 world
536 .resource_mut::<EntityIndex>()
537 .insert(NetworkId(network_id), entity);
538 if prefab_wire_id != 0 {
539 if let Some(registration) = world
540 .resource::<PrefabRegistry>()
541 .by_wire_id(prefab_wire_id)
542 .copied()
543 {
544 (registration.apply)(world, entity);
545 world.entity_mut(entity).insert(PrefabId(prefab_wire_id));
546 }
547 }
548 }
549 ReplicationPacket::DespawnEntity { network_id } => {
550 let entity = world
551 .resource::<EntityIndex>()
552 .entity(NetworkId(network_id));
553 if let Some(entity) = entity {
554 world.despawn(entity);
555 world.resource_mut::<EntityIndex>().remove_entity(entity);
556 world
557 .resource_mut::<ComponentUpdateSequenceState>()
558 .forget_network_id(network_id);
559 }
560 }
561 ReplicationPacket::UpdateComponent {
562 network_id,
563 component_wire_id,
564 sequence,
565 bytes,
566 } => {
567 let entity = world
568 .resource::<EntityIndex>()
569 .entity(NetworkId(network_id));
570 let registration = {
571 world
572 .resource::<SyncRegistry>()
573 .by_wire_id(component_wire_id)
574 .copied()
575 };
576
577 match (entity, registration) {
578 (Some(entity), Some(registration)) => {
579 let is_fresh = world
580 .resource_mut::<ComponentUpdateSequenceState>()
581 .accept_incoming(network_id, component_wire_id, sequence);
582 if is_fresh {
583 (registration.apply)(world, entity, &bytes);
584 }
585 }
586 (None, Some(_)) => {
587 deferred.push(ReplicationPacket::UpdateComponent {
588 network_id,
589 component_wire_id,
590 sequence,
591 bytes,
592 });
593 }
594 _ => {}
595 }
596 }
597 ReplicationPacket::UpdateResource {
598 resource_wire_id,
599 bytes,
600 } => {
601 let registration = {
602 world
603 .resource::<SyncResourceRegistry>()
604 .by_wire_id(resource_wire_id)
605 .copied()
606 };
607
608 if let Some(registration) = registration {
609 (registration.apply)(world, &bytes);
610 }
611 }
612 }
613 }
614
615 if !deferred.is_empty() {
616 world
617 .resource_mut::<PendingComponentUpdates>()
618 .0
619 .extend(deferred);
620 }
621}
622
623pub fn assign_network_ids(world: &mut World) {
625 let is_server = world.resource::<NetResource>().is_server();
626 if !is_server {
627 return;
628 }
629
630 let entities = {
631 let mut query = world.query_filtered::<Entity, Added<Replicated>>();
632 query.iter(world).collect::<Vec<_>>()
633 };
634
635 for entity in entities {
636 let network_id = {
637 let mut next_id = world.resource_mut::<NextNetworkId>();
638 let network_id = NetworkId(next_id.0);
639 next_id.0 = next_id.0.saturating_add(1);
640 network_id
641 };
642
643 world.entity_mut(entity).insert(network_id);
644 world
645 .resource_mut::<EntityIndex>()
646 .insert(network_id, entity);
647 let prefab_wire_id = world
648 .entity(entity)
649 .get::<PrefabId>()
650 .map(|prefab_id| prefab_id.0)
651 .unwrap_or(0);
652 world
653 .resource_mut::<NetResource>()
654 .queue_packet(ReplicationPacket::SpawnEntity {
655 network_id: network_id.0,
656 prefab_wire_id,
657 });
658 }
659}
660
661pub fn assign_prefab_ids(world: &mut World) {
663 let entities = {
664 let mut query = world.query_filtered::<Entity, Added<Replicated>>();
665 query.iter(world).collect::<Vec<_>>()
666 };
667
668 let registrations: Vec<PrefabRegistration> =
669 inventory::iter::<PrefabRegistration>().copied().collect();
670
671 for entity in entities {
672 if world.entity(entity).contains::<PrefabId>() {
673 continue;
674 }
675
676 for registration in ®istrations {
677 if (registration.matches)(world, entity) {
678 world
679 .entity_mut(entity)
680 .insert(PrefabId(registration.wire_id));
681 break;
682 }
683 }
684 }
685}
686
687pub fn replicate_removals(
689 mut removed: RemovedComponents<Replicated>,
690 mut net: ResMut<NetResource>,
691 mut index: ResMut<EntityIndex>,
692) {
693 if !net.is_server() {
694 return;
695 }
696
697 for entity in removed.read() {
698 if let Some(network_id) = index.remove_entity(entity) {
699 net.queue_packet(ReplicationPacket::DespawnEntity {
700 network_id: network_id.0,
701 });
702 }
703 }
704}