use std::{
borrow::Cow,
ffi::{CStr, CString},
marker::PhantomData,
sync::{Arc, Mutex},
};
use rosidl_runtime_rs::{Message, RmwMessage};
use crate::{
error::{RclrsError, ToResult},
qos::QoSProfile,
rcl_bindings::*,
IntoPrimitiveOptions, Node, Promise, ENTITY_LIFECYCLE_MUTEX,
};
mod loaned_message;
pub use loaned_message::*;
// SAFETY: NOTE(review) — this asserts that an `rcl_publisher_t` may be moved to another
// thread. Nothing in this file proves that; it presumes the rcl functions called on it
// (including `rcl_publisher_fini` in `Drop`) do not depend on the calling thread — confirm.
unsafe impl Send for rcl_publisher_t {}
/// Owns a raw `rcl_publisher_t` together with the [`Node`] used alongside it.
///
/// The `Drop` impl of this type passes the node's `rcl_node` to `rcl_publisher_fini`,
/// which is why the node is stored next to the publisher.
pub(crate) struct PublisherHandle {
/// The underlying rcl publisher, wrapped in a mutex so it can be reached through `&self`.
pub(crate) rcl_publisher: Mutex<rcl_publisher_t>,
/// The node associated with the publisher; in `PublisherState::create` this is the node
/// whose `rcl_node` was handed to `rcl_publisher_init`.
pub(crate) node: Node,
}
impl Drop for PublisherHandle {
    /// Finalizes the rcl publisher by calling `rcl_publisher_fini` with the stored node.
    ///
    /// # Panics
    ///
    /// Panics if locking the node's `rcl_node`, locking `ENTITY_LIFECYCLE_MUTEX`, or
    /// accessing the publisher mutex fails (each result is `unwrap`ped).
    fn drop(&mut self) {
        // Lock order matches `PublisherState::create`: node first, then the global
        // entity lifecycle mutex.
        let mut node_guard = self.node.handle().rcl_node.lock().unwrap();
        let _lifecycle_guard = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();

        // `&mut self` already guarantees exclusive access, so `get_mut` reaches the
        // publisher without taking its lock.
        let publisher = self.rcl_publisher.get_mut().unwrap();

        // SAFETY: both arguments are derived from live exclusive borrows, and the node
        // lock and the lifecycle lock are held for the whole call. The return code is
        // discarded.
        unsafe {
            rcl_publisher_fini(publisher, &mut *node_guard);
        }
    }
}
impl PublisherHandle {
    /// Returns the topic name reported by `rcl_publisher_get_topic_name` as an owned
    /// `String`. Bytes that are not valid UTF-8 are replaced lossily.
    ///
    /// # Panics
    ///
    /// Panics if the `rcl_publisher` mutex cannot be locked.
    pub(crate) fn topic_name(&self) -> String {
        // The mutex guard is a temporary of this statement only, so the lock is
        // released again before the returned C string is read.
        let name_ptr =
            unsafe { rcl_publisher_get_topic_name(&*self.rcl_publisher.lock().unwrap()) };

        // SAFETY: NOTE(review) — relies on rcl returning a non-null, NUL-terminated
        // string that remains valid while the publisher exists; confirm against the
        // rcl documentation.
        let name = unsafe { CStr::from_ptr(name_ptr) };
        name.to_string_lossy().into_owned()
    }

    /// Returns the count written by `rcl_publisher_get_subscription_count`.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `.ok()` on the rcl return code.
    ///
    /// # Panics
    ///
    /// Panics if the `rcl_publisher` mutex cannot be locked.
    pub(crate) fn get_subscription_count(&self) -> Result<usize, RclrsError> {
        let mut count = 0;

        // SAFETY: the publisher reference is protected by its mutex for the duration
        // of the call, and `count` is a live local that rcl writes into.
        let ret = unsafe {
            rcl_publisher_get_subscription_count(&*self.rcl_publisher.lock().unwrap(), &mut count)
        };
        ret.ok()?;

        Ok(count)
    }
}
/// Reference-counted handle to a [`PublisherState`].
///
/// `PublisherState::create` returns its result as an `Arc`, i.e. as this type.
pub type Publisher<T> = Arc<PublisherState<T>>;
/// State of a publisher for messages of type `T`.
///
/// The [`Publisher`] alias wraps this type in an [`Arc`].
pub struct PublisherState<T>
where
T: Message,
{
// Type support pointer for `T::RmwMsg`; obtained once in `create` and passed again to
// `rcl_borrow_loaned_message`.
type_support_ptr: *const rosidl_message_type_support_t,
// Ties the state to `T` without storing a value of that type.
message: PhantomData<T>,
// The rcl publisher and its node; dropping it calls `rcl_publisher_fini`.
handle: PublisherHandle,
}
// SAFETY: NOTE(review) — the raw `type_support_ptr` field (and `PhantomData<T>` for
// arbitrary `T`) prevents these traits from being derived automatically. The code in this
// file only reads that pointer and guards `rcl_publisher` with a `Mutex`; soundness still
// presumes the pointed-to type support data is immutable and outlives the publisher, and
// that `Node` is itself safe to share across threads — confirm.
unsafe impl<T> Send for PublisherState<T> where T: Message {}
unsafe impl<T> Sync for PublisherState<T> where T: Message {}
impl<T: Message> PublisherState<T> {
    /// Creates a publisher for `T` on `node` from the given options.
    ///
    /// The topic name is converted to a C string and the QoS profile is written into
    /// the default rcl publisher options before `rcl_publisher_init` is called.
    ///
    /// # Errors
    ///
    /// * `RclrsError::StringContainsNul` if the topic cannot be turned into a `CString`.
    /// * Whatever error `.ok()` yields for the `rcl_publisher_init` return code.
    ///
    /// # Panics
    ///
    /// Panics if locking the node's `rcl_node` or `ENTITY_LIFECYCLE_MUTEX` fails.
    pub(crate) fn create<'a>(
        options: impl Into<PublisherOptions<'a>>,
        node: Node,
    ) -> Result<Arc<Self>, RclrsError>
    where
        T: Message,
    {
        let PublisherOptions { topic, qos } = options.into();

        // Start from rcl's zero-initialized publisher; it is filled in by
        // `rcl_publisher_init` below.
        let mut rcl_publisher = unsafe { rcl_get_zero_initialized_publisher() };
        let type_support_ptr =
            <T as Message>::RmwMsg::get_type_support() as *const rosidl_message_type_support_t;

        let topic_c_string = match CString::new(topic) {
            Ok(c_string) => c_string,
            Err(err) => {
                return Err(RclrsError::StringContainsNul {
                    err,
                    s: topic.into(),
                });
            }
        };

        let mut publisher_options = unsafe { rcl_publisher_get_default_options() };
        publisher_options.qos = qos.into();

        // Both locks are confined to this scope so they are released before the
        // result is inspected and before `node` is moved into the handle.
        let init_result = {
            let node_guard = node.handle().rcl_node.lock().unwrap();
            let _lifecycle_guard = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();

            // SAFETY: every pointer passed here refers to a live local or to the locked
            // node, and the node and lifecycle locks are held during the call.
            // NOTE(review): `topic_c_string` and `publisher_options` are dropped when
            // this function returns, so rcl presumably copies what it needs — confirm.
            let ret = unsafe {
                rcl_publisher_init(
                    &mut rcl_publisher,
                    &*node_guard,
                    type_support_ptr,
                    topic_c_string.as_ptr(),
                    &publisher_options,
                )
            };
            ret.ok()
        };
        init_result?;

        let handle = PublisherHandle {
            rcl_publisher: Mutex::new(rcl_publisher),
            node,
        };
        Ok(Arc::new(Self {
            type_support_ptr,
            message: PhantomData,
            handle,
        }))
    }

    /// Returns the topic name of this publisher as reported by the underlying handle.
    pub fn topic_name(&self) -> String {
        self.handle.topic_name()
    }

    /// Returns the subscription count reported by the underlying handle.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the handle's `get_subscription_count`.
    pub fn get_subscription_count(&self) -> Result<usize, RclrsError> {
        self.handle.get_subscription_count()
    }

    /// Returns the promise produced by the node's `notify_on_graph_change` for the
    /// condition "`get_subscription_count` succeeded with a count greater than zero".
    ///
    /// The condition closure owns a clone of this `Arc`. A failed count query makes the
    /// condition evaluate to `false`.
    pub fn notify_on_subscriber_ready(self: &Arc<PublisherState<T>>) -> Promise<()> {
        let publisher = Arc::clone(self);
        self.handle.node.notify_on_graph_change(move || {
            matches!(publisher.get_subscription_count(), Ok(count) if count > 0)
        })
    }

    /// Publishes `message`, which may be passed by value or by reference (see
    /// [`MessageCow`]).
    ///
    /// The message is converted with `T::into_rmw_message` and handed to `rcl_publish`
    /// while the publisher mutex is held.
    ///
    /// # Errors
    ///
    /// Returns the result of `.ok()` on the `rcl_publish` return code.
    ///
    /// # Panics
    ///
    /// Panics if the publisher mutex cannot be locked.
    pub fn publish<'a, M: MessageCow<'a, T>>(&self, message: M) -> Result<(), RclrsError> {
        let rmw_message = T::into_rmw_message(message.into_cow());
        // Declared after `rmw_message`, so the lock is released before the message is
        // dropped at the end of the function.
        let mut publisher_guard = self.handle.rcl_publisher.lock().unwrap();

        // SAFETY: the message pointer comes from `rmw_message`, which lives until the end
        // of this function, and its type is `T::RmwMsg`, the same type whose type support
        // was used in `create`. The third argument is a null pointer.
        // NOTE(review): a null third argument is presumably accepted by rcl — confirm.
        let ret = unsafe {
            rcl_publish(
                &mut *publisher_guard,
                rmw_message.as_ref() as *const <T as Message>::RmwMsg as *mut _,
                std::ptr::null_mut(),
            )
        };
        ret.ok()
    }
}
impl<T: RmwMessage> PublisherState<T> {
    /// Requests a loaned message from `rcl_borrow_loaned_message` and wraps the returned
    /// pointer, together with a reference to this publisher, in a [`LoanedMessage`].
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `.ok()` on the rcl return code.
    ///
    /// # Panics
    ///
    /// Panics if the publisher mutex cannot be locked.
    pub fn borrow_loaned_message(&self) -> Result<LoanedMessage<'_, T>, RclrsError> {
        // Starts out null; rcl writes the loaned message pointer into it.
        let mut loaned_ptr = std::ptr::null_mut();

        // SAFETY: the publisher is locked for the duration of the call, the type support
        // pointer is the one stored at creation, and `loaned_ptr` is a live local.
        let ret = unsafe {
            rcl_borrow_loaned_message(
                &*self.handle.rcl_publisher.lock().unwrap(),
                self.type_support_ptr,
                &mut loaned_ptr,
            )
        };
        ret.ok()?;

        Ok(LoanedMessage {
            publisher: self,
            msg_ptr: loaned_ptr as *mut T,
        })
    }

    /// Returns the value of `rcl_publisher_can_loan_messages` for this publisher.
    ///
    /// # Panics
    ///
    /// Panics if the publisher mutex cannot be locked.
    pub fn can_loan_messages(&self) -> bool {
        let publisher_guard = self.handle.rcl_publisher.lock().unwrap();
        // SAFETY: the publisher reference is valid and protected by its mutex while
        // the call runs.
        unsafe { rcl_publisher_can_loan_messages(&*publisher_guard) }
    }
}
/// Options for creating a publisher: a topic name and a QoS profile.
///
/// Any `T: IntoPrimitiveOptions` converts into this type through the `From` impl in
/// this file.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct PublisherOptions<'a> {
/// Topic name; `PublisherState::create` converts it to a C string for
/// `rcl_publisher_init`.
pub topic: &'a str,
/// Quality-of-service profile; [`PublisherOptions::new`] initializes it with
/// `QoSProfile::topics_default()`.
pub qos: QoSProfile,
}
impl<'a> PublisherOptions<'a> {
    /// Builds options for `topic` with the QoS profile returned by
    /// `QoSProfile::topics_default()`.
    pub fn new(topic: &'a str) -> Self {
        let qos = QoSProfile::topics_default();
        Self { topic, qos }
    }
}
impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for PublisherOptions<'a> {
fn from(value: T) -> Self {
let primitive = value.into_primitive_options();
let mut options = Self::new(primitive.name);
primitive.apply_to(&mut options.qos);
options
}
}
/// Conversion of a message argument into a [`Cow`] of the message type `T`.
///
/// This file implements it for `T` (yielding `Cow::Owned`) and for `&'a T` (yielding
/// `Cow::Borrowed`), which lets `PublisherState::publish` take a message either by value
/// or by reference.
pub trait MessageCow<'a, T: Message> {
/// Wraps `self` in a [`Cow`].
fn into_cow(self) -> Cow<'a, T>;
}
// A message passed by value is moved into the owned variant.
impl<'a, T: Message> MessageCow<'a, T> for T {
fn into_cow(self) -> Cow<'a, T> {
Cow::Owned(self)
}
}
// A message passed by reference is kept as a borrow; nothing is cloned here.
impl<'a, T: Message> MessageCow<'a, T> for &'a T {
fn into_cow(self) -> Cow<'a, T> {
Cow::Borrowed(self)
}
}
// Unit and integration tests for publishers.
#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::*;
// Checks, through the `assert_send` / `assert_sync` test helpers, that `Publisher<T>`
// is `Send` and `Sync` for a sample message type.
#[test]
fn traits() {
use crate::vendor::test_msgs;
assert_send::<Publisher<test_msgs::msg::BoundedSequences>>();
assert_sync::<Publisher<test_msgs::msg::BoundedSequences>>();
}
// Creates publishers on the two nodes of a test graph and checks the graph queries
// (publisher counts, names and types, endpoint info) and the subscription counts.
#[test]
fn test_publishers() -> Result<(), RclrsError> {
use crate::{vendor::test_msgs::msg, TopicEndpointInfo};
let namespace = "/test_publishers_graph";
let graph = construct_test_graph(namespace)?;
// Two publishers on node 1 and one on node 2, each on its own topic.
let node_1_empty_publisher = graph
.node1
.create_publisher::<msg::Empty>("graph_test_topic_1")?;
let topic1 = node_1_empty_publisher.topic_name();
let node_1_basic_types_publisher = graph
.node1
.create_publisher::<msg::BasicTypes>("graph_test_topic_2")?;
let topic2 = node_1_basic_types_publisher.topic_name();
let node_2_default_publisher = graph
.node2
.create_publisher::<msg::Defaults>("graph_test_topic_3")?;
let topic3 = node_2_default_publisher.topic_name();
// Presumably gives graph discovery time to observe the new publishers before the
// queries below — the fixed delay is a timing assumption.
std::thread::sleep(std::time::Duration::from_millis(100));
// Every topic has exactly one publisher, including node 2's topic as seen from node 1.
assert_eq!(graph.node1.count_publishers(&topic1)?, 1);
assert_eq!(graph.node1.count_publishers(&topic2)?, 1);
assert_eq!(graph.node1.count_publishers(&topic3)?, 1);
// Names and types of node 1's publishers, queried from node 1.
let node_1_publisher_names_and_types = graph
.node1
.get_publisher_names_and_types_by_node(&graph.node1.name(), namespace)?;
let types = node_1_publisher_names_and_types.get(&topic1).unwrap();
assert!(types.contains(&"test_msgs/msg/Empty".to_string()));
let types = node_1_publisher_names_and_types.get(&topic2).unwrap();
assert!(types.contains(&"test_msgs/msg/BasicTypes".to_string()));
// Names and types of node 2's publishers, also queried from node 1.
let node_2_publisher_names_and_types = graph
.node1
.get_publisher_names_and_types_by_node(&graph.node2.name(), namespace)?;
let types = node_2_publisher_names_and_types.get(&topic3).unwrap();
assert!(types.contains(&"test_msgs/msg/Defaults".to_string()));
// Endpoint info for topic 1 must be the same whichever node performs the query.
let expected_publishers_info = vec![TopicEndpointInfo {
node_name: String::from("graph_test_node_1"),
node_namespace: String::from(namespace),
topic_type: String::from("test_msgs/msg/Empty"),
}];
assert_eq!(
graph.node1.get_publishers_info_by_topic(&topic1)?,
expected_publishers_info
);
assert_eq!(
graph.node2.get_publishers_info_by_topic(&topic1)?,
expected_publishers_info
);
// No subscriptions exist yet.
assert_eq!(node_1_empty_publisher.get_subscription_count(), Ok(0));
assert_eq!(node_1_basic_types_publisher.get_subscription_count(), Ok(0));
assert_eq!(node_2_default_publisher.get_subscription_count(), Ok(0));
// Create one subscription per topic; the results are bound to locals so they stay
// alive until the end of the test.
let _node_1_empty_subscriber = graph
.node1
.create_subscription("graph_test_topic_1", |_msg: msg::Empty| {});
let _node_1_basic_types_subscriber = graph
.node1
.create_subscription("graph_test_topic_2", |_msg: msg::BasicTypes| {});
let _node_2_default_subscriber = graph
.node2
.create_subscription("graph_test_topic_3", |_msg: msg::Defaults| {});
// Each publisher now reports exactly one matching subscription.
assert_eq!(node_1_empty_publisher.get_subscription_count(), Ok(1));
assert_eq!(node_1_basic_types_publisher.get_subscription_count(), Ok(1));
assert_eq!(node_2_default_publisher.get_subscription_count(), Ok(1));
Ok(())
}
}