cheetah_game_realtime_protocol/
lib.rs1extern crate core;
2
3use std::collections::VecDeque;
4use std::fmt::Debug;
5use std::time::Instant;
6
7use crate::coniguration::ProtocolConfiguration;
8use crate::disconnect::command::DisconnectByCommand;
9use crate::disconnect::timeout::DisconnectByTimeout;
10use crate::frame::disconnected_reason::DisconnectedReason;
11use crate::frame::packets_collector::{PacketsCollector, PACKET_SIZE};
12use crate::frame::segment::{Segment, SEGMENT_SIZE};
13use crate::frame::{ConnectionId, Frame, FrameId};
14use crate::others::keep_alive::KeepAlive;
15use crate::others::rtt::RoundTripTime;
16use crate::reliable::ack::AckSender;
17use crate::reliable::replay_protection::FrameReplayProtection;
18use crate::reliable::retransmit::Retransmitter;
19
20pub mod codec;
21pub mod collections;
22pub mod coniguration;
23pub mod disconnect;
24pub mod frame;
25pub mod others;
26pub mod reliable;
27
28pub type RoomMemberId = u64;
29pub type RoomId = u64;
30
31pub const MAX_FRAME_PER_SECONDS: usize = 120;
37
38pub const DISCONNECT_TIMEOUT_IN_SECONDS: usize = 60;
42pub const NOT_EXIST_FRAME_ID: FrameId = 0;
43
44pub trait InputDataHandler {
45 fn on_input_data(&mut self, data: &[u8]);
46 fn reset(&mut self);
47}
48
49pub trait OutputDataProducer {
50 fn contains_output_data(&self) -> bool;
51 fn get_output_data(&mut self, out: &mut [u8]) -> (usize, bool);
52 fn reset(&mut self);
53}
54
55#[derive(Debug)]
63pub struct Protocol<IN, OUT>
64 where
65 IN: InputDataHandler,
66 OUT: OutputDataProducer,
67{
68 pub connection_id: ConnectionId,
69 pub next_frame_id: u64,
70 pub next_packed_id: u64,
71 pub replay_protection: FrameReplayProtection,
72 pub ack_sender: AckSender,
73 pub retransmitter: Retransmitter,
74 pub disconnect_by_timeout: DisconnectByTimeout,
75 pub disconnect_by_command: DisconnectByCommand,
76 pub input_data_handler: IN,
77 pub output_data_producer: OUT,
78 pub rtt: RoundTripTime,
79 pub keep_alive: KeepAlive,
80 pub in_frame_counter: u64,
81 packets_collector: PacketsCollector,
82 pub configuration: ProtocolConfiguration,
83}
84
85impl<IN, OUT> Protocol<IN, OUT>
86 where
87 IN: InputDataHandler,
88 OUT: OutputDataProducer,
89{
90 #[must_use]
91 pub fn new(input_data_handler: IN, output_data_producer: OUT, connection_id: ConnectionId, now: Instant, start_application_time: Instant, configuration: ProtocolConfiguration) -> Self {
92 Self {
93 next_frame_id: 1,
94 next_packed_id: 0,
95 disconnect_by_timeout: DisconnectByTimeout::new(now, configuration.disconnect_timeout),
96 retransmitter: Retransmitter::new(configuration.disconnect_timeout),
97 rtt: RoundTripTime::new(start_application_time),
98 connection_id,
99 input_data_handler,
100 output_data_producer,
101 replay_protection: Default::default(),
102 ack_sender: Default::default(),
103 disconnect_by_command: Default::default(),
104 keep_alive: Default::default(),
105 in_frame_counter: Default::default(),
106 packets_collector: Default::default(),
107 configuration,
108 }
109 }
110
111 pub fn on_frame_received(&mut self, frame: &Frame, now: Instant) {
115 if frame.connection_id > self.connection_id {
117 self.reset_protocol(frame, now);
118 }
119
120 if frame.connection_id != self.connection_id {
122 return;
123 }
124
125 self.in_frame_counter += 1;
126 self.disconnect_by_timeout.on_frame_received(now);
127 self.ack_sender.on_frame_received(frame, now);
128 self.retransmitter.on_frame_received(frame);
129 match self.replay_protection.set_and_check(frame) {
130 Ok(replayed) => {
131 if !replayed {
132 self.disconnect_by_command.on_frame_received(frame);
133 self.rtt.on_frame_received(frame, now);
134 self.processing_data(frame);
135 }
136 }
137 Err(..) => {
138 tracing::error!("Replay Protection overflow")
139 }
140 }
141 }
142
143 fn processing_data(&mut self, frame: &Frame) {
144 match self.packets_collector.on_segment(&frame.segment) {
145 Ok(packet) => match packet {
146 None => {}
147 Some(packet) => {
148 self.input_data_handler.on_input_data(packet);
149 }
150 },
151 Err(_) => {
152 tracing::error!("PacketsCollector error")
153 }
154 }
155 }
156
157 fn reset_protocol(&mut self, frame: &Frame, now: Instant) {
158 self.connection_id = frame.connection_id;
159 self.next_frame_id = 1;
160 self.next_packed_id = 1;
161 self.disconnect_by_timeout = DisconnectByTimeout::new(now, self.disconnect_by_timeout.timeout);
162 self.replay_protection = Default::default();
163 self.ack_sender = Default::default();
164 self.retransmitter = Retransmitter::new(self.configuration.disconnect_timeout);
165 self.disconnect_by_command = Default::default();
166 self.keep_alive = Default::default();
167 self.in_frame_counter = Default::default();
168 self.packets_collector = Default::default();
169 self.input_data_handler.reset();
170 self.output_data_producer.reset();
171 }
172
173 #[allow(clippy::cast_precision_loss)]
177 pub fn collect_out_frames(&mut self, now: Instant, out: &mut VecDeque<Frame>) {
178 match self.get_next_retransmit_frame(now) {
179 None => {}
180 Some(frame) => {
181 out.push_back(frame);
182 return;
183 }
184 }
185
186 let contains_data =
187 self.ack_sender.contains_self_data(now) || self.output_data_producer.contains_output_data() || self.disconnect_by_command.contains_self_data() || self.keep_alive.contains_self_data(now);
188
189 if !contains_data {
190 return;
191 }
192
193 let mut packet = [0; PACKET_SIZE];
194 let (packet_size, reliability) = self.output_data_producer.get_output_data(&mut packet);
195 let segments = packet[0..packet_size].chunks(SEGMENT_SIZE);
196
197
198 if segments.len() == 0 {
199 self.next_frame_id += 1;
200 let frame = self.create_out_frame(now, reliability, 0, 0, &[0; 0]);
201 out.push_back(frame);
202 }
203
204 let count_segments = segments.len();
205 for (segment_number, segment_data) in segments.enumerate() {
206 self.next_frame_id += 1;
207 let frame = self.create_out_frame(now, reliability, count_segments, segment_number, segment_data);
208 out.push_back(frame);
209 }
210 self.next_packed_id += 1;
211 }
212
213 fn create_out_frame(&mut self, now: Instant, reliability: bool, count_segments: usize, segment_number: usize, segment_data: &[u8]) -> Frame {
214 let segment = Segment::new(self.next_packed_id, count_segments as u8, segment_number as u8, segment_data);
215 let mut frame = Frame::new(self.connection_id, self.next_frame_id, reliability, segment);
216 self.disconnect_by_command.build_frame(&mut frame);
217 self.rtt.build_frame(&mut frame, now);
218 self.keep_alive.build_frame(&mut frame, now);
219 self.retransmitter.build_frame(&frame, now);
220 self.ack_sender.build_out_frame(&mut frame, now);
221 frame
222 }
223
224 #[must_use]
228 pub fn is_disconnected(&self, now: Instant) -> Option<DisconnectedReason> {
229 let reason = if self.retransmitter.is_disconnected(now) {
230 Some(DisconnectedReason::RetransmitOverflow)
231 } else if self.disconnect_by_timeout.is_disconnected(now) {
232 Some(DisconnectedReason::Timeout)
233 } else {
234 self.disconnect_by_command.disconnected().map(DisconnectedReason::Command)
235 };
236 if reason.is_some() {
237 tracing::info!("Protocol: is disconnected {:?}", reason);
238 }
239 reason
240 }
241
242 #[must_use]
246 pub fn is_connected(&self, now: Instant) -> bool {
247 self.in_frame_counter > 0 && self.is_disconnected(now).is_none()
248 }
249
250 pub fn get_next_retransmit_frame(&mut self, now: Instant) -> Option<Frame> {
251 let next_frame_id = self.next_frame_id + 1;
252 match self.retransmitter.get_retransmit_frame(now, next_frame_id) {
253 None => None,
254 Some(frame) => {
255 self.next_frame_id = next_frame_id;
256 Some(frame)
257 }
258 }
259 }
260}
261
262#[cfg(test)]
263pub mod tests {
264 use std::collections::VecDeque;
265 use std::time::{Duration, Instant};
266
267 use crate::coniguration::ProtocolConfiguration;
268 use crate::frame::Frame;
269 use crate::frame::{ConnectionId, FrameId};
270 use crate::{InputDataHandler, OutputDataProducer, Protocol};
271
272 #[derive(Default)]
273 struct StubDataRecvHandler {
274 on_recv_count: usize,
275 }
276
277 impl InputDataHandler for StubDataRecvHandler {
278 fn on_input_data(&mut self, _data: &[u8]) {
279 self.on_recv_count += 1;
280 }
281
282 fn reset(&mut self) {
283 }
284 }
285
286 #[derive(Default)]
287 struct StubDataSource {}
288
289 impl OutputDataProducer for StubDataSource {
290 fn contains_output_data(&self) -> bool {
291 true
292 }
293
294 fn get_output_data(&mut self, _packet: &mut [u8]) -> (usize, bool) {
295 (0, false)
296 }
297
298 fn reset(&mut self) {
299 }
300 }
301
302 #[test]
303 fn should_dont_apply_commands_from_frame_id_with_different_connection_id() {
304 let mut protocol = create_protocol(5);
305 protocol.on_frame_received(&create_frame(1, 1), Instant::now());
306 assert_eq!(protocol.input_data_handler.on_recv_count, 0);
307 }
308
309 #[test]
310 fn should_keep_alive_frame_create() {
311 let mut protocol = create_protocol(5);
312 let mut out: VecDeque<Frame> = Default::default();
313 protocol.collect_out_frames(Instant::now(), &mut out);
314 assert_eq!(out.len(), 1);
315 }
316
317
318 #[test]
319 fn should_reset() {
320 let mut protocol = create_protocol(1);
321 protocol.on_frame_received(&create_frame(5, 99), Instant::now());
322 let mut out: VecDeque<Frame> = Default::default();
323 protocol.collect_out_frames(Instant::now(), &mut out);
324 assert_eq!(out[0].connection_id, 5);
326 assert_eq!(out[0].frame_id, 2);
327 }
328
329 fn create_frame(connection_id: ConnectionId, frame_id: FrameId) -> Frame {
330 Frame::new(connection_id, frame_id, true, Default::default())
331 }
332
333 fn create_protocol(connection_id: ConnectionId) -> Protocol<StubDataRecvHandler, StubDataSource> {
334 Protocol::<StubDataRecvHandler, StubDataSource>::new(
335 Default::default(),
336 Default::default(),
337 connection_id,
338 Instant::now(),
339 Instant::now(),
340 ProtocolConfiguration {
341 disconnect_timeout: Duration::from_millis(100),
342 },
343 )
344 }
345}