use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
};

use ahash::HashMap;
use nohash_hasher::IntSet;
use parking_lot::RwLock;

use re_chunk::ChunkId;
use re_chunk_store::{
    ChunkCompactionReport, ChunkStoreDiff, ChunkStoreEvent, ChunkStoreHandle, ChunkStoreSubscriber,
};
use re_log_types::{AbsoluteTimeRange, EntityPath, StoreId, TimeInt, TimelineName};
use re_types_core::{ComponentDescriptor, archetypes};

use crate::{LatestAtCache, RangeCache};

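/// Uniquely identifies a per-entity, per-timeline, per-component query cache.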
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct QueryCacheKey {
    pub entity_path: EntityPath,
    pub timeline_name: TimelineName,
    pub component_descr: ComponentDescriptor,
}

impl re_byte_size::SizeBytes for QueryCacheKey {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self {
            entity_path,
            timeline_name: timeline,
            component_descr,
        } = self;
        entity_path.heap_size_bytes()
            + timeline.heap_size_bytes()
            + component_descr.heap_size_bytes()
    }
}

impl std::fmt::Debug for QueryCacheKey {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            entity_path,
            timeline_name: timeline,
            component_descr,
        } = self;
        f.write_fmt(format_args!(
            "{entity_path}:{component_descr} on '{timeline}'"
        ))
    }
}

impl QueryCacheKey {
    #[inline]
    pub fn new(
        entity_path: impl Into<EntityPath>,
        timeline: impl Into<TimelineName>,
        component_descr: ComponentDescriptor,
    ) -> Self {
        Self {
            entity_path: entity_path.into(),
            timeline_name: timeline.into(),
            component_descr,
        }
    }
}

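/// A ref-counted, inner-mutable handle to a [`QueryCache`].
///
/// Cheap to clone.
///
/// All `read*` methods lock recursively, so it is safe to grab a read guard while the
/// current thread already holds one.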
#[derive(Clone)]
pub struct QueryCacheHandle(Arc<parking_lot::RwLock<QueryCache>>);

impl QueryCacheHandle {
    #[inline]
    pub fn new(cache: QueryCache) -> Self {
        Self(Arc::new(parking_lot::RwLock::new(cache)))
    }

    #[inline]
    pub fn into_inner(self) -> Arc<parking_lot::RwLock<QueryCache>> {
        self.0
    }
}

impl QueryCacheHandle {
    #[inline]
    pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, QueryCache> {
        self.0.read_recursive()
    }

    #[inline]
    pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<'_, QueryCache>> {
        self.0.try_read_recursive()
    }

    #[inline]
    pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, QueryCache> {
        self.0.write()
    }

    #[inline]
    pub fn try_write(&self) -> Option<parking_lot::RwLockWriteGuard<'_, QueryCache>> {
        self.0.try_write()
    }

    #[inline]
    pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, QueryCache> {
        parking_lot::RwLock::read_arc_recursive(&self.0)
    }

    #[inline]
    pub fn try_read_arc(
        &self,
    ) -> Option<parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, QueryCache>> {
        parking_lot::RwLock::try_read_recursive_arc(&self.0)
    }

    #[inline]
    pub fn write_arc(
        &self,
    ) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, QueryCache> {
        parking_lot::RwLock::write_arc(&self.0)
    }

    #[inline]
    pub fn try_write_arc(
        &self,
    ) -> Option<parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, QueryCache>> {
        parking_lot::RwLock::try_write_arc(&self.0)
    }
}

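/// A store subscriber that caches the results of latest-at and range queries, and
/// invalidates those results as new data comes in.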
pub struct QueryCache {
    pub(crate) store: ChunkStoreHandle,

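    /// The [`StoreId`] of the underlying store, cached for convenience.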
    pub(crate) store_id: StoreId,

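    /// Entities that have received `Clear`-related data at some point.
    ///
    /// Lets the query path skip the clear pass entirely for entities that never
    /// appear in this set.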
    pub(crate) might_require_clearing: RwLock<IntSet<EntityPath>>,

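    // NOTE: `Arc<RwLock<…>>` values so that individual caches can be locked and shared
    // without holding the top-level map lock.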
    pub(crate) latest_at_per_cache_key: RwLock<HashMap<QueryCacheKey, Arc<RwLock<LatestAtCache>>>>,

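    // Same `Arc<RwLock<…>>` rationale as for the latest-at caches above.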
    pub(crate) range_per_cache_key: RwLock<HashMap<QueryCacheKey, Arc<RwLock<RangeCache>>>>,
}

impl std::fmt::Debug for QueryCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            store_id,
            store,
            might_require_clearing,
            latest_at_per_cache_key,
            range_per_cache_key,
        } = self;

        let mut strings = Vec::new();

        strings.push(format!(
            "[Entities that must be checked for clears @ {store_id:?}]\n"
        ));
        {
            let sorted: BTreeSet<EntityPath> =
                might_require_clearing.read().iter().cloned().collect();
            for entity_path in sorted {
                strings.push(format!(" * {entity_path}\n"));
            }
            strings.push("\n".to_owned());
        }

        strings.push(format!("[LatestAt @ {store_id:?}]"));
        {
            let latest_at_per_cache_key = latest_at_per_cache_key.read();
            let latest_at_per_cache_key: BTreeMap<_, _> = latest_at_per_cache_key.iter().collect();

            for (cache_key, cache) in &latest_at_per_cache_key {
                let cache = cache.read();
                strings.push(format!(
                    " [{cache_key:?} (pending_invalidation_min={:?})]",
                    cache.pending_invalidations.first().map(|&t| {
                        let range = AbsoluteTimeRange::new(t, TimeInt::MAX);
                        if let Some(time_type) =
                            store.read().time_column_type(&cache_key.timeline_name)
                        {
                            time_type.format_range_utc(range)
                        } else {
                            format!("{range:?}")
                        }
                    })
                ));
                strings.push(indent::indent_all_by(4, format!("{cache:?}")));
            }
        }

        strings.push(format!("[Range @ {store_id:?}]"));
        {
            let range_per_cache_key = range_per_cache_key.read();
            let range_per_cache_key: BTreeMap<_, _> = range_per_cache_key.iter().collect();

            for (cache_key, cache) in &range_per_cache_key {
                let cache = cache.read();
                strings.push(format!(
                    " [{cache_key:?} (pending_invalidations={:?})]",
                    cache.pending_invalidations,
                ));
                strings.push(indent::indent_all_by(4, format!("{cache:?}")));
            }
        }

        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
    }
}

impl QueryCache {
    #[inline]
    pub fn new(store: ChunkStoreHandle) -> Self {
        let store_id = store.read().id();
        Self {
            store,
            store_id,
            might_require_clearing: Default::default(),
            latest_at_per_cache_key: Default::default(),
            range_per_cache_key: Default::default(),
        }
    }

    #[inline]
    pub fn new_handle(store: ChunkStoreHandle) -> QueryCacheHandle {
        QueryCacheHandle::new(Self::new(store))
    }

    #[inline]
    pub fn clear(&self) {
        let Self {
            store: _,
            store_id: _,
            might_require_clearing,
            latest_at_per_cache_key,
            range_per_cache_key,
        } = self;

        might_require_clearing.write().clear();
        latest_at_per_cache_key.write().clear();
        range_per_cache_key.write().clear();
    }
}

impl ChunkStoreSubscriber for QueryCache {
    #[inline]
    fn name(&self) -> String {
        "rerun.store_subscribers.QueryCache".into()
    }

    #[inline]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    #[inline]
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    fn on_events(&mut self, events: &[ChunkStoreEvent]) {
        re_tracing::profile_function!(format!("num_events={}", events.len()));

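        // First pass: aggregate all the invalidation information from the incoming
        // events, without touching any of the cache locks yet.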
        #[derive(Default, Debug)]
        struct CompactedEvents {
            static_: HashMap<(EntityPath, ComponentDescriptor), BTreeSet<ChunkId>>,
            temporal_latest_at: HashMap<QueryCacheKey, TimeInt>,
            temporal_range: HashMap<QueryCacheKey, BTreeSet<ChunkId>>,
        }

        let mut compacted_events = CompactedEvents::default();

        for event in events {
            let ChunkStoreEvent {
                store_id,
                store_generation: _,
                event_id: _,
                diff,
            } = event;

            assert!(
                self.store_id == *store_id,
                "attempted to use a query cache {:?} with the wrong datastore ({:?})",
                self.store_id,
                store_id,
            );

            let ChunkStoreDiff {
                kind: _,
                chunk,
                compacted,
            } = diff;

            {
                re_tracing::profile_scope!("compact events");

                if chunk.is_static() {
                    for component_descr in chunk.component_descriptors() {
                        let compacted_events = compacted_events
                            .static_
                            .entry((chunk.entity_path().clone(), component_descr))
                            .or_default();

                        compacted_events.insert(chunk.id());
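                        // Chunks that were compacted away must be invalidated as well.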
                        compacted_events.extend(compacted.iter().flat_map(
                            |ChunkCompactionReport {
                                 srcs: compacted_chunks,
                                 new_chunk: _,
                             }| compacted_chunks.keys().copied(),
                        ));
                    }
                }

                for (timeline, per_component) in chunk.time_range_per_component() {
                    for (component_desc, time_range) in per_component {
                        let key = QueryCacheKey::new(
                            chunk.entity_path().clone(),
                            timeline,
                            component_desc,
                        );

                        {
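                            // Latest-at: invalidation is per data time, so track the
                            // earliest data time affected by the new chunk and its
                            // compaction sources.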
                            let mut data_time_min = time_range.min();

                            if let Some(ChunkCompactionReport {
                                srcs: compacted_chunks,
                                new_chunk: _,
                            }) = compacted
                            {
                                for chunk in compacted_chunks.values() {
                                    let data_time_compacted = chunk
                                        .time_range_per_component()
                                        .get(&timeline)
                                        .and_then(|per_component| {
                                            per_component.get(&key.component_descr)
                                        })
                                        .map_or(TimeInt::MAX, |time_range| time_range.min());

                                    data_time_min =
                                        TimeInt::min(data_time_min, data_time_compacted);
                                }
                            }

                            compacted_events
                                .temporal_latest_at
                                .entry(key.clone())
                                .and_modify(|time| *time = TimeInt::min(*time, data_time_min))
                                .or_insert(data_time_min);
                        }

                        {
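                            // Range: invalidation is per chunk, so collect the IDs of the
                            // new chunk and of every chunk it compacted away.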
                            let compacted_events =
                                compacted_events.temporal_range.entry(key).or_default();

                            compacted_events.insert(chunk.id());
                            compacted_events.extend(compacted.iter().flat_map(
                                |ChunkCompactionReport {
                                     srcs: compacted_chunks,
                                     new_chunk: _,
                                 }| {
                                    compacted_chunks.keys().copied()
                                },
                            ));
                        }
                    }
                }
            }
        }

        let mut might_require_clearing = self.might_require_clearing.write();
        let caches_latest_at = self.latest_at_per_cache_key.write();
        let caches_range = self.range_per_cache_key.write();
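        // Keep all three write locks until the end of this function, so queries cannot
        // observe a partially-invalidated cache.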
        {
            re_tracing::profile_scope!("static");

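            // Static chunks shadow all temporal data for their entity/component pair, so
            // every cache for that pair must be invalidated, regardless of timeline.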
            for ((entity_path, component_descr), chunk_ids) in compacted_events.static_ {
                if component_descr == archetypes::Clear::descriptor_is_recursive() {
                    might_require_clearing.insert(entity_path.clone());
                }

                for (key, cache) in caches_latest_at.iter() {
                    if key.entity_path == entity_path && key.component_descr == component_descr {
                        cache.write().pending_invalidations.insert(TimeInt::STATIC);
                    }
                }

                for (key, cache) in caches_range.iter() {
                    if key.entity_path == entity_path && key.component_descr == component_descr {
                        cache
                            .write()
                            .pending_invalidations
                            .extend(chunk_ids.iter().copied());
                    }
                }
            }
        }

        {
            re_tracing::profile_scope!("temporal");

            for (key, time) in compacted_events.temporal_latest_at {
                if key.component_descr == archetypes::Clear::descriptor_is_recursive() {
                    might_require_clearing.insert(key.entity_path.clone());
                }

                if let Some(cache) = caches_latest_at.get(&key) {
                    let mut cache = cache.write();
                    cache.pending_invalidations.insert(time);
                }
            }

            for (key, chunk_ids) in compacted_events.temporal_range {
                if let Some(cache) = caches_range.get(&key) {
                    cache
                        .write()
                        .pending_invalidations
                        .extend(chunk_ids.iter().copied());
                }
            }
        }
    }
}