nurtex_protocol/connection/
connection.rs1use std::fmt::Debug;
2use std::io::{Cursor, Error, ErrorKind};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicI8, AtomicI32, Ordering};
5
6use nurtex_encrypt::{AesDecryptor, AesEncryptor};
7use nurtex_proxy::Proxy;
8use nurtex_proxy::result::ProxyResult;
9use tokio::io::AsyncWriteExt;
10use tokio::net::TcpStream;
11use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
12use tokio::sync::Mutex;
13
14use crate::connection::reader::{deserialize_packet, read_raw_packet};
15use crate::connection::writer::{serialize_packet, write_raw_packet};
16use crate::packets::{
17 configuration::{ClientsideConfigurationPacket, ServersideConfigurationPacket},
18 handshake::{ClientsideHandshakePacket, ServersideHandshakePacket},
19 login::{ClientsideLoginPacket, ServersideLoginPacket},
20 play::{ClientsidePlayPacket, ServersidePlayPacket},
21 status::{ClientsideStatusPacket, ServersideStatusPacket},
22};
23
/// Protocol phase a connection is currently in.
///
/// [`ConnectionReader::read_packet`] uses this value to pick the packet family
/// an incoming raw packet is decoded as.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// Phase a freshly created connection starts in (numeric id `0`).
    Handshake,
    /// Status phase (numeric id `-1`).
    Status,
    /// Login phase (numeric id `1`).
    Login,
    /// Configuration phase (numeric id `2`).
    Configuration,
    /// Play phase (numeric id `3`).
    Play,
}
33
34impl From<i8> for ConnectionState {
35 fn from(value: i8) -> Self {
36 match value {
37 -1 => Self::Status,
38 0 => Self::Handshake,
39 1 => Self::Login,
40 2 => Self::Configuration,
41 3 => Self::Play,
42 _ => Self::Handshake,
43 }
44 }
45}
46
47#[derive(Debug, Clone)]
49pub enum ClientsidePacket {
50 Handshake(ClientsideHandshakePacket),
51 Status(ClientsideStatusPacket),
52 Login(ClientsideLoginPacket),
53 Configuration(ClientsideConfigurationPacket),
54 Play(ClientsidePlayPacket),
55}
56
57#[derive(Debug, Clone)]
59pub enum ServersidePacket {
60 Handshake(ServersideHandshakePacket),
61 Status(ServersideStatusPacket),
62 Login(ServersideLoginPacket),
63 Configuration(ServersideConfigurationPacket),
64 Play(ServersidePlayPacket),
65}
66
67impl ServersidePacket {
68 pub fn handshake(packet: ServersideHandshakePacket) -> Self {
70 ServersidePacket::Handshake(packet)
71 }
72
73 pub fn status(packet: ServersideStatusPacket) -> Self {
75 ServersidePacket::Status(packet)
76 }
77
78 pub fn login(packet: ServersideLoginPacket) -> Self {
80 ServersidePacket::Login(packet)
81 }
82
83 pub fn configuration(packet: ServersideConfigurationPacket) -> Self {
85 ServersidePacket::Configuration(packet)
86 }
87
88 pub fn play(packet: ServersidePlayPacket) -> Self {
90 ServersidePacket::Play(packet)
91 }
92}
93
94pub struct ConnectionReader {
96 read_stream: OwnedReadHalf,
98
99 buffer: Cursor<Vec<u8>>,
101
102 decryptor: Arc<Mutex<Option<AesDecryptor>>>,
104
105 state: Arc<AtomicI8>,
107
108 compression_threshold: Arc<AtomicI32>,
110}
111
112pub struct ConnectionWriter {
114 write_stream: OwnedWriteHalf,
116
117 encryptor: Arc<Mutex<Option<AesEncryptor>>>,
119
120 compression_threshold: Arc<AtomicI32>,
122}
123
124pub struct NurtexConnection {
126 reader: Arc<Mutex<ConnectionReader>>,
128
129 writer: Arc<Mutex<ConnectionWriter>>,
131
132 state: Arc<AtomicI8>,
134
135 compression_threshold: Arc<AtomicI32>,
137}
138
139impl ConnectionReader {
140 pub async fn read_packet(&mut self) -> Option<ClientsidePacket> {
142 let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
143 let state = ConnectionState::from(self.state.load(Ordering::SeqCst));
144 let mut decryptor_guard = self.decryptor.lock().await;
145
146 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
147 let mut cursor = Cursor::new(raw_packet.as_ref());
148
149 match state {
150 ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
151 ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
152 ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
153 ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
154 ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
155 }
156 }
157}
158
159impl ConnectionWriter {
160 pub async fn write_packet(&mut self, packet: ServersidePacket) -> std::io::Result<()> {
162 let serialized = match packet {
163 ServersidePacket::Handshake(p) => serialize_packet(&p),
164 ServersidePacket::Status(p) => serialize_packet(&p),
165 ServersidePacket::Login(p) => serialize_packet(&p),
166 ServersidePacket::Configuration(p) => serialize_packet(&p),
167 ServersidePacket::Play(p) => serialize_packet(&p),
168 }
169 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to serialize packet"))?;
170
171 let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
172 let mut encryptor_guard = self.encryptor.lock().await;
173
174 write_raw_packet(&serialized, &mut self.write_stream, compression_threshold, &mut *encryptor_guard).await
175 }
176 pub async fn shutdown(&mut self) -> std::io::Result<()> {
178 self.write_stream.shutdown().await
179 }
180}
181
182impl NurtexConnection {
183 pub async fn new(server_host: impl Into<String>, server_port: u16) -> std::io::Result<Self> {
185 let stream = TcpStream::connect(format!("{}:{}", server_host.into(), server_port)).await?;
186 stream.set_nodelay(true)?;
187 Self::new_from_stream(stream).await
188 }
189
190 pub async fn new_with_proxy(server_host: impl Into<String>, server_port: u16, proxy: &Proxy) -> std::io::Result<Self> {
192 let stream = match proxy.connect(server_host, server_port).await {
193 ProxyResult::Ok(s) => s,
194 ProxyResult::Err(e) => return Err(Error::new(ErrorKind::NotConnected, e.text())),
195 };
196
197 stream.set_nodelay(true)?;
198
199 Self::new_from_stream(stream).await
200 }
201
202 pub async fn new_from_stream(stream: TcpStream) -> std::io::Result<Self> {
204 let (rh, wh) = stream.into_split();
205
206 let state = Arc::new(AtomicI8::new(0));
208
209 let compression_threshold = Arc::new(AtomicI32::new(-1));
211
212 let reader = ConnectionReader {
213 read_stream: rh,
214 buffer: Cursor::new(Vec::new()),
215 compression_threshold: Arc::clone(&compression_threshold),
216 decryptor: Arc::new(Mutex::new(None)),
217 state: Arc::clone(&state),
218 };
219
220 let writer = ConnectionWriter {
221 write_stream: wh,
222 compression_threshold: Arc::clone(&compression_threshold),
223 encryptor: Arc::new(Mutex::new(None)),
224 };
225
226 Ok(NurtexConnection {
227 reader: Arc::new(Mutex::new(reader)),
228 writer: Arc::new(Mutex::new(writer)),
229 state: state,
230 compression_threshold,
231 })
232 }
233
234 pub fn get_reader(&self) -> Arc<Mutex<ConnectionReader>> {
236 self.reader.clone()
237 }
238
239 pub fn get_writer(&self) -> Arc<Mutex<ConnectionWriter>> {
241 self.writer.clone()
242 }
243
244 pub async fn get_state(&self) -> ConnectionState {
246 ConnectionState::from(self.state.load(Ordering::SeqCst))
247 }
248
249 pub async fn set_state(&self, state: ConnectionState) {
251 let state_id = match state {
252 ConnectionState::Status => -1,
253 ConnectionState::Handshake => 0,
254 ConnectionState::Login => 1,
255 ConnectionState::Configuration => 2,
256 ConnectionState::Play => 3,
257 };
258
259 self.state.store(state_id, Ordering::SeqCst);
260 }
261
262 pub async fn read_packet(&self) -> Option<ClientsidePacket> {
264 let mut reader = self.reader.lock().await;
265 reader.read_packet().await
266 }
267
268 pub async fn read_status_packet(&self) -> Option<ClientsideStatusPacket> {
270 let mut reader = self.reader.lock().await;
271
272 if let Some(ClientsidePacket::Status(packet)) = reader.read_packet().await {
273 Some(packet)
274 } else {
275 None
276 }
277 }
278
279 pub async fn read_login_packet(&self) -> Option<ClientsideLoginPacket> {
281 let mut reader = self.reader.lock().await;
282
283 if let Some(ClientsidePacket::Login(packet)) = reader.read_packet().await {
284 Some(packet)
285 } else {
286 None
287 }
288 }
289
290 pub async fn read_configuration_packet(&self) -> Option<ClientsideConfigurationPacket> {
292 let mut reader = self.reader.lock().await;
293
294 if let Some(ClientsidePacket::Configuration(packet)) = reader.read_packet().await {
295 Some(packet)
296 } else {
297 None
298 }
299 }
300
301 pub async fn read_play_packet(&self) -> Option<ClientsidePlayPacket> {
303 let mut reader = self.reader.lock().await;
304
305 if let Some(ClientsidePacket::Play(packet)) = reader.read_packet().await {
306 Some(packet)
307 } else {
308 None
309 }
310 }
311
312 pub async fn write_packet(&self, packet: ServersidePacket) -> std::io::Result<()> {
314 let mut writer = self.writer.lock().await;
315 writer.write_packet(packet).await
316 }
317
318 pub async fn write_handshake_packet(&self, packet: ServersideHandshakePacket) -> std::io::Result<()> {
320 let mut writer = self.writer.lock().await;
321 writer.write_packet(ServersidePacket::Handshake(packet)).await
322 }
323
324 pub async fn write_status_packet(&self, packet: ServersideStatusPacket) -> std::io::Result<()> {
326 let mut writer = self.writer.lock().await;
327 writer.write_packet(ServersidePacket::Status(packet)).await
328 }
329
330 pub async fn write_login_packet(&self, packet: ServersideLoginPacket) -> std::io::Result<()> {
332 let mut writer = self.writer.lock().await;
333 writer.write_packet(ServersidePacket::Login(packet)).await
334 }
335
336 pub async fn write_configuration_packet(&self, packet: ServersideConfigurationPacket) -> std::io::Result<()> {
338 let mut writer = self.writer.lock().await;
339 writer.write_packet(ServersidePacket::Configuration(packet)).await
340 }
341
342 pub async fn write_play_packet(&self, packet: ServersidePlayPacket) -> std::io::Result<()> {
344 let mut writer = self.writer.lock().await;
345 writer.write_packet(ServersidePacket::Play(packet)).await
346 }
347
348 pub async fn shutdown(&self) -> std::io::Result<()> {
350 let mut writer = self.writer.lock().await;
351 writer.shutdown().await
352 }
353
354 pub async fn set_compression_threshold(&self, threshold: i32) {
356 self.compression_threshold.store(threshold, Ordering::SeqCst);
357 }
358
359 pub async fn set_encryption_key(&self, secret_key: [u8; 16]) {
362 let (encryptor, decryptor) = nurtex_encrypt::create_cipher(&secret_key);
363
364 {
365 let reader = self.reader.lock().await;
366 *reader.decryptor.lock().await = Some(decryptor);
367 }
368
369 {
370 let writer = self.writer.lock().await;
371 *writer.encryptor.lock().await = Some(encryptor);
372 }
373 }
374}