Skip to main content

azalea_reflection_proxy/
lib.rs

1//! azalea-reflection-proxy — spectate and control an azalea bot session
2//! through a local reflection proxy. Rust port of
3//! aesthetic0001/mineflayer-reflection-proxy.
4//!
5//! The proxy owns the single real (Microsoft-authed) connection to the
6//! target server. Your bot connects to the proxy locally as an offline
7//! client and becomes the controller; vanilla clients that join the
8//! same local address become spectators, see the bot as a live player
9//! entity, and can take over with `,acquire`.
10//!
11//! ```no_run
12//! # async fn example() -> eyre::Result<()> {
13//! use azalea_reflection_proxy::ReflectionProxy;
14//!
15//! let proxy = ReflectionProxy::builder()
16//!     .target("mc.hypixel.net")
17//!     .email("you@example.com")
18//!     .spawn()
19//!     .await?;
20//!
21//! // then point your azalea bot at it instead of the real server:
22//! //   ClientBuilder::new()
23//! //       .set_handler(handle)
24//! //       .start(Account::offline("reflected"), proxy.local_addr())
25//! // and add a vanilla-client server entry for the same address to
26//! // spectate. proxy.local_addr() is a real SocketAddr, so a bound
27//! // port of 0 picks a free one.
28//! # Ok(()) }
29//! ```
30
31mod ids;
32mod local_server;
33pub mod plugin;
34mod reflect;
35mod relay;
36mod session;
37mod snapshot;
38mod upstream;
39
40use std::net::SocketAddr;
41use std::path::PathBuf;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicU32, Ordering};
44
45use eyre::Result;
46use tokio::sync::{Mutex, broadcast, mpsc};
47use tokio::task::JoinHandle;
48
49pub use plugin::{Frame, Pipeline, ProxyPlugin, Verdict};
50pub use session::ClientId;
51
52/// Things happening inside the proxy that the host program may care
53/// about — the port of the original's `clientJoin`/`changeControl`
54/// server events. Subscribe with [`ReflectionProxy::subscribe`].
55#[derive(Clone, Debug)]
56pub enum ProxyEvent {
57    /// A session (upstream connection) was established.
58    SessionStarted,
59    /// The session ended; the next client starts a fresh one.
60    SessionEnded,
61    ClientJoined {
62        id: ClientId,
63        username: String,
64    },
65    ClientLeft {
66        id: ClientId,
67        username: String,
68    },
69    /// Control moved (None = controllerless; the proxy stands in).
70    ControlChanged {
71        controller: Option<(ClientId, String)>,
72    },
73}
74
75/// Configuration for a reflection proxy. Build with
76/// [`ReflectionProxy::builder`].
77pub struct ProxyBuilder {
78    bind: String,
79    target_host: String,
80    target_port: u16,
81    email: String,
82    auth_cache: Option<PathBuf>,
83    plugins: Vec<Box<dyn ProxyPlugin>>,
84    whitelist: Vec<String>,
85    max_clients: Option<usize>,
86    always_first_control: bool,
87}
88
89impl Default for ProxyBuilder {
90    fn default() -> Self {
91        Self {
92            bind: "127.0.0.1:25566".into(),
93            target_host: "localhost".into(),
94            target_port: 25565,
95            email: String::new(),
96            auth_cache: None,
97            plugins: Vec::new(),
98            whitelist: Vec::new(),
99            max_clients: None,
100            always_first_control: false,
101        }
102    }
103}
104
105impl ProxyBuilder {
106    /// Local address the proxy listens on (default `127.0.0.1:25566`;
107    /// use port 0 for an OS-assigned free port).
108    pub fn bind(mut self, addr: impl Into<String>) -> Self {
109        self.bind = addr.into();
110        self
111    }
112
113    /// The real server, e.g. `"mc.hypixel.net"` or `"host:port"`.
114    pub fn target(mut self, host: impl Into<String>) -> Self {
115        let host = host.into();
116        match host.rsplit_once(':') {
117            Some((h, p)) if p.parse::<u16>().is_ok() => {
118                self.target_host = h.to_string();
119                self.target_port = p.parse().unwrap();
120            }
121            _ => self.target_host = host,
122        }
123        self
124    }
125
126    /// Microsoft account email. Tokens are cached (and refreshed) in
127    /// azalea's standard cache file unless [`Self::auth_cache`] is set,
128    /// so interactive login happens at most once per account.
129    pub fn email(mut self, email: impl Into<String>) -> Self {
130        self.email = email.into();
131        self
132    }
133
134    /// Override the auth token cache path (default:
135    /// `~/.minecraft/azalea-auth.json`, shared with azalea itself).
136    pub fn auth_cache(mut self, path: impl Into<PathBuf>) -> Self {
137        self.auth_cache = Some(path.into());
138        self
139    }
140
141    /// Add a frame-level plugin (Forward/Drop/Replace verdicts on raw
142    /// packets, in registration order — the port of the original's
143    /// plugin pipeline).
144    pub fn plugin(mut self, p: Box<dyn ProxyPlugin>) -> Self {
145        self.plugins.push(p);
146        self
147    }
148
149    /// Only allow these usernames to connect (case-insensitive). Empty
150    /// (the default) = anyone who can reach the bind address.
151    pub fn whitelist<I: IntoIterator<Item = S>, S: Into<String>>(mut self, names: I) -> Self {
152        self.whitelist = names.into_iter().map(Into::into).collect();
153        self
154    }
155
156    /// Cap simultaneous clients (controller + viewers). Default: no cap.
157    pub fn max_clients(mut self, max: usize) -> Self {
158        self.max_clients = Some(max);
159        self
160    }
161
162    /// When the controller disconnects, hand control to the oldest
163    /// connected viewer instead of going controllerless (the original's
164    /// `alwaysFirstControl`). Default: off — the proxy stands in and the
165    /// session idles until someone runs `,acquire`.
166    pub fn always_first_control(mut self, on: bool) -> Self {
167        self.always_first_control = on;
168        self
169    }
170
171    /// Bind the listener and start accepting clients in the background.
172    pub async fn spawn(self) -> Result<ReflectionProxy> {
173        if self.email.is_empty() {
174            eyre::bail!("ProxyBuilder::email is required");
175        }
176        let listener = local_server::listen(&local_server::LocalServerConfig {
177            bind: self.bind.clone(),
178        })
179        .await?;
180        let local_addr = listener.local_addr()?;
181
182        let cfg = Arc::new(upstream::UpstreamConfig {
183            host: self.target_host,
184            port: self.target_port,
185            email: self.email,
186            auth_cache: self.auth_cache,
187        });
188        let pipeline = Arc::new(Pipeline {
189            plugins: self.plugins,
190        });
191        let registry: SessionRegistry = Arc::new(Mutex::new(None));
192        let (events_tx, _) = broadcast::channel(256);
193        let shared = Arc::new(Shared {
194            cfg,
195            pipeline,
196            registry,
197            events: events_tx.clone(),
198            whitelist: self.whitelist,
199            opts: session::SessionOpts {
200                max_clients: self.max_clients,
201                always_first_control: self.always_first_control,
202            },
203        });
204
205        let accept_task = tokio::spawn(accept_loop(listener, shared));
206
207        Ok(ReflectionProxy {
208            local_addr,
209            accept_task,
210            events: events_tx,
211        })
212    }
213}
214
215/// A running reflection proxy. Dropping the handle does NOT stop it;
216/// call [`Self::shutdown`] for that.
217pub struct ReflectionProxy {
218    local_addr: SocketAddr,
219    accept_task: JoinHandle<()>,
220    events: broadcast::Sender<ProxyEvent>,
221}
222
223impl ReflectionProxy {
224    pub fn builder() -> ProxyBuilder {
225        ProxyBuilder::default()
226    }
227
228    /// Subscribe to proxy events (client joins/leaves, control changes,
229    /// session lifecycle). Each subscriber gets every event from the
230    /// moment it subscribes; slow subscribers may observe
231    /// [`broadcast::error::RecvError::Lagged`].
232    pub fn subscribe(&self) -> broadcast::Receiver<ProxyEvent> {
233        self.events.subscribe()
234    }
235
236    /// The address your bot (`Account::offline(...)`) and any vanilla
237    /// spectator clients should connect to.
238    pub fn local_addr(&self) -> SocketAddr {
239        self.local_addr
240    }
241
242    /// Stop accepting new clients. Live sessions keep running until
243    /// their connections close.
244    pub fn shutdown(&self) {
245        self.accept_task.abort();
246    }
247
248    /// Run until the accept loop ends (i.e. forever, unless shutdown()
249    /// is called or the listener fails). Handy for binary main().
250    pub async fn wait(self) {
251        let _ = self.accept_task.await;
252    }
253}
254
255/// At most one live session; new connections attach to it as viewers.
256/// When its sender reports closed the session task has exited, and the
257/// next connection becomes a fresh controller.
258type SessionRegistry = Arc<Mutex<Option<mpsc::Sender<session::SessionMsg>>>>;
259
260/// Everything the accept path needs, bundled once at spawn.
261struct Shared {
262    cfg: Arc<upstream::UpstreamConfig>,
263    pipeline: Arc<Pipeline>,
264    registry: SessionRegistry,
265    events: broadcast::Sender<ProxyEvent>,
266    whitelist: Vec<String>,
267    opts: session::SessionOpts,
268}
269
270static NEXT_CLIENT_ID: AtomicU32 = AtomicU32::new(1);
271
272async fn accept_loop(listener: tokio::net::TcpListener, shared: Arc<Shared>) {
273    loop {
274        let (stream, addr) = match listener.accept().await {
275            Ok(x) => x,
276            Err(e) => {
277                tracing::error!("accept failed: {e}");
278                break;
279            }
280        };
281        tracing::info!("connection from {addr}");
282        let shared = shared.clone();
283        tokio::spawn(async move {
284            if let Err(e) = handle_connection(stream, shared).await {
285                // status pings land here too, so this is not an error
286                tracing::info!("connection ended: {e:#}");
287            }
288        });
289    }
290}
291
292async fn handle_connection(stream: tokio::net::TcpStream, shared: Arc<Shared>) -> Result<()> {
293    let mut local = local_server::accept_login(stream).await?;
294    let username = local.username.clone();
295
296    if !shared.whitelist.is_empty()
297        && !shared
298            .whitelist
299            .iter()
300            .any(|w| w.eq_ignore_ascii_case(&username))
301    {
302        use azalea_chat::FormattedText;
303        use azalea_protocol::packets::config::c_disconnect::ClientboundDisconnect;
304        tracing::info!("'{username}' rejected: not whitelisted");
305        let _ = local
306            .connection
307            .write(ClientboundDisconnect {
308                reason: FormattedText::from("not on this proxy's whitelist"),
309            })
310            .await;
311        return Ok(());
312    }
313
314    let id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed);
315
316    // Held across the upstream connect on purpose: a second client that
317    // races in while the controller is still authenticating waits here,
318    // then attaches as a viewer instead of spawning a second session.
319    let mut guard = shared.registry.lock().await;
320
321    if let Some(tx) = guard.as_ref().filter(|tx| !tx.is_closed()).cloned() {
322        drop(guard);
323        session::attach_viewer(&tx, id, local).await?;
324        tracing::info!("'{username}' attached as viewer (client {id})");
325        return Ok(());
326    }
327
328    tracing::info!("'{username}' is the controller (client {id}); connecting upstream");
329    let up = upstream::connect(&shared.cfg).await?;
330    tracing::info!("upstream established as {}", up.profile.name);
331
332    *guard = Some(session::spawn(
333        up,
334        local,
335        id,
336        shared.pipeline.clone(),
337        shared.opts.clone(),
338        shared.events.clone(),
339    ));
340    Ok(())
341}