re_query_cache/
cache.rs

1use std::{
2    collections::{BTreeMap, VecDeque},
3    ops::Range,
4    sync::Arc,
5};
6
7use ahash::{HashMap, HashSet};
8use itertools::Itertools;
9use parking_lot::RwLock;
10use paste::paste;
11use seq_macro::seq;
12
13use re_data_store::{DataStore, LatestAtQuery, RangeQuery, StoreDiff, StoreEvent, StoreSubscriber};
14use re_log_types::{EntityPath, RowId, StoreId, TimeInt, TimeRange, Timeline};
15use re_query::ArchetypeView;
16use re_types_core::{
17    components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName, SizeBytes as _,
18};
19
20use crate::{ErasedFlatVecDeque, FlatVecDeque, LatestAtCache, RangeCache};
21
22// ---
23
24#[derive(Debug, Clone, PartialEq, Eq, Hash)]
25pub enum AnyQuery {
26    LatestAt(LatestAtQuery),
27    Range(RangeQuery),
28}
29
30impl From<LatestAtQuery> for AnyQuery {
31    #[inline]
32    fn from(query: LatestAtQuery) -> Self {
33        Self::LatestAt(query)
34    }
35}
36
37impl From<RangeQuery> for AnyQuery {
38    #[inline]
39    fn from(query: RangeQuery) -> Self {
40        Self::Range(query)
41    }
42}
43
44// ---
45
46/// Maintains the top-level cache mappings.
47pub struct Caches {
48    /// The [`StoreId`] of the associated [`DataStore`].
49    store_id: StoreId,
50
51    // NOTE: `Arc` so we can cheaply free the top-level lock early when needed.
52    per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<CachesPerArchetype>>>>,
53}
54
55impl std::fmt::Debug for Caches {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        let Self {
58            store_id,
59            per_cache_key,
60        } = self;
61
62        let mut strings = Vec::new();
63
64        strings.push(format!("[Caches({store_id})]"));
65
66        let per_cache_key = per_cache_key.read();
67        let per_cache_key: BTreeMap<_, _> = per_cache_key.iter().collect();
68
69        for (cache_key, caches_per_archetype) in &per_cache_key {
70            let caches_per_archetype = caches_per_archetype.read();
71            strings.push(format!(
72                "  [{cache_key:?} (pending_timeful={:?} pending_timeless={:?})]",
73                caches_per_archetype
74                    .pending_timeful_invalidation
75                    .map(|t| cache_key
76                        .timeline
77                        .format_time_range_utc(&TimeRange::new(t, TimeInt::MAX))),
78                caches_per_archetype.pending_timeless_invalidation,
79            ));
80            strings.push(indent::indent_all_by(
81                4,
82                format!("{caches_per_archetype:?}"),
83            ));
84        }
85
86        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
87    }
88}
89
90impl std::ops::Deref for Caches {
91    type Target = RwLock<HashMap<CacheKey, Arc<RwLock<CachesPerArchetype>>>>;
92
93    #[inline]
94    fn deref(&self) -> &Self::Target {
95        &self.per_cache_key
96    }
97}
98
99impl Caches {
100    #[inline]
101    pub fn new(store: &DataStore) -> Self {
102        Self {
103            store_id: store.id().clone(),
104            per_cache_key: Default::default(),
105        }
106    }
107}
108
109#[derive(Default)]
110pub struct CachesPerArchetype {
111    /// Which [`Archetype`] are we querying for?
112    ///
113    /// This is very important because of our data model: we not only query for components, but we
114    /// query for components from a specific point-of-view (the so-called primary component).
115    /// Different archetypes have different point-of-views, and therefore can end up with different
116    /// results, even from the same raw data.
117    //
118    // NOTE: `Arc` so we can cheaply free the archetype-level lock early when needed.
119    //
120    // TODO(cmc): At some point we should probably just store the PoV and optional components rather
121    // than an `ArchetypeName`: the query system doesn't care about archetypes.
122    pub(crate) latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,
123
124    /// Which [`Archetype`] are we querying for?
125    ///
126    /// This is very important because of our data model: we not only query for components, but we
127    /// query for components from a specific point-of-view (the so-called primary component).
128    /// Different archetypes have different point-of-views, and therefore can end up with different
129    /// results, even from the same raw data.
130    //
131    // NOTE: `Arc` so we can cheaply free the archetype-level lock early when needed.
132    //
133    // TODO(cmc): At some point we should probably just store the PoV and optional components rather
134    // than an `ArchetypeName`: the query system doesn't care about archetypes.
135    pub(crate) range_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<RangeCache>>>>,
136
137    /// Everything greater than or equal to this timestamp has been asynchronously invalidated.
138    ///
139    /// The next time this cache gets queried, it must remove any entry matching this criteria.
140    /// `None` indicates that there's no pending invalidation.
141    ///
142    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
143    /// time effectively behaves as a natural micro-batching mechanism.
144    pending_timeful_invalidation: Option<TimeInt>,
145
146    /// If `true`, the timeless data associated with this cache has been asynchronously invalidated.
147    ///
148    /// If `true`, this cache must remove all of its timeless entries the next time it gets queried.
149    /// `false` indicates that there's no pending invalidation.
150    ///
151    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
152    /// time effectively behaves as a natural micro-batching mechanism.
153    pending_timeless_invalidation: bool,
154}
155
156impl std::fmt::Debug for CachesPerArchetype {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        let CachesPerArchetype {
159            latest_at_per_archetype,
160            range_per_archetype,
161            pending_timeful_invalidation: _,
162            pending_timeless_invalidation: _,
163        } = self;
164
165        let mut strings = Vec::new();
166
167        {
168            let latest_at_per_archetype = latest_at_per_archetype.read();
169            let latest_at_per_archetype: BTreeMap<_, _> = latest_at_per_archetype.iter().collect();
170
171            for (archetype_name, latest_at_cache) in &latest_at_per_archetype {
172                let latest_at_cache = latest_at_cache.read();
173                strings.push(format!(
174                    "[latest_at for {archetype_name} ({})]",
175                    re_format::format_bytes(latest_at_cache.total_size_bytes() as _)
176                ));
177                strings.push(indent::indent_all_by(2, format!("{latest_at_cache:?}")));
178            }
179        }
180
181        {
182            let range_per_archetype = range_per_archetype.read();
183            let range_per_archetype: BTreeMap<_, _> = range_per_archetype.iter().collect();
184
185            for (archetype_name, range_cache) in &range_per_archetype {
186                let range_cache = range_cache.read();
187                strings.push(format!(
188                    "[range for {archetype_name} ({})]",
189                    re_format::format_bytes(range_cache.total_size_bytes() as _)
190                ));
191                strings.push(indent::indent_all_by(2, format!("{range_cache:?}")));
192            }
193        }
194
195        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
196    }
197}
198
199impl Caches {
200    /// Clears all caches.
201    #[inline]
202    pub fn clear(&self) {
203        self.write().clear();
204    }
205
206    /// Gives access to the appropriate `LatestAtCache` according to the specified
207    /// query parameters.
208    ///
209    /// `upsert` is a user-defined callback that will be run first, with full mutable access to the cache.
210    /// `iter` is a user-defined callback that will be run last, with shared access.
211    ///
212    /// These callback semantics allow for reentrancy: you can use the same cache from multiple
213    /// query contexts (i.e. space views), even in a work-stealing environment.
214    #[inline]
215    pub fn with_latest_at<A, F1, F2, R1, R2>(
216        &self,
217        store: &DataStore,
218        entity_path: EntityPath,
219        query: &LatestAtQuery,
220        mut upsert: F1,
221        mut iter: F2,
222    ) -> (Option<R1>, R2)
223    where
224        A: Archetype,
225        F1: FnMut(&mut LatestAtCache) -> R1,
226        F2: FnMut(&LatestAtCache) -> R2,
227    {
228        assert!(
229            self.store_id == *store.id(),
230            "attempted to use a query cache {} with the wrong datastore ({})",
231            self.store_id,
232            store.id(),
233        );
234
235        let key = CacheKey::new(entity_path, query.timeline);
236
237        let cache = {
238            let caches_per_archetype = Arc::clone(self.write().entry(key.clone()).or_default());
239            // Implicitly releasing top-level cache mappings -- concurrent queries can run once again.
240
241            let removed_bytes = caches_per_archetype.write().handle_pending_invalidation();
242            // Implicitly releasing archetype-level cache mappings -- concurrent queries using the
243            // same `CacheKey` but a different `ArchetypeName` can run once again.
244            if removed_bytes > 0 {
245                re_log::trace!(
246                    store_id=%self.store_id,
247                    entity_path = %key.entity_path,
248                    removed = removed_bytes,
249                    "invalidated latest-at caches"
250                );
251            }
252
253            let caches_per_archetype = caches_per_archetype.read();
254            let mut latest_at_per_archetype = caches_per_archetype.latest_at_per_archetype.write();
255            Arc::clone(latest_at_per_archetype.entry(A::name()).or_default())
256            // Implicitly releasing bottom-level cache mappings -- identical concurrent queries
257            // can run once again.
258        };
259
260        // # Multithreading semantics
261        //
262        // There is only one situation where this `try_write()` might fail: there is another task that
263        // is already in the process of upserting that specific cache (e.g. a cloned space view).
264        //
265        // That task might be on the same thread (due to work-stealing), or a different one.
266        // Either way, we need to give up trying to upsert the cache in order to prevent a
267        // deadlock in case the other task is in fact running on the same thread.
268        //
269        // It's fine, though:
270        // - Best case scenario, the data we need is already cached.
271        // - Worst case scenario, the data is missing and we'll be missing some data for the current
272        //   frame.
273        //   It'll get cached at some point in an upcoming frame (statistically, we're bound to win
274        //   the race at some point).
275        //
276        // Data invalidation happens at the per-archetype cache layer, so this won't return
277        // out-of-date data in either scenario.
278        //
279        // There is a lot of complexity we could add to make this whole process more efficient:
280        // keep track of failed queries in a queue so we don't rely on probabilities, keep track
281        // of the thread-local reentrancy state to skip this logic when it's not needed, return raw
282        // data when the lock is busy and the data isn't already cached, etc.
283        //
284        // In the end, this is a edge-case inherent to our current "immediate query" model that we
285        // already know we want -- and have to -- move away from; the extra complexity isn't worth it.
286        let r1 = cache.try_write().map(|mut cache| upsert(&mut cache));
287        // Implicitly releasing the write lock -- if any.
288
289        // # Multithreading semantics
290        //
291        // We need the reentrant lock because query contexts (i.e. space views) generally run on a
292        // work-stealing thread-pool and might swap a task on one thread with another task on the
293        // same thread, where both tasks happen to query the same exact data (e.g. cloned space views).
294        //
295        // See comment above for more details.
296        let r2 = iter(&cache.read_recursive());
297
298        (r1, r2)
299    }
300
301    /// Gives access to the appropriate `RangeCache` according to the specified query parameters.
302    ///
303    /// `upsert` is a user-defined callback that will be run first, with full mutable access to the cache.
304    /// `iter` is a user-defined callback that will be run last, with shared access.
305    ///
306    /// These callback semantics allow for reentrancy: you can use the same cache from multiple
307    /// query contexts (i.e. space views), even in a work-stealing environment.
308    #[inline]
309    pub fn with_range<A, F1, F2, R1, R2>(
310        &self,
311        store: &DataStore,
312        entity_path: EntityPath,
313        query: &RangeQuery,
314        mut upsert: F1,
315        mut iter: F2,
316    ) -> (Option<R1>, R2)
317    where
318        A: Archetype,
319        F1: FnMut(&mut RangeCache) -> R1,
320        F2: FnMut(&RangeCache) -> R2,
321    {
322        assert!(
323            self.store_id == *store.id(),
324            "attempted to use a query cache {} with the wrong datastore ({})",
325            self.store_id,
326            store.id(),
327        );
328
329        let key = CacheKey::new(entity_path, query.timeline);
330
331        let cache = {
332            let caches_per_archetype = Arc::clone(self.write().entry(key.clone()).or_default());
333            // Implicitly releasing top-level cache mappings -- concurrent queries can run once again.
334
335            let removed_bytes = caches_per_archetype.write().handle_pending_invalidation();
336            // Implicitly releasing archetype-level cache mappings -- concurrent queries using the
337            // same `CacheKey` but a different `ArchetypeName` can run once again.
338            if removed_bytes > 0 {
339                re_log::trace!(
340                    store_id=%self.store_id,
341                    entity_path = %key.entity_path,
342                    removed = removed_bytes,
343                    "invalidated range caches"
344                );
345            }
346
347            let caches_per_archetype = caches_per_archetype.read();
348            let mut range_per_archetype = caches_per_archetype.range_per_archetype.write();
349            Arc::clone(range_per_archetype.entry(A::name()).or_default())
350            // Implicitly releasing bottom-level cache mappings -- identical concurrent queries
351            // can run once again.
352        };
353
354        // # Multithreading semantics
355        //
356        // There is only one situation where this `try_write()` might fail: there is another task that
357        // is already in the process of upserting that specific cache (e.g. a cloned space view).
358        //
359        // That task might be on the same thread (due to work-stealing), or a different one.
360        // Either way, we need to give up trying to upsert the cache in order to prevent a
361        // deadlock in case the other task is in fact running on the same thread.
362        //
363        // It's fine, though:
364        // - Best case scenario, the data we need is already cached.
365        // - Worst case scenario, the data is missing and we'll be missing some data for the current
366        //   frame.
367        //   It'll get cached at some point in an upcoming frame (statistically, we're bound to win
368        //   the race at some point).
369        //
370        // Data invalidation happens at the per-archetype cache layer, so this won't return
371        // out-of-date data in either scenario.
372        //
373        // There is a lot of complexity we could add to make this whole process more efficient:
374        // keep track of failed queries in a queue so we don't rely on probabilities, keep track
375        // of the thread-local reentrancy state to skip this logic when it's not needed, keep track
376        // of min-max timestamp values per entity so we can clamp range queries and thus know
377        // whether the data is already cached or not, etc.
378        //
379        // In the end, this is a edge-case inherent to our current "immediate query" model that we
380        // already know we want -- and have to -- move away from; the extra complexity isn't worth it.
381        let r1 = cache.try_write().map(|mut cache| upsert(&mut cache));
382        // Implicitly releasing the write lock -- if any.
383
384        // # Multithreading semantics
385        //
386        // We need the reentrant lock because query contexts (i.e. space views) generally run on a
387        // work-stealing thread-pool and might swap a task on one thread with another task on the
388        // same thread, where both tasks happen to query the same exact data (e.g. cloned space views).
389        //
390        // See comment above for more details.
391        let r2 = iter(&cache.read_recursive());
392
393        (r1, r2)
394    }
395}
396
397/// Uniquely identifies cached query results in the [`Caches`].
398#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
399pub struct CacheKey {
400    /// Which [`EntityPath`] is the query targeting?
401    pub entity_path: EntityPath,
402
403    /// Which [`Timeline`] is the query targeting?
404    pub timeline: Timeline,
405}
406
407impl std::fmt::Debug for CacheKey {
408    #[inline]
409    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410        let Self {
411            entity_path,
412            timeline,
413        } = self;
414        f.write_fmt(format_args!("{entity_path} on {}", timeline.name()))
415    }
416}
417
418impl CacheKey {
419    #[inline]
420    pub fn new(entity_path: impl Into<EntityPath>, timeline: impl Into<Timeline>) -> Self {
421        Self {
422            entity_path: entity_path.into(),
423            timeline: timeline.into(),
424        }
425    }
426}
427
428// --- Invalidation ---
429
430impl StoreSubscriber for Caches {
431    #[inline]
432    fn name(&self) -> String {
433        "rerun.store_subscribers.QueryCache".into()
434    }
435
436    #[inline]
437    fn as_any(&self) -> &dyn std::any::Any {
438        self
439    }
440
441    #[inline]
442    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
443        self
444    }
445
446    fn on_events(&mut self, events: &[StoreEvent]) {
447        re_tracing::profile_function!(format!("num_events={}", events.len()));
448
449        for event in events {
450            let StoreEvent {
451                store_id,
452                store_generation: _,
453                event_id: _,
454                diff,
455            } = event;
456
457            assert!(
458                self.store_id == *store_id,
459                "attempted to use a query cache {} with the wrong datastore ({})",
460                self.store_id,
461                store_id,
462            );
463
464            let StoreDiff {
465                kind: _, // Don't care: both additions and deletions invalidate query results.
466                row_id: _,
467                times,
468                entity_path,
469                cells: _, // Don't care: we invalidate at the entity level, not component level.
470            } = diff;
471
472            #[derive(Default, Debug)]
473            struct CompactedEvents {
474                timeless: HashSet<EntityPath>,
475                timeful: HashMap<CacheKey, TimeInt>,
476            }
477
478            let mut compacted = CompactedEvents::default();
479            {
480                re_tracing::profile_scope!("compact events");
481
482                if times.is_empty() {
483                    compacted.timeless.insert(entity_path.clone());
484                }
485
486                for &(timeline, time) in times {
487                    let key = CacheKey::new(entity_path.clone(), timeline);
488                    let min_time = compacted.timeful.entry(key).or_insert(TimeInt::MAX);
489                    *min_time = TimeInt::min(*min_time, time);
490                }
491            }
492
493            let caches = self.write();
494            // NOTE: Don't release the top-level lock -- even though this cannot happen yet with
495            // our current macro-architecture, we want to prevent queries from concurrently
496            // running while we're updating the invalidation flags.
497
498            // TODO(cmc): This is horribly stupid and slow and can easily be made faster by adding
499            // yet another layer of caching indirection.
500            // But since this pretty much never happens in practice, let's not go there until we
501            // have metrics showing that show we need to.
502            {
503                re_tracing::profile_scope!("timeless");
504
505                for entity_path in compacted.timeless {
506                    for (key, caches_per_archetype) in caches.iter() {
507                        if key.entity_path == entity_path {
508                            caches_per_archetype.write().pending_timeless_invalidation = true;
509                        }
510                    }
511                }
512            }
513
514            {
515                re_tracing::profile_scope!("timeful");
516
517                for (key, time) in compacted.timeful {
518                    if let Some(caches_per_archetype) = caches.get(&key) {
519                        // NOTE: Do _NOT_ lock from within the if clause itself or the guard will live
520                        // for the remainder of the if statement and hell will ensue.
521                        // <https://rust-lang.github.io/rust-clippy/master/#if_let_mutex> is
522                        // supposed to catch that but it doesn't, I don't know why.
523                        let mut caches_per_archetype = caches_per_archetype.write();
524                        if let Some(min_time) =
525                            caches_per_archetype.pending_timeful_invalidation.as_mut()
526                        {
527                            *min_time = TimeInt::min(*min_time, time);
528                        } else {
529                            caches_per_archetype.pending_timeful_invalidation = Some(time);
530                        }
531                    }
532                }
533            }
534        }
535    }
536}
537
538impl CachesPerArchetype {
539    /// Removes all entries from the cache that have been asynchronously invalidated.
540    ///
541    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
542    /// time effectively behaves as a natural micro-batching mechanism.
543    ///
544    /// Returns the number of bytes removed.
545    fn handle_pending_invalidation(&mut self) -> u64 {
546        let pending_timeless_invalidation = self.pending_timeless_invalidation;
547        let pending_timeful_invalidation = self.pending_timeful_invalidation.is_some();
548
549        if !pending_timeless_invalidation && !pending_timeful_invalidation {
550            return 0;
551        }
552
553        re_tracing::profile_function!();
554
555        let time_threshold = self.pending_timeful_invalidation.unwrap_or(TimeInt::MAX);
556
557        self.pending_timeful_invalidation = None;
558        self.pending_timeless_invalidation = false;
559
560        // Timeless being infinitely into the past, this effectively invalidates _everything_ with
561        // the current coarse-grained / archetype-level caching strategy.
562        if pending_timeless_invalidation {
563            re_tracing::profile_scope!("timeless");
564
565            let latest_at_removed_bytes = self
566                .latest_at_per_archetype
567                .read()
568                .values()
569                .map(|latest_at_cache| latest_at_cache.read().total_size_bytes())
570                .sum::<u64>();
571            let range_removed_bytes = self
572                .range_per_archetype
573                .read()
574                .values()
575                .map(|range_cache| range_cache.read().total_size_bytes())
576                .sum::<u64>();
577
578            *self = CachesPerArchetype::default();
579
580            return latest_at_removed_bytes + range_removed_bytes;
581        }
582
583        re_tracing::profile_scope!("timeful");
584
585        let mut removed_bytes = 0u64;
586
587        for latest_at_cache in self.latest_at_per_archetype.read().values() {
588            let mut latest_at_cache = latest_at_cache.write();
589            removed_bytes =
590                removed_bytes.saturating_add(latest_at_cache.truncate_at_time(time_threshold));
591        }
592
593        for range_cache in self.range_per_archetype.read().values() {
594            let mut range_cache = range_cache.write();
595            removed_bytes =
596                removed_bytes.saturating_add(range_cache.truncate_at_time(time_threshold));
597        }
598
599        removed_bytes
600    }
601}
602
603// ---
604
605/// Caches the results of any query for an arbitrary range of time.
606///
607/// This caches all the steps involved in getting data ready for space views:
608/// - index search,
609/// - instance key joining,
610/// - deserialization.
611///
612/// We share the `CacheBucket` implementation between all types of queries to avoid duplication of
613/// logic, especially for things that require metaprogramming, to keep the macro madness to a
614/// minimum.
615/// In the case of `LatestAt` queries, a `CacheBucket` will always contain a single timestamp worth
616/// of data.
617#[derive(Default)]
618pub struct CacheBucket {
619    /// The _data_ timestamps and [`RowId`]s of all cached rows.
620    ///
621    /// This corresponds to the data time and `RowId` returned by `re_query::query_archetype`.
622    ///
623    /// This is guaranteed to always be sorted and dense (i.e. there cannot be a hole in the cached
624    /// data, unless the raw data itself in the store has a hole at that particular point in time).
625    ///
626    /// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s.
627    ///
628    /// Invariant: `data_times.len() == pov_instance_keys.num_entries()`
629    pub(crate) data_times: VecDeque<(TimeInt, RowId)>,
630
631    /// The [`InstanceKey`]s of the point-of-view components.
632    ///
633    /// Invariant: `data_times.len() == pov_instance_keys.num_entries()`
634    pub(crate) pov_instance_keys: FlatVecDeque<InstanceKey>,
635
636    /// The resulting component data, pre-deserialized, pre-joined.
637    ///
638    /// All the contained FlatVecDeques have the same length as `data_times`.
639    //
640    // TODO(#4733): Don't denormalize auto-generated instance keys.
641    // TODO(#4734): Don't denormalize splatted values.
642    pub(crate) components: BTreeMap<ComponentName, Box<dyn ErasedFlatVecDeque + Send + Sync>>,
643
644    /// The total size in bytes stored in this bucket.
645    ///
646    /// Only used so we can decrement the global cache size when the last reference to a bucket
647    /// gets dropped.
648    pub(crate) total_size_bytes: u64,
649    //
650    // TODO(cmc): secondary cache
651}
652
653impl std::fmt::Debug for CacheBucket {
654    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
655        let Self {
656            data_times: _,
657            pov_instance_keys: _,
658            components,
659            total_size_bytes: _,
660        } = self;
661
662        let strings = components
663            .iter()
664            .filter(|(_, data)| data.dyn_num_values() > 0)
665            .map(|(name, data)| {
666                format!(
667                    "{} {name} values spread across {} entries ({})",
668                    data.dyn_num_values(),
669                    data.dyn_num_entries(),
670                    re_format::format_bytes(data.dyn_total_size_bytes() as _),
671                )
672            })
673            .collect_vec();
674
675        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
676    }
677}
678
679impl CacheBucket {
680    // Check invariants in debug builds
681    fn sanity_check(&self) {
682        if cfg!(debug_assertions) {
683            assert_eq!(self.data_times.len(), self.pov_instance_keys.num_entries());
684            let n = self.data_times.len();
685            for (name, data) in &self.components {
686                assert_eq!(data.dyn_num_entries(), n, "{name}");
687            }
688        }
689    }
690
691    #[inline]
692    pub fn time_range(&self) -> Option<TimeRange> {
693        let first_time = self.data_times.front().map(|(t, _)| *t)?;
694        let last_time = self.data_times.back().map(|(t, _)| *t)?;
695        Some(TimeRange::new(first_time, last_time))
696    }
697
698    #[inline]
699    pub fn contains_data_time(&self, data_time: TimeInt) -> bool {
700        let first_time = self.data_times.front().map_or(&TimeInt::MAX, |(t, _)| t);
701        let last_time = self.data_times.back().map_or(&TimeInt::MIN, |(t, _)| t);
702        *first_time <= data_time && data_time <= *last_time
703    }
704
705    #[inline]
706    pub fn contains_data_row(&self, data_time: TimeInt, row_id: RowId) -> bool {
707        self.data_times.binary_search(&(data_time, row_id)).is_ok()
708    }
709
710    /// How many timestamps' worth of data is stored in this bucket?
711    #[inline]
712    pub fn num_entries(&self) -> usize {
713        self.data_times.len()
714    }
715
716    #[inline]
717    pub fn is_empty(&self) -> bool {
718        self.num_entries() == 0
719    }
720
721    // ---
722
723    /// Iterate over the timestamps of the point-of-view components.
724    #[inline]
725    pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
726        self.data_times.iter()
727    }
728
729    /// Iterate over the [`InstanceKey`] batches of the point-of-view components.
730    #[inline]
731    pub fn iter_pov_instance_keys(&self) -> impl Iterator<Item = &[InstanceKey]> {
732        self.pov_instance_keys.iter()
733    }
734
735    /// Iterate over the batches of the specified non-optional component.
736    #[inline]
737    pub fn iter_component<C: Component + Send + Sync + 'static>(
738        &self,
739    ) -> Option<impl Iterator<Item = &[C]>> {
740        let data = self
741            .components
742            .get(&C::name())
743            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<C>>())?;
744        Some(data.iter())
745    }
746
747    /// Iterate over the batches of the specified optional component.
748    #[inline]
749    pub fn iter_component_opt<C: Component + Send + Sync + 'static>(
750        &self,
751    ) -> Option<impl Iterator<Item = &[Option<C>]>> {
752        let data = self
753            .components
754            .get(&C::name())
755            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<Option<C>>>())?;
756        Some(data.iter())
757    }
758
759    // ---
760
761    /// Returns the index range that corresponds to the specified `time_range`.
762    ///
763    /// Use the returned range with one of the range iteration methods:
764    /// - [`Self::range_data_times`]
765    /// - [`Self::range_pov_instance_keys`]
766    /// - [`Self::range_component`]
767    /// - [`Self::range_component_opt`]
768    ///
769    /// Make sure that the bucket hasn't been modified in-between!
770    ///
771    /// This is `O(2*log(n))`, so make sure to clone the returned range rather than calling this
772    /// multiple times.
773    #[inline]
774    pub fn entry_range(&self, time_range: TimeRange) -> Range<usize> {
775        let start_index = self
776            .data_times
777            .partition_point(|(data_time, _)| data_time < &time_range.min);
778        let end_index = self
779            .data_times
780            .partition_point(|(data_time, _)| data_time <= &time_range.max);
781        start_index..end_index
782    }
783
784    /// Range over the timestamps of the point-of-view components.
785    #[inline]
786    pub fn range_data_times(
787        &self,
788        entry_range: Range<usize>,
789    ) -> impl Iterator<Item = &(TimeInt, RowId)> {
790        self.data_times.range(entry_range)
791    }
792
793    /// Range over the [`InstanceKey`] batches of the point-of-view components.
794    #[inline]
795    pub fn range_pov_instance_keys(
796        &self,
797        entry_range: Range<usize>,
798    ) -> impl Iterator<Item = &[InstanceKey]> {
799        self.pov_instance_keys.range(entry_range)
800    }
801
802    /// Get the raw batches for the specified non-optional component.
803    #[inline]
804    pub fn component<C: Component + Send + Sync + 'static>(&self) -> Option<&FlatVecDeque<C>> {
805        self.components
806            .get(&C::name())
807            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<C>>())
808    }
809
810    /// Range over the batches of the specified non-optional component.
811    #[inline]
812    pub fn range_component<C: Component + Send + Sync + 'static>(
813        &self,
814        entry_range: Range<usize>,
815    ) -> Option<impl Iterator<Item = &[C]>> {
816        let data = self
817            .components
818            .get(&C::name())
819            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<C>>())?;
820        Some(data.range(entry_range))
821    }
822
823    /// Get the raw batches for the specified optional component.
824    #[inline]
825    pub fn component_opt<C: Component + Send + Sync + 'static>(
826        &self,
827    ) -> Option<&FlatVecDeque<Option<C>>> {
828        self.components
829            .get(&C::name())
830            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<Option<C>>>())
831    }
832
833    /// Range over the batches of the specified optional component.
834    #[inline]
835    pub fn range_component_opt<C: Component + Send + Sync + 'static>(
836        &self,
837        entry_range: Range<usize>,
838    ) -> Option<impl Iterator<Item = &[Option<C>]>> {
839        let data = self
840            .components
841            .get(&C::name())
842            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<Option<C>>>())?;
843        Some(data.range(entry_range))
844    }
845
846    /// Removes everything from the bucket that corresponds to a time equal or greater than the
847    /// specified `threshold`.
848    ///
849    /// Returns the number of bytes removed.
850    #[inline]
851    pub fn truncate_at_time(&mut self, threshold: TimeInt) -> u64 {
852        self.sanity_check();
853
854        let Self {
855            data_times,
856            pov_instance_keys,
857            components,
858            total_size_bytes,
859        } = self;
860
861        let mut removed_bytes = 0u64;
862
863        let threshold_idx = data_times.partition_point(|(data_time, _)| data_time < &threshold);
864
865        {
866            let total_size_bytes_before = data_times.total_size_bytes();
867            data_times.truncate(threshold_idx);
868            removed_bytes += total_size_bytes_before - data_times.total_size_bytes();
869        }
870
871        {
872            let total_size_bytes_before = pov_instance_keys.total_size_bytes();
873            pov_instance_keys.truncate(threshold_idx);
874            removed_bytes += total_size_bytes_before - pov_instance_keys.total_size_bytes();
875        }
876
877        for data in components.values_mut() {
878            let total_size_bytes_before = data.dyn_total_size_bytes();
879            data.dyn_truncate(threshold_idx);
880            removed_bytes += total_size_bytes_before - data.dyn_total_size_bytes();
881        }
882
883        *total_size_bytes = total_size_bytes
884            .checked_sub(removed_bytes)
885            .unwrap_or_else(|| {
886                re_log::debug!(
887                    current = *total_size_bytes,
888                    removed = removed_bytes,
889                    "book keeping underflowed"
890                );
891                u64::MIN
892            });
893
894        self.sanity_check();
895
896        removed_bytes
897    }
898}
899
900macro_rules! impl_insert {
901    (for N=$N:expr, M=$M:expr => povs=[$($pov:ident)+] comps=[$($comp:ident)*]) => { paste! {
902        #[doc = "Inserts the contents of the given [`ArchetypeView`], which are made of the specified"]
903        #[doc = "`" $N "` point-of-view components and `" $M "` optional components, to the cache."]
904        #[doc = ""]
905        #[doc = "Returns the size in bytes of the data that was cached."]
906        #[doc = ""]
907        #[doc = "`query_time` must be the time of query, _not_ of the resulting data."]
908        pub fn [<insert_pov$N _comp$M>]<A, $($pov,)+ $($comp),*>(
909            &mut self,
910            query_time: TimeInt,
911            arch_view: &ArchetypeView<A>,
912        ) -> ::re_query::Result<u64>
913        where
914            A: Archetype,
915            $($pov: Component + Send + Sync + 'static,)+
916            $($comp: Component + Send + Sync + 'static,)*
917        {
918            // NOTE: not `profile_function!` because we want them merged together.
919            re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));
920
921            self.sanity_check();
922
923            let pov_row_id = arch_view.primary_row_id();
924            let index = self.data_times.partition_point(|t| t < &(query_time, pov_row_id));
925
926            let mut added_size_bytes = 0u64;
927
928            self.data_times.insert(index, (query_time, pov_row_id));
929            added_size_bytes += (query_time, pov_row_id).total_size_bytes();
930
931            {
932                // The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
933                // instead, that way we can efficiently compute its size while we're at it.
934                let added: FlatVecDeque<InstanceKey> = arch_view
935                    .iter_instance_keys()
936                    .collect::<VecDeque<InstanceKey>>()
937                    .into();
938                added_size_bytes += added.total_size_bytes();
939                self.pov_instance_keys.insert_deque(index, added);
940            }
941
942            $(added_size_bytes += self.insert_component::<A, $pov>(index, arch_view)?;)+
943            $(added_size_bytes += self.insert_component_opt::<A, $comp>(index, arch_view)?;)*
944
945            self.sanity_check();
946
947            self.total_size_bytes += added_size_bytes;
948
949            Ok(added_size_bytes)
950        } }
951    };
952
953    // TODO(cmc): Supporting N>1 generically is quite painful due to limitations in declarative macros,
954    // not that we care at the moment.
955    (for N=1, M=$M:expr) => {
956        seq!(COMP in 1..=$M {
957            impl_insert!(for N=1, M=$M => povs=[R1] comps=[#(C~COMP)*]);
958        });
959    };
960}
961
962impl CacheBucket {
963    /// Alias for [`Self::insert_pov1_comp0`].
964    #[inline]
965    #[allow(dead_code)]
966    fn insert_pov1<A, R1>(
967        &mut self,
968        query_time: TimeInt,
969        arch_view: &ArchetypeView<A>,
970    ) -> ::re_query::Result<u64>
971    where
972        A: Archetype,
973        R1: Component + Send + Sync + 'static,
974    {
975        self.insert_pov1_comp0::<A, R1>(query_time, arch_view)
976    }
977
978    seq!(NUM_COMP in 0..10 {
979        impl_insert!(for N=1, M=NUM_COMP);
980    });
981
982    #[inline]
983    fn insert_component<A: Archetype, C: Component + Send + Sync + 'static>(
984        &mut self,
985        at: usize,
986        arch_view: &ArchetypeView<A>,
987    ) -> re_query::Result<u64> {
988        re_tracing::profile_function!(C::name());
989        // no sanity checks here - we are called while in an invariant-breaking state!
990
991        let num_entries = self.data_times.len();
992
993        let data = self.components.entry(C::name()).or_insert_with(|| {
994            Box::new(FlatVecDeque::<C>::from_vecs(
995                std::iter::repeat(vec![]).take(
996                    num_entries
997                        .checked_sub(1)
998                        .expect("We should have been called AFTER inserting to data_times"),
999                ),
1000            ))
1001        });
1002
1003        debug_assert!(at <= data.dyn_num_entries());
1004
1005        // The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
1006        // instead, that way we can efficiently compute its size while we're at it.
1007        let added: FlatVecDeque<C> = arch_view
1008            .iter_required_component::<C>()?
1009            .collect::<VecDeque<C>>()
1010            .into();
1011        let added_size_bytes = added.total_size_bytes();
1012
1013        // NOTE: downcast cannot fail, we create it just above.
1014        let data = data.as_any_mut().downcast_mut::<FlatVecDeque<C>>().unwrap();
1015        data.insert_deque(at, added);
1016
1017        Ok(added_size_bytes)
1018    }
1019
1020    /// This will insert an empty slice for a missing component (instead of N `None` values).
1021    #[inline]
1022    fn insert_component_opt<A: Archetype, C: Component + Send + Sync + 'static>(
1023        &mut self,
1024        at: usize,
1025        arch_view: &ArchetypeView<A>,
1026    ) -> re_query::Result<u64> {
1027        re_tracing::profile_function!(C::name());
1028        // no sanity checks here - we are called while in an invariant-breaking state!
1029
1030        let num_entries = self.num_entries();
1031
1032        let data = self.components.entry(C::name()).or_insert_with(|| {
1033            Box::new(FlatVecDeque::<Option<C>>::from_vecs(
1034                std::iter::repeat(vec![]).take(
1035                    num_entries
1036                        .checked_sub(1)
1037                        .expect("We should have been called AFTER inserting to data_times"),
1038                ),
1039            ))
1040        });
1041
1042        debug_assert!(at <= data.dyn_num_entries());
1043
1044        let added: FlatVecDeque<Option<C>> = if arch_view.has_component::<C>() {
1045            // The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
1046            // instead, that way we can efficiently computes its size while we're at it.
1047            arch_view
1048                .iter_optional_component::<C>()?
1049                .collect::<VecDeque<Option<C>>>()
1050                .into()
1051        } else {
1052            // If an optional component is missing entirely, we just store an empty slice in its
1053            // stead, rather than a bunch of `None` values.
1054            let mut added = FlatVecDeque::<Option<C>>::new();
1055            added.push_back(std::iter::empty());
1056            added
1057        };
1058        let added_size_bytes = added.total_size_bytes();
1059
1060        // NOTE: downcast cannot fail, we create it just above.
1061        let data = data
1062            .as_any_mut()
1063            .downcast_mut::<FlatVecDeque<Option<C>>>()
1064            .unwrap();
1065        data.insert_deque(at, added);
1066
1067        Ok(added_size_bytes)
1068    }
1069}