1use ahash::HashMap;
2use itertools::Itertools as _;
3use parking_lot::RwLock;
4use re_log_types::StoreId;
5
6use crate::{ChunkStore, ChunkStoreEvent};
7
8type SharedStoreSubscriber = RwLock<Box<dyn ChunkStoreSubscriber>>;
12
13pub trait ChunkStoreSubscriber: std::any::Any + Send + Sync {
20 fn name(&self) -> String;
24
25 fn as_any(&self) -> &dyn std::any::Any;
32
33 fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
40
41 fn on_events(&mut self, events: &[ChunkStoreEvent]);
61
62 fn on_drop(&mut self, store_id: &StoreId) {
64 _ = store_id;
65 }
66}
67
68pub trait PerStoreChunkSubscriber: Send + Sync + Default {
70 fn name() -> String;
74
75 fn on_events<'a>(&mut self, events: impl Iterator<Item = &'a ChunkStoreEvent>);
80}
81
82static SUBSCRIBERS: once_cell::sync::Lazy<RwLock<Vec<SharedStoreSubscriber>>> =
84 once_cell::sync::Lazy::new(|| RwLock::new(Vec::new()));
85
86#[derive(Debug, Clone, Copy)]
87pub struct ChunkStoreSubscriberHandle(u32);
88
89impl ChunkStore {
90 pub fn register_subscriber(
119 subscriber: Box<dyn ChunkStoreSubscriber>,
120 ) -> ChunkStoreSubscriberHandle {
121 let mut subscribers = SUBSCRIBERS.write();
122 subscribers.push(RwLock::new(subscriber));
123 ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
124 }
125
126 pub fn with_subscriber<V: ChunkStoreSubscriber, T, F: FnMut(&V) -> T>(
130 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
131 mut f: F,
132 ) -> Option<T> {
133 let subscribers = SUBSCRIBERS.read();
134 subscribers.get(handle as usize).and_then(|subscriber| {
135 let subscriber = subscriber.read();
136 subscriber.as_any().downcast_ref::<V>().map(&mut f)
137 })
138 }
139
140 pub fn with_subscriber_once<V: ChunkStoreSubscriber, T, F: FnOnce(&V) -> T>(
144 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
145 f: F,
146 ) -> Option<T> {
147 let subscribers = SUBSCRIBERS.read();
148 subscribers.get(handle as usize).and_then(|subscriber| {
149 let subscriber = subscriber.read();
150 subscriber.as_any().downcast_ref::<V>().map(f)
151 })
152 }
153
154 pub fn with_subscriber_mut<V: ChunkStoreSubscriber, T, F: FnMut(&mut V) -> T>(
158 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
159 mut f: F,
160 ) -> Option<T> {
161 let subscribers = SUBSCRIBERS.read();
162 subscribers.get(handle as usize).and_then(|subscriber| {
163 let mut subscriber = subscriber.write();
164 subscriber.as_any_mut().downcast_mut::<V>().map(&mut f)
165 })
166 }
167
168 pub fn register_per_store_subscriber<S: PerStoreChunkSubscriber + Default + 'static>(
171 ) -> ChunkStoreSubscriberHandle {
172 let mut subscribers = SUBSCRIBERS.write();
173 subscribers.push(RwLock::new(Box::new(
174 PerStoreStoreSubscriberWrapper::<S>::default(),
175 )));
176 ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
177 }
178
179 pub fn drop_per_store_subscribers(store_id: &StoreId) {
181 let subscribers = SUBSCRIBERS.read();
182 for subscriber in &*subscribers {
183 let mut subscriber = subscriber.write();
184 subscriber.on_drop(store_id);
185 }
186 }
187
188 pub fn with_per_store_subscriber<S: PerStoreChunkSubscriber + 'static, T, F: FnMut(&S) -> T>(
192 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
193 store_id: &StoreId,
194 mut f: F,
195 ) -> Option<T> {
196 let subscribers = SUBSCRIBERS.read();
197 subscribers.get(handle as usize).and_then(|subscriber| {
198 let subscriber = subscriber.read();
199 subscriber
200 .as_any()
201 .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
202 .and_then(|wrapper| wrapper.get(store_id).map(&mut f))
203 })
204 }
205
206 pub fn with_per_store_subscriber_once<
210 S: PerStoreChunkSubscriber + 'static,
211 T,
212 F: FnOnce(&S) -> T,
213 >(
214 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
215 store_id: &StoreId,
216 f: F,
217 ) -> Option<T> {
218 let subscribers = SUBSCRIBERS.read();
219 subscribers.get(handle as usize).and_then(|subscriber| {
220 let subscriber = subscriber.read();
221 subscriber
222 .as_any()
223 .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
224 .and_then(|wrapper| wrapper.get(store_id).map(f))
225 })
226 }
227
228 pub fn with_per_store_subscriber_mut<
232 S: PerStoreChunkSubscriber + 'static,
233 T,
234 F: FnMut(&mut S) -> T,
235 >(
236 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
237 store_id: &StoreId,
238 mut f: F,
239 ) -> Option<T> {
240 let subscribers = SUBSCRIBERS.read();
241 subscribers.get(handle as usize).and_then(|subscriber| {
242 let mut subscriber = subscriber.write();
243 subscriber
244 .as_any_mut()
245 .downcast_mut::<PerStoreStoreSubscriberWrapper<S>>()
246 .and_then(|wrapper| wrapper.get_mut(store_id).map(&mut f))
247 })
248 }
249
250 pub(crate) fn on_events(events: &[ChunkStoreEvent]) {
252 re_tracing::profile_function!();
253 let subscribers = SUBSCRIBERS.read();
254 for subscriber in subscribers.iter() {
256 subscriber.write().on_events(events);
257 }
258 }
259}
260
261#[derive(Default)]
263struct PerStoreStoreSubscriberWrapper<S: PerStoreChunkSubscriber> {
264 subscribers: HashMap<StoreId, Box<S>>,
265}
266
267impl<S: PerStoreChunkSubscriber + 'static> PerStoreStoreSubscriberWrapper<S> {
268 fn get(&self, store_id: &StoreId) -> Option<&S> {
269 self.subscribers.get(store_id).map(|s| s.as_ref())
270 }
271
272 fn get_mut(&mut self, store_id: &StoreId) -> Option<&mut S> {
273 self.subscribers.get_mut(store_id).map(|s| s.as_mut())
274 }
275}
276
277impl<S: PerStoreChunkSubscriber + 'static> ChunkStoreSubscriber
278 for PerStoreStoreSubscriberWrapper<S>
279{
280 fn name(&self) -> String {
281 S::name()
282 }
283
284 fn as_any(&self) -> &dyn std::any::Any {
285 self
286 }
287
288 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
289 self
290 }
291
292 fn on_events(&mut self, events: &[ChunkStoreEvent]) {
293 for (store_id, events) in &events.iter().chunk_by(|e| e.store_id.clone()) {
294 self.subscribers
295 .entry(store_id)
296 .or_default()
297 .on_events(events);
298 }
299 }
300
301 fn on_drop(&mut self, store_id: &StoreId) {
302 self.subscribers.remove(store_id);
303 }
304}
305
306#[cfg(test)]
307mod tests {
308 use std::sync::Arc;
309
310 use ahash::HashSet;
311
312 use re_chunk::{Chunk, RowId};
313 use re_log_types::{
314 example_components::{MyColor, MyIndex, MyPoint},
315 StoreId, TimePoint, Timeline,
316 };
317
318 use crate::{ChunkStore, ChunkStoreSubscriber, GarbageCollectionOptions};
319
320 use super::*;
321
322 #[derive(Debug)]
324 struct AllEvents {
325 store_ids: HashSet<StoreId>,
326 events: Vec<ChunkStoreEvent>,
327 }
328
329 impl AllEvents {
330 fn new(store_ids: impl IntoIterator<Item = StoreId>) -> Self {
331 Self {
332 store_ids: store_ids.into_iter().collect(),
333 events: Vec::new(),
334 }
335 }
336 }
337
338 impl ChunkStoreSubscriber for AllEvents {
339 fn name(&self) -> String {
340 "rerun.testing.store_subscribers.AllEvents".into()
341 }
342
343 fn as_any(&self) -> &dyn std::any::Any {
344 self
345 }
346
347 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
348 self
349 }
350
351 fn on_events(&mut self, events: &[ChunkStoreEvent]) {
352 self.events.extend(
353 events
354 .iter()
355 .filter(|e| self.store_ids.contains(&e.store_id))
357 .cloned(),
358 );
359 }
360 }
361
362 #[test]
363 fn store_subscriber() -> anyhow::Result<()> {
364 let mut store1 = ChunkStore::new(
365 re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
366 Default::default(),
367 );
368 let mut store = ChunkStore::new(
369 re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
370 Default::default(),
371 );
372
373 let mut expected_events = Vec::new();
374
375 let view = AllEvents::new([store1.id().clone(), store.id().clone()]);
376 let view_handle = ChunkStore::register_subscriber(Box::new(view));
377
378 let timeline_frame = Timeline::new_sequence("frame");
379 let timeline_other = Timeline::new_temporal("other");
380 let timeline_yet_another = Timeline::new_sequence("yet_another");
381
382 let chunk = Chunk::builder("entity_a".into())
383 .with_component_batch(
384 RowId::new(),
385 TimePoint::from_iter([
386 (timeline_frame, 42), (timeline_other, 666), (timeline_yet_another, 1), ]),
390 &MyIndex::from_iter(0..10),
391 )
392 .build()?;
393
394 expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
395
396 let chunk = {
397 let num_instances = 3;
398 let points: Vec<_> = (0..num_instances)
399 .map(|i| MyPoint::new(0.0, i as f32))
400 .collect();
401 let colors = vec![MyColor::from(0xFF0000FF)];
402 Chunk::builder("entity_b".into())
403 .with_component_batches(
404 RowId::new(),
405 TimePoint::from_iter([
406 (timeline_frame, 42), (timeline_yet_another, 1), ]),
409 [&points as _, &colors as _],
410 )
411 .build()?
412 };
413
414 expected_events.extend(store.insert_chunk(&Arc::new(chunk))?);
415
416 let chunk = {
417 let num_instances = 6;
418 let colors = vec![MyColor::from(0x00DD00FF); num_instances];
419 Chunk::builder("entity_b".into())
420 .with_component_batches(
421 RowId::new(),
422 TimePoint::default(),
423 [
424 &MyIndex::from_iter(0..num_instances as _) as _,
425 &colors as _,
426 ],
427 )
428 .build()?
429 };
430
431 expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
432
433 expected_events.extend(store1.gc(&GarbageCollectionOptions::gc_everything()).0);
434 expected_events.extend(store.gc(&GarbageCollectionOptions::gc_everything()).0);
435
436 ChunkStore::with_subscriber::<AllEvents, _, _>(view_handle, |got| {
437 similar_asserts::assert_eq!(expected_events.len(), got.events.len());
438 similar_asserts::assert_eq!(expected_events, got.events);
439 });
440
441 Ok(())
442 }
443}