re_sdk/
web_viewer.rs

1use re_chunk::ChunkBatcherConfig;
2use re_log_types::LogMsg;
3use re_web_viewer_server::{WebViewerServer, WebViewerServerError, WebViewerServerPort};
4
5use crate::log_sink::SinkFlushError;
6
7// ----------------------------------------------------------------------------
8
9/// Failure to host a web viewer and/or Rerun server.
10#[derive(thiserror::Error, Debug)]
11pub enum WebViewerSinkError {
12    /// Failure to host the web viewer.
13    #[error(transparent)]
14    WebViewerServer(#[from] WebViewerServerError),
15
16    /// Invalid host IP.
17    #[error(transparent)]
18    InvalidAddress(#[from] std::net::AddrParseError),
19}
20
21/// A [`crate::sink::LogSink`] tied to a hosted Rerun web viewer. This internally stores two servers:
22/// * A gRPC server to relay messages from the sink to any connected web viewers
23/// * A [`WebViewerServer`] to serve the Wasm+HTML
24struct WebViewerSink {
25    open_browser: bool,
26
27    /// Sender to send messages to the gRPC server.
28    sender: re_smart_channel::Sender<LogMsg>,
29
30    /// The gRPC server thread.
31    _server_handle: std::thread::JoinHandle<()>,
32
33    /// Signal used to gracefully shutdown the gRPC server.
34    server_shutdown_signal: re_grpc_server::shutdown::Signal,
35
36    /// The http server serving wasm & html.
37    _webviewer_server: WebViewerServer,
38}
39
40impl WebViewerSink {
41    /// A `bind_ip` of `"0.0.0.0"` is a good default.
42    pub fn new(
43        open_browser: bool,
44        bind_ip: &str,
45        web_port: WebViewerServerPort,
46        grpc_port: u16,
47        server_options: re_grpc_server::ServerOptions,
48    ) -> Result<Self, WebViewerSinkError> {
49        let (server_shutdown_signal, shutdown) = re_grpc_server::shutdown::shutdown();
50
51        let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?;
52        let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
53            re_uri::Scheme::RerunHttp,
54            grpc_server_addr,
55        ));
56        let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
57            re_smart_channel::SmartMessageSource::MessageProxy(uri),
58            re_smart_channel::SmartChannelSource::Sdk,
59        );
60        let server_handle = std::thread::Builder::new()
61            .name("message_proxy_server".to_owned())
62            .spawn(move || {
63                let mut builder = tokio::runtime::Builder::new_current_thread();
64                builder.enable_all();
65                let rt = builder.build().expect("failed to build tokio runtime");
66
67                rt.block_on(re_grpc_server::serve_from_channel(
68                    grpc_server_addr,
69                    server_options,
70                    shutdown,
71                    channel_rx,
72                ));
73            })
74            .expect("failed to spawn thread for message proxy server");
75        let webviewer_server = WebViewerServer::new(bind_ip, web_port)?;
76
77        let http_web_viewer_url = webviewer_server.server_url();
78
79        let viewer_url =
80            if grpc_server_addr.ip().is_unspecified() || grpc_server_addr.ip().is_loopback() {
81                format!("{http_web_viewer_url}?url=rerun%2Bhttp://localhost:{grpc_port}/proxy")
82            } else {
83                format!("{http_web_viewer_url}?url=rerun%2Bhttp://{grpc_server_addr}/proxy")
84            };
85
86        re_log::info!("Hosting a web-viewer at {viewer_url}");
87        if open_browser {
88            webbrowser::open(&viewer_url).ok();
89        }
90
91        Ok(Self {
92            open_browser,
93            sender: channel_tx,
94            _server_handle: server_handle,
95            server_shutdown_signal,
96            _webviewer_server: webviewer_server,
97        })
98    }
99}
100
101impl crate::sink::LogSink for WebViewerSink {
102    fn send(&self, msg: LogMsg) {
103        if let Err(err) = self.sender.send(msg) {
104            re_log::error_once!("Failed to send log message to web server: {err}");
105        }
106    }
107
108    #[inline]
109    fn flush_blocking(&self, timeout: std::time::Duration) -> Result<(), SinkFlushError> {
110        self.sender
111            .flush_blocking(timeout)
112            .map_err(|err| match err {
113                re_smart_channel::FlushError::Closed => {
114                    SinkFlushError::failed("The viewer is no longer subscribed")
115                }
116                re_smart_channel::FlushError::Timeout => SinkFlushError::Timeout,
117            })
118    }
119
120    fn default_batcher_config(&self) -> ChunkBatcherConfig {
121        // The GRPC sink is typically used for live streams.
122        ChunkBatcherConfig::LOW_LATENCY
123    }
124
125    fn as_any(&self) -> &dyn std::any::Any {
126        self
127    }
128}
129
130impl Drop for WebViewerSink {
131    fn drop(&mut self) {
132        if self.open_browser {
133            // For small scripts that execute fast we run the risk of finishing
134            // before the browser has a chance to connect.
135            // Let's give it a little more time:
136            re_log::info!("Sleeping a short while to give the browser time to connect…");
137            std::thread::sleep(std::time::Duration::from_millis(1000));
138        }
139
140        self.server_shutdown_signal.stop();
141    }
142}
143
144// ----------------------------------------------------------------------------
145
146/// Helper to spawn an instance of the [`WebViewerServer`] and configure a webviewer url.
147#[cfg(feature = "web_viewer")]
148pub struct WebViewerConfig {
149    /// Ip to which the http server is bound.
150    ///
151    /// Defaults to 0.0.0.0
152    pub bind_ip: String,
153
154    /// The port to which the webviewer should bind.
155    ///
156    /// Defaults to [`WebViewerServerPort::AUTO`].
157    pub web_port: WebViewerServerPort,
158
159    /// The urls to which any spawned webviewer should connect.
160    ///
161    /// This url is a redap uri or a hosted RRD file that we retrieve via the message proxy.
162    /// Has no effect if [`Self::open_browser`] is false.
163    pub connect_to: Vec<String>,
164
165    /// If set, adjusts the browser url to force a specific backend, either `webgl` or `webgpu`.
166    ///
167    /// Has no effect if [`Self::open_browser`] is false.
168    pub force_wgpu_backend: Option<String>,
169
170    /// If set, adjusts the browser url to set the video decoder setting, either `auto`, `prefer_software` or `prefer_hardware`.
171    ///
172    /// Has no effect if [`Self::open_browser`] is false.
173    pub video_decoder: Option<String>,
174
175    /// If set to `true`, opens the default browser after hosting the webviewer.
176    ///
177    /// Defaults to `true`.
178    pub open_browser: bool,
179}
180
181#[cfg(feature = "web_viewer")]
182impl Default for WebViewerConfig {
183    fn default() -> Self {
184        Self {
185            bind_ip: "0.0.0.0".to_owned(),
186            web_port: WebViewerServerPort::AUTO,
187            connect_to: Vec::new(),
188            force_wgpu_backend: None,
189            video_decoder: None,
190            open_browser: true,
191        }
192    }
193}
194
195#[cfg(feature = "web_viewer")]
196impl WebViewerConfig {
197    /// Helper to spawn an instance of the [`WebViewerServer`].
198    /// This serves the HTTP+Wasm+JS files that make up the web-viewer.
199    ///
200    /// The server will immediately start listening for incoming connections
201    /// and stop doing so when the returned [`WebViewerServer`] is dropped.
202    ///
203    /// Note: this does not include the gRPC server.
204    pub fn host_web_viewer(self) -> Result<WebViewerServer, WebViewerServerError> {
205        let Self {
206            bind_ip,
207            connect_to,
208            web_port,
209            force_wgpu_backend,
210            video_decoder,
211            open_browser,
212        } = self;
213
214        let web_server = WebViewerServer::new(&bind_ip, web_port)?;
215        let http_web_viewer_url = web_server.server_url();
216
217        let mut viewer_url = http_web_viewer_url;
218
219        let mut first_arg = true;
220        let mut append_argument = |arg| {
221            let arg_delimiter = if first_arg {
222                first_arg = false;
223                "?"
224            } else {
225                "&"
226            };
227            viewer_url = format!("{viewer_url}{arg_delimiter}{arg}");
228        };
229
230        for source_url in connect_to {
231            // TODO(jan): remove after we change from `rerun+http` to `rerun-http`
232            let source_url = percent_encoding::utf8_percent_encode(
233                &source_url,
234                percent_encoding::NON_ALPHANUMERIC,
235            );
236            append_argument(format!("url={source_url}"));
237        }
238        if let Some(force_graphics) = force_wgpu_backend {
239            append_argument(format!("renderer={force_graphics}"));
240        }
241        if let Some(video_decoder) = video_decoder {
242            append_argument(format!("video_decoder={video_decoder}"));
243        }
244
245        re_log::info!("Hosting a web-viewer at {viewer_url}");
246        if open_browser {
247            webbrowser::open(&viewer_url).ok();
248        }
249
250        Ok(web_server)
251    }
252}
253
254// ----------------------------------------------------------------------------
255
256/// Serve log-data over gRPC and serve a Rerun web viewer over HTTP.
257///
258/// If the `open_browser` argument is `true`, your default browser
259/// will be opened with a connected web-viewer.
260///
261/// If not, you can connect to this server using the `rerun` binary (`cargo install rerun-cli --locked`).
262///
263/// NOTE: you can not connect one `Session` to another.
264///
265/// This function returns immediately.
266#[must_use = "the sink must be kept around to keep the servers running"]
267pub fn new_sink(
268    open_browser: bool,
269    bind_ip: &str,
270    web_port: WebViewerServerPort,
271    grpc_port: u16,
272    server_options: re_grpc_server::ServerOptions,
273) -> Result<Box<dyn crate::sink::LogSink>, WebViewerSinkError> {
274    Ok(Box::new(WebViewerSink::new(
275        open_browser,
276        bind_ip,
277        web_port,
278        grpc_port,
279        server_options,
280    )?))
281}
282
283/// Serves the Rerun Web Viewer (HTML+JS+Wasm) over http.
284///
285/// The server will immediately start listening for incoming connections
286/// and stop doing so when the returned [`WebViewerServer`] is dropped.
287///
288/// Note: this does NOT start a gRPC server.
289/// To start a gRPC server, use [`crate::RecordingStreamBuilder::serve_grpc`] and connect to it
290/// by setting [`WebViewerConfig::connect_to`] to `rerun+http://localhost/proxy`.
291///
292/// Note: this function just calls [`WebViewerConfig::host_web_viewer`] and is here only
293/// for convenience, visibility, and for symmetry with our Python SDK.
294pub fn serve_web_viewer(config: WebViewerConfig) -> Result<WebViewerServer, WebViewerServerError> {
295    config.host_web_viewer()
296}