re_data_store/
store_subscriber.rs

1use parking_lot::RwLock;
2
3use crate::{DataStore, StoreEvent};
4
5// ---
6
// Storage slot for a single registered subscriber, individually lockable so that subscribers can
// be accessed independently of one another.
//
// TODO(cmc): Not sure why I need the extra Box here, RwLock should be `?Sized`.
// NOTE(review): likely because these slots are stored by value in a `Vec` (see `SUBSCRIBERS`),
// whose elements must be `Sized`, whereas `RwLock<dyn StoreSubscriber>` is unsized — confirm.
type SharedStoreSubscriber = RwLock<Box<dyn StoreSubscriber>>;
9
/// A [`StoreSubscriber`] subscribes to atomic changes from all [`DataStore`]s through [`StoreEvent`]s.
///
/// [`StoreSubscriber`]s can be used to build both secondary indices and trigger systems.
///
/// Implementors must be `Send + Sync` and `'static` (implied by [`std::any::Any`]), since
/// registered subscribers are kept in a global registry shared across threads.
//
// TODO(#4204): StoreSubscriber should require SizeBytes so they can be part of memstats.
pub trait StoreSubscriber: std::any::Any + Send + Sync {
    /// Arbitrary name for the subscriber.
    ///
    /// Does not need to be unique.
    fn name(&self) -> String;

    /// Workaround for downcasting support (shared access), simply return `self`:
    /// ```ignore
    /// fn as_any(&self) -> &dyn std::any::Any {
    ///     self
    /// }
    /// ```
    ///
    /// Used by [`DataStore::with_subscriber`] and [`DataStore::with_subscriber_once`] to recover
    /// the concrete subscriber type.
    fn as_any(&self) -> &dyn std::any::Any;

    /// Workaround for downcasting support (exclusive access), simply return `self`:
    /// ```ignore
    /// fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
    ///     self
    /// }
    /// ```
    ///
    /// Used by [`DataStore::with_subscriber_mut`] to recover the concrete subscriber type.
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;

    /// The core of this trait: get notified of changes happening in all [`DataStore`]s.
    ///
    /// This will be called automatically by the [`DataStore`] itself if the subscriber has been
    /// registered: [`DataStore::register_subscriber`].
    /// Or you might want to feed it [`StoreEvent`]s manually, depending on your use case.
    ///
    /// Events from every store are delivered to every registered subscriber: filter on the
    /// event's store id if you only care about specific stores.
    ///
    /// ## Example
    ///
    /// ```ignore
    /// fn on_events(&mut self, events: &[StoreEvent]) {
    ///     use re_data_store::StoreDiffKind;
    ///     for event in events {
    ///         match event.kind {
    ///             StoreDiffKind::Addition => println!("Row added: {}", event.row_id),
    ///             StoreDiffKind::Deletion => println!("Row removed: {}", event.row_id),
    ///         }
    ///     }
    /// }
    /// ```
    fn on_events(&mut self, events: &[StoreEvent]);
}
58
59/// All registered [`StoreSubscriber`]s.
60static SUBSCRIBERS: once_cell::sync::Lazy<RwLock<Vec<SharedStoreSubscriber>>> =
61    once_cell::sync::Lazy::new(|| RwLock::new(Vec::new()));
62
/// Opaque handle to a subscriber registered through `DataStore::register_subscriber`.
///
/// Pass it back to `DataStore::with_subscriber` (or one of its variants) to access the
/// registered subscriber. Internally this is the subscriber's index in the global registry.
///
/// Handles are cheap to copy and can be compared and hashed, e.g. to use them as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct StoreSubscriberHandle(u32);
65
66impl DataStore {
67    /// Registers a [`StoreSubscriber`] so it gets automatically notified when data gets added and/or
68    /// removed to/from a [`DataStore`].
69    ///
70    /// Refer to [`StoreEvent`]'s documentation for more information about these events.
71    ///
72    /// ## Scope
73    ///
74    /// Registered [`StoreSubscriber`]s are global scope: they get notified of all events from all
75    /// existing [`DataStore`]s, including [`DataStore`]s created after the subscriber was registered.
76    ///
77    /// Use [`StoreEvent::store_id`] to identify the source of an event.
78    ///
79    /// ## Late registration
80    ///
81    /// Subscribers must be registered before a store gets created to guarantee that no events
82    /// were missed.
83    ///
84    /// [`StoreEvent::event_id`] can be used to identify missing events.
85    ///
86    /// ## Ordering
87    ///
88    /// The order in which registered subscribers are notified is undefined and will likely become
89    /// concurrent in the future.
90    ///
91    /// If you need a specific order across multiple subscribers, embed them into an orchestrating
92    /// subscriber.
93    //
94    // TODO(cmc): send a compacted snapshot to late registerers for bootstrapping
95    pub fn register_subscriber(subscriber: Box<dyn StoreSubscriber>) -> StoreSubscriberHandle {
96        let mut subscribers = SUBSCRIBERS.write();
97        subscribers.push(RwLock::new(subscriber));
98        StoreSubscriberHandle(subscribers.len() as u32 - 1)
99    }
100
101    /// Passes a reference to the downcasted subscriber to the given `FnMut` callback.
102    ///
103    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
104    pub fn with_subscriber<V: StoreSubscriber, T, F: FnMut(&V) -> T>(
105        StoreSubscriberHandle(handle): StoreSubscriberHandle,
106        mut f: F,
107    ) -> Option<T> {
108        let subscribers = SUBSCRIBERS.read();
109        subscribers.get(handle as usize).and_then(|subscriber| {
110            let subscriber = subscriber.read();
111            subscriber.as_any().downcast_ref::<V>().map(&mut f)
112        })
113    }
114
115    /// Passes a reference to the downcasted subscriber to the given `FnOnce` callback.
116    ///
117    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
118    pub fn with_subscriber_once<V: StoreSubscriber, T, F: FnOnce(&V) -> T>(
119        StoreSubscriberHandle(handle): StoreSubscriberHandle,
120        f: F,
121    ) -> Option<T> {
122        let subscribers = SUBSCRIBERS.read();
123        subscribers.get(handle as usize).and_then(|subscriber| {
124            let subscriber = subscriber.read();
125            subscriber.as_any().downcast_ref::<V>().map(f)
126        })
127    }
128
129    /// Passes a mutable reference to the downcasted subscriber to the given callback.
130    ///
131    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
132    pub fn with_subscriber_mut<V: StoreSubscriber, T, F: FnMut(&mut V) -> T>(
133        StoreSubscriberHandle(handle): StoreSubscriberHandle,
134        mut f: F,
135    ) -> Option<T> {
136        let subscribers = SUBSCRIBERS.read();
137        subscribers.get(handle as usize).and_then(|subscriber| {
138            let mut subscriber = subscriber.write();
139            subscriber.as_any_mut().downcast_mut::<V>().map(&mut f)
140        })
141    }
142
143    /// Called by [`DataStore`]'s mutating methods to notify subscriber subscribers of upcoming events.
144    pub(crate) fn on_events(events: &[StoreEvent]) {
145        re_tracing::profile_function!();
146        let subscribers = SUBSCRIBERS.read();
147        // TODO(cmc): might want to parallelize at some point.
148        for subscriber in subscribers.iter() {
149            subscriber.write().on_events(events);
150        }
151    }
152}
153
#[cfg(test)]
mod tests {
    use ahash::HashSet;

    use re_log_types::{
        example_components::{MyColor, MyIndex, MyPoint},
        DataRow, RowId, StoreId, TimePoint, Timeline,
    };

    use crate::{DataStore, GarbageCollectionOptions, StoreSubscriber};

    use super::*;

    /// A simple [`StoreSubscriber`] for test purposes that just accumulates [`StoreEvent`]s.
    ///
    /// Only events coming from one of the `store_ids` it was created with are recorded.
    #[derive(Debug)]
    struct AllEvents {
        /// Stores whose events should be recorded; everything else is ignored.
        store_ids: HashSet<StoreId>,

        /// Recorded events, in the order they were received.
        events: Vec<StoreEvent>,
    }

    impl AllEvents {
        /// Creates a subscriber that records events from the given stores only.
        fn new(store_ids: impl IntoIterator<Item = StoreId>) -> Self {
            Self {
                store_ids: store_ids.into_iter().collect(),
                events: Vec::new(),
            }
        }
    }

    impl StoreSubscriber for AllEvents {
        fn name(&self) -> String {
            "rerun.testing.store_subscribers.AllEvents".into()
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
            self
        }

        fn on_events(&mut self, events: &[StoreEvent]) {
            self.events.extend(
                events
                    .iter()
                    // NOTE: `cargo` implicitly runs tests in parallel!
                    // Subscribers are global, so stores created by other tests notify us too:
                    // only keep the events of the stores this test owns.
                    .filter(|e| self.store_ids.contains(&e.store_id))
                    .cloned(),
            );
        }
    }

    /// Every event returned by the stores' `insert_row`/`gc` calls must also have been delivered
    /// to the registered subscriber, in the same order.
    #[test]
    fn store_subscriber() -> anyhow::Result<()> {
        let mut store1 = DataStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );
        let mut store2 = DataStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );

        let mut expected_events = Vec::new();

        let view = AllEvents::new([store1.id().clone(), store2.id().clone()]);
        let view_handle = DataStore::register_subscriber(Box::new(view));

        let timeline_frame = Timeline::new_sequence("frame");
        let timeline_other = Timeline::new_temporal("other");
        let timeline_yet_another = Timeline::new_sequence("yet_another");

        let row = DataRow::from_component_batches(
            RowId::new(),
            TimePoint::from_iter([
                (timeline_frame, 42),      //
                (timeline_other, 666),     //
                (timeline_yet_another, 1), //
            ]),
            "entity_a".into(),
            [&MyIndex::from_iter(0..10) as _],
        )?;

        expected_events.extend(store1.insert_row(&row));

        let row = {
            let num_instances = 3;
            let points: Vec<_> = (0..num_instances)
                .map(|i| MyPoint::new(0.0, i as f32))
                .collect();
            let colors = vec![MyColor::from(0xFF0000FF)];
            DataRow::from_component_batches(
                RowId::new(),
                TimePoint::from_iter([
                    (timeline_frame, 42),      //
                    (timeline_yet_another, 1), //
                ]),
                "entity_b".into(),
                [&points as _, &colors as _],
            )?
        };

        expected_events.extend(store2.insert_row(&row));

        let row = {
            let num_instances = 6;
            let colors = vec![MyColor::from(0x00DD00FF); num_instances];
            DataRow::from_component_batches(
                RowId::new(),
                TimePoint::default(),
                "entity_b".into(),
                [
                    &MyIndex::from_iter(0..num_instances as _) as _,
                    &colors as _,
                ],
            )?
        };

        expected_events.extend(store1.insert_row(&row));

        expected_events.extend(store1.gc(&GarbageCollectionOptions::gc_everything()).0);
        expected_events.extend(store2.gc(&GarbageCollectionOptions::gc_everything()).0);

        // `with_subscriber` returns `None` without ever running the closure if the handle is
        // unknown or the downcast fails: make that a test failure instead of silently skipping
        // the assertions below.
        DataStore::with_subscriber::<AllEvents, _, _>(view_handle, |got| {
            similar_asserts::assert_eq!(expected_events.len(), got.events.len());
            similar_asserts::assert_eq!(expected_events, got.events);
        })
        .expect("the subscriber registered above should exist and downcast to `AllEvents`");

        Ok(())
    }
}