use std::ffi::CString;
use std::sync::{Arc, Mutex};
use super::{
get_type_support_handle, get_type_support_library, DynamicMessage, DynamicMessageError,
DynamicMessageMetadata, MessageTypeName,
};
use crate::error::{RclrsError, ToResult};
use crate::rcl_bindings::*;
use crate::{Node, PublisherHandle, PublisherOptions, ENTITY_LIFECYCLE_MUTEX};
/// Shared, reference-counted handle to a [`DynamicPublisherState`].
///
/// This is the type produced by `DynamicPublisherState::create`, which returns the
/// state wrapped in an [`Arc`].
pub type DynamicPublisher = Arc<DynamicPublisherState>;
/// State behind a [`DynamicPublisher`]: a publisher whose message type is selected at
/// runtime from a [`MessageTypeName`] instead of a compile-time Rust message type.
///
/// Rust drops struct fields in declaration order, so `handle` is dropped before
/// `type_support_library`.
/// NOTE(review): presumably that order is required so the publisher is torn down while
/// the type support library is still loaded; confirm before reordering fields.
pub struct DynamicPublisherState {
/// Wraps the `rcl_publisher` (behind a mutex) together with the node it was created on.
handle: PublisherHandle,
/// Metadata of the message type chosen at creation; `publish` rejects messages whose
/// `message_type` differs from the one stored here.
metadata: DynamicMessageMetadata,
// Never read after construction, hence the `dead_code` allowance. The type support
// pointer given to `rcl_publisher_init` is obtained from this library.
// NOTE(review): presumably it is stored only to keep the library loaded for as long as
// the publisher exists; confirm.
#[allow(dead_code)]
type_support_library: Arc<libloading::Library>,
}
impl DynamicPublisherState {
    /// Builds a publisher for the runtime-selected `topic_type` on `node` and returns it
    /// wrapped in an [`Arc`].
    ///
    /// The steps are, in order: load the metadata for `topic_type`, open the
    /// `rosidl_typesupport_c` library of the message's package, fetch the type support
    /// handle from it, convert the topic name to a C string, and call
    /// `rcl_publisher_init` while holding both the node's `rcl_node` lock and
    /// [`ENTITY_LIFECYCLE_MUTEX`].
    ///
    /// # Errors
    ///
    /// Returns an error if
    /// * the metadata, the type support library or the type support handle cannot be
    ///   obtained,
    /// * the topic name contains a NUL byte ([`RclrsError::StringContainsNul`]), or
    /// * `rcl_publisher_init` returns a failure code.
    ///
    /// # Panics
    ///
    /// Panics if the node's `rcl_node` mutex or [`ENTITY_LIFECYCLE_MUTEX`] is poisoned.
    pub(crate) fn create<'a>(
        topic_type: MessageTypeName,
        options: impl Into<PublisherOptions<'a>>,
        node: Node,
    ) -> Result<Arc<Self>, RclrsError> {
        // Identifier used both to locate the library and to look up the handle in it.
        const TYPESUPPORT_IDENTIFIER: &str = "rosidl_typesupport_c";

        let metadata = DynamicMessageMetadata::new(topic_type)?;
        let PublisherOptions { topic, qos } = options.into();

        let type_support_library = get_type_support_library(
            &metadata.message_type.package_name,
            TYPESUPPORT_IDENTIFIER,
        )?;
        // NOTE(review): soundness presumably relies on the library exporting the expected
        // symbol and on `type_support_library` staying loaded while the pointer is in
        // use (it is stored in the returned state); confirm against the contract of
        // `get_type_support_handle`.
        let type_support_ptr = unsafe {
            get_type_support_handle(
                type_support_library.as_ref(),
                TYPESUPPORT_IDENTIFIER,
                &metadata.message_type,
            )
        }?;

        // NOTE(review): presumably has no preconditions; confirm in the rcl docs.
        let mut rcl_publisher = unsafe { rcl_get_zero_initialized_publisher() };

        let topic_c_string = match CString::new(topic) {
            Ok(c_string) => c_string,
            Err(err) => {
                return Err(RclrsError::StringContainsNul {
                    err,
                    s: topic.into(),
                });
            }
        };

        // NOTE(review): presumably has no preconditions; confirm in the rcl docs.
        let mut publisher_options = unsafe { rcl_publisher_get_default_options() };
        publisher_options.qos = qos.into();

        {
            // Both guards live until the end of this block, i.e. across the init call
            // and the check of its return code.
            let node_guard = node.handle().rcl_node.lock().unwrap();
            let _lifecycle_guard = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
            // NOTE(review): all pointers passed here refer to values that are alive for
            // the duration of the call; whether rcl keeps any of them afterwards is not
            // visible from this file — confirm in the rcl docs.
            let init_status = unsafe {
                rcl_publisher_init(
                    &mut rcl_publisher,
                    &*node_guard,
                    type_support_ptr,
                    topic_c_string.as_ptr(),
                    &publisher_options,
                )
            };
            init_status.ok()?;
        }

        let handle = PublisherHandle {
            rcl_publisher: Mutex::new(rcl_publisher),
            node,
        };
        Ok(Arc::new(Self {
            handle,
            metadata,
            type_support_library,
        }))
    }

    /// Returns the topic name as reported by the underlying publisher handle.
    pub fn topic_name(&self) -> String {
        self.handle.topic_name()
    }

    /// Returns the subscription count as reported by the underlying publisher handle.
    ///
    /// The test in this file observes the value growing by one for each subscription
    /// created on the publisher's topic.
    ///
    /// # Errors
    ///
    /// Forwards whatever error the handle's `get_subscription_count` returns.
    pub fn get_subscription_count(&self) -> Result<usize, RclrsError> {
        self.handle.get_subscription_count()
    }

    /// Publishes `message` on this publisher's topic.
    ///
    /// # Errors
    ///
    /// * [`DynamicMessageError::MessageTypeMismatch`] (converted into [`RclrsError`]) if
    ///   the message's type differs from the type this publisher was created with; in
    ///   that case `rcl_publish` is not called.
    /// * The error corresponding to a failure code returned by `rcl_publish`.
    ///
    /// # Panics
    ///
    /// Panics if the publisher mutex is poisoned.
    pub fn publish(&self, mut message: DynamicMessage) -> Result<(), RclrsError> {
        if message.metadata.message_type != self.metadata.message_type {
            return Err(DynamicMessageError::MessageTypeMismatch.into());
        }

        // The guard is held until the function returns, covering the whole publish call.
        let mut publisher_guard = self.handle.rcl_publisher.lock().unwrap();
        // NOTE(review): the type check above is what ties the message storage to the
        // publisher's type support; a null third argument is presumably permitted by
        // `rcl_publish` — confirm in the rcl docs.
        let publish_status = unsafe {
            rcl_publish(
                &mut *publisher_guard,
                message.storage.as_mut_ptr() as *mut _,
                std::ptr::null_mut(),
            )
        };
        publish_status.ok()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::*;

    /// Compiles only when `T` is `Send`.
    fn assert_send<T>()
    where
        T: Send,
    {
    }

    /// Compiles only when `T` is `Sync`.
    fn assert_sync<T>()
    where
        T: Sync,
    {
    }

    /// Compile-time check that the publisher handle can be shared across threads.
    #[test]
    fn dynamic_publisher_is_sync_and_send() {
        assert_send::<DynamicPublisher>();
        assert_sync::<DynamicPublisher>();
    }

    /// Creates dynamic publishers on two nodes and checks graph queries and
    /// subscription counts against them.
    #[test]
    fn test_dynamic_publishers() -> Result<(), RclrsError> {
        use crate::vendor::test_msgs::msg;
        use crate::TopicEndpointInfo;
        use std::{thread, time::Duration};

        const EMPTY_TYPE: &str = "test_msgs/msg/Empty";
        const BASIC_TYPES_TYPE: &str = "test_msgs/msg/BasicTypes";
        const DEFAULTS_TYPE: &str = "test_msgs/msg/Defaults";
        const TOPIC_1: &str = "graph_test_topic_1";
        const TOPIC_2: &str = "graph_test_topic_2";
        const TOPIC_3: &str = "graph_test_topic_3";

        let namespace = "/test_dynamic_publishers_graph";
        let graph = construct_test_graph(namespace)?;

        // Two publishers on node 1, one on node 2.
        let empty_publisher = graph
            .node1
            .create_dynamic_publisher(EMPTY_TYPE.try_into()?, TOPIC_1)?;
        let topic1 = empty_publisher.topic_name();

        let basic_types_publisher = graph
            .node1
            .create_dynamic_publisher(BASIC_TYPES_TYPE.try_into()?, TOPIC_2)?;
        let topic2 = basic_types_publisher.topic_name();

        let defaults_publisher = graph
            .node2
            .create_dynamic_publisher(DEFAULTS_TYPE.try_into()?, TOPIC_3)?;
        let topic3 = defaults_publisher.topic_name();

        // NOTE(review): presumably gives the graph time to pick up the new publishers
        // before it is queried — confirm.
        thread::sleep(Duration::from_millis(100));

        // Every topic has exactly one publisher, as seen from node 1.
        assert_eq!(graph.node1.count_publishers(&topic1)?, 1);
        assert_eq!(graph.node1.count_publishers(&topic2)?, 1);
        assert_eq!(graph.node1.count_publishers(&topic3)?, 1);

        // Names and types of the publishers owned by node 1.
        let node_1_topics = graph
            .node1
            .get_publisher_names_and_types_by_node(&graph.node1.name(), namespace)?;
        let types = node_1_topics.get(&topic1).unwrap();
        assert!(types.contains(&String::from(EMPTY_TYPE)));
        let types = node_1_topics.get(&topic2).unwrap();
        assert!(types.contains(&String::from(BASIC_TYPES_TYPE)));

        // Names and types of the publishers owned by node 2, queried through node 1.
        let node_2_topics = graph
            .node1
            .get_publisher_names_and_types_by_node(&graph.node2.name(), namespace)?;
        let types = node_2_topics.get(&topic3).unwrap();
        assert!(types.contains(&String::from(DEFAULTS_TYPE)));

        // Endpoint info for the first topic must be identical from both nodes.
        let expected_publishers_info = vec![TopicEndpointInfo {
            node_name: "graph_test_node_1".to_owned(),
            node_namespace: namespace.to_owned(),
            topic_type: EMPTY_TYPE.to_owned(),
        }];
        let info_seen_by_node_1 = graph.node1.get_publishers_info_by_topic(&topic1)?;
        assert_eq!(info_seen_by_node_1, expected_publishers_info);
        let info_seen_by_node_2 = graph.node2.get_publishers_info_by_topic(&topic1)?;
        assert_eq!(info_seen_by_node_2, expected_publishers_info);

        // Checks that all three publishers report the same subscription count.
        let expect_subscription_count = |expected: usize| {
            assert_eq!(empty_publisher.get_subscription_count(), Ok(expected));
            assert_eq!(basic_types_publisher.get_subscription_count(), Ok(expected));
            assert_eq!(defaults_publisher.get_subscription_count(), Ok(expected));
        };

        expect_subscription_count(0);

        // One statically typed subscription per topic. The bindings are kept until the
        // end of the test so the subscriptions stay alive.
        let _typed_empty_subscription = graph
            .node1
            .create_subscription(TOPIC_1, |_msg: msg::Empty| {});
        let _typed_basic_types_subscription = graph
            .node1
            .create_subscription(TOPIC_2, |_msg: msg::BasicTypes| {});
        let _typed_defaults_subscription = graph
            .node2
            .create_subscription(TOPIC_3, |_msg: msg::Defaults| {});

        expect_subscription_count(1);

        // One dynamic subscription per topic on top of the typed ones.
        let _dynamic_empty_subscription = graph.node1.create_dynamic_subscription(
            EMPTY_TYPE.try_into().unwrap(),
            TOPIC_1,
            |_, _| {},
        );
        let _dynamic_basic_types_subscription = graph.node1.create_dynamic_subscription(
            BASIC_TYPES_TYPE.try_into().unwrap(),
            TOPIC_2,
            |_, _| {},
        );
        let _dynamic_defaults_subscription = graph.node2.create_dynamic_subscription(
            DEFAULTS_TYPE.try_into().unwrap(),
            TOPIC_3,
            |_, _| {},
        );

        expect_subscription_count(2);

        Ok(())
    }
}