1use re_chunk::ChunkBatcherConfig;
2use re_log_types::LogMsg;
3use re_web_viewer_server::{WebViewerServer, WebViewerServerError, WebViewerServerPort};
4
5use crate::log_sink::SinkFlushError;
6
7#[derive(thiserror::Error, Debug)]
11pub enum WebViewerSinkError {
12 #[error(transparent)]
14 WebViewerServer(#[from] WebViewerServerError),
15
16 #[error(transparent)]
18 InvalidAddress(#[from] std::net::AddrParseError),
19}
20
21struct WebViewerSink {
25 open_browser: bool,
26
27 sender: re_smart_channel::Sender<LogMsg>,
29
30 _server_handle: std::thread::JoinHandle<()>,
32
33 server_shutdown_signal: re_grpc_server::shutdown::Signal,
35
36 _webviewer_server: WebViewerServer,
38}
39
40impl WebViewerSink {
41 pub fn new(
43 open_browser: bool,
44 bind_ip: &str,
45 web_port: WebViewerServerPort,
46 grpc_port: u16,
47 server_options: re_grpc_server::ServerOptions,
48 ) -> Result<Self, WebViewerSinkError> {
49 let (server_shutdown_signal, shutdown) = re_grpc_server::shutdown::shutdown();
50
51 let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?;
52 let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
53 re_uri::Scheme::RerunHttp,
54 grpc_server_addr,
55 ));
56 let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
57 re_smart_channel::SmartMessageSource::MessageProxy(uri),
58 re_smart_channel::SmartChannelSource::Sdk,
59 );
60 let server_handle = std::thread::Builder::new()
61 .name("message_proxy_server".to_owned())
62 .spawn(move || {
63 let mut builder = tokio::runtime::Builder::new_current_thread();
64 builder.enable_all();
65 let rt = builder.build().expect("failed to build tokio runtime");
66
67 rt.block_on(re_grpc_server::serve_from_channel(
68 grpc_server_addr,
69 server_options,
70 shutdown,
71 channel_rx,
72 ));
73 })
74 .expect("failed to spawn thread for message proxy server");
75 let webviewer_server = WebViewerServer::new(bind_ip, web_port)?;
76
77 let http_web_viewer_url = webviewer_server.server_url();
78
79 let viewer_url =
80 if grpc_server_addr.ip().is_unspecified() || grpc_server_addr.ip().is_loopback() {
81 format!("{http_web_viewer_url}?url=rerun%2Bhttp://localhost:{grpc_port}/proxy")
82 } else {
83 format!("{http_web_viewer_url}?url=rerun%2Bhttp://{grpc_server_addr}/proxy")
84 };
85
86 re_log::info!("Hosting a web-viewer at {viewer_url}");
87 if open_browser {
88 webbrowser::open(&viewer_url).ok();
89 }
90
91 Ok(Self {
92 open_browser,
93 sender: channel_tx,
94 _server_handle: server_handle,
95 server_shutdown_signal,
96 _webviewer_server: webviewer_server,
97 })
98 }
99}
100
101impl crate::sink::LogSink for WebViewerSink {
102 fn send(&self, msg: LogMsg) {
103 if let Err(err) = self.sender.send(msg) {
104 re_log::error_once!("Failed to send log message to web server: {err}");
105 }
106 }
107
108 #[inline]
109 fn flush_blocking(&self, timeout: std::time::Duration) -> Result<(), SinkFlushError> {
110 self.sender
111 .flush_blocking(timeout)
112 .map_err(|err| match err {
113 re_smart_channel::FlushError::Closed => {
114 SinkFlushError::failed("The viewer is no longer subscribed")
115 }
116 re_smart_channel::FlushError::Timeout => SinkFlushError::Timeout,
117 })
118 }
119
120 fn default_batcher_config(&self) -> ChunkBatcherConfig {
121 ChunkBatcherConfig::LOW_LATENCY
123 }
124
125 fn as_any(&self) -> &dyn std::any::Any {
126 self
127 }
128}
129
130impl Drop for WebViewerSink {
131 fn drop(&mut self) {
132 if self.open_browser {
133 re_log::info!("Sleeping a short while to give the browser time to connect…");
137 std::thread::sleep(std::time::Duration::from_millis(1000));
138 }
139
140 self.server_shutdown_signal.stop();
141 }
142}
143
144#[cfg(feature = "web_viewer")]
148pub struct WebViewerConfig {
149 pub bind_ip: String,
153
154 pub web_port: WebViewerServerPort,
158
159 pub connect_to: Vec<String>,
164
165 pub force_wgpu_backend: Option<String>,
169
170 pub video_decoder: Option<String>,
174
175 pub open_browser: bool,
179}
180
181#[cfg(feature = "web_viewer")]
182impl Default for WebViewerConfig {
183 fn default() -> Self {
184 Self {
185 bind_ip: "0.0.0.0".to_owned(),
186 web_port: WebViewerServerPort::AUTO,
187 connect_to: Vec::new(),
188 force_wgpu_backend: None,
189 video_decoder: None,
190 open_browser: true,
191 }
192 }
193}
194
195#[cfg(feature = "web_viewer")]
196impl WebViewerConfig {
197 pub fn host_web_viewer(self) -> Result<WebViewerServer, WebViewerServerError> {
205 let Self {
206 bind_ip,
207 connect_to,
208 web_port,
209 force_wgpu_backend,
210 video_decoder,
211 open_browser,
212 } = self;
213
214 let web_server = WebViewerServer::new(&bind_ip, web_port)?;
215 let http_web_viewer_url = web_server.server_url();
216
217 let mut viewer_url = http_web_viewer_url;
218
219 let mut first_arg = true;
220 let mut append_argument = |arg| {
221 let arg_delimiter = if first_arg {
222 first_arg = false;
223 "?"
224 } else {
225 "&"
226 };
227 viewer_url = format!("{viewer_url}{arg_delimiter}{arg}");
228 };
229
230 for source_url in connect_to {
231 let source_url = percent_encoding::utf8_percent_encode(
233 &source_url,
234 percent_encoding::NON_ALPHANUMERIC,
235 );
236 append_argument(format!("url={source_url}"));
237 }
238 if let Some(force_graphics) = force_wgpu_backend {
239 append_argument(format!("renderer={force_graphics}"));
240 }
241 if let Some(video_decoder) = video_decoder {
242 append_argument(format!("video_decoder={video_decoder}"));
243 }
244
245 re_log::info!("Hosting a web-viewer at {viewer_url}");
246 if open_browser {
247 webbrowser::open(&viewer_url).ok();
248 }
249
250 Ok(web_server)
251 }
252}
253
254#[must_use = "the sink must be kept around to keep the servers running"]
267pub fn new_sink(
268 open_browser: bool,
269 bind_ip: &str,
270 web_port: WebViewerServerPort,
271 grpc_port: u16,
272 server_options: re_grpc_server::ServerOptions,
273) -> Result<Box<dyn crate::sink::LogSink>, WebViewerSinkError> {
274 Ok(Box::new(WebViewerSink::new(
275 open_browser,
276 bind_ip,
277 web_port,
278 grpc_port,
279 server_options,
280 )?))
281}
282
283pub fn serve_web_viewer(config: WebViewerConfig) -> Result<WebViewerServer, WebViewerServerError> {
295 config.host_web_viewer()
296}