nautilus_sockets/socket/
mod.rs

1pub mod events;
2
3use std::{
4    collections::VecDeque,
5    marker::PhantomData,
6    net::{SocketAddr, ToSocketAddrs, UdpSocket},
7    str::FromStr,
8    sync::{Arc, RwLock},
9    time::Instant,
10};
11
12use anyhow::anyhow;
13use byteorder::{ByteOrder, LittleEndian};
14use events::SocketEvent;
15
16use crate::{
17    acknowledgement::{
18        manager::AcknowledgementManager,
19        packet::{AckNumber, AckPacket},
20    },
21    events::{EventCallbackArgs, EventEmitter},
22    packet::{IntoPacketDelivery, PacketDelivery},
23    persistent::{storage::PersistentStorage, Persistent},
24    plugins::SocketPlugin,
25    sequence::SequenceNumber,
26};
27
28pub type ReceivedPacket = (SocketAddr, Vec<u8>);
29
30pub struct NautSocket<'socket, S>
31where
32    S: SocketType<'socket>,
33{
34    pub(crate) socket: UdpSocket,
35    pub(crate) packet_queue: VecDeque<ReceivedPacket>,
36    pub(crate) inner: S,
37
38    pub(crate) event_emitter: EventEmitter<'socket, S>,
39    pub(crate) ack_manager: AcknowledgementManager,
40    pub(crate) phantom: PhantomData<&'socket S>,
41
42    pub(crate) socket_events: Vec<SocketEvent>,
43
44    pub(crate) persistent: PersistentStorage,
45}
46
47impl<'socket, S> NautSocket<'socket, S>
48where
49    S: SocketType<'socket>,
50{
51    /// The offset in the packet of the delivery type
52    pub const DELIVERY_TYPE_OFFSET: usize = 0;
53    /// The amount of space in each packet for the delivery type
54    pub const DELIVERY_TYPE_BUF: usize = 2;
55
56    /// The offset in the packet of the sequence number if its got a sequenced delivery type
57    pub const SEQ_NUM_OFFSET: usize = 2;
58    /// The amount of space in each packet for the sequence order if its a sequenced packet
59    pub const SEQ_NUM_BUF: usize = 4;
60
61    /// The offset in the packet of the acknowldegement number
62    pub const ACK_NUM_OFFSET: usize = 6;
63    /// The amount of space in each packet for the ack number
64    pub const ACK_NUM_BUF: usize = 4;
65
66    /// The offset in the packet for the length of the event title
67    pub const EVENT_LEN_OFFSET: usize = 10;
68    /// The amount of space in each packet for the length of the event title
69    pub const EVENT_LEN_BUF: usize = 4;
70
71    pub const PACKET_PADDING: usize =
72        Self::DELIVERY_TYPE_BUF + Self::SEQ_NUM_BUF + Self::ACK_NUM_BUF + Self::EVENT_LEN_BUF;
73
74    /// Reference to the [raw socket](Self::socket)
75    pub fn socket(&self) -> &UdpSocket {
76        &self.socket
77    }
78
79    /// Mutable reference to the [raw socket](Self::socket)
80    pub fn socket_mut(&mut self) -> &mut UdpSocket {
81        &mut self.socket
82    }
83
84    /// Polls the received packets and pushes them to the [packet queue](Self::packet_queue)
85    pub fn poll(&mut self) {
86        let mut buf = vec![0; 1024];
87        while let Ok((size, addr)) = self.socket.recv_from(&mut buf) {
88            let buf = buf[0..size].to_vec();
89            self.packet_queue.push_back((addr, buf));
90        }
91    }
92
93    /// Retries a packet after [retry time](AcknowledgementManager::ack_retry_time)
94    pub(crate) fn retry_ack_packets(&self) {
95        for AckPacket {
96            bytes,
97            time_created,
98            target,
99        } in self.ack_manager.packets_waiting_on_ack.values()
100        {
101            if Instant::now().duration_since(*time_created) < self.ack_manager.ack_retry_time {
102                continue;
103            }
104
105            let _ = self.socket.send_to(bytes, target);
106        }
107    }
108
109    /// Pops a packet from the front of the [packet queue](Self::packet_queue)
110    pub(crate) fn oldest_packet_in_queue(&mut self) -> Option<ReceivedPacket> {
111        self.packet_queue.pop_front()
112    }
113
114    /// Gets the delivery type of the packet
115    pub(crate) fn get_delivery_type_from_packet(buf: &[u8]) -> Option<u16> {
116        if Self::DELIVERY_TYPE_OFFSET == buf.len()
117            || Self::DELIVERY_TYPE_OFFSET + Self::DELIVERY_TYPE_BUF > buf.len()
118        {
119            return None;
120        }
121
122        Some(LittleEndian::read_u16(
123            &buf[Self::DELIVERY_TYPE_OFFSET..Self::DELIVERY_TYPE_OFFSET + Self::DELIVERY_TYPE_BUF],
124        ))
125    }
126
127    /// Gets the acknowledgement number from the packet
128    pub(crate) fn get_ack_num_from_packet(buf: &[u8]) -> Option<u32> {
129        if Self::ACK_NUM_OFFSET > buf.len() || Self::ACK_NUM_OFFSET + Self::ACK_NUM_BUF > buf.len()
130        {
131            return None;
132        }
133
134        Some(LittleEndian::read_u32(
135            &buf[Self::ACK_NUM_OFFSET..Self::ACK_NUM_OFFSET + Self::ACK_NUM_BUF],
136        ))
137    }
138
139    /// Gets the sequence number from the packet
140    pub(crate) fn get_seq_from_packet(buf: &[u8]) -> Option<SequenceNumber> {
141        if Self::SEQ_NUM_OFFSET > buf.len() || Self::SEQ_NUM_OFFSET + Self::SEQ_NUM_BUF > buf.len()
142        {
143            return None;
144        }
145
146        Some(SequenceNumber::new(LittleEndian::read_u32(
147            &buf[Self::SEQ_NUM_OFFSET..Self::SEQ_NUM_OFFSET + Self::SEQ_NUM_BUF],
148        )))
149    }
150
151    /// Get the event title from the packet
152    pub(crate) fn get_event_from_packet(buf: &[u8]) -> anyhow::Result<String> {
153        let length = LittleEndian::read_u32(
154            &buf[Self::EVENT_LEN_OFFSET..Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF],
155        ) as usize;
156
157        let event_offset = Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF;
158
159        if event_offset > buf.len() || event_offset + length > buf.len() {
160            return Err(anyhow!("Packet not large enough for event"));
161        }
162
163        Ok(String::from_utf8(
164            buf[event_offset..event_offset + length].to_vec(),
165        )?)
166    }
167
168    /// Gets the remaining packet bytes
169    pub(crate) fn get_packet_bytes(buf: &[u8]) -> Option<Vec<u8>> {
170        let length = LittleEndian::read_u32(
171            &buf[Self::EVENT_LEN_OFFSET..Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF],
172        ) as usize;
173        let pad = (4 - (length % 4)) % 4;
174
175        let event_offset = Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF;
176        let bytes_offset = event_offset + length + pad;
177
178        if bytes_offset > buf.len() {
179            return None;
180        }
181
182        Some(buf[bytes_offset..].to_vec())
183    }
184
185    /// Sends a packet to a [socket address](SocketAddr) and inserts the [packet delivery type](PacketDelivery), [AckNumber] and [SequenceNumber] and the
186    /// remaining bytes sent for the actual event
187    pub(crate) fn send_by_addr<A>(
188        &mut self,
189        event: &str,
190        buf: &[u8],
191        delivery: PacketDelivery,
192        addr: A,
193    ) -> anyhow::Result<()>
194    where
195        A: ToSocketAddrs + Into<String> + Clone,
196    {
197        // Stays consistent with memory layout
198        let pad = (4 - (event.len() % 4)) % 4;
199        let padded_event_len = event.len() + pad;
200        let total_len = Self::PACKET_PADDING + padded_event_len + buf.len();
201        let delivery_type = delivery.packet_delivery_as()?;
202
203        let mut packet = vec![0; total_len];
204        // Inserts the packet delivery type into the packet
205        LittleEndian::write_u16(
206            &mut packet
207                [Self::DELIVERY_TYPE_OFFSET..Self::DELIVERY_TYPE_OFFSET + Self::DELIVERY_TYPE_BUF],
208            delivery_type,
209        );
210
211        if delivery.is_sequenced() {
212            let addr = Into::<String>::into(addr.clone());
213            let addr = SocketAddr::from_str(&addr).unwrap();
214            let seq_num = self
215                .inner
216                .update_current_send_seq_num_for_event(&addr, event);
217
218            if let Some(seq_num) = seq_num {
219                LittleEndian::write_u32(
220                    &mut packet[Self::SEQ_NUM_OFFSET..Self::SEQ_NUM_OFFSET + Self::SEQ_NUM_BUF],
221                    seq_num.raw(),
222                );
223            }
224        }
225
226        // If its a reliable packet, we must assign it an acknowledgement number so the receiver
227        // can return a packet letting the sender know we got the packet
228        let ack_number = if delivery.is_reliable() {
229            self.ack_manager.get_new_ack_num()
230        } else {
231            AckNumber::new(0)
232        };
233
234        // If its an ack packet then we insert it
235        LittleEndian::write_u32(
236            &mut packet[Self::ACK_NUM_OFFSET..Self::ACK_NUM_OFFSET + Self::ACK_NUM_BUF],
237            ack_number.raw(),
238        );
239
240        // Inserts the length of the event string into the packet
241        LittleEndian::write_u32(
242            &mut packet[Self::EVENT_LEN_OFFSET..Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF],
243            event.len() as u32,
244        );
245
246        let event_offset = Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF;
247        // Copies the event into the packet
248        packet[event_offset..event_offset + event.len()].copy_from_slice(event.as_bytes());
249
250        // Inserts the byte buf into the packet buf
251        let bytes_offset = event_offset + padded_event_len;
252        packet[bytes_offset..].copy_from_slice(buf);
253
254        // Store complete packet in ack waiting list
255        if ack_number.raw() > 0 {
256            self.ack_manager.insert_packet_into_ack_waiting_list(
257                ack_number,
258                packet.to_vec(),
259                addr.clone(),
260            );
261        }
262
263        self.socket.send_to(&packet, addr)?;
264
265        Ok(())
266    }
267
268    /// Run a function as a callback when a certain event is sent
269    ///
270    /// # Examples
271    ///
272    /// ```
273    /// // When the client recieves a "hello" event it will print the bytes received
274    /// client.on("hello", |_client, (_addr, packet)| {
275    ///     println!("hello bytes {:?}", packet);
276    /// });
277    /// ```
278    pub fn on<F>(&mut self, event: &str, cb: F)
279    where
280        F: Fn(&mut NautSocket<S>, EventCallbackArgs) + Send + Sync + 'static,
281    {
282        self.event_emitter.register_event(event, cb);
283    }
284
285    /// Run a function as a callback everytime the socket is polled
286    ///
287    /// # Examples
288    ///
289    /// ```
290    /// // When the server is polled it will print "Do some stuff"]
291    /// // Can be used to read server events, etc.
292    /// server.on_poll(|_server| {
293    ///     println!("Do some stufff");
294    /// });
295    /// ```
296    pub fn on_poll<F>(&mut self, cb: F)
297    where
298        F: Fn(&mut NautSocket<S>) + Send + Sync + 'static,
299    {
300        self.event_emitter.register_poll_event(cb);
301    }
302
303    /// Sends an [acknowledgement packet](AckPacket) to the [address](SocketAddr)
304    pub(crate) fn send_ack_packet<A>(&self, addr: A, packet: &[u8]) -> anyhow::Result<()>
305    where
306        A: ToSocketAddrs,
307    {
308        let mut buf = vec![0; 6];
309
310        // Write that its a ack response to the packet
311        LittleEndian::write_u16(
312            &mut buf
313                [Self::DELIVERY_TYPE_OFFSET..Self::DELIVERY_TYPE_OFFSET + Self::DELIVERY_TYPE_BUF],
314            PacketDelivery::ack_delivery().packet_delivery_as()?,
315        );
316
317        // Get the ack num from the original packet
318        let Some(ack_num) = Self::get_ack_num_from_packet(packet) else {
319            return Err(anyhow!("No ack num in packet"));
320        };
321
322        // Write ack num into ack delivery packet
323        LittleEndian::write_u32(&mut buf[2..6], ack_num);
324
325        self.socket.send_to(&buf, addr)?;
326
327        Ok(())
328    }
329
330    /// Registers a [plugin](crate::plugins::SocketPlugin)
331    pub fn register_plugin<'plugin>(&'plugin mut self, plugin: impl SocketPlugin<'socket, S>) {
332        plugin.register(self);
333    }
334
335    pub fn get_persistent<P>(&self) -> Option<Arc<RwLock<P>>>
336    where
337        P: Persistent,
338    {
339        self.persistent.get_persistent()
340    }
341
342    pub fn insert_persistent<P>(&mut self, persistent: P)
343    where
344        P: Persistent,
345    {
346        self.persistent.insert_persistent(persistent);
347    }
348
349    pub fn init_persistent<P>(&mut self)
350    where
351        P: Persistent + Default,
352    {
353        self.persistent.init_persistent::<P>();
354    }
355}
356
357/// Used for the [nautlis socket](NautSocket)
358/// A socket must have a way to handle sequenced packets for clients. Otherwise, it will manage
359/// everything the same way
360pub trait SocketType<'socket>: Default {
361    /// Updates the current [sequence number](SequenceNumber) for that specific event and should be
362    /// handled on a per client basis
363    fn update_current_send_seq_num_for_event(
364        &mut self,
365        addr: &SocketAddr,
366        event: &str,
367    ) -> Option<SequenceNumber>;
368
369    /// Returns a mutable reference to the last received sequence number, it should be changed to
370    /// the newest sequence number if the last received sequence number is lower
371    fn last_recv_seq_num_for_event(
372        &'socket mut self,
373        addr: &SocketAddr,
374        event: &str,
375    ) -> Option<&'socket mut SequenceNumber>;
376}