use std::{
env::VarError,
str,
sync::atomic::{AtomicU32, Ordering},
};
use cyclors::{
dds_entity_t,
qos::{
Durability, DurabilityKind, History, HistoryKind, IgnoreLocal, IgnoreLocalKind, Qos,
Reliability, ReliabilityKind, TypeConsistency, TypeConsistencyKind, WriterDataLifecycle,
DDS_INFINITE_TIME,
},
};
use zenoh::{
bytes::ZBytes,
internal::bail,
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
Error as ZError,
};
use crate::{config::Config, dds_utils::get_guid};
// ROS 2 type name "action_msgs/srv/CancelGoal"
// (presumably the type of the cancel_goal service of every action — confirm against callers).
pub const ROS2_ACTION_CANCEL_GOAL_SRV_TYPE: &str = "action_msgs/srv/CancelGoal";
// ROS 2 type name "action_msgs/msg/GoalStatusArray"
// (presumably the type published on an action's status topic — confirm against callers).
pub const ROS2_ACTION_STATUS_MSG_TYPE: &str = "action_msgs/msg/GoalStatusArray";
// Type hash inserted in the USER_DATA of the default action status QoS
// when ROS_DISTRO is not older than "iron" (see `ros2_action_status_default_qos()`).
pub const ROS2_ACTION_STATUS_MSG_TYPE_HASH: &str =
"RIHS01_91a0593bacdcc50ea9bdcf849a938b128412cc1ea821245c663bcd26f83c295e";
// Distro name returned by `get_ros_distro()` when the ROS_DISTRO environment variable
// is unset, empty or not valid unicode.
pub const ASSUMED_ROS_DISTRO: &str = "iron";
// Character appended after each property written in USER_DATA by `insert_type_hash()`.
pub const USER_DATA_PROPS_SEPARATOR: char = ';';
// Prefix (key and '=' sign) of the type hash property written in USER_DATA by `insert_type_hash()`.
pub const USER_DATA_TYPEHASH_KEY: &str = "typehash=";
lazy_static::lazy_static!(
// ROS distro name, computed once by `get_ros_distro()` on first access.
pub static ref ROS_DISTRO: String = get_ros_distro();
// Key expression suffixes of the services and topics making up a ROS 2 action.
// NOTE: each literal is converted with `from_str_unchecked`, i.e. without any validation.
// The literals only contain lowercase letters, '_' and a single inner '/';
// presumably this is enough to make them valid key expressions — verify if one is ever changed.
pub static ref KE_SUFFIX_ACTION_SEND_GOAL: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("_action/send_goal") };
pub static ref KE_SUFFIX_ACTION_CANCEL_GOAL: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("_action/cancel_goal") };
pub static ref KE_SUFFIX_ACTION_GET_RESULT: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("_action/get_result") };
pub static ref KE_SUFFIX_ACTION_FEEDBACK: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("_action/feedback") };
pub static ref KE_SUFFIX_ACTION_STATUS: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("_action/status") };
// Default QoS values, each built once on first access by the matching `*_default_qos()` function.
// The action feedback/status ones depend on ROS_DISTRO (a type hash is added from "iron" onwards).
pub static ref QOS_DEFAULT_SERVICE: Qos = ros2_service_default_qos();
pub static ref QOS_DEFAULT_ACTION_FEEDBACK: Qos = ros2_action_feedback_default_qos();
pub static ref QOS_DEFAULT_ACTION_STATUS: Qos = ros2_action_status_default_qos();
);
/// Returns the ROS distro name read from the `ROS_DISTRO` environment variable.
///
/// When the variable is unset, empty or not valid unicode, a warning is logged and
/// [`ASSUMED_ROS_DISTRO`] is returned instead.
pub fn get_ros_distro() -> String {
    // Single place producing the fallback value used by both failure paths.
    let fallback = || ASSUMED_ROS_DISTRO.to_string();

    match std::env::var("ROS_DISTRO") {
        Err(VarError::NotUnicode(s)) => {
            tracing::warn!(
                "ROS_DISTRO environment variable is invalid ('{s:?}'). \
                Assuming '{ASSUMED_ROS_DISTRO}', but this could lead to errors on 'ros_discovery_info' \
                (see https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds/issues/21)"
            );
            fallback()
        }
        Ok(s) if !s.is_empty() => {
            tracing::debug!("ROS_DISTRO detected: {s}");
            s
        }
        // An empty value is treated exactly like a missing variable.
        Ok(_) | Err(VarError::NotPresent) => {
            tracing::warn!(
                "ROS_DISTRO environment variable is not set. \
                Assuming '{ASSUMED_ROS_DISTRO}', but this could lead to errors on 'ros_discovery_info' \
                (see https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds/issues/21)"
            );
            fallback()
        }
    }
}
/// Tells whether the current [`struct@ROS_DISTRO`] is older than `distro`.
///
/// Only the first character of each name is compared, so this relies on distro names
/// being released in alphabetical order of their initial.
///
/// # Panics
/// Panics if `distro` is empty.
pub fn ros_distro_is_less_than(distro: &str) -> bool {
    assert!(!distro.is_empty());
    let current_initial = ROS_DISTRO.chars().next();
    let other_initial = distro.chars().next();
    // `Option<char>` ordering: `None` sorts before any `Some(_)`.
    current_initial < other_initial
}
/// Converts a ROS 2 name into a Zenoh key expression, prefixed with the configured namespace.
///
/// The leading character of `ros2_name` (and of `config.namespace` when it is not `"/"`)
/// is dropped before conversion. Nothing is validated here: both strings are handed to
/// `keyexpr::from_str_unchecked`, so callers must pass names that are valid key expressions
/// once their first character is removed (see `check_ros_name`).
///
/// # Panics
/// Panics if a sliced string is empty or does not have a char boundary at index 1.
pub fn ros2_name_to_key_expr(ros2_name: &str, config: &Config) -> OwnedKeyExpr {
    if config.namespace == "/" {
        // Root namespace: the key expression is just the name without its first character.
        let name_ke = unsafe { keyexpr::from_str_unchecked(&ros2_name[1..]) };
        return name_ke.to_owned();
    }

    // Non-root namespace: "<namespace>/<name>", each without its first character.
    let namespace_ke = unsafe { keyexpr::from_str_unchecked(&config.namespace[1..]) };
    let name_ke = unsafe { keyexpr::from_str_unchecked(&ros2_name[1..]) };
    namespace_ke / name_ke
}
/// Converts a Zenoh key expression back into a ROS 2 name.
///
/// With the root namespace (`"/"`) the result is the key expression prefixed with `'/'`.
/// Otherwise, if the key expression starts with the namespace (without its first character),
/// that prefix is removed and the remainder is returned as is; if it does not, the key
/// expression is returned prefixed with `'/'`.
pub fn key_expr_to_ros2_name(key_expr: &keyexpr, config: &Config) -> String {
    if config.namespace != "/" {
        let namespace_prefix = &config.namespace[1..];
        if let Some(remainder) = key_expr.as_str().strip_prefix(namespace_prefix) {
            return remainder.to_string();
        }
    }
    format!("/{key_expr}")
}
/// Converts a DDS type name into the corresponding ROS 2 message type name.
///
/// Each `::dds_::` is collapsed to `::`, each `::` becomes `/`, and one trailing `'_'`
/// (if present) is removed.
/// E.g. `geometry_msgs::msg::dds_::Twist_` becomes `geometry_msgs/msg/Twist`.
pub fn dds_type_to_ros2_message_type(dds_topic: &str) -> String {
    let mut ros2_type = dds_topic.replace("::dds_::", "::").replace("::", "/");
    if ros2_type.ends_with('_') {
        // Drop the single trailing underscore of the DDS type name.
        ros2_type.pop();
    }
    ros2_type
}
/// Converts a ROS 2 message type name into the corresponding DDS type name.
///
/// Each `/` becomes `::`, `dds_::` is inserted right after the last `':'` (if any),
/// and a trailing `'_'` is appended.
/// E.g. `geometry_msgs/msg/Twist` becomes `geometry_msgs::msg::dds_::Twist_`.
pub fn ros2_message_type_to_dds_type(ros_topic: &str) -> String {
    let namespaced = ros_topic.replace('/', "::");
    match namespaced.rfind(':') {
        Some(last_colon) => {
            // ':' is a single byte, so `last_colon + 1` is always a char boundary.
            let (scope, type_name) = namespaced.split_at(last_colon + 1);
            format!("{scope}dds_::{type_name}_")
        }
        None => format!("{namespaced}_"),
    }
}
/// Converts the DDS type name of a service request or reply into the ROS 2 service type name.
///
/// A `_Request_` or `_Response_` suffix is removed (if present) before applying
/// [`dds_type_to_ros2_message_type`].
pub fn dds_type_to_ros2_service_type(dds_topic: &str) -> String {
    const SUFFIXES: [&str; 2] = ["_Request_", "_Response_"];

    // First matching suffix wins; without any match the name is converted unchanged.
    let service_dds_type = SUFFIXES
        .iter()
        .find_map(|&suffix| dds_topic.strip_suffix(suffix))
        .unwrap_or(dds_topic);
    dds_type_to_ros2_message_type(service_dds_type)
}
/// Returns the DDS type name of the request of the given ROS 2 service type,
/// i.e. the conversion of `<ros_service>_Request` by [`ros2_message_type_to_dds_type`].
pub fn ros2_service_type_to_request_dds_type(ros_service: &str) -> String {
    let request_type = [ros_service, "_Request"].concat();
    ros2_message_type_to_dds_type(&request_type)
}
/// Returns the DDS type name of the reply of the given ROS 2 service type,
/// i.e. the conversion of `<ros_service>_Response` by [`ros2_message_type_to_dds_type`].
pub fn ros2_service_type_to_reply_dds_type(ros_service: &str) -> String {
    let reply_type = [ros_service, "_Response"].concat();
    ros2_message_type_to_dds_type(&reply_type)
}
/// Converts the DDS type name of one of an action's services/topics into the ROS 2 action type name.
///
/// One of the `_SendGoal_Request_`, `_SendGoal_Response_`, `_GetResult_Request_`,
/// `_GetResult_Response_` or `_FeedbackMessage_` suffixes is removed (if present)
/// before applying [`dds_type_to_ros2_message_type`].
pub fn dds_type_to_ros2_action_type(dds_topic: &str) -> String {
    const SUFFIXES: [&str; 5] = [
        "_SendGoal_Request_",
        "_SendGoal_Response_",
        "_GetResult_Request_",
        "_GetResult_Response_",
        "_FeedbackMessage_",
    ];

    // First matching suffix wins; without any match the name is converted unchanged.
    let action_dds_type = SUFFIXES
        .iter()
        .find_map(|&suffix| dds_topic.strip_suffix(suffix))
        .unwrap_or(dds_topic);
    dds_type_to_ros2_message_type(action_dds_type)
}
// Marker bytes (ASCII "rqh") written in front of a request header by
// `CddsRequestHeader::as_attachment()` and checked by `TryFrom<&ZBytes>`.
const ATTACHMENT_KEY_REQUEST_HEADER: [u8; 3] = [0x72, 0x71, 0x68];
/// A 16-byte request header together with the endianness it was encoded with.
///
/// As built by `create()`, the first 8 bytes are the client id and the last 8 bytes
/// the sequence number, both encoded according to `is_little_endian`.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub struct CddsRequestHeader {
// Raw header bytes: client id (bytes 0..8) followed by sequence number (bytes 8..16).
header: [u8; 16],
// Whether the integers in `header` are little-endian encoded.
is_little_endian: bool,
}
impl CddsRequestHeader {
    /// Builds a header from a client id and a sequence number, each encoded on 8 bytes
    /// with the requested endianness (client id first, sequence number second).
    pub fn create(client_id: u64, seq_num: u64, is_little_endian: bool) -> CddsRequestHeader {
        let (client_bytes, seq_bytes) = if is_little_endian {
            (client_id.to_le_bytes(), seq_num.to_le_bytes())
        } else {
            (client_id.to_be_bytes(), seq_num.to_be_bytes())
        };

        let mut header = [0u8; 16];
        let (client_part, seq_part) = header.split_at_mut(8);
        client_part.copy_from_slice(&client_bytes);
        seq_part.copy_from_slice(&seq_bytes);

        Self {
            header,
            is_little_endian,
        }
    }

    /// Wraps already encoded header bytes, together with the endianness they were encoded with.
    pub fn from_slice(header: [u8; 16], is_little_endian: bool) -> CddsRequestHeader {
        Self {
            header,
            is_little_endian,
        }
    }

    /// Returns `true` if the header content is little-endian encoded.
    pub fn is_little_endian(&self) -> bool {
        self.is_little_endian
    }

    /// Returns the 16 raw header bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.header[..]
    }

    /// Serializes this header as an attachment: the `ATTACHMENT_KEY_REQUEST_HEADER` marker,
    /// followed by the 16 header bytes and one endianness flag byte (1 = little-endian).
    pub fn as_attachment(&self) -> ZBytes {
        // Header bytes + endianness flag.
        let mut payload = [0u8; 17];
        let (header_part, flag_part) = payload.split_at_mut(16);
        header_part.copy_from_slice(&self.header);
        flag_part[0] = u8::from(self.is_little_endian);

        let mut attachment = ZBytes::writer();
        attachment.append(ZBytes::from(ATTACHMENT_KEY_REQUEST_HEADER));
        attachment.append(ZBytes::from(payload));
        attachment.finish()
    }
}
impl TryFrom<&ZBytes> for CddsRequestHeader {
    type Error = ZError;

    /// Parses an attachment as produced by `CddsRequestHeader::as_attachment()`.
    ///
    /// Expected layout: the `ATTACHMENT_KEY_REQUEST_HEADER` marker, then exactly 17 bytes
    /// (16 header bytes followed by one endianness flag byte, non-zero meaning little-endian).
    ///
    /// # Errors
    /// Returns an error if the attachment is shorter than the marker, if it does not start
    /// with the marker, or if the bytes following the marker are not exactly 17 bytes long.
    fn try_from(value: &ZBytes) -> Result<Self, Self::Error> {
        let key_len = ATTACHMENT_KEY_REQUEST_HEADER.len();
        let bytes = value.to_bytes();

        let key = match bytes.get(..key_len) {
            Some(key) => key,
            None => bail!("No 'key request header' bytes found in attachment"),
        };
        if key != ATTACHMENT_KEY_REQUEST_HEADER {
            bail!(
                "Initial {:?} bytes do not match ATTACHMENT_KEY_REQUEST_HEADER",
                key_len
            )
        }

        // `bytes.get(..key_len)` succeeded above, so `key_len <= bytes.len()`
        // and this slicing cannot panic.
        let buf = &bytes[key_len..];
        if buf.len() != 17 {
            bail!(
                "Attachment 'header' is not 17 bytes (16 bytes header + 1 byte endianness flag): {buf:02x?}"
            )
        }

        let mut header = [0u8; 16];
        header.copy_from_slice(&buf[..16]);
        Ok(CddsRequestHeader {
            header,
            is_little_endian: buf[16] != 0,
        })
    }
}
impl std::fmt::Display for CddsRequestHeader {
    /// Formats the header as `(<client id>,<sequence number>)`: the first 8 bytes as
    /// zero-padded lowercase hexadecimal, the last 8 bytes as a decimal `u64` decoded
    /// with the header's endianness.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (client_id, seq_bytes) = self.header.split_at(8);
        let seq_bytes: [u8; 8] = seq_bytes
            .try_into()
            .expect("Shouldn't happen: self.header is 16 bytes");
        let seq_num = if self.is_little_endian {
            u64::from_le_bytes(seq_bytes)
        } else {
            u64::from_be_bytes(seq_bytes)
        };

        f.write_str("(")?;
        for byte in client_id {
            write!(f, "{byte:02x}")?;
        }
        write!(f, ",{seq_num})")
    }
}
/// Builds the default QoS used for services: KEEP_LAST history with a depth of 10,
/// RELIABLE with an infinite max blocking time, and data from the local participant ignored.
/// All other policies are left to `Qos::default()`.
fn ros2_service_default_qos() -> Qos {
    let history = History {
        kind: HistoryKind::KEEP_LAST,
        depth: 10,
    };
    let reliability = Reliability {
        kind: ReliabilityKind::RELIABLE,
        max_blocking_time: DDS_INFINITE_TIME,
    };
    let ignore_local = IgnoreLocal {
        kind: IgnoreLocalKind::PARTICIPANT,
    };

    Qos {
        history: Some(history),
        reliability: Some(reliability),
        ignore_local: Some(ignore_local),
        ..Default::default()
    }
}
/// Builds the default QoS used for action feedback topics.
///
/// When ROS_DISTRO is not older than "iron", a type hash property is also added to USER_DATA.
fn ros2_action_feedback_default_qos() -> Qos {
    let history = History {
        kind: HistoryKind::KEEP_LAST,
        depth: 10,
    };
    let reliability = Reliability {
        kind: ReliabilityKind::RELIABLE,
        max_blocking_time: DDS_INFINITE_TIME,
    };
    let writer_data_lifecycle = WriterDataLifecycle {
        autodispose_unregistered_instances: false,
    };
    let type_consistency = TypeConsistency {
        kind: TypeConsistencyKind::ALLOW_TYPE_COERCION,
        ignore_sequence_bounds: true,
        ignore_string_bounds: true,
        ignore_member_names: false,
        prevent_type_widening: false,
        force_type_validation: false,
    };
    let ignore_local = IgnoreLocal {
        kind: IgnoreLocalKind::PARTICIPANT,
    };

    let mut qos = Qos {
        history: Some(history),
        reliability: Some(reliability),
        data_representation: Some([0].into()),
        writer_data_lifecycle: Some(writer_data_lifecycle),
        type_consistency: Some(type_consistency),
        ignore_local: Some(ignore_local),
        ..Default::default()
    };

    let is_older_than_iron = ros_distro_is_less_than("iron");
    if !is_older_than_iron {
        // NOTE(review): an all-zero hash is used here — presumably a placeholder since
        // the actual hash depends on each action type; confirm.
        insert_type_hash(
            &mut qos,
            "RIHS01_0000000000000000000000000000000000000000000000000000000000000000",
        );
    }
    qos
}
/// Builds the default QoS used for action status topics (TRANSIENT_LOCAL and RELIABLE).
///
/// When ROS_DISTRO is not older than "iron", the [`ROS2_ACTION_STATUS_MSG_TYPE_HASH`]
/// type hash property is also added to USER_DATA.
fn ros2_action_status_default_qos() -> Qos {
    let durability = Durability {
        kind: DurabilityKind::TRANSIENT_LOCAL,
    };
    let reliability = Reliability {
        kind: ReliabilityKind::RELIABLE,
        max_blocking_time: DDS_INFINITE_TIME,
    };
    let writer_data_lifecycle = WriterDataLifecycle {
        autodispose_unregistered_instances: false,
    };
    let type_consistency = TypeConsistency {
        kind: TypeConsistencyKind::ALLOW_TYPE_COERCION,
        ignore_sequence_bounds: true,
        ignore_string_bounds: true,
        ignore_member_names: false,
        prevent_type_widening: false,
        force_type_validation: false,
    };
    let ignore_local = IgnoreLocal {
        kind: IgnoreLocalKind::PARTICIPANT,
    };

    let mut qos = Qos {
        durability: Some(durability),
        reliability: Some(reliability),
        data_representation: Some([0].into()),
        writer_data_lifecycle: Some(writer_data_lifecycle),
        type_consistency: Some(type_consistency),
        ignore_local: Some(ignore_local),
        ..Default::default()
    };

    let is_older_than_iron = ros_distro_is_less_than("iron");
    if !is_older_than_iron {
        insert_type_hash(&mut qos, ROS2_ACTION_STATUS_MSG_TYPE_HASH);
    }
    qos
}
/// Returns `true` if the service name ends with one of the action service suffixes
/// (`_action/send_goal`, `_action/cancel_goal` or `_action/get_result`).
pub fn is_service_for_action(ros2_service_name: &str) -> bool {
    let action_service_suffixes = [
        KE_SUFFIX_ACTION_SEND_GOAL.as_str(),
        KE_SUFFIX_ACTION_CANCEL_GOAL.as_str(),
        KE_SUFFIX_ACTION_GET_RESULT.as_str(),
    ];
    action_service_suffixes
        .iter()
        .any(|&suffix| ros2_service_name.ends_with(suffix))
}
/// Returns `true` if the topic name ends with one of the action topic suffixes
/// (`_action/feedback` or `_action/status`).
pub fn is_message_for_action(ros2_message_name: &str) -> bool {
    let action_topic_suffixes = [
        KE_SUFFIX_ACTION_FEEDBACK.as_str(),
        KE_SUFFIX_ACTION_STATUS.as_str(),
    ];
    action_topic_suffixes
        .iter()
        .any(|&suffix| ros2_message_name.ends_with(suffix))
}
/// Checks that a ROS 2 name can be converted to a Zenoh key expression.
///
/// The name must start with `'/'` and the remainder (without that leading `'/'`)
/// must be accepted by `KeyExpr::try_from`.
///
/// # Errors
/// Returns a message quoting the offending name when one of those conditions is not met.
#[inline]
pub fn check_ros_name(name: &str) -> Result<(), String> {
    // Validate the actual name (minus its leading '/'), not a fixed string literal.
    match name.strip_prefix('/') {
        Some(candidate) if KeyExpr::try_from(candidate).is_ok() => Ok(()),
        _ => Err(format!(
            "'{name}' cannot be converted as a Zenoh key expression"
        )),
    }
}
pub fn insert_type_hash(qos: &mut Qos, type_hash: &str) {
let mut s = USER_DATA_TYPEHASH_KEY.to_string();
s.push_str(type_hash);
s.push(USER_DATA_PROPS_SEPARATOR);
match qos.user_data {
Some(ref mut v) => v.extend(s.into_bytes().iter()),
None => qos.user_data = Some(s.into_bytes()),
}
}
lazy_static::lazy_static!(
// Process-wide counter starting at 0, incremented by `new_service_id()` on each call;
// its value fills the last 4 bytes of each generated service id.
pub static ref CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::default();
);
/// Generates a new service id for the given DDS participant.
///
/// The id is the 16-byte GUID of the participant with its last 4 bytes replaced by the
/// big-endian value of [`struct@CLIENT_ID_COUNTER`] (incremented on each call). It is
/// returned as a dot-separated list of lowercase hexadecimal bytes, without zero padding.
///
/// # Errors
/// Propagates the error returned by `get_guid` for the participant.
pub fn new_service_id(participant: &dds_entity_t) -> Result<String, String> {
    let mut guid: [u8; 16] = *get_guid(participant)?;

    let sequence = CLIENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
    guid[12..].copy_from_slice(&sequence.to_be_bytes());

    let hex_bytes: Vec<String> = guid.iter().map(|byte| format!("{byte:x}")).collect();
    Ok(hex_bytes.join("."))
}
// Unit tests for the DDS <-> ROS 2 type name conversions.
// Only compiled for test builds; `super::*` keeps the tests independent of this module's path.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_types_conversions() {
        // Messages: DDS -> ROS 2
        assert_eq!(
            dds_type_to_ros2_message_type("geometry_msgs::msg::dds_::Twist_"),
            "geometry_msgs/msg/Twist"
        );
        assert_eq!(
            dds_type_to_ros2_message_type("rcl_interfaces::msg::dds_::Log_"),
            "rcl_interfaces/msg/Log"
        );

        // Messages: ROS 2 -> DDS
        assert_eq!(
            ros2_message_type_to_dds_type("geometry_msgs/msg/Twist"),
            "geometry_msgs::msg::dds_::Twist_"
        );
        assert_eq!(
            ros2_message_type_to_dds_type("rcl_interfaces/msg/Log"),
            "rcl_interfaces::msg::dds_::Log_"
        );

        // Services: DDS request/reply types -> ROS 2 service type
        assert_eq!(
            dds_type_to_ros2_service_type("example_interfaces::srv::dds_::AddTwoInts_Request_"),
            "example_interfaces/srv/AddTwoInts"
        );
        assert_eq!(
            dds_type_to_ros2_service_type("example_interfaces::srv::dds_::AddTwoInts_Response_"),
            "example_interfaces/srv/AddTwoInts"
        );
        assert_eq!(
            dds_type_to_ros2_service_type("rcl_interfaces::srv::dds_::ListParameters_Request_"),
            "rcl_interfaces/srv/ListParameters"
        );
        assert_eq!(
            dds_type_to_ros2_service_type("rcl_interfaces::srv::dds_::ListParameters_Response_"),
            "rcl_interfaces/srv/ListParameters"
        );

        // Actions: DDS types of the action's services/topics -> ROS 2 action type
        assert_eq!(
            dds_type_to_ros2_action_type(
                "example_interfaces::action::dds_::Fibonacci_SendGoal_Request_"
            ),
            "example_interfaces/action/Fibonacci"
        );
        assert_eq!(
            dds_type_to_ros2_action_type(
                "example_interfaces::action::dds_::Fibonacci_SendGoal_Response_"
            ),
            "example_interfaces/action/Fibonacci"
        );
        assert_eq!(
            dds_type_to_ros2_action_type(
                "example_interfaces::action::dds_::Fibonacci_GetResult_Request_"
            ),
            "example_interfaces/action/Fibonacci"
        );
        assert_eq!(
            dds_type_to_ros2_action_type(
                "example_interfaces::action::dds_::Fibonacci_GetResult_Response_"
            ),
            "example_interfaces/action/Fibonacci"
        );
        assert_eq!(
            dds_type_to_ros2_action_type(
                "example_interfaces::action::dds_::Fibonacci_FeedbackMessage_"
            ),
            "example_interfaces/action/Fibonacci"
        );
    }

    #[test]
    fn test_service_request_reply_dds_types() {
        // Services: ROS 2 service type -> DDS request/reply types
        assert_eq!(
            ros2_service_type_to_request_dds_type("example_interfaces/srv/AddTwoInts"),
            "example_interfaces::srv::dds_::AddTwoInts_Request_"
        );
        assert_eq!(
            ros2_service_type_to_reply_dds_type("example_interfaces/srv/AddTwoInts"),
            "example_interfaces::srv::dds_::AddTwoInts_Response_"
        );
    }
}