1use std::collections::HashMap;
21use std::sync::Arc;
22
23use azalea_protocol::connect::Connection;
24use azalea_protocol::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
25use eyre::Result;
26use tokio::sync::{broadcast, mpsc};
27
28use uuid::Uuid;
29
30use crate::ProxyEvent;
31
/// Policy knobs for a proxy session.
#[derive(Clone)]
pub struct SessionOpts {
    /// Upper bound on registered clients. When set, a viewer that tries to
    /// attach while the client table already holds this many entries is
    /// refused. `None` disables the check.
    pub max_clients: Option<usize>,
    /// When the controlling client is dropped, hand control to the live
    /// client with the lowest id instead of leaving the session without a
    /// controller.
    pub always_first_control: bool,
}
42
43use crate::ids;
44use crate::local_server::LocalClient;
45use crate::plugin::{Frame, Pipeline};
46use crate::reflect::{self, BotPose};
47use crate::relay::{AzaleaFrameSink, AzaleaFrameSource, FrameSink, FrameSource};
48use crate::upstream::Upstream;
49
/// Key under which a session tracks each attached client.
pub type ClientId = u32;
51
/// Messages delivered to the session task's mailbox.
pub enum SessionMsg {
    /// A frame read from the upstream connection.
    FromUpstream(Frame),
    /// Reading from upstream failed; carries the formatted error.
    UpstreamClosed(String),
    /// A frame read from the connection of the given client.
    FromClient(ClientId, Frame),
    /// Request to register a new viewer with the session.
    Attach {
        /// Id the viewer will be tracked under.
        id: ClientId,
        /// Sender half of the queue whose receiver feeds the viewer's writer task.
        tx: mpsc::Sender<Frame>,
        /// Viewer's username.
        username: String,
        /// Viewer's UUID.
        uuid: Uuid,
    },
    /// The read side of the given client's connection ended.
    Detach(ClientId),
}
64
/// Where a client is in the attach/join sequence.
enum ClientState {
    /// Registered, but no replay has been sent yet (set on attach; cleared
    /// once the cached configuration frames have been queued).
    Parked,
    /// Cached configuration frames and the finish-config frame were queued;
    /// waiting for the client's `SB_CONFIG_FINISH` acknowledgement.
    Joining,
    /// Fully joined; eligible for broadcast frames.
    Live,
}
75
/// Per-client bookkeeping held by the session.
struct ClientHandle {
    /// Queue of frames destined for this client's writer task.
    tx: mpsc::Sender<Frame>,
    /// Progress through the join sequence.
    state: ClientState,
    /// Username the client connected with.
    username: String,
    /// UUID the client connected with.
    uuid: Uuid,
    /// Set when a hand-off teleport was sent on promotion to controller; the
    /// next `SB_GAME_ACCEPT_TELEPORTATION` from this client (while it is the
    /// controller and upstream is in game state) is dropped instead of being
    /// forwarded upstream.
    swallow_next_accept: bool,
    /// Whether `,spectate` last attached this client's camera to an entity.
    camera_on: bool,
}
87
/// Frames remembered from upstream so that a late-joining viewer can be
/// brought up to date.
#[derive(Default)]
struct JoinCache {
    /// Configuration-state frames seen so far, excluding finish, keep-alive
    /// and ping frames.
    config_frames: Vec<Frame>,
    /// Most recent game Login frame.
    login: Option<Frame>,
    /// Most recent player-position frame since the last respawn.
    last_position: Option<Frame>,
    /// Most recent Respawn frame.
    respawn: Option<Frame>,
    /// Most recent default-spawn-position frame since the last respawn.
    spawn_pos: Option<Frame>,
    /// Most recent chunk-cache-center frame since the last respawn.
    chunk_center: Option<Frame>,
    /// Most recent chunk-cache-radius frame.
    chunk_radius: Option<Frame>,
    /// Currently loaded chunk frames keyed by the value `ids::chunk_key` returns.
    chunks: HashMap<(i32, i32), Frame>,
    /// Snapshot that observes every game-state frame and can replay its contents.
    world: crate::snapshot::WorldSnapshot,
}
106
107impl JoinCache {
108 fn on_respawn(&mut self, respawn: Frame) {
110 self.respawn = Some(respawn);
111 self.last_position = None;
112 self.spawn_pos = None;
113 self.chunk_center = None;
114 self.chunks.clear();
115 self.world.on_respawn();
116 }
117
118 fn join_frames(&self) -> Vec<Frame> {
122 let mut q = Vec::with_capacity(self.chunks.len() + 32);
123 q.extend(self.login.iter().cloned());
124 q.extend(self.respawn.iter().cloned());
125 q.extend(self.spawn_pos.iter().cloned());
126 q.extend(self.last_position.iter().cloned());
127 q.push(ids::wait_for_chunks_frame());
128 q.extend(self.chunk_radius.iter().cloned());
129 q.extend(self.chunk_center.iter().cloned());
130 q.extend(self.chunks.values().cloned());
131 q.extend(self.world.replay());
132 q
133 }
134}
135
/// Protocol state of the upstream connection as tracked by the session.
enum UpstreamState {
    /// Configuration state (initial, and again after a start-configuration frame).
    Config,
    /// Game state (entered when the config-finish frame is observed).
    Game,
}
140
/// State owned by the session task.
struct Session {
    /// Plugin pipeline applied to frames in both directions.
    pipeline: Arc<Pipeline>,
    /// Queue feeding the upstream writer task.
    upstream_tx: mpsc::Sender<Frame>,
    /// All registered clients (controller and viewers).
    clients: HashMap<ClientId, ClientHandle>,
    /// Client whose frames are forwarded upstream, if any. While `None`, the
    /// session answers keep-alives and teleports itself.
    controller: Option<ClientId>,
    /// Frames cached for late joiners.
    cache: JoinCache,
    /// Current upstream protocol state.
    upstream_state: UpstreamState,
    /// Whether a frame has been observed since upstream last entered game state.
    seen_first_game_frame: bool,
    /// UUID of the upstream profile.
    bot_uuid: Uuid,
    /// Name of the upstream profile.
    bot_name: String,
    /// Pose updated from server teleports and controller movement frames.
    pose: BotPose,
    /// Game mode taken from the Login frame or a game-event frame of type 3.
    real_game_mode: u8,
    /// Most recent player-abilities frame; sent to a client being promoted to controller.
    abilities: Option<Frame>,
    /// Set on Login/Respawn; the next controller move with a known position
    /// sends spawn frames to viewers instead of move frames and clears it.
    respawn_entity_pending: bool,
    /// Set on Login/Respawn; lets exactly one following player-position frame
    /// through to viewers.
    forward_next_position: bool,
    /// Player entity id parsed from the Login frame, if one was parsed.
    real_player_id: Option<i32>,
    /// Session policy options.
    opts: SessionOpts,
    /// Channel on which session lifecycle events are published.
    events: broadcast::Sender<ProxyEvent>,
}
174
175pub fn spawn(
180 upstream: Upstream,
181 controller: LocalClient,
182 controller_id: ClientId,
183 pipeline: Arc<Pipeline>,
184 opts: SessionOpts,
185 events: broadcast::Sender<ProxyEvent>,
186) -> mpsc::Sender<SessionMsg> {
187 tracing::info!(
188 "session start: controller '{}', upstream compression threshold {:?}",
189 controller.username,
190 upstream.compression_threshold
191 );
192 let bot_uuid = upstream.profile.id;
193 let bot_name = upstream.profile.name.clone();
194
195 for p in &pipeline.plugins {
196 p.on_session_start();
197 }
198
199 let (msg_tx, msg_rx) = mpsc::channel::<SessionMsg>(1024);
200 let upstream_tx = start_upstream_io(upstream, msg_tx.clone());
201
202 let (ctl_tx, ctl_rx) = mpsc::channel::<Frame>(4096);
203 let mut clients = HashMap::new();
204 clients.insert(
205 controller_id,
206 ClientHandle {
207 tx: ctl_tx,
208 state: ClientState::Live,
209 username: controller.username.clone(),
210 uuid: controller.uuid,
211 swallow_next_accept: false,
212 camera_on: false,
213 },
214 );
215 start_client_io(controller_id, controller.connection, msg_tx.clone(), ctl_rx);
216
217 let _ = events.send(ProxyEvent::SessionStarted);
218 let _ = events.send(ProxyEvent::ClientJoined {
219 id: controller_id,
220 username: controller.username.clone(),
221 });
222 let _ = events.send(ProxyEvent::ControlChanged {
223 controller: Some((controller_id, controller.username.clone())),
224 });
225
226 let session = Session {
227 pipeline,
228 upstream_tx,
229 clients,
230 controller: Some(controller_id),
231 cache: JoinCache::default(),
232 upstream_state: UpstreamState::Config,
233 seen_first_game_frame: false,
234 bot_uuid,
235 bot_name,
236 pose: BotPose::default(),
237 real_game_mode: 0,
238 abilities: None,
239 respawn_entity_pending: false,
240 forward_next_position: false,
241 real_player_id: None,
242 opts,
243 events,
244 };
245 tokio::spawn(session.run(msg_rx));
246 msg_tx
247}
248
249pub async fn attach_viewer(
253 session_tx: &mpsc::Sender<SessionMsg>,
254 id: ClientId,
255 client: LocalClient,
256) -> Result<()> {
257 let (tx, rx) = mpsc::channel::<Frame>(8192);
260 session_tx
261 .send(SessionMsg::Attach {
262 id,
263 tx,
264 username: client.username.clone(),
265 uuid: client.uuid,
266 })
267 .await
268 .map_err(|_| eyre::eyre!("session closed while attaching"))?;
269 start_client_io(id, client.connection, session_tx.clone(), rx);
270 Ok(())
271}
272
273fn login_info(f: &Frame) -> (Option<u8>, Option<i32>) {
276 use azalea_protocol::packets::ProtocolPacket;
277 use azalea_protocol::packets::game::ClientboundGamePacket;
278 use std::io::Cursor;
279 match ClientboundGamePacket::read(f.packet_id, &mut Cursor::new(&f.body[..])) {
280 Ok(ClientboundGamePacket::Login(l)) => {
281 (Some(l.common.game_type.to_id()), Some(l.player_id.0))
282 }
283 _ => (None, None),
284 }
285}
286
287fn start_upstream_io(upstream: Upstream, msg_tx: mpsc::Sender<SessionMsg>) -> mpsc::Sender<Frame> {
288 let (read, write) = upstream.connection.into_split_raw();
289 let (tx, mut rx) = mpsc::channel::<Frame>(1024);
290
291 tokio::spawn(async move {
292 let mut sink = AzaleaFrameSink { writer: write };
293 while let Some(f) = rx.recv().await {
294 if let Err(e) = sink.write_frame(f).await {
295 tracing::warn!("upstream write failed: {e:#}");
296 break;
297 }
298 }
299 });
300
301 tokio::spawn(async move {
302 let mut src = AzaleaFrameSource { reader: read };
303 loop {
304 match src.read_frame().await {
305 Ok(f) => {
306 if msg_tx.send(SessionMsg::FromUpstream(f)).await.is_err() {
307 break;
308 }
309 }
310 Err(e) => {
311 let _ = msg_tx
312 .send(SessionMsg::UpstreamClosed(format!("{e:#}")))
313 .await;
314 break;
315 }
316 }
317 }
318 });
319
320 tx
321}
322
323fn start_client_io(
324 id: ClientId,
325 conn: Connection<ServerboundConfigPacket, ClientboundConfigPacket>,
326 msg_tx: mpsc::Sender<SessionMsg>,
327 mut frame_rx: mpsc::Receiver<Frame>,
328) {
329 let (read, write) = conn.into_split_raw();
330
331 tokio::spawn(async move {
332 let mut sink = AzaleaFrameSink { writer: write };
333 while let Some(f) = frame_rx.recv().await {
334 if let Err(e) = sink.write_frame(f).await {
335 tracing::debug!("client {id} write failed: {e:#}");
336 break;
337 }
338 }
339 });
340
341 tokio::spawn(async move {
342 let mut src = AzaleaFrameSource { reader: read };
343 loop {
344 match src.read_frame().await {
345 Ok(f) => {
346 if msg_tx.send(SessionMsg::FromClient(id, f)).await.is_err() {
347 break;
348 }
349 }
350 Err(e) => {
351 tracing::debug!("client {id} read ended: {e:#}");
352 let _ = msg_tx.send(SessionMsg::Detach(id)).await;
353 break;
354 }
355 }
356 }
357 });
358}
359
360impl Session {
361 async fn run(mut self, mut rx: mpsc::Receiver<SessionMsg>) {
362 while let Some(msg) = rx.recv().await {
363 match msg {
364 SessionMsg::FromUpstream(frame) => self.on_upstream_frame(frame).await,
365 SessionMsg::UpstreamClosed(reason) => {
366 tracing::info!("upstream closed: {reason}");
367 break;
368 }
369 SessionMsg::FromClient(id, frame) => {
370 if let Err(e) = self.on_client_frame(id, frame).await {
371 tracing::info!("session ending: {e:#}");
372 break;
373 }
374 }
375 SessionMsg::Attach {
376 id,
377 tx,
378 username,
379 uuid,
380 } => self.on_attach(id, tx, username, uuid),
381 SessionMsg::Detach(id) => self.drop_client(id, "disconnected"),
382 }
383 if self.clients.is_empty() {
384 tracing::info!("last client left; tearing session down");
385 break;
386 }
387 }
388 tracing::info!(
389 "session ended ({} client(s) still attached will be dropped)",
390 self.clients.len()
391 );
392 let _ = self.events.send(ProxyEvent::SessionEnded);
393 }
394
395 async fn on_upstream_frame(&mut self, frame: Frame) {
396 for f in self.pipeline.clientbound(frame) {
397 let id = f.packet_id;
398 self.observe_clientbound(&f);
399 self.stand_in(&f).await;
400 self.broadcast(f).await;
401 if matches!(self.upstream_state, UpstreamState::Game)
404 && (id == ids::CB_GAME_LOGIN || id == ids::CB_GAME_RESPAWN)
405 {
406 self.reassert_spectators();
407 }
408 }
409 }
410
411 async fn stand_in(&mut self, f: &Frame) {
415 if self.controller.is_some() {
416 return;
417 }
418 let reply = match self.upstream_state {
419 UpstreamState::Game => match f.packet_id {
420 ids::CB_GAME_KEEP_ALIVE => reflect::keepalive_id(f).map(reflect::keepalive_reply),
421 ids::CB_GAME_PLAYER_POSITION => {
422 reflect::teleport_id(f).map(reflect::accept_teleport_frame)
423 }
424 _ => None,
425 },
426 UpstreamState::Config => match f.packet_id {
427 ids::CB_CONFIG_KEEP_ALIVE => {
428 reflect::keepalive_id(f).map(reflect::config_keepalive_reply)
429 }
430 _ => None,
431 },
432 };
433 if let Some(r) = reply {
434 if self.upstream_tx.send(r).await.is_err() {
435 tracing::warn!("stand-in reply failed: upstream writer closed");
436 }
437 }
438 }
439
440 fn observe_clientbound(&mut self, f: &Frame) {
443 match self.upstream_state {
444 UpstreamState::Config => match f.packet_id {
445 ids::CB_CONFIG_FINISH => {
446 self.upstream_state = UpstreamState::Game;
447 self.seen_first_game_frame = false;
448 }
449 ids::CB_CONFIG_KEEP_ALIVE | ids::CB_CONFIG_PING => {}
451 _ => self.cache.config_frames.push(f.clone()),
452 },
453 UpstreamState::Game => {
454 if !self.seen_first_game_frame {
455 self.seen_first_game_frame = true;
456 if f.packet_id != ids::CB_GAME_LOGIN {
457 tracing::warn!(
459 "first game-state frame has id {} but Login should be {} — \
460 ids.rs may be stale for this azalea version",
461 f.packet_id,
462 ids::CB_GAME_LOGIN
463 );
464 }
465 }
466 self.cache.world.observe(f);
467 match f.packet_id {
468 ids::CB_GAME_LOGIN => {
469 self.cache.login = Some(f.clone());
470 let (mode, pid) = login_info(f);
471 self.real_game_mode = mode.unwrap_or(0);
472 self.real_player_id = pid;
473 self.respawn_entity_pending = true;
476 self.forward_next_position = true;
477 self.flush_parked();
478 }
479 ids::CB_GAME_PLAYER_POSITION => {
480 self.cache.last_position = Some(f.clone());
481 reflect::apply_server_teleport(&mut self.pose, f);
482 }
483 ids::CB_GAME_RESPAWN => {
484 self.cache.on_respawn(f.clone());
485 self.pose.pos = None;
487 self.respawn_entity_pending = true;
488 self.forward_next_position = true;
489 }
490 ids::CB_GAME_PLAYER_ABILITIES => {
491 self.abilities = Some(f.clone());
492 }
493 ids::CB_GAME_GAME_EVENT => {
494 if f.body.first() == Some(&3) {
496 if let Some(mode) = f.body.get(1..5).and_then(|b| {
497 b.try_into().ok().map(|a| f32::from_be_bytes(a) as u8)
498 }) {
499 self.real_game_mode = mode;
500 }
501 }
502 }
503 ids::CB_GAME_SET_DEFAULT_SPAWN_POSITION => {
504 self.cache.spawn_pos = Some(f.clone());
505 }
506 ids::CB_GAME_SET_CHUNK_CACHE_CENTER => {
507 self.cache.chunk_center = Some(f.clone());
508 }
509 ids::CB_GAME_SET_CHUNK_CACHE_RADIUS => {
510 self.cache.chunk_radius = Some(f.clone());
511 }
512 ids::CB_GAME_LEVEL_CHUNK_WITH_LIGHT => {
513 if let Some(key) = ids::chunk_key(&f.body) {
514 self.cache.chunks.insert(key, f.clone());
515 }
516 }
517 ids::CB_GAME_FORGET_LEVEL_CHUNK => {
518 if let Some(key) = ids::forget_chunk_key(&f.body) {
519 self.cache.chunks.remove(&key);
520 }
521 }
522 ids::CB_GAME_START_CONFIGURATION => {
523 self.upstream_state = UpstreamState::Config;
527 self.cache = JoinCache::default();
528 }
529 _ => {}
530 }
531 }
532 }
533 }
534
535 async fn on_client_frame(&mut self, id: ClientId, frame: Frame) -> Result<()> {
539 if matches!(self.upstream_state, UpstreamState::Game) {
541 if let Some(text) = reflect::chat_text(&frame) {
542 if text.starts_with(',') {
543 self.handle_command(id, text.trim()).await?;
544 return Ok(());
545 }
546 }
547 }
548
549 if Some(id) == self.controller {
550 if frame.packet_id == ids::SB_GAME_ACCEPT_TELEPORTATION
552 && matches!(self.upstream_state, UpstreamState::Game)
553 {
554 if let Some(c) = self.clients.get_mut(&id) {
555 if c.swallow_next_accept {
556 c.swallow_next_accept = false;
557 return Ok(());
558 }
559 }
560 }
561 if reflect::apply_controller_move(&mut self.pose, &frame) {
564 let update = if self.respawn_entity_pending && self.pose.pos.is_some() {
565 self.respawn_entity_pending = false;
566 reflect::spawn_frames(self.bot_uuid, &self.pose)
567 } else {
568 reflect::move_frames(&self.pose)
569 };
570 self.send_to_viewers(&update);
571 }
572 for f in self.pipeline.serverbound(frame) {
573 if self.upstream_tx.send(f).await.is_err() {
574 eyre::bail!("upstream writer closed");
575 }
576 }
577 return Ok(());
578 }
579
580 let is_join_ack = matches!(
581 self.clients.get(&id),
582 Some(c) if matches!(c.state, ClientState::Joining)
583 ) && frame.packet_id == ids::SB_CONFIG_FINISH;
584
585 if is_join_ack {
586 let mut queue = self.cache.join_frames();
587 let (uuid, name) = {
589 let c = self.clients.get(&id).expect("checked above");
590 (c.uuid, c.username.clone())
591 };
592 queue.extend(reflect::spectator_kit(uuid, &name));
593 queue.push(reflect::bot_info_frame(self.bot_uuid, &self.bot_name));
594 queue.extend(reflect::spawn_frames(self.bot_uuid, &self.pose));
595 let c = self.clients.get_mut(&id).expect("checked above");
596 let mut ok = true;
597 for f in queue {
598 if c.tx.try_send(f).is_err() {
599 ok = false;
600 break;
601 }
602 }
603 if ok {
604 c.state = ClientState::Live;
605 tracing::info!("viewer {id} ('{}') is live", c.username);
606 } else {
607 self.drop_client(id, "queue overflow during join");
608 }
609 }
610 Ok(())
611 }
612
613 async fn handle_command(&mut self, id: ClientId, cmd: &str) -> Result<()> {
617 tracing::info!("client {id} issued command: {cmd}");
618 let (verb, arg) = match cmd.split_once(' ') {
619 Some((v, a)) => (v, a.trim()),
620 None => (cmd, ""),
621 };
622 match verb {
623 ",acquire" => {
624 if Some(id) == self.controller {
625 self.feedback(id, "you already have control");
626 return Ok(());
627 }
628 if let Some(old) = self.controller.take() {
630 self.demote_to_spectator(old);
631 self.feedback(old, "your control was taken by another client");
632 }
633 self.promote_to_controller(id);
634 self.feedback(id, "you have control now");
635 }
636 ",release" => {
637 if Some(id) == self.controller {
638 self.controller = None;
639 self.demote_to_spectator(id);
640 self.feedback(id, "control released; proxy is keeping the session alive");
641 let _ = self.events.send(ProxyEvent::ControlChanged { controller: None });
642 } else {
643 self.feedback(id, "you are not the controller");
644 }
645 }
646 ",spectate" => self.cmd_spectate(id, arg),
647 ",gamemode" | ",gm" => self.cmd_gamemode(id, arg),
648 _ => self.feedback(
649 id,
650 "commands: ,acquire ,release ,spectate [player] ,gamemode <0-3|name>",
651 ),
652 }
653 Ok(())
654 }
655
656 fn cmd_spectate(&mut self, id: ClientId, arg: &str) {
659 if Some(id) == self.controller {
660 self.feedback(id, ",release first — the controller cannot spectate");
661 return;
662 }
663 let camera_on = match self.clients.get(&id) {
664 Some(c) => c.camera_on,
665 None => return,
666 };
667 let (target, turning_on) = if arg.is_empty() {
668 if camera_on {
669 let Some(own) = self.real_player_id else {
671 self.feedback(id, "cannot detach camera yet (no login seen)");
672 return;
673 };
674 (own, false)
675 } else {
676 (reflect::REFLECTED_ENTITY_ID, true)
677 }
678 } else if arg.eq_ignore_ascii_case(&self.bot_name) {
679 (reflect::REFLECTED_ENTITY_ID, true)
680 } else {
681 match self.cache.world.entity_id_for_player(arg) {
682 Some(eid) => (eid, true),
683 None => {
684 self.feedback(id, "player not found (not in render distance?)");
685 return;
686 }
687 }
688 };
689 if let Some(c) = self.clients.get_mut(&id) {
690 let _ = c.tx.try_send(reflect::camera_frame(target));
691 c.camera_on = turning_on;
692 }
693 self.feedback(
694 id,
695 if turning_on {
696 "camera attached — ,spectate again to detach"
697 } else {
698 "camera detached"
699 },
700 );
701 }
702
703 fn cmd_gamemode(&mut self, id: ClientId, arg: &str) {
706 if Some(id) == self.controller {
707 self.feedback(id, "not while controlling — it would desync your client");
708 return;
709 }
710 let mode = match arg.to_ascii_lowercase().as_str() {
711 "0" | "survival" => 0u8,
712 "1" | "creative" => 1,
713 "2" | "adventure" => 2,
714 "3" | "spectator" => 3,
715 _ => {
716 self.feedback(id, "usage: ,gamemode <survival|creative|adventure|spectator|0-3>");
717 return;
718 }
719 };
720 if let Some(c) = self.clients.get(&id) {
721 for f in reflect::gamemode_kit(c.uuid, &c.username, mode) {
722 let _ = c.tx.try_send(f);
723 }
724 }
725 self.feedback(id, "client-side game mode updated");
726 }
727
728 fn feedback(&mut self, id: ClientId, msg: &str) {
729 if let Some(c) = self.clients.get(&id) {
730 let _ = c.tx.try_send(reflect::system_chat_frame(msg));
731 }
732 }
733
734 fn demote_to_spectator(&mut self, id: ClientId) {
736 let Some(c) = self.clients.get(&id) else {
737 return;
738 };
739 let mut frames = reflect::spectator_kit(c.uuid, &c.username);
740 frames.push(reflect::bot_info_frame(self.bot_uuid, &self.bot_name));
741 frames.extend(reflect::spawn_frames(self.bot_uuid, &self.pose));
742 let c = self.clients.get(&id).expect("checked above");
743 for f in frames {
744 let _ = c.tx.try_send(f);
745 }
746 }
747
748 fn promote_to_controller(&mut self, id: ClientId) {
752 let Some(c) = self.clients.get(&id) else {
753 return;
754 };
755 let mut frames = reflect::controller_kit(c.uuid, &c.username, self.real_game_mode);
756 frames.extend(self.abilities.iter().cloned());
757 let teleport = reflect::handoff_teleport_frame(&self.pose);
758 let has_teleport = teleport.is_some();
759 frames.extend(teleport);
760 for f in frames {
761 let _ = c.tx.try_send(f);
762 }
763 if let Some(c) = self.clients.get_mut(&id) {
764 c.swallow_next_accept = has_teleport;
765 c.camera_on = false; }
767 self.controller = Some(id);
768 let username = self
769 .clients
770 .get(&id)
771 .map(|c| c.username.clone())
772 .unwrap_or_default();
773 let _ = self.events.send(ProxyEvent::ControlChanged {
774 controller: Some((id, username)),
775 });
776 }
777
778 fn reassert_spectators(&mut self) {
781 let viewers: Vec<(ClientId, Uuid, String)> = self
782 .clients
783 .iter()
784 .filter(|(&cid, c)| {
785 Some(cid) != self.controller && matches!(c.state, ClientState::Live)
786 })
787 .map(|(&cid, c)| (cid, c.uuid, c.username.clone()))
788 .collect();
789 for (cid, uuid, name) in viewers {
790 let kit = reflect::spectator_kit(uuid, &name);
791 if let Some(c) = self.clients.get(&cid) {
792 for f in kit {
793 let _ = c.tx.try_send(f);
794 }
795 }
796 }
797 }
798
799 fn on_attach(&mut self, id: ClientId, tx: mpsc::Sender<Frame>, username: String, uuid: Uuid) {
800 if let Some(max) = self.opts.max_clients {
801 if self.clients.len() >= max {
802 tracing::info!("refusing viewer {id} ('{username}'): max_clients={max} reached");
803 return;
805 }
806 }
807 tracing::info!("viewer {id} ('{username}') attaching");
808 let _ = self.events.send(ProxyEvent::ClientJoined {
809 id,
810 username: username.clone(),
811 });
812 self.clients.insert(
813 id,
814 ClientHandle {
815 tx,
816 state: ClientState::Parked,
817 username,
818 uuid,
819 swallow_next_accept: false,
820 camera_on: false,
821 },
822 );
823 if self.cache.login.is_some() {
824 self.start_replay(id);
825 } else {
826 tracing::info!("viewer {id} parked until session reaches game state");
827 }
828 }
829
830 fn start_replay(&mut self, id: ClientId) {
834 let mut frames = self.cache.config_frames.clone();
835 frames.push(ids::finish_config_frame());
836
837 let ok = {
838 let Some(c) = self.clients.get_mut(&id) else {
839 return;
840 };
841 let mut ok = true;
842 for f in frames {
843 if c.tx.try_send(f).is_err() {
844 ok = false;
845 break;
846 }
847 }
848 if ok {
849 c.state = ClientState::Joining;
850 }
851 ok
852 };
853 if !ok {
854 self.drop_client(id, "queue overflow during replay");
855 }
856 }
857
858 fn flush_parked(&mut self) {
859 let parked: Vec<ClientId> = self
860 .clients
861 .iter()
862 .filter(|(_, c)| matches!(c.state, ClientState::Parked))
863 .map(|(&id, _)| id)
864 .collect();
865 for id in parked {
866 self.start_replay(id);
867 }
868 }
869
870 fn viewers_receive(&mut self, f: &Frame) -> bool {
876 if !matches!(self.upstream_state, UpstreamState::Game) {
877 return true;
878 }
879 match f.packet_id {
880 ids::CB_GAME_PLAYER_POSITION => {
881 if self.forward_next_position {
882 self.forward_next_position = false;
883 true
884 } else {
885 false
886 }
887 }
888 ids::CB_GAME_PLAYER_ABILITIES => false,
889 ids::CB_GAME_GAME_EVENT => f.body.first() != Some(&3),
891 _ => true,
892 }
893 }
894
895 async fn broadcast(&mut self, frame: Frame) {
900 let viewers_receive = self.viewers_receive(&frame);
901 let mut dead = Vec::new();
902 for (&id, c) in self.clients.iter() {
903 if !matches!(c.state, ClientState::Live) {
904 continue;
905 }
906 if Some(id) == self.controller {
907 if c.tx.send(frame.clone()).await.is_err() {
908 dead.push(id);
909 }
910 } else if viewers_receive && c.tx.try_send(frame.clone()).is_err() {
911 dead.push(id);
912 }
913 }
914 for id in dead {
915 self.drop_client(id, "send failed or fell behind");
916 }
917 }
918
919 fn send_to_viewers(&mut self, frames: &[Frame]) {
921 let mut dead = Vec::new();
922 for (&id, c) in self.clients.iter() {
923 if Some(id) == self.controller || !matches!(c.state, ClientState::Live) {
924 continue;
925 }
926 for f in frames {
927 if c.tx.try_send(f.clone()).is_err() {
928 dead.push(id);
929 break;
930 }
931 }
932 }
933 for id in dead {
934 self.drop_client(id, "send failed or fell behind");
935 }
936 }
937
938 fn drop_client(&mut self, id: ClientId, reason: &str) {
939 if let Some(c) = self.clients.remove(&id) {
940 tracing::info!("client {id} ('{}') dropped: {reason}", c.username);
941 let _ = self.events.send(ProxyEvent::ClientLeft {
942 id,
943 username: c.username,
944 });
945 }
947 if self.controller == Some(id) {
948 self.controller = None;
949 if self.opts.always_first_control {
950 let oldest = self
953 .clients
954 .iter()
955 .filter(|(_, c)| matches!(c.state, ClientState::Live))
956 .map(|(&cid, _)| cid)
957 .min();
958 if let Some(next) = oldest {
959 tracing::info!("controller left; promoting client {next} (always_first_control)");
960 self.promote_to_controller(next);
961 self.feedback(next, "previous controller left — you have control now");
962 return;
963 }
964 }
965 tracing::info!("controller left; session is now controllerless (use ,acquire)");
968 let _ = self.events.send(ProxyEvent::ControlChanged { controller: None });
969 }
970 }
971}