use ahash::HashMap;
use itertools::Itertools;
use re_arrow_store::{DataStoreConfig, DataStoreStats};
use re_data_store::StoreDb;
use re_log_types::{ApplicationId, StoreId, StoreKind};
use re_viewer_context::StoreContext;
#[cfg(not(target_arch = "wasm32"))]
use re_arrow_store::StoreGeneration;
#[cfg(not(target_arch = "wasm32"))]
use crate::{
loading::load_file_path,
saving::{default_blueprint_path, save_database_to_file},
};
/// Owns every loaded [`StoreDb`] and tracks which recording and which
/// application (and therefore which blueprint) are currently active.
#[derive(Default)]
pub struct StoreHub {
/// Id of the currently selected recording, if any.
selected_rec_id: Option<StoreId>,
/// Id of the currently active application, if any.
application_id: Option<ApplicationId>,
/// The blueprint store id to use for each application id.
blueprint_by_app_id: HashMap<ApplicationId, StoreId>,
/// All stores known to the hub, recordings and blueprints alike.
store_dbs: StoreBundle,
/// The generation each blueprint had when it was last saved to, or loaded
/// from, disk. Used to skip saving blueprints that have not changed.
#[cfg(not(target_arch = "wasm32"))]
blueprint_last_save: HashMap<StoreId, StoreGeneration>,
}
/// Statistics and configuration of the data stores backing the active
/// blueprint and the selected recording, as reported by `StoreHub::stats`.
///
/// A field holds its `Default` value when the corresponding store is missing.
#[derive(Default)]
pub struct StoreHubStats {
/// Statistics of the active blueprint's data store.
pub blueprint_stats: DataStoreStats,
/// Configuration of the active blueprint's data store.
pub blueprint_config: DataStoreConfig,
/// Statistics of the selected recording's data store.
pub recording_stats: DataStoreStats,
/// Configuration of the selected recording's data store.
pub recording_config: DataStoreConfig,
}
impl StoreHub {
pub fn add_bundle(&mut self, bundle: StoreBundle) {
self.store_dbs.append(bundle);
}
pub fn read_context(&mut self) -> Option<StoreContext<'_>> {
let blueprint_id = self.application_id.as_ref().map(|app_id| {
self.blueprint_by_app_id
.entry(app_id.clone())
.or_insert_with(|| StoreId::from_string(StoreKind::Blueprint, app_id.clone().0))
});
blueprint_id
.as_ref()
.map(|id| self.store_dbs.blueprint_entry(id));
blueprint_id
.and_then(|id| self.store_dbs.blueprint(id))
.map(|blueprint| {
let recording = self
.selected_rec_id
.as_ref()
.and_then(|id| self.store_dbs.recording(id));
StoreContext {
blueprint,
recording,
alternate_recordings: self
.store_dbs
.recordings()
.collect_vec(),
}
})
}
pub fn set_recording_id(&mut self, recording_id: StoreId) {
if let Some(app_id) = self
.store_dbs
.recording(&recording_id)
.as_ref()
.and_then(|recording| recording.app_id())
{
self.set_app_id(app_id.clone());
}
self.selected_rec_id = Some(recording_id);
}
pub fn set_app_id(&mut self, app_id: ApplicationId) {
#[cfg(not(target_arch = "wasm32"))]
if !self.blueprint_by_app_id.contains_key(&app_id) {
if let Err(err) = self.try_to_load_persisted_blueprint(&app_id) {
re_log::warn!("Failed to load persisted blueprint: {err}");
}
}
self.application_id = Some(app_id);
}
#[inline]
pub fn set_blueprint_for_app_id(&mut self, blueprint_id: StoreId, app_id: ApplicationId) {
re_log::debug!("Switching blueprint for {app_id:?} to {blueprint_id:?}");
self.blueprint_by_app_id.insert(app_id, blueprint_id);
}
pub fn clear_blueprint(&mut self) {
if let Some(app_id) = &self.application_id {
if let Some(blueprint_id) = self.blueprint_by_app_id.remove(app_id) {
self.store_dbs.remove(&blueprint_id);
}
}
}
pub fn store_db_mut(&mut self, store_id: &StoreId) -> &mut StoreDb {
self.store_dbs.store_db_entry(store_id)
}
pub fn purge_empty(&mut self) {
self.store_dbs.purge_empty();
}
pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
self.store_dbs.purge_fraction_of_ram(fraction_to_purge);
}
pub fn current_recording(&self) -> Option<&StoreDb> {
self.selected_rec_id
.as_ref()
.and_then(|id| self.store_dbs.recording(id))
}
pub fn contains_recording(&self, id: &StoreId) -> bool {
self.store_dbs.contains_recording(id)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn persist_app_blueprints(&mut self) -> anyhow::Result<()> {
for (app_id, blueprint_id) in &self.blueprint_by_app_id {
let blueprint_path = default_blueprint_path(app_id)?;
re_log::debug!("Saving blueprint for {app_id} to {blueprint_path:?}");
if let Some(blueprint) = self.store_dbs.blueprint(blueprint_id) {
if self.blueprint_last_save.get(blueprint_id) != Some(&blueprint.generation()) {
let file_saver = save_database_to_file(blueprint, blueprint_path, None)?;
file_saver()?;
self.blueprint_last_save
.insert(blueprint_id.clone(), blueprint.generation());
}
}
}
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
pub fn try_to_load_persisted_blueprint(
&mut self,
app_id: &ApplicationId,
) -> anyhow::Result<()> {
re_tracing::profile_function!();
let blueprint_path = default_blueprint_path(app_id)?;
if blueprint_path.exists() {
re_log::debug!("Trying to load blueprint for {app_id} from {blueprint_path:?}",);
let with_notification = false;
if let Some(mut bundle) = load_file_path(&blueprint_path, with_notification) {
for store in bundle.drain_store_dbs() {
if store.store_kind() == StoreKind::Blueprint && store.app_id() == Some(app_id)
{
re_log::debug!(
"Switching blueprint for {app_id:?} to {:?}",
store.store_id(),
);
self.blueprint_by_app_id
.insert(app_id.clone(), store.store_id().clone());
self.blueprint_last_save
.insert(store.store_id().clone(), store.generation());
self.store_dbs.insert_blueprint(store);
} else {
anyhow::bail!(
"Found unexpected store while loading blueprint: {:?}",
store.store_id()
);
}
}
}
}
Ok(())
}
pub fn stats(&self) -> StoreHubStats {
let blueprint = self
.application_id
.as_ref()
.and_then(|app_id| self.blueprint_by_app_id.get(app_id))
.and_then(|blueprint_id| self.store_dbs.blueprint(blueprint_id));
let blueprint_stats = blueprint
.map(|store_db| DataStoreStats::from_store(&store_db.entity_db.data_store))
.unwrap_or_default();
let blueprint_config = blueprint
.map(|store_db| store_db.entity_db.data_store.config().clone())
.unwrap_or_default();
let recording = self
.selected_rec_id
.as_ref()
.and_then(|rec_id| self.store_dbs.recording(rec_id));
let recording_stats = recording
.map(|store_db| DataStoreStats::from_store(&store_db.entity_db.data_store))
.unwrap_or_default();
let recording_config = recording
.map(|store_db| store_db.entity_db.data_store.config().clone())
.unwrap_or_default();
StoreHubStats {
blueprint_stats,
blueprint_config,
recording_stats,
recording_config,
}
}
}
/// A collection of [`StoreDb`]s keyed by their [`StoreId`].
///
/// Recordings and blueprints live in the same map.
#[derive(Default)]
pub struct StoreBundle {
/// Every store in the bundle, keyed by its id.
store_dbs: ahash::HashMap<StoreId, StoreDb>,
}
impl StoreBundle {
    /// Decodes log messages from `read` and routes each one to the store it
    /// names, creating stores on demand.
    ///
    /// # Errors
    ///
    /// Fails if the decoder cannot be created, a message cannot be decoded, or
    /// a store rejects a message.
    pub fn from_rrd(read: impl std::io::Read) -> anyhow::Result<Self> {
        re_tracing::profile_function!();

        let messages = re_log_encoding::decoder::Decoder::new(read)?;

        let mut bundle = Self::default();
        for message in messages {
            let message = message?;
            bundle.store_db_entry(message.store_id()).add(&message)?;
        }
        Ok(bundle)
    }

    /// Returns the store with the given id, inserting a new one created with
    /// `StoreDb::new` if it is missing.
    pub fn store_db_entry(&mut self, id: &StoreId) -> &mut StoreDb {
        let key = id.clone();
        self.store_dbs
            .entry(key)
            .or_insert_with(|| StoreDb::new(id.clone()))
    }

    /// Iterates over all stores, regardless of kind.
    pub fn store_dbs(&self) -> impl Iterator<Item = &StoreDb> {
        self.store_dbs.values()
    }

    /// Iterates mutably over all stores, regardless of kind.
    pub fn store_dbs_mut(&mut self) -> impl Iterator<Item = &mut StoreDb> {
        self.store_dbs.values_mut()
    }

    /// Moves every store out of `other` into `self`.
    ///
    /// On an id collision the store from `other` replaces the existing one.
    pub fn append(&mut self, mut other: Self) {
        self.store_dbs.extend(other.store_dbs.drain());
    }

    /// Removes the store with the given id, if present.
    pub fn remove(&mut self, id: &StoreId) {
        self.store_dbs.remove(id);
    }

    // --- Recordings ---

    /// Returns `true` if a store with this id exists.
    ///
    /// Debug builds assert that `id` is a recording id.
    pub fn contains_recording(&self, id: &StoreId) -> bool {
        debug_assert_eq!(id.kind, StoreKind::Recording);
        self.store_dbs.contains_key(id)
    }

    /// Looks up the store with this id.
    ///
    /// Debug builds assert that `id` is a recording id.
    pub fn recording(&self, id: &StoreId) -> Option<&StoreDb> {
        debug_assert_eq!(id.kind, StoreKind::Recording);
        self.store_dbs.get(id)
    }

    /// Looks up the store with this id for mutation.
    ///
    /// Debug builds assert that `id` is a recording id.
    pub fn recording_mut(&mut self, id: &StoreId) -> Option<&mut StoreDb> {
        debug_assert_eq!(id.kind, StoreKind::Recording);
        self.store_dbs.get_mut(id)
    }

    /// Returns the store with this id, inserting a new one created with
    /// `StoreDb::new` if it is missing.
    ///
    /// Debug builds assert that `id` is a recording id.
    pub fn recording_entry(&mut self, id: &StoreId) -> &mut StoreDb {
        debug_assert_eq!(id.kind, StoreKind::Recording);
        self.store_db_entry(id)
    }

    /// Inserts `store_db` under its own id, replacing any store already there.
    ///
    /// Debug builds assert that `store_db` is a recording.
    pub fn insert_recording(&mut self, store_db: StoreDb) {
        debug_assert_eq!(store_db.store_kind(), StoreKind::Recording);
        let id = store_db.store_id().clone();
        self.store_dbs.insert(id, store_db);
    }

    /// Iterates over the stores whose kind is `StoreKind::Recording`.
    pub fn recordings(&self) -> impl Iterator<Item = &StoreDb> {
        self.store_dbs()
            .filter(|store_db| store_db.store_kind() == StoreKind::Recording)
    }

    // --- Blueprints ---

    /// Inserts `store_db` under its own id, replacing any store already there.
    ///
    /// Debug builds assert that `store_db` is a blueprint.
    pub fn insert_blueprint(&mut self, store_db: StoreDb) {
        debug_assert_eq!(store_db.store_kind(), StoreKind::Blueprint);
        let id = store_db.store_id().clone();
        self.store_dbs.insert(id, store_db);
    }

    /// Iterates over the stores whose kind is `StoreKind::Blueprint`.
    pub fn blueprints(&self) -> impl Iterator<Item = &StoreDb> {
        self.store_dbs()
            .filter(|store_db| store_db.store_kind() == StoreKind::Blueprint)
    }

    /// Returns `true` if a store with this id exists.
    ///
    /// Debug builds assert that `id` is a blueprint id.
    pub fn contains_blueprint(&self, id: &StoreId) -> bool {
        debug_assert_eq!(id.kind, StoreKind::Blueprint);
        self.store_dbs.contains_key(id)
    }

    /// Looks up the store with this id.
    ///
    /// Debug builds assert that `id` is a blueprint id.
    pub fn blueprint(&self, id: &StoreId) -> Option<&StoreDb> {
        debug_assert_eq!(id.kind, StoreKind::Blueprint);
        self.store_dbs.get(id)
    }

    /// Looks up the store with this id for mutation.
    ///
    /// Debug builds assert that `id` is a blueprint id.
    pub fn blueprint_mut(&mut self, id: &StoreId) -> Option<&mut StoreDb> {
        debug_assert_eq!(id.kind, StoreKind::Blueprint);
        self.store_dbs.get_mut(id)
    }

    /// Returns the blueprint with this id, creating it if it is missing.
    ///
    /// A newly created blueprint is seeded with a `SetStoreInfo` message whose
    /// application id is taken from the blueprint id's string and whose store
    /// source is `"viewer"`.
    ///
    /// Debug builds assert that `id` is a blueprint id.
    pub fn blueprint_entry(&mut self, id: &StoreId) -> &mut StoreDb {
        debug_assert_eq!(id.kind, StoreKind::Blueprint);

        self.store_dbs.entry(id.clone()).or_insert_with(|| {
            let mut fresh_blueprint = StoreDb::new(id.clone());

            let set_store_info = re_log_types::SetStoreInfo {
                row_id: re_log_types::RowId::random(),
                info: re_log_types::StoreInfo {
                    application_id: id.as_str().into(),
                    store_id: id.clone(),
                    is_official_example: false,
                    started: re_log_types::Time::now(),
                    store_source: re_log_types::StoreSource::Other("viewer".to_owned()),
                    store_kind: StoreKind::Blueprint,
                },
            };
            fresh_blueprint.add_begin_recording_msg(&set_store_info);

            fresh_blueprint
        })
    }

    // --- Housekeeping ---

    /// Drops every store that reports itself as empty.
    pub fn purge_empty(&mut self) {
        self.store_dbs.retain(|_id, store_db| !store_db.is_empty());
    }

    /// Calls `purge_fraction_of_ram(fraction_to_purge)` on every store.
    pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
        re_tracing::profile_function!();

        for store_db in self.store_dbs_mut() {
            store_db.purge_fraction_of_ram(fraction_to_purge);
        }
    }

    /// Empties the bundle, yielding each store by value.
    pub fn drain_store_dbs(&mut self) -> impl Iterator<Item = StoreDb> + '_ {
        self.store_dbs.drain().map(|(_id, store_db)| store_db)
    }
}