use std::collections::BTreeMap;
use std::sync::Arc;
use ahash::{HashMap, HashSet};
use arrow::array::{ArrayRef, BooleanArray, ListArray as ArrowListArray};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::Field;
use itertools::izip;
use re_byte_size::SizeBytes as _;
use re_chunk::{Chunk, ChunkId, ChunkShared, EntityPath, TimeColumn, Timeline, TimelineName};
use re_format::format_bytes;
use re_log_types::TimeInt;
use re_sdk_types::archetypes::VideoStream;
use re_sdk_types::components::{VideoCodec, VideoSample};
use re_types_core::ComponentDescriptor;
use crate::{ChunkStore, ChunkStoreConfig, ChunkTrackingMode};
/// Builds the [`ComponentDescriptor`] identifying the keyframe-marker column.
///
/// The descriptor names the archetype `KeyframeData`, the component
/// `KeyframeData:is_keyframe`, and carries no component type.
fn keyframe_descriptor() -> ComponentDescriptor {
    let archetype = Some("KeyframeData".into());
    let component = "KeyframeData:is_keyframe".into();

    ComponentDescriptor {
        archetype,
        component,
        component_type: None,
    }
}
/// Location and classification of a single video sample, as collected by `build_sample_index`.
#[derive(Clone, Copy)]
struct SampleInfo {
/// Id of the source chunk the sample was read from.
chunk_id: ChunkId,
/// Position of the sample's row within the source chunk's row ids.
row_index: usize,
/// Time of the sample on the timeline chosen for rebatching.
time: TimeInt,
/// Result of the caller-supplied `is_start_of_gop` callback for this sample's bytes.
is_start_of_gop: bool,
}
/// Rebuilds `store` so that the video-sample chunks of every entity are regrouped along
/// GoP boundaries.
///
/// Only temporal chunks are considered; static chunks are never touched. For every entity that
/// has chunks carrying the video sample component, `rebatch_video_entity` is invoked. On success
/// the entity's sample chunks, together with any pre-existing chunks carrying the keyframe
/// component for that entity, are replaced by the rebatched chunks and the freshly built
/// keyframe chunk (if any). Entities that fail to rebatch are logged and keep their original
/// chunks.
///
/// * `store` - the store to read chunks from; it is not modified.
/// * `config` - forwarded to the per-entity rebatching.
/// * `is_start_of_gop` - callback deciding whether a sample's bytes start a GoP for a codec.
///
/// Returns a clone of `store` when there is nothing to rebatch or no entity was rebatched
/// successfully. Otherwise returns a new store, created with `ChunkStoreConfig::ALL_DISABLED`,
/// containing the untouched chunks followed by the rebatched and keyframe chunks.
///
/// # Errors
///
/// Fails if inserting a chunk into the new store fails.
pub fn rebatch_video_chunks_to_gops(
    store: &ChunkStore,
    config: &ChunkStoreConfig,
    is_start_of_gop: &dyn Fn(&[u8], VideoCodec) -> anyhow::Result<bool>,
) -> anyhow::Result<ChunkStore> {
    re_tracing::profile_function!();

    let sample_component = VideoStream::descriptor_sample().component;
    let keyframe_component = keyframe_descriptor().component;

    // Bucket the temporal chunks per entity: sample chunks are the rebatching input, while
    // keyframe-only chunks are remembered so they can be dropped once superseded.
    let mut sample_chunks_per_entity: HashMap<EntityPath, HashMap<ChunkId, ChunkShared>> =
        Default::default();
    let mut existing_keyframe_chunk_ids_per_entity: HashMap<EntityPath, HashSet<ChunkId>> =
        Default::default();

    for chunk in store
        .iter_physical_chunks()
        .filter(|chunk| !chunk.is_static())
    {
        let components = chunk.components();
        if components.contains_component(sample_component) {
            sample_chunks_per_entity
                .entry(chunk.entity_path().clone())
                .or_default()
                .insert(chunk.id(), chunk.clone());
        } else if components.contains_component(keyframe_component) {
            existing_keyframe_chunk_ids_per_entity
                .entry(chunk.entity_path().clone())
                .or_default()
                .insert(chunk.id());
        }
    }

    if sample_chunks_per_entity.is_empty() {
        return Ok(store.clone());
    }

    re_log::info!(
        num_video_entities = sample_chunks_per_entity.len(),
        "found video entities for GoP realignment"
    );

    let mut replaced_chunk_ids: HashSet<ChunkId> = HashSet::default();
    let mut new_chunks: Vec<Chunk> = Vec::new();
    let mut keyframe_chunks: Vec<Chunk> = Vec::new();

    for (entity_path, sample_chunks) in &sample_chunks_per_entity {
        let EntityRebatch {
            rebatched,
            keyframes,
        } = match rebatch_video_entity(store, config, is_start_of_gop, entity_path, sample_chunks)
        {
            Ok(outcome) => outcome,
            Err(err) => {
                re_log::warn!(entity = %entity_path, %err, "failed to rebatch video entity, skipping");
                continue;
            }
        };

        // Everything this entity contributed before is superseded by the new chunks.
        replaced_chunk_ids.extend(sample_chunks.keys().copied());
        replaced_chunk_ids.extend(
            existing_keyframe_chunk_ids_per_entity
                .get(entity_path)
                .into_iter()
                .flatten()
                .copied(),
        );

        new_chunks.extend(rebatched);
        keyframe_chunks.extend(keyframes);
    }

    if replaced_chunk_ids.is_empty() {
        return Ok(store.clone());
    }

    let mut new_store = ChunkStore::new(store.id(), ChunkStoreConfig::ALL_DISABLED);

    // Carry over every chunk that was not superseded.
    for chunk in store
        .iter_physical_chunks()
        .filter(|chunk| !replaced_chunk_ids.contains(&chunk.id()))
    {
        new_store.insert_chunk(chunk)?;
    }

    // Only the rebatched sample chunks count towards the size warning below.
    let overall_max_chunk_bytes: u64 = new_chunks
        .iter()
        .map(|chunk| chunk.heap_size_bytes())
        .max()
        .unwrap_or(0);

    // Rebatched chunks go in first, keyframe chunks afterwards.
    for chunk in new_chunks.into_iter().chain(keyframe_chunks) {
        new_store.insert_chunk(&Arc::new(chunk))?;
    }

    const LARGE_CHUNK_WARN_THRESHOLD: u64 = 10 * 1024 * 1024;
    if overall_max_chunk_bytes > LARGE_CHUNK_WARN_THRESHOLD {
        re_log::warn_once!(
            "GoP rebatching produced a video chunk of size {}. \
             Consider re-encoding the source with shorter keyframe intervals, \
             or turn off GoP-batching, or fix this code to allow splitting large GoPs-batches",
            format_bytes(overall_max_chunk_bytes as _)
        );
    }

    Ok(new_store)
}
/// Output of `rebatch_video_entity` for a single entity.
struct EntityRebatch {
/// The GoP-aligned chunks, after merging, that replace the entity's original sample chunks.
rebatched: Vec<Chunk>,
/// Chunk marking the keyframe times, or `None` when no sample was flagged as a GoP start.
keyframes: Option<Chunk>,
}
fn rebatch_video_entity(
store: &ChunkStore,
config: &ChunkStoreConfig,
is_start_of_gop: &dyn Fn(&[u8], VideoCodec) -> anyhow::Result<bool>,
entity_path: &EntityPath,
sample_chunks: &HashMap<ChunkId, ChunkShared>,
) -> anyhow::Result<EntityRebatch> {
re_tracing::profile_function!();
for chunk in sample_chunks.values() {
let unsorted_timelines: Vec<_> = chunk
.timelines()
.iter()
.filter(|(_, tc)| !tc.is_sorted())
.map(|(name, _)| name)
.collect();
if !unsorted_timelines.is_empty() {
anyhow::bail!(
"chunk {} for entity '{entity_path}' has unsorted timelines: {:?} (compared to RowId). Video playback on these timelines may already be broken, and rebatching may make things worse",
chunk.id(),
unsorted_timelines
);
}
}
let timeline =
choose_timeline(sample_chunks).ok_or_else(|| anyhow::anyhow!("no timeline found"))?;
let timeline_name = *timeline.name();
let codec = extract_codec(store, entity_path, timeline_name)
.ok_or_else(|| anyhow::anyhow!("couldn't resolve video codec"))?;
let sample_index = build_sample_index(is_start_of_gop, sample_chunks, timeline_name, codec)?;
anyhow::ensure!(!sample_index.is_empty(), "no video samples found");
let keyframes = build_keyframe_chunk(entity_path, timeline, &sample_index)?;
let gop_groups = split_into_gop_groups(entity_path, &sample_index);
let gop_chunks: Vec<Chunk> = gop_groups
.iter()
.map(|group| chunk_from_gop(group, sample_chunks))
.collect::<anyhow::Result<_, _>>()?;
log_gop_stats(entity_path, &gop_chunks);
let merged = merge_chunks(config, gop_chunks)?;
log_entity_chunk_stats(entity_path, timeline_name, codec, &merged);
Ok(EntityRebatch {
rebatched: merged,
keyframes,
})
}
/// Builds a chunk with one row per sample flagged as the start of a GoP.
///
/// Each row sits at the sample's time on `timeline` and holds a single `true` value under the
/// descriptor returned by `keyframe_descriptor`. Times are taken in the order they appear in
/// `sample_index`.
///
/// Returns `Ok(None)` when no sample in `sample_index` is flagged.
///
/// # Errors
///
/// Fails if the list array or the chunk cannot be constructed.
fn build_keyframe_chunk(
    entity_path: &EntityPath,
    timeline: Timeline,
    sample_index: &[SampleInfo],
) -> anyhow::Result<Option<Chunk>> {
    let keyframe_times: Vec<i64> = sample_index
        .iter()
        .filter_map(|sample| sample.is_start_of_gop.then(|| sample.time.as_i64()))
        .collect();
    if keyframe_times.is_empty() {
        return Ok(None);
    }
    let num_keyframes = keyframe_times.len();

    // A list column in which every row contains exactly one `true`.
    let markers: ArrayRef = Arc::new(BooleanArray::from(vec![true; num_keyframes]));
    let item_field = Field::new("item", markers.data_type().clone(), true);
    let one_item_per_row = OffsetBuffer::from_lengths(std::iter::repeat_n(1, num_keyframes));
    let list_array =
        match ArrowListArray::try_new(item_field.into(), one_item_per_row, markers, None) {
            Ok(array) => array,
            Err(err) => {
                return Err(anyhow::anyhow!(
                    "failed to build keyframe list array: {err}"
                ));
            }
        };

    // NOTE(review): the leading `Some(true)` presumably declares the column as sorted — confirm
    // against `TimeColumn::new`.
    let time_column = TimeColumn::new(Some(true), timeline, ScalarBuffer::from(keyframe_times));

    Chunk::from_columns(
        entity_path.clone(),
        [time_column],
        [(keyframe_descriptor(), list_array)],
    )
    .map(Some)
    .map_err(|err| anyhow::anyhow!("failed to build keyframe chunk: {err}"))
}
/// Picks the timeline to rebatch on.
///
/// Every timeline present in `sample_chunks` is weighted by the total number of rows of the
/// chunks it appears in; the final choice is delegated to `Timeline::pick_best_timeline`.
///
/// Returns `None` when the chunks have no timelines at all.
fn choose_timeline(sample_chunks: &HashMap<ChunkId, ChunkShared>) -> Option<Timeline> {
    let mut rows_per_timeline: HashMap<Timeline, u64> = Default::default();
    for chunk in sample_chunks.values() {
        let num_rows = chunk.num_rows() as u64;
        for time_column in chunk.timelines().values() {
            *rows_per_timeline
                .entry(*time_column.timeline())
                .or_default() += num_rows;
        }
    }

    if rows_per_timeline.is_empty() {
        None
    } else {
        let candidates: Vec<_> = rows_per_timeline.keys().copied().collect();
        let best = Timeline::pick_best_timeline(&candidates, |t| {
            rows_per_timeline.get(t).copied().unwrap_or(0)
        });
        Some(best)
    }
}
/// Looks up the video codec of `entity_path`.
///
/// Runs a latest-at query at `TimeInt::MAX` on `timeline_name` for the video stream's codec
/// component and returns the first codec value found in the resulting chunks, or `None` if
/// there is none.
fn extract_codec(
    store: &ChunkStore,
    entity_path: &EntityPath,
    timeline_name: TimelineName,
) -> Option<VideoCodec> {
    let codec_component = VideoStream::descriptor_codec().component;

    let query = crate::LatestAtQuery::new(timeline_name, TimeInt::MAX);
    let results = store.latest_at_relevant_chunks(
        ChunkTrackingMode::PanicOnMissing,
        &query,
        entity_path,
        codec_component,
    );

    for chunk in results.chunks.iter() {
        for batch in chunk.iter_component::<VideoCodec>(codec_component) {
            // Rows without any codec value are skipped.
            if let Some(codec) = batch.as_slice().first().copied() {
                return Some(codec);
            }
        }
    }
    None
}
fn build_sample_index(
is_start_of_gop: &dyn Fn(&[u8], VideoCodec) -> anyhow::Result<bool>,
sample_chunks: &HashMap<ChunkId, ChunkShared>,
timeline_name: TimelineName,
codec: VideoCodec,
) -> anyhow::Result<Vec<SampleInfo>> {
re_tracing::profile_function!();
let sample_component = VideoStream::descriptor_sample().component;
let mut sample_index = Vec::new();
for chunk in sample_chunks.values() {
if !chunk.timelines().contains_key(&timeline_name) {
anyhow::bail!(
"chunk {} has no values on timeline {timeline_name}",
chunk.id()
);
}
let chunk_id = chunk.id();
let row_id_to_index: HashMap<_, _> = chunk
.row_ids()
.enumerate()
.map(|(idx, rid)| (rid, idx))
.collect();
for ((time, row_id), sample) in izip!(
chunk.iter_component_indices(timeline_name, sample_component),
chunk.iter_component::<VideoSample>(sample_component)
) {
let Some(sample) = sample.as_slice().first() else {
continue;
};
let row_index = row_id_to_index[&row_id];
sample_index.push(SampleInfo {
chunk_id,
row_index,
time,
is_start_of_gop: is_start_of_gop(sample.0.inner().as_slice(), codec)?,
});
}
}
sample_index.sort_by_key(|sample| (sample.time, sample.chunk_id, sample.row_index));
Ok(sample_index)
}
/// Cuts `sample_index` into consecutive, non-empty groups, each starting at a sample flagged
/// as the start of a GoP.
///
/// If the very first sample is not flagged, a warning mentioning `entity` is logged and the
/// leading samples up to the first flagged one form a group of their own. An empty
/// `sample_index` yields no groups.
fn split_into_gop_groups<'a>(
    entity: &EntityPath,
    sample_index: &'a [SampleInfo],
) -> Vec<&'a [SampleInfo]> {
    re_tracing::profile_function!();

    let Some(first_sample) = sample_index.first() else {
        return Vec::new();
    };
    if !first_sample.is_start_of_gop {
        re_log::warn!(?entity, "first sample is not a keyframe");
    }

    let mut groups = Vec::new();
    let mut group_start = 0;

    // Every flagged sample after the first one closes the running group and opens a new one.
    for (index, sample) in sample_index.iter().enumerate().skip(1) {
        if sample.is_start_of_gop {
            groups.push(&sample_index[group_start..index]);
            group_start = index;
        }
    }

    // The trailing group runs to the end of the index.
    groups.push(&sample_index[group_start..]);
    groups
}
/// Assembles a single chunk out of the rows referenced by one GoP `group`.
///
/// The group's rows are bucketed per source chunk, with source chunks kept in the order in
/// which they first appear in `group`. The selected rows are taken from each source chunk and
/// the pieces are combined with `Chunk::concat_and_sort`. The result is sorted if it is not
/// already.
///
/// # Errors
///
/// Fails if a row index does not fit in an `i32`, if combining the pieces fails, or if `group`
/// is empty.
///
/// # Panics
///
/// Panics if `group` references a chunk id that is missing from `chunks_by_id`.
fn chunk_from_gop(
    group: &[SampleInfo],
    chunks_by_id: &HashMap<ChunkId, ChunkShared>,
) -> anyhow::Result<Chunk> {
    re_tracing::profile_function!();

    // `slot_of_chunk` maps a source chunk to its position in `rows_in_first_seen_order`.
    let mut slot_of_chunk: BTreeMap<ChunkId, usize> = BTreeMap::new();
    let mut rows_in_first_seen_order: Vec<(ChunkId, Vec<i32>)> = Vec::new();

    for sample in group {
        let Ok(row) = i32::try_from(sample.row_index) else {
            return Err(anyhow::anyhow!(
                "row index {} exceeds i32::MAX",
                sample.row_index
            ));
        };

        let slot = *slot_of_chunk.entry(sample.chunk_id).or_insert_with(|| {
            rows_in_first_seen_order.push((sample.chunk_id, Vec::new()));
            rows_in_first_seen_order.len() - 1
        });
        rows_in_first_seen_order[slot].1.push(row);
    }

    let mut combined: Option<Chunk> = None;
    for (chunk_id, rows) in rows_in_first_seen_order {
        let row_selection = arrow::array::Int32Array::from(rows);
        let piece = chunks_by_id[&chunk_id].taken(&row_selection);

        combined = Some(match combined {
            Some(so_far) => Chunk::concat_and_sort(&so_far, &piece)?,
            None => piece,
        });
    }

    let Some(mut chunk) = combined else {
        return Err(anyhow::anyhow!("GoP group is empty — this is a bug"));
    };
    chunk.sort_if_unsorted();
    Ok(chunk)
}
/// Greedily merges consecutive GoP chunks into larger chunks, bounded by
/// `config.chunk_max_bytes`.
///
/// GoP chunks are visited in order. A GoP is appended to the running chunk as long as the sum
/// of the heap sizes of the GoPs merged so far plus the new GoP stays within the limit;
/// otherwise the running chunk is emitted and the GoP starts a new one. A GoP is never split,
/// so a single GoP larger than the limit ends up as a chunk of its own.
///
/// When `chunk_max_bytes` is `0` or there is at most one GoP, the input is returned unchanged.
///
/// # Errors
///
/// Fails if combining two chunks fails.
fn merge_chunks(config: &ChunkStoreConfig, gop_chunks: Vec<Chunk>) -> anyhow::Result<Vec<Chunk>> {
    re_tracing::profile_function!();

    let chunk_max_bytes = config.chunk_max_bytes;

    // Captured up front: `gop_chunks` is consumed by the merge loop, and the final log line
    // must report the number of input GoPs rather than a row count.
    let num_gops_in = gop_chunks.len();

    re_log::debug!(
        num_gops = num_gops_in,
        chunk_max_bytes = %format_bytes(chunk_max_bytes as _),
        "merging GoPs into chunks"
    );

    if chunk_max_bytes == 0 || gop_chunks.len() <= 1 {
        re_log::debug!("skipping merge (max_bytes=0 or ≤1 GoP)");
        return Ok(gop_chunks);
    }

    let mut merged: Vec<Chunk> = Vec::new();
    let mut accumulator: Option<Chunk> = None;
    // Sum of the heap sizes of the GoPs merged into `accumulator` so far.
    let mut accumulator_bytes: u64 = 0;

    for gop in gop_chunks {
        let gop_bytes = gop.heap_size_bytes();

        if let Some(acc) = accumulator.take() {
            if accumulator_bytes + gop_bytes <= chunk_max_bytes {
                // Still within budget: grow the running chunk.
                let combined = Chunk::concat_and_sort(&acc, &gop)?;
                accumulator_bytes += gop_bytes;
                accumulator = Some(combined);
                continue;
            } else {
                // Budget exceeded: emit the running chunk and start over with this GoP.
                merged.push(acc);
            }
        }

        accumulator_bytes = gop_bytes;
        accumulator = Some(gop);
    }

    if let Some(acc) = accumulator {
        merged.push(acc);
    }

    re_log::debug!(
        num_gops_in,
        num_chunks_out = merged.len(),
        "merge complete"
    );

    Ok(merged)
}
/// Logs min/avg/max frame counts and heap sizes over the per-GoP chunks of `entity_path`.
///
/// Does nothing when `gop_chunks` is empty. Averages use integer division.
fn log_gop_stats(entity_path: &EntityPath, gop_chunks: &[Chunk]) {
    let num_gops = gop_chunks.len() as u64;
    if num_gops == 0 {
        return;
    }

    // The input is non-empty, so the `u64::MAX` seeds are always overwritten.
    let (mut min_frames, mut max_frames, mut frames_sum) = (u64::MAX, 0_u64, 0_u64);
    let (mut min_bytes, mut max_bytes, mut bytes_sum) = (u64::MAX, 0_u64, 0_u64);

    for chunk in gop_chunks {
        let frames = chunk.num_rows() as u64;
        min_frames = min_frames.min(frames);
        max_frames = max_frames.max(frames);
        frames_sum += frames;

        let bytes = chunk.heap_size_bytes();
        min_bytes = min_bytes.min(bytes);
        max_bytes = max_bytes.max(bytes);
        bytes_sum += bytes;
    }

    let avg_frames = frames_sum / num_gops;
    let avg_bytes = bytes_sum / num_gops;

    re_log::info!(
        entity = %entity_path,
        num_gops,
        frames_per_gop = %format!("{min_frames}/{avg_frames}/{max_frames}"),
        bytes_per_gop = %format!(
            "{}/{}/{}",
            format_bytes(min_bytes as _),
            format_bytes(avg_bytes as _),
            format_bytes(max_bytes as _),
        ),
        "GoP stats (min/avg/max)"
    );
}
/// Logs a summary of the rebatched chunks of `entity_path`: chunk count, total frame count,
/// and min/avg/max frames and heap sizes per chunk, along with the timeline and codec used.
///
/// Does nothing when `chunks` is empty. Averages use integer division.
fn log_entity_chunk_stats(
    entity_path: &EntityPath,
    timeline_name: TimelineName,
    codec: VideoCodec,
    chunks: &[Chunk],
) {
    if chunks.is_empty() {
        return;
    }
    let num_chunks = chunks.len() as u64;

    // The input is non-empty, so the `u64::MAX` seeds are always overwritten.
    let (mut min_frames, mut max_frames, mut total_frames) = (u64::MAX, 0_u64, 0_u64);
    let (mut min_bytes, mut max_bytes, mut total_bytes) = (u64::MAX, 0_u64, 0_u64);

    for chunk in chunks {
        let frames = chunk.num_rows() as u64;
        min_frames = min_frames.min(frames);
        max_frames = max_frames.max(frames);
        total_frames += frames;

        let bytes = chunk.heap_size_bytes();
        min_bytes = min_bytes.min(bytes);
        max_bytes = max_bytes.max(bytes);
        total_bytes += bytes;
    }

    let avg_frames = total_frames / num_chunks;
    let avg_bytes = total_bytes / num_chunks;

    re_log::info!(
        entity = %entity_path,
        timeline = %timeline_name,
        codec = ?codec,
        num_chunks,
        total_frames,
        frames_per_chunk = %format!("{min_frames}/{avg_frames}/{max_frames}"),
        bytes_per_chunk = %format!(
            "{}/{}/{}",
            format_bytes(min_bytes as _),
            format_bytes(avg_bytes as _),
            format_bytes(max_bytes as _),
        ),
        "rebatched video entity (min/avg/max)"
    );
}