1use super::{BackrollError, BackrollResult, Player, PlayerHandle};
2use crate::{
3 command::{Command, Commands},
4 input::FrameInput,
5 is_null,
6 protocol::{ConnectionStatus, Event as ProtocolEvent, Peer, PeerConfig},
7 sync::{self, Sync},
8 transport::Peer as TransportPeer,
9 Config, Event, Frame, NetworkStats, MAX_PLAYERS,
10};
11use async_channel::TryRecvError;
12use parking_lot::RwLock;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::debug;
16
17const RECOMMENDATION_INTERVAL: Frame = 240;
18const DEFAULT_FRAME_DELAY: Frame = 3;
19const DEFAULT_DISCONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
20const DEFAULT_DISCONNECT_NOTIFY_START: Duration = Duration::from_millis(750);
21
22enum PlayerType<T>
23where
24 T: Config,
25{
26 Local,
27 Remote {
28 peer: Box<Peer<T>>,
29 rx: async_channel::Receiver<ProtocolEvent<T::Input>>,
30 },
31}
32
33impl<T: Config> Clone for PlayerType<T> {
34 fn clone(&self) -> Self {
35 match self {
36 Self::Local => Self::Local,
37 Self::Remote { peer, rx } => Self::Remote {
38 peer: peer.clone(),
39 rx: rx.clone(),
40 },
41 }
42 }
43}
44
45impl<T: Config> PlayerType<T> {
46 pub fn new(
47 queue: usize,
48 player: &Player,
49 builder: &P2PSessionBuilder<T>,
50 connect: Arc<[RwLock<ConnectionStatus>]>,
51 ) -> Self {
52 match player {
53 Player::Local => Self::Local,
54 Player::Remote(peer) => {
55 let (peer, rx) = Self::make_peer(queue, peer, builder, connect);
56 PlayerType::<T>::Remote {
57 peer: Box::new(peer),
58 rx,
59 }
60 }
61 }
62 }
63
64 fn make_peer(
65 queue: usize,
66 peer: &TransportPeer,
67 builder: &P2PSessionBuilder<T>,
68 connect: Arc<[RwLock<ConnectionStatus>]>,
69 ) -> (Peer<T>, async_channel::Receiver<ProtocolEvent<T::Input>>) {
70 let config = PeerConfig {
71 peer: peer.clone(),
72 disconnect_timeout: builder.disconnect_timeout,
73 disconnect_notify_start: builder.disconnect_notify_start,
74 };
75
76 Peer::<T>::new(queue, config, connect)
77 }
78
79 pub fn peer(&self) -> Option<&Peer<T>> {
80 match self {
81 Self::Local => None,
82 Self::Remote { ref peer, .. } => Some(peer),
83 }
84 }
85
86 pub fn is_local(&self) -> bool {
87 self.peer().is_none()
88 }
89
90 pub fn is_remote_player(&self) -> bool {
91 matches!(self, Self::Remote { .. })
92 }
93
94 pub fn is_synchronized(&self) -> bool {
95 if let Some(peer) = self.peer() {
96 peer.is_running()
97 } else {
98 true
99 }
100 }
101
102 pub fn send_input(&mut self, input: FrameInput<T::Input>) {
103 if let Some(peer) = self.peer() {
104 let _ = peer.send_input(input);
105 }
106 }
107
108 pub fn disconnect(&mut self) {
109 if let Some(peer) = self.peer() {
110 peer.disconnect();
111 }
112 }
113
114 pub fn get_network_stats(&self) -> Option<NetworkStats> {
115 self.peer().map(|peer| peer.get_network_stats())
116 }
117}
118
119pub struct P2PSessionBuilder<T>
123where
124 T: Config,
125{
126 players: Vec<Player>,
127 frame_delay: Frame,
128 disconnect_timeout: Duration,
129 disconnect_notify_start: Duration,
130 marker_: std::marker::PhantomData<T>,
131}
132
133impl<T> Default for P2PSessionBuilder<T>
134where
135 T: Config,
136{
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl<T> P2PSessionBuilder<T>
143where
144 T: Config,
145{
146 pub fn new() -> Self {
150 Self {
151 players: Vec::new(),
152 frame_delay: DEFAULT_FRAME_DELAY,
153 disconnect_timeout: DEFAULT_DISCONNECT_TIMEOUT,
154 disconnect_notify_start: DEFAULT_DISCONNECT_NOTIFY_START,
155 marker_: Default::default(),
156 }
157 }
158
159 pub fn with_frame_delay(mut self, frame_delay: Frame) -> Self {
162 self.frame_delay = frame_delay;
163 self
164 }
165
166 pub fn with_disconnect_timeout(mut self, timeout: Duration) -> Self {
169 self.disconnect_timeout = timeout;
170 self
171 }
172
173 pub fn with_disconnect_notify_start(mut self, timeout: Duration) -> Self {
178 self.disconnect_timeout = timeout;
179 self
180 }
181
182 pub fn add_player(&mut self, player: Player) -> PlayerHandle {
184 let id = self.players.len();
185 self.players.push(player);
186 PlayerHandle(id)
187 }
188
189 pub fn start(self) -> BackrollResult<P2PSession<T>> {
197 P2PSession::new_internal(self)
198 }
199}
200
201struct P2PSessionRef<T>
202where
203 T: Config,
204{
205 sync: Sync<T>,
206 players: Vec<PlayerType<T>>,
207
208 synchronizing: bool,
209 next_recommended_sleep: Frame,
210
211 local_connect_status: Arc<[RwLock<ConnectionStatus>]>,
212}
213
214impl<T: Config> P2PSessionRef<T> {
215 fn players(&self) -> impl Iterator<Item = &Peer<T>> {
216 self.players
217 .iter()
218 .filter(|player| player.is_remote_player())
219 .filter_map(|player| player.peer())
220 }
221
222 fn player_handle_to_queue(&self, player: PlayerHandle) -> BackrollResult<usize> {
223 let offset = player.0;
224 if offset >= self.sync.player_count() {
225 return Err(BackrollError::InvalidPlayer(player));
226 }
227 Ok(offset)
228 }
229
230 fn check_initial_sync(&mut self, commands: &mut Commands<T>) {
231 if self.synchronizing && self.is_synchronized() {
232 commands.push(Command::Event(Event::Running));
233 self.synchronizing = false;
234 }
235 }
236
237 fn disconnect_player(
238 &mut self,
239 commands: &mut Commands<T>,
240 player: PlayerHandle,
241 ) -> BackrollResult<()> {
242 let queue = self.player_handle_to_queue(player)?;
243 let (last_frame, disconnected) = {
244 let status = self.local_connect_status[queue].read();
245 (status.last_frame, status.disconnected)
246 };
247
248 if disconnected {
249 return Err(BackrollError::PlayerDisconnected(player));
250 }
251
252 if self.players[queue].is_local() {
253 let current_frame = self.sync.frame_count();
257 debug!(
258 "Disconnecting local player {} at frame {} by user request.",
259 queue, last_frame
260 );
261 for i in 0..self.players.len() {
262 if !self.players[i].is_local() {
263 self.disconnect_player_queue(commands, i, current_frame);
264 }
265 }
266 } else {
267 debug!(
268 "Disconnecting queue {} at frame {} by user request.",
269 queue, last_frame
270 );
271 self.disconnect_player_queue(commands, queue, last_frame);
272 }
273 Ok(())
274 }
275
276 fn disconnect_player_queue(&mut self, commands: &mut Commands<T>, queue: usize, syncto: Frame) {
277 let frame_count = self.sync.frame_count();
278
279 self.players[queue].disconnect();
280
281 debug!("Changing queue {} local connect status for last frame from {} to {} on disconnect request (current: {}).",
282 queue, self.local_connect_status[queue].read().last_frame, syncto, frame_count);
283
284 {
285 let mut status = self.local_connect_status[queue].write();
286 status.disconnected = true;
287 status.last_frame = syncto;
288 }
289
290 if syncto < frame_count {
291 debug!(
292 "Adjusting simulation to account for the fact that {} disconnected @ {}.",
293 queue, syncto
294 );
295 self.sync.adjust_simulation(commands, syncto);
296 debug!("Finished adjusting simulation.");
297 }
298
299 commands.push(Command::Event(Event::Disconnected(PlayerHandle(queue))));
300
301 self.check_initial_sync(commands);
302 }
303
304 fn flush_events(&mut self, commands: &mut Commands<T>) {
305 for (queue, player) in self.players.clone().iter().enumerate() {
306 if let PlayerType::<T>::Remote { rx, .. } = player {
307 self.flush_peer_events(commands, queue, rx.clone());
308 }
309 }
310 }
311
312 fn flush_peer_events(
313 &mut self,
314 commands: &mut Commands<T>,
315 queue: usize,
316 rx: async_channel::Receiver<ProtocolEvent<T::Input>>,
317 ) {
318 loop {
319 match rx.try_recv() {
320 Ok(evt) => self.handle_event(commands, queue, evt),
321 Err(TryRecvError::Empty) => break,
322 Err(TryRecvError::Closed) => {
323 self.disconnect_player(commands, PlayerHandle(queue))
324 .expect("Disconnecting should not error on closing connection");
325 break;
326 }
327 }
328 }
329 }
330
331 fn handle_event(
332 &mut self,
333 commands: &mut Commands<T>,
334 queue: usize,
335 evt: ProtocolEvent<T::Input>,
336 ) {
337 let player = PlayerHandle(queue);
338 match evt {
339 ProtocolEvent::<T::Input>::Connected => {
340 commands.push(Command::Event(Event::Connected(PlayerHandle(queue))));
341 }
342 ProtocolEvent::<T::Input>::Synchronizing { total, count } => {
343 commands.push(Command::Event(Event::Synchronizing {
344 player,
345 total,
346 count,
347 }));
348 }
349 ProtocolEvent::<T::Input>::Inputs(inputs) => {
350 let mut status = self.local_connect_status[queue].write();
351 if status.disconnected {
352 return;
353 }
354
355 for input in inputs {
356 let current_remote_frame = status.last_frame;
357 let new_remote_frame = input.frame;
358 debug_assert!(
359 crate::is_null(current_remote_frame)
360 || new_remote_frame == (current_remote_frame + 1)
361 );
362 self.sync.add_remote_input(queue, input);
363
364 debug!(
366 "setting remote connect status for queue {} to {}",
367 queue, new_remote_frame
368 );
369
370 status.last_frame = new_remote_frame;
371 }
372 }
373 ProtocolEvent::<T::Input>::Synchronized => {
374 commands.push(Command::Event(Event::Synchronized(player)));
375 self.check_initial_sync(commands);
376 }
377 ProtocolEvent::<T::Input>::NetworkInterrupted { disconnect_timeout } => {
378 commands.push(Command::Event(Event::ConnectionInterrupted {
379 player,
380 disconnect_timeout,
381 }));
382 }
383 ProtocolEvent::<T::Input>::NetworkResumed => {
384 commands.push(Command::Event(Event::Synchronized(player)));
385 }
386 }
387 }
388
389 fn do_poll(&mut self, commands: &mut Commands<T>) {
390 if self.sync.in_rollback() {
391 return;
392 }
393
394 self.flush_events(commands);
395
396 if self.synchronizing {
397 return;
398 }
399
400 self.sync.check_simulation(commands);
401
402 let current_frame = self.sync.frame_count();
405 for player in self.players() {
406 player.set_local_frame_number(current_frame);
407 }
408
409 let remote_player_count = self
410 .players
411 .iter()
412 .filter(|player| !player.is_local())
413 .count();
414
415 let min_frame = if remote_player_count == 0 {
416 current_frame
417 } else if self.players().count() <= 2 {
418 self.poll_2_players(commands)
419 } else {
420 self.poll_n_players(commands)
421 };
422
423 debug!("last confirmed frame in p2p backend is {}.", min_frame);
424 if min_frame >= 0 {
425 debug_assert!(min_frame != Frame::MAX);
426 debug!("setting confirmed frame in sync to {}.", min_frame);
427 self.sync.set_last_confirmed_frame(min_frame);
428 }
429
430 if current_frame > self.next_recommended_sleep {
432 let interval = self
433 .players()
434 .map(|player| player.recommend_frame_delay())
435 .max();
436 if let Some(interval) = interval {
437 commands.push(Command::Event(Event::TimeSync {
438 frames_ahead: interval as u8,
439 }));
440 self.next_recommended_sleep = current_frame + RECOMMENDATION_INTERVAL;
441 }
442 }
443 }
444
445 fn poll_2_players(&mut self, commands: &mut Commands<T>) -> Frame {
446 let mut min_frame = Frame::MAX;
448 for i in 0..self.players.len() {
449 let player = &self.players[i];
450 let mut queue_connected = true;
451 if let Some(peer) = player.peer() {
452 if peer.is_running() {
453 queue_connected = !peer.get_peer_connect_status(i).disconnected;
454 }
455 }
456 let local_status = self.local_connect_status[i].read().clone();
457 if !local_status.disconnected {
458 min_frame = std::cmp::min(local_status.last_frame, min_frame);
459 }
460 debug!(
461 "local endp: connected = {}, last_received = {}, total_min_confirmed = {}.",
462 !local_status.disconnected, local_status.last_frame, min_frame
463 );
464 if !queue_connected && !local_status.disconnected {
465 debug!("disconnecting player {} by remote request.", i);
466 self.disconnect_player_queue(commands, i, min_frame);
467 }
468 debug!("min_frame = {}.", min_frame);
469 }
470 min_frame
471 }
472
473 fn poll_n_players(&mut self, commands: &mut Commands<T>) -> Frame {
474 let mut min_frame = Frame::MAX;
476 for queue in 0..self.players.len() {
477 let mut queue_connected = true;
478 let mut queue_min_confirmed = Frame::MAX;
479 debug!("considering queue {}.", queue);
480 for (i, player) in self.players.iter().enumerate() {
481 if player.peer().map(|peer| peer.is_running()).unwrap_or(false) {
485 let peer = player.peer().unwrap();
486 let status = peer.get_peer_connect_status(queue);
487 queue_connected = queue_connected && !status.disconnected;
488 queue_min_confirmed = std::cmp::min(status.last_frame, queue_min_confirmed);
489 debug!("endpoint {}: connected = {}, last_received = {}, queue_min_confirmed = {}.",
490 i, queue_connected, status.last_frame, queue_min_confirmed);
491 } else {
492 debug!("endpoint {}: ignoring... not running.", i);
493 }
494 }
495
496 let local_status = self.local_connect_status[queue].read().clone();
497 if !local_status.disconnected {
499 queue_min_confirmed = std::cmp::min(local_status.last_frame, queue_min_confirmed);
500 }
501 debug!(
502 "local endp: connected = {}, last_received = {}, queue_min_confirmed = {}.",
503 !local_status.disconnected, local_status.last_frame, queue_min_confirmed
504 );
505
506 if queue_connected {
507 min_frame = std::cmp::min(queue_min_confirmed, min_frame);
508 } else {
509 if !local_status.disconnected || local_status.last_frame > queue_min_confirmed {
513 debug!("disconnecting queue {} by remote request.", queue);
514 self.disconnect_player_queue(commands, queue, queue_min_confirmed);
515 }
516 }
517 debug!("min_frame = {}.", min_frame);
518 }
519 min_frame
520 }
521
522 fn is_synchronized(&self) -> bool {
523 for (i, player) in self.players.iter().enumerate() {
526 if !player.is_local()
527 && !player.is_synchronized()
528 && !self.local_connect_status[i].read().disconnected
529 {
530 return false;
531 }
532 }
533 true
534 }
535}
536
537pub struct P2PSession<T>(Arc<RwLock<P2PSessionRef<T>>>)
542where
543 T: Config;
544
545impl<T: Config> Clone for P2PSession<T> {
546 fn clone(&self) -> Self {
547 Self(self.0.clone())
548 }
549}
550
551impl<T: Config> P2PSession<T> {
552 pub fn build() -> P2PSessionBuilder<T> {
553 P2PSessionBuilder::new()
554 }
555
556 fn new_internal(builder: P2PSessionBuilder<T>) -> BackrollResult<Self> {
557 let local_player_count = builder
558 .players
559 .iter()
560 .filter(|player| player.is_local())
561 .count();
562 let remote_player_count = builder.players.len() - local_player_count;
563
564 if local_player_count > 1 && remote_player_count > 1 {
565 return Err(BackrollError::MultipleLocalPlayers);
566 }
567 let player_count = builder.players.len();
568 let connect_status: Vec<RwLock<ConnectionStatus>> =
569 (0..player_count).map(|_| Default::default()).collect();
570 let connect_status: Arc<[RwLock<ConnectionStatus>]> = connect_status.into();
571
572 let players: Vec<PlayerType<T>> = builder
573 .players
574 .iter()
575 .enumerate()
576 .map(|(i, player)| PlayerType::<T>::new(i, player, &builder, connect_status.clone()))
577 .collect();
578
579 let synchronizing = players.iter().any(|player| !player.is_local());
580 let config = sync::PlayerConfig {
581 player_count,
582 frame_delay: builder.frame_delay,
583 };
584 let sync = Sync::<T>::new(config, connect_status.clone());
585 Ok(Self(Arc::new(RwLock::new(P2PSessionRef::<T> {
586 sync,
587 players,
588 synchronizing,
589 next_recommended_sleep: 0,
590 local_connect_status: connect_status,
591 }))))
592 }
593
594 pub fn player_count(&self) -> usize {
597 self.0.read().sync.player_count()
598 }
599
600 pub fn in_rollback(&self) -> bool {
602 self.0.read().sync.in_rollback()
603 }
604
605 pub fn current_frame(&self) -> Frame {
607 self.0.read().sync.frame_count()
608 }
609
610 pub fn local_players(&self) -> smallvec::SmallVec<[PlayerHandle; MAX_PLAYERS]> {
611 self.0
612 .read()
613 .players
614 .iter()
615 .enumerate()
616 .filter(|(_, player)| player.is_local())
617 .map(|(i, _)| PlayerHandle(i))
618 .collect()
619 }
620
621 pub fn remote_players(&self) -> smallvec::SmallVec<[PlayerHandle; MAX_PLAYERS]> {
622 self.0
623 .read()
624 .players
625 .iter()
626 .enumerate()
627 .filter(|(_, player)| player.is_remote_player())
628 .map(|(i, _)| PlayerHandle(i))
629 .collect()
630 }
631
632 pub fn is_synchronized(&self) -> bool {
635 self.0.read().is_synchronized()
636 }
637
638 pub fn add_local_input(&self, player: PlayerHandle, input: T::Input) -> BackrollResult<()> {
661 let mut session_ref = self.0.write();
662 if session_ref.sync.in_rollback() {
663 return Err(BackrollError::InRollback);
664 }
665 if session_ref.synchronizing {
666 return Err(BackrollError::NotSynchronized);
667 }
668
669 let queue = session_ref.player_handle_to_queue(player)?;
670 assert!(
671 session_ref.players[queue].is_local(),
672 "{:?} is not a local player!",
673 player
674 );
675 let frame = session_ref.sync.add_local_input(queue, input)?;
676 if !is_null(frame) {
677 debug!(
682 "setting local connect status for local queue {} to {}",
683 queue, frame
684 );
685 session_ref.local_connect_status[queue].write().last_frame = frame;
686
687 for player in session_ref.players.iter_mut() {
688 player.send_input(FrameInput::<T::Input> { frame, input });
689 }
690 }
691
692 Ok(())
693 }
694
695 pub fn advance_frame(&self) -> Commands<T> {
707 let mut session_ref = self.0.write();
708 let mut commands = Commands::<T>::default();
709 debug!("End of frame ({})...", session_ref.sync.frame_count());
710 if !session_ref.synchronizing {
711 session_ref.sync.increment_frame(&mut commands);
712 }
713 session_ref.do_poll(&mut commands);
714 commands
715 }
716
717 pub fn poll(&self) -> Commands<T> {
723 let mut session_ref = self.0.write();
724 let mut commands = Commands::default();
725 session_ref.do_poll(&mut commands);
726 commands
727 }
728
729 pub fn disconnect_player(&self, player: PlayerHandle) -> BackrollResult<Commands<T>> {
741 let mut session_ref = self.0.write();
742 let queue = session_ref.player_handle_to_queue(player)?;
743 if session_ref.local_connect_status[queue].read().disconnected {
744 return Err(BackrollError::PlayerDisconnected(player));
745 }
746
747 let mut commands = Commands::<T>::default();
748 let last_frame = session_ref.local_connect_status[queue].read().last_frame;
749 if session_ref.players[queue].is_local() {
750 let current_frame = session_ref.sync.frame_count();
754 debug!(
755 "Disconnecting local player {} at frame {} by user request.",
756 queue, last_frame
757 );
758 for i in 0..session_ref.players.len() {
759 if !session_ref.players[i].is_local() {
760 session_ref.disconnect_player_queue(&mut commands, i, current_frame);
761 }
762 }
763 } else {
764 debug!(
765 "Disconnecting queue {} at frame {} by user request.",
766 queue, last_frame
767 );
768 session_ref.disconnect_player_queue(&mut commands, queue, last_frame);
769 }
770 Ok(commands)
771 }
772
773 pub fn get_network_stats(&self, player: PlayerHandle) -> BackrollResult<NetworkStats> {
779 let session_ref = self.0.read();
780 let queue = session_ref.player_handle_to_queue(player)?;
781 Ok(session_ref.players[queue]
782 .get_network_stats()
783 .unwrap_or_default())
784 }
785
786 pub fn set_frame_delay(&self, player: PlayerHandle, delay: Frame) -> BackrollResult<()> {
792 let mut session_ref = self.0.write();
793 let queue = session_ref.player_handle_to_queue(player)?;
794 session_ref.sync.set_frame_delay(queue, delay);
795 Ok(())
796 }
797}