use super::Header;
use crate::{
error::{DynError, RCLError, RCLResult},
get_allocator, is_halt,
msg::ServiceMsg,
node::Node,
qos::Profile,
rcl,
selector::{
async_selector::{self, SELECTOR},
CallbackResult, Selector,
},
signal_handler::Signaled,
PhantomUnsync, RecvResult, ST,
};
use pin_project::{pin_project, pinned_drop};
use std::{
ffi::CString, future::Future, marker::PhantomData, mem::MaybeUninit, os::raw::c_void, pin::Pin,
sync::Arc, task::Poll, time::Duration,
};
/// State shared between a client and its pending receivers: the raw rcl
/// client handle together with the node it was initialized on.
///
/// The `Drop` implementation finalizes `client` against `node`.
pub(crate) struct ClientData {
/// Raw rcl client handle.
pub(crate) client: rcl::rcl_client_t,
/// Node the client was initialized on; also handed to `rcl_client_fini` on drop.
pub(crate) node: Arc<Node>,
}
impl Drop for ClientData {
    /// Finalizes the rcl client while holding the `MT_UNSAFE_FN` lock.
    ///
    /// Any error reported by `rcl_client_fini` is deliberately discarded,
    /// since `drop` has no way to report it.
    fn drop(&mut self) {
        let mt_unsafe = rcl::MT_UNSAFE_FN.lock();
        // NOTE(review): `as_ptr_mut` is an unsafe accessor; presumably it is
        // sound here because the lock above is held for the whole call —
        // confirm against `Node::as_ptr_mut`'s contract.
        let node_ptr = unsafe { self.node.as_ptr_mut() };
        let _ = mt_unsafe.rcl_client_fini(&mut self.client, node_ptr);
    }
}
// NOTE(review): these impls assert that `ClientData` (which holds a raw
// `rcl_client_t`) may be shared and moved across threads. In this file,
// `rcl_client_init`, `rcl_client_fini` and `rcl_take_response_with_info` are
// called through the `MT_UNSAFE_FN` lock, while `rcl_send_request` goes through
// `MTSafeFn` — confirm that every other access to the handle follows the same
// discipline.
unsafe impl Sync for ClientData {}
unsafe impl Send for ClientData {}
/// Service client for the service type `T`, ready to send a request.
///
/// Sending consumes the client and yields a `ClientRecv<T>`; the client is
/// handed back once the response is received or the receiver gives up.
pub struct Client<T> {
/// Shared rcl client handle and owning node.
data: Arc<ClientData>,
/// Ties the client to its service type without storing a value of it.
_phantom: PhantomData<T>,
/// Marker field; presumably makes the type `!Sync` — confirm against `PhantomUnsync`.
_unsync: PhantomUnsync,
}
impl<T: ServiceMsg> Client<T> {
pub(crate) fn new(
node: Arc<Node>,
service_name: &str,
qos: Option<Profile>,
) -> RCLResult<Self> {
let mut client = rcl::MTSafeFn::rcl_get_zero_initialized_client();
let service_name = CString::new(service_name).unwrap_or_default();
let profile = qos.unwrap_or_else(Profile::services_default);
let options = rcl::rcl_client_options_t {
qos: (&profile).into(),
allocator: get_allocator(),
};
let guard = rcl::MT_UNSAFE_FN.lock();
guard.rcl_client_init(
&mut client,
node.as_ptr(),
<T as ServiceMsg>::type_support(),
service_name.as_ptr(),
&options,
)?;
Ok(Client {
data: Arc::new(ClientData { client, node }),
_phantom: Default::default(),
_unsync: Default::default(),
})
}
pub fn send(self, data: &<T as ServiceMsg>::Request) -> RCLResult<ClientRecv<T>> {
let (s, _) = self.send_ret_seq(data)?;
Ok(s)
}
pub fn send_ret_seq(
self,
data: &<T as ServiceMsg>::Request,
) -> RCLResult<(ClientRecv<T>, i64)> {
let mut seq: i64 = 0;
rcl::MTSafeFn::rcl_send_request(
&self.data.client,
data as *const _ as *const c_void,
&mut seq,
)?;
Ok((
ClientRecv {
data: self.data,
seq,
_phantom: Default::default(),
_unsync: Default::default(),
},
seq,
))
}
}
/// Receiver for the response to one request sent through a `Client<T>`.
///
/// Produced by `Client::send` / `Client::send_ret_seq`; receiving the response
/// (or calling `give_up`) turns it back into a `Client<T>`.
#[derive(Clone)]
#[must_use]
pub struct ClientRecv<T> {
/// Shared rcl client handle and owning node.
pub(crate) data: Arc<ClientData>,
/// Sequence number of the request this receiver is waiting on.
seq: i64,
/// Ties the receiver to its service type without storing a value of it.
_phantom: PhantomData<T>,
/// Marker field; presumably makes the type `!Sync` — confirm against `PhantomUnsync`.
_unsync: PhantomUnsync,
}
impl<T: ServiceMsg> ClientRecv<T> {
pub fn try_recv(self) -> RecvResult<(Client<T>, <T as ServiceMsg>::Response, Header), Self> {
let (response, header) = match rcl_take_response_with_info::<<T as ServiceMsg>::Response>(
&self.data.client,
self.seq,
) {
Ok(data) => data,
Err(RCLError::ClientTakeFailed) => return RecvResult::RetryLater(self),
Err(e) => return RecvResult::Err(e.into()),
};
if header.request_id.sequence_number != self.seq {
return RecvResult::RetryLater(self);
}
RecvResult::Ok((
Client {
data: self.data,
_phantom: Default::default(),
_unsync: Default::default(),
},
response,
Header { header },
))
}
pub fn recv(self) -> AsyncReceiver<T> {
AsyncReceiver {
client: self,
is_waiting: false,
}
}
pub fn recv_timeout(
self,
t: Duration,
selector: &mut Selector,
) -> RecvResult<(Client<T>, <T as ServiceMsg>::Response, Header), Self> {
let receiver = ST::new(self);
selector.add_client_recv(&receiver);
match selector.wait_timeout(t) {
Ok(true) => match receiver.try_recv() {
RecvResult::Ok((c, response, header)) => {
RecvResult::Ok((c, response, header))
}
RecvResult::RetryLater(receiver) => {
RecvResult::RetryLater(receiver.unwrap())
}
RecvResult::Err(e) => {
RecvResult::Err(e)
}
},
Ok(false) => {
RecvResult::RetryLater(receiver.unwrap())
}
Err(e) => {
RecvResult::Err(e)
}
}
}
pub fn give_up(self) -> Client<T> {
Client {
data: self.data,
_phantom: Default::default(),
_unsync: Default::default(),
}
}
}
/// Takes a response from `client`, returning it together with its service info.
///
/// Both output values start zero-initialized, and `seq` is written into the
/// info's request id before the call. The take itself runs under the
/// `MT_UNSAFE_FN` lock.
///
/// # Errors
///
/// Returns the error reported by `rcl_take_response_with_info`.
fn rcl_take_response_with_info<T>(
    client: &rcl::rcl_client_t,
    seq: i64,
) -> RCLResult<(T, rcl::rmw_service_info_t)> {
    // NOTE(review): assumes an all-zero bit pattern is a valid
    // `rmw_service_info_t` — confirm against the generated bindings.
    let mut info = unsafe { MaybeUninit::<rcl::rmw_service_info_t>::zeroed().assume_init() };
    // NOTE(review): assumes every `T` used here is a message type for which
    // all-zero bytes are a valid value — confirm at the call sites.
    let mut response = unsafe { MaybeUninit::<T>::zeroed().assume_init() };

    info.request_id.sequence_number = seq;

    // rcl fills the response through an untyped pointer.
    let response_ptr = &mut response as *mut T as *mut c_void;

    let mt_unsafe = rcl::MT_UNSAFE_FN.lock();
    mt_unsafe.rcl_take_response_with_info(client, &mut info, response_ptr)?;

    Ok((response, info))
}
/// Future returned by `ClientRecv::recv`; resolves with the client, the
/// response and its header once a matching response has been taken.
#[pin_project(PinnedDrop)]
#[must_use]
pub struct AsyncReceiver<T> {
/// Receiver being awaited.
client: ClientRecv<T>,
/// Set when `poll` has registered the client with the async selector and
/// returned `Pending`; the pinned drop checks it to decide whether to send
/// a `RemoveClient` command.
is_waiting: bool,
}
impl<T: ServiceMsg> AsyncReceiver<T> {
pub fn give_up(self) -> Client<T> {
Client {
data: self.client.data.clone(),
_phantom: Default::default(),
_unsync: Default::default(),
}
}
}
impl<T: ServiceMsg> Future for AsyncReceiver<T> {
type Output = Result<(Client<T>, <T as ServiceMsg>::Response, Header), DynError>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if is_halt() {
return Poll::Ready(Err(Signaled.into()));
}
let this = self.project();
*this.is_waiting = false;
match rcl_take_response_with_info(&this.client.data.client, this.client.seq) {
Ok((val, header)) => {
if header.request_id.sequence_number == this.client.seq {
return Poll::Ready(Ok((
Client {
data: this.client.data.clone(),
_phantom: Default::default(),
_unsync: Default::default(),
},
val,
Header { header },
)));
}
}
Err(RCLError::ClientTakeFailed) => (),
Err(e) => return Poll::Ready(Err(e.into())),
}
let mut waker = Some(cx.waker().clone());
let mut guard = SELECTOR.lock();
if let Err(e) = guard.send_command(
&this.client.data.node.context,
async_selector::Command::Client(
this.client.data.clone(),
Box::new(move || {
let w = waker.take().unwrap();
w.wake();
CallbackResult::Ok
}),
),
) {
return Poll::Ready(Err(e));
}
*this.is_waiting = true;
Poll::Pending
}
}
#[pinned_drop]
impl<T> PinnedDrop for AsyncReceiver<T> {
    /// Sends a `RemoveClient` command to the async selector if the last poll
    /// left the client registered there. A failure to send the command is
    /// ignored.
    fn drop(self: Pin<&mut Self>) {
        if !self.is_waiting {
            return;
        }

        let shared = &self.client.data;
        let mut selector = SELECTOR.lock();
        let _ = selector.send_command(
            &shared.node.context,
            async_selector::Command::RemoveClient(shared.clone()),
        );
    }
}