use std::time::Duration;
use super::context::DomainParticipant;
use super::error::{check_entity, check_ret, DdsError, Result};
use super::publisher::Publisher;
use super::qos::duration_to_nanos;
use super::subscriber::Subscription;
use zenrc_dds::RawMessageBridge;
use zenrc_dds::{dds_attach_t, dds_entity_t};
/// Result of a wait call on a [`WaitSet`]: the `dds_attach_t` values the DDS
/// wait function wrote into the result buffer (presumably the tokens that were
/// supplied when the triggered entities were attached — confirm against the
/// DDS API documentation).
pub type WaitResult = Vec<dds_attach_t>;
/// Owning wrapper around a DDS waitset entity handle.
///
/// The handle is obtained from `dds_create_waitset` in `WaitSet::new` and is
/// passed to `dds_delete` when the value is dropped.
pub struct WaitSet {
/// Raw handle of the underlying DDS waitset entity.
entity: dds_entity_t,
}
impl WaitSet {
    /// Capacity of the token buffer handed to the DDS wait calls. A single
    /// wait therefore yields at most this many attach tokens.
    const MAX_TRIGGERS: usize = 32;

    /// Creates a new waitset on `participant`.
    ///
    /// # Errors
    /// Returns the error produced by `check_entity` when
    /// `dds_create_waitset` does not yield a valid entity handle.
    pub fn new(participant: &DomainParticipant) -> Result<Self> {
        // SAFETY: the call only takes an entity handle by value; no Rust
        // memory is shared with the callee.
        let entity = unsafe { zenrc_dds::dds_create_waitset(participant.entity()) };
        let entity = check_entity(entity)?;
        Ok(Self { entity })
    }

    /// Attaches the entity behind `subscription` to this waitset, associating
    /// it with `token`.
    ///
    /// # Errors
    /// Propagates the error from [`WaitSet::attach_entity`].
    pub fn attach_reader<T: RawMessageBridge>(
        &self,
        subscription: &Subscription<T>,
        token: isize,
    ) -> Result<()> {
        self.attach_entity(subscription.entity(), token)
    }

    /// Attaches the entity behind `publisher` to this waitset, associating it
    /// with `token`.
    ///
    /// # Errors
    /// Propagates the error from [`WaitSet::attach_entity`].
    pub fn attach_writer<T: RawMessageBridge>(
        &self,
        publisher: &Publisher<T>,
        token: isize,
    ) -> Result<()> {
        self.attach_entity(publisher.entity(), token)
    }

    /// Attaches an arbitrary DDS entity handle to this waitset with the given
    /// attach `token`.
    ///
    /// # Errors
    /// Returns the error `check_ret` derives from the return code of
    /// `dds_waitset_attach`.
    pub fn attach_entity(&self, entity: dds_entity_t, token: isize) -> Result<()> {
        // SAFETY: only handles and an integer token are passed by value.
        check_ret(unsafe { zenrc_dds::dds_waitset_attach(self.entity, entity, token) })
    }

    /// Detaches a previously attached entity handle from this waitset.
    ///
    /// # Errors
    /// Returns the error `check_ret` derives from the return code of
    /// `dds_waitset_detach`.
    pub fn detach_entity(&self, entity: dds_entity_t) -> Result<()> {
        // SAFETY: only entity handles are passed by value.
        check_ret(unsafe { zenrc_dds::dds_waitset_detach(self.entity, entity) })
    }

    /// Creates a guard condition that can be attached to this waitset.
    ///
    /// The condition is created on the waitset's *parent* entity: the Cyclone
    /// DDS C API only accepts a participant, a domain or the library root as
    /// owner of a guard condition and rejects a waitset handle. The returned
    /// condition is not attached automatically; attach it with
    /// [`WaitSet::attach_entity`] using `GuardCondition::entity()`.
    ///
    /// # Errors
    /// Returns the error from `check_entity` if either the parent lookup or
    /// the guard condition creation does not yield a valid handle.
    pub fn create_guard_condition(&self) -> Result<GuardCondition> {
        // SAFETY: only an entity handle is passed by value.
        let owner = check_entity(unsafe { zenrc_dds::dds_get_parent(self.entity) })?;
        // SAFETY: only an entity handle is passed by value.
        let entity = check_entity(unsafe { zenrc_dds::dds_create_guardcondition(owner) })?;
        Ok(GuardCondition { entity })
    }

    /// Waits on the waitset with a relative `timeout`.
    ///
    /// The timeout is converted with `duration_to_nanos` and forwarded to
    /// `dds_waitset_wait`. At most 32 attach tokens are returned per call.
    ///
    /// # Errors
    /// Returns `DdsError::RetCode` when the DDS call reports a negative code.
    pub fn wait(&self, timeout: Duration) -> Result<WaitResult> {
        self.wait_until_ns(duration_to_nanos(timeout))
    }

    /// Waits on the waitset until the absolute timestamp `abs_timestamp_ns`
    /// (forwarded unchanged to `dds_waitset_wait_until`).
    ///
    /// At most 32 attach tokens are returned per call.
    ///
    /// # Errors
    /// Returns `DdsError::RetCode` when the DDS call reports a negative code.
    pub fn wait_abs(&self, abs_timestamp_ns: i64) -> Result<WaitResult> {
        let mut xs: Vec<dds_attach_t> = vec![0; Self::MAX_TRIGGERS];
        // SAFETY: the pointer and the capacity both describe `xs`, which is
        // fully initialised and outlives the call.
        let n = unsafe {
            zenrc_dds::dds_waitset_wait_until(
                self.entity,
                xs.as_mut_ptr(),
                xs.len(),
                abs_timestamp_ns,
            )
        };
        self.handle_wait_result(n, xs)
    }

    /// Sets the waitset's own trigger by calling
    /// `dds_waitset_set_trigger(.., true)`.
    ///
    /// # Errors
    /// Returns the error `check_ret` derives from the return code.
    pub fn trigger(&self) -> Result<()> {
        // SAFETY: only a handle and a bool are passed by value.
        check_ret(unsafe { zenrc_dds::dds_waitset_set_trigger(self.entity, true) })
    }

    /// Returns the handles of all entities currently attached to this waitset.
    ///
    /// `dds_waitset_get_entities` reports the total number of attached
    /// entities even when the supplied buffer is too small, so the query is
    /// repeated with a buffer of the reported size until everything fits.
    ///
    /// # Errors
    /// Returns the error `check_entity` derives from a failing return code.
    pub fn attached_entities(&self) -> Result<Vec<dds_entity_t>> {
        const INITIAL_CAPACITY: usize = 64;
        let mut buf: Vec<dds_entity_t> = vec![0; INITIAL_CAPACITY];
        loop {
            // SAFETY: the pointer and the size both describe `buf`, which is
            // fully initialised and outlives the call.
            let n = unsafe {
                zenrc_dds::dds_waitset_get_entities(self.entity, buf.as_mut_ptr(), buf.len())
            };
            // Relies on `check_entity` rejecting negative return codes; the
            // clamp keeps the cast well-defined regardless.
            let reported = check_entity(n)?.max(0) as usize;
            if reported <= buf.len() {
                buf.truncate(reported);
                return Ok(buf);
            }
            // More entities are attached than the buffer can hold: grow to
            // the reported count and query again.
            buf.resize(reported, 0);
        }
    }

    /// Returns the raw DDS handle of this waitset.
    pub fn entity(&self) -> dds_entity_t {
        self.entity
    }

    /// Performs a wait with a *relative* timeout given in nanoseconds
    /// (despite the name, this calls `dds_waitset_wait`, not the absolute
    /// `dds_waitset_wait_until`).
    fn wait_until_ns(&self, timeout_ns: i64) -> Result<WaitResult> {
        let mut xs: Vec<dds_attach_t> = vec![0; Self::MAX_TRIGGERS];
        // SAFETY: the pointer and the capacity both describe `xs`, which is
        // fully initialised and outlives the call.
        let n = unsafe {
            zenrc_dds::dds_waitset_wait(self.entity, xs.as_mut_ptr(), xs.len(), timeout_ns)
        };
        self.handle_wait_result(n, xs)
    }

    /// Converts the raw return value `n` of a wait call plus its token buffer
    /// into a [`WaitResult`].
    ///
    /// A negative `n` becomes `DdsError::RetCode`; otherwise the buffer is
    /// shortened to `n` entries. If `n` exceeds the buffer length the
    /// truncation has no effect and the whole buffer is returned.
    fn handle_wait_result(
        &self,
        n: zenrc_dds::dds_return_t,
        mut xs: Vec<dds_attach_t>,
    ) -> Result<WaitResult> {
        if n < 0 {
            return Err(DdsError::RetCode(n, "dds_waitset_wait failed".into()));
        }
        // `n` is non-negative here, so the cast cannot wrap.
        xs.truncate(n as usize);
        Ok(xs)
    }
}
impl Drop for WaitSet {
    /// Hands the waitset handle to `dds_delete`; the return code is discarded.
    fn drop(&mut self) {
        let handle = self.entity;
        // `drop` cannot report failures, so the result is intentionally ignored.
        let _ = unsafe { zenrc_dds::dds_delete(handle) };
    }
}
// SAFETY (assumption — TODO confirm against the DDS library's threading
// guarantees): the only field is an integer entity handle, so no Rust-side
// state is shared between threads; soundness rests on the underlying C API
// tolerating calls on the same handle from multiple threads.
unsafe impl Send for WaitSet {}
unsafe impl Sync for WaitSet {}
/// Owning wrapper around a DDS guard condition entity handle.
///
/// Instances are produced by `WaitSet::create_guard_condition`; the handle is
/// passed to `dds_delete` when the value is dropped.
pub struct GuardCondition {
/// Raw handle of the underlying DDS guard condition entity.
entity: dds_entity_t,
}
impl GuardCondition {
    /// Sets the guard condition to the triggered state
    /// (`dds_set_guardcondition(.., true)`).
    pub fn trigger(&self) -> Result<()> {
        let rc = unsafe { zenrc_dds::dds_set_guardcondition(self.entity, true) };
        check_ret(rc)
    }

    /// Clears the triggered state (`dds_set_guardcondition(.., false)`).
    pub fn reset(&self) -> Result<()> {
        let rc = unsafe { zenrc_dds::dds_set_guardcondition(self.entity, false) };
        check_ret(rc)
    }

    /// Reads the current state through `dds_read_guardcondition` and returns
    /// it, or the error reported by `check_ret`.
    pub fn is_triggered(&self) -> Result<bool> {
        let mut state = false;
        let rc = unsafe { zenrc_dds::dds_read_guardcondition(self.entity, &mut state) };
        check_ret(rc).map(|_| state)
    }

    /// Fetches the current state through `dds_take_guardcondition` and
    /// returns it, or the error reported by `check_ret`.
    ///
    /// NOTE(review): judging by the name, the take variant presumably also
    /// resets the condition — confirm against the DDS API documentation.
    pub fn take_triggered(&self) -> Result<bool> {
        let mut state = false;
        let rc = unsafe { zenrc_dds::dds_take_guardcondition(self.entity, &mut state) };
        check_ret(rc).map(|_| state)
    }

    /// Returns the raw DDS handle of this guard condition.
    pub fn entity(&self) -> dds_entity_t {
        self.entity
    }
}
impl Drop for GuardCondition {
    /// Hands the guard condition handle to `dds_delete`; the return code is
    /// discarded.
    fn drop(&mut self) {
        let handle = self.entity;
        // `drop` cannot report failures, so the result is intentionally ignored.
        let _ = unsafe { zenrc_dds::dds_delete(handle) };
    }
}
// SAFETY (assumption — TODO confirm against the DDS library's threading
// guarantees): the only field is an integer entity handle, so no Rust-side
// state is shared between threads; soundness rests on the underlying C API
// tolerating calls on the same handle from multiple threads.
unsafe impl Send for GuardCondition {}
unsafe impl Sync for GuardCondition {}