nurtex_protocol/connection/
connection.rs1use std::fmt::Debug;
2use std::io::{self, Cursor};
3use std::sync::Arc;
4
5use nurtex_encrypt::{AesDecryptor, AesEncryptor};
6use tokio::io::AsyncWriteExt;
7use tokio::net::TcpStream;
8use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
9use tokio::sync::{Mutex, RwLock};
10
11use crate::connection::address::NurtexAddr;
12use crate::connection::reader::{deserialize_packet, read_raw_packet, try_read_raw_packet};
13use crate::connection::writer::{serialize_packet, write_raw_packet};
14use crate::packets::{
15 configuration::{ClientsideConfigurationPacket, ServersideConfigurationPacket},
16 handshake::{ClientsideHandshakePacket, ServersideHandshakePacket},
17 login::{ClientsideLoginPacket, ServersideLoginPacket},
18 play::{ClientsidePlayPacket, ServersidePlayPacket},
19 status::{ClientsideStatusPacket, ServersideStatusPacket},
20};
21
/// Protocol phase a connection is currently in.
///
/// [`ConnectionReader::read_packet`] consults this value to choose which
/// clientside packet family an incoming raw packet is decoded as.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// State assigned to a connection created by `NurtexConnection::new_from_stream`.
    Handshake,
    /// Packets are decoded as `ClientsideStatusPacket`.
    Status,
    /// Packets are decoded as `ClientsideLoginPacket`.
    Login,
    /// Packets are decoded as `ClientsideConfigurationPacket`.
    Configuration,
    /// Packets are decoded as `ClientsidePlayPacket`.
    Play,
}
31
32#[derive(Debug, Clone)]
34pub enum ClientsidePacket {
35 Handshake(ClientsideHandshakePacket),
36 Status(ClientsideStatusPacket),
37 Login(ClientsideLoginPacket),
38 Configuration(ClientsideConfigurationPacket),
39 Play(ClientsidePlayPacket),
40}
41
42#[derive(Debug, Clone)]
44pub enum ServersidePacket {
45 Handshake(ServersideHandshakePacket),
46 Status(ServersideStatusPacket),
47 Login(ServersideLoginPacket),
48 Configuration(ServersideConfigurationPacket),
49 Play(ServersidePlayPacket),
50}
51
52impl ServersidePacket {
53 pub fn handshake(packet: ServersideHandshakePacket) -> Self {
55 ServersidePacket::Handshake(packet)
56 }
57
58 pub fn status(packet: ServersideStatusPacket) -> Self {
60 ServersidePacket::Status(packet)
61 }
62
63 pub fn login(packet: ServersideLoginPacket) -> Self {
65 ServersidePacket::Login(packet)
66 }
67
68 pub fn configuration(packet: ServersideConfigurationPacket) -> Self {
70 ServersidePacket::Configuration(packet)
71 }
72
73 pub fn play(packet: ServersidePlayPacket) -> Self {
75 ServersidePacket::Play(packet)
76 }
77}
78
79pub struct ConnectionReader {
81 read_stream: OwnedReadHalf,
82 buffer: Cursor<Vec<u8>>,
83 compression_threshold: Arc<RwLock<Option<u32>>>,
84 decryptor: Arc<Mutex<Option<AesDecryptor>>>,
85 state: Arc<RwLock<ConnectionState>>,
86}
87
88pub struct ConnectionWriter {
90 write_stream: OwnedWriteHalf,
91 compression_threshold: Arc<RwLock<Option<u32>>>,
92 encryptor: Arc<Mutex<Option<AesEncryptor>>>,
93}
94
95pub struct NurtexConnection {
97 reader: Arc<Mutex<ConnectionReader>>,
98 writer: Arc<Mutex<ConnectionWriter>>,
99 state: Arc<RwLock<ConnectionState>>,
100 compression_threshold: Arc<RwLock<Option<u32>>>,
101}
102
103impl ConnectionReader {
104 pub async fn read_packet(&mut self) -> Option<ClientsidePacket> {
106 let compression_threshold = *self.compression_threshold.read().await;
107 let mut decryptor_guard = self.decryptor.lock().await;
108
109 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
110
111 let mut cursor = Cursor::new(raw_packet.as_ref());
112 let state = *self.state.read().await;
113
114 match state {
115 ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
116 ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
117 ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
118 ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
119 ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
120 }
121 }
122
123 pub fn try_read_packet(&mut self) -> Result<Option<ClientsidePacket>, std::io::Error> {
125 let compression_threshold = match self.compression_threshold.try_read() {
126 Ok(threshold) => *threshold,
127 Err(_) => return Ok(None),
128 };
129
130 let mut decryptor_guard = match self.decryptor.try_lock() {
131 Ok(guard) => guard,
132 Err(_) => return Ok(None),
133 };
134
135 let Some(raw_packet) = try_read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard)? else {
136 return Ok(None);
137 };
138
139 let mut cursor = Cursor::new(raw_packet.as_ref());
140 let state = match self.state.try_read() {
141 Ok(state) => *state,
142 Err(_) => return Ok(None),
143 };
144
145 let packet = match state {
146 ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
147 ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
148 ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
149 ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
150 ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
151 };
152
153 Ok(packet)
154 }
155
156 pub fn is_connection_alive(&self) -> bool {
158 match self.read_stream.peer_addr() {
159 Ok(_) => true,
160 Err(_) => false,
161 }
162 }
163
164 pub async fn read_status_packet(&mut self) -> Option<ClientsideStatusPacket> {
166 let compression_threshold = *self.compression_threshold.read().await;
167 let mut decryptor_guard = self.decryptor.lock().await;
168
169 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
170 let mut cursor = Cursor::new(raw_packet.as_ref());
171 deserialize_packet::<ClientsideStatusPacket>(&mut cursor)
172 }
173
174 pub async fn read_login_packet(&mut self) -> Option<ClientsideLoginPacket> {
176 let compression_threshold = *self.compression_threshold.read().await;
177 let mut decryptor_guard = self.decryptor.lock().await;
178
179 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
180 let mut cursor = Cursor::new(raw_packet.as_ref());
181 deserialize_packet::<ClientsideLoginPacket>(&mut cursor)
182 }
183
184 pub async fn read_configuration_packet(&mut self) -> Option<ClientsideConfigurationPacket> {
186 let compression_threshold = *self.compression_threshold.read().await;
187 let mut decryptor_guard = self.decryptor.lock().await;
188
189 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
190 let mut cursor = Cursor::new(raw_packet.as_ref());
191 deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor)
192 }
193
194 pub async fn read_play_packet(&mut self) -> Option<ClientsidePlayPacket> {
196 let compression_threshold = *self.compression_threshold.read().await;
197 let mut decryptor_guard = self.decryptor.lock().await;
198
199 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
200 let mut cursor = Cursor::new(raw_packet.as_ref());
201 deserialize_packet::<ClientsidePlayPacket>(&mut cursor)
202 }
203}
204
205impl ConnectionWriter {
206 pub async fn write_packet(&mut self, packet: ServersidePacket) -> io::Result<()> {
208 let serialized = match packet {
209 ServersidePacket::Handshake(p) => serialize_packet(&p),
210 ServersidePacket::Status(p) => serialize_packet(&p),
211 ServersidePacket::Login(p) => serialize_packet(&p),
212 ServersidePacket::Configuration(p) => serialize_packet(&p),
213 ServersidePacket::Play(p) => serialize_packet(&p),
214 }
215 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to serialize packet"))?;
216
217 let compression_threshold = *self.compression_threshold.read().await;
218 let mut encryptor_guard = self.encryptor.lock().await;
219
220 write_raw_packet(&serialized, &mut self.write_stream, compression_threshold, &mut *encryptor_guard).await
221 }
222
223 pub async fn write_handshake_packet(&mut self, packet: ServersideHandshakePacket) -> io::Result<()> {
225 self.write_packet(ServersidePacket::Handshake(packet)).await
226 }
227
228 pub async fn write_status_packet(&mut self, packet: ServersideStatusPacket) -> io::Result<()> {
230 self.write_packet(ServersidePacket::Status(packet)).await
231 }
232
233 pub async fn write_login_packet(&mut self, packet: ServersideLoginPacket) -> io::Result<()> {
235 self.write_packet(ServersidePacket::Login(packet)).await
236 }
237
238 pub async fn write_configuration_packet(&mut self, packet: ServersideConfigurationPacket) -> io::Result<()> {
240 self.write_packet(ServersidePacket::Configuration(packet)).await
241 }
242
243 pub async fn write_play_packet(&mut self, packet: ServersidePlayPacket) -> io::Result<()> {
245 self.write_packet(ServersidePacket::Play(packet)).await
246 }
247
248 pub async fn shutdown(&mut self) -> io::Result<()> {
250 self.write_stream.shutdown().await
251 }
252}
253
254impl NurtexConnection {
255 pub async fn new(address: &NurtexAddr) -> io::Result<Self> {
257 let stream = TcpStream::connect(address.unpack()).await?;
258 stream.set_nodelay(true)?;
259 Self::new_from_stream(stream).await
260 }
261
262 pub async fn new_from_stream(stream: TcpStream) -> io::Result<Self> {
264 let (read_stream, write_stream) = stream.into_split();
265
266 let state = Arc::new(RwLock::new(ConnectionState::Handshake));
267 let compression_threshold = Arc::new(RwLock::new(None));
268
269 let reader = ConnectionReader {
270 read_stream,
271 buffer: Cursor::new(Vec::new()),
272 compression_threshold: Arc::clone(&compression_threshold),
273 decryptor: Arc::new(Mutex::new(None)),
274 state: Arc::clone(&state),
275 };
276
277 let writer = ConnectionWriter {
278 write_stream,
279 compression_threshold: Arc::clone(&compression_threshold),
280 encryptor: Arc::new(Mutex::new(None)),
281 };
282
283 Ok(NurtexConnection {
284 reader: Arc::new(Mutex::new(reader)),
285 writer: Arc::new(Mutex::new(writer)),
286 state,
287 compression_threshold,
288 })
289 }
290
291 pub fn get_reader(&self) -> Arc<Mutex<ConnectionReader>> {
293 self.reader.clone()
294 }
295
296 pub fn get_writer(&self) -> Arc<Mutex<ConnectionWriter>> {
298 self.writer.clone()
299 }
300
301 pub async fn get_state(&self) -> ConnectionState {
303 *self.state.read().await
304 }
305
306 pub async fn set_state(&self, state: ConnectionState) {
308 *self.state.write().await = state;
309 }
310
311 pub async fn read_packet(&self) -> Option<ClientsidePacket> {
313 let mut reader = self.reader.lock().await;
314 reader.read_packet().await
315 }
316
317 pub fn try_read_packet(&self) -> Result<Option<ClientsidePacket>, std::io::Error> {
319 let mut reader = match self.reader.try_lock() {
320 Ok(reader) => reader,
321 Err(_) => return Ok(None),
322 };
323
324 reader.try_read_packet()
325 }
326
327 pub fn is_connection_alive(&self) -> bool {
329 let reader = match self.reader.try_lock() {
330 Ok(reader) => reader,
331 Err(_) => return true,
332 };
333
334 reader.is_connection_alive()
335 }
336
337 pub async fn read_status_packet(&self) -> Option<ClientsideStatusPacket> {
339 let mut reader = self.reader.lock().await;
340 reader.read_status_packet().await
341 }
342
343 pub async fn read_login_packet(&self) -> Option<ClientsideLoginPacket> {
345 let mut reader = self.reader.lock().await;
346 reader.read_login_packet().await
347 }
348
349 pub async fn read_configuration_packet(&self) -> Option<ClientsideConfigurationPacket> {
351 let mut reader = self.reader.lock().await;
352 reader.read_configuration_packet().await
353 }
354
355 pub async fn read_play_packet(&self) -> Option<ClientsidePlayPacket> {
357 let mut reader = self.reader.lock().await;
358 reader.read_play_packet().await
359 }
360
361 pub async fn write_packet(&self, packet: ServersidePacket) -> io::Result<()> {
363 let mut writer = self.writer.lock().await;
364 writer.write_packet(packet).await
365 }
366
367 pub async fn write_handshake_packet(&self, packet: ServersideHandshakePacket) -> io::Result<()> {
369 let mut writer = self.writer.lock().await;
370 writer.write_handshake_packet(packet).await
371 }
372
373 pub async fn write_status_packet(&self, packet: ServersideStatusPacket) -> io::Result<()> {
375 let mut writer = self.writer.lock().await;
376 writer.write_status_packet(packet).await
377 }
378
379 pub async fn write_login_packet(&self, packet: ServersideLoginPacket) -> io::Result<()> {
381 let mut writer = self.writer.lock().await;
382 writer.write_login_packet(packet).await
383 }
384
385 pub async fn write_configuration_packet(&self, packet: ServersideConfigurationPacket) -> io::Result<()> {
387 let mut writer = self.writer.lock().await;
388 writer.write_configuration_packet(packet).await
389 }
390
391 pub async fn write_play_packet(&self, packet: ServersidePlayPacket) -> io::Result<()> {
393 let mut writer = self.writer.lock().await;
394 writer.write_play_packet(packet).await
395 }
396
397 pub async fn shutdown(&self) -> io::Result<()> {
399 let mut writer = self.writer.lock().await;
400 writer.shutdown().await
401 }
402
403 pub async fn set_compression_threshold(&self, threshold: i32) {
405 let new_threshold = if threshold >= 0 { Some(threshold as u32) } else { None };
406
407 *self.compression_threshold.write().await = new_threshold;
408 }
409
410 pub async fn set_encryption_key(&self, secret_key: [u8; 16]) {
413 let (encryptor, decryptor) = nurtex_encrypt::create_cipher(&secret_key);
414
415 {
416 let reader = self.reader.lock().await;
417 *reader.decryptor.lock().await = Some(decryptor);
418 }
419
420 {
421 let writer = self.writer.lock().await;
422 *writer.encryptor.lock().await = Some(encryptor);
423 }
424 }
425}