use std::collections::BTreeMap;
use std::sync::atomic::AtomicU64;
use ahash::HashMap;
use arrow2::datatypes::DataType;
use nohash_hasher::IntMap;
use parking_lot::RwLock;
use smallvec::SmallVec;
use re_log_types::{
DataCell, DataCellColumn, EntityPath, EntityPathHash, ErasedTimeVec, NumInstancesVec, RowId,
RowIdVec, StoreId, TimeInt, TimePoint, TimeRange, Timeline,
};
use re_types_core::{ComponentName, ComponentNameSet, SizeBytes};
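
/// Configuration options for the [`DataStore`].
///
/// A minimal sketch of overriding a single option (all names are defined below in this file):
/// ```ignore
/// let config = DataStoreConfig {
///     store_insert_ids: true,
///     ..DataStoreConfig::DEFAULT
/// };
/// ```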
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataStoreConfig {
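/// The maximum number of rows an indexed bucket may hold before the store splits it
/// into a new bucket.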
pub indexed_bucket_num_rows: u64,
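/// If enabled, the monotonically increasing insert ID is stored alongside each row
/// (see [`DataStore::insert_id_component_name`]); mostly useful for debugging.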
pub store_insert_ids: bool,
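/// If enabled, inserts are checked against the [`DataTypeRegistry`] so that a given
/// component name cannot map to more than one Arrow datatype.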
pub enable_typecheck: bool,
}
impl Default for DataStoreConfig {
#[inline]
fn default() -> Self {
Self::DEFAULT
}
}
impl DataStoreConfig {
pub const DEFAULT: Self = Self {
indexed_bucket_num_rows: 512,
store_insert_ids: cfg!(debug_assertions),
enable_typecheck: cfg!(debug_assertions),
};
}
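
/// Stores insert IDs inline for up to 4 entries before spilling to the heap.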
pub type InsertIdVec = SmallVec<[u64; 4]>;
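
/// Keeps track of the Arrow [`DataType`] registered for each component seen by the store.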
#[derive(Debug, Default, Clone)]
pub struct DataTypeRegistry(pub IntMap<ComponentName, DataType>);
impl std::ops::Deref for DataTypeRegistry {
type Target = IntMap<ComponentName, DataType>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for DataTypeRegistry {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
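
/// Per-row metadata (e.g. the [`TimePoint`] associated with each [`RowId`]), together with
/// a running estimate of its heap size.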
#[derive(Debug, Clone)]
pub struct MetadataRegistry<T: Clone> {
pub registry: BTreeMap<RowId, T>,
pub heap_size_bytes: u64,
}
impl Default for MetadataRegistry<TimePoint> {
fn default() -> Self {
let mut this = Self {
registry: Default::default(),
heap_size_bytes: 0,
};
this.heap_size_bytes = this.heap_size_bytes();
this
}
}
impl<T: Clone> std::ops::Deref for MetadataRegistry<T> {
type Target = BTreeMap<RowId, T>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.registry
}
}
impl<T: Clone> std::ops::DerefMut for MetadataRegistry<T> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.registry
}
}
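
/// Caches auto-generated cluster [`DataCell`]s, keyed by their number of instances, so they
/// can be reused across insertions.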
#[derive(Debug, Default, Clone)]
pub struct ClusterCellCache(pub IntMap<u32, DataCell>);
impl std::ops::Deref for ClusterCellCache {
type Target = IntMap<u32, DataCell>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for ClusterCellCache {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
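
/// Identifies a particular state of the store: comparing two generations reveals whether any
/// insertions or garbage collections happened in between.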
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct StoreGeneration {
insert_id: u64,
gc_id: u64,
}
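
/// The complete in-memory data store: all timelines, all entities, all components.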
pub struct DataStore {
pub(crate) id: StoreId,
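/// The component used as the cluster key: a column expected to be present in every row,
/// used to identify individual instances within a row.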
pub(crate) cluster_key: ComponentName,
pub(crate) config: DataStoreConfig,
pub(crate) type_registry: DataTypeRegistry,
pub(crate) metadata_registry: MetadataRegistry<TimePoint>,
pub(crate) cluster_cell_cache: ClusterCellCache,
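/// All temporal indices, one per `(Timeline, EntityPath)` pair.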
pub(crate) tables: HashMap<(Timeline, EntityPathHash), IndexedTable>,
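/// All timeless indices, one per entity.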
pub(crate) timeless_tables: IntMap<EntityPathHash, PersistentIndexedTable>,
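/// Monotonically increasing ID for insertions.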
pub(crate) insert_id: u64,
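/// Monotonically increasing ID for queries.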
pub(crate) query_id: AtomicU64,
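/// Monotonically increasing ID for garbage collection runs.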
pub(crate) gc_id: u64,
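/// Monotonically increasing ID for store events.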
pub(crate) event_id: AtomicU64,
}
impl Clone for DataStore {
fn clone(&self) -> Self {
Self {
id: self.id.clone(),
cluster_key: self.cluster_key,
config: self.config.clone(),
type_registry: self.type_registry.clone(),
metadata_registry: self.metadata_registry.clone(),
cluster_cell_cache: self.cluster_cell_cache.clone(),
tables: self.tables.clone(),
timeless_tables: self.timeless_tables.clone(),
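// The counters are reset rather than carried over (`AtomicU64` does not implement `Clone`).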
insert_id: Default::default(),
query_id: Default::default(),
gc_id: Default::default(),
event_id: Default::default(),
}
}
}
impl DataStore {
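/// Creates a new [`DataStore`] with the given cluster key and configuration.
///
/// A minimal construction sketch (the component name below is an assumption; pick whichever
/// component should act as the cluster key):
/// ```ignore
/// let store = DataStore::new(
///     re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
///     "rerun.components.InstanceKey".into(),
///     DataStoreConfig::default(),
/// );
/// ```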
pub fn new(id: StoreId, cluster_key: ComponentName, config: DataStoreConfig) -> Self {
Self {
id,
cluster_key,
config,
cluster_cell_cache: Default::default(),
type_registry: Default::default(),
metadata_registry: Default::default(),
tables: Default::default(),
timeless_tables: Default::default(),
insert_id: 0,
query_id: AtomicU64::new(0),
gc_id: 0,
event_id: AtomicU64::new(0),
}
}
#[inline]
pub fn id(&self) -> &StoreId {
&self.id
}
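/// The name of the column where insert IDs are stored when
/// [`DataStoreConfig::store_insert_ids`] is enabled.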
pub fn insert_id_component_name() -> ComponentName {
"rerun.controls.InsertId".into()
}
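/// Returns the current generation of the store, which changes every time data is inserted
/// or garbage collected.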
pub fn generation(&self) -> StoreGeneration {
StoreGeneration {
insert_id: self.insert_id,
gc_id: self.gc_id,
}
}
pub fn cluster_key(&self) -> ComponentName {
self.cluster_key
}
pub fn config(&self) -> &DataStoreConfig {
&self.config
}
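/// Looks up the Arrow datatype previously registered for `component`, if any.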
pub fn lookup_datatype(&self, component: &ComponentName) -> Option<&DataType> {
self.type_registry.get(component)
}
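/// Returns the oldest time present on each timeline, across all entities.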
pub fn oldest_time_per_timeline(&self) -> BTreeMap<Timeline, TimeInt> {
re_tracing::profile_function!();
let mut oldest_time_per_timeline = BTreeMap::default();
for index in self.tables.values() {
if let Some(bucket) = index.buckets.values().next() {
let entry = oldest_time_per_timeline
.entry(bucket.timeline)
.or_insert(TimeInt::MAX);
if let Some(time) = bucket.inner.read().col_time.first() {
*entry = TimeInt::min(*entry, (*time).into());
}
}
}
oldest_time_per_timeline
}
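/// Iterates over all temporal indexed tables, keyed by `(Timeline, EntityPath)`.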
pub fn iter_indices(
&self,
) -> impl ExactSizeIterator<Item = ((Timeline, EntityPath), &IndexedTable)> {
self.tables.iter().map(|((timeline, _), table)| {
((*timeline, table.ent_path.clone()), table)
})
}
}
#[test]
fn datastore_internal_repr() {
use re_log_types::DataTable;
use re_types_core::Loggable as _;
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
re_types::components::InstanceKey::name(),
DataStoreConfig {
indexed_bucket_num_rows: 0,
store_insert_ids: true,
enable_typecheck: true,
},
);
let timeless = DataTable::example(true);
eprintln!("{timeless}");
for row in timeless.to_rows() {
store.insert_row(&row.unwrap()).unwrap();
}
let temporal = DataTable::example(false);
eprintln!("{temporal}");
for row in temporal.to_rows() {
store.insert_row(&row.unwrap()).unwrap();
}
store.sanity_check().unwrap();
eprintln!("{store}");
}
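
/// An ever-growing table of rows for a single `(Timeline, EntityPath)` pair, split into
/// time-ordered [`IndexedBucket`]s.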
#[derive(Debug, Clone)]
pub struct IndexedTable {
pub timeline: Timeline,
pub ent_path: EntityPath,
pub cluster_key: ComponentName,
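/// The actual buckets, keyed by their lowest time bound.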
pub buckets: BTreeMap<TimeInt, IndexedBucket>,
pub all_components: ComponentNameSet,
pub buckets_num_rows: u64,
pub buckets_size_bytes: u64,
}
impl IndexedTable {
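/// Creates a new table with a single, empty bucket that covers the entire time range.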
pub fn new(cluster_key: ComponentName, timeline: Timeline, ent_path: EntityPath) -> Self {
let bucket = IndexedBucket::new(cluster_key, timeline);
let buckets_size_bytes = bucket.total_size_bytes();
Self {
timeline,
ent_path,
buckets: [(i64::MIN.into(), bucket)].into(),
cluster_key,
all_components: Default::default(),
buckets_num_rows: 0,
buckets_size_bytes,
}
}
}
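
/// A chunk of an [`IndexedTable`]'s rows, covering a contiguous time range.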
#[derive(Debug)]
pub struct IndexedBucket {
pub timeline: Timeline,
pub cluster_key: ComponentName,
pub inner: RwLock<IndexedBucketInner>,
}
impl Clone for IndexedBucket {
fn clone(&self) -> Self {
Self {
timeline: self.timeline,
cluster_key: self.cluster_key,
inner: RwLock::new(self.inner.read().clone()),
}
}
}
impl IndexedBucket {
fn new(cluster_key: ComponentName, timeline: Timeline) -> Self {
Self {
timeline,
inner: RwLock::new(IndexedBucketInner::default()),
cluster_key,
}
}
}
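
/// The mutable contents of an [`IndexedBucket`], protected by the bucket's `RwLock`.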
#[derive(Debug, Clone)]
pub struct IndexedBucketInner {
pub is_sorted: bool,
pub time_range: TimeRange,
pub col_time: ErasedTimeVec,
pub col_insert_id: InsertIdVec,
pub col_row_id: RowIdVec,
pub col_num_instances: NumInstancesVec,
pub columns: IntMap<ComponentName, DataCellColumn>,
pub size_bytes: u64,
}
impl Default for IndexedBucketInner {
fn default() -> Self {
let mut this = Self {
is_sorted: true,
time_range: TimeRange::EMPTY,
col_time: Default::default(),
col_insert_id: Default::default(),
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
size_bytes: 0,
};
this.compute_size_bytes();
this
}
}
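
/// The timeless counterpart of an [`IndexedTable`]: one table per entity, with no time column
/// and no bucketing.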
#[derive(Debug, Clone)]
pub struct PersistentIndexedTable {
pub ent_path: EntityPath,
pub cluster_key: ComponentName,
pub col_insert_id: InsertIdVec,
pub col_row_id: RowIdVec,
pub col_num_instances: NumInstancesVec,
pub columns: IntMap<ComponentName, DataCellColumn>,
}
impl PersistentIndexedTable {
pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
Self {
cluster_key,
ent_path,
col_insert_id: Default::default(),
col_row_id: Default::default(),
col_num_instances: Default::default(),
columns: Default::default(),
}
}
pub fn is_empty(&self) -> bool {
self.col_num_instances.is_empty()
}
}