rustbag 0.1.1

A high-performance ROS 2 bag player
// Copyright 2025 Ivo Ivanov.
// Copyright 2018 Open Source Robotics Foundation, Inc.
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{Context, Result};
use mcap::read::Summary;
use memmap2::Mmap;
use r2r::qos::{DurabilityPolicy, HistoryPolicy, LivelinessPolicy, ReliabilityPolicy};
use r2r::QosProfile;
use serde::Deserialize;
use std::collections::HashMap;
use std::time::Duration;

/// A duration as it is written inside a serialized QoS profile: a `sec`/`nsec` pair.
///
/// Used for the `deadline`, `lifespan` and `liveliness_lease_duration` fields of
/// `QosYamlProfile`, and convertible into a [`Duration`] through the `From` impl below.
#[derive(Debug, Deserialize, Clone, Copy)]
struct QosDurationYaml {
    /// Seconds component (becomes the seconds argument of `Duration::new`).
    sec: i64,
    /// Nanoseconds component (becomes the nanoseconds argument of `Duration::new`).
    nsec: i64,
}

impl From<QosDurationYaml> for Duration {
    /// Converts a `sec`/`nsec` pair read from bag metadata into a [`Duration`].
    ///
    /// The pair comes from a file and is therefore untrusted. It is normalised as a
    /// single signed nanosecond count so that:
    ///
    /// * a negative total is clamped to [`Duration::ZERO`] instead of wrapping around
    ///   to an enormous value (or panicking inside `Duration::new`),
    /// * an `nsec` of one second or more is carried into the seconds, even when it
    ///   does not fit in a `u32`.
    ///
    /// Non-negative inputs whose `nsec` fits in a `u32` convert exactly as before.
    fn from(v: QosDurationYaml) -> Self {
        const NANOS_PER_SEC: i128 = 1_000_000_000;

        // i128 holds `i64::MAX * 1e9 + i64::MAX` with room to spare, so this cannot overflow.
        let total_nanos = (i128::from(v.sec) * NANOS_PER_SEC + i128::from(v.nsec)).max(0);

        // After clamping, the quotient is at most `i64::MAX + i64::MAX / 1e9` (< u64::MAX)
        // and the remainder is below 1e9, so both casts are lossless.
        Duration::new(
            (total_nanos / NANOS_PER_SEC) as u64,
            (total_nanos % NANOS_PER_SEC) as u32,
        )
    }
}

/// One QoS profile as deserialized from a topic's `offered_qos_profiles` YAML.
///
/// Policy fields are kept as the raw integer codes found in the file; the
/// `From<&QosYamlProfile> for QosProfile` impl below turns them into `r2r` policy
/// enums. No field has a serde default, so a profile missing any of them fails to
/// deserialize.
#[derive(Debug, Deserialize, Clone)]
struct QosYamlProfile {
    /// Integer code of the reliability policy.
    reliability: i64,
    /// Integer code of the durability policy.
    durability: i64,
    /// Integer code of the history policy.
    history: i64,
    /// History depth, copied unchanged into `QosProfile::depth`.
    depth: usize,
    /// Deadline as a `sec`/`nsec` pair.
    deadline: QosDurationYaml,
    /// Lifespan as a `sec`/`nsec` pair.
    lifespan: QosDurationYaml,
    /// Integer code of the liveliness policy.
    liveliness: i64,
    /// Liveliness lease duration as a `sec`/`nsec` pair.
    liveliness_lease_duration: QosDurationYaml,
    /// Copied unchanged into `QosProfile::avoid_ros_namespace_conventions`.
    avoid_ros_namespace_conventions: bool,
}

impl From<&QosYamlProfile> for QosProfile {
    /// Builds an `r2r` [`QosProfile`] from a profile read out of bag metadata.
    ///
    /// The integer policy codes stored in the metadata are the numeric values of the
    /// corresponding `rmw_qos_*_policy_t` C enums, in which `0` always means
    /// "system default". They are *not* the declaration order of the `r2r` enums:
    ///
    /// | code | reliability | durability      | history   | liveliness      |
    /// |------|-------------|-----------------|-----------|-----------------|
    /// | 0    | default     | default         | default   | default         |
    /// | 1    | reliable    | transient local | keep last | automatic       |
    /// | 2    | best effort | volatile        | keep all  | manual by node  |
    /// | 3    | unknown     | unknown         | unknown   | manual by topic |
    ///
    /// Codes without a dedicated arm (system default, unknown, best available or
    /// anything unexpected) fall back to `Reliable`, `SystemDefault` durability,
    /// `KeepLast` and `SystemDefault` liveliness respectively. Depth, durations and
    /// the namespace flag are copied through unchanged.
    fn from(p: &QosYamlProfile) -> Self {
        // 1 = reliable and every unrecognised code both resolve to `Reliable`.
        let reliability = match p.reliability {
            2 => ReliabilityPolicy::BestEffort,
            _ => ReliabilityPolicy::Reliable,
        };
        let durability = match p.durability {
            1 => DurabilityPolicy::TransientLocal,
            2 => DurabilityPolicy::Volatile,
            _ => DurabilityPolicy::SystemDefault,
        };
        // 1 = keep last and every unrecognised code both resolve to `KeepLast`.
        let history = match p.history {
            2 => HistoryPolicy::KeepAll,
            _ => HistoryPolicy::KeepLast,
        };
        let liveliness = match p.liveliness {
            1 => LivelinessPolicy::Automatic,
            2 => LivelinessPolicy::ManualByNode,
            3 => LivelinessPolicy::ManualByTopic,
            _ => LivelinessPolicy::SystemDefault,
        };

        QosProfile {
            depth: p.depth,
            reliability,
            durability,
            history,
            deadline: p.deadline.into(),
            lifespan: p.lifespan.into(),
            liveliness,
            liveliness_lease_duration: p.liveliness_lease_duration.into(),
            avoid_ros_namespace_conventions: p.avoid_ros_namespace_conventions,
        }
    }
}

/// Extracts the offered QoS profile of every topic from a parsed bag metadata document.
///
/// `root` must contain a `topics_with_message_count` sequence. For each entry, the
/// nested `topic_metadata` mapping supplies the topic `name` and
/// `offered_qos_profiles`, a string that itself holds a YAML list of profiles.
/// Only the first profile of each topic is used. Topics whose profile string is
/// empty or whose list is empty are left out of the result.
///
/// # Errors
///
/// Returns an error if `topics_with_message_count` is missing or not a sequence, if a
/// topic lacks a string `name` or a string `offered_qos_profiles`, or if a non-empty
/// profile string is not a valid YAML list of profiles.
pub fn parse_qos_from_metadata(root: &serde_yaml::Value) -> Result<HashMap<String, QosProfile>> {
    let topics = root
        .get("topics_with_message_count")
        .and_then(|v| v.as_sequence())
        .context("missing topics_with_message_count")?;

    let mut qos_map: HashMap<String, QosProfile> = HashMap::with_capacity(topics.len());
    for topic_entry in topics {
        let topic_metadata = &topic_entry["topic_metadata"];
        let topic_name = topic_metadata["name"]
            .as_str()
            .context("missing topic name")?;
        let qos_yaml_str = topic_metadata["offered_qos_profiles"]
            .as_str()
            .context("missing offered_qos_profiles")?;

        // An empty string means no profile was recorded for this topic. Treat it like
        // an empty list instead of handing it to the YAML parser, where a rejection
        // would abort the whole function and discard the QoS of every other topic.
        if qos_yaml_str.trim().is_empty() {
            continue;
        }

        let profiles: Vec<QosYamlProfile> =
            serde_yaml::from_str(qos_yaml_str).context("invalid qos profiles yaml")?;
        if let Some(first) = profiles.first() {
            qos_map.insert(topic_name.to_string(), QosProfile::from(first));
        }
    }
    Ok(qos_map)
}

/// Reads the per-topic QoS profiles stored in an MCAP file's metadata.
///
/// Only the first metadata index of `mcap_summary` is consulted. Its record is read
/// from `file_buf`, the `serialized_metadata` entry is parsed as YAML and handed to
/// [`parse_qos_from_metadata`].
///
/// This function never fails: every problem along the way is logged as a warning and
/// yields an empty map, leaving the caller without recorded QoS for any topic.
pub fn read_qos_for_publishers(
    file_buf: &Mmap,
    mcap_summary: &Summary,
) -> HashMap<String, QosProfile> {
    let Some(first_index) = mcap_summary.metadata_indexes.first() else {
        log::warn!("Found no custom QoS in mcap");
        return HashMap::new();
    };

    let record = match mcap::read::metadata(file_buf, first_index) {
        Ok(record) => record,
        Err(err) => {
            log::warn!("Failed reading MCAP metadata: {}", err);
            return HashMap::new();
        }
    };

    let Some(serialized) = record.metadata.get("serialized_metadata") else {
        log::warn!("Missing 'serialized_metadata' in MCAP metadata");
        return HashMap::new();
    };

    let yaml_root = match serde_yaml::from_str::<serde_yaml::Value>(serialized) {
        Ok(value) => value,
        Err(err) => {
            log::warn!("Invalid serialized_metadata yaml: {}", err);
            return HashMap::new();
        }
    };

    parse_qos_from_metadata(&yaml_root)
        .map(|qos_by_topic| {
            log::debug!("Parsed qoses = {:#?}", qos_by_topic);
            qos_by_topic
        })
        .unwrap_or_else(|err| {
            log::warn!(
                "Failed to parse QoS metadata, falling back to defaults: {}",
                err
            );
            HashMap::new()
        })
}