ros2_client/
node.rs

1use std::{
2  collections::{BTreeMap, BTreeSet},
3  error::Error,
4  fmt,
5  pin::Pin,
6  sync::{
7    atomic::{AtomicBool, Ordering},
8    Arc, Mutex,
9  },
10};
11
12use futures::{
13  pin_mut, stream, stream::FusedStream, task, task::Poll, Future, FutureExt, Stream, StreamExt,
14};
15use async_channel::Receiver;
16#[allow(unused_imports)]
17use log::{debug, error, info, trace, warn};
18use serde::Serialize;
19use rustdds::{
20  dds::{CreateError, CreateResult},
21  *,
22};
23
24use crate::{
25  action::*,
26  builtin_interfaces,
27  context::{Context, DEFAULT_SUBSCRIPTION_QOS},
28  entities_info::{NodeEntitiesInfo, ParticipantEntitiesInfo},
29  gid::Gid,
30  log::Log,
31  names::*,
32  parameters::*,
33  pubsub::{Publisher, Subscription},
34  rcl_interfaces,
35  ros_time::ROSTime,
36  rosout::{NodeLoggingHandle, RosoutRaw},
37  service::{Client, Server, Service, ServiceMapping},
38};
39
40type ParameterFunc = dyn Fn(&str, &ParameterValue) -> SetParametersResult + Send + Sync;
41/// Configuration of [Node]
42/// This is a builder-like struct.
43///
44/// The NodeOptions struct does not contain
45/// node_name, context, or namespace, because
46/// they ae always needed and have no reasonable default.
47#[must_use]
48pub struct NodeOptions {
49  #[allow(dead_code)]
50  cli_args: Vec<String>,
51  #[allow(dead_code)]
52  use_global_arguments: bool, // process-wide command line args
53  enable_rosout: bool, // use rosout topic for logging?
54  enable_rosout_reading: bool,
55  start_parameter_services: bool,
56  declared_parameters: Vec<Parameter>,
57  allow_undeclared_parameters: bool,
58  parameter_validator: Option<Box<ParameterFunc>>,
59  parameter_set_action: Option<Box<ParameterFunc>>,
60}
61
62impl NodeOptions {
63  /// Get a default NodeOptions
64  pub fn new() -> NodeOptions {
65    // These defaults are from rclpy reference
66    // https://docs.ros2.org/latest/api/rclpy/api/node.html
67    NodeOptions {
68      cli_args: Vec::new(),
69      use_global_arguments: true,
70      enable_rosout: true,
71      enable_rosout_reading: false,
72      start_parameter_services: true,
73      declared_parameters: Vec::new(),
74      allow_undeclared_parameters: false,
75      parameter_validator: None,
76      parameter_set_action: None,
77    }
78  }
79  pub fn enable_rosout(self, enable_rosout: bool) -> NodeOptions {
80    NodeOptions {
81      enable_rosout,
82      ..self
83    }
84  }
85
86  pub fn read_rosout(self, enable_rosout_reading: bool) -> NodeOptions {
87    NodeOptions {
88      enable_rosout_reading,
89      ..self
90    }
91  }
92
93  pub fn declare_parameter(mut self, name: &str, value: ParameterValue) -> NodeOptions {
94    self.declared_parameters.push(Parameter {
95      name: name.to_owned(),
96      value,
97    });
98    // TODO: check for duplicate parameter names
99    self
100  }
101
102  pub fn parameter_validator(mut self, validator: Box<ParameterFunc>) -> NodeOptions {
103    self.parameter_validator = Some(validator);
104    self
105  }
106
107  pub fn parameter_set_action(mut self, action: Box<ParameterFunc>) -> NodeOptions {
108    self.parameter_set_action = Some(action);
109    self
110  }
111}
112
113impl Default for NodeOptions {
114  fn default() -> Self {
115    Self::new()
116  }
117}
118// ----------------------------------------------------------------------------------------------------
119// ----------------------------------------------------------------------------------------------------
120
121/// DDS or ROS 2 Discovery events.
122#[derive(Clone, Debug)]
123pub enum NodeEvent {
124  DDS(DomainParticipantStatusEvent),
125  ROS(ParticipantEntitiesInfo),
126}
127
128struct ParameterServers {
129  get_parameters_server: Server<rcl_interfaces::GetParametersService>,
130  get_parameter_types_server: Server<rcl_interfaces::GetParameterTypesService>,
131  list_parameters_server: Server<rcl_interfaces::ListParametersService>,
132  set_parameters_server: Server<rcl_interfaces::SetParametersService>,
133  set_parameters_atomically_server: Server<rcl_interfaces::SetParametersAtomicallyService>,
134  describe_parameters_server: Server<rcl_interfaces::DescribeParametersService>,
135}
136
137// ----------------------------------------------------------------------------------------------------
138// ----------------------------------------------------------------------------------------------------
139/// Spinner implements Node's background event loop.
140///
141/// At the moment there are only Discovery (DDS and ROS 2 Graph) event
142/// processing, but this would be extended to handle Parameters and other
143/// possible background tasks also.
144pub struct Spinner {
145  ros_context: Context,
146  stop_spin_receiver: async_channel::Receiver<()>,
147
148  readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
149  writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
150  // Keep track of ros_discovery_info
151  external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
152  //suppress_node_info_updates: Arc<AtomicBool>, // temporarily suppress sending updates
153  status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
154
155  use_sim_time: Arc<AtomicBool>,
156  sim_time: Arc<Mutex<ROSTime>>,
157  clock_topic: Topic,
158  allow_undeclared_parameters: bool,
159
160  parameter_servers: Option<ParameterServers>,
161  parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
162  parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
163  parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
164  parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
165  fully_qualified_node_name: String,
166}
167
168async fn next_if_some<S>(s: &mut Option<S>) -> S::Item
169where
170  S: Stream + Unpin + FusedStream,
171{
172  match s.as_mut() {
173    Some(stream) => stream.select_next_some().await,
174    None => std::future::pending().await,
175  }
176}
177
178impl Spinner {
179  pub async fn spin(self) -> CreateResult<()> {
180    let dds_status_listener = self.ros_context.domain_participant().status_listener();
181    let dds_status_stream = dds_status_listener.as_async_status_stream();
182    pin_mut!(dds_status_stream);
183
184    let ros_discovery_topic = self.ros_context.ros_discovery_topic();
185    let ros_discovery_reader = self
186      .ros_context
187      .create_subscription::<ParticipantEntitiesInfo>(&ros_discovery_topic, None)?;
188    let ros_discovery_stream = ros_discovery_reader.async_stream();
189    pin_mut!(ros_discovery_stream);
190
191    let ros_clock_reader = self
192      .ros_context
193      .create_subscription::<builtin_interfaces::Time>(&self.clock_topic, None)?;
194    let ros_clock_stream = ros_clock_reader.async_stream();
195    pin_mut!(ros_clock_stream);
196
197    // These are Option< impl Stream<_>>
198    let mut get_parameters_stream_opt = self
199      .parameter_servers
200      .as_ref()
201      .map(|s| s.get_parameters_server.receive_request_stream());
202    let mut get_parameter_types_stream_opt = self
203      .parameter_servers
204      .as_ref()
205      .map(|s| s.get_parameter_types_server.receive_request_stream());
206    let mut set_parameters_stream_opt = self
207      .parameter_servers
208      .as_ref()
209      .map(|s| s.set_parameters_server.receive_request_stream());
210    let mut set_parameters_atomically_stream_opt = self
211      .parameter_servers
212      .as_ref()
213      .map(|s| s.set_parameters_atomically_server.receive_request_stream());
214    let mut list_parameter_stream_opt = self
215      .parameter_servers
216      .as_ref()
217      .map(|s| s.list_parameters_server.receive_request_stream());
218    let mut describe_parameters_stream_opt = self
219      .parameter_servers
220      .as_ref()
221      .map(|s| s.describe_parameters_server.receive_request_stream());
222
223    loop {
224      futures::select! {
225        _ = self.stop_spin_receiver.recv().fuse() => {
226          break;
227        }
228
229        clock_msg = ros_clock_stream.select_next_some() => {
230          match clock_msg {
231            Ok((time,_msg_info)) => {
232              // Simulated time is updated internally unconditionally.
233              // The logic in Node decides if it is used.
234              *self.sim_time.lock().unwrap() = time.into();
235            }
236            Err(e) => warn!("Simulated clock receive error {e:?}")
237          }
238        }
239
240
241        get_parameters_request = next_if_some(&mut get_parameters_stream_opt).fuse() => {
242          match get_parameters_request {
243            Ok( (req_id, req) ) => {
244              info!("Get parameter request {req:?}");
245              let values = {
246                let param_db = self.parameters.lock().unwrap();
247                req.names.iter()
248                  .map(|name| param_db.get(name.as_str())
249                    .unwrap_or(&ParameterValue::NotSet))
250                  .cloned()
251                  .map( raw::ParameterValue::from)
252                  .collect()
253              };
254              info!("Get parameters response: {values:?}");
255
256              // .unwrap() below should be safe, as we would not be here if the Server did not exist
257              self.parameter_servers.as_ref().unwrap().get_parameters_server
258                .async_send_response(req_id, rcl_interfaces::GetParametersResponse{ values })
259                .await
260                .unwrap_or_else(|e| warn!("GetParameter response error {e:?}"));
261            }
262            Err(e) => warn!("GetParameter request error {e:?}"),
263          }
264        }
265
266        get_parameter_types_request = next_if_some(&mut get_parameter_types_stream_opt).fuse() => {
267          match get_parameter_types_request {
268            Ok( (req_id, req) ) => {
269              warn!("Get parameter types request");
270              let values = {
271                let param_db = self.parameters.lock().unwrap();
272                req.names.iter()
273                  .map(|name| param_db.get(name.as_str())
274                    .unwrap_or(&ParameterValue::NotSet))
275                  .map(ParameterValue::to_parameter_type_raw)
276                  .collect()
277              };
278              info!("Get parameter types response: {values:?}");
279              // .unwrap() below should be safe, as we would not be here if the Server did not exist
280              self.parameter_servers.as_ref().unwrap().get_parameter_types_server
281                .async_send_response(req_id, rcl_interfaces::GetParameterTypesResponse{ values })
282                .await
283                .unwrap_or_else(|e| warn!("GetParameterTypes response error {e:?}"));
284            }
285            Err(e) => warn!("GetParameterTypes request error {e:?}"),
286          }
287        }
288
289        set_parameters_request = next_if_some(&mut set_parameters_stream_opt).fuse() => {
290          match set_parameters_request {
291            Ok( (req_id, req) ) => {
292              info!("Set parameter request {req:?}");
293              let results =
294                req.parameter.iter()
295                  .cloned()
296                  .map( Parameter::from ) // convert from "raw::Parameter"
297                  .map( |Parameter{name, value}| self.set_parameter(&name,value))
298                  .map(|r| r.into()) // to "raw" Result for serialization
299                  .collect();
300              info!("Set parameters response: {results:?}");
301              // .unwrap() below should be safe, as we would not be here if the Server did not exist
302              self.parameter_servers.as_ref().unwrap().set_parameters_server
303                .async_send_response(req_id, rcl_interfaces::SetParametersResponse{ results })
304                .await
305                .unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
306            }
307            Err(e) => warn!("SetParameters request error {e:?}"),
308          }
309        }
310
311        set_parameters_atomically_request = next_if_some(&mut set_parameters_atomically_stream_opt).fuse() => {
312          match set_parameters_atomically_request {
313            Ok( (req_id, req) ) => {
314              warn!("Set parameters atomically request {req:?}");
315              let results =
316                req.parameter.iter()
317                  .cloned()
318                  .map( Parameter::from ) // convert from "raw::Parameter"
319                  .map( |Parameter{ .. } |
320                      // TODO: Implement atomic setting.
321                      Err("Setting parameters atomically is not implemented.".to_owned())
322                    )
323                  .map(|r| r.into()) // to "raw" Result for serialization
324                  .collect();
325              warn!("Set parameters atomically response: {results:?}");
326              // .unwrap() below should be safe, as we would not be here if the Server did not exist
327              self.parameter_servers.as_ref().unwrap().set_parameters_atomically_server
328                .async_send_response(req_id, rcl_interfaces::SetParametersAtomicallyResponse{ results })
329                .await
330                .unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
331            }
332            Err(e) => warn!("SetParametersAtomically request error {e:?}"),
333          }
334        }
335
336        list_parameter_request = next_if_some(&mut list_parameter_stream_opt).fuse() => {
337          match list_parameter_request {
338            Ok( (req_id, req) ) => {
339              info!("List parameters request");
340              let prefixes = req.prefixes;
341              // TODO: We only generate the "names" part of the ListParametersResponse
342              // What should we put into `prefixes` ?
343              let names = {
344                let param_db = self.parameters.lock().unwrap();
345                param_db.keys()
346                  .filter_map(|name|
347                    if prefixes.is_empty() ||
348                      prefixes.iter().any(|prefix| name.starts_with(prefix))
349                    {
350                      Some(name.clone())
351                    } else { None }
352                  )
353                  .collect()
354              };
355              let result = rcl_interfaces::ListParametersResult{ names, prefixes: vec![] };
356              // .unwrap() below should be safe, as we would not be here if the Server did not exist
357              info!("List parameters response: {result:?}");
358              self.parameter_servers.as_ref().unwrap().list_parameters_server
359                .async_send_response(req_id, rcl_interfaces::ListParametersResponse{ result })
360                .await
361                .unwrap_or_else(|e| warn!("ListParameter response error {e:?}"));
362            }
363            Err(e) => warn!("ListParameter request error {e:?}"),
364          }
365        }
366
367        describe_parameters_request = next_if_some(&mut describe_parameters_stream_opt).fuse() => {
368          match describe_parameters_request {
369            Ok( (req_id, req) ) => {
370              info!("Describe parameters request {req:?}");
371              let values = {
372                let parameters = self.parameters.lock().unwrap();
373                req.names.iter()
374                  .map( |name|
375                    {
376                      if let Some(value) = parameters.get(name) {
377                        ParameterDescriptor::from_value(name, value)
378                      } else {
379                        ParameterDescriptor::unknown(name)
380                      }
381                    })
382                  .map(|r| r.into()) // to "raw" Result for serialization
383                  .collect()
384              };
385              info!("Describe parameters response: {values:?}");
386              // .unwrap() below should be safe, as we would not be here if the Server did not exist
387              self.parameter_servers.as_ref().unwrap().describe_parameters_server
388                .async_send_response(req_id, rcl_interfaces::DescribeParametersResponse{ values })
389                .await
390                .unwrap_or_else(|e| warn!("DescribeParameters response error {e:?}"));
391            }
392            Err(e) => warn!("DescribeParameters request error {e:?}"),
393          }
394        }
395
396        participant_info_update = ros_discovery_stream.select_next_some() => {
397          //println!("{:?}", participant_info_update);
398          match participant_info_update {
399            Ok((part_update, _msg_info)) => {
400              // insert to Node-local ros_discovery_info bookkeeping
401              let mut info_map = self.external_nodes.lock().unwrap();
402              info_map.insert( part_update.gid, part_update.node_entities_info_seq.clone());
403              // also notify any status listeneners
404              self.send_status_event( &NodeEvent::ROS(part_update) );
405            }
406            Err(e) => {
407              warn!("ros_discovery_info error {e:?}");
408            }
409          }
410        }
411
412        dp_status_event = dds_status_stream.select_next_some() => {
413          //println!("{:?}", dp_status_event );
414
415          // update remote reader/writer databases
416          match dp_status_event {
417            DomainParticipantStatusEvent::RemoteReaderMatched { local_writer, remote_reader } => {
418              self.writers_to_remote_readers.lock().unwrap()
419                .entry(local_writer)
420                .and_modify(|s| {s.insert(remote_reader);} )
421                .or_insert(BTreeSet::from([remote_reader]));
422            }
423            DomainParticipantStatusEvent::RemoteWriterMatched { local_reader, remote_writer } => {
424              self.readers_to_remote_writers.lock().unwrap()
425                .entry(local_reader)
426                .and_modify(|s| {s.insert(remote_writer);} )
427                .or_insert(BTreeSet::from([remote_writer]));
428            }
429            DomainParticipantStatusEvent::ReaderLost {guid, ..} => {
430              for ( _local, readers)
431              in self.writers_to_remote_readers.lock().unwrap().iter_mut() {
432                readers.remove(&guid);
433              }
434            }
435            DomainParticipantStatusEvent::WriterLost {guid, ..} => {
436              for ( _local, writers)
437              in self.readers_to_remote_writers.lock().unwrap().iter_mut() {
438                writers.remove(&guid);
439              }
440            }
441
442            _ => {}
443          }
444
445          // also notify any status listeneners
446          self.send_status_event( &NodeEvent::DDS(dp_status_event) );
447        }
448      }
449    }
450    info!("Spinner exiting .spin()");
451    Ok(())
452    //}
453  } // fn
454
455  fn send_status_event(&self, event: &NodeEvent) {
456    let mut closed = Vec::new();
457    let mut sender_array = self.status_event_senders.lock().unwrap();
458    for (i, sender) in sender_array.iter().enumerate() {
459      match sender.try_send(event.clone()) {
460        Ok(()) => {
461          // expected result
462        }
463        Err(async_channel::TrySendError::Closed(_)) => {
464          // trace!("Closing {i}");
465          closed.push(i) // mark for deletion
466        }
467        Err(e) => {
468          debug!("send_status_event: Send error for {i}: {e:?}");
469          // We do not do anything about the error. It may be that the receiver
470          // is not interested and the channel is full.
471        }
472      }
473    }
474
475    // remove senders that reported they were closed
476    for c in closed.iter().rev() {
477      sender_array.swap_remove(*c);
478    }
479  }
480
481  // Keep this function in sync with the same function in Node.
482  fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
483    match name {
484      // built-in parameter check
485      "use_sim_time" => match value {
486        ParameterValue::Boolean(_) => Ok(()),
487        _ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
488      },
489      // application-defined parameters
490      _ => {
491        match self.parameter_validator {
492          Some(ref v) => v.lock().unwrap()(name, value), // ask the validator to judge
493          None => Ok(()),                                // no validator defined, always accept
494        }
495      }
496    }
497  }
498
499  // Keep this function in sync with the same function in Node.
500  fn execute_parameter_set_actions(
501    &self,
502    name: &str,
503    value: &ParameterValue,
504  ) -> SetParametersResult {
505    match name {
506      "use_sim_time" => match value {
507        ParameterValue::Boolean(s) => {
508          self.use_sim_time.store(*s, Ordering::SeqCst);
509          Ok(())
510        }
511        _ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
512      },
513      _ => {
514        match self.parameter_set_action {
515          Some(ref v) => v.lock().unwrap()(name, value), // execute custom action
516          None => Ok(()),                                // no action defined, always accept
517        }
518      }
519    }
520  }
521
522  /// Sets a parameter value. Parameter must be
523  /// [declared](NodeOptions::declare_parameter) before setting.
524  pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
525    let already_set = self.parameters.lock().unwrap().contains_key(name);
526    if self.allow_undeclared_parameters || already_set {
527      self.validate_parameter_on_set(name, &value)?;
528      self.execute_parameter_set_actions(name, &value)?;
529
530      // no errors, prepare for sending notificaiton
531      let p = raw::Parameter {
532        name: name.to_string(),
533        value: value.clone().into(),
534      };
535      let (new_parameters, changed_parameters) = if already_set {
536        (vec![], vec![p])
537      } else {
538        (vec![p], vec![])
539      };
540
541      // actually set the parameter
542      self
543        .parameters
544        .lock()
545        .unwrap()
546        .insert(name.to_owned(), value);
547      // and notify
548      self
549        .parameter_events_writer
550        .publish(raw::ParameterEvent {
551          timestamp: rustdds::Timestamp::now(), // differs from version in Node!!!
552          node: self.fully_qualified_node_name.clone(),
553          new_parameters,
554          changed_parameters,
555          deleted_parameters: vec![],
556        })
557        .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
558      Ok(())
559    } else {
560      Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
561    }
562  }
563} // impl Spinner
564
565// ----------------------------------------------------------------------------------------------------
566// ----------------------------------------------------------------------------------------------------
567
568/// What went wrong in `Node` creation
569#[derive(Debug)]
570pub enum NodeCreateError {
571  DDS(CreateError),
572  BadParameter(String),
573}
574
575impl From<CreateError> for NodeCreateError {
576  fn from(c: CreateError) -> NodeCreateError {
577    NodeCreateError::DDS(c)
578  }
579}
580
581impl fmt::Display for NodeCreateError {
582  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
583    match self {
584      Self::DDS(create_error) => write!(f, "NodeCreateError::DDS : {create_error}"),
585      Self::BadParameter(s) => write!(f, "NodeCreateError::BadParameter : {s}"),
586    }
587  }
588}
589
590impl Error for NodeCreateError {
591  fn source(&self) -> Option<&(dyn Error + 'static)> {
592    match self {
593      Self::DDS(create_error) => Some(create_error),
594      Self::BadParameter(_) => None,
595    }
596  }
597}
598
599/// Error when setting `Parameter`s
600pub enum ParameterError {
601  AlreadyDeclared,
602  InvalidName,
603}
604
605// TODO: We should notify ROS discovery when readers or writers are removed, but
606// now we do not do that.
607
608/// Node in ROS2 network. Holds necessary readers and writers for rosout and
609/// parameter events topics internally.
610///
611/// These are produced by a [`Context`].
612///
613/// Many ROS 2 background tasks do not run unless you execute a [`Spinner`] for
614/// a `Node`. If you are using an async executor, consider running e.g.
615/// `
616/// smol::spawn(node.spinner().unwrap().spin()).detach();
617/// `
618pub struct Node {
619  node_name: NodeName,
620  options: NodeOptions,
621
622  pub(crate) ros_context: Context,
623
624  // sets of Readers and Writers belonging to ( = created via) this Node
625  // These indicate what has been created locally.
626  readers: BTreeSet<Gid>,
627  writers: BTreeSet<Gid>,
628
629  suppress_node_info_updates: Arc<AtomicBool>,
630  // temporarily suppress sending updates
631  // to prevent flood of messages. TODO: not shared: need not be atomic or Arc.
632
633  // Keep track of who is matched via DDS Discovery
634  // Map keys are lists of local Subscriptions and Publishers.
635  // Map values are lists of matched Publishers / Subscriptions.
636  readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
637  writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
638
639  // Keep track of ros_discovery_info
640  external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
641  stop_spin_sender: Option<async_channel::Sender<()>>,
642
643  // Channels to report discovery events to
644  status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
645
646  // builtin writers and readers
647  rosout_writer: Arc<Option<Publisher<Log>>>,
648  rosout_reader: Option<Subscription<Log>>,
649
650  // Parameter events (rcl_interfaces)
651  // Parameter Services are inside Spinner
652  parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
653
654  // Parameter store
655  parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
656  // allow_undeclared_parameters: bool, // this is inside "options"
657  parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
658  parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
659
660  // simulated ROSTime
661  use_sim_time: Arc<AtomicBool>,
662  sim_time: Arc<Mutex<ROSTime>>,
663}
664
665impl Node {
666  pub(crate) fn new(
667    node_name: NodeName,
668    mut options: NodeOptions,
669    ros_context: Context,
670  ) -> Result<Node, NodeCreateError> {
671    let paramtopic = ros_context.get_parameter_events_topic();
672    let rosout_topic = ros_context.get_rosout_topic();
673
674    let enable_rosout = options.enable_rosout;
675    let rosout_reader = options.enable_rosout_reading;
676
677    let parameter_events_writer = ros_context.create_publisher(&paramtopic, None)?;
678
679    // TODO: If there are duplicates, the later one will overwrite the earlier, but
680    // there is no warning or error.
681    options.declared_parameters.push(Parameter {
682      name: "use_sim_time".to_string(),
683      value: ParameterValue::Boolean(false),
684    });
685    let parameters = options
686      .declared_parameters
687      .iter()
688      .cloned()
689      .map(|Parameter { name, value }| (name, value))
690      .collect::<BTreeMap<String, ParameterValue>>();
691
692    let parameter_validator = options
693      .parameter_validator
694      .take()
695      .map(|b| Arc::new(Mutex::new(b)));
696    let parameter_set_action = options
697      .parameter_set_action
698      .take()
699      .map(|b| Arc::new(Mutex::new(b)));
700
701    let mut node = Node {
702      node_name,
703      options,
704      ros_context,
705      readers: BTreeSet::new(),
706      writers: BTreeSet::new(),
707      readers_to_remote_writers: Arc::new(Mutex::new(BTreeMap::new())),
708      writers_to_remote_readers: Arc::new(Mutex::new(BTreeMap::new())),
709      external_nodes: Arc::new(Mutex::new(BTreeMap::new())),
710      suppress_node_info_updates: Arc::new(AtomicBool::new(false)),
711      stop_spin_sender: None,
712      status_event_senders: Arc::new(Mutex::new(Vec::new())),
713      rosout_writer: Arc::new(None), // Set below
714      rosout_reader: None,
715      parameter_events_writer: Arc::new(parameter_events_writer),
716      parameters: Arc::new(Mutex::new(parameters)),
717      parameter_validator,
718      parameter_set_action,
719      use_sim_time: Arc::new(AtomicBool::new(false)),
720      sim_time: Arc::new(Mutex::new(ROSTime::ZERO)),
721    };
722
723    node.suppress_node_info_updates(true);
724
725    node.rosout_writer = if enable_rosout {
726      Arc::new(Some(
727        // topic already has QoS defined
728        node.create_publisher(&rosout_topic, None)?,
729      ))
730    } else {
731      Arc::new(None) // FIXME: we already set that above!
732    };
733    node.rosout_reader = if rosout_reader {
734      Some(node.create_subscription(&rosout_topic, None)?)
735    } else {
736      None
737    };
738
739    // returns `Err` if some parameter does not validate.
740    node
741      .parameters
742      .lock()
743      .unwrap()
744      .iter()
745      .try_for_each(|(name, value)| {
746        node.validate_parameter_on_set(name, value)?;
747        node.execute_parameter_set_actions(name, value)?;
748        Ok(())
749      })
750      .map_err(NodeCreateError::BadParameter)?;
751
752    node.suppress_node_info_updates(false);
753
754    Ok(node)
755  }
756
757  /// Return the ROSTime
758  ///
759  /// It is either the system clock time
760  pub fn time_now(&self) -> ROSTime {
761    if self.use_sim_time.load(Ordering::SeqCst) {
762      *self.sim_time.lock().unwrap()
763    } else {
764      ROSTime::now()
765    }
766  }
767
768  pub fn time_now_not_simulated(&self) -> ROSTime {
769    ROSTime::now()
770  }
771
772  /// Create a Spinner object to execute Node backround tasks.
773  ///
774  /// An async task should then be created to run the `.spin()` function of
775  /// `Spinner`.
776  ///
777  /// E.g. `executor.spawn(node.spinner().spin())`
778  ///
779  /// The `.spin()` task runs until `Node` is dropped.
780  pub fn spinner(&mut self) -> CreateResult<Spinner> {
781    if self.stop_spin_sender.is_some() {
782      panic!("Attempted to crate a second spinner.");
783    }
784    let (stop_spin_sender, stop_spin_receiver) = async_channel::bounded(1);
785    self.stop_spin_sender = Some(stop_spin_sender);
786
787    //TODO: Check QoS policies against ROS 2 specs or some refernce.
788    let service_qos = QosPolicyBuilder::new()
789      .reliability(policy::Reliability::Reliable {
790        max_blocking_time: Duration::from_millis(100),
791      })
792      .history(policy::History::KeepLast { depth: 1 })
793      .build();
794
795    let node_name = self.node_name.fully_qualified_name();
796
797    self.suppress_node_info_updates(true);
798
799    let parameter_servers = if self.options.start_parameter_services {
800      let service_mapping = ServiceMapping::Enhanced; //TODO: parameterize
801      let get_parameters_server = self.create_server(
802        service_mapping,
803        &Name::new(&node_name, "get_parameters").unwrap(),
804        &ServiceTypeName::new("rcl_interfaces", "GetParameters"),
805        service_qos.clone(),
806        service_qos.clone(),
807      )?;
808      let get_parameter_types_server = self.create_server(
809        service_mapping,
810        &Name::new(&node_name, "get_parameter_types").unwrap(),
811        &ServiceTypeName::new("rcl_interfaces", "GetParameterTypes"),
812        service_qos.clone(),
813        service_qos.clone(),
814      )?;
815      let set_parameters_server = self.create_server(
816        service_mapping,
817        &Name::new(&node_name, "set_parameters").unwrap(),
818        &ServiceTypeName::new("rcl_interfaces", "SetParameters"),
819        service_qos.clone(),
820        service_qos.clone(),
821      )?;
822      let set_parameters_atomically_server = self.create_server(
823        service_mapping,
824        &Name::new(&node_name, "set_parameters_atomically").unwrap(),
825        &ServiceTypeName::new("rcl_interfaces", "SetParametersAtomically"),
826        service_qos.clone(),
827        service_qos.clone(),
828      )?;
829      let list_parameters_server = self.create_server(
830        service_mapping,
831        &Name::new(&node_name, "list_parameters").unwrap(),
832        &ServiceTypeName::new("rcl_interfaces", "ListParameters"),
833        service_qos.clone(),
834        service_qos.clone(),
835      )?;
836      let describe_parameters_server = self.create_server(
837        service_mapping,
838        &Name::new(&node_name, "describe_parameters").unwrap(),
839        &ServiceTypeName::new("rcl_interfaces", "DescribeParameters"),
840        service_qos.clone(),
841        service_qos.clone(),
842      )?;
843
844      Some(ParameterServers {
845        get_parameters_server,
846        get_parameter_types_server,
847        list_parameters_server,
848        set_parameters_server,
849        set_parameters_atomically_server,
850        describe_parameters_server,
851      })
852    } else {
853      None // No parameter services
854    };
855
856    let clock_topic = self.create_topic(
857      &Name::new("/", "clock").unwrap(),
858      MessageTypeName::new("builtin_interfaces", "Time"),
859      &DEFAULT_SUBSCRIPTION_QOS,
860    )?;
861
862    self.suppress_node_info_updates(false);
863
864    Ok(Spinner {
865      ros_context: self.ros_context.clone(),
866      stop_spin_receiver,
867      readers_to_remote_writers: Arc::clone(&self.readers_to_remote_writers),
868      writers_to_remote_readers: Arc::clone(&self.writers_to_remote_readers),
869      external_nodes: Arc::clone(&self.external_nodes),
870      status_event_senders: Arc::clone(&self.status_event_senders),
871      use_sim_time: Arc::clone(&self.use_sim_time),
872      sim_time: Arc::clone(&self.sim_time),
873      clock_topic,
874      parameter_servers,
875      parameter_events_writer: Arc::clone(&self.parameter_events_writer),
876      parameters: Arc::clone(&self.parameters),
877      allow_undeclared_parameters: self.options.allow_undeclared_parameters,
878      parameter_validator: self.parameter_validator.as_ref().map(Arc::clone),
879      parameter_set_action: self.parameter_set_action.as_ref().map(Arc::clone),
880      fully_qualified_node_name: self.fully_qualified_name(),
881    })
882  }
883
884  /// A heuristic to detect if a spinner has been created.
885  /// But this does still not guarantee that it is running, i.e.
886  /// an async excutor is runnning spinner.spin(), but this is the best we can
887  /// do.
888  pub fn have_spinner(&self) -> bool {
889    self.stop_spin_sender.is_some()
890  }
891
892  // Generates ROS2 node info from added readers and writers.
893  fn generate_node_info(&self) -> NodeEntitiesInfo {
894    let mut node_info = NodeEntitiesInfo::new(self.node_name.clone());
895
896    node_info.add_writer(Gid::from(self.parameter_events_writer.guid()));
897    if let Some(ref row) = *self.rosout_writer {
898      node_info.add_writer(Gid::from(row.guid()));
899    }
900
901    for reader in &self.readers {
902      node_info.add_reader(*reader);
903    }
904
905    for writer in &self.writers {
906      node_info.add_writer(*writer);
907    }
908
909    node_info
910  }
911
912  fn suppress_node_info_updates(&mut self, suppress: bool) {
913    self
914      .suppress_node_info_updates
915      .store(suppress, Ordering::SeqCst);
916
917    // Send updates when suppression ends
918    if !suppress {
919      self.ros_context.update_node(self.generate_node_info());
920    }
921  }
922
923  fn add_reader(&mut self, reader: Gid) {
924    self.readers.insert(reader);
925    if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
926      self.ros_context.update_node(self.generate_node_info());
927    }
928  }
929
930  fn add_writer(&mut self, writer: Gid) {
931    self.writers.insert(writer);
932    if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
933      self.ros_context.update_node(self.generate_node_info());
934    }
935  }
936
937  pub fn namespace(&self) -> &str {
938    self.node_name.namespace()
939  }
940
941  pub fn fully_qualified_name(&self) -> String {
942    self.node_name.fully_qualified_name()
943  }
944
945  pub fn options(&self) -> &NodeOptions {
946    &self.options
947  }
948
949  pub fn domain_id(&self) -> u16 {
950    self.ros_context.domain_id()
951  }
952
953  // ///////////////////////////////////////////////
954  // Parameters
955
956  pub fn undeclare_parameter(&self, name: &str) {
957    let prev_value = self.parameters.lock().unwrap().remove(name);
958
959    if let Some(deleted_param) = prev_value {
960      // a parameter was actually undeclared. Let others know.
961      self
962        .parameter_events_writer
963        .publish(raw::ParameterEvent {
964          timestamp: self.time_now().into(),
965          node: self.fully_qualified_name(),
966          new_parameters: vec![],
967          changed_parameters: vec![],
968          deleted_parameters: vec![raw::Parameter {
969            name: name.to_string(),
970            value: deleted_param.into(),
971          }],
972        })
973        .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
974    }
975  }
976
977  /// Does the parameter exist?
978  pub fn has_parameter(&self, name: &str) -> bool {
979    self.parameters.lock().unwrap().contains_key(name)
980  }
981
982  /// Sets a parameter value. Parameter must be
983  /// [declared](NodeOptions::declare_parameter) before setting.
984  //
985  // TODO: This code is duplicated in Spinner. Not good.
986  // Find a way to de-duplicate.
987  // Same for validate_parameter_on_set and execute_parameter_set_actions.
988  // TODO: This does not account for built-in parameters e.g. "use_sim_time".
989  // It thinks they are new on first set.
990  // TODO: Setting Parameter to type NotSet counts as parameter deletion. Maybe
991  // that needs special handling? At least for notifications.
992  pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
993    let already_set = self.parameters.lock().unwrap().contains_key(name);
994    if self.options.allow_undeclared_parameters || already_set {
995      self.validate_parameter_on_set(name, &value)?;
996      self.execute_parameter_set_actions(name, &value)?;
997
998      // no errors, prepare for sending notificaiton
999      let p = raw::Parameter {
1000        name: name.to_string(),
1001        value: value.clone().into(),
1002      };
1003      let (new_parameters, changed_parameters) = if already_set {
1004        (vec![], vec![p])
1005      } else {
1006        (vec![p], vec![])
1007      };
1008
1009      // actually set the parameter
1010      self
1011        .parameters
1012        .lock()
1013        .unwrap()
1014        .insert(name.to_owned(), value);
1015      // and notify
1016      self
1017        .parameter_events_writer
1018        .publish(raw::ParameterEvent {
1019          timestamp: self.time_now().into(),
1020          node: self.fully_qualified_name(),
1021          new_parameters,
1022          changed_parameters,
1023          deleted_parameters: vec![],
1024        })
1025        .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
1026      Ok(())
1027    } else {
1028      Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
1029    }
1030  }
1031
1032  pub fn allow_undeclared_parameters(&self) -> bool {
1033    self.options.allow_undeclared_parameters
1034  }
1035
1036  /// Gets the value of a parameter, or None is there is no such Parameter.
1037  pub fn get_parameter(&self, name: &str) -> Option<ParameterValue> {
1038    self
1039      .parameters
1040      .lock()
1041      .unwrap()
1042      .get(name)
1043      .map(|p| p.to_owned())
1044  }
1045
1046  pub fn list_parameters(&self) -> Vec<String> {
1047    self
1048      .parameters
1049      .lock()
1050      .unwrap()
1051      .keys()
1052      .map(move |k| k.to_owned())
1053      .collect::<Vec<_>>()
1054  }
1055
1056  // Keep this function in sync with the same function in Spinner.
1057  // TODO: This should refuse to change parameter type, unless
1058  // there is a ParamaterDescription defined and it allows
1059  // changing type.
1060  // TODO: Setting Parameter to type NotSet counts as parameter deletion. Maybe
1061  // that needs special handling?
1062  fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
1063    match name {
1064      // built-in parameter check
1065      "use_sim_time" => match value {
1066        ParameterValue::Boolean(_) => Ok(()),
1067        _ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
1068      },
1069      // application-defined parameters
1070      _ => {
1071        match self.parameter_validator {
1072          Some(ref v) => v.lock().unwrap()(name, value), // ask the validator to judge
1073          None => Ok(()),                                // no validator defined, always accept
1074        }
1075      }
1076    }
1077  }
1078
1079  // Keep this function in sync with the same function in Spinner.
1080  fn execute_parameter_set_actions(
1081    &self,
1082    name: &str,
1083    value: &ParameterValue,
1084  ) -> SetParametersResult {
1085    match name {
1086      "use_sim_time" => match value {
1087        ParameterValue::Boolean(s) => {
1088          self.use_sim_time.store(*s, Ordering::SeqCst);
1089          Ok(())
1090        }
1091        _ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
1092      },
1093      _ => {
1094        match self.parameter_set_action {
1095          Some(ref v) => v.lock().unwrap()(name, value), // execute custom action
1096          None => Ok(()),                                // no action defined, always accept
1097        }
1098      }
1099    }
1100  }
1101
1102  // ///////////////////////////////////////////////////
1103
1104  /// Get an async Receiver for discovery events.
1105  ///
1106  /// There must be an async task executing `spin` to get any data.
1107  /// This function may panic if there is no Spinner running.
1108  pub fn status_receiver(&self) -> Receiver<NodeEvent> {
1109    if self.have_spinner() {
1110      let (status_event_sender, status_event_receiver) = async_channel::bounded(8);
1111      self
1112        .status_event_senders
1113        .lock()
1114        .unwrap()
1115        .push(status_event_sender);
1116      status_event_receiver
1117    } else {
1118      panic!("status_receiver() cannot set up a receiver, because no Spinner is running.")
1119    }
1120  }
1121
1122  // reader waits for at least one writer to be present
1123  pub(crate) fn wait_for_writer(&self, reader: GUID) -> impl Future<Output = ()> {
1124    // TODO: This may contain some synchrnoization hazard
1125    let status_receiver = self.status_receiver();
1126
1127    let already_present = self
1128      .readers_to_remote_writers
1129      .lock()
1130      .unwrap()
1131      .get(&reader)
1132      .map(|writers| !writers.is_empty()) // there is someone matched
1133      .unwrap_or(false); // we do not even know the reader
1134
1135    if already_present {
1136      WriterWait::Ready
1137    } else {
1138      WriterWait::Wait {
1139        this_reader: reader,
1140        status_event_stream: Box::pin(status_receiver),
1141      }
1142    }
1143  }
1144
1145  pub(crate) fn wait_for_reader(&self, writer: GUID) -> impl Future<Output = ()> {
1146    // TODO: This may contain some synchrnoization hazard.
1147    let status_receiver = self.status_receiver();
1148
1149    let already_present = self
1150      .writers_to_remote_readers
1151      .lock()
1152      .unwrap()
1153      .get(&writer)
1154      .map(|readers| !readers.is_empty()) // there is someone matched
1155      .unwrap_or(false); // we do not even know who is asking
1156
1157    // TODO: Is is possible to miss reader events if they appear after the check
1158    // above, but do not somehow end up in the status_receiver stream?
1159
1160    if already_present {
1161      info!("wait_for_reader: Already have matched a reader.");
1162      ReaderWait::Ready
1163    } else {
1164      ReaderWait::Wait {
1165        this_writer: writer,
1166        status_event_stream: Box::pin(status_receiver),
1167      }
1168    }
1169  }
1170
1171  pub(crate) fn get_publisher_count(&self, subscription_guid: GUID) -> usize {
1172    self
1173      .readers_to_remote_writers
1174      .lock()
1175      .unwrap()
1176      .get(&subscription_guid)
1177      .map(BTreeSet::len)
1178      .unwrap_or_else(|| {
1179        error!("get_publisher_count: Subscriber {subscription_guid:?} not known to node.");
1180        0
1181      })
1182  }
1183
1184  pub(crate) fn get_subscription_count(&self, publisher_guid: GUID) -> usize {
1185    self
1186      .writers_to_remote_readers
1187      .lock()
1188      .unwrap()
1189      .get(&publisher_guid)
1190      .map(BTreeSet::len)
1191      .unwrap_or_else(|| {
1192        error!("get_subscription_count: Publisher {publisher_guid:?} not known to node.");
1193        0
1194      })
1195  }
1196
1197  /// Borrow the Subscription to our ROSOut Reader.
1198  ///
1199  /// Availability depends on Node configuration.
1200  pub fn rosout_subscription(&self) -> Option<&Subscription<Log>> {
1201    self.rosout_reader.as_ref()
1202  }
1203
1204  /// Creates ROS2 topic and handles necessary conversions from DDS to ROS2
1205  ///
1206  /// # Arguments
1207  ///
1208  /// * `domain_participant` -
1209  ///   [DomainParticipant](../dds/struct.DomainParticipant.html)
1210  /// * `name` - Name of the topic
1211  /// * `type_name` - What type the topic holds in string form
1212  /// * `qos` - Quality of Service parameters for the topic (not restricted only
1213  ///   to ROS2)
1214  ///
1215  ///  
1216  ///   [summary of all rules for topic and service names in ROS 2](https://design.ros2.org/articles/topic_and_service_names.html)
1217  ///   (as of Dec 2020)
1218  ///
1219  /// * must not be empty
1220  /// * may contain alphanumeric characters ([0-9|a-z|A-Z]), underscores (_), or
1221  ///   forward slashes (/)
1222  /// * may use balanced curly braces ({}) for substitutions
1223  /// * may start with a tilde (~), the private namespace substitution character
1224  /// * must not start with a numeric character ([0-9])
1225  /// * must not end with a forward slash (/)
1226  /// * must not contain any number of repeated forward slashes (/)
1227  /// * must not contain any number of repeated underscores (_)
1228  /// * must separate a tilde (~) from the rest of the name with a forward slash
1229  ///   (/), i.e. ~/foo not ~foo
1230  /// * must have balanced curly braces ({}) when used, i.e. {sub}/foo but not
1231  ///   {sub/foo nor /foo}
1232  pub fn create_topic(
1233    &self,
1234    topic_name: &Name,
1235    type_name: MessageTypeName,
1236    qos: &QosPolicies,
1237  ) -> CreateResult<Topic> {
1238    let dds_name = topic_name.to_dds_name("rt", &self.node_name, "");
1239    self.ros_context.create_topic(dds_name, type_name, qos)
1240  }
1241
1242  /// Creates ROS2 Subscriber
1243  ///
1244  /// # Arguments
1245  ///
1246  /// * `topic` - Reference to topic created with `create_ros_topic`.
1247  /// * `qos` - Should take [QOS](../dds/qos/struct.QosPolicies.html) and use if
1248  ///   it's compatible with topics QOS. `None` indicates the use of Topics QOS.
1249  pub fn create_subscription<D: 'static>(
1250    &mut self,
1251    topic: &Topic,
1252    qos: Option<QosPolicies>,
1253  ) -> CreateResult<Subscription<D>> {
1254    let sub = self.ros_context.create_subscription(topic, qos)?;
1255    self.add_reader(sub.guid().into());
1256    Ok(sub)
1257  }
1258
1259  /// Creates ROS2 Publisher
1260  ///
1261  /// # Arguments
1262  ///
1263  /// * `topic` - Reference to topic created with `create_ros_topic`.
1264  /// * `qos` - Should take [QOS](../dds/qos/struct.QosPolicies.html) and use it
1265  ///   if it's compatible with topics QOS. `None` indicates the use of Topics
1266  ///   QOS.
1267  pub fn create_publisher<D: Serialize>(
1268    &mut self,
1269    topic: &Topic,
1270    qos: Option<QosPolicies>,
1271  ) -> CreateResult<Publisher<D>> {
1272    let p = self.ros_context.create_publisher(topic, qos)?;
1273    self.add_writer(p.guid().into());
1274    Ok(p)
1275  }
1276
1277  pub(crate) fn create_simpledatareader<D, DA>(
1278    &mut self,
1279    topic: &Topic,
1280    qos: Option<QosPolicies>,
1281  ) -> CreateResult<no_key::SimpleDataReader<D, DA>>
1282  where
1283    D: 'static,
1284    DA: rustdds::no_key::DeserializerAdapter<D> + 'static,
1285  {
1286    let r = self.ros_context.create_simpledatareader(topic, qos)?;
1287    self.add_reader(r.guid().into());
1288    Ok(r)
1289  }
1290
1291  pub(crate) fn create_datawriter<D, SA>(
1292    &mut self,
1293    topic: &Topic,
1294    qos: Option<QosPolicies>,
1295  ) -> CreateResult<no_key::DataWriter<D, SA>>
1296  where
1297    SA: rustdds::no_key::SerializerAdapter<D>,
1298  {
1299    let w = self.ros_context.create_datawriter(topic, qos)?;
1300    self.add_writer(w.guid().into());
1301    Ok(w)
1302  }
1303
1304  /// Creates ROS2 Service Client
1305  ///
1306  /// # Arguments
1307  ///
1308  /// * `service_mapping` - ServiceMapping to be used
1309  /// * `service_name` -
1310  /// * `qos`-
1311  pub fn create_client<S>(
1312    &mut self,
1313    service_mapping: ServiceMapping,
1314    service_name: &Name,
1315    service_type_name: &ServiceTypeName,
1316    request_qos: QosPolicies,
1317    response_qos: QosPolicies,
1318  ) -> CreateResult<Client<S>>
1319  where
1320    S: Service + 'static,
1321    S::Request: Clone,
1322  {
1323    // Add rq/ and rr/ prefixes as documented in
1324    // https://design.ros2.org/articles/topic_and_service_names.html
1325    // Where are the suffixes documented?
1326    // And why "Reply" and not "Response" ?
1327
1328    let rq_topic = self.ros_context.domain_participant().create_topic(
1329      service_name.to_dds_name("rq", &self.node_name, "Request"),
1330      //rq_name,
1331      service_type_name.dds_request_type(),
1332      &request_qos,
1333      TopicKind::NoKey,
1334    )?;
1335    let rs_topic = self.ros_context.domain_participant().create_topic(
1336      service_name.to_dds_name("rr", &self.node_name, "Reply"),
1337      //rs_name,
1338      service_type_name.dds_response_type(),
1339      &response_qos,
1340      TopicKind::NoKey,
1341    )?;
1342
1343    let c = Client::<S>::new(
1344      service_mapping,
1345      self,
1346      &rq_topic,
1347      &rs_topic,
1348      Some(request_qos),
1349      Some(response_qos),
1350    )?;
1351
1352    Ok(c)
1353  }
1354
1355  /// Creates ROS2 Service Server
1356  ///
1357  /// # Arguments
1358  ///
1359  /// * `service_mapping` - ServiceMapping to be used. See
1360  ///   [`Self.create_client`].
1361  /// * `service_name` -
1362  /// * `qos`-
1363  pub fn create_server<S>(
1364    &mut self,
1365    service_mapping: ServiceMapping,
1366    service_name: &Name,
1367    service_type_name: &ServiceTypeName,
1368    request_qos: QosPolicies,
1369    response_qos: QosPolicies,
1370  ) -> CreateResult<Server<S>>
1371  where
1372    S: Service + 'static,
1373    S::Request: Clone,
1374  {
1375    // let rq_name = Self::check_name_and_add_prefix("rq/",
1376    // &(service_name.to_owned() + "Request"))?; let rs_name =
1377    // Self::check_name_and_add_prefix("rr/", &(service_name.to_owned() +
1378    // "Reply"))?;
1379
1380    let rq_topic = self.ros_context.domain_participant().create_topic(
1381      //rq_name,
1382      service_name.to_dds_name("rq", &self.node_name, "Request"),
1383      service_type_name.dds_request_type(),
1384      &request_qos,
1385      TopicKind::NoKey,
1386    )?;
1387    let rs_topic = self.ros_context.domain_participant().create_topic(
1388      service_name.to_dds_name("rr", &self.node_name, "Reply"),
1389      service_type_name.dds_response_type(),
1390      &response_qos,
1391      TopicKind::NoKey,
1392    )?;
1393
1394    let s = Server::<S>::new(
1395      service_mapping,
1396      self,
1397      &rq_topic,
1398      &rs_topic,
1399      Some(request_qos),
1400      Some(response_qos),
1401    )?;
1402
1403    Ok(s)
1404  }
1405
1406  pub fn create_action_client<A>(
1407    &mut self,
1408    service_mapping: ServiceMapping,
1409    action_name: &Name,
1410    action_type_name: &ActionTypeName,
1411    action_qos: ActionClientQosPolicies,
1412  ) -> CreateResult<ActionClient<A>>
1413  where
1414    A: ActionTypes + 'static,
1415  {
1416    // action name is e.g. "/turtle1/rotate_absolute"
1417    // action type name is e.g. "turtlesim/action/RotateAbsolute"
1418    let services_base_name = action_name.push("_action");
1419
1420    //let goal_service_name = action_name.to_owned() + "/_action/send_goal";
1421    let goal_service_type = action_type_name.dds_action_service("_SendGoal");
1422    let my_goal_client = self.create_client(
1423      service_mapping,
1424      //&goal_service_name,
1425      &services_base_name.push("send_goal"),
1426      &goal_service_type,
1427      action_qos.goal_service.clone(),
1428      action_qos.goal_service,
1429    )?;
1430
1431    //let cancel_service_name = action_name.to_owned() + "/_action/cancel_goal";
1432    let cancel_goal_type = ServiceTypeName::new("action_msgs", "CancelGoal");
1433    let my_cancel_client = self.create_client(
1434      service_mapping,
1435      //&cancel_service_name,
1436      &services_base_name.push("cancel_goal"),
1437      &cancel_goal_type,
1438      action_qos.cancel_service.clone(),
1439      action_qos.cancel_service,
1440    )?;
1441
1442    //let result_service_name = action_name.to_owned() + "/_action/get_result";
1443    let result_service_type = action_type_name.dds_action_service("_GetResult");
1444    let my_result_client = self.create_client(
1445      service_mapping,
1446      //&result_service_name,
1447      &services_base_name.push("get_result"),
1448      &result_service_type,
1449      action_qos.result_service.clone(),
1450      action_qos.result_service,
1451    )?;
1452
1453    let action_topic_namespace = action_name.push("_action");
1454
1455    let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
1456    let feedback_topic = self.create_topic(
1457      &action_topic_namespace.push("feedback"),
1458      feedback_topic_type,
1459      &action_qos.feedback_subscription,
1460    )?;
1461    let my_feedback_subscription =
1462      self.create_subscription(&feedback_topic, Some(action_qos.feedback_subscription))?;
1463
1464    //let status_topic_type = ;
1465    let status_topic = self.create_topic(
1466      &action_topic_namespace.push("status"),
1467      MessageTypeName::new("action_msgs", "GoalStatusArray"),
1468      &action_qos.status_subscription,
1469    )?;
1470    let my_status_subscription =
1471      self.create_subscription(&status_topic, Some(action_qos.status_subscription))?;
1472
1473    Ok(ActionClient {
1474      my_goal_client,
1475      my_cancel_client,
1476      my_result_client,
1477      my_feedback_subscription,
1478      my_status_subscription,
1479      my_action_name: action_name.clone(),
1480    })
1481  }
1482
1483  pub fn create_action_server<A>(
1484    &mut self,
1485    service_mapping: ServiceMapping,
1486    action_name: &Name,
1487    action_type_name: &ActionTypeName,
1488    action_qos: ActionServerQosPolicies,
1489  ) -> CreateResult<ActionServer<A>>
1490  where
1491    A: ActionTypes + 'static,
1492  {
1493    let services_base_name = action_name.push("_action");
1494
1495    //let goal_service_name = action_name.to_owned() + "/_action/send_goal";
1496    let goal_service_type = action_type_name.dds_action_service("_SendGoal");
1497    let my_goal_server = self.create_server(
1498      service_mapping,
1499      //&goal_service_name,
1500      &services_base_name.push("send_goal"),
1501      &goal_service_type,
1502      action_qos.goal_service.clone(),
1503      action_qos.goal_service,
1504    )?;
1505
1506    //let cancel_service_name = action_name.to_owned() + "/_action/cancel_goal";
1507    let cancel_service_type = ServiceTypeName::new("action_msgs", "CancelGoal");
1508    let my_cancel_server = self.create_server(
1509      service_mapping,
1510      //&cancel_service_name,
1511      &services_base_name.push("cancel_goal"),
1512      &cancel_service_type,
1513      action_qos.cancel_service.clone(),
1514      action_qos.cancel_service,
1515    )?;
1516
1517    //let result_service_name = action_name.to_owned() + "/_action/get_result";
1518    let result_service_type = action_type_name.dds_action_service("_GetResult");
1519    let my_result_server = self.create_server(
1520      service_mapping,
1521      //&result_service_name,
1522      &services_base_name.push("get_result"),
1523      &result_service_type,
1524      action_qos.result_service.clone(),
1525      action_qos.result_service,
1526    )?;
1527
1528    let action_topic_namespace = action_name.push("_action");
1529
1530    let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
1531    let feedback_topic = self.create_topic(
1532      &action_topic_namespace.push("feedback"),
1533      feedback_topic_type,
1534      &action_qos.feedback_publisher,
1535    )?;
1536    let my_feedback_publisher =
1537      self.create_publisher(&feedback_topic, Some(action_qos.feedback_publisher))?;
1538
1539    let status_topic_type = MessageTypeName::new("action_msgs", "GoalStatusArray");
1540    let status_topic = self.create_topic(
1541      &action_topic_namespace.push("status"),
1542      status_topic_type,
1543      &action_qos.status_publisher,
1544    )?;
1545    let my_status_publisher =
1546      self.create_publisher(&status_topic, Some(action_qos.status_publisher))?;
1547
1548    Ok(ActionServer {
1549      my_goal_server,
1550      my_cancel_server,
1551      my_result_server,
1552      my_feedback_publisher,
1553      my_status_publisher,
1554      my_action_name: action_name.clone(),
1555    })
1556  }
1557
1558  /// Makes a handle to the `rosout` logging publisher.
1559  ///
1560  /// You can send these across threads
1561  pub fn logging_handle(&self) -> NodeLoggingHandle {
1562    NodeLoggingHandle {
1563      rosout_writer: Arc::clone(&self.rosout_writer),
1564      base_name: self.node_name.base_name().to_string(),
1565    }
1566  }
1567} // impl Node
1568
1569impl Drop for Node {
1570  fn drop(&mut self) {
1571    if let Some(ref stop_spin_sender) = self.stop_spin_sender {
1572      stop_spin_sender
1573        .try_send(())
1574        .unwrap_or_else(|e| error!("Cannot notify spin task to stop: {e:?}"));
1575    }
1576
1577    self
1578      .ros_context
1579      .remove_node(self.fully_qualified_name().as_str());
1580  }
1581}
1582
1583impl RosoutRaw for Node {
1584  fn rosout_writer(&self) -> Arc<Option<Publisher<Log>>> {
1585    Arc::clone(&self.rosout_writer)
1586  }
1587
1588  fn base_name(&self) -> &str {
1589    self.node_name.base_name()
1590  }
1591}
1592
1593/// Macro for writing to [rosout](https://wiki.ros.org/rosout) topic.
1594///
1595/// # Example
1596///
1597/// ```
1598/// # use ros2_client::*;
1599/// #
1600/// # let context = Context::new().unwrap();
1601/// # let mut node = context
1602/// #     .new_node(
1603/// #       NodeName::new("/", "some_node").unwrap(),
1604/// #       NodeOptions::new().enable_rosout(true),
1605/// #     )
1606/// #     .unwrap();
1607/// let kind = "silly";
1608///
1609/// rosout!(node, ros2::LogLevel::Info, "A {} event was seen.", kind);
1610/// ```
1611#[macro_export]
1612macro_rules! rosout {
1613    ($node:expr, $lvl:expr, $($arg:tt)+) => (
1614        $crate::rosout::RosoutRaw::rosout_raw(
1615            &$node,
1616            $crate::ros2::Timestamp::now(),
1617            $lvl,
1618            $crate::rosout::RosoutRaw::base_name(&$node),
1619            &std::format!($($arg)+), // msg
1620            std::file!(),
1621            "<unknown_func>", // is there a macro to get current function name? (Which may be undefined)
1622            std::line!(),
1623        );
1624    );
1625}
1626
1627/// Future type for waiting Readers to appear over ROS2 Topic.
1628///
1629/// Produced by `node.wait_for_reader(writer_guid)`
1630//
1631// This is implemented as a separate struct instead of just async function in
1632// Node so that it does not borrow the node and thus can be Send.
1633//use pin_project::pin_project;
1634#[must_use = "futures do nothing unless you `.await` or poll them"]
1635pub enum ReaderWait<'a> {
1636  // We need to wait for an event that is for us
1637  Wait {
1638    this_writer: GUID, // Writer who is waiting for Readers to appear
1639    status_event_stream: stream::BoxStream<'a, NodeEvent>,
1640  },
1641  // No need to wait, can resolve immediately.
1642  Ready,
1643}
1644
1645impl Future for ReaderWait<'_> {
1646  type Output = ();
1647
1648  fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1649    match *self {
1650      ReaderWait::Ready => Poll::Ready(()),
1651
1652      ReaderWait::Wait {
1653        this_writer,
1654        ref mut status_event_stream,
1655      } => {
1656        debug!("wait_for_reader: Waiting for a reader.");
1657        loop {
1658          match status_event_stream.poll_next_unpin(cx) {
1659            // Check if we have RemoteReaderMatched event and it is for this_writer
1660            Poll::Ready(Some(NodeEvent::DDS(
1661              DomainParticipantStatusEvent::RemoteReaderMatched {
1662                local_writer,
1663                remote_reader,
1664              },
1665            )))
1666              if local_writer == this_writer =>
1667            {
1668              debug!("wait_for_reader: Matched remote reader {remote_reader:?}.");
1669              return Poll::Ready(());
1670            }
1671
1672            Poll::Ready(_) => {
1673              // Received something else, such as other event or error
1674              debug!("wait_for_reader: other event. Continue polling.");
1675              // So we do nothing but go to the next iteration.
1676            }
1677
1678            Poll::Pending => return Poll::Pending,
1679          }
1680        }
1681      }
1682    }
1683  }
1684}
1685
1686/// Future type for waiting Writers to appear over ROS2 Topic.
1687///
1688/// Produced by `node.wait_for_writer(writer_guid)`
1689//
1690// This is implemented as a separate struct instead of just async function in
1691// Node so that it does not borrow the node and thus can be Send.
1692#[must_use = "futures do nothing unless you `.await` or poll them"]
1693pub enum WriterWait<'a> {
1694  // We need to wait for an event that is for us
1695  Wait {
1696    this_reader: GUID,
1697    status_event_stream: stream::BoxStream<'a, NodeEvent>,
1698  },
1699  // No need to wait, can resolve immediately.
1700  Ready,
1701}
1702
1703impl Future for WriterWait<'_> {
1704  type Output = ();
1705
1706  fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1707    match *self {
1708      WriterWait::Ready => Poll::Ready(()),
1709
1710      WriterWait::Wait {
1711        this_reader,
1712        ref mut status_event_stream,
1713      } => {
1714        debug!("wait_for_writer: Waiting for a writer.");
1715        loop {
1716          // We loop to pump events out of the stream until we get the desired event
1717          // or "Pending". If we stop at the first event, then there is no waker
1718          // installed and we are stuck.
1719          match status_event_stream.poll_next_unpin(cx) {
1720            // Check if we have RemoteWriterMatched event and it is for this_writer
1721            Poll::Ready(Some(NodeEvent::DDS(
1722              DomainParticipantStatusEvent::RemoteWriterMatched {
1723                local_reader,
1724                remote_writer,
1725              },
1726            )))
1727              if local_reader == this_reader =>
1728            {
1729              debug!("wait_for_writer: Matched remote writer {remote_writer:?}.");
1730              return Poll::Ready(());
1731            }
1732
1733            Poll::Ready(_) => {
1734              // Received something else, such as other event or error
1735              trace!("=== other writer. Continue polling.");
1736              // No return, go to next iteration.
1737            }
1738
1739            Poll::Pending => return Poll::Pending,
1740          }
1741        }
1742      }
1743    }
1744  }
1745}
1746
1747#[cfg(test)]
1748mod tests {
1749  use crate::Context;
1750  use super::{Node, NodeName, NodeOptions};
1751
1752  #[test]
1753  fn node_is_sync() {
1754    let node = Node::new(
1755      NodeName::new("/", "base_name").unwrap(),
1756      NodeOptions::new(),
1757      Context::new().unwrap(),
1758    )
1759    .unwrap();
1760
1761    fn requires_send_sync<T: Send + Sync>(_t: T) {}
1762    requires_send_sync(node);
1763  }
1764}