nurtex_protocol/connection/
reader.rs1use std::fmt::Debug;
2use std::io::{Cursor, Read};
3
4use flate2::read::ZlibDecoder;
5use futures::StreamExt;
6use nurtex_codec::types::variable::VarI32;
7use nurtex_encrypt::AesDecryptor;
8use tokio::io::AsyncRead;
9use tokio_util::bytes::Buf;
10use tokio_util::codec::{BytesCodec, FramedRead};
11
12use crate::ProtocolPacket;
13
14fn parse_frame(buffer: &mut Cursor<Vec<u8>>) -> Option<Box<[u8]>> {
16 let mut buffer_copy = Cursor::new(&buffer.get_ref()[buffer.position() as usize..]);
17
18 let length = i32::read_var(&mut buffer_copy)? as usize;
19
20 if length > buffer_copy.remaining() {
21 return None;
22 }
23
24 let varint_length = buffer.remaining() - buffer_copy.remaining();
25 buffer.advance(varint_length);
26 let data = buffer.get_ref()[buffer.position() as usize..buffer.position() as usize + length].to_vec();
27 buffer.advance(length);
28
29 if buffer.position() == buffer.get_ref().len() as u64 {
30 buffer.get_mut().clear();
31 buffer.get_mut().shrink_to(1024 * 64);
32 buffer.set_position(0);
33 }
34
35 Some(data.into_boxed_slice())
36}
37
38pub fn deserialize_packet<P: ProtocolPacket + Debug>(buffer: &mut Cursor<&[u8]>) -> Option<P> {
40 let packet_id = i32::read_var(buffer)? as u32;
41 P::read(packet_id, buffer)
42}
43
44pub fn compression_decoder(buffer: &mut Cursor<&[u8]>, compression_threshold: i32) -> Option<Box<[u8]>> {
46 let n = i32::read_var(buffer)?;
47
48 if n == 0 {
49 let buf = buffer.get_ref()[buffer.position() as usize..].to_vec().into_boxed_slice();
50 buffer.set_position(buffer.get_ref().len() as u64);
51 return Some(buf);
52 }
53
54 if n < compression_threshold {
55 return None;
56 }
57
58 if n > 8388608 {
59 return None;
60 }
61
62 let mut decoded_buf = Vec::with_capacity(n as usize);
63 let mut decoder = ZlibDecoder::new(buffer);
64 decoder.read_to_end(&mut decoded_buf).ok()?;
65
66 Some(decoded_buf.into_boxed_slice())
67}
68
69pub async fn read_packet<P: ProtocolPacket + Debug, R>(stream: &mut R, buffer: &mut Cursor<Vec<u8>>, compression_threshold: i32, cipher: &mut Option<AesDecryptor>) -> Option<P>
71where
72 R: AsyncRead + Unpin + Send + Sync,
73{
74 let raw_packet = read_raw_packet(stream, buffer, compression_threshold, cipher).await?;
75 let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
76 Some(packet)
77}
78
79pub async fn read_raw_packet<R>(stream: &mut R, buffer: &mut Cursor<Vec<u8>>, compression_threshold: i32, cipher: &mut Option<AesDecryptor>) -> Option<Box<[u8]>>
81where
82 R: AsyncRead + Unpin + Send + Sync,
83{
84 loop {
85 if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold) {
86 return Some(buf);
87 };
88
89 let bytes = read_and_decrypt_frame(stream, cipher).await?;
90 buffer.get_mut().extend_from_slice(&bytes);
91 }
92}
93
94async fn read_and_decrypt_frame<R>(stream: &mut R, cipher: &mut Option<AesDecryptor>) -> Option<Box<[u8]>>
96where
97 R: AsyncRead + Unpin + Send + Sync,
98{
99 let mut framed = FramedRead::new(stream, BytesCodec::new());
100
101 let Some(message) = framed.next().await else {
102 return None;
103 };
104
105 let bytes = message.ok()?;
106
107 let mut bytes = bytes.to_vec().into_boxed_slice();
108
109 if let Some(cipher) = cipher {
110 nurtex_encrypt::decrypt_packet(cipher, &mut bytes);
111 }
112
113 Some(bytes)
114}
115
116pub fn read_raw_packet_from_buffer<R>(buffer: &mut Cursor<Vec<u8>>, compression_threshold: i32) -> Option<Box<[u8]>>
118where
119 R: AsyncRead + Unpin + Send + Sync,
120{
121 let Some(mut buf) = parse_frame(buffer) else {
122 return None;
123 };
124
125 if compression_threshold >= 0 {
126 buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)?;
127 }
128
129 Some(buf)
130}