re_query/cache.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    sync::Arc,
4};
5
6use ahash::HashMap;
7use nohash_hasher::IntSet;
8use parking_lot::RwLock;
9
10use re_chunk::ChunkId;
11use re_chunk_store::{
12    ChunkCompactionReport, ChunkStoreDiff, ChunkStoreEvent, ChunkStoreHandle, ChunkStoreSubscriber,
13};
14use re_log_types::{AbsoluteTimeRange, EntityPath, StoreId, TimeInt, TimelineName};
15use re_types_core::{ComponentDescriptor, archetypes};
16
17use crate::{LatestAtCache, RangeCache};
18
19// ---
20
/// Uniquely identifies cached query results in the [`QueryCache`].
///
/// A cache entry is scoped to a single (entity, timeline, component) triplet:
/// queries touching the same triplet share the same underlying cache.
//
// NOTE: field order matters — the derived `PartialOrd`/`Ord` compare lexicographically
// in declaration order (entity path first, then timeline, then component).
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct QueryCacheKey {
    pub entity_path: EntityPath,
    pub timeline_name: TimelineName,
    pub component_descr: ComponentDescriptor,
}
28
29impl re_byte_size::SizeBytes for QueryCacheKey {
30    #[inline]
31    fn heap_size_bytes(&self) -> u64 {
32        let Self {
33            entity_path,
34            timeline_name: timeline,
35            component_descr,
36        } = self;
37        entity_path.heap_size_bytes()
38            + timeline.heap_size_bytes()
39            + component_descr.heap_size_bytes()
40    }
41}
42
43impl std::fmt::Debug for QueryCacheKey {
44    #[inline]
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        let Self {
47            entity_path,
48            timeline_name: timeline,
49            component_descr,
50        } = self;
51        f.write_fmt(format_args!(
52            "{entity_path}:{component_descr} on '{timeline}'"
53        ))
54    }
55}
56
57impl QueryCacheKey {
58    #[inline]
59    pub fn new(
60        entity_path: impl Into<EntityPath>,
61        timeline: impl Into<TimelineName>,
62        component_descr: ComponentDescriptor,
63    ) -> Self {
64        Self {
65            entity_path: entity_path.into(),
66            timeline_name: timeline.into(),
67            component_descr,
68        }
69    }
70}
71
/// A ref-counted, inner-mutable handle to a [`QueryCache`].
///
/// Cheap to clone.
///
/// It is possible to grab the lock behind this handle while _maintaining a static lifetime_, see:
/// * [`QueryCacheHandle::read_arc`]
/// * [`QueryCacheHandle::write_arc`]
#[derive(Clone)]
pub struct QueryCacheHandle(Arc<parking_lot::RwLock<QueryCache>>);
81
82impl QueryCacheHandle {
83    #[inline]
84    pub fn new(cache: QueryCache) -> Self {
85        Self(Arc::new(parking_lot::RwLock::new(cache)))
86    }
87
88    #[inline]
89    pub fn into_inner(self) -> Arc<parking_lot::RwLock<QueryCache>> {
90        self.0
91    }
92}
93
impl QueryCacheHandle {
    /// Acquires a shared read lock, blocking until available.
    ///
    /// Recursive: a thread that already holds a read lock can re-acquire one
    /// without deadlocking against a queued writer.
    #[inline]
    pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, QueryCache> {
        self.0.read_recursive()
    }

    /// Non-blocking variant of [`Self::read`]; `None` if the lock cannot be
    /// taken right now.
    #[inline]
    pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<'_, QueryCache>> {
        self.0.try_read_recursive()
    }

    /// Acquires the exclusive write lock, blocking until available.
    #[inline]
    pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, QueryCache> {
        self.0.write()
    }

    /// Non-blocking variant of [`Self::write`]; `None` if the lock cannot be
    /// taken right now.
    #[inline]
    pub fn try_write(&self) -> Option<parking_lot::RwLockWriteGuard<'_, QueryCache>> {
        self.0.try_write()
    }

    /// Like [`Self::read`], but the guard co-owns the `Arc` and therefore has
    /// a `'static` lifetime.
    #[inline]
    pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, QueryCache> {
        parking_lot::RwLock::read_arc_recursive(&self.0)
    }

    /// Non-blocking variant of [`Self::read_arc`].
    #[inline]
    pub fn try_read_arc(
        &self,
    ) -> Option<parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, QueryCache>> {
        parking_lot::RwLock::try_read_recursive_arc(&self.0)
    }

    /// Like [`Self::write`], but the guard co-owns the `Arc` and therefore has
    /// a `'static` lifetime.
    #[inline]
    pub fn write_arc(
        &self,
    ) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, QueryCache> {
        parking_lot::RwLock::write_arc(&self.0)
    }

    /// Non-blocking variant of [`Self::write_arc`].
    #[inline]
    pub fn try_write_arc(
        &self,
    ) -> Option<parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, QueryCache>> {
        parking_lot::RwLock::try_write_arc(&self.0)
    }
}
141
/// Caches the results of latest-at and range queries against a chunk store,
/// invalidating itself incrementally via store events.
pub struct QueryCache {
    /// Handle to the associated [`ChunkStoreHandle`].
    pub(crate) store: ChunkStoreHandle,

    /// The [`StoreId`] of the associated [`ChunkStoreHandle`].
    pub(crate) store_id: StoreId,

    /// Keeps track of which entities have had any `Clear`-related data on any timeline at any
    /// point in time.
    ///
    /// This is used to optimize read-time clears, so that we don't unnecessarily pay for the fixed
    /// overhead of all the query layers when we know for a fact that there won't be any data there.
    /// This is a huge performance improvement in practice, especially in recordings with many entities.
    pub(crate) might_require_clearing: RwLock<IntSet<EntityPath>>,

    /// Cached latest-at results, one sub-cache per (entity, timeline, component) key.
    // NOTE: `Arc` so we can cheaply free the top-level lock early when needed.
    pub(crate) latest_at_per_cache_key: RwLock<HashMap<QueryCacheKey, Arc<RwLock<LatestAtCache>>>>,

    /// Cached range results, one sub-cache per (entity, timeline, component) key.
    // NOTE: `Arc` so we can cheaply free the top-level lock early when needed.
    pub(crate) range_per_cache_key: RwLock<HashMap<QueryCacheKey, Arc<RwLock<RangeCache>>>>,
}
163
impl std::fmt::Debug for QueryCache {
    /// Dumps a human-readable summary of the whole cache: the clear-tracked
    /// entities, then every latest-at sub-cache, then every range sub-cache.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field without printing it here
        // becomes a compile error.
        let Self {
            store_id,
            store,
            might_require_clearing,
            latest_at_per_cache_key,
            range_per_cache_key,
        } = self;

        let mut strings = Vec::new();

        strings.push(format!(
            "[Entities that must be checked for clears @ {store_id:?}]\n"
        ));
        {
            // Collect into a `BTreeSet` so the output is deterministically sorted.
            let sorted: BTreeSet<EntityPath> =
                might_require_clearing.read().iter().cloned().collect();
            for entity_path in sorted {
                strings.push(format!("  * {entity_path}\n"));
            }
            strings.push("\n".to_owned());
        }

        strings.push(format!("[LatestAt @ {store_id:?}]"));
        {
            let latest_at_per_cache_key = latest_at_per_cache_key.read();
            // Re-collect into a `BTreeMap` so entries print in key order.
            let latest_at_per_cache_key: BTreeMap<_, _> = latest_at_per_cache_key.iter().collect();

            for (cache_key, cache) in &latest_at_per_cache_key {
                let cache = cache.read();
                strings.push(format!(
                    "  [{cache_key:?} (pending_invalidation_min={:?})]",
                    cache.pending_invalidations.first().map(|&t| {
                        // Pretty-print the invalidated range using the timeline's
                        // time type when the store knows it; fall back to raw debug.
                        let range = AbsoluteTimeRange::new(t, TimeInt::MAX);
                        if let Some(time_type) =
                            store.read().time_column_type(&cache_key.timeline_name)
                        {
                            time_type.format_range_utc(range)
                        } else {
                            format!("{range:?}")
                        }
                    })
                ));
                strings.push(indent::indent_all_by(4, format!("{cache:?}")));
            }
        }

        strings.push(format!("[Range @ {store_id:?}]"));
        {
            let range_per_cache_key = range_per_cache_key.read();
            // Re-collect into a `BTreeMap` so entries print in key order.
            let range_per_cache_key: BTreeMap<_, _> = range_per_cache_key.iter().collect();

            for (cache_key, cache) in &range_per_cache_key {
                let cache = cache.read();
                strings.push(format!(
                    "  [{cache_key:?} (pending_invalidations={:?})]",
                    cache.pending_invalidations,
                ));
                strings.push(indent::indent_all_by(4, format!("{cache:?}")));
            }
        }

        // Collapse the double newlines introduced by the section separators.
        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
    }
}
230
231impl QueryCache {
232    #[inline]
233    pub fn new(store: ChunkStoreHandle) -> Self {
234        let store_id = store.read().id();
235        Self {
236            store,
237            store_id,
238            might_require_clearing: Default::default(),
239            latest_at_per_cache_key: Default::default(),
240            range_per_cache_key: Default::default(),
241        }
242    }
243
244    #[inline]
245    pub fn new_handle(store: ChunkStoreHandle) -> QueryCacheHandle {
246        QueryCacheHandle::new(Self::new(store))
247    }
248
249    #[inline]
250    pub fn clear(&self) {
251        let Self {
252            store: _,
253            store_id: _,
254            might_require_clearing,
255            latest_at_per_cache_key,
256            range_per_cache_key,
257        } = self;
258
259        might_require_clearing.write().clear();
260        latest_at_per_cache_key.write().clear();
261        range_per_cache_key.write().clear();
262    }
263}
264
impl ChunkStoreSubscriber for QueryCache {
    #[inline]
    fn name(&self) -> String {
        "rerun.store_subscribers.QueryCache".into()
    }

    #[inline]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    #[inline]
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    /// Flags cached query results affected by the incoming store events for
    /// later invalidation.
    ///
    /// Two phases:
    /// 1. Compact all events into per-cache-key summaries (no cache locks taken).
    /// 2. Apply the summaries to the sub-caches' pending-invalidation sets while
    ///    holding the top-level locks.
    ///
    /// Panics if an event comes from a different store than the one this cache
    /// was built for.
    fn on_events(&mut self, events: &[ChunkStoreEvent]) {
        re_tracing::profile_function!(format!("num_events={}", events.len()));

        /// Per-key aggregation of everything the incoming events invalidate.
        #[derive(Default, Debug)]
        struct CompactedEvents {
            // Static chunks to drop, per (entity, component).
            static_: HashMap<(EntityPath, ComponentDescriptor), BTreeSet<ChunkId>>,
            // Smallest affected data time, per latest-at cache key.
            temporal_latest_at: HashMap<QueryCacheKey, TimeInt>,
            // Temporal chunks to drop, per range cache key.
            temporal_range: HashMap<QueryCacheKey, BTreeSet<ChunkId>>,
        }

        let mut compacted_events = CompactedEvents::default();

        for event in events {
            let ChunkStoreEvent {
                store_id,
                store_generation: _,
                event_id: _,
                diff,
            } = event;

            assert!(
                self.store_id == *store_id,
                "attempted to use a query cache {:?} with the wrong datastore ({:?})",
                self.store_id,
                store_id,
            );

            let ChunkStoreDiff {
                kind: _, // Don't care: both additions and deletions invalidate query results.
                chunk,
                compacted,
            } = diff;

            {
                re_tracing::profile_scope!("compact events");

                // Static chunks: record the affected chunk ids per (entity, component).
                if chunk.is_static() {
                    for component_descr in chunk.component_descriptors() {
                        let compacted_events = compacted_events
                            .static_
                            .entry((chunk.entity_path().clone(), component_descr))
                            .or_default();

                        compacted_events.insert(chunk.id());
                        // If a compaction was triggered, make sure to drop the original chunks too.
                        compacted_events.extend(compacted.iter().flat_map(
                            |ChunkCompactionReport {
                                 srcs: compacted_chunks,
                                 new_chunk: _,
                             }| compacted_chunks.keys().copied(),
                        ));
                    }
                }

                // Temporal chunks: one cache key per (timeline, component) the chunk covers.
                for (timeline, per_component) in chunk.time_range_per_component() {
                    for (component_desc, time_range) in per_component {
                        let key = QueryCacheKey::new(
                            chunk.entity_path().clone(),
                            timeline,
                            component_desc,
                        );

                        // latest-at
                        {
                            let mut data_time_min = time_range.min();

                            // If a compaction was triggered, make sure to drop the original chunks too.
                            if let Some(ChunkCompactionReport {
                                srcs: compacted_chunks,
                                new_chunk: _,
                            }) = compacted
                            {
                                for chunk in compacted_chunks.values() {
                                    // A source chunk with no data for this (timeline, component)
                                    // maps to `TimeInt::MAX`, i.e. it never lowers the minimum.
                                    let data_time_compacted = chunk
                                        .time_range_per_component()
                                        .get(&timeline)
                                        .and_then(|per_component| {
                                            per_component.get(&key.component_descr)
                                        })
                                        .map_or(TimeInt::MAX, |time_range| time_range.min());

                                    data_time_min =
                                        TimeInt::min(data_time_min, data_time_compacted);
                                }
                            }

                            // Keep only the smallest affected time per key: latest-at
                            // invalidation works from that time onward.
                            compacted_events
                                .temporal_latest_at
                                .entry(key.clone())
                                .and_modify(|time| *time = TimeInt::min(*time, data_time_min))
                                .or_insert(data_time_min);
                        }

                        // range
                        {
                            let compacted_events =
                                compacted_events.temporal_range.entry(key).or_default();

                            compacted_events.insert(chunk.id());
                            // If a compaction was triggered, make sure to drop the original chunks too.
                            compacted_events.extend(compacted.iter().flat_map(
                                |ChunkCompactionReport {
                                     srcs: compacted_chunks,
                                     new_chunk: _,
                                 }| {
                                    compacted_chunks.keys().copied()
                                },
                            ));
                        }
                    }
                }
            }
        }

        let mut might_require_clearing = self.might_require_clearing.write();
        let caches_latest_at = self.latest_at_per_cache_key.write();
        let caches_range = self.range_per_cache_key.write();
        // NOTE: Don't release the top-level locks -- even though this cannot happen yet with
        // our current macro-architecture, we want to prevent queries from concurrently
        // running while we're updating the invalidation flags.

        {
            re_tracing::profile_scope!("static");

            // TODO(cmc): This is horribly stupid and slow and can easily be made faster by adding
            // yet another layer of caching indirection.
            // But since this pretty much never happens in practice, let's not go there until we
            // have metrics that show we need to.
            for ((entity_path, component_descr), chunk_ids) in compacted_events.static_ {
                if component_descr == archetypes::Clear::descriptor_is_recursive() {
                    might_require_clearing.insert(entity_path.clone());
                }

                // Linear scan: flag every latest-at sub-cache matching this
                // (entity, component), regardless of timeline.
                for (key, cache) in caches_latest_at.iter() {
                    if key.entity_path == entity_path && key.component_descr == component_descr {
                        cache.write().pending_invalidations.insert(TimeInt::STATIC);
                    }
                }

                // Same linear scan for the range sub-caches, tracked per chunk id.
                for (key, cache) in caches_range.iter() {
                    if key.entity_path == entity_path && key.component_descr == component_descr {
                        cache
                            .write()
                            .pending_invalidations
                            .extend(chunk_ids.iter().copied());
                    }
                }
            }
        }

        {
            re_tracing::profile_scope!("temporal");

            for (key, time) in compacted_events.temporal_latest_at {
                if key.component_descr == archetypes::Clear::descriptor_is_recursive() {
                    might_require_clearing.insert(key.entity_path.clone());
                }

                // Temporal keys match exactly, so a direct lookup suffices here.
                if let Some(cache) = caches_latest_at.get(&key) {
                    let mut cache = cache.write();
                    cache.pending_invalidations.insert(time);
                }
            }

            for (key, chunk_ids) in compacted_events.temporal_range {
                if let Some(cache) = caches_range.get(&key) {
                    cache
                        .write()
                        .pending_invalidations
                        .extend(chunk_ids.iter().copied());
                }
            }
        }
    }
}
455}