1pub mod err;
44
45use crate::err::ClientLogicError;
46use flood_rs::BufferDeserializer;
47use flood_rs::{Deserialize, Serialize};
48use log::{debug, trace};
49use metricator::{AggregateMetric, MinMaxAvg};
50use monotonic_time_rs::{Millis, MillisLow16};
51use nimble_blob_stream::prelude::{FrontLogic, SenderToReceiverFrontCommands};
52use nimble_participant::ParticipantId;
53use nimble_protocol::client_to_host::{
54 ConnectRequest, DownloadGameStateRequest, JoinGameType, JoinPlayerRequest, JoinPlayerRequests,
55};
56use nimble_protocol::host_to_client::{
57 ConnectionAccepted, DownloadGameStateResponse, GameStepResponseHeader, PongInfo,
58};
59use nimble_protocol::prelude::*;
60use nimble_protocol::{ClientRequestId, NIMBLE_PROTOCOL_VERSION};
61use nimble_step::Step;
62use nimble_step_map::StepMap;
63use std::fmt::Debug;
64use tick_id::TickId;
65use tick_queue::Queue;
66
67#[derive(Debug, PartialEq, Eq)]
69pub enum ClientLogicPhase {
70 RequestConnect,
72
73 RequestDownloadState { download_state_request_id: u8 },
75
76 DownloadingState(TickId),
78
79 SendPredictedSteps,
81}
82
83pub type LocalIndex = u8;
84
85#[derive(Debug, Clone)]
86pub struct LocalPlayer {
87 pub index: LocalIndex,
88 pub participant_id: ParticipantId,
89}
90
91#[derive(Debug)]
98pub struct ClientLogic<
99 StateT: BufferDeserializer,
100 StepT: Clone + Deserialize + Serialize + Debug + std::fmt::Display,
101> {
102 deterministic_simulation_version: app_version::Version,
104
105 connect_request_id: Option<ClientRequestId>,
106
107 joining_player: Option<Vec<LocalIndex>>,
109
110 state: Option<StateT>,
112
113 blob_stream_client: FrontLogic,
115
116 outgoing_predicted_steps: Queue<StepMap<StepT>>,
118
119 incoming_authoritative_steps: Queue<StepMap<Step<StepT>>>,
121
122 phase: ClientLogicPhase,
124
125 server_buffer_delta_tick_id: AggregateMetric<i16>,
127
128 latency: AggregateMetric<u16>,
130
131 joining_request_id: ClientRequestId,
134
135 local_players: Vec<LocalPlayer>,
136}
137
138impl<
139 StateT: BufferDeserializer,
140 StepT: Clone + Deserialize + Serialize + Debug + std::fmt::Display,
141 > ClientLogic<StateT, StepT>
142{
143 #[must_use]
145 #[allow(clippy::missing_panics_doc)]
146 pub fn new(deterministic_simulation_version: app_version::Version) -> Self {
147 Self {
148 joining_player: None,
149 joining_request_id: ClientRequestId(0),
150 blob_stream_client: FrontLogic::new(),
151 outgoing_predicted_steps: Queue::default(),
152 incoming_authoritative_steps: Queue::default(),
153 server_buffer_delta_tick_id: AggregateMetric::new(3).unwrap(),
154 state: None,
156 phase: ClientLogicPhase::RequestConnect,
157 local_players: Vec::new(),
158 deterministic_simulation_version,
159 connect_request_id: None,
160 latency: AggregateMetric::<u16>::new(10).unwrap().with_unit("ms"),
161 }
162 }
163
164 pub const fn debug_authoritative_steps(&self) -> &Queue<StepMap<Step<StepT>>> {
166 &self.incoming_authoritative_steps
167 }
168
169 pub const fn phase(&self) -> &ClientLogicPhase {
170 &self.phase
171 }
172
173 pub fn latency(&self) -> Option<MinMaxAvg<u16>> {
174 self.latency.values()
175 }
176
177 pub fn pop_all_authoritative_steps(&mut self) -> (TickId, Vec<StepMap<Step<StepT>>>) {
178 if let Some(first_tick_id) = self.incoming_authoritative_steps.front_tick_id() {
179 let vec = self.incoming_authoritative_steps.to_vec();
180 self.incoming_authoritative_steps.clear(first_tick_id + 1);
181 (first_tick_id, vec)
182 } else {
183 (TickId(0), vec![])
184 }
185 }
186
187 pub fn set_joining_player(&mut self, local_players: Vec<LocalIndex>) {
192 self.joining_player = Some(local_players);
193 }
194
195 fn download_state_request(
203 &mut self,
204 download_request_id: u8,
205 ) -> Vec<ClientToHostCommands<StepT>> {
206 let mut vec = vec![];
207 let download_request = DownloadGameStateRequest {
208 request_id: download_request_id,
209 };
210 vec.push(ClientToHostCommands::DownloadGameState(download_request));
211
212 if let Some(cmd) = self.blob_stream_client.send() {
213 vec.push(ClientToHostCommands::BlobStreamChannel(cmd));
214 }
215
216 vec
217 }
218
219 fn send_connect_request(&mut self) -> ClientToHostCommands<StepT> {
220 if self.connect_request_id.is_none() {
221 self.connect_request_id = Some(ClientRequestId(0));
222 }
223
224 let connect_request = ConnectRequest {
225 nimble_version: NIMBLE_PROTOCOL_VERSION,
226 use_debug_stream: false,
227 application_version: Version {
228 major: self.deterministic_simulation_version.major(),
229 minor: self.deterministic_simulation_version.minor(),
230 patch: self.deterministic_simulation_version.patch(),
231 },
232 client_request_id: ClientRequestId(0),
233 };
234
235 ClientToHostCommands::ConnectType(connect_request)
236 }
237
238 fn send_steps_request(&self) -> ClientToHostCommands<StepT> {
243 let steps_request = StepsRequest {
244 ack: StepsAck {
245 waiting_for_tick_id: self.incoming_authoritative_steps.expected_write_tick_id(),
246 },
247 combined_predicted_steps: CombinedSteps::<StepT> {
248 tick_id: self
249 .outgoing_predicted_steps
250 .front_tick_id()
251 .unwrap_or_default(),
252 steps: self.outgoing_predicted_steps.to_vec(),
253 },
254 };
255
256 ClientToHostCommands::Steps(steps_request)
257 }
258
259 pub const fn debug_connect_request_id(&self) -> Option<ClientRequestId> {
260 self.connect_request_id
261 }
262
263 #[must_use]
268 pub fn send(&mut self, now: Millis) -> Vec<ClientToHostCommands<StepT>> {
269 let mut commands: Vec<ClientToHostCommands<StepT>> = vec![];
270
271 if self.phase != ClientLogicPhase::RequestConnect {
272 let ping = ClientToHostCommands::Ping(now.to_lower());
274 commands.push(ping);
275
276 if let Some(joining_players) = &self.joining_player {
277 debug!("connected. send join_game_request {:?}", joining_players);
278
279 let player_requests = joining_players
280 .iter()
281 .map(|local_index| JoinPlayerRequest {
282 local_index: *local_index,
283 })
284 .collect();
285 let join_command = ClientToHostCommands::JoinGameType(JoinGameRequest {
286 client_request_id: self.joining_request_id,
287 join_game_type: JoinGameType::NoSecret,
288 player_requests: JoinPlayerRequests {
289 players: player_requests,
290 },
291 });
292 trace!("send join command: {join_command:?}");
293 commands.push(join_command);
294 }
295 }
296
297 let normal_commands: Vec<ClientToHostCommands<StepT>> = match self.phase {
298 ClientLogicPhase::RequestDownloadState {
299 download_state_request_id,
300 } => self.download_state_request(download_state_request_id),
301 ClientLogicPhase::SendPredictedSteps => [self.send_steps_request()].to_vec(),
302 ClientLogicPhase::DownloadingState(_) => {
303 self.blob_stream_client.send().map_or_else(Vec::new, |x| {
304 [ClientToHostCommands::BlobStreamChannel(x)].to_vec()
305 })
306 }
307 ClientLogicPhase::RequestConnect => [self.send_connect_request()].to_vec(),
308 };
309
310 commands.extend(normal_commands);
311
312 commands
313 }
314
315 pub fn can_push_predicted_step(&self) -> bool {
316 self.is_in_game() && self.game().is_some()
317 }
318
319 pub fn is_in_game(&self) -> bool {
320 self.phase == ClientLogicPhase::SendPredictedSteps
321 && self.joining_player.is_none()
322 && !self.local_players.is_empty()
323 }
324
325 pub fn push_predicted_step(
334 &mut self,
335 tick_id: TickId,
336 step: StepMap<StepT>,
337 ) -> Result<(), ClientLogicError> {
338 self.outgoing_predicted_steps.push(tick_id, step)?;
339 Ok(())
340 }
341
342 pub fn predicted_step_count_in_queue(&self) -> usize {
343 self.outgoing_predicted_steps.len()
344 }
345
346 fn on_pong(&mut self, now: Millis, cmd: &PongInfo) -> Result<(), ClientLogicError> {
347 let low_16 = cmd.lower_millis as MillisLow16;
348
349 let earlier = now
350 .from_lower(low_16)
351 .ok_or_else(|| ClientLogicError::MillisFromLowerError)?;
352 let duration_ms = now
353 .checked_duration_since_ms(earlier)
354 .ok_or_else(|| ClientLogicError::AbsoluteTimeError)?;
355
356 self.latency.add(
357 u16::try_from(duration_ms.as_millis())
358 .map_err(|_| ClientLogicError::LatencyIsTooBig)?,
359 );
360
361 Ok(())
362 }
363
364 fn on_join_game(&mut self, cmd: &JoinGameAccepted) -> Result<(), ClientLogicError> {
372 debug!("join game accepted: {:?}", cmd);
373
374 if cmd.client_request_id != self.joining_request_id {
375 Err(ClientLogicError::WrongJoinResponseRequestId {
376 encountered: cmd.client_request_id,
377 expected: self.joining_request_id,
378 })?;
379 }
380
381 self.joining_player = None;
382
383 self.local_players.clear();
384
385 for participant in &cmd.participants.0 {
386 self.local_players.push(LocalPlayer {
387 index: participant.local_index,
388 participant_id: participant.participant_id,
389 });
390 }
391
392 Ok(())
393 }
394
395 pub const fn game(&self) -> Option<&StateT> {
400 self.state.as_ref()
401 }
402
403 pub fn game_mut(&mut self) -> Option<&mut StateT> {
404 self.state.as_mut()
405 }
406
407 fn handle_game_step_header(&mut self, header: &GameStepResponseHeader) {
412 let host_expected_tick_id = header.next_expected_tick_id;
413 self.server_buffer_delta_tick_id
414 .add(i16::from(header.delta_buffer));
415 trace!("removing every predicted step before {host_expected_tick_id}");
417 self.outgoing_predicted_steps
418 .discard_up_to(host_expected_tick_id);
419 trace!(
420 "predicted steps remaining {}",
421 self.outgoing_predicted_steps.len()
422 );
423 }
424
425 fn on_connect(&mut self, cmd: &ConnectionAccepted) -> Result<(), ClientLogicError> {
426 if self.phase != ClientLogicPhase::RequestConnect {
427 Err(ClientLogicError::ReceivedConnectResponseWhenNotConnecting)?;
428 }
429
430 if self.connect_request_id.is_none() {
431 Err(ClientLogicError::ReceivedConnectResponseWhenNotConnecting)?;
432 }
433
434 if cmd.response_to_request != self.connect_request_id.unwrap() {
435 Err(ClientLogicError::WrongConnectResponseRequestId(
436 cmd.response_to_request,
437 ))?;
438 }
439 self.phase = ClientLogicPhase::RequestDownloadState {
440 download_state_request_id: 0x99,
441 }; debug!("set phase to connected!");
443 Ok(())
444 }
445
446 fn on_game_step(
454 &mut self,
455 cmd: &GameStepResponse<Step<StepT>>,
456 ) -> Result<(), ClientLogicError> {
457 trace!("game step response: {}", cmd);
458
459 self.handle_game_step_header(&cmd.response_header);
460
461 if cmd.authoritative_steps.ranges.is_empty() {
462 return Ok(());
463 }
464
465 let mut accepted_count = 0;
466
467 for range in &cmd.authoritative_steps.ranges {
468 let range_len = u32::try_from(range.steps.len())
469 .map_err(|_| ClientLogicError::TooManyStepsInRange)?;
470 let mut current_authoritative_tick_id = range.tick_id;
471 for combined_auth_step in &range.steps {
472 if current_authoritative_tick_id
473 == self.incoming_authoritative_steps.expected_write_tick_id()
474 {
475 self.incoming_authoritative_steps
476 .push(current_authoritative_tick_id, combined_auth_step.clone())?;
477 accepted_count += 1;
478 }
479 current_authoritative_tick_id += 1;
480 }
481
482 current_authoritative_tick_id += range_len;
483 }
484
485 if accepted_count > 0 {
486 trace!(
487 "accepted {accepted_count} auth steps, waiting for {}, total count: {}",
488 self.incoming_authoritative_steps.expected_write_tick_id(),
489 self.incoming_authoritative_steps.len(),
490 );
491 }
492
493 Ok(())
494 }
495
496 fn on_download_state_response(
504 &mut self,
505 download_response: &DownloadGameStateResponse,
506 ) -> Result<(), ClientLogicError> {
507 match self.phase {
508 ClientLogicPhase::RequestDownloadState {
509 download_state_request_id,
510 } => {
511 if download_response.client_request != download_state_request_id {
512 Err(ClientLogicError::WrongDownloadRequestId)?;
513 }
514 }
515 _ => Err(ClientLogicError::DownloadResponseWasUnexpected)?,
516 }
517
518 self.phase = ClientLogicPhase::DownloadingState(download_response.tick_id);
519
520 Ok(())
521 }
522
523 fn on_blob_stream(
531 &mut self,
532 blob_stream_command: &SenderToReceiverFrontCommands,
533 ) -> Result<(), ClientLogicError> {
534 match self.phase {
535 ClientLogicPhase::DownloadingState(_) => {
536 self.blob_stream_client.receive(blob_stream_command)?;
537 if let Some(blob_ready) = self.blob_stream_client.blob() {
538 debug!("blob stream received, phase is set to SendPredictedSteps");
539 self.phase = ClientLogicPhase::SendPredictedSteps;
540 let (deserialized, _) = StateT::deserialize(blob_ready)?;
541 self.state = Some(deserialized);
542 }
543 }
544 _ => Err(ClientLogicError::UnexpectedBlobChannelCommand)?,
545 }
546 Ok(())
547 }
548
549 pub fn receive(
557 &mut self,
558 now: Millis,
559 command: &HostToClientCommands<Step<StepT>>,
560 ) -> Result<(), ClientLogicError> {
561 match command {
562 HostToClientCommands::JoinGame(ref join_game_response) => {
563 self.on_join_game(join_game_response)
564 }
565 HostToClientCommands::GameStep(ref game_step_response) => {
566 self.on_game_step(game_step_response)
567 }
568 HostToClientCommands::DownloadGameState(ref download_response) => {
569 self.on_download_state_response(download_response)
570 }
571 HostToClientCommands::BlobStreamChannel(ref blob_stream_command) => {
572 self.on_blob_stream(blob_stream_command)
573 }
574 HostToClientCommands::ConnectType(ref connect_accepted) => {
575 self.on_connect(connect_accepted)
576 }
577 HostToClientCommands::Pong(pong_info) => self.on_pong(now, pong_info),
578 }
579 }
580
581 pub fn server_buffer_delta_ticks(&self) -> Option<i16> {
586 self.server_buffer_delta_tick_id
587 .average()
588 .map(|value| value.round() as i16)
589 }
590
591 pub fn local_players(&self) -> Vec<LocalPlayer> {
592 self.local_players.clone()
593 }
594}