use std::{collections::BTreeMap, sync::Arc};
use ahash::HashMap;
use paste::paste;
use seq_macro::seq;
use re_data_store::{DataStore, LatestAtQuery, TimeInt};
use re_log_types::{EntityPath, RowId, Timeline};
use re_query::query_archetype;
use re_types_core::{components::InstanceKey, Archetype, Component, SizeBytes};
use crate::{CacheBucket, Caches};
/// Caches the results of latest-at archetype queries for a single timeline.
///
/// Buckets are shared (via [`Arc`]) between the maps below whenever several keys
/// resolve to the same underlying data.
#[derive(Default)]
pub struct LatestAtCache {
    /// All cached buckets, indexed by the time the query was run at.
    ///
    /// Several query times may alias the same bucket when they resolve to the
    /// same data time.
    pub per_query_time: BTreeMap<TimeInt, Arc<CacheBucket>>,
    /// All cached buckets, indexed by the time of the data they contain.
    pub per_data_time: BTreeMap<TimeInt, Arc<CacheBucket>>,
    /// The one bucket for timeless data, if any was ever cached.
    pub timeless: Option<Arc<CacheBucket>>,
    /// The timeline this cache is indexed on (used e.g. to format times in `Debug`).
    pub(crate) timeline: Timeline,
    /// Running total of the heap size of all buckets, maintained incrementally
    /// as buckets are inserted (upserts) and removed (`truncate_at_time`).
    total_size_bytes: u64,
}
impl std::fmt::Debug for LatestAtCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
per_query_time,
per_data_time,
timeless,
timeline,
total_size_bytes: _,
} = self;
let mut strings = Vec::new();
if let Some(bucket) = timeless.as_ref() {
strings.push(format!(
"query_time=<timeless> -> data_time=<timeless> ({})",
re_format::format_bytes(bucket.total_size_bytes as _),
));
}
let data_times_per_bucket: HashMap<_, _> = per_data_time
.iter()
.map(|(time, bucket)| (Arc::as_ptr(bucket), *time))
.collect();
for (query_time, bucket) in per_query_time {
let query_time = timeline.typ().format_utc(*query_time);
let data_time = data_times_per_bucket
.get(&Arc::as_ptr(bucket))
.map_or_else(|| "MISSING?!".to_owned(), |t| timeline.typ().format_utc(*t));
strings.push(format!(
"query_time={query_time} -> data_time={data_time} ({})",
re_format::format_bytes(bucket.total_size_bytes as _),
));
strings.push(indent::indent_all_by(2, format!("{bucket:?}")));
}
f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}
impl SizeBytes for LatestAtCache {
    /// The cache maintains a running byte tally itself; just report it.
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self { total_size_bytes, .. } = self;
        *total_size_bytes
    }
}
impl LatestAtCache {
    /// Removes all buckets cached for a query or data time at or after `threshold`.
    ///
    /// Returns the number of bytes the bookkeeping believes were actually freed.
    /// The timeless bucket is never touched.
    #[inline]
    pub fn truncate_at_time(&mut self, threshold: TimeInt) -> u64 {
        let Self {
            per_query_time,
            per_data_time,
            timeless: _,
            timeline: _,
            total_size_bytes,
        } = self;
        let mut removed_bytes = 0u64;
        // Drop the query-time aliases first: the strong counts checked below are
        // only meaningful once those extra `Arc` references are gone.
        per_query_time.retain(|&query_time, _| query_time < threshold);
        per_data_time.retain(|&data_time, bucket| {
            if data_time < threshold {
                return true;
            }
            // Only count the bucket's size if this map holds the last remaining
            // reference — otherwise dropping it won't actually free the memory.
            if Arc::strong_count(bucket) == 1 {
                removed_bytes += bucket.total_size_bytes;
            }
            false
        });
        // Guard against bookkeeping drift: log and clamp to zero (`u64::MIN`)
        // instead of panicking on underflow.
        *total_size_bytes = total_size_bytes
            .checked_sub(removed_bytes)
            .unwrap_or_else(|| {
                re_log::debug!(
                    current = *total_size_bytes,
                    removed = removed_bytes,
                    "book keeping underflowed"
                );
                u64::MIN
            });
        removed_bytes
    }
}
/// Generates a cached latest-at query method for `$N` point-of-view components and
/// `$M` optional components.
///
/// The generated method first makes sure the cache contains a bucket for the query
/// (a no-op on cache hits), then streams that bucket's rows through the caller's
/// closure `f`.
macro_rules! impl_query_archetype_latest_at {
    (for N=$N:expr, M=$M:expr => povs=[$($pov:ident)+] comps=[$($comp:ident)*]) => { paste! {
        #[doc = "Cached implementation of [`re_query::query_archetype`] and [`re_query::range_archetype`]"]
        #[doc = "(combined) for `" $N "` point-of-view components and `" $M "` optional components."]
        #[allow(non_snake_case)]
        pub fn [<query_archetype_latest_at_pov$N _comp$M>]<'a, A, $($pov,)+ $($comp,)* F>(
            &self,
            store: &'a DataStore,
            query: &LatestAtQuery,
            entity_path: &'a EntityPath,
            mut f: F,
        ) -> ::re_query::Result<()>
        where
            A: Archetype + 'a,
            $($pov: Component + Send + Sync + 'static,)+
            $($comp: Component + Send + Sync + 'static,)*
            F: FnMut(
                (
                    (Option<TimeInt>, RowId),
                    &[InstanceKey],
                    $(&[$pov],)+
                    $(Option<&[Option<$comp>]>,)*
                ),
            ),
        {
            // Streams the contents of `bucket` into `f`, one row at a time.
            //
            // Timeless buckets report `None` as their data time.
            let iter_results = |timeless: bool, bucket: &crate::CacheBucket, f: &mut F| -> crate::Result<()> {
                let it = itertools::izip!(
                    bucket.iter_data_times(),
                    bucket.iter_pov_instance_keys(),
                    // Point-of-view components are mandatory: error out if missing.
                    $(bucket.iter_component::<$pov>()
                        .ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
                    // Optional components missing from the bucket altogether are
                    // substituted with repeated empty slices…
                    $(bucket.iter_component_opt::<$comp>()
                        .map_or_else(
                            || itertools::Either::Left(std::iter::repeat(&[] as &[Option<$comp>])),
                            |it| itertools::Either::Right(it)),
                    )*
                ).map(|((time, row_id), instance_keys, $($pov,)+ $($comp,)*)| {
                    (
                        ((!timeless).then_some(*time), *row_id),
                        instance_keys,
                        $($pov,)+
                        // …which the callback then sees as `None`.
                        $((!$comp.is_empty()).then_some($comp),)*
                    )
                });

                for data in it {
                    f(data);
                }

                Ok(())
            };

            // Builds a fresh bucket from the raw query results for `data_time`.
            let create_and_fill_bucket = |
                data_time: TimeInt,
                arch_view: &::re_query::ArchetypeView<A>,
            | -> crate::Result<crate::CacheBucket> {
                re_log::trace!(?data_time, "fill");

                // Profile the fill on native targets only.
                #[cfg(not(target_arch = "wasm32"))]
                let now = web_time::Instant::now();

                let mut bucket = crate::CacheBucket::default();
                bucket.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(data_time, arch_view)?;

                #[cfg(not(target_arch = "wasm32"))]
                {
                    let elapsed = now.elapsed();
                    ::re_log::trace!(
                        store_id=%store.id(),
                        %entity_path,
                        archetype=%A::name(),
                        added_size_bytes=bucket.total_size_bytes,
                        "cached new entry in {elapsed:?} ({:0.3} entries/s)",
                        1f64 / elapsed.as_secs_f64()
                    );
                }

                Ok(bucket)
            };

            // Makes sure the cache contains a bucket for `query.at`, creating or
            // aliasing buckets as needed. Returns early on cache hits.
            let upsert_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| -> crate::Result<()> {
                re_tracing::profile_scope!("latest_at", format!("{query:?}"));

                let crate::LatestAtCache {
                    per_query_time,
                    per_data_time,
                    timeless,
                    timeline: _,
                    total_size_bytes,
                } = latest_at_cache;

                // Fast path: we already answered a query at this exact query time.
                let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
                    std::collections::btree_map::Entry::Occupied(_) => {
                        re_log::trace!(query_time=?query.at, "cache hit (query time)");
                        return Ok(());
                    }
                    std::collections::btree_map::Entry::Vacant(entry) => entry,
                };

                let arch_view = query_archetype::<A>(store, query, entity_path)?;
                let data_time = arch_view.data_time();

                if let Some(data_time) = data_time {
                    // Semi-fast path: the store resolved this query to a data time
                    // we already have a bucket for — alias it instead of re-caching.
                    if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
                        re_log::trace!(query_time=?query.at, ?data_time, "cache hit (data time)");

                        query_time_bucket_at_query_time.insert(Arc::clone(&data_time_bucket_at_data_time));

                        // Also make the bucket reachable when its data time is used
                        // as a query time directly.
                        let query_time_bucket_at_data_time = per_query_time.entry(data_time);
                        query_time_bucket_at_data_time
                            .and_modify(|v| *v = Arc::clone(&data_time_bucket_at_data_time))
                            .or_insert(Arc::clone(&data_time_bucket_at_data_time));

                        return Ok(());
                    }
                } else if let Some(timeless) = timeless.as_ref() {
                    // Semi-fast path: the timeless data is already cached.
                    re_log::trace!(query_time=?query.at, "cache hit (data time, timeless)");
                    query_time_bucket_at_query_time.insert(Arc::clone(timeless));
                    return Ok(());
                }

                // Slow path: nothing cached for this data yet — query and fill.
                if let Some(data_time) = data_time {
                    re_log::trace!(query_time=?query.at, ?data_time, "cache miss");

                    let bucket = Arc::new(create_and_fill_bucket(data_time, &arch_view)?);
                    *total_size_bytes += bucket.total_size_bytes;

                    let query_time_bucket_at_query_time = query_time_bucket_at_query_time.insert(bucket);

                    // Index the new bucket by its data time too, so future queries
                    // resolving to the same data can alias it.
                    let data_time_bucket_at_data_time = per_data_time.entry(data_time);
                    data_time_bucket_at_data_time
                        .and_modify(|v| *v = Arc::clone(&query_time_bucket_at_query_time))
                        .or_insert(Arc::clone(&query_time_bucket_at_query_time));

                    Ok(())
                } else {
                    re_log::trace!(query_time=?query.at, "cache miss (timeless)");

                    let bucket = create_and_fill_bucket(TimeInt::MIN, &arch_view)?;
                    *total_size_bytes += bucket.total_size_bytes;

                    let bucket = Arc::new(bucket);
                    *timeless = Some(Arc::clone(&bucket));
                    query_time_bucket_at_query_time.insert(Arc::clone(&bucket));

                    Ok(())
                }
            };

            // Reads the bucket for `query.at` (if any) and forwards it to `f`.
            let iter_callback = |query: &LatestAtQuery, latest_at_cache: &crate::LatestAtCache, f: &mut F| {
                re_tracing::profile_scope!("latest_at", format!("{query:?}"));

                let crate::LatestAtCache {
                    per_query_time,
                    per_data_time: _,
                    timeless,
                    timeline: _,
                    total_size_bytes: _,
                } = latest_at_cache;

                if let Some(query_time_bucket_at_query_time) = per_query_time.get(&query.at) {
                    // Pointer-compare against the timeless bucket to decide whether
                    // the rows should carry a data time at all.
                    let is_timeless = std::ptr::eq(
                        Arc::as_ptr(query_time_bucket_at_query_time),
                        timeless.as_ref().map_or(std::ptr::null(), |bucket| Arc::as_ptr(bucket)),
                    );
                    return iter_results(is_timeless, query_time_bucket_at_query_time, f);
                }

                // The upsert above is best-effort: it is skipped entirely when the
                // cache's write lock cannot be taken.
                re_log::trace!(
                    store_id = %store.id(),
                    %entity_path,
                    ?query,
                    "either no data exist at this time or we couldn't upsert the cache (write lock was busy)"
                );

                Ok(())
            };

            let (res1, res2) = self.with_latest_at::<A, _, _, _, _>(
                store,
                entity_path.clone(),
                query,
                |latest_at_cache| upsert_callback(query, latest_at_cache),
                |latest_at_cache| iter_callback(query, latest_at_cache, &mut f),
            );

            // The upsert result is optional (the upsert may not have run at all).
            if let Some(res1) = res1 {
                res1?;
            }
            res2?;

            Ok(())
        }
    }};

    // Convenience arm: for a single point-of-view component `R1`, generate the
    // method with optional components `C1..=C$M` (empty when `$M` is 0).
    (for N=1, M=$M:expr) => {
        seq!(COMP in 1..=$M {
            impl_query_archetype_latest_at!(for N=1, M=$M => povs=[R1] comps=[#(C~COMP)*]);
        });
    };
}
impl Caches {
    // Generate `query_archetype_latest_at_pov1_comp0` through
    // `query_archetype_latest_at_pov1_comp9`: one point-of-view component and
    // up to 9 optional components.
    seq!(NUM_COMP in 0..10 {
        impl_query_archetype_latest_at!(for N=1, M=NUM_COMP);
    });
}