use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use bytes::BufMut;
use deepslate_protocol::codec;
use deepslate_protocol::packet::Packet;
use deepslate_protocol::packet::handshake::{HandshakeIntent, HandshakePacket};
use deepslate_protocol::packet::login::{
EncryptionRequestPacket, EncryptionResponsePacket, LoginAcknowledgedPacket,
LoginDisconnectPacket, LoginStartPacket, LoginSuccessPacket,
};
use deepslate_protocol::packet::play::{
ConfigPacketIds, PlayPacketIds, parse_command_from_signed, parse_command_from_unsigned,
};
use deepslate_protocol::packet::status::{
PingRequestPacket, PongResponsePacket, StatusRequestPacket, StatusResponsePacket,
};
use deepslate_protocol::types::GameProfile;
use deepslate_protocol::varint;
use deepslate_protocol::version::ProtocolVersion;
use tracing::{debug, info, warn};
use crate::auth;
use crate::config::Config;
use crate::connection::MinecraftConnection;
use crate::connection::backend;
use crate::crypto::ServerKeyPair;
use crate::event::{
ChooseServerEvent, ChooseServerResult, DisconnectEvent, EventManager, LoginEvent, LoginResult,
PingEvent, PingResult, PlayerInfo, ResultedEvent, ServerConnectedEvent, ServerSwitchEvent,
ServerSwitchResult,
};
use crate::server::ServerRegistry;
use crate::status;
#[expect(
clippy::large_futures,
reason = "Connection handling owns large per-connection buffers across awaited phases"
)]
pub async fn handle_client(
mut conn: MinecraftConnection,
client_addr: SocketAddr,
config: Arc<Config>,
key_pair: Arc<ServerKeyPair>,
registry: Arc<ServerRegistry>,
http_client: reqwest::Client,
event_manager: Arc<EventManager>,
) -> io::Result<()> {
let Some(frame) = conn.read_frame().await? else {
return Ok(());
};
let mut cursor = &frame[..];
let packet_id = codec::read_packet_id(&mut cursor).map_err(protocol_err)?;
if packet_id != HandshakePacket::PACKET_ID {
warn!(packet_id, "expected handshake packet");
return Ok(());
}
let handshake = HandshakePacket::decode(&mut cursor).map_err(protocol_err)?;
debug!(
protocol = handshake.protocol_version,
addr = %handshake.server_address,
port = handshake.server_port,
intent = ?handshake.next_state,
"received handshake"
);
match handshake.next_state {
HandshakeIntent::Status => {
handle_status(
&mut conn,
&config,
&event_manager,
handshake.protocol_version,
)
.await
}
HandshakeIntent::Login => {
handle_login(
conn,
client_addr,
&config,
key_pair,
®istry,
&http_client,
&event_manager,
&handshake,
)
.await
}
HandshakeIntent::Transfer => {
debug!("transfer intent not supported, closing");
Ok(())
}
}
}
#[expect(
clippy::large_futures,
reason = "MinecraftConnection keeps large read/write buffers in async state"
)]
async fn handle_status(
conn: &mut MinecraftConnection,
config: &Config,
event_manager: &EventManager,
client_protocol: i32,
) -> io::Result<()> {
let Some(frame) = conn.read_frame().await? else {
return Ok(());
};
let mut cursor = &frame[..];
let packet_id = codec::read_packet_id(&mut cursor).map_err(protocol_err)?;
if packet_id != StatusRequestPacket::PACKET_ID {
return Ok(());
}
let default_response = status::build_status_response(
&config.motd,
config.max_players,
0, client_protocol,
);
let ping_event = event_manager.fire(PingEvent::new(default_response.clone(), client_protocol));
let response_json = match ping_event.result() {
PingResult::Default => default_response,
PingResult::Override(json) => json.clone(),
};
conn.write_packet(&StatusResponsePacket {
json: response_json.to_string(),
})
.await?;
let Some(frame) = conn.read_frame().await? else {
return Ok(());
};
let mut cursor = &frame[..];
let packet_id = codec::read_packet_id(&mut cursor).map_err(protocol_err)?;
if packet_id == PingRequestPacket::PACKET_ID {
let ping = PingRequestPacket::decode(&mut cursor).map_err(protocol_err)?;
conn.write_packet(&PongResponsePacket {
payload: ping.payload,
})
.await?;
}
Ok(())
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
#[expect(
clippy::large_futures,
reason = "Login orchestration awaits several large connection-state futures"
)]
#[expect(
clippy::large_stack_frames,
reason = "The state machine works on buffered packet frames held by MinecraftConnection"
)]
async fn handle_login(
mut conn: MinecraftConnection,
client_addr: SocketAddr,
config: &Config,
key_pair: Arc<ServerKeyPair>,
registry: &ServerRegistry,
http_client: &reqwest::Client,
event_manager: &EventManager,
handshake: &HandshakePacket,
) -> io::Result<()> {
let Some(protocol_version) = ProtocolVersion::from_protocol(handshake.protocol_version) else {
let msg = format!(
"Unsupported protocol version {}. Supported: {}",
handshake.protocol_version,
ProtocolVersion::SUPPORTED_VERSIONS
);
disconnect_login(&mut conn, &msg).await?;
return Ok(());
};
let Some(frame) = conn.read_frame().await? else {
return Ok(());
};
let mut cursor = &frame[..];
let packet_id = codec::read_packet_id(&mut cursor).map_err(protocol_err)?;
if packet_id != LoginStartPacket::PACKET_ID {
warn!(packet_id, "expected login start packet");
return Ok(());
}
let login_start = LoginStartPacket::decode(&mut cursor).map_err(protocol_err)?;
info!(username = %login_start.username, uuid = %login_start.uuid, "player login");
let profile = if config.online_mode {
let mut verify_token = [0u8; 4];
rand::Fill::fill(&mut verify_token, &mut rand::rng());
conn.write_packet(&EncryptionRequestPacket {
server_id: String::new(),
public_key: key_pair.public_key_der().to_vec(),
verify_token: verify_token.to_vec(),
should_authenticate: true,
})
.await?;
let Some(frame) = conn.read_frame().await? else {
return Ok(());
};
let mut cursor = &frame[..];
let packet_id = codec::read_packet_id(&mut cursor).map_err(protocol_err)?;
if packet_id != EncryptionResponsePacket::PACKET_ID {
warn!(packet_id, "expected encryption response packet");
return Ok(());
}
let enc_response = EncryptionResponsePacket::decode(&mut cursor).map_err(protocol_err)?;
let (shared_secret, decrypted_verify) = tokio::task::spawn_blocking({
let key_pair = Arc::clone(&key_pair);
let ciphertext_secret = enc_response.shared_secret.clone();
let ciphertext_verify = enc_response.verify_token.clone();
move || -> io::Result<_> {
let ss = key_pair.decrypt(&ciphertext_secret).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("RSA decrypt failed: {e}"),
)
})?;
let vt = key_pair.decrypt(&ciphertext_verify).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("RSA decrypt failed: {e}"),
)
})?;
Ok((ss, vt))
}
})
.await
.map_err(io::Error::other)??;
if decrypted_verify != verify_token {
disconnect_login(&mut conn, "Verify token mismatch").await?;
return Ok(());
}
let server_id =
crate::crypto::generate_server_id(&shared_secret, key_pair.public_key_der());
let profile =
match auth::verify_player(http_client, &login_start.username, &server_id).await {
Ok(profile) => profile,
Err(auth::AuthError::NotAuthenticated) => {
disconnect_login(&mut conn, "Failed to verify username!").await?;
return Ok(());
}
Err(e) => {
warn!(error = %e, "Mojang authentication failed");
disconnect_login(
&mut conn,
"Authentication servers are down. Please try again later.",
)
.await?;
return Ok(());
}
};
conn.enable_encryption(&shared_secret);
debug!(username = %profile.name, uuid = %profile.id, "authenticated with Mojang");
profile
} else {
GameProfile {
id: login_start.uuid,
name: login_start.username.clone(),
properties: vec![],
}
};
let player_info = PlayerInfo {
profile: profile.clone(),
remote_addr: client_addr.ip().to_string(),
protocol_version: handshake.protocol_version,
};
let login_event = event_manager.fire(LoginEvent::new(player_info.clone()));
if let LoginResult::Deny(reason) = login_event.result() {
disconnect_login(&mut conn, reason).await?;
return Ok(());
}
if config.compression_threshold >= 0 {
conn.set_compression(config.compression_threshold).await?;
}
conn.write_packet(&LoginSuccessPacket::from_profile(&profile))
.await?;
let Some(frame) = conn.read_frame().await? else {
return Ok(());
};
let mut cursor = &frame[..];
let packet_id = codec::read_packet_id(&mut cursor).map_err(protocol_err)?;
if packet_id != LoginAcknowledgedPacket::PACKET_ID {
warn!(packet_id, "expected login acknowledged packet");
return Ok(());
}
debug!(username = %profile.name, "login acknowledged, entering config state");
let choose_event = event_manager.fire(ChooseServerEvent::new(player_info.clone()));
let server_id = match choose_event.result() {
ChooseServerResult::Override(id) => Some(id.clone()),
ChooseServerResult::Default => registry.select_initial().map(|s| s.id),
};
let Some(server_id) = server_id else {
disconnect_config(&mut conn, "No available servers. Please try again later.").await?;
return Ok(());
};
let Some(server) = registry.get(&server_id) else {
disconnect_config(
&mut conn,
&format!("Server '{server_id}' not found in registry."),
)
.await?;
return Ok(());
};
info!(
username = %profile.name,
server = %server.id,
addr = %server.addr,
"connecting to backend"
);
let mut backend_conn = match backend::connect_and_login(
&server.addr,
&profile,
&player_info,
handshake.protocol_version,
config,
)
.await
{
Ok(c) => c,
Err(e) => {
warn!(error = %e, server = %server.id, "failed to connect to backend");
disconnect_config(&mut conn, "Failed to connect to backend server.").await?;
return Ok(());
}
};
let mut current_server_id = server.id.clone();
event_manager.fire(ServerConnectedEvent::new(
player_info.clone(),
current_server_id.clone(),
));
let play_ids = PlayPacketIds::for_version(protocol_version);
loop {
if !relay_config(&mut conn, &mut backend_conn).await? {
break;
}
match relay_play(
&mut conn,
&mut backend_conn,
&play_ids,
&player_info,
¤t_server_id,
registry,
event_manager,
)
.await?
{
PlayOutcome::Disconnected => break,
PlayOutcome::ServerList => {
let servers = registry.list();
let list: Vec<&str> = servers.iter().map(|s| s.id.as_str()).collect();
let msg = format!(
"You are on: {current_server_id}. Available: {}",
list.join(", ")
);
send_system_message(&mut conn, &msg, &play_ids).await?;
match relay_play(
&mut conn,
&mut backend_conn,
&play_ids,
&player_info,
¤t_server_id,
registry,
event_manager,
)
.await?
{
PlayOutcome::Disconnected => break,
PlayOutcome::ServerList => {
break;
}
PlayOutcome::SwitchServer(target) => {
if !do_server_switch(
&mut conn,
&mut backend_conn,
&target,
&mut current_server_id,
&profile,
&player_info,
handshake,
config,
&play_ids,
registry,
event_manager,
)
.await?
{
break;
}
}
}
}
PlayOutcome::SwitchServer(target) => {
if !do_server_switch(
&mut conn,
&mut backend_conn,
&target,
&mut current_server_id,
&profile,
&player_info,
handshake,
config,
&play_ids,
registry,
event_manager,
)
.await?
{
break;
}
}
}
}
event_manager.fire(DisconnectEvent::new(player_info));
info!(username = %profile.name, "player disconnected");
Ok(())
}
#[allow(clippy::too_many_arguments)]
#[expect(
clippy::large_futures,
reason = "Server switching reuses both buffered connections during handshake and replay"
)]
async fn do_server_switch(
conn: &mut MinecraftConnection,
backend_conn: &mut MinecraftConnection,
target_server_id: &str,
current_server_id: &mut String,
profile: &GameProfile,
player_info: &PlayerInfo,
handshake: &HandshakePacket,
config: &Config,
play_ids: &PlayPacketIds,
registry: &ServerRegistry,
event_manager: &EventManager,
) -> io::Result<bool> {
let Some(target) = registry.get(target_server_id) else {
send_system_message(
conn,
&format!("Server '{target_server_id}' not found."),
play_ids,
)
.await?;
return Ok(false); };
let switch_event = event_manager.fire(ServerSwitchEvent::new(
player_info.clone(),
current_server_id.clone(),
target_server_id.to_string(),
));
if let ServerSwitchResult::Deny(reason) = switch_event.result() {
send_system_message(conn, reason, play_ids).await?;
return Ok(false);
}
info!(
username = %profile.name,
from = %current_server_id,
to = %target.id,
"switching server"
);
let new_backend = match backend::connect_and_login(
&target.addr,
profile,
player_info,
handshake.protocol_version,
config,
)
.await
{
Ok(c) => c,
Err(e) => {
warn!(error = %e, server = %target.id, "failed to connect to new backend");
send_system_message(
conn,
&format!("Failed to connect to '{}'.", target.id),
play_ids,
)
.await?;
return Ok(false);
}
};
*backend_conn = new_backend;
send_start_configuration(conn, play_ids).await?;
if !wait_for_config_ack(conn, play_ids).await? {
return Ok(false);
}
*current_server_id = target.id;
event_manager.fire(ServerConnectedEvent::new(
player_info.clone(),
current_server_id.clone(),
));
Ok(true)
}
enum PlayOutcome {
Disconnected,
SwitchServer(String),
ServerList,
}
async fn relay_config(
client: &mut MinecraftConnection,
backend: &mut MinecraftConnection,
) -> io::Result<bool> {
let mut backend_finished = false;
let mut client_finished = false;
loop {
if backend_finished && client_finished {
return Ok(true);
}
tokio::select! {
backend_frame = backend.read_frame(), if !backend_finished => {
let Some(frame) = backend_frame? else {
return Ok(false);
};
let packet_id = peek_packet_id(&frame);
client.write_raw_packet(&frame).await?;
if packet_id == Some(ConfigPacketIds::FINISHED_CONFIGURATION_CLIENTBOUND) {
backend_finished = true;
}
}
client_frame = client.read_frame(), if !client_finished => {
let Some(frame) = client_frame? else {
return Ok(false);
};
let packet_id = peek_packet_id(&frame);
backend.write_raw_packet(&frame).await?;
if packet_id == Some(ConfigPacketIds::FINISHED_CONFIGURATION_SERVERBOUND) {
client_finished = true;
}
}
}
}
}
async fn relay_play(
client: &mut MinecraftConnection,
backend: &mut MinecraftConnection,
play_ids: &PlayPacketIds,
_player_info: &PlayerInfo,
_current_server_id: &str,
_registry: &ServerRegistry,
_event_manager: &EventManager,
) -> io::Result<PlayOutcome> {
loop {
tokio::select! {
client_frame = client.read_frame() => {
let Some(frame) = client_frame? else {
return Ok(PlayOutcome::Disconnected);
};
if let Some(command) = try_extract_command(
&frame, play_ids.unsigned_command, play_ids.signed_command,
) {
if let Some(target) = command.strip_prefix("server ") {
let target = target.trim().to_string();
if !target.is_empty() {
return Ok(PlayOutcome::SwitchServer(target));
}
}
if command == "server" {
return Ok(PlayOutcome::ServerList);
}
}
backend.write_raw_packet(&frame).await?;
}
backend_frame = backend.read_frame() => {
let Some(frame) = backend_frame? else {
return Ok(PlayOutcome::Disconnected);
};
client.write_raw_packet(&frame).await?;
}
}
}
}
fn try_extract_command(frame: &[u8], unsigned_cmd_id: i32, signed_cmd_id: i32) -> Option<String> {
let first = *frame.first()?;
if first & 0x80 != 0 {
return None; }
let packet_id = i32::from(first);
if packet_id == unsigned_cmd_id {
parse_command_from_unsigned(&frame[1..])
} else if packet_id == signed_cmd_id {
parse_command_from_signed(&frame[1..])
} else {
None
}
}
fn peek_packet_id(frame: &[u8]) -> Option<i32> {
varint::peek_var_int(frame).ok()?.map(|(id, _)| id)
}
async fn send_start_configuration(
conn: &mut MinecraftConnection,
play_ids: &PlayPacketIds,
) -> io::Result<()> {
let packet_data = codec::encode_packet_data(play_ids.start_configuration, |_buf| {});
conn.write_raw_packet(&packet_data).await
}
#[expect(
clippy::large_futures,
reason = "Acknowledgement waiting retains the buffered connection across reads"
)]
async fn wait_for_config_ack(
conn: &mut MinecraftConnection,
play_ids: &PlayPacketIds,
) -> io::Result<bool> {
loop {
let Some(frame) = conn.read_frame().await? else {
return Ok(false);
};
if peek_packet_id(&frame) == Some(play_ids.acknowledge_configuration) {
return Ok(true);
}
}
}
async fn send_system_message(
conn: &mut MinecraftConnection,
message: &str,
play_ids: &PlayPacketIds,
) -> io::Result<()> {
let packet_data = codec::encode_packet_data(play_ids.system_chat, |buf| {
deepslate_protocol::types::write_nbt_text_component(buf, message, Some("yellow"));
buf.put_u8(0); });
conn.write_raw_packet(&packet_data).await
}
async fn disconnect_login(conn: &mut MinecraftConnection, reason: &str) -> io::Result<()> {
let json_reason = serde_json::json!({"text": reason}).to_string();
conn.write_packet(&LoginDisconnectPacket {
reason: json_reason,
})
.await?;
conn.shutdown().await
}
async fn disconnect_config(conn: &mut MinecraftConnection, reason: &str) -> io::Result<()> {
let packet_data = codec::encode_packet_data(ConfigPacketIds::DISCONNECT, |buf| {
deepslate_protocol::types::write_nbt_text_component(buf, reason, None);
});
conn.write_raw_packet(&packet_data).await?;
conn.shutdown().await
}
fn protocol_err(e: deepslate_protocol::types::ProtocolError) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, e)
}