use std::io::{IsTerminal as _, Write as _};
use anyhow::Context as _;
use itertools::Either;
use re_byte_size::SizeBytes as _;
use re_chunk_store::{ChunkStoreConfig, CompactionOptions, IsStartOfGop, OptimizationProfile};
use re_entity_db::EntityDb;
use re_log_types::StoreId;
use re_sdk::StoreKind;
use crate::commands::read_rrd_streams_from_file_or_stdin;
#[derive(Debug, Clone, clap::Parser)]
pub struct MergeCommand {
path_to_input_rrds: Vec<String>,
#[arg(short = 'o', long = "output", value_name = "dst.(rrd|rbl)")]
path_to_output_rrd: Option<String>,
#[clap(long = "continue-on-error", default_value_t = false)]
continue_on_error: bool,
}
impl MergeCommand {
    /// Merges all inputs into a single output, with every chunk-store optimization disabled.
    ///
    /// # Errors
    ///
    /// Fails when no `-o` path is given while stdout is an interactive terminal, or when
    /// merging itself fails.
    pub fn run(&self) -> anyhow::Result<()> {
        let writes_to_stdout = self.path_to_output_rrd.is_none();
        if writes_to_stdout {
            anyhow::ensure!(
                !std::io::stdout().is_terminal(),
                "you must redirect the output to a file and/or stream"
            );
        }

        merge_and_compact(
            self.continue_on_error,
            &ChunkStoreConfig::ALL_DISABLED,
            None,
            &self.path_to_input_rrds,
            self.path_to_output_rrd.as_ref(),
        )
    }
}
/// Clap value parser for byte sizes written with a unit suffix (e.g. `2MiB`, `1GB`, `1024B`).
///
/// Returns a human-readable error string when `s` cannot be parsed by
/// `re_format::parse_bytes`, or when the parsed value does not fit in a `u64` (negative).
fn parse_size(s: &str) -> Result<u64, String> {
    let Some(parsed) = re_format::parse_bytes(s) else {
        return Err(format!(
            "invalid size {s:?}; expected a value with a unit suffix, e.g. `2MiB`, `1GB`, `1024B`"
        ));
    };

    u64::try_from(parsed).map_err(|err| format!("size {s:?} must be non-negative: {err}"))
}
// Optimization preset selectable on the command line via `--profile`.
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum ProfileArg {
    // Maps to `OptimizationProfile::LIVE`.
    Live,

    // Maps to `OptimizationProfile::OBJECT_STORE` (the default for `OptimizeCommand`).
    ObjectStore,
}

impl ProfileArg {
    /// Returns the [`OptimizationProfile`] preset this CLI choice stands for.
    fn to_profile(self) -> OptimizationProfile {
        match self {
            ProfileArg::ObjectStore => OptimizationProfile::OBJECT_STORE,
            ProfileArg::Live => OptimizationProfile::LIVE,
        }
    }
}
#[derive(Debug, Clone, clap::Parser)]
pub struct OptimizeCommand {
path_to_input_rrds: Vec<String>,
#[arg(short = 'o', long = "output", value_name = "dst.(rrd|rbl)")]
path_to_output_rrd: Option<String>,
#[arg(long = "profile", value_enum, default_value_t = ProfileArg::ObjectStore)]
profile: ProfileArg,
#[arg(long = "max-size", value_parser = parse_size)]
max_size: Option<u64>,
#[arg(long = "max-rows")]
max_rows: Option<u64>,
#[arg(long = "max-rows-if-unsorted")]
max_rows_if_unsorted: Option<u64>,
#[arg(long = "num-pass")]
num_extra_passes: Option<u32>,
#[clap(long = "continue-on-error", default_value_t = false)]
continue_on_error: bool,
#[clap(long = "no-rebatch-videos", default_value_t = false)]
no_rebatch_videos: bool,
#[arg(long = "split-size-ratio")]
split_size_ratio: Option<f64>,
}
impl OptimizeCommand {
    /// Builds the chunk-store configuration and compaction options, then optimizes the inputs.
    ///
    /// Configuration precedence, lowest to highest: profile preset, environment
    /// (`apply_env`), explicit CLI flags. If any input is a directory, every input is optimized
    /// file-by-file into the `-o` directory. Otherwise all inputs are merged into one output.
    ///
    /// # Errors
    ///
    /// Fails when output would go to an interactive terminal, when a directory input is given
    /// without `-o`, or when any read/compaction/write step fails.
    pub fn run(&self) -> anyhow::Result<()> {
        if self.path_to_output_rrd.is_none() {
            anyhow::ensure!(
                !std::io::stdout().is_terminal(),
                "you must redirect the output to a file and/or stream"
            );
        }

        let profile = self.profile.to_profile();

        let mut store_config = profile.to_chunk_store_config().apply_env()?;
        if let Some(bytes) = self.max_size {
            store_config.chunk_max_bytes = bytes;
        }
        if let Some(rows) = self.max_rows {
            store_config.chunk_max_rows = rows;
        }
        if let Some(rows) = self.max_rows_if_unsorted {
            store_config.chunk_max_rows_if_unsorted = rows;
        }
        // Always off here, whatever the profile or environment say.
        store_config.enable_changelog = false;

        let is_start_of_gop: IsStartOfGop = std::sync::Arc::new(|data, codec| {
            re_video::is_start_of_gop(data, codec.into()).map_err(|err| anyhow::anyhow!(err))
        });
        let use_gop_batching = profile.gop_batching && !self.no_rebatch_videos;
        let extra_passes = self.num_extra_passes.unwrap_or(profile.num_extra_passes);

        let compaction_options = CompactionOptions {
            config: store_config.clone(),
            num_extra_passes: Some(extra_passes as usize),
            is_start_of_gop: use_gop_batching.then_some(is_start_of_gop),
            split_size_ratio: self.split_size_ratio.or(profile.split_size_ratio),
        };

        let has_dir_input = self
            .path_to_input_rrds
            .iter()
            .any(|path| std::path::Path::new(path).is_dir());

        if !has_dir_input {
            return merge_and_compact(
                self.continue_on_error,
                &store_config,
                Some(&compaction_options),
                &self.path_to_input_rrds,
                self.path_to_output_rrd.as_ref(),
            );
        }

        let output_root = self.path_to_output_rrd.as_ref().ok_or_else(|| {
            anyhow::anyhow!(
                "directory inputs require an output path (`-o <dir>`); cannot mirror to stdout"
            )
        })?;

        optimize_dir_mirror(
            self.continue_on_error,
            &store_config,
            &compaction_options,
            &self.path_to_input_rrds,
            output_root,
        )
    }
}
/// Optimizes each input file independently, mirroring the input layout under `output_root`.
///
/// Inputs are never merged with each other: each input file produces exactly one output file.
///
/// - Directory inputs are walked recursively without following symlinks. Only regular files
///   with an `.rrd`/`.rbl` extension are picked up, and each one is written to
///   `output_root/<path relative to that input directory>`.
/// - File inputs are written to `output_root/<file name>`, whatever their extension.
///
/// Files are processed in parallel with rayon; processing stops at the first failing file.
///
/// # Errors
///
/// Fails in any of these cases:
/// - `output_root` exists and is not a directory.
/// - An input path does not exist.
/// - No matching file is found.
/// - Two different inputs would be written to the same output path.
/// - Creating an output directory or optimizing any file fails.
fn optimize_dir_mirror(
    continue_on_error: bool,
    store_config: &ChunkStoreConfig,
    compaction_options: &CompactionOptions,
    inputs: &[String],
    output_root: &str,
) -> anyhow::Result<()> {
    let output_root = std::path::PathBuf::from(output_root);
    if output_root.exists() && !output_root.is_dir() {
        anyhow::bail!(
            "output path {output_root:?} must be a directory when any input is a directory"
        );
    }

    let pairs = collect_mirror_pairs(inputs, &output_root)?;
    if pairs.is_empty() {
        anyhow::bail!(
            "no `.rrd`/`.rbl` files found under any of: {inputs:?}\n\
             (directory mirror mode skips other extensions)"
        );
    }

    re_log::info!(
        num_files = pairs.len(),
        output_root = %output_root.display(),
        "optimizing files in directory mirror mode",
    );

    let total = pairs.len();
    let done = std::sync::atomic::AtomicUsize::new(0);

    use rayon::iter::{IntoParallelRefIterator as _, ParallelIterator as _};
    pairs
        .par_iter()
        .try_for_each(|(src, dst)| -> anyhow::Result<()> {
            if let Some(parent) = dst.parent() {
                std::fs::create_dir_all(parent)
                    .with_context(|| format!("creating output dir {parent:?}"))?;
            }

            // Counts files *started*, not finished: the counter is bumped before the work.
            let idx = done.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
            re_log::info!(
                "[{idx}/{total}] optimizing {} -> {}",
                src.display(),
                dst.display(),
            );

            // NOTE(review): `merge_and_compact` takes string paths, so non-UTF-8 paths are
            // converted lossily here.
            let src_str = src.to_string_lossy().into_owned();
            let dst_str = dst.to_string_lossy().into_owned();
            merge_and_compact(
                continue_on_error,
                store_config,
                Some(compaction_options),
                std::slice::from_ref(&src_str),
                Some(&dst_str),
            )
        })?;

    Ok(())
}

/// Builds the `(source, destination)` pairs for [`optimize_dir_mirror`].
///
/// An input listed more than once is kept only once. Distinct sources that would map to the
/// same destination are rejected: parallel workers would otherwise race to write the same file.
fn collect_mirror_pairs(
    inputs: &[String],
    output_root: &std::path::Path,
) -> anyhow::Result<Vec<(std::path::PathBuf, std::path::PathBuf)>> {
    let mut pairs: Vec<(std::path::PathBuf, std::path::PathBuf)> = Vec::new();

    for input in inputs {
        let input_path = std::path::Path::new(input);
        if input_path.is_dir() {
            for entry in walkdir::WalkDir::new(input_path).follow_links(false) {
                let entry = entry.with_context(|| format!("walking {input_path:?}"))?;
                if !entry.file_type().is_file() || !is_rrd_like(entry.path()) {
                    continue;
                }
                let relative = entry
                    .path()
                    .strip_prefix(input_path)
                    .with_context(|| format!("strip_prefix({input_path:?}, {:?})", entry.path()))?;
                pairs.push((entry.path().to_path_buf(), output_root.join(relative)));
            }
        } else if input_path.is_file() {
            let file_name = input_path
                .file_name()
                .ok_or_else(|| anyhow::anyhow!("input path has no file name: {input_path:?}"))?;
            pairs.push((input_path.to_path_buf(), output_root.join(file_name)));
        } else {
            anyhow::bail!("input path does not exist or is not a file/directory: {input_path:?}");
        }
    }

    let mut dst_to_src: std::collections::HashMap<std::path::PathBuf, std::path::PathBuf> =
        std::collections::HashMap::with_capacity(pairs.len());
    let mut unique = Vec::with_capacity(pairs.len());
    for (src, dst) in pairs {
        if let Some(prev_src) = dst_to_src.get(&dst) {
            if *prev_src == src {
                // Same input reached twice (e.g. listed twice): process it only once.
                continue;
            }
            anyhow::bail!(
                "both {prev_src:?} and {src:?} would be written to {dst:?}; \
                 optimize them separately or into different output directories"
            );
        }
        dst_to_src.insert(dst.clone(), src.clone());
        unique.push((src, dst));
    }

    Ok(unique)
}
/// Returns `true` when `path` has exactly the extension `rrd` or `rbl` (case-sensitive).
fn is_rrd_like(path: &std::path::Path) -> bool {
    let extension = path.extension().and_then(std::ffi::OsStr::to_str);
    extension == Some("rrd") || extension == Some("rbl")
}
// Former name of the optimize command; kept only to report the rename.
#[derive(Debug, Clone, clap::Parser)]
pub struct CompactCommand {
    // Accepts any trailing arguments (including ones starting with `-`) so they do not cause a
    // parse error before `run` can print the rename message.
    #[arg(trailing_var_arg = true, allow_hyphen_values = true, num_args = 0..)]
    _ignored: Vec<String>,
}

impl CompactCommand {
    /// Always fails, pointing the user at `rerun rrd optimize`.
    #[expect(clippy::unused_self)]
    pub fn run(&self) -> anyhow::Result<()> {
        Err(anyhow::anyhow!(
            "`rerun rrd compact` has been renamed to `rerun rrd optimize`. \
             Please run `rerun rrd optimize --help` for usage."
        ))
    }
}
fn merge_and_compact(
continue_on_error: bool,
store_config: &ChunkStoreConfig,
compaction_options: Option<&CompactionOptions>,
path_to_input_rrds: &[String],
path_to_output_rrd: Option<&String>,
) -> anyhow::Result<()> {
let file_size_to_string = |size: Option<u64>| {
size.map_or_else(
|| "<unknown>".to_owned(),
|size| re_format::format_bytes(size as _),
)
};
let now = std::time::Instant::now();
re_log::info!(
config = %store_config,
srcs = ?path_to_input_rrds,
"merge/compaction started"
);
let (rx, rx_size_bytes) = read_rrd_streams_from_file_or_stdin(path_to_input_rrds);
let mut entity_dbs: std::collections::HashMap<StoreId, EntityDb> = Default::default();
re_log::info!("processing input…");
let mut num_chunks_before = 0u64;
let mut last_checkpoint = std::time::Instant::now();
for (msg_nr, (_source, res)) in rx.iter().enumerate() {
let mut is_success = true;
match res {
Ok(msg) => {
num_chunks_before += matches!(msg, re_log_types::LogMsg::ArrowMsg(_, _)) as u64;
let db = entity_dbs.entry(msg.store_id().clone()).or_insert_with(|| {
let enable_viewer_indexes = false; re_entity_db::EntityDb::with_store_config(
msg.store_id().clone(),
enable_viewer_indexes,
store_config.clone(),
)
});
if let Err(err) = db.add_log_msg(&msg) {
re_log::error!(%err, "couldn't index corrupt chunk");
is_success = false;
}
}
Err(err) => {
re_log::error!(err = re_error::format(err));
is_success = false;
}
}
if !continue_on_error && !is_success {
anyhow::bail!(
"one or more IO and/or decoding failures in the input stream (check logs)"
)
}
let msg_count = msg_nr + 1;
let check_in_interval = 10_000;
if msg_count % check_in_interval == 0 {
let msg_per_second = check_in_interval as f64 / last_checkpoint.elapsed().as_secs_f64();
last_checkpoint = std::time::Instant::now();
re_log::info!(
"processed {msg_count} messages so far, current speed is {msg_per_second:.2} msg/s"
);
re_tracing::reexports::puffin::GlobalProfiler::lock().new_frame();
}
}
if let Some(compaction_options) = compaction_options {
let now = std::time::Instant::now();
let num_chunks_before = entity_dbs
.values()
.map(|db| db.storage_engine().store().num_physical_chunks() as u64)
.sum::<u64>();
for db in entity_dbs.values() {
#[expect(unsafe_code)]
let engine = unsafe { db.storage_engine_raw() };
let compacted = engine.read().store().compacted(compaction_options)?;
*engine.write().store() = compacted;
}
let num_chunks_after = entity_dbs
.values()
.map(|db| db.storage_engine().store().num_physical_chunks() as u64)
.sum::<u64>();
let num_chunks_reduction = format!(
"-{:3.3}%",
100.0 - num_chunks_after as f64 / (num_chunks_before as f64 + f64::EPSILON) * 100.0
);
re_log::info!(
num_chunks_before, num_chunks_after, num_chunks_reduction, time=?now.elapsed(),
"compaction completed",
);
}
log_chunk_size_stats(&entity_dbs, store_config, "post-compaction");
let mut rrd_out = if let Some(path) = path_to_output_rrd {
Either::Left(std::io::BufWriter::new(
std::fs::File::create(path).with_context(|| format!("{path:?}"))?,
))
} else {
Either::Right(std::io::BufWriter::new(std::io::stdout().lock()))
};
re_log::info!("preparing output…");
let messages_rbl = entity_dbs
.values()
.filter(|entity_db| entity_db.store_kind() == StoreKind::Blueprint)
.flat_map(|entity_db| entity_db.to_messages(None ));
let mut num_chunks_after = 0u64;
let messages_rrd = entity_dbs
.values()
.filter(|entity_db| entity_db.store_kind() == StoreKind::Recording)
.flat_map(|entity_db| entity_db.to_messages(None ))
.inspect(|msg| {
num_chunks_after += matches!(msg, Ok(re_log_types::LogMsg::ArrowMsg(_, _))) as u64;
});
let encoding_options = re_log_encoding::rrd::EncodingOptions::PROTOBUF_COMPRESSED;
let version = entity_dbs
.values()
.next()
.and_then(|db| db.store_info())
.and_then(|info| info.store_version)
.unwrap_or(re_build_info::CrateVersion::LOCAL);
re_log::info!("encoding…");
let rrd_out_size = re_log_encoding::Encoder::encode_into(
version,
encoding_options,
messages_rbl.chain(messages_rrd),
&mut rrd_out,
)
.context("couldn't encode messages")?;
rrd_out.flush().context("couldn't flush output")?;
let rrds_in_size = rx_size_bytes.recv().ok().map(|(size, _footers)| size);
let num_chunks_reduction = format!(
"-{:3.3}%",
100.0 - num_chunks_after as f64 / (num_chunks_before as f64 + f64::EPSILON) * 100.0
);
let size_reduction = if let (Some(rrds_in_size), rrd_out_size) = (rrds_in_size, rrd_out_size) {
format!(
"-{:3.3}%",
100.0 - rrd_out_size as f64 / (rrds_in_size as f64 + f64::EPSILON) * 100.0
)
} else {
"N/A".to_owned()
};
re_log::info!(
srcs = ?path_to_input_rrds,
time = ?now.elapsed(),
"merge/compaction finished. Chunk count {} -> {} ({num_chunks_reduction}), size {} -> {} ({size_reduction})",
re_format::format_uint(num_chunks_before),
re_format::format_uint(num_chunks_after),
file_size_to_string(rrds_in_size),
file_size_to_string(Some(rrd_out_size)),
);
Ok(())
}
/// Logs summary statistics about the physical chunks held by `entity_dbs`.
///
/// The log line includes:
/// - min/max/avg/total heap size and min/max/avg row count;
/// - how many chunks are not time-sorted;
/// - how many chunks sit exactly at their row limit, taken from `store_config`:
///   `chunk_max_rows` for sorted chunks, `chunk_max_rows_if_unsorted` for unsorted ones;
/// - the average size and rows of all remaining ("rest") chunks.
///
/// `label` prefixes the log message. Nothing is logged when there are no chunks.
fn log_chunk_size_stats(
    entity_dbs: &std::collections::HashMap<StoreId, EntityDb>,
    store_config: &ChunkStoreConfig,
    label: &str,
) {
    let sorted_row_limit = store_config.chunk_max_rows as usize;
    let unsorted_row_limit = store_config.chunk_max_rows_if_unsorted as usize;

    let mut chunk_count = 0u64;
    let (mut bytes_min, mut bytes_max, mut bytes_total) = (u64::MAX, 0u64, 0u64);
    let (mut rows_min, mut rows_max, mut rows_total) = (usize::MAX, 0usize, 0u64);
    let mut unsorted_count = 0u64;
    let mut unsorted_at_limit_count = 0u64;
    let mut sorted_at_limit_count = 0u64;
    let (mut rest_count, mut rest_bytes, mut rest_rows) = (0u64, 0u64, 0u64);

    for db in entity_dbs.values() {
        for chunk in db.storage_engine().store().iter_physical_chunks() {
            let bytes = chunk.heap_size_bytes();
            let rows = chunk.num_rows();
            let sorted = chunk.is_time_sorted();

            chunk_count += 1;
            bytes_min = bytes_min.min(bytes);
            bytes_max = bytes_max.max(bytes);
            bytes_total += bytes;
            rows_min = rows_min.min(rows);
            rows_max = rows_max.max(rows);
            rows_total += rows as u64;
            unsorted_count += u64::from(!sorted);

            // Chunks exactly at the row limit for their sortedness are tallied on their own.
            // Everything else feeds the "rest" averages.
            match sorted {
                false if rows == unsorted_row_limit => unsorted_at_limit_count += 1,
                true if rows == sorted_row_limit => sorted_at_limit_count += 1,
                _ => {
                    rest_count += 1;
                    rest_bytes += bytes;
                    rest_rows += rows as u64;
                }
            }
        }
    }

    if chunk_count == 0 {
        return;
    }

    let unsorted_pct = unsorted_count as f64 / chunk_count as f64 * 100.0;
    let (rest_avg_bytes, rest_avg_rows) = if rest_count == 0 {
        ("N/A".to_owned(), "N/A".to_owned())
    } else {
        (
            re_format::format_bytes((rest_bytes / rest_count) as _),
            re_format::format_uint(rest_rows / rest_count),
        )
    };

    re_log::info!(
        num_chunks = chunk_count,
        min = %re_format::format_bytes(bytes_min as _),
        max = %re_format::format_bytes(bytes_max as _),
        avg = %re_format::format_bytes((bytes_total / chunk_count) as _),
        total = %re_format::format_bytes(bytes_total as _),
        rows_min,
        rows_max,
        rows_avg = rows_total / chunk_count,
        unsorted_chunks = format!("{unsorted_count}/{chunk_count} ({unsorted_pct:.1}%)"),
        unsorted_at_limit = format!("{unsorted_at_limit_count} (rows == max_rows_if_unsorted = {unsorted_row_limit})"),
        sorted_at_max_rows = format!("{sorted_at_limit_count} (rows == max_rows = {sorted_row_limit})"),
        rest_num_chunks = rest_count,
        rest_avg_bytes = %rest_avg_bytes,
        rest_avg_rows = %rest_avg_rows,
        "{label} chunk size stats",
    );
}