Skip to main content

aetheris_server/
tick.rs

1use std::collections::{BTreeMap, HashMap};
2use std::time::{Duration, Instant};
3
4use tokio::sync::broadcast;
5use tokio::time::{MissedTickBehavior, interval};
6use tracing::{Instrument, debug_span, error, info_span};
7
8use crate::auth::AuthServiceImpl;
9use aetheris_protocol::error::EncodeError;
10use aetheris_protocol::events::{FragmentedEvent, NetworkEvent};
11use aetheris_protocol::reassembler::Reassembler;
12use aetheris_protocol::traits::{Encoder, GameTransport, WorldState};
13
14/// Manages the fixed-timestep execution of the game loop.
15#[derive(Debug)]
16pub struct TickScheduler {
17    tick_rate: u64,
18    current_tick: u64,
19    auth_service: AuthServiceImpl,
20
21    /// Maps `ClientId` -> (Session JTI, all owned `NetworkId`s)
22    /// Index 0 (if present) is always the session ship.
23    authenticated_clients: HashMap<
24        aetheris_protocol::types::ClientId,
25        (String, Vec<aetheris_protocol::types::NetworkId>),
26    >,
27    reassembler: Reassembler,
28    next_message_id: u32,
29}
30
31impl TickScheduler {
32    /// Creates a new scheduler with the specified tick rate.
33    #[must_use]
34    pub fn new(tick_rate: u64, auth_service: AuthServiceImpl) -> Self {
35        Self {
36            tick_rate,
37            current_tick: 0,
38            auth_service,
39            authenticated_clients: HashMap::new(),
40            reassembler: Reassembler::new(),
41            next_message_id: 0,
42        }
43    }
44
45    /// Runs the infinite game loop until the shutdown token is cancelled.
46    pub async fn run(
47        &mut self,
48        mut transport: Box<dyn GameTransport>,
49        mut world: Box<dyn WorldState>,
50        encoder: Box<dyn Encoder>,
51        mut shutdown: broadcast::Receiver<()>,
52    ) {
53        #[allow(clippy::cast_precision_loss)]
54        let tick_duration = Duration::from_secs_f64(1.0 / self.tick_rate as f64);
55        let mut interval = interval(tick_duration);
56        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
57
58        // Pre-allocate buffer for Stage 5 to avoid per-tick allocations.
59        // Encoder's max_encoded_size is used as a safe upper bound.
60        let mut encode_buffer = vec![0u8; encoder.max_encoded_size()];
61
62        loop {
63            tokio::select! {
64                _ = interval.tick() => {
65                    let tick_num = self.current_tick;
66                    let start = Instant::now();
67                    self.tick_step(
68                        transport.as_mut(),
69                        world.as_mut(),
70                        encoder.as_ref(),
71                        &mut encode_buffer,
72                    )
73                    .instrument(info_span!("tick", tick = tick_num))
74                    .await;
75                    let elapsed = start.elapsed();
76
77                    metrics::histogram!("aetheris_tick_duration_seconds").record(elapsed.as_secs_f64());
78                }
79                _ = shutdown.recv() => {
80                    tracing::info!("Server shutting down gracefully");
81                    break;
82                }
83            }
84        }
85    }
86
87    /// Executes a single 5-stage tick pipeline.
88    #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
89    pub async fn tick_step(
90        &mut self,
91        transport: &mut dyn GameTransport,
92        world: &mut dyn WorldState,
93        encoder: &dyn Encoder,
94        encode_buffer: &mut [u8],
95    ) {
96        let tick = self.current_tick;
97        // Pre-Stage: Advance the world change tick before any inputs are applied.
98        // This ensures entities spawned in Stage 2 receive a tick strictly greater than
99        // `last_extraction_tick`, which is required for Bevy 0.15+'s `is_changed` check.
100        // Without this, newly spawned entities share the same tick as `last_extraction_tick`
101        // and are silently skipped by `extract_deltas`, causing them to never be replicated.
102        world.advance_tick();
103
104        // Stage 1: Poll
105        let t1 = Instant::now();
106        let events = match transport
107            .poll_events()
108            .instrument(debug_span!("stage1_poll"))
109            .await
110        {
111            Ok(e) => e,
112            Err(e) => {
113                error!(error = ?e, "Fatal transport error during poll; skipping tick");
114                return;
115            }
116        };
117        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "poll")
118            .record(t1.elapsed().as_secs_f64());
119
120        let inbound_count: u64 = events
121            .iter()
122            .filter(|e| {
123                matches!(
124                    e,
125                    NetworkEvent::UnreliableMessage { .. } | NetworkEvent::ReliableMessage { .. }
126                )
127            })
128            .count() as u64;
129        metrics::counter!("aetheris_packets_inbound_total").increment(inbound_count);
130
131        // Periodic Session Validation (every 60 ticks / ~1s)
132        if tick.is_multiple_of(60) {
133            let mut to_remove = Vec::new();
134            for (&client_id, (jti, _)) in &self.authenticated_clients {
135                if !self.auth_service.is_session_authorized(jti, Some(tick)) {
136                    tracing::warn!(?client_id, "Session invalidated during periodic check");
137                    to_remove.push(client_id);
138                }
139            }
140            for client_id in to_remove {
141                if let Some((_, network_ids)) = self.authenticated_clients.remove(&client_id) {
142                    for network_id in network_ids {
143                        let _ = world.despawn_networked(network_id);
144                    }
145                }
146                metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
147            }
148        }
149
150        // Stage 2: Apply
151        let t2 = Instant::now();
152        let mut pong_responses = None;
153        let mut clear_ack_targets: Vec<aetheris_protocol::types::ClientId> = Vec::new();
154        if !events.is_empty() {
155            let _span = debug_span!("stage2_apply", count = events.len()).entered();
156            let mut updates = Vec::with_capacity(events.len());
157            for event in events {
158                // Stage 2.1: Reassembly & Normalization
159                let (client_id, raw_data, is_message) = match event {
160                    NetworkEvent::Fragment {
161                        client_id,
162                        fragment,
163                    } => {
164                        if let Some(data) = self.reassembler.ingest(client_id, fragment) {
165                            (client_id, data, true)
166                        } else {
167                            continue;
168                        }
169                    }
170                    NetworkEvent::UnreliableMessage { data, client_id }
171                    | NetworkEvent::ReliableMessage { data, client_id } => {
172                        // Try to decode as a protocol fragment first
173                        if let Ok(NetworkEvent::Fragment { fragment, .. }) =
174                            encoder.decode_event(&data)
175                        {
176                            if let Some(reassembled) = self.reassembler.ingest(client_id, fragment)
177                            {
178                                (client_id, reassembled, true)
179                            } else {
180                                continue;
181                            }
182                        } else {
183                            (client_id, data, true)
184                        }
185                    }
186                    NetworkEvent::ClientConnected(id) => {
187                        metrics::gauge!("aetheris_connected_clients").increment(1.0);
188                        tracing::info!(client_id = ?id, "Client connected (awaiting auth)");
189                        (id, Vec::new(), false)
190                    }
191                    NetworkEvent::ClientDisconnected(id) | NetworkEvent::Disconnected(id) => {
192                        metrics::gauge!("aetheris_connected_clients").decrement(1.0);
193                        if let Some((_, network_ids)) = self.authenticated_clients.remove(&id) {
194                            for network_id in network_ids {
195                                let _ = world.despawn_networked(network_id);
196                            }
197                        }
198                        tracing::info!(client_id = ?id, "Client disconnected");
199                        (id, Vec::new(), false)
200                    }
201                    NetworkEvent::SessionClosed(id) => {
202                        metrics::counter!("aetheris_transport_events_total", "type" => "session_closed")
203                        .increment(1);
204                        tracing::warn!(client_id = ?id, "WebTransport session closed");
205                        if let Some((_, network_ids)) = self.authenticated_clients.remove(&id) {
206                            for network_id in network_ids {
207                                let _ = world.despawn_networked(network_id);
208                            }
209                        }
210                        (id, Vec::new(), false)
211                    }
212                    NetworkEvent::StreamReset(id) => {
213                        metrics::counter!("aetheris_transport_events_total", "type" => "stream_reset")
214                        .increment(1);
215                        tracing::error!(client_id = ?id, "WebTransport stream reset");
216                        if let Some((_, network_ids)) = self.authenticated_clients.remove(&id) {
217                            for network_id in network_ids {
218                                let _ = world.despawn_networked(network_id);
219                            }
220                        }
221                        (id, Vec::new(), false)
222                    }
223                    NetworkEvent::Ping { client_id, tick } => {
224                        if self.authenticated_clients.contains_key(&client_id) {
225                            pong_responses.get_or_insert_with(Vec::new).push((
226                                client_id,
227                                tick,
228                                Instant::now(),
229                            ));
230                            metrics::counter!("aetheris_protocol_pings_received_total")
231                                .increment(1);
232                        }
233                        (client_id, Vec::new(), false)
234                    }
235                    NetworkEvent::ClearWorld { client_id, .. }
236                    | NetworkEvent::StartSession { client_id }
237                    | NetworkEvent::RequestSystemManifest { client_id }
238                    | NetworkEvent::GameEvent { client_id, .. }
239                    | NetworkEvent::StressTest { client_id, .. }
240                    | NetworkEvent::Spawn { client_id, .. } => (client_id, Vec::new(), false),
241                    NetworkEvent::Pong { .. } | NetworkEvent::Auth { .. } => {
242                        (aetheris_protocol::types::ClientId(0), Vec::new(), false)
243                    }
244                };
245
246                if !is_message {
247                    continue;
248                }
249
250                // Stage 2.2: Auth & Protocol Decode
251                let jti = if let Some((jti, _)) = self.authenticated_clients.get(&client_id) {
252                    // Re-validate session on every message to refresh sliding window / catch revocation
253                    if !self.auth_service.is_session_authorized(jti, Some(tick)) {
254                        tracing::warn!(?client_id, "Session revoked; dropping client");
255                        if let Some((_, network_ids)) =
256                            self.authenticated_clients.remove(&client_id)
257                        {
258                            for network_id in network_ids {
259                                let _ = world.despawn_networked(network_id);
260                            }
261                        }
262                        metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
263                        continue;
264                    }
265                    jti
266                } else {
267                    // Client not authenticated yet; only accept Auth message
268                    match encoder.decode_event(&raw_data) {
269                        Ok(NetworkEvent::Auth { session_token }) => {
270                            tracing::info!(?client_id, "Auth message received");
271                            if let Some(jti) = self
272                                .auth_service
273                                .validate_and_get_jti(&session_token, Some(tick))
274                            {
275                                tracing::info!(?client_id, "Client authenticated successfully");
276
277                                self.authenticated_clients
278                                    .insert(client_id, (jti, Vec::new()));
279
280                                tracing::info!(
281                                    ?client_id,
282                                    "[Auth] Client authenticated — waiting for StartSession to spawn ship"
283                                );
284                                continue;
285                            }
286                            tracing::warn!(
287                                ?client_id,
288                                "Client failed authentication (token rejected)"
289                            );
290                        }
291                        Ok(other) => {
292                            tracing::warn!(
293                                ?client_id,
294                                variant = ?std::mem::discriminant(&other),
295                                bytes = raw_data.len(),
296                                "Unauthenticated client sent non-Auth event — discarding"
297                            );
298                            metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
299                        }
300                        Err(e) => {
301                            tracing::warn!(
302                                ?client_id,
303                                error = ?e,
304                                bytes = raw_data.len(),
305                                "Failed to decode message from unauthenticated client"
306                            );
307                            metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
308                        }
309                    }
310                    continue;
311                };
312
313                // Check if it's a protocol-level event first (Ping/Pong/etc)
314                if let Ok(protocol_event) = encoder.decode_event(&raw_data) {
315                    match protocol_event {
316                        NetworkEvent::Ping { tick: p_tick, .. } => {
317                            pong_responses.get_or_insert_with(Vec::new).push((
318                                client_id,
319                                p_tick,
320                                Instant::now(),
321                            ));
322                            metrics::counter!("aetheris_protocol_pings_received_total")
323                                .increment(1);
324                        }
325                        NetworkEvent::Auth { .. } => {
326                            tracing::debug!(?client_id, "Client re-authenticating (ignored)");
327                        }
328                        NetworkEvent::StressTest { count, rotate, .. } => {
329                            tracing::info!(
330                                ?client_id,
331                                count,
332                                rotate,
333                                "StressTest event received from authenticated client"
334                            );
335                            if can_run_playground_command(jti) {
336                                // M10105 — Safety cap to prevent server-side resource exhaustion.
337                                const MAX_STRESS: u16 = 1000;
338                                let capped_count = count.min(MAX_STRESS);
339                                if count > MAX_STRESS {
340                                    tracing::warn!(
341                                        ?client_id,
342                                        count,
343                                        capped_count,
344                                        "Stress test count capped at limit"
345                                    );
346                                }
347
348                                tracing::info!(
349                                    ?client_id,
350                                    count = capped_count,
351                                    rotate,
352                                    "Stress test command executed"
353                                );
354                                world.stress_test(capped_count, rotate);
355                            } else {
356                                tracing::warn!(?client_id, "Unauthorized StressTest attempt");
357                                metrics::counter!("aetheris_unprivileged_packets_total")
358                                    .increment(1);
359                            }
360                        }
361                        NetworkEvent::Spawn {
362                            entity_type,
363                            x,
364                            y,
365                            rot,
366                            ..
367                        } => {
368                            if can_run_playground_command(jti) {
369                                let network_id =
370                                    world.spawn_kind_for(entity_type, x, y, rot, client_id);
371                                if let Some((_, network_ids)) =
372                                    self.authenticated_clients.get_mut(&client_id)
373                                {
374                                    network_ids.push(network_id);
375                                }
376
377                                tracing::info!(
378                                    ?client_id,
379                                    entity_type,
380                                    new_entity_id = network_id.0,
381                                    "[Spawn] Playground entity spawned — tracked for cleanup on disconnect"
382                                );
383                            } else {
384                                tracing::warn!(?client_id, "Unauthorized Spawn attempt");
385                                metrics::counter!("aetheris_unprivileged_packets_total")
386                                    .increment(1);
387                            }
388                        }
389                        NetworkEvent::StartSession { .. } => {
390                            // Only allow one session ship per client.
391                            let already_has_ship = self
392                                .authenticated_clients
393                                .get(&client_id)
394                                .is_some_and(|(_, ids)| !ids.is_empty());
395
396                            if already_has_ship {
397                                tracing::warn!(
398                                    ?client_id,
399                                    "StartSession ignored — client already has a session ship"
400                                );
401                            } else {
402                                let network_id =
403                                    world.spawn_session_ship(1, 0.0, 0.0, 0.0, client_id);
404                                if let Some((_, network_ids)) =
405                                    self.authenticated_clients.get_mut(&client_id)
406                                {
407                                    network_ids.push(network_id); // index 0 = session ship
408                                }
409
410                                world.queue_reliable_event(
411                                    Some(client_id),
412                                    aetheris_protocol::events::GameEvent::Possession { network_id },
413                                );
414
415                                tracing::info!(
416                                    ?client_id,
417                                    network_id = network_id.0,
418                                    "[StartSession] Session ship spawned — Possession sent"
419                                );
420                            }
421                        }
422                        NetworkEvent::ClearWorld { .. } => {
423                            if can_run_playground_command(jti) {
424                                tracing::info!(?client_id, "ClearWorld command executed");
425                                world.clear_world();
426                                // Reset the client's entity-ID tracking so that a subsequent
427                                // StartSession can spawn a new session ship.  Without this,
428                                // the "already_has_ship" guard blocks the next StartSession
429                                // even though all entities were just despawned.
430                                if let Some((_, ids)) =
431                                    self.authenticated_clients.get_mut(&client_id)
432                                {
433                                    ids.clear();
434                                }
435                                // Queue a reliable ClearWorld ack to send after this block.
436                                // EnteredSpan is !Send so we cannot .await inside this scope.
437                                // The ack arrives at the client AFTER stale in-flight datagrams,
438                                // guaranteeing a full entity flush (eliminates partial-clear race).
439                                clear_ack_targets.push(client_id);
440                            } else {
441                                tracing::warn!(?client_id, "Unauthorized ClearWorld attempt");
442                                metrics::counter!("aetheris_unprivileged_packets_total")
443                                    .increment(1);
444                            }
445                        }
446                        NetworkEvent::RequestSystemManifest { .. } => {
447                            let jti = if let Some((jti, _)) =
448                                self.authenticated_clients.get(&client_id)
449                            {
450                                jti
451                            } else {
452                                ""
453                            };
454
455                            let manifest = self.get_filtered_manifest(jti);
456                            world.queue_reliable_event(
457                                Some(client_id),
458                                aetheris_protocol::events::GameEvent::SystemManifest { manifest },
459                            );
460                        }
461                        _ => {
462                            tracing::trace!(?protocol_event, "Protocol event");
463                        }
464                    }
465                } else {
466                    // If it's not a protocol event, try to decode it as a game update
467                    match encoder.decode(&raw_data) {
468                        Ok(update) => updates.push((client_id, update)),
469                        Err(e) => {
470                            metrics::counter!("aetheris_decode_errors_total").increment(1);
471                            error!(
472                                error = ?e,
473                                size = raw_data.len(),
474                                "Failed to decode update (not a protocol event)"
475                            );
476                        }
477                    }
478                }
479            }
480            world.apply_updates(&updates);
481            self.reassembler.prune();
482        }
483        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "apply")
484            .record(t2.elapsed().as_secs_f64());
485
486        // Send ClearWorld acks (reliable) for any ClearWorld commands processed this tick.
487        // Sent after the EnteredSpan is dropped, since EnteredSpan is !Send.
488        // The reliable delivery guarantees the client sees this AFTER any stale in-flight
489        // unreliable datagrams, closing the partial-clear race condition.
490        for target in clear_ack_targets {
491            let ack = NetworkEvent::ClearWorld { client_id: target };
492            #[allow(clippy::collapsible_if)]
493            if let Ok(data) = encoder.encode_event(&ack) {
494                if let Err(e) = transport.send_reliable(target, &data).await {
495                    tracing::warn!(client_id = ?target, error = ?e, "Failed to send ClearWorld ack");
496                }
497            }
498        }
499
500        // Send Pongs for all collected Pings.
501        // Use unreliable (datagram) so the reply travels the same path as the
502        // incoming Ping, and clients that only read datagrams can receive it.
503        if let Some(pongs) = pong_responses {
504            for (client_id, p_tick, received_at) in pongs {
505                let pong_event = NetworkEvent::Pong { tick: p_tick };
506                if let Ok(data) = encoder.encode_event(&pong_event) {
507                    // Measure server-side Pong dispatch time (encode + send).
508                    // This is NOT the full network RTT, but it captures the
509                    // server processing overhead between Ping receipt and Pong send.
510                    let dispatch_start = Instant::now();
511                    match transport.send_unreliable(client_id, &data).await {
512                        Ok(()) => {
513                            let dispatch_ms = dispatch_start.elapsed().as_secs_f64() * 1000.0;
514                            let server_hold_ms = received_at.elapsed().as_secs_f64() * 1000.0;
515                            metrics::histogram!("aetheris_server_pong_dispatch_ms")
516                                .record(dispatch_ms);
517                            metrics::histogram!("aetheris_server_ping_hold_ms")
518                                .record(server_hold_ms);
519                        }
520                        Err(e) => {
521                            error!(error = ?e, client_id = ?client_id, "Failed to send Pong");
522                        }
523                    }
524                }
525            }
526        }
527
528        // Stage 3: Simulate
529        let t3 = Instant::now();
530        {
531            let _span = debug_span!("stage3_simulate").entered();
532            // Simulation logic (physics, AI, game rules) happens here.
533            world.simulate();
534        }
535        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "simulate")
536            .record(t3.elapsed().as_secs_f64());
537
538        // Stage 4: Extract
539        let t4 = Instant::now();
540        let (deltas, reliable_events) = {
541            let _span = debug_span!("stage4_extract").entered();
542            (world.extract_deltas(), world.extract_reliable_events())
543        };
544        // Reset ECS change-detection *after* extraction so simulate()'s mutations are visible.
545        world.post_extract();
546        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "extract")
547            .record(t4.elapsed().as_secs_f64());
548
549        // Stage 5: Encode & Send
550        let t5 = Instant::now();
551
552        // Stage 5.1: Send Reliable Events
553        for (target, wire_event) in reliable_events {
554            // Broadcast reliably to all authenticated clients if target is None
555            let targets: Vec<_> = if let Some(id) = target {
556                vec![id]
557            } else {
558                self.authenticated_clients.keys().copied().collect()
559            };
560
561            for id in targets {
562                let network_event = wire_event.clone().into_network_event(id);
563                match encoder.encode_event(&network_event) {
564                    Ok(data) => {
565                        if let Err(e) = transport.send_reliable(id, &data).await {
566                            error!(error = ?e, client_id = ?id, "Failed to send reliable event");
567                        }
568                    }
569                    Err(e) => {
570                        error!(error = ?e, client_id = ?id, "Failed to encode reliable event");
571                    }
572                }
573            }
574        }
575
576        if !deltas.is_empty() {
577            let mut broadcast_count: u64 = 0;
578
579            let stage_span = debug_span!("stage5_send", count = deltas.len());
580            let _guard = stage_span.enter();
581
582            for delta in deltas {
583                let encode_result = encoder.encode(&delta, encode_buffer);
584                match encode_result {
585                    Ok(len) if len > aetheris_protocol::MAX_SAFE_PAYLOAD_SIZE => {
586                        let targets = Self::get_delta_targets(
587                            world,
588                            &self.authenticated_clients,
589                            delta.network_id,
590                        );
591                        match self
592                            .fragment_and_send(encode_buffer, len, &targets, encoder, transport)
593                            .await
594                        {
595                            Ok(count) => broadcast_count += count,
596                            Err(e) => error!(error = ?e, "Failed to fragment and broadcast delta"),
597                        }
598                    }
599                    Ok(len) => {
600                        let targets = Self::get_delta_targets(
601                            world,
602                            &self.authenticated_clients,
603                            delta.network_id,
604                        );
605                        if targets.is_empty() {
606                            if let Err(e) =
607                                transport.broadcast_unreliable(&encode_buffer[..len]).await
608                            {
609                                error!(error = ?e, "Failed to broadcast delta");
610                            } else {
611                                broadcast_count += 1;
612                            }
613                        } else {
614                            for target in targets {
615                                if let Err(e) = transport
616                                    .send_unreliable(target, &encode_buffer[..len])
617                                    .await
618                                {
619                                    error!(error = ?e, "Failed to send delta");
620                                } else {
621                                    broadcast_count += 1;
622                                }
623                            }
624                        }
625                    }
626                    Err(EncodeError::BufferOverflow {
627                        needed,
628                        available: _,
629                    }) => {
630                        let mut large_buffer = vec![0u8; needed];
631                        if let Ok(len) = encoder.encode(&delta, &mut large_buffer) {
632                            let targets = Self::get_delta_targets(
633                                world,
634                                &self.authenticated_clients,
635                                delta.network_id,
636                            );
637                            match self
638                                .fragment_and_send(&large_buffer, len, &targets, encoder, transport)
639                                .await
640                            {
641                                Ok(count) => broadcast_count += count,
642                                Err(e) => {
643                                    error!(error = ?e, "Failed to fragment and broadcast large delta");
644                                }
645                            }
646                        } else {
647                            error!("Failed to encode into large scratch buffer");
648                        }
649                    }
650                    Err(e) => {
651                        metrics::counter!("aetheris_encode_errors_total").increment(1);
652                        error!(
653                            network_id = ?delta.network_id,
654                            error = ?e,
655                            "Failed to encode delta"
656                        );
657                    }
658                }
659            }
660            metrics::counter!("aetheris_packets_outbound_total").increment(broadcast_count);
661            metrics::counter!("aetheris_packets_broadcast_total").increment(broadcast_count);
662        }
663        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "send")
664            .record(t5.elapsed().as_secs_f64());
665
666        // Stage 6: Finalize
667        self.current_tick += 1;
668    }
669
670    fn get_delta_targets(
671        world: &mut dyn WorldState,
672        clients: &HashMap<
673            aetheris_protocol::types::ClientId,
674            (String, Vec<aetheris_protocol::types::NetworkId>),
675        >,
676        entity_id: aetheris_protocol::types::NetworkId,
677    ) -> Vec<aetheris_protocol::types::ClientId> {
678        if let Some(room_id) = world.get_entity_room(entity_id) {
679            let mut targets = Vec::new();
680            for &client_id in clients.keys() {
681                if world.get_client_room(client_id) == Some(room_id) {
682                    targets.push(client_id);
683                }
684            }
685            targets
686        } else {
687            Vec::new() // Empty means broadcast
688        }
689    }
690
691    async fn fragment_and_send(
692        &mut self,
693        data: &[u8],
694        len: usize,
695        targets: &[aetheris_protocol::types::ClientId],
696        encoder: &dyn Encoder,
697        transport: &dyn GameTransport,
698    ) -> Result<u64, EncodeError> {
699        let message_id = self.next_message_id;
700        self.next_message_id = self.next_message_id.wrapping_add(1);
701
702        let chunk_size = aetheris_protocol::MAX_FRAGMENT_PAYLOAD_SIZE;
703        let chunks: Vec<_> = data[..len].chunks(chunk_size).collect();
704
705        let Ok(total_fragments) = u16::try_from(chunks.len()) else {
706            error!(
707                message_id,
708                chunks = chunks.len(),
709                "Too many fragments required for message; dropping payload"
710            );
711            return Err(EncodeError::Io(std::io::Error::new(
712                std::io::ErrorKind::InvalidData,
713                "Too many fragments",
714            )));
715        };
716
717        let mut sent_count = 0;
718        for (i, chunk) in chunks.into_iter().enumerate() {
719            let Ok(fragment_index) = u16::try_from(i) else {
720                error!(message_id, index = i, "Fragment index overflow; stopping");
721                break;
722            };
723
724            let fragment = FragmentedEvent {
725                message_id,
726                fragment_index,
727                total_fragments,
728                payload: chunk.to_vec(),
729            };
730            let fragment_event = NetworkEvent::Fragment {
731                client_id: aetheris_protocol::types::ClientId(0),
732                fragment,
733            };
734
735            match encoder.encode_event(&fragment_event) {
736                Ok(encoded_fragment) => {
737                    if targets.is_empty() {
738                        if let Err(e) = transport.broadcast_unreliable(&encoded_fragment).await {
739                            error!(error = ?e, "Failed to broadcast fragment");
740                        } else {
741                            sent_count += 1;
742                        }
743                    } else {
744                        for &target in targets {
745                            if let Err(e) =
746                                transport.send_unreliable(target, &encoded_fragment).await
747                            {
748                                error!(error = ?e, "Failed to send fragment");
749                            } else {
750                                sent_count += 1;
751                            }
752                        }
753                    }
754                }
755                Err(e) => {
756                    error!(error = ?e, "Failed to encode fragment event");
757                }
758            }
759        }
760
761        Ok(sent_count)
762    }
763
764    fn get_filtered_manifest(&self, jti: &str) -> BTreeMap<String, String> {
765        let mut manifest = BTreeMap::new();
766        manifest.insert(
767            "version_server".to_string(),
768            env!("CARGO_PKG_VERSION").to_string(),
769        );
770        manifest.insert(
771            "version_protocol".to_string(),
772            aetheris_protocol::VERSION.to_string(),
773        );
774
775        if can_run_playground_command(jti) {
776            manifest.insert("tick_rate".to_string(), self.tick_rate.to_string());
777            manifest.insert(
778                "clients_active".to_string(),
779                self.authenticated_clients.len().to_string(),
780            );
781        }
782        manifest
783    }
784}
785
786/// Validates if a session (identified by its JTI) is authorized to run destructive playground commands.
787///
788/// In Phase 1, this uses a simplified check against the 'admin' JTI used in development.
789/// In Phase 3, this will be tied to the account's permission level.
790fn can_run_playground_command(jti: &str) -> bool {
791    // Current dev credential in Aetheris Playground always generates jti="admin"
792    // Fail closed: AETHERIS_ENV must be explicitly set to "dev"; absence is not treated as dev.
793    jti == "admin" || std::env::var("AETHERIS_ENV").ok().as_deref() == Some("dev")
794}