cloudini 0.3.1

The cloudini point cloud compression library for Rust.
Documentation
/*
 * Copyright 2025 Davide Faconti
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "mcap_converter.hpp"

#include <set>
#include <stdexcept>

#include "cloudini_lib/ros_message_definitions.hpp"
#include "cloudini_lib/ros_msg_utils.hpp"

#define MCAP_IMPLEMENTATION
#include <mcap/reader.hpp>
#include <mcap/types.hpp>
#include <mcap/writer.hpp>

McapConverter::TopicsMap McapConverter::open(std::filesystem::path file_in) {
  input_stream_.open(file_in);
  data_source_ = std::make_shared<mcap::FileStreamReader>(input_stream_);
  reader_ = std::make_shared<mcap::McapReader>();
  auto res = reader_->open(*data_source_);
  if (!res.ok()) {
    reader_.reset();
    throw std::runtime_error("Error opening MCAP file: " + res.message);
  }

  TopicsMap topics;
  res = reader_->readSummary(mcap::ReadSummaryMethod::AllowFallbackScan);
  if (!res.ok()) {
    reader_.reset();
    throw std::runtime_error("Error reading MCAP sumamry: " + res.message);
  }

  for (const auto& [channel_id, channel_ptr] : reader_->channels()) {
    const auto& schema_ptr = reader_->schema(channel_ptr->schemaId);
    if (schema_ptr->name == pointcloud_schema_name || schema_ptr->name == compressed_schema_name) {
      topics[channel_ptr->topic] = schema_ptr->name;
    }
  }
  topics_ = topics;
  return topics;
}

void McapConverter::duplicateSchemasAndChannels(
    const mcap::McapReader& reader, mcap::McapWriter& writer, bool encoding) {
  const auto old_schemas = reader.schemas();
  std::set<mcap::SchemaId> ordered_schema_id;
  for (const auto& [schema_id, _] : old_schemas) {
    ordered_schema_id.insert(schema_id);
  }

  const auto old_channels = reader.channels();
  std::set<mcap::ChannelId> ordered_channels_id;
  for (const auto& [channel_id, _] : old_channels) {
    ordered_channels_id.insert(channel_id);
  }

  auto copy_string_to_vector = [](const char* str, mcap::ByteArray& array) {
    size_t len = strlen(str);
    const auto* data_ptr = reinterpret_cast<const std::byte*>(str);
    array.resize(len);
    std::memcpy(array.data(), data_ptr, len);
  };

  old_to_new_schema_id_.clear();
  old_to_new_channel_id_.clear();

  for (const auto& schema_id : ordered_schema_id) {
    const auto& schema_ptr = old_schemas.at(schema_id);
    auto schema_name = schema_ptr->name;
    auto schema_data = schema_ptr->data;

    if (encoding && schema_name == pointcloud_schema_name) {
      schema_name = compressed_schema_name;
      copy_string_to_vector(compressed_schema_data, schema_data);
    }
    if (!encoding && schema_name == compressed_schema_name) {
      schema_name = pointcloud_schema_name;
      copy_string_to_vector(pointcloud_schema_data, schema_data);
    }

    mcap::Schema new_schema(schema_name, schema_ptr->encoding, schema_data);
    writer.addSchema(new_schema);
    old_to_new_schema_id_.insert({schema_id, new_schema.id});
  }

  for (const auto& channel_id : ordered_channels_id) {
    const auto channel_ptr = old_channels.at(channel_id);
    auto new_schema_id = old_to_new_schema_id_.at(channel_ptr->schemaId);
    mcap::Channel new_channel(channel_ptr->topic, channel_ptr->messageEncoding, new_schema_id);
    writer.addChannel(new_channel);
    old_to_new_channel_id_.insert({channel_ptr->id, new_channel.id});
  }

  // copy the metadata from the reader to the writer
  for (const auto& [metadata_index_name, metadata_index] : reader_->metadataIndexes()) {
    mcap::Record mcap_record;
    auto status = mcap::McapReader::ReadRecord(*data_source_, metadata_index.offset, &mcap_record);
    if (!status.ok()) {
      throw std::runtime_error("Error reading MCAP metadata record: " + status.message);
    }
    mcap::Metadata mcap_metadata;
    status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata);
    if (!status.ok()) {
      throw std::runtime_error("Error parsing MCAP metadata record: " + status.message);
    }
    status = writer.write(mcap_metadata);
    if (!status.ok()) {
      throw std::runtime_error("Error copying MCAP metadata: " + status.message);
    }
  }
}

mcap::Compression toMcapCompression(Cloudini::CompressionOption compression) {
  switch (compression) {
    case Cloudini::CompressionOption::ZSTD:
      return mcap::Compression::Zstd;
    case Cloudini::CompressionOption::LZ4:
      return mcap::Compression::Lz4;
    case Cloudini::CompressionOption::NONE:
      return mcap::Compression::None;
    default:
      throw std::runtime_error("Unsupported compression option for MCAP writer");
  }
}
//------------------------------------------------------
void McapConverter::encodePointClouds(
    std::filesystem::path file_out, std::optional<float> default_resolution,
    Cloudini::CompressionOption mcap_writer_compression) {
  if (!reader_) {
    throw std::runtime_error("McapReader is not initialized. Call open() first.");
  }
  processed_messages_ = 0;
  total_input_bytes_ = 0;
  total_output_bytes_ = 0;
  total_processing_time_ = std::chrono::microseconds(0);

  mcap::Status status{};

  mcap::McapWriter writer;
  mcap::McapWriterOptions writer_options(reader_->header()->profile);
  writer_options.compression = toMcapCompression(mcap_writer_compression);
  writer_options.chunkSize = 2 * 1024 * 1024;  // 2 MB chunk size

  status = writer.open(file_out.string(), writer_options);
  if (!status.ok()) {
    throw std::runtime_error("Error opening MCAP file for writing: " + status.message);
  }

  duplicateSchemasAndChannels(*reader_, writer, true);

  mcap::ReadMessageOptions reader_options;
  mcap::ProblemCallback problem = [](const mcap::Status&) {};

  std::vector<uint8_t> compressed_dds_msg;

  for (const auto& msg : reader_->readMessages(problem, reader_options)) {
    mcap::Message new_msg = msg.message;
    new_msg.channelId = old_to_new_channel_id_.at(msg.channel->id);
    // default case (not a point cloud)
    if (msg.schema->name != pointcloud_schema_name) {
      auto status = writer.write(new_msg);
      if (!status.ok()) {
        throw std::runtime_error("Error writing message to MCAP file: " + status.message);
      }
      continue;
    }
    processed_messages_++;
    const auto t1 = std::chrono::high_resolution_clock::now();

    Cloudini::ConstBufferView raw_dds_msg(msg.message.data, msg.message.dataSize);
    auto pc_info = cloudini_ros::getDeserializedPointCloudMessage(raw_dds_msg);

    // Apply the profile to the encoding info. removing fields if resolution is 0
    // Remove first all fields that have resolution 0.0 in the profile
    cloudini_ros::applyResolutionProfile(profile_resolutions_, pc_info.fields, default_resolution);

    auto encoding_info = cloudini_ros::toEncodingInfo(pc_info);
    // no need to do ZSTD compression twice
    if (mcap_writer_compression == Cloudini::CompressionOption::ZSTD) {
      encoding_info.compression_opt = Cloudini::CompressionOption::NONE;
    }

    cloudini_ros::convertPointCloud2ToCompressedCloud(pc_info, encoding_info, compressed_dds_msg);

    // copy pointers to compressed_dds_msg
    new_msg.data = reinterpret_cast<const std::byte*>(compressed_dds_msg.data());
    new_msg.dataSize = compressed_dds_msg.size();

    const auto t2 = std::chrono::high_resolution_clock::now();
    total_processing_time_ += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1);
    total_input_bytes_ += msg.message.dataSize;
    total_output_bytes_ += new_msg.dataSize;

    auto status = writer.write(new_msg);

    if (!status.ok()) {
      throw std::runtime_error("Error writing message to MCAP file: " + status.message);
    }
  }
  writer.close();
}

//------------------------------------------------------
void McapConverter::printStatistics() const {
  if (processed_messages_ == 0) {
    return;
  }
  std::cout << "Processed " << processed_messages_ << " point cloud messages" << std::endl;
  double compression_ratio = static_cast<double>(total_output_bytes_) / static_cast<double>(total_input_bytes_);
  std::cout << "Avg Compression ratio: " << compression_ratio << std::endl;
  const double count = static_cast<double>(processed_messages_);
  const double avg_time_per_message = 0.001 * static_cast<double>(total_processing_time_.count()) / count;
  std::cout << "Average processing time per message: " << avg_time_per_message << " milliseconds" << std::endl;
}

//------------------------------------------------------
void McapConverter::decodePointClouds(
    std::filesystem::path file_out, Cloudini::CompressionOption mcap_writer_compression) {
  if (!reader_) {
    throw std::runtime_error("McapReader is not initialized. Call open() first.");
  }
  processed_messages_ = 0;
  total_input_bytes_ = 0;
  total_output_bytes_ = 0;
  total_processing_time_ = std::chrono::microseconds(0);

  mcap::McapWriter writer;
  mcap::McapWriterOptions writer_options(reader_->header()->profile);
  writer_options.compression = toMcapCompression(mcap_writer_compression);
  writer_options.chunkSize = 2 * 1024 * 1024;  // 10 MB chunk size

  auto status = writer.open(file_out.string(), writer_options);
  if (!status.ok()) {
    throw std::runtime_error("Error opening MCAP file for writing: " + status.message);
  }

  duplicateSchemasAndChannels(*reader_, writer, false);

  mcap::ReadMessageOptions reader_options;
  mcap::ProblemCallback problem = [](const mcap::Status&) {};

  std::vector<uint8_t> decoded_cloud;
  std::vector<uint8_t> decoded_dds_msg;

  for (const auto& msg : reader_->readMessages(problem, reader_options)) {
    mcap::Message new_msg = msg.message;
    new_msg.channelId = old_to_new_channel_id_.at(msg.channel->id);
    // default case (not a point cloud)
    if (msg.schema->name != compressed_schema_name) {
      auto status = writer.write(new_msg);
      if (!status.ok()) {
        throw std::runtime_error("Error writing message to MCAP file: " + status.message);
      }
      continue;
    }
    processed_messages_++;
    const auto t1 = std::chrono::high_resolution_clock::now();

    Cloudini::ConstBufferView raw_dds_msg(msg.message.data, msg.message.dataSize);
    const auto pc_info = cloudini_ros::getDeserializedPointCloudMessage(raw_dds_msg);
    cloudini_ros::convertCompressedCloudToPointCloud2(pc_info, decoded_dds_msg);

    new_msg.data = reinterpret_cast<const std::byte*>(decoded_dds_msg.data());
    new_msg.dataSize = decoded_dds_msg.size();

    const auto t2 = std::chrono::high_resolution_clock::now();
    total_processing_time_ += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1);
    total_input_bytes_ += msg.message.dataSize;
    total_output_bytes_ += new_msg.dataSize;

    auto status = writer.write(new_msg);
    if (!status.ok()) {
      throw std::runtime_error("Error writing message to MCAP file: " + status.message);
    }
  }
  writer.close();
}

//------------------------------------------------------
std::vector<std::string_view> split(std::string_view str, char delimiter) {
  std::vector<std::string_view> tokens;
  size_t start = 0, end = 0;
  while ((end = str.find(delimiter, start)) != std::string_view::npos) {
    tokens.push_back(str.substr(start, end - start));
    start = end + 1;
  }
  tokens.push_back(str.substr(start));
  return tokens;
}

// trim front and back spaces
std::string_view trimSpaces(std::string_view str) {
  size_t start = 0, end = str.size();
  while (start < end && std::isspace(str[start])) {
    start++;
  }
  while (end > start && std::isspace(str[end - 1])) {
    end--;
  }
  return str.substr(start, end - start);
}

void McapConverter::addProfile(const std::string& profile) {
  auto tokens = split(profile, ';');
  for (const auto& token : tokens) {
    auto param_tokens = split(token, ':');
    if (param_tokens.size() != 2) {
      throw std::runtime_error("Invalid profile (wrong number of parameters): " + profile);
    }
    std::string field_str(trimSpaces(param_tokens[0]));
    std::string resolution_str(trimSpaces(param_tokens[1]));
    float resolution = 1.0f;
    if (resolution_str == "remove") {
      resolution = 0.0f;
    } else {
      // check if resolution_str can be converted to float
      try {
        resolution = std::stof(resolution_str);
      } catch (const std::invalid_argument& e) {
        throw std::runtime_error("Invalid profile (failed conversion to float): " + profile);
      }
    }
    if (field_str == "xyz") {
      profile_resolutions_["x"] = resolution;
      profile_resolutions_["y"] = resolution;
      profile_resolutions_["z"] = resolution;
    } else {
      profile_resolutions_[field_str] = resolution;
    }
  }
}

std::vector<std::pair<std::string, float>> McapConverter::getProfile() const {
  std::vector<std::pair<std::string, float>> profile;
  if (profile_resolutions_.empty()) {
    return {};
  }
  if (profile_resolutions_.count("x")) {
    profile.push_back({"x", profile_resolutions_.at("x")});
  }
  if (profile_resolutions_.count("y")) {
    profile.push_back({"y", profile_resolutions_.at("y")});
  }
  if (profile_resolutions_.count("z")) {
    profile.push_back({"z", profile_resolutions_.at("z")});
  }
  for (const auto& [field, resolution] : profile_resolutions_) {
    if (field != "x" && field != "y" && field != "z") {
      profile.push_back({field, resolution});
    }
  }
  return profile;
}