Skip to main content

aetheris_server/
tick.rs

1use std::collections::{BTreeMap, HashMap};
2use std::time::{Duration, Instant};
3
4use tokio::sync::broadcast;
5use tokio::time::{MissedTickBehavior, interval};
6use tracing::{Instrument, debug_span, error, info_span};
7
8use aetheris_protocol::error::EncodeError;
9use aetheris_protocol::events::{FragmentedEvent, NetworkEvent};
10use aetheris_protocol::reassembler::Reassembler;
11use aetheris_protocol::traits::{Encoder, PlatformTransport, WorldState};
12use aetheris_protocol::types::{ClientId, NetworkId};
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use tokio::sync::mpsc;
16
17/// Messages sent to the dedicated outbound sender task.
18pub enum OutboundMessage {
19    Unreliable { client_id: ClientId, data: Vec<u8> },
20    Reliable { client_id: ClientId, data: Vec<u8> },
21    BroadcastUnreliable { data: Vec<u8> },
22}
23
24#[derive(Debug, Clone)]
25pub enum DeltaTargets {
26    Broadcast,
27    Recipients(Vec<ClientId>),
28    NoRecipients,
29}
30
31/// Manages the fixed-timestep execution of the game loop.
32#[derive(Debug)]
33pub struct TickScheduler {
34    tick_rate: u64,
35    current_tick: u64,
36    auth_service: Arc<dyn crate::auth::AuthSessionVerifier>,
37
38    /// Maps `ClientId` -> (Verified Session, owned session agent `NetworkId`)
39    authenticated_clients: HashMap<ClientId, (crate::auth::VerifiedSession, Option<NetworkId>)>,
40    /// Tracks when each client was successfully authenticated.
41    auth_timestamps: HashMap<ClientId, Instant>,
42    reassembler: Reassembler,
43    next_message_id: u32,
44    encode_pool: Arc<rayon::ThreadPool>,
45    outbound_tx: Option<mpsc::Sender<OutboundMessage>>,
46    recording_ticks: Option<u64>,
47    golden_hashes: Vec<u64>,
48    /// When true, `tick_step` calls `world.stress_test(100, true)` after `advance_tick()` at tick 0.
49    /// This matches the recording mode setup and is also used by the determinism replay test.
50    spawn_at_zero: bool,
51}
52
53impl TickScheduler {
54    /// Creates a new scheduler with the specified tick rate.
55    #[must_use]
56    pub fn new(
57        tick_rate: u64,
58        auth_service: Arc<dyn crate::auth::AuthSessionVerifier>,
59        encode_pool: Arc<rayon::ThreadPool>,
60    ) -> Self {
61        Self {
62            tick_rate,
63            current_tick: 0,
64            auth_service,
65            authenticated_clients: HashMap::new(),
66            auth_timestamps: HashMap::new(),
67            reassembler: Reassembler::new(),
68            next_message_id: 1,
69            encode_pool,
70            outbound_tx: None,
71            recording_ticks: std::env::var("AETHERIS_RECORD_GOLDEN")
72                .ok()
73                .and_then(|v| v.parse().ok()),
74            golden_hashes: Vec::new(),
75            spawn_at_zero: std::env::var("AETHERIS_RECORD_GOLDEN").is_ok(),
76        }
77    }
78
79    /// Enables spawning 100 stress-test entities after `advance_tick()` at tick 0.
80    ///
81    /// This matches the server recording setup and must be used by the determinism replay
82    /// test so that entity `origin_tick` values are identical to those in the golden file.
83    #[must_use]
84    pub fn with_spawn_at_zero(mut self, v: bool) -> Self {
85        self.spawn_at_zero = v;
86        self
87    }
88
89    /// Sets the outbound channel for messages. Used in tests or custom loops.
90    pub fn set_outbound_tx(&mut self, tx: tokio::sync::mpsc::Sender<OutboundMessage>) {
91        self.outbound_tx = Some(tx);
92    }
93
94    /// Runs the infinite game loop until the shutdown token is cancelled.
95    pub async fn run(
96        &mut self,
97        transport: Box<dyn PlatformTransport>,
98        mut world: Box<dyn WorldState>,
99        encoder: Box<dyn Encoder>,
100        mut shutdown: broadcast::Receiver<()>,
101    ) {
102        let (tx, mut rx) = mpsc::channel(2048);
103        self.outbound_tx = Some(tx.clone());
104
105        let transport = Arc::new(RwLock::new(transport));
106        let transport_clone = transport.clone();
107
108        let mut outbound_shutdown = shutdown.resubscribe();
109        tokio::spawn(async move {
110            loop {
111                tokio::select! {
112                    msg = rx.recv() => {
113                        let Some(msg) = msg else { break; };
114                        let transport = transport_clone.read().await;
115                        match msg {
116                            OutboundMessage::Unreliable { client_id, data } => {
117                                if let Err(e) = transport.send_unreliable(client_id, &data).await {
118                                    error!(error = ?e, ?client_id, "Outbound task failed to send unreliable message");
119                                }
120                            }
121                            OutboundMessage::Reliable { client_id, data } => {
122                                if let Err(e) = transport.send_reliable(client_id, &data).await {
123                                    error!(error = ?e, ?client_id, "Outbound task failed to send reliable message");
124                                }
125                            }
126                            OutboundMessage::BroadcastUnreliable { data } => {
127                                if let Err(e) = transport.broadcast_unreliable(&data).await {
128                                    error!(error = ?e, "Outbound task failed to broadcast unreliable message");
129                                }
130                            }
131                        }
132                    }
133                    _ = outbound_shutdown.recv() => {
134                        break;
135                    }
136                }
137            }
138        });
139
140        #[allow(clippy::cast_precision_loss)]
141        let tick_duration = Duration::from_secs_f64(1.0 / self.tick_rate as f64);
142        let mut interval = interval(tick_duration);
143        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
144
145        let mut last_tick_wall = Instant::now();
146
147        loop {
148            tokio::select! {
149                _ = interval.tick() => {
150                    let tick_num = self.current_tick;
151                    let start = Instant::now();
152
153                    // Wall-clock tick rate: measured from the previous tick start.
154                    let wall_elapsed = start.duration_since(last_tick_wall);
155                    if wall_elapsed.as_secs_f64() > 0.0 {
156                        metrics::gauge!("aetheris_actual_tick_rate_hz")
157                            .set(1.0 / wall_elapsed.as_secs_f64());
158                    }
159                    last_tick_wall = start;
160
161                    self.tick_step(
162                        &transport,
163                        world.as_mut(),
164                        encoder.as_ref(),
165                    )
166                    .instrument(info_span!("tick", tick = tick_num))
167                    .await;
168                    let elapsed = start.elapsed();
169
170                    metrics::histogram!("aetheris_tick_duration_seconds").record(elapsed.as_secs_f64());
171                }
172                _ = shutdown.recv() => {
173                    tracing::info!("Server shutting down gracefully");
174                    break;
175                }
176            }
177        }
178    }
179
180    /// Executes a single 5-stage tick pipeline.
181    #[allow(clippy::too_many_lines)]
182    pub async fn tick_step(
183        &mut self,
184        transport_lock: &RwLock<Box<dyn PlatformTransport>>,
185        world: &mut dyn WorldState,
186        encoder: &dyn Encoder,
187    ) {
188        let tick_start = Instant::now();
189        let tick = self.current_tick;
190        self.current_tick += 1;
191
192        let mut transport = transport_lock.write().await;
193        // Pre-Stage: Advance the world change tick before any inputs are applied.
194        // This ensures entities spawned in Stage 2 receive a tick strictly greater than
195        // `last_extraction_tick`, which is required for Bevy 0.15+'s `is_changed` check.
196        // Without this, newly spawned entities share the same tick as `last_extraction_tick`
197        // and are silently skipped by `extract_deltas`, causing them to never be replicated.
198        world.advance_tick();
199
200        if tick == 0 && self.spawn_at_zero {
201            tracing::info!("Recording mode: Spawning 100 entities for determinism test");
202            world.stress_test(100, true);
203        }
204
205        // Stage 1: Poll
206        let t1 = Instant::now();
207        let events = match transport
208            .poll_events()
209            .instrument(debug_span!("stage1_poll"))
210            .await
211        {
212            Ok(e) => e,
213            Err(e) => {
214                error!(error = ?e, "Fatal transport error during poll; skipping tick");
215                return;
216            }
217        };
218        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "poll")
219            .record(t1.elapsed().as_secs_f64());
220
221        let inbound_count: u64 = events
222            .iter()
223            .filter(|e| {
224                matches!(
225                    e,
226                    NetworkEvent::UnreliableMessage { .. } | NetworkEvent::ReliableMessage { .. }
227                )
228            })
229            .count() as u64;
230        metrics::counter!("aetheris_packets_inbound_total").increment(inbound_count);
231
232        let mut to_disconnect = Vec::new();
233
234        // Periodic Session Validation (every 60 ticks / ~1s)
235        if tick.is_multiple_of(60) {
236            let mut to_remove = Vec::new();
237            for (&client_id, (session, _)) in &self.authenticated_clients {
238                if !self
239                    .auth_service
240                    .is_session_authorized(&session.jti, Some(tick))
241                {
242                    tracing::warn!(?client_id, "Session invalidated during periodic check");
243                    to_remove.push(client_id);
244                }
245            }
246            for client_id in to_remove {
247                if let Some((_, Some(nid))) = self.authenticated_clients.remove(&client_id) {
248                    let _ = world.despawn_networked(nid);
249                }
250                self.auth_timestamps.remove(&client_id);
251                metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
252
253                // Also disconnect the transport
254                to_disconnect.push(client_id);
255            }
256        }
257
258        // Stage 2: Apply
259        let t2 = Instant::now();
260        let mut pong_responses = None;
261        let mut clear_ack_targets: Vec<aetheris_protocol::types::ClientId> = Vec::new();
262        if !events.is_empty() {
263            let _span = debug_span!("stage2_apply", count = events.len()).entered();
264            let mut updates = Vec::with_capacity(events.len());
265            for event in events {
266                // Stage 2.1: Reassembly & Normalization
267                let (client_id, raw_data, is_message) = match event {
268                    NetworkEvent::Fragment {
269                        client_id,
270                        fragment,
271                    } => {
272                        if let Some(data) = self.reassembler.ingest(client_id, fragment) {
273                            (client_id, data, true)
274                        } else {
275                            continue;
276                        }
277                    }
278                    NetworkEvent::UnreliableMessage { data, client_id }
279                    | NetworkEvent::ReliableMessage { data, client_id } => {
280                        // Try to decode as a protocol fragment first
281                        if let Ok(NetworkEvent::Fragment { fragment, .. }) =
282                            encoder.decode_event(&data)
283                        {
284                            if let Some(reassembled) = self.reassembler.ingest(client_id, fragment)
285                            {
286                                (client_id, reassembled, true)
287                            } else {
288                                continue;
289                            }
290                        } else {
291                            (client_id, data, true)
292                        }
293                    }
294                    NetworkEvent::ClientConnected(id) => {
295                        metrics::gauge!("aetheris_connected_clients").increment(1.0);
296                        tracing::info!(client_id = ?id, "Client connected (awaiting auth)");
297                        (id, Vec::new(), false)
298                    }
299                    NetworkEvent::ClientDisconnected(id) | NetworkEvent::Disconnected(id) => {
300                        metrics::gauge!("aetheris_connected_clients").decrement(1.0);
301                        if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
302                            let _ = world.despawn_networked(nid);
303                        }
304                        self.auth_timestamps.remove(&id);
305                        tracing::info!(client_id = ?id, "Client disconnected");
306                        (id, Vec::new(), false)
307                    }
308                    NetworkEvent::SessionClosed(id) => {
309                        metrics::counter!("aetheris_transport_events_total", "type" => "session_closed")
310                        .increment(1);
311                        tracing::warn!(client_id = ?id, "WebTransport session closed");
312                        if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
313                            let _ = world.despawn_networked(nid);
314                        }
315                        self.auth_timestamps.remove(&id);
316                        (id, Vec::new(), false)
317                    }
318                    NetworkEvent::StreamReset(id) => {
319                        metrics::counter!("aetheris_transport_events_total", "type" => "stream_reset")
320                        .increment(1);
321                        tracing::error!(client_id = ?id, "WebTransport stream reset");
322                        if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
323                            let _ = world.despawn_networked(nid);
324                        }
325                        self.auth_timestamps.remove(&id);
326                        (id, Vec::new(), false)
327                    }
328                    NetworkEvent::Ping { client_id, tick } => {
329                        if self.authenticated_clients.contains_key(&client_id) {
330                            pong_responses.get_or_insert_with(Vec::new).push((
331                                client_id,
332                                tick,
333                                Instant::now(),
334                            ));
335                            metrics::counter!("aetheris_protocol_pings_received_total")
336                                .increment(1);
337                        }
338                        (client_id, Vec::new(), false)
339                    }
340                    NetworkEvent::ClearWorld { client_id, .. }
341                    | NetworkEvent::StartSession { client_id }
342                    | NetworkEvent::RequestWorkspaceManifest { client_id }
343                    | NetworkEvent::PlatformEvent { client_id, .. }
344                    | NetworkEvent::StressTest { client_id, .. }
345                    | NetworkEvent::ReplicationBatch { client_id, .. }
346                    | NetworkEvent::EntitySpawned { client_id, .. }
347                    | NetworkEvent::EntityDespawned { client_id, .. }
348                    | NetworkEvent::Spawn { client_id, .. } => (client_id, Vec::new(), false),
349                    NetworkEvent::Pong { .. } | NetworkEvent::Auth { .. } => {
350                        (aetheris_protocol::types::ClientId(0), Vec::new(), false)
351                    }
352                };
353
354                if !is_message {
355                    continue;
356                }
357
358                // Stage 2.2: Auth & Protocol Decode
359                let session = if let Some((session, _)) = self.authenticated_clients.get(&client_id)
360                {
361                    // Re-validate session on every message to refresh sliding window / catch revocation
362                    if !self
363                        .auth_service
364                        .is_session_authorized(&session.jti, Some(tick))
365                    {
366                        tracing::warn!(?client_id, "Session revoked; dropping client");
367                        if let Some((_, Some(nid))) = self.authenticated_clients.remove(&client_id)
368                        {
369                            let _ = world.despawn_networked(nid);
370                        }
371                        self.auth_timestamps.remove(&client_id);
372                        metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
373                        continue;
374                    }
375                    session
376                } else {
377                    // Client not authenticated yet; only accept Auth message
378                    match encoder.decode_event(&raw_data) {
379                        Ok(NetworkEvent::Auth { session_token }) => {
380                            tracing::info!(?client_id, "Auth message received");
381                            match self.auth_service.verify_session(&session_token, Some(tick)) {
382                                Ok(session) => {
383                                    tracing::info!(
384                                        ?client_id,
385                                        player_id = %session.player_id,
386                                        "Client authenticated successfully"
387                                    );
388
389                                    // M10160: Kick old sessions for the same player.
390                                    // This prevents "ghost" entities from persisting after a quick reload.
391                                    let old_clients: Vec<ClientId> = self
392                                        .authenticated_clients
393                                        .iter()
394                                        .filter(|(id, (s, _))| {
395                                            s.player_id == session.player_id && **id != client_id
396                                        })
397                                        .map(|(id, _)| *id)
398                                        .collect();
399
400                                    for old_id in old_clients {
401                                        tracing::info!(
402                                            ?old_id,
403                                            new_client_id = ?client_id,
404                                            "Kicking ghost session for same player_id"
405                                        );
406                                        if let Some((_, Some(nid))) =
407                                            self.authenticated_clients.remove(&old_id)
408                                        {
409                                            let _ = world.despawn_networked(nid);
410                                        }
411                                        self.auth_timestamps.remove(&old_id);
412
413                                        // Collect for safe disconnection after releasing transport lock
414                                        to_disconnect.push(old_id);
415                                    }
416
417                                    self.authenticated_clients
418                                        .insert(client_id, (session, None));
419
420                                    // Record when auth completed so we can measure server-side
421                                    // possession latency (A-08 profiling metric).
422                                    self.auth_timestamps.insert(client_id, Instant::now());
423
424                                    tracing::info!(
425                                        ?client_id,
426                                        "[Auth] Client authenticated — waiting for StartSession to spawn agent"
427                                    );
428                                    continue;
429                                }
430                                Err(e) => {
431                                    tracing::warn!(
432                                        ?client_id,
433                                        error = ?e,
434                                        "Client failed authentication"
435                                    );
436                                }
437                            }
438                        }
439                        Ok(other) => {
440                            tracing::warn!(
441                                ?client_id,
442                                variant = ?std::mem::discriminant(&other),
443                                bytes = raw_data.len(),
444                                "Unauthenticated client sent non-Auth event — discarding"
445                            );
446                            metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
447                        }
448                        Err(e) => {
449                            tracing::warn!(
450                                ?client_id,
451                                error = ?e,
452                                bytes = raw_data.len(),
453                                "Failed to decode message from unauthenticated client"
454                            );
455                            metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
456                        }
457                    }
458                    continue;
459                };
460
461                // Check if it's a protocol-level event first (Ping/Pong/etc)
462                if let Ok(protocol_event) = encoder.decode_event(&raw_data) {
463                    match protocol_event {
464                        NetworkEvent::Ping { tick: p_tick, .. } => {
465                            pong_responses.get_or_insert_with(Vec::new).push((
466                                client_id,
467                                p_tick,
468                                Instant::now(),
469                            ));
470                            metrics::counter!("aetheris_protocol_pings_received_total")
471                                .increment(1);
472                        }
473                        NetworkEvent::Auth { .. } => {
474                            tracing::debug!(?client_id, "Client re-authenticating (ignored)");
475                        }
476                        NetworkEvent::StressTest { count, rotate, .. } => {
477                            tracing::info!(
478                                ?client_id,
479                                count,
480                                rotate,
481                                "StressTest event received from authenticated client"
482                            );
483                            if can_run_playground_command(&session.jti) {
484                                // M10105 — Safety cap to prevent server-side resource exhaustion.
485                                const MAX_STRESS: u16 = 1000;
486                                let capped_count = count.min(MAX_STRESS);
487                                if count > MAX_STRESS {
488                                    tracing::warn!(
489                                        ?client_id,
490                                        count,
491                                        capped_count,
492                                        "Stress test count capped at limit"
493                                    );
494                                }
495
496                                tracing::info!(
497                                    ?client_id,
498                                    count = capped_count,
499                                    rotate,
500                                    "Stress test command executed"
501                                );
502                                world.stress_test(capped_count, rotate);
503                            } else {
504                                tracing::warn!(?client_id, "Unauthorized StressTest attempt");
505                                metrics::counter!("aetheris_unprivileged_packets_total")
506                                    .increment(1);
507                            }
508                        }
509                        NetworkEvent::Spawn {
510                            entity_type,
511                            x,
512                            y,
513                            rot,
514                            ..
515                        } => {
516                            if can_run_playground_command(&session.jti) {
517                                let network_id =
518                                    world.spawn_kind_for(entity_type, x, y, rot, client_id);
519
520                                tracing::info!(
521                                    ?client_id,
522                                    entity_type,
523                                    new_entity_id = network_id.0,
524                                    "[Spawn] Playground entity spawned"
525                                );
526                            } else {
527                                tracing::warn!(?client_id, "Unauthorized Spawn attempt");
528                                metrics::counter!("aetheris_unprivileged_packets_total")
529                                    .increment(1);
530                            }
531                        }
532                        NetworkEvent::StartSession { .. } => {
533                            if let Some((_, agent_id)) =
534                                self.authenticated_clients.get_mut(&client_id)
535                            {
536                                let network_id = if let Some(nid) = agent_id {
537                                    tracing::info!(
538                                        ?client_id,
539                                        ?nid,
540                                        "Reusing existing session agent"
541                                    );
542                                    *nid
543                                } else {
544                                    let nid =
545                                        world.spawn_session_agent(1, 0.0, 0.0, 0.0, client_id);
546                                    *agent_id = Some(nid);
547                                    nid
548                                };
549
550                                world.queue_reliable_event(
551                                    Some(client_id),
552                                    aetheris_protocol::events::PlatformEvent::Possession {
553                                        network_id,
554                                    },
555                                );
556
557                                // Record server-side auth→possession latency (A-08 profiling).
558                                if let Some(auth_ts) = self.auth_timestamps.remove(&client_id) {
559                                    metrics::histogram!("aetheris_session_start_latency_seconds")
560                                        .record(auth_ts.elapsed().as_secs_f64());
561                                }
562
563                                tracing::info!(
564                                    ?client_id,
565                                    network_id = network_id.0,
566                                    "[StartSession] Session agent assigned (spawned or reused) — Possession event queued"
567                                );
568                            }
569                        }
570                        NetworkEvent::ClearWorld { .. } => {
571                            if can_run_playground_command(&session.jti) {
572                                tracing::info!(?client_id, "ClearWorld command executed");
573                                world.clear_world();
574                                // Reset the client's entity-ID tracking so that a subsequent
575                                // StartSession can spawn a new session agent.  Without this,
576                                // the "already_has_agent" guard blocks the next StartSession
577                                // even though all entities were just despawned.
578                                if let Some((_, agent_id)) =
579                                    self.authenticated_clients.get_mut(&client_id)
580                                {
581                                    *agent_id = None;
582                                }
583                                // Queue a reliable ClearWorld ack to send after this block.
584                                // EnteredSpan is !Send so we cannot .await inside this scope.
585                                // The ack arrives at the client AFTER stale in-flight datagrams,
586                                // guaranteeing a full entity flush (eliminates partial-clear race).
587                                clear_ack_targets.push(client_id);
588                            } else {
589                                tracing::warn!(?client_id, "Unauthorized ClearWorld attempt");
590                                metrics::counter!("aetheris_unprivileged_packets_total")
591                                    .increment(1);
592                            }
593                        }
594                        NetworkEvent::RequestWorkspaceManifest { .. } => {
595                            let jti = if let Some((session, _)) =
596                                self.authenticated_clients.get(&client_id)
597                            {
598                                &session.jti
599                            } else {
600                                ""
601                            };
602
603                            let manifest = self.get_filtered_manifest(jti);
604                            world.queue_reliable_event(
605                                Some(client_id),
606                                aetheris_protocol::events::PlatformEvent::WorkspaceManifest {
607                                    manifest,
608                                },
609                            );
610                        }
611                        NetworkEvent::ReplicationBatch { events, .. } => {
612                            for event in events {
613                                updates.push((
614                                    client_id,
615                                    aetheris_protocol::events::ComponentUpdate {
616                                        network_id: event.network_id,
617                                        component_kind: event.component_kind,
618                                        payload: event.payload,
619                                        tick: event.tick,
620                                    },
621                                ));
622                            }
623                        }
624                        _ => {
625                            tracing::trace!(?protocol_event, "Protocol event");
626                        }
627                    }
628                } else {
629                    // If it's not a protocol event, try to decode it as a game update
630                    match encoder.decode(&raw_data) {
631                        Ok(update) => updates.push((client_id, update)),
632                        Err(e) => {
633                            metrics::counter!("aetheris_decode_errors_total").increment(1);
634                            error!(
635                                error = ?e,
636                                size = raw_data.len(),
637                                "Failed to decode update (not a protocol event)"
638                            );
639                        }
640                    }
641                }
642            }
643            world.apply_updates(&updates);
644            self.reassembler.prune();
645        }
646        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "apply")
647            .record(t2.elapsed().as_secs_f64());
648
649        // Send ClearWorld acks (reliable) for any ClearWorld commands processed this tick.
650        // Sent after the EnteredSpan is dropped, since EnteredSpan is !Send.
651        // The reliable delivery guarantees the client sees this AFTER any stale in-flight
652        // unreliable datagrams, closing the partial-clear race condition.
653        for target in clear_ack_targets {
654            let ack = NetworkEvent::ClearWorld { client_id: target };
655            #[allow(clippy::collapsible_if)]
656            if let Ok(data) = encoder.encode_event(&ack) {
657                if let Err(e) = transport.send_reliable(target, &data).await {
658                    tracing::warn!(client_id = ?target, error = ?e, "Failed to send ClearWorld ack");
659                }
660            }
661        }
662
663        // Send Pongs for all collected Pings.
664        // Use unreliable (datagram) so the reply travels the same path as the
665        // incoming Ping, and clients that only read datagrams can receive it.
666        if let Some(pongs) = pong_responses {
667            for (client_id, p_tick, received_at) in pongs {
668                let pong_event = NetworkEvent::Pong { tick: p_tick };
669                if let Ok(data) = encoder.encode_event(&pong_event) {
670                    // Measure server-side Pong dispatch time (encode + send).
671                    // This is NOT the full network RTT, but it captures the
672                    // server processing overhead between Ping receipt and Pong send.
673                    let dispatch_start = Instant::now();
674                    match transport.send_unreliable(client_id, &data).await {
675                        Ok(()) => {
676                            let dispatch_ms = dispatch_start.elapsed().as_secs_f64() * 1000.0;
677                            let server_hold_ms = received_at.elapsed().as_secs_f64() * 1000.0;
678                            metrics::histogram!("aetheris_server_pong_dispatch_ms")
679                                .record(dispatch_ms);
680                            metrics::histogram!("aetheris_server_ping_hold_ms")
681                                .record(server_hold_ms);
682                        }
683                        Err(e) => {
684                            error!(error = ?e, client_id = ?client_id, "Failed to send Pong");
685                        }
686                    }
687                }
688            }
689        }
690
691        // Stage 3: Simulate
692        let t3 = Instant::now();
693        {
694            let _span = debug_span!("stage3_simulate").entered();
695            // Simulation logic (physics, AI, game rules) happens here.
696            world.simulate();
697        }
698        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "simulate")
699            .record(t3.elapsed().as_secs_f64());
700
701        // Stage 4: Extract
702        let t4 = Instant::now();
703        let (deltas, reliable_events) = {
704            let ds = world.extract_deltas();
705
706            // VS-07 §3.3: Golden File Recording
707            if let Some(limit) = self.recording_ticks.filter(|&l| tick < l) {
708                let hash = world.state_hash();
709                self.golden_hashes.push(hash);
710
711                if tick + 1 == limit {
712                    tracing::info!(
713                        limit,
714                        "Golden recording complete. Saving to golden_600ticks.bin"
715                    );
716                    let data: Vec<u8> = self
717                        .golden_hashes
718                        .iter()
719                        .flat_map(|h| h.to_le_bytes())
720                        .collect();
721                    if let Err(e) = std::fs::write("golden_600ticks.bin", data) {
722                        tracing::error!(error = ?e, "Failed to write golden file");
723                    } else {
724                        tracing::info!("Successfully saved golden_600ticks.bin");
725                        std::process::exit(0);
726                    }
727                }
728            }
729            let rs = world.extract_reliable_events();
730            (ds, rs)
731        };
732        // Reset ECS change-detection *after* extraction so simulate()'s mutations are visible.
733        world.post_extract();
734        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "extract")
735            .record(t4.elapsed().as_secs_f64());
736
737        // Stage 5: Encode & Send
738        let t5 = Instant::now();
739
740        // Stage 5.1: Send Reliable Events
741        for (target, wire_event) in reliable_events {
742            // Broadcast reliably to all authenticated clients if target is None
743            let targets: Vec<_> = if let Some(id) = target {
744                vec![id]
745            } else {
746                self.authenticated_clients.keys().copied().collect()
747            };
748
749            for id in targets {
750                tracing::info!(?id, event = ?wire_event, "Sending reliable event to client");
751                let network_event = wire_event.clone().into_network_event(id);
752                match encoder.encode_event(&network_event) {
753                    Ok(data) => {
754                        if let Some(tx) = &self.outbound_tx {
755                            let _ = tx
756                                .send(OutboundMessage::Reliable {
757                                    client_id: id,
758                                    data,
759                                })
760                                .await;
761                        }
762                    }
763                    Err(e) => {
764                        error!(error = ?e, client_id = ?id, "Failed to encode reliable event");
765                    }
766                }
767            }
768        }
769
770        if !deltas.is_empty() {
771            let mut broadcast_count: u64 = 0;
772
773            let stage_span = debug_span!("stage5_send", count = deltas.len());
774            let _guard = stage_span.enter();
775
776            // A-01: Packet Batching (Phase 1 Optimization)
777            // Group all deltas by their target clients to avoid N*M packet explosion.
778            let mut client_batches: HashMap<
779                aetheris_protocol::types::ClientId,
780                Vec<aetheris_protocol::events::ReplicationEvent>,
781            > = HashMap::with_capacity(self.authenticated_clients.len());
782
783            for delta in deltas {
784                let targets =
785                    Self::get_delta_targets(world, &self.authenticated_clients, delta.network_id);
786
787                match targets {
788                    DeltaTargets::Broadcast => {
789                        // Global broadcast (to all authenticated clients)
790                        for &client_id in self.authenticated_clients.keys() {
791                            client_batches
792                                .entry(client_id)
793                                .or_default()
794                                .push(delta.clone());
795                        }
796                    }
797                    DeltaTargets::Recipients(recipients) => {
798                        // Targeted multicast (AoI / Room filtered)
799                        for target in recipients {
800                            client_batches
801                                .entry(target)
802                                .or_default()
803                                .push(delta.clone());
804                        }
805                    }
806                    DeltaTargets::NoRecipients => {}
807                }
808            }
809
810            let max_size = encoder.max_encoded_size();
811            thread_local! {
812                static SCRATCH_BUFFER: std::cell::RefCell<Vec<u8>> = const { std::cell::RefCell::new(Vec::new()) };
813            }
814
815            // A-04: Parallel Stage 5 Encode
816            // CPU-intensive serialization is offloaded to a dedicated Rayon pool.
817            // We use block_in_place to inform Tokio that the current thread is performing CPU-heavy work.
818            use rayon::prelude::{IntoParallelIterator, ParallelIterator};
819
820            let batches_to_encode: Vec<_> = client_batches.into_iter().collect();
821
822            let encoded_results = tokio::task::block_in_place(|| {
823                self.encode_pool.install(|| {
824                    batches_to_encode
825                        .into_par_iter()
826                        .map(|(client_id, events)| {
827                            let batch_event =
828                                aetheris_protocol::events::NetworkEvent::ReplicationBatch {
829                                    client_id,
830                                    events,
831                                };
832                            // SCRATCH_BUFFER optimization (M10105):
833                            // Uses worker-local memory to avoid allocations during serialization.
834                            // Only a single final allocation (to_vec) is performed to return data to main thread.
835                            SCRATCH_BUFFER.with(|buf| {
836                                let mut b = buf.borrow_mut();
837                                if b.len() < max_size {
838                                    b.resize(max_size, 0);
839                                }
840                                match encoder.encode_event_into(&batch_event, &mut b) {
841                                    Ok(size) => (client_id, Ok(b[..size].to_vec())),
842                                    Err(
843                                        aetheris_protocol::error::EncodeError::BufferOverflow {
844                                            ..
845                                        },
846                                    ) => {
847                                        // M10105 — Reliable fallback for large batches that exceed scratch buffer.
848                                        // We use the allocating encode_event() here as a safety valve.
849                                        match encoder.encode_event(&batch_event) {
850                                            Ok(data) => (client_id, Ok(data)),
851                                            Err(e) => (client_id, Err(e)),
852                                        }
853                                    }
854                                    Err(e) => (client_id, Err(e)),
855                                }
856                            })
857                        })
858                        .collect::<Vec<_>>()
859                })
860            });
861
862            for (client_id, result) in encoded_results {
863                match result {
864                    Ok(data) => {
865                        let targets = DeltaTargets::Recipients(vec![client_id]);
866                        if data.len() > aetheris_protocol::MAX_SAFE_PAYLOAD_SIZE {
867                            match self
868                                .fragment_and_send(&data, data.len(), &targets, encoder)
869                                .await
870                            {
871                                Ok(count) => broadcast_count += count,
872                                Err(e) => {
873                                    error!(error = ?e, ?client_id, "Failed to fragment large batch");
874                                }
875                            }
876                        } else if let Some(tx) = &self.outbound_tx {
877                            let _ = tx
878                                .send(OutboundMessage::Unreliable { client_id, data })
879                                .await;
880                            broadcast_count += 1;
881                        }
882                    }
883                    Err(e) => {
884                        error!(error = ?e, ?client_id, "Failed to encode batch");
885                    }
886                }
887            }
888
889            metrics::counter!("aetheris_packets_outbound_total").increment(broadcast_count);
890            metrics::counter!("aetheris_packets_broadcast_total").increment(broadcast_count);
891        }
892        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "send")
893            .record(t5.elapsed().as_secs_f64());
894
895        metrics::histogram!("aetheris_tick_duration_seconds")
896            .record(tick_start.elapsed().as_secs_f64());
897
898        // Stage 6: Disconnect any kicked clients after releasing transport lock
899        drop(transport);
900        for id in to_disconnect {
901            let transport = transport_lock.read().await;
902            let _ = transport.disconnect(id).await;
903        }
904    }
905
906    fn get_delta_targets(
907        world: &dyn WorldState,
908        clients: &HashMap<ClientId, (crate::auth::VerifiedSession, Option<NetworkId>)>,
909        entity_id: NetworkId,
910    ) -> DeltaTargets {
911        if let Some(workspace_id) = world.get_entity_workspace(entity_id) {
912            let mut recipients = Vec::new();
913            for &client_id in clients.keys() {
914                if world.get_client_workspace(client_id) == Some(workspace_id) {
915                    recipients.push(client_id);
916                }
917            }
918            if recipients.is_empty() {
919                DeltaTargets::NoRecipients
920            } else {
921                DeltaTargets::Recipients(recipients)
922            }
923        } else {
924            DeltaTargets::Broadcast
925        }
926    }
927
928    async fn fragment_and_send(
929        &mut self,
930        data: &[u8],
931        len: usize,
932        targets: &DeltaTargets,
933        encoder: &dyn Encoder,
934    ) -> Result<u64, EncodeError> {
935        let Some(tx) = &self.outbound_tx else {
936            return Ok(0);
937        };
938        let message_id = self.next_message_id;
939        self.next_message_id = self.next_message_id.wrapping_add(1);
940
941        let chunk_size = aetheris_protocol::MAX_FRAGMENT_PAYLOAD_SIZE;
942        let chunks: Vec<_> = data[..len].chunks(chunk_size).collect();
943
944        let Ok(total_fragments) = u16::try_from(chunks.len()) else {
945            error!(
946                message_id,
947                chunks = chunks.len(),
948                "Too many fragments required for message; dropping payload"
949            );
950            return Err(EncodeError::Io(std::io::Error::new(
951                std::io::ErrorKind::InvalidData,
952                "Too many fragments",
953            )));
954        };
955
956        let mut sent_count = 0;
957        for (i, chunk) in chunks.into_iter().enumerate() {
958            let Ok(fragment_index) = u16::try_from(i) else {
959                error!(message_id, index = i, "Fragment index overflow; stopping");
960                break;
961            };
962
963            let fragment = FragmentedEvent {
964                message_id,
965                fragment_index,
966                total_fragments,
967                payload: chunk.to_vec(),
968            };
969            let fragment_event = NetworkEvent::Fragment {
970                client_id: ClientId(0),
971                fragment,
972            };
973
974            match encoder.encode_event(&fragment_event) {
975                Ok(encoded_fragment) => match targets {
976                    DeltaTargets::Broadcast => {
977                        let _ = tx
978                            .send(OutboundMessage::BroadcastUnreliable {
979                                data: encoded_fragment,
980                            })
981                            .await;
982                        sent_count += 1;
983                    }
984                    DeltaTargets::Recipients(recipients) => {
985                        for &target in recipients {
986                            let _ = tx
987                                .send(OutboundMessage::Unreliable {
988                                    client_id: target,
989                                    data: encoded_fragment.clone(),
990                                })
991                                .await;
992                            sent_count += 1;
993                        }
994                    }
995                    DeltaTargets::NoRecipients => {}
996                },
997                Err(e) => {
998                    error!(error = ?e, "Failed to encode fragment event");
999                }
1000            }
1001        }
1002
1003        Ok(sent_count)
1004    }
1005
1006    fn get_filtered_manifest(&self, jti: &str) -> BTreeMap<String, String> {
1007        let mut manifest = BTreeMap::new();
1008        manifest.insert(
1009            "version_server".to_string(),
1010            env!("CARGO_PKG_VERSION").to_string(),
1011        );
1012        manifest.insert(
1013            "version_protocol".to_string(),
1014            aetheris_protocol::VERSION.to_string(),
1015        );
1016
1017        if can_run_playground_command(jti) {
1018            manifest.insert("tick_rate".to_string(), self.tick_rate.to_string());
1019            manifest.insert(
1020                "clients_active".to_string(),
1021                self.authenticated_clients.len().to_string(),
1022            );
1023        }
1024        manifest
1025    }
1026}
1027
/// Reports whether the session identified by `jti` may run destructive playground commands.
///
/// Access is granted when the JTI is exactly `"admin"`, or when the `AETHERIS_ENV`
/// environment variable is set to exactly `"dev"`. An unset or unreadable variable
/// grants nothing.
///
/// In Phase 1 this is a simplified check against the `admin` JTI used in development.
/// In Phase 3 it will be tied to the account's permission level.
fn can_run_playground_command(jti: &str) -> bool {
    // The dev credential in Aetheris Playground always generates jti="admin".
    if jti == "admin" {
        return true;
    }

    // Fail closed: only an explicit "dev" value counts; absence is not treated as dev.
    match std::env::var("AETHERIS_ENV") {
        Ok(environment) => environment == "dev",
        Err(_) => false,
    }
}