Skip to main content

azalea_reflection_proxy/
session.rs

1//! Phase 2: the replicator. One upstream session shared by many local
2//! clients — the azalea equivalent of the original's replicator plugin.
3//!
4//! Clientbound traffic is broadcast to every attached client; only the
5//! controller's serverbound traffic reaches the target server. Viewers'
6//! serverbound frames (keepalive replies, teleport confirms, everything)
7//! are swallowed here — the server only ever hears the controller.
8//!
9//! Everything runs through one actor task that owns all mutable state;
10//! upstream and client sockets talk to it over channels, so there are no
11//! locks on the packet path.
12//!
//! Mid-session joins need the config-state registry data the server only
//! sent once, so the session keeps a JoinCache: config frames, the game
//! Login packet, the last teleport, the raw chunk frames, and a
//! WorldSnapshot of everything else world-shaped (entities, players,
//! scoreboards, inventory, vitals). A late viewer is replayed all of it
//! in vanilla join order, then receives live traffic from there on.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use azalea_protocol::connect::Connection;
24use azalea_protocol::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
25use eyre::Result;
26use tokio::sync::{broadcast, mpsc};
27
28use uuid::Uuid;
29
30use crate::ProxyEvent;
31
/// Behavior knobs forwarded from the builder.
///
/// Plain data: cheap to clone, and printable with `{:?}` so the options
/// can be included in logs and in other debuggable types.
#[derive(Clone, Debug)]
pub struct SessionOpts {
    /// Refuse attaches beyond this many simultaneous clients.
    /// `None` means no limit is configured.
    pub max_clients: Option<usize>,
    /// When the controller disconnects, promote the oldest live client
    /// instead of going controllerless (the original's
    /// `alwaysFirstControl`).
    pub always_first_control: bool,
}
42
43use crate::ids;
44use crate::local_server::LocalClient;
45use crate::plugin::{Frame, Pipeline};
46use crate::reflect::{self, BotPose};
47use crate::relay::{AzaleaFrameSink, AzaleaFrameSource, FrameSink, FrameSource};
48use crate::upstream::Upstream;
49
/// Key under which the session tracks one attached local client.
pub type ClientId = u32;
51
/// Everything the session actor reacts to. The socket tasks and the
/// heartbeat timer only ever talk to the session through these messages.
pub enum SessionMsg {
    /// A frame read from the upstream (target server) connection.
    FromUpstream(Frame),
    /// The upstream reader hit an error; carries the formatted error.
    /// Receiving this ends the session.
    UpstreamClosed(String),
    /// A frame read from the local client with the given id.
    FromClient(ClientId, Frame),
    /// A logged-in local client wants to join the session as a viewer.
    Attach {
        id: ClientId,
        /// Queue feeding the client's socket writer task.
        tx: mpsc::Sender<Frame>,
        username: String,
        uuid: Uuid,
    },
    /// The client's reader task ended (read error / disconnect).
    Detach(ClientId),
    /// Once-a-second timer: while controllerless, the stand-in must
    /// report the player's position like an idle client would.
    StandInTick,
}
67
/// Where an attached client is in the join sequence.
enum ClientState {
    /// Attached before the session reached game state; replay starts once
    /// the Login packet is cached.
    Parked,
    /// Config replay sent; waiting for the client's serverbound
    /// FinishConfiguration ack.
    Joining,
    /// Receiving live broadcast.
    Live,
}
78
/// Per-client bookkeeping owned by the session actor.
struct ClientHandle {
    /// Queue feeding this client's socket writer task.
    tx: mpsc::Sender<Frame>,
    /// Join progress; see `ClientState`.
    state: ClientState,
    /// Name the local client logged in with.
    username: String,
    /// UUID the local client logged in with.
    uuid: Uuid,
    /// Swallow the accept for a proxy-synthesized handoff teleport so it
    /// never reaches the server.
    swallow_next_accept: bool,
    /// `,spectate` camera lock currently active for this viewer.
    camera_on: bool,
}
90
/// Join cache: config replay + world state a late viewer needs. Chunks
/// are cached raw, keyed by coordinates parsed from the frame body —
/// the vanilla client refuses to leave "Loading terrain..." until the
/// chunk under its feet loads, so chunk replay is a join requirement,
/// not a nicety. Everything else world-shaped (entities, players,
/// scoreboards, inventory, vitals) lives in the WorldSnapshot.
#[derive(Default)]
struct JoinCache {
    /// Clientbound config-state frames in arrival order (keepalives,
    /// pings and the finish frame are not stored).
    config_frames: Vec<Frame>,
    /// Most recent game Login frame.
    login: Option<Frame>,
    /// Most recent PlayerPosition (server teleport) frame.
    last_position: Option<Frame>,
    /// Most recent Respawn frame.
    respawn: Option<Frame>,
    /// Most recent SetDefaultSpawnPosition frame.
    spawn_pos: Option<Frame>,
    /// Most recent SetChunkCacheCenter frame.
    chunk_center: Option<Frame>,
    /// Most recent SetChunkCacheRadius frame.
    chunk_radius: Option<Frame>,
    /// Loaded chunks, keyed by the coordinates `ids::chunk_key` parses
    /// out of the frame body; entries leave on ForgetLevelChunk.
    chunks: HashMap<(i32, i32), Frame>,
    /// Snapshot fed every game-state frame; replayed after the chunks.
    world: crate::snapshot::WorldSnapshot,
}
109
110impl JoinCache {
111    /// The dimension changed: everything tied to the old world is stale.
112    fn on_respawn(&mut self, respawn: Frame) {
113        self.respawn = Some(respawn);
114        self.last_position = None;
115        self.spawn_pos = None;
116        self.chunk_center = None;
117        self.chunks.clear();
118        self.world.on_respawn();
119    }
120
121    /// The game-state frames to replay at a viewer that just entered the
122    /// game state, in vanilla join order: identity, position, the
123    /// chunk-loading handshake, then the world snapshot.
124    fn join_frames(&self) -> Vec<Frame> {
125        let mut q = Vec::with_capacity(self.chunks.len() + 32);
126        q.extend(self.login.iter().cloned());
127        q.extend(self.respawn.iter().cloned());
128        q.extend(self.spawn_pos.iter().cloned());
129        q.extend(self.last_position.iter().cloned());
130        q.push(ids::wait_for_chunks_frame());
131        q.extend(self.chunk_radius.iter().cloned());
132        q.extend(self.chunk_center.iter().cloned());
133        q.extend(self.chunks.values().cloned());
134        q.extend(self.world.replay());
135        q
136    }
137}
138
/// Protocol state of the upstream connection as tracked by the session.
enum UpstreamState {
    /// Configuration phase; left when the config Finish frame arrives.
    Config,
    /// Game phase; left again when the server sends StartConfiguration.
    Game,
}
143
/// All mutable state of one replicated session. Owned by the actor task
/// running `Session::run`; nothing else touches it.
struct Session {
    /// Plugin pipeline every clientbound and controller serverbound
    /// frame passes through.
    pipeline: Arc<Pipeline>,
    /// Queue feeding the upstream socket writer task.
    upstream_tx: mpsc::Sender<Frame>,
    /// Every attached local client, controller included.
    clients: HashMap<ClientId, ClientHandle>,
    /// Whoever's serverbound traffic reaches the server. None = nobody:
    /// the proxy answers keepalives/teleports itself and the session
    /// player stands AFK.
    controller: Option<ClientId>,
    /// Replay material for viewers that join mid-session.
    cache: JoinCache,
    /// Current protocol phase of the upstream connection.
    upstream_state: UpstreamState,
    /// Set once the first frame after entering game state was seen; used
    /// to warn if that frame is not Login.
    seen_first_game_frame: bool,
    /// The real account's identity, for the reflected entity viewers see.
    bot_uuid: Uuid,
    bot_name: String,
    /// Last known pose of the session player, updated from server
    /// teleports and the controller's movement.
    pose: BotPose,
    /// The session player's actual game mode (from Login / game events),
    /// restored to a client when it acquires control.
    real_game_mode: u8,
    /// Last clientbound abilities frame, replayed to a new controller.
    abilities: Option<Frame>,
    /// Entities were wiped client-side (login/respawn); the reflected
    /// entity must be re-spawned at the next known pose.
    respawn_entity_pending: bool,
    /// Viewers normally don't get the session's PlayerPosition frames
    /// (their camera is free), but after a dimension change they need
    /// exactly one to land in the new world.
    forward_next_position: bool,
    /// The session player's entity id from the Login packet — a
    /// viewer's own client entity, used to detach `,spectate` cameras.
    real_player_id: Option<i32>,
    /// Behavior knobs handed to `spawn`.
    opts: SessionOpts,
    /// Outbound notifications (session/client/control changes).
    events: broadcast::Sender<ProxyEvent>,
}
177
178/// Start a session: the controller is already logged in locally, the
179/// upstream leg is established. Returns the handle new viewers attach
180/// through; when it reports closed, the session is dead and the next
181/// connection becomes a fresh controller.
182pub fn spawn(
183    upstream: Upstream,
184    controller: LocalClient,
185    controller_id: ClientId,
186    pipeline: Arc<Pipeline>,
187    opts: SessionOpts,
188    events: broadcast::Sender<ProxyEvent>,
189) -> mpsc::Sender<SessionMsg> {
190    tracing::info!(
191        "session start: controller '{}', upstream compression threshold {:?}",
192        controller.username,
193        upstream.compression_threshold
194    );
195    let bot_uuid = upstream.profile.id;
196    let bot_name = upstream.profile.name.clone();
197
198    for p in &pipeline.plugins {
199        p.on_session_start();
200    }
201
202    let (msg_tx, msg_rx) = mpsc::channel::<SessionMsg>(1024);
203    let upstream_tx = start_upstream_io(upstream, msg_tx.clone());
204
205    let (ctl_tx, ctl_rx) = mpsc::channel::<Frame>(4096);
206    let mut clients = HashMap::new();
207    clients.insert(
208        controller_id,
209        ClientHandle {
210            tx: ctl_tx,
211            state: ClientState::Live,
212            username: controller.username.clone(),
213            uuid: controller.uuid,
214            swallow_next_accept: false,
215            camera_on: false,
216        },
217    );
218    start_client_io(controller_id, controller.connection, msg_tx.clone(), ctl_rx);
219
220    let _ = events.send(ProxyEvent::SessionStarted);
221    let _ = events.send(ProxyEvent::ClientJoined {
222        id: controller_id,
223        username: controller.username.clone(),
224    });
225    let _ = events.send(ProxyEvent::ControlChanged {
226        controller: Some((controller_id, controller.username.clone())),
227    });
228
229    let session = Session {
230        pipeline,
231        upstream_tx,
232        clients,
233        controller: Some(controller_id),
234        cache: JoinCache::default(),
235        upstream_state: UpstreamState::Config,
236        seen_first_game_frame: false,
237        bot_uuid,
238        bot_name,
239        pose: BotPose::default(),
240        real_game_mode: 0,
241        abilities: None,
242        respawn_entity_pending: false,
243        forward_next_position: false,
244        real_player_id: None,
245        opts,
246        events,
247    };
248    // drive the stand-in heartbeat; ends when the session drops msg_rx
249    let tick_tx = msg_tx.clone();
250    tokio::spawn(async move {
251        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(1));
252        loop {
253            ticker.tick().await;
254            if tick_tx.send(SessionMsg::StandInTick).await.is_err() {
255                break;
256            }
257        }
258    });
259
260    tokio::spawn(session.run(msg_rx));
261    msg_tx
262}
263
264/// Attach an already-logged-in local client to a running session as a
265/// viewer. The Attach message is sent before the reader task spawns so
266/// the session never sees a FromClient for an unknown id.
267pub async fn attach_viewer(
268    session_tx: &mpsc::Sender<SessionMsg>,
269    id: ClientId,
270    client: LocalClient,
271) -> Result<()> {
272    // Sized for the worst-case join replay: a render-distance-32 world
273    // is ~4200 cached chunk frames queued in one burst.
274    let (tx, rx) = mpsc::channel::<Frame>(8192);
275    session_tx
276        .send(SessionMsg::Attach {
277            id,
278            tx,
279            username: client.username.clone(),
280            uuid: client.uuid,
281        })
282        .await
283        .map_err(|_| eyre::eyre!("session closed while attaching"))?;
284    start_client_io(id, client.connection, session_tx.clone(), rx);
285    Ok(())
286}
287
288/// Game mode and player entity id of the session player, from the
289/// Login packet.
290fn login_info(f: &Frame) -> (Option<u8>, Option<i32>) {
291    use azalea_protocol::packets::ProtocolPacket;
292    use azalea_protocol::packets::game::ClientboundGamePacket;
293    use std::io::Cursor;
294    match ClientboundGamePacket::read(f.packet_id, &mut Cursor::new(&f.body[..])) {
295        Ok(ClientboundGamePacket::Login(l)) => {
296            (Some(l.common.game_type.to_id()), Some(l.player_id.0))
297        }
298        _ => (None, None),
299    }
300}
301
302fn start_upstream_io(upstream: Upstream, msg_tx: mpsc::Sender<SessionMsg>) -> mpsc::Sender<Frame> {
303    let (read, write) = upstream.connection.into_split_raw();
304    let (tx, mut rx) = mpsc::channel::<Frame>(1024);
305
306    tokio::spawn(async move {
307        let mut sink = AzaleaFrameSink { writer: write };
308        while let Some(f) = rx.recv().await {
309            if let Err(e) = sink.write_frame(f).await {
310                tracing::warn!("upstream write failed: {e:#}");
311                break;
312            }
313        }
314    });
315
316    tokio::spawn(async move {
317        let mut src = AzaleaFrameSource { reader: read };
318        loop {
319            match src.read_frame().await {
320                Ok(f) => {
321                    if msg_tx.send(SessionMsg::FromUpstream(f)).await.is_err() {
322                        break;
323                    }
324                }
325                Err(e) => {
326                    let _ = msg_tx
327                        .send(SessionMsg::UpstreamClosed(format!("{e:#}")))
328                        .await;
329                    break;
330                }
331            }
332        }
333    });
334
335    tx
336}
337
338fn start_client_io(
339    id: ClientId,
340    conn: Connection<ServerboundConfigPacket, ClientboundConfigPacket>,
341    msg_tx: mpsc::Sender<SessionMsg>,
342    mut frame_rx: mpsc::Receiver<Frame>,
343) {
344    let (read, write) = conn.into_split_raw();
345
346    tokio::spawn(async move {
347        let mut sink = AzaleaFrameSink { writer: write };
348        while let Some(f) = frame_rx.recv().await {
349            if let Err(e) = sink.write_frame(f).await {
350                tracing::debug!("client {id} write failed: {e:#}");
351                break;
352            }
353        }
354    });
355
356    tokio::spawn(async move {
357        let mut src = AzaleaFrameSource { reader: read };
358        loop {
359            match src.read_frame().await {
360                Ok(f) => {
361                    if msg_tx.send(SessionMsg::FromClient(id, f)).await.is_err() {
362                        break;
363                    }
364                }
365                Err(e) => {
366                    tracing::debug!("client {id} read ended: {e:#}");
367                    let _ = msg_tx.send(SessionMsg::Detach(id)).await;
368                    break;
369                }
370            }
371        }
372    });
373}
374
375impl Session {
376    async fn run(mut self, mut rx: mpsc::Receiver<SessionMsg>) {
377        while let Some(msg) = rx.recv().await {
378            match msg {
379                SessionMsg::FromUpstream(frame) => self.on_upstream_frame(frame).await,
380                SessionMsg::UpstreamClosed(reason) => {
381                    tracing::info!("upstream closed: {reason}");
382                    break;
383                }
384                SessionMsg::FromClient(id, frame) => {
385                    if let Err(e) = self.on_client_frame(id, frame).await {
386                        tracing::info!("session ending: {e:#}");
387                        break;
388                    }
389                }
390                SessionMsg::Attach {
391                    id,
392                    tx,
393                    username,
394                    uuid,
395                } => self.on_attach(id, tx, username, uuid),
396                SessionMsg::Detach(id) => self.drop_client(id, "disconnected"),
397                SessionMsg::StandInTick => self.stand_in_tick().await,
398            }
399            if self.clients.is_empty() {
400                tracing::info!("last client left; tearing session down");
401                break;
402            }
403        }
404        tracing::info!(
405            "session ended ({} client(s) still attached will be dropped)",
406            self.clients.len()
407        );
408        let _ = self.events.send(ProxyEvent::SessionEnded);
409    }
410
411    async fn on_upstream_frame(&mut self, frame: Frame) {
412        for f in self.pipeline.clientbound(frame) {
413            let id = f.packet_id;
414            self.observe_clientbound(&f);
415            self.stand_in(&f).await;
416            self.broadcast(f).await;
417            // Login/Respawn reset client game modes — re-spectator every
418            // viewer AFTER they processed the reset
419            if matches!(self.upstream_state, UpstreamState::Game)
420                && (id == ids::CB_GAME_LOGIN || id == ids::CB_GAME_RESPAWN)
421            {
422                self.reassert_spectators();
423            }
424        }
425    }
426
427    /// With no controller attached, the proxy must keep the session
428    /// alive itself: answer keepalives and confirm teleports. (Duplicate
429    /// replies are dangerous, so this runs ONLY when controllerless.)
430    async fn stand_in(&mut self, f: &Frame) {
431        if self.controller.is_some() {
432            return;
433        }
434        let reply = match self.upstream_state {
435            UpstreamState::Game => match f.packet_id {
436                ids::CB_GAME_KEEP_ALIVE => reflect::keepalive_id(f).map(reflect::keepalive_reply),
437                ids::CB_GAME_PLAYER_POSITION => {
438                    reflect::teleport_id(f).map(reflect::accept_teleport_frame)
439                }
440                _ => None,
441            },
442            UpstreamState::Config => match f.packet_id {
443                ids::CB_CONFIG_KEEP_ALIVE => {
444                    reflect::keepalive_id(f).map(reflect::config_keepalive_reply)
445                }
446                _ => None,
447            },
448        };
449        if let Some(r) = reply {
450            if self.upstream_tx.send(r).await.is_err() {
451                tracing::warn!("stand-in reply failed: upstream writer closed");
452            }
453        }
454    }
455
456    /// Controllerless position heartbeat. An idle vanilla client still
457    /// reports its position every second; without this, Hypixel treats
458    /// the silent movement stream as a broken connection ("Out of sync,
459    /// check your internet connection!") and dumps the player to Limbo
460    /// ~15s after the controller releases or disconnects.
461    async fn stand_in_tick(&mut self) {
462        if self.controller.is_some() || !matches!(self.upstream_state, UpstreamState::Game) {
463            return;
464        }
465        let Some(f) = reflect::idle_move_frame(&self.pose) else {
466            return; // pose unknown until the first teleport lands
467        };
468        if self.upstream_tx.send(f).await.is_err() {
469            tracing::warn!("stand-in heartbeat failed: upstream writer closed");
470        }
471    }
472
    /// Track upstream protocol state and maintain the join cache. Runs on
    /// post-pipeline frames, so the cache holds what clients actually saw.
    ///
    /// In config state every frame except keepalive/ping/finish is
    /// appended to the config replay; the finish frame switches to game
    /// state. In game state every frame is fed to the world snapshot and
    /// a handful of packet ids additionally update session fields.
    fn observe_clientbound(&mut self, f: &Frame) {
        match self.upstream_state {
            UpstreamState::Config => match f.packet_id {
                ids::CB_CONFIG_FINISH => {
                    // config phase over; re-arm the "first game frame" check
                    self.upstream_state = UpstreamState::Game;
                    self.seen_first_game_frame = false;
                }
                // never replay stale keepalives/pings to a joining viewer
                ids::CB_CONFIG_KEEP_ALIVE | ids::CB_CONFIG_PING => {}
                _ => self.cache.config_frames.push(f.clone()),
            },
            UpstreamState::Game => {
                if !self.seen_first_game_frame {
                    self.seen_first_game_frame = true;
                    if f.packet_id != ids::CB_GAME_LOGIN {
                        // runtime guard for the one id we can't pin in tests
                        tracing::warn!(
                            "first game-state frame has id {} but Login should be {} — \
                             ids.rs may be stale for this azalea version",
                            f.packet_id,
                            ids::CB_GAME_LOGIN
                        );
                    }
                }
                // the snapshot sees every game frame, including the ones
                // handled specially below
                self.cache.world.observe(f);
                match f.packet_id {
                    ids::CB_GAME_LOGIN => {
                        self.cache.login = Some(f.clone());
                        let (mode, pid) = login_info(f);
                        // an unparseable Login falls back to mode 0 / no id
                        self.real_game_mode = mode.unwrap_or(0);
                        self.real_player_id = pid;
                        // reconfiguration path: Live viewers' entities were
                        // wiped and they need the upcoming position
                        self.respawn_entity_pending = true;
                        self.forward_next_position = true;
                        // Login is now cached, so parked clients can start
                        self.flush_parked();
                    }
                    ids::CB_GAME_PLAYER_POSITION => {
                        self.cache.last_position = Some(f.clone());
                        reflect::apply_server_teleport(&mut self.pose, f);
                    }
                    ids::CB_GAME_RESPAWN => {
                        self.cache.on_respawn(f.clone());
                        // dimension change wipes entities and positions
                        self.pose.pos = None;
                        self.respawn_entity_pending = true;
                        self.forward_next_position = true;
                    }
                    ids::CB_GAME_PLAYER_ABILITIES => {
                        self.abilities = Some(f.clone());
                    }
                    ids::CB_GAME_GAME_EVENT => {
                        // event 3 = the session player's mode changed
                        // (body byte 0 is the event id; bytes 1..5 are read
                        // as a big-endian f32 and narrowed to u8)
                        if f.body.first() == Some(&3) {
                            if let Some(mode) = f.body.get(1..5).and_then(|b| {
                                b.try_into().ok().map(|a| f32::from_be_bytes(a) as u8)
                            }) {
                                self.real_game_mode = mode;
                            }
                        }
                    }
                    ids::CB_GAME_SET_DEFAULT_SPAWN_POSITION => {
                        self.cache.spawn_pos = Some(f.clone());
                    }
                    ids::CB_GAME_SET_CHUNK_CACHE_CENTER => {
                        self.cache.chunk_center = Some(f.clone());
                    }
                    ids::CB_GAME_SET_CHUNK_CACHE_RADIUS => {
                        self.cache.chunk_radius = Some(f.clone());
                    }
                    ids::CB_GAME_LEVEL_CHUNK_WITH_LIGHT => {
                        // frames whose key cannot be parsed are not cached
                        if let Some(key) = ids::chunk_key(&f.body) {
                            self.cache.chunks.insert(key, f.clone());
                        }
                    }
                    ids::CB_GAME_FORGET_LEVEL_CHUNK => {
                        if let Some(key) = ids::forget_chunk_key(&f.body) {
                            self.cache.chunks.remove(&key);
                        }
                    }
                    ids::CB_GAME_START_CONFIGURATION => {
                        // server is reconfiguring: every cached frame is
                        // stale. Live viewers follow the transition like
                        // the controller does (their acks are swallowed).
                        self.upstream_state = UpstreamState::Config;
                        self.cache = JoinCache::default();
                    }
                    _ => {}
                }
            }
        }
    }
567
    /// Controller frames go upstream (through the pipeline); viewer
    /// frames are swallowed except join acks. `,commands` work from
    /// anyone and never reach the server.
    ///
    /// Returns an error only when the upstream writer channel is closed
    /// (or a command handler fails); the caller ends the session on it.
    async fn on_client_frame(&mut self, id: ClientId, frame: Frame) -> Result<()> {
        // chat commands, from controller and viewers alike
        if matches!(self.upstream_state, UpstreamState::Game) {
            if let Some(text) = reflect::chat_text(&frame) {
                if text.starts_with(',') {
                    self.handle_command(id, text.trim()).await?;
                    return Ok(());
                }
            }
        }

        if Some(id) == self.controller {
            // swallow the accept for a proxy-issued handoff teleport
            if frame.packet_id == ids::SB_GAME_ACCEPT_TELEPORTATION
                && matches!(self.upstream_state, UpstreamState::Game)
            {
                if let Some(c) = self.clients.get_mut(&id) {
                    // only the accept whose id matches the synthesized
                    // handoff teleport is swallowed. A real server teleport
                    // can race the handoff; blindly eating the NEXT accept
                    // would forward the handoff id (garbage to the server)
                    // and drop the real confirm — instant desync.
                    if c.swallow_next_accept
                        && reflect::teleport_id(&frame) == Some(reflect::HANDOFF_TELEPORT_ID)
                    {
                        c.swallow_next_accept = false;
                        return Ok(());
                    }
                }
            }
            // mirror the bot's movement onto the reflected entity before
            // the frame moves on
            if reflect::apply_controller_move(&mut self.pose, &frame) {
                // a pending re-spawn is served by the first move that
                // leaves the pose with a known position; otherwise viewers
                // just get a movement update
                let update = if self.respawn_entity_pending && self.pose.pos.is_some() {
                    self.respawn_entity_pending = false;
                    reflect::spawn_frames(self.bot_uuid, &self.pose)
                } else {
                    reflect::move_frames(&self.pose)
                };
                self.send_to_viewers(&update);
            }
            for f in self.pipeline.serverbound(frame) {
                if self.upstream_tx.send(f).await.is_err() {
                    eyre::bail!("upstream writer closed");
                }
            }
            return Ok(());
        }

        // from here on the sender is not the controller: the only frame
        // acted on is a Joining client's config Finish ack
        let is_join_ack = matches!(
            self.clients.get(&id),
            Some(c) if matches!(c.state, ClientState::Joining)
        ) && frame.packet_id == ids::SB_CONFIG_FINISH;

        if is_join_ack {
            let mut queue = self.cache.join_frames();
            // spectator kit + the reflected bot (tab entry, then entity)
            let (uuid, name) = {
                let c = self.clients.get(&id).expect("checked above");
                (c.uuid, c.username.clone())
            };
            queue.extend(reflect::spectator_kit(uuid, &name));
            queue.push(reflect::bot_info_frame(self.bot_uuid, &self.bot_name));
            queue.extend(reflect::spawn_frames(self.bot_uuid, &self.pose));
            let c = self.clients.get_mut(&id).expect("checked above");
            // non-blocking sends: a client whose queue cannot take the
            // whole replay is dropped rather than stalling the actor
            let mut ok = true;
            for f in queue {
                if c.tx.try_send(f).is_err() {
                    ok = false;
                    break;
                }
            }
            if ok {
                c.state = ClientState::Live;
                tracing::info!("viewer {id} ('{}') is live", c.username);
            } else {
                self.drop_client(id, "queue overflow during join");
            }
        }
        Ok(())
    }
652
653    /// The `,command` set — port of the original's synchronization
654    /// plugin plus its command modules (acquire/release/spectate/
655    /// gamemode).
656    async fn handle_command(&mut self, id: ClientId, cmd: &str) -> Result<()> {
657        tracing::info!("client {id} issued command: {cmd}");
658        let (verb, arg) = match cmd.split_once(' ') {
659            Some((v, a)) => (v, a.trim()),
660            None => (cmd, ""),
661        };
662        match verb {
663            ",acquire" => {
664                if Some(id) == self.controller {
665                    self.feedback(id, "you already have control");
666                    return Ok(());
667                }
668                // demote whoever had it
669                if let Some(old) = self.controller.take() {
670                    self.demote_to_spectator(old);
671                    self.feedback(old, "your control was taken by another client");
672                }
673                self.promote_to_controller(id);
674                self.feedback(id, "you have control now");
675            }
676            ",release" => {
677                if Some(id) == self.controller {
678                    self.controller = None;
679                    self.demote_to_spectator(id);
680                    self.feedback(id, "control released; proxy is keeping the session alive");
681                    let _ = self.events.send(ProxyEvent::ControlChanged { controller: None });
682                } else {
683                    self.feedback(id, "you are not the controller");
684                }
685            }
686            ",spectate" => self.cmd_spectate(id, arg),
687            ",gamemode" | ",gm" => self.cmd_gamemode(id, arg),
688            _ => self.feedback(
689                id,
690                "commands: ,acquire ,release ,spectate [player] ,gamemode <0-3|name>",
691            ),
692        }
693        Ok(())
694    }
695
696    /// `,spectate [username]` — lock the camera to a player entity (no
697    /// arg = the reflected bot; repeat with no arg to detach).
698    fn cmd_spectate(&mut self, id: ClientId, arg: &str) {
699        if Some(id) == self.controller {
700            self.feedback(id, ",release first — the controller cannot spectate");
701            return;
702        }
703        let camera_on = match self.clients.get(&id) {
704            Some(c) => c.camera_on,
705            None => return,
706        };
707        let (target, turning_on) = if arg.is_empty() {
708            if camera_on {
709                // toggle off: back to the viewer's own client entity
710                let Some(own) = self.real_player_id else {
711                    self.feedback(id, "cannot detach camera yet (no login seen)");
712                    return;
713                };
714                (own, false)
715            } else {
716                (reflect::REFLECTED_ENTITY_ID, true)
717            }
718        } else if arg.eq_ignore_ascii_case(&self.bot_name) {
719            (reflect::REFLECTED_ENTITY_ID, true)
720        } else {
721            match self.cache.world.entity_id_for_player(arg) {
722                Some(eid) => (eid, true),
723                None => {
724                    self.feedback(id, "player not found (not in render distance?)");
725                    return;
726                }
727            }
728        };
729        if let Some(c) = self.clients.get_mut(&id) {
730            let _ = c.tx.try_send(reflect::camera_frame(target));
731            c.camera_on = turning_on;
732        }
733        self.feedback(
734            id,
735            if turning_on {
736                "camera attached — ,spectate again to detach"
737            } else {
738                "camera detached"
739            },
740        );
741    }
742
743    /// `,gamemode <0-3|name>` — client-side game mode for the issuing
744    /// viewer only (nothing reaches the server).
745    fn cmd_gamemode(&mut self, id: ClientId, arg: &str) {
746        if Some(id) == self.controller {
747            self.feedback(id, "not while controlling — it would desync your client");
748            return;
749        }
750        let mode = match arg.to_ascii_lowercase().as_str() {
751            "0" | "survival" => 0u8,
752            "1" | "creative" => 1,
753            "2" | "adventure" => 2,
754            "3" | "spectator" => 3,
755            _ => {
756                self.feedback(id, "usage: ,gamemode <survival|creative|adventure|spectator|0-3>");
757                return;
758            }
759        };
760        if let Some(c) = self.clients.get(&id) {
761            for f in reflect::gamemode_kit(c.uuid, &c.username, mode) {
762                let _ = c.tx.try_send(f);
763            }
764        }
765        self.feedback(id, "client-side game mode updated");
766    }
767
768    fn feedback(&mut self, id: ClientId, msg: &str) {
769        if let Some(c) = self.clients.get(&id) {
770            let _ = c.tx.try_send(reflect::system_chat_frame(msg));
771        }
772    }
773
774    /// Turn a client into a spectator: kit + the reflected bot entity.
775    fn demote_to_spectator(&mut self, id: ClientId) {
776        let Some(c) = self.clients.get(&id) else {
777            return;
778        };
779        let mut frames = reflect::spectator_kit(c.uuid, &c.username);
780        frames.push(reflect::bot_info_frame(self.bot_uuid, &self.bot_name));
781        frames.extend(reflect::spawn_frames(self.bot_uuid, &self.pose));
782        let c = self.clients.get(&id).expect("checked above");
783        for f in frames {
784            let _ = c.tx.try_send(f);
785        }
786    }
787
788    /// Turn a viewer into the controller: real game mode + abilities
789    /// back, ghost entity gone, client teleported onto the bot so its
790    /// movement continues from the right place (GrimAC-style alignment).
791    fn promote_to_controller(&mut self, id: ClientId) {
792        let Some(c) = self.clients.get(&id) else {
793            return;
794        };
795        let mut frames = reflect::controller_kit(c.uuid, &c.username, self.real_game_mode);
796        frames.extend(self.abilities.iter().cloned());
797        let teleport = reflect::handoff_teleport_frame(&self.pose);
798        let has_teleport = teleport.is_some();
799        frames.extend(teleport);
800        for f in frames {
801            let _ = c.tx.try_send(f);
802        }
803        if let Some(c) = self.clients.get_mut(&id) {
804            c.swallow_next_accept = has_teleport;
805            c.camera_on = false; // controller drives its own camera
806        }
807        self.controller = Some(id);
808        let username = self
809            .clients
810            .get(&id)
811            .map(|c| c.username.clone())
812            .unwrap_or_default();
813        let _ = self.events.send(ProxyEvent::ControlChanged {
814            controller: Some((id, username)),
815        });
816    }
817
818    /// Re-send the spectator kit to every Live viewer (after Login /
819    /// Respawn broadcasts, which reset client game modes).
820    fn reassert_spectators(&mut self) {
821        let viewers: Vec<(ClientId, Uuid, String)> = self
822            .clients
823            .iter()
824            .filter(|(&cid, c)| {
825                Some(cid) != self.controller && matches!(c.state, ClientState::Live)
826            })
827            .map(|(&cid, c)| (cid, c.uuid, c.username.clone()))
828            .collect();
829        for (cid, uuid, name) in viewers {
830            let kit = reflect::spectator_kit(uuid, &name);
831            if let Some(c) = self.clients.get(&cid) {
832                for f in kit {
833                    let _ = c.tx.try_send(f);
834                }
835            }
836        }
837    }
838
839    fn on_attach(&mut self, id: ClientId, tx: mpsc::Sender<Frame>, username: String, uuid: Uuid) {
840        if let Some(max) = self.opts.max_clients {
841            if self.clients.len() >= max {
842                tracing::info!("refusing viewer {id} ('{username}'): max_clients={max} reached");
843                // dropping tx closes the writer and the socket
844                return;
845            }
846        }
847        tracing::info!("viewer {id} ('{username}') attaching");
848        let _ = self.events.send(ProxyEvent::ClientJoined {
849            id,
850            username: username.clone(),
851        });
852        self.clients.insert(
853            id,
854            ClientHandle {
855                tx,
856                state: ClientState::Parked,
857                username,
858                uuid,
859                swallow_next_accept: false,
860                camera_on: false,
861            },
862        );
863        if self.cache.login.is_some() {
864            self.start_replay(id);
865        } else {
866            tracing::info!("viewer {id} parked until session reaches game state");
867        }
868    }
869
870    /// Queue the cached config frames + a synthesized FinishConfiguration
871    /// at a Parked viewer; it answers with the ack handled in
872    /// on_client_frame, which promotes it to Live.
873    fn start_replay(&mut self, id: ClientId) {
874        let mut frames = self.cache.config_frames.clone();
875        frames.push(ids::finish_config_frame());
876
877        let ok = {
878            let Some(c) = self.clients.get_mut(&id) else {
879                return;
880            };
881            let mut ok = true;
882            for f in frames {
883                if c.tx.try_send(f).is_err() {
884                    ok = false;
885                    break;
886                }
887            }
888            if ok {
889                c.state = ClientState::Joining;
890            }
891            ok
892        };
893        if !ok {
894            self.drop_client(id, "queue overflow during replay");
895        }
896    }
897
898    fn flush_parked(&mut self) {
899        let parked: Vec<ClientId> = self
900            .clients
901            .iter()
902            .filter(|(_, c)| matches!(c.state, ClientState::Parked))
903            .map(|(&id, _)| id)
904            .collect();
905        for id in parked {
906            self.start_replay(id);
907        }
908    }
909
910    /// Should a Live viewer receive this session-player frame? Viewers
911    /// are spectators with their own camera and game mode: the session's
912    /// teleports would yank their view to the bot, and its abilities /
913    /// game-mode changes would undo their spectator state. Only relevant
914    /// in the game state — config ids never reach these numbers.
915    fn viewers_receive(&mut self, f: &Frame) -> bool {
916        if !matches!(self.upstream_state, UpstreamState::Game) {
917            return true;
918        }
919        match f.packet_id {
920            ids::CB_GAME_PLAYER_POSITION => {
921                if self.forward_next_position {
922                    self.forward_next_position = false;
923                    true
924                } else {
925                    false
926                }
927            }
928            ids::CB_GAME_PLAYER_ABILITIES => false,
929            // GameEvent body starts with the event byte; 3 = ChangeGameMode
930            ids::CB_GAME_GAME_EVENT => f.body.first() != Some(&3),
931            _ => true,
932        }
933    }
934
935    /// Send a clientbound frame to every Live client. The controller gets
936    /// backpressure (awaited send — it must not lose frames); viewers get
937    /// try_send and are dropped if they can't keep up, so one laggy
938    /// spectator can never stall the real session.
939    async fn broadcast(&mut self, frame: Frame) {
940        let viewers_receive = self.viewers_receive(&frame);
941        let mut dead = Vec::new();
942        for (&id, c) in self.clients.iter() {
943            if !matches!(c.state, ClientState::Live) {
944                continue;
945            }
946            if Some(id) == self.controller {
947                if c.tx.send(frame.clone()).await.is_err() {
948                    dead.push(id);
949                }
950            } else if viewers_receive && c.tx.try_send(frame.clone()).is_err() {
951                dead.push(id);
952            }
953        }
954        for id in dead {
955            self.drop_client(id, "send failed or fell behind");
956        }
957    }
958
959    /// Push synthesized frames at every Live viewer (never the controller).
960    fn send_to_viewers(&mut self, frames: &[Frame]) {
961        let mut dead = Vec::new();
962        for (&id, c) in self.clients.iter() {
963            if Some(id) == self.controller || !matches!(c.state, ClientState::Live) {
964                continue;
965            }
966            for f in frames {
967                if c.tx.try_send(f.clone()).is_err() {
968                    dead.push(id);
969                    break;
970                }
971            }
972        }
973        for id in dead {
974            self.drop_client(id, "send failed or fell behind");
975        }
976    }
977
978    fn drop_client(&mut self, id: ClientId, reason: &str) {
979        if let Some(c) = self.clients.remove(&id) {
980            tracing::info!("client {id} ('{}') dropped: {reason}", c.username);
981            let _ = self.events.send(ProxyEvent::ClientLeft {
982                id,
983                username: c.username,
984            });
985            // dropping c.tx ends the writer task, which closes the socket
986        }
987        if self.controller == Some(id) {
988            self.controller = None;
989            if self.opts.always_first_control {
990                // the original's alwaysFirstControl: oldest live client
991                // inherits control immediately
992                let oldest = self
993                    .clients
994                    .iter()
995                    .filter(|(_, c)| matches!(c.state, ClientState::Live))
996                    .map(|(&cid, _)| cid)
997                    .min();
998                if let Some(next) = oldest {
999                    tracing::info!("controller left; promoting client {next} (always_first_control)");
1000                    self.promote_to_controller(next);
1001                    self.feedback(next, "previous controller left — you have control now");
1002                    return;
1003                }
1004            }
1005            // session survives controllerless: stand_in() takes over
1006            // keepalives/teleports until someone runs ,acquire
1007            tracing::info!("controller left; session is now controllerless (use ,acquire)");
1008            let _ = self.events.send(ProxyEvent::ControlChanged { controller: None });
1009        }
1010    }
1011}