use std::{
collections::{HashMap, HashSet},
fmt,
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
};
use cyclors::dds_entity_t;
use serde::Serialize;
use zenoh::{
bytes::ZBytes,
internal::{
buffers::{Buffer, ZBuf, ZSlice},
zwrite,
},
key_expr::{keyexpr, OwnedKeyExpr},
liveliness::LivelinessToken,
query::{Query, Queryable},
Wait,
};
use crate::{
dds_types::{DDSRawSample, TypeInfo},
dds_utils::{
create_dds_reader, create_dds_writer, dds_write, delete_dds_entity, get_guid,
get_instance_handle, is_cdr_little_endian, serialize_entity_guid, CDR_HEADER_BE,
CDR_HEADER_LE,
},
liveliness_mgt::new_ke_liveliness_service_srv,
ros2_utils::{
is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type,
ros2_service_type_to_request_dds_type, CddsRequestHeader, QOS_DEFAULT_SERVICE,
},
routes_mgr::Context,
serialize_option_as_bool, LOG_PAYLOAD,
};
#[derive(Serialize)]
pub struct RouteServiceSrv {
ros2_name: String,
ros2_type: String,
zenoh_key_expr: OwnedKeyExpr,
#[serde(skip)]
context: Context,
#[serde(rename = "is_active", serialize_with = "serialize_option_as_bool")]
zenoh_queryable: Option<Queryable<()>>,
#[serde(serialize_with = "serialize_entity_guid")]
req_writer: dds_entity_t,
#[serde(serialize_with = "serialize_entity_guid")]
rep_reader: dds_entity_t,
#[serde(skip)]
client_guid: u64,
#[serde(skip)]
sequence_number: Arc<AtomicU64>,
#[serde(skip)]
queries_in_progress: Arc<RwLock<HashMap<CddsRequestHeader, Query>>>,
#[serde(skip)]
liveliness_token: Option<LivelinessToken>,
remote_routes: HashSet<String>,
local_nodes: HashSet<String>,
}
impl Drop for RouteServiceSrv {
fn drop(&mut self) {
match get_guid(&self.req_writer) {
Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid),
Err(e) => tracing::warn!("{self}: {e}"),
}
match get_guid(&self.rep_reader) {
Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid),
Err(e) => tracing::warn!("{self}: {e}"),
}
if let Err(e) = delete_dds_entity(self.req_writer) {
tracing::warn!("{}: error deleting DDS Writer: {}", self, e);
}
if let Err(e) = delete_dds_entity(self.rep_reader) {
tracing::warn!("{}: error deleting DDS Reader: {}", self, e);
}
}
}
impl fmt::Display for RouteServiceSrv {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Route Service Server (ROS:{} <-> Zenoh:{})",
self.ros2_name, self.zenoh_key_expr
)
}
}
impl RouteServiceSrv {
#[allow(clippy::too_many_arguments)]
pub async fn create(
ros2_name: String,
ros2_type: String,
zenoh_key_expr: OwnedKeyExpr,
type_info: &Option<Arc<TypeInfo>>,
context: Context,
) -> Result<RouteServiceSrv, String> {
let route_id = format!("Route Service Server (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr})");
tracing::debug!("{route_id}: creation with type {ros2_type}");
let mut qos = QOS_DEFAULT_SERVICE.clone();
let client_id_str = new_service_id(&context.participant)?;
let user_data = format!("clientid= {client_id_str};");
qos.user_data = Some(user_data.into_bytes());
let req_topic_name = format!("rq{ros2_name}Request");
let req_type_name = ros2_service_type_to_request_dds_type(&ros2_type);
let req_writer = create_dds_writer(
context.participant,
req_topic_name,
req_type_name,
true,
qos.clone(),
)?;
context
.ros_discovery_mgr
.add_dds_writer(get_guid(&req_writer)?);
let client_guid = get_instance_handle(req_writer)?;
tracing::debug!(
"{route_id}: (local client_guid={client_guid:02x?}) id='{client_id_str}' => USER_DATA={:?}",
qos.user_data.as_ref().unwrap()
);
let queries_in_progress: Arc<RwLock<HashMap<CddsRequestHeader, Query>>> =
Arc::new(RwLock::new(HashMap::new()));
let rep_topic_name = format!("rr{ros2_name}Reply");
let rep_type_name = ros2_service_type_to_reply_dds_type(&ros2_type);
let rep_reader = create_dds_reader(
context.participant,
rep_topic_name,
rep_type_name,
type_info,
true,
qos,
None,
{
let queries_in_progress = queries_in_progress.clone();
let zenoh_key_expr = zenoh_key_expr.clone();
move |sample| {
route_dds_reply_to_zenoh(
sample,
zenoh_key_expr.clone(),
&mut zwrite!(queries_in_progress),
&route_id,
);
}
},
)?;
context
.ros_discovery_mgr
.add_dds_reader(get_guid(&rep_reader)?);
Ok(RouteServiceSrv {
ros2_name,
ros2_type,
zenoh_key_expr,
context,
zenoh_queryable: None,
req_writer,
rep_reader,
client_guid,
sequence_number: Arc::new(AtomicU64::default()),
queries_in_progress,
liveliness_token: None,
remote_routes: HashSet::new(),
local_nodes: HashSet::new(),
})
}
async fn announce_route(&mut self) -> Result<(), String> {
let declared_ke = self
.context
.zsession
.declare_keyexpr(self.zenoh_key_expr.clone())
.await
.map_err(|e| format!("{self}: failed to declare KeyExpr: {e}"))?;
let queries_in_progress: Arc<RwLock<HashMap<CddsRequestHeader, Query>>> =
self.queries_in_progress.clone();
let sequence_number: Arc<AtomicU64> = self.sequence_number.clone();
let route_id: String = self.to_string();
let client_guid = self.client_guid;
let req_writer: i32 = self.req_writer;
self.zenoh_queryable = Some(
self.context
.zsession
.declare_queryable(&self.zenoh_key_expr)
.callback(move |query| {
route_zenoh_request_to_dds(
query,
&mut zwrite!(queries_in_progress),
&sequence_number,
&route_id,
client_guid,
req_writer,
)
})
.await
.map_err(|e| {
format!(
"Failed create Queryable for key {} (rid={}): {e}",
self.zenoh_key_expr, declared_ke
)
})?,
);
if !is_service_for_action(&self.ros2_name) {
let liveliness_ke = new_ke_liveliness_service_srv(
&self.context.zsession.zid().into_keyexpr(),
&self.zenoh_key_expr,
&self.ros2_type,
)?;
tracing::debug!("{self} announce via token {liveliness_ke}");
let ros2_name = self.ros2_name.clone();
self.liveliness_token = Some(self.context.zsession
.liveliness()
.declare_token(liveliness_ke)
.await
.map_err(|e| {
format!(
"Failed create LivelinessToken associated to route for Service Server {ros2_name}: {e}"
)
})?
);
}
Ok(())
}
fn retire_route(&mut self) {
tracing::debug!("{self} retire");
self.zenoh_queryable = None;
self.liveliness_token = None;
}
#[inline]
pub fn add_remote_route(&mut self, zenoh_id: &str, zenoh_key_expr: &keyexpr) {
self.remote_routes
.insert(format!("{zenoh_id}:{zenoh_key_expr}"));
tracing::debug!("{self} now serving remote routes {:?}", self.remote_routes);
}
#[inline]
pub fn remove_remote_route(&mut self, zenoh_id: &str, zenoh_key_expr: &keyexpr) {
self.remote_routes
.remove(&format!("{zenoh_id}:{zenoh_key_expr}"));
tracing::debug!("{self} now serving remote routes {:?}", self.remote_routes);
}
#[inline]
pub fn is_serving_remote_route(&self) -> bool {
!self.remote_routes.is_empty()
}
#[inline]
pub async fn add_local_node(&mut self, node: String) {
self.local_nodes.insert(node);
tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes);
if self.local_nodes.len() == 1 {
if let Err(e) = self.announce_route().await {
tracing::error!("{self} activation failed: {e}");
}
}
}
#[inline]
pub fn remove_local_node(&mut self, node: &str) {
self.local_nodes.remove(node);
tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes);
if self.local_nodes.is_empty() {
self.retire_route();
}
}
#[inline]
pub fn is_serving_local_node(&self) -> bool {
!self.local_nodes.is_empty()
}
#[inline]
pub fn is_unused(&self) -> bool {
!self.is_serving_local_node() && !self.is_serving_remote_route()
}
}
fn route_zenoh_request_to_dds(
query: Query,
queries_in_progress: &mut HashMap<CddsRequestHeader, Query>,
sequence_number: &AtomicU64,
route_id: &str,
client_guid: u64,
req_writer: i32,
) {
let is_little_endian = match query.payload() {
Some(value) if value.len() > 4 => {
is_cdr_little_endian(value.to_bytes().as_ref()).unwrap_or(true)
}
_ => true,
};
let request_id = query
.attachment()
.and_then(|a| CddsRequestHeader::try_from(a).ok())
.unwrap_or_else(|| {
CddsRequestHeader::create(
client_guid,
sequence_number.fetch_add(1, Ordering::Relaxed),
is_little_endian,
)
});
let dds_req_buf = if let Some(value) = query.payload() {
let zenoh_req_buf = value.to_bytes();
if zenoh_req_buf.len() < 4 || zenoh_req_buf[1] > 1 {
tracing::warn!("{route_id}: received invalid request: {zenoh_req_buf:0x?}");
return;
}
let mut dds_req_buf: Vec<u8> = Vec::new();
dds_req_buf.extend_from_slice(&zenoh_req_buf[..4]);
dds_req_buf.extend_from_slice(request_id.as_slice());
dds_req_buf.extend_from_slice(&zenoh_req_buf[4..]);
dds_req_buf
} else {
let mut dds_req_buf: Vec<u8> = if request_id.is_little_endian() {
CDR_HEADER_LE.into()
} else {
CDR_HEADER_BE.into()
};
dds_req_buf.extend_from_slice(request_id.as_slice());
dds_req_buf
};
if *LOG_PAYLOAD {
tracing::debug!(
"{route_id}: routing request {request_id} from Zenoh to DDS - payload: {dds_req_buf:02x?}"
);
} else {
tracing::trace!(
"{route_id}: routing request {request_id} from Zenoh to DDS - {} bytes",
dds_req_buf.len()
);
}
queries_in_progress.insert(request_id, query);
if let Err(e) = dds_write(req_writer, dds_req_buf) {
tracing::warn!("{route_id}: routing request from Zenoh to DDS failed: {e}");
queries_in_progress.remove(&request_id);
}
}
fn route_dds_reply_to_zenoh(
sample: &DDSRawSample,
zenoh_key_expr: OwnedKeyExpr,
queries_in_progress: &mut HashMap<CddsRequestHeader, Query>,
route_id: &str,
) {
let z_bytes: ZBytes = sample.into();
let slice: ZSlice = ZBuf::from(z_bytes).to_zslice();
let (payload, request_id, header) = match (
slice.subslice(20..slice.len()), slice.subslice(4..20).map(|s| s.as_ref().try_into()), slice.subslice(0..4), is_cdr_little_endian(slice.as_ref()), ) {
(Some(payload), Some(Ok(request_id)), Some(header), Some(is_little_endian)) => {
let request_id = CddsRequestHeader::from_slice(request_id, is_little_endian);
(payload, request_id, header)
}
_ => {
tracing::warn!("{route_id}: received invalid request: {sample:0x?} (less than 20 bytes) dropping it");
return;
}
};
match queries_in_progress.remove(&request_id) {
Some(query) => {
let mut zenoh_rep_buf = ZBuf::empty();
zenoh_rep_buf.push_zslice(header);
zenoh_rep_buf.push_zslice(payload);
if *LOG_PAYLOAD {
tracing::debug!("{route_id}: routing reply {request_id} from DDS to Zenoh - payload: {zenoh_rep_buf:02x?}");
} else {
tracing::trace!(
"{route_id}: routing reply {request_id} from DDS to Zenoh - {} bytes",
zenoh_rep_buf.len()
);
}
if let Err(e) = query.reply(zenoh_key_expr, zenoh_rep_buf).wait() {
tracing::warn!("{route_id}: routing reply for request {request_id} from DDS to Zenoh failed: {e}");
}
}
None => tracing::trace!(
"{route_id}: received response from DDS an unknown query: {request_id} - ignore it"
),
}
}