use std::collections::{BTreeMap, BTreeSet};
use std::ops::RangeInclusive;
use std::time::Duration;
use ahash::{HashMap, HashSet};
use arrow::array::RecordBatch;
use re_byte_size::SizeBytes as _;
use re_chunk::{Chunk, ChunkId, ComponentIdentifier, TimeInt, Timeline, TimelineName};
use re_chunk_store::{ChunkStore, QueriedChunkIdTracker};
use re_log::debug_assert;
use re_log_encoding::RrdManifest;
use re_log_types::{AbsoluteTimeRange, EntityPathHash, TimelinePoint};
use re_mutex::Mutex;
use crate::{
chunk_requests::{ChunkRequests, RequestInfo},
rrd_manifest_index::{LoadState, RootChunkInfo},
sorted_range_map::{OverlapIterState, SortedRangeMap},
};
/// Summary of how far one prioritization pass got, and which limits stopped it.
#[derive(Clone, Copy, Default)]
pub struct PrioritizationState {
    /// Set when a chunk could not be requested because the on-wire budget was used up.
    pub transit_budget_filled: bool,

    /// Set when a chunk could not be kept or requested because the memory budget was used up.
    pub memory_budget_filled: bool,

    /// Set when at least one chunk was skipped because it alone exceeds the whole memory budget.
    pub some_chunks_too_big: bool,

    /// `Some(true)` if every `Required` chunk seen is already fully loaded, `Some(false)` if not,
    /// `None` if no fetch pass has produced an answer yet.
    pub all_required_are_loaded: Option<bool>,
}

impl PrioritizationState {
    /// Returns `true` when the pass hit no budget limit and skipped no oversized chunk, i.e.
    /// everything it visited is either loaded or on its way.
    pub fn all_chunks_loaded_or_in_transit(&self) -> bool {
        let hit_any_limit =
            self.transit_budget_filled || self.memory_budget_filled || self.some_chunks_too_big;
        !hit_any_limit
    }
}
/// Errors returned by the chunk prefetching code.
#[derive(thiserror::Error, Debug)]
pub enum PrefetchError {
    /// There is no RRD manifest to prefetch from.
    #[error("No manifest available")]
    NoManifest,

    /// The requested timeline is not known.
    #[error("Unknown timeline: {0:?}")]
    UnknownTimeline(Timeline),

    /// Wraps a codec error from `re_log_encoding`.
    #[error("Codec: {0}")]
    Codec(#[from] re_log_encoding::CodecError),

    /// Wraps an Arrow error, e.g. from building a request record batch.
    #[error("Arrow: {0}")]
    Arrow(#[from] arrow::error::ArrowError),
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkPrefetchOptions {
pub max_fetch_stage: FetchStage,
pub max_on_wire_bytes_per_batch: u64,
pub max_bytes_on_wire_at_once: u64,
}
impl Default for ChunkPrefetchOptions {
fn default() -> Self {
Self {
max_fetch_stage: FetchStage::default(),
max_on_wire_bytes_per_batch: 256 * 1024,
max_bytes_on_wire_at_once: 4_000_000,
}
}
}
/// Chunks that the fetcher yields as `Required` before the time-based stages, grouped per
/// timeline. Each list is kept sorted by `time_range.min`.
#[derive(Clone, Default)]
struct HighPrioChunks {
    temporal_chunks: BTreeMap<TimelineName, Vec<HighPrioChunk>>,
}

impl re_byte_size::SizeBytes for HighPrioChunks {
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring: adding a field forces this impl to be updated.
        let Self { temporal_chunks } = self;
        temporal_chunks.heap_size_bytes()
    }
}
/// One high-priority root chunk and its time range on the timeline it is listed under.
#[derive(Clone)]
struct HighPrioChunk {
    chunk_id: ChunkId,
    time_range: AbsoluteTimeRange,
}

impl re_byte_size::SizeBytes for HighPrioChunk {
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring: adding a field forces this impl to be updated.
        let Self {
            chunk_id: _,
            time_range: _,
        } = self;
        0
    }

    fn is_pod() -> bool {
        true
    }
}
/// The request batch currently being filled by `ChunkRequestBatcher`.
#[derive(Default)]
struct CurrentBatch {
    /// Manifest row indices of the chunks in this batch, in insertion order.
    row_indices: Vec<usize>,

    /// Sum of the uncompressed sizes of the chunks in this batch.
    uncompressed_bytes: u64,

    /// Sum of the on-wire sizes of the chunks in this batch.
    on_wire_bytes: u64,
}

impl CurrentBatch {
    /// Empties the batch while keeping the row list's allocation for reuse.
    fn reset(&mut self) {
        // Exhaustive destructuring: a new field must be handled here explicitly.
        let Self {
            row_indices: rows,
            uncompressed_bytes: uncompressed,
            on_wire_bytes: on_wire,
        } = self;

        *on_wire = 0;
        *uncompressed = 0;
        rows.clear();
    }
}
/// Groups manifest rows into chunk-request batches of bounded on-wire size.
///
/// Each finished batch is a [`RecordBatch`] of the manifest's chunk-fetcher rows together with
/// the [`RequestInfo`] describing it. The batches are handed out by [`Self::finish`].
pub(crate) struct ChunkRequestBatcher<'a> {
    manifest: &'a RrdManifest,

    /// Uncompressed chunk size per manifest row.
    chunk_byte_size_uncompressed: &'a [u64],

    /// On-wire chunk size per manifest row.
    chunk_byte_size: &'a [u64],

    /// A batch is closed as soon as its on-wire size reaches this many bytes.
    max_on_wire_bytes_per_batch: u64,

    current_batch: CurrentBatch,

    /// Closed batches, ready to be requested.
    to_load: Vec<(RecordBatch, RequestInfo)>,
}

impl<'a> ChunkRequestBatcher<'a> {
    pub(crate) fn new(manifest: &'a RrdManifest, options: &ChunkPrefetchOptions) -> Self {
        Self {
            chunk_byte_size_uncompressed: manifest.col_chunk_byte_size_uncompressed(),
            chunk_byte_size: manifest.col_chunk_byte_size(),
            manifest,
            max_on_wire_bytes_per_batch: options.max_on_wire_bytes_per_batch,
            current_batch: Default::default(),
            to_load: Vec::new(),
        }
    }

    /// Closes the current batch (if non-empty) and queues it in `to_load`.
    ///
    /// The current batch is reset whether or not building the record batch succeeds, so byte
    /// counters from a failed batch can never leak into the next batch's [`RequestInfo`].
    ///
    /// # Errors
    /// Returns an error if the rows cannot be extracted from the manifest's record batch.
    fn finish_batch(&mut self) -> Result<(), PrefetchError> {
        if self.current_batch.row_indices.is_empty() {
            return Ok(());
        }

        let row_indices: BTreeSet<usize> = self.current_batch.row_indices.iter().copied().collect();

        let col_chunk_ids: &[ChunkId] = self.manifest.col_chunk_ids();
        let mut root_chunk_ids = ahash::HashSet::default();
        for &row_idx in &row_indices {
            root_chunk_ids.insert(col_chunk_ids[row_idx]);
        }

        // Borrow the row list instead of `mem::take`-ing it: `reset` clears it in place, which
        // keeps the allocation around for the next batch.
        let rb = re_arrow_util::take_record_batch(
            self.manifest.chunk_fetcher_rb(),
            &self.current_batch.row_indices,
        );
        let size_bytes_uncompressed = self.current_batch.uncompressed_bytes;
        let size_bytes_on_wire = self.current_batch.on_wire_bytes;
        self.current_batch.reset();
        let rb = rb?;

        self.to_load.push((
            rb,
            RequestInfo {
                root_chunk_ids,
                row_indices,
                size_bytes_uncompressed,
                size_bytes_on_wire,
            },
        ));
        Ok(())
    }

    /// Adds manifest row `chunk_row_idx` to the current batch if the on-wire budget allows it.
    ///
    /// Returns `Ok(false)` without adding anything when the on-wire budget was already
    /// exhausted. Closes the batch once it reaches `max_on_wire_bytes_per_batch`.
    fn try_fetch(
        &mut self,
        chunk_row_idx: usize,
        budget: &mut RemainingByteBudget,
    ) -> Result<bool, PrefetchError> {
        let on_wire_byte_size = self.chunk_byte_size[chunk_row_idx];
        if !budget.try_fit_on_wire(on_wire_byte_size) {
            return Ok(false);
        }

        let uncompressed_chunk_size = self.chunk_byte_size_uncompressed[chunk_row_idx];
        self.current_batch.row_indices.push(chunk_row_idx);
        self.current_batch.uncompressed_bytes += uncompressed_chunk_size;
        self.current_batch.on_wire_bytes += on_wire_byte_size;

        if self.max_on_wire_bytes_per_batch <= self.current_batch.on_wire_bytes {
            self.finish_batch()?;
        }
        Ok(true)
    }

    /// Closes the last partial batch and returns every batch that should be requested.
    #[must_use = "Load the returned batches"]
    pub fn finish(mut self) -> Result<Vec<(RecordBatch, RequestInfo)>, PrefetchError> {
        self.finish_batch()?;
        Ok(self.to_load)
    }
}
/// Reports (once per message) that `entity_path` cannot be fully loaded because one of its
/// chunks is larger than the entire memory budget.
///
/// On `wasm32` this is only a debug message; on native it is a user-facing warning.
fn warn_entity_exceeds_memory(entity_path: &str) {
    if cfg!(target_arch = "wasm32") {
        re_log::debug_once!(
            "Cannot load all of entity '{entity_path}', because its size exceeds the memory budget. Try the native viewer instead, or split up your large assets (e.g. prefer VideoStream over VideoAsset)."
        );
        return;
    }

    re_log::warn_once!(
        "Cannot load all of entity '{entity_path}', because its size exceeds the memory budget. You should increase the `--memory-limit` or try to split up your large assets (e.g. prefer VideoStream over VideoAsset)."
    );
}
/// Tracks how many bytes a prioritization pass may still spend on memory and on downloads.
pub struct RemainingByteBudget {
    /// The memory budget the pass started with; used to detect chunks that can never fit.
    pub total_bytes_in_memory: u64,

    remaining_bytes_in_memory: u64,

    /// Signed so that the request that crosses the limit can overshoot it (see
    /// [`Self::try_fit_on_wire`]).
    pub remaining_bytes_on_wire: i64,
}

impl RemainingByteBudget {
    /// Returns `true` once either the memory or the on-wire budget is exhausted.
    pub fn full(&self) -> bool {
        self.remaining_bytes_in_memory == 0 || self.remaining_bytes_on_wire <= 0
    }

    /// Creates a budget. On-wire budgets larger than `i64::MAX` are clamped to `i64::MAX`.
    pub fn new(total_bytes_in_memory: u64, max_bytes_on_wire: u64) -> Self {
        Self {
            total_bytes_in_memory,
            remaining_bytes_in_memory: total_bytes_in_memory,
            remaining_bytes_on_wire: i64::try_from(max_bytes_on_wire).unwrap_or(i64::MAX),
        }
    }

    /// Tries to reserve `bytes` of memory.
    ///
    /// A reservation that fits (including one that exactly uses up the remaining budget)
    /// succeeds and is subtracted. Otherwise the budget is marked exhausted. In that case
    /// `required` reservations are still accepted, with a one-time warning, and non-required
    /// ones are rejected.
    fn try_fit_in_memory(&mut self, bytes: u64, required: bool) -> bool {
        if bytes <= self.remaining_bytes_in_memory {
            self.remaining_bytes_in_memory -= bytes;
            return true;
        }

        self.remaining_bytes_in_memory = 0;

        if !required {
            return false;
        }

        if cfg!(target_arch = "wasm32") {
            re_log::warn_once!(
                "This recording is very memory intense, and the Wasm32 build only has 4GiB of memory. Consider using the native viewer to use all of your RAM."
            );
        } else {
            re_log::warn_once!(
                "The current recording may use more data than your current memory budget."
            );
        }
        true
    }

    /// Tries to reserve `bytes` of on-wire budget.
    ///
    /// Succeeds as long as some budget was left before this call. The reservation is always
    /// subtracted, so the request that crosses the limit may overshoot it.
    fn try_fit_on_wire(&mut self, bytes: u64) -> bool {
        let fit_on_wire = self.remaining_bytes_on_wire > 0;
        self.remaining_bytes_on_wire = self.remaining_bytes_on_wire.saturating_sub_unsigned(bytes);
        fit_on_wire
    }
}
/// A root chunk together with the [`FetchStage`] at which it was selected.
#[derive(Clone, Copy)]
pub struct PrioritizedRootChunk {
    stage: FetchStage,
    root_chunk_id: ChunkId,
}

impl PrioritizedRootChunk {
    /// Shared constructor for the stage-specific helpers below.
    fn at_stage(stage: FetchStage, root_chunk_id: ChunkId) -> Self {
        Self {
            stage,
            root_chunk_id,
        }
    }

    /// A chunk selected at [`FetchStage::Required`].
    fn required(root_chunk_id: ChunkId) -> Self {
        Self::at_stage(FetchStage::Required, root_chunk_id)
    }

    /// A chunk selected at [`FetchStage::Similar`]. `time_cursor_offset` is the estimated
    /// real-time delay until playback reaches the chunk, if one could be computed.
    fn similar(chunk_id: ChunkId, time_cursor_offset: Option<Duration>) -> Self {
        Self::at_stage(FetchStage::Similar(time_cursor_offset), chunk_id)
    }

    /// A chunk selected at [`FetchStage::Everything`].
    fn everything(chunk_id: ChunkId) -> Self {
        Self::at_stage(FetchStage::Everything, chunk_id)
    }
}
/// Identifies one component on one entity, keyed by the entity path's hash.
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct ComponentPathKey {
    entity_path: EntityPathHash,
    component: ComponentIdentifier,
}

#[cfg(test)]
impl ComponentPathKey {
    /// A placeholder key for tests.
    pub fn dummy() -> Self {
        Self {
            entity_path: EntityPathHash::NONE,
            component: ComponentIdentifier::new("test"),
        }
    }
}

impl re_byte_size::SizeBytes for ComponentPathKey {
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring: adding a field forces this impl to be updated.
        let Self {
            entity_path: _,
            component: _,
        } = self;
        0
    }

    fn is_pod() -> bool {
        true
    }
}
/// Chunks marked as protected by the latest prioritization pass.
///
/// `roots` is passed to `ChunkRequests::cancel_outdated_requests`. How `physical` is used is
/// outside this section (presumably to shield chunks from eviction — confirm with callers).
#[derive(Clone, Default)]
pub struct ProtectedChunks {
    /// Root (manifest-level) chunk ids.
    pub roots: HashSet<ChunkId>,

    /// Physical chunk ids currently present in the store.
    pub physical: HashSet<ChunkId>,
}

impl re_byte_size::SizeBytes for ProtectedChunks {
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring: adding a field forces this impl to be updated.
        let Self { roots, physical } = self;
        roots.heap_size_bytes() + physical.heap_size_bytes()
    }
}
/// Decides which root chunks to load (and keep) and in which order, based on the RRD manifest,
/// the store's query tracking, and the time cursor.
#[derive(Default)]
#[cfg_attr(feature = "testing", derive(Clone))]
pub struct ChunkPrioritizer {
    /// Chunks protected by the latest pass; rebuilt in `prepare_chunk_fetcher`.
    protected_chunks: ProtectedChunks,

    /// State reported by the most recently finished `ChunkFetcher`.
    latest_result: Option<PrioritizationState>,

    /// Outstanding chunk-request batches.
    chunk_requests: ChunkRequests,

    /// Per timeline: time range → root chunk id, used for the time-based stages.
    root_chunk_intervals: BTreeMap<Timeline, SortedRangeMap<TimeInt, ChunkId>>,

    /// All static root chunks, sorted and deduplicated.
    static_chunk_ids: Vec<ChunkId>,

    /// Root chunks yielded as `Required` right after the static ones.
    high_priority_chunks: HighPrioChunks,

    /// The component paths carried by each root chunk, according to the manifest.
    pub component_paths_from_root_id: HashMap<ChunkId, Vec<ComponentPathKey>>,

    /// Components touched by the chunks the store reported as used or missing.
    pub components_of_interest: HashSet<ComponentPathKey>,

    /// Allocation reused for the fetcher's visited set; holds the last pass's visited roots.
    frame_visited: HashSet<ChunkId>,
}
impl re_byte_size::SizeBytes for ChunkPrioritizer {
    /// Sums the heap usage of every owned collection.
    ///
    /// `latest_result` owns no heap data. `chunk_requests` is intentionally not counted.
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring: adding a field forces this impl to be updated.
        let Self {
            protected_chunks,
            latest_result: _,
            chunk_requests: _,
            root_chunk_intervals,
            static_chunk_ids,
            high_priority_chunks,
            component_paths_from_root_id,
            components_of_interest,
            frame_visited,
        } = self;

        [
            protected_chunks.heap_size_bytes(),
            root_chunk_intervals.heap_size_bytes(),
            static_chunk_ids.heap_size_bytes(),
            high_priority_chunks.heap_size_bytes(),
            component_paths_from_root_id.heap_size_bytes(),
            components_of_interest.heap_size_bytes(),
            frame_visited.heap_size_bytes(),
        ]
        .into_iter()
        .sum()
    }
}
/// The playback position that time-based prioritization is centered on.
#[derive(Clone, Copy)]
pub struct PrefetchTimeCursor {
    /// Current timeline and time.
    pub time_cursor: TimelinePoint,

    /// Playback speed if playing. A time distance on the timeline divided by this value is
    /// interpreted as seconds when estimating how soon playback reaches a chunk.
    pub speed_if_unpaused: f64,

    /// The active loop range, if playback is looping.
    pub loop_range: Option<AbsoluteTimeRange>,
}

/// Gives direct access to the underlying [`TimelinePoint`] (`.timeline()`, `.time`).
impl std::ops::Deref for PrefetchTimeCursor {
    type Target = TimelinePoint;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.time_cursor
    }
}
impl ChunkPrioritizer {
pub fn on_rrd_manifest(&mut self, delta: &RrdManifest) {
self.update_static_chunks(delta);
self.update_chunk_intervals(delta);
self.update_high_priority_chunks(delta);
for (entity, per_component) in delta.static_map() {
for (component, chunk) in per_component {
self.component_paths_from_root_id
.entry(*chunk)
.or_default()
.push(ComponentPathKey {
entity_path: entity.hash(),
component: *component,
});
}
}
for (entity, per_timeline) in delta.temporal_map() {
for per_component in per_timeline.values() {
for (component, chunks) in per_component {
for chunk in chunks.keys() {
self.component_paths_from_root_id
.entry(*chunk)
.or_default()
.push(ComponentPathKey {
entity_path: entity.hash(),
component: *component,
});
}
}
}
}
}
pub fn latest_result(&self) -> Option<PrioritizationState> {
self.latest_result
}
fn find_chunks_with_component_prefix(manifest: &RrdManifest, prefix: &str) -> HighPrioChunks {
let mut temporal_chunks: BTreeMap<TimelineName, Vec<HighPrioChunk>> = Default::default();
for timelines in manifest.temporal_map().values() {
for (timeline, components) in timelines {
for (component, chunks) in components {
if component.as_str().starts_with(prefix) {
for (chunk_id, entry) in chunks {
temporal_chunks.entry(*timeline.name()).or_default().push(
HighPrioChunk {
chunk_id: *chunk_id,
time_range: entry.time_range,
},
);
}
}
}
}
}
for chunks in temporal_chunks.values_mut() {
chunks.sort_by_key(|chunk| chunk.time_range.min);
}
HighPrioChunks { temporal_chunks }
}
fn update_high_priority_chunks(&mut self, manifest: &RrdManifest) {
let new_chunks = Self::find_chunks_with_component_prefix(
manifest,
"Transform3D:", );
for (timeline, mut chunks) in new_chunks.temporal_chunks {
let existing = self
.high_priority_chunks
.temporal_chunks
.entry(timeline)
.or_default();
existing.append(&mut chunks);
existing.sort_by_key(|chunk| chunk.time_range.min);
}
}
fn update_static_chunks(&mut self, manifest: &RrdManifest) {
for entity_chunks in manifest.static_map().values() {
self.static_chunk_ids.extend(entity_chunks.values());
}
self.static_chunk_ids.sort();
self.static_chunk_ids.dedup();
}
fn update_chunk_intervals(&mut self, manifest: &RrdManifest) {
let mut per_timeline_chunks: BTreeMap<Timeline, Vec<(RangeInclusive<TimeInt>, ChunkId)>> =
BTreeMap::default();
for timelines in manifest.temporal_map().values() {
for (timeline, components) in timelines {
let timeline_chunks = per_timeline_chunks.entry(*timeline).or_default();
for chunks in components.values() {
for (chunk_id, entry) in chunks {
timeline_chunks.push((entry.time_range.into(), *chunk_id));
}
}
}
}
for (timeline, chunks) in per_timeline_chunks {
self.root_chunk_intervals
.entry(timeline)
.or_default()
.extend(chunks);
}
}
pub fn chunk_requests(&self) -> &ChunkRequests {
&self.chunk_requests
}
pub fn chunk_requests_mut(&mut self) -> &mut ChunkRequests {
&mut self.chunk_requests
}
pub fn protected_chunks(&self) -> &ProtectedChunks {
&self.protected_chunks
}
pub fn prepare_chunk_fetcher<'a>(
&'a mut self,
store: &'a ChunkStore,
manifest: &'a RrdManifest,
options: &ChunkPrefetchOptions,
time_cursor: Option<PrefetchTimeCursor>,
root_chunks: &'a HashMap<ChunkId, RootChunkInfo>,
budget: &mut RemainingByteBudget,
) -> ChunkFetcher<'a> {
let used_and_missing = store.take_tracked_chunk_ids();
self.frame_visited.clear();
self.update_components_of_interest(store, &used_and_missing);
self.protected_chunks.roots.clear();
self.protected_chunks.physical.clear();
self.protect_used_and_missing(store, &used_and_missing);
for &physical_chunk_id in &used_and_missing.used_physical {
debug_assert!(
self.protected_chunks.physical.contains(&physical_chunk_id),
"We added it earlier"
);
if let Some(chunk) = store.physical_chunk(&physical_chunk_id) {
budget.try_fit_in_memory(Chunk::total_size_bytes(chunk.as_ref()), true);
} else {
re_log::debug_warn_once!("Couldn't get physical chunk from chunk store");
}
}
ChunkFetcher {
visited_root_chunks: std::mem::take(&mut self.frame_visited),
chunk_id_scratch: Vec::new(),
state: PrioritizationState::default(),
prioritizer: self,
root_chunks,
time_cursor,
store,
next_chunk: None,
fetch_stage: ChunkPriorityStage::Start(used_and_missing),
request_batcher: Some(ChunkRequestBatcher::new(manifest, options)),
}
}
fn update_components_of_interest(
&mut self,
store: &ChunkStore,
used_and_missing: &QueriedChunkIdTracker,
) {
re_tracing::profile_function!();
self.components_of_interest.clear();
let QueriedChunkIdTracker {
used_physical,
missing_virtual,
} = used_and_missing;
for physical_chunk_id in used_physical {
if let Some(chunk) = store.physical_chunk(physical_chunk_id) {
for component in chunk.components_identifiers() {
self.components_of_interest.insert(ComponentPathKey {
entity_path: chunk.entity_path().hash(),
component,
});
}
}
}
for missing_virtual_chunk_id in missing_virtual {
for root_id in store.find_root_chunks(missing_virtual_chunk_id) {
if let Some(components) = self.component_paths_from_root_id.get(&root_id) {
self.components_of_interest
.extend(components.iter().copied());
}
}
}
}
fn protect_used_and_missing(
&mut self,
store: &ChunkStore,
used_and_missing: &QueriedChunkIdTracker,
) {
let QueriedChunkIdTracker {
used_physical,
missing_virtual,
} = used_and_missing;
for physical_chunk_id in used_physical {
self.protected_chunks.physical.insert(*physical_chunk_id);
}
for chunk_id in missing_virtual {
for root_id in store.find_root_chunks(chunk_id) {
self.protected_chunks.roots.insert(root_id);
}
}
}
#[must_use = "Returns root chunks whose download got cancelled. Mark them as unloaded!"]
pub fn cancel_outdated_requests(&mut self, egui_now_time: f64) -> Vec<ChunkId> {
self.chunk_requests
.cancel_outdated_requests(egui_now_time, &self.protected_chunks.roots)
}
}
/// How eagerly a chunk is wanted, from most to least urgent.
///
/// Ordering is `Required < Similar(..) < Everything`. Within `Similar`, known offsets sort by
/// duration and precede an unknown (`None`) offset.
#[derive(PartialEq, Eq, Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
#[repr(u32)]
pub enum FetchStage {
    /// Needed now: missing chunks, static chunks, and high-priority chunks.
    Required = 0,

    /// Near the time cursor and touching a component of interest. Holds the estimated real-time
    /// delay until playback reaches the chunk, if known.
    Similar(Option<Duration>) = 1,

    /// Any other chunk.
    Everything = 2,
}
impl PartialOrd for FetchStage {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for FetchStage {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
use std::cmp::Ordering;
match (self, other) {
(Self::Required, Self::Required) | (Self::Everything, Self::Everything) => {
Ordering::Equal
}
(Self::Similar(a), Self::Similar(b)) => match (a, b) {
(Some(a), Some(b)) => a.cmp(b),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => Ordering::Equal,
},
(Self::Required, _) | (_, Self::Everything) => Ordering::Less,
(_, Self::Required) | (Self::Everything, _) => Ordering::Greater,
}
}
}
impl Default for FetchStage {
fn default() -> Self {
Self::Similar(Some(Duration::from_secs(30)))
}
}
impl FetchStage {
pub fn is_required(&self) -> bool {
match self {
Self::Required => true,
Self::Similar(_) | Self::Everything => false,
}
}
pub fn is_everything(&self) -> bool {
match self {
Self::Required | Self::Similar(_) => false,
Self::Everything => true,
}
}
}
/// Position of a backwards walk over a list of high-priority chunks.
enum IterState {
    /// The starting index has not been computed yet.
    Uninited,

    /// Next index to yield.
    Idx(usize),

    /// Nothing left to yield.
    Done,
}
/// The time windows, relative to the time cursor and loop range, that are searched in order.
#[derive(Clone, Copy)]
enum TimeRangeStage {
    AfterCursor,
    BeforeCursor,
    AfterCursorOutsideLoop,
    BeforeCursorOutsideLoop,
}

impl TimeRangeStage {
    /// The window searched after this one, or `None` after the last window.
    fn next(&self) -> Option<Self> {
        let following = match *self {
            Self::AfterCursor => Self::BeforeCursor,
            Self::BeforeCursor => Self::AfterCursorOutsideLoop,
            Self::AfterCursorOutsideLoop => Self::BeforeCursorOutsideLoop,
            Self::BeforeCursorOutsideLoop => return None,
        };
        Some(following)
    }
}
/// State of the [`ChunkFetcher`] priority iterator; stages are visited top to bottom.
enum ChunkPriorityStage<'a> {
    /// Holds the store's tracked ids; resolves missing virtual chunks to their roots.
    Start(QueriedChunkIdTracker),

    /// Yields the (sorted, deduplicated) roots of missing chunks as `Required`.
    Missing(std::vec::IntoIter<ChunkId>),

    /// Yields static chunks as `Required`; holds the next index into `static_chunk_ids`.
    Static(usize),

    /// Yields high-priority chunks on the cursor's timeline as `Required`.
    HighPrio(IterState),

    /// Yields chunks overlapping each [`TimeRangeStage`] window: first the ones touching a
    /// component of interest (`interesting == true`), then a second sweep over the rest.
    TimeQuery {
        stage: TimeRangeStage,
        iter_state: Option<OverlapIterState>,
        interesting: bool,
    },

    /// Yields every known root chunk as `Everything`.
    Everything(std::collections::hash_map::Keys<'a, ChunkId, RootChunkInfo>),

    /// Nothing left.
    Done,
}
/// One prioritization pass: walks root chunks in priority order, spends the byte budget on
/// them, and batches requests for unloaded ones.
///
/// Must be completed with [`ChunkFetcher::finish`]; otherwise the batched requests are dropped.
#[must_use]
pub struct ChunkFetcher<'a> {
    time_cursor: Option<PrefetchTimeCursor>,

    /// Root chunks already handled during this pass (the priority iterator may repeat them).
    visited_root_chunks: HashSet<ChunkId>,

    /// Reused buffer for the physical descendants of the current root chunk.
    chunk_id_scratch: Vec<ChunkId>,

    /// What this pass ran into so far.
    pub state: PrioritizationState,

    store: &'a ChunkStore,
    prioritizer: &'a mut ChunkPrioritizer,
    root_chunks: &'a HashMap<ChunkId, RootChunkInfo>,

    /// Chunk produced by `peek_chunk` but not yet consumed.
    next_chunk: Option<PrioritizedRootChunk>,

    fetch_stage: ChunkPriorityStage<'a>,

    /// `Some` until `finish` takes it.
    request_batcher: Option<ChunkRequestBatcher<'a>>,
}

impl Drop for ChunkFetcher<'_> {
    fn drop(&mut self) {
        // `finish` takes the batcher; if it is still here, `finish` was never called.
        if self.request_batcher.is_some() {
            re_log::debug_warn_once!("`ChunkFetcher::finish` not called for `ChunkFetcher`");
        }
    }
}
impl ChunkFetcher<'_> {
    /// Returns the next prioritized root chunk without consuming it.
    ///
    /// The peeked chunk is cached in `self.next_chunk` and returned by the next `next_chunk`.
    fn peek_chunk(&mut self) -> Option<PrioritizedRootChunk> {
        if self.next_chunk.is_none() {
            self.next_chunk = self.next_chunk();
        }
        self.next_chunk
    }

    /// Yields root chunks in priority order by advancing the `fetch_stage` state machine.
    ///
    /// Order of stages:
    /// 1. roots of missing virtual chunks (`Required`),
    /// 2. static chunks (`Required`),
    /// 3. high-priority chunks on the cursor's timeline whose range starts at or before the
    ///    cursor, walking backwards from the cursor (`Required`),
    /// 4. chunks overlapping each [`TimeRangeStage`] window, first those touching a component
    ///    of interest (`Similar`), then the rest (`Everything`),
    /// 5. every root chunk (`Everything`).
    ///
    /// A chunk may be yielded more than once; `fetch_inner` skips repeats via
    /// `visited_root_chunks`.
    fn next_chunk(&mut self) -> Option<PrioritizedRootChunk> {
        if let Some(chunk) = self.next_chunk.take() {
            return Some(chunk);
        }
        loop {
            match &mut self.fetch_stage {
                ChunkPriorityStage::Start(tracker) => {
                    let mut missing_roots = Vec::new();
                    for missing_virtual_chunk_id in &tracker.missing_virtual {
                        self.store
                            .collect_root_ids(missing_virtual_chunk_id, &mut missing_roots);
                    }
                    missing_roots.sort();
                    missing_roots.dedup();
                    self.fetch_stage = ChunkPriorityStage::Missing(missing_roots.into_iter());
                }
                ChunkPriorityStage::Missing(missing) => {
                    if let Some(missing) = missing.next() {
                        return Some(PrioritizedRootChunk::required(missing));
                    } else {
                        self.fetch_stage = ChunkPriorityStage::Static(0);
                    }
                }
                ChunkPriorityStage::Static(idx) => {
                    if let Some(c) = self.prioritizer.static_chunk_ids.get(*idx) {
                        *idx += 1;
                        return Some(PrioritizedRootChunk::required(*c));
                    } else {
                        self.fetch_stage = ChunkPriorityStage::HighPrio(IterState::Uninited);
                    }
                }
                ChunkPriorityStage::HighPrio(idx) => {
                    // Without a time cursor, or with no high-priority chunks on its timeline,
                    // this whole stage is skipped.
                    if let Some(time_cursor) = self.time_cursor
                        && let Some(chunks_on_timeline) = self
                            .prioritizer
                            .high_priority_chunks
                            .temporal_chunks
                            .get(time_cursor.timeline().name())
                        && let Some(current_idx) = match idx {
                            IterState::Uninited => {
                                // Start at the last chunk (sorted by `time_range.min`) that
                                // begins at or before the cursor.
                                let (new_idx, res) = if let Some(idx) = chunks_on_timeline
                                    .partition_point(|c| c.time_range.min <= time_cursor.time)
                                    .checked_sub(1)
                                {
                                    (IterState::Idx(idx), Some(idx))
                                } else {
                                    (IterState::Done, None)
                                };
                                *idx = new_idx;
                                res
                            }
                            IterState::Idx(idx) => Some(*idx),
                            IterState::Done => None,
                        }
                        && let Some(c) = chunks_on_timeline.get(current_idx)
                    {
                        // Walk backwards in time towards index 0.
                        *idx = if let Some(idx) = current_idx.checked_sub(1) {
                            IterState::Idx(idx)
                        } else {
                            IterState::Done
                        };
                        return Some(PrioritizedRootChunk::required(c.chunk_id));
                    } else {
                        self.fetch_stage = ChunkPriorityStage::TimeQuery {
                            stage: TimeRangeStage::AfterCursor,
                            iter_state: None,
                            interesting: true,
                        };
                    }
                }
                ChunkPriorityStage::TimeQuery {
                    stage,
                    iter_state,
                    interesting,
                } => {
                    // Copy the state out so `self` can be borrowed by `next_in_time_query`.
                    let stage = *stage;
                    let interesting = *interesting;
                    let mut iter_state = *iter_state;
                    if let Some(chunk) =
                        self.next_in_time_query(stage, &mut iter_state, interesting)
                    {
                        self.fetch_stage = ChunkPriorityStage::TimeQuery {
                            stage,
                            iter_state,
                            interesting,
                        };
                        return Some(chunk);
                    } else if let Some(stage) = stage.next() {
                        self.fetch_stage = ChunkPriorityStage::TimeQuery {
                            stage,
                            iter_state: None,
                            interesting,
                        };
                    } else if interesting {
                        // All windows done for interesting chunks: sweep them again for the rest.
                        self.fetch_stage = ChunkPriorityStage::TimeQuery {
                            stage: TimeRangeStage::AfterCursor,
                            iter_state: None,
                            interesting: false,
                        };
                    } else {
                        self.fetch_stage = ChunkPriorityStage::Everything(self.root_chunks.keys());
                    }
                }
                ChunkPriorityStage::Everything(chunks) => {
                    if let Some(chunk_id) = chunks.next() {
                        return Some(PrioritizedRootChunk::everything(*chunk_id));
                    } else {
                        self.fetch_stage = ChunkPriorityStage::Done;
                    }
                }
                ChunkPriorityStage::Done => return None,
            }
        }
    }

    /// Finds the next root chunk overlapping the window of `stage`, resuming from `cursor`.
    ///
    /// Windows, with time cursor `t` and optional loop range `[lo, hi]`:
    /// - `AfterCursor`: `[max(lo, t), hi]` (only when looping)
    /// - `BeforeCursor`: `[lo, min(hi, t - 1)]` (only when looping)
    /// - `AfterCursorOutsideLoop`: `[hi + 1, MAX]`, or `[t, MAX]` when not looping
    /// - `BeforeCursorOutsideLoop`: `[MIN, lo - 1]`, or `[MIN, t - 1]` when not looping
    ///
    /// Only chunks whose "interesting" status (carries a component of interest) equals
    /// `interesting` are returned.
    /// - Interesting chunks become `Similar` with an estimated real-time offset until playback
    ///   reaches them. The offset is `None` outside the loop, or when the computed duration is
    ///   not representable (e.g. negative or infinite).
    /// - Other chunks become `Everything`.
    ///
    /// `cursor` is updated so the next call continues where this one stopped.
    fn next_in_time_query(
        &self,
        stage: TimeRangeStage,
        cursor: &mut Option<OverlapIterState>,
        interesting: bool,
    ) -> Option<PrioritizedRootChunk> {
        let time_cursor = self.time_cursor?;
        let query = match stage {
            TimeRangeStage::AfterCursor => {
                let loop_range = time_cursor.loop_range?;
                AbsoluteTimeRange::new(loop_range.min.max(time_cursor.time), loop_range.max)
            }
            TimeRangeStage::BeforeCursor => {
                let loop_range = time_cursor.loop_range?;
                AbsoluteTimeRange::new(
                    loop_range.min,
                    loop_range.max.min(time_cursor.time.saturating_sub(1)),
                )
            }
            // NOTE(review): `r.max + 1` is not saturating, unlike the `saturating_sub(1)` used
            // elsewhere; whether a loop ending at `TimeInt::MAX` is safe depends on `TimeInt`'s
            // `Add` — confirm.
            TimeRangeStage::AfterCursorOutsideLoop => AbsoluteTimeRange::new(
                time_cursor
                    .loop_range
                    .map(|r| r.max + TimeInt::new_temporal(1))
                    .unwrap_or(time_cursor.time),
                TimeInt::MAX,
            ),
            TimeRangeStage::BeforeCursorOutsideLoop => AbsoluteTimeRange::new(
                TimeInt::MIN,
                time_cursor
                    .loop_range
                    .map(|r| r.min.saturating_sub(1))
                    .unwrap_or_else(|| time_cursor.time.saturating_sub(1)),
            ),
        };
        if query.is_empty() {
            return None;
        }
        let map = self
            .prioritizer
            .root_chunk_intervals
            .get(&time_cursor.timeline())?;
        let mut iter = match *cursor {
            Some(c) => map.resume_query(query.min..=query.max, c),
            None => map.query(query.min..=query.max),
        };
        let chunk = iter.find(|(_, c)| {
            let is_interesting = self
                .prioritizer
                .component_paths_from_root_id
                .get(c)
                .is_some_and(|k| {
                    k.iter()
                        .any(|k| self.prioritizer.components_of_interest.contains(k))
                });
            is_interesting == interesting
        });
        // Save the position even when nothing matched, so the window isn't rescanned.
        *cursor = Some(iter.cursor());
        let (range, chunk_id) = chunk?;
        let range = AbsoluteTimeRange::new(*range.start(), *range.end());
        let chunk = if interesting {
            // Time until playback reaches the chunk's start, converted to seconds via the speed.
            let after = Duration::try_from_secs_f64(
                (range.min - time_cursor.time).max(TimeInt::ZERO).as_f64()
                    / time_cursor.speed_if_unpaused,
            )
            .ok();
            let real_time_offset = match stage {
                TimeRangeStage::AfterCursor => after,
                // Before the cursor inside a loop: play to the loop end, wrap around, then
                // continue to the chunk.
                TimeRangeStage::BeforeCursor => time_cursor.loop_range.and_then(|loop_range| {
                    Duration::try_from_secs_f64(
                        ((loop_range.max - time_cursor.time).max(TimeInt::ZERO)
                            + (range.min - loop_range.min).max(TimeInt::ZERO))
                        .as_f64()
                            / time_cursor.speed_if_unpaused,
                    )
                    .ok()
                }),
                TimeRangeStage::AfterCursorOutsideLoop => {
                    // When looping, playback never leaves the loop to reach this chunk.
                    if time_cursor.loop_range.is_some() {
                        None
                    } else {
                        after
                    }
                }
                TimeRangeStage::BeforeCursorOutsideLoop => None,
            };
            PrioritizedRootChunk::similar(*chunk_id, real_time_offset)
        } else {
            PrioritizedRootChunk::everything(*chunk_id)
        };
        Some(chunk)
    }

    /// Spends `budget` on root chunks in priority order, up to and including stage `to_state`.
    ///
    /// Unloaded chunks are added to request batches, which are only issued by
    /// [`Self::finish`]. May be called several times with increasing `to_state`.
    ///
    /// # Errors
    /// Propagates errors from building request batches.
    pub fn fetch(
        &mut self,
        budget: &mut RemainingByteBudget,
        to_state: FetchStage,
    ) -> Result<(), PrefetchError> {
        // Only `finish` takes the batcher for good, and it consumes `self`; this is defensive.
        let Some(mut batcher) = self.request_batcher.take() else {
            return Ok(());
        };
        let res = self.fetch_inner(&mut batcher, budget, to_state);
        // Put the batcher back even on error so `finish` can still flush it.
        self.request_batcher = Some(batcher);
        res
    }

    /// Core of [`Self::fetch`].
    ///
    /// For each not-yet-visited root chunk:
    /// - Unloaded / in transit: chunks larger than the whole memory budget are skipped and
    ///   `some_chunks_too_big` is set. Otherwise memory is reserved, unloaded chunks are
    ///   batched for request, and the root plus its physical descendants are protected.
    /// - Fully loaded: the root is protected, and each unprotected physical descendant is
    ///   charged against memory and protected.
    ///
    /// Stops before the first chunk whose stage exceeds `to_state`, or when the memory budget
    /// (`memory_budget_filled`) or on-wire budget (`transit_budget_filled`) runs out. The chunk
    /// that hit the limit has already been marked visited.
    fn fetch_inner(
        &mut self,
        batcher: &mut ChunkRequestBatcher<'_>,
        budget: &mut RemainingByteBudget,
        to_state: FetchStage,
    ) -> Result<(), PrefetchError> {
        // Optimistic until a required chunk is found that is not fully loaded.
        if self.state.all_required_are_loaded.is_none() {
            self.state.all_required_are_loaded = Some(true);
        }
        let entity_paths = batcher.manifest.col_chunk_entity_path_raw();
        loop {
            if self.peek_chunk().is_some_and(|next| next.stage > to_state) {
                break;
            }
            let Some(PrioritizedRootChunk {
                stage,
                root_chunk_id,
            }) = self.next_chunk()
            else {
                break;
            };
            // Skip chunks already handled earlier in this pass.
            if !self.visited_root_chunks.insert(root_chunk_id) {
                continue;
            }
            let Some(root_chunk) = self.root_chunks.get(&root_chunk_id) else {
                re_log::debug_warn_once!("Missing root chunk");
                continue;
            };
            // NOTE(review): every exit path below clears `chunk_id_scratch` except the `?` on
            // `try_fetch`; whether leftover ids matter depends on whether
            // `collect_physical_descendents_of` overwrites or appends — confirm.
            self.store
                .collect_physical_descendents_of(&root_chunk_id, &mut self.chunk_id_scratch);
            match root_chunk.state {
                LoadState::Unloaded | LoadState::InTransit => {
                    if stage.is_required() {
                        self.state.all_required_are_loaded = Some(false);
                    }
                    let row_idx = root_chunk.row_id;
                    let uncompressed_chunk_size = batcher.chunk_byte_size_uncompressed[row_idx];
                    // A chunk bigger than the whole budget can never be loaded; skip it.
                    if budget.total_bytes_in_memory < uncompressed_chunk_size {
                        warn_entity_exceeds_memory(entity_paths.value(row_idx));
                        self.state.some_chunks_too_big = true;
                        self.chunk_id_scratch.clear();
                        continue;
                    }
                    // In-transit chunks also reserve memory: they will arrive soon.
                    if !budget.try_fit_in_memory(uncompressed_chunk_size, stage.is_required()) {
                        self.state.memory_budget_filled = true;
                        self.chunk_id_scratch.clear();
                        break;
                    }
                    // Only unloaded chunks need a new request; in-transit ones are already
                    // requested.
                    if root_chunk.state == LoadState::Unloaded
                        && !batcher.try_fetch(row_idx, budget)?
                    {
                        self.state.transit_budget_filled = true;
                        self.chunk_id_scratch.clear();
                        break;
                    }
                    self.prioritizer
                        .protected_chunks
                        .roots
                        .insert(root_chunk_id);
                    self.prioritizer
                        .protected_chunks
                        .physical
                        .extend(self.chunk_id_scratch.drain(..));
                }
                LoadState::FullyLoaded => {
                    self.prioritizer
                        .protected_chunks
                        .roots
                        .insert(root_chunk_id);
                    for chunk_id in self.chunk_id_scratch.drain(..) {
                        // Already protected (and charged) earlier in this pass.
                        if self
                            .prioritizer
                            .protected_chunks
                            .physical
                            .contains(&chunk_id)
                        {
                            continue;
                        }
                        let Some(chunk) = self.store.physical_chunk(&chunk_id) else {
                            re_log::debug_warn_once!(
                                "Couldn't get physical chunk from chunk store"
                            );
                            continue;
                        };
                        let bytes = Chunk::total_size_bytes(chunk.as_ref());
                        if !budget.try_fit_in_memory(bytes, stage.is_required()) {
                            self.state.memory_budget_filled = true;
                            break;
                        }
                        self.prioritizer.protected_chunks.physical.insert(chunk_id);
                    }
                    // The inner `break` can leave ids behind in the scratch buffer.
                    self.chunk_id_scratch.clear();
                    if self.state.memory_budget_filled {
                        break;
                    }
                }
            }
        }
        // Stopping before a pending required chunk means not all required chunks are loaded.
        if self
            .peek_chunk()
            .is_some_and(|next| next.stage.is_required())
        {
            self.state.all_required_are_loaded = Some(false);
        }
        Ok(())
    }

    /// Ends the pass.
    ///
    /// Records the pass state on the prioritizer, hands the visited set back for reuse, issues
    /// every batched request through `load_chunks`, and registers the resulting promises with
    /// the prioritizer's [`ChunkRequests`]. If `fetch` never ran, `all_required_are_loaded`
    /// carries over from the previous result.
    ///
    /// Returns the root chunks that are now in transit, plus the time cursor used.
    ///
    /// # Errors
    /// Propagates errors from closing the last request batch.
    pub fn finish(
        mut self,
        load_chunks: &dyn Fn(RecordBatch) -> super::ChunkPromise,
    ) -> Result<ChunkFetchResult, PrefetchError> {
        let prioritizer = &mut *self.prioritizer;
        prioritizer.frame_visited = std::mem::take(&mut self.visited_root_chunks);
        let mut state = self.state;
        if state.all_required_are_loaded.is_none() {
            state.all_required_are_loaded = prioritizer
                .latest_result
                .as_ref()
                .and_then(|prev| prev.all_required_are_loaded);
        }
        prioritizer.latest_result = Some(state);
        let mut res = ChunkFetchResult {
            new_in_transit_chunks: Vec::new(),
            time_cursor: self.time_cursor.as_deref().copied(),
        };
        // Taking the batcher also tells `Drop` that `finish` was called.
        if let Some(batcher) = self.request_batcher.take() {
            let to_load = batcher.finish()?;
            for (rb, batch_info) in to_load {
                res.new_in_transit_chunks
                    .extend(batch_info.root_chunk_ids.iter().copied());
                let promise = load_chunks(rb);
                let batch = crate::chunk_requests::ChunkBatchRequest {
                    promise: Mutex::new(Some(promise)),
                    info: batch_info.into(),
                };
                self.prioritizer.chunk_requests_mut().add(batch);
            }
        }
        Ok(res)
    }
}
/// Outcome of [`ChunkFetcher::finish`].
#[must_use]
pub struct ChunkFetchResult {
    /// Root chunks whose download was requested during this pass.
    pub(super) new_in_transit_chunks: Vec<ChunkId>,

    /// The time cursor the pass was prioritized around, if any.
    pub(super) time_cursor: Option<TimelinePoint>,
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use arrow::array::RecordBatch;
use re_byte_size::SizeBytes as _;
use re_chunk::{Chunk, EntityPath, RowId, TimeInt, Timeline};
use re_chunk_store::ChunkStore;
use re_log_encoding::RrdManifest;
use re_log_types::example_components::{MyPoint, MyPoints};
use re_log_types::{AbsoluteTimeRange, StoreId, StoreKind, TimePoint};
use re_types_core::ChunkId;
use crate::ChunkPromise;
use crate::rrd_manifest_index::RrdManifestIndex;
use super::*;
fn setup_test_recording(chunks: &[Arc<Chunk>]) -> (ChunkStore, RrdManifestIndex) {
let store_id = StoreId::random(StoreKind::Recording, "test");
let manifest = re_log_encoding::RrdManifest::build_in_memory_from_chunks(
store_id.clone(),
chunks.iter().map(|c| &**c),
)
.unwrap();
let mut store = ChunkStore::new(store_id, Default::default());
let _events = store.insert_rrd_manifest(manifest.clone());
let mut manifest_index = RrdManifestIndex::default();
manifest_index
.append(manifest, store.entity_tree())
.unwrap();
(store, manifest_index)
}
fn build_temporal_chunk(entity: &str, timeline: Timeline, time: i64) -> Arc<Chunk> {
let point = MyPoint::new(1.0, 1.0);
Arc::new(
Chunk::builder(EntityPath::from(entity))
.with_component_batch(
RowId::new(),
TimePoint::from_iter([(timeline, time)]),
(MyPoints::descriptor_points(), &[point] as _),
)
.build()
.unwrap(),
)
}
fn build_static_chunk(entity: &str) -> Arc<Chunk> {
let point = MyPoint::new(1.0, 1.0);
Arc::new(
Chunk::builder(EntityPath::from(entity))
.with_component_batch(
RowId::new(),
TimePoint::STATIC,
(MyPoints::descriptor_points(), &[point] as _),
)
.build()
.unwrap(),
)
}
fn chunk_ids_in_batch(rb: &RecordBatch) -> Vec<ChunkId> {
let col = rb
.column_by_name(RrdManifest::FIELD_CHUNK_ID)
.expect("missing chunk_id column");
let arr = col
.as_any()
.downcast_ref::<arrow::array::FixedSizeBinaryArray>()
.expect("chunk_id column should be FixedSizeBinaryArray");
ChunkId::try_slice_from_arrow(arr)
.expect("chunk_id should decode")
.to_vec()
}
fn recording_load_fn() -> (
impl Fn(RecordBatch) -> ChunkPromise,
Arc<re_mutex::Mutex<Vec<ChunkId>>>,
) {
let requested = Arc::new(re_mutex::Mutex::new(Vec::<ChunkId>::new()));
let out = Arc::clone(&requested);
let load = move |rb: RecordBatch| {
out.lock().extend(chunk_ids_in_batch(&rb));
poll_promise::Promise::from_ready(Ok(vec![]))
};
(load, requested)
}
struct FetchOutcome {
requested: Vec<ChunkId>,
result: ChunkFetchResult,
state: PrioritizationState,
}
fn run_fetch(
manifest_index: &mut RrdManifestIndex,
store: &ChunkStore,
budget: &mut RemainingByteBudget,
stage: FetchStage,
) -> FetchOutcome {
let options = ChunkPrefetchOptions::default();
let mut fetcher = manifest_index
.prepare_chunk_fetcher(store, &options, None, budget)
.expect("should create fetcher");
fetcher.fetch(budget, stage).unwrap();
let state = fetcher.state;
let (load, requested) = recording_load_fn();
let result = fetcher.finish(&load).unwrap();
let requested = std::mem::take(&mut *requested.lock());
FetchOutcome {
requested,
result,
state,
}
}
#[test]
fn fully_loaded_chunks_drop_protection_when_memory_budget_is_exhausted() {
let tl = Timeline::new_sequence("frame");
let chunks: Vec<Arc<Chunk>> = (0..5)
.map(|i| build_temporal_chunk(&format!("/entity_{i}"), tl, (i as i64 + 1) * 100))
.collect();
let (mut store, mut manifest_index) = setup_test_recording(&chunks);
for chunk in &chunks {
let events = store.insert_chunk(chunk).unwrap();
manifest_index.on_events(&store, &events);
}
let total_physical_bytes: u64 = store
.iter_physical_chunks()
.map(|c| Chunk::total_size_bytes(c.as_ref()))
.sum();
assert!(total_physical_bytes > 0);
let budget_bytes = total_physical_bytes / 2;
let mut budget = RemainingByteBudget::new(budget_bytes, u64::MAX);
let outcome = run_fetch(
&mut manifest_index,
&store,
&mut budget,
FetchStage::Everything,
);
assert!(
outcome.state.memory_budget_filled,
"budget should be exhausted with budget {budget_bytes} for {total_physical_bytes} total bytes"
);
assert!(
outcome.requested.is_empty(),
"fully-loaded chunks should never go through the load callback"
);
assert!(
outcome.result.new_in_transit_chunks.is_empty(),
"fully-loaded chunks should not transition to InTransit"
);
let protected = manifest_index.chunk_prioritizer().protected_chunks();
assert!(
protected.roots.len() < chunks.len(),
"budget fit {} out of {} root chunks; expected fewer than all",
protected.roots.len(),
chunks.len()
);
assert!(
!protected.roots.is_empty(),
"at least one chunk must fit in half the total budget"
);
for root_id in &protected.roots {
assert!(
chunks.iter().any(|c| c.id() == *root_id),
"protected root {root_id:?} should be one of the chunks we inserted"
);
}
}
/// A `Required` pass must request only the static chunk. After its result is fed back into the
/// index, an `Everything` pass must request exactly the three temporal chunks, no more and no less.
#[test]
fn required_pass_only_fetches_static_then_everything_fetches_the_rest() {
    let timeline = Timeline::new_sequence("frame");
    let static_chunk = build_static_chunk("/static_entity");
    let temporal_chunks: Vec<Arc<Chunk>> = (1..=3)
        .map(|step| build_temporal_chunk("/temporal_entity", timeline, step * 100))
        .collect();
    let all_chunks: Vec<Arc<Chunk>> = std::iter::once(Arc::clone(&static_chunk))
        .chain(temporal_chunks.iter().cloned())
        .collect();

    let (store, mut manifest_index) = setup_test_recording(&all_chunks);
    let mut budget = RemainingByteBudget::new(u64::MAX, u64::MAX);

    // Stage 1: only the static chunk is required.
    let required = run_fetch(&mut manifest_index, &store, &mut budget, FetchStage::Required);
    let static_only = vec![static_chunk.id()];
    assert_eq!(
        required.requested, static_only,
        "Required pass should only load the static chunk"
    );
    assert_eq!(
        required.result.new_in_transit_chunks, static_only,
        "only the static chunk should transition to InTransit"
    );
    manifest_index.handle_fetch_result(required.result);

    // Stage 2: everything else, i.e. the temporal chunks.
    let everything = run_fetch(&mut manifest_index, &store, &mut budget, FetchStage::Everything);
    let expected: HashSet<ChunkId> = temporal_chunks.iter().map(|c| c.id()).collect();

    let requested: HashSet<ChunkId> = everything.requested.iter().copied().collect();
    assert_eq!(
        requested, expected,
        "Everything pass should load exactly the remaining temporal chunks"
    );

    let in_transit: HashSet<_> = everything
        .result
        .new_in_transit_chunks
        .iter()
        .copied()
        .collect();
    assert_eq!(in_transit, expected);
}
/// With a memory budget of about two and a half chunks (sized from the manifest), an
/// `Everything` pass must request some, but not all, of the five chunks. The chunks it reports
/// as newly in transit must be exactly the ones handed to the load callback.
#[test]
fn memory_budget_caps_how_many_chunks_get_loaded() {
    let timeline = Timeline::new_sequence("frame");
    let chunks: Vec<Arc<Chunk>> = (1..=5)
        .map(|step| build_temporal_chunk("/e", timeline, step * 100))
        .collect();
    let (store, mut manifest_index) = setup_test_recording(&chunks);

    let rrd_manifest = manifest_index.manifest().unwrap();
    let one_chunk_size = rrd_manifest.col_chunk_byte_size_uncompressed()[0];
    assert!(
        one_chunk_size > 0,
        "manifest should report nonzero chunk size"
    );

    // Room for two whole chunks plus half of a third.
    let budget_bytes = 2 * one_chunk_size + one_chunk_size / 2;
    let mut budget = RemainingByteBudget::new(budget_bytes, u64::MAX);
    let outcome = run_fetch(&mut manifest_index, &store, &mut budget, FetchStage::Everything);

    assert!(
        outcome.state.memory_budget_filled,
        "memory budget should report as filled"
    );

    let num_requested = outcome.requested.len();
    let num_chunks = chunks.len();
    assert!(
        num_requested < num_chunks,
        "budget should cap the load: got {num_requested} out of {num_chunks}"
    );
    assert!(
        !outcome.requested.is_empty(),
        "budget should allow at least one chunk"
    );

    let in_transit: HashSet<_> = outcome
        .result
        .new_in_transit_chunks
        .iter()
        .copied()
        .collect();
    let requested: HashSet<_> = outcome.requested.iter().copied().collect();
    assert_eq!(
        in_transit, requested,
        "new_in_transit_chunks should match the IDs passed to the load callback"
    );
}
/// The `Similar` stage with a 3 s reach cap must stop looking ahead at the cap.
///
/// Chunks sit at t = 100, 200, 300, 400 and 500. The cursor starts at t = 0 with
/// `speed_if_unpaused = 100.0`, and the test expects t ≤ 300 to be requested and t ≥ 400 to be
/// skipped. The first chunk is additionally reported as a missing virtual chunk.
#[test]
fn similar_stage_skips_chunks_beyond_reach_time() {
    let tl = Timeline::new_sequence("frame");
    let chunks: Vec<Arc<Chunk>> = (0..5)
        .map(|i| build_temporal_chunk("/entity", tl, (i + 1) * 100))
        .collect();
    let (store, mut manifest_index) = setup_test_recording(&chunks);
    store.report_missing_virtual_chunk_id(chunks[0].id());

    let time_cursor = PrefetchTimeCursor {
        time_cursor: (tl, TimeInt::new_temporal(0)).into(),
        speed_if_unpaused: 100.0,
        loop_range: None,
    };
    let options = ChunkPrefetchOptions::default();
    let mut budget = RemainingByteBudget::new(u64::MAX, u64::MAX);
    let mut fetcher = manifest_index
        .prepare_chunk_fetcher(&store, &options, Some(time_cursor), &mut budget)
        .expect("should create fetcher");
    fetcher
        .fetch(
            &mut budget,
            FetchStage::Similar(Some(Duration::from_secs(3))),
        )
        .unwrap();

    let (load, requested) = recording_load_fn();
    let _result = fetcher.finish(&load).unwrap();
    let requested: HashSet<ChunkId> = requested.lock().iter().copied().collect();

    let within_cap: HashSet<ChunkId> = chunks[..=2].iter().map(|c| c.id()).collect();
    let beyond_cap: HashSet<ChunkId> = chunks[3..].iter().map(|c| c.id()).collect();

    // Equivalent to `within_cap.is_subset(&requested)`, but names the offenders on failure.
    let missing: Vec<&ChunkId> = within_cap.difference(&requested).collect();
    assert!(
        missing.is_empty(),
        "chunks within the reach cap were not requested: {missing:?}"
    );

    // Equivalent to `beyond_cap.is_disjoint(&requested)`, but names the offenders on failure.
    let unexpected: Vec<&ChunkId> = beyond_cap.intersection(&requested).collect();
    assert!(
        unexpected.is_empty(),
        "chunks beyond the reach cap were requested: {unexpected:?}"
    );
}
/// Without a reach cap (`Similar(None)`), the `Similar` stage must request every chunk on the
/// timeline. The cursor starts at t = 0, and the first chunk is also reported as missing.
#[test]
fn similar_stage_without_time_cap_fetches_all_reachable_chunks() {
    let timeline = Timeline::new_sequence("frame");
    let chunks: Vec<Arc<Chunk>> = (1..=5)
        .map(|step| build_temporal_chunk("/entity", timeline, step * 100))
        .collect();
    let (store, mut manifest_index) = setup_test_recording(&chunks);
    store.report_missing_virtual_chunk_id(chunks[0].id());

    let cursor = PrefetchTimeCursor {
        time_cursor: (timeline, TimeInt::new_temporal(0)).into(),
        speed_if_unpaused: 100.0,
        loop_range: None,
    };
    let options = ChunkPrefetchOptions::default();
    let mut budget = RemainingByteBudget::new(u64::MAX, u64::MAX);
    let mut fetcher = manifest_index
        .prepare_chunk_fetcher(&store, &options, Some(cursor), &mut budget)
        .expect("should create fetcher");
    fetcher.fetch(&mut budget, FetchStage::Similar(None)).unwrap();

    let (load, requested_ids) = recording_load_fn();
    let _result = fetcher.finish(&load).unwrap();

    let requested: HashSet<ChunkId> = requested_ids.lock().iter().copied().collect();
    let all_ids: HashSet<ChunkId> = chunks.iter().map(|c| c.id()).collect();
    assert_eq!(requested, all_ids);
}
/// The `Similar` stage must respect a loop range.
///
/// The loop range is `[400, 700]`, the cursor is at t = 500, and the reach cap is 3 s. Every chunk
/// inside the loop must be requested, including t = 450, which lies behind the cursor and is
/// only reachable by wrapping around. The chunk at t = 100 is reported as missing and is also
/// expected to be requested. The chunk at t = 900, past the loop end, must not be requested.
#[test]
fn similar_stage_honours_loop_wrap_around_and_excludes_chunks_outside_loop() {
    let tl = Timeline::new_sequence("frame");
    let chunks: Vec<Arc<Chunk>> = [100, 450, 500, 600, 700, 900]
        .into_iter()
        .map(|t| build_temporal_chunk("/entity", tl, t))
        .collect();
    let (store, mut manifest_index) = setup_test_recording(&chunks);
    store.report_missing_virtual_chunk_id(chunks[0].id());

    let time_cursor = PrefetchTimeCursor {
        time_cursor: (tl, TimeInt::new_temporal(500)).into(),
        speed_if_unpaused: 100.0,
        loop_range: Some(AbsoluteTimeRange::new(
            TimeInt::new_temporal(400),
            TimeInt::new_temporal(700),
        )),
    };
    let options = ChunkPrefetchOptions::default();
    let mut budget = RemainingByteBudget::new(u64::MAX, u64::MAX);
    let mut fetcher = manifest_index
        .prepare_chunk_fetcher(&store, &options, Some(time_cursor), &mut budget)
        .expect("should create fetcher");
    fetcher
        .fetch(
            &mut budget,
            FetchStage::Similar(Some(Duration::from_secs(3))),
        )
        .unwrap();

    let (load, requested) = recording_load_fn();
    let _result = fetcher.finish(&load).unwrap();
    let requested: HashSet<ChunkId> = requested.lock().iter().copied().collect();

    // chunks[0] (t=100, reported missing) plus everything inside the loop (t=450..=700).
    let inside_loop_and_required: HashSet<ChunkId> =
        chunks[..5].iter().map(|c| c.id()).collect();

    // Equivalent to `is_subset`, but names the chunks that were not requested on failure.
    let missing: Vec<&ChunkId> = inside_loop_and_required.difference(&requested).collect();
    assert!(
        missing.is_empty(),
        "chunks inside the loop (or required) were not requested: {missing:?}"
    );

    // t=900 lies past the loop end and must never be prefetched.
    let outside_loop = chunks[5].id();
    assert!(
        !requested.contains(&outside_loop),
        "chunk {outside_loop:?} lies outside the loop range but was requested"
    );
}
}