ros2_client/
context.rs

1use std::{
2  collections::HashMap,
3  sync::{Arc, Mutex},
4};
5//use futures::{pin_mut, StreamExt};
6#[cfg(feature = "security")]
7use std::path::{Path, PathBuf};
8
9#[allow(unused_imports)]
10use log::{debug, error, info, trace, warn};
11//use mio::Evented;
12use serde::Serialize;
13use rustdds::{
14  dds::CreateResult,
15  no_key::{DeserializerAdapter, SerializerAdapter},
16  policy::*,
17  *,
18};
19
20use crate::{
21  builtin_topics,
22  entities_info::{NodeEntitiesInfo, ParticipantEntitiesInfo},
23  gid::Gid,
24  names::*,
25  node::{Node, NodeOptions},
26  pubsub::{Publisher, Subscription},
27  NodeCreateError,
28};
29
30lazy_static! {
31/// Basic BestEffort QoS for subscribers
32///
33/// Note: If you want Reliable communication, both publisher and Subscriber
34/// must specify QoS Reliability = Reliable.
35  pub static ref DEFAULT_SUBSCRIPTION_QOS: QosPolicies = QosPolicyBuilder::new()
36    .durability(Durability::Volatile) // default per table in DDS Spec v1.4 Section 2.2.3 Supported QoS
37    .deadline(Deadline(Duration::INFINITE)) // default per table in DDS Spec v1.4 Section 2.2.3 Supported QoS
38    .ownership(Ownership::Shared) // default per table in DDS Spec v1.4 Section 2.2.3 Supported QoS
39    .reliability(Reliability::BestEffort) // default for DataReaders and Topics
40    .history(History::KeepLast { depth: 1 }) // default per table in DDS Spec v1.4 Section 2.2.3 Supported QoS
41    .lifespan(Lifespan {
42      // default per table in DDS Spec v1.4 Section 2.2.3 Supported QoS
43      duration: Duration::INFINITE
44    })
45    .build();
46}
47
48lazy_static! {
49/// Basic Reliable QoS for publishing.
50  pub static ref DEFAULT_PUBLISHER_QOS: QosPolicies = QosPolicyBuilder::new()
51    .durability(Durability::Volatile)
52    .deadline(Deadline(Duration::INFINITE))
53    .ownership(Ownership::Shared)
54    .reliability(Reliability::Reliable{max_blocking_time: Duration::from_millis(100)})
55      // Reliability = Reliable is the default for DataWriters, different from above.
56    .history(History::KeepLast { depth: 1 })
57    .lifespan(Lifespan {
58      duration: Duration::INFINITE
59    })
60    .build();
61}
62
63#[cfg(feature = "security")]
64struct SecurityConfig {
65  /// Path to a directory of configuration files.
66  security_config_dir: PathBuf,
67  /// Password used for decryption the private key file.
68  private_key_password: String,
69}
70
71/// Builder for configuring a `Context`
72pub struct ContextOptions {
73  domain_id: u16,
74  #[cfg(feature = "security")]
75  security_config: Option<SecurityConfig>,
76}
77
78impl ContextOptions {
79  pub fn new() -> Self {
80    Self {
81      domain_id: 0,
82      #[cfg(feature = "security")]
83      security_config: None,
84    }
85  }
86
87  /// Set the DDS Domain Id.
88  ///
89  /// Please refer to the
90  /// [ROS_DOMAIN_ID](https://docs.ros.org/en/iron/Concepts/Intermediate/About-Domain-ID.html)
91  /// or DDS documentation.
92  pub fn domain_id(mut self, domain_id: u16) -> Self {
93    self.domain_id = domain_id;
94    self
95  }
96
97  /// Enable DDS security features.
98  ///
99  /// Using security requires providing appropriate configuration files.
100  ///
101  /// [Security in RustDDS](https://github.com/jhelovuo/RustDDS/blob/master/SECURITY.md)
102  #[cfg(feature = "security")]
103  pub fn enable_security(
104    mut self,
105    security_config_dir: impl AsRef<Path>,
106    private_key_password: String,
107  ) -> Self {
108    self.security_config = Some(SecurityConfig {
109      security_config_dir: security_config_dir.as_ref().to_path_buf(),
110      private_key_password,
111    });
112    self
113  }
114}
115
116impl Default for ContextOptions {
117  fn default() -> Self {
118    Self::new()
119  }
120}
121
122/// [Context] communicates with other
123/// participants information in ROS2 network. It keeps track of
124/// [`NodeEntitiesInfo`]s. Also acts as a wrapper for a RustDDS instance.
125///
126/// Context is shut down by dropping it, and all of its RosNodes.
127/// There should be no need for `ok()` or `shutdown()` methods.
128#[derive(Clone)]
129pub struct Context {
130  inner: Arc<Mutex<ContextInner>>,
131}
132
133impl Context {
134  /// Create a new Context with default settings.
135  pub fn new() -> CreateResult<Context> {
136    Self::from_domain_participant(DomainParticipant::new(0)?)
137  }
138
139  /// Create a new Context.
140  pub fn with_options(opt: ContextOptions) -> CreateResult<Context> {
141    #[allow(unused_mut)] // only mutated with security
142    let mut dpb = DomainParticipantBuilder::new(opt.domain_id);
143
144    #[cfg(feature = "security")]
145    {
146      if let Some(sc) = opt.security_config {
147        dpb = dpb.builtin_security(
148          DomainParticipantSecurityConfigFiles::with_ros_default_names(
149            sc.security_config_dir,
150            sc.private_key_password,
151          ),
152        );
153      }
154    }
155
156    Self::from_domain_participant(dpb.build()?)
157  }
158
159  /// Create a new Context from an existing [`DomainParticipant`].
160  pub fn from_domain_participant(domain_participant: DomainParticipant) -> CreateResult<Context> {
161    let i = ContextInner::from_domain_participant(domain_participant)?;
162    Ok(Context {
163      inner: Arc::new(Mutex::new(i)),
164    })
165  }
166
167  /// Create a new ROS2 [`Node`]
168  pub fn new_node(
169    &self,
170    node_name: NodeName,
171    options: NodeOptions,
172  ) -> Result<Node, NodeCreateError> {
173    Node::new(node_name, options, self.clone())
174  }
175
176  /// Query which DDS Domain Id we are using.
177  pub fn domain_id(&self) -> u16 {
178    self.inner.lock().unwrap().domain_participant.domain_id()
179  }
180
181  /// Which topics have been discovered?
182  pub fn discovered_topics(&self) -> Vec<rustdds::discovery::DiscoveredTopicData> {
183    self.domain_participant().discovered_topics()
184  }
185
186  /// Gets the ParticipantEntitiesInfo describing the current state of
187  /// this Context. This is what we send to ROS Discovery.
188  pub fn participant_entities_info(&self) -> ParticipantEntitiesInfo {
189    self.inner.lock().unwrap().participant_entities_info()
190  }
191
192  /// Get a (handle to) the ROSOut logging Topic.
193  pub fn get_parameter_events_topic(&self) -> Topic {
194    self
195      .inner
196      .lock()
197      .unwrap()
198      .ros_parameter_events_topic
199      .clone()
200  }
201
202  /// Get a (handle to) the ROSOut logging Topic.
203  ///
204  /// Note: The recommended way to write log messages to ROSOut is via the
205  /// [`crate::rosout!`] macro.
206  pub fn get_rosout_topic(&self) -> Topic {
207    self.inner.lock().unwrap().ros_rosout_topic.clone()
208  }
209
210  /// Get the contained DDS [`DomainParticipant`].
211  ///
212  /// The return value is owned, but it is just a cloned smart pointer.
213  pub fn domain_participant(&self) -> DomainParticipant {
214    self.inner.lock().unwrap().domain_participant.clone()
215  }
216
217  // pub fn ros_discovery_stream(&self) -> impl Stream<Item =
218  // ReadResult<(ParticipantEntitiesInfo, MessageInfo)>> + FusedStream + '_ {
219  //   self.inner.lock().unwrap().node_reader.async_stream()
220  // }
221
222  // -----------------------------------------------------------------------
223
224  pub fn create_topic(
225    &self,
226    topic_dds_name: String,
227    type_name: MessageTypeName,
228    qos: &QosPolicies,
229  ) -> CreateResult<Topic> {
230    info!("Creating topic, DDS name: {}", topic_dds_name);
231    let topic = self.domain_participant().create_topic(
232      topic_dds_name,
233      type_name.dds_msg_type(),
234      qos,
235      TopicKind::NoKey,
236    )?;
237    // ROS2 does not use WithKey topics, so always NoKey
238    info!("Created topic");
239    Ok(topic)
240  }
241
242  pub(crate) fn create_publisher<M>(
243    &self,
244    topic: &Topic,
245    qos: Option<QosPolicies>,
246  ) -> dds::CreateResult<Publisher<M>>
247  where
248    M: Serialize,
249  {
250    let datawriter = self
251      .get_ros_default_publisher()
252      .create_datawriter_no_key(topic, qos)?;
253
254    Ok(Publisher::new(datawriter))
255  }
256
257  pub(crate) fn create_subscription<M>(
258    &self,
259    topic: &Topic,
260    qos: Option<QosPolicies>,
261  ) -> dds::CreateResult<Subscription<M>>
262  where
263    M: 'static,
264  {
265    let datareader = self
266      .get_ros_default_subscriber()
267      .create_simple_datareader_no_key(topic, qos)?;
268    Ok(Subscription::new(datareader))
269  }
270
271  pub(crate) fn create_datawriter<M, SA>(
272    &self,
273    topic: &Topic,
274    qos: Option<QosPolicies>,
275  ) -> dds::CreateResult<no_key::DataWriter<M, SA>>
276  where
277    SA: SerializerAdapter<M>,
278  {
279    self
280      .get_ros_default_publisher()
281      .create_datawriter_no_key(topic, qos)
282  }
283
284  pub(crate) fn create_simpledatareader<M, DA>(
285    &self,
286    topic: &Topic,
287    qos: Option<QosPolicies>,
288  ) -> dds::CreateResult<no_key::SimpleDataReader<M, DA>>
289  where
290    M: 'static,
291    DA: 'static + DeserializerAdapter<M>,
292  {
293    self
294      .get_ros_default_subscriber()
295      .create_simple_datareader_no_key(topic, qos)
296  }
297
298  pub(crate) fn update_node(&mut self, node_info: NodeEntitiesInfo) {
299    self.inner.lock().unwrap().update_node(node_info);
300  }
301
302  pub(crate) fn remove_node(&mut self, node_name: &str) {
303    self.inner.lock().unwrap().remove_node(node_name);
304  }
305
306  fn get_ros_default_publisher(&self) -> rustdds::Publisher {
307    self.inner.lock().unwrap().ros_default_publisher.clone()
308  }
309
310  fn get_ros_default_subscriber(&self) -> rustdds::Subscriber {
311    self.inner.lock().unwrap().ros_default_subscriber.clone()
312  }
313
314  pub(crate) fn ros_discovery_topic(&self) -> Topic {
315    self.inner.lock().unwrap().ros_discovery_topic.clone()
316  }
317}
318
319struct ContextInner {
320  local_nodes: HashMap<String, NodeEntitiesInfo>,
321
322  // ROS Discovery: topic, reader and writer
323  ros_discovery_topic: Topic,
324  node_writer: Publisher<ParticipantEntitiesInfo>,
325  // Corresponding ParticipantEntitiesInfo Subscriber is
326  // (optionally) in Node --> Spinner, if it is
327  // activated. Context does not have its own thread of control, so
328  // it cannot do reading.
329  domain_participant: DomainParticipant,
330  // DDS Requires Publisher and Subscriber to create (and group)
331  // DataWriters and DataReaders, so we create one of each.
332  ros_default_publisher: rustdds::Publisher,
333  ros_default_subscriber: rustdds::Subscriber,
334
335  ros_parameter_events_topic: Topic,
336  ros_rosout_topic: Topic,
337}
338
339impl ContextInner {
340  // "new"
341  pub fn from_domain_participant(
342    domain_participant: DomainParticipant,
343  ) -> CreateResult<ContextInner> {
344    let ros_default_publisher = domain_participant.create_publisher(&DEFAULT_PUBLISHER_QOS)?;
345    let ros_default_subscriber = domain_participant.create_subscriber(&DEFAULT_SUBSCRIPTION_QOS)?;
346
347    // This is for tracking (ROS) Node to (DDS) Participant mapping
348    let ros_discovery_topic = domain_participant.create_topic(
349      builtin_topics::ros_discovery::TOPIC_NAME.to_string(),
350      builtin_topics::ros_discovery::TYPE_NAME.to_string(),
351      &builtin_topics::ros_discovery::QOS_PUB,
352      TopicKind::NoKey,
353    )?;
354
355    let ros_parameter_events_topic = domain_participant.create_topic(
356      builtin_topics::parameter_events::TOPIC_NAME.to_string(),
357      builtin_topics::parameter_events::TYPE_NAME.to_string(),
358      &builtin_topics::parameter_events::QOS,
359      TopicKind::NoKey,
360    )?;
361
362    let ros_rosout_topic = domain_participant.create_topic(
363      builtin_topics::rosout::TOPIC_NAME.to_string(),
364      builtin_topics::rosout::TYPE_NAME.to_string(),
365      &builtin_topics::rosout::QOS,
366      TopicKind::NoKey,
367    )?;
368
369    let node_writer =
370      Publisher::new(ros_default_publisher.create_datawriter_no_key(&ros_discovery_topic, None)?);
371
372    Ok(ContextInner {
373      local_nodes: HashMap::new(),
374      node_writer,
375
376      domain_participant,
377      ros_discovery_topic,
378      ros_default_publisher,
379      ros_default_subscriber,
380      ros_parameter_events_topic,
381      ros_rosout_topic,
382    })
383  }
384
385  /// Gets our current participant info we have sent to ROS2 network
386  pub fn participant_entities_info(&self) -> ParticipantEntitiesInfo {
387    ParticipantEntitiesInfo::new(
388      Gid::from(self.domain_participant.guid()),
389      self.local_nodes.values().cloned().collect(),
390    )
391  }
392
393  // Adds new NodeEntitiesInfo and updates our ContextInfo to ROS2 network
394  fn update_node(&mut self, mut node_info: NodeEntitiesInfo) {
395    // Each node connects also to the ROS discovery topic
396    node_info.add_writer(Gid::from(self.node_writer.guid()));
397
398    self
399      .local_nodes
400      .insert(node_info.fully_qualified_name(), node_info);
401    self.broadcast_node_infos();
402  }
403
404  /// Removes NodeEntitiesInfo and updates our ContextInfo to ROS2 network
405  fn remove_node(&mut self, node_fqn: &str) {
406    self.local_nodes.remove(node_fqn);
407    self.broadcast_node_infos();
408  }
409
410  fn broadcast_node_infos(&self) {
411    let pei = self.participant_entities_info();
412    debug!("ROS discovery publish: {pei:?}");
413    self
414      .node_writer
415      .publish(pei)
416      .unwrap_or_else(|e| error!("Failed to write into node_writer {:?}", e));
417  }
418} // impl ContextInner
419
420impl Drop for ContextInner {
421  fn drop(&mut self) {
422    // Clears all nodes and updates our ContextInfo to ROS2 network
423    self.local_nodes.clear();
424    self.broadcast_node_infos();
425  }
426}
427
428// -------------------------------------------------------------------------------------
429// -------------------------------------------------------------------------------------
430
431#[test]
432fn test_node_create() {
433  use crate::ParameterValue;
434
435  let context = Context::new().unwrap();
436  let _node = context
437    .new_node(
438      NodeName::new("/rustdds", "test_node").unwrap(),
439      NodeOptions::new().declare_parameter("foo", ParameterValue::Boolean(true)),
440    )
441    .is_ok();
442}