use std::ffi::c_void;
use std::marker::PhantomData;
use std::time::Duration;
use super::error::{check_entity, check_ret, Result};
use super::qos::duration_to_nanos;
use super::topic::Topic;
use zenrc_dds::{CdrSample, RawMessageBridge};
use zenrc_dds::{dds_entity_t, dds_instance_handle_t};
use super::common::{forward_cdr, write_cdr};
/// A typed publisher that owns a DDS writer handle for messages of type `T`.
///
/// The writer handle is passed to `dds_delete` when the publisher is dropped.
pub struct Publisher<T>
where
    T: RawMessageBridge,
{
    /// Raw DDS entity handle of the writer used by every write call.
    writer: dds_entity_t,
    /// Topic this writer was created for; stored alongside the writer so it
    /// lives at least as long as the publisher.
    topic: Topic<T>,
    /// Ties the publisher to its message type without storing a `T`.
    _marker: PhantomData<T>,
}
impl<T: RawMessageBridge> Publisher<T> {
    /// Wraps an existing DDS writer handle together with the topic it writes to.
    ///
    /// The publisher takes over `writer`: the handle is passed to `dds_delete`
    /// when the publisher is dropped.
    pub(crate) fn new(writer: dds_entity_t, topic: Topic<T>) -> Self {
        Self {
            writer,
            topic,
            _marker: PhantomData,
        }
    }

    /// Publishes `msg` through the writer.
    ///
    /// The message is converted with `msg.to_raw()` and a pointer to the
    /// converted value is handed to `dds_write`.
    ///
    /// # Errors
    /// Returns the error that `check_ret` derives from the `dds_write` return code.
    pub fn publish(&self, msg: T) -> Result<()> {
        // `msg` stays alive until the end of this function, so anything the raw
        // representation might borrow from it outlives the FFI call.
        let raw_msg = msg.to_raw();
        let sample = &raw_msg as *const _ as *const c_void;
        // SAFETY: `sample` points at `raw_msg`, which is live for the whole call.
        // NOTE(review): assumes the layout of `raw_msg` is the one the writer's
        // topic type expects — confirm against `RawMessageBridge`.
        check_ret(unsafe { zenrc_dds::dds_write(self.writer, sample) })
    }

    /// Calls `dds_write_flush` on the writer.
    ///
    /// # Errors
    /// Returns the error that `check_ret` derives from the return code.
    pub fn flush(&self) -> Result<()> {
        // SAFETY: only the integer writer handle is passed; no pointers are involved.
        check_ret(unsafe { zenrc_dds::dds_write_flush(self.writer) })
    }

    /// Publishes `msg` with an explicit timestamp.
    ///
    /// `timestamp` is converted with `duration_to_nanos` and passed to
    /// `dds_write_ts` together with the raw form of `msg`.
    /// NOTE(review): presumably DDS uses the value as the sample's source
    /// timestamp — confirm the expected epoch with the callers.
    ///
    /// # Errors
    /// Returns the error that `check_ret` derives from the `dds_write_ts` return code.
    pub fn publish_with_duration(&self, msg: T, timestamp: Duration) -> Result<()> {
        let raw_msg = msg.to_raw();
        let sample = &raw_msg as *const _ as *const c_void;
        let nanos = duration_to_nanos(timestamp);
        // SAFETY: `sample` points at `raw_msg`, which is live for the whole call.
        check_ret(unsafe { zenrc_dds::dds_write_ts(self.writer, sample, nanos) })
    }

    /// Returns the raw DDS entity handle of the writer.
    pub fn entity(&self) -> dds_entity_t {
        self.writer
    }

    /// Returns the raw DDS entity handle of the topic this publisher writes to.
    pub fn topic_entity(&self) -> dds_entity_t {
        self.topic.entity
    }

    /// Passes `cdr` to the shared `write_cdr` helper using this publisher's writer.
    pub fn write_cdr(&self, cdr: CdrSample) -> Result<()> {
        write_cdr(self.writer, cdr)
    }

    /// Passes `cdr` to the shared `forward_cdr` helper using this publisher's writer.
    pub fn forward_cdr(&self, cdr: CdrSample) -> Result<()> {
        forward_cdr(self.writer, cdr)
    }

    /// Fetches the writer's publication-matched status.
    ///
    /// # Errors
    /// Returns the error that `check_ret` derives from the
    /// `dds_get_publication_matched_status` return code.
    pub fn get_publication_status(&self) -> Result<zenrc_dds::dds_publication_matched_status_t> {
        // Zero-initialised so the C call has a valid out-parameter to fill.
        // SAFETY: assumes all-zero bytes are a valid value for this C status
        // struct — TODO confirm against the generated binding.
        let mut status: zenrc_dds::dds_publication_matched_status_t =
            unsafe { std::mem::zeroed() };
        // SAFETY: `status` is a live, exclusively borrowed local for the whole call.
        check_ret(unsafe {
            zenrc_dds::dds_get_publication_matched_status(self.writer, &mut status)
        })?;
        Ok(status)
    }

    /// Reports whether the publication-matched status has a `current_count`
    /// greater than zero.
    ///
    /// # Errors
    /// Propagates any error from [`Self::get_publication_status`].
    pub fn has_readers(&self) -> Result<bool> {
        Ok(self.get_publication_status()?.current_count > 0)
    }

    /// Returns the instance handles reported by `dds_get_matched_subscriptions`
    /// for this writer.
    ///
    /// The query starts with room for 64 handles. Per the Cyclone DDS
    /// documentation the call returns the total number of matches, which can
    /// exceed the number of slots supplied; when that happens the buffer is
    /// grown to the reported size and the query is repeated, so the result is
    /// never silently cut off at the initial capacity.
    ///
    /// # Errors
    /// Returns the error that `check_entity` derives from the return code.
    pub fn get_subscriptions(&self) -> Result<Vec<dds_instance_handle_t>> {
        const INITIAL_CAPACITY: usize = 64;
        let mut handles: Vec<dds_instance_handle_t> = vec![0; INITIAL_CAPACITY];
        loop {
            // SAFETY: the pointer and length describe the initialised contents
            // of `handles`, which is not touched elsewhere during the call.
            let ret = unsafe {
                zenrc_dds::dds_get_matched_subscriptions(
                    self.writer,
                    handles.as_mut_ptr(),
                    handles.len(),
                )
            };
            let matched = check_entity(ret)? as usize;
            if matched <= handles.len() {
                handles.truncate(matched);
                return Ok(handles);
            }
            // More matches than slots: only the first `handles.len()` entries
            // were written. Grow to the reported count and ask again (the set
            // of matches may have changed in the meantime, hence the loop).
            handles.resize(matched, 0);
        }
    }
}
/// Releases the writer handle when the publisher goes out of scope.
impl<T> Drop for Publisher<T>
where
    T: RawMessageBridge,
{
    fn drop(&mut self) {
        // The return code is discarded on purpose: `drop` has no way to report
        // a failure to the caller. The `topic` field is dropped afterwards by
        // the regular field drop glue.
        // SAFETY: only the integer writer handle is passed; no pointers are involved.
        let _ = unsafe { zenrc_dds::dds_delete(self.writer) };
    }
}
// SAFETY (applies to both impls below): the publisher only stores an integer
// writer handle, a `Topic<T>` and a zero-sized marker; no `T` value is kept.
// NOTE(review): soundness relies on the DDS C API allowing a writer handle to
// be used from, and moved between, multiple threads, and on `Topic<T>` being
// safe to share likewise — confirm against the DDS implementation's docs.
unsafe impl<T> Send for Publisher<T> where T: RawMessageBridge {}
unsafe impl<T> Sync for Publisher<T> where T: RawMessageBridge {}