nimble_host_logic/
connection.rs

1/*
2 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
3 * Licensed under the MIT License. See LICENSE in the project root for license information.
4 */
5
6use crate::combine::HostCombinator;
7use crate::session::Participant;
8use crate::{GameSession, GameStateProvider, HostLogicError, Phase};
9use app_version::Version;
10use flood_rs::{Deserialize, Serialize};
11use log::{debug, trace};
12use monotonic_time_rs::Millis;
13use nimble_blob_stream::out_logic_front::OutLogicFront;
14use nimble_blob_stream::prelude::{ReceiverToSenderFrontCommands, TransferId};
15use nimble_participant::ParticipantId;
16use nimble_protocol::client_to_host::{
17    ConnectRequest, DownloadGameStateRequest, JoinGameRequest, StepsRequest,
18};
19use nimble_protocol::host_to_client::{
20    AuthoritativeStepRanges, ConnectionAccepted, DownloadGameStateResponse, GameStepResponse,
21    GameStepResponseHeader, HostToClientCommands, JoinGameAccepted, JoinGameParticipant,
22    JoinGameParticipants, PartyAndSessionSecret,
23};
24use nimble_protocol::prelude::CombinedSteps;
25use nimble_protocol::SessionConnectionSecret;
26use nimble_step::Step;
27use std::cell::RefCell;
28use std::collections::HashMap;
29use std::fmt::Debug;
30use std::marker::PhantomData;
31use std::rc::Rc;
32use std::time::Duration;
33use tick_id::TickId;
34
35#[derive(Debug)]
36#[allow(clippy::new_without_default)]
37pub struct Connection<StepT: Clone + Eq + Debug + Deserialize + Serialize> {
38    pub participant_lookup: HashMap<ParticipantId, Rc<RefCell<Participant>>>,
39    pub out_blob_stream: Option<OutLogicFront>,
40    pub blob_stream_for_client_request: Option<u8>,
41    last_transfer_id: u16,
42    pub(crate) phase: Phase,
43    #[allow(unused)]
44    debug_counter: u16,
45    phantom_data: PhantomData<StepT>,
46}
47
48#[allow(clippy::new_without_default)]
49impl<StepT: Clone + Eq + Debug + Deserialize + Serialize + std::fmt::Display> Connection<StepT> {
50    #[must_use]
51    pub fn new() -> Self {
52        Self {
53            participant_lookup: HashMap::default(),
54            out_blob_stream: None,
55            blob_stream_for_client_request: None,
56            last_transfer_id: 0,
57            debug_counter: 0,
58            phase: Phase::WaitingForValidConnectRequest,
59            phantom_data: PhantomData,
60        }
61    }
62
63    #[must_use]
64    pub const fn phase(&self) -> &Phase {
65        &self.phase
66    }
67
68    /// # Errors
69    ///
70    /// `HostLogicError` // TODO:
71    pub fn on_connect(
72        &mut self,
73        connect_request: &ConnectRequest,
74        required_deterministic_simulation_version: &Version,
75    ) -> Result<Vec<HostToClientCommands<Step<StepT>>>, HostLogicError> {
76        self.phase = Phase::Connected;
77
78        let connect_version = Version::new(
79            connect_request.application_version.major,
80            connect_request.application_version.minor,
81            connect_request.application_version.patch,
82        );
83
84        if connect_version != *required_deterministic_simulation_version {
85            return Err(HostLogicError::WrongApplicationVersion);
86        }
87
88        let response = ConnectionAccepted {
89            flags: 0,
90            response_to_request: connect_request.client_request_id,
91        };
92        debug!(
93            "host-stream received connect request {:?} and responding:\n{:?}",
94            connect_request, response
95        );
96        Ok([HostToClientCommands::ConnectType(response)].into())
97    }
98
99    #[must_use]
100    pub fn is_state_received_by_remote(&self) -> bool {
101        self.out_blob_stream
102            .as_ref()
103            .map_or(false, OutLogicFront::is_received_by_remote)
104    }
105
106    pub(crate) fn on_blob_stream(
107        &mut self,
108        now: Millis,
109        blob_stream_command: &ReceiverToSenderFrontCommands,
110    ) -> Result<Vec<HostToClientCommands<Step<StepT>>>, HostLogicError> {
111        let blob_stream = self
112            .out_blob_stream
113            .as_mut()
114            .ok_or(HostLogicError::NoDownloadNow)?;
115        blob_stream.receive(blob_stream_command)?;
116        let blob_commands = blob_stream.send(now)?;
117
118        let converted_commands: Vec<_> = blob_commands
119            .into_iter()
120            .map(HostToClientCommands::BlobStreamChannel)
121            .collect();
122
123        Ok(converted_commands)
124    }
125
126    pub(crate) fn on_join(
127        &mut self,
128        session: &mut GameSession<StepT>,
129        request: &JoinGameRequest,
130    ) -> Result<HostToClientCommands<Step<StepT>>, HostLogicError> {
131        debug!("on_join {:?}", request);
132
133        if request.player_requests.players.is_empty() {
134            return Err(HostLogicError::NoFreeParticipantIds);
135        }
136
137        let local_indices: Vec<_> = request
138            .player_requests
139            .players
140            .iter()
141            .map(|p| p.local_index)
142            .collect();
143
144        let participants = session
145            .create_participants(local_indices.as_slice())
146            .ok_or(HostLogicError::NoFreeParticipantIds)?;
147
148        for participant in &participants {
149            self.participant_lookup
150                .insert(participant.borrow().id, participant.clone());
151            session.combinator.create_buffer(participant.borrow().id);
152        }
153
154        let join_game_participants = participants
155            .iter()
156            .map(|found_participant| JoinGameParticipant {
157                local_index: found_participant.borrow().client_local_index,
158                participant_id: found_participant.borrow().id,
159            })
160            .collect();
161
162        let join_accepted = JoinGameAccepted {
163            client_request_id: request.client_request_id,
164            party_and_session_secret: PartyAndSessionSecret {
165                session_secret: SessionConnectionSecret { value: 0 },
166                party_id: 0,
167            },
168            participants: JoinGameParticipants(join_game_participants),
169        };
170
171        Ok(HostToClientCommands::JoinGame(join_accepted))
172    }
173
174    pub(crate) fn on_download(
175        &mut self,
176        tick_id_to_be_produced: TickId,
177        now: Millis,
178        request: &DownloadGameStateRequest,
179        state_provider: &impl GameStateProvider,
180    ) -> Result<Vec<HostToClientCommands<Step<StepT>>>, HostLogicError> {
181        const FIXED_CHUNK_SIZE: u16 = 1024;
182        const RESEND_DURATION: Duration = Duration::from_millis(32 * 3);
183
184        debug!("client requested download {:?}", request);
185        let (state_tick_id, state_vec) = state_provider.state(tick_id_to_be_produced);
186
187        let is_new_request = self
188            .blob_stream_for_client_request
189            .map_or(true, |x| x == request.request_id);
190        if is_new_request {
191            self.last_transfer_id += 1;
192            let transfer_id = TransferId(self.last_transfer_id);
193            self.out_blob_stream = Some(OutLogicFront::new(
194                transfer_id,
195                FIXED_CHUNK_SIZE,
196                RESEND_DURATION,
197                state_vec.as_slice(),
198            )?);
199        }
200
201        let response = DownloadGameStateResponse {
202            client_request: request.request_id,
203            tick_id: state_tick_id,
204            blob_stream_channel: self.out_blob_stream.as_ref().unwrap().transfer_id().0,
205        };
206        let mut commands = vec![];
207        commands.push(HostToClientCommands::DownloadGameState(response));
208
209        // Since most datagram transports have a very low packet drop rate,
210        // this implementation is optimized for the high likelihood of datagram delivery.
211        // So we start including the first blob commands right away
212        let blob_commands = self.out_blob_stream.as_mut().unwrap().send(now)?;
213        let converted_blob_commands: Vec<_> = blob_commands
214            .into_iter()
215            .map(HostToClientCommands::BlobStreamChannel)
216            .collect();
217        commands.extend(converted_blob_commands);
218
219        Ok(commands)
220    }
221
222    pub(crate) fn on_steps(
223        &self,
224        combinator: &mut HostCombinator<StepT>,
225        request: &StepsRequest<StepT>,
226    ) -> Result<HostToClientCommands<Step<StepT>>, HostLogicError> {
227        trace!("on incoming predicted steps {}", request);
228
229        /*
230                               let mut tick = add_steps_request.combined_predicted_steps.tick_id;
231                       for combined_step in &add_steps_request.combined_predicted_steps.steps {
232                           for (participant_id, step) in combined_step.combined_step.into_iter() {
233                               if !connection.participant_lookup.contains_key(participant_id) {
234                                   Err(HostLogicError::UnknownPartyMember(*participant_id))?;
235                               }
236                               self.session
237                                   .combinator
238                                   .receive_step(*participant_id, tick, step.clone())?;
239                           }
240                           tick += 1;
241                       }
242        */
243
244        let mut current_tick = request.combined_predicted_steps.tick_id;
245        for combined_predicted_step in &request.combined_predicted_steps.steps {
246            for participant_id in combined_predicted_step.keys() {
247                // TODO:
248                if self.participant_lookup.contains_key(participant_id) {
249                    let part = combined_predicted_step.get(participant_id).unwrap();
250
251                    let buffer = combinator
252                        .get_mut(*participant_id)
253                        .expect("since the participant lookup worked, there should be a buffer");
254                    if buffer.expected_write_tick_id() != current_tick {
255                        continue;
256                    }
257                    buffer.push(current_tick, part.clone())?;
258                } else {
259                    return Err(HostLogicError::UnknownPartyMember(*participant_id));
260                }
261            }
262            current_tick += 1;
263        }
264
265        let authoritative_steps = combinator.authoritative_steps();
266
267        let combined_steps_vec =
268            authoritative_steps
269                .front_tick_id()
270                .map_or(Vec::new(), |found_first_tick_id| {
271                    let combined_steps = CombinedSteps::<Step<StepT>> {
272                        tick_id: found_first_tick_id,
273                        steps: authoritative_steps.to_vec(),
274                    };
275                    vec![combined_steps]
276                });
277
278        let game_step_response = GameStepResponse {
279            response_header: GameStepResponseHeader {
280                connection_buffer_count: 0,
281                delta_buffer: 0,
282                next_expected_tick_id: combinator.tick_id_to_produce(),
283            },
284            authoritative_steps: AuthoritativeStepRanges {
285                ranges: combined_steps_vec,
286            },
287        };
288
289        trace!("sending auth steps: {}", game_step_response);
290        Ok(HostToClientCommands::GameStep(game_step_response))
291    }
292}