Skip to main content

azalea_reflection_proxy/
lib.rs

1//! azalea-reflection-proxy — spectate and control an azalea bot session
2//! through a local reflection proxy. Rust port of
3//! aesthetic0001/mineflayer-reflection-proxy.
4//!
5//! The proxy owns the single real (Microsoft-authed) connection to the
6//! target server. Your bot connects to the proxy locally as an offline
7//! client and becomes the controller; vanilla clients that join the
8//! same local address become spectators, see the bot as a live player
9//! entity, and can take over with `,acquire`.
10//!
11//! ```no_run
12//! # async fn example() -> eyre::Result<()> {
13//! use azalea_reflection_proxy::ReflectionProxy;
14//!
15//! let proxy = ReflectionProxy::builder()
16//!     .target("mc.hypixel.net")
17//!     .email("you@example.com")
18//!     .spawn()
19//!     .await?;
20//!
21//! // then point your azalea bot at it instead of the real server:
22//! //   ClientBuilder::new()
23//! //       .set_handler(handle)
24//! //       .start(Account::offline("reflected"), proxy.local_addr())
25//! // and add a vanilla-client server entry for the same address to
26//! // spectate. proxy.local_addr() is a real SocketAddr, so a bound
27//! // port of 0 picks a free one.
28//! # Ok(()) }
29//! ```
30
31mod ids;
32mod local_server;
33pub mod plugin;
34mod reflect;
35mod relay;
36mod session;
37mod snapshot;
38mod upstream;
39
40use std::net::SocketAddr;
41use std::path::PathBuf;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicU32, Ordering};
44
45use eyre::Result;
46use tokio::sync::{Mutex, mpsc};
47use tokio::task::JoinHandle;
48
49pub use plugin::{Frame, Pipeline, ProxyPlugin, Verdict};
50
51/// Configuration for a reflection proxy. Build with
52/// [`ReflectionProxy::builder`].
53pub struct ProxyBuilder {
54    bind: String,
55    target_host: String,
56    target_port: u16,
57    email: String,
58    auth_cache: Option<PathBuf>,
59    plugins: Vec<Box<dyn ProxyPlugin>>,
60}
61
62impl Default for ProxyBuilder {
63    fn default() -> Self {
64        Self {
65            bind: "127.0.0.1:25566".into(),
66            target_host: "localhost".into(),
67            target_port: 25565,
68            email: String::new(),
69            auth_cache: None,
70            plugins: Vec::new(),
71        }
72    }
73}
74
75impl ProxyBuilder {
76    /// Local address the proxy listens on (default `127.0.0.1:25566`;
77    /// use port 0 for an OS-assigned free port).
78    pub fn bind(mut self, addr: impl Into<String>) -> Self {
79        self.bind = addr.into();
80        self
81    }
82
83    /// The real server, e.g. `"mc.hypixel.net"` or `"host:port"`.
84    pub fn target(mut self, host: impl Into<String>) -> Self {
85        let host = host.into();
86        match host.rsplit_once(':') {
87            Some((h, p)) if p.parse::<u16>().is_ok() => {
88                self.target_host = h.to_string();
89                self.target_port = p.parse().unwrap();
90            }
91            _ => self.target_host = host,
92        }
93        self
94    }
95
96    /// Microsoft account email. Tokens are cached (and refreshed) in
97    /// azalea's standard cache file unless [`Self::auth_cache`] is set,
98    /// so interactive login happens at most once per account.
99    pub fn email(mut self, email: impl Into<String>) -> Self {
100        self.email = email.into();
101        self
102    }
103
104    /// Override the auth token cache path (default:
105    /// `~/.minecraft/azalea-auth.json`, shared with azalea itself).
106    pub fn auth_cache(mut self, path: impl Into<PathBuf>) -> Self {
107        self.auth_cache = Some(path.into());
108        self
109    }
110
111    /// Add a frame-level plugin (Forward/Drop/Replace verdicts on raw
112    /// packets, in registration order — the port of the original's
113    /// plugin pipeline).
114    pub fn plugin(mut self, p: Box<dyn ProxyPlugin>) -> Self {
115        self.plugins.push(p);
116        self
117    }
118
119    /// Bind the listener and start accepting clients in the background.
120    pub async fn spawn(self) -> Result<ReflectionProxy> {
121        if self.email.is_empty() {
122            eyre::bail!("ProxyBuilder::email is required");
123        }
124        let listener = local_server::listen(&local_server::LocalServerConfig {
125            bind: self.bind.clone(),
126        })
127        .await?;
128        let local_addr = listener.local_addr()?;
129
130        let cfg = Arc::new(upstream::UpstreamConfig {
131            host: self.target_host,
132            port: self.target_port,
133            email: self.email,
134            auth_cache: self.auth_cache,
135        });
136        let pipeline = Arc::new(Pipeline {
137            plugins: self.plugins,
138        });
139        let registry: SessionRegistry = Arc::new(Mutex::new(None));
140
141        let accept_task = tokio::spawn(accept_loop(listener, cfg, pipeline, registry));
142
143        Ok(ReflectionProxy {
144            local_addr,
145            accept_task,
146        })
147    }
148}
149
150/// A running reflection proxy. Dropping the handle does NOT stop it;
151/// call [`Self::shutdown`] for that.
152pub struct ReflectionProxy {
153    local_addr: SocketAddr,
154    accept_task: JoinHandle<()>,
155}
156
157impl ReflectionProxy {
158    pub fn builder() -> ProxyBuilder {
159        ProxyBuilder::default()
160    }
161
162    /// The address your bot (`Account::offline(...)`) and any vanilla
163    /// spectator clients should connect to.
164    pub fn local_addr(&self) -> SocketAddr {
165        self.local_addr
166    }
167
168    /// Stop accepting new clients. Live sessions keep running until
169    /// their connections close.
170    pub fn shutdown(&self) {
171        self.accept_task.abort();
172    }
173
174    /// Run until the accept loop ends (i.e. forever, unless shutdown()
175    /// is called or the listener fails). Handy for binary main().
176    pub async fn wait(self) {
177        let _ = self.accept_task.await;
178    }
179}
180
181/// At most one live session; new connections attach to it as viewers.
182/// When its sender reports closed the session task has exited, and the
183/// next connection becomes a fresh controller.
184type SessionRegistry = Arc<Mutex<Option<mpsc::Sender<session::SessionMsg>>>>;
185
186static NEXT_CLIENT_ID: AtomicU32 = AtomicU32::new(1);
187
188async fn accept_loop(
189    listener: tokio::net::TcpListener,
190    cfg: Arc<upstream::UpstreamConfig>,
191    pipeline: Arc<Pipeline>,
192    registry: SessionRegistry,
193) {
194    loop {
195        let (stream, addr) = match listener.accept().await {
196            Ok(x) => x,
197            Err(e) => {
198                tracing::error!("accept failed: {e}");
199                break;
200            }
201        };
202        tracing::info!("connection from {addr}");
203        let (cfg, pipeline, registry) = (cfg.clone(), pipeline.clone(), registry.clone());
204        tokio::spawn(async move {
205            if let Err(e) = handle_connection(stream, cfg, pipeline, registry).await {
206                // status pings land here too, so this is not an error
207                tracing::info!("connection ended: {e:#}");
208            }
209        });
210    }
211}
212
213async fn handle_connection(
214    stream: tokio::net::TcpStream,
215    cfg: Arc<upstream::UpstreamConfig>,
216    pipeline: Arc<Pipeline>,
217    registry: SessionRegistry,
218) -> Result<()> {
219    let local = local_server::accept_login(stream).await?;
220    let username = local.username.clone();
221    let id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed);
222
223    // Held across the upstream connect on purpose: a second client that
224    // races in while the controller is still authenticating waits here,
225    // then attaches as a viewer instead of spawning a second session.
226    let mut guard = registry.lock().await;
227
228    if let Some(tx) = guard.as_ref().filter(|tx| !tx.is_closed()).cloned() {
229        drop(guard);
230        session::attach_viewer(&tx, id, local).await?;
231        tracing::info!("'{username}' attached as viewer (client {id})");
232        return Ok(());
233    }
234
235    tracing::info!("'{username}' is the controller (client {id}); connecting upstream");
236    let up = upstream::connect(&cfg).await?;
237    tracing::info!("upstream established as {}", up.profile.name);
238
239    *guard = Some(session::spawn(up, local, id, pipeline));
240    Ok(())
241}