1use std::{
2 collections::{BTreeMap, BTreeSet},
3 error::Error,
4 fmt,
5 pin::Pin,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc, Mutex,
9 },
10};
11
12use futures::{
13 pin_mut, stream, stream::FusedStream, task, task::Poll, Future, FutureExt, Stream, StreamExt,
14};
15use async_channel::Receiver;
16#[allow(unused_imports)]
17use log::{debug, error, info, trace, warn};
18use serde::Serialize;
19use rustdds::{
20 dds::{CreateError, CreateResult},
21 *,
22};
23
24use crate::{
25 action::*,
26 builtin_interfaces,
27 context::{Context, DEFAULT_SUBSCRIPTION_QOS},
28 entities_info::{NodeEntitiesInfo, ParticipantEntitiesInfo},
29 gid::Gid,
30 log::Log,
31 names::*,
32 parameters::*,
33 pubsub::{Publisher, Subscription},
34 rcl_interfaces,
35 ros_time::ROSTime,
36 rosout::{NodeLoggingHandle, RosoutRaw},
37 service::{Client, Server, Service, ServiceMapping},
38};
39
40type ParameterFunc = dyn Fn(&str, &ParameterValue) -> SetParametersResult + Send + Sync;
41#[must_use]
48pub struct NodeOptions {
49 #[allow(dead_code)]
50 cli_args: Vec<String>,
51 #[allow(dead_code)]
52 use_global_arguments: bool, enable_rosout: bool, enable_rosout_reading: bool,
55 start_parameter_services: bool,
56 declared_parameters: Vec<Parameter>,
57 allow_undeclared_parameters: bool,
58 parameter_validator: Option<Box<ParameterFunc>>,
59 parameter_set_action: Option<Box<ParameterFunc>>,
60}
61
62impl NodeOptions {
63 pub fn new() -> NodeOptions {
65 NodeOptions {
68 cli_args: Vec::new(),
69 use_global_arguments: true,
70 enable_rosout: true,
71 enable_rosout_reading: false,
72 start_parameter_services: true,
73 declared_parameters: Vec::new(),
74 allow_undeclared_parameters: false,
75 parameter_validator: None,
76 parameter_set_action: None,
77 }
78 }
79 pub fn enable_rosout(self, enable_rosout: bool) -> NodeOptions {
80 NodeOptions {
81 enable_rosout,
82 ..self
83 }
84 }
85
86 pub fn read_rosout(self, enable_rosout_reading: bool) -> NodeOptions {
87 NodeOptions {
88 enable_rosout_reading,
89 ..self
90 }
91 }
92
93 pub fn declare_parameter(mut self, name: &str, value: ParameterValue) -> NodeOptions {
94 self.declared_parameters.push(Parameter {
95 name: name.to_owned(),
96 value,
97 });
98 self
100 }
101
102 pub fn parameter_validator(mut self, validator: Box<ParameterFunc>) -> NodeOptions {
103 self.parameter_validator = Some(validator);
104 self
105 }
106
107 pub fn parameter_set_action(mut self, action: Box<ParameterFunc>) -> NodeOptions {
108 self.parameter_set_action = Some(action);
109 self
110 }
111}
112
113impl Default for NodeOptions {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118#[derive(Clone, Debug)]
123pub enum NodeEvent {
124 DDS(DomainParticipantStatusEvent),
125 ROS(ParticipantEntitiesInfo),
126}
127
128struct ParameterServers {
129 get_parameters_server: Server<rcl_interfaces::GetParametersService>,
130 get_parameter_types_server: Server<rcl_interfaces::GetParameterTypesService>,
131 list_parameters_server: Server<rcl_interfaces::ListParametersService>,
132 set_parameters_server: Server<rcl_interfaces::SetParametersService>,
133 set_parameters_atomically_server: Server<rcl_interfaces::SetParametersAtomicallyService>,
134 describe_parameters_server: Server<rcl_interfaces::DescribeParametersService>,
135}
136
137pub struct Spinner {
145 ros_context: Context,
146 stop_spin_receiver: async_channel::Receiver<()>,
147
148 readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
149 writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
150 external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
152 status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
154
155 use_sim_time: Arc<AtomicBool>,
156 sim_time: Arc<Mutex<ROSTime>>,
157 clock_topic: Topic,
158 allow_undeclared_parameters: bool,
159
160 parameter_servers: Option<ParameterServers>,
161 parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
162 parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
163 parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
164 parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
165 fully_qualified_node_name: String,
166}
167
168async fn next_if_some<S>(s: &mut Option<S>) -> S::Item
169where
170 S: Stream + Unpin + FusedStream,
171{
172 match s.as_mut() {
173 Some(stream) => stream.select_next_some().await,
174 None => std::future::pending().await,
175 }
176}
177
178impl Spinner {
179 pub async fn spin(self) -> CreateResult<()> {
180 info!("Starting Spinner for {}", &self.fully_qualified_node_name);
181 let dds_status_listener = self.ros_context.domain_participant().status_listener();
182 let dds_status_stream = dds_status_listener.as_async_status_stream();
183 pin_mut!(dds_status_stream);
184
185 let ros_discovery_topic = self.ros_context.ros_discovery_topic();
186 let ros_discovery_reader = self
187 .ros_context
188 .create_subscription::<ParticipantEntitiesInfo>(&ros_discovery_topic, None)?;
189 let ros_discovery_stream = ros_discovery_reader.async_stream();
190 pin_mut!(ros_discovery_stream);
191
192 let ros_clock_reader = self
193 .ros_context
194 .create_subscription::<builtin_interfaces::Time>(&self.clock_topic, None)?;
195 let ros_clock_stream = ros_clock_reader.async_stream();
196 pin_mut!(ros_clock_stream);
197
198 let mut get_parameters_stream_opt = self
200 .parameter_servers
201 .as_ref()
202 .map(|s| s.get_parameters_server.receive_request_stream());
203 let mut get_parameter_types_stream_opt = self
204 .parameter_servers
205 .as_ref()
206 .map(|s| s.get_parameter_types_server.receive_request_stream());
207 let mut set_parameters_stream_opt = self
208 .parameter_servers
209 .as_ref()
210 .map(|s| s.set_parameters_server.receive_request_stream());
211 let mut set_parameters_atomically_stream_opt = self
212 .parameter_servers
213 .as_ref()
214 .map(|s| s.set_parameters_atomically_server.receive_request_stream());
215 let mut list_parameter_stream_opt = self
216 .parameter_servers
217 .as_ref()
218 .map(|s| s.list_parameters_server.receive_request_stream());
219 let mut describe_parameters_stream_opt = self
220 .parameter_servers
221 .as_ref()
222 .map(|s| s.describe_parameters_server.receive_request_stream());
223
224 info!("Spinner {} initialized", &self.fully_qualified_node_name);
225
226 loop {
227 futures::select! {
228 _ = self.stop_spin_receiver.recv().fuse() => {
229 break;
230 }
231
232 clock_msg = ros_clock_stream.select_next_some() => {
233 match clock_msg {
234 Ok((time,_msg_info)) => {
235 *self.sim_time.lock().unwrap() = time.into();
238 }
239 Err(e) => warn!("Simulated clock receive error {e:?}")
240 }
241 }
242
243
244 get_parameters_request = next_if_some(&mut get_parameters_stream_opt).fuse() => {
245 match get_parameters_request {
246 Ok( (req_id, req) ) => {
247 info!("Get parameter request {req:?}");
248 let values = {
249 let param_db = self.parameters.lock().unwrap();
250 req.names.iter()
251 .map(|name| param_db.get(name.as_str())
252 .unwrap_or(&ParameterValue::NotSet))
253 .cloned()
254 .map( raw::ParameterValue::from)
255 .collect()
256 };
257 info!("Get parameters response: {values:?}");
258
259 self.parameter_servers.as_ref().unwrap().get_parameters_server
261 .async_send_response(req_id, rcl_interfaces::GetParametersResponse{ values })
262 .await
263 .unwrap_or_else(|e| warn!("GetParameter response error {e:?}"));
264 }
265 Err(e) => warn!("GetParameter request error {e:?}"),
266 }
267 }
268
269 get_parameter_types_request = next_if_some(&mut get_parameter_types_stream_opt).fuse() => {
270 match get_parameter_types_request {
271 Ok( (req_id, req) ) => {
272 warn!("Get parameter types request");
273 let values = {
274 let param_db = self.parameters.lock().unwrap();
275 req.names.iter()
276 .map(|name| param_db.get(name.as_str())
277 .unwrap_or(&ParameterValue::NotSet))
278 .map(ParameterValue::to_parameter_type_raw)
279 .collect()
280 };
281 info!("Get parameter types response: {values:?}");
282 self.parameter_servers.as_ref().unwrap().get_parameter_types_server
284 .async_send_response(req_id, rcl_interfaces::GetParameterTypesResponse{ values })
285 .await
286 .unwrap_or_else(|e| warn!("GetParameterTypes response error {e:?}"));
287 }
288 Err(e) => warn!("GetParameterTypes request error {e:?}"),
289 }
290 }
291
292 set_parameters_request = next_if_some(&mut set_parameters_stream_opt).fuse() => {
293 match set_parameters_request {
294 Ok( (req_id, req) ) => {
295 info!("Set parameter request {req:?}");
296 let results =
297 req.parameter.iter()
298 .cloned()
299 .map( Parameter::from ) .map( |Parameter{name, value}| self.set_parameter(&name,value))
301 .map(|r| r.into()) .collect();
303 info!("Set parameters response: {results:?}");
304 self.parameter_servers.as_ref().unwrap().set_parameters_server
306 .async_send_response(req_id, rcl_interfaces::SetParametersResponse{ results })
307 .await
308 .unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
309 }
310 Err(e) => warn!("SetParameters request error {e:?}"),
311 }
312 }
313
314 set_parameters_atomically_request = next_if_some(&mut set_parameters_atomically_stream_opt).fuse() => {
315 match set_parameters_atomically_request {
316 Ok( (req_id, req) ) => {
317 warn!("Set parameters atomically request {req:?}");
318 let results =
319 req.parameter.iter()
320 .cloned()
321 .map( Parameter::from ) .map( |Parameter{ .. } |
323 Err("Setting parameters atomically is not implemented.".to_owned())
325 )
326 .map(|r| r.into()) .collect();
328 warn!("Set parameters atomically response: {results:?}");
329 self.parameter_servers.as_ref().unwrap().set_parameters_atomically_server
331 .async_send_response(req_id, rcl_interfaces::SetParametersAtomicallyResponse{ results })
332 .await
333 .unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
334 }
335 Err(e) => warn!("SetParametersAtomically request error {e:?}"),
336 }
337 }
338
339 list_parameter_request = next_if_some(&mut list_parameter_stream_opt).fuse() => {
340 match list_parameter_request {
341 Ok( (req_id, req) ) => {
342 info!("List parameters request");
343 let prefixes = req.prefixes;
344 let names = {
347 let param_db = self.parameters.lock().unwrap();
348 param_db.keys()
349 .filter_map(|name|
350 if prefixes.is_empty() ||
351 prefixes.iter().any(|prefix| name.starts_with(prefix))
352 {
353 Some(name.clone())
354 } else { None }
355 )
356 .collect()
357 };
358 let result = rcl_interfaces::ListParametersResult{ names, prefixes: vec![] };
359 info!("List parameters response: {result:?}");
361 self.parameter_servers.as_ref().unwrap().list_parameters_server
362 .async_send_response(req_id, rcl_interfaces::ListParametersResponse{ result })
363 .await
364 .unwrap_or_else(|e| warn!("ListParameter response error {e:?}"));
365 }
366 Err(e) => warn!("ListParameter request error {e:?}"),
367 }
368 }
369
370 describe_parameters_request = next_if_some(&mut describe_parameters_stream_opt).fuse() => {
371 match describe_parameters_request {
372 Ok( (req_id, req) ) => {
373 info!("Describe parameters request {req:?}");
374 let values = {
375 let parameters = self.parameters.lock().unwrap();
376 req.names.iter()
377 .map( |name|
378 {
379 if let Some(value) = parameters.get(name) {
380 ParameterDescriptor::from_value(name, value)
381 } else {
382 ParameterDescriptor::unknown(name)
383 }
384 })
385 .map(|r| r.into()) .collect()
387 };
388 info!("Describe parameters response: {values:?}");
389 self.parameter_servers.as_ref().unwrap().describe_parameters_server
391 .async_send_response(req_id, rcl_interfaces::DescribeParametersResponse{ values })
392 .await
393 .unwrap_or_else(|e| warn!("DescribeParameters response error {e:?}"));
394 }
395 Err(e) => warn!("DescribeParameters request error {e:?}"),
396 }
397 }
398
399 participant_info_update = ros_discovery_stream.select_next_some() => {
400 match participant_info_update {
402 Ok((part_update, _msg_info)) => {
403 let mut info_map = self.external_nodes.lock().unwrap();
405 info_map.insert( part_update.gid, part_update.node_entities_info_seq.clone());
406 self.send_status_event( &NodeEvent::ROS(part_update) );
408 }
409 Err(e) => {
410 warn!("ros_discovery_info error {e:?}");
411 }
412 }
413 }
414
415 dp_status_event = dds_status_stream.select_next_some() => {
416 match dp_status_event {
420 DomainParticipantStatusEvent::RemoteReaderMatched { local_writer, remote_reader } => {
421 self.writers_to_remote_readers.lock().unwrap()
422 .entry(local_writer)
423 .and_modify(|s| {s.insert(remote_reader);} )
424 .or_insert(BTreeSet::from([remote_reader]));
425 }
426 DomainParticipantStatusEvent::RemoteWriterMatched { local_reader, remote_writer } => {
427 self.readers_to_remote_writers.lock().unwrap()
428 .entry(local_reader)
429 .and_modify(|s| {s.insert(remote_writer);} )
430 .or_insert(BTreeSet::from([remote_writer]));
431 }
432 DomainParticipantStatusEvent::ReaderLost {guid, ..} => {
433 for ( _local, readers)
434 in self.writers_to_remote_readers.lock().unwrap().iter_mut() {
435 readers.remove(&guid);
436 }
437 }
438 DomainParticipantStatusEvent::WriterLost {guid, ..} => {
439 for ( _local, writers)
440 in self.readers_to_remote_writers.lock().unwrap().iter_mut() {
441 writers.remove(&guid);
442 }
443 }
444
445 _ => {}
446 }
447
448 self.send_status_event( &NodeEvent::DDS(dp_status_event) );
450 }
451 }
452 }
453 info!("Spinner {} exiting .spin()", &self.fully_qualified_node_name );
454 Ok(())
455 } fn send_status_event(&self, event: &NodeEvent) {
459 let mut closed = Vec::new();
460 let mut sender_array = self.status_event_senders.lock().unwrap();
461 for (i, sender) in sender_array.iter().enumerate() {
462 match sender.try_send(event.clone()) {
463 Ok(()) => {
464 }
466 Err(async_channel::TrySendError::Closed(_)) => {
467 closed.push(i) }
470 Err(e) => {
471 debug!("send_status_event: Send error for {i}: {e:?}");
472 }
475 }
476 }
477
478 for c in closed.iter().rev() {
480 sender_array.swap_remove(*c);
481 }
482 }
483
484 fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
486 match name {
487 "use_sim_time" => match value {
489 ParameterValue::Boolean(_) => Ok(()),
490 _ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
491 },
492 _ => {
494 match self.parameter_validator {
495 Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
498 }
499 }
500 }
501
502 fn execute_parameter_set_actions(
504 &self,
505 name: &str,
506 value: &ParameterValue,
507 ) -> SetParametersResult {
508 match name {
509 "use_sim_time" => match value {
510 ParameterValue::Boolean(s) => {
511 self.use_sim_time.store(*s, Ordering::SeqCst);
512 Ok(())
513 }
514 _ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
515 },
516 _ => {
517 match self.parameter_set_action {
518 Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
521 }
522 }
523 }
524
525 pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
528 let already_set = self.parameters.lock().unwrap().contains_key(name);
529 if self.allow_undeclared_parameters || already_set {
530 self.validate_parameter_on_set(name, &value)?;
531 self.execute_parameter_set_actions(name, &value)?;
532
533 let p = raw::Parameter {
535 name: name.to_string(),
536 value: value.clone().into(),
537 };
538 let (new_parameters, changed_parameters) = if already_set {
539 (vec![], vec![p])
540 } else {
541 (vec![p], vec![])
542 };
543
544 self
546 .parameters
547 .lock()
548 .unwrap()
549 .insert(name.to_owned(), value);
550 self
552 .parameter_events_writer
553 .publish(raw::ParameterEvent {
554 timestamp: rustdds::Timestamp::now(), node: self.fully_qualified_node_name.clone(),
556 new_parameters,
557 changed_parameters,
558 deleted_parameters: vec![],
559 })
560 .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
561 Ok(())
562 } else {
563 Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
564 }
565 }
566} #[derive(Debug)]
573pub enum NodeCreateError {
574 DDS(CreateError),
575 BadParameter(String),
576}
577
578impl From<CreateError> for NodeCreateError {
579 fn from(c: CreateError) -> NodeCreateError {
580 NodeCreateError::DDS(c)
581 }
582}
583
584impl fmt::Display for NodeCreateError {
585 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
586 match self {
587 Self::DDS(create_error) => write!(f, "NodeCreateError::DDS : {create_error}"),
588 Self::BadParameter(s) => write!(f, "NodeCreateError::BadParameter : {s}"),
589 }
590 }
591}
592
593impl Error for NodeCreateError {
594 fn source(&self) -> Option<&(dyn Error + 'static)> {
595 match self {
596 Self::DDS(create_error) => Some(create_error),
597 Self::BadParameter(_) => None,
598 }
599 }
600}
601
602pub enum ParameterError {
604 AlreadyDeclared,
605 InvalidName,
606}
607
608pub struct Node {
622 node_name: NodeName,
623 options: NodeOptions,
624
625 pub(crate) ros_context: Context,
626
627 readers: BTreeSet<Gid>,
630 writers: BTreeSet<Gid>,
631
632 suppress_node_info_updates: Arc<AtomicBool>,
633 readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
640 writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
641
642 external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
644 stop_spin_sender: Option<async_channel::Sender<()>>,
645
646 status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
648
649 rosout_writer: Arc<Option<Publisher<Log>>>,
651 rosout_reader: Option<Subscription<Log>>,
652
653 parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
656
657 parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
659 parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
661 parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
662
663 use_sim_time: Arc<AtomicBool>,
665 sim_time: Arc<Mutex<ROSTime>>,
666}
667
668impl Node {
669 pub(crate) fn new(
670 node_name: NodeName,
671 mut options: NodeOptions,
672 ros_context: Context,
673 ) -> Result<Node, NodeCreateError> {
674 let paramtopic = ros_context.get_parameter_events_topic();
675 let rosout_topic = ros_context.get_rosout_topic();
676
677 let enable_rosout = options.enable_rosout;
678 let rosout_reader = options.enable_rosout_reading;
679
680 let parameter_events_writer = ros_context.create_publisher(¶mtopic, None)?;
681
682 options.declared_parameters.push(Parameter {
685 name: "use_sim_time".to_string(),
686 value: ParameterValue::Boolean(false),
687 });
688 let parameters = options
689 .declared_parameters
690 .iter()
691 .cloned()
692 .map(|Parameter { name, value }| (name, value))
693 .collect::<BTreeMap<String, ParameterValue>>();
694
695 let parameter_validator = options
696 .parameter_validator
697 .take()
698 .map(|b| Arc::new(Mutex::new(b)));
699 let parameter_set_action = options
700 .parameter_set_action
701 .take()
702 .map(|b| Arc::new(Mutex::new(b)));
703
704 let mut node = Node {
705 node_name,
706 options,
707 ros_context,
708 readers: BTreeSet::new(),
709 writers: BTreeSet::new(),
710 readers_to_remote_writers: Arc::new(Mutex::new(BTreeMap::new())),
711 writers_to_remote_readers: Arc::new(Mutex::new(BTreeMap::new())),
712 external_nodes: Arc::new(Mutex::new(BTreeMap::new())),
713 suppress_node_info_updates: Arc::new(AtomicBool::new(false)),
714 stop_spin_sender: None,
715 status_event_senders: Arc::new(Mutex::new(Vec::new())),
716 rosout_writer: Arc::new(None), rosout_reader: None,
718 parameter_events_writer: Arc::new(parameter_events_writer),
719 parameters: Arc::new(Mutex::new(parameters)),
720 parameter_validator,
721 parameter_set_action,
722 use_sim_time: Arc::new(AtomicBool::new(false)),
723 sim_time: Arc::new(Mutex::new(ROSTime::ZERO)),
724 };
725
726 node.suppress_node_info_updates(true);
727
728 node.rosout_writer = if enable_rosout {
729 Arc::new(Some(
730 node.create_publisher(&rosout_topic, None)?,
732 ))
733 } else {
734 Arc::new(None) };
736 node.rosout_reader = if rosout_reader {
737 Some(node.create_subscription(&rosout_topic, None)?)
738 } else {
739 None
740 };
741
742 node
744 .parameters
745 .lock()
746 .unwrap()
747 .iter()
748 .try_for_each(|(name, value)| {
749 node.validate_parameter_on_set(name, value)?;
750 node.execute_parameter_set_actions(name, value)?;
751 Ok(())
752 })
753 .map_err(NodeCreateError::BadParameter)?;
754
755 node.suppress_node_info_updates(false);
756
757 Ok(node)
758 }
759
760 pub fn time_now(&self) -> ROSTime {
764 if self.use_sim_time.load(Ordering::SeqCst) {
765 *self.sim_time.lock().unwrap()
766 } else {
767 ROSTime::now()
768 }
769 }
770
771 pub fn time_now_not_simulated(&self) -> ROSTime {
772 ROSTime::now()
773 }
774
775 pub fn spinner(&mut self) -> CreateResult<Spinner> {
784 if self.stop_spin_sender.is_some() {
785 panic!("Attempted to crate a second spinner.");
786 }
787 let (stop_spin_sender, stop_spin_receiver) = async_channel::bounded(1);
788 self.stop_spin_sender = Some(stop_spin_sender);
789
790 let service_qos = QosPolicyBuilder::new()
792 .reliability(policy::Reliability::Reliable {
793 max_blocking_time: Duration::from_millis(100),
794 })
795 .history(policy::History::KeepLast { depth: 1 })
796 .build();
797
798 let node_name = self.node_name.fully_qualified_name();
799
800 self.suppress_node_info_updates(true);
801
802 let parameter_servers = if self.options.start_parameter_services {
803 let service_mapping = ServiceMapping::Enhanced; let get_parameters_server = self.create_server(
805 service_mapping,
806 &Name::new(&node_name, "get_parameters").unwrap(),
807 &ServiceTypeName::new("rcl_interfaces", "GetParameters"),
808 service_qos.clone(),
809 service_qos.clone(),
810 )?;
811 let get_parameter_types_server = self.create_server(
812 service_mapping,
813 &Name::new(&node_name, "get_parameter_types").unwrap(),
814 &ServiceTypeName::new("rcl_interfaces", "GetParameterTypes"),
815 service_qos.clone(),
816 service_qos.clone(),
817 )?;
818 let set_parameters_server = self.create_server(
819 service_mapping,
820 &Name::new(&node_name, "set_parameters").unwrap(),
821 &ServiceTypeName::new("rcl_interfaces", "SetParameters"),
822 service_qos.clone(),
823 service_qos.clone(),
824 )?;
825 let set_parameters_atomically_server = self.create_server(
826 service_mapping,
827 &Name::new(&node_name, "set_parameters_atomically").unwrap(),
828 &ServiceTypeName::new("rcl_interfaces", "SetParametersAtomically"),
829 service_qos.clone(),
830 service_qos.clone(),
831 )?;
832 let list_parameters_server = self.create_server(
833 service_mapping,
834 &Name::new(&node_name, "list_parameters").unwrap(),
835 &ServiceTypeName::new("rcl_interfaces", "ListParameters"),
836 service_qos.clone(),
837 service_qos.clone(),
838 )?;
839 let describe_parameters_server = self.create_server(
840 service_mapping,
841 &Name::new(&node_name, "describe_parameters").unwrap(),
842 &ServiceTypeName::new("rcl_interfaces", "DescribeParameters"),
843 service_qos.clone(),
844 service_qos.clone(),
845 )?;
846
847 Some(ParameterServers {
848 get_parameters_server,
849 get_parameter_types_server,
850 list_parameters_server,
851 set_parameters_server,
852 set_parameters_atomically_server,
853 describe_parameters_server,
854 })
855 } else {
856 None };
858
859 let clock_topic = self.create_topic(
860 &Name::new("/", "clock").unwrap(),
861 MessageTypeName::new("builtin_interfaces", "Time"),
862 &DEFAULT_SUBSCRIPTION_QOS,
863 )?;
864
865 self.suppress_node_info_updates(false);
866
867 Ok(Spinner {
868 ros_context: self.ros_context.clone(),
869 stop_spin_receiver,
870 readers_to_remote_writers: Arc::clone(&self.readers_to_remote_writers),
871 writers_to_remote_readers: Arc::clone(&self.writers_to_remote_readers),
872 external_nodes: Arc::clone(&self.external_nodes),
873 status_event_senders: Arc::clone(&self.status_event_senders),
874 use_sim_time: Arc::clone(&self.use_sim_time),
875 sim_time: Arc::clone(&self.sim_time),
876 clock_topic,
877 parameter_servers,
878 parameter_events_writer: Arc::clone(&self.parameter_events_writer),
879 parameters: Arc::clone(&self.parameters),
880 allow_undeclared_parameters: self.options.allow_undeclared_parameters,
881 parameter_validator: self.parameter_validator.as_ref().map(Arc::clone),
882 parameter_set_action: self.parameter_set_action.as_ref().map(Arc::clone),
883 fully_qualified_node_name: self.fully_qualified_name(),
884 })
885 }
886
887 pub fn have_spinner(&self) -> bool {
892 self.stop_spin_sender.is_some()
893 }
894
895 fn generate_node_info(&self) -> NodeEntitiesInfo {
897 let mut node_info = NodeEntitiesInfo::new(self.node_name.clone());
898
899 node_info.add_writer(Gid::from(self.parameter_events_writer.guid()));
900 if let Some(ref row) = *self.rosout_writer {
901 node_info.add_writer(Gid::from(row.guid()));
902 }
903
904 for reader in &self.readers {
905 node_info.add_reader(*reader);
906 }
907
908 for writer in &self.writers {
909 node_info.add_writer(*writer);
910 }
911
912 node_info
913 }
914
915 fn suppress_node_info_updates(&mut self, suppress: bool) {
916 self
917 .suppress_node_info_updates
918 .store(suppress, Ordering::SeqCst);
919
920 if !suppress {
922 self.ros_context.update_node(self.generate_node_info());
923 }
924 }
925
926 fn add_reader(&mut self, reader: Gid) {
927 self.readers.insert(reader);
928 if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
929 self.ros_context.update_node(self.generate_node_info());
930 }
931 }
932
933 fn add_writer(&mut self, writer: Gid) {
934 self.writers.insert(writer);
935 if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
936 self.ros_context.update_node(self.generate_node_info());
937 }
938 }
939
940 pub fn namespace(&self) -> &str {
941 self.node_name.namespace()
942 }
943
944 pub fn fully_qualified_name(&self) -> String {
945 self.node_name.fully_qualified_name()
946 }
947
948 pub fn options(&self) -> &NodeOptions {
949 &self.options
950 }
951
952 pub fn domain_id(&self) -> u16 {
953 self.ros_context.domain_id()
954 }
955
956 pub fn undeclare_parameter(&self, name: &str) {
960 let prev_value = self.parameters.lock().unwrap().remove(name);
961
962 if let Some(deleted_param) = prev_value {
963 self
965 .parameter_events_writer
966 .publish(raw::ParameterEvent {
967 timestamp: self.time_now().into(),
968 node: self.fully_qualified_name(),
969 new_parameters: vec![],
970 changed_parameters: vec![],
971 deleted_parameters: vec![raw::Parameter {
972 name: name.to_string(),
973 value: deleted_param.into(),
974 }],
975 })
976 .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
977 }
978 }
979
980 pub fn has_parameter(&self, name: &str) -> bool {
982 self.parameters.lock().unwrap().contains_key(name)
983 }
984
985 pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
996 let already_set = self.parameters.lock().unwrap().contains_key(name);
997 if self.options.allow_undeclared_parameters || already_set {
998 self.validate_parameter_on_set(name, &value)?;
999 self.execute_parameter_set_actions(name, &value)?;
1000
1001 let p = raw::Parameter {
1003 name: name.to_string(),
1004 value: value.clone().into(),
1005 };
1006 let (new_parameters, changed_parameters) = if already_set {
1007 (vec![], vec![p])
1008 } else {
1009 (vec![p], vec![])
1010 };
1011
1012 self
1014 .parameters
1015 .lock()
1016 .unwrap()
1017 .insert(name.to_owned(), value);
1018 self
1020 .parameter_events_writer
1021 .publish(raw::ParameterEvent {
1022 timestamp: self.time_now().into(),
1023 node: self.fully_qualified_name(),
1024 new_parameters,
1025 changed_parameters,
1026 deleted_parameters: vec![],
1027 })
1028 .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
1029 Ok(())
1030 } else {
1031 Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
1032 }
1033 }
1034
1035 pub fn allow_undeclared_parameters(&self) -> bool {
1036 self.options.allow_undeclared_parameters
1037 }
1038
1039 pub fn get_parameter(&self, name: &str) -> Option<ParameterValue> {
1041 self
1042 .parameters
1043 .lock()
1044 .unwrap()
1045 .get(name)
1046 .map(|p| p.to_owned())
1047 }
1048
1049 pub fn list_parameters(&self) -> Vec<String> {
1050 self
1051 .parameters
1052 .lock()
1053 .unwrap()
1054 .keys()
1055 .map(move |k| k.to_owned())
1056 .collect::<Vec<_>>()
1057 }
1058
1059 fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
1066 match name {
1067 "use_sim_time" => match value {
1069 ParameterValue::Boolean(_) => Ok(()),
1070 _ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
1071 },
1072 _ => {
1074 match self.parameter_validator {
1075 Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
1078 }
1079 }
1080 }
1081
1082 fn execute_parameter_set_actions(
1084 &self,
1085 name: &str,
1086 value: &ParameterValue,
1087 ) -> SetParametersResult {
1088 match name {
1089 "use_sim_time" => match value {
1090 ParameterValue::Boolean(s) => {
1091 self.use_sim_time.store(*s, Ordering::SeqCst);
1092 Ok(())
1093 }
1094 _ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
1095 },
1096 _ => {
1097 match self.parameter_set_action {
1098 Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
1101 }
1102 }
1103 }
1104
1105 pub fn status_receiver(&self) -> Receiver<NodeEvent> {
1112 if self.have_spinner() {
1113 let (status_event_sender, status_event_receiver) = async_channel::bounded(8);
1114 self
1115 .status_event_senders
1116 .lock()
1117 .unwrap()
1118 .push(status_event_sender);
1119 status_event_receiver
1120 } else {
1121 panic!("status_receiver() cannot set up a receiver, because no Spinner is running.")
1122 }
1123 }
1124
1125 pub(crate) fn wait_for_writer(&self, reader: GUID) -> impl Future<Output = ()> {
1127 let status_receiver = self.status_receiver();
1129
1130 let already_present = self
1131 .readers_to_remote_writers
1132 .lock()
1133 .unwrap()
1134 .get(&reader)
1135 .map(|writers| !writers.is_empty()) .unwrap_or(false); if already_present {
1139 WriterWait::Ready
1140 } else {
1141 WriterWait::Wait {
1142 this_reader: reader,
1143 status_event_stream: Box::pin(status_receiver),
1144 }
1145 }
1146 }
1147
1148 pub(crate) fn wait_for_reader(&self, writer: GUID) -> impl Future<Output = ()> {
1149 let status_receiver = self.status_receiver();
1151
1152 let already_present = self
1153 .writers_to_remote_readers
1154 .lock()
1155 .unwrap()
1156 .get(&writer)
1157 .map(|readers| !readers.is_empty()) .unwrap_or(false); if already_present {
1164 info!("wait_for_reader: Already have matched a reader.");
1165 ReaderWait::Ready
1166 } else {
1167 ReaderWait::Wait {
1168 this_writer: writer,
1169 status_event_stream: Box::pin(status_receiver),
1170 }
1171 }
1172 }
1173
1174 pub(crate) fn get_publisher_count(&self, subscription_guid: GUID) -> usize {
1175 self
1176 .readers_to_remote_writers
1177 .lock()
1178 .unwrap()
1179 .get(&subscription_guid)
1180 .map(BTreeSet::len)
1181 .unwrap_or_else(|| {
1182 error!("get_publisher_count: Subscriber {subscription_guid:?} not known to node.");
1183 0
1184 })
1185 }
1186
1187 pub(crate) fn get_subscription_count(&self, publisher_guid: GUID) -> usize {
1188 self
1189 .writers_to_remote_readers
1190 .lock()
1191 .unwrap()
1192 .get(&publisher_guid)
1193 .map(BTreeSet::len)
1194 .unwrap_or_else(|| {
1195 error!("get_subscription_count: Publisher {publisher_guid:?} not known to node.");
1196 0
1197 })
1198 }
1199
1200 pub fn rosout_subscription(&self) -> Option<&Subscription<Log>> {
1204 self.rosout_reader.as_ref()
1205 }
1206
1207 pub fn create_topic(
1236 &self,
1237 topic_name: &Name,
1238 type_name: MessageTypeName,
1239 qos: &QosPolicies,
1240 ) -> CreateResult<Topic> {
1241 let dds_name = topic_name.to_dds_name("rt", &self.node_name, "");
1242 self.ros_context.create_topic(dds_name, type_name, qos)
1243 }
1244
1245 pub fn create_subscription<D: 'static>(
1253 &mut self,
1254 topic: &Topic,
1255 qos: Option<QosPolicies>,
1256 ) -> CreateResult<Subscription<D>> {
1257 let sub = self.ros_context.create_subscription(topic, qos)?;
1258 self.add_reader(sub.guid().into());
1259 Ok(sub)
1260 }
1261
1262 pub fn create_publisher<D: Serialize>(
1271 &mut self,
1272 topic: &Topic,
1273 qos: Option<QosPolicies>,
1274 ) -> CreateResult<Publisher<D>> {
1275 let p = self.ros_context.create_publisher(topic, qos)?;
1276 self.add_writer(p.guid().into());
1277 Ok(p)
1278 }
1279
1280 pub(crate) fn create_simpledatareader<D, DA>(
1281 &mut self,
1282 topic: &Topic,
1283 qos: Option<QosPolicies>,
1284 ) -> CreateResult<no_key::SimpleDataReader<D, DA>>
1285 where
1286 D: 'static,
1287 DA: rustdds::no_key::DeserializerAdapter<D> + 'static,
1288 {
1289 let r = self.ros_context.create_simpledatareader(topic, qos)?;
1290 self.add_reader(r.guid().into());
1291 Ok(r)
1292 }
1293
1294 pub(crate) fn create_datawriter<D, SA>(
1295 &mut self,
1296 topic: &Topic,
1297 qos: Option<QosPolicies>,
1298 ) -> CreateResult<no_key::DataWriter<D, SA>>
1299 where
1300 SA: rustdds::no_key::SerializerAdapter<D>,
1301 {
1302 let w = self.ros_context.create_datawriter(topic, qos)?;
1303 self.add_writer(w.guid().into());
1304 Ok(w)
1305 }
1306
1307 pub fn create_client<S>(
1315 &mut self,
1316 service_mapping: ServiceMapping,
1317 service_name: &Name,
1318 service_type_name: &ServiceTypeName,
1319 request_qos: QosPolicies,
1320 response_qos: QosPolicies,
1321 ) -> CreateResult<Client<S>>
1322 where
1323 S: Service + 'static,
1324 S::Request: Clone,
1325 {
1326 let rq_topic = self.ros_context.domain_participant().create_topic(
1332 service_name.to_dds_name("rq", &self.node_name, "Request"),
1333 service_type_name.dds_request_type(),
1335 &request_qos,
1336 TopicKind::NoKey,
1337 )?;
1338 let rs_topic = self.ros_context.domain_participant().create_topic(
1339 service_name.to_dds_name("rr", &self.node_name, "Reply"),
1340 service_type_name.dds_response_type(),
1342 &response_qos,
1343 TopicKind::NoKey,
1344 )?;
1345
1346 let c = Client::<S>::new(
1347 service_mapping,
1348 self,
1349 &rq_topic,
1350 &rs_topic,
1351 Some(request_qos),
1352 Some(response_qos),
1353 )?;
1354
1355 Ok(c)
1356 }
1357
1358 pub fn create_server<S>(
1367 &mut self,
1368 service_mapping: ServiceMapping,
1369 service_name: &Name,
1370 service_type_name: &ServiceTypeName,
1371 request_qos: QosPolicies,
1372 response_qos: QosPolicies,
1373 ) -> CreateResult<Server<S>>
1374 where
1375 S: Service + 'static,
1376 S::Request: Clone,
1377 {
1378 let rq_topic = self.ros_context.domain_participant().create_topic(
1384 service_name.to_dds_name("rq", &self.node_name, "Request"),
1386 service_type_name.dds_request_type(),
1387 &request_qos,
1388 TopicKind::NoKey,
1389 )?;
1390 let rs_topic = self.ros_context.domain_participant().create_topic(
1391 service_name.to_dds_name("rr", &self.node_name, "Reply"),
1392 service_type_name.dds_response_type(),
1393 &response_qos,
1394 TopicKind::NoKey,
1395 )?;
1396
1397 let s = Server::<S>::new(
1398 service_mapping,
1399 self,
1400 &rq_topic,
1401 &rs_topic,
1402 Some(request_qos),
1403 Some(response_qos),
1404 )?;
1405
1406 Ok(s)
1407 }
1408
1409 pub fn create_action_client<A>(
1410 &mut self,
1411 service_mapping: ServiceMapping,
1412 action_name: &Name,
1413 action_type_name: &ActionTypeName,
1414 action_qos: ActionClientQosPolicies,
1415 ) -> CreateResult<ActionClient<A>>
1416 where
1417 A: ActionTypes + 'static,
1418 {
1419 let services_base_name = action_name.push("_action");
1422
1423 let goal_service_type = action_type_name.dds_action_service("_SendGoal");
1425 let my_goal_client = self.create_client(
1426 service_mapping,
1427 &services_base_name.push("send_goal"),
1429 &goal_service_type,
1430 action_qos.goal_service.clone(),
1431 action_qos.goal_service,
1432 )?;
1433
1434 let cancel_goal_type = ServiceTypeName::new("action_msgs", "CancelGoal");
1436 let my_cancel_client = self.create_client(
1437 service_mapping,
1438 &services_base_name.push("cancel_goal"),
1440 &cancel_goal_type,
1441 action_qos.cancel_service.clone(),
1442 action_qos.cancel_service,
1443 )?;
1444
1445 let result_service_type = action_type_name.dds_action_service("_GetResult");
1447 let my_result_client = self.create_client(
1448 service_mapping,
1449 &services_base_name.push("get_result"),
1451 &result_service_type,
1452 action_qos.result_service.clone(),
1453 action_qos.result_service,
1454 )?;
1455
1456 let action_topic_namespace = action_name.push("_action");
1457
1458 let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
1459 let feedback_topic = self.create_topic(
1460 &action_topic_namespace.push("feedback"),
1461 feedback_topic_type,
1462 &action_qos.feedback_subscription,
1463 )?;
1464 let my_feedback_subscription =
1465 self.create_subscription(&feedback_topic, Some(action_qos.feedback_subscription))?;
1466
1467 let status_topic = self.create_topic(
1469 &action_topic_namespace.push("status"),
1470 MessageTypeName::new("action_msgs", "GoalStatusArray"),
1471 &action_qos.status_subscription,
1472 )?;
1473 let my_status_subscription =
1474 self.create_subscription(&status_topic, Some(action_qos.status_subscription))?;
1475
1476 Ok(ActionClient {
1477 my_goal_client,
1478 my_cancel_client,
1479 my_result_client,
1480 my_feedback_subscription,
1481 my_status_subscription,
1482 my_action_name: action_name.clone(),
1483 })
1484 }
1485
1486 pub fn create_action_server<A>(
1487 &mut self,
1488 service_mapping: ServiceMapping,
1489 action_name: &Name,
1490 action_type_name: &ActionTypeName,
1491 action_qos: ActionServerQosPolicies,
1492 ) -> CreateResult<ActionServer<A>>
1493 where
1494 A: ActionTypes + 'static,
1495 {
1496 let services_base_name = action_name.push("_action");
1497
1498 let goal_service_type = action_type_name.dds_action_service("_SendGoal");
1500 let my_goal_server = self.create_server(
1501 service_mapping,
1502 &services_base_name.push("send_goal"),
1504 &goal_service_type,
1505 action_qos.goal_service.clone(),
1506 action_qos.goal_service,
1507 )?;
1508
1509 let cancel_service_type = ServiceTypeName::new("action_msgs", "CancelGoal");
1511 let my_cancel_server = self.create_server(
1512 service_mapping,
1513 &services_base_name.push("cancel_goal"),
1515 &cancel_service_type,
1516 action_qos.cancel_service.clone(),
1517 action_qos.cancel_service,
1518 )?;
1519
1520 let result_service_type = action_type_name.dds_action_service("_GetResult");
1522 let my_result_server = self.create_server(
1523 service_mapping,
1524 &services_base_name.push("get_result"),
1526 &result_service_type,
1527 action_qos.result_service.clone(),
1528 action_qos.result_service,
1529 )?;
1530
1531 let action_topic_namespace = action_name.push("_action");
1532
1533 let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
1534 let feedback_topic = self.create_topic(
1535 &action_topic_namespace.push("feedback"),
1536 feedback_topic_type,
1537 &action_qos.feedback_publisher,
1538 )?;
1539 let my_feedback_publisher =
1540 self.create_publisher(&feedback_topic, Some(action_qos.feedback_publisher))?;
1541
1542 let status_topic_type = MessageTypeName::new("action_msgs", "GoalStatusArray");
1543 let status_topic = self.create_topic(
1544 &action_topic_namespace.push("status"),
1545 status_topic_type,
1546 &action_qos.status_publisher,
1547 )?;
1548 let my_status_publisher =
1549 self.create_publisher(&status_topic, Some(action_qos.status_publisher))?;
1550
1551 Ok(ActionServer {
1552 my_goal_server,
1553 my_cancel_server,
1554 my_result_server,
1555 my_feedback_publisher,
1556 my_status_publisher,
1557 my_action_name: action_name.clone(),
1558 })
1559 }
1560
1561 pub fn logging_handle(&self) -> NodeLoggingHandle {
1565 NodeLoggingHandle {
1566 rosout_writer: Arc::clone(&self.rosout_writer),
1567 base_name: self.node_name.base_name().to_string(),
1568 }
1569 }
1570
1571 pub fn stop_spinner(&self) {
1572 info!("Signalling spinner to stop (manual)");
1573 if let Some(ref stop_spin_sender) = self.stop_spin_sender {
1574 stop_spin_sender
1575 .try_send(())
1576 .unwrap_or_else(|e| error!("Cannot notify spin task to stop: {e:?}"));
1577 }
1578 }
1579
1580} impl Drop for Node {
1583 fn drop(&mut self) {
1584 debug!("Signalling spinner to stop (.drop)");
1585 if let Some(ref stop_spin_sender) = self.stop_spin_sender {
1586 stop_spin_sender
1587 .try_send(())
1588 .unwrap_or_else(|e| error!("Cannot notify spin task to stop: {e:?}"));
1589 }
1590
1591 self
1592 .ros_context
1593 .remove_node(self.fully_qualified_name().as_str());
1594 }
1595}
1596
1597impl RosoutRaw for Node {
1598 fn rosout_writer(&self) -> Arc<Option<Publisher<Log>>> {
1599 Arc::clone(&self.rosout_writer)
1600 }
1601
1602 fn base_name(&self) -> &str {
1603 self.node_name.base_name()
1604 }
1605}
1606
1607#[macro_export]
1626macro_rules! rosout {
1627 ($node:expr, $lvl:expr, $($arg:tt)+) => (
1628 $crate::rosout::RosoutRaw::rosout_raw(
1629 &$node,
1630 $crate::ros2::Timestamp::now(),
1631 $lvl,
1632 $crate::rosout::RosoutRaw::base_name(&$node),
1633 &std::format!($($arg)+), std::file!(),
1635 "<unknown_func>", std::line!(),
1637 );
1638 );
1639}
1640
1641#[must_use = "futures do nothing unless you `.await` or poll them"]
1649pub enum ReaderWait<'a> {
1650 Wait {
1652 this_writer: GUID, status_event_stream: stream::BoxStream<'a, NodeEvent>,
1654 },
1655 Ready,
1657}
1658
1659impl Future for ReaderWait<'_> {
1660 type Output = ();
1661
1662 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1663 match *self {
1664 ReaderWait::Ready => Poll::Ready(()),
1665
1666 ReaderWait::Wait {
1667 this_writer,
1668 ref mut status_event_stream,
1669 } => {
1670 debug!("wait_for_reader: Waiting for a reader.");
1671 loop {
1672 match status_event_stream.poll_next_unpin(cx) {
1673 Poll::Ready(Some(NodeEvent::DDS(
1675 DomainParticipantStatusEvent::RemoteReaderMatched {
1676 local_writer,
1677 remote_reader,
1678 },
1679 )))
1680 if local_writer == this_writer =>
1681 {
1682 debug!("wait_for_reader: Matched remote reader {remote_reader:?}.");
1683 return Poll::Ready(());
1684 }
1685
1686 Poll::Ready(_) => {
1687 debug!("wait_for_reader: other event. Continue polling.");
1689 }
1691
1692 Poll::Pending => return Poll::Pending,
1693 }
1694 }
1695 }
1696 }
1697 }
1698}
1699
1700#[must_use = "futures do nothing unless you `.await` or poll them"]
1707pub enum WriterWait<'a> {
1708 Wait {
1710 this_reader: GUID,
1711 status_event_stream: stream::BoxStream<'a, NodeEvent>,
1712 },
1713 Ready,
1715}
1716
1717impl Future for WriterWait<'_> {
1718 type Output = ();
1719
1720 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1721 match *self {
1722 WriterWait::Ready => Poll::Ready(()),
1723
1724 WriterWait::Wait {
1725 this_reader,
1726 ref mut status_event_stream,
1727 } => {
1728 debug!("wait_for_writer: Waiting for a writer.");
1729 loop {
1730 match status_event_stream.poll_next_unpin(cx) {
1734 Poll::Ready(Some(NodeEvent::DDS(
1736 DomainParticipantStatusEvent::RemoteWriterMatched {
1737 local_reader,
1738 remote_writer,
1739 },
1740 )))
1741 if local_reader == this_reader =>
1742 {
1743 debug!("wait_for_writer: Matched remote writer {remote_writer:?}.");
1744 return Poll::Ready(());
1745 }
1746
1747 Poll::Ready(_) => {
1748 trace!("=== other writer. Continue polling.");
1750 }
1752
1753 Poll::Pending => return Poll::Pending,
1754 }
1755 }
1756 }
1757 }
1758 }
1759}
1760
1761#[cfg(test)]
1762mod tests {
1763 use crate::Context;
1764 use super::{Node, NodeName, NodeOptions};
1765
1766 #[test]
1767 fn node_is_sync() {
1768 let node = Node::new(
1769 NodeName::new("/", "base_name").unwrap(),
1770 NodeOptions::new(),
1771 Context::new().unwrap(),
1772 )
1773 .unwrap();
1774
1775 fn requires_send_sync<T: Send + Sync>(_t: T) {}
1776 requires_send_sync(node);
1777 }
1778}