1use std::collections::HashMap;
21use std::sync::Arc;
22
23use azalea_protocol::connect::Connection;
24use azalea_protocol::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
25use eyre::Result;
26use tokio::sync::{broadcast, mpsc};
27
28use uuid::Uuid;
29
30use crate::ProxyEvent;
31
/// Tunables that shape how a session admits viewers and hands over control.
#[derive(Clone)]
pub struct SessionOpts {
    /// Upper bound on the number of attached clients; `None` means no limit.
    /// A viewer that attaches once the client count has reached this value is
    /// refused.
    pub max_clients: Option<usize>,
    /// When the controller leaves, promote the live client with the lowest id
    /// instead of leaving the session without a controller.
    pub always_first_control: bool,
}
42
43use crate::ids;
44use crate::local_server::LocalClient;
45use crate::plugin::{Frame, Pipeline};
46use crate::reflect::{self, BotPose};
47use crate::relay::{AzaleaFrameSink, AzaleaFrameSource, FrameSink, FrameSource};
48use crate::upstream::Upstream;
49
/// Identifier the session uses to key each attached client.
pub type ClientId = u32;
51
/// Messages consumed by the session task's main loop.
pub enum SessionMsg {
    /// A frame read from the upstream connection.
    FromUpstream(Frame),
    /// The upstream read side failed; carries the formatted error. The session
    /// loop exits when it receives this.
    UpstreamClosed(String),
    /// A frame read from the given client's connection.
    FromClient(ClientId, Frame),
    /// Registers a new viewer with the session.
    Attach {
        /// Id the viewer will be known by.
        id: ClientId,
        /// Queue the session pushes frames into for this viewer.
        tx: mpsc::Sender<Frame>,
        /// Viewer's username.
        username: String,
        /// Viewer's UUID.
        uuid: Uuid,
    },
    /// The client's read side ended; the session removes the client.
    Detach(ClientId),
    /// Periodic tick (emitted once per second by a timer task) that drives the
    /// controllerless heartbeat.
    StandInTick,
}
67
/// Where a client is in the attach → replay → live progression.
enum ClientState {
    /// Attached before a game Login frame was cached; waiting for one to arrive.
    Parked,
    /// The cached config frames plus a finish-config frame have been queued;
    /// waiting for the client to answer with `SB_CONFIG_FINISH`.
    Joining,
    /// Fully joined; receives broadcast frames.
    Live,
}
78
/// Per-client bookkeeping held by the session.
struct ClientHandle {
    /// Queue the session pushes outgoing frames into for this client.
    tx: mpsc::Sender<Frame>,
    /// Current attach/replay/live phase.
    state: ClientState,
    /// Client's username (used in logs, events and the kits sent to it).
    username: String,
    /// Client's UUID (used when building the kits sent to it).
    uuid: Uuid,
    /// Set on promotion when a handoff teleport frame was queued; the next
    /// accept-teleportation frame carrying `HANDOFF_TELEPORT_ID` is then
    /// swallowed instead of being forwarded upstream.
    swallow_next_accept: bool,
    /// Whether `,spectate` currently has this client's camera attached to
    /// another entity.
    camera_on: bool,
}
90
/// Clientbound frames remembered so that a late-joining viewer can be brought
/// up to the current upstream state.
#[derive(Default)]
struct JoinCache {
    /// Config-state frames seen so far (keep-alives and pings are excluded).
    config_frames: Vec<Frame>,
    /// Most recent game Login frame.
    login: Option<Frame>,
    /// Most recent player-position frame; cleared on respawn.
    last_position: Option<Frame>,
    /// Most recent Respawn frame.
    respawn: Option<Frame>,
    /// Most recent default-spawn-position frame; cleared on respawn.
    spawn_pos: Option<Frame>,
    /// Most recent chunk-cache-center frame; cleared on respawn.
    chunk_center: Option<Frame>,
    /// Most recent chunk-cache-radius frame.
    chunk_radius: Option<Frame>,
    /// Currently loaded chunk frames, keyed by whatever `ids::chunk_key`
    /// extracts (presumably chunk x/z — confirm in `ids`); cleared on respawn.
    chunks: HashMap<(i32, i32), Frame>,
    /// Additional state fed every game frame and replayed to joining viewers.
    world: crate::snapshot::WorldSnapshot,
}
109
110impl JoinCache {
111 fn on_respawn(&mut self, respawn: Frame) {
113 self.respawn = Some(respawn);
114 self.last_position = None;
115 self.spawn_pos = None;
116 self.chunk_center = None;
117 self.chunks.clear();
118 self.world.on_respawn();
119 }
120
121 fn join_frames(&self) -> Vec<Frame> {
125 let mut q = Vec::with_capacity(self.chunks.len() + 32);
126 q.extend(self.login.iter().cloned());
127 q.extend(self.respawn.iter().cloned());
128 q.extend(self.spawn_pos.iter().cloned());
129 q.extend(self.last_position.iter().cloned());
130 q.push(ids::wait_for_chunks_frame());
131 q.extend(self.chunk_radius.iter().cloned());
132 q.extend(self.chunk_center.iter().cloned());
133 q.extend(self.chunks.values().cloned());
134 q.extend(self.world.replay());
135 q
136 }
137}
138
/// Protocol phase the upstream connection is currently in, as tracked from the
/// clientbound frames the session observes.
enum UpstreamState {
    /// Configuration phase (initial state, and again after a
    /// start-configuration frame).
    Config,
    /// Game phase (entered when the config-finish frame is observed).
    Game,
}
143
/// State owned by the session task: the upstream link, every attached client,
/// and what is needed to let viewers join late and to hand control around.
struct Session {
    /// Plugin pipeline every clientbound and serverbound frame passes through.
    pipeline: Arc<Pipeline>,
    /// Queue feeding the upstream writer task.
    upstream_tx: mpsc::Sender<Frame>,
    /// All attached clients, controller included.
    clients: HashMap<ClientId, ClientHandle>,
    /// Client whose frames are forwarded upstream; `None` while the proxy is
    /// standing in for an absent controller.
    controller: Option<ClientId>,
    /// Frames remembered for late-joining viewers.
    cache: JoinCache,
    /// Protocol phase of the upstream connection.
    upstream_state: UpstreamState,
    /// Whether a game-state frame has been seen since entering the game phase;
    /// used to warn once if the first one is not Login.
    seen_first_game_frame: bool,
    /// UUID of the upstream profile.
    bot_uuid: Uuid,
    /// Name of the upstream profile.
    bot_name: String,
    /// Tracked pose, updated from server teleports and controller movement.
    pose: BotPose,
    /// Game mode taken from the Login frame and from game-event type 3.
    real_game_mode: u8,
    /// Latest player-abilities frame; sent to a newly promoted controller.
    abilities: Option<Frame>,
    /// Set on login/respawn; makes the next controller move (once a position is
    /// known) send spawn frames to viewers instead of move frames.
    respawn_entity_pending: bool,
    /// Set on login/respawn; lets exactly one player-position frame through to
    /// viewers.
    forward_next_position: bool,
    /// Player entity id from the Login frame; target used when a viewer
    /// detaches its spectate camera.
    real_player_id: Option<i32>,
    /// Session tunables.
    opts: SessionOpts,
    /// Channel on which session lifecycle events are published.
    events: broadcast::Sender<ProxyEvent>,
}
177
178pub fn spawn(
183 upstream: Upstream,
184 controller: LocalClient,
185 controller_id: ClientId,
186 pipeline: Arc<Pipeline>,
187 opts: SessionOpts,
188 events: broadcast::Sender<ProxyEvent>,
189) -> mpsc::Sender<SessionMsg> {
190 tracing::info!(
191 "session start: controller '{}', upstream compression threshold {:?}",
192 controller.username,
193 upstream.compression_threshold
194 );
195 let bot_uuid = upstream.profile.id;
196 let bot_name = upstream.profile.name.clone();
197
198 for p in &pipeline.plugins {
199 p.on_session_start();
200 }
201
202 let (msg_tx, msg_rx) = mpsc::channel::<SessionMsg>(1024);
203 let upstream_tx = start_upstream_io(upstream, msg_tx.clone());
204
205 let (ctl_tx, ctl_rx) = mpsc::channel::<Frame>(4096);
206 let mut clients = HashMap::new();
207 clients.insert(
208 controller_id,
209 ClientHandle {
210 tx: ctl_tx,
211 state: ClientState::Live,
212 username: controller.username.clone(),
213 uuid: controller.uuid,
214 swallow_next_accept: false,
215 camera_on: false,
216 },
217 );
218 start_client_io(controller_id, controller.connection, msg_tx.clone(), ctl_rx);
219
220 let _ = events.send(ProxyEvent::SessionStarted);
221 let _ = events.send(ProxyEvent::ClientJoined {
222 id: controller_id,
223 username: controller.username.clone(),
224 });
225 let _ = events.send(ProxyEvent::ControlChanged {
226 controller: Some((controller_id, controller.username.clone())),
227 });
228
229 let session = Session {
230 pipeline,
231 upstream_tx,
232 clients,
233 controller: Some(controller_id),
234 cache: JoinCache::default(),
235 upstream_state: UpstreamState::Config,
236 seen_first_game_frame: false,
237 bot_uuid,
238 bot_name,
239 pose: BotPose::default(),
240 real_game_mode: 0,
241 abilities: None,
242 respawn_entity_pending: false,
243 forward_next_position: false,
244 real_player_id: None,
245 opts,
246 events,
247 };
248 let tick_tx = msg_tx.clone();
250 tokio::spawn(async move {
251 let mut ticker = tokio::time::interval(std::time::Duration::from_secs(1));
252 loop {
253 ticker.tick().await;
254 if tick_tx.send(SessionMsg::StandInTick).await.is_err() {
255 break;
256 }
257 }
258 });
259
260 tokio::spawn(session.run(msg_rx));
261 msg_tx
262}
263
264pub async fn attach_viewer(
268 session_tx: &mpsc::Sender<SessionMsg>,
269 id: ClientId,
270 client: LocalClient,
271) -> Result<()> {
272 let (tx, rx) = mpsc::channel::<Frame>(8192);
275 session_tx
276 .send(SessionMsg::Attach {
277 id,
278 tx,
279 username: client.username.clone(),
280 uuid: client.uuid,
281 })
282 .await
283 .map_err(|_| eyre::eyre!("session closed while attaching"))?;
284 start_client_io(id, client.connection, session_tx.clone(), rx);
285 Ok(())
286}
287
288fn login_info(f: &Frame) -> (Option<u8>, Option<i32>) {
291 use azalea_protocol::packets::ProtocolPacket;
292 use azalea_protocol::packets::game::ClientboundGamePacket;
293 use std::io::Cursor;
294 match ClientboundGamePacket::read(f.packet_id, &mut Cursor::new(&f.body[..])) {
295 Ok(ClientboundGamePacket::Login(l)) => {
296 (Some(l.common.game_type.to_id()), Some(l.player_id.0))
297 }
298 _ => (None, None),
299 }
300}
301
302fn start_upstream_io(upstream: Upstream, msg_tx: mpsc::Sender<SessionMsg>) -> mpsc::Sender<Frame> {
303 let (read, write) = upstream.connection.into_split_raw();
304 let (tx, mut rx) = mpsc::channel::<Frame>(1024);
305
306 tokio::spawn(async move {
307 let mut sink = AzaleaFrameSink { writer: write };
308 while let Some(f) = rx.recv().await {
309 if let Err(e) = sink.write_frame(f).await {
310 tracing::warn!("upstream write failed: {e:#}");
311 break;
312 }
313 }
314 });
315
316 tokio::spawn(async move {
317 let mut src = AzaleaFrameSource { reader: read };
318 loop {
319 match src.read_frame().await {
320 Ok(f) => {
321 if msg_tx.send(SessionMsg::FromUpstream(f)).await.is_err() {
322 break;
323 }
324 }
325 Err(e) => {
326 let _ = msg_tx
327 .send(SessionMsg::UpstreamClosed(format!("{e:#}")))
328 .await;
329 break;
330 }
331 }
332 }
333 });
334
335 tx
336}
337
338fn start_client_io(
339 id: ClientId,
340 conn: Connection<ServerboundConfigPacket, ClientboundConfigPacket>,
341 msg_tx: mpsc::Sender<SessionMsg>,
342 mut frame_rx: mpsc::Receiver<Frame>,
343) {
344 let (read, write) = conn.into_split_raw();
345
346 tokio::spawn(async move {
347 let mut sink = AzaleaFrameSink { writer: write };
348 while let Some(f) = frame_rx.recv().await {
349 if let Err(e) = sink.write_frame(f).await {
350 tracing::debug!("client {id} write failed: {e:#}");
351 break;
352 }
353 }
354 });
355
356 tokio::spawn(async move {
357 let mut src = AzaleaFrameSource { reader: read };
358 loop {
359 match src.read_frame().await {
360 Ok(f) => {
361 if msg_tx.send(SessionMsg::FromClient(id, f)).await.is_err() {
362 break;
363 }
364 }
365 Err(e) => {
366 tracing::debug!("client {id} read ended: {e:#}");
367 let _ = msg_tx.send(SessionMsg::Detach(id)).await;
368 break;
369 }
370 }
371 }
372 });
373}
374
375impl Session {
376 async fn run(mut self, mut rx: mpsc::Receiver<SessionMsg>) {
377 while let Some(msg) = rx.recv().await {
378 match msg {
379 SessionMsg::FromUpstream(frame) => self.on_upstream_frame(frame).await,
380 SessionMsg::UpstreamClosed(reason) => {
381 tracing::info!("upstream closed: {reason}");
382 break;
383 }
384 SessionMsg::FromClient(id, frame) => {
385 if let Err(e) = self.on_client_frame(id, frame).await {
386 tracing::info!("session ending: {e:#}");
387 break;
388 }
389 }
390 SessionMsg::Attach {
391 id,
392 tx,
393 username,
394 uuid,
395 } => self.on_attach(id, tx, username, uuid),
396 SessionMsg::Detach(id) => self.drop_client(id, "disconnected"),
397 SessionMsg::StandInTick => self.stand_in_tick().await,
398 }
399 if self.clients.is_empty() {
400 tracing::info!("last client left; tearing session down");
401 break;
402 }
403 }
404 tracing::info!(
405 "session ended ({} client(s) still attached will be dropped)",
406 self.clients.len()
407 );
408 let _ = self.events.send(ProxyEvent::SessionEnded);
409 }
410
411 async fn on_upstream_frame(&mut self, frame: Frame) {
412 for f in self.pipeline.clientbound(frame) {
413 let id = f.packet_id;
414 self.observe_clientbound(&f);
415 self.stand_in(&f).await;
416 self.broadcast(f).await;
417 if matches!(self.upstream_state, UpstreamState::Game)
420 && (id == ids::CB_GAME_LOGIN || id == ids::CB_GAME_RESPAWN)
421 {
422 self.reassert_spectators();
423 }
424 }
425 }
426
427 async fn stand_in(&mut self, f: &Frame) {
431 if self.controller.is_some() {
432 return;
433 }
434 let reply = match self.upstream_state {
435 UpstreamState::Game => match f.packet_id {
436 ids::CB_GAME_KEEP_ALIVE => reflect::keepalive_id(f).map(reflect::keepalive_reply),
437 ids::CB_GAME_PLAYER_POSITION => {
438 reflect::teleport_id(f).map(reflect::accept_teleport_frame)
439 }
440 _ => None,
441 },
442 UpstreamState::Config => match f.packet_id {
443 ids::CB_CONFIG_KEEP_ALIVE => {
444 reflect::keepalive_id(f).map(reflect::config_keepalive_reply)
445 }
446 _ => None,
447 },
448 };
449 if let Some(r) = reply {
450 if self.upstream_tx.send(r).await.is_err() {
451 tracing::warn!("stand-in reply failed: upstream writer closed");
452 }
453 }
454 }
455
456 async fn stand_in_tick(&mut self) {
462 if self.controller.is_some() || !matches!(self.upstream_state, UpstreamState::Game) {
463 return;
464 }
465 let Some(f) = reflect::idle_move_frame(&self.pose) else {
466 return; };
468 if self.upstream_tx.send(f).await.is_err() {
469 tracing::warn!("stand-in heartbeat failed: upstream writer closed");
470 }
471 }
472
    /// Updates session state and the join cache from one clientbound frame.
    ///
    /// Called for every frame that leaves the clientbound pipeline, before the
    /// frame is broadcast. Tracks the config/game phase, caches what a late
    /// joiner needs, and keeps pose, game mode and abilities current.
    fn observe_clientbound(&mut self, f: &Frame) {
        match self.upstream_state {
            UpstreamState::Config => match f.packet_id {
                ids::CB_CONFIG_FINISH => {
                    self.upstream_state = UpstreamState::Game;
                    // Re-arm the "first game frame should be Login" check below.
                    self.seen_first_game_frame = false;
                }
                // Keep-alives and pings are not cached for replay.
                ids::CB_CONFIG_KEEP_ALIVE | ids::CB_CONFIG_PING => {}
                // Every other config frame is replayed to viewers that join later.
                _ => self.cache.config_frames.push(f.clone()),
            },
            UpstreamState::Game => {
                // One-time sanity check per game phase: a non-Login first frame
                // suggests the packet-id table does not match the protocol.
                if !self.seen_first_game_frame {
                    self.seen_first_game_frame = true;
                    if f.packet_id != ids::CB_GAME_LOGIN {
                        tracing::warn!(
                            "first game-state frame has id {} but Login should be {} — \
                             ids.rs may be stale for this azalea version",
                            f.packet_id,
                            ids::CB_GAME_LOGIN
                        );
                    }
                }
                // The world snapshot sees every game frame, including the ones
                // handled explicitly below.
                self.cache.world.observe(f);
                match f.packet_id {
                    ids::CB_GAME_LOGIN => {
                        self.cache.login = Some(f.clone());
                        let (mode, pid) = login_info(f);
                        // Falls back to mode 0 when the Login frame cannot be decoded.
                        self.real_game_mode = mode.unwrap_or(0);
                        self.real_player_id = pid;
                        self.respawn_entity_pending = true;
                        self.forward_next_position = true;
                        // A cached Login is what parked viewers were waiting for.
                        self.flush_parked();
                    }
                    ids::CB_GAME_PLAYER_POSITION => {
                        self.cache.last_position = Some(f.clone());
                        reflect::apply_server_teleport(&mut self.pose, f);
                    }
                    ids::CB_GAME_RESPAWN => {
                        self.cache.on_respawn(f.clone());
                        // Position is unknown until the next position frame or move.
                        self.pose.pos = None;
                        self.respawn_entity_pending = true;
                        self.forward_next_position = true;
                    }
                    ids::CB_GAME_PLAYER_ABILITIES => {
                        self.abilities = Some(f.clone());
                    }
                    ids::CB_GAME_GAME_EVENT => {
                        // Event type 3 carries the new game mode as a big-endian
                        // f32 in the four bytes after the type byte.
                        if f.body.first() == Some(&3) {
                            if let Some(mode) = f.body.get(1..5).and_then(|b| {
                                b.try_into().ok().map(|a| f32::from_be_bytes(a) as u8)
                            }) {
                                self.real_game_mode = mode;
                            }
                        }
                    }
                    ids::CB_GAME_SET_DEFAULT_SPAWN_POSITION => {
                        self.cache.spawn_pos = Some(f.clone());
                    }
                    ids::CB_GAME_SET_CHUNK_CACHE_CENTER => {
                        self.cache.chunk_center = Some(f.clone());
                    }
                    ids::CB_GAME_SET_CHUNK_CACHE_RADIUS => {
                        self.cache.chunk_radius = Some(f.clone());
                    }
                    ids::CB_GAME_LEVEL_CHUNK_WITH_LIGHT => {
                        // Frames whose key cannot be extracted are not cached.
                        if let Some(key) = ids::chunk_key(&f.body) {
                            self.cache.chunks.insert(key, f.clone());
                        }
                    }
                    ids::CB_GAME_FORGET_LEVEL_CHUNK => {
                        if let Some(key) = ids::forget_chunk_key(&f.body) {
                            self.cache.chunks.remove(&key);
                        }
                    }
                    ids::CB_GAME_START_CONFIGURATION => {
                        // Upstream is going back to the config phase: start the
                        // cache over, including the recorded config frames.
                        self.upstream_state = UpstreamState::Config;
                        self.cache = JoinCache::default();
                    }
                    _ => {}
                }
            }
        }
    }
567
    /// Handles one frame received from client `id`.
    ///
    /// * In the game phase, chat text starting with `,` from any client is
    ///   treated as a proxy command and never forwarded.
    /// * Frames from the controller update the tracked pose (mirroring the
    ///   movement to viewers) and are forwarded upstream through the
    ///   serverbound pipeline.
    /// * From any other client only the join acknowledgement
    ///   (`SB_CONFIG_FINISH` while `Joining`) is acted on; everything else is
    ///   dropped.
    ///
    /// # Errors
    ///
    /// Returns an error when the upstream writer channel is closed, or when
    /// command handling fails.
    async fn on_client_frame(&mut self, id: ClientId, frame: Frame) -> Result<()> {
        if matches!(self.upstream_state, UpstreamState::Game) {
            if let Some(text) = reflect::chat_text(&frame) {
                if text.starts_with(',') {
                    self.handle_command(id, text.trim()).await?;
                    return Ok(());
                }
            }
        }

        if Some(id) == self.controller {
            if frame.packet_id == ids::SB_GAME_ACCEPT_TELEPORTATION
                && matches!(self.upstream_state, UpstreamState::Game)
            {
                if let Some(c) = self.clients.get_mut(&id) {
                    // The handoff teleport was issued by the proxy on promotion,
                    // not by upstream, so its acknowledgement must not be forwarded.
                    if c.swallow_next_accept
                        && reflect::teleport_id(&frame) == Some(reflect::HANDOFF_TELEPORT_ID)
                    {
                        c.swallow_next_accept = false;
                        return Ok(());
                    }
                }
            }
            if reflect::apply_controller_move(&mut self.pose, &frame) {
                // After a login/respawn the first move with a known position
                // sends spawn frames to viewers; later moves send move frames.
                let update = if self.respawn_entity_pending && self.pose.pos.is_some() {
                    self.respawn_entity_pending = false;
                    reflect::spawn_frames(self.bot_uuid, &self.pose)
                } else {
                    reflect::move_frames(&self.pose)
                };
                self.send_to_viewers(&update);
            }
            for f in self.pipeline.serverbound(frame) {
                if self.upstream_tx.send(f).await.is_err() {
                    eyre::bail!("upstream writer closed");
                }
            }
            return Ok(());
        }

        let is_join_ack = matches!(
            self.clients.get(&id),
            Some(c) if matches!(c.state, ClientState::Joining)
        ) && frame.packet_id == ids::SB_CONFIG_FINISH;

        if is_join_ack {
            // Queue the cached join frames, then the spectator kit, the bot
            // info frame and the spawn frames for the bot.
            let mut queue = self.cache.join_frames();
            let (uuid, name) = {
                let c = self.clients.get(&id).expect("checked above");
                (c.uuid, c.username.clone())
            };
            queue.extend(reflect::spectator_kit(uuid, &name));
            queue.push(reflect::bot_info_frame(self.bot_uuid, &self.bot_name));
            queue.extend(reflect::spawn_frames(self.bot_uuid, &self.pose));
            let c = self.clients.get_mut(&id).expect("checked above");
            let mut ok = true;
            // Non-blocking sends: a viewer whose queue cannot take the whole
            // join sequence is dropped rather than stalling the session.
            for f in queue {
                if c.tx.try_send(f).is_err() {
                    ok = false;
                    break;
                }
            }
            if ok {
                c.state = ClientState::Live;
                tracing::info!("viewer {id} ('{}') is live", c.username);
            } else {
                self.drop_client(id, "queue overflow during join");
            }
        }
        Ok(())
    }
652
653 async fn handle_command(&mut self, id: ClientId, cmd: &str) -> Result<()> {
657 tracing::info!("client {id} issued command: {cmd}");
658 let (verb, arg) = match cmd.split_once(' ') {
659 Some((v, a)) => (v, a.trim()),
660 None => (cmd, ""),
661 };
662 match verb {
663 ",acquire" => {
664 if Some(id) == self.controller {
665 self.feedback(id, "you already have control");
666 return Ok(());
667 }
668 if let Some(old) = self.controller.take() {
670 self.demote_to_spectator(old);
671 self.feedback(old, "your control was taken by another client");
672 }
673 self.promote_to_controller(id);
674 self.feedback(id, "you have control now");
675 }
676 ",release" => {
677 if Some(id) == self.controller {
678 self.controller = None;
679 self.demote_to_spectator(id);
680 self.feedback(id, "control released; proxy is keeping the session alive");
681 let _ = self.events.send(ProxyEvent::ControlChanged { controller: None });
682 } else {
683 self.feedback(id, "you are not the controller");
684 }
685 }
686 ",spectate" => self.cmd_spectate(id, arg),
687 ",gamemode" | ",gm" => self.cmd_gamemode(id, arg),
688 _ => self.feedback(
689 id,
690 "commands: ,acquire ,release ,spectate [player] ,gamemode <0-3|name>",
691 ),
692 }
693 Ok(())
694 }
695
696 fn cmd_spectate(&mut self, id: ClientId, arg: &str) {
699 if Some(id) == self.controller {
700 self.feedback(id, ",release first — the controller cannot spectate");
701 return;
702 }
703 let camera_on = match self.clients.get(&id) {
704 Some(c) => c.camera_on,
705 None => return,
706 };
707 let (target, turning_on) = if arg.is_empty() {
708 if camera_on {
709 let Some(own) = self.real_player_id else {
711 self.feedback(id, "cannot detach camera yet (no login seen)");
712 return;
713 };
714 (own, false)
715 } else {
716 (reflect::REFLECTED_ENTITY_ID, true)
717 }
718 } else if arg.eq_ignore_ascii_case(&self.bot_name) {
719 (reflect::REFLECTED_ENTITY_ID, true)
720 } else {
721 match self.cache.world.entity_id_for_player(arg) {
722 Some(eid) => (eid, true),
723 None => {
724 self.feedback(id, "player not found (not in render distance?)");
725 return;
726 }
727 }
728 };
729 if let Some(c) = self.clients.get_mut(&id) {
730 let _ = c.tx.try_send(reflect::camera_frame(target));
731 c.camera_on = turning_on;
732 }
733 self.feedback(
734 id,
735 if turning_on {
736 "camera attached — ,spectate again to detach"
737 } else {
738 "camera detached"
739 },
740 );
741 }
742
743 fn cmd_gamemode(&mut self, id: ClientId, arg: &str) {
746 if Some(id) == self.controller {
747 self.feedback(id, "not while controlling — it would desync your client");
748 return;
749 }
750 let mode = match arg.to_ascii_lowercase().as_str() {
751 "0" | "survival" => 0u8,
752 "1" | "creative" => 1,
753 "2" | "adventure" => 2,
754 "3" | "spectator" => 3,
755 _ => {
756 self.feedback(id, "usage: ,gamemode <survival|creative|adventure|spectator|0-3>");
757 return;
758 }
759 };
760 if let Some(c) = self.clients.get(&id) {
761 for f in reflect::gamemode_kit(c.uuid, &c.username, mode) {
762 let _ = c.tx.try_send(f);
763 }
764 }
765 self.feedback(id, "client-side game mode updated");
766 }
767
768 fn feedback(&mut self, id: ClientId, msg: &str) {
769 if let Some(c) = self.clients.get(&id) {
770 let _ = c.tx.try_send(reflect::system_chat_frame(msg));
771 }
772 }
773
774 fn demote_to_spectator(&mut self, id: ClientId) {
776 let Some(c) = self.clients.get(&id) else {
777 return;
778 };
779 let mut frames = reflect::spectator_kit(c.uuid, &c.username);
780 frames.push(reflect::bot_info_frame(self.bot_uuid, &self.bot_name));
781 frames.extend(reflect::spawn_frames(self.bot_uuid, &self.pose));
782 let c = self.clients.get(&id).expect("checked above");
783 for f in frames {
784 let _ = c.tx.try_send(f);
785 }
786 }
787
788 fn promote_to_controller(&mut self, id: ClientId) {
792 let Some(c) = self.clients.get(&id) else {
793 return;
794 };
795 let mut frames = reflect::controller_kit(c.uuid, &c.username, self.real_game_mode);
796 frames.extend(self.abilities.iter().cloned());
797 let teleport = reflect::handoff_teleport_frame(&self.pose);
798 let has_teleport = teleport.is_some();
799 frames.extend(teleport);
800 for f in frames {
801 let _ = c.tx.try_send(f);
802 }
803 if let Some(c) = self.clients.get_mut(&id) {
804 c.swallow_next_accept = has_teleport;
805 c.camera_on = false; }
807 self.controller = Some(id);
808 let username = self
809 .clients
810 .get(&id)
811 .map(|c| c.username.clone())
812 .unwrap_or_default();
813 let _ = self.events.send(ProxyEvent::ControlChanged {
814 controller: Some((id, username)),
815 });
816 }
817
818 fn reassert_spectators(&mut self) {
821 let viewers: Vec<(ClientId, Uuid, String)> = self
822 .clients
823 .iter()
824 .filter(|(&cid, c)| {
825 Some(cid) != self.controller && matches!(c.state, ClientState::Live)
826 })
827 .map(|(&cid, c)| (cid, c.uuid, c.username.clone()))
828 .collect();
829 for (cid, uuid, name) in viewers {
830 let kit = reflect::spectator_kit(uuid, &name);
831 if let Some(c) = self.clients.get(&cid) {
832 for f in kit {
833 let _ = c.tx.try_send(f);
834 }
835 }
836 }
837 }
838
839 fn on_attach(&mut self, id: ClientId, tx: mpsc::Sender<Frame>, username: String, uuid: Uuid) {
840 if let Some(max) = self.opts.max_clients {
841 if self.clients.len() >= max {
842 tracing::info!("refusing viewer {id} ('{username}'): max_clients={max} reached");
843 return;
845 }
846 }
847 tracing::info!("viewer {id} ('{username}') attaching");
848 let _ = self.events.send(ProxyEvent::ClientJoined {
849 id,
850 username: username.clone(),
851 });
852 self.clients.insert(
853 id,
854 ClientHandle {
855 tx,
856 state: ClientState::Parked,
857 username,
858 uuid,
859 swallow_next_accept: false,
860 camera_on: false,
861 },
862 );
863 if self.cache.login.is_some() {
864 self.start_replay(id);
865 } else {
866 tracing::info!("viewer {id} parked until session reaches game state");
867 }
868 }
869
870 fn start_replay(&mut self, id: ClientId) {
874 let mut frames = self.cache.config_frames.clone();
875 frames.push(ids::finish_config_frame());
876
877 let ok = {
878 let Some(c) = self.clients.get_mut(&id) else {
879 return;
880 };
881 let mut ok = true;
882 for f in frames {
883 if c.tx.try_send(f).is_err() {
884 ok = false;
885 break;
886 }
887 }
888 if ok {
889 c.state = ClientState::Joining;
890 }
891 ok
892 };
893 if !ok {
894 self.drop_client(id, "queue overflow during replay");
895 }
896 }
897
898 fn flush_parked(&mut self) {
899 let parked: Vec<ClientId> = self
900 .clients
901 .iter()
902 .filter(|(_, c)| matches!(c.state, ClientState::Parked))
903 .map(|(&id, _)| id)
904 .collect();
905 for id in parked {
906 self.start_replay(id);
907 }
908 }
909
910 fn viewers_receive(&mut self, f: &Frame) -> bool {
916 if !matches!(self.upstream_state, UpstreamState::Game) {
917 return true;
918 }
919 match f.packet_id {
920 ids::CB_GAME_PLAYER_POSITION => {
921 if self.forward_next_position {
922 self.forward_next_position = false;
923 true
924 } else {
925 false
926 }
927 }
928 ids::CB_GAME_PLAYER_ABILITIES => false,
929 ids::CB_GAME_GAME_EVENT => f.body.first() != Some(&3),
931 _ => true,
932 }
933 }
934
935 async fn broadcast(&mut self, frame: Frame) {
940 let viewers_receive = self.viewers_receive(&frame);
941 let mut dead = Vec::new();
942 for (&id, c) in self.clients.iter() {
943 if !matches!(c.state, ClientState::Live) {
944 continue;
945 }
946 if Some(id) == self.controller {
947 if c.tx.send(frame.clone()).await.is_err() {
948 dead.push(id);
949 }
950 } else if viewers_receive && c.tx.try_send(frame.clone()).is_err() {
951 dead.push(id);
952 }
953 }
954 for id in dead {
955 self.drop_client(id, "send failed or fell behind");
956 }
957 }
958
959 fn send_to_viewers(&mut self, frames: &[Frame]) {
961 let mut dead = Vec::new();
962 for (&id, c) in self.clients.iter() {
963 if Some(id) == self.controller || !matches!(c.state, ClientState::Live) {
964 continue;
965 }
966 for f in frames {
967 if c.tx.try_send(f.clone()).is_err() {
968 dead.push(id);
969 break;
970 }
971 }
972 }
973 for id in dead {
974 self.drop_client(id, "send failed or fell behind");
975 }
976 }
977
978 fn drop_client(&mut self, id: ClientId, reason: &str) {
979 if let Some(c) = self.clients.remove(&id) {
980 tracing::info!("client {id} ('{}') dropped: {reason}", c.username);
981 let _ = self.events.send(ProxyEvent::ClientLeft {
982 id,
983 username: c.username,
984 });
985 }
987 if self.controller == Some(id) {
988 self.controller = None;
989 if self.opts.always_first_control {
990 let oldest = self
993 .clients
994 .iter()
995 .filter(|(_, c)| matches!(c.state, ClientState::Live))
996 .map(|(&cid, _)| cid)
997 .min();
998 if let Some(next) = oldest {
999 tracing::info!("controller left; promoting client {next} (always_first_control)");
1000 self.promote_to_controller(next);
1001 self.feedback(next, "previous controller left — you have control now");
1002 return;
1003 }
1004 }
1005 tracing::info!("controller left; session is now controllerless (use ,acquire)");
1008 let _ = self.events.send(ProxyEvent::ControlChanged { controller: None });
1009 }
1010 }
1011}