pub mod err;
use crate::err::ClientLogicError;
use flood_rs::BufferDeserializer;
use flood_rs::{Deserialize, Serialize};
use log::{debug, trace};
use metricator::{AggregateMetric, MinMaxAvg};
use monotonic_time_rs::{Millis, MillisLow16};
use nimble_blob_stream::prelude::{FrontLogic, SenderToReceiverFrontCommands};
use nimble_participant::ParticipantId;
use nimble_protocol::client_to_host::{
ConnectRequest, DownloadGameStateRequest, JoinGameType, JoinPlayerRequest, JoinPlayerRequests,
};
use nimble_protocol::host_to_client::{
ConnectionAccepted, DownloadGameStateResponse, GameStepResponseHeader, PongInfo,
};
use nimble_protocol::prelude::*;
use nimble_protocol::{ClientRequestId, NIMBLE_PROTOCOL_VERSION};
use nimble_step::Step;
use nimble_step_map::StepMap;
use std::fmt::Debug;
use tick_id::TickId;
use tick_queue::Queue;
#[derive(Debug, PartialEq, Eq)]
pub enum ClientLogicPhase {
RequestConnect,
RequestDownloadState { download_state_request_id: u8 },
DownloadingState(TickId),
SendPredictedSteps,
}
pub type LocalIndex = u8;
#[derive(Debug, Clone)]
pub struct LocalPlayer {
pub index: LocalIndex,
pub participant_id: ParticipantId,
}
#[derive(Debug)]
pub struct ClientLogic<
StateT: BufferDeserializer,
StepT: Clone + Deserialize + Serialize + Debug + std::fmt::Display,
> {
deterministic_simulation_version: app_version::Version,
connect_request_id: Option<ClientRequestId>,
joining_player: Option<Vec<LocalIndex>>,
state: Option<StateT>,
blob_stream_client: FrontLogic,
outgoing_predicted_steps: Queue<StepMap<StepT>>,
incoming_authoritative_steps: Queue<StepMap<Step<StepT>>>,
phase: ClientLogicPhase,
server_buffer_delta_tick_id: AggregateMetric<i16>,
latency: AggregateMetric<u16>,
joining_request_id: ClientRequestId,
local_players: Vec<LocalPlayer>,
}
impl<
StateT: BufferDeserializer,
StepT: Clone + Deserialize + Serialize + Debug + std::fmt::Display,
> ClientLogic<StateT, StepT>
{
#[must_use]
#[allow(clippy::missing_panics_doc)]
pub fn new(deterministic_simulation_version: app_version::Version) -> Self {
Self {
joining_player: None,
joining_request_id: ClientRequestId(0),
blob_stream_client: FrontLogic::new(),
outgoing_predicted_steps: Queue::default(),
incoming_authoritative_steps: Queue::default(),
server_buffer_delta_tick_id: AggregateMetric::new(3).unwrap(),
state: None,
phase: ClientLogicPhase::RequestConnect,
local_players: Vec::new(),
deterministic_simulation_version,
connect_request_id: None,
latency: AggregateMetric::<u16>::new(10).unwrap().with_unit("ms"),
}
}
pub const fn debug_authoritative_steps(&self) -> &Queue<StepMap<Step<StepT>>> {
&self.incoming_authoritative_steps
}
pub const fn phase(&self) -> &ClientLogicPhase {
&self.phase
}
pub fn latency(&self) -> Option<MinMaxAvg<u16>> {
self.latency.values()
}
pub fn pop_all_authoritative_steps(&mut self) -> (TickId, Vec<StepMap<Step<StepT>>>) {
if let Some(first_tick_id) = self.incoming_authoritative_steps.front_tick_id() {
let vec = self.incoming_authoritative_steps.to_vec();
self.incoming_authoritative_steps.clear(first_tick_id + 1);
(first_tick_id, vec)
} else {
(TickId(0), vec![])
}
}
pub fn set_joining_player(&mut self, local_players: Vec<LocalIndex>) {
self.joining_player = Some(local_players);
}
fn download_state_request(
&mut self,
download_request_id: u8,
) -> Vec<ClientToHostCommands<StepT>> {
let mut vec = vec![];
let download_request = DownloadGameStateRequest {
request_id: download_request_id,
};
vec.push(ClientToHostCommands::DownloadGameState(download_request));
if let Some(cmd) = self.blob_stream_client.send() {
vec.push(ClientToHostCommands::BlobStreamChannel(cmd));
}
vec
}
fn send_connect_request(&mut self) -> ClientToHostCommands<StepT> {
if self.connect_request_id.is_none() {
self.connect_request_id = Some(ClientRequestId(0));
}
let connect_request = ConnectRequest {
nimble_version: NIMBLE_PROTOCOL_VERSION,
use_debug_stream: false,
application_version: Version {
major: self.deterministic_simulation_version.major(),
minor: self.deterministic_simulation_version.minor(),
patch: self.deterministic_simulation_version.patch(),
},
client_request_id: ClientRequestId(0),
};
ClientToHostCommands::ConnectType(connect_request)
}
fn send_steps_request(&self) -> ClientToHostCommands<StepT> {
let steps_request = StepsRequest {
ack: StepsAck {
waiting_for_tick_id: self.incoming_authoritative_steps.expected_write_tick_id(),
},
combined_predicted_steps: CombinedSteps::<StepT> {
tick_id: self
.outgoing_predicted_steps
.front_tick_id()
.unwrap_or_default(),
steps: self.outgoing_predicted_steps.to_vec(),
},
};
ClientToHostCommands::Steps(steps_request)
}
pub const fn debug_connect_request_id(&self) -> Option<ClientRequestId> {
self.connect_request_id
}
#[must_use]
pub fn send(&mut self, now: Millis) -> Vec<ClientToHostCommands<StepT>> {
let mut commands: Vec<ClientToHostCommands<StepT>> = vec![];
if self.phase != ClientLogicPhase::RequestConnect {
let ping = ClientToHostCommands::Ping(now.to_lower());
commands.push(ping);
if let Some(joining_players) = &self.joining_player {
debug!("connected. send join_game_request {:?}", joining_players);
let player_requests = joining_players
.iter()
.map(|local_index| JoinPlayerRequest {
local_index: *local_index,
})
.collect();
let join_command = ClientToHostCommands::JoinGameType(JoinGameRequest {
client_request_id: self.joining_request_id,
join_game_type: JoinGameType::NoSecret,
player_requests: JoinPlayerRequests {
players: player_requests,
},
});
trace!("send join command: {join_command:?}");
commands.push(join_command);
}
}
let normal_commands: Vec<ClientToHostCommands<StepT>> = match self.phase {
ClientLogicPhase::RequestDownloadState {
download_state_request_id,
} => self.download_state_request(download_state_request_id),
ClientLogicPhase::SendPredictedSteps => [self.send_steps_request()].to_vec(),
ClientLogicPhase::DownloadingState(_) => {
self.blob_stream_client.send().map_or_else(Vec::new, |x| {
[ClientToHostCommands::BlobStreamChannel(x)].to_vec()
})
}
ClientLogicPhase::RequestConnect => [self.send_connect_request()].to_vec(),
};
commands.extend(normal_commands);
commands
}
pub fn can_push_predicted_step(&self) -> bool {
self.is_in_game() && self.game().is_some()
}
pub fn is_in_game(&self) -> bool {
self.phase == ClientLogicPhase::SendPredictedSteps
&& self.joining_player.is_none()
&& !self.local_players.is_empty()
}
pub fn push_predicted_step(
&mut self,
tick_id: TickId,
step: StepMap<StepT>,
) -> Result<(), ClientLogicError> {
self.outgoing_predicted_steps.push(tick_id, step)?;
Ok(())
}
pub fn predicted_step_count_in_queue(&self) -> usize {
self.outgoing_predicted_steps.len()
}
fn on_pong(&mut self, now: Millis, cmd: &PongInfo) -> Result<(), ClientLogicError> {
let low_16 = cmd.lower_millis as MillisLow16;
let earlier = now
.from_lower(low_16)
.ok_or_else(|| ClientLogicError::MillisFromLowerError)?;
let duration_ms = now
.checked_duration_since_ms(earlier)
.ok_or_else(|| ClientLogicError::AbsoluteTimeError)?;
self.latency.add(
u16::try_from(duration_ms.as_millis())
.map_err(|_| ClientLogicError::LatencyIsTooBig)?,
);
Ok(())
}
fn on_join_game(&mut self, cmd: &JoinGameAccepted) -> Result<(), ClientLogicError> {
debug!("join game accepted: {:?}", cmd);
if cmd.client_request_id != self.joining_request_id {
Err(ClientLogicError::WrongJoinResponseRequestId {
encountered: cmd.client_request_id,
expected: self.joining_request_id,
})?;
}
self.joining_player = None;
self.local_players.clear();
for participant in &cmd.participants.0 {
self.local_players.push(LocalPlayer {
index: participant.local_index,
participant_id: participant.participant_id,
});
}
Ok(())
}
pub const fn game(&self) -> Option<&StateT> {
self.state.as_ref()
}
pub fn game_mut(&mut self) -> Option<&mut StateT> {
self.state.as_mut()
}
fn handle_game_step_header(&mut self, header: &GameStepResponseHeader) {
let host_expected_tick_id = header.next_expected_tick_id;
self.server_buffer_delta_tick_id
.add(i16::from(header.delta_buffer));
trace!("removing every predicted step before {host_expected_tick_id}");
self.outgoing_predicted_steps
.discard_up_to(host_expected_tick_id);
trace!(
"predicted steps remaining {}",
self.outgoing_predicted_steps.len()
);
}
fn on_connect(&mut self, cmd: &ConnectionAccepted) -> Result<(), ClientLogicError> {
if self.phase != ClientLogicPhase::RequestConnect {
Err(ClientLogicError::ReceivedConnectResponseWhenNotConnecting)?;
}
if self.connect_request_id.is_none() {
Err(ClientLogicError::ReceivedConnectResponseWhenNotConnecting)?;
}
if cmd.response_to_request != self.connect_request_id.unwrap() {
Err(ClientLogicError::WrongConnectResponseRequestId(
cmd.response_to_request,
))?;
}
self.phase = ClientLogicPhase::RequestDownloadState {
download_state_request_id: 0x99,
}; debug!("set phase to connected!");
Ok(())
}
fn on_game_step(
&mut self,
cmd: &GameStepResponse<Step<StepT>>,
) -> Result<(), ClientLogicError> {
trace!("game step response: {}", cmd);
self.handle_game_step_header(&cmd.response_header);
if cmd.authoritative_steps.ranges.is_empty() {
return Ok(());
}
let mut accepted_count = 0;
for range in &cmd.authoritative_steps.ranges {
let range_len = u32::try_from(range.steps.len())
.map_err(|_| ClientLogicError::TooManyStepsInRange)?;
let mut current_authoritative_tick_id = range.tick_id;
for combined_auth_step in &range.steps {
if current_authoritative_tick_id
== self.incoming_authoritative_steps.expected_write_tick_id()
{
self.incoming_authoritative_steps
.push(current_authoritative_tick_id, combined_auth_step.clone())?;
accepted_count += 1;
}
current_authoritative_tick_id += 1;
}
current_authoritative_tick_id += range_len;
}
if accepted_count > 0 {
trace!(
"accepted {accepted_count} auth steps, waiting for {}, total count: {}",
self.incoming_authoritative_steps.expected_write_tick_id(),
self.incoming_authoritative_steps.len(),
);
}
Ok(())
}
fn on_download_state_response(
&mut self,
download_response: &DownloadGameStateResponse,
) -> Result<(), ClientLogicError> {
match self.phase {
ClientLogicPhase::RequestDownloadState {
download_state_request_id,
} => {
if download_response.client_request != download_state_request_id {
Err(ClientLogicError::WrongDownloadRequestId)?;
}
}
_ => Err(ClientLogicError::DownloadResponseWasUnexpected)?,
}
self.phase = ClientLogicPhase::DownloadingState(download_response.tick_id);
Ok(())
}
fn on_blob_stream(
&mut self,
blob_stream_command: &SenderToReceiverFrontCommands,
) -> Result<(), ClientLogicError> {
match self.phase {
ClientLogicPhase::DownloadingState(_) => {
self.blob_stream_client.receive(blob_stream_command)?;
if let Some(blob_ready) = self.blob_stream_client.blob() {
debug!("blob stream received, phase is set to SendPredictedSteps");
self.phase = ClientLogicPhase::SendPredictedSteps;
let (deserialized, _) = StateT::deserialize(blob_ready)?;
self.state = Some(deserialized);
}
}
_ => Err(ClientLogicError::UnexpectedBlobChannelCommand)?,
}
Ok(())
}
pub fn receive(
&mut self,
now: Millis,
command: &HostToClientCommands<Step<StepT>>,
) -> Result<(), ClientLogicError> {
match command {
HostToClientCommands::JoinGame(ref join_game_response) => {
self.on_join_game(join_game_response)
}
HostToClientCommands::GameStep(ref game_step_response) => {
self.on_game_step(game_step_response)
}
HostToClientCommands::DownloadGameState(ref download_response) => {
self.on_download_state_response(download_response)
}
HostToClientCommands::BlobStreamChannel(ref blob_stream_command) => {
self.on_blob_stream(blob_stream_command)
}
HostToClientCommands::ConnectType(ref connect_accepted) => {
self.on_connect(connect_accepted)
}
HostToClientCommands::Pong(pong_info) => self.on_pong(now, pong_info),
}
}
pub fn server_buffer_delta_ticks(&self) -> Option<i16> {
self.server_buffer_delta_tick_id
.average()
.map(|value| value.round() as i16)
}
pub fn local_players(&self) -> Vec<LocalPlayer> {
self.local_players.clone()
}
}