use nohash_hasher::IntMap;
use re_log_types::{TimePoint, TimeRange};
use re_types_core::{ComponentName, SizeBytes};
use crate::{
store::IndexedBucketInner, ClusterCellCache, DataStore, DataTypeRegistry, IndexedBucket,
IndexedTable, MetadataRegistry, PersistentIndexedTable,
};
/// Stats about a set of rows: how many there are, and how many bytes they occupy.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub struct DataStoreRowStats {
    /// Number of rows.
    pub num_rows: u64,
    /// Total size of those rows, in bytes.
    pub num_bytes: u64,
}
impl std::ops::Sub for DataStoreRowStats {
    type Output = Self;

    /// Component-wise subtraction, e.g. for computing the delta between two snapshots.
    ///
    /// NOTE(review): plain `-` will panic on underflow in debug builds if `rhs`
    /// is larger than `self` — callers are expected to subtract an older
    /// snapshot from a newer one.
    fn sub(self, rhs: Self) -> Self::Output {
        let num_rows = self.num_rows - rhs.num_rows;
        let num_bytes = self.num_bytes - rhs.num_bytes;
        Self { num_rows, num_bytes }
    }
}
impl std::ops::Add for DataStoreRowStats {
    type Output = Self;

    /// Component-wise addition of row and byte counts.
    fn add(self, rhs: Self) -> Self::Output {
        let num_rows = self.num_rows + rhs.num_rows;
        let num_bytes = self.num_bytes + rhs.num_bytes;
        Self { num_rows, num_bytes }
    }
}
/// Aggregated statistics for an entire `DataStore`; see [`DataStoreStats::from_store`].
#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd)]
pub struct DataStoreStats {
    /// Stats for the datatype registry.
    pub type_registry: DataStoreRowStats,
    /// Stats for the metadata registry.
    pub metadata_registry: DataStoreRowStats,
    /// Stats for the auto-generated cluster cell cache.
    pub autogenerated: DataStoreRowStats,
    /// Stats for timeless data.
    pub timeless: DataStoreRowStats,
    /// Stats for temporal (time-indexed) data.
    pub temporal: DataStoreRowStats,
    /// Total number of buckets across all temporal tables.
    pub temporal_buckets: u64,
    /// Grand totals: rows count only actual data rows (timeless + temporal),
    /// while bytes also include registries and caches (see `from_store`).
    pub total: DataStoreRowStats,
}
impl std::ops::Sub for DataStoreStats {
    type Output = Self;

    /// Field-wise subtraction, e.g. for measuring the delta between two snapshots.
    fn sub(self, rhs: Self) -> Self::Output {
        // Exhaustively destructure so that adding a field to the struct without
        // updating this impl becomes a compile error.
        let Self {
            type_registry,
            metadata_registry,
            autogenerated,
            timeless,
            temporal,
            temporal_buckets,
            total,
        } = self;

        Self {
            type_registry: type_registry - rhs.type_registry,
            metadata_registry: metadata_registry - rhs.metadata_registry,
            autogenerated: autogenerated - rhs.autogenerated,
            timeless: timeless - rhs.timeless,
            temporal: temporal - rhs.temporal,
            temporal_buckets: temporal_buckets - rhs.temporal_buckets,
            total: total - rhs.total,
        }
    }
}
impl std::ops::Add for DataStoreStats {
    type Output = Self;

    /// Field-wise addition of two sets of store statistics.
    fn add(self, rhs: Self) -> Self::Output {
        // Exhaustively destructure so that adding a field to the struct without
        // updating this impl becomes a compile error.
        let Self {
            type_registry,
            metadata_registry,
            autogenerated,
            timeless,
            temporal,
            temporal_buckets,
            total,
        } = self;

        Self {
            type_registry: type_registry + rhs.type_registry,
            metadata_registry: metadata_registry + rhs.metadata_registry,
            autogenerated: autogenerated + rhs.autogenerated,
            timeless: timeless + rhs.timeless,
            temporal: temporal + rhs.temporal,
            temporal_buckets: temporal_buckets + rhs.temporal_buckets,
            total: total + rhs.total,
        }
    }
}
impl DataStoreStats {
    /// Gathers a full statistics snapshot for `store`.
    ///
    /// Each sub-measurement runs under its own profiling scope so the cost of
    /// collecting stats is itself visible in traces.
    pub fn from_store(store: &DataStore) -> Self {
        re_tracing::profile_function!();

        // Tiny constructor helper to cut down on repetition below.
        let row_stats = |num_rows: u64, num_bytes: u64| DataStoreRowStats { num_rows, num_bytes };

        let type_registry = {
            re_tracing::profile_scope!("type_registry");
            row_stats(
                store.type_registry.len() as u64,
                store.type_registry.total_size_bytes(),
            )
        };

        let metadata_registry = {
            re_tracing::profile_scope!("metadata_registry");
            row_stats(
                store.metadata_registry.len() as u64,
                store.metadata_registry.total_size_bytes(),
            )
        };

        let autogenerated = {
            re_tracing::profile_scope!("autogenerated");
            row_stats(
                store.cluster_cell_cache.len() as u64,
                store.cluster_cell_cache.total_size_bytes(),
            )
        };

        let timeless = {
            re_tracing::profile_scope!("timeless");
            row_stats(store.num_timeless_rows(), store.timeless_size_bytes())
        };

        let (temporal, temporal_buckets) = {
            re_tracing::profile_scope!("temporal");
            let stats = row_stats(store.num_temporal_rows(), store.temporal_size_bytes());
            (stats, store.num_temporal_buckets())
        };

        // Total rows only count actual data rows (timeless + temporal), while
        // total bytes also include every registry and cache.
        let total = DataStoreRowStats {
            num_rows: timeless.num_rows + temporal.num_rows,
            num_bytes: [
                type_registry.num_bytes,
                metadata_registry.num_bytes,
                autogenerated.num_bytes,
                timeless.num_bytes,
                temporal.num_bytes,
            ]
            .iter()
            .sum(),
        };

        Self {
            type_registry,
            metadata_registry,
            autogenerated,
            timeless,
            temporal,
            temporal_buckets,
            total,
        }
    }

    /// Returns `(num_rows, num_bytes)` covering temporal data plus the
    /// metadata registry, optionally including timeless data as well.
    pub fn total_rows_and_bytes_with_timeless(&self, include_timeless: bool) -> (u64, f64) {
        let base_rows = self.temporal.num_rows + self.metadata_registry.num_rows;
        let base_bytes = (self.temporal.num_bytes + self.metadata_registry.num_bytes) as f64;

        if include_timeless {
            (
                base_rows + self.timeless.num_rows,
                base_bytes + self.timeless.num_bytes as f64,
            )
        } else {
            (base_rows, base_bytes)
        }
    }
}
impl SizeBytes for DataTypeRegistry {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        // The map's keys live inline in its buckets, so only their stack size
        // counts; the values are measured in full (stack + heap).
        let inner: &IntMap<ComponentName, _> = &self.0;
        let keys_size_bytes = (inner.len() * std::mem::size_of::<ComponentName>()) as u64;
        let values_size_bytes: u64 = self.values().map(SizeBytes::total_size_bytes).sum();
        keys_size_bytes + values_size_bytes
    }
}
impl SizeBytes for MetadataRegistry<TimePoint> {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        // Returns a bookkept counter rather than re-measuring the whole
        // registry on every call. NOTE(review): presumably kept in sync at
        // insertion/removal sites — confirm against the registry's impl.
        self.heap_size_bytes
    }
}
impl SizeBytes for ClusterCellCache {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        // Newtype wrapper: delegate straight to the inner collection.
        self.0.heap_size_bytes()
    }
}
impl DataStore {
#[inline]
pub fn num_timeless_rows(&self) -> u64 {
re_tracing::profile_function!();
self.timeless_tables
.values()
.map(|table| table.num_rows())
.sum()
}
#[inline]
pub fn timeless_size_bytes(&self) -> u64 {
re_tracing::profile_function!();
self.timeless_tables
.values()
.map(|table| table.total_size_bytes())
.sum()
}
#[inline]
pub fn num_temporal_rows(&self) -> u64 {
re_tracing::profile_function!();
self.tables.values().map(|table| table.num_rows()).sum()
}
#[inline]
pub fn temporal_size_bytes(&self) -> u64 {
re_tracing::profile_function!();
self.tables
.values()
.map(|table| table.total_size_bytes())
.sum()
}
#[inline]
pub fn num_temporal_buckets(&self) -> u64 {
re_tracing::profile_function!();
self.tables.values().map(|table| table.num_buckets()).sum()
}
pub fn entity_stats(
&self,
timeline: re_log_types::Timeline,
entity_path_hash: re_log_types::EntityPathHash,
) -> EntityStats {
let mut entity_stats = self.tables.get(&(timeline, entity_path_hash)).map_or(
EntityStats::default(),
|table| EntityStats {
num_rows: table.buckets_num_rows,
size_bytes: table.buckets_size_bytes,
time_range: table.time_range(),
timelines_rows: 0,
timelines_size_bytes: 0,
},
);
if let Some(timeless) = self.timeless_tables.get(&entity_path_hash) {
entity_stats.timelines_rows = timeless.num_rows();
entity_stats.timelines_size_bytes = timeless.total_size_bytes();
}
entity_stats
}
}
/// Per-entity statistics for one timeline, as returned by `DataStore::entity_stats`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EntityStats {
    /// Number of temporal rows for this entity on the queried timeline.
    pub num_rows: u64,
    /// Size in bytes of those temporal rows.
    pub size_bytes: u64,
    /// Time range covered by this entity's buckets on the queried timeline.
    pub time_range: re_log_types::TimeRange,
    /// NOTE(review): despite the name, `entity_stats` fills this with the
    /// entity's *timeless* row count — confirm intent of the naming.
    pub timelines_rows: u64,
    /// Size in bytes of the entity's timeless data (see note above on naming).
    pub timelines_size_bytes: u64,
}
impl Default for EntityStats {
fn default() -> Self {
Self {
num_rows: 0,
size_bytes: 0,
time_range: re_log_types::TimeRange::EMPTY,
timelines_rows: 0,
timelines_size_bytes: 0,
}
}
}
impl IndexedTable {
#[inline]
pub fn num_rows(&self) -> u64 {
self.buckets_num_rows
}
#[inline]
pub(crate) fn num_rows_uncached(&self) -> u64 {
re_tracing::profile_function!();
self.buckets.values().map(|bucket| bucket.num_rows()).sum()
}
#[inline]
pub(crate) fn size_bytes_uncached(&self) -> u64 {
re_tracing::profile_function!();
self.stack_size_bytes()
+ self
.buckets
.values()
.map(|bucket| bucket.total_size_bytes())
.sum::<u64>()
}
#[inline]
pub fn num_buckets(&self) -> u64 {
self.buckets.len() as _
}
pub fn time_range(&self) -> TimeRange {
if let (Some((_, first)), Some((_, last))) = (
self.buckets.first_key_value(),
self.buckets.last_key_value(),
) {
first
.inner
.read()
.time_range
.union(last.inner.read().time_range)
} else {
TimeRange::EMPTY
}
}
}
impl SizeBytes for IndexedTable {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        // Uses the maintained counter rather than walking all buckets;
        // see `size_bytes_uncached` for the slow, exact path.
        self.buckets_size_bytes
    }
}
impl IndexedBucket {
    /// Number of rows in this bucket, i.e. the length of its time column.
    #[inline]
    pub fn num_rows(&self) -> u64 {
        self.inner.read().col_time.len() as u64
    }
}
impl SizeBytes for IndexedBucket {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        // Cached value, recomputed by `IndexedBucketInner::compute_size_bytes`.
        self.inner.read().size_bytes
    }
}
impl IndexedBucketInner {
    /// Recomputes the total size of this bucket in bytes, caches it in
    /// `self.size_bytes`, and returns it.
    #[inline]
    pub fn compute_size_bytes(&mut self) -> u64 {
        re_tracing::profile_function!();

        // Exhaustively destructure `self` so that adding a field to the struct
        // without accounting for it here becomes a compile error.
        let Self {
            is_sorted,
            time_range,
            col_time,
            col_insert_id,
            col_row_id,
            col_num_instances,
            columns,
            size_bytes,
        } = self;

        *size_bytes = is_sorted.total_size_bytes()
            + time_range.total_size_bytes()
            + col_time.total_size_bytes()
            + col_insert_id.total_size_bytes()
            + col_row_id.total_size_bytes()
            + col_num_instances.total_size_bytes()
            + columns.total_size_bytes()
            // The `size_bytes` field itself is part of the bucket, so its own
            // footprint is deliberately included in the measurement.
            + size_bytes.total_size_bytes();

        *size_bytes
    }
}
impl PersistentIndexedTable {
    /// Number of rows in this timeless table, i.e. the length of its
    /// number-of-instances column.
    #[inline]
    pub fn num_rows(&self) -> u64 {
        self.col_num_instances.len() as u64
    }
}
impl SizeBytes for PersistentIndexedTable {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        re_tracing::profile_function!();

        // Exhaustively destructure `self` so that adding a field to the struct
        // without accounting for it here becomes a compile error.
        let Self {
            ent_path,
            cluster_key,
            col_insert_id,
            col_row_id,
            col_num_instances,
            columns,
        } = self;

        ent_path.total_size_bytes()
            + cluster_key.total_size_bytes()
            + col_insert_id.total_size_bytes()
            + col_row_id.total_size_bytes()
            + col_num_instances.total_size_bytes()
            + columns.total_size_bytes()
    }
}