rak_rs/connection/queue/
recv.rs1use std::collections::{HashMap, HashSet};
2
3use crate::connection::controller::window::ReliableWindow;
4use crate::protocol::ack::{Ack, Ackable, Record, SingleRecord};
5use crate::protocol::frame::{Frame, FramePacket};
6use crate::protocol::reliability::Reliability;
7use crate::protocol::MAX_FRAGS;
8use crate::server::current_epoch;
9use crate::{rakrs_debug, rakrs_debug_buffers};
10
11use super::{FragmentQueue, OrderedQueue};
12
/// Errors reported by [`RecvQueue`] when an incoming packet cannot be accepted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecvQueueError {
    /// The sequence window refused the packet's sequence number.
    ///
    /// `RecvQueue::insert` returns this when `ReliableWindow::insert` yields
    /// `false` for the packet sequence (presumably an old or duplicate
    /// sequence, as the name suggests; confirm against `ReliableWindow`).
    OldSeq,
}

impl std::fmt::Display for RecvQueueError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            RecvQueueError::OldSeq => {
                f.write_str("packet sequence was rejected by the receive window")
            }
        }
    }
}

// Implementing `Error` lets callers propagate this with `?` into
// `Box<dyn Error>` and other generic error containers.
impl std::error::Error for RecvQueueError {}
17
18#[derive(Debug, Clone)]
19pub struct RecvQueue {
20 frag_queue: FragmentQueue,
21 pub(crate) window: ReliableWindow,
22 pub(crate) reliable_window: ReliableWindow,
23 order_channels: HashMap<u8, OrderedQueue<Vec<u8>>>,
24 ack: HashSet<(u32, u64)>,
27 nack: HashSet<u32>,
28 ready: Vec<Vec<u8>>,
29}
30
31impl RecvQueue {
32 pub fn new() -> Self {
33 Self {
34 frag_queue: FragmentQueue::new(),
35 ack: HashSet::new(),
36 nack: HashSet::new(),
37 window: ReliableWindow::new(),
38 reliable_window: ReliableWindow::new(),
39 ready: Vec::new(),
40 order_channels: HashMap::new(),
41 }
42 }
43
44 pub fn insert(&mut self, packet: FramePacket) -> Result<(), RecvQueueError> {
45 if !self.window.insert(packet.sequence) {
46 return Err(RecvQueueError::OldSeq);
47 }
48
49 if self.window.range().0 < packet.sequence {
50 for i in self.window.range().0..packet.sequence {
51 self.nack.insert(i);
52 }
53 }
54
55 self.ack.insert((packet.sequence, current_epoch()));
56
57 for frame in packet.frames.iter() {
58 self.handle_frame(frame);
59 }
60
61 return Ok(());
62 }
63
64 pub fn flush(&mut self) -> Vec<Vec<u8>> {
65 self.ready.drain(..).collect::<Vec<Vec<u8>>>()
66 }
67
68 pub fn ack_flush(&mut self) -> Vec<u32> {
69 self.ack.drain().map(|(seq, _)| seq).collect()
70 }
71
72 pub fn nack_queue(&mut self) -> Vec<u32> {
73 self.nack.iter().map(|x| *x).collect::<Vec<u32>>()
74 }
75
76 fn handle_frame(&mut self, frame: &Frame) {
77 if let Some(reliable_index) = frame.reliable_index {
78 if !self.reliable_window.insert(reliable_index) {
79 return;
80 }
81 }
82
83 if let Some(meta) = frame.fragment_meta.as_ref() {
84 if meta.size > MAX_FRAGS {
85 rakrs_debug!(true, "Fragment size is too large, rejected {}!", meta.size);
86 return;
87 }
88 if let Err(_) = self.frag_queue.insert(frame.clone()) {}
89
90 let res = self.frag_queue.collect(meta.id);
91 if let Ok(data) = res {
92 self.ready.push(data);
94 } else {
95 rakrs_debug!(
96 true,
97 "Still Missing some fragments! {:?}",
98 frame.fragment_meta.as_ref().unwrap()
99 );
100 }
101 return;
102 }
103
104 rakrs_debug_buffers!(
105 true,
106 "RecvQueue: {}\n{:?}\n",
107 frame.body.len(),
108 frame.body.clone()
109 );
110
111 match frame.reliability {
112 Reliability::Unreliable => {
113 self.ready.push(frame.body.clone());
114 }
115 Reliability::Reliable => {
116 self.ready.push(frame.body.clone());
117 }
118 Reliability::ReliableOrd => {
119 let channel = frame.order_channel.unwrap();
120 let queue = self
121 .order_channels
122 .entry(channel)
123 .or_insert(OrderedQueue::new());
124
125 if queue.insert(frame.order_index.unwrap(), frame.body.clone()) {
126 for pk in queue.flush() {
127 self.ready.push(pk);
128 }
129 }
130 }
131 _ => {
132 self.ready.push(frame.body.clone());
133 }
134 }
135 }
136}
137
138impl Ackable for RecvQueue {
139 type NackItem = ();
140
141 fn ack(&mut self, ack: Ack) {
142 if ack.is_nack() {
143 rakrs_debug!(true, "Invalid ack: {:?}", ack.clone());
144 return;
145 }
146
147 rakrs_debug!(true, "Got ack item: {:?}", ack.clone());
148
149 for record in ack.records.iter() {
151 match record {
152 Record::Single(SingleRecord { sequence }) => {
153 self.nack.remove(&sequence);
154 }
155 Record::Range(ranged) => {
156 for i in ranged.start.0..ranged.end.0 {
157 self.nack.remove(&i);
158 }
159 }
160 }
161 }
162 }
163}