#include "cloudini_ros/cloudini_subscriber_pcl.hpp"
#include <cloudini_lib/cloudini.hpp>
#include <cloudini_lib/pcl_conversion.hpp>
#include <cloudini_lib/ros_msg_utils.hpp>
namespace cloudini_ros {
using namespace Cloudini;
CloudiniSubscriberPCL::CloudiniSubscriberPCL(
rclcpp::Node::SharedPtr node, const std::string& topic_name, CallbackType callback, const rclcpp::QoS& qos_profile)
: user_callback_(callback), node_(node.get()) {
const std::string compressed_topic_type = "point_cloud_interfaces/msg/CompressedPointCloud2";
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>)> generic_callback =
std::bind(&CloudiniSubscriberPCL::messageCallback, this, std::placeholders::_1);
subscription_ = node->create_generic_subscription(topic_name, compressed_topic_type, qos_profile, generic_callback);
RCLCPP_INFO(node_->get_logger(), "CloudiniSubscriberPCL created for topic: %s", topic_name.c_str());
}
CloudiniSubscriberPCL::CloudiniSubscriberPCL(
rclcpp::Node* node, const std::string& topic_name, CallbackType callback, const rclcpp::QoS& qos_profile)
: user_callback_(callback), node_(node) {
const std::string compressed_topic_type = "point_cloud_interfaces/msg/CompressedPointCloud2";
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>)> generic_callback =
std::bind(&CloudiniSubscriberPCL::messageCallback, this, std::placeholders::_1);
subscription_ = node->create_generic_subscription(topic_name, compressed_topic_type, qos_profile, generic_callback);
RCLCPP_INFO(node_->get_logger(), "CloudiniSubscriberPCL created for topic: %s", topic_name.c_str());
}
CloudiniSubscriberPCL::~CloudiniSubscriberPCL() {
std::lock_guard<std::mutex> lock(pool_mutex_);
for (auto* ptr : cloud_pool_) {
delete ptr;
}
cloud_pool_.clear();
}
std::string CloudiniSubscriberPCL::getTopicName() const {
return subscription_->get_topic_name();
}
pcl::PCLPointCloud2::Ptr CloudiniSubscriberPCL::acquireCloudFromPool() {
pcl::PCLPointCloud2* raw_ptr = nullptr;
std::lock_guard<std::mutex> lock(pool_mutex_);
if (!cloud_pool_.empty()) {
raw_ptr = cloud_pool_.back();
cloud_pool_.pop_back();
} else {
raw_ptr = new pcl::PCLPointCloud2();
}
return pcl::PCLPointCloud2::Ptr(raw_ptr, [this](pcl::PCLPointCloud2* ptr) {
std::lock_guard<std::mutex> lock(pool_mutex_);
if (cloud_pool_.size() < MAX_POOL_SIZE) {
ptr->data.clear();
ptr->fields.clear();
cloud_pool_.push_back(ptr);
} else {
delete ptr;
}
});
}
void CloudiniSubscriberPCL::messageCallback(std::shared_ptr<rclcpp::SerializedMessage> msg) {
try {
const auto& input_msg = msg->get_rcl_serialized_message();
const ConstBufferView raw_dds_msg(input_msg.buffer, input_msg.buffer_length);
const auto pc_info = cloudini_ros::getDeserializedPointCloudMessage(raw_dds_msg);
auto pcl_cloud = acquireCloudFromPool();
PCLPointCloudDecode(pc_info.data, *pcl_cloud);
user_callback_(pcl_cloud);
} catch (const std::exception& e) {
RCLCPP_ERROR(node_->get_logger(), "Failed to decode Cloudini point cloud: %s", e.what());
}
}
}