use futures::{channel::oneshot, Future, TryFutureExt};
use std::{
ffi::{c_void, CString},
fmt::Debug,
marker::PhantomData,
sync::{Mutex, Once, Weak},
};
use crate::{error::*, msg_types::*, qos::QosProfile};
use r2r_rcl::*;
unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}
pub(crate) struct Publisher_ {
handle: rcl_publisher_t,
poll_inter_process_subscriber_channels: Mutex<Vec<oneshot::Sender<()>>>,
}
impl Publisher_ {
fn get_inter_process_subscription_count(&self) -> Result<usize> {
let mut inter_process_subscription_count = 0;
let result = unsafe {
rcl_publisher_get_subscription_count(
&self.handle as *const rcl_publisher_t,
&mut inter_process_subscription_count as *mut usize,
)
};
if result == RCL_RET_OK as i32 {
Ok(inter_process_subscription_count)
} else {
Err(Error::from_rcl_error(result))
}
}
pub(crate) fn poll_has_inter_process_subscribers(&self) {
let mut poll_inter_process_subscriber_channels =
self.poll_inter_process_subscriber_channels.lock().unwrap();
if poll_inter_process_subscriber_channels.is_empty() {
return;
}
let inter_process_subscription_count = self.get_inter_process_subscription_count();
match inter_process_subscription_count {
Ok(0) => {
}
Ok(_) => {
while let Some(sender) = poll_inter_process_subscriber_channels.pop() {
let _res = sender.send(()); }
}
Err(_) => {
poll_inter_process_subscriber_channels.clear();
}
}
}
pub(crate) fn destroy(mut self, node: &mut rcl_node_t) {
let _ret = unsafe { rcl_publisher_fini(&mut self.handle as *mut _, node) };
}
}
#[derive(Debug, Clone)]
pub struct Publisher<T>
where
T: WrappedTypesupport,
{
pub(crate) handle: Weak<Publisher_>,
type_: PhantomData<T>,
}
unsafe impl Send for PublisherUntyped {}
#[derive(Debug, Clone)]
pub struct PublisherUntyped {
pub(crate) handle: Weak<Publisher_>,
type_: String,
}
pub fn make_publisher<T>(handle: Weak<Publisher_>) -> Publisher<T>
where
T: WrappedTypesupport,
{
Publisher {
handle,
type_: PhantomData,
}
}
pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> PublisherUntyped {
PublisherUntyped { handle, type_ }
}
pub fn create_publisher_helper(
node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
qos_profile: QosProfile,
) -> Result<Publisher_> {
let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
let result = unsafe {
let mut publisher_options = rcl_publisher_get_default_options();
publisher_options.qos = qos_profile.into();
rcl_publisher_init(
&mut publisher_handle,
node,
typesupport,
topic_c_string.as_ptr(),
&publisher_options,
)
};
if result == RCL_RET_OK as i32 {
Ok(Publisher_ {
handle: publisher_handle,
poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
})
} else {
Err(Error::from_rcl_error(result))
}
}
impl PublisherUntyped {
pub fn publish(&self, msg: serde_json::Value) -> Result<()> {
let publisher = self
.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?;
native_msg.from_json(msg)?;
let result = unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
native_msg.void_ptr(),
std::ptr::null_mut(),
)
};
if result == RCL_RET_OK as i32 {
Ok(())
} else {
log::error!("could not publish {}", result);
Err(Error::from_rcl_error(result))
}
}
pub fn publish_raw(&self, data: &[u8]) -> Result<()> {
let publisher = self
.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let msg_buf = rcl_serialized_message_t {
buffer: data.as_ptr() as *mut u8,
buffer_length: data.len(),
buffer_capacity: data.len(),
allocator: unsafe { rcutils_get_default_allocator() },
};
let result = unsafe {
rcl_publish_serialized_message(
&publisher.handle,
&msg_buf as *const rcl_serialized_message_t,
std::ptr::null_mut(),
)
};
if result == RCL_RET_OK as i32 {
Ok(())
} else {
log::error!("could not publish {}", result);
Err(Error::from_rcl_error(result))
}
}
pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.get_inter_process_subscription_count()
}
pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
let (sender, receiver) = oneshot::channel();
self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.poll_inter_process_subscriber_channels
.lock()
.unwrap()
.push(sender);
Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
}
}
impl<T: 'static> Publisher<T>
where
T: WrappedTypesupport,
{
pub fn publish(&self, msg: &T) -> Result<()>
where
T: WrappedTypesupport,
{
let publisher = self
.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
let result = unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
native_msg.void_ptr(),
std::ptr::null_mut(),
)
};
if result == RCL_RET_OK as i32 {
Ok(())
} else {
log::error!("could not publish {}", result);
Err(Error::from_rcl_error(result))
}
}
pub fn borrow_loaned_message(&self) -> Result<WrappedNativeMsg<T>>
where
T: WrappedTypesupport,
{
let publisher = self
.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_t) } {
let mut loaned_msg: *mut c_void = std::ptr::null_mut();
let ret = unsafe {
rcl_borrow_loaned_message(
&publisher.handle as *const rcl_publisher_t,
T::get_ts(),
&mut loaned_msg,
)
};
if ret != RCL_RET_OK as i32 {
log::error!("Failed getting loaned message");
return Err(Error::from_rcl_error(ret));
}
let handle_box = Box::new(publisher.handle);
let msg = WrappedNativeMsg::<T>::from_loaned(
loaned_msg as *mut T::CStruct,
Box::new(|msg: *mut T::CStruct| {
let ret = unsafe {
let handle_ptr = Box::into_raw(handle_box);
let ret = rcl_return_loaned_message_from_publisher(
handle_ptr,
msg as *mut c_void,
);
drop(Box::from_raw(handle_ptr));
ret
};
if ret != RCL_RET_OK as i32 {
panic!("rcl_deallocate_loaned_message failed");
}
}),
);
Ok(msg)
} else {
static LOG_LOANED_ERROR: Once = Once::new();
LOG_LOANED_ERROR.call_once(|| {
log::error!(
"Currently used middleware can't loan messages. Local allocator will be used."
);
});
Ok(WrappedNativeMsg::<T>::new())
}
}
pub fn publish_native(&self, msg: &mut WrappedNativeMsg<T>) -> Result<()>
where
T: WrappedTypesupport,
{
let publisher = self
.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
let result = if msg.is_loaned {
unsafe {
msg.release();
rcl_publish_loaned_message(
&publisher.handle as *const rcl_publisher_t,
msg.void_ptr_mut(),
std::ptr::null_mut(),
)
}
} else {
unsafe {
rcl_publish(
&publisher.handle as *const rcl_publisher_t,
msg.void_ptr(),
std::ptr::null_mut(),
)
}
};
if result == RCL_RET_OK as i32 {
Ok(())
} else {
log::error!("could not publish native {}", result);
Err(Error::from_rcl_error(result))
}
}
pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.get_inter_process_subscription_count()
}
pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
let (sender, receiver) = oneshot::channel();
self.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
.poll_inter_process_subscriber_channels
.lock()
.unwrap()
.push(sender);
Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
}
}