mod metadata;
mod protobuf;
mod raw;
mod recording_info;
mod ros2;
mod ros2_reflection;
mod schema;
mod stats;
use std::collections::{BTreeMap, BTreeSet};
use re_chunk::external::nohash_hasher::IntMap;
use re_chunk::{Chunk, EntityPath};
use re_log_types::TimeType;
pub use self::metadata::McapMetadataDecoder;
pub use self::protobuf::McapProtobufDecoder;
pub use self::raw::McapRawDecoder;
pub use self::recording_info::McapRecordingInfoDecoder;
pub use self::ros2::McapRos2Decoder;
pub use self::ros2_reflection::McapRos2ReflectionDecoder;
pub use self::schema::McapSchemaDecoder;
pub use self::stats::McapStatisticDecoder;
use crate::Error;
use crate::parsers::{ChannelId, MessageParser, ParserContext};
use crate::util::collect_empty_channels;
#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq)]
#[repr(transparent)]
pub struct DecoderIdentifier(String);
impl From<&'static str> for DecoderIdentifier {
fn from(value: &'static str) -> Self {
Self(value.to_owned())
}
}
impl From<String> for DecoderIdentifier {
fn from(value: String) -> Self {
Self(value)
}
}
impl std::fmt::Display for DecoderIdentifier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
pub trait Decoder {
fn identifier() -> DecoderIdentifier
where
Self: Sized;
fn process(
&mut self,
mcap_bytes: &[u8],
summary: &::mcap::Summary,
emit: &mut dyn FnMut(Chunk),
) -> Result<(), Error>;
}
pub trait MessageDecoder {
fn identifier() -> DecoderIdentifier
where
Self: Sized;
fn init(&mut self, _summary: &::mcap::Summary) -> Result<(), Error> {
Ok(())
}
fn supports_channel(&self, channel: &mcap::Channel<'_>) -> bool;
fn message_parser(
&self,
channel: &mcap::Channel<'_>,
num_rows: usize,
) -> Option<Box<dyn MessageParser>>;
}
type Parser = (ParserContext, Box<dyn MessageParser>);
struct McapChunkDecoder {
parsers: IntMap<ChannelId, Parser>,
time_type: TimeType,
}
impl McapChunkDecoder {
pub fn new(parsers: IntMap<ChannelId, Parser>, time_type: TimeType) -> Self {
Self { parsers, time_type }
}
pub fn decode_next(&mut self, msg: &::mcap::Message<'_>) -> Result<(), Error> {
re_tracing::profile_function!();
let channel = msg.channel.as_ref();
let channel_id = ChannelId(channel.id);
if let Some((ctx, parser)) = self.parsers.get_mut(&channel_id) {
parser.append(ctx, msg)?;
for timepoint in parser.get_log_and_publish_timepoints(msg, self.time_type)? {
ctx.add_timepoint(timepoint);
}
} else {
}
Ok(())
}
pub fn finish(self) -> impl Iterator<Item = Result<Chunk, Error>> {
self.parsers
.into_values()
.flat_map(|(ctx, parser)| match parser.finalize(ctx) {
Ok(chunks) => chunks.into_iter().map(Ok).collect::<Vec<_>>(),
Err(err) => vec![Err(Error::Other(err))],
})
}
}
#[derive(Clone, Debug)]
pub enum SelectedDecoders {
All,
Subset(BTreeSet<DecoderIdentifier>),
}
impl SelectedDecoders {
pub fn contains(&self, value: &DecoderIdentifier) -> bool {
match self {
Self::All => true,
Self::Subset(subset) => subset.contains(value),
}
}
}
#[derive(Clone, Debug, Default)]
pub enum Fallback {
#[default]
None,
Global(DecoderIdentifier),
}
pub struct MessageDecoderRunner {
inner: Box<dyn MessageDecoder>,
allowed: BTreeSet<ChannelId>,
}
impl MessageDecoderRunner {
fn new(inner: Box<dyn MessageDecoder>, allowed: BTreeSet<ChannelId>) -> Self {
Self { inner, allowed }
}
fn process(
&mut self,
mcap_bytes: &[u8],
summary: &mcap::Summary,
time_type: TimeType,
emit: &mut dyn FnMut(Chunk),
) -> Result<(), Error> {
self.inner.init(summary)?;
for chunk in &summary.chunk_indexes {
let parsers = summary
.read_message_indexes(mcap_bytes, chunk)?
.iter()
.filter_map(|(channel, msg_offsets)| {
let channel_id = ChannelId::from(channel.id);
if !self.allowed.contains(&channel_id) {
return None;
}
let parser = self.inner.message_parser(channel, msg_offsets.len())?;
let entity_path = EntityPath::from(channel.topic.as_str());
let ctx = ParserContext::new(entity_path, channel.topic.clone(), time_type);
Some((channel_id, (ctx, parser)))
})
.collect::<IntMap<_, _>>();
let mut decoder = McapChunkDecoder::new(parsers, time_type);
for msg in summary.stream_chunk(mcap_bytes, chunk)? {
match msg {
Ok(message) => {
if let Err(err) = decoder.decode_next(&message) {
re_log::error_once!(
"Failed to decode message on channel {}: {err}",
message.channel.topic
);
}
}
Err(err) => re_log::error!("Failed to read message from MCAP file: {err}"),
}
}
for chunk in decoder.finish() {
match chunk {
Ok(c) => emit(c),
Err(err) => re_log::error!("Failed to decode chunk: {err}"),
}
}
}
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct DecoderAssignment {
pub channel_id: ChannelId,
pub topic: String,
pub encoding: String,
pub schema_name: Option<String>,
pub decoder: DecoderIdentifier,
}
pub struct ExecutionPlan {
pub file_decoders: Vec<Box<dyn Decoder>>,
pub runners: Vec<MessageDecoderRunner>,
pub assignments: Vec<DecoderAssignment>,
}
impl ExecutionPlan {
pub fn run(
mut self,
mcap_bytes: &[u8],
summary: &mcap::Summary,
time_type: TimeType,
emit: &mut dyn FnMut(Chunk),
) -> anyhow::Result<()> {
for mut decoder in self.file_decoders {
decoder.process(mcap_bytes, summary, emit)?;
}
for runner in &mut self.runners {
runner.process(mcap_bytes, summary, time_type, emit)?;
}
Ok(())
}
}
pub struct DecoderRegistry {
file_factories: BTreeMap<DecoderIdentifier, fn() -> Box<dyn Decoder>>,
msg_factories: BTreeMap<DecoderIdentifier, fn() -> Box<dyn MessageDecoder>>,
msg_order: Vec<DecoderIdentifier>,
fallback: Fallback,
}
impl DecoderRegistry {
pub fn empty() -> Self {
Self {
file_factories: Default::default(),
msg_factories: Default::default(),
msg_order: Vec::new(),
fallback: Fallback::None,
}
}
pub fn all_with_raw_fallback() -> Self {
Self::all_builtin(true)
}
pub fn all_without_raw_fallback() -> Self {
Self::all_builtin(false)
}
pub fn all_builtin(raw_fallback_enabled: bool) -> Self {
let mut registry = Self::empty()
.register_file_decoder::<McapRecordingInfoDecoder>()
.register_file_decoder::<McapMetadataDecoder>()
.register_file_decoder::<McapSchemaDecoder>()
.register_file_decoder::<McapStatisticDecoder>()
.register_message_decoder::<McapRos2Decoder>()
.register_message_decoder::<McapRos2ReflectionDecoder>()
.register_message_decoder::<McapProtobufDecoder>();
if raw_fallback_enabled {
registry = registry
.register_message_decoder::<McapRawDecoder>()
.with_global_fallback::<McapRawDecoder>();
} else {
registry = registry.register_message_decoder::<McapRawDecoder>();
}
registry
}
pub fn register_file_decoder<L: Decoder + Default + 'static>(mut self) -> Self {
let id = L::identifier();
if self
.file_factories
.insert(id.clone(), || Box::new(L::default()))
.is_some()
{
re_log::warn_once!("Inserted file decoder {} twice.", id);
}
self
}
pub fn register_message_decoder<M: MessageDecoder + Default + 'static>(mut self) -> Self {
let id = <M as MessageDecoder>::identifier();
if self
.msg_factories
.insert(id.clone(), || Box::new(M::default()))
.is_some()
{
re_log::warn_once!("Inserted message decoder {} twice.", id);
}
self.msg_order.push(id);
self
}
pub fn with_global_fallback<M: MessageDecoder + 'static>(mut self) -> Self {
self.fallback = Fallback::Global(<M as MessageDecoder>::identifier());
self
}
pub fn all_identifiers(&self) -> Vec<String> {
self.file_factories
.keys()
.chain(self.msg_factories.keys())
.map(|id| id.to_string())
.collect()
}
pub fn select(&self, selected: &SelectedDecoders) -> Self {
let file_factories = self
.file_factories
.iter()
.filter(|(id, _)| selected.contains(id))
.map(|(k, v)| (k.clone(), *v))
.collect();
let msg_factories = self
.msg_factories
.iter()
.filter(|(id, _)| selected.contains(id))
.map(|(k, v)| (k.clone(), *v))
.collect();
let msg_order = self
.msg_order
.iter()
.filter(|&id| selected.contains(id))
.cloned()
.collect();
let fallback = self.select_fallback(selected);
Self {
file_factories,
msg_factories,
msg_order,
fallback,
}
}
fn select_fallback(&self, selected: &SelectedDecoders) -> Fallback {
match &self.fallback {
Fallback::Global(id) if selected.contains(id) => Fallback::Global(id.clone()),
Fallback::Global(_) | Fallback::None => Fallback::None,
}
}
pub fn plan(
&self,
mcap_bytes: &[u8],
summary: &mcap::Summary,
) -> anyhow::Result<ExecutionPlan> {
let file_decoders = self
.file_factories
.values()
.map(|f| f())
.collect::<Vec<_>>();
let empty_channels = collect_empty_channels(mcap_bytes, summary)?;
let mut msg_decoders: Vec<(DecoderIdentifier, Box<dyn MessageDecoder>)> = self
.msg_order
.iter()
.filter_map(|id| self.msg_factories.get(id).map(|f| (id.clone(), f())))
.collect();
for (_, l) in &mut msg_decoders {
l.init(summary)?;
}
let mut by_decoder: BTreeMap<DecoderIdentifier, BTreeSet<ChannelId>> = BTreeMap::new();
let mut assignments: Vec<DecoderAssignment> = Vec::new();
for channel_id in summary.channels.values() {
let channel_id = ChannelId::from(channel_id.id);
let channel = summary.channels[&channel_id.0].as_ref();
if empty_channels.contains(&channel_id) {
re_log::debug!(
"Skipping MCAP channel '{}' (id={}) because it contains no messages.",
channel.topic,
channel_id.0,
);
continue;
}
let mut chosen: Option<DecoderIdentifier> = None;
for (id, decoder) in &msg_decoders {
if decoder.supports_channel(channel) {
chosen = Some(id.clone());
break;
}
}
if chosen.is_none() {
if let Fallback::Global(id) = &self.fallback
&& self.msg_factories.contains_key(id)
{
chosen = Some(id.clone());
}
}
let schema_name = channel.schema.as_ref().map(|s| s.name.clone());
let schema_encoding = channel
.schema
.as_ref()
.map(|s| s.encoding.as_str())
.unwrap_or("Unknown");
if let Some(id) = chosen {
by_decoder.entry(id.clone()).or_default().insert(channel_id);
assignments.push(DecoderAssignment {
channel_id,
topic: channel.topic.clone(),
encoding: schema_encoding.to_owned(),
schema_name: channel.schema.as_ref().map(|s| s.name.clone()),
decoder: id,
});
} else {
re_log::debug!(
"No message decoder selected for topic '{}' (encoding='{}', schema='{:?}')",
channel.topic,
schema_encoding,
schema_name,
);
}
}
let mut runners = Vec::new();
for (decoder_id, allowed) in by_decoder {
if let Some(factory) = self.msg_factories.get(&decoder_id) {
let inner = factory();
runners.push(MessageDecoderRunner::new(inner, allowed));
}
}
Ok(ExecutionPlan {
file_decoders,
runners,
assignments,
})
}
}
#[cfg(test)]
mod tests {
use std::io;
use re_chunk::Chunk;
use re_log_types::TimeType;
use super::*;
#[test]
fn skips_channels_without_messages() {
let (summary, buffer, empty_channel_id, active_channel_id) = {
let cursor = io::Cursor::new(Vec::new());
let mut writer = mcap::Writer::new(cursor).expect("failed to create writer");
let empty_channel_id = writer
.add_channel(0, "empty_topic", "raw", &Default::default())
.expect("failed to add empty channel");
let active_channel_id = writer
.add_channel(0, "active_topic", "raw", &Default::default())
.expect("failed to add active channel");
writer
.write_to_known_channel(
&mcap::records::MessageHeader {
channel_id: active_channel_id,
sequence: 0,
log_time: 1,
publish_time: 1,
},
&[1, 2, 3],
)
.expect("failed to write message");
let summary = writer.finish().expect("failed to finish writer");
let buffer = writer.into_inner().into_inner();
(summary, buffer, empty_channel_id, active_channel_id)
};
let plan = DecoderRegistry::empty()
.register_file_decoder::<McapSchemaDecoder>()
.register_message_decoder::<McapRawDecoder>()
.plan(&buffer, &summary)
.expect("failed to plan");
assert_eq!(plan.assignments.len(), 1);
assert_eq!(plan.assignments[0].channel_id, ChannelId(active_channel_id));
assert_ne!(plan.assignments[0].channel_id, ChannelId(empty_channel_id));
let mut chunks = Vec::<Chunk>::new();
plan.run(&buffer, &summary, TimeType::TimestampNs, &mut |chunk| {
chunks.push(chunk);
})
.expect("failed to run plan");
assert_eq!(chunks.len(), 2);
assert!(
chunks
.iter()
.all(|chunk| !chunk.entity_path().to_string().ends_with("empty_topic"))
);
assert!(
chunks
.iter()
.any(|chunk| chunk.entity_path().to_string().ends_with("active_topic"))
);
}
}