1use std::collections::{BTreeMap, HashMap};
2use std::time::{Duration, Instant};
3
4use tokio::sync::broadcast;
5use tokio::time::{MissedTickBehavior, interval};
6use tracing::{Instrument, debug_span, error, info_span};
7
8use crate::auth::AuthServiceImpl;
9use aetheris_protocol::error::EncodeError;
10use aetheris_protocol::events::{FragmentedEvent, NetworkEvent};
11use aetheris_protocol::reassembler::Reassembler;
12use aetheris_protocol::traits::{Encoder, GameTransport, WorldState};
13
/// Fixed-rate server tick scheduler.
///
/// Owns the per-connection authentication state and the fragment
/// reassembly state that `tick_step` consults while processing inbound
/// events, plus the counters that identify ticks and outbound fragmented
/// messages.
#[derive(Debug)]
pub struct TickScheduler {
    /// Target number of ticks per second; `run` derives the tick period as
    /// `1 / tick_rate` seconds.
    tick_rate: u64,
    /// Index of the tick currently being processed; incremented at the end of
    /// every completed `tick_step`.
    current_tick: u64,
    /// Validates session tokens and answers whether a session is still
    /// authorized.
    auth_service: AuthServiceImpl,

    /// Authenticated connections, keyed by client id. The value holds the
    /// session identifier (`jti`) returned by the auth service and the network
    /// ids of the entities spawned on behalf of that client, which are
    /// despawned when the client is removed.
    authenticated_clients: HashMap<
        aetheris_protocol::types::ClientId,
        (String, Vec<aetheris_protocol::types::NetworkId>),
    >,
    /// Collects inbound fragments until a complete message can be produced.
    reassembler: Reassembler,
    /// Identifier assigned to the next outbound fragmented message; wraps on
    /// overflow.
    next_message_id: u32,
}
30
31impl TickScheduler {
32 #[must_use]
34 pub fn new(tick_rate: u64, auth_service: AuthServiceImpl) -> Self {
35 Self {
36 tick_rate,
37 current_tick: 0,
38 auth_service,
39 authenticated_clients: HashMap::new(),
40 reassembler: Reassembler::new(),
41 next_message_id: 0,
42 }
43 }
44
45 pub async fn run(
47 &mut self,
48 mut transport: Box<dyn GameTransport>,
49 mut world: Box<dyn WorldState>,
50 encoder: Box<dyn Encoder>,
51 mut shutdown: broadcast::Receiver<()>,
52 ) {
53 #[allow(clippy::cast_precision_loss)]
54 let tick_duration = Duration::from_secs_f64(1.0 / self.tick_rate as f64);
55 let mut interval = interval(tick_duration);
56 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
57
58 let mut encode_buffer = vec![0u8; encoder.max_encoded_size()];
61
62 loop {
63 tokio::select! {
64 _ = interval.tick() => {
65 let tick_num = self.current_tick;
66 let start = Instant::now();
67 self.tick_step(
68 transport.as_mut(),
69 world.as_mut(),
70 encoder.as_ref(),
71 &mut encode_buffer,
72 )
73 .instrument(info_span!("tick", tick = tick_num))
74 .await;
75 let elapsed = start.elapsed();
76
77 metrics::histogram!("aetheris_tick_duration_seconds").record(elapsed.as_secs_f64());
78 }
79 _ = shutdown.recv() => {
80 tracing::info!("Server shutting down gracefully");
81 break;
82 }
83 }
84 }
85 }
86
87 #[allow(clippy::too_many_lines, clippy::too_many_arguments)]
89 pub async fn tick_step(
90 &mut self,
91 transport: &mut dyn GameTransport,
92 world: &mut dyn WorldState,
93 encoder: &dyn Encoder,
94 encode_buffer: &mut [u8],
95 ) {
96 let tick = self.current_tick;
97 world.advance_tick();
103
104 let t1 = Instant::now();
106 let events = match transport
107 .poll_events()
108 .instrument(debug_span!("stage1_poll"))
109 .await
110 {
111 Ok(e) => e,
112 Err(e) => {
113 error!(error = ?e, "Fatal transport error during poll; skipping tick");
114 return;
115 }
116 };
117 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "poll")
118 .record(t1.elapsed().as_secs_f64());
119
120 let inbound_count: u64 = events
121 .iter()
122 .filter(|e| {
123 matches!(
124 e,
125 NetworkEvent::UnreliableMessage { .. } | NetworkEvent::ReliableMessage { .. }
126 )
127 })
128 .count() as u64;
129 metrics::counter!("aetheris_packets_inbound_total").increment(inbound_count);
130
131 if tick.is_multiple_of(60) {
133 let mut to_remove = Vec::new();
134 for (&client_id, (jti, _)) in &self.authenticated_clients {
135 if !self.auth_service.is_session_authorized(jti, Some(tick)) {
136 tracing::warn!(?client_id, "Session invalidated during periodic check");
137 to_remove.push(client_id);
138 }
139 }
140 for client_id in to_remove {
141 if let Some((_, network_ids)) = self.authenticated_clients.remove(&client_id) {
142 for network_id in network_ids {
143 let _ = world.despawn_networked(network_id);
144 }
145 }
146 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
147 }
148 }
149
150 let t2 = Instant::now();
152 let mut pong_responses = None;
153 let mut clear_ack_targets: Vec<aetheris_protocol::types::ClientId> = Vec::new();
154 if !events.is_empty() {
155 let _span = debug_span!("stage2_apply", count = events.len()).entered();
156 let mut updates = Vec::with_capacity(events.len());
157 for event in events {
158 let (client_id, raw_data, is_message) = match event {
160 NetworkEvent::Fragment {
161 client_id,
162 fragment,
163 } => {
164 if let Some(data) = self.reassembler.ingest(client_id, fragment) {
165 (client_id, data, true)
166 } else {
167 continue;
168 }
169 }
170 NetworkEvent::UnreliableMessage { data, client_id }
171 | NetworkEvent::ReliableMessage { data, client_id } => {
172 if let Ok(NetworkEvent::Fragment { fragment, .. }) =
174 encoder.decode_event(&data)
175 {
176 if let Some(reassembled) = self.reassembler.ingest(client_id, fragment)
177 {
178 (client_id, reassembled, true)
179 } else {
180 continue;
181 }
182 } else {
183 (client_id, data, true)
184 }
185 }
186 NetworkEvent::ClientConnected(id) => {
187 metrics::gauge!("aetheris_connected_clients").increment(1.0);
188 tracing::info!(client_id = ?id, "Client connected (awaiting auth)");
189 (id, Vec::new(), false)
190 }
191 NetworkEvent::ClientDisconnected(id) | NetworkEvent::Disconnected(id) => {
192 metrics::gauge!("aetheris_connected_clients").decrement(1.0);
193 if let Some((_, network_ids)) = self.authenticated_clients.remove(&id) {
194 for network_id in network_ids {
195 let _ = world.despawn_networked(network_id);
196 }
197 }
198 tracing::info!(client_id = ?id, "Client disconnected");
199 (id, Vec::new(), false)
200 }
201 NetworkEvent::SessionClosed(id) => {
202 metrics::counter!("aetheris_transport_events_total", "type" => "session_closed")
203 .increment(1);
204 tracing::warn!(client_id = ?id, "WebTransport session closed");
205 if let Some((_, network_ids)) = self.authenticated_clients.remove(&id) {
206 for network_id in network_ids {
207 let _ = world.despawn_networked(network_id);
208 }
209 }
210 (id, Vec::new(), false)
211 }
212 NetworkEvent::StreamReset(id) => {
213 metrics::counter!("aetheris_transport_events_total", "type" => "stream_reset")
214 .increment(1);
215 tracing::error!(client_id = ?id, "WebTransport stream reset");
216 if let Some((_, network_ids)) = self.authenticated_clients.remove(&id) {
217 for network_id in network_ids {
218 let _ = world.despawn_networked(network_id);
219 }
220 }
221 (id, Vec::new(), false)
222 }
223 NetworkEvent::Ping { client_id, tick } => {
224 if self.authenticated_clients.contains_key(&client_id) {
225 pong_responses.get_or_insert_with(Vec::new).push((
226 client_id,
227 tick,
228 Instant::now(),
229 ));
230 metrics::counter!("aetheris_protocol_pings_received_total")
231 .increment(1);
232 }
233 (client_id, Vec::new(), false)
234 }
235 NetworkEvent::ClearWorld { client_id, .. }
236 | NetworkEvent::StartSession { client_id }
237 | NetworkEvent::RequestSystemManifest { client_id }
238 | NetworkEvent::GameEvent { client_id, .. }
239 | NetworkEvent::StressTest { client_id, .. }
240 | NetworkEvent::Spawn { client_id, .. } => (client_id, Vec::new(), false),
241 NetworkEvent::Pong { .. } | NetworkEvent::Auth { .. } => {
242 (aetheris_protocol::types::ClientId(0), Vec::new(), false)
243 }
244 };
245
246 if !is_message {
247 continue;
248 }
249
250 let jti = if let Some((jti, _)) = self.authenticated_clients.get(&client_id) {
252 if !self.auth_service.is_session_authorized(jti, Some(tick)) {
254 tracing::warn!(?client_id, "Session revoked; dropping client");
255 if let Some((_, network_ids)) =
256 self.authenticated_clients.remove(&client_id)
257 {
258 for network_id in network_ids {
259 let _ = world.despawn_networked(network_id);
260 }
261 }
262 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
263 continue;
264 }
265 jti
266 } else {
267 match encoder.decode_event(&raw_data) {
269 Ok(NetworkEvent::Auth { session_token }) => {
270 tracing::info!(?client_id, "Auth message received");
271 if let Some(jti) = self
272 .auth_service
273 .validate_and_get_jti(&session_token, Some(tick))
274 {
275 tracing::info!(?client_id, "Client authenticated successfully");
276
277 self.authenticated_clients
278 .insert(client_id, (jti, Vec::new()));
279
280 tracing::info!(
281 ?client_id,
282 "[Auth] Client authenticated — waiting for StartSession to spawn ship"
283 );
284 continue;
285 }
286 tracing::warn!(
287 ?client_id,
288 "Client failed authentication (token rejected)"
289 );
290 }
291 Ok(other) => {
292 tracing::warn!(
293 ?client_id,
294 variant = ?std::mem::discriminant(&other),
295 bytes = raw_data.len(),
296 "Unauthenticated client sent non-Auth event — discarding"
297 );
298 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
299 }
300 Err(e) => {
301 tracing::warn!(
302 ?client_id,
303 error = ?e,
304 bytes = raw_data.len(),
305 "Failed to decode message from unauthenticated client"
306 );
307 metrics::counter!("aetheris_unprivileged_packets_total").increment(1);
308 }
309 }
310 continue;
311 };
312
313 if let Ok(protocol_event) = encoder.decode_event(&raw_data) {
315 match protocol_event {
316 NetworkEvent::Ping { tick: p_tick, .. } => {
317 pong_responses.get_or_insert_with(Vec::new).push((
318 client_id,
319 p_tick,
320 Instant::now(),
321 ));
322 metrics::counter!("aetheris_protocol_pings_received_total")
323 .increment(1);
324 }
325 NetworkEvent::Auth { .. } => {
326 tracing::debug!(?client_id, "Client re-authenticating (ignored)");
327 }
328 NetworkEvent::StressTest { count, rotate, .. } => {
329 tracing::info!(
330 ?client_id,
331 count,
332 rotate,
333 "StressTest event received from authenticated client"
334 );
335 if can_run_playground_command(jti) {
336 const MAX_STRESS: u16 = 1000;
338 let capped_count = count.min(MAX_STRESS);
339 if count > MAX_STRESS {
340 tracing::warn!(
341 ?client_id,
342 count,
343 capped_count,
344 "Stress test count capped at limit"
345 );
346 }
347
348 tracing::info!(
349 ?client_id,
350 count = capped_count,
351 rotate,
352 "Stress test command executed"
353 );
354 world.stress_test(capped_count, rotate);
355 } else {
356 tracing::warn!(?client_id, "Unauthorized StressTest attempt");
357 metrics::counter!("aetheris_unprivileged_packets_total")
358 .increment(1);
359 }
360 }
361 NetworkEvent::Spawn {
362 entity_type,
363 x,
364 y,
365 rot,
366 ..
367 } => {
368 if can_run_playground_command(jti) {
369 let network_id =
370 world.spawn_kind_for(entity_type, x, y, rot, client_id);
371 if let Some((_, network_ids)) =
372 self.authenticated_clients.get_mut(&client_id)
373 {
374 network_ids.push(network_id);
375 }
376
377 tracing::info!(
378 ?client_id,
379 entity_type,
380 new_entity_id = network_id.0,
381 "[Spawn] Playground entity spawned — tracked for cleanup on disconnect"
382 );
383 } else {
384 tracing::warn!(?client_id, "Unauthorized Spawn attempt");
385 metrics::counter!("aetheris_unprivileged_packets_total")
386 .increment(1);
387 }
388 }
389 NetworkEvent::StartSession { .. } => {
390 let already_has_ship = self
392 .authenticated_clients
393 .get(&client_id)
394 .is_some_and(|(_, ids)| !ids.is_empty());
395
396 if already_has_ship {
397 tracing::warn!(
398 ?client_id,
399 "StartSession ignored — client already has a session ship"
400 );
401 } else {
402 let network_id =
403 world.spawn_session_ship(1, 0.0, 0.0, 0.0, client_id);
404 if let Some((_, network_ids)) =
405 self.authenticated_clients.get_mut(&client_id)
406 {
407 network_ids.push(network_id); }
409
410 world.queue_reliable_event(
411 Some(client_id),
412 aetheris_protocol::events::GameEvent::Possession { network_id },
413 );
414
415 tracing::info!(
416 ?client_id,
417 network_id = network_id.0,
418 "[StartSession] Session ship spawned — Possession sent"
419 );
420 }
421 }
422 NetworkEvent::ClearWorld { .. } => {
423 if can_run_playground_command(jti) {
424 tracing::info!(?client_id, "ClearWorld command executed");
425 world.clear_world();
426 if let Some((_, ids)) =
431 self.authenticated_clients.get_mut(&client_id)
432 {
433 ids.clear();
434 }
435 clear_ack_targets.push(client_id);
440 } else {
441 tracing::warn!(?client_id, "Unauthorized ClearWorld attempt");
442 metrics::counter!("aetheris_unprivileged_packets_total")
443 .increment(1);
444 }
445 }
446 NetworkEvent::RequestSystemManifest { .. } => {
447 let jti = if let Some((jti, _)) =
448 self.authenticated_clients.get(&client_id)
449 {
450 jti
451 } else {
452 ""
453 };
454
455 let manifest = self.get_filtered_manifest(jti);
456 world.queue_reliable_event(
457 Some(client_id),
458 aetheris_protocol::events::GameEvent::SystemManifest { manifest },
459 );
460 }
461 _ => {
462 tracing::trace!(?protocol_event, "Protocol event");
463 }
464 }
465 } else {
466 match encoder.decode(&raw_data) {
468 Ok(update) => updates.push((client_id, update)),
469 Err(e) => {
470 metrics::counter!("aetheris_decode_errors_total").increment(1);
471 error!(
472 error = ?e,
473 size = raw_data.len(),
474 "Failed to decode update (not a protocol event)"
475 );
476 }
477 }
478 }
479 }
480 world.apply_updates(&updates);
481 self.reassembler.prune();
482 }
483 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "apply")
484 .record(t2.elapsed().as_secs_f64());
485
486 for target in clear_ack_targets {
491 let ack = NetworkEvent::ClearWorld { client_id: target };
492 #[allow(clippy::collapsible_if)]
493 if let Ok(data) = encoder.encode_event(&ack) {
494 if let Err(e) = transport.send_reliable(target, &data).await {
495 tracing::warn!(client_id = ?target, error = ?e, "Failed to send ClearWorld ack");
496 }
497 }
498 }
499
500 if let Some(pongs) = pong_responses {
504 for (client_id, p_tick, received_at) in pongs {
505 let pong_event = NetworkEvent::Pong { tick: p_tick };
506 if let Ok(data) = encoder.encode_event(&pong_event) {
507 let dispatch_start = Instant::now();
511 match transport.send_unreliable(client_id, &data).await {
512 Ok(()) => {
513 let dispatch_ms = dispatch_start.elapsed().as_secs_f64() * 1000.0;
514 let server_hold_ms = received_at.elapsed().as_secs_f64() * 1000.0;
515 metrics::histogram!("aetheris_server_pong_dispatch_ms")
516 .record(dispatch_ms);
517 metrics::histogram!("aetheris_server_ping_hold_ms")
518 .record(server_hold_ms);
519 }
520 Err(e) => {
521 error!(error = ?e, client_id = ?client_id, "Failed to send Pong");
522 }
523 }
524 }
525 }
526 }
527
528 let t3 = Instant::now();
530 {
531 let _span = debug_span!("stage3_simulate").entered();
532 world.simulate();
534 }
535 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "simulate")
536 .record(t3.elapsed().as_secs_f64());
537
538 let t4 = Instant::now();
540 let (deltas, reliable_events) = {
541 let _span = debug_span!("stage4_extract").entered();
542 (world.extract_deltas(), world.extract_reliable_events())
543 };
544 world.post_extract();
546 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "extract")
547 .record(t4.elapsed().as_secs_f64());
548
549 let t5 = Instant::now();
551
552 for (target, wire_event) in reliable_events {
554 let targets: Vec<_> = if let Some(id) = target {
556 vec![id]
557 } else {
558 self.authenticated_clients.keys().copied().collect()
559 };
560
561 for id in targets {
562 let network_event = wire_event.clone().into_network_event(id);
563 match encoder.encode_event(&network_event) {
564 Ok(data) => {
565 if let Err(e) = transport.send_reliable(id, &data).await {
566 error!(error = ?e, client_id = ?id, "Failed to send reliable event");
567 }
568 }
569 Err(e) => {
570 error!(error = ?e, client_id = ?id, "Failed to encode reliable event");
571 }
572 }
573 }
574 }
575
576 if !deltas.is_empty() {
577 let mut broadcast_count: u64 = 0;
578
579 let stage_span = debug_span!("stage5_send", count = deltas.len());
580 let _guard = stage_span.enter();
581
582 for delta in deltas {
583 let encode_result = encoder.encode(&delta, encode_buffer);
584 match encode_result {
585 Ok(len) if len > aetheris_protocol::MAX_SAFE_PAYLOAD_SIZE => {
586 let targets = Self::get_delta_targets(
587 world,
588 &self.authenticated_clients,
589 delta.network_id,
590 );
591 match self
592 .fragment_and_send(encode_buffer, len, &targets, encoder, transport)
593 .await
594 {
595 Ok(count) => broadcast_count += count,
596 Err(e) => error!(error = ?e, "Failed to fragment and broadcast delta"),
597 }
598 }
599 Ok(len) => {
600 let targets = Self::get_delta_targets(
601 world,
602 &self.authenticated_clients,
603 delta.network_id,
604 );
605 if targets.is_empty() {
606 if let Err(e) =
607 transport.broadcast_unreliable(&encode_buffer[..len]).await
608 {
609 error!(error = ?e, "Failed to broadcast delta");
610 } else {
611 broadcast_count += 1;
612 }
613 } else {
614 for target in targets {
615 if let Err(e) = transport
616 .send_unreliable(target, &encode_buffer[..len])
617 .await
618 {
619 error!(error = ?e, "Failed to send delta");
620 } else {
621 broadcast_count += 1;
622 }
623 }
624 }
625 }
626 Err(EncodeError::BufferOverflow {
627 needed,
628 available: _,
629 }) => {
630 let mut large_buffer = vec![0u8; needed];
631 if let Ok(len) = encoder.encode(&delta, &mut large_buffer) {
632 let targets = Self::get_delta_targets(
633 world,
634 &self.authenticated_clients,
635 delta.network_id,
636 );
637 match self
638 .fragment_and_send(&large_buffer, len, &targets, encoder, transport)
639 .await
640 {
641 Ok(count) => broadcast_count += count,
642 Err(e) => {
643 error!(error = ?e, "Failed to fragment and broadcast large delta");
644 }
645 }
646 } else {
647 error!("Failed to encode into large scratch buffer");
648 }
649 }
650 Err(e) => {
651 metrics::counter!("aetheris_encode_errors_total").increment(1);
652 error!(
653 network_id = ?delta.network_id,
654 error = ?e,
655 "Failed to encode delta"
656 );
657 }
658 }
659 }
660 metrics::counter!("aetheris_packets_outbound_total").increment(broadcast_count);
661 metrics::counter!("aetheris_packets_broadcast_total").increment(broadcast_count);
662 }
663 metrics::histogram!("aetheris_stage_duration_seconds", "stage" => "send")
664 .record(t5.elapsed().as_secs_f64());
665
666 self.current_tick += 1;
668 }
669
670 fn get_delta_targets(
671 world: &mut dyn WorldState,
672 clients: &HashMap<
673 aetheris_protocol::types::ClientId,
674 (String, Vec<aetheris_protocol::types::NetworkId>),
675 >,
676 entity_id: aetheris_protocol::types::NetworkId,
677 ) -> Vec<aetheris_protocol::types::ClientId> {
678 if let Some(room_id) = world.get_entity_room(entity_id) {
679 let mut targets = Vec::new();
680 for &client_id in clients.keys() {
681 if world.get_client_room(client_id) == Some(room_id) {
682 targets.push(client_id);
683 }
684 }
685 targets
686 } else {
687 Vec::new() }
689 }
690
691 async fn fragment_and_send(
692 &mut self,
693 data: &[u8],
694 len: usize,
695 targets: &[aetheris_protocol::types::ClientId],
696 encoder: &dyn Encoder,
697 transport: &dyn GameTransport,
698 ) -> Result<u64, EncodeError> {
699 let message_id = self.next_message_id;
700 self.next_message_id = self.next_message_id.wrapping_add(1);
701
702 let chunk_size = aetheris_protocol::MAX_FRAGMENT_PAYLOAD_SIZE;
703 let chunks: Vec<_> = data[..len].chunks(chunk_size).collect();
704
705 let Ok(total_fragments) = u16::try_from(chunks.len()) else {
706 error!(
707 message_id,
708 chunks = chunks.len(),
709 "Too many fragments required for message; dropping payload"
710 );
711 return Err(EncodeError::Io(std::io::Error::new(
712 std::io::ErrorKind::InvalidData,
713 "Too many fragments",
714 )));
715 };
716
717 let mut sent_count = 0;
718 for (i, chunk) in chunks.into_iter().enumerate() {
719 let Ok(fragment_index) = u16::try_from(i) else {
720 error!(message_id, index = i, "Fragment index overflow; stopping");
721 break;
722 };
723
724 let fragment = FragmentedEvent {
725 message_id,
726 fragment_index,
727 total_fragments,
728 payload: chunk.to_vec(),
729 };
730 let fragment_event = NetworkEvent::Fragment {
731 client_id: aetheris_protocol::types::ClientId(0),
732 fragment,
733 };
734
735 match encoder.encode_event(&fragment_event) {
736 Ok(encoded_fragment) => {
737 if targets.is_empty() {
738 if let Err(e) = transport.broadcast_unreliable(&encoded_fragment).await {
739 error!(error = ?e, "Failed to broadcast fragment");
740 } else {
741 sent_count += 1;
742 }
743 } else {
744 for &target in targets {
745 if let Err(e) =
746 transport.send_unreliable(target, &encoded_fragment).await
747 {
748 error!(error = ?e, "Failed to send fragment");
749 } else {
750 sent_count += 1;
751 }
752 }
753 }
754 }
755 Err(e) => {
756 error!(error = ?e, "Failed to encode fragment event");
757 }
758 }
759 }
760
761 Ok(sent_count)
762 }
763
764 fn get_filtered_manifest(&self, jti: &str) -> BTreeMap<String, String> {
765 let mut manifest = BTreeMap::new();
766 manifest.insert(
767 "version_server".to_string(),
768 env!("CARGO_PKG_VERSION").to_string(),
769 );
770 manifest.insert(
771 "version_protocol".to_string(),
772 aetheris_protocol::VERSION.to_string(),
773 );
774
775 if can_run_playground_command(jti) {
776 manifest.insert("tick_rate".to_string(), self.tick_rate.to_string());
777 manifest.insert(
778 "clients_active".to_string(),
779 self.authenticated_clients.len().to_string(),
780 );
781 }
782 manifest
783 }
784}
785
/// Decides whether a session may run playground commands.
///
/// Returns `true` when `jti` is exactly `"admin"`, or when the `AETHERIS_ENV`
/// environment variable is set to exactly `"dev"`. The environment is read on
/// every call, and only when `jti` is not `"admin"`.
fn can_run_playground_command(jti: &str) -> bool {
    if jti == "admin" {
        return true;
    }
    matches!(std::env::var("AETHERIS_ENV").as_deref(), Ok("dev"))
}