use std::collections::{BTreeMap, BTreeSet};
use ahash::HashSet;
use itertools::Itertools;
use nohash_hasher::IntMap;
use re_arrow_store::{StoreDiff, StoreDiffKind, StoreEvent, StoreSubscriber};
use re_log_types::{
ComponentPath, EntityPath, EntityPathHash, EntityPathPart, RowId, TimeInt, TimePoint, Timeline,
};
use re_types_core::{ComponentName, Loggable};
#[allow(unused_imports)]
use re_arrow_store::DataStore;
use crate::TimeHistogramPerTimeline;
/// One node of a tree of entities, where each level is keyed by an [`EntityPathPart`].
///
/// The tree is kept up to date by calling [`EntityTree::on_store_additions`] and
/// [`EntityTree::on_store_deletions`] directly.
pub struct EntityTree {
/// Path of the entity this node stands for.
pub path: EntityPath,
/// Direct children of this node, keyed by the path part that leads to them.
pub children: BTreeMap<EntityPathPart, EntityTree>,
/// Histogram covering this node and everything below it.
///
/// Every added event contributes its number of components to each node visited on the way
/// down to the event's entity; deletion events for this node or a descendant are subtracted.
pub recursive_time_histogram: TimeHistogramPerTimeline,
/// Clears registered on this node, keyed by the row that issued them.
///
/// Both flat and recursive clears are recorded here; a freshly created node starts with a
/// copy of the `recursive_clears` it was given.
pub flat_clears: BTreeMap<RowId, TimePoint>,
/// Recursive clears registered on this node, keyed by the row that issued them.
///
/// Newly created children receive a copy of this map.
pub recursive_clears: BTreeMap<RowId, TimePoint>,
/// One histogram per component with data on this exact entity.
///
/// An entry is removed again once deletions leave its histogram empty.
pub time_histograms_per_component: BTreeMap<ComponentName, TimeHistogramPerTimeline>,
}
impl StoreSubscriber for EntityTree {
    /// Name under which this subscriber identifies itself.
    fn name(&self) -> String {
        String::from("rerun.store_subscribers.EntityTree")
    }

    /// Upcast used for downcasting back to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Mutable counterpart of [`Self::as_any`].
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    /// Always panics: the tree is updated through its own `on_store_*` methods instead.
    #[allow(clippy::unimplemented)]
    fn on_events(&mut self, _events: &[StoreEvent]) {
        unimplemented!(
            r"EntityTree view is maintained manually, see `EntityTree::on_store_{{additions|deletions}}`"
        );
    }
}
/// A batch of [`StoreEvent`]s regrouped by entity and component, see [`CompactedStoreEvents::new`].
#[derive(Default)]
pub struct CompactedStoreEvents {
/// Row ids of every event in the batch.
pub row_ids: HashSet<RowId>,
/// For events that are not timeless: entity path hash → timeline → component → the times
/// at which that component shows up in the batch (one entry per event).
pub timeful: IntMap<EntityPathHash, IntMap<Timeline, IntMap<ComponentName, Vec<TimeInt>>>>,
/// For timeless events: entity path hash → component → sum of the absolute event deltas.
pub timeless: IntMap<EntityPathHash, IntMap<ComponentName, u64>>,
}
impl CompactedStoreEvents {
pub fn new(store_events: &[&StoreEvent]) -> Self {
let mut this = CompactedStoreEvents {
row_ids: store_events.iter().map(|event| event.row_id).collect(),
timeful: Default::default(),
timeless: Default::default(),
};
for event in store_events {
if event.is_timeless() {
let per_component = this.timeless.entry(event.entity_path.hash()).or_default();
for component_name in event.cells.keys() {
*per_component.entry(*component_name).or_default() +=
event.delta().unsigned_abs();
}
} else {
for (&timeline, &time) in &event.timepoint {
let per_timeline = this.timeful.entry(event.entity_path.hash()).or_default();
let per_component = per_timeline.entry(timeline).or_default();
for component_name in event.cells.keys() {
per_component.entry(*component_name).or_default().push(time);
}
}
}
}
this
}
}
/// Component paths that have to be cleared as a consequence of added data.
///
/// Filled in by [`EntityTree::on_store_additions`].
#[derive(Debug, Clone, Default)]
pub struct ClearCascade {
/// Row id of the clear → affected entity → (timepoint of the clear, component paths to clear).
pub to_be_cleared: BTreeMap<RowId, BTreeMap<EntityPath, (TimePoint, BTreeSet<ComponentPath>)>>,
}
impl ClearCascade {
    /// Returns `true` when nothing has to be cleared.
    pub fn is_empty(&self) -> bool {
        self.to_be_cleared.is_empty()
    }
}
impl EntityTree {
pub fn root() -> Self {
Self::new(EntityPath::root(), Default::default())
}
pub fn new(path: EntityPath, recursive_clears: BTreeMap<RowId, TimePoint>) -> Self {
Self {
path,
children: Default::default(),
recursive_time_histogram: Default::default(),
flat_clears: recursive_clears.clone(),
recursive_clears,
time_histograms_per_component: Default::default(),
}
}
pub fn is_leaf(&self) -> bool {
self.children.is_empty()
}
/// Number of direct children plus number of components that have a histogram on this node.
pub fn num_children_and_fields(&self) -> usize {
    let child_count = self.children.len();
    let component_count = self.time_histograms_per_component.len();
    child_count + component_count
}
pub fn num_timeless_messages(&self) -> u64 {
self.recursive_time_histogram.num_timeless_messages()
}
/// Looks up the histogram of `component_name` on `timeline` for this exact node.
///
/// Returns `None` if the component has no histogram here, or none for that timeline.
pub fn time_histogram_for_component(
    &self,
    timeline: &Timeline,
    component_name: impl Into<ComponentName>,
) -> Option<&crate::TimeHistogram> {
    let component_name: ComponentName = component_name.into();
    let per_timeline = self.time_histograms_per_component.get(&component_name)?;
    per_timeline.get(timeline)
}
/// Applies every addition in `events` to the tree; events of any other kind are skipped.
///
/// Returns the [`ClearCascade`] collected while applying them.
pub fn on_store_additions(&mut self, events: &[StoreEvent]) -> ClearCascade {
    let mut cascade = ClearCascade::default();

    for event in events {
        if event.kind != StoreDiffKind::Addition {
            continue;
        }

        let diff = &event.diff;
        let component_count = event.num_components() as u32;
        self.create_subtrees_recursively(
            diff.entity_path.as_slice(),
            0,
            &diff.timepoint,
            component_count,
        )
        .on_added_data(&mut cascade, diff);
    }

    cascade
}
/// Records the cells of `store_diff` on this node.
///
/// For each cell: bumps the component's histogram, and if the component is new on this
/// node, schedules it in `clear_cascade` for every clear already registered in
/// `flat_clears`. A `ClearIsRecursive` cell additionally triggers [`Self::on_added_clear`].
///
/// # Panics
///
/// Panics if a `ClearIsRecursive` cell cannot be converted to its native type.
fn on_added_data(&mut self, clear_cascade: &mut ClearCascade, store_diff: &StoreDiff) {
    use re_types_core::components::ClearIsRecursive;

    for (component_name, cell) in &store_diff.cells {
        let component_path =
            ComponentPath::new(store_diff.entity_path.clone(), *component_name);

        // Only populated when the histogram entry below has to be created.
        let mut pending_clears = vec![];
        self.time_histograms_per_component
            .entry(component_path.component_name)
            .or_insert_with(|| {
                pending_clears = self.flat_clears.clone().into_iter().collect_vec();
                Default::default()
            })
            .add(&store_diff.timepoint, 1);

        for (clear_row_id, clear_timepoint) in pending_clears {
            let (merged_timepoint, paths_to_clear) = clear_cascade
                .to_be_cleared
                .entry(clear_row_id)
                .or_default()
                .entry(store_diff.entity_path.clone())
                .or_default();
            *merged_timepoint = clear_timepoint.union_max(merged_timepoint);
            paths_to_clear.insert(component_path.clone());
        }

        if cell.component_name() != ClearIsRecursive::name() {
            continue;
        }

        let is_recursive = cell
            .try_to_native_mono::<ClearIsRecursive>()
            .unwrap()
            .map_or(false, |settings| settings.0);
        self.on_added_clear(clear_cascade, store_diff, is_recursive);
    }
}
/// Registers the clear carried by `store_diff` and schedules the resulting deletions.
///
/// The clear is recorded on this node and, when `is_recursive` is set, on every descendant
/// currently in the tree. Every component that has a histogram on an affected node, except
/// the clear components themselves, is added to `clear_cascade` under the clear's row id.
fn on_added_clear(
&mut self,
clear_cascade: &mut ClearCascade,
store_diff: &StoreDiff,
is_recursive: bool,
) {
use re_types_core::{archetypes::Clear, components::ClearIsRecursive, Archetype as _};
re_tracing::profile_function!();
// Returns `true` for every component that is not part of a clear itself, i.e. neither the
// `Clear` indicator nor `ClearIsRecursive`.
fn filter_out_clear_components(comp_name: &ComponentName) -> bool {
let is_clear_component = [
Clear::indicator().name(), ClearIsRecursive::name(), ]
.contains(comp_name);
!is_clear_component
}
// Records the clear on a single node and yields the component paths it affects there.
//
// `flat_clears` is always updated, `recursive_clears` only for recursive clears. In both
// maps the timepoint is combined via `union_max` with whatever was already stored for
// `row_id`.
fn clear_tree(
tree: &mut EntityTree,
is_recursive: bool,
row_id: RowId,
timepoint: TimePoint,
) -> impl IntoIterator<Item = ComponentPath> + '_ {
if is_recursive {
let cur_timepoint = tree.recursive_clears.entry(row_id).or_default();
*cur_timepoint = timepoint.clone().union_max(cur_timepoint);
}
let cur_timepoint = tree.flat_clears.entry(row_id).or_default();
*cur_timepoint = timepoint.union_max(cur_timepoint);
tree.time_histograms_per_component
.keys()
.filter(|comp_name| filter_out_clear_components(comp_name))
.map(|component_name| ComponentPath::new(tree.path.clone(), *component_name))
}
let mut cleared_paths = BTreeSet::new();
if is_recursive {
// Walk this node and all of its descendants using an explicit stack.
let mut stack = vec![];
stack.push(self);
while let Some(next) = stack.pop() {
cleared_paths.extend(clear_tree(
next,
is_recursive,
store_diff.row_id,
store_diff.timepoint.clone(),
));
stack.extend(next.children.values_mut().collect::<Vec<&mut Self>>());
}
} else {
// A flat clear only touches this node.
cleared_paths.extend(clear_tree(
self,
is_recursive,
store_diff.row_id,
store_diff.timepoint.clone(),
));
}
// Group the affected component paths per entity, under the row id of this clear.
for component_path in cleared_paths {
let per_entity = clear_cascade
.to_be_cleared
.entry(store_diff.row_id)
.or_default();
let (timepoint, component_paths) = per_entity
.entry(component_path.entity_path().clone())
.or_default();
*timepoint = store_diff.timepoint.clone().union_max(timepoint);
component_paths.insert(component_path.clone());
}
}
pub fn on_store_deletions(
&mut self,
store_events: &[&StoreEvent],
compacted: &CompactedStoreEvents,
) {
re_tracing::profile_function!();
let Self {
path,
children,
recursive_time_histogram,
flat_clears,
recursive_clears,
time_histograms_per_component: _,
} = self;
{
re_tracing::profile_scope!("flat_clears");
flat_clears.retain(|row_id, _| !compacted.row_ids.contains(row_id));
}
{
re_tracing::profile_scope!("recursive_clears");
recursive_clears.retain(|row_id, _| !compacted.row_ids.contains(row_id));
}
let filtered_events = store_events
.iter()
.filter(|e| &e.entity_path == path || e.entity_path.is_descendant_of(path))
.copied() .collect_vec();
for event in filtered_events.iter().filter(|e| &e.entity_path == path) {
for component_name in event.cells.keys() {
if let Some(histo) = self.time_histograms_per_component.get_mut(component_name) {
histo.remove(&event.timepoint, 1);
if histo.is_empty() {
self.time_histograms_per_component.remove(component_name);
}
}
}
}
for event in &filtered_events {
recursive_time_histogram.remove(&event.timepoint, event.num_components() as _);
}
children.retain(|_, child| {
child.on_store_deletions(&filtered_events, compacted);
child.num_children_and_fields() > 0
});
}
/// Walks from this node down along `full_path[depth..]`, creating missing nodes on the way,
/// and returns the node at the end of the path.
///
/// Every node visited (including this one and the returned one) gets `num_components`
/// added to its recursive histogram at `timepoint`. Newly created nodes are seeded with
/// their parent's recursive clears.
fn create_subtrees_recursively(
    &mut self,
    full_path: &[EntityPathPart],
    depth: usize,
    timepoint: &TimePoint,
    num_components: u32,
) -> &mut Self {
    let mut node = self;
    let mut level = depth;

    loop {
        node.recursive_time_histogram.add(timepoint, num_components);

        match full_path.get(level) {
            // End of the path: this is the node the data belongs to.
            None => return node,
            Some(part) => {
                node = node.children.entry(part.clone()).or_insert_with(|| {
                    EntityTree::new(full_path[..=level].into(), node.recursive_clears.clone())
                });
                level += 1;
            }
        }
    }
}
/// Returns the node reached by following the parts of `path` down from this node.
///
/// An empty path yields this node itself; `None` is returned as soon as a part has no
/// matching child.
pub fn subtree(&self, path: &EntityPath) -> Option<&Self> {
    path.as_slice()
        .iter()
        .try_fold(self, |node, part| node.children.get(part))
}
/// Calls `visitor` with the path of this node and of every descendant.
///
/// Nodes are visited parent-first, with siblings in the iteration order of `children`.
pub fn visit_children_recursively(&self, visitor: &mut impl FnMut(&EntityPath)) {
    let mut pending = vec![self];
    while let Some(node) = pending.pop() {
        visitor(&node.path);
        // Pushed in reverse so that the first child is popped (and visited) first.
        pending.extend(node.children.values().rev());
    }
}
}