use std::collections::binary_heap::PeekMut;
use std::collections::{BTreeMap, BinaryHeap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use ahash::HashMap;
use arrow::datatypes::DataType;
use egui::NumExt as _;
use parking_lot::RwLock;
use re_byte_size::SizeBytes as _;
use re_chunk::{ChunkId, EntityPath, Span, Timeline, TimelineName};
use re_chunk_store::{ChunkDirectLineageReport, ChunkStoreDiff, ChunkStoreEvent};
use re_entity_db::EntityDb;
use re_log::{debug_assert, debug_panic};
use re_log_types::{EntityPathHash, TimeType};
use re_sdk_types::archetypes::VideoStream;
use re_sdk_types::components;
use re_video::{DecodeSettings, SampleMetadataState, StableIndexDeque};
use crate::Cache;
#[cfg(test)]
mod test_player;
/// A video stream that is ready for playback.
///
/// Wraps the renderer-side video object, which owns the stream's
/// [`re_video::VideoDataDescription`] (accessible via [`Self::video_descr`]).
pub struct PlayableVideoStream {
    pub video_renderer: re_renderer::video::Video,
}
impl re_byte_size::SizeBytes for PlayableVideoStream {
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring: adding a field without accounting
        // for it here becomes a compile error.
        let Self {
            video_renderer: renderer,
        } = self;
        renderer.heap_size_bytes()
    }
}
impl PlayableVideoStream {
    /// The description of the video data backing this stream.
    pub fn video_descr(&self) -> &re_video::VideoDataDescription {
        self.video_renderer.data_descr()
    }
}
/// A cached video stream plus the bookkeeping needed to keep it in sync with the chunk store.
struct VideoStreamCacheEntry {
    // Cleared in `begin_frame`, set on access in `entry`;
    // `purge_memory` drops entries that weren't accessed this frame.
    used_this_frame: AtomicBool,

    video_stream: Arc<RwLock<PlayableVideoStream>>,

    // For every chunk ingested into the stream: the range of sample indices it contributed.
    known_chunk_ranges: BTreeMap<ChunkId, ChunkSampleRange>,
}
impl re_byte_size::SizeBytes for VideoStreamCacheEntry {
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring: a new field must be either counted or explicitly ignored.
        let Self {
            used_this_frame: _,
            video_stream: stream,
            known_chunk_ranges: ranges,
        } = self;
        stream.read().heap_size_bytes() + ranges.heap_size_bytes()
    }
}
/// Cache key: there is one stream per (entity, timeline) pair.
#[derive(Hash, Eq, PartialEq)]
struct VideoStreamKey {
    entity_path: EntityPathHash,
    timeline: TimelineName,
}
impl re_byte_size::SizeBytes for VideoStreamKey {
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring so new fields can't be forgotten here.
        let Self {
            entity_path: path_hash,
            timeline: timeline_name,
        } = self;
        path_hash.heap_size_bytes() + timeline_name.heap_size_bytes()
    }
}
/// Cache of video streams assembled from `VideoStream` chunks, keyed per entity & timeline.
#[derive(Default)]
pub struct VideoStreamCache(HashMap<VideoStreamKey, VideoStreamCacheEntry>);
/// Errors that can occur while building or incrementally updating a video stream from chunks.
#[derive(thiserror::Error, Debug)]
pub enum VideoStreamProcessingError {
    #[error("No video samples.")]
    NoVideoSamplesFound,

    #[error("Unexpected arrow type for video sample {0}")]
    InvalidVideoSampleType(DataType),

    #[error("No codec specified.")]
    MissingCodec,

    /// The codec component exists on the timeline, but its chunk isn't loaded yet.
    #[error("Codec not loaded yet.")]
    UnloadedCodec,

    #[error("Failed to read codec - {0}")]
    FailedReadingCodec(Box<re_chunk::ChunkError>),

    #[error("Received video samples were not in chronological order.")]
    OutOfOrderSamples,

    /// The store's chunks changed in a way the incremental update logic can't follow.
    #[error("Chunks changed unexpectedly")]
    UnexpectedChunkChanges,
}
// Compile-time guard on the size of the error type — it travels inside `Result`s,
// so keeping it small keeps those `Result`s small.
const _: () = assert!(
    std::mem::size_of::<VideoStreamProcessingError>() <= 64,
    "Error type is too large. Try to reduce its size by boxing some of its variants.",
);
pub type SharablePlayableVideoStream = Arc<RwLock<PlayableVideoStream>>;
impl VideoStreamCache {
    /// Returns the video stream for the given entity & timeline, creating it on first access.
    ///
    /// On a cache miss this reads all currently stored video sample chunks for the entity
    /// and builds a fresh [`PlayableVideoStream`] from them. Marks the entry as used so it
    /// survives the next [`Cache::purge_memory`].
    ///
    /// # Errors
    ///
    /// Fails if the stored chunks can't be interpreted as a video stream
    /// (e.g. missing codec, unexpected arrow types, out-of-order samples).
    pub fn entry(
        &mut self,
        store: &re_entity_db::EntityDb,
        entity_path: &EntityPath,
        timeline: TimelineName,
        decode_settings: DecodeSettings,
    ) -> Result<SharablePlayableVideoStream, VideoStreamProcessingError> {
        re_tracing::profile_function!();

        let key = VideoStreamKey {
            entity_path: entity_path.hash(),
            timeline,
        };

        let entry = match self.0.entry(key) {
            std::collections::hash_map::Entry::Occupied(occupied_entry) => {
                occupied_entry.into_mut()
            }
            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
                // Cache miss: build the stream from everything currently in the store.
                let (video_descr, known_chunk_ranges) =
                    load_video_data_from_chunks(store, entity_path, timeline)?;
                let video = re_renderer::video::Video::load(
                    entity_path.to_string(),
                    video_descr,
                    decode_settings,
                );
                vacant_entry.insert(VideoStreamCacheEntry {
                    used_this_frame: AtomicBool::new(true),
                    video_stream: Arc::new(RwLock::new(PlayableVideoStream {
                        video_renderer: video,
                    })),
                    known_chunk_ranges,
                })
            }
        };

        entry.used_this_frame.store(true, Ordering::Release);
        Ok(entry.video_stream.clone())
    }

    /// Incrementally updates a cached stream for a single store event on a single timeline.
    ///
    /// Dispatches additions by their lineage (new / split / compacted chunk) and deletions to
    /// the respective handlers. On errors that can't be handled incrementally the cache entry
    /// is dropped, so the stream is rebuilt from scratch on next access.
    fn handle_store_event(
        &mut self,
        entity_db: &EntityDb,
        event: &ChunkStoreEvent,
        timeline: &Timeline,
        key: &VideoStreamKey,
    ) {
        // No cached stream for this entity/timeline -> nothing to update.
        let Some(entry) = self.0.get_mut(key) else {
            return;
        };

        let mut video_stream = entry.video_stream.write();
        let PlayableVideoStream { video_renderer } = &mut *video_stream;
        let video_data = video_renderer.data_descr_mut();
        video_data.delivery_method = re_video::VideoDeliveryMethod::new_stream();

        let timeline_name = *timeline.name();
        // Remember the encoding details so we can detect mid-stream changes afterwards.
        let encoding_details_before = video_data.encoding_details.clone();
        let mut insertion_info: Option<SampleReorderInfo> = None;

        let result = match &event.diff {
            ChunkStoreDiff::Addition(add) => {
                let chunk = &add.chunk_after_processing;
                // All processing below assumes the chunk is sorted on this timeline.
                let requires_sorting = !chunk.is_timeline_sorted(&timeline_name);
                let chunk = if requires_sorting {
                    &chunk.sorted_by_timeline_if_unsorted(&timeline_name)
                } else {
                    chunk
                };

                let result = if let Some(known_range) = entry.known_chunk_ranges.get(&chunk.id()) {
                    // We already allocated slots for this chunk -> just (re)load its payload.
                    read_samples_from_known_chunk(
                        timeline_name,
                        chunk,
                        known_range,
                        known_range,
                        video_data,
                    )
                } else {
                    match &add.direct_lineage {
                        ChunkDirectLineageReport::SplitFrom(original_chunk, siblings) => {
                            handle_split_chunk_addition(
                                timeline_name,
                                &mut entry.known_chunk_ranges,
                                video_data,
                                chunk,
                                original_chunk,
                                siblings,
                            )
                        }
                        ChunkDirectLineageReport::CompactedFrom(old_chunks) => {
                            handle_compacted_chunk_addition(
                                timeline_name,
                                &mut entry.known_chunk_ranges,
                                video_data,
                                chunk,
                                old_chunks,
                                requires_sorting,
                            )
                        }
                        _ => read_samples_from_new_chunk(
                            timeline_name,
                            chunk,
                            video_data,
                            &mut entry.known_chunk_ranges,
                        ),
                    }
                };

                match result {
                    Err(VideoStreamProcessingError::OutOfOrderSamples) => {
                        // The chunk doesn't append at the end; try merging it in the middle.
                        re_log::trace_once!(
                            "Out-of-order chunk detected, performing delta re-merge"
                        );
                        match handle_out_of_order_chunk(
                            video_data,
                            &mut entry.known_chunk_ranges,
                            timeline_name,
                            chunk,
                        ) {
                            Ok(info) => {
                                insertion_info = Some(info);
                                Ok(())
                            }
                            Err(err) => Err(err),
                        }
                    }
                    other => other,
                }
            }
            ChunkStoreDiff::Deletion(del) => {
                let known_ranges = &mut entry.known_chunk_ranges;
                handle_deletion(entity_db, timeline, video_data, &del.chunk, known_ranges)
            }
            ChunkStoreDiff::VirtualAddition(_) | ChunkStoreDiff::SchemaAddition(_) => Ok(()),
        };

        // Changing encoding details mid-stream (other than setting them for the first time)
        // is not supported — report it loudly.
        let encoding_details_changed = encoding_details_before != video_data.encoding_details;
        if encoding_details_changed
            && let Some(before) = encoding_details_before
            && let Some(after) = &video_data.encoding_details
        {
            let name = event
                .delta_chunk()
                .map(|c| c.entity_path().to_string())
                .unwrap_or_else(|| "<UNKNOWN>".to_owned());
            re_log::error_once!(
                "The video stream codec details of {name:?} changed (from {before:?} to {after:?}). This is not supported."
            );
        }
        // End of the `video_data` borrow; the renderer is borrowed as a whole below.
        let _ = video_data;

        if encoding_details_changed {
            video_renderer.reset_all_decoders();
        }
        if let Some(info) = insertion_info {
            // Samples were inserted mid-stream; let the renderer shift its bookkeeping.
            video_renderer.handle_sample_insertion(
                info.change_start,
                info.change_end,
                info.size_delta,
            );
        }

        match result {
            Ok(()) => {
                if cfg!(debug_assertions)
                    && let Err(err) = video_renderer.data_descr().sanity_check()
                {
                    debug_panic!(
                        "VideoDataDescription sanity check stream at {:?} failed: {err}",
                        event.delta_chunk().map(|c| c.entity_path())
                    );
                }
            }
            Err(VideoStreamProcessingError::OutOfOrderSamples) => {
                // `handle_out_of_order_chunk` should have resolved this; drop the entry so the
                // stream gets rebuilt from scratch on next access.
                debug_panic!(
                    "Unexpected chunk samples after `handle_out_of_order_chunk` has been called"
                );
                re_log::debug_once!(
                    "Found out of order samples after `handle_out_of_order_chunk` has been called"
                );
                drop(video_stream);
                self.0.remove(key);
            }
            Err(VideoStreamProcessingError::UnexpectedChunkChanges) => {
                re_log::debug_warn_once!("Unexpected chunk changes for video samples");
                drop(video_stream);
                self.0.remove(key);
            }
            Err(err) => {
                // Fixed garbled message: was "Failed to read process additional …".
                re_log::error_once!(
                    "Failed to process additional incoming video samples: {err}"
                );
            }
        }
    }
}
/// Updates the video stream after `deleted_chunk` was removed from the store.
///
/// Samples the deleted chunk contributed are either handed back to root chunks that the rrd
/// manifest still knows about (turning them into unloaded placeholders), or — when the deletion
/// shrinks the stream at its very start or end — removed from the sample deque. Deletions that
/// would leave an unaccounted hole in the middle yield
/// [`VideoStreamProcessingError::UnexpectedChunkChanges`].
fn handle_deletion(
    entity_db: &EntityDb,
    timeline: &Timeline,
    video_data: &mut re_video::VideoDataDescription,
    deleted_chunk: &re_chunk::Chunk,
    known_ranges: &mut BTreeMap<ChunkId, ChunkSampleRange>,
) -> Result<(), VideoStreamProcessingError> {
    // If we never ingested this chunk there's nothing to undo.
    let Some(known_range) = known_ranges.remove(&deleted_chunk.id()) else {
        return Ok(());
    };

    let storage_engine = entity_db.storage_engine();
    let store = storage_engine.store();

    // Manifest entries for this entity/timeline/sample-component, keyed by chunk id.
    // (`tmp` is an empty fallback for when there is no manifest data.)
    let tmp = BTreeMap::default();
    let per_chunk_map = entity_db
        .rrd_manifest_index()
        .manifest()
        .and_then(|manifest| manifest.temporal_map().get(deleted_chunk.entity_path()))
        .and_then(|per_timeline| {
            let per_component = per_timeline.get(timeline)?;
            per_component.get(&VideoStream::descriptor_sample().component)
        })
        .unwrap_or(&tmp);

    // Root chunks of the deleted chunk that the manifest still references — these can take
    // over ownership of the deleted chunk's samples (as unloaded placeholders).
    let mut rrd_manifest_chunks: Vec<_> = store
        .find_root_chunks(&deleted_chunk.id())
        .into_iter()
        .filter_map(|chunk_id| {
            let entry = per_chunk_map.get(&chunk_id)?;
            Some((entry, chunk_id))
        })
        .collect();

    // Within the deleted chunk's index range: count the samples it actually owned, and find
    // the min/max indices of samples owned by *other* chunks (initialized to out-of-range
    // sentinels so an all-owned range yields the full-range removal counts below).
    let (sample_count, other_min, other_max) = video_data
        .samples
        .iter_index_range_clamped(&known_range.idx_range())
        .fold(
            (
                0usize,
                video_data.samples.next_index(),
                video_data.samples.min_index(),
            ),
            |(mut count, mut other_min, mut other_max), (idx, sample)| {
                if sample.source_id() == deleted_chunk.id().as_tuid() {
                    count += 1;
                } else {
                    other_min = other_min.min(idx);
                    other_max = other_max.max(idx);
                }
                (count, other_min, other_max)
            },
        );

    // Samples the manifest chunks can still account for stay (unloaded);
    // any excess must be removed from the stream.
    let count_from_manifest = rrd_manifest_chunks
        .iter()
        .map(|(entry, _)| entry.num_rows as usize)
        .sum::<usize>();

    // How many samples *could* be removed at the start resp. end of the stream without
    // leaving a hole (only possible if the range touches the respective stream boundary).
    let possibly_start_removals = if known_range.first_sample == video_data.samples.min_index() {
        other_min - known_range.first_sample + 1
    } else {
        0
    };
    let possibly_end_removals = if known_range.last_sample + 1 == video_data.samples.next_index() {
        known_range.last_sample - other_max + 1
    } else {
        0
    };

    let required_removals = sample_count.saturating_sub(count_from_manifest);
    // Decide whether removals happen at the start or the end of the stream;
    // anything else can't be handled incrementally.
    let clear_start = if required_removals <= possibly_start_removals {
        required_removals > 0
    } else if required_removals <= possibly_end_removals {
        false
    } else {
        return Err(VideoStreamProcessingError::UnexpectedChunkChanges);
    };

    let mut samples = video_data
        .samples
        .iter_index_range_clamped_mut(&known_range.idx_range());

    // When clearing at the start we walk the samples from the back (and pair them with
    // manifest chunks in descending time order), so any unaccounted samples end up at the
    // start of the range — and vice versa.
    if clear_start {
        rrd_manifest_chunks
            .sort_unstable_by_key(|(entry, _)| std::cmp::Reverse(entry.time_range.min));
    } else {
        rrd_manifest_chunks.sort_unstable_by_key(|(entry, _)| entry.time_range.min);
    }

    // Re-attribute the deleted chunk's samples to the surviving manifest chunks, unloading them.
    'outer: for (entry, chunk_id) in rrd_manifest_chunks {
        for _ in 0..entry.num_rows {
            let sample = if clear_start {
                samples.next_back()
            } else {
                samples.next()
            };
            let Some((idx, sample)) = sample else {
                break 'outer;
            };
            // Samples owned by other chunks are untouched.
            if sample.source_id() != deleted_chunk.id().as_tuid() {
                continue;
            }

            known_ranges
                .entry(chunk_id)
                .and_modify(|range| {
                    range.add_sample(idx);
                })
                .or_insert_with(|| ChunkSampleRange::one(idx));
            sample.unload(Some(chunk_id.as_tuid()));
        }
    }
    drop(samples);

    // Actually drop the samples nobody accounts for anymore, from the chosen stream end.
    if required_removals > 0 {
        if clear_start {
            let to_index = video_data.samples.min_index() + required_removals - 1;
            video_data
                .samples
                .remove_all_with_index_smaller_equal(to_index);
        } else {
            let from_index = video_data.samples.next_index() - required_removals;
            video_data
                .samples
                .remove_all_with_index_larger_equal(from_index);
        }
    }

    // Keyframe indices may now point at removed samples; fix them up.
    adjust_keyframes_for_removed_samples(video_data);

    Ok(())
}
/// Handles a chunk addition that the store reports as a compaction of `old_chunks`.
///
/// Samples of old chunks we already ingested are re-attributed in place to the compacted
/// chunk; old chunks we never ingested get their samples appended as unloaded placeholders.
/// Afterwards, the payload is (re)loaded from the compacted chunk for the part that needs it.
fn handle_compacted_chunk_addition(
    timeline: TimelineName,
    known_chunk_ranges: &mut BTreeMap<ChunkId, ChunkSampleRange>,
    video_data: &mut re_video::VideoDataDescription,
    compacted_chunk: &re_chunk::Chunk,
    old_chunks: &BTreeMap<ChunkId, Arc<re_chunk::Chunk>>,
    chunk_order_changed: bool,
) -> Result<(), VideoStreamProcessingError> {
    // Partition the source chunks into ones we already track and ones we've never seen.
    let mut reused_chunks = Vec::new();
    let mut unseen_chunks = Vec::new();
    for (old_chunk_id, old_chunk) in old_chunks {
        if let Some(old_known_range) = known_chunk_ranges.remove(old_chunk_id) {
            reused_chunks.push((old_known_range, old_chunk));
        } else {
            unseen_chunks.push(old_chunk);
        }
    }

    if reused_chunks.is_empty() {
        // We tracked none of the sources -> treat the compacted chunk like a brand new one.
        return read_samples_from_new_chunk(
            timeline,
            compacted_chunk,
            video_data,
            known_chunk_ranges,
        );
    }

    // First sample index from which payloads actually need (re)loading, if any:
    // the earliest unloaded slot among the reused chunks, or — if there are unseen
    // chunks to append — the current end of the stream.
    let first_unloaded_sample_idx = reused_chunks
        .iter()
        .filter_map(|(range, chunk)| {
            video_data
                .samples
                .iter_index_range_clamped_mut(&range.idx_range())
                .filter(|(_, s)| s.source_id() == chunk.id().as_tuid())
                .find(|(_, s)| s.is_unloaded())
                .map(|(idx, _)| idx)
        })
        .min()
        .or_else(|| {
            if unseen_chunks.is_empty() {
                None
            } else {
                Some(video_data.samples.next_index())
            }
        });

    // Track the overall index range & counts of all samples now owned by the compacted chunk.
    let mut min = None;
    let mut max = None;
    let mut sample_count = 0;
    let mut sample_count_after_first_unloaded = 0;
    let mut update_min_max = |idx: re_video::SampleIndex| {
        let min = min.get_or_insert(idx);
        let max = max.get_or_insert(idx);
        *min = idx.min(*min);
        *max = idx.max(*max);
        sample_count += 1;
        if first_unloaded_sample_idx.is_some_and(|first_idx| first_idx <= idx) {
            sample_count_after_first_unloaded += 1;
        }
    };

    // Re-attribute already-tracked samples to the compacted chunk (in place).
    for (range, reused_chunk) in reused_chunks {
        for (idx, sample) in video_data
            .samples
            .iter_index_range_clamped_mut(&range.idx_range())
            .filter(|(_, s)| s.source_id() == reused_chunk.id().as_tuid())
        {
            update_min_max(idx);
            *sample.source_id_mut() = compacted_chunk.id().as_tuid();
        }
    }
    // Append unloaded placeholders for samples of chunks we never ingested.
    for unseen_chunk in unseen_chunks {
        let Some(chunk_samples) = ChunkSamples::from_physical(unseen_chunk, timeline) else {
            continue;
        };
        for mut sample in chunk_samples.samples {
            *sample.source_id_mut() = compacted_chunk.id().as_tuid();
            let idx = video_data.samples.next_index();
            update_min_max(idx);
            video_data.samples.push_back(sample);
        }
    }

    if let Some(first_sample) = min
        && let Some(last_sample) = max
    {
        let known_range = ChunkSampleRange::new(first_sample, last_sample, sample_count);
        // If the compacted chunk needed re-sorting, reload the whole range;
        // otherwise only from the first unloaded slot onwards.
        let load_range = if chunk_order_changed {
            &known_range
        } else {
            &ChunkSampleRange::new(
                first_unloaded_sample_idx.unwrap_or(first_sample),
                last_sample,
                sample_count_after_first_unloaded,
            )
        };
        let res = read_samples_from_known_chunk(
            timeline,
            compacted_chunk,
            &known_range,
            load_range,
            video_data,
        );
        known_chunk_ranges.insert(compacted_chunk.id(), known_range);
        // Reloading may have re-pushed keyframe indices that were already present.
        video_data.keyframe_indices.dedup();
        res
    } else {
        // No samples at all on this timeline.
        Ok(())
    }
}
/// Handles a chunk addition that the store reports as a split of `original_chunk`.
///
/// The sample slots previously owned by the original chunk are overwritten in place with the
/// time-merged samples of the split chunk and its siblings; afterwards the split chunk's
/// payload is loaded into its share of the slots.
fn handle_split_chunk_addition(
    timeline: TimelineName,
    known_chunk_ranges: &mut BTreeMap<ChunkId, ChunkSampleRange>,
    video_data: &mut re_video::VideoDataDescription,
    split_chunk: &re_chunk::Chunk,
    original_chunk: &re_chunk::Chunk,
    siblings: &[Arc<re_chunk::Chunk>],
) -> Result<(), VideoStreamProcessingError> {
    // If we never tracked the original, this is effectively a new chunk.
    let Some(old_known_range) = known_chunk_ranges.remove(&original_chunk.id()) else {
        return read_samples_from_new_chunk(timeline, split_chunk, video_data, known_chunk_ranges);
    };

    // The slots previously attributed to the original chunk.
    let mut samples = video_data
        .samples
        .iter_index_range_clamped_mut(&old_known_range.idx_range())
        .filter(|(_, s)| s.source_id() == original_chunk.id().as_tuid());

    // Merge the split chunk and its siblings by decode timestamp, writing each merged sample
    // into the next old slot; `flatten_chunk_samples` records the new per-chunk ranges.
    flatten_chunk_samples(
        std::iter::once(split_chunk)
            .chain(siblings.iter().map(|c| &**c))
            .filter_map(|chunk| ChunkSamples::from_physical(chunk, timeline))
            .collect(),
        known_chunk_ranges,
        |next_sample| {
            if let Some((idx, sample)) = samples.next() {
                *sample = next_sample;
                idx
            } else {
                // More merged samples than old slots shouldn't happen for a pure split.
                re_log::debug_warn_once!(
                    "Split chunks ended up with more samples than the original chunk?"
                );
                old_known_range.last_sample
            }
        },
    );

    // All old slots should have been consumed now (checked in debug builds).
    if cfg!(debug_assertions) {
        let samples = samples.collect::<Vec<_>>();
        debug_assert!(
            samples.is_empty(),
            "Expected no additional samples after processing split chunk, got {samples:?}"
        );
    } else {
        drop(samples);
    }

    if let Some(known_range) = known_chunk_ranges.get(&split_chunk.id()) {
        // Load the split chunk's payload into the slots that were just attributed to it.
        let res = read_samples_from_known_chunk(
            timeline,
            split_chunk,
            known_range,
            known_range,
            video_data,
        );
        adjust_keyframes_for_removed_samples(video_data);
        res
    } else {
        debug_panic!("This should've been inserted above in `handle_samples`");
        Ok(())
    }
}
/// Builds a fresh [`re_video::VideoDataDescription`] from all video sample chunks currently
/// stored for `entity_path` on `timeline`.
///
/// Also returns, per contributing chunk, the range of sample indices it covers — the
/// bookkeeping needed for later incremental updates.
///
/// # Errors
///
/// Fails if no codec can be determined or if chunk contents can't be read as video samples.
fn load_video_data_from_chunks(
    store: &re_entity_db::EntityDb,
    entity_path: &EntityPath,
    timeline: TimelineName,
) -> Result<
    (
        re_video::VideoDataDescription,
        BTreeMap<ChunkId, ChunkSampleRange>,
    ),
    VideoStreamProcessingError,
> {
    re_tracing::profile_function!();

    let sample_component = VideoStream::descriptor_sample().component;
    let codec_component = VideoStream::descriptor_codec().component;

    // Query the entire timeline for sample & codec chunks.
    let entire_timeline_query =
        re_chunk::RangeQuery::new(timeline, re_log_types::AbsoluteTimeRange::EVERYTHING);
    let query_results = store.storage_engine().cache().range(
        &entire_timeline_query,
        entity_path,
        [sample_component, codec_component],
    );
    let sample_chunks = query_results.get_required(sample_component).unwrap_or(&[]);
    let codec_chunks = query_results
        .get_required(codec_component)
        .map_err(|_err| {
            // Distinguish "codec chunk just not loaded yet" from "codec never logged".
            if store
                .storage_engine()
                .store()
                .entity_has_component_on_timeline(&timeline, entity_path, codec_component)
            {
                VideoStreamProcessingError::UnloadedCodec
            } else {
                VideoStreamProcessingError::MissingCodec
            }
        })?;

    // Take the codec from the last chunk the query returned.
    let last_codec = codec_chunks
        .last()
        .and_then(|chunk| chunk.component_instance::<components::VideoCodec>(codec_component, 0, 0))
        .ok_or(VideoStreamProcessingError::MissingCodec)?
        .map_err(|err| VideoStreamProcessingError::FailedReadingCodec(Box::new(err)))?;
    let codec = last_codec.into();

    let mut video_descr = re_video::VideoDataDescription {
        codec,
        // Filled in while reading samples (parsed from the bitstream).
        encoding_details: None,
        timescale: timescale_for_timeline(store, timeline),
        delivery_method: re_video::VideoDeliveryMethod::new_stream(),
        keyframe_indices: Vec::new(),
        samples: StableIndexDeque::with_capacity(sample_chunks.len()),
        samples_statistics: re_video::SamplesStatistics::NO_BFRAMES,
        mp4_tracks: Default::default(),
    };

    let mut known_chunk_ranges = BTreeMap::new();

    // Chunks the rrd manifest knows about for this entity/timeline/component (may be absent).
    let known_chunks = if let Some(manifest) = store.rrd_manifest_index().manifest()
        && let Some(entity_timelines) = manifest.temporal_map().get(entity_path)
        && let Some((_, components)) = entity_timelines.iter().find(|(t, _)| *t.name() == timeline)
        && let Some(chunks) = components.get(&sample_component)
    {
        chunks
    } else {
        &BTreeMap::new()
    };

    let sorted_chunks = sample_chunks
        .iter()
        .map(|c| c.sorted_by_timeline_if_unsorted(&timeline))
        .collect::<Vec<_>>();

    // First allocate sample slots (unloaded placeholders) for everything we know about…
    load_known_chunk_ranges(
        &mut video_descr,
        store.storage_engine().store(),
        &mut known_chunk_ranges,
        known_chunks,
        &sorted_chunks,
        timeline,
    );

    // …then fill in payload metadata from the chunks that are physically loaded.
    re_tracing::profile_scope!("read", format!("{} chunks", sorted_chunks.len()));
    for chunk in &sorted_chunks {
        let Some(known_range) = known_chunk_ranges.get(&chunk.id()) else {
            continue;
        };
        if let Err(err) = read_samples_from_known_chunk(
            timeline,
            chunk,
            known_range,
            known_range,
            &mut video_descr,
        ) {
            match err {
                VideoStreamProcessingError::OutOfOrderSamples => {
                    // Best effort: skip the offending data instead of failing the whole load.
                    re_log::warn_once!(
                        "Late insertions of video frames within an established video stream is not supported, some video data has been ignored."
                    );
                }
                err => return Err(err),
            }
        }
    }

    Ok((video_descr, known_chunk_ranges))
}
/// Picks the video timescale matching the timeline's time type.
///
/// Sequence timelines (and timelines the store doesn't know) have no timescale;
/// duration/timestamp timelines are nanosecond based.
fn timescale_for_timeline(
    store: &re_entity_db::EntityDb,
    timeline: TimelineName,
) -> Option<re_video::Timescale> {
    // Unknown timeline -> no timescale.
    match store.timelines().get(&timeline)?.typ() {
        TimeType::Sequence => None,
        TimeType::DurationNs | TimeType::TimestampNs => Some(re_video::Timescale::NANOSECOND),
    }
}
/// The range of sample indices within the video stream that a single chunk contributed.
#[derive(Debug, Clone)]
struct ChunkSampleRange {
    first_sample: re_video::SampleIndex,
    last_sample: re_video::SampleIndex,
    // Number of samples owned by the chunk inside the range; other chunks' samples
    // may be interleaved, so this can be less than the range length.
    sample_count: usize,
}
impl ChunkSampleRange {
    /// Creates a range spanning `first_sample..=last_sample` with `sample_count` owned samples.
    fn new(
        first_sample: re_video::SampleIndex,
        last_sample: re_video::SampleIndex,
        sample_count: usize,
    ) -> Self {
        // The chunk can't own more samples than the range has slots.
        debug_assert!(
            sample_count <= last_sample - first_sample + 1,
            "Counting sample indices twice? {sample_count} <= {}",
            last_sample - first_sample + 1,
        );
        Self {
            first_sample,
            last_sample,
            sample_count,
        }
    }

    /// Range consisting of a single sample.
    fn one(idx: re_video::SampleIndex) -> Self {
        Self::new(idx, idx, 1)
    }

    /// Grows the range to include `idx` and counts it as owned by the chunk.
    fn add_sample(&mut self, idx: re_video::SampleIndex) {
        self.first_sample = idx.min(self.first_sample);
        self.last_sample = idx.max(self.last_sample);
        self.sample_count += 1;
        debug_assert!(
            self.sample_count <= self.last_sample - self.first_sample + 1,
            "Counting sample indices twice?",
        );
    }

    /// The covered indices as a half-open range (`first..=last` expressed as `first..last+1`).
    fn idx_range(&self) -> std::ops::Range<re_video::SampleIndex> {
        self.first_sample..(self.last_sample + 1)
    }
}
impl re_byte_size::SizeBytes for ChunkSampleRange {
    fn heap_size_bytes(&self) -> u64 {
        // All fields are stored inline; nothing lives on the heap.
        0
    }
}
/// Loads sample payload metadata from `chunk` into already-allocated sample slots.
///
/// `known_range` is the full index range attributed to this chunk, while `load_range` is the
/// (sub)range that should actually be (re)filled — rows preceding `load_range` are skipped.
/// Keyframe indices covering the load range are regenerated as a side effect.
fn read_samples_from_known_chunk(
    timeline: TimelineName,
    chunk: &re_chunk::Chunk,
    known_range: &ChunkSampleRange,
    load_range: &ChunkSampleRange,
    video_descr: &mut re_video::VideoDataDescription,
) -> Result<(), VideoStreamProcessingError> {
    re_tracing::profile_function!(format!("{} rows", chunk.num_rows()));

    let re_video::VideoDataDescription {
        codec,
        samples,
        keyframe_indices,
        encoding_details,
        ..
    } = video_descr;

    let sample_component = VideoStream::descriptor_sample().component;
    let Some(raw_array) = chunk.raw_component_array(sample_component) else {
        // Chunk has no sample payload at all; nothing to load.
        return Ok(());
    };
    let (offsets, values) = re_arrow_util::blob_arrays_offsets_and_buffer(&raw_array).ok_or(
        VideoStreamProcessingError::InvalidVideoSampleType(raw_array.data_type().clone()),
    )?;
    let lengths = offsets.lengths().collect::<Vec<_>>();

    // Temporarily remove keyframe indices at/after the load range's start; keyframes inside
    // the loaded range get regenerated from the samples, the rest is re-appended below.
    let split_idx = keyframe_indices
        .binary_search(&load_range.first_sample)
        .unwrap_or_else(|e| e);
    let end_keyframes = keyframe_indices.drain(split_idx..).collect::<Vec<_>>();

    // The pre-allocated slots this chunk owns within the load range.
    let mut samples_iter = samples
        .iter_index_range_clamped_mut(&load_range.idx_range())
        .filter(|(_, c)| c.source_id() == chunk.id().as_tuid());

    for (component_offset, (time, _row_id)) in chunk
        .iter_component_offsets(sample_component)
        .zip(chunk.iter_component_indices(timeline, sample_component))
        .filter(|(component_offset, _)| component_offset.len > 0)
        // Skip rows belonging to the part of `known_range` that precedes `load_range`.
        .skip(
            known_range
                .sample_count
                .saturating_sub(load_range.sample_count),
        )
        .take(load_range.sample_count)
    {
        if component_offset.len != 1 {
            re_log::warn_once!(
                "Expected only a single VideoSample per row (it is a mono-component)"
            );
            continue;
        }

        let Some((sample_idx, sample)) = samples_iter.next() else {
            re_log::error!("Failed to add all video stream samples from chunk");
            break;
        };

        // Byte range of this sample's payload within the chunk's blob buffer.
        let byte_span = Span {
            start: offsets[component_offset.start] as usize,
            len: lengths[component_offset.start],
        };
        let sample_bytes = &values[byte_span.range()];
        let Some(byte_span) = byte_span.try_cast::<u32>() else {
            re_log::warn_once!("Video byte range does not fit in u32: {byte_span:?}");
            continue;
        };

        let decode_timestamp = re_video::Time(time.as_i64());
        let is_sync = is_sample_sync(codec, encoding_details, sample_bytes);
        if is_sync {
            keyframe_indices.push(sample_idx);
        }

        // Presentation timestamp mirrors the decode timestamp (no frame reordering here).
        *sample = SampleMetadataState::Present(re_video::SampleMetadata {
            is_sync,
            frame_nr: sample_idx as u32,
            decode_timestamp,
            presentation_timestamp: decode_timestamp,
            duration: None,
            source_id: chunk.id().as_tuid(),
            byte_span,
        });
    }

    {
        // Debug check: all slots attributed to this chunk should have been consumed.
        let _samples_iter = samples_iter;
        re_log::debug_assert_eq!(
            _samples_iter
                .map(|(idx, s)| (idx, s.source_id()))
                .collect::<Vec<_>>()
                .as_slice(),
            &[],
            "Known video sample chunk '{:?}' didn't fill up all pre-allocated samples",
            chunk.entity_path()
        );
    }

    // Re-append the drained keyframes; the `n` of them that fall inside `known_range` may
    // interleave with the freshly pushed ones, so that region gets sorted.
    let n = end_keyframes.partition_point(|sample_idx| *sample_idx <= known_range.last_sample);
    let sort_to = keyframe_indices.len() + n;
    keyframe_indices.extend(end_keyframes);
    if n > 0 {
        keyframe_indices[split_idx..sort_to].sort_unstable();
    }

    update_sample_durations(known_range.idx_range(), samples)?;

    Ok(())
}
/// Returns whether `sample_bytes` starts a new GOP, i.e. whether the sample is a keyframe.
///
/// On a detected GOP start, `encoding_details` is updated with the details parsed from the
/// bitstream; a mid-stream change of those details is logged here (the caller decides how to
/// react). Detection failures are logged and treated as "not a keyframe".
fn is_sample_sync(
    codec: &re_video::VideoCodec,
    encoding_details: &mut Option<re_video::VideoEncodingDetails>,
    sample_bytes: &[u8],
) -> bool {
    match re_video::detect_gop_start(sample_bytes, *codec) {
        Ok(re_video::GopStartDetection::StartOfGop(new_encoding_details)) => {
            if encoding_details.as_ref() != Some(&new_encoding_details) {
                // Only warn when the details *changed*, not when they're set the first time.
                if let Some(old_encoding_details) = encoding_details.as_ref() {
                    re_log::warn_once!(
                        "Detected change of video encoding properties (like size, bit depth, compression etc.) over time. \
                        This is not supported and may cause playback issues."
                    );
                    re_log::trace!(
                        "Previous encoding details: {:?}\n\nNew encoding details: {:?}",
                        old_encoding_details,
                        new_encoding_details
                    );
                }
                *encoding_details = Some(new_encoding_details);
            }
            true
        }
        Ok(re_video::GopStartDetection::NotStartOfGop) => false,
        Err(err) => {
            re_log::error_once!("Failed to detect GOP for video sample: {err}");
            false
        }
    }
}
/// Recomputes sample durations in and around the given index range.
///
/// A present sample's duration is the PTS delta to the next present sample, and is only set
/// when no unloaded sample sits in between (otherwise the true successor is unknown). The
/// scanned range is widened on both sides until a loaded sample (or the deque boundary) is hit,
/// so durations across the range's edges get fixed up too.
///
/// # Errors
///
/// Returns [`VideoStreamProcessingError::OutOfOrderSamples`] if presentation timestamps
/// decrease within the scanned range.
fn update_sample_durations(
    known_range: std::ops::Range<re_video::SampleIndex>,
    samples: &mut StableIndexDeque<SampleMetadataState>,
) -> Result<(), VideoStreamProcessingError> {
    // Widen the range backwards until (and including) the nearest loaded sample…
    let mut start = known_range.start.at_least(samples.min_index());
    while let Some(new_start) = start.checked_sub(1)
        && let Some(sample) = samples.get(new_start)
    {
        start = new_start;
        if sample.is_loaded() {
            break;
        }
    }
    // …and forwards likewise.
    let mut end = known_range.end.at_most(samples.next_index());
    while let Some(new_end) = end.checked_add(1)
        && let Some(sample) = samples.get(new_end - 1)
    {
        end = new_end;
        if sample.is_loaded() {
            break;
        }
    }

    // Bookkeeping about the previous *present* sample seen during the scan.
    struct PresentSampleMeta {
        idx: re_video::SampleIndex,
        // False if an unloaded sample was seen since, making the PTS delta unreliable.
        range_fully_loaded: bool,
        pts: re_video::Time,
    }

    let mut last_present_sample = None::<PresentSampleMeta>;
    for sample_idx in start..end {
        let sample = match &samples[sample_idx] {
            SampleMetadataState::Present(sample) => sample,
            SampleMetadataState::Unloaded { .. } => {
                // Gap: the previous present sample's duration can't be trusted.
                if let Some(last_sample) = &mut last_present_sample {
                    last_sample.range_fully_loaded = false;
                }
                continue;
            }
        };
        let current = sample.presentation_timestamp;

        if let Some(last_sample_meta) = last_present_sample
            && let Some(last_sample) = samples[last_sample_meta.idx].sample_mut()
        {
            let duration = current - last_sample_meta.pts;
            if duration.0 < 0 {
                return Err(VideoStreamProcessingError::OutOfOrderSamples);
            }
            // Only set the duration when there was no unloaded sample in between.
            if last_sample_meta.range_fully_loaded {
                last_sample.duration = Some(duration);
            }
        }

        last_present_sample = Some(PresentSampleMeta {
            idx: sample_idx,
            pts: current,
            range_fully_loaded: true,
        });
    }

    Ok(())
}
/// Appends all samples of a not-previously-tracked chunk to the end of the video stream.
///
/// # Errors
///
/// Returns [`VideoStreamProcessingError::OutOfOrderSamples`] if the chunk's time range starts
/// before the last known presentation timestamp — appending would break monotonicity, and the
/// caller has to fall back to a mid-stream merge.
fn read_samples_from_new_chunk(
    timeline: TimelineName,
    chunk: &re_chunk::Chunk,
    video_descr: &mut re_video::VideoDataDescription,
    known_ranges: &mut BTreeMap<ChunkId, ChunkSampleRange>,
) -> Result<(), VideoStreamProcessingError> {
    re_tracing::profile_function!();

    let re_video::VideoDataDescription {
        codec,
        samples,
        keyframe_indices,
        encoding_details,
        ..
    } = video_descr;

    let sample_component = VideoStream::descriptor_sample().component;
    let Some(raw_array) = chunk.raw_component_array(sample_component) else {
        return Ok(());
    };

    // Latest PTS currently in the stream; the new chunk must start at or after it.
    let mut previous_max_presentation_timestamp = samples
        .iter()
        .rev()
        .find_map(|s| s.sample())
        .map_or(re_video::Time::MIN, |s| s.presentation_timestamp);

    let time_ranges = chunk.time_range_per_component();
    match time_ranges
        .get(&timeline)
        .and_then(|time_range| time_range.get(&sample_component))
    {
        Some(time_range) => {
            if time_range.min().as_i64() < previous_max_presentation_timestamp.0 {
                return Err(VideoStreamProcessingError::OutOfOrderSamples);
            }
        }
        None => {
            // No samples on this timeline.
            return Ok(());
        }
    }

    let (offsets, values) = re_arrow_util::blob_arrays_offsets_and_buffer(&raw_array).ok_or(
        VideoStreamProcessingError::InvalidVideoSampleType(raw_array.data_type().clone()),
    )?;
    let lengths = offsets.lengths().collect::<Vec<_>>();

    let sample_base_idx = samples.next_index();
    let chunk_id = chunk.id();

    samples.extend(
        chunk
            .iter_component_offsets(sample_component)
            .zip(chunk.iter_component_indices(timeline, sample_component))
            .enumerate()
            .filter_map(move |(idx, (component_offset, (time, _row_id)))| {
                // Empty rows carry no sample.
                if component_offset.len == 0 {
                    return None;
                }
                if component_offset.len != 1 {
                    re_log::warn_once!(
                        "Expected only a single VideoSample per row (it is a mono-component)"
                    );
                    return None;
                }

                // NOTE(review): `idx` enumerates *all* rows, including skipped ones, while the
                // deque only receives the kept rows — if any row is skipped, `sample_idx` could
                // drift from the sample's actual deque index. Verify whether that can happen.
                let sample_idx = sample_base_idx + idx;
                // Byte range of this sample's payload within the chunk's blob buffer.
                let byte_span = Span {
                    start: offsets[component_offset.start] as usize,
                    len: lengths[component_offset.start],
                };
                let sample_bytes = &values[byte_span.range()];
                let Some(byte_span) = byte_span.try_cast::<u32>() else {
                    re_log::warn_once!("Video byte range does not fit in u32: {byte_span:?}");
                    return None;
                };

                let decode_timestamp = re_video::Time(time.as_i64());
                debug_assert!(decode_timestamp >= previous_max_presentation_timestamp);
                previous_max_presentation_timestamp = decode_timestamp;

                let is_sync = is_sample_sync(codec, encoding_details, sample_bytes);
                if is_sync {
                    keyframe_indices.push(sample_idx);
                }

                // Presentation timestamp mirrors the decode timestamp (no frame reordering).
                Some(SampleMetadataState::Present(re_video::SampleMetadata {
                    is_sync,
                    frame_nr: sample_idx as u32,
                    decode_timestamp,
                    presentation_timestamp: decode_timestamp,
                    duration: None,
                    source_id: chunk_id.as_tuid(),
                    byte_span,
                }))
            }),
    );

    if sample_base_idx == samples.next_index() {
        // Every row was skipped; nothing to register.
        return Ok(());
    }

    let last_sample = samples.next_index().saturating_sub(1);
    let sample_count = samples.next_index() - sample_base_idx;
    let chunk_range = ChunkSampleRange::new(sample_base_idx, last_sample, sample_count);
    update_sample_durations(chunk_range.idx_range(), samples)?;
    known_ranges.insert(chunk.id(), chunk_range);

    Ok(())
}
impl Cache for VideoStreamCache {
    fn begin_frame(&mut self) {
        #[expect(clippy::iter_over_hash_type)]
        for entry in self.0.values_mut() {
            // Reset the usage flag; `Self::entry` sets it again on access this frame.
            entry.used_this_frame.store(false, Ordering::Release);
            let video_stream = entry.video_stream.write();
            video_stream.video_renderer.begin_frame();
        }
    }

    fn name(&self) -> &'static str {
        "VideoStreamCache"
    }

    fn purge_memory(&mut self) {
        // Keep only the streams that were accessed since the last `begin_frame`.
        self.0
            .retain(|_, entry| entry.used_this_frame.load(Ordering::Acquire));
    }

    fn on_store_events(&mut self, events: &[&ChunkStoreEvent], entity_db: &EntityDb) {
        re_tracing::profile_function!();

        let sample_component = VideoStream::descriptor_sample().component;
        for event in events {
            // On virtual additions drop everything; entries are rebuilt on next access.
            if event.is_virtual_addition() {
                self.0.clear();
                continue;
            }

            let Some(delta_chunk) = event.delta_chunk() else {
                continue;
            };

            // Only chunks carrying video samples are relevant to this cache.
            if !delta_chunk
                .components()
                .contains_component(sample_component)
            {
                continue;
            }

            // A chunk may be indexed on several timelines; each has its own cache entry.
            #[expect(clippy::iter_over_hash_type)]
            for col in delta_chunk.timelines().values() {
                let timeline = col.timeline();
                self.handle_store_event(
                    entity_db,
                    event,
                    timeline,
                    &VideoStreamKey {
                        entity_path: delta_chunk.entity_path().hash(),
                        timeline: *timeline.name(),
                    },
                );
            }
        }
    }
}
impl re_byte_size::MemUsageTreeCapture for VideoStreamCache {
    /// Reports the cache's total memory usage as a single leaf node.
    fn capture_mem_usage_tree(&self) -> re_byte_size::MemUsageTree {
        let total = self.0.total_size_bytes();
        re_byte_size::MemUsageTree::Bytes(total)
    }
}
/// Queue of sample metadata contributed by a single chunk.
///
/// When built from a physical chunk the queue is sorted by decode timestamp
/// (front = earliest); it is never empty by construction.
struct ChunkSamples {
    samples: VecDeque<SampleMetadataState>,
}
impl ChunkSamples {
fn from_physical(chunk: &re_chunk::Chunk, timeline: TimelineName) -> Option<Self> {
let mut samples: Vec<_> = chunk
.iter_component_timepoints(VideoStream::descriptor_sample().component)
.filter_map(|t| t.get(&timeline))
.map(|time| SampleMetadataState::Unloaded {
source_id: chunk.id().as_tuid(),
min_dts: re_video::Time::new(time.get()),
})
.collect();
if samples.is_empty() {
return None;
}
samples.sort_by_key(|s| s.decode_timestamp());
Some(Self {
samples: VecDeque::from(samples),
})
}
fn from_root(
id: ChunkId,
entry: &re_log_encoding::RrdManifestTemporalMapEntry,
) -> Option<Self> {
if entry.num_rows == 0 {
return None;
}
Some(Self {
samples: (0..entry.num_rows)
.map(|_| SampleMetadataState::Unloaded {
source_id: id.as_tuid(),
min_dts: re_video::Time::new(entry.time_range.min.as_i64()),
})
.collect(),
})
}
fn min_time(&self) -> re_video::Time {
let front = self.samples.front();
debug_assert!(front.is_some(), "`ChunkSamples` should never be empty");
front
.map(|s| s.decode_timestamp())
.unwrap_or(re_video::Time::MAX)
}
}
impl PartialEq for ChunkSamples {
    /// Equality is keyed on the earliest decode timestamp only, consistent with [`Ord`].
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.samples.front().map(|s| s.decode_timestamp());
        let rhs = other.samples.front().map(|s| s.decode_timestamp());
        lhs == rhs
    }
}

impl Eq for ChunkSamples {}
impl PartialOrd for ChunkSamples {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Delegate to the total order defined by `Ord`.
        Some(self.cmp(other))
    }
}
impl Ord for ChunkSamples {
    /// Reversed timestamp order: the queue with the *smaller* earliest timestamp compares
    /// greater, so a max-`BinaryHeap` of [`ChunkSamples`] pops the earliest samples first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        match (self.samples.front(), other.samples.front()) {
            // Note the swapped operands: this is the reversed comparison.
            (Some(lhs), Some(rhs)) => rhs.decode_timestamp().cmp(&lhs.decode_timestamp()),
            (Some(_), None) => Ordering::Greater,
            (None, Some(_)) => Ordering::Less,
            (None, None) => Ordering::Equal,
        }
    }
}
/// Heap of per-chunk sample queues, used for k-way merging samples by decode timestamp.
#[derive(Default)]
struct ChunkSampleIterators {
    // `ChunkSamples::cmp` is reversed, so the heap's max element is the earliest timestamp.
    iterators: BinaryHeap<ChunkSamples>,
}
impl ChunkSampleIterators {
    /// Adds another chunk's sample queue to the merge.
    fn add_chunk(&mut self, samples: ChunkSamples) {
        self.iterators.push(samples);
    }

    /// Pops the globally earliest sample, but only if its decode timestamp satisfies `f`.
    fn next_if(&mut self, f: impl FnOnce(re_video::Time) -> bool) -> Option<SampleMetadataState> {
        let mut next = self.iterators.peek_mut()?;
        debug_assert!(
            next.samples.front().is_some(),
            "We make sure to never keep empty queues around here"
        );
        if !f(next.samples.front()?.decode_timestamp()) {
            return None;
        }
        let sample = next.samples.pop_front()?;
        // Remove exhausted queues from the heap, maintaining the "never empty" invariant.
        if next.samples.is_empty() {
            PeekMut::pop(next);
        }
        Some(sample)
    }

    /// Drains samples passing `predicate` in timestamp order, feeding each to `handle_sample`
    /// and recording the resulting index in the per-source-chunk ranges.
    fn handle_samples(
        &mut self,
        known_chunk_ranges: &mut BTreeMap<ChunkId, ChunkSampleRange>,
        predicate: impl Fn(re_video::Time) -> bool,
        mut handle_sample: impl FnMut(SampleMetadataState) -> usize,
    ) {
        while let Some(sample) = self.next_if(&predicate) {
            let id = sample.source_id();
            // `handle_sample` decides where the sample lands and returns that index.
            let idx = handle_sample(sample);
            known_chunk_ranges
                .entry(ChunkId::from_tuid(id))
                .and_modify(|range| {
                    range.add_sample(idx);
                })
                .or_insert_with(|| ChunkSampleRange::one(idx));
        }
    }
}
/// Merges per-chunk sample queues into one decode-timestamp-ordered stream.
///
/// Each merged sample is passed to `handle_sample`, and the index it returns
/// is recorded in `known_chunk_ranges` under the sample's source chunk.
fn flatten_chunk_samples(
    mut samples: Vec<ChunkSamples>,
    known_chunk_ranges: &mut BTreeMap<ChunkId, ChunkSampleRange>,
    mut handle_sample: impl FnMut(SampleMetadataState) -> usize,
) {
    // Stable sort: chunks sharing a start time keep their incoming order.
    samples.sort_by_key(|chunk| chunk.min_time());

    let mut merger = ChunkSampleIterators::default();
    for chunk in samples {
        let chunk_start = chunk.min_time();
        // Drain everything that must be emitted no later than the next
        // chunk's first sample…
        merger.handle_samples(known_chunk_ranges, |t| t <= chunk_start, &mut handle_sample);
        // …then make the new chunk available for merging.
        merger.add_chunk(chunk);
    }

    // Flush the remainder once all chunks are in the heap.
    merger.handle_samples(known_chunk_ranges, |_| true, &mut handle_sample);
}
/// Populates `data_descr.samples` (and `known_chunk_ranges`) from both loaded
/// chunks and manifest entries of chunks that are not loaded yet.
///
/// Manifest rows already covered by loaded physical chunks are subtracted so
/// samples are not counted twice.
fn load_known_chunk_ranges(
    data_descr: &mut re_video::VideoDataDescription,
    store: &re_chunk_store::ChunkStore,
    known_chunk_ranges: &mut BTreeMap<ChunkId, ChunkSampleRange>,
    chunks_from_manifest: &BTreeMap<ChunkId, re_log_encoding::RrdManifestTemporalMapEntry>,
    loaded_chunks: &[re_chunk::Chunk],
    timeline: TimelineName,
) {
    re_tracing::profile_function!();

    // Per root chunk: how many samples are already backed by loaded physical chunks.
    let mut rows_covered_per_root: ahash::HashMap<re_chunk::ChunkId, u64> = HashMap::default();
    for chunk in loaded_chunks {
        let Some(chunk_samples) = ChunkSamples::from_physical(chunk, timeline) else {
            continue;
        };
        let num_samples = chunk_samples.samples.len() as u64;
        for root_id in store.find_root_chunks(&chunk.id()) {
            *rows_covered_per_root.entry(root_id).or_default() += num_samples;
        }
    }

    // First the manifest-derived placeholder samples (for rows with no loaded
    // backing), then the real samples from loaded chunks — this order matters
    // for the stable sort inside `flatten_chunk_samples` on start-time ties.
    let mut all_samples: Vec<ChunkSamples> = Vec::new();
    for (id, entry) in chunks_from_manifest {
        let covered = rows_covered_per_root.get(id).copied().unwrap_or(0);
        let remaining = entry.num_rows.saturating_sub(covered);
        if remaining == 0 {
            continue; // Fully covered by loaded chunks, nothing left to announce.
        }
        let trimmed_entry = re_log_encoding::RrdManifestTemporalMapEntry {
            num_rows: remaining,
            ..*entry
        };
        if let Some(chunk_samples) = ChunkSamples::from_root(*id, &trimmed_entry) {
            all_samples.push(chunk_samples);
        }
    }
    all_samples.extend(
        loaded_chunks
            .iter()
            .filter_map(|c| ChunkSamples::from_physical(c, timeline)),
    );

    flatten_chunk_samples(all_samples, known_chunk_ranges, |sample| {
        let idx = data_descr.samples.next_index();
        data_descr.samples.push_back(sample);
        idx
    });
}
/// Describes how re-integrating an out-of-order chunk changed the sample list.
///
/// Returned by `handle_out_of_order_chunk`; callers must use it to patch any
/// state of their own that refers to samples by index.
#[must_use]
pub struct SampleReorderInfo {
    /// First sample index (inclusive) of the rebuilt window.
    pub change_start: re_video::SampleIndex,
    /// Last sample index (inclusive, in pre-reorder indexing) of the rebuilt window.
    pub change_end: re_video::SampleIndex,
    /// How many samples the total list grew (positive) or shrank (negative).
    pub size_delta: isize,
}
/// Determines the inclusive sample-index range that must be rebuilt when
/// `conflicting_chunk` arrives out of (decode-timestamp) order.
///
/// The range covers all samples whose timestamps overlap the chunk's own
/// span, widened backwards to the previous keyframe and to the chunk's
/// previously `known_range` (if any), so decoding can restart from a sync
/// sample.
///
/// # Errors
/// Returns [`VideoStreamProcessingError::UnexpectedChunkChanges`] if no
/// overlapping sample is found at all (a logic error — debug-panics first).
fn find_affected_sample_range(
    video_descr: &re_video::VideoDataDescription,
    conflicting_chunk: &re_chunk::Chunk,
    conflicting_chunk_samples: &ChunkSamples,
    known_range: Option<&ChunkSampleRange>,
) -> Result<std::ops::RangeInclusive<re_video::SampleIndex>, VideoStreamProcessingError> {
    // Decode-timestamp span covered by the conflicting chunk itself.
    let affected_range_min = conflicting_chunk_samples
        .samples
        .front()
        .expect("ChunkSamples::from_physical guarantees non-empty")
        .decode_timestamp();
    let affected_range_max = conflicting_chunk_samples
        .samples
        .back()
        .expect("ChunkSamples::from_physical guarantees non-empty")
        .decode_timestamp();

    // Keyframe right before the first sample that either belongs to the chunk
    // or starts after the chunk's first timestamp.
    let start_keyframe = video_descr
        .keyframe_indices
        .iter()
        .position(|idx| {
            video_descr.samples.get(*idx).is_some_and(|s| {
                s.source_id() == conflicting_chunk.id().as_tuid()
                    || s.decode_timestamp() > affected_range_min
            })
        })
        .and_then(|idx| idx.checked_sub(1)); // None if the very first keyframe already qualifies.
    let start = start_keyframe
        .map(|idx| video_descr.keyframe_indices[idx])
        .unwrap_or_else(|| video_descr.samples.min_index());

    // Scan from the earliest candidate (keyframe start or the chunk's old
    // first sample) through the end of the sample list.
    let range = known_range
        .as_ref()
        .map(|r| r.first_sample.min(start))
        .unwrap_or(start)..video_descr.samples.next_index();

    let mut start_sample = None;
    let mut end_sample = None;
    let mut last_timestamp = None;
    for (idx, sample) in video_descr.samples.iter_index_range_clamped(&range) {
        // Samples already attributed to this chunk always belong to the range.
        if sample.source_id() == conflicting_chunk.id().as_tuid() {
            if start_sample.is_none() {
                start_sample = Some(idx);
            }
            end_sample = Some(idx);
            continue;
        }

        if sample.is_loaded() {
            // Loaded foreign samples are expected to be monotonically increasing.
            if let Some(last) = last_timestamp
                && last > sample.decode_timestamp()
            {
                debug_panic!(
                    "Found non monotonically increasing decode timestamp for loaded sample that shouldn't conflict"
                );
            }
            last_timestamp = Some(sample.decode_timestamp());
        }

        // The first foreign sample at/after the chunk's first timestamp opens the range.
        if start_sample.is_none() && sample.decode_timestamp() >= affected_range_min {
            start_sample = Some(idx);
        }
        if start_sample.is_some() {
            // Close the range once we're past the chunk's span AND past its old known range.
            if sample.is_loaded()
                && sample.decode_timestamp() > affected_range_max
                && known_range.as_ref().is_none_or(|r| r.last_sample < idx)
            {
                break;
            }
            end_sample = Some(idx);
        }
    }

    // A range that only ever opened (single overlapping sample) ends where it started.
    let (Some(start_sample), Some(end_sample)) = (start_sample, end_sample.or(start_sample)) else {
        debug_panic!("There wasn't any conflicting samples for passed chunk");
        return Err(VideoStreamProcessingError::UnexpectedChunkChanges);
    };
    debug_assert!(
        start_sample <= end_sample,
        "Expected a valid inclusive range, got: {start_sample} <= {end_sample}"
    );
    Ok(start_sample..=end_sample)
}
/// Re-integrates a chunk whose samples arrived out of (decode-timestamp) order.
///
/// Determines the affected sample window, re-merges the conflicting chunk's
/// samples with all other chunks' samples inside that window, then patches
/// every piece of bookkeeping that refers to samples by index: the known
/// per-chunk ranges, the keyframe index list, per-sample frame numbers and
/// durations. Finally re-reads the actual sample data for the chunk.
///
/// # Errors
/// - [`VideoStreamProcessingError::NoVideoSamplesFound`] if the chunk has no
///   samples on `timeline`.
/// - [`VideoStreamProcessingError::UnexpectedChunkChanges`] on broken
///   invariants (debug-panics first).
fn handle_out_of_order_chunk(
    video_descr: &mut re_video::VideoDataDescription,
    known_ranges: &mut BTreeMap<ChunkId, ChunkSampleRange>,
    timeline: TimelineName,
    conflicting_chunk: &re_chunk::Chunk,
) -> Result<SampleReorderInfo, VideoStreamProcessingError> {
    re_tracing::profile_function!();
    let conflicting_chunk_samples = ChunkSamples::from_physical(conflicting_chunk, timeline)
        .ok_or(VideoStreamProcessingError::NoVideoSamplesFound)?;

    // Which inclusive index window has to be rebuilt? The chunk's own known
    // range is removed here; it is re-created by the merge below.
    let sample_range = find_affected_sample_range(
        video_descr,
        conflicting_chunk,
        &conflicting_chunk_samples,
        known_ranges.remove(&conflicting_chunk.id()).as_ref(),
    )?;

    // Collect the samples of all *other* chunks inside the window, grouped per
    // chunk, so they can be merged back in together with the new samples.
    let mut chunk_samples = BTreeMap::<ChunkId, ChunkSamples>::default();
    for (_, sample) in video_descr
        .samples
        .iter_index_range_clamped(&(*sample_range.start()..sample_range.end() + 1))
    {
        let id = ChunkId::from_tuid(sample.source_id());
        if id == conflicting_chunk.id() {
            continue; // The conflicting chunk's samples are replaced wholesale.
        }
        chunk_samples
            .entry(id)
            .or_insert_with(|| ChunkSamples {
                samples: VecDeque::new(),
            })
            .samples
            .push_back(sample.clone());
    }

    let old_count = sample_range.end() - sample_range.start() + 1;

    // Remember the previous range of every chunk we're about to re-merge, so
    // parts of those ranges lying outside the window can be restored/shifted.
    let mut old_known_ranges: BTreeMap<ChunkId, Option<ChunkSampleRange>> = chunk_samples
        .keys()
        .copied()
        .map(|id| (id, known_ranges.remove(&id)))
        .collect();
    old_known_ranges.insert(conflicting_chunk.id(), None);

    // Merge the conflicting chunk's samples with the other chunks' samples in
    // decode-timestamp order; indices are assigned relative to the window start.
    let mut new_samples = Vec::new();
    flatten_chunk_samples(
        std::iter::once(conflicting_chunk_samples)
            .chain(chunk_samples.into_values())
            .collect(),
        known_ranges,
        |sample| {
            let idx = *sample_range.start() + new_samples.len();
            new_samples.push(sample);
            idx
        },
    );
    let new_count = new_samples.len();
    video_descr
        .samples
        .replace(*sample_range.start()..sample_range.end() + 1, new_samples);

    // Net growth (positive) or shrinkage (negative) of the sample list.
    let size_delta = if new_count < old_count {
        -isize::try_from(old_count - new_count).unwrap_or(0)
    } else {
        isize::try_from(new_count - old_count).unwrap_or(0)
    };

    // Maps a pre-reorder sample index to its post-reorder position.
    // NOTE(review): an index equal to `*sample_range.end()` takes the shift
    // branch even though the window is inclusive — confirm callers only pass
    // indices strictly outside the window, or that this boundary is intended.
    let shift_old_idx = |idx: usize| {
        if idx >= *sample_range.end() {
            idx.saturating_add_signed(size_delta)
        } else if idx <= *sample_range.start() {
            idx
        } else {
            debug_panic!(
                "shift_old_idx should not be called with indices within the reordered range"
            );
            idx
        }
    };

    // Shift the ranges of chunks untouched by the merge (their samples lie
    // entirely outside the window).
    if size_delta != 0 {
        for (id, range) in known_ranges.iter_mut() {
            if !old_known_ranges.contains_key(id) {
                range.first_sample = shift_old_idx(range.first_sample);
                range.last_sample = shift_old_idx(range.last_sample);
            }
        }
    }

    // Re-apply the parts of the merged chunks' old ranges that extend beyond the window.
    for (id, maybe_old_range) in old_known_ranges {
        let Some(old_range) = maybe_old_range else {
            continue;
        };
        known_ranges
            .entry(id)
            .and_modify(|range| {
                range.sample_count = old_range.sample_count;
                if old_range.first_sample < *sample_range.start() {
                    range.first_sample = shift_old_idx(old_range.first_sample);
                }
                if old_range.last_sample > *sample_range.end() {
                    range.last_sample = shift_old_idx(old_range.last_sample);
                }
            })
            .or_insert_with(|| ChunkSampleRange {
                first_sample: shift_old_idx(old_range.first_sample),
                last_sample: shift_old_idx(old_range.last_sample),
                sample_count: old_range.sample_count,
            });
    }

    // Rebuild the keyframe index list: keep entries before the window,
    // re-scan the rebuilt window for sync samples, shift entries after it.
    let first_in_range = video_descr
        .keyframe_indices
        .partition_point(|idx| *idx < *sample_range.start());
    let first_after_range = video_descr
        .keyframe_indices
        .partition_point(|idx| *idx <= *sample_range.end());
    {
        let shifted: Vec<_> = video_descr
            .keyframe_indices
            .drain(first_in_range..)
            .skip(first_after_range - first_in_range) // Drop old in-window entries.
            .map(|idx| idx.saturating_add_signed(size_delta))
            .collect();
        for idx in *sample_range.start()..*sample_range.start() + new_count {
            if let Some(SampleMetadataState::Present(meta)) = video_descr.samples.get(idx)
                && meta.is_sync
            {
                video_descr.keyframe_indices.push(idx);
            }
        }
        video_descr.keyframe_indices.extend(shifted);
    }

    // Frame numbers mirror the sample index; fix them up from the window onwards.
    let min_index = video_descr.samples.min_index();
    for (idx, sample) in video_descr
        .samples
        .iter_indexed_mut()
        .skip(sample_range.start().saturating_sub(min_index))
    {
        if let SampleMetadataState::Present(meta) = sample {
            meta.frame_nr = idx as u32;
        }
    }
    update_sample_durations(
        *sample_range.start()..sample_range.start() + new_count,
        &mut video_descr.samples,
    )?;

    let Some(known_range) = known_ranges.get(&conflicting_chunk.id()) else {
        debug_panic!("We've processed the samples from this chunk, so should have a known range.");
        return Err(VideoStreamProcessingError::UnexpectedChunkChanges);
    };
    // Finally, load the actual sample data for the chunk's new positions.
    read_samples_from_known_chunk(
        timeline,
        conflicting_chunk,
        known_range,
        known_range,
        video_descr,
    )?;

    Ok(SampleReorderInfo {
        change_start: *sample_range.start(),
        change_end: *sample_range.end(),
        size_delta,
    })
}
/// Drops keyframe indices that no longer refer to a present sync sample, then
/// collapses any consecutive duplicate indices that removal may have exposed.
fn adjust_keyframes_for_removed_samples(descr: &mut re_video::VideoDataDescription) {
    let samples = &descr.samples;
    descr.keyframe_indices.retain(|&idx| {
        matches!(
            samples.get(idx).and_then(|s| s.sample()),
            Some(sample) if sample.is_sync
        )
    });
    descr.keyframe_indices.dedup();
}
#[cfg(test)]
mod tests {
    #![expect(clippy::cast_possible_wrap)]

    use re_chunk::{ChunkBuilder, RowId, TimePoint, Timeline};
    use re_chunk_store::ChunkStoreDiff;
    use re_log_types::StoreId;
    use re_sdk_types::archetypes::VideoStream;
    use re_sdk_types::components::VideoCodec;
    use re_video::{VideoDataDescription, VideoEncodingDetails};

    use super::*;

    // Raw Annex-B H.264 test stream without B-frames; split into per-frame
    // slices by `iter_h264_frames` below.
    const RAW_H264_DATA: &[u8] =
        include_bytes!("../../../../../tests/assets/video/gif_as_h264_nobframes.h264");
    // Number of frames contained in `RAW_H264_DATA`.
    const NUM_FRAMES: usize = 44;

    /// Splits an Annex-B H.264 byte stream into per-frame slices.
    ///
    /// Scans for `00 00 00 01` start codes and cuts right before NAL units of
    /// type 1 (non-IDR slice) or 7 (SPS), so each yielded slice contains one
    /// frame together with any parameter-set NALs preceding it.
    fn iter_h264_frames(data: &[u8]) -> impl Iterator<Item = &[u8]> {
        let mut pos = 0;
        std::iter::from_fn(move || {
            if pos >= data.len() {
                return None;
            }
            let start = pos;
            pos += 4; // Skip past the start code we're standing on.
            while pos < data.len() {
                if pos + 4 < data.len() && data[pos..pos + 4] == [0, 0, 0, 1] {
                    let nal_type = data[pos + 4] & 0x1F; // Low 5 bits = NAL unit type.
                    if nal_type == 1 || nal_type == 7 {
                        return Some(&data[start..pos]);
                    }
                }
                pos += 1;
            }
            // Last frame runs to the end of the stream.
            Some(&data[start..])
        })
    }

    /// Asserts that `video_stream` matches the known properties of the test
    /// video after `num_frames_submitted` frames have been logged.
    fn validate_stream_from_test_data(
        video_stream: &PlayableVideoStream,
        num_frames_submitted: usize,
    ) {
        let data_descr = video_stream.video_renderer.data_descr();
        data_descr.sanity_check().unwrap();

        // Exhaustive destructure: adding a field forces this test to be updated.
        let VideoDataDescription {
            codec,
            encoding_details,
            timescale,
            delivery_method,
            keyframe_indices,
            samples,
            samples_statistics,
            mp4_tracks,
        } = data_descr.clone();

        assert_eq!(codec, re_video::VideoCodec::H264);
        assert_eq!(timescale, None);
        assert!(matches!(
            delivery_method,
            re_video::VideoDeliveryMethod::Stream { .. }
        ));
        assert_eq!(samples_statistics, re_video::SamplesStatistics::NO_BFRAMES);
        assert!(mp4_tracks.is_empty());

        let VideoEncodingDetails {
            codec_string,
            coded_dimensions,
            bit_depth,
            chroma_subsampling,
            stsd,
        } = encoding_details.unwrap();
        assert_eq!(codec_string, "avc1.64000A");
        assert_eq!(coded_dimensions, [110, 82]);
        assert_eq!(bit_depth, Some(8));
        assert_eq!(
            chroma_subsampling,
            Some(re_video::ChromaSubsamplingModes::Yuv420)
        );
        assert_eq!(stsd, None);

        assert_eq!(samples.num_elements(), num_frames_submitted);

        // The test stream has a keyframe every 10 frames.
        assert_eq!(keyframe_indices[0], 0);
        if num_frames_submitted > 10 {
            assert_eq!(keyframe_indices[1], 10);
        }
        if num_frames_submitted > 20 {
            assert_eq!(keyframe_indices[2], 20);
        }
        if num_frames_submitted > 30 {
            assert_eq!(keyframe_indices[3], 30);
        }
        if num_frames_submitted > 40 {
            assert_eq!(keyframe_indices[4], 40);
        }
    }

    /// All frames logged as one chunk up front.
    #[test]
    fn video_stream_cache_from_single_chunk() {
        let mut cache = VideoStreamCache::default();
        let mut store = re_entity_db::EntityDb::new(StoreId::random(
            re_log_types::StoreKind::Recording,
            "test_app",
        ));
        let timeline = Timeline::new_sequence("frame");

        let mut chunk_builder = ChunkBuilder::new(ChunkId::new(), "vid".into());
        for (i, frame_bytes) in iter_h264_frames(RAW_H264_DATA).enumerate() {
            chunk_builder = chunk_builder.with_archetype(
                RowId::new(),
                TimePoint::from_iter([(timeline, i as i64)]),
                &VideoStream::new(VideoCodec::H264).with_sample(frame_bytes),
            );
        }
        store
            .add_chunk(&Arc::new(chunk_builder.build().unwrap()))
            .unwrap();

        let video_stream_lock = cache
            .entry(
                &store,
                &"vid".into(),
                *timeline.name(),
                DecodeSettings::default(),
            )
            .unwrap();
        let video_stream = video_stream_lock.read();
        validate_stream_from_test_data(&video_stream, NUM_FRAMES);
    }

    /// One chunk per frame, all present before the cache entry is created.
    #[test]
    fn video_stream_cache_from_chunk_per_frame() {
        let mut cache = VideoStreamCache::default();
        let enable_viewer_indexes = false;
        let mut store = re_entity_db::EntityDb::with_store_config(
            StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
            enable_viewer_indexes,
            re_chunk_store::ChunkStoreConfig::COMPACTION_DISABLED,
        );
        let timeline = Timeline::new_sequence("frame");

        for (i, frame_bytes) in iter_h264_frames(RAW_H264_DATA).enumerate() {
            let chunk_builder = ChunkBuilder::new(ChunkId::new(), "vid".into()).with_archetype(
                RowId::new(),
                TimePoint::from_iter([(timeline, i as i64)]),
                &VideoStream::new(VideoCodec::H264).with_sample(frame_bytes),
            );
            store
                .add_chunk(&Arc::new(chunk_builder.build().unwrap()))
                .unwrap();
        }

        let video_stream_lock = cache
            .entry(
                &store,
                &"vid".into(),
                *timeline.name(),
                DecodeSettings::default(),
            )
            .unwrap();
        let video_stream = video_stream_lock.read();
        validate_stream_from_test_data(&video_stream, NUM_FRAMES);
    }

    /// Frames arrive one chunk at a time *after* the cache entry exists; the
    /// cache is updated incrementally via store events. Exercised both with
    /// and without chunk compaction.
    #[test]
    fn video_stream_cache_from_chunk_per_frame_buildup_over_time() {
        let timeline = Timeline::new_sequence("frame");
        for compaction_enabled in [true, false] {
            println!("compaction enabled: {compaction_enabled}");

            let mut cache = VideoStreamCache::default();
            let enable_viewer_indexes = true;
            let mut store = re_entity_db::EntityDb::with_store_config(
                StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
                enable_viewer_indexes,
                if compaction_enabled {
                    re_chunk_store::ChunkStoreConfig::DEFAULT
                } else {
                    re_chunk_store::ChunkStoreConfig::COMPACTION_DISABLED
                },
            );

            // Seed the cache with the very first frame…
            let mut frame_iter = iter_h264_frames(RAW_H264_DATA);
            let chunk_builder = ChunkBuilder::new(ChunkId::new(), "vid".into()).with_archetype(
                RowId::new(),
                TimePoint::from_iter([(timeline, 0)]),
                &VideoStream::new(VideoCodec::H264).with_sample(frame_iter.next().unwrap()),
            );
            store
                .add_chunk(&Arc::new(chunk_builder.build().unwrap()))
                .unwrap();
            let video_stream = cache
                .entry(
                    &store,
                    &"vid".into(),
                    *timeline.name(),
                    DecodeSettings::default(),
                )
                .unwrap();
            validate_stream_from_test_data(&video_stream.read(), 1);

            // …then feed the remaining frames one chunk at a time, validating
            // the cache after every store event.
            for (i, frame_bytes) in frame_iter.enumerate() {
                let t = 1 + i as i64;
                let timepoint = TimePoint::from_iter([(timeline, t)]);
                let chunk_builder = ChunkBuilder::new(ChunkId::new(), "vid".into()).with_archetype(
                    RowId::new(),
                    timepoint,
                    &VideoStream::new(VideoCodec::H264).with_sample(frame_bytes),
                );
                let store_events = store
                    .add_chunk(&Arc::new(chunk_builder.build().unwrap()))
                    .unwrap();
                let store_events_refs = store_events.iter().collect::<Vec<_>>();
                cache.on_store_events(&store_events_refs, &store);

                let video_stream = cache
                    .entry(
                        &store,
                        &"vid".into(),
                        *timeline.name(),
                        DecodeSettings::default(),
                    )
                    .unwrap();
                validate_stream_from_test_data(&video_stream.read(), t as usize + 1);
            }
        }
    }

    /// Garbage-collecting (deleting) the first chunk must drop its sample and
    /// its keyframe from the cached stream.
    #[test]
    fn video_stream_cache_from_chunk_per_frame_with_gc() {
        let mut cache = VideoStreamCache::default();
        let enable_viewer_indexes = true;
        let mut store = re_entity_db::EntityDb::with_store_config(
            StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
            enable_viewer_indexes,
            re_chunk_store::ChunkStoreConfig::COMPACTION_DISABLED,
        );
        let timeline = Timeline::new_sequence("frame");

        for (i, frame_bytes) in iter_h264_frames(RAW_H264_DATA).enumerate() {
            let chunk_builder = ChunkBuilder::new(ChunkId::new(), "vid".into()).with_archetype(
                RowId::new(),
                TimePoint::from_iter([(timeline, i as i64)]),
                &VideoStream::new(VideoCodec::H264).with_sample(frame_bytes),
            );
            store
                .add_chunk(&Arc::new(chunk_builder.build().unwrap()))
                .unwrap();
        }
        cache
            .entry(
                &store,
                &"vid".into(),
                *timeline.name(),
                DecodeSettings::default(),
            )
            .unwrap();

        // Simulate GC of the first physical chunk via a deletion event.
        let storage_engine = store.storage_engine();
        let chunk_store = storage_engine.store();
        cache.on_store_events(
            &[&ChunkStoreEvent {
                store_id: store.store_id().clone(),
                store_generation: store.generation(),
                event_id: 0,
                diff: ChunkStoreDiff::deletion(
                    chunk_store.iter_physical_chunks().next().unwrap().clone(),
                ),
            }],
            &store,
        );

        let video_stream_lock = cache
            .entry(
                &store,
                &"vid".into(),
                *timeline.name(),
                DecodeSettings::default(),
            )
            .unwrap();
        let video_stream = video_stream_lock.read();
        let data_descr = video_stream.video_renderer.data_descr();
        data_descr.sanity_check().unwrap();
        // One frame is gone…
        assert_eq!(
            data_descr.samples.iter().filter(|s| s.is_loaded()).count(),
            NUM_FRAMES - 1
        );
        // …and with it the keyframe at index 0; the next one is at 10.
        assert_eq!(data_descr.keyframe_indices.first(), Some(&10));
    }
}