use std::{
collections::{BTreeMap, BTreeSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};
use futures::{pin_mut, stream::FusedStream, FutureExt, Stream, StreamExt};
use async_channel::Receiver;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use serde::Serialize;
use rustdds::{
dds::{CreateError, CreateResult},
*,
};
use crate::{
action::*,
builtin_interfaces,
context::{Context, DEFAULT_SUBSCRIPTION_QOS},
entities_info::{NodeEntitiesInfo, ParticipantEntitiesInfo},
gid::Gid,
log as ros_log,
log::Log,
names::*,
parameters::*,
pubsub::{Publisher, Subscription},
rcl_interfaces,
ros_time::ROSTime,
service::{Client, Server, Service, ServiceMapping},
};
type ParameterFunc = dyn Fn(&str, &ParameterValue) -> SetParametersResult + Send;
#[must_use]
pub struct NodeOptions {
#[allow(dead_code)]
cli_args: Vec<String>,
#[allow(dead_code)]
use_global_arguments: bool, enable_rosout: bool, enable_rosout_reading: bool,
start_parameter_services: bool,
declared_parameters: Vec<Parameter>,
allow_undeclared_parameters: bool,
parameter_validator: Option<Box<ParameterFunc>>,
parameter_set_action: Option<Box<ParameterFunc>>,
}
impl NodeOptions {
pub fn new() -> NodeOptions {
NodeOptions {
cli_args: Vec::new(),
use_global_arguments: true,
enable_rosout: true,
enable_rosout_reading: false,
start_parameter_services: true,
declared_parameters: Vec::new(),
allow_undeclared_parameters: false,
parameter_validator: None,
parameter_set_action: None,
}
}
pub fn enable_rosout(self, enable_rosout: bool) -> NodeOptions {
NodeOptions {
enable_rosout,
..self
}
}
pub fn read_rosout(self, enable_rosout_reading: bool) -> NodeOptions {
NodeOptions {
enable_rosout_reading,
..self
}
}
pub fn declare_parameter(mut self, name: &str, value: ParameterValue) -> NodeOptions {
self.declared_parameters.push(Parameter {
name: name.to_owned(),
value,
});
self
}
pub fn parameter_validator(mut self, validator: Box<ParameterFunc>) -> NodeOptions {
self.parameter_validator = Some(validator);
self
}
pub fn parameter_set_action(mut self, action: Box<ParameterFunc>) -> NodeOptions {
self.parameter_set_action = Some(action);
self
}
}
impl Default for NodeOptions {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug)]
pub enum NodeEvent {
DDS(DomainParticipantStatusEvent),
ROS(ParticipantEntitiesInfo),
}
struct ParameterServers {
get_parameters_server: Server<rcl_interfaces::GetParametersService>,
get_parameter_types_server: Server<rcl_interfaces::GetParameterTypesService>,
list_parameters_server: Server<rcl_interfaces::ListParametersService>,
set_parameters_server: Server<rcl_interfaces::SetParametersService>,
set_parameters_atomically_server: Server<rcl_interfaces::SetParametersAtomicallyService>,
describe_parameters_server: Server<rcl_interfaces::DescribeParametersService>,
}
pub struct Spinner {
ros_context: Context,
stop_spin_receiver: async_channel::Receiver<()>,
readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
use_sim_time: Arc<AtomicBool>,
sim_time: Arc<Mutex<ROSTime>>,
clock_topic: Topic,
allow_undeclared_parameters: bool,
parameter_servers: Option<ParameterServers>,
parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
fully_qualified_node_name: String,
}
async fn next_if_some<S>(s: &mut Option<S>) -> S::Item
where
S: Stream + Unpin + FusedStream,
{
match s.as_mut() {
Some(stream) => stream.select_next_some().await,
None => std::future::pending().await,
}
}
impl Spinner {
pub async fn spin(self) -> CreateResult<()> {
let dds_status_listener = self.ros_context.domain_participant().status_listener();
let dds_status_stream = dds_status_listener.as_async_status_stream();
pin_mut!(dds_status_stream);
let ros_discovery_topic = self.ros_context.ros_discovery_topic();
let ros_discovery_reader = self
.ros_context
.create_subscription::<ParticipantEntitiesInfo>(&ros_discovery_topic, None)?;
let ros_discovery_stream = ros_discovery_reader.async_stream();
pin_mut!(ros_discovery_stream);
let ros_clock_reader = self
.ros_context
.create_subscription::<builtin_interfaces::Time>(&self.clock_topic, None)?;
let ros_clock_stream = ros_clock_reader.async_stream();
pin_mut!(ros_clock_stream);
let mut get_parameters_stream_opt = self
.parameter_servers
.as_ref()
.map(|s| s.get_parameters_server.receive_request_stream());
let mut get_parameter_types_stream_opt = self
.parameter_servers
.as_ref()
.map(|s| s.get_parameter_types_server.receive_request_stream());
let mut set_parameters_stream_opt = self
.parameter_servers
.as_ref()
.map(|s| s.set_parameters_server.receive_request_stream());
let mut set_parameters_atomically_stream_opt = self
.parameter_servers
.as_ref()
.map(|s| s.set_parameters_atomically_server.receive_request_stream());
let mut list_parameter_stream_opt = self
.parameter_servers
.as_ref()
.map(|s| s.list_parameters_server.receive_request_stream());
let mut describe_parameters_stream_opt = self
.parameter_servers
.as_ref()
.map(|s| s.describe_parameters_server.receive_request_stream());
loop {
futures::select! {
_ = self.stop_spin_receiver.recv().fuse() => {
break;
}
clock_msg = ros_clock_stream.select_next_some() => {
match clock_msg {
Ok((time,_msg_info)) => {
*self.sim_time.lock().unwrap() = time.into();
}
Err(e) => warn!("Simulated clock receive error {e:?}")
}
}
get_parameters_request = next_if_some(&mut get_parameters_stream_opt).fuse() => {
match get_parameters_request {
Ok( (req_id, req) ) => {
info!("Get parameter request {req:?}");
let values = {
let param_db = self.parameters.lock().unwrap();
req.names.iter()
.map(|name| param_db.get(name.as_str())
.unwrap_or(&ParameterValue::NotSet))
.cloned()
.map( raw::ParameterValue::from)
.collect()
};
info!("Get parameters response: {values:?}");
self.parameter_servers.as_ref().unwrap().get_parameters_server
.async_send_response(req_id, rcl_interfaces::GetParametersResponse{ values })
.await
.unwrap_or_else(|e| warn!("GetParameter response error {e:?}"));
}
Err(e) => warn!("GetParameter request error {e:?}"),
}
}
get_parameter_types_request = next_if_some(&mut get_parameter_types_stream_opt).fuse() => {
match get_parameter_types_request {
Ok( (req_id, req) ) => {
warn!("Get parameter types request");
let values = {
let param_db = self.parameters.lock().unwrap();
req.names.iter()
.map(|name| param_db.get(name.as_str())
.unwrap_or(&ParameterValue::NotSet))
.map(ParameterValue::to_parameter_type_raw)
.collect()
};
info!("Get parameter types response: {values:?}");
self.parameter_servers.as_ref().unwrap().get_parameter_types_server
.async_send_response(req_id, rcl_interfaces::GetParameterTypesResponse{ values })
.await
.unwrap_or_else(|e| warn!("GetParameterTypes response error {e:?}"));
}
Err(e) => warn!("GetParameterTypes request error {e:?}"),
}
}
set_parameters_request = next_if_some(&mut set_parameters_stream_opt).fuse() => {
match set_parameters_request {
Ok( (req_id, req) ) => {
info!("Set parameter request {req:?}");
let results =
req.parameter.iter()
.cloned()
.map( Parameter::from ) .map( |Parameter{name, value}| self.set_parameter(&name,value))
.map(|r| r.into()) .collect();
info!("Set parameters response: {results:?}");
self.parameter_servers.as_ref().unwrap().set_parameters_server
.async_send_response(req_id, rcl_interfaces::SetParametersResponse{ results })
.await
.unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
}
Err(e) => warn!("SetParameters request error {e:?}"),
}
}
set_parameters_atomically_request = next_if_some(&mut set_parameters_atomically_stream_opt).fuse() => {
match set_parameters_atomically_request {
Ok( (req_id, req) ) => {
warn!("Set parameters atomically request {req:?}");
let results =
req.parameter.iter()
.cloned()
.map( Parameter::from ) .map( |Parameter{ .. } |
Err("Setting parameters atomically is not implemented.".to_owned())
)
.map(|r| r.into()) .collect();
warn!("Set parameters atomically response: {results:?}");
self.parameter_servers.as_ref().unwrap().set_parameters_atomically_server
.async_send_response(req_id, rcl_interfaces::SetParametersAtomicallyResponse{ results })
.await
.unwrap_or_else(|e| warn!("SetParameters response error {e:?}"));
}
Err(e) => warn!("SetParametersAtomically request error {e:?}"),
}
}
list_parameter_request = next_if_some(&mut list_parameter_stream_opt).fuse() => {
match list_parameter_request {
Ok( (req_id, req) ) => {
info!("List parameters request");
let prefixes = req.prefixes;
let names = {
let param_db = self.parameters.lock().unwrap();
param_db.keys()
.filter_map(|name|
if prefixes.is_empty() ||
prefixes.iter().any(|prefix| name.starts_with(prefix))
{
Some(name.clone())
} else { None }
)
.collect()
};
let result = rcl_interfaces::ListParametersResult{ names, prefixes: vec![] };
info!("List parameters response: {result:?}");
self.parameter_servers.as_ref().unwrap().list_parameters_server
.async_send_response(req_id, rcl_interfaces::ListParametersResponse{ result })
.await
.unwrap_or_else(|e| warn!("ListParameter response error {e:?}"));
}
Err(e) => warn!("ListParameter request error {e:?}"),
}
}
describe_parameters_request = next_if_some(&mut describe_parameters_stream_opt).fuse() => {
match describe_parameters_request {
Ok( (req_id, req) ) => {
info!("Describe parameters request {req:?}");
let values = {
let parameters = self.parameters.lock().unwrap();
req.names.iter()
.map( |name|
{
if let Some(value) = parameters.get(name) {
ParameterDescriptor::from_value(name, value)
} else {
ParameterDescriptor::unknown(name)
}
})
.map(|r| r.into()) .collect()
};
info!("Describe parameters response: {values:?}");
self.parameter_servers.as_ref().unwrap().describe_parameters_server
.async_send_response(req_id, rcl_interfaces::DescribeParametersResponse{ values })
.await
.unwrap_or_else(|e| warn!("DescribeParameters response error {e:?}"));
}
Err(e) => warn!("DescribeParameters request error {e:?}"),
}
}
participant_info_update = ros_discovery_stream.select_next_some() => {
match participant_info_update {
Ok((part_update, _msg_info)) => {
let mut info_map = self.external_nodes.lock().unwrap();
info_map.insert( part_update.gid, part_update.node_entities_info_seq.clone());
self.send_status_event( &NodeEvent::ROS(part_update) );
}
Err(e) => {
warn!("ros_discovery_info error {e:?}");
}
}
}
dp_status_event = dds_status_stream.select_next_some() => {
match dp_status_event {
DomainParticipantStatusEvent::RemoteReaderMatched { local_writer, remote_reader } => {
self.writers_to_remote_readers.lock().unwrap()
.entry(local_writer)
.and_modify(|s| {s.insert(remote_reader);} )
.or_insert(BTreeSet::from([remote_reader]));
}
DomainParticipantStatusEvent::RemoteWriterMatched { local_reader, remote_writer } => {
self.readers_to_remote_writers.lock().unwrap()
.entry(local_reader)
.and_modify(|s| {s.insert(remote_writer);} )
.or_insert(BTreeSet::from([remote_writer]));
}
DomainParticipantStatusEvent::ReaderLost {guid, ..} => {
for ( _local, readers)
in self.writers_to_remote_readers.lock().unwrap().iter_mut() {
readers.remove(&guid);
}
}
DomainParticipantStatusEvent::WriterLost {guid, ..} => {
for ( _local, writers)
in self.readers_to_remote_writers.lock().unwrap().iter_mut() {
writers.remove(&guid);
}
}
_ => {}
}
self.send_status_event( &NodeEvent::DDS(dp_status_event) );
}
}
}
info!("Spinner exiting .spin()");
Ok(())
} fn send_status_event(&self, event: &NodeEvent) {
let mut closed = Vec::new();
let mut sender_array = self.status_event_senders.lock().unwrap();
for (i, sender) in sender_array.iter().enumerate() {
match sender.try_send(event.clone()) {
Ok(()) => {}
Err(async_channel::TrySendError::Closed(_)) => {
closed.push(i) }
Err(_) => {}
}
}
for c in closed.iter().rev() {
sender_array.swap_remove(*c);
}
}
fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
match name {
"use_sim_time" => match value {
ParameterValue::Boolean(_) => Ok(()),
_ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
},
_ => {
match self.parameter_validator {
Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
}
}
}
fn execute_parameter_set_actions(
&self,
name: &str,
value: &ParameterValue,
) -> SetParametersResult {
match name {
"use_sim_time" => match value {
ParameterValue::Boolean(s) => {
self.use_sim_time.store(*s, Ordering::SeqCst);
Ok(())
}
_ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
},
_ => {
match self.parameter_set_action {
Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
}
}
}
pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
let already_set = self.parameters.lock().unwrap().contains_key(name);
if self.allow_undeclared_parameters || already_set {
self.validate_parameter_on_set(name, &value)?;
self.execute_parameter_set_actions(name, &value)?;
let p = raw::Parameter {
name: name.to_string(),
value: value.clone().into(),
};
let (new_parameters, changed_parameters) = if already_set {
(vec![], vec![p])
} else {
(vec![p], vec![])
};
self
.parameters
.lock()
.unwrap()
.insert(name.to_owned(), value);
self
.parameter_events_writer
.publish(raw::ParameterEvent {
timestamp: rustdds::Timestamp::now(), node: self.fully_qualified_node_name.clone(),
new_parameters,
changed_parameters,
deleted_parameters: vec![],
})
.unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
Ok(())
} else {
Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
}
}
} #[derive(Debug)]
pub enum NodeCreateError {
DDS(CreateError),
BadParameter(String),
}
impl From<CreateError> for NodeCreateError {
fn from(c: CreateError) -> NodeCreateError {
NodeCreateError::DDS(c)
}
}
pub enum ParameterError {
AlreadyDeclared,
InvalidName,
}
pub struct Node {
node_name: NodeName,
options: NodeOptions,
pub(crate) ros_context: Context,
readers: BTreeSet<Gid>,
writers: BTreeSet<Gid>,
suppress_node_info_updates: Arc<AtomicBool>,
readers_to_remote_writers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
writers_to_remote_readers: Arc<Mutex<BTreeMap<GUID, BTreeSet<GUID>>>>,
external_nodes: Arc<Mutex<BTreeMap<Gid, Vec<NodeEntitiesInfo>>>>,
stop_spin_sender: Option<async_channel::Sender<()>>,
status_event_senders: Arc<Mutex<Vec<async_channel::Sender<NodeEvent>>>>,
rosout_writer: Option<Publisher<Log>>,
rosout_reader: Option<Subscription<Log>>,
parameter_events_writer: Arc<Publisher<raw::ParameterEvent>>,
parameters: Arc<Mutex<BTreeMap<String, ParameterValue>>>,
parameter_validator: Option<Arc<Mutex<Box<ParameterFunc>>>>,
parameter_set_action: Option<Arc<Mutex<Box<ParameterFunc>>>>,
use_sim_time: Arc<AtomicBool>,
sim_time: Arc<Mutex<ROSTime>>,
}
impl Node {
pub(crate) fn new(
node_name: NodeName,
mut options: NodeOptions,
ros_context: Context,
) -> Result<Node, NodeCreateError> {
let paramtopic = ros_context.get_parameter_events_topic();
let rosout_topic = ros_context.get_rosout_topic();
let enable_rosout = options.enable_rosout;
let rosout_reader = options.enable_rosout_reading;
let parameter_events_writer = ros_context.create_publisher(¶mtopic, None)?;
options.declared_parameters.push(Parameter {
name: "use_sim_time".to_string(),
value: ParameterValue::Boolean(false),
});
let parameters = options
.declared_parameters
.iter()
.cloned()
.map(|Parameter { name, value }| (name, value))
.collect::<BTreeMap<String, ParameterValue>>();
let parameter_validator = options
.parameter_validator
.take()
.map(|b| Arc::new(Mutex::new(b)));
let parameter_set_action = options
.parameter_set_action
.take()
.map(|b| Arc::new(Mutex::new(b)));
let mut node = Node {
node_name,
options,
ros_context,
readers: BTreeSet::new(),
writers: BTreeSet::new(),
readers_to_remote_writers: Arc::new(Mutex::new(BTreeMap::new())),
writers_to_remote_readers: Arc::new(Mutex::new(BTreeMap::new())),
external_nodes: Arc::new(Mutex::new(BTreeMap::new())),
suppress_node_info_updates: Arc::new(AtomicBool::new(false)),
stop_spin_sender: None,
status_event_senders: Arc::new(Mutex::new(Vec::new())),
rosout_writer: None, rosout_reader: None,
parameter_events_writer: Arc::new(parameter_events_writer),
parameters: Arc::new(Mutex::new(parameters)),
parameter_validator,
parameter_set_action,
use_sim_time: Arc::new(AtomicBool::new(false)),
sim_time: Arc::new(Mutex::new(ROSTime::ZERO)),
};
node.suppress_node_info_updates(true);
node.rosout_writer = if enable_rosout {
Some(
node.create_publisher(&rosout_topic, None)?,
)
} else {
None
};
node.rosout_reader = if rosout_reader {
Some(node.create_subscription(&rosout_topic, None)?)
} else {
None
};
node
.parameters
.lock()
.unwrap()
.iter()
.try_for_each(|(name, value)| {
node.validate_parameter_on_set(name, value)?;
node.execute_parameter_set_actions(name, value)?;
Ok(())
})
.map_err(NodeCreateError::BadParameter)?;
node.suppress_node_info_updates(false);
Ok(node)
}
pub fn time_now(&self) -> ROSTime {
if self.use_sim_time.load(Ordering::SeqCst) {
*self.sim_time.lock().unwrap()
} else {
ROSTime::now()
}
}
pub fn time_now_not_simulated(&self) -> ROSTime {
ROSTime::now()
}
pub fn spinner(&mut self) -> CreateResult<Spinner> {
if self.stop_spin_sender.is_some() {
panic!("Attempted to crate a second spinner.");
}
let (stop_spin_sender, stop_spin_receiver) = async_channel::bounded(1);
self.stop_spin_sender = Some(stop_spin_sender);
let service_qos = QosPolicyBuilder::new()
.reliability(policy::Reliability::Reliable {
max_blocking_time: Duration::from_millis(100),
})
.history(policy::History::KeepLast { depth: 1 })
.build();
let node_name = self.node_name.fully_qualified_name();
self.suppress_node_info_updates(true);
let parameter_servers = if self.options.start_parameter_services {
let service_mapping = ServiceMapping::Enhanced; let get_parameters_server = self.create_server(
service_mapping,
&Name::new(&node_name, "get_parameters").unwrap(),
&ServiceTypeName::new("rcl_interfaces", "GetParameters"),
service_qos.clone(),
service_qos.clone(),
)?;
let get_parameter_types_server = self.create_server(
service_mapping,
&Name::new(&node_name, "get_parameter_types").unwrap(),
&ServiceTypeName::new("rcl_interfaces", "GetParameterTypes"),
service_qos.clone(),
service_qos.clone(),
)?;
let set_parameters_server = self.create_server(
service_mapping,
&Name::new(&node_name, "set_parameters").unwrap(),
&ServiceTypeName::new("rcl_interfaces", "SetParameters"),
service_qos.clone(),
service_qos.clone(),
)?;
let set_parameters_atomically_server = self.create_server(
service_mapping,
&Name::new(&node_name, "set_parameters_atomically").unwrap(),
&ServiceTypeName::new("rcl_interfaces", "SetParametersAtomically"),
service_qos.clone(),
service_qos.clone(),
)?;
let list_parameters_server = self.create_server(
service_mapping,
&Name::new(&node_name, "list_parameters").unwrap(),
&ServiceTypeName::new("rcl_interfaces", "ListParameters"),
service_qos.clone(),
service_qos.clone(),
)?;
let describe_parameters_server = self.create_server(
service_mapping,
&Name::new(&node_name, "describe_parameters").unwrap(),
&ServiceTypeName::new("rcl_interfaces", "DescribeParameters"),
service_qos.clone(),
service_qos.clone(),
)?;
Some(ParameterServers {
get_parameters_server,
get_parameter_types_server,
list_parameters_server,
set_parameters_server,
set_parameters_atomically_server,
describe_parameters_server,
})
} else {
None };
let clock_topic = self.create_topic(
&Name::new("/", "clock").unwrap(),
MessageTypeName::new("builtin_interfaces", "Time"),
&DEFAULT_SUBSCRIPTION_QOS,
)?;
self.suppress_node_info_updates(false);
Ok(Spinner {
ros_context: self.ros_context.clone(),
stop_spin_receiver,
readers_to_remote_writers: Arc::clone(&self.readers_to_remote_writers),
writers_to_remote_readers: Arc::clone(&self.writers_to_remote_readers),
external_nodes: Arc::clone(&self.external_nodes),
status_event_senders: Arc::clone(&self.status_event_senders),
use_sim_time: Arc::clone(&self.use_sim_time),
sim_time: Arc::clone(&self.sim_time),
clock_topic,
parameter_servers,
parameter_events_writer: Arc::clone(&self.parameter_events_writer),
parameters: Arc::clone(&self.parameters),
allow_undeclared_parameters: self.options.allow_undeclared_parameters,
parameter_validator: self.parameter_validator.as_ref().map(Arc::clone),
parameter_set_action: self.parameter_set_action.as_ref().map(Arc::clone),
fully_qualified_node_name: self.fully_qualified_name(),
})
}
pub fn have_spinner(&self) -> bool {
self.stop_spin_sender.is_some()
}
fn generate_node_info(&self) -> NodeEntitiesInfo {
let mut node_info = NodeEntitiesInfo::new(self.node_name.clone());
node_info.add_writer(Gid::from(self.parameter_events_writer.guid()));
if let Some(row) = &self.rosout_writer {
node_info.add_writer(Gid::from(row.guid()));
}
for reader in &self.readers {
node_info.add_reader(*reader);
}
for writer in &self.writers {
node_info.add_writer(*writer);
}
node_info
}
fn suppress_node_info_updates(&mut self, suppress: bool) {
self
.suppress_node_info_updates
.store(suppress, Ordering::SeqCst);
if !suppress {
self.ros_context.update_node(self.generate_node_info());
}
}
fn add_reader(&mut self, reader: Gid) {
self.readers.insert(reader);
if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
self.ros_context.update_node(self.generate_node_info());
}
}
fn add_writer(&mut self, writer: Gid) {
self.writers.insert(writer);
if !self.suppress_node_info_updates.load(Ordering::SeqCst) {
self.ros_context.update_node(self.generate_node_info());
}
}
pub fn base_name(&self) -> &str {
self.node_name.base_name()
}
pub fn namespace(&self) -> &str {
self.node_name.namespace()
}
pub fn fully_qualified_name(&self) -> String {
self.node_name.fully_qualified_name()
}
pub fn options(&self) -> &NodeOptions {
&self.options
}
pub fn domain_id(&self) -> u16 {
self.ros_context.domain_id()
}
pub fn undeclare_parameter(&self, name: &str) {
let prev_value = self.parameters.lock().unwrap().remove(name);
if let Some(deleted_param) = prev_value {
self
.parameter_events_writer
.publish(raw::ParameterEvent {
timestamp: self.time_now().into(),
node: self.fully_qualified_name(),
new_parameters: vec![],
changed_parameters: vec![],
deleted_parameters: vec![raw::Parameter {
name: name.to_string(),
value: deleted_param.into(),
}],
})
.unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
}
}
pub fn has_parameter(&self, name: &str) -> bool {
self.parameters.lock().unwrap().contains_key(name)
}
pub fn set_parameter(&self, name: &str, value: ParameterValue) -> Result<(), String> {
let already_set = self.parameters.lock().unwrap().contains_key(name);
if self.options.allow_undeclared_parameters || already_set {
self.validate_parameter_on_set(name, &value)?;
self.execute_parameter_set_actions(name, &value)?;
let p = raw::Parameter {
name: name.to_string(),
value: value.clone().into(),
};
let (new_parameters, changed_parameters) = if already_set {
(vec![], vec![p])
} else {
(vec![p], vec![])
};
self
.parameters
.lock()
.unwrap()
.insert(name.to_owned(), value);
self
.parameter_events_writer
.publish(raw::ParameterEvent {
timestamp: self.time_now().into(),
node: self.fully_qualified_name(),
new_parameters,
changed_parameters,
deleted_parameters: vec![],
})
.unwrap_or_else(|e| warn!("undeclare_parameter: {e:?}"));
Ok(())
} else {
Err("Setting undeclared parameter '".to_owned() + name + "' is not allowed.")
}
}
pub fn allow_undeclared_parameters(&self) -> bool {
self.options.allow_undeclared_parameters
}
pub fn get_parameter(&self, name: &str) -> Option<ParameterValue> {
self
.parameters
.lock()
.unwrap()
.get(name)
.map(|p| p.to_owned())
}
pub fn list_parameters(&self) -> Vec<String> {
self
.parameters
.lock()
.unwrap()
.keys()
.map(move |k| k.to_owned())
.collect::<Vec<_>>()
}
fn validate_parameter_on_set(&self, name: &str, value: &ParameterValue) -> SetParametersResult {
match name {
"use_sim_time" => match value {
ParameterValue::Boolean(_) => Ok(()),
_ => Err("Parameter'use_sim_time' must be Boolean.".to_owned()),
},
_ => {
match self.parameter_validator {
Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
}
}
}
fn execute_parameter_set_actions(
&self,
name: &str,
value: &ParameterValue,
) -> SetParametersResult {
match name {
"use_sim_time" => match value {
ParameterValue::Boolean(s) => {
self.use_sim_time.store(*s, Ordering::SeqCst);
Ok(())
}
_ => Err("Parameter 'use_sim_time' must be Boolean.".to_owned()),
},
_ => {
match self.parameter_set_action {
Some(ref v) => v.lock().unwrap()(name, value), None => Ok(()), }
}
}
}
pub fn status_receiver(&self) -> Receiver<NodeEvent> {
if self.have_spinner() {
let (status_event_sender, status_event_receiver) = async_channel::bounded(8);
self
.status_event_senders
.lock()
.unwrap()
.push(status_event_sender);
status_event_receiver
} else {
panic!("status_receiver() cannot set up a receiver, because no Spinner is running.")
}
}
pub(crate) async fn wait_for_writer(&self, reader: GUID) {
let status_receiver = self.status_receiver();
pin_mut!(status_receiver);
let already_present = self
.readers_to_remote_writers
.lock()
.unwrap()
.get(&reader)
.map(|writers| !writers.is_empty()) .unwrap_or(false); if already_present {
info!("wait_for_writer: Already have matched a writer.");
} else {
loop {
debug!("wait_for_writer: Waiting for a writer.");
if let NodeEvent::DDS(DomainParticipantStatusEvent::RemoteWriterMatched {
local_reader,
remote_writer,
}) = status_receiver.select_next_some().await
{
if local_reader == reader {
info!("wait_for_writer: Matched remote writer {remote_writer:?}");
break; }
}
}
}
}
pub(crate) async fn wait_for_reader(&self, writer: GUID) {
let status_receiver = self.status_receiver();
pin_mut!(status_receiver);
let already_present = self
.writers_to_remote_readers
.lock()
.unwrap()
.get(&writer)
.map(|readers| !readers.is_empty()) .unwrap_or(false); if already_present {
info!("wait_for_reader: Already have matched a reader.");
} else {
loop {
debug!("wait_for_reader: Waiting for a reader.");
if let NodeEvent::DDS(DomainParticipantStatusEvent::RemoteReaderMatched {
local_writer,
remote_reader,
}) = status_receiver.select_next_some().await
{
if local_writer == writer {
info!("wait_for_reader: Matched remote reader {remote_reader:?}.");
break; }
}
}
}
}
pub(crate) fn get_publisher_count(&self, subscription_guid: GUID) -> usize {
self
.readers_to_remote_writers
.lock()
.unwrap()
.get(&subscription_guid)
.map(BTreeSet::len)
.unwrap_or_else(|| {
error!("get_publisher_count: Subscriber {subscription_guid:?} not known to node.");
0
})
}
pub(crate) fn get_subscription_count(&self, publisher_guid: GUID) -> usize {
self
.writers_to_remote_readers
.lock()
.unwrap()
.get(&publisher_guid)
.map(BTreeSet::len)
.unwrap_or_else(|| {
error!("get_subscription_count: Publisher {publisher_guid:?} not known to node.");
0
})
}
pub fn rosout_subscription(&self) -> Option<&Subscription<Log>> {
self.rosout_reader.as_ref()
}
#[allow(clippy::too_many_arguments)]
pub fn rosout_raw(
&self,
timestamp: Timestamp,
level: crate::ros2::LogLevel,
log_name: &str,
log_msg: &str,
source_file: &str,
source_function: &str,
source_line: u32,
) {
match &self.rosout_writer {
None => debug!("Rosout not enabled. msg: {log_msg}"),
Some(writer) => {
writer
.publish(ros_log::Log {
timestamp,
level: level as u8,
name: log_name.to_string(),
msg: log_msg.to_string(),
file: source_file.to_string(),
function: source_function.to_string(),
line: source_line,
})
.unwrap_or_else(|e| debug!("Rosout publish failed: {e:?}"));
}
}
}
pub fn create_topic(
&self,
topic_name: &Name,
type_name: MessageTypeName,
qos: &QosPolicies,
) -> CreateResult<Topic> {
let dds_name = topic_name.to_dds_name("rt", &self.node_name, "");
self.ros_context.create_topic(dds_name, type_name, qos)
}
pub fn create_subscription<D: 'static>(
&mut self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<Subscription<D>> {
let sub = self.ros_context.create_subscription(topic, qos)?;
self.add_reader(sub.guid().into());
Ok(sub)
}
pub fn create_publisher<D: Serialize>(
&mut self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<Publisher<D>> {
let p = self.ros_context.create_publisher(topic, qos)?;
self.add_writer(p.guid().into());
Ok(p)
}
pub(crate) fn create_simpledatareader<D, DA>(
&mut self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<no_key::SimpleDataReader<D, DA>>
where
D: 'static,
DA: rustdds::no_key::DeserializerAdapter<D> + 'static,
{
let r = self.ros_context.create_simpledatareader(topic, qos)?;
self.add_reader(r.guid().into());
Ok(r)
}
pub(crate) fn create_datawriter<D, SA>(
&mut self,
topic: &Topic,
qos: Option<QosPolicies>,
) -> CreateResult<no_key::DataWriter<D, SA>>
where
SA: rustdds::no_key::SerializerAdapter<D>,
{
let w = self.ros_context.create_datawriter(topic, qos)?;
self.add_writer(w.guid().into());
Ok(w)
}
pub fn create_client<S>(
&mut self,
service_mapping: ServiceMapping,
service_name: &Name,
service_type_name: &ServiceTypeName,
request_qos: QosPolicies,
response_qos: QosPolicies,
) -> CreateResult<Client<S>>
where
S: Service + 'static,
S::Request: Clone,
{
let rq_topic = self.ros_context.domain_participant().create_topic(
service_name.to_dds_name("rq", &self.node_name, "Request"),
service_type_name.dds_request_type(),
&request_qos,
TopicKind::NoKey,
)?;
let rs_topic = self.ros_context.domain_participant().create_topic(
service_name.to_dds_name("rr", &self.node_name, "Reply"),
service_type_name.dds_response_type(),
&response_qos,
TopicKind::NoKey,
)?;
let c = Client::<S>::new(
service_mapping,
self,
&rq_topic,
&rs_topic,
Some(request_qos),
Some(response_qos),
)?;
Ok(c)
}
pub fn create_server<S>(
&mut self,
service_mapping: ServiceMapping,
service_name: &Name,
service_type_name: &ServiceTypeName,
request_qos: QosPolicies,
response_qos: QosPolicies,
) -> CreateResult<Server<S>>
where
S: Service + 'static,
S::Request: Clone,
{
let rq_topic = self.ros_context.domain_participant().create_topic(
service_name.to_dds_name("rq", &self.node_name, "Request"),
service_type_name.dds_request_type(),
&request_qos,
TopicKind::NoKey,
)?;
let rs_topic = self.ros_context.domain_participant().create_topic(
service_name.to_dds_name("rr", &self.node_name, "Reply"),
service_type_name.dds_response_type(),
&response_qos,
TopicKind::NoKey,
)?;
let s = Server::<S>::new(
service_mapping,
self,
&rq_topic,
&rs_topic,
Some(request_qos),
Some(response_qos),
)?;
Ok(s)
}
pub fn create_action_client<A>(
&mut self,
service_mapping: ServiceMapping,
action_name: &Name,
action_type_name: &ActionTypeName,
action_qos: ActionClientQosPolicies,
) -> CreateResult<ActionClient<A>>
where
A: ActionTypes + 'static,
{
let services_base_name = action_name.push("_action");
let goal_service_type = action_type_name.dds_action_service("_SendGoal");
let my_goal_client = self.create_client(
service_mapping,
&services_base_name.push("send_goal"),
&goal_service_type,
action_qos.goal_service.clone(),
action_qos.goal_service,
)?;
let cancel_goal_type = ServiceTypeName::new("action_msgs", "CancelGoal");
let my_cancel_client = self.create_client(
service_mapping,
&services_base_name.push("cancel_goal"),
&cancel_goal_type,
action_qos.cancel_service.clone(),
action_qos.cancel_service,
)?;
let result_service_type = action_type_name.dds_action_service("_GetResult");
let my_result_client = self.create_client(
service_mapping,
&services_base_name.push("get_result"),
&result_service_type,
action_qos.result_service.clone(),
action_qos.result_service,
)?;
let action_topic_namespace = action_name.push("_action");
let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
let feedback_topic = self.create_topic(
&action_topic_namespace.push("feedback"),
feedback_topic_type,
&action_qos.feedback_subscription,
)?;
let my_feedback_subscription =
self.create_subscription(&feedback_topic, Some(action_qos.feedback_subscription))?;
let status_topic = self.create_topic(
&action_topic_namespace.push("status"),
MessageTypeName::new("action_msgs", "GoalStatusArray"),
&action_qos.status_subscription,
)?;
let my_status_subscription =
self.create_subscription(&status_topic, Some(action_qos.status_subscription))?;
Ok(ActionClient {
my_goal_client,
my_cancel_client,
my_result_client,
my_feedback_subscription,
my_status_subscription,
my_action_name: action_name.clone(),
})
}
pub fn create_action_server<A>(
&mut self,
service_mapping: ServiceMapping,
action_name: &Name,
action_type_name: &ActionTypeName,
action_qos: ActionServerQosPolicies,
) -> CreateResult<ActionServer<A>>
where
A: ActionTypes + 'static,
{
let services_base_name = action_name.push("_action");
let goal_service_type = action_type_name.dds_action_service("_SendGoal");
let my_goal_server = self.create_server(
service_mapping,
&services_base_name.push("send_goal"),
&goal_service_type,
action_qos.goal_service.clone(),
action_qos.goal_service,
)?;
let cancel_service_type = ServiceTypeName::new("action_msgs", "CancelGoal");
let my_cancel_server = self.create_server(
service_mapping,
&services_base_name.push("cancel_goal"),
&cancel_service_type,
action_qos.cancel_service.clone(),
action_qos.cancel_service,
)?;
let result_service_type = action_type_name.dds_action_service("_GetResult");
let my_result_server = self.create_server(
service_mapping,
&services_base_name.push("get_result"),
&result_service_type,
action_qos.result_service.clone(),
action_qos.result_service,
)?;
let action_topic_namespace = action_name.push("_action");
let feedback_topic_type = action_type_name.dds_action_topic("_FeedbackMessage");
let feedback_topic = self.create_topic(
&action_topic_namespace.push("feedback"),
feedback_topic_type,
&action_qos.feedback_publisher,
)?;
let my_feedback_publisher =
self.create_publisher(&feedback_topic, Some(action_qos.feedback_publisher))?;
let status_topic_type = MessageTypeName::new("action_msgs", "GoalStatusArray");
let status_topic = self.create_topic(
&action_topic_namespace.push("status"),
status_topic_type,
&action_qos.status_publisher,
)?;
let my_status_publisher =
self.create_publisher(&status_topic, Some(action_qos.status_publisher))?;
Ok(ActionServer {
my_goal_server,
my_cancel_server,
my_result_server,
my_feedback_publisher,
my_status_publisher,
my_action_name: action_name.clone(),
})
}
} impl Drop for Node {
fn drop(&mut self) {
if let Some(ref stop_spin_sender) = self.stop_spin_sender {
stop_spin_sender
.try_send(())
.unwrap_or_else(|e| error!("Cannot notify spin task to stop: {e:?}"));
}
self
.ros_context
.remove_node(self.fully_qualified_name().as_str());
}
}
#[macro_export]
macro_rules! rosout {
($node:expr, $lvl:expr, $($arg:tt)+) => (
$node.rosout_raw(
$crate::ros2::Timestamp::now(),
$lvl,
$node.base_name(),
&std::format!($($arg)+), std::file!(),
"<unknown_func>", std::line!(),
);
);
}