nautilus_sockets/client/
mod.rs1use std::{
2 collections::VecDeque,
3 marker::PhantomData,
4 net::{SocketAddr, ToSocketAddrs, UdpSocket},
5 str::FromStr,
6};
7
8use byteorder::{ByteOrder, LittleEndian};
9
10use crate::{
11 acknowledgement::{manager::AcknowledgementManager, packet::AckNumber},
12 connection::EstablishedConnection,
13 events::EventEmitter,
14 packet::{IntoPacketDelivery, PacketDelivery},
15 persistent::storage::PersistentStorage,
16 sequence::SequenceNumber,
17 socket::{events::SocketEvent, NautSocket, SocketType},
18};
19
20pub type ConnectionId = u16;
21#[derive(Default)]
22pub struct NautClient {
23 server_connection: Option<EstablishedConnection>,
25}
26
27impl<'socket> SocketType<'socket> for NautClient {
28 fn update_current_send_seq_num_for_event(
29 &mut self,
30 _addr: &std::net::SocketAddr,
31 event: &str,
32 ) -> Option<SequenceNumber> {
33 let server_connection = self.server_connection.as_mut()?;
34
35 let Some(seq) = server_connection.current_send_seq_num.get_mut(event) else {
36 server_connection
37 .current_send_seq_num
38 .insert(event.to_owned(), SequenceNumber::new(0));
39 return Some(SequenceNumber::new(0));
40 };
41
42 *seq += SequenceNumber::new(1);
43
44 Some(*seq)
45 }
46
47 fn last_recv_seq_num_for_event(
48 &'socket mut self,
49 _addr: &std::net::SocketAddr,
50 event: &str,
51 ) -> Option<&'socket mut SequenceNumber> {
52 let server_connection = self.server_connection.as_mut()?;
53
54 if !server_connection.last_seq_num_recv.contains_key(event) {
55 server_connection
56 .last_seq_num_recv
57 .insert(event.to_string(), SequenceNumber::new(0));
58 }
59
60 let seq = server_connection.last_seq_num_recv.get_mut(event)?;
61
62 Some(seq)
63 }
64}
65
66impl<'socket> NautSocket<'socket, NautClient> {
67 pub fn new<A>(addr: A) -> anyhow::Result<Self>
69 where
70 A: ToSocketAddrs,
71 {
72 let socket = UdpSocket::bind(addr)?;
73 socket.set_nonblocking(true)?;
74
75 let client = NautClient::default();
76 let naut_socket = Self {
77 socket,
78 packet_queue: VecDeque::new(),
79 inner: client,
80 event_emitter: EventEmitter::new(),
81 ack_manager: AcknowledgementManager::new(),
82 phantom: PhantomData,
83 socket_events: Vec::new(),
84 persistent: PersistentStorage::new(),
85 };
86
87 Ok(naut_socket)
88 }
89
90 pub fn get_server_address(&self) -> Option<&SocketAddr> {
93 Some(&self.inner.server_connection.as_ref()?.addr)
94 }
95
96 pub fn connect_to<A>(&mut self, addr: A) -> anyhow::Result<()>
98 where
99 A: ToSocketAddrs + Into<String> + Clone,
100 {
101 let addr_str = Into::<String>::into(addr.clone());
102 self.inner.server_connection = Some(EstablishedConnection::new(
103 SocketAddr::from_str(addr_str.as_str()).unwrap(),
104 ));
105
106 Ok(self.socket().connect(addr)?)
107 }
108
109 pub fn send(
111 &mut self,
112 event: &str,
113 buf: &[u8],
114 delivery: PacketDelivery,
115 ) -> anyhow::Result<()> {
116 let server_addr = { self.inner.server_connection.as_ref().unwrap().addr };
117
118 self.send_by_addr(event, buf, delivery, server_addr.to_string())?;
119
120 Ok(())
121 }
122
123 pub fn run_events(&mut self) {
127 let event_emitter = std::mem::take(&mut self.event_emitter);
128 let event_emitter_ref = &event_emitter;
129 while let Some((addr, packet)) = self.oldest_packet_in_queue() {
130 let Some(delivery_type) = Self::get_delivery_type_from_packet(&packet) else {
131 self.socket_events
132 .push(SocketEvent::ReadPacketFail("No delivery type".to_string()));
133 continue;
134 };
135
136 let Ok(delivery_type) =
137 <PacketDelivery as IntoPacketDelivery<u16>>::into_packet_delivery(delivery_type)
138 else {
139 continue;
140 };
141
142 if delivery_type == PacketDelivery::ack_delivery() {
144 let ack_num = AckNumber::new(LittleEndian::read_u32(&packet[2..6]));
145 self.ack_manager.packets_waiting_on_ack.remove(&ack_num);
146
147 continue;
148 }
149
150 if packet.len() < Self::PACKET_PADDING {
152 continue;
153 }
154
155 let Ok(event) = Self::get_event_from_packet(&packet) else {
157 continue;
158 };
159
160 if delivery_type.is_reliable() {
162 if let Err(e) = self.send_ack_packet(addr, &packet) {
163 self.socket_events
164 .push(SocketEvent::SendPacketFail(e.to_string()))
165 }
166 }
167
168 if delivery_type.is_sequenced() {
170 let Some(seq_num) = Self::get_seq_from_packet(&packet) else {
171 self.socket_events.push(SocketEvent::ReadPacketFail(
172 "No sequence number in sequenced packet".to_string(),
173 ));
174 continue;
175 };
176
177 if let Some(last_recv_seq_num) =
178 self.inner.last_recv_seq_num_for_event(&addr, &event)
179 {
180 if seq_num < *last_recv_seq_num {
182 self.socket_events.push(SocketEvent::PacketDiscard(format!(
183 "Discarding {event} packet, last recv: {:?} recv: {:?}",
184 *last_recv_seq_num, seq_num
185 )));
186 continue;
187 }
188
189 *last_recv_seq_num = seq_num;
190 };
191 }
192
193 let bytes = Self::get_packet_bytes(&packet).unwrap_or(Default::default());
194 event_emitter_ref.emit_event(&event, self, (addr, &bytes));
196 }
197
198 event_emitter_ref.emit_polled_events(self);
199
200 self.socket_events.clear();
202 self.retry_ack_packets();
203
204 self.event_emitter = event_emitter;
205 }
206}