Skip to main content

aetheris_server/
tick.rs

1use std::collections::{BTreeMap, HashMap};
2use std::time::{Duration, Instant};
3
4use tokio::sync::broadcast;
5use tokio::time::{MissedTickBehavior, interval};
6use tracing::{Instrument, debug_span, error, info_span};
7
8use crate::auth::AuthServiceImpl;
9use aetheris_protocol::error::EncodeError;
10use aetheris_protocol::events::{FragmentedEvent, NetworkEvent};
11use aetheris_protocol::reassembler::Reassembler;
12use aetheris_protocol::traits::{Encoder, GameTransport, WorldState};
13
14/// Manages the fixed-timestep execution of the game loop.
15#[derive(Debug)]
16pub struct TickScheduler {
17    tick_rate: u64,
18    current_tick: u64,
19    auth_service: AuthServiceImpl,
20
21    /// Maps `ClientId` -> (Session JTI, all owned `NetworkId`s)
22    /// Index 0 (if present) is always the session ship.
23    authenticated_clients: HashMap<
24        aetheris_protocol::types::ClientId,
25        (String, Vec<aetheris_protocol::types::NetworkId>),
26    >,
27    /// Tracks when each client was successfully authenticated.
28    /// Used to record `aetheris_session_start_latency_seconds` — the server-side
29    /// time from auth validation to Possession dispatch. See A-08 in
30    /// `performance/runs/20260422_101553/ACTIONS.md`.
31    auth_timestamps: HashMap<aetheris_protocol::types::ClientId, Instant>,
32    reassembler: Reassembler,
33    next_message_id: u32,
34}
35
36impl TickScheduler {
37    /// Creates a new scheduler with the specified tick rate.
38    #[must_use]
39    pub fn new(tick_rate: u64, auth_service: AuthServiceImpl) -> Self {
40        Self {
41            tick_rate,
42            current_tick: 0,
43            auth_service,
44            authenticated_clients: HashMap::new(),
45            auth_timestamps: HashMap::new(),
46            reassembler: Reassembler::new(),
47            next_message_id: 0,
48        }
49    }
50
51    /// Runs the infinite game loop until the shutdown token is cancelled.
52    pub async fn run(
53        &mut self,
54        mut transport: Box<dyn GameTransport>,
55        mut world: Box<dyn WorldState>,
56        encoder: Box<dyn Encoder>,
57        mut shutdown: broadcast::Receiver<()>,
58    ) {
59        #[allow(clippy::cast_precision_loss)]
60        let tick_duration = Duration::from_secs_f64(1.0 / self.tick_rate as f64);
61        let mut interval = interval(tick_duration);
62        // Use Delay so that a slow tick shifts the next deadline rather than
63        // firing immediately (Burst) or silently skipping it (Skip). This keeps
64        // the effective tick rate at the configured target instead of running at
65        // whatever rate the hardware allows. See A-07 in performance/runs/20260422_092931/.
66        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
67
68        // Pre-allocate buffer for Stage 5 to avoid per-tick allocations.
69        // Encoder's max_encoded_size is used as a safe upper bound.
70        let mut encode_buffer = vec![0u8; encoder.max_encoded_size()];
71
72        let mut last_tick_wall = Instant::now();
73
74        loop {
75            tokio::select! {
76                _ = interval.tick() => {
77                    let tick_num = self.current_tick;
78                    let start = Instant::now();
79
80                    // Wall-clock tick rate: measured from the previous tick start.
81                    let wall_elapsed = start.duration_since(last_tick_wall);
82                    if wall_elapsed.as_secs_f64() > 0.0 {
83                        metrics::gauge!("aetheris_actual_tick_rate_hz")
84                            .set(1.0 / wall_elapsed.as_secs_f64());
85                    }
86                    last_tick_wall = start;
87
88                    self.tick_step(
89                        transport.as_mut(),
90                        world.as_mut(),
91                        encoder.as_ref(),
92                        &mut encode_buffer,
93                    )
94                    .instrument(info_span!("tick", tick = tick_num))
95                    .await;
96                    let elapsed = start.elapsed();
97
98                    metrics::histogram!("aetheris_tick_duration_seconds").record(elapsed.as_secs_f64());
99                }
100                _ = shutdown.recv() => {
101                    tracing::info!("Server shutting down gracefully");
102                    break;
103                }
104            }
105        }
106    }
107
108    /// Executes a single 5-stage tick pipeline.
109    #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
110    pub async fn tick_step(
111        &mut self,
112        transport: &mut dyn GameTransport,
113        world: &mut dyn WorldState,
114        encoder: &dyn Encoder,
115        encode_buffer: &mut [u8],
116    ) {
117        let tick = self.current_tick;
118        // Pre-Stage: Advance the world change tick before any inputs are applied.
119        // This ensures entities spawned in Stage 2 receive a tick strictly greater than
120        // `last_extraction_tick`, which is required for Bevy 0.15+'s `is_changed` check.
121        // Without this, newly spawned entities share the same tick as `last_extraction_tick`
122        // and are silently skipped by `extract_deltas`, causing them to never be replicated.
123        world.advance_tick();
124
125        // Stage 1: Poll
126        let t1 = Instant::now();
127        let events = match transport
128            .poll_events()
129            .instrument(debug_span!("stage1_poll"))
130            .await
131        {
132            Ok(e) => e,
133            Err(e) => {
134                error!(error = ?e, "Fatal transport error during poll; skipping tick");
135                return;
136            }
137        };
138        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "poll")
139            .record(t1.elapsed().as_secs_f64());
140
141        let inbound_count: u64 = events
142            .iter()
143            .filter(|e| {
144                matches!(
145                    e,
146                    NetworkEvent::UnreliableMessage { .. } | NetworkEvent::ReliableMessage { .. }
147                )
148            })
149            .count() as u64;
150        metrics::counter!("aetheris_packets_inbound_total").increment(inbound_count);
151
152        // Periodic Session Validation (every 60 ticks / ~1s)
153        if tick.is_multiple_of(60) {
154            let mut to_remove = Vec::new();
155            for (&client_id, (jti, _)) in &self.authenticated_clients {
156                if !self.auth_service.is_session_authorized(jti, Some(tick)) {
157                    tracing::warn!(?client_id, "Session invalidated during periodic check");
158                    to_remove.push(client_id);
159                }
160            }
161            for client_id in to_remove {
162                if let Some((_, network_ids)) = self.authenticated_clients.remove(&client_id) {
163                    for network_id in network_ids {
164                        let _ = world.despawn_networked(network_id);
165                    }
166                }
167                self.auth_timestamps.remove(&client_id);
168                metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
169            }
170        }
171
172        // Stage 2: Apply
173        let t2 = Instant::now();
174        let mut pong_responses = None;
175        let mut clear_ack_targets: Vec<aetheris_protocol::types::ClientId> = Vec::new();
176        if !events.is_empty() {
177            let _span = debug_span!("stage2_apply", count = events.len()).entered();
178            let mut updates = Vec::with_capacity(events.len());
179            for event in events {
180                // Stage 2.1: Reassembly & Normalization
181                let (client_id, raw_data, is_message) = match event {
182                    NetworkEvent::Fragment {
183                        client_id,
184                        fragment,
185                    } => {
186                        if let Some(data) = self.reassembler.ingest(client_id, fragment) {
187                            (client_id, data, true)
188                        } else {
189                            continue;
190                        }
191                    }
192                    NetworkEvent::UnreliableMessage { data, client_id }
193                    | NetworkEvent::ReliableMessage { data, client_id } => {
194                        // Try to decode as a protocol fragment first
195                        if let Ok(NetworkEvent::Fragment { fragment, .. }) =
196                            encoder.decode_event(&data)
197                        {
198                            if let Some(reassembled) = self.reassembler.ingest(client_id, fragment)
199                            {
200                                (client_id, reassembled, true)
201                            } else {
202                                continue;
203                            }
204                        } else {
205                            (client_id, data, true)
206                        }
207                    }
208                    NetworkEvent::ClientConnected(id) => {
209                        metrics::gauge!("aetheris_connected_clients").increment(1.0);
210                        tracing::info!(client_id = ?id, "Client connected (awaiting auth)");
211                        (id, Vec::new(), false)
212                    }
213                    NetworkEvent::ClientDisconnected(id) | NetworkEvent::Disconnected(id) => {
214                        metrics::gauge!("aetheris_connected_clients").decrement(1.0);
215                        if let Some((_, network_ids)) = self.authenticated_clients.remove(&id) {
216                            for network_id in network_ids {
217                                let _ = world.despawn_networked(network_id);
218                            }
219                        }
220                        self.auth_timestamps.remove(&id);
221                        tracing::info!(client_id = ?id, "Client disconnected");
222                        (id, Vec::new(), false)
223                    }
224                    NetworkEvent::SessionClosed(id) => {
225                        metrics::counter!("aetheris_transport_events_total", "type" => "session_closed")
226                        .increment(1);
227                        tracing::warn!(client_id = ?id, "WebTransport session closed");
228                        if let Some((_, network_ids)) = self.authenticated_clients.remove(&id) {
229                            for network_id in network_ids {
230                                let _ = world.despawn_networked(network_id);
231                            }
232                        }
233                        self.auth_timestamps.remove(&id);
234                        (id, Vec::new(), false)
235                    }
236                    NetworkEvent::StreamReset(id) => {
237                        metrics::counter!("aetheris_transport_events_total", "type" => "stream_reset")
238                        .increment(1);
239                        tracing::error!(client_id = ?id, "WebTransport stream reset");
240                        if let Some((_, network_ids)) = self.authenticated_clients.remove(&id) {
241                            for network_id in network_ids {
242                                let _ = world.despawn_networked(network_id);
243                            }
244                        }
245                        self.auth_timestamps.remove(&id);
246                        (id, Vec::new(), false)
247                    }
248                    NetworkEvent::Ping { client_id, tick } => {
249                        if self.authenticated_clients.contains_key(&client_id) {
250                            pong_responses.get_or_insert_with(Vec::new).push((
251                                client_id,
252                                tick,
253                                Instant::now(),
254                            ));
255                            metrics::counter!("aetheris_protocol_pings_received_total")
256                                .increment(1);
257                        }
258                        (client_id, Vec::new(), false)
259                    }
260                    NetworkEvent::ClearWorld { client_id, .. }
261                    | NetworkEvent::StartSession { client_id }
262                    | NetworkEvent::RequestSystemManifest { client_id }
263                    | NetworkEvent::GameEvent { client_id, .. }
264                    | NetworkEvent::StressTest { client_id, .. }
265                    | NetworkEvent::Spawn { client_id, .. } => (client_id, Vec::new(), false),
266                    NetworkEvent::Pong { .. } | NetworkEvent::Auth { .. } => {
267                        (aetheris_protocol::types::ClientId(0), Vec::new(), false)
268                    }
269                };
270
271                if !is_message {
272                    continue;
273                }
274
275                // Stage 2.2: Auth & Protocol Decode
276                let jti = if let Some((jti, _)) = self.authenticated_clients.get(&client_id) {
277                    // Re-validate session on every message to refresh sliding window / catch revocation
278                    if !self.auth_service.is_session_authorized(jti, Some(tick)) {
279                        tracing::warn!(?client_id, "Session revoked; dropping client");
280                        if let Some((_, network_ids)) =
281                            self.authenticated_clients.remove(&client_id)
282                        {
283                            for network_id in network_ids {
284                                let _ = world.despawn_networked(network_id);
285                            }
286                        }
287                        self.auth_timestamps.remove(&client_id);
288                        metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
289                        continue;
290                    }
291                    jti
292                } else {
293                    // Client not authenticated yet; only accept Auth message
294                    match encoder.decode_event(&raw_data) {
295                        Ok(NetworkEvent::Auth { session_token }) => {
296                            tracing::info!(?client_id, "Auth message received");
297                            if let Some(jti) = self
298                                .auth_service
299                                .validate_and_get_jti(&session_token, Some(tick))
300                            {
301                                tracing::info!(?client_id, "Client authenticated successfully");
302
303                                self.authenticated_clients
304                                    .insert(client_id, (jti, Vec::new()));
305                                // Record when auth completed so we can measure server-side
306                                // possession latency (A-08 profiling metric).
307                                self.auth_timestamps.insert(client_id, Instant::now());
308
309                                tracing::info!(
310                                    ?client_id,
311                                    "[Auth] Client authenticated — waiting for StartSession to spawn ship"
312                                );
313                                continue;
314                            }
315                            tracing::warn!(
316                                ?client_id,
317                                "Client failed authentication (token rejected)"
318                            );
319                        }
320                        Ok(other) => {
321                            tracing::warn!(
322                                ?client_id,
323                                variant = ?std::mem::discriminant(&other),
324                                bytes = raw_data.len(),
325                                "Unauthenticated client sent non-Auth event — discarding"
326                            );
327                            metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
328                        }
329                        Err(e) => {
330                            tracing::warn!(
331                                ?client_id,
332                                error = ?e,
333                                bytes = raw_data.len(),
334                                "Failed to decode message from unauthenticated client"
335                            );
336                            metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
337                        }
338                    }
339                    continue;
340                };
341
342                // Check if it's a protocol-level event first (Ping/Pong/etc)
343                if let Ok(protocol_event) = encoder.decode_event(&raw_data) {
344                    match protocol_event {
345                        NetworkEvent::Ping { tick: p_tick, .. } => {
346                            pong_responses.get_or_insert_with(Vec::new).push((
347                                client_id,
348                                p_tick,
349                                Instant::now(),
350                            ));
351                            metrics::counter!("aetheris_protocol_pings_received_total")
352                                .increment(1);
353                        }
354                        NetworkEvent::Auth { .. } => {
355                            tracing::debug!(?client_id, "Client re-authenticating (ignored)");
356                        }
357                        NetworkEvent::StressTest { count, rotate, .. } => {
358                            tracing::info!(
359                                ?client_id,
360                                count,
361                                rotate,
362                                "StressTest event received from authenticated client"
363                            );
364                            if can_run_playground_command(jti) {
365                                // M10105 — Safety cap to prevent server-side resource exhaustion.
366                                const MAX_STRESS: u16 = 1000;
367                                let capped_count = count.min(MAX_STRESS);
368                                if count > MAX_STRESS {
369                                    tracing::warn!(
370                                        ?client_id,
371                                        count,
372                                        capped_count,
373                                        "Stress test count capped at limit"
374                                    );
375                                }
376
377                                tracing::info!(
378                                    ?client_id,
379                                    count = capped_count,
380                                    rotate,
381                                    "Stress test command executed"
382                                );
383                                world.stress_test(capped_count, rotate);
384                            } else {
385                                tracing::warn!(?client_id, "Unauthorized StressTest attempt");
386                                metrics::counter!("aetheris_unprivileged_packets_total")
387                                    .increment(1);
388                            }
389                        }
390                        NetworkEvent::Spawn {
391                            entity_type,
392                            x,
393                            y,
394                            rot,
395                            ..
396                        } => {
397                            if can_run_playground_command(jti) {
398                                let network_id =
399                                    world.spawn_kind_for(entity_type, x, y, rot, client_id);
400                                if let Some((_, network_ids)) =
401                                    self.authenticated_clients.get_mut(&client_id)
402                                {
403                                    network_ids.push(network_id);
404                                }
405
406                                tracing::info!(
407                                    ?client_id,
408                                    entity_type,
409                                    new_entity_id = network_id.0,
410                                    "[Spawn] Playground entity spawned — tracked for cleanup on disconnect"
411                                );
412                            } else {
413                                tracing::warn!(?client_id, "Unauthorized Spawn attempt");
414                                metrics::counter!("aetheris_unprivileged_packets_total")
415                                    .increment(1);
416                            }
417                        }
418                        NetworkEvent::StartSession { .. } => {
419                            // Only allow one session ship per client.
420                            let already_has_ship = self
421                                .authenticated_clients
422                                .get(&client_id)
423                                .is_some_and(|(_, ids)| !ids.is_empty());
424
425                            if already_has_ship {
426                                tracing::warn!(
427                                    ?client_id,
428                                    "StartSession ignored — client already has a session ship"
429                                );
430                            } else {
431                                let network_id =
432                                    world.spawn_session_ship(1, 0.0, 0.0, 0.0, client_id);
433                                if let Some((_, network_ids)) =
434                                    self.authenticated_clients.get_mut(&client_id)
435                                {
436                                    network_ids.push(network_id); // index 0 = session ship
437                                }
438
439                                world.queue_reliable_event(
440                                    Some(client_id),
441                                    aetheris_protocol::events::GameEvent::Possession { network_id },
442                                );
443
444                                // Record server-side auth→possession latency (A-08 profiling).
445                                // This measures only the server cost (spawn + event queue) after
446                                // auth validation — not the client-observed round-trip time.
447                                // If this histogram shows values near zero the 6 ms stretch miss
448                                // in Time-to-Possess P99 is attributable to protocol handshake
449                                // and tick-scheduling jitter, not server processing.
450                                if let Some(auth_ts) = self.auth_timestamps.remove(&client_id) {
451                                    metrics::histogram!("aetheris_session_start_latency_seconds")
452                                        .record(auth_ts.elapsed().as_secs_f64());
453                                }
454
455                                tracing::info!(
456                                    ?client_id,
457                                    network_id = network_id.0,
458                                    "[StartSession] Session ship spawned — Possession sent"
459                                );
460                            }
461                        }
462                        NetworkEvent::ClearWorld { .. } => {
463                            if can_run_playground_command(jti) {
464                                tracing::info!(?client_id, "ClearWorld command executed");
465                                world.clear_world();
466                                // Reset the client's entity-ID tracking so that a subsequent
467                                // StartSession can spawn a new session ship.  Without this,
468                                // the "already_has_ship" guard blocks the next StartSession
469                                // even though all entities were just despawned.
470                                if let Some((_, ids)) =
471                                    self.authenticated_clients.get_mut(&client_id)
472                                {
473                                    ids.clear();
474                                }
475                                // Queue a reliable ClearWorld ack to send after this block.
476                                // EnteredSpan is !Send so we cannot .await inside this scope.
477                                // The ack arrives at the client AFTER stale in-flight datagrams,
478                                // guaranteeing a full entity flush (eliminates partial-clear race).
479                                clear_ack_targets.push(client_id);
480                            } else {
481                                tracing::warn!(?client_id, "Unauthorized ClearWorld attempt");
482                                metrics::counter!("aetheris_unprivileged_packets_total")
483                                    .increment(1);
484                            }
485                        }
486                        NetworkEvent::RequestSystemManifest { .. } => {
487                            let jti = if let Some((jti, _)) =
488                                self.authenticated_clients.get(&client_id)
489                            {
490                                jti
491                            } else {
492                                ""
493                            };
494
495                            let manifest = self.get_filtered_manifest(jti);
496                            world.queue_reliable_event(
497                                Some(client_id),
498                                aetheris_protocol::events::GameEvent::SystemManifest { manifest },
499                            );
500                        }
501                        _ => {
502                            tracing::trace!(?protocol_event, "Protocol event");
503                        }
504                    }
505                } else {
506                    // If it's not a protocol event, try to decode it as a game update
507                    match encoder.decode(&raw_data) {
508                        Ok(update) => updates.push((client_id, update)),
509                        Err(e) => {
510                            metrics::counter!("aetheris_decode_errors_total").increment(1);
511                            error!(
512                                error = ?e,
513                                size = raw_data.len(),
514                                "Failed to decode update (not a protocol event)"
515                            );
516                        }
517                    }
518                }
519            }
520            world.apply_updates(&updates);
521            self.reassembler.prune();
522        }
523        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "apply")
524            .record(t2.elapsed().as_secs_f64());
525
526        // Send ClearWorld acks (reliable) for any ClearWorld commands processed this tick.
527        // Sent after the EnteredSpan is dropped, since EnteredSpan is !Send.
528        // The reliable delivery guarantees the client sees this AFTER any stale in-flight
529        // unreliable datagrams, closing the partial-clear race condition.
530        for target in clear_ack_targets {
531            let ack = NetworkEvent::ClearWorld { client_id: target };
532            #[allow(clippy::collapsible_if)]
533            if let Ok(data) = encoder.encode_event(&ack) {
534                if let Err(e) = transport.send_reliable(target, &data).await {
535                    tracing::warn!(client_id = ?target, error = ?e, "Failed to send ClearWorld ack");
536                }
537            }
538        }
539
540        // Send Pongs for all collected Pings.
541        // Use unreliable (datagram) so the reply travels the same path as the
542        // incoming Ping, and clients that only read datagrams can receive it.
543        if let Some(pongs) = pong_responses {
544            for (client_id, p_tick, received_at) in pongs {
545                let pong_event = NetworkEvent::Pong { tick: p_tick };
546                if let Ok(data) = encoder.encode_event(&pong_event) {
547                    // Measure server-side Pong dispatch time (encode + send).
548                    // This is NOT the full network RTT, but it captures the
549                    // server processing overhead between Ping receipt and Pong send.
550                    let dispatch_start = Instant::now();
551                    match transport.send_unreliable(client_id, &data).await {
552                        Ok(()) => {
553                            let dispatch_ms = dispatch_start.elapsed().as_secs_f64() * 1000.0;
554                            let server_hold_ms = received_at.elapsed().as_secs_f64() * 1000.0;
555                            metrics::histogram!("aetheris_server_pong_dispatch_ms")
556                                .record(dispatch_ms);
557                            metrics::histogram!("aetheris_server_ping_hold_ms")
558                                .record(server_hold_ms);
559                        }
560                        Err(e) => {
561                            error!(error = ?e, client_id = ?client_id, "Failed to send Pong");
562                        }
563                    }
564                }
565            }
566        }
567
568        // Stage 3: Simulate
569        let t3 = Instant::now();
570        {
571            let _span = debug_span!("stage3_simulate").entered();
572            // Simulation logic (physics, AI, game rules) happens here.
573            world.simulate();
574        }
575        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "simulate")
576            .record(t3.elapsed().as_secs_f64());
577
578        // Stage 4: Extract
579        let t4 = Instant::now();
580        let (deltas, reliable_events) = {
581            let _span = debug_span!("stage4_extract").entered();
582            (world.extract_deltas(), world.extract_reliable_events())
583        };
584        // Reset ECS change-detection *after* extraction so simulate()'s mutations are visible.
585        world.post_extract();
586        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "extract")
587            .record(t4.elapsed().as_secs_f64());
588
589        // Stage 5: Encode & Send
590        let t5 = Instant::now();
591
592        // Stage 5.1: Send Reliable Events
593        for (target, wire_event) in reliable_events {
594            // Broadcast reliably to all authenticated clients if target is None
595            let targets: Vec<_> = if let Some(id) = target {
596                vec![id]
597            } else {
598                self.authenticated_clients.keys().copied().collect()
599            };
600
601            for id in targets {
602                let network_event = wire_event.clone().into_network_event(id);
603                match encoder.encode_event(&network_event) {
604                    Ok(data) => {
605                        if let Err(e) = transport.send_reliable(id, &data).await {
606                            error!(error = ?e, client_id = ?id, "Failed to send reliable event");
607                        }
608                    }
609                    Err(e) => {
610                        error!(error = ?e, client_id = ?id, "Failed to encode reliable event");
611                    }
612                }
613            }
614        }
615
616        if !deltas.is_empty() {
617            let mut broadcast_count: u64 = 0;
618
619            let stage_span = debug_span!("stage5_send", count = deltas.len());
620            let _guard = stage_span.enter();
621
622            for delta in deltas {
623                let encode_result = encoder.encode(&delta, encode_buffer);
624                match encode_result {
625                    Ok(len) if len > aetheris_protocol::MAX_SAFE_PAYLOAD_SIZE => {
626                        let targets = Self::get_delta_targets(
627                            world,
628                            &self.authenticated_clients,
629                            delta.network_id,
630                        );
631                        match self
632                            .fragment_and_send(encode_buffer, len, &targets, encoder, transport)
633                            .await
634                        {
635                            Ok(count) => broadcast_count += count,
636                            Err(e) => error!(error = ?e, "Failed to fragment and broadcast delta"),
637                        }
638                    }
639                    Ok(len) => {
640                        let targets = Self::get_delta_targets(
641                            world,
642                            &self.authenticated_clients,
643                            delta.network_id,
644                        );
645                        if targets.is_empty() {
646                            for &client_id in self.authenticated_clients.keys() {
647                                if let Err(e) = transport
648                                    .send_unreliable(client_id, &encode_buffer[..len])
649                                    .await
650                                {
651                                    error!(error = ?e, client = ?client_id, "Failed to send delta");
652                                } else {
653                                    broadcast_count += 1;
654                                }
655                            }
656                        } else if targets.len() == self.authenticated_clients.len() {
657                            // A-05: Phase 1 single-room broadcast short-circuit.
658                            //
659                            // SEMANTICS NOTE — broadcast_count:
660                            //   We iterate authenticated_clients and send individually to ensure
661                            //   unauthenticated connections do not receive data (broadcast_unreliable
662                            //   would send to everyone). broadcast_count is incremented once
663                            //   per successful per-client send (so +N for N clients).
664                            //   This ensures metric semantics are preserved.
665                            //
666                            //   The metric counts *dispatch calls* in this branch, which now
667                            //   equals the number of recipients.
668                            //
669                            //   ⚠ MUST REVERT before AoI / multi-room lands (Phase 2+).
670                            //   When AoI introduces per-room filtering, `targets` will be a
671                            //   strict subset of `authenticated_clients`. This branch should be
672                            //   removed or updated to handle true AoI-based multicasting.
673                            for &client_id in self.authenticated_clients.keys() {
674                                if let Err(e) = transport
675                                    .send_unreliable(client_id, &encode_buffer[..len])
676                                    .await
677                                {
678                                    error!(error = ?e, client = ?client_id, "Failed to send delta");
679                                } else {
680                                    broadcast_count += 1;
681                                }
682                            }
683                        } else {
684                            for target in targets {
685                                if let Err(e) = transport
686                                    .send_unreliable(target, &encode_buffer[..len])
687                                    .await
688                                {
689                                    error!(error = ?e, "Failed to send delta");
690                                } else {
691                                    broadcast_count += 1;
692                                }
693                            }
694                        }
695                    }
696                    Err(EncodeError::BufferOverflow {
697                        needed,
698                        available: _,
699                    }) => {
700                        let mut large_buffer = vec![0u8; needed];
701                        if let Ok(len) = encoder.encode(&delta, &mut large_buffer) {
702                            let targets = Self::get_delta_targets(
703                                world,
704                                &self.authenticated_clients,
705                                delta.network_id,
706                            );
707                            match self
708                                .fragment_and_send(&large_buffer, len, &targets, encoder, transport)
709                                .await
710                            {
711                                Ok(count) => broadcast_count += count,
712                                Err(e) => {
713                                    error!(error = ?e, "Failed to fragment and broadcast large delta");
714                                }
715                            }
716                        } else {
717                            error!("Failed to encode into large scratch buffer");
718                        }
719                    }
720                    Err(e) => {
721                        metrics::counter!("aetheris_encode_errors_total").increment(1);
722                        error!(
723                            network_id = ?delta.network_id,
724                            error = ?e,
725                            "Failed to encode delta"
726                        );
727                    }
728                }
729            }
730            metrics::counter!("aetheris_packets_outbound_total").increment(broadcast_count);
731            metrics::counter!("aetheris_packets_broadcast_total").increment(broadcast_count);
732        }
733        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "send")
734            .record(t5.elapsed().as_secs_f64());
735
736        // Stage 6: Finalize
737        self.current_tick += 1;
738    }
739
740    fn get_delta_targets(
741        world: &mut dyn WorldState,
742        clients: &HashMap<
743            aetheris_protocol::types::ClientId,
744            (String, Vec<aetheris_protocol::types::NetworkId>),
745        >,
746        entity_id: aetheris_protocol::types::NetworkId,
747    ) -> Vec<aetheris_protocol::types::ClientId> {
748        if let Some(room_id) = world.get_entity_room(entity_id) {
749            let mut targets = Vec::new();
750            for &client_id in clients.keys() {
751                if world.get_client_room(client_id) == Some(room_id) {
752                    targets.push(client_id);
753                }
754            }
755            targets
756        } else {
757            Vec::new() // Empty means broadcast
758        }
759    }
760
761    async fn fragment_and_send(
762        &mut self,
763        data: &[u8],
764        len: usize,
765        targets: &[aetheris_protocol::types::ClientId],
766        encoder: &dyn Encoder,
767        transport: &dyn GameTransport,
768    ) -> Result<u64, EncodeError> {
769        let message_id = self.next_message_id;
770        self.next_message_id = self.next_message_id.wrapping_add(1);
771
772        let chunk_size = aetheris_protocol::MAX_FRAGMENT_PAYLOAD_SIZE;
773        let chunks: Vec<_> = data[..len].chunks(chunk_size).collect();
774
775        let Ok(total_fragments) = u16::try_from(chunks.len()) else {
776            error!(
777                message_id,
778                chunks = chunks.len(),
779                "Too many fragments required for message; dropping payload"
780            );
781            return Err(EncodeError::Io(std::io::Error::new(
782                std::io::ErrorKind::InvalidData,
783                "Too many fragments",
784            )));
785        };
786
787        let mut sent_count = 0;
788        for (i, chunk) in chunks.into_iter().enumerate() {
789            let Ok(fragment_index) = u16::try_from(i) else {
790                error!(message_id, index = i, "Fragment index overflow; stopping");
791                break;
792            };
793
794            let fragment = FragmentedEvent {
795                message_id,
796                fragment_index,
797                total_fragments,
798                payload: chunk.to_vec(),
799            };
800            let fragment_event = NetworkEvent::Fragment {
801                client_id: aetheris_protocol::types::ClientId(0),
802                fragment,
803            };
804
805            match encoder.encode_event(&fragment_event) {
806                Ok(encoded_fragment) => {
807                    if targets.is_empty() {
808                        if let Err(e) = transport.broadcast_unreliable(&encoded_fragment).await {
809                            error!(error = ?e, "Failed to broadcast fragment");
810                        } else {
811                            sent_count += 1;
812                        }
813                    } else {
814                        for &target in targets {
815                            if let Err(e) =
816                                transport.send_unreliable(target, &encoded_fragment).await
817                            {
818                                error!(error = ?e, "Failed to send fragment");
819                            } else {
820                                sent_count += 1;
821                            }
822                        }
823                    }
824                }
825                Err(e) => {
826                    error!(error = ?e, "Failed to encode fragment event");
827                }
828            }
829        }
830
831        Ok(sent_count)
832    }
833
834    fn get_filtered_manifest(&self, jti: &str) -> BTreeMap<String, String> {
835        let mut manifest = BTreeMap::new();
836        manifest.insert(
837            "version_server".to_string(),
838            env!("CARGO_PKG_VERSION").to_string(),
839        );
840        manifest.insert(
841            "version_protocol".to_string(),
842            aetheris_protocol::VERSION.to_string(),
843        );
844
845        if can_run_playground_command(jti) {
846            manifest.insert("tick_rate".to_string(), self.tick_rate.to_string());
847            manifest.insert(
848                "clients_active".to_string(),
849                self.authenticated_clients.len().to_string(),
850            );
851        }
852        manifest
853    }
854}
855
/// Reports whether the session identified by `jti` may run destructive playground commands.
///
/// Phase 1 rule: the session is allowed if its JTI is the development `"admin"`
/// credential, or if the process runs with `AETHERIS_ENV` set exactly to `"dev"`.
/// In Phase 3 this is meant to be tied to the account's permission level.
///
/// NOTE(review): because the two conditions are OR-ed, a JTI of `"admin"` is
/// accepted in every environment, and *any* session is accepted when
/// `AETHERIS_ENV=dev` — confirm both behaviors are intended.
fn can_run_playground_command(jti: &str) -> bool {
    // The Aetheris Playground's dev credential always carries jti = "admin".
    if jti == "admin" {
        return true;
    }
    // Fail closed: only an explicit, valid-Unicode `AETHERIS_ENV=dev` grants access;
    // an unset or non-Unicode variable is treated as "not dev".
    matches!(std::env::var("AETHERIS_ENV").as_deref(), Ok("dev"))
}