1use std::{any::Any, collections::VecDeque, hash::Hash, net::SocketAddr, time::Duration};
2
3use log::{info, warn};
4
5use naia_shared::{
6 BitWriter, Channel, ChannelKind, ComponentKind, EntityAndGlobalEntityConverter,
7 EntityAndLocalEntityConverter, EntityAuthStatus, EntityConverterMut, EntityDoesNotExistError,
8 EntityEventMessage, EntityResponseEvent, FakeEntityConverter, GameInstant, GlobalEntity,
9 GlobalRequestId, GlobalResponseId, GlobalWorldManagerType, Instant, Message, MessageContainer,
10 PacketType, Protocol, RemoteEntity, Replicate, ReplicatedComponent, Request, Response,
11 ResponseReceiveKey, ResponseSendKey, Serde, SharedGlobalWorldManager, SocketConfig,
12 StandardHeader, SystemChannel, Tick, WorldMutType, WorldRefType,
13};
14
15use super::{client_config::ClientConfig, error::NaiaClientError, events::Events};
16use crate::{
17 connection::{base_time_manager::BaseTimeManager, connection::Connection, io::Io},
18 handshake::{HandshakeManager, HandshakeResult, Handshaker},
19 transport::{IdentityReceiverResult, Socket},
20 world::{
21 entity_mut::EntityMut, entity_owner::EntityOwner, entity_ref::EntityRef,
22 global_world_manager::GlobalWorldManager,
23 },
24 ReplicationConfig,
25};
26
27pub struct Client<E: Copy + Eq + Hash + Send + Sync> {
30 client_config: ClientConfig,
32 protocol: Protocol,
33 auth_message: Option<Vec<u8>>,
35 auth_headers: Option<Vec<(String, String)>>,
36 io: Io,
37 server_connection: Option<Connection<E>>,
38 handshake_manager: Box<dyn Handshaker>,
39 manual_disconnect: bool,
40 waitlist_messages: VecDeque<(ChannelKind, Box<dyn Message>)>,
41 global_world_manager: GlobalWorldManager<E>,
43 incoming_events: Events<E>,
45 queued_entity_auth_release_messages: Vec<E>,
47}
48
49impl<E: Copy + Eq + Hash + Send + Sync> Client<E> {
50 pub fn new<P: Into<Protocol>>(client_config: ClientConfig, protocol: P) -> Self {
52 let mut protocol: Protocol = protocol.into();
53 protocol.lock();
54
55 let handshake_manager = HandshakeManager::new(
56 client_config.send_handshake_interval,
57 client_config.ping_interval,
58 client_config.handshake_pings,
59 );
60
61 let compression_config = protocol.compression.clone();
62
63 Self {
64 client_config: client_config.clone(),
66 protocol,
67 auth_message: None,
69 auth_headers: None,
70 io: Io::new(
71 &client_config.connection.bandwidth_measure_duration,
72 &compression_config,
73 ),
74 server_connection: None,
75 handshake_manager: Box::new(handshake_manager),
76 manual_disconnect: false,
77 waitlist_messages: VecDeque::new(),
78 global_world_manager: GlobalWorldManager::new(),
80 incoming_events: Events::new(),
82 queued_entity_auth_release_messages: Vec::new(),
84 }
85 }
86
87 pub fn auth<M: Message>(&mut self, auth: M) {
89 let mut bit_writer = BitWriter::new();
91 auth.write(
92 &self.protocol.message_kinds,
93 &mut bit_writer,
94 &mut FakeEntityConverter,
95 );
96 let auth_bytes = bit_writer.to_bytes();
97 self.auth_message = Some(auth_bytes.to_vec());
98 }
99
100 pub fn auth_headers(&mut self, headers: Vec<(String, String)>) {
101 self.auth_headers = Some(headers);
102 }
103
104 pub fn connect<S: Into<Box<dyn Socket>>>(&mut self, socket: S) {
106 if !self.is_disconnected() {
107 panic!("Client has already initiated a connection, cannot initiate a new one. TIP: Check client.is_disconnected() before calling client.connect()");
108 }
109
110 if let Some(auth_bytes) = &self.auth_message {
111 if let Some(auth_headers) = &self.auth_headers {
112 let boxed_socket: Box<dyn Socket> = socket.into();
114 let (id_receiver, packet_sender, packet_receiver) = boxed_socket
115 .connect_with_auth_and_headers(auth_bytes.clone(), auth_headers.clone());
116 self.io.load(id_receiver, packet_sender, packet_receiver);
117 } else {
118 let boxed_socket: Box<dyn Socket> = socket.into();
120 let (id_receiver, packet_sender, packet_receiver) =
121 boxed_socket.connect_with_auth(auth_bytes.clone());
122 self.io.load(id_receiver, packet_sender, packet_receiver);
123 }
124 } else {
125 if let Some(auth_headers) = &self.auth_headers {
126 let boxed_socket: Box<dyn Socket> = socket.into();
128 let (id_receiver, packet_sender, packet_receiver) =
129 boxed_socket.connect_with_auth_headers(auth_headers.clone());
130 self.io.load(id_receiver, packet_sender, packet_receiver);
131 } else {
132 let boxed_socket: Box<dyn Socket> = socket.into();
134 let (id_receiver, packet_sender, packet_receiver) = boxed_socket.connect();
135 self.io.load(id_receiver, packet_sender, packet_receiver);
136 }
137 }
138 }
139
140 pub fn connection_status(&self) -> ConnectionStatus {
142 if self.is_connected() {
143 if self.is_disconnecting() {
144 return ConnectionStatus::Disconnecting;
145 } else {
146 return ConnectionStatus::Connected;
147 }
148 } else {
149 if self.is_disconnected() {
150 return ConnectionStatus::Disconnected;
151 }
152 if self.is_connecting() {
153 return ConnectionStatus::Connecting;
154 }
155 panic!("Client is in an unknown connection state!");
156 }
157 }
158
159 fn is_connecting(&self) -> bool {
161 self.io.is_loaded()
162 }
163
164 fn is_connected(&self) -> bool {
166 self.server_connection.is_some()
167 }
168
169 fn is_disconnecting(&self) -> bool {
171 if let Some(connection) = &self.server_connection {
172 connection.base.should_drop() || self.manual_disconnect
173 } else {
174 false
175 }
176 }
177
178 fn is_disconnected(&self) -> bool {
180 !self.io.is_loaded()
181 }
182
183 pub fn disconnect(&mut self) {
185 if !self.is_connected() {
186 panic!("Trying to disconnect Client which is not connected yet!")
187 }
188
189 for _ in 0..10 {
190 let writer = self.handshake_manager.write_disconnect();
191 if self.io.send_packet(writer.to_packet()).is_err() {
192 warn!("Client Error: Cannot send disconnect packet to Server");
194 }
195 }
196
197 self.manual_disconnect = true;
198 }
199
200 pub fn socket_config(&self) -> &SocketConfig {
202 &self.protocol.socket
203 }
204
205 pub fn receive<W: WorldMutType<E>>(&mut self, mut world: W) -> Events<E> {
211 self.maintain_socket();
214
215 self.send_queued_auth_release_messages();
216
217 let mut response_events = None;
218
219 if self.is_disconnecting() {
221 self.disconnect_with_events(&mut world);
222 return std::mem::take(&mut self.incoming_events);
223 }
224
225 let now = Instant::now();
226
227 if let Some(connection) = &mut self.server_connection {
228 let (receiving_tick_happened, sending_tick_happened) =
229 connection.time_manager.collect_ticks(&now);
230
231 if let Some((prev_receiving_tick, current_receiving_tick)) = receiving_tick_happened {
232 if connection
234 .read_buffered_packets(&self.protocol, &mut self.global_world_manager)
235 .is_err()
236 {
237 warn!("Error reading from buffered packet!");
239 }
240
241 response_events = Some(connection.process_packets(
243 &mut self.global_world_manager,
244 &self.protocol,
245 &mut world,
246 &now,
247 &mut self.incoming_events,
248 ));
249
250 let mut index_tick = prev_receiving_tick.wrapping_add(1);
251 loop {
252 self.incoming_events.push_server_tick(index_tick);
253
254 if index_tick == current_receiving_tick {
255 break;
256 }
257 index_tick = index_tick.wrapping_add(1);
258 }
259 }
260
261 if let Some((prev_sending_tick, current_sending_tick)) = sending_tick_happened {
262 if let Some(mut entities) = connection
266 .base
267 .host_world_manager
268 .world_channel
269 .collect_auth_release_messages()
270 {
271 self.queued_entity_auth_release_messages
272 .append(&mut entities);
273 }
274
275 connection.send_packets(
277 &self.protocol,
278 &now,
279 &mut self.io,
280 &world,
281 &self.global_world_manager,
282 );
283
284 let mut index_tick = prev_sending_tick.wrapping_add(1);
286 loop {
287 self.incoming_events.push_client_tick(index_tick);
288
289 if index_tick == current_sending_tick {
290 break;
291 }
292 index_tick = index_tick.wrapping_add(1);
293 }
294 }
295 } else {
296 if self.io.is_loaded() {
297 if let Some(outgoing_packet) = self.handshake_manager.send() {
298 if self.io.send_packet(outgoing_packet).is_err() {
299 warn!("Client Error: Cannot send handshake packet to Server");
301 }
302 }
303 }
304 }
305
306 if let Some(events) = response_events {
307 self.process_response_events(&mut world, events);
308 }
309
310 std::mem::take(&mut self.incoming_events)
311 }
312
313 pub fn send_message<C: Channel, M: Message>(&mut self, message: &M) {
317 let cloned_message = M::clone_box(message);
318 self.send_message_inner(&ChannelKind::of::<C>(), cloned_message);
319 }
320
321 fn send_message_inner(&mut self, channel_kind: &ChannelKind, message_box: Box<dyn Message>) {
322 let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
323 if !channel_settings.can_send_to_server() {
324 panic!("Cannot send message to Server on this Channel");
325 }
326
327 if channel_settings.tick_buffered() {
328 panic!("Cannot call `Client.send_message()` on a Tick Buffered Channel, use `Client.send_tick_buffered_message()` instead");
329 }
330
331 if let Some(connection) = &mut self.server_connection {
332 let mut converter = EntityConverterMut::new(
333 &self.global_world_manager,
334 &mut connection.base.local_world_manager,
335 );
336 let message = MessageContainer::from_write(message_box, &mut converter);
337 connection.base.message_manager.send_message(
338 &self.protocol.message_kinds,
339 &mut converter,
340 channel_kind,
341 message,
342 );
343 } else {
344 self.waitlist_messages
345 .push_back((channel_kind.clone(), message_box));
346 }
347 }
348
349 pub fn send_request<C: Channel, Q: Request>(
351 &mut self,
352 request: &Q,
353 ) -> Result<ResponseReceiveKey<Q::Response>, NaiaClientError> {
354 let cloned_request = Q::clone_box(request);
355 let id = self.send_request_inner(&ChannelKind::of::<C>(), cloned_request)?;
357 Ok(ResponseReceiveKey::new(id))
358 }
359
360 fn send_request_inner(
361 &mut self,
362 channel_kind: &ChannelKind,
363 request_box: Box<dyn Message>,
365 ) -> Result<GlobalRequestId, NaiaClientError> {
366 let channel_settings = self.protocol.channel_kinds.channel(&channel_kind);
367
368 if !channel_settings.can_request_and_respond() {
369 std::panic!("Requests can only be sent over Bidirectional, Reliable Channels");
370 }
371
372 let Some(connection) = &mut self.server_connection else {
373 warn!("currently not connected to server");
374 return Err(NaiaClientError::Message(
375 "currently not connected to server".to_string(),
376 ));
377 };
378 let mut converter = EntityConverterMut::new(
379 &self.global_world_manager,
380 &mut connection.base.local_world_manager,
381 );
382
383 let request_id = connection.global_request_manager.create_request_id();
384 let message = MessageContainer::from_write(request_box, &mut converter);
385 connection.base.message_manager.send_request(
386 &self.protocol.message_kinds,
387 &mut converter,
388 channel_kind,
389 request_id,
390 message,
391 );
392
393 return Ok(request_id);
394 }
395
396 pub fn send_response<S: Response>(
398 &mut self,
399 response_key: &ResponseSendKey<S>,
400 response: &S,
401 ) -> bool {
402 let response_id = response_key.response_id();
403
404 let cloned_response = S::clone_box(response);
405
406 self.send_response_inner(&response_id, cloned_response)
407 }
408
409 fn send_response_inner(
411 &mut self,
412 response_id: &GlobalResponseId,
413 response_box: Box<dyn Message>,
414 ) -> bool {
415 let Some(connection) = &mut self.server_connection else {
416 return false;
417 };
418 let Some((channel_kind, local_response_id)) = connection
419 .global_response_manager
420 .destroy_response_id(response_id)
421 else {
422 return false;
423 };
424 let mut converter = EntityConverterMut::new(
425 &self.global_world_manager,
426 &mut connection.base.local_world_manager,
427 );
428
429 let response = MessageContainer::from_write(response_box, &mut converter);
430 connection.base.message_manager.send_response(
431 &self.protocol.message_kinds,
432 &mut converter,
433 &channel_kind,
434 local_response_id,
435 response,
436 );
437 return true;
438 }
439
440 pub fn receive_response<S: Response>(
441 &mut self,
442 response_key: &ResponseReceiveKey<S>,
443 ) -> Option<S> {
444 let Some(connection) = &mut self.server_connection else {
445 return None;
446 };
447 let request_id = response_key.request_id();
448 let Some(container) = connection
449 .global_request_manager
450 .destroy_request_id(&request_id)
451 else {
452 return None;
453 };
454 let response: S = Box::<dyn Any + 'static>::downcast::<S>(container.to_boxed_any())
455 .ok()
456 .map(|boxed_s| *boxed_s)
457 .unwrap();
458 return Some(response);
459 }
460 fn on_connect(&mut self) {
463 let messages = std::mem::take(&mut self.waitlist_messages);
465 for (channel_kind, message_box) in messages {
466 self.send_message_inner(&channel_kind, message_box);
467 }
468 }
469
470 pub fn send_tick_buffer_message<C: Channel, M: Message>(&mut self, tick: &Tick, message: &M) {
471 let cloned_message = M::clone_box(message);
472 self.send_tick_buffer_message_inner(tick, &ChannelKind::of::<C>(), cloned_message);
473 }
474
475 fn send_tick_buffer_message_inner(
476 &mut self,
477 tick: &Tick,
478 channel_kind: &ChannelKind,
479 message_box: Box<dyn Message>,
480 ) {
481 let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
482
483 if !channel_settings.can_send_to_server() {
484 panic!("Cannot send message to Server on this Channel");
485 }
486
487 if !channel_settings.tick_buffered() {
488 panic!("Can only use `Client.send_tick_buffer_message()` on a Channel that is configured for it.");
489 }
490
491 if let Some(connection) = self.server_connection.as_mut() {
492 let mut converter = EntityConverterMut::new(
493 &self.global_world_manager,
494 &mut connection.base.local_world_manager,
495 );
496 let message = MessageContainer::from_write(message_box, &mut converter);
497 connection
498 .tick_buffer
499 .send_message(tick, channel_kind, message);
500 }
501 }
502
503 pub fn spawn_entity<W: WorldMutType<E>>(&mut self, mut world: W) -> EntityMut<E, W> {
508 self.check_client_authoritative_allowed();
509
510 let entity = world.spawn_entity();
511 self.spawn_entity_inner(&entity);
512
513 EntityMut::new(self, world, &entity)
514 }
515
516 fn spawn_entity_inner(&mut self, entity: &E) {
518 self.global_world_manager.host_spawn_entity(entity);
519 if let Some(connection) = &mut self.server_connection {
520 let component_kinds = self.global_world_manager.component_kinds(entity).unwrap();
521 connection.base.host_world_manager.init_entity(
522 &mut connection.base.local_world_manager,
523 entity,
524 component_kinds,
525 );
526 }
527 }
528
529 pub fn entity<W: WorldRefType<E>>(&self, world: W, entity: &E) -> EntityRef<E, W> {
533 if world.has_entity(entity) {
534 return EntityRef::new(self, world, entity);
535 }
536 panic!("No Entity exists for given Key!");
537 }
538
539 pub fn entity_mut<W: WorldMutType<E>>(&mut self, world: W, entity: &E) -> EntityMut<E, W> {
543 self.check_client_authoritative_allowed();
544 if world.has_entity(entity) {
545 return EntityMut::new(self, world, entity);
546 }
547 panic!("No Entity exists for given Key!");
548 }
549
550 pub fn entities<W: WorldRefType<E>>(&self, world: &W) -> Vec<E> {
552 world.entities()
553 }
554
555 pub fn entity_owner(&self, entity: &E) -> EntityOwner {
556 if let Some(owner) = self.global_world_manager.entity_owner(entity) {
557 return owner;
558 }
559 return EntityOwner::Local;
560 }
561
562 pub fn enable_entity_replication(&mut self, entity: &E) {
566 self.check_client_authoritative_allowed();
567 self.spawn_entity_inner(&entity);
568 }
569
570 pub fn disable_entity_replication(&mut self, entity: &E) {
572 self.check_client_authoritative_allowed();
573 self.despawn_entity_worldless(entity);
575 }
576
577 pub fn entity_replication_config(&self, entity: &E) -> Option<ReplicationConfig> {
579 self.check_client_authoritative_allowed();
580 self.global_world_manager.entity_replication_config(entity)
581 }
582
583 pub fn configure_entity_replication<W: WorldMutType<E>>(
585 &mut self,
586 world: &mut W,
587 entity: &E,
588 config: ReplicationConfig,
589 ) {
590 self.check_client_authoritative_allowed();
591 if !self.global_world_manager.has_entity(entity) {
592 panic!("Entity is not yet replicating. Be sure to call `enable_replication` or `spawn_entity` on the Client, before configuring replication.");
593 }
594 let entity_owner = self.global_world_manager.entity_owner(entity).unwrap();
595 let server_owned = entity_owner.is_server();
596 if server_owned {
597 panic!("Client cannot configure replication strategy of Server-owned Entities.");
598 }
599 let client_owned = entity_owner.is_client();
600 if !client_owned {
601 panic!("Client cannot configure replication strategy of Entities it does not own.");
602 }
603 let next_config = config;
604 let prev_config = self
605 .global_world_manager
606 .entity_replication_config(entity)
607 .unwrap();
608 if prev_config == config {
609 panic!(
610 "Entity replication config is already set to {:?}. Should not set twice.",
611 config
612 );
613 }
614 match prev_config {
615 ReplicationConfig::Private => {
616 match next_config {
617 ReplicationConfig::Private => {
618 panic!("This should not be possible.");
619 }
620 ReplicationConfig::Public => {
621 self.publish_entity(entity, true);
623 }
624 ReplicationConfig::Delegated => {
625 self.publish_entity(entity, true);
627 self.entity_enable_delegation(world, entity, true);
628 }
629 }
630 }
631 ReplicationConfig::Public => {
632 match next_config {
633 ReplicationConfig::Private => {
634 self.unpublish_entity(entity, true);
636 }
637 ReplicationConfig::Public => {
638 panic!("This should not be possible.");
639 }
640 ReplicationConfig::Delegated => {
641 self.entity_enable_delegation(world, entity, true);
643 }
644 }
645 }
646 ReplicationConfig::Delegated => {
647 panic!(
648 "Delegated Entities are always ultimately Server-owned. Client cannot modify."
649 )
650 }
651 }
652 }
653
654 pub fn entity_authority_status(&self, entity: &E) -> Option<EntityAuthStatus> {
656 self.check_client_authoritative_allowed();
657
658 self.global_world_manager.entity_authority_status(entity)
659 }
660
661 pub fn entity_request_authority(&mut self, entity: &E) {
663 self.check_client_authoritative_allowed();
664
665 let success = self.global_world_manager.entity_request_authority(entity);
667 if success {
668 let Some(connection) = &mut self.server_connection else {
670 return;
671 };
672 let new_host_entity = connection
673 .base
674 .local_world_manager
675 .host_reserve_entity(entity);
676
677 let message = EntityEventMessage::new_request_authority(
679 &self.global_world_manager,
680 entity,
681 new_host_entity,
682 );
683 self.send_message::<SystemChannel, EntityEventMessage>(&message);
684 }
685 }
686
687 pub fn entity_release_authority(&mut self, entity: &E) {
689 self.check_client_authoritative_allowed();
690
691 let success = self.global_world_manager.entity_release_authority(entity);
693 if success {
694 let Some(connection) = &mut self.server_connection else {
695 return;
696 };
697 let send_release_message = connection
698 .base
699 .host_world_manager
700 .world_channel
701 .entity_release_authority(entity);
702 if send_release_message {
703 self.send_entity_release_auth_message(entity);
704 }
705 }
706 }
707
708 fn send_entity_release_auth_message(&mut self, entity: &E) {
709 let message = EntityEventMessage::new_release_authority(&self.global_world_manager, entity);
711 self.send_message::<SystemChannel, EntityEventMessage>(&message);
712 }
713
714 pub fn server_address(&self) -> Result<SocketAddr, NaiaClientError> {
718 self.io.server_addr()
719 }
720
721 pub fn rtt(&self) -> f32 {
723 self.server_connection
724 .as_ref()
725 .expect("it is expected that you should verify whether the client is connected before calling this method")
726 .time_manager.rtt()
727 }
728
729 pub fn jitter(&self) -> f32 {
731 self.server_connection
732 .as_ref()
733 .expect("it is expected that you should verify whether the client is connected before calling this method")
734 .time_manager.jitter()
735 }
736
737 pub fn client_tick(&self) -> Option<Tick> {
741 let connection = self.server_connection.as_ref()?;
742 return Some(connection.time_manager.client_sending_tick);
743 }
744
745 pub fn client_instant(&self) -> Option<GameInstant> {
747 let connection = self.server_connection.as_ref()?;
748 return Some(connection.time_manager.client_sending_instant);
749 }
750
751 pub fn server_tick(&self) -> Option<Tick> {
753 let connection = self.server_connection.as_ref()?;
754 return Some(connection.time_manager.client_receiving_tick);
755 }
756
757 pub fn server_instant(&self) -> Option<GameInstant> {
759 let connection = self.server_connection.as_ref()?;
760 return Some(connection.time_manager.client_receiving_instant);
761 }
762
763 pub fn tick_to_instant(&self, tick: Tick) -> Option<GameInstant> {
764 if let Some(connection) = &self.server_connection {
765 return Some(connection.time_manager.tick_to_instant(tick));
766 }
767 return None;
768 }
769
770 pub fn tick_duration(&self) -> Option<Duration> {
771 if let Some(connection) = &self.server_connection {
772 return Some(connection.time_manager.tick_duration());
773 }
774 return None;
775 }
776
777 pub fn client_interpolation(&self) -> Option<f32> {
781 if let Some(connection) = &self.server_connection {
782 return Some(connection.time_manager.client_interpolation());
783 }
784 return None;
785 }
786
787 pub fn server_interpolation(&self) -> Option<f32> {
789 if let Some(connection) = &self.server_connection {
790 return Some(connection.time_manager.server_interpolation());
791 }
792 return None;
793 }
794
795 pub fn outgoing_bandwidth(&mut self) -> f32 {
797 self.io.outgoing_bandwidth()
798 }
799
800 pub fn incoming_bandwidth(&mut self) -> f32 {
801 self.io.incoming_bandwidth()
802 }
803
804 pub(crate) fn despawn_entity<W: WorldMutType<E>>(&mut self, world: &mut W, entity: &E) {
810 if !world.has_entity(entity) {
811 panic!("attempted to de-spawn nonexistent entity");
812 }
813
814 world.despawn_entity(entity);
816
817 self.despawn_entity_worldless(entity);
819 }
820
821 pub fn despawn_entity_worldless(&mut self, entity: &E) {
822 if !self.global_world_manager.has_entity(entity) {
823 warn!("attempting to despawn entity that has already been despawned?");
824 return;
825 }
826
827 if let Some(owner) = self.global_world_manager.entity_owner(entity) {
829 if owner.is_server() {
830 if !self.global_world_manager.entity_is_delegated(entity) {
831 panic!("attempting to despawn entity that is not yet delegated. Delegation needs some time to be confirmed by the Server, so check that a despawn is possible by calling `commands.entity(..).replication_config(..).is_delegated()` first.");
832 }
833 if self.global_world_manager.entity_authority_status(entity)
834 != Some(EntityAuthStatus::Granted)
835 {
836 panic!("attempting to despawn entity that we do not have authority over");
837 }
838 }
839 } else {
840 panic!("attempting to despawn entity that has no owner");
841 }
842
843 if let Some(connection) = &mut self.server_connection {
844 connection.base.host_world_manager.despawn_entity(entity);
846 }
847
848 self.global_world_manager.host_despawn_entity(entity);
850 }
851
852 pub(crate) fn insert_component<R: ReplicatedComponent, W: WorldMutType<E>>(
854 &mut self,
855 world: &mut W,
856 entity: &E,
857 mut component: R,
858 ) {
859 if !world.has_entity(entity) {
860 panic!("attempted to add component to non-existent entity");
861 }
862
863 let component_kind = component.kind();
864
865 if world.has_component_of_kind(entity, &component_kind) {
866 let Some(mut component_mut) = world.component_mut::<R>(entity) else {
869 panic!("Should never happen because we checked for this above");
870 };
871 component_mut.mirror(&component);
872 } else {
873 self.insert_component_worldless(entity, &mut component);
876
877 world.insert_component(entity, component);
879 }
880 }
881
882 pub fn insert_component_worldless(&mut self, entity: &E, component: &mut dyn Replicate) {
884 let component_kind = component.kind();
885
886 if let Some(connection) = &mut self.server_connection {
888 if connection.base.host_world_manager.host_has_entity(entity) {
890 connection
891 .base
892 .host_world_manager
893 .insert_component(entity, &component_kind);
894 }
895 }
896
897 self.global_world_manager
899 .host_insert_component(entity, component);
900
901 if self.global_world_manager.entity_is_delegated(entity) {
903 let accessor = self.global_world_manager.get_entity_auth_accessor(entity);
904 component.enable_delegation(&accessor, None)
905 }
906 }
907
908 pub(crate) fn remove_component<R: ReplicatedComponent, W: WorldMutType<E>>(
910 &mut self,
911 world: &mut W,
912 entity: &E,
913 ) -> Option<R> {
914 let component_kind = ComponentKind::of::<R>();
916
917 self.remove_component_worldless(entity, &component_kind);
918
919 world.remove_component::<R>(entity)
921 }
922
923 pub fn remove_component_worldless(&mut self, entity: &E, component_kind: &ComponentKind) {
925 if let Some(connection) = &mut self.server_connection {
927 connection
928 .base
929 .host_world_manager
930 .remove_component(entity, &component_kind);
931 }
932
933 self.global_world_manager
935 .host_remove_component(entity, &component_kind);
936 }
937
938 pub(crate) fn publish_entity(&mut self, entity: &E, client_is_origin: bool) {
939 if client_is_origin {
940 let message = EntityEventMessage::new_publish(&self.global_world_manager, entity);
942 self.send_message::<SystemChannel, EntityEventMessage>(&message);
943 } else {
944 if self.global_world_manager.entity_replication_config(entity)
945 != Some(ReplicationConfig::Private)
946 {
947 panic!("Server can only publish Private entities");
948 }
949 }
950 self.global_world_manager.entity_publish(entity);
951 }
953
954 pub(crate) fn unpublish_entity(&mut self, entity: &E, client_is_origin: bool) {
955 if client_is_origin {
956 let message = EntityEventMessage::new_unpublish(&self.global_world_manager, entity);
957 self.send_message::<SystemChannel, EntityEventMessage>(&message);
958 } else {
959 if self.global_world_manager.entity_replication_config(entity)
960 != Some(ReplicationConfig::Public)
961 {
962 panic!("Server can only unpublish Public entities");
963 }
964 }
965 self.global_world_manager.entity_unpublish(entity);
966 }
968
969 pub(crate) fn entity_enable_delegation<W: WorldMutType<E>>(
970 &mut self,
971 world: &mut W,
972 entity: &E,
973 client_is_origin: bool,
974 ) {
975 self.global_world_manager
977 .entity_register_auth_for_delegation(entity);
978
979 if client_is_origin {
980 let message =
983 EntityEventMessage::new_enable_delegation(&self.global_world_manager, entity);
984 self.send_message::<SystemChannel, EntityEventMessage>(&message);
985 } else {
986 self.entity_complete_delegation(world, entity);
987 self.global_world_manager
988 .entity_update_authority(entity, EntityAuthStatus::Available);
989 }
990 }
991
992 fn entity_complete_delegation<W: WorldMutType<E>>(&mut self, world: &mut W, entity: &E) {
993 world.entity_enable_delegation(&self.global_world_manager, &entity);
994
995 self.global_world_manager.entity_enable_delegation(&entity);
997 }
998
999 pub(crate) fn entity_disable_delegation<W: WorldMutType<E>>(
1000 &mut self,
1001 world: &mut W,
1002 entity: &E,
1003 client_is_origin: bool,
1004 ) {
1005 info!("client.entity_disable_delegation");
1006 if client_is_origin {
1007 panic!("Cannot disable delegation from Client. Server owns all delegated Entities.");
1008 }
1009
1010 self.global_world_manager.entity_disable_delegation(entity);
1011 world.entity_disable_delegation(entity);
1012
1013 self.despawn_entity_worldless(entity)
1015 }
1016
1017 pub(crate) fn entity_update_authority(
1018 &mut self,
1019 entity: &E,
1020 new_auth_status: EntityAuthStatus,
1021 ) {
1022 let old_auth_status = self
1023 .global_world_manager
1024 .entity_authority_status(entity)
1025 .unwrap();
1026
1027 self.global_world_manager
1028 .entity_update_authority(entity, new_auth_status);
1029
1030 match (old_auth_status, new_auth_status) {
1037 (EntityAuthStatus::Requested, EntityAuthStatus::Granted) => {
1038 let Some(connection) = &mut self.server_connection else {
1041 return;
1042 };
1043 let component_kinds = self.global_world_manager.component_kinds(entity).unwrap();
1045 connection.base.host_world_manager.track_remote_entity(
1046 &mut connection.base.local_world_manager,
1047 entity,
1048 component_kinds,
1049 );
1050
1051 self.incoming_events.push_auth_grant(*entity);
1053 }
1054 (EntityAuthStatus::Releasing, EntityAuthStatus::Available)
1055 | (EntityAuthStatus::Granted, EntityAuthStatus::Available) => {
1056 let Some(connection) = &mut self.server_connection else {
1060 return;
1061 };
1062 connection
1063 .base
1064 .host_world_manager
1065 .untrack_remote_entity(&mut connection.base.local_world_manager, entity);
1066
1067 self.incoming_events.push_auth_reset(*entity);
1069 }
1070 (EntityAuthStatus::Available, EntityAuthStatus::Denied) => {
1071 self.incoming_events.push_auth_deny(*entity);
1073 }
1074 (EntityAuthStatus::Denied, EntityAuthStatus::Available) => {
1075 self.incoming_events.push_auth_reset(*entity);
1077 }
1078 (EntityAuthStatus::Releasing, EntityAuthStatus::Granted) => {
1079 self.global_world_manager
1081 .entity_update_authority(entity, EntityAuthStatus::Available);
1082
1083 let Some(connection) = &mut self.server_connection else {
1085 return;
1086 };
1087 connection
1088 .base
1089 .local_world_manager
1090 .remove_reserved_host_entity(entity);
1091 }
1092 (EntityAuthStatus::Available, EntityAuthStatus::Available) => {
1093 }
1095 (_, _) => {
1096 panic!(
1097 "-- Entity updated authority, not handled -- {:?} -> {:?}",
1098 old_auth_status, new_auth_status
1099 );
1100 }
1101 }
1102 }
1103
1104 fn check_client_authoritative_allowed(&self) {
1107 if !self.protocol.client_authoritative_entities {
1108 panic!("Cannot perform this operation: Client Authoritative Entities are not enabled! Enable them in the Protocol, with the `enable_client_authoritative_entities() method, and note that if you do enable them, to make sure you handle all Spawn/Insert/Update events in the Server, as this may be an attack vector.")
1109 }
1110 }
1111
1112 fn maintain_socket(&mut self) {
1113 if self.server_connection.is_none() {
1114 self.maintain_handshake();
1115 } else {
1116 self.maintain_connection();
1117 }
1118 }
1119
1120 fn maintain_handshake(&mut self) {
1121 if !self.io.is_loaded() {
1124 return;
1125 }
1126
1127 if !self.io.is_authenticated() {
1128 match self.io.recv_auth() {
1129 IdentityReceiverResult::Success(id_token) => {
1130 self.handshake_manager.set_identity_token(id_token);
1131 }
1132 IdentityReceiverResult::Waiting => {
1133 return;
1134 }
1135 IdentityReceiverResult::ErrorResponseCode(code) => {
1136 let old_socket_addr_result = self.io.server_addr();
1139
1140 self.io = Io::new(
1142 &self.client_config.connection.bandwidth_measure_duration,
1143 &self.protocol.compression,
1144 );
1145
1146 if code == 401 {
1147 match old_socket_addr_result {
1149 Ok(old_socket_addr) => {
1150 self.incoming_events.push_rejection(&old_socket_addr);
1151 }
1152 Err(err) => {
1153 self.incoming_events.push_error(err);
1154 }
1155 }
1156 } else {
1157 self.incoming_events
1159 .push_error(NaiaClientError::IdError(code));
1160 }
1161
1162 return;
1163 }
1164 }
1165 }
1166
1167 loop {
1169 match self.io.recv_reader() {
1170 Ok(Some(mut reader)) => {
1171 match self.handshake_manager.recv(&mut reader) {
1172 Some(HandshakeResult::Connected(time_manager)) => {
1173 self.server_connection = Some(Connection::new(
1175 &self.client_config.connection,
1176 &self.protocol.channel_kinds,
1177 time_manager,
1178 &self.global_world_manager,
1179 ));
1180 self.on_connect();
1181
1182 let server_addr = self.server_address_unwrapped();
1183 self.incoming_events.push_connection(&server_addr);
1184 }
1185 None => {}
1193 }
1194 }
1195 Ok(None) => {
1196 break;
1197 }
1198 Err(error) => {
1199 self.incoming_events
1200 .push_error(NaiaClientError::Wrapped(Box::new(error)));
1201 }
1202 }
1203 }
1204
1205 return;
1206 }
1207
1208 fn maintain_connection(&mut self) {
1209 let Some(connection) = self.server_connection.as_mut() else {
1212 panic!("Should have checked for this above");
1213 };
1214
1215 Self::handle_heartbeats(connection, &mut self.io);
1216 Self::handle_pings(connection, &mut self.io);
1217 Self::handle_empty_acks(connection, &mut self.io);
1218
1219 loop {
1221 match self.io.recv_reader() {
1222 Ok(Some(mut reader)) => {
1223 connection.base.mark_heard();
1224
1225 let header = StandardHeader::de(&mut reader)
1226 .expect("unable to parse header from incoming packet");
1227
1228 match header.packet_type {
1229 PacketType::Data
1230 | PacketType::Heartbeat
1231 | PacketType::Ping
1232 | PacketType::Pong => {
1233 }
1236 _ => {
1237 continue;
1240 }
1241 }
1242
1243 connection.process_incoming_header(&header);
1245
1246 let Ok(server_tick) = Tick::de(&mut reader) else {
1248 warn!("unable to parse server_tick from packet");
1249 continue;
1250 };
1251
1252 let Ok(server_tick_instant) = GameInstant::de(&mut reader) else {
1254 warn!("unable to parse server_tick_instant from packet");
1255 continue;
1256 };
1257
1258 connection
1259 .time_manager
1260 .recv_tick_instant(&server_tick, &server_tick_instant);
1261
1262 match header.packet_type {
1264 PacketType::Data => {
1265 connection.base.mark_should_send_empty_ack();
1266
1267 if connection
1268 .buffer_data_packet(&server_tick, &mut reader)
1269 .is_err()
1270 {
1271 warn!("unable to parse data packet");
1272 continue;
1273 }
1274 }
1275 PacketType::Heartbeat => {
1276 }
1278 PacketType::Ping => {
1279 let Ok(ping_index) = BaseTimeManager::read_ping(&mut reader) else {
1280 panic!("unable to read ping index");
1281 };
1282 BaseTimeManager::send_pong(connection, &mut self.io, ping_index);
1283 }
1284 PacketType::Pong => {
1285 if connection.time_manager.read_pong(&mut reader).is_err() {
1286 warn!("Client Error: Cannot process pong packet from Server");
1288 }
1289 }
1290 _ => {
1291 }
1294 }
1295 }
1296 Ok(None) => {
1297 break;
1298 }
1299 Err(error) => {
1300 self.incoming_events
1301 .push_error(NaiaClientError::Wrapped(Box::new(error)));
1302 }
1303 }
1304 }
1305 }
1306
1307 fn handle_heartbeats(connection: &mut Connection<E>, io: &mut Io) {
1308 if connection.base.should_send_heartbeat() {
1310 Self::send_heartbeat_packet(connection, io);
1311 }
1312 }
1313
1314 fn handle_empty_acks(connection: &mut Connection<E>, io: &mut Io) {
1315 if connection.base.should_send_empty_ack() {
1317 Self::send_heartbeat_packet(connection, io);
1318 }
1319 }
1320
1321 fn send_heartbeat_packet(connection: &mut Connection<E>, io: &mut Io) {
1322 let mut writer = BitWriter::new();
1323
1324 connection
1326 .base
1327 .write_header(PacketType::Heartbeat, &mut writer);
1328
1329 if io.send_packet(writer.to_packet()).is_err() {
1331 warn!("Client Error: Cannot send heartbeat packet to Server");
1333 }
1334 connection.base.mark_sent();
1335 }
1336
1337 fn handle_pings(connection: &mut Connection<E>, io: &mut Io) {
1338 if connection.time_manager.send_ping(io) {
1340 connection.base.mark_sent();
1341 }
1342 }
1343
1344 fn disconnect_with_events<W: WorldMutType<E>>(&mut self, world: &mut W) {
1345 let server_addr = self.server_address_unwrapped();
1346
1347 self.incoming_events.clear();
1348
1349 self.despawn_all_remote_entities(world);
1350 self.disconnect_reset_connection();
1351
1352 self.incoming_events.push_disconnection(&server_addr);
1353 }
1354
1355 fn despawn_all_remote_entities<W: WorldMutType<E>>(&mut self, world: &mut W) {
1356 let Some(connection) = self.server_connection.as_mut() else {
1360 panic!("Client is already disconnected!");
1361 };
1362
1363 let remote_entities = connection.base.remote_entities();
1364 let entity_events = SharedGlobalWorldManager::<E>::despawn_all_entities(
1365 world,
1366 &self.global_world_manager,
1367 remote_entities,
1368 );
1369 let response_events = self.incoming_events.receive_world_events(entity_events);
1370 self.process_response_events(world, response_events);
1371 }
1372
1373 fn disconnect_reset_connection(&mut self) {
1374 self.server_connection = None;
1375
1376 self.io = Io::new(
1377 &self.client_config.connection.bandwidth_measure_duration,
1378 &self.protocol.compression,
1379 );
1380
1381 self.handshake_manager = Box::new(HandshakeManager::new(
1382 self.client_config.send_handshake_interval,
1383 self.client_config.ping_interval,
1384 self.client_config.handshake_pings,
1385 ));
1386
1387 self.manual_disconnect = false;
1388 self.global_world_manager = GlobalWorldManager::new();
1389 self.queued_entity_auth_release_messages = Vec::new();
1390 }
1391
1392 fn server_address_unwrapped(&self) -> SocketAddr {
1393 self.io.server_addr().expect("connection not established!")
1395 }
1396
1397 fn process_response_events<W: WorldMutType<E>>(
1398 &mut self,
1399 world: &mut W,
1400 response_events: Vec<EntityResponseEvent<E>>,
1401 ) {
1402 for response_event in response_events {
1403 match response_event {
1404 EntityResponseEvent::SpawnEntity(entity) => {
1405 self.global_world_manager.remote_spawn_entity(&entity);
1406 let Some(connection) = self.server_connection.as_mut() else {
1407 panic!("Client is disconnected!");
1408 };
1409 let local_entity = connection
1410 .base
1411 .local_world_manager
1412 .entity_to_remote_entity(&entity)
1413 .unwrap();
1414 connection
1415 .base
1416 .remote_world_manager
1417 .on_entity_channel_opened(&local_entity);
1418 }
1419 EntityResponseEvent::DespawnEntity(entity) => {
1420 if self.global_world_manager.entity_is_delegated(&entity) {
1421 if let Some(status) =
1422 self.global_world_manager.entity_authority_status(&entity)
1423 {
1424 if status != EntityAuthStatus::Available {
1425 self.entity_update_authority(&entity, EntityAuthStatus::Available);
1426 }
1427 }
1428 }
1429 self.global_world_manager.remove_entity_record(&entity);
1430 }
1431 EntityResponseEvent::InsertComponent(entity, component_kind) => {
1432 self.global_world_manager
1433 .remote_insert_component(&entity, &component_kind);
1434 }
1435 EntityResponseEvent::RemoveComponent(entity, component_kind) => {
1436 self.global_world_manager
1437 .remote_remove_component(&entity, &component_kind);
1438 }
1439 EntityResponseEvent::PublishEntity(entity) => {
1440 self.publish_entity(&entity, false);
1441 self.incoming_events.push_publish(entity);
1442 }
1443 EntityResponseEvent::UnpublishEntity(entity) => {
1444 self.unpublish_entity(&entity, false);
1445 self.incoming_events.push_unpublish(entity);
1446 }
1447 EntityResponseEvent::EnableDelegationEntity(entity) => {
1448 self.entity_enable_delegation(world, &entity, false);
1449
1450 let message = EntityEventMessage::new_enable_delegation_response(
1452 &self.global_world_manager,
1453 &entity,
1454 );
1455 self.send_message::<SystemChannel, EntityEventMessage>(&message);
1456 }
1457 EntityResponseEvent::EnableDelegationEntityResponse(_) => {
1458 panic!("Client should never receive an EnableDelegationEntityResponse event");
1459 }
1460 EntityResponseEvent::DisableDelegationEntity(entity) => {
1461 self.entity_disable_delegation(world, &entity, false);
1462 }
1463 EntityResponseEvent::EntityRequestAuthority(_entity, _remote_entity) => {
1464 panic!("Client should never receive an EntityRequestAuthority event");
1465 }
1466 EntityResponseEvent::EntityReleaseAuthority(_entity) => {
1467 panic!("Client should never receive an EntityReleaseAuthority event");
1468 }
1469 EntityResponseEvent::EntityUpdateAuthority(entity, new_auth_status) => {
1470 self.entity_update_authority(&entity, new_auth_status);
1471 }
1472 EntityResponseEvent::EntityMigrateResponse(world_entity, remote_entity) => {
1473 self.entity_complete_delegation(world, &world_entity);
1474 self.add_redundant_remote_entity_to_host(&world_entity, remote_entity);
1475
1476 self.global_world_manager
1477 .entity_update_authority(&world_entity, EntityAuthStatus::Granted);
1478
1479 self.incoming_events.push_auth_grant(world_entity);
1480 }
1481 }
1482 }
1483 }
1484
1485 pub fn add_redundant_remote_entity_to_host(
1486 &mut self,
1487 world_entity: &E,
1488 remote_entity: RemoteEntity,
1489 ) {
1490 let Some(connection) = self.server_connection.as_mut() else {
1491 panic!("Client is disconnected!");
1492 };
1493
1494 connection
1496 .base
1497 .local_world_manager
1498 .insert_remote_entity(world_entity, remote_entity);
1499
1500 let component_kinds = self
1502 .global_world_manager
1503 .component_kinds(world_entity)
1504 .unwrap();
1505 connection
1506 .base
1507 .remote_world_reader
1508 .track_hosts_redundant_remote_entity(&remote_entity, &component_kinds);
1509 }
1510
1511 fn send_queued_auth_release_messages(&mut self) {
1512 if self.queued_entity_auth_release_messages.is_empty() {
1513 return;
1514 }
1515 let entities = std::mem::take(&mut self.queued_entity_auth_release_messages);
1516 for entity in entities {
1517 self.send_entity_release_auth_message(&entity);
1518 }
1519 }
1520}
1521
1522impl<E: Copy + Eq + Hash + Send + Sync> EntityAndGlobalEntityConverter<E> for Client<E> {
1523 fn global_entity_to_entity(
1524 &self,
1525 global_entity: &GlobalEntity,
1526 ) -> Result<E, EntityDoesNotExistError> {
1527 self.global_world_manager
1528 .global_entity_to_entity(global_entity)
1529 }
1530
1531 fn entity_to_global_entity(&self, entity: &E) -> Result<GlobalEntity, EntityDoesNotExistError> {
1532 self.global_world_manager.entity_to_global_entity(entity)
1533 }
1534}
1535
/// The state of the client's connection to the server.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// No connection exists.
    Disconnected,
    /// A connection is being established.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection is being closed.
    Disconnecting,
}

impl ConnectionStatus {
    /// Returns `true` if the status is [`ConnectionStatus::Disconnected`].
    pub fn is_disconnected(&self) -> bool {
        matches!(self, ConnectionStatus::Disconnected)
    }

    /// Returns `true` if the status is [`ConnectionStatus::Connecting`].
    pub fn is_connecting(&self) -> bool {
        matches!(self, ConnectionStatus::Connecting)
    }

    /// Returns `true` if the status is [`ConnectionStatus::Connected`].
    pub fn is_connected(&self) -> bool {
        matches!(self, ConnectionStatus::Connected)
    }

    /// Returns `true` if the status is [`ConnectionStatus::Disconnecting`].
    pub fn is_disconnecting(&self) -> bool {
        matches!(self, ConnectionStatus::Disconnecting)
    }
}