Skip to main content

naia_client/
client.rs

1use std::{any::Any, collections::VecDeque, hash::Hash, net::SocketAddr, time::Duration};
2
3use log::{debug, info, warn};
4
5use naia_shared::{
6    handshake::{HandshakeHeader, RejectReason},
7    AuthorityError, BitWriter, Channel, ChannelKind, ComponentKind, ConnectionStats,
8    EntityAndGlobalEntityConverter,
9    EntityAuthStatus, EntityDoesNotExistError, EntityEvent, EntityPriorityMut, EntityPriorityRef,
10    FakeEntityConverter, GameInstant, GlobalEntity, GlobalEntityMap, GlobalEntitySpawner,
11    GlobalRequestId, GlobalResponseId, GlobalWorldManagerType, HostType, Instant, Message,
12    MessageContainer, OwnedLocalEntity, PacketType, Protocol, ProtocolId, Replicate,
13    ReplicatedComponent, Request, Response, ResponseReceiveKey, ResponseSendKey, Serde,
14    SharedGlobalWorldManager, SocketConfig, StandardHeader, Tick, UserPriorityState, WorldMutType,
15    WorldRefType,
16};
17
18use super::{
19    client_config::ClientConfig, error::NaiaClientError, world_events::Events,
20    JitterBufferType,
21};
22use crate::{
23    connection::{base_time_manager::BaseTimeManager, connection::Connection, io::Io},
24    handshake::{HandshakeManager, HandshakeResult, Handshaker},
25    tick_events::TickEvents,
26    transport::{IdentityReceiverResult, Socket},
27    world::{
28        entity_mut::EntityMut, entity_owner::EntityOwner, entity_ref::EntityRef,
29        global_world_manager::GlobalWorldManager,
30    },
31    Publicity,
32};
33
/// The naia client — connects to a server, receives replicated entities, and
/// sends client-authoritative mutations and messages.
///
/// `E` is your world's entity key type (e.g. a `u32` or ECS `Entity`). It must
/// be `Copy + Eq + Hash + Send + Sync`.
///
/// # Minimal client loop
///
/// ```text
/// loop {
///     client.receive_all_packets();                      // 1. read UDP/WebRTC
///     client.process_all_packets(&mut world, &now);      // 2. decode + dispatch
///     for event in client.take_world_events() { ... }   // 3. handle events
///     for event in client.take_tick_events(&now) { ... } // 4. advance ticks
///     // apply predicted state here
///     client.send_all_packets(&world);                   // 5. flush outbound
/// }
/// ```
///
/// Steps 1–5 must run in this order every frame. Call [`auth`](Client::auth)
/// and then [`connect`](Client::connect) once before entering the loop.
pub struct Client<E: Copy + Eq + Hash + Send + Sync> {
    // Config
    // Configuration supplied at construction (e.g. the jitter-buffer mode
    // consulted by `take_tick_events`).
    client_config: ClientConfig,
    // Protocol definition supplied at construction (channel, message and
    // component kinds, socket config).
    protocol: Protocol,
    // Protocol ID handed to the handshake manager at construction.
    protocol_id: ProtocolId,
    // Connection
    // Serialized auth payload stored by `auth`; passed to the socket by
    // `connect`.
    auth_message: Option<Vec<u8>>,
    // Key/value headers stored by `auth_headers`; passed to the socket by
    // `connect`.
    auth_headers: Option<Vec<(String, String)>>,
    // Packet I/O. "Loaded" by `connect`; an unloaded `io` is what
    // `is_disconnected` reports as disconnected.
    io: Io,
    // `Some` once a connection with the server exists (see `is_connected`).
    server_connection: Option<Connection>,
    // Produces handshake packets while connecting and disconnect packets in
    // `disconnect`.
    handshake_manager: Box<dyn Handshaker>,
    // Set to `true` by `disconnect`; makes `is_disconnecting` return `true`.
    manual_disconnect: bool,
    // NOTE(review): presumably set when the server ends the connection; the
    // write site is outside this view — confirm.
    server_disconnect: bool,
    // Messages sent before a connection exists; flushed by `on_connect`.
    waitlist_messages: VecDeque<(ChannelKind, Box<dyn Message>)>,
    // World
    global_world_manager: GlobalWorldManager,
    // Maps between world entities (`E`) and `GlobalEntity` values.
    global_entity_map: GlobalEntityMap<E>,
    // Events
    // Drained by `take_world_events`.
    incoming_world_events: Events<E>,
    // Drained by `take_tick_events`.
    incoming_tick_events: TickEvents,
    // Per-connection priority layer (single connection; no global/per-user split).
    priority: UserPriorityState<E>,
    // Replicated Resources — client-side mirror of the server's
    // ResourceRegistry. Populated when an InsertComponent for a
    // resource-marked component kind arrives; consulted by the bevy
    // adapter's mirror system to translate component events into
    // resource events and maintain the bevy-Resource side. See
    // `_AGENTS/RESOURCES_PLAN.md` §A1 + `RESOURCES_AUDIT.md`.
    resource_registry: naia_shared::ResourceRegistry,
}
85
86impl<E: Copy + Eq + Hash + Send + Sync> Client<E> {
87    /// Creates a new client with the given config and protocol.
88    ///
89    /// Call [`auth`](Client::auth) (optional) and then
90    /// [`connect`](Client::connect) before entering the main loop.
91    pub fn new<P: Into<Protocol>>(client_config: ClientConfig, protocol: P) -> Self {
92        let mut protocol: Protocol = protocol.into();
93        protocol.lock();
94        let protocol_id = protocol.protocol_id();
95        Self::new_with_protocol_id(client_config, protocol, protocol_id)
96    }
97
98    /// Creates a new client with an explicit protocol ID.
99    ///
100    /// # Adapter use only
101    ///
102    /// Bevy and macroquad adapters use this to inject a pre-computed ID.
103    /// Prefer [`new`](Client::new) in application code.
104    pub fn new_with_protocol_id(
105        client_config: ClientConfig,
106        protocol: Protocol,
107        protocol_id: ProtocolId,
108    ) -> Self {
109        let handshake_manager = HandshakeManager::new(
110            protocol_id,
111            client_config.send_handshake_interval,
112            client_config.ping_interval,
113            client_config.handshake_pings,
114        );
115
116        let compression_config = protocol.compression.clone();
117
118        Self {
119            // Config
120            client_config: client_config.clone(),
121            protocol,
122            protocol_id,
123            // Connection
124            auth_message: None,
125            auth_headers: None,
126            io: Io::new(
127                &client_config.connection.bandwidth_measure_duration,
128                &compression_config,
129            ),
130            server_connection: None,
131            handshake_manager: Box::new(handshake_manager),
132            manual_disconnect: false,
133            server_disconnect: false,
134            waitlist_messages: VecDeque::new(),
135            // World
136            global_world_manager: GlobalWorldManager::new(),
137            global_entity_map: GlobalEntityMap::new(),
138            // Events
139            incoming_world_events: Events::new(),
140            incoming_tick_events: TickEvents::new(),
141            priority: UserPriorityState::new(),
142            resource_registry: naia_shared::ResourceRegistry::new(),
143        }
144    }
145
146    // Priority
147
148    /// Read-only handle to the priority state for `entity` on this client's
149    /// outbound connection.
150    pub fn entity_priority(&self, entity: E) -> EntityPriorityRef<'_, E> {
151        self.priority.get_ref(entity)
152    }
153
154    /// Mutable handle to the priority state for `entity` on this client's
155    /// outbound connection. Lazy-creates an entry on first write.
156    pub fn entity_priority_mut(&mut self, entity: E) -> EntityPriorityMut<'_, E> {
157        self.priority.get_mut(entity)
158    }
159
160    /// Stores the authentication message to send during the handshake.
161    ///
162    /// Must be called before [`connect`](Client::connect) if the server
163    /// requires authentication. The server receives this as an
164    /// [`AuthEvent`] in its connection handler.
165    ///
166    /// [`AuthEvent`]: naia_server::events::AuthEvent
167    pub fn auth<M: Message>(&mut self, auth: M) {
168        // get auth bytes
169        let mut bit_writer = BitWriter::new();
170        auth.write(
171            &self.protocol.message_kinds,
172            &mut bit_writer,
173            &mut FakeEntityConverter,
174        );
175        let auth_bytes = bit_writer.to_bytes();
176        self.auth_message = Some(auth_bytes.to_vec());
177    }
178
179    /// Stores HTTP-style key-value headers to include in the WebRTC upgrade
180    /// request.
181    ///
182    /// Used by WebRTC transports that support header-based authentication or
183    /// routing. Ignored by native UDP sockets.
184    pub fn auth_headers(&mut self, headers: Vec<(String, String)>) {
185        self.auth_headers = Some(headers);
186    }
187
188    /// Opens the socket and begins the handshake with the server.
189    ///
190    /// If [`auth`](Client::auth) was called, the auth payload is included in
191    /// the handshake. After connecting, process events via the main loop
192    /// until a [`ConnectionEvent`] arrives.
193    ///
194    /// # Panics
195    ///
196    /// Panics if the client has already initiated a connection. Check
197    /// [`connection_status`](Client::connection_status) before calling.
198    ///
199    /// [`ConnectionEvent`]: crate::events::ConnectionEvent
200    pub fn connect<S: Into<Box<dyn Socket>>>(&mut self, socket: S) {
201        if !self.is_disconnected() {
202            panic!("Client has already initiated a connection, cannot initiate a new one. TIP: Check client.is_disconnected() before calling client.connect()");
203        }
204
205        if let Some(auth_bytes) = &self.auth_message {
206            if let Some(auth_headers) = &self.auth_headers {
207                // connect with auth & headers
208                let boxed_socket: Box<dyn Socket> = socket.into();
209                let (id_receiver, packet_sender, packet_receiver) = boxed_socket
210                    .connect_with_auth_and_headers(auth_bytes.clone(), auth_headers.clone());
211                self.io.load(id_receiver, packet_sender, packet_receiver);
212            } else {
213                // connect with auth
214                let boxed_socket: Box<dyn Socket> = socket.into();
215                let (id_receiver, packet_sender, packet_receiver) =
216                    boxed_socket.connect_with_auth(auth_bytes.clone());
217                self.io.load(id_receiver, packet_sender, packet_receiver);
218            }
219        } else if let Some(auth_headers) = &self.auth_headers {
220            // connect with auth headers
221            let boxed_socket: Box<dyn Socket> = socket.into();
222            let (id_receiver, packet_sender, packet_receiver) =
223                boxed_socket.connect_with_auth_headers(auth_headers.clone());
224            self.io.load(id_receiver, packet_sender, packet_receiver);
225        } else {
226            // connect without auth
227            let boxed_socket: Box<dyn Socket> = socket.into();
228            let (id_receiver, packet_sender, packet_receiver) = boxed_socket.connect();
229            self.io.load(id_receiver, packet_sender, packet_receiver);
230        }
231    }
232
233    /// Returns the client's current connection lifecycle state.
234    ///
235    /// Transitions: `Disconnected` → `Connecting` (after
236    /// [`connect`](Client::connect)) → `Connected` (after handshake) →
237    /// `Disconnecting` (after [`disconnect`](Client::disconnect)) →
238    /// `Disconnected`.
239    pub fn connection_status(&self) -> ConnectionStatus {
240        if self.is_connected() {
241            if self.is_disconnecting() {
242                ConnectionStatus::Disconnecting
243            } else {
244                ConnectionStatus::Connected
245            }
246        } else {
247            if self.is_disconnected() {
248                return ConnectionStatus::Disconnected;
249            }
250            if self.is_connecting() {
251                return ConnectionStatus::Connecting;
252            }
253            panic!("Client is in an unknown connection state!");
254        }
255    }
256
257    /// Returns whether or not a connection is being established with the Server
258    fn is_connecting(&self) -> bool {
259        self.io.is_loaded()
260    }
261
262    /// Returns whether or not a connection has been established with the Server
263    fn is_connected(&self) -> bool {
264        self.server_connection.is_some()
265    }
266
267    /// Returns whether or not the client is disconnecting
268    fn is_disconnecting(&self) -> bool {
269        if let Some(connection) = &self.server_connection {
270            connection.should_drop() || self.manual_disconnect || self.server_disconnect
271        } else {
272            false
273        }
274    }
275
276    /// Returns whether or not the client is disconnected
277    fn is_disconnected(&self) -> bool {
278        !self.io.is_loaded()
279    }
280
281    /// Initiates a clean disconnect from the server.
282    ///
283    /// Sends several disconnect packets to increase delivery probability,
284    /// then begins the disconnection process. A [`DisconnectionEvent`] is
285    /// emitted on the next [`take_world_events`](Client::take_world_events)
286    /// call.
287    ///
288    /// # Panics
289    ///
290    /// Panics if the client is not currently connected.
291    ///
292    /// [`DisconnectionEvent`]: crate::events::DisconnectionEvent
293    pub fn disconnect(&mut self) {
294        if !self.is_connected() {
295            panic!("Trying to disconnect Client which is not connected yet!")
296        }
297
298        for _ in 0..10 {
299            let writer = self.handshake_manager.write_disconnect();
300            if self.io.send_packet(writer.to_packet()).is_err() {
301                // Best-effort: we send 10 disconnect packets and move on.
302                // If none reach the server it will time out the connection anyway.
303                warn!("Client Error: Cannot send disconnect packet to Server");
304            }
305        }
306
307        self.manual_disconnect = true;
308    }
309
310    /// Returns the socket configuration from the protocol.
311    pub fn socket_config(&self) -> &SocketConfig {
312        &self.protocol.socket
313    }
314
315    // Event loop ────────────────────────────────────────────────────────────
316
    /// Reads all pending packets from the socket.
    ///
    /// Must be called **first** in the client loop, before
    /// [`process_all_packets`](Client::process_all_packets). Handles
    /// handshake progress, heartbeats, and buffers incoming data packets.
    ///
    /// NOTE(review): all of the work is delegated to `maintain_socket`, which
    /// is defined outside this view; the behavior described above is that
    /// helper's — confirm against its implementation.
    pub fn receive_all_packets(&mut self) {
        // Need to run this to maintain connection with server, and receive packets
        // until none left
        self.maintain_socket();
    }
327
328    /// Decodes all buffered packets and applies changes to the world.
329    ///
330    /// Must be called after [`receive_all_packets`](Client::receive_all_packets)
331    /// and before [`take_world_events`](Client::take_world_events). Applies
332    /// server-replicated entity spawn/update/despawn events and queues them
333    /// for the next [`take_world_events`] call.
334    pub fn process_all_packets<W: WorldMutType<E>>(&mut self, mut world: W, now: &Instant) {
335        // all other operations
336        if self.is_disconnecting() {
337            let reason = if self.manual_disconnect || self.server_disconnect {
338                naia_shared::DisconnectReason::ClientDisconnected
339            } else {
340                naia_shared::DisconnectReason::TimedOut
341            };
342            self.disconnect_with_events(&mut world, reason);
343            return;
344        }
345
346        let Some(connection) = &mut self.server_connection else {
347            return;
348        };
349
350        // receive packets, process into events
351        let entity_events = connection.process_packets(
352            &mut self.global_entity_map,
353            &mut self.global_world_manager,
354            &self.protocol,
355            &mut world,
356            now,
357            &mut self.incoming_world_events,
358        );
359
360        self.process_entity_events(&mut world, entity_events);
361    }
362
363    /// Drains and returns all accumulated world events since the last call.
364    ///
365    /// Must be called after [`process_all_packets`](Client::process_all_packets).
366    /// The returned [`Events`] contains entity spawn/despawn/update notifications,
367    /// message arrivals, connection/disconnection signals, and authority events.
368    /// Not calling this causes the buffer to grow without bound.
369    ///
370    /// [`Events`]: crate::Events
371    pub fn take_world_events(&mut self) -> Events<E> {
372        std::mem::take(&mut self.incoming_world_events)
373    }
374
375    /// Advances the tick clocks and returns any tick-boundary events.
376    ///
377    /// Must be called after [`take_world_events`](Client::take_world_events).
378    /// Returns a [`TickEvents`] containing client and server tick advances
379    /// since the last call. Also de-jitters buffered packets on tick
380    /// boundaries (unless the jitter buffer is in bypass mode).
381    ///
382    /// [`TickEvents`]: crate::TickEvents
383    pub fn take_tick_events(&mut self, now: &Instant) -> TickEvents {
384        let Some(connection) = &mut self.server_connection else {
385            return TickEvents::default();
386        };
387
388        let (receiving_tick_happened, sending_tick_happened) =
389            connection.time_manager.collect_ticks(now);
390
391        // If jitter buffer is in bypass mode, process packets immediately regardless of tick
392        // Otherwise, only process on tick boundaries
393        let should_read_packets = match self.client_config.jitter_buffer {
394            JitterBufferType::Bypass => true,
395            JitterBufferType::Real => receiving_tick_happened.is_some(),
396        };
397
398        if should_read_packets {
399            // read packets on tick boundary, de-jittering
400            if let Err(_err) = connection.read_buffered_packets(
401                &self.protocol.channel_kinds,
402                &self.protocol.message_kinds,
403                &self.protocol.component_kinds,
404            ) {
405                // TODO: Except for cosmic radiation .. Server should never send a malformed packet .. handle this
406                warn!("Error reading from buffered packet!");
407            }
408        }
409
410        if let Some((prev_receiving_tick, current_receiving_tick)) = receiving_tick_happened {
411            let mut index_tick = prev_receiving_tick.wrapping_add(1);
412            loop {
413                self.incoming_tick_events.push_server_tick(index_tick);
414
415                if index_tick == current_receiving_tick {
416                    break;
417                }
418                index_tick = index_tick.wrapping_add(1);
419            }
420        }
421
422        if let Some((prev_sending_tick, current_sending_tick)) = sending_tick_happened {
423            // insert tick events in total range
424            let mut index_tick = prev_sending_tick.wrapping_add(1);
425            loop {
426                self.incoming_tick_events.push_client_tick(index_tick);
427
428                if index_tick == current_sending_tick {
429                    break;
430                }
431                index_tick = index_tick.wrapping_add(1);
432            }
433        }
434
435        std::mem::take(&mut self.incoming_tick_events)
436    }
437
438    /// Flushes all queued messages and entity mutations to the server.
439    ///
440    /// Must be called **last** in the client loop. Serialises outbound
441    /// packets and hands them to the transport. Also handles handshake
442    /// packet retransmission when not yet connected. If this is not called,
443    /// the server never receives any updates.
444    pub fn send_all_packets<W: WorldRefType<E>>(&mut self, world: W) {
445        if let Some(connection) = &mut self.server_connection {
446            let now = Instant::now();
447
448            // send packets
449            connection.send_packets(
450                &self.protocol,
451                &now,
452                &mut self.io,
453                &world,
454                &self.global_entity_map,
455                &self.global_world_manager,
456            );
457        } else if self.io.is_loaded() {
458            if let Some(outgoing_packet) = self.handshake_manager.send() {
459                if self.io.send_packet(outgoing_packet).is_err() {
460                    // Single handshake send failure is not fatal: the handshake
461                    // manager retries on the next tick until the server responds.
462                    warn!("Client Error: Cannot send handshake packet to Server");
463                }
464            }
465        }
466    }
467
468    // Messaging ─────────────────────────────────────────────────────────────
469
470    /// Queues a message to be sent to the server on the next
471    /// [`send_all_packets`](Client::send_all_packets) call.
472    ///
473    /// `C` is the channel type (ordering and reliability). `M` is the message
474    /// type (must be registered in the [`Protocol`]). Messages sent before
475    /// the connection is established are queued and delivered on connect.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if the channel does not allow client-to-server
480    /// messages, or if the channel is `TickBuffered` (use
481    /// [`send_tick_buffer_message`](Client::send_tick_buffer_message) instead).
482    ///
483    /// [`Protocol`]: naia_shared::Protocol
484    pub fn send_message<C: Channel, M: Message>(
485        &mut self,
486        message: &M,
487    ) -> Result<(), NaiaClientError> {
488        let cloned_message = M::clone_box(message);
489        self.send_message_inner(&ChannelKind::of::<C>(), cloned_message)
490    }
491
492    fn send_message_inner(
493        &mut self,
494        channel_kind: &ChannelKind,
495        message_box: Box<dyn Message>,
496    ) -> Result<(), NaiaClientError> {
497        let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
498        if !channel_settings.can_send_to_server() {
499            return Err(NaiaClientError::Message(
500                "Cannot send message to Server on this Channel".to_string(),
501            ));
502        }
503
504        if channel_settings.tick_buffered() {
505            return Err(NaiaClientError::Message("Cannot call `Client.send_message()` on a Tick Buffered Channel, use `Client.send_tick_buffered_message()` instead".to_string()));
506        }
507
508        if let Some(connection) = &mut self.server_connection {
509            let mut converter = connection
510                .base
511                .world_manager
512                .entity_converter_mut(&self.global_world_manager);
513            let message = MessageContainer::new(message_box);
514            let accepted = connection.base.message_manager.send_message(
515                &self.protocol.message_kinds,
516                &mut converter,
517                channel_kind,
518                message,
519            );
520            if !accepted {
521                return Err(NaiaClientError::MessageQueueFull);
522            }
523        } else {
524            self.waitlist_messages
525                .push_back((*channel_kind, message_box));
526        }
527        Ok(())
528    }
529
530    /// Sends a request to the server and returns a key for polling the
531    /// response.
532    ///
533    /// Use [`receive_response`](Client::receive_response) with the returned
534    /// key to collect the server's reply.
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if the client is not currently connected.
539    ///
540    /// # Panics
541    ///
542    /// Panics if the channel is not bidirectional and reliable.
543    pub fn send_request<C: Channel, Q: Request>(
544        &mut self,
545        request: &Q,
546    ) -> Result<ResponseReceiveKey<Q::Response>, NaiaClientError> {
547        let cloned_request = Q::clone_box(request);
548        // let response_type_id = TypeId::of::<Q::Response>();
549        let id = self.send_request_inner(&ChannelKind::of::<C>(), cloned_request)?;
550        Ok(ResponseReceiveKey::new(id))
551    }
552
553    fn send_request_inner(
554        &mut self,
555        channel_kind: &ChannelKind,
556        // response_type_id: TypeId,
557        request_box: Box<dyn Message>,
558    ) -> Result<GlobalRequestId, NaiaClientError> {
559        let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
560
561        if !channel_settings.can_request_and_respond() {
562            std::panic!("Requests can only be sent over Bidirectional, Reliable Channels");
563        }
564
565        let Some(connection) = &mut self.server_connection else {
566            warn!("currently not connected to server");
567            return Err(NaiaClientError::Message(
568                "currently not connected to server".to_string(),
569            ));
570        };
571        let mut converter = connection
572            .base
573            .world_manager
574            .entity_converter_mut(&self.global_world_manager);
575
576        let request_id = connection.global_request_manager.create_request_id();
577        let message = MessageContainer::new(request_box);
578        connection.base.message_manager.send_request(
579            &self.protocol.message_kinds,
580            &mut converter,
581            channel_kind,
582            request_id,
583            message,
584        );
585
586        Ok(request_id)
587    }
588
589    /// Sends a response to the server's request.
590    ///
591    /// `response_key` is obtained from the [`RequestEvent`] that delivered
592    /// the server's original request. Returns `true` on success; `false` if
593    /// the key is no longer valid (e.g. the connection was dropped).
594    ///
595    /// [`RequestEvent`]: crate::events::RequestEvent
596    pub fn send_response<S: Response>(
597        &mut self,
598        response_key: &ResponseSendKey<S>,
599        response: &S,
600    ) -> bool {
601        let response_id = response_key.response_id();
602
603        let cloned_response = S::clone_box(response);
604
605        self.send_response_inner(&response_id, cloned_response)
606    }
607
608    // returns whether was successful
609    fn send_response_inner(
610        &mut self,
611        response_id: &GlobalResponseId,
612        response_box: Box<dyn Message>,
613    ) -> bool {
614        let Some(connection) = &mut self.server_connection else {
615            return false;
616        };
617        let Some((channel_kind, local_response_id)) = connection
618            .global_response_manager
619            .destroy_response_id(response_id)
620        else {
621            return false;
622        };
623        let mut converter = connection
624            .base
625            .world_manager
626            .entity_converter_mut(&self.global_world_manager);
627
628        let response = MessageContainer::new(response_box);
629        connection.base.message_manager.send_response(
630            &self.protocol.message_kinds,
631            &mut converter,
632            &channel_kind,
633            local_response_id,
634            response,
635        );
636        true
637    }
638
639    /// Returns `true` if a response to the given request has arrived.
640    ///
641    /// Non-destructive — does not consume the response. Call
642    /// [`receive_response`](Client::receive_response) to retrieve and consume
643    /// it.
644    pub fn has_response<S: Response>(&self, response_key: &ResponseReceiveKey<S>) -> bool {
645        let Some(connection) = &self.server_connection else {
646            return false;
647        };
648        let request_id = response_key.request_id();
649        connection.global_request_manager.has_response(&request_id)
650    }
651
652    /// Polls for and consumes a response to a previously sent client request.
653    ///
654    /// Returns `Some(response)` once the server replies, or `None` if the
655    /// response has not yet arrived or the key is invalid. The key is
656    /// invalidated after a successful receive.
657    pub fn receive_response<S: Response>(
658        &mut self,
659        response_key: &ResponseReceiveKey<S>,
660    ) -> Option<S> {
661        let Some(connection) = &mut self.server_connection else {
662            return None;
663        };
664        let request_id = response_key.request_id();
665        let container = connection
666            .global_request_manager
667            .destroy_request_id(&request_id)?;
668        let response: S = Box::<dyn Any + 'static>::downcast::<S>(container.to_boxed_any())
669            .ok()
670            .map(|boxed_s| *boxed_s)
671            .unwrap();
672        Some(response)
673    }
674    //
675
676    fn on_connect(&mut self) {
677        // send queued messages
678        let messages = std::mem::take(&mut self.waitlist_messages);
679        for (channel_kind, message_box) in messages {
680            let _ = self.send_message_inner(&channel_kind, message_box);
681        }
682    }
683
684    /// Queues a tick-buffered message stamped with the given client tick.
685    ///
686    /// Use this for client input on a [`TickBuffered`] channel. The server
687    /// receives the message when its tick counter reaches the stamped tick,
688    /// enabling tick-accurate input replay.
689    ///
690    /// # Panics
691    ///
692    /// Panics if the channel does not have `TickBuffered` mode enabled.
693    ///
694    /// [`TickBuffered`]: naia_shared::ChannelMode::TickBuffered
695    pub fn send_tick_buffer_message<C: Channel, M: Message>(&mut self, tick: &Tick, message: &M) {
696        let cloned_message = M::clone_box(message);
697        self.send_tick_buffer_message_inner(tick, &ChannelKind::of::<C>(), cloned_message);
698    }
699
700    fn send_tick_buffer_message_inner(
701        &mut self,
702        tick: &Tick,
703        channel_kind: &ChannelKind,
704        message_box: Box<dyn Message>,
705    ) {
706        let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
707
708        if !channel_settings.can_send_to_server() {
709            panic!("Cannot send message to Server on this Channel");
710        }
711
712        if !channel_settings.tick_buffered() {
713            panic!("Can only use `Client.send_tick_buffer_message()` on a Channel that is configured for it.");
714        }
715
716        if let Some(connection) = self.server_connection.as_mut() {
717            let message = MessageContainer::new(message_box);
718            connection
719                .tick_buffer
720                .send_message(tick, channel_kind, message);
721        }
722    }
723
724    // Entities ──────────────────────────────────────────────────────────────
725
726    /// Spawns a client-owned entity and returns a builder for configuring it.
727    ///
728    /// The spawned entity starts as [`Private`](naia_shared::Publicity::Private);
729    /// call [`configure_replication`](crate::EntityMut::configure_replication)
730    /// on the returned [`EntityMut`] to publish it.
731    ///
732    /// Requires that the protocol was built with
733    /// `enable_client_authoritative_entities()`.
734    ///
735    /// # Panics
736    ///
737    /// Panics if client-authoritative entities are not enabled in the protocol.
738    pub fn spawn_entity<W: WorldMutType<E>>(&'_ mut self, mut world: W) -> EntityMut<'_, E, W> {
739        self.check_client_authoritative_allowed();
740
741        let world_entity = world.spawn_entity();
742
743        self.spawn_entity_inner(&world_entity);
744
745        EntityMut::new(self, world, &world_entity)
746    }
747
748    /// Creates a new Entity with a specific id
749    fn spawn_entity_inner(&mut self, world_entity: &E) {
750        let global_entity = self.global_entity_map.spawn(*world_entity, None);
751
752        self.global_world_manager.host_spawn_entity(&global_entity);
753
754        let Some(connection) = &mut self.server_connection else {
755            return;
756        };
757        let component_kinds = self
758            .global_world_manager
759            .component_kinds(&global_entity)
760            .unwrap();
761        connection
762            .base
763            .world_manager
764            .host_init_entity(&global_entity, component_kinds, &self.protocol.component_kinds, false);
765    }
766
767    // Replicated Resources (client-side mirror) ─────────────────────────────
768    // Populated when the remote-apply path delivers an InsertComponent for a
769    // resource kind. Clears on Despawn. The Bevy adapter consumes this to
770    // drive the Bevy-Resource mirror (see adapters/bevy/client/src/resource_sync).
771
772    /// Returns `true` if the client has a server-replicated resource of type
773    /// `R` currently in scope.
774    pub fn has_resource<R: 'static>(&self) -> bool {
775        self.resource_registry.entity_for::<R>().is_some()
776    }
777
778    /// O(1): the world-entity carrying resource `R` on this client,
779    /// or `None` if not currently in scope.
780    pub fn resource_entity<R: 'static>(&self) -> Option<E> {
781        let global_entity = self.resource_registry.entity_for::<R>()?;
782        self.global_entity_map
783            .global_entity_to_entity(&global_entity)
784            .ok()
785    }
786
787    /// True iff `world_entity` is the entity carrying any Replicated
788    /// Resource currently in scope on this client.
789    pub fn is_resource_entity(&self, world_entity: &E) -> bool {
790        let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(world_entity)
791        else {
792            return false;
793        };
794        self.resource_registry.is_resource_entity(&global_entity)
795    }
796
797    /// Number of currently-mirrored Replicated Resources.
798    pub fn resources_count(&self) -> usize {
799        self.resource_registry.len()
800    }
801
802    /// Iterate over the world-entities of all currently-mirrored resources.
803    pub fn resource_entities(&self) -> Vec<E> {
804        let mut out = Vec::with_capacity(self.resource_registry.len());
805        for global_entity in self.resource_registry.entities() {
806            if let Ok(e) = self
807                .global_entity_map
808                .global_entity_to_entity(global_entity)
809            {
810                out.push(e);
811            }
812        }
813        out
814    }
815
816    /// Returns a read-only handle to the entity.
817    ///
818    /// # Panics
819    ///
820    /// Panics if the entity does not exist in the world.
821    pub fn entity<W: WorldRefType<E>>(&'_ self, world: W, entity: &E) -> EntityRef<'_, E, W> {
822        if world.has_entity(entity) {
823            return EntityRef::new(self, world, entity);
824        }
825        panic!("No Entity exists for given Key!");
826    }
827
828    /// Returns a mutable handle to the entity.
829    ///
830    /// # Panics
831    ///
832    /// Panics if the entity does not exist in the world, or if
833    /// client-authoritative entities are not enabled in the protocol.
834    pub fn entity_mut<W: WorldMutType<E>>(
835        &'_ mut self,
836        world: W,
837        entity: &E,
838    ) -> EntityMut<'_, E, W> {
839        self.check_client_authoritative_allowed();
840        if world.has_entity(entity) {
841            return EntityMut::new(self, world, entity);
842        }
843        panic!("No Entity exists for given Key!");
844    }
845
846    /// Returns all entities currently present in the world.
847    pub fn entities<W: WorldRefType<E>>(&self, world: &W) -> Vec<E> {
848        world.entities()
849    }
850
851    pub(crate) fn entity_owner(&self, world_entity: &E) -> EntityOwner {
852        if let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(world_entity) {
853            if let Some(owner) = self.global_world_manager.entity_owner(&global_entity) {
854                return owner;
855            }
856        }
857        EntityOwner::Local
858    }
859
860    // Authority and replication config ──────────────────────────────────────
861
862    /// Registers the entity with the replication layer.
863    ///
864    /// # Adapter use only
865    ///
866    /// Called by the Bevy adapter when a [`Replicate`] component is inserted.
867    /// Use [`spawn_entity`](Client::spawn_entity) in application code.
868    ///
869    /// [`Replicate`]: naia_shared::Replicate
870    pub fn enable_entity_replication(&mut self, entity: &E) {
871        self.check_client_authoritative_allowed();
872        self.spawn_entity_inner(entity);
873    }
874
875    /// Unregisters the entity from the replication layer.
876    ///
877    /// # Adapter use only
878    ///
879    /// Called by the Bevy adapter when a [`Replicate`] component is removed.
880    ///
881    /// [`Replicate`]: naia_shared::Replicate
882    pub fn disable_entity_replication(&mut self, entity: &E) {
883        self.check_client_authoritative_allowed();
884        // Despawn from connections and inner tracking
885        self.despawn_entity_worldless(entity);
886    }
887
888    /// Returns the current [`Publicity`] for the entity, or `None` if the
889    /// entity is not registered.
890    ///
891    /// # Adapter use only
892    ///
893    /// Use [`EntityRef::replication_config`](crate::EntityRef::replication_config)
894    /// in application code.
895    pub fn entity_replication_config(&self, world_entity: &E) -> Option<Publicity> {
896        self.check_client_authoritative_allowed();
897        let global_entity = self
898            .global_entity_map
899            .entity_to_global_entity(world_entity)
900            .unwrap();
901        self.global_world_manager
902            .entity_replication_config(&global_entity)
903    }
904
905    /// Updates the replication config for a client-owned entity.
906    ///
907    /// # Adapter use only
908    ///
909    /// Application code should call
910    /// [`entity_mut(...).configure_replication(config)`](crate::EntityMut::configure_replication)
911    /// instead.
912    ///
913    /// # Panics
914    ///
915    /// Panics if the entity is server-owned, not yet replicating, or if the
916    /// entity is already `Delegated`.
917    pub fn configure_entity_replication<W: WorldMutType<E>>(
918        &mut self,
919        world: &mut W,
920        world_entity: &E,
921        config: Publicity,
922    ) {
923        self.check_client_authoritative_allowed();
924        let global_entity = self
925            .global_entity_map
926            .entity_to_global_entity(world_entity)
927            .unwrap();
928        if !self.global_world_manager.has_entity(&global_entity) {
929            panic!("Entity is not yet replicating. Be sure to call `enable_replication` or `spawn_entity` on the Client, before configuring replication.");
930        }
931        let entity_owner = self
932            .global_world_manager
933            .entity_owner(&global_entity)
934            .unwrap();
935        let server_owned = entity_owner.is_server();
936        if server_owned {
937            panic!("Client cannot configure replication strategy of Server-owned Entities.");
938        }
939        let client_owned = entity_owner.is_client();
940        if !client_owned {
941            panic!("Client cannot configure replication strategy of Entities it does not own.");
942        }
943        let next_config = config;
944        let prev_config = self
945            .global_world_manager
946            .entity_replication_config(&global_entity)
947            .unwrap();
948        if prev_config == config {
949            // Already in the desired state, no-op
950            return;
951        }
952        match prev_config {
953            Publicity::Private => {
954                match next_config {
955                    Publicity::Private => {
956                        panic!("This should not be possible.");
957                    }
958                    Publicity::Public => {
959                        // private -> public
960                        self.publish_entity(&global_entity, true);
961                    }
962                    Publicity::Delegated => {
963                        // private -> delegated
964                        self.publish_entity(&global_entity, true);
965                        self.entity_enable_delegation(world, &global_entity, world_entity, true);
966                    }
967                }
968            }
969            Publicity::Public => {
970                match next_config {
971                    Publicity::Private => {
972                        // public -> private
973                        self.unpublish_entity(&global_entity, true);
974                    }
975                    Publicity::Public => {
976                        panic!("This should not be possible.");
977                    }
978                    Publicity::Delegated => {
979                        // public -> delegated
980                        self.entity_enable_delegation(world, &global_entity, world_entity, true);
981                    }
982                }
983            }
984            Publicity::Delegated => {
985                panic!(
986                    "Delegated Entities are always ultimately Server-owned. Client cannot modify."
987                )
988            }
989        }
990    }
991
992    /// Returns the current authority status for the entity from the client's
993    /// perspective, or `None` if the entity is not delegable.
994    ///
995    /// # Adapter use only
996    ///
997    /// Application code should inspect authority via [`EntityRef::authority`](crate::EntityRef::authority).
998    pub fn entity_authority_status(&self, world_entity: &E) -> Option<EntityAuthStatus> {
999        self.check_client_authoritative_allowed();
1000
1001        let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(world_entity) else {
1002            return None;
1003        };
1004
1005        self.global_world_manager.entity_authority_status(&global_entity)
1006    }
1007
1008    /// Sends an authority request to the server for the given delegated entity.
1009    ///
1010    /// The server responds with either [`EntityAuthGrantedEvent`] or
1011    /// [`EntityAuthDeniedEvent`]. Only valid for entities with
1012    /// [`Delegated`](naia_shared::Publicity::Delegated) replication config.
1013    ///
1014    /// # Adapter use only
1015    ///
1016    /// Application code should call
1017    /// [`entity_mut(...).request_authority()`](crate::EntityMut::request_authority)
1018    /// instead.
1019    ///
1020    /// [`EntityAuthGrantedEvent`]: crate::events::EntityAuthGrantedEvent
1021    /// [`EntityAuthDeniedEvent`]: crate::events::EntityAuthDeniedEvent
1022    pub fn entity_request_authority(&mut self, world_entity: &E) -> Result<(), AuthorityError> {
1023        self.check_client_authoritative_allowed();
1024
1025        let global_entity = self
1026            .global_entity_map
1027            .entity_to_global_entity(world_entity)
1028            .unwrap();
1029
1030        // 1. Set local authority status for Entity
1031        let result = self
1032            .global_world_manager
1033            .entity_request_authority(&global_entity);
1034
1035        if result.is_ok() {
1036            // 2. Send request to Server via EntityActionEvent system
1037            let Some(connection) = &mut self.server_connection else {
1038                return result;
1039            };
1040
1041            connection
1042                .base
1043                .world_manager
1044                .remote_send_request_auth(&global_entity);
1045        }
1046        result
1047    }
1048
1049    /// Releases the client's authority over the given entity back to the
1050    /// server.
1051    ///
1052    /// Only valid when this client holds `Granted` authority. The server
1053    /// resumes ownership after confirming the release.
1054    ///
1055    /// # Adapter use only
1056    ///
1057    /// Application code should call
1058    /// [`entity_mut(...).release_authority()`](crate::EntityMut::release_authority)
1059    /// instead.
1060    pub fn entity_release_authority(&mut self, world_entity: &E) -> Result<(), AuthorityError> {
1061        self.check_client_authoritative_allowed();
1062
1063        let global_entity = self
1064            .global_entity_map
1065            .entity_to_global_entity(world_entity)
1066            .unwrap();
1067
1068        // 1. Set local authority status for Entity
1069        let result = self
1070            .global_world_manager
1071            .entity_release_authority(&global_entity);
1072        if result.is_ok() {
1073            let Some(connection) = &mut self.server_connection else {
1074                return result;
1075            };
1076            connection
1077                .base
1078                .world_manager
1079                .remote_send_release_auth(&global_entity);
1080        }
1081        result
1082    }
1083
1084    // Connection ────────────────────────────────────────────────────────────
1085
1086    /// Returns the server's socket address.
1087    ///
1088    /// # Errors
1089    ///
1090    /// Returns an error if the connection has not been established yet.
1091    pub fn server_address(&self) -> Result<SocketAddr, NaiaClientError> {
1092        self.io.server_addr()
1093    }
1094
1095    /// Returns the rolling-average round-trip time (seconds) to the server.
1096    ///
1097    /// Returns `0.0` if the connection has not been established yet.
1098    pub fn rtt(&self) -> f32 {
1099        self.server_connection
1100            .as_ref()
1101            .map(|conn| conn.time_manager.rtt() / 1000.0)
1102            .unwrap_or(0.0)
1103    }
1104
1105    /// Returns the rolling-average jitter (seconds) measured for the server
1106    /// connection.
1107    ///
1108    /// Returns `0.0` if the connection has not been established yet.
1109    pub fn jitter(&self) -> f32 {
1110        self.server_connection
1111            .as_ref()
1112            .map(|conn| conn.time_manager.jitter() / 1000.0)
1113            .unwrap_or(0.0)
1114    }
1115
1116    // Ticks ─────────────────────────────────────────────────────────────────
1117
1118    /// Returns the client's current sending tick, or `None` if not connected.
1119    ///
1120    /// This is the tick at which the client is currently sending — use it to
1121    /// stamp [`TickBuffered`] messages for prediction.
1122    ///
1123    /// [`TickBuffered`]: naia_shared::ChannelMode::TickBuffered
1124    pub fn client_tick(&self) -> Option<Tick> {
1125        let connection = self.server_connection.as_ref()?;
1126        Some(connection.time_manager.client_sending_tick)
1127    }
1128
1129    /// Returns the `GameInstant` corresponding to the client's current sending
1130    /// tick, or `None` if not connected.
1131    pub fn client_instant(&self) -> Option<GameInstant> {
1132        let connection = self.server_connection.as_ref()?;
1133        Some(connection.time_manager.client_sending_instant)
1134    }
1135
1136    /// Returns the server tick that the client is currently receiving, or
1137    /// `None` if not connected.
1138    ///
1139    /// This lags slightly behind the server's actual current tick due to
1140    /// network latency and the jitter buffer.
1141    pub fn server_tick(&self) -> Option<Tick> {
1142        let connection = self.server_connection.as_ref()?;
1143        Some(connection.time_manager.client_receiving_tick)
1144    }
1145
1146    /// Returns the `GameInstant` corresponding to the current server-receive
1147    /// tick, or `None` if not connected.
1148    pub fn server_instant(&self) -> Option<GameInstant> {
1149        let connection = self.server_connection.as_ref()?;
1150        Some(connection.time_manager.client_receiving_instant)
1151    }
1152
1153    /// Converts a tick counter value to the corresponding `GameInstant`,
1154    /// or `None` if not connected.
1155    pub fn tick_to_instant(&self, tick: Tick) -> Option<GameInstant> {
1156        if let Some(connection) = &self.server_connection {
1157            return Some(connection.time_manager.tick_to_instant(tick));
1158        }
1159        None
1160    }
1161
1162    /// Returns the duration of a single tick as configured in the protocol,
1163    /// or `None` if not connected.
1164    pub fn tick_duration(&self) -> Option<Duration> {
1165        if let Some(connection) = &self.server_connection {
1166            return Some(connection.time_manager.tick_duration());
1167        }
1168        None
1169    }
1170
1171    // Interpolation ─────────────────────────────────────────────────────────
1172
1173    /// Returns the interpolation fraction `[0.0, 1.0)` for the current frame
1174    /// within the client sending tick.
1175    ///
1176    /// Use this to lerp predicted entities between their state at the previous
1177    /// and current client ticks. Returns `None` if not connected.
1178    pub fn client_interpolation(&self) -> Option<f32> {
1179        if let Some(connection) = &self.server_connection {
1180            return Some(connection.time_manager.client_interpolation());
1181        }
1182        None
1183    }
1184
1185    /// Returns the interpolation fraction `[0.0, 1.0)` for the current frame
1186    /// within the server receive tick.
1187    ///
1188    /// Use this to lerp authoritative server-replicated entities between their
1189    /// state at the previous and current server ticks. Returns `None` if not
1190    /// connected.
1191    pub fn server_interpolation(&self) -> Option<f32> {
1192        if let Some(connection) = &self.server_connection {
1193            return Some(connection.time_manager.server_interpolation());
1194        }
1195        None
1196    }
1197
1198    // Diagnostics ───────────────────────────────────────────────────────────
1199
1200    /// Returns the rolling-average outgoing bandwidth to the server
1201    /// (bytes/second).
1202    pub fn outgoing_bandwidth(&self) -> f32 {
1203        self.io.outgoing_bandwidth()
1204    }
1205
1206    /// Returns the rolling-average incoming bandwidth from the server
1207    /// (bytes/second).
1208    pub fn incoming_bandwidth(&self) -> f32 {
1209        self.io.incoming_bandwidth()
1210    }
1211
1212    /// Returns a snapshot of per-connection diagnostics.
1213    ///
1214    /// Returns `None` if not connected. Includes RTT (average in ms), jitter,
1215    /// packet-loss fraction, and send/recv bandwidth in kbps.
1216    pub fn connection_stats(&self) -> Option<ConnectionStats> {
1217        let conn = self.server_connection.as_ref()?;
1218        let rtt_ms = conn.time_manager.rtt();
1219        let jitter_ms = conn.time_manager.jitter();
1220        let packet_loss_pct = conn.base.packet_loss_pct();
1221        Some(ConnectionStats {
1222            rtt_ms,
1223            rtt_p50_ms: rtt_ms,
1224            rtt_p99_ms: conn.time_manager.rtt_p99_ms(),
1225            jitter_ms,
1226            packet_loss_pct,
1227            kbps_sent: self.io.outgoing_bandwidth(),
1228            kbps_recv: self.io.incoming_bandwidth(),
1229        })
1230    }
1231
1232    // Crate-Public methods
1233
1234    /// Despawns the Entity, if it exists.
1235    /// This will also remove all of the Entity’s Components.
1236    /// Panics if the Entity does not exist.
1237    pub(crate) fn despawn_entity<W: WorldMutType<E>>(&mut self, world: &mut W, entity: &E) {
1238        if !world.has_entity(entity) {
1239            panic!("attempted to de-spawn nonexistent entity");
1240        }
1241
1242        // Actually despawn from world
1243        world.despawn_entity(entity);
1244
1245        // Despawn from connections and inner tracking
1246        self.despawn_entity_worldless(entity);
1247    }
1248
1249    /// Despawns the entity from the replication layer without touching the
1250    /// world.
1251    ///
1252    /// # Adapter use only
1253    ///
1254    /// The Bevy adapter calls this when the ECS world has already removed the
1255    /// entity. Application code should despawn via the world, which triggers
1256    /// the adapter hook automatically.
1257    ///
1258    /// # Panics
1259    ///
1260    /// Panics if the entity is server-owned without delegation, or if the
1261    /// client does not hold `Granted` authority over a delegated entity.
1262    pub fn despawn_entity_worldless(&mut self, world_entity: &E) {
1263        let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(world_entity) else {
1264            warn!("attempting to despawn entity that has already been despawned?");
1265            return;
1266        };
1267        if !self.global_world_manager.has_entity(&global_entity) {
1268            warn!("attempting to despawn entity that has already been despawned?");
1269            return;
1270        }
1271
1272        // check whether we have authority to despawn this entity
1273        if let Some(owner) = self.global_world_manager.entity_owner(&global_entity) {
1274            if owner.is_server() {
1275                let is_delegated = self
1276                    .global_world_manager
1277                    .entity_is_delegated(&global_entity);
1278                if !is_delegated {
1279                    panic!("attempting to despawn entity that is not yet delegated. Delegation needs some time to be confirmed by the Server, so check that a despawn is possible by calling `commands.entity(..).replication_config(..).is_delegated()` first.");
1280                }
1281                if self
1282                    .global_world_manager
1283                    .entity_authority_status(&global_entity)
1284                    != Some(EntityAuthStatus::Granted)
1285                {
1286                    panic!("attempting to despawn entity that we do not have authority over");
1287                }
1288            }
1289        } else {
1290            panic!("attempting to despawn entity that has no owner");
1291        }
1292
1293        if let Some(connection) = &mut self.server_connection {
1294            //remove entity from server connection
1295            connection.base.world_manager.despawn_entity(&global_entity);
1296        }
1297
1298        // Remove from ECS Record
1299        self.global_world_manager
1300            .host_despawn_entity(&global_entity);
1301    }
1302
1303    /// Adds a Component to an Entity
1304    pub(crate) fn insert_component<R: ReplicatedComponent, W: WorldMutType<E>>(
1305        &mut self,
1306        world: &mut W,
1307        entity: &E,
1308        mut component: R,
1309    ) {
1310        if !world.has_entity(entity) {
1311            panic!("attempted to add component to non-existent entity");
1312        }
1313
1314        let component_kind = component.kind();
1315
1316        // Check if client has permission to mutate this entity
1317        // For client-owned entities: check if this client is the owner
1318        // For delegated entities: check if client has Granted authority
1319        // If not, silently ignore the mutation (matches test expectation that updates are ignored)
1320        if let Ok(global_entity) = self.global_entity_map.entity_to_global_entity(entity) {
1321            let owner = self.global_world_manager.entity_owner(&global_entity);
1322            let is_delegated = self
1323                .global_world_manager
1324                .entity_is_delegated(&global_entity);
1325
1326            let can_mutate = if is_delegated {
1327                // For delegated entities, check authority status
1328                self.global_world_manager
1329                    .entity_authority_status(&global_entity)
1330                    == Some(EntityAuthStatus::Granted)
1331            } else if let Some(owner) = owner {
1332                // For client-owned non-delegated entities, owner can always mutate
1333                owner.is_client()
1334            } else {
1335                // No owner info - cannot mutate
1336                false
1337            };
1338
1339            if !can_mutate {
1340                // Client doesn't have permission - silently ignore the mutation
1341                return;
1342            }
1343        }
1344
1345        if world.has_component_of_kind(entity, &component_kind) {
1346            // Entity already has this Component type yet, update Component
1347
1348            let Some(mut component_mut) = world.component_mut::<R>(entity) else {
1349                panic!("Should never happen because we checked for this above");
1350            };
1351            component_mut.mirror(&component);
1352        } else {
1353            // Entity does not have this Component type yet, initialize Component
1354
1355            self.insert_component_worldless(entity, &mut component);
1356
1357            // actually insert component into world
1358            world.insert_component(entity, component);
1359        }
1360    }
1361
1362    // For debugging purposes only
1363    /// Returns the registered name of the component identified by `component_kind`; intended for debug logging.
1364    pub fn component_name(&self, component_kind: &ComponentKind) -> String {
1365        self.protocol.component_kinds.kind_to_name(component_kind)
1366    }
1367
1368    /// Registers a component insertion with the replication layer without
1369    /// touching the world's component storage.
1370    ///
1371    /// # Adapter use only
1372    ///
1373    /// The Bevy adapter calls this when the component already exists in the
1374    /// ECS world. Application code should insert components via the world.
1375    pub fn insert_component_worldless(&mut self, world_entity: &E, component: &mut dyn Replicate) {
1376        let component_kind = component.kind();
1377
1378        let global_entity = self
1379            .global_entity_map
1380            .entity_to_global_entity(world_entity)
1381            .unwrap();
1382
1383        // When authority is granted for a previously-remote delegated entity
1384        // (server calls give_authority while the entity is already in scope),
1385        // entity_complete_delegation has already registered this component in
1386        // the GlobalDiffHandler and set the Property to Delegated state.
1387        // Re-entering here would double-panic in both host_insert_component
1388        // and Property::enable_delegation.  Skip entirely.
1389        if self
1390            .global_world_manager
1391            .component_already_host_registered(&global_entity, &component_kind)
1392        {
1393            return;
1394        }
1395
1396        // Register component in GlobalDiffHandler FIRST (before inserting into connection)
1397        // This ensures that when insert_component is called on the connection's world_manager,
1398        // the component is already registered in GlobalDiffHandler, allowing UserDiffHandler
1399        // to successfully register it.
1400        self.global_world_manager.host_insert_component(
1401            &self.protocol.component_kinds,
1402            &global_entity,
1403            component,
1404        );
1405
1406        // insert component into server connection
1407        if let Some(connection) = &mut self.server_connection {
1408            // insert component into server connection
1409            if connection
1410                .base
1411                .world_manager
1412                .has_global_entity(&global_entity)
1413            {
1414                connection
1415                    .base
1416                    .world_manager
1417                    .insert_component(&global_entity, &component_kind);
1418            } else {
1419                warn!("Attempting to insert component into a non-existent entity in the server connection. This should not happen.");
1420            }
1421        } else {
1422            warn!("Attempting to insert component into a non-existent entity in the server connection. This should not happen.");
1423        }
1424
1425        // if entity is delegated, convert over
1426        if self
1427            .global_world_manager
1428            .entity_is_delegated(&global_entity)
1429        {
1430            let accessor = self
1431                .global_world_manager
1432                .get_entity_auth_accessor(&global_entity);
1433            component.enable_delegation(&accessor, None)
1434        }
1435    }
1436
1437    /// Removes a Component from an Entity
1438    pub(crate) fn remove_component<R: ReplicatedComponent, W: WorldMutType<E>>(
1439        &mut self,
1440        world: &mut W,
1441        entity: &E,
1442    ) -> Option<R> {
1443        // get component key from type
1444        let component_kind = ComponentKind::of::<R>();
1445
1446        self.remove_component_worldless(entity, &component_kind);
1447
1448        // remove from world
1449        world.remove_component::<R>(entity)
1450    }
1451
1452    /// Registers a component removal with the replication layer without
1453    /// touching the world's component storage.
1454    ///
1455    /// # Adapter use only
1456    ///
1457    /// The Bevy adapter calls this when the component has already been removed
1458    /// from the ECS world.
1459    pub fn remove_component_worldless(&mut self, world_entity: &E, component_kind: &ComponentKind) {
1460        let global_entity = self
1461            .global_entity_map
1462            .entity_to_global_entity(world_entity)
1463            .unwrap();
1464
1465        // remove component from server connection
1466        if let Some(connection) = &mut self.server_connection {
1467            connection
1468                .base
1469                .world_manager
1470                .remove_component(&global_entity, component_kind);
1471        }
1472
1473        // cleanup all other loose ends
1474        self.global_world_manager
1475            .host_remove_component(&global_entity, component_kind);
1476    }
1477
1478    pub(crate) fn publish_entity(&mut self, global_entity: &GlobalEntity, client_is_origin: bool) {
1479        if client_is_origin {
1480            // Send PublishEntity action via EntityActionEvent system
1481            let Some(connection) = &mut self.server_connection else {
1482                return;
1483            };
1484            connection
1485                .base
1486                .world_manager
1487                .send_publish(HostType::Client, global_entity);
1488        } else if self
1489            .global_world_manager
1490            .entity_replication_config(global_entity)
1491            != Some(Publicity::Private)
1492        {
1493            panic!("Server can only publish Private entities");
1494        }
1495        self.global_world_manager.entity_publish(global_entity);
1496        // don't need to publish the Entity/Component via the World here, because Remote entities work the same whether they are published or not
1497    }
1498
1499    pub(crate) fn unpublish_entity(
1500        &mut self,
1501        global_entity: &GlobalEntity,
1502        client_is_origin: bool,
1503    ) {
1504        if client_is_origin {
1505            // Send UnpublishEntity action via EntityActionEvent system
1506            let Some(connection) = &mut self.server_connection else {
1507                return;
1508            };
1509            connection
1510                .base
1511                .world_manager
1512                .send_unpublish(HostType::Client, global_entity);
1513        } else if self
1514            .global_world_manager
1515            .entity_replication_config(global_entity)
1516            != Some(Publicity::Public)
1517        {
1518            panic!("Server can only unpublish Public entities");
1519        }
1520        self.global_world_manager.entity_unpublish(global_entity);
1521        // don't need to publish the Entity/Component via the World here, because Remote entities work the same whether they are published or not
1522    }
1523
1524    pub(crate) fn entity_enable_delegation<W: WorldMutType<E>>(
1525        &mut self,
1526        world: &mut W,
1527        global_entity: &GlobalEntity,
1528        world_entity: &E,
1529        client_is_origin: bool,
1530    ) {
1531        // this should happen BEFORE the world entity/component has been translated over to Delegated
1532        self.global_world_manager
1533            .entity_register_auth_for_delegation(global_entity);
1534
1535        if client_is_origin {
1536            // info!(
1537            //     "CLIENT: Sending EnableDelegation to server for {:?}",
1538            //     global_entity
1539            // );
1540
1541            // Send EnableDelegationEntity action via EntityActionEvent system
1542            let Some(connection) = &mut self.server_connection else {
1543                return;
1544            };
1545            connection.base.world_manager.send_enable_delegation(
1546                HostType::Client,
1547                true,
1548                global_entity,
1549            );
1550        } else {
1551            self.entity_complete_delegation(world, global_entity, world_entity);
1552            for component_kind in world.component_kinds(world_entity) {
1553                if !self
1554                    .global_world_manager
1555                    .entity_has_component(global_entity, &component_kind)
1556                {
1557                    self.global_world_manager
1558                        .remote_insert_component(global_entity, &component_kind);
1559                }
1560            }
1561            self.global_world_manager
1562                .entity_update_authority(global_entity, EntityAuthStatus::Available);
1563        }
1564    }
1565
1566    fn entity_complete_delegation<W: WorldMutType<E>>(
1567        &mut self,
1568        world: &mut W,
1569        global_entity: &GlobalEntity,
1570        world_entity: &E,
1571    ) {
1572        // info!("client.entity_complete_delegation({:?})", global_entity);
1573
1574        world.entity_enable_delegation(
1575            &self.protocol.component_kinds,
1576            &self.global_entity_map,
1577            &self.global_world_manager,
1578            world_entity,
1579        );
1580
1581        // this should happen AFTER the world entity/component has been translated over to Delegated
1582        self.global_world_manager
1583            .entity_enable_delegation(global_entity);
1584    }
1585
1586    pub(crate) fn entity_disable_delegation<W: WorldMutType<E>>(
1587        &mut self,
1588        world: &mut W,
1589        global_entity: &GlobalEntity,
1590        world_entity: &E,
1591        client_is_origin: bool,
1592    ) {
1593        info!("client.entity_disable_delegation");
1594        if client_is_origin {
1595            panic!("Cannot disable delegation from Client. Server owns all delegated Entities.");
1596        }
1597
1598        // Snapshot authority status BEFORE clearing delegation
1599        let had_granted = self
1600            .global_world_manager
1601            .entity_authority_status(global_entity)
1602            == Some(EntityAuthStatus::Granted);
1603
1604        // Clear delegation + authority semantics
1605        self.global_world_manager
1606            .entity_disable_delegation(global_entity);
1607        world.entity_disable_delegation(world_entity);
1608
1609        // Emit AuthLost (AuthReset) if client had Granted authority
1610        if had_granted {
1611            self.incoming_world_events.push_auth_reset(*world_entity);
1612        }
1613
1614        // Cleanup connection state (despawn from connection's world_manager, but NOT from client world)
1615        if let Some(connection) = &mut self.server_connection {
1616            connection.base.world_manager.despawn_entity(global_entity);
1617        }
1618
1619        // Note: We do NOT call despawn_entity_worldless here.
1620        // Disabling delegation clears authority semantics; entity remains alive in the client world.
1621        // The entity continues normal replication as undelegated.
1622    }
1623
1624    pub(crate) fn entity_update_authority(
1625        &mut self,
1626        global_entity: &GlobalEntity,
1627        world_entity: &E,
1628        new_auth_status: EntityAuthStatus,
1629    ) {
1630        let old_auth_status = self
1631            .global_world_manager
1632            .entity_authority_status(global_entity)
1633            .unwrap();
1634
1635        self.global_world_manager
1636            .entity_update_authority(global_entity, new_auth_status);
1637
1638        // Count when authority state is actually mutated
1639        #[cfg(feature = "e2e_debug")]
1640        if new_auth_status == EntityAuthStatus::Granted {
1641            use crate::counters::CLIENT_HANDLE_SET_AUTH;
1642            use std::sync::atomic::Ordering;
1643            CLIENT_HANDLE_SET_AUTH.fetch_add(1, Ordering::Relaxed);
1644        }
1645
1646        // Update RemoteEntityChannel's internal AuthChannel status (for migrated entities)
1647        // This ensures the channel's state machine stays in sync with the global tracker
1648        if let Some(connection) = &mut self.server_connection {
1649            // Check if entity exists as RemoteEntity
1650            let channel_status_before = connection
1651                .base
1652                .world_manager
1653                .get_remote_entity_auth_status(global_entity);
1654
1655            // Only sync if entity exists as RemoteEntity (i.e., migration completed)
1656            if channel_status_before.is_some() {
1657                connection
1658                    .base
1659                    .world_manager
1660                    .remote_receive_set_auth(global_entity, new_auth_status);
1661            } else {
1662                warn!(
1663                    "Entity {:?} not yet migrated to RemoteEntity - channel sync skipped",
1664                    global_entity
1665                );
1666            }
1667        } else {
1668            debug!("  No server connection - skipping channel sync");
1669        }
1670
1671        // info!(
1672        //     "<-- Received Entity Update Authority message! {:?} -> {:?}",
1673        //     old_auth_status, new_auth_status
1674        // );
1675
1676        // Updated Host Manager
1677        match (old_auth_status, new_auth_status) {
1678            // Grant authority (from any state)
1679            (EntityAuthStatus::Requested, EntityAuthStatus::Granted)
1680            | (EntityAuthStatus::Denied, EntityAuthStatus::Granted)
1681            | (EntityAuthStatus::Available, EntityAuthStatus::Granted) => {
1682                // Register and emit grant event
1683                self.server_connection
1684                    .as_mut()
1685                    .unwrap()
1686                    .base
1687                    .world_manager
1688                    .register_authed_entity(&self.global_world_manager, global_entity);
1689                self.incoming_world_events.push_auth_grant(*world_entity);
1690                #[cfg(feature = "e2e_debug")]
1691                {
1692                    use crate::counters::CLIENT_EMIT_AUTH_GRANTED_EVENT;
1693                    use std::sync::atomic::Ordering;
1694                    CLIENT_EMIT_AUTH_GRANTED_EVENT.fetch_add(1, Ordering::Relaxed);
1695                }
1696            }
1697            // Lose authority (must deregister and emit reset)
1698            (EntityAuthStatus::Granted, EntityAuthStatus::Available)
1699            | (EntityAuthStatus::Granted, EntityAuthStatus::Denied) => {
1700                // Deregister and emit reset event
1701                self.server_connection
1702                    .as_mut()
1703                    .unwrap()
1704                    .base
1705                    .world_manager
1706                    .deregister_authed_entity(&self.global_world_manager, global_entity);
1707                self.incoming_world_events.push_auth_reset(*world_entity);
1708            }
1709            // Request denied (only when Requested -> Denied)
1710            (EntityAuthStatus::Requested, EntityAuthStatus::Denied) => {
1711                // Emit denied event, but do NOT deregister (never had authority)
1712                self.incoming_world_events.push_auth_deny(*world_entity);
1713            }
1714            // Release flow
1715            (EntityAuthStatus::Releasing, EntityAuthStatus::Available) => {
1716                self.server_connection
1717                    .as_mut()
1718                    .unwrap()
1719                    .base
1720                    .world_manager
1721                    .deregister_authed_entity(&self.global_world_manager, global_entity);
1722                self.incoming_world_events.push_auth_reset(*world_entity);
1723            }
1724            (EntityAuthStatus::Releasing, EntityAuthStatus::Denied) => {
1725                // Server takeover during release
1726                self.server_connection
1727                    .as_mut()
1728                    .unwrap()
1729                    .base
1730                    .world_manager
1731                    .deregister_authed_entity(&self.global_world_manager, global_entity);
1732                self.incoming_world_events.push_auth_reset(*world_entity);
1733            }
1734            (EntityAuthStatus::Releasing, EntityAuthStatus::Granted) => {
1735                // Grant arrived during release - treat as Available
1736                self.global_world_manager
1737                    .entity_update_authority(global_entity, EntityAuthStatus::Available);
1738            }
1739            // Available → Denied. Fires when another client (or the server)
1740            // takes authority for an entity that this client had been free to
1741            // request. Per contract `entity-delegation-15`: every transition
1742            // into Denied emits exactly one AuthDenied event so the
1743            // application can react (e.g. close a request UI, mark the
1744            // entity read-only).
1745            (EntityAuthStatus::Available, EntityAuthStatus::Denied) => {
1746                self.incoming_world_events.push_auth_deny(*world_entity);
1747            }
1748            (EntityAuthStatus::Denied, EntityAuthStatus::Available) => {
1749                // Release by someone else - emit reset
1750                self.incoming_world_events.push_auth_reset(*world_entity);
1751            }
1752            (EntityAuthStatus::Available, EntityAuthStatus::Available)
1753            | (EntityAuthStatus::Denied, EntityAuthStatus::Denied)
1754            | (EntityAuthStatus::Granted, EntityAuthStatus::Granted)
1755            | (EntityAuthStatus::Requested, EntityAuthStatus::Requested)
1756            | (EntityAuthStatus::Releasing, EntityAuthStatus::Releasing) => {
1757                // Idempotent — same-state transitions are no-ops. The grant/take/release
1758                // side effects (register, deregister, push_auth_grant, push_auth_reset)
1759                // already fired on the original transition into this state; receiving a
1760                // duplicate "you are still in state X" message must not double-fire them.
1761                // Granted→Granted in particular happens on the publication migration path
1762                // where MigrateResponse sets Granted (client.rs:2167) and an explicit
1763                // EntityUpdateAuth(Granted) follows.
1764            }
1765            (_, _) => {
1766                panic!(
1767                    "-- Entity {:?} updated authority, not handled -- {:?} -> {:?}",
1768                    global_entity, old_auth_status, new_auth_status
1769                );
1770            }
1771        }
1772    }
1773
1774    // Private methods
1775
1776    fn check_client_authoritative_allowed(&self) {
1777        if !self.protocol.client_authoritative_entities {
1778            panic!("Cannot perform this operation: Client Authoritative Entities are not enabled! Enable them in the Protocol, with the `enable_client_authoritative_entities() method, and note that if you do enable them, to make sure you handle all Spawn/Insert/Update events in the Server, as this may be an attack vector.")
1779        }
1780    }
1781
1782    fn maintain_socket(&mut self) {
1783        // Tick bandwidth monitors to clear expired packets
1784        self.io.tick_bandwidth_monitors();
1785
1786        if self.server_connection.is_none() {
1787            self.maintain_handshake();
1788        }
1789        // Note: maintain_handshake may have just established the connection,
1790        // so we check again (not else) to immediately process any remaining
1791        // packets (e.g. entity replication data) that arrived in the same
1792        // transport batch as the final handshake response.
1793        if self.server_connection.is_some() {
1794            self.maintain_connection();
1795        }
1796    }
1797
1798    fn maintain_handshake(&mut self) {
1799        // No connection established yet
1800
1801        if !self.io.is_loaded() {
1802            return;
1803        }
1804
1805        if !self.io.is_authenticated() {
1806            match self.io.recv_auth() {
1807                IdentityReceiverResult::Success(id_token) => {
1808                    self.handshake_manager.set_identity_token(id_token);
1809                }
1810                IdentityReceiverResult::Waiting => {
1811                    return;
1812                }
1813                IdentityReceiverResult::ErrorResponseCode(code) => {
1814                    let old_socket_addr_result = self.io.server_addr();
1815
1816                    // reset connection
1817                    self.io = Io::new(
1818                        &self.client_config.connection.bandwidth_measure_duration,
1819                        &self.protocol.compression,
1820                    );
1821
1822                    if code == 401 {
1823                        // push out rejection
1824                        match old_socket_addr_result {
1825                            Ok(old_socket_addr) => {
1826                                self.incoming_world_events
1827                                    .push_rejection(&old_socket_addr, RejectReason::Auth);
1828                            }
1829                            Err(err) => {
1830                                self.incoming_world_events.push_error(err);
1831                            }
1832                        }
1833                    } else {
1834                        // push out error
1835                        self.incoming_world_events
1836                            .push_error(NaiaClientError::IdError(code));
1837                    }
1838
1839                    return;
1840                }
1841            }
1842        }
1843
1844        // receive from socket
1845        loop {
1846            match self.io.recv_reader() {
1847                Ok(Some(mut reader)) => {
1848                    match self.handshake_manager.recv(&mut reader) {
1849                        Some(HandshakeResult::Connected(time_manager)) => {
1850                            // new connect!
1851                            self.server_connection = Some(Connection::new(
1852                                &self.client_config.connection,
1853                                &self.protocol.channel_kinds,
1854                                *time_manager,
1855                                &self.global_world_manager,
1856                                self.client_config.jitter_buffer,
1857                                &self.protocol.component_kinds,
1858                            ));
1859                            self.on_connect();
1860
1861                            let server_addr = self.server_address_unwrapped();
1862                            self.incoming_world_events.push_connection(&server_addr);
1863
1864                            // Stop reading here — any remaining packets in
1865                            // the transport (e.g. Data packets with entity
1866                            // replication) must be processed through
1867                            // maintain_connection, not the handshake loop
1868                            // which silently discards non-handshake packets.
1869                            break;
1870                        }
1871                        Some(HandshakeResult::Rejected(reason)) => {
1872                            info!("Client: Received HandshakeResult::Rejected({:?})", reason);
1873                            let server_addr = self.server_address_unwrapped();
1874                            self.incoming_world_events
1875                                .push_rejection(&server_addr, reason);
1876                            self.disconnect_reset_connection();
1877                            break;
1878                        }
1879                        None => {}
1880                    }
1881                }
1882                Ok(None) => {
1883                    break;
1884                }
1885                Err(error) => {
1886                    self.incoming_world_events
1887                        .push_error(NaiaClientError::Wrapped(Box::new(error)));
1888                }
1889            }
1890        }
1891    }
1892
1893    fn maintain_connection(&mut self) {
1894        // connection already established
1895
1896        let Some(connection) = self.server_connection.as_mut() else {
1897            panic!("Should have checked for this above");
1898        };
1899
1900        Self::handle_heartbeats(connection, &mut self.io);
1901        Self::handle_pings(connection, &mut self.io);
1902        Self::handle_empty_acks(connection, &mut self.io);
1903
1904        let mut received_any = false;
1905
1906        // receive from socket
1907        loop {
1908            match self.io.recv_reader() {
1909                Ok(Some(mut reader)) => {
1910                    connection.mark_heard();
1911
1912                    let header = match StandardHeader::de(&mut reader) {
1913                        Ok(h) => h,
1914                        Err(_e) => {
1915                            continue;
1916                        }
1917                    };
1918                    match header.packet_type {
1919                        PacketType::Data => {
1920                            // Count world packets received from transport
1921                            #[cfg(feature = "e2e_debug")]
1922                            {
1923                                use crate::counters::CLIENT_WORLD_PKTS_RECV;
1924                                use std::sync::atomic::Ordering;
1925                                CLIENT_WORLD_PKTS_RECV.fetch_add(1, Ordering::Relaxed);
1926                            }
1927                            // continue
1928                        }
1929                        PacketType::Heartbeat | PacketType::Ping | PacketType::Pong => {
1930                            // these packet types are allowed when
1931                            // connection is established
1932                        }
1933                        PacketType::Handshake => {
1934                            // Server sent a handshake packet while connected -
1935                            // this should only be a Disconnect message
1936                            let Ok(handshake_header) = HandshakeHeader::de(&mut reader) else {
1937                                warn!("unable to parse handshake header from server");
1938                                continue;
1939                            };
1940                            if matches!(handshake_header, HandshakeHeader::Disconnect) {
1941                                info!("Received disconnect from server");
1942                                self.server_disconnect = true;
1943                            }
1944                            continue;
1945                        }
1946                    }
1947
1948                    // Read incoming header
1949                    received_any = true;
1950                    connection.process_incoming_header(&header);
1951
1952                    // read server tick
1953                    let Ok(server_tick) = Tick::de(&mut reader) else {
1954                        warn!("unable to parse server_tick from packet");
1955                        continue;
1956                    };
1957
1958                    // read time since last tick
1959                    let Ok(server_tick_instant) = GameInstant::de(&mut reader) else {
1960                        warn!("unable to parse server_tick_instant from packet");
1961                        continue;
1962                    };
1963
1964                    connection
1965                        .time_manager
1966                        .recv_tick_instant(&server_tick, &server_tick_instant);
1967
1968                    // Handle based on PacketType
1969                    match header.packet_type {
1970                        PacketType::Data => {
1971                            connection.base.mark_should_send_empty_ack();
1972
1973                            if connection
1974                                .buffer_data_packet(&server_tick, &mut reader)
1975                                .is_err()
1976                            {
1977                                warn!("unable to parse data packet");
1978                                continue;
1979                            }
1980                        }
1981                        PacketType::Heartbeat => {
1982                            // already marked as heard, job done
1983                        }
1984                        PacketType::Ping => {
1985                            let Ok(ping_index) = BaseTimeManager::read_ping(&mut reader) else {
1986                                panic!("unable to read ping index");
1987                            };
1988                            BaseTimeManager::send_pong(connection, &mut self.io, ping_index);
1989                        }
1990                        PacketType::Pong => {
1991                            if connection.time_manager.read_pong(&mut reader).is_err() {
1992                                // Malformed pong: skip this sample. RTT estimation
1993                                // recovers on the next successful pong exchange.
1994                                warn!("Client Error: Cannot process pong packet from Server");
1995                            }
1996                        }
1997                        _ => {
1998                            // no other packet types matter when connection
1999                            // is established
2000                        }
2001                    }
2002                }
2003                Ok(None) => {
2004                    break;
2005                }
2006                Err(error) => {
2007                    self.incoming_world_events
2008                        .push_error(NaiaClientError::Wrapped(Box::new(error)));
2009                }
2010            }
2011        }
2012
2013        if received_any {
2014            connection.process_received_commands();
2015        }
2016    }
2017
2018    fn handle_heartbeats(connection: &mut Connection, io: &mut Io) {
2019        // send heartbeats
2020        if connection.base.should_send_heartbeat() {
2021            Self::send_heartbeat_packet(connection, io);
2022        }
2023    }
2024
2025    fn handle_empty_acks(connection: &mut Connection, io: &mut Io) {
2026        // send empty acks
2027        if connection.base.should_send_empty_ack() {
2028            Self::send_heartbeat_packet(connection, io);
2029        }
2030    }
2031
2032    fn send_heartbeat_packet(connection: &mut Connection, io: &mut Io) {
2033        let mut writer = BitWriter::new();
2034
2035        // write header
2036        let _header = connection
2037            .base
2038            .write_header(PacketType::Heartbeat, &mut writer);
2039
2040        // send packet
2041        if io.send_packet(writer.to_packet()).is_err() {
2042            // Heartbeat send failure is not fatal: the server's connection
2043            // timeout will fire if heartbeats stop arriving persistently.
2044            warn!("Client Error: Cannot send heartbeat packet to Server");
2045        }
2046        connection.mark_sent();
2047    }
2048
2049    fn handle_pings(connection: &mut Connection, io: &mut Io) {
2050        // send pings
2051        if connection.time_manager.send_ping(io) {
2052            connection.mark_sent();
2053        }
2054    }
2055
2056    fn disconnect_with_events<W: WorldMutType<E>>(&mut self, world: &mut W, reason: naia_shared::DisconnectReason) {
2057        let server_addr = self.server_address_unwrapped();
2058
2059        self.incoming_world_events.clear();
2060        self.incoming_tick_events.clear();
2061
2062        self.despawn_all_remote_entities(world);
2063        self.disconnect_reset_connection();
2064
2065        self.incoming_world_events.push_disconnection(&server_addr, reason);
2066    }
2067
2068    fn despawn_all_remote_entities<W: WorldMutType<E>>(&mut self, world: &mut W) {
2069        // this is very similar to the newtype method .. can we coalesce and reduce
2070        // duplication?
2071
2072        let Some(connection) = self.server_connection.as_mut() else {
2073            panic!("Client is already disconnected!");
2074        };
2075
2076        let remote_entities = connection.base.world_manager.remote_entities();
2077        let entity_events = SharedGlobalWorldManager::despawn_all_entities(
2078            world,
2079            &self.global_entity_map,
2080            &self.global_world_manager,
2081            remote_entities,
2082        );
2083        self.process_entity_events(world, entity_events);
2084    }
2085
2086    fn disconnect_reset_connection(&mut self) {
2087        self.server_connection = None;
2088
2089        self.io = Io::new(
2090            &self.client_config.connection.bandwidth_measure_duration,
2091            &self.protocol.compression,
2092        );
2093
2094        self.handshake_manager = Box::new(HandshakeManager::new(
2095            self.protocol_id,
2096            self.client_config.send_handshake_interval,
2097            self.client_config.ping_interval,
2098            self.client_config.handshake_pings,
2099        ));
2100
2101        self.manual_disconnect = false;
2102        self.global_world_manager = GlobalWorldManager::new();
2103    }
2104
2105    fn server_address_unwrapped(&self) -> SocketAddr {
2106        // NOTE: may panic if the connection is not yet established!
2107        self.io.server_addr().expect("connection not established!")
2108    }
2109
2110    #[cfg(feature = "e2e_debug")]
2111    pub fn debug_remote_channel_diagnostic(
2112        &self,
2113        remote_entity: &naia_shared::RemoteEntity,
2114    ) -> Option<(
2115        naia_shared::EntityChannelState,
2116        (
2117            naia_shared::SubCommandId,
2118            usize,
2119            Option<naia_shared::SubCommandId>,
2120            usize,
2121        ),
2122    )> {
2123        let Some(connection) = self.server_connection.as_ref() else {
2124            return None;
2125        };
2126        connection
2127            .base
2128            .world_manager
2129            .debug_remote_channel_diagnostic(remote_entity)
2130    }
2131
2132    #[cfg(feature = "e2e_debug")]
2133    pub fn debug_remote_channel_snapshot(
2134        &self,
2135        remote_entity: &naia_shared::RemoteEntity,
2136    ) -> Option<(
2137        naia_shared::EntityChannelState,
2138        Option<naia_shared::MessageIndex>,
2139        usize,
2140        Option<(naia_shared::MessageIndex, naia_shared::EntityMessageType)>,
2141        Option<naia_shared::MessageIndex>,
2142    )> {
2143        let Some(connection) = self.server_connection.as_ref() else {
2144            return None;
2145        };
2146        connection
2147            .base
2148            .world_manager
2149            .debug_remote_channel_snapshot(remote_entity)
2150    }
2151
2152    fn process_entity_events<W: WorldMutType<E>>(
2153        &mut self,
2154        world: &mut W,
2155        entity_events: Vec<EntityEvent>,
2156    ) {
2157        for response_event in entity_events {
2158            // info!(
2159            //     "Client.process_entity_events(), handling response_event: {:?}",
2160            //     response_event.log()
2161            // );
2162            match response_event {
2163                EntityEvent::Spawn(global_entity) => {
2164                    let world_entity = self
2165                        .global_entity_map
2166                        .global_entity_to_entity(&global_entity)
2167                        .unwrap();
2168                    self.incoming_world_events.push_spawn(world_entity);
2169                    self.global_world_manager
2170                        .remote_spawn_entity(&global_entity);
2171                    let Some(connection) = self.server_connection.as_mut() else {
2172                        panic!("Client is disconnected!");
2173                    };
2174                    connection
2175                        .base
2176                        .world_manager
2177                        .remote_spawn_entity(&global_entity); // TODO: move to localworld?
2178                    #[cfg(feature = "e2e_debug")]
2179                    {
2180                        use crate::counters::CLIENT_SCOPE_APPLIED_ADD_E2;
2181                        use std::sync::atomic::Ordering;
2182                        CLIENT_SCOPE_APPLIED_ADD_E2.fetch_add(1, Ordering::Relaxed);
2183                    }
2184                }
2185                EntityEvent::Despawn(global_entity) => {
2186                    let world_entity = self
2187                        .global_entity_map
2188                        .global_entity_to_entity(&global_entity)
2189                        .unwrap();
2190                    // Resource registry maintenance: if this entity was
2191                    // a resource entity, clear the registry record so
2192                    // future has_resource::<R>() calls return false.
2193                    self.resource_registry.remove_by_entity(&global_entity);
2194                    self.incoming_world_events.push_despawn(world_entity);
2195                    if self
2196                        .global_world_manager
2197                        .entity_is_delegated(&global_entity)
2198                    {
2199                        if let Some(status) = self
2200                            .global_world_manager
2201                            .entity_authority_status(&global_entity)
2202                        {
2203                            if status != EntityAuthStatus::Available {
2204                                self.entity_update_authority(
2205                                    &global_entity,
2206                                    &world_entity,
2207                                    EntityAuthStatus::Available,
2208                                );
2209                            }
2210                        }
2211                    }
2212                    self.global_world_manager
2213                        .remove_entity_record(&global_entity);
2214                    self.global_entity_map.despawn_by_global(&global_entity);
2215                    #[cfg(feature = "e2e_debug")]
2216                    {
2217                        use crate::counters::CLIENT_SCOPE_APPLIED_REMOVE_E1;
2218                        use std::sync::atomic::Ordering;
2219                        CLIENT_SCOPE_APPLIED_REMOVE_E1.fetch_add(1, Ordering::Relaxed);
2220                    }
2221                }
2222                EntityEvent::InsertComponent(global_entity, component_kind) => {
2223                    let world_entity = self
2224                        .global_entity_map
2225                        .global_entity_to_entity(&global_entity)
2226                        .unwrap();
2227                    // Resource registry maintenance: if the inserted
2228                    // component is a Replicated Resource kind, record
2229                    // the (TypeId, GlobalEntity) mapping so the bevy
2230                    // adapter's mirror system + has_resource::<R>()
2231                    // lookups work O(1).
2232                    if self.protocol.resource_kinds.is_resource(&component_kind) {
2233                        let type_id: std::any::TypeId = component_kind.into();
2234                        let _ = self.resource_registry.insert_raw(type_id, global_entity);
2235                    }
2236                    self.incoming_world_events
2237                        .push_insert(world_entity, component_kind);
2238
2239                    if !self
2240                        .global_world_manager
2241                        .entity_has_component(&global_entity, &component_kind)
2242                    {
2243                        if self
2244                            .global_world_manager
2245                            .entity_is_delegated(&global_entity)
2246                        {
2247                            // let component_name = self
2248                            //     .protocol
2249                            //     .component_kinds
2250                            //     .kind_to_name(&component_kind);
2251                            // info!(
2252                            //     "Client.process_response_events(), handling InsertComponent for Component: {:?} into delegated Entity: {:?}",
2253                            //     component_name, global_entity
2254                            // );
2255                            world.component_publish(
2256                                &self.protocol.component_kinds,
2257                                &self.global_entity_map,
2258                                &self.global_world_manager,
2259                                &world_entity,
2260                                &component_kind,
2261                            );
2262                            world.component_enable_delegation(
2263                                &self.protocol.component_kinds,
2264                                &self.global_entity_map,
2265                                &self.global_world_manager,
2266                                &world_entity,
2267                                &component_kind,
2268                            );
2269                        }
2270
2271                        self.global_world_manager
2272                            .remote_insert_component(&global_entity, &component_kind);
2273                    }
2274                }
2275                EntityEvent::RemoveComponent(global_entity, component_box) => {
2276                    let component_kind = component_box.kind();
2277                    let world_entity = self
2278                        .global_entity_map
2279                        .global_entity_to_entity(&global_entity)
2280                        .unwrap();
2281                    self.incoming_world_events
2282                        .push_remove(world_entity, component_box);
2283                    if self
2284                        .global_world_manager
2285                        .entity_is_delegated(&global_entity)
2286                    {
2287                        self.remove_component_worldless(&world_entity, &component_kind);
2288                    } else {
2289                        self.global_world_manager
2290                            .remove_component_record(&global_entity, &component_kind);
2291                    }
2292                }
2293                EntityEvent::Publish(global_entity) => {
2294                    let world_entity = self
2295                        .global_entity_map
2296                        .global_entity_to_entity(&global_entity)
2297                        .unwrap();
2298                    self.publish_entity(&global_entity, false);
2299                    self.incoming_world_events.push_publish(world_entity);
2300                }
2301                EntityEvent::Unpublish(global_entity) => {
2302                    let world_entity = self
2303                        .global_entity_map
2304                        .global_entity_to_entity(&global_entity)
2305                        .unwrap();
2306                    self.unpublish_entity(&global_entity, false);
2307                    self.incoming_world_events.push_unpublish(world_entity);
2308                }
2309                EntityEvent::EnableDelegation(global_entity) => {
2310                    #[cfg(feature = "e2e_debug")]
2311                    naia_shared::e2e_trace!(
2312                        "[CLIENT_RECV] EnableDelegation entity={:?}",
2313                        global_entity
2314                    );
2315                    let world_entity = self
2316                        .global_entity_map
2317                        .global_entity_to_entity(&global_entity)
2318                        .unwrap();
2319
2320                    self.entity_enable_delegation(world, &global_entity, &world_entity, false);
2321
2322                    // Send EnableDelegationEntityResponse action via EntityActionEvent system
2323                    let Some(connection) = &mut self.server_connection else {
2324                        return;
2325                    };
2326                    connection
2327                        .base
2328                        .world_manager
2329                        .send_enable_delegation_response(&global_entity); // TODO: move to localworld?
2330                }
2331                EntityEvent::EnableDelegationResponse(_) => {
2332                    panic!("Client should never receive an EnableDelegationEntityResponse event");
2333                }
2334                EntityEvent::DisableDelegation(global_entity) => {
2335                    #[cfg(feature = "e2e_debug")]
2336                    {
2337                        let delegated_at_entry = self
2338                            .global_world_manager
2339                            .entity_is_delegated(&global_entity);
2340                        naia_shared::e2e_trace!(
2341                            "[CLIENT_RECV] DisableDelegation entity={:?} delegated_at_entry={}",
2342                            global_entity,
2343                            delegated_at_entry
2344                        );
2345                    }
2346                    let world_entity = self
2347                        .global_entity_map
2348                        .global_entity_to_entity(&global_entity)
2349                        .unwrap();
2350                    self.entity_disable_delegation(world, &global_entity, &world_entity, false);
2351                }
2352                EntityEvent::RequestAuthority(_global_entity) => {
2353                    panic!("Client should never receive an EntityRequestAuthority event");
2354                }
2355                EntityEvent::ReleaseAuthority(_global_entity) => {
2356                    panic!("Client should never receive an EntityReleaseAuthority event");
2357                }
2358                EntityEvent::SetAuthority(global_entity, new_auth_status) => {
2359                    // Count when SetAuthority successfully converts to EntityEvent (after mapping)
2360                    #[cfg(feature = "e2e_debug")]
2361                    if new_auth_status == EntityAuthStatus::Granted {
2362                        use crate::counters::{CLIENT_RX_SET_AUTH, CLIENT_TO_EVENT_SET_AUTH_OK};
2363                        use std::sync::atomic::Ordering;
2364                        CLIENT_RX_SET_AUTH.fetch_add(1, Ordering::Relaxed);
2365                        CLIENT_TO_EVENT_SET_AUTH_OK.fetch_add(1, Ordering::Relaxed);
2366                    }
2367                    let world_entity = self
2368                        .global_entity_map
2369                        .global_entity_to_entity(&global_entity)
2370                        .unwrap();
2371                    self.entity_update_authority(&global_entity, &world_entity, new_auth_status);
2372                }
2373                EntityEvent::MigrateResponse(global_entity, new_remote_entity) => {
2374                    // Validate we have a valid world entity
2375                    let world_entity = match self
2376                        .global_entity_map
2377                        .global_entity_to_entity(&global_entity)
2378                    {
2379                        Ok(entity) => entity,
2380                        Err(_) => {
2381                            warn!(
2382                                "Received MigrateResponse for unknown global entity: {:?}",
2383                                global_entity
2384                            );
2385                            return;
2386                        }
2387                    };
2388
2389                    // Scope the connection borrow to complete migration steps
2390                    {
2391                        let Some(connection) = &mut self.server_connection else {
2392                            warn!("Received MigrateResponse without active server connection");
2393                            return;
2394                        };
2395
2396                        let old_host_entity = match connection
2397                            .base
2398                            .world_manager
2399                            .entity_converter()
2400                            .global_entity_to_host_entity(&global_entity)
2401                        {
2402                            Ok(entity) => entity,
2403                            Err(_) => {
2404                                warn!(
2405                                    "Entity {:?} does not exist as HostEntity before migration",
2406                                    global_entity
2407                                );
2408                                return;
2409                            }
2410                        };
2411
2412                        // Extract and buffer outgoing commands to preserve pending operations
2413                        let buffered_commands = connection
2414                            .base
2415                            .world_manager
2416                            .extract_host_entity_commands(&global_entity);
2417
2418                        // Extract component state to preserve during migration
2419                        let component_kinds = connection
2420                            .base
2421                            .world_manager
2422                            .extract_host_component_kinds(&global_entity);
2423
2424                        // Remove old HostEntityChannel
2425                        connection
2426                            .base
2427                            .world_manager
2428                            .remove_host_entity(&global_entity);
2429
2430                        // Create new RemoteEntityChannel with preserved component state
2431                        connection.base.world_manager.insert_remote_entity(
2432                            &global_entity,
2433                            new_remote_entity,
2434                            component_kinds,
2435                        );
2436
2437                        // Install entity redirect for old references
2438                        let old_entity = OwnedLocalEntity::Host { id: old_host_entity.value(), is_static: false };
2439                        let new_entity = new_remote_entity.copy_to_owned();
2440                        connection
2441                            .base
2442                            .world_manager
2443                            .install_entity_redirect(old_entity, new_entity);
2444
2445                        // Update pending command packet references
2446                        connection
2447                            .base
2448                            .world_manager
2449                            .update_sent_command_entity_refs(
2450                                &global_entity,
2451                                old_entity,
2452                                new_entity,
2453                            );
2454
2455                        // Replay buffered commands
2456                        for command in buffered_commands {
2457                            if command.is_valid_for_remote_entity() {
2458                                connection
2459                                    .base
2460                                    .world_manager
2461                                    .replay_entity_command(&global_entity, command);
2462                            }
2463                        }
2464
2465                        // Update RemoteEntityChannel's internal AuthChannel status
2466                        // After migration, grant authority back to the creating client
2467                        connection
2468                            .base
2469                            .world_manager
2470                            .remote_receive_set_auth(&global_entity, EntityAuthStatus::Granted);
2471                    }
2472
2473                    // Register the entity with the client's auth handler
2474                    // before completing delegation. Without this, the
2475                    // `entity_complete_delegation` → `world.entity_enable_delegation`
2476                    // → `component_enable_delegation` chain panics in
2477                    // `host_auth_handler::get_accessor` because the
2478                    // owning-client (A's) MigrateResponse path skips the
2479                    // `entity_register_auth_for_delegation` call that the
2480                    // EnableDelegation event path uses for non-owners.
2481                    // Both paths must produce the same registered state
2482                    // before components are flipped to delegated mode.
2483                    //
2484                    // Idempotent guard: some flows (e.g. the publication
2485                    // migration path tested by [entity-publication-08])
2486                    // register the entity earlier via the EnableDelegation
2487                    // event before MigrateResponse arrives — calling
2488                    // `register_entity` again would panic.
2489                    if self
2490                        .global_world_manager
2491                        .entity_authority_status(&global_entity)
2492                        .is_none()
2493                    {
2494                        self.global_world_manager
2495                            .entity_register_auth_for_delegation(&global_entity);
2496                    }
2497
2498                    // Complete delegation in global world manager
2499                    self.entity_complete_delegation(world, &global_entity, &world_entity);
2500
2501                    // Update global authority status
2502                    self.global_world_manager
2503                        .entity_update_authority(&global_entity, EntityAuthStatus::Granted);
2504
2505                    // Emit AuthGrant event
2506                    self.incoming_world_events.push_auth_grant(world_entity);
2507                    #[cfg(feature = "e2e_debug")]
2508                    {
2509                        use crate::counters::CLIENT_EMIT_AUTH_GRANTED_EVENT;
2510                        use std::sync::atomic::Ordering;
2511                        CLIENT_EMIT_AUTH_GRANTED_EVENT.fetch_add(1, Ordering::Relaxed);
2512                    }
2513                }
2514                EntityEvent::UpdateComponent(tick, global_entity, component_kind) => {
2515                    let world_entity = self
2516                        .global_entity_map
2517                        .global_entity_to_entity(&global_entity)
2518                        .unwrap();
2519                    self.incoming_world_events
2520                        .push_update(tick, world_entity, component_kind);
2521                }
2522            }
2523        }
2524    }
2525}
2526
2527impl<E: Hash + Copy + Eq + Sync + Send> EntityAndGlobalEntityConverter<E> for Client<E> {
2528    fn global_entity_to_entity(
2529        &self,
2530        global_entity: &GlobalEntity,
2531    ) -> Result<E, EntityDoesNotExistError> {
2532        self.global_entity_map
2533            .global_entity_to_entity(global_entity)
2534    }
2535
2536    fn entity_to_global_entity(
2537        &self,
2538        world_entity: &E,
2539    ) -> Result<GlobalEntity, EntityDoesNotExistError> {
2540        self.global_entity_map.entity_to_global_entity(world_entity)
2541    }
2542}
2543
/// The lifecycle state of the client's connection to the server.
///
/// Retrieved via [`Client::connection_status`].
///
/// The type is `Copy` and implements `Debug`, so a status can be passed
/// around by value, logged with `{:?}`, and compared with `assert_eq!`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// No socket is open; [`connect`](Client::connect) has not been called.
    Disconnected,
    /// The socket is open and the handshake is in progress.
    Connecting,
    /// The handshake is complete and the connection is active.
    Connected,
    /// [`disconnect`](Client::disconnect) has been called; awaiting confirmation.
    Disconnecting,
}

impl ConnectionStatus {
    /// Returns `true` if the client is fully disconnected.
    pub fn is_disconnected(&self) -> bool {
        matches!(self, ConnectionStatus::Disconnected)
    }

    /// Returns `true` if the handshake is in progress.
    pub fn is_connecting(&self) -> bool {
        matches!(self, ConnectionStatus::Connecting)
    }

    /// Returns `true` if the connection is active.
    pub fn is_connected(&self) -> bool {
        matches!(self, ConnectionStatus::Connected)
    }

    /// Returns `true` if the client is tearing down an active connection.
    pub fn is_disconnecting(&self) -> bool {
        matches!(self, ConnectionStatus::Disconnecting)
    }
}
2580
2581cfg_if! {
2582    if #[cfg(feature = "interior_visibility")] {
2583
2584        use naia_shared::LocalEntity;
2585
2586        impl<E: Copy + Eq + Hash + Send + Sync> Client<E> {
2587            /// Returns all LocalEntity IDs for entities replicated to the server.
2588            ///
2589            /// Returns the set of LocalEntity IDs that currently exist for the server
2590            /// (i.e., all entities replicated to the server).
2591            /// The ordering doesn't matter.
2592            ///
2593            /// # Panics
2594            /// Panics if not connected to server
2595            pub fn local_entities(&self) -> Vec<LocalEntity> {
2596                let connection = self
2597                    .server_connection
2598                    .as_ref()
2599                    .expect("Server connection does not exist");
2600
2601                connection.base.world_manager.local_entities()
2602            }
2603
2604            /// Retrieves an EntityRef that exposes read-only operations for the Entity
2605            /// identified by the given LocalEntity for the server.
2606            ///
2607            /// Returns `None` if:
2608            /// - The server is not connected
2609            /// - The LocalEntity doesn't exist for the server
2610            /// - The entity does not exist in the world
2611            pub fn local_entity<W: WorldRefType<E>>(
2612                &self,
2613                world: W,
2614                local_entity: &LocalEntity,
2615            ) -> Option<EntityRef<'_, E, W>> {
2616                let world_entity = self.local_to_world_entity(local_entity)?;
2617                if !world.has_entity(&world_entity) {
2618                    return None;
2619                }
2620                Some(self.entity(world, &world_entity))
2621            }
2622
2623            /// Retrieves an EntityMut that exposes read and write operations for the Entity
2624            /// identified by the given LocalEntity for the server.
2625            ///
2626            /// Returns `None` if:
2627            /// - The server is not connected
2628            /// - The LocalEntity doesn't exist for the server
2629            /// - The entity does not exist in the world
2630            pub fn local_entity_mut<W: WorldMutType<E>>(
2631                &mut self,
2632                world: W,
2633                local_entity: &LocalEntity,
2634            ) -> Option<EntityMut<'_, E, W>> {
2635                let world_entity = self.local_to_world_entity(local_entity)?;
2636                if !world.has_entity(&world_entity) {
2637                    return None;
2638                }
2639                Some(self.entity_mut(world, &world_entity))
2640            }
2641
2642            fn local_to_world_entity(
2643                &self,
2644                local_entity: &LocalEntity
2645            ) -> Option<E> {
2646                let connection = self.server_connection.as_ref()?;
2647                let converter = connection.base.world_manager.entity_converter();
2648
2649                let owned_local_entity: OwnedLocalEntity = (*local_entity).into();
2650                let global_entity = converter.owned_entity_to_global_entity(&owned_local_entity).ok()?;
2651                let world_entity = self
2652                    .global_entity_map
2653                    .global_entity_to_entity(&global_entity)
2654                    .ok()?;
2655
2656                Some(world_entity)
2657            }
2658
2659            pub(crate) fn world_to_local_entity(
2660                &self,
2661                world_entity: &E,
2662            ) -> Option<LocalEntity> {
2663                let global_entity = self.global_entity_map.entity_to_global_entity(world_entity).ok()?;
2664
2665                let connection = self.server_connection.as_ref()?;
2666                let converter = connection.base.world_manager.entity_converter();
2667                let owned_entity = converter.global_entity_to_owned_entity(&global_entity).ok()?;
2668
2669                Some(LocalEntity::from(owned_entity))
2670            }
2671        }
2672    }
2673}