nurtex_protocol/connection/
connection.rs1use std::fmt::Debug;
2use std::io::{self, Cursor, Error, ErrorKind};
3use std::sync::Arc;
4
5use nurtex_encrypt::{AesDecryptor, AesEncryptor};
6use nurtex_proxy::Proxy;
7use nurtex_proxy::result::ProxyResult;
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
11use tokio::sync::{Mutex, RwLock};
12
13use crate::connection::reader::{deserialize_packet, read_raw_packet, try_read_raw_packet};
14use crate::connection::writer::{serialize_packet, write_raw_packet};
15use crate::packets::{
16 configuration::{ClientsideConfigurationPacket, ServersideConfigurationPacket},
17 handshake::{ClientsideHandshakePacket, ServersideHandshakePacket},
18 login::{ClientsideLoginPacket, ServersideLoginPacket},
19 play::{ClientsidePlayPacket, ServersidePlayPacket},
20 status::{ClientsideStatusPacket, ServersideStatusPacket},
21};
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ConnectionState {
26 Handshake,
27 Status,
28 Login,
29 Configuration,
30 Play,
31}
32
33#[derive(Debug, Clone)]
35pub enum ClientsidePacket {
36 Handshake(ClientsideHandshakePacket),
37 Status(ClientsideStatusPacket),
38 Login(ClientsideLoginPacket),
39 Configuration(ClientsideConfigurationPacket),
40 Play(ClientsidePlayPacket),
41}
42
43#[derive(Debug, Clone)]
45pub enum ServersidePacket {
46 Handshake(ServersideHandshakePacket),
47 Status(ServersideStatusPacket),
48 Login(ServersideLoginPacket),
49 Configuration(ServersideConfigurationPacket),
50 Play(ServersidePlayPacket),
51}
52
53impl ServersidePacket {
54 pub fn handshake(packet: ServersideHandshakePacket) -> Self {
56 ServersidePacket::Handshake(packet)
57 }
58
59 pub fn status(packet: ServersideStatusPacket) -> Self {
61 ServersidePacket::Status(packet)
62 }
63
64 pub fn login(packet: ServersideLoginPacket) -> Self {
66 ServersidePacket::Login(packet)
67 }
68
69 pub fn configuration(packet: ServersideConfigurationPacket) -> Self {
71 ServersidePacket::Configuration(packet)
72 }
73
74 pub fn play(packet: ServersidePlayPacket) -> Self {
76 ServersidePacket::Play(packet)
77 }
78}
79
80pub struct ConnectionReader {
82 read_stream: OwnedReadHalf,
83 buffer: Cursor<Vec<u8>>,
84 compression_threshold: Arc<RwLock<Option<u32>>>,
85 decryptor: Arc<Mutex<Option<AesDecryptor>>>,
86 state: Arc<RwLock<ConnectionState>>,
87}
88
89pub struct ConnectionWriter {
91 write_stream: OwnedWriteHalf,
92 compression_threshold: Arc<RwLock<Option<u32>>>,
93 encryptor: Arc<Mutex<Option<AesEncryptor>>>,
94}
95
96pub struct NurtexConnection {
98 reader: Arc<Mutex<ConnectionReader>>,
99 writer: Arc<Mutex<ConnectionWriter>>,
100 state: Arc<RwLock<ConnectionState>>,
101 compression_threshold: Arc<RwLock<Option<u32>>>,
102}
103
104impl ConnectionReader {
105 pub async fn read_packet(&mut self) -> Option<ClientsidePacket> {
107 let compression_threshold = *self.compression_threshold.read().await;
108 let mut decryptor_guard = self.decryptor.lock().await;
109
110 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
111
112 let mut cursor = Cursor::new(raw_packet.as_ref());
113 let state = *self.state.read().await;
114
115 match state {
116 ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
117 ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
118 ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
119 ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
120 ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
121 }
122 }
123
124 pub fn try_read_packet(&mut self) -> Result<Option<ClientsidePacket>, std::io::Error> {
126 let compression_threshold = match self.compression_threshold.try_read() {
127 Ok(threshold) => *threshold,
128 Err(_) => return Ok(None),
129 };
130
131 let mut decryptor_guard = match self.decryptor.try_lock() {
132 Ok(guard) => guard,
133 Err(_) => return Ok(None),
134 };
135
136 let Some(raw_packet) = try_read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard)? else {
137 return Ok(None);
138 };
139
140 let mut cursor = Cursor::new(raw_packet.as_ref());
141 let state = match self.state.try_read() {
142 Ok(state) => *state,
143 Err(_) => return Ok(None),
144 };
145
146 let packet = match state {
147 ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
148 ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
149 ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
150 ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
151 ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
152 };
153
154 Ok(packet)
155 }
156
157 pub async fn read_status_packet(&mut self) -> Option<ClientsideStatusPacket> {
159 let compression_threshold = *self.compression_threshold.read().await;
160 let mut decryptor_guard = self.decryptor.lock().await;
161
162 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
163 let mut cursor = Cursor::new(raw_packet.as_ref());
164 deserialize_packet::<ClientsideStatusPacket>(&mut cursor)
165 }
166
167 pub async fn read_login_packet(&mut self) -> Option<ClientsideLoginPacket> {
169 let compression_threshold = *self.compression_threshold.read().await;
170 let mut decryptor_guard = self.decryptor.lock().await;
171
172 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
173 let mut cursor = Cursor::new(raw_packet.as_ref());
174 deserialize_packet::<ClientsideLoginPacket>(&mut cursor)
175 }
176
177 pub async fn read_configuration_packet(&mut self) -> Option<ClientsideConfigurationPacket> {
179 let compression_threshold = *self.compression_threshold.read().await;
180 let mut decryptor_guard = self.decryptor.lock().await;
181
182 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
183 let mut cursor = Cursor::new(raw_packet.as_ref());
184 deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor)
185 }
186
187 pub async fn read_play_packet(&mut self) -> Option<ClientsidePlayPacket> {
189 let compression_threshold = *self.compression_threshold.read().await;
190 let mut decryptor_guard = self.decryptor.lock().await;
191
192 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
193 let mut cursor = Cursor::new(raw_packet.as_ref());
194 deserialize_packet::<ClientsidePlayPacket>(&mut cursor)
195 }
196}
197
198impl ConnectionWriter {
199 pub async fn write_packet(&mut self, packet: ServersidePacket) -> io::Result<()> {
201 let serialized = match packet {
202 ServersidePacket::Handshake(p) => serialize_packet(&p),
203 ServersidePacket::Status(p) => serialize_packet(&p),
204 ServersidePacket::Login(p) => serialize_packet(&p),
205 ServersidePacket::Configuration(p) => serialize_packet(&p),
206 ServersidePacket::Play(p) => serialize_packet(&p),
207 }
208 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to serialize packet"))?;
209
210 let compression_threshold = *self.compression_threshold.read().await;
211 let mut encryptor_guard = self.encryptor.lock().await;
212
213 write_raw_packet(&serialized, &mut self.write_stream, compression_threshold, &mut *encryptor_guard).await
214 }
215
216 pub async fn write_handshake_packet(&mut self, packet: ServersideHandshakePacket) -> io::Result<()> {
218 self.write_packet(ServersidePacket::Handshake(packet)).await
219 }
220
221 pub async fn write_status_packet(&mut self, packet: ServersideStatusPacket) -> io::Result<()> {
223 self.write_packet(ServersidePacket::Status(packet)).await
224 }
225
226 pub async fn write_login_packet(&mut self, packet: ServersideLoginPacket) -> io::Result<()> {
228 self.write_packet(ServersidePacket::Login(packet)).await
229 }
230
231 pub async fn write_configuration_packet(&mut self, packet: ServersideConfigurationPacket) -> io::Result<()> {
233 self.write_packet(ServersidePacket::Configuration(packet)).await
234 }
235
236 pub async fn write_play_packet(&mut self, packet: ServersidePlayPacket) -> io::Result<()> {
238 self.write_packet(ServersidePacket::Play(packet)).await
239 }
240
241 pub async fn shutdown(&mut self) -> io::Result<()> {
243 self.write_stream.shutdown().await
244 }
245}
246
247impl NurtexConnection {
248 pub async fn new(server_host: impl Into<String>, server_port: u16) -> io::Result<Self> {
250 let stream = TcpStream::connect(format!("{}:{}", server_host.into(), server_port)).await?;
251 stream.set_nodelay(true)?;
252 Self::new_from_stream(stream).await
253 }
254
255 pub async fn new_with_proxy(server_host: impl Into<String>, server_port: u16, proxy: &Proxy) -> io::Result<Self> {
257 proxy.bind(server_host.into(), server_port);
258
259 let stream = match proxy.connect().await {
260 ProxyResult::Ok(s) => s,
261 ProxyResult::Err(e) => return Err(Error::new(ErrorKind::NotConnected, e.text())),
262 };
263
264 stream.set_nodelay(true)?;
265
266 Self::new_from_stream(stream).await
267 }
268
269 pub async fn new_from_stream(stream: TcpStream) -> io::Result<Self> {
271 let (read_stream, write_stream) = stream.into_split();
272
273 let state = Arc::new(RwLock::new(ConnectionState::Handshake));
274 let compression_threshold = Arc::new(RwLock::new(None));
275
276 let reader = ConnectionReader {
277 read_stream,
278 buffer: Cursor::new(Vec::new()),
279 compression_threshold: Arc::clone(&compression_threshold),
280 decryptor: Arc::new(Mutex::new(None)),
281 state: Arc::clone(&state),
282 };
283
284 let writer = ConnectionWriter {
285 write_stream,
286 compression_threshold: Arc::clone(&compression_threshold),
287 encryptor: Arc::new(Mutex::new(None)),
288 };
289
290 Ok(NurtexConnection {
291 reader: Arc::new(Mutex::new(reader)),
292 writer: Arc::new(Mutex::new(writer)),
293 state,
294 compression_threshold,
295 })
296 }
297
298 pub fn get_reader(&self) -> Arc<Mutex<ConnectionReader>> {
300 self.reader.clone()
301 }
302
303 pub fn get_writer(&self) -> Arc<Mutex<ConnectionWriter>> {
305 self.writer.clone()
306 }
307
308 pub async fn get_state(&self) -> ConnectionState {
310 *self.state.read().await
311 }
312
313 pub async fn set_state(&self, state: ConnectionState) {
315 *self.state.write().await = state;
316 }
317
318 pub async fn read_packet(&self) -> Option<ClientsidePacket> {
320 let mut reader = self.reader.lock().await;
321 reader.read_packet().await
322 }
323
324 pub fn try_read_packet(&self) -> Result<Option<ClientsidePacket>, std::io::Error> {
326 let mut reader = match self.reader.try_lock() {
327 Ok(reader) => reader,
328 Err(_) => return Ok(None),
329 };
330
331 reader.try_read_packet()
332 }
333
334 pub async fn read_status_packet(&self) -> Option<ClientsideStatusPacket> {
336 let mut reader = self.reader.lock().await;
337 reader.read_status_packet().await
338 }
339
340 pub async fn read_login_packet(&self) -> Option<ClientsideLoginPacket> {
342 let mut reader = self.reader.lock().await;
343 reader.read_login_packet().await
344 }
345
346 pub async fn read_configuration_packet(&self) -> Option<ClientsideConfigurationPacket> {
348 let mut reader = self.reader.lock().await;
349 reader.read_configuration_packet().await
350 }
351
352 pub async fn read_play_packet(&self) -> Option<ClientsidePlayPacket> {
354 let mut reader = self.reader.lock().await;
355 reader.read_play_packet().await
356 }
357
358 pub async fn write_packet(&self, packet: ServersidePacket) -> io::Result<()> {
360 let mut writer = self.writer.lock().await;
361 writer.write_packet(packet).await
362 }
363
364 pub async fn write_handshake_packet(&self, packet: ServersideHandshakePacket) -> io::Result<()> {
366 let mut writer = self.writer.lock().await;
367 writer.write_handshake_packet(packet).await
368 }
369
370 pub async fn write_status_packet(&self, packet: ServersideStatusPacket) -> io::Result<()> {
372 let mut writer = self.writer.lock().await;
373 writer.write_status_packet(packet).await
374 }
375
376 pub async fn write_login_packet(&self, packet: ServersideLoginPacket) -> io::Result<()> {
378 let mut writer = self.writer.lock().await;
379 writer.write_login_packet(packet).await
380 }
381
382 pub async fn write_configuration_packet(&self, packet: ServersideConfigurationPacket) -> io::Result<()> {
384 let mut writer = self.writer.lock().await;
385 writer.write_configuration_packet(packet).await
386 }
387
388 pub async fn write_play_packet(&self, packet: ServersidePlayPacket) -> io::Result<()> {
390 let mut writer = self.writer.lock().await;
391 writer.write_play_packet(packet).await
392 }
393
394 pub async fn shutdown(&self) -> io::Result<()> {
396 let mut writer = self.writer.lock().await;
397 writer.shutdown().await
398 }
399
400 pub async fn set_compression_threshold(&self, threshold: i32) {
402 let new_threshold = if threshold >= 0 { Some(threshold as u32) } else { None };
403
404 *self.compression_threshold.write().await = new_threshold;
405 }
406
407 pub async fn set_encryption_key(&self, secret_key: [u8; 16]) {
410 let (encryptor, decryptor) = nurtex_encrypt::create_cipher(&secret_key);
411
412 {
413 let reader = self.reader.lock().await;
414 *reader.decryptor.lock().await = Some(decryptor);
415 }
416
417 {
418 let writer = self.writer.lock().await;
419 *writer.encryptor.lock().await = Some(encryptor);
420 }
421 }
422}