use aetheris_protocol::auth::v1::auth_service_server::AuthServiceServer;
use aetheris_protocol::matchmaking::v1::matchmaking_service_server::MatchmakingServiceServer;
use aetheris_protocol::telemetry::v1::telemetry_service_server::TelemetryServiceServer;
use aetheris_server::{
auth::AuthServiceImpl,
auth::email::{EmailSender, LettreSmtpEmailSender, LogEmailSender, ResendEmailSender},
config::ServerConfig,
matchmaking::MatchmakingServiceImpl,
telemetry::{AetherisTelemetryService, json_telemetry_handler},
};
use axum::Router;
use axum::routing::post;
use std::sync::Arc;
use tokio::sync::broadcast;
use tonic::codegen::http::{Method, header};
use tonic::transport::{Identity, Server, ServerTlsConfig};
use tower_http::cors::{Any, CorsLayer};
#[cfg(feature = "phase1")]
use aetheris_ecs_bevy::BevyWorldAdapter;
#[cfg(feature = "phase1")]
use aetheris_encoder_serde::SerdeEncoder;
#[cfg(feature = "phase1")]
use aetheris_server::MultiTransport;
#[cfg(feature = "phase1")]
use aetheris_server::TickScheduler;
#[cfg(feature = "phase1")]
use aetheris_transport_renet::RenetTransport;
#[cfg(feature = "phase1")]
use aetheris_transport_webtransport::WebTransportBridge;
// Install jemalloc as the process-wide allocator on every target except wasm32.
#[cfg(not(target_arch = "wasm32"))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install rustls provider");
let _provider = {
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
let log_format = std::env::var("LOG_FORMAT").unwrap_or_else(|_| "text".to_string());
let otlp_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT")
.unwrap_or_else(|_| "http://localhost:4317".to_string());
let build_exporter = || -> Result<_, Box<dyn std::error::Error + Send + Sync>> {
use opentelemetry_otlp::WithExportConfig;
Ok(opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(&otlp_endpoint)
.build()?)
};
let (otlp_layer, provider) = match build_exporter() {
Ok(exporter) => {
use opentelemetry_sdk::trace::SdkTracerProvider;
let provider = SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.with_resource(
opentelemetry_sdk::Resource::builder()
.with_attributes(vec![opentelemetry::KeyValue::new(
"service.name",
"aetheris-server",
)])
.build(),
)
.build();
let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, "aetheris");
(
Some(tracing_opentelemetry::layer().with_tracer(tracer)),
Some(provider),
)
}
Err(_e) => (None, None),
};
let fmt_layer_json = (log_format == "json")
.then(|| tracing_subscriber::fmt::layer().json().with_target(true));
let fmt_layer_text =
(log_format != "json").then(|| tracing_subscriber::fmt::layer().with_target(true));
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer_json)
.with(fmt_layer_text)
.with(otlp_layer)
.init();
if let Some(_p) = &provider {
tracing::info!(otlp_endpoint = %otlp_endpoint, "OTLP tracing initialised");
} else {
tracing::warn!(otlp_endpoint = %otlp_endpoint, "OTLP init failed, tracing continues without Jaeger");
}
provider
};
let config = ServerConfig::load();
metrics_exporter_prometheus::PrometheusBuilder::new()
.with_http_listener(([0, 0, 0, 0], config.metrics_port))
.set_buckets(&[
0.0001, 0.00025, 0.0005, 0.001, 0.0025, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0,
2.0, 5.0, 10.0,
])
.expect("Failed to set buckets")
.install()
.expect("Failed to install Prometheus metrics exporter");
tracing::info!(
"Prometheus metrics exporter listening on :{}/metrics",
config.metrics_port
);
let grpc_addr =
std::env::var("AETHERIS_GRPC_ADDR").unwrap_or_else(|_| "0.0.0.0:50051".to_string());
let addr = grpc_addr.parse()?;
tracing::info!("Aetheris Control Plane listening on {}", addr);
let sender_type = std::env::var("OTP_EMAIL_SENDER").unwrap_or_else(|_| "log".to_string());
let env = std::env::var("AETHERIS_ENV").unwrap_or_else(|_| "dev".to_string());
let email_sender: Arc<dyn EmailSender> = match sender_type.as_str() {
"log" => {
if env == "production" {
return Err("OTP_EMAIL_SENDER=log is forbidden in AETHERIS_ENV=production".into());
}
Arc::new(LogEmailSender)
}
"smtp" => Arc::new(
LettreSmtpEmailSender::from_env()
.map_err(|e| format!("Failed to initialize SMTP sender: {e}"))?,
),
"resend" => Arc::new(
ResendEmailSender::from_env()
.map_err(|e| format!("Failed to initialize Resend sender: {e}"))?,
),
other => {
return Err(format!("Unknown OTP_EMAIL_SENDER: {other}").into());
}
};
let auth_service = Arc::new(AuthServiceImpl::new(email_sender).await);
let matchmaking_service = MatchmakingServiceImpl::new(auth_service.clone());
let telemetry_service = AetherisTelemetryService::new();
let (shutdown_tx, _) = broadcast::channel::<()>(1);
{
let telemetry_svc_clone = telemetry_service.clone();
let mut shutdown_http = shutdown_tx.subscribe();
let http_port: u16 = std::env::var("AETHERIS_TELEMETRY_HTTP_PORT")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(50055);
let http_addr = std::net::SocketAddr::from(([0, 0, 0, 0], http_port));
let cors_http = CorsLayer::new()
.allow_origin(Any)
.allow_methods([Method::POST, Method::OPTIONS])
.allow_headers([header::CONTENT_TYPE]);
let app = Router::new()
.route("/telemetry/json", post(json_telemetry_handler))
.layer(cors_http)
.with_state(telemetry_svc_clone)
.into_make_service_with_connect_info::<std::net::SocketAddr>();
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(http_addr)
.await
.expect("Failed to bind telemetry HTTP server");
tracing::info!("Telemetry JSON endpoint on :{}/telemetry/json", http_port);
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_http.recv().await;
})
.await
.ok();
});
}
let use_tls = std::env::var("AETHERIS_GRPC_TLS")
.map(|v| {
let v = v.to_lowercase();
v == "1" || v == "true" || v == "yes" || v == "on"
})
.unwrap_or(false);
if env == "production" {
if !use_tls {
return Err("AETHERIS_GRPC_TLS=true is REQUIRED in AETHERIS_ENV=production".into());
}
tracing::info!("Production mode verified: Secure transport enabled.");
}
#[cfg(feature = "phase1")]
{
let encode_pool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(config.encode_threads)
.thread_name(|i| format!("aetheris-encode-{}", i))
.build()
.expect("Failed to build encode thread pool"),
);
let mut transport = MultiTransport::new();
let renet_addr_str =
std::env::var("AETHERIS_RENET_ADDR").unwrap_or_else(|_| "0.0.0.0:5000".to_string());
let renet_addr = renet_addr_str.parse()?;
let mut renet_config = aetheris_transport_renet::RenetServerConfig::default();
if std::env::var("AETHERIS_AUTH_BYPASS").is_ok_and(|v| {
let v = v.to_lowercase();
v == "1" || v == "true" || v == "yes" || v == "on"
}) {
renet_config.max_new_connections_per_second = 100;
}
renet_config.max_unreliable_channel_memory_bytes = 5 * 1024 * 1024;
let renet = RenetTransport::new_server(renet_addr, Some(renet_config))?;
transport.add_transport(Box::new(renet));
tracing::info!(
"Aetheris Game Server (Renet/UDP) listening on {}",
renet_addr
);
let wt_addr_str =
std::env::var("AETHERIS_WT_ADDR").unwrap_or_else(|_| "[::]:4433".to_string());
let wt_addr = wt_addr_str.parse()?;
let auth_service_wt = auth_service.clone();
let wt_validator = Arc::new(move |token: &str| -> bool {
let authorized = auth_service_wt.is_authorized(token);
if authorized {
metrics::counter!("aetheris_webtransport_auth_allowed_total").increment(1);
} else {
metrics::counter!("aetheris_webtransport_auth_denied_total").increment(1);
}
authorized
});
use tracing::Instrument;
let wt = WebTransportBridge::new(wt_addr, Some(wt_validator))
.instrument(tracing::info_span!("webtransport_init", addr = %wt_addr))
.await;
transport.add_transport(Box::new(wt));
let config = aetheris_server::config::ServerConfig::load();
let tick_rate = config.tick_rate;
let mut world = BevyWorldAdapter::new(bevy_ecs::world::World::new(), tick_rate);
let mut registry = aetheris_ecs_bevy::registry::ComponentRegistry::new();
aetheris_ecs_bevy::registry::register_platform_components(&mut registry);
for descriptor in registry.components.values() {
world.register_replicator(descriptor.replicator.clone());
}
let encoder = SerdeEncoder::new();
let mut scheduler = TickScheduler::new(
tick_rate,
auth_service.clone() as Arc<dyn aetheris_server::auth::AuthSessionVerifier>,
encode_pool,
);
let shutdown_clone = shutdown_tx.subscribe();
let scheduler_handle = tokio::spawn(async move {
scheduler
.run(
Box::new(transport),
Box::new(world),
Box::new(encoder),
shutdown_clone,
)
.await;
});
{
let mut shutdown_mon = shutdown_tx.subscribe();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
tokio::select! {
_ = interval.tick() => {
if let (Ok(allocated), Ok(resident)) = (
tikv_jemalloc_ctl::stats::allocated::read(),
tikv_jemalloc_ctl::stats::resident::read(),
) {
metrics::gauge!("aetheris_memory_allocated_bytes").set(allocated as f64);
metrics::gauge!("aetheris_memory_resident_bytes").set(resident as f64);
}
}
_ = shutdown_mon.recv() => break,
}
}
});
}
let shutdown_auth_tx = shutdown_tx.clone();
tokio::spawn(
async move {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to register SIGTERM handler");
let mut sigint =
signal(SignalKind::interrupt()).expect("Failed to register SIGINT handler");
tokio::select! {
_ = sigterm.recv() => tracing::info!("SIGTERM received, triggering cancellation..."),
_ = sigint.recv() => tracing::info!("SIGINT received, triggering cancellation..."),
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await.ok();
tracing::info!("Shutdown signal received, triggering cancellation...");
}
let _ = shutdown_auth_tx.send(());
}
.instrument(tracing::info_span!("shutdown_watcher")),
);
tracing::info!(
"Aetheris Game Server (WebTransport) listening on {}",
wt_addr
);
let cors = if env == "production" {
CorsLayer::new().allow_origin(
std::env::var("ALLOWED_ORIGINS")
.unwrap_or_else(|_| "https://aetheris.io".to_string())
.parse::<header::HeaderValue>()
.map_err(|e| format!("Invalid ALLOWED_ORIGINS: {e}"))?,
)
} else {
CorsLayer::new().allow_origin(Any).allow_credentials(false)
};
let cors = cors
.allow_methods([Method::POST, Method::OPTIONS])
.allow_headers([
header::CONTENT_TYPE,
header::USER_AGENT,
header::HeaderName::from_static("x-grpc-web"),
header::HeaderName::from_static("x-user-agent"),
])
.expose_headers([
header::HeaderName::from_static("grpc-status"),
header::HeaderName::from_static("grpc-message"),
header::HeaderName::from_static("grpc-status-details-bin"),
]);
let mut builder = Server::builder()
.accept_http1(true)
.layer(cors)
.layer(tonic_web::GrpcWebLayer::new());
if use_tls {
let (cert_path, key_path) = get_tls_paths();
let tls_config = build_tls_config(&cert_path, &key_path).await?;
builder = builder.tls_config(tls_config)?;
tracing::info!("Aetheris Control Plane (TLS Enabled) listening on {}", addr);
} else {
tracing::info!("Aetheris Control Plane (Insecure) listening on {}", addr);
}
let router = register_services(
builder,
auth_service,
matchmaking_service,
telemetry_service,
);
router
.serve_with_shutdown(addr, async move {
let _ = shutdown_tx.subscribe().recv().await;
})
.await?;
tracing::info!("gRPC server drained, waiting for scheduler...");
let _ = scheduler_handle.await;
}
#[cfg(not(feature = "phase1"))]
{
let shutdown_fallback_tx = shutdown_tx.clone();
use tracing::Instrument;
tokio::spawn(
async move {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to register SIGTERM handler");
let mut sigint =
signal(SignalKind::interrupt()).expect("Failed to register SIGINT handler");
tokio::select! {
_ = sigterm.recv() => tracing::info!("SIGTERM received, triggering cancellation..."),
_ = sigint.recv() => tracing::info!("SIGINT received, triggering cancellation..."),
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await.ok();
tracing::info!("Shutdown signal received, triggering cancellation...");
}
let _ = shutdown_fallback_tx.send(());
}
.instrument(tracing::info_span!("shutdown_watcher_fallback")),
);
let cors = if env == "production" {
CorsLayer::new().allow_origin(
std::env::var("ALLOWED_ORIGINS")
.unwrap_or_else(|_| "https://aetheris.io".to_string())
.parse::<header::HeaderValue>()
.map_err(|e| format!("Invalid ALLOWED_ORIGINS: {e}"))?,
)
} else {
CorsLayer::new().allow_origin(Any)
};
let cors = cors
.allow_methods([Method::POST, Method::OPTIONS])
.allow_headers([
header::CONTENT_TYPE,
header::USER_AGENT,
header::HeaderName::from_static("x-grpc-web"),
header::HeaderName::from_static("x-user-agent"),
])
.expose_headers([
header::HeaderName::from_static("grpc-status"),
header::HeaderName::from_static("grpc-message"),
header::HeaderName::from_static("grpc-status-details-bin"),
]);
let mut builder = Server::builder()
.accept_http1(true)
.layer(cors)
.layer(tonic_web::GrpcWebLayer::new());
if use_tls {
let (cert_path, key_path) = get_tls_paths();
let tls_config = build_tls_config(&cert_path, &key_path).await?;
builder = builder.tls_config(tls_config)?;
tracing::info!("Aetheris Control Plane (TLS Enabled) listening on {}", addr);
} else {
tracing::info!("Aetheris Control Plane (Insecure) listening on {}", addr);
}
let router = register_services(
builder,
auth_service,
matchmaking_service,
telemetry_service,
);
router
.serve_with_shutdown(addr, async move {
let _ = shutdown_tx.subscribe().recv().await;
})
.await?;
}
tracing::info!("Aetheris Server shutdown complete. Telemetry will flush on exit.");
Ok(())
}
/// Returns the `(certificate, private key)` file paths used for gRPC TLS.
///
/// Each path is read from its environment variable
/// (`AETHERIS_GRPC_TLS_CERT_PATH`, `AETHERIS_GRPC_TLS_KEY_PATH`) and falls back
/// to the corresponding file under `target/dev-certs/` when the variable cannot
/// be read.
fn get_tls_paths() -> (String, String) {
    // Shared lookup: environment value if available, otherwise the given default.
    let env_or = |var: &str, fallback: &str| -> String {
        match std::env::var(var) {
            Ok(value) => value,
            Err(_) => fallback.to_owned(),
        }
    };
    (
        env_or("AETHERIS_GRPC_TLS_CERT_PATH", "target/dev-certs/cert.pem"),
        env_or("AETHERIS_GRPC_TLS_KEY_PATH", "target/dev-certs/key.pem"),
    )
}
/// Loads the PEM certificate and private key from disk and wraps them in a
/// tonic [`ServerTlsConfig`].
///
/// The certificate is read first, then the key.
///
/// # Errors
/// Returns a message naming the offending path when either file cannot be read.
async fn build_tls_config(cert_path: &str, key_path: &str) -> Result<ServerTlsConfig, String> {
    let cert_pem = match tokio::fs::read(cert_path).await {
        Ok(bytes) => bytes,
        Err(e) => return Err(format!("Failed to read TLS cert from {cert_path}: {e}")),
    };
    let key_pem = match tokio::fs::read(key_path).await {
        Ok(bytes) => bytes,
        Err(e) => return Err(format!("Failed to read TLS key from {key_path}: {e}")),
    };
    Ok(ServerTlsConfig::new().identity(Identity::from_pem(cert_pem, key_pem)))
}
/// Attaches the auth, matchmaking and telemetry gRPC services to `builder`, in
/// that order, and returns the resulting router.
///
/// The auth service is registered from a clone of the value behind the `Arc`.
fn register_services<L: Clone>(
    mut builder: Server<L>,
    auth_service: Arc<AuthServiceImpl>,
    matchmaking_service: MatchmakingServiceImpl,
    telemetry_service: AetherisTelemetryService,
) -> tonic::transport::server::Router<L> {
    let auth = AuthServiceServer::new((*auth_service).clone());
    let matchmaking = MatchmakingServiceServer::new(matchmaking_service);
    let telemetry = TelemetryServiceServer::new(telemetry_service);

    let router = builder.add_service(auth);
    let router = router.add_service(matchmaking);
    router.add_service(telemetry)
}