#include <filesystem>
#include <fstream>
#include <iostream>
#include <set>
#include <unordered_map>
#include "cloudini_lib/cloudini.hpp"
#include "cloudini_lib/ros_message_definitions.hpp"
#include "cxxopts.hpp"
#define MCAP_IMPLEMENTATION
#include "mcap/reader.hpp"
#include "mcap/types.hpp"
#include "mcap/writer.hpp"
class McapCutter {
public:
void cut(
const std::filesystem::path& input_file, const std::filesystem::path& output_file,
int max_messages_per_channel = 1000);
private:
std::ifstream input_stream_;
};
void McapCutter::cut(
const std::filesystem::path& input_file, const std::filesystem::path& output_file, int max_messages_per_channel) {
input_stream_.open(input_file);
auto data_source = std::make_shared<mcap::FileStreamReader>(input_stream_);
auto reader = std::make_shared<mcap::McapReader>();
auto res = reader->open(*data_source);
if (!res.ok()) {
throw std::runtime_error("Error opening MCAP file: " + res.message);
}
res = reader->readSummary(mcap::ReadSummaryMethod::AllowFallbackScan);
if (!res.ok()) {
throw std::runtime_error("Error reading MCAP summary: " + res.message);
}
std::vector<std::pair<mcap::ChannelId, std::string>> pointcloud_channels;
for (const auto& [channel_id, channel_ptr] : reader->channels()) {
const auto& schema_ptr = reader->schema(channel_ptr->schemaId);
if (schema_ptr->name == pointcloud_schema_name) {
pointcloud_channels.push_back({channel_id, channel_ptr->topic});
std::cout << "Found PointCloud2 topic: " << channel_ptr->topic << std::endl;
}
}
if (pointcloud_channels.empty()) {
std::cout << "No PointCloud2 topics found in input file." << std::endl;
return;
}
std::cout << "Found " << pointcloud_channels.size() << " PointCloud2 channels" << std::endl;
mcap::McapWriter writer;
mcap::McapWriterOptions writer_options(reader->header()->profile);
writer_options.compression = mcap::Compression::Zstd;
writer_options.chunkSize = 2 * 1024 * 1024;
auto status = writer.open(output_file.string(), writer_options);
if (!status.ok()) {
throw std::runtime_error("Error opening output MCAP file: " + status.message);
}
mcap::Schema pointcloud_schema(pointcloud_schema_name, "ros2msg", pointcloud_schema_data);
writer.addSchema(pointcloud_schema);
std::map<mcap::ChannelId, mcap::ChannelId> old_to_new_channel_id;
for (const auto& [old_channel_id, topic_name] : pointcloud_channels) {
mcap::Channel new_channel(topic_name, "cdr", pointcloud_schema.id);
writer.addChannel(new_channel);
old_to_new_channel_id[old_channel_id] = new_channel.id;
}
mcap::ReadMessageOptions reader_options;
mcap::ProblemCallback problem = [](const mcap::Status&) {};
std::unordered_map<mcap::ChannelId, int> message_counts;
int total_messages_written = 0;
std::set<mcap::ChannelId> relevant_channel_ids;
for (const auto& [channel_id, _] : pointcloud_channels) {
relevant_channel_ids.insert(channel_id);
}
for (const auto& msg : reader->readMessages(problem, reader_options)) {
if (relevant_channel_ids.find(msg.channel->id) == relevant_channel_ids.end()) {
continue;
}
if (message_counts[msg.channel->id] >= max_messages_per_channel) {
continue;
}
mcap::Message new_msg = msg.message;
new_msg.channelId = old_to_new_channel_id.at(msg.channel->id);
auto write_status = writer.write(new_msg);
if (!write_status.ok()) {
throw std::runtime_error("Error writing message: " + write_status.message);
}
message_counts[msg.channel->id]++;
total_messages_written++;
if (total_messages_written % 100 == 0) {
std::cout << "Processed " << total_messages_written << " messages..." << std::endl;
}
}
writer.close();
std::cout << "\nSummary:" << std::endl;
std::cout << "Total messages written: " << total_messages_written << std::endl;
for (const auto& [channel_id, count] : message_counts) {
const auto channel = reader->channels().at(channel_id);
std::cout << " " << channel->topic << ": " << count << " messages" << std::endl;
}
}
int main(int argc, char** argv) {
cxxopts::Options options("mcap_cutter", "Extract PointCloud2 messages from MCAP files");
options.add_options()("h,help", "Print usage")("f,filename", "Input MCAP file", cxxopts::value<std::string>())(
"o,output", "Output MCAP file", cxxopts::value<std::string>())(
"n,max-messages", "Maximum messages per channel", cxxopts::value<int>()->default_value("1000"));
auto parse_result = options.parse(argc, argv);
if (parse_result.count("help")) {
std::cout << options.help() << std::endl;
return 0;
}
if (!parse_result.count("filename")) {
std::cerr << "Error: Input file name is required." << std::endl;
std::cout << options.help() << std::endl;
return 1;
}
auto input_filename = parse_result["filename"].as<std::string>();
const std::filesystem::path input_file = input_filename;
if (input_file.extension() != ".mcap") {
std::cerr << "Error: Input file must be a .mcap file." << std::endl;
return 1;
}
if (!std::filesystem::exists(input_file)) {
std::cerr << "Error: Input file does not exist: " << input_file << std::endl;
return 1;
}
std::string output_filename = input_file.stem().string() + "_cut.mcap";
if (parse_result.count("output")) {
output_filename = parse_result["output"].as<std::string>();
}
if (std::filesystem::path(output_filename).extension() != ".mcap") {
output_filename += ".mcap";
}
int max_messages = parse_result["max-messages"].as<int>();
if (max_messages <= 0) {
std::cerr << "Error: max-messages must be positive." << std::endl;
return 1;
}
std::cout << "Input file: " << input_file << std::endl;
std::cout << "Output file: " << output_filename << std::endl;
std::cout << "Max messages per channel: " << max_messages << std::endl;
try {
McapCutter cutter;
cutter.cut(input_file, output_filename, max_messages);
std::cout << "\nFile saved as: " << output_filename << std::endl;
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}