use std::sync::Arc;
use re_chunk::Chunk;
use re_log_encoding::RrdManifest;
use re_log_types::StoreId;
use crate::{ChunkDirectLineageReport, ChunkStoreGeneration};
#[expect(unused_imports, clippy::unused_trait_names)] use crate::{ChunkId, ChunkStore, ChunkStoreSubscriber, RowId};
/// Schema-level description of a single component column of an entity.
///
/// Produced either from a physical [`Chunk`] (see [`ChunkMeta::from_chunk`]) or from an
/// [`RrdManifest`] (see [`ChunkStoreDiffVirtualAddition::chunk_metas`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkComponentMeta {
/// Descriptor of the component.
///
/// For manifest-derived metas whose component has no matching component field in the
/// manifest's schema, this is only a partial descriptor (`ComponentDescriptor::partial`).
pub descriptor: re_sdk_types::ComponentDescriptor,

/// Arrow datatype of the items of this (list-typed) column.
///
/// From a physical chunk this is the list array's `value_type()`. From a manifest this is the
/// schema field's datatype with one `List`/`LargeList` layer removed (or the field datatype
/// itself if it is not one of those). `None` when the component is missing from the manifest
/// schema.
pub inner_arrow_datatype: Option<arrow::datatypes::DataType>,

/// Whether the column holds any data.
///
/// From a physical chunk: the list array's flattened values array is non-empty.
/// From a manifest: the component is static, or at least one of its chunks has rows.
pub has_data: bool,

/// Whether this component column is static (i.e. not indexed by any timeline).
pub is_static: bool,
}
/// Per-entity summary of component columns.
///
/// Describes either the columns of one physical chunk, or (when built from an [`RrdManifest`])
/// all component columns an entity has across the chunks listed in the manifest.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkMeta {
/// The entity these component columns belong to.
pub entity_path: re_chunk::EntityPath,

/// One entry per component column.
pub components: Vec<ChunkComponentMeta>,
}
impl ChunkMeta {
    /// Describes the component columns of a single physical `chunk`.
    ///
    /// Every produced [`ChunkComponentMeta`] inherits the chunk-wide static flag, records the
    /// item datatype of the column's list array, and reports `has_data` when the column's
    /// flattened values array is non-empty.
    pub fn from_chunk(chunk: &Chunk) -> Self {
        // Static-ness is a property of the whole chunk, so query it once.
        let chunk_is_static = chunk.is_static();

        let mut components = Vec::new();
        for column in chunk.components().values() {
            components.push(ChunkComponentMeta {
                descriptor: column.descriptor.clone(),
                inner_arrow_datatype: Some(column.list_array.value_type()),
                has_data: !column.list_array.values().is_empty(),
                is_static: chunk_is_static,
            });
        }

        Self {
            entity_path: chunk.entity_path().clone(),
            components,
        }
    }

    /// Describes every entity listed in `manifest`, one [`ChunkMeta`] per entity.
    ///
    /// Reuses [`ChunkStoreDiffVirtualAddition::chunk_metas`], which requires an owned,
    /// shared manifest, hence the clone.
    pub fn from_manifest(manifest: &RrdManifest) -> Vec<Self> {
        re_tracing::profile_function!();

        let virtual_addition = ChunkStoreDiffVirtualAddition {
            rrd_manifest: Arc::new(manifest.clone()),
        };
        virtual_addition.chunk_metas().collect()
    }
}
/// A change reported by a [`ChunkStore`] (e.g. returned from `ChunkStore::insert_chunk` or `ChunkStore::gc`).
///
/// Derefs to its [`ChunkStoreDiff`], so diff helpers such as [`ChunkStoreDiff::delta`] can be
/// called on the event directly.
#[derive(Debug, Clone, PartialEq)]
pub struct ChunkStoreEvent {
/// The store that emitted this event.
pub store_id: StoreId,

/// The store's generation associated with this event.
pub store_generation: ChunkStoreGeneration,

/// Identifier of this event.
// NOTE(review): presumably monotonically increasing per store — confirm at the emitting site.
pub event_id: u64,

/// The change itself.
pub diff: ChunkStoreDiff,
}
impl std::ops::Deref for ChunkStoreEvent {
    type Target = ChunkStoreDiff;

    /// Exposes the wrapped [`ChunkStoreDiff`].
    #[inline]
    fn deref(&self) -> &ChunkStoreDiff {
        let Self { diff, .. } = self;
        diff
    }
}
/// A single change to the contents or schema of a [`ChunkStore`].
#[derive(Debug, Clone, PartialEq)]
pub enum ChunkStoreDiff {
/// A physical chunk was added. Counts as `+1` in [`ChunkStoreDiff::delta`].
Addition(ChunkStoreDiffAddition),

/// Chunks were made known through an [`RrdManifest`]; no chunk data is attached.
/// Counts as `0` in [`ChunkStoreDiff::delta`].
VirtualAddition(ChunkStoreDiffVirtualAddition),

/// A physical chunk was removed. Counts as `-1` in [`ChunkStoreDiff::delta`].
Deletion(ChunkStoreDiffDeletion),

/// The store's column schema changed: new per-entity component columns appeared.
/// Counts as `0` in [`ChunkStoreDiff::delta`].
// NOTE(review): the `store_events` test also expects this when an existing column changes its
// `is_static` flag — confirm that is the intended contract.
SchemaAddition(ChunkStoreDiffSchemaAddition),
}
/// Payload of [`ChunkStoreDiff::SchemaAddition`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkStoreDiffSchemaAddition {
/// The newly announced columns, grouped per entity.
pub new_columns: Vec<ChunkMeta>,
}
impl From<ChunkStoreDiffAddition> for ChunkStoreDiff {
    /// Wraps a physical addition.
    fn from(addition: ChunkStoreDiffAddition) -> Self {
        Self::Addition(addition)
    }
}

impl From<ChunkStoreDiffVirtualAddition> for ChunkStoreDiff {
    /// Wraps a manifest-based (virtual) addition.
    fn from(virtual_addition: ChunkStoreDiffVirtualAddition) -> Self {
        Self::VirtualAddition(virtual_addition)
    }
}

impl From<ChunkStoreDiffDeletion> for ChunkStoreDiff {
    /// Wraps a deletion.
    fn from(deletion: ChunkStoreDiffDeletion) -> Self {
        Self::Deletion(deletion)
    }
}

impl From<ChunkStoreDiffSchemaAddition> for ChunkStoreDiff {
    /// Wraps a schema addition.
    fn from(schema_addition: ChunkStoreDiffSchemaAddition) -> Self {
        Self::SchemaAddition(schema_addition)
    }
}
impl ChunkStoreDiff {
pub fn addition(
chunk_before_processing: Arc<Chunk>,
chunk_after_processing: Arc<Chunk>,
direct_lineage: ChunkDirectLineageReport,
) -> Self {
Self::Addition(ChunkStoreDiffAddition {
chunk_before_processing,
chunk_after_processing,
direct_lineage,
})
}
pub fn virtual_addition(rrd_manifest: Arc<RrdManifest>) -> Self {
Self::VirtualAddition(ChunkStoreDiffVirtualAddition { rrd_manifest })
}
pub fn deletion(chunk: Arc<Chunk>, reason: ChunkDeletionReason) -> Self {
Self::Deletion(ChunkStoreDiffDeletion { chunk, reason })
}
pub fn is_addition(&self) -> bool {
matches!(self, Self::Addition(_))
}
pub fn is_virtual_addition(&self) -> bool {
matches!(self, Self::VirtualAddition(_))
}
pub fn is_deletion(&self) -> bool {
matches!(self, Self::Deletion(_))
}
pub fn is_schema_addition(&self) -> bool {
matches!(self, Self::SchemaAddition(_))
}
pub fn into_addition(self) -> Option<ChunkStoreDiffAddition> {
match self {
Self::Addition(addition) => Some(addition),
_ => None,
}
}
pub fn into_virtual_addition(self) -> Option<ChunkStoreDiffVirtualAddition> {
match self {
Self::VirtualAddition(addition) => Some(addition),
_ => None,
}
}
pub fn into_deletion(self) -> Option<ChunkStoreDiffDeletion> {
match self {
Self::Deletion(deletion) => Some(deletion),
_ => None,
}
}
pub fn to_addition(&self) -> Option<&ChunkStoreDiffAddition> {
match self {
Self::Addition(addition) => Some(addition),
_ => None,
}
}
pub fn to_virtual_addition(&self) -> Option<&ChunkStoreDiffVirtualAddition> {
match self {
Self::VirtualAddition(addition) => Some(addition),
_ => None,
}
}
pub fn to_deletion(&self) -> Option<&ChunkStoreDiffDeletion> {
match self {
Self::Deletion(deletion) => Some(deletion),
_ => None,
}
}
#[inline]
pub fn delta(&self) -> i64 {
match self {
Self::Addition(_) => 1,
Self::VirtualAddition(_) | Self::SchemaAddition(_) => 0,
Self::Deletion(_) => -1,
}
}
pub fn delta_chunk(&self) -> Option<&Arc<Chunk>> {
match self {
Self::Addition(addition) => Some(addition.delta_chunk()),
Self::VirtualAddition(_) | Self::SchemaAddition(_) => None,
Self::Deletion(deletion) => Some(&deletion.chunk),
}
}
}
/// Payload of [`ChunkStoreDiff::Addition`].
///
/// The store may process an incoming chunk before storing it (the kind of processing is reported
/// by `direct_lineage`, e.g. compaction or splitting), so both the pre- and post-processing
/// chunks are kept.
///
/// `Debug` and `PartialEq` only look at the chunks' IDs, not their contents.
#[derive(Clone)]
pub struct ChunkStoreDiffAddition {
/// The chunk as handed to the store, before any processing.
pub chunk_before_processing: Arc<Chunk>,

/// The chunk as it ended up after processing.
pub chunk_after_processing: Arc<Chunk>,

/// Lineage of the processed chunk (e.g. `CompactedFrom`, `SplitFrom`).
pub direct_lineage: ChunkDirectLineageReport,
}
impl std::fmt::Debug for ChunkStoreDiffAddition {
    /// Prints chunk IDs only, keeping the output compact regardless of chunk size.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field becomes a compile error here.
        let Self {
            chunk_before_processing: before,
            chunk_after_processing: after,
            direct_lineage: lineage,
        } = self;

        let mut out = f.debug_struct("ChunkStoreDiffAddition");
        out.field("chunk_before_processing", &before.id());
        out.field("chunk_after_processing", &after.id());
        out.field("direct_lineage", lineage);
        out.finish()
    }
}
impl PartialEq for ChunkStoreDiffAddition {
    /// Compares chunks by ID (not by content) plus the lineage report.
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive destructuring: adding a field becomes a compile error here.
        let Self {
            chunk_before_processing: lhs_before,
            chunk_after_processing: lhs_after,
            direct_lineage: lhs_lineage,
        } = self;

        let same_chunks = lhs_before.id() == other.chunk_before_processing.id()
            && lhs_after.id() == other.chunk_after_processing.id();
        same_chunks && *lhs_lineage == other.direct_lineage
    }
}
impl ChunkStoreDiffAddition {
pub fn delta_chunk(&self) -> &Arc<Chunk> {
#[expect(clippy::match_same_arms)] match self.direct_lineage {
ChunkDirectLineageReport::CompactedFrom(_) => &self.chunk_before_processing,
ChunkDirectLineageReport::SplitFrom(_, _) => &self.chunk_after_processing,
_ => &self.chunk_before_processing,
}
}
#[inline]
pub fn is_static(&self) -> bool {
self.chunk_before_processing.is_static()
}
pub fn chunk_meta(&self) -> ChunkMeta {
let delta_chunk = self.delta_chunk();
let entity_path = delta_chunk.entity_path();
let components: Vec<ChunkComponentMeta> = delta_chunk
.components()
.values()
.map(|column| ChunkComponentMeta {
descriptor: column.descriptor.clone(),
inner_arrow_datatype: Some(column.list_array.value_type()),
has_data: !column.list_array.values().is_empty(),
is_static: delta_chunk.is_static(),
})
.collect();
ChunkMeta {
entity_path: entity_path.clone(),
components,
}
}
}
/// Payload of [`ChunkStoreDiff::VirtualAddition`]: chunks made known through an [`RrdManifest`].
///
/// No chunk data is attached; use [`ChunkStoreDiffVirtualAddition::chunk_metas`] to summarize
/// what the manifest describes.
#[derive(Debug, Clone, PartialEq)]
pub struct ChunkStoreDiffVirtualAddition {
/// The manifest describing the virtually added chunks.
pub rrd_manifest: Arc<RrdManifest>,
}
impl ChunkStoreDiffVirtualAddition {
    /// Summarizes the manifest as one [`ChunkMeta`] per entity.
    ///
    /// * Descriptors and item datatypes come from the component fields of the manifest's sorbet
    ///   schema, looked up by component identifier. Components missing from the schema get a
    ///   partial descriptor and no datatype.
    /// * Components found in the static map are `is_static` and `has_data`.
    /// * Temporal-only components are not static; they have data iff at least one of their
    ///   chunks reports `num_rows > 0`.
    ///
    /// Entities and components are yielded in hash-map order, i.e. unspecified.
    ///
    /// NOTE(review): the schema lookup is keyed by component identifier only, so if two entities
    /// share an identifier with different descriptors/datatypes, a single schema entry is used
    /// for both — confirm this cannot happen.
    pub fn chunk_metas(&self) -> impl Iterator<Item = ChunkMeta> {
        re_tracing::profile_function!();

        /// Where (static vs temporal) and whether (rows > 0) a component has data.
        #[derive(Default)]
        struct Presence {
            is_static: bool,
            has_rows: bool,
        }

        // Component identifier -> schema-derived meta; presence flags are overwritten later.
        let schema_by_component: ahash::HashMap<
            re_chunk::ComponentIdentifier,
            ChunkComponentMeta,
        > = self
            .rrd_manifest
            .sorbet_schema()
            .fields()
            .iter()
            .filter(|field| {
                matches!(
                    re_sorbet::ColumnKind::try_from(field.as_ref()),
                    Ok(re_sorbet::ColumnKind::Component)
                )
            })
            .map(|field| {
                // Component columns are list-typed: keep the item type when it can be unwrapped.
                let item_datatype = match field.data_type() {
                    arrow::datatypes::DataType::List(item)
                    | arrow::datatypes::DataType::LargeList(item) => item.data_type(),
                    other => other,
                }
                .clone();

                let descriptor = re_sdk_types::ComponentDescriptor::from((**field).clone());
                let key = descriptor.component;
                let meta = ChunkComponentMeta {
                    descriptor,
                    inner_arrow_datatype: Some(item_datatype),
                    has_data: false,
                    is_static: false,
                };
                (key, meta)
            })
            .collect();

        let mut per_entity = ahash::HashMap::<_, nohash_hasher::IntMap<_, _>>::default();

        // Static components always count as having data.
        #[expect(
            clippy::iter_over_hash_type,
            reason = "This collects information into hashmaps"
        )]
        for (entity_path, static_components) in self.rrd_manifest.static_map() {
            let presence_by_component = per_entity.entry(entity_path).or_default();
            for &component in static_components.keys() {
                presence_by_component.insert(
                    component,
                    Presence {
                        is_static: true,
                        has_rows: true,
                    },
                );
            }
        }

        // Temporal components: OR together "has rows" over every timeline and chunk, never
        // clearing flags already set by the static pass.
        #[expect(
            clippy::iter_over_hash_type,
            reason = "This collects information into hashmaps"
        )]
        for (entity_path, per_timeline) in self.rrd_manifest.temporal_map() {
            let presence_by_component = per_entity.entry(entity_path).or_default();
            for per_component in per_timeline.values() {
                for (&component, per_chunk) in per_component {
                    let any_rows = per_chunk.values().any(|chunk_entry| chunk_entry.num_rows > 0);
                    presence_by_component.entry(component).or_default().has_rows |= any_rows;
                }
            }
        }

        per_entity
            .into_iter()
            .map(move |(entity_path, presence_by_component)| {
                let components: Vec<ChunkComponentMeta> = presence_by_component
                    .into_iter()
                    .map(|(component, presence)| {
                        let Presence {
                            is_static,
                            has_rows,
                        } = presence;
                        match schema_by_component.get(&component) {
                            Some(schema_meta) => ChunkComponentMeta {
                                has_data: has_rows,
                                is_static,
                                ..schema_meta.clone()
                            },
                            None => ChunkComponentMeta {
                                descriptor: re_sdk_types::ComponentDescriptor::partial(component),
                                inner_arrow_datatype: None,
                                has_data: has_rows,
                                is_static,
                            },
                        }
                    })
                    .collect();

                ChunkMeta {
                    entity_path: entity_path.clone(),
                    components,
                }
            })
    }
}
/// Why a chunk was removed from the store; carried by [`ChunkStoreDiffDeletion`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkDeletionReason {
/// Removed by garbage collection.
GarbageCollection,

/// NOTE(review): presumably a virtual (manifest-only) chunk being replaced by its physically
/// loaded counterpart — confirm.
VirtualToPhysicalReplacement,

/// NOTE(review): presumably a leftover piece of a chunk split being cleaned up — confirm.
DanglingSplitCleanup,

/// Removed because it was compacted into another chunk.
Compaction,

/// NOTE(review): presumably replaced by newer data for the same key — confirm.
Overwrite,

/// Removed by an explicit drop request.
ExplicitDrop,
}
/// Payload of [`ChunkStoreDiff::Deletion`].
///
/// `Debug` and `PartialEq` only look at the chunk's ID, not its contents.
#[derive(Clone)]
pub struct ChunkStoreDiffDeletion {
/// The chunk that was removed.
pub chunk: Arc<Chunk>,

/// Why it was removed.
pub reason: ChunkDeletionReason,
}
impl std::fmt::Debug for ChunkStoreDiffDeletion {
    /// Prints the chunk ID (not its contents) followed by the deletion reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field becomes a compile error here.
        let Self {
            chunk: deleted,
            reason: why,
        } = self;

        let mut out = f.debug_tuple("ChunkStoreDiffDeletion");
        out.field(&deleted.id());
        out.field(why);
        out.finish()
    }
}
impl PartialEq for ChunkStoreDiffDeletion {
    /// Compares the chunk by ID (not by content) plus the deletion reason.
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive destructuring: adding a field becomes a compile error here.
        let Self {
            chunk: lhs_chunk,
            reason: lhs_reason,
        } = self;

        let same_chunk = lhs_chunk.id() == other.chunk.id();
        same_chunk && *lhs_reason == other.reason
    }
}
impl ChunkStoreDiffDeletion {
    /// Whether the deleted chunk holds static data.
    #[inline]
    pub fn is_static(&self) -> bool {
        let Self { chunk, .. } = self;
        chunk.is_static()
    }
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use re_chunk::{RowId, TimelineName};
use re_log_types::example_components::{MyColor, MyIndex, MyPoint, MyPoints};
use re_log_types::{EntityPath, TimeInt, TimePoint, Timeline};
use re_sdk_types::ComponentDescriptor;
use super::*;
use crate::{ChunkStore, GarbageCollectionOptions};
/// Counters reconstructed purely from a stream of [`ChunkStoreEvent`]s.
///
/// Each counter accumulates `delta()`-weighted contributions of every event's delta chunk, so after
/// a mix of additions and deletions it should reflect what is left in the store.
#[derive(Default, Debug, PartialEq, Eq)]
struct GlobalCounts {
// Per row: +1 per addition containing it, -1 per deletion containing it.
row_ids: BTreeMap<RowId, i64>,
// Per timeline: number of (temporal) rows indexed by it.
timelines: BTreeMap<TimelineName, i64>,
// Per entity: number of chunks.
entity_paths: BTreeMap<EntityPath, i64>,
// Per component: number of non-null list entries.
component_descrs: BTreeMap<ComponentDescriptor, i64>,
// Per time value: number of occurrences across all timelines.
times: BTreeMap<TimeInt, i64>,
// Number of rows in static chunks.
num_static: i64,
}
impl GlobalCounts {
/// Builds expected counts from `(key, count)` pairs.
fn new(
row_ids: impl IntoIterator<Item = (RowId, i64)>, timelines: impl IntoIterator<Item = (TimelineName, i64)>, entity_paths: impl IntoIterator<Item = (EntityPath, i64)>, component_descrs: impl IntoIterator<Item = (ComponentDescriptor, i64)>, times: impl IntoIterator<Item = (TimeInt, i64)>, num_static: i64,
) -> Self {
Self {
row_ids: row_ids.into_iter().collect(),
timelines: timelines.into_iter().collect(),
entity_paths: entity_paths.into_iter().collect(),
component_descrs: component_descrs.into_iter().collect(),
times: times.into_iter().collect(),
num_static,
}
}
}
impl GlobalCounts {
/// Applies each event's signed `delta()` to the counters, based on its delta chunk.
///
/// Events without a delta chunk (virtual and schema additions) are skipped.
fn on_events(&mut self, events: &[ChunkStoreEvent]) {
#![expect(clippy::cast_possible_wrap)]
for event in events {
let delta = event.delta();
let Some(delta_chunk) = event.delta_chunk() else {
continue;
};
let delta_rows = delta * delta_chunk.num_rows() as i64;
for row_id in delta_chunk.row_ids() {
*self.row_ids.entry(row_id).or_default() += delta;
}
*self
.entity_paths
.entry(delta_chunk.entity_path().clone())
.or_default() += delta;
for column in delta_chunk.components().values() {
// Weighted by the number of non-null list entries in the column.
let delta = event.delta() * column.list_array.iter().flatten().count() as i64;
*self
.component_descrs
.entry(column.descriptor.clone())
.or_default() += delta;
}
// Static rows are counted separately; only temporal rows touch timelines/times.
if delta_chunk.is_static() {
self.num_static += delta_rows;
} else {
for (&timeline, time_column) in delta_chunk.timelines() {
*self.timelines.entry(timeline).or_default() += delta_rows;
for time in time_column.times() {
*self.times.entry(time).or_default() += delta;
}
}
}
}
}
}
/// Collects the descriptors of every column announced by a `SchemaAddition` event.
///
/// Panics if `event` is any other kind of event.
fn schema_addition_descriptors(event: &ChunkStoreEvent) -> BTreeSet<ComponentDescriptor> {
match &event.diff {
ChunkStoreDiff::SchemaAddition(sa) => sa
.new_columns
.iter()
.flat_map(|m| m.components.iter().map(|c| c.descriptor.clone()))
.collect(),
other => panic!("expected SchemaAddition, got {other:?}"),
}
}
/// Checks that counts rebuilt from store events track insertions and garbage collection.
#[test]
fn store_events() -> anyhow::Result<()> {
let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
Default::default(),
);
let mut view = GlobalCounts::default();
let timeline_frame = Timeline::new_sequence("frame");
let timeline_other = Timeline::new_duration("other");
let timeline_yet_another = Timeline::new_sequence("yet_another");
// Step 1: one temporal row on `entity_a`, with a `MyIndex` column, on all three timelines.
let row_id1 = RowId::new();
let timepoint1 = TimePoint::from_iter([
(timeline_frame, 42), (timeline_other, 666), (timeline_yet_another, 1), ]);
let entity_path1: EntityPath = "entity_a".into();
let chunk1 = Chunk::builder(entity_path1.clone())
.with_component_batch(
row_id1,
timepoint1.clone(),
(MyIndex::partial_descriptor(), &MyIndex::from_iter(0..10)),
)
.build()?;
let events = store.insert_chunk(&Arc::new(chunk1))?;
// One addition, followed by a schema addition announcing the new column.
assert_eq!(events.len(), 2);
assert!(events[0].is_addition());
assert!(events[1].is_schema_addition());
assert_eq!(
schema_addition_descriptors(&events[1]),
BTreeSet::from([MyIndex::partial_descriptor()]),
);
view.on_events(&events);
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 1), ],
[
(*timeline_frame.name(), 1),
(*timeline_other.name(), 1),
(*timeline_yet_another.name(), 1),
],
[
(entity_path1.clone(), 1), ],
[
(MyIndex::partial_descriptor(), 1), ],
[
(42.try_into().unwrap(), 1), (666.try_into().unwrap(), 1),
(1.try_into().unwrap(), 1),
],
0,
),
view,
);
// Step 2: one temporal row on `entity_b` with points and colors, on two of the three timelines.
let row_id2 = RowId::new();
let timepoint2 = TimePoint::from_iter([
(timeline_frame, 42), (timeline_yet_another, 1), ]);
let entity_path2: EntityPath = "entity_b".into();
let chunk2 = {
let num_instances = 3;
let points: Vec<_> = (0..num_instances)
.map(|i| MyPoint::new(0.0, i as f32))
.collect();
let colors = vec![MyColor::from(0xFF0000FF)];
Chunk::builder(entity_path2.clone())
.with_component_batches(
row_id2,
timepoint2.clone(),
[
(MyPoints::descriptor_points(), &points as _),
(MyPoints::descriptor_colors(), &colors as _),
],
)
.build()?
};
let events = store.insert_chunk(&Arc::new(chunk2))?;
assert_eq!(events.len(), 2);
assert!(events[0].is_addition());
assert!(events[1].is_schema_addition());
assert_eq!(
schema_addition_descriptors(&events[1]),
BTreeSet::from([MyPoints::descriptor_points(), MyPoints::descriptor_colors(),]),
);
view.on_events(&events);
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 1), (row_id2, 1),
],
[
(*timeline_frame.name(), 2),
(*timeline_other.name(), 1),
(*timeline_yet_another.name(), 2),
],
[
(entity_path1.clone(), 1), (entity_path2.clone(), 1), ],
[
(MyIndex::partial_descriptor(), 1), (MyPoints::descriptor_points(), 1), (MyPoints::descriptor_colors(), 1), ],
[
(42.try_into().unwrap(), 2), (666.try_into().unwrap(), 1),
(1.try_into().unwrap(), 2),
],
0,
),
view,
);
// Step 3: a static row (empty timepoint) on `entity_b`. It counts towards `num_static`, not towards
// timelines/times.
let row_id3 = RowId::new();
let timepoint3 = TimePoint::default();
let chunk3 = {
let num_instances = 6;
let colors = vec![MyColor::from(0x00DD00FF); num_instances];
Chunk::builder(entity_path2.clone())
.with_component_batches(
row_id3,
timepoint3.clone(),
[
(
MyIndex::partial_descriptor(),
&MyIndex::from_iter(0..num_instances as _) as _,
),
(MyPoints::descriptor_colors(), &colors as _),
],
)
.build()?
};
let events = store.insert_chunk(&Arc::new(chunk3))?;
assert_eq!(events.len(), 2);
assert!(events[0].is_addition());
assert!(events[1].is_schema_addition());
assert_eq!(
schema_addition_descriptors(&events[1]),
BTreeSet::from([MyIndex::partial_descriptor(), MyPoints::descriptor_colors(),]),
"MyIndex is new on entity_b; Colors gets is_static transition"
);
view.on_events(&events);
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 1), (row_id2, 1),
(row_id3, 1),
],
[
(*timeline_frame.name(), 2),
(*timeline_other.name(), 1),
(*timeline_yet_another.name(), 2),
],
[
(entity_path1.clone(), 1), (entity_path2.clone(), 2), ],
[
(MyIndex::partial_descriptor(), 2), (MyPoints::descriptor_points(), 1), (MyPoints::descriptor_colors(), 2), ],
[
(42.try_into().unwrap(), 2), (666.try_into().unwrap(), 1),
(1.try_into().unwrap(), 2),
],
1,
),
view,
);
// Step 4: GC everything. Only deletions are expected. All temporal data goes away, while the static
// row (row_id3) is still counted afterwards.
let events = store.gc(&GarbageCollectionOptions::gc_everything()).0;
for event in &events {
assert!(
event.is_deletion(),
"GC should only produce deletions, got: {:?}",
event.diff
);
}
view.on_events(&events);
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 0), (row_id2, 0),
(row_id3, 1), ],
[
(*timeline_frame.name(), 0),
(*timeline_other.name(), 0),
(*timeline_yet_another.name(), 0),
],
[
(entity_path1.clone(), 0), (entity_path2.clone(), 1), ],
[
(MyIndex::partial_descriptor(), 1), (MyPoints::descriptor_points(), 0), (MyPoints::descriptor_colors(), 1), ],
[
(42.try_into().unwrap(), 0), (666.try_into().unwrap(), 0),
(1.try_into().unwrap(), 0),
],
1, ),
view,
);
Ok(())
}
}