mod ids;
mod local_server;
pub mod plugin;
mod reflect;
mod relay;
mod session;
mod snapshot;
mod upstream;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use eyre::Result;
use tokio::sync::{Mutex, broadcast, mpsc};
use tokio::task::JoinHandle;
pub use plugin::{Frame, Pipeline, ProxyPlugin, Verdict};
pub use session::ClientId;
/// Notifications published on the proxy's broadcast channel.
///
/// Receivers are obtained with `ReflectionProxy::subscribe`. The sending half
/// is handed to `session::spawn`; the exact points at which each variant is
/// emitted live in the `session` module and are not visible from this file.
#[derive(Clone, Debug)]
pub enum ProxyEvent {
/// NOTE(review): presumably sent once a session is up — confirm in `session`.
SessionStarted,
/// NOTE(review): presumably sent when the session terminates — confirm in `session`.
SessionEnded,
/// A client became part of the session.
ClientJoined {
/// Identifier assigned to the client by the proxy.
id: ClientId,
/// Username the client logged in with.
username: String,
},
/// A client is no longer part of the session.
ClientLeft {
/// Identifier assigned to the client by the proxy.
id: ClientId,
/// Username the client logged in with.
username: String,
},
/// The controlling client changed.
ControlChanged {
/// Id and username of the new controller.
/// NOTE(review): `None` presumably means nobody holds control — confirm in `session`.
controller: Option<(ClientId, String)>,
},
}
/// Configuration collected before a `ReflectionProxy` is started with `spawn`.
///
/// Create one with `ReflectionProxy::builder()` or `ProxyBuilder::default()`.
pub struct ProxyBuilder {
// Local listen address handed to `local_server::listen`.
bind: String,
// Upstream server host, forwarded to `upstream::UpstreamConfig`.
target_host: String,
// Upstream server port, forwarded to `upstream::UpstreamConfig`.
target_port: u16,
// Account e-mail for the upstream connection; `spawn` refuses an empty value.
email: String,
// Optional path forwarded to `upstream::UpstreamConfig::auth_cache`.
auth_cache: Option<PathBuf>,
// Plugins that make up the `Pipeline`, in insertion order.
plugins: Vec<Box<dyn ProxyPlugin>>,
// Usernames allowed to connect (ASCII case-insensitive); empty disables the check.
whitelist: Vec<String>,
// Forwarded to `session::SessionOpts::max_clients`; semantics defined in `session`.
max_clients: Option<usize>,
// Forwarded to `session::SessionOpts::always_first_control`; semantics defined in `session`.
always_first_control: bool,
}
impl Default for ProxyBuilder {
fn default() -> Self {
Self {
bind: "127.0.0.1:25566".into(),
target_host: "localhost".into(),
target_port: 25565,
email: String::new(),
auth_cache: None,
plugins: Vec::new(),
whitelist: Vec::new(),
max_clients: None,
always_first_control: false,
}
}
}
impl ProxyBuilder {
    /// Sets the local address the proxy listens on.
    pub fn bind(mut self, addr: impl Into<String>) -> Self {
        self.bind = addr.into();
        self
    }

    /// Sets the upstream server, given either as `host` or as `host:port`.
    ///
    /// The text after the last `:` is used as the port only when it parses as
    /// a `u16` and the part before it is unambiguous: it either contains no
    /// further colon (hostname / IPv4) or is a bracketed IPv6 literal such as
    /// `[::1]`. In every other case the whole input is stored as the host and
    /// the previously configured port is kept. Brackets are stored unchanged.
    pub fn target(mut self, host: impl Into<String>) -> Self {
        let host = host.into();
        let split = host
            .rsplit_once(':')
            // A colon left in the host part means an IPv6 literal. Without
            // brackets its last group is part of the address, not a port
            // (e.g. `::1` must not become host `:` with port 1).
            .filter(|(name, _)| {
                !name.contains(':') || (name.starts_with('[') && name.ends_with(']'))
            })
            // Parse once; a non-numeric or out-of-range suffix is not a port.
            .and_then(|(name, port)| port.parse::<u16>().ok().map(|port| (name.to_owned(), port)));
        match split {
            Some((name, port)) => {
                self.target_host = name;
                self.target_port = port;
            }
            None => self.target_host = host,
        }
        self
    }

    /// Sets the account e-mail used for the upstream connection (required).
    pub fn email(mut self, email: impl Into<String>) -> Self {
        self.email = email.into();
        self
    }

    /// Sets the path forwarded to the upstream configuration as its auth cache.
    pub fn auth_cache(mut self, path: impl Into<PathBuf>) -> Self {
        self.auth_cache = Some(path.into());
        self
    }

    /// Appends a plugin to the pipeline; plugins keep their insertion order.
    pub fn plugin(mut self, p: Box<dyn ProxyPlugin>) -> Self {
        self.plugins.push(p);
        self
    }

    /// Replaces the whitelist with `names`.
    ///
    /// An empty whitelist disables the check; otherwise usernames are matched
    /// ASCII case-insensitively when a client connects.
    pub fn whitelist<I: IntoIterator<Item = S>, S: Into<String>>(mut self, names: I) -> Self {
        self.whitelist = names.into_iter().map(Into::into).collect();
        self
    }

    /// Sets the `max_clients` session option.
    pub fn max_clients(mut self, max: usize) -> Self {
        self.max_clients = Some(max);
        self
    }

    /// Sets the `always_first_control` session option.
    pub fn always_first_control(mut self, on: bool) -> Self {
        self.always_first_control = on;
        self
    }

    /// Binds the local listener and starts the accept loop on a new task.
    ///
    /// No upstream connection is made here; that happens when the first
    /// client is handled.
    ///
    /// # Errors
    ///
    /// Fails when no e-mail was configured, when the listener cannot be
    /// created, or when its local address cannot be read.
    pub async fn spawn(self) -> Result<ReflectionProxy> {
        if self.email.is_empty() {
            eyre::bail!("ProxyBuilder::email is required");
        }

        // `self` is consumed, so the bind string can be moved instead of cloned.
        let listen_cfg = local_server::LocalServerConfig { bind: self.bind };
        let listener = local_server::listen(&listen_cfg).await?;
        let local_addr = listener.local_addr()?;

        let cfg = Arc::new(upstream::UpstreamConfig {
            host: self.target_host,
            port: self.target_port,
            email: self.email,
            auth_cache: self.auth_cache,
        });
        let pipeline = Arc::new(Pipeline {
            plugins: self.plugins,
        });
        // Starts empty: there is no session until the first client arrives.
        let registry: SessionRegistry = Arc::new(Mutex::new(None));
        // The initial receiver is discarded; subscribers attach later via
        // `ReflectionProxy::subscribe`.
        let (events_tx, _) = broadcast::channel(256);

        let shared = Arc::new(Shared {
            cfg,
            pipeline,
            registry,
            events: events_tx.clone(),
            whitelist: self.whitelist,
            opts: session::SessionOpts {
                max_clients: self.max_clients,
                always_first_control: self.always_first_control,
            },
        });
        let accept_task = tokio::spawn(accept_loop(listener, shared));

        Ok(ReflectionProxy {
            local_addr,
            accept_task,
            events: events_tx,
        })
    }
}
/// Handle to a running proxy, returned by `ProxyBuilder::spawn`.
pub struct ReflectionProxy {
// Address the listener is actually bound to.
local_addr: SocketAddr,
// Task running `accept_loop`.
accept_task: JoinHandle<()>,
// Sending half of the event channel; used to create new receivers.
events: broadcast::Sender<ProxyEvent>,
}
impl ReflectionProxy {
    /// Starts a new configuration with default settings.
    pub fn builder() -> ProxyBuilder {
        Default::default()
    }

    /// Returns a fresh receiver for `ProxyEvent`s.
    pub fn subscribe(&self) -> broadcast::Receiver<ProxyEvent> {
        let sender = &self.events;
        sender.subscribe()
    }

    /// Returns the address the local listener is bound to.
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    /// Aborts the accept task so that no further connections are accepted.
    ///
    /// Per-connection tasks already spawned by the accept loop are separate
    /// tasks and are not aborted by this call.
    pub fn shutdown(&self) {
        let handle = &self.accept_task;
        handle.abort();
    }

    /// Waits until the accept task has finished; its join result is ignored.
    pub async fn wait(self) {
        let Self { accept_task, .. } = self;
        let _ = accept_task.await;
    }
}
/// Shared slot holding the message sender of the current session.
///
/// It is `None` until a session has been spawned. `handle_connection` treats
/// a sender whose channel is closed the same as `None`.
type SessionRegistry = Arc<Mutex<Option<mpsc::Sender<session::SessionMsg>>>>;
/// State shared by the accept loop and every connection handler.
struct Shared {
// Upstream connection settings passed to `upstream::connect`.
cfg: Arc<upstream::UpstreamConfig>,
// Plugin pipeline handed to each spawned session.
pipeline: Arc<Pipeline>,
// Slot for the current session's message sender.
registry: SessionRegistry,
// Event sender cloned into each spawned session.
events: broadcast::Sender<ProxyEvent>,
// Allowed usernames; empty disables the whitelist check.
whitelist: Vec<String>,
// Options cloned into each spawned session.
opts: session::SessionOpts,
}
/// Source of client ids, starting at 1.
///
/// Being a `static`, the counter is shared by every proxy in the process.
/// `handle_connection` takes the next value for each client that passes the
/// whitelist check.
static NEXT_CLIENT_ID: AtomicU32 = AtomicU32::new(1);
/// Accepts inbound TCP connections and hands each one to its own task.
///
/// Every accepted stream is processed by `handle_connection` on a freshly
/// spawned task; an error from that handler is logged at info level. The loop
/// ends, and with it this task, on the first failed `accept`.
///
/// NOTE(review): stopping on any accept error also covers errors that may be
/// transient — confirm that ending the proxy here is intended.
async fn accept_loop(listener: tokio::net::TcpListener, shared: Arc<Shared>) {
    loop {
        match listener.accept().await {
            Err(e) => {
                tracing::error!("accept failed: {e}");
                return;
            }
            Ok((stream, addr)) => {
                tracing::info!("connection from {addr}");
                // Each connection task gets its own handle to the shared state.
                let shared = Arc::clone(&shared);
                let connection_task = async move {
                    let outcome = handle_connection(stream, shared).await;
                    if let Err(e) = outcome {
                        tracing::info!("connection ended: {e:#}");
                    }
                };
                tokio::spawn(connection_task);
            }
        }
    }
}
async fn handle_connection(stream: tokio::net::TcpStream, shared: Arc<Shared>) -> Result<()> {
let mut local = local_server::accept_login(stream).await?;
let username = local.username.clone();
if !shared.whitelist.is_empty()
&& !shared
.whitelist
.iter()
.any(|w| w.eq_ignore_ascii_case(&username))
{
use azalea_chat::FormattedText;
use azalea_protocol::packets::config::c_disconnect::ClientboundDisconnect;
tracing::info!("'{username}' rejected: not whitelisted");
let _ = local
.connection
.write(ClientboundDisconnect {
reason: FormattedText::from("not on this proxy's whitelist"),
})
.await;
return Ok(());
}
let id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed);
let mut guard = shared.registry.lock().await;
if let Some(tx) = guard.as_ref().filter(|tx| !tx.is_closed()).cloned() {
drop(guard);
session::attach_viewer(&tx, id, local).await?;
tracing::info!("'{username}' attached as viewer (client {id})");
return Ok(());
}
tracing::info!("'{username}' is the controller (client {id}); connecting upstream");
let up = upstream::connect(&shared.cfg).await?;
tracing::info!("upstream established as {}", up.profile.name);
*guard = Some(session::spawn(
up,
local,
id,
shared.pipeline.clone(),
shared.opts.clone(),
shared.events.clone(),
));
Ok(())
}