#![allow(dead_code, clippy::allow_attributes)]
use std::collections::{BTreeMap, HashMap};
use re_log_types::{EntityPath, TimeInt};
/// Tracks which chunks are still expected, keyed by entity and by each chunk's minimum time.
///
/// Chunks are registered with `expect_chunk`, ticked off with `record_arrival`, and the
/// registration phase is closed with `lock`.
#[derive(Debug, Default)]
pub(crate) struct SegmentChunkManifest {
/// Per entity: outstanding `time_min` -> number of expected chunks with that `time_min`
/// that have not been recorded as arrived yet. An entity is removed from the outer map
/// once its inner map becomes empty.
outstanding_time_mins_per_entity: HashMap<EntityPath, BTreeMap<TimeInt, usize>>,
/// Index over the smallest outstanding `time_min` of every entity: time -> number of
/// entities whose earliest outstanding chunk starts at that time.
entity_heads: BTreeMap<TimeInt, usize>,
/// Set to `true` by `lock`; `safe_horizon` returns `None` and `is_complete` returns
/// `false` until then.
locked: bool,
}
/// Returns the smallest key of `per_entity`, or `None` when the map is empty.
fn entity_head(per_entity: &BTreeMap<TimeInt, usize>) -> Option<TimeInt> {
    // A `BTreeMap` is ordered by key, so its first entry holds the minimum.
    per_entity.first_key_value().map(|(&earliest, _)| earliest)
}
/// Adds one occurrence of `t` to the counting index, creating the entry if needed.
fn bump_head_index(index: &mut BTreeMap<TimeInt, usize>, t: TimeInt) {
    index
        .entry(t)
        .and_modify(|occurrences| *occurrences += 1)
        .or_insert(1);
}
/// Removes one occurrence of `t` from the counting index.
///
/// The entry is deleted once its count reaches zero. A `t` that is not present in the index
/// is left alone.
fn drop_head_index(index: &mut BTreeMap<TimeInt, usize>, t: TimeInt) {
    let Some(occurrences) = index.get_mut(&t) else {
        return;
    };

    *occurrences -= 1;
    if *occurrences == 0 {
        index.remove(&t);
    }
}
impl SegmentChunkManifest {
    /// Creates an empty, unlocked manifest.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Registers one more expected chunk of `entity` whose time range starts at `time_min`.
    ///
    /// Registering the same `(entity, time_min)` pair several times is counted each time.
    ///
    /// A static `time_min` is rejected: a debug assertion fires, an error is logged once and
    /// nothing is recorded. Calling this after [`Self::lock`] trips a debug assertion.
    pub(crate) fn expect_chunk(&mut self, entity: EntityPath, time_min: TimeInt) {
        re_log::debug_assert!(
            !self.locked,
            "expect_chunk called after lock(); the manifest is supposed to be append-only \
             before lock and read-only after"
        );

        if time_min.is_static() {
            re_log::debug_assert!(
                false,
                "expect_chunk called with TimeInt::STATIC; callers must pre-filter static chunks \
                 — they have no temporal :start and would corrupt safe_horizon"
            );
            re_log::error_once!(
                "SegmentChunkManifest::expect_chunk received TimeInt::STATIC; dropping. \
                 This indicates a chunk_info ingest bug — static chunks must be filtered \
                 out before reaching the manifest."
            );
            return;
        }

        let outstanding = self
            .outstanding_time_mins_per_entity
            .entry(entity)
            .or_default();

        // The head must be sampled before inserting so we can tell whether it moved.
        let previous_head = entity_head(outstanding);
        *outstanding.entry(time_min).or_default() += 1;

        // Keep `entity_heads` in sync: it only changes when this entity gains its first
        // outstanding chunk or when the new chunk starts earlier than the previous head.
        match previous_head {
            None => bump_head_index(&mut self.entity_heads, time_min),
            Some(previous) if time_min < previous => {
                drop_head_index(&mut self.entity_heads, previous);
                bump_head_index(&mut self.entity_heads, time_min);
            }
            Some(_) => {}
        }
    }

    /// Marks the manifest as locked, i.e. the list of expected chunks is final.
    ///
    /// Locking twice trips a debug assertion.
    pub(crate) fn lock(&mut self) {
        re_log::debug_assert!(
            !self.locked,
            "lock() called twice on the same manifest; the worker is supposed to lock exactly \
             once after exhausting the per-segment chunk_info list"
        );
        self.locked = true;
    }

    /// Returns `true` once [`Self::lock`] has been called.
    pub(crate) fn is_locked(&self) -> bool {
        self.locked
    }

    /// Consumes one outstanding expectation for `(entity, time_min)`.
    ///
    /// Returns `true` when a matching expectation existed and was consumed, `false` when the
    /// entity has nothing outstanding or no chunk with that `time_min` is outstanding for it
    /// (in which case the manifest is left untouched).
    #[must_use]
    pub(crate) fn record_arrival(&mut self, entity: &EntityPath, time_min: TimeInt) -> bool {
        let Some(outstanding) = self.outstanding_time_mins_per_entity.get_mut(entity) else {
            return false;
        };

        let Some(head_before) = entity_head(outstanding) else {
            re_log::debug_panic!(
                "record_arrival: outer map contains an empty inner map for {entity:?}; \
                 outstanding_time_mins_per_entity invariant violated"
            );
            return false;
        };

        let Some(remaining) = outstanding.get_mut(&time_min) else {
            return false;
        };
        *remaining -= 1;
        if *remaining == 0 {
            outstanding.remove(&time_min);
        }

        // An entity without a head has nothing outstanding left: prune it from the outer map.
        let head_after = entity_head(outstanding);
        if head_after.is_none() {
            self.outstanding_time_mins_per_entity.remove(entity);
        }

        // Mirror any head change of this entity into the `entity_heads` index.
        if head_after != Some(head_before) {
            drop_head_index(&mut self.entity_heads, head_before);
            if let Some(head) = head_after {
                bump_head_index(&mut self.entity_heads, head);
            }
        }

        true
    }

    /// Returns the time just before the earliest outstanding chunk start across all entities.
    ///
    /// * `None` while the manifest is not locked.
    /// * `Some(TimeInt::MAX)` when nothing is outstanding.
    /// * Otherwise `dec()` of the smallest entity head. The tests in this file pin that this
    ///   stays at `TimeInt::MIN` when the smallest head is `TimeInt::MIN`.
    pub(crate) fn safe_horizon(&self) -> Option<TimeInt> {
        self.locked.then(|| {
            self.entity_heads
                .keys()
                .next()
                .copied()
                .map_or(TimeInt::MAX, |earliest_unreceived| {
                    earliest_unreceived.dec()
                })
        })
    }

    /// Returns `true` when the manifest is locked and no expected chunk is outstanding.
    pub(crate) fn is_complete(&self) -> bool {
        let nothing_outstanding = self.outstanding_time_mins_per_entity.is_empty();

        re_log::debug_assert_eq!(
            self.entity_heads.is_empty(),
            nothing_outstanding,
            "entity_heads index drifted from outstanding_time_mins_per_entity"
        );

        self.locked && nothing_outstanding
    }

    /// Total number of outstanding chunk expectations, summed over all entities.
    #[cfg(test)]
    pub(crate) fn outstanding_count(&self) -> usize {
        self.outstanding_time_mins_per_entity
            .values()
            .flat_map(BTreeMap::values)
            .copied()
            .sum()
    }

    /// Rebuilds the head index from scratch and asserts that it matches `entity_heads`, and
    /// that no entity is left with an empty inner map.
    #[cfg(test)]
    fn check_invariants(&self) {
        let mut expected: BTreeMap<TimeInt, usize> = BTreeMap::new();

        for per_entity in self.outstanding_time_mins_per_entity.values() {
            assert!(
                !per_entity.is_empty(),
                "empty inner map should have been pruned"
            );
            let head = entity_head(per_entity).expect("non-empty inner map");
            bump_head_index(&mut expected, head);
        }

        assert_eq!(
            self.entity_heads, expected,
            "entity_heads index drifted from outstanding_time_mins_per_entity"
        );
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a temporal [`TimeInt`].
    fn ti(t: i64) -> TimeInt {
        TimeInt::saturated_temporal_i64(t)
    }

    /// Shorthand for an [`EntityPath`].
    fn ep(path: &str) -> EntityPath {
        EntityPath::from(path)
    }

    /// Builds a manifest that expects every `(entity, time_min)` pair, in order, then locks it.
    fn locked_manifest(expectations: &[(&str, i64)]) -> SegmentChunkManifest {
        let mut manifest = SegmentChunkManifest::new();
        for &(path, time_min) in expectations {
            manifest.expect_chunk(ep(path), ti(time_min));
        }
        manifest.lock();
        manifest
    }

    #[test]
    fn safe_horizon_is_none_until_locked() {
        let mut manifest = SegmentChunkManifest::new();
        manifest.expect_chunk(ep("/a"), ti(10));
        manifest.expect_chunk(ep("/a"), ti(20));

        assert!(!manifest.is_locked());
        assert_eq!(manifest.safe_horizon(), None);
        assert!(!manifest.is_complete());

        manifest.lock();

        assert!(manifest.is_locked());
        assert_eq!(manifest.safe_horizon(), Some(ti(9)));
        assert!(!manifest.is_complete());
        manifest.check_invariants();
    }

    #[test]
    fn safe_horizon_is_min_across_entities() {
        let manifest = locked_manifest(&[("/a", 100), ("/b", 50), ("/c", 75)]);

        assert_eq!(manifest.safe_horizon(), Some(ti(49)));
        manifest.check_invariants();
    }

    #[test]
    fn record_arrival_advances_horizon() {
        let mut manifest = locked_manifest(&[("/a", 100), ("/b", 50), ("/c", 75)]);
        assert_eq!(manifest.safe_horizon(), Some(ti(49)));

        // Each arrival removes the current minimum, so the horizon moves up to the next head.
        let arrivals = [
            ("/b", 50, ti(74)),
            ("/c", 75, ti(99)),
            ("/a", 100, TimeInt::MAX),
        ];
        for (path, time_min, horizon_after) in arrivals {
            assert!(
                manifest.record_arrival(&ep(path), ti(time_min)),
                "arrival of {path}@{time_min} should be accepted"
            );
            assert_eq!(manifest.safe_horizon(), Some(horizon_after));
        }

        assert!(manifest.is_complete());
        manifest.check_invariants();
    }

    #[test]
    fn multiset_handles_duplicate_time_min_per_entity() {
        let mut manifest = locked_manifest(&[("/a", 10), ("/a", 10)]);
        assert_eq!(manifest.outstanding_count(), 2);

        assert!(manifest.record_arrival(&ep("/a"), ti(10)));
        assert_eq!(manifest.outstanding_count(), 1);
        assert_eq!(manifest.safe_horizon(), Some(ti(9)));

        assert!(manifest.record_arrival(&ep("/a"), ti(10)));
        assert_eq!(manifest.outstanding_count(), 0);
        assert!(manifest.is_complete());
        assert_eq!(manifest.safe_horizon(), Some(TimeInt::MAX));
        manifest.check_invariants();
    }

    #[test]
    fn out_of_order_arrival_keeps_horizon_pinned() {
        let mut manifest = locked_manifest(&[("/a", 10), ("/a", 20), ("/a", 30)]);

        // Later chunks arriving first must not move the horizon past the earliest one.
        for late in [30, 20] {
            assert!(manifest.record_arrival(&ep("/a"), ti(late)));
            assert_eq!(manifest.safe_horizon(), Some(ti(9)));
        }

        assert!(manifest.record_arrival(&ep("/a"), ti(10)));
        assert_eq!(manifest.safe_horizon(), Some(TimeInt::MAX));
        assert!(manifest.is_complete());
        manifest.check_invariants();
    }

    #[test]
    fn unexpected_arrival_returns_false() {
        let mut manifest = locked_manifest(&[("/a", 10)]);

        // Unknown entity, then known entity with an unknown `time_min`.
        assert!(!manifest.record_arrival(&ep("/b"), ti(10)));
        assert!(!manifest.record_arrival(&ep("/a"), ti(999)));
        assert_eq!(manifest.outstanding_count(), 1);
        assert_eq!(manifest.safe_horizon(), Some(ti(9)));

        assert!(manifest.record_arrival(&ep("/a"), ti(10)));
        assert_eq!(manifest.outstanding_count(), 0);
        assert!(manifest.is_complete());
        manifest.check_invariants();
    }

    #[test]
    fn empty_segment_after_lock_is_complete_with_max_horizon() {
        let manifest = locked_manifest(&[]);

        assert!(manifest.is_complete());
        assert_eq!(manifest.safe_horizon(), Some(TimeInt::MAX));
        manifest.check_invariants();
    }

    #[test]
    fn horizon_saturates_at_time_int_min() {
        let mut manifest = SegmentChunkManifest::new();
        manifest.expect_chunk(ep("/a"), TimeInt::MIN);
        manifest.lock();

        let horizon = manifest
            .safe_horizon()
            .expect("locked manifest has a horizon");
        assert_eq!(horizon, TimeInt::MIN);
        manifest.check_invariants();
    }

    #[test]
    fn entity_heads_stay_in_sync_under_mixed_ops() {
        let mut manifest = SegmentChunkManifest::new();

        // Mix of new entities, later chunks, earlier chunks and duplicates.
        let expectations = [
            ("/a", 10),
            ("/b", 30),
            ("/a", 20),
            ("/b", 5),
            ("/c", 15),
            ("/a", 10),
        ];
        for (path, time_min) in expectations {
            manifest.expect_chunk(ep(path), ti(time_min));
            manifest.check_invariants();
        }

        manifest.lock();
        assert_eq!(manifest.safe_horizon(), Some(ti(4)));
        manifest.check_invariants();

        // `(entity, time_min, horizon to verify after the arrival, if any)`.
        let arrivals = [
            ("/b", 5, None),
            ("/b", 30, Some(ti(9))),
            ("/a", 10, Some(ti(9))),
            ("/a", 10, Some(ti(14))),
            ("/c", 15, None),
            ("/a", 20, None),
        ];
        for (path, time_min, horizon_after) in arrivals {
            assert!(
                manifest.record_arrival(&ep(path), ti(time_min)),
                "arrival of {path}@{time_min} should be accepted"
            );
            manifest.check_invariants();
            if let Some(horizon) = horizon_after {
                assert_eq!(manifest.safe_horizon(), Some(horizon));
            }
        }

        assert!(manifest.is_complete());
        assert_eq!(manifest.safe_horizon(), Some(TimeInt::MAX));
    }
}