Skip to main content

re_chunk_store/
subscribers.rs

1use ahash::HashMap;
2use itertools::Itertools as _;
3use parking_lot::RwLock;
4use re_byte_size::{MemUsageNode, MemUsageTree, MemUsageTreeCapture};
5use re_log_types::StoreId;
6
7use crate::{ChunkStore, ChunkStoreEvent};
8
9// ---
10
/// A single registered subscriber: a type-erased [`ChunkStoreSubscriber`] behind its own lock.
///
/// The global registry stores one of these per registration.
//
// TODO(cmc): Not sure why I need the extra Box here, RwLock should be `?Sized`.
type SharedStoreSubscriber = RwLock<Box<dyn ChunkStoreSubscriber>>;
13
/// A [`ChunkStoreSubscriber`] subscribes to atomic changes from all [`ChunkStore`]s
/// through [`ChunkStoreEvent`]s.
///
/// [`ChunkStoreSubscriber`]s can be used to build both secondary indices and trigger systems.
///
/// The [`MemUsageTreeCapture`] bound lets the viewer's dev panel show how much memory each subscriber uses.
///
/// Subscribers are stored type-erased in a global registry and handed back to callers by
/// downcasting (see [`ChunkStore::with_subscriber`]), hence the [`std::any::Any`] supertrait and
/// the [`Self::as_any`] / [`Self::as_any_mut`] methods.
pub trait ChunkStoreSubscriber: MemUsageTreeCapture + std::any::Any + Send + Sync {
    /// Arbitrary name for the subscriber.
    ///
    /// Does not need to be unique.
    fn name(&self) -> String;

    /// Workaround for downcasting support, simply return `self`:
    /// ```ignore
    /// fn as_any(&self) -> &dyn std::any::Any {
    ///     self
    /// }
    /// ```
    fn as_any(&self) -> &dyn std::any::Any;

    /// Workaround for downcasting support, simply return `self`:
    /// ```ignore
    /// fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
    ///     self
    /// }
    /// ```
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;

    /// The core of this trait: get notified of changes happening in all [`ChunkStore`]s.
    ///
    /// This will be called automatically by the [`ChunkStore`] itself if the subscriber has been
    /// registered: [`ChunkStore::register_subscriber`].
    /// Or you might want to feed it [`ChunkStoreEvent`]s manually, depending on your use case.
    ///
    /// ## Example
    ///
    /// ```ignore
    /// fn on_events(&mut self, events: &[ChunkStoreEvent]) {
    ///     use re_chunk_store::ChunkStoreDiffKind;
    ///     for event in events {
    ///         match event.kind {
    ///             ChunkStoreDiffKind::Addition => println!("Row added: {}", event.row_id),
    ///             ChunkStoreDiffKind::Deletion => println!("Row removed: {}", event.row_id),
    ///         }
    ///     }
    /// }
    /// ```
    fn on_events(&mut self, events: &[ChunkStoreEvent]);

    /// Notifies a subscriber that an entire store was dropped.
    ///
    /// The default implementation ignores `store_id` and does nothing; override it if the
    /// subscriber keeps per-store state that should be released.
    fn on_drop(&mut self, store_id: &StoreId) {
        _ = store_id;
    }
}
68
/// A [`ChunkStoreSubscriber`] that is instantiated for each unique [`StoreId`].
///
/// The [`MemUsageTreeCapture`] bound lets the viewer's dev panel show memory usage per [`StoreId`].
///
/// The [`Default`] bound is what allows a fresh instance to be created on demand the first time
/// events for a given [`StoreId`] are seen.
pub trait PerStoreChunkSubscriber: MemUsageTreeCapture + Send + Sync + Default {
    /// Arbitrary name for the subscriber.
    ///
    /// Does not need to be unique.
    ///
    /// This is an associated function (no `self`): the name belongs to the type, not to any
    /// particular per-store instance.
    fn name() -> String;

    /// Get notified of changes happening in a [`ChunkStore`], see [`ChunkStoreSubscriber::on_events`].
    ///
    /// Unlike [`ChunkStoreSubscriber::on_events`], all items are guaranteed to have the same [`StoreId`]
    /// which does not change per invocation.
    fn on_events<'a>(&mut self, events: impl Iterator<Item = &'a ChunkStoreEvent>);
}
84
/// All registered [`ChunkStoreSubscriber`]s.
///
/// Lazily initialized to an empty list on first access. A [`ChunkStoreSubscriberHandle`] is an
/// index into this list; nothing in this file ever removes an entry, registrations only append.
///
/// The outer lock guards the list itself, while each entry carries its own lock.
static SUBSCRIBERS: std::sync::LazyLock<RwLock<Vec<SharedStoreSubscriber>>> =
    std::sync::LazyLock::new(|| RwLock::new(Vec::new()));
88
/// Opaque handle to a registered subscriber.
///
/// Returned by [`ChunkStore::register_subscriber`] and
/// [`ChunkStore::register_per_store_subscriber`]; pass it to the `ChunkStore::with_*` accessors to
/// reach the subscriber again. The wrapped value is the subscriber's position in the global
/// registry.
#[derive(Debug, Clone, Copy)]
pub struct ChunkStoreSubscriberHandle(u32);
91
92impl ChunkStore {
93    /// Registers a [`ChunkStoreSubscriber`] so it gets automatically notified when data gets added and/or
94    /// removed to/from a [`ChunkStore`].
95    ///
96    /// Refer to [`ChunkStoreEvent`]'s documentation for more information about these events.
97    ///
98    /// ## Scope
99    ///
100    /// Registered [`ChunkStoreSubscriber`]s are global scope: they get notified of all events from all
101    /// existing [`ChunkStore`]s, including [`ChunkStore`]s created after the subscriber was registered.
102    ///
103    /// Use [`ChunkStoreEvent::store_id`] to identify the source of an event.
104    ///
105    /// ## Late registration
106    ///
107    /// Subscribers must be registered before a store gets created to guarantee that no events
108    /// were missed.
109    ///
110    /// [`ChunkStoreEvent::event_id`] can be used to identify missing events.
111    ///
112    /// ## Ordering
113    ///
114    /// The order in which registered subscribers are notified is undefined and will likely become
115    /// concurrent in the future.
116    ///
117    /// If you need a specific order across multiple subscribers, embed them into an orchestrating
118    /// subscriber.
119    //
120    // TODO(cmc): send a compacted snapshot to late registerers for bootstrapping
121    pub fn register_subscriber(
122        subscriber: Box<dyn ChunkStoreSubscriber>,
123    ) -> ChunkStoreSubscriberHandle {
124        let mut subscribers = SUBSCRIBERS.write();
125        subscribers.push(RwLock::new(subscriber));
126        ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
127    }
128
129    /// Passes a reference to the downcasted subscriber to the given `FnMut` callback.
130    ///
131    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
132    pub fn with_subscriber<V: ChunkStoreSubscriber, T, F: FnMut(&V) -> T>(
133        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
134        mut f: F,
135    ) -> Option<T> {
136        let subscribers = SUBSCRIBERS.read();
137        let subscriber = subscribers.get(handle as usize)?;
138        let subscriber = subscriber.read();
139        subscriber.as_any().downcast_ref::<V>().map(&mut f)
140    }
141
142    /// Passes a reference to the downcasted subscriber to the given `FnOnce` callback.
143    ///
144    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
145    pub fn with_subscriber_once<V: ChunkStoreSubscriber, T, F: FnOnce(&V) -> T>(
146        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
147        f: F,
148    ) -> Option<T> {
149        let subscribers = SUBSCRIBERS.read();
150        let subscriber = subscribers.get(handle as usize)?;
151        let subscriber = subscriber.read();
152        subscriber.as_any().downcast_ref::<V>().map(f)
153    }
154
155    /// Passes a mutable reference to the downcasted subscriber to the given callback.
156    ///
157    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
158    pub fn with_subscriber_mut<V: ChunkStoreSubscriber, T, F: FnMut(&mut V) -> T>(
159        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
160        mut f: F,
161    ) -> Option<T> {
162        let subscribers = SUBSCRIBERS.read();
163        let subscriber = subscribers.get(handle as usize)?;
164        let mut subscriber = subscriber.write();
165        subscriber.as_any_mut().downcast_mut::<V>().map(&mut f)
166    }
167
168    /// Registers a [`PerStoreChunkSubscriber`] type so it gets automatically notified when data gets added and/or
169    /// removed to/from a [`ChunkStore`].
170    pub fn register_per_store_subscriber<S: PerStoreChunkSubscriber + Default + 'static>()
171    -> ChunkStoreSubscriberHandle {
172        let mut subscribers = SUBSCRIBERS.write();
173        subscribers.push(RwLock::new(Box::new(
174            PerStoreStoreSubscriberWrapper::<S>::default(),
175        )));
176        ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
177    }
178
179    /// Notifies all [`PerStoreChunkSubscriber`]s that a store was dropped.
180    pub fn drop_per_store_subscribers(store_id: &StoreId) {
181        let subscribers = SUBSCRIBERS.read();
182        for subscriber in &*subscribers {
183            let mut subscriber = subscriber.write();
184            subscriber.on_drop(store_id);
185        }
186    }
187
188    /// Passes a reference to the downcasted per-store subscriber to the given `FnMut` callback.
189    ///
190    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
191    pub fn with_per_store_subscriber<S: PerStoreChunkSubscriber + 'static, T, F: FnMut(&S) -> T>(
192        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
193        store_id: &StoreId,
194        mut f: F,
195    ) -> Option<T> {
196        let subscribers = SUBSCRIBERS.read();
197        let subscriber = subscribers.get(handle as usize)?;
198        let subscriber = subscriber.read();
199        subscriber
200            .as_any()
201            .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()?
202            .get(store_id)
203            .map(&mut f)
204    }
205
206    /// Passes a reference to the downcasted per-store subscriber to the given `FnOnce` callback.
207    ///
208    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
209    pub fn with_per_store_subscriber_once<
210        S: PerStoreChunkSubscriber + 'static,
211        T,
212        F: FnOnce(&S) -> T,
213    >(
214        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
215        store_id: &StoreId,
216        f: F,
217    ) -> Option<T> {
218        let subscribers = SUBSCRIBERS.read();
219        let subscriber = subscribers.get(handle as usize)?;
220        let subscriber = subscriber.read();
221        subscriber
222            .as_any()
223            .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
224            .and_then(|wrapper| wrapper.get(store_id).map(f))
225    }
226
227    /// Passes a mutable reference to the downcasted per-store subscriber to the given callback.
228    ///
229    /// Returns `None` if the subscriber doesn't exist or downcasting failed.
230    pub fn with_per_store_subscriber_mut<
231        S: PerStoreChunkSubscriber + 'static,
232        T,
233        F: FnMut(&mut S) -> T,
234    >(
235        ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
236        store_id: &StoreId,
237        mut f: F,
238    ) -> Option<T> {
239        let subscribers = SUBSCRIBERS.read();
240        let subscriber = subscribers.get(handle as usize)?;
241        let mut subscriber = subscriber.write();
242        subscriber
243            .as_any_mut()
244            .downcast_mut::<PerStoreStoreSubscriberWrapper<S>>()
245            .and_then(|wrapper| wrapper.get_mut(store_id).map(&mut f))
246    }
247
248    /// Called by [`ChunkStore`]'s mutating methods to notify subscriber subscribers of upcoming events.
249    pub(crate) fn on_events(events: &[ChunkStoreEvent]) {
250        re_tracing::profile_function!();
251        let subscribers = SUBSCRIBERS.read();
252        // TODO(cmc): might want to parallelize at some point.
253        for subscriber in subscribers.iter() {
254            subscriber.write().on_events(events);
255        }
256    }
257
258    /// Captures the memory usage of all registered subscribers.
259    ///
260    /// Names are disambiguated with a `#idx` suffix when multiple subscribers share the same name.
261    pub fn capture_all_subscribers_mem_usage_tree() -> MemUsageTree {
262        re_tracing::profile_function!();
263        let subscribers = SUBSCRIBERS.read();
264        let mut node = MemUsageNode::new();
265        let mut name_counts: HashMap<String, usize> = HashMap::default();
266        for subscriber in subscribers.iter() {
267            let subscriber = subscriber.read();
268            let base_name = subscriber.name();
269            let count = name_counts.entry(base_name.clone()).or_insert(0);
270            let name = if *count == 0 {
271                base_name
272            } else {
273                format!("{base_name}#{count}")
274            };
275            *count += 1;
276            node.add(name, subscriber.capture_mem_usage_tree());
277        }
278        node.into_tree()
279    }
280}
281
/// Utility that makes a [`PerStoreChunkSubscriber`] a [`ChunkStoreSubscriber`].
///
/// It keeps one `S` per [`StoreId`] and routes each incoming event to the instance that matches
/// the event's store.
#[derive(Default)]
struct PerStoreStoreSubscriberWrapper<S: PerStoreChunkSubscriber> {
    /// One subscriber instance per store, keyed by the store's id.
    subscribers: HashMap<StoreId, Box<S>>,
}
287
288impl<S: PerStoreChunkSubscriber + 'static> PerStoreStoreSubscriberWrapper<S> {
289    fn get(&self, store_id: &StoreId) -> Option<&S> {
290        self.subscribers.get(store_id).map(|s| s.as_ref())
291    }
292
293    fn get_mut(&mut self, store_id: &StoreId) -> Option<&mut S> {
294        self.subscribers.get_mut(store_id).map(|s| s.as_mut())
295    }
296}
297
298impl<S: PerStoreChunkSubscriber + 'static> ChunkStoreSubscriber
299    for PerStoreStoreSubscriberWrapper<S>
300{
301    fn name(&self) -> String {
302        S::name()
303    }
304
305    fn as_any(&self) -> &dyn std::any::Any {
306        self
307    }
308
309    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
310        self
311    }
312
313    fn on_events(&mut self, events: &[ChunkStoreEvent]) {
314        for (store_id, events) in &events.iter().chunk_by(|e| e.store_id.clone()) {
315            self.subscribers
316                .entry(store_id)
317                .or_default()
318                .on_events(events);
319        }
320    }
321
322    fn on_drop(&mut self, store_id: &StoreId) {
323        self.subscribers.remove(store_id);
324    }
325}
326
327impl<S: PerStoreChunkSubscriber + 'static> MemUsageTreeCapture
328    for PerStoreStoreSubscriberWrapper<S>
329{
330    fn capture_mem_usage_tree(&self) -> MemUsageTree {
331        let mut node = MemUsageNode::new();
332        for (store_id, subscriber) in &self.subscribers {
333            let name = format!(
334                "{}/{}",
335                store_id.application_id().as_str(),
336                store_id.recording_id().as_str()
337            );
338            node.add(name, subscriber.capture_mem_usage_tree());
339        }
340        node.into_tree()
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    use std::sync::Arc;
347
348    use ahash::HashSet;
349    use re_chunk::{Chunk, RowId};
350    use re_log_types::example_components::{MyColor, MyIndex, MyPoint, MyPoints};
351    use re_log_types::{StoreId, TimePoint, Timeline};
352
353    use super::*;
354    use crate::{ChunkStore, ChunkStoreSubscriber, GarbageCollectionOptions};
355
    /// A simple [`ChunkStoreSubscriber`] for test purposes that just accumulates [`ChunkStoreEvent`]s.
    #[derive(Debug)]
    struct AllEvents {
        /// Only events coming from these stores are recorded; everything else is ignored.
        store_ids: HashSet<StoreId>,

        /// Every recorded event, in the order it was received.
        events: Vec<ChunkStoreEvent>,
    }
362
363    impl AllEvents {
364        fn new(store_ids: impl IntoIterator<Item = StoreId>) -> Self {
365            Self {
366                store_ids: store_ids.into_iter().collect(),
367                events: Vec::new(),
368            }
369        }
370    }
371
    impl MemUsageTreeCapture for AllEvents {
        /// Always reports zero bytes: this test subscriber does not track its own memory use.
        fn capture_mem_usage_tree(&self) -> MemUsageTree {
            MemUsageTree::Bytes(0)
        }
    }
377
378    impl ChunkStoreSubscriber for AllEvents {
379        fn name(&self) -> String {
380            "rerun.testing.store_subscribers.AllEvents".into()
381        }
382
383        fn as_any(&self) -> &dyn std::any::Any {
384            self
385        }
386
387        fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
388            self
389        }
390
391        fn on_events(&mut self, events: &[ChunkStoreEvent]) {
392            self.events.extend(
393                events
394                    .iter()
395                    // NOTE: `cargo` implicitly runs tests in parallel!
396                    .filter(|e| self.store_ids.contains(&e.store_id))
397                    .cloned(),
398            );
399        }
400    }
401
402    #[test]
403    fn store_subscriber() -> anyhow::Result<()> {
404        let mut store1 = ChunkStore::new(
405            re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
406            Default::default(),
407        );
408        let mut store = ChunkStore::new(
409            re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
410            Default::default(),
411        );
412
413        let mut expected_events = Vec::new();
414
415        let view = AllEvents::new([store1.id().clone(), store.id().clone()]);
416        let view_handle = ChunkStore::register_subscriber(Box::new(view));
417
418        let timeline_frame = Timeline::new_sequence("frame");
419        let timeline_other = Timeline::new_duration("other");
420        let timeline_yet_another = Timeline::new_sequence("yet_another");
421
422        let chunk = Chunk::builder("entity_a")
423            .with_component_batch(
424                RowId::new(),
425                TimePoint::from_iter([
426                    (timeline_frame, 42),      //
427                    (timeline_other, 666),     //
428                    (timeline_yet_another, 1), //
429                ]),
430                (MyIndex::partial_descriptor(), &MyIndex::from_iter(0..10)),
431            )
432            .build()?;
433
434        expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
435
436        let chunk = {
437            let num_instances = 3;
438            let points: Vec<_> = (0..num_instances)
439                .map(|i| MyPoint::new(0.0, i as f32))
440                .collect();
441            let colors = vec![MyColor::from(0xFF0000FF)];
442            Chunk::builder("entity_b")
443                .with_component_batches(
444                    RowId::new(),
445                    TimePoint::from_iter([
446                        (timeline_frame, 42),      //
447                        (timeline_yet_another, 1), //
448                    ]),
449                    [
450                        (MyPoints::descriptor_points(), &points as _),
451                        (MyPoints::descriptor_colors(), &colors as _),
452                    ],
453                )
454                .build()?
455        };
456
457        expected_events.extend(store.insert_chunk(&Arc::new(chunk))?);
458
459        let chunk = {
460            let num_instances = 6;
461            let colors = vec![MyColor::from(0x00DD00FF); num_instances];
462            Chunk::builder("entity_b")
463                .with_component_batches(
464                    RowId::new(),
465                    TimePoint::default(),
466                    [
467                        (
468                            MyIndex::partial_descriptor(),
469                            &MyIndex::from_iter(0..num_instances as _) as _,
470                        ),
471                        (MyPoints::descriptor_colors(), &colors as _),
472                    ],
473                )
474                .build()?
475        };
476
477        expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
478
479        expected_events.extend(store1.gc(&GarbageCollectionOptions::gc_everything()).0);
480        expected_events.extend(store.gc(&GarbageCollectionOptions::gc_everything()).0);
481
482        ChunkStore::with_subscriber::<AllEvents, _, _>(view_handle, |got| {
483            similar_asserts::assert_eq!(expected_events.len(), got.events.len());
484            similar_asserts::assert_eq!(expected_events, got.events);
485        });
486
487        Ok(())
488    }
489}