use std::collections::BTreeMap;
use arrow::array::ListArray as ArrowListArray;
use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField};
use nohash_hasher::IntMap;
use re_byte_size::SizeBytes;
use re_chunk::ComponentIdentifier;
use re_log_types::{EntityPath, TimeType, Timeline, TimelineName};
use re_sdk_types::ComponentDescriptor;
use re_sorbet::{
ChunkColumnDescriptors, ComponentColumnDescriptor, IndexColumnDescriptor, RowIdColumnDescriptor,
};
use re_types_core::{ArchetypeName, ComponentSet, ComponentType};
use crate::ColumnMetadataState;
/// Metadata that [`StoreSchema`] keeps for one component column of one entity.
#[derive(Debug, Clone)]
pub struct ColumnMetadataEntry {
    /// Descriptor of the column as it was first seen; later updates do not overwrite it.
    pub descriptor: ComponentDescriptor,

    /// Static / semantically-empty flags merged across every update of this column
    /// (`is_static` is OR-ed, `is_semantically_empty` is AND-ed).
    pub metadata_state: ColumnMetadataState,

    /// Inner Arrow datatype of the column (as produced by
    /// `ComponentColumnDescriptor::inner_datatype`). Overwritten, with a warning,
    /// when a different datatype is seen for the same column.
    pub datatype: ArrowDataType,
}
impl re_byte_size::SizeBytes for ColumnMetadataEntry {
    /// Heap bytes owned by this entry: the sum over its three fields.
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructure: adding a field without accounting for it here fails to compile.
        let Self {
            descriptor,
            metadata_state,
            datatype,
        } = self;

        [
            descriptor.heap_size_bytes(),
            metadata_state.heap_size_bytes(),
            datatype.heap_size_bytes(),
        ]
        .into_iter()
        .sum()
    }
}
use crate::{ChunkComponentMeta, ChunkMeta, ChunkStoreEvent};
/// Key of [`StoreSchema`]'s full-descriptor map:
/// `(entity path, archetype, component identifier, component type)`.
///
/// Two columns that share an identifier but differ in archetype or component type
/// get distinct keys.
type SchemaComponentKey = (
    EntityPath,
    Option<ArchetypeName>,
    ComponentIdentifier,
    Option<ComponentType>,
);
/// Builds the [`SchemaComponentKey`] identifying `descr` in the full-descriptor map.
fn schema_component_key(descr: &ComponentColumnDescriptor) -> SchemaComponentKey {
    // Only the entity path needs an owned copy; the other parts are copied out directly.
    let owned_entity = descr.entity_path.clone();
    (owned_entity, descr.archetype, descr.component, descr.component_type)
}
/// Schema accumulated from chunk-store events: known timelines, entities and
/// component columns, plus per-column metadata.
#[derive(Debug, Clone, Default)]
pub struct StoreSchema {
    /// Type of every timeline seen so far; the most recent registration wins.
    time_type_registry: IntMap<TimelineName, TimeType>,

    /// Full column descriptors keyed by [`SchemaComponentKey`]. The `BTreeMap` keeps
    /// them in key order for `chunk_column_descriptors`.
    components: BTreeMap<SchemaComponentKey, ComponentColumnDescriptor>,

    /// Component identifiers seen on each entity.
    components_per_entity: IntMap<EntityPath, ComponentSet>,

    /// Per entity, per component identifier: descriptor, merged metadata state and datatype.
    per_column_metadata: IntMap<EntityPath, IntMap<ComponentIdentifier, ColumnMetadataEntry>>,

    /// Tree of every entity path seen.
    entity_tree: crate::EntityTree,
}
impl StoreSchema {
    /// The tree of every entity path this schema has seen.
    #[inline]
    pub fn entity_tree(&self) -> &crate::EntityTree {
        &self.entity_tree
    }

    /// Every known timeline, keyed (and therefore sorted) by name.
    ///
    /// Builds a fresh map on every call.
    #[inline]
    pub fn timelines(&self) -> BTreeMap<TimelineName, Timeline> {
        self.time_type_registry
            .iter()
            .map(|(name, typ)| (*name, Timeline::new(*name, *typ)))
            .collect()
    }

    /// The [`TimeType`] registered for `timeline_name`, or `None` if the timeline is unknown.
    #[inline]
    pub fn time_column_type(&self, timeline_name: &TimelineName) -> Option<TimeType> {
        self.time_type_registry.get(timeline_name).copied()
    }

    /// Every component identifier seen on `entity_path`, or `None` for an unknown entity.
    #[inline]
    pub fn all_components_for_entity(&self, entity_path: &EntityPath) -> Option<&ComponentSet> {
        self.components_per_entity.get(entity_path)
    }

    /// The descriptor recorded when `component` was first seen on `entity_path`.
    pub fn entity_component_descriptor(
        &self,
        entity_path: &EntityPath,
        component: ComponentIdentifier,
    ) -> Option<ComponentDescriptor> {
        self.per_column_metadata
            .get(entity_path)
            .and_then(|per_identifier| per_identifier.get(&component))
            .map(|entry| entry.descriptor.clone())
    }

    /// The component type and the latest-seen inner Arrow datatype of `component` on
    /// `entity_path`, or `None` if that column is unknown.
    pub fn lookup_component_type(
        &self,
        entity_path: &EntityPath,
        component: ComponentIdentifier,
    ) -> Option<(Option<ComponentType>, ArrowDataType)> {
        let entry = self
            .per_column_metadata
            .get(entity_path)
            .and_then(|per_identifier| per_identifier.get(&component))?;
        Some((entry.descriptor.component_type, entry.datatype.clone()))
    }

    /// The merged metadata state (static / semantically empty) of `component` on `entity_path`.
    pub fn lookup_column_metadata_state(
        &self,
        entity_path: &EntityPath,
        component: ComponentIdentifier,
    ) -> Option<&ColumnMetadataState> {
        self.per_column_metadata
            .get(entity_path)
            .and_then(|per_identifier| per_identifier.get(&component))
            .map(|entry| &entry.metadata_state)
    }

    /// Finds any column, on any entity, whose component type is `component_type` but whose
    /// datatype differs from `expected_datatype`, and returns that column's datatype.
    ///
    /// Scans every column of every entity, so it is linear in the total number of columns.
    /// When several columns mismatch, which one is reported is unspecified (hash-map order).
    pub fn has_mismatched_datatype_for_component_type(
        &self,
        component_type: &ComponentType,
        expected_datatype: &ArrowDataType,
    ) -> Option<&ArrowDataType> {
        re_tracing::profile_function!();
        for per_component in self.per_column_metadata.values() {
            for entry in per_component.values() {
                if entry.descriptor.component_type.as_ref() == Some(component_type)
                    && entry.datatype != *expected_datatype
                {
                    return Some(&entry.datatype);
                }
            }
        }
        None
    }

    /// All per-column metadata recorded for `entity_path`, keyed by component identifier.
    pub fn per_column_metadata_for_entity(
        &self,
        entity_path: &EntityPath,
    ) -> Option<&IntMap<ComponentIdentifier, ColumnMetadataEntry>> {
        self.per_column_metadata.get(entity_path)
    }

    /// Column descriptors for the whole store.
    ///
    /// Contains a row-id column (not marked sorted), every index column sorted, and every
    /// component column in [`SchemaComponentKey`] order.
    pub fn chunk_column_descriptors(&self) -> ChunkColumnDescriptors {
        let mut indices: Vec<IndexColumnDescriptor> = self
            .time_type_registry
            .iter()
            .map(|(name, typ)| IndexColumnDescriptor::from(Timeline::new(*name, *typ)))
            .collect();
        indices.sort();

        ChunkColumnDescriptors {
            row_id: RowIdColumnDescriptor::from_sorted(false),
            indices,
            components: self.components.values().cloned().collect(),
        }
    }

    /// Records `new_typ` as the [`TimeType`] of timeline `name`.
    ///
    /// The latest type wins. Changing the type of an already-known timeline is unsupported
    /// and is logged (once) as a warning. This is shared by chunk and manifest ingestion so
    /// that both report conflicts.
    fn register_timeline_type(&mut self, name: TimelineName, new_typ: TimeType) {
        if let Some(old_typ) = self.time_type_registry.insert(name, new_typ)
            && old_typ != new_typ
        {
            re_log::warn_once!(
                "Timeline '{name}' changed type from {old_typ:?} to {new_typ:?}. \
                Rerun does not support using different types for the same timeline.",
            );
        }
    }

    /// Merges one component column into the schema.
    ///
    /// Updates the full-descriptor map, the per-entity component set and the per-column
    /// metadata. Returns `Some` meta when the column is new to its entity, or when a merged
    /// flag moved in its only possible direction: non-static → static, or
    /// empty → has data. Returns `None` otherwise.
    fn update_column_metadata(
        &mut self,
        col_descr: &ComponentColumnDescriptor,
    ) -> Option<ChunkComponentMeta> {
        // Exhaustive destructure: a new descriptor field forces a decision here.
        let ComponentColumnDescriptor {
            entity_path,
            component,
            is_static,
            is_semantically_empty,
            store_datatype: _,
            component_type: _,
            archetype: _,
            is_tombstone: _,
        } = col_descr;

        let descriptor = col_descr.component_descriptor();
        let inner_datatype = col_descr.inner_datatype();

        // Full-descriptor view (feeds `chunk_column_descriptors`).
        self.components
            .entry(schema_component_key(col_descr))
            .and_modify(|existing| {
                existing.is_static |= is_static;
                existing.is_semantically_empty &= is_semantically_empty;
            })
            .or_insert_with(|| col_descr.clone());

        let is_new = self
            .components_per_entity
            .entry(entity_path.clone())
            .or_default()
            .insert(*component);

        // Both flags are monotonic; record whether this update flipped either of them so
        // downstream consumers receive the refreshed `is_static` / `has_data`.
        let mut state_changed = false;
        let entry = self
            .per_column_metadata
            .entry(entity_path.clone())
            .or_default()
            .entry(*component)
            .and_modify(|e| {
                if e.datatype != inner_datatype {
                    re_log::warn_once!(
                        "Datatype of column {} in {entity_path} has changed from {} to {inner_datatype}",
                        e.descriptor,
                        e.datatype,
                    );
                    e.datatype = inner_datatype.clone();
                }

                let became_static = !e.metadata_state.is_static && *is_static;
                let gained_data =
                    e.metadata_state.is_semantically_empty && !*is_semantically_empty;
                state_changed = became_static || gained_data;

                e.metadata_state.is_static |= is_static;
                e.metadata_state.is_semantically_empty &= is_semantically_empty;
            })
            .or_insert_with(|| ColumnMetadataEntry {
                descriptor: descriptor.clone(),
                metadata_state: ColumnMetadataState {
                    is_semantically_empty: *is_semantically_empty,
                    is_static: *is_static,
                },
                datatype: inner_datatype.clone(),
            });

        (is_new || state_changed).then(|| ChunkComponentMeta {
            descriptor,
            inner_arrow_datatype: Some(inner_datatype),
            has_data: !entry.metadata_state.is_semantically_empty,
            is_static: entry.metadata_state.is_static,
        })
    }

    /// Applies a batch of store events to the schema.
    ///
    /// Returns one [`ChunkMeta`] per entity that gained new columns, or whose columns' reported
    /// metadata changed. The order of the returned entities is unspecified.
    pub fn on_events(&mut self, events: &[ChunkStoreEvent]) -> Vec<ChunkMeta> {
        re_tracing::profile_function!();

        let mut all_new: IntMap<EntityPath, Vec<ChunkComponentMeta>> = Default::default();

        for event in events {
            match &event.diff {
                crate::ChunkStoreDiff::Addition(add) => {
                    for new_col in self.on_chunk_addition(&add.chunk_after_processing) {
                        all_new
                            .entry(add.chunk_after_processing.entity_path().clone())
                            .or_default()
                            .push(new_col);
                    }
                }
                crate::ChunkStoreDiff::VirtualAddition(vadd) => {
                    for (entity_path, new_cols) in self.on_rrd_manifest(&vadd.rrd_manifest) {
                        all_new.entry(entity_path).or_default().extend(new_cols);
                    }
                }
                // Deletions never shrink the schema here; columns are only forgotten via
                // `drop_entity`.
                // NOTE(review): schema additions are ignored — confirm this is intended.
                crate::ChunkStoreDiff::Deletion(_) | crate::ChunkStoreDiff::SchemaAddition(_) => {}
            }
        }

        all_new
            .into_iter()
            .map(|(entity_path, components)| ChunkMeta {
                entity_path,
                components,
            })
            .collect()
    }

    /// Merges the timelines, entity and component columns of a newly added chunk.
    ///
    /// Returns the metas reported by `update_column_metadata` for this chunk's columns.
    fn on_chunk_addition(&mut self, chunk: &re_chunk::Chunk) -> Vec<ChunkComponentMeta> {
        use re_types_core::Archetype as _;

        for (name, time_column) in chunk.timelines() {
            self.register_timeline_type(*name, time_column.timeline().typ());
        }

        let entity_path = chunk.entity_path();
        self.entity_tree.on_new_entity(entity_path);

        let is_static = chunk.is_static();

        // Columns whose component belongs to the `Clear` archetype are tombstones. This list
        // is loop-invariant, so fetch it once per chunk instead of once per column.
        let clear_components = re_types_core::archetypes::Clear::all_components();

        let mut new_columns = Vec::new();
        for column in chunk.components().values() {
            let descriptor = &column.descriptor;
            let component = descriptor.component;

            let is_semantically_empty =
                re_arrow_util::is_list_array_semantically_empty(&column.list_array);
            let is_tombstone = clear_components
                .iter()
                .any(|descr| descr.component == component);

            let col_descr = ComponentColumnDescriptor {
                store_datatype: ArrowListArray::DATA_TYPE_CONSTRUCTOR(
                    ArrowField::new("item", column.list_array.value_type().clone(), true).into(),
                ),
                entity_path: entity_path.clone(),
                archetype: descriptor.archetype,
                component: descriptor.component,
                component_type: descriptor.component_type,
                is_static,
                is_tombstone,
                is_semantically_empty,
            };

            if let Some(meta) = self.update_column_metadata(&col_descr) {
                new_columns.push(meta);
            }
        }

        new_columns
    }

    /// Merges the recording schema described by an RRD manifest (a virtual addition).
    ///
    /// Returns, per entity, the metas reported by `update_column_metadata`.
    fn on_rrd_manifest(
        &mut self,
        rrd_manifest: &re_log_encoding::RrdManifest,
    ) -> Vec<(EntityPath, Vec<ChunkComponentMeta>)> {
        let sorbet_schema = rrd_manifest.recording_schema();

        // Same registration path as real chunks, so type conflicts are warned about here too.
        for descr in sorbet_schema.columns.index_columns() {
            self.register_timeline_type(descr.timeline_name(), descr.timeline().typ());
        }

        for entity in sorbet_schema.all_entities() {
            self.entity_tree.on_new_entity(entity);
        }

        let mut new_per_entity: IntMap<EntityPath, Vec<ChunkComponentMeta>> = Default::default();
        for descr in sorbet_schema.columns.component_columns() {
            if let Some(meta) = self.update_column_metadata(descr) {
                new_per_entity
                    .entry(descr.entity_path.clone())
                    .or_default()
                    .push(meta);
            }
        }

        new_per_entity.into_iter().collect()
    }

    /// Forgets every component column recorded for `entity_path`.
    ///
    /// Timeline types and the entity tree are left untouched; use `prune_entity_tree` for
    /// the latter.
    pub fn drop_entity(&mut self, entity_path: &EntityPath) {
        self.components.retain(|key, _| key.0 != *entity_path);
        self.components_per_entity.remove(entity_path);
        self.per_column_metadata.remove(entity_path);
    }

    /// Removes entities from the entity tree, delegating to `EntityTree::prune_empty_entities`
    /// with the `entity_has_data` predicate.
    pub fn prune_entity_tree(&mut self, entity_has_data: &impl Fn(&EntityPath) -> bool) {
        self.entity_tree.prune_empty_entities(entity_has_data);
    }
}
impl SizeBytes for StoreSchema {
    /// Heap bytes owned by the schema: the sum over all of its maps and the entity tree.
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructure so a newly added field cannot be silently left out.
        let Self {
            time_type_registry,
            components,
            components_per_entity,
            per_column_metadata,
            entity_tree,
        } = self;

        let timeline_bytes = time_type_registry.heap_size_bytes();
        let descriptor_bytes = components.heap_size_bytes();
        let component_set_bytes = components_per_entity.heap_size_bytes();
        let metadata_bytes = per_column_metadata.heap_size_bytes();
        let tree_bytes = entity_tree.heap_size_bytes();

        timeline_bytes + descriptor_bytes + component_set_bytes + metadata_bytes + tree_bytes
    }
}