use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
#[cfg(feature = "security")]
use std::path::{Path, PathBuf};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use serde::{de::DeserializeOwned, Serialize};
use rustdds::{
dds::CreateResult,
no_key::{DeserializerAdapter, SerializerAdapter},
policy::*,
*,
};
use crate::{
builtin_topics,
entities_info::{NodeEntitiesInfo, ParticipantEntitiesInfo},
gid::Gid,
names::NodeName,
node::{Node, NodeOptions},
pubsub::{Publisher, Subscription},
};
lazy_static! {
pub static ref DEFAULT_SUBSCRIPTION_QOS: QosPolicies = QosPolicyBuilder::new()
.durability(Durability::Volatile) .deadline(Deadline(Duration::INFINITE)) .ownership(Ownership::Shared) .reliability(Reliability::BestEffort) .history(History::KeepLast { depth: 1 }) .lifespan(Lifespan {
duration: Duration::INFINITE
})
.build();
}
lazy_static! {
pub static ref DEFAULT_PUBLISHER_QOS: QosPolicies = QosPolicyBuilder::new()
.durability(Durability::Volatile)
.deadline(Deadline(Duration::INFINITE))
.ownership(Ownership::Shared)
.reliability(Reliability::Reliable{max_blocking_time: Duration::from_millis(100)})
.history(History::KeepLast { depth: 1 })
.lifespan(Lifespan {
duration: Duration::INFINITE
})
.build();
}
#[cfg(feature = "security")]
struct SecurityConfig {
security_config_dir: PathBuf,
private_key_password: String,
}
pub struct ContextOptions {
domain_id: u16,
#[cfg(feature = "security")]
security_config: Option<SecurityConfig>,
}
impl ContextOptions {
pub fn new() -> Self {
Self {
domain_id: 0,
#[cfg(feature = "security")]
security_config: None,
}
}
pub fn domain_id(mut self, domain_id: u16) -> Self {
self.domain_id = domain_id;
self
}
#[cfg(feature = "security")]
pub fn enable_security(
mut self,
security_config_dir: impl AsRef<Path>,
private_key_password: String,
) -> Self {
self.security_config = Some(SecurityConfig {
security_config_dir: security_config_dir.as_ref().to_path_buf(),
private_key_password,
});
self
}
}
impl Default for ContextOptions {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone)]
pub struct Context {
inner: Arc<Mutex<ContextInner>>,
}
impl Context {
pub fn new() -> CreateResult<Context> {
Self::from_domain_participant(DomainParticipant::new(0)?)
}
pub fn with_options(opt: ContextOptions) -> CreateResult<Context> {
#[allow(unused_mut)] let mut dpb = DomainParticipantBuilder::new(opt.domain_id);
#[cfg(feature = "security")]
{
if let Some(sc) = opt.security_config {
dpb = dpb.builtin_security(
DomainParticipantSecurityConfigFiles::with_ros_default_names(
sc.security_config_dir,
sc.private_key_password,
),
);
}
}
Self::from_domain_participant(dpb.build()?)
}
pub fn from_domain_participant(domain_participant: DomainParticipant) -> CreateResult<Context> {
let i = ContextInner::from_domain_participant(domain_participant)?;
Ok(Context {
inner: Arc::new(Mutex::new(i)),
})
}
pub fn new_node(&self, node_name: NodeName, options: NodeOptions) -> CreateResult<Node> {
Node::new(node_name, options, self.clone())
}
pub fn domain_id(&self) -> u16 {
self.inner.lock().unwrap().domain_participant.domain_id()
}
pub fn discovered_topics(&self) -> Vec<rustdds::discovery::DiscoveredTopicData> {
self.domain_participant().discovered_topics()
}
pub fn participant_entities_info(&self) -> ParticipantEntitiesInfo {
self.inner.lock().unwrap().participant_entities_info()
}
pub fn get_parameter_events_topic(&self) -> Topic {
self
.inner
.lock()
.unwrap()
.ros_parameter_events_topic
.clone()
}
pub fn get_rosout_topic(&self) -> Topic {
self.inner.lock().unwrap().ros_rosout_topic.clone()
}
pub fn domain_participant(&self) -> DomainParticipant {
self.inner.lock().unwrap().domain_participant.clone()
}
pub(crate) fn create_publisher<M>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> dds::CreateResult<Publisher<M>>
where
M: Serialize,
{
let datawriter = self
.get_ros_default_publisher()
.create_datawriter_no_key(topic, qos)?;
Ok(Publisher::new(datawriter))
}
pub(crate) fn create_subscription<M>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> dds::CreateResult<Subscription<M>>
where
M: 'static + DeserializeOwned,
{
let datareader = self
.get_ros_default_subscriber()
.create_simple_datareader_no_key(topic, qos)?;
Ok(Subscription::new(datareader))
}
pub(crate) fn create_datawriter<M, SA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> dds::CreateResult<no_key::DataWriter<M, SA>>
where
SA: SerializerAdapter<M>,
{
self
.get_ros_default_publisher()
.create_datawriter_no_key(topic, qos)
}
pub(crate) fn create_simpledatareader<M, DA>(
&self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> dds::CreateResult<no_key::SimpleDataReader<M, DA>>
where
M: 'static,
DA: 'static + DeserializerAdapter<M>,
{
self
.get_ros_default_subscriber()
.create_simple_datareader_no_key(topic, qos)
}
pub(crate) fn update_node(&mut self, node_info: NodeEntitiesInfo) {
self.inner.lock().unwrap().update_node(node_info);
}
pub(crate) fn remove_node(&mut self, node_name: &str) {
self.inner.lock().unwrap().remove_node(node_name);
}
fn get_ros_default_publisher(&self) -> rustdds::Publisher {
self.inner.lock().unwrap().ros_default_publisher.clone()
}
fn get_ros_default_subscriber(&self) -> rustdds::Subscriber {
self.inner.lock().unwrap().ros_default_subscriber.clone()
}
pub(crate) fn ros_discovery_topic(&self) -> Topic {
self.inner.lock().unwrap().ros_discovery_topic.clone()
}
}
struct ContextInner {
local_nodes: HashMap<String, NodeEntitiesInfo>,
ros_discovery_topic: Topic,
node_writer: Publisher<ParticipantEntitiesInfo>,
domain_participant: DomainParticipant,
ros_default_publisher: rustdds::Publisher,
ros_default_subscriber: rustdds::Subscriber,
ros_parameter_events_topic: Topic,
ros_rosout_topic: Topic,
}
impl ContextInner {
pub fn from_domain_participant(
domain_participant: DomainParticipant,
) -> CreateResult<ContextInner> {
let ros_default_publisher = domain_participant.create_publisher(&DEFAULT_PUBLISHER_QOS)?;
let ros_default_subscriber = domain_participant.create_subscriber(&DEFAULT_SUBSCRIPTION_QOS)?;
let ros_discovery_topic = domain_participant.create_topic(
builtin_topics::ros_discovery::TOPIC_NAME.to_string(),
builtin_topics::ros_discovery::TYPE_NAME.to_string(),
&builtin_topics::ros_discovery::QOS_PUB,
TopicKind::NoKey,
)?;
let ros_parameter_events_topic = domain_participant.create_topic(
builtin_topics::parameter_events::TOPIC_NAME.to_string(),
builtin_topics::parameter_events::TYPE_NAME.to_string(),
&builtin_topics::parameter_events::QOS,
TopicKind::NoKey,
)?;
let ros_rosout_topic = domain_participant.create_topic(
builtin_topics::rosout::TOPIC_NAME.to_string(),
builtin_topics::rosout::TYPE_NAME.to_string(),
&builtin_topics::rosout::QOS,
TopicKind::NoKey,
)?;
let node_writer =
Publisher::new(ros_default_publisher.create_datawriter_no_key(&ros_discovery_topic, None)?);
Ok(ContextInner {
local_nodes: HashMap::new(),
node_writer,
domain_participant,
ros_discovery_topic,
ros_default_publisher,
ros_default_subscriber,
ros_parameter_events_topic,
ros_rosout_topic,
})
}
pub fn participant_entities_info(&self) -> ParticipantEntitiesInfo {
ParticipantEntitiesInfo::new(
Gid::from(self.domain_participant.guid()),
self.local_nodes.values().cloned().collect(),
)
}
fn update_node(&mut self, mut node_info: NodeEntitiesInfo) {
node_info.add_writer(Gid::from(self.node_writer.guid()));
self
.local_nodes
.insert(node_info.fully_qualified_name(), node_info);
self.broadcast_node_infos();
}
fn remove_node(&mut self, node_fqn: &str) {
self.local_nodes.remove(node_fqn);
self.broadcast_node_infos();
}
fn broadcast_node_infos(&self) {
let pei = self.participant_entities_info();
debug!("ROS discovery publish: {pei:?}");
self
.node_writer
.publish(pei)
.unwrap_or_else(|e| error!("Failed to write into node_writer {:?}", e));
}
} impl Drop for ContextInner {
fn drop(&mut self) {
self.local_nodes.clear();
self.broadcast_node_infos();
}
}