use std::boxed::Box;
use std::ffi::CString;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, MutexGuard};
use rosidl_runtime_rs::Message;
use crate::error::{RclReturnCode, ToResult};
use crate::{rcl_bindings::*, MessageCow, Node, RclrsError};
unsafe impl Send for rcl_service_t {}
pub struct ServiceHandle {
rcl_service_mtx: Mutex<rcl_service_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}
impl ServiceHandle {
pub(crate) fn lock(&self) -> MutexGuard<rcl_service_t> {
self.rcl_service_mtx.lock().unwrap()
}
}
impl Drop for ServiceHandle {
fn drop(&mut self) {
let rcl_service = self.rcl_service_mtx.get_mut().unwrap();
let rcl_node = &mut *self.rcl_node_mtx.lock().unwrap();
unsafe {
rcl_service_fini(rcl_service, rcl_node);
}
}
}
pub trait ServiceBase: Send + Sync {
fn handle(&self) -> &ServiceHandle;
fn execute(&self) -> Result<(), RclrsError>;
}
type ServiceCallback<Request, Response> =
Box<dyn Fn(&rmw_request_id_t, Request) -> Response + 'static + Send>;
pub struct Service<T>
where
T: rosidl_runtime_rs::Service,
{
pub(crate) handle: Arc<ServiceHandle>,
pub callback: Mutex<ServiceCallback<T::Request, T::Response>>,
}
impl<T> Service<T>
where
T: rosidl_runtime_rs::Service,
{
pub(crate) fn new<F>(node: &Node, topic: &str, callback: F) -> Result<Self, RclrsError>
where
T: rosidl_runtime_rs::Service,
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
{
let mut rcl_service = unsafe { rcl_get_zero_initialized_service() };
let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()
as *const rosidl_service_type_support_t;
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
err,
s: topic.into(),
})?;
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();
let service_options = unsafe { rcl_service_get_default_options() };
unsafe {
rcl_service_init(
&mut rcl_service as *mut _,
rcl_node as *mut _,
type_support,
topic_c_string.as_ptr(),
&service_options as *const _,
)
.ok()?;
}
let handle = Arc::new(ServiceHandle {
rcl_service_mtx: Mutex::new(rcl_service),
rcl_node_mtx: node.rcl_node_mtx.clone(),
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});
Ok(Self {
handle,
callback: Mutex::new(Box::new(callback)),
})
}
pub fn take_request(&self) -> Result<(T::Request, rmw_request_id_t), RclrsError> {
let mut request_id_out = rmw_request_id_t {
writer_guid: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
sequence_number: 0,
};
type RmwMsg<T> =
<<T as rosidl_runtime_rs::Service>::Request as rosidl_runtime_rs::Message>::RmwMsg;
let mut request_out = RmwMsg::<T>::default();
let handle = &*self.handle.lock();
unsafe {
rcl_take_request(
handle,
&mut request_id_out,
&mut request_out as *mut RmwMsg<T> as *mut _,
)
}
.ok()?;
Ok((T::Request::from_rmw_message(request_out), request_id_out))
}
}
impl<T> ServiceBase for Service<T>
where
T: rosidl_runtime_rs::Service,
{
fn handle(&self) -> &ServiceHandle {
&self.handle
}
fn execute(&self) -> Result<(), RclrsError> {
let (req, mut req_id) = match self.take_request() {
Ok((req, req_id)) => (req, req_id),
Err(RclrsError::RclError {
code: RclReturnCode::ServiceTakeFailed,
..
}) => {
return Ok(());
}
Err(e) => return Err(e),
};
let res = (*self.callback.lock().unwrap())(&req_id, req);
let rmw_message = <T::Response as Message>::into_rmw_message(res.into_cow());
let handle = &*self.handle.lock();
unsafe {
rcl_send_response(
handle,
&mut req_id,
rmw_message.as_ref() as *const <T::Response as Message>::RmwMsg as *mut _,
)
}
.ok()
}
}