#![allow(clippy::allow_attributes, clippy::mem_forget)]
use std::rc::Rc;
use std::str::FromStr as _;
use ahash::HashMap;
use arrow::array::RecordBatch;
use serde::Deserialize;
use wasm_bindgen::prelude::*;
use re_log::ResultExt as _;
use re_log_types::{TableId, TableMsg};
use re_memory::AccountingAllocator;
use re_types::blueprint::components::PlayState;
use re_viewer_context::{
AsyncRuntimeHandle, SystemCommand, SystemCommandSender as _, TimeControlCommand, open_url,
};
use crate::history::install_popstate_listener;
use crate::web_tools::{Callback, JsResultExt as _, StringOrStringArray};
// Install `AccountingAllocator` (wrapping the system allocator) as the global allocator.
// NOTE(review): presumably this is what lets `re_memory` account for heap usage — confirm.
#[global_allocator]
static GLOBAL: AccountingAllocator<std::alloc::System> =
AccountingAllocator::new(std::alloc::System);
/// The sending halves stored for one channel created by `WebHandle::open_channel`.
struct Channel {
/// Sender for log messages; `send_rrd_to_channel` pushes decoded messages through it.
log_tx: re_smart_channel::Sender<re_log_types::DataSourceMessage>,
/// Sender for table messages; `send_table_to_channel` pushes decoded tables through it.
table_tx: crossbeam::channel::Sender<re_log_types::TableMsg>,
}
/// Handle exported to JavaScript through which the viewer is started and controlled.
#[wasm_bindgen]
pub struct WebHandle {
/// The eframe runner that hosts the `crate::App` once `start` has been called.
runner: eframe::WebRunner,
/// Currently open channels, keyed by the id passed to `open_channel`.
tx_channels: HashMap<String, Channel>,
/// Connection registry created in `new`; a clone is handed to the app in `start`.
connection_registry: re_redap_client::ConnectionRegistryHandle,
/// Options deserialized in `new`; a clone is handed to the app in `start`.
app_options: AppOptions,
}
#[wasm_bindgen]
impl WebHandle {
#[allow(
clippy::allow_attributes,
clippy::new_without_default,
clippy::use_self
)] #[wasm_bindgen(constructor)]
pub fn new(app_options: JsValue) -> Result<WebHandle, JsValue> {
re_log::setup_logging();
let app_options: Option<AppOptions> = serde_wasm_bindgen::from_value(app_options)?;
let connection_registry =
re_redap_client::ConnectionRegistry::new_with_stored_credentials();
Ok(Self {
runner: eframe::WebRunner::new(),
tx_channels: Default::default(),
connection_registry,
app_options: app_options.unwrap_or_default(),
})
}
/// Starts the viewer on the given canvas.
///
/// `canvas` is either the id of a canvas element (a JS string) or the canvas element itself.
///
/// # Errors
/// Fails if the window or document is unavailable, if no element with the given id exists,
/// if the resolved value is not an `HtmlCanvasElement`, or if the runner fails to start.
#[wasm_bindgen]
pub async fn start(&self, canvas: JsValue) -> Result<(), wasm_bindgen::JsValue> {
    // Shared error text for both failed-downcast paths.
    fn not_a_canvas(value: &dyn std::fmt::Debug) -> String {
        format!("Expected a canvas element or canvas id, got {value:?}")
    }

    let main_thread_token = crate::MainThreadToken::i_promise_i_am_on_the_main_thread();

    let canvas = match canvas.as_string() {
        // A string is interpreted as an element id to look up in the document.
        Some(canvas_id) => {
            let window = web_sys::window()
                .ok_or_else(|| "Failed to get window. Are we not in a browser?".to_owned())?;
            let document = window.document().ok_or_else(|| {
                "Failed to get window.document. Are we not in a browser?".to_owned()
            })?;
            document
                .get_element_by_id(&canvas_id)
                .ok_or_else(|| format!("Canvas element '{canvas_id}' not found."))?
                .dyn_into::<web_sys::HtmlCanvasElement>()
                .map_err(|element| not_a_canvas(&element))?
        }
        // Anything else must already be the canvas element.
        None => canvas
            .dyn_into::<web_sys::HtmlCanvasElement>()
            .map_err(|element| not_a_canvas(&element))?,
    };

    let app_options = self.app_options.clone();
    let web_options = eframe::WebOptions {
        wgpu_options: crate::wgpu_options(app_options.render_backend.as_deref()),
        depth_buffer: 0,
        dithering: true,
        ..Default::default()
    };
    let connection_registry = self.connection_registry.clone();

    self.runner
        .start(
            canvas,
            web_options,
            Box::new(move |cc| {
                let app = create_app(main_thread_token, cc, connection_registry, app_options)?;
                Ok(Box::new(app))
            }),
        )
        .await?;

    re_log::debug!("Web app started.");
    Ok(())
}
/// Enables or disables the panel state overrides.
///
/// `Some(value)` sets the flag explicitly; `None` flips its current value.
/// Does nothing if the app is not available.
#[wasm_bindgen]
pub fn toggle_panel_overrides(&self, value: Option<bool>) {
    let Some(mut app) = self.runner.app_mut::<crate::App>() else {
        return;
    };

    let active = match value {
        Some(explicit) => explicit,
        None => !app.panel_state_overrides_active,
    };
    app.panel_state_overrides_active = active;

    app.egui_ctx.request_repaint();
}
#[wasm_bindgen]
pub fn override_panel_state(&self, panel: &str, state: Option<String>) -> Result<(), JsValue> {
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return Ok(());
};
let panel = Panel::from_str(panel)
.map_err(|err| js_sys::TypeError::new(&format!("invalid panel: {err}")))?;
let state = match state {
Some(state) => Some(
PanelState::from_str(&state)
.map_err(|err| js_sys::TypeError::new(&format!("invalid state: {err}")))?
.into(),
),
None => None,
};
let overrides = &mut app.panel_state_overrides;
match panel {
Panel::Top => overrides.top = state,
Panel::Blueprint => overrides.blueprint = state,
Panel::Selection => overrides.selection = state,
Panel::Time => overrides.time = state,
}
app.egui_ctx.request_repaint();
Ok(())
}
/// Destroys the underlying web runner.
#[wasm_bindgen]
pub fn destroy(&self) {
self.runner.destroy();
}
/// Returns `true` if the runner has a panic summary to report.
#[wasm_bindgen]
pub fn has_panicked(&self) -> bool {
self.runner.panic_summary().is_some()
}
/// Returns the message of the runner's panic summary, or `None` if there is no summary.
#[wasm_bindgen]
pub fn panic_message(&self) -> Option<String> {
    let summary = self.runner.panic_summary()?;
    Some(summary.message())
}
/// Returns the callstack of the runner's panic summary, or `None` if there is no summary.
#[wasm_bindgen]
pub fn panic_callstack(&self) -> Option<String> {
    let summary = self.runner.panic_summary()?;
    Some(summary.callstack())
}
/// Parses `url` as a viewer URL and opens it in the running app.
///
/// `follow_if_http` defaults to `false` when not given. A URL that fails to parse is
/// logged as a warning and otherwise ignored. Does nothing if the app is not available.
#[wasm_bindgen]
pub fn add_receiver(&self, url: &str, follow_if_http: Option<bool>) {
    let Some(app) = self.runner.app_mut::<crate::App>() else {
        return;
    };

    let parsed_url = match url.parse::<open_url::ViewerOpenUrl>() {
        Ok(parsed_url) => parsed_url,
        Err(err) => {
            re_log::warn!("Failed to open URL {url:?}: {err}");
            return;
        }
    };

    let options = open_url::OpenUrlOptions {
        follow_if_http: follow_if_http.unwrap_or(false),
        select_redap_source_when_loaded: true,
        show_loader: false,
    };
    parsed_url.open(&app.egui_ctx, &options, &app.command_sender);
}
/// Removes the receiver for `url`, and the recording with that URI from the store hub if
/// there is a store hub.
///
/// Does nothing if the app is not available.
#[wasm_bindgen]
pub fn remove_receiver(&self, url: &str) {
    let Some(mut app) = self.runner.app_mut::<crate::App>() else {
        return;
    };

    app.msg_receive_set().remove_by_uri(url);

    if let Some(hub) = &mut app.store_hub {
        hub.remove_recording_by_uri(url);
    }

    let repaint_delay = std::time::Duration::from_millis(10);
    app.egui_ctx.request_repaint_after(repaint_delay);
}
/// Opens a new channel under `id` and registers its receiving ends with the app.
///
/// Two channels are created: a smart channel for log messages (labelled with
/// `channel_name`) and an unbounded channel for table messages. If `id` is already in use,
/// a warning is logged and nothing changes. Does nothing if the app is not available.
#[wasm_bindgen]
pub fn open_channel(&mut self, id: &str, channel_name: &str) {
    let Some(mut app) = self.runner.app_mut::<crate::App>() else {
        return;
    };

    if self.tx_channels.contains_key(id) {
        re_log::warn!("Channel with id '{}' already exists.", id);
        return;
    }

    let source = re_smart_channel::SmartChannelSource::JsChannel {
        channel_name: channel_name.to_owned(),
    };
    let (log_tx, log_rx) = re_smart_channel::smart_channel(
        re_smart_channel::SmartMessageSource::JsChannelPush,
        source,
    );
    let (table_tx, table_rx) = crossbeam::channel::unbounded();

    app.add_log_receiver(log_rx);
    app.add_table_receiver(table_rx);

    let channel = Channel { log_tx, table_tx };
    self.tx_channels.insert(id.to_owned(), channel);
}
/// Closes the channel registered under `id`, if any.
///
/// A quit marker is sent on the log channel and both senders are dropped. The channel is
/// removed even when the app is no longer available, so a stale entry (and its senders)
/// cannot linger in `tx_channels`; the repaint is requested only if the app is available.
#[wasm_bindgen]
pub fn close_channel(&mut self, id: &str) {
    if let Some(Channel { log_tx, table_tx }) = self.tx_channels.remove(id) {
        log_tx
            .quit(None)
            .warn_on_err_once("Failed to send quit marker");
        drop(table_tx);
    }

    // The app is only needed to schedule a repaint; its absence must not block cleanup.
    if let Some(app) = self.runner.app_mut::<crate::App>() {
        app.egui_ctx
            .request_repaint_after(std::time::Duration::from_millis(10));
    }
}
/// Decodes `data` as RRD and forwards the resulting log messages to the channel `id`.
///
/// The bytes are copied and handed to the web RRD decoder together with a callback. The
/// callback requests a repaint for every message it receives, forwards log messages to
/// the channel, and stops decoding when sending fails or the decoder reports a failure
/// (in which case a quit marker carrying the error is sent).
/// Does nothing if the app is not available or no channel is registered under `id`.
#[wasm_bindgen]
pub fn send_rrd_to_channel(&self, id: &str, data: &[u8]) {
    use std::{ops::ControlFlow, sync::Arc};

    use re_log_encoding::rrd::stream_from_http::HttpMessage;

    let Some(app) = self.runner.app_mut::<crate::App>() else {
        return;
    };
    let Some(channel) = self.tx_channels.get(id) else {
        return;
    };

    let log_tx = channel.log_tx.clone();
    let egui_ctx = app.egui_ctx.clone();

    re_log_encoding::rrd::stream_from_http::web_decode::decode_rrd(
        data.to_vec(),
        Arc::new(move |msg| {
            // Wake the UI for every decoder message, whatever its kind.
            egui_ctx.request_repaint_after(std::time::Duration::from_millis(10));

            match msg {
                HttpMessage::LogMsg(log_msg) => match log_tx.send(log_msg.into()) {
                    Ok(_) => ControlFlow::Continue(()),
                    Err(_) => {
                        re_log::info_once!("Failed to dispatch log message to viewer.");
                        ControlFlow::Break(())
                    }
                },
                HttpMessage::Success => ControlFlow::Continue(()),
                HttpMessage::Failure(err) => {
                    log_tx
                        .quit(Some(err))
                        .warn_on_err_once("Failed to send quit marker");
                    ControlFlow::Break(())
                }
            }
        }),
    );
}
/// Decodes `data` as an Arrow IPC stream and sends the resulting table to the channel `id`.
///
/// The stream must contain exactly one record batch, and that batch must decode via
/// [`from_arrow_encoded`]. Every failure is logged and the data is dropped.
/// Does nothing if the app is not available or no channel is registered under `id`.
#[wasm_bindgen]
pub fn send_table_to_channel(&self, id: &str, data: &[u8]) {
    let Some(app) = self.runner.app_mut::<crate::App>() else {
        return;
    };
    let Some(channel) = self.tx_channels.get(id) else {
        return;
    };

    let reader = match arrow::ipc::reader::StreamReader::try_new(std::io::Cursor::new(data), None)
    {
        Ok(reader) => reader,
        Err(err) => {
            re_log::error_once!("Failed to interpret data as IPC-encoded arrow: {err}");
            return;
        }
    };

    let batches = match reader.collect::<Result<Vec<_>, _>>() {
        Ok(batches) => batches,
        Err(err) => {
            re_log::error_once!("Could not read from IPC stream: {err}");
            return;
        }
    };

    // Converting into a one-element array succeeds only for exactly one batch; on failure
    // the vector is handed back so its length can be reported.
    let record_batch = match <[_; 1]>::try_from(batches) {
        Ok([single]) => single,
        Err(batches) => {
            re_log::warn_once!("Expected exactly one record batch, got {}", batches.len());
            return;
        }
    };

    let table_msg = match from_arrow_encoded(record_batch) {
        Ok(table_msg) => table_msg,
        Err(err) => {
            re_log::error_once!("Failed to decode Arrow message: {err}");
            return;
        }
    };

    if let Err(err) = channel.table_tx.send(table_msg) {
        re_log::info_once!("Failed to dispatch log message to viewer: {err}");
    } else {
        app.egui_ctx
            .request_repaint_after(std::time::Duration::from_millis(10));
    }
}
/// Returns the recording id of the active recording.
///
/// Returns `None` if the app or its store hub is unavailable, or if no recording is active.
#[wasm_bindgen]
pub fn get_active_recording_id(&self) -> Option<String> {
    let app = self.runner.app_mut::<crate::App>()?;
    app.store_hub
        .as_ref()?
        .active_recording()
        .map(|recording| recording.store_id().recording_id().to_string())
}
#[wasm_bindgen]
pub fn set_active_recording_id(&self, recording_id: &str) {
let Some(mut app) = self.runner.app_mut::<crate::App>() else {
return;
};
let Some(hub) = app.store_hub.as_mut() else {
return;
};
let Some(store_id) = store_id_from_recording_id(hub, recording_id) else {
return;
};
hub.set_active_recording(store_id);
app.egui_ctx.request_repaint();
}
/// Returns the name of the timeline currently selected for the given recording.
///
/// Returns `None` if the app or its store hub is unavailable, if no recording has that
/// id, or if there is no time control for it.
#[wasm_bindgen]
pub fn get_active_timeline(&self, recording_id: &str) -> Option<String> {
    let app = self.runner.app_mut::<crate::App>()?;
    let hub = app.store_hub.as_ref()?;

    let store_id = store_id_from_recording_id(hub, recording_id)?;
    let time_ctrl = app.state.time_control(&store_id)?;

    Some(time_ctrl.timeline().name().as_str().to_owned())
}
#[wasm_bindgen]
pub fn set_active_timeline(&self, recording_id: &str, timeline_name: &str) {
let Some(app) = self.runner.app_mut::<crate::App>() else {
return;
};
let Some(hub) = &app.store_hub else {
return;
};
let Some(recording_id) = store_id_from_recording_id(hub, recording_id) else {
return;
};
app.command_sender
.send_system(SystemCommand::TimeControlCommands {
store_id: recording_id,
time_commands: vec![TimeControlCommand::SetActiveTimeline(timeline_name.into())],
});
app.egui_ctx.request_repaint();
}
/// Returns the current time on `timeline_name` for the given recording, as an `f64`.
///
/// Returns `None` if the app or its store hub is unavailable, if no recording has that
/// id, if there is no time control for it, or if the time control has no time for that
/// timeline.
#[wasm_bindgen]
pub fn get_time_for_timeline(&self, recording_id: &str, timeline_name: &str) -> Option<f64> {
    let app = self.runner.app_mut::<crate::App>()?;
    let hub = app.store_hub.as_ref()?;
    let store_id = store_id_from_recording_id(hub, recording_id)?;

    let time = app
        .state
        .time_control(&store_id)?
        .time_for_timeline(timeline_name.into())?;
    Some(time.as_f64())
}
#[wasm_bindgen]
pub fn set_time_for_timeline(&self, recording_id: &str, timeline_name: &str, time: f64) {
let Some(app) = self.runner.app_mut::<crate::App>() else {
return;
};
let Some(hub) = &app.store_hub else {
return;
};
let Some(recording_id) = store_id_from_recording_id(hub, recording_id) else {
return;
};
app.command_sender
.send_system(SystemCommand::TimeControlCommands {
store_id: recording_id,
time_commands: vec![
TimeControlCommand::SetActiveTimeline(timeline_name.into()),
TimeControlCommand::SetTime(time.into()),
],
});
app.egui_ctx.request_repaint();
}
/// Returns the time range of `timeline_name` in the given recording.
///
/// The result is a JS object with `min` and `max` number properties, or `null` if the
/// app or its store hub is unavailable, if no recording has that id, or if the recording
/// has no time range for that timeline.
#[wasm_bindgen]
pub fn get_timeline_time_range(&self, recording_id: &str, timeline_name: &str) -> JsValue {
    // Collect the bounds first; every missing piece short-circuits to `None`.
    let bounds = (|| -> Option<(f64, f64)> {
        let app = self.runner.app_mut::<crate::App>()?;
        let hub = app.store_hub.as_ref()?;
        let store_id = store_id_from_recording_id(hub, recording_id)?;
        let recording = hub.store_bundle().get(&store_id)?;
        let time_range = recording.time_range_for(&timeline_name.into())?;
        Some((time_range.min().as_f64(), time_range.max().as_f64()))
    })();

    let Some((min, max)) = bounds else {
        return JsValue::null();
    };

    let result = js_sys::Object::new();
    js_sys::Reflect::set(&result, &"min".into(), &min.into()).ok_or_log_js_error();
    js_sys::Reflect::set(&result, &"max".into(), &max.into()).ok_or_log_js_error();
    JsValue::from(result)
}
/// Returns whether the given recording is currently playing.
///
/// Returns `None` if the app or its store hub is unavailable, if no recording has that
/// id, if the store bundle does not contain it, or if there is no time control for it.
#[wasm_bindgen]
pub fn get_playing(&self, recording_id: &str) -> Option<bool> {
    let app = self.runner.app_mut::<crate::App>()?;
    let hub = app.store_hub.as_ref()?;

    let store_id = store_id_from_recording_id(hub, recording_id)?;
    if !hub.store_bundle().contains(&store_id) {
        return None;
    }

    app.state
        .time_control(&store_id)
        .map(|time_ctrl| time_ctrl.play_state() == PlayState::Playing)
}
/// Asks the viewer to play (`value == true`) or pause (`value == false`) the recording.
///
/// The change is sent as a system command, followed by a repaint request. Does nothing
/// if the app or its store hub is unavailable, or if no recording has that id.
#[wasm_bindgen]
pub fn set_playing(&self, recording_id: &str, value: bool) {
    // Only shared access to the app is needed here.
    let Some(app) = self.runner.app_mut::<crate::App>() else {
        return;
    };
    let Some(hub) = app.store_hub.as_ref() else {
        return;
    };
    let Some(store_id) = store_id_from_recording_id(hub, recording_id) else {
        return;
    };

    let play_state = if value {
        PlayState::Playing
    } else {
        PlayState::Paused
    };

    // `store_id` is already owned and not needed afterwards, so it is moved, not cloned.
    app.command_sender
        .send_system(SystemCommand::TimeControlCommands {
            store_id,
            time_commands: vec![TimeControlCommand::SetPlayState(play_state)],
        });
    app.egui_ctx.request_repaint();
}
/// Sends the given access token and email to the viewer as authentication credentials.
///
/// The credentials are sent as a system command, followed by a repaint request.
/// Does nothing if the app is not available.
#[wasm_bindgen]
pub fn set_credentials(&self, access_token: &str, email: &str) {
    let Some(app) = self.runner.app_mut::<crate::App>() else {
        return;
    };

    let credentials = SystemCommand::SetAuthCredentials {
        access_token: access_token.to_owned(),
        email: email.to_owned(),
    };
    app.command_sender.send_system(credentials);
    app.egui_ctx.request_repaint();
}
}
/// Finds the store id of the first recording in the hub's store bundle whose recording id
/// equals `recording_id`.
///
/// Returns `None` if no recording matches.
fn store_id_from_recording_id(
    store_hub: &re_viewer_context::StoreHub,
    recording_id: &str,
) -> Option<re_log_types::StoreId> {
    let matching_db = store_hub
        .store_bundle()
        .recordings()
        .find(|entity_db| entity_db.store_id().recording_id().as_str() == recording_id)?;
    Some(matching_db.store_id().clone())
}
/// The panels whose state can be overridden through `WebHandle::override_panel_state`.
///
/// Parsed from its snake_case name via the strum `EnumString` derive.
#[derive(Clone, Deserialize, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
enum Panel {
Top,
Blueprint,
Selection,
Time,
}
#[derive(Clone, Deserialize, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
enum PanelState {
Hidden,
Collapsed,
Expanded,
}
/// Maps each JS-facing panel state onto the blueprint component variant of the same name.
impl From<PanelState> for re_types::blueprint::components::PanelState {
    fn from(js_state: PanelState) -> Self {
        match js_state {
            PanelState::Hidden => Self::Hidden,
            PanelState::Collapsed => Self::Collapsed,
            PanelState::Expanded => Self::Expanded,
        }
    }
}
/// Options deserialized from the JS value passed to the `WebHandle` constructor.
///
/// Every field is optional; `create_app` supplies the defaults noted below.
#[derive(Clone, Default, Deserialize)]
pub struct AppOptions {
/// If set, passed to `App::set_examples_manifest_url`.
manifest_url: Option<String>,
/// Passed to `crate::wgpu_options` and used as `force_wgpu_backend` in the startup options.
render_backend: Option<String>,
/// Parsed into the video decoder hardware acceleration setting; an unparseable value is
/// logged and ignored.
video_decoder: Option<String>,
/// Defaults to `false`.
hide_welcome_screen: Option<bool>,
/// Defaults to `false`; when `true`, a popstate listener is installed on the app.
enable_history: Option<bool>,
/// If set, parsed as a JWT and installed as the connection registry's fallback token;
/// a token that fails to parse is logged and ignored.
fallback_token: Option<String>,
/// Forwarded unchanged to the startup options.
viewer_base_url: Option<String>,
/// Used as `is_in_notebook` in the startup options; defaults to `false`.
notebook: Option<bool>,
/// One or more URLs to open when the app is created.
url: Option<StringOrStringArray>,
/// Initial panel state overrides; defaults to no overrides.
panel_state_overrides: Option<PanelStateOverrides>,
/// If set, called with each viewer event serialized as a JSON string.
on_viewer_event: Option<Callback>,
/// Forwarded to the startup options as `fullscreen_options`.
fullscreen: Option<FullscreenOptions>,
}
/// JS callbacks for fullscreen handling, forwarded to the app's startup options.
#[derive(Clone, Deserialize)]
pub struct FullscreenOptions {
// NOTE(review): presumably queries the current fullscreen state — confirm against the caller.
pub get_state: Callback,
// NOTE(review): presumably invoked when fullscreen should be toggled — confirm against the caller.
pub on_toggle: Callback,
}
/// Optional per-panel state overrides as deserialized from the JS options.
///
/// Converted into `crate::app_blueprint::PanelStateOverrides`; `None` means no override
/// for that panel.
#[derive(Clone, Default, Deserialize)]
pub struct PanelStateOverrides {
top: Option<PanelState>,
blueprint: Option<PanelState>,
selection: Option<PanelState>,
time: Option<PanelState>,
}
impl From<PanelStateOverrides> for crate::app_blueprint::PanelStateOverrides {
fn from(value: PanelStateOverrides) -> Self {
Self {
top: value.top.map(|v| v.into()),
blueprint: value.blueprint.map(|v| v.into()),
selection: value.selection.map(|v| v.into()),
time: value.time.map(|v| v.into()),
}
}
}
/// Builds the viewer [`crate::App`] for the web from the given options.
///
/// Steps, in order:
/// 1. Install the fallback token on the connection registry, if one was given and parses.
/// 2. Translate `app_options` into `crate::StartupOptions`.
/// 3. Set up eframe and the renderer, then construct the app.
/// 4. Optionally install the history (popstate) listener and the examples manifest URL.
/// 5. Open every URL given in the options.
///
/// # Errors
/// Returns the error from `crate::customize_eframe_and_setup_renderer` if renderer setup fails.
fn create_app(
    main_thread_token: crate::MainThreadToken,
    cc: &eframe::CreationContext<'_>,
    connection_registry: re_redap_client::ConnectionRegistryHandle,
    app_options: AppOptions,
) -> Result<crate::App, re_renderer::RenderContextError> {
    let build_info = re_build_info::build_info!();
    let app_env = crate::AppEnvironment::Web {
        url: cc.integration_info.web_info.location.url.clone(),
    };

    let AppOptions {
        viewer_base_url,
        url,
        manifest_url,
        render_backend,
        video_decoder,
        hide_welcome_screen,
        panel_state_overrides,
        on_viewer_event,
        fullscreen,
        enable_history,
        notebook,
        fallback_token,
    } = app_options;

    // A token that fails to parse is reported but does not prevent startup.
    if let Some(fallback_token) = fallback_token {
        match re_auth::Jwt::try_from(fallback_token) {
            Ok(token) => connection_registry.set_fallback_token(token),
            Err(err) => {
                re_log::warn!("Failed to parse JWT token: {err}");
            }
        }
    }

    let enable_history = enable_history.unwrap_or(false);

    // An unparseable decoder setting is reported and then treated as "not set".
    let video_decoder_hw_acceleration = video_decoder.and_then(|s| match s.parse() {
        Err(()) => {
            re_log::warn_once!("Failed to parse --video-decoder value: {s}. Ignoring.");
            None
        }
        Ok(hw_accell) => Some(hw_accell),
    });

    // All option values were destructured into owned locals above and are not used again
    // after this point, so they are moved into the startup options rather than cloned.
    let startup_options = crate::StartupOptions {
        memory_limit: re_memory::MemoryLimit {
            max_bytes: Some(2_500_000_000),
        },
        location: Some(cc.integration_info.web_info.location.clone()),
        persist_state: true,
        is_in_notebook: notebook.unwrap_or(false),
        expect_data_soon: None,
        force_wgpu_backend: render_backend,
        video_decoder_hw_acceleration,
        hide_welcome_screen: hide_welcome_screen.unwrap_or(false),
        on_event: on_viewer_event.map(|on_event| {
            // Events are handed to the JS callback as a JSON string; serialization and
            // callback errors are logged and otherwise ignored.
            Rc::new(move |event: crate::ViewerEvent| {
                let Some(event) = serde_json::to_string(&event).ok_or_log_error() else {
                    return;
                };
                on_event
                    .call1(&JsValue::from_str(&event))
                    .ok_or_log_js_error();
            }) as crate::event::ViewerEventCallback
        }),
        fullscreen_options: fullscreen,
        panel_state_overrides: panel_state_overrides.unwrap_or_default().into(),
        enable_history,
        viewer_base_url,
    };

    crate::customize_eframe_and_setup_renderer(cc)?;

    let mut app = crate::App::new(
        main_thread_token,
        build_info,
        app_env,
        startup_options,
        cc,
        Some(connection_registry),
        AsyncRuntimeHandle::from_current_tokio_runtime_or_wasmbindgen().expect("Infallible on web"),
    );

    if enable_history {
        install_popstate_listener(&mut app).ok_or_log_js_error();
    }

    if let Some(manifest_url) = manifest_url {
        app.set_examples_manifest_url(manifest_url);
    }

    // Open every requested URL; a URL that fails to parse is reported and skipped.
    if let Some(urls) = url {
        for url in urls.into_inner() {
            match url.parse::<open_url::ViewerOpenUrl>() {
                Ok(url) => {
                    url.open(
                        &app.egui_ctx,
                        &open_url::OpenUrlOptions {
                            follow_if_http: false,
                            select_redap_source_when_loaded: true,
                            show_loader: true,
                        },
                        &app.command_sender,
                    );
                }
                Err(err) => {
                    re_log::warn!("Failed to open URL {url:?}: {err}");
                }
            }
        }
    }

    Ok(app)
}
/// Stores `email` under the `"email"` key of the analytics opt-in metadata and saves the
/// analytics config.
///
/// # Panics
/// Panics if the analytics config cannot be loaded or saved.
#[cfg(feature = "analytics")]
#[allow(clippy::allow_attributes, clippy::unwrap_used)]
#[wasm_bindgen]
pub fn set_email(email: String) {
    // Fall back to a default config when none has been stored yet.
    let stored_config = re_analytics::Config::load().unwrap();
    let mut config = stored_config.unwrap_or_default();

    config.opt_in_metadata.insert("email".into(), email.into());

    config.save().unwrap();
}
/// Turns an encoded record batch into a [`TableMsg`].
///
/// The table id is taken from the `__table_id` entry of the batch's schema metadata; that
/// entry is removed from the metadata before the batch is stored in the message.
///
/// # Errors
/// Returns an error if the schema metadata has no `__table_id` entry.
pub fn from_arrow_encoded(mut data: RecordBatch) -> Result<TableMsg, Box<dyn std::error::Error>> {
    let Some(table_id) = data.schema_metadata_mut().remove("__table_id") else {
        return Err("encoded record batch is missing `__table_id` metadata.".into());
    };

    Ok(TableMsg {
        id: TableId::new(table_id),
        data,
    })
}
#[cfg(test)]
mod tests {
    // `Arc` is not imported at module level in the parent, so `use super::*` does not
    // provide it.
    use std::sync::Arc;

    use arrow::ArrowError;
    use arrow::array::{RecordBatch, RecordBatchOptions};

    use super::*;

    /// Test-only inverse of [`from_arrow_encoded`]: copies the table's record batch and
    /// stores the table id under the `__table_id` schema metadata key.
    pub fn to_arrow_encoded(table: &TableMsg) -> Result<RecordBatch, ArrowError> {
        let current_schema = table.data.schema();
        let mut metadata = current_schema.metadata().clone();
        metadata.insert("__table_id".to_owned(), table.id.as_str().to_owned());

        let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
            current_schema.fields().clone(),
            metadata,
        ));

        RecordBatch::try_new_with_options(
            new_schema,
            table.data.columns().to_vec(),
            &RecordBatchOptions::default(),
        )
    }

    /// Encoding a table message and decoding it again must yield the original message.
    #[test]
    fn table_msg_encoded_roundtrip() {
        use arrow::{
            array::{ArrayRef, StringArray, UInt64Array},
            datatypes::{DataType, Field, Schema},
        };

        let data = {
            let schema = Arc::new(Schema::new_with_metadata(
                vec![
                    Field::new("id", DataType::UInt64, false),
                    Field::new("name", DataType::Utf8, false),
                ],
                Default::default(),
            ));

            let id_array = UInt64Array::from(vec![1, 2, 3, 4, 5]);
            let name_array = StringArray::from(vec![
                "Alice",
                "Bob",
                "Charlie",
                "Dave",
                "http://www.rerun.io",
            ]);

            let arrays: Vec<ArrayRef> = vec![
                Arc::new(id_array) as ArrayRef,
                Arc::new(name_array) as ArrayRef,
            ];

            RecordBatch::try_new_with_options(schema, arrays, &RecordBatchOptions::default())
                .unwrap()
        };

        let msg = TableMsg {
            id: TableId::new("test123".to_owned()),
            data,
        };

        let encoded = to_arrow_encoded(&msg).expect("to encoded failed");
        let decoded = from_arrow_encoded(encoded).expect("from encoded failed");

        assert_eq!(msg, decoded);
    }
}