nautilus_sockets/client/
mod.rs

1use std::{
2    collections::VecDeque,
3    marker::PhantomData,
4    net::{SocketAddr, ToSocketAddrs, UdpSocket},
5    str::FromStr,
6};
7
8use byteorder::{ByteOrder, LittleEndian};
9
10use crate::{
11    acknowledgement::{manager::AcknowledgementManager, packet::AckNumber},
12    connection::EstablishedConnection,
13    events::EventEmitter,
14    packet::{IntoPacketDelivery, PacketDelivery},
15    persistent::storage::PersistentStorage,
16    sequence::SequenceNumber,
17    socket::{events::SocketEvent, NautSocket, SocketType},
18};
19
20pub type ConnectionId = u16;
21#[derive(Default)]
22pub struct NautClient {
23    /// The [nautilus server](crate::server::NautServer) we are connected to
24    server_connection: Option<EstablishedConnection>,
25}
26
27impl<'socket> SocketType<'socket> for NautClient {
28    fn update_current_send_seq_num_for_event(
29        &mut self,
30        _addr: &std::net::SocketAddr,
31        event: &str,
32    ) -> Option<SequenceNumber> {
33        let server_connection = self.server_connection.as_mut()?;
34
35        let Some(seq) = server_connection.current_send_seq_num.get_mut(event) else {
36            server_connection
37                .current_send_seq_num
38                .insert(event.to_owned(), SequenceNumber::new(0));
39            return Some(SequenceNumber::new(0));
40        };
41
42        *seq += SequenceNumber::new(1);
43
44        Some(*seq)
45    }
46
47    fn last_recv_seq_num_for_event(
48        &'socket mut self,
49        _addr: &std::net::SocketAddr,
50        event: &str,
51    ) -> Option<&'socket mut SequenceNumber> {
52        let server_connection = self.server_connection.as_mut()?;
53
54        if !server_connection.last_seq_num_recv.contains_key(event) {
55            server_connection
56                .last_seq_num_recv
57                .insert(event.to_string(), SequenceNumber::new(0));
58        }
59
60        let seq = server_connection.last_seq_num_recv.get_mut(event)?;
61
62        Some(seq)
63    }
64}
65
66impl<'socket> NautSocket<'socket, NautClient> {
67    /// Creates a new [event listening socket](crate::socket::NautSocket) with a [client](NautClient) type
68    pub fn new<A>(addr: A) -> anyhow::Result<Self>
69    where
70        A: ToSocketAddrs,
71    {
72        let socket = UdpSocket::bind(addr)?;
73        socket.set_nonblocking(true)?;
74
75        let client = NautClient::default();
76        let naut_socket = Self {
77            socket,
78            packet_queue: VecDeque::new(),
79            inner: client,
80            event_emitter: EventEmitter::new(),
81            ack_manager: AcknowledgementManager::new(),
82            phantom: PhantomData,
83            socket_events: Vec::new(),
84            persistent: PersistentStorage::new(),
85        };
86
87        Ok(naut_socket)
88    }
89
90    /// Gets the [address](SocketAddr) of the (server)[crate::server::NautServer] we are connected
91    /// to
92    pub fn get_server_address(&self) -> Option<&SocketAddr> {
93        Some(&self.inner.server_connection.as_ref()?.addr)
94    }
95
96    /// Establishes a connection to another [nautilus compatible socket](crate::socket::NautSocket)
97    pub fn connect_to<A>(&mut self, addr: A) -> anyhow::Result<()>
98    where
99        A: ToSocketAddrs + Into<String> + Clone,
100    {
101        let addr_str = Into::<String>::into(addr.clone());
102        self.inner.server_connection = Some(EstablishedConnection::new(
103            SocketAddr::from_str(addr_str.as_str()).unwrap(),
104        ));
105
106        Ok(self.socket().connect(addr)?)
107    }
108
109    /// Sends an event message to the [server](crate::server::NautServer) we are connected to
110    pub fn send(
111        &mut self,
112        event: &str,
113        buf: &[u8],
114        delivery: PacketDelivery,
115    ) -> anyhow::Result<()> {
116        let server_addr = { self.inner.server_connection.as_ref().unwrap().addr };
117
118        self.send_by_addr(event, buf, delivery, server_addr.to_string())?;
119
120        Ok(())
121    }
122
123    /// Gets the packets from the packet queue and will handle returning
124    /// [ack packets](crate::acknowledgement::packet::AckPacket), resolving sequenced packets and emitting
125    /// listening events
126    pub fn run_events(&mut self) {
127        let event_emitter = std::mem::take(&mut self.event_emitter);
128        let event_emitter_ref = &event_emitter;
129        while let Some((addr, packet)) = self.oldest_packet_in_queue() {
130            let Some(delivery_type) = Self::get_delivery_type_from_packet(&packet) else {
131                self.socket_events
132                    .push(SocketEvent::ReadPacketFail("No delivery type".to_string()));
133                continue;
134            };
135
136            let Ok(delivery_type) =
137                <PacketDelivery as IntoPacketDelivery<u16>>::into_packet_delivery(delivery_type)
138            else {
139                continue;
140            };
141
142            // We have received acknowledgement of a packet we have sent
143            if delivery_type == PacketDelivery::ack_delivery() {
144                let ack_num = AckNumber::new(LittleEndian::read_u32(&packet[2..6]));
145                self.ack_manager.packets_waiting_on_ack.remove(&ack_num);
146
147                continue;
148            }
149
150            // Check size here instead of in poll as ack packets do not fit into padding
151            if packet.len() < Self::PACKET_PADDING {
152                continue;
153            }
154
155            // Gets the event title from the packet
156            let Ok(event) = Self::get_event_from_packet(&packet) else {
157                continue;
158            };
159
160            // Send a packet  to acknowledge the sender we have recieved their packet
161            if delivery_type.is_reliable() {
162                if let Err(e) = self.send_ack_packet(addr, &packet) {
163                    self.socket_events
164                        .push(SocketEvent::SendPacketFail(e.to_string()))
165                }
166            }
167
168            // If its a sequenced packet we must make sure its the latest packet in sequence
169            if delivery_type.is_sequenced() {
170                let Some(seq_num) = Self::get_seq_from_packet(&packet) else {
171                    self.socket_events.push(SocketEvent::ReadPacketFail(
172                        "No sequence number in sequenced packet".to_string(),
173                    ));
174                    continue;
175                };
176
177                if let Some(last_recv_seq_num) =
178                    self.inner.last_recv_seq_num_for_event(&addr, &event)
179                {
180                    // Discard packet
181                    if seq_num < *last_recv_seq_num {
182                        self.socket_events.push(SocketEvent::PacketDiscard(format!(
183                            "Discarding {event} packet, last recv: {:?} recv: {:?}",
184                            *last_recv_seq_num, seq_num
185                        )));
186                        continue;
187                    }
188
189                    *last_recv_seq_num = seq_num;
190                };
191            }
192
193            let bytes = Self::get_packet_bytes(&packet).unwrap_or(Default::default());
194            // Emits the event to the event listeners
195            event_emitter_ref.emit_event(&event, self, (addr, &bytes));
196        }
197
198        event_emitter_ref.emit_polled_events(self);
199
200        // Retry ack packets
201        self.socket_events.clear();
202        self.retry_ack_packets();
203
204        self.event_emitter = event_emitter;
205    }
206}