mod attachments;
mod metadata;
mod protobuf;
mod raw;
mod recording_info;
mod ros2;
mod ros2_reflection;
mod schema;
mod stats;
use std::collections::{BTreeMap, BTreeSet};
use itertools::Itertools as _;
#[cfg(not(target_arch = "wasm32"))]
use re_chunk::RowId;
use re_chunk::external::nohash_hasher::IntMap;
use re_chunk::{Chunk, EntityPath};
use re_log_types::TimeType;
pub use self::attachments::McapAttachmentsDecoder;
pub use self::metadata::McapMetadataDecoder;
pub use self::protobuf::McapProtobufDecoder;
pub use self::raw::McapRawDecoder;
pub use self::recording_info::McapRecordingInfoDecoder;
pub use self::ros2::McapRos2Decoder;
pub use self::ros2_reflection::McapRos2ReflectionDecoder;
pub use self::schema::McapSchemaDecoder;
pub use self::stats::McapStatisticDecoder;
use crate::Error;
use crate::parsers::{ChannelId, MessageParser, ParserContext};
use crate::util::collect_empty_channels;
// Entity path string `"__mcap_properties"`.
// NOTE(review): not referenced in the visible part of this file; presumably it is consumed by
// the decoder submodules declared above (via `super::`) — confirm before renaming or removing.
const MCAP_PROPERTIES_ENTITY_PATH: &str = "__mcap_properties";
/// Name under which a decoder is registered and selected.
///
/// This is a thin newtype around [`String`]: equality and ordering are exactly those of the
/// wrapped string, which is what allows it to be used as a `BTreeMap`/`BTreeSet` key.
#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq)]
#[repr(transparent)]
pub struct DecoderIdentifier(String);

impl From<&str> for DecoderIdentifier {
    /// Builds an identifier by copying `value`.
    ///
    /// The input is copied into an owned `String`, so any borrowed string is accepted — string
    /// literals as well as strings that only exist at runtime.
    fn from(value: &str) -> Self {
        Self(value.to_owned())
    }
}

impl From<String> for DecoderIdentifier {
    /// Builds an identifier by taking ownership of `value` (no copy).
    fn from(value: String) -> Self {
        Self(value)
    }
}

impl std::fmt::Display for DecoderIdentifier {
    /// Prints the identifier exactly like the wrapped string, honouring width/alignment flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Spell out the trait so the call can never become ambiguous with `Debug::fmt`.
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// A decoder that is given access to the whole MCAP file through a [`DecoderContext`].
///
/// Implementations are registered with `DecoderRegistry::register_file_decoder` and each one has
/// its `process` method called once by `ExecutionPlan::run`.
pub trait Decoder {
/// Returns the identifier under which this decoder is registered.
///
/// Requires `Self: Sized`, so it is not callable through `dyn Decoder`; the trait is used as
/// `Box<dyn Decoder>` elsewhere in this file.
fn identifier() -> DecoderIdentifier
where
Self: Sized;
/// Processes the file described by `ctx`, handing every produced [`Chunk`] to `emit`.
///
/// # Errors
///
/// Returns an [`Error`] if processing fails; `ExecutionPlan::run` propagates it and stops.
fn process(
&mut self,
ctx: &DecoderContext<'_>,
emit: &(dyn Fn(Chunk) + Send + Sync),
) -> Result<(), Error>;
}
/// Read-only view of one MCAP file that is handed to every [`Decoder`].
///
/// Bundles the raw bytes, the parsed summary, the active topic filter and the set of channels
/// that are to be treated as empty.
pub struct DecoderContext<'a> {
    /// Byte buffer that the index records in `summary` are resolved against.
    mcap_bytes: &'a [u8],

    /// Summary section of the file.
    summary: &'a ::mcap::Summary,

    /// Filter deciding which topics are processed.
    topic_filter: &'a TopicFilter,

    /// Channels that are skipped by [`Self::relevant_channels`].
    empty_channels: BTreeSet<ChannelId>,
}

impl<'a> DecoderContext<'a> {
    /// Assembles a context from its parts.
    pub fn new(
        mcap_bytes: &'a [u8],
        summary: &'a ::mcap::Summary,
        topic_filter: &'a TopicFilter,
        empty_channels: BTreeSet<ChannelId>,
    ) -> Self {
        Self {
            empty_channels,
            topic_filter,
            summary,
            mcap_bytes,
        }
    }

    /// The summary of the MCAP file.
    pub fn summary(&self) -> &'a ::mcap::Summary {
        self.summary
    }

    /// Iterates over the channels that are neither listed as empty nor rejected by the topic
    /// filter.
    pub fn relevant_channels(&self) -> impl Iterator<Item = &std::sync::Arc<mcap::Channel<'a>>> {
        self.summary
            .channels
            .values()
            // The cheap set lookup runs first; the regex-based filter only sees the survivors.
            .filter(|channel| !self.empty_channels.contains(&ChannelId(channel.id)))
            .filter(|channel| self.topic_filter.matches(&channel.topic))
    }

    /// Iterates over all metadata indexes of the summary, pairing each index with the result of
    /// reading the record it points to.
    ///
    /// Reading happens lazily, one record per iteration step; a failed read is reported as the
    /// `Err` half of the pair and does not stop the iteration.
    pub fn metadata_records(
        &self,
    ) -> impl Iterator<
        Item = (
            &'a mcap::records::MetadataIndex,
            Result<mcap::records::Metadata, mcap::McapError>,
        ),
    > + '_ {
        let bytes = self.mcap_bytes;
        self.summary
            .metadata_indexes
            .iter()
            .map(move |index| (index, mcap::read::metadata(bytes, index)))
    }

    /// Iterates over all attachment indexes of the summary, pairing each index with the result
    /// of reading the attachment it points to.
    ///
    /// Reading happens lazily, one attachment per iteration step; a failed read is reported as
    /// the `Err` half of the pair and does not stop the iteration.
    pub fn attachment_records(
        &self,
    ) -> impl Iterator<
        Item = (
            &'a mcap::records::AttachmentIndex,
            Result<mcap::Attachment<'a>, mcap::McapError>,
        ),
    > + '_ {
        let bytes = self.mcap_bytes;
        self.summary
            .attachment_indexes
            .iter()
            .map(move |index| (index, mcap::read::attachment(bytes, index)))
    }
}
/// An `emit` callback that records every [`Chunk`] it receives.
///
/// Dereference it to obtain the callback, run the code under test, then call
/// [`TestEmitter::finish`] to get the recorded chunks back.
#[cfg(not(target_arch = "wasm32"))]
pub struct TestEmitter {
    /// Receiving end of the channel that the callback sends into.
    rx: std::sync::mpsc::Receiver<Chunk>,

    /// The callback; it owns the sending end of the channel.
    emitter: Box<dyn Fn(Chunk) + Send + Sync>,
}

#[cfg(not(target_arch = "wasm32"))]
impl Default for TestEmitter {
    /// Creates an emitter backed by a fresh unbounded channel.
    fn default() -> Self {
        #[expect(clippy::disallowed_methods)]
        let (tx, rx) = std::sync::mpsc::channel();

        let emitter: Box<dyn Fn(Chunk) + Send + Sync> = Box::new(move |chunk: Chunk| {
            // A send error is deliberately ignored.
            let _res = tx.send(chunk);
        });

        Self { rx, emitter }
    }
}

#[cfg(not(target_arch = "wasm32"))]
impl std::ops::Deref for TestEmitter {
    type Target = dyn Fn(Chunk) + Send + Sync;

    /// Exposes the recording callback so that `&*emitter` can be passed as `emit`.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &*self.emitter
    }
}

#[cfg(not(target_arch = "wasm32"))]
impl TestEmitter {
    /// Consumes the emitter and returns every chunk it received, in the order they were sent.
    pub fn finish(self) -> Vec<Chunk> {
        // The callback holds the only sender. Dropping it closes the channel, which is what
        // lets the drain below terminate instead of blocking forever.
        drop(self.emitter);
        self.rx.into_iter().collect()
    }
}
/// A decoder that handles the messages of individual MCAP channels.
///
/// `DecoderRegistry::plan` asks each registered decoder (in registration order) whether it
/// supports a channel; `MessageDecoderRunner` then requests a parser per channel for every MCAP
/// chunk it decodes.
pub trait MessageDecoder: Send + Sync {
/// Returns the identifier under which this decoder is registered.
fn identifier() -> DecoderIdentifier
where
Self: Sized;
/// Optional hook that receives the file summary before the decoder is queried or run.
///
/// The default implementation does nothing and succeeds.
fn init(&mut self, _summary: &::mcap::Summary) -> Result<(), Error> {
Ok(())
}
/// Returns `true` if this decoder can handle messages on `channel`.
fn supports_channel(&self, channel: &mcap::Channel<'_>) -> bool;
/// Creates a parser for `channel`, or `None` if no parser is available for it.
///
/// `num_rows` is the number of messages on this channel within the chunk being decoded (the
/// caller passes the length of the channel's message-index entry).
/// When `None` is returned, the runner skips the channel's messages in that chunk.
fn message_parser(
&self,
channel: &mcap::Channel<'_>,
num_rows: usize,
) -> Option<Box<dyn MessageParser>>;
}
type Parser = (ParserContext, Box<dyn MessageParser>);
struct McapChunkDecoder {
parsers: IntMap<ChannelId, Parser>,
time_type: TimeType,
}
impl McapChunkDecoder {
pub fn new(parsers: IntMap<ChannelId, Parser>, time_type: TimeType) -> Self {
Self { parsers, time_type }
}
pub fn decode_next(&mut self, msg: &::mcap::Message<'_>) -> Result<(), Error> {
re_tracing::profile_function!();
let channel = msg.channel.as_ref();
let channel_id = ChannelId(channel.id);
if let Some((ctx, parser)) = self.parsers.get_mut(&channel_id) {
parser.append(ctx, msg)?;
for timepoint in parser.get_log_and_publish_timepoints(msg, self.time_type)? {
ctx.add_timepoint(timepoint);
}
} else {
}
Ok(())
}
pub fn finish(self) -> impl Iterator<Item = Result<Chunk, Error>> {
self.parsers
.into_values()
.flat_map(|(ctx, parser)| match parser.finalize(ctx) {
Ok(chunks) => chunks.into_iter().map(Ok).collect::<Vec<_>>(),
Err(err) => vec![Err(Error::Other(err))],
})
}
}
/// Describes which decoders of a registry should be active.
#[derive(Clone, Debug)]
pub enum SelectedDecoders {
    /// Every decoder is selected.
    All,

    /// Only the listed decoders are selected.
    Subset(BTreeSet<DecoderIdentifier>),
}

impl SelectedDecoders {
    /// Returns `true` if the decoder named `value` is part of the selection.
    pub fn contains(&self, value: &DecoderIdentifier) -> bool {
        match self {
            Self::Subset(allowed) => allowed.contains(value),
            Self::All => true,
        }
    }
}
#[derive(Default, Clone, Debug)]
pub struct TopicFilter {
include: Vec<regex_lite::Regex>,
exclude: Vec<regex_lite::Regex>,
}
impl TopicFilter {
pub fn with_include_patterns(mut self, include: &[String]) -> Result<Self, regex_lite::Error> {
self.include = include
.iter()
.map(|pattern| regex_lite::Regex::new(pattern))
.try_collect()?;
Ok(self)
}
pub fn with_exclude_patterns(mut self, exclude: &[String]) -> Result<Self, regex_lite::Error> {
self.exclude = exclude
.iter()
.map(|pattern| regex_lite::Regex::new(pattern))
.try_collect()?;
Ok(self)
}
pub fn matches(&self, topic: &str) -> bool {
let included = self.include.is_empty() || self.include.iter().any(|r| r.is_match(topic));
let excluded = self.exclude.iter().any(|r| r.is_match(topic));
included && !excluded
}
pub fn is_empty(&self) -> bool {
self.include.is_empty() && self.exclude.is_empty()
}
}
/// What the registry does with a channel that none of the registered message decoders supports.
#[derive(Clone, Debug, Default)]
pub enum Fallback {
/// No fallback: such channels are left without a decoder.
#[default]
None,
/// Assign such channels to the message decoder with this identifier (only takes effect if a
/// decoder with that identifier is registered).
Global(DecoderIdentifier),
}
pub struct MessageDecoderRunner {
inner: Box<dyn MessageDecoder>,
allowed: BTreeSet<ChannelId>,
}
impl MessageDecoderRunner {
fn new(inner: Box<dyn MessageDecoder>, allowed: BTreeSet<ChannelId>) -> Self {
Self { inner, allowed }
}
fn process(
&mut self,
mcap_bytes: &[u8],
summary: &mcap::Summary,
time_type: TimeType,
emit: &(dyn Fn(Chunk) + Send + Sync),
) -> Result<(), Error> {
self.inner.init(summary)?;
let allowed = &self.allowed;
let inner = &*self.inner;
let decode_chunk = |chunk: &::mcap::records::ChunkIndex| -> Result<Vec<Chunk>, Error> {
let parsers = summary
.read_message_indexes(mcap_bytes, chunk)?
.iter()
.filter_map(|(channel, msg_offsets)| {
let channel_id = ChannelId::from(channel.id);
if !allowed.contains(&channel_id) {
return None;
}
let parser = inner.message_parser(channel, msg_offsets.len())?;
let entity_path = EntityPath::from(channel.topic.as_str());
let ctx = ParserContext::new(entity_path, channel.topic.clone(), time_type);
Some((channel_id, (ctx, parser)))
})
.collect::<IntMap<_, _>>();
let mut decoder = McapChunkDecoder::new(parsers, time_type);
for msg in summary.stream_chunk(mcap_bytes, chunk)? {
match msg {
Ok(message) => {
if let Err(err) = decoder.decode_next(&message) {
re_log::error_once!(
"Failed to decode message on channel {}: {err}",
message.channel.topic
);
}
}
Err(err) => {
re_log::error!("Failed to read message from MCAP file: {err}");
}
}
}
let mut batch = Vec::new();
for mut chunk in decoder.finish() {
if let Ok(chunk) = &mut chunk {
chunk.sort_by_row_ids_if_needed();
chunk.warn_if_out_of_order();
}
match chunk {
Ok(c) => batch.push(c),
Err(err) => re_log::error!("Failed to decode chunk: {err}"),
}
}
Ok(batch)
};
#[cfg(target_arch = "wasm32")]
let workers = 1;
#[cfg(not(target_arch = "wasm32"))]
let workers = rayon::current_num_threads().max(1);
if workers <= 2 {
for chunk in &summary.chunk_indexes {
for c in decode_chunk(chunk)? {
emit(c);
}
}
return Ok(());
}
#[cfg(not(target_arch = "wasm32"))]
{
let max_in_flight = workers * 2;
let (batch_tx, batch_rx) =
crossbeam::channel::bounded::<(usize, Vec<Chunk>)>(max_in_flight);
let (producer_result, ()) = rayon::join(
|| {
let next_idx = std::sync::atomic::AtomicUsize::new(0);
let total = summary.chunk_indexes.len();
let chunk_indexes = &summary.chunk_indexes;
let result: Result<(), Error> = std::thread::scope(|scope| {
let handles: Vec<_> = (0..workers)
.map(|_| {
let batch_tx = batch_tx.clone();
let next_idx = &next_idx;
let decode_chunk = &decode_chunk;
scope.spawn(move || -> Result<(), Error> {
loop {
let idx = next_idx
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if idx >= total {
return Ok(());
}
let batch = decode_chunk(&chunk_indexes[idx])?;
re_quota_channel::send_crossbeam(&batch_tx, (idx, batch))
.map_err(|err| {
Error::Other(anyhow::format_err!(
"Failed to send batch: {err}"
))
})?;
}
})
})
.collect();
let mut first_err: Result<(), Error> = Ok(());
for handle in handles {
match handle.join().expect("decoder worker panicked") {
Err(err) if first_err.is_ok() => first_err = Err(err),
Ok(()) | Err(_) => {}
}
}
first_err
});
drop(batch_tx);
result
},
|| {
let emit_with_new_row_ids = |chunk: Chunk| {
let chunk = chunk.clone_as(chunk.id(), RowId::new());
emit(chunk);
};
let mut buffer: BTreeMap<usize, Vec<Chunk>> = BTreeMap::new();
let mut current_idx = 0usize;
for (idx, batch) in &batch_rx {
if idx == current_idx {
for chunk in batch {
emit_with_new_row_ids(chunk);
}
current_idx += 1;
while let Some(b) = buffer.remove(¤t_idx) {
for chunk in b {
emit_with_new_row_ids(chunk);
}
current_idx += 1;
}
} else {
buffer.insert(idx, batch);
}
}
re_log::debug_assert!(buffer.is_empty(), "All batches should've been consumed");
},
);
producer_result?;
}
Ok(())
}
}
/// Records which message decoder was chosen for a channel while planning.
#[derive(Clone, Debug)]
pub struct DecoderAssignment {
/// Id of the channel.
pub channel_id: ChannelId,
/// Topic of the channel.
pub topic: String,
/// Encoding of the channel's schema, or `"Unknown"` if the channel has no schema.
pub encoding: String,
/// Name of the channel's schema, if the channel has one.
pub schema_name: Option<String>,
/// Identifier of the decoder that was assigned to the channel.
pub decoder: DecoderIdentifier,
}
/// The outcome of planning: everything needed to decode one MCAP file.
pub struct ExecutionPlan {
    /// Decoders that run once over the whole file.
    pub file_decoders: Vec<Box<dyn Decoder>>,

    /// One runner per message decoder that had at least one channel assigned.
    pub runners: Vec<MessageDecoderRunner>,

    /// The channel-to-decoder assignments that were made during planning.
    pub assignments: Vec<DecoderAssignment>,

    /// The topic filter the plan was created with.
    pub topic_filter: TopicFilter,
}

impl ExecutionPlan {
    /// Executes the plan: first all file decoders, then all message decoder runners.
    ///
    /// Every produced [`Chunk`] is passed to `emit`.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error raised while collecting the empty channels or by
    /// any decoder or runner.
    pub fn run(
        mut self,
        mcap_bytes: &[u8],
        summary: &mcap::Summary,
        time_type: TimeType,
        emit: &(dyn Fn(Chunk) + Send + Sync),
    ) -> anyhow::Result<()> {
        let ctx = DecoderContext::new(
            mcap_bytes,
            summary,
            &self.topic_filter,
            collect_empty_channels(mcap_bytes, summary)?,
        );

        self.file_decoders
            .into_iter()
            .try_for_each(|mut decoder| decoder.process(&ctx, emit))?;

        self.runners
            .iter_mut()
            .try_for_each(|runner| runner.process(mcap_bytes, summary, time_type, emit))?;

        Ok(())
    }
}
pub struct DecoderRegistry {
file_factories: BTreeMap<DecoderIdentifier, fn() -> Box<dyn Decoder>>,
msg_factories: BTreeMap<DecoderIdentifier, fn() -> Box<dyn MessageDecoder>>,
msg_order: Vec<DecoderIdentifier>,
fallback: Fallback,
}
impl DecoderRegistry {
pub fn empty() -> Self {
Self {
file_factories: Default::default(),
msg_factories: Default::default(),
msg_order: Vec::new(),
fallback: Fallback::None,
}
}
pub fn all_with_raw_fallback() -> Self {
Self::all_builtin(true)
}
pub fn all_without_raw_fallback() -> Self {
Self::all_builtin(false)
}
pub fn all_builtin(raw_fallback_enabled: bool) -> Self {
let mut registry = Self::empty()
.register_file_decoder::<McapRecordingInfoDecoder>()
.register_file_decoder::<McapAttachmentsDecoder>()
.register_file_decoder::<McapMetadataDecoder>()
.register_file_decoder::<McapSchemaDecoder>()
.register_file_decoder::<McapStatisticDecoder>()
.register_message_decoder::<McapRos2Decoder>()
.register_message_decoder::<McapRos2ReflectionDecoder>()
.register_message_decoder::<McapProtobufDecoder>();
if raw_fallback_enabled {
registry = registry
.register_message_decoder::<McapRawDecoder>()
.with_global_fallback::<McapRawDecoder>();
} else {
registry = registry.register_message_decoder::<McapRawDecoder>();
}
registry
}
pub fn register_file_decoder<L: Decoder + Default + 'static>(mut self) -> Self {
let id = L::identifier();
if self
.file_factories
.insert(id.clone(), || Box::new(L::default()))
.is_some()
{
re_log::warn_once!("Inserted file decoder {} twice.", id);
}
self
}
pub fn register_message_decoder<M: MessageDecoder + Default + 'static>(mut self) -> Self {
let id = <M as MessageDecoder>::identifier();
if self
.msg_factories
.insert(id.clone(), || Box::new(M::default()))
.is_some()
{
re_log::warn_once!("Inserted message decoder {} twice.", id);
}
self.msg_order.push(id);
self
}
pub fn with_global_fallback<M: MessageDecoder + 'static>(mut self) -> Self {
self.fallback = Fallback::Global(<M as MessageDecoder>::identifier());
self
}
pub fn all_identifiers(&self) -> Vec<String> {
self.file_factories
.keys()
.chain(self.msg_factories.keys())
.map(|id| id.to_string())
.collect()
}
pub fn select(&self, selected: &SelectedDecoders) -> Self {
let file_factories = self
.file_factories
.iter()
.filter(|(id, _)| selected.contains(id))
.map(|(k, v)| (k.clone(), *v))
.collect();
let msg_factories = self
.msg_factories
.iter()
.filter(|(id, _)| selected.contains(id))
.map(|(k, v)| (k.clone(), *v))
.collect();
let msg_order = self
.msg_order
.iter()
.filter(|&id| selected.contains(id))
.cloned()
.collect();
let fallback = self.select_fallback(selected);
Self {
file_factories,
msg_factories,
msg_order,
fallback,
}
}
fn select_fallback(&self, selected: &SelectedDecoders) -> Fallback {
match &self.fallback {
Fallback::Global(id) if selected.contains(id) => Fallback::Global(id.clone()),
Fallback::Global(_) | Fallback::None => Fallback::None,
}
}
pub fn plan(
&self,
mcap_bytes: &[u8],
summary: &mcap::Summary,
topic_filter: &TopicFilter,
) -> anyhow::Result<ExecutionPlan> {
let file_decoders = self
.file_factories
.values()
.map(|f| f())
.collect::<Vec<_>>();
let empty_channels = collect_empty_channels(mcap_bytes, summary)?;
let mut msg_decoders: Vec<(DecoderIdentifier, Box<dyn MessageDecoder>)> = self
.msg_order
.iter()
.filter_map(|id| self.msg_factories.get(id).map(|f| (id.clone(), f())))
.collect();
for (_, l) in &mut msg_decoders {
l.init(summary)?;
}
let mut by_decoder: BTreeMap<DecoderIdentifier, BTreeSet<ChannelId>> = BTreeMap::new();
let mut assignments: Vec<DecoderAssignment> = Vec::new();
for channel_id in summary.channels.values() {
let channel_id = ChannelId::from(channel_id.id);
let channel = summary.channels[&channel_id.0].as_ref();
if empty_channels.contains(&channel_id) {
re_log::debug!(
"Skipping MCAP channel '{}' (id={}) because it contains no messages.",
channel.topic,
channel_id.0,
);
continue;
}
if !topic_filter.matches(&channel.topic) {
re_log::debug!(
"Skipping MCAP channel '{}' because it does not match the topic filter.",
channel.topic,
);
continue;
}
if channel.message_encoding.trim().is_empty() {
re_log::warn_once!(
"MCAP channel '{}' does not specify a message encoding.",
channel.topic,
);
}
if channel.schema.is_none() {
re_log::warn!(
"MCAP channel '{}' does not specify a schema.",
channel.topic,
);
}
let mut chosen: Option<DecoderIdentifier> = None;
for (id, decoder) in &msg_decoders {
if decoder.supports_channel(channel) {
chosen = Some(id.clone());
break;
}
}
if chosen.is_none() {
if let Fallback::Global(id) = &self.fallback
&& self.msg_factories.contains_key(id)
{
chosen = Some(id.clone());
}
}
let schema_name = channel.schema.as_ref().map(|s| s.name.clone());
let schema_encoding = channel
.schema
.as_ref()
.map(|s| s.encoding.as_str())
.unwrap_or("Unknown");
if let Some(id) = chosen {
by_decoder.entry(id.clone()).or_default().insert(channel_id);
assignments.push(DecoderAssignment {
channel_id,
topic: channel.topic.clone(),
encoding: schema_encoding.to_owned(),
schema_name: channel.schema.as_ref().map(|s| s.name.clone()),
decoder: id,
});
} else {
re_log::debug!(
"No message decoder selected for topic '{}' (encoding='{}', schema='{:?}')",
channel.topic,
schema_encoding,
schema_name,
);
}
}
let mut runners = Vec::new();
for (decoder_id, allowed) in by_decoder {
if let Some(factory) = self.msg_factories.get(&decoder_id) {
let inner = factory();
runners.push(MessageDecoderRunner::new(inner, allowed));
}
}
Ok(ExecutionPlan {
file_decoders,
runners,
assignments,
topic_filter: topic_filter.clone(),
})
}
}
#[cfg(test)]
mod tests {
use std::io;
use re_log_types::TimeType;
use re_sdk_types::archetypes::McapMessage;
use super::*;
// A channel that never received a message must neither get a decoder assigned nor show up in
// the emitted chunks.
#[test]
fn skips_channels_without_messages() {
// Fixture: an in-memory MCAP file with two schema-less channels, only one of which has a
// message.
let (summary, buffer, empty_channel_id, active_channel_id) = {
let cursor = io::Cursor::new(Vec::new());
let mut writer = mcap::Writer::new(cursor).expect("failed to create writer");
let empty_channel_id = writer
.add_channel(0, "empty_topic", "raw", &Default::default())
.expect("failed to add empty channel");
let active_channel_id = writer
.add_channel(0, "active_topic", "raw", &Default::default())
.expect("failed to add active channel");
writer
.write_to_known_channel(
&mcap::records::MessageHeader {
channel_id: active_channel_id,
sequence: 0,
log_time: 1,
publish_time: 1,
},
&[1, 2, 3],
)
.expect("failed to write message");
let summary = writer.finish().expect("failed to finish writer");
let buffer = writer.into_inner().into_inner();
(summary, buffer, empty_channel_id, active_channel_id)
};
let plan = DecoderRegistry::empty()
.register_file_decoder::<McapSchemaDecoder>()
.register_message_decoder::<McapRawDecoder>()
.plan(&buffer, &summary, &TopicFilter::default())
.expect("failed to plan");
// Planning: only the channel with a message gets an assignment.
assert_eq!(plan.assignments.len(), 1);
assert_eq!(plan.assignments[0].channel_id, ChannelId(active_channel_id));
assert_ne!(plan.assignments[0].channel_id, ChannelId(empty_channel_id));
let emitter = TestEmitter::default();
plan.run(&buffer, &summary, TimeType::TimestampNs, &*emitter)
.expect("failed to run plan");
let chunks = emitter.finish();
// NOTE(review): presumably one chunk from the schema file decoder and one from the raw message
// decoder — confirm against those decoders.
assert_eq!(chunks.len(), 2);
// Execution: nothing is emitted for the empty channel ...
assert!(
chunks
.iter()
.all(|chunk| !chunk.entity_path().to_string().ends_with("empty_topic"))
);
// ... but the active channel does produce output.
assert!(
chunks
.iter()
.any(|chunk| chunk.entity_path().to_string().ends_with("active_topic"))
);
}
/// Builds an in-memory MCAP file with one `ros2msg` schema, one channel using that schema and
/// one message on the channel.
///
/// Returns the summary produced when finishing the writer, together with the file bytes.
fn ros2_summary_with_message_encoding(
    schema_name: &str,
    topic: &str,
    message_encoding: &str,
    payload: &[u8],
) -> (mcap::Summary, Vec<u8>) {
    let mut mcap_writer =
        mcap::Writer::new(io::Cursor::new(Vec::new())).expect("failed to create writer");

    let schema = mcap_writer
        .add_schema(schema_name, "ros2msg", b"string data")
        .expect("failed to add schema");
    let channel = mcap_writer
        .add_channel(schema, topic, message_encoding, &Default::default())
        .expect("failed to add channel");

    let header = mcap::records::MessageHeader {
        channel_id: channel,
        sequence: 0,
        log_time: 1,
        publish_time: 1,
    };
    mcap_writer
        .write_to_known_channel(&header, payload)
        .expect("failed to write message");

    // The writer has to be finished before the underlying buffer is taken out of it.
    let summary = mcap_writer.finish().expect("failed to finish writer");
    let bytes = mcap_writer.into_inner().into_inner();
    (summary, bytes)
}
// A channel with a `ros2msg` schema but a `json` message encoding must end up with the raw
// decoder and be emitted as an `McapMessage` data component.
#[test]
fn non_cdr_ros2msg_channel_is_forwarded_as_raw_blob() {
let (summary, buffer) = ros2_summary_with_message_encoding(
"custom_msgs/msg/Foo",
"non_cdr_topic",
"json",
br#"{"data":"hello"}"#,
);
let plan = DecoderRegistry::all_with_raw_fallback()
.plan(&buffer, &summary, &TopicFilter::default())
.expect("failed to plan");
let assignment = plan
.assignments
.iter()
.find(|assignment| assignment.topic == "non_cdr_topic")
.expect("missing assignment");
// Planning: the channel is assigned to the decoder named "raw".
assert_eq!(assignment.decoder.to_string(), "raw");
let test_emitter = TestEmitter::default();
plan.run(&buffer, &summary, TimeType::TimestampNs, &*test_emitter)
.expect("failed to run plan");
let chunks = test_emitter.finish();
// Execution: at least one chunk on the topic's entity path carries the `McapMessage` data
// component.
assert!(chunks.iter().any(|chunk| {
chunk.entity_path().to_string().ends_with("non_cdr_topic")
&& chunk
.component_descriptors()
.any(|descr| descr.component == McapMessage::descriptor_data().component)
}));
}
// A channel without any schema must still be decoded by the raw decoder, including both message
// timelines.
#[test]
fn schema_less_channel_is_forwarded_as_raw_blob() {
// Fixture: one channel created with schema id 0 and a single message whose log time and publish
// time differ.
let (summary, buffer) = {
let cursor = io::Cursor::new(Vec::new());
let mut writer = mcap::Writer::new(cursor).expect("failed to create writer");
let channel_id = writer
.add_channel(0, "schema_less_topic", "raw", &Default::default())
.expect("failed to add channel");
writer
.write_to_known_channel(
&mcap::records::MessageHeader {
channel_id,
sequence: 0,
log_time: 1,
publish_time: 2,
},
&[1, 2, 3],
)
.expect("failed to write message");
let summary = writer.finish().expect("failed to finish writer");
let buffer = writer.into_inner().into_inner();
(summary, buffer)
};
let plan = DecoderRegistry::all_with_raw_fallback()
.plan(&buffer, &summary, &TopicFilter::default())
.expect("failed to plan");
let assignment = plan
.assignments
.iter()
.find(|assignment| assignment.topic == "schema_less_topic")
.expect("missing assignment");
// Planning: no schema name is recorded and the raw decoder is chosen.
assert_eq!(assignment.schema_name, None);
assert_eq!(assignment.decoder.to_string(), "raw");
let test_emitter = TestEmitter::default();
plan.run(&buffer, &summary, TimeType::TimestampNs, &*test_emitter)
.expect("failed to run plan");
let chunks = test_emitter.finish();
// Execution: a single-row chunk on the topic's entity path carries the `McapMessage` data
// component and both the log-time and the publish-time timeline.
assert!(chunks.iter().any(|chunk| {
chunk
.entity_path()
.to_string()
.ends_with("schema_less_topic")
&& chunk.num_rows() == 1
&& chunk
.component_descriptors()
.any(|descr| descr.component == McapMessage::descriptor_data().component)
&& chunk
.timelines()
.values()
.any(|timeline| timeline.name() == "message_log_time")
&& chunk
.timelines()
.values()
.any(|timeline| timeline.name() == "message_publish_time")
}));
}
// Even for the schema name `std_msgs/msg/String`, a channel whose message encoding is `json`
// must be assigned to the raw decoder rather than to one of the ROS 2 decoders.
#[test]
fn semantic_ros2_decoder_does_not_claim_non_cdr_channels() {
let (summary, buffer) = ros2_summary_with_message_encoding(
"std_msgs/msg/String",
"non_cdr_string_topic",
"json",
br#"{"data":"hello"}"#,
);
let plan = DecoderRegistry::all_with_raw_fallback()
.plan(&buffer, &summary, &TopicFilter::default())
.expect("failed to plan");
let assignment = plan
.assignments
.iter()
.find(|assignment| assignment.topic == "non_cdr_string_topic")
.expect("missing assignment");
assert_eq!(assignment.decoder.to_string(), "raw");
}
/// Exercises `TopicFilter::matches` for every combination of include and exclude patterns.
#[test]
fn topic_filter_matches() {
    let re = |pattern: &str| regex_lite::Regex::new(pattern).unwrap();

    // No patterns at all: the filter is empty and lets everything through.
    let unfiltered = TopicFilter::default();
    assert!(unfiltered.is_empty());
    assert!(unfiltered.matches("/anything"));
    assert!(unfiltered.matches("/foo/bar"));

    // Include only: a topic has to match the include pattern.
    let include_only = TopicFilter {
        include: vec![re(r"^/camera/")],
        exclude: vec![],
    };
    assert!(!include_only.is_empty());
    assert!(include_only.matches("/camera/rgb"));
    assert!(include_only.matches("/camera/depth"));
    assert!(!include_only.matches("/imu"));

    // Exclude only: everything passes except what matches the exclude pattern.
    let exclude_only = TopicFilter {
        include: vec![],
        exclude: vec![re(r"^/diagnostics")],
    };
    assert!(exclude_only.matches("/camera/rgb"));
    assert!(!exclude_only.matches("/diagnostics/agg"));

    // Both: exclusion wins over inclusion.
    let include_and_exclude = TopicFilter {
        include: vec![re(r"^/camera/")],
        exclude: vec![re(r"depth$")],
    };
    assert!(include_and_exclude.matches("/camera/rgb"));
    assert!(!include_and_exclude.matches("/camera/depth"));
    assert!(!include_and_exclude.matches("/imu"));

    // Several include patterns: matching any one of them is enough.
    let multiple_includes = TopicFilter {
        include: vec![re(r"^/camera/"), re(r"^/imu$")],
        exclude: vec![],
    };
    assert!(multiple_includes.matches("/camera/rgb"));
    assert!(multiple_includes.matches("/imu"));
    assert!(!multiple_includes.matches("/lidar"));
}
// Planning must only assign decoders to the channels that pass the topic filter.
#[test]
fn filter_skips_unselected_topics() {
// Fixture: three schema-less channels, each with exactly one message.
let (summary, buffer) = {
let cursor = io::Cursor::new(Vec::new());
let mut writer = mcap::Writer::new(cursor).expect("failed to create writer");
let camera_rgb = writer
.add_channel(0, "/camera/rgb", "raw", &Default::default())
.expect("failed to add channel");
let camera_depth = writer
.add_channel(0, "/camera/depth", "raw", &Default::default())
.expect("failed to add channel");
let imu = writer
.add_channel(0, "/imu", "raw", &Default::default())
.expect("failed to add channel");
for channel_id in [camera_rgb, camera_depth, imu] {
writer
.write_to_known_channel(
&mcap::records::MessageHeader {
channel_id,
sequence: 0,
log_time: 1,
publish_time: 1,
},
&[1, 2, 3],
)
.expect("failed to write message");
}
let summary = writer.finish().expect("failed to finish writer");
let buffer = writer.into_inner().into_inner();
(summary, buffer)
};
// Include only: both camera topics are planned, `/imu` is not.
let filter = TopicFilter {
include: vec![regex_lite::Regex::new(r"^/camera/").unwrap()],
exclude: vec![],
};
let plan = DecoderRegistry::empty()
.register_message_decoder::<McapRawDecoder>()
.plan(&buffer, &summary, &filter)
.expect("failed to plan");
assert_eq!(plan.assignments.len(), 2);
let topics: BTreeSet<_> = plan.assignments.iter().map(|a| a.topic.as_str()).collect();
assert!(topics.contains("/camera/rgb"));
assert!(topics.contains("/camera/depth"));
assert!(!topics.contains("/imu"));
// Exclude only: everything except the depth topic is planned.
let filter = TopicFilter {
include: vec![],
exclude: vec![regex_lite::Regex::new(r"depth$").unwrap()],
};
let plan = DecoderRegistry::empty()
.register_message_decoder::<McapRawDecoder>()
.plan(&buffer, &summary, &filter)
.expect("failed to plan");
let topics: BTreeSet<_> = plan.assignments.iter().map(|a| a.topic.as_str()).collect();
assert_eq!(topics.len(), 2);
assert!(topics.contains("/camera/rgb"));
assert!(topics.contains("/imu"));
assert!(!topics.contains("/camera/depth"));
// Include and exclude combined: only `/camera/rgb` remains.
let filter = TopicFilter {
include: vec![regex_lite::Regex::new(r"^/camera/").unwrap()],
exclude: vec![regex_lite::Regex::new(r"depth$").unwrap()],
};
let plan = DecoderRegistry::empty()
.register_message_decoder::<McapRawDecoder>()
.plan(&buffer, &summary, &filter)
.expect("failed to plan");
assert_eq!(plan.assignments.len(), 1);
assert_eq!(plan.assignments[0].topic, "/camera/rgb");
}
}