use std::{
borrow::Borrow,
ffi::{CStr, CString},
sync::{atomic::AtomicBool, Arc, Mutex},
};
use futures::channel::mpsc::unbounded;
use crate::{
node::node_graph_task::{node_graph_task, NodeGraphAction},
rcl_bindings::*,
ClockType, ExecutorCommands, GuardCondition, Logger, Node, NodeHandle, NodeState,
ParameterInterface, QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX,
QOS_PROFILE_CLOCK,
};
pub trait IntoNodeOptions<'a>: Sized {
fn into_node_options(self) -> NodeOptions<'a>;
fn namespace(self, namespace: &'a str) -> NodeOptions<'a> {
let mut options = self.into_node_options();
options.namespace = namespace;
options
}
fn use_global_arguments(self, enable: bool) -> NodeOptions<'a> {
let mut options = self.into_node_options();
options.use_global_arguments = enable;
options
}
fn arguments<Args: IntoIterator>(self, arguments: Args) -> NodeOptions<'a>
where
Args::Item: ToString,
{
let mut options = self.into_node_options();
options.arguments = arguments.into_iter().map(|item| item.to_string()).collect();
options
}
fn enable_rosout(self, enable: bool) -> NodeOptions<'a> {
let mut options = self.into_node_options();
options.enable_rosout = enable;
options
}
fn start_parameter_services(self, start: bool) -> NodeOptions<'a> {
let mut options = self.into_node_options();
options.start_parameter_services = start;
options
}
fn clock_type(self, clock_type: ClockType) -> NodeOptions<'a> {
let mut options = self.into_node_options();
options.clock_type = clock_type;
options
}
fn clock_qos(self, clock_qos: QoSProfile) -> NodeOptions<'a> {
let mut options = self.into_node_options();
options.clock_qos = clock_qos;
options
}
}
pub struct NodeOptions<'a> {
name: &'a str,
namespace: &'a str,
use_global_arguments: bool,
arguments: Vec<String>,
enable_rosout: bool,
start_parameter_services: bool,
clock_type: ClockType,
clock_qos: QoSProfile,
}
impl<'a> NodeOptions<'a> {
pub fn new(name: &'a str) -> NodeOptions<'a> {
NodeOptions {
name,
namespace: "/",
use_global_arguments: true,
arguments: vec![],
enable_rosout: true,
start_parameter_services: true,
clock_type: ClockType::RosTime,
clock_qos: QOS_PROFILE_CLOCK,
}
}
pub(crate) fn build(self, commands: &Arc<ExecutorCommands>) -> Result<Node, RclrsError> {
let node_name = CString::new(self.name).map_err(|err| RclrsError::StringContainsNul {
err,
s: self.name.to_owned(),
})?;
let node_namespace =
CString::new(self.namespace).map_err(|err| RclrsError::StringContainsNul {
err,
s: self.namespace.to_owned(),
})?;
let rcl_node_options = self.create_rcl_node_options()?;
let rcl_context = &mut *commands.context().handle.rcl_context.lock().unwrap();
let handle = Arc::new(NodeHandle {
rcl_node: Mutex::new(unsafe { rcl_get_zero_initialized_node() }),
context_handle: Arc::clone(&commands.context().handle),
initialized: AtomicBool::new(false),
});
unsafe {
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
rcl_node_init(
&mut *handle.rcl_node.lock().unwrap(),
node_name.as_ptr(),
node_namespace.as_ptr(),
rcl_context,
&rcl_node_options,
)
.ok()?;
};
handle
.initialized
.store(true, std::sync::atomic::Ordering::Release);
let parameter = {
let rcl_node = handle.rcl_node.lock().unwrap();
ParameterInterface::new(
&rcl_node,
&rcl_node_options.arguments,
&rcl_context.global_arguments,
)?
};
let logger_name = {
let rcl_node = handle.rcl_node.lock().unwrap();
let logger_name_raw_ptr = unsafe { rcl_node_get_logger_name(&*rcl_node) };
if logger_name_raw_ptr.is_null() {
""
} else {
unsafe { CStr::from_ptr(logger_name_raw_ptr) }
.to_str()
.unwrap_or("")
}
};
let (graph_change_action, graph_change_receiver) = unbounded();
let graph_change_execute_sender = graph_change_action.clone();
let rcl_graph_change_guard_condition = unsafe {
rcl_node_get_graph_guard_condition(&*handle.rcl_node.lock().unwrap())
};
let (graph_change_guard_condition, graph_change_waitable) = unsafe {
GuardCondition::from_rcl(
&commands.context().handle,
rcl_graph_change_guard_condition,
Box::new(Arc::clone(&handle)),
Some(Box::new(move || {
graph_change_execute_sender
.unbounded_send(NodeGraphAction::GraphChange)
.ok();
})),
)
};
commands.add_to_wait_set(graph_change_waitable);
let _ = commands.run(node_graph_task(
graph_change_receiver,
graph_change_guard_condition,
));
let node = Arc::new(NodeState {
time_source: TimeSource::builder(self.clock_type)
.clock_qos(self.clock_qos)
.build(),
parameter,
logger: Logger::new(logger_name)?,
graph_change_action,
commands: Arc::clone(&commands),
handle,
});
node.time_source.attach_node(&node);
if self.start_parameter_services {
node.parameter.create_services(&node)?;
}
Ok(node)
}
fn create_rcl_node_options(&self) -> Result<rcl_node_options_t, RclrsError> {
let mut rcl_node_options = unsafe { rcl_node_get_default_options() };
let cstring_args = self
.arguments
.iter()
.map(|s| match CString::new(s.as_str()) {
Ok(cstr) => Ok(cstr),
Err(err) => Err(RclrsError::StringContainsNul { s: s.clone(), err }),
})
.collect::<Result<Vec<_>, _>>()?;
let cstring_arg_ptrs = cstring_args.iter().map(|s| s.as_ptr()).collect::<Vec<_>>();
unsafe {
rcl_parse_arguments(
cstring_arg_ptrs.len() as i32,
cstring_arg_ptrs.as_ptr(),
rcutils_get_default_allocator(),
&mut rcl_node_options.arguments,
)
}
.ok()?;
rcl_node_options.use_global_arguments = self.use_global_arguments;
rcl_node_options.enable_rosout = self.enable_rosout;
rcl_node_options.allocator = unsafe { rcutils_get_default_allocator() };
Ok(rcl_node_options)
}
}
impl<'a> IntoNodeOptions<'a> for NodeOptions<'a> {
fn into_node_options(self) -> NodeOptions<'a> {
self
}
}
impl<'a, T: Borrow<str>> IntoNodeOptions<'a> for &'a T {
fn into_node_options(self) -> NodeOptions<'a> {
NodeOptions::new(self.borrow())
}
}
impl<'a> IntoNodeOptions<'a> for &'a str {
fn into_node_options(self) -> NodeOptions<'a> {
NodeOptions::new(self)
}
}
impl Drop for rcl_node_options_t {
fn drop(&mut self) {
unsafe {
rcl_node_options_fini(self).ok().unwrap();
}
}
}