Skip to main content

nurtex_protocol/connection/
connection.rs

1use std::fmt::Debug;
2use std::io::{self, Cursor};
3use std::sync::Arc;
4
5use nurtex_encrypt::{AesDecryptor, AesEncryptor};
6use tokio::io::AsyncWriteExt;
7use tokio::net::TcpStream;
8use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
9use tokio::sync::{Mutex, RwLock};
10
11use crate::connection::address::NurtexAddr;
12use crate::connection::reader::{deserialize_packet, read_raw_packet, try_read_raw_packet};
13use crate::connection::writer::{serialize_packet, write_raw_packet};
14use crate::packets::{
15  configuration::{ClientsideConfigurationPacket, ServersideConfigurationPacket},
16  handshake::{ClientsideHandshakePacket, ServersideHandshakePacket},
17  login::{ClientsideLoginPacket, ServersideLoginPacket},
18  play::{ClientsidePlayPacket, ServersidePlayPacket},
19  status::{ClientsideStatusPacket, ServersideStatusPacket},
20};
21
22/// Состояние подключения
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub enum ConnectionState {
25  Handshake,
26  Status,
27  Login,
28  Configuration,
29  Play,
30}
31
32/// Универсальное перечисление `Clientside` пакетов
33#[derive(Debug, Clone)]
34pub enum ClientsidePacket {
35  Handshake(ClientsideHandshakePacket),
36  Status(ClientsideStatusPacket),
37  Login(ClientsideLoginPacket),
38  Configuration(ClientsideConfigurationPacket),
39  Play(ClientsidePlayPacket),
40}
41
42/// Универсальное перечисление `Serverside` пакетов
43#[derive(Debug, Clone)]
44pub enum ServersidePacket {
45  Handshake(ServersideHandshakePacket),
46  Status(ServersideStatusPacket),
47  Login(ServersideLoginPacket),
48  Configuration(ServersideConfigurationPacket),
49  Play(ServersidePlayPacket),
50}
51
52impl ServersidePacket {
53  /// Вспомогательный метод создания `handshake` пакета
54  pub fn handshake(packet: ServersideHandshakePacket) -> Self {
55    ServersidePacket::Handshake(packet)
56  }
57
58  /// Вспомогательный метод создания `status` пакета
59  pub fn status(packet: ServersideStatusPacket) -> Self {
60    ServersidePacket::Status(packet)
61  }
62
63  //// Вспомогательный метод создания `login` пакета
64  pub fn login(packet: ServersideLoginPacket) -> Self {
65    ServersidePacket::Login(packet)
66  }
67
68  /// Вспомогательный метод создания `configuration` пакета
69  pub fn configuration(packet: ServersideConfigurationPacket) -> Self {
70    ServersidePacket::Configuration(packet)
71  }
72
73  /// Вспомогательный метод создания `play` пакета
74  pub fn play(packet: ServersidePlayPacket) -> Self {
75    ServersidePacket::Play(packet)
76  }
77}
78
79/// Структура для чтения пакетов
80pub struct ConnectionReader {
81  read_stream: OwnedReadHalf,
82  buffer: Cursor<Vec<u8>>,
83  compression_threshold: Arc<RwLock<Option<u32>>>,
84  decryptor: Arc<Mutex<Option<AesDecryptor>>>,
85  state: Arc<RwLock<ConnectionState>>,
86}
87
88/// Структура для записи пакетов
89pub struct ConnectionWriter {
90  write_stream: OwnedWriteHalf,
91  compression_threshold: Arc<RwLock<Option<u32>>>,
92  encryptor: Arc<Mutex<Option<AesEncryptor>>>,
93}
94
95/// Основная структура подключения
96pub struct NurtexConnection {
97  reader: Arc<Mutex<ConnectionReader>>,
98  writer: Arc<Mutex<ConnectionWriter>>,
99  state: Arc<RwLock<ConnectionState>>,
100  compression_threshold: Arc<RwLock<Option<u32>>>,
101}
102
103impl ConnectionReader {
104  /// Метод чтения пакета
105  pub async fn read_packet(&mut self) -> Option<ClientsidePacket> {
106    let compression_threshold = *self.compression_threshold.read().await;
107    let mut decryptor_guard = self.decryptor.lock().await;
108
109    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
110
111    let mut cursor = Cursor::new(raw_packet.as_ref());
112    let state = *self.state.read().await;
113
114    match state {
115      ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
116      ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
117      ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
118      ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
119      ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
120    }
121  }
122
123  /// Метод чтения пакета (неблокирующий)
124  pub fn try_read_packet(&mut self) -> Result<Option<ClientsidePacket>, std::io::Error> {
125    let compression_threshold = match self.compression_threshold.try_read() {
126      Ok(threshold) => *threshold,
127      Err(_) => return Ok(None),
128    };
129
130    let mut decryptor_guard = match self.decryptor.try_lock() {
131      Ok(guard) => guard,
132      Err(_) => return Ok(None),
133    };
134
135    let Some(raw_packet) = try_read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard)? else {
136      return Ok(None);
137    };
138
139    let mut cursor = Cursor::new(raw_packet.as_ref());
140    let state = match self.state.try_read() {
141      Ok(state) => *state,
142      Err(_) => return Ok(None),
143    };
144
145    let packet = match state {
146      ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
147      ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
148      ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
149      ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
150      ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
151    };
152
153    Ok(packet)
154  }
155
156  /// Метод проверки активности TCP соединения
157  pub fn is_connection_alive(&self) -> bool {
158    match self.read_stream.peer_addr() {
159      Ok(_) => true,
160      Err(_) => false,
161    }
162  }
163
164  /// Вспомогательный метод чтения `status` пакета
165  pub async fn read_status_packet(&mut self) -> Option<ClientsideStatusPacket> {
166    let compression_threshold = *self.compression_threshold.read().await;
167    let mut decryptor_guard = self.decryptor.lock().await;
168
169    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
170    let mut cursor = Cursor::new(raw_packet.as_ref());
171    deserialize_packet::<ClientsideStatusPacket>(&mut cursor)
172  }
173
174  /// Вспомогательный метод чтения `login` пакета
175  pub async fn read_login_packet(&mut self) -> Option<ClientsideLoginPacket> {
176    let compression_threshold = *self.compression_threshold.read().await;
177    let mut decryptor_guard = self.decryptor.lock().await;
178
179    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
180    let mut cursor = Cursor::new(raw_packet.as_ref());
181    deserialize_packet::<ClientsideLoginPacket>(&mut cursor)
182  }
183
184  /// Вспомогательный метод чтения `configuration` пакета
185  pub async fn read_configuration_packet(&mut self) -> Option<ClientsideConfigurationPacket> {
186    let compression_threshold = *self.compression_threshold.read().await;
187    let mut decryptor_guard = self.decryptor.lock().await;
188
189    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
190    let mut cursor = Cursor::new(raw_packet.as_ref());
191    deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor)
192  }
193
194  /// Вспомогательный метод чтения `play` пакета
195  pub async fn read_play_packet(&mut self) -> Option<ClientsidePlayPacket> {
196    let compression_threshold = *self.compression_threshold.read().await;
197    let mut decryptor_guard = self.decryptor.lock().await;
198
199    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
200    let mut cursor = Cursor::new(raw_packet.as_ref());
201    deserialize_packet::<ClientsidePlayPacket>(&mut cursor)
202  }
203}
204
205impl ConnectionWriter {
206  /// Метод записи пакета
207  pub async fn write_packet(&mut self, packet: ServersidePacket) -> io::Result<()> {
208    let serialized = match packet {
209      ServersidePacket::Handshake(p) => serialize_packet(&p),
210      ServersidePacket::Status(p) => serialize_packet(&p),
211      ServersidePacket::Login(p) => serialize_packet(&p),
212      ServersidePacket::Configuration(p) => serialize_packet(&p),
213      ServersidePacket::Play(p) => serialize_packet(&p),
214    }
215    .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to serialize packet"))?;
216
217    let compression_threshold = *self.compression_threshold.read().await;
218    let mut encryptor_guard = self.encryptor.lock().await;
219
220    write_raw_packet(&serialized, &mut self.write_stream, compression_threshold, &mut *encryptor_guard).await
221  }
222
223  /// Вспомогательный метод записи `handshake` пакета
224  pub async fn write_handshake_packet(&mut self, packet: ServersideHandshakePacket) -> io::Result<()> {
225    self.write_packet(ServersidePacket::Handshake(packet)).await
226  }
227
228  /// Вспомогательный метод записи `status` пакета
229  pub async fn write_status_packet(&mut self, packet: ServersideStatusPacket) -> io::Result<()> {
230    self.write_packet(ServersidePacket::Status(packet)).await
231  }
232
233  /// Вспомогательный метод записи `login` пакета
234  pub async fn write_login_packet(&mut self, packet: ServersideLoginPacket) -> io::Result<()> {
235    self.write_packet(ServersidePacket::Login(packet)).await
236  }
237
238  /// Вспомогательный метод записи `configuration` пакета
239  pub async fn write_configuration_packet(&mut self, packet: ServersideConfigurationPacket) -> io::Result<()> {
240    self.write_packet(ServersidePacket::Configuration(packet)).await
241  }
242
243  /// Вспомогательный метод записи `play` пакета
244  pub async fn write_play_packet(&mut self, packet: ServersidePlayPacket) -> io::Result<()> {
245    self.write_packet(ServersidePacket::Play(packet)).await
246  }
247
248  /// Метод выключения потока записи
249  pub async fn shutdown(&mut self) -> io::Result<()> {
250    self.write_stream.shutdown().await
251  }
252}
253
254impl NurtexConnection {
255  /// Метод создания нового подключения
256  pub async fn new(address: &NurtexAddr) -> io::Result<Self> {
257    let stream = TcpStream::connect(address.unpack()).await?;
258    stream.set_nodelay(true)?;
259    Self::new_from_stream(stream).await
260  }
261
262  /// Метод создания нового подключения из TcpStream
263  pub async fn new_from_stream(stream: TcpStream) -> io::Result<Self> {
264    let (read_stream, write_stream) = stream.into_split();
265
266    let state = Arc::new(RwLock::new(ConnectionState::Handshake));
267    let compression_threshold = Arc::new(RwLock::new(None));
268
269    let reader = ConnectionReader {
270      read_stream,
271      buffer: Cursor::new(Vec::new()),
272      compression_threshold: Arc::clone(&compression_threshold),
273      decryptor: Arc::new(Mutex::new(None)),
274      state: Arc::clone(&state),
275    };
276
277    let writer = ConnectionWriter {
278      write_stream,
279      compression_threshold: Arc::clone(&compression_threshold),
280      encryptor: Arc::new(Mutex::new(None)),
281    };
282
283    Ok(NurtexConnection {
284      reader: Arc::new(Mutex::new(reader)),
285      writer: Arc::new(Mutex::new(writer)),
286      state,
287      compression_threshold,
288    })
289  }
290
291  /// Метод получения `reader`
292  pub fn get_reader(&self) -> Arc<Mutex<ConnectionReader>> {
293    self.reader.clone()
294  }
295
296  /// Метод получения `writer`
297  pub fn get_writer(&self) -> Arc<Mutex<ConnectionWriter>> {
298    self.writer.clone()
299  }
300
301  /// Метод получения текущего состояния подключения
302  pub async fn get_state(&self) -> ConnectionState {
303    *self.state.read().await
304  }
305
306  /// Метод изменения состояния подключения
307  pub async fn set_state(&self, state: ConnectionState) {
308    *self.state.write().await = state;
309  }
310
311  /// Вспомогательный метод чтения пакета
312  pub async fn read_packet(&self) -> Option<ClientsidePacket> {
313    let mut reader = self.reader.lock().await;
314    reader.read_packet().await
315  }
316
317  /// Вспомогательный метод чтения пакета (неблокирующий)
318  pub fn try_read_packet(&self) -> Result<Option<ClientsidePacket>, std::io::Error> {
319    let mut reader = match self.reader.try_lock() {
320      Ok(reader) => reader,
321      Err(_) => return Ok(None),
322    };
323
324    reader.try_read_packet()
325  }
326
327  /// Метод проверки активности TCP соединения
328  pub fn is_connection_alive(&self) -> bool {
329    let reader = match self.reader.try_lock() {
330      Ok(reader) => reader,
331      Err(_) => return true,
332    };
333
334    reader.is_connection_alive()
335  }
336
337  /// Вспомогательный метод чтения `status` пакета
338  pub async fn read_status_packet(&self) -> Option<ClientsideStatusPacket> {
339    let mut reader = self.reader.lock().await;
340    reader.read_status_packet().await
341  }
342
343  /// Вспомогательный метод чтения `login` пакета
344  pub async fn read_login_packet(&self) -> Option<ClientsideLoginPacket> {
345    let mut reader = self.reader.lock().await;
346    reader.read_login_packet().await
347  }
348
349  /// Вспомогательный метод чтения `configuration` пакета
350  pub async fn read_configuration_packet(&self) -> Option<ClientsideConfigurationPacket> {
351    let mut reader = self.reader.lock().await;
352    reader.read_configuration_packet().await
353  }
354
355  /// Вспомогательный метод чтения `play` пакета
356  pub async fn read_play_packet(&self) -> Option<ClientsidePlayPacket> {
357    let mut reader = self.reader.lock().await;
358    reader.read_play_packet().await
359  }
360
361  /// Вспомогательный метод записи пакета
362  pub async fn write_packet(&self, packet: ServersidePacket) -> io::Result<()> {
363    let mut writer = self.writer.lock().await;
364    writer.write_packet(packet).await
365  }
366
367  /// Вспомогательный метод записи `handshake` пакета
368  pub async fn write_handshake_packet(&self, packet: ServersideHandshakePacket) -> io::Result<()> {
369    let mut writer = self.writer.lock().await;
370    writer.write_handshake_packet(packet).await
371  }
372
373  /// Вспомогательный метод записи `status` пакета
374  pub async fn write_status_packet(&self, packet: ServersideStatusPacket) -> io::Result<()> {
375    let mut writer = self.writer.lock().await;
376    writer.write_status_packet(packet).await
377  }
378
379  /// Вспомогательный метод записи `login` пакета
380  pub async fn write_login_packet(&self, packet: ServersideLoginPacket) -> io::Result<()> {
381    let mut writer = self.writer.lock().await;
382    writer.write_login_packet(packet).await
383  }
384
385  /// Вспомогательный метод записи `configuration` пакета
386  pub async fn write_configuration_packet(&self, packet: ServersideConfigurationPacket) -> io::Result<()> {
387    let mut writer = self.writer.lock().await;
388    writer.write_configuration_packet(packet).await
389  }
390
391  /// Вспомогательный метод записи `play` пакета
392  pub async fn write_play_packet(&self, packet: ServersidePlayPacket) -> io::Result<()> {
393    let mut writer = self.writer.lock().await;
394    writer.write_play_packet(packet).await
395  }
396
397  /// Метод выключения соединения
398  pub async fn shutdown(&self) -> io::Result<()> {
399    let mut writer = self.writer.lock().await;
400    writer.shutdown().await
401  }
402
403  /// Метод установки порога сжатия
404  pub async fn set_compression_threshold(&self, threshold: i32) {
405    let new_threshold = if threshold >= 0 { Some(threshold as u32) } else { None };
406
407    *self.compression_threshold.write().await = new_threshold;
408  }
409
410  /// Устанавливает шифрование на соединении используя секретный ключ.
411  /// Этот метод должен быть вызван **после** отправки `EncryptionResponse` серверу
412  pub async fn set_encryption_key(&self, secret_key: [u8; 16]) {
413    let (encryptor, decryptor) = nurtex_encrypt::create_cipher(&secret_key);
414
415    {
416      let reader = self.reader.lock().await;
417      *reader.decryptor.lock().await = Some(decryptor);
418    }
419
420    {
421      let writer = self.writer.lock().await;
422      *writer.encryptor.lock().await = Some(encryptor);
423    }
424  }
425}