use std::collections::BTreeMap;
use std::sync::{Arc, LazyLock};
use ahash::{HashMap, HashMapExt as _, HashSet};
use anyhow::Context as _;
use itertools::Itertools as _;
use nohash_hasher::IntMap;
use re_byte_size::{MemUsageNode, MemUsageTree, MemUsageTreeCapture, SizeBytes as _};
use re_chunk_store::{
ChunkStoreConfig, ChunkStoreGeneration, ChunkStoreStats, GarbageCollectionOptions,
GarbageCollectionTarget,
};
use re_entity_db::{EntityDb, StoreBundle};
use re_log::debug_assert;
use re_log_channel::LogSource;
use re_log_types::{AbsoluteTimeRange, ApplicationId, StoreId, StoreKind, TableId, TimelinePoint};
use re_query::QueryCachesStats;
use re_sdk_types::archetypes;
use re_sdk_types::components::Timestamp;
use crate::{
BlueprintUndoState, Caches, RecordingOrTable, StorageContext, StoreContext, TableStore,
TableStores,
};
/// Central hub owning every loaded store (recordings, blueprints, tables)
/// plus the bookkeeping of which ones are currently active.
#[derive(Default)]
pub struct StoreHub {
/// Hooks for loading/saving/validating blueprints.
persistence: BlueprintPersistence,
/// The recording or table currently active, if any.
active_recording_or_table: Option<RecordingOrTable>,
/// The application currently active, if any.
active_application_id: Option<ApplicationId>,
/// Per-app id of the blueprint to fall back to ("default blueprint").
default_blueprint_by_app_id: HashMap<ApplicationId, StoreId>,
/// Per-app id of the blueprint currently in use.
active_blueprint_by_app_id: HashMap<ApplicationId, StoreId>,
/// Order in which data sources were first seen.
data_source_order: DataSourceOrder,
/// All loaded entity dbs (recordings and blueprints).
store_bundle: StoreBundle,
/// All loaded table stores, keyed by table id.
table_stores: HashMap<TableId, TableStore>,
/// Apps for which heuristics should be enabled on the next `read_context`.
should_enable_heuristics_by_app_id: HashSet<ApplicationId>,
/// Caches, one per store.
caches_per_recording: HashMap<StoreId, Caches>,
/// Generation at the last blueprint save, used to skip redundant saves.
blueprint_last_save: HashMap<StoreId, ChunkStoreGeneration>,
/// Generation at the last blueprint GC, used to skip redundant GCs.
blueprint_last_gc: HashMap<StoreId, ChunkStoreGeneration>,
}
/// Assigns monotonically increasing ordinals to data sources in the order they are added.
#[derive(Default)]
struct DataSourceOrder {
// Last ordinal handed out; incremented before every insertion.
next: u64,
// Ordinal per known source; unknown sources sort last (`u64::MAX`).
ordering: HashMap<LogSource, u64>,
}
impl DataSourceOrder {
    /// The ordinal previously assigned to `source`, or `u64::MAX` if it was never added.
    fn order_of(&self, source: &LogSource) -> u64 {
        match self.ordering.get(source) {
            Some(&ordinal) => ordinal,
            None => u64::MAX,
        }
    }

    /// Assigns the next ordinal to `source`, unless it already has one.
    fn add(&mut self, source: &LogSource) {
        let is_new = !self.ordering.contains_key(source);
        if is_new {
            self.next += 1;
            let ordinal = self.next;
            self.ordering.insert(source.clone(), ordinal);
        }
    }
}
/// Loads the persisted blueprint bundle for a given app, if one exists.
pub type BlueprintLoader =
dyn Fn(&ApplicationId) -> anyhow::Result<Option<StoreBundle>> + Send + Sync;
/// Persists a blueprint for a given app.
pub type BlueprintSaver = dyn Fn(&ApplicationId, &EntityDb) -> anyhow::Result<()> + Send + Sync;
/// Returns whether a blueprint is acceptable for use.
pub type BlueprintValidator = dyn Fn(&EntityDb) -> bool + Send + Sync;
/// Optional hooks controlling how blueprints are loaded, saved and validated.
#[derive(Default)]
pub struct BlueprintPersistence {
pub loader: Option<Box<BlueprintLoader>>,
pub saver: Option<Box<BlueprintSaver>>,
pub validator: Option<Box<BlueprintValidator>>,
}
/// Configuration and size statistics for a single store.
pub struct StoreStats {
pub store_config: ChunkStoreConfig,
pub store_stats: ChunkStoreStats,
pub query_cache_stats: QueryCachesStats,
/// VRAM used by this store's caches.
pub cache_vram_usage: MemUsageTree,
}
/// Statistics over all stores and tables in the hub.
#[derive(Default)]
pub struct StoreHubStats {
pub store_stats: BTreeMap<StoreId, StoreStats>,
/// Total size in bytes, per table.
pub table_stats: BTreeMap<TableId, u64>,
}
impl StoreHub {
pub fn welcome_screen_app_id() -> ApplicationId {
"Welcome screen".into()
}
fn welcome_screen_blueprint_id() -> StoreId {
StoreId::new(
StoreKind::Blueprint,
Self::welcome_screen_app_id(),
Self::welcome_screen_app_id().to_string(),
)
}
pub fn test_hub() -> Self {
Self::new(
BlueprintPersistence {
loader: None,
saver: None,
validator: None,
},
&|_| {},
)
}
/// Creates a new hub with the given persistence hooks, pre-populated with
/// the welcome screen's blueprint.
///
/// `setup_welcome_screen_blueprint` is invoked once to fill the freshly
/// created welcome-screen blueprint store.
pub fn new(
persistence: BlueprintPersistence,
setup_welcome_screen_blueprint: &dyn Fn(&mut EntityDb),
) -> Self {
re_tracing::profile_function!();
let mut default_blueprint_by_app_id = HashMap::new();
let mut store_bundle = StoreBundle::default();
// The welcome screen always has its own default blueprint.
default_blueprint_by_app_id.insert(
Self::welcome_screen_app_id(),
Self::welcome_screen_blueprint_id(),
);
// Create the blueprint store and let the caller populate it.
let welcome_screen_blueprint =
store_bundle.blueprint_entry(&Self::welcome_screen_blueprint_id());
(setup_welcome_screen_blueprint)(welcome_screen_blueprint);
Self {
persistence,
active_recording_or_table: None,
// Start out on the welcome screen.
active_application_id: Some(Self::welcome_screen_app_id()),
default_blueprint_by_app_id,
active_blueprint_by_app_id: Default::default(),
store_bundle,
should_enable_heuristics_by_app_id: Default::default(),
data_source_order: Default::default(),
caches_per_recording: Default::default(),
blueprint_last_save: Default::default(),
blueprint_last_gc: Default::default(),
table_stores: TableStores::default(),
}
}
/// All loaded entity dbs (recordings and blueprints).
#[inline]
pub fn store_bundle(&self) -> &StoreBundle {
&self.store_bundle
}
/// Mutable access to all loaded entity dbs.
#[inline]
pub fn store_bundle_mut(&mut self) -> &mut StoreBundle {
&mut self.store_bundle
}
/// Returns the storage context plus, if an app is active, the store context
/// (active blueprint, default blueprint, active recording, caches).
///
/// Also does the bookkeeping required to make those contexts valid:
/// pruning blueprint ids that no longer resolve, activating a default
/// blueprint when needed, and creating an empty active blueprint otherwise.
pub fn read_context(&mut self) -> (StorageContext<'_>, Option<StoreContext<'_>>) {
// Fallbacks handed out when there is no active recording / caches.
static EMPTY_ENTITY_DB: LazyLock<EntityDb> =
LazyLock::new(|| EntityDb::new(re_log_types::StoreId::empty_recording()));
static EMPTY_CACHES: LazyLock<Caches> =
LazyLock::new(|| Caches::new(re_log_types::StoreId::empty_recording()));
let store_context = 'ctx: {
let Some(app_id) = self.active_application_id.clone() else {
break 'ctx None;
};
// Drop default/active blueprint ids that no longer point at a loaded store.
if let Some(blueprint_id) = self.default_blueprint_by_app_id.get(&app_id)
&& !self.store_bundle.contains(blueprint_id)
{
self.default_blueprint_by_app_id.remove(&app_id);
}
if let Some(blueprint_id) = self.active_blueprint_by_app_id.get(&app_id)
&& !self.store_bundle.contains(blueprint_id)
{
self.active_blueprint_by_app_id.remove(&app_id);
}
// No active blueprint yet? Activate a clone of the default one, unless
// heuristics were explicitly requested for this app.
if !self.active_blueprint_by_app_id.contains_key(&app_id)
&& !self.should_enable_heuristics_by_app_id.contains(&app_id)
&& let Some(blueprint_id) = self.default_blueprint_by_app_id.get(&app_id).cloned()
{
self.set_cloned_blueprint_active_for_app(&blueprint_id)
.unwrap_or_else(|err| {
re_log::warn!("Failed to make blueprint active: {err}");
});
}
let active_blueprint = {
// Fall back to a freshly created (empty) blueprint if necessary.
let active_blueprint_id = self
.active_blueprint_by_app_id
.entry(app_id.clone())
.or_insert_with(|| StoreId::default_blueprint(app_id.clone()));
self.store_bundle.blueprint_entry(active_blueprint_id);
let Some(active_blueprint) = self.store_bundle.get(active_blueprint_id) else {
break 'ctx None;
};
active_blueprint
};
let default_blueprint = self
.default_blueprint_by_app_id
.get(&app_id)
.and_then(|id| self.store_bundle.get(id));
// Resolve the active recording; forget the id if the store is gone.
let recording = if let Some(id) = &self.active_recording_or_table {
match id {
RecordingOrTable::Recording { store_id } => {
let recording = self.store_bundle.get(store_id);
if recording.is_none() {
self.active_recording_or_table = None;
}
recording
}
RecordingOrTable::Table { .. } => None,
}
} else {
None
};
// `remove`: the request is consumed so heuristics only trigger once.
let should_enable_heuristics = self.should_enable_heuristics_by_app_id.remove(&app_id);
let caches = self.active_caches();
let caches = caches.unwrap_or_else(|| {
if recording.is_some() {
re_log::debug_warn!("Active recording is missing cache");
}
&EMPTY_CACHES
});
Some(StoreContext {
blueprint: active_blueprint,
default_blueprint,
recording: recording.unwrap_or(&EMPTY_ENTITY_DB),
caches,
should_enable_heuristics,
})
};
(
StorageContext {
hub: self,
bundle: &self.store_bundle,
tables: &self.table_stores,
},
store_context,
)
}
/// Entity db for the given store id, created on demand if missing.
pub fn entity_db_mut(&mut self, store_id: &StoreId) -> &mut EntityDb {
self.store_bundle.entry(store_id)
}
/// Entity db for the given store id, if loaded.
pub fn entity_db(&self, store_id: &StoreId) -> Option<&EntityDb> {
self.store_bundle.get(store_id)
}
/// Sort key of `data_source` (`u64::MAX` for unknown sources).
pub fn data_source_order(&self, data_source: &LogSource) -> u64 {
self.data_source_order.order_of(data_source)
}
/// Refreshes data-source ordering: forgets sources that are neither loading
/// nor backing a loaded recording, then registers all current sources.
pub fn update_data_source_order(&mut self, loading_sources: &[Arc<LogSource>]) {
// Everything still loading or referenced by a loaded recording survives.
let keep: HashSet<&LogSource> = loading_sources
.iter()
.map(|source| &**source)
.chain(
self.store_bundle
.recordings()
.filter_map(|db| db.data_source.as_ref()),
)
.collect();
self.data_source_order
.ordering
.retain(|source, _| keep.contains(source));
// Register any sources not seen before (no-op for known ones).
for source in self
.store_bundle
.recordings()
.filter_map(|db| db.data_source.as_ref())
.chain(loading_sources.iter().map(|s| &**s))
{
self.data_source_order.add(source);
}
}
/// Inserts (or replaces) an entity db in the bundle.
pub fn insert_entity_db(&mut self, entity_db: EntityDb) {
self.store_bundle.insert(entity_db);
}
/// Inserts a table store, returning the previous store for `id`, if any.
pub fn insert_table_store(&mut self, id: TableId, store: TableStore) -> Option<TableStore> {
self.table_stores.insert(id, store)
}
/// Removes a store (recording or blueprint) plus all bookkeeping referring to it.
///
/// Removing the last recording of an app closes that app; removing a blueprint
/// unregisters it as active/default. The store itself is dropped on a
/// background thread (best effort), since dropping can be slow.
fn remove_store(&mut self, store_id: &StoreId) {
_ = self.caches_per_recording.remove(store_id);
let removed_store = self.store_bundle.remove(store_id);
let Some(removed_store) = removed_store else {
return;
};
match removed_store.store_kind() {
StoreKind::Recording => {
let app_id = removed_store.application_id();
let any_other_recordings_for_this_app = self
.store_bundle
.recordings()
.any(|rec| rec.application_id() == app_id);
if !any_other_recordings_for_this_app {
re_log::trace!("Removed last recording of {app_id}. Closing app.");
self.close_app(app_id);
}
}
StoreKind::Blueprint => {
// Make sure no app keeps pointing at the removed blueprint.
self.active_blueprint_by_app_id
.retain(|_, id| id != store_id);
self.default_blueprint_by_app_id
.retain(|_, id| id != store_id);
}
}
#[allow(
clippy::allow_attributes,
clippy::disallowed_types,
reason = "If this thread spawn fails due to running on Wasm (or for any other reason),
the error will be ignored and the store will be dropped on this thread."
)]
let (Ok(_) | Err(_)) = std::thread::Builder::new()
.name("drop-removed-store".into())
.spawn(|| {
re_tracing::profile_scope!("drop store");
drop(removed_store);
});
}
/// Removes the given recording or table from the hub.
pub fn remove(&mut self, entry: &RecordingOrTable) {
    match entry {
        RecordingOrTable::Recording { store_id } => self.remove_store(store_id),
        RecordingOrTable::Table { table_id } => {
            // Dropping the returned store (if any) is intentional.
            self.table_stores.remove(table_id);
        }
    }
}
/// Removes every store for which `should_retain` returns `false`.
pub fn retain_recordings(&mut self, mut should_retain: impl FnMut(&EntityDb) -> bool) {
    // Collect first: removal needs `&mut self`, which we can't take while iterating.
    let doomed: Vec<StoreId> = self
        .store_bundle
        .entity_dbs()
        .filter(|store| !should_retain(store))
        .map(|store| store.store_id().clone())
        .collect();
    for store_id in doomed {
        self.remove(&RecordingOrTable::Recording { store_id });
    }
}
/// Finds the first loaded recording whose data source matches `data_source`
/// (ignoring URI fragments).
pub fn find_recording_store_by_source(
    &self,
    data_source: &re_log_channel::LogSource,
) -> Option<&EntityDb> {
    self.store_bundle.entity_dbs().find(|db| {
        if !db.store_id().is_recording() {
            return false;
        }
        match &db.data_source {
            Some(ds) => ds.is_same_ignoring_uri_fragments(data_source),
            None => false,
        }
    })
}
/// Closes everything except the welcome screen and returns to it.
pub fn clear_entries(&mut self) {
    let welcome_app = Self::welcome_screen_app_id();
    // Keep only the welcome screen's stores, remembering their ids.
    let mut retained_ids = HashSet::default();
    self.store_bundle.retain(|db| {
        let keep = db.application_id() == &welcome_app;
        if keep {
            retained_ids.insert(db.store_id().clone());
        }
        keep
    });
    // Caches of removed stores go too.
    self.caches_per_recording
        .retain(|store_id, _| retained_ids.contains(store_id));
    self.table_stores.clear();
    self.active_application_id = Some(welcome_app);
    self.active_recording_or_table = None;
}
/// Makes `app_id` the active application and, if it has recordings,
/// activates the one sorting first by recording start time.
///
/// Also tries to load a persisted blueprint for the app, if none is active yet.
#[expect(clippy::needless_pass_by_value)]
pub fn set_active_app(&mut self, app_id: ApplicationId) {
if !self.active_blueprint_by_app_id.contains_key(&app_id)
&& let Err(err) = self.try_to_load_persisted_blueprint(&app_id)
{
re_log::warn!("Failed to load persisted blueprint: {err}")
}
if self.active_application_id.as_ref() == Some(&app_id) {
return;
}
// The welcome screen has no recordings; clear the active entry right away.
if Self::welcome_screen_app_id() == app_id || self.active_application_id.is_none() {
self.active_application_id = Some(app_id.clone());
self.active_recording_or_table = None;
}
// Pick this app's first recording, ordered by recording start time.
for rec in self.store_bundle.recordings().sorted_by_key(|entity_db| {
entity_db.recording_info_property::<Timestamp>(
archetypes::RecordingInfo::descriptor_start_time().component,
)
}) {
if rec.application_id() == &app_id {
self.active_application_id = Some(app_id.clone());
self.active_recording_or_table = Some(RecordingOrTable::Recording {
store_id: rec.store_id().clone(),
});
return;
}
}
}
/// Closes an app: saves blueprints, then removes all of the app's stores,
/// caches and blueprint registrations.
pub fn close_app(&mut self, app_id: &ApplicationId) {
if let Err(err) = self.save_app_blueprints() {
re_log::warn!("Failed to save blueprints: {err}");
}
// Drop all stores belonging to this app, remembering their ids.
let mut store_ids_removed = HashSet::default();
self.store_bundle.retain(|db| {
if db.application_id() == app_id {
store_ids_removed.insert(db.store_id().clone());
false
} else {
true
}
});
// Drop the removed stores' caches as well.
self.caches_per_recording
.retain(|store_id, _| !store_ids_removed.contains(store_id));
if self.active_application_id.as_ref() == Some(app_id) {
self.active_application_id = None;
}
self.default_blueprint_by_app_id.remove(app_id);
self.active_blueprint_by_app_id.remove(app_id);
}
/// The currently active application id, if any.
#[inline]
pub fn active_app(&self) -> Option<&ApplicationId> {
    self.active_application_id.as_ref()
}

/// Id of the active recording, if a recording (not a table) is active.
#[inline]
pub fn active_store_id(&self) -> Option<&StoreId> {
    self.active_recording_or_table
        .as_ref()
        .and_then(|entry| entry.recording_ref())
}

/// The active recording's entity db, if a recording is active and loaded.
#[inline]
pub fn active_recording(&self) -> Option<&EntityDb> {
    if let Some(RecordingOrTable::Recording { store_id }) =
        self.active_recording_or_table.as_ref()
    {
        self.store_bundle.get(store_id)
    } else {
        None
    }
}

/// Mutable access to the active recording's entity db, if any.
#[inline]
pub fn active_recording_mut(&mut self) -> Option<&mut EntityDb> {
    if let Some(RecordingOrTable::Recording { store_id }) =
        self.active_recording_or_table.as_ref()
    {
        self.store_bundle.get_mut(store_id)
    } else {
        None
    }
}

/// The currently active recording or table entry, if any.
pub fn active_recording_or_table(&self) -> Option<&RecordingOrTable> {
    self.active_recording_or_table.as_ref()
}

/// Caches of the active recording, if any.
#[inline]
pub fn active_caches(&self) -> Option<&Caches> {
    self.active_store_id()
        .and_then(|store_id| self.caches_per_recording.get(store_id))
}

/// Caches of the given store, if any.
pub fn caches_for_store(&self, store_id: &StoreId) -> Option<&Caches> {
    self.caches_per_recording.get(store_id)
}
/// Makes the given recording active, switching the active app accordingly
/// and making sure the recording has caches.
pub fn set_active_recording_id(&mut self, recording_id: StoreId) {
    debug_assert!(recording_id.is_recording());
    // Activating a recording implies activating its app (if the store is loaded).
    let app_id = self
        .store_bundle
        .get(&recording_id)
        .map(|recording| recording.application_id().clone());
    if let Some(app_id) = app_id {
        self.set_active_app(app_id);
    }
    self.active_recording_or_table = Some(RecordingOrTable::Recording {
        store_id: recording_id.clone(),
    });
    // Lazily create the recording's caches.
    if !self.caches_per_recording.contains_key(&recording_id) {
        let caches = Caches::new(recording_id.clone());
        self.caches_per_recording.insert(recording_id, caches);
    }
}
/// Activates `store_id` if it is a recording; logs and ignores blueprints.
pub fn set_active_recording(&mut self, store_id: StoreId) {
    if matches!(store_id.kind(), StoreKind::Recording) {
        self.set_active_recording_id(store_id);
    } else {
        re_log::debug!("Tried to activate the blueprint {store_id:?} as a recording.");
    }
}
/// Id of the default blueprint for the given app, if any.
pub fn default_blueprint_id_for_app(&self, app_id: &ApplicationId) -> Option<&StoreId> {
    self.default_blueprint_by_app_id.get(app_id)
}

/// The default blueprint's entity db for the given app, if loaded.
pub fn default_blueprint_for_app(&self, app_id: &ApplicationId) -> Option<&EntityDb> {
    self.default_blueprint_id_for_app(app_id)
        .and_then(|id| self.store_bundle.get(id))
}
/// Registers `blueprint_id` as the default blueprint for its app.
///
/// Fails if the blueprint is not loaded or is rejected by the configured validator.
#[inline]
pub fn set_default_blueprint_for_app(&mut self, blueprint_id: &StoreId) -> anyhow::Result<()> {
    let blueprint = self
        .store_bundle
        .get(blueprint_id)
        .context("missing blueprint")?;
    // No validator configured == everything is valid.
    let is_valid = self
        .persistence
        .validator
        .as_ref()
        .is_none_or(|validator| (validator)(blueprint));
    if !is_valid {
        anyhow::bail!("Blueprint failed validation");
    }
    re_log::trace!(
        "Switching default blueprint for '{:?}' to '{:?}'",
        blueprint_id.application_id(),
        blueprint_id
    );
    self.default_blueprint_by_app_id
        .insert(blueprint_id.application_id().clone(), blueprint_id.clone());
    Ok(())
}
/// Id of the active blueprint of the active app, if any.
pub fn active_blueprint_id(&self) -> Option<&StoreId> {
    self.active_app()
        .and_then(|app_id| self.active_blueprint_id_for_app(app_id))
}

/// The active blueprint's entity db of the active app, if loaded.
pub fn active_blueprint(&self) -> Option<&EntityDb> {
    self.active_blueprint_id()
        .and_then(|id| self.store_bundle.get(id))
}

/// Id of the active blueprint for the given app, if any.
pub fn active_blueprint_id_for_app(&self, app_id: &ApplicationId) -> Option<&StoreId> {
    self.active_blueprint_by_app_id.get(app_id)
}

/// The active blueprint's entity db for the given app, if loaded.
pub fn active_blueprint_for_app(&self, app_id: &ApplicationId) -> Option<&EntityDb> {
    self.active_blueprint_id_for_app(app_id)
        .and_then(|id| self.store_bundle.get(id))
}
/// Clones `blueprint_id` under a fresh random id and makes the clone the
/// active blueprint for its app.
///
/// Cloning keeps the source blueprint (typically the default one) untouched.
pub fn set_cloned_blueprint_active_for_app(
&mut self,
blueprint_id: &StoreId,
) -> anyhow::Result<()> {
let app_id = blueprint_id.application_id().clone();
let new_id = StoreId::random(StoreKind::Blueprint, app_id.clone());
re_log::trace!(
"Cloning '{blueprint_id:?}' as '{new_id:?}' the active blueprint for '{app_id}'"
);
let blueprint = self
.store_bundle
.get(blueprint_id)
.context("missing blueprint")?;
// Refuse to activate blueprints the configured validator rejects.
if let Some(validator) = &self.persistence.validator
&& !(validator)(blueprint)
{
anyhow::bail!("Blueprint failed validation");
}
let new_blueprint = blueprint.clone_with_new_id(new_id.clone())?;
self.store_bundle.insert(new_blueprint);
self.active_blueprint_by_app_id.insert(app_id, new_id);
Ok(())
}
/// Whether `blueprint_id` is the active blueprint of any app.
pub fn is_active_blueprint(&self, blueprint_id: &StoreId) -> bool {
self.active_blueprint_by_app_id
.values()
.any(|id| id == blueprint_id)
}
/// Removes the active app's active blueprint (if any) from the hub.
pub fn clear_active_blueprint(&mut self) {
if let Some(app_id) = &self.active_application_id
&& let Some(blueprint_id) = self.active_blueprint_by_app_id.remove(app_id)
{
re_log::debug!("Clearing blueprint for {app_id}: {blueprint_id:?}");
self.remove_store(&blueprint_id);
}
}
/// Like [`Self::clear_active_blueprint`], but additionally requests that
/// heuristics regenerate a blueprint for the app on the next `read_context`.
pub fn clear_active_blueprint_and_generate(&mut self) {
self.clear_active_blueprint();
if let Some(app_id) = self.active_app() {
self.should_enable_heuristics_by_app_id
.insert(app_id.clone());
}
}
/// Drops every blueprint that is a clone of another, keeping all recordings
/// and all original (non-cloned) blueprints.
pub fn clear_all_cloned_blueprints(&mut self) {
    self.retain_recordings(|db| {
        // Recordings always survive; blueprints only if they are not clones.
        matches!(db.store_kind(), StoreKind::Recording) || db.cloned_from().is_none()
    });
}
/// Tries to free memory, returning the number of bytes freed.
///
/// Escalates in stages, stopping as soon as one of them frees something:
/// 1. purge all per-recording caches;
/// 2. GC the non-active ("background") recordings aggressively;
/// 3. GC `fraction_to_purge` of the active recording;
/// 4. close background recordings entirely.
///
/// `time_cursor_for` supplies the current time cursor per store so data
/// around it can be protected during GC.
pub fn purge_fraction_of_ram(
&mut self,
fraction_to_purge: f32,
time_cursor_for: &dyn Fn(&StoreId) -> Option<TimelinePoint>,
) -> u64 {
re_tracing::profile_function!();
// Stage 1: caches.
#[expect(clippy::iter_over_hash_type)]
for cache in self.caches_per_recording.values_mut() {
cache.purge_memory();
}
let active_store_id = self.active_store_id().cloned();
let background_recording_ids = self
.store_bundle
.recordings()
.map(|db| db.store_id().clone())
.filter(|store_id| Some(store_id) != active_store_id.as_ref())
.collect::<Vec<_>>();
let mut num_bytes_freed = 0;
// Stage 2: GC background recordings aggressively.
let background_target = GarbageCollectionTarget::Everything;
for store_id in &background_recording_ids {
let time_cursor = time_cursor_for(store_id);
num_bytes_freed += self.gc_store(background_target, store_id, time_cursor);
}
// Stage 3: only touch the active recording if nothing was freed so far.
let target = GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _);
if num_bytes_freed == 0
&& let Some(active_store_id) = active_store_id
{
let time_cursor = time_cursor_for(&active_store_id);
num_bytes_freed += self.gc_store(target, &active_store_id, time_cursor);
}
// Stage 4: last resort — close background recordings entirely.
if num_bytes_freed == 0 {
let mut closed_count = 0_usize;
for store_id in &background_recording_ids {
let Some(recording) = self.store_bundle.get(store_id) else {
continue;
};
if self.active_store_id() == Some(store_id) {
continue;
}
num_bytes_freed += recording.total_size_bytes();
self.remove_store(store_id);
closed_count += 1;
}
if closed_count > 0 {
re_log::warn!(
"Closed {} to stay within memory limit",
re_format::format_plural_s(closed_count, "background recording")
);
}
}
num_bytes_freed
}
/// Runs GC with the given target on a single store, returning bytes freed.
///
/// The store's caches (if any) are notified of the resulting store events.
fn gc_store(
&mut self,
target: GarbageCollectionTarget,
store_id: &StoreId,
time_cursor: Option<TimelinePoint>,
) -> u64 {
re_tracing::profile_function!();
let store_bundle = &mut self.store_bundle;
let Some(entity_db) = store_bundle.get_mut(store_id) else {
// Callers are expected to only pass ids of loaded stores.
if cfg!(debug_assertions) {
unreachable!();
}
return 0; };
let store_size_before = entity_db.total_size_bytes();
let store_events = entity_db.gc_with_target(target, time_cursor);
let store_size_after = entity_db.total_size_bytes();
if let Some(caches) = self.caches_per_recording.get_mut(store_id) {
caches.on_store_events(&store_events, entity_db);
}
store_size_before.saturating_sub(store_size_after)
}
/// Removes all recordings whose data source matches the given URI
/// (HTTP stream URL or redap gRPC stream URI).
pub fn remove_recording_by_uri(&mut self, uri: &str) {
    self.retain_recordings(|db| match &db.data_source {
        Some(re_log_channel::LogSource::HttpStream { url, .. }) => url != uri,
        Some(re_log_channel::LogSource::RedapGrpcStream { uri: redap_uri, .. }) => {
            redap_uri.to_string() != uri
        }
        // Stores without a matching source (or without any source) are kept.
        _ => true,
    });
}
/// Garbage-collects all registered (active & default) blueprints, keeping
/// the latest values plus whatever `undo_state` still needs.
pub fn gc_blueprints(&mut self, undo_state: &HashMap<StoreId, BlueprintUndoState>) {
re_tracing::profile_function!();
for blueprint_id in self
.active_blueprint_by_app_id
.values()
.chain(self.default_blueprint_by_app_id.values())
{
if let Some(blueprint) = self.store_bundle.get_mut(blueprint_id) {
// Skip blueprints unchanged since their last GC.
if self.blueprint_last_gc.get(blueprint_id) == Some(&blueprint.generation()) {
continue; }
// Protect everything back to the oldest undo point so undo keeps working.
let mut protected_time_ranges = IntMap::default();
if let Some(undo) = undo_state.get(blueprint_id)
&& let Some(time) = undo.oldest_undo_point()
{
protected_time_ranges.insert(
crate::blueprint_timeline(),
AbsoluteTimeRange::new(time, re_chunk::TimeInt::MAX),
);
}
let store_events = blueprint.gc(&GarbageCollectionOptions {
target: GarbageCollectionTarget::Everything,
protect_latest: 1, time_budget: re_entity_db::DEFAULT_GC_TIME_BUDGET,
protected_time_ranges,
protected_chunks: HashSet::default(),
furthest_from: None,
perform_deep_deletions: true,
});
if !store_events.is_empty() {
re_log::debug!("Garbage-collected blueprint store");
// Keep the blueprint's caches in sync with what was dropped.
if let Some(caches) = self.caches_per_recording.get_mut(blueprint_id) {
caches.on_store_events(&store_events, blueprint);
}
}
self.blueprint_last_gc
.insert(blueprint_id.clone(), blueprint.generation());
}
}
}
/// Starts a new frame on every cache whose store is still loaded,
/// dropping caches of unloaded stores.
pub fn begin_frame_caches(&mut self) {
    let store_bundle = &self.store_bundle;
    self.caches_per_recording.retain(|store_id, caches| {
        let store_is_loaded = store_bundle.contains(store_id);
        if store_is_loaded {
            caches.begin_frame();
        }
        store_is_loaded
    });
}
/// Saves all active blueprints via the configured saver, if any.
///
/// Skips the welcome screen's blueprint, unloaded blueprints, and blueprints
/// unchanged since their last save.
pub fn save_app_blueprints(&mut self) -> anyhow::Result<()> {
let Some(saver) = &self.persistence.saver else {
return Ok(());
};
re_tracing::profile_function!();
#[expect(clippy::iter_over_hash_type)]
for (app_id, blueprint_id) in &self.active_blueprint_by_app_id {
if app_id == &Self::welcome_screen_app_id() {
continue; }
let Some(blueprint) = self.store_bundle.get_mut(blueprint_id) else {
re_log::debug!("Failed to find blueprint {blueprint_id:?}.");
continue;
};
if self.blueprint_last_save.get(blueprint_id) == Some(&blueprint.generation()) {
continue; }
(saver)(app_id, blueprint)?;
self.blueprint_last_save
.insert(blueprint_id.clone(), blueprint.generation());
}
Ok(())
}
/// Invokes the persisted-blueprint loader for `app_id` (if one is configured)
/// and activates whatever bundle it returns.
fn try_to_load_persisted_blueprint(&mut self, app_id: &ApplicationId) -> anyhow::Result<()> {
    re_tracing::profile_function!();
    let Some(loader) = &self.persistence.loader else {
        return Ok(());
    };
    if let Some(bundle) = (loader)(app_id)? {
        self.load_blueprint_store(bundle, app_id)?;
    }
    Ok(())
}
/// Moves all blueprints out of `blueprint_bundle` into the hub, making
/// each of them (in turn) the active blueprint for `app_id`.
///
/// Fails if the bundle contains a recording or a blueprint for another app.
pub fn load_blueprint_store(
&mut self,
mut blueprint_bundle: StoreBundle,
app_id: &ApplicationId,
) -> anyhow::Result<()> {
re_tracing::profile_function!();
for store in blueprint_bundle.drain_entity_dbs() {
match store.store_kind() {
StoreKind::Recording => {
anyhow::bail!(
"Found a recording in a blueprint file: {:?}",
store.store_id()
);
}
StoreKind::Blueprint => {}
}
if store.application_id() != app_id {
anyhow::bail!("Found app_id {}; expected {app_id}", store.application_id());
}
re_log::debug!(
"Activating new blueprint {:?} for {app_id}.",
store.store_id(),
);
self.active_blueprint_by_app_id
.insert(app_id.clone(), store.store_id().clone());
// Freshly loaded means already persisted — no need to save it back out.
self.blueprint_last_save
.insert(store.store_id().clone(), store.generation());
self.store_bundle.insert(store);
}
Ok(())
}
/// Collects per-store and per-table statistics over all loaded data.
pub fn stats(&self) -> StoreHubStats {
re_tracing::profile_function!();
// Exhaustive destructuring so new fields must be considered here.
let Self {
persistence: _,
active_recording_or_table: _,
active_application_id: _,
default_blueprint_by_app_id: _,
active_blueprint_by_app_id: _,
store_bundle,
table_stores,
data_source_order: _,
should_enable_heuristics_by_app_id: _,
caches_per_recording,
blueprint_last_save: _,
blueprint_last_gc: _,
} = self;
let mut store_stats = BTreeMap::new();
for store in store_bundle.entity_dbs() {
let store_id = store.store_id();
let engine = store.storage_engine();
// Stores without caches report empty VRAM usage.
let cache_vram_usage = caches_per_recording
.get(store_id)
.map(|caches| caches.vram_usage())
.unwrap_or_default();
store_stats.insert(
store_id.clone(),
StoreStats {
store_config: engine.store().config().clone(),
store_stats: engine.store().stats(),
query_cache_stats: engine.cache().stats(),
cache_vram_usage,
},
);
}
let mut table_stats = BTreeMap::new();
#[expect(clippy::iter_over_hash_type)]
for (table_id, table_store) in table_stores {
table_stats.insert(table_id.clone(), table_store.total_size_bytes());
}
StoreHubStats {
store_stats,
table_stats,
}
}
}
impl MemUsageTreeCapture for StoreHub {
/// Builds a memory-usage tree covering all stores (entity dbs + caches)
/// and all table stores.
#[expect(clippy::iter_over_hash_type)]
fn capture_mem_usage_tree(&self) -> MemUsageTree {
re_tracing::profile_function!();
// Exhaustive destructuring so new fields must be considered here.
let Self {
store_bundle,
table_stores,
caches_per_recording,
persistence: _,
active_recording_or_table: _,
active_application_id: _,
default_blueprint_by_app_id: _,
active_blueprint_by_app_id: _,
data_source_order: _,
should_enable_heuristics_by_app_id: _,
blueprint_last_save: _,
blueprint_last_gc: _,
} = self;
let mut node = MemUsageNode::new();
// Union of ids: a store may have caches without being loaded, or vice versa.
let mut all_store_ids = std::collections::BTreeSet::new();
for entity_db in store_bundle.entity_dbs() {
all_store_ids.insert(entity_db.store_id().clone());
}
for store_id in caches_per_recording.keys() {
all_store_ids.insert(store_id.clone());
}
let mut stores_node = MemUsageNode::new();
for store_id in all_store_ids {
let recording_id = format!("{store_id:?}");
let mut recording_node = MemUsageNode::new();
if let Some(entity_db) = store_bundle.get(&store_id) {
recording_node.add("EntityDb", entity_db.capture_mem_usage_tree());
}
if let Some(caches) = caches_per_recording.get(&store_id) {
recording_node.add("Caches", caches.capture_mem_usage_tree());
}
stores_node.add(recording_id, recording_node.into_tree());
}
node.add("stores", stores_node.into_tree());
let mut table_stores_node = MemUsageNode::new();
for (table_id, table_store) in table_stores {
let name = format!("{table_id:?}");
table_stores_node.add(name, MemUsageTree::Bytes(table_store.total_size_bytes()));
}
node.add("TableStores", table_stores_node.into_tree());
node.into_tree()
}
}