Skip to main content

nurtex_protocol/connection/
reader.rs

1use flate2::read::ZlibDecoder;
2use futures::StreamExt;
3use futures_util::future::FutureExt;
4use nurtex_codec::VarInt;
5use nurtex_encrypt::AesDecryptor;
6use std::fmt::Debug;
7use std::io::{Cursor, Read};
8use tokio::io::AsyncRead;
9use tokio_util::bytes::Buf;
10use tokio_util::codec::{BytesCodec, FramedRead};
11
12use crate::ProtocolPacket;
13
14/// Функция парсинга фрейма
15fn parse_frame(buffer: &mut Cursor<Vec<u8>>) -> Option<Box<[u8]>> {
16  let mut buffer_copy = Cursor::new(&buffer.get_ref()[buffer.position() as usize..]);
17
18  let length = i32::read_varint(&mut buffer_copy)? as usize;
19
20  if length > buffer_copy.remaining() {
21    return None;
22  }
23
24  let varint_length = buffer.remaining() - buffer_copy.remaining();
25  buffer.advance(varint_length);
26  let data = buffer.get_ref()[buffer.position() as usize..buffer.position() as usize + length].to_vec();
27  buffer.advance(length);
28
29  if buffer.position() == buffer.get_ref().len() as u64 {
30    buffer.get_mut().clear();
31    buffer.get_mut().shrink_to(1024 * 64);
32    buffer.set_position(0);
33  }
34
35  Some(data.into_boxed_slice())
36}
37
38/// Функция десериализации сетевого пакета
39pub fn deserialize_packet<P: ProtocolPacket + Debug>(stream: &mut Cursor<&[u8]>) -> Option<P> {
40  let packet_id = i32::read_varint(stream)? as u32;
41  P::read(packet_id, stream)
42}
43
44/// Функция декодировки с учётом порога сжатия
45pub fn compression_decoder(stream: &mut Cursor<&[u8]>, compression_threshold: u32) -> Option<Box<[u8]>> {
46  let n = i32::read_varint(stream)? as u32;
47
48  if n == 0 {
49    let buf = stream.get_ref()[stream.position() as usize..].to_vec().into_boxed_slice();
50    stream.set_position(stream.get_ref().len() as u64);
51    return Some(buf);
52  }
53
54  if n < compression_threshold {
55    return None;
56  }
57
58  if n > 8388608 {
59    return None;
60  }
61
62  let mut decoded_buf = Vec::with_capacity(n as usize);
63  let mut decoder = ZlibDecoder::new(stream);
64  decoder.read_to_end(&mut decoded_buf).ok()?;
65
66  Some(decoded_buf.into_boxed_slice())
67}
68
69/// Функция чтения сетевого пакета
70pub async fn read_packet<P: ProtocolPacket + Debug, R>(
71  stream: &mut R,
72  buffer: &mut Cursor<Vec<u8>>,
73  compression_threshold: Option<u32>,
74  cipher: &mut Option<AesDecryptor>,
75) -> Option<P>
76where
77  R: AsyncRead + Unpin + Send + Sync,
78{
79  let raw_packet = read_raw_packet(stream, buffer, compression_threshold, cipher).await?;
80  let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
81  Some(packet)
82}
83
84/// Функция чтения сетевого пакета (неблокирующая)
85pub fn try_read_packet<P: ProtocolPacket + Debug, R>(
86  stream: &mut R,
87  buffer: &mut Cursor<Vec<u8>>,
88  compression_threshold: Option<u32>,
89  cipher: &mut Option<AesDecryptor>,
90) -> Result<Option<P>, std::io::Error>
91where
92  R: AsyncRead + Unpin + Send + Sync,
93{
94  let Some(raw_packet) = try_read_raw_packet(stream, buffer, compression_threshold, cipher)? else {
95    return Ok(None);
96  };
97
98  let packet = deserialize_packet(&mut Cursor::new(&raw_packet));
99  Ok(packet)
100}
101
102/// Функция чтения сырого пакета
103pub async fn read_raw_packet<R>(stream: &mut R, buffer: &mut Cursor<Vec<u8>>, compression_threshold: Option<u32>, cipher: &mut Option<AesDecryptor>) -> Option<Box<[u8]>>
104where
105  R: AsyncRead + Unpin + Send + Sync,
106{
107  loop {
108    if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold) {
109      return Some(buf);
110    };
111
112    let bytes = read_and_decrypt_frame(stream, cipher).await?;
113    buffer.get_mut().extend_from_slice(&bytes);
114  }
115}
116
117/// Функция чтения сырого пакета (неблокирующая)
118pub fn try_read_raw_packet<R>(
119  stream: &mut R,
120  buffer: &mut Cursor<Vec<u8>>,
121  compression_threshold: Option<u32>,
122  cipher: &mut Option<AesDecryptor>,
123) -> Result<Option<Box<[u8]>>, std::io::Error>
124where
125  R: AsyncRead + Unpin + Send + Sync,
126{
127  loop {
128    if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold) {
129      return Ok(Some(buf));
130    };
131
132    let Some(bytes) = try_read_and_decrypt_frame(stream, cipher)? else {
133      return Ok(None);
134    };
135
136    buffer.get_mut().extend_from_slice(&bytes);
137  }
138}
139
140/// Функция чтения и расшифровки фрейма
141async fn read_and_decrypt_frame<R>(stream: &mut R, cipher: &mut Option<AesDecryptor>) -> Option<Box<[u8]>>
142where
143  R: AsyncRead + Unpin + Send + Sync,
144{
145  let mut framed = FramedRead::new(stream, BytesCodec::new());
146
147  let Some(message) = framed.next().await else {
148    return None;
149  };
150
151  let bytes = message.ok()?;
152
153  let mut bytes = bytes.to_vec().into_boxed_slice();
154
155  if let Some(cipher) = cipher {
156    nurtex_encrypt::decrypt_packet(cipher, &mut bytes);
157  }
158
159  Some(bytes)
160}
161
162/// Функция чтения и расшифровки фрейма (неблокирующая)
163fn try_read_and_decrypt_frame<R>(stream: &mut R, cipher: &mut Option<AesDecryptor>) -> Result<Option<Box<[u8]>>, std::io::Error>
164where
165  R: AsyncRead + Unpin + Send + Sync,
166{
167  let mut framed = FramedRead::new(stream, BytesCodec::new());
168
169  let Some(message) = framed.next().now_or_never() else {
170    return Ok(None);
171  };
172
173  let Some(message) = message else {
174    return Err(std::io::Error::new(std::io::ErrorKind::ConnectionAborted, "Connection closed"));
175  };
176
177  let bytes = message?.freeze();
178  let mut bytes = bytes.to_vec().into_boxed_slice();
179
180  if let Some(cipher) = cipher {
181    nurtex_encrypt::decrypt_packet(cipher, &mut bytes);
182  }
183
184  Ok(Some(bytes))
185}
186
187/// Функция чтения сырого пакета из буффера
188pub fn read_raw_packet_from_buffer<R>(buffer: &mut Cursor<Vec<u8>>, compression_threshold: Option<u32>) -> Option<Box<[u8]>>
189where
190  R: AsyncRead + Unpin + Send + Sync,
191{
192  let Some(mut buf) = parse_frame(buffer) else {
193    return None;
194  };
195
196  if let Some(compression_threshold) = compression_threshold {
197    buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)?;
198  }
199
200  Some(buf)
201}