use std::collections::{BTreeMap, BTreeSet};
use std::ops::RangeInclusive;
use ahash::{HashMap, HashSet};
use arrow::array::RecordBatch;
use itertools::chain;
use re_byte_size::SizeBytes as _;
use re_chunk::{Chunk, ChunkId, ComponentIdentifier, TimeInt, Timeline, TimelineName};
use re_chunk_store::{ChunkStore, QueriedChunkIdTracker};
use re_log::debug_assert;
use re_log_encoding::RrdManifest;
use re_log_types::{AbsoluteTimeRange, EntityPathHash, TimelinePoint};
use crate::{
chunk_requests::{ChunkRequests, RequestInfo},
rrd_manifest_index::{LoadState, RootChunkInfo},
sorted_range_map::SortedRangeMap,
};
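/// Outcome of the latest prioritization pass, recording which budgets were hit
/// and whether all required chunks are already loaded.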
#[derive(Clone, Copy, Default)]
pub struct PrioritizationState {
pub transit_budget_filled: bool,
pub memory_budget_filled: bool,
pub some_chunks_too_big: bool,
pub all_required_are_loaded: bool,
}
impl PrioritizationState {
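    /// `true` if prefetching did not have to stop early: no budget was exhausted
    /// and no chunk was too big to ever fit.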
pub fn all_chunks_loaded_or_in_transit(&self) -> bool {
!self.transit_budget_filled && !self.memory_budget_filled && !self.some_chunks_too_big
}
}
#[derive(thiserror::Error, Debug)]
pub enum PrefetchError {
#[error("No manifest available")]
NoManifest,
#[error("Unknown timeline: {0:?}")]
UnknownTimeline(Timeline),
#[error("Codec: {0}")]
Codec(#[from] re_log_encoding::CodecError),
#[error("Arrow: {0}")]
Arrow(#[from] arrow::error::ArrowError),
}
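/// Budgets controlling how much chunk data is requested and kept in memory.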
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkPrefetchOptions {
    /// Maximum on-wire size of a single request batch.
    pub max_on_wire_bytes_per_batch: u64,

    /// Total uncompressed memory budget for loaded chunk data.
    pub total_uncompressed_byte_budget: u64,

    /// Maximum number of bytes allowed to be on the wire (in transit) at any one time.
    pub max_bytes_on_wire_at_once: u64,
}
impl Default for ChunkPrefetchOptions {
fn default() -> Self {
Self {
total_uncompressed_byte_budget: u64::MAX,
max_on_wire_bytes_per_batch: 256 * 1024,
max_bytes_on_wire_at_once: 4_000_000,
}
}
}
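/// Temporal chunks that should be fetched with high priority (currently transform
/// chunks), grouped per timeline and kept sorted by the start of their time range.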
#[derive(Clone, Default)]
struct HighPrioChunks {
temporal_chunks: BTreeMap<TimelineName, Vec<HighPrioChunk>>,
}
impl HighPrioChunks {
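    /// All high-priority chunks on the given timeline that start at or before the
    /// given time, yielded closest-to-the-cursor first.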
fn all_before(&self, timeline_point: TimelinePoint) -> impl Iterator<Item = ChunkId> + '_ {
self.temporal_chunks
.get(timeline_point.name())
.into_iter()
.flat_map(move |chunks| {
let idx =
chunks.partition_point(|chunk| chunk.time_range.min <= timeline_point.time);
chunks[..idx].iter().rev()
})
.map(|chunk| chunk.chunk_id)
}
}
impl re_byte_size::SizeBytes for HighPrioChunks {
fn heap_size_bytes(&self) -> u64 {
let Self { temporal_chunks } = self;
temporal_chunks.heap_size_bytes()
}
}
#[derive(Clone)]
struct HighPrioChunk {
chunk_id: ChunkId,
time_range: AbsoluteTimeRange,
}
impl re_byte_size::SizeBytes for HighPrioChunk {
fn heap_size_bytes(&self) -> u64 {
let Self {
chunk_id: _,
time_range: _,
} = self;
0
}
fn is_pod() -> bool {
true
}
}
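/// Accumulates manifest row indices (and their byte sizes) for the request batch
/// currently being assembled.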
#[derive(Default)]
struct CurrentBatch {
row_indices: Vec<usize>,
uncompressed_bytes: u64,
on_wire_bytes: u64,
}
impl CurrentBatch {
fn reset(&mut self) {
let Self {
row_indices,
uncompressed_bytes,
on_wire_bytes,
} = self;
row_indices.clear();
*uncompressed_bytes = 0;
*on_wire_bytes = 0;
}
}
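/// Groups manifest rows into request batches, respecting both the per-batch
/// on-wire size limit and the overall in-transit byte budget.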
struct ChunkRequestBatcher<'a> {
manifest: &'a RrdManifest,
chunk_byte_size_uncompressed: &'a [u64],
chunk_byte_size: &'a [u64],
max_on_wire_bytes_per_batch: u64,
remaining_bytes_in_on_wire_budget: u64,
current_batch: CurrentBatch,
to_load: Vec<(RecordBatch, RequestInfo)>,
}
impl<'a> ChunkRequestBatcher<'a> {
fn new(
manifest: &'a RrdManifest,
requests: &ChunkRequests,
options: &ChunkPrefetchOptions,
) -> Self {
Self {
chunk_byte_size_uncompressed: manifest.col_chunk_byte_size_uncompressed(),
chunk_byte_size: manifest.col_chunk_byte_size(),
manifest,
max_on_wire_bytes_per_batch: options.max_on_wire_bytes_per_batch,
remaining_bytes_in_on_wire_budget: options
.max_bytes_on_wire_at_once
.saturating_sub(requests.num_on_wire_bytes_pending()),
current_batch: Default::default(),
to_load: Vec::new(),
}
}
fn finish_batch(&mut self) -> Result<(), PrefetchError> {
if self.current_batch.row_indices.is_empty() {
return Ok(());
}
let row_indices: BTreeSet<usize> = self.current_batch.row_indices.iter().copied().collect();
let col_chunk_ids: &[ChunkId] = self.manifest.col_chunk_ids();
let mut root_chunk_ids = ahash::HashSet::default();
for &row_idx in &row_indices {
root_chunk_ids.insert(col_chunk_ids[row_idx]);
}
let rb = re_arrow_util::take_record_batch(
self.manifest.data(),
&std::mem::take(&mut self.current_batch.row_indices),
)?;
self.to_load.push((
rb,
RequestInfo {
root_chunk_ids,
row_indices,
size_bytes_uncompressed: self.current_batch.uncompressed_bytes,
size_bytes_on_wire: self.current_batch.on_wire_bytes,
},
));
self.current_batch.reset();
Ok(())
}
fn try_fetch(&mut self, chunk_row_idx: usize) -> Result<bool, PrefetchError> {
if self.remaining_bytes_in_on_wire_budget == 0 {
return Ok(false);
}
let uncompressed_chunk_size = self.chunk_byte_size_uncompressed[chunk_row_idx];
let on_wire_byte_size = self.chunk_byte_size[chunk_row_idx];
self.current_batch.row_indices.push(chunk_row_idx);
self.current_batch.uncompressed_bytes += uncompressed_chunk_size;
self.current_batch.on_wire_bytes += on_wire_byte_size;
if self.max_on_wire_bytes_per_batch <= self.current_batch.on_wire_bytes {
self.finish_batch()?;
}
self.remaining_bytes_in_on_wire_budget = self
.remaining_bytes_in_on_wire_budget
.saturating_sub(on_wire_byte_size);
Ok(true)
}
#[must_use = "Load the returned batches"]
pub fn finish(mut self) -> Result<Vec<(RecordBatch, RequestInfo)>, PrefetchError> {
self.finish_batch()?;
Ok(self.to_load)
}
}
fn warn_entity_exceeds_memory(entity_path: &str) {
if cfg!(target_arch = "wasm32") {
re_log::debug_once!(
"Cannot load all of entity '{entity_path}', because its size exceeds the memory budget. Try the native viewer instead, or split up your large assets (e.g. prefer VideoStream over VideoAsset)."
);
} else {
re_log::warn_once!(
"Cannot load all of entity '{entity_path}', because its size exceeds the memory budget. You should increase the `--memory-limit` or try to split up your large assets (e.g. prefer VideoStream over VideoAsset)."
);
}
}
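/// Tracks how much of the uncompressed memory budget remains.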
struct RemainingByteBudget {
remaining_bytes: u64,
}
impl RemainingByteBudget {
fn try_fit_into_budget(&mut self, bytes: u64, required: bool) -> bool {
self.remaining_bytes = self.remaining_bytes.saturating_sub(bytes);
if self.remaining_bytes == 0 {
if required {
if cfg!(target_arch = "wasm32") {
re_log::warn_once!(
"This recording is very memory intense, and the Wasm32 build only has 4GiB of memory. Consider using the native viewer to use all of your RAM."
);
} else {
re_log::warn_once!(
"The current recording may use more data than your current memory budget."
);
}
                true
            } else {
false
}
} else {
true
}
}
}
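/// A root chunk paired with whether it is required (must be loaded) or merely
/// nice to have.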
struct PrioritizedRootChunk {
required: bool,
root_chunk_id: ChunkId,
}
impl PrioritizedRootChunk {
fn required(root_chunk_id: ChunkId) -> Self {
Self {
required: true,
root_chunk_id,
}
}
fn optional(chunk_id: ChunkId) -> Self {
Self {
required: false,
root_chunk_id: chunk_id,
}
}
}
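/// Identifies a specific component on a specific entity path.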
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct ComponentPathKey {
entity_path: EntityPathHash,
component: ComponentIdentifier,
}
#[cfg(test)]
impl ComponentPathKey {
pub fn dummy() -> Self {
Self {
entity_path: EntityPathHash::NONE,
component: ComponentIdentifier::new("test"),
}
}
}
impl re_byte_size::SizeBytes for ComponentPathKey {
fn heap_size_bytes(&self) -> u64 {
let Self {
entity_path: _,
component: _,
} = self;
0
}
fn is_pod() -> bool {
true
}
}
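/// Chunks that are currently needed and should therefore not be evicted, nor have
/// their in-flight requests cancelled: root chunks and their physical descendants.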
#[derive(Clone, Default)]
pub struct ProtectedChunks {
pub roots: HashSet<ChunkId>,
pub physical: HashSet<ChunkId>,
}
impl re_byte_size::SizeBytes for ProtectedChunks {
fn heap_size_bytes(&self) -> u64 {
let Self { roots, physical } = self;
roots.heap_size_bytes() + physical.heap_size_bytes()
}
}
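/// Decides which chunks of a partially-loaded recording to prefetch next.
///
/// It combines the current time cursor, the components the viewer is actively
/// querying, and the configured byte budgets to produce prioritized fetch requests.
///
/// Rough usage sketch (the names and the surrounding setup are illustrative;
/// the real call sites own the store, manifest, and query tracker):
///
/// ```ignore
/// let batches = prioritizer.prioritize_and_prefetch(
///     &store,
///     &used_and_missing,
///     &ChunkPrefetchOptions::default(),
///     time_cursor,
///     &manifest,
///     &root_chunks,
/// )?;
/// for (record_batch, request_info) in batches {
///     // Send `record_batch` to the data source and track the in-flight request.
/// }
/// ```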
#[derive(Default)]
#[cfg_attr(feature = "testing", derive(Clone))]
pub struct ChunkPrioritizer {
protected_chunks: ProtectedChunks,
latest_result: Option<PrioritizationState>,
chunk_requests: ChunkRequests,
root_chunk_intervals: BTreeMap<Timeline, SortedRangeMap<TimeInt, ChunkId>>,
static_chunk_ids: HashSet<ChunkId>,
high_priority_chunks: HighPrioChunks,
pub component_paths_from_root_id: HashMap<ChunkId, Vec<ComponentPathKey>>,
pub components_of_interest: HashSet<ComponentPathKey>,
}
impl re_byte_size::SizeBytes for ChunkPrioritizer {
fn heap_size_bytes(&self) -> u64 {
let Self {
protected_chunks,
latest_result: _,
            chunk_requests: _,
            root_chunk_intervals,
static_chunk_ids,
high_priority_chunks,
component_paths_from_root_id,
components_of_interest,
} = self;
protected_chunks.heap_size_bytes()
            + root_chunk_intervals.heap_size_bytes()
+ static_chunk_ids.heap_size_bytes()
+ high_priority_chunks.heap_size_bytes()
+ component_paths_from_root_id.heap_size_bytes()
+ components_of_interest.heap_size_bytes()
}
}
impl ChunkPrioritizer {
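    /// Ingests a manifest delta: updates static chunk ids, per-timeline chunk
    /// intervals, high-priority chunks, and the root-id → component-path mapping.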
pub fn on_rrd_manifest(&mut self, delta: &RrdManifest) {
self.update_static_chunks(delta);
self.update_chunk_intervals(delta);
self.update_high_priority_chunks(delta);
for (entity, per_component) in delta.static_map() {
for (component, chunk) in per_component {
self.component_paths_from_root_id
.entry(*chunk)
.or_default()
.push(ComponentPathKey {
entity_path: entity.hash(),
component: *component,
});
}
}
for (entity, per_timeline) in delta.temporal_map() {
for per_component in per_timeline.values() {
for (component, chunks) in per_component {
for chunk in chunks.keys() {
self.component_paths_from_root_id
.entry(*chunk)
.or_default()
.push(ComponentPathKey {
entity_path: entity.hash(),
component: *component,
});
}
}
}
}
}
pub fn latest_result(&self) -> Option<PrioritizationState> {
self.latest_result
}
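    /// Collects all temporal chunks whose component identifier starts with `prefix`,
    /// grouped per timeline and sorted by the start of their time range.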
fn find_chunks_with_component_prefix(manifest: &RrdManifest, prefix: &str) -> HighPrioChunks {
let mut temporal_chunks: BTreeMap<TimelineName, Vec<HighPrioChunk>> = Default::default();
for timelines in manifest.temporal_map().values() {
for (timeline, components) in timelines {
for (component, chunks) in components {
if component.as_str().starts_with(prefix) {
for (chunk_id, entry) in chunks {
temporal_chunks.entry(*timeline.name()).or_default().push(
HighPrioChunk {
chunk_id: *chunk_id,
time_range: entry.time_range,
},
);
}
}
}
}
}
for chunks in temporal_chunks.values_mut() {
chunks.sort_by_key(|chunk| chunk.time_range.min);
}
HighPrioChunks { temporal_chunks }
}
fn update_high_priority_chunks(&mut self, manifest: &RrdManifest) {
let new_chunks = Self::find_chunks_with_component_prefix(
manifest,
"Transform3D:", );
for (timeline, mut chunks) in new_chunks.temporal_chunks {
let existing = self
.high_priority_chunks
.temporal_chunks
.entry(timeline)
.or_default();
existing.append(&mut chunks);
existing.sort_by_key(|chunk| chunk.time_range.min);
}
}
fn update_static_chunks(&mut self, manifest: &RrdManifest) {
for entity_chunks in manifest.static_map().values() {
for &chunk_id in entity_chunks.values() {
self.static_chunk_ids.insert(chunk_id);
}
}
}
fn update_chunk_intervals(&mut self, manifest: &RrdManifest) {
let mut per_timeline_chunks: BTreeMap<Timeline, Vec<(RangeInclusive<TimeInt>, ChunkId)>> =
BTreeMap::default();
for timelines in manifest.temporal_map().values() {
for (timeline, components) in timelines {
let timeline_chunks = per_timeline_chunks.entry(*timeline).or_default();
for chunks in components.values() {
for (chunk_id, entry) in chunks {
timeline_chunks.push((entry.time_range.into(), *chunk_id));
}
}
}
}
for (timeline, chunks) in per_timeline_chunks {
self.root_chunk_intervals
.entry(timeline)
.or_default()
.extend(chunks);
}
}
pub fn chunk_requests(&self) -> &ChunkRequests {
&self.chunk_requests
}
pub fn chunk_requests_mut(&mut self) -> &mut ChunkRequests {
&mut self.chunk_requests
}
pub fn protected_chunks(&self) -> &ProtectedChunks {
&self.protected_chunks
}
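    /// Yields root chunk ids in decreasing priority: required chunks first (roots of
    /// missing chunks, static chunks, high-priority chunks up to the time cursor),
    /// then optional ones (interesting chunks around the cursor, then everything else).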
    #[expect(clippy::too_many_arguments)]
    fn root_chunks_in_priority<'a>(
components_of_interest: &'a HashSet<ComponentPathKey>,
component_paths_from_root_id: &'a HashMap<ChunkId, Vec<ComponentPathKey>>,
static_chunk_ids: &'a HashSet<ChunkId>,
high_priority_chunks: &'a HighPrioChunks,
store: &'a ChunkStore,
used_and_missing: &QueriedChunkIdTracker,
time_cursor: Option<TimelinePoint>,
root_chunks: &'a HashMap<ChunkId, RootChunkInfo>,
root_chunks_on_timeline: Option<&'a SortedRangeMap<TimeInt, ChunkId>>,
) -> impl Iterator<Item = PrioritizedRootChunk> + use<'a> {
re_tracing::profile_function!();
let mut missing_roots = Vec::new();
for missing_virtual_chunk_id in &used_and_missing.missing_virtual {
store.collect_root_ids(missing_virtual_chunk_id, &mut missing_roots);
}
missing_roots.sort();
missing_roots.dedup();
let chunks_ids_after_time_cursor = move || {
time_cursor
.zip(root_chunks_on_timeline)
.map(|(time_cursor, root_chunks_on_timeline)| {
root_chunks_on_timeline
.query(time_cursor.time..=TimeInt::MAX)
.map(|(_, chunk_id)| *chunk_id)
})
.into_iter()
.flatten()
};
let chunks_ids_before_time_cursor = move || {
time_cursor
.zip(root_chunks_on_timeline)
.map(|(time_cursor, root_chunks_on_timeline)| {
root_chunks_on_timeline
.query(TimeInt::MIN..=time_cursor.time.saturating_sub(1))
.map(|(_, chunk_id)| *chunk_id)
})
.into_iter()
.flatten()
};
let high_prio_chunks_before_time_cursor = time_cursor
.map(|time_cursor| high_priority_chunks.all_before(time_cursor))
.into_iter()
.flatten();
let required_chunks = chain!(
missing_roots,
static_chunk_ids.iter().copied(),
high_prio_chunks_before_time_cursor,
);
let optional_chunks = {
let is_interesting_chunk = |chunk_id: &ChunkId| {
component_paths_from_root_id[chunk_id]
.iter()
.any(|path| components_of_interest.contains(path))
};
let is_uninteresting_chunk = |chunk_id: &ChunkId| {
!component_paths_from_root_id[chunk_id]
.iter()
.any(|path| components_of_interest.contains(path))
};
let optional_interesting_chunks = chain!(
std::iter::once_with(chunks_ids_after_time_cursor).flatten(),
std::iter::once_with(chunks_ids_before_time_cursor).flatten(),
)
.filter(is_interesting_chunk);
let optional_uninteresting_chunks = std::iter::once_with(chunks_ids_after_time_cursor)
.flatten()
.filter(is_uninteresting_chunk);
let all_chunks = root_chunks.keys().copied();
chain!(
optional_interesting_chunks,
optional_uninteresting_chunks,
all_chunks,
)
};
chain!(
required_chunks.map(PrioritizedRootChunk::required),
optional_chunks.map(PrioritizedRootChunk::optional),
)
}
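    /// Prioritizes root chunks against the given budgets and returns the record-batch
    /// requests that should be sent in order to fetch the chunks that are still missing.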
#[must_use = "Load the returned batches"]
pub fn prioritize_and_prefetch(
&mut self,
store: &ChunkStore,
used_and_missing: &QueriedChunkIdTracker,
options: &ChunkPrefetchOptions,
time_cursor: Option<TimelinePoint>,
manifest: &RrdManifest,
root_chunks: &HashMap<ChunkId, RootChunkInfo>,
) -> Result<Vec<(RecordBatch, RequestInfo)>, PrefetchError> {
re_tracing::profile_function!();
let mut chunk_batcher = ChunkRequestBatcher::new(manifest, &self.chunk_requests, options);
if let Some(latest_result) = &mut self.latest_result
&& chunk_batcher.remaining_bytes_in_on_wire_budget == 0
{
if !used_and_missing.missing_virtual.is_empty() {
latest_result.all_required_are_loaded = false;
}
self.protect_used_and_missing(store, used_and_missing);
return Ok(vec![]);
}
self.update_components_of_interest(store, used_and_missing);
self.protected_chunks.roots.clear();
self.protected_chunks.physical.clear();
self.protect_used_and_missing(store, used_and_missing);
let mut remaining_byte_budget = RemainingByteBudget {
remaining_bytes: options.total_uncompressed_byte_budget,
};
for &physical_chunk_id in &used_and_missing.used_physical {
debug_assert!(
self.protected_chunks.physical.contains(&physical_chunk_id),
"We added it earlier"
);
if let Some(chunk) = store.physical_chunk(&physical_chunk_id) {
let required = true;
remaining_byte_budget
.try_fit_into_budget(Chunk::total_size_bytes(chunk.as_ref()), required);
} else {
re_log::debug_warn_once!("Couldn't get physical chunk from chunk store");
}
}
let root_chunks_on_timeline = time_cursor
.and_then(|time_cursor| self.root_chunk_intervals.get(&time_cursor.timeline()));
let root_chunk_ids_in_priority_order = Self::root_chunks_in_priority(
&self.components_of_interest,
&self.component_paths_from_root_id,
&self.static_chunk_ids,
&self.high_priority_chunks,
store,
used_and_missing,
time_cursor,
root_chunks,
root_chunks_on_timeline,
);
let state = Self::fill_byte_budget(
&mut self.protected_chunks,
store,
options,
manifest,
root_chunks,
&mut chunk_batcher,
&mut remaining_byte_budget,
root_chunk_ids_in_priority_order,
)?;
self.latest_result = Some(state);
chunk_batcher.finish()
}
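    /// Walks the root chunks in priority order, fitting them into the memory and
    /// in-transit budgets: loaded chunks are protected, unloaded ones are queued for
    /// fetching, until a budget runs out.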
#[expect(clippy::too_many_arguments)]
fn fill_byte_budget(
protected_chunks: &mut ProtectedChunks,
store: &ChunkStore,
options: &ChunkPrefetchOptions,
manifest: &RrdManifest,
root_chunks: &HashMap<ChunkId, RootChunkInfo>,
chunk_batcher: &mut ChunkRequestBatcher<'_>,
remaining_byte_budget: &mut RemainingByteBudget,
mut root_chunk_ids_in_priority_order: impl Iterator<Item = PrioritizedRootChunk>,
) -> Result<PrioritizationState, PrefetchError> {
re_tracing::profile_function!();
let entity_paths = manifest.col_chunk_entity_path_raw();
let mut visited_root_chunks: HashSet<ChunkId> = Default::default();
let mut physical_chunks_scratch = Vec::new();
let mut state = PrioritizationState {
transit_budget_filled: false,
memory_budget_filled: false,
some_chunks_too_big: false,
all_required_are_loaded: true,
};
for next in root_chunk_ids_in_priority_order.by_ref() {
let PrioritizedRootChunk {
required,
root_chunk_id,
} = next;
if !visited_root_chunks.insert(root_chunk_id) {
                continue;
            }
let Some(root_chunk) = root_chunks.get(&root_chunk_id) else {
re_log::debug_warn_once!("Missing root chunk");
continue;
};
store.collect_physical_descendents_of(&root_chunk_id, &mut physical_chunks_scratch);
match root_chunk.state {
LoadState::Unloaded | LoadState::InTransit => {
if required {
state.all_required_are_loaded = false;
}
let row_idx = root_chunk.row_id;
let uncompressed_chunk_size =
chunk_batcher.chunk_byte_size_uncompressed[row_idx];
if options.total_uncompressed_byte_budget < uncompressed_chunk_size {
warn_entity_exceeds_memory(entity_paths.value(row_idx));
state.some_chunks_too_big = true;
continue;
}
if !remaining_byte_budget.try_fit_into_budget(uncompressed_chunk_size, required)
{
state.memory_budget_filled = true;
break;
}
if root_chunk.state == LoadState::Unloaded
&& !chunk_batcher.try_fetch(row_idx)?
{
state.transit_budget_filled = true;
break;
}
protected_chunks.roots.insert(root_chunk_id);
protected_chunks
.physical
.extend(physical_chunks_scratch.drain(..));
}
LoadState::FullyLoaded => {
protected_chunks.roots.insert(root_chunk_id);
for chunk_id in physical_chunks_scratch.drain(..) {
if protected_chunks.physical.contains(&chunk_id) {
                            continue;
                        }
let Some(chunk) = store.physical_chunk(&chunk_id) else {
re_log::debug_warn_once!(
"Couldn't get physical chunk from chunk store"
);
continue;
};
let bytes = Chunk::total_size_bytes(chunk.as_ref());
if !remaining_byte_budget.try_fit_into_budget(bytes, required) {
state.memory_budget_filled = true;
break;
}
protected_chunks.physical.insert(chunk_id);
}
}
}
}
if root_chunk_ids_in_priority_order
.next()
.is_some_and(|next| next.required)
{
state.all_required_are_loaded = false;
}
Ok(state)
}
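    /// Recomputes which (entity, component) paths the viewer currently cares about,
    /// based on the physical chunks in use and the virtual chunks that were queried
    /// but are missing.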
fn update_components_of_interest(
&mut self,
store: &ChunkStore,
used_and_missing: &QueriedChunkIdTracker,
) {
re_tracing::profile_function!();
self.components_of_interest.clear();
let QueriedChunkIdTracker {
used_physical,
missing_virtual,
} = used_and_missing;
for physical_chunk_id in used_physical {
if let Some(chunk) = store.physical_chunk(physical_chunk_id) {
for component in chunk.components_identifiers() {
self.components_of_interest.insert(ComponentPathKey {
entity_path: chunk.entity_path().hash(),
component,
});
}
}
}
for missing_virtual_chunk_id in missing_virtual {
for root_id in store.find_root_chunks(missing_virtual_chunk_id) {
if let Some(components) = self.component_paths_from_root_id.get(&root_id) {
self.components_of_interest
.extend(components.iter().copied());
}
}
}
}
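    /// Protects all physical chunks that are in use, as well as the root chunks of
    /// all missing virtual chunks.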
fn protect_used_and_missing(
&mut self,
store: &ChunkStore,
used_and_missing: &QueriedChunkIdTracker,
) {
let QueriedChunkIdTracker {
used_physical,
missing_virtual,
} = used_and_missing;
for physical_chunk_id in used_physical {
self.protected_chunks.physical.insert(*physical_chunk_id);
}
for chunk_id in missing_virtual {
for root_id in store.find_root_chunks(chunk_id) {
self.protected_chunks.roots.insert(root_id);
}
}
}
#[must_use = "Returns root chunks whose download got cancelled. Mark them as unloaded!"]
pub fn cancel_outdated_requests(&mut self, egui_now_time: f64) -> Vec<ChunkId> {
self.chunk_requests
.cancel_outdated_requests(egui_now_time, &self.protected_chunks.roots)
}
}