re_data_store/
store_subscriber.rs1use parking_lot::RwLock;
2
3use crate::{DataStore, StoreEvent};
4
/// A type-erased, boxed [`StoreSubscriber`] guarded by its own `RwLock`.
///
/// Every entry of the global subscriber list has this type, which is what lets a single
/// subscriber be locked for writing while the list itself is only locked for reading
/// (see `DataStore::with_subscriber_mut` and `DataStore::on_events`).
type SharedStoreSubscriber = RwLock<Box<dyn StoreSubscriber>>;
9
/// A receiver of [`StoreEvent`]s.
///
/// Implementors are handed to [`DataStore::register_subscriber`] as a `Box<dyn StoreSubscriber>`
/// and are subsequently notified through [`StoreSubscriber::on_events`].
///
/// The `Any` supertrait is what makes it possible to downcast a registered subscriber back to
/// its concrete type; `Send + Sync` are required because registered subscribers are stored in
/// a global `static`.
pub trait StoreSubscriber: std::any::Any + Send + Sync {
    /// Returns a name for this subscriber.
    ///
    /// Nothing in this file reads the returned value.
    fn name(&self) -> String;

    /// Returns `self` as a `&dyn Any`.
    ///
    /// Used by `DataStore::with_subscriber` and `DataStore::with_subscriber_once` to
    /// `downcast_ref` the type-erased subscriber to a concrete type, so implementations are
    /// expected to simply return `self`.
    fn as_any(&self) -> &dyn std::any::Any;

    /// Returns `self` as a `&mut dyn Any`.
    ///
    /// Mutable counterpart of [`StoreSubscriber::as_any`], used by
    /// `DataStore::with_subscriber_mut` to `downcast_mut` the subscriber.
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;

    /// Receives a batch of [`StoreEvent`]s.
    ///
    /// `DataStore::on_events` calls this on every registered subscriber, holding that
    /// subscriber's write lock for the duration of the call.
    fn on_events(&mut self, events: &[StoreEvent]);
}
58
59static SUBSCRIBERS: once_cell::sync::Lazy<RwLock<Vec<SharedStoreSubscriber>>> =
61 once_cell::sync::Lazy::new(|| RwLock::new(Vec::new()));
62
/// Handle to a subscriber registered through [`DataStore::register_subscriber`].
///
/// Wraps the subscriber's index in the global subscriber list; pass it back to
/// `DataStore::with_subscriber` (or one of its variants) to access the subscriber.
#[derive(Debug, Clone, Copy)]
pub struct StoreSubscriberHandle(u32);
65
66impl DataStore {
67 pub fn register_subscriber(subscriber: Box<dyn StoreSubscriber>) -> StoreSubscriberHandle {
96 let mut subscribers = SUBSCRIBERS.write();
97 subscribers.push(RwLock::new(subscriber));
98 StoreSubscriberHandle(subscribers.len() as u32 - 1)
99 }
100
101 pub fn with_subscriber<V: StoreSubscriber, T, F: FnMut(&V) -> T>(
105 StoreSubscriberHandle(handle): StoreSubscriberHandle,
106 mut f: F,
107 ) -> Option<T> {
108 let subscribers = SUBSCRIBERS.read();
109 subscribers.get(handle as usize).and_then(|subscriber| {
110 let subscriber = subscriber.read();
111 subscriber.as_any().downcast_ref::<V>().map(&mut f)
112 })
113 }
114
115 pub fn with_subscriber_once<V: StoreSubscriber, T, F: FnOnce(&V) -> T>(
119 StoreSubscriberHandle(handle): StoreSubscriberHandle,
120 f: F,
121 ) -> Option<T> {
122 let subscribers = SUBSCRIBERS.read();
123 subscribers.get(handle as usize).and_then(|subscriber| {
124 let subscriber = subscriber.read();
125 subscriber.as_any().downcast_ref::<V>().map(f)
126 })
127 }
128
129 pub fn with_subscriber_mut<V: StoreSubscriber, T, F: FnMut(&mut V) -> T>(
133 StoreSubscriberHandle(handle): StoreSubscriberHandle,
134 mut f: F,
135 ) -> Option<T> {
136 let subscribers = SUBSCRIBERS.read();
137 subscribers.get(handle as usize).and_then(|subscriber| {
138 let mut subscriber = subscriber.write();
139 subscriber.as_any_mut().downcast_mut::<V>().map(&mut f)
140 })
141 }
142
143 pub(crate) fn on_events(events: &[StoreEvent]) {
145 re_tracing::profile_function!();
146 let subscribers = SUBSCRIBERS.read();
147 for subscriber in subscribers.iter() {
149 subscriber.write().on_events(events);
150 }
151 }
152}
153
#[cfg(test)]
mod tests {
    use ahash::HashSet;

    use re_log_types::{
        example_components::{MyColor, MyIndex, MyPoint},
        DataRow, RowId, StoreId, TimePoint, Timeline,
    };

    use crate::{DataStore, GarbageCollectionOptions, StoreSubscriber};

    use super::*;

    /// A subscriber that records every event emitted by a fixed set of stores.
    #[derive(Debug)]
    struct AllEvents {
        /// Only events whose `store_id` is in this set are recorded.
        store_ids: HashSet<StoreId>,

        /// The recorded events, in the order they were received.
        events: Vec<StoreEvent>,
    }

    impl AllEvents {
        /// Creates a subscriber that records the events of the given stores, and nothing else.
        fn new(store_ids: impl IntoIterator<Item = StoreId>) -> Self {
            Self {
                store_ids: store_ids.into_iter().collect(),
                events: Vec::new(),
            }
        }
    }

    impl StoreSubscriber for AllEvents {
        fn name(&self) -> String {
            "rerun.testing.store_subscribers.AllEvents".into()
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
            self
        }

        fn on_events(&mut self, events: &[StoreEvent]) {
            self.events.extend(
                events
                    .iter()
                    // Subscribers are global: `DataStore::on_events` hands every subscriber the
                    // events of every store, so drop whatever comes from stores we do not own.
                    .filter(|e| self.store_ids.contains(&e.store_id))
                    .cloned(),
            );
        }
    }

    /// Checks that a registered subscriber receives exactly the events returned by the
    /// mutating calls made on the stores it is interested in, in the same order.
    #[test]
    fn store_subscriber() -> anyhow::Result<()> {
        let mut store1 = DataStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );
        let mut store2 = DataStore::new(
            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
            Default::default(),
        );

        // NOTE(review): the results of `insert_row` are fed to `extend` below. If `insert_row`
        // returns a `Result`, an `Err` would contribute no expected event rather than fail
        // the test — confirm against its signature.
        let mut expected_events = Vec::new();

        let view = AllEvents::new([store1.id().clone(), store2.id().clone()]);
        let view_handle = DataStore::register_subscriber(Box::new(view));

        let timeline_frame = Timeline::new_sequence("frame");
        let timeline_other = Timeline::new_temporal("other");
        let timeline_yet_another = Timeline::new_sequence("yet_another");

        let row = DataRow::from_component_batches(
            RowId::new(),
            TimePoint::from_iter([
                (timeline_frame, 42),
                (timeline_other, 666),
                (timeline_yet_another, 1),
            ]),
            "entity_a".into(),
            [&MyIndex::from_iter(0..10) as _],
        )?;

        expected_events.extend(store1.insert_row(&row));

        let row = {
            let num_instances = 3;
            let points: Vec<_> = (0..num_instances)
                .map(|i| MyPoint::new(0.0, i as f32))
                .collect();
            let colors = vec![MyColor::from(0xFF0000FF)];
            DataRow::from_component_batches(
                RowId::new(),
                TimePoint::from_iter([
                    (timeline_frame, 42),
                    (timeline_yet_another, 1),
                ]),
                "entity_b".into(),
                [&points as _, &colors as _],
            )?
        };

        expected_events.extend(store2.insert_row(&row));

        let row = {
            let num_instances = 6;
            let colors = vec![MyColor::from(0x00DD00FF); num_instances];
            DataRow::from_component_batches(
                RowId::new(),
                TimePoint::default(),
                "entity_b".into(),
                [
                    &MyIndex::from_iter(0..num_instances as _) as _,
                    &colors as _,
                ],
            )?
        };

        expected_events.extend(store1.insert_row(&row));

        expected_events.extend(store1.gc(&GarbageCollectionOptions::gc_everything()).0);
        expected_events.extend(store2.gc(&GarbageCollectionOptions::gc_everything()).0);

        let checked = DataStore::with_subscriber::<AllEvents, _, _>(view_handle, |got| {
            similar_asserts::assert_eq!(expected_events.len(), got.events.len());
            similar_asserts::assert_eq!(expected_events, got.events);
        });

        // `with_subscriber` returns `None` without ever calling the closure when the handle is
        // unknown or the downcast fails; without this check the test would then pass without
        // having compared anything.
        assert!(
            checked.is_some(),
            "subscriber not found, or it could not be downcast to `AllEvents`"
        );

        Ok(())
    }
}