Skip to main content

nurtex_protocol/connection/
connection.rs

1use std::fmt::Debug;
2use std::io::{Cursor, Error, ErrorKind};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicI8, AtomicI32, Ordering};
5
6use nurtex_encrypt::{AesDecryptor, AesEncryptor};
7use nurtex_proxy::{Proxy, ProxyResult};
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
11use tokio::sync::Mutex;
12
13use crate::connection::reader::{deserialize_packet, read_raw_packet};
14use crate::connection::writer::{serialize_packet, write_raw_packet};
15use crate::packets::{
16  configuration::{ClientsideConfigurationPacket, ServersideConfigurationPacket},
17  handshake::{ClientsideHandshakePacket, ServersideHandshakePacket},
18  login::{ClientsideLoginPacket, ServersideLoginPacket},
19  play::{ClientsidePlayPacket, ServersidePlayPacket},
20  status::{ClientsideStatusPacket, ServersideStatusPacket},
21};
22
23/// Состояние подключения
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ConnectionState {
26  Handshake,
27  Status,
28  Login,
29  Configuration,
30  Play,
31}
32
33impl From<i8> for ConnectionState {
34  fn from(value: i8) -> Self {
35    match value {
36      -1 => Self::Status,
37      0 => Self::Handshake,
38      1 => Self::Login,
39      2 => Self::Configuration,
40      3 => Self::Play,
41      _ => Self::Handshake,
42    }
43  }
44}
45
46/// Универсальное перечисление `Clientside` пакетов
47#[derive(Debug, Clone)]
48pub enum ClientsidePacket {
49  Handshake(ClientsideHandshakePacket),
50  Status(ClientsideStatusPacket),
51  Login(ClientsideLoginPacket),
52  Configuration(ClientsideConfigurationPacket),
53  Play(ClientsidePlayPacket),
54}
55
56/// Универсальное перечисление `Serverside` пакетов
57#[derive(Debug, Clone)]
58pub enum ServersidePacket {
59  Handshake(ServersideHandshakePacket),
60  Status(ServersideStatusPacket),
61  Login(ServersideLoginPacket),
62  Configuration(ServersideConfigurationPacket),
63  Play(ServersidePlayPacket),
64}
65
66impl ServersidePacket {
67  /// Вспомогательный метод создания `handshake` пакета
68  pub fn handshake(packet: ServersideHandshakePacket) -> Self {
69    ServersidePacket::Handshake(packet)
70  }
71
72  /// Вспомогательный метод создания `status` пакета
73  pub fn status(packet: ServersideStatusPacket) -> Self {
74    ServersidePacket::Status(packet)
75  }
76
77  //// Вспомогательный метод создания `login` пакета
78  pub fn login(packet: ServersideLoginPacket) -> Self {
79    ServersidePacket::Login(packet)
80  }
81
82  /// Вспомогательный метод создания `configuration` пакета
83  pub fn configuration(packet: ServersideConfigurationPacket) -> Self {
84    ServersidePacket::Configuration(packet)
85  }
86
87  /// Вспомогательный метод создания `play` пакета
88  pub fn play(packet: ServersidePlayPacket) -> Self {
89    ServersidePacket::Play(packet)
90  }
91}
92
93/// Структура для чтения пакетов
94pub struct ConnectionReader {
95  /// Специальная половина `TcpStream` для чтения пакетов
96  read_stream: OwnedReadHalf,
97
98  /// Текущий буффер данных
99  buffer: Cursor<Vec<u8>>,
100
101  /// Декодировщик данных
102  decryptor: Arc<Mutex<Option<AesDecryptor>>>,
103
104  /// Состояние подключения
105  state: Arc<AtomicI8>,
106
107  /// Порог сжатия (от 0 до 1024), изначально -1
108  compression_threshold: Arc<AtomicI32>,
109}
110
111/// Структура для записи пакетов
112pub struct ConnectionWriter {
113  /// Специальная половина `TcpStream` для записи пакетов
114  write_stream: OwnedWriteHalf,
115
116  /// Кодировщик данных
117  encryptor: Arc<Mutex<Option<AesEncryptor>>>,
118
119  /// Порог сжатия (от 0 до 1024), изначально -1
120  compression_threshold: Arc<AtomicI32>,
121}
122
123/// Основная структура подключения
124pub struct NurtexConnection {
125  /// Чтение пакетов
126  reader: Arc<Mutex<ConnectionReader>>,
127
128  /// Запись пакетов
129  writer: Arc<Mutex<ConnectionWriter>>,
130
131  /// Состояние подключения
132  state: Arc<AtomicI8>,
133
134  /// Порог сжатия (от 0 до 1024), изначально -1
135  compression_threshold: Arc<AtomicI32>,
136}
137
138impl ConnectionReader {
139  /// Метод чтения пакета
140  pub async fn read_packet(&mut self) -> Option<ClientsidePacket> {
141    let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
142    let state = ConnectionState::from(self.state.load(Ordering::SeqCst));
143    let mut decryptor_guard = self.decryptor.lock().await;
144
145    let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
146    let mut cursor = Cursor::new(raw_packet.as_ref());
147
148    match state {
149      ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
150      ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
151      ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
152      ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
153      ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
154    }
155  }
156}
157
158impl ConnectionWriter {
159  /// Метод записи пакета
160  pub async fn write_packet(&mut self, packet: ServersidePacket) -> std::io::Result<()> {
161    let serialized = match packet {
162      ServersidePacket::Handshake(p) => serialize_packet(&p),
163      ServersidePacket::Status(p) => serialize_packet(&p),
164      ServersidePacket::Login(p) => serialize_packet(&p),
165      ServersidePacket::Configuration(p) => serialize_packet(&p),
166      ServersidePacket::Play(p) => serialize_packet(&p),
167    }
168    .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "failed to serialize packet"))?;
169
170    let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
171    let mut encryptor_guard = self.encryptor.lock().await;
172
173    write_raw_packet(&serialized, &mut self.write_stream, compression_threshold, &mut *encryptor_guard).await
174  }
175  /// Метод выключения потока записи
176  pub async fn shutdown(&mut self) -> std::io::Result<()> {
177    self.write_stream.shutdown().await
178  }
179}
180
181impl NurtexConnection {
182  /// Метод создания нового подключения
183  pub async fn new(server_host: impl Into<String>, server_port: u16) -> std::io::Result<Self> {
184    let stream = TcpStream::connect(format!("{}:{}", server_host.into(), server_port)).await?;
185    stream.set_nodelay(true)?;
186    Self::new_from_stream(stream).await
187  }
188
189  /// Метод создания нового подключения с прокси
190  pub async fn new_with_proxy(server_host: impl Into<String>, server_port: u16, proxy: &Proxy) -> std::io::Result<Self> {
191    let stream = match proxy.connect(server_host, server_port).await {
192      ProxyResult::Ok(s) => s,
193      ProxyResult::Err(e) => return Err(Error::new(ErrorKind::NotConnected, e.text())),
194    };
195
196    stream.set_nodelay(true)?;
197
198    Self::new_from_stream(stream).await
199  }
200
201  /// Метод создания нового подключения из TcpStream
202  pub async fn new_from_stream(stream: TcpStream) -> std::io::Result<Self> {
203    let (rh, wh) = stream.into_split();
204
205    // Изначально хэндшейк
206    let state = Arc::new(AtomicI8::new(0));
207
208    // Изначально отрицательный (не указан)
209    let compression_threshold = Arc::new(AtomicI32::new(-1));
210
211    let reader = ConnectionReader {
212      read_stream: rh,
213      buffer: Cursor::new(Vec::new()),
214      compression_threshold: Arc::clone(&compression_threshold),
215      decryptor: Arc::new(Mutex::new(None)),
216      state: Arc::clone(&state),
217    };
218
219    let writer = ConnectionWriter {
220      write_stream: wh,
221      compression_threshold: Arc::clone(&compression_threshold),
222      encryptor: Arc::new(Mutex::new(None)),
223    };
224
225    Ok(NurtexConnection {
226      reader: Arc::new(Mutex::new(reader)),
227      writer: Arc::new(Mutex::new(writer)),
228      state: state,
229      compression_threshold,
230    })
231  }
232
233  /// Метод получения `reader`
234  pub fn get_reader(&self) -> Arc<Mutex<ConnectionReader>> {
235    self.reader.clone()
236  }
237
238  /// Метод получения `writer`
239  pub fn get_writer(&self) -> Arc<Mutex<ConnectionWriter>> {
240    self.writer.clone()
241  }
242
243  /// Метод получения текущего состояния подключения
244  pub async fn get_state(&self) -> ConnectionState {
245    ConnectionState::from(self.state.load(Ordering::SeqCst))
246  }
247
248  /// Метод изменения состояния подключения
249  pub async fn set_state(&self, state: ConnectionState) {
250    let state_id = match state {
251      ConnectionState::Status => -1,
252      ConnectionState::Handshake => 0,
253      ConnectionState::Login => 1,
254      ConnectionState::Configuration => 2,
255      ConnectionState::Play => 3,
256    };
257
258    self.state.store(state_id, Ordering::SeqCst);
259  }
260
261  /// Вспомогательный метод чтения пакета
262  pub async fn read_packet(&self) -> Option<ClientsidePacket> {
263    let mut reader = self.reader.lock().await;
264    reader.read_packet().await
265  }
266
267  /// Вспомогательный метод чтения `status` пакета
268  pub async fn read_status_packet(&self) -> Option<ClientsideStatusPacket> {
269    let mut reader = self.reader.lock().await;
270
271    if let Some(ClientsidePacket::Status(packet)) = reader.read_packet().await {
272      Some(packet)
273    } else {
274      None
275    }
276  }
277
278  /// Вспомогательный метод чтения `login` пакета
279  pub async fn read_login_packet(&self) -> Option<ClientsideLoginPacket> {
280    let mut reader = self.reader.lock().await;
281
282    if let Some(ClientsidePacket::Login(packet)) = reader.read_packet().await {
283      Some(packet)
284    } else {
285      None
286    }
287  }
288
289  /// Вспомогательный метод чтения `configuration` пакета
290  pub async fn read_configuration_packet(&self) -> Option<ClientsideConfigurationPacket> {
291    let mut reader = self.reader.lock().await;
292
293    if let Some(ClientsidePacket::Configuration(packet)) = reader.read_packet().await {
294      Some(packet)
295    } else {
296      None
297    }
298  }
299
300  /// Вспомогательный метод чтения `play` пакета
301  pub async fn read_play_packet(&self) -> Option<ClientsidePlayPacket> {
302    let mut reader = self.reader.lock().await;
303
304    if let Some(ClientsidePacket::Play(packet)) = reader.read_packet().await {
305      Some(packet)
306    } else {
307      None
308    }
309  }
310
311  /// Вспомогательный метод записи пакета
312  pub async fn write_packet(&self, packet: ServersidePacket) -> std::io::Result<()> {
313    let mut writer = self.writer.lock().await;
314    writer.write_packet(packet).await
315  }
316
317  /// Вспомогательный метод записи `handshake` пакета
318  pub async fn write_handshake_packet(&self, packet: ServersideHandshakePacket) -> std::io::Result<()> {
319    let mut writer = self.writer.lock().await;
320    writer.write_packet(ServersidePacket::Handshake(packet)).await
321  }
322
323  /// Вспомогательный метод записи `status` пакета
324  pub async fn write_status_packet(&self, packet: ServersideStatusPacket) -> std::io::Result<()> {
325    let mut writer = self.writer.lock().await;
326    writer.write_packet(ServersidePacket::Status(packet)).await
327  }
328
329  /// Вспомогательный метод записи `login` пакета
330  pub async fn write_login_packet(&self, packet: ServersideLoginPacket) -> std::io::Result<()> {
331    let mut writer = self.writer.lock().await;
332    writer.write_packet(ServersidePacket::Login(packet)).await
333  }
334
335  /// Вспомогательный метод записи `configuration` пакета
336  pub async fn write_configuration_packet(&self, packet: ServersideConfigurationPacket) -> std::io::Result<()> {
337    let mut writer = self.writer.lock().await;
338    writer.write_packet(ServersidePacket::Configuration(packet)).await
339  }
340
341  /// Вспомогательный метод записи `play` пакета
342  pub async fn write_play_packet(&self, packet: ServersidePlayPacket) -> std::io::Result<()> {
343    let mut writer = self.writer.lock().await;
344    writer.write_packet(ServersidePacket::Play(packet)).await
345  }
346
347  /// Метод выключения соединения
348  pub async fn shutdown(&self) -> std::io::Result<()> {
349    let mut writer = self.writer.lock().await;
350    writer.shutdown().await
351  }
352
353  /// Метод установки порога сжатия
354  pub async fn set_compression_threshold(&self, threshold: i32) {
355    self.compression_threshold.store(threshold, Ordering::SeqCst);
356  }
357
358  /// Устанавливает шифрование на соединении используя секретный ключ.
359  /// Этот метод должен быть вызван **после** отправки `EncryptionResponse` серверу
360  pub async fn set_encryption_key(&self, secret_key: [u8; 16]) {
361    let (encryptor, decryptor) = nurtex_encrypt::create_cipher(&secret_key);
362
363    {
364      let reader = self.reader.lock().await;
365      *reader.decryptor.lock().await = Some(decryptor);
366    }
367
368    {
369      let writer = self.writer.lock().await;
370      *writer.encryptor.lock().await = Some(encryptor);
371    }
372  }
373}