1use ahash::HashMap;
2use itertools::Itertools as _;
3use parking_lot::RwLock;
4use re_byte_size::{MemUsageNode, MemUsageTree, MemUsageTreeCapture};
5use re_log_types::StoreId;
6
7use crate::{ChunkStore, ChunkStoreEvent};
8
9type SharedStoreSubscriber = RwLock<Box<dyn ChunkStoreSubscriber>>;
13
14pub trait ChunkStoreSubscriber: MemUsageTreeCapture + std::any::Any + Send + Sync {
21 fn name(&self) -> String;
25
26 fn as_any(&self) -> &dyn std::any::Any;
33
34 fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
41
42 fn on_events(&mut self, events: &[ChunkStoreEvent]);
62
63 fn on_drop(&mut self, store_id: &StoreId) {
65 _ = store_id;
66 }
67}
68
69pub trait PerStoreChunkSubscriber: MemUsageTreeCapture + Send + Sync + Default {
73 fn name() -> String;
77
78 fn on_events<'a>(&mut self, events: impl Iterator<Item = &'a ChunkStoreEvent>);
83}
84
85static SUBSCRIBERS: std::sync::LazyLock<RwLock<Vec<SharedStoreSubscriber>>> =
87 std::sync::LazyLock::new(|| RwLock::new(Vec::new()));
88
89#[derive(Debug, Clone, Copy)]
90pub struct ChunkStoreSubscriberHandle(u32);
91
92impl ChunkStore {
93 pub fn register_subscriber(
122 subscriber: Box<dyn ChunkStoreSubscriber>,
123 ) -> ChunkStoreSubscriberHandle {
124 let mut subscribers = SUBSCRIBERS.write();
125 subscribers.push(RwLock::new(subscriber));
126 ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
127 }
128
129 pub fn with_subscriber<V: ChunkStoreSubscriber, T, F: FnMut(&V) -> T>(
133 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
134 mut f: F,
135 ) -> Option<T> {
136 let subscribers = SUBSCRIBERS.read();
137 let subscriber = subscribers.get(handle as usize)?;
138 let subscriber = subscriber.read();
139 subscriber.as_any().downcast_ref::<V>().map(&mut f)
140 }
141
142 pub fn with_subscriber_once<V: ChunkStoreSubscriber, T, F: FnOnce(&V) -> T>(
146 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
147 f: F,
148 ) -> Option<T> {
149 let subscribers = SUBSCRIBERS.read();
150 let subscriber = subscribers.get(handle as usize)?;
151 let subscriber = subscriber.read();
152 subscriber.as_any().downcast_ref::<V>().map(f)
153 }
154
155 pub fn with_subscriber_mut<V: ChunkStoreSubscriber, T, F: FnMut(&mut V) -> T>(
159 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
160 mut f: F,
161 ) -> Option<T> {
162 let subscribers = SUBSCRIBERS.read();
163 let subscriber = subscribers.get(handle as usize)?;
164 let mut subscriber = subscriber.write();
165 subscriber.as_any_mut().downcast_mut::<V>().map(&mut f)
166 }
167
168 pub fn register_per_store_subscriber<S: PerStoreChunkSubscriber + Default + 'static>()
171 -> ChunkStoreSubscriberHandle {
172 let mut subscribers = SUBSCRIBERS.write();
173 subscribers.push(RwLock::new(Box::new(
174 PerStoreStoreSubscriberWrapper::<S>::default(),
175 )));
176 ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
177 }
178
179 pub fn drop_per_store_subscribers(store_id: &StoreId) {
181 let subscribers = SUBSCRIBERS.read();
182 for subscriber in &*subscribers {
183 let mut subscriber = subscriber.write();
184 subscriber.on_drop(store_id);
185 }
186 }
187
188 pub fn with_per_store_subscriber<S: PerStoreChunkSubscriber + 'static, T, F: FnMut(&S) -> T>(
192 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
193 store_id: &StoreId,
194 mut f: F,
195 ) -> Option<T> {
196 let subscribers = SUBSCRIBERS.read();
197 let subscriber = subscribers.get(handle as usize)?;
198 let subscriber = subscriber.read();
199 subscriber
200 .as_any()
201 .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()?
202 .get(store_id)
203 .map(&mut f)
204 }
205
206 pub fn with_per_store_subscriber_once<
210 S: PerStoreChunkSubscriber + 'static,
211 T,
212 F: FnOnce(&S) -> T,
213 >(
214 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
215 store_id: &StoreId,
216 f: F,
217 ) -> Option<T> {
218 let subscribers = SUBSCRIBERS.read();
219 let subscriber = subscribers.get(handle as usize)?;
220 let subscriber = subscriber.read();
221 subscriber
222 .as_any()
223 .downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
224 .and_then(|wrapper| wrapper.get(store_id).map(f))
225 }
226
227 pub fn with_per_store_subscriber_mut<
231 S: PerStoreChunkSubscriber + 'static,
232 T,
233 F: FnMut(&mut S) -> T,
234 >(
235 ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
236 store_id: &StoreId,
237 mut f: F,
238 ) -> Option<T> {
239 let subscribers = SUBSCRIBERS.read();
240 let subscriber = subscribers.get(handle as usize)?;
241 let mut subscriber = subscriber.write();
242 subscriber
243 .as_any_mut()
244 .downcast_mut::<PerStoreStoreSubscriberWrapper<S>>()
245 .and_then(|wrapper| wrapper.get_mut(store_id).map(&mut f))
246 }
247
248 pub(crate) fn on_events(events: &[ChunkStoreEvent]) {
250 re_tracing::profile_function!();
251 let subscribers = SUBSCRIBERS.read();
252 for subscriber in subscribers.iter() {
254 subscriber.write().on_events(events);
255 }
256 }
257
258 pub fn capture_all_subscribers_mem_usage_tree() -> MemUsageTree {
262 re_tracing::profile_function!();
263 let subscribers = SUBSCRIBERS.read();
264 let mut node = MemUsageNode::new();
265 let mut name_counts: HashMap<String, usize> = HashMap::default();
266 for subscriber in subscribers.iter() {
267 let subscriber = subscriber.read();
268 let base_name = subscriber.name();
269 let count = name_counts.entry(base_name.clone()).or_insert(0);
270 let name = if *count == 0 {
271 base_name
272 } else {
273 format!("{base_name}#{count}")
274 };
275 *count += 1;
276 node.add(name, subscriber.capture_mem_usage_tree());
277 }
278 node.into_tree()
279 }
280}
281
282#[derive(Default)]
284struct PerStoreStoreSubscriberWrapper<S: PerStoreChunkSubscriber> {
285 subscribers: HashMap<StoreId, Box<S>>,
286}
287
288impl<S: PerStoreChunkSubscriber + 'static> PerStoreStoreSubscriberWrapper<S> {
289 fn get(&self, store_id: &StoreId) -> Option<&S> {
290 self.subscribers.get(store_id).map(|s| s.as_ref())
291 }
292
293 fn get_mut(&mut self, store_id: &StoreId) -> Option<&mut S> {
294 self.subscribers.get_mut(store_id).map(|s| s.as_mut())
295 }
296}
297
298impl<S: PerStoreChunkSubscriber + 'static> ChunkStoreSubscriber
299 for PerStoreStoreSubscriberWrapper<S>
300{
301 fn name(&self) -> String {
302 S::name()
303 }
304
305 fn as_any(&self) -> &dyn std::any::Any {
306 self
307 }
308
309 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
310 self
311 }
312
313 fn on_events(&mut self, events: &[ChunkStoreEvent]) {
314 for (store_id, events) in &events.iter().chunk_by(|e| e.store_id.clone()) {
315 self.subscribers
316 .entry(store_id)
317 .or_default()
318 .on_events(events);
319 }
320 }
321
322 fn on_drop(&mut self, store_id: &StoreId) {
323 self.subscribers.remove(store_id);
324 }
325}
326
327impl<S: PerStoreChunkSubscriber + 'static> MemUsageTreeCapture
328 for PerStoreStoreSubscriberWrapper<S>
329{
330 fn capture_mem_usage_tree(&self) -> MemUsageTree {
331 let mut node = MemUsageNode::new();
332 for (store_id, subscriber) in &self.subscribers {
333 let name = format!(
334 "{}/{}",
335 store_id.application_id().as_str(),
336 store_id.recording_id().as_str()
337 );
338 node.add(name, subscriber.capture_mem_usage_tree());
339 }
340 node.into_tree()
341 }
342}
343
344#[cfg(test)]
345mod tests {
346 use std::sync::Arc;
347
348 use ahash::HashSet;
349 use re_chunk::{Chunk, RowId};
350 use re_log_types::example_components::{MyColor, MyIndex, MyPoint, MyPoints};
351 use re_log_types::{StoreId, TimePoint, Timeline};
352
353 use super::*;
354 use crate::{ChunkStore, ChunkStoreSubscriber, GarbageCollectionOptions};
355
356 #[derive(Debug)]
358 struct AllEvents {
359 store_ids: HashSet<StoreId>,
360 events: Vec<ChunkStoreEvent>,
361 }
362
363 impl AllEvents {
364 fn new(store_ids: impl IntoIterator<Item = StoreId>) -> Self {
365 Self {
366 store_ids: store_ids.into_iter().collect(),
367 events: Vec::new(),
368 }
369 }
370 }
371
372 impl MemUsageTreeCapture for AllEvents {
373 fn capture_mem_usage_tree(&self) -> MemUsageTree {
374 MemUsageTree::Bytes(0)
375 }
376 }
377
378 impl ChunkStoreSubscriber for AllEvents {
379 fn name(&self) -> String {
380 "rerun.testing.store_subscribers.AllEvents".into()
381 }
382
383 fn as_any(&self) -> &dyn std::any::Any {
384 self
385 }
386
387 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
388 self
389 }
390
391 fn on_events(&mut self, events: &[ChunkStoreEvent]) {
392 self.events.extend(
393 events
394 .iter()
395 .filter(|e| self.store_ids.contains(&e.store_id))
397 .cloned(),
398 );
399 }
400 }
401
402 #[test]
403 fn store_subscriber() -> anyhow::Result<()> {
404 let mut store1 = ChunkStore::new(
405 re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
406 Default::default(),
407 );
408 let mut store = ChunkStore::new(
409 re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
410 Default::default(),
411 );
412
413 let mut expected_events = Vec::new();
414
415 let view = AllEvents::new([store1.id().clone(), store.id().clone()]);
416 let view_handle = ChunkStore::register_subscriber(Box::new(view));
417
418 let timeline_frame = Timeline::new_sequence("frame");
419 let timeline_other = Timeline::new_duration("other");
420 let timeline_yet_another = Timeline::new_sequence("yet_another");
421
422 let chunk = Chunk::builder("entity_a")
423 .with_component_batch(
424 RowId::new(),
425 TimePoint::from_iter([
426 (timeline_frame, 42), (timeline_other, 666), (timeline_yet_another, 1), ]),
430 (MyIndex::partial_descriptor(), &MyIndex::from_iter(0..10)),
431 )
432 .build()?;
433
434 expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
435
436 let chunk = {
437 let num_instances = 3;
438 let points: Vec<_> = (0..num_instances)
439 .map(|i| MyPoint::new(0.0, i as f32))
440 .collect();
441 let colors = vec![MyColor::from(0xFF0000FF)];
442 Chunk::builder("entity_b")
443 .with_component_batches(
444 RowId::new(),
445 TimePoint::from_iter([
446 (timeline_frame, 42), (timeline_yet_another, 1), ]),
449 [
450 (MyPoints::descriptor_points(), &points as _),
451 (MyPoints::descriptor_colors(), &colors as _),
452 ],
453 )
454 .build()?
455 };
456
457 expected_events.extend(store.insert_chunk(&Arc::new(chunk))?);
458
459 let chunk = {
460 let num_instances = 6;
461 let colors = vec![MyColor::from(0x00DD00FF); num_instances];
462 Chunk::builder("entity_b")
463 .with_component_batches(
464 RowId::new(),
465 TimePoint::default(),
466 [
467 (
468 MyIndex::partial_descriptor(),
469 &MyIndex::from_iter(0..num_instances as _) as _,
470 ),
471 (MyPoints::descriptor_colors(), &colors as _),
472 ],
473 )
474 .build()?
475 };
476
477 expected_events.extend(store1.insert_chunk(&Arc::new(chunk))?);
478
479 expected_events.extend(store1.gc(&GarbageCollectionOptions::gc_everything()).0);
480 expected_events.extend(store.gc(&GarbageCollectionOptions::gc_everything()).0);
481
482 ChunkStore::with_subscriber::<AllEvents, _, _>(view_handle, |got| {
483 similar_asserts::assert_eq!(expected_events.len(), got.events.len());
484 similar_asserts::assert_eq!(expected_events, got.events);
485 });
486
487 Ok(())
488 }
489}