re_chunk_store/
subscribers.rs

1use ahash::HashMap;
2use itertools::Itertools as _;
3use parking_lot::RwLock;
4use re_log_types::StoreId;
5
6use crate::{ChunkStore, ChunkStoreEvent};
7
8// ---
9
10// TODO(cmc): Not sure why I need the extra Box here, RwLock should be `?Sized`.
11type SharedStoreSubscriber = RwLock<Box<dyn ChunkStoreSubscriber>>;
12
13/// A [`ChunkStoreSubscriber`] subscribes to atomic changes from all [`ChunkStore`]s
14/// through [`ChunkStoreEvent`]s.
15///
16/// [`ChunkStoreSubscriber`]s can be used to build both secondary indices and trigger systems.
17//
18// TODO(#4204): StoreSubscriber should require SizeBytes so they can be part of memstats.
19pub trait ChunkStoreSubscriber: std::any::Any + Send + Sync {
20    /// Arbitrary name for the subscriber.
21    ///
22    /// Does not need to be unique.
23    fn name(&self) -> String;
24
25    /// Workaround for downcasting support, simply return `self`:
26    /// ```ignore
27    /// fn as_any(&self) -> &dyn std::any::Any {
28    ///     self
29    /// }
30    /// ```
31    fn as_any(&self) -> &dyn std::any::Any;
32
33    /// Workaround for downcasting support, simply return `self`:
34    /// ```ignore
35    /// fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
36    ///     self
37    /// }
38    /// ```
39    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
40
41    /// The core of this trait: get notified of changes happening in all [`ChunkStore`]s.
42    ///
43    /// This will be called automatically by the [`ChunkStore`] itself if the subscriber has been
44    /// registered: [`ChunkStore::register_subscriber`].
45    /// Or you might want to feed it [`ChunkStoreEvent`]s manually, depending on your use case.
46    ///
47    /// ## Example
48    ///
49    /// ```ignore
50    /// fn on_events(&mut self, events: &[ChunkStoreEvent]) {
51    ///     use re_chunk_store::ChunkStoreDiffKind;
52    ///     for event in events {
53    ///         match event.kind {
54    ///             ChunkStoreDiffKind::Addition => println!("Row added: {}", event.row_id),
55    ///             ChunkStoreDiffKind::Deletion => println!("Row removed: {}", event.row_id),
56    ///         }
57    ///     }
58    /// }
59    /// ```
60    fn on_events(&mut self, events: &[ChunkStoreEvent]);
61
62    /// Notifies a subscriber that an entire store was dropped.
63    fn on_drop(&mut self, store_id: &StoreId) {
64        _ = store_id;
65    }
66}
67
68/// A [`ChunkStoreSubscriber`] that is instantiated for each unique [`StoreId`].
69pub trait PerStoreChunkSubscriber: Send + Sync + Default {
70    /// Arbitrary name for the subscriber.
71    ///
72    /// Does not need to be unique.
73    fn name() -> String;
74
75    /// Get notified of changes happening in a [`ChunkStore`], see [`ChunkStoreSubscriber::on_events`].
76    ///
77    /// Unlike [`ChunkStoreSubscriber::on_events`], all items are guaranteed to have the same [`StoreId`]
78    /// which does not change per invocation.
79    fn on_events<'a>(&mut self, events: impl Iterator<Item = &'a ChunkStoreEvent>);
80}
81
82/// All registered [`ChunkStoreSubscriber`]s.
83static SUBSCRIBERS: std::sync::LazyLock<RwLock<Vec<SharedStoreSubscriber>>> =
84    std::sync::LazyLock::new(|| RwLock::new(Vec::new()));
85
86#[derive(Debug, Clone, Copy)]
87pub struct ChunkStoreSubscriberHandle(u32);
88
89impl ChunkStore {
90    /// Registers a [`ChunkStoreSubscriber`] so it gets automatically notified when data gets added and/or
91    /// removed to/from a [`ChunkStore`].
92    ///
93    /// Refer to [`ChunkStoreEvent`]'s documentation for more information about these events.
94    ///
95    /// ## Scope
96    ///
97    /// Registered [`ChunkStoreSubscriber`]s are global scope: they get notified of all events from all
98    /// existing [`ChunkStore`]s, including [`ChunkStore`]s created after the subscriber was registered.
99    ///
100    /// Use [`ChunkStoreEvent::store_id`] to identify the source of an event.
101    ///
102    /// ## Late registration
103    ///
104    /// Subscribers must be registered before a store gets created to guarantee that no events
105    /// were missed.
106    ///
107    /// [`ChunkStoreEvent::event_id`] can be used to identify missing events.
108    ///
109    /// ## Ordering
110    ///
111    /// The order in which registered subscribers are notified is undefined and will likely become
112    /// concurrent in the future.
113    ///
114    /// If you need a specific order across multiple subscribers, embed them into an orchestrating
115    /// subscriber.
116    //
117    // TODO(cmc): send a compacted snapshot to late registerers for bootstrapping
118    pub fn register_subscriber(
119        subscriber: Box<dyn ChunkStoreSubscriber>,
120    ) -> ChunkStoreSubscriberHandle {
121        let mut subscribers = SUBSCRIBERS.write();
122        subscribers.push(RwLock::new(subscriber));
123        ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
124    }
125
126    /// Passes a reference to the downcasted subscriber to the given `FnMut` callback.
127    ///
128    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
129    pub fn with_subscriber<V: ChunkStoreSubscriber, T, F: FnMut(&V) -> T>(
130        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
131        mut f: F,
132    ) -> Option<T> {
133        let subscribers = SUBSCRIBERS.read();
134        let subscriber = subscribers.get(handle as usize)?;
135        let subscriber = subscriber.read();
136        subscriber.as_any().downcast_ref::<V>().map(&mut f)
137    }
138
139    /// Passes a reference to the downcasted subscriber to the given `FnOnce` callback.
140    ///
141    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
142    pub fn with_subscriber_once<V: ChunkStoreSubscriber, T, F: FnOnce(&V) -> T>(
143        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
144        f: F,
145    ) -> Option<T> {
146        let subscribers = SUBSCRIBERS.read();
147        let subscriber = subscribers.get(handle as usize)?;
148        let subscriber = subscriber.read();
149        subscriber.as_any().downcast_ref::<V>().map(f)
150    }
151
152    /// Passes a mutable reference to the downcasted subscriber to the given callback.
153    ///
154    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
155    pub fn with_subscriber_mut<V: ChunkStoreSubscriber, T, F: FnMut(&mut V) -> T>(
156        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
157        mut f: F,
158    ) -> Option<T> {
159        let subscribers = SUBSCRIBERS.read();
160        let subscriber = subscribers.get(handle as usize)?;
161        let mut subscriber = subscriber.write();
162        subscriber.as_any_mut().downcast_mut::<V>().map(&mut f)
163    }
164
165    /// Registers a [`PerStoreChunkSubscriber`] type so it gets automatically notified when data gets added and/or
166    /// removed to/from a [`ChunkStore`].
167    pub fn register_per_store_subscriber<S: PerStoreChunkSubscriber + Default + 'static>()
168    -> ChunkStoreSubscriberHandle {
169        let mut subscribers = SUBSCRIBERS.write();
170        subscribers.push(RwLock::new(Box::new(
171            PerStoreStoreSubscriberWrapper::<S>::default(),
172        )));
173        ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
174    }
175
176    /// Notifies all [`PerStoreChunkSubscriber`]s that a store was dropped.
177    pub fn drop_per_store_subscribers(store_id: &StoreId) {
178        let subscribers = SUBSCRIBERS.read();
179        for subscriber in &*subscribers {
180            let mut subscriber = subscriber.write();
181            subscriber.on_drop(store_id);
182        }
183    }
184
185    /// Passes a reference to the downcasted per-store subscriber to the given `FnMut` callback.
186    ///
187    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
188    pub fn with_per_store_subscriber<S: PerStoreChunkSubscriber + 'static, T, F: FnMut(&S) -> T>(
189        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
190        store_id: &StoreId,
191        mut f: F,
192    ) -> Option<T> {
193        let subscribers = SUBSCRIBERS.read();
194        let subscriber = subscribers.get(handle as usize)?;
195        let subscriber = subscriber.read();
196        subscriber
197            .as_any()
198            .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()?
199            .get(store_id)
200            .map(&mut f)
201    }
202
203    /// Passes a reference to the downcasted per-store subscriber to the given `FnOnce` callback.
204    ///
205    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
206    pub fn with_per_store_subscriber_once<
207        S: PerStoreChunkSubscriber + 'static,
208        T,
209        F: FnOnce(&S) -> T,
210    >(
211        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
212        store_id: &StoreId,
213        f: F,
214    ) -> Option<T> {
215        let subscribers = SUBSCRIBERS.read();
216        let subscriber = subscribers.get(handle as usize)?;
217        let subscriber = subscriber.read();
218        subscriber
219            .as_any()
220            .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
221            .and_then(|wrapper| wrapper.get(store_id).map(f))
222    }
223
224    /// Passes a mutable reference to the downcasted per-store subscriber to the given callback.
225    ///
226    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
227    pub fn with_per_store_subscriber_mut<
228        S: PerStoreChunkSubscriber + 'static,
229        T,
230        F: FnMut(&mut S) -> T,
231    >(
232        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
233        store_id: &StoreId,
234        mut f: F,
235    ) -> Option<T> {
236        let subscribers = SUBSCRIBERS.read();
237        let subscriber = subscribers.get(handle as usize)?;
238        let mut subscriber = subscriber.write();
239        subscriber
240            .as_any_mut()
241            .downcast_mut::<PerStoreStoreSubscriberWrapper<S>>()
242            .and_then(|wrapper| wrapper.get_mut(store_id).map(&mut f))
243    }
244
245    /// Called by [`ChunkStore`]'s mutating methods to notify subscriber subscribers of upcoming events.
246    pub(crate) fn on_events(events: &[ChunkStoreEvent]) {
247        re_tracing::profile_function!();
248        let subscribers = SUBSCRIBERS.read();
249        // TODO(cmc): might want to parallelize at some point.
250        for subscriber in subscribers.iter() {
251            subscriber.write().on_events(events);
252        }
253    }
254}
255
256/// Utility that makes a [`PerStoreChunkSubscriber`] a [`ChunkStoreSubscriber`].
257#[derive(Default)]
258struct PerStoreStoreSubscriberWrapper<S: PerStoreChunkSubscriber> {
259    subscribers: HashMap<StoreId, Box<S>>,
260}
261
262impl<S: PerStoreChunkSubscriber + 'static> PerStoreStoreSubscriberWrapper<S> {
263    fn get(&self, store_id: &StoreId) -> Option<&S> {
264        self.subscribers.get(store_id).map(|s| s.as_ref())
265    }
266
267    fn get_mut(&mut self, store_id: &StoreId) -> Option<&mut S> {
268        self.subscribers.get_mut(store_id).map(|s| s.as_mut())
269    }
270}
271
272impl<S: PerStoreChunkSubscriber + 'static> ChunkStoreSubscriber
273    for PerStoreStoreSubscriberWrapper<S>
274{
275    fn name(&self) -> String {
276        S::name()
277    }
278
279    fn as_any(&self) -> &dyn std::any::Any {
280        self
281    }
282
283    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
284        self
285    }
286
287    fn on_events(&mut self, events: &[ChunkStoreEvent]) {
288        for (store_id, events) in &events.iter().chunk_by(|e| e.store_id.clone()) {
289            self.subscribers
290                .entry(store_id)
291                .or_default()
292                .on_events(events);
293        }
294    }
295
296    fn on_drop(&mut self, store_id: &StoreId) {
297        self.subscribers.remove(store_id);
298    }
299}
300
301#[cfg(test)]
302mod tests {
303    use std::sync::Arc;
304
305    use ahash::HashSet;
306
307    use re_chunk::{Chunk, RowId};
308    use re_log_types::{
309        StoreId, TimePoint, Timeline,
310        example_components::{MyColor, MyIndex, MyPoint, MyPoints},
311    };
312
313    use crate::{ChunkStore, ChunkStoreSubscriber, GarbageCollectionOptions};
314
315    use super::*;
316
317    /// A simple [`ChunkStoreSubscriber`] for test purposes that just accumulates [`ChunkStoreEvent`]s.
318    #[derive(Debug)]
319    struct AllEvents {
320        store_ids: HashSet<StoreId>,
321        events: Vec<ChunkStoreEvent>,
322    }
323
324    impl AllEvents {
325        fn new(store_ids: impl IntoIterator<Item = StoreId>) -> Self {
326            Self {
327                store_ids: store_ids.into_iter().collect(),
328                events: Vec::new(),
329            }
330        }
331    }
332
333    impl ChunkStoreSubscriber for AllEvents {
334        fn name(&self) -> String {
335            "rerun.testing.store_subscribers.AllEvents".into()
336        }
337
338        fn as_any(&self) -> &dyn std::any::Any {
339            self
340        }
341
342        fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
343            self
344        }
345
346        fn on_events(&mut self, events: &[ChunkStoreEvent]) {
347            self.events.extend(
348                events
349                    .iter()
350                    // NOTE: `cargo` implicitly runs tests in parallel!
351                    .filter(|e| self.store_ids.contains(&e.store_id))
352                    .cloned(),
353            );
354        }
355    }
356
357    #[test]
358    fn store_subscriber() -> anyhow::Result<()> {
359        let mut store1 = ChunkStore::new(
360            re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
361            Default::default(),
362        );
363        let mut store = ChunkStore::new(
364            re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
365            Default::default(),
366        );
367
368        let mut expected_events = Vec::new();
369
370        let view = AllEvents::new([store1.id().clone(), store.id().clone()]);
371        let view_handle = ChunkStore::register_subscriber(Box::new(view));
372
373        let timeline_frame = Timeline::new_sequence("frame");
374        let timeline_other = Timeline::new_duration("other");
375        let timeline_yet_another = Timeline::new_sequence("yet_another");
376
377        let chunk = Chunk::builder("entity_a")
378            .with_component_batch(
379                RowId::new(),
380                TimePoint::from_iter([
381                    (timeline_frame, 42),      //
382                    (timeline_other, 666),     //
383                    (timeline_yet_another, 1), //
384                ]),
385                (MyIndex::partial_descriptor(), &MyIndex::from_iter(0..10)),
386            )
387            .build()?;
388
389        expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
390
391        let chunk = {
392            let num_instances = 3;
393            let points: Vec<_> = (0..num_instances)
394                .map(|i| MyPoint::new(0.0, i as f32))
395                .collect();
396            let colors = vec![MyColor::from(0xFF0000FF)];
397            Chunk::builder("entity_b")
398                .with_component_batches(
399                    RowId::new(),
400                    TimePoint::from_iter([
401                        (timeline_frame, 42),      //
402                        (timeline_yet_another, 1), //
403                    ]),
404                    [
405                        (MyPoints::descriptor_points(), &points as _),
406                        (MyPoints::descriptor_colors(), &colors as _),
407                    ],
408                )
409                .build()?
410        };
411
412        expected_events.extend(store.insert_chunk(&Arc::new(chunk))?);
413
414        let chunk = {
415            let num_instances = 6;
416            let colors = vec![MyColor::from(0x00DD00FF); num_instances];
417            Chunk::builder("entity_b")
418                .with_component_batches(
419                    RowId::new(),
420                    TimePoint::default(),
421                    [
422                        (
423                            MyIndex::partial_descriptor(),
424                            &MyIndex::from_iter(0..num_instances as _) as _,
425                        ),
426                        (MyPoints::descriptor_colors(), &colors as _),
427                    ],
428                )
429                .build()?
430        };
431
432        expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
433
434        expected_events.extend(store1.gc(&GarbageCollectionOptions::gc_everything()).0);
435        expected_events.extend(store.gc(&GarbageCollectionOptions::gc_everything()).0);
436
437        ChunkStore::with_subscriber::<AllEvents, _, _>(view_handle, |got| {
438            similar_asserts::assert_eq!(expected_events.len(), got.events.len());
439            similar_asserts::assert_eq!(expected_events, got.events);
440        });
441
442        Ok(())
443    }
444}