1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5#[cfg(feature = "async_std")]
6use async_std::net::UdpSocket;
7
8use binary_util::interfaces::Writer;
9#[cfg(feature = "async_tokio")]
10use tokio::net::UdpSocket;
11
12use crate::protocol::ack::{Ack, Ackable, Record, SingleRecord};
13use crate::protocol::frame::{Frame, FramePacket};
14use crate::protocol::packet::RakPacket;
15use crate::protocol::reliability::Reliability;
16use crate::protocol::RAKNET_HEADER_FRAME_OVERHEAD;
17use crate::util::{to_address_token, SafeGenerator};
18use crate::{rakrs_debug, rakrs_debug_buffers};
19
20use super::{FragmentQueue, FragmentQueueError, NetQueue, RecoveryQueue};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
23pub enum SendQueueError {
24 PacketTooLarge,
26 ParseError,
28 FragmentError(FragmentQueueError),
30 SendError,
32}
33
34impl std::fmt::Display for SendQueueError {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(
37 f,
38 "{}",
39 match self {
40 SendQueueError::PacketTooLarge => "Packet too large".to_string(),
41 SendQueueError::ParseError => "Parse error".to_string(),
42 SendQueueError::FragmentError(e) => format!("Fragment error: {}", e),
43 SendQueueError::SendError => "Send error".to_string(),
44 }
45 )
46 }
47}
48
49impl std::error::Error for SendQueueError {}
50
51#[derive(Debug, Clone)]
55pub struct SendQueue {
56 mtu_size: u16,
57
58 _timeout: u16,
61
62 _max_tries: u16,
65
66 send_seq: SafeGenerator<u32>,
70
71 reliable_seq: SafeGenerator<u32>,
74
75 ack: RecoveryQueue<FramePacket>,
77
78 fragment_queue: FragmentQueue,
80
81 order_channels: HashMap<u8, (u32, u32)>,
84
85 ready: Vec<Frame>,
86
87 socket: Arc<UdpSocket>,
88
89 address: SocketAddr,
90}
91
92impl SendQueue {
93 pub fn new(
94 mtu_size: u16,
95 _timeout: u16,
96 _max_tries: u16,
97 socket: Arc<UdpSocket>,
98 address: SocketAddr,
99 ) -> Self {
100 Self {
101 mtu_size,
102 _timeout,
103 _max_tries,
104 send_seq: SafeGenerator::new(),
105 reliable_seq: SafeGenerator::new(),
106 ack: RecoveryQueue::new(),
107 fragment_queue: FragmentQueue::new(),
108 order_channels: HashMap::new(),
109 ready: Vec::new(),
110 socket,
111 address,
112 }
113 }
114
115 pub async fn insert(
119 &mut self,
120 packet: &[u8],
121 reliability: Reliability,
122 immediate: bool,
123 channel: Option<u8>,
124 ) -> Result<(), SendQueueError> {
125 rakrs_debug!(
126 true,
127 "Inserting packet into send queue: {} bytes",
128 packet.len()
129 );
130 rakrs_debug!("Write is now processing packet");
131 let reliable = if packet.len() > (self.mtu_size + RAKNET_HEADER_FRAME_OVERHEAD) as usize {
132 Reliability::ReliableOrd
133 } else {
134 reliability
135 };
136
137 rakrs_debug!("Write is now processing packet: {:?}", reliable);
138
139 match reliability {
140 Reliability::Unreliable => {
141 let frame = Frame::new(Reliability::Unreliable, Some(packet));
143 self.send_frame(frame).await;
144 return Ok(());
145 }
146 Reliability::Reliable => {
147 let frame = Frame::new(Reliability::Reliable, Some(packet));
149 self.send_frame(frame).await;
150 return Ok(());
151 }
152 _ => {}
153 };
154
155 if packet.len() > (self.mtu_size + RAKNET_HEADER_FRAME_OVERHEAD) as usize {
158 let mut pk = FramePacket::new();
161 pk.sequence = self.send_seq.next();
162 pk.reliability = reliability;
163
164 rakrs_debug!("Write is now splitting, too large: {:?}", reliability);
165
166 let fragmented = self.fragment_queue.split_insert(&packet, self.mtu_size);
167
168 if fragmented.is_ok() {
169 let frag_id = fragmented.unwrap();
170 let (_, frames) = self.fragment_queue.get_mut(&frag_id).unwrap();
171 let (ord_seq, ord_index) = self
172 .order_channels
173 .entry(channel.unwrap_or(0))
174 .or_insert((0, 0));
175
176 for frame in frames.iter_mut() {
177 frame.reliability = reliability;
178 frame.sequence_index = Some(*ord_seq);
179 frame.order_channel = Some(channel.unwrap_or(0));
180 frame.order_index = Some(*ord_index);
181
182 if frame.reliability.is_reliable() {
183 frame.reliable_index = Some(self.reliable_seq.next());
184 }
185 }
186
187 *ord_index = ord_index.wrapping_add(1);
188 *ord_seq = ord_seq.wrapping_add(1);
189
190 if let Ok(p) = pk.write_to_bytes() {
192 rakrs_debug!("Write is sending stream: {:?}", reliability);
193
194 self.send_stream(p.as_slice()).await;
195 self.ack.insert_id(pk.sequence, pk);
196 return Ok(());
197 } else {
198 return Err(SendQueueError::SendError);
199 }
200 } else {
201 return Err(SendQueueError::FragmentError(fragmented.unwrap_err()));
203 }
204 } else {
205 let mut frame = Frame::new(reliable, Some(packet));
208
209 if frame.reliability.is_reliable() {
210 frame.reliable_index = Some(self.reliable_seq.next());
211 }
212
213 if frame.reliability.is_ordered() {
214 let (_, ord_index) = self
215 .order_channels
216 .entry(channel.unwrap_or(0))
217 .or_insert((0, 0));
218 frame.order_index = Some(*ord_index);
219 frame.sequence_index = Some(self.send_seq.get());
220 *ord_index = ord_index.wrapping_add(1);
221 } else if frame.reliability.is_sequenced() {
222 let (seq_index, ord_index) = self
223 .order_channels
224 .entry(channel.unwrap_or(0))
225 .or_insert((0, 0));
226 *seq_index = seq_index.wrapping_add(1);
227 frame.order_index = Some(*ord_index);
228 frame.sequence_index = Some(*seq_index);
229 }
230
231 if immediate {
232 self.send_frame(frame).await;
233 } else {
234 self.ready.push(frame);
235 }
236
237 return Ok(());
238 }
239 }
240
241 async fn send_frame(&mut self, mut frame: Frame) {
244 let mut pk = FramePacket::new();
245 pk.sequence = self.send_seq.next();
246 pk.reliability = frame.reliability;
247
248 if pk.reliability.is_reliable() {
249 frame.reliable_index = Some(self.reliable_seq.next());
250 }
251
252 pk.frames.push(frame);
253
254 if pk.reliability.is_reliable() {
255 self.ack.insert_id(self.reliable_seq.get(), pk.clone());
257 }
258
259 if let Ok(buf) = pk.write_to_bytes() {
260 rakrs_debug!("[!] Write sent the packet.. {:?}", buf.as_slice());
261 self.send_stream(buf.as_slice()).await;
262 } else {
263 rakrs_debug_buffers!(true, "SendQ: Failed to send frame: {:?}", pk);
264 }
265 }
266
267 pub(crate) async fn send_stream(&mut self, packet: &[u8]) {
268 rakrs_debug_buffers!(false, "SendQ: {}\n{:?}\n", packet.len(), packet);
269
270 if let Err(e) = self.socket.send_to(packet, &self.address).await {
271 rakrs_debug!(
273 true,
274 "[{}] Failed to send packet! {:?}",
275 to_address_token(self.address),
276 e
277 );
278 }
279 }
280
281 pub async fn send_packet(
282 &mut self,
283 packet: RakPacket,
284 reliability: Reliability,
285 immediate: bool,
286 ) -> Result<(), SendQueueError> {
287 if let Ok(buf) = packet.write_to_bytes() {
289 if let Err(e) = self
290 .insert(buf.as_slice(), reliability, immediate, None)
291 .await
292 {
293 rakrs_debug!(
294 true,
295 "[{}] Failed to insert packet into send queue: {:?}",
296 to_address_token(self.address),
297 e
298 );
299 return Err(e);
300 }
301 return Ok(());
302 } else {
303 return Err(SendQueueError::ParseError);
304 }
305 }
306
307 pub async fn update(&mut self) {
308 for frame in self.ready.drain(..).collect::<Vec<Frame>>() {
312 self.send_frame(frame).await;
313 }
314
315 let resend_queue = self.ack.flush().unwrap();
319 for packet in resend_queue.iter() {
328 if let Ok(buf) = packet.write_to_bytes() {
329 self.send_stream(buf.as_slice()).await;
330 }
331 }
332 }
333}
334
335impl Ackable for SendQueue {
336 type NackItem = FramePacket;
337
338 fn ack(&mut self, ack: Ack) {
339 if ack.is_nack() {
340 return;
341 }
342
343 for record in ack.records.iter() {
345 match record {
346 Record::Single(SingleRecord { sequence }) => {
347 if let Ok(_) = self.ack.remove(sequence.0) {};
348 }
349 Record::Range(ranged) => {
350 for i in ranged.start.0..ranged.end.0 {
351 if let Ok(_) = self.ack.remove(i) {};
352 }
353 }
354 }
355 }
356 }
357
358 fn nack(&mut self, nack: Ack) -> Vec<FramePacket> {
359 if !nack.is_nack() {
360 return Vec::new();
361 }
362
363 let mut resend_queue = Vec::<FramePacket>::new();
364
365 for record in nack.records.iter() {
367 match record {
368 Record::Single(single) => {
369 if let Ok(packet) = self.ack.get(single.sequence.0) {
370 resend_queue.push(packet.clone());
371 }
372 }
373 Record::Range(ranged) => {
374 for i in ranged.start.0..ranged.end.0 {
375 if let Ok(packet) = self.ack.get(i) {
376 resend_queue.push(packet.clone());
377 }
378 }
379 }
380 }
381 }
382
383 return resend_queue;
384 }
385}