// cheetah_game_realtime_protocol/reliable/retransmit/mod.rs
use std::cmp::max;
2use std::collections::{HashSet, VecDeque};
3use std::ops::Sub;
4use std::time::{Duration, Instant};
5use std::usize;
6
7use fnv::FnvBuildHasher;
8
9use crate::frame::headers::{Header, HeaderVec};
10use crate::frame::Frame;
11use crate::frame::FrameId;
12use crate::reliable::ack::header::AckHeader;
13use crate::reliable::retransmit::header::RetransmitHeader;
14
15pub mod header;
16
/// Default time to wait for an acknowledgement of a reliable frame before it
/// becomes eligible for retransmission.
///
/// `Retransmitter::new` uses this value both as the initial ack wait duration
/// and as the divisor that turns the disconnect timeout into a retransmit limit.
pub const RETRANSMIT_DEFAULT_ACK_TIMEOUT: Duration = Duration::from_millis(300);
21
22#[derive(Debug)]
23pub struct Retransmitter {
24 frames: VecDeque<ScheduledFrame>,
28
29 wait_ack_frames: HashSet<FrameId, FnvBuildHasher>,
33
34 current_max_retransmit_count: usize,
38 ack_wait_duration: Duration,
42
43 retransmit_limit: usize,
44}
45
46#[derive(Debug)]
47pub struct ScheduledFrame {
48 pub time: Instant,
49 pub original_frame_id: FrameId,
50 pub frame: Frame,
51 pub retransmit_count: usize,
52}
53
54impl Retransmitter {
55 pub(crate) fn new(disconnect_timeout: Duration) -> Self {
56 let retransmit_limit = (disconnect_timeout.as_secs_f64() / RETRANSMIT_DEFAULT_ACK_TIMEOUT.as_secs_f64()) as usize;
57
58 Self {
59 frames: Default::default(),
60 wait_ack_frames: Default::default(),
61 current_max_retransmit_count: Default::default(),
62 ack_wait_duration: RETRANSMIT_DEFAULT_ACK_TIMEOUT,
63 retransmit_limit,
64 }
65 }
66 pub fn get_retransmit_frame(&mut self, now: Instant, retransmit_frame_id: FrameId) -> Option<Frame> {
71 loop {
72 match self.frames.front() {
73 None => {
74 return None;
75 }
76 Some(scheduled_frame) => {
77 if !self.wait_ack_frames.contains(&scheduled_frame.original_frame_id) {
78 self.frames.pop_front();
79 } else if now.sub(scheduled_frame.time) >= self.ack_wait_duration {
80 let mut scheduled_frame = self.frames.pop_front().unwrap();
81
82 let retransmit_count = scheduled_frame.retransmit_count.checked_add(1).unwrap_or(usize::MAX);
83 if retransmit_count == usize::MAX {
84 tracing::error!("Retransmit count overflow");
85 }
86
87 self.current_max_retransmit_count = max(self.current_max_retransmit_count, retransmit_count);
88 scheduled_frame.retransmit_count = retransmit_count;
89 scheduled_frame.time = now;
90
91 let original_frame_id = scheduled_frame.original_frame_id;
92 let mut retransmit_frame = scheduled_frame.frame.clone();
93 retransmit_frame.frame_id = retransmit_frame_id;
94 let retransmit_header = Header::Retransmit(RetransmitHeader::new(original_frame_id));
95 retransmit_frame.headers.add(retransmit_header);
96 self.frames.push_back(scheduled_frame);
97 return Some(retransmit_frame);
98 } else {
99 return None;
100 }
101 }
102 }
103 }
104 }
105
106 pub(crate) fn on_frame_received(&mut self, frame: &Frame) {
110 let ack_headers: HeaderVec<&AckHeader> = frame.headers.find(|p| match p {
111 Header::Ack(header) => Some(header),
112 _ => None
113 });
114 ack_headers.iter().for_each(|ack_header| {
115 ack_header.get_frames().for_each(|frame_id| {
116 self.wait_ack_frames.remove(frame_id);
117 });
118 });
119 }
120 pub fn build_frame(&mut self, frame: &Frame, now: Instant) {
124 if frame.reliability {
125 let original_frame_id = frame.frame_id;
126 self.frames.push_back(ScheduledFrame {
127 time: now,
128 original_frame_id,
129 frame: frame.clone(),
130 retransmit_count: 0,
131 });
132 self.wait_ack_frames.insert(original_frame_id);
133 }
134 }
135
136 pub fn is_disconnected(&self, _: Instant) -> bool {
137 self.current_max_retransmit_count >= self.retransmit_limit
138 }
139}
140
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use crate::frame::headers::Header;
    use crate::frame::Frame;
    use crate::frame::FrameId;
    use crate::reliable::ack::header::AckHeader;
    use crate::reliable::retransmit::Retransmitter;

    /// With nothing registered there is nothing to retransmit.
    #[test]
    fn should_empty_when_get_retransmit_frame() {
        let mut handler = Retransmitter::new(Duration::default());
        assert!(handler.get_retransmit_frame(Instant::now(), 1).is_none());
    }

    /// A frame is not retransmitted before the ack wait duration has elapsed.
    #[test]
    fn should_empty_when_no_timeout() {
        let mut handler = Retransmitter::new(Duration::default());
        let now = Instant::now();
        handler.build_frame(&create_reliability_frame(1), now);
        assert!(handler.get_retransmit_frame(now, 2).is_none());
    }

    /// The retransmitted copy carries a retransmit header with the original frame id.
    #[test]
    fn should_add_retransmit_header() {
        let mut handler = Retransmitter::new(Duration::default());
        let now = Instant::now();
        let original_frame = create_reliability_frame(1);
        handler.build_frame(&original_frame, now);

        let get_time = now + handler.ack_wait_duration;
        let retransmit_frame = handler.get_retransmit_frame(get_time, 2).unwrap();
        assert!(retransmit_frame.frame_id == 2);

        let retransmit_header = retransmit_frame
            .headers
            .first(|p| match p {
                Header::Retransmit(header) => Some(header),
                _ => None,
            })
            .unwrap();
        assert!(retransmit_header.original_frame_id == original_frame.frame_id);
    }

    /// Once the ack wait duration has elapsed the frame is returned with the new id.
    #[test]
    fn should_return_retransmit_frame_when_timeout() {
        let mut handler = Retransmitter::new(Duration::default());
        let now = Instant::now();
        let frame = create_reliability_frame(1);
        handler.build_frame(&frame, now);

        let get_time = now + handler.ack_wait_duration;
        let retransmit_frame = handler.get_retransmit_frame(get_time, 2).unwrap();
        assert!(retransmit_frame.frame_id == 2);
    }

    /// Unreliable frames are never scheduled for retransmission.
    #[test]
    fn should_return_none_for_unreliable_frame() {
        let mut handler = Retransmitter::new(Duration::default());
        let now = Instant::now();
        let frame = create_unreliable_frame(1);
        handler.build_frame(&frame, now);

        let get_time = now + handler.ack_wait_duration;
        assert!(handler.get_retransmit_frame(get_time, 2).is_none());
    }

    /// An acknowledged frame is not retransmitted even after the wait has elapsed.
    #[test]
    fn should_return_none_then_ack() {
        let mut handler = Retransmitter::new(Duration::default());
        let now = Instant::now();
        let frame = create_reliability_frame(1);
        handler.build_frame(&frame, now);
        handler.on_frame_received(&create_ack_frame(100, frame.frame_id));

        let get_time = now + handler.ack_wait_duration;
        assert!(handler.get_retransmit_frame(get_time, 2).is_none());
    }

    /// A retransmitted frame is rescheduled: nothing is due right after a
    /// retransmission, and the frame is due again one wait interval later.
    #[test]
    fn should_retransmit_after_retransmit() {
        let mut handler = Retransmitter::new(Duration::default());
        let now = Instant::now();
        let frame = create_reliability_frame(1);
        handler.build_frame(&frame, now);

        let get_time = now + handler.ack_wait_duration;
        let first = handler.get_retransmit_frame(get_time, 2).unwrap();
        assert!(first.frame_id == 2);
        assert!(handler.get_retransmit_frame(get_time, 3).is_none());

        let get_time = get_time + handler.ack_wait_duration;
        let second = handler.get_retransmit_frame(get_time, 4).unwrap();
        assert!(second.frame_id == 4);
    }

    /// The handler reports a disconnect exactly when the retransmit limit is reached.
    #[test]
    fn should_disconnet_by_timeout() {
        let mut handler = Retransmitter::new(Duration::from_secs(1));
        let now = Instant::now();
        let frame = create_reliability_frame(1);
        handler.build_frame(&frame, now);

        // Retransmit one time fewer than the limit: still connected.
        let mut get_time = now;
        for _ in 0..handler.retransmit_limit - 1 {
            get_time = get_time + handler.ack_wait_duration;
            handler.get_retransmit_frame(get_time, 2);
        }

        assert!(!handler.is_disconnected(get_time));

        // One more retransmission reaches the limit.
        get_time = get_time + handler.ack_wait_duration;
        handler.get_retransmit_frame(get_time, 3);

        assert!(handler.is_disconnected(get_time));
    }

    /// Builds a frame with the reliability flag set.
    fn create_reliability_frame(frame_id: FrameId) -> Frame {
        Frame::new(0, frame_id, true, Default::default())
    }

    /// Builds a frame with the reliability flag cleared.
    fn create_unreliable_frame(frame_id: FrameId) -> Frame {
        Frame::new(0, frame_id, false, Default::default())
    }

    /// Builds an unreliable frame carrying a single ack header for `acked_frame_id`.
    fn create_ack_frame(frame_id: FrameId, acked_frame_id: FrameId) -> Frame {
        let mut frame = Frame::new(0, frame_id, false, Default::default());
        let mut ack_header = AckHeader::default();
        ack_header.add_frame_id(acked_frame_id);
        frame.headers.add(Header::Ack(ack_header));
        frame
    }
}