backroll/backend/
p2p.rs

1use super::{BackrollError, BackrollResult, Player, PlayerHandle};
2use crate::{
3    command::{Command, Commands},
4    input::FrameInput,
5    is_null,
6    protocol::{ConnectionStatus, Event as ProtocolEvent, Peer, PeerConfig},
7    sync::{self, Sync},
8    transport::Peer as TransportPeer,
9    Config, Event, Frame, NetworkStats, MAX_PLAYERS,
10};
11use async_channel::TryRecvError;
12use parking_lot::RwLock;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::debug;
16
/// Number of frames added to the current frame to schedule the next
/// `Event::TimeSync` recommendation after one has been issued.
const RECOMMENDATION_INTERVAL: Frame = 240;
/// Frame delay a freshly created `P2PSessionBuilder` starts with.
const DEFAULT_FRAME_DELAY: Frame = 3;
/// Disconnect timeout a freshly created `P2PSessionBuilder` starts with (5000 ms).
const DEFAULT_DISCONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
/// Disconnect-notification delay a freshly created `P2PSessionBuilder` starts with (750 ms).
const DEFAULT_DISCONNECT_NOTIFY_START: Duration = Duration::from_millis(750);
21
/// Session-side representation of a single player slot.
enum PlayerType<T>
where
    T: Config,
{
    /// A local player; it has no protocol peer attached.
    Local,
    /// A remote player, reached through a protocol peer.
    Remote {
        /// Protocol endpoint used to talk to the remote player.
        peer: Box<Peer<T>>,
        /// Receiving end of the channel returned alongside the peer by
        /// `Peer::new`; the session drains protocol events from it.
        rx: async_channel::Receiver<ProtocolEvent<T::Input>>,
    },
}
32
33impl<T: Config> Clone for PlayerType<T> {
34    fn clone(&self) -> Self {
35        match self {
36            Self::Local => Self::Local,
37            Self::Remote { peer, rx } => Self::Remote {
38                peer: peer.clone(),
39                rx: rx.clone(),
40            },
41        }
42    }
43}
44
45impl<T: Config> PlayerType<T> {
46    pub fn new(
47        queue: usize,
48        player: &Player,
49        builder: &P2PSessionBuilder<T>,
50        connect: Arc<[RwLock<ConnectionStatus>]>,
51    ) -> Self {
52        match player {
53            Player::Local => Self::Local,
54            Player::Remote(peer) => {
55                let (peer, rx) = Self::make_peer(queue, peer, builder, connect);
56                PlayerType::<T>::Remote {
57                    peer: Box::new(peer),
58                    rx,
59                }
60            }
61        }
62    }
63
64    fn make_peer(
65        queue: usize,
66        peer: &TransportPeer,
67        builder: &P2PSessionBuilder<T>,
68        connect: Arc<[RwLock<ConnectionStatus>]>,
69    ) -> (Peer<T>, async_channel::Receiver<ProtocolEvent<T::Input>>) {
70        let config = PeerConfig {
71            peer: peer.clone(),
72            disconnect_timeout: builder.disconnect_timeout,
73            disconnect_notify_start: builder.disconnect_notify_start,
74        };
75
76        Peer::<T>::new(queue, config, connect)
77    }
78
79    pub fn peer(&self) -> Option<&Peer<T>> {
80        match self {
81            Self::Local => None,
82            Self::Remote { ref peer, .. } => Some(peer),
83        }
84    }
85
86    pub fn is_local(&self) -> bool {
87        self.peer().is_none()
88    }
89
90    pub fn is_remote_player(&self) -> bool {
91        matches!(self, Self::Remote { .. })
92    }
93
94    pub fn is_synchronized(&self) -> bool {
95        if let Some(peer) = self.peer() {
96            peer.is_running()
97        } else {
98            true
99        }
100    }
101
102    pub fn send_input(&mut self, input: FrameInput<T::Input>) {
103        if let Some(peer) = self.peer() {
104            let _ = peer.send_input(input);
105        }
106    }
107
108    pub fn disconnect(&mut self) {
109        if let Some(peer) = self.peer() {
110            peer.disconnect();
111        }
112    }
113
114    pub fn get_network_stats(&self) -> Option<NetworkStats> {
115        self.peer().map(|peer| peer.get_network_stats())
116    }
117}
118
/// A builder for [P2PSession].
///
/// [P2PSession]: self::P2PSession
pub struct P2PSessionBuilder<T>
where
    T: Config,
{
    /// Players registered so far. A player's index in this list is the value
    /// of the `PlayerHandle` returned when it was added.
    players: Vec<Player>,
    /// Frame delay passed to the session's `Sync` when the session is built.
    frame_delay: Frame,
    /// Copied into the `PeerConfig::disconnect_timeout` of every remote peer.
    disconnect_timeout: Duration,
    /// Copied into the `PeerConfig::disconnect_notify_start` of every remote peer.
    disconnect_notify_start: Duration,
    /// Ties the builder to its `Config` type without storing a value of it.
    marker_: std::marker::PhantomData<T>,
}
132
133impl<T> Default for P2PSessionBuilder<T>
134where
135    T: Config,
136{
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142impl<T> P2PSessionBuilder<T>
143where
144    T: Config,
145{
146    /// Creates a new builder. Identical to [P2PSession::build].
147    ///
148    /// [P2PSession]: self::P2PSession
149    pub fn new() -> Self {
150        Self {
151            players: Vec::new(),
152            frame_delay: DEFAULT_FRAME_DELAY,
153            disconnect_timeout: DEFAULT_DISCONNECT_TIMEOUT,
154            disconnect_notify_start: DEFAULT_DISCONNECT_NOTIFY_START,
155            marker_: Default::default(),
156        }
157    }
158
159    /// Sets how much frame delay is used for all active players.
160    /// Defaults to 3 frames.
161    pub fn with_frame_delay(mut self, frame_delay: Frame) -> Self {
162        self.frame_delay = frame_delay;
163        self
164    }
165
166    /// Sets how long the client will wait for a packet from a remote player
167    /// before considering the connection disconnected. Defaults to 5000ms.
168    pub fn with_disconnect_timeout(mut self, timeout: Duration) -> Self {
169        self.disconnect_timeout = timeout;
170        self
171    }
172
173    /// Sets how long the client will wait for a packet from a remote player before
174    /// before firing a [Event::ConnectionInterrupted] event. Defaults to 750ms.
175    ///
176    /// [Event]: crate::Event
177    pub fn with_disconnect_notify_start(mut self, timeout: Duration) -> Self {
178        self.disconnect_timeout = timeout;
179        self
180    }
181
182    /// Adds a player to the session and returns the corresponding handle.
183    pub fn add_player(&mut self, player: Player) -> PlayerHandle {
184        let id = self.players.len();
185        self.players.push(player);
186        PlayerHandle(id)
187    }
188
189    /// Constructs and starts the P2PSession. Consumes the builder.
190    ///
191    /// # Errors
192    /// Returns [BackrollError::MultipleLocalPlayers] if there are multiple local players.
193    /// Backroll currently only supports one local player.
194    ///
195    /// [BackrolLError]: crate::BackrolLError
196    pub fn start(self) -> BackrollResult<P2PSession<T>> {
197        P2PSession::new_internal(self)
198    }
199}
200
/// The shared state behind a `P2PSession` handle.
struct P2PSessionRef<T>
where
    T: Config,
{
    /// Simulation/input synchronization state for all player queues.
    sync: Sync<T>,
    /// One slot per player, indexed by queue number.
    players: Vec<PlayerType<T>>,

    /// `true` until the initial synchronization check passes, at which point
    /// `Event::Running` is emitted and the flag is cleared.
    synchronizing: bool,
    /// Frame that must be passed before the next `Event::TimeSync` is issued.
    next_recommended_sleep: Frame,

    /// Per-queue connection status (last frame and disconnected flag); the
    /// same table is handed to every peer and to `sync` at construction.
    local_connect_status: Arc<[RwLock<ConnectionStatus>]>,
}
213
214impl<T: Config> P2PSessionRef<T> {
215    fn players(&self) -> impl Iterator<Item = &Peer<T>> {
216        self.players
217            .iter()
218            .filter(|player| player.is_remote_player())
219            .filter_map(|player| player.peer())
220    }
221
222    fn player_handle_to_queue(&self, player: PlayerHandle) -> BackrollResult<usize> {
223        let offset = player.0;
224        if offset >= self.sync.player_count() {
225            return Err(BackrollError::InvalidPlayer(player));
226        }
227        Ok(offset)
228    }
229
230    fn check_initial_sync(&mut self, commands: &mut Commands<T>) {
231        if self.synchronizing && self.is_synchronized() {
232            commands.push(Command::Event(Event::Running));
233            self.synchronizing = false;
234        }
235    }
236
237    fn disconnect_player(
238        &mut self,
239        commands: &mut Commands<T>,
240        player: PlayerHandle,
241    ) -> BackrollResult<()> {
242        let queue = self.player_handle_to_queue(player)?;
243        let (last_frame, disconnected) = {
244            let status = self.local_connect_status[queue].read();
245            (status.last_frame, status.disconnected)
246        };
247
248        if disconnected {
249            return Err(BackrollError::PlayerDisconnected(player));
250        }
251
252        if self.players[queue].is_local() {
253            // The player is local. This should disconnect the local player from the rest
254            // of the game. All other players need to be disconnected.
255            // that if the endpoint is not initalized, this must be the local player.
256            let current_frame = self.sync.frame_count();
257            debug!(
258                "Disconnecting local player {} at frame {} by user request.",
259                queue, last_frame
260            );
261            for i in 0..self.players.len() {
262                if !self.players[i].is_local() {
263                    self.disconnect_player_queue(commands, i, current_frame);
264                }
265            }
266        } else {
267            debug!(
268                "Disconnecting queue {} at frame {} by user request.",
269                queue, last_frame
270            );
271            self.disconnect_player_queue(commands, queue, last_frame);
272        }
273        Ok(())
274    }
275
276    fn disconnect_player_queue(&mut self, commands: &mut Commands<T>, queue: usize, syncto: Frame) {
277        let frame_count = self.sync.frame_count();
278
279        self.players[queue].disconnect();
280
281        debug!("Changing queue {} local connect status for last frame from {} to {} on disconnect request (current: {}).",
282               queue, self.local_connect_status[queue].read().last_frame, syncto, frame_count);
283
284        {
285            let mut status = self.local_connect_status[queue].write();
286            status.disconnected = true;
287            status.last_frame = syncto;
288        }
289
290        if syncto < frame_count {
291            debug!(
292                "Adjusting simulation to account for the fact that {} disconnected @ {}.",
293                queue, syncto
294            );
295            self.sync.adjust_simulation(commands, syncto);
296            debug!("Finished adjusting simulation.");
297        }
298
299        commands.push(Command::Event(Event::Disconnected(PlayerHandle(queue))));
300
301        self.check_initial_sync(commands);
302    }
303
304    fn flush_events(&mut self, commands: &mut Commands<T>) {
305        for (queue, player) in self.players.clone().iter().enumerate() {
306            if let PlayerType::<T>::Remote { rx, .. } = player {
307                self.flush_peer_events(commands, queue, rx.clone());
308            }
309        }
310    }
311
312    fn flush_peer_events(
313        &mut self,
314        commands: &mut Commands<T>,
315        queue: usize,
316        rx: async_channel::Receiver<ProtocolEvent<T::Input>>,
317    ) {
318        loop {
319            match rx.try_recv() {
320                Ok(evt) => self.handle_event(commands, queue, evt),
321                Err(TryRecvError::Empty) => break,
322                Err(TryRecvError::Closed) => {
323                    self.disconnect_player(commands, PlayerHandle(queue))
324                        .expect("Disconnecting should not error on closing connection");
325                    break;
326                }
327            }
328        }
329    }
330
331    fn handle_event(
332        &mut self,
333        commands: &mut Commands<T>,
334        queue: usize,
335        evt: ProtocolEvent<T::Input>,
336    ) {
337        let player = PlayerHandle(queue);
338        match evt {
339            ProtocolEvent::<T::Input>::Connected => {
340                commands.push(Command::Event(Event::Connected(PlayerHandle(queue))));
341            }
342            ProtocolEvent::<T::Input>::Synchronizing { total, count } => {
343                commands.push(Command::Event(Event::Synchronizing {
344                    player,
345                    total,
346                    count,
347                }));
348            }
349            ProtocolEvent::<T::Input>::Inputs(inputs) => {
350                let mut status = self.local_connect_status[queue].write();
351                if status.disconnected {
352                    return;
353                }
354
355                for input in inputs {
356                    let current_remote_frame = status.last_frame;
357                    let new_remote_frame = input.frame;
358                    debug_assert!(
359                        crate::is_null(current_remote_frame)
360                            || new_remote_frame == (current_remote_frame + 1)
361                    );
362                    self.sync.add_remote_input(queue, input);
363
364                    // Notify the other endpoints which frame we received from a peer
365                    debug!(
366                        "setting remote connect status for queue {} to {}",
367                        queue, new_remote_frame
368                    );
369
370                    status.last_frame = new_remote_frame;
371                }
372            }
373            ProtocolEvent::<T::Input>::Synchronized => {
374                commands.push(Command::Event(Event::Synchronized(player)));
375                self.check_initial_sync(commands);
376            }
377            ProtocolEvent::<T::Input>::NetworkInterrupted { disconnect_timeout } => {
378                commands.push(Command::Event(Event::ConnectionInterrupted {
379                    player,
380                    disconnect_timeout,
381                }));
382            }
383            ProtocolEvent::<T::Input>::NetworkResumed => {
384                commands.push(Command::Event(Event::Synchronized(player)));
385            }
386        }
387    }
388
389    fn do_poll(&mut self, commands: &mut Commands<T>) {
390        if self.sync.in_rollback() {
391            return;
392        }
393
394        self.flush_events(commands);
395
396        if self.synchronizing {
397            return;
398        }
399
400        self.sync.check_simulation(commands);
401
402        // notify all of our endpoints of their local frame number for their
403        // next connection quality report
404        let current_frame = self.sync.frame_count();
405        for player in self.players() {
406            player.set_local_frame_number(current_frame);
407        }
408
409        let remote_player_count = self
410            .players
411            .iter()
412            .filter(|player| !player.is_local())
413            .count();
414
415        let min_frame = if remote_player_count == 0 {
416            current_frame
417        } else if self.players().count() <= 2 {
418            self.poll_2_players(commands)
419        } else {
420            self.poll_n_players(commands)
421        };
422
423        debug!("last confirmed frame in p2p backend is {}.", min_frame);
424        if min_frame >= 0 {
425            debug_assert!(min_frame != Frame::MAX);
426            debug!("setting confirmed frame in sync to {}.", min_frame);
427            self.sync.set_last_confirmed_frame(min_frame);
428        }
429
430        // send timesync notifications if now is the proper time
431        if current_frame > self.next_recommended_sleep {
432            let interval = self
433                .players()
434                .map(|player| player.recommend_frame_delay())
435                .max();
436            if let Some(interval) = interval {
437                commands.push(Command::Event(Event::TimeSync {
438                    frames_ahead: interval as u8,
439                }));
440                self.next_recommended_sleep = current_frame + RECOMMENDATION_INTERVAL;
441            }
442        }
443    }
444
445    fn poll_2_players(&mut self, commands: &mut Commands<T>) -> Frame {
446        // discard confirmed frames as appropriate
447        let mut min_frame = Frame::MAX;
448        for i in 0..self.players.len() {
449            let player = &self.players[i];
450            let mut queue_connected = true;
451            if let Some(peer) = player.peer() {
452                if peer.is_running() {
453                    queue_connected = !peer.get_peer_connect_status(i).disconnected;
454                }
455            }
456            let local_status = self.local_connect_status[i].read().clone();
457            if !local_status.disconnected {
458                min_frame = std::cmp::min(local_status.last_frame, min_frame);
459            }
460            debug!(
461                "local endp: connected = {}, last_received = {}, total_min_confirmed = {}.",
462                !local_status.disconnected, local_status.last_frame, min_frame
463            );
464            if !queue_connected && !local_status.disconnected {
465                debug!("disconnecting player {} by remote request.", i);
466                self.disconnect_player_queue(commands, i, min_frame);
467            }
468            debug!("min_frame = {}.", min_frame);
469        }
470        min_frame
471    }
472
473    fn poll_n_players(&mut self, commands: &mut Commands<T>) -> Frame {
474        // discard confirmed frames as appropriate
475        let mut min_frame = Frame::MAX;
476        for queue in 0..self.players.len() {
477            let mut queue_connected = true;
478            let mut queue_min_confirmed = Frame::MAX;
479            debug!("considering queue {}.", queue);
480            for (i, player) in self.players.iter().enumerate() {
481                // we're going to do a lot of logic here in consideration of endpoint i.
482                // keep accumulating the minimum confirmed point for all n*n packets and
483                // throw away the rest.
484                if player.peer().map(|peer| peer.is_running()).unwrap_or(false) {
485                    let peer = player.peer().unwrap();
486                    let status = peer.get_peer_connect_status(queue);
487                    queue_connected = queue_connected && !status.disconnected;
488                    queue_min_confirmed = std::cmp::min(status.last_frame, queue_min_confirmed);
489                    debug!("endpoint {}: connected = {}, last_received = {}, queue_min_confirmed = {}.", 
490                          i, queue_connected, status.last_frame, queue_min_confirmed);
491                } else {
492                    debug!("endpoint {}: ignoring... not running.", i);
493                }
494            }
495
496            let local_status = self.local_connect_status[queue].read().clone();
497            // merge in our local status only if we're still connected!
498            if !local_status.disconnected {
499                queue_min_confirmed = std::cmp::min(local_status.last_frame, queue_min_confirmed);
500            }
501            debug!(
502                "local endp: connected = {}, last_received = {}, queue_min_confirmed = {}.",
503                !local_status.disconnected, local_status.last_frame, queue_min_confirmed
504            );
505
506            if queue_connected {
507                min_frame = std::cmp::min(queue_min_confirmed, min_frame);
508            } else {
509                // check to see if this disconnect notification is further back than we've been before.  If
510                // so, we need to re-adjust.  This can happen when we detect our own disconnect at frame n
511                // and later receive a disconnect notification for frame n-1.
512                if !local_status.disconnected || local_status.last_frame > queue_min_confirmed {
513                    debug!("disconnecting queue {} by remote request.", queue);
514                    self.disconnect_player_queue(commands, queue, queue_min_confirmed);
515                }
516            }
517            debug!("min_frame = {}.", min_frame);
518        }
519        min_frame
520    }
521
522    fn is_synchronized(&self) -> bool {
523        // Check to see if everyone is now synchronized.  If so,
524        // go ahead and tell the client that we're ok to accept input.
525        for (i, player) in self.players.iter().enumerate() {
526            if !player.is_local()
527                && !player.is_synchronized()
528                && !self.local_connect_status[i].read().disconnected
529            {
530                return false;
531            }
532        }
533        true
534    }
535}
536
/// The main peer-to-peer Backroll session.
///
/// This type internally wraps an `Arc<RwLock<...>>`, so it is safe to
/// send and access across threads, and is cheap to clone. Every clone is a
/// handle to the same underlying session state.
pub struct P2PSession<T>(Arc<RwLock<P2PSessionRef<T>>>)
where
    T: Config;
544
545impl<T: Config> Clone for P2PSession<T> {
546    fn clone(&self) -> Self {
547        Self(self.0.clone())
548    }
549}
550
551impl<T: Config> P2PSession<T> {
552    pub fn build() -> P2PSessionBuilder<T> {
553        P2PSessionBuilder::new()
554    }
555
556    fn new_internal(builder: P2PSessionBuilder<T>) -> BackrollResult<Self> {
557        let local_player_count = builder
558            .players
559            .iter()
560            .filter(|player| player.is_local())
561            .count();
562        let remote_player_count = builder.players.len() - local_player_count;
563
564        if local_player_count > 1 && remote_player_count > 1 {
565            return Err(BackrollError::MultipleLocalPlayers);
566        }
567        let player_count = builder.players.len();
568        let connect_status: Vec<RwLock<ConnectionStatus>> =
569            (0..player_count).map(|_| Default::default()).collect();
570        let connect_status: Arc<[RwLock<ConnectionStatus>]> = connect_status.into();
571
572        let players: Vec<PlayerType<T>> = builder
573            .players
574            .iter()
575            .enumerate()
576            .map(|(i, player)| PlayerType::<T>::new(i, player, &builder, connect_status.clone()))
577            .collect();
578
579        let synchronizing = players.iter().any(|player| !player.is_local());
580        let config = sync::PlayerConfig {
581            player_count,
582            frame_delay: builder.frame_delay,
583        };
584        let sync = Sync::<T>::new(config, connect_status.clone());
585        Ok(Self(Arc::new(RwLock::new(P2PSessionRef::<T> {
586            sync,
587            players,
588            synchronizing,
589            next_recommended_sleep: 0,
590            local_connect_status: connect_status,
591        }))))
592    }
593
594    /// Gets the number of players in the current session. This includes
595    /// users that are already disconnected.
596    pub fn player_count(&self) -> usize {
597        self.0.read().sync.player_count()
598    }
599
600    /// Checks if the session currently in the middle of a rollback.
601    pub fn in_rollback(&self) -> bool {
602        self.0.read().sync.in_rollback()
603    }
604
605    /// Gets the current frame of the game.
606    pub fn current_frame(&self) -> Frame {
607        self.0.read().sync.frame_count()
608    }
609
610    pub fn local_players(&self) -> smallvec::SmallVec<[PlayerHandle; MAX_PLAYERS]> {
611        self.0
612            .read()
613            .players
614            .iter()
615            .enumerate()
616            .filter(|(_, player)| player.is_local())
617            .map(|(i, _)| PlayerHandle(i))
618            .collect()
619    }
620
621    pub fn remote_players(&self) -> smallvec::SmallVec<[PlayerHandle; MAX_PLAYERS]> {
622        self.0
623            .read()
624            .players
625            .iter()
626            .enumerate()
627            .filter(|(_, player)| player.is_remote_player())
628            .map(|(i, _)| PlayerHandle(i))
629            .collect()
630    }
631
632    /// Checks if all remote players are synchronized. If all players are
633    /// local, this will always return true.
634    pub fn is_synchronized(&self) -> bool {
635        self.0.read().is_synchronized()
636    }
637
638    /// Adds a local input for the current frame. This will register the input in the local
639    /// input queues, as well as queue the input to be sent to all remote players. If called multiple
640    /// times for the same player without advancing the session with [advance_frame], the previously
641    /// queued input for the frame will be overwritten.
642    ///
643    /// For a corrrect simulation, this must be called on all local players every frame before calling
644    /// [advance_frame].
645    ///
646    /// # Errors
647    /// Returns [BackrollError::InRollback] if the session is currently in the middle of a rollback.
648    ///
649    /// Returns [BackrollError::NotSynchronized] if the all of the remote peers have not yet
650    /// synchornized.
651    ///
652    /// Returns [BackrollError::InvalidPlayer] if the provided player handle does not point a vali
653    /// player.
654    ///
655    /// # Panics
656    /// This function will panic if the player is not a local player.
657    ///
658    /// [BackrollError]: crate::BackrollError
659    /// [advance_frame]: self::P2PSession::advance_frame
660    pub fn add_local_input(&self, player: PlayerHandle, input: T::Input) -> BackrollResult<()> {
661        let mut session_ref = self.0.write();
662        if session_ref.sync.in_rollback() {
663            return Err(BackrollError::InRollback);
664        }
665        if session_ref.synchronizing {
666            return Err(BackrollError::NotSynchronized);
667        }
668
669        let queue = session_ref.player_handle_to_queue(player)?;
670        assert!(
671            session_ref.players[queue].is_local(),
672            "{:?} is not a local player!",
673            player
674        );
675        let frame = session_ref.sync.add_local_input(queue, input)?;
676        if !is_null(frame) {
677            // Update the local connect status state to indicate that we've got a
678            // confirmed local frame for this player.  this must come first so it
679            // gets incorporated into the next packet we send.
680
681            debug!(
682                "setting local connect status for local queue {} to {}",
683                queue, frame
684            );
685            session_ref.local_connect_status[queue].write().last_frame = frame;
686
687            for player in session_ref.players.iter_mut() {
688                player.send_input(FrameInput::<T::Input> { frame, input });
689            }
690        }
691
692        Ok(())
693    }
694
695    /// Advances the game simulation by a single frame. This will issue a [Command::AdvanceFrame]
696    /// then check if the simulation is consistent with the inputs sent by remote players. If not, a
697    /// rollback will be triggered, and the game will be resimulated from the point of rollback.
698    ///
699    /// For a corrrect simulation, [add_local_input] must be called on all local players every frame before
700    /// calling this. If any call to [add_local_input] fails, this should not be called.
701    ///
702    /// All of the provided commands must be executed in order, and must not be reordered or skipped.
703    ///
704    /// [add_local_input]: self::P2PSession::add_local_input
705    /// [Command]: crate::command::Command
706    pub fn advance_frame(&self) -> Commands<T> {
707        let mut session_ref = self.0.write();
708        let mut commands = Commands::<T>::default();
709        debug!("End of frame ({})...", session_ref.sync.frame_count());
710        if !session_ref.synchronizing {
711            session_ref.sync.increment_frame(&mut commands);
712        }
713        session_ref.do_poll(&mut commands);
714        commands
715    }
716
717    /// Flushes lower level network events. This should always be called before adding local
718    /// inputs every frame of the game regardless of if the game is advancing it's state or
719    /// not.
720    ///
721    /// All of the provided commands must be executed in order, and must not be reordered or skipped.
722    pub fn poll(&self) -> Commands<T> {
723        let mut session_ref = self.0.write();
724        let mut commands = Commands::default();
725        session_ref.do_poll(&mut commands);
726        commands
727    }
728
729    /// Disconnects a player from the game.
730    ///
731    /// If called on a local player, this will disconnect the client from all remote peers.
732    ///
733    /// If called on a remote player, this will disconnect the connection with only that player.
734    ///
735    /// # Errors
736    /// Returns [BackrollError::InvalidPlayer] if the provided player handle does not point a vali
737    /// player.
738    ///
739    /// Returns [BackrollError::PlayerDisconnected] if the provided player is already disconnected.
740    pub fn disconnect_player(&self, player: PlayerHandle) -> BackrollResult<Commands<T>> {
741        let mut session_ref = self.0.write();
742        let queue = session_ref.player_handle_to_queue(player)?;
743        if session_ref.local_connect_status[queue].read().disconnected {
744            return Err(BackrollError::PlayerDisconnected(player));
745        }
746
747        let mut commands = Commands::<T>::default();
748        let last_frame = session_ref.local_connect_status[queue].read().last_frame;
749        if session_ref.players[queue].is_local() {
750            // The player is local. This should disconnect the local player from the rest
751            // of the game. All other players need to be disconnected.
752            // that if the endpoint is not initalized, this must be the local player.
753            let current_frame = session_ref.sync.frame_count();
754            debug!(
755                "Disconnecting local player {} at frame {} by user request.",
756                queue, last_frame
757            );
758            for i in 0..session_ref.players.len() {
759                if !session_ref.players[i].is_local() {
760                    session_ref.disconnect_player_queue(&mut commands, i, current_frame);
761                }
762            }
763        } else {
764            debug!(
765                "Disconnecting queue {} at frame {} by user request.",
766                queue, last_frame
767            );
768            session_ref.disconnect_player_queue(&mut commands, queue, last_frame);
769        }
770        Ok(commands)
771    }
772
773    /// Gets network statistics with a remote player.
774    ///
775    /// # Errors
776    /// Returns [BackrollError::InvalidPlayer] if the provided player handle does not point a vali
777    /// player.
778    pub fn get_network_stats(&self, player: PlayerHandle) -> BackrollResult<NetworkStats> {
779        let session_ref = self.0.read();
780        let queue = session_ref.player_handle_to_queue(player)?;
781        Ok(session_ref.players[queue]
782            .get_network_stats()
783            .unwrap_or_default())
784    }
785
786    /// Sets the frame delay for a given player.
787    ///
788    /// # Errors
789    /// Returns [BackrollError::InvalidPlayer] if the provided player handle does not point a vali
790    /// player.
791    pub fn set_frame_delay(&self, player: PlayerHandle, delay: Frame) -> BackrollResult<()> {
792        let mut session_ref = self.0.write();
793        let queue = session_ref.player_handle_to_queue(player)?;
794        session_ref.sync.set_frame_delay(queue, delay);
795        Ok(())
796    }
797}