use std::collections::HashMap;
use std::sync::Arc;
use azalea_protocol::connect::Connection;
use azalea_protocol::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
use eyre::Result;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::ids;
use crate::local_server::LocalClient;
use crate::plugin::{Frame, Pipeline};
use crate::reflect::{self, BotPose};
use crate::relay::{AzaleaFrameSink, AzaleaFrameSource, FrameSink, FrameSource};
use crate::upstream::Upstream;
/// Identifier for a client attached to a session; used as the key of the
/// session's client map and carried in every client-originated [`SessionMsg`].
pub type ClientId = u32;
/// Messages consumed by the session task's main loop.
///
/// The upstream and per-client reader tasks, plus `attach_viewer`, all send
/// into the same channel, so the session processes them one at a time.
pub enum SessionMsg {
/// A frame read from the upstream connection.
FromUpstream(Frame),
/// The upstream reader hit an error; carries the formatted error text.
/// The session loop stops when it receives this.
UpstreamClosed(String),
/// A frame read from the given client's connection.
FromClient(ClientId, Frame),
/// Registers a new client with the session.
Attach {
// Key under which the client is stored.
id: ClientId,
// Sender feeding that client's writer task.
tx: mpsc::Sender<Frame>,
// Name and UUID the client presented locally.
username: String,
uuid: Uuid,
},
/// The given client's reader ended; the session removes the client.
Detach(ClientId),
}
/// Where an attached client is in its join sequence.
enum ClientState {
/// Attached, but replay has not been started yet (no Login frame cached
/// at attach time); replay is started once a Login frame is observed.
Parked,
/// Cached config frames and the finish-config frame were queued; waiting
/// for the client's `SB_CONFIG_FINISH` before sending the join frames.
Joining,
/// Fully joined; only clients in this state receive broadcast frames.
Live,
}
/// Per-client bookkeeping held by the session.
struct ClientHandle {
// Outbound queue drained by the client's writer task.
tx: mpsc::Sender<Frame>,
// Progress through the join sequence.
state: ClientState,
// Identity supplied when the client attached; used for logging and for
// building the spectator / controller kits.
username: String,
uuid: Uuid,
// Set when a hand-off teleport was sent on promotion: the next
// SB_GAME_ACCEPT_TELEPORTATION this client sends as controller is dropped
// instead of being forwarded upstream.
swallow_next_accept: bool,
}
/// Upstream frames remembered so that a late-attaching viewer can be brought
/// up to the current state. Filled by `Session::observe_clientbound`.
#[derive(Default)]
struct JoinCache {
// Config-state frames seen from upstream, excluding finish, keep-alive
// and ping frames.
config_frames: Vec<Frame>,
// Most recent game Login frame.
login: Option<Frame>,
// Most recent player-position frame.
last_position: Option<Frame>,
// Most recent Respawn frame.
respawn: Option<Frame>,
// Most recent default-spawn-position frame.
spawn_pos: Option<Frame>,
// Most recent chunk-cache-center frame.
chunk_center: Option<Frame>,
// Most recent chunk-cache-radius frame (not cleared on respawn).
chunk_radius: Option<Frame>,
// Latest chunk frame per key returned by `ids::chunk_key`; entries are
// removed when upstream sends a forget-chunk frame.
chunks: HashMap<(i32, i32), Frame>,
// Additional state tracker that is shown every game-state frame and whose
// `replay()` output is appended to the join frames.
world: crate::snapshot::WorldSnapshot,
}
impl JoinCache {
    /// Records a Respawn frame and forgets everything tied to the previous
    /// world: position, spawn point, chunk centre and all cached chunks.
    /// The chunk radius and the Login frame are kept.
    fn on_respawn(&mut self, respawn: Frame) {
        self.chunks.clear();
        self.chunk_center = None;
        self.spawn_pos = None;
        self.last_position = None;
        self.respawn = Some(respawn);
        self.world.on_respawn();
    }

    /// Builds the frame sequence that is sent to a viewer once it has
    /// acknowledged configuration.
    ///
    /// Order: login, respawn, spawn position, last position, the
    /// wait-for-chunks frame, chunk radius, chunk centre, every cached chunk,
    /// and finally whatever the world snapshot replays. Absent frames are
    /// simply skipped.
    fn join_frames(&self) -> Vec<Frame> {
        let mut frames: Vec<Frame> = Vec::with_capacity(self.chunks.len() + 32);

        let before_chunks = self
            .login
            .iter()
            .chain(&self.respawn)
            .chain(&self.spawn_pos)
            .chain(&self.last_position);
        frames.extend(before_chunks.cloned());

        frames.push(ids::wait_for_chunks_frame());

        for slot in [&self.chunk_radius, &self.chunk_center] {
            if let Some(frame) = slot {
                frames.push(frame.clone());
            }
        }

        frames.extend(self.chunks.values().cloned());
        frames.extend(self.world.replay());
        frames
    }
}
/// Protocol state of the upstream connection as tracked by the session.
enum UpstreamState {
/// Configuration state; left when upstream sends `CB_CONFIG_FINISH`.
Config,
/// Game state; left again when upstream sends `CB_GAME_START_CONFIGURATION`.
Game,
}
/// State owned by the session task: one upstream connection shared by one
/// optional controller and any number of viewers.
struct Session {
// Plugin pipeline applied to clientbound and serverbound frames.
pipeline: Arc<Pipeline>,
// Queue drained by the upstream writer task.
upstream_tx: mpsc::Sender<Frame>,
// Every attached client, controller included.
clients: HashMap<ClientId, ClientHandle>,
// Client whose frames are forwarded upstream; while `None`, the session
// itself answers keep-alives and teleports (see `stand_in`).
controller: Option<ClientId>,
// Frames remembered for late joiners.
cache: JoinCache,
// Tracked protocol state of the upstream connection.
upstream_state: UpstreamState,
// Whether a game-state frame has been seen since the last config finish;
// only used to warn when the first one is not Login.
seen_first_game_frame: bool,
// Identity of the upstream profile.
bot_uuid: Uuid,
bot_name: String,
// Pose updated from server teleports and controller movement frames.
pose: BotPose,
// Game mode taken from the Login frame or from a game-event frame whose
// first body byte is 3; handed to the controller kit on promotion.
real_game_mode: u8,
// Most recent player-abilities frame; replayed to a newly promoted controller.
abilities: Option<Frame>,
// Set on Login/Respawn: the next controller move with a known position
// sends spawn frames to viewers instead of move frames.
respawn_entity_pending: bool,
// Set on Login/Respawn: lets exactly one player-position frame through to
// viewers; later ones are withheld from them.
forward_next_position: bool,
}
/// Starts a session for `upstream` with `controller` as its first client.
///
/// Notifies every plugin of the session start, spawns the upstream and
/// controller I/O tasks, registers the controller as an already-live client
/// and spawns the session loop. Returns the sender used to feed messages
/// (for example viewer attachments) into that loop.
pub fn spawn(
    upstream: Upstream,
    controller: LocalClient,
    controller_id: ClientId,
    pipeline: Arc<Pipeline>,
) -> mpsc::Sender<SessionMsg> {
    tracing::info!(
        "session start: controller '{}', upstream compression threshold {:?}",
        controller.username,
        upstream.compression_threshold
    );

    // Grab the profile identity before `upstream` is handed to the I/O tasks.
    let (bot_uuid, bot_name) = (upstream.profile.id, upstream.profile.name.clone());

    for plugin in &pipeline.plugins {
        plugin.on_session_start();
    }

    let (session_tx, session_rx) = mpsc::channel::<SessionMsg>(1024);
    let upstream_tx = start_upstream_io(upstream, session_tx.clone());

    let (controller_tx, controller_rx) = mpsc::channel::<Frame>(4096);
    let controller_handle = ClientHandle {
        tx: controller_tx,
        state: ClientState::Live,
        username: controller.username.clone(),
        uuid: controller.uuid,
        swallow_next_accept: false,
    };
    let clients = HashMap::from([(controller_id, controller_handle)]);
    start_client_io(
        controller_id,
        controller.connection,
        session_tx.clone(),
        controller_rx,
    );

    let session = Session {
        pipeline,
        upstream_tx,
        clients,
        controller: Some(controller_id),
        cache: JoinCache::default(),
        upstream_state: UpstreamState::Config,
        seen_first_game_frame: false,
        bot_uuid,
        bot_name,
        pose: BotPose::default(),
        real_game_mode: 0,
        abilities: None,
        respawn_entity_pending: false,
        forward_next_position: false,
    };
    tokio::spawn(session.run(session_rx));
    session_tx
}
/// Attaches `client` to a running session as a viewer.
///
/// The session is told about the client first; only if that succeeds are the
/// client's I/O tasks started.
///
/// # Errors
///
/// Fails when the session's message channel is already closed.
pub async fn attach_viewer(
    session_tx: &mpsc::Sender<SessionMsg>,
    id: ClientId,
    client: LocalClient,
) -> Result<()> {
    let (tx, rx) = mpsc::channel::<Frame>(8192);
    let attach = SessionMsg::Attach {
        id,
        tx,
        username: client.username.clone(),
        uuid: client.uuid,
    };
    if session_tx.send(attach).await.is_err() {
        return Err(eyre::eyre!("session closed while attaching"));
    }
    start_client_io(id, client.connection, session_tx.clone(), rx);
    Ok(())
}
/// Decodes `f` as a clientbound game packet and, if it is a Login packet,
/// returns the numeric id of its game type. Any decode failure or other
/// packet kind yields `None`.
fn login_game_mode(f: &Frame) -> Option<u8> {
    use azalea_protocol::packets::ProtocolPacket;
    use azalea_protocol::packets::game::ClientboundGamePacket;
    use std::io::Cursor;

    let mut body = Cursor::new(&f.body[..]);
    let packet = ClientboundGamePacket::read(f.packet_id, &mut body).ok()?;
    if let ClientboundGamePacket::Login(login) = packet {
        Some(login.common.game_type.to_id())
    } else {
        None
    }
}
/// Splits the upstream connection and spawns its two I/O tasks.
///
/// * The writer task drains the returned sender into the connection and
///   stops on the first write error or once every sender is gone.
/// * The reader task forwards each frame to the session as
///   `SessionMsg::FromUpstream`; on a read error it reports
///   `SessionMsg::UpstreamClosed` and stops. It also stops if the session
///   channel is closed.
fn start_upstream_io(upstream: Upstream, msg_tx: mpsc::Sender<SessionMsg>) -> mpsc::Sender<Frame> {
    let (read, write) = upstream.connection.into_split_raw();
    let (tx, mut rx) = mpsc::channel::<Frame>(1024);

    // Writer half.
    tokio::spawn(async move {
        let mut sink = AzaleaFrameSink { writer: write };
        loop {
            let Some(frame) = rx.recv().await else {
                break;
            };
            if let Err(e) = sink.write_frame(frame).await {
                tracing::warn!("upstream write failed: {e:#}");
                break;
            }
        }
    });

    // Reader half.
    tokio::spawn(async move {
        let mut src = AzaleaFrameSource { reader: read };
        loop {
            let msg = match src.read_frame().await {
                Ok(frame) => SessionMsg::FromUpstream(frame),
                Err(e) => SessionMsg::UpstreamClosed(format!("{e:#}")),
            };
            let is_final = matches!(msg, SessionMsg::UpstreamClosed(_));
            let session_gone = msg_tx.send(msg).await.is_err();
            if is_final || session_gone {
                break;
            }
        }
    });

    tx
}
/// Splits a local client connection and spawns its two I/O tasks.
///
/// * The writer task drains `frame_rx` into the connection until a write
///   fails or the queue's senders are gone.
/// * The reader task forwards each frame to the session as
///   `SessionMsg::FromClient`; when reading fails it sends
///   `SessionMsg::Detach` for this client and stops. It also stops silently
///   if the session channel is closed.
fn start_client_io(
    id: ClientId,
    conn: Connection<ServerboundConfigPacket, ClientboundConfigPacket>,
    msg_tx: mpsc::Sender<SessionMsg>,
    mut frame_rx: mpsc::Receiver<Frame>,
) {
    let (read, write) = conn.into_split_raw();

    // Writer half.
    tokio::spawn(async move {
        let mut sink = AzaleaFrameSink { writer: write };
        loop {
            let Some(frame) = frame_rx.recv().await else {
                break;
            };
            if let Err(e) = sink.write_frame(frame).await {
                tracing::debug!("client {id} write failed: {e:#}");
                break;
            }
        }
    });

    // Reader half.
    tokio::spawn(async move {
        let mut src = AzaleaFrameSource { reader: read };
        let e = loop {
            match src.read_frame().await {
                Ok(frame) => {
                    let forwarded = msg_tx.send(SessionMsg::FromClient(id, frame)).await;
                    if forwarded.is_err() {
                        // Session is gone; nobody left to notify.
                        return;
                    }
                }
                Err(e) => break e,
            }
        };
        tracing::debug!("client {id} read ended: {e:#}");
        let _ = msg_tx.send(SessionMsg::Detach(id)).await;
    });
}
impl Session {
/// Main loop of the session task.
///
/// Processes messages until the channel closes, upstream reports closure,
/// handling a client frame fails, or the last client has left.
async fn run(mut self, mut rx: mpsc::Receiver<SessionMsg>) {
    loop {
        let Some(msg) = rx.recv().await else {
            break;
        };
        match msg {
            SessionMsg::UpstreamClosed(reason) => {
                tracing::info!("upstream closed: {reason}");
                break;
            }
            SessionMsg::FromUpstream(frame) => self.on_upstream_frame(frame).await,
            SessionMsg::FromClient(id, frame) => {
                let outcome = self.on_client_frame(id, frame).await;
                if let Err(e) = outcome {
                    tracing::info!("session ending: {e:#}");
                    break;
                }
            }
            SessionMsg::Detach(id) => self.drop_client(id, "disconnected"),
            SessionMsg::Attach {
                id,
                tx,
                username,
                uuid,
            } => self.on_attach(id, tx, username, uuid),
        }
        // Checked after every message, whichever handler removed the client.
        if self.clients.is_empty() {
            tracing::info!("last client left; tearing session down");
            break;
        }
    }
    let remaining = self.clients.len();
    tracing::info!(
        "session ended ({} client(s) still attached will be dropped)",
        remaining
    );
}
/// Handles one frame from upstream.
///
/// The frame is run through the clientbound pipeline; each resulting frame
/// is observed (state/cache update), answered by the stand-in if there is no
/// controller, and broadcast. After a Login or Respawn frame in game state
/// the spectator kit is re-sent to all live viewers.
///
/// NOTE(review): the game-state check is made after `observe_clientbound`
/// may already have switched `upstream_state` for this very frame.
async fn on_upstream_frame(&mut self, frame: Frame) {
    for f in self.pipeline.clientbound(frame) {
        let packet_id = f.packet_id;
        self.observe_clientbound(&f);
        self.stand_in(&f).await;
        self.broadcast(f).await;

        let resets_world = packet_id == ids::CB_GAME_LOGIN || packet_id == ids::CB_GAME_RESPAWN;
        if resets_world && matches!(self.upstream_state, UpstreamState::Game) {
            self.reassert_spectators();
        }
    }
}
/// Answers upstream on behalf of a missing controller.
///
/// Does nothing while a controller is set. Otherwise replies to game and
/// config keep-alives and accepts game-state teleports, provided the
/// relevant id can be extracted from the frame. A failed send is only logged.
async fn stand_in(&mut self, f: &Frame) {
    if self.controller.is_some() {
        return;
    }

    let reply = match (&self.upstream_state, f.packet_id) {
        (UpstreamState::Game, ids::CB_GAME_KEEP_ALIVE) => {
            reflect::keepalive_id(f).map(reflect::keepalive_reply)
        }
        (UpstreamState::Game, ids::CB_GAME_PLAYER_POSITION) => {
            reflect::teleport_id(f).map(reflect::accept_teleport_frame)
        }
        (UpstreamState::Config, ids::CB_CONFIG_KEEP_ALIVE) => {
            reflect::keepalive_id(f).map(reflect::config_keepalive_reply)
        }
        _ => None,
    };

    let Some(reply) = reply else {
        return;
    };
    if self.upstream_tx.send(reply).await.is_err() {
        tracing::warn!("stand-in reply failed: upstream writer closed");
    }
}
fn observe_clientbound(&mut self, f: &Frame) {
match self.upstream_state {
UpstreamState::Config => match f.packet_id {
ids::CB_CONFIG_FINISH => {
self.upstream_state = UpstreamState::Game;
self.seen_first_game_frame = false;
}
ids::CB_CONFIG_KEEP_ALIVE | ids::CB_CONFIG_PING => {}
_ => self.cache.config_frames.push(f.clone()),
},
UpstreamState::Game => {
if !self.seen_first_game_frame {
self.seen_first_game_frame = true;
if f.packet_id != ids::CB_GAME_LOGIN {
tracing::warn!(
"first game-state frame has id {} but Login should be {} — \
ids.rs may be stale for this azalea version",
f.packet_id,
ids::CB_GAME_LOGIN
);
}
}
self.cache.world.observe(f);
match f.packet_id {
ids::CB_GAME_LOGIN => {
self.cache.login = Some(f.clone());
self.real_game_mode = login_game_mode(f).unwrap_or(0);
self.respawn_entity_pending = true;
self.forward_next_position = true;
self.flush_parked();
}
ids::CB_GAME_PLAYER_POSITION => {
self.cache.last_position = Some(f.clone());
reflect::apply_server_teleport(&mut self.pose, f);
}
ids::CB_GAME_RESPAWN => {
self.cache.on_respawn(f.clone());
self.pose.pos = None;
self.respawn_entity_pending = true;
self.forward_next_position = true;
}
ids::CB_GAME_PLAYER_ABILITIES => {
self.abilities = Some(f.clone());
}
ids::CB_GAME_GAME_EVENT => {
if f.body.first() == Some(&3) {
if let Some(mode) = f.body.get(1..5).and_then(|b| {
b.try_into().ok().map(|a| f32::from_be_bytes(a) as u8)
}) {
self.real_game_mode = mode;
}
}
}
ids::CB_GAME_SET_DEFAULT_SPAWN_POSITION => {
self.cache.spawn_pos = Some(f.clone());
}
ids::CB_GAME_SET_CHUNK_CACHE_CENTER => {
self.cache.chunk_center = Some(f.clone());
}
ids::CB_GAME_SET_CHUNK_CACHE_RADIUS => {
self.cache.chunk_radius = Some(f.clone());
}
ids::CB_GAME_LEVEL_CHUNK_WITH_LIGHT => {
if let Some(key) = ids::chunk_key(&f.body) {
self.cache.chunks.insert(key, f.clone());
}
}
ids::CB_GAME_FORGET_LEVEL_CHUNK => {
if let Some(key) = ids::forget_chunk_key(&f.body) {
self.cache.chunks.remove(&key);
}
}
ids::CB_GAME_START_CONFIGURATION => {
self.upstream_state = UpstreamState::Config;
self.cache = JoinCache::default();
}
_ => {}
}
}
}
}
/// Handles one frame sent by client `id`.
///
/// * In game state, chat text starting with `,` is treated as a proxy
///   command and never forwarded.
/// * Frames from the controller are forwarded upstream (see
///   `on_controller_frame`).
/// * From any other client only the config-finish acknowledgement of a
///   joining viewer has an effect; everything else is dropped.
///
/// # Errors
///
/// Fails when the upstream writer is closed, which ends the session.
async fn on_client_frame(&mut self, id: ClientId, frame: Frame) -> Result<()> {
    if matches!(self.upstream_state, UpstreamState::Game) {
        match reflect::chat_text(&frame) {
            Some(text) if text.starts_with(',') => {
                return self.handle_command(id, text.trim()).await;
            }
            _ => {}
        }
    }

    if self.controller == Some(id) {
        return self.on_controller_frame(id, frame).await;
    }

    if frame.packet_id == ids::SB_CONFIG_FINISH {
        self.finish_join(id);
    }
    Ok(())
}

/// Controller path: optionally swallows the teleport acknowledgement that
/// answers a hand-off teleport, mirrors movement to viewers, then sends the
/// frame through the serverbound pipeline to upstream.
async fn on_controller_frame(&mut self, id: ClientId, frame: Frame) -> Result<()> {
    let is_teleport_ack = frame.packet_id == ids::SB_GAME_ACCEPT_TELEPORTATION
        && matches!(self.upstream_state, UpstreamState::Game);
    if is_teleport_ack {
        if let Some(controller) = self.clients.get_mut(&id) {
            // Clears the flag; a set flag means this ack belongs to the
            // proxy-generated teleport and must not reach upstream.
            if std::mem::take(&mut controller.swallow_next_accept) {
                return Ok(());
            }
        }
    }

    if reflect::apply_controller_move(&mut self.pose, &frame) {
        let needs_spawn = self.respawn_entity_pending && self.pose.pos.is_some();
        let update = if needs_spawn {
            self.respawn_entity_pending = false;
            reflect::spawn_frames(self.bot_uuid, &self.pose)
        } else {
            reflect::move_frames(&self.pose)
        };
        self.send_to_viewers(&update);
    }

    for out in self.pipeline.serverbound(frame) {
        if self.upstream_tx.send(out).await.is_err() {
            eyre::bail!("upstream writer closed");
        }
    }
    Ok(())
}

/// Completes the join of a viewer that acknowledged configuration: queues
/// the cached join frames, its spectator kit, the bot's info and the bot's
/// spawn frames. The viewer becomes live if everything fit into its queue
/// and is dropped otherwise. Clients that are not in the joining state are
/// left untouched.
fn finish_join(&mut self, id: ClientId) {
    let Some(c) = self.clients.get_mut(&id) else {
        return;
    };
    if !matches!(c.state, ClientState::Joining) {
        return;
    }

    let mut queue = self.cache.join_frames();
    queue.extend(reflect::spectator_kit(c.uuid, &c.username));
    queue.push(reflect::bot_info_frame(self.bot_uuid, &self.bot_name));
    queue.extend(reflect::spawn_frames(self.bot_uuid, &self.pose));

    // Stops at the first frame that does not fit.
    let delivered = queue.into_iter().all(|f| c.tx.try_send(f).is_ok());
    if delivered {
        c.state = ClientState::Live;
        tracing::info!("viewer {id} ('{}') is live", c.username);
    } else {
        self.drop_client(id, "queue overflow during join");
    }
}
async fn handle_command(&mut self, id: ClientId, cmd: &str) -> Result<()> {
tracing::info!("client {id} issued command: {cmd}");
match cmd {
",acquire" => {
if Some(id) == self.controller {
self.feedback(id, "you already have control");
return Ok(());
}
if let Some(old) = self.controller.take() {
self.demote_to_spectator(old);
self.feedback(old, "your control was taken by another client");
}
self.promote_to_controller(id);
self.feedback(id, "you have control now");
}
",release" | ",spectate" => {
if Some(id) == self.controller {
self.controller = None;
self.demote_to_spectator(id);
self.feedback(id, "control released; proxy is keeping the session alive");
} else {
self.feedback(id, "you are not the controller");
}
}
_ => self.feedback(id, "commands: ,acquire ,release ,spectate"),
}
Ok(())
}
/// Queues a system chat message for client `id`, best effort: unknown
/// clients and full queues are silently ignored.
fn feedback(&mut self, id: ClientId, msg: &str) {
    let Some(client) = self.clients.get(&id) else {
        return;
    };
    let _ = client.tx.try_send(reflect::system_chat_frame(msg));
}
/// Sends client `id` what it needs to act as a spectator: its spectator
/// kit, the bot's info frame and the bot's spawn frames at the current pose.
/// Sending is best effort; an unknown id is ignored.
fn demote_to_spectator(&mut self, id: ClientId) {
    let Some(viewer) = self.clients.get(&id) else {
        return;
    };
    let kit = reflect::spectator_kit(viewer.uuid, &viewer.username);
    let bot_info = reflect::bot_info_frame(self.bot_uuid, &self.bot_name);
    let bot_entity = reflect::spawn_frames(self.bot_uuid, &self.pose);

    let frames = kit
        .into_iter()
        .chain(std::iter::once(bot_info))
        .chain(bot_entity);
    for frame in frames {
        let _ = viewer.tx.try_send(frame);
    }
}
/// Makes client `id` the controller.
///
/// Sends it the controller kit for the real game mode, the last abilities
/// frame (if any) and, when available, a hand-off teleport to the current
/// pose. If a teleport was sent, the client's next teleport acknowledgement
/// is marked to be swallowed. Sending is best effort. An unknown id leaves
/// the session unchanged.
fn promote_to_controller(&mut self, id: ClientId) {
    let Some(client) = self.clients.get_mut(&id) else {
        return;
    };

    let mut frames = reflect::controller_kit(client.uuid, &client.username, self.real_game_mode);
    frames.extend(self.abilities.iter().cloned());

    let teleport = reflect::handoff_teleport_frame(&self.pose);
    client.swallow_next_accept = teleport.is_some();
    frames.extend(teleport);

    for frame in frames {
        let _ = client.tx.try_send(frame);
    }
    self.controller = Some(id);
}
fn reassert_spectators(&mut self) {
let viewers: Vec<(ClientId, Uuid, String)> = self
.clients
.iter()
.filter(|(&cid, c)| {
Some(cid) != self.controller && matches!(c.state, ClientState::Live)
})
.map(|(&cid, c)| (cid, c.uuid, c.username.clone()))
.collect();
for (cid, uuid, name) in viewers {
let kit = reflect::spectator_kit(uuid, &name);
if let Some(c) = self.clients.get(&cid) {
for f in kit {
let _ = c.tx.try_send(f);
}
}
}
}
/// Registers a newly attached viewer in the parked state. If a Login frame
/// is already cached its replay starts immediately; otherwise it stays
/// parked until one arrives.
fn on_attach(&mut self, id: ClientId, tx: mpsc::Sender<Frame>, username: String, uuid: Uuid) {
    tracing::info!("viewer {id} ('{username}') attaching");
    let handle = ClientHandle {
        tx,
        state: ClientState::Parked,
        username,
        uuid,
        swallow_next_accept: false,
    };
    self.clients.insert(id, handle);

    if self.cache.login.is_none() {
        tracing::info!("viewer {id} parked until session reaches game state");
        return;
    }
    self.start_replay(id);
}
/// Queues the cached config frames followed by the finish-config frame for
/// client `id` and moves it to the joining state. If the client's queue
/// cannot take all of them the client is dropped. Unknown ids are ignored.
fn start_replay(&mut self, id: ClientId) {
    let Some(client) = self.clients.get_mut(&id) else {
        return;
    };

    // Stops at the first frame that does not fit.
    let delivered = self
        .cache
        .config_frames
        .iter()
        .cloned()
        .chain(std::iter::once(ids::finish_config_frame()))
        .all(|frame| client.tx.try_send(frame).is_ok());

    if delivered {
        client.state = ClientState::Joining;
    } else {
        self.drop_client(id, "queue overflow during replay");
    }
}
/// Starts the replay for every client that is still parked.
fn flush_parked(&mut self) {
    // Collected first because `start_replay` needs `&mut self`.
    let parked: Vec<ClientId> = self
        .clients
        .iter()
        .filter_map(|(&cid, c)| matches!(c.state, ClientState::Parked).then_some(cid))
        .collect();
    parked.into_iter().for_each(|cid| self.start_replay(cid));
}
/// Decides whether a clientbound frame should also go to viewers.
///
/// Outside game state everything is passed on. In game state:
/// * a player-position frame passes only if `forward_next_position` is set,
///   and passing it clears the flag;
/// * player-abilities frames never pass;
/// * game-event frames pass unless their first body byte is 3;
/// * all other frames pass.
fn viewers_receive(&mut self, f: &Frame) -> bool {
    if !matches!(self.upstream_state, UpstreamState::Game) {
        return true;
    }
    match f.packet_id {
        // Returns the old flag value and leaves the flag cleared.
        ids::CB_GAME_PLAYER_POSITION => std::mem::take(&mut self.forward_next_position),
        ids::CB_GAME_PLAYER_ABILITIES => false,
        ids::CB_GAME_GAME_EVENT => f.body.first() != Some(&3),
        _ => true,
    }
}
/// Delivers a clientbound frame to all live clients.
///
/// The controller always gets the frame and is waited on if its queue is
/// full. Viewers get it only when `viewers_receive` allows it, and without
/// waiting. Any client whose delivery fails is dropped afterwards.
async fn broadcast(&mut self, frame: Frame) {
    let viewers_receive = self.viewers_receive(&frame);
    let mut dead = Vec::new();

    for (&id, client) in self.clients.iter() {
        if !matches!(client.state, ClientState::Live) {
            continue;
        }
        let delivered = if Some(id) == self.controller {
            client.tx.send(frame.clone()).await.is_ok()
        } else if viewers_receive {
            client.tx.try_send(frame.clone()).is_ok()
        } else {
            // Frame withheld from viewers; nothing to deliver.
            true
        };
        if !delivered {
            dead.push(id);
        }
    }

    for id in dead {
        self.drop_client(id, "send failed or fell behind");
    }
}
/// Queues `frames` for every live client except the controller, without
/// waiting. A viewer whose queue rejects a frame gets no further frames from
/// this batch and is dropped afterwards.
fn send_to_viewers(&mut self, frames: &[Frame]) {
    let mut dead = Vec::new();
    for (&id, client) in &self.clients {
        let is_viewer =
            Some(id) != self.controller && matches!(client.state, ClientState::Live);
        // `any` stops at the first rejected frame.
        if is_viewer && frames.iter().any(|f| client.tx.try_send(f.clone()).is_err()) {
            dead.push(id);
        }
    }
    for id in dead {
        self.drop_client(id, "send failed or fell behind");
    }
}
/// Removes client `id` from the session and logs `reason`. If it was the
/// controller, the session becomes controllerless.
fn drop_client(&mut self, id: ClientId, reason: &str) {
    let was_controller = self.controller == Some(id);

    if let Some(gone) = self.clients.remove(&id) {
        tracing::info!("client {id} ('{}') dropped: {reason}", gone.username);
    }

    if was_controller {
        self.controller = None;
        tracing::info!("controller left; session is now controllerless (use ,acquire)");
    }
}
}