re_data_store/store_event.rs
use nohash_hasher::IntMap;

use re_log_types::{DataCell, EntityPath, RowId, StoreId, TimeInt, TimePoint, Timeline};
use re_types_core::ComponentName;

use crate::StoreGeneration;

// Used all over in docstrings.
#[allow(unused_imports)]
use crate::{DataStore, StoreSubscriber};

// ---

/// The atomic unit of change in the Rerun [`DataStore`].
///
/// A [`StoreEvent`] describes the changes caused by the addition or deletion of a
/// [`re_log_types::DataRow`] in the store.
///
/// Methods that mutate the [`DataStore`], such as [`DataStore::insert_row`] and [`DataStore::gc`],
/// return [`StoreEvent`]s that describe the changes.
/// You can also register your own [`StoreSubscriber`] in order to be notified of changes as soon
/// as they happen.
///
/// Refer to field-level documentation for more details and check out [`StoreDiff`] for a precise
/// definition of what an event involves.
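///
/// A minimal sketch of reacting to a batch of events (illustrative only; `handle_events` is a
/// hypothetical callback, everything else comes from the types defined in this file):
///
/// ```ignore
/// fn handle_events(events: &[StoreEvent]) {
///     for event in events {
///         match event.kind {
///             StoreDiffKind::Addition => {
///                 println!("row {} added to {}", event.row_id, event.entity_path);
///             }
///             StoreDiffKind::Deletion => {
///                 println!("row {} removed from {}", event.row_id, event.entity_path);
///             }
///         }
///     }
/// }
/// ```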
#[derive(Debug, Clone, PartialEq)]
pub struct StoreEvent {
    /// Which [`DataStore`] sent this event?
    pub store_id: StoreId,

    /// What was the store's generation when it sent that event?
    pub store_generation: StoreGeneration,

    /// Monotonically increasing ID of the event.
    ///
    /// This is on a per-store basis.
    ///
    /// When handling a [`StoreEvent`], if this is the first time you process this [`StoreId`] and
    /// the associated `event_id` is not `1`, it means you registered late and missed some updates.
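    ///
    /// A minimal sketch of detecting a late registration (illustrative only; `seen_stores` is an
    /// assumed `HashSet<StoreId>` kept by the subscriber):
    ///
    /// ```ignore
    /// if seen_stores.insert(event.store_id.clone()) && event.event_id != 1 {
    ///     eprintln!("registered late: missed {} events", event.event_id - 1);
    /// }
    /// ```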
    pub event_id: u64,

    /// What actually changed?
    ///
    /// Refer to [`StoreDiff`] for more information.
    pub diff: StoreDiff,
}

impl std::ops::Deref for StoreEvent {
    type Target = StoreDiff;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.diff
    }
}

/// Is it an addition or a deletion?
///
/// Reminder: ⚠ Do not confuse _a deletion_ and _a clear_ ⚠.
///
/// A deletion is the result of a row being completely removed from the store as part of the
/// garbage collection process.
///
/// A clear, on the other hand, is the act of logging an empty [`re_types_core::ComponentBatch`],
/// either directly using the logging APIs, or indirectly through the use of a
/// [`re_types_core::archetypes::Clear`] archetype.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StoreDiffKind {
    Addition,
    Deletion,
}

impl StoreDiffKind {
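    /// Returns `+1` for additions and `-1` for deletions.
    ///
    /// A minimal sketch of accumulating deltas into a running row count (illustrative only;
    /// `events` and `row_count` are assumed locals):
    ///
    /// ```ignore
    /// let mut row_count: i64 = 0;
    /// for event in &events {
    ///     row_count += event.kind.delta();
    /// }
    /// ```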
    #[inline]
    pub fn delta(&self) -> i64 {
        match self {
            Self::Addition => 1,
            Self::Deletion => -1,
        }
    }
}

/// Describes an atomic change in the Rerun [`DataStore`]: a row has been added or deleted.
///
/// From a query model standpoint, the [`DataStore`] _always_ operates one row at a time:
/// - The contents of a row (i.e. its columns) are immutable past insertion, by virtue of
///   [`RowId`]s being unique and non-reusable.
/// - Similarly, garbage collection always removes _all the data_ associated with a row in one go:
///   there cannot be orphaned columns. When a row is gone, all data associated with it is gone too.
///
/// Refer to field-level documentation for more information.
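///
/// A minimal sketch of building a [`StoreDiff`] with the helpers below (illustrative only; the
/// entity path and time value are made up):
///
/// ```ignore
/// let mut diff = StoreDiff::addition(RowId::new(), "some/entity");
/// let time: TimeInt = 42.try_into().unwrap();
/// diff.at_timestamp(Timeline::new_sequence("frame"), time);
/// assert!(!diff.is_static());
/// assert_eq!(diff.times.len(), 1);
/// ```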
#[derive(Debug, Clone, PartialEq)]
pub struct StoreDiff {
    /// Addition or deletion?
    ///
    /// The store's internals are opaque and don't necessarily reflect the query model (e.g. there
    /// might be data in the store that cannot be reached by any query).
    ///
    /// A [`StoreDiff`] answers a logical question: "does there exist a query path which can return
    /// data from that row?".
    ///
    /// An event of kind [`StoreDiffKind::Deletion`] only tells you that, from this point on, no
    /// query can return data from that row.
    /// That doesn't necessarily mean that the data is actually gone, i.e. don't make assumptions
    /// about e.g. the size in bytes of the store based on these events.
    /// They are in "query-model space" and are not an accurate representation of what happens in
    /// storage space.
    pub kind: StoreDiffKind,

    /// What's the row's [`RowId`]?
    ///
    /// [`RowId`]s are guaranteed to be unique within a single [`DataStore`].
    ///
    /// Put another way, a given [`RowId`] can appear in at most two [`StoreDiff`] events:
    /// one addition and (optionally) one deletion (in that order!).
    pub row_id: RowId,

    /// The time data associated with that row.
    ///
    /// Since insertions and deletions both work on a row-level basis, this is guaranteed to be the
    /// same value for both the insertion and deletion events (if any).
    ///
    /// This is not a [`TimePoint`] for performance reasons.
    //
    // NOTE: Empirical testing shows that a SmallVec isn't any better in the best case, and can be a
    // significant performance drop at worst.
    // pub times: SmallVec<[(Timeline, TimeInt); 5]>, // "5 timelines ought to be enough for anyone"
    pub times: Vec<(Timeline, TimeInt)>,

    /// The [`EntityPath`] associated with that row.
    ///
    /// Since insertions and deletions both work on a row-level basis, this is guaranteed to be the
    /// same value for both the insertion and deletion events (if any).
    pub entity_path: EntityPath,

    /// All the [`DataCell`]s associated with that row.
    ///
    /// Since insertions and deletions both work on a row-level basis, this is guaranteed to be the
    /// same set of values for both the insertion and deletion events (if any).
    pub cells: IntMap<ComponentName, DataCell>,
}

impl StoreDiff {
    /// Creates a new addition diff for the given row and entity path, with no times or cells yet.
    #[inline]
    pub fn addition(row_id: impl Into<RowId>, entity_path: impl Into<EntityPath>) -> Self {
        Self {
            kind: StoreDiffKind::Addition,
            row_id: row_id.into(),
            times: Default::default(),
            entity_path: entity_path.into(),
            cells: Default::default(),
        }
    }

    /// Creates a new deletion diff for the given row and entity path, with no times or cells yet.
    #[inline]
    pub fn deletion(row_id: impl Into<RowId>, entity_path: impl Into<EntityPath>) -> Self {
        Self {
            kind: StoreDiffKind::Deletion,
            row_id: row_id.into(),
            times: Default::default(),
            entity_path: entity_path.into(),
            cells: Default::default(),
        }
    }

    /// Adds all the (timeline, time) pairs of the given [`TimePoint`] to this diff.
    #[inline]
    pub fn at_timepoint(&mut self, timepoint: impl Into<TimePoint>) -> &mut Self {
        self.times.extend(timepoint.into());
        self
    }

    /// Adds a single (timeline, time) pair to this diff.
    #[inline]
    pub fn at_timestamp(
        &mut self,
        timeline: impl Into<Timeline>,
        time: impl Into<TimeInt>,
    ) -> &mut Self {
        self.times.push((timeline.into(), time.into()));
        self
    }

    /// Adds the given [`DataCell`]s to this diff, keyed by their component name.
    #[inline]
    pub fn with_cells(&mut self, cells: impl IntoIterator<Item = DataCell>) -> &mut Self {
        self.cells
            .extend(cells.into_iter().map(|cell| (cell.component_name(), cell)));
        self
    }

    /// Collects the diff's (timeline, time) pairs into a [`TimePoint`].
    #[inline]
    pub fn timepoint(&self) -> TimePoint {
        self.times.clone().into_iter().collect()
    }

    /// Returns `true` if the diff carries no time data at all, i.e. it is static.
    #[inline]
    pub fn is_static(&self) -> bool {
        self.times.is_empty()
    }

    /// `-1` for deletions, `+1` for additions.
    #[inline]
    pub fn delta(&self) -> i64 {
        self.kind.delta()
    }

    /// Number of components (i.e. [`DataCell`]s) in this diff.
    #[inline]
    pub fn num_components(&self) -> usize {
        self.cells.len()
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use re_log_types::{
        example_components::{MyColor, MyIndex, MyPoint},
        DataRow, RowId, TimePoint, Timeline,
    };
    use re_types_core::Loggable as _;

    use crate::{DataStore, GarbageCollectionOptions};

    use super::*;

    /// A simple store subscriber for test purposes that keeps track of the quantity of data
    /// available in the store at the lowest level of detail.
    ///
    /// The counts represent numbers of rows: e.g. how many unique rows contain this entity path?
    #[derive(Default, Debug, PartialEq, Eq)]
    struct GlobalCounts {
        row_ids: BTreeMap<RowId, i64>,
        timelines: BTreeMap<Timeline, i64>,
        entity_paths: BTreeMap<EntityPath, i64>,
        component_names: BTreeMap<ComponentName, i64>,
        times: BTreeMap<TimeInt, i64>,
        num_static: i64,
    }

    impl GlobalCounts {
        fn new(
            row_ids: impl IntoIterator<Item = (RowId, i64)>, //
            timelines: impl IntoIterator<Item = (Timeline, i64)>, //
            entity_paths: impl IntoIterator<Item = (EntityPath, i64)>, //
            component_names: impl IntoIterator<Item = (ComponentName, i64)>, //
            times: impl IntoIterator<Item = (TimeInt, i64)>, //
            num_static: i64,
        ) -> Self {
            Self {
                row_ids: row_ids.into_iter().collect(),
                timelines: timelines.into_iter().collect(),
                entity_paths: entity_paths.into_iter().collect(),
                component_names: component_names.into_iter().collect(),
                times: times.into_iter().collect(),
                num_static,
            }
        }
    }

    impl GlobalCounts {
        fn on_events(&mut self, events: &[StoreEvent]) {
            for event in events {
                let delta = event.delta();

                *self.row_ids.entry(event.row_id).or_default() += delta;
                *self
                    .entity_paths
                    .entry(event.entity_path.clone())
                    .or_default() += delta;

                for component_name in event.cells.keys() {
                    *self.component_names.entry(*component_name).or_default() += delta;
                }

                if event.is_static() {
                    self.num_static += delta;
                } else {
                    for &(timeline, time) in &event.times {
                        *self.timelines.entry(timeline).or_default() += delta;
                        *self.times.entry(time).or_default() += delta;
                    }
                }
            }
        }
    }

    #[test]
    fn store_events() -> anyhow::Result<()> {
        let mut store = DataStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );

        let mut view = GlobalCounts::default();

        let timeline_frame = Timeline::new_sequence("frame");
        let timeline_other = Timeline::new_temporal("other");
        let timeline_yet_another = Timeline::new_sequence("yet_another");

        let row_id1 = RowId::new();
        let timepoint1 = TimePoint::from_iter([
            (timeline_frame, 42),      //
            (timeline_other, 666),     //
            (timeline_yet_another, 1), //
        ]);
        let entity_path1: EntityPath = "entity_a".into();
        let row1 = DataRow::from_component_batches(
            row_id1,
            timepoint1.clone(),
            entity_path1.clone(),
            [&MyIndex::from_iter(0..10) as _],
        )?;

        view.on_events(&[store.insert_row(&row1)?]);

        similar_asserts::assert_eq!(
            GlobalCounts::new(
                [
                    (row_id1, 1), //
                ],
                [
                    (timeline_frame, 1),
                    (timeline_other, 1),
                    (timeline_yet_another, 1),
                ],
                [
                    (entity_path1.clone(), 1), //
                ],
                [
                    (MyIndex::name(), 1), //
                ],
                [
                    (42.try_into().unwrap(), 1), //
                    (666.try_into().unwrap(), 1),
                    (1.try_into().unwrap(), 1),
                ],
                0,
            ),
            view,
        );

        let row_id2 = RowId::new();
        let timepoint2 = TimePoint::from_iter([
            (timeline_frame, 42),      //
            (timeline_yet_another, 1), //
        ]);
        let entity_path2: EntityPath = "entity_b".into();
        let row2 = {
            let num_instances = 3;
            let points: Vec<_> = (0..num_instances)
                .map(|i| MyPoint::new(0.0, i as f32))
                .collect();
            let colors = vec![MyColor::from(0xFF0000FF)];
            DataRow::from_component_batches(
                row_id2,
                timepoint2.clone(),
                entity_path2.clone(),
                [&points as _, &colors as _],
            )?
        };

        view.on_events(&[store.insert_row(&row2)?]);

        similar_asserts::assert_eq!(
            GlobalCounts::new(
                [
                    (row_id1, 1), //
                    (row_id2, 1),
                ],
                [
                    (timeline_frame, 2),
                    (timeline_other, 1),
                    (timeline_yet_another, 2),
                ],
                [
                    (entity_path1.clone(), 1), //
                    (entity_path2.clone(), 1), //
                ],
                [
                    (MyIndex::name(), 1), // autogenerated, doesn't change
                    (MyPoint::name(), 1), //
                    (MyColor::name(), 1), //
                ],
                [
                    (42.try_into().unwrap(), 2), //
                    (666.try_into().unwrap(), 1),
                    (1.try_into().unwrap(), 2),
                ],
                0,
            ),
            view,
        );

        let row_id3 = RowId::new();
        let timepoint3 = TimePoint::default();
        let row3 = {
            let num_instances = 6;
            let colors = vec![MyColor::from(0x00DD00FF); num_instances];
            DataRow::from_component_batches(
                row_id3,
                timepoint3.clone(),
                entity_path2.clone(),
                [
                    &MyIndex::from_iter(0..num_instances as _) as _,
                    &colors as _,
                ],
            )?
        };

        view.on_events(&[store.insert_row(&row3)?]);

        similar_asserts::assert_eq!(
            GlobalCounts::new(
                [
                    (row_id1, 1), //
                    (row_id2, 1),
                    (row_id3, 1),
                ],
                [
                    (timeline_frame, 2),
                    (timeline_other, 1),
                    (timeline_yet_another, 2),
                ],
                [
                    (entity_path1.clone(), 1), //
                    (entity_path2.clone(), 2), //
                ],
                [
                    (MyIndex::name(), 2), //
                    (MyPoint::name(), 1), //
                    (MyColor::name(), 2), //
                ],
                [
                    (42.try_into().unwrap(), 2), //
                    (666.try_into().unwrap(), 1),
                    (1.try_into().unwrap(), 2),
                ],
                1,
            ),
            view,
        );

        let events = store.gc(&GarbageCollectionOptions::gc_everything()).0;
        view.on_events(&events);

        similar_asserts::assert_eq!(
            GlobalCounts::new(
                [
                    (row_id1, 0), //
                    (row_id2, 0),
                    (row_id3, 1), // static -- no gc
                ],
                [
                    (timeline_frame, 0),
                    (timeline_other, 0),
                    (timeline_yet_another, 0),
                ],
                [
                    (entity_path1.clone(), 0), //
                    (entity_path2.clone(), 1), // static -- no gc
                ],
                [
                    (MyIndex::name(), 1), // static -- no gc
                    (MyPoint::name(), 0), //
                    (MyColor::name(), 1), // static -- no gc
                ],
                [
                    (42.try_into().unwrap(), 0), //
                    (666.try_into().unwrap(), 0),
                    (1.try_into().unwrap(), 0),
                ],
                1, // static -- no gc
            ),
            view,
        );

        Ok(())
    }
}