use super::Header;
use crate::{
error::{DynError, RCLError, RCLResult},
get_allocator, is_halt,
msg::ServiceMsg,
node::Node,
qos::Profile,
rcl::{self, rmw_request_id_t},
selector::{
async_selector::{self, SELECTOR},
CallbackResult,
},
signal_handler::Signaled,
PhantomUnsync, RecvResult,
};
use pin_project::{pin_project, pinned_drop};
use std::{
ffi::CString, future::Future, marker::PhantomData, mem::MaybeUninit, os::raw::c_void, pin::Pin,
sync::Arc, task::Poll,
};
/// Shared state behind a service server: the raw rcl service handle plus the
/// node it was initialized with.
///
/// The handle is finalized against `node` by this type's `Drop` implementation.
pub(crate) struct ServerData {
/// Raw rcl service handle.
pub(crate) service: rcl::rcl_service_t,
/// Node the service was initialized with; also needed to finalize the handle.
pub(crate) node: Arc<Node>,
}
impl Drop for ServerData {
    /// Finalizes the rcl service handle against the node it belongs to.
    fn drop(&mut self) {
        // Taken first so the finalization below runs with the lock held.
        let mt_unsafe = rcl::MT_UNSAFE_FN.lock();

        // SAFETY (NOTE(review)): the pointer is only handed to `rcl_service_fini`
        // while `self.node` is still alive; confirm `as_ptr_mut`'s contract
        // against its definition.
        let node_ptr = unsafe { self.node.as_ptr_mut() };

        // Best effort: an error cannot be propagated out of `drop`, so it is discarded.
        let _ = mt_unsafe.rcl_service_fini(&mut self.service, node_ptr);
    }
}
// NOTE(review): `Sync` and `Send` are asserted manually for `ServerData`.
// In this file the handle is initialized, taken from, and finalized while holding
// `rcl::MT_UNSAFE_FN`, and responses are sent through `rcl::MTSafeFn`; confirm that
// every other user of `service` follows the same discipline.
unsafe impl Sync for ServerData {}
unsafe impl Send for ServerData {}
/// Service server for the service type `T`, in the state where it can receive a request.
///
/// `try_recv` and `recv` consume the server and, on success, hand back a
/// [`ServerSend`]; `ServerSend::send` and `ServerSend::give_up` return the
/// `Server` again.
#[must_use]
pub struct Server<T> {
/// Shared service handle and owning node.
pub(crate) data: Arc<ServerData>,
/// Ties the server to the service type `T` without storing a `T`.
_phantom: PhantomData<T>,
/// Marker field; presumably opts the type out of `Sync` — confirm against `PhantomUnsync`.
_unsync: PhantomUnsync,
}
impl<T: ServiceMsg> Server<T> {
pub(crate) fn new(
node: Arc<Node>,
service_name: &str,
qos: Option<Profile>,
) -> RCLResult<Self> {
let mut service = rcl::MTSafeFn::rcl_get_zero_initialized_service();
let service_name = CString::new(service_name).unwrap_or_default();
let profile = qos.unwrap_or_else(Profile::services_default);
let options = rcl::rcl_service_options_t {
qos: (&profile).into(),
allocator: get_allocator(),
};
{
let guard = rcl::MT_UNSAFE_FN.lock();
guard.rcl_service_init(
&mut service,
node.as_ptr(),
<T as ServiceMsg>::type_support(),
service_name.as_ptr(),
&options,
)?;
}
Ok(Server {
data: Arc::new(ServerData { service, node }),
_phantom: Default::default(),
_unsync: Default::default(),
})
}
#[must_use]
pub fn try_recv(self) -> RecvResult<(ServerSend<T>, <T as ServiceMsg>::Request, Header), Self> {
let (request, header) =
match rcl_take_request_with_info::<<T as ServiceMsg>::Request>(&self.data.service) {
Ok(data) => data,
Err(RCLError::ServiceTakeFailed) => return RecvResult::RetryLater(self),
Err(e) => return RecvResult::Err(e.into()),
};
RecvResult::Ok((
ServerSend {
data: self.data,
request_id: header.request_id,
_phantom: Default::default(),
_unsync: Default::default(),
},
request,
Header { header },
))
}
pub async fn recv(
self,
) -> Result<(ServerSend<T>, <T as ServiceMsg>::Request, Header), DynError> {
AsyncReceiver {
server: self,
is_waiting: false,
}
.await
}
}
// NOTE(review): `Send` is asserted manually for every `T`. `Server` stores only
// `PhantomData<T>` (never a `T` value), an `Arc<ServerData>`, and the `PhantomUnsync`
// marker; confirm the marker is the only reason the auto trait does not apply.
unsafe impl<T> Send for Server<T> {}
/// Service server for the service type `T`, in the state where a request has been
/// taken and a response is expected.
///
/// `send` answers the request and returns the [`Server`]; `give_up` returns the
/// [`Server`] without answering.
#[must_use]
pub struct ServerSend<T> {
/// Shared service handle and owning node.
data: Arc<ServerData>,
/// Request id copied from the taken request's header; passed to `rcl_send_response`.
request_id: rmw_request_id_t,
/// Ties the value to the service type `T` without storing a `T`.
_phantom: PhantomData<T>,
/// Marker field; presumably opts the type out of `Sync` — confirm against `PhantomUnsync`.
_unsync: PhantomUnsync,
}
impl<T: ServiceMsg> ServerSend<T> {
pub fn send(
mut self,
data: &<T as ServiceMsg>::Response,
) -> Result<Server<T>, (Self, RCLError)> {
if let Err(e) = rcl::MTSafeFn::rcl_send_response(
&self.data.service,
&mut self.request_id,
data as *const _ as *mut c_void,
) {
return Err((self, e));
}
Ok(Server {
data: self.data,
_phantom: Default::default(),
_unsync: Default::default(),
})
}
pub fn give_up(self) -> Server<T> {
Server {
data: self.data,
_phantom: Default::default(),
_unsync: Default::default(),
}
}
}
/// Takes one request of type `T` from `service`, together with its service info.
///
/// Both output values start out zero-initialized and are filled in by
/// `rcl_take_request_with_info`, which is called while holding `rcl::MT_UNSAFE_FN`.
///
/// # Errors
///
/// Propagates the error reported by the underlying take call.
fn rcl_take_request_with_info<T>(
    service: &rcl::rcl_service_t,
) -> RCLResult<(T, rcl::rmw_service_info_t)> {
    // SAFETY (NOTE(review)): assumes an all-zero bit pattern is a valid
    // `rmw_service_info_t` — confirm against the generated bindings.
    let mut info = unsafe { MaybeUninit::<rcl::rmw_service_info_t>::zeroed().assume_init() };

    // SAFETY (NOTE(review)): assumes every `T` used here is a message type for which
    // an all-zero bit pattern is valid (it is also dropped as such when the take
    // fails) — the generic bound does not enforce this.
    let mut request = unsafe { MaybeUninit::<T>::zeroed().assume_init() };

    let mt_unsafe = rcl::MT_UNSAFE_FN.lock();
    let request_ptr = &mut request as *mut T as *mut c_void;
    mt_unsafe.rcl_take_request_with_info(service, &mut info, request_ptr)?;

    Ok((request, info))
}
/// Future that resolves to the next request taken by a [`Server`].
///
/// Constructed by `Server::recv`. When dropped while `is_waiting` is set, it asks
/// the selector to remove the server registration (see the `PinnedDrop` impl).
#[pin_project(PinnedDrop)]
#[must_use]
pub struct AsyncReceiver<T> {
/// Server that is polled for requests.
server: Server<T>,
/// `true` after a wake-up callback was successfully sent to the selector and the
/// future returned `Poll::Pending`; reset at the start of every poll that gets
/// past the halt check.
is_waiting: bool,
}
impl<T: ServiceMsg> Future for AsyncReceiver<T> {
type Output = Result<(ServerSend<T>, <T as ServiceMsg>::Request, Header), DynError>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if is_halt() {
return Poll::Ready(Err(Signaled.into()));
}
let this = self.project();
*this.is_waiting = false;
match rcl_take_request_with_info::<<T as ServiceMsg>::Request>(&this.server.data.service) {
Ok((request, header)) => Poll::Ready(Ok((
ServerSend {
data: this.server.data.clone(),
request_id: header.request_id,
_phantom: Default::default(),
_unsync: Default::default(),
},
request,
Header { header },
))),
Err(RCLError::ServiceTakeFailed) => {
let mut waker = Some(cx.waker().clone());
let mut guard = SELECTOR.lock();
if let Err(e) = guard.send_command(
&this.server.data.node.context,
async_selector::Command::Server(
this.server.data.clone(),
Box::new(move || {
let w = waker.take().unwrap();
w.wake();
CallbackResult::Ok
}),
),
) {
return Poll::Ready(Err(e));
}
*this.is_waiting = true;
Poll::Pending
}
Err(e) => Poll::Ready(Err(e.into())),
}
}
}
#[pinned_drop]
impl<T> PinnedDrop for AsyncReceiver<T> {
fn drop(self: Pin<&mut Self>) {
if self.is_waiting {
let mut guard = SELECTOR.lock();
let _ = guard.send_command(
&self.server.data.node.context,
async_selector::Command::RemoveServer(self.server.data.clone()),
);
}
}
}