use nohash_hasher::IntMap;
use re_log_types::{DataCell, EntityPath, RowId, StoreId, TimeInt, TimePoint, Timeline};
use re_types_core::ComponentName;
use crate::StoreGeneration;
#[allow(unused_imports)]
use crate::{DataStore, StoreSubscriber};
/// A single change notification for a store: a [`StoreDiff`] plus identifying metadata.
///
/// In this file's tests, `DataStore::insert_row` returns one `StoreEvent` and `DataStore::gc`
/// returns a collection of them.
///
/// A `StoreEvent` dereferences to its inner [`StoreDiff`], so the diff's fields and methods
/// (e.g. `row_id`, `delta()`) can be reached directly on the event.
#[derive(Debug, Clone, PartialEq)]
pub struct StoreEvent {
/// Identifier of the store this event relates to.
///
/// NOTE(review): presumably the store that emitted the event -- confirm against the emitter.
pub store_id: StoreId,
/// Generation marker of the store associated with this event.
///
/// NOTE(review): presumably captured at emission time -- confirm against the emitter.
pub store_generation: StoreGeneration,
/// Numeric identifier of this event.
///
/// NOTE(review): uniqueness/ordering guarantees are not visible from this file.
pub event_id: u64,
/// The change being reported.
pub diff: StoreDiff,
}
/// Gives read access to the wrapped [`StoreDiff`] straight from a [`StoreEvent`], so that
/// `event.row_id`, `event.delta()`, etc. resolve to the inner diff.
impl std::ops::Deref for StoreEvent {
    type Target = StoreDiff;

    fn deref(&self) -> &StoreDiff {
        // Only the diff is exposed; the event metadata stays on the wrapper.
        let Self { diff, .. } = self;
        diff
    }
}
/// Tells whether a diff adds data or removes data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StoreDiffKind {
    /// Data was added.
    Addition,
    /// Data was removed.
    Deletion,
}

impl StoreDiffKind {
    /// Signed unit contribution of this kind: `+1` for an addition, `-1` for a deletion.
    ///
    /// Summing the deltas of a stream of diffs therefore yields a net count.
    #[inline]
    pub fn delta(&self) -> i64 {
        match *self {
            Self::Deletion => -1,
            Self::Addition => 1,
        }
    }
}
/// Describes one addition to, or deletion from, a store: which row, which entity, at which
/// timepoint, and which cells.
///
/// Built with [`StoreDiff::addition`] / [`StoreDiff::deletion`] and then refined with the
/// `at_*` / `with_cells` builder methods.
#[derive(Debug, Clone, PartialEq)]
pub struct StoreDiff {
/// Whether this diff is a [`StoreDiffKind::Addition`] or a [`StoreDiffKind::Deletion`].
pub kind: StoreDiffKind,
/// Identifier of the row this diff is about.
pub row_id: RowId,
/// Timeline/time pairs attached to this diff.
///
/// Starts out as `TimePoint::timeless()` when built through `addition`/`deletion`.
pub timepoint: TimePoint,
/// Entity path this diff is about.
pub entity_path: EntityPath,
/// Cells carried by this diff, keyed by component name.
pub cells: IntMap<ComponentName, DataCell>,
}
impl StoreDiff {
#[inline]
pub fn addition(row_id: impl Into<RowId>, entity_path: impl Into<EntityPath>) -> Self {
Self {
kind: StoreDiffKind::Addition,
row_id: row_id.into(),
timepoint: TimePoint::timeless(),
entity_path: entity_path.into(),
cells: Default::default(),
}
}
#[inline]
pub fn deletion(row_id: impl Into<RowId>, entity_path: impl Into<EntityPath>) -> Self {
Self {
kind: StoreDiffKind::Deletion,
row_id: row_id.into(),
timepoint: TimePoint::timeless(),
entity_path: entity_path.into(),
cells: Default::default(),
}
}
#[inline]
pub fn at_timepoint(mut self, timepoint: impl Into<TimePoint>) -> StoreDiff {
self.timepoint = self.timepoint.union_max(&timepoint.into());
self
}
#[inline]
pub fn at_timestamp(
mut self,
timeline: impl Into<Timeline>,
time: impl Into<TimeInt>,
) -> StoreDiff {
self.timepoint.insert(timeline.into(), time.into());
self
}
#[inline]
pub fn with_cells(mut self, cells: impl IntoIterator<Item = DataCell>) -> Self {
self.cells
.extend(cells.into_iter().map(|cell| (cell.component_name(), cell)));
self
}
#[inline]
pub fn union(&self, rhs: &Self) -> Option<Self> {
let Self {
kind: lhs_kind,
row_id: lhs_row_id,
timepoint: lhs_timepoint,
entity_path: lhs_entity_path,
cells: lhs_cells,
} = self;
let Self {
kind: rhs_kind,
row_id: rhs_row_id,
timepoint: rhs_timepoint,
entity_path: rhs_entity_path,
cells: rhs_cells,
} = rhs;
let same_kind = lhs_kind == rhs_kind;
let same_row_id = lhs_row_id == rhs_row_id;
let same_entity_path = lhs_entity_path == rhs_entity_path;
(same_kind && same_row_id && same_entity_path).then(|| Self {
kind: *lhs_kind,
row_id: *lhs_row_id,
timepoint: lhs_timepoint.clone().union_max(rhs_timepoint),
entity_path: lhs_entity_path.clone(),
cells: [lhs_cells.values(), rhs_cells.values()]
.into_iter()
.flatten()
.map(|cell| (cell.component_name(), cell.clone()))
.collect(),
})
}
#[inline]
pub fn is_timeless(&self) -> bool {
self.timepoint.is_timeless()
}
#[inline]
pub fn delta(&self) -> i64 {
self.kind.delta()
}
#[inline]
pub fn num_components(&self) -> usize {
self.cells.len()
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use re_log_types::{
example_components::{MyColor, MyPoint},
DataRow, RowId, TimePoint, Timeline,
};
use re_types_core::{components::InstanceKey, Loggable as _};
use crate::{DataStore, GarbageCollectionOptions};
use super::*;
/// Test-only aggregate of signed counters, fed from [`StoreEvent`]s through `on_events`.
///
/// Every counter is incremented by an event's `delta()`, so additions and deletions of the
/// same data cancel out.
#[derive(Default, Debug, PartialEq, Eq)]
struct GlobalCounts {
/// Net count per row id.
row_ids: BTreeMap<RowId, i64>,
/// Net count per timeline (only updated by non-timeless events).
timelines: BTreeMap<Timeline, i64>,
/// Net count per entity path.
entity_paths: BTreeMap<EntityPath, i64>,
/// Net count per component name found in the events' cells.
component_names: BTreeMap<ComponentName, i64>,
/// Net count per time value, regardless of which timeline it came from
/// (only updated by non-timeless events).
times: BTreeMap<TimeInt, i64>,
/// Net count of timeless events.
timeless: i64,
}
impl GlobalCounts {
fn new(
row_ids: impl IntoIterator<Item = (RowId, i64)>, timelines: impl IntoIterator<Item = (Timeline, i64)>, entity_paths: impl IntoIterator<Item = (EntityPath, i64)>, component_names: impl IntoIterator<Item = (ComponentName, i64)>, times: impl IntoIterator<Item = (TimeInt, i64)>, timeless: i64,
) -> Self {
Self {
row_ids: row_ids.into_iter().collect(),
timelines: timelines.into_iter().collect(),
entity_paths: entity_paths.into_iter().collect(),
component_names: component_names.into_iter().collect(),
times: times.into_iter().collect(),
timeless,
}
}
}
impl GlobalCounts {
    /// Folds a batch of events into the counters.
    ///
    /// Each event contributes its `delta()` (`+1` / `-1`) to its row id, its entity path and
    /// every component name in its cells. A timeless event additionally bumps `timeless`;
    /// any other event bumps one `timelines` and one `times` entry per pair in its timepoint.
    fn on_events(&mut self, events: &[StoreEvent]) {
        for event in events {
            let diff = &event.diff;
            let sign = diff.delta();

            *self.row_ids.entry(diff.row_id).or_insert(0) += sign;
            *self
                .entity_paths
                .entry(diff.entity_path.clone())
                .or_insert(0) += sign;

            for &name in diff.cells.keys() {
                *self.component_names.entry(name).or_insert(0) += sign;
            }

            // Timeless events have no per-timeline bookkeeping.
            if diff.is_timeless() {
                self.timeless += sign;
                continue;
            }

            for (timeline, time) in &diff.timepoint {
                *self.timelines.entry(*timeline).or_insert(0) += sign;
                *self.times.entry(*time).or_insert(0) += sign;
            }
        }
    }
}
// Inserts three rows into a fresh store, then garbage-collects everything, feeding every
// emitted event into a `GlobalCounts` and comparing it against hand-written expectations
// after each step.
#[test]
fn store_events() -> anyhow::Result<()> {
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let mut view = GlobalCounts::default();
let timeline_frame = Timeline::new_sequence("frame");
let timeline_other = Timeline::new_temporal("other");
let timeline_yet_another = Timeline::new_sequence("yet_another");
// Step 1: `entity_a`, three timelines, a single `InstanceKey` batch.
let row_id1 = RowId::random();
let timepoint1 = TimePoint::from_iter([
(timeline_frame, 42.into()), (timeline_other, 666.into()), (timeline_yet_another, 1.into()), ]);
let entity_path1: EntityPath = "entity_a".into();
let row1 = DataRow::from_component_batches(
row_id1,
timepoint1.clone(),
entity_path1.clone(),
[&InstanceKey::from_iter(0..10) as _],
)?;
view.on_events(&[store.insert_row(&row1)?]);
// Everything touched by row1 is expected exactly once; nothing timeless yet.
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 1), ],
[
(timeline_frame, 1),
(timeline_other, 1),
(timeline_yet_another, 1),
],
[
(entity_path1.clone(), 1), ],
[
(InstanceKey::name(), 1), ],
[
(42.into(), 1), (666.into(), 1),
(1.into(), 1),
],
0,
),
view,
);
// Step 2: `entity_b`, two of the three timelines (same time values as row1), with
// points and colors but no explicit `InstanceKey` batch.
let row_id2 = RowId::random();
let timepoint2 = TimePoint::from_iter([
(timeline_frame, 42.into()), (timeline_yet_another, 1.into()), ]);
let entity_path2: EntityPath = "entity_b".into();
let row2 = {
let num_instances = 3;
let points: Vec<_> = (0..num_instances)
.map(|i| MyPoint::new(0.0, i as f32))
.collect();
let colors = vec![MyColor::from(0xFF0000FF)];
DataRow::from_component_batches(
row_id2,
timepoint2.clone(),
entity_path2.clone(),
[&points as _, &colors as _],
)?
};
view.on_events(&[store.insert_row(&row2)?]);
// The shared timelines/times go to 2; the `InstanceKey` count is expected to stay at 1
// since row2 did not log that component.
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 1), (row_id2, 1),
],
[
(timeline_frame, 2),
(timeline_other, 1),
(timeline_yet_another, 2),
],
[
(entity_path1.clone(), 1), (entity_path2.clone(), 1), ],
[
(InstanceKey::name(), 1), (MyPoint::name(), 1), (MyColor::name(), 1), ],
[
(42.into(), 2), (666.into(), 1),
(1.into(), 2),
],
0,
),
view,
);
// Step 3: a timeless row on `entity_b` with explicit `InstanceKey` and colors.
let row_id3 = RowId::random();
let timepoint3 = TimePoint::timeless();
let row3 = {
let num_instances = 6;
let colors = vec![MyColor::from(0x00DD00FF); num_instances];
DataRow::from_component_batches(
row_id3,
timepoint3.clone(),
entity_path2.clone(),
[
&InstanceKey::from_iter(0..num_instances as _) as _,
&colors as _,
],
)?
};
view.on_events(&[store.insert_row(&row3)?]);
// Timelines and times are unchanged (the row is timeless); the timeless counter,
// `entity_b`, `InstanceKey` and `MyColor` each gain one.
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 1), (row_id2, 1),
(row_id3, 1),
],
[
(timeline_frame, 2),
(timeline_other, 1),
(timeline_yet_another, 2),
],
[
(entity_path1.clone(), 1), (entity_path2.clone(), 2), ],
[
(InstanceKey::name(), 2), (MyPoint::name(), 1), (MyColor::name(), 2), ],
[
(42.into(), 2), (666.into(), 1),
(1.into(), 2),
],
1,
),
view,
);
// Step 4: garbage-collect everything. The deletion events are expected to cancel out
// every addition, leaving all keys present but at zero.
view.on_events(&store.gc(&GarbageCollectionOptions::gc_everything()).0);
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 0), (row_id2, 0),
(row_id3, 0),
],
[
(timeline_frame, 0),
(timeline_other, 0),
(timeline_yet_another, 0),
],
[
(entity_path1.clone(), 0), (entity_path2.clone(), 0), ],
[
(InstanceKey::name(), 0), (MyPoint::name(), 0), (MyColor::name(), 0), ],
[
(42.into(), 0), (666.into(), 0),
(1.into(), 0),
],
0,
),
view,
);
Ok(())
}
/// Checks when the store's cluster key shows up in the cells of emitted events, both on
/// insertion and on garbage collection.
///
/// A row that logs the cluster-key component explicitly must expose it in its events; a row
/// that does not log it must not (the test name suggests the store autogenerates one
/// internally in that case -- either way it is expected to stay out of the events).
#[test]
fn autogenerated_cluster_keys() -> anyhow::Result<()> {
    let mut store = DataStore::new(
        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
        InstanceKey::name(),
        Default::default(),
    );

    let timeline_frame = Timeline::new_sequence("frame");

    // Row with an explicit `InstanceKey` batch.
    let row1 = DataRow::from_component_batches(
        RowId::random(),
        TimePoint::from_iter([(timeline_frame, 42.into())]),
        "entity_a".into(),
        [&InstanceKey::from_iter(0..10) as _],
    )?;

    {
        let event = store.insert_row(&row1)?;
        assert!(event.cells.contains_key(&store.cluster_key()));

        let (events, _) = store.gc(&GarbageCollectionOptions::gc_everything());
        // `assert_eq!` so that a failure reports the actual number of events.
        assert_eq!(events.len(), 1);
        assert!(events[0].cells.contains_key(&store.cluster_key()));
    }

    // Row with only a color batch: no explicit cluster-key component.
    let row2 = DataRow::from_component_batches(
        RowId::random(),
        TimePoint::from_iter([(timeline_frame, 42.into())]),
        "entity_b".into(),
        [&[MyColor::from(0xAABBCCDD)] as _],
    )?;

    {
        let event = store.insert_row(&row2)?;
        assert!(!event.cells.contains_key(&store.cluster_key()));

        // The previous GC already removed row1, so exactly one event is expected here too.
        let (events, _) = store.gc(&GarbageCollectionOptions::gc_everything());
        assert_eq!(events.len(), 1);
        assert!(!events[0].cells.contains_key(&store.cluster_key()));
    }

    Ok(())
}
}