use crate::{
error::{DynError, RCLError, RCLResult},
get_allocator, is_halt,
msg::TypeSupport,
node::Node,
qos, rcl,
selector::{
async_selector::{self, SELECTOR},
CallbackResult,
},
signal_handler::Signaled,
subscriber_loaned_message::SubscriberLoanedMessage,
PhantomUnsync, RecvResult,
};
use pin_project::{pin_project, pinned_drop};
use std::{
ffi::CString,
future::Future,
marker::PhantomData,
mem::MaybeUninit,
os::raw::c_void,
pin::Pin,
ptr::null_mut,
sync::Arc,
task::{self, Poll},
};
#[cfg(feature = "rcl_stat")]
use crate::helper::statistics::{SerializableTimeStat, TimeStatistics};
#[cfg(feature = "rcl_stat")]
use parking_lot::Mutex;
pub(crate) struct RCLSubscription {
pub subscription: Box<rcl::rcl_subscription_t>,
topic_name: String,
#[cfg(feature = "rcl_stat")]
pub latency_take: Mutex<TimeStatistics<4096>>,
pub node: Arc<Node>,
}
#[cfg(feature = "rcl_stat")]
impl RCLSubscription {
fn measure_latency(&self, start: std::time::SystemTime) {
if let Ok(dur) = start.elapsed() {
let mut guard = self.latency_take.lock();
guard.add(dur);
}
}
}
impl Drop for RCLSubscription {
fn drop(&mut self) {
let (node, subscription) = (&mut self.node, &mut self.subscription);
let guard = rcl::MT_UNSAFE_FN.lock();
let _ = guard.rcl_subscription_fini(subscription.as_mut(), unsafe { node.as_ptr_mut() });
}
}
unsafe impl Sync for RCLSubscription {}
unsafe impl Send for RCLSubscription {}
pub struct Subscriber<T> {
pub(crate) subscription: Arc<RCLSubscription>,
_phantom: PhantomData<T>,
_unsync: PhantomUnsync,
}
impl<T: TypeSupport> Subscriber<T> {
pub(crate) fn new(
node: Arc<Node>,
topic_name: &str,
qos: Option<qos::Profile>,
) -> RCLResult<Self> {
let mut subscription = Box::new(rcl::MTSafeFn::rcl_get_zero_initialized_subscription());
let topic_name_c = CString::new(topic_name).unwrap_or_default();
let options = Options::new(&qos.unwrap_or_default());
{
let guard = rcl::MT_UNSAFE_FN.lock();
guard.rcl_subscription_init(
subscription.as_mut(),
node.as_ptr(),
T::type_support(),
topic_name_c.as_ptr(),
options.as_ptr(),
)?;
}
Ok(Subscriber {
subscription: Arc::new(RCLSubscription {
subscription,
node,
topic_name: topic_name.to_string(),
#[cfg(feature = "rcl_stat")]
latency_take: Mutex::new(TimeStatistics::new()),
}),
_phantom: Default::default(),
_unsync: Default::default(),
})
}
pub(crate) fn new_disable_loaned_message(
node: Arc<Node>,
topic_name: &str,
qos: Option<qos::Profile>,
) -> RCLResult<Self> {
let mut subscription = Box::new(rcl::MTSafeFn::rcl_get_zero_initialized_subscription());
let topic_name_c = CString::new(topic_name).unwrap_or_default();
let mut options = Options::new(&qos.unwrap_or_default());
options.disable_loaned_message();
{
let guard = rcl::MT_UNSAFE_FN.lock();
guard.rcl_subscription_init(
subscription.as_mut(),
node.as_ptr(),
T::type_support(),
topic_name_c.as_ptr(),
options.as_ptr(),
)?;
}
Ok(Subscriber {
subscription: Arc::new(RCLSubscription {
subscription,
node,
topic_name: topic_name.to_string(),
#[cfg(feature = "rcl_stat")]
latency_take: Mutex::new(TimeStatistics::new()),
}),
_phantom: Default::default(),
_unsync: Default::default(),
})
}
pub fn get_topic_name(&self) -> &str {
&self.subscription.topic_name
}
#[must_use]
pub fn try_recv(&self) -> RecvResult<TakenMsg<T>, ()> {
#[cfg(feature = "rcl_stat")]
let start = std::time::SystemTime::now();
let s = self.subscription.clone();
match take::<T>(&s) {
Ok(n) => {
#[cfg(feature = "rcl_stat")]
self.subscription.measure_latency(start);
RecvResult::Ok(n)
}
Err(RCLError::SubscriptionTakeFailed) => {
#[cfg(feature = "rcl_stat")]
self.subscription.measure_latency(start);
RecvResult::RetryLater(())
}
Err(e) => RecvResult::Err(e.into()),
}
}
pub async fn recv(&mut self) -> Result<TakenMsg<T>, DynError> {
AsyncReceiver {
subscription: &mut self.subscription,
is_waiting: false,
_phantom: Default::default(),
}
.await
}
#[cfg(feature = "rcl_stat")]
pub fn statistics(&self) -> SerializableTimeStat {
let guard = self.subscription.latency_take.lock();
guard.to_serializable()
}
}
#[pin_project(PinnedDrop)]
pub struct AsyncReceiver<'a, T> {
subscription: &'a mut Arc<RCLSubscription>,
is_waiting: bool,
_phantom: PhantomData<T>,
}
impl<'a, T> Future for AsyncReceiver<'a, T> {
type Output = Result<TakenMsg<T>, DynError>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
if is_halt() {
return Poll::Ready(Err(Signaled.into()));
}
let this = self.project();
let s = this.subscription.clone();
*this.is_waiting = false;
#[cfg(feature = "rcl_stat")]
let start = std::time::SystemTime::now();
match take::<T>(&s) {
Ok(value) => {
#[cfg(feature = "rcl_stat")]
subscription.measure_latency(start);
Poll::Ready(Ok(value))
} Err(RCLError::SubscriptionTakeFailed) => {
#[cfg(feature = "rcl_stat")]
subscription.measure_latency(start);
let mut guard = SELECTOR.lock();
let mut waker = Some(cx.waker().clone());
guard.send_command(
&this.subscription.node.context,
async_selector::Command::Subscription(
this.subscription.clone(),
Box::new(move || {
let w = waker.take();
w.unwrap().wake();
CallbackResult::Ok
}),
),
)?;
*this.is_waiting = true;
Poll::Pending
}
Err(e) => Poll::Ready(Err(e.into())), }
}
}
#[pinned_drop]
impl<T> PinnedDrop for AsyncReceiver<'_, T> {
fn drop(self: Pin<&mut Self>) {
if self.is_waiting {
let mut guard = SELECTOR.lock();
let _ = guard.send_command(
&self.subscription.node.context,
async_selector::Command::RemoveSubscription(self.subscription.clone()),
);
}
}
}
struct Options {
options: rcl::rcl_subscription_options_t,
}
impl Options {
fn new(qos: &qos::Profile) -> Self {
let options = rcl::rcl_subscription_options_t {
qos: qos.into(),
allocator: get_allocator(),
rmw_subscription_options: rcl::MTSafeFn::rmw_get_default_subscription_options(),
#[cfg(any(feature = "iron", feature = "jazzy"))]
disable_loaned_message: false,
};
Options { options }
}
fn disable_loaned_message(&mut self) {
#[cfg(any(feature = "iron", feature = "jazzy"))]
{
self.options.disable_loaned_message = true;
}
}
pub(crate) fn as_ptr(&self) -> *const rcl::rcl_subscription_options_t {
&self.options
}
}
pub enum TakenMsg<T> {
Copied(T),
Loaned(SubscriberLoanedMessage<T>),
}
impl<T> TakenMsg<T> {
pub fn get_owned(self) -> Option<T> {
match self {
TakenMsg::Copied(inner) => Some(inner),
TakenMsg::Loaned(_) => None,
}
}
}
impl<T> std::ops::Deref for TakenMsg<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
match self {
TakenMsg::Copied(copied) => copied,
TakenMsg::Loaned(loaned) => loaned.get(),
}
}
}
impl<T> std::ops::DerefMut for TakenMsg<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
TakenMsg::Copied(copied) => copied,
TakenMsg::Loaned(loaned) => loaned.get(),
}
}
}
unsafe impl<T> Sync for TakenMsg<T> {}
unsafe impl<T> Send for TakenMsg<T> {}
fn take<T>(subscription: &Arc<RCLSubscription>) -> RCLResult<TakenMsg<T>> {
if rcl::MTSafeFn::rcl_subscription_can_loan_messages(subscription.subscription.as_ref()) {
take_loaned_message(subscription.clone()).map(TakenMsg::Loaned)
} else {
rcl_take(subscription.subscription.as_ref()).map(TakenMsg::Copied)
}
}
fn take_loaned_message<T>(
subscription: Arc<RCLSubscription>,
) -> RCLResult<SubscriberLoanedMessage<T>> {
let guard = rcl::MT_UNSAFE_FN.lock();
let message: *mut T = null_mut();
guard
.rcl_take_loaned_message(
subscription.subscription.as_ref(),
&message as *const _ as *mut _,
null_mut(),
null_mut(),
)
.map(|_| SubscriberLoanedMessage::new(subscription, message))
}
fn rcl_take<T>(subscription: &rcl::rcl_subscription_t) -> RCLResult<T> {
let guard = rcl::MT_UNSAFE_FN.lock();
let mut ros_message: T = unsafe { MaybeUninit::zeroed().assume_init() };
match guard.rcl_take(
subscription,
&mut ros_message as *mut _ as *mut c_void,
null_mut(),
null_mut(),
) {
Ok(_) => Ok(ros_message),
Err(e) => Err(e),
}
}