1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex},
4};
5#[cfg(feature = "security")]
7use std::path::{Path, PathBuf};
8
9#[allow(unused_imports)]
10use log::{debug, error, info, trace, warn};
11use serde::Serialize;
13use rustdds::{
14 dds::CreateResult,
15 no_key::{DeserializerAdapter, SerializerAdapter},
16 policy::*,
17 *,
18};
19
20use crate::{
21 builtin_topics,
22 entities_info::{NodeEntitiesInfo, ParticipantEntitiesInfo},
23 gid::Gid,
24 names::*,
25 node::{Node, NodeOptions},
26 pubsub::{Publisher, Subscription},
27 NodeCreateError,
28};
29
30lazy_static! {
31pub static ref DEFAULT_SUBSCRIPTION_QOS: QosPolicies = QosPolicyBuilder::new()
36 .durability(Durability::Volatile) .deadline(Deadline(Duration::INFINITE)) .ownership(Ownership::Shared) .reliability(Reliability::BestEffort) .history(History::KeepLast { depth: 1 }) .lifespan(Lifespan {
42 duration: Duration::INFINITE
44 })
45 .build();
46}
47
48lazy_static! {
49pub static ref DEFAULT_PUBLISHER_QOS: QosPolicies = QosPolicyBuilder::new()
51 .durability(Durability::Volatile)
52 .deadline(Deadline(Duration::INFINITE))
53 .ownership(Ownership::Shared)
54 .reliability(Reliability::Reliable{max_blocking_time: Duration::from_millis(100)})
55 .history(History::KeepLast { depth: 1 })
57 .lifespan(Lifespan {
58 duration: Duration::INFINITE
59 })
60 .build();
61}
62
63#[cfg(feature = "security")]
64struct SecurityConfig {
65 security_config_dir: PathBuf,
67 private_key_password: String,
69}
70
71pub struct ContextOptions {
73 domain_id: u16,
74 #[cfg(feature = "security")]
75 security_config: Option<SecurityConfig>,
76}
77
78impl ContextOptions {
79 pub fn new() -> Self {
80 Self {
81 domain_id: 0,
82 #[cfg(feature = "security")]
83 security_config: None,
84 }
85 }
86
87 pub fn domain_id(mut self, domain_id: u16) -> Self {
93 self.domain_id = domain_id;
94 self
95 }
96
97 #[cfg(feature = "security")]
103 pub fn enable_security(
104 mut self,
105 security_config_dir: impl AsRef<Path>,
106 private_key_password: String,
107 ) -> Self {
108 self.security_config = Some(SecurityConfig {
109 security_config_dir: security_config_dir.as_ref().to_path_buf(),
110 private_key_password,
111 });
112 self
113 }
114}
115
116impl Default for ContextOptions {
117 fn default() -> Self {
118 Self::new()
119 }
120}
121
122#[derive(Clone)]
129pub struct Context {
130 inner: Arc<Mutex<ContextInner>>,
131}
132
133impl Context {
134 pub fn new() -> CreateResult<Context> {
136 Self::from_domain_participant(DomainParticipant::new(0)?)
137 }
138
139 pub fn with_options(opt: ContextOptions) -> CreateResult<Context> {
141 #[allow(unused_mut)] let mut dpb = DomainParticipantBuilder::new(opt.domain_id);
143
144 #[cfg(feature = "security")]
145 {
146 if let Some(sc) = opt.security_config {
147 dpb = dpb.builtin_security(
148 DomainParticipantSecurityConfigFiles::with_ros_default_names(
149 sc.security_config_dir,
150 sc.private_key_password,
151 ),
152 );
153 }
154 }
155
156 Self::from_domain_participant(dpb.build()?)
157 }
158
159 pub fn from_domain_participant(domain_participant: DomainParticipant) -> CreateResult<Context> {
161 let i = ContextInner::from_domain_participant(domain_participant)?;
162 Ok(Context {
163 inner: Arc::new(Mutex::new(i)),
164 })
165 }
166
167 pub fn new_node(
169 &self,
170 node_name: NodeName,
171 options: NodeOptions,
172 ) -> Result<Node, NodeCreateError> {
173 Node::new(node_name, options, self.clone())
174 }
175
176 pub fn domain_id(&self) -> u16 {
178 self.inner.lock().unwrap().domain_participant.domain_id()
179 }
180
181 pub fn discovered_topics(&self) -> Vec<rustdds::discovery::DiscoveredTopicData> {
183 self.domain_participant().discovered_topics()
184 }
185
186 pub fn participant_entities_info(&self) -> ParticipantEntitiesInfo {
189 self.inner.lock().unwrap().participant_entities_info()
190 }
191
192 pub fn get_parameter_events_topic(&self) -> Topic {
194 self
195 .inner
196 .lock()
197 .unwrap()
198 .ros_parameter_events_topic
199 .clone()
200 }
201
202 pub fn get_rosout_topic(&self) -> Topic {
207 self.inner.lock().unwrap().ros_rosout_topic.clone()
208 }
209
210 pub fn domain_participant(&self) -> DomainParticipant {
214 self.inner.lock().unwrap().domain_participant.clone()
215 }
216
217 pub fn create_topic(
225 &self,
226 topic_dds_name: String,
227 type_name: MessageTypeName,
228 qos: &QosPolicies,
229 ) -> CreateResult<Topic> {
230 info!("Creating topic, DDS name: {}", topic_dds_name);
231 let topic = self.domain_participant().create_topic(
232 topic_dds_name,
233 type_name.dds_msg_type(),
234 qos,
235 TopicKind::NoKey,
236 )?;
237 info!("Created topic");
239 Ok(topic)
240 }
241
242 pub(crate) fn create_publisher<M>(
243 &self,
244 topic: &Topic,
245 qos: Option<QosPolicies>,
246 ) -> dds::CreateResult<Publisher<M>>
247 where
248 M: Serialize,
249 {
250 let datawriter = self
251 .get_ros_default_publisher()
252 .create_datawriter_no_key(topic, qos)?;
253
254 Ok(Publisher::new(datawriter))
255 }
256
257 pub(crate) fn create_subscription<M>(
258 &self,
259 topic: &Topic,
260 qos: Option<QosPolicies>,
261 ) -> dds::CreateResult<Subscription<M>>
262 where
263 M: 'static,
264 {
265 let datareader = self
266 .get_ros_default_subscriber()
267 .create_simple_datareader_no_key(topic, qos)?;
268 Ok(Subscription::new(datareader))
269 }
270
271 pub(crate) fn create_datawriter<M, SA>(
272 &self,
273 topic: &Topic,
274 qos: Option<QosPolicies>,
275 ) -> dds::CreateResult<no_key::DataWriter<M, SA>>
276 where
277 SA: SerializerAdapter<M>,
278 {
279 self
280 .get_ros_default_publisher()
281 .create_datawriter_no_key(topic, qos)
282 }
283
284 pub(crate) fn create_simpledatareader<M, DA>(
285 &self,
286 topic: &Topic,
287 qos: Option<QosPolicies>,
288 ) -> dds::CreateResult<no_key::SimpleDataReader<M, DA>>
289 where
290 M: 'static,
291 DA: 'static + DeserializerAdapter<M>,
292 {
293 self
294 .get_ros_default_subscriber()
295 .create_simple_datareader_no_key(topic, qos)
296 }
297
298 pub(crate) fn update_node(&mut self, node_info: NodeEntitiesInfo) {
299 self.inner.lock().unwrap().update_node(node_info);
300 }
301
302 pub(crate) fn remove_node(&mut self, node_name: &str) {
303 self.inner.lock().unwrap().remove_node(node_name);
304 }
305
306 fn get_ros_default_publisher(&self) -> rustdds::Publisher {
307 self.inner.lock().unwrap().ros_default_publisher.clone()
308 }
309
310 fn get_ros_default_subscriber(&self) -> rustdds::Subscriber {
311 self.inner.lock().unwrap().ros_default_subscriber.clone()
312 }
313
314 pub(crate) fn ros_discovery_topic(&self) -> Topic {
315 self.inner.lock().unwrap().ros_discovery_topic.clone()
316 }
317}
318
319struct ContextInner {
320 local_nodes: HashMap<String, NodeEntitiesInfo>,
321
322 ros_discovery_topic: Topic,
324 node_writer: Publisher<ParticipantEntitiesInfo>,
325 domain_participant: DomainParticipant,
330 ros_default_publisher: rustdds::Publisher,
333 ros_default_subscriber: rustdds::Subscriber,
334
335 ros_parameter_events_topic: Topic,
336 ros_rosout_topic: Topic,
337}
338
339impl ContextInner {
340 pub fn from_domain_participant(
342 domain_participant: DomainParticipant,
343 ) -> CreateResult<ContextInner> {
344 let ros_default_publisher = domain_participant.create_publisher(&DEFAULT_PUBLISHER_QOS)?;
345 let ros_default_subscriber = domain_participant.create_subscriber(&DEFAULT_SUBSCRIPTION_QOS)?;
346
347 let ros_discovery_topic = domain_participant.create_topic(
349 builtin_topics::ros_discovery::TOPIC_NAME.to_string(),
350 builtin_topics::ros_discovery::TYPE_NAME.to_string(),
351 &builtin_topics::ros_discovery::QOS_PUB,
352 TopicKind::NoKey,
353 )?;
354
355 let ros_parameter_events_topic = domain_participant.create_topic(
356 builtin_topics::parameter_events::TOPIC_NAME.to_string(),
357 builtin_topics::parameter_events::TYPE_NAME.to_string(),
358 &builtin_topics::parameter_events::QOS,
359 TopicKind::NoKey,
360 )?;
361
362 let ros_rosout_topic = domain_participant.create_topic(
363 builtin_topics::rosout::TOPIC_NAME.to_string(),
364 builtin_topics::rosout::TYPE_NAME.to_string(),
365 &builtin_topics::rosout::QOS,
366 TopicKind::NoKey,
367 )?;
368
369 let node_writer =
370 Publisher::new(ros_default_publisher.create_datawriter_no_key(&ros_discovery_topic, None)?);
371
372 Ok(ContextInner {
373 local_nodes: HashMap::new(),
374 node_writer,
375
376 domain_participant,
377 ros_discovery_topic,
378 ros_default_publisher,
379 ros_default_subscriber,
380 ros_parameter_events_topic,
381 ros_rosout_topic,
382 })
383 }
384
385 pub fn participant_entities_info(&self) -> ParticipantEntitiesInfo {
387 ParticipantEntitiesInfo::new(
388 Gid::from(self.domain_participant.guid()),
389 self.local_nodes.values().cloned().collect(),
390 )
391 }
392
393 fn update_node(&mut self, mut node_info: NodeEntitiesInfo) {
395 node_info.add_writer(Gid::from(self.node_writer.guid()));
397
398 self
399 .local_nodes
400 .insert(node_info.fully_qualified_name(), node_info);
401 self.broadcast_node_infos();
402 }
403
404 fn remove_node(&mut self, node_fqn: &str) {
406 self.local_nodes.remove(node_fqn);
407 self.broadcast_node_infos();
408 }
409
410 fn broadcast_node_infos(&self) {
411 let pei = self.participant_entities_info();
412 debug!("ROS discovery publish: {pei:?}");
413 self
414 .node_writer
415 .publish(pei)
416 .unwrap_or_else(|e| error!("Failed to write into node_writer {:?}", e));
417 }
418} impl Drop for ContextInner {
421 fn drop(&mut self) {
422 self.local_nodes.clear();
424 self.broadcast_node_infos();
425 }
426}
427
428#[test]
432fn test_node_create() {
433 use crate::ParameterValue;
434
435 let context = Context::new().unwrap();
436 let _node = context
437 .new_node(
438 NodeName::new("/rustdds", "test_node").unwrap(),
439 NodeOptions::new().declare_parameter("foo", ParameterValue::Boolean(true)),
440 )
441 .is_ok();
442}