nimble_client_logic/
lib.rs

1/*
2 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
3 * Licensed under the MIT License. See LICENSE in the project root for license information.
4 */
5
6/*!
7
8# Nimble Client Logic
9
10`nimble-client-logic` is a Rust crate designed to manage the client-side logic for multiplayer game
11sessions using the Nimble protocol. It handles the creation and processing of messages,
12including requests and responses:
13
14- Download a complete game state from the host.
15- Add and remove participants from the game session.
16- Send predicted player inputs (steps) to the host.
17- Receive authoritative combined steps from the host.
18
19This crate ensures seamless synchronization between the client and host, maintaining the integrity
20and consistency of the game state across all participants.
21
22## Features
23
24- **Connection Management**: Handles connecting to the host, agreeing on protocol versions,
25    and managing connection states.
26- **Game State Handling**: Downloads and maintains the complete game state from the host.
27- **Participant Management**: Adds and removes players from the game session dynamically.
28- **Step Prediction and Reconciliation**: Sends predicted player steps to the host and reconciles
29    them with authoritative steps received from the host.
30- **Blob Streaming**: Manages blob streaming for efficient game state transfers.
31
32## Usage
33
34Add `nimble-client-logic` to your `Cargo.toml`:
35
36```toml
37[dependencies]
38nimble-client-logic = "0.0.14-dev"
39```
40
41*/
42
43pub mod err;
44
45use crate::err::ClientLogicError;
46use flood_rs::BufferDeserializer;
47use flood_rs::{Deserialize, Serialize};
48use log::{debug, trace};
49use metricator::{AggregateMetric, MinMaxAvg};
50use monotonic_time_rs::{Millis, MillisLow16};
51use nimble_blob_stream::prelude::{FrontLogic, SenderToReceiverFrontCommands};
52use nimble_participant::ParticipantId;
53use nimble_protocol::client_to_host::{
54    ConnectRequest, DownloadGameStateRequest, JoinGameType, JoinPlayerRequest, JoinPlayerRequests,
55};
56use nimble_protocol::host_to_client::{
57    ConnectionAccepted, DownloadGameStateResponse, GameStepResponseHeader, PongInfo,
58};
59use nimble_protocol::prelude::*;
60use nimble_protocol::{ClientRequestId, NIMBLE_PROTOCOL_VERSION};
61use nimble_step::Step;
62use nimble_step_map::StepMap;
63use std::fmt::Debug;
64use tick_id::TickId;
65use tick_queue::Queue;
66
/// Represents the various phases of the client logic.
///
/// A [`ClientLogic`] starts in `RequestConnect` and moves forward one phase at a time
/// as the matching responses arrive from the host.
#[derive(Debug, PartialEq, Eq)]
pub enum ClientLogicPhase {
    /// Requesting a connection (agreeing on abilities, such as versions).
    RequestConnect,

    /// Requesting a download of the game state.
    ///
    /// `download_state_request_id` is sent with the request and compared against the
    /// request id carried by the host's download response.
    RequestDownloadState { download_state_request_id: u8 },

    /// Downloading the game state from the host.
    ///
    /// Holds the tick id taken from the host's download response.
    DownloadingState(TickId),

    /// Sending predicted steps from the client to the host.
    SendPredictedSteps,
}
82
/// Client-local index of a player; sent as `local_index` in join requests and
/// returned by the host together with the assigned participant id.
pub type LocalIndex = u8;
84
/// A local player that the host has accepted into the game session.
#[derive(Debug, Clone)]
pub struct LocalPlayer {
    /// The client-local index the player was requested with.
    pub index: LocalIndex,
    /// The participant id reported by the host for this player.
    pub participant_id: ParticipantId,
}
90
91/// `ClientLogic` manages the client's state and communication logic
92/// with the host in a multiplayer game session.
93///
94/// # Type Parameters
95/// * `StateT`: A type implementing representing the game state.
96/// * `StepT`: A type implementing representing the game steps.
97#[derive(Debug)]
98pub struct ClientLogic<
99    StateT: BufferDeserializer,
100    StepT: Clone + Deserialize + Serialize + Debug + std::fmt::Display,
101> {
102    // The deterministic simulation version
103    deterministic_simulation_version: app_version::Version,
104
105    connect_request_id: Option<ClientRequestId>,
106
107    /// Represents the player's join game request, if available.
108    joining_player: Option<Vec<LocalIndex>>,
109
110    /// Holds the current game state.
111    state: Option<StateT>,
112
113    /// Manages the blob stream logic for the client.
114    blob_stream_client: FrontLogic,
115
116    /// Stores the outgoing predicted steps from the client.
117    outgoing_predicted_steps: Queue<StepMap<StepT>>,
118
119    /// Stores the incoming authoritative steps from the host.
120    incoming_authoritative_steps: Queue<StepMap<Step<StepT>>>,
121
122    /// Represents the current phase of the client's logic.
123    phase: ClientLogicPhase,
124
125    /// Tracks the delta of tick id on the server.
126    server_buffer_delta_tick_id: AggregateMetric<i16>,
127
128    // Latency
129    latency: AggregateMetric<u16>,
130
131    /// Tracks the buffer step count on the server.
132    //server_buffer_count: AggregateMetric<u8>,
133    joining_request_id: ClientRequestId,
134
135    local_players: Vec<LocalPlayer>,
136}
137
138impl<
139        StateT: BufferDeserializer,
140        StepT: Clone + Deserialize + Serialize + Debug + std::fmt::Display,
141    > ClientLogic<StateT, StepT>
142{
143    /// Creates a new `ClientLogic` instance, initializing all fields.
144    #[must_use]
145    #[allow(clippy::missing_panics_doc)]
146    pub fn new(deterministic_simulation_version: app_version::Version) -> Self {
147        Self {
148            joining_player: None,
149            joining_request_id: ClientRequestId(0),
150            blob_stream_client: FrontLogic::new(),
151            outgoing_predicted_steps: Queue::default(),
152            incoming_authoritative_steps: Queue::default(),
153            server_buffer_delta_tick_id: AggregateMetric::new(3).unwrap(),
154            //server_buffer_count: AggregateMetric::new(3).unwrap(),
155            state: None,
156            phase: ClientLogicPhase::RequestConnect,
157            local_players: Vec::new(),
158            deterministic_simulation_version,
159            connect_request_id: None,
160            latency: AggregateMetric::<u16>::new(10).unwrap().with_unit("ms"),
161        }
162    }
163
    /// Returns a reference to the incoming authoritative steps.
    ///
    /// The queue holds the steps accepted from the host that have not yet been
    /// removed with `pop_all_authoritative_steps`. Intended for debugging.
    pub const fn debug_authoritative_steps(&self) -> &Queue<StepMap<Step<StepT>>> {
        &self.incoming_authoritative_steps
    }
168
    /// Returns the phase the client logic is currently in.
    pub const fn phase(&self) -> &ClientLogicPhase {
        &self.phase
    }
172
    /// Returns the min/max/average of the latency samples, in milliseconds.
    ///
    /// Samples are added each time a pong is received from the host.
    /// Presumably `None` until enough samples have been collected — confirm against
    /// `metricator::AggregateMetric::values`.
    pub fn latency(&self) -> Option<MinMaxAvg<u16>> {
        self.latency.values()
    }
176
177    pub fn pop_all_authoritative_steps(&mut self) -> (TickId, Vec<StepMap<Step<StepT>>>) {
178        if let Some(first_tick_id) = self.incoming_authoritative_steps.front_tick_id() {
179            let vec = self.incoming_authoritative_steps.to_vec();
180            self.incoming_authoritative_steps.clear(first_tick_id + 1);
181            (first_tick_id, vec)
182        } else {
183            (TickId(0), vec![])
184        }
185    }
186
    /// Sets the local players that should be joined to the game session.
    ///
    /// While set, a join request for these players is included in the commands
    /// produced by `send` once the client is past the connect phase. It is cleared
    /// again when the host's join response is accepted.
    ///
    /// # Arguments
    /// * `local_players`: The local indices of the players to join.
    pub fn set_joining_player(&mut self, local_players: Vec<LocalIndex>) {
        self.joining_player = Some(local_players);
    }
194
195    /// Generates a download state request command to send to the host.
196    ///
197    /// # Arguments
198    /// * `download_request_id`: The request ID for the download state.
199    ///
200    /// # Returns
201    /// A vector of `ClientToHostCommands`.
202    fn download_state_request(
203        &mut self,
204        download_request_id: u8,
205    ) -> Vec<ClientToHostCommands<StepT>> {
206        let mut vec = vec![];
207        let download_request = DownloadGameStateRequest {
208            request_id: download_request_id,
209        };
210        vec.push(ClientToHostCommands::DownloadGameState(download_request));
211
212        if let Some(cmd) = self.blob_stream_client.send() {
213            vec.push(ClientToHostCommands::BlobStreamChannel(cmd));
214        }
215
216        vec
217    }
218
219    fn send_connect_request(&mut self) -> ClientToHostCommands<StepT> {
220        if self.connect_request_id.is_none() {
221            self.connect_request_id = Some(ClientRequestId(0));
222        }
223
224        let connect_request = ConnectRequest {
225            nimble_version: NIMBLE_PROTOCOL_VERSION,
226            use_debug_stream: false,
227            application_version: Version {
228                major: self.deterministic_simulation_version.major(),
229                minor: self.deterministic_simulation_version.minor(),
230                patch: self.deterministic_simulation_version.patch(),
231            },
232            client_request_id: ClientRequestId(0),
233        };
234
235        ClientToHostCommands::ConnectType(connect_request)
236    }
237
238    /// Sends the predicted steps to the host.
239    ///
240    /// # Returns
241    /// A `ClientToHostCommands` representing the predicted steps.
242    fn send_steps_request(&self) -> ClientToHostCommands<StepT> {
243        let steps_request = StepsRequest {
244            ack: StepsAck {
245                waiting_for_tick_id: self.incoming_authoritative_steps.expected_write_tick_id(),
246            },
247            combined_predicted_steps: CombinedSteps::<StepT> {
248                tick_id: self
249                    .outgoing_predicted_steps
250                    .front_tick_id()
251                    .unwrap_or_default(),
252                steps: self.outgoing_predicted_steps.to_vec(),
253            },
254        };
255
256        ClientToHostCommands::Steps(steps_request)
257    }
258
    /// Returns the request id used for connect requests, or `None` if no connect
    /// request has been generated yet. Intended for debugging.
    pub const fn debug_connect_request_id(&self) -> Option<ClientRequestId> {
        self.connect_request_id
    }
262
263    /// Returns client commands that should be sent to the host.
264    ///
265    /// # Returns
266    /// A vector of `ClientToHostCommands` representing all the commands to be sent to the host.
267    #[must_use]
268    pub fn send(&mut self, now: Millis) -> Vec<ClientToHostCommands<StepT>> {
269        let mut commands: Vec<ClientToHostCommands<StepT>> = vec![];
270
271        if self.phase != ClientLogicPhase::RequestConnect {
272            // Always send ping when connected
273            let ping = ClientToHostCommands::Ping(now.to_lower());
274            commands.push(ping);
275
276            if let Some(joining_players) = &self.joining_player {
277                debug!("connected. send join_game_request {:?}", joining_players);
278
279                let player_requests = joining_players
280                    .iter()
281                    .map(|local_index| JoinPlayerRequest {
282                        local_index: *local_index,
283                    })
284                    .collect();
285                let join_command = ClientToHostCommands::JoinGameType(JoinGameRequest {
286                    client_request_id: self.joining_request_id,
287                    join_game_type: JoinGameType::NoSecret,
288                    player_requests: JoinPlayerRequests {
289                        players: player_requests,
290                    },
291                });
292                trace!("send join command: {join_command:?}");
293                commands.push(join_command);
294            }
295        }
296
297        let normal_commands: Vec<ClientToHostCommands<StepT>> = match self.phase {
298            ClientLogicPhase::RequestDownloadState {
299                download_state_request_id,
300            } => self.download_state_request(download_state_request_id),
301            ClientLogicPhase::SendPredictedSteps => [self.send_steps_request()].to_vec(),
302            ClientLogicPhase::DownloadingState(_) => {
303                self.blob_stream_client.send().map_or_else(Vec::new, |x| {
304                    [ClientToHostCommands::BlobStreamChannel(x)].to_vec()
305                })
306            }
307            ClientLogicPhase::RequestConnect => [self.send_connect_request()].to_vec(),
308        };
309
310        commands.extend(normal_commands);
311
312        commands
313    }
314
315    pub fn can_push_predicted_step(&self) -> bool {
316        self.is_in_game() && self.game().is_some()
317    }
318
319    pub fn is_in_game(&self) -> bool {
320        self.phase == ClientLogicPhase::SendPredictedSteps
321            && self.joining_player.is_none()
322            && !self.local_players.is_empty()
323    }
324
    /// Adds a predicted step to the outgoing steps queue.
    ///
    /// Queued steps are sent to the host with every steps request until the host
    /// reports that it no longer expects them.
    ///
    /// # Arguments
    /// * `tick_id`: The tick ID of the step.
    /// * `step`: The predicted step to add.
    ///
    /// # Errors
    /// Returns a [`ClientLogicError`] if the outgoing queue rejects the step
    /// (presumably when `tick_id` is not the tick the queue expects next — confirm
    /// against `tick_queue::Queue::push`).
    pub fn push_predicted_step(
        &mut self,
        tick_id: TickId,
        step: StepMap<StepT>,
    ) -> Result<(), ClientLogicError> {
        self.outgoing_predicted_steps.push(tick_id, step)?;
        Ok(())
    }
341
    /// Returns the number of predicted steps currently held in the outgoing queue.
    pub fn predicted_step_count_in_queue(&self) -> usize {
        self.outgoing_predicted_steps.len()
    }
345
346    fn on_pong(&mut self, now: Millis, cmd: &PongInfo) -> Result<(), ClientLogicError> {
347        let low_16 = cmd.lower_millis as MillisLow16;
348
349        let earlier = now
350            .from_lower(low_16)
351            .ok_or_else(|| ClientLogicError::MillisFromLowerError)?;
352        let duration_ms = now
353            .checked_duration_since_ms(earlier)
354            .ok_or_else(|| ClientLogicError::AbsoluteTimeError)?;
355
356        self.latency.add(
357            u16::try_from(duration_ms.as_millis())
358                .map_err(|_| ClientLogicError::LatencyIsTooBig)?,
359        );
360
361        Ok(())
362    }
363
364    /// Handles the reception of the join game acceptance message from the host.
365    ///
366    /// # Arguments
367    /// * `cmd`: The join game acceptance command.
368    ///
369    /// # Errors
370    /// Returns a [`ClientErrorKind`] if the join game process encounters an error.
371    fn on_join_game(&mut self, cmd: &JoinGameAccepted) -> Result<(), ClientLogicError> {
372        debug!("join game accepted: {:?}", cmd);
373
374        if cmd.client_request_id != self.joining_request_id {
375            Err(ClientLogicError::WrongJoinResponseRequestId {
376                encountered: cmd.client_request_id,
377                expected: self.joining_request_id,
378            })?;
379        }
380
381        self.joining_player = None;
382
383        self.local_players.clear();
384
385        for participant in &cmd.participants.0 {
386            self.local_players.push(LocalPlayer {
387                index: participant.local_index,
388                participant_id: participant.participant_id,
389            });
390        }
391
392        Ok(())
393    }
394
    /// Returns the received game state from the host.
    ///
    /// The state becomes available once the blob download has completed and the
    /// blob has been deserialized.
    ///
    /// # Returns
    /// An `Option` containing a reference to the received game state, if available.
    pub const fn game(&self) -> Option<&StateT> {
        self.state.as_ref()
    }
402
    /// Returns a mutable reference to the received game state, if available.
    pub fn game_mut(&mut self) -> Option<&mut StateT> {
        self.state.as_mut()
    }
406
407    /// Processes the game step response header received from the host.
408    ///
409    /// # Arguments
410    /// * `header`: The game step response header.
411    fn handle_game_step_header(&mut self, header: &GameStepResponseHeader) {
412        let host_expected_tick_id = header.next_expected_tick_id;
413        self.server_buffer_delta_tick_id
414            .add(i16::from(header.delta_buffer));
415        //self.server_buffer_count.add(header.connection_buffer_count);
416        trace!("removing every predicted step before {host_expected_tick_id}");
417        self.outgoing_predicted_steps
418            .discard_up_to(host_expected_tick_id);
419        trace!(
420            "predicted steps remaining {}",
421            self.outgoing_predicted_steps.len()
422        );
423    }
424
425    fn on_connect(&mut self, cmd: &ConnectionAccepted) -> Result<(), ClientLogicError> {
426        if self.phase != ClientLogicPhase::RequestConnect {
427            Err(ClientLogicError::ReceivedConnectResponseWhenNotConnecting)?;
428        }
429
430        if self.connect_request_id.is_none() {
431            Err(ClientLogicError::ReceivedConnectResponseWhenNotConnecting)?;
432        }
433
434        if cmd.response_to_request != self.connect_request_id.unwrap() {
435            Err(ClientLogicError::WrongConnectResponseRequestId(
436                cmd.response_to_request,
437            ))?;
438        }
439        self.phase = ClientLogicPhase::RequestDownloadState {
440            download_state_request_id: 0x99,
441        }; // TODO: proper download state request id
442        debug!("set phase to connected!");
443        Ok(())
444    }
445
446    /// Handles the reception of a game step response from the host.
447    ///
448    /// # Arguments
449    /// * `cmd`: The game step response.
450    ///
451    /// # Errors
452    /// Returns a `ClientErrorKind` if there are issues processing the game steps.
453    fn on_game_step(
454        &mut self,
455        cmd: &GameStepResponse<Step<StepT>>,
456    ) -> Result<(), ClientLogicError> {
457        trace!("game step response: {}", cmd);
458
459        self.handle_game_step_header(&cmd.response_header);
460
461        if cmd.authoritative_steps.ranges.is_empty() {
462            return Ok(());
463        }
464
465        let mut accepted_count = 0;
466
467        for range in &cmd.authoritative_steps.ranges {
468            let range_len = u32::try_from(range.steps.len())
469                .map_err(|_| ClientLogicError::TooManyStepsInRange)?;
470            let mut current_authoritative_tick_id = range.tick_id;
471            for combined_auth_step in &range.steps {
472                if current_authoritative_tick_id
473                    == self.incoming_authoritative_steps.expected_write_tick_id()
474                {
475                    self.incoming_authoritative_steps
476                        .push(current_authoritative_tick_id, combined_auth_step.clone())?;
477                    accepted_count += 1;
478                }
479                current_authoritative_tick_id += 1;
480            }
481
482            current_authoritative_tick_id += range_len;
483        }
484
485        if accepted_count > 0 {
486            trace!(
487                "accepted {accepted_count} auth steps, waiting for {}, total count: {}",
488                self.incoming_authoritative_steps.expected_write_tick_id(),
489                self.incoming_authoritative_steps.len(),
490            );
491        }
492
493        Ok(())
494    }
495
496    /// Handles the reception of a download game state response.
497    ///
498    /// # Arguments
499    /// * `download_response`: The download game state response from the host.
500    ///
501    /// # Errors
502    /// Returns a `ClientErrorKind` if the download response is unexpected or has a mismatched request ID.
503    fn on_download_state_response(
504        &mut self,
505        download_response: &DownloadGameStateResponse,
506    ) -> Result<(), ClientLogicError> {
507        match self.phase {
508            ClientLogicPhase::RequestDownloadState {
509                download_state_request_id,
510            } => {
511                if download_response.client_request != download_state_request_id {
512                    Err(ClientLogicError::WrongDownloadRequestId)?;
513                }
514            }
515            _ => Err(ClientLogicError::DownloadResponseWasUnexpected)?,
516        }
517
518        self.phase = ClientLogicPhase::DownloadingState(download_response.tick_id);
519
520        Ok(())
521    }
522
523    /// Handles the reception of a blob stream command.
524    ///
525    /// # Arguments
526    /// * `blob_stream_command`: The blob stream command from the host.
527    ///
528    /// # Errors
529    /// Returns a `ClientErrorKind` if the blob stream command is unexpected.
530    fn on_blob_stream(
531        &mut self,
532        blob_stream_command: &SenderToReceiverFrontCommands,
533    ) -> Result<(), ClientLogicError> {
534        match self.phase {
535            ClientLogicPhase::DownloadingState(_) => {
536                self.blob_stream_client.receive(blob_stream_command)?;
537                if let Some(blob_ready) = self.blob_stream_client.blob() {
538                    debug!("blob stream received, phase is set to SendPredictedSteps");
539                    self.phase = ClientLogicPhase::SendPredictedSteps;
540                    let (deserialized, _) = StateT::deserialize(blob_ready)?;
541                    self.state = Some(deserialized);
542                }
543            }
544            _ => Err(ClientLogicError::UnexpectedBlobChannelCommand)?,
545        }
546        Ok(())
547    }
548
549    /// Receives a command from the host and processes it accordingly.
550    ///
551    /// # Arguments
552    /// * `command`: The command from the host.
553    ///
554    /// # Errors
555    /// Returns a [`ClientErrorKind`] if the command cannot be processed.
556    pub fn receive(
557        &mut self,
558        now: Millis,
559        command: &HostToClientCommands<Step<StepT>>,
560    ) -> Result<(), ClientLogicError> {
561        match command {
562            HostToClientCommands::JoinGame(ref join_game_response) => {
563                self.on_join_game(join_game_response)
564            }
565            HostToClientCommands::GameStep(ref game_step_response) => {
566                self.on_game_step(game_step_response)
567            }
568            HostToClientCommands::DownloadGameState(ref download_response) => {
569                self.on_download_state_response(download_response)
570            }
571            HostToClientCommands::BlobStreamChannel(ref blob_stream_command) => {
572                self.on_blob_stream(blob_stream_command)
573            }
574            HostToClientCommands::ConnectType(ref connect_accepted) => {
575                self.on_connect(connect_accepted)
576            }
577            HostToClientCommands::Pong(pong_info) => self.on_pong(now, pong_info),
578        }
579    }
580
581    /// Returns the average server buffer delta tick, if available.
582    ///
583    /// # Returns
584    /// An optional average server buffer delta tick.
585    pub fn server_buffer_delta_ticks(&self) -> Option<i16> {
586        self.server_buffer_delta_tick_id
587            .average()
588            .map(|value| value.round() as i16)
589    }
590
    /// Returns a copy of the local players that the host accepted in the most
    /// recent join response. The list is empty before any join has been accepted.
    pub fn local_players(&self) -> Vec<LocalPlayer> {
        self.local_players.clone()
    }
594}