rak_rs/connection/queue/
recv.rs

1use std::collections::{HashMap, HashSet};
2
3use crate::connection::controller::window::ReliableWindow;
4use crate::protocol::ack::{Ack, Ackable, Record, SingleRecord};
5use crate::protocol::frame::{Frame, FramePacket};
6use crate::protocol::reliability::Reliability;
7use crate::protocol::MAX_FRAGS;
8use crate::server::current_epoch;
9use crate::{rakrs_debug, rakrs_debug_buffers};
10
11use super::{FragmentQueue, OrderedQueue};
12
/// Errors produced when a datagram cannot be accepted by the receive queue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecvQueueError {
    /// The datagram's sequence number was refused by the receive window.
    ///
    /// Returned by `RecvQueue::insert` when the window's `insert` reports
    /// `false` for the sequence.
    OldSeq,
}

impl std::fmt::Display for RecvQueueError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::OldSeq => f.write_str("sequence number rejected by the receive window"),
        }
    }
}

// Lets callers propagate this error with `?` into `Box<dyn Error>` and similar.
impl std::error::Error for RecvQueueError {}
17
18#[derive(Debug, Clone)]
19pub struct RecvQueue {
20    frag_queue: FragmentQueue,
21    pub(crate) window: ReliableWindow,
22    pub(crate) reliable_window: ReliableWindow,
23    order_channels: HashMap<u8, OrderedQueue<Vec<u8>>>,
24    /// Set of sequences that we've acknowledged.
25    /// (seq, time)
26    ack: HashSet<(u32, u64)>,
27    nack: HashSet<u32>,
28    ready: Vec<Vec<u8>>,
29}
30
31impl RecvQueue {
32    pub fn new() -> Self {
33        Self {
34            frag_queue: FragmentQueue::new(),
35            ack: HashSet::new(),
36            nack: HashSet::new(),
37            window: ReliableWindow::new(),
38            reliable_window: ReliableWindow::new(),
39            ready: Vec::new(),
40            order_channels: HashMap::new(),
41        }
42    }
43
44    pub fn insert(&mut self, packet: FramePacket) -> Result<(), RecvQueueError> {
45        if !self.window.insert(packet.sequence) {
46            return Err(RecvQueueError::OldSeq);
47        }
48
49        if self.window.range().0 < packet.sequence {
50            for i in self.window.range().0..packet.sequence {
51                self.nack.insert(i);
52            }
53        }
54
55        self.ack.insert((packet.sequence, current_epoch()));
56
57        for frame in packet.frames.iter() {
58            self.handle_frame(frame);
59        }
60
61        return Ok(());
62    }
63
64    pub fn flush(&mut self) -> Vec<Vec<u8>> {
65        self.ready.drain(..).collect::<Vec<Vec<u8>>>()
66    }
67
68    pub fn ack_flush(&mut self) -> Vec<u32> {
69        self.ack.drain().map(|(seq, _)| seq).collect()
70    }
71
72    pub fn nack_queue(&mut self) -> Vec<u32> {
73        self.nack.iter().map(|x| *x).collect::<Vec<u32>>()
74    }
75
76    fn handle_frame(&mut self, frame: &Frame) {
77        if let Some(reliable_index) = frame.reliable_index {
78            if !self.reliable_window.insert(reliable_index) {
79                return;
80            }
81        }
82
83        if let Some(meta) = frame.fragment_meta.as_ref() {
84            if meta.size > MAX_FRAGS {
85                rakrs_debug!(true, "Fragment size is too large, rejected {}!", meta.size);
86                return;
87            }
88            if let Err(_) = self.frag_queue.insert(frame.clone()) {}
89
90            let res = self.frag_queue.collect(meta.id);
91            if let Ok(data) = res {
92                // reconstructed frame packet!
93                self.ready.push(data);
94            } else {
95                rakrs_debug!(
96                    true,
97                    "Still Missing some fragments! {:?}",
98                    frame.fragment_meta.as_ref().unwrap()
99                );
100            }
101            return;
102        }
103
104        rakrs_debug_buffers!(
105            true,
106            "RecvQueue: {}\n{:?}\n",
107            frame.body.len(),
108            frame.body.clone()
109        );
110
111        match frame.reliability {
112            Reliability::Unreliable => {
113                self.ready.push(frame.body.clone());
114            }
115            Reliability::Reliable => {
116                self.ready.push(frame.body.clone());
117            }
118            Reliability::ReliableOrd => {
119                let channel = frame.order_channel.unwrap();
120                let queue = self
121                    .order_channels
122                    .entry(channel)
123                    .or_insert(OrderedQueue::new());
124
125                if queue.insert(frame.order_index.unwrap(), frame.body.clone()) {
126                    for pk in queue.flush() {
127                        self.ready.push(pk);
128                    }
129                }
130            }
131            _ => {
132                self.ready.push(frame.body.clone());
133            }
134        }
135    }
136}
137
138impl Ackable for RecvQueue {
139    type NackItem = ();
140
141    fn ack(&mut self, ack: Ack) {
142        if ack.is_nack() {
143            rakrs_debug!(true, "Invalid ack: {:?}", ack.clone());
144            return;
145        }
146
147        rakrs_debug!(true, "Got ack item: {:?}", ack.clone());
148
149        // these packets are acknowledged, so we can remove them from the queue.
150        for record in ack.records.iter() {
151            match record {
152                Record::Single(SingleRecord { sequence }) => {
153                    self.nack.remove(&sequence);
154                }
155                Record::Range(ranged) => {
156                    for i in ranged.start.0..ranged.end.0 {
157                        self.nack.remove(&i);
158                    }
159                }
160            }
161        }
162    }
163}