re_server/
server.rs

1#![expect(clippy::let_underscore_untyped)]
2#![expect(clippy::let_underscore_must_use)]
3
4use std::net::SocketAddr;
5use std::net::ToSocketAddrs as _;
6
7use tokio::net::TcpListener;
8use tokio::sync::mpsc;
9use tokio::sync::oneshot;
10use tokio::sync::oneshot::Sender;
11use tokio_stream::StreamExt as _;
12use tonic::service::Routes;
13use tonic::service::RoutesBuilder;
14use tracing::error;
15use tracing::info;
16
17// ---
18
19#[derive(thiserror::Error, Debug)]
20pub enum ServerError {
21    #[error("Ready channel closed unexpectedly")]
22    ReadyChannelClosedUnexpectedly,
23
24    #[error("Failed channel closed unexpectedly")]
25    FailedChannelClosedUnexpectedly,
26
27    #[error("Server failed to start: {reason}")]
28    ServerFailedToStart { reason: String },
29}
30
31/// An instance of a Redap gRPC server.
32///
33/// Use [`ServerBuilder`] to create a new instance.
34pub struct Server {
35    addr: SocketAddr,
36    routes: Routes,
37}
38
39/// `ServerHandle` is a tiny helper abstraction that enables us to
40/// deal with the gRPC server lifecycle more easily.
41pub struct ServerHandle {
42    shutdown: Option<Sender<()>>,
43    ready: mpsc::Receiver<SocketAddr>,
44    failed: mpsc::Receiver<String>,
45}
46
47impl ServerHandle {
48    /// Wait until the server is ready to accept connections (or failure occurs)
49    pub async fn wait_for_ready(&mut self) -> Result<SocketAddr, ServerError> {
50        tokio::select! {
51            ready = self.ready.recv() => {
52                match ready {
53                    Some(local_addr) => {
54                        info!("Ready for connections.");
55                        Ok(local_addr)
56                    },
57                    None => Err(ServerError::ReadyChannelClosedUnexpectedly)
58
59                }
60            }
61            failed = self.failed.recv() => {
62                match failed {
63                    Some(reason) => Err(ServerError::ServerFailedToStart { reason }),
64                    None => Err(ServerError::FailedChannelClosedUnexpectedly)
65
66                }
67            }
68        }
69    }
70
71    /// Wait until the server is shutdown.
72    pub async fn wait_for_shutdown(&mut self) {
73        self.failed.recv().await;
74    }
75
76    /// Signal to the gRPC server to shutdown, and then wait for it.
77    pub async fn shutdown_and_wait(mut self) {
78        if let Some(shutdown) = self.shutdown.take() {
79            shutdown.send(()).ok();
80            self.wait_for_shutdown().await;
81        }
82    }
83}
84
85impl Server {
86    /// Starts the server and return `ServerHandle` so that caller can manage
87    /// the server lifecycle.
88    pub fn start(self) -> ServerHandle {
89        let (ready_tx, ready_rx) = mpsc::channel(1);
90        let (failed_tx, failed_rx) = mpsc::channel(1);
91        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
92
93        tokio::spawn(async move {
94            let listener = if let Ok(listener) = TcpListener::bind(self.addr).await {
95                #[expect(clippy::unwrap_used)]
96                let local_addr = listener.local_addr().unwrap();
97                info!(
98                    "Listening on {local_addr}. To connect the Rerun Viewer, use the following address: rerun+http://{local_addr}"
99                );
100
101                #[expect(clippy::unwrap_used)]
102                ready_tx.send(local_addr).await.unwrap();
103                listener
104            } else {
105                error!("Failed to bind to address {}", self.addr);
106                #[expect(clippy::unwrap_used)]
107                failed_tx
108                    .send(format!("Failed to bind to address {}", self.addr))
109                    .await
110                    .unwrap();
111                return;
112            };
113
114            // NOTE: We already set NODELAY at the `tonic` layer just below, but that might
115            // or might not be good enough depending on a bunch of external conditions: make
116            // sure to disable Nagle's on every socket as soon as they're accepted, no
117            // matter what.
118            let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener).map(|inc| {
119                let inc = inc?;
120                inc.set_nodelay(true)?;
121                Ok::<_, std::io::Error>(inc)
122            });
123
124            let middlewares = tower::ServiceBuilder::new()
125                .layer({
126                    let name = Some("rerun-oss".to_owned());
127                    let version = None;
128                    let is_client = false;
129                    re_protos::headers::new_rerun_headers_layer(name, version, is_client)
130                })
131                .layer(tower_http::cors::CorsLayer::permissive()) // Allow CORS for all origins (to support web clients)
132                .layer(tonic_web::GrpcWebLayer::new()) // Support `grpc-web` clients
133                .into_inner();
134
135            let mut builder = tonic::transport::Server::builder()
136                // NOTE: This NODELAY very likely does nothing because of the call to
137                // `serve_with_incoming_shutdown` below, but we better be on the defensive here so
138                // we don't get surprised when things inevitably change.
139                .tcp_nodelay(true)
140                .accept_http1(true)
141                .layer(middlewares);
142
143            let _ = builder
144                .add_routes(self.routes)
145                .serve_with_incoming_shutdown(incoming, async {
146                    shutdown_rx.await.ok();
147                })
148                .await
149                .map_err(|err| {
150                    error!("Server error: {err:#}");
151                    err
152                });
153
154            let _ = failed_tx.send("gRPC server stopped".to_owned()).await;
155        });
156
157        ServerHandle {
158            shutdown: Some(shutdown_tx),
159            ready: ready_rx,
160            failed: failed_rx,
161        }
162    }
163}
164
165const DEFAULT_ADDRESS: &str = "127.0.0.1:51234";
166
167/// Builder for the gRPC server instance.
168#[derive(Default)]
169pub struct ServerBuilder {
170    addr: Option<SocketAddr>,
171    routes_builder: RoutesBuilder,
172}
173
174impl ServerBuilder {
175    #[inline]
176    pub fn with_address(mut self, addr: SocketAddr) -> Self {
177        self.addr = Some(addr);
178        self
179    }
180
181    pub fn with_service<S>(mut self, svc: S) -> Self
182    where
183        S: tower_service::Service<
184                http::Request<tonic::body::Body>,
185                Response = http::Response<tonic::body::Body>,
186                Error = std::convert::Infallible,
187            > + tonic::server::NamedService
188            + Clone
189            + Send
190            + Sync
191            + 'static,
192        S::Future: Send + 'static,
193        S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
194    {
195        self.routes_builder.add_service(svc);
196        self
197    }
198
199    pub fn build(self) -> Server {
200        Server {
201            #[expect(clippy::unwrap_used)]
202            addr: self
203                .addr
204                .unwrap_or(DEFAULT_ADDRESS.to_socket_addrs().unwrap().next().unwrap()),
205            routes: self.routes_builder.routes(),
206        }
207    }
208}