nurtex_protocol/connection/
connection.rs1use std::fmt::Debug;
2use std::io::{Cursor, Error, ErrorKind};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicI8, AtomicI32, Ordering};
5
6use nurtex_encrypt::{AesDecryptor, AesEncryptor};
7use nurtex_proxy::{Proxy, ProxyResult};
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
11use tokio::sync::Mutex;
12
13use crate::connection::reader::{deserialize_packet, read_raw_packet};
14use crate::connection::writer::{serialize_packet, write_raw_packet};
15use crate::packets::{
16 configuration::{ClientsideConfigurationPacket, ServersideConfigurationPacket},
17 handshake::{ClientsideHandshakePacket, ServersideHandshakePacket},
18 login::{ClientsideLoginPacket, ServersideLoginPacket},
19 play::{ClientsidePlayPacket, ServersidePlayPacket},
20 status::{ClientsideStatusPacket, ServersideStatusPacket},
21};
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ConnectionState {
26 Handshake,
27 Status,
28 Login,
29 Configuration,
30 Play,
31}
32
33impl From<i8> for ConnectionState {
34 fn from(value: i8) -> Self {
35 match value {
36 -1 => Self::Status,
37 0 => Self::Handshake,
38 1 => Self::Login,
39 2 => Self::Configuration,
40 3 => Self::Play,
41 _ => Self::Handshake,
42 }
43 }
44}
45
46#[derive(Debug, Clone)]
48pub enum ClientsidePacket {
49 Handshake(ClientsideHandshakePacket),
50 Status(ClientsideStatusPacket),
51 Login(ClientsideLoginPacket),
52 Configuration(ClientsideConfigurationPacket),
53 Play(ClientsidePlayPacket),
54}
55
56#[derive(Debug, Clone)]
58pub enum ServersidePacket {
59 Handshake(ServersideHandshakePacket),
60 Status(ServersideStatusPacket),
61 Login(ServersideLoginPacket),
62 Configuration(ServersideConfigurationPacket),
63 Play(ServersidePlayPacket),
64}
65
66impl ServersidePacket {
67 pub fn handshake(packet: ServersideHandshakePacket) -> Self {
69 ServersidePacket::Handshake(packet)
70 }
71
72 pub fn status(packet: ServersideStatusPacket) -> Self {
74 ServersidePacket::Status(packet)
75 }
76
77 pub fn login(packet: ServersideLoginPacket) -> Self {
79 ServersidePacket::Login(packet)
80 }
81
82 pub fn configuration(packet: ServersideConfigurationPacket) -> Self {
84 ServersidePacket::Configuration(packet)
85 }
86
87 pub fn play(packet: ServersidePlayPacket) -> Self {
89 ServersidePacket::Play(packet)
90 }
91}
92
93pub struct ConnectionReader {
95 read_stream: OwnedReadHalf,
97
98 buffer: Cursor<Vec<u8>>,
100
101 decryptor: Arc<Mutex<Option<AesDecryptor>>>,
103
104 state: Arc<AtomicI8>,
106
107 compression_threshold: Arc<AtomicI32>,
109}
110
111pub struct ConnectionWriter {
113 write_stream: OwnedWriteHalf,
115
116 encryptor: Arc<Mutex<Option<AesEncryptor>>>,
118
119 compression_threshold: Arc<AtomicI32>,
121}
122
123pub struct NurtexConnection {
125 reader: Arc<Mutex<ConnectionReader>>,
127
128 writer: Arc<Mutex<ConnectionWriter>>,
130
131 state: Arc<AtomicI8>,
133
134 compression_threshold: Arc<AtomicI32>,
136}
137
138impl ConnectionReader {
139 pub async fn read_packet(&mut self) -> Option<ClientsidePacket> {
141 let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
142 let state = ConnectionState::from(self.state.load(Ordering::SeqCst));
143 let mut decryptor_guard = self.decryptor.lock().await;
144
145 let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
146 let mut cursor = Cursor::new(raw_packet.as_ref());
147
148 match state {
149 ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
150 ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
151 ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
152 ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
153 ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
154 }
155 }
156}
157
158impl ConnectionWriter {
159 pub async fn write_packet(&mut self, packet: ServersidePacket) -> std::io::Result<()> {
161 let serialized = match packet {
162 ServersidePacket::Handshake(p) => serialize_packet(&p),
163 ServersidePacket::Status(p) => serialize_packet(&p),
164 ServersidePacket::Login(p) => serialize_packet(&p),
165 ServersidePacket::Configuration(p) => serialize_packet(&p),
166 ServersidePacket::Play(p) => serialize_packet(&p),
167 }
168 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "failed to serialize packet"))?;
169
170 let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
171 let mut encryptor_guard = self.encryptor.lock().await;
172
173 write_raw_packet(&serialized, &mut self.write_stream, compression_threshold, &mut *encryptor_guard).await
174 }
175 pub async fn shutdown(&mut self) -> std::io::Result<()> {
177 self.write_stream.shutdown().await
178 }
179}
180
181impl NurtexConnection {
182 pub async fn new(server_host: impl Into<String>, server_port: u16) -> std::io::Result<Self> {
184 let stream = TcpStream::connect(format!("{}:{}", server_host.into(), server_port)).await?;
185 stream.set_nodelay(true)?;
186 Self::new_from_stream(stream).await
187 }
188
189 pub async fn new_with_proxy(server_host: impl Into<String>, server_port: u16, proxy: &Proxy) -> std::io::Result<Self> {
191 let stream = match proxy.connect(server_host, server_port).await {
192 ProxyResult::Ok(s) => s,
193 ProxyResult::Err(e) => return Err(Error::new(ErrorKind::NotConnected, e.text())),
194 };
195
196 stream.set_nodelay(true)?;
197
198 Self::new_from_stream(stream).await
199 }
200
201 pub async fn new_from_stream(stream: TcpStream) -> std::io::Result<Self> {
203 let (rh, wh) = stream.into_split();
204
205 let state = Arc::new(AtomicI8::new(0));
207
208 let compression_threshold = Arc::new(AtomicI32::new(-1));
210
211 let reader = ConnectionReader {
212 read_stream: rh,
213 buffer: Cursor::new(Vec::new()),
214 compression_threshold: Arc::clone(&compression_threshold),
215 decryptor: Arc::new(Mutex::new(None)),
216 state: Arc::clone(&state),
217 };
218
219 let writer = ConnectionWriter {
220 write_stream: wh,
221 compression_threshold: Arc::clone(&compression_threshold),
222 encryptor: Arc::new(Mutex::new(None)),
223 };
224
225 Ok(NurtexConnection {
226 reader: Arc::new(Mutex::new(reader)),
227 writer: Arc::new(Mutex::new(writer)),
228 state: state,
229 compression_threshold,
230 })
231 }
232
233 pub fn get_reader(&self) -> Arc<Mutex<ConnectionReader>> {
235 self.reader.clone()
236 }
237
238 pub fn get_writer(&self) -> Arc<Mutex<ConnectionWriter>> {
240 self.writer.clone()
241 }
242
243 pub async fn get_state(&self) -> ConnectionState {
245 ConnectionState::from(self.state.load(Ordering::SeqCst))
246 }
247
248 pub async fn set_state(&self, state: ConnectionState) {
250 let state_id = match state {
251 ConnectionState::Status => -1,
252 ConnectionState::Handshake => 0,
253 ConnectionState::Login => 1,
254 ConnectionState::Configuration => 2,
255 ConnectionState::Play => 3,
256 };
257
258 self.state.store(state_id, Ordering::SeqCst);
259 }
260
261 pub async fn read_packet(&self) -> Option<ClientsidePacket> {
263 let mut reader = self.reader.lock().await;
264 reader.read_packet().await
265 }
266
267 pub async fn read_status_packet(&self) -> Option<ClientsideStatusPacket> {
269 let mut reader = self.reader.lock().await;
270
271 if let Some(ClientsidePacket::Status(packet)) = reader.read_packet().await {
272 Some(packet)
273 } else {
274 None
275 }
276 }
277
278 pub async fn read_login_packet(&self) -> Option<ClientsideLoginPacket> {
280 let mut reader = self.reader.lock().await;
281
282 if let Some(ClientsidePacket::Login(packet)) = reader.read_packet().await {
283 Some(packet)
284 } else {
285 None
286 }
287 }
288
289 pub async fn read_configuration_packet(&self) -> Option<ClientsideConfigurationPacket> {
291 let mut reader = self.reader.lock().await;
292
293 if let Some(ClientsidePacket::Configuration(packet)) = reader.read_packet().await {
294 Some(packet)
295 } else {
296 None
297 }
298 }
299
300 pub async fn read_play_packet(&self) -> Option<ClientsidePlayPacket> {
302 let mut reader = self.reader.lock().await;
303
304 if let Some(ClientsidePacket::Play(packet)) = reader.read_packet().await {
305 Some(packet)
306 } else {
307 None
308 }
309 }
310
311 pub async fn write_packet(&self, packet: ServersidePacket) -> std::io::Result<()> {
313 let mut writer = self.writer.lock().await;
314 writer.write_packet(packet).await
315 }
316
317 pub async fn write_handshake_packet(&self, packet: ServersideHandshakePacket) -> std::io::Result<()> {
319 let mut writer = self.writer.lock().await;
320 writer.write_packet(ServersidePacket::Handshake(packet)).await
321 }
322
323 pub async fn write_status_packet(&self, packet: ServersideStatusPacket) -> std::io::Result<()> {
325 let mut writer = self.writer.lock().await;
326 writer.write_packet(ServersidePacket::Status(packet)).await
327 }
328
329 pub async fn write_login_packet(&self, packet: ServersideLoginPacket) -> std::io::Result<()> {
331 let mut writer = self.writer.lock().await;
332 writer.write_packet(ServersidePacket::Login(packet)).await
333 }
334
335 pub async fn write_configuration_packet(&self, packet: ServersideConfigurationPacket) -> std::io::Result<()> {
337 let mut writer = self.writer.lock().await;
338 writer.write_packet(ServersidePacket::Configuration(packet)).await
339 }
340
341 pub async fn write_play_packet(&self, packet: ServersidePlayPacket) -> std::io::Result<()> {
343 let mut writer = self.writer.lock().await;
344 writer.write_packet(ServersidePacket::Play(packet)).await
345 }
346
347 pub async fn shutdown(&self) -> std::io::Result<()> {
349 let mut writer = self.writer.lock().await;
350 writer.shutdown().await
351 }
352
353 pub async fn set_compression_threshold(&self, threshold: i32) {
355 self.compression_threshold.store(threshold, Ordering::SeqCst);
356 }
357
358 pub async fn set_encryption_key(&self, secret_key: [u8; 16]) {
361 let (encryptor, decryptor) = nurtex_encrypt::create_cipher(&secret_key);
362
363 {
364 let reader = self.reader.lock().await;
365 *reader.decryptor.lock().await = Some(decryptor);
366 }
367
368 {
369 let writer = self.writer.lock().await;
370 *writer.encryptor.lock().await = Some(encryptor);
371 }
372 }
373}