1pub mod events;
2
3use std::{
4 collections::VecDeque,
5 marker::PhantomData,
6 net::{SocketAddr, ToSocketAddrs, UdpSocket},
7 str::FromStr,
8 sync::{Arc, RwLock},
9 time::Instant,
10};
11
12use anyhow::anyhow;
13use byteorder::{ByteOrder, LittleEndian};
14use events::SocketEvent;
15
16use crate::{
17 acknowledgement::{
18 manager::AcknowledgementManager,
19 packet::{AckNumber, AckPacket},
20 },
21 events::{EventCallbackArgs, EventEmitter},
22 packet::{IntoPacketDelivery, PacketDelivery},
23 persistent::{storage::PersistentStorage, Persistent},
24 plugins::SocketPlugin,
25 sequence::SequenceNumber,
26};
27
28pub type ReceivedPacket = (SocketAddr, Vec<u8>);
29
30pub struct NautSocket<'socket, S>
31where
32 S: SocketType<'socket>,
33{
34 pub(crate) socket: UdpSocket,
35 pub(crate) packet_queue: VecDeque<ReceivedPacket>,
36 pub(crate) inner: S,
37
38 pub(crate) event_emitter: EventEmitter<'socket, S>,
39 pub(crate) ack_manager: AcknowledgementManager,
40 pub(crate) phantom: PhantomData<&'socket S>,
41
42 pub(crate) socket_events: Vec<SocketEvent>,
43
44 pub(crate) persistent: PersistentStorage,
45}
46
47impl<'socket, S> NautSocket<'socket, S>
48where
49 S: SocketType<'socket>,
50{
51 pub const DELIVERY_TYPE_OFFSET: usize = 0;
53 pub const DELIVERY_TYPE_BUF: usize = 2;
55
56 pub const SEQ_NUM_OFFSET: usize = 2;
58 pub const SEQ_NUM_BUF: usize = 4;
60
61 pub const ACK_NUM_OFFSET: usize = 6;
63 pub const ACK_NUM_BUF: usize = 4;
65
66 pub const EVENT_LEN_OFFSET: usize = 10;
68 pub const EVENT_LEN_BUF: usize = 4;
70
71 pub const PACKET_PADDING: usize =
72 Self::DELIVERY_TYPE_BUF + Self::SEQ_NUM_BUF + Self::ACK_NUM_BUF + Self::EVENT_LEN_BUF;
73
74 pub fn socket(&self) -> &UdpSocket {
76 &self.socket
77 }
78
79 pub fn socket_mut(&mut self) -> &mut UdpSocket {
81 &mut self.socket
82 }
83
84 pub fn poll(&mut self) {
86 let mut buf = vec![0; 1024];
87 while let Ok((size, addr)) = self.socket.recv_from(&mut buf) {
88 let buf = buf[0..size].to_vec();
89 self.packet_queue.push_back((addr, buf));
90 }
91 }
92
93 pub(crate) fn retry_ack_packets(&self) {
95 for AckPacket {
96 bytes,
97 time_created,
98 target,
99 } in self.ack_manager.packets_waiting_on_ack.values()
100 {
101 if Instant::now().duration_since(*time_created) < self.ack_manager.ack_retry_time {
102 continue;
103 }
104
105 let _ = self.socket.send_to(bytes, target);
106 }
107 }
108
109 pub(crate) fn oldest_packet_in_queue(&mut self) -> Option<ReceivedPacket> {
111 self.packet_queue.pop_front()
112 }
113
114 pub(crate) fn get_delivery_type_from_packet(buf: &[u8]) -> Option<u16> {
116 if Self::DELIVERY_TYPE_OFFSET == buf.len()
117 || Self::DELIVERY_TYPE_OFFSET + Self::DELIVERY_TYPE_BUF > buf.len()
118 {
119 return None;
120 }
121
122 Some(LittleEndian::read_u16(
123 &buf[Self::DELIVERY_TYPE_OFFSET..Self::DELIVERY_TYPE_OFFSET + Self::DELIVERY_TYPE_BUF],
124 ))
125 }
126
127 pub(crate) fn get_ack_num_from_packet(buf: &[u8]) -> Option<u32> {
129 if Self::ACK_NUM_OFFSET > buf.len() || Self::ACK_NUM_OFFSET + Self::ACK_NUM_BUF > buf.len()
130 {
131 return None;
132 }
133
134 Some(LittleEndian::read_u32(
135 &buf[Self::ACK_NUM_OFFSET..Self::ACK_NUM_OFFSET + Self::ACK_NUM_BUF],
136 ))
137 }
138
139 pub(crate) fn get_seq_from_packet(buf: &[u8]) -> Option<SequenceNumber> {
141 if Self::SEQ_NUM_OFFSET > buf.len() || Self::SEQ_NUM_OFFSET + Self::SEQ_NUM_BUF > buf.len()
142 {
143 return None;
144 }
145
146 Some(SequenceNumber::new(LittleEndian::read_u32(
147 &buf[Self::SEQ_NUM_OFFSET..Self::SEQ_NUM_OFFSET + Self::SEQ_NUM_BUF],
148 )))
149 }
150
151 pub(crate) fn get_event_from_packet(buf: &[u8]) -> anyhow::Result<String> {
153 let length = LittleEndian::read_u32(
154 &buf[Self::EVENT_LEN_OFFSET..Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF],
155 ) as usize;
156
157 let event_offset = Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF;
158
159 if event_offset > buf.len() || event_offset + length > buf.len() {
160 return Err(anyhow!("Packet not large enough for event"));
161 }
162
163 Ok(String::from_utf8(
164 buf[event_offset..event_offset + length].to_vec(),
165 )?)
166 }
167
168 pub(crate) fn get_packet_bytes(buf: &[u8]) -> Option<Vec<u8>> {
170 let length = LittleEndian::read_u32(
171 &buf[Self::EVENT_LEN_OFFSET..Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF],
172 ) as usize;
173 let pad = (4 - (length % 4)) % 4;
174
175 let event_offset = Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF;
176 let bytes_offset = event_offset + length + pad;
177
178 if bytes_offset > buf.len() {
179 return None;
180 }
181
182 Some(buf[bytes_offset..].to_vec())
183 }
184
185 pub(crate) fn send_by_addr<A>(
188 &mut self,
189 event: &str,
190 buf: &[u8],
191 delivery: PacketDelivery,
192 addr: A,
193 ) -> anyhow::Result<()>
194 where
195 A: ToSocketAddrs + Into<String> + Clone,
196 {
197 let pad = (4 - (event.len() % 4)) % 4;
199 let padded_event_len = event.len() + pad;
200 let total_len = Self::PACKET_PADDING + padded_event_len + buf.len();
201 let delivery_type = delivery.packet_delivery_as()?;
202
203 let mut packet = vec![0; total_len];
204 LittleEndian::write_u16(
206 &mut packet
207 [Self::DELIVERY_TYPE_OFFSET..Self::DELIVERY_TYPE_OFFSET + Self::DELIVERY_TYPE_BUF],
208 delivery_type,
209 );
210
211 if delivery.is_sequenced() {
212 let addr = Into::<String>::into(addr.clone());
213 let addr = SocketAddr::from_str(&addr).unwrap();
214 let seq_num = self
215 .inner
216 .update_current_send_seq_num_for_event(&addr, event);
217
218 if let Some(seq_num) = seq_num {
219 LittleEndian::write_u32(
220 &mut packet[Self::SEQ_NUM_OFFSET..Self::SEQ_NUM_OFFSET + Self::SEQ_NUM_BUF],
221 seq_num.raw(),
222 );
223 }
224 }
225
226 let ack_number = if delivery.is_reliable() {
229 self.ack_manager.get_new_ack_num()
230 } else {
231 AckNumber::new(0)
232 };
233
234 LittleEndian::write_u32(
236 &mut packet[Self::ACK_NUM_OFFSET..Self::ACK_NUM_OFFSET + Self::ACK_NUM_BUF],
237 ack_number.raw(),
238 );
239
240 LittleEndian::write_u32(
242 &mut packet[Self::EVENT_LEN_OFFSET..Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF],
243 event.len() as u32,
244 );
245
246 let event_offset = Self::EVENT_LEN_OFFSET + Self::EVENT_LEN_BUF;
247 packet[event_offset..event_offset + event.len()].copy_from_slice(event.as_bytes());
249
250 let bytes_offset = event_offset + padded_event_len;
252 packet[bytes_offset..].copy_from_slice(buf);
253
254 if ack_number.raw() > 0 {
256 self.ack_manager.insert_packet_into_ack_waiting_list(
257 ack_number,
258 packet.to_vec(),
259 addr.clone(),
260 );
261 }
262
263 self.socket.send_to(&packet, addr)?;
264
265 Ok(())
266 }
267
268 pub fn on<F>(&mut self, event: &str, cb: F)
279 where
280 F: Fn(&mut NautSocket<S>, EventCallbackArgs) + Send + Sync + 'static,
281 {
282 self.event_emitter.register_event(event, cb);
283 }
284
285 pub fn on_poll<F>(&mut self, cb: F)
297 where
298 F: Fn(&mut NautSocket<S>) + Send + Sync + 'static,
299 {
300 self.event_emitter.register_poll_event(cb);
301 }
302
303 pub(crate) fn send_ack_packet<A>(&self, addr: A, packet: &[u8]) -> anyhow::Result<()>
305 where
306 A: ToSocketAddrs,
307 {
308 let mut buf = vec![0; 6];
309
310 LittleEndian::write_u16(
312 &mut buf
313 [Self::DELIVERY_TYPE_OFFSET..Self::DELIVERY_TYPE_OFFSET + Self::DELIVERY_TYPE_BUF],
314 PacketDelivery::ack_delivery().packet_delivery_as()?,
315 );
316
317 let Some(ack_num) = Self::get_ack_num_from_packet(packet) else {
319 return Err(anyhow!("No ack num in packet"));
320 };
321
322 LittleEndian::write_u32(&mut buf[2..6], ack_num);
324
325 self.socket.send_to(&buf, addr)?;
326
327 Ok(())
328 }
329
330 pub fn register_plugin<'plugin>(&'plugin mut self, plugin: impl SocketPlugin<'socket, S>) {
332 plugin.register(self);
333 }
334
335 pub fn get_persistent<P>(&self) -> Option<Arc<RwLock<P>>>
336 where
337 P: Persistent,
338 {
339 self.persistent.get_persistent()
340 }
341
342 pub fn insert_persistent<P>(&mut self, persistent: P)
343 where
344 P: Persistent,
345 {
346 self.persistent.insert_persistent(persistent);
347 }
348
349 pub fn init_persistent<P>(&mut self)
350 where
351 P: Persistent + Default,
352 {
353 self.persistent.init_persistent::<P>();
354 }
355}
356
357pub trait SocketType<'socket>: Default {
361 fn update_current_send_seq_num_for_event(
364 &mut self,
365 addr: &SocketAddr,
366 event: &str,
367 ) -> Option<SequenceNumber>;
368
369 fn last_recv_seq_num_for_event(
372 &'socket mut self,
373 addr: &SocketAddr,
374 event: &str,
375 ) -> Option<&'socket mut SequenceNumber>;
376}