re_query_cache/latest_at.rs

use std::{collections::BTreeMap, sync::Arc};

use ahash::HashMap;
use paste::paste;
use seq_macro::seq;

use re_data_store::{DataStore, LatestAtQuery, TimeInt};
use re_log_types::{EntityPath, RowId, Timeline};
use re_query::query_archetype;
use re_types_core::{components::InstanceKey, Archetype, Component, SizeBytes};

use crate::{CacheBucket, Caches};

// --- Data structures ---

/// Caches the results of `LatestAt` queries.
#[derive(Default)]
pub struct LatestAtCache {
    /// Organized by _query_ time.
    ///
    /// If the data you're looking for isn't in here, try partially running the query and check
    /// if there is any data available for the resulting _data_ time in [`Self::per_data_time`].
    //
    // NOTE: `Arc` so we can deduplicate buckets across query time & data time.
    pub per_query_time: BTreeMap<TimeInt, Arc<CacheBucket>>,

    /// Organized by _data_ time.
    ///
    /// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
    /// can result in a data time of `T`.
    //
    // NOTE: `Arc` so we can deduplicate buckets across query time & data time.
    pub per_data_time: BTreeMap<TimeInt, Arc<CacheBucket>>,

    /// Dedicated bucket for timeless data, if any.
    ///
    /// Query time and data time are one and the same in the timeless case, therefore we only need
    /// this one bucket.
    //
    // NOTE: Lives separately so we don't pay the extra `Option` cost in the much more common
    // timeful case.
    pub timeless: Option<Arc<CacheBucket>>,

    /// For debugging purposes.
    pub(crate) timeline: Timeline,

    /// Total size of the data stored in this cache in bytes.
    total_size_bytes: u64,
}
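
// A minimal sketch of the deduplication invariant described above: several query times can
// share a single `Arc<CacheBucket>` with the data time that resolved them. This is a
// hypothetical example, not part of the real test suite, and it assumes `TimeInt: From<i64>`.
#[cfg(test)]
mod latest_at_cache_dedup_sketch {
    use super::*;

    #[test]
    fn query_times_share_one_bucket() {
        let bucket = Arc::new(CacheBucket::default());
        let mut cache = LatestAtCache::default();

        // Queries at t = 10, 11 and 12 all resolve to data logged at t = 10, so every entry
        // points at the very same bucket.
        cache.per_data_time.insert(TimeInt::from(10i64), Arc::clone(&bucket));
        for t in 10i64..=12 {
            cache.per_query_time.insert(TimeInt::from(t), Arc::clone(&bucket));
        }

        // One allocation, five handles: the local binding, the data-time entry, and the
        // three query-time entries.
        assert_eq!(Arc::strong_count(&bucket), 5);
    }
}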

impl std::fmt::Debug for LatestAtCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            per_query_time,
            per_data_time,
            timeless,
            timeline,
            total_size_bytes: _,
        } = self;

        let mut strings = Vec::new();

        if let Some(bucket) = timeless.as_ref() {
            strings.push(format!(
                "query_time=<timeless> -> data_time=<timeless> ({})",
                re_format::format_bytes(bucket.total_size_bytes as _),
            ));
        }

        let data_times_per_bucket: HashMap<_, _> = per_data_time
            .iter()
            .map(|(time, bucket)| (Arc::as_ptr(bucket), *time))
            .collect();

        for (query_time, bucket) in per_query_time {
            let query_time = timeline.typ().format_utc(*query_time);
            let data_time = data_times_per_bucket
                .get(&Arc::as_ptr(bucket))
                .map_or_else(|| "MISSING?!".to_owned(), |t| timeline.typ().format_utc(*t));
            strings.push(format!(
                "query_time={query_time} -> data_time={data_time} ({})",
                re_format::format_bytes(bucket.total_size_bytes as _),
            ));
            strings.push(indent::indent_all_by(2, format!("{bucket:?}")));
        }

        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
    }
}

impl SizeBytes for LatestAtCache {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        self.total_size_bytes
    }
}

impl LatestAtCache {
    /// Removes everything from the cache that corresponds to a time equal to or greater than the
    /// specified `threshold`.
    ///
    /// Reminder: invalidating timeless data is the same as invalidating everything, so just reset
    /// the `LatestAtCache` entirely in that case.
    ///
    /// Returns the number of bytes removed.
    #[inline]
    pub fn truncate_at_time(&mut self, threshold: TimeInt) -> u64 {
        let Self {
            per_query_time,
            per_data_time,
            timeless: _,
            timeline: _,
            total_size_bytes,
        } = self;

        let mut removed_bytes = 0u64;

        per_query_time.retain(|&query_time, _| query_time < threshold);

        // Buckets for latest-at queries are guaranteed to only ever contain a single entry, so
        // just drop the buckets entirely.
        per_data_time.retain(|&data_time, bucket| {
            if data_time < threshold {
                return true;
            }

            // Only count the bucket's size if it is about to be dropped for good.
            if Arc::strong_count(bucket) == 1 {
                removed_bytes += bucket.total_size_bytes;
            }

            false
        });

        *total_size_bytes = total_size_bytes
            .checked_sub(removed_bytes)
            .unwrap_or_else(|| {
                re_log::debug!(
                    current = *total_size_bytes,
                    removed = removed_bytes,
                    "bookkeeping underflowed"
                );
                u64::MIN
            });

        removed_bytes
    }
}
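
// A minimal sketch of the truncation semantics documented above: every entry at or after the
// threshold disappears from both indices, while earlier entries survive. Hypothetical example,
// not part of the real test suite; assumes `TimeInt: From<i64>`.
#[cfg(test)]
mod latest_at_cache_truncate_sketch {
    use super::*;

    #[test]
    fn drops_everything_at_or_after_the_threshold() {
        let mut cache = LatestAtCache::default();
        for t in [5i64, 10, 15] {
            let bucket = Arc::new(CacheBucket::default());
            cache.per_data_time.insert(TimeInt::from(t), Arc::clone(&bucket));
            cache.per_query_time.insert(TimeInt::from(t), bucket);
        }

        cache.truncate_at_time(TimeInt::from(10i64));

        // Only the entries strictly before the threshold remain.
        assert_eq!(cache.per_query_time.len(), 1);
        assert_eq!(cache.per_data_time.len(), 1);
    }
}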

// --- Queries ---

macro_rules! impl_query_archetype_latest_at {
    (for N=$N:expr, M=$M:expr => povs=[$($pov:ident)+] comps=[$($comp:ident)*]) => { paste! {
        #[doc = "Cached implementation of [`re_query::query_archetype`] for `" $N "` point-of-view"]
        #[doc = "components and `" $M "` optional components."]
        #[allow(non_snake_case)]
        pub fn [<query_archetype_latest_at_pov$N _comp$M>]<'a, A, $($pov,)+ $($comp,)* F>(
            &self,
            store: &'a DataStore,
            query: &LatestAtQuery,
            entity_path: &'a EntityPath,
            mut f: F,
        ) -> ::re_query::Result<()>
        where
            A: Archetype + 'a,
            $($pov: Component + Send + Sync + 'static,)+
            $($comp: Component + Send + Sync + 'static,)*
            F: FnMut(
                (
                    (Option<TimeInt>, RowId),
                    &[InstanceKey],
                    $(&[$pov],)+
                    $(Option<&[Option<$comp>]>,)*
                ),
            ),
        {
            let iter_results = |timeless: bool, bucket: &crate::CacheBucket, f: &mut F| -> crate::Result<()> {
                // Profiling this in isolation can be useful, but adds a lot of noise for small queries.
                //re_tracing::profile_scope!("iter");

                let it = itertools::izip!(
                    bucket.iter_data_times(),
                    bucket.iter_pov_instance_keys(),
                    $(bucket.iter_component::<$pov>()
                        .ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
                    $(bucket.iter_component_opt::<$comp>()
                        .map_or_else(
                            || itertools::Either::Left(std::iter::repeat(&[] as &[Option<$comp>])),
                            |it| itertools::Either::Right(it)),
                    )*
                ).map(|((time, row_id), instance_keys, $($pov,)+ $($comp,)*)| {
                    (
                        ((!timeless).then_some(*time), *row_id),
                        instance_keys,
                        $($pov,)+
                        $((!$comp.is_empty()).then_some($comp),)*
                    )
                });

                for data in it {
                    f(data);
                }

                Ok(())
            };

            let create_and_fill_bucket = |
                data_time: TimeInt,
                arch_view: &::re_query::ArchetypeView<A>,
            | -> crate::Result<crate::CacheBucket> {
                re_log::trace!(?data_time, "fill");

                // Grabbing the current time is quite costly on web.
                #[cfg(not(target_arch = "wasm32"))]
                let now = web_time::Instant::now();

                let mut bucket = crate::CacheBucket::default();
                bucket.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(data_time, &arch_view)?;

                #[cfg(not(target_arch = "wasm32"))]
                {
                    let elapsed = now.elapsed();
                    ::re_log::trace!(
                        store_id=%store.id(),
                        %entity_path,
                        archetype=%A::name(),
                        added_size_bytes=bucket.total_size_bytes,
                        "cached new entry in {elapsed:?} ({:0.3} entries/s)",
                        1f64 / elapsed.as_secs_f64()
                    );
                }

                Ok(bucket)
            };

            let upsert_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| -> crate::Result<()> {
                re_tracing::profile_scope!("latest_at", format!("{query:?}"));

                let crate::LatestAtCache {
                    per_query_time,
                    per_data_time,
                    timeless,
                    timeline: _,
                    total_size_bytes,
                } = latest_at_cache;

                let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
                    std::collections::btree_map::Entry::Occupied(_) => {
                        // Fastest path: we have an entry for this exact query time, no need to look any
                        // further.
                        re_log::trace!(query_time=?query.at, "cache hit (query time)");
                        return Ok(());
                    }
                    std::collections::btree_map::Entry::Vacant(entry) => entry,
                };

                let arch_view = query_archetype::<A>(store, &query, entity_path)?;
                let data_time = arch_view.data_time();

                // Fast path: we've run the query and realized that we already have the data for the resulting
                // _data_ time, so let's use that to avoid join & deserialization costs.
                if let Some(data_time) = data_time { // Reminder: `None` means timeless.
                    if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
                        re_log::trace!(query_time=?query.at, ?data_time, "cache hit (data time)");

                        query_time_bucket_at_query_time.insert(Arc::clone(&data_time_bucket_at_data_time));

                        // We now know for a fact that a query at that data time would yield the same
                        // results: copy the bucket accordingly so that the next cache hit for that query
                        // time ends up taking the fastest path.
                        let query_time_bucket_at_data_time = per_query_time.entry(data_time);
                        query_time_bucket_at_data_time
                            .and_modify(|v| *v = Arc::clone(&data_time_bucket_at_data_time))
                            .or_insert(Arc::clone(&data_time_bucket_at_data_time));

                        return Ok(());
                    }
                } else if let Some(timeless) = timeless.as_ref() {
                    re_log::trace!(query_time=?query.at, "cache hit (data time, timeless)");
                    query_time_bucket_at_query_time.insert(Arc::clone(timeless));
                    return Ok(());
                }

                // Slowest path: this is a complete cache miss.
                if let Some(data_time) = data_time { // Reminder: `None` means timeless.
                    re_log::trace!(query_time=?query.at, ?data_time, "cache miss");

                    let bucket = Arc::new(create_and_fill_bucket(data_time, &arch_view)?);
                    *total_size_bytes += bucket.total_size_bytes;
                    let query_time_bucket_at_query_time = query_time_bucket_at_query_time.insert(bucket);

                    let data_time_bucket_at_data_time = per_data_time.entry(data_time);
                    data_time_bucket_at_data_time
                        .and_modify(|v| *v = Arc::clone(&query_time_bucket_at_query_time))
                        .or_insert(Arc::clone(&query_time_bucket_at_query_time));

                    Ok(())
                } else {
                    re_log::trace!(query_time=?query.at, "cache miss (timeless)");

                    let bucket = create_and_fill_bucket(TimeInt::MIN, &arch_view)?;
                    *total_size_bytes += bucket.total_size_bytes;

                    let bucket = Arc::new(bucket);
                    *timeless = Some(Arc::clone(&bucket));
                    query_time_bucket_at_query_time.insert(Arc::clone(&bucket));

                    Ok(())
                }
            };

            let iter_callback = |query: &LatestAtQuery, latest_at_cache: &crate::LatestAtCache, f: &mut F| {
                re_tracing::profile_scope!("latest_at", format!("{query:?}"));

                let crate::LatestAtCache {
                    per_query_time,
                    per_data_time: _,
                    timeless,
                    timeline: _,
                    total_size_bytes: _,
                } = latest_at_cache;

                // Expected path: cache was properly upserted.
                if let Some(query_time_bucket_at_query_time) = per_query_time.get(&query.at) {
                    let is_timeless = std::ptr::eq(
                        Arc::as_ptr(query_time_bucket_at_query_time),
                        timeless.as_ref().map_or(std::ptr::null(), |bucket| Arc::as_ptr(bucket)),
                    );
                    return iter_results(is_timeless, query_time_bucket_at_query_time, f);
                }

                re_log::trace!(
                    store_id = %store.id(),
                    %entity_path,
                    ?query,
                    "either no data exist at this time or we couldn't upsert the cache (write lock was busy)"
                );

                Ok(())
            };

            let (res1, res2) = self.with_latest_at::<A, _, _, _, _>(
                store,
                entity_path.clone(),
                query,
                |latest_at_cache| upsert_callback(query, latest_at_cache),
                |latest_at_cache| iter_callback(query, latest_at_cache, &mut f),
            );

            if let Some(res1) = res1 {
                res1?;
            }
            res2?;

            Ok(())
        } }
    };

    // TODO(cmc): Supporting N>1 generically is quite painful due to limitations in declarative macros,
    // not that we care at the moment.
    (for N=1, M=$M:expr) => {
        seq!(COMP in 1..=$M {
            impl_query_archetype_latest_at!(for N=1, M=$M => povs=[R1] comps=[#(C~COMP)*]);
        });
    };
}

impl Caches {
    seq!(NUM_COMP in 0..10 {
        impl_query_archetype_latest_at!(for N=1, M=NUM_COMP);
    });
}
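
// For reference, a hedged sketch of how a caller could use one of the generated methods.
// The archetype and component names below (`Points2D`, `Position2D`, `Color`) as well as
// `caches`, `store`, `timeline`, `query_time` and `entity_path` are assumptions about the
// call site, not something this file defines:
//
//   caches.query_archetype_latest_at_pov1_comp1::<Points2D, Position2D, Color, _>(
//       store,
//       &LatestAtQuery::new(timeline, query_time),
//       &entity_path,
//       |((data_time, row_id), instance_keys, positions, colors)| {
//           // `data_time` is `None` for timeless results; `colors` is `None` when the
//           // optional component is absent from the matched row.
//       },
//   )?;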