use std::ffi::c_void;
use std::sync::Arc;
use std::time::Duration;
use zenrc_dds::{
DDS_ANY_STATE, RawMessageBridge, Sample, dds_entity_t,
};
use super::common::take_one;
use super::error::{DdsError, Result, check_entity, check_ret};
use super::qos::duration_to_nanos;
use super::topic::Topic;
/// Server half of a request/response service built on a DDS reader/writer pair.
///
/// Requests are taken from `reader`; the value returned by the user handler is
/// written to `writer` (see `set_event`).
pub struct ServiceServer<Req: RawMessageBridge, Res: RawMessageBridge> {
/// DDS entity handle that requests are taken from.
reader: dds_entity_t,
/// DDS entity handle that responses are written to.
writer: dds_entity_t,
/// Request topic, held by the server but not read by any code in this file.
/// NOTE(review): presumably kept so the topic outlives the reader - confirm.
_req_topic: Topic<Req>,
/// Response topic, held by the server but not read by any code in this file.
/// NOTE(review): presumably kept so the topic outlives the writer - confirm.
_res_topic: Topic<Res>,
/// Notifier obtained from `DdsContext::attach` in `with_context`; `None`
/// until then, in which case `set_event` returns an error.
notify: Option<Arc<tokio::sync::Notify>>,
}
impl<Req: RawMessageBridge, Res: RawMessageBridge> ServiceServer<Req, Res> {
    /// Builds a server from already-created DDS entities.
    ///
    /// `reader` is the entity requests are taken from and `writer` the entity
    /// responses are written to. No notifier is attached yet; call
    /// `with_context` before `set_event`.
    pub(crate) fn new(
        reader: dds_entity_t,
        writer: dds_entity_t,
        req_topic: Topic<Req>,
        res_topic: Topic<Res>,
    ) -> Self {
        Self {
            reader,
            writer,
            _req_topic: req_topic,
            _res_topic: res_topic,
            notify: None,
        }
    }

    /// Attaches the request reader to `context` and stores the notifier that
    /// `DdsContext::attach` returns, replacing any previously stored one.
    pub(crate) fn with_context(&mut self, context: &super::context::DdsContext) {
        self.notify = Some(context.attach(self.reader));
    }

    /// Takes at most one request sample from `reader` via the shared `take_one` helper.
    fn take_one_request(reader: dds_entity_t) -> Result<Option<Sample<Req>>> {
        take_one(reader)
    }

    /// Spawns a tokio task that answers incoming requests with `handler`.
    ///
    /// After every wake-up of the attached notifier the task drains the
    /// request reader: each request is passed to `handler` and the returned
    /// value is written to the response writer. Draining matters because a
    /// `tokio::sync::Notify` stores at most one pending wake-up, so several
    /// requests may be queued behind a single notification.
    ///
    /// # Errors
    ///
    /// Returns `DdsError::NullPtr` when the server has not been attached to a
    /// `DdsContext` (i.e. `with_context` was never called).
    ///
    /// # Task lifetime
    ///
    /// The task ends when writing a response fails. A failure to take a
    /// request is ignored and the task waits for the next notification.
    /// Must be called from within a tokio runtime because it uses `tokio::spawn`.
    pub fn set_event<F>(&self, handler: F) -> Result<tokio::task::JoinHandle<()>>
    where
        F: Fn(Sample<Req>) -> Res + Send + Sync + 'static,
        Req: Send + 'static,
        Res: Send + 'static,
    {
        // The message reads: "ServiceServer is not attached to a DdsContext;
        // cannot set the event callback".
        let notify = self.notify.as_ref().map(Arc::clone).ok_or_else(|| {
            DdsError::NullPtr("ServiceServer 未附加到 DdsContext,无法设置事件回调".into())
        })?;
        // Copy the plain handles so the task does not borrow `self`.
        let reader = self.reader;
        let writer = self.writer;

        Ok(tokio::spawn(async move {
            loop {
                notify.notified().await;

                // Drain every request that is currently available; one
                // notification may stand for more than one queued sample.
                loop {
                    let sample = match Self::take_one_request(reader) {
                        Ok(Some(sample)) => sample,
                        // Nothing left, or the take failed: go back to waiting.
                        Ok(None) | Err(_) => break,
                    };

                    // `res` stays bound until after the write in case `raw_res`
                    // refers to memory owned by it (not visible from here).
                    let res = handler(sample);
                    let raw_res = res.to_raw();
                    // SAFETY: `raw_res` is a live local for the whole call.
                    // NOTE(review): assumes `to_raw` yields the in-memory
                    // representation the writer's topic expects - confirm.
                    let written = check_ret(unsafe {
                        zenrc_dds::dds_write(writer, &raw_res as *const _ as *const c_void)
                    });
                    if written.is_err() {
                        // A failed write terminates the serving task.
                        return;
                    }
                }
            }
        }))
    }
}
impl<Req: RawMessageBridge, Res: RawMessageBridge> Drop for ServiceServer<Req, Res> {
    /// Deletes the writer entity and then the reader entity.
    ///
    /// The return codes of `dds_delete` are ignored.
    fn drop(&mut self) {
        // Order is significant only in that it mirrors the original: writer first.
        for entity in [self.writer, self.reader] {
            // SAFETY: `entity` is one of the handles handed to `new`.
            // NOTE(review): assumes this struct is the sole owner of both
            // handles - confirm against the code that creates them.
            unsafe { zenrc_dds::dds_delete(entity) };
        }
    }
}
// SAFETY (NOTE(review)): these impls assert that a `ServiceServer` may be moved
// to and shared between threads regardless of `Req`/`Res`. Nothing in this file
// proves that; it presumably relies on the DDS entity handles and `Topic<_>`
// being usable from any thread - confirm against the `Topic` definition and
// the DDS library's threading guarantees.
unsafe impl<Req: RawMessageBridge, Res: RawMessageBridge> Send for ServiceServer<Req, Res> {}
unsafe impl<Req: RawMessageBridge, Res: RawMessageBridge> Sync for ServiceServer<Req, Res> {}
/// Client half of a request/response service built on a DDS writer/reader pair.
///
/// `call` writes a request to `writer` and then waits for a sample on `reader`.
pub struct ServiceClient<Req: RawMessageBridge, Res: RawMessageBridge> {
/// DDS entity handle that requests are written to.
writer: dds_entity_t,
/// DDS entity handle that responses are taken from.
reader: dds_entity_t,
/// Participant handle used by `call` to create a temporary waitset.
/// It is not deleted by this type's `Drop` implementation.
participant: dds_entity_t,
/// Request topic, held by the client but not read by any code in this file.
/// NOTE(review): presumably kept so the topic outlives the writer - confirm.
_req_topic: Topic<Req>,
/// Response topic, held by the client but not read by any code in this file.
/// NOTE(review): presumably kept so the topic outlives the reader - confirm.
_res_topic: Topic<Res>,
}
impl<Req: RawMessageBridge, Res: RawMessageBridge> ServiceClient<Req, Res> {
    /// Builds a client from already-created DDS entities.
    ///
    /// `writer` is the entity requests are written to, `reader` the entity
    /// responses are taken from, and `participant` the entity under which
    /// `call` creates its temporary waitset.
    pub(crate) fn new(
        writer: dds_entity_t,
        reader: dds_entity_t,
        participant: dds_entity_t,
        req_topic: Topic<Req>,
        res_topic: Topic<Res>,
    ) -> Self {
        Self {
            writer,
            reader,
            participant,
            _req_topic: req_topic,
            _res_topic: res_topic,
        }
    }

    /// Sends `req` and blocks the calling thread for up to `timeout` waiting
    /// for a response sample.
    ///
    /// # Returns
    ///
    /// * `Ok(Some(res))` - a sample was taken from the response reader; `res`
    ///   is the first component of `Sample::into_parts`.
    /// * `Ok(None)` - the wait timed out, or the wait succeeded but `take_one`
    ///   produced no sample.
    ///
    /// # Errors
    ///
    /// Propagates failures from writing the request, creating or attaching
    /// the waitset and read condition, waiting on the waitset, and taking the
    /// response.
    ///
    /// NOTE(review): nothing in this function correlates the sample that is
    /// taken with the request that was just written; it returns whatever
    /// sample `take_one` yields from the response reader - confirm this is
    /// acceptable for callers.
    pub fn call(&self, req: Req, timeout: Duration) -> Result<Option<Res>> {
        let raw_req = req.to_raw();
        // SAFETY: `raw_req` is a live local for the whole call; `req` is still
        // alive as well in case `raw_req` refers to memory it owns.
        check_ret(unsafe {
            zenrc_dds::dds_write(self.writer, &raw_req as *const _ as *const c_void)
        })?;

        // A waitset and read condition are created per call and torn down on
        // every exit path below.
        let ws = check_entity(unsafe { zenrc_dds::dds_create_waitset(self.participant) })?;
        let cond = match check_entity(unsafe {
            zenrc_dds::dds_create_readcondition(self.reader, DDS_ANY_STATE)
        }) {
            Ok(c) => c,
            Err(e) => {
                unsafe { zenrc_dds::dds_delete(ws) };
                return Err(e);
            }
        };
        if let Err(e) = check_ret(unsafe { zenrc_dds::dds_waitset_attach(ws, cond, 1) }) {
            unsafe { zenrc_dds::dds_delete(cond) };
            unsafe { zenrc_dds::dds_delete(ws) };
            return Err(e);
        }

        let timeout_ns = duration_to_nanos(timeout);
        let mut xs = [0isize; 4];
        // SAFETY: `xs` is a valid, writable buffer of exactly `xs.len()` slots.
        let n = unsafe {
            zenrc_dds::dds_waitset_wait(ws, xs.as_mut_ptr(), xs.len(), timeout_ns)
        };

        // Release the temporary entities before inspecting the result so that
        // no return path below can leak them.
        unsafe { zenrc_dds::dds_waitset_detach(ws, cond) };
        unsafe { zenrc_dds::dds_delete(cond) };
        unsafe { zenrc_dds::dds_delete(ws) };

        // A negative return is an error code, not a timeout: report it
        // instead of folding it into `Ok(None)`.
        if n < 0 {
            check_ret(n)?;
        }
        if n <= 0 {
            // Zero triggered conditions means the wait timed out.
            return Ok(None);
        }

        Ok(take_one::<Res>(self.reader)?.map(|sample| sample.into_parts().0))
    }
}
impl<Req: RawMessageBridge, Res: RawMessageBridge> Drop for ServiceClient<Req, Res> {
    /// Deletes the writer entity and then the reader entity.
    ///
    /// The participant handle is left untouched and the return codes of
    /// `dds_delete` are ignored.
    fn drop(&mut self) {
        // Order is significant only in that it mirrors the original: writer first.
        for entity in [self.writer, self.reader] {
            // SAFETY: `entity` is one of the handles handed to `new`.
            // NOTE(review): assumes this struct is the sole owner of both
            // handles - confirm against the code that creates them.
            unsafe { zenrc_dds::dds_delete(entity) };
        }
    }
}
// SAFETY (NOTE(review)): these impls assert that a `ServiceClient` may be moved
// to and shared between threads regardless of `Req`/`Res`. Nothing in this file
// proves that; it presumably relies on the DDS entity handles and `Topic<_>`
// being usable from any thread - confirm against the `Topic` definition and
// the DDS library's threading guarantees.
unsafe impl<Req: RawMessageBridge, Res: RawMessageBridge> Send for ServiceClient<Req, Res> {}
unsafe impl<Req: RawMessageBridge, Res: RawMessageBridge> Sync for ServiceClient<Req, Res> {}