Skip to main content

nurtex_protocol/connection/
connection.rs

1use std::fmt::Debug;
2use std::io::{Cursor, Error, ErrorKind};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicI8, AtomicI32, Ordering};
5
6use nurtex_encrypt::{AesDecryptor, AesEncryptor};
7use nurtex_proxy::Proxy;
8use nurtex_proxy::result::ProxyResult;
9use tokio::io::AsyncWriteExt;
10use tokio::net::TcpStream;
11use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
12use tokio::sync::Mutex;
13
14use crate::connection::reader::{deserialize_packet, read_raw_packet};
15use crate::connection::writer::{serialize_packet, write_raw_packet};
16use crate::packets::{
17  configuration::{ClientsideConfigurationPacket, ServersideConfigurationPacket},
18  handshake::{ClientsideHandshakePacket, ServersideHandshakePacket},
19  login::{ClientsideLoginPacket, ServersideLoginPacket},
20  play::{ClientsidePlayPacket, ServersidePlayPacket},
21  status::{ClientsideStatusPacket, ServersideStatusPacket},
22};
23
24/// Состояние подключения
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum ConnectionState {
27  Handshake,
28  Status,
29  Login,
30  Configuration,
31  Play,
32}
33
34impl From<i8> for ConnectionState {
35  fn from(value: i8) -> Self {
36    match value {
37      -1 => Self::Status,
38      0 => Self::Handshake,
39      1 => Self::Login,
40      2 => Self::Configuration,
41      3 => Self::Play,
42      _ => Self::Handshake,
43    }
44  }
45}
46
47/// Универсальное перечисление `Clientside` пакетов
48#[derive(Debug, Clone)]
49pub enum ClientsidePacket {
50  Handshake(ClientsideHandshakePacket),
51  Status(ClientsideStatusPacket),
52  Login(ClientsideLoginPacket),
53  Configuration(ClientsideConfigurationPacket),
54  Play(ClientsidePlayPacket),
55}
56
57/// Универсальное перечисление `Serverside` пакетов
58#[derive(Debug, Clone)]
59pub enum ServersidePacket {
60  Handshake(ServersideHandshakePacket),
61  Status(ServersideStatusPacket),
62  Login(ServersideLoginPacket),
63  Configuration(ServersideConfigurationPacket),
64  Play(ServersidePlayPacket),
65}
66
67impl ServersidePacket {
68  /// Вспомогательный метод создания `handshake` пакета
69  pub fn handshake(packet: ServersideHandshakePacket) -> Self {
70    ServersidePacket::Handshake(packet)
71  }
72
73  /// Вспомогательный метод создания `status` пакета
74  pub fn status(packet: ServersideStatusPacket) -> Self {
75    ServersidePacket::Status(packet)
76  }
77
78  //// Вспомогательный метод создания `login` пакета
79  pub fn login(packet: ServersideLoginPacket) -> Self {
80    ServersidePacket::Login(packet)
81  }
82
83  /// Вспомогательный метод создания `configuration` пакета
84  pub fn configuration(packet: ServersideConfigurationPacket) -> Self {
85    ServersidePacket::Configuration(packet)
86  }
87
88  /// Вспомогательный метод создания `play` пакета
89  pub fn play(packet: ServersidePlayPacket) -> Self {
90    ServersidePacket::Play(packet)
91  }
92}
93
94/// Структура для чтения пакетов
95pub struct ConnectionReader {
96  /// Специальная половина `TcpStream` для чтения пакетов
97  read_stream: OwnedReadHalf,
98
99  /// Текущий буффер данных
100  buffer: Cursor<Vec<u8>>,
101
102  /// Декодировщик данных
103  decryptor: Arc<Mutex<Option<AesDecryptor>>>,
104
105  /// Состояние подключения
106  state: Arc<AtomicI8>,
107
108  /// Порог сжатия (от 0 до 1024), изначально -1
109  compression_threshold: Arc<AtomicI32>,
110}
111
112/// Структура для записи пакетов
113pub struct ConnectionWriter {
114  /// Специальная половина `TcpStream` для записи пакетов
115  write_stream: OwnedWriteHalf,
116
117  /// Кодировщик данных
118  encryptor: Arc<Mutex<Option<AesEncryptor>>>,
119
120  /// Порог сжатия (от 0 до 1024), изначально -1
121  compression_threshold: Arc<AtomicI32>,
122}
123
124/// Основная структура подключения
125pub struct NurtexConnection {
126  /// Чтение пакетов
127  reader: Arc<Mutex<ConnectionReader>>,
128
129  /// Запись пакетов
130  writer: Arc<Mutex<ConnectionWriter>>,
131
132  /// Состояние подключения
133  state: Arc<AtomicI8>,
134
135  /// Порог сжатия (от 0 до 1024), изначально -1
136  compression_threshold: Arc<AtomicI32>,
137}
138
139impl ConnectionReader {
140  /// Метод чтения пакета
141  pub async fn read_packet(&mut self) -> Option<ClientsidePacket> {
142    let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
143    let state = ConnectionState::from(self.state.load(Ordering::SeqCst));
144    let mut decryptor_guard = self.decryptor.lock().await;
145
146    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
147    let mut cursor = Cursor::new(raw_packet.as_ref());
148
149    match state {
150      ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
151      ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
152      ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
153      ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
154      ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
155    }
156  }
157}
158
159impl ConnectionWriter {
160  /// Метод записи пакета
161  pub async fn write_packet(&mut self, packet: ServersidePacket) -> std::io::Result<()> {
162    let serialized = match packet {
163      ServersidePacket::Handshake(p) => serialize_packet(&p),
164      ServersidePacket::Status(p) => serialize_packet(&p),
165      ServersidePacket::Login(p) => serialize_packet(&p),
166      ServersidePacket::Configuration(p) => serialize_packet(&p),
167      ServersidePacket::Play(p) => serialize_packet(&p),
168    }
169    .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to serialize packet"))?;
170
171    let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
172    let mut encryptor_guard = self.encryptor.lock().await;
173
174    write_raw_packet(&serialized, &mut self.write_stream, compression_threshold, &mut *encryptor_guard).await
175  }
176  /// Метод выключения потока записи
177  pub async fn shutdown(&mut self) -> std::io::Result<()> {
178    self.write_stream.shutdown().await
179  }
180}
181
182impl NurtexConnection {
183  /// Метод создания нового подключения
184  pub async fn new(server_host: impl Into<String>, server_port: u16) -> std::io::Result<Self> {
185    let stream = TcpStream::connect(format!("{}:{}", server_host.into(), server_port)).await?;
186    stream.set_nodelay(true)?;
187    Self::new_from_stream(stream).await
188  }
189
190  /// Метод создания нового подключения с прокси
191  pub async fn new_with_proxy(server_host: impl Into<String>, server_port: u16, proxy: &Proxy) -> std::io::Result<Self> {
192    let stream = match proxy.connect(server_host, server_port).await {
193      ProxyResult::Ok(s) => s,
194      ProxyResult::Err(e) => return Err(Error::new(ErrorKind::NotConnected, e.text())),
195    };
196
197    stream.set_nodelay(true)?;
198
199    Self::new_from_stream(stream).await
200  }
201
202  /// Метод создания нового подключения из TcpStream
203  pub async fn new_from_stream(stream: TcpStream) -> std::io::Result<Self> {
204    let (rh, wh) = stream.into_split();
205
206    // Изначально хэндшейк
207    let state = Arc::new(AtomicI8::new(0));
208
209    // Изначально отрицательный (не указан)
210    let compression_threshold = Arc::new(AtomicI32::new(-1));
211
212    let reader = ConnectionReader {
213      read_stream: rh,
214      buffer: Cursor::new(Vec::new()),
215      compression_threshold: Arc::clone(&compression_threshold),
216      decryptor: Arc::new(Mutex::new(None)),
217      state: Arc::clone(&state),
218    };
219
220    let writer = ConnectionWriter {
221      write_stream: wh,
222      compression_threshold: Arc::clone(&compression_threshold),
223      encryptor: Arc::new(Mutex::new(None)),
224    };
225
226    Ok(NurtexConnection {
227      reader: Arc::new(Mutex::new(reader)),
228      writer: Arc::new(Mutex::new(writer)),
229      state: state,
230      compression_threshold,
231    })
232  }
233
234  /// Метод получения `reader`
235  pub fn get_reader(&self) -> Arc<Mutex<ConnectionReader>> {
236    self.reader.clone()
237  }
238
239  /// Метод получения `writer`
240  pub fn get_writer(&self) -> Arc<Mutex<ConnectionWriter>> {
241    self.writer.clone()
242  }
243
244  /// Метод получения текущего состояния подключения
245  pub async fn get_state(&self) -> ConnectionState {
246    ConnectionState::from(self.state.load(Ordering::SeqCst))
247  }
248
249  /// Метод изменения состояния подключения
250  pub async fn set_state(&self, state: ConnectionState) {
251    let state_id = match state {
252      ConnectionState::Status => -1,
253      ConnectionState::Handshake => 0,
254      ConnectionState::Login => 1,
255      ConnectionState::Configuration => 2,
256      ConnectionState::Play => 3,
257    };
258
259    self.state.store(state_id, Ordering::SeqCst);
260  }
261
262  /// Вспомогательный метод чтения пакета
263  pub async fn read_packet(&self) -> Option<ClientsidePacket> {
264    let mut reader = self.reader.lock().await;
265    reader.read_packet().await
266  }
267
268  /// Вспомогательный метод чтения `status` пакета
269  pub async fn read_status_packet(&self) -> Option<ClientsideStatusPacket> {
270    let mut reader = self.reader.lock().await;
271
272    if let Some(ClientsidePacket::Status(packet)) = reader.read_packet().await {
273      Some(packet)
274    } else {
275      None
276    }
277  }
278
279  /// Вспомогательный метод чтения `login` пакета
280  pub async fn read_login_packet(&self) -> Option<ClientsideLoginPacket> {
281    let mut reader = self.reader.lock().await;
282
283    if let Some(ClientsidePacket::Login(packet)) = reader.read_packet().await {
284      Some(packet)
285    } else {
286      None
287    }
288  }
289
290  /// Вспомогательный метод чтения `configuration` пакета
291  pub async fn read_configuration_packet(&self) -> Option<ClientsideConfigurationPacket> {
292    let mut reader = self.reader.lock().await;
293
294    if let Some(ClientsidePacket::Configuration(packet)) = reader.read_packet().await {
295      Some(packet)
296    } else {
297      None
298    }
299  }
300
301  /// Вспомогательный метод чтения `play` пакета
302  pub async fn read_play_packet(&self) -> Option<ClientsidePlayPacket> {
303    let mut reader = self.reader.lock().await;
304
305    if let Some(ClientsidePacket::Play(packet)) = reader.read_packet().await {
306      Some(packet)
307    } else {
308      None
309    }
310  }
311
312  /// Вспомогательный метод записи пакета
313  pub async fn write_packet(&self, packet: ServersidePacket) -> std::io::Result<()> {
314    let mut writer = self.writer.lock().await;
315    writer.write_packet(packet).await
316  }
317
318  /// Вспомогательный метод записи `handshake` пакета
319  pub async fn write_handshake_packet(&self, packet: ServersideHandshakePacket) -> std::io::Result<()> {
320    let mut writer = self.writer.lock().await;
321    writer.write_packet(ServersidePacket::Handshake(packet)).await
322  }
323
324  /// Вспомогательный метод записи `status` пакета
325  pub async fn write_status_packet(&self, packet: ServersideStatusPacket) -> std::io::Result<()> {
326    let mut writer = self.writer.lock().await;
327    writer.write_packet(ServersidePacket::Status(packet)).await
328  }
329
330  /// Вспомогательный метод записи `login` пакета
331  pub async fn write_login_packet(&self, packet: ServersideLoginPacket) -> std::io::Result<()> {
332    let mut writer = self.writer.lock().await;
333    writer.write_packet(ServersidePacket::Login(packet)).await
334  }
335
336  /// Вспомогательный метод записи `configuration` пакета
337  pub async fn write_configuration_packet(&self, packet: ServersideConfigurationPacket) -> std::io::Result<()> {
338    let mut writer = self.writer.lock().await;
339    writer.write_packet(ServersidePacket::Configuration(packet)).await
340  }
341
342  /// Вспомогательный метод записи `play` пакета
343  pub async fn write_play_packet(&self, packet: ServersidePlayPacket) -> std::io::Result<()> {
344    let mut writer = self.writer.lock().await;
345    writer.write_packet(ServersidePacket::Play(packet)).await
346  }
347
348  /// Метод выключения соединения
349  pub async fn shutdown(&self) -> std::io::Result<()> {
350    let mut writer = self.writer.lock().await;
351    writer.shutdown().await
352  }
353
354  /// Метод установки порога сжатия
355  pub async fn set_compression_threshold(&self, threshold: i32) {
356    self.compression_threshold.store(threshold, Ordering::SeqCst);
357  }
358
359  /// Устанавливает шифрование на соединении используя секретный ключ.
360  /// Этот метод должен быть вызван **после** отправки `EncryptionResponse` серверу
361  pub async fn set_encryption_key(&self, secret_key: [u8; 16]) {
362    let (encryptor, decryptor) = nurtex_encrypt::create_cipher(&secret_key);
363
364    {
365      let reader = self.reader.lock().await;
366      *reader.decryptor.lock().await = Some(decryptor);
367    }
368
369    {
370      let writer = self.writer.lock().await;
371      *writer.encryptor.lock().await = Some(encryptor);
372    }
373  }
374}