Skip to main content

aetheris_server/
tick.rs

1use std::collections::{BTreeMap, HashMap};
2use std::time::{Duration, Instant};
3
4use tokio::sync::broadcast;
5use tokio::time::{MissedTickBehavior, interval};
6use tracing::{Instrument, debug_span, error, info_span};
7
8use aetheris_protocol::error::EncodeError;
9use aetheris_protocol::events::{FragmentedEvent, NetworkEvent};
10use aetheris_protocol::reassembler::Reassembler;
11use aetheris_protocol::traits::{Encoder, GameTransport, WorldState};
12use aetheris_protocol::types::{ClientId, NetworkId};
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use tokio::sync::mpsc;
16
17/// Messages sent to the dedicated outbound sender task.
18pub enum OutboundMessage {
19    Unreliable { client_id: ClientId, data: Vec<u8> },
20    Reliable { client_id: ClientId, data: Vec<u8> },
21    BroadcastUnreliable { data: Vec<u8> },
22}
23
24#[derive(Debug, Clone)]
25pub enum DeltaTargets {
26    Broadcast,
27    Recipients(Vec<ClientId>),
28    NoRecipients,
29}
30
31/// Manages the fixed-timestep execution of the game loop.
32#[derive(Debug)]
33pub struct TickScheduler {
34    tick_rate: u64,
35    current_tick: u64,
36    auth_service: Arc<dyn crate::auth::AuthSessionVerifier>,
37
38    /// Maps `ClientId` -> (Session JTI, owned session ship `NetworkId`)
39    authenticated_clients: HashMap<ClientId, (String, Option<NetworkId>)>,
40    /// Tracks when each client was successfully authenticated.
41    auth_timestamps: HashMap<ClientId, Instant>,
42    reassembler: Reassembler,
43    next_message_id: u32,
44    encode_pool: Arc<rayon::ThreadPool>,
45    outbound_tx: Option<mpsc::Sender<OutboundMessage>>,
46    recording_ticks: Option<u64>,
47    golden_hashes: Vec<u64>,
48    /// When true, `tick_step` calls `world.stress_test(100, true)` after `advance_tick()` at tick 0.
49    /// This matches the recording mode setup and is also used by the determinism replay test.
50    spawn_at_zero: bool,
51}
52
53impl TickScheduler {
54    /// Creates a new scheduler with the specified tick rate.
55    #[must_use]
56    pub fn new(
57        tick_rate: u64,
58        auth_service: Arc<dyn crate::auth::AuthSessionVerifier>,
59        encode_pool: Arc<rayon::ThreadPool>,
60    ) -> Self {
61        Self {
62            tick_rate,
63            current_tick: 0,
64            auth_service,
65            authenticated_clients: HashMap::new(),
66            auth_timestamps: HashMap::new(),
67            reassembler: Reassembler::new(),
68            next_message_id: 1,
69            encode_pool,
70            outbound_tx: None,
71            recording_ticks: std::env::var("AETHERIS_RECORD_GOLDEN")
72                .ok()
73                .and_then(|v| v.parse().ok()),
74            golden_hashes: Vec::new(),
75            spawn_at_zero: std::env::var("AETHERIS_RECORD_GOLDEN").is_ok(),
76        }
77    }
78
79    /// Enables spawning 100 stress-test entities after `advance_tick()` at tick 0.
80    ///
81    /// This matches the server recording setup and must be used by the determinism replay
82    /// test so that entity `origin_tick` values are identical to those in the golden file.
83    #[must_use]
84    pub fn with_spawn_at_zero(mut self, v: bool) -> Self {
85        self.spawn_at_zero = v;
86        self
87    }
88
89    /// Sets the outbound channel for messages. Used in tests or custom loops.
90    pub fn set_outbound_tx(&mut self, tx: tokio::sync::mpsc::Sender<OutboundMessage>) {
91        self.outbound_tx = Some(tx);
92    }
93
94    /// Runs the infinite game loop until the shutdown token is cancelled.
95    pub async fn run(
96        &mut self,
97        transport: Box<dyn GameTransport>,
98        mut world: Box<dyn WorldState>,
99        encoder: Box<dyn Encoder>,
100        mut shutdown: broadcast::Receiver<()>,
101    ) {
102        let (tx, mut rx) = mpsc::channel(2048);
103        self.outbound_tx = Some(tx.clone());
104
105        let transport = Arc::new(RwLock::new(transport));
106        let transport_clone = transport.clone();
107
108        let mut outbound_shutdown = shutdown.resubscribe();
109        tokio::spawn(async move {
110            loop {
111                tokio::select! {
112                    msg = rx.recv() => {
113                        let Some(msg) = msg else { break; };
114                        let transport = transport_clone.read().await;
115                        match msg {
116                            OutboundMessage::Unreliable { client_id, data } => {
117                                if let Err(e) = transport.send_unreliable(client_id, &data).await {
118                                    error!(error = ?e, ?client_id, "Outbound task failed to send unreliable message");
119                                }
120                            }
121                            OutboundMessage::Reliable { client_id, data } => {
122                                if let Err(e) = transport.send_reliable(client_id, &data).await {
123                                    error!(error = ?e, ?client_id, "Outbound task failed to send reliable message");
124                                }
125                            }
126                            OutboundMessage::BroadcastUnreliable { data } => {
127                                if let Err(e) = transport.broadcast_unreliable(&data).await {
128                                    error!(error = ?e, "Outbound task failed to broadcast unreliable message");
129                                }
130                            }
131                        }
132                    }
133                    _ = outbound_shutdown.recv() => {
134                        break;
135                    }
136                }
137            }
138        });
139
140        #[allow(clippy::cast_precision_loss)]
141        let tick_duration = Duration::from_secs_f64(1.0 / self.tick_rate as f64);
142        let mut interval = interval(tick_duration);
143        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
144
145        let mut last_tick_wall = Instant::now();
146
147        loop {
148            tokio::select! {
149                _ = interval.tick() => {
150                    let tick_num = self.current_tick;
151                    let start = Instant::now();
152
153                    // Wall-clock tick rate: measured from the previous tick start.
154                    let wall_elapsed = start.duration_since(last_tick_wall);
155                    if wall_elapsed.as_secs_f64() > 0.0 {
156                        metrics::gauge!("aetheris_actual_tick_rate_hz")
157                            .set(1.0 / wall_elapsed.as_secs_f64());
158                    }
159                    last_tick_wall = start;
160
161                    self.tick_step(
162                        &transport,
163                        world.as_mut(),
164                        encoder.as_ref(),
165                    )
166                    .instrument(info_span!("tick", tick = tick_num))
167                    .await;
168                    let elapsed = start.elapsed();
169
170                    metrics::histogram!("aetheris_tick_duration_seconds").record(elapsed.as_secs_f64());
171                }
172                _ = shutdown.recv() => {
173                    tracing::info!("Server shutting down gracefully");
174                    break;
175                }
176            }
177        }
178    }
179
180    /// Executes a single 5-stage tick pipeline.
181    #[allow(clippy::too_many_lines)]
182    pub async fn tick_step(
183        &mut self,
184        transport_lock: &RwLock<Box<dyn GameTransport>>,
185        world: &mut dyn WorldState,
186        encoder: &dyn Encoder,
187    ) {
188        let tick_start = Instant::now();
189        let tick = self.current_tick;
190        self.current_tick += 1;
191
192        let mut transport = transport_lock.write().await;
193        // Pre-Stage: Advance the world change tick before any inputs are applied.
194        // This ensures entities spawned in Stage 2 receive a tick strictly greater than
195        // `last_extraction_tick`, which is required for Bevy 0.15+'s `is_changed` check.
196        // Without this, newly spawned entities share the same tick as `last_extraction_tick`
197        // and are silently skipped by `extract_deltas`, causing them to never be replicated.
198        world.advance_tick();
199
200        if tick == 0 && self.spawn_at_zero {
201            tracing::info!("Recording mode: Spawning 100 entities for determinism test");
202            world.stress_test(100, true);
203        }
204
205        // Stage 1: Poll
206        let t1 = Instant::now();
207        let events = match transport
208            .poll_events()
209            .instrument(debug_span!("stage1_poll"))
210            .await
211        {
212            Ok(e) => e,
213            Err(e) => {
214                error!(error = ?e, "Fatal transport error during poll; skipping tick");
215                return;
216            }
217        };
218        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "poll")
219            .record(t1.elapsed().as_secs_f64());
220
221        let inbound_count: u64 = events
222            .iter()
223            .filter(|e| {
224                matches!(
225                    e,
226                    NetworkEvent::UnreliableMessage { .. } | NetworkEvent::ReliableMessage { .. }
227                )
228            })
229            .count() as u64;
230        metrics::counter!("aetheris_packets_inbound_total").increment(inbound_count);
231
232        // Periodic Session Validation (every 60 ticks / ~1s)
233        if tick.is_multiple_of(60) {
234            let mut to_remove = Vec::new();
235            for (&client_id, (jti, _)) in &self.authenticated_clients {
236                if !self.auth_service.is_session_authorized(jti, Some(tick)) {
237                    tracing::warn!(?client_id, "Session invalidated during periodic check");
238                    to_remove.push(client_id);
239                }
240            }
241            for client_id in to_remove {
242                if let Some((_, Some(nid))) = self.authenticated_clients.remove(&client_id) {
243                    let _ = world.despawn_networked(nid);
244                }
245                self.auth_timestamps.remove(&client_id);
246                metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
247            }
248        }
249
250        // Stage 2: Apply
251        let t2 = Instant::now();
252        let mut pong_responses = None;
253        let mut clear_ack_targets: Vec<aetheris_protocol::types::ClientId> = Vec::new();
254        if !events.is_empty() {
255            let _span = debug_span!("stage2_apply", count = events.len()).entered();
256            let mut updates = Vec::with_capacity(events.len());
257            for event in events {
258                // Stage 2.1: Reassembly & Normalization
259                let (client_id, raw_data, is_message) = match event {
260                    NetworkEvent::Fragment {
261                        client_id,
262                        fragment,
263                    } => {
264                        if let Some(data) = self.reassembler.ingest(client_id, fragment) {
265                            (client_id, data, true)
266                        } else {
267                            continue;
268                        }
269                    }
270                    NetworkEvent::UnreliableMessage { data, client_id }
271                    | NetworkEvent::ReliableMessage { data, client_id } => {
272                        // Try to decode as a protocol fragment first
273                        if let Ok(NetworkEvent::Fragment { fragment, .. }) =
274                            encoder.decode_event(&data)
275                        {
276                            if let Some(reassembled) = self.reassembler.ingest(client_id, fragment)
277                            {
278                                (client_id, reassembled, true)
279                            } else {
280                                continue;
281                            }
282                        } else {
283                            (client_id, data, true)
284                        }
285                    }
286                    NetworkEvent::ClientConnected(id) => {
287                        metrics::gauge!("aetheris_connected_clients").increment(1.0);
288                        tracing::info!(client_id = ?id, "Client connected (awaiting auth)");
289                        (id, Vec::new(), false)
290                    }
291                    NetworkEvent::ClientDisconnected(id) | NetworkEvent::Disconnected(id) => {
292                        metrics::gauge!("aetheris_connected_clients").decrement(1.0);
293                        if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
294                            let _ = world.despawn_networked(nid);
295                        }
296                        self.auth_timestamps.remove(&id);
297                        tracing::info!(client_id = ?id, "Client disconnected");
298                        (id, Vec::new(), false)
299                    }
300                    NetworkEvent::SessionClosed(id) => {
301                        metrics::counter!("aetheris_transport_events_total", "type" => "session_closed")
302                        .increment(1);
303                        tracing::warn!(client_id = ?id, "WebTransport session closed");
304                        if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
305                            let _ = world.despawn_networked(nid);
306                        }
307                        self.auth_timestamps.remove(&id);
308                        (id, Vec::new(), false)
309                    }
310                    NetworkEvent::StreamReset(id) => {
311                        metrics::counter!("aetheris_transport_events_total", "type" => "stream_reset")
312                        .increment(1);
313                        tracing::error!(client_id = ?id, "WebTransport stream reset");
314                        if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
315                            let _ = world.despawn_networked(nid);
316                        }
317                        self.auth_timestamps.remove(&id);
318                        (id, Vec::new(), false)
319                    }
320                    NetworkEvent::Ping { client_id, tick } => {
321                        if self.authenticated_clients.contains_key(&client_id) {
322                            pong_responses.get_or_insert_with(Vec::new).push((
323                                client_id,
324                                tick,
325                                Instant::now(),
326                            ));
327                            metrics::counter!("aetheris_protocol_pings_received_total")
328                                .increment(1);
329                        }
330                        (client_id, Vec::new(), false)
331                    }
332                    NetworkEvent::ClearWorld { client_id, .. }
333                    | NetworkEvent::StartSession { client_id }
334                    | NetworkEvent::RequestSystemManifest { client_id }
335                    | NetworkEvent::GameEvent { client_id, .. }
336                    | NetworkEvent::StressTest { client_id, .. }
337                    | NetworkEvent::ReplicationBatch { client_id, .. }
338                    | NetworkEvent::Spawn { client_id, .. } => (client_id, Vec::new(), false),
339                    NetworkEvent::Pong { .. } | NetworkEvent::Auth { .. } => {
340                        (aetheris_protocol::types::ClientId(0), Vec::new(), false)
341                    }
342                };
343
344                if !is_message {
345                    continue;
346                }
347
348                // Stage 2.2: Auth & Protocol Decode
349                let jti = if let Some((jti, _)) = self.authenticated_clients.get(&client_id) {
350                    // Re-validate session on every message to refresh sliding window / catch revocation
351                    if !self.auth_service.is_session_authorized(jti, Some(tick)) {
352                        tracing::warn!(?client_id, "Session revoked; dropping client");
353                        if let Some((_, Some(nid))) = self.authenticated_clients.remove(&client_id)
354                        {
355                            let _ = world.despawn_networked(nid);
356                        }
357                        self.auth_timestamps.remove(&client_id);
358                        metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
359                        continue;
360                    }
361                    jti
362                } else {
363                    // Client not authenticated yet; only accept Auth message
364                    match encoder.decode_event(&raw_data) {
365                        Ok(NetworkEvent::Auth { session_token }) => {
366                            tracing::info!(?client_id, "Auth message received");
367                            match self.auth_service.verify_session(&session_token, Some(tick)) {
368                                Ok(session) => {
369                                    tracing::info!(?client_id, "Client authenticated successfully");
370
371                                    self.authenticated_clients
372                                        .insert(client_id, (session.jti, None));
373                                    // Record when auth completed so we can measure server-side
374                                    // possession latency (A-08 profiling metric).
375                                    self.auth_timestamps.insert(client_id, Instant::now());
376
377                                    tracing::info!(
378                                        ?client_id,
379                                        "[Auth] Client authenticated — waiting for StartSession to spawn ship"
380                                    );
381                                    continue;
382                                }
383                                Err(e) => {
384                                    tracing::warn!(
385                                        ?client_id,
386                                        error = ?e,
387                                        "Client failed authentication"
388                                    );
389                                }
390                            }
391                        }
392                        Ok(other) => {
393                            tracing::warn!(
394                                ?client_id,
395                                variant = ?std::mem::discriminant(&other),
396                                bytes = raw_data.len(),
397                                "Unauthenticated client sent non-Auth event — discarding"
398                            );
399                            metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
400                        }
401                        Err(e) => {
402                            tracing::warn!(
403                                ?client_id,
404                                error = ?e,
405                                bytes = raw_data.len(),
406                                "Failed to decode message from unauthenticated client"
407                            );
408                            metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
409                        }
410                    }
411                    continue;
412                };
413
414                // Check if it's a protocol-level event first (Ping/Pong/etc)
415                if let Ok(protocol_event) = encoder.decode_event(&raw_data) {
416                    match protocol_event {
417                        NetworkEvent::Ping { tick: p_tick, .. } => {
418                            pong_responses.get_or_insert_with(Vec::new).push((
419                                client_id,
420                                p_tick,
421                                Instant::now(),
422                            ));
423                            metrics::counter!("aetheris_protocol_pings_received_total")
424                                .increment(1);
425                        }
426                        NetworkEvent::Auth { .. } => {
427                            tracing::debug!(?client_id, "Client re-authenticating (ignored)");
428                        }
429                        NetworkEvent::StressTest { count, rotate, .. } => {
430                            tracing::info!(
431                                ?client_id,
432                                count,
433                                rotate,
434                                "StressTest event received from authenticated client"
435                            );
436                            if can_run_playground_command(jti) {
437                                // M10105 — Safety cap to prevent server-side resource exhaustion.
438                                const MAX_STRESS: u16 = 1000;
439                                let capped_count = count.min(MAX_STRESS);
440                                if count > MAX_STRESS {
441                                    tracing::warn!(
442                                        ?client_id,
443                                        count,
444                                        capped_count,
445                                        "Stress test count capped at limit"
446                                    );
447                                }
448
449                                tracing::info!(
450                                    ?client_id,
451                                    count = capped_count,
452                                    rotate,
453                                    "Stress test command executed"
454                                );
455                                world.stress_test(capped_count, rotate);
456                            } else {
457                                tracing::warn!(?client_id, "Unauthorized StressTest attempt");
458                                metrics::counter!("aetheris_unprivileged_packets_total")
459                                    .increment(1);
460                            }
461                        }
462                        NetworkEvent::Spawn {
463                            entity_type,
464                            x,
465                            y,
466                            rot,
467                            ..
468                        } => {
469                            if can_run_playground_command(jti) {
470                                let network_id =
471                                    world.spawn_kind_for(entity_type, x, y, rot, client_id);
472
473                                tracing::info!(
474                                    ?client_id,
475                                    entity_type,
476                                    new_entity_id = network_id.0,
477                                    "[Spawn] Playground entity spawned"
478                                );
479                            } else {
480                                tracing::warn!(?client_id, "Unauthorized Spawn attempt");
481                                metrics::counter!("aetheris_unprivileged_packets_total")
482                                    .increment(1);
483                            }
484                        }
485                        NetworkEvent::StartSession { .. } => {
486                            if let Some((_, ship_id)) =
487                                self.authenticated_clients.get_mut(&client_id)
488                            {
489                                let network_id = if let Some(nid) = ship_id {
490                                    tracing::info!(
491                                        ?client_id,
492                                        ?nid,
493                                        "Reusing existing session ship"
494                                    );
495                                    *nid
496                                } else {
497                                    let nid = world.spawn_session_ship(1, 0.0, 0.0, 0.0, client_id);
498                                    *ship_id = Some(nid);
499                                    nid
500                                };
501
502                                world.queue_reliable_event(
503                                    Some(client_id),
504                                    aetheris_protocol::events::GameEvent::Possession { network_id },
505                                );
506
507                                // Record server-side auth→possession latency (A-08 profiling).
508                                if let Some(auth_ts) = self.auth_timestamps.remove(&client_id) {
509                                    metrics::histogram!("aetheris_session_start_latency_seconds")
510                                        .record(auth_ts.elapsed().as_secs_f64());
511                                }
512
513                                tracing::info!(
514                                    ?client_id,
515                                    network_id = network_id.0,
516                                    "[StartSession] Session ship assigned (spawned or reused) — Possession event queued"
517                                );
518                            }
519                        }
520                        NetworkEvent::ClearWorld { .. } => {
521                            if can_run_playground_command(jti) {
522                                tracing::info!(?client_id, "ClearWorld command executed");
523                                world.clear_world();
524                                // Reset the client's entity-ID tracking so that a subsequent
525                                // StartSession can spawn a new session ship.  Without this,
526                                // the "already_has_ship" guard blocks the next StartSession
527                                // even though all entities were just despawned.
528                                if let Some((_, ship_id)) =
529                                    self.authenticated_clients.get_mut(&client_id)
530                                {
531                                    *ship_id = None;
532                                }
533                                // Queue a reliable ClearWorld ack to send after this block.
534                                // EnteredSpan is !Send so we cannot .await inside this scope.
535                                // The ack arrives at the client AFTER stale in-flight datagrams,
536                                // guaranteeing a full entity flush (eliminates partial-clear race).
537                                clear_ack_targets.push(client_id);
538                            } else {
539                                tracing::warn!(?client_id, "Unauthorized ClearWorld attempt");
540                                metrics::counter!("aetheris_unprivileged_packets_total")
541                                    .increment(1);
542                            }
543                        }
544                        NetworkEvent::RequestSystemManifest { .. } => {
545                            let jti = if let Some((jti, _)) =
546                                self.authenticated_clients.get(&client_id)
547                            {
548                                jti
549                            } else {
550                                ""
551                            };
552
553                            let manifest = self.get_filtered_manifest(jti);
554                            world.queue_reliable_event(
555                                Some(client_id),
556                                aetheris_protocol::events::GameEvent::SystemManifest { manifest },
557                            );
558                        }
559                        NetworkEvent::ReplicationBatch { events, .. } => {
560                            for event in events {
561                                updates.push((
562                                    client_id,
563                                    aetheris_protocol::events::ComponentUpdate {
564                                        network_id: event.network_id,
565                                        component_kind: event.component_kind,
566                                        payload: event.payload,
567                                        tick: event.tick,
568                                    },
569                                ));
570                            }
571                        }
572                        _ => {
573                            tracing::trace!(?protocol_event, "Protocol event");
574                        }
575                    }
576                } else {
577                    // If it's not a protocol event, try to decode it as a game update
578                    match encoder.decode(&raw_data) {
579                        Ok(update) => updates.push((client_id, update)),
580                        Err(e) => {
581                            metrics::counter!("aetheris_decode_errors_total").increment(1);
582                            error!(
583                                error = ?e,
584                                size = raw_data.len(),
585                                "Failed to decode update (not a protocol event)"
586                            );
587                        }
588                    }
589                }
590            }
591            world.apply_updates(&updates);
592            self.reassembler.prune();
593        }
594        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "apply")
595            .record(t2.elapsed().as_secs_f64());
596
597        // Send ClearWorld acks (reliable) for any ClearWorld commands processed this tick.
598        // Sent after the EnteredSpan is dropped, since EnteredSpan is !Send.
599        // The reliable delivery guarantees the client sees this AFTER any stale in-flight
600        // unreliable datagrams, closing the partial-clear race condition.
601        for target in clear_ack_targets {
602            let ack = NetworkEvent::ClearWorld { client_id: target };
603            #[allow(clippy::collapsible_if)]
604            if let Ok(data) = encoder.encode_event(&ack) {
605                if let Err(e) = transport.send_reliable(target, &data).await {
606                    tracing::warn!(client_id = ?target, error = ?e, "Failed to send ClearWorld ack");
607                }
608            }
609        }
610
611        // Send Pongs for all collected Pings.
612        // Use unreliable (datagram) so the reply travels the same path as the
613        // incoming Ping, and clients that only read datagrams can receive it.
614        if let Some(pongs) = pong_responses {
615            for (client_id, p_tick, received_at) in pongs {
616                let pong_event = NetworkEvent::Pong { tick: p_tick };
617                if let Ok(data) = encoder.encode_event(&pong_event) {
618                    // Measure server-side Pong dispatch time (encode + send).
619                    // This is NOT the full network RTT, but it captures the
620                    // server processing overhead between Ping receipt and Pong send.
621                    let dispatch_start = Instant::now();
622                    match transport.send_unreliable(client_id, &data).await {
623                        Ok(()) => {
624                            let dispatch_ms = dispatch_start.elapsed().as_secs_f64() * 1000.0;
625                            let server_hold_ms = received_at.elapsed().as_secs_f64() * 1000.0;
626                            metrics::histogram!("aetheris_server_pong_dispatch_ms")
627                                .record(dispatch_ms);
628                            metrics::histogram!("aetheris_server_ping_hold_ms")
629                                .record(server_hold_ms);
630                        }
631                        Err(e) => {
632                            error!(error = ?e, client_id = ?client_id, "Failed to send Pong");
633                        }
634                    }
635                }
636            }
637        }
638
639        // Stage 3: Simulate
640        let t3 = Instant::now();
641        {
642            let _span = debug_span!("stage3_simulate").entered();
643            // Simulation logic (physics, AI, game rules) happens here.
644            world.simulate();
645        }
646        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "simulate")
647            .record(t3.elapsed().as_secs_f64());
648
649        // Stage 4: Extract
650        let t4 = Instant::now();
651        let (deltas, reliable_events) = {
652            let ds = world.extract_deltas();
653
654            // VS-07 §3.3: Golden File Recording
655            if let Some(limit) = self.recording_ticks.filter(|&l| tick < l) {
656                let hash = world.state_hash();
657                self.golden_hashes.push(hash);
658
659                if tick + 1 == limit {
660                    tracing::info!(
661                        limit,
662                        "Golden recording complete. Saving to golden_600ticks.bin"
663                    );
664                    let data: Vec<u8> = self
665                        .golden_hashes
666                        .iter()
667                        .flat_map(|h| h.to_le_bytes())
668                        .collect();
669                    if let Err(e) = std::fs::write("golden_600ticks.bin", data) {
670                        tracing::error!(error = ?e, "Failed to write golden file");
671                    } else {
672                        tracing::info!("Successfully saved golden_600ticks.bin");
673                        std::process::exit(0);
674                    }
675                }
676            }
677            let rs = world.extract_reliable_events();
678            (ds, rs)
679        };
680        // Reset ECS change-detection *after* extraction so simulate()'s mutations are visible.
681        world.post_extract();
682        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "extract")
683            .record(t4.elapsed().as_secs_f64());
684
685        // Stage 5: Encode & Send
686        let t5 = Instant::now();
687
688        // Stage 5.1: Send Reliable Events
689        for (target, wire_event) in reliable_events {
690            // Broadcast reliably to all authenticated clients if target is None
691            let targets: Vec<_> = if let Some(id) = target {
692                vec![id]
693            } else {
694                self.authenticated_clients.keys().copied().collect()
695            };
696
697            for id in targets {
698                let network_event = wire_event.clone().into_network_event(id);
699                match encoder.encode_event(&network_event) {
700                    Ok(data) => {
701                        if let Some(tx) = &self.outbound_tx {
702                            let _ = tx
703                                .send(OutboundMessage::Reliable {
704                                    client_id: id,
705                                    data,
706                                })
707                                .await;
708                        }
709                    }
710                    Err(e) => {
711                        error!(error = ?e, client_id = ?id, "Failed to encode reliable event");
712                    }
713                }
714            }
715        }
716
717        if !deltas.is_empty() {
718            let mut broadcast_count: u64 = 0;
719
720            let stage_span = debug_span!("stage5_send", count = deltas.len());
721            let _guard = stage_span.enter();
722
723            // A-01: Packet Batching (Phase 1 Optimization)
724            // Group all deltas by their target clients to avoid N*M packet explosion.
725            let mut client_batches: HashMap<
726                aetheris_protocol::types::ClientId,
727                Vec<aetheris_protocol::events::ReplicationEvent>,
728            > = HashMap::with_capacity(self.authenticated_clients.len());
729
730            for delta in deltas {
731                let targets =
732                    Self::get_delta_targets(world, &self.authenticated_clients, delta.network_id);
733
734                match targets {
735                    DeltaTargets::Broadcast => {
736                        // Global broadcast (to all authenticated clients)
737                        for &client_id in self.authenticated_clients.keys() {
738                            client_batches
739                                .entry(client_id)
740                                .or_default()
741                                .push(delta.clone());
742                        }
743                    }
744                    DeltaTargets::Recipients(recipients) => {
745                        // Targeted multicast (AoI / Room filtered)
746                        for target in recipients {
747                            client_batches
748                                .entry(target)
749                                .or_default()
750                                .push(delta.clone());
751                        }
752                    }
753                    DeltaTargets::NoRecipients => {}
754                }
755            }
756
757            let max_size = encoder.max_encoded_size();
758            thread_local! {
759                static SCRATCH_BUFFER: std::cell::RefCell<Vec<u8>> = const { std::cell::RefCell::new(Vec::new()) };
760            }
761
762            // A-04: Parallel Stage 5 Encode
763            // CPU-intensive serialization is offloaded to a dedicated Rayon pool.
764            // We use block_in_place to inform Tokio that the current thread is performing CPU-heavy work.
765            use rayon::prelude::{IntoParallelIterator, ParallelIterator};
766
767            let batches_to_encode: Vec<_> = client_batches.into_iter().collect();
768
769            let encoded_results = tokio::task::block_in_place(|| {
770                self.encode_pool.install(|| {
771                    batches_to_encode
772                        .into_par_iter()
773                        .map(|(client_id, events)| {
774                            let batch_event =
775                                aetheris_protocol::events::NetworkEvent::ReplicationBatch {
776                                    client_id,
777                                    events,
778                                };
779                            // SCRATCH_BUFFER optimization (M10105):
780                            // Uses worker-local memory to avoid allocations during serialization.
781                            // Only a single final allocation (to_vec) is performed to return data to main thread.
782                            SCRATCH_BUFFER.with(|buf| {
783                                let mut b = buf.borrow_mut();
784                                if b.len() < max_size {
785                                    b.resize(max_size, 0);
786                                }
787                                match encoder.encode_event_into(&batch_event, &mut b) {
788                                    Ok(size) => (client_id, Ok(b[..size].to_vec())),
789                                    Err(
790                                        aetheris_protocol::error::EncodeError::BufferOverflow {
791                                            ..
792                                        },
793                                    ) => {
794                                        // M10105 — Reliable fallback for large batches that exceed scratch buffer.
795                                        // We use the allocating encode_event() here as a safety valve.
796                                        match encoder.encode_event(&batch_event) {
797                                            Ok(data) => (client_id, Ok(data)),
798                                            Err(e) => (client_id, Err(e)),
799                                        }
800                                    }
801                                    Err(e) => (client_id, Err(e)),
802                                }
803                            })
804                        })
805                        .collect::<Vec<_>>()
806                })
807            });
808
809            for (client_id, result) in encoded_results {
810                match result {
811                    Ok(data) => {
812                        let targets = DeltaTargets::Recipients(vec![client_id]);
813                        if data.len() > aetheris_protocol::MAX_SAFE_PAYLOAD_SIZE {
814                            match self
815                                .fragment_and_send(&data, data.len(), &targets, encoder)
816                                .await
817                            {
818                                Ok(count) => broadcast_count += count,
819                                Err(e) => {
820                                    error!(error = ?e, ?client_id, "Failed to fragment large batch");
821                                }
822                            }
823                        } else if let Some(tx) = &self.outbound_tx {
824                            let _ = tx
825                                .send(OutboundMessage::Unreliable { client_id, data })
826                                .await;
827                            broadcast_count += 1;
828                        }
829                    }
830                    Err(e) => {
831                        error!(error = ?e, ?client_id, "Failed to encode batch");
832                    }
833                }
834            }
835
836            metrics::counter!("aetheris_packets_outbound_total").increment(broadcast_count);
837            metrics::counter!("aetheris_packets_broadcast_total").increment(broadcast_count);
838        }
839        metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "send")
840            .record(t5.elapsed().as_secs_f64());
841
842        metrics::histogram!("aetheris_tick_duration_seconds")
843            .record(tick_start.elapsed().as_secs_f64());
844    }
845
846    fn get_delta_targets(
847        world: &dyn WorldState,
848        clients: &HashMap<ClientId, (String, Option<NetworkId>)>,
849        entity_id: NetworkId,
850    ) -> DeltaTargets {
851        if let Some(room_id) = world.get_entity_room(entity_id) {
852            let mut recipients = Vec::new();
853            for &client_id in clients.keys() {
854                if world.get_client_room(client_id) == Some(room_id) {
855                    recipients.push(client_id);
856                }
857            }
858            if recipients.is_empty() {
859                DeltaTargets::NoRecipients
860            } else {
861                DeltaTargets::Recipients(recipients)
862            }
863        } else {
864            DeltaTargets::Broadcast
865        }
866    }
867
868    async fn fragment_and_send(
869        &mut self,
870        data: &[u8],
871        len: usize,
872        targets: &DeltaTargets,
873        encoder: &dyn Encoder,
874    ) -> Result<u64, EncodeError> {
875        let Some(tx) = &self.outbound_tx else {
876            return Ok(0);
877        };
878        let message_id = self.next_message_id;
879        self.next_message_id = self.next_message_id.wrapping_add(1);
880
881        let chunk_size = aetheris_protocol::MAX_FRAGMENT_PAYLOAD_SIZE;
882        let chunks: Vec<_> = data[..len].chunks(chunk_size).collect();
883
884        let Ok(total_fragments) = u16::try_from(chunks.len()) else {
885            error!(
886                message_id,
887                chunks = chunks.len(),
888                "Too many fragments required for message; dropping payload"
889            );
890            return Err(EncodeError::Io(std::io::Error::new(
891                std::io::ErrorKind::InvalidData,
892                "Too many fragments",
893            )));
894        };
895
896        let mut sent_count = 0;
897        for (i, chunk) in chunks.into_iter().enumerate() {
898            let Ok(fragment_index) = u16::try_from(i) else {
899                error!(message_id, index = i, "Fragment index overflow; stopping");
900                break;
901            };
902
903            let fragment = FragmentedEvent {
904                message_id,
905                fragment_index,
906                total_fragments,
907                payload: chunk.to_vec(),
908            };
909            let fragment_event = NetworkEvent::Fragment {
910                client_id: ClientId(0),
911                fragment,
912            };
913
914            match encoder.encode_event(&fragment_event) {
915                Ok(encoded_fragment) => match targets {
916                    DeltaTargets::Broadcast => {
917                        let _ = tx
918                            .send(OutboundMessage::BroadcastUnreliable {
919                                data: encoded_fragment,
920                            })
921                            .await;
922                        sent_count += 1;
923                    }
924                    DeltaTargets::Recipients(recipients) => {
925                        for &target in recipients {
926                            let _ = tx
927                                .send(OutboundMessage::Unreliable {
928                                    client_id: target,
929                                    data: encoded_fragment.clone(),
930                                })
931                                .await;
932                            sent_count += 1;
933                        }
934                    }
935                    DeltaTargets::NoRecipients => {}
936                },
937                Err(e) => {
938                    error!(error = ?e, "Failed to encode fragment event");
939                }
940            }
941        }
942
943        Ok(sent_count)
944    }
945
946    fn get_filtered_manifest(&self, jti: &str) -> BTreeMap<String, String> {
947        let mut manifest = BTreeMap::new();
948        manifest.insert(
949            "version_server".to_string(),
950            env!("CARGO_PKG_VERSION").to_string(),
951        );
952        manifest.insert(
953            "version_protocol".to_string(),
954            aetheris_protocol::VERSION.to_string(),
955        );
956
957        if can_run_playground_command(jti) {
958            manifest.insert("tick_rate".to_string(), self.tick_rate.to_string());
959            manifest.insert(
960                "clients_active".to_string(),
961                self.authenticated_clients.len().to_string(),
962            );
963        }
964        manifest
965    }
966}
967
/// Decides whether the session identified by `jti` may run destructive playground commands.
///
/// Phase 1 uses a simplified rule built around the `admin` JTI issued by the development
/// credential; Phase 3 is expected to tie this to the account's permission level instead.
///
/// Returns `true` when either of the following holds:
///
/// * `jti` is exactly `"admin"` (the environment is not consulted in this case), or
/// * the `AETHERIS_ENV` environment variable is set to exactly `"dev"`, in which case
///   every session is accepted.
///
/// The environment check fails closed: an unset (or non-Unicode) `AETHERIS_ENV` is never
/// treated as `dev`.
fn can_run_playground_command(jti: &str) -> bool {
    // The dev credential in Aetheris Playground always generates jti="admin".
    if jti == "admin" {
        return true;
    }

    // Only an explicit "dev" value opens the gate for other sessions.
    matches!(std::env::var("AETHERIS_ENV"), Ok(value) if value == "dev")
}