cheetah_game_realtime_protocol/reliable/retransmit/
mod.rs

1use std::cmp::max;
2use std::collections::{HashSet, VecDeque};
3use std::ops::Sub;
4use std::time::{Duration, Instant};
5use std::usize;
6
7use fnv::FnvBuildHasher;
8
9use crate::frame::headers::{Header, HeaderVec};
10use crate::frame::Frame;
11use crate::frame::FrameId;
12use crate::reliable::ack::header::AckHeader;
13use crate::reliable::retransmit::header::RetransmitHeader;
14
15pub mod header;
16
17///
18/// Время ожидания ACK
19///
20pub const RETRANSMIT_DEFAULT_ACK_TIMEOUT: Duration = Duration::from_millis(300);
21
22#[derive(Debug)]
23pub struct Retransmitter {
24	///
25	/// Фреймы, отсортированные по времени отсылки
26	///
27	frames: VecDeque<ScheduledFrame>,
28
29	///
30	/// Фреймы, для которых мы ожидаем ACK
31	///
32	wait_ack_frames: HashSet<FrameId, FnvBuildHasher>,
33
34	///
35	/// Текущее максимальное количество повтора пакета
36	///
37	current_max_retransmit_count: usize,
38	///
39	/// Время ожидания подтверждения на фрейм
40	///
41	ack_wait_duration: Duration,
42
43	retransmit_limit: usize,
44}
45
46#[derive(Debug)]
47pub struct ScheduledFrame {
48	pub time: Instant,
49	pub original_frame_id: FrameId,
50	pub frame: Frame,
51	pub retransmit_count: usize,
52}
53
54impl Retransmitter {
55	pub(crate) fn new(disconnect_timeout: Duration) -> Self {
56		let retransmit_limit = (disconnect_timeout.as_secs_f64() / RETRANSMIT_DEFAULT_ACK_TIMEOUT.as_secs_f64()) as usize;
57
58		Self {
59			frames: Default::default(),
60			wait_ack_frames: Default::default(),
61			current_max_retransmit_count: Default::default(),
62			ack_wait_duration: RETRANSMIT_DEFAULT_ACK_TIMEOUT,
63			retransmit_limit,
64		}
65	}
66	///
67	/// Получить фрейм для повторной отправки (если такой есть)
68	/// - метод необходимо вызывать пока результат Some
69	///
70	pub fn get_retransmit_frame(&mut self, now: Instant, retransmit_frame_id: FrameId) -> Option<Frame> {
71		loop {
72			match self.frames.front() {
73				None => {
74					return None;
75				}
76				Some(scheduled_frame) => {
77					if !self.wait_ack_frames.contains(&scheduled_frame.original_frame_id) {
78						self.frames.pop_front();
79					} else if now.sub(scheduled_frame.time) >= self.ack_wait_duration {
80						let mut scheduled_frame = self.frames.pop_front().unwrap();
81
82						let retransmit_count = scheduled_frame.retransmit_count.checked_add(1).unwrap_or(usize::MAX);
83						if retransmit_count == usize::MAX {
84							tracing::error!("Retransmit count overflow");
85						}
86
87						self.current_max_retransmit_count = max(self.current_max_retransmit_count, retransmit_count);
88						scheduled_frame.retransmit_count = retransmit_count;
89						scheduled_frame.time = now;
90
91						let original_frame_id = scheduled_frame.original_frame_id;
92						let mut retransmit_frame = scheduled_frame.frame.clone();
93						retransmit_frame.frame_id = retransmit_frame_id;
94						let retransmit_header = Header::Retransmit(RetransmitHeader::new(original_frame_id));
95						retransmit_frame.headers.add(retransmit_header);
96						self.frames.push_back(scheduled_frame);
97						return Some(retransmit_frame);
98					} else {
99						return None;
100					}
101				}
102			}
103		}
104	}
105
106	///
107	/// Обрабатываем подтверждения фреймов
108	///
109	pub(crate) fn on_frame_received(&mut self, frame: &Frame) {
110		let ack_headers: HeaderVec<&AckHeader> = frame.headers.find(|p| match p {
111			Header::Ack(header) => Some(header),
112			_ => None
113		});
114		ack_headers.iter().for_each(|ack_header| {
115			ack_header.get_frames().for_each(|frame_id| {
116				self.wait_ack_frames.remove(frame_id);
117			});
118		});
119	}
120	///
121	/// Фрейм отослан - запоминаем для повтора
122	///
123	pub fn build_frame(&mut self, frame: &Frame, now: Instant) {
124		if frame.reliability {
125			let original_frame_id = frame.frame_id;
126			self.frames.push_back(ScheduledFrame {
127				time: now,
128				original_frame_id,
129				frame: frame.clone(),
130				retransmit_count: 0,
131			});
132			self.wait_ack_frames.insert(original_frame_id);
133		}
134	}
135
136	pub fn is_disconnected(&self, _: Instant) -> bool {
137		self.current_max_retransmit_count >= self.retransmit_limit
138	}
139}
140
141#[cfg(test)]
142mod tests {
143	use std::ops::Add;
144	use std::time::{Duration, Instant};
145
146	use crate::frame::headers::Header;
147	use crate::frame::Frame;
148	use crate::frame::FrameId;
149	use crate::reliable::ack::header::AckHeader;
150	use crate::reliable::retransmit::Retransmitter;
151
152	#[test]
153	///
154	/// Если не было отосланных фреймов - то нет фреймов и для повтора
155	///
156	fn should_empty_when_get_retransmit_frame() {
157		let mut handler = Retransmitter::new(Duration::default());
158		assert!(matches!(handler.get_retransmit_frame(Instant::now(), 1), None));
159	}
160
161	///
162	/// Для фрейма не получено подтверждение, но таймаут ожидания еще не прошел
163	///
164	#[test]
165	fn should_empty_when_no_timeout() {
166		let mut handler = Retransmitter::new(Duration::default());
167		let now = Instant::now();
168		handler.build_frame(&create_reliability_frame(1), now);
169		assert!(matches!(handler.get_retransmit_frame(now, 2), None));
170	}
171
172	///
173	/// Для повторно отправляемого фрейма должен быть добавлен заголовок с id исходного фрейма
174	///
175	#[test]
176	fn should_add_retransmit_header() {
177		let mut handler = Retransmitter::new(Duration::default());
178		let now = Instant::now();
179		let original_frame = create_reliability_frame(1);
180		handler.build_frame(&original_frame, now);
181		let get_time = now.add(handler.ack_wait_duration);
182		assert!(matches!(
183			handler.get_retransmit_frame(get_time,2),
184			Some(frame)
185			if frame.frame_id == 2
186			&&
187			frame.headers.first(|p| match p {
188            Header::Retransmit(header) => Some(header),
189            _ => None
190        }).unwrap().original_frame_id==original_frame.frame_id
191		));
192	}
193
194	///
195	/// Для фрейма не получено подтверждение, таймаут ожидания прошел
196	///
197	#[test]
198	fn should_return_retransmit_frame_when_timeout() {
199		let mut handler = Retransmitter::new(Duration::default());
200		let now = Instant::now();
201		let frame = create_reliability_frame(1);
202		handler.build_frame(&frame, now);
203
204		let get_time = now.add(handler.ack_wait_duration);
205		assert!(matches!(
206				handler.get_retransmit_frame(get_time,2),
207				Some(retransmit_frame) if retransmit_frame.frame_id ==2 ));
208	}
209
210	///
211	/// Для фрейма без надежной доставки не должно быть повторных фреймов
212	///
213	#[test]
214	fn should_return_none_for_unreliable_frame() {
215		let mut handler = Retransmitter::new(Duration::default());
216		let now = Instant::now();
217		let frame = create_unreliable_frame(1);
218		handler.build_frame(&frame, now);
219
220		let get_time = now.add(handler.ack_wait_duration);
221		assert!(matches!(handler.get_retransmit_frame(get_time, 2), None));
222	}
223
224	///
225	/// Если для фрейма получен ACK - то его не должно быть в повторных
226	///
227	#[test]
228	fn should_return_none_then_ack() {
229		let mut handler = Retransmitter::new(Duration::default());
230		let now = Instant::now();
231		let frame = create_reliability_frame(1);
232		handler.build_frame(&frame, now);
233		handler.on_frame_received(&create_ack_frame(100, frame.frame_id));
234		let get_time = now.add(handler.ack_wait_duration);
235		assert!(matches!(handler.get_retransmit_frame(get_time, 2), None));
236	}
237
238	///
239	/// Если не было ACK после повторной отправки - то фрейм должен быть повторно отослан через
240	/// Timeout
241	///
242	#[test]
243	fn should_retransmit_after_retransmit() {
244		let mut handler = Retransmitter::new(Duration::default());
245		let now = Instant::now();
246		let frame = create_reliability_frame(1);
247		handler.build_frame(&frame, now);
248
249		let get_time = now.add(handler.ack_wait_duration);
250		assert!(matches!(
251				handler.get_retransmit_frame(get_time,2),
252				Some(retransmit_frame) if retransmit_frame.frame_id == 2));
253		assert!(matches!(handler.get_retransmit_frame(get_time, 3), None));
254		let get_time = get_time.add(handler.ack_wait_duration);
255		assert!(matches!(
256				handler.get_retransmit_frame(get_time,4),
257				Some(retransmit_frame) if retransmit_frame.frame_id == 4 ));
258	}
259
260	///
261	/// Канал должен быть закрыт, после N не успешных попыток отправок
262	///
263	#[test]
264	fn should_disconnet_by_timeout() {
265		let mut handler = Retransmitter::new(Duration::from_secs(1));
266		let now = Instant::now();
267		let frame = create_reliability_frame(1);
268		handler.build_frame(&frame, now);
269
270		let mut get_time = now;
271		for _ in 0..handler.retransmit_limit - 1 {
272			get_time = get_time.add(handler.ack_wait_duration);
273			handler.get_retransmit_frame(get_time, 2);
274		}
275
276		assert!(!handler.is_disconnected(get_time));
277
278		get_time = get_time.add(handler.ack_wait_duration);
279		handler.get_retransmit_frame(get_time, 3);
280
281		assert!(handler.is_disconnected(get_time));
282	}
283
284	fn create_reliability_frame(frame_id: FrameId) -> Frame {
285		Frame::new(0, frame_id, true, Default::default())
286	}
287
288	fn create_unreliable_frame(frame_id: FrameId) -> Frame {
289		Frame::new(0, frame_id, false, Default::default())
290	}
291
292	fn create_ack_frame(frame_id: FrameId, acked_frame_id: FrameId) -> Frame {
293		let mut frame = Frame::new(0, frame_id, false, Default::default());
294		let mut ack_header = AckHeader::default();
295		ack_header.add_frame_id(acked_frame_id);
296		frame.headers.add(Header::Ack(ack_header));
297		frame
298	}
299}