#![expect(clippy::let_underscore_untyped)]
#![expect(clippy::let_underscore_must_use)]
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs as _};
use tokio::net::TcpListener;
use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt as _;
use tonic::service::{Routes, RoutesBuilder};
use tracing::{error, info};
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
#[error("Server failed to start: {reason}")]
ServerFailedToStart { reason: String },
}
pub struct Server {
addr: SocketAddr,
routes: Routes,
artificial_latency: std::time::Duration,
bandwidth_limit: Option<u64>,
}
pub struct ServerHandle {
shutdown: Option<Sender<()>>,
ready: mpsc::Receiver<SocketAddr>,
failed: mpsc::Receiver<String>,
task: tokio::task::JoinHandle<()>,
}
impl ServerHandle {
pub async fn wait_for_ready(&mut self) -> Result<SocketAddr, ServerError> {
tokio::select! {
ready = self.ready.recv() => {
match ready {
Some(local_addr) => {
info!("Ready for connections.");
Ok(local_addr)
},
None => Err(ServerError::ServerFailedToStart {
reason: "ready channel closed unexpectedly".into(),
}),
}
}
failed = self.failed.recv() => {
match failed {
Some(reason) => Err(ServerError::ServerFailedToStart { reason }),
None => Err(ServerError::ServerFailedToStart {
reason: "failed channel closed unexpectedly".into(),
}),
}
}
result = &mut self.task => {
match result {
Ok(()) => Err(ServerError::ServerFailedToStart {
reason: "server task exited without signaling ready or failed".into(),
}),
Err(join_err) if join_err.is_panic() => Err(ServerError::ServerFailedToStart {
reason: format!("server task panicked: {join_err}"),
}),
Err(join_err) => Err(ServerError::ServerFailedToStart {
reason: format!("server task was cancelled: {join_err}"),
}),
}
}
}
}
pub async fn wait_for_shutdown(&mut self) {
self.failed.recv().await;
}
pub async fn shutdown_and_wait(mut self) {
if let Some(shutdown) = self.shutdown.take() {
shutdown.send(()).ok();
self.wait_for_shutdown().await;
}
}
}
impl Server {
pub fn start(self) -> ServerHandle {
let Self {
addr,
routes,
artificial_latency,
bandwidth_limit,
} = self;
let (ready_tx, ready_rx) = mpsc::channel(1);
let (failed_tx, failed_rx) = mpsc::channel(1);
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let task = tokio::spawn(async move {
let listener = if let Ok(listener) = TcpListener::bind(addr).await {
#[expect(clippy::unwrap_used)]
let bind_addr = listener.local_addr().unwrap();
let mut connect_addr = bind_addr;
if connect_addr.ip().is_unspecified() {
if connect_addr.is_ipv4() {
connect_addr.set_ip(Ipv4Addr::LOCALHOST.into());
} else {
connect_addr.set_ip(Ipv6Addr::LOCALHOST.into());
}
}
info!(
"Listening on {bind_addr}. To connect the Rerun Viewer, use the following address: rerun+http://{connect_addr}"
);
#[expect(clippy::unwrap_used)]
ready_tx.send(connect_addr).await.unwrap();
listener
} else {
error!("Failed to bind to address {addr}");
#[expect(clippy::unwrap_used)]
failed_tx
.send(format!("Failed to bind to address {addr}"))
.await
.unwrap();
return;
};
let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener).map(|inc| {
let inc = inc?;
inc.set_nodelay(true)?;
Ok::<_, std::io::Error>(inc)
});
let middlewares = tower::ServiceBuilder::new()
.layer({
let name = Some("rerun-oss".to_owned());
let version = None;
let is_client = false;
re_protos::headers::new_rerun_headers_layer(name, version, is_client)
})
.layer(tower_http::cors::CorsLayer::permissive()) .layer(crate::latency_layer::LatencyLayer::new(artificial_latency))
.layer(crate::bandwidth_layer::BandwidthLayer::new(bandwidth_limit))
.into_inner();
let mut builder = tonic::transport::Server::builder()
.tcp_nodelay(true)
.accept_http1(true)
.http2_adaptive_window(Some(true)) .layer(middlewares);
let _ = builder
.add_routes(routes)
.serve_with_incoming_shutdown(incoming, async {
shutdown_rx.await.ok();
})
.await
.map_err(|err| {
error!("Server error: {err:#}");
err
});
let _ = failed_tx.send("gRPC server stopped".to_owned()).await;
});
ServerHandle {
shutdown: Some(shutdown_tx),
ready: ready_rx,
failed: failed_rx,
task,
}
}
}
const DEFAULT_ADDRESS: &str = "127.0.0.1:51234";
#[derive(Default)]
pub struct ServerBuilder {
addr: Option<SocketAddr>,
routes_builder: RoutesBuilder,
axum_routes: axum::Router,
artificial_latency: std::time::Duration,
bandwidth_limit: Option<u64>,
}
impl ServerBuilder {
#[inline]
pub fn with_address(mut self, addr: SocketAddr) -> Self {
self.addr = Some(addr);
self
}
pub fn with_service<S>(mut self, svc: S) -> Self
where
S: tower_service::Service<
http::Request<tonic::body::Body>,
Response = http::Response<tonic::body::Body>,
Error = std::convert::Infallible,
> + tonic::server::NamedService
+ Clone
+ Send
+ Sync
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
{
self.routes_builder.add_service(svc);
self
}
pub fn with_http_route(mut self, path: &str, handler: axum::routing::MethodRouter) -> Self {
self.axum_routes = self.axum_routes.route(path, handler);
self
}
pub fn with_artificial_latency(mut self, artificial_latency: std::time::Duration) -> Self {
self.artificial_latency = artificial_latency;
self
}
pub fn with_bandwidth_limit(mut self, bytes_per_second: Option<u64>) -> Self {
self.bandwidth_limit = bytes_per_second;
self
}
pub fn build(self) -> Server {
let Self {
addr,
routes_builder,
axum_routes,
artificial_latency,
bandwidth_limit,
} = self;
let grpc_routes = routes_builder.routes();
let grpc_routes = grpc_routes.into_axum_router();
let grpc_routes = grpc_routes.layer(tonic_web::GrpcWebLayer::new());
let routes =
grpc_routes
.merge(axum_routes)
.fallback(|_req: axum::extract::Request| async {
use axum::response::IntoResponse as _;
http::StatusCode::NOT_FOUND.into_response()
});
Server {
#[expect(clippy::unwrap_used)]
addr: addr
.unwrap_or_else(|| DEFAULT_ADDRESS.to_socket_addrs().unwrap().next().unwrap()),
routes: routes.into(),
artificial_latency,
bandwidth_limit,
}
}
}