1use std::{
2 collections::{BTreeMap, BTreeSet},
3 error::Error,
4 fmt,
5 pin::Pin,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc, Mutex,
9 },
10};
11
12use futures::{
13 pin_mut, stream, stream::FusedStream, task, task::Poll, Future, FutureExt, Stream, StreamExt,
14};
15use async_channel::Receiver;
16#[allow(unused_imports)]
17use log::{debug, error, info, trace, warn};
18use serde::Serialize;
19use rustdds::{
20 dds::{CreateError, CreateResult},
21 *,
22};
23
24use crate::{
25 action::*,
26 builtin_interfaces,
27 context::{Context, DEFAULT_SUBSCRIPTION_QOS},
28 entities_info::{NodeEntitiesInfo, ParticipantEntitiesInfo},
29 gid::Gid,
30 log::Log,
31 names::*,
32 parameters::*,
33 pubsub::{Publisher, Subscription},
34 rcl_interfaces,
35 ros_time::ROSTime,
36 rosout::{NodeLoggingHandle, RosoutRaw},
37 service::{Client, Server, Service, ServiceMapping},
38};
39
40type ParameterFunc = dyn Fn(&str, &ParameterValue) -> SetParametersResult + Send + Sync;
41#[must_use]
48pub struct NodeOptions {
49 #[allow(dead_code)]
50 cli_args: Vec<String>,
51 #[allow(dead_code)]
52 use_global_arguments: bool, enable_rosout: bool, enable_rosout_reading: bool,
55 start_parameter_services: bool,
56 declared_parameters: Vec<Parameter>,
57 allow_undeclared_parameters: bool,
58 parameter_validator: Option<Box<ParameterFunc>>,
59 parameter_set_action: Option<Box<ParameterFunc>>,
60}
61
62impl NodeOptions {
63 pub fn new() -> NodeOptions {
65 NodeOptions {
68 cli_args: Vec::new(),
69 use_global_arguments: true,
70 enable_rosout: true,
71 enable_rosout_reading: false,
72 start_parameter_services: true,
73 declared_parameters: Vec::new(),
74 allow_undeclared_parameters: false,
75 parameter_validator: None,
76 parameter_set_action: None,
77 }
78 }
79 pub fn enable_rosout(self, enable_rosout: bool) -> NodeOptions {
80 NodeOptions {
81 enable_rosout,
82 ..self
83 }
84 }
85
86 pub fn read_rosout(self, enable_rosout_reading: bool) -> NodeOptions {
87 NodeOptions {
88 enable_rosout_reading,
89 ..self
90 }
91 }
92
93 pub fn declare_parameter(mut self, name: &str, value: ParameterValue) -> NodeOptions {
94 self.declared_parameters.push(Parameter {
95 name: name.to_owned(),
96 value,
97 });
98 self
100 }
101
102 pub fn parameter_validator(mut self, validator: Box<ParameterFunc>) -> NodeOptions {
103 self.parameter_validator = Some(validator);
104 self
105 }
106
107 pub fn parameter_set_action(mut self, action: Box<ParameterFunc>) -> NodeOptions {
108 self.parameter_set_action = Some(action);
109 self
110 }
111}
112
113impl Default for NodeOptions {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118#[derive(Clone, Debug)]
123pub enum NodeEvent {
124 DDS(DomainParticipantStatusEvent),
125 ROS(ParticipantEntitiesInfo),
126}
127
128struct ParameterServers {
129 get_parameters_server: Server<rcl_interfaces::GetParametersService>,
130 get_parameter_types_server: Server<rcl_interfaces::GetParameterTypesService>,
131 list_parameters_server: Server<rcl_interfaces::ListParametersService>,
132 set_parameters_server: Server<rcl_interfaces::SetParametersService>,
133 set_parameters_atomically_server: Server<rcl_interfaces::SetParametersAtomicallyService>,
134 describe_parameters_server: Server<rcl_interfaces::DescribeParametersService>,
135}
136
137pub struct Spinner {
145 ros_context: Context,
146 stop_spin_receiver: async_channel::Receiver<()>,
147
148 readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
149 writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
150 external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
152 status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
154
155 use_sim_time: Arc<AtomicBool>,
156 sim_time: Arc<Mutex<ROSTime>>,
157 clock_topic: Topic,
158 allow_undeclared_parameters: bool,
159
160 parameter_servers: Option<ParameterServers>,
161 parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
162 parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
163 parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
164 parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
165 fully_qualified_node_name: String,
166}
167
168async fn next_if_some<S>(s: &mut Option<S>) -> S::Item
169where
170 S: Stream + Unpin + FusedStream,
171{
172 match s.as_mut() {
173 Some(stream) => stream.select_next_some().await,
174 None => std::future::pending().await,
175 }
176}
177
178impl Spinner {
179 pub async fn spin(self) -> CreateResult<()> {
180 let dds_status_listener = self.ros_context.domain_participant().status_listener();
181 let dds_status_stream = dds_status_listener.as_async_status_stream();
182 pin_mut!(dds_status_stream);
183
184 let ros_discovery_topic = self.ros_context.ros_discovery_topic();
185 let ros_discovery_reader = self
186 .ros_context
187 .create_subscription::<ParticipantEntitiesInfo>(&ros_discovery_topic, None)?;
188 let ros_discovery_stream = ros_discovery_reader.async_stream();
189 pin_mut!(ros_discovery_stream);
190
191 let ros_clock_reader = self
192 .ros_context
193 .create_subscription::<builtin_interfaces::Time>(&self.clock_topic, None)?;
194 let ros_clock_stream = ros_clock_reader.async_stream();
195 pin_mut!(ros_clock_stream);
196
197 let mut get_parameters_stream_opt = self
199 .parameter_servers
200 .as_ref()
201 .map(|s| s.get_parameters_server.receive_request_stream());
202 let mut get_parameter_types_stream_opt = self
203 .parameter_servers
204 .as_ref()
205 .map(|s| s.get_parameter_types_server.receive_request_stream());
206 let mut set_parameters_stream_opt = self
207 .parameter_servers
208 .as_ref()
209 .map(|s| s.set_parameters_server.receive_request_stream());
210 let mut set_parameters_atomically_stream_opt = self
211 .parameter_servers
212 .as_ref()
213 .map(|s| s.set_parameters_atomically_server.receive_request_stream());
214 let mut list_parameter_stream_opt = self
215 .parameter_servers
216 .as_ref()
217 .map(|s| s.list_parameters_server.receive_request_stream());
218 let mut describe_parameters_stream_opt = self
219 .parameter_servers
220 .as_ref()
221 .map(|s| s.describe_parameters_server.receive_request_stream());
222
223 loop {
224 futures::select! {
225 _ = self.stop_spin_receiver.recv().fuse() => {
226 break;
227 }
228
229 clock_msg = ros_clock_stream.select_next_some() => {
230 match clock_msg {
231 Ok((time,_msg_info)) => {
232 *self.sim_time.lock().unwrap() = time.into();
235 }
236 Err(e) => warn!("Simulated clock receive error {e:?}")
237 }
238 }
239
240
241 get_parameters_request = next_if_some(&mut get_parameters_stream_opt).fuse() => {
242 match get_parameters_request {
243 Ok( (req_id, req) ) => {
244 info!("Get parameter request {req:?}");
245 let values = {
246 let param_db = self.parameters.lock().unwrap();
247 req.names.iter()
248 .map(|name| param_db.get(name.as_str())
249 .unwrap_or(&ParameterValue::NotSet))
250 .cloned()
251 .map( raw::ParameterValue::from)
252 .collect()
253 };
254 info!("Get parameters response: {values:?}");
255
256 self.parameter_servers.as_ref().unwrap().get_parameters_server
258 .async_send_response(req_id, rcl_interfaces::GetParametersResponse{ values })
259 .await
260 .unwrap_or_else(|e| warn!("GetParameter response error {e:?}"));
261 }
262 Err(e) => warn!("GetParameter request error {e:?}"),
263 }
264 }
265
266 get_parameter_types_request = next_if_some(&mut get_parameter_types_stream_opt).fuse() => {
267 match get_parameter_types_request {
268 Ok( (req_id, req) ) => {
269 warn!("Get parameter types request");
270 let values = {
271 let param_db = self.parameters.lock().unwrap();
272 req.names.iter()
273 .map(|name| param_db.get(name.as_str())
274 .unwrap_or(&ParameterValue::NotSet))
275 .map(ParameterValue::to_parameter_type_raw)
276 .collect()
277 };
278 info!("Get parameter types response: {values:?}");
279 self.parameter_servers.as_ref().unwrap().get_parameter_types_server
281 .async_send_response(req_id, rcl_interfaces::GetParameterTypesResponse{ values })
282 .await
283 .unwrap_or_else(|e| warn!("GetParameterTypes response error {e:?}"));
284 }
285 Err(e) => warn!("GetParameterTypes request error {e:?}"),
286 }
287 }
288
289 set_parameters_request = next_if_some(&mut set_parameters_stream_opt).fuse() => {
290 match set_parameters_request {
291 Ok( (req_id, req) ) => {
292 info!("Set parameter request {req:?}");
293 let results =
294 req.parameter.iter()
295 .cloned()
296 .map( Parameter::from ) .map( |Parameter{name, value}| self.set_parameter(&name,value))
298 .map(|r| r.into()) .collect();
300 info!("Set parameters response: {results:?}");
301 self.parameter_servers.as_ref().unwrap().set_parameters_server
303 .async_send_response(req_id, rcl_interfaces::SetParametersResponse{ results })
304 .await
305 .unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
306 }
307 Err(e) => warn!("SetParameters request error {e:?}"),
308 }
309 }
310
311 set_parameters_atomically_request = next_if_some(&mut set_parameters_atomically_stream_opt).fuse() => {
312 match set_parameters_atomically_request {
313 Ok( (req_id, req) ) => {
314 warn!("Set parameters atomically request {req:?}");
315 let results =
316 req.parameter.iter()
317 .cloned()
318 .map( Parameter::from ) .map( |Parameter{ .. } |
320 Err("Setting parameters atomically is not implemented.".to_owned())
322 )
323 .map(|r| r.into()) .collect();
325 warn!("Set parameters atomically response: {results:?}");
326 self.parameter_servers.as_ref().unwrap().set_parameters_atomically_server
328 .async_send_response(req_id, rcl_interfaces::SetParametersAtomicallyResponse{ results })
329 .await
330 .unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
331 }
332 Err(e) => warn!("SetParametersAtomically request error {e:?}"),
333 }
334 }
335
336 list_parameter_request = next_if_some(&mut list_parameter_stream_opt).fuse() => {
337 match list_parameter_request {
338 Ok( (req_id, req) ) => {
339 info!("List parameters request");
340 let prefixes = req.prefixes;
341 let names = {
344 let param_db = self.parameters.lock().unwrap();
345 param_db.keys()
346 .filter_map(|name|
347 if prefixes.is_empty() ||
348 prefixes.iter().any(|prefix| name.starts_with(prefix))
349 {
350 Some(name.clone())
351 } else { None }
352 )
353 .collect()
354 };
355 let result = rcl_interfaces::ListParametersResult{ names, prefixes: vec![] };
356 info!("List parameters response: {result:?}");
358 self.parameter_servers.as_ref().unwrap().list_parameters_server
359 .async_send_response(req_id, rcl_interfaces::ListParametersResponse{ result })
360 .await
361 .unwrap_or_else(|e| warn!("ListParameter response error {e:?}"));
362 }
363 Err(e) => warn!("ListParameter request error {e:?}"),
364 }
365 }
366
367 describe_parameters_request = next_if_some(&mut describe_parameters_stream_opt).fuse() => {
368 match describe_parameters_request {
369 Ok( (req_id, req) ) => {
370 info!("Describe parameters request {req:?}");
371 let values = {
372 let parameters = self.parameters.lock().unwrap();
373 req.names.iter()
374 .map( |name|
375 {
376 if let Some(value) = parameters.get(name) {
377 ParameterDescriptor::from_value(name, value)
378 } else {
379 ParameterDescriptor::unknown(name)
380 }
381 })
382 .map(|r| r.into()) .collect()
384 };
385 info!("Describe parameters response: {values:?}");
386 self.parameter_servers.as_ref().unwrap().describe_parameters_server
388 .async_send_response(req_id, rcl_interfaces::DescribeParametersResponse{ values })
389 .await
390 .unwrap_or_else(|e| warn!("DescribeParameters response error {e:?}"));
391 }
392 Err(e) => warn!("DescribeParameters request error {e:?}"),
393 }
394 }
395
396 participant_info_update = ros_discovery_stream.select_next_some() => {
397 match participant_info_update {
399 Ok((part_update, _msg_info)) => {
400 let mut info_map = self.external_nodes.lock().unwrap();
402 info_map.insert( part_update.gid, part_update.node_entities_info_seq.clone());
403 self.send_status_event( &NodeEvent::ROS(part_update) );
405 }
406 Err(e) => {
407 warn!("ros_discovery_info error {e:?}");
408 }
409 }
410 }
411
412 dp_status_event = dds_status_stream.select_next_some() => {
413 match dp_status_event {
417 DomainParticipantStatusEvent::RemoteReaderMatched { local_writer, remote_reader } => {
418 self.writers_to_remote_readers.lock().unwrap()
419 .entry(local_writer)
420 .and_modify(|s| {s.insert(remote_reader);} )
421 .or_insert(BTreeSet::from([remote_reader]));
422 }
423 DomainParticipantStatusEvent::RemoteWriterMatched { local_reader, remote_writer } => {
424 self.readers_to_remote_writers.lock().unwrap()
425 .entry(local_reader)
426 .and_modify(|s| {s.insert(remote_writer);} )
427 .or_insert(BTreeSet::from([remote_writer]));
428 }
429 DomainParticipantStatusEvent::ReaderLost {guid, ..} => {
430 for ( _local, readers)
431 in self.writers_to_remote_readers.lock().unwrap().iter_mut() {
432 readers.remove(&guid);
433 }
434 }
435 DomainParticipantStatusEvent::WriterLost {guid, ..} => {
436 for ( _local, writers)
437 in self.readers_to_remote_writers.lock().unwrap().iter_mut() {
438 writers.remove(&guid);
439 }
440 }
441
442 _ => {}
443 }
444
445 self.send_status_event( &NodeEvent::DDS(dp_status_event) );
447 }
448 }
449 }
450 info!("Spinner exiting .spin()");
451 Ok(())
452 } fn send_status_event(&self, event: &NodeEvent) {
456 let mut closed = Vec::new();
457 let mut sender_array = self.status_event_senders.lock().unwrap();
458 for (i, sender) in sender_array.iter().enumerate() {
459 match sender.try_send(event.clone()) {
460 Ok(()) => {
461 }
463 Err(async_channel::TrySendError::Closed(_)) => {
464 closed.push(i) }
467 Err(e) => {
468 debug!("send_status_event: Send error for {i}: {e:?}");
469 }
472 }
473 }
474
475 for c in closed.iter().rev() {
477 sender_array.swap_remove(*c);
478 }
479 }
480
481 fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
483 match name {
484 "use_sim_time" => match value {
486 ParameterValue::Boolean(_) => Ok(()),
487 _ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
488 },
489 _ => {
491 match self.parameter_validator {
492 Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
495 }
496 }
497 }
498
499 fn execute_parameter_set_actions(
501 &self,
502 name: &str,
503 value: &ParameterValue,
504 ) -> SetParametersResult {
505 match name {
506 "use_sim_time" => match value {
507 ParameterValue::Boolean(s) => {
508 self.use_sim_time.store(*s, Ordering::SeqCst);
509 Ok(())
510 }
511 _ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
512 },
513 _ => {
514 match self.parameter_set_action {
515 Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
518 }
519 }
520 }
521
522 pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
525 let already_set = self.parameters.lock().unwrap().contains_key(name);
526 if self.allow_undeclared_parameters || already_set {
527 self.validate_parameter_on_set(name, &value)?;
528 self.execute_parameter_set_actions(name, &value)?;
529
530 let p = raw::Parameter {
532 name: name.to_string(),
533 value: value.clone().into(),
534 };
535 let (new_parameters, changed_parameters) = if already_set {
536 (vec![], vec![p])
537 } else {
538 (vec![p], vec![])
539 };
540
541 self
543 .parameters
544 .lock()
545 .unwrap()
546 .insert(name.to_owned(), value);
547 self
549 .parameter_events_writer
550 .publish(raw::ParameterEvent {
551 timestamp: rustdds::Timestamp::now(), node: self.fully_qualified_node_name.clone(),
553 new_parameters,
554 changed_parameters,
555 deleted_parameters: vec![],
556 })
557 .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
558 Ok(())
559 } else {
560 Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
561 }
562 }
563} #[derive(Debug)]
570pub enum NodeCreateError {
571 DDS(CreateError),
572 BadParameter(String),
573}
574
575impl From<CreateError> for NodeCreateError {
576 fn from(c: CreateError) -> NodeCreateError {
577 NodeCreateError::DDS(c)
578 }
579}
580
581impl fmt::Display for NodeCreateError {
582 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
583 match self {
584 Self::DDS(create_error) => write!(f, "NodeCreateError::DDS : {create_error}"),
585 Self::BadParameter(s) => write!(f, "NodeCreateError::BadParameter : {s}"),
586 }
587 }
588}
589
590impl Error for NodeCreateError {
591 fn source(&self) -> Option<&(dyn Error + 'static)> {
592 match self {
593 Self::DDS(create_error) => Some(create_error),
594 Self::BadParameter(_) => None,
595 }
596 }
597}
598
599pub enum ParameterError {
601 AlreadyDeclared,
602 InvalidName,
603}
604
605pub struct Node {
619 node_name: NodeName,
620 options: NodeOptions,
621
622 pub(crate) ros_context: Context,
623
624 readers: BTreeSet<Gid>,
627 writers: BTreeSet<Gid>,
628
629 suppress_node_info_updates: Arc<AtomicBool>,
630 readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
637 writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
638
639 external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
641 stop_spin_sender: Option<async_channel::Sender<()>>,
642
643 status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
645
646 rosout_writer: Arc<Option<Publisher<Log>>>,
648 rosout_reader: Option<Subscription<Log>>,
649
650 parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
653
654 parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
656 parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
658 parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
659
660 use_sim_time: Arc<AtomicBool>,
662 sim_time: Arc<Mutex<ROSTime>>,
663}
664
665impl Node {
666 pub(crate) fn new(
667 node_name: NodeName,
668 mut options: NodeOptions,
669 ros_context: Context,
670 ) -> Result<Node, NodeCreateError> {
671 let paramtopic = ros_context.get_parameter_events_topic();
672 let rosout_topic = ros_context.get_rosout_topic();
673
674 let enable_rosout = options.enable_rosout;
675 let rosout_reader = options.enable_rosout_reading;
676
677 let parameter_events_writer = ros_context.create_publisher(¶mtopic, None)?;
678
679 options.declared_parameters.push(Parameter {
682 name: "use_sim_time".to_string(),
683 value: ParameterValue::Boolean(false),
684 });
685 let parameters = options
686 .declared_parameters
687 .iter()
688 .cloned()
689 .map(|Parameter { name, value }| (name, value))
690 .collect::<BTreeMap<String, ParameterValue>>();
691
692 let parameter_validator = options
693 .parameter_validator
694 .take()
695 .map(|b| Arc::new(Mutex::new(b)));
696 let parameter_set_action = options
697 .parameter_set_action
698 .take()
699 .map(|b| Arc::new(Mutex::new(b)));
700
701 let mut node = Node {
702 node_name,
703 options,
704 ros_context,
705 readers: BTreeSet::new(),
706 writers: BTreeSet::new(),
707 readers_to_remote_writers: Arc::new(Mutex::new(BTreeMap::new())),
708 writers_to_remote_readers: Arc::new(Mutex::new(BTreeMap::new())),
709 external_nodes: Arc::new(Mutex::new(BTreeMap::new())),
710 suppress_node_info_updates: Arc::new(AtomicBool::new(false)),
711 stop_spin_sender: None,
712 status_event_senders: Arc::new(Mutex::new(Vec::new())),
713 rosout_writer: Arc::new(None), rosout_reader: None,
715 parameter_events_writer: Arc::new(parameter_events_writer),
716 parameters: Arc::new(Mutex::new(parameters)),
717 parameter_validator,
718 parameter_set_action,
719 use_sim_time: Arc::new(AtomicBool::new(false)),
720 sim_time: Arc::new(Mutex::new(ROSTime::ZERO)),
721 };
722
723 node.suppress_node_info_updates(true);
724
725 node.rosout_writer = if enable_rosout {
726 Arc::new(Some(
727 node.create_publisher(&rosout_topic, None)?,
729 ))
730 } else {
731 Arc::new(None) };
733 node.rosout_reader = if rosout_reader {
734 Some(node.create_subscription(&rosout_topic, None)?)
735 } else {
736 None
737 };
738
739 node
741 .parameters
742 .lock()
743 .unwrap()
744 .iter()
745 .try_for_each(|(name, value)| {
746 node.validate_parameter_on_set(name, value)?;
747 node.execute_parameter_set_actions(name, value)?;
748 Ok(())
749 })
750 .map_err(NodeCreateError::BadParameter)?;
751
752 node.suppress_node_info_updates(false);
753
754 Ok(node)
755 }
756
757 pub fn time_now(&self) -> ROSTime {
761 if self.use_sim_time.load(Ordering::SeqCst) {
762 *self.sim_time.lock().unwrap()
763 } else {
764 ROSTime::now()
765 }
766 }
767
768 pub fn time_now_not_simulated(&self) -> ROSTime {
769 ROSTime::now()
770 }
771
772 pub fn spinner(&mut self) -> CreateResult<Spinner> {
781 if self.stop_spin_sender.is_some() {
782 panic!("Attempted to crate a second spinner.");
783 }
784 let (stop_spin_sender, stop_spin_receiver) = async_channel::bounded(1);
785 self.stop_spin_sender = Some(stop_spin_sender);
786
787 let service_qos = QosPolicyBuilder::new()
789 .reliability(policy::Reliability::Reliable {
790 max_blocking_time: Duration::from_millis(100),
791 })
792 .history(policy::History::KeepLast { depth: 1 })
793 .build();
794
795 let node_name = self.node_name.fully_qualified_name();
796
797 self.suppress_node_info_updates(true);
798
799 let parameter_servers = if self.options.start_parameter_services {
800 let service_mapping = ServiceMapping::Enhanced; let get_parameters_server = self.create_server(
802 service_mapping,
803 &Name::new(&node_name, "get_parameters").unwrap(),
804 &ServiceTypeName::new("rcl_interfaces", "GetParameters"),
805 service_qos.clone(),
806 service_qos.clone(),
807 )?;
808 let get_parameter_types_server = self.create_server(
809 service_mapping,
810 &Name::new(&node_name, "get_parameter_types").unwrap(),
811 &ServiceTypeName::new("rcl_interfaces", "GetParameterTypes"),
812 service_qos.clone(),
813 service_qos.clone(),
814 )?;
815 let set_parameters_server = self.create_server(
816 service_mapping,
817 &Name::new(&node_name, "set_parameters").unwrap(),
818 &ServiceTypeName::new("rcl_interfaces", "SetParameters"),
819 service_qos.clone(),
820 service_qos.clone(),
821 )?;
822 let set_parameters_atomically_server = self.create_server(
823 service_mapping,
824 &Name::new(&node_name, "set_parameters_atomically").unwrap(),
825 &ServiceTypeName::new("rcl_interfaces", "SetParametersAtomically"),
826 service_qos.clone(),
827 service_qos.clone(),
828 )?;
829 let list_parameters_server = self.create_server(
830 service_mapping,
831 &Name::new(&node_name, "list_parameters").unwrap(),
832 &ServiceTypeName::new("rcl_interfaces", "ListParameters"),
833 service_qos.clone(),
834 service_qos.clone(),
835 )?;
836 let describe_parameters_server = self.create_server(
837 service_mapping,
838 &Name::new(&node_name, "describe_parameters").unwrap(),
839 &ServiceTypeName::new("rcl_interfaces", "DescribeParameters"),
840 service_qos.clone(),
841 service_qos.clone(),
842 )?;
843
844 Some(ParameterServers {
845 get_parameters_server,
846 get_parameter_types_server,
847 list_parameters_server,
848 set_parameters_server,
849 set_parameters_atomically_server,
850 describe_parameters_server,
851 })
852 } else {
853 None };
855
856 let clock_topic = self.create_topic(
857 &Name::new("/", "clock").unwrap(),
858 MessageTypeName::new("builtin_interfaces", "Time"),
859 &DEFAULT_SUBSCRIPTION_QOS,
860 )?;
861
862 self.suppress_node_info_updates(false);
863
864 Ok(Spinner {
865 ros_context: self.ros_context.clone(),
866 stop_spin_receiver,
867 readers_to_remote_writers: Arc::clone(&self.readers_to_remote_writers),
868 writers_to_remote_readers: Arc::clone(&self.writers_to_remote_readers),
869 external_nodes: Arc::clone(&self.external_nodes),
870 status_event_senders: Arc::clone(&self.status_event_senders),
871 use_sim_time: Arc::clone(&self.use_sim_time),
872 sim_time: Arc::clone(&self.sim_time),
873 clock_topic,
874 parameter_servers,
875 parameter_events_writer: Arc::clone(&self.parameter_events_writer),
876 parameters: Arc::clone(&self.parameters),
877 allow_undeclared_parameters: self.options.allow_undeclared_parameters,
878 parameter_validator: self.parameter_validator.as_ref().map(Arc::clone),
879 parameter_set_action: self.parameter_set_action.as_ref().map(Arc::clone),
880 fully_qualified_node_name: self.fully_qualified_name(),
881 })
882 }
883
884 pub fn have_spinner(&self) -> bool {
889 self.stop_spin_sender.is_some()
890 }
891
892 fn generate_node_info(&self) -> NodeEntitiesInfo {
894 let mut node_info = NodeEntitiesInfo::new(self.node_name.clone());
895
896 node_info.add_writer(Gid::from(self.parameter_events_writer.guid()));
897 if let Some(ref row) = *self.rosout_writer {
898 node_info.add_writer(Gid::from(row.guid()));
899 }
900
901 for reader in &self.readers {
902 node_info.add_reader(*reader);
903 }
904
905 for writer in &self.writers {
906 node_info.add_writer(*writer);
907 }
908
909 node_info
910 }
911
912 fn suppress_node_info_updates(&mut self, suppress: bool) {
913 self
914 .suppress_node_info_updates
915 .store(suppress, Ordering::SeqCst);
916
917 if !suppress {
919 self.ros_context.update_node(self.generate_node_info());
920 }
921 }
922
923 fn add_reader(&mut self, reader: Gid) {
924 self.readers.insert(reader);
925 if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
926 self.ros_context.update_node(self.generate_node_info());
927 }
928 }
929
930 fn add_writer(&mut self, writer: Gid) {
931 self.writers.insert(writer);
932 if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
933 self.ros_context.update_node(self.generate_node_info());
934 }
935 }
936
937 pub fn namespace(&self) -> &str {
938 self.node_name.namespace()
939 }
940
941 pub fn fully_qualified_name(&self) -> String {
942 self.node_name.fully_qualified_name()
943 }
944
945 pub fn options(&self) -> &NodeOptions {
946 &self.options
947 }
948
949 pub fn domain_id(&self) -> u16 {
950 self.ros_context.domain_id()
951 }
952
953 pub fn undeclare_parameter(&self, name: &str) {
957 let prev_value = self.parameters.lock().unwrap().remove(name);
958
959 if let Some(deleted_param) = prev_value {
960 self
962 .parameter_events_writer
963 .publish(raw::ParameterEvent {
964 timestamp: self.time_now().into(),
965 node: self.fully_qualified_name(),
966 new_parameters: vec![],
967 changed_parameters: vec![],
968 deleted_parameters: vec![raw::Parameter {
969 name: name.to_string(),
970 value: deleted_param.into(),
971 }],
972 })
973 .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
974 }
975 }
976
977 pub fn has_parameter(&self, name: &str) -> bool {
979 self.parameters.lock().unwrap().contains_key(name)
980 }
981
982 pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
993 let already_set = self.parameters.lock().unwrap().contains_key(name);
994 if self.options.allow_undeclared_parameters || already_set {
995 self.validate_parameter_on_set(name, &value)?;
996 self.execute_parameter_set_actions(name, &value)?;
997
998 let p = raw::Parameter {
1000 name: name.to_string(),
1001 value: value.clone().into(),
1002 };
1003 let (new_parameters, changed_parameters) = if already_set {
1004 (vec![], vec![p])
1005 } else {
1006 (vec![p], vec![])
1007 };
1008
1009 self
1011 .parameters
1012 .lock()
1013 .unwrap()
1014 .insert(name.to_owned(), value);
1015 self
1017 .parameter_events_writer
1018 .publish(raw::ParameterEvent {
1019 timestamp: self.time_now().into(),
1020 node: self.fully_qualified_name(),
1021 new_parameters,
1022 changed_parameters,
1023 deleted_parameters: vec![],
1024 })
1025 .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
1026 Ok(())
1027 } else {
1028 Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
1029 }
1030 }
1031
1032 pub fn allow_undeclared_parameters(&self) -> bool {
1033 self.options.allow_undeclared_parameters
1034 }
1035
1036 pub fn get_parameter(&self, name: &str) -> Option<ParameterValue> {
1038 self
1039 .parameters
1040 .lock()
1041 .unwrap()
1042 .get(name)
1043 .map(|p| p.to_owned())
1044 }
1045
1046 pub fn list_parameters(&self) -> Vec<String> {
1047 self
1048 .parameters
1049 .lock()
1050 .unwrap()
1051 .keys()
1052 .map(move |k| k.to_owned())
1053 .collect::<Vec<_>>()
1054 }
1055
1056 fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
1063 match name {
1064 "use_sim_time" => match value {
1066 ParameterValue::Boolean(_) => Ok(()),
1067 _ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
1068 },
1069 _ => {
1071 match self.parameter_validator {
1072 Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
1075 }
1076 }
1077 }
1078
1079 fn execute_parameter_set_actions(
1081 &self,
1082 name: &str,
1083 value: &ParameterValue,
1084 ) -> SetParametersResult {
1085 match name {
1086 "use_sim_time" => match value {
1087 ParameterValue::Boolean(s) => {
1088 self.use_sim_time.store(*s, Ordering::SeqCst);
1089 Ok(())
1090 }
1091 _ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
1092 },
1093 _ => {
1094 match self.parameter_set_action {
1095 Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
1098 }
1099 }
1100 }
1101
1102 pub fn status_receiver(&self) -> Receiver<NodeEvent> {
1109 if self.have_spinner() {
1110 let (status_event_sender, status_event_receiver) = async_channel::bounded(8);
1111 self
1112 .status_event_senders
1113 .lock()
1114 .unwrap()
1115 .push(status_event_sender);
1116 status_event_receiver
1117 } else {
1118 panic!("status_receiver() cannot set up a receiver, because no Spinner is running.")
1119 }
1120 }
1121
1122 pub(crate) fn wait_for_writer(&self, reader: GUID) -> impl Future<Output = ()> {
1124 let status_receiver = self.status_receiver();
1126
1127 let already_present = self
1128 .readers_to_remote_writers
1129 .lock()
1130 .unwrap()
1131 .get(&reader)
1132 .map(|writers| !writers.is_empty()) .unwrap_or(false); if already_present {
1136 WriterWait::Ready
1137 } else {
1138 WriterWait::Wait {
1139 this_reader: reader,
1140 status_event_stream: Box::pin(status_receiver),
1141 }
1142 }
1143 }
1144
1145 pub(crate) fn wait_for_reader(&self, writer: GUID) -> impl Future<Output = ()> {
1146 let status_receiver = self.status_receiver();
1148
1149 let already_present = self
1150 .writers_to_remote_readers
1151 .lock()
1152 .unwrap()
1153 .get(&writer)
1154 .map(|readers| !readers.is_empty()) .unwrap_or(false); if already_present {
1161 info!("wait_for_reader: Already have matched a reader.");
1162 ReaderWait::Ready
1163 } else {
1164 ReaderWait::Wait {
1165 this_writer: writer,
1166 status_event_stream: Box::pin(status_receiver),
1167 }
1168 }
1169 }
1170
1171 pub(crate) fn get_publisher_count(&self, subscription_guid: GUID) -> usize {
1172 self
1173 .readers_to_remote_writers
1174 .lock()
1175 .unwrap()
1176 .get(&subscription_guid)
1177 .map(BTreeSet::len)
1178 .unwrap_or_else(|| {
1179 error!("get_publisher_count: Subscriber {subscription_guid:?} not known to node.");
1180 0
1181 })
1182 }
1183
1184 pub(crate) fn get_subscription_count(&self, publisher_guid: GUID) -> usize {
1185 self
1186 .writers_to_remote_readers
1187 .lock()
1188 .unwrap()
1189 .get(&publisher_guid)
1190 .map(BTreeSet::len)
1191 .unwrap_or_else(|| {
1192 error!("get_subscription_count: Publisher {publisher_guid:?} not known to node.");
1193 0
1194 })
1195 }
1196
1197 pub fn rosout_subscription(&self) -> Option<&Subscription<Log>> {
1201 self.rosout_reader.as_ref()
1202 }
1203
1204 pub fn create_topic(
1233 &self,
1234 topic_name: &Name,
1235 type_name: MessageTypeName,
1236 qos: &QosPolicies,
1237 ) -> CreateResult<Topic> {
1238 let dds_name = topic_name.to_dds_name("rt", &self.node_name, "");
1239 self.ros_context.create_topic(dds_name, type_name, qos)
1240 }
1241
1242 pub fn create_subscription<D: 'static>(
1250 &mut self,
1251 topic: &Topic,
1252 qos: Option<QosPolicies>,
1253 ) -> CreateResult<Subscription<D>> {
1254 let sub = self.ros_context.create_subscription(topic, qos)?;
1255 self.add_reader(sub.guid().into());
1256 Ok(sub)
1257 }
1258
1259 pub fn create_publisher<D: Serialize>(
1268 &mut self,
1269 topic: &Topic,
1270 qos: Option<QosPolicies>,
1271 ) -> CreateResult<Publisher<D>> {
1272 let p = self.ros_context.create_publisher(topic, qos)?;
1273 self.add_writer(p.guid().into());
1274 Ok(p)
1275 }
1276
1277 pub(crate) fn create_simpledatareader<D, DA>(
1278 &mut self,
1279 topic: &Topic,
1280 qos: Option<QosPolicies>,
1281 ) -> CreateResult<no_key::SimpleDataReader<D, DA>>
1282 where
1283 D: 'static,
1284 DA: rustdds::no_key::DeserializerAdapter<D> + 'static,
1285 {
1286 let r = self.ros_context.create_simpledatareader(topic, qos)?;
1287 self.add_reader(r.guid().into());
1288 Ok(r)
1289 }
1290
1291 pub(crate) fn create_datawriter<D, SA>(
1292 &mut self,
1293 topic: &Topic,
1294 qos: Option<QosPolicies>,
1295 ) -> CreateResult<no_key::DataWriter<D, SA>>
1296 where
1297 SA: rustdds::no_key::SerializerAdapter<D>,
1298 {
1299 let w = self.ros_context.create_datawriter(topic, qos)?;
1300 self.add_writer(w.guid().into());
1301 Ok(w)
1302 }
1303
1304 pub fn create_client<S>(
1312 &mut self,
1313 service_mapping: ServiceMapping,
1314 service_name: &Name,
1315 service_type_name: &ServiceTypeName,
1316 request_qos: QosPolicies,
1317 response_qos: QosPolicies,
1318 ) -> CreateResult<Client<S>>
1319 where
1320 S: Service + 'static,
1321 S::Request: Clone,
1322 {
1323 let rq_topic = self.ros_context.domain_participant().create_topic(
1329 service_name.to_dds_name("rq", &self.node_name, "Request"),
1330 service_type_name.dds_request_type(),
1332 &request_qos,
1333 TopicKind::NoKey,
1334 )?;
1335 let rs_topic = self.ros_context.domain_participant().create_topic(
1336 service_name.to_dds_name("rr", &self.node_name, "Reply"),
1337 service_type_name.dds_response_type(),
1339 &response_qos,
1340 TopicKind::NoKey,
1341 )?;
1342
1343 let c = Client::<S>::new(
1344 service_mapping,
1345 self,
1346 &rq_topic,
1347 &rs_topic,
1348 Some(request_qos),
1349 Some(response_qos),
1350 )?;
1351
1352 Ok(c)
1353 }
1354
1355 pub fn create_server<S>(
1364 &mut self,
1365 service_mapping: ServiceMapping,
1366 service_name: &Name,
1367 service_type_name: &ServiceTypeName,
1368 request_qos: QosPolicies,
1369 response_qos: QosPolicies,
1370 ) -> CreateResult<Server<S>>
1371 where
1372 S: Service + 'static,
1373 S::Request: Clone,
1374 {
1375 let rq_topic = self.ros_context.domain_participant().create_topic(
1381 service_name.to_dds_name("rq", &self.node_name, "Request"),
1383 service_type_name.dds_request_type(),
1384 &request_qos,
1385 TopicKind::NoKey,
1386 )?;
1387 let rs_topic = self.ros_context.domain_participant().create_topic(
1388 service_name.to_dds_name("rr", &self.node_name, "Reply"),
1389 service_type_name.dds_response_type(),
1390 &response_qos,
1391 TopicKind::NoKey,
1392 )?;
1393
1394 let s = Server::<S>::new(
1395 service_mapping,
1396 self,
1397 &rq_topic,
1398 &rs_topic,
1399 Some(request_qos),
1400 Some(response_qos),
1401 )?;
1402
1403 Ok(s)
1404 }
1405
1406 pub fn create_action_client<A>(
1407 &mut self,
1408 service_mapping: ServiceMapping,
1409 action_name: &Name,
1410 action_type_name: &ActionTypeName,
1411 action_qos: ActionClientQosPolicies,
1412 ) -> CreateResult<ActionClient<A>>
1413 where
1414 A: ActionTypes + 'static,
1415 {
1416 let services_base_name = action_name.push("_action");
1419
1420 let goal_service_type = action_type_name.dds_action_service("_SendGoal");
1422 let my_goal_client = self.create_client(
1423 service_mapping,
1424 &services_base_name.push("send_goal"),
1426 &goal_service_type,
1427 action_qos.goal_service.clone(),
1428 action_qos.goal_service,
1429 )?;
1430
1431 let cancel_goal_type = ServiceTypeName::new("action_msgs", "CancelGoal");
1433 let my_cancel_client = self.create_client(
1434 service_mapping,
1435 &services_base_name.push("cancel_goal"),
1437 &cancel_goal_type,
1438 action_qos.cancel_service.clone(),
1439 action_qos.cancel_service,
1440 )?;
1441
1442 let result_service_type = action_type_name.dds_action_service("_GetResult");
1444 let my_result_client = self.create_client(
1445 service_mapping,
1446 &services_base_name.push("get_result"),
1448 &result_service_type,
1449 action_qos.result_service.clone(),
1450 action_qos.result_service,
1451 )?;
1452
1453 let action_topic_namespace = action_name.push("_action");
1454
1455 let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
1456 let feedback_topic = self.create_topic(
1457 &action_topic_namespace.push("feedback"),
1458 feedback_topic_type,
1459 &action_qos.feedback_subscription,
1460 )?;
1461 let my_feedback_subscription =
1462 self.create_subscription(&feedback_topic, Some(action_qos.feedback_subscription))?;
1463
1464 let status_topic = self.create_topic(
1466 &action_topic_namespace.push("status"),
1467 MessageTypeName::new("action_msgs", "GoalStatusArray"),
1468 &action_qos.status_subscription,
1469 )?;
1470 let my_status_subscription =
1471 self.create_subscription(&status_topic, Some(action_qos.status_subscription))?;
1472
1473 Ok(ActionClient {
1474 my_goal_client,
1475 my_cancel_client,
1476 my_result_client,
1477 my_feedback_subscription,
1478 my_status_subscription,
1479 my_action_name: action_name.clone(),
1480 })
1481 }
1482
1483 pub fn create_action_server<A>(
1484 &mut self,
1485 service_mapping: ServiceMapping,
1486 action_name: &Name,
1487 action_type_name: &ActionTypeName,
1488 action_qos: ActionServerQosPolicies,
1489 ) -> CreateResult<ActionServer<A>>
1490 where
1491 A: ActionTypes + 'static,
1492 {
1493 let services_base_name = action_name.push("_action");
1494
1495 let goal_service_type = action_type_name.dds_action_service("_SendGoal");
1497 let my_goal_server = self.create_server(
1498 service_mapping,
1499 &services_base_name.push("send_goal"),
1501 &goal_service_type,
1502 action_qos.goal_service.clone(),
1503 action_qos.goal_service,
1504 )?;
1505
1506 let cancel_service_type = ServiceTypeName::new("action_msgs", "CancelGoal");
1508 let my_cancel_server = self.create_server(
1509 service_mapping,
1510 &services_base_name.push("cancel_goal"),
1512 &cancel_service_type,
1513 action_qos.cancel_service.clone(),
1514 action_qos.cancel_service,
1515 )?;
1516
1517 let result_service_type = action_type_name.dds_action_service("_GetResult");
1519 let my_result_server = self.create_server(
1520 service_mapping,
1521 &services_base_name.push("get_result"),
1523 &result_service_type,
1524 action_qos.result_service.clone(),
1525 action_qos.result_service,
1526 )?;
1527
1528 let action_topic_namespace = action_name.push("_action");
1529
1530 let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
1531 let feedback_topic = self.create_topic(
1532 &action_topic_namespace.push("feedback"),
1533 feedback_topic_type,
1534 &action_qos.feedback_publisher,
1535 )?;
1536 let my_feedback_publisher =
1537 self.create_publisher(&feedback_topic, Some(action_qos.feedback_publisher))?;
1538
1539 let status_topic_type = MessageTypeName::new("action_msgs", "GoalStatusArray");
1540 let status_topic = self.create_topic(
1541 &action_topic_namespace.push("status"),
1542 status_topic_type,
1543 &action_qos.status_publisher,
1544 )?;
1545 let my_status_publisher =
1546 self.create_publisher(&status_topic, Some(action_qos.status_publisher))?;
1547
1548 Ok(ActionServer {
1549 my_goal_server,
1550 my_cancel_server,
1551 my_result_server,
1552 my_feedback_publisher,
1553 my_status_publisher,
1554 my_action_name: action_name.clone(),
1555 })
1556 }
1557
1558 pub fn logging_handle(&self) -> NodeLoggingHandle {
1562 NodeLoggingHandle {
1563 rosout_writer: Arc::clone(&self.rosout_writer),
1564 base_name: self.node_name.base_name().to_string(),
1565 }
1566 }
1567} impl Drop for Node {
1570 fn drop(&mut self) {
1571 if let Some(ref stop_spin_sender) = self.stop_spin_sender {
1572 stop_spin_sender
1573 .try_send(())
1574 .unwrap_or_else(|e| error!("Cannot notify spin task to stop: {e:?}"));
1575 }
1576
1577 self
1578 .ros_context
1579 .remove_node(self.fully_qualified_name().as_str());
1580 }
1581}
1582
1583impl RosoutRaw for Node {
1584 fn rosout_writer(&self) -> Arc<Option<Publisher<Log>>> {
1585 Arc::clone(&self.rosout_writer)
1586 }
1587
1588 fn base_name(&self) -> &str {
1589 self.node_name.base_name()
1590 }
1591}
1592
1593#[macro_export]
1612macro_rules! rosout {
1613 ($node:expr, $lvl:expr, $($arg:tt)+) => (
1614 $crate::rosout::RosoutRaw::rosout_raw(
1615 &$node,
1616 $crate::ros2::Timestamp::now(),
1617 $lvl,
1618 $crate::rosout::RosoutRaw::base_name(&$node),
1619 &std::format!($($arg)+), std::file!(),
1621 "<unknown_func>", std::line!(),
1623 );
1624 );
1625}
1626
1627#[must_use = "futures do nothing unless you `.await` or poll them"]
1635pub enum ReaderWait<'a> {
1636 Wait {
1638 this_writer: GUID, status_event_stream: stream::BoxStream<'a, NodeEvent>,
1640 },
1641 Ready,
1643}
1644
1645impl Future for ReaderWait<'_> {
1646 type Output = ();
1647
1648 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1649 match *self {
1650 ReaderWait::Ready => Poll::Ready(()),
1651
1652 ReaderWait::Wait {
1653 this_writer,
1654 ref mut status_event_stream,
1655 } => {
1656 debug!("wait_for_reader: Waiting for a reader.");
1657 loop {
1658 match status_event_stream.poll_next_unpin(cx) {
1659 Poll::Ready(Some(NodeEvent::DDS(
1661 DomainParticipantStatusEvent::RemoteReaderMatched {
1662 local_writer,
1663 remote_reader,
1664 },
1665 )))
1666 if local_writer == this_writer =>
1667 {
1668 debug!("wait_for_reader: Matched remote reader {remote_reader:?}.");
1669 return Poll::Ready(());
1670 }
1671
1672 Poll::Ready(_) => {
1673 debug!("wait_for_reader: other event. Continue polling.");
1675 }
1677
1678 Poll::Pending => return Poll::Pending,
1679 }
1680 }
1681 }
1682 }
1683 }
1684}
1685
1686#[must_use = "futures do nothing unless you `.await` or poll them"]
1693pub enum WriterWait<'a> {
1694 Wait {
1696 this_reader: GUID,
1697 status_event_stream: stream::BoxStream<'a, NodeEvent>,
1698 },
1699 Ready,
1701}
1702
1703impl Future for WriterWait<'_> {
1704 type Output = ();
1705
1706 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1707 match *self {
1708 WriterWait::Ready => Poll::Ready(()),
1709
1710 WriterWait::Wait {
1711 this_reader,
1712 ref mut status_event_stream,
1713 } => {
1714 debug!("wait_for_writer: Waiting for a writer.");
1715 loop {
1716 match status_event_stream.poll_next_unpin(cx) {
1720 Poll::Ready(Some(NodeEvent::DDS(
1722 DomainParticipantStatusEvent::RemoteWriterMatched {
1723 local_reader,
1724 remote_writer,
1725 },
1726 )))
1727 if local_reader == this_reader =>
1728 {
1729 debug!("wait_for_writer: Matched remote writer {remote_writer:?}.");
1730 return Poll::Ready(());
1731 }
1732
1733 Poll::Ready(_) => {
1734 trace!("=== other writer. Continue polling.");
1736 }
1738
1739 Poll::Pending => return Poll::Pending,
1740 }
1741 }
1742 }
1743 }
1744 }
1745}
1746
1747#[cfg(test)]
1748mod tests {
1749 use crate::Context;
1750 use super::{Node, NodeName, NodeOptions};
1751
1752 #[test]
1753 fn node_is_sync() {
1754 let node = Node::new(
1755 NodeName::new("/", "base_name").unwrap(),
1756 NodeOptions::new(),
1757 Context::new().unwrap(),
1758 )
1759 .unwrap();
1760
1761 fn requires_send_sync<T: Send + Sync>(_t: T) {}
1762 requires_send_sync(node);
1763 }
1764}