#include "mcap_converter.hpp"
#include <set>
#include <stdexcept>
#include "cloudini_lib/ros_message_definitions.hpp"
#include "cloudini_lib/ros_msg_utils.hpp"
#define MCAP_IMPLEMENTATION
#include <mcap/reader.hpp>
#include <mcap/types.hpp>
#include <mcap/writer.hpp>
McapConverter::TopicsMap McapConverter::open(std::filesystem::path file_in) {
input_stream_.open(file_in);
data_source_ = std::make_shared<mcap::FileStreamReader>(input_stream_);
reader_ = std::make_shared<mcap::McapReader>();
auto res = reader_->open(*data_source_);
if (!res.ok()) {
reader_.reset();
throw std::runtime_error("Error opening MCAP file: " + res.message);
}
TopicsMap topics;
res = reader_->readSummary(mcap::ReadSummaryMethod::AllowFallbackScan);
if (!res.ok()) {
reader_.reset();
throw std::runtime_error("Error reading MCAP sumamry: " + res.message);
}
for (const auto& [channel_id, channel_ptr] : reader_->channels()) {
const auto& schema_ptr = reader_->schema(channel_ptr->schemaId);
if (schema_ptr->name == pointcloud_schema_name || schema_ptr->name == compressed_schema_name) {
topics[channel_ptr->topic] = schema_ptr->name;
}
}
topics_ = topics;
return topics;
}
void McapConverter::duplicateSchemasAndChannels(
const mcap::McapReader& reader, mcap::McapWriter& writer, bool encoding) {
const auto old_schemas = reader.schemas();
std::set<mcap::SchemaId> ordered_schema_id;
for (const auto& [schema_id, _] : old_schemas) {
ordered_schema_id.insert(schema_id);
}
const auto old_channels = reader.channels();
std::set<mcap::ChannelId> ordered_channels_id;
for (const auto& [channel_id, _] : old_channels) {
ordered_channels_id.insert(channel_id);
}
auto copy_string_to_vector = [](const char* str, mcap::ByteArray& array) {
size_t len = strlen(str);
const auto* data_ptr = reinterpret_cast<const std::byte*>(str);
array.resize(len);
std::memcpy(array.data(), data_ptr, len);
};
old_to_new_schema_id_.clear();
old_to_new_channel_id_.clear();
for (const auto& schema_id : ordered_schema_id) {
const auto& schema_ptr = old_schemas.at(schema_id);
auto schema_name = schema_ptr->name;
auto schema_data = schema_ptr->data;
if (encoding && schema_name == pointcloud_schema_name) {
schema_name = compressed_schema_name;
copy_string_to_vector(compressed_schema_data, schema_data);
}
if (!encoding && schema_name == compressed_schema_name) {
schema_name = pointcloud_schema_name;
copy_string_to_vector(pointcloud_schema_data, schema_data);
}
mcap::Schema new_schema(schema_name, schema_ptr->encoding, schema_data);
writer.addSchema(new_schema);
old_to_new_schema_id_.insert({schema_id, new_schema.id});
}
for (const auto& channel_id : ordered_channels_id) {
const auto channel_ptr = old_channels.at(channel_id);
auto new_schema_id = old_to_new_schema_id_.at(channel_ptr->schemaId);
mcap::Channel new_channel(channel_ptr->topic, channel_ptr->messageEncoding, new_schema_id);
writer.addChannel(new_channel);
old_to_new_channel_id_.insert({channel_ptr->id, new_channel.id});
}
for (const auto& [metadata_index_name, metadata_index] : reader_->metadataIndexes()) {
mcap::Record mcap_record;
auto status = mcap::McapReader::ReadRecord(*data_source_, metadata_index.offset, &mcap_record);
if (!status.ok()) {
throw std::runtime_error("Error reading MCAP metadata record: " + status.message);
}
mcap::Metadata mcap_metadata;
status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata);
if (!status.ok()) {
throw std::runtime_error("Error parsing MCAP metadata record: " + status.message);
}
status = writer.write(mcap_metadata);
if (!status.ok()) {
throw std::runtime_error("Error copying MCAP metadata: " + status.message);
}
}
}
mcap::Compression toMcapCompression(Cloudini::CompressionOption compression) {
switch (compression) {
case Cloudini::CompressionOption::ZSTD:
return mcap::Compression::Zstd;
case Cloudini::CompressionOption::LZ4:
return mcap::Compression::Lz4;
case Cloudini::CompressionOption::NONE:
return mcap::Compression::None;
default:
throw std::runtime_error("Unsupported compression option for MCAP writer");
}
}
void McapConverter::encodePointClouds(
std::filesystem::path file_out, std::optional<float> default_resolution,
Cloudini::CompressionOption mcap_writer_compression) {
if (!reader_) {
throw std::runtime_error("McapReader is not initialized. Call open() first.");
}
processed_messages_ = 0;
total_input_bytes_ = 0;
total_output_bytes_ = 0;
total_processing_time_ = std::chrono::microseconds(0);
mcap::Status status{};
mcap::McapWriter writer;
mcap::McapWriterOptions writer_options(reader_->header()->profile);
writer_options.compression = toMcapCompression(mcap_writer_compression);
writer_options.chunkSize = 2 * 1024 * 1024;
status = writer.open(file_out.string(), writer_options);
if (!status.ok()) {
throw std::runtime_error("Error opening MCAP file for writing: " + status.message);
}
duplicateSchemasAndChannels(*reader_, writer, true);
mcap::ReadMessageOptions reader_options;
mcap::ProblemCallback problem = [](const mcap::Status&) {};
std::vector<uint8_t> compressed_dds_msg;
for (const auto& msg : reader_->readMessages(problem, reader_options)) {
mcap::Message new_msg = msg.message;
new_msg.channelId = old_to_new_channel_id_.at(msg.channel->id);
if (msg.schema->name != pointcloud_schema_name) {
auto status = writer.write(new_msg);
if (!status.ok()) {
throw std::runtime_error("Error writing message to MCAP file: " + status.message);
}
continue;
}
processed_messages_++;
const auto t1 = std::chrono::high_resolution_clock::now();
Cloudini::ConstBufferView raw_dds_msg(msg.message.data, msg.message.dataSize);
auto pc_info = cloudini_ros::getDeserializedPointCloudMessage(raw_dds_msg);
cloudini_ros::applyResolutionProfile(profile_resolutions_, pc_info.fields, default_resolution);
auto encoding_info = cloudini_ros::toEncodingInfo(pc_info);
if (mcap_writer_compression == Cloudini::CompressionOption::ZSTD) {
encoding_info.compression_opt = Cloudini::CompressionOption::NONE;
}
cloudini_ros::convertPointCloud2ToCompressedCloud(pc_info, encoding_info, compressed_dds_msg);
new_msg.data = reinterpret_cast<const std::byte*>(compressed_dds_msg.data());
new_msg.dataSize = compressed_dds_msg.size();
const auto t2 = std::chrono::high_resolution_clock::now();
total_processing_time_ += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1);
total_input_bytes_ += msg.message.dataSize;
total_output_bytes_ += new_msg.dataSize;
auto status = writer.write(new_msg);
if (!status.ok()) {
throw std::runtime_error("Error writing message to MCAP file: " + status.message);
}
}
writer.close();
}
void McapConverter::printStatistics() const {
if (processed_messages_ == 0) {
return;
}
std::cout << "Processed " << processed_messages_ << " point cloud messages" << std::endl;
double compression_ratio = static_cast<double>(total_output_bytes_) / static_cast<double>(total_input_bytes_);
std::cout << "Avg Compression ratio: " << compression_ratio << std::endl;
const double count = static_cast<double>(processed_messages_);
const double avg_time_per_message = 0.001 * static_cast<double>(total_processing_time_.count()) / count;
std::cout << "Average processing time per message: " << avg_time_per_message << " milliseconds" << std::endl;
}
void McapConverter::decodePointClouds(
std::filesystem::path file_out, Cloudini::CompressionOption mcap_writer_compression) {
if (!reader_) {
throw std::runtime_error("McapReader is not initialized. Call open() first.");
}
processed_messages_ = 0;
total_input_bytes_ = 0;
total_output_bytes_ = 0;
total_processing_time_ = std::chrono::microseconds(0);
mcap::McapWriter writer;
mcap::McapWriterOptions writer_options(reader_->header()->profile);
writer_options.compression = toMcapCompression(mcap_writer_compression);
writer_options.chunkSize = 2 * 1024 * 1024;
auto status = writer.open(file_out.string(), writer_options);
if (!status.ok()) {
throw std::runtime_error("Error opening MCAP file for writing: " + status.message);
}
duplicateSchemasAndChannels(*reader_, writer, false);
mcap::ReadMessageOptions reader_options;
mcap::ProblemCallback problem = [](const mcap::Status&) {};
std::vector<uint8_t> decoded_cloud;
std::vector<uint8_t> decoded_dds_msg;
for (const auto& msg : reader_->readMessages(problem, reader_options)) {
mcap::Message new_msg = msg.message;
new_msg.channelId = old_to_new_channel_id_.at(msg.channel->id);
if (msg.schema->name != compressed_schema_name) {
auto status = writer.write(new_msg);
if (!status.ok()) {
throw std::runtime_error("Error writing message to MCAP file: " + status.message);
}
continue;
}
processed_messages_++;
const auto t1 = std::chrono::high_resolution_clock::now();
Cloudini::ConstBufferView raw_dds_msg(msg.message.data, msg.message.dataSize);
const auto pc_info = cloudini_ros::getDeserializedPointCloudMessage(raw_dds_msg);
cloudini_ros::convertCompressedCloudToPointCloud2(pc_info, decoded_dds_msg);
new_msg.data = reinterpret_cast<const std::byte*>(decoded_dds_msg.data());
new_msg.dataSize = decoded_dds_msg.size();
const auto t2 = std::chrono::high_resolution_clock::now();
total_processing_time_ += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1);
total_input_bytes_ += msg.message.dataSize;
total_output_bytes_ += new_msg.dataSize;
auto status = writer.write(new_msg);
if (!status.ok()) {
throw std::runtime_error("Error writing message to MCAP file: " + status.message);
}
}
writer.close();
}
std::vector<std::string_view> split(std::string_view str, char delimiter) {
std::vector<std::string_view> tokens;
size_t start = 0, end = 0;
while ((end = str.find(delimiter, start)) != std::string_view::npos) {
tokens.push_back(str.substr(start, end - start));
start = end + 1;
}
tokens.push_back(str.substr(start));
return tokens;
}
std::string_view trimSpaces(std::string_view str) {
size_t start = 0, end = str.size();
while (start < end && std::isspace(str[start])) {
start++;
}
while (end > start && std::isspace(str[end - 1])) {
end--;
}
return str.substr(start, end - start);
}
void McapConverter::addProfile(const std::string& profile) {
auto tokens = split(profile, ';');
for (const auto& token : tokens) {
auto param_tokens = split(token, ':');
if (param_tokens.size() != 2) {
throw std::runtime_error("Invalid profile (wrong number of parameters): " + profile);
}
std::string field_str(trimSpaces(param_tokens[0]));
std::string resolution_str(trimSpaces(param_tokens[1]));
float resolution = 1.0f;
if (resolution_str == "remove") {
resolution = 0.0f;
} else {
try {
resolution = std::stof(resolution_str);
} catch (const std::invalid_argument& e) {
throw std::runtime_error("Invalid profile (failed conversion to float): " + profile);
}
}
if (field_str == "xyz") {
profile_resolutions_["x"] = resolution;
profile_resolutions_["y"] = resolution;
profile_resolutions_["z"] = resolution;
} else {
profile_resolutions_[field_str] = resolution;
}
}
}
std::vector<std::pair<std::string, float>> McapConverter::getProfile() const {
std::vector<std::pair<std::string, float>> profile;
if (profile_resolutions_.empty()) {
return {};
}
if (profile_resolutions_.count("x")) {
profile.push_back({"x", profile_resolutions_.at("x")});
}
if (profile_resolutions_.count("y")) {
profile.push_back({"y", profile_resolutions_.at("y")});
}
if (profile_resolutions_.count("z")) {
profile.push_back({"z", profile_resolutions_.at("z")});
}
for (const auto& [field, resolution] : profile_resolutions_) {
if (field != "x" && field != "y" && field != "z") {
profile.push_back({field, resolution});
}
}
return profile;
}