1mod ids;
32mod local_server;
33pub mod plugin;
34mod reflect;
35mod relay;
36mod session;
37mod snapshot;
38mod upstream;
39
40use std::net::SocketAddr;
41use std::path::PathBuf;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicU32, Ordering};
44
45use eyre::Result;
46use tokio::sync::{Mutex, broadcast, mpsc};
47use tokio::task::JoinHandle;
48
49pub use plugin::{Frame, Pipeline, ProxyPlugin, Verdict};
50pub use session::ClientId;
51
52#[derive(Clone, Debug)]
56pub enum ProxyEvent {
57 SessionStarted,
59 SessionEnded,
61 ClientJoined {
62 id: ClientId,
63 username: String,
64 },
65 ClientLeft {
66 id: ClientId,
67 username: String,
68 },
69 ControlChanged {
71 controller: Option<(ClientId, String)>,
72 },
73}
74
75pub struct ProxyBuilder {
78 bind: String,
79 target_host: String,
80 target_port: u16,
81 email: String,
82 auth_cache: Option<PathBuf>,
83 plugins: Vec<Box<dyn ProxyPlugin>>,
84 whitelist: Vec<String>,
85 max_clients: Option<usize>,
86 always_first_control: bool,
87}
88
89impl Default for ProxyBuilder {
90 fn default() -> Self {
91 Self {
92 bind: "127.0.0.1:25566".into(),
93 target_host: "localhost".into(),
94 target_port: 25565,
95 email: String::new(),
96 auth_cache: None,
97 plugins: Vec::new(),
98 whitelist: Vec::new(),
99 max_clients: None,
100 always_first_control: false,
101 }
102 }
103}
104
105impl ProxyBuilder {
106 pub fn bind(mut self, addr: impl Into<String>) -> Self {
109 self.bind = addr.into();
110 self
111 }
112
113 pub fn target(mut self, host: impl Into<String>) -> Self {
115 let host = host.into();
116 match host.rsplit_once(':') {
117 Some((h, p)) if p.parse::<u16>().is_ok() => {
118 self.target_host = h.to_string();
119 self.target_port = p.parse().unwrap();
120 }
121 _ => self.target_host = host,
122 }
123 self
124 }
125
126 pub fn email(mut self, email: impl Into<String>) -> Self {
130 self.email = email.into();
131 self
132 }
133
134 pub fn auth_cache(mut self, path: impl Into<PathBuf>) -> Self {
137 self.auth_cache = Some(path.into());
138 self
139 }
140
141 pub fn plugin(mut self, p: Box<dyn ProxyPlugin>) -> Self {
145 self.plugins.push(p);
146 self
147 }
148
149 pub fn whitelist<I: IntoIterator<Item = S>, S: Into<String>>(mut self, names: I) -> Self {
152 self.whitelist = names.into_iter().map(Into::into).collect();
153 self
154 }
155
156 pub fn max_clients(mut self, max: usize) -> Self {
158 self.max_clients = Some(max);
159 self
160 }
161
162 pub fn always_first_control(mut self, on: bool) -> Self {
167 self.always_first_control = on;
168 self
169 }
170
171 pub async fn spawn(self) -> Result<ReflectionProxy> {
173 if self.email.is_empty() {
174 eyre::bail!("ProxyBuilder::email is required");
175 }
176 let listener = local_server::listen(&local_server::LocalServerConfig {
177 bind: self.bind.clone(),
178 })
179 .await?;
180 let local_addr = listener.local_addr()?;
181
182 let cfg = Arc::new(upstream::UpstreamConfig {
183 host: self.target_host,
184 port: self.target_port,
185 email: self.email,
186 auth_cache: self.auth_cache,
187 });
188 let pipeline = Arc::new(Pipeline {
189 plugins: self.plugins,
190 });
191 let registry: SessionRegistry = Arc::new(Mutex::new(None));
192 let (events_tx, _) = broadcast::channel(256);
193 let shared = Arc::new(Shared {
194 cfg,
195 pipeline,
196 registry,
197 events: events_tx.clone(),
198 whitelist: self.whitelist,
199 opts: session::SessionOpts {
200 max_clients: self.max_clients,
201 always_first_control: self.always_first_control,
202 },
203 });
204
205 let accept_task = tokio::spawn(accept_loop(listener, shared));
206
207 Ok(ReflectionProxy {
208 local_addr,
209 accept_task,
210 events: events_tx,
211 })
212 }
213}
214
215pub struct ReflectionProxy {
218 local_addr: SocketAddr,
219 accept_task: JoinHandle<()>,
220 events: broadcast::Sender<ProxyEvent>,
221}
222
223impl ReflectionProxy {
224 pub fn builder() -> ProxyBuilder {
225 ProxyBuilder::default()
226 }
227
228 pub fn subscribe(&self) -> broadcast::Receiver<ProxyEvent> {
233 self.events.subscribe()
234 }
235
236 pub fn local_addr(&self) -> SocketAddr {
239 self.local_addr
240 }
241
242 pub fn shutdown(&self) {
245 self.accept_task.abort();
246 }
247
248 pub async fn wait(self) {
251 let _ = self.accept_task.await;
252 }
253}
254
255type SessionRegistry = Arc<Mutex<Option<mpsc::Sender<session::SessionMsg>>>>;
259
260struct Shared {
262 cfg: Arc<upstream::UpstreamConfig>,
263 pipeline: Arc<Pipeline>,
264 registry: SessionRegistry,
265 events: broadcast::Sender<ProxyEvent>,
266 whitelist: Vec<String>,
267 opts: session::SessionOpts,
268}
269
/// Process-wide source of client ids. Starts at 1 and is advanced once for
/// every connection that gets past the whitelist check.
static NEXT_CLIENT_ID: AtomicU32 = AtomicU32::new(1);
271
272async fn accept_loop(listener: tokio::net::TcpListener, shared: Arc<Shared>) {
273 loop {
274 let (stream, addr) = match listener.accept().await {
275 Ok(x) => x,
276 Err(e) => {
277 tracing::error!("accept failed: {e}");
278 break;
279 }
280 };
281 tracing::info!("connection from {addr}");
282 let shared = shared.clone();
283 tokio::spawn(async move {
284 if let Err(e) = handle_connection(stream, shared).await {
285 tracing::info!("connection ended: {e:#}");
287 }
288 });
289 }
290}
291
292async fn handle_connection(stream: tokio::net::TcpStream, shared: Arc<Shared>) -> Result<()> {
293 let mut local = local_server::accept_login(stream).await?;
294 let username = local.username.clone();
295
296 if !shared.whitelist.is_empty()
297 && !shared
298 .whitelist
299 .iter()
300 .any(|w| w.eq_ignore_ascii_case(&username))
301 {
302 use azalea_chat::FormattedText;
303 use azalea_protocol::packets::config::c_disconnect::ClientboundDisconnect;
304 tracing::info!("'{username}' rejected: not whitelisted");
305 let _ = local
306 .connection
307 .write(ClientboundDisconnect {
308 reason: FormattedText::from("not on this proxy's whitelist"),
309 })
310 .await;
311 return Ok(());
312 }
313
314 let id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed);
315
316 let mut guard = shared.registry.lock().await;
320
321 if let Some(tx) = guard.as_ref().filter(|tx| !tx.is_closed()).cloned() {
322 drop(guard);
323 session::attach_viewer(&tx, id, local).await?;
324 tracing::info!("'{username}' attached as viewer (client {id})");
325 return Ok(());
326 }
327
328 tracing::info!("'{username}' is the controller (client {id}); connecting upstream");
329 let up = upstream::connect(&shared.cfg).await?;
330 tracing::info!("upstream established as {}", up.profile.name);
331
332 *guard = Some(session::spawn(
333 up,
334 local,
335 id,
336 shared.pipeline.clone(),
337 shared.opts.clone(),
338 shared.events.clone(),
339 ));
340 Ok(())
341}