Skip to main content

nurtex_protocol/connection/
connection.rs

1use std::fmt::Debug;
2use std::io::{self, Cursor, Error, ErrorKind};
3use std::sync::Arc;
4
5use nurtex_encrypt::{AesDecryptor, AesEncryptor};
6use nurtex_proxy::Proxy;
7use nurtex_proxy::result::ProxyResult;
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
11use tokio::sync::{Mutex, RwLock};
12
13use crate::connection::reader::{deserialize_packet, read_raw_packet, try_read_raw_packet};
14use crate::connection::writer::{serialize_packet, write_raw_packet};
15use crate::packets::{
16  configuration::{ClientsideConfigurationPacket, ServersideConfigurationPacket},
17  handshake::{ClientsideHandshakePacket, ServersideHandshakePacket},
18  login::{ClientsideLoginPacket, ServersideLoginPacket},
19  play::{ClientsidePlayPacket, ServersidePlayPacket},
20  status::{ClientsideStatusPacket, ServersideStatusPacket},
21};
22
23/// Состояние подключения
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ConnectionState {
26  Handshake,
27  Status,
28  Login,
29  Configuration,
30  Play,
31}
32
33/// Универсальное перечисление `Clientside` пакетов
34#[derive(Debug, Clone)]
35pub enum ClientsidePacket {
36  Handshake(ClientsideHandshakePacket),
37  Status(ClientsideStatusPacket),
38  Login(ClientsideLoginPacket),
39  Configuration(ClientsideConfigurationPacket),
40  Play(ClientsidePlayPacket),
41}
42
43/// Универсальное перечисление `Serverside` пакетов
44#[derive(Debug, Clone)]
45pub enum ServersidePacket {
46  Handshake(ServersideHandshakePacket),
47  Status(ServersideStatusPacket),
48  Login(ServersideLoginPacket),
49  Configuration(ServersideConfigurationPacket),
50  Play(ServersidePlayPacket),
51}
52
53impl ServersidePacket {
54  /// Вспомогательный метод создания `handshake` пакета
55  pub fn handshake(packet: ServersideHandshakePacket) -> Self {
56    ServersidePacket::Handshake(packet)
57  }
58
59  /// Вспомогательный метод создания `status` пакета
60  pub fn status(packet: ServersideStatusPacket) -> Self {
61    ServersidePacket::Status(packet)
62  }
63
64  //// Вспомогательный метод создания `login` пакета
65  pub fn login(packet: ServersideLoginPacket) -> Self {
66    ServersidePacket::Login(packet)
67  }
68
69  /// Вспомогательный метод создания `configuration` пакета
70  pub fn configuration(packet: ServersideConfigurationPacket) -> Self {
71    ServersidePacket::Configuration(packet)
72  }
73
74  /// Вспомогательный метод создания `play` пакета
75  pub fn play(packet: ServersidePlayPacket) -> Self {
76    ServersidePacket::Play(packet)
77  }
78}
79
80/// Структура для чтения пакетов
81pub struct ConnectionReader {
82  read_stream: OwnedReadHalf,
83  buffer: Cursor<Vec<u8>>,
84  compression_threshold: Arc<RwLock<Option<u32>>>,
85  decryptor: Arc<Mutex<Option<AesDecryptor>>>,
86  state: Arc<RwLock<ConnectionState>>,
87}
88
89/// Структура для записи пакетов
90pub struct ConnectionWriter {
91  write_stream: OwnedWriteHalf,
92  compression_threshold: Arc<RwLock<Option<u32>>>,
93  encryptor: Arc<Mutex<Option<AesEncryptor>>>,
94}
95
96/// Основная структура подключения
97pub struct NurtexConnection {
98  reader: Arc<Mutex<ConnectionReader>>,
99  writer: Arc<Mutex<ConnectionWriter>>,
100  state: Arc<RwLock<ConnectionState>>,
101  compression_threshold: Arc<RwLock<Option<u32>>>,
102}
103
104impl ConnectionReader {
105  /// Метод чтения пакета
106  pub async fn read_packet(&mut self) -> Option<ClientsidePacket> {
107    let compression_threshold = *self.compression_threshold.read().await;
108    let mut decryptor_guard = self.decryptor.lock().await;
109
110    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
111
112    let mut cursor = Cursor::new(raw_packet.as_ref());
113    let state = *self.state.read().await;
114
115    match state {
116      ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
117      ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
118      ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
119      ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
120      ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
121    }
122  }
123
124  /// Метод чтения пакета (неблокирующий)
125  pub fn try_read_packet(&mut self) -> Result<Option<ClientsidePacket>, std::io::Error> {
126    let compression_threshold = match self.compression_threshold.try_read() {
127      Ok(threshold) => *threshold,
128      Err(_) => return Ok(None),
129    };
130
131    let mut decryptor_guard = match self.decryptor.try_lock() {
132      Ok(guard) => guard,
133      Err(_) => return Ok(None),
134    };
135
136    let Some(raw_packet) = try_read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard)? else {
137      return Ok(None);
138    };
139
140    let mut cursor = Cursor::new(raw_packet.as_ref());
141    let state = match self.state.try_read() {
142      Ok(state) => *state,
143      Err(_) => return Ok(None),
144    };
145
146    let packet = match state {
147      ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
148      ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
149      ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
150      ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
151      ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
152    };
153
154    Ok(packet)
155  }
156
157  /// Вспомогательный метод чтения `status` пакета
158  pub async fn read_status_packet(&mut self) -> Option<ClientsideStatusPacket> {
159    let compression_threshold = *self.compression_threshold.read().await;
160    let mut decryptor_guard = self.decryptor.lock().await;
161
162    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
163    let mut cursor = Cursor::new(raw_packet.as_ref());
164    deserialize_packet::<ClientsideStatusPacket>(&mut cursor)
165  }
166
167  /// Вспомогательный метод чтения `login` пакета
168  pub async fn read_login_packet(&mut self) -> Option<ClientsideLoginPacket> {
169    let compression_threshold = *self.compression_threshold.read().await;
170    let mut decryptor_guard = self.decryptor.lock().await;
171
172    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
173    let mut cursor = Cursor::new(raw_packet.as_ref());
174    deserialize_packet::<ClientsideLoginPacket>(&mut cursor)
175  }
176
177  /// Вспомогательный метод чтения `configuration` пакета
178  pub async fn read_configuration_packet(&mut self) -> Option<ClientsideConfigurationPacket> {
179    let compression_threshold = *self.compression_threshold.read().await;
180    let mut decryptor_guard = self.decryptor.lock().await;
181
182    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
183    let mut cursor = Cursor::new(raw_packet.as_ref());
184    deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor)
185  }
186
187  /// Вспомогательный метод чтения `play` пакета
188  pub async fn read_play_packet(&mut self) -> Option<ClientsidePlayPacket> {
189    let compression_threshold = *self.compression_threshold.read().await;
190    let mut decryptor_guard = self.decryptor.lock().await;
191
192    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
193    let mut cursor = Cursor::new(raw_packet.as_ref());
194    deserialize_packet::<ClientsidePlayPacket>(&mut cursor)
195  }
196}
197
198impl ConnectionWriter {
199  /// Метод записи пакета
200  pub async fn write_packet(&mut self, packet: ServersidePacket) -> io::Result<()> {
201    let serialized = match packet {
202      ServersidePacket::Handshake(p) => serialize_packet(&p),
203      ServersidePacket::Status(p) => serialize_packet(&p),
204      ServersidePacket::Login(p) => serialize_packet(&p),
205      ServersidePacket::Configuration(p) => serialize_packet(&p),
206      ServersidePacket::Play(p) => serialize_packet(&p),
207    }
208    .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to serialize packet"))?;
209
210    let compression_threshold = *self.compression_threshold.read().await;
211    let mut encryptor_guard = self.encryptor.lock().await;
212
213    write_raw_packet(&serialized, &mut self.write_stream, compression_threshold, &mut *encryptor_guard).await
214  }
215
216  /// Вспомогательный метод записи `handshake` пакета
217  pub async fn write_handshake_packet(&mut self, packet: ServersideHandshakePacket) -> io::Result<()> {
218    self.write_packet(ServersidePacket::Handshake(packet)).await
219  }
220
221  /// Вспомогательный метод записи `status` пакета
222  pub async fn write_status_packet(&mut self, packet: ServersideStatusPacket) -> io::Result<()> {
223    self.write_packet(ServersidePacket::Status(packet)).await
224  }
225
226  /// Вспомогательный метод записи `login` пакета
227  pub async fn write_login_packet(&mut self, packet: ServersideLoginPacket) -> io::Result<()> {
228    self.write_packet(ServersidePacket::Login(packet)).await
229  }
230
231  /// Вспомогательный метод записи `configuration` пакета
232  pub async fn write_configuration_packet(&mut self, packet: ServersideConfigurationPacket) -> io::Result<()> {
233    self.write_packet(ServersidePacket::Configuration(packet)).await
234  }
235
236  /// Вспомогательный метод записи `play` пакета
237  pub async fn write_play_packet(&mut self, packet: ServersidePlayPacket) -> io::Result<()> {
238    self.write_packet(ServersidePacket::Play(packet)).await
239  }
240
241  /// Метод выключения потока записи
242  pub async fn shutdown(&mut self) -> io::Result<()> {
243    self.write_stream.shutdown().await
244  }
245}
246
247impl NurtexConnection {
248  /// Метод создания нового подключения
249  pub async fn new(server_host: impl Into<String>, server_port: u16) -> io::Result<Self> {
250    let stream = TcpStream::connect(format!("{}:{}", server_host.into(), server_port)).await?;
251    stream.set_nodelay(true)?;
252    Self::new_from_stream(stream).await
253  }
254
255  /// Метод создания нового подключения с прокси
256  pub async fn new_with_proxy(server_host: impl Into<String>, server_port: u16, proxy: &Proxy) -> io::Result<Self> {
257    proxy.bind(server_host.into(), server_port);
258
259    let stream = match proxy.connect().await {
260      ProxyResult::Ok(s) => s,
261      ProxyResult::Err(e) => return Err(Error::new(ErrorKind::NotConnected, e.text())),
262    };
263
264    stream.set_nodelay(true)?;
265
266    Self::new_from_stream(stream).await
267  }
268
269  /// Метод создания нового подключения из TcpStream
270  pub async fn new_from_stream(stream: TcpStream) -> io::Result<Self> {
271    let (read_stream, write_stream) = stream.into_split();
272
273    let state = Arc::new(RwLock::new(ConnectionState::Handshake));
274    let compression_threshold = Arc::new(RwLock::new(None));
275
276    let reader = ConnectionReader {
277      read_stream,
278      buffer: Cursor::new(Vec::new()),
279      compression_threshold: Arc::clone(&compression_threshold),
280      decryptor: Arc::new(Mutex::new(None)),
281      state: Arc::clone(&state),
282    };
283
284    let writer = ConnectionWriter {
285      write_stream,
286      compression_threshold: Arc::clone(&compression_threshold),
287      encryptor: Arc::new(Mutex::new(None)),
288    };
289
290    Ok(NurtexConnection {
291      reader: Arc::new(Mutex::new(reader)),
292      writer: Arc::new(Mutex::new(writer)),
293      state,
294      compression_threshold,
295    })
296  }
297
298  /// Метод получения `reader`
299  pub fn get_reader(&self) -> Arc<Mutex<ConnectionReader>> {
300    self.reader.clone()
301  }
302
303  /// Метод получения `writer`
304  pub fn get_writer(&self) -> Arc<Mutex<ConnectionWriter>> {
305    self.writer.clone()
306  }
307
308  /// Метод получения текущего состояния подключения
309  pub async fn get_state(&self) -> ConnectionState {
310    *self.state.read().await
311  }
312
313  /// Метод изменения состояния подключения
314  pub async fn set_state(&self, state: ConnectionState) {
315    *self.state.write().await = state;
316  }
317
318  /// Вспомогательный метод чтения пакета
319  pub async fn read_packet(&self) -> Option<ClientsidePacket> {
320    let mut reader = self.reader.lock().await;
321    reader.read_packet().await
322  }
323
324  /// Вспомогательный метод чтения пакета (неблокирующий)
325  pub fn try_read_packet(&self) -> Result<Option<ClientsidePacket>, std::io::Error> {
326    let mut reader = match self.reader.try_lock() {
327      Ok(reader) => reader,
328      Err(_) => return Ok(None),
329    };
330
331    reader.try_read_packet()
332  }
333
334  /// Вспомогательный метод чтения `status` пакета
335  pub async fn read_status_packet(&self) -> Option<ClientsideStatusPacket> {
336    let mut reader = self.reader.lock().await;
337    reader.read_status_packet().await
338  }
339
340  /// Вспомогательный метод чтения `login` пакета
341  pub async fn read_login_packet(&self) -> Option<ClientsideLoginPacket> {
342    let mut reader = self.reader.lock().await;
343    reader.read_login_packet().await
344  }
345
346  /// Вспомогательный метод чтения `configuration` пакета
347  pub async fn read_configuration_packet(&self) -> Option<ClientsideConfigurationPacket> {
348    let mut reader = self.reader.lock().await;
349    reader.read_configuration_packet().await
350  }
351
352  /// Вспомогательный метод чтения `play` пакета
353  pub async fn read_play_packet(&self) -> Option<ClientsidePlayPacket> {
354    let mut reader = self.reader.lock().await;
355    reader.read_play_packet().await
356  }
357
358  /// Вспомогательный метод записи пакета
359  pub async fn write_packet(&self, packet: ServersidePacket) -> io::Result<()> {
360    let mut writer = self.writer.lock().await;
361    writer.write_packet(packet).await
362  }
363
364  /// Вспомогательный метод записи `handshake` пакета
365  pub async fn write_handshake_packet(&self, packet: ServersideHandshakePacket) -> io::Result<()> {
366    let mut writer = self.writer.lock().await;
367    writer.write_handshake_packet(packet).await
368  }
369
370  /// Вспомогательный метод записи `status` пакета
371  pub async fn write_status_packet(&self, packet: ServersideStatusPacket) -> io::Result<()> {
372    let mut writer = self.writer.lock().await;
373    writer.write_status_packet(packet).await
374  }
375
376  /// Вспомогательный метод записи `login` пакета
377  pub async fn write_login_packet(&self, packet: ServersideLoginPacket) -> io::Result<()> {
378    let mut writer = self.writer.lock().await;
379    writer.write_login_packet(packet).await
380  }
381
382  /// Вспомогательный метод записи `configuration` пакета
383  pub async fn write_configuration_packet(&self, packet: ServersideConfigurationPacket) -> io::Result<()> {
384    let mut writer = self.writer.lock().await;
385    writer.write_configuration_packet(packet).await
386  }
387
388  /// Вспомогательный метод записи `play` пакета
389  pub async fn write_play_packet(&self, packet: ServersidePlayPacket) -> io::Result<()> {
390    let mut writer = self.writer.lock().await;
391    writer.write_play_packet(packet).await
392  }
393
394  /// Метод выключения соединения
395  pub async fn shutdown(&self) -> io::Result<()> {
396    let mut writer = self.writer.lock().await;
397    writer.shutdown().await
398  }
399
400  /// Метод установки порога сжатия
401  pub async fn set_compression_threshold(&self, threshold: i32) {
402    let new_threshold = if threshold >= 0 { Some(threshold as u32) } else { None };
403
404    *self.compression_threshold.write().await = new_threshold;
405  }
406
407  /// Устанавливает шифрование на соединении используя секретный ключ.
408  /// Этот метод должен быть вызван **после** отправки `EncryptionResponse` серверу
409  pub async fn set_encryption_key(&self, secret_key: [u8; 16]) {
410    let (encryptor, decryptor) = nurtex_encrypt::create_cipher(&secret_key);
411
412    {
413      let reader = self.reader.lock().await;
414      *reader.decryptor.lock().await = Some(decryptor);
415    }
416
417    {
418      let writer = self.writer.lock().await;
419      *writer.encryptor.lock().await = Some(encryptor);
420    }
421  }
422}