1use crate::combine::HostCombinator;
7use crate::session::Participant;
8use crate::{GameSession, GameStateProvider, HostLogicError, Phase};
9use app_version::Version;
10use flood_rs::{Deserialize, Serialize};
11use log::{debug, trace};
12use monotonic_time_rs::Millis;
13use nimble_blob_stream::out_logic_front::OutLogicFront;
14use nimble_blob_stream::prelude::{ReceiverToSenderFrontCommands, TransferId};
15use nimble_participant::ParticipantId;
16use nimble_protocol::client_to_host::{
17 ConnectRequest, DownloadGameStateRequest, JoinGameRequest, StepsRequest,
18};
19use nimble_protocol::host_to_client::{
20 AuthoritativeStepRanges, ConnectionAccepted, DownloadGameStateResponse, GameStepResponse,
21 GameStepResponseHeader, HostToClientCommands, JoinGameAccepted, JoinGameParticipant,
22 JoinGameParticipants, PartyAndSessionSecret,
23};
24use nimble_protocol::prelude::CombinedSteps;
25use nimble_protocol::SessionConnectionSecret;
26use nimble_step::Step;
27use std::cell::RefCell;
28use std::collections::HashMap;
29use std::fmt::Debug;
30use std::marker::PhantomData;
31use std::rc::Rc;
32use std::time::Duration;
33use tick_id::TickId;
34
35#[derive(Debug)]
36#[allow(clippy::new_without_default)]
37pub struct Connection<StepT: Clone + Eq + Debug + Deserialize + Serialize> {
38 pub participant_lookup: HashMap<ParticipantId, Rc<RefCell<Participant>>>,
39 pub out_blob_stream: Option<OutLogicFront>,
40 pub blob_stream_for_client_request: Option<u8>,
41 last_transfer_id: u16,
42 pub(crate) phase: Phase,
43 #[allow(unused)]
44 debug_counter: u16,
45 phantom_data: PhantomData<StepT>,
46}
47
48#[allow(clippy::new_without_default)]
49impl<StepT: Clone + Eq + Debug + Deserialize + Serialize + std::fmt::Display> Connection<StepT> {
50 #[must_use]
51 pub fn new() -> Self {
52 Self {
53 participant_lookup: HashMap::default(),
54 out_blob_stream: None,
55 blob_stream_for_client_request: None,
56 last_transfer_id: 0,
57 debug_counter: 0,
58 phase: Phase::WaitingForValidConnectRequest,
59 phantom_data: PhantomData,
60 }
61 }
62
63 #[must_use]
64 pub const fn phase(&self) -> &Phase {
65 &self.phase
66 }
67
68 pub fn on_connect(
72 &mut self,
73 connect_request: &ConnectRequest,
74 required_deterministic_simulation_version: &Version,
75 ) -> Result<Vec<HostToClientCommands<Step<StepT>>>, HostLogicError> {
76 self.phase = Phase::Connected;
77
78 let connect_version = Version::new(
79 connect_request.application_version.major,
80 connect_request.application_version.minor,
81 connect_request.application_version.patch,
82 );
83
84 if connect_version != *required_deterministic_simulation_version {
85 return Err(HostLogicError::WrongApplicationVersion);
86 }
87
88 let response = ConnectionAccepted {
89 flags: 0,
90 response_to_request: connect_request.client_request_id,
91 };
92 debug!(
93 "host-stream received connect request {:?} and responding:\n{:?}",
94 connect_request, response
95 );
96 Ok([HostToClientCommands::ConnectType(response)].into())
97 }
98
99 #[must_use]
100 pub fn is_state_received_by_remote(&self) -> bool {
101 self.out_blob_stream
102 .as_ref()
103 .map_or(false, OutLogicFront::is_received_by_remote)
104 }
105
106 pub(crate) fn on_blob_stream(
107 &mut self,
108 now: Millis,
109 blob_stream_command: &ReceiverToSenderFrontCommands,
110 ) -> Result<Vec<HostToClientCommands<Step<StepT>>>, HostLogicError> {
111 let blob_stream = self
112 .out_blob_stream
113 .as_mut()
114 .ok_or(HostLogicError::NoDownloadNow)?;
115 blob_stream.receive(blob_stream_command)?;
116 let blob_commands = blob_stream.send(now)?;
117
118 let converted_commands: Vec<_> = blob_commands
119 .into_iter()
120 .map(HostToClientCommands::BlobStreamChannel)
121 .collect();
122
123 Ok(converted_commands)
124 }
125
126 pub(crate) fn on_join(
127 &mut self,
128 session: &mut GameSession<StepT>,
129 request: &JoinGameRequest,
130 ) -> Result<HostToClientCommands<Step<StepT>>, HostLogicError> {
131 debug!("on_join {:?}", request);
132
133 if request.player_requests.players.is_empty() {
134 return Err(HostLogicError::NoFreeParticipantIds);
135 }
136
137 let local_indices: Vec<_> = request
138 .player_requests
139 .players
140 .iter()
141 .map(|p| p.local_index)
142 .collect();
143
144 let participants = session
145 .create_participants(local_indices.as_slice())
146 .ok_or(HostLogicError::NoFreeParticipantIds)?;
147
148 for participant in &participants {
149 self.participant_lookup
150 .insert(participant.borrow().id, participant.clone());
151 session.combinator.create_buffer(participant.borrow().id);
152 }
153
154 let join_game_participants = participants
155 .iter()
156 .map(|found_participant| JoinGameParticipant {
157 local_index: found_participant.borrow().client_local_index,
158 participant_id: found_participant.borrow().id,
159 })
160 .collect();
161
162 let join_accepted = JoinGameAccepted {
163 client_request_id: request.client_request_id,
164 party_and_session_secret: PartyAndSessionSecret {
165 session_secret: SessionConnectionSecret { value: 0 },
166 party_id: 0,
167 },
168 participants: JoinGameParticipants(join_game_participants),
169 };
170
171 Ok(HostToClientCommands::JoinGame(join_accepted))
172 }
173
174 pub(crate) fn on_download(
175 &mut self,
176 tick_id_to_be_produced: TickId,
177 now: Millis,
178 request: &DownloadGameStateRequest,
179 state_provider: &impl GameStateProvider,
180 ) -> Result<Vec<HostToClientCommands<Step<StepT>>>, HostLogicError> {
181 const FIXED_CHUNK_SIZE: u16 = 1024;
182 const RESEND_DURATION: Duration = Duration::from_millis(32 * 3);
183
184 debug!("client requested download {:?}", request);
185 let (state_tick_id, state_vec) = state_provider.state(tick_id_to_be_produced);
186
187 let is_new_request = self
188 .blob_stream_for_client_request
189 .map_or(true, |x| x == request.request_id);
190 if is_new_request {
191 self.last_transfer_id += 1;
192 let transfer_id = TransferId(self.last_transfer_id);
193 self.out_blob_stream = Some(OutLogicFront::new(
194 transfer_id,
195 FIXED_CHUNK_SIZE,
196 RESEND_DURATION,
197 state_vec.as_slice(),
198 )?);
199 }
200
201 let response = DownloadGameStateResponse {
202 client_request: request.request_id,
203 tick_id: state_tick_id,
204 blob_stream_channel: self.out_blob_stream.as_ref().unwrap().transfer_id().0,
205 };
206 let mut commands = vec![];
207 commands.push(HostToClientCommands::DownloadGameState(response));
208
209 let blob_commands = self.out_blob_stream.as_mut().unwrap().send(now)?;
213 let converted_blob_commands: Vec<_> = blob_commands
214 .into_iter()
215 .map(HostToClientCommands::BlobStreamChannel)
216 .collect();
217 commands.extend(converted_blob_commands);
218
219 Ok(commands)
220 }
221
222 pub(crate) fn on_steps(
223 &self,
224 combinator: &mut HostCombinator<StepT>,
225 request: &StepsRequest<StepT>,
226 ) -> Result<HostToClientCommands<Step<StepT>>, HostLogicError> {
227 trace!("on incoming predicted steps {}", request);
228
229 let mut current_tick = request.combined_predicted_steps.tick_id;
245 for combined_predicted_step in &request.combined_predicted_steps.steps {
246 for participant_id in combined_predicted_step.keys() {
247 if self.participant_lookup.contains_key(participant_id) {
249 let part = combined_predicted_step.get(participant_id).unwrap();
250
251 let buffer = combinator
252 .get_mut(*participant_id)
253 .expect("since the participant lookup worked, there should be a buffer");
254 if buffer.expected_write_tick_id() != current_tick {
255 continue;
256 }
257 buffer.push(current_tick, part.clone())?;
258 } else {
259 return Err(HostLogicError::UnknownPartyMember(*participant_id));
260 }
261 }
262 current_tick += 1;
263 }
264
265 let authoritative_steps = combinator.authoritative_steps();
266
267 let combined_steps_vec =
268 authoritative_steps
269 .front_tick_id()
270 .map_or(Vec::new(), |found_first_tick_id| {
271 let combined_steps = CombinedSteps::<Step<StepT>> {
272 tick_id: found_first_tick_id,
273 steps: authoritative_steps.to_vec(),
274 };
275 vec![combined_steps]
276 });
277
278 let game_step_response = GameStepResponse {
279 response_header: GameStepResponseHeader {
280 connection_buffer_count: 0,
281 delta_buffer: 0,
282 next_expected_tick_id: combinator.tick_id_to_produce(),
283 },
284 authoritative_steps: AuthoritativeStepRanges {
285 ranges: combined_steps_vec,
286 },
287 };
288
289 trace!("sending auth steps: {}", game_step_response);
290 Ok(HostToClientCommands::GameStep(game_step_response))
291 }
292}