1use std::collections::{BTreeMap, HashMap};
2use std::time::{Duration, Instant};
3
4use tokio::sync::broadcast;
5use tokio::time::{MissedTickBehavior, interval};
6use tracing::{Instrument, debug_span, error, info_span};
7
8use aetheris_protocol::error::EncodeError;
9use aetheris_protocol::events::{FragmentedEvent, NetworkEvent};
10use aetheris_protocol::reassembler::Reassembler;
11use aetheris_protocol::traits::{Encoder, PlatformTransport, WorldState};
12use aetheris_protocol::types::{ClientId, NetworkId};
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use tokio::sync::mpsc;
16
17pub enum OutboundMessage {
19 Unreliable { client_id: ClientId, data: Vec<u8> },
20 Reliable { client_id: ClientId, data: Vec<u8> },
21 BroadcastUnreliable { data: Vec<u8> },
22}
23
/// The set of clients that should receive a given delta or fragment.
#[derive(Debug, Clone)]
pub enum DeltaTargets {
    /// Deliver to every client (used when the entity has no workspace).
    Broadcast,
    /// Deliver only to the listed clients.
    Recipients(Vec<ClientId>),
    /// Nobody should receive it (the entity has a workspace, but no
    /// authenticated client is currently in that workspace).
    NoRecipients,
}
30
/// Drives the fixed-rate server loop and owns per-connection session state.
#[derive(Debug)]
pub struct TickScheduler {
    /// Ticks per second; `run` derives the tick period as `1 / tick_rate` seconds.
    tick_rate: u64,
    /// Number of the next tick to execute; incremented at the start of every `tick_step`.
    current_tick: u64,
    /// Verifies session tokens and answers whether a session is still authorized.
    auth_service: Arc<dyn crate::auth::AuthSessionVerifier>,

    /// Authenticated clients, each with its verified session and the network id
    /// of its session agent (`None` until a `StartSession` spawns one).
    authenticated_clients: HashMap<ClientId, (crate::auth::VerifiedSession, Option<NetworkId>)>,
    /// When each client authenticated; consumed on `StartSession` to record the
    /// session-start latency metric.
    auth_timestamps: HashMap<ClientId, Instant>,
    /// Reassembles inbound fragmented messages; pruned after the apply stage.
    reassembler: Reassembler,
    /// Id assigned to the next fragmented outbound message (wraps on overflow).
    next_message_id: u32,
    /// Rayon pool on which per-client replication batches are encoded.
    encode_pool: Arc<rayon::ThreadPool>,
    /// Sender side of the outbound queue; `None` until `set_outbound_tx` or `run` sets it.
    outbound_tx: Option<mpsc::Sender<OutboundMessage>>,
    /// Number of ticks to record state hashes for, parsed from
    /// `AETHERIS_RECORD_GOLDEN`; `None` when unset or unparsable.
    recording_ticks: Option<u64>,
    /// State hashes collected so far, one per recorded tick.
    golden_hashes: Vec<u64>,
    /// When true, tick 0 spawns 100 entities via `world.stress_test(100, true)`.
    spawn_at_zero: bool,
}
52
53impl TickScheduler {
54 #[must_use]
56 pub fn new(
57 tick_rate: u64,
58 auth_service: Arc<dyn crate::auth::AuthSessionVerifier>,
59 encode_pool: Arc<rayon::ThreadPool>,
60 ) -> Self {
61 Self {
62 tick_rate,
63 current_tick: 0,
64 auth_service,
65 authenticated_clients: HashMap::new(),
66 auth_timestamps: HashMap::new(),
67 reassembler: Reassembler::new(),
68 next_message_id: 1,
69 encode_pool,
70 outbound_tx: None,
71 recording_ticks: std::env::var("AETHERIS_RECORD_GOLDEN")
72 .ok()
73 .and_then(|v| v.parse().ok()),
74 golden_hashes: Vec::new(),
75 spawn_at_zero: std::env::var("AETHERIS_RECORD_GOLDEN").is_ok(),
76 }
77 }
78
79 #[must_use]
84 pub fn with_spawn_at_zero(mut self, v: bool) -> Self {
85 self.spawn_at_zero = v;
86 self
87 }
88
89 pub fn set_outbound_tx(&mut self, tx: tokio::sync::mpsc::Sender<OutboundMessage>) {
91 self.outbound_tx = Some(tx);
92 }
93
94 pub async fn run(
96 &mut self,
97 transport: Box<dyn PlatformTransport>,
98 mut world: Box<dyn WorldState>,
99 encoder: Box<dyn Encoder>,
100 mut shutdown: broadcast::Receiver<()>,
101 ) {
102 let (tx, mut rx) = mpsc::channel(2048);
103 self.outbound_tx = Some(tx.clone());
104
105 let transport = Arc::new(RwLock::new(transport));
106 let transport_clone = transport.clone();
107
108 let mut outbound_shutdown = shutdown.resubscribe();
109 tokio::spawn(async move {
110 loop {
111 tokio::select! {
112 msg = rx.recv() => {
113 let Some(msg) = msg else { break; };
114 let transport = transport_clone.read().await;
115 match msg {
116 OutboundMessage::Unreliable { client_id, data } => {
117 if let Err(e) = transport.send_unreliable(client_id, &data).await {
118 error!(error = ?e, ?client_id, "Outbound task failed to send unreliable message");
119 }
120 }
121 OutboundMessage::Reliable { client_id, data } => {
122 if let Err(e) = transport.send_reliable(client_id, &data).await {
123 error!(error = ?e, ?client_id, "Outbound task failed to send reliable message");
124 }
125 }
126 OutboundMessage::BroadcastUnreliable { data } => {
127 if let Err(e) = transport.broadcast_unreliable(&data).await {
128 error!(error = ?e, "Outbound task failed to broadcast unreliable message");
129 }
130 }
131 }
132 }
133 _ = outbound_shutdown.recv() => {
134 break;
135 }
136 }
137 }
138 });
139
140 #[allow(clippy::cast_precision_loss)]
141 let tick_duration = Duration::from_secs_f64(1.0 / self.tick_rate as f64);
142 let mut interval = interval(tick_duration);
143 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
144
145 let mut last_tick_wall = Instant::now();
146
147 loop {
148 tokio::select! {
149 _ = interval.tick() => {
150 let tick_num = self.current_tick;
151 let start = Instant::now();
152
153 let wall_elapsed = start.duration_since(last_tick_wall);
155 if wall_elapsed.as_secs_f64() > 0.0 {
156 metrics::gauge!("aetheris_actual_tick_rate_hz")
157 .set(1.0 / wall_elapsed.as_secs_f64());
158 }
159 last_tick_wall = start;
160
161 self.tick_step(
162 &transport,
163 world.as_mut(),
164 encoder.as_ref(),
165 )
166 .instrument(info_span!("tick", tick = tick_num))
167 .await;
168 let elapsed = start.elapsed();
169
170 metrics::histogram!("aetheris_tick_duration_seconds").record(elapsed.as_secs_f64());
171 }
172 _ = shutdown.recv() => {
173 tracing::info!("Server shutting down gracefully");
174 break;
175 }
176 }
177 }
178 }
179
180 #[allow(clippy::too_many_lines)]
182 pub async fn tick_step(
183 &mut self,
184 transport_lock: &RwLock<Box<dyn PlatformTransport>>,
185 world: &mut dyn WorldState,
186 encoder: &dyn Encoder,
187 ) {
188 let tick_start = Instant::now();
189 let tick = self.current_tick;
190 self.current_tick += 1;
191
192 let mut transport = transport_lock.write().await;
193 world.advance_tick();
199
200 if tick == 0 && self.spawn_at_zero {
201 tracing::info!("Recording mode: Spawning 100 entities for determinism test");
202 world.stress_test(100, true);
203 }
204
205 let t1 = Instant::now();
207 let events = match transport
208 .poll_events()
209 .instrument(debug_span!("stage1_poll"))
210 .await
211 {
212 Ok(e) => e,
213 Err(e) => {
214 error!(error = ?e, "Fatal transport error during poll; skipping tick");
215 return;
216 }
217 };
218 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "poll")
219 .record(t1.elapsed().as_secs_f64());
220
221 let inbound_count: u64 = events
222 .iter()
223 .filter(|e| {
224 matches!(
225 e,
226 NetworkEvent::UnreliableMessage { .. } | NetworkEvent::ReliableMessage { .. }
227 )
228 })
229 .count() as u64;
230 metrics::counter!("aetheris_packets_inbound_total").increment(inbound_count);
231
232 let mut to_disconnect = Vec::new();
233
234 if tick.is_multiple_of(60) {
236 let mut to_remove = Vec::new();
237 for (&client_id, (session, _)) in &self.authenticated_clients {
238 if !self
239 .auth_service
240 .is_session_authorized(&session.jti, Some(tick))
241 {
242 tracing::warn!(?client_id, "Session invalidated during periodic check");
243 to_remove.push(client_id);
244 }
245 }
246 for client_id in to_remove {
247 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&client_id) {
248 let _ = world.despawn_networked(nid);
249 }
250 self.auth_timestamps.remove(&client_id);
251 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
252
253 to_disconnect.push(client_id);
255 }
256 }
257
258 let t2 = Instant::now();
260 let mut pong_responses = None;
261 let mut clear_ack_targets: Vec<aetheris_protocol::types::ClientId> = Vec::new();
262 if !events.is_empty() {
263 let _span = debug_span!("stage2_apply", count = events.len()).entered();
264 let mut updates = Vec::with_capacity(events.len());
265 for event in events {
266 let (client_id, raw_data, is_message) = match event {
268 NetworkEvent::Fragment {
269 client_id,
270 fragment,
271 } => {
272 if let Some(data) = self.reassembler.ingest(client_id, fragment) {
273 (client_id, data, true)
274 } else {
275 continue;
276 }
277 }
278 NetworkEvent::UnreliableMessage { data, client_id }
279 | NetworkEvent::ReliableMessage { data, client_id } => {
280 if let Ok(NetworkEvent::Fragment { fragment, .. }) =
282 encoder.decode_event(&data)
283 {
284 if let Some(reassembled) = self.reassembler.ingest(client_id, fragment)
285 {
286 (client_id, reassembled, true)
287 } else {
288 continue;
289 }
290 } else {
291 (client_id, data, true)
292 }
293 }
294 NetworkEvent::ClientConnected(id) => {
295 metrics::gauge!("aetheris_connected_clients").increment(1.0);
296 tracing::info!(client_id = ?id, "Client connected (awaiting auth)");
297 (id, Vec::new(), false)
298 }
299 NetworkEvent::ClientDisconnected(id) | NetworkEvent::Disconnected(id) => {
300 metrics::gauge!("aetheris_connected_clients").decrement(1.0);
301 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
302 let _ = world.despawn_networked(nid);
303 }
304 self.auth_timestamps.remove(&id);
305 tracing::info!(client_id = ?id, "Client disconnected");
306 (id, Vec::new(), false)
307 }
308 NetworkEvent::SessionClosed(id) => {
309 metrics::counter!("aetheris_transport_events_total", "type" => "session_closed")
310 .increment(1);
311 tracing::warn!(client_id = ?id, "WebTransport session closed");
312 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
313 let _ = world.despawn_networked(nid);
314 }
315 self.auth_timestamps.remove(&id);
316 (id, Vec::new(), false)
317 }
318 NetworkEvent::StreamReset(id) => {
319 metrics::counter!("aetheris_transport_events_total", "type" => "stream_reset")
320 .increment(1);
321 tracing::error!(client_id = ?id, "WebTransport stream reset");
322 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
323 let _ = world.despawn_networked(nid);
324 }
325 self.auth_timestamps.remove(&id);
326 (id, Vec::new(), false)
327 }
328 NetworkEvent::Ping { client_id, tick } => {
329 if self.authenticated_clients.contains_key(&client_id) {
330 pong_responses.get_or_insert_with(Vec::new).push((
331 client_id,
332 tick,
333 Instant::now(),
334 ));
335 metrics::counter!("aetheris_protocol_pings_received_total")
336 .increment(1);
337 }
338 (client_id, Vec::new(), false)
339 }
340 NetworkEvent::ClearWorld { client_id, .. }
341 | NetworkEvent::StartSession { client_id }
342 | NetworkEvent::RequestWorkspaceManifest { client_id }
343 | NetworkEvent::PlatformEvent { client_id, .. }
344 | NetworkEvent::StressTest { client_id, .. }
345 | NetworkEvent::ReplicationBatch { client_id, .. }
346 | NetworkEvent::EntitySpawned { client_id, .. }
347 | NetworkEvent::EntityDespawned { client_id, .. }
348 | NetworkEvent::Spawn { client_id, .. } => (client_id, Vec::new(), false),
349 NetworkEvent::Pong { .. } | NetworkEvent::Auth { .. } => {
350 (aetheris_protocol::types::ClientId(0), Vec::new(), false)
351 }
352 };
353
354 if !is_message {
355 continue;
356 }
357
358 let session = if let Some((session, _)) = self.authenticated_clients.get(&client_id)
360 {
361 if !self
363 .auth_service
364 .is_session_authorized(&session.jti, Some(tick))
365 {
366 tracing::warn!(?client_id, "Session revoked; dropping client");
367 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&client_id)
368 {
369 let _ = world.despawn_networked(nid);
370 }
371 self.auth_timestamps.remove(&client_id);
372 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
373 continue;
374 }
375 session
376 } else {
377 match encoder.decode_event(&raw_data) {
379 Ok(NetworkEvent::Auth { session_token }) => {
380 tracing::info!(?client_id, "Auth message received");
381 match self.auth_service.verify_session(&session_token, Some(tick)) {
382 Ok(session) => {
383 tracing::info!(
384 ?client_id,
385 player_id = %session.player_id,
386 "Client authenticated successfully"
387 );
388
389 let old_clients: Vec<ClientId> = self
392 .authenticated_clients
393 .iter()
394 .filter(|(id, (s, _))| {
395 s.player_id == session.player_id && **id != client_id
396 })
397 .map(|(id, _)| *id)
398 .collect();
399
400 for old_id in old_clients {
401 tracing::info!(
402 ?old_id,
403 new_client_id = ?client_id,
404 "Kicking ghost session for same player_id"
405 );
406 if let Some((_, Some(nid))) =
407 self.authenticated_clients.remove(&old_id)
408 {
409 let _ = world.despawn_networked(nid);
410 }
411 self.auth_timestamps.remove(&old_id);
412
413 to_disconnect.push(old_id);
415 }
416
417 self.authenticated_clients
418 .insert(client_id, (session, None));
419
420 self.auth_timestamps.insert(client_id, Instant::now());
423
424 tracing::info!(
425 ?client_id,
426 "[Auth] Client authenticated — waiting for StartSession to spawn agent"
427 );
428 continue;
429 }
430 Err(e) => {
431 tracing::warn!(
432 ?client_id,
433 error = ?e,
434 "Client failed authentication"
435 );
436 }
437 }
438 }
439 Ok(other) => {
440 tracing::warn!(
441 ?client_id,
442 variant = ?std::mem::discriminant(&other),
443 bytes = raw_data.len(),
444 "Unauthenticated client sent non-Auth event — discarding"
445 );
446 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
447 }
448 Err(e) => {
449 tracing::warn!(
450 ?client_id,
451 error = ?e,
452 bytes = raw_data.len(),
453 "Failed to decode message from unauthenticated client"
454 );
455 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
456 }
457 }
458 continue;
459 };
460
461 if let Ok(protocol_event) = encoder.decode_event(&raw_data) {
463 match protocol_event {
464 NetworkEvent::Ping { tick: p_tick, .. } => {
465 pong_responses.get_or_insert_with(Vec::new).push((
466 client_id,
467 p_tick,
468 Instant::now(),
469 ));
470 metrics::counter!("aetheris_protocol_pings_received_total")
471 .increment(1);
472 }
473 NetworkEvent::Auth { .. } => {
474 tracing::debug!(?client_id, "Client re-authenticating (ignored)");
475 }
476 NetworkEvent::StressTest { count, rotate, .. } => {
477 tracing::info!(
478 ?client_id,
479 count,
480 rotate,
481 "StressTest event received from authenticated client"
482 );
483 if can_run_playground_command(&session.jti) {
484 const MAX_STRESS: u16 = 1000;
486 let capped_count = count.min(MAX_STRESS);
487 if count > MAX_STRESS {
488 tracing::warn!(
489 ?client_id,
490 count,
491 capped_count,
492 "Stress test count capped at limit"
493 );
494 }
495
496 tracing::info!(
497 ?client_id,
498 count = capped_count,
499 rotate,
500 "Stress test command executed"
501 );
502 world.stress_test(capped_count, rotate);
503 } else {
504 tracing::warn!(?client_id, "Unauthorized StressTest attempt");
505 metrics::counter!("aetheris_unprivileged_packets_total")
506 .increment(1);
507 }
508 }
509 NetworkEvent::Spawn {
510 entity_type,
511 x,
512 y,
513 rot,
514 ..
515 } => {
516 if can_run_playground_command(&session.jti) {
517 let network_id =
518 world.spawn_kind_for(entity_type, x, y, rot, client_id);
519
520 tracing::info!(
521 ?client_id,
522 entity_type,
523 new_entity_id = network_id.0,
524 "[Spawn] Playground entity spawned"
525 );
526 } else {
527 tracing::warn!(?client_id, "Unauthorized Spawn attempt");
528 metrics::counter!("aetheris_unprivileged_packets_total")
529 .increment(1);
530 }
531 }
532 NetworkEvent::StartSession { .. } => {
533 if let Some((_, agent_id)) =
534 self.authenticated_clients.get_mut(&client_id)
535 {
536 let network_id = if let Some(nid) = agent_id {
537 tracing::info!(
538 ?client_id,
539 ?nid,
540 "Reusing existing session agent"
541 );
542 *nid
543 } else {
544 let nid =
545 world.spawn_session_agent(1, 0.0, 0.0, 0.0, client_id);
546 *agent_id = Some(nid);
547 nid
548 };
549
550 world.queue_reliable_event(
551 Some(client_id),
552 aetheris_protocol::events::PlatformEvent::Possession {
553 network_id,
554 },
555 );
556
557 if let Some(auth_ts) = self.auth_timestamps.remove(&client_id) {
559 metrics::histogram!("aetheris_session_start_latency_seconds")
560 .record(auth_ts.elapsed().as_secs_f64());
561 }
562
563 tracing::info!(
564 ?client_id,
565 network_id = network_id.0,
566 "[StartSession] Session agent assigned (spawned or reused) — Possession event queued"
567 );
568 }
569 }
570 NetworkEvent::ClearWorld { .. } => {
571 if can_run_playground_command(&session.jti) {
572 tracing::info!(?client_id, "ClearWorld command executed");
573 world.clear_world();
574 if let Some((_, agent_id)) =
579 self.authenticated_clients.get_mut(&client_id)
580 {
581 *agent_id = None;
582 }
583 clear_ack_targets.push(client_id);
588 } else {
589 tracing::warn!(?client_id, "Unauthorized ClearWorld attempt");
590 metrics::counter!("aetheris_unprivileged_packets_total")
591 .increment(1);
592 }
593 }
594 NetworkEvent::RequestWorkspaceManifest { .. } => {
595 let jti = if let Some((session, _)) =
596 self.authenticated_clients.get(&client_id)
597 {
598 &session.jti
599 } else {
600 ""
601 };
602
603 let manifest = self.get_filtered_manifest(jti);
604 world.queue_reliable_event(
605 Some(client_id),
606 aetheris_protocol::events::PlatformEvent::WorkspaceManifest {
607 manifest,
608 },
609 );
610 }
611 NetworkEvent::ReplicationBatch { events, .. } => {
612 for event in events {
613 updates.push((
614 client_id,
615 aetheris_protocol::events::ComponentUpdate {
616 network_id: event.network_id,
617 component_kind: event.component_kind,
618 payload: event.payload,
619 tick: event.tick,
620 },
621 ));
622 }
623 }
624 _ => {
625 tracing::trace!(?protocol_event, "Protocol event");
626 }
627 }
628 } else {
629 match encoder.decode(&raw_data) {
631 Ok(update) => updates.push((client_id, update)),
632 Err(e) => {
633 metrics::counter!("aetheris_decode_errors_total").increment(1);
634 error!(
635 error = ?e,
636 size = raw_data.len(),
637 "Failed to decode update (not a protocol event)"
638 );
639 }
640 }
641 }
642 }
643 world.apply_updates(&updates);
644 self.reassembler.prune();
645 }
646 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "apply")
647 .record(t2.elapsed().as_secs_f64());
648
649 for target in clear_ack_targets {
654 let ack = NetworkEvent::ClearWorld { client_id: target };
655 #[allow(clippy::collapsible_if)]
656 if let Ok(data) = encoder.encode_event(&ack) {
657 if let Err(e) = transport.send_reliable(target, &data).await {
658 tracing::warn!(client_id = ?target, error = ?e, "Failed to send ClearWorld ack");
659 }
660 }
661 }
662
663 if let Some(pongs) = pong_responses {
667 for (client_id, p_tick, received_at) in pongs {
668 let pong_event = NetworkEvent::Pong { tick: p_tick };
669 if let Ok(data) = encoder.encode_event(&pong_event) {
670 let dispatch_start = Instant::now();
674 match transport.send_unreliable(client_id, &data).await {
675 Ok(()) => {
676 let dispatch_ms = dispatch_start.elapsed().as_secs_f64() * 1000.0;
677 let server_hold_ms = received_at.elapsed().as_secs_f64() * 1000.0;
678 metrics::histogram!("aetheris_server_pong_dispatch_ms")
679 .record(dispatch_ms);
680 metrics::histogram!("aetheris_server_ping_hold_ms")
681 .record(server_hold_ms);
682 }
683 Err(e) => {
684 error!(error = ?e, client_id = ?client_id, "Failed to send Pong");
685 }
686 }
687 }
688 }
689 }
690
691 let t3 = Instant::now();
693 {
694 let _span = debug_span!("stage3_simulate").entered();
695 world.simulate();
697 }
698 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "simulate")
699 .record(t3.elapsed().as_secs_f64());
700
701 let t4 = Instant::now();
703 let (deltas, reliable_events) = {
704 let ds = world.extract_deltas();
705
706 if let Some(limit) = self.recording_ticks.filter(|&l| tick < l) {
708 let hash = world.state_hash();
709 self.golden_hashes.push(hash);
710
711 if tick + 1 == limit {
712 tracing::info!(
713 limit,
714 "Golden recording complete. Saving to golden_600ticks.bin"
715 );
716 let data: Vec<u8> = self
717 .golden_hashes
718 .iter()
719 .flat_map(|h| h.to_le_bytes())
720 .collect();
721 if let Err(e) = std::fs::write("golden_600ticks.bin", data) {
722 tracing::error!(error = ?e, "Failed to write golden file");
723 } else {
724 tracing::info!("Successfully saved golden_600ticks.bin");
725 std::process::exit(0);
726 }
727 }
728 }
729 let rs = world.extract_reliable_events();
730 (ds, rs)
731 };
732 world.post_extract();
734 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "extract")
735 .record(t4.elapsed().as_secs_f64());
736
737 let t5 = Instant::now();
739
740 for (target, wire_event) in reliable_events {
742 let targets: Vec<_> = if let Some(id) = target {
744 vec![id]
745 } else {
746 self.authenticated_clients.keys().copied().collect()
747 };
748
749 for id in targets {
750 tracing::info!(?id, event = ?wire_event, "Sending reliable event to client");
751 let network_event = wire_event.clone().into_network_event(id);
752 match encoder.encode_event(&network_event) {
753 Ok(data) => {
754 if let Some(tx) = &self.outbound_tx {
755 let _ = tx
756 .send(OutboundMessage::Reliable {
757 client_id: id,
758 data,
759 })
760 .await;
761 }
762 }
763 Err(e) => {
764 error!(error = ?e, client_id = ?id, "Failed to encode reliable event");
765 }
766 }
767 }
768 }
769
770 if !deltas.is_empty() {
771 let mut broadcast_count: u64 = 0;
772
773 let stage_span = debug_span!("stage5_send", count = deltas.len());
774 let _guard = stage_span.enter();
775
776 let mut client_batches: HashMap<
779 aetheris_protocol::types::ClientId,
780 Vec<aetheris_protocol::events::ReplicationEvent>,
781 > = HashMap::with_capacity(self.authenticated_clients.len());
782
783 for delta in deltas {
784 let targets =
785 Self::get_delta_targets(world, &self.authenticated_clients, delta.network_id);
786
787 match targets {
788 DeltaTargets::Broadcast => {
789 for &client_id in self.authenticated_clients.keys() {
791 client_batches
792 .entry(client_id)
793 .or_default()
794 .push(delta.clone());
795 }
796 }
797 DeltaTargets::Recipients(recipients) => {
798 for target in recipients {
800 client_batches
801 .entry(target)
802 .or_default()
803 .push(delta.clone());
804 }
805 }
806 DeltaTargets::NoRecipients => {}
807 }
808 }
809
810 let max_size = encoder.max_encoded_size();
811 thread_local! {
812 static SCRATCH_BUFFER: std::cell::RefCell<Vec<u8>> = const { std::cell::RefCell::new(Vec::new()) };
813 }
814
815 use rayon::prelude::{IntoParallelIterator, ParallelIterator};
819
820 let batches_to_encode: Vec<_> = client_batches.into_iter().collect();
821
822 let encoded_results = tokio::task::block_in_place(|| {
823 self.encode_pool.install(|| {
824 batches_to_encode
825 .into_par_iter()
826 .map(|(client_id, events)| {
827 let batch_event =
828 aetheris_protocol::events::NetworkEvent::ReplicationBatch {
829 client_id,
830 events,
831 };
832 SCRATCH_BUFFER.with(|buf| {
836 let mut b = buf.borrow_mut();
837 if b.len() < max_size {
838 b.resize(max_size, 0);
839 }
840 match encoder.encode_event_into(&batch_event, &mut b) {
841 Ok(size) => (client_id, Ok(b[..size].to_vec())),
842 Err(
843 aetheris_protocol::error::EncodeError::BufferOverflow {
844 ..
845 },
846 ) => {
847 match encoder.encode_event(&batch_event) {
850 Ok(data) => (client_id, Ok(data)),
851 Err(e) => (client_id, Err(e)),
852 }
853 }
854 Err(e) => (client_id, Err(e)),
855 }
856 })
857 })
858 .collect::<Vec<_>>()
859 })
860 });
861
862 for (client_id, result) in encoded_results {
863 match result {
864 Ok(data) => {
865 let targets = DeltaTargets::Recipients(vec![client_id]);
866 if data.len() > aetheris_protocol::MAX_SAFE_PAYLOAD_SIZE {
867 match self
868 .fragment_and_send(&data, data.len(), &targets, encoder)
869 .await
870 {
871 Ok(count) => broadcast_count += count,
872 Err(e) => {
873 error!(error = ?e, ?client_id, "Failed to fragment large batch");
874 }
875 }
876 } else if let Some(tx) = &self.outbound_tx {
877 let _ = tx
878 .send(OutboundMessage::Unreliable { client_id, data })
879 .await;
880 broadcast_count += 1;
881 }
882 }
883 Err(e) => {
884 error!(error = ?e, ?client_id, "Failed to encode batch");
885 }
886 }
887 }
888
889 metrics::counter!("aetheris_packets_outbound_total").increment(broadcast_count);
890 metrics::counter!("aetheris_packets_broadcast_total").increment(broadcast_count);
891 }
892 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "send")
893 .record(t5.elapsed().as_secs_f64());
894
895 metrics::histogram!("aetheris_tick_duration_seconds")
896 .record(tick_start.elapsed().as_secs_f64());
897
898 drop(transport);
900 for id in to_disconnect {
901 let transport = transport_lock.read().await;
902 let _ = transport.disconnect(id).await;
903 }
904 }
905
906 fn get_delta_targets(
907 world: &dyn WorldState,
908 clients: &HashMap<ClientId, (crate::auth::VerifiedSession, Option<NetworkId>)>,
909 entity_id: NetworkId,
910 ) -> DeltaTargets {
911 if let Some(workspace_id) = world.get_entity_workspace(entity_id) {
912 let mut recipients = Vec::new();
913 for &client_id in clients.keys() {
914 if world.get_client_workspace(client_id) == Some(workspace_id) {
915 recipients.push(client_id);
916 }
917 }
918 if recipients.is_empty() {
919 DeltaTargets::NoRecipients
920 } else {
921 DeltaTargets::Recipients(recipients)
922 }
923 } else {
924 DeltaTargets::Broadcast
925 }
926 }
927
928 async fn fragment_and_send(
929 &mut self,
930 data: &[u8],
931 len: usize,
932 targets: &DeltaTargets,
933 encoder: &dyn Encoder,
934 ) -> Result<u64, EncodeError> {
935 let Some(tx) = &self.outbound_tx else {
936 return Ok(0);
937 };
938 let message_id = self.next_message_id;
939 self.next_message_id = self.next_message_id.wrapping_add(1);
940
941 let chunk_size = aetheris_protocol::MAX_FRAGMENT_PAYLOAD_SIZE;
942 let chunks: Vec<_> = data[..len].chunks(chunk_size).collect();
943
944 let Ok(total_fragments) = u16::try_from(chunks.len()) else {
945 error!(
946 message_id,
947 chunks = chunks.len(),
948 "Too many fragments required for message; dropping payload"
949 );
950 return Err(EncodeError::Io(std::io::Error::new(
951 std::io::ErrorKind::InvalidData,
952 "Too many fragments",
953 )));
954 };
955
956 let mut sent_count = 0;
957 for (i, chunk) in chunks.into_iter().enumerate() {
958 let Ok(fragment_index) = u16::try_from(i) else {
959 error!(message_id, index = i, "Fragment index overflow; stopping");
960 break;
961 };
962
963 let fragment = FragmentedEvent {
964 message_id,
965 fragment_index,
966 total_fragments,
967 payload: chunk.to_vec(),
968 };
969 let fragment_event = NetworkEvent::Fragment {
970 client_id: ClientId(0),
971 fragment,
972 };
973
974 match encoder.encode_event(&fragment_event) {
975 Ok(encoded_fragment) => match targets {
976 DeltaTargets::Broadcast => {
977 let _ = tx
978 .send(OutboundMessage::BroadcastUnreliable {
979 data: encoded_fragment,
980 })
981 .await;
982 sent_count += 1;
983 }
984 DeltaTargets::Recipients(recipients) => {
985 for &target in recipients {
986 let _ = tx
987 .send(OutboundMessage::Unreliable {
988 client_id: target,
989 data: encoded_fragment.clone(),
990 })
991 .await;
992 sent_count += 1;
993 }
994 }
995 DeltaTargets::NoRecipients => {}
996 },
997 Err(e) => {
998 error!(error = ?e, "Failed to encode fragment event");
999 }
1000 }
1001 }
1002
1003 Ok(sent_count)
1004 }
1005
1006 fn get_filtered_manifest(&self, jti: &str) -> BTreeMap<String, String> {
1007 let mut manifest = BTreeMap::new();
1008 manifest.insert(
1009 "version_server".to_string(),
1010 env!("CARGO_PKG_VERSION").to_string(),
1011 );
1012 manifest.insert(
1013 "version_protocol".to_string(),
1014 aetheris_protocol::VERSION.to_string(),
1015 );
1016
1017 if can_run_playground_command(jti) {
1018 manifest.insert("tick_rate".to_string(), self.tick_rate.to_string());
1019 manifest.insert(
1020 "clients_active".to_string(),
1021 self.authenticated_clients.len().to_string(),
1022 );
1023 }
1024 manifest
1025 }
1026}
1027
/// Returns whether a session may run playground commands.
///
/// Allowed when `jti` is exactly `"admin"`, or when the `AETHERIS_ENV`
/// environment variable is set to `"dev"` (read on every call).
fn can_run_playground_command(jti: &str) -> bool {
    if jti == "admin" {
        return true;
    }
    matches!(std::env::var("AETHERIS_ENV").as_deref(), Ok("dev"))
}