pub mod auth;
pub mod config;
pub mod connection;
pub mod crypto;
pub mod event;
pub mod rpc;
pub mod server;
pub mod status;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::net::TcpListener;
use tracing::{Instrument, error, info, info_span};
pub use config::Config;
pub use event::{EventManager, PlayerInfo, Plugin, PostOrder};
pub use server::{Server, ServerRegistry};
use connection::MinecraftConnection;
use crypto::ServerKeyPair;
use rpc::DeepslateService;
use rpc::proto::deepslate_server::DeepslateServer;
pub struct Proxy {
config: Arc<Config>,
registry: Arc<ServerRegistry>,
key_pair: Arc<ServerKeyPair>,
event_manager: Arc<EventManager>,
http_client: reqwest::Client,
}
impl Proxy {
#[must_use]
pub fn builder() -> ProxyBuilder {
ProxyBuilder {
config: None,
config_overrides: ConfigOverrides::default(),
bootstrap_servers: Vec::new(),
plugins: Vec::new(),
}
}
#[must_use]
pub const fn registry(&self) -> &Arc<ServerRegistry> {
&self.registry
}
#[must_use]
pub const fn event_manager(&self) -> &Arc<EventManager> {
&self.event_manager
}
pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(self.config.listen_addr).await?;
info!(addr = %self.config.listen_addr, "proxy listening");
let grpc_addr = self.config.grpc_addr;
let grpc_registry = Arc::clone(&self.registry);
let grpc_handle = tokio::spawn(async move {
let grpc_service = DeepslateService::new(grpc_registry);
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(rpc::proto::FILE_DESCRIPTOR_SET)
.build_v1()
.expect("failed to build reflection service");
info!(addr = %grpc_addr, "gRPC control plane listening");
tonic::transport::Server::builder()
.add_service(DeepslateServer::new(grpc_service))
.add_service(reflection_service)
.serve(grpc_addr)
.await
});
let config = Arc::clone(&self.config);
let key_pair = Arc::clone(&self.key_pair);
let registry = Arc::clone(&self.registry);
let http_client = self.http_client.clone();
let event_manager = Arc::clone(&self.event_manager);
let accept_handle = tokio::spawn(async move {
let session_counter = AtomicUsize::new(0);
loop {
match listener.accept().await {
Ok((stream, addr)) => {
let session_id = session_counter.fetch_add(1, Ordering::SeqCst);
let config = Arc::clone(&config);
let key_pair = Arc::clone(&key_pair);
let registry = Arc::clone(®istry);
let http_client = http_client.clone();
let event_manager = Arc::clone(&event_manager);
tokio::spawn(
async move {
let conn = MinecraftConnection::new(
stream,
libdeflater::CompressionLvl::new(config.compression_level)
.expect("validated in Config::validate"),
);
if let Err(e) = Box::pin(connection::client::handle_client(
conn,
addr,
config,
key_pair,
registry,
http_client,
event_manager,
))
.await
{
tracing::debug!(error = %e, "client connection error");
}
}
.instrument(info_span!(
"conn",
sid = session_id,
ip = %addr.ip(),
port = addr.port()
)),
);
}
Err(e) => {
error!(error = %e, "failed to accept connection");
}
}
}
});
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("received shutdown signal");
}
result = grpc_handle => {
if let Err(e) = result {
error!(error = %e, "gRPC server error");
}
}
_ = accept_handle => {}
}
info!("shutting down");
Ok(())
}
}
pub struct ProxyBuilder {
config: Option<Config>,
config_overrides: ConfigOverrides,
bootstrap_servers: Vec<Server>,
plugins: Vec<Box<dyn Plugin>>,
}
impl ProxyBuilder {
#[must_use]
pub fn config(mut self, config: Config) -> Self {
self.config = Some(config);
self
}
#[must_use]
pub const fn listen_addr(mut self, listen_addr: std::net::SocketAddr) -> Self {
self.config_overrides.listen_addr = Some(listen_addr);
self
}
#[must_use]
pub const fn grpc_addr(mut self, grpc_addr: std::net::SocketAddr) -> Self {
self.config_overrides.grpc_addr = Some(grpc_addr);
self
}
#[must_use]
pub const fn online_mode(mut self, online_mode: bool) -> Self {
self.config_overrides.online_mode = Some(online_mode);
self
}
#[must_use]
pub fn forwarding_secret(mut self, forwarding_secret: impl AsRef<[u8]>) -> Self {
self.config_overrides.forwarding_secret = Some(forwarding_secret.as_ref().to_vec());
self
}
#[must_use]
pub const fn compression_threshold(mut self, compression_threshold: i32) -> Self {
self.config_overrides.compression_threshold = Some(compression_threshold);
self
}
#[must_use]
pub const fn compression_level(mut self, compression_level: i32) -> Self {
self.config_overrides.compression_level = Some(compression_level);
self
}
#[must_use]
pub fn motd(mut self, motd: impl Into<String>) -> Self {
self.config_overrides.motd = Some(motd.into());
self
}
#[must_use]
pub const fn max_players(mut self, max_players: i32) -> Self {
self.config_overrides.max_players = Some(max_players);
self
}
#[must_use]
pub const fn read_timeout_ms(mut self, read_timeout_ms: u64) -> Self {
self.config_overrides.read_timeout_ms = Some(read_timeout_ms);
self
}
#[must_use]
pub fn try_servers<I, S>(mut self, try_servers: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.config_overrides.try_servers = Some(try_servers.into_iter().map(Into::into).collect());
self
}
#[must_use]
pub fn log_level(mut self, log_level: impl Into<String>) -> Self {
self.config_overrides.log_level = Some(log_level.into());
self
}
#[must_use]
pub const fn log_json(mut self, log_json: bool) -> Self {
self.config_overrides.log_json = Some(log_json);
self
}
#[must_use]
pub fn server(mut self, id: impl Into<String>, addr: impl Into<String>) -> Self {
self.bootstrap_servers.push(Server::new(id, addr));
self
}
#[must_use]
pub fn servers<I>(mut self, servers: I) -> Self
where
I: IntoIterator<Item = Server>,
{
self.bootstrap_servers.extend(servers);
self
}
#[must_use]
pub fn plugin(mut self, plugin: impl Plugin) -> Self {
self.plugins.push(Box::new(plugin));
self
}
pub fn build(self) -> Result<Proxy, Box<dyn std::error::Error + Send + Sync>> {
let mut config = match self.config {
Some(config) => config,
None if self.config_overrides.has_any() => Config::default(),
None => Config::from_env()?,
};
self.config_overrides.apply(&mut config);
let config = config.validate()?;
let key_pair = ServerKeyPair::generate()?;
info!("generated RSA key pair");
let mut event_manager = EventManager::new();
for plugin in &self.plugins {
plugin.register(&mut event_manager);
}
let http_client = reqwest::Client::builder()
.user_agent("Deepslate/0.1.0")
.build()?;
let registry = ServerRegistry::new();
for server in &self.bootstrap_servers {
if !registry.register(server) {
return Err(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
format!("server with ID '{}' already exists", server.id),
)
.into());
}
}
if !config.try_servers.is_empty() {
registry.set_try_order(config.try_servers.clone());
}
Ok(Proxy {
config: Arc::new(config),
registry: Arc::new(registry),
key_pair: Arc::new(key_pair),
event_manager: Arc::new(event_manager),
http_client,
})
}
}
#[derive(Default)]
struct ConfigOverrides {
listen_addr: Option<std::net::SocketAddr>,
grpc_addr: Option<std::net::SocketAddr>,
online_mode: Option<bool>,
forwarding_secret: Option<Vec<u8>>,
compression_threshold: Option<i32>,
compression_level: Option<i32>,
motd: Option<String>,
max_players: Option<i32>,
read_timeout_ms: Option<u64>,
try_servers: Option<Vec<String>>,
log_level: Option<String>,
log_json: Option<bool>,
}
impl ConfigOverrides {
const fn has_any(&self) -> bool {
self.listen_addr.is_some()
|| self.grpc_addr.is_some()
|| self.online_mode.is_some()
|| self.forwarding_secret.is_some()
|| self.compression_threshold.is_some()
|| self.compression_level.is_some()
|| self.motd.is_some()
|| self.max_players.is_some()
|| self.read_timeout_ms.is_some()
|| self.try_servers.is_some()
|| self.log_level.is_some()
|| self.log_json.is_some()
}
fn apply(self, config: &mut Config) {
if let Some(listen_addr) = self.listen_addr {
config.listen_addr = listen_addr;
}
if let Some(grpc_addr) = self.grpc_addr {
config.grpc_addr = grpc_addr;
}
if let Some(online_mode) = self.online_mode {
config.online_mode = online_mode;
}
if let Some(forwarding_secret) = self.forwarding_secret {
config.forwarding_secret = forwarding_secret;
}
if let Some(compression_threshold) = self.compression_threshold {
config.compression_threshold = compression_threshold;
}
if let Some(compression_level) = self.compression_level {
config.compression_level = compression_level;
}
if let Some(motd) = self.motd {
config.motd = motd;
}
if let Some(max_players) = self.max_players {
config.max_players = max_players;
}
if let Some(read_timeout_ms) = self.read_timeout_ms {
config.read_timeout_ms = read_timeout_ms;
}
if let Some(try_servers) = self.try_servers {
config.try_servers = try_servers;
}
if let Some(log_level) = self.log_level {
config.log_level = log_level;
}
if let Some(log_json) = self.log_json {
config.log_json = log_json;
}
}
}
pub fn init_tracing(config: &Config) {
use tracing_subscriber::EnvFilter;
let filter = EnvFilter::try_new(&config.log_level).unwrap_or_else(|_| EnvFilter::new("info"));
if config.log_json {
tracing_subscriber::fmt()
.with_env_filter(filter)
.json()
.init();
} else {
tracing_subscriber::fmt().with_env_filter(filter).init();
}
}
#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, SocketAddr};
use super::*;
#[test]
fn builder_uses_code_config_without_env() {
let proxy = Proxy::builder()
.forwarding_secret("secret")
.listen_addr(SocketAddr::from((Ipv4Addr::LOCALHOST, 25_565)))
.grpc_addr(SocketAddr::from((Ipv4Addr::LOCALHOST, 25_577)))
.motd("Configured in code")
.build()
.unwrap();
assert_eq!(
proxy.config.listen_addr,
SocketAddr::from((Ipv4Addr::LOCALHOST, 25_565))
);
assert_eq!(
proxy.config.grpc_addr,
SocketAddr::from((Ipv4Addr::LOCALHOST, 25_577))
);
assert_eq!(proxy.config.motd, "Configured in code");
}
#[test]
fn builder_rejects_invalid_manual_config() {
let result = Proxy::builder()
.forwarding_secret("secret")
.compression_level(13)
.build();
assert!(result.is_err());
}
#[test]
fn builder_bootstraps_servers_and_try_order() {
let proxy = Proxy::builder()
.forwarding_secret("secret")
.server("lobby", "127.0.0.1:25566")
.server("survival", "127.0.0.1:25567")
.try_servers(["survival", "lobby"])
.build()
.unwrap();
assert_eq!(proxy.registry.list().len(), 2);
assert_eq!(proxy.registry.try_order(), vec!["survival", "lobby"]);
assert_eq!(proxy.registry.select_initial().unwrap().id, "survival");
}
#[test]
fn builder_overrides_explicit_config_fields() {
let proxy = Proxy::builder()
.config(Config {
forwarding_secret: b"base-secret".to_vec(),
motd: "Base MOTD".to_string(),
..Config::default()
})
.forwarding_secret("override-secret")
.motd("Override MOTD")
.build()
.unwrap();
assert_eq!(proxy.config.forwarding_secret, b"override-secret".to_vec());
assert_eq!(proxy.config.motd, "Override MOTD");
}
}