re_data_store/
store_event.rs

1use nohash_hasher::IntMap;
2
3use re_log_types::{DataCell, EntityPath, RowId, StoreId, TimeInt, TimePoint, Timeline};
4use re_types_core::ComponentName;
5
6use crate::StoreGeneration;
7
8// Used all over in docstrings.
9#[allow(unused_imports)]
10use crate::{DataStore, StoreSubscriber};
11
12// ---
13
14/// The atomic unit of change in the Rerun [`DataStore`].
15///
16/// A [`StoreEvent`] describes the changes caused by the addition or deletion of a
17/// [`re_log_types::DataRow`] in the store.
18///
19/// Methods that mutate the [`DataStore`], such as [`DataStore::insert_row`] and [`DataStore::gc`],
20/// return [`StoreEvent`]s that describe the changes.
21/// You can also register your own [`StoreSubscriber`] in order to be notified of changes as soon as they
22/// happen.
23///
24/// Refer to field-level documentation for more details and check out [`StoreDiff`] for a precise
25/// definition of what an event involves.
26#[derive(Debug, Clone, PartialEq)]
27pub struct StoreEvent {
28    /// Which [`DataStore`] sent this event?
29    pub store_id: StoreId,
30
31    /// What was the store's generation when it sent that event?
32    pub store_generation: StoreGeneration,
33
34    /// Monotonically increasing ID of the event.
35    ///
36    /// This is on a per-store basis.
37    ///
38    /// When handling a [`StoreEvent`], if this is the first time you process this [`StoreId`] and
39    /// the associated `event_id` is not `1`, it means you registered late and missed some updates.
40    pub event_id: u64,
41
42    /// What actually changed?
43    ///
44    /// Refer to [`StoreDiff`] for more information.
45    pub diff: StoreDiff,
46}
47
48impl std::ops::Deref for StoreEvent {
49    type Target = StoreDiff;
50
51    #[inline]
52    fn deref(&self) -> &Self::Target {
53        &self.diff
54    }
55}
56
/// Is it an addition or a deletion?
///
/// Reminder: ⚠ Do not confuse _a deletion_ and _a clear_ ⚠.
///
/// A deletion is the result of a row being completely removed from the store as part of the
/// garbage collection process.
///
/// A clear, on the other hand, is the act of logging an empty [`re_types_core::ComponentBatch`],
/// either directly using the logging APIs, or indirectly through the use of a
/// [`re_types_core::archetypes::Clear`] archetype.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StoreDiffKind {
    /// A row was added to the store.
    Addition,

    /// A row was removed from the store.
    Deletion,
}

impl StoreDiffKind {
    /// The signed contribution of this kind of event to a running count of rows:
    /// `+1` for additions, `-1` for deletions.
    ///
    /// Summing the deltas of one addition and its matching deletion yields `0`.
    #[inline]
    pub fn delta(&self) -> i64 {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::Addition => 1,
            Self::Deletion => -1,
        }
    }
}
82
83/// Describes an atomic change in the Rerun [`DataStore`]: a row has been added or deleted.
84///
85/// From a query model standpoint, the [`DataStore`] _always_ operates one row at a time:
86/// - The contents of a row (i.e. its columns) are immutable past insertion, by virtue of
87///   [`RowId`]s being unique and non-reusable.
88/// - Similarly, garbage collection always removes _all the data_ associated with a row in one go:
89///   there cannot be orphaned columns. When a row is gone, all data associated with it is gone too.
90///
91/// Refer to field-level documentation for more information.
92#[derive(Debug, Clone, PartialEq)]
93pub struct StoreDiff {
94    /// Addition or deletion?
95    ///
96    /// The store's internals are opaque and don't necessarily reflect the query model (e.g. there
97    /// might be data in the store that cannot by reached by any query).
98    ///
99    /// A [`StoreDiff`] answers a logical question: "does there exist a query path which can return
100    /// data from that row?".
101    ///
102    /// An event of kind deletion only tells you that, from this point on, no query can return data from that row.
103    /// That doesn't necessarily mean that the data is actually gone, i.e. don't make assumptions of e.g. the size
104    /// in bytes of the store based on these events.
105    /// They are in "query-model space" and are not an accurate representation of what happens in storage space.
106    pub kind: StoreDiffKind,
107
108    /// What's the row's [`RowId`]?
109    ///
110    /// [`RowId`]s are guaranteed to be unique within a single [`DataStore`].
111    ///
112    /// Put another way, the same [`RowId`] can only appear twice in a [`StoreDiff`] event:
113    /// one addition and (optionally) one deletion (in that order!).
114    pub row_id: RowId,
115
116    /// The time data associated with that row.
117    ///
118    /// Since insertions and deletions both work on a row-level basis, this is guaranteed to be the
119    /// same value for both the insertion and deletion events (if any).
120    ///
121    /// This is not a [`TimePoint`] for performance reasons.
122    //
123    // NOTE: Empirical testing shows that a SmallVec isn't any better in the best case, and can be a
124    // significant performant drop at worst.
125    // pub times: SmallVec<[(Timeline, TimeInt); 5]>, // "5 timelines ought to be enough for anyone"
126    pub times: Vec<(Timeline, TimeInt)>,
127
128    /// The [`EntityPath`] associated with that row.
129    ///
130    /// Since insertions and deletions both work on a row-level basis, this is guaranteed to be the
131    /// same value for both the insertion and deletion events (if any).
132    pub entity_path: EntityPath,
133
134    /// All the [`DataCell`]s associated with that row.
135    ///
136    /// Since insertions and deletions both work on a row-level basis, this is guaranteed to be the
137    /// same set of values for both the insertion and deletion events (if any).
138    pub cells: IntMap<ComponentName, DataCell>,
139}
140
141impl StoreDiff {
142    #[inline]
143    pub fn addition(row_id: impl Into<RowId>, entity_path: impl Into<EntityPath>) -> Self {
144        Self {
145            kind: StoreDiffKind::Addition,
146            row_id: row_id.into(),
147            times: Default::default(),
148            entity_path: entity_path.into(),
149            cells: Default::default(),
150        }
151    }
152
153    #[inline]
154    pub fn deletion(row_id: impl Into<RowId>, entity_path: impl Into<EntityPath>) -> Self {
155        Self {
156            kind: StoreDiffKind::Deletion,
157            row_id: row_id.into(),
158            times: Default::default(),
159            entity_path: entity_path.into(),
160            cells: Default::default(),
161        }
162    }
163
164    #[inline]
165    pub fn at_timepoint(&mut self, timepoint: impl Into<TimePoint>) -> &mut Self {
166        self.times.extend(timepoint.into());
167        self
168    }
169
170    #[inline]
171    pub fn at_timestamp(
172        &mut self,
173        timeline: impl Into<Timeline>,
174        time: impl Into<TimeInt>,
175    ) -> &mut Self {
176        self.times.push((timeline.into(), time.into()));
177        self
178    }
179
180    #[inline]
181    pub fn with_cells(&mut self, cells: impl IntoIterator<Item = DataCell>) -> &mut Self {
182        self.cells
183            .extend(cells.into_iter().map(|cell| (cell.component_name(), cell)));
184        self
185    }
186
187    #[inline]
188    pub fn timepoint(&self) -> TimePoint {
189        self.times.clone().into_iter().collect()
190    }
191
192    #[inline]
193    pub fn is_static(&self) -> bool {
194        self.times.is_empty()
195    }
196
197    /// `-1` for deletions, `+1` for additions.
198    #[inline]
199    pub fn delta(&self) -> i64 {
200        self.kind.delta()
201    }
202
203    #[inline]
204    pub fn num_components(&self) -> usize {
205        self.cells.len()
206    }
207}
208
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use re_log_types::{
        example_components::{MyColor, MyIndex, MyPoint},
        DataRow, RowId, TimePoint, Timeline,
    };
    use re_types_core::Loggable as _;

    use crate::{DataStore, GarbageCollectionOptions};

    use super::*;

    /// A simple store subscriber for test purposes that keeps track of the quantity of data available
    /// in the store at the lowest level of detail.
    ///
    /// The counts represent numbers of rows: e.g. how many unique rows contain this entity path?
    #[derive(Default, Debug, PartialEq, Eq)]
    struct GlobalCounts {
        row_ids: BTreeMap<RowId, i64>,
        timelines: BTreeMap<Timeline, i64>,
        entity_paths: BTreeMap<EntityPath, i64>,
        component_names: BTreeMap<ComponentName, i64>,

        /// Keyed by time value only: equal values on different timelines share a counter.
        times: BTreeMap<TimeInt, i64>,

        /// Number of rows that carry no time data.
        num_static: i64,
    }

    impl GlobalCounts {
        /// Builds the expected state of a view from explicit `(key, count)` pairs.
        fn new(
            row_ids: impl IntoIterator<Item = (RowId, i64)>, //
            timelines: impl IntoIterator<Item = (Timeline, i64)>, //
            entity_paths: impl IntoIterator<Item = (EntityPath, i64)>, //
            component_names: impl IntoIterator<Item = (ComponentName, i64)>, //
            times: impl IntoIterator<Item = (TimeInt, i64)>, //
            num_static: i64,
        ) -> Self {
            Self {
                row_ids: row_ids.into_iter().collect(),
                timelines: timelines.into_iter().collect(),
                entity_paths: entity_paths.into_iter().collect(),
                component_names: component_names.into_iter().collect(),
                times: times.into_iter().collect(),
                num_static,
            }
        }

        /// Applies each event's delta (`+1` for additions, `-1` for deletions) to every counter
        /// the event touches.
        ///
        /// Counters are never removed: a key whose rows are all gone stays in the map at `0`.
        fn on_events(&mut self, events: &[StoreEvent]) {
            for event in events {
                let delta = event.delta();

                *self.row_ids.entry(event.row_id).or_default() += delta;
                *self
                    .entity_paths
                    .entry(event.entity_path.clone())
                    .or_default() += delta;

                for component_name in event.cells.keys() {
                    *self.component_names.entry(*component_name).or_default() += delta;
                }

                if event.is_static() {
                    self.num_static += delta;
                } else {
                    for &(timeline, time) in &event.times {
                        *self.timelines.entry(timeline).or_default() += delta;
                        *self.times.entry(time).or_default() += delta;
                    }
                }
            }
        }
    }

    #[test]
    fn store_events() -> anyhow::Result<()> {
        let mut store = DataStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );

        let mut view = GlobalCounts::default();

        let timeline_frame = Timeline::new_sequence("frame");
        let timeline_other = Timeline::new_temporal("other");
        let timeline_yet_another = Timeline::new_sequence("yet_another");

        // Row 1: temporal, on all three timelines, a single `MyIndex` component.
        let row_id1 = RowId::new();
        let timepoint1 = TimePoint::from_iter([
            (timeline_frame, 42),      //
            (timeline_other, 666),     //
            (timeline_yet_another, 1), //
        ]);
        let entity_path1: EntityPath = "entity_a".into();
        let row1 = DataRow::from_component_batches(
            row_id1,
            timepoint1,
            entity_path1.clone(),
            [&MyIndex::from_iter(0..10) as _],
        )?;

        view.on_events(&[store.insert_row(&row1)?]);

        similar_asserts::assert_eq!(
            GlobalCounts::new(
                [
                    (row_id1, 1), //
                ],
                [
                    (timeline_frame, 1),
                    (timeline_other, 1),
                    (timeline_yet_another, 1),
                ],
                [
                    (entity_path1.clone(), 1), //
                ],
                [
                    (MyIndex::name(), 1), //
                ],
                [
                    (42.try_into().unwrap(), 1), //
                    (666.try_into().unwrap(), 1),
                    (1.try_into().unwrap(), 1),
                ],
                0,
            ),
            view,
        );

        // Row 2: temporal, on two of the timelines, points and colors on a second entity.
        let row_id2 = RowId::new();
        let timepoint2 = TimePoint::from_iter([
            (timeline_frame, 42),      //
            (timeline_yet_another, 1), //
        ]);
        let entity_path2: EntityPath = "entity_b".into();
        let row2 = {
            let num_instances = 3;
            let points: Vec<_> = (0..num_instances)
                .map(|i| MyPoint::new(0.0, i as f32))
                .collect();
            let colors = vec![MyColor::from(0xFF0000FF)];
            DataRow::from_component_batches(
                row_id2,
                timepoint2,
                entity_path2.clone(),
                [&points as _, &colors as _],
            )?
        };

        view.on_events(&[store.insert_row(&row2)?]);

        similar_asserts::assert_eq!(
            GlobalCounts::new(
                [
                    (row_id1, 1), //
                    (row_id2, 1),
                ],
                [
                    (timeline_frame, 2),
                    (timeline_other, 1),
                    (timeline_yet_another, 2),
                ],
                [
                    (entity_path1.clone(), 1), //
                    (entity_path2.clone(), 1), //
                ],
                [
                    (MyIndex::name(), 1), // only in row 1 -- row 2 doesn't change it
                    (MyPoint::name(), 1), //
                    (MyColor::name(), 1), //
                ],
                [
                    (42.try_into().unwrap(), 2), //
                    (666.try_into().unwrap(), 1),
                    (1.try_into().unwrap(), 2),
                ],
                0,
            ),
            view,
        );

        // Row 3: static (empty timepoint), indices and colors on the second entity.
        let row_id3 = RowId::new();
        let timepoint3 = TimePoint::default();
        let row3 = {
            let num_instances = 6;
            let colors = vec![MyColor::from(0x00DD00FF); num_instances];
            DataRow::from_component_batches(
                row_id3,
                timepoint3,
                entity_path2.clone(),
                [
                    &MyIndex::from_iter(0..num_instances as _) as _,
                    &colors as _,
                ],
            )?
        };

        view.on_events(&[store.insert_row(&row3)?]);

        similar_asserts::assert_eq!(
            GlobalCounts::new(
                [
                    (row_id1, 1), //
                    (row_id2, 1),
                    (row_id3, 1),
                ],
                [
                    (timeline_frame, 2),
                    (timeline_other, 1),
                    (timeline_yet_another, 2),
                ],
                [
                    (entity_path1.clone(), 1), //
                    (entity_path2.clone(), 2), //
                ],
                [
                    (MyIndex::name(), 2), //
                    (MyPoint::name(), 1), //
                    (MyColor::name(), 2), //
                ],
                [
                    (42.try_into().unwrap(), 2), //
                    (666.try_into().unwrap(), 1),
                    (1.try_into().unwrap(), 2),
                ],
                1,
            ),
            view,
        );

        // Garbage collect everything: both temporal rows go away, the static row stays.
        let events = store.gc(&GarbageCollectionOptions::gc_everything()).0;
        view.on_events(&events);

        similar_asserts::assert_eq!(
            GlobalCounts::new(
                [
                    (row_id1, 0), //
                    (row_id2, 0),
                    (row_id3, 1), // static -- no gc
                ],
                [
                    (timeline_frame, 0),
                    (timeline_other, 0),
                    (timeline_yet_another, 0),
                ],
                [
                    (entity_path1.clone(), 0), //
                    (entity_path2.clone(), 1), // static -- no gc
                ],
                [
                    (MyIndex::name(), 1), // static -- no gc
                    (MyPoint::name(), 0), //
                    (MyColor::name(), 1), // static -- no gc
                ],
                [
                    (42.try_into().unwrap(), 0), //
                    (666.try_into().unwrap(), 0),
                    (1.try_into().unwrap(), 0),
                ],
                1, // static -- no gc
            ),
            view,
        );

        Ok(())
    }
}