use std::collections::{BTreeMap, BTreeSet};
use itertools::Itertools;
use nohash_hasher::IntMap;
use re_log_types::{
ComponentPath, EntityPath, EntityPathPart, PathOp, RowId, TimeInt, TimePoint, Timeline,
};
use re_types::{ComponentName, Loggable};
#[derive(Default)]
pub struct ActuallyDeleted {
pub timeful: IntMap<Timeline, Vec<TimeInt>>,
pub timeless: u64,
}
impl ActuallyDeleted {
fn append(&mut self, other: Self) {
let Self { timeful, timeless } = other;
for (timeline, mut times) in timeful {
self.timeful.entry(timeline).or_default().append(&mut times);
}
self.timeless += timeless;
}
}
pub type TimeHistogram = re_int_histogram::Int64Histogram;
#[derive(Default)]
pub struct TimeHistogramPerTimeline(BTreeMap<Timeline, TimeHistogram>);
impl TimeHistogramPerTimeline {
pub fn timelines(&self) -> impl ExactSizeIterator<Item = &Timeline> {
self.0.keys()
}
pub fn get(&self, timeline: &Timeline) -> Option<&TimeHistogram> {
self.0.get(timeline)
}
pub fn has_timeline(&self, timeline: &Timeline) -> bool {
self.0.contains_key(timeline)
}
pub fn iter(&self) -> impl ExactSizeIterator<Item = (&Timeline, &TimeHistogram)> {
self.0.iter()
}
pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = (&Timeline, &mut TimeHistogram)> {
self.0.iter_mut()
}
pub fn purge(&mut self, deleted: &ActuallyDeleted) {
re_tracing::profile_function!();
for (timeline, histogram) in &mut self.0 {
if let Some(times) = deleted.timeful.get(timeline) {
for &time in times {
histogram.decrement(time.as_i64(), 1);
}
}
}
}
}
pub struct TimesPerTimeline(BTreeMap<Timeline, BTreeSet<TimeInt>>);
impl TimesPerTimeline {
pub fn timelines(&self) -> impl ExactSizeIterator<Item = &Timeline> {
self.0.keys()
}
pub fn get(&self, timeline: &Timeline) -> Option<&BTreeSet<TimeInt>> {
self.0.get(timeline)
}
pub fn get_mut(&mut self, timeline: &Timeline) -> Option<&mut BTreeSet<TimeInt>> {
self.0.get_mut(timeline)
}
pub fn insert(&mut self, timeline: Timeline, time: TimeInt) {
self.0.entry(timeline).or_default().insert(time);
}
pub fn has_timeline(&self, timeline: &Timeline) -> bool {
self.0.contains_key(timeline)
}
pub fn iter(&self) -> impl ExactSizeIterator<Item = (&Timeline, &BTreeSet<TimeInt>)> {
self.0.iter()
}
pub fn iter_mut(
&mut self,
) -> impl ExactSizeIterator<Item = (&Timeline, &mut BTreeSet<TimeInt>)> {
self.0.iter_mut()
}
}
impl Default for TimesPerTimeline {
fn default() -> Self {
Self(BTreeMap::from([(Timeline::log_time(), Default::default())]))
}
}
pub struct EntityTree {
pub path: EntityPath,
pub children: BTreeMap<EntityPathPart, EntityTree>,
pub prefix_times: TimeHistogramPerTimeline,
num_timeless_messages: usize,
pub nonrecursive_clears: BTreeMap<RowId, TimePoint>,
pub recursive_clears: BTreeMap<RowId, TimePoint>,
pub components: BTreeMap<ComponentName, ComponentStats>,
}
impl EntityTree {
pub fn root() -> Self {
Self::new(EntityPath::root(), Default::default())
}
pub fn new(path: EntityPath, recursive_clears: BTreeMap<RowId, TimePoint>) -> Self {
Self {
path,
children: Default::default(),
prefix_times: Default::default(),
num_timeless_messages: 0,
nonrecursive_clears: recursive_clears.clone(),
recursive_clears,
components: Default::default(),
}
}
pub fn is_leaf(&self) -> bool {
self.children.is_empty()
}
pub fn num_children_and_fields(&self) -> usize {
self.children.len() + self.components.len()
}
pub fn num_timeless_messages(&self) -> usize {
self.num_timeless_messages
}
pub fn add_data_msg(
&mut self,
time_point: &TimePoint,
component_path: &ComponentPath,
) -> Vec<(RowId, TimePoint)> {
re_tracing::profile_function!();
let leaf =
self.create_subtrees_recursively(component_path.entity_path.as_slice(), 0, time_point);
let mut pending_clears = vec![];
let fields = leaf
.components
.entry(component_path.component_name)
.or_insert_with(|| {
pending_clears = leaf.nonrecursive_clears.clone().into_iter().collect_vec();
Default::default()
});
fields.add(time_point);
pending_clears
}
pub fn add_path_op(
&mut self,
row_id: RowId,
time_point: &TimePoint,
path_op: &PathOp,
) -> Vec<ComponentPath> {
use re_types::{archetypes::Clear, components::ClearIsRecursive, Archetype as _};
re_tracing::profile_function!();
let entity_path = path_op.entity_path();
let leaf = self.create_subtrees_recursively(entity_path.as_slice(), 0, time_point);
fn filter_out_clear_components(comp_name: &ComponentName) -> bool {
let is_clear_component = [
Clear::indicator().name(), ClearIsRecursive::name(), ]
.contains(comp_name);
!is_clear_component
}
match path_op {
PathOp::ClearComponents(entity_path) => {
leaf.nonrecursive_clears
.entry(row_id)
.or_insert_with(|| time_point.clone());
leaf.components
.keys()
.filter(|comp_name| filter_out_clear_components(comp_name))
.map(|component_name| ComponentPath::new(entity_path.clone(), *component_name))
.collect_vec()
}
PathOp::ClearRecursive(_) => {
let mut results = vec![];
let mut trees = vec![];
trees.push(leaf);
while let Some(next) = trees.pop() {
trees.extend(next.children.values_mut().collect::<Vec<&mut Self>>());
next.recursive_clears
.entry(row_id)
.or_insert_with(|| time_point.clone());
next.nonrecursive_clears
.entry(row_id)
.or_insert_with(|| time_point.clone());
results.extend(
next.components
.keys()
.filter(|comp_name| filter_out_clear_components(comp_name))
.map(|component_name| {
ComponentPath::new(next.path.clone(), *component_name)
}),
);
}
results
}
}
}
fn create_subtrees_recursively(
&mut self,
full_path: &[EntityPathPart],
depth: usize,
time_point: &TimePoint,
) -> &mut Self {
if time_point.is_timeless() {
self.num_timeless_messages += 1;
} else {
for (timeline, time_value) in time_point.iter() {
self.prefix_times
.0
.entry(*timeline)
.or_default()
.increment(time_value.as_i64(), 1);
}
}
match full_path.get(depth) {
None => {
self }
Some(component) => self
.children
.entry(component.clone())
.or_insert_with(|| {
EntityTree::new(full_path[..depth + 1].into(), self.recursive_clears.clone())
})
.create_subtrees_recursively(full_path, depth + 1, time_point),
}
}
pub fn subtree(&self, path: &EntityPath) -> Option<&Self> {
fn subtree_recursive<'tree>(
this: &'tree EntityTree,
path: &[EntityPathPart],
) -> Option<&'tree EntityTree> {
match path {
[] => Some(this),
[first, rest @ ..] => subtree_recursive(this.children.get(first)?, rest),
}
}
subtree_recursive(self, path.as_slice())
}
pub fn purge(
&mut self,
deleted: &re_arrow_store::Deleted,
deleted_by_us_and_children: &mut ActuallyDeleted,
) {
let Self {
path,
children,
prefix_times,
num_timeless_messages,
nonrecursive_clears,
recursive_clears,
components,
} = self;
{
re_tracing::profile_scope!("nonrecursive_clears");
nonrecursive_clears.retain(|row_id, _| !deleted.row_ids.contains(row_id));
}
{
re_tracing::profile_scope!("recursive_clears");
recursive_clears.retain(|row_id, _| !deleted.row_ids.contains(row_id));
}
let mut deleted_by_children = ActuallyDeleted::default();
for child in children.values_mut() {
child.purge(deleted, &mut deleted_by_children);
}
{
re_tracing::profile_scope!("ComponentStats");
for (comp_name, stats) in components {
let ComponentStats {
times,
num_timeless_messages,
} = stats;
for (timeline, histogram) in &mut times.0 {
if let Some(times) = deleted
.timeful
.get(&path.hash())
.and_then(|map| map.get(timeline))
.and_then(|map| map.get(comp_name))
{
for &time in times {
histogram.decrement(time.as_i64(), 1);
deleted_by_children
.timeful
.entry(*timeline)
.or_default()
.push(time);
}
}
}
if let Some(num_deleted) = deleted
.timeless
.get(&path.hash())
.and_then(|map| map.get(comp_name))
{
*num_timeless_messages =
num_timeless_messages.saturating_sub(*num_deleted as _);
deleted_by_children.timeless += num_deleted;
}
}
}
{
*num_timeless_messages =
num_timeless_messages.saturating_sub(deleted_by_us_and_children.timeless as _);
prefix_times.purge(&deleted_by_children);
}
deleted_by_us_and_children.append(deleted_by_children);
}
pub fn visit_children_recursively(&self, visitor: &mut impl FnMut(&EntityPath)) {
visitor(&self.path);
for child in self.children.values() {
child.visit_children_recursively(visitor);
}
}
}
#[derive(Default)]
pub struct ComponentStats {
pub times: TimeHistogramPerTimeline,
num_timeless_messages: usize,
}
impl ComponentStats {
pub fn num_timeless_messages(&self) -> usize {
self.num_timeless_messages
}
pub fn add(&mut self, time_point: &TimePoint) {
if time_point.is_timeless() {
self.num_timeless_messages += 1;
} else {
for (timeline, time_value) in time_point.iter() {
self.times
.0
.entry(*timeline)
.or_default()
.increment(time_value.as_i64(), 1);
}
}
}
}