use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use itertools::Itertools as _;
use re_chunk::{Chunk, ChunkId};
use crate::ChunkStore;
/// How a chunk came to exist, expressed in terms of chunk IDs only.
///
/// This is the ID-only counterpart of [`ChunkDirectLineageReport`]. Use
/// [`ChunkDirectLineage::to_report`] to resolve the IDs against a [`ChunkStore`].
///
/// `Debug` is implemented by hand (compact, single-line form) rather than derived.
#[derive(Clone, PartialEq, Eq)]
pub enum ChunkDirectLineage {
/// This chunk was split out of the chunk with the given ID.
/// The `Vec` lists the IDs of the other chunks produced by that same split: its siblings,
/// not including this chunk itself.
SplitFrom(ChunkId, Vec<ChunkId>),
/// This chunk is the result of compacting together the chunks with the given IDs.
CompactedFrom(BTreeSet<ChunkId>),
/// Lineage root: this chunk originates from an RRD manifest.
/// `is_static` records whether that chunk is static.
RootFromManifest { is_static: bool },
/// Lineage root: this chunk was inserted directly, not via an RRD manifest, and therefore
/// cannot be re-fetched.
Volatile,
}
impl re_byte_size::SizeBytes for ChunkDirectLineage {
    /// Heap memory owned by this lineage record.
    ///
    /// Root lineages own no heap data. The other variants account for the heap usage of the
    /// chunk-ID containers they hold.
    fn heap_size_bytes(&self) -> u64 {
        match self {
            Self::RootFromManifest { .. } | Self::Volatile => 0,
            Self::CompactedFrom(source_ids) => source_ids.heap_size_bytes(),
            Self::SplitFrom(parent_id, sibling_ids) => {
                let parent_bytes = parent_id.heap_size_bytes();
                let sibling_bytes = sibling_ids.heap_size_bytes();
                parent_bytes + sibling_bytes
            }
        }
    }
}
impl std::fmt::Debug for ChunkDirectLineage {
    /// Compact, single-line rendering of a chunk's direct lineage.
    ///
    /// Split and compacted lineages list the related chunk IDs using their `Display` form.
    /// Root lineages only describe their origin.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Volatile => f.write_str("origin:<volatile> (cannot be re-fetched)"),
            Self::RootFromManifest { is_static } => write!(f, "origin:(static: {is_static})"),
            Self::CompactedFrom(source_ids) => {
                let sources = source_ids.iter().join(", ");
                write!(f, "compacted-from:[{sources}]")
            }
            Self::SplitFrom(parent_id, sibling_ids) => {
                // Note: siblings are joined without a space, unlike compaction sources.
                let siblings = sibling_ids.iter().join(",");
                write!(f, "split-from:{parent_id} siblings:[{siblings}]")
            }
        }
    }
}
impl From<ChunkDirectLineageReport> for ChunkDirectLineage {
    /// Converts an owned report into its ID-only form.
    ///
    /// Delegates to the by-reference conversion; the chunk payloads are dropped.
    fn from(report: ChunkDirectLineageReport) -> Self {
        Self::from(&report)
    }
}
impl From<&ChunkDirectLineageReport> for ChunkDirectLineage {
    /// Converts a borrowed report into its ID-only form.
    ///
    /// Every `Arc<Chunk>` is replaced by its `ChunkId`. Root lineages are copied as-is.
    fn from(report: &ChunkDirectLineageReport) -> Self {
        match report {
            ChunkDirectLineageReport::Volatile => Self::Volatile,
            ChunkDirectLineageReport::RootFromManifest { is_static } => {
                let is_static = *is_static;
                Self::RootFromManifest { is_static }
            }
            ChunkDirectLineageReport::CompactedFrom(sources_by_id) => {
                // Keys are already the source chunk IDs.
                Self::CompactedFrom(sources_by_id.iter().map(|(id, _)| *id).collect())
            }
            ChunkDirectLineageReport::SplitFrom(parent, siblings) => {
                let sibling_ids = siblings
                    .iter()
                    .map(|sibling| sibling.id())
                    .collect::<Vec<_>>();
                Self::SplitFrom(parent.id(), sibling_ids)
            }
        }
    }
}
impl ChunkDirectLineage {
pub fn to_report(&self, store: &ChunkStore) -> Option<ChunkDirectLineageReport> {
match self {
Self::SplitFrom(chunk_id, sibling_ids) => {
let mut siblings = Vec::new();
for sibling_id in sibling_ids {
siblings.push(
store
.physical_chunks_per_chunk_id
.get(sibling_id)
.cloned()?,
);
}
Some(ChunkDirectLineageReport::SplitFrom(
store.physical_chunks_per_chunk_id.get(chunk_id).cloned()?,
siblings,
))
}
Self::CompactedFrom(chunk_ids) => {
let mut chunks = BTreeMap::new();
for chunk_id in chunk_ids {
chunks.insert(
*chunk_id,
store.physical_chunks_per_chunk_id.get(chunk_id).cloned()?,
);
}
Some(ChunkDirectLineageReport::CompactedFrom(chunks))
}
Self::RootFromManifest { is_static } => {
Some(ChunkDirectLineageReport::RootFromManifest {
is_static: *is_static,
})
}
Self::Volatile => Some(ChunkDirectLineageReport::Volatile),
}
}
}
/// A [`ChunkDirectLineage`] whose referenced chunks are resolved to their actual `Arc<Chunk>`s.
///
/// Produced by [`ChunkDirectLineage::to_report`]. Convert it back to the ID-only form via
/// `From`. `Debug` is implemented by hand so that only chunk IDs are printed, not chunk contents.
#[derive(Clone, PartialEq)]
pub enum ChunkDirectLineageReport {
/// The chunk this one was split from, followed by the other chunks produced by the same split.
SplitFrom(Arc<Chunk>, Vec<Arc<Chunk>>),
/// The chunks that were compacted into this one, keyed by their ID.
CompactedFrom(BTreeMap<ChunkId, Arc<Chunk>>),
/// Lineage root originating from an RRD manifest; `is_static` records the chunk's staticness.
RootFromManifest { is_static: bool },
/// Lineage root inserted directly, not via a manifest; it cannot be re-fetched.
Volatile,
}
impl std::fmt::Debug for ChunkDirectLineageReport {
    /// Debug rendering that prints chunk IDs in place of full chunk contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Volatile => f.write_str("Volatile"),
            Self::RootFromManifest { is_static } => {
                f.write_fmt(format_args!("RootFromManifest(static: {is_static})"))
            }
            Self::CompactedFrom(sources) => {
                let id_pairs = sources
                    .iter()
                    .map(|(source_id, chunk)| (source_id, chunk.id()));
                f.debug_map().entries(id_pairs).finish()
            }
            Self::SplitFrom(parent, siblings) => {
                let sibling_ids = siblings.iter().map(|sibling| sibling.id()).format(", ");
                f.debug_struct("SplitFrom")
                    .field("parent", &parent.id())
                    .field("siblings", &sibling_ids)
                    .finish()
            }
        }
    }
}
impl ChunkStore {
/// Renders the full lineage tree of `chunk_id` as a human-readable, multi-line string.
///
/// The first line describes `chunk_id` itself:
/// - its status: `loaded` if it is in `physical_chunks_per_chunk_id`, `offloaded` otherwise;
/// - whether it is static: `yes`, `no` or `unknown`;
/// - its split siblings, if any.
///
/// Each parent is then rendered recursively on the following lines. Parent lines are
/// prefixed with `split-from:` or `compacted-from:` and indented by 4 columns per level of
/// depth. Root lineages (`RootFromManifest`/`Volatile`) are printed with their `Debug`
/// representation. Chunks with no recorded lineage are printed as `<invalid>`.
///
/// The snapshot tests in this file compare this output verbatim, so any format change
/// requires updating those snapshots.
///
/// NOTE(review): there is no cycle detection; this assumes the lineage graph is acyclic.
pub fn format_lineage(&self, chunk_id: &ChunkId) -> String {
// Staticness of `chunk_id`: read from the chunk itself when it is physically present,
// otherwise from the first root-manifest ancestor whose lineage records it.
// Falls back to "unknown" when neither source is available.
fn compute_staticness(store: &ChunkStore, chunk_id: &ChunkId) -> &'static str {
if let Some(chunk) = store.physical_chunks_per_chunk_id.get(chunk_id) {
return if chunk.is_static() { "yes" } else { "no" };
}
for root_id in store.find_root_manifest_chunks(chunk_id) {
if let Some(ChunkDirectLineage::RootFromManifest { is_static }) =
store.chunks_lineage.get(&root_id)
{
return if *is_static { "yes" } else { "no" };
}
}
"unknown"
}
// Formats the header line for `chunk_id`, then recurses into its parents at `depth + 1`.
// The string is built with `String + &str` concatenation, hence the lint expectation.
#[expect(clippy::string_add)] fn recurse(store: &ChunkStore, chunk_id: &ChunkId, depth: usize) -> String {
let chunk = store.physical_chunks_per_chunk_id.get(chunk_id);
let lineage = store.chunks_lineage.get(chunk_id);
let status = if chunk.is_some() {
"loaded"
} else {
"offloaded"
};
let is_static = compute_staticness(store, chunk_id);
// Indentation (in columns) applied to this node's parent lines.
let width = (depth + 1) * 4;
// Only split chunks have siblings to show in the header line.
let sibling_ids = match lineage {
Some(ChunkDirectLineage::SplitFrom(_, sibling_ids)) => sibling_ids.as_slice(),
_ => &[],
};
(if sibling_ids.is_empty() {
format!("{chunk_id} (status:{status} static:{is_static})\n")
} else {
let sibling_ids = sibling_ids.iter().map(|id| id.to_string()).join(",");
format!(
"{chunk_id} (status:{status} static:{is_static} siblings:[{sibling_ids}])\n"
)
}) + &match lineage {
// A split has exactly one parent.
Some(ChunkDirectLineage::SplitFrom(id, _sibling_ids)) => {
format!(
"{:width$}split-from: {}",
"",
recurse(store, id, depth + 1),
width = width
)
}
// A compaction has one subtree per source, separated by newlines.
Some(ChunkDirectLineage::CompactedFrom(ids)) => ids
.iter()
.map(|id| {
format!(
"{:width$}compacted-from: {}",
"",
recurse(store, id, depth + 1),
width = width
)
})
.join("\n"),
// Remaining variants are roots (`RootFromManifest`/`Volatile`): the recursion stops here.
Some(lineage) => format!("{:width$}{lineage:?}", "", width = width),
None => format!("{:width$}<invalid>", "", width = width),
}
}
recurse(self, chunk_id, 0)
}
/// Returns `true` if `chunk_id` is a lineage root. That is the case when it has no
/// recorded lineage at all, or when its lineage is `RootFromManifest` or `Volatile`.
///
/// NOTE(review): `collect_root_ids` does *not* report chunks without a recorded lineage
/// as roots, whereas this returns `true` for them. Confirm the asymmetry is intended.
pub fn is_root_chunk(&self, chunk_id: &ChunkId) -> bool {
let Some(lineage) = self.chunks_lineage.get(chunk_id) else {
return true;
};
matches!(
lineage,
ChunkDirectLineage::RootFromManifest { .. } | ChunkDirectLineage::Volatile
)
}
/// Returns the IDs of every `RootFromManifest` or `Volatile` ancestor of `chunk_id`.
///
/// The result includes `chunk_id` itself if it is such a root. See `collect_root_ids`.
pub fn find_root_chunks(&self, chunk_id: &ChunkId) -> Vec<ChunkId> {
let mut roots = Vec::new();
self.collect_root_ids(chunk_id, &mut roots);
roots
}
/// Walks `chunk_id`'s lineage through split parents and compaction sources, appending to
/// `roots` every chunk reached whose lineage is `RootFromManifest` or `Volatile`.
///
/// Chunks without a recorded lineage contribute nothing. No deduplication is performed:
/// an ancestor reachable through several paths is appended once per path.
pub fn collect_root_ids(&self, chunk_id: &ChunkId, roots: &mut Vec<ChunkId>) {
let lineage = self.chunks_lineage.get(chunk_id);
match lineage {
// `chunk_id` is shadowed here by the split parent's ID.
Some(ChunkDirectLineage::SplitFrom(chunk_id, _sibling_ids)) => {
self.collect_root_ids(chunk_id, roots);
}
Some(ChunkDirectLineage::CompactedFrom(chunk_ids)) => {
for chunk_id in chunk_ids {
self.collect_root_ids(chunk_id, roots);
}
}
Some(ChunkDirectLineage::RootFromManifest { .. } | ChunkDirectLineage::Volatile) => {
roots.push(*chunk_id);
}
None => {}
}
}
/// Returns the IDs of every `RootFromManifest` ancestor of `chunk_id`, which may be
/// `chunk_id` itself. Volatile roots are not included.
pub fn find_root_manifest_chunks(&self, chunk_id: &ChunkId) -> Vec<ChunkId> {
let mut roots = Vec::new();
self.collect_root_manifest_chunks(chunk_id, &mut roots);
roots
}
// Same walk as `collect_root_ids`, but only `RootFromManifest` chunks are appended.
// `Volatile` roots and chunks without lineage contribute nothing. No deduplication.
fn collect_root_manifest_chunks(&self, chunk_id: &ChunkId, roots: &mut Vec<ChunkId>) {
let lineage = self.chunks_lineage.get(chunk_id);
match lineage {
Some(ChunkDirectLineage::SplitFrom(chunk_id, _sibling_ids)) => {
self.collect_root_manifest_chunks(chunk_id, roots);
}
Some(ChunkDirectLineage::CompactedFrom(chunk_ids)) => {
for chunk_id in chunk_ids {
self.collect_root_manifest_chunks(chunk_id, roots);
}
}
Some(ChunkDirectLineage::RootFromManifest { .. }) => {
roots.push(*chunk_id);
}
_ => {}
}
}
/// Appends to `descendents` the physical chunk(s) found for `chunk_id`, using the first
/// rule that applies:
/// 1. `chunk_id` itself, if it is physically present in the store.
/// 2. Otherwise, if `chunk_id` has an entry in `dangling_splits`, the still-physical chunks
///    listed there. The `leaky_compactions` chain is not consulted in that case.
/// 3. Otherwise, the first physical chunk reached by following the `leaky_compactions`
///    chain starting at `chunk_id`, if any.
///
/// NOTE(review): the `leaky_compactions` walk has no cycle guard; it assumes the chain is acyclic.
pub fn collect_physical_descendents_of(
&self,
chunk_id: &ChunkId,
descendents: &mut Vec<ChunkId>,
) {
// Takes `&&ChunkId` so it can be passed directly to `Iterator::filter`.
let is_physical = |c: &&ChunkId| self.physical_chunks_per_chunk_id.contains_key(c);
if is_physical(&chunk_id) {
descendents.push(*chunk_id);
} else if let Some(split_chunks) = self.dangling_splits.get(chunk_id) {
descendents.extend(split_chunks.iter().filter(is_physical).copied());
} else {
let mut source_id = *chunk_id;
// Follow the chain until a physical chunk is found or the chain ends.
let compacted = loop {
let Some(chunk_id) = self.leaky_compactions.get(&source_id) else {
break None;
};
if is_physical(&chunk_id) {
break Some(*chunk_id);
}
source_id = *chunk_id;
};
if let Some(chunk_id) = compacted {
descendents.push(chunk_id);
}
}
}
/// Returns `true` if `chunk_id` was produced by splitting another chunk.
///
/// Release builds only inspect `chunk_id`'s direct lineage.
///
/// Debug builds additionally walk through compaction sources, returning `true` if any of
/// them descends from a split. In that case they also raise a `re_log::debug_assert!`,
/// because compaction and splitting are not expected to be mixed in one lineage tree.
///
/// NOTE(review): the debug and release results can differ when a compaction has a split
/// ancestor. That situation is exactly what the debug assertion flags.
pub fn descends_from_a_split(&self, chunk_id: &ChunkId) -> bool {
if cfg!(debug_assertions) {
fn recurse(store: &ChunkStore, chunk_id: &ChunkId, compaction_found: bool) -> bool {
let lineage = store.chunks_lineage.get(chunk_id);
match lineage {
Some(ChunkDirectLineage::SplitFrom(_chunk_id, _sibling_ids)) => {
re_log::debug_assert!(
!compaction_found,
"Chunk {chunk_id} mixes compaction and splitting in its lineage tree"
);
true
}
Some(ChunkDirectLineage::CompactedFrom(chunk_ids)) => {
for chunk_id in chunk_ids {
if recurse(store, chunk_id, true) {
return true;
}
}
false
}
_ => false,
}
}
let compaction_found = false;
recurse(self, chunk_id, compaction_found)
} else {
matches!(
self.chunks_lineage.get(chunk_id),
Some(ChunkDirectLineage::SplitFrom { .. })
)
}
}
/// Returns `true` if `chunk_id` was produced by compaction, either directly or through
/// any chain of split parents (a split of a split of a compacted chunk, and so on).
///
/// # Panics
///
/// In debug builds, panics if a compaction is found above a split in the lineage tree,
/// i.e. if `chunk_id` descends from a split of a compacted chunk.
pub fn descends_from_a_compaction(&self, chunk_id: &ChunkId) -> bool {
fn recurse(store: &ChunkStore, chunk_id: &ChunkId, split_found: bool) -> bool {
let lineage = store.chunks_lineage.get(chunk_id);
match lineage {
Some(ChunkDirectLineage::SplitFrom(chunk_id, _sibling_ids)) => {
recurse(store, chunk_id, true)
}
Some(ChunkDirectLineage::CompactedFrom(_chunk_ids)) => {
#[expect(clippy::manual_assert)]
if cfg!(debug_assertions) && split_found {
panic!(
"Chunk {chunk_id} mixes compaction and splitting in its lineage tree"
)
}
true
}
_ => false,
}
}
let split_found = false;
recurse(self, chunk_id, split_found)
}
/// Returns the recorded direct lineage of `chunk_id`, or `None` if the store has none for it.
pub fn direct_lineage(&self, chunk_id: &ChunkId) -> Option<&ChunkDirectLineage> {
self.chunks_lineage.get(chunk_id)
}
}
#[cfg(test)]
#[expect(clippy::bool_assert_comparison)] mod tests {
// Lineage tests. Most tests build small single-entity chunks with a fixed row count and
// insert them into a store with tight row limits. This forces splits and compactions,
// whose recorded lineage is then checked. `assert_eq!(true/false, ..)` is used for
// readability, hence the lint expectation above.
use re_chunk::{Chunk, EntityPath, RowId, Timeline};
use re_log_encoding::RrdManifest;
use re_log_types::StoreId;
use re_log_types::example_components::{MyPoint, MyPoints};
use re_log_types::external::re_tuid::Tuid;
use crate::ChunkStoreConfig;
use super::*;
// Chunks inserted directly (no RRD manifest) with a 3-row limit: checks the lineage snapshot,
// that every physical chunk's roots come from the inserted set, and that none has a manifest root.
#[test]
fn lineage_basics_volatile() {
let mut store = ChunkStore::new(
StoreId::recording("app_id", "rec_id"),
ChunkStoreConfig {
enable_changelog: false, chunk_max_bytes: u64::MAX,
chunk_max_rows: 3, chunk_max_rows_if_unsorted: 3, },
);
let mut next_chunk_id = next_chunk_id_generator(1);
let entity_path = EntityPath::from("this/that");
let timepoint = [(Timeline::new_sequence("frame"), 1)];
let points = &[MyPoint::new(1.0, 1.0)];
let mut build_chunk = |num_rows: usize| {
let mut builder = Chunk::builder_with_id(next_chunk_id(), entity_path.clone());
for _ in 0..num_rows {
builder = builder.with_component_batches(
RowId::new(),
timepoint,
[(MyPoints::descriptor_points(), points as _)],
);
}
Arc::new(builder.build().unwrap())
};
let chunks = [
build_chunk(1),
build_chunk(1),
build_chunk(1),
build_chunk(1),
build_chunk(3),
build_chunk(3),
build_chunk(6),
];
for chunk in &chunks {
let events = store.insert_chunk(chunk).unwrap();
for diff in events.iter().filter_map(|event| event.to_addition()) {
if let ChunkDirectLineageReport::SplitFrom(src, _siblings) = &diff.direct_lineage {
assert_eq!(
diff.chunk_before_processing.id(),
src.id(),
"splits are guaranteed flat, and therefore the origin of a split should always match the unprocessed chunk",
);
}
}
}
insta::assert_snapshot!("lineage_volatile", generate_redacted_lineage_report(&store));
for chunk in store.physical_chunks_per_chunk_id.values() {
assert!(
store
.find_root_chunks(&chunk.id())
.into_iter()
.all(|root_chunk_id| chunks.iter().any(|c| c.id() == root_chunk_id)),
"all these chunks' respective roots should come from the starting set"
);
assert!(
store.find_root_manifest_chunks(&chunk.id()).is_empty(),
"none of these chunks should have a root RRD manifest"
);
}
}
// Same chunks as `lineage_basics_volatile`, but the store is first bootstrapped from an
// RRD manifest built from those chunks. Each insertion must then start by removing the
// chunk's ghost index (a deletion event).
#[test]
fn lineage_basics_bootstrapped() {
let mut next_chunk_id = next_chunk_id_generator(1);
let entity_path = EntityPath::from("this/that");
let timepoint = [(Timeline::new_sequence("frame"), 1)];
let points = &[MyPoint::new(1.0, 1.0)];
let mut build_chunk = |num_rows: usize| {
let mut builder = Chunk::builder_with_id(next_chunk_id(), entity_path.clone());
for _ in 0..num_rows {
builder = builder.with_component_batches(
RowId::new(),
timepoint,
[(MyPoints::descriptor_points(), points as _)],
);
}
Arc::new(builder.build().unwrap())
};
let chunks = [
build_chunk(1),
build_chunk(1),
build_chunk(1),
build_chunk(1),
build_chunk(3),
build_chunk(3),
build_chunk(6),
];
let store_id = StoreId::recording("app_id", "rec_id");
let rrd_manifest =
RrdManifest::build_in_memory_from_chunks(store_id.clone(), chunks.iter().map(|c| &**c))
.unwrap();
let mut store = ChunkStore::new(
store_id,
ChunkStoreConfig {
enable_changelog: false, chunk_max_bytes: u64::MAX,
chunk_max_rows: 3, chunk_max_rows_if_unsorted: 3, },
);
let _ignored_events = store.insert_rrd_manifest(rrd_manifest.clone());
for chunk in &chunks {
let events = store.insert_chunk(chunk).unwrap();
assert!(
events.len() > 1,
"removal of the ghost index + insertion(s) for the physical chunk, got {events:#?}"
);
{
let diff = events[0].to_deletion().unwrap();
assert_eq!(chunk.id(), diff.chunk.id(), "ghost index");
}
// NOTE(review): `filter_map` already drops the leading ghost-index deletion, so the
// `.skip(1)` below skips the first *addition*, not the deletion. Confirm this is intended.
for diff in events
.iter()
.filter_map(|event| event.to_addition())
.skip(1)
{
if let ChunkDirectLineageReport::SplitFrom(src, _siblings) = &diff.direct_lineage {
assert_eq!(
diff.chunk_before_processing.id(),
src.id(),
"splits are guaranteed flat, and therefore the origin of a split should always match the unprocessed chunk",
);
}
}
}
insta::assert_snapshot!(
"lineage_bootstrapped",
generate_redacted_lineage_report(&store)
);
for chunk in store.physical_chunks_per_chunk_id.values() {
assert!(
store
.find_root_chunks(&chunk.id())
.into_iter()
.all(|root_chunk_id| chunks.iter().any(|c| c.id() == root_chunk_id)),
"all these chunks' respective roots should come from the starting set"
);
for root_chunk_id in store.find_root_manifest_chunks(&chunk.id()) {
assert!(
chunks.iter().any(|c| c.id() == root_chunk_id),
"all these chunks' respective roots should come from the starting manifest",
);
}
}
}
// With a 1-row limit, a 4-row chunk is split into 4 chunks that each list the other 3 as
// siblings. GC then drops 2 of them. Re-inserting the original chunk deletes the 2 survivors
// and re-adds 4 fresh splits.
#[test]
fn lineage_dangling_splits() {
let mut store = ChunkStore::new(
StoreId::recording("app_id", "rec_id"),
ChunkStoreConfig {
enable_changelog: false, chunk_max_bytes: u64::MAX,
chunk_max_rows: 1, chunk_max_rows_if_unsorted: 1, },
);
let mut next_chunk_id = next_chunk_id_generator(1);
let entity_path = EntityPath::from("this/that");
let timepoint = [(Timeline::new_sequence("frame"), 1)];
let points = &[MyPoint::new(1.0, 1.0)];
let mut build_chunk = |num_rows: usize| {
let mut builder = Chunk::builder_with_id(next_chunk_id(), entity_path.clone());
for _ in 0..num_rows {
builder = builder.with_component_batches(
RowId::new(),
timepoint,
[(MyPoints::descriptor_points(), points as _)],
);
}
Arc::new(builder.build().unwrap())
};
let chunk = build_chunk(4);
let events = store.insert_chunk(&chunk).unwrap();
assert_eq!(5, events.len());
assert!(
events[4].is_schema_addition(),
"the first write should emit a schema addition for newly seen columns"
);
for event in &events[..4] {
assert_eq!(true, event.is_addition());
let siblings = events[..4]
.iter()
.filter(|e| e.delta_chunk().unwrap().id() != event.delta_chunk().unwrap().id())
.map(|e| e.delta_chunk().unwrap().clone())
.collect_vec();
assert_eq!(
ChunkDirectLineageReport::SplitFrom(chunk.clone(), siblings),
event.to_addition().unwrap().direct_lineage,
);
}
assert_eq!(4, store.num_physical_chunks());
for chunk in store.iter_physical_chunks() {
assert_eq!(true, store.descends_from_a_split(&chunk.id()));
assert_eq!(false, store.descends_from_a_compaction(&chunk.id()));
}
let (events, _) = store.gc(&crate::GarbageCollectionOptions {
target: crate::GarbageCollectionTarget::DropAtLeastFraction(0.5),
time_budget: std::time::Duration::MAX,
protect_latest: 0,
protected_time_ranges: Default::default(),
protected_chunks: Default::default(),
furthest_from: None,
perform_deep_deletions: false,
});
assert_eq!(2, events.len());
for event in events {
assert_eq!(true, event.is_deletion());
}
assert_eq!(2, store.num_physical_chunks());
for chunk in store.iter_physical_chunks() {
assert_eq!(true, store.descends_from_a_split(&chunk.id()));
assert_eq!(false, store.descends_from_a_compaction(&chunk.id()));
}
// `chunk` here is the original 4-row chunk; the loop variables above only shadowed it.
let events = store.insert_chunk(&chunk).unwrap();
assert_eq!(6, events.len());
for event in &events[..2] {
assert_eq!(true, event.is_deletion()); }
for event in &events[2..] {
assert_eq!(true, event.is_addition()); }
assert_eq!(4, store.num_physical_chunks());
for chunk in store.iter_physical_chunks() {
assert_eq!(true, store.descends_from_a_split(&chunk.id()));
assert_eq!(false, store.descends_from_a_compaction(&chunk.id()));
}
}
// With a 10-row limit, a 12-row chunk is split in two. A following 1-row chunk is not
// compacted into those splits. A further 2-row chunk is then compacted with that 1-row chunk.
#[test]
fn splits_cannot_compact() {
let mut store = ChunkStore::new(
StoreId::recording("app_id", "rec_id"),
ChunkStoreConfig {
enable_changelog: false, chunk_max_bytes: u64::MAX,
chunk_max_rows: 10, chunk_max_rows_if_unsorted: 10, },
);
let mut next_chunk_id = next_chunk_id_generator(1);
let entity_path = EntityPath::from("this/that");
let timepoint = [(Timeline::new_sequence("frame"), 1)];
let points = &[MyPoint::new(1.0, 1.0)];
let mut build_chunk = |num_rows: usize| {
let mut builder = Chunk::builder_with_id(next_chunk_id(), entity_path.clone());
for _ in 0..num_rows {
builder = builder.with_component_batches(
RowId::new(),
timepoint,
[(MyPoints::descriptor_points(), points as _)],
);
}
Arc::new(builder.build().unwrap())
};
let chunk1 = build_chunk(12);
let chunk2 = build_chunk(1);
let chunk3 = build_chunk(2);
let events = store.insert_chunk(&chunk1).unwrap();
assert_eq!(3, events.len());
for event in &events[..2] {
assert_eq!(true, event.is_addition());
}
assert!(
events[2].is_schema_addition(),
"the first write should emit a schema addition for newly seen columns"
);
assert_eq!(2, store.num_physical_chunks());
for chunk in store.iter_physical_chunks() {
assert_eq!(true, store.descends_from_a_split(&chunk.id()));
assert_eq!(false, store.descends_from_a_compaction(&chunk.id()));
}
let events = store.insert_chunk(&chunk2).unwrap();
assert_eq!(1, events.len());
for event in events {
assert_eq!(true, event.is_addition());
}
assert_eq!(3, store.num_physical_chunks());
{
// Ordered by min row ID: the two splits of `chunk1`, then `chunk2`.
let chunk_ids = store
.physical_chunk_ids_per_min_row_id
.values()
.collect_vec();
assert_eq!(true, store.descends_from_a_split(chunk_ids[0]));
assert_eq!(false, store.descends_from_a_compaction(chunk_ids[0]));
assert_eq!(true, store.descends_from_a_split(chunk_ids[1]));
assert_eq!(false, store.descends_from_a_compaction(chunk_ids[1]));
assert_eq!(false, store.descends_from_a_split(chunk_ids[2]));
assert_eq!(false, store.descends_from_a_compaction(chunk_ids[2]));
}
let events = store.insert_chunk(&chunk3).unwrap();
assert_eq!(1, events.len());
assert_eq!(true, events[0].is_addition());
assert_eq!(&chunk3, events[0].delta_chunk().unwrap());
assert_eq!(
ChunkDirectLineageReport::CompactedFrom(
[(chunk2.id(), chunk2.clone()), (chunk3.id(), chunk3.clone())]
.into_iter()
.collect()
),
events[0].to_addition().unwrap().direct_lineage
);
assert_eq!(3, store.num_physical_chunks());
{
let chunk_ids = store
.physical_chunk_ids_per_min_row_id
.values()
.collect_vec();
assert_eq!(true, store.descends_from_a_split(chunk_ids[0]));
assert_eq!(false, store.descends_from_a_compaction(chunk_ids[0]));
assert_eq!(true, store.descends_from_a_split(chunk_ids[1]));
assert_eq!(false, store.descends_from_a_compaction(chunk_ids[1]));
assert_eq!(false, store.descends_from_a_split(chunk_ids[2]));
assert_eq!(true, store.descends_from_a_compaction(chunk_ids[2]));
}
}
// With a 10-row limit, two 9-row chunks stay separate: no compaction (it would exceed the
// limit) and no split. Neither chunk descends from a split or from a compaction.
#[test]
fn compacted_cannot_split() {
let mut store = ChunkStore::new(
StoreId::recording("app_id", "rec_id"),
ChunkStoreConfig {
enable_changelog: false, chunk_max_bytes: u64::MAX,
chunk_max_rows: 10, chunk_max_rows_if_unsorted: 10, },
);
let mut next_chunk_id = next_chunk_id_generator(1);
let entity_path = EntityPath::from("this/that");
let timepoint = [(Timeline::new_sequence("frame"), 1)];
let points = &[MyPoint::new(1.0, 1.0)];
let mut build_chunk = |num_rows: usize| {
let mut builder = Chunk::builder_with_id(next_chunk_id(), entity_path.clone());
for _ in 0..num_rows {
builder = builder.with_component_batches(
RowId::new(),
timepoint,
[(MyPoints::descriptor_points(), points as _)],
);
}
Arc::new(builder.build().unwrap())
};
let chunk1 = build_chunk(9);
let chunk2 = build_chunk(9);
let events = store.insert_chunk(&chunk1).unwrap();
assert_eq!(2, events.len());
assert_eq!(true, events[0].is_addition());
assert!(
events[1].is_schema_addition(),
"the first write should emit a schema addition for newly seen columns"
);
let events = store.insert_chunk(&chunk2).unwrap();
assert_eq!(1, events.len());
assert_eq!(true, events[0].is_addition());
assert_eq!(2, store.num_physical_chunks());
assert_eq!(
vec![chunk1.id(), chunk2.id()],
store
.physical_chunks_per_chunk_id
.keys()
.copied()
.collect_vec()
);
for chunk in store.iter_physical_chunks() {
assert_eq!(false, store.descends_from_a_split(&chunk.id()));
assert_eq!(false, store.descends_from_a_compaction(&chunk.id()));
}
}
// Ten 1-row chunks inserted one after the other. Each is compacted with the previous
// result, so every lineage after the first is `CompactedFrom({new, previous})`. The first
// insertion is `Volatile`, and a single physical chunk remains at the end.
#[test]
fn linear_recursive_compaction() {
let mut store = ChunkStore::new(
StoreId::recording("app_id", "rec_id"),
ChunkStoreConfig {
enable_changelog: false, chunk_max_bytes: u64::MAX,
chunk_max_rows: 10, chunk_max_rows_if_unsorted: 10, },
);
let mut next_chunk_id = next_chunk_id_generator(1);
let entity_path = EntityPath::from("this/that");
let timepoint = [(Timeline::new_sequence("frame"), 1)];
let points = &[MyPoint::new(1.0, 1.0)];
let mut build_chunk = |num_rows: usize| {
let mut builder = Chunk::builder_with_id(next_chunk_id(), entity_path.clone());
for _ in 0..num_rows {
builder = builder.with_component_batches(
RowId::new(),
timepoint,
[(MyPoints::descriptor_points(), points as _)],
);
}
Arc::new(builder.build().unwrap())
};
let chunks = (0..10).map(|_| build_chunk(1)).collect_vec();
let mut prev_chunk: Option<Arc<Chunk>> = None;
let mut is_first_insert = true;
for chunk in chunks {
let mut events = store.insert_chunk(&chunk).unwrap();
if is_first_insert {
assert_eq!(2, events.len());
assert!(
events[1].is_schema_addition(),
"the first write should emit a schema addition for newly seen columns"
);
is_first_insert = false;
} else {
assert_eq!(1, events.len());
}
let event = events.remove(0);
let event = event.to_addition().unwrap();
assert_eq!(chunk.id(), event.chunk_before_processing.id());
assert_eq!(
false,
store.descends_from_a_split(&event.chunk_before_processing.id())
);
assert_eq!(
false,
store.descends_from_a_compaction(&event.chunk_before_processing.id())
);
assert_eq!(
false,
store.descends_from_a_split(&event.chunk_after_processing.id())
);
let lineage: ChunkDirectLineage = event.direct_lineage.clone().into();
if let Some(prev_chunk) = prev_chunk.take() {
let expected = ChunkDirectLineage::CompactedFrom(
[chunk.id(), prev_chunk.id()].into_iter().collect(),
);
assert_eq!(expected, lineage);
assert_eq!(
true,
store.descends_from_a_compaction(&event.chunk_after_processing.id())
);
} else {
let expected = ChunkDirectLineage::Volatile;
assert_eq!(expected, lineage);
assert_eq!(
false,
store.descends_from_a_compaction(&event.chunk_after_processing.id())
);
}
prev_chunk = Some(event.chunk_after_processing.clone());
}
assert_eq!(1, store.num_physical_chunks());
}
// Inserting the same three 1-row chunks three times over ends with a single physical chunk.
// The resulting lineage (including the leaky compactions) is pinned by a snapshot.
#[test]
fn lineage_leaky_compactions() {
let store_id = StoreId::recording("app_id", "rec_id");
let mut store = ChunkStore::new(
store_id.clone(),
ChunkStoreConfig {
enable_changelog: false, chunk_max_bytes: u64::MAX,
chunk_max_rows: 10, chunk_max_rows_if_unsorted: 10, },
);
let mut next_chunk_id = next_chunk_id_generator(1);
let entity_path = EntityPath::from("this/that");
let timepoint = [(Timeline::new_sequence("frame"), 1)];
let points = &[MyPoint::new(1.0, 1.0)];
let mut build_chunk = |num_rows: usize| {
let mut builder = Chunk::builder_with_id(next_chunk_id(), entity_path.clone());
for _ in 0..num_rows {
builder = builder.with_component_batches(
RowId::new(),
timepoint,
[(MyPoints::descriptor_points(), points as _)],
);
}
Arc::new(builder.build().unwrap())
};
let chunk1 = build_chunk(1);
let chunk2 = build_chunk(1);
let chunk3 = build_chunk(1);
for _ in 0..3 {
store.insert_chunk(&chunk1).unwrap();
store.insert_chunk(&chunk2).unwrap();
store.insert_chunk(&chunk3).unwrap();
}
assert_eq!(1, store.num_physical_chunks());
insta::assert_snapshot!(
"lineage_leaky_compactions",
generate_redacted_lineage_report(&store)
);
}
// Returns a closure yielding a deterministic sequence of chunk IDs. The sequence starts
// right after the TUID `(prefix, 0)`.
fn next_chunk_id_generator(prefix: u64) -> impl FnMut() -> re_chunk::ChunkId {
let mut chunk_id = re_chunk::ChunkId::from_tuid(Tuid::from_nanos_and_inc(prefix, 0));
move || {
chunk_id = chunk_id.next();
chunk_id
}
}
// Concatenates `format_lineage` for every physical chunk, in sorted ID order. It then
// replaces every chunk ID that has a recorded lineage with a deterministic stand-in
// (prefix 1337), so snapshots stay stable across runs.
fn generate_redacted_lineage_report(store: &ChunkStore) -> String {
let mut next_chunk_id = next_chunk_id_generator(1337);
let redacted_chunk_ids: ahash::HashMap<_, _> = store
.chunks_lineage
.keys()
.sorted()
.map(|chunk_id| (*chunk_id, next_chunk_id()))
.collect();
let mut lineage_report = Vec::new();
for chunk_id in store.physical_chunks_per_chunk_id.keys().sorted() {
lineage_report.push(store.format_lineage(chunk_id));
}
let mut lineage_report = lineage_report.join("\n");
for (chunk_id, redacted_chunk_id) in redacted_chunk_ids {
lineage_report =
lineage_report.replace(&chunk_id.to_string(), &redacted_chunk_id.to_string());
}
lineage_report
}
}