use ahash::{HashMap, HashMapExt};
use itertools::Itertools;
use re_data_store::StoreGeneration;
use re_data_store::{DataStoreConfig, DataStoreStats};
use re_entity_db::EntityDb;
use re_log_encoding::decoder::VersionPolicy;
use re_log_types::{ApplicationId, StoreId, StoreKind};
use re_query_cache::CachesStats;
use re_viewer_context::{AppOptions, StoreContext};
#[cfg(not(target_arch = "wasm32"))]
use crate::{
loading::load_blueprint_file,
saving::{default_blueprint_path, save_database_to_file},
};
/// Keeps track of the selected application, recording and per-application blueprint,
/// and owns the [`StoreBundle`] that holds every loaded store.
pub struct StoreHub {
/// Id of the currently selected recording, if any.
selected_rec_id: Option<StoreId>,
/// Id of the currently selected application, if any.
selected_application_id: Option<ApplicationId>,
/// The blueprint store registered for each known application.
blueprint_by_app_id: HashMap<ApplicationId, StoreId>,
/// All loaded stores (recordings and blueprints share this bundle).
store_bundle: StoreBundle,
/// Set to `true` by `set_recording_id`; nothing in this file resets it.
was_recording_active: bool,
/// Generation each blueprint had when it was last saved to, or loaded from, disk.
blueprint_last_save: HashMap<StoreId, StoreGeneration>,
}
/// Statistics about the selected blueprint and recording, as produced by `StoreHub::stats`.
///
/// Every field falls back to its `Default` value when the corresponding store is missing.
#[derive(Default)]
pub struct StoreHubStats {
/// Store statistics of the selected application's blueprint.
pub blueprint_stats: DataStoreStats,
/// Store configuration of the selected application's blueprint.
pub blueprint_config: DataStoreConfig,
/// Store statistics of the selected recording.
pub recording_stats: DataStoreStats,
/// Query-cache statistics of the selected recording.
pub recording_cached_stats: CachesStats,
/// Store configuration of the selected recording.
pub recording_config: DataStoreConfig,
}
impl StoreHub {
pub fn welcome_screen_app_id() -> ApplicationId {
"<welcome screen>".into()
}
/// Creates a hub with nothing selected.
///
/// The only store it contains is the welcome screen blueprint, which is created,
/// initialized through `setup_welcome_screen_blueprint`, and registered under
/// [`Self::welcome_screen_app_id`].
pub fn new() -> Self {
    re_tracing::profile_function!();

    let welcome_app_id = Self::welcome_screen_app_id();
    let welcome_blueprint_id =
        StoreId::from_string(StoreKind::Blueprint, welcome_app_id.to_string());

    // Create the welcome screen blueprint store and fill it in.
    let mut store_bundle = StoreBundle::default();
    crate::app_blueprint::setup_welcome_screen_blueprint(
        store_bundle.blueprint_entry(&welcome_blueprint_id),
    );

    // Register it as the blueprint of the welcome screen application.
    let mut blueprint_by_app_id = HashMap::new();
    blueprint_by_app_id.insert(welcome_app_id, welcome_blueprint_id);

    Self {
        selected_rec_id: None,
        selected_application_id: None,
        blueprint_by_app_id,
        store_bundle,
        was_recording_active: false,
        blueprint_last_save: Default::default(),
    }
}
pub fn read_context(&mut self) -> Option<StoreContext<'_>> {
let blueprint_id = self.selected_application_id.as_ref().map(|app_id| {
self.blueprint_by_app_id
.entry(app_id.clone())
.or_insert_with(|| StoreId::from_string(StoreKind::Blueprint, app_id.clone().0))
});
blueprint_id
.as_ref()
.map(|id| self.store_bundle.blueprint_entry(id));
blueprint_id
.and_then(|id| self.store_bundle.blueprint(id))
.map(|blueprint| {
let recording = self
.selected_rec_id
.as_ref()
.and_then(|id| self.store_bundle.recording(id));
StoreContext {
blueprint,
recording,
all_recordings: self.store_bundle.recordings().collect_vec(),
}
})
}
/// Returns the `was_recording_active` flag, which `set_recording_id` sets to `true`.
pub fn was_recording_active(&self) -> bool {
self.was_recording_active
}
/// Selects the recording with the given id.
///
/// If that recording is present in the bundle and reports an application id, the
/// application is selected as well (see [`Self::set_app_id`]). The recording id is stored
/// and the "recording was active" flag is raised even if the recording is not in the bundle.
pub fn set_recording_id(&mut self, recording_id: StoreId) {
    let recording_app_id = self
        .store_bundle
        .recording(&recording_id)
        .and_then(|recording| recording.app_id())
        .cloned();

    if let Some(app_id) = recording_app_id {
        self.set_app_id(app_id);
    }

    self.selected_rec_id = Some(recording_id);
    self.was_recording_active = true;
}
/// Removes the store with the given id from the bundle.
///
/// If it is the currently selected recording, the selection first moves to the closest
/// other recording (see `StoreBundle::find_closest_recording`). When there is none, both
/// the recording and the application selection are cleared.
pub fn remove_recording_id(&mut self, recording_id: &StoreId) {
    let removing_selected = self.selected_rec_id.as_ref() == Some(recording_id);

    if removing_selected {
        let replacement = self
            .store_bundle
            .find_closest_recording(recording_id)
            .cloned();

        match replacement {
            Some(new_selection) => self.set_recording_id(new_selection),
            None => {
                self.selected_application_id = None;
                self.selected_rec_id = None;
            }
        }
    }

    self.store_bundle.remove(recording_id);
}
/// Selects `app_id` as the current application.
///
/// On non-wasm targets, if no blueprint is registered for this application yet, an attempt
/// is made to load a persisted one first. A failure to do so is only logged as a warning.
pub fn set_app_id(&mut self, app_id: ApplicationId) {
    #[cfg(not(target_arch = "wasm32"))]
    {
        let has_blueprint = self.blueprint_by_app_id.contains_key(&app_id);
        if !has_blueprint {
            if let Err(err) = self.try_to_load_persisted_blueprint(&app_id) {
                re_log::warn!("Failed to load persisted blueprint: {err}");
            }
        }
    }

    self.selected_application_id = Some(app_id);
}
/// Returns the id of the currently selected application, if any.
pub fn selected_application_id(&self) -> Option<&ApplicationId> {
self.selected_application_id.as_ref()
}
/// Registers `blueprint_id` as the blueprint of `app_id`, replacing any previous mapping.
///
/// Only the mapping is updated; the store bundle is not touched here.
#[inline]
pub fn set_blueprint_for_app_id(&mut self, blueprint_id: StoreId, app_id: ApplicationId) {
re_log::debug!("Switching blueprint for {app_id:?} to {blueprint_id:?}");
self.blueprint_by_app_id.insert(app_id, blueprint_id);
}
/// Forgets the blueprint of the currently selected application.
///
/// The mapping from the application to its blueprint is removed, the blueprint store is
/// dropped from the bundle, and its last-saved generation is discarded. Does nothing when
/// no application is selected or the application has no registered blueprint.
pub fn clear_blueprint(&mut self) {
    let Some(app_id) = &self.selected_application_id else {
        return;
    };

    if let Some(blueprint_id) = self.blueprint_by_app_id.remove(app_id) {
        self.store_bundle.remove(&blueprint_id);

        // `read_context` re-creates the default blueprint under the same deterministic id.
        // Drop the stale save marker so the fresh blueprint can never be mistaken for an
        // already-persisted one, and so the map does not keep entries for dead stores.
        self.blueprint_last_save.remove(&blueprint_id);
    }
}
/// Adds `entity_db` to the store bundle as a recording (see `StoreBundle::insert_recording`).
pub fn insert_recording(&mut self, entity_db: EntityDb) {
self.store_bundle.insert_recording(entity_db);
}
/// Returns mutable access to the store with the given id.
///
/// Delegates to `StoreBundle::entity_db_entry`, which inserts a new `EntityDb` when the
/// id is not present yet.
pub fn entity_db_mut(&mut self, store_id: &StoreId) -> &mut EntityDb {
self.store_bundle.entity_db_entry(store_id)
}
/// Drops every store in the bundle that reports itself as empty.
pub fn purge_empty(&mut self) {
self.store_bundle.purge_empty();
}
/// Tries to free memory by purging the least recently modified recording.
///
/// `EntityDb::purge_fraction_of_ram` is called with `fraction_to_purge` on the recording
/// returned by `StoreBundle::find_oldest_modified_recording`. Afterwards:
///
/// * if the recording ended up empty, it is removed;
/// * if the purge did not change the store size at all, the recording is removed as well,
///   unless it is the only recording left in the bundle.
///
/// Does nothing when the bundle contains no recording.
pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
    re_tracing::profile_function!();

    let Some(store_id) = self.store_bundle.find_oldest_modified_recording().cloned() else {
        return;
    };

    // Count recordings only. The bundle's map also holds blueprints (at least the welcome
    // screen one created in `new`), so its total length cannot tell whether this is the
    // last recording.
    let num_recordings = self.store_bundle.recordings().count();

    let Some(entity_db) = self.store_bundle.entity_dbs.get_mut(&store_id) else {
        // The id was just obtained from the very same map.
        if cfg!(debug_assertions) {
            unreachable!();
        }
        return;
    };

    let store_size_before =
        entity_db.store().timeless_size_bytes() + entity_db.store().temporal_size_bytes();
    entity_db.purge_fraction_of_ram(fraction_to_purge);
    let store_size_after =
        entity_db.store().timeless_size_bytes() + entity_db.store().temporal_size_bytes();

    // An empty recording is not worth keeping.
    if entity_db.is_empty() {
        self.remove_recording_id(&store_id);
        return;
    }

    // The purge freed nothing: drop the whole recording, but never the last one.
    if store_size_before == store_size_after && num_recordings > 1 {
        self.remove_recording_id(&store_id);
    }
}
/// Returns the currently selected recording.
///
/// `None` when no recording is selected or the selected id is not in the bundle.
pub fn current_recording(&self) -> Option<&EntityDb> {
    let rec_id = self.selected_rec_id.as_ref()?;
    self.store_bundle.recording(rec_id)
}
/// Returns `true` if the store bundle contains a store with the given recording id.
pub fn contains_recording(&self, id: &StoreId) -> bool {
self.store_bundle.contains_recording(id)
}
/// Removes every store whose data source is an HTTP `.rrd` stream or a WebSocket client
/// connection pointing at exactly `uri`.
///
/// Stores without a data source, or with any other kind of data source, are kept.
#[cfg(target_arch = "wasm32")]
pub fn remove_recording_by_uri(&mut self, uri: &str) {
    use re_smart_channel::SmartChannelSource as Source;

    self.store_bundle.entity_dbs.retain(|_, db| {
        let streams_from_uri = match &db.data_source {
            Some(Source::RrdHttpStream { url }) => url == uri,
            Some(Source::WsClient { ws_server_url }) => ws_server_url == uri,
            _ => false,
        };
        !streams_from_uri
    });
}
/// Garbage-collects every registered blueprint down to its latest row.
///
/// Does nothing unless `app_options.blueprint_gc` is enabled. Blueprint ids without a
/// matching store in the bundle are skipped.
pub fn gc_blueprints(&mut self, app_options: &AppOptions) {
    re_tracing::profile_function!();

    if !app_options.blueprint_gc {
        return;
    }

    for blueprint_id in self.blueprint_by_app_id.values() {
        let Some(blueprint) = self.store_bundle.blueprint_mut(blueprint_id) else {
            continue;
        };
        blueprint.gc_everything_but_the_latest_row();
    }
}
/// Garbage-collects and saves every registered blueprint that changed since its last save.
///
/// A blueprint counts as changed when its current generation differs from the one recorded
/// in `blueprint_last_save`. Changed blueprints are garbage-collected if
/// `app_options.blueprint_gc` is enabled. On non-wasm targets they are then written to the
/// application's default blueprint path and their new generation is recorded.
///
/// # Errors
///
/// On non-wasm targets, returns the first error hit while resolving the blueprint path or
/// saving; blueprints later in the iteration are then not processed.
#[allow(clippy::unnecessary_wraps)]
pub fn gc_and_persist_app_blueprints(
    &mut self,
    app_options: &AppOptions,
) -> anyhow::Result<()> {
    re_tracing::profile_function!();

    for (app_id, blueprint_id) in &self.blueprint_by_app_id {
        let Some(blueprint) = self.store_bundle.blueprint_mut(blueprint_id) else {
            continue;
        };

        let unchanged_since_last_save =
            self.blueprint_last_save.get(blueprint_id) == Some(&blueprint.generation());
        if unchanged_since_last_save {
            continue;
        }

        if app_options.blueprint_gc {
            blueprint.gc_everything_but_the_latest_row();
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            let blueprint_path = default_blueprint_path(app_id)?;
            re_log::debug_once!("Saving blueprint for {app_id} to {blueprint_path:?}");

            let file_saver = save_database_to_file(blueprint, blueprint_path, None)?;
            file_saver()?;

            // Remember the generation that was written, i.e. the one after the GC above.
            self.blueprint_last_save
                .insert(blueprint_id.clone(), blueprint.generation());
        }

        #[cfg(target_arch = "wasm32")]
        {
            // Nothing is written on wasm; this only silences the unused-variable warning.
            _ = app_id;
        }
    }

    Ok(())
}
/// Loads the blueprint persisted at the default path of `app_id`, if that file exists.
///
/// Each valid blueprint store found in the file is registered as the blueprint of `app_id`,
/// has its generation recorded as "last saved", and is inserted into the store bundle.
/// Blueprints rejected by `is_valid_blueprint` are skipped with a warning.
///
/// # Errors
///
/// Fails if the default blueprint path cannot be determined, or if the file contains a
/// store that is not a blueprint belonging to `app_id`. Stores handled before the
/// offending one stay registered.
#[cfg(not(target_arch = "wasm32"))]
pub fn try_to_load_persisted_blueprint(
    &mut self,
    app_id: &ApplicationId,
) -> anyhow::Result<()> {
    use crate::blueprint::is_valid_blueprint;

    re_tracing::profile_function!();

    let blueprint_path = default_blueprint_path(app_id)?;
    if !blueprint_path.exists() {
        return Ok(());
    }

    re_log::debug!("Trying to load blueprint for {app_id} from {blueprint_path:?}",);

    let with_notifications = false;
    let Some(mut bundle) = load_blueprint_file(&blueprint_path, with_notifications) else {
        return Ok(());
    };

    for store in bundle.drain_entity_dbs() {
        let is_blueprint_of_app =
            store.store_kind() == StoreKind::Blueprint && store.app_id() == Some(app_id);
        if !is_blueprint_of_app {
            anyhow::bail!(
                "Found unexpected store while loading blueprint: {:?}",
                store.store_id()
            );
        }

        if !is_valid_blueprint(&store) {
            re_log::warn_once!("Blueprint for {app_id} appears invalid - restoring to default. This is expected if you have just upgraded Rerun versions.");
            continue;
        }

        re_log::debug!(
            "Switching blueprint for {app_id:?} to {:?}",
            store.store_id(),
        );

        let loaded_id = store.store_id().clone();
        self.blueprint_by_app_id
            .insert(app_id.clone(), loaded_id.clone());
        // The blueprint was just read from disk, so its current generation is the saved one.
        self.blueprint_last_save
            .insert(loaded_id, store.generation());
        self.store_bundle.insert_blueprint(store);
    }

    Ok(())
}
pub fn stats(&self, detailed_cache_stats: bool) -> StoreHubStats {
let blueprint = self
.selected_application_id
.as_ref()
.and_then(|app_id| self.blueprint_by_app_id.get(app_id))
.and_then(|blueprint_id| self.store_bundle.blueprint(blueprint_id));
let blueprint_stats = blueprint
.map(|entity_db| DataStoreStats::from_store(entity_db.store()))
.unwrap_or_default();
let blueprint_config = blueprint
.map(|entity_db| entity_db.store().config().clone())
.unwrap_or_default();
let recording = self
.selected_rec_id
.as_ref()
.and_then(|rec_id| self.store_bundle.recording(rec_id));
let recording_stats = recording
.map(|entity_db| DataStoreStats::from_store(entity_db.store()))
.unwrap_or_default();
let recording_cached_stats = recording
.map(|entity_db| entity_db.query_caches().stats(detailed_cache_stats))
.unwrap_or_default();
let recording_config = recording
.map(|entity_db| entity_db.store().config().clone())
.unwrap_or_default();
StoreHubStats {
blueprint_stats,
blueprint_config,
recording_stats,
recording_cached_stats,
recording_config,
}
}
}
/// Errors returned by `StoreBundle::from_rrd`.
///
/// Both variants are transparent wrappers: they display as the error they wrap.
#[derive(thiserror::Error, Debug)]
pub enum StoreLoadError {
/// The decoder could not be created, or a message could not be decoded.
#[error(transparent)]
Decode(#[from] re_log_encoding::decoder::DecodeError),
/// A decoded message could not be added to its `EntityDb`.
#[error(transparent)]
DataStore(#[from] re_entity_db::Error),
}
/// A collection of [`EntityDb`]s, keyed by their [`StoreId`].
///
/// Recordings and blueprints live in the same map and are told apart by their store kind.
#[derive(Default)]
pub struct StoreBundle {
/// Every store in the bundle, recordings and blueprints alike.
entity_dbs: ahash::HashMap<StoreId, EntityDb>,
}
impl StoreBundle {
/// Decodes a stream of log messages from `read` into a new bundle.
///
/// Every message is added to the store it names, which is created on first use.
///
/// # Errors
///
/// Returns a [`StoreLoadError`] as soon as the decoder cannot be created, a message fails
/// to decode, or a message cannot be added to its store.
pub fn from_rrd(
    version_policy: VersionPolicy,
    read: impl std::io::Read,
) -> Result<Self, StoreLoadError> {
    re_tracing::profile_function!();

    let messages = re_log_encoding::decoder::Decoder::new(version_policy, read)?;

    let mut bundle = Self::default();
    for decoded in messages {
        let msg = decoded?;
        let entity_db = bundle.entity_db_entry(msg.store_id());
        entity_db.add(&msg)?;
    }
    Ok(bundle)
}
/// Returns the store with the given id, inserting a new `EntityDb` for it if it is missing.
///
/// Unlike `recording_entry` and `blueprint_entry`, this does not check the store kind.
pub fn entity_db_entry(&mut self, id: &StoreId) -> &mut EntityDb {
self.entity_dbs
.entry(id.clone())
.or_insert_with(|| EntityDb::new(id.clone()))
}
/// Iterates over every store in the bundle, recordings and blueprints alike.
pub fn entity_dbs(&self) -> impl Iterator<Item = &EntityDb> {
self.entity_dbs.values()
}
/// Iterates mutably over every store in the bundle, recordings and blueprints alike.
pub fn entity_dbs_mut(&mut self) -> impl Iterator<Item = &mut EntityDb> {
self.entity_dbs.values_mut()
}
/// Moves every store of `other` into this bundle.
///
/// When both bundles contain a store with the same id, the one from `other` replaces the
/// existing one.
pub fn append(&mut self, other: Self) {
    self.entity_dbs.extend(other.entity_dbs);
}
/// Removes the store with the given id, if present. The removed store is dropped.
pub fn remove(&mut self, id: &StoreId) {
self.entity_dbs.remove(id);
}
/// Finds the recording adjacent to `id` when all recordings are ordered by their sort key.
///
/// The recording right after `id` is preferred; if `id` is the last one, the recording
/// right before it is returned instead. Returns `None` if `id` is not a recording in this
/// bundle or if it is the only one.
pub fn find_closest_recording(&self, id: &StoreId) -> Option<&StoreId> {
    let mut sorted = self.recordings().collect_vec();
    sorted.sort_by_key(|entity_db| entity_db.sort_key());

    let index = sorted.iter().position(|rec| rec.store_id() == id)?;

    let neighbor = match sorted.get(index + 1) {
        Some(next) => Some(*next),
        // No successor: fall back to the predecessor, which exists only if `index > 0`.
        None => index.checked_sub(1).map(|prev| sorted[prev]),
    };

    neighbor.map(|entity_db| entity_db.store_id())
}
pub fn find_oldest_modified_recording(&self) -> Option<&StoreId> {
let mut entity_dbs = self
.entity_dbs
.values()
.filter(|db| db.store_kind() == StoreKind::Recording)
.collect_vec();
entity_dbs.sort_by_key(|db| db.last_modified_at());
entity_dbs.first().map(|db| db.store_id())
}
/// Returns `true` if the bundle contains a store with the given id.
///
/// In debug builds, panics if `id` is not of kind `StoreKind::Recording`.
pub fn contains_recording(&self, id: &StoreId) -> bool {
debug_assert_eq!(id.kind, StoreKind::Recording);
self.entity_dbs.contains_key(id)
}
/// Returns the store with the given id, if present.
///
/// In debug builds, panics if `id` is not of kind `StoreKind::Recording`.
pub fn recording(&self, id: &StoreId) -> Option<&EntityDb> {
debug_assert_eq!(id.kind, StoreKind::Recording);
self.entity_dbs.get(id)
}
/// Returns mutable access to the store with the given id, if present.
///
/// In debug builds, panics if `id` is not of kind `StoreKind::Recording`.
pub fn recording_mut(&mut self, id: &StoreId) -> Option<&mut EntityDb> {
debug_assert_eq!(id.kind, StoreKind::Recording);
self.entity_dbs.get_mut(id)
}
/// Returns the recording with the given id, inserting a new `EntityDb` for it if missing.
///
/// In debug builds, panics if `id` is not of kind `StoreKind::Recording`.
pub fn recording_entry(&mut self, id: &StoreId) -> &mut EntityDb {
    debug_assert_eq!(id.kind, StoreKind::Recording);
    // Same get-or-create logic as the kind-agnostic entry accessor.
    self.entity_db_entry(id)
}
/// Inserts `entity_db` under its own store id, replacing any store already using that id.
///
/// In debug builds, panics if `entity_db` is not of kind `StoreKind::Recording`.
pub fn insert_recording(&mut self, entity_db: EntityDb) {
debug_assert_eq!(entity_db.store_kind(), StoreKind::Recording);
self.entity_dbs
.insert(entity_db.store_id().clone(), entity_db);
}
/// Inserts `entity_db` under its own store id, replacing any store already using that id.
///
/// In debug builds, panics if `entity_db` is not of kind `StoreKind::Blueprint`.
pub fn insert_blueprint(&mut self, entity_db: EntityDb) {
debug_assert_eq!(entity_db.store_kind(), StoreKind::Blueprint);
self.entity_dbs
.insert(entity_db.store_id().clone(), entity_db);
}
/// Iterates over the stores of kind `StoreKind::Recording`, in the map's iteration order.
pub fn recordings(&self) -> impl Iterator<Item = &EntityDb> {
self.entity_dbs
.values()
.filter(|log| log.store_kind() == StoreKind::Recording)
}
/// Iterates over the stores of kind `StoreKind::Blueprint`, in the map's iteration order.
pub fn blueprints(&self) -> impl Iterator<Item = &EntityDb> {
self.entity_dbs
.values()
.filter(|log| log.store_kind() == StoreKind::Blueprint)
}
/// Returns `true` if the bundle contains a store with the given id.
///
/// In debug builds, panics if `id` is not of kind `StoreKind::Blueprint`.
pub fn contains_blueprint(&self, id: &StoreId) -> bool {
debug_assert_eq!(id.kind, StoreKind::Blueprint);
self.entity_dbs.contains_key(id)
}
/// Returns the store with the given id, if present.
///
/// In debug builds, panics if `id` is not of kind `StoreKind::Blueprint`.
pub fn blueprint(&self, id: &StoreId) -> Option<&EntityDb> {
debug_assert_eq!(id.kind, StoreKind::Blueprint);
self.entity_dbs.get(id)
}
/// Returns mutable access to the store with the given id, if present.
///
/// In debug builds, panics if `id` is not of kind `StoreKind::Blueprint`.
pub fn blueprint_mut(&mut self, id: &StoreId) -> Option<&mut EntityDb> {
debug_assert_eq!(id.kind, StoreKind::Blueprint);
self.entity_dbs.get_mut(id)
}
/// Returns the blueprint with the given id, creating it if it does not exist yet.
///
/// A newly created blueprint gets store info that uses the id's string as application id,
/// the current time as start time, and `"viewer"` as its source.
///
/// In debug builds, panics if `id` is not of kind `StoreKind::Blueprint`.
pub fn blueprint_entry(&mut self, id: &StoreId) -> &mut EntityDb {
    debug_assert_eq!(id.kind, StoreKind::Blueprint);

    // Only invoked when the id is not in the map yet.
    let create_blueprint = || {
        let mut new_db = EntityDb::new(id.clone());
        let row_id = re_log_types::RowId::new();
        let info = re_log_types::StoreInfo {
            application_id: id.as_str().into(),
            store_id: id.clone(),
            is_official_example: false,
            started: re_log_types::Time::now(),
            store_source: re_log_types::StoreSource::Other("viewer".to_owned()),
            store_kind: StoreKind::Blueprint,
        };
        new_db.set_store_info(re_log_types::SetStoreInfo { row_id, info });
        new_db
    };

    self.entity_dbs
        .entry(id.clone())
        .or_insert_with(create_blueprint)
}
/// Removes every store (recording or blueprint) for which `EntityDb::is_empty` is `true`.
pub fn purge_empty(&mut self) {
self.entity_dbs.retain(|_, entity_db| !entity_db.is_empty());
}
/// Removes all stores from the bundle and yields them by value, in the map's iteration order.
pub fn drain_entity_dbs(&mut self) -> impl Iterator<Item = EntityDb> + '_ {
self.entity_dbs.drain().map(|(_, store)| store)
}
}