use std::collections::HashMap;
use std::io::Cursor;
use std::path::Path;
use std::sync::Arc;
use crossbeam::channel::Sender;
use re_chunk::RowId;
use re_lenses::Lenses;
use re_log_types::{SetStoreInfo, StoreId, StoreInfo};
use re_mcap::{DecoderIdentifier, DecoderRegistry, SelectedDecoders, TopicFilter};
use re_quota_channel::send_crossbeam;
use crate::{ImportedData, Importer, ImporterError, ImporterSettings, URDF_DECODER_IDENTIFIER};
/// Name identifying this importer in log messages and in the `StoreSource` of emitted stores.
const MCAP_IMPORTER_NAME: &str = "McapImporter";
/// Importer for MCAP files.
///
/// Construct with [`Self::new`], then optionally configure via
/// [`Self::with_raw_fallback`] and [`Self::with_topic_filter`].
#[derive(Clone)]
pub struct McapImporter {
    /// Which of the built-in MCAP decoders to run.
    selected_decoders: SelectedDecoders,

    /// Passed to `DecoderRegistry::all_builtin`; defaults to `true`.
    /// NOTE(review): presumably enables a raw (undecoded) fallback decoder — confirm in `re_mcap`.
    raw_fallback_enabled: bool,

    /// Topic filter applied when planning and running decoders.
    topic_filter: TopicFilter,

    /// Lens pipelines pre-built per supported time type
    /// (`TimestampNs` and `DurationNs`; see [`Self::new`]).
    lenses_by_time_type: HashMap<re_log_types::TimeType, Arc<Lenses>>,
}
impl Default for McapImporter {
    /// Defaults to running every available decoder (`SelectedDecoders::All`).
    fn default() -> Self {
        Self::new(&SelectedDecoders::All)
    }
}
impl McapImporter {
    /// Creates an importer that runs the given set of decoders.
    ///
    /// Lens pipelines are pre-built for both supported timeline types
    /// (`TimestampNs` and `DurationNs`) so [`Self::emit_chunks`] can look them
    /// up cheaply per call.
    pub fn new(selected_decoders: &SelectedDecoders) -> Self {
        let mut lenses_by_time_type = HashMap::new();
        for time_type in [
            re_log_types::TimeType::TimestampNs,
            re_log_types::TimeType::DurationNs,
        ] {
            // A failed build is logged inside `build_lenses`; the importer then
            // simply runs without lenses for that time type.
            if let Some(lenses) = Self::build_lenses(selected_decoders, time_type) {
                lenses_by_time_type.insert(time_type, lenses);
            }
        }
        Self {
            selected_decoders: selected_decoders.clone(),
            raw_fallback_enabled: true,
            topic_filter: TopicFilter::default(),
            lenses_by_time_type,
        }
    }

    /// Builder-style setter: toggles the raw-fallback flag passed to
    /// `DecoderRegistry::all_builtin`. Defaults to `true`.
    pub fn with_raw_fallback(mut self, raw_fallback_enabled: bool) -> Self {
        self.raw_fallback_enabled = raw_fallback_enabled;
        self
    }

    /// Builder-style setter: restricts decoding to the topics matched by `topic_filter`.
    pub fn with_topic_filter(mut self, topic_filter: TopicFilter) -> Self {
        self.topic_filter = topic_filter;
        self
    }

    /// Returns the pre-built lenses for `time_type`, or `None` if none were built.
    ///
    /// `Sequence` is explicitly unsupported: it logs an error (once) and
    /// returns `None`.
    fn lenses_for(&self, time_type: re_log_types::TimeType) -> Option<Arc<Lenses>> {
        if time_type == re_log_types::TimeType::Sequence {
            re_log::error_once!("Sequence is not a supported timeline type for MCAP lenses");
            return None;
        }
        self.lenses_by_time_type.get(&time_type).cloned()
    }

    /// Builds the lens pipeline for the given decoder selection and time type.
    ///
    /// Build errors are logged once and swallowed: the importer still works,
    /// just without lens transforms.
    fn build_lenses(
        selected_decoders: &SelectedDecoders,
        time_type: re_log_types::TimeType,
    ) -> Option<Arc<Lenses>> {
        match super::lenses::mcap_lenses(selected_decoders, time_type) {
            Ok(Some(lenses)) => Some(Arc::new(lenses)),
            Ok(None) => None,
            Err(err) => {
                re_log::error_once!(
                    "Failed to build MCAP lenses: {err}. MCAP importer will run without them."
                );
                None
            }
        }
    }

    /// Decodes the raw `mcap` bytes and invokes `emit_chunk` for every
    /// resulting chunk.
    ///
    /// Each chunk is (in order): passed through the lens pipeline for
    /// `timeline_type` (if one was built), shifted by `timestamp_offset_ns`,
    /// and sorted before emission. If the URDF decoder is selected,
    /// robot-description topics are additionally scanned for URDF payloads
    /// (best effort: failures there only warn).
    ///
    /// # Errors
    ///
    /// Fails if the MCAP summary section is missing, or if planning/running
    /// the decoders fails.
    pub fn emit_chunks(
        &self,
        mcap: &[u8],
        timeline_type: re_log_types::TimeType,
        timestamp_offset_ns: Option<i64>,
        emit_chunk: &(dyn Fn(re_chunk::Chunk) + Send + Sync),
    ) -> Result<(), ImporterError> {
        re_tracing::profile_function!();
        let lenses = self.lenses_for(timeline_type);

        // Final post-processing applied to every chunk, lensed or not.
        let emit_final_chunk = |chunk: re_chunk::Chunk| {
            let mut chunk = apply_timestamp_offset(chunk, timestamp_offset_ns);
            chunk.sort_if_unsorted();
            emit_chunk(chunk);
        };

        let on_chunk_with_transforms = |chunk: re_chunk::Chunk| {
            if let Some(ref lenses) = lenses {
                for result in lenses.apply(&chunk) {
                    match result {
                        Ok(chunk) => emit_final_chunk(chunk),
                        Err(partial) => {
                            // A lens may fail partially: log every error, but
                            // still forward whatever part of the chunk survived.
                            for error in partial.errors() {
                                re_log::error_once!("Lens error: {error}");
                            }
                            if let Some(chunk) = partial.partial_chunk() {
                                emit_final_chunk(chunk);
                            }
                        }
                    }
                }
            } else {
                emit_final_chunk(chunk);
            }
        };

        let reader = Cursor::new(&mcap);
        // The summary section is required: it drives decoder planning below.
        let summary = re_mcap::read_summary(reader)?
            .ok_or_else(|| anyhow::anyhow!("MCAP file does not contain a summary"))?;
        DecoderRegistry::all_builtin(self.raw_fallback_enabled)
            .select(&self.selected_decoders)
            .plan(mcap, &summary, &self.topic_filter)?
            .run(mcap, &summary, timeline_type, &on_chunk_with_transforms)?;

        // URDF extraction is best effort: a failure only warns, it never fails
        // the whole import.
        if self
            .selected_decoders
            .contains(&DecoderIdentifier::from(URDF_DECODER_IDENTIFIER))
            && let Err(err) = super::robot_description::extract_urdf_from_robot_descriptions(
                mcap,
                &summary,
                &self.topic_filter,
                &on_chunk_with_transforms,
            )
        {
            re_log::warn_once!("Failed to extract URDF from robot_description topics: {err}");
        }
        Ok(())
    }
}
impl Importer for McapImporter {
    fn name(&self) -> crate::ImporterName {
        MCAP_IMPORTER_NAME.into()
    }

    /// Memory-maps the file at `path` and decodes it on a background thread.
    ///
    /// Returns `Incompatible` (without spawning) if the path is not an
    /// existing file with an `.mcap` extension. Open/map/decode errors on the
    /// worker thread are only logged, never returned.
    #[cfg(not(target_arch = "wasm32"))]
    fn import_from_path(
        &self,
        settings: &crate::ImporterSettings,
        path: std::path::PathBuf,
        tx: Sender<crate::ImportedData>,
    ) -> Result<(), ImporterError> {
        if !path.is_file() || !has_mcap_extension(&path) {
            return Err(ImporterError::Incompatible(path));
        }
        re_tracing::profile_function!();
        // Clone so the worker thread owns everything it needs.
        let loader = self.clone();
        let settings = settings.clone();
        std::thread::Builder::new()
            .name(format!("load_mcap({path:?})"))
            .spawn(move || {
                let file = match std::fs::File::open(&path) {
                    Ok(f) => f,
                    Err(err) => {
                        re_log::error!("Failed to open MCAP file: {err}");
                        return;
                    }
                };
                // NOTE(review): the mapping is only read here, but `Mmap::map`
                // is `unsafe` because external modification/truncation of the
                // file while mapped is UB — presumably accepted as a risk for
                // this loader; confirm.
                #[expect(unsafe_code)]
                let mmap = match unsafe { memmap2::Mmap::map(&file) } {
                    Ok(m) => m,
                    Err(err) => {
                        re_log::error!("Failed to mmap MCAP file: {err}");
                        return;
                    }
                };
                if let Err(err) = loader.load_and_send(&mmap, &settings, &tx) {
                    re_log::error!("Failed to load MCAP file: {err}");
                }
            })
            .map_err(|err| ImporterError::Other(err.into()))?;
        Ok(())
    }

    /// Decodes already-loaded file contents.
    ///
    /// Returns `Incompatible` if `filepath` lacks an `.mcap` extension. On
    /// native, decoding happens on a background thread and errors are only
    /// logged; on wasm (no threads) it runs synchronously and decode errors
    /// are returned to the caller.
    fn import_from_file_contents(
        &self,
        settings: &crate::ImporterSettings,
        filepath: std::path::PathBuf,
        contents: std::borrow::Cow<'_, [u8]>,
        tx: Sender<crate::ImportedData>,
    ) -> Result<(), crate::ImporterError> {
        if !has_mcap_extension(&filepath) {
            return Err(ImporterError::Incompatible(filepath));
        }
        re_tracing::profile_function!();
        // Take ownership so the data can be moved onto the worker thread.
        let contents = contents.into_owned();
        let loader = self.clone();
        let settings = settings.clone();
        cfg_if::cfg_if! {
            if #[cfg(target_arch = "wasm32")] {
                // No threads on wasm: decode inline.
                loader.load_and_send(&contents, &settings, &tx)?;
            } else {
                std::thread::Builder::new()
                    .name(format!("load_mcap({filepath:?})"))
                    .spawn(move || {
                        if let Err(err) = loader.load_and_send(&contents, &settings, &tx) {
                            re_log::error!("Failed to load MCAP file: {err}");
                        }
                    })
                    .map_err(|err| ImporterError::Other(err.into()))?;
            }
        }
        Ok(())
    }
}
impl McapImporter {
    /// Decodes `mcap` and forwards the resulting `SetStoreInfo` and chunks over `tx`.
    ///
    /// A closed channel is treated as a graceful shutdown: the function logs
    /// once and returns `Ok(())` without decoding further.
    ///
    /// # Errors
    ///
    /// Propagates any decoding error from [`Self::emit_chunks`].
    pub fn load_and_send(
        &self,
        mcap: &[u8],
        settings: &ImporterSettings,
        tx: &Sender<ImportedData>,
    ) -> Result<(), ImporterError> {
        re_log::debug!(
            "Loading MCAP with timeline type {:?}",
            settings.timeline_type
        );
        let store_id = settings.recommended_store_id();

        // Announce the store before sending any chunk.
        let announcement = ImportedData::LogMsg(
            MCAP_IMPORTER_NAME.to_owned(),
            re_log_types::LogMsg::SetStoreInfo(store_info(store_id.clone())),
        );
        if send_crossbeam(tx, announcement).is_err() {
            re_log::debug_once!(
                "Failed to send `SetStoreInfo` because smart channel closed unexpectedly."
            );
            return Ok(());
        }

        self.emit_chunks(
            mcap,
            settings.timeline_type,
            settings.timestamp_offset_ns,
            &|chunk| send_chunk_to_channel(tx, &store_id, chunk),
        )
    }
}
/// Shifts every `TimestampNs` timeline in `chunk` by `offset_ns` nanoseconds.
///
/// Other timeline types are left untouched; a `None` offset is a no-op.
fn apply_timestamp_offset(mut chunk: re_chunk::Chunk, offset_ns: Option<i64>) -> re_chunk::Chunk {
    let Some(offset_ns) = offset_ns else {
        return chunk;
    };

    // Collect first: we cannot mutate `chunk` while borrowing its timelines.
    let shifted: Vec<_> = chunk
        .timelines()
        .values()
        .filter(|col| col.timeline().typ() == re_log_types::TimeType::TimestampNs)
        .map(|col| col.offset_by_nanos(offset_ns))
        .collect();

    for col in shifted {
        // Best effort: failures to add the shifted column back are ignored.
        chunk.add_timeline(col).ok();
    }
    chunk
}
/// Forwards a single decoded chunk over `tx`, logging (once) if the channel is closed.
fn send_chunk_to_channel(tx: &Sender<ImportedData>, store_id: &StoreId, chunk: re_chunk::Chunk) {
    let payload = ImportedData::Chunk(MCAP_IMPORTER_NAME.to_owned(), store_id.clone(), chunk);
    if send_crossbeam(tx, payload).is_err() {
        re_log::debug_once!(
            "Failed to send chunk because the smart channel has been closed unexpectedly."
        );
    }
}
/// Builds the `SetStoreInfo` message announcing `store_id` as originating from
/// the MCAP importer.
fn store_info(store_id: StoreId) -> SetStoreInfo {
    let info = StoreInfo::new(
        store_id,
        re_log_types::StoreSource::Other(MCAP_IMPORTER_NAME.to_owned()),
    );
    SetStoreInfo {
        row_id: *RowId::new(),
        info,
    }
}
/// Returns `true` if `filepath` has an `.mcap` extension (case-insensitive).
///
/// Paths without an extension — including dot-files like `.mcap`, for which
/// `Path::extension` returns `None` — yield `false`.
fn has_mcap_extension(filepath: &Path) -> bool {
    // `is_some_and` replaces the clippy-flagged `map(..).unwrap_or(false)` idiom.
    filepath
        .extension()
        .is_some_and(|ext| ext.eq_ignore_ascii_case("mcap"))
}