Skip to main content

azalea_reflection_proxy/
session.rs

1//! Phase 2: the replicator. One upstream session shared by many local
2//! clients — the azalea equivalent of the original's replicator plugin.
3//!
4//! Clientbound traffic is broadcast to every attached client; only the
5//! controller's serverbound traffic reaches the target server. Viewers'
6//! serverbound frames (keepalive replies, teleport confirms, everything)
7//! are swallowed here — the server only ever hears the controller.
8//!
9//! Everything runs through one actor task that owns all mutable state;
10//! upstream and client sockets talk to it over channels, so there are no
11//! locks on the packet path.
12//!
13//! Mid-session joins need the config-state registry data the server only
14//! sent once, so the session keeps a minimal JoinCache: config frames,
15//! the game Login packet, and the last teleport. That's just enough for
16//! a viewer to reach the game state — chunks/entities/inventory replay
17//! is Phase 3, so until then late viewers spawn over the void and see
18//! only live traffic from their join onward.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use azalea_protocol::connect::Connection;
24use azalea_protocol::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
25use eyre::Result;
26use tokio::sync::{broadcast, mpsc};
27
28use uuid::Uuid;
29
30use crate::ProxyEvent;
31
32/// Behavior knobs forwarded from the builder.
33#[derive(Clone)]
34pub struct SessionOpts {
35    /// Refuse attaches beyond this many simultaneous clients.
36    pub max_clients: Option<usize>,
37    /// When the controller disconnects, promote the oldest live client
38    /// instead of going controllerless (the original's
39    /// `alwaysFirstControl`).
40    pub always_first_control: bool,
41}
42
43use crate::ids;
44use crate::local_server::LocalClient;
45use crate::plugin::{Frame, Pipeline};
46use crate::reflect::{self, BotPose};
47use crate::relay::{AzaleaFrameSink, AzaleaFrameSource, FrameSink, FrameSource};
48use crate::upstream::Upstream;
49
50pub type ClientId = u32;
51
52pub enum SessionMsg {
53    FromUpstream(Frame),
54    UpstreamClosed(String),
55    FromClient(ClientId, Frame),
56    Attach {
57        id: ClientId,
58        tx: mpsc::Sender<Frame>,
59        username: String,
60        uuid: Uuid,
61    },
62    Detach(ClientId),
63}
64
65enum ClientState {
66    /// Attached before the session reached game state; replay starts once
67    /// the Login packet is cached.
68    Parked,
69    /// Config replay sent; waiting for the client's serverbound
70    /// FinishConfiguration ack.
71    Joining,
72    /// Receiving live broadcast.
73    Live,
74}
75
76struct ClientHandle {
77    tx: mpsc::Sender<Frame>,
78    state: ClientState,
79    username: String,
80    uuid: Uuid,
81    /// Swallow the accept for a proxy-synthesized handoff teleport so it
82    /// never reaches the server.
83    swallow_next_accept: bool,
84    /// `,spectate` camera lock currently active for this viewer.
85    camera_on: bool,
86}
87
88/// Join cache: config replay + world state a late viewer needs. Chunks
89/// are cached raw, keyed by coordinates parsed from the frame body —
90/// the vanilla client refuses to leave "Loading terrain..." until the
91/// chunk under its feet loads, so chunk replay is a join requirement,
92/// not a nicety. Everything else world-shaped (entities, players,
93/// scoreboards, inventory, vitals) lives in the WorldSnapshot.
94#[derive(Default)]
95struct JoinCache {
96    config_frames: Vec<Frame>,
97    login: Option<Frame>,
98    last_position: Option<Frame>,
99    respawn: Option<Frame>,
100    spawn_pos: Option<Frame>,
101    chunk_center: Option<Frame>,
102    chunk_radius: Option<Frame>,
103    chunks: HashMap<(i32, i32), Frame>,
104    world: crate::snapshot::WorldSnapshot,
105}
106
107impl JoinCache {
108    /// The dimension changed: everything tied to the old world is stale.
109    fn on_respawn(&mut self, respawn: Frame) {
110        self.respawn = Some(respawn);
111        self.last_position = None;
112        self.spawn_pos = None;
113        self.chunk_center = None;
114        self.chunks.clear();
115        self.world.on_respawn();
116    }
117
118    /// The game-state frames to replay at a viewer that just entered the
119    /// game state, in vanilla join order: identity, position, the
120    /// chunk-loading handshake, then the world snapshot.
121    fn join_frames(&self) -> Vec<Frame> {
122        let mut q = Vec::with_capacity(self.chunks.len() + 32);
123        q.extend(self.login.iter().cloned());
124        q.extend(self.respawn.iter().cloned());
125        q.extend(self.spawn_pos.iter().cloned());
126        q.extend(self.last_position.iter().cloned());
127        q.push(ids::wait_for_chunks_frame());
128        q.extend(self.chunk_radius.iter().cloned());
129        q.extend(self.chunk_center.iter().cloned());
130        q.extend(self.chunks.values().cloned());
131        q.extend(self.world.replay());
132        q
133    }
134}
135
136enum UpstreamState {
137    Config,
138    Game,
139}
140
141struct Session {
142    pipeline: Arc<Pipeline>,
143    upstream_tx: mpsc::Sender<Frame>,
144    clients: HashMap<ClientId, ClientHandle>,
145    /// Whoever's serverbound traffic reaches the server. None = nobody:
146    /// the proxy answers keepalives/teleports itself and the session
147    /// player stands AFK.
148    controller: Option<ClientId>,
149    cache: JoinCache,
150    upstream_state: UpstreamState,
151    seen_first_game_frame: bool,
152    /// The real account's identity, for the reflected entity viewers see.
153    bot_uuid: Uuid,
154    bot_name: String,
155    pose: BotPose,
156    /// The session player's actual game mode (from Login / game events),
157    /// restored to a client when it acquires control.
158    real_game_mode: u8,
159    /// Last clientbound abilities frame, replayed to a new controller.
160    abilities: Option<Frame>,
161    /// Entities were wiped client-side (login/respawn); the reflected
162    /// entity must be re-spawned at the next known pose.
163    respawn_entity_pending: bool,
164    /// Viewers normally don't get the session's PlayerPosition frames
165    /// (their camera is free), but after a dimension change they need
166    /// exactly one to land in the new world.
167    forward_next_position: bool,
168    /// The session player's entity id from the Login packet — a
169    /// viewer's own client entity, used to detach `,spectate` cameras.
170    real_player_id: Option<i32>,
171    opts: SessionOpts,
172    events: broadcast::Sender<ProxyEvent>,
173}
174
175/// Start a session: the controller is already logged in locally, the
176/// upstream leg is established. Returns the handle new viewers attach
177/// through; when it reports closed, the session is dead and the next
178/// connection becomes a fresh controller.
179pub fn spawn(
180    upstream: Upstream,
181    controller: LocalClient,
182    controller_id: ClientId,
183    pipeline: Arc<Pipeline>,
184    opts: SessionOpts,
185    events: broadcast::Sender<ProxyEvent>,
186) -> mpsc::Sender<SessionMsg> {
187    tracing::info!(
188        "session start: controller '{}', upstream compression threshold {:?}",
189        controller.username,
190        upstream.compression_threshold
191    );
192    let bot_uuid = upstream.profile.id;
193    let bot_name = upstream.profile.name.clone();
194
195    for p in &pipeline.plugins {
196        p.on_session_start();
197    }
198
199    let (msg_tx, msg_rx) = mpsc::channel::<SessionMsg>(1024);
200    let upstream_tx = start_upstream_io(upstream, msg_tx.clone());
201
202    let (ctl_tx, ctl_rx) = mpsc::channel::<Frame>(4096);
203    let mut clients = HashMap::new();
204    clients.insert(
205        controller_id,
206        ClientHandle {
207            tx: ctl_tx,
208            state: ClientState::Live,
209            username: controller.username.clone(),
210            uuid: controller.uuid,
211            swallow_next_accept: false,
212            camera_on: false,
213        },
214    );
215    start_client_io(controller_id, controller.connection, msg_tx.clone(), ctl_rx);
216
217    let _ = events.send(ProxyEvent::SessionStarted);
218    let _ = events.send(ProxyEvent::ClientJoined {
219        id: controller_id,
220        username: controller.username.clone(),
221    });
222    let _ = events.send(ProxyEvent::ControlChanged {
223        controller: Some((controller_id, controller.username.clone())),
224    });
225
226    let session = Session {
227        pipeline,
228        upstream_tx,
229        clients,
230        controller: Some(controller_id),
231        cache: JoinCache::default(),
232        upstream_state: UpstreamState::Config,
233        seen_first_game_frame: false,
234        bot_uuid,
235        bot_name,
236        pose: BotPose::default(),
237        real_game_mode: 0,
238        abilities: None,
239        respawn_entity_pending: false,
240        forward_next_position: false,
241        real_player_id: None,
242        opts,
243        events,
244    };
245    tokio::spawn(session.run(msg_rx));
246    msg_tx
247}
248
249/// Attach an already-logged-in local client to a running session as a
250/// viewer. The Attach message is sent before the reader task spawns so
251/// the session never sees a FromClient for an unknown id.
252pub async fn attach_viewer(
253    session_tx: &mpsc::Sender<SessionMsg>,
254    id: ClientId,
255    client: LocalClient,
256) -> Result<()> {
257    // Sized for the worst-case join replay: a render-distance-32 world
258    // is ~4200 cached chunk frames queued in one burst.
259    let (tx, rx) = mpsc::channel::<Frame>(8192);
260    session_tx
261        .send(SessionMsg::Attach {
262            id,
263            tx,
264            username: client.username.clone(),
265            uuid: client.uuid,
266        })
267        .await
268        .map_err(|_| eyre::eyre!("session closed while attaching"))?;
269    start_client_io(id, client.connection, session_tx.clone(), rx);
270    Ok(())
271}
272
273/// Game mode and player entity id of the session player, from the
274/// Login packet.
275fn login_info(f: &Frame) -> (Option<u8>, Option<i32>) {
276    use azalea_protocol::packets::ProtocolPacket;
277    use azalea_protocol::packets::game::ClientboundGamePacket;
278    use std::io::Cursor;
279    match ClientboundGamePacket::read(f.packet_id, &mut Cursor::new(&f.body[..])) {
280        Ok(ClientboundGamePacket::Login(l)) => {
281            (Some(l.common.game_type.to_id()), Some(l.player_id.0))
282        }
283        _ => (None, None),
284    }
285}
286
287fn start_upstream_io(upstream: Upstream, msg_tx: mpsc::Sender<SessionMsg>) -> mpsc::Sender<Frame> {
288    let (read, write) = upstream.connection.into_split_raw();
289    let (tx, mut rx) = mpsc::channel::<Frame>(1024);
290
291    tokio::spawn(async move {
292        let mut sink = AzaleaFrameSink { writer: write };
293        while let Some(f) = rx.recv().await {
294            if let Err(e) = sink.write_frame(f).await {
295                tracing::warn!("upstream write failed: {e:#}");
296                break;
297            }
298        }
299    });
300
301    tokio::spawn(async move {
302        let mut src = AzaleaFrameSource { reader: read };
303        loop {
304            match src.read_frame().await {
305                Ok(f) => {
306                    if msg_tx.send(SessionMsg::FromUpstream(f)).await.is_err() {
307                        break;
308                    }
309                }
310                Err(e) => {
311                    let _ = msg_tx
312                        .send(SessionMsg::UpstreamClosed(format!("{e:#}")))
313                        .await;
314                    break;
315                }
316            }
317        }
318    });
319
320    tx
321}
322
323fn start_client_io(
324    id: ClientId,
325    conn: Connection<ServerboundConfigPacket, ClientboundConfigPacket>,
326    msg_tx: mpsc::Sender<SessionMsg>,
327    mut frame_rx: mpsc::Receiver<Frame>,
328) {
329    let (read, write) = conn.into_split_raw();
330
331    tokio::spawn(async move {
332        let mut sink = AzaleaFrameSink { writer: write };
333        while let Some(f) = frame_rx.recv().await {
334            if let Err(e) = sink.write_frame(f).await {
335                tracing::debug!("client {id} write failed: {e:#}");
336                break;
337            }
338        }
339    });
340
341    tokio::spawn(async move {
342        let mut src = AzaleaFrameSource { reader: read };
343        loop {
344            match src.read_frame().await {
345                Ok(f) => {
346                    if msg_tx.send(SessionMsg::FromClient(id, f)).await.is_err() {
347                        break;
348                    }
349                }
350                Err(e) => {
351                    tracing::debug!("client {id} read ended: {e:#}");
352                    let _ = msg_tx.send(SessionMsg::Detach(id)).await;
353                    break;
354                }
355            }
356        }
357    });
358}
359
360impl Session {
361    async fn run(mut self, mut rx: mpsc::Receiver<SessionMsg>) {
362        while let Some(msg) = rx.recv().await {
363            match msg {
364                SessionMsg::FromUpstream(frame) => self.on_upstream_frame(frame).await,
365                SessionMsg::UpstreamClosed(reason) => {
366                    tracing::info!("upstream closed: {reason}");
367                    break;
368                }
369                SessionMsg::FromClient(id, frame) => {
370                    if let Err(e) = self.on_client_frame(id, frame).await {
371                        tracing::info!("session ending: {e:#}");
372                        break;
373                    }
374                }
375                SessionMsg::Attach {
376                    id,
377                    tx,
378                    username,
379                    uuid,
380                } => self.on_attach(id, tx, username, uuid),
381                SessionMsg::Detach(id) => self.drop_client(id, "disconnected"),
382            }
383            if self.clients.is_empty() {
384                tracing::info!("last client left; tearing session down");
385                break;
386            }
387        }
388        tracing::info!(
389            "session ended ({} client(s) still attached will be dropped)",
390            self.clients.len()
391        );
392        let _ = self.events.send(ProxyEvent::SessionEnded);
393    }
394
395    async fn on_upstream_frame(&mut self, frame: Frame) {
396        for f in self.pipeline.clientbound(frame) {
397            let id = f.packet_id;
398            self.observe_clientbound(&f);
399            self.stand_in(&f).await;
400            self.broadcast(f).await;
401            // Login/Respawn reset client game modes — re-spectator every
402            // viewer AFTER they processed the reset
403            if matches!(self.upstream_state, UpstreamState::Game)
404                && (id == ids::CB_GAME_LOGIN || id == ids::CB_GAME_RESPAWN)
405            {
406                self.reassert_spectators();
407            }
408        }
409    }
410
411    /// With no controller attached, the proxy must keep the session
412    /// alive itself: answer keepalives and confirm teleports. (Duplicate
413    /// replies are dangerous, so this runs ONLY when controllerless.)
414    async fn stand_in(&mut self, f: &Frame) {
415        if self.controller.is_some() {
416            return;
417        }
418        let reply = match self.upstream_state {
419            UpstreamState::Game => match f.packet_id {
420                ids::CB_GAME_KEEP_ALIVE => reflect::keepalive_id(f).map(reflect::keepalive_reply),
421                ids::CB_GAME_PLAYER_POSITION => {
422                    reflect::teleport_id(f).map(reflect::accept_teleport_frame)
423                }
424                _ => None,
425            },
426            UpstreamState::Config => match f.packet_id {
427                ids::CB_CONFIG_KEEP_ALIVE => {
428                    reflect::keepalive_id(f).map(reflect::config_keepalive_reply)
429                }
430                _ => None,
431            },
432        };
433        if let Some(r) = reply {
434            if self.upstream_tx.send(r).await.is_err() {
435                tracing::warn!("stand-in reply failed: upstream writer closed");
436            }
437        }
438    }
439
440    /// Track upstream protocol state and maintain the join cache. Runs on
441    /// post-pipeline frames, so the cache holds what clients actually saw.
442    fn observe_clientbound(&mut self, f: &Frame) {
443        match self.upstream_state {
444            UpstreamState::Config => match f.packet_id {
445                ids::CB_CONFIG_FINISH => {
446                    self.upstream_state = UpstreamState::Game;
447                    self.seen_first_game_frame = false;
448                }
449                // never replay stale keepalives/pings to a joining viewer
450                ids::CB_CONFIG_KEEP_ALIVE | ids::CB_CONFIG_PING => {}
451                _ => self.cache.config_frames.push(f.clone()),
452            },
453            UpstreamState::Game => {
454                if !self.seen_first_game_frame {
455                    self.seen_first_game_frame = true;
456                    if f.packet_id != ids::CB_GAME_LOGIN {
457                        // runtime guard for the one id we can't pin in tests
458                        tracing::warn!(
459                            "first game-state frame has id {} but Login should be {} — \
460                             ids.rs may be stale for this azalea version",
461                            f.packet_id,
462                            ids::CB_GAME_LOGIN
463                        );
464                    }
465                }
466                self.cache.world.observe(f);
467                match f.packet_id {
468                    ids::CB_GAME_LOGIN => {
469                        self.cache.login = Some(f.clone());
470                        let (mode, pid) = login_info(f);
471                        self.real_game_mode = mode.unwrap_or(0);
472                        self.real_player_id = pid;
473                        // reconfiguration path: Live viewers' entities were
474                        // wiped and they need the upcoming position
475                        self.respawn_entity_pending = true;
476                        self.forward_next_position = true;
477                        self.flush_parked();
478                    }
479                    ids::CB_GAME_PLAYER_POSITION => {
480                        self.cache.last_position = Some(f.clone());
481                        reflect::apply_server_teleport(&mut self.pose, f);
482                    }
483                    ids::CB_GAME_RESPAWN => {
484                        self.cache.on_respawn(f.clone());
485                        // dimension change wipes entities and positions
486                        self.pose.pos = None;
487                        self.respawn_entity_pending = true;
488                        self.forward_next_position = true;
489                    }
490                    ids::CB_GAME_PLAYER_ABILITIES => {
491                        self.abilities = Some(f.clone());
492                    }
493                    ids::CB_GAME_GAME_EVENT => {
494                        // event 3 = the session player's mode changed
495                        if f.body.first() == Some(&3) {
496                            if let Some(mode) = f.body.get(1..5).and_then(|b| {
497                                b.try_into().ok().map(|a| f32::from_be_bytes(a) as u8)
498                            }) {
499                                self.real_game_mode = mode;
500                            }
501                        }
502                    }
503                    ids::CB_GAME_SET_DEFAULT_SPAWN_POSITION => {
504                        self.cache.spawn_pos = Some(f.clone());
505                    }
506                    ids::CB_GAME_SET_CHUNK_CACHE_CENTER => {
507                        self.cache.chunk_center = Some(f.clone());
508                    }
509                    ids::CB_GAME_SET_CHUNK_CACHE_RADIUS => {
510                        self.cache.chunk_radius = Some(f.clone());
511                    }
512                    ids::CB_GAME_LEVEL_CHUNK_WITH_LIGHT => {
513                        if let Some(key) = ids::chunk_key(&f.body) {
514                            self.cache.chunks.insert(key, f.clone());
515                        }
516                    }
517                    ids::CB_GAME_FORGET_LEVEL_CHUNK => {
518                        if let Some(key) = ids::forget_chunk_key(&f.body) {
519                            self.cache.chunks.remove(&key);
520                        }
521                    }
522                    ids::CB_GAME_START_CONFIGURATION => {
523                        // server is reconfiguring: every cached frame is
524                        // stale. Live viewers follow the transition like
525                        // the controller does (their acks are swallowed).
526                        self.upstream_state = UpstreamState::Config;
527                        self.cache = JoinCache::default();
528                    }
529                    _ => {}
530                }
531            }
532        }
533    }
534
535    /// Controller frames go upstream (through the pipeline); viewer
536    /// frames are swallowed except join acks. `,commands` work from
537    /// anyone and never reach the server.
538    async fn on_client_frame(&mut self, id: ClientId, frame: Frame) -> Result<()> {
539        // chat commands, from controller and viewers alike
540        if matches!(self.upstream_state, UpstreamState::Game) {
541            if let Some(text) = reflect::chat_text(&frame) {
542                if text.starts_with(',') {
543                    self.handle_command(id, text.trim()).await?;
544                    return Ok(());
545                }
546            }
547        }
548
549        if Some(id) == self.controller {
550            // swallow the accept for a proxy-issued handoff teleport
551            if frame.packet_id == ids::SB_GAME_ACCEPT_TELEPORTATION
552                && matches!(self.upstream_state, UpstreamState::Game)
553            {
554                if let Some(c) = self.clients.get_mut(&id) {
555                    if c.swallow_next_accept {
556                        c.swallow_next_accept = false;
557                        return Ok(());
558                    }
559                }
560            }
561            // mirror the bot's movement onto the reflected entity before
562            // the frame moves on
563            if reflect::apply_controller_move(&mut self.pose, &frame) {
564                let update = if self.respawn_entity_pending && self.pose.pos.is_some() {
565                    self.respawn_entity_pending = false;
566                    reflect::spawn_frames(self.bot_uuid, &self.pose)
567                } else {
568                    reflect::move_frames(&self.pose)
569                };
570                self.send_to_viewers(&update);
571            }
572            for f in self.pipeline.serverbound(frame) {
573                if self.upstream_tx.send(f).await.is_err() {
574                    eyre::bail!("upstream writer closed");
575                }
576            }
577            return Ok(());
578        }
579
580        let is_join_ack = matches!(
581            self.clients.get(&id),
582            Some(c) if matches!(c.state, ClientState::Joining)
583        ) && frame.packet_id == ids::SB_CONFIG_FINISH;
584
585        if is_join_ack {
586            let mut queue = self.cache.join_frames();
587            // spectator kit + the reflected bot (tab entry, then entity)
588            let (uuid, name) = {
589                let c = self.clients.get(&id).expect("checked above");
590                (c.uuid, c.username.clone())
591            };
592            queue.extend(reflect::spectator_kit(uuid, &name));
593            queue.push(reflect::bot_info_frame(self.bot_uuid, &self.bot_name));
594            queue.extend(reflect::spawn_frames(self.bot_uuid, &self.pose));
595            let c = self.clients.get_mut(&id).expect("checked above");
596            let mut ok = true;
597            for f in queue {
598                if c.tx.try_send(f).is_err() {
599                    ok = false;
600                    break;
601                }
602            }
603            if ok {
604                c.state = ClientState::Live;
605                tracing::info!("viewer {id} ('{}') is live", c.username);
606            } else {
607                self.drop_client(id, "queue overflow during join");
608            }
609        }
610        Ok(())
611    }
612
613    /// The `,command` set — port of the original's synchronization
614    /// plugin plus its command modules (acquire/release/spectate/
615    /// gamemode).
616    async fn handle_command(&mut self, id: ClientId, cmd: &str) -> Result<()> {
617        tracing::info!("client {id} issued command: {cmd}");
618        let (verb, arg) = match cmd.split_once(' ') {
619            Some((v, a)) => (v, a.trim()),
620            None => (cmd, ""),
621        };
622        match verb {
623            ",acquire" => {
624                if Some(id) == self.controller {
625                    self.feedback(id, "you already have control");
626                    return Ok(());
627                }
628                // demote whoever had it
629                if let Some(old) = self.controller.take() {
630                    self.demote_to_spectator(old);
631                    self.feedback(old, "your control was taken by another client");
632                }
633                self.promote_to_controller(id);
634                self.feedback(id, "you have control now");
635            }
636            ",release" => {
637                if Some(id) == self.controller {
638                    self.controller = None;
639                    self.demote_to_spectator(id);
640                    self.feedback(id, "control released; proxy is keeping the session alive");
641                    let _ = self.events.send(ProxyEvent::ControlChanged { controller: None });
642                } else {
643                    self.feedback(id, "you are not the controller");
644                }
645            }
646            ",spectate" => self.cmd_spectate(id, arg),
647            ",gamemode" | ",gm" => self.cmd_gamemode(id, arg),
648            _ => self.feedback(
649                id,
650                "commands: ,acquire ,release ,spectate [player] ,gamemode <0-3|name>",
651            ),
652        }
653        Ok(())
654    }
655
656    /// `,spectate [username]` — lock the camera to a player entity (no
657    /// arg = the reflected bot; repeat with no arg to detach).
658    fn cmd_spectate(&mut self, id: ClientId, arg: &str) {
659        if Some(id) == self.controller {
660            self.feedback(id, ",release first — the controller cannot spectate");
661            return;
662        }
663        let camera_on = match self.clients.get(&id) {
664            Some(c) => c.camera_on,
665            None => return,
666        };
667        let (target, turning_on) = if arg.is_empty() {
668            if camera_on {
669                // toggle off: back to the viewer's own client entity
670                let Some(own) = self.real_player_id else {
671                    self.feedback(id, "cannot detach camera yet (no login seen)");
672                    return;
673                };
674                (own, false)
675            } else {
676                (reflect::REFLECTED_ENTITY_ID, true)
677            }
678        } else if arg.eq_ignore_ascii_case(&self.bot_name) {
679            (reflect::REFLECTED_ENTITY_ID, true)
680        } else {
681            match self.cache.world.entity_id_for_player(arg) {
682                Some(eid) => (eid, true),
683                None => {
684                    self.feedback(id, "player not found (not in render distance?)");
685                    return;
686                }
687            }
688        };
689        if let Some(c) = self.clients.get_mut(&id) {
690            let _ = c.tx.try_send(reflect::camera_frame(target));
691            c.camera_on = turning_on;
692        }
693        self.feedback(
694            id,
695            if turning_on {
696                "camera attached — ,spectate again to detach"
697            } else {
698                "camera detached"
699            },
700        );
701    }
702
703    /// `,gamemode <0-3|name>` — client-side game mode for the issuing
704    /// viewer only (nothing reaches the server).
705    fn cmd_gamemode(&mut self, id: ClientId, arg: &str) {
706        if Some(id) == self.controller {
707            self.feedback(id, "not while controlling — it would desync your client");
708            return;
709        }
710        let mode = match arg.to_ascii_lowercase().as_str() {
711            "0" | "survival" => 0u8,
712            "1" | "creative" => 1,
713            "2" | "adventure" => 2,
714            "3" | "spectator" => 3,
715            _ => {
716                self.feedback(id, "usage: ,gamemode <survival|creative|adventure|spectator|0-3>");
717                return;
718            }
719        };
720        if let Some(c) = self.clients.get(&id) {
721            for f in reflect::gamemode_kit(c.uuid, &c.username, mode) {
722                let _ = c.tx.try_send(f);
723            }
724        }
725        self.feedback(id, "client-side game mode updated");
726    }
727
728    fn feedback(&mut self, id: ClientId, msg: &str) {
729        if let Some(c) = self.clients.get(&id) {
730            let _ = c.tx.try_send(reflect::system_chat_frame(msg));
731        }
732    }
733
734    /// Turn a client into a spectator: kit + the reflected bot entity.
735    fn demote_to_spectator(&mut self, id: ClientId) {
736        let Some(c) = self.clients.get(&id) else {
737            return;
738        };
739        let mut frames = reflect::spectator_kit(c.uuid, &c.username);
740        frames.push(reflect::bot_info_frame(self.bot_uuid, &self.bot_name));
741        frames.extend(reflect::spawn_frames(self.bot_uuid, &self.pose));
742        let c = self.clients.get(&id).expect("checked above");
743        for f in frames {
744            let _ = c.tx.try_send(f);
745        }
746    }
747
748    /// Turn a viewer into the controller: real game mode + abilities
749    /// back, ghost entity gone, client teleported onto the bot so its
750    /// movement continues from the right place (GrimAC-style alignment).
751    fn promote_to_controller(&mut self, id: ClientId) {
752        let Some(c) = self.clients.get(&id) else {
753            return;
754        };
755        let mut frames = reflect::controller_kit(c.uuid, &c.username, self.real_game_mode);
756        frames.extend(self.abilities.iter().cloned());
757        let teleport = reflect::handoff_teleport_frame(&self.pose);
758        let has_teleport = teleport.is_some();
759        frames.extend(teleport);
760        for f in frames {
761            let _ = c.tx.try_send(f);
762        }
763        if let Some(c) = self.clients.get_mut(&id) {
764            c.swallow_next_accept = has_teleport;
765            c.camera_on = false; // controller drives its own camera
766        }
767        self.controller = Some(id);
768        let username = self
769            .clients
770            .get(&id)
771            .map(|c| c.username.clone())
772            .unwrap_or_default();
773        let _ = self.events.send(ProxyEvent::ControlChanged {
774            controller: Some((id, username)),
775        });
776    }
777
778    /// Re-send the spectator kit to every Live viewer (after Login /
779    /// Respawn broadcasts, which reset client game modes).
780    fn reassert_spectators(&mut self) {
781        let viewers: Vec<(ClientId, Uuid, String)> = self
782            .clients
783            .iter()
784            .filter(|(&cid, c)| {
785                Some(cid) != self.controller && matches!(c.state, ClientState::Live)
786            })
787            .map(|(&cid, c)| (cid, c.uuid, c.username.clone()))
788            .collect();
789        for (cid, uuid, name) in viewers {
790            let kit = reflect::spectator_kit(uuid, &name);
791            if let Some(c) = self.clients.get(&cid) {
792                for f in kit {
793                    let _ = c.tx.try_send(f);
794                }
795            }
796        }
797    }
798
799    fn on_attach(&mut self, id: ClientId, tx: mpsc::Sender<Frame>, username: String, uuid: Uuid) {
800        if let Some(max) = self.opts.max_clients {
801            if self.clients.len() >= max {
802                tracing::info!("refusing viewer {id} ('{username}'): max_clients={max} reached");
803                // dropping tx closes the writer and the socket
804                return;
805            }
806        }
807        tracing::info!("viewer {id} ('{username}') attaching");
808        let _ = self.events.send(ProxyEvent::ClientJoined {
809            id,
810            username: username.clone(),
811        });
812        self.clients.insert(
813            id,
814            ClientHandle {
815                tx,
816                state: ClientState::Parked,
817                username,
818                uuid,
819                swallow_next_accept: false,
820                camera_on: false,
821            },
822        );
823        if self.cache.login.is_some() {
824            self.start_replay(id);
825        } else {
826            tracing::info!("viewer {id} parked until session reaches game state");
827        }
828    }
829
830    /// Queue the cached config frames + a synthesized FinishConfiguration
831    /// at a Parked viewer; it answers with the ack handled in
832    /// on_client_frame, which promotes it to Live.
833    fn start_replay(&mut self, id: ClientId) {
834        let mut frames = self.cache.config_frames.clone();
835        frames.push(ids::finish_config_frame());
836
837        let ok = {
838            let Some(c) = self.clients.get_mut(&id) else {
839                return;
840            };
841            let mut ok = true;
842            for f in frames {
843                if c.tx.try_send(f).is_err() {
844                    ok = false;
845                    break;
846                }
847            }
848            if ok {
849                c.state = ClientState::Joining;
850            }
851            ok
852        };
853        if !ok {
854            self.drop_client(id, "queue overflow during replay");
855        }
856    }
857
858    fn flush_parked(&mut self) {
859        let parked: Vec<ClientId> = self
860            .clients
861            .iter()
862            .filter(|(_, c)| matches!(c.state, ClientState::Parked))
863            .map(|(&id, _)| id)
864            .collect();
865        for id in parked {
866            self.start_replay(id);
867        }
868    }
869
870    /// Should a Live viewer receive this session-player frame? Viewers
871    /// are spectators with their own camera and game mode: the session's
872    /// teleports would yank their view to the bot, and its abilities /
873    /// game-mode changes would undo their spectator state. Only relevant
874    /// in the game state — config ids never reach these numbers.
875    fn viewers_receive(&mut self, f: &Frame) -> bool {
876        if !matches!(self.upstream_state, UpstreamState::Game) {
877            return true;
878        }
879        match f.packet_id {
880            ids::CB_GAME_PLAYER_POSITION => {
881                if self.forward_next_position {
882                    self.forward_next_position = false;
883                    true
884                } else {
885                    false
886                }
887            }
888            ids::CB_GAME_PLAYER_ABILITIES => false,
889            // GameEvent body starts with the event byte; 3 = ChangeGameMode
890            ids::CB_GAME_GAME_EVENT => f.body.first() != Some(&3),
891            _ => true,
892        }
893    }
894
895    /// Send a clientbound frame to every Live client. The controller gets
896    /// backpressure (awaited send — it must not lose frames); viewers get
897    /// try_send and are dropped if they can't keep up, so one laggy
898    /// spectator can never stall the real session.
899    async fn broadcast(&mut self, frame: Frame) {
900        let viewers_receive = self.viewers_receive(&frame);
901        let mut dead = Vec::new();
902        for (&id, c) in self.clients.iter() {
903            if !matches!(c.state, ClientState::Live) {
904                continue;
905            }
906            if Some(id) == self.controller {
907                if c.tx.send(frame.clone()).await.is_err() {
908                    dead.push(id);
909                }
910            } else if viewers_receive && c.tx.try_send(frame.clone()).is_err() {
911                dead.push(id);
912            }
913        }
914        for id in dead {
915            self.drop_client(id, "send failed or fell behind");
916        }
917    }
918
919    /// Push synthesized frames at every Live viewer (never the controller).
920    fn send_to_viewers(&mut self, frames: &[Frame]) {
921        let mut dead = Vec::new();
922        for (&id, c) in self.clients.iter() {
923            if Some(id) == self.controller || !matches!(c.state, ClientState::Live) {
924                continue;
925            }
926            for f in frames {
927                if c.tx.try_send(f.clone()).is_err() {
928                    dead.push(id);
929                    break;
930                }
931            }
932        }
933        for id in dead {
934            self.drop_client(id, "send failed or fell behind");
935        }
936    }
937
938    fn drop_client(&mut self, id: ClientId, reason: &str) {
939        if let Some(c) = self.clients.remove(&id) {
940            tracing::info!("client {id} ('{}') dropped: {reason}", c.username);
941            let _ = self.events.send(ProxyEvent::ClientLeft {
942                id,
943                username: c.username,
944            });
945            // dropping c.tx ends the writer task, which closes the socket
946        }
947        if self.controller == Some(id) {
948            self.controller = None;
949            if self.opts.always_first_control {
950                // the original's alwaysFirstControl: oldest live client
951                // inherits control immediately
952                let oldest = self
953                    .clients
954                    .iter()
955                    .filter(|(_, c)| matches!(c.state, ClientState::Live))
956                    .map(|(&cid, _)| cid)
957                    .min();
958                if let Some(next) = oldest {
959                    tracing::info!("controller left; promoting client {next} (always_first_control)");
960                    self.promote_to_controller(next);
961                    self.feedback(next, "previous controller left — you have control now");
962                    return;
963                }
964            }
965            // session survives controllerless: stand_in() takes over
966            // keepalives/teleports until someone runs ,acquire
967            tracing::info!("controller left; session is now controllerless (use ,acquire)");
968            let _ = self.events.send(ProxyEvent::ControlChanged { controller: None });
969        }
970    }
971}