use std::{collections::BTreeMap, sync::Arc};

use ahash::HashMap;
use paste::paste;
use seq_macro::seq;

use re_data_store::{DataStore, LatestAtQuery, TimeInt};
use re_log_types::{EntityPath, RowId, Timeline};
use re_query::query_archetype;
use re_types_core::{components::InstanceKey, Archetype, Component, SizeBytes};

use crate::{CacheBucket, Caches};

/// Caches the results of latest-at queries for one timeline.
#[derive(Default)]
pub struct LatestAtCache {
    /// Buckets indexed by query time: one entry per query time that has already been answered.
    pub per_query_time: BTreeMap<TimeInt, Arc<CacheBucket>>,

    /// Buckets indexed by the data time they resolve to; any number of query times can map back
    /// to the same data time and therefore share the same bucket.
    pub per_data_time: BTreeMap<TimeInt, Arc<CacheBucket>>,

    /// Dedicated bucket for timeless data, if any.
    pub timeless: Option<Arc<CacheBucket>>,

    /// The timeline the cached times refer to; used when formatting the cache for debugging.
    pub(crate) timeline: Timeline,

    /// Total size of the cached data, in bytes.
    total_size_bytes: u64,
}

impl std::fmt::Debug for LatestAtCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            per_query_time,
            per_data_time,
            timeless,
            timeline,
            total_size_bytes: _,
        } = self;

        let mut strings = Vec::new();

        if let Some(bucket) = timeless.as_ref() {
            strings.push(format!(
                "query_time=<timeless> -> data_time=<timeless> ({})",
                re_format::format_bytes(bucket.total_size_bytes as _),
            ));
        }

        let data_times_per_bucket: HashMap<_, _> = per_data_time
            .iter()
            .map(|(time, bucket)| (Arc::as_ptr(bucket), *time))
            .collect();

        for (query_time, bucket) in per_query_time {
            let query_time = timeline.typ().format_utc(*query_time);
            let data_time = data_times_per_bucket
                .get(&Arc::as_ptr(bucket))
                .map_or_else(|| "MISSING?!".to_owned(), |t| timeline.typ().format_utc(*t));
            strings.push(format!(
                "query_time={query_time} -> data_time={data_time} ({})",
                re_format::format_bytes(bucket.total_size_bytes as _),
            ));
            strings.push(indent::indent_all_by(2, format!("{bucket:?}")));
        }

        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
    }
}

impl SizeBytes for LatestAtCache {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        self.total_size_bytes
    }
}

impl LatestAtCache {
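    /// Removes all cached buckets whose query time or data time is at or after `threshold`.
    ///
    /// Returns the number of bytes actually freed, i.e. the size of the buckets that are really
    /// being dropped (buckets still shared with other entries are not counted).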
    #[inline]
    pub fn truncate_at_time(&mut self, threshold: TimeInt) -> u64 {
        let Self {
            per_query_time,
            per_data_time,
            timeless: _,
            timeline: _,
            total_size_bytes,
        } = self;

        let mut removed_bytes = 0u64;

        per_query_time.retain(|&query_time, _| query_time < threshold);

        per_data_time.retain(|&data_time, bucket| {
            if data_time < threshold {
                return true;
            }

            // Only count the memory as freed if this is the last reference to the bucket,
            // i.e. the bucket is really about to be dropped.
            if Arc::strong_count(bucket) == 1 {
                removed_bytes += bucket.total_size_bytes;
            }

            false
        });

        *total_size_bytes = total_size_bytes
            .checked_sub(removed_bytes)
            .unwrap_or_else(|| {
                re_log::debug!(
                    current = *total_size_bytes,
                    removed = removed_bytes,
                    "bookkeeping underflowed"
                );
                u64::MIN
            });

        removed_bytes
    }
}

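// Implements the cached latest-at query path for a fixed number of point-of-view components
// (`N`) and optional components (`M`). Each generated method first tries to answer the query
// straight from the cache, upserting a new `CacheBucket` on a miss, and then hands every cached
// row to the caller-provided closure.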
macro_rules! impl_query_archetype_latest_at {
    (for N=$N:expr, M=$M:expr => povs=[$($pov:ident)+] comps=[$($comp:ident)*]) => { paste! {
        #[doc = "Cached implementation of [`re_query::query_archetype`] and [`re_query::range_archetype`]"]
        #[doc = "(combined) for `" $N "` point-of-view components and `" $M "` optional components."]
        #[allow(non_snake_case)]
        pub fn [<query_archetype_latest_at_pov$N _comp$M>]<'a, A, $($pov,)+ $($comp,)* F>(
            &self,
            store: &'a DataStore,
            query: &LatestAtQuery,
            entity_path: &'a EntityPath,
            mut f: F,
        ) -> ::re_query::Result<()>
        where
            A: Archetype + 'a,
            $($pov: Component + Send + Sync + 'static,)+
            $($comp: Component + Send + Sync + 'static,)*
            F: FnMut(
                (
                    (Option<TimeInt>, RowId),
                    &[InstanceKey],
                    $(&[$pov],)+
                    $(Option<&[Option<$comp>]>,)*
                ),
            ),
        {
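            // Walks a cached bucket and feeds every stored row (data time, row id, instance
            // keys, point-of-view components, optional components) to the user callback.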
            let iter_results = |timeless: bool, bucket: &crate::CacheBucket, f: &mut F| -> crate::Result<()> {
                let it = itertools::izip!(
                    bucket.iter_data_times(),
                    bucket.iter_pov_instance_keys(),
                    $(bucket.iter_component::<$pov>()
                        .ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
                    $(bucket.iter_component_opt::<$comp>()
                        .map_or_else(
                            || itertools::Either::Left(std::iter::repeat(&[] as &[Option<$comp>])),
                            |it| itertools::Either::Right(it)),
                    )*
                ).map(|((time, row_id), instance_keys, $($pov,)+ $($comp,)*)| {
                    (
                        ((!timeless).then_some(*time), *row_id),
                        instance_keys,
                        $($pov,)+
                        $((!$comp.is_empty()).then_some($comp),)*
                    )
                });

                for data in it {
                    f(data);
                }

                Ok(())
            };

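            // Runs the uncached query against the store and packs the result into a fresh
            // `CacheBucket`, logging how long the insertion took (on native targets).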
            let create_and_fill_bucket = |
                data_time: TimeInt,
                arch_view: &::re_query::ArchetypeView<A>,
            | -> crate::Result<crate::CacheBucket> {
                re_log::trace!(?data_time, "fill");

                #[cfg(not(target_arch = "wasm32"))]
                let now = web_time::Instant::now();

                let mut bucket = crate::CacheBucket::default();
                bucket.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(data_time, &arch_view)?;

                #[cfg(not(target_arch = "wasm32"))]
                {
                    let elapsed = now.elapsed();
                    ::re_log::trace!(
                        store_id=%store.id(),
                        %entity_path,
                        archetype=%A::name(),
                        added_size_bytes=bucket.total_size_bytes,
                        "cached new entry in {elapsed:?} ({:0.3} entries/s)",
                        1f64 / elapsed.as_secs_f64()
                    );
                }

                Ok(bucket)
            };

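            // Write path: makes sure the cache has a bucket for this query time, either by
            // reusing an existing bucket (data-time or timeless hit) or by filling a new one.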
            let upsert_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| -> crate::Result<()> {
                re_tracing::profile_scope!("latest_at", format!("{query:?}"));

                let crate::LatestAtCache {
                    per_query_time,
                    per_data_time,
                    timeless,
                    timeline: _,
                    total_size_bytes,
                } = latest_at_cache;

                let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
                    std::collections::btree_map::Entry::Occupied(_) => {
                        // Fastest path: we already have a bucket for this exact query time.
                        re_log::trace!(query_time=?query.at, "cache hit (query time)");
                        return Ok(());
                    }
                    std::collections::btree_map::Entry::Vacant(entry) => entry,
                };

                let arch_view = query_archetype::<A>(store, query, entity_path)?;
                let data_time = arch_view.data_time();

                if let Some(data_time) = data_time {
                    // Fast path: another query time already resolved to this data time, so its
                    // bucket can be reused.
                    if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
                        re_log::trace!(query_time=?query.at, ?data_time, "cache hit (data time)");

                        query_time_bucket_at_query_time.insert(Arc::clone(data_time_bucket_at_data_time));

                        let query_time_bucket_at_data_time = per_query_time.entry(data_time);
                        query_time_bucket_at_data_time
                            .and_modify(|v| *v = Arc::clone(data_time_bucket_at_data_time))
                            .or_insert(Arc::clone(data_time_bucket_at_data_time));

                        return Ok(());
                    }
                } else if let Some(timeless) = timeless.as_ref() {
                    re_log::trace!(query_time=?query.at, "cache hit (data time, timeless)");
                    query_time_bucket_at_query_time.insert(Arc::clone(timeless));
                    return Ok(());
                }

                if let Some(data_time) = data_time {
                    re_log::trace!(query_time=?query.at, ?data_time, "cache miss");

                    let bucket = Arc::new(create_and_fill_bucket(data_time, &arch_view)?);
                    *total_size_bytes += bucket.total_size_bytes;
                    let query_time_bucket_at_query_time = query_time_bucket_at_query_time.insert(bucket);

                    let data_time_bucket_at_data_time = per_data_time.entry(data_time);
                    data_time_bucket_at_data_time
                        .and_modify(|v| *v = Arc::clone(&query_time_bucket_at_query_time))
                        .or_insert(Arc::clone(&query_time_bucket_at_query_time));

                    Ok(())
                } else {
                    re_log::trace!(query_time=?query.at, "cache miss (timeless)");

                    let bucket = create_and_fill_bucket(TimeInt::MIN, &arch_view)?;
                    *total_size_bytes += bucket.total_size_bytes;

                    let bucket = Arc::new(bucket);
                    *timeless = Some(Arc::clone(&bucket));
                    query_time_bucket_at_query_time.insert(Arc::clone(&bucket));

                    Ok(())
                }
            };

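            // Read path: looks up the bucket for this query time (if any) and forwards its
            // contents to `f`. A miss here is not an error: either there is no data at this
            // time, or the upsert above could not run.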
            let iter_callback = |query: &LatestAtQuery, latest_at_cache: &crate::LatestAtCache, f: &mut F| {
                re_tracing::profile_scope!("latest_at", format!("{query:?}"));

                let crate::LatestAtCache {
                    per_query_time,
                    per_data_time: _,
                    timeless,
                    timeline: _,
                    total_size_bytes: _,
                } = latest_at_cache;

                if let Some(query_time_bucket_at_query_time) = per_query_time.get(&query.at) {
                    let is_timeless = std::ptr::eq(
                        Arc::as_ptr(query_time_bucket_at_query_time),
                        timeless.as_ref().map_or(std::ptr::null(), |bucket| Arc::as_ptr(bucket)),
                    );
                    return iter_results(is_timeless, query_time_bucket_at_query_time, f);
                }

                re_log::trace!(
                    store_id = %store.id(),
                    %entity_path,
                    ?query,
                    "either no data exist at this time or we couldn't upsert the cache (write lock was busy)"
                );

                Ok(())
            };

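            // Run both phases through the cache: the first closure upserts under the write lock
            // (and may be skipped if that lock is busy), the second reads the freshly cached
            // data and drives `f`.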
            let (res1, res2) = self.with_latest_at::<A, _, _, _, _>(
                store,
                entity_path.clone(),
                query,
                |latest_at_cache| upsert_callback(query, latest_at_cache),
                |latest_at_cache| iter_callback(query, latest_at_cache, &mut f),
            );

            if let Some(res1) = res1 {
                res1?;
            }
            res2?;

            Ok(())
        } }
    };

    (for N=1, M=$M:expr) => {
        seq!(COMP in 1..=$M {
            impl_query_archetype_latest_at!(for N=1, M=$M => povs=[R1] comps=[#(C~COMP)*]);
        });
    };
}

// Generate the cached latest-at query methods for a single point-of-view component and
// 0 to 9 optional components.
impl Caches {
    seq!(NUM_COMP in 0..10 {
        impl_query_archetype_latest_at!(for N=1, M=NUM_COMP);
    });
}
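
// A hypothetical call site for one of the generated methods, for illustration only (the
// archetype and component names below are assumptions, not something defined in this file):
//
//     caches.query_archetype_latest_at_pov1_comp1::<
//         re_types::archetypes::Points2D,
//         re_types::components::Position2D,
//         re_types::components::Color,
//         _,
//     >(
//         store,
//         &LatestAtQuery::new(timeline, query_time),
//         &entity_path,
//         |((_data_time, _row_id), _instance_keys, _positions, _colors)| {
//             // consume one cached row
//         },
//     )?;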