ros2_client/
node.rs

1use std::{
2  collections::{BTreeMap, BTreeSet},
3  error::Error,
4  fmt,
5  pin::Pin,
6  sync::{
7    atomic::{AtomicBool, Ordering},
8    Arc, Mutex,
9  },
10};
11
12use futures::{
13  pin_mut, stream, stream::FusedStream, task, task::Poll, Future, FutureExt, Stream, StreamExt,
14};
15use async_channel::Receiver;
16#[allow(unused_imports)]
17use log::{debug, error, info, trace, warn};
18use serde::Serialize;
19use rustdds::{
20  dds::{CreateError, CreateResult},
21  *,
22};
23
24use crate::{
25  action::*,
26  builtin_interfaces,
27  context::{Context, DEFAULT_SUBSCRIPTION_QOS},
28  entities_info::{NodeEntitiesInfo, ParticipantEntitiesInfo},
29  gid::Gid,
30  log::Log,
31  names::*,
32  parameters::*,
33  pubsub::{Publisher, Subscription},
34  rcl_interfaces,
35  ros_time::ROSTime,
36  rosout::{NodeLoggingHandle, RosoutRaw},
37  service::{Client, Server, Service, ServiceMapping},
38};
39
40type ParameterFunc = dyn Fn(&str, &ParameterValue) -> SetParametersResult + Send + Sync;
41/// Configuration of [Node]
42/// This is a builder-like struct.
43///
44/// The NodeOptions struct does not contain
45/// node_name, context, or namespace, because
46/// they ae always needed and have no reasonable default.
47#[must_use]
48pub struct NodeOptions {
49  #[allow(dead_code)]
50  cli_args: Vec<String>,
51  #[allow(dead_code)]
52  use_global_arguments: bool, // process-wide command line args
53  enable_rosout: bool, // use rosout topic for logging?
54  enable_rosout_reading: bool,
55  start_parameter_services: bool,
56  declared_parameters: Vec<Parameter>,
57  allow_undeclared_parameters: bool,
58  parameter_validator: Option<Box<ParameterFunc>>,
59  parameter_set_action: Option<Box<ParameterFunc>>,
60}
61
62impl NodeOptions {
63  /// Get a default NodeOptions
64  pub fn new() -> NodeOptions {
65    // These defaults are from rclpy reference
66    // https://docs.ros2.org/latest/api/rclpy/api/node.html
67    NodeOptions {
68      cli_args: Vec::new(),
69      use_global_arguments: true,
70      enable_rosout: true,
71      enable_rosout_reading: false,
72      start_parameter_services: true,
73      declared_parameters: Vec::new(),
74      allow_undeclared_parameters: false,
75      parameter_validator: None,
76      parameter_set_action: None,
77    }
78  }
79  pub fn enable_rosout(self, enable_rosout: bool) -> NodeOptions {
80    NodeOptions {
81      enable_rosout,
82      ..self
83    }
84  }
85
86  pub fn read_rosout(self, enable_rosout_reading: bool) -> NodeOptions {
87    NodeOptions {
88      enable_rosout_reading,
89      ..self
90    }
91  }
92
93  pub fn declare_parameter(mut self, name: &str, value: ParameterValue) -> NodeOptions {
94    self.declared_parameters.push(Parameter {
95      name: name.to_owned(),
96      value,
97    });
98    // TODO: check for duplicate parameter names
99    self
100  }
101
102  pub fn parameter_validator(mut self, validator: Box<ParameterFunc>) -> NodeOptions {
103    self.parameter_validator = Some(validator);
104    self
105  }
106
107  pub fn parameter_set_action(mut self, action: Box<ParameterFunc>) -> NodeOptions {
108    self.parameter_set_action = Some(action);
109    self
110  }
111}
112
113impl Default for NodeOptions {
114  fn default() -> Self {
115    Self::new()
116  }
117}
118// ----------------------------------------------------------------------------------------------------
119// ----------------------------------------------------------------------------------------------------
120
121/// DDS or ROS 2 Discovery events.
122#[derive(Clone, Debug)]
123pub enum NodeEvent {
124  DDS(DomainParticipantStatusEvent),
125  ROS(ParticipantEntitiesInfo),
126}
127
128struct ParameterServers {
129  get_parameters_server: Server<rcl_interfaces::GetParametersService>,
130  get_parameter_types_server: Server<rcl_interfaces::GetParameterTypesService>,
131  list_parameters_server: Server<rcl_interfaces::ListParametersService>,
132  set_parameters_server: Server<rcl_interfaces::SetParametersService>,
133  set_parameters_atomically_server: Server<rcl_interfaces::SetParametersAtomicallyService>,
134  describe_parameters_server: Server<rcl_interfaces::DescribeParametersService>,
135}
136
137// ----------------------------------------------------------------------------------------------------
138// ----------------------------------------------------------------------------------------------------
139/// Spinner implements Node's background event loop.
140///
141/// At the moment there are only Discovery (DDS and ROS 2 Graph) event
142/// processing, but this would be extended to handle Parameters and other
143/// possible background tasks also.
144pub struct Spinner {
145  ros_context: Context,
146  stop_spin_receiver: async_channel::Receiver<()>,
147
148  readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
149  writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
150  // Keep track of ros_discovery_info
151  external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
152  //suppress_node_info_updates: Arc<AtomicBool>, // temporarily suppress sending updates
153  status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
154
155  use_sim_time: Arc<AtomicBool>,
156  sim_time: Arc<Mutex<ROSTime>>,
157  clock_topic: Topic,
158  allow_undeclared_parameters: bool,
159
160  parameter_servers: Option<ParameterServers>,
161  parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
162  parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
163  parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
164  parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
165  fully_qualified_node_name: String,
166}
167
168async fn next_if_some<S>(s: &mut Option<S>) -> S::Item
169where
170  S: Stream + Unpin + FusedStream,
171{
172  match s.as_mut() {
173    Some(stream) => stream.select_next_some().await,
174    None => std::future::pending().await,
175  }
176}
177
178impl Spinner {
179  pub async fn spin(self) -> CreateResult<()> {
180    info!("Starting Spinner for {}", &self.fully_qualified_node_name);
181    let dds_status_listener = self.ros_context.domain_participant().status_listener();
182    let dds_status_stream = dds_status_listener.as_async_status_stream();
183    pin_mut!(dds_status_stream);
184
185    let ros_discovery_topic = self.ros_context.ros_discovery_topic();
186    let ros_discovery_reader = self
187      .ros_context
188      .create_subscription::<ParticipantEntitiesInfo>(&ros_discovery_topic, None)?;
189    let ros_discovery_stream = ros_discovery_reader.async_stream();
190    pin_mut!(ros_discovery_stream);
191
192    let ros_clock_reader = self
193      .ros_context
194      .create_subscription::<builtin_interfaces::Time>(&self.clock_topic, None)?;
195    let ros_clock_stream = ros_clock_reader.async_stream();
196    pin_mut!(ros_clock_stream);
197
198    // These are Option< impl Stream<_>>
199    let mut get_parameters_stream_opt = self
200      .parameter_servers
201      .as_ref()
202      .map(|s| s.get_parameters_server.receive_request_stream());
203    let mut get_parameter_types_stream_opt = self
204      .parameter_servers
205      .as_ref()
206      .map(|s| s.get_parameter_types_server.receive_request_stream());
207    let mut set_parameters_stream_opt = self
208      .parameter_servers
209      .as_ref()
210      .map(|s| s.set_parameters_server.receive_request_stream());
211    let mut set_parameters_atomically_stream_opt = self
212      .parameter_servers
213      .as_ref()
214      .map(|s| s.set_parameters_atomically_server.receive_request_stream());
215    let mut list_parameter_stream_opt = self
216      .parameter_servers
217      .as_ref()
218      .map(|s| s.list_parameters_server.receive_request_stream());
219    let mut describe_parameters_stream_opt = self
220      .parameter_servers
221      .as_ref()
222      .map(|s| s.describe_parameters_server.receive_request_stream());
223
224    info!("Spinner {} initialized", &self.fully_qualified_node_name);
225
226    loop {
227      futures::select! {
228        _ = self.stop_spin_receiver.recv().fuse() => {
229          break;
230        }
231
232        clock_msg = ros_clock_stream.select_next_some() => {
233          match clock_msg {
234            Ok((time,_msg_info)) => {
235              // Simulated time is updated internally unconditionally.
236              // The logic in Node decides if it is used.
237              *self.sim_time.lock().unwrap() = time.into();
238            }
239            Err(e) => warn!("Simulated clock receive error {e:?}")
240          }
241        }
242
243
244        get_parameters_request = next_if_some(&mut get_parameters_stream_opt).fuse() => {
245          match get_parameters_request {
246            Ok( (req_id, req) ) => {
247              info!("Get parameter request {req:?}");
248              let values = {
249                let param_db = self.parameters.lock().unwrap();
250                req.names.iter()
251                  .map(|name| param_db.get(name.as_str())
252                    .unwrap_or(&ParameterValue::NotSet))
253                  .cloned()
254                  .map( raw::ParameterValue::from)
255                  .collect()
256              };
257              info!("Get parameters response: {values:?}");
258
259              // .unwrap() below should be safe, as we would not be here if the Server did not exist
260              self.parameter_servers.as_ref().unwrap().get_parameters_server
261                .async_send_response(req_id, rcl_interfaces::GetParametersResponse{ values })
262                .await
263                .unwrap_or_else(|e| warn!("GetParameter response error {e:?}"));
264            }
265            Err(e) => warn!("GetParameter request error {e:?}"),
266          }
267        }
268
269        get_parameter_types_request = next_if_some(&mut get_parameter_types_stream_opt).fuse() => {
270          match get_parameter_types_request {
271            Ok( (req_id, req) ) => {
272              warn!("Get parameter types request");
273              let values = {
274                let param_db = self.parameters.lock().unwrap();
275                req.names.iter()
276                  .map(|name| param_db.get(name.as_str())
277                    .unwrap_or(&ParameterValue::NotSet))
278                  .map(ParameterValue::to_parameter_type_raw)
279                  .collect()
280              };
281              info!("Get parameter types response: {values:?}");
282              // .unwrap() below should be safe, as we would not be here if the Server did not exist
283              self.parameter_servers.as_ref().unwrap().get_parameter_types_server
284                .async_send_response(req_id, rcl_interfaces::GetParameterTypesResponse{ values })
285                .await
286                .unwrap_or_else(|e| warn!("GetParameterTypes response error {e:?}"));
287            }
288            Err(e) => warn!("GetParameterTypes request error {e:?}"),
289          }
290        }
291
292        set_parameters_request = next_if_some(&mut set_parameters_stream_opt).fuse() => {
293          match set_parameters_request {
294            Ok( (req_id, req) ) => {
295              info!("Set parameter request {req:?}");
296              let results =
297                req.parameter.iter()
298                  .cloned()
299                  .map( Parameter::from ) // convert from "raw::Parameter"
300                  .map( |Parameter{name, value}| self.set_parameter(&name,value))
301                  .map(|r| r.into()) // to "raw" Result for serialization
302                  .collect();
303              info!("Set parameters response: {results:?}");
304              // .unwrap() below should be safe, as we would not be here if the Server did not exist
305              self.parameter_servers.as_ref().unwrap().set_parameters_server
306                .async_send_response(req_id, rcl_interfaces::SetParametersResponse{ results })
307                .await
308                .unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
309            }
310            Err(e) => warn!("SetParameters request error {e:?}"),
311          }
312        }
313
314        set_parameters_atomically_request = next_if_some(&mut set_parameters_atomically_stream_opt).fuse() => {
315          match set_parameters_atomically_request {
316            Ok( (req_id, req) ) => {
317              warn!("Set parameters atomically request {req:?}");
318              let results =
319                req.parameter.iter()
320                  .cloned()
321                  .map( Parameter::from ) // convert from "raw::Parameter"
322                  .map( |Parameter{ .. } |
323                      // TODO: Implement atomic setting.
324                      Err("Setting parameters atomically is not implemented.".to_owned())
325                    )
326                  .map(|r| r.into()) // to "raw" Result for serialization
327                  .collect();
328              warn!("Set parameters atomically response: {results:?}");
329              // .unwrap() below should be safe, as we would not be here if the Server did not exist
330              self.parameter_servers.as_ref().unwrap().set_parameters_atomically_server
331                .async_send_response(req_id, rcl_interfaces::SetParametersAtomicallyResponse{ results })
332                .await
333                .unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
334            }
335            Err(e) => warn!("SetParametersAtomically request error {e:?}"),
336          }
337        }
338
339        list_parameter_request = next_if_some(&mut list_parameter_stream_opt).fuse() => {
340          match list_parameter_request {
341            Ok( (req_id, req) ) => {
342              info!("List parameters request");
343              let prefixes = req.prefixes;
344              // TODO: We only generate the "names" part of the ListParametersResponse
345              // What should we put into `prefixes` ?
346              let names = {
347                let param_db = self.parameters.lock().unwrap();
348                param_db.keys()
349                  .filter_map(|name|
350                    if prefixes.is_empty() ||
351                      prefixes.iter().any(|prefix| name.starts_with(prefix))
352                    {
353                      Some(name.clone())
354                    } else { None }
355                  )
356                  .collect()
357              };
358              let result = rcl_interfaces::ListParametersResult{ names, prefixes: vec![] };
359              // .unwrap() below should be safe, as we would not be here if the Server did not exist
360              info!("List parameters response: {result:?}");
361              self.parameter_servers.as_ref().unwrap().list_parameters_server
362                .async_send_response(req_id, rcl_interfaces::ListParametersResponse{ result })
363                .await
364                .unwrap_or_else(|e| warn!("ListParameter response error {e:?}"));
365            }
366            Err(e) => warn!("ListParameter request error {e:?}"),
367          }
368        }
369
370        describe_parameters_request = next_if_some(&mut describe_parameters_stream_opt).fuse() => {
371          match describe_parameters_request {
372            Ok( (req_id, req) ) => {
373              info!("Describe parameters request {req:?}");
374              let values = {
375                let parameters = self.parameters.lock().unwrap();
376                req.names.iter()
377                  .map( |name|
378                    {
379                      if let Some(value) = parameters.get(name) {
380                        ParameterDescriptor::from_value(name, value)
381                      } else {
382                        ParameterDescriptor::unknown(name)
383                      }
384                    })
385                  .map(|r| r.into()) // to "raw" Result for serialization
386                  .collect()
387              };
388              info!("Describe parameters response: {values:?}");
389              // .unwrap() below should be safe, as we would not be here if the Server did not exist
390              self.parameter_servers.as_ref().unwrap().describe_parameters_server
391                .async_send_response(req_id, rcl_interfaces::DescribeParametersResponse{ values })
392                .await
393                .unwrap_or_else(|e| warn!("DescribeParameters response error {e:?}"));
394            }
395            Err(e) => warn!("DescribeParameters request error {e:?}"),
396          }
397        }
398
399        participant_info_update = ros_discovery_stream.select_next_some() => {
400          //println!("{:?}", participant_info_update);
401          match participant_info_update {
402            Ok((part_update, _msg_info)) => {
403              // insert to Node-local ros_discovery_info bookkeeping
404              let mut info_map = self.external_nodes.lock().unwrap();
405              info_map.insert( part_update.gid, part_update.node_entities_info_seq.clone());
406              // also notify any status listeneners
407              self.send_status_event( &NodeEvent::ROS(part_update) );
408            }
409            Err(e) => {
410              warn!("ros_discovery_info error {e:?}");
411            }
412          }
413        }
414
415        dp_status_event = dds_status_stream.select_next_some() => {
416          //println!("{:?}", dp_status_event );
417
418          // update remote reader/writer databases
419          match dp_status_event {
420            DomainParticipantStatusEvent::RemoteReaderMatched { local_writer, remote_reader } => {
421              self.writers_to_remote_readers.lock().unwrap()
422                .entry(local_writer)
423                .and_modify(|s| {s.insert(remote_reader);} )
424                .or_insert(BTreeSet::from([remote_reader]));
425            }
426            DomainParticipantStatusEvent::RemoteWriterMatched { local_reader, remote_writer } => {
427              self.readers_to_remote_writers.lock().unwrap()
428                .entry(local_reader)
429                .and_modify(|s| {s.insert(remote_writer);} )
430                .or_insert(BTreeSet::from([remote_writer]));
431            }
432            DomainParticipantStatusEvent::ReaderLost {guid, ..} => {
433              for ( _local, readers)
434              in self.writers_to_remote_readers.lock().unwrap().iter_mut() {
435                readers.remove(&guid);
436              }
437            }
438            DomainParticipantStatusEvent::WriterLost {guid, ..} => {
439              for ( _local, writers)
440              in self.readers_to_remote_writers.lock().unwrap().iter_mut() {
441                writers.remove(&guid);
442              }
443            }
444
445            _ => {}
446          }
447
448          // also notify any status listeneners
449          self.send_status_event( &NodeEvent::DDS(dp_status_event) );
450        }
451      }
452    }
453    info!("Spinner {} exiting .spin()", &self.fully_qualified_node_name );
454    Ok(())
455    //}
456  } // fn
457
458  fn send_status_event(&self, event: &NodeEvent) {
459    let mut closed = Vec::new();
460    let mut sender_array = self.status_event_senders.lock().unwrap();
461    for (i, sender) in sender_array.iter().enumerate() {
462      match sender.try_send(event.clone()) {
463        Ok(()) => {
464          // expected result
465        }
466        Err(async_channel::TrySendError::Closed(_)) => {
467          // trace!("Closing {i}");
468          closed.push(i) // mark for deletion
469        }
470        Err(e) => {
471          debug!("send_status_event: Send error for {i}: {e:?}");
472          // We do not do anything about the error. It may be that the receiver
473          // is not interested and the channel is full.
474        }
475      }
476    }
477
478    // remove senders that reported they were closed
479    for c in closed.iter().rev() {
480      sender_array.swap_remove(*c);
481    }
482  }
483
484  // Keep this function in sync with the same function in Node.
485  fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
486    match name {
487      // built-in parameter check
488      "use_sim_time" => match value {
489        ParameterValue::Boolean(_) => Ok(()),
490        _ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
491      },
492      // application-defined parameters
493      _ => {
494        match self.parameter_validator {
495          Some(ref v) => v.lock().unwrap()(name, value), // ask the validator to judge
496          None => Ok(()),                                // no validator defined, always accept
497        }
498      }
499    }
500  }
501
502  // Keep this function in sync with the same function in Node.
503  fn execute_parameter_set_actions(
504    &self,
505    name: &str,
506    value: &ParameterValue,
507  ) -> SetParametersResult {
508    match name {
509      "use_sim_time" => match value {
510        ParameterValue::Boolean(s) => {
511          self.use_sim_time.store(*s, Ordering::SeqCst);
512          Ok(())
513        }
514        _ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
515      },
516      _ => {
517        match self.parameter_set_action {
518          Some(ref v) => v.lock().unwrap()(name, value), // execute custom action
519          None => Ok(()),                                // no action defined, always accept
520        }
521      }
522    }
523  }
524
525  /// Sets a parameter value. Parameter must be
526  /// [declared](NodeOptions::declare_parameter) before setting.
527  pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
528    let already_set = self.parameters.lock().unwrap().contains_key(name);
529    if self.allow_undeclared_parameters || already_set {
530      self.validate_parameter_on_set(name, &value)?;
531      self.execute_parameter_set_actions(name, &value)?;
532
533      // no errors, prepare for sending notificaiton
534      let p = raw::Parameter {
535        name: name.to_string(),
536        value: value.clone().into(),
537      };
538      let (new_parameters, changed_parameters) = if already_set {
539        (vec![], vec![p])
540      } else {
541        (vec![p], vec![])
542      };
543
544      // actually set the parameter
545      self
546        .parameters
547        .lock()
548        .unwrap()
549        .insert(name.to_owned(), value);
550      // and notify
551      self
552        .parameter_events_writer
553        .publish(raw::ParameterEvent {
554          timestamp: rustdds::Timestamp::now(), // differs from version in Node!!!
555          node: self.fully_qualified_node_name.clone(),
556          new_parameters,
557          changed_parameters,
558          deleted_parameters: vec![],
559        })
560        .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
561      Ok(())
562    } else {
563      Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
564    }
565  }
566} // impl Spinner
567
568// ----------------------------------------------------------------------------------------------------
569// ----------------------------------------------------------------------------------------------------
570
571/// What went wrong in `Node` creation
572#[derive(Debug)]
573pub enum NodeCreateError {
574  DDS(CreateError),
575  BadParameter(String),
576}
577
578impl From<CreateError> for NodeCreateError {
579  fn from(c: CreateError) -> NodeCreateError {
580    NodeCreateError::DDS(c)
581  }
582}
583
584impl fmt::Display for NodeCreateError {
585  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
586    match self {
587      Self::DDS(create_error) => write!(f, "NodeCreateError::DDS : {create_error}"),
588      Self::BadParameter(s) => write!(f, "NodeCreateError::BadParameter : {s}"),
589    }
590  }
591}
592
593impl Error for NodeCreateError {
594  fn source(&self) -> Option<&(dyn Error + 'static)> {
595    match self {
596      Self::DDS(create_error) => Some(create_error),
597      Self::BadParameter(_) => None,
598    }
599  }
600}
601
602/// Error when setting `Parameter`s
603pub enum ParameterError {
604  AlreadyDeclared,
605  InvalidName,
606}
607
608// TODO: We should notify ROS discovery when readers or writers are removed, but
609// now we do not do that.
610
611/// Node in ROS2 network. Holds necessary readers and writers for rosout and
612/// parameter events topics internally.
613///
614/// These are produced by a [`Context`].
615///
616/// Many ROS 2 background tasks do not run unless you execute a [`Spinner`] for
617/// a `Node`. If you are using an async executor, consider running e.g.
618/// `
619/// smol::spawn(node.spinner().unwrap().spin()).detach();
620/// `
621pub struct Node {
622  node_name: NodeName,
623  options: NodeOptions,
624
625  pub(crate) ros_context: Context,
626
627  // sets of Readers and Writers belonging to ( = created via) this Node
628  // These indicate what has been created locally.
629  readers: BTreeSet<Gid>,
630  writers: BTreeSet<Gid>,
631
632  suppress_node_info_updates: Arc<AtomicBool>,
633  // temporarily suppress sending updates
634  // to prevent flood of messages. TODO: not shared: need not be atomic or Arc.
635
636  // Keep track of who is matched via DDS Discovery
637  // Map keys are lists of local Subscriptions and Publishers.
638  // Map values are lists of matched Publishers / Subscriptions.
639  readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
640  writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
641
642  // Keep track of ros_discovery_info
643  external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
644  stop_spin_sender: Option<async_channel::Sender<()>>,
645
646  // Channels to report discovery events to
647  status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
648
649  // builtin writers and readers
650  rosout_writer: Arc<Option<Publisher<Log>>>,
651  rosout_reader: Option<Subscription<Log>>,
652
653  // Parameter events (rcl_interfaces)
654  // Parameter Services are inside Spinner
655  parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
656
657  // Parameter store
658  parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
659  // allow_undeclared_parameters: bool, // this is inside "options"
660  parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
661  parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
662
663  // simulated ROSTime
664  use_sim_time: Arc<AtomicBool>,
665  sim_time: Arc<Mutex<ROSTime>>,
666}
667
668impl Node {
669  pub(crate) fn new(
670    node_name: NodeName,
671    mut options: NodeOptions,
672    ros_context: Context,
673  ) -> Result<Node, NodeCreateError> {
674    let paramtopic = ros_context.get_parameter_events_topic();
675    let rosout_topic = ros_context.get_rosout_topic();
676
677    let enable_rosout = options.enable_rosout;
678    let rosout_reader = options.enable_rosout_reading;
679
680    let parameter_events_writer = ros_context.create_publisher(&paramtopic, None)?;
681
682    // TODO: If there are duplicates, the later one will overwrite the earlier, but
683    // there is no warning or error.
684    options.declared_parameters.push(Parameter {
685      name: "use_sim_time".to_string(),
686      value: ParameterValue::Boolean(false),
687    });
688    let parameters = options
689      .declared_parameters
690      .iter()
691      .cloned()
692      .map(|Parameter { name, value }| (name, value))
693      .collect::<BTreeMap<String, ParameterValue>>();
694
695    let parameter_validator = options
696      .parameter_validator
697      .take()
698      .map(|b| Arc::new(Mutex::new(b)));
699    let parameter_set_action = options
700      .parameter_set_action
701      .take()
702      .map(|b| Arc::new(Mutex::new(b)));
703
704    let mut node = Node {
705      node_name,
706      options,
707      ros_context,
708      readers: BTreeSet::new(),
709      writers: BTreeSet::new(),
710      readers_to_remote_writers: Arc::new(Mutex::new(BTreeMap::new())),
711      writers_to_remote_readers: Arc::new(Mutex::new(BTreeMap::new())),
712      external_nodes: Arc::new(Mutex::new(BTreeMap::new())),
713      suppress_node_info_updates: Arc::new(AtomicBool::new(false)),
714      stop_spin_sender: None,
715      status_event_senders: Arc::new(Mutex::new(Vec::new())),
716      rosout_writer: Arc::new(None), // Set below
717      rosout_reader: None,
718      parameter_events_writer: Arc::new(parameter_events_writer),
719      parameters: Arc::new(Mutex::new(parameters)),
720      parameter_validator,
721      parameter_set_action,
722      use_sim_time: Arc::new(AtomicBool::new(false)),
723      sim_time: Arc::new(Mutex::new(ROSTime::ZERO)),
724    };
725
726    node.suppress_node_info_updates(true);
727
728    node.rosout_writer = if enable_rosout {
729      Arc::new(Some(
730        // topic already has QoS defined
731        node.create_publisher(&rosout_topic, None)?,
732      ))
733    } else {
734      Arc::new(None) // FIXME: we already set that above!
735    };
736    node.rosout_reader = if rosout_reader {
737      Some(node.create_subscription(&rosout_topic, None)?)
738    } else {
739      None
740    };
741
742    // returns `Err` if some parameter does not validate.
743    node
744      .parameters
745      .lock()
746      .unwrap()
747      .iter()
748      .try_for_each(|(name, value)| {
749        node.validate_parameter_on_set(name, value)?;
750        node.execute_parameter_set_actions(name, value)?;
751        Ok(())
752      })
753      .map_err(NodeCreateError::BadParameter)?;
754
755    node.suppress_node_info_updates(false);
756
757    Ok(node)
758  }
759
760  /// Return the ROSTime
761  ///
762  /// It is either the system clock time
763  pub fn time_now(&self) -> ROSTime {
764    if self.use_sim_time.load(Ordering::SeqCst) {
765      *self.sim_time.lock().unwrap()
766    } else {
767      ROSTime::now()
768    }
769  }
770
771  pub fn time_now_not_simulated(&self) -> ROSTime {
772    ROSTime::now()
773  }
774
775  /// Create a Spinner object to execute Node backround tasks.
776  ///
777  /// An async task should then be created to run the `.spin()` function of
778  /// `Spinner`.
779  ///
780  /// E.g. `executor.spawn(node.spinner().spin())`
781  ///
782  /// The `.spin()` task runs until `Node` is dropped.
783  pub fn spinner(&mut self) -> CreateResult<Spinner> {
784    if self.stop_spin_sender.is_some() {
785      panic!("Attempted to crate a second spinner.");
786    }
787    let (stop_spin_sender, stop_spin_receiver) = async_channel::bounded(1);
788    self.stop_spin_sender = Some(stop_spin_sender);
789
790    //TODO: Check QoS policies against ROS 2 specs or some refernce.
791    let service_qos = QosPolicyBuilder::new()
792      .reliability(policy::Reliability::Reliable {
793        max_blocking_time: Duration::from_millis(100),
794      })
795      .history(policy::History::KeepLast { depth: 1 })
796      .build();
797
798    let node_name = self.node_name.fully_qualified_name();
799
800    self.suppress_node_info_updates(true);
801
802    let parameter_servers = if self.options.start_parameter_services {
803      let service_mapping = ServiceMapping::Enhanced; //TODO: parameterize
804      let get_parameters_server = self.create_server(
805        service_mapping,
806        &Name::new(&node_name, "get_parameters").unwrap(),
807        &ServiceTypeName::new("rcl_interfaces", "GetParameters"),
808        service_qos.clone(),
809        service_qos.clone(),
810      )?;
811      let get_parameter_types_server = self.create_server(
812        service_mapping,
813        &Name::new(&node_name, "get_parameter_types").unwrap(),
814        &ServiceTypeName::new("rcl_interfaces", "GetParameterTypes"),
815        service_qos.clone(),
816        service_qos.clone(),
817      )?;
818      let set_parameters_server = self.create_server(
819        service_mapping,
820        &Name::new(&node_name, "set_parameters").unwrap(),
821        &ServiceTypeName::new("rcl_interfaces", "SetParameters"),
822        service_qos.clone(),
823        service_qos.clone(),
824      )?;
825      let set_parameters_atomically_server = self.create_server(
826        service_mapping,
827        &Name::new(&node_name, "set_parameters_atomically").unwrap(),
828        &ServiceTypeName::new("rcl_interfaces", "SetParametersAtomically"),
829        service_qos.clone(),
830        service_qos.clone(),
831      )?;
832      let list_parameters_server = self.create_server(
833        service_mapping,
834        &Name::new(&node_name, "list_parameters").unwrap(),
835        &ServiceTypeName::new("rcl_interfaces", "ListParameters"),
836        service_qos.clone(),
837        service_qos.clone(),
838      )?;
839      let describe_parameters_server = self.create_server(
840        service_mapping,
841        &Name::new(&node_name, "describe_parameters").unwrap(),
842        &ServiceTypeName::new("rcl_interfaces", "DescribeParameters"),
843        service_qos.clone(),
844        service_qos.clone(),
845      )?;
846
847      Some(ParameterServers {
848        get_parameters_server,
849        get_parameter_types_server,
850        list_parameters_server,
851        set_parameters_server,
852        set_parameters_atomically_server,
853        describe_parameters_server,
854      })
855    } else {
856      None // No parameter services
857    };
858
859    let clock_topic = self.create_topic(
860      &Name::new("/", "clock").unwrap(),
861      MessageTypeName::new("builtin_interfaces", "Time"),
862      &DEFAULT_SUBSCRIPTION_QOS,
863    )?;
864
865    self.suppress_node_info_updates(false);
866
867    Ok(Spinner {
868      ros_context: self.ros_context.clone(),
869      stop_spin_receiver,
870      readers_to_remote_writers: Arc::clone(&self.readers_to_remote_writers),
871      writers_to_remote_readers: Arc::clone(&self.writers_to_remote_readers),
872      external_nodes: Arc::clone(&self.external_nodes),
873      status_event_senders: Arc::clone(&self.status_event_senders),
874      use_sim_time: Arc::clone(&self.use_sim_time),
875      sim_time: Arc::clone(&self.sim_time),
876      clock_topic,
877      parameter_servers,
878      parameter_events_writer: Arc::clone(&self.parameter_events_writer),
879      parameters: Arc::clone(&self.parameters),
880      allow_undeclared_parameters: self.options.allow_undeclared_parameters,
881      parameter_validator: self.parameter_validator.as_ref().map(Arc::clone),
882      parameter_set_action: self.parameter_set_action.as_ref().map(Arc::clone),
883      fully_qualified_node_name: self.fully_qualified_name(),
884    })
885  }
886
887  /// A heuristic to detect if a spinner has been created.
888  /// But this does still not guarantee that it is running, i.e.
889  /// an async excutor is runnning spinner.spin(), but this is the best we can
890  /// do.
891  pub fn have_spinner(&self) -> bool {
892    self.stop_spin_sender.is_some()
893  }
894
895  // Generates ROS2 node info from added readers and writers.
896  fn generate_node_info(&self) -> NodeEntitiesInfo {
897    let mut node_info = NodeEntitiesInfo::new(self.node_name.clone());
898
899    node_info.add_writer(Gid::from(self.parameter_events_writer.guid()));
900    if let Some(ref row) = *self.rosout_writer {
901      node_info.add_writer(Gid::from(row.guid()));
902    }
903
904    for reader in &self.readers {
905      node_info.add_reader(*reader);
906    }
907
908    for writer in &self.writers {
909      node_info.add_writer(*writer);
910    }
911
912    node_info
913  }
914
915  fn suppress_node_info_updates(&mut self, suppress: bool) {
916    self
917      .suppress_node_info_updates
918      .store(suppress, Ordering::SeqCst);
919
920    // Send updates when suppression ends
921    if !suppress {
922      self.ros_context.update_node(self.generate_node_info());
923    }
924  }
925
926  fn add_reader(&mut self, reader: Gid) {
927    self.readers.insert(reader);
928    if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
929      self.ros_context.update_node(self.generate_node_info());
930    }
931  }
932
933  fn add_writer(&mut self, writer: Gid) {
934    self.writers.insert(writer);
935    if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
936      self.ros_context.update_node(self.generate_node_info());
937    }
938  }
939
940  pub fn namespace(&self) -> &str {
941    self.node_name.namespace()
942  }
943
944  pub fn fully_qualified_name(&self) -> String {
945    self.node_name.fully_qualified_name()
946  }
947
948  pub fn options(&self) -> &NodeOptions {
949    &self.options
950  }
951
952  pub fn domain_id(&self) -> u16 {
953    self.ros_context.domain_id()
954  }
955
956  // ///////////////////////////////////////////////
957  // Parameters
958
959  pub fn undeclare_parameter(&self, name: &str) {
960    let prev_value = self.parameters.lock().unwrap().remove(name);
961
962    if let Some(deleted_param) = prev_value {
963      // a parameter was actually undeclared. Let others know.
964      self
965        .parameter_events_writer
966        .publish(raw::ParameterEvent {
967          timestamp: self.time_now().into(),
968          node: self.fully_qualified_name(),
969          new_parameters: vec![],
970          changed_parameters: vec![],
971          deleted_parameters: vec![raw::Parameter {
972            name: name.to_string(),
973            value: deleted_param.into(),
974          }],
975        })
976        .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
977    }
978  }
979
980  /// Does the parameter exist?
981  pub fn has_parameter(&self, name: &str) -> bool {
982    self.parameters.lock().unwrap().contains_key(name)
983  }
984
985  /// Sets a parameter value. Parameter must be
986  /// [declared](NodeOptions::declare_parameter) before setting.
987  //
988  // TODO: This code is duplicated in Spinner. Not good.
989  // Find a way to de-duplicate.
990  // Same for validate_parameter_on_set and execute_parameter_set_actions.
991  // TODO: This does not account for built-in parameters e.g. "use_sim_time".
992  // It thinks they are new on first set.
993  // TODO: Setting Parameter to type NotSet counts as parameter deletion. Maybe
994  // that needs special handling? At least for notifications.
995  pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
996    let already_set = self.parameters.lock().unwrap().contains_key(name);
997    if self.options.allow_undeclared_parameters || already_set {
998      self.validate_parameter_on_set(name, &value)?;
999      self.execute_parameter_set_actions(name, &value)?;
1000
1001      // no errors, prepare for sending notificaiton
1002      let p = raw::Parameter {
1003        name: name.to_string(),
1004        value: value.clone().into(),
1005      };
1006      let (new_parameters, changed_parameters) = if already_set {
1007        (vec![], vec![p])
1008      } else {
1009        (vec![p], vec![])
1010      };
1011
1012      // actually set the parameter
1013      self
1014        .parameters
1015        .lock()
1016        .unwrap()
1017        .insert(name.to_owned(), value);
1018      // and notify
1019      self
1020        .parameter_events_writer
1021        .publish(raw::ParameterEvent {
1022          timestamp: self.time_now().into(),
1023          node: self.fully_qualified_name(),
1024          new_parameters,
1025          changed_parameters,
1026          deleted_parameters: vec![],
1027        })
1028        .unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
1029      Ok(())
1030    } else {
1031      Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
1032    }
1033  }
1034
1035  pub fn allow_undeclared_parameters(&self) -> bool {
1036    self.options.allow_undeclared_parameters
1037  }
1038
1039  /// Gets the value of a parameter, or None is there is no such Parameter.
1040  pub fn get_parameter(&self, name: &str) -> Option<ParameterValue> {
1041    self
1042      .parameters
1043      .lock()
1044      .unwrap()
1045      .get(name)
1046      .map(|p| p.to_owned())
1047  }
1048
1049  pub fn list_parameters(&self) -> Vec<String> {
1050    self
1051      .parameters
1052      .lock()
1053      .unwrap()
1054      .keys()
1055      .map(move |k| k.to_owned())
1056      .collect::<Vec<_>>()
1057  }
1058
1059  // Keep this function in sync with the same function in Spinner.
1060  // TODO: This should refuse to change parameter type, unless
1061  // there is a ParamaterDescription defined and it allows
1062  // changing type.
1063  // TODO: Setting Parameter to type NotSet counts as parameter deletion. Maybe
1064  // that needs special handling?
1065  fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
1066    match name {
1067      // built-in parameter check
1068      "use_sim_time" => match value {
1069        ParameterValue::Boolean(_) => Ok(()),
1070        _ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
1071      },
1072      // application-defined parameters
1073      _ => {
1074        match self.parameter_validator {
1075          Some(ref v) => v.lock().unwrap()(name, value), // ask the validator to judge
1076          None => Ok(()),                                // no validator defined, always accept
1077        }
1078      }
1079    }
1080  }
1081
1082  // Keep this function in sync with the same function in Spinner.
1083  fn execute_parameter_set_actions(
1084    &self,
1085    name: &str,
1086    value: &ParameterValue,
1087  ) -> SetParametersResult {
1088    match name {
1089      "use_sim_time" => match value {
1090        ParameterValue::Boolean(s) => {
1091          self.use_sim_time.store(*s, Ordering::SeqCst);
1092          Ok(())
1093        }
1094        _ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
1095      },
1096      _ => {
1097        match self.parameter_set_action {
1098          Some(ref v) => v.lock().unwrap()(name, value), // execute custom action
1099          None => Ok(()),                                // no action defined, always accept
1100        }
1101      }
1102    }
1103  }
1104
1105  // ///////////////////////////////////////////////////
1106
1107  /// Get an async Receiver for discovery events.
1108  ///
1109  /// There must be an async task executing `spin` to get any data.
1110  /// This function may panic if there is no Spinner running.
1111  pub fn status_receiver(&self) -> Receiver<NodeEvent> {
1112    if self.have_spinner() {
1113      let (status_event_sender, status_event_receiver) = async_channel::bounded(8);
1114      self
1115        .status_event_senders
1116        .lock()
1117        .unwrap()
1118        .push(status_event_sender);
1119      status_event_receiver
1120    } else {
1121      panic!("status_receiver() cannot set up a receiver, because no Spinner is running.")
1122    }
1123  }
1124
1125  // reader waits for at least one writer to be present
1126  pub(crate) fn wait_for_writer(&self, reader: GUID) -> impl Future<Output = ()> {
1127    // TODO: This may contain some synchrnoization hazard
1128    let status_receiver = self.status_receiver();
1129
1130    let already_present = self
1131      .readers_to_remote_writers
1132      .lock()
1133      .unwrap()
1134      .get(&reader)
1135      .map(|writers| !writers.is_empty()) // there is someone matched
1136      .unwrap_or(false); // we do not even know the reader
1137
1138    if already_present {
1139      WriterWait::Ready
1140    } else {
1141      WriterWait::Wait {
1142        this_reader: reader,
1143        status_event_stream: Box::pin(status_receiver),
1144      }
1145    }
1146  }
1147
1148  pub(crate) fn wait_for_reader(&self, writer: GUID) -> impl Future<Output = ()> {
1149    // TODO: This may contain some synchrnoization hazard.
1150    let status_receiver = self.status_receiver();
1151
1152    let already_present = self
1153      .writers_to_remote_readers
1154      .lock()
1155      .unwrap()
1156      .get(&writer)
1157      .map(|readers| !readers.is_empty()) // there is someone matched
1158      .unwrap_or(false); // we do not even know who is asking
1159
1160    // TODO: Is is possible to miss reader events if they appear after the check
1161    // above, but do not somehow end up in the status_receiver stream?
1162
1163    if already_present {
1164      info!("wait_for_reader: Already have matched a reader.");
1165      ReaderWait::Ready
1166    } else {
1167      ReaderWait::Wait {
1168        this_writer: writer,
1169        status_event_stream: Box::pin(status_receiver),
1170      }
1171    }
1172  }
1173
1174  pub(crate) fn get_publisher_count(&self, subscription_guid: GUID) -> usize {
1175    self
1176      .readers_to_remote_writers
1177      .lock()
1178      .unwrap()
1179      .get(&subscription_guid)
1180      .map(BTreeSet::len)
1181      .unwrap_or_else(|| {
1182        error!("get_publisher_count: Subscriber {subscription_guid:?} not known to node.");
1183        0
1184      })
1185  }
1186
1187  pub(crate) fn get_subscription_count(&self, publisher_guid: GUID) -> usize {
1188    self
1189      .writers_to_remote_readers
1190      .lock()
1191      .unwrap()
1192      .get(&publisher_guid)
1193      .map(BTreeSet::len)
1194      .unwrap_or_else(|| {
1195        error!("get_subscription_count: Publisher {publisher_guid:?} not known to node.");
1196        0
1197      })
1198  }
1199
1200  /// Borrow the Subscription to our ROSOut Reader.
1201  ///
1202  /// Availability depends on Node configuration.
1203  pub fn rosout_subscription(&self) -> Option<&Subscription<Log>> {
1204    self.rosout_reader.as_ref()
1205  }
1206
1207  /// Creates ROS2 topic and handles necessary conversions from DDS to ROS2
1208  ///
1209  /// # Arguments
1210  ///
1211  /// * `domain_participant` -
1212  ///   [DomainParticipant](../dds/struct.DomainParticipant.html)
1213  /// * `name` - Name of the topic
1214  /// * `type_name` - What type the topic holds in string form
1215  /// * `qos` - Quality of Service parameters for the topic (not restricted only
1216  ///   to ROS2)
1217  ///
1218  ///  
1219  ///   [summary of all rules for topic and service names in ROS 2](https://design.ros2.org/articles/topic_and_service_names.html)
1220  ///   (as of Dec 2020)
1221  ///
1222  /// * must not be empty
1223  /// * may contain alphanumeric characters ([0-9|a-z|A-Z]), underscores (_), or
1224  ///   forward slashes (/)
1225  /// * may use balanced curly braces ({}) for substitutions
1226  /// * may start with a tilde (~), the private namespace substitution character
1227  /// * must not start with a numeric character ([0-9])
1228  /// * must not end with a forward slash (/)
1229  /// * must not contain any number of repeated forward slashes (/)
1230  /// * must not contain any number of repeated underscores (_)
1231  /// * must separate a tilde (~) from the rest of the name with a forward slash
1232  ///   (/), i.e. ~/foo not ~foo
1233  /// * must have balanced curly braces ({}) when used, i.e. {sub}/foo but not
1234  ///   {sub/foo nor /foo}
1235  pub fn create_topic(
1236    &self,
1237    topic_name: &Name,
1238    type_name: MessageTypeName,
1239    qos: &QosPolicies,
1240  ) -> CreateResult<Topic> {
1241    let dds_name = topic_name.to_dds_name("rt", &self.node_name, "");
1242    self.ros_context.create_topic(dds_name, type_name, qos)
1243  }
1244
1245  /// Creates ROS2 Subscriber
1246  ///
1247  /// # Arguments
1248  ///
1249  /// * `topic` - Reference to topic created with `create_ros_topic`.
1250  /// * `qos` - Should take [QOS](../dds/qos/struct.QosPolicies.html) and use if
1251  ///   it's compatible with topics QOS. `None` indicates the use of Topics QOS.
1252  pub fn create_subscription<D: 'static>(
1253    &mut self,
1254    topic: &Topic,
1255    qos: Option<QosPolicies>,
1256  ) -> CreateResult<Subscription<D>> {
1257    let sub = self.ros_context.create_subscription(topic, qos)?;
1258    self.add_reader(sub.guid().into());
1259    Ok(sub)
1260  }
1261
1262  /// Creates ROS2 Publisher
1263  ///
1264  /// # Arguments
1265  ///
1266  /// * `topic` - Reference to topic created with `create_ros_topic`.
1267  /// * `qos` - Should take [QOS](../dds/qos/struct.QosPolicies.html) and use it
1268  ///   if it's compatible with topics QOS. `None` indicates the use of Topics
1269  ///   QOS.
1270  pub fn create_publisher<D: Serialize>(
1271    &mut self,
1272    topic: &Topic,
1273    qos: Option<QosPolicies>,
1274  ) -> CreateResult<Publisher<D>> {
1275    let p = self.ros_context.create_publisher(topic, qos)?;
1276    self.add_writer(p.guid().into());
1277    Ok(p)
1278  }
1279
1280  pub(crate) fn create_simpledatareader<D, DA>(
1281    &mut self,
1282    topic: &Topic,
1283    qos: Option<QosPolicies>,
1284  ) -> CreateResult<no_key::SimpleDataReader<D, DA>>
1285  where
1286    D: 'static,
1287    DA: rustdds::no_key::DeserializerAdapter<D> + 'static,
1288  {
1289    let r = self.ros_context.create_simpledatareader(topic, qos)?;
1290    self.add_reader(r.guid().into());
1291    Ok(r)
1292  }
1293
1294  pub(crate) fn create_datawriter<D, SA>(
1295    &mut self,
1296    topic: &Topic,
1297    qos: Option<QosPolicies>,
1298  ) -> CreateResult<no_key::DataWriter<D, SA>>
1299  where
1300    SA: rustdds::no_key::SerializerAdapter<D>,
1301  {
1302    let w = self.ros_context.create_datawriter(topic, qos)?;
1303    self.add_writer(w.guid().into());
1304    Ok(w)
1305  }
1306
1307  /// Creates ROS2 Service Client
1308  ///
1309  /// # Arguments
1310  ///
1311  /// * `service_mapping` - ServiceMapping to be used
1312  /// * `service_name` -
1313  /// * `qos`-
1314  pub fn create_client<S>(
1315    &mut self,
1316    service_mapping: ServiceMapping,
1317    service_name: &Name,
1318    service_type_name: &ServiceTypeName,
1319    request_qos: QosPolicies,
1320    response_qos: QosPolicies,
1321  ) -> CreateResult<Client<S>>
1322  where
1323    S: Service + 'static,
1324    S::Request: Clone,
1325  {
1326    // Add rq/ and rr/ prefixes as documented in
1327    // https://design.ros2.org/articles/topic_and_service_names.html
1328    // Where are the suffixes documented?
1329    // And why "Reply" and not "Response" ?
1330
1331    let rq_topic = self.ros_context.domain_participant().create_topic(
1332      service_name.to_dds_name("rq", &self.node_name, "Request"),
1333      //rq_name,
1334      service_type_name.dds_request_type(),
1335      &request_qos,
1336      TopicKind::NoKey,
1337    )?;
1338    let rs_topic = self.ros_context.domain_participant().create_topic(
1339      service_name.to_dds_name("rr", &self.node_name, "Reply"),
1340      //rs_name,
1341      service_type_name.dds_response_type(),
1342      &response_qos,
1343      TopicKind::NoKey,
1344    )?;
1345
1346    let c = Client::<S>::new(
1347      service_mapping,
1348      self,
1349      &rq_topic,
1350      &rs_topic,
1351      Some(request_qos),
1352      Some(response_qos),
1353    )?;
1354
1355    Ok(c)
1356  }
1357
1358  /// Creates ROS2 Service Server
1359  ///
1360  /// # Arguments
1361  ///
1362  /// * `service_mapping` - ServiceMapping to be used. See
1363  ///   [`Self.create_client`].
1364  /// * `service_name` -
1365  /// * `qos`-
1366  pub fn create_server<S>(
1367    &mut self,
1368    service_mapping: ServiceMapping,
1369    service_name: &Name,
1370    service_type_name: &ServiceTypeName,
1371    request_qos: QosPolicies,
1372    response_qos: QosPolicies,
1373  ) -> CreateResult<Server<S>>
1374  where
1375    S: Service + 'static,
1376    S::Request: Clone,
1377  {
1378    // let rq_name = Self::check_name_and_add_prefix("rq/",
1379    // &(service_name.to_owned() + "Request"))?; let rs_name =
1380    // Self::check_name_and_add_prefix("rr/", &(service_name.to_owned() +
1381    // "Reply"))?;
1382
1383    let rq_topic = self.ros_context.domain_participant().create_topic(
1384      //rq_name,
1385      service_name.to_dds_name("rq", &self.node_name, "Request"),
1386      service_type_name.dds_request_type(),
1387      &request_qos,
1388      TopicKind::NoKey,
1389    )?;
1390    let rs_topic = self.ros_context.domain_participant().create_topic(
1391      service_name.to_dds_name("rr", &self.node_name, "Reply"),
1392      service_type_name.dds_response_type(),
1393      &response_qos,
1394      TopicKind::NoKey,
1395    )?;
1396
1397    let s = Server::<S>::new(
1398      service_mapping,
1399      self,
1400      &rq_topic,
1401      &rs_topic,
1402      Some(request_qos),
1403      Some(response_qos),
1404    )?;
1405
1406    Ok(s)
1407  }
1408
1409  pub fn create_action_client<A>(
1410    &mut self,
1411    service_mapping: ServiceMapping,
1412    action_name: &Name,
1413    action_type_name: &ActionTypeName,
1414    action_qos: ActionClientQosPolicies,
1415  ) -> CreateResult<ActionClient<A>>
1416  where
1417    A: ActionTypes + 'static,
1418  {
1419    // action name is e.g. "/turtle1/rotate_absolute"
1420    // action type name is e.g. "turtlesim/action/RotateAbsolute"
1421    let services_base_name = action_name.push("_action");
1422
1423    //let goal_service_name = action_name.to_owned() + "/_action/send_goal";
1424    let goal_service_type = action_type_name.dds_action_service("_SendGoal");
1425    let my_goal_client = self.create_client(
1426      service_mapping,
1427      //&goal_service_name,
1428      &services_base_name.push("send_goal"),
1429      &goal_service_type,
1430      action_qos.goal_service.clone(),
1431      action_qos.goal_service,
1432    )?;
1433
1434    //let cancel_service_name = action_name.to_owned() + "/_action/cancel_goal";
1435    let cancel_goal_type = ServiceTypeName::new("action_msgs", "CancelGoal");
1436    let my_cancel_client = self.create_client(
1437      service_mapping,
1438      //&cancel_service_name,
1439      &services_base_name.push("cancel_goal"),
1440      &cancel_goal_type,
1441      action_qos.cancel_service.clone(),
1442      action_qos.cancel_service,
1443    )?;
1444
1445    //let result_service_name = action_name.to_owned() + "/_action/get_result";
1446    let result_service_type = action_type_name.dds_action_service("_GetResult");
1447    let my_result_client = self.create_client(
1448      service_mapping,
1449      //&result_service_name,
1450      &services_base_name.push("get_result"),
1451      &result_service_type,
1452      action_qos.result_service.clone(),
1453      action_qos.result_service,
1454    )?;
1455
1456    let action_topic_namespace = action_name.push("_action");
1457
1458    let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
1459    let feedback_topic = self.create_topic(
1460      &action_topic_namespace.push("feedback"),
1461      feedback_topic_type,
1462      &action_qos.feedback_subscription,
1463    )?;
1464    let my_feedback_subscription =
1465      self.create_subscription(&feedback_topic, Some(action_qos.feedback_subscription))?;
1466
1467    //let status_topic_type = ;
1468    let status_topic = self.create_topic(
1469      &action_topic_namespace.push("status"),
1470      MessageTypeName::new("action_msgs", "GoalStatusArray"),
1471      &action_qos.status_subscription,
1472    )?;
1473    let my_status_subscription =
1474      self.create_subscription(&status_topic, Some(action_qos.status_subscription))?;
1475
1476    Ok(ActionClient {
1477      my_goal_client,
1478      my_cancel_client,
1479      my_result_client,
1480      my_feedback_subscription,
1481      my_status_subscription,
1482      my_action_name: action_name.clone(),
1483    })
1484  }
1485
1486  pub fn create_action_server<A>(
1487    &mut self,
1488    service_mapping: ServiceMapping,
1489    action_name: &Name,
1490    action_type_name: &ActionTypeName,
1491    action_qos: ActionServerQosPolicies,
1492  ) -> CreateResult<ActionServer<A>>
1493  where
1494    A: ActionTypes + 'static,
1495  {
1496    let services_base_name = action_name.push("_action");
1497
1498    //let goal_service_name = action_name.to_owned() + "/_action/send_goal";
1499    let goal_service_type = action_type_name.dds_action_service("_SendGoal");
1500    let my_goal_server = self.create_server(
1501      service_mapping,
1502      //&goal_service_name,
1503      &services_base_name.push("send_goal"),
1504      &goal_service_type,
1505      action_qos.goal_service.clone(),
1506      action_qos.goal_service,
1507    )?;
1508
1509    //let cancel_service_name = action_name.to_owned() + "/_action/cancel_goal";
1510    let cancel_service_type = ServiceTypeName::new("action_msgs", "CancelGoal");
1511    let my_cancel_server = self.create_server(
1512      service_mapping,
1513      //&cancel_service_name,
1514      &services_base_name.push("cancel_goal"),
1515      &cancel_service_type,
1516      action_qos.cancel_service.clone(),
1517      action_qos.cancel_service,
1518    )?;
1519
1520    //let result_service_name = action_name.to_owned() + "/_action/get_result";
1521    let result_service_type = action_type_name.dds_action_service("_GetResult");
1522    let my_result_server = self.create_server(
1523      service_mapping,
1524      //&result_service_name,
1525      &services_base_name.push("get_result"),
1526      &result_service_type,
1527      action_qos.result_service.clone(),
1528      action_qos.result_service,
1529    )?;
1530
1531    let action_topic_namespace = action_name.push("_action");
1532
1533    let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
1534    let feedback_topic = self.create_topic(
1535      &action_topic_namespace.push("feedback"),
1536      feedback_topic_type,
1537      &action_qos.feedback_publisher,
1538    )?;
1539    let my_feedback_publisher =
1540      self.create_publisher(&feedback_topic, Some(action_qos.feedback_publisher))?;
1541
1542    let status_topic_type = MessageTypeName::new("action_msgs", "GoalStatusArray");
1543    let status_topic = self.create_topic(
1544      &action_topic_namespace.push("status"),
1545      status_topic_type,
1546      &action_qos.status_publisher,
1547    )?;
1548    let my_status_publisher =
1549      self.create_publisher(&status_topic, Some(action_qos.status_publisher))?;
1550
1551    Ok(ActionServer {
1552      my_goal_server,
1553      my_cancel_server,
1554      my_result_server,
1555      my_feedback_publisher,
1556      my_status_publisher,
1557      my_action_name: action_name.clone(),
1558    })
1559  }
1560
1561  /// Makes a handle to the `rosout` logging publisher.
1562  ///
1563  /// You can send these across threads
1564  pub fn logging_handle(&self) -> NodeLoggingHandle {
1565    NodeLoggingHandle {
1566      rosout_writer: Arc::clone(&self.rosout_writer),
1567      base_name: self.node_name.base_name().to_string(),
1568    }
1569  }
1570
1571  pub fn stop_spinner(&self) {
1572    info!("Signalling spinner to stop (manual)");
1573    if let Some(ref stop_spin_sender) = self.stop_spin_sender {
1574      stop_spin_sender
1575        .try_send(())
1576        .unwrap_or_else(|e| error!("Cannot notify spin task to stop: {e:?}"));
1577    }
1578  }
1579
1580} // impl Node
1581
1582impl Drop for Node {
1583  fn drop(&mut self) {
1584    debug!("Signalling spinner to stop (.drop)");
1585    if let Some(ref stop_spin_sender) = self.stop_spin_sender {
1586      stop_spin_sender
1587        .try_send(())
1588        .unwrap_or_else(|e| error!("Cannot notify spin task to stop: {e:?}"));
1589    }
1590
1591    self
1592      .ros_context
1593      .remove_node(self.fully_qualified_name().as_str());
1594  }
1595}
1596
1597impl RosoutRaw for Node {
1598  fn rosout_writer(&self) -> Arc<Option<Publisher<Log>>> {
1599    Arc::clone(&self.rosout_writer)
1600  }
1601
1602  fn base_name(&self) -> &str {
1603    self.node_name.base_name()
1604  }
1605}
1606
1607/// Macro for writing to [rosout](https://wiki.ros.org/rosout) topic.
1608///
1609/// # Example
1610///
1611/// ```
1612/// # use ros2_client::*;
1613/// #
1614/// # let context = Context::new().unwrap();
1615/// # let mut node = context
1616/// #     .new_node(
1617/// #       NodeName::new("/", "some_node").unwrap(),
1618/// #       NodeOptions::new().enable_rosout(true),
1619/// #     )
1620/// #     .unwrap();
1621/// let kind = "silly";
1622///
1623/// rosout!(node, ros2::LogLevel::Info, "A {} event was seen.", kind);
1624/// ```
1625#[macro_export]
1626macro_rules! rosout {
1627    ($node:expr, $lvl:expr, $($arg:tt)+) => (
1628        $crate::rosout::RosoutRaw::rosout_raw(
1629            &$node,
1630            $crate::ros2::Timestamp::now(),
1631            $lvl,
1632            $crate::rosout::RosoutRaw::base_name(&$node),
1633            &std::format!($($arg)+), // msg
1634            std::file!(),
1635            "<unknown_func>", // is there a macro to get current function name? (Which may be undefined)
1636            std::line!(),
1637        );
1638    );
1639}
1640
1641/// Future type for waiting Readers to appear over ROS2 Topic.
1642///
1643/// Produced by `node.wait_for_reader(writer_guid)`
1644//
1645// This is implemented as a separate struct instead of just async function in
1646// Node so that it does not borrow the node and thus can be Send.
1647//use pin_project::pin_project;
1648#[must_use = "futures do nothing unless you `.await` or poll them"]
1649pub enum ReaderWait<'a> {
1650  // We need to wait for an event that is for us
1651  Wait {
1652    this_writer: GUID, // Writer who is waiting for Readers to appear
1653    status_event_stream: stream::BoxStream<'a, NodeEvent>,
1654  },
1655  // No need to wait, can resolve immediately.
1656  Ready,
1657}
1658
1659impl Future for ReaderWait<'_> {
1660  type Output = ();
1661
1662  fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1663    match *self {
1664      ReaderWait::Ready => Poll::Ready(()),
1665
1666      ReaderWait::Wait {
1667        this_writer,
1668        ref mut status_event_stream,
1669      } => {
1670        debug!("wait_for_reader: Waiting for a reader.");
1671        loop {
1672          match status_event_stream.poll_next_unpin(cx) {
1673            // Check if we have RemoteReaderMatched event and it is for this_writer
1674            Poll::Ready(Some(NodeEvent::DDS(
1675              DomainParticipantStatusEvent::RemoteReaderMatched {
1676                local_writer,
1677                remote_reader,
1678              },
1679            )))
1680              if local_writer == this_writer =>
1681            {
1682              debug!("wait_for_reader: Matched remote reader {remote_reader:?}.");
1683              return Poll::Ready(());
1684            }
1685
1686            Poll::Ready(_) => {
1687              // Received something else, such as other event or error
1688              debug!("wait_for_reader: other event. Continue polling.");
1689              // So we do nothing but go to the next iteration.
1690            }
1691
1692            Poll::Pending => return Poll::Pending,
1693          }
1694        }
1695      }
1696    }
1697  }
1698}
1699
1700/// Future type for waiting Writers to appear over ROS2 Topic.
1701///
1702/// Produced by `node.wait_for_writer(writer_guid)`
1703//
1704// This is implemented as a separate struct instead of just async function in
1705// Node so that it does not borrow the node and thus can be Send.
1706#[must_use = "futures do nothing unless you `.await` or poll them"]
1707pub enum WriterWait<'a> {
1708  // We need to wait for an event that is for us
1709  Wait {
1710    this_reader: GUID,
1711    status_event_stream: stream::BoxStream<'a, NodeEvent>,
1712  },
1713  // No need to wait, can resolve immediately.
1714  Ready,
1715}
1716
1717impl Future for WriterWait<'_> {
1718  type Output = ();
1719
1720  fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1721    match *self {
1722      WriterWait::Ready => Poll::Ready(()),
1723
1724      WriterWait::Wait {
1725        this_reader,
1726        ref mut status_event_stream,
1727      } => {
1728        debug!("wait_for_writer: Waiting for a writer.");
1729        loop {
1730          // We loop to pump events out of the stream until we get the desired event
1731          // or "Pending". If we stop at the first event, then there is no waker
1732          // installed and we are stuck.
1733          match status_event_stream.poll_next_unpin(cx) {
1734            // Check if we have RemoteWriterMatched event and it is for this_writer
1735            Poll::Ready(Some(NodeEvent::DDS(
1736              DomainParticipantStatusEvent::RemoteWriterMatched {
1737                local_reader,
1738                remote_writer,
1739              },
1740            )))
1741              if local_reader == this_reader =>
1742            {
1743              debug!("wait_for_writer: Matched remote writer {remote_writer:?}.");
1744              return Poll::Ready(());
1745            }
1746
1747            Poll::Ready(_) => {
1748              // Received something else, such as other event or error
1749              trace!("=== other writer. Continue polling.");
1750              // No return, go to next iteration.
1751            }
1752
1753            Poll::Pending => return Poll::Pending,
1754          }
1755        }
1756      }
1757    }
1758  }
1759}
1760
1761#[cfg(test)]
1762mod tests {
1763  use crate::Context;
1764  use super::{Node, NodeName, NodeOptions};
1765
1766  #[test]
1767  fn node_is_sync() {
1768    let node = Node::new(
1769      NodeName::new("/", "base_name").unwrap(),
1770      NodeOptions::new(),
1771      Context::new().unwrap(),
1772    )
1773    .unwrap();
1774
1775    fn requires_send_sync<T: Send + Sync>(_t: T) {}
1776    requires_send_sync(node);
1777  }
1778}