use std::collections::BTreeMap;
use std::ops::Bound;
use emath::lerp;
use re_chunk::{TimeInt, Timeline, TimelineName};
use re_chunk_store::{ChunkStoreDiffKind, ChunkStoreEvent};
use re_log_encoding::RrdManifestTemporalMapEntry;
use re_log_types::{AbsoluteTimeRange, AbsoluteTimeRangeF, TimeReal};
use crate::RrdManifestIndex;
/// A histogram of counts over time for a single [`Timeline`].
///
/// Wraps a [`re_int_histogram::Int64Histogram`] keyed by raw `i64` time values and
/// dereferences to it, so the wrapped histogram's `&self` API is available directly.
#[derive(Clone)]
pub struct TimeHistogram {
/// The timeline this histogram was created for.
timeline: Timeline,
/// The counts, keyed by raw `i64` time value.
hist: re_int_histogram::Int64Histogram,
}
/// Gives read-only access to the wrapped [`re_int_histogram::Int64Histogram`].
impl std::ops::Deref for TimeHistogram {
    type Target = re_int_histogram::Int64Histogram;

    /// Borrows the inner histogram.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let Self { hist, .. } = self;
        hist
    }
}
impl TimeHistogram {
    /// Creates an empty histogram for `timeline`.
    pub fn new(timeline: Timeline) -> Self {
        Self {
            timeline,
            hist: Default::default(),
        }
    }

    /// The timeline this histogram was created for.
    pub fn timeline(&self) -> Timeline {
        self.timeline
    }

    /// The total count stored in the histogram, summed over all times.
    pub fn num_events(&self) -> u64 {
        self.hist.total_count()
    }

    /// Adds `count` at `time`.
    ///
    /// The wrapped histogram only accepts `u32` increments, so a `count` above
    /// `u32::MAX` is applied in several steps rather than being truncated.
    pub fn insert(&mut self, time: TimeInt, count: u64) {
        let time = time.as_i64();
        let mut remaining = count;
        // Run at least once, so that `count == 0` still forwards a zero increment.
        loop {
            let step = u32::try_from(remaining).unwrap_or(u32::MAX);
            self.hist.increment(time, step);
            remaining -= u64::from(step);
            if remaining == 0 {
                break;
            }
        }
    }

    /// Adds `n` at the raw time value `time`.
    pub fn increment(&mut self, time: i64, n: u32) {
        self.hist.increment(time, n);
    }

    /// Subtracts `n` at the raw time value `time`.
    ///
    /// Whatever the wrapped histogram returns from its `decrement` is discarded.
    pub fn decrement(&mut self, time: i64, n: u32) {
        self.hist.decrement(time, n);
    }

    /// The smallest time in the histogram, or `None` if there is no min key.
    pub fn min_opt(&self) -> Option<TimeInt> {
        self.min_key().map(TimeInt::new_temporal)
    }

    /// The smallest time in the histogram, falling back to [`TimeInt::MIN`].
    pub fn min(&self) -> TimeInt {
        self.min_opt().unwrap_or(TimeInt::MIN)
    }

    /// The largest time in the histogram, or `None` if there is no max key.
    pub fn max_opt(&self) -> Option<TimeInt> {
        self.max_key().map(TimeInt::new_temporal)
    }

    /// The largest time in the histogram, falling back to [`TimeInt::MIN`].
    ///
    /// The fallback is `MIN` (not `MAX`), so [`Self::full_range`] collapses to
    /// `[MIN, MIN]` when there are no keys.
    pub fn max(&self) -> TimeInt {
        self.max_opt().unwrap_or(TimeInt::MIN)
    }

    /// The range spanning [`Self::min`] to [`Self::max`].
    pub fn full_range(&self) -> AbsoluteTimeRange {
        AbsoluteTimeRange::new(self.min(), self.max())
    }

    /// The next key after `floor(time)`, wrapping around to [`Self::min`] if there is none.
    pub fn step_fwd_time(&self, time: TimeReal) -> TimeInt {
        match self.next_key_after(time.floor().as_i64()) {
            Some(key) => TimeInt::new_temporal(key),
            None => self.min(),
        }
    }

    /// The previous key before `ceil(time)`, wrapping around to [`Self::max`] if there is none.
    pub fn step_back_time(&self, time: TimeReal) -> TimeInt {
        match self.prev_key_before(time.ceil().as_i64()) {
            Some(key) => TimeInt::new_temporal(key),
            None => self.max(),
        }
    }

    /// Steps forward from `time` while staying inside `loop_range`.
    ///
    /// * If `time` is before the loop range, or at/after its end, returns `loop_range.min`.
    /// * Otherwise returns the start of the first histogram range found in
    ///   `(floor(time), floor(loop_range.max)]`.
    /// * If that interval holds nothing, falls back to [`Self::step_fwd_time`].
    pub fn step_fwd_time_looped(
        &self,
        time: TimeReal,
        loop_range: &AbsoluteTimeRangeF,
    ) -> TimeReal {
        if time < loop_range.min || loop_range.max <= time {
            return loop_range.min;
        }

        let bounds = (
            Bound::Excluded(time.floor().as_i64()),
            Bound::Included(loop_range.max.floor().as_i64()),
        );
        let next_in_loop = self.range(bounds, 1).next().map(|(range, _)| range.min);

        match next_in_loop {
            Some(next) => TimeReal::from(next),
            None => self.step_fwd_time(time).into(),
        }
    }

    /// Steps backward from `time` while staying inside `loop_range`.
    ///
    /// * If `time` is at/before the loop start, or after its end, returns `loop_range.max`.
    /// * Otherwise returns the end of the last histogram range found in
    ///   `[ceil(loop_range.min), ceil(time))`.
    /// * If that interval holds nothing, falls back to [`Self::step_back_time`].
    pub fn step_back_time_looped(
        &self,
        time: TimeReal,
        loop_range: &AbsoluteTimeRangeF,
    ) -> TimeReal {
        re_tracing::profile_function!();

        if time <= loop_range.min || loop_range.max < time {
            return loop_range.max;
        }

        let bounds = (
            Bound::Included(loop_range.min.ceil().as_i64()),
            Bound::Excluded(time.ceil().as_i64()),
        );
        // We want the *last* range before `time`, which means visiting every range
        // yielded for the interval.
        let prev_in_loop = self.range(bounds, 1).map(|(range, _)| range.max).last();

        match prev_in_loop {
            Some(prev) => TimeReal::from(TimeInt::new_temporal(prev)),
            None => self.step_back_time(time).into(),
        }
    }
}
/// One [`TimeHistogram`] per timeline, plus a flag for static data.
#[derive(Default, Clone)]
pub struct TimeHistogramPerTimeline {
/// The histogram of each timeline, keyed by timeline name.
times: BTreeMap<TimelineName, TimeHistogram>,
/// Set to `true` once a static chunk addition has been observed.
has_static: bool,
}
impl TimeHistogramPerTimeline {
    /// `true` if there is no histogram for any timeline and no static data has been seen.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.times.is_empty() && !self.has_static
    }

    /// All timelines that currently have a histogram.
    #[inline]
    pub fn timelines(&self) -> impl ExactSizeIterator<Item = Timeline> {
        self.times.values().map(|h| h.timeline())
    }

    /// All histograms, ordered by timeline name.
    pub fn histograms(&self) -> impl ExactSizeIterator<Item = &TimeHistogram> {
        self.times.values()
    }

    /// The histogram of the given timeline, if any.
    #[inline]
    pub fn get(&self, timeline: &TimelineName) -> Option<&TimeHistogram> {
        self.times.get(timeline)
    }

    /// Whether there is a histogram for the given timeline.
    #[inline]
    pub fn has_timeline(&self, timeline: &TimelineName) -> bool {
        self.times.contains_key(timeline)
    }

    /// Iterates over `(timeline name, histogram)` pairs, ordered by timeline name.
    #[inline]
    pub fn iter(&self) -> impl ExactSizeIterator<Item = (&TimelineName, &TimeHistogram)> {
        self.times.iter()
    }

    /// The sum of the total counts of all per-timeline histograms.
    pub fn num_temporal_messages(&self) -> u64 {
        self.times.values().map(|hist| hist.total_count()).sum()
    }

    /// Returns the histogram for `timeline`, creating an empty one if there is none yet.
    fn histogram_mut(&mut self, timeline: &Timeline) -> &mut TimeHistogram {
        self.times
            .entry(*timeline.name())
            .or_insert_with(|| TimeHistogram::new(*timeline))
    }

    /// Adds `n` at each of `times` on `timeline`, creating the histogram if needed.
    fn add_temporal(&mut self, timeline: &Timeline, times: &[i64], n: u32) {
        re_tracing::profile_function!();

        let histogram = self.histogram_mut(timeline);
        for &time in times {
            histogram.increment(time, n);
        }
    }

    /// Subtracts `n` at each of `times` on `timeline`.
    ///
    /// Does nothing if the timeline has no histogram. A histogram that ends up empty
    /// is dropped from the map.
    fn remove_temporal(&mut self, timeline: &Timeline, times: &[i64], n: u32) {
        re_tracing::profile_function!();

        let name = timeline.name();
        let Some(histogram) = self.times.get_mut(name) else {
            return;
        };
        for &time in times {
            histogram.decrement(time, n);
        }
        if histogram.is_empty() {
            self.times.remove(name);
        }
    }

    /// Adds an estimate to the histograms for every temporal entry of `rrd_manifest`.
    ///
    /// # Errors
    /// Propagates the error from `RrdManifest::get_temporal_data_as_a_map`.
    pub fn on_rrd_manifest(
        &mut self,
        rrd_manifest: &re_log_encoding::RrdManifest,
    ) -> re_log_encoding::CodecResult<()> {
        re_tracing::profile_function!();

        let native_temporal_map = rrd_manifest.get_temporal_data_as_a_map()?;
        for timelines in native_temporal_map.values() {
            for (timeline, comps) in timelines {
                let histogram = self.histogram_mut(timeline);
                for chunks in comps.values() {
                    for entry in chunks.values() {
                        let RrdManifestTemporalMapEntry {
                            time_range,
                            num_rows,
                        } = *entry;
                        apply_estimate(Application::Add, histogram, time_range, num_rows);
                    }
                }
            }
        }
        Ok(())
    }

    /// Updates the histograms in response to chunk store events.
    ///
    /// For temporal chunks that `rrd_manifest_index` has temporal info for:
    /// * on addition, the manifest-based estimate is removed and the actual times are added;
    /// * on deletion, the actual times are removed and the estimate is added back.
    ///
    /// Static additions only set the static flag; static deletions are ignored.
    pub fn on_events(&mut self, rrd_manifest_index: &RrdManifestIndex, events: &[ChunkStoreEvent]) {
        re_tracing::profile_function!();

        for event in events {
            if event.chunk.is_static() {
                match event.kind {
                    ChunkStoreDiffKind::Addition => {
                        self.has_static = true;
                    }
                    ChunkStoreDiffKind::Deletion => {
                        // Nothing to do: `has_static` is never cleared here.
                    }
                }
                continue;
            }

            // A chunk that came out of a split is looked up under the id of its source chunk.
            let original_chunk_id = event
                .diff
                .split_source
                .unwrap_or_else(|| event.chunk.id());

            // Both of these depend only on the event, not on the time column.
            let remote_info = rrd_manifest_index.remote_chunk_info(&original_chunk_id);
            let num_components = event.num_components() as u32;

            // NOTE(review): `on_rrd_manifest` adds one estimate per (timeline, component, chunk)
            // entry, whereas here one estimate is removed/restored per time column using the
            // same `temporal` info for every column. Confirm that these stay balanced.
            for time_column in event.chunk.timelines().values() {
                let timeline = time_column.timeline();
                let times = time_column.times_raw();

                match event.kind {
                    ChunkStoreDiffKind::Addition => {
                        // The real data replaces the estimate.
                        if let Some(info) = &remote_info
                            && let Some(temporal) = &info.temporal
                        {
                            let histogram = self.histogram_mut(timeline);
                            apply_estimate(
                                Application::Remove,
                                histogram,
                                temporal.time_range,
                                temporal.num_rows,
                            );
                        }
                        self.add_temporal(timeline, times, num_components);
                    }
                    ChunkStoreDiffKind::Deletion => {
                        self.remove_temporal(timeline, times, num_components);
                        // The estimate takes the place of the real data again.
                        if let Some(info) = &remote_info
                            && let Some(temporal) = &info.temporal
                        {
                            let histogram = self.histogram_mut(timeline);
                            apply_estimate(
                                Application::Add,
                                histogram,
                                temporal.time_range,
                                temporal.num_rows,
                            );
                        }
                    }
                }
            }
        }
    }
}
/// Whether a count is to be added to, or removed from, a histogram.
#[derive(Clone, Copy, Debug)]
enum Application {
/// Increment the histogram.
Add,
/// Decrement the histogram.
Remove,
}
impl Application {
    /// Increments (`Add`) or decrements (`Remove`) `histogram` by `inc` at `position`.
    fn apply(self, histogram: &mut TimeHistogram, position: i64, inc: u32) {
        match self {
            Self::Add => histogram.increment(position, inc),
            Self::Remove => histogram.decrement(position, inc),
        }
    }
}
/// Adds or removes an estimate of `num_rows` rows spread over `time_range`.
///
/// The rows are assumed to be evenly spread: they are split over up to ten
/// equidistant positions from `time_range.min` to `time_range.max` (inclusive).
/// When there is only a single row, or the range is a single point, everything
/// goes to the center of the range.
///
/// The counts applied always sum to `num_rows` (short of saturating at `u32::MAX`
/// per position): any remainder of the division is handed out, one row each, to
/// the leading positions. Because the split is deterministic, applying `Remove`
/// with the same arguments undoes an earlier `Add`.
fn apply_estimate(
    application: Application,
    histogram: &mut TimeHistogram,
    time_range: re_log_types::AbsoluteTimeRange,
    num_rows: u64,
) {
    /// Upper bound on the number of positions a single estimate is spread over.
    const MAX_PIECES: u64 = 10;

    if num_rows == 0 {
        return;
    }

    // The histogram takes `u32` counts: saturate instead of silently wrapping.
    let to_count = |rows: u64| u32::try_from(rows).unwrap_or(u32::MAX);

    let num_pieces = num_rows.min(MAX_PIECES);

    if num_pieces == 1 || time_range.min == time_range.max {
        let position = time_range.center();
        application.apply(histogram, position.as_i64(), to_count(num_rows));
        return;
    }

    let rows_per_piece = num_rows / num_pieces;
    // Rows that would be lost to the integer division above.
    let leftover = num_rows % num_pieces;

    let (start, end) = (time_range.min.as_f64(), time_range.max.as_f64());
    let last_index = num_pieces as f64 - 1.0;

    for i in 0..num_pieces {
        let position = lerp(start..=end, i as f64 / last_index).round() as i64;
        let rows = rows_per_piece + u64::from(i < leftover);
        application.apply(histogram, position, to_count(rows));
    }
}