use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use ahash::{HashMap, HashSet};
use arrow::array::{Int32Array, RecordBatch};
use arrow::compute::take_record_batch;
use itertools::Itertools as _;
use nohash_hasher::{IntMap, IntSet};
use re_chunk::{ChunkId, TimeInt, Timeline, TimelineName};
use re_chunk_store::ChunkStoreEvent;
use re_log_encoding::{CodecResult, RrdManifest, RrdManifestTemporalMapEntry};
use re_log_types::{AbsoluteTimeRange, StoreKind};
use crate::sorted_range_map::SortedRangeMap;
/// Errors that can occur while selecting manifest chunks to prefetch.
#[derive(thiserror::Error, Debug)]
pub enum PrefetchError {
    /// No RRD manifest has been ingested yet.
    #[error("No manifest available")]
    NoManifest,

    /// The requested timeline does not appear in the manifest's temporal data.
    #[error("Unknown timeline: {0:?}")]
    UnknownTimeline(Timeline),

    /// Decoding data out of the manifest failed.
    #[error("Codec: {0}")]
    Codec(#[from] re_log_encoding::CodecError),

    /// An Arrow operation (e.g. taking rows out of the manifest batch) failed.
    #[error("Arrow: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// A manifest row index did not fit in the `i32` required by Arrow `take` indices.
    #[error("Row index too large: {0}")]
    BadIndex(usize),
}
/// Load state of a remote chunk, as tracked by the manifest index.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum LoadState {
    /// The chunk has not been requested yet (or was deleted locally, making it
    /// eligible for re-fetching).
    #[default]
    Unloaded,

    /// The chunk has been scheduled for fetching but has not arrived yet.
    InTransit,

    /// The chunk is present in the local chunk store.
    Loaded,
}
/// Parameters for `RrdManifestIndex::prefetch_chunks`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChunkPrefetchOptions {
    /// The timeline whose data we want covered.
    pub timeline: Timeline,

    /// The time range we want covered by loaded chunks.
    pub desired_range: AbsoluteTimeRange,

    /// Byte budget (uncompressed sizes) charged for *every* chunk overlapping the
    /// desired range, regardless of its current load state.
    pub total_byte_budget: u64,

    /// Byte budget (uncompressed sizes) charged only for chunks *newly scheduled*
    /// by this prefetch call.
    pub delta_byte_budget: u64,
}
/// Everything the index tracks about a single chunk listed in the manifest.
#[derive(Clone, Debug, Default)]
pub struct ChunkInfo {
    /// Where the chunk currently is in its loading lifecycle.
    pub state: LoadState,

    /// Temporal metadata, if the chunk has temporal data; `None` for static-only chunks.
    // NOTE(review): `append` overwrites this for every (timeline, component) entry a chunk
    // appears under, so only the last-seen timeline's metadata is retained per chunk.
    pub temporal: Option<TemporalChunkInfo>,
}
/// Temporal metadata for a chunk, as reported by the RRD manifest.
#[derive(Clone, Copy, Debug)]
pub struct TemporalChunkInfo {
    /// The timeline this metadata refers to.
    pub timeline: Timeline,

    /// The chunk's time extent on that timeline.
    pub time_range: AbsoluteTimeRange,

    /// Number of rows in the chunk.
    pub num_rows: u64,
}
/// Index over an RRD manifest: tracks which remote chunks exist, their load state,
/// and various lookups derived from the manifest (timelines, entities, time intervals).
#[derive(Default, Debug, Clone)]
pub struct RrdManifestIndex {
    /// The raw manifest, once one has been ingested via `append`.
    manifest: Option<RrdManifest>,

    /// Load-state bookkeeping for every chunk listed in the manifest.
    remote_chunks: HashMap<ChunkId, ChunkInfo>,

    /// Maps a locally-created (split) chunk to the chunk(s) it was split from, so that
    /// deletions of split products can be traced back to manifest chunks.
    parents: HashMap<ChunkId, HashSet<ChunkId>>,

    /// Set as soon as any chunk deletion has been observed.
    // NOTE(review): written in `mark_deleted` but never read in this file — presumably
    // consumed elsewhere; confirm before removing.
    has_deleted: bool,

    /// Full time extent of each timeline, derived from the temporal map.
    timelines: BTreeMap<TimelineName, AbsoluteTimeRange>,

    /// Tree of all entities mentioned by the manifest (static or temporal).
    pub entity_tree: crate::EntityTree,

    /// Which timelines each entity has temporal data on.
    entity_has_temporal_data_on_timeline: IntMap<re_chunk::EntityPath, IntSet<TimelineName>>,

    /// Entities that have static data.
    entity_has_static_data: IntSet<re_chunk::EntityPath>,

    /// Static-data portion of the manifest, keyed by entity.
    native_static_map: re_log_encoding::RrdManifestStaticMap,

    /// Temporal-data portion of the manifest: entity → timeline → component → chunk metadata.
    native_temporal_map: re_log_encoding::RrdManifestTemporalMap,

    /// Per-timeline interval index: which chunks overlap a given time range.
    chunk_intervals: BTreeMap<Timeline, SortedRangeMap<TimeInt, ChunkId>>,

    /// Chunk id → row index into the manifest's backing record batch.
    manifest_row_from_chunk_id: BTreeMap<ChunkId, usize>,
}
impl RrdManifestIndex {
    /// Ingests an RRD manifest and rebuilds every derived lookup from it.
    ///
    /// Populates the static/temporal maps, timeline extents, entity tree, per-chunk
    /// bookkeeping, per-timeline interval index, and the chunk-id → manifest-row lookup.
    ///
    /// # Errors
    /// Returns a codec error if decoding columns out of the manifest fails.
    ///
    // NOTE(review): if a manifest was already ingested, the new one is still merged into
    // the derived maps *before* the "not yet supported" warning fires, and it replaces
    // `self.manifest` wholesale — confirm that this best-effort overwrite is intended.
    pub fn append(&mut self, manifest: RrdManifest) -> CodecResult<()> {
        re_tracing::profile_function!();

        self.native_static_map = manifest.get_static_data_as_a_map()?;
        self.native_temporal_map = manifest.get_temporal_data_as_a_map()?;

        self.update_timeline_stats();
        self.update_entity_tree();
        self.update_entity_temporal_data();
        self.update_entity_static_data();
        self.update_chunk_intervals();

        // Ensure every chunk listed in the manifest has an entry, defaulting to `Unloaded`.
        for chunk_id in manifest.col_chunk_id()? {
            self.remote_chunks.entry(chunk_id).or_default();
        }

        // Attach temporal metadata (timeline, time range, row count) to each chunk.
        // A chunk appearing under several (timeline, component) entries has its
        // `temporal` field overwritten each time; only the last entry survives.
        for timelines in self.native_temporal_map.values() {
            for (&timeline, comps) in timelines {
                for chunks in comps.values() {
                    for (&chunk_id, entry) in chunks {
                        let chunk_info = self.remote_chunks.entry(chunk_id).or_default();
                        chunk_info.temporal = Some(TemporalChunkInfo {
                            timeline,
                            time_range: entry.time_range,
                            num_rows: entry.num_rows,
                        });
                    }
                }
            }
        }

        if self.manifest.is_some() {
            re_log::warn!(
                "Received a second RRD manifest schema for the same recording. This is not yet supported."
            );
        }

        // Rebuild the chunk-id → manifest-row lookup used by `prefetch_chunks`.
        self.manifest_row_from_chunk_id.clear();
        let chunk_id = manifest.col_chunk_id()?;
        for (row_idx, chunk_id) in chunk_id.enumerate() {
            self.manifest_row_from_chunk_id.insert(chunk_id, row_idx);
        }

        self.manifest = Some(manifest);

        Ok(())
    }

    /// Returns the bookkeeping info for a chunk listed in the manifest, if any.
    pub fn remote_chunk_info(&self, chunk_id: &ChunkId) -> Option<&ChunkInfo> {
        self.remote_chunks.get(chunk_id)
    }

    /// Recomputes the full time extent of every timeline by unioning the time ranges
    /// of all chunks in the temporal map.
    fn update_timeline_stats(&mut self) {
        for timelines in self.native_temporal_map.values() {
            for (timeline, comps) in timelines {
                // Start from any previously recorded range so repeated calls only grow it.
                let mut timeline_range = self
                    .timelines
                    .get(timeline.name())
                    .copied()
                    .unwrap_or(AbsoluteTimeRange::EMPTY);
                for chunks in comps.values() {
                    for entry in chunks.values() {
                        timeline_range = timeline_range.union(entry.time_range);
                    }
                }
                // Don't record timelines for which no data was found at all.
                if timeline_range != AbsoluteTimeRange::EMPTY {
                    self.timelines.insert(*timeline.name(), timeline_range);
                }
            }
        }
    }

    /// Registers every entity mentioned in either the static or temporal maps.
    fn update_entity_tree(&mut self) {
        for entity in self
            .native_static_map
            .keys()
            .chain(self.native_temporal_map.keys())
        {
            self.entity_tree.on_new_entity(entity);
        }
    }

    /// Records which timelines each entity has temporal data on.
    fn update_entity_temporal_data(&mut self) {
        for (entity, timelines) in &self.native_temporal_map {
            self.entity_has_temporal_data_on_timeline
                .entry(entity.clone())
                .or_default()
                .extend(timelines.keys().map(|t| *t.name()));
        }
    }

    /// Records which entities have static data.
    fn update_entity_static_data(&mut self) {
        for entity in self.native_static_map.keys() {
            self.entity_has_static_data.insert(entity.clone());
        }
    }

    /// Rebuilds the per-timeline interval index used to find chunks overlapping a
    /// given time range.
    fn update_chunk_intervals(&mut self) {
        // First gather (time-range, chunk-id) pairs per timeline…
        let mut per_timeline_chunks: BTreeMap<Timeline, Vec<(RangeInclusive<TimeInt>, ChunkId)>> =
            BTreeMap::default();
        for timelines in self.native_temporal_map.values() {
            for (timeline, components) in timelines {
                let timeline_chunks = per_timeline_chunks.entry(*timeline).or_default();
                for chunks in components.values() {
                    for (chunk_id, entry) in chunks {
                        timeline_chunks.push((entry.time_range.into(), *chunk_id));
                    }
                }
            }
        }
        // …then rebuild the sorted range maps from scratch.
        self.chunk_intervals.clear();
        for (timeline, chunks) in per_timeline_chunks {
            self.chunk_intervals
                .insert(timeline, SortedRangeMap::new(chunks));
        }
    }

    /// Does this entity have temporal data on the given timeline, according to the manifest?
    pub fn entity_has_temporal_data_on_timeline(
        &self,
        entity: &re_chunk::EntityPath,
        timeline: &TimelineName,
    ) -> bool {
        self.entity_has_temporal_data_on_timeline
            .get(entity)
            .is_some_and(|timelines| timelines.contains(timeline))
    }

    /// Does this entity have static data, according to the manifest?
    pub fn entity_has_static_data(&self, entity: &re_chunk::EntityPath) -> bool {
        self.entity_has_static_data.contains(entity)
    }

    /// Does this entity have any data relevant to the given timeline?
    ///
    /// Static data counts as relevant to every timeline.
    pub fn entity_has_data_on_timeline(
        &self,
        entity: &re_chunk::EntityPath,
        timeline: &TimelineName,
    ) -> bool {
        self.entity_has_static_data(entity)
            || self.entity_has_temporal_data_on_timeline(entity, timeline)
    }

    /// Whether a manifest has been ingested yet.
    pub fn has_manifest(&self) -> bool {
        self.manifest.is_some()
    }

    /// The raw manifest, if one has been ingested.
    pub fn manifest(&self) -> Option<&RrdManifest> {
        self.manifest.as_ref()
    }

    /// The temporal-data portion of the manifest:
    /// entity → timeline → component → chunk metadata.
    pub fn native_temporal_map(&self) -> &re_log_encoding::RrdManifestTemporalMap {
        &self.native_temporal_map
    }

    /// Marks a chunk as fully loaded into the local store.
    ///
    /// Also inserts a (default) entry if the chunk was previously unknown.
    pub fn mark_as_loaded(&mut self, chunk_id: ChunkId) {
        let chunk_info = self.remote_chunks.entry(chunk_id).or_default();
        chunk_info.state = LoadState::Loaded;
    }

    /// Reacts to chunk-store additions/deletions to keep per-chunk load states in sync.
    pub fn on_events(&mut self, store_events: &[ChunkStoreEvent]) {
        re_tracing::profile_function!();

        // Nothing to sync against until we have a manifest.
        if self.manifest.is_none() {
            return;
        }

        for event in store_events {
            let store_kind = event.store_id.kind();
            let chunk_id = event.chunk.id();
            match event.kind {
                re_chunk_store::ChunkStoreDiffKind::Addition => {
                    if let Some(chunk_info) = self.remote_chunks.get_mut(&chunk_id) {
                        chunk_info.state = LoadState::Loaded;
                    } else if let Some(source) = event.split_source {
                        // The added chunk is a split product: remember its parent so that
                        // a later deletion can be traced back to a manifest chunk.
                        self.parents.entry(chunk_id).or_default().insert(source);
                    } else {
                        warn_when_editing_recording(
                            store_kind,
                            "Added chunk that was not part of the chunk index",
                        );
                    }
                }
                re_chunk_store::ChunkStoreDiffKind::Deletion => {
                    self.mark_deleted(store_kind, &chunk_id);
                }
            }
        }
    }

    /// Marks a deleted chunk — or, for locally split chunks, its manifest ancestors —
    /// as `Unloaded` so it becomes eligible for re-fetching.
    fn mark_deleted(&mut self, store_kind: StoreKind, chunk_id: &ChunkId) {
        self.has_deleted = true;

        if let Some(chunk_info) = self.remote_chunks.get_mut(chunk_id) {
            chunk_info.state = LoadState::Unloaded;
        } else if let Some(parents) = self.parents.remove(chunk_id) {
            // The deleted chunk was itself the product of one or more splits: walk up
            // the parent graph until we reach chunks known to the manifest.
            let mut ancestors = parents.into_iter().collect_vec();
            while let Some(chunk_id) = ancestors.pop() {
                if let Some(chunk_info) = self.remote_chunks.get_mut(&chunk_id) {
                    chunk_info.state = LoadState::Unloaded;
                } else if let Some(grandparents) = self.parents.get(&chunk_id) {
                    ancestors.extend(grandparents);
                } else {
                    warn_when_editing_recording(
                        store_kind,
                        "Removed ancestor chunk that was not part of the index",
                    );
                }
            }
        } else {
            warn_when_editing_recording(store_kind, "Removed chunk that was not part of the index");
        }
    }

    /// The full time extent of the given timeline, if known.
    pub fn timeline_range(&self, timeline: &TimelineName) -> Option<AbsoluteTimeRange> {
        self.timelines.get(timeline).copied()
    }

    /// Selects which chunks should be fetched next to cover `options.desired_range`,
    /// marks them `InTransit`, and returns their manifest rows as a [`RecordBatch`].
    ///
    /// Budgeting (all sizes are uncompressed bytes as reported by the manifest):
    /// * `total_byte_budget` is charged for every overlapping chunk, whatever its state;
    /// * `delta_byte_budget` is charged only for chunks newly scheduled by this call.
    ///
    /// # Errors
    /// Fails if no manifest is available, the timeline is unknown, decoding the size
    /// column fails, a row index overflows `i32`, or the Arrow `take` fails.
    pub fn prefetch_chunks(
        &mut self,
        options: &ChunkPrefetchOptions,
    ) -> Result<RecordBatch, PrefetchError> {
        re_tracing::profile_function!();

        let ChunkPrefetchOptions {
            timeline,
            desired_range,
            mut total_byte_budget,
            mut delta_byte_budget,
        } = *options;

        let Some(manifest) = self.manifest.as_ref() else {
            return Err(PrefetchError::NoManifest);
        };
        let Some(chunks) = self.chunk_intervals.get(&timeline) else {
            return Err(PrefetchError::UnknownTimeline(timeline));
        };

        let chunk_byte_size_uncompressed_raw: &[u64] =
            manifest.col_chunk_byte_size_uncompressed_raw()?.values();

        // Manifest row indices of the chunks we decide to fetch.
        let mut indices: Vec<i32> = vec![];

        for (_, chunk_id) in chunks.query(desired_range.into()) {
            let Some(remote_chunk) = self.remote_chunks.get_mut(chunk_id) else {
                re_log::warn_once!("Chunk {chunk_id:?} not found in RRD manifest index");
                continue;
            };

            // Every chunk in `chunk_intervals` came from the manifest, so the lookup
            // should always succeed (both maps are rebuilt together in `append`).
            let row_idx = self.manifest_row_from_chunk_id[chunk_id];
            let chunk_size = chunk_byte_size_uncompressed_raw[row_idx];

            // NOTE(review): a chunk larger than the remaining total budget saturates it
            // to zero and stops the scan *without* scheduling that chunk — confirm that
            // excluding (rather than including) the straddling chunk is intended.
            total_byte_budget = total_byte_budget.saturating_sub(chunk_size);
            if total_byte_budget == 0 {
                break;
            }

            if remote_chunk.state == LoadState::Unloaded {
                remote_chunk.state = LoadState::InTransit;

                // Arrow `take` indices are `i32`; a manifest with more rows is unsupported.
                if let Ok(row_idx) = i32::try_from(row_idx) {
                    indices.push(row_idx);
                } else {
                    return Err(PrefetchError::BadIndex(row_idx));
                }

                // Unlike the total budget, the chunk that exhausts the delta budget
                // *is* scheduled before we stop.
                delta_byte_budget = delta_byte_budget.saturating_sub(chunk_size);
                if delta_byte_budget == 0 {
                    break;
                }
            }
        }

        // Extract the selected rows from the manifest's backing record batch.
        Ok(take_record_batch(
            &manifest.data,
            &Int32Array::from(indices),
        )?)
    }

    /// The load state and time range of every chunk that has data on the given timeline.
    #[must_use]
    pub fn time_ranges_all_chunks(
        &self,
        timeline: &Timeline,
    ) -> Vec<(LoadState, AbsoluteTimeRange)> {
        re_tracing::profile_function!();

        let mut time_ranges_all_chunks = Vec::new();
        for timelines in self.native_temporal_map.values() {
            let Some(entity_component_chunks) = timelines.get(timeline) else {
                continue;
            };
            for chunks in entity_component_chunks.values() {
                for (chunk_id, entry) in chunks {
                    let RrdManifestTemporalMapEntry { time_range, .. } = entry;
                    // Skip chunks we have no bookkeeping for.
                    let Some(info) = self.remote_chunks.get(chunk_id) else {
                        continue;
                    };
                    debug_assert!(
                        time_range.min <= time_range.max,
                        "Unexpected negative time range in RRD manifest"
                    );
                    time_ranges_all_chunks.push((info.state, *time_range));
                }
            }
        }
        time_ranges_all_chunks
    }

    /// The total uncompressed size (in bytes) of all chunks in the manifest, if known.
    ///
    /// Returns `None` if there is no manifest or the size column cannot be decoded.
    pub fn full_uncompressed_size(&self) -> Option<u64> {
        re_tracing::profile_function!();
        Some(
            self.manifest()?
                .col_chunk_byte_size_uncompressed_raw()
                .ok()?
                .values()
                .iter()
                .sum(),
        )
    }
}
/// Surfaces a chunk-store/manifest-index mismatch.
///
/// Blueprint stores are edited by the viewer as a matter of course, so mismatches
/// there are expected and silently ignored. For recordings, the message is logged
/// as a (once-only) warning in debug builds and demoted to a debug log otherwise.
#[track_caller]
fn warn_when_editing_recording(store_kind: StoreKind, warning: &str) {
    let concerns_recording = matches!(store_kind, StoreKind::Recording);
    if !concerns_recording {
        return;
    }

    if cfg!(debug_assertions) {
        re_log::warn_once!("[DEBUG] {warning}");
    } else {
        re_log::debug_once!("{warning}");
    }
}