use std::any::Any;
use std::boxed::Box;
use std::ffi::CString;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};
use futures::future::BoxFuture;
use super::{
get_type_support_handle, get_type_support_library, DynamicMessage, DynamicMessageMetadata,
MessageStructure, MessageTypeName,
};
use crate::rcl_bindings::*;
use crate::{
MessageInfo, Node, NodeHandle, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError,
RclrsErrorFilter, ReadyKind, SubscriptionHandle, SubscriptionOptions, ToResult, Waitable,
WaitableLifecycle, WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
};
/// Shared handle to a [`DynamicSubscriptionState`] whose work scope is a [`Node`].
pub type DynamicSubscription = Arc<DynamicSubscriptionState<Node>>;
/// Shared handle to a [`DynamicSubscriptionState`] whose work scope is a
/// [`Worker`] carrying the given `Payload` type.
pub type WorkerDynamicSubscription<Payload> = Arc<DynamicSubscriptionState<Worker<Payload>>>;
/// The executable half of a dynamic subscription: it is boxed and handed to a
/// [`Waitable`], and implements [`RclPrimitive`] so it can be executed.
struct DynamicSubscriptionExecutable<Payload> {
/// Shared subscription handle; locking it yields the rcl subscription that
/// messages are taken from.
handle: Arc<SubscriptionHandle>,
/// The user callback, shared with the owning [`DynamicSubscriptionState`].
callback: Arc<Mutex<DynamicSubscriptionCallback<Payload>>>,
/// Passed to the callback on execution; the `Node` callback variant uses it
/// to run the future it produces.
commands: Arc<WorkerCommands>,
/// Used to create a fresh [`DynamicMessage`] for every take.
metadata: Arc<DynamicMessageMetadata>,
}
/// A type-erased, synchronous dynamic subscription callback.
///
/// The wrapped closure receives a [`DynamicMessage`] together with its
/// [`MessageInfo`]. It is `Fn + Send + Sync`, so it can be called through a
/// shared reference from any thread.
pub(crate) struct NodeDynamicSubscriptionCallback(
    Box<dyn Fn(DynamicMessage, MessageInfo) + Send + Sync>,
);

impl NodeDynamicSubscriptionCallback {
    /// Boxes `f` so that callbacks of different concrete closure types share one type.
    pub(crate) fn new(f: impl Fn(DynamicMessage, MessageInfo) + Send + Sync + 'static) -> Self {
        let erased: Box<dyn Fn(DynamicMessage, MessageInfo) + Send + Sync> = Box::new(f);
        Self(erased)
    }
}
/// A type-erased, asynchronous dynamic subscription callback.
///
/// Each call receives a [`DynamicMessage`] and its [`MessageInfo`] and returns
/// a boxed `'static` future that resolves to `()`.
pub(crate) struct NodeAsyncDynamicSubscriptionCallback(
    Box<dyn FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + Sync>,
);

impl NodeAsyncDynamicSubscriptionCallback {
    /// Boxes `f` so that callbacks of different concrete closure types share one type.
    pub(crate) fn new(
        f: impl FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        let erased: Box<
            dyn FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + Sync,
        > = Box::new(f);
        Self(erased)
    }
}
/// A type-erased dynamic subscription callback that also receives mutable
/// access to a `Payload`.
///
/// Each call is given `&mut Payload`, the [`DynamicMessage`] and its
/// [`MessageInfo`].
pub(crate) struct WorkerDynamicSubscriptionCallback<Payload>(
    Box<dyn FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send + Sync>,
);

impl<Payload> WorkerDynamicSubscriptionCallback<Payload> {
    /// Boxes `f` so that callbacks of different concrete closure types share one type.
    pub(crate) fn new(
        f: impl FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send + Sync + 'static,
    ) -> Self {
        let erased: Box<dyn FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send + Sync> =
            Box::new(f);
        Self(erased)
    }
}
impl Deref for NodeDynamicSubscriptionCallback {
type Target = Box<dyn Fn(DynamicMessage, MessageInfo) + 'static + Send + Sync>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Deref for NodeAsyncDynamicSubscriptionCallback {
type Target =
Box<dyn FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + Sync>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for NodeDynamicSubscriptionCallback {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl DerefMut for NodeAsyncDynamicSubscriptionCallback {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<Payload> Deref for WorkerDynamicSubscriptionCallback<Payload> {
type Target = Box<dyn FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send + Sync>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<Payload> DerefMut for WorkerDynamicSubscriptionCallback<Payload> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// The callback stored by a dynamic subscription, in one of two calling conventions.
pub(crate) enum DynamicSubscriptionCallback<Payload> {
/// An async callback: on execution, the future it returns is passed to
/// `WorkerCommands::run_async`.
Node(NodeAsyncDynamicSubscriptionCallback),
/// A callback that additionally receives `&mut Payload` and is called
/// directly during execution.
Worker(WorkerDynamicSubscriptionCallback<Payload>),
}
impl From<NodeDynamicSubscriptionCallback> for DynamicSubscriptionCallback<()> {
fn from(value: NodeDynamicSubscriptionCallback) -> Self {
let func = Arc::new(value);
DynamicSubscriptionCallback::Node(NodeAsyncDynamicSubscriptionCallback(Box::new(
move |message, info| {
let f = Arc::clone(&func);
Box::pin(async move {
f(message, info);
})
},
)))
}
}
impl From<NodeAsyncDynamicSubscriptionCallback> for DynamicSubscriptionCallback<()> {
fn from(value: NodeAsyncDynamicSubscriptionCallback) -> Self {
DynamicSubscriptionCallback::Node(value)
}
}
impl<Payload> From<WorkerDynamicSubscriptionCallback<Payload>>
for DynamicSubscriptionCallback<Payload>
{
fn from(value: WorkerDynamicSubscriptionCallback<Payload>) -> Self {
DynamicSubscriptionCallback::Worker(value)
}
}
impl<Payload: 'static> DynamicSubscriptionCallback<Payload> {
    /// Takes one message through `executable` and delivers it to this callback.
    ///
    /// `any_payload` must actually be a `Payload`; if the downcast fails,
    /// `RclrsError::InvalidPayload` is returned before any message is taken.
    /// A `Node` callback yields a future that is passed to
    /// `commands.run_async`, while a `Worker` callback is invoked directly
    /// with the payload. The outcome, including an error from the take, is
    /// passed through `take_failed_ok()` before being returned.
    fn execute(
        &mut self,
        executable: &DynamicSubscriptionExecutable<Payload>,
        any_payload: &mut dyn Any,
        commands: &WorkerCommands,
    ) -> Result<(), RclrsError> {
        let Some(payload) = any_payload.downcast_mut::<Payload>() else {
            return Err(RclrsError::InvalidPayload {
                expected: std::any::TypeId::of::<Payload>(),
                // Deref explicitly so the type id is that of the value behind the reference.
                received: (*any_payload).type_id(),
            });
        };

        // Both callback kinds begin by taking a message, so take once up front.
        let outcome: Result<(), RclrsError> = match executable.take() {
            Ok((msg, msg_info)) => {
                match self {
                    Self::Node(cb) => {
                        commands.run_async(cb(msg, msg_info));
                    }
                    Self::Worker(cb) => {
                        cb(payload, msg, msg_info);
                    }
                }
                Ok(())
            }
            Err(err) => Err(err),
        };
        outcome.take_failed_ok()
    }
}
impl<Payload> DynamicSubscriptionExecutable<Payload> {
    /// Takes one message from the underlying rcl subscription.
    ///
    /// A fresh [`DynamicMessage`] is created from the stored metadata and its
    /// storage is handed to `rcl_take` to be filled in. Returns the message
    /// together with the [`MessageInfo`] converted from the rmw message info.
    ///
    /// # Errors
    ///
    /// Fails if the message cannot be created or if `rcl_take` reports an error.
    fn take(&self) -> Result<(DynamicMessage, MessageInfo), RclrsError> {
        let mut message = self.metadata.create()?;
        let message_ptr = message.storage.as_mut_ptr();
        let mut raw_info = unsafe { rmw_get_zero_initialized_message_info() };

        // The guard stays alive until the function returns, so the
        // subscription remains locked for the whole take.
        let mut subscription = self.handle.lock();
        let ret = unsafe {
            // The allocation argument is passed as null.
            rcl_take(
                &mut *subscription,
                message_ptr as *mut _,
                &mut raw_info,
                std::ptr::null_mut(),
            )
        };
        ret.ok()?;

        let info = MessageInfo::from_rmw_message_info(&raw_info);
        Ok((message, info))
    }
}
impl<Payload: 'static> RclPrimitive for DynamicSubscriptionExecutable<Payload> {
    /// Runs the subscription callback after `ready.for_basic()` succeeds.
    ///
    /// The callback mutex is held for the duration of the call.
    ///
    /// # Safety
    ///
    /// The safety contract is the one declared by [`RclPrimitive::execute`].
    ///
    /// # Panics
    ///
    /// Panics if the callback mutex is poisoned.
    unsafe fn execute(
        &mut self,
        ready: ReadyKind,
        payload: &mut dyn Any,
    ) -> Result<(), RclrsError> {
        ready.for_basic()?;
        // Only shared access to `self` is needed from here on.
        let this = &*self;
        let mut callback = this.callback.lock().unwrap();
        callback.execute(this, payload, &this.commands)
    }

    /// Identifies this primitive as a subscription.
    fn kind(&self) -> RclPrimitiveKind {
        RclPrimitiveKind::Subscription
    }

    /// Locks the subscription handle and returns it as a primitive handle.
    fn handle(&self) -> RclPrimitiveHandle {
        let locked = self.handle.lock();
        RclPrimitiveHandle::Subscription(locked)
    }
}
/// The state behind a dynamic subscription, i.e. a subscription whose message
/// type is given at runtime as a [`MessageTypeName`].
///
/// Values are built by the crate-internal `create` function, which returns
/// them wrapped in an [`Arc`].
pub struct DynamicSubscriptionState<Scope>
where
Scope: WorkScope,
{
/// Handle owning the initialized rcl subscription.
handle: Arc<SubscriptionHandle>,
/// The callback; the executable added to the wait set holds its own clone of
/// this `Arc`, so this field is never read here.
#[allow(unused)]
callback: Arc<Mutex<DynamicSubscriptionCallback<Scope::Payload>>>,
/// Returned by `Waitable::new` alongside the waitable; never read here.
// NOTE(review): presumably kept so the waitable's lifetime follows this
// struct's — confirm against `WaitableLifecycle`.
#[allow(unused)]
lifecycle: WaitableLifecycle,
/// Runtime description of the message type; source of `structure()`.
metadata: Arc<DynamicMessageMetadata>,
/// The type support library loaded in `create`; never read here.
// NOTE(review): presumably kept so the library stays loaded while the type
// support handle obtained from it is in use — confirm.
#[allow(dead_code)]
type_support_library: Arc<libloading::Library>,
}
impl<Scope> DynamicSubscriptionState<Scope>
where
    Scope: WorkScope,
{
    /// Creates a dynamic subscription and adds its waitable to the wait set
    /// through `commands`.
    ///
    /// * `topic_type` - runtime name of the message type to subscribe to.
    /// * `options` - anything convertible into [`SubscriptionOptions`] (topic and QoS).
    /// * `callback` - anything convertible into a [`DynamicSubscriptionCallback`].
    /// * `node_handle` - the node that the subscription is initialized on.
    /// * `commands` - receives the waitable and supplies the guard condition.
    ///
    /// # Errors
    ///
    /// Fails if the message metadata or type support cannot be loaded, if the
    /// topic contains a NUL byte, or if `rcl_subscription_init` reports an error.
    ///
    /// # Panics
    ///
    /// Panics if the node mutex or `ENTITY_LIFECYCLE_MUTEX` is poisoned.
    pub(crate) fn create<'a>(
        topic_type: MessageTypeName,
        options: impl Into<SubscriptionOptions<'a>>,
        callback: impl Into<DynamicSubscriptionCallback<Scope::Payload>>,
        node_handle: &Arc<NodeHandle>,
        commands: &Arc<WorkerCommands>,
    ) -> Result<Arc<Self>, RclrsError> {
        // Identifier used both to locate the library and to look up the handle.
        const TYPESUPPORT_IDENTIFIER: &str = "rosidl_typesupport_c";

        let metadata = DynamicMessageMetadata::new(topic_type)?;
        let SubscriptionOptions { topic, qos } = options.into();

        let type_support_library = get_type_support_library(
            &metadata.message_type.package_name,
            TYPESUPPORT_IDENTIFIER,
        )?;
        let type_support_ptr = unsafe {
            get_type_support_handle(
                type_support_library.as_ref(),
                TYPESUPPORT_IDENTIFIER,
                &metadata.message_type,
            )?
        };

        let topic_c_string = match CString::new(topic) {
            Ok(c_string) => c_string,
            Err(err) => {
                return Err(RclrsError::StringContainsNul {
                    err,
                    s: topic.into(),
                });
            }
        };

        let mut subscription_options = unsafe { rcl_subscription_get_default_options() };
        subscription_options.qos = qos.into();

        let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
        {
            // Both locks are released at the end of this scope, whether or not
            // initialization succeeds.
            let rcl_node = node_handle.rcl_node.lock().unwrap();
            let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
            let init_ret = unsafe {
                rcl_subscription_init(
                    &mut rcl_subscription,
                    &*rcl_node,
                    type_support_ptr,
                    topic_c_string.as_ptr(),
                    &subscription_options,
                )
            };
            init_ret.ok()?;
        }

        let handle = Arc::new(SubscriptionHandle {
            rcl_subscription: Mutex::new(rcl_subscription),
            node_handle: Arc::clone(node_handle),
        });
        let callback = Arc::new(Mutex::new(callback.into()));
        let metadata = Arc::new(metadata);

        // The executable shares the handle, callback and metadata with the
        // state returned to the caller.
        let executable = Box::new(DynamicSubscriptionExecutable {
            handle: Arc::clone(&handle),
            callback: Arc::clone(&callback),
            commands: Arc::clone(commands),
            metadata: Arc::clone(&metadata),
        });
        let guard_condition = Some(Arc::clone(commands.get_guard_condition()));
        let (waitable, lifecycle) = Waitable::new(executable, guard_condition);
        commands.add_to_wait_set(waitable);

        let state = Self {
            handle,
            callback,
            lifecycle,
            metadata,
            type_support_library,
        };
        Ok(Arc::new(state))
    }

    /// Returns the topic name reported by the subscription handle.
    pub fn topic_name(&self) -> String {
        self.handle.topic_name()
    }

    /// Returns the structure of the message type this subscription was created for.
    pub fn structure(&self) -> &MessageStructure {
        &self.metadata.structure
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::*;

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_and_sync<T: Send + Sync>() {}

    #[test]
    fn dynamic_subscription_is_sync_and_send() {
        assert_send_and_sync::<DynamicSubscription>();
    }

    /// Creates three dynamic subscriptions across two nodes and checks that
    /// the graph queries report them with the expected topics and types.
    #[test]
    fn test_dynamic_subscriptions() -> Result<(), RclrsError> {
        use crate::TopicEndpointInfo;
        use std::{thread, time::Duration};

        let namespace = "/test_dynamic_subscriptions_graph";
        let graph = construct_test_graph(namespace)?;

        let empty_subscription = graph.node2.create_dynamic_subscription::<_>(
            "test_msgs/msg/Empty".try_into()?,
            "graph_test_topic_1",
            |_, _| {},
        )?;
        let empty_topic = empty_subscription.topic_name();

        let basic_types_subscription = graph.node2.create_dynamic_subscription::<_>(
            "test_msgs/msg/BasicTypes".try_into()?,
            "graph_test_topic_2",
            |_, _| {},
        )?;
        let basic_types_topic = basic_types_subscription.topic_name();

        let defaults_subscription = graph.node1.create_dynamic_subscription::<_>(
            "test_msgs/msg/Defaults".try_into()?,
            "graph_test_topic_3",
            |_, _| {},
        )?;
        let defaults_topic = defaults_subscription.topic_name();

        // Pause before running the graph queries below.
        thread::sleep(Duration::from_millis(100));

        assert_eq!(graph.node2.count_subscriptions(&empty_topic)?, 1);
        assert_eq!(graph.node2.count_subscriptions(&basic_types_topic)?, 1);

        let node_1_topics = graph
            .node1
            .get_subscription_names_and_types_by_node(&graph.node1.name(), namespace)?;
        let defaults_types = node_1_topics.get(&defaults_topic).unwrap();
        assert!(defaults_types.contains(&String::from("test_msgs/msg/Defaults")));

        let node_2_topics = graph
            .node2
            .get_subscription_names_and_types_by_node(&graph.node2.name(), namespace)?;
        let empty_types = node_2_topics.get(&empty_topic).unwrap();
        assert!(empty_types.contains(&String::from("test_msgs/msg/Empty")));
        let basic_types_types = node_2_topics.get(&basic_types_topic).unwrap();
        assert!(basic_types_types.contains(&String::from("test_msgs/msg/BasicTypes")));

        // The same endpoint info must be visible from both nodes.
        let expected_subscriptions_info = vec![TopicEndpointInfo {
            node_name: "graph_test_node_2".to_string(),
            node_namespace: namespace.to_string(),
            topic_type: "test_msgs/msg/Empty".to_string(),
        }];
        assert_eq!(
            graph.node1.get_subscriptions_info_by_topic(&empty_topic)?,
            expected_subscriptions_info
        );
        assert_eq!(
            graph.node2.get_subscriptions_info_by_topic(&empty_topic)?,
            expected_subscriptions_info
        );
        Ok(())
    }
}