re_chunk_store/
subscribers.rs

1use ahash::HashMap;
2use itertools::Itertools as _;
3use parking_lot::RwLock;
4use re_log_types::StoreId;
5
6use crate::{ChunkStore, ChunkStoreEvent};
7
8// ---
9
// TODO(cmc): Not sure why I need the extra Box here, RwLock should be `?Sized`.
// NOTE: these locks are stored by value in a `Vec` (see `SUBSCRIBERS`), and `Vec` elements must be
// `Sized`, which `RwLock<dyn ChunkStoreSubscriber>` is not -- hence the `Box`.
/// A type-erased subscriber guarded by its own lock.
type SharedStoreSubscriber = RwLock<Box<dyn ChunkStoreSubscriber>>;
12
13/// A [`ChunkStoreSubscriber`] subscribes to atomic changes from all [`ChunkStore`]s
14/// through [`ChunkStoreEvent`]s.
15///
16/// [`ChunkStoreSubscriber`]s can be used to build both secondary indices and trigger systems.
17//
18// TODO(#4204): StoreSubscriber should require SizeBytes so they can be part of memstats.
19pub trait ChunkStoreSubscriber: std::any::Any + Send + Sync {
20    /// Arbitrary name for the subscriber.
21    ///
22    /// Does not need to be unique.
23    fn name(&self) -> String;
24
25    /// Workaround for downcasting support, simply return `self`:
26    /// ```ignore
27    /// fn as_any(&self) -> &dyn std::any::Any {
28    ///     self
29    /// }
30    /// ```
31    fn as_any(&self) -> &dyn std::any::Any;
32
33    /// Workaround for downcasting support, simply return `self`:
34    /// ```ignore
35    /// fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
36    ///     self
37    /// }
38    /// ```
39    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
40
41    /// The core of this trait: get notified of changes happening in all [`ChunkStore`]s.
42    ///
43    /// This will be called automatically by the [`ChunkStore`] itself if the subscriber has been
44    /// registered: [`ChunkStore::register_subscriber`].
45    /// Or you might want to feed it [`ChunkStoreEvent`]s manually, depending on your use case.
46    ///
47    /// ## Example
48    ///
49    /// ```ignore
50    /// fn on_events(&mut self, events: &[ChunkStoreEvent]) {
51    ///     use re_chunk_store::ChunkStoreDiffKind;
52    ///     for event in events {
53    ///         match event.kind {
54    ///             ChunkStoreDiffKind::Addition => println!("Row added: {}", event.row_id),
55    ///             ChunkStoreDiffKind::Deletion => println!("Row removed: {}", event.row_id),
56    ///         }
57    ///     }
58    /// }
59    /// ```
60    fn on_events(&mut self, events: &[ChunkStoreEvent]);
61
62    /// Notifies a subscriber that an entire store was dropped.
63    fn on_drop(&mut self, store_id: &StoreId) {
64        _ = store_id;
65    }
66}
67
/// A [`ChunkStoreSubscriber`] that is instantiated for each unique [`StoreId`].
///
/// When registered through [`ChunkStore::register_per_store_subscriber`], an instance is created
/// with [`Default`] the first time an event for a given [`StoreId`] is seen, and it is discarded
/// again once that store is reported as dropped.
pub trait PerStoreChunkSubscriber: Send + Sync + Default {
    /// Arbitrary name for the subscriber.
    ///
    /// Does not need to be unique.
    ///
    /// This is an associated function rather than a method: the name belongs to the type, not to
    /// any particular per-store instance.
    fn name() -> String;

    /// Get notified of changes happening in a [`ChunkStore`], see [`ChunkStoreSubscriber::on_events`].
    ///
    /// Unlike [`ChunkStoreSubscriber::on_events`], all items are guaranteed to have the same
    /// [`StoreId`], and that [`StoreId`] is the same for every invocation on a given instance.
    fn on_events<'a>(&mut self, events: impl Iterator<Item = &'a ChunkStoreEvent>);
}
81
82/// All registered [`ChunkStoreSubscriber`]s.
83static SUBSCRIBERS: once_cell::sync::Lazy<RwLock<Vec<SharedStoreSubscriber>>> =
84    once_cell::sync::Lazy::new(|| RwLock::new(Vec::new()));
85
/// Opaque handle identifying a subscriber in the global subscriber registry.
///
/// Handles are returned by [`ChunkStore::register_subscriber`] and
/// [`ChunkStore::register_per_store_subscriber`]; pass one back to the `ChunkStore::with_*`
/// accessors to reach the corresponding subscriber again.
///
/// Internally this is the index of the subscriber in the registry, which is why handles are
/// cheap to copy, compare and hash.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChunkStoreSubscriberHandle(u32);
88
89impl ChunkStore {
90    /// Registers a [`ChunkStoreSubscriber`] so it gets automatically notified when data gets added and/or
91    /// removed to/from a [`ChunkStore`].
92    ///
93    /// Refer to [`ChunkStoreEvent`]'s documentation for more information about these events.
94    ///
95    /// ## Scope
96    ///
97    /// Registered [`ChunkStoreSubscriber`]s are global scope: they get notified of all events from all
98    /// existing [`ChunkStore`]s, including [`ChunkStore`]s created after the subscriber was registered.
99    ///
100    /// Use [`ChunkStoreEvent::store_id`] to identify the source of an event.
101    ///
102    /// ## Late registration
103    ///
104    /// Subscribers must be registered before a store gets created to guarantee that no events
105    /// were missed.
106    ///
107    /// [`ChunkStoreEvent::event_id`] can be used to identify missing events.
108    ///
109    /// ## Ordering
110    ///
111    /// The order in which registered subscribers are notified is undefined and will likely become
112    /// concurrent in the future.
113    ///
114    /// If you need a specific order across multiple subscribers, embed them into an orchestrating
115    /// subscriber.
116    //
117    // TODO(cmc): send a compacted snapshot to late registerers for bootstrapping
118    pub fn register_subscriber(
119        subscriber: Box<dyn ChunkStoreSubscriber>,
120    ) -> ChunkStoreSubscriberHandle {
121        let mut subscribers = SUBSCRIBERS.write();
122        subscribers.push(RwLock::new(subscriber));
123        ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
124    }
125
126    /// Passes a reference to the downcasted subscriber to the given `FnMut` callback.
127    ///
128    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
129    pub fn with_subscriber<V: ChunkStoreSubscriber, T, F: FnMut(&V) -> T>(
130        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
131        mut f: F,
132    ) -> Option<T> {
133        let subscribers = SUBSCRIBERS.read();
134        subscribers.get(handle as usize).and_then(|subscriber| {
135            let subscriber = subscriber.read();
136            subscriber.as_any().downcast_ref::<V>().map(&mut f)
137        })
138    }
139
140    /// Passes a reference to the downcasted subscriber to the given `FnOnce` callback.
141    ///
142    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
143    pub fn with_subscriber_once<V: ChunkStoreSubscriber, T, F: FnOnce(&V) -> T>(
144        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
145        f: F,
146    ) -> Option<T> {
147        let subscribers = SUBSCRIBERS.read();
148        subscribers.get(handle as usize).and_then(|subscriber| {
149            let subscriber = subscriber.read();
150            subscriber.as_any().downcast_ref::<V>().map(f)
151        })
152    }
153
154    /// Passes a mutable reference to the downcasted subscriber to the given callback.
155    ///
156    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
157    pub fn with_subscriber_mut<V: ChunkStoreSubscriber, T, F: FnMut(&mut V) -> T>(
158        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
159        mut f: F,
160    ) -> Option<T> {
161        let subscribers = SUBSCRIBERS.read();
162        subscribers.get(handle as usize).and_then(|subscriber| {
163            let mut subscriber = subscriber.write();
164            subscriber.as_any_mut().downcast_mut::<V>().map(&mut f)
165        })
166    }
167
168    /// Registers a [`PerStoreChunkSubscriber`] type so it gets automatically notified when data gets added and/or
169    /// removed to/from a [`ChunkStore`].
170    pub fn register_per_store_subscriber<S: PerStoreChunkSubscriber + Default + 'static>(
171    ) -> ChunkStoreSubscriberHandle {
172        let mut subscribers = SUBSCRIBERS.write();
173        subscribers.push(RwLock::new(Box::new(
174            PerStoreStoreSubscriberWrapper::<S>::default(),
175        )));
176        ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
177    }
178
179    /// Notifies all [`PerStoreChunkSubscriber`]s that a store was dropped.
180    pub fn drop_per_store_subscribers(store_id: &StoreId) {
181        let subscribers = SUBSCRIBERS.read();
182        for subscriber in &*subscribers {
183            let mut subscriber = subscriber.write();
184            subscriber.on_drop(store_id);
185        }
186    }
187
188    /// Passes a reference to the downcasted per-store subscriber to the given `FnMut` callback.
189    ///
190    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
191    pub fn with_per_store_subscriber<S: PerStoreChunkSubscriber + 'static, T, F: FnMut(&S) -> T>(
192        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
193        store_id: &StoreId,
194        mut f: F,
195    ) -> Option<T> {
196        let subscribers = SUBSCRIBERS.read();
197        subscribers.get(handle as usize).and_then(|subscriber| {
198            let subscriber = subscriber.read();
199            subscriber
200                .as_any()
201                .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
202                .and_then(|wrapper| wrapper.get(store_id).map(&mut f))
203        })
204    }
205
206    /// Passes a reference to the downcasted per-store subscriber to the given `FnOnce` callback.
207    ///
208    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
209    pub fn with_per_store_subscriber_once<
210        S: PerStoreChunkSubscriber + 'static,
211        T,
212        F: FnOnce(&S) -> T,
213    >(
214        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
215        store_id: &StoreId,
216        f: F,
217    ) -> Option<T> {
218        let subscribers = SUBSCRIBERS.read();
219        subscribers.get(handle as usize).and_then(|subscriber| {
220            let subscriber = subscriber.read();
221            subscriber
222                .as_any()
223                .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
224                .and_then(|wrapper| wrapper.get(store_id).map(f))
225        })
226    }
227
228    /// Passes a mutable reference to the downcasted per-store subscriber to the given callback.
229    ///
230    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
231    pub fn with_per_store_subscriber_mut<
232        S: PerStoreChunkSubscriber + 'static,
233        T,
234        F: FnMut(&mut S) -> T,
235    >(
236        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
237        store_id: &StoreId,
238        mut f: F,
239    ) -> Option<T> {
240        let subscribers = SUBSCRIBERS.read();
241        subscribers.get(handle as usize).and_then(|subscriber| {
242            let mut subscriber = subscriber.write();
243            subscriber
244                .as_any_mut()
245                .downcast_mut::<PerStoreStoreSubscriberWrapper<S>>()
246                .and_then(|wrapper| wrapper.get_mut(store_id).map(&mut f))
247        })
248    }
249
250    /// Called by [`ChunkStore`]'s mutating methods to notify subscriber subscribers of upcoming events.
251    pub(crate) fn on_events(events: &[ChunkStoreEvent]) {
252        re_tracing::profile_function!();
253        let subscribers = SUBSCRIBERS.read();
254        // TODO(cmc): might want to parallelize at some point.
255        for subscriber in subscribers.iter() {
256            subscriber.write().on_events(events);
257        }
258    }
259}
260
/// Utility that makes a [`PerStoreChunkSubscriber`] a [`ChunkStoreSubscriber`].
///
/// It keeps one instance of `S` per [`StoreId`].
#[derive(Default)]
struct PerStoreStoreSubscriberWrapper<S: PerStoreChunkSubscriber> {
    /// The per-store instances, keyed by the ID of the store they belong to.
    subscribers: HashMap<StoreId, Box<S>>,
}
266
267impl<S: PerStoreChunkSubscriber + 'static> PerStoreStoreSubscriberWrapper<S> {
268    fn get(&self, store_id: &StoreId) -> Option<&S> {
269        self.subscribers.get(store_id).map(|s| s.as_ref())
270    }
271
272    fn get_mut(&mut self, store_id: &StoreId) -> Option<&mut S> {
273        self.subscribers.get_mut(store_id).map(|s| s.as_mut())
274    }
275}
276
277impl<S: PerStoreChunkSubscriber + 'static> ChunkStoreSubscriber
278    for PerStoreStoreSubscriberWrapper<S>
279{
280    fn name(&self) -> String {
281        S::name()
282    }
283
284    fn as_any(&self) -> &dyn std::any::Any {
285        self
286    }
287
288    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
289        self
290    }
291
292    fn on_events(&mut self, events: &[ChunkStoreEvent]) {
293        for (store_id, events) in &events.iter().chunk_by(|e| e.store_id.clone()) {
294            self.subscribers
295                .entry(store_id)
296                .or_default()
297                .on_events(events);
298        }
299    }
300
301    fn on_drop(&mut self, store_id: &StoreId) {
302        self.subscribers.remove(store_id);
303    }
304}
305
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use ahash::HashSet;

    use re_chunk::{Chunk, RowId};
    use re_log_types::{
        example_components::{MyColor, MyIndex, MyPoint},
        StoreId, TimePoint, Timeline,
    };

    use crate::{ChunkStore, ChunkStoreSubscriber, GarbageCollectionOptions};

    use super::*;

    /// A simple [`ChunkStoreSubscriber`] for test purposes that just accumulates [`ChunkStoreEvent`]s.
    ///
    /// Only events originating from one of `store_ids` are recorded.
    #[derive(Debug)]
    struct AllEvents {
        store_ids: HashSet<StoreId>,
        events: Vec<ChunkStoreEvent>,
    }

    impl AllEvents {
        fn new(store_ids: impl IntoIterator<Item = StoreId>) -> Self {
            Self {
                store_ids: store_ids.into_iter().collect(),
                events: Vec::new(),
            }
        }
    }

    impl ChunkStoreSubscriber for AllEvents {
        fn name(&self) -> String {
            "rerun.testing.store_subscribers.AllEvents".into()
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
            self
        }

        fn on_events(&mut self, events: &[ChunkStoreEvent]) {
            self.events.extend(
                events
                    .iter()
                    // NOTE: `cargo` implicitly runs tests in parallel!
                    // Subscribers are global, so events from stores created by other tests show up
                    // here too: only keep the ones coming from our own stores.
                    .filter(|e| self.store_ids.contains(&e.store_id))
                    .cloned(),
            );
        }
    }

    #[test]
    fn store_subscriber() -> anyhow::Result<()> {
        let mut store1 = ChunkStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );
        let mut store2 = ChunkStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );

        let mut expected_events = Vec::new();

        let view = AllEvents::new([store1.id().clone(), store2.id().clone()]);
        let view_handle = ChunkStore::register_subscriber(Box::new(view));

        let timeline_frame = Timeline::new_sequence("frame");
        let timeline_other = Timeline::new_temporal("other");
        let timeline_yet_another = Timeline::new_sequence("yet_another");

        let chunk = Chunk::builder("entity_a".into())
            .with_component_batch(
                RowId::new(),
                TimePoint::from_iter([
                    (timeline_frame, 42),      //
                    (timeline_other, 666),     //
                    (timeline_yet_another, 1), //
                ]),
                &MyIndex::from_iter(0..10),
            )
            .build()?;

        expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);

        let chunk = {
            let num_instances = 3;
            let points: Vec<_> = (0..num_instances)
                .map(|i| MyPoint::new(0.0, i as f32))
                .collect();
            let colors = vec![MyColor::from(0xFF0000FF)];
            Chunk::builder("entity_b".into())
                .with_component_batches(
                    RowId::new(),
                    TimePoint::from_iter([
                        (timeline_frame, 42),      //
                        (timeline_yet_another, 1), //
                    ]),
                    [&points as _, &colors as _],
                )
                .build()?
        };

        expected_events.extend(store2.insert_chunk(&Arc::new(chunk))?);

        let chunk = {
            let num_instances = 6;
            let colors = vec![MyColor::from(0x00DD00FF); num_instances];
            Chunk::builder("entity_b".into())
                .with_component_batches(
                    RowId::new(),
                    TimePoint::default(),
                    [
                        &MyIndex::from_iter(0..num_instances as _) as _,
                        &colors as _,
                    ],
                )
                .build()?
        };

        expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);

        expected_events.extend(store1.gc(&GarbageCollectionOptions::gc_everything()).0);
        expected_events.extend(store2.gc(&GarbageCollectionOptions::gc_everything()).0);

        // `with_subscriber` returns `None` without ever calling the closure if the lookup or the
        // downcast fails: make sure the assertions below cannot be skipped silently.
        ChunkStore::with_subscriber::<AllEvents, _, _>(view_handle, |got| {
            similar_asserts::assert_eq!(expected_events.len(), got.events.len());
            similar_asserts::assert_eq!(expected_events, got.events);
        })
        .expect("the `AllEvents` subscriber registered above must be retrievable");

        Ok(())
    }
}
443}