1use std::collections::{BTreeMap, HashMap};
2use std::time::{Duration, Instant};
3
4use tokio::sync::broadcast;
5use tokio::time::{MissedTickBehavior, interval};
6use tracing::{Instrument, debug_span, error, info_span};
7
8use aetheris_protocol::error::EncodeError;
9use aetheris_protocol::events::{FragmentedEvent, NetworkEvent};
10use aetheris_protocol::reassembler::Reassembler;
11use aetheris_protocol::traits::{Encoder, GameTransport, WorldState};
12use aetheris_protocol::types::{ClientId, NetworkId};
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use tokio::sync::mpsc;
16
17pub enum OutboundMessage {
19 Unreliable { client_id: ClientId, data: Vec<u8> },
20 Reliable { client_id: ClientId, data: Vec<u8> },
21 BroadcastUnreliable { data: Vec<u8> },
22}
23
24#[derive(Debug, Clone)]
25pub enum DeltaTargets {
26 Broadcast,
27 Recipients(Vec<ClientId>),
28 NoRecipients,
29}
30
/// Fixed-rate server loop state: authentication bookkeeping, fragment
/// reassembly, outbound queueing and optional golden-hash recording.
#[derive(Debug)]
pub struct TickScheduler {
    /// Target ticks per second; `run` uses `1 / tick_rate` as the tick period.
    tick_rate: u64,
    /// Number of the next tick to execute; incremented at the start of `tick_step`.
    current_tick: u64,
    /// Verifies session tokens and answers whether a session is still authorized.
    auth_service: Arc<dyn crate::auth::AuthSessionVerifier>,

    /// Authenticated clients, mapped to their session `jti` and, once
    /// `StartSession` has been handled, the network id of their session ship.
    authenticated_clients: HashMap<ClientId, (String, Option<NetworkId>)>,
    /// Instant at which each client authenticated; consumed on `StartSession`
    /// to record the auth-to-session-start latency.
    auth_timestamps: HashMap<ClientId, Instant>,
    /// Reassembles inbound fragmented messages.
    reassembler: Reassembler,
    /// Id assigned to the next outbound fragmented message (wraps on overflow).
    next_message_id: u32,
    /// Rayon pool used to encode per-client replication batches in parallel.
    encode_pool: Arc<rayon::ThreadPool>,
    /// Sender half of the outbound queue; `None` until `run` or
    /// `set_outbound_tx` installs one.
    outbound_tx: Option<mpsc::Sender<OutboundMessage>>,
    /// Number of ticks to record state hashes for, parsed from
    /// `AETHERIS_RECORD_GOLDEN`; `None` disables recording.
    recording_ticks: Option<u64>,
    /// World state hashes collected so far while recording.
    golden_hashes: Vec<u64>,
    /// When `true`, tick 0 spawns 100 entities via `WorldState::stress_test`.
    spawn_at_zero: bool,
}
52
53impl TickScheduler {
54 #[must_use]
56 pub fn new(
57 tick_rate: u64,
58 auth_service: Arc<dyn crate::auth::AuthSessionVerifier>,
59 encode_pool: Arc<rayon::ThreadPool>,
60 ) -> Self {
61 Self {
62 tick_rate,
63 current_tick: 0,
64 auth_service,
65 authenticated_clients: HashMap::new(),
66 auth_timestamps: HashMap::new(),
67 reassembler: Reassembler::new(),
68 next_message_id: 1,
69 encode_pool,
70 outbound_tx: None,
71 recording_ticks: std::env::var("AETHERIS_RECORD_GOLDEN")
72 .ok()
73 .and_then(|v| v.parse().ok()),
74 golden_hashes: Vec::new(),
75 spawn_at_zero: std::env::var("AETHERIS_RECORD_GOLDEN").is_ok(),
76 }
77 }
78
79 #[must_use]
84 pub fn with_spawn_at_zero(mut self, v: bool) -> Self {
85 self.spawn_at_zero = v;
86 self
87 }
88
89 pub fn set_outbound_tx(&mut self, tx: tokio::sync::mpsc::Sender<OutboundMessage>) {
91 self.outbound_tx = Some(tx);
92 }
93
94 pub async fn run(
96 &mut self,
97 transport: Box<dyn GameTransport>,
98 mut world: Box<dyn WorldState>,
99 encoder: Box<dyn Encoder>,
100 mut shutdown: broadcast::Receiver<()>,
101 ) {
102 let (tx, mut rx) = mpsc::channel(2048);
103 self.outbound_tx = Some(tx.clone());
104
105 let transport = Arc::new(RwLock::new(transport));
106 let transport_clone = transport.clone();
107
108 let mut outbound_shutdown = shutdown.resubscribe();
109 tokio::spawn(async move {
110 loop {
111 tokio::select! {
112 msg = rx.recv() => {
113 let Some(msg) = msg else { break; };
114 let transport = transport_clone.read().await;
115 match msg {
116 OutboundMessage::Unreliable { client_id, data } => {
117 if let Err(e) = transport.send_unreliable(client_id, &data).await {
118 error!(error = ?e, ?client_id, "Outbound task failed to send unreliable message");
119 }
120 }
121 OutboundMessage::Reliable { client_id, data } => {
122 if let Err(e) = transport.send_reliable(client_id, &data).await {
123 error!(error = ?e, ?client_id, "Outbound task failed to send reliable message");
124 }
125 }
126 OutboundMessage::BroadcastUnreliable { data } => {
127 if let Err(e) = transport.broadcast_unreliable(&data).await {
128 error!(error = ?e, "Outbound task failed to broadcast unreliable message");
129 }
130 }
131 }
132 }
133 _ = outbound_shutdown.recv() => {
134 break;
135 }
136 }
137 }
138 });
139
140 #[allow(clippy::cast_precision_loss)]
141 let tick_duration = Duration::from_secs_f64(1.0 / self.tick_rate as f64);
142 let mut interval = interval(tick_duration);
143 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
144
145 let mut last_tick_wall = Instant::now();
146
147 loop {
148 tokio::select! {
149 _ = interval.tick() => {
150 let tick_num = self.current_tick;
151 let start = Instant::now();
152
153 let wall_elapsed = start.duration_since(last_tick_wall);
155 if wall_elapsed.as_secs_f64() > 0.0 {
156 metrics::gauge!("aetheris_actual_tick_rate_hz")
157 .set(1.0 / wall_elapsed.as_secs_f64());
158 }
159 last_tick_wall = start;
160
161 self.tick_step(
162 &transport,
163 world.as_mut(),
164 encoder.as_ref(),
165 )
166 .instrument(info_span!("tick", tick = tick_num))
167 .await;
168 let elapsed = start.elapsed();
169
170 metrics::histogram!("aetheris_tick_duration_seconds").record(elapsed.as_secs_f64());
171 }
172 _ = shutdown.recv() => {
173 tracing::info!("Server shutting down gracefully");
174 break;
175 }
176 }
177 }
178 }
179
180 #[allow(clippy::too_many_lines)]
182 pub async fn tick_step(
183 &mut self,
184 transport_lock: &RwLock<Box<dyn GameTransport>>,
185 world: &mut dyn WorldState,
186 encoder: &dyn Encoder,
187 ) {
188 let tick_start = Instant::now();
189 let tick = self.current_tick;
190 self.current_tick += 1;
191
192 let mut transport = transport_lock.write().await;
193 world.advance_tick();
199
200 if tick == 0 && self.spawn_at_zero {
201 tracing::info!("Recording mode: Spawning 100 entities for determinism test");
202 world.stress_test(100, true);
203 }
204
205 let t1 = Instant::now();
207 let events = match transport
208 .poll_events()
209 .instrument(debug_span!("stage1_poll"))
210 .await
211 {
212 Ok(e) => e,
213 Err(e) => {
214 error!(error = ?e, "Fatal transport error during poll; skipping tick");
215 return;
216 }
217 };
218 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "poll")
219 .record(t1.elapsed().as_secs_f64());
220
221 let inbound_count: u64 = events
222 .iter()
223 .filter(|e| {
224 matches!(
225 e,
226 NetworkEvent::UnreliableMessage { .. } | NetworkEvent::ReliableMessage { .. }
227 )
228 })
229 .count() as u64;
230 metrics::counter!("aetheris_packets_inbound_total").increment(inbound_count);
231
232 if tick.is_multiple_of(60) {
234 let mut to_remove = Vec::new();
235 for (&client_id, (jti, _)) in &self.authenticated_clients {
236 if !self.auth_service.is_session_authorized(jti, Some(tick)) {
237 tracing::warn!(?client_id, "Session invalidated during periodic check");
238 to_remove.push(client_id);
239 }
240 }
241 for client_id in to_remove {
242 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&client_id) {
243 let _ = world.despawn_networked(nid);
244 }
245 self.auth_timestamps.remove(&client_id);
246 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
247 }
248 }
249
250 let t2 = Instant::now();
252 let mut pong_responses = None;
253 let mut clear_ack_targets: Vec<aetheris_protocol::types::ClientId> = Vec::new();
254 if !events.is_empty() {
255 let _span = debug_span!("stage2_apply", count = events.len()).entered();
256 let mut updates = Vec::with_capacity(events.len());
257 for event in events {
258 let (client_id, raw_data, is_message) = match event {
260 NetworkEvent::Fragment {
261 client_id,
262 fragment,
263 } => {
264 if let Some(data) = self.reassembler.ingest(client_id, fragment) {
265 (client_id, data, true)
266 } else {
267 continue;
268 }
269 }
270 NetworkEvent::UnreliableMessage { data, client_id }
271 | NetworkEvent::ReliableMessage { data, client_id } => {
272 if let Ok(NetworkEvent::Fragment { fragment, .. }) =
274 encoder.decode_event(&data)
275 {
276 if let Some(reassembled) = self.reassembler.ingest(client_id, fragment)
277 {
278 (client_id, reassembled, true)
279 } else {
280 continue;
281 }
282 } else {
283 (client_id, data, true)
284 }
285 }
286 NetworkEvent::ClientConnected(id) => {
287 metrics::gauge!("aetheris_connected_clients").increment(1.0);
288 tracing::info!(client_id = ?id, "Client connected (awaiting auth)");
289 (id, Vec::new(), false)
290 }
291 NetworkEvent::ClientDisconnected(id) | NetworkEvent::Disconnected(id) => {
292 metrics::gauge!("aetheris_connected_clients").decrement(1.0);
293 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
294 let _ = world.despawn_networked(nid);
295 }
296 self.auth_timestamps.remove(&id);
297 tracing::info!(client_id = ?id, "Client disconnected");
298 (id, Vec::new(), false)
299 }
300 NetworkEvent::SessionClosed(id) => {
301 metrics::counter!("aetheris_transport_events_total", "type" => "session_closed")
302 .increment(1);
303 tracing::warn!(client_id = ?id, "WebTransport session closed");
304 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
305 let _ = world.despawn_networked(nid);
306 }
307 self.auth_timestamps.remove(&id);
308 (id, Vec::new(), false)
309 }
310 NetworkEvent::StreamReset(id) => {
311 metrics::counter!("aetheris_transport_events_total", "type" => "stream_reset")
312 .increment(1);
313 tracing::error!(client_id = ?id, "WebTransport stream reset");
314 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&id) {
315 let _ = world.despawn_networked(nid);
316 }
317 self.auth_timestamps.remove(&id);
318 (id, Vec::new(), false)
319 }
320 NetworkEvent::Ping { client_id, tick } => {
321 if self.authenticated_clients.contains_key(&client_id) {
322 pong_responses.get_or_insert_with(Vec::new).push((
323 client_id,
324 tick,
325 Instant::now(),
326 ));
327 metrics::counter!("aetheris_protocol_pings_received_total")
328 .increment(1);
329 }
330 (client_id, Vec::new(), false)
331 }
332 NetworkEvent::ClearWorld { client_id, .. }
333 | NetworkEvent::StartSession { client_id }
334 | NetworkEvent::RequestSystemManifest { client_id }
335 | NetworkEvent::GameEvent { client_id, .. }
336 | NetworkEvent::StressTest { client_id, .. }
337 | NetworkEvent::ReplicationBatch { client_id, .. }
338 | NetworkEvent::Spawn { client_id, .. } => (client_id, Vec::new(), false),
339 NetworkEvent::Pong { .. } | NetworkEvent::Auth { .. } => {
340 (aetheris_protocol::types::ClientId(0), Vec::new(), false)
341 }
342 };
343
344 if !is_message {
345 continue;
346 }
347
348 let jti = if let Some((jti, _)) = self.authenticated_clients.get(&client_id) {
350 if !self.auth_service.is_session_authorized(jti, Some(tick)) {
352 tracing::warn!(?client_id, "Session revoked; dropping client");
353 if let Some((_, Some(nid))) = self.authenticated_clients.remove(&client_id)
354 {
355 let _ = world.despawn_networked(nid);
356 }
357 self.auth_timestamps.remove(&client_id);
358 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
359 continue;
360 }
361 jti
362 } else {
363 match encoder.decode_event(&raw_data) {
365 Ok(NetworkEvent::Auth { session_token }) => {
366 tracing::info!(?client_id, "Auth message received");
367 match self.auth_service.verify_session(&session_token, Some(tick)) {
368 Ok(session) => {
369 tracing::info!(?client_id, "Client authenticated successfully");
370
371 self.authenticated_clients
372 .insert(client_id, (session.jti, None));
373 self.auth_timestamps.insert(client_id, Instant::now());
376
377 tracing::info!(
378 ?client_id,
379 "[Auth] Client authenticated — waiting for StartSession to spawn ship"
380 );
381 continue;
382 }
383 Err(e) => {
384 tracing::warn!(
385 ?client_id,
386 error = ?e,
387 "Client failed authentication"
388 );
389 }
390 }
391 }
392 Ok(other) => {
393 tracing::warn!(
394 ?client_id,
395 variant = ?std::mem::discriminant(&other),
396 bytes = raw_data.len(),
397 "Unauthenticated client sent non-Auth event — discarding"
398 );
399 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
400 }
401 Err(e) => {
402 tracing::warn!(
403 ?client_id,
404 error = ?e,
405 bytes = raw_data.len(),
406 "Failed to decode message from unauthenticated client"
407 );
408 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
409 }
410 }
411 continue;
412 };
413
414 if let Ok(protocol_event) = encoder.decode_event(&raw_data) {
416 match protocol_event {
417 NetworkEvent::Ping { tick: p_tick, .. } => {
418 pong_responses.get_or_insert_with(Vec::new).push((
419 client_id,
420 p_tick,
421 Instant::now(),
422 ));
423 metrics::counter!("aetheris_protocol_pings_received_total")
424 .increment(1);
425 }
426 NetworkEvent::Auth { .. } => {
427 tracing::debug!(?client_id, "Client re-authenticating (ignored)");
428 }
429 NetworkEvent::StressTest { count, rotate, .. } => {
430 tracing::info!(
431 ?client_id,
432 count,
433 rotate,
434 "StressTest event received from authenticated client"
435 );
436 if can_run_playground_command(jti) {
437 const MAX_STRESS: u16 = 1000;
439 let capped_count = count.min(MAX_STRESS);
440 if count > MAX_STRESS {
441 tracing::warn!(
442 ?client_id,
443 count,
444 capped_count,
445 "Stress test count capped at limit"
446 );
447 }
448
449 tracing::info!(
450 ?client_id,
451 count = capped_count,
452 rotate,
453 "Stress test command executed"
454 );
455 world.stress_test(capped_count, rotate);
456 } else {
457 tracing::warn!(?client_id, "Unauthorized StressTest attempt");
458 metrics::counter!("aetheris_unprivileged_packets_total")
459 .increment(1);
460 }
461 }
462 NetworkEvent::Spawn {
463 entity_type,
464 x,
465 y,
466 rot,
467 ..
468 } => {
469 if can_run_playground_command(jti) {
470 let network_id =
471 world.spawn_kind_for(entity_type, x, y, rot, client_id);
472
473 tracing::info!(
474 ?client_id,
475 entity_type,
476 new_entity_id = network_id.0,
477 "[Spawn] Playground entity spawned"
478 );
479 } else {
480 tracing::warn!(?client_id, "Unauthorized Spawn attempt");
481 metrics::counter!("aetheris_unprivileged_packets_total")
482 .increment(1);
483 }
484 }
485 NetworkEvent::StartSession { .. } => {
486 if let Some((_, ship_id)) =
487 self.authenticated_clients.get_mut(&client_id)
488 {
489 let network_id = if let Some(nid) = ship_id {
490 tracing::info!(
491 ?client_id,
492 ?nid,
493 "Reusing existing session ship"
494 );
495 *nid
496 } else {
497 let nid = world.spawn_session_ship(1, 0.0, 0.0, 0.0, client_id);
498 *ship_id = Some(nid);
499 nid
500 };
501
502 world.queue_reliable_event(
503 Some(client_id),
504 aetheris_protocol::events::GameEvent::Possession { network_id },
505 );
506
507 if let Some(auth_ts) = self.auth_timestamps.remove(&client_id) {
509 metrics::histogram!("aetheris_session_start_latency_seconds")
510 .record(auth_ts.elapsed().as_secs_f64());
511 }
512
513 tracing::info!(
514 ?client_id,
515 network_id = network_id.0,
516 "[StartSession] Session ship assigned (spawned or reused) — Possession event queued"
517 );
518 }
519 }
520 NetworkEvent::ClearWorld { .. } => {
521 if can_run_playground_command(jti) {
522 tracing::info!(?client_id, "ClearWorld command executed");
523 world.clear_world();
524 if let Some((_, ship_id)) =
529 self.authenticated_clients.get_mut(&client_id)
530 {
531 *ship_id = None;
532 }
533 clear_ack_targets.push(client_id);
538 } else {
539 tracing::warn!(?client_id, "Unauthorized ClearWorld attempt");
540 metrics::counter!("aetheris_unprivileged_packets_total")
541 .increment(1);
542 }
543 }
544 NetworkEvent::RequestSystemManifest { .. } => {
545 let jti = if let Some((jti, _)) =
546 self.authenticated_clients.get(&client_id)
547 {
548 jti
549 } else {
550 ""
551 };
552
553 let manifest = self.get_filtered_manifest(jti);
554 world.queue_reliable_event(
555 Some(client_id),
556 aetheris_protocol::events::GameEvent::SystemManifest { manifest },
557 );
558 }
559 NetworkEvent::ReplicationBatch { events, .. } => {
560 for event in events {
561 updates.push((
562 client_id,
563 aetheris_protocol::events::ComponentUpdate {
564 network_id: event.network_id,
565 component_kind: event.component_kind,
566 payload: event.payload,
567 tick: event.tick,
568 },
569 ));
570 }
571 }
572 _ => {
573 tracing::trace!(?protocol_event, "Protocol event");
574 }
575 }
576 } else {
577 match encoder.decode(&raw_data) {
579 Ok(update) => updates.push((client_id, update)),
580 Err(e) => {
581 metrics::counter!("aetheris_decode_errors_total").increment(1);
582 error!(
583 error = ?e,
584 size = raw_data.len(),
585 "Failed to decode update (not a protocol event)"
586 );
587 }
588 }
589 }
590 }
591 world.apply_updates(&updates);
592 self.reassembler.prune();
593 }
594 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "apply")
595 .record(t2.elapsed().as_secs_f64());
596
597 for target in clear_ack_targets {
602 let ack = NetworkEvent::ClearWorld { client_id: target };
603 #[allow(clippy::collapsible_if)]
604 if let Ok(data) = encoder.encode_event(&ack) {
605 if let Err(e) = transport.send_reliable(target, &data).await {
606 tracing::warn!(client_id = ?target, error = ?e, "Failed to send ClearWorld ack");
607 }
608 }
609 }
610
611 if let Some(pongs) = pong_responses {
615 for (client_id, p_tick, received_at) in pongs {
616 let pong_event = NetworkEvent::Pong { tick: p_tick };
617 if let Ok(data) = encoder.encode_event(&pong_event) {
618 let dispatch_start = Instant::now();
622 match transport.send_unreliable(client_id, &data).await {
623 Ok(()) => {
624 let dispatch_ms = dispatch_start.elapsed().as_secs_f64() * 1000.0;
625 let server_hold_ms = received_at.elapsed().as_secs_f64() * 1000.0;
626 metrics::histogram!("aetheris_server_pong_dispatch_ms")
627 .record(dispatch_ms);
628 metrics::histogram!("aetheris_server_ping_hold_ms")
629 .record(server_hold_ms);
630 }
631 Err(e) => {
632 error!(error = ?e, client_id = ?client_id, "Failed to send Pong");
633 }
634 }
635 }
636 }
637 }
638
639 let t3 = Instant::now();
641 {
642 let _span = debug_span!("stage3_simulate").entered();
643 world.simulate();
645 }
646 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "simulate")
647 .record(t3.elapsed().as_secs_f64());
648
649 let t4 = Instant::now();
651 let (deltas, reliable_events) = {
652 let ds = world.extract_deltas();
653
654 if let Some(limit) = self.recording_ticks.filter(|&l| tick < l) {
656 let hash = world.state_hash();
657 self.golden_hashes.push(hash);
658
659 if tick + 1 == limit {
660 tracing::info!(
661 limit,
662 "Golden recording complete. Saving to golden_600ticks.bin"
663 );
664 let data: Vec<u8> = self
665 .golden_hashes
666 .iter()
667 .flat_map(|h| h.to_le_bytes())
668 .collect();
669 if let Err(e) = std::fs::write("golden_600ticks.bin", data) {
670 tracing::error!(error = ?e, "Failed to write golden file");
671 } else {
672 tracing::info!("Successfully saved golden_600ticks.bin");
673 std::process::exit(0);
674 }
675 }
676 }
677 let rs = world.extract_reliable_events();
678 (ds, rs)
679 };
680 world.post_extract();
682 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "extract")
683 .record(t4.elapsed().as_secs_f64());
684
685 let t5 = Instant::now();
687
688 for (target, wire_event) in reliable_events {
690 let targets: Vec<_> = if let Some(id) = target {
692 vec![id]
693 } else {
694 self.authenticated_clients.keys().copied().collect()
695 };
696
697 for id in targets {
698 let network_event = wire_event.clone().into_network_event(id);
699 match encoder.encode_event(&network_event) {
700 Ok(data) => {
701 if let Some(tx) = &self.outbound_tx {
702 let _ = tx
703 .send(OutboundMessage::Reliable {
704 client_id: id,
705 data,
706 })
707 .await;
708 }
709 }
710 Err(e) => {
711 error!(error = ?e, client_id = ?id, "Failed to encode reliable event");
712 }
713 }
714 }
715 }
716
717 if !deltas.is_empty() {
718 let mut broadcast_count: u64 = 0;
719
720 let stage_span = debug_span!("stage5_send", count = deltas.len());
721 let _guard = stage_span.enter();
722
723 let mut client_batches: HashMap<
726 aetheris_protocol::types::ClientId,
727 Vec<aetheris_protocol::events::ReplicationEvent>,
728 > = HashMap::with_capacity(self.authenticated_clients.len());
729
730 for delta in deltas {
731 let targets =
732 Self::get_delta_targets(world, &self.authenticated_clients, delta.network_id);
733
734 match targets {
735 DeltaTargets::Broadcast => {
736 for &client_id in self.authenticated_clients.keys() {
738 client_batches
739 .entry(client_id)
740 .or_default()
741 .push(delta.clone());
742 }
743 }
744 DeltaTargets::Recipients(recipients) => {
745 for target in recipients {
747 client_batches
748 .entry(target)
749 .or_default()
750 .push(delta.clone());
751 }
752 }
753 DeltaTargets::NoRecipients => {}
754 }
755 }
756
757 let max_size = encoder.max_encoded_size();
758 thread_local! {
759 static SCRATCH_BUFFER: std::cell::RefCell<Vec<u8>> = const { std::cell::RefCell::new(Vec::new()) };
760 }
761
762 use rayon::prelude::{IntoParallelIterator, ParallelIterator};
766
767 let batches_to_encode: Vec<_> = client_batches.into_iter().collect();
768
769 let encoded_results = tokio::task::block_in_place(|| {
770 self.encode_pool.install(|| {
771 batches_to_encode
772 .into_par_iter()
773 .map(|(client_id, events)| {
774 let batch_event =
775 aetheris_protocol::events::NetworkEvent::ReplicationBatch {
776 client_id,
777 events,
778 };
779 SCRATCH_BUFFER.with(|buf| {
783 let mut b = buf.borrow_mut();
784 if b.len() < max_size {
785 b.resize(max_size, 0);
786 }
787 match encoder.encode_event_into(&batch_event, &mut b) {
788 Ok(size) => (client_id, Ok(b[..size].to_vec())),
789 Err(
790 aetheris_protocol::error::EncodeError::BufferOverflow {
791 ..
792 },
793 ) => {
794 match encoder.encode_event(&batch_event) {
797 Ok(data) => (client_id, Ok(data)),
798 Err(e) => (client_id, Err(e)),
799 }
800 }
801 Err(e) => (client_id, Err(e)),
802 }
803 })
804 })
805 .collect::<Vec<_>>()
806 })
807 });
808
809 for (client_id, result) in encoded_results {
810 match result {
811 Ok(data) => {
812 let targets = DeltaTargets::Recipients(vec![client_id]);
813 if data.len() > aetheris_protocol::MAX_SAFE_PAYLOAD_SIZE {
814 match self
815 .fragment_and_send(&data, data.len(), &targets, encoder)
816 .await
817 {
818 Ok(count) => broadcast_count += count,
819 Err(e) => {
820 error!(error = ?e, ?client_id, "Failed to fragment large batch");
821 }
822 }
823 } else if let Some(tx) = &self.outbound_tx {
824 let _ = tx
825 .send(OutboundMessage::Unreliable { client_id, data })
826 .await;
827 broadcast_count += 1;
828 }
829 }
830 Err(e) => {
831 error!(error = ?e, ?client_id, "Failed to encode batch");
832 }
833 }
834 }
835
836 metrics::counter!("aetheris_packets_outbound_total").increment(broadcast_count);
837 metrics::counter!("aetheris_packets_broadcast_total").increment(broadcast_count);
838 }
839 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "send")
840 .record(t5.elapsed().as_secs_f64());
841
842 metrics::histogram!("aetheris_tick_duration_seconds")
843 .record(tick_start.elapsed().as_secs_f64());
844 }
845
846 fn get_delta_targets(
847 world: &dyn WorldState,
848 clients: &HashMap<ClientId, (String, Option<NetworkId>)>,
849 entity_id: NetworkId,
850 ) -> DeltaTargets {
851 if let Some(room_id) = world.get_entity_room(entity_id) {
852 let mut recipients = Vec::new();
853 for &client_id in clients.keys() {
854 if world.get_client_room(client_id) == Some(room_id) {
855 recipients.push(client_id);
856 }
857 }
858 if recipients.is_empty() {
859 DeltaTargets::NoRecipients
860 } else {
861 DeltaTargets::Recipients(recipients)
862 }
863 } else {
864 DeltaTargets::Broadcast
865 }
866 }
867
868 async fn fragment_and_send(
869 &mut self,
870 data: &[u8],
871 len: usize,
872 targets: &DeltaTargets,
873 encoder: &dyn Encoder,
874 ) -> Result<u64, EncodeError> {
875 let Some(tx) = &self.outbound_tx else {
876 return Ok(0);
877 };
878 let message_id = self.next_message_id;
879 self.next_message_id = self.next_message_id.wrapping_add(1);
880
881 let chunk_size = aetheris_protocol::MAX_FRAGMENT_PAYLOAD_SIZE;
882 let chunks: Vec<_> = data[..len].chunks(chunk_size).collect();
883
884 let Ok(total_fragments) = u16::try_from(chunks.len()) else {
885 error!(
886 message_id,
887 chunks = chunks.len(),
888 "Too many fragments required for message; dropping payload"
889 );
890 return Err(EncodeError::Io(std::io::Error::new(
891 std::io::ErrorKind::InvalidData,
892 "Too many fragments",
893 )));
894 };
895
896 let mut sent_count = 0;
897 for (i, chunk) in chunks.into_iter().enumerate() {
898 let Ok(fragment_index) = u16::try_from(i) else {
899 error!(message_id, index = i, "Fragment index overflow; stopping");
900 break;
901 };
902
903 let fragment = FragmentedEvent {
904 message_id,
905 fragment_index,
906 total_fragments,
907 payload: chunk.to_vec(),
908 };
909 let fragment_event = NetworkEvent::Fragment {
910 client_id: ClientId(0),
911 fragment,
912 };
913
914 match encoder.encode_event(&fragment_event) {
915 Ok(encoded_fragment) => match targets {
916 DeltaTargets::Broadcast => {
917 let _ = tx
918 .send(OutboundMessage::BroadcastUnreliable {
919 data: encoded_fragment,
920 })
921 .await;
922 sent_count += 1;
923 }
924 DeltaTargets::Recipients(recipients) => {
925 for &target in recipients {
926 let _ = tx
927 .send(OutboundMessage::Unreliable {
928 client_id: target,
929 data: encoded_fragment.clone(),
930 })
931 .await;
932 sent_count += 1;
933 }
934 }
935 DeltaTargets::NoRecipients => {}
936 },
937 Err(e) => {
938 error!(error = ?e, "Failed to encode fragment event");
939 }
940 }
941 }
942
943 Ok(sent_count)
944 }
945
946 fn get_filtered_manifest(&self, jti: &str) -> BTreeMap<String, String> {
947 let mut manifest = BTreeMap::new();
948 manifest.insert(
949 "version_server".to_string(),
950 env!("CARGO_PKG_VERSION").to_string(),
951 );
952 manifest.insert(
953 "version_protocol".to_string(),
954 aetheris_protocol::VERSION.to_string(),
955 );
956
957 if can_run_playground_command(jti) {
958 manifest.insert("tick_rate".to_string(), self.tick_rate.to_string());
959 manifest.insert(
960 "clients_active".to_string(),
961 self.authenticated_clients.len().to_string(),
962 );
963 }
964 manifest
965 }
966}
967
/// Reports whether the session identified by `jti` may run playground
/// commands (stress test, spawn, clear world, extended manifest).
///
/// Access is granted when `jti` is exactly `"admin"`, or for any session when
/// the `AETHERIS_ENV` environment variable is exactly `dev`. The environment
/// is only consulted when `jti` is not `"admin"`.
fn can_run_playground_command(jti: &str) -> bool {
    if jti == "admin" {
        return true;
    }
    matches!(std::env::var("AETHERIS_ENV").as_deref(), Ok("dev"))
}