use std::fmt::Debug;
use std::io::{Cursor, Error, ErrorKind};
use std::sync::Arc;
use std::sync::atomic::{AtomicI8, AtomicI32, Ordering};
use nurtex_encrypt::{AesDecryptor, AesEncryptor};
use nurtex_proxy::{Proxy, ProxyResult};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::sync::Mutex;
use crate::connection::reader::{deserialize_packet, read_raw_packet};
use crate::connection::writer::{serialize_packet, write_raw_packet};
use crate::packets::{
configuration::{ClientsideConfigurationPacket, ServersideConfigurationPacket},
handshake::{ClientsideHandshakePacket, ServersideHandshakePacket},
login::{ClientsideLoginPacket, ServersideLoginPacket},
play::{ClientsidePlayPacket, ServersidePlayPacket},
status::{ClientsideStatusPacket, ServersideStatusPacket},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
Handshake,
Status,
Login,
Configuration,
Play,
}
impl From<i8> for ConnectionState {
fn from(value: i8) -> Self {
match value {
-1 => Self::Status,
0 => Self::Handshake,
1 => Self::Login,
2 => Self::Configuration,
3 => Self::Play,
_ => Self::Handshake,
}
}
}
#[derive(Debug, Clone)]
pub enum ClientsidePacket {
Handshake(ClientsideHandshakePacket),
Status(ClientsideStatusPacket),
Login(ClientsideLoginPacket),
Configuration(ClientsideConfigurationPacket),
Play(ClientsidePlayPacket),
}
#[derive(Debug, Clone)]
pub enum ServersidePacket {
Handshake(ServersideHandshakePacket),
Status(ServersideStatusPacket),
Login(ServersideLoginPacket),
Configuration(ServersideConfigurationPacket),
Play(ServersidePlayPacket),
}
impl ServersidePacket {
pub fn handshake(packet: ServersideHandshakePacket) -> Self {
ServersidePacket::Handshake(packet)
}
pub fn status(packet: ServersideStatusPacket) -> Self {
ServersidePacket::Status(packet)
}
pub fn login(packet: ServersideLoginPacket) -> Self {
ServersidePacket::Login(packet)
}
pub fn configuration(packet: ServersideConfigurationPacket) -> Self {
ServersidePacket::Configuration(packet)
}
pub fn play(packet: ServersidePlayPacket) -> Self {
ServersidePacket::Play(packet)
}
}
pub struct ConnectionReader {
read_stream: OwnedReadHalf,
buffer: Cursor<Vec<u8>>,
decryptor: Arc<Mutex<Option<AesDecryptor>>>,
state: Arc<AtomicI8>,
compression_threshold: Arc<AtomicI32>,
}
pub struct ConnectionWriter {
write_stream: OwnedWriteHalf,
encryptor: Arc<Mutex<Option<AesEncryptor>>>,
compression_threshold: Arc<AtomicI32>,
}
pub struct NurtexConnection {
reader: Arc<Mutex<ConnectionReader>>,
writer: Arc<Mutex<ConnectionWriter>>,
state: Arc<AtomicI8>,
compression_threshold: Arc<AtomicI32>,
}
impl ConnectionReader {
pub async fn read_packet(&mut self) -> Option<ClientsidePacket> {
let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
let state = ConnectionState::from(self.state.load(Ordering::SeqCst));
let mut decryptor_guard = self.decryptor.lock().await;
let raw_packet = read_raw_packet(&mut self.read_stream, &mut self.buffer, compression_threshold, &mut *decryptor_guard).await?;
let mut cursor = Cursor::new(raw_packet.as_ref());
match state {
ConnectionState::Handshake => deserialize_packet::<ClientsideHandshakePacket>(&mut cursor).map(ClientsidePacket::Handshake),
ConnectionState::Status => deserialize_packet::<ClientsideStatusPacket>(&mut cursor).map(ClientsidePacket::Status),
ConnectionState::Login => deserialize_packet::<ClientsideLoginPacket>(&mut cursor).map(ClientsidePacket::Login),
ConnectionState::Configuration => deserialize_packet::<ClientsideConfigurationPacket>(&mut cursor).map(ClientsidePacket::Configuration),
ConnectionState::Play => deserialize_packet::<ClientsidePlayPacket>(&mut cursor).map(ClientsidePacket::Play),
}
}
}
impl ConnectionWriter {
pub async fn write_packet(&mut self, packet: ServersidePacket) -> std::io::Result<()> {
let serialized = match packet {
ServersidePacket::Handshake(p) => serialize_packet(&p),
ServersidePacket::Status(p) => serialize_packet(&p),
ServersidePacket::Login(p) => serialize_packet(&p),
ServersidePacket::Configuration(p) => serialize_packet(&p),
ServersidePacket::Play(p) => serialize_packet(&p),
}
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "failed to serialize packet"))?;
let compression_threshold = self.compression_threshold.load(Ordering::SeqCst);
let mut encryptor_guard = self.encryptor.lock().await;
write_raw_packet(&serialized, &mut self.write_stream, compression_threshold, &mut *encryptor_guard).await
}
pub async fn shutdown(&mut self) -> std::io::Result<()> {
self.write_stream.shutdown().await
}
}
impl NurtexConnection {
pub async fn new(server_host: impl Into<String>, server_port: u16) -> std::io::Result<Self> {
let stream = TcpStream::connect(format!("{}:{}", server_host.into(), server_port)).await?;
stream.set_nodelay(true)?;
Self::new_from_stream(stream).await
}
pub async fn new_with_proxy(server_host: impl Into<String>, server_port: u16, proxy: &Proxy) -> std::io::Result<Self> {
let stream = match proxy.connect(server_host, server_port).await {
ProxyResult::Ok(s) => s,
ProxyResult::Err(e) => return Err(Error::new(ErrorKind::NotConnected, e.text())),
};
stream.set_nodelay(true)?;
Self::new_from_stream(stream).await
}
pub async fn new_from_stream(stream: TcpStream) -> std::io::Result<Self> {
let (rh, wh) = stream.into_split();
let state = Arc::new(AtomicI8::new(0));
let compression_threshold = Arc::new(AtomicI32::new(-1));
let reader = ConnectionReader {
read_stream: rh,
buffer: Cursor::new(Vec::new()),
compression_threshold: Arc::clone(&compression_threshold),
decryptor: Arc::new(Mutex::new(None)),
state: Arc::clone(&state),
};
let writer = ConnectionWriter {
write_stream: wh,
compression_threshold: Arc::clone(&compression_threshold),
encryptor: Arc::new(Mutex::new(None)),
};
Ok(NurtexConnection {
reader: Arc::new(Mutex::new(reader)),
writer: Arc::new(Mutex::new(writer)),
state: state,
compression_threshold,
})
}
pub fn get_reader(&self) -> Arc<Mutex<ConnectionReader>> {
self.reader.clone()
}
pub fn get_writer(&self) -> Arc<Mutex<ConnectionWriter>> {
self.writer.clone()
}
pub async fn get_state(&self) -> ConnectionState {
ConnectionState::from(self.state.load(Ordering::SeqCst))
}
pub async fn set_state(&self, state: ConnectionState) {
let state_id = match state {
ConnectionState::Status => -1,
ConnectionState::Handshake => 0,
ConnectionState::Login => 1,
ConnectionState::Configuration => 2,
ConnectionState::Play => 3,
};
self.state.store(state_id, Ordering::SeqCst);
}
pub async fn read_packet(&self) -> Option<ClientsidePacket> {
let mut reader = self.reader.lock().await;
reader.read_packet().await
}
pub async fn read_status_packet(&self) -> Option<ClientsideStatusPacket> {
let mut reader = self.reader.lock().await;
if let Some(ClientsidePacket::Status(packet)) = reader.read_packet().await {
Some(packet)
} else {
None
}
}
pub async fn read_login_packet(&self) -> Option<ClientsideLoginPacket> {
let mut reader = self.reader.lock().await;
if let Some(ClientsidePacket::Login(packet)) = reader.read_packet().await {
Some(packet)
} else {
None
}
}
pub async fn read_configuration_packet(&self) -> Option<ClientsideConfigurationPacket> {
let mut reader = self.reader.lock().await;
if let Some(ClientsidePacket::Configuration(packet)) = reader.read_packet().await {
Some(packet)
} else {
None
}
}
pub async fn read_play_packet(&self) -> Option<ClientsidePlayPacket> {
let mut reader = self.reader.lock().await;
if let Some(ClientsidePacket::Play(packet)) = reader.read_packet().await {
Some(packet)
} else {
None
}
}
pub async fn write_packet(&self, packet: ServersidePacket) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
writer.write_packet(packet).await
}
pub async fn write_handshake_packet(&self, packet: ServersideHandshakePacket) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
writer.write_packet(ServersidePacket::Handshake(packet)).await
}
pub async fn write_status_packet(&self, packet: ServersideStatusPacket) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
writer.write_packet(ServersidePacket::Status(packet)).await
}
pub async fn write_login_packet(&self, packet: ServersideLoginPacket) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
writer.write_packet(ServersidePacket::Login(packet)).await
}
pub async fn write_configuration_packet(&self, packet: ServersideConfigurationPacket) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
writer.write_packet(ServersidePacket::Configuration(packet)).await
}
pub async fn write_play_packet(&self, packet: ServersidePlayPacket) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
writer.write_packet(ServersidePacket::Play(packet)).await
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
writer.shutdown().await
}
pub async fn set_compression_threshold(&self, threshold: i32) {
self.compression_threshold.store(threshold, Ordering::SeqCst);
}
pub async fn set_encryption_key(&self, secret_key: [u8; 16]) {
let (encryptor, decryptor) = nurtex_encrypt::create_cipher(&secret_key);
{
let reader = self.reader.lock().await;
*reader.decryptor.lock().await = Some(decryptor);
}
{
let writer = self.writer.lock().await;
*writer.encryptor.lock().await = Some(encryptor);
}
}
}