cheetah_game_realtime_protocol/
lib.rs

1extern crate core;
2
3use std::collections::VecDeque;
4use std::fmt::Debug;
5use std::time::Instant;
6
7use crate::coniguration::ProtocolConfiguration;
8use crate::disconnect::command::DisconnectByCommand;
9use crate::disconnect::timeout::DisconnectByTimeout;
10use crate::frame::disconnected_reason::DisconnectedReason;
11use crate::frame::packets_collector::{PacketsCollector, PACKET_SIZE};
12use crate::frame::segment::{Segment, SEGMENT_SIZE};
13use crate::frame::{ConnectionId, Frame, FrameId};
14use crate::others::keep_alive::KeepAlive;
15use crate::others::rtt::RoundTripTime;
16use crate::reliable::ack::AckSender;
17use crate::reliable::replay_protection::FrameReplayProtection;
18use crate::reliable::retransmit::Retransmitter;
19
20pub mod codec;
21pub mod collections;
22pub mod coniguration;
23pub mod disconnect;
24pub mod frame;
25pub mod others;
26pub mod reliable;
27
28pub type RoomMemberId = u64;
29pub type RoomId = u64;
30
31///
32/// Примерное количество фреймов в секунду на одного peer
33/// - необходимо для расчетов размеров структур
34/// - в точности нет необходимости, но не должно отличаться на порядки
35///
36pub const MAX_FRAME_PER_SECONDS: usize = 120;
37
38///
39/// Если от peer не будет фреймов за данное время - считаем что соединение разорвано
40///
41pub const DISCONNECT_TIMEOUT_IN_SECONDS: usize = 60;
42pub const NOT_EXIST_FRAME_ID: FrameId = 0;
43
44pub trait InputDataHandler {
45    fn on_input_data(&mut self, data: &[u8]);
46    fn reset(&mut self);
47}
48
49pub trait OutputDataProducer {
50    fn contains_output_data(&self) -> bool;
51    fn get_output_data(&mut self, out: &mut [u8]) -> (usize, bool);
52    fn reset(&mut self);
53}
54
55///
56/// Реализация игрового протокола, поверх ненадежного канала доставки данных (например, через UDP)
57///
58/// - логическая часть, без сети и сериализации
59/// - надежная доставка
60/// - защита от повторов
61///
62#[derive(Debug)]
63pub struct Protocol<IN, OUT>
64    where
65        IN: InputDataHandler,
66        OUT: OutputDataProducer,
67{
68    pub connection_id: ConnectionId,
69    pub next_frame_id: u64,
70    pub next_packed_id: u64,
71    pub replay_protection: FrameReplayProtection,
72    pub ack_sender: AckSender,
73    pub retransmitter: Retransmitter,
74    pub disconnect_by_timeout: DisconnectByTimeout,
75    pub disconnect_by_command: DisconnectByCommand,
76    pub input_data_handler: IN,
77    pub output_data_producer: OUT,
78    pub rtt: RoundTripTime,
79    pub keep_alive: KeepAlive,
80    pub in_frame_counter: u64,
81    packets_collector: PacketsCollector,
82    pub configuration: ProtocolConfiguration,
83}
84
85impl<IN, OUT> Protocol<IN, OUT>
86    where
87        IN: InputDataHandler,
88        OUT: OutputDataProducer,
89{
90    #[must_use]
91    pub fn new(input_data_handler: IN, output_data_producer: OUT, connection_id: ConnectionId, now: Instant, start_application_time: Instant, configuration: ProtocolConfiguration) -> Self {
92        Self {
93            next_frame_id: 1,
94            next_packed_id: 0,
95            disconnect_by_timeout: DisconnectByTimeout::new(now, configuration.disconnect_timeout),
96            retransmitter: Retransmitter::new(configuration.disconnect_timeout),
97            rtt: RoundTripTime::new(start_application_time),
98            connection_id,
99            input_data_handler,
100            output_data_producer,
101            replay_protection: Default::default(),
102            ack_sender: Default::default(),
103            disconnect_by_command: Default::default(),
104            keep_alive: Default::default(),
105            in_frame_counter: Default::default(),
106            packets_collector: Default::default(),
107            configuration,
108        }
109    }
110
111    ///
112    /// Обработка входящего фрейма
113    ///
114    pub fn on_frame_received(&mut self, frame: &Frame, now: Instant) {
115        // у другой стороны уже новый идентификатор соединения
116        if frame.connection_id > self.connection_id {
117            self.reset_protocol(frame, now);
118        }
119
120        // игнорируем все входящие фреймы не с текущем идентификатором соединения
121        if frame.connection_id != self.connection_id {
122            return;
123        }
124
125        self.in_frame_counter += 1;
126        self.disconnect_by_timeout.on_frame_received(now);
127        self.ack_sender.on_frame_received(frame, now);
128        self.retransmitter.on_frame_received(frame);
129        match self.replay_protection.set_and_check(frame) {
130            Ok(replayed) => {
131                if !replayed {
132                    self.disconnect_by_command.on_frame_received(frame);
133                    self.rtt.on_frame_received(frame, now);
134                    self.processing_data(frame);
135                }
136            }
137            Err(..) => {
138                tracing::error!("Replay Protection overflow")
139            }
140        }
141    }
142
143    fn processing_data(&mut self, frame: &Frame) {
144        match self.packets_collector.on_segment(&frame.segment) {
145            Ok(packet) => match packet {
146                None => {}
147                Some(packet) => {
148                    self.input_data_handler.on_input_data(packet);
149                }
150            },
151            Err(_) => {
152                tracing::error!("PacketsCollector error")
153            }
154        }
155    }
156
157    fn reset_protocol(&mut self, frame: &Frame, now: Instant) {
158        self.connection_id = frame.connection_id;
159        self.next_frame_id = 1;
160        self.next_packed_id = 1;
161        self.disconnect_by_timeout = DisconnectByTimeout::new(now, self.disconnect_by_timeout.timeout);
162        self.replay_protection = Default::default();
163        self.ack_sender = Default::default();
164        self.retransmitter = Retransmitter::new(self.configuration.disconnect_timeout);
165        self.disconnect_by_command = Default::default();
166        self.keep_alive = Default::default();
167        self.in_frame_counter = Default::default();
168        self.packets_collector = Default::default();
169        self.input_data_handler.reset();
170        self.output_data_producer.reset();
171    }
172
173    ///
174    /// Создание фрейма для отправки
175    ///
176    #[allow(clippy::cast_precision_loss)]
177    pub fn collect_out_frames(&mut self, now: Instant, out: &mut VecDeque<Frame>) {
178        match self.get_next_retransmit_frame(now) {
179            None => {}
180            Some(frame) => {
181                out.push_back(frame);
182                return;
183            }
184        }
185
186        let contains_data =
187            self.ack_sender.contains_self_data(now) || self.output_data_producer.contains_output_data() || self.disconnect_by_command.contains_self_data() || self.keep_alive.contains_self_data(now);
188
189        if !contains_data {
190            return;
191        }
192
193        let mut packet = [0; PACKET_SIZE];
194        let (packet_size, reliability) = self.output_data_producer.get_output_data(&mut packet);
195        let segments = packet[0..packet_size].chunks(SEGMENT_SIZE);
196
197
198        if segments.len() == 0 {
199            self.next_frame_id += 1;
200            let frame = self.create_out_frame(now, reliability, 0, 0, &[0; 0]);
201            out.push_back(frame);
202        }
203
204        let count_segments = segments.len();
205        for (segment_number, segment_data) in segments.enumerate() {
206            self.next_frame_id += 1;
207            let frame = self.create_out_frame(now, reliability, count_segments, segment_number, segment_data);
208            out.push_back(frame);
209        }
210        self.next_packed_id += 1;
211    }
212
213    fn create_out_frame(&mut self, now: Instant, reliability: bool, count_segments: usize, segment_number: usize, segment_data: &[u8]) -> Frame {
214        let segment = Segment::new(self.next_packed_id, count_segments as u8, segment_number as u8, segment_data);
215        let mut frame = Frame::new(self.connection_id, self.next_frame_id, reliability, segment);
216        self.disconnect_by_command.build_frame(&mut frame);
217        self.rtt.build_frame(&mut frame, now);
218        self.keep_alive.build_frame(&mut frame, now);
219        self.retransmitter.build_frame(&frame, now);
220        self.ack_sender.build_out_frame(&mut frame, now);
221        frame
222    }
223
224    ///
225    /// Разорвана ли связь?
226    ///
227    #[must_use]
228    pub fn is_disconnected(&self, now: Instant) -> Option<DisconnectedReason> {
229        let reason = if self.retransmitter.is_disconnected(now) {
230            Some(DisconnectedReason::RetransmitOverflow)
231        } else if self.disconnect_by_timeout.is_disconnected(now) {
232            Some(DisconnectedReason::Timeout)
233        } else {
234            self.disconnect_by_command.disconnected().map(DisconnectedReason::Command)
235        };
236        if reason.is_some() {
237            tracing::info!("Protocol: is disconnected {:?}", reason);
238        }
239        reason
240    }
241
242    ///
243    /// Установлено ли соединения?
244    ///
245    #[must_use]
246    pub fn is_connected(&self, now: Instant) -> bool {
247        self.in_frame_counter > 0 && self.is_disconnected(now).is_none()
248    }
249
250    pub fn get_next_retransmit_frame(&mut self, now: Instant) -> Option<Frame> {
251        let next_frame_id = self.next_frame_id + 1;
252        match self.retransmitter.get_retransmit_frame(now, next_frame_id) {
253            None => None,
254            Some(frame) => {
255                self.next_frame_id = next_frame_id;
256                Some(frame)
257            }
258        }
259    }
260}
261
262#[cfg(test)]
263pub mod tests {
264    use std::collections::VecDeque;
265    use std::time::{Duration, Instant};
266
267    use crate::coniguration::ProtocolConfiguration;
268    use crate::frame::Frame;
269    use crate::frame::{ConnectionId, FrameId};
270    use crate::{InputDataHandler, OutputDataProducer, Protocol};
271
272    #[derive(Default)]
273    struct StubDataRecvHandler {
274        on_recv_count: usize,
275    }
276
277    impl InputDataHandler for StubDataRecvHandler {
278        fn on_input_data(&mut self, _data: &[u8]) {
279            self.on_recv_count += 1;
280        }
281
282        fn reset(&mut self) {
283        }
284    }
285
286    #[derive(Default)]
287    struct StubDataSource {}
288
289    impl OutputDataProducer for StubDataSource {
290        fn contains_output_data(&self) -> bool {
291            true
292        }
293
294        fn get_output_data(&mut self, _packet: &mut [u8]) -> (usize, bool) {
295            (0, false)
296        }
297
298        fn reset(&mut self) {
299        }
300    }
301
302    #[test]
303    fn should_dont_apply_commands_from_frame_id_with_different_connection_id() {
304        let mut protocol = create_protocol(5);
305        protocol.on_frame_received(&create_frame(1, 1), Instant::now());
306        assert_eq!(protocol.input_data_handler.on_recv_count, 0);
307    }
308
309    #[test]
310    fn should_keep_alive_frame_create() {
311        let mut protocol = create_protocol(5);
312        let mut out: VecDeque<Frame> = Default::default();
313        protocol.collect_out_frames(Instant::now(), &mut out);
314        assert_eq!(out.len(), 1);
315    }
316
317
318    #[test]
319    fn should_reset() {
320        let mut protocol = create_protocol(1);
321        protocol.on_frame_received(&create_frame(5, 99), Instant::now());
322        let mut out: VecDeque<Frame> = Default::default();
323        protocol.collect_out_frames(Instant::now(), &mut out);
324        // на переключение connection_id протокол должен сгенироровать новый keep alive пакет
325        assert_eq!(out[0].connection_id, 5);
326        assert_eq!(out[0].frame_id, 2);
327    }
328
329    fn create_frame(connection_id: ConnectionId, frame_id: FrameId) -> Frame {
330        Frame::new(connection_id, frame_id, true, Default::default())
331    }
332
333    fn create_protocol(connection_id: ConnectionId) -> Protocol<StubDataRecvHandler, StubDataSource> {
334        Protocol::<StubDataRecvHandler, StubDataSource>::new(
335            Default::default(),
336            Default::default(),
337            connection_id,
338            Instant::now(),
339            Instant::now(),
340            ProtocolConfiguration {
341                disconnect_timeout: Duration::from_millis(100),
342            },
343        )
344    }
345}