use std::collections::BTreeSet;
use std::fs::File;
use std::io::BufWriter;
use clap::Subcommand;
use clap::builder::TypedValueParser as _;
use re_log_encoding::Encoder;
use re_log_types::{LogMsg, RecordingId, TimeType};
use re_mcap::{DecoderIdentifier, SelectedDecoders};
use re_sdk::external::re_data_loader::{McapLoader, supported_mcap_decoder_identifiers};
use re_sdk::{ApplicationId, DataLoader, DataLoaderSettings, LoadedData};
fn possible_timeline_types() -> impl clap::builder::TypedValueParser {
clap::builder::PossibleValuesParser::new(["timestamp", "duration"]).map(|value: String| {
match value.as_str() {
"timestamp" => TimeType::TimestampNs,
"duration" => TimeType::DurationNs,
_ => unreachable!("PossibleValuesParser already validated the input"),
}
})
}
/// Builds the clap value parser used for the `--decoder` option.
///
/// The accepted values are the string forms of the identifiers returned by
/// `supported_mcap_decoder_identifiers(true)`.
fn possible_decoders() -> clap::builder::PossibleValuesParser {
    // The identifiers are stringified once and kept in a static so that the
    // parser can borrow them as `&'static str`.
    static DECODER_IDS: std::sync::LazyLock<Vec<String>> = std::sync::LazyLock::new(|| {
        let mut ids = Vec::new();
        for identifier in supported_mcap_decoder_identifiers(true) {
            ids.push(identifier.to_string());
        }
        ids
    });

    let names: Vec<&'static str> = DECODER_IDS.iter().map(|id| id.as_str()).collect();
    clap::builder::PossibleValuesParser::new(names)
}
/// Convert an MCAP file into an RRD stream.
// NOTE: the `///` comments below are picked up by clap and shown in `--help`.
#[derive(Debug, Clone, clap::Parser)]
pub struct ConvertCommand {
    /// Path to the MCAP file to read
    path_to_input_mcap: String,

    /// Path of the RRD file to write; writes to standard output if unspecified
    #[arg(short = 'o', long = "output", value_name = "dst.rrd")]
    path_to_output_rrd: Option<String>,

    /// Application id of the output; defaults to the input path if unspecified
    #[arg(long = "application-id")]
    application_id: Option<String>,

    /// Decoder to apply during conversion (may be repeated); all decoders are selected if none is given
    #[arg(short = 'd', long = "decoder", value_parser = possible_decoders())]
    selected_decoders: Vec<String>,

    /// Turn off the loader's raw fallback
    #[arg(long = "disable-raw-fallback")]
    disable_raw_fallback: bool,

    /// Recording id of the output; a random one is generated if unspecified
    #[arg(long = "recording-id")]
    recording_id: Option<String>,

    /// Timestamp offset in nanoseconds, forwarded to the data loader (may be negative)
    // Without `allow_negative_numbers`, `--timestamp-offset-ns -5` is parsed as an unknown flag.
    #[arg(long = "timestamp-offset-ns", allow_negative_numbers = true)]
    timestamp_offset_ns: Option<i64>,

    /// Timeline type forwarded to the data loader
    #[arg(long = "timeline-type", value_parser = possible_timeline_types(), default_value = "timestamp")]
    timeline_type: TimeType,
}
impl ConvertCommand {
    /// Runs the conversion: loads the input MCAP through [`McapLoader`] and encodes
    /// everything it produces into the output file, or standard output if no output
    /// path was given.
    ///
    /// # Errors
    ///
    /// Returns an error if the loader fails to start, the output file cannot be
    /// created, encoding fails, or the buffered output cannot be flushed.
    fn run(&self) -> anyhow::Result<()> {
        let Self {
            path_to_input_mcap,
            path_to_output_rrd,
            application_id,
            recording_id,
            selected_decoders,
            disable_raw_fallback,
            timestamp_offset_ns,
            timeline_type,
        } = self;

        let start_time = std::time::Instant::now();

        // Fall back to the input path as application id, and to a random recording id.
        let application_id = application_id
            .clone()
            .map(ApplicationId::from)
            .unwrap_or_else(|| ApplicationId::from(path_to_input_mcap.clone()));
        let recording_id = recording_id
            .clone()
            .map(RecordingId::from)
            .unwrap_or_else(RecordingId::random);

        // No explicit `--decoder` means "use every decoder".
        let selected_decoders = if selected_decoders.is_empty() {
            SelectedDecoders::All
        } else {
            SelectedDecoders::Subset(
                selected_decoders
                    .iter()
                    .cloned()
                    .map(DecoderIdentifier::from)
                    .collect(),
            )
        };

        let loader: &dyn DataLoader =
            &McapLoader::new(&selected_decoders).with_raw_fallback(!*disable_raw_fallback);

        // The loader pushes its output into `tx`; `process_mcap` drains `rx` until the
        // sending side is gone.
        let (tx, rx) = crossbeam::channel::bounded::<LoadedData>(1024);
        loader.load_from_path(
            &DataLoaderSettings {
                application_id: Some(application_id),
                timestamp_offset_ns: *timestamp_offset_ns,
                timeline_type: *timeline_type,
                ..DataLoaderSettings::recommended(recording_id)
            },
            path_to_input_mcap.into(),
            tx,
        )?;

        // The writer is only lent to `process_mcap` so that it can be flushed explicitly
        // afterwards: dropping a `BufWriter` discards any error from its final flush.
        if let Some(path) = path_to_output_rrd {
            let file = anyhow::Context::with_context(File::create(path), || {
                format!("failed to create output file {path:?}")
            })?;
            let mut writer = BufWriter::new(file);
            process_mcap(&mut writer, &rx)?;
            std::io::Write::flush(&mut writer)?;
        } else {
            let stdout = std::io::stdout();
            let mut writer = BufWriter::new(stdout.lock());
            process_mcap(&mut writer, &rx)?;
            std::io::Write::flush(&mut writer)?;
        }

        // Fractional seconds: whole seconds would report "0s" for any sub-second run.
        re_log::info!(
            "Processing took {:.2}s",
            start_time.elapsed().as_secs_f64()
        );
        Ok(())
    }
}
/// Subcommands that operate on MCAP files.
// NOTE: the `///` comment on each variant is picked up by clap and shown in `--help`.
#[derive(Debug, Clone, Subcommand)]
pub enum McapCommands {
    /// Convert an MCAP file into an RRD stream
    Convert(ConvertCommand),
}
impl McapCommands {
    /// Executes whichever subcommand was selected and returns its result.
    pub fn run(&self) -> anyhow::Result<()> {
        // `Convert` is the only variant, so this destructuring is irrefutable; adding a
        // variant will turn this into a compile error, just like an exhaustive `match`.
        let Self::Convert(convert) = self;
        convert.run()
    }
}
/// Drains `receiver` and encodes every received item into `writer` as RRD data.
///
/// Each [`LoadedData`] item is turned into a [`LogMsg`] (chunks are converted to
/// arrow messages first) and appended to an encoder using the
/// `PROTOBUF_COMPRESSED` encoding options. The function returns once the channel
/// is empty and all senders have been dropped, after logging the number of
/// processed messages and the entity paths seen in chunk items.
///
/// # Errors
///
/// Returns an error if the encoder cannot be created, a chunk cannot be converted
/// to an arrow message, or appending a message to the encoder fails.
fn process_mcap<W: std::io::Write>(
    writer: W,
    receiver: &crossbeam::channel::Receiver<LoadedData>,
) -> anyhow::Result<()> {
    // Explicitly `u64`: an untyped counter defaults to `i32`, which can overflow on
    // very large inputs.
    let mut num_total_msgs: u64 = 0;
    // Entity paths are only collected from `Chunk` items.
    let mut topics = BTreeSet::new();

    let options = re_log_encoding::rrd::EncodingOptions::PROTOBUF_COMPRESSED;
    let version = re_build_info::CrateVersion::LOCAL;
    let mut encoder = Encoder::new_eager(version, options, writer)?;

    // `iter()` blocks for the next item and ends once the channel is disconnected.
    for loaded in receiver.iter() {
        num_total_msgs += 1;

        let log_msg = match loaded {
            LoadedData::LogMsg(_, log_msg) => log_msg,
            LoadedData::Chunk(_, store_id, chunk) => {
                topics.insert(chunk.entity_path().clone());
                let arrow_msg = chunk.to_arrow_msg()?;
                LogMsg::ArrowMsg(store_id, arrow_msg)
            }
            LoadedData::ArrowMsg(_, store_id, arrow_msg) => LogMsg::ArrowMsg(store_id, arrow_msg),
        };

        encoder.append(&log_msg)?;
    }

    re_log::info_once!("Processed {num_total_msgs} messages.");
    re_log::info_once!("Entities: {topics:#?}");
    Ok(())
}