use std::sync::Arc;
use re_chunk::Chunk;
use re_log_encoding::RrdManifest;
use re_log_types::StoreId;
use crate::{ChunkDirectLineageReport, ChunkStoreGeneration};
#[expect(unused_imports, clippy::unused_trait_names)] use crate::{ChunkId, ChunkStore, ChunkStoreSubscriber, RowId};
/// A single event: a [`ChunkStoreDiff`] together with the identity of the store it relates to.
///
/// The event dereferences to its inner [`ChunkStoreDiff`], so the diff's methods can be
/// called directly on the event.
#[derive(Debug, Clone, PartialEq)]
pub struct ChunkStoreEvent {
/// Identifier of the store this event is associated with.
pub store_id: StoreId,
/// Generation of the store associated with this event.
// NOTE(review): presumably the generation at the moment the event was emitted -- confirm.
pub store_generation: ChunkStoreGeneration,
/// Numeric identifier of this event.
// NOTE(review): uniqueness/ordering guarantees are not visible from this file -- confirm
// against whatever emits these events.
pub event_id: u64,
/// The change carried by this event.
pub diff: ChunkStoreDiff,
}
/// Exposes the inner [`ChunkStoreDiff`] of an event through auto-deref.
impl std::ops::Deref for ChunkStoreEvent {
    type Target = ChunkStoreDiff;

    /// Borrows the diff carried by this event.
    #[inline]
    fn deref(&self) -> &ChunkStoreDiff {
        let Self { diff, .. } = self;
        diff
    }
}
/// The change carried by a [`ChunkStoreEvent`].
///
/// Each variant wraps a dedicated payload type.
#[derive(Debug, Clone, PartialEq)]
pub enum ChunkStoreDiff {
/// An addition; [`ChunkStoreDiff::delta`] reports `+1` for this variant.
Addition(ChunkStoreDiffAddition),
/// A virtual addition, described by a manifest rather than by a chunk:
/// [`ChunkStoreDiff::delta`] reports `0` and [`ChunkStoreDiff::delta_chunk`] returns `None`.
VirtualAddition(ChunkStoreDiffVirtualAddition),
/// A deletion; [`ChunkStoreDiff::delta`] reports `-1` for this variant.
Deletion(ChunkStoreDiffDeletion),
}
/// Wraps an addition payload into [`ChunkStoreDiff::Addition`].
impl From<ChunkStoreDiffAddition> for ChunkStoreDiff {
    fn from(addition: ChunkStoreDiffAddition) -> Self {
        Self::Addition(addition)
    }
}
/// Wraps a virtual-addition payload into [`ChunkStoreDiff::VirtualAddition`].
impl From<ChunkStoreDiffVirtualAddition> for ChunkStoreDiff {
    fn from(virtual_addition: ChunkStoreDiffVirtualAddition) -> Self {
        Self::VirtualAddition(virtual_addition)
    }
}
/// Wraps a deletion payload into [`ChunkStoreDiff::Deletion`].
impl From<ChunkStoreDiffDeletion> for ChunkStoreDiff {
    fn from(deletion: ChunkStoreDiffDeletion) -> Self {
        Self::Deletion(deletion)
    }
}
impl ChunkStoreDiff {
    /// Builds a [`ChunkStoreDiff::Addition`] from its three constituents.
    pub fn addition(
        chunk_before_processing: Arc<Chunk>,
        chunk_after_processing: Arc<Chunk>,
        direct_lineage: ChunkDirectLineageReport,
    ) -> Self {
        let payload = ChunkStoreDiffAddition {
            chunk_before_processing,
            chunk_after_processing,
            direct_lineage,
        };
        Self::Addition(payload)
    }

    /// Builds a [`ChunkStoreDiff::VirtualAddition`] around the given manifest.
    pub fn virtual_addition(rrd_manifest: Arc<RrdManifest>) -> Self {
        let payload = ChunkStoreDiffVirtualAddition { rrd_manifest };
        Self::VirtualAddition(payload)
    }

    /// Builds a [`ChunkStoreDiff::Deletion`] for the given chunk.
    pub fn deletion(chunk: Arc<Chunk>) -> Self {
        let payload = ChunkStoreDiffDeletion { chunk };
        Self::Deletion(payload)
    }

    /// Returns `true` if this is the [`ChunkStoreDiff::Addition`] variant.
    pub fn is_addition(&self) -> bool {
        self.to_addition().is_some()
    }

    /// Returns `true` if this is the [`ChunkStoreDiff::VirtualAddition`] variant.
    pub fn is_virtual_addition(&self) -> bool {
        self.to_virtual_addition().is_some()
    }

    /// Returns `true` if this is the [`ChunkStoreDiff::Deletion`] variant.
    pub fn is_deletion(&self) -> bool {
        self.to_deletion().is_some()
    }

    /// Consumes the diff and returns its addition payload, or `None` for any other variant.
    pub fn into_addition(self) -> Option<ChunkStoreDiffAddition> {
        if let Self::Addition(payload) = self {
            Some(payload)
        } else {
            None
        }
    }

    /// Consumes the diff and returns its virtual-addition payload, or `None` for any other
    /// variant.
    pub fn into_virtual_addition(self) -> Option<ChunkStoreDiffVirtualAddition> {
        if let Self::VirtualAddition(payload) = self {
            Some(payload)
        } else {
            None
        }
    }

    /// Consumes the diff and returns its deletion payload, or `None` for any other variant.
    pub fn into_deletion(self) -> Option<ChunkStoreDiffDeletion> {
        if let Self::Deletion(payload) = self {
            Some(payload)
        } else {
            None
        }
    }

    /// Borrows the addition payload, or returns `None` for any other variant.
    pub fn to_addition(&self) -> Option<&ChunkStoreDiffAddition> {
        let Self::Addition(payload) = self else {
            return None;
        };
        Some(payload)
    }

    /// Borrows the virtual-addition payload, or returns `None` for any other variant.
    pub fn to_virtual_addition(&self) -> Option<&ChunkStoreDiffVirtualAddition> {
        let Self::VirtualAddition(payload) = self else {
            return None;
        };
        Some(payload)
    }

    /// Borrows the deletion payload, or returns `None` for any other variant.
    pub fn to_deletion(&self) -> Option<&ChunkStoreDiffDeletion> {
        let Self::Deletion(payload) = self else {
            return None;
        };
        Some(payload)
    }

    /// Signed unit contribution of this diff: `+1` for an addition, `-1` for a deletion and
    /// `0` for a virtual addition.
    #[inline]
    pub fn delta(&self) -> i64 {
        match self {
            Self::Deletion(_) => -1,
            Self::VirtualAddition(_) => 0,
            Self::Addition(_) => 1,
        }
    }

    /// The chunk this diff is about, if any.
    ///
    /// * Addition: whatever [`ChunkStoreDiffAddition::delta_chunk`] selects.
    /// * Deletion: the deleted chunk.
    /// * Virtual addition: `None`, since only a manifest is available.
    pub fn delta_chunk(&self) -> Option<&Arc<Chunk>> {
        let chunk = match self {
            Self::VirtualAddition(_) => return None,
            Self::Addition(payload) => payload.delta_chunk(),
            Self::Deletion(payload) => &payload.chunk,
        };
        Some(chunk)
    }
}
/// Payload of [`ChunkStoreDiff::Addition`].
///
/// Equality is implemented by hand: two additions are equal when both chunk ids and the
/// lineage match; chunk contents are not compared.
#[derive(Debug, Clone)]
pub struct ChunkStoreDiffAddition {
/// The chunk in its "before processing" form.
// NOTE(review): presumably the chunk as it was handed to the store, prior to any
// compaction/splitting -- confirm against the insertion code.
pub chunk_before_processing: Arc<Chunk>,
/// The chunk in its "after processing" form.
// NOTE(review): presumably the chunk as it ended up in the store -- confirm.
pub chunk_after_processing: Arc<Chunk>,
/// Lineage report for this addition; it decides which of the two chunks
/// [`ChunkStoreDiffAddition::delta_chunk`] returns.
pub direct_lineage: ChunkDirectLineageReport,
}
/// Identity-based equality: chunks are compared by id only, the lineage by value.
impl PartialEq for ChunkStoreDiffAddition {
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive destructuring on both sides: adding a field to the struct forces this
        // comparison to be revisited.
        let Self {
            chunk_before_processing: lhs_before,
            chunk_after_processing: lhs_after,
            direct_lineage: lhs_lineage,
        } = self;
        let Self {
            chunk_before_processing: rhs_before,
            chunk_after_processing: rhs_after,
            direct_lineage: rhs_lineage,
        } = other;

        if lhs_before.id() != rhs_before.id() {
            return false;
        }
        if lhs_after.id() != rhs_after.id() {
            return false;
        }
        lhs_lineage == rhs_lineage
    }
}
impl ChunkStoreDiffAddition {
    /// Picks the chunk that represents the delta of this addition.
    ///
    /// For a `SplitFrom` lineage that is the chunk after processing; for every other
    /// lineage (including `CompactedFrom`) it is the chunk before processing.
    pub fn delta_chunk(&self) -> &Arc<Chunk> {
        let Self {
            chunk_before_processing,
            chunk_after_processing,
            direct_lineage,
        } = self;

        let is_split = matches!(direct_lineage, ChunkDirectLineageReport::SplitFrom(_, _));
        if is_split {
            chunk_after_processing
        } else {
            // `CompactedFrom` and all remaining lineages.
            chunk_before_processing
        }
    }

    /// Whether the chunk before processing is static.
    #[inline]
    pub fn is_static(&self) -> bool {
        let Self {
            chunk_before_processing,
            ..
        } = self;
        chunk_before_processing.is_static()
    }
}
/// Payload of [`ChunkStoreDiff::VirtualAddition`]: carries a manifest instead of a chunk.
#[derive(Debug, Clone, PartialEq)]
pub struct ChunkStoreDiffVirtualAddition {
/// The shared manifest attached to this virtual addition.
// NOTE(review): presumably describes chunks that are known but not loaded -- confirm.
pub rrd_manifest: Arc<RrdManifest>,
}
/// Payload of [`ChunkStoreDiff::Deletion`].
///
/// Equality is implemented by hand and compares the chunk id only.
#[derive(Debug, Clone)]
pub struct ChunkStoreDiffDeletion {
/// The chunk this deletion is about.
pub chunk: Arc<Chunk>,
}
/// Identity-based equality: two deletions are equal when their chunk ids are equal.
impl PartialEq for ChunkStoreDiffDeletion {
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive destructuring: adding a field forces this comparison to be revisited.
        let Self { chunk: lhs } = self;
        let Self { chunk: rhs } = other;
        lhs.id() == rhs.id()
    }
}
impl ChunkStoreDiffDeletion {
    /// Whether the chunk concerned by this deletion is static.
    #[inline]
    pub fn is_static(&self) -> bool {
        let Self { chunk } = self;
        chunk.is_static()
    }
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use re_chunk::{RowId, TimelineName};
use re_log_types::example_components::{MyColor, MyIndex, MyPoint, MyPoints};
use re_log_types::{EntityPath, TimeInt, TimePoint, Timeline};
use re_sdk_types::ComponentDescriptor;
use super::*;
use crate::{ChunkStore, GarbageCollectionOptions};
/// Test-only view that accumulates signed counters from store events.
///
/// Each counter is updated by `on_events` using the event's delta, so a deletion offsets
/// an earlier addition.
#[derive(Default, Debug, PartialEq, Eq)]
struct GlobalCounts {
/// Per row id: sum of event deltas over the events whose delta chunk contains that row.
row_ids: BTreeMap<RowId, i64>,
/// Per timeline: sum of `delta * num_rows` over temporal delta chunks that have this timeline.
timelines: BTreeMap<TimelineName, i64>,
/// Per entity path: sum of event deltas over the events whose delta chunk targets that entity.
entity_paths: BTreeMap<EntityPath, i64>,
/// Per component descriptor: sum of `delta * count`, where `count` is the number of items
/// left after flattening the column's list array.
component_descrs: BTreeMap<ComponentDescriptor, i64>,
/// Per time value: sum of event deltas, counted once per occurrence in any time column of a
/// temporal delta chunk.
times: BTreeMap<TimeInt, i64>,
/// Sum of `delta * num_rows` over static delta chunks.
num_static: i64,
}
impl GlobalCounts {
fn new(
row_ids: impl IntoIterator<Item = (RowId, i64)>, timelines: impl IntoIterator<Item = (TimelineName, i64)>, entity_paths: impl IntoIterator<Item = (EntityPath, i64)>, component_descrs: impl IntoIterator<Item = (ComponentDescriptor, i64)>, times: impl IntoIterator<Item = (TimeInt, i64)>, num_static: i64,
) -> Self {
Self {
row_ids: row_ids.into_iter().collect(),
timelines: timelines.into_iter().collect(),
entity_paths: entity_paths.into_iter().collect(),
component_descrs: component_descrs.into_iter().collect(),
times: times.into_iter().collect(),
num_static,
}
}
}
impl GlobalCounts {
    /// Folds a batch of store events into the counters.
    ///
    /// Panics if an event has no delta chunk (i.e. a virtual addition).
    fn on_events(&mut self, events: &[ChunkStoreEvent]) {
        #![expect(clippy::cast_possible_wrap)]

        for event in events {
            // +1 for additions, -1 for deletions.
            let sign = event.delta();
            let chunk = event.delta_chunk().unwrap();
            let signed_rows = sign * chunk.num_rows() as i64;

            for row_id in chunk.row_ids() {
                let counter = self.row_ids.entry(row_id).or_default();
                *counter += sign;
            }

            let entity_counter = self
                .entity_paths
                .entry(chunk.entity_path().clone())
                .or_default();
            *entity_counter += sign;

            for column in chunk.components().values() {
                // Number of items left once the list array has been flattened.
                let present = column.list_array.iter().flatten().count() as i64;
                let counter = self
                    .component_descrs
                    .entry(column.descriptor.clone())
                    .or_default();
                *counter += event.delta() * present;
            }

            // Static chunks only contribute to the static counter.
            if chunk.is_static() {
                self.num_static += signed_rows;
                continue;
            }

            for (timeline, time_column) in chunk.timelines() {
                *self.timelines.entry(*timeline).or_default() += signed_rows;
                for time in time_column.times() {
                    let counter = self.times.entry(time).or_default();
                    *counter += sign;
                }
            }
        }
    }
}
// End-to-end check of the events returned by the store: three chunks are inserted, then a
// full garbage collection is run, and after every step the counters accumulated from the
// returned events are compared against a hand-written expectation.
#[test]
fn store_events() -> anyhow::Result<()> {
let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
Default::default(),
);
// Accumulates every event returned by the store below.
let mut view = GlobalCounts::default();
let timeline_frame = Timeline::new_sequence("frame");
let timeline_other = Timeline::new_duration("other");
let timeline_yet_another = Timeline::new_sequence("yet_another");
// Step 1: one row on `entity_a`, logged on all three timelines, with a single `MyIndex` batch.
let row_id1 = RowId::new();
let timepoint1 = TimePoint::from_iter([
(timeline_frame, 42), (timeline_other, 666), (timeline_yet_another, 1), ]);
let entity_path1: EntityPath = "entity_a".into();
let chunk1 = Chunk::builder(entity_path1.clone())
.with_component_batch(
row_id1,
timepoint1.clone(),
(MyIndex::partial_descriptor(), &MyIndex::from_iter(0..10)),
)
.build()?;
view.on_events(&store.insert_chunk(&Arc::new(chunk1))?);
// Expected: every key touched by the first chunk is counted exactly once, nothing static.
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 1), ],
[
(*timeline_frame.name(), 1),
(*timeline_other.name(), 1),
(*timeline_yet_another.name(), 1),
],
[
(entity_path1.clone(), 1), ],
[
(MyIndex::partial_descriptor(), 1), ],
[
(42.try_into().unwrap(), 1), (666.try_into().unwrap(), 1),
(1.try_into().unwrap(), 1),
],
0,
),
view,
);
// Step 2: one row on `entity_b`, on `frame` and `yet_another` only (not `other`), with a
// points batch (3 instances) and a colors batch (1 instance).
let row_id2 = RowId::new();
let timepoint2 = TimePoint::from_iter([
(timeline_frame, 42), (timeline_yet_another, 1), ]);
let entity_path2: EntityPath = "entity_b".into();
let chunk2 = {
let num_instances = 3;
let points: Vec<_> = (0..num_instances)
.map(|i| MyPoint::new(0.0, i as f32))
.collect();
let colors = vec![MyColor::from(0xFF0000FF)];
Chunk::builder(entity_path2.clone())
.with_component_batches(
row_id2,
timepoint2.clone(),
[
(MyPoints::descriptor_points(), &points as _),
(MyPoints::descriptor_colors(), &colors as _),
],
)
.build()?
};
view.on_events(&store.insert_chunk(&Arc::new(chunk2))?);
// Expected: the shared timelines and time values (42 and 1) are now counted twice; `other`
// and 666 remain at one.
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 1), (row_id2, 1),
],
[
(*timeline_frame.name(), 2),
(*timeline_other.name(), 1),
(*timeline_yet_another.name(), 2),
],
[
(entity_path1.clone(), 1), (entity_path2.clone(), 1), ],
[
(MyIndex::partial_descriptor(), 1), (MyPoints::descriptor_points(), 1), (MyPoints::descriptor_colors(), 1), ],
[
(42.try_into().unwrap(), 2), (666.try_into().unwrap(), 1),
(1.try_into().unwrap(), 2),
],
0,
),
view,
);
// Step 3: one more row on `entity_b`, built with a default timepoint. The expectation below
// treats it as static: the timeline and time counters do not move, `num_static` becomes 1.
let row_id3 = RowId::new();
let timepoint3 = TimePoint::default();
let chunk3 = {
let num_instances = 6;
let colors = vec![MyColor::from(0x00DD00FF); num_instances];
Chunk::builder(entity_path2.clone())
.with_component_batches(
row_id3,
timepoint3.clone(),
[
(
MyIndex::partial_descriptor(),
&MyIndex::from_iter(0..num_instances as _) as _,
),
(MyPoints::descriptor_colors(), &colors as _),
],
)
.build()?
};
view.on_events(&store.insert_chunk(&Arc::new(chunk3))?);
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 1), (row_id2, 1),
(row_id3, 1),
],
[
(*timeline_frame.name(), 2),
(*timeline_other.name(), 1),
(*timeline_yet_another.name(), 2),
],
[
(entity_path1.clone(), 1), (entity_path2.clone(), 2), ],
[
(MyIndex::partial_descriptor(), 2), (MyPoints::descriptor_points(), 1), (MyPoints::descriptor_colors(), 2), ],
[
(42.try_into().unwrap(), 2), (666.try_into().unwrap(), 1),
(1.try_into().unwrap(), 2),
],
1,
),
view,
);
// Step 4: garbage-collect with the `gc_everything` options and feed the resulting events back.
let events = store.gc(&GarbageCollectionOptions::gc_everything()).0;
view.on_events(&events);
// Expected: everything contributed by the first two chunks is back to zero, while the
// contributions of the third (static) chunk are still counted.
similar_asserts::assert_eq!(
GlobalCounts::new(
[
(row_id1, 0), (row_id2, 0),
(row_id3, 1), ],
[
(*timeline_frame.name(), 0),
(*timeline_other.name(), 0),
(*timeline_yet_another.name(), 0),
],
[
(entity_path1.clone(), 0), (entity_path2.clone(), 1), ],
[
(MyIndex::partial_descriptor(), 1), (MyPoints::descriptor_points(), 0), (MyPoints::descriptor_colors(), 1), ],
[
(42.try_into().unwrap(), 0), (666.try_into().unwrap(), 0),
(1.try_into().unwrap(), 0),
],
1, ),
view,
);
Ok(())
}
}