use re_chunk::ChunkBatcherConfig;
use re_log_types::LogMsg;
use re_web_viewer_server::{WebViewerServer, WebViewerServerError, WebViewerServerPort};
use crate::log_sink::SinkFlushError;
#[derive(thiserror::Error, Debug)]
pub enum WebViewerSinkError {
#[error(transparent)]
WebViewerServer(#[from] WebViewerServerError),
#[error(transparent)]
InvalidAddress(#[from] std::net::AddrParseError),
}
struct WebViewerSink {
open_browser: bool,
sender: re_smart_channel::Sender<LogMsg>,
_server_handle: std::thread::JoinHandle<()>,
server_shutdown_signal: re_grpc_server::shutdown::Signal,
_webviewer_server: WebViewerServer,
}
impl WebViewerSink {
pub fn new(
open_browser: bool,
bind_ip: &str,
web_port: WebViewerServerPort,
grpc_port: u16,
server_options: re_grpc_server::ServerOptions,
) -> Result<Self, WebViewerSinkError> {
let (server_shutdown_signal, shutdown) = re_grpc_server::shutdown::shutdown();
let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?;
let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
re_uri::Scheme::RerunHttp,
grpc_server_addr,
));
let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
re_smart_channel::SmartMessageSource::MessageProxy(uri),
re_smart_channel::SmartChannelSource::Sdk,
);
let server_handle = std::thread::Builder::new()
.name("message_proxy_server".to_owned())
.spawn(move || {
let mut builder = tokio::runtime::Builder::new_current_thread();
builder.enable_all();
let rt = builder.build().expect("failed to build tokio runtime");
rt.block_on(re_grpc_server::serve_from_channel(
grpc_server_addr,
server_options,
shutdown,
channel_rx,
));
})
.expect("failed to spawn thread for message proxy server");
let webviewer_server = WebViewerServer::new(bind_ip, web_port)?;
let http_web_viewer_url = webviewer_server.server_url();
let viewer_url =
if grpc_server_addr.ip().is_unspecified() || grpc_server_addr.ip().is_loopback() {
format!("{http_web_viewer_url}?url=rerun%2Bhttp://localhost:{grpc_port}/proxy")
} else {
format!("{http_web_viewer_url}?url=rerun%2Bhttp://{grpc_server_addr}/proxy")
};
re_log::info!("Hosting a web-viewer at {viewer_url}");
if open_browser {
webbrowser::open(&viewer_url).ok();
}
Ok(Self {
open_browser,
sender: channel_tx,
_server_handle: server_handle,
server_shutdown_signal,
_webviewer_server: webviewer_server,
})
}
}
impl crate::sink::LogSink for WebViewerSink {
fn send(&self, msg: LogMsg) {
if let Err(err) = self.sender.send(msg) {
re_log::error_once!("Failed to send log message to web server: {err}");
}
}
#[inline]
fn flush_blocking(&self, timeout: std::time::Duration) -> Result<(), SinkFlushError> {
self.sender
.flush_blocking(timeout)
.map_err(|err| match err {
re_smart_channel::FlushError::Closed => {
SinkFlushError::failed("The viewer is no longer subscribed")
}
re_smart_channel::FlushError::Timeout => SinkFlushError::Timeout,
})
}
fn default_batcher_config(&self) -> ChunkBatcherConfig {
ChunkBatcherConfig::LOW_LATENCY
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
impl Drop for WebViewerSink {
fn drop(&mut self) {
if self.open_browser {
re_log::info!("Sleeping a short while to give the browser time to connect…");
std::thread::sleep(std::time::Duration::from_millis(1000));
}
self.server_shutdown_signal.stop();
}
}
#[cfg(feature = "web_viewer")]
pub struct WebViewerConfig {
pub bind_ip: String,
pub web_port: WebViewerServerPort,
pub connect_to: Vec<String>,
pub force_wgpu_backend: Option<String>,
pub video_decoder: Option<String>,
pub open_browser: bool,
}
#[cfg(feature = "web_viewer")]
impl Default for WebViewerConfig {
fn default() -> Self {
Self {
bind_ip: "0.0.0.0".to_owned(),
web_port: WebViewerServerPort::AUTO,
connect_to: Vec::new(),
force_wgpu_backend: None,
video_decoder: None,
open_browser: true,
}
}
}
#[cfg(feature = "web_viewer")]
impl WebViewerConfig {
pub fn host_web_viewer(self) -> Result<WebViewerServer, WebViewerServerError> {
let Self {
bind_ip,
connect_to,
web_port,
force_wgpu_backend,
video_decoder,
open_browser,
} = self;
let web_server = WebViewerServer::new(&bind_ip, web_port)?;
let http_web_viewer_url = web_server.server_url();
let mut viewer_url = http_web_viewer_url;
let mut first_arg = true;
let mut append_argument = |arg| {
let arg_delimiter = if first_arg {
first_arg = false;
"?"
} else {
"&"
};
viewer_url = format!("{viewer_url}{arg_delimiter}{arg}");
};
for source_url in connect_to {
let source_url = percent_encoding::utf8_percent_encode(
&source_url,
percent_encoding::NON_ALPHANUMERIC,
);
append_argument(format!("url={source_url}"));
}
if let Some(force_graphics) = force_wgpu_backend {
append_argument(format!("renderer={force_graphics}"));
}
if let Some(video_decoder) = video_decoder {
append_argument(format!("video_decoder={video_decoder}"));
}
re_log::info!("Hosting a web-viewer at {viewer_url}");
if open_browser {
webbrowser::open(&viewer_url).ok();
}
Ok(web_server)
}
}
#[must_use = "the sink must be kept around to keep the servers running"]
pub fn new_sink(
open_browser: bool,
bind_ip: &str,
web_port: WebViewerServerPort,
grpc_port: u16,
server_options: re_grpc_server::ServerOptions,
) -> Result<Box<dyn crate::sink::LogSink>, WebViewerSinkError> {
Ok(Box::new(WebViewerSink::new(
open_browser,
bind_ip,
web_port,
grpc_port,
server_options,
)?))
}
pub fn serve_web_viewer(config: WebViewerConfig) -> Result<WebViewerServer, WebViewerServerError> {
config.host_web_viewer()
}