rak_rs/connection/queue/
send.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5#[cfg(feature = "async_std")]
6use async_std::net::UdpSocket;
7
8use binary_util::interfaces::Writer;
9#[cfg(feature = "async_tokio")]
10use tokio::net::UdpSocket;
11
12use crate::protocol::ack::{Ack, Ackable, Record, SingleRecord};
13use crate::protocol::frame::{Frame, FramePacket};
14use crate::protocol::packet::RakPacket;
15use crate::protocol::reliability::Reliability;
16use crate::protocol::RAKNET_HEADER_FRAME_OVERHEAD;
17use crate::util::{to_address_token, SafeGenerator};
18use crate::{rakrs_debug, rakrs_debug_buffers};
19
20use super::{FragmentQueue, FragmentQueueError, NetQueue, RecoveryQueue};
21
/// Errors reported by the send queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SendQueueError {
    /// The packet is too large to be sent.
    PacketTooLarge,
    /// The packet could not be written to bytes before it was handed to the
    /// queue.
    ParseError,
    /// The fragment queue failed while splitting the packet; the wrapped value
    /// is the error it reported.
    FragmentError(FragmentQueueError),
    /// The packet could not be sent.
    SendError,
}
33
34impl std::fmt::Display for SendQueueError {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(
37            f,
38            "{}",
39            match self {
40                SendQueueError::PacketTooLarge => "Packet too large".to_string(),
41                SendQueueError::ParseError => "Parse error".to_string(),
42                SendQueueError::FragmentError(e) => format!("Fragment error: {}", e),
43                SendQueueError::SendError => "Send error".to_string(),
44            }
45        )
46    }
47}
48
/// Relies on the default `std::error::Error` methods; the message comes from
/// the `Display` implementation.
impl std::error::Error for SendQueueError {}
50
/// Outgoing packet queue for a single remote address.
///
/// Frames are either written to the socket right away or parked in `ready`
/// until the next call to `update`. Reliable frame packets are also kept in a
/// recovery queue so they can be handed back when the peer NACKs them.
///
/// NOTE(review): the previous description said stale packets are dropped or
/// requested again according to the `timeout` property, but nothing in this
/// file reads `_timeout` or `_max_tries` yet — confirm before relying on it.
#[derive(Debug, Clone)]
pub struct SendQueue {
    /// MTU of the connection; used to decide when a packet has to be split.
    mtu_size: u16,

    /// The amount of time that needs to pass for a packet to be
    /// dropped or requested again. Currently only stored.
    _timeout: u16,

    /// The amount of times we should retry sending a packet before
    /// dropping it from the queue. Currently only stored; the value is
    /// whatever the caller passes to `new`.
    _max_tries: u16,

    /// Generator for the sequence number stamped on every outgoing frame
    /// packet. ACK and NACK records refer to these numbers.
    send_seq: SafeGenerator<u32>,

    /// Generator for the reliable index stamped on frames that are sent
    /// reliably.
    reliable_seq: SafeGenerator<u32>,

    /// Recovery queue holding frame packets that may have to be resent.
    ack: RecoveryQueue<FramePacket>,

    /// The fragment queue; holds the frames produced when an oversized
    /// packet is split.
    fragment_queue: FragmentQueue,

    /// Ordering counters per order channel, keyed by channel id.
    /// The tuple is `(sequence index, order index)`.
    order_channels: HashMap<u8, (u32, u32)>,

    /// Frames waiting to be sent on the next call to `update`.
    ready: Vec<Frame>,

    /// Socket the encoded packets are written to.
    socket: Arc<UdpSocket>,

    /// Remote address the encoded packets are sent to.
    address: SocketAddr,
}
91
92impl SendQueue {
93    pub fn new(
94        mtu_size: u16,
95        _timeout: u16,
96        _max_tries: u16,
97        socket: Arc<UdpSocket>,
98        address: SocketAddr,
99    ) -> Self {
100        Self {
101            mtu_size,
102            _timeout,
103            _max_tries,
104            send_seq: SafeGenerator::new(),
105            reliable_seq: SafeGenerator::new(),
106            ack: RecoveryQueue::new(),
107            fragment_queue: FragmentQueue::new(),
108            order_channels: HashMap::new(),
109            ready: Vec::new(),
110            socket,
111            address,
112        }
113    }
114
115    /// Send a packet based on its reliability.
116    /// Note, reliability will be set to `Reliability::ReliableOrd` if
117    /// the buffer is larger than max MTU.
118    pub async fn insert(
119        &mut self,
120        packet: &[u8],
121        reliability: Reliability,
122        immediate: bool,
123        channel: Option<u8>,
124    ) -> Result<(), SendQueueError> {
125        rakrs_debug!(
126            true,
127            "Inserting packet into send queue: {} bytes",
128            packet.len()
129        );
130        rakrs_debug!("Write is now processing packet");
131        let reliable = if packet.len() > (self.mtu_size + RAKNET_HEADER_FRAME_OVERHEAD) as usize {
132            Reliability::ReliableOrd
133        } else {
134            reliability
135        };
136
137        rakrs_debug!("Write is now processing packet: {:?}", reliable);
138
139        match reliability {
140            Reliability::Unreliable => {
141                // we can just send this packet out immediately.
142                let frame = Frame::new(Reliability::Unreliable, Some(packet));
143                self.send_frame(frame).await;
144                return Ok(());
145            }
146            Reliability::Reliable => {
147                // we need to send this packet out reliably.
148                let frame = Frame::new(Reliability::Reliable, Some(packet));
149                self.send_frame(frame).await;
150                return Ok(());
151            }
152            _ => {}
153        };
154
155        // do another integrity check
156        // this is to check to see if we really need to split this packet.
157        if packet.len() > (self.mtu_size + RAKNET_HEADER_FRAME_OVERHEAD) as usize {
158            // we need to split this packet!
159            // pass the buffer to the fragment queue.
160            let mut pk = FramePacket::new();
161            pk.sequence = self.send_seq.next();
162            pk.reliability = reliability;
163
164            rakrs_debug!("Write is now splitting, too large: {:?}", reliability);
165
166            let fragmented = self.fragment_queue.split_insert(&packet, self.mtu_size);
167
168            if fragmented.is_ok() {
169                let frag_id = fragmented.unwrap();
170                let (_, frames) = self.fragment_queue.get_mut(&frag_id).unwrap();
171                let (ord_seq, ord_index) = self
172                    .order_channels
173                    .entry(channel.unwrap_or(0))
174                    .or_insert((0, 0));
175
176                for frame in frames.iter_mut() {
177                    frame.reliability = reliability;
178                    frame.sequence_index = Some(*ord_seq);
179                    frame.order_channel = Some(channel.unwrap_or(0));
180                    frame.order_index = Some(*ord_index);
181
182                    if frame.reliability.is_reliable() {
183                        frame.reliable_index = Some(self.reliable_seq.next());
184                    }
185                }
186
187                *ord_index = ord_index.wrapping_add(1);
188                *ord_seq = ord_seq.wrapping_add(1);
189
190                // Add this frame packet to the recovery queue.
191                if let Ok(p) = pk.write_to_bytes() {
192                    rakrs_debug!("Write is sending stream: {:?}", reliability);
193
194                    self.send_stream(p.as_slice()).await;
195                    self.ack.insert_id(pk.sequence, pk);
196                    return Ok(());
197                } else {
198                    return Err(SendQueueError::SendError);
199                }
200            } else {
201                // we couldn't send this frame!
202                return Err(SendQueueError::FragmentError(fragmented.unwrap_err()));
203            }
204        } else {
205            // we're not gonna send this frame out yet!
206            // we need to wait for the next tick.
207            let mut frame = Frame::new(reliable, Some(packet));
208
209            if frame.reliability.is_reliable() {
210                frame.reliable_index = Some(self.reliable_seq.next());
211            }
212
213            if frame.reliability.is_ordered() {
214                let (_, ord_index) = self
215                    .order_channels
216                    .entry(channel.unwrap_or(0))
217                    .or_insert((0, 0));
218                frame.order_index = Some(*ord_index);
219                frame.sequence_index = Some(self.send_seq.get());
220                *ord_index = ord_index.wrapping_add(1);
221            } else if frame.reliability.is_sequenced() {
222                let (seq_index, ord_index) = self
223                    .order_channels
224                    .entry(channel.unwrap_or(0))
225                    .or_insert((0, 0));
226                *seq_index = seq_index.wrapping_add(1);
227                frame.order_index = Some(*ord_index);
228                frame.sequence_index = Some(*seq_index);
229            }
230
231            if immediate {
232                self.send_frame(frame).await;
233            } else {
234                self.ready.push(frame);
235            }
236
237            return Ok(());
238        }
239    }
240
241    /// A wrapper to send a single frame over the wire.
242    /// While also reliabily tracking it.
243    async fn send_frame(&mut self, mut frame: Frame) {
244        let mut pk = FramePacket::new();
245        pk.sequence = self.send_seq.next();
246        pk.reliability = frame.reliability;
247
248        if pk.reliability.is_reliable() {
249            frame.reliable_index = Some(self.reliable_seq.next());
250        }
251
252        pk.frames.push(frame);
253
254        if pk.reliability.is_reliable() {
255            // this seems redundant, but we need to insert the packet into the ACK queue
256            self.ack.insert_id(self.reliable_seq.get(), pk.clone());
257        }
258
259        if let Ok(buf) = pk.write_to_bytes() {
260            rakrs_debug!("[!] Write sent the packet.. {:?}", buf.as_slice());
261            self.send_stream(buf.as_slice()).await;
262        } else {
263            rakrs_debug_buffers!(true, "SendQ: Failed to send frame: {:?}", pk);
264        }
265    }
266
267    pub(crate) async fn send_stream(&mut self, packet: &[u8]) {
268        rakrs_debug_buffers!(false, "SendQ: {}\n{:?}\n", packet.len(), packet);
269
270        if let Err(e) = self.socket.send_to(packet, &self.address).await {
271            // we couldn't sent the packet!
272            rakrs_debug!(
273                true,
274                "[{}] Failed to send packet! {:?}",
275                to_address_token(self.address),
276                e
277            );
278        }
279    }
280
281    pub async fn send_packet(
282        &mut self,
283        packet: RakPacket,
284        reliability: Reliability,
285        immediate: bool,
286    ) -> Result<(), SendQueueError> {
287        // parse the packet
288        if let Ok(buf) = packet.write_to_bytes() {
289            if let Err(e) = self
290                .insert(buf.as_slice(), reliability, immediate, None)
291                .await
292            {
293                rakrs_debug!(
294                    true,
295                    "[{}] Failed to insert packet into send queue: {:?}",
296                    to_address_token(self.address),
297                    e
298                );
299                return Err(e);
300            }
301            return Ok(());
302        } else {
303            return Err(SendQueueError::ParseError);
304        }
305    }
306
307    pub async fn update(&mut self) {
308        // send all the ready packets
309        // TODO batch these packets together
310        // TODO by lengths
311        for frame in self.ready.drain(..).collect::<Vec<Frame>>() {
312            self.send_frame(frame).await;
313        }
314
315        // Flush ACK
316        // check to see if we need to resend any packets.
317        // TODO actually implement this
318        let resend_queue = self.ack.flush().unwrap();
319        // let mut resend_queue = Vec::<FramePacket>::new();
320
321        // for (seq, packet) in self.ack.get_all() {
322        //     if packet.resend_time < Instant::now() {
323        //         resend_queue.push(packet.clone());
324        //     }
325        // }
326
327        for packet in resend_queue.iter() {
328            if let Ok(buf) = packet.write_to_bytes() {
329                self.send_stream(buf.as_slice()).await;
330            }
331        }
332    }
333}
334
335impl Ackable for SendQueue {
336    type NackItem = FramePacket;
337
338    fn ack(&mut self, ack: Ack) {
339        if ack.is_nack() {
340            return;
341        }
342
343        // these packets are acknowledged, so we can remove them from the queue.
344        for record in ack.records.iter() {
345            match record {
346                Record::Single(SingleRecord { sequence }) => {
347                    if let Ok(_) = self.ack.remove(sequence.0) {};
348                }
349                Record::Range(ranged) => {
350                    for i in ranged.start.0..ranged.end.0 {
351                        if let Ok(_) = self.ack.remove(i) {};
352                    }
353                }
354            }
355        }
356    }
357
358    fn nack(&mut self, nack: Ack) -> Vec<FramePacket> {
359        if !nack.is_nack() {
360            return Vec::new();
361        }
362
363        let mut resend_queue = Vec::<FramePacket>::new();
364
365        // we need to get the packets to resend.
366        for record in nack.records.iter() {
367            match record {
368                Record::Single(single) => {
369                    if let Ok(packet) = self.ack.get(single.sequence.0) {
370                        resend_queue.push(packet.clone());
371                    }
372                }
373                Record::Range(ranged) => {
374                    for i in ranged.start.0..ranged.end.0 {
375                        if let Ok(packet) = self.ack.get(i) {
376                            resend_queue.push(packet.clone());
377                        }
378                    }
379                }
380            }
381        }
382
383        return resend_queue;
384    }
385}