use std::fs::File;
use std::io::BufWriter;
use crossbeam::channel::Receiver;
use itertools::Itertools as _;
use re_chunk::ChunkId;
use re_log_encoding::{Encoder, RawRrdManifest};
use re_protos::common::v1alpha1::ApplicationId;
use re_protos::log_msg::v1alpha1::log_msg::Msg;
use re_protos::log_msg::v1alpha1::{ArrowMsg, BlueprintActivationCommand, SetStoreInfo, StoreInfo};
use crate::commands::read_raw_rrd_streams_from_file_or_stdin;
use crate::commands::stdio::InputSource;
// Command-line arguments of the routing command, parsed by `clap`.
//
// NOTE(review): these are plain `//` comments on purpose. `///` doc comments on a
// `clap::Parser` struct are picked up by the derive and become user-visible `--help` text,
// which would change the CLI output. Promote them to `///` if that is desired.
#[derive(Debug, Clone, clap::Parser)]
pub struct RouteCommand {
// Positional list of input paths, handed as-is to `read_raw_rrd_streams_from_file_or_stdin`.
// Presumably an empty list means "read from standard input" — confirm against that helper.
path_to_input_rrds: Vec<String>,
// `-o` / `--output`: file that is created and written to. When absent, the result is written
// to standard output instead.
#[arg(short = 'o', long = "output", value_name = "dst.rrd")]
path_to_output_rrd: Option<String>,
// `--continue-on-error`: when set, a failed or malformed input message does not abort
// processing of the message stream.
#[clap(long = "continue-on-error", default_value_t = false)]
continue_on_error: bool,
// `--application-id`: when set, replaces the application ID of every store ID in the output.
#[clap(long = "application-id")]
application_id: Option<String>,
// `--recording-id`: when set, replaces the recording ID of every store ID in the output.
// Combined with more than one input path, this also causes blueprint activation commands to
// be dropped from the output.
#[clap(long = "recording-id")]
recording_id: Option<String>,
}
/// Store ID overrides applied to every routed message and to the footer manifests.
///
/// A field left as `None` keeps the corresponding part of the original store ID.
struct Rewrites {
/// Replacement application ID, if any.
application_id: Option<ApplicationId>,
/// Replacement recording ID, if any.
recording_id: Option<String>,
}
impl RouteCommand {
pub fn run(&self) -> anyhow::Result<()> {
let Self {
path_to_input_rrds,
path_to_output_rrd,
continue_on_error,
application_id,
recording_id,
} = self;
let rewrites = Rewrites {
application_id: application_id
.as_ref()
.map(|id| ApplicationId { id: id.clone() }),
recording_id: recording_id.clone(),
};
let (rx, rx_footers) = read_raw_rrd_streams_from_file_or_stdin(path_to_input_rrds);
let drop_blueprint_activation_cmds =
path_to_input_rrds.len() > 1 && rewrites.recording_id.is_some();
if let Some(path) = path_to_output_rrd {
let writer = BufWriter::new(File::create(path)?);
process_messages(
&rewrites,
*continue_on_error,
writer,
&rx,
&rx_footers,
drop_blueprint_activation_cmds,
)?;
} else {
let stdout = std::io::stdout();
let lock = stdout.lock();
let writer = BufWriter::new(lock);
process_messages(
&rewrites,
*continue_on_error,
writer,
&rx,
&rx_footers,
drop_blueprint_activation_cmds,
)?;
}
Ok(())
}
}
#[expect(clippy::fn_params_excessive_bools)] #[expect(clippy::type_complexity)] fn process_messages<W: std::io::Write>(
rewrites: &Rewrites,
continue_on_error: bool,
writer: W,
rx: &Receiver<(InputSource, anyhow::Result<Msg>)>,
rx_footers: &Receiver<(u64, Vec<(InputSource, anyhow::Result<RawRrdManifest>)>)>,
drop_blueprint_activation_cmds: bool,
) -> anyhow::Result<()> {
re_log::info!("processing input…");
let mut num_total_msgs = 0;
let mut num_unexpected_msgs = 0;
let mut num_blueprint_activations = 0;
let options = re_log_encoding::rrd::EncodingOptions::PROTOBUF_COMPRESSED;
let version = re_build_info::CrateVersion::LOCAL;
let mut encoder = Encoder::new_eager(version, options, writer)?;
let mut chunk_ids = Vec::new();
let mut byte_offsets_excluding_header = Vec::new();
let mut byte_sizes_excluding_header = Vec::new();
let mut byte_sizes_uncompressed = Vec::new();
while let Ok((_input, res)) = rx.recv() {
let mut is_success = true;
match res {
Ok(mut msg) => {
num_total_msgs += 1;
#[expect(deprecated)]
match &mut msg {
Msg::BlueprintActivationCommand(_) if drop_blueprint_activation_cmds => {
num_blueprint_activations += 1;
continue;
}
Msg::SetStoreInfo(SetStoreInfo {
info:
Some(StoreInfo {
store_id,
application_id: _, store_source: _,
store_version: _,
}),
row_id: _,
})
| Msg::BlueprintActivationCommand(BlueprintActivationCommand {
blueprint_id: store_id,
make_active: _,
make_default: _,
})
| Msg::ArrowMsg(ArrowMsg {
store_id,
chunk_id: _,
compression: _,
uncompressed_size: _,
encoding: _,
payload: _,
is_static: _,
}) => {
if let Some(target_store_id) = store_id {
if let Some(recording_id) = &rewrites.recording_id {
target_store_id.recording_id = recording_id.clone();
}
if let Some(application_id) = &rewrites.application_id {
target_store_id.application_id = Some(application_id.clone());
}
}
}
Msg::SetStoreInfo(SetStoreInfo {
row_id: _,
info: None,
}) => {
num_unexpected_msgs += 1;
is_success = false;
re_log::warn_once!(
"Encountered `SetStoreInfo` without `info` field: {:#?}",
msg
);
}
}
#[expect(unsafe_code)]
let (byte_span_excluding_header, byte_size_uncompressed) = unsafe {
encoder.append_transport(&msg)?
};
if let re_protos::log_msg::v1alpha1::log_msg::Msg::ArrowMsg(arrow_msg) = msg {
chunk_ids.push(arrow_msg.chunk_id.expect("chunk must have a chunk ID"));
byte_offsets_excluding_header.push(byte_span_excluding_header.start);
byte_sizes_excluding_header.push(byte_span_excluding_header.len);
byte_sizes_uncompressed.push(byte_size_uncompressed);
}
}
Err(err) => {
re_log::error_once!("{}", re_error::format(err));
is_success = false;
}
}
if !continue_on_error && !is_success {
anyhow::bail!(
"one or more IO and/or decoding failures in the input stream (check logs)"
)
}
}
let mut rrd_footer = re_log_encoding::RrdFooter::default();
let mut i = 0;
for (_, manifests) in rx_footers {
for (_, res) in manifests {
let manifest = res?;
let RawRrdManifest {
store_id,
sorbet_schema,
sorbet_schema_sha256,
data,
} = manifest.clone();
let patched_store_id = re_log_types::StoreId::new(
store_id.kind(),
rewrites
.application_id
.clone()
.unwrap_or_else(|| store_id.application_id().clone().into()),
rewrites
.recording_id
.clone()
.unwrap_or_else(|| store_id.recording_id().to_string()),
);
let byte_offsets = &byte_offsets_excluding_header[i..i + data.num_rows()];
let byte_sizes = &byte_sizes_excluding_header[i..i + data.num_rows()];
let byte_sizes_uncompressed = &byte_sizes_uncompressed[i..i + data.num_rows()];
let chunk_ids = &chunk_ids[i..i + data.num_rows()];
for (chunk_id, expected_chunk_id) in
itertools::izip!(chunk_ids, manifest.col_chunk_id()?)
{
assert_eq!(
*chunk_id,
(*expected_chunk_id).into(),
"[i={i}] {expected_chunk_id} != {}: {:#?}",
ChunkId::from_tuid((*chunk_id).try_into().expect("must be valid TUID")),
manifest.col_chunk_id()?.take(5).collect_vec(),
);
}
i += data.num_rows();
use arrow::array::{ArrayRef, UInt64Array};
use std::sync::Arc;
let column_byte_offsets =
Arc::new(UInt64Array::from(byte_offsets.to_vec())) as ArrayRef;
let column_byte_sizes = Arc::new(UInt64Array::from(byte_sizes.to_vec())) as ArrayRef;
let column_byte_sizes_uncompressed =
Arc::new(UInt64Array::from(byte_sizes_uncompressed.to_vec())) as ArrayRef;
let (schema, mut columns, num_rows) = data.into_parts();
for (field, column) in itertools::izip!(schema.fields(), &mut columns) {
match field.name().as_str() {
RawRrdManifest::FIELD_CHUNK_BYTE_OFFSET => {
*column = column_byte_offsets.clone();
}
RawRrdManifest::FIELD_CHUNK_BYTE_SIZE => {
*column = column_byte_sizes.clone();
}
RawRrdManifest::FIELD_CHUNK_BYTE_SIZE_UNCOMPRESSED => {
*column = column_byte_sizes_uncompressed.clone();
}
_ => {}
}
}
let data = arrow::array::RecordBatch::try_new_with_options(
schema,
columns,
&arrow::array::RecordBatchOptions::new().with_row_count(Some(num_rows)),
)?;
match rrd_footer.manifests.entry(patched_store_id.clone()) {
std::collections::hash_map::Entry::Occupied(mut e) => {
let existing_manifest = e.get_mut();
assert_eq!(existing_manifest.store_id, patched_store_id);
existing_manifest.sorbet_schema = arrow::datatypes::Schema::try_merge([
existing_manifest.sorbet_schema.clone(),
sorbet_schema.clone(),
])?;
existing_manifest.data = arrow::compute::concat_batches(
&existing_manifest.data.schema(),
&[existing_manifest.data.clone(), data.clone()],
)?;
}
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(RawRrdManifest {
store_id: patched_store_id,
sorbet_schema,
sorbet_schema_sha256,
data,
});
}
}
}
}
for manifest in rrd_footer.manifests.values_mut() {
manifest.sorbet_schema_sha256 =
RawRrdManifest::compute_sorbet_schema_sha256(&manifest.sorbet_schema)?;
}
#[expect(unsafe_code)]
unsafe {
encoder.finish_with_custom_footer(&rrd_footer)?;
}
re_log::info_once!(
"Processed {num_total_msgs} messages, dropped {num_blueprint_activations} blueprint activations, and encountered {num_unexpected_msgs} unexpected messages."
);
Ok(())
}