1#![expect(clippy::let_underscore_untyped)]
2#![expect(clippy::let_underscore_must_use)]
3
4use std::net::SocketAddr;
5use std::net::ToSocketAddrs as _;
6
7use tokio::net::TcpListener;
8use tokio::sync::mpsc;
9use tokio::sync::oneshot;
10use tokio::sync::oneshot::Sender;
11use tokio_stream::StreamExt as _;
12use tonic::service::Routes;
13use tonic::service::RoutesBuilder;
14use tracing::error;
15use tracing::info;
16
17#[derive(thiserror::Error, Debug)]
20pub enum ServerError {
21 #[error("Ready channel closed unexpectedly")]
22 ReadyChannelClosedUnexpectedly,
23
24 #[error("Failed channel closed unexpectedly")]
25 FailedChannelClosedUnexpectedly,
26
27 #[error("Server failed to start: {reason}")]
28 ServerFailedToStart { reason: String },
29}
30
31pub struct Server {
35 addr: SocketAddr,
36 routes: Routes,
37}
38
39pub struct ServerHandle {
42 shutdown: Option<Sender<()>>,
43 ready: mpsc::Receiver<SocketAddr>,
44 failed: mpsc::Receiver<String>,
45}
46
47impl ServerHandle {
48 pub async fn wait_for_ready(&mut self) -> Result<SocketAddr, ServerError> {
50 tokio::select! {
51 ready = self.ready.recv() => {
52 match ready {
53 Some(local_addr) => {
54 info!("Ready for connections.");
55 Ok(local_addr)
56 },
57 None => Err(ServerError::ReadyChannelClosedUnexpectedly)
58
59 }
60 }
61 failed = self.failed.recv() => {
62 match failed {
63 Some(reason) => Err(ServerError::ServerFailedToStart { reason }),
64 None => Err(ServerError::FailedChannelClosedUnexpectedly)
65
66 }
67 }
68 }
69 }
70
71 pub async fn wait_for_shutdown(&mut self) {
73 self.failed.recv().await;
74 }
75
76 pub async fn shutdown_and_wait(mut self) {
78 if let Some(shutdown) = self.shutdown.take() {
79 shutdown.send(()).ok();
80 self.wait_for_shutdown().await;
81 }
82 }
83}
84
85impl Server {
86 pub fn start(self) -> ServerHandle {
89 let (ready_tx, ready_rx) = mpsc::channel(1);
90 let (failed_tx, failed_rx) = mpsc::channel(1);
91 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
92
93 tokio::spawn(async move {
94 let listener = if let Ok(listener) = TcpListener::bind(self.addr).await {
95 #[expect(clippy::unwrap_used)]
96 let local_addr = listener.local_addr().unwrap();
97 info!(
98 "Listening on {local_addr}. To connect the Rerun Viewer, use the following address: rerun+http://{local_addr}"
99 );
100
101 #[expect(clippy::unwrap_used)]
102 ready_tx.send(local_addr).await.unwrap();
103 listener
104 } else {
105 error!("Failed to bind to address {}", self.addr);
106 #[expect(clippy::unwrap_used)]
107 failed_tx
108 .send(format!("Failed to bind to address {}", self.addr))
109 .await
110 .unwrap();
111 return;
112 };
113
114 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener).map(|inc| {
119 let inc = inc?;
120 inc.set_nodelay(true)?;
121 Ok::<_, std::io::Error>(inc)
122 });
123
124 let middlewares = tower::ServiceBuilder::new()
125 .layer({
126 let name = Some("rerun-oss".to_owned());
127 let version = None;
128 let is_client = false;
129 re_protos::headers::new_rerun_headers_layer(name, version, is_client)
130 })
131 .layer(tower_http::cors::CorsLayer::permissive()) .layer(tonic_web::GrpcWebLayer::new()) .into_inner();
134
135 let mut builder = tonic::transport::Server::builder()
136 .tcp_nodelay(true)
140 .accept_http1(true)
141 .layer(middlewares);
142
143 let _ = builder
144 .add_routes(self.routes)
145 .serve_with_incoming_shutdown(incoming, async {
146 shutdown_rx.await.ok();
147 })
148 .await
149 .map_err(|err| {
150 error!("Server error: {err:#}");
151 err
152 });
153
154 let _ = failed_tx.send("gRPC server stopped".to_owned()).await;
155 });
156
157 ServerHandle {
158 shutdown: Some(shutdown_tx),
159 ready: ready_rx,
160 failed: failed_rx,
161 }
162 }
163}
164
165const DEFAULT_ADDRESS: &str = "127.0.0.1:51234";
166
167#[derive(Default)]
169pub struct ServerBuilder {
170 addr: Option<SocketAddr>,
171 routes_builder: RoutesBuilder,
172}
173
174impl ServerBuilder {
175 #[inline]
176 pub fn with_address(mut self, addr: SocketAddr) -> Self {
177 self.addr = Some(addr);
178 self
179 }
180
181 pub fn with_service<S>(mut self, svc: S) -> Self
182 where
183 S: tower_service::Service<
184 http::Request<tonic::body::Body>,
185 Response = http::Response<tonic::body::Body>,
186 Error = std::convert::Infallible,
187 > + tonic::server::NamedService
188 + Clone
189 + Send
190 + Sync
191 + 'static,
192 S::Future: Send + 'static,
193 S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
194 {
195 self.routes_builder.add_service(svc);
196 self
197 }
198
199 pub fn build(self) -> Server {
200 Server {
201 #[expect(clippy::unwrap_used)]
202 addr: self
203 .addr
204 .unwrap_or(DEFAULT_ADDRESS.to_socket_addrs().unwrap().next().unwrap()),
205 routes: self.routes_builder.routes(),
206 }
207 }
208}