use std::collections::HashSet;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use serde::{de::DeserializeOwned, Serialize};
use rustdds::*;
use crate::{
context::Context,
gid::Gid,
log::Log,
node_entities_info::NodeEntitiesInfo,
parameters::*,
pubsub::{Publisher, Subscription},
service::{
basic::BasicServiceMapping, cyclone::CycloneServiceMapping, enhanced::EnhancedServiceMapping,
Client, ClientGeneric, ClientT, Server, ServerGeneric, ServerT, Service, ServiceMapping,
},
};
pub struct NodeOptions {
#[allow(dead_code)]
cli_args: Vec<String>,
#[allow(dead_code)]
use_global_arguments: bool, enable_rosout: bool, #[allow(dead_code)]
start_parameter_services: bool,
#[allow(dead_code)]
parameter_overrides: Vec<Parameter>,
#[allow(dead_code)]
allow_undeclared_parameters: bool,
#[allow(dead_code)]
automatically_declare_parameters_from_overrides: bool,
}
impl NodeOptions {
pub fn new() -> NodeOptions {
NodeOptions {
cli_args: Vec::new(),
use_global_arguments: true,
enable_rosout: true,
start_parameter_services: true,
parameter_overrides: Vec::new(),
allow_undeclared_parameters: false,
automatically_declare_parameters_from_overrides: false,
}
}
pub fn enable_rosout(self, enable: bool) -> NodeOptions {
NodeOptions {
enable_rosout: enable,
..self
}
}
}
impl Default for NodeOptions {
fn default() -> Self {
Self::new()
}
}
pub enum ServiceMappings {
Basic,
Enhanced,
Cyclone,
}
pub struct Node {
name: String,
namespace: String,
options: NodeOptions,
pub(crate) ros_context: Context,
readers: HashSet<GUID>,
writers: HashSet<GUID>,
rosout_writer: Option<Publisher<Log>>,
#[allow(dead_code)]
rosout_reader: Option<Subscription<Log>>,
parameter_events_writer: Publisher<raw::ParameterEvent>,
}
impl Node {
pub(crate) fn new(
name: &str,
namespace: &str,
options: NodeOptions,
ros_context: Context,
) -> Result<Node, dds::Error> {
let paramtopic = ros_context.get_parameter_events_topic();
let rosout_topic = ros_context.get_rosout_topic();
let rosout_writer = if options.enable_rosout {
Some(
ros_context.create_publisher(&rosout_topic, None)?,
)
} else {
None
};
let parameter_events_writer = ros_context
.create_publisher(¶mtopic, None)?;
Ok(Node {
name: String::from(name),
namespace: String::from(namespace),
options,
ros_context,
readers: HashSet::new(),
writers: HashSet::new(),
rosout_writer,
rosout_reader: None, parameter_events_writer,
})
}
fn generate_node_info(&self) -> NodeEntitiesInfo {
let mut node_info = NodeEntitiesInfo::new(self.name.clone(), self.namespace.clone());
node_info.add_writer(Gid::from_guid(self.parameter_events_writer.guid()));
if let Some(row) = &self.rosout_writer {
node_info.add_writer(Gid::from_guid(row.guid()));
}
for reader in &self.readers {
node_info.add_reader(Gid::from_guid(*reader));
}
for writer in &self.writers {
node_info.add_writer(Gid::from_guid(*writer));
}
node_info
}
fn add_reader(&mut self, reader: GUID) {
self.readers.insert(reader);
self.ros_context.add_node_info(self.generate_node_info());
}
pub fn remove_reader(&mut self, reader: &GUID) {
self.readers.remove(reader);
self.ros_context.add_node_info(self.generate_node_info());
}
fn add_writer(&mut self, writer: GUID) {
self.writers.insert(writer);
self.ros_context.add_node_info(self.generate_node_info());
}
pub fn remove_writer(&mut self, writer: &GUID) {
self.writers.remove(writer);
self.ros_context.add_node_info(self.generate_node_info());
}
pub fn clear_node(&mut self) {
self.readers.clear();
self.writers.clear();
self.ros_context.add_node_info(self.generate_node_info());
}
pub fn name(&self) -> &str {
&self.name
}
pub fn namespace(&self) -> &str {
&self.namespace
}
pub fn fully_qualified_name(&self) -> String {
let mut nn = self.name.clone();
nn.push_str(&self.namespace);
nn
}
pub fn options(&self) -> &NodeOptions {
&self.options
}
pub fn domain_id(&self) -> u16 {
self.ros_context.domain_id()
}
pub fn create_topic(
&self,
name: &str,
type_name: String,
qos: &QosPolicies,
) -> Result<Topic, dds::Error> {
let oname = Self::check_name_and_add_prefix("rt/".to_owned(), name)?;
info!("Creating topic, DDS name: {}", oname);
let topic = self.ros_context.domain_participant().create_topic(
oname,
type_name,
qos,
TopicKind::NoKey,
)?;
info!("Created topic");
Ok(topic)
}
pub fn create_subscription<D: DeserializeOwned + 'static>(
&mut self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<Subscription<D>, dds::Error> {
let sub = self.ros_context.create_subscription(topic, qos)?;
self.add_reader(sub.guid());
Ok(sub)
}
fn check_name_and_add_prefix(mut prefix: String, name: &str) -> Result<String, dds::Error> {
if name.is_empty() {
return dds::Error::bad_parameter("Topic name must not be empty.");
}
prefix.push_str(name.strip_prefix('/').unwrap_or(name));
Ok(prefix)
}
pub fn create_publisher<D: Serialize>(
&mut self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> Result<Publisher<D>, dds::Error> {
let p = self.ros_context.create_publisher(topic, qos)?;
self.add_writer(p.guid());
Ok(p)
}
pub fn create_client<S>(
&mut self,
service_mapping: ServiceMappings,
service_name: &str,
request_qos: QosPolicies,
response_qos: QosPolicies,
) -> Result<Client<S>, dds::Error>
where
S: Service + 'static,
S::Request: Clone,
{
let inner: Box<dyn ClientT<S>> = match service_mapping {
ServiceMappings::Basic => Box::new(self.create_client_generic::<S, BasicServiceMapping<S>>(
service_name,
request_qos,
response_qos,
)?),
ServiceMappings::Enhanced => {
Box::new(self.create_client_generic::<S, EnhancedServiceMapping<S>>(
service_name,
request_qos,
response_qos,
)?)
}
ServiceMappings::Cyclone => {
Box::new(self.create_client_generic::<S, CycloneServiceMapping<S>>(
service_name,
request_qos,
response_qos,
)?)
}
};
Ok(Client { inner })
}
fn create_client_generic<S, W>(
&mut self,
service_name: &str,
request_qos: QosPolicies,
response_qos: QosPolicies,
) -> Result<ClientGeneric<S, W>, dds::Error>
where
S: Service + 'static,
S::Request: Clone,
W: 'static + ServiceMapping<S>,
{
let rq_name =
Self::check_name_and_add_prefix("rq/".to_owned(), &(service_name.to_owned() + "Request"))?;
let rs_name =
Self::check_name_and_add_prefix("rr/".to_owned(), &(service_name.to_owned() + "Reply"))?;
let rq_topic = self.ros_context.domain_participant().create_topic(
rq_name,
S::request_type_name(),
&request_qos,
TopicKind::NoKey,
)?;
let rs_topic = self.ros_context.domain_participant().create_topic(
rs_name,
S::response_type_name(),
&response_qos,
TopicKind::NoKey,
)?;
ClientGeneric::<S, W>::new(
self,
&rq_topic,
&rs_topic,
Some(request_qos),
Some(response_qos),
)
}
pub fn create_server<S>(
&mut self,
service_mapping: ServiceMappings,
service_name: &str,
request_qos: QosPolicies,
response_qos: QosPolicies,
) -> Result<Server<S>, dds::Error>
where
S: Service + 'static,
S::Request: Clone,
{
let inner: Box<dyn ServerT<S>> = match service_mapping {
ServiceMappings::Basic => Box::new(self.create_server_generic::<S, BasicServiceMapping<S>>(
service_name,
request_qos,
response_qos,
)?),
ServiceMappings::Enhanced => {
Box::new(self.create_server_generic::<S, EnhancedServiceMapping<S>>(
service_name,
request_qos,
response_qos,
)?)
}
ServiceMappings::Cyclone => {
Box::new(self.create_server_generic::<S, CycloneServiceMapping<S>>(
service_name,
request_qos,
response_qos,
)?)
}
};
Ok(Server { inner })
}
fn create_server_generic<S, SW>(
&mut self,
service_name: &str,
request_qos: QosPolicies,
response_qos: QosPolicies,
) -> Result<ServerGeneric<S, SW>, dds::Error>
where
S: Service + 'static,
S::Request: Clone,
SW: 'static + ServiceMapping<S>,
{
let rq_name =
Self::check_name_and_add_prefix("rq/".to_owned(), &(service_name.to_owned() + "Request"))?;
let rs_name =
Self::check_name_and_add_prefix("rr/".to_owned(), &(service_name.to_owned() + "Reply"))?;
let rq_topic = self.ros_context.domain_participant().create_topic(
rq_name,
S::request_type_name(),
&request_qos,
TopicKind::NoKey,
)?;
let rs_topic = self.ros_context.domain_participant().create_topic(
rs_name,
S::response_type_name(),
&response_qos,
TopicKind::NoKey,
)?;
ServerGeneric::<S, SW>::new(
self,
&rq_topic,
&rs_topic,
Some(request_qos),
Some(response_qos),
)
}
}