mod ids;
mod local_server;
pub mod plugin;
mod reflect;
mod relay;
mod session;
mod snapshot;
mod upstream;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use eyre::Result;
use tokio::sync::{Mutex, mpsc};
use tokio::task::JoinHandle;
pub use plugin::{Frame, Pipeline, ProxyPlugin, Verdict};
pub struct ProxyBuilder {
bind: String,
target_host: String,
target_port: u16,
email: String,
auth_cache: Option<PathBuf>,
plugins: Vec<Box<dyn ProxyPlugin>>,
}
impl Default for ProxyBuilder {
fn default() -> Self {
Self {
bind: "127.0.0.1:25566".into(),
target_host: "localhost".into(),
target_port: 25565,
email: String::new(),
auth_cache: None,
plugins: Vec::new(),
}
}
}
impl ProxyBuilder {
pub fn bind(mut self, addr: impl Into<String>) -> Self {
self.bind = addr.into();
self
}
pub fn target(mut self, host: impl Into<String>) -> Self {
let host = host.into();
match host.rsplit_once(':') {
Some((h, p)) if p.parse::<u16>().is_ok() => {
self.target_host = h.to_string();
self.target_port = p.parse().unwrap();
}
_ => self.target_host = host,
}
self
}
pub fn email(mut self, email: impl Into<String>) -> Self {
self.email = email.into();
self
}
pub fn auth_cache(mut self, path: impl Into<PathBuf>) -> Self {
self.auth_cache = Some(path.into());
self
}
pub fn plugin(mut self, p: Box<dyn ProxyPlugin>) -> Self {
self.plugins.push(p);
self
}
pub async fn spawn(self) -> Result<ReflectionProxy> {
if self.email.is_empty() {
eyre::bail!("ProxyBuilder::email is required");
}
let listener = local_server::listen(&local_server::LocalServerConfig {
bind: self.bind.clone(),
})
.await?;
let local_addr = listener.local_addr()?;
let cfg = Arc::new(upstream::UpstreamConfig {
host: self.target_host,
port: self.target_port,
email: self.email,
auth_cache: self.auth_cache,
});
let pipeline = Arc::new(Pipeline {
plugins: self.plugins,
});
let registry: SessionRegistry = Arc::new(Mutex::new(None));
let accept_task = tokio::spawn(accept_loop(listener, cfg, pipeline, registry));
Ok(ReflectionProxy {
local_addr,
accept_task,
})
}
}
pub struct ReflectionProxy {
local_addr: SocketAddr,
accept_task: JoinHandle<()>,
}
impl ReflectionProxy {
pub fn builder() -> ProxyBuilder {
ProxyBuilder::default()
}
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
pub fn shutdown(&self) {
self.accept_task.abort();
}
pub async fn wait(self) {
let _ = self.accept_task.await;
}
}
type SessionRegistry = Arc<Mutex<Option<mpsc::Sender<session::SessionMsg>>>>;
static NEXT_CLIENT_ID: AtomicU32 = AtomicU32::new(1);
async fn accept_loop(
listener: tokio::net::TcpListener,
cfg: Arc<upstream::UpstreamConfig>,
pipeline: Arc<Pipeline>,
registry: SessionRegistry,
) {
loop {
let (stream, addr) = match listener.accept().await {
Ok(x) => x,
Err(e) => {
tracing::error!("accept failed: {e}");
break;
}
};
tracing::info!("connection from {addr}");
let (cfg, pipeline, registry) = (cfg.clone(), pipeline.clone(), registry.clone());
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, cfg, pipeline, registry).await {
tracing::info!("connection ended: {e:#}");
}
});
}
}
async fn handle_connection(
stream: tokio::net::TcpStream,
cfg: Arc<upstream::UpstreamConfig>,
pipeline: Arc<Pipeline>,
registry: SessionRegistry,
) -> Result<()> {
let local = local_server::accept_login(stream).await?;
let username = local.username.clone();
let id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed);
let mut guard = registry.lock().await;
if let Some(tx) = guard.as_ref().filter(|tx| !tx.is_closed()).cloned() {
drop(guard);
session::attach_viewer(&tx, id, local).await?;
tracing::info!("'{username}' attached as viewer (client {id})");
return Ok(());
}
tracing::info!("'{username}' is the controller (client {id}); connecting upstream");
let up = upstream::connect(&cfg).await?;
tracing::info!("upstream established as {}", up.profile.name);
*guard = Some(session::spawn(up, local, id, pipeline));
Ok(())
}