1use ahash::HashMap;
2use itertools::Itertools as _;
3use parking_lot::RwLock;
4use re_log_types::StoreId;
5
6use crate::{ChunkStore, ChunkStoreEvent};
7
8type SharedStoreSubscriber = RwLock<Box<dyn ChunkStoreSubscriber>>;
12
13pub trait ChunkStoreSubscriber: std::any::Any + Send + Sync {
20 fn name(&self) -> String;
24
25 fn as_any(&self) -> &dyn std::any::Any;
32
33 fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
40
41 fn on_events(&mut self, events: &[ChunkStoreEvent]);
61
62 fn on_drop(&mut self, store_id: &StoreId) {
64 _ = store_id;
65 }
66}
67
68pub trait PerStoreChunkSubscriber: Send + Sync + Default {
70 fn name() -> String;
74
75 fn on_events<'a>(&mut self, events: impl Iterator<Item = &'a ChunkStoreEvent>);
80}
81
82static SUBSCRIBERS: std::sync::LazyLock<RwLock<Vec<SharedStoreSubscriber>>> =
84 std::sync::LazyLock::new(|| RwLock::new(Vec::new()));
85
86#[derive(Debug, Clone, Copy)]
87pub struct ChunkStoreSubscriberHandle(u32);
88
89impl ChunkStore {
90 pub fn register_subscriber(
119 subscriber: Box<dyn ChunkStoreSubscriber>,
120 ) -> ChunkStoreSubscriberHandle {
121 let mut subscribers = SUBSCRIBERS.write();
122 subscribers.push(RwLock::new(subscriber));
123 ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
124 }
125
126 pub fn with_subscriber<V: ChunkStoreSubscriber, T, F: FnMut(&V) -> T>(
130 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
131 mut f: F,
132 ) -> Option<T> {
133 let subscribers = SUBSCRIBERS.read();
134 let subscriber = subscribers.get(handle as usize)?;
135 let subscriber = subscriber.read();
136 subscriber.as_any().downcast_ref::<V>().map(&mut f)
137 }
138
139 pub fn with_subscriber_once<V: ChunkStoreSubscriber, T, F: FnOnce(&V) -> T>(
143 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
144 f: F,
145 ) -> Option<T> {
146 let subscribers = SUBSCRIBERS.read();
147 let subscriber = subscribers.get(handle as usize)?;
148 let subscriber = subscriber.read();
149 subscriber.as_any().downcast_ref::<V>().map(f)
150 }
151
152 pub fn with_subscriber_mut<V: ChunkStoreSubscriber, T, F: FnMut(&mut V) -> T>(
156 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
157 mut f: F,
158 ) -> Option<T> {
159 let subscribers = SUBSCRIBERS.read();
160 let subscriber = subscribers.get(handle as usize)?;
161 let mut subscriber = subscriber.write();
162 subscriber.as_any_mut().downcast_mut::<V>().map(&mut f)
163 }
164
165 pub fn register_per_store_subscriber<S: PerStoreChunkSubscriber + Default + 'static>()
168 -> ChunkStoreSubscriberHandle {
169 let mut subscribers = SUBSCRIBERS.write();
170 subscribers.push(RwLock::new(Box::new(
171 PerStoreStoreSubscriberWrapper::<S>::default(),
172 )));
173 ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
174 }
175
176 pub fn drop_per_store_subscribers(store_id: &StoreId) {
178 let subscribers = SUBSCRIBERS.read();
179 for subscriber in &*subscribers {
180 let mut subscriber = subscriber.write();
181 subscriber.on_drop(store_id);
182 }
183 }
184
185 pub fn with_per_store_subscriber<S: PerStoreChunkSubscriber + 'static, T, F: FnMut(&S) -> T>(
189 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
190 store_id: &StoreId,
191 mut f: F,
192 ) -> Option<T> {
193 let subscribers = SUBSCRIBERS.read();
194 let subscriber = subscribers.get(handle as usize)?;
195 let subscriber = subscriber.read();
196 subscriber
197 .as_any()
198 .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()?
199 .get(store_id)
200 .map(&mut f)
201 }
202
203 pub fn with_per_store_subscriber_once<
207 S: PerStoreChunkSubscriber + 'static,
208 T,
209 F: FnOnce(&S) -> T,
210 >(
211 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
212 store_id: &StoreId,
213 f: F,
214 ) -> Option<T> {
215 let subscribers = SUBSCRIBERS.read();
216 let subscriber = subscribers.get(handle as usize)?;
217 let subscriber = subscriber.read();
218 subscriber
219 .as_any()
220 .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
221 .and_then(|wrapper| wrapper.get(store_id).map(f))
222 }
223
224 pub fn with_per_store_subscriber_mut<
228 S: PerStoreChunkSubscriber + 'static,
229 T,
230 F: FnMut(&mut S) -> T,
231 >(
232 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
233 store_id: &StoreId,
234 mut f: F,
235 ) -> Option<T> {
236 let subscribers = SUBSCRIBERS.read();
237 let subscriber = subscribers.get(handle as usize)?;
238 let mut subscriber = subscriber.write();
239 subscriber
240 .as_any_mut()
241 .downcast_mut::<PerStoreStoreSubscriberWrapper<S>>()
242 .and_then(|wrapper| wrapper.get_mut(store_id).map(&mut f))
243 }
244
245 pub(crate) fn on_events(events: &[ChunkStoreEvent]) {
247 re_tracing::profile_function!();
248 let subscribers = SUBSCRIBERS.read();
249 for subscriber in subscribers.iter() {
251 subscriber.write().on_events(events);
252 }
253 }
254}
255
256#[derive(Default)]
258struct PerStoreStoreSubscriberWrapper<S: PerStoreChunkSubscriber> {
259 subscribers: HashMap<StoreId, Box<S>>,
260}
261
262impl<S: PerStoreChunkSubscriber + 'static> PerStoreStoreSubscriberWrapper<S> {
263 fn get(&self, store_id: &StoreId) -> Option<&S> {
264 self.subscribers.get(store_id).map(|s| s.as_ref())
265 }
266
267 fn get_mut(&mut self, store_id: &StoreId) -> Option<&mut S> {
268 self.subscribers.get_mut(store_id).map(|s| s.as_mut())
269 }
270}
271
272impl<S: PerStoreChunkSubscriber + 'static> ChunkStoreSubscriber
273 for PerStoreStoreSubscriberWrapper<S>
274{
275 fn name(&self) -> String {
276 S::name()
277 }
278
279 fn as_any(&self) -> &dyn std::any::Any {
280 self
281 }
282
283 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
284 self
285 }
286
287 fn on_events(&mut self, events: &[ChunkStoreEvent]) {
288 for (store_id, events) in &events.iter().chunk_by(|e| e.store_id.clone()) {
289 self.subscribers
290 .entry(store_id)
291 .or_default()
292 .on_events(events);
293 }
294 }
295
296 fn on_drop(&mut self, store_id: &StoreId) {
297 self.subscribers.remove(store_id);
298 }
299}
300
301#[cfg(test)]
302mod tests {
303 use std::sync::Arc;
304
305 use ahash::HashSet;
306
307 use re_chunk::{Chunk, RowId};
308 use re_log_types::{
309 StoreId, TimePoint, Timeline,
310 example_components::{MyColor, MyIndex, MyPoint, MyPoints},
311 };
312
313 use crate::{ChunkStore, ChunkStoreSubscriber, GarbageCollectionOptions};
314
315 use super::*;
316
317 #[derive(Debug)]
319 struct AllEvents {
320 store_ids: HashSet<StoreId>,
321 events: Vec<ChunkStoreEvent>,
322 }
323
324 impl AllEvents {
325 fn new(store_ids: impl IntoIterator<Item = StoreId>) -> Self {
326 Self {
327 store_ids: store_ids.into_iter().collect(),
328 events: Vec::new(),
329 }
330 }
331 }
332
333 impl ChunkStoreSubscriber for AllEvents {
334 fn name(&self) -> String {
335 "rerun.testing.store_subscribers.AllEvents".into()
336 }
337
338 fn as_any(&self) -> &dyn std::any::Any {
339 self
340 }
341
342 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
343 self
344 }
345
346 fn on_events(&mut self, events: &[ChunkStoreEvent]) {
347 self.events.extend(
348 events
349 .iter()
350 .filter(|e| self.store_ids.contains(&e.store_id))
352 .cloned(),
353 );
354 }
355 }
356
357 #[test]
358 fn store_subscriber() -> anyhow::Result<()> {
359 let mut store1 = ChunkStore::new(
360 re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
361 Default::default(),
362 );
363 let mut store = ChunkStore::new(
364 re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
365 Default::default(),
366 );
367
368 let mut expected_events = Vec::new();
369
370 let view = AllEvents::new([store1.id().clone(), store.id().clone()]);
371 let view_handle = ChunkStore::register_subscriber(Box::new(view));
372
373 let timeline_frame = Timeline::new_sequence("frame");
374 let timeline_other = Timeline::new_duration("other");
375 let timeline_yet_another = Timeline::new_sequence("yet_another");
376
377 let chunk = Chunk::builder("entity_a")
378 .with_component_batch(
379 RowId::new(),
380 TimePoint::from_iter([
381 (timeline_frame, 42), (timeline_other, 666), (timeline_yet_another, 1), ]),
385 (MyIndex::partial_descriptor(), &MyIndex::from_iter(0..10)),
386 )
387 .build()?;
388
389 expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
390
391 let chunk = {
392 let num_instances = 3;
393 let points: Vec<_> = (0..num_instances)
394 .map(|i| MyPoint::new(0.0, i as f32))
395 .collect();
396 let colors = vec![MyColor::from(0xFF0000FF)];
397 Chunk::builder("entity_b")
398 .with_component_batches(
399 RowId::new(),
400 TimePoint::from_iter([
401 (timeline_frame, 42), (timeline_yet_another, 1), ]),
404 [
405 (MyPoints::descriptor_points(), &points as _),
406 (MyPoints::descriptor_colors(), &colors as _),
407 ],
408 )
409 .build()?
410 };
411
412 expected_events.extend(store.insert_chunk(&Arc::new(chunk))?);
413
414 let chunk = {
415 let num_instances = 6;
416 let colors = vec![MyColor::from(0x00DD00FF); num_instances];
417 Chunk::builder("entity_b")
418 .with_component_batches(
419 RowId::new(),
420 TimePoint::default(),
421 [
422 (
423 MyIndex::partial_descriptor(),
424 &MyIndex::from_iter(0..num_instances as _) as _,
425 ),
426 (MyPoints::descriptor_colors(), &colors as _),
427 ],
428 )
429 .build()?
430 };
431
432 expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
433
434 expected_events.extend(store1.gc(&GarbageCollectionOptions::gc_everything()).0);
435 expected_events.extend(store.gc(&GarbageCollectionOptions::gc_everything()).0);
436
437 ChunkStore::with_subscriber::<AllEvents, _, _>(view_handle, |got| {
438 similar_asserts::assert_eq!(expected_events.len(), got.events.len());
439 similar_asserts::assert_eq!(expected_events, got.events);
440 });
441
442 Ok(())
443 }
444}