#include <cloudini_lib/cloudini.hpp>
#include <cloudini_lib/ros_msg_utils.hpp>
#include <rclcpp/generic_publisher.hpp>
#include <rclcpp/generic_subscription.hpp>
#include <rclcpp/rclcpp.hpp>
#include <rclcpp_components/register_node_macro.hpp>
#include <rosidl_typesupport_cpp/message_type_support.hpp>
#include <sensor_msgs/msg/point_cloud2.hpp>
class CloudiniPointcloudConverter : public rclcpp::Node {
public:
CloudiniPointcloudConverter(const rclcpp::NodeOptions& options = rclcpp::NodeOptions());
void callback(std::shared_ptr<rclcpp::SerializedMessage> msg);
~CloudiniPointcloudConverter() {
output_message_.get_rcl_serialized_message().buffer = nullptr;
output_message_.get_rcl_serialized_message().buffer_length = 0;
}
private:
rclcpp::GenericSubscription::SharedPtr point_cloud_subscriber_;
rclcpp::GenericPublisher::SharedPtr point_cloud_publisher_;
void point_cloud_callback(const rclcpp::SerializedMessage& serialized_msg);
std::vector<uint8_t> output_raw_message_;
rclcpp::SerializedMessage output_message_;
bool compressing_ = true;
double resolution_ = 0.001;
uint64_t tot_original_size = 0;
uint64_t tot_compressed_size = 0;
};
rclcpp::QoS adapt_request_to_offers(
const std::string& topic_name, const std::vector<rclcpp::TopicEndpointInfo>& endpoints) {
rclcpp::QoS request_qos(rmw_qos_profile_default.depth);
if (endpoints.empty()) {
return request_qos;
}
size_t reliability_reliable_endpoints_count = 0;
size_t durability_transient_local_endpoints_count = 0;
for (const auto& endpoint : endpoints) {
const auto& profile = endpoint.qos_profile().get_rmw_qos_profile();
if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
reliability_reliable_endpoints_count++;
}
if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
durability_transient_local_endpoints_count++;
}
}
if (reliability_reliable_endpoints_count == endpoints.size()) {
request_qos.reliable();
} else {
request_qos.best_effort();
}
if (durability_transient_local_endpoints_count == endpoints.size()) {
request_qos.transient_local();
} else {
request_qos.durability_volatile();
}
return request_qos;
}
CloudiniPointcloudConverter::CloudiniPointcloudConverter(const rclcpp::NodeOptions& options)
: rclcpp::Node("cloudini_pointcloud_converter", rclcpp::NodeOptions(options).use_intra_process_comms(true)) {
this->declare_parameter<bool>("compressing", true);
this->declare_parameter<std::string>("topic_input", "/points");
this->declare_parameter<std::string>("topic_output", "");
this->declare_parameter<double>("resolution", 0.001);
compressing_ = this->get_parameter("compressing").as_bool();
resolution_ = this->get_parameter("resolution").as_double();
const std::string input_topic = this->get_parameter("topic_input").as_string();
if (input_topic.empty()) {
RCLCPP_ERROR(this->get_logger(), "Input topic is not set");
throw std::runtime_error("Input topic is not set");
}
std::string output_topic = this->get_parameter("topic_output").as_string();
if (output_topic.empty()) {
output_topic = input_topic + (compressing_ ? "/compressed" : "/decompressed");
RCLCPP_WARN(this->get_logger(), "Output topic is not set, using default: %s", output_topic.c_str());
}
auto publisher_info = this->get_publishers_info_by_topic(input_topic);
auto detected_qos = adapt_request_to_offers(input_topic, publisher_info);
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>)> callback =
std::bind(&CloudiniPointcloudConverter::callback, this, std::placeholders::_1);
const std::string compressed_topic_type = "point_cloud_interfaces/msg/CompressedPointCloud2";
const std::string pointcloud_topic_type = "sensor_msgs/msg/PointCloud2";
const std::string input_topic_type = compressing_ ? pointcloud_topic_type : compressed_topic_type;
const std::string output_topic_type = compressing_ ? compressed_topic_type : pointcloud_topic_type;
RCLCPP_INFO(
this->get_logger(), "Subscribing to topic '%s' of type '%s'", input_topic.c_str(), input_topic_type.c_str());
point_cloud_subscriber_ = this->create_generic_subscription(
input_topic, input_topic_type, detected_qos, callback);
RCLCPP_INFO(
this->get_logger(), "Publishing to topic '%s' of type '%s'", output_topic.c_str(), output_topic_type.c_str());
point_cloud_publisher_ = this->create_generic_publisher(output_topic, output_topic_type, detected_qos);
}
void CloudiniPointcloudConverter::callback(std::shared_ptr<rclcpp::SerializedMessage> msg) {
if (point_cloud_publisher_->get_subscription_count() == 0) {
return;
}
const auto& input_msg = msg->get_rcl_serialized_message();
const Cloudini::ConstBufferView raw_dds_msg(input_msg.buffer, input_msg.buffer_length);
auto pc_info = cloudini_ros::getDeserializedPointCloudMessage(raw_dds_msg);
if (compressing_) {
cloudini_ros::applyResolutionProfile(cloudini_ros::ResolutionProfile{}, pc_info.fields, resolution_);
const auto encoding_info = cloudini_ros::toEncodingInfo(pc_info);
cloudini_ros::convertPointCloud2ToCompressedCloud(pc_info, encoding_info, output_raw_message_);
} else {
cloudini_ros::convertCompressedCloudToPointCloud2(pc_info, output_raw_message_);
}
output_message_.get_rcl_serialized_message().buffer_length = output_raw_message_.size();
output_message_.get_rcl_serialized_message().buffer = output_raw_message_.data();
point_cloud_publisher_->publish(output_message_);
tot_original_size += input_msg.buffer_length;
tot_compressed_size += output_raw_message_.size();
static int count = 0;
if (count % 20 == 0) {
double average_ratio = static_cast<double>(tot_compressed_size) / tot_original_size;
tot_compressed_size = 0;
tot_original_size = 0;
RCLCPP_INFO(this->get_logger(), "Converted %d messages, average compression ratio: %.2f", count, average_ratio);
}
count++;
}
RCLCPP_COMPONENTS_REGISTER_NODE(CloudiniPointcloudConverter)