1pub(crate) mod recv;
2pub(crate) mod send;
3
4pub use self::recv::*;
5pub use self::send::*;
6
7use std::collections::BTreeMap;
8use std::collections::HashMap;
9
10use crate::protocol::frame::FragmentMeta;
11use crate::protocol::frame::Frame;
12use crate::protocol::reliability::Reliability;
13use crate::protocol::RAKNET_HEADER_FRAME_OVERHEAD;
14use crate::rakrs_debug;
15use crate::server::current_epoch;
16
/// Errors that a [`NetQueue`] implementation can produce.
#[derive(Debug, Clone)]
pub enum NetQueueError<E> {
    /// An insertion failed for an unspecified reason.
    InvalidInsertion,
    /// An insertion failed and the reason is described in the payload.
    InvalidInsertionKnown(String),
    /// The item requested for removal could not be removed.
    ItemDeletionFail,
    /// The requested item is invalid (e.g. not present in the queue).
    InvalidItem,
    /// The queue has no items to operate on.
    EmptyQueue,
    /// An implementation-specific error (see [`NetQueue::Error`]).
    Other(E),
}
32
/// A keyed queue abstraction used by the RakNet layer to track items
/// (frames, packets) by an implementation-chosen key.
pub trait NetQueue<Item> {
    /// The key type used to address items in the queue
    /// (e.g. a sequence number).
    type KeyId;

    /// The implementation-specific error type, surfaced through
    /// [`NetQueueError::Other`].
    type Error;

    /// Inserts `item` into the queue and returns the key it was stored under.
    fn insert(&mut self, item: Item) -> Result<Self::KeyId, NetQueueError<Self::Error>>;

    /// Removes and returns the item stored under `key`.
    fn remove(&mut self, key: Self::KeyId) -> Result<Item, NetQueueError<Self::Error>>;

    /// Returns a reference to the item stored under `key` without removing it.
    fn get(&mut self, key: Self::KeyId) -> Result<&Item, NetQueueError<Self::Error>>;

    /// Removes and returns all items currently in the queue.
    fn flush(&mut self) -> Result<Vec<Item>, NetQueueError<Self::Error>>;
}
56
/// Holds sent items keyed by sequence number together with the epoch time
/// (from `current_epoch()`) at which they were stored, so they can be
/// recovered (re-sent) or aged out later.
#[derive(Debug, Clone)]
pub struct RecoveryQueue<Item> {
    /// seq -> (insertion time from `current_epoch()`, item)
    queue: HashMap<u32, (u64, Item)>,
}
66
67impl<Item> RecoveryQueue<Item>
68where
69 Item: Clone,
70{
71 pub fn new() -> Self {
72 Self {
73 queue: HashMap::new(),
74 }
75 }
76
77 pub fn insert_id(&mut self, seq: u32, item: Item) {
78 self.queue.insert(seq, (current_epoch(), item));
79 }
80
81 pub fn get_all(&mut self) -> Vec<(u32, Item)> {
82 self.queue
83 .iter()
84 .map(|(seq, (_, item))| (*seq, item.clone()))
85 .collect::<Vec<_>>()
86 }
87
88 pub fn flush_old(&mut self, threshold: u64) -> Vec<Item> {
89 let old = self
90 .queue
91 .iter()
92 .filter(|(_, (time, _))| (*time + threshold) < current_epoch())
93 .map(|(_, (_, item))| item.clone())
94 .collect::<Vec<_>>();
95 self.queue
96 .retain(|_, (time, _)| (*time + threshold) > current_epoch());
97 old
98 }
99}
100
101impl<Item> NetQueue<Item> for RecoveryQueue<Item> {
102 type KeyId = u32;
103 type Error = ();
104
105 fn insert(&mut self, item: Item) -> Result<Self::KeyId, NetQueueError<Self::Error>> {
106 let index = self.queue.len() as u32;
107 self.queue.insert(index, (current_epoch(), item));
108 Ok(index)
109 }
110
111 fn remove(&mut self, key: Self::KeyId) -> Result<Item, NetQueueError<Self::Error>> {
112 if let Some((_, item)) = self.queue.remove(&key) {
113 Ok(item)
114 } else {
115 Err(NetQueueError::ItemDeletionFail)
116 }
117 }
118
119 fn get(&mut self, key: Self::KeyId) -> Result<&Item, NetQueueError<Self::Error>> {
120 if let Some((_, item)) = self.queue.get(&key) {
121 Ok(item)
122 } else {
123 Err(NetQueueError::ItemDeletionFail)
124 }
125 }
126
127 fn flush(&mut self) -> Result<Vec<Item>, NetQueueError<Self::Error>> {
128 let mut items = Vec::new();
129 for (_, (_, item)) in self.queue.drain() {
130 items.push(item);
131 }
132 Ok(items)
133 }
134}
135
/// Buffers items by sequence index and releases them in order, tracking a
/// sliding window of indices.
#[derive(Debug, Clone)]
pub struct OrderedQueue<Item: Clone + std::fmt::Debug> {
    /// index -> item; the `BTreeMap` keeps entries sorted by index.
    pub queue: BTreeMap<u32, Item>,
    /// `(start, end)` of the window: `start` is the lowest index not yet
    /// flushed, `end` is one past the highest index inserted so far.
    pub window: (u32, u32),
}
168
169impl<Item> OrderedQueue<Item>
170where
171 Item: Clone + std::fmt::Debug,
172{
173 pub fn new() -> Self {
174 Self {
175 queue: BTreeMap::new(),
176 window: (0, 0),
177 }
178 }
179
180 pub fn next(&mut self) -> u32 {
181 self.window.0 = self.window.0.wrapping_add(1);
182 return self.window.0;
183 }
184
185 pub fn insert(&mut self, index: u32, item: Item) -> bool {
186 if index < self.window.0 {
187 return false;
188 }
189
190 if self.queue.contains_key(&index) {
191 return false;
192 }
193
194 if index >= self.window.1 {
195 self.window.1 = index + 1;
196 }
197
198 self.queue.insert(index, item);
199 true
200 }
201
202 pub fn insert_abs(&mut self, index: u32, item: Item) {
203 if index >= self.window.1 {
204 self.window.1 = index + 1;
205 }
206
207 self.queue.insert(index, item);
208 }
209
210 pub fn missing(&self) -> Vec<u32> {
211 let mut missing = Vec::new();
212 for i in self.window.0..self.window.1 {
213 if !self.queue.contains_key(&i) {
214 missing.push(i);
215 }
216 }
217 missing
218 }
219
220 pub fn flush(&mut self) -> Vec<Item> {
225 let mut items = Vec::new();
226 for i in self.window.0..self.window.1 {
227 if let Some(item) = self.queue.remove(&i) {
228 items.push(item);
229 }
230 }
231 self.window.0 = self.window.1;
232 items
233 }
234
235 pub fn flush_old_impl(&mut self) -> Vec<Item> {
240 let mut items = Vec::<(u32, Item)>::new();
241
242 let mut i = self.window.0;
243
244 while self.queue.contains_key(&i) {
245 rakrs_debug!("[!>] Removing: {}", &i);
246 if let Some(item) = self.queue.remove(&i) {
247 items.push((self.window.0, item));
248 i += 1;
249 } else {
250 break;
251 }
252 }
253
254 self.window.0 = i;
255
256 items.sort_by(|a, b| a.0.cmp(&b.0));
257 return items
258 .iter()
259 .map(|(_, item)| item.clone())
260 .collect::<Vec<Item>>();
261 }
262}
263
/// Tracks fragmented frames in both directions: reassembles incoming
/// fragments by fragment id, and splits outgoing buffers that exceed the MTU.
#[derive(Clone, Debug)]
pub struct FragmentQueue {
    /// The id most recently allocated for an outgoing split
    /// (advanced by `split_insert`).
    fragment_id: u16,

    /// fragment id -> (expected fragment count, fragments held so far)
    fragments: HashMap<u16, (u32, Vec<Frame>)>,
}
281
282impl FragmentQueue {
283 pub fn new() -> Self {
284 Self {
285 fragment_id: 0,
286 fragments: HashMap::new(),
287 }
288 }
289
290 pub fn insert(&mut self, fragment: Frame) -> Result<(u32, u32), FragmentQueueError> {
293 if let Some(meta) = fragment.fragment_meta.clone() {
294 if let Some((size, frames)) = self.fragments.get_mut(&meta.id) {
295 if meta.index >= *size {
299 return Err(FragmentQueueError::FrameIndexOutOfBounds);
300 }
301 if let Some(_) = frames
303 .iter()
304 .find(|&f| f.fragment_meta.as_ref().unwrap().index == meta.index)
305 {
306 return Err(FragmentQueueError::FrameExists);
308 } else {
309 frames.push(fragment);
310 return Ok((meta.size, meta.index));
311 }
312 } else {
313 let (size, mut frames) = (meta.size, Vec::<Frame>::new());
315 frames.push(fragment);
316
317 self.fragments.insert(meta.id, (size, frames));
318 return Ok((meta.size, meta.index));
319 }
320 }
321
322 return Err(FragmentQueueError::FrameNotFragmented);
323 }
324
325 pub fn collect(&mut self, id: u16) -> Result<Vec<u8>, FragmentQueueError> {
328 if let Some((size, frames)) = self.fragments.get_mut(&id) {
329 if *size == frames.len() as u32 {
330 frames.sort_by(|a, b| {
333 a.fragment_meta
334 .as_ref()
335 .unwrap()
336 .index
337 .cmp(&b.fragment_meta.as_ref().unwrap().index)
338 });
339
340 let mut buffer = Vec::<u8>::new();
341
342 for frame in frames.iter() {
343 buffer.extend_from_slice(&frame.body);
344 }
345
346 self.fragments.remove(&id);
347 return Ok(buffer);
348 }
349 return Err(FragmentQueueError::FragmentsMissing);
350 }
351
352 return Err(FragmentQueueError::FragmentInvalid);
353 }
354
355 pub fn split_insert(&mut self, buffer: &[u8], mtu: u16) -> Result<u16, FragmentQueueError> {
358 self.fragment_id += self.fragment_id.wrapping_add(1);
359
360 let id = self.fragment_id;
361
362 if self.fragments.contains_key(&id) {
363 self.fragments.remove(&id);
364 }
365
366 if let Ok(frames) = Self::split(buffer, id, mtu) {
367 self.fragments.insert(id, (frames.len() as u32, frames));
368 return Ok(id);
369 }
370
371 return Err(FragmentQueueError::DoesNotNeedSplit);
372 }
373
374 pub fn split(buffer: &[u8], id: u16, mtu: u16) -> Result<Vec<Frame>, FragmentQueueError> {
375 let max_mtu = mtu - RAKNET_HEADER_FRAME_OVERHEAD;
376
377 if buffer.len() > max_mtu.into() {
378 let splits = buffer
379 .chunks(max_mtu.into())
380 .map(|c| c.to_vec())
381 .collect::<Vec<Vec<u8>>>();
382 let mut frames: Vec<Frame> = Vec::new();
383 let mut index: u32 = 0;
384
385 for buf in splits.iter() {
386 let mut f = Frame::new(Reliability::ReliableOrd, Some(&buf[..]));
387 f.fragment_meta = Some(FragmentMeta {
388 index,
389 size: splits.len() as u32,
390 id,
391 });
392
393 index += 1;
394
395 frames.push(f);
396 }
397
398 return Ok(frames);
399 }
400
401 return Err(FragmentQueueError::DoesNotNeedSplit);
402 }
403
404 pub fn get(&self, id: &u16) -> Result<&(u32, Vec<Frame>), FragmentQueueError> {
405 if let Some(v) = self.fragments.get(id) {
406 return Ok(v);
407 }
408
409 return Err(FragmentQueueError::FragmentInvalid);
410 }
411
412 pub fn get_mut(&mut self, id: &u16) -> Result<&mut (u32, Vec<Frame>), FragmentQueueError> {
413 if let Some(v) = self.fragments.get_mut(id) {
414 return Ok(v);
415 }
416
417 return Err(FragmentQueueError::FragmentInvalid);
418 }
419
420 pub fn remove(&mut self, id: &u16) -> bool {
421 self.fragments.remove(id).is_some()
422 }
423
424 pub fn clear(&mut self) {
426 self.fragment_id = 0;
427 self.fragments.clear();
428 }
429}
430
/// Errors produced by [`FragmentQueue`] operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum FragmentQueueError {
    /// A fragment with the same index was already received for this id.
    FrameExists,
    /// The frame carries no fragment meta, so it cannot be queued.
    FrameNotFragmented,
    /// The buffer fits in a single frame and does not need splitting.
    DoesNotNeedSplit,
    /// The requested fragment id is unknown.
    FragmentInvalid,
    /// Not every fragment for this id has arrived yet.
    FragmentsMissing,
    /// The fragment's index exceeds the recorded fragment count.
    FrameIndexOutOfBounds,
}
440
441impl std::fmt::Display for FragmentQueueError {
442 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
443 write!(
444 f,
445 "{}",
446 match self {
447 FragmentQueueError::FrameExists => "Frame already exists",
448 FragmentQueueError::FrameNotFragmented => "Frame is not fragmented",
449 FragmentQueueError::DoesNotNeedSplit => "Frame does not need to be split",
450 FragmentQueueError::FragmentInvalid => "Fragment is invalid",
451 FragmentQueueError::FragmentsMissing => "Fragments are missing",
452 FragmentQueueError::FrameIndexOutOfBounds => "Frame index is out of bounds",
453 }
454 )
455 }
456}
457
// Marker impl: `Debug` + `Display` are already provided above, and the
// default `source()` (None) is correct for this leaf error type.
impl std::error::Error for FragmentQueueError {}