// rak_rs/connection/queue/mod.rs

1pub(crate) mod recv;
2pub(crate) mod send;
3
4pub use self::recv::*;
5pub use self::send::*;
6
7use std::collections::BTreeMap;
8use std::collections::HashMap;
9
10use crate::protocol::frame::FragmentMeta;
11use crate::protocol::frame::Frame;
12use crate::protocol::reliability::Reliability;
13use crate::protocol::RAKNET_HEADER_FRAME_OVERHEAD;
14use crate::rakrs_debug;
15use crate::server::current_epoch;
16
/// Error cases shared by every [`NetQueue`] implementation.
///
/// The generic `E` lets an implementation surface its own error type
/// through the [`NetQueueError::Other`] variant.
#[derive(Debug, Clone)]
pub enum NetQueueError<E> {
    /// The insertion failed for any given reason.
    InvalidInsertion,
    /// The insertion failed and the reason is known.
    InvalidInsertionKnown(String),
    /// The `Item` failed to be removed from the queue.
    ItemDeletionFail,
    /// The `Item` is invalid and can not be retrieved.
    InvalidItem,
    /// The queue is empty.
    EmptyQueue,
    /// The error is a custom error, specific to the queue implementation.
    Other(E),
}
32
/// A generic, keyed queue abstraction used by the connection layer.
///
/// Implementors assign each inserted `Item` a `KeyId`, which callers then
/// use to retrieve or remove that item later.
pub trait NetQueue<Item> {
    /// The "key" that each `Item` is stored under
    /// (used for removal)
    type KeyId;

    /// A custom error specifier for NetQueueError
    type Error;

    /// Inserts `Item` into the queue, given the conditions are fulfilled.
    /// Returns the key the item was stored under.
    fn insert(&mut self, item: Item) -> Result<Self::KeyId, NetQueueError<Self::Error>>;

    /// Remove an `Item` from the queue by providing an instance of `Self::KeyId`
    fn remove(&mut self, key: Self::KeyId) -> Result<Item, NetQueueError<Self::Error>>;

    /// Retrieves an `Item` from the queue, by reference.
    fn get(&mut self, key: Self::KeyId) -> Result<&Item, NetQueueError<Self::Error>>;

    /// Clears the entire queue, returning every item that was stored.
    fn flush(&mut self) -> Result<Vec<Item>, NetQueueError<Self::Error>>;
}
56
/// A recovery queue is used to store packets that need to be resent.
/// This is used for sequenced and ordered packets.
#[derive(Debug, Clone)]
pub struct RecoveryQueue<Item> {
    /// The queued packets, keyed by sequence id.
    /// Maps `seq -> (timestamp, item)`, where `timestamp` is the epoch
    /// time (seconds) at which the item was inserted.
    // TODO use the timestamp for round trip time (RTT)
    queue: HashMap<u32, (u64, Item)>,
}
66
67impl<Item> RecoveryQueue<Item>
68where
69    Item: Clone,
70{
71    pub fn new() -> Self {
72        Self {
73            queue: HashMap::new(),
74        }
75    }
76
77    pub fn insert_id(&mut self, seq: u32, item: Item) {
78        self.queue.insert(seq, (current_epoch(), item));
79    }
80
81    pub fn get_all(&mut self) -> Vec<(u32, Item)> {
82        self.queue
83            .iter()
84            .map(|(seq, (_, item))| (*seq, item.clone()))
85            .collect::<Vec<_>>()
86    }
87
88    pub fn flush_old(&mut self, threshold: u64) -> Vec<Item> {
89        let old = self
90            .queue
91            .iter()
92            .filter(|(_, (time, _))| (*time + threshold) < current_epoch())
93            .map(|(_, (_, item))| item.clone())
94            .collect::<Vec<_>>();
95        self.queue
96            .retain(|_, (time, _)| (*time + threshold) > current_epoch());
97        old
98    }
99}
100
101impl<Item> NetQueue<Item> for RecoveryQueue<Item> {
102    type KeyId = u32;
103    type Error = ();
104
105    fn insert(&mut self, item: Item) -> Result<Self::KeyId, NetQueueError<Self::Error>> {
106        let index = self.queue.len() as u32;
107        self.queue.insert(index, (current_epoch(), item));
108        Ok(index)
109    }
110
111    fn remove(&mut self, key: Self::KeyId) -> Result<Item, NetQueueError<Self::Error>> {
112        if let Some((_, item)) = self.queue.remove(&key) {
113            Ok(item)
114        } else {
115            Err(NetQueueError::ItemDeletionFail)
116        }
117    }
118
119    fn get(&mut self, key: Self::KeyId) -> Result<&Item, NetQueueError<Self::Error>> {
120        if let Some((_, item)) = self.queue.get(&key) {
121            Ok(item)
122        } else {
123            Err(NetQueueError::ItemDeletionFail)
124        }
125    }
126
127    fn flush(&mut self) -> Result<Vec<Item>, NetQueueError<Self::Error>> {
128        let mut items = Vec::new();
129        for (_, (_, item)) in self.queue.drain() {
130            items.push(item);
131        }
132        Ok(items)
133    }
134}
135
/// An ordered queue is used to Index incoming packets over a channel
/// within a reliable window time.
///
/// Usage:
/// ```ignore
/// use rak_rs::connection::queue::OrderedQueue;
/// let mut ord_qu: OrderedQueue<Vec<u8>> = OrderedQueue::new();
/// // Insert a packet with the id of "1"
/// ord_qu.insert(1, vec![0, 1]);
/// ord_qu.insert(5, vec![1, 0]);
/// ord_qu.insert(3, vec![2, 0]);
///
/// // Get the packets we still need.
/// let needed: Vec<u32> = ord_qu.missing();
/// assert_eq!(needed, vec![0, 2, 4]);
///
/// // We would in theory, request these packets, but we're going to insert them
/// ord_qu.insert(4, vec![2, 0, 0, 1]);
/// ord_qu.insert(2, vec![1, 0, 0, 2]);
///
/// // Now let's return our packets in order.
/// // Will return a vector of these packets in order by their "id".
/// let ordered: Vec<Vec<u8>> = ord_qu.flush();
/// ```
#[derive(Debug, Clone)]
pub struct OrderedQueue<Item: Clone + std::fmt::Debug> {
    /// Packets pending re-ordering, keyed by their order index.
    pub queue: BTreeMap<u32, Item>,
    /// The window for this queue:
    /// `(lowest pending index, one past the highest inserted index)`.
    pub window: (u32, u32),
}
168
impl<Item> OrderedQueue<Item>
where
    Item: Clone + std::fmt::Debug,
{
    /// Creates an empty queue with the window at `(0, 0)`.
    pub fn new() -> Self {
        Self {
            queue: BTreeMap::new(),
            window: (0, 0),
        }
    }

    /// Advances the low edge of the window and returns the new value.
    // NOTE(review): this pre-increments, so the first value ever returned
    // is 1, never 0 — confirm callers expect indices to start at 1.
    pub fn next(&mut self) -> u32 {
        self.window.0 = self.window.0.wrapping_add(1);
        return self.window.0;
    }

    /// Inserts `item` under `index`, growing the window's high edge.
    /// Returns `false` (and stores nothing) when `index` is below the
    /// window or already occupied.
    pub fn insert(&mut self, index: u32, item: Item) -> bool {
        if index < self.window.0 {
            return false;
        }

        if self.queue.contains_key(&index) {
            return false;
        }

        // Track one past the highest index seen so far.
        if index >= self.window.1 {
            self.window.1 = index + 1;
        }

        self.queue.insert(index, item);
        true
    }

    /// Unconditional insert: ignores the low edge of the window and
    /// replaces any existing entry at `index`.
    pub fn insert_abs(&mut self, index: u32, item: Item) {
        if index >= self.window.1 {
            self.window.1 = index + 1;
        }

        self.queue.insert(index, item);
    }

    /// Returns every index inside the current window that has not been
    /// inserted yet.
    pub fn missing(&self) -> Vec<u32> {
        let mut missing = Vec::new();
        for i in self.window.0..self.window.1 {
            if !self.queue.contains_key(&i) {
                missing.push(i);
            }
        }
        missing
    }

    /// Forcefully flushes the incoming queue resetting the highest window
    /// to the lowest window.
    ///
    /// Drains every stored item inside the window in ascending index
    /// order (gaps are skipped), then collapses the window.
    ///
    /// THIS IS A PATCH FIX UNTIL I CAN FIGURE OUT WHY THE OTHER FLUSH IS BROKEN
    pub fn flush(&mut self) -> Vec<Item> {
        let mut items = Vec::new();
        for i in self.window.0..self.window.1 {
            if let Some(item) = self.queue.remove(&i) {
                items.push(item);
            }
        }
        self.window.0 = self.window.1;
        items
    }

    /// Older, broken implementation, idk what is causing this to break
    /// after index 3
    /// The logic here is supposed to be, remove all indexes until the highest most up to date index.
    /// and retain older indexes until the order is correct.
    pub fn flush_old_impl(&mut self) -> Vec<Item> {
        let mut items = Vec::<(u32, Item)>::new();

        let mut i = self.window.0;

        // Pop a contiguous run starting at the low edge of the window.
        while self.queue.contains_key(&i) {
            rakrs_debug!("[!>] Removing: {}", &i);
            if let Some(item) = self.queue.remove(&i) {
                // NOTE(review): this pushes the constant `self.window.0` as
                // the sort key for every item (not `i`), so the sort below
                // is a no-op; items come out in pop order regardless.
                // Suspect `i` was intended — confirm before changing.
                items.push((self.window.0, item));
                i += 1;
            } else {
                break;
            }
        }

        self.window.0 = i;

        items.sort_by(|a, b| a.0.cmp(&b.0));
        return items
            .iter()
            .map(|(_, item)| item.clone())
            .collect::<Vec<Item>>();
    }
}
263
/// A specialized structure for re-ordering fragments over the wire.
/// You can use this structure to fragment frames as well.
///
/// **NOTE:** This structure will NOT update a frame's reliable index!
/// The sender is required to this!
#[derive(Clone, Debug)]
pub struct FragmentQueue {
    /// The current fragment id to use
    /// If for some reason this wraps back to 0,
    /// and the fragment queue is full, 0 is then cleared and reused.
    fragment_id: u16,

    /// The fragments currently being assembled, keyed by fragment id.
    /// The value is `(size, frames)`, where `size` is the total number
    /// of fragments expected and `frames` holds those received so far.
    fragments: HashMap<u16, (u32, Vec<Frame>)>,
}
281
282impl FragmentQueue {
283    pub fn new() -> Self {
284        Self {
285            fragment_id: 0,
286            fragments: HashMap::new(),
287        }
288    }
289
290    /// Inserts the frame into the fragment queue.
291    /// Returns a result tuple of (`fragment_size`, `fragment_index`)
292    pub fn insert(&mut self, fragment: Frame) -> Result<(u32, u32), FragmentQueueError> {
293        if let Some(meta) = fragment.fragment_meta.clone() {
294            if let Some((size, frames)) = self.fragments.get_mut(&meta.id) {
295                // check if the frame index is out of bounds
296                // todo: Check if == or >, I think it's > but I'm not sure.
297                // todo: This is because the index starts at 0 and the size starts at 1.
298                if meta.index >= *size {
299                    return Err(FragmentQueueError::FrameIndexOutOfBounds);
300                }
301                // the frame exists, and we have parts, check if we have this particular frame already.
302                if let Some(_) = frames
303                    .iter()
304                    .find(|&f| f.fragment_meta.as_ref().unwrap().index == meta.index)
305                {
306                    // We already have this frame! Do not replace it!!
307                    return Err(FragmentQueueError::FrameExists);
308                } else {
309                    frames.push(fragment);
310                    return Ok((meta.size, meta.index));
311                }
312            } else {
313                // We don't already have this fragment index!
314                let (size, mut frames) = (meta.size, Vec::<Frame>::new());
315                frames.push(fragment);
316
317                self.fragments.insert(meta.id, (size, frames));
318                return Ok((meta.size, meta.index));
319            }
320        }
321
322        return Err(FragmentQueueError::FrameNotFragmented);
323    }
324
325    /// Attempts to collect all fragments from a given fragment id.
326    /// Will fail if not all fragments are specified.
327    pub fn collect(&mut self, id: u16) -> Result<Vec<u8>, FragmentQueueError> {
328        if let Some((size, frames)) = self.fragments.get_mut(&id) {
329            if *size == frames.len() as u32 {
330                // sort all frames by id,
331                // because we now have all frames.
332                frames.sort_by(|a, b| {
333                    a.fragment_meta
334                        .as_ref()
335                        .unwrap()
336                        .index
337                        .cmp(&b.fragment_meta.as_ref().unwrap().index)
338                });
339
340                let mut buffer = Vec::<u8>::new();
341
342                for frame in frames.iter() {
343                    buffer.extend_from_slice(&frame.body);
344                }
345
346                self.fragments.remove(&id);
347                return Ok(buffer);
348            }
349            return Err(FragmentQueueError::FragmentsMissing);
350        }
351
352        return Err(FragmentQueueError::FragmentInvalid);
353    }
354
355    /// This will split a given frame into a bunch of smaller frames within the specified
356    /// restriction.
357    pub fn split_insert(&mut self, buffer: &[u8], mtu: u16) -> Result<u16, FragmentQueueError> {
358        self.fragment_id += self.fragment_id.wrapping_add(1);
359
360        let id = self.fragment_id;
361
362        if self.fragments.contains_key(&id) {
363            self.fragments.remove(&id);
364        }
365
366        if let Ok(frames) = Self::split(buffer, id, mtu) {
367            self.fragments.insert(id, (frames.len() as u32, frames));
368            return Ok(id);
369        }
370
371        return Err(FragmentQueueError::DoesNotNeedSplit);
372    }
373
374    pub fn split(buffer: &[u8], id: u16, mtu: u16) -> Result<Vec<Frame>, FragmentQueueError> {
375        let max_mtu = mtu - RAKNET_HEADER_FRAME_OVERHEAD;
376
377        if buffer.len() > max_mtu.into() {
378            let splits = buffer
379                .chunks(max_mtu.into())
380                .map(|c| c.to_vec())
381                .collect::<Vec<Vec<u8>>>();
382            let mut frames: Vec<Frame> = Vec::new();
383            let mut index: u32 = 0;
384
385            for buf in splits.iter() {
386                let mut f = Frame::new(Reliability::ReliableOrd, Some(&buf[..]));
387                f.fragment_meta = Some(FragmentMeta {
388                    index,
389                    size: splits.len() as u32,
390                    id,
391                });
392
393                index += 1;
394
395                frames.push(f);
396            }
397
398            return Ok(frames);
399        }
400
401        return Err(FragmentQueueError::DoesNotNeedSplit);
402    }
403
404    pub fn get(&self, id: &u16) -> Result<&(u32, Vec<Frame>), FragmentQueueError> {
405        if let Some(v) = self.fragments.get(id) {
406            return Ok(v);
407        }
408
409        return Err(FragmentQueueError::FragmentInvalid);
410    }
411
412    pub fn get_mut(&mut self, id: &u16) -> Result<&mut (u32, Vec<Frame>), FragmentQueueError> {
413        if let Some(v) = self.fragments.get_mut(id) {
414            return Ok(v);
415        }
416
417        return Err(FragmentQueueError::FragmentInvalid);
418    }
419
420    pub fn remove(&mut self, id: &u16) -> bool {
421        self.fragments.remove(id).is_some()
422    }
423
424    /// This will hard clear the fragment queue, this should only be used if memory becomes an issue!
425    pub fn clear(&mut self) {
426        self.fragment_id = 0;
427        self.fragments.clear();
428    }
429}
430
/// Errors produced by [`FragmentQueue`] operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum FragmentQueueError {
    /// A fragment with the same index was already inserted for this id.
    FrameExists,
    /// The frame carries no fragment meta and can not be queued.
    FrameNotFragmented,
    /// The buffer fits in a single frame; splitting is unnecessary.
    DoesNotNeedSplit,
    /// No fragment entry exists for the requested id.
    FragmentInvalid,
    /// Not every fragment for the id has arrived yet.
    FragmentsMissing,
    /// The fragment's index is not below the recorded fragment count.
    FrameIndexOutOfBounds,
}
440
441impl std::fmt::Display for FragmentQueueError {
442    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
443        write!(
444            f,
445            "{}",
446            match self {
447                FragmentQueueError::FrameExists => "Frame already exists",
448                FragmentQueueError::FrameNotFragmented => "Frame is not fragmented",
449                FragmentQueueError::DoesNotNeedSplit => "Frame does not need to be split",
450                FragmentQueueError::FragmentInvalid => "Fragment is invalid",
451                FragmentQueueError::FragmentsMissing => "Fragments are missing",
452                FragmentQueueError::FrameIndexOutOfBounds => "Frame index is out of bounds",
453            }
454        )
455    }
456}
457
// Marker impl: lets `FragmentQueueError` be used as a boxed
// `dyn std::error::Error`; `Display` and `Debug` supply the messages.
impl std::error::Error for FragmentQueueError {}