use anyhow::{Context, Result};
use mcap::read::Summary;
use memmap2::Mmap;
use r2r::qos::{DurabilityPolicy, HistoryPolicy, LivelinessPolicy, ReliabilityPolicy};
use r2r::QosProfile;
use serde::Deserialize;
use std::collections::HashMap;
use std::time::Duration;
#[derive(Debug, Deserialize, Clone, Copy)]
struct QosDurationYaml {
sec: i64,
nsec: i64,
}
impl From<QosDurationYaml> for Duration {
fn from(v: QosDurationYaml) -> Self {
Duration::new(v.sec as u64, v.nsec as u32)
}
}
#[derive(Debug, Deserialize, Clone)]
struct QosYamlProfile {
reliability: i64,
durability: i64,
history: i64,
depth: usize,
deadline: QosDurationYaml,
lifespan: QosDurationYaml,
liveliness: i64,
liveliness_lease_duration: QosDurationYaml,
avoid_ros_namespace_conventions: bool,
}
impl From<&QosYamlProfile> for QosProfile {
fn from(p: &QosYamlProfile) -> Self {
let reliability = match p.reliability {
0 => ReliabilityPolicy::BestEffort,
1 => ReliabilityPolicy::Reliable,
_ => ReliabilityPolicy::Reliable,
};
let durability = match p.durability {
0 => DurabilityPolicy::TransientLocal,
1 => DurabilityPolicy::Volatile,
2 => DurabilityPolicy::SystemDefault,
_ => DurabilityPolicy::SystemDefault,
};
let history = match p.history {
0 => HistoryPolicy::KeepAll,
1 => HistoryPolicy::KeepLast,
_ => HistoryPolicy::KeepLast,
};
let liveliness = match p.liveliness {
0 => LivelinessPolicy::Automatic,
1 => LivelinessPolicy::ManualByNode,
2 => LivelinessPolicy::ManualByTopic,
3 => LivelinessPolicy::SystemDefault,
_ => LivelinessPolicy::SystemDefault,
};
QosProfile {
depth: p.depth,
reliability,
durability,
history,
deadline: p.deadline.into(),
lifespan: p.lifespan.into(),
liveliness,
liveliness_lease_duration: p.liveliness_lease_duration.into(),
avoid_ros_namespace_conventions: p.avoid_ros_namespace_conventions,
}
}
}
pub fn parse_qos_from_metadata(root: &serde_yaml::Value) -> Result<HashMap<String, QosProfile>> {
let topics = root
.get("topics_with_message_count")
.and_then(|v| v.as_sequence())
.context("missing topics_with_message_count")?;
let mut qos_map: HashMap<String, QosProfile> = HashMap::new();
for topic_entry in topics {
let topic_metadata = &topic_entry["topic_metadata"];
let topic_name = topic_metadata["name"]
.as_str()
.context("missing topic name")?;
let qos_yaml_str = topic_metadata["offered_qos_profiles"]
.as_str()
.context("missing offered_qos_profiles")?;
let profiles: Vec<QosYamlProfile> =
serde_yaml::from_str(qos_yaml_str).context("invalid qos profiles yaml")?;
if let Some(first) = profiles.first() {
qos_map.insert(topic_name.to_string(), QosProfile::from(first));
}
}
Ok(qos_map)
}
pub fn read_qos_for_publishers(
file_buf: &Mmap,
mcap_summary: &Summary,
) -> HashMap<String, QosProfile> {
if let Some(mt_index) = &mcap_summary.metadata_indexes.first() {
let meta = match mcap::read::metadata(file_buf, mt_index) {
Ok(m) => m,
Err(e) => {
log::warn!("Failed reading MCAP metadata: {}", e);
return HashMap::new();
}
};
let Some(serialized) = meta.metadata.get("serialized_metadata") else {
log::warn!("Missing 'serialized_metadata' in MCAP metadata");
return HashMap::new();
};
let root: serde_yaml::Value = match serde_yaml::from_str(serialized) {
Ok(v) => v,
Err(e) => {
log::warn!("Invalid serialized_metadata yaml: {}", e);
return HashMap::new();
}
};
match parse_qos_from_metadata(&root) {
Ok(q) => {
log::debug!("Parsed qoses = {:#?}", q);
q
}
Err(e) => {
log::warn!(
"Failed to parse QoS metadata, falling back to defaults: {}",
e
);
HashMap::new()
}
}
} else {
log::warn!("Found no custom QoS in mcap");
HashMap::new()
}
}