1use std::{any::Any, collections::VecDeque, hash::Hash, net::SocketAddr, time::Duration};
2
3use log::{debug, info, warn};
4
5use naia_shared::{
6 handshake::{HandshakeHeader, RejectReason},
7 AuthorityError, BitWriter, Channel, ChannelKind, ComponentKind, ConnectionStats,
8 EntityAndGlobalEntityConverter,
9 EntityAuthStatus, EntityDoesNotExistError, EntityEvent, EntityPriorityMut, EntityPriorityRef,
10 FakeEntityConverter, GameInstant, GlobalEntity, GlobalEntityMap, GlobalEntitySpawner,
11 GlobalRequestId, GlobalResponseId, GlobalWorldManagerType, HostType, Instant, Message,
12 MessageContainer, OwnedLocalEntity, PacketType, Protocol, ProtocolId, Replicate,
13 ReplicatedComponent, Request, Response, ResponseReceiveKey, ResponseSendKey, Serde,
14 SharedGlobalWorldManager, SocketConfig, StandardHeader, Tick, UserPriorityState, WorldMutType,
15 WorldRefType,
16};
17
18use super::{
19 client_config::ClientConfig, error::NaiaClientError, world_events::Events,
20 JitterBufferType,
21};
22use crate::{
23 connection::{base_time_manager::BaseTimeManager, connection::Connection, io::Io},
24 handshake::{HandshakeManager, HandshakeResult, Handshaker},
25 tick_events::TickEvents,
26 transport::{IdentityReceiverResult, Socket},
27 world::{
28 entity_mut::EntityMut, entity_owner::EntityOwner, entity_ref::EntityRef,
29 global_world_manager::GlobalWorldManager,
30 },
31 Publicity,
32};
33
/// Client-side endpoint state: configuration, socket I/O, the (optional)
/// established server connection, and the bookkeeping needed to map
/// world entities of type `E` onto replicated global entities.
pub struct Client<E: Copy + Eq + Hash + Send + Sync> {
    /// Configuration supplied at construction time.
    client_config: ClientConfig,
    /// Protocol definition (channels, messages, components, socket settings).
    protocol: Protocol,
    /// Protocol id passed to the handshake manager at construction.
    protocol_id: ProtocolId,
    /// Serialized auth message set by `auth`, handed to the socket in `connect`.
    auth_message: Option<Vec<u8>>,
    /// Auth headers set by `auth_headers`, handed to the socket in `connect`.
    auth_headers: Option<Vec<(String, String)>>,
    /// Packet I/O; it is "loaded" by `connect`, which is what the
    /// connecting/disconnected checks look at.
    io: Io,
    /// `Some` while a connection to the server exists.
    server_connection: Option<Connection>,
    /// Produces handshake and disconnect packets.
    handshake_manager: Box<dyn Handshaker>,
    /// Set to `true` by `disconnect`.
    manual_disconnect: bool,
    /// Read by the disconnect checks; never written in the visible part of
    /// the file — presumably set when the server signals a disconnect (TODO confirm).
    server_disconnect: bool,
    /// Messages sent while no connection exists; flushed by `on_connect`.
    waitlist_messages: VecDeque<(ChannelKind, Box<dyn Message>)>,
    /// Tracks ownership, replication config, and authority of global entities.
    global_world_manager: GlobalWorldManager,
    /// Bidirectional mapping between world entities (`E`) and global entities.
    global_entity_map: GlobalEntityMap<E>,
    /// World events accumulated until `take_world_events` is called.
    incoming_world_events: Events<E>,
    /// Tick events accumulated until `take_tick_events` is called.
    incoming_tick_events: TickEvents,
    /// Per-entity priority state exposed through `entity_priority(_mut)`.
    priority: UserPriorityState<E>,
    /// Registry used to look up the global entity backing a resource type.
    resource_registry: naia_shared::ResourceRegistry,
}
85
86impl<E: Copy + Eq + Hash + Send + Sync> Client<E> {
87 pub fn new<P: Into<Protocol>>(client_config: ClientConfig, protocol: P) -> Self {
92 let mut protocol: Protocol = protocol.into();
93 protocol.lock();
94 let protocol_id = protocol.protocol_id();
95 Self::new_with_protocol_id(client_config, protocol, protocol_id)
96 }
97
98 pub fn new_with_protocol_id(
105 client_config: ClientConfig,
106 protocol: Protocol,
107 protocol_id: ProtocolId,
108 ) -> Self {
109 let handshake_manager = HandshakeManager::new(
110 protocol_id,
111 client_config.send_handshake_interval,
112 client_config.ping_interval,
113 client_config.handshake_pings,
114 );
115
116 let compression_config = protocol.compression.clone();
117
118 Self {
119 client_config: client_config.clone(),
121 protocol,
122 protocol_id,
123 auth_message: None,
125 auth_headers: None,
126 io: Io::new(
127 &client_config.connection.bandwidth_measure_duration,
128 &compression_config,
129 ),
130 server_connection: None,
131 handshake_manager: Box::new(handshake_manager),
132 manual_disconnect: false,
133 server_disconnect: false,
134 waitlist_messages: VecDeque::new(),
135 global_world_manager: GlobalWorldManager::new(),
137 global_entity_map: GlobalEntityMap::new(),
138 incoming_world_events: Events::new(),
140 incoming_tick_events: TickEvents::new(),
141 priority: UserPriorityState::new(),
142 resource_registry: naia_shared::ResourceRegistry::new(),
143 }
144 }
145
146 pub fn entity_priority(&self, entity: E) -> EntityPriorityRef<'_, E> {
151 self.priority.get_ref(entity)
152 }
153
154 pub fn entity_priority_mut(&mut self, entity: E) -> EntityPriorityMut<'_, E> {
157 self.priority.get_mut(entity)
158 }
159
160 pub fn auth<M: Message>(&mut self, auth: M) {
168 let mut bit_writer = BitWriter::new();
170 auth.write(
171 &self.protocol.message_kinds,
172 &mut bit_writer,
173 &mut FakeEntityConverter,
174 );
175 let auth_bytes = bit_writer.to_bytes();
176 self.auth_message = Some(auth_bytes.to_vec());
177 }
178
179 pub fn auth_headers(&mut self, headers: Vec<(String, String)>) {
185 self.auth_headers = Some(headers);
186 }
187
188 pub fn connect<S: Into<Box<dyn Socket>>>(&mut self, socket: S) {
201 if !self.is_disconnected() {
202 panic!("Client has already initiated a connection, cannot initiate a new one. TIP: Check client.is_disconnected() before calling client.connect()");
203 }
204
205 if let Some(auth_bytes) = &self.auth_message {
206 if let Some(auth_headers) = &self.auth_headers {
207 let boxed_socket: Box<dyn Socket> = socket.into();
209 let (id_receiver, packet_sender, packet_receiver) = boxed_socket
210 .connect_with_auth_and_headers(auth_bytes.clone(), auth_headers.clone());
211 self.io.load(id_receiver, packet_sender, packet_receiver);
212 } else {
213 let boxed_socket: Box<dyn Socket> = socket.into();
215 let (id_receiver, packet_sender, packet_receiver) =
216 boxed_socket.connect_with_auth(auth_bytes.clone());
217 self.io.load(id_receiver, packet_sender, packet_receiver);
218 }
219 } else if let Some(auth_headers) = &self.auth_headers {
220 let boxed_socket: Box<dyn Socket> = socket.into();
222 let (id_receiver, packet_sender, packet_receiver) =
223 boxed_socket.connect_with_auth_headers(auth_headers.clone());
224 self.io.load(id_receiver, packet_sender, packet_receiver);
225 } else {
226 let boxed_socket: Box<dyn Socket> = socket.into();
228 let (id_receiver, packet_sender, packet_receiver) = boxed_socket.connect();
229 self.io.load(id_receiver, packet_sender, packet_receiver);
230 }
231 }
232
233 pub fn connection_status(&self) -> ConnectionStatus {
240 if self.is_connected() {
241 if self.is_disconnecting() {
242 ConnectionStatus::Disconnecting
243 } else {
244 ConnectionStatus::Connected
245 }
246 } else {
247 if self.is_disconnected() {
248 return ConnectionStatus::Disconnected;
249 }
250 if self.is_connecting() {
251 return ConnectionStatus::Connecting;
252 }
253 panic!("Client is in an unknown connection state!");
254 }
255 }
256
257 fn is_connecting(&self) -> bool {
259 self.io.is_loaded()
260 }
261
262 fn is_connected(&self) -> bool {
264 self.server_connection.is_some()
265 }
266
267 fn is_disconnecting(&self) -> bool {
269 if let Some(connection) = &self.server_connection {
270 connection.should_drop() || self.manual_disconnect || self.server_disconnect
271 } else {
272 false
273 }
274 }
275
276 fn is_disconnected(&self) -> bool {
278 !self.io.is_loaded()
279 }
280
281 pub fn disconnect(&mut self) {
294 if !self.is_connected() {
295 panic!("Trying to disconnect Client which is not connected yet!")
296 }
297
298 for _ in 0..10 {
299 let writer = self.handshake_manager.write_disconnect();
300 if self.io.send_packet(writer.to_packet()).is_err() {
301 warn!("Client Error: Cannot send disconnect packet to Server");
304 }
305 }
306
307 self.manual_disconnect = true;
308 }
309
310 pub fn socket_config(&self) -> &SocketConfig {
312 &self.protocol.socket
313 }
314
315 pub fn receive_all_packets(&mut self) {
323 self.maintain_socket();
326 }
327
328 pub fn process_all_packets<W: WorldMutType<E>>(&mut self, mut world: W, now: &Instant) {
335 if self.is_disconnecting() {
337 let reason = if self.manual_disconnect || self.server_disconnect {
338 naia_shared::DisconnectReason::ClientDisconnected
339 } else {
340 naia_shared::DisconnectReason::TimedOut
341 };
342 self.disconnect_with_events(&mut world, reason);
343 return;
344 }
345
346 let Some(connection) = &mut self.server_connection else {
347 return;
348 };
349
350 let entity_events = connection.process_packets(
352 &mut self.global_entity_map,
353 &mut self.global_world_manager,
354 &self.protocol,
355 &mut world,
356 now,
357 &mut self.incoming_world_events,
358 );
359
360 self.process_entity_events(&mut world, entity_events);
361 }
362
363 pub fn take_world_events(&mut self) -> Events<E> {
372 std::mem::take(&mut self.incoming_world_events)
373 }
374
375 pub fn take_tick_events(&mut self, now: &Instant) -> TickEvents {
384 let Some(connection) = &mut self.server_connection else {
385 return TickEvents::default();
386 };
387
388 let (receiving_tick_happened, sending_tick_happened) =
389 connection.time_manager.collect_ticks(now);
390
391 let should_read_packets = match self.client_config.jitter_buffer {
394 JitterBufferType::Bypass => true,
395 JitterBufferType::Real => receiving_tick_happened.is_some(),
396 };
397
398 if should_read_packets {
399 if let Err(_err) = connection.read_buffered_packets(
401 &self.protocol.channel_kinds,
402 &self.protocol.message_kinds,
403 &self.protocol.component_kinds,
404 ) {
405 warn!("Error reading from buffered packet!");
407 }
408 }
409
410 if let Some((prev_receiving_tick, current_receiving_tick)) = receiving_tick_happened {
411 let mut index_tick = prev_receiving_tick.wrapping_add(1);
412 loop {
413 self.incoming_tick_events.push_server_tick(index_tick);
414
415 if index_tick == current_receiving_tick {
416 break;
417 }
418 index_tick = index_tick.wrapping_add(1);
419 }
420 }
421
422 if let Some((prev_sending_tick, current_sending_tick)) = sending_tick_happened {
423 let mut index_tick = prev_sending_tick.wrapping_add(1);
425 loop {
426 self.incoming_tick_events.push_client_tick(index_tick);
427
428 if index_tick == current_sending_tick {
429 break;
430 }
431 index_tick = index_tick.wrapping_add(1);
432 }
433 }
434
435 std::mem::take(&mut self.incoming_tick_events)
436 }
437
438 pub fn send_all_packets<W: WorldRefType<E>>(&mut self, world: W) {
445 if let Some(connection) = &mut self.server_connection {
446 let now = Instant::now();
447
448 connection.send_packets(
450 &self.protocol,
451 &now,
452 &mut self.io,
453 &world,
454 &self.global_entity_map,
455 &self.global_world_manager,
456 );
457 } else if self.io.is_loaded() {
458 if let Some(outgoing_packet) = self.handshake_manager.send() {
459 if self.io.send_packet(outgoing_packet).is_err() {
460 warn!("Client Error: Cannot send handshake packet to Server");
463 }
464 }
465 }
466 }
467
468 pub fn send_message<C: Channel, M: Message>(
485 &mut self,
486 message: &M,
487 ) -> Result<(), NaiaClientError> {
488 let cloned_message = M::clone_box(message);
489 self.send_message_inner(&ChannelKind::of::<C>(), cloned_message)
490 }
491
492 fn send_message_inner(
493 &mut self,
494 channel_kind: &ChannelKind,
495 message_box: Box<dyn Message>,
496 ) -> Result<(), NaiaClientError> {
497 let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
498 if !channel_settings.can_send_to_server() {
499 return Err(NaiaClientError::Message(
500 "Cannot send message to Server on this Channel".to_string(),
501 ));
502 }
503
504 if channel_settings.tick_buffered() {
505 return Err(NaiaClientError::Message("Cannot call `Client.send_message()` on a Tick Buffered Channel, use `Client.send_tick_buffered_message()` instead".to_string()));
506 }
507
508 if let Some(connection) = &mut self.server_connection {
509 let mut converter = connection
510 .base
511 .world_manager
512 .entity_converter_mut(&self.global_world_manager);
513 let message = MessageContainer::new(message_box);
514 let accepted = connection.base.message_manager.send_message(
515 &self.protocol.message_kinds,
516 &mut converter,
517 channel_kind,
518 message,
519 );
520 if !accepted {
521 return Err(NaiaClientError::MessageQueueFull);
522 }
523 } else {
524 self.waitlist_messages
525 .push_back((*channel_kind, message_box));
526 }
527 Ok(())
528 }
529
530 pub fn send_request<C: Channel, Q: Request>(
544 &mut self,
545 request: &Q,
546 ) -> Result<ResponseReceiveKey<Q::Response>, NaiaClientError> {
547 let cloned_request = Q::clone_box(request);
548 let id = self.send_request_inner(&ChannelKind::of::<C>(), cloned_request)?;
550 Ok(ResponseReceiveKey::new(id))
551 }
552
553 fn send_request_inner(
554 &mut self,
555 channel_kind: &ChannelKind,
556 request_box: Box<dyn Message>,
558 ) -> Result<GlobalRequestId, NaiaClientError> {
559 let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
560
561 if !channel_settings.can_request_and_respond() {
562 std::panic!("Requests can only be sent over Bidirectional, Reliable Channels");
563 }
564
565 let Some(connection) = &mut self.server_connection else {
566 warn!("currently not connected to server");
567 return Err(NaiaClientError::Message(
568 "currently not connected to server".to_string(),
569 ));
570 };
571 let mut converter = connection
572 .base
573 .world_manager
574 .entity_converter_mut(&self.global_world_manager);
575
576 let request_id = connection.global_request_manager.create_request_id();
577 let message = MessageContainer::new(request_box);
578 connection.base.message_manager.send_request(
579 &self.protocol.message_kinds,
580 &mut converter,
581 channel_kind,
582 request_id,
583 message,
584 );
585
586 Ok(request_id)
587 }
588
589 pub fn send_response<S: Response>(
597 &mut self,
598 response_key: &ResponseSendKey<S>,
599 response: &S,
600 ) -> bool {
601 let response_id = response_key.response_id();
602
603 let cloned_response = S::clone_box(response);
604
605 self.send_response_inner(&response_id, cloned_response)
606 }
607
608 fn send_response_inner(
610 &mut self,
611 response_id: &GlobalResponseId,
612 response_box: Box<dyn Message>,
613 ) -> bool {
614 let Some(connection) = &mut self.server_connection else {
615 return false;
616 };
617 let Some((channel_kind, local_response_id)) = connection
618 .global_response_manager
619 .destroy_response_id(response_id)
620 else {
621 return false;
622 };
623 let mut converter = connection
624 .base
625 .world_manager
626 .entity_converter_mut(&self.global_world_manager);
627
628 let response = MessageContainer::new(response_box);
629 connection.base.message_manager.send_response(
630 &self.protocol.message_kinds,
631 &mut converter,
632 &channel_kind,
633 local_response_id,
634 response,
635 );
636 true
637 }
638
639 pub fn has_response<S: Response>(&self, response_key: &ResponseReceiveKey<S>) -> bool {
645 let Some(connection) = &self.server_connection else {
646 return false;
647 };
648 let request_id = response_key.request_id();
649 connection.global_request_manager.has_response(&request_id)
650 }
651
652 pub fn receive_response<S: Response>(
658 &mut self,
659 response_key: &ResponseReceiveKey<S>,
660 ) -> Option<S> {
661 let Some(connection) = &mut self.server_connection else {
662 return None;
663 };
664 let request_id = response_key.request_id();
665 let container = connection
666 .global_request_manager
667 .destroy_request_id(&request_id)?;
668 let response: S = Box::<dyn Any + 'static>::downcast::<S>(container.to_boxed_any())
669 .ok()
670 .map(|boxed_s| *boxed_s)
671 .unwrap();
672 Some(response)
673 }
674 fn on_connect(&mut self) {
677 let messages = std::mem::take(&mut self.waitlist_messages);
679 for (channel_kind, message_box) in messages {
680 let _ = self.send_message_inner(&channel_kind, message_box);
681 }
682 }
683
684 pub fn send_tick_buffer_message<C: Channel, M: Message>(&mut self, tick: &Tick, message: &M) {
696 let cloned_message = M::clone_box(message);
697 self.send_tick_buffer_message_inner(tick, &ChannelKind::of::<C>(), cloned_message);
698 }
699
700 fn send_tick_buffer_message_inner(
701 &mut self,
702 tick: &Tick,
703 channel_kind: &ChannelKind,
704 message_box: Box<dyn Message>,
705 ) {
706 let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
707
708 if !channel_settings.can_send_to_server() {
709 panic!("Cannot send message to Server on this Channel");
710 }
711
712 if !channel_settings.tick_buffered() {
713 panic!("Can only use `Client.send_tick_buffer_message()` on a Channel that is configured for it.");
714 }
715
716 if let Some(connection) = self.server_connection.as_mut() {
717 let message = MessageContainer::new(message_box);
718 connection
719 .tick_buffer
720 .send_message(tick, channel_kind, message);
721 }
722 }
723
724 pub fn spawn_entity<W: WorldMutType<E>>(&'_ mut self, mut world: W) -> EntityMut<'_, E, W> {
739 self.check_client_authoritative_allowed();
740
741 let world_entity = world.spawn_entity();
742
743 self.spawn_entity_inner(&world_entity);
744
745 EntityMut::new(self, world, &world_entity)
746 }
747
748 fn spawn_entity_inner(&mut self, world_entity: &E) {
750 let global_entity = self.global_entity_map.spawn(*world_entity, None);
751
752 self.global_world_manager.host_spawn_entity(&global_entity);
753
754 let Some(connection) = &mut self.server_connection else {
755 return;
756 };
757 let component_kinds = self
758 .global_world_manager
759 .component_kinds(&global_entity)
760 .unwrap();
761 connection
762 .base
763 .world_manager
764 .host_init_entity(&global_entity, component_kinds, &self.protocol.component_kinds, false);
765 }
766
767 pub fn has_resource<R: 'static>(&self) -> bool {
775 self.resource_registry.entity_for::<R>().is_some()
776 }
777
778 pub fn resource_entity<R: 'static>(&self) -> Option<E> {
781 let global_entity = self.resource_registry.entity_for::<R>()?;
782 self.global_entity_map
783 .global_entity_to_entity(&global_entity)
784 .ok()
785 }
786
787 pub fn is_resource_entity(&self, world_entity: &E) -> bool {
790 let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(world_entity)
791 else {
792 return false;
793 };
794 self.resource_registry.is_resource_entity(&global_entity)
795 }
796
797 pub fn resources_count(&self) -> usize {
799 self.resource_registry.len()
800 }
801
802 pub fn resource_entities(&self) -> Vec<E> {
804 let mut out = Vec::with_capacity(self.resource_registry.len());
805 for global_entity in self.resource_registry.entities() {
806 if let Ok(e) = self
807 .global_entity_map
808 .global_entity_to_entity(global_entity)
809 {
810 out.push(e);
811 }
812 }
813 out
814 }
815
816 pub fn entity<W: WorldRefType<E>>(&'_ self, world: W, entity: &E) -> EntityRef<'_, E, W> {
822 if world.has_entity(entity) {
823 return EntityRef::new(self, world, entity);
824 }
825 panic!("No Entity exists for given Key!");
826 }
827
828 pub fn entity_mut<W: WorldMutType<E>>(
835 &'_ mut self,
836 world: W,
837 entity: &E,
838 ) -> EntityMut<'_, E, W> {
839 self.check_client_authoritative_allowed();
840 if world.has_entity(entity) {
841 return EntityMut::new(self, world, entity);
842 }
843 panic!("No Entity exists for given Key!");
844 }
845
846 pub fn entities<W: WorldRefType<E>>(&self, world: &W) -> Vec<E> {
848 world.entities()
849 }
850
851 pub(crate) fn entity_owner(&self, world_entity: &E) -> EntityOwner {
852 if let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(world_entity) {
853 if let Some(owner) = self.global_world_manager.entity_owner(&global_entity) {
854 return owner;
855 }
856 }
857 EntityOwner::Local
858 }
859
860 pub fn enable_entity_replication(&mut self, entity: &E) {
871 self.check_client_authoritative_allowed();
872 self.spawn_entity_inner(entity);
873 }
874
875 pub fn disable_entity_replication(&mut self, entity: &E) {
883 self.check_client_authoritative_allowed();
884 self.despawn_entity_worldless(entity);
886 }
887
888 pub fn entity_replication_config(&self, world_entity: &E) -> Option<Publicity> {
896 self.check_client_authoritative_allowed();
897 let global_entity = self
898 .global_entity_map
899 .entity_to_global_entity(world_entity)
900 .unwrap();
901 self.global_world_manager
902 .entity_replication_config(&global_entity)
903 }
904
905 pub fn configure_entity_replication<W: WorldMutType<E>>(
918 &mut self,
919 world: &mut W,
920 world_entity: &E,
921 config: Publicity,
922 ) {
923 self.check_client_authoritative_allowed();
924 let global_entity = self
925 .global_entity_map
926 .entity_to_global_entity(world_entity)
927 .unwrap();
928 if !self.global_world_manager.has_entity(&global_entity) {
929 panic!("Entity is not yet replicating. Be sure to call `enable_replication` or `spawn_entity` on the Client, before configuring replication.");
930 }
931 let entity_owner = self
932 .global_world_manager
933 .entity_owner(&global_entity)
934 .unwrap();
935 let server_owned = entity_owner.is_server();
936 if server_owned {
937 panic!("Client cannot configure replication strategy of Server-owned Entities.");
938 }
939 let client_owned = entity_owner.is_client();
940 if !client_owned {
941 panic!("Client cannot configure replication strategy of Entities it does not own.");
942 }
943 let next_config = config;
944 let prev_config = self
945 .global_world_manager
946 .entity_replication_config(&global_entity)
947 .unwrap();
948 if prev_config == config {
949 return;
951 }
952 match prev_config {
953 Publicity::Private => {
954 match next_config {
955 Publicity::Private => {
956 panic!("This should not be possible.");
957 }
958 Publicity::Public => {
959 self.publish_entity(&global_entity, true);
961 }
962 Publicity::Delegated => {
963 self.publish_entity(&global_entity, true);
965 self.entity_enable_delegation(world, &global_entity, world_entity, true);
966 }
967 }
968 }
969 Publicity::Public => {
970 match next_config {
971 Publicity::Private => {
972 self.unpublish_entity(&global_entity, true);
974 }
975 Publicity::Public => {
976 panic!("This should not be possible.");
977 }
978 Publicity::Delegated => {
979 self.entity_enable_delegation(world, &global_entity, world_entity, true);
981 }
982 }
983 }
984 Publicity::Delegated => {
985 panic!(
986 "Delegated Entities are always ultimately Server-owned. Client cannot modify."
987 )
988 }
989 }
990 }
991
992 pub fn entity_authority_status(&self, world_entity: &E) -> Option<EntityAuthStatus> {
999 self.check_client_authoritative_allowed();
1000
1001 let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(world_entity) else {
1002 return None;
1003 };
1004
1005 self.global_world_manager.entity_authority_status(&global_entity)
1006 }
1007
1008 pub fn entity_request_authority(&mut self, world_entity: &E) -> Result<(), AuthorityError> {
1023 self.check_client_authoritative_allowed();
1024
1025 let global_entity = self
1026 .global_entity_map
1027 .entity_to_global_entity(world_entity)
1028 .unwrap();
1029
1030 let result = self
1032 .global_world_manager
1033 .entity_request_authority(&global_entity);
1034
1035 if result.is_ok() {
1036 let Some(connection) = &mut self.server_connection else {
1038 return result;
1039 };
1040
1041 connection
1042 .base
1043 .world_manager
1044 .remote_send_request_auth(&global_entity);
1045 }
1046 result
1047 }
1048
1049 pub fn entity_release_authority(&mut self, world_entity: &E) -> Result<(), AuthorityError> {
1061 self.check_client_authoritative_allowed();
1062
1063 let global_entity = self
1064 .global_entity_map
1065 .entity_to_global_entity(world_entity)
1066 .unwrap();
1067
1068 let result = self
1070 .global_world_manager
1071 .entity_release_authority(&global_entity);
1072 if result.is_ok() {
1073 let Some(connection) = &mut self.server_connection else {
1074 return result;
1075 };
1076 connection
1077 .base
1078 .world_manager
1079 .remote_send_release_auth(&global_entity);
1080 }
1081 result
1082 }
1083
1084 pub fn server_address(&self) -> Result<SocketAddr, NaiaClientError> {
1092 self.io.server_addr()
1093 }
1094
1095 pub fn rtt(&self) -> f32 {
1099 self.server_connection
1100 .as_ref()
1101 .map(|conn| conn.time_manager.rtt() / 1000.0)
1102 .unwrap_or(0.0)
1103 }
1104
1105 pub fn jitter(&self) -> f32 {
1110 self.server_connection
1111 .as_ref()
1112 .map(|conn| conn.time_manager.jitter() / 1000.0)
1113 .unwrap_or(0.0)
1114 }
1115
1116 pub fn client_tick(&self) -> Option<Tick> {
1125 let connection = self.server_connection.as_ref()?;
1126 Some(connection.time_manager.client_sending_tick)
1127 }
1128
1129 pub fn client_instant(&self) -> Option<GameInstant> {
1132 let connection = self.server_connection.as_ref()?;
1133 Some(connection.time_manager.client_sending_instant)
1134 }
1135
1136 pub fn server_tick(&self) -> Option<Tick> {
1142 let connection = self.server_connection.as_ref()?;
1143 Some(connection.time_manager.client_receiving_tick)
1144 }
1145
1146 pub fn server_instant(&self) -> Option<GameInstant> {
1149 let connection = self.server_connection.as_ref()?;
1150 Some(connection.time_manager.client_receiving_instant)
1151 }
1152
1153 pub fn tick_to_instant(&self, tick: Tick) -> Option<GameInstant> {
1156 if let Some(connection) = &self.server_connection {
1157 return Some(connection.time_manager.tick_to_instant(tick));
1158 }
1159 None
1160 }
1161
1162 pub fn tick_duration(&self) -> Option<Duration> {
1165 if let Some(connection) = &self.server_connection {
1166 return Some(connection.time_manager.tick_duration());
1167 }
1168 None
1169 }
1170
1171 pub fn client_interpolation(&self) -> Option<f32> {
1179 if let Some(connection) = &self.server_connection {
1180 return Some(connection.time_manager.client_interpolation());
1181 }
1182 None
1183 }
1184
1185 pub fn server_interpolation(&self) -> Option<f32> {
1192 if let Some(connection) = &self.server_connection {
1193 return Some(connection.time_manager.server_interpolation());
1194 }
1195 None
1196 }
1197
1198 pub fn outgoing_bandwidth(&self) -> f32 {
1203 self.io.outgoing_bandwidth()
1204 }
1205
1206 pub fn incoming_bandwidth(&self) -> f32 {
1209 self.io.incoming_bandwidth()
1210 }
1211
1212 pub fn connection_stats(&self) -> Option<ConnectionStats> {
1217 let conn = self.server_connection.as_ref()?;
1218 let rtt_ms = conn.time_manager.rtt();
1219 let jitter_ms = conn.time_manager.jitter();
1220 let packet_loss_pct = conn.base.packet_loss_pct();
1221 Some(ConnectionStats {
1222 rtt_ms,
1223 rtt_p50_ms: rtt_ms,
1224 rtt_p99_ms: conn.time_manager.rtt_p99_ms(),
1225 jitter_ms,
1226 packet_loss_pct,
1227 kbps_sent: self.io.outgoing_bandwidth(),
1228 kbps_recv: self.io.incoming_bandwidth(),
1229 })
1230 }
1231
1232 pub(crate) fn despawn_entity<W: WorldMutType<E>>(&mut self, world: &mut W, entity: &E) {
1238 if !world.has_entity(entity) {
1239 panic!("attempted to de-spawn nonexistent entity");
1240 }
1241
1242 world.despawn_entity(entity);
1244
1245 self.despawn_entity_worldless(entity);
1247 }
1248
1249 pub fn despawn_entity_worldless(&mut self, world_entity: &E) {
1263 let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(world_entity) else {
1264 warn!("attempting to despawn entity that has already been despawned?");
1265 return;
1266 };
1267 if !self.global_world_manager.has_entity(&global_entity) {
1268 warn!("attempting to despawn entity that has already been despawned?");
1269 return;
1270 }
1271
1272 if let Some(owner) = self.global_world_manager.entity_owner(&global_entity) {
1274 if owner.is_server() {
1275 let is_delegated = self
1276 .global_world_manager
1277 .entity_is_delegated(&global_entity);
1278 if !is_delegated {
1279 panic!("attempting to despawn entity that is not yet delegated. Delegation needs some time to be confirmed by the Server, so check that a despawn is possible by calling `commands.entity(..).replication_config(..).is_delegated()` first.");
1280 }
1281 if self
1282 .global_world_manager
1283 .entity_authority_status(&global_entity)
1284 != Some(EntityAuthStatus::Granted)
1285 {
1286 panic!("attempting to despawn entity that we do not have authority over");
1287 }
1288 }
1289 } else {
1290 panic!("attempting to despawn entity that has no owner");
1291 }
1292
1293 if let Some(connection) = &mut self.server_connection {
1294 connection.base.world_manager.despawn_entity(&global_entity);
1296 }
1297
1298 self.global_world_manager
1300 .host_despawn_entity(&global_entity);
1301 }
1302
1303 pub(crate) fn insert_component<R: ReplicatedComponent, W: WorldMutType<E>>(
1305 &mut self,
1306 world: &mut W,
1307 entity: &E,
1308 mut component: R,
1309 ) {
1310 if !world.has_entity(entity) {
1311 panic!("attempted to add component to non-existent entity");
1312 }
1313
1314 let component_kind = component.kind();
1315
1316 if let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(entity) {
1321 let owner = self.global_world_manager.entity_owner(&global_entity);
1322 let is_delegated = self
1323 .global_world_manager
1324 .entity_is_delegated(&global_entity);
1325
1326 let can_mutate = if is_delegated {
1327 self.global_world_manager
1329 .entity_authority_status(&global_entity)
1330 == Some(EntityAuthStatus::Granted)
1331 } else if let Some(owner) = owner {
1332 owner.is_client()
1334 } else {
1335 false
1337 };
1338
1339 if !can_mutate {
1340 return;
1342 }
1343 }
1344
1345 if world.has_component_of_kind(entity, &component_kind) {
1346 let Some(mut component_mut) = world.component_mut::<R>(entity) else {
1349 panic!("Should never happen because we checked for this above");
1350 };
1351 component_mut.mirror(&component);
1352 } else {
1353 self.insert_component_worldless(entity, &mut component);
1356
1357 world.insert_component(entity, component);
1359 }
1360 }
1361
1362 pub fn component_name(&self, component_kind: &ComponentKind) -> String {
1365 self.protocol.component_kinds.kind_to_name(component_kind)
1366 }
1367
1368 pub fn insert_component_worldless(&mut self, world_entity: &E, component: &mut dyn Replicate) {
1376 let component_kind = component.kind();
1377
1378 let global_entity = self
1379 .global_entity_map
1380 .entity_to_global_entity(world_entity)
1381 .unwrap();
1382
1383 if self
1390 .global_world_manager
1391 .component_already_host_registered(&global_entity, &component_kind)
1392 {
1393 return;
1394 }
1395
1396 self.global_world_manager.host_insert_component(
1401 &self.protocol.component_kinds,
1402 &global_entity,
1403 component,
1404 );
1405
1406 if let Some(connection) = &mut self.server_connection {
1408 if connection
1410 .base
1411 .world_manager
1412 .has_global_entity(&global_entity)
1413 {
1414 connection
1415 .base
1416 .world_manager
1417 .insert_component(&global_entity, &component_kind);
1418 } else {
1419 warn!("Attempting to insert component into a non-existent entity in the server connection. This should not happen.");
1420 }
1421 } else {
1422 warn!("Attempting to insert component into a non-existent entity in the server connection. This should not happen.");
1423 }
1424
1425 if self
1427 .global_world_manager
1428 .entity_is_delegated(&global_entity)
1429 {
1430 let accessor = self
1431 .global_world_manager
1432 .get_entity_auth_accessor(&global_entity);
1433 component.enable_delegation(&accessor, None)
1434 }
1435 }
1436
1437 pub(crate) fn remove_component<R: ReplicatedComponent, W: WorldMutType<E>>(
1439 &mut self,
1440 world: &mut W,
1441 entity: &E,
1442 ) -> Option<R> {
1443 let component_kind = ComponentKind::of::<R>();
1445
1446 self.remove_component_worldless(entity, &component_kind);
1447
1448 world.remove_component::<R>(entity)
1450 }
1451
1452 pub fn remove_component_worldless(&mut self, world_entity: &E, component_kind: &ComponentKind) {
1460 let global_entity = self
1461 .global_entity_map
1462 .entity_to_global_entity(world_entity)
1463 .unwrap();
1464
1465 if let Some(connection) = &mut self.server_connection {
1467 connection
1468 .base
1469 .world_manager
1470 .remove_component(&global_entity, component_kind);
1471 }
1472
1473 self.global_world_manager
1475 .host_remove_component(&global_entity, component_kind);
1476 }
1477
1478 pub(crate) fn publish_entity(&mut self, global_entity: &GlobalEntity, client_is_origin: bool) {
1479 if client_is_origin {
1480 let Some(connection) = &mut self.server_connection else {
1482 return;
1483 };
1484 connection
1485 .base
1486 .world_manager
1487 .send_publish(HostType::Client, global_entity);
1488 } else if self
1489 .global_world_manager
1490 .entity_replication_config(global_entity)
1491 != Some(Publicity::Private)
1492 {
1493 panic!("Server can only publish Private entities");
1494 }
1495 self.global_world_manager.entity_publish(global_entity);
1496 }
1498
1499 pub(crate) fn unpublish_entity(
1500 &mut self,
1501 global_entity: &GlobalEntity,
1502 client_is_origin: bool,
1503 ) {
1504 if client_is_origin {
1505 let Some(connection) = &mut self.server_connection else {
1507 return;
1508 };
1509 connection
1510 .base
1511 .world_manager
1512 .send_unpublish(HostType::Client, global_entity);
1513 } else if self
1514 .global_world_manager
1515 .entity_replication_config(global_entity)
1516 != Some(Publicity::Public)
1517 {
1518 panic!("Server can only unpublish Public entities");
1519 }
1520 self.global_world_manager.entity_unpublish(global_entity);
1521 }
1523
1524 pub(crate) fn entity_enable_delegation<W: WorldMutType<E>>(
1525 &mut self,
1526 world: &mut W,
1527 global_entity: &GlobalEntity,
1528 world_entity: &E,
1529 client_is_origin: bool,
1530 ) {
1531 self.global_world_manager
1533 .entity_register_auth_for_delegation(global_entity);
1534
1535 if client_is_origin {
1536 let Some(connection) = &mut self.server_connection else {
1543 return;
1544 };
1545 connection.base.world_manager.send_enable_delegation(
1546 HostType::Client,
1547 true,
1548 global_entity,
1549 );
1550 } else {
1551 self.entity_complete_delegation(world, global_entity, world_entity);
1552 for component_kind in world.component_kinds(world_entity) {
1553 if !self
1554 .global_world_manager
1555 .entity_has_component(global_entity, &component_kind)
1556 {
1557 self.global_world_manager
1558 .remote_insert_component(global_entity, &component_kind);
1559 }
1560 }
1561 self.global_world_manager
1562 .entity_update_authority(global_entity, EntityAuthStatus::Available);
1563 }
1564 }
1565
1566 fn entity_complete_delegation<W: WorldMutType<E>>(
1567 &mut self,
1568 world: &mut W,
1569 global_entity: &GlobalEntity,
1570 world_entity: &E,
1571 ) {
1572 world.entity_enable_delegation(
1575 &self.protocol.component_kinds,
1576 &self.global_entity_map,
1577 &self.global_world_manager,
1578 world_entity,
1579 );
1580
1581 self.global_world_manager
1583 .entity_enable_delegation(global_entity);
1584 }
1585
1586 pub(crate) fn entity_disable_delegation<W: WorldMutType<E>>(
1587 &mut self,
1588 world: &mut W,
1589 global_entity: &GlobalEntity,
1590 world_entity: &E,
1591 client_is_origin: bool,
1592 ) {
1593 info!("client.entity_disable_delegation");
1594 if client_is_origin {
1595 panic!("Cannot disable delegation from Client. Server owns all delegated Entities.");
1596 }
1597
1598 let had_granted = self
1600 .global_world_manager
1601 .entity_authority_status(global_entity)
1602 == Some(EntityAuthStatus::Granted);
1603
1604 self.global_world_manager
1606 .entity_disable_delegation(global_entity);
1607 world.entity_disable_delegation(world_entity);
1608
1609 if had_granted {
1611 self.incoming_world_events.push_auth_reset(*world_entity);
1612 }
1613
1614 if let Some(connection) = &mut self.server_connection {
1616 connection.base.world_manager.despawn_entity(global_entity);
1617 }
1618
1619 }
1623
1624 pub(crate) fn entity_update_authority(
1625 &mut self,
1626 global_entity: &GlobalEntity,
1627 world_entity: &E,
1628 new_auth_status: EntityAuthStatus,
1629 ) {
1630 let old_auth_status = self
1631 .global_world_manager
1632 .entity_authority_status(global_entity)
1633 .unwrap();
1634
1635 self.global_world_manager
1636 .entity_update_authority(global_entity, new_auth_status);
1637
1638 #[cfg(feature = "e2e_debug")]
1640 if new_auth_status == EntityAuthStatus::Granted {
1641 use crate::counters::CLIENT_HANDLE_SET_AUTH;
1642 use std::sync::atomic::Ordering;
1643 CLIENT_HANDLE_SET_AUTH.fetch_add(1, Ordering::Relaxed);
1644 }
1645
1646 if let Some(connection) = &mut self.server_connection {
1649 let channel_status_before = connection
1651 .base
1652 .world_manager
1653 .get_remote_entity_auth_status(global_entity);
1654
1655 if channel_status_before.is_some() {
1657 connection
1658 .base
1659 .world_manager
1660 .remote_receive_set_auth(global_entity, new_auth_status);
1661 } else {
1662 warn!(
1663 "Entity {:?} not yet migrated to RemoteEntity - channel sync skipped",
1664 global_entity
1665 );
1666 }
1667 } else {
1668 debug!(" No server connection - skipping channel sync");
1669 }
1670
1671 match (old_auth_status, new_auth_status) {
1678 (EntityAuthStatus::Requested, EntityAuthStatus::Granted)
1680 | (EntityAuthStatus::Denied, EntityAuthStatus::Granted)
1681 | (EntityAuthStatus::Available, EntityAuthStatus::Granted) => {
1682 self.server_connection
1684 .as_mut()
1685 .unwrap()
1686 .base
1687 .world_manager
1688 .register_authed_entity(&self.global_world_manager, global_entity);
1689 self.incoming_world_events.push_auth_grant(*world_entity);
1690 #[cfg(feature = "e2e_debug")]
1691 {
1692 use crate::counters::CLIENT_EMIT_AUTH_GRANTED_EVENT;
1693 use std::sync::atomic::Ordering;
1694 CLIENT_EMIT_AUTH_GRANTED_EVENT.fetch_add(1, Ordering::Relaxed);
1695 }
1696 }
1697 (EntityAuthStatus::Granted, EntityAuthStatus::Available)
1699 | (EntityAuthStatus::Granted, EntityAuthStatus::Denied) => {
1700 self.server_connection
1702 .as_mut()
1703 .unwrap()
1704 .base
1705 .world_manager
1706 .deregister_authed_entity(&self.global_world_manager, global_entity);
1707 self.incoming_world_events.push_auth_reset(*world_entity);
1708 }
1709 (EntityAuthStatus::Requested, EntityAuthStatus::Denied) => {
1711 self.incoming_world_events.push_auth_deny(*world_entity);
1713 }
1714 (EntityAuthStatus::Releasing, EntityAuthStatus::Available) => {
1716 self.server_connection
1717 .as_mut()
1718 .unwrap()
1719 .base
1720 .world_manager
1721 .deregister_authed_entity(&self.global_world_manager, global_entity);
1722 self.incoming_world_events.push_auth_reset(*world_entity);
1723 }
1724 (EntityAuthStatus::Releasing, EntityAuthStatus::Denied) => {
1725 self.server_connection
1727 .as_mut()
1728 .unwrap()
1729 .base
1730 .world_manager
1731 .deregister_authed_entity(&self.global_world_manager, global_entity);
1732 self.incoming_world_events.push_auth_reset(*world_entity);
1733 }
1734 (EntityAuthStatus::Releasing, EntityAuthStatus::Granted) => {
1735 self.global_world_manager
1737 .entity_update_authority(global_entity, EntityAuthStatus::Available);
1738 }
1739 (EntityAuthStatus::Available, EntityAuthStatus::Denied) => {
1746 self.incoming_world_events.push_auth_deny(*world_entity);
1747 }
1748 (EntityAuthStatus::Denied, EntityAuthStatus::Available) => {
1749 self.incoming_world_events.push_auth_reset(*world_entity);
1751 }
1752 (EntityAuthStatus::Available, EntityAuthStatus::Available)
1753 | (EntityAuthStatus::Denied, EntityAuthStatus::Denied)
1754 | (EntityAuthStatus::Granted, EntityAuthStatus::Granted)
1755 | (EntityAuthStatus::Requested, EntityAuthStatus::Requested)
1756 | (EntityAuthStatus::Releasing, EntityAuthStatus::Releasing) => {
1757 }
1765 (_, _) => {
1766 panic!(
1767 "-- Entity {:?} updated authority, not handled -- {:?} -> {:?}",
1768 global_entity, old_auth_status, new_auth_status
1769 );
1770 }
1771 }
1772 }
1773
1774 fn check_client_authoritative_allowed(&self) {
1777 if !self.protocol.client_authoritative_entities {
1778 panic!("Cannot perform this operation: Client Authoritative Entities are not enabled! Enable them in the Protocol, with the `enable_client_authoritative_entities() method, and note that if you do enable them, to make sure you handle all Spawn/Insert/Update events in the Server, as this may be an attack vector.")
1779 }
1780 }
1781
1782 fn maintain_socket(&mut self) {
1783 self.io.tick_bandwidth_monitors();
1785
1786 if self.server_connection.is_none() {
1787 self.maintain_handshake();
1788 }
1789 if self.server_connection.is_some() {
1794 self.maintain_connection();
1795 }
1796 }
1797
1798 fn maintain_handshake(&mut self) {
1799 if !self.io.is_loaded() {
1802 return;
1803 }
1804
1805 if !self.io.is_authenticated() {
1806 match self.io.recv_auth() {
1807 IdentityReceiverResult::Success(id_token) => {
1808 self.handshake_manager.set_identity_token(id_token);
1809 }
1810 IdentityReceiverResult::Waiting => {
1811 return;
1812 }
1813 IdentityReceiverResult::ErrorResponseCode(code) => {
1814 let old_socket_addr_result = self.io.server_addr();
1815
1816 self.io = Io::new(
1818 &self.client_config.connection.bandwidth_measure_duration,
1819 &self.protocol.compression,
1820 );
1821
1822 if code == 401 {
1823 match old_socket_addr_result {
1825 Ok(old_socket_addr) => {
1826 self.incoming_world_events
1827 .push_rejection(&old_socket_addr, RejectReason::Auth);
1828 }
1829 Err(err) => {
1830 self.incoming_world_events.push_error(err);
1831 }
1832 }
1833 } else {
1834 self.incoming_world_events
1836 .push_error(NaiaClientError::IdError(code));
1837 }
1838
1839 return;
1840 }
1841 }
1842 }
1843
1844 loop {
1846 match self.io.recv_reader() {
1847 Ok(Some(mut reader)) => {
1848 match self.handshake_manager.recv(&mut reader) {
1849 Some(HandshakeResult::Connected(time_manager)) => {
1850 self.server_connection = Some(Connection::new(
1852 &self.client_config.connection,
1853 &self.protocol.channel_kinds,
1854 *time_manager,
1855 &self.global_world_manager,
1856 self.client_config.jitter_buffer,
1857 &self.protocol.component_kinds,
1858 ));
1859 self.on_connect();
1860
1861 let server_addr = self.server_address_unwrapped();
1862 self.incoming_world_events.push_connection(&server_addr);
1863
1864 break;
1870 }
1871 Some(HandshakeResult::Rejected(reason)) => {
1872 info!("Client: Received HandshakeResult::Rejected({:?})", reason);
1873 let server_addr = self.server_address_unwrapped();
1874 self.incoming_world_events
1875 .push_rejection(&server_addr, reason);
1876 self.disconnect_reset_connection();
1877 break;
1878 }
1879 None => {}
1880 }
1881 }
1882 Ok(None) => {
1883 break;
1884 }
1885 Err(error) => {
1886 self.incoming_world_events
1887 .push_error(NaiaClientError::Wrapped(Box::new(error)));
1888 }
1889 }
1890 }
1891 }
1892
1893 fn maintain_connection(&mut self) {
1894 let Some(connection) = self.server_connection.as_mut() else {
1897 panic!("Should have checked for this above");
1898 };
1899
1900 Self::handle_heartbeats(connection, &mut self.io);
1901 Self::handle_pings(connection, &mut self.io);
1902 Self::handle_empty_acks(connection, &mut self.io);
1903
1904 let mut received_any = false;
1905
1906 loop {
1908 match self.io.recv_reader() {
1909 Ok(Some(mut reader)) => {
1910 connection.mark_heard();
1911
1912 let header = match StandardHeader::de(&mut reader) {
1913 Ok(h) => h,
1914 Err(_e) => {
1915 continue;
1916 }
1917 };
1918 match header.packet_type {
1919 PacketType::Data => {
1920 #[cfg(feature = "e2e_debug")]
1922 {
1923 use crate::counters::CLIENT_WORLD_PKTS_RECV;
1924 use std::sync::atomic::Ordering;
1925 CLIENT_WORLD_PKTS_RECV.fetch_add(1, Ordering::Relaxed);
1926 }
1927 }
1929 PacketType::Heartbeat | PacketType::Ping | PacketType::Pong => {
1930 }
1933 PacketType::Handshake => {
1934 let Ok(handshake_header) = HandshakeHeader::de(&mut reader) else {
1937 warn!("unable to parse handshake header from server");
1938 continue;
1939 };
1940 if matches!(handshake_header, HandshakeHeader::Disconnect) {
1941 info!("Received disconnect from server");
1942 self.server_disconnect = true;
1943 }
1944 continue;
1945 }
1946 }
1947
1948 received_any = true;
1950 connection.process_incoming_header(&header);
1951
1952 let Ok(server_tick) = Tick::de(&mut reader) else {
1954 warn!("unable to parse server_tick from packet");
1955 continue;
1956 };
1957
1958 let Ok(server_tick_instant) = GameInstant::de(&mut reader) else {
1960 warn!("unable to parse server_tick_instant from packet");
1961 continue;
1962 };
1963
1964 connection
1965 .time_manager
1966 .recv_tick_instant(&server_tick, &server_tick_instant);
1967
1968 match header.packet_type {
1970 PacketType::Data => {
1971 connection.base.mark_should_send_empty_ack();
1972
1973 if connection
1974 .buffer_data_packet(&server_tick, &mut reader)
1975 .is_err()
1976 {
1977 warn!("unable to parse data packet");
1978 continue;
1979 }
1980 }
1981 PacketType::Heartbeat => {
1982 }
1984 PacketType::Ping => {
1985 let Ok(ping_index) = BaseTimeManager::read_ping(&mut reader) else {
1986 panic!("unable to read ping index");
1987 };
1988 BaseTimeManager::send_pong(connection, &mut self.io, ping_index);
1989 }
1990 PacketType::Pong => {
1991 if connection.time_manager.read_pong(&mut reader).is_err() {
1992 warn!("Client Error: Cannot process pong packet from Server");
1995 }
1996 }
1997 _ => {
1998 }
2001 }
2002 }
2003 Ok(None) => {
2004 break;
2005 }
2006 Err(error) => {
2007 self.incoming_world_events
2008 .push_error(NaiaClientError::Wrapped(Box::new(error)));
2009 }
2010 }
2011 }
2012
2013 if received_any {
2014 connection.process_received_commands();
2015 }
2016 }
2017
2018 fn handle_heartbeats(connection: &mut Connection, io: &mut Io) {
2019 if connection.base.should_send_heartbeat() {
2021 Self::send_heartbeat_packet(connection, io);
2022 }
2023 }
2024
2025 fn handle_empty_acks(connection: &mut Connection, io: &mut Io) {
2026 if connection.base.should_send_empty_ack() {
2028 Self::send_heartbeat_packet(connection, io);
2029 }
2030 }
2031
2032 fn send_heartbeat_packet(connection: &mut Connection, io: &mut Io) {
2033 let mut writer = BitWriter::new();
2034
2035 let _header = connection
2037 .base
2038 .write_header(PacketType::Heartbeat, &mut writer);
2039
2040 if io.send_packet(writer.to_packet()).is_err() {
2042 warn!("Client Error: Cannot send heartbeat packet to Server");
2045 }
2046 connection.mark_sent();
2047 }
2048
2049 fn handle_pings(connection: &mut Connection, io: &mut Io) {
2050 if connection.time_manager.send_ping(io) {
2052 connection.mark_sent();
2053 }
2054 }
2055
2056 fn disconnect_with_events<W: WorldMutType<E>>(&mut self, world: &mut W, reason: naia_shared::DisconnectReason) {
2057 let server_addr = self.server_address_unwrapped();
2058
2059 self.incoming_world_events.clear();
2060 self.incoming_tick_events.clear();
2061
2062 self.despawn_all_remote_entities(world);
2063 self.disconnect_reset_connection();
2064
2065 self.incoming_world_events.push_disconnection(&server_addr, reason);
2066 }
2067
2068 fn despawn_all_remote_entities<W: WorldMutType<E>>(&mut self, world: &mut W) {
2069 let Some(connection) = self.server_connection.as_mut() else {
2073 panic!("Client is already disconnected!");
2074 };
2075
2076 let remote_entities = connection.base.world_manager.remote_entities();
2077 let entity_events = SharedGlobalWorldManager::despawn_all_entities(
2078 world,
2079 &self.global_entity_map,
2080 &self.global_world_manager,
2081 remote_entities,
2082 );
2083 self.process_entity_events(world, entity_events);
2084 }
2085
2086 fn disconnect_reset_connection(&mut self) {
2087 self.server_connection = None;
2088
2089 self.io = Io::new(
2090 &self.client_config.connection.bandwidth_measure_duration,
2091 &self.protocol.compression,
2092 );
2093
2094 self.handshake_manager = Box::new(HandshakeManager::new(
2095 self.protocol_id,
2096 self.client_config.send_handshake_interval,
2097 self.client_config.ping_interval,
2098 self.client_config.handshake_pings,
2099 ));
2100
2101 self.manual_disconnect = false;
2102 self.global_world_manager = GlobalWorldManager::new();
2103 }
2104
2105 fn server_address_unwrapped(&self) -> SocketAddr {
2106 self.io.server_addr().expect("connection not established!")
2108 }
2109
2110 #[cfg(feature = "e2e_debug")]
2111 pub fn debug_remote_channel_diagnostic(
2112 &self,
2113 remote_entity: &naia_shared::RemoteEntity,
2114 ) -> Option<(
2115 naia_shared::EntityChannelState,
2116 (
2117 naia_shared::SubCommandId,
2118 usize,
2119 Option<naia_shared::SubCommandId>,
2120 usize,
2121 ),
2122 )> {
2123 let Some(connection) = self.server_connection.as_ref() else {
2124 return None;
2125 };
2126 connection
2127 .base
2128 .world_manager
2129 .debug_remote_channel_diagnostic(remote_entity)
2130 }
2131
2132 #[cfg(feature = "e2e_debug")]
2133 pub fn debug_remote_channel_snapshot(
2134 &self,
2135 remote_entity: &naia_shared::RemoteEntity,
2136 ) -> Option<(
2137 naia_shared::EntityChannelState,
2138 Option<naia_shared::MessageIndex>,
2139 usize,
2140 Option<(naia_shared::MessageIndex, naia_shared::EntityMessageType)>,
2141 Option<naia_shared::MessageIndex>,
2142 )> {
2143 let Some(connection) = self.server_connection.as_ref() else {
2144 return None;
2145 };
2146 connection
2147 .base
2148 .world_manager
2149 .debug_remote_channel_snapshot(remote_entity)
2150 }
2151
2152 fn process_entity_events<W: WorldMutType<E>>(
2153 &mut self,
2154 world: &mut W,
2155 entity_events: Vec<EntityEvent>,
2156 ) {
2157 for response_event in entity_events {
2158 match response_event {
2163 EntityEvent::Spawn(global_entity) => {
2164 let world_entity = self
2165 .global_entity_map
2166 .global_entity_to_entity(&global_entity)
2167 .unwrap();
2168 self.incoming_world_events.push_spawn(world_entity);
2169 self.global_world_manager
2170 .remote_spawn_entity(&global_entity);
2171 let Some(connection) = self.server_connection.as_mut() else {
2172 panic!("Client is disconnected!");
2173 };
2174 connection
2175 .base
2176 .world_manager
2177 .remote_spawn_entity(&global_entity); #[cfg(feature = "e2e_debug")]
2179 {
2180 use crate::counters::CLIENT_SCOPE_APPLIED_ADD_E2;
2181 use std::sync::atomic::Ordering;
2182 CLIENT_SCOPE_APPLIED_ADD_E2.fetch_add(1, Ordering::Relaxed);
2183 }
2184 }
2185 EntityEvent::Despawn(global_entity) => {
2186 let world_entity = self
2187 .global_entity_map
2188 .global_entity_to_entity(&global_entity)
2189 .unwrap();
2190 self.resource_registry.remove_by_entity(&global_entity);
2194 self.incoming_world_events.push_despawn(world_entity);
2195 if self
2196 .global_world_manager
2197 .entity_is_delegated(&global_entity)
2198 {
2199 if let Some(status) = self
2200 .global_world_manager
2201 .entity_authority_status(&global_entity)
2202 {
2203 if status != EntityAuthStatus::Available {
2204 self.entity_update_authority(
2205 &global_entity,
2206 &world_entity,
2207 EntityAuthStatus::Available,
2208 );
2209 }
2210 }
2211 }
2212 self.global_world_manager
2213 .remove_entity_record(&global_entity);
2214 self.global_entity_map.despawn_by_global(&global_entity);
2215 #[cfg(feature = "e2e_debug")]
2216 {
2217 use crate::counters::CLIENT_SCOPE_APPLIED_REMOVE_E1;
2218 use std::sync::atomic::Ordering;
2219 CLIENT_SCOPE_APPLIED_REMOVE_E1.fetch_add(1, Ordering::Relaxed);
2220 }
2221 }
2222 EntityEvent::InsertComponent(global_entity, component_kind) => {
2223 let world_entity = self
2224 .global_entity_map
2225 .global_entity_to_entity(&global_entity)
2226 .unwrap();
2227 if self.protocol.resource_kinds.is_resource(&component_kind) {
2233 let type_id: std::any::TypeId = component_kind.into();
2234 let _ = self.resource_registry.insert_raw(type_id, global_entity);
2235 }
2236 self.incoming_world_events
2237 .push_insert(world_entity, component_kind);
2238
2239 if !self
2240 .global_world_manager
2241 .entity_has_component(&global_entity, &component_kind)
2242 {
2243 if self
2244 .global_world_manager
2245 .entity_is_delegated(&global_entity)
2246 {
2247 world.component_publish(
2256 &self.protocol.component_kinds,
2257 &self.global_entity_map,
2258 &self.global_world_manager,
2259 &world_entity,
2260 &component_kind,
2261 );
2262 world.component_enable_delegation(
2263 &self.protocol.component_kinds,
2264 &self.global_entity_map,
2265 &self.global_world_manager,
2266 &world_entity,
2267 &component_kind,
2268 );
2269 }
2270
2271 self.global_world_manager
2272 .remote_insert_component(&global_entity, &component_kind);
2273 }
2274 }
2275 EntityEvent::RemoveComponent(global_entity, component_box) => {
2276 let component_kind = component_box.kind();
2277 let world_entity = self
2278 .global_entity_map
2279 .global_entity_to_entity(&global_entity)
2280 .unwrap();
2281 self.incoming_world_events
2282 .push_remove(world_entity, component_box);
2283 if self
2284 .global_world_manager
2285 .entity_is_delegated(&global_entity)
2286 {
2287 self.remove_component_worldless(&world_entity, &component_kind);
2288 } else {
2289 self.global_world_manager
2290 .remove_component_record(&global_entity, &component_kind);
2291 }
2292 }
2293 EntityEvent::Publish(global_entity) => {
2294 let world_entity = self
2295 .global_entity_map
2296 .global_entity_to_entity(&global_entity)
2297 .unwrap();
2298 self.publish_entity(&global_entity, false);
2299 self.incoming_world_events.push_publish(world_entity);
2300 }
2301 EntityEvent::Unpublish(global_entity) => {
2302 let world_entity = self
2303 .global_entity_map
2304 .global_entity_to_entity(&global_entity)
2305 .unwrap();
2306 self.unpublish_entity(&global_entity, false);
2307 self.incoming_world_events.push_unpublish(world_entity);
2308 }
2309 EntityEvent::EnableDelegation(global_entity) => {
2310 #[cfg(feature = "e2e_debug")]
2311 naia_shared::e2e_trace!(
2312 "[CLIENT_RECV] EnableDelegation entity={:?}",
2313 global_entity
2314 );
2315 let world_entity = self
2316 .global_entity_map
2317 .global_entity_to_entity(&global_entity)
2318 .unwrap();
2319
2320 self.entity_enable_delegation(world, &global_entity, &world_entity, false);
2321
2322 let Some(connection) = &mut self.server_connection else {
2324 return;
2325 };
2326 connection
2327 .base
2328 .world_manager
2329 .send_enable_delegation_response(&global_entity); }
2331 EntityEvent::EnableDelegationResponse(_) => {
2332 panic!("Client should never receive an EnableDelegationEntityResponse event");
2333 }
2334 EntityEvent::DisableDelegation(global_entity) => {
2335 #[cfg(feature = "e2e_debug")]
2336 {
2337 let delegated_at_entry = self
2338 .global_world_manager
2339 .entity_is_delegated(&global_entity);
2340 naia_shared::e2e_trace!(
2341 "[CLIENT_RECV] DisableDelegation entity={:?} delegated_at_entry={}",
2342 global_entity,
2343 delegated_at_entry
2344 );
2345 }
2346 let world_entity = self
2347 .global_entity_map
2348 .global_entity_to_entity(&global_entity)
2349 .unwrap();
2350 self.entity_disable_delegation(world, &global_entity, &world_entity, false);
2351 }
2352 EntityEvent::RequestAuthority(_global_entity) => {
2353 panic!("Client should never receive an EntityRequestAuthority event");
2354 }
2355 EntityEvent::ReleaseAuthority(_global_entity) => {
2356 panic!("Client should never receive an EntityReleaseAuthority event");
2357 }
2358 EntityEvent::SetAuthority(global_entity, new_auth_status) => {
2359 #[cfg(feature = "e2e_debug")]
2361 if new_auth_status == EntityAuthStatus::Granted {
2362 use crate::counters::{CLIENT_RX_SET_AUTH, CLIENT_TO_EVENT_SET_AUTH_OK};
2363 use std::sync::atomic::Ordering;
2364 CLIENT_RX_SET_AUTH.fetch_add(1, Ordering::Relaxed);
2365 CLIENT_TO_EVENT_SET_AUTH_OK.fetch_add(1, Ordering::Relaxed);
2366 }
2367 let world_entity = self
2368 .global_entity_map
2369 .global_entity_to_entity(&global_entity)
2370 .unwrap();
2371 self.entity_update_authority(&global_entity, &world_entity, new_auth_status);
2372 }
2373 EntityEvent::MigrateResponse(global_entity, new_remote_entity) => {
2374 let world_entity = match self
2376 .global_entity_map
2377 .global_entity_to_entity(&global_entity)
2378 {
2379 Ok(entity) => entity,
2380 Err(_) => {
2381 warn!(
2382 "Received MigrateResponse for unknown global entity: {:?}",
2383 global_entity
2384 );
2385 return;
2386 }
2387 };
2388
2389 {
2391 let Some(connection) = &mut self.server_connection else {
2392 warn!("Received MigrateResponse without active server connection");
2393 return;
2394 };
2395
2396 let old_host_entity = match connection
2397 .base
2398 .world_manager
2399 .entity_converter()
2400 .global_entity_to_host_entity(&global_entity)
2401 {
2402 Ok(entity) => entity,
2403 Err(_) => {
2404 warn!(
2405 "Entity {:?} does not exist as HostEntity before migration",
2406 global_entity
2407 );
2408 return;
2409 }
2410 };
2411
2412 let buffered_commands = connection
2414 .base
2415 .world_manager
2416 .extract_host_entity_commands(&global_entity);
2417
2418 let component_kinds = connection
2420 .base
2421 .world_manager
2422 .extract_host_component_kinds(&global_entity);
2423
2424 connection
2426 .base
2427 .world_manager
2428 .remove_host_entity(&global_entity);
2429
2430 connection.base.world_manager.insert_remote_entity(
2432 &global_entity,
2433 new_remote_entity,
2434 component_kinds,
2435 );
2436
2437 let old_entity = OwnedLocalEntity::Host { id: old_host_entity.value(), is_static: false };
2439 let new_entity = new_remote_entity.copy_to_owned();
2440 connection
2441 .base
2442 .world_manager
2443 .install_entity_redirect(old_entity, new_entity);
2444
2445 connection
2447 .base
2448 .world_manager
2449 .update_sent_command_entity_refs(
2450 &global_entity,
2451 old_entity,
2452 new_entity,
2453 );
2454
2455 for command in buffered_commands {
2457 if command.is_valid_for_remote_entity() {
2458 connection
2459 .base
2460 .world_manager
2461 .replay_entity_command(&global_entity, command);
2462 }
2463 }
2464
2465 connection
2468 .base
2469 .world_manager
2470 .remote_receive_set_auth(&global_entity, EntityAuthStatus::Granted);
2471 }
2472
2473 if self
2490 .global_world_manager
2491 .entity_authority_status(&global_entity)
2492 .is_none()
2493 {
2494 self.global_world_manager
2495 .entity_register_auth_for_delegation(&global_entity);
2496 }
2497
2498 self.entity_complete_delegation(world, &global_entity, &world_entity);
2500
2501 self.global_world_manager
2503 .entity_update_authority(&global_entity, EntityAuthStatus::Granted);
2504
2505 self.incoming_world_events.push_auth_grant(world_entity);
2507 #[cfg(feature = "e2e_debug")]
2508 {
2509 use crate::counters::CLIENT_EMIT_AUTH_GRANTED_EVENT;
2510 use std::sync::atomic::Ordering;
2511 CLIENT_EMIT_AUTH_GRANTED_EVENT.fetch_add(1, Ordering::Relaxed);
2512 }
2513 }
2514 EntityEvent::UpdateComponent(tick, global_entity, component_kind) => {
2515 let world_entity = self
2516 .global_entity_map
2517 .global_entity_to_entity(&global_entity)
2518 .unwrap();
2519 self.incoming_world_events
2520 .push_update(tick, world_entity, component_kind);
2521 }
2522 }
2523 }
2524 }
2525}
2526
2527impl<E: Hash + Copy + Eq + Sync + Send> EntityAndGlobalEntityConverter<E> for Client<E> {
2528 fn global_entity_to_entity(
2529 &self,
2530 global_entity: &GlobalEntity,
2531 ) -> Result<E, EntityDoesNotExistError> {
2532 self.global_entity_map
2533 .global_entity_to_entity(global_entity)
2534 }
2535
2536 fn entity_to_global_entity(
2537 &self,
2538 world_entity: &E,
2539 ) -> Result<GlobalEntity, EntityDoesNotExistError> {
2540 self.global_entity_map.entity_to_global_entity(world_entity)
2541 }
2542}
2543
/// The four connection states a client can report.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// State matched by [`ConnectionStatus::is_disconnected`].
    Disconnected,
    /// State matched by [`ConnectionStatus::is_connecting`].
    Connecting,
    /// State matched by [`ConnectionStatus::is_connected`].
    Connected,
    /// State matched by [`ConnectionStatus::is_disconnecting`].
    Disconnecting,
}

impl ConnectionStatus {
    /// Returns `true` only for [`ConnectionStatus::Disconnected`].
    pub fn is_disconnected(&self) -> bool {
        matches!(self, Self::Disconnected)
    }

    /// Returns `true` only for [`ConnectionStatus::Connecting`].
    pub fn is_connecting(&self) -> bool {
        matches!(self, Self::Connecting)
    }

    /// Returns `true` only for [`ConnectionStatus::Connected`].
    pub fn is_connected(&self) -> bool {
        matches!(self, Self::Connected)
    }

    /// Returns `true` only for [`ConnectionStatus::Disconnecting`].
    pub fn is_disconnecting(&self) -> bool {
        matches!(self, Self::Disconnecting)
    }
}
2580
2581cfg_if! {
2582 if #[cfg(feature = "interior_visibility")] {
2583
2584 use naia_shared::LocalEntity;
2585
2586 impl<E: Copy + Eq + Hash + Send + Sync> Client<E> {
2587 pub fn local_entities(&self) -> Vec<LocalEntity> {
2596 let connection = self
2597 .server_connection
2598 .as_ref()
2599 .expect("Server connection does not exist");
2600
2601 connection.base.world_manager.local_entities()
2602 }
2603
2604 pub fn local_entity<W: WorldRefType<E>>(
2612 &self,
2613 world: W,
2614 local_entity: &LocalEntity,
2615 ) -> Option<EntityRef<'_, E, W>> {
2616 let world_entity = self.local_to_world_entity(local_entity)?;
2617 if !world.has_entity(&world_entity) {
2618 return None;
2619 }
2620 Some(self.entity(world, &world_entity))
2621 }
2622
2623 pub fn local_entity_mut<W: WorldMutType<E>>(
2631 &mut self,
2632 world: W,
2633 local_entity: &LocalEntity,
2634 ) -> Option<EntityMut<'_, E, W>> {
2635 let world_entity = self.local_to_world_entity(local_entity)?;
2636 if !world.has_entity(&world_entity) {
2637 return None;
2638 }
2639 Some(self.entity_mut(world, &world_entity))
2640 }
2641
2642 fn local_to_world_entity(
2643 &self,
2644 local_entity: &LocalEntity
2645 ) -> Option<E> {
2646 let connection = self.server_connection.as_ref()?;
2647 let converter = connection.base.world_manager.entity_converter();
2648
2649 let owned_local_entity: OwnedLocalEntity = (*local_entity).into();
2650 let global_entity = converter.owned_entity_to_global_entity(&owned_local_entity).ok()?;
2651 let world_entity = self
2652 .global_entity_map
2653 .global_entity_to_entity(&global_entity)
2654 .ok()?;
2655
2656 Some(world_entity)
2657 }
2658
2659 pub(crate) fn world_to_local_entity(
2660 &self,
2661 world_entity: &E,
2662 ) -> Option<LocalEntity> {
2663 let global_entity = self.global_entity_map.entity_to_global_entity(world_entity).ok()?;
2664
2665 let connection = self.server_connection.as_ref()?;
2666 let converter = connection.base.world_manager.entity_converter();
2667 let owned_entity = converter.global_entity_to_owned_entity(&global_entity).ok()?;
2668
2669 Some(LocalEntity::from(owned_entity))
2670 }
2671 }
2672 }
2673}